diff --git a/chipathlon/module_generator.py b/chipathlon/module_generator.py index 9c1cb8d03e59a659bdc864b0b12dbbb38657c2ce..d424acef0e00b61ea0ff8a70619a4469a0e2faf3 100644 --- a/chipathlon/module_generator.py +++ b/chipathlon/module_generator.py @@ -80,7 +80,15 @@ class ModuleGenerator(object): all_jobs.append(self.workflow_jobs[job_name]) for output_info in job_info["outputs"]: logical_name = output_info.keys()[0] - result = Result(logical_name, final_result.control_samples, final_result.signal_samples, all_markers, all_jobs, should_save=False) + result = Result( + logical_name, + final_result.control_samples, + final_result.signal_samples, + all_markers, + all_jobs, + should_save=output_info[logical_name]["save_result"] if "save_result" in output_info[logical_name] else False, + prefix_join=output_info[logical_name].get("prefix_join") + ) results.append(result) run.add_result(self.module.name, result) results.append(final_result) @@ -163,10 +171,12 @@ class ModuleGenerator(object): self._download_from_gridfs(run, result) return - def generate(self, run): + def generate(self, run, rewrite=False): """ :param run: The target run to generate jobs for. :type run: :py:class:chipathlon.run.Run + :param rewrite: Whether or not to rewrite data even if it exists. + :type rewrite: bool Generates actual workflow jobs for a particular run. The logic for job generation is based on the fact that all output files are unique, @@ -184,9 +194,9 @@ class ModuleGenerator(object): """ final_results = self.create_final_results(run) for result in final_results: - if not result.exists_in_db(self.mdb, run.genome): + if rewrite or not result.exists_in_db(self.mdb, run.genome): for prev_result in self.get_prev_results(run, result): - if prev_result.exists_in_encode() or prev_result.exists_in_db(self.mdb, run.genome): + if prev_result.exists_in_encode() or (not rewrite and prev_result.exists_in_db(self.mdb, run.genome)): self.add_download_job(run, prev_result) self.add_jobs(run, result) return @@ -276,4 +286,3 @@ class ModuleGenerator(object): will be used to get a target result. This should be overriden by the actual generators. """ - return diff --git a/chipathlon/result.py b/chipathlon/result.py index 7515d3b877c7436c0b4e630f70da2579709c63f2..ad71e1e35ce9d6da8cd83d5bba2af1e3e19cc093 100644 --- a/chipathlon/result.py +++ b/chipathlon/result.py @@ -10,7 +10,7 @@ class Result(object): run on the result file up to this point. """ - def __init__(self, logical_name, control_samples, signal_samples, all_markers, all_jobs, should_save=False): + def __init__(self, logical_name, control_samples, signal_samples, all_markers, all_jobs, should_save=False, prefix_join=None): """ :param logical_name: The unique name of the file as presented in the module yaml :type logical_name: string @@ -43,7 +43,7 @@ class Result(object): self.should_save = should_save self.prefix = self._get_prefix() - self.full_name = self.prefix + "_" + self.logical_name + self.full_name = self.prefix + ("_" if prefix_join is None else prefix_join) + self.logical_name self.file_type = os.path.splitext(self.logical_name)[1][1:] self.pegasus_file = File(self.full_name) return diff --git a/chipathlon/workflow.py b/chipathlon/workflow.py index 3d73db28ecd2abd3a65e42aa5fd319bb19393c64..14f3d50b5f240c5545d2eeff54eb8af5e33a7609 100644 --- a/chipathlon/workflow.py +++ b/chipathlon/workflow.py @@ -25,18 +25,19 @@ from Pegasus.DAX3 import * class Workflow(object): def __init__(self, jobhome, run_file, param_file, config_file, host, username, password, rewrite=False, debug=False): + # debug mode, print out additional information + self.debug = debug + self.username = username self.host = host self.password = password # Initialize db connection - self.mdb = chipathlon.db.MongoDB(host, username, password) + self.mdb = chipathlon.db.MongoDB(host, username, password, self.debug) # Jobname info & err self.jobhome = os.path.abspath(jobhome) self.basepath = self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S") self.jobname = os.path.basename(os.path.dirname(self.jobhome + "/")) self.errors = [] - # debug mode, print out additional information - self.debug = debug # add new results even if they exist self.rewrite = rewrite # Input file info @@ -225,9 +226,9 @@ class Workflow(object): def _generate_jobs(self): for run in self.runs: if run.file_type == "fastq": - self.align_gen.generate(run) - self.remove_dup_gen.generate(run) - self.peak_call_gen.generate(run) + self.align_gen.generate(run, self.rewrite) + self.remove_dup_gen.generate(run, self.rewrite) + self.peak_call_gen.generate(run, self.rewrite) return def _create_setup(self):