Skip to content
Snippets Groups Projects
Commit b84573c8 authored by aknecht2's avatar aknecht2
Browse files

Removed unused generator calls in workflow.

parent 7bea252d
Branches
No related tags found
1 merge request!11Resolve "Module generator improvements"
......@@ -225,66 +225,6 @@ class Workflow(object):
self.peak_call_gen.generate(run)
return
# When generating actual modules, IF results already exist for that job:
# 1. Skip the module.add_jobs call
# 2. Add the download gridfs job
# 3. Find the module job dependencies with module.get_dependent_jobs,
# and remove them from the dax directly.
# This will trickle through each step, that way if a single workflow
# path has both align & remove_duplicates output, it will remove the
# job that was going to download the alignment output.
#
# NOTE: Make sure to use outputs and NOT module_outputs when checking for file existence!
# BUT, make sure to use module_outputs and NOT outputs when getting dependent jobs.
def _add_module(self, generator, rewrite=None):
    """
    Add the jobs yielded by a module generator to the workflow, reusing
    existing results instead of re-running jobs when allowed.

    For every (markers, inputs, additional_inputs, outputs, ref_genome)
    tuple yielded by generator.generate():

    * If rewrite is false and every entry returned by
      self._get_existing_results is not None, each result is fetched with
      self._download_from_gridfs and the module's dependent jobs are
      collected for removal.
    * Otherwise the module's jobs are added, the outputs are passed to
      self._save_results, and the module's dependent jobs are collected as
      required.

    Once the generator is exhausted, both collections are handed to
    self._remove_unused_jobs.

    :param generator: Object exposing generate() and a module attribute
        with get_final_outputs, get_dependent_jobs and add_jobs.
    :param rewrite: When None, falls back to self.rewrite.  A true value
        forces jobs to be added even if results already exist.
    """
    rewrite = rewrite if rewrite is not None else self.rewrite
    remove_jobs = []
    required_jobs = []
    for markers, inputs, additional_inputs, outputs, ref_genome in generator.generate():
        # The module API only wants {module file name: file name}; the
        # richer outputs dict is still what the result lookup and
        # _save_results receive.
        module_outputs = {mod_file_name: outputs[mod_file_name]["file_name"] for mod_file_name in outputs}
        results = self._get_existing_results(outputs, ref_genome, generator.module.get_final_outputs(markers))
        # NOTE(review): an empty results sequence counts as "everything
        # exists" here, exactly as before -- confirm that is intended.
        if not rewrite and all(result is not None for result in results):
            for result in results:
                self._download_from_gridfs(result["gridfs_id"], result["file_name"])
            collected_jobs = remove_jobs
        else:
            generator.module.add_jobs(self.dax, self.jobs, self.files, markers, inputs, additional_inputs, module_outputs)
            self._save_results(outputs, ref_genome)
            collected_jobs = required_jobs
        # Both branches ask for the same dependent jobs; only the list
        # they are recorded in differs.
        collected_jobs.extend(
            generator.module.get_dependent_jobs(self.dax, self.jobs, self.files, markers, inputs, additional_inputs, module_outputs)
        )
    self._remove_unused_jobs(remove_jobs, required_jobs)
    return
def _add_download(self):
    """
    Add the download module's jobs to the workflow.

    A DownloadGenerator is built from this workflow's files, its
    "download" module, run data and debug flag.  For each tuple the
    generator yields, the module's jobs are added and the generator's
    outputs are passed to self._save_results.
    """
    generator = DownloadGenerator(self.files, self.modules["download"], self.run_data, self.debug)
    for markers, inputs, additional_inputs, outputs, ref_genome in generator.generate():
        # Reduce the generator's outputs to the plain
        # {module file name: file name} mapping that add_jobs receives.
        file_names = dict((key, outputs[key]["file_name"]) for key in outputs)
        generator.module.add_jobs(self.dax, self.jobs, self.files, markers, inputs, additional_inputs, file_names)
        self._save_results(outputs, ref_genome)
def _add_align(self):
    """
    Build an AlignGenerator for the "align" module and pass it to
    self._add_module, leaving the rewrite argument at its default.
    """
    generator = AlignGenerator(self.files, self.modules["align"], self.run_data, self.debug)
    self._add_module(generator)
def _add_remove_duplicates(self):
    """
    Build a RemoveDuplicatesGenerator for the "remove_duplicates" module
    and pass it to self._add_module, leaving the rewrite argument at its
    default.
    """
    generator = RemoveDuplicatesGenerator(self.files, self.modules["remove_duplicates"], self.run_data, self.debug)
    self._add_module(generator)
def _add_peak_calling(self):
    """
    Build a PeakCallGenerator for the "peak_call" module and pass it to
    self._add_module with rewrite forced to True.
    """
    generator = PeakCallGenerator(self.files, self.modules["peak_call"], self.run_data, self.debug)
    self._add_module(generator, rewrite=True)
def _create_setup(self):
"""
Creates the base structure for job submission. Everything is contained
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment