diff --git a/chipathlon/remove_duplicates_generator.py b/chipathlon/remove_duplicates_generator.py index 7d239c30dd76d75691190e5a8aca441c28826e61..087236b06389b6d1b352a0e46dde487743728053 100644 --- a/chipathlon/remove_duplicates_generator.py +++ b/chipathlon/remove_duplicates_generator.py @@ -1,41 +1,76 @@ from chipathlon.module_generator import ModuleGenerator +from chipathlon.result import Result class RemoveDuplicatesGenerator(ModuleGenerator): - def __init__(self, master_files, workflow_module, run_data, debug = False): - super(RemoveDuplicatesGenerator, self).__init__(master_files, workflow_module, run_data, debug) + def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False): + """ + :param dax: The workflow graph object + :type dax: Pegasus.DAX3.ADAG + :param master_jobs: The dictionary mapping job name -> pegasus job object. + :type master_jobs: dict + :param master_files: The dictionary mapping file name -> pegasus file object. + :type master_files: dict + :param mdb: A MongoDB database class for fetching sample meta data. + :type mdb: :py:class:chipathlon.db.MongoDB + :param workflow_module: The actual module being used. + :type workflow_module: chipathlon.workflow_module.WorkflowModule + :param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance + :type workflow_jobs: dict + :param base_path: Base location of the workflow, used to save metadata files. + :type base_path: str + :param debug: If true, prints out params for each job & module. + :type debug: bool + """ + super(RemoveDuplicatesGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug) + self.module_name = "remove_duplicates" + self.result_dict = {} + return - def parse_run(self, run_index): - """ - :param run_index: The index of the run in the yaml file. 
- :type run_index: int - """ - run = self.run_data.runs[run_index] - for output_file in self.run_data.get_output_files(run_index, "align", "align.bam"): - inputs = { - "align.bam": output_file["file_name"] - } - additional_inputs = {} - prefix = output_file["prefix"] - file_data = [ - [ - {"file_name": "%s_filtered.bam" % (prefix,)} - ], - [ - {"file_name": "%s_sorted.bam" % (prefix,), "save_result": True, "result_type": "bam"} - ], - [ - {"file_name": "%s_no_dups.bam" % (prefix,)}, - {"file_name": "%s_quality.qc" % (prefix,)} - ], - [ - {"file_name": "%s_no_dups.bed" % (prefix,)} - ], - [ - {"file_name": "%s_no_dups_chr.bed" % (prefix,), "save_result": True, "result_type": "bed"} - ] - ] - outputs = self.construct_outputs(file_data, {}, output_file["all_markers"], prefix, output_file["sample"], output_file["experiment_sample_ids"], output_file["control_sample_ids"], output_file["all_jobs"]) - yield ({}, inputs, additional_inputs, outputs) + def create_final_results(self, run): + """ + :param run: The target run to generate jobs for. + :type run: :py:class:chipathlon.run.Run + """ + for align_result in run.get_results("align", "align.bam"): + result = Result( + "no_dups_chr.bed", + align_result.control_samples, + align_result.signal_samples, + {"remove_duplicates": {}}, + [self.workflow_jobs[job_name] for job_name in self.module.get_job_names({})], + should_save=True + ) + run.add_result("remove_duplicates", result) + return run.get_results("remove_duplicates", "no_dups_chr.bed") + + def find_prev_results(self, run, result): + """ + :param run: The target run to generate jobs for. + :type run: :py:class:chipathlon.run.Run + :param result: The target result to create jobs for. 
+ :type result: :py:class:chipathlon.result.Result + """ + align_results = run.get_results("align", "align.bam") + prev_results = [] + control_accessions = result.get_accessions("control") + signal_accessions = result.get_accessions("signal") + for prev_result in align_results: + if (set(prev_result.get_accessions("control")).issubset(control_accessions) and + set(prev_result.get_accessions("signal")).issubset(signal_accessions)): + prev_results.append(prev_result) + return prev_results + + def parse_result(self, run, result): + """ + :param run: The target run to generate jobs for. + :type run: :py:class:chipathlon.run.Run + :param result: The target result to create jobs for. + :type result: :py:class:chipathlon.result.Result + """ + prev_results = self.get_prev_results(run, result) + markers = {} + inputs = {"align.bam": prev_results[0].full_name} + additional_inputs = {} + results = self.create_results(run, result) + return markers, inputs, additional_inputs, self.get_outputs(results) diff --git a/chipathlon/workflow.py b/chipathlon/workflow.py index dd8a25f8ab10001eafe7582225ebe25000d933a4..a9a9ad27959667af50816a959b57343e7daa31ca 100644 --- a/chipathlon/workflow.py +++ b/chipathlon/workflow.py @@ -214,11 +214,13 @@ class Workflow(object): def _load_generators(self): self.align_gen = AlignGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["align"], self.workflow_jobs, self.basepath, debug=self.debug) + self.remove_dup_gen = RemoveDuplicatesGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["remove_duplicates"], self.workflow_jobs, self.basepath, debug=self.debug) return def _generate_jobs(self): for run in self.runs: self.align_gen.generate(run) + self.remove_dup_gen.generate(run) return # When generating actual modules, IF results already exist for that job: