From e1194a80f73d2265c612fe4be51183eaa66c551d Mon Sep 17 00:00:00 2001
From: aknecht2 <aknecht2@unl.edu>
Date: Sun, 29 Jan 2017 21:34:09 -0600
Subject: [PATCH] Adjusted peak call generator to match the new super class.
 Added peak call to workflow generation.

---
 chipathlon/peak_call_generator.py | 333 +++++++++++++-----------------
 chipathlon/workflow.py            |   2 +
 2 files changed, 143 insertions(+), 192 deletions(-)

diff --git a/chipathlon/peak_call_generator.py b/chipathlon/peak_call_generator.py
index 67f235e..7286274 100644
--- a/chipathlon/peak_call_generator.py
+++ b/chipathlon/peak_call_generator.py
@@ -1,21 +1,29 @@
 from chipathlon.module_generator import ModuleGenerator
+from chipathlon.result import Result
 import random
 
 class PeakCallGenerator(ModuleGenerator):
 
-    def __init__(self, master_files, workflow_module, run_data, debug = False):
+    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
         """
-        :param master_files: The dictionary mapping file name -> file object.
+        :param dax: The workflow graph object.
+        :type dax: Pegasus.DAX3.ADAG
+        :param master_jobs: The dictionary mapping job name -> pegasus job object.
+        :type master_jobs: dict
+        :param master_files: The dictionary mapping file name -> pegasus file object.
         :type master_files: dict
+        :param mdb: A MongoDB database class for fetching sample meta data.
+        :type mdb: :py:class:chipathlon.db.MongoDB
         :param workflow_module: The actual module being used.
         :type workflow_module: chipathlon.workflow_module.WorkflowModule
-        :param run_data: Input sample data.
-        :type run_data: chipathlon.run_data.RunData
+        :param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance.
+        :type workflow_jobs: dict
+        :param base_path: Base location of the workflow, used to save metadata files.
+        :type base_path: str
         :param debug: If true, prints out params for each job & module.
         :type debug: bool
         """
-        super(PeakCallGenerator, self).__init__(master_files, workflow_module, run_data, debug)
-        self.module_name = "peak_call"
+        super(PeakCallGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
         self.generate_calls = {
             "gem": self._gem,
             "spp": self._spp,
@@ -23,216 +31,81 @@ class PeakCallGenerator(ModuleGenerator):
             "jamm": self._jamm,
             "ccat": self._ccat
         }
+        self.call_pairs = {}
         return
 
-    def _ccat(self, run, file_pair):
+    def _ccat(self, run, result):
         """
         :param run: The run to generate jobs for
-        :type run: dictionary
-        :param file_pair: The tuple of input files.
-        :type file_pair: tuple
-
-        Creates inputs, additional_inputs, and outputs for jamm peak calling.
-        Jamm needs to prep inputs since it has a weird input format
+        :type run: :py:class:chipathlon.run.Run
+        :param result: The result to generate jobs for.
+        :type result: :py:class:chipathlon.result.Result
         """
         markers = {"tool": "ccat"}
-        all_markers = file_pair[0]["all_markers"]
-        all_markers["peak_call"] = markers
-        prefix = "%s_ccat" % (file_pair[0]["prefix"],)
-        inputs = {
-            "chrom.sizes": self.run_data.genomes[run["genome"]]["chrom.sizes"],
-            "exp.bed": file_pair[0]["file_name"],
-            "control.bed": file_pair[1]["file_name"],
-            "conf.txt": self.module.workflow_jobs["ccat_callpeak"].raw_files.keys()[0],
-            "prefix": prefix
-        }
-        additional_inputs = {}
-        sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
-        experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
-        control_sample_ids = file_pair[1]["control_sample_ids"]
-        file_data = [
-            [
-                {"file_name": "%s.significant.peak" % (prefix,)},
-                {"file_name": "%s.significant.region" % (prefix,)},
-                {"file_name": "%s.top100000.peak" % (prefix,)},
-                {"file_name": "%s.log" % (prefix,)}
-            ],
-            [
-                {"file_name": "%s_results_sorted.narrowPeak" % (prefix,),
-                 "save_result" : True,
-                 "result_type" : "peak"
-                }
-            ]
-        ]
-        previous_jobs = file_pair[0]["all_jobs"]
-        outputs = self.construct_outputs(file_data, markers, all_markers, prefix, sample_data, experiment_sample_ids, control_sample_ids, previous_jobs)
-        return markers, inputs, additional_inputs, outputs
-
-    def _jamm(self, run, file_pair):
-        """
-        :param run: The run to generate jobs for
-        :type run: dictionary
-        :param file_pair: The tuple of input files.
-        :type file_pair: tuple
-
-        Creates inputs, additional_inputs, and outputs for jamm peak calling.
-        Jamm needs to prep inputs since it has a weird input format
-        """
-        markers = {"tool": "jamm"}
-        all_markers = file_pair[0]["all_markers"]
-        all_markers["peak_call"] = markers
-        prefix = "%s_jamm" % (file_pair[0]["prefix"],)
+        call_pair = self.call_pairs[result.full_name]
         inputs = {
-            "chrom.sizes": self.run_data.genomes[run["genome"]]["chrom.sizes"],
-            "exp.bed": file_pair[0]["file_name"],
-            "control.bed": file_pair[1]["file_name"],
-            "jamm_dir": prefix
+            "chrom.sizes": run.genome.get_chrom_sizes()["name"],
+            "control.bed": call_pair[0].full_name,
+            "exp.bed": call_pair[1].full_name,
+            "conf.txt": self.module.workflow_jobs["ccat_callpeak"].raw_files.keys()[0],
+            "prefix": call_pair[0].prefix
         }
         additional_inputs = {}
-        sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
-        experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
-        control_sample_ids = file_pair[1]["control_sample_ids"]
-        file_data = [
-            [
-                {"file_name": "%s/sample/%s_exp.bed" % (prefix, prefix)},
-                {"file_name": "%s/control/%s_control.bed" % (prefix, prefix)}
-            ],
-            [
-                {"file_name": "%s_results_sorted.narrowPeak" % (prefix,),
-                 "save_result" : True,
-                 "result_type" : "peak"
-                }
-            ]
-        ]
-        previous_jobs = file_pair[0]["all_jobs"]
-        outputs = self.construct_outputs(file_data, markers, all_markers, prefix, sample_data, experiment_sample_ids, control_sample_ids, previous_jobs)
-        return markers, inputs, additional_inputs, outputs
+        return (markers, inputs, additional_inputs)
 
-    def _gem(self, run, file_pair):
+    def _gem(self, run, result):
         """
         :param run: The run to generate jobs for
-        :type run: dictionary
-        :param file_pair: The tuple of input files.
-        :type file_pair: tuple
-
-        Creates inputs, additional_inputs, and outputs for gem peak calling.
-        The file pair should contain the experiment file first, and the
-        control file second.
+        :type run: :py:class:chipathlon.run.Run
+        :param result: The result to generate jobs for.
+        :type result: :py:class:chipathlon.result.Result
         """
         markers = {"tool": "gem"}
-        all_markers = file_pair[0]["all_markers"]
-        all_markers["peak_call"] = markers
-        prefix = "%s_gem" % (file_pair[0]["prefix"],)
+        call_pair = self.call_pairs[result.full_name]
         inputs = {
-            "chrom.sizes": self.run_data.genomes[run["genome"]]["chrom.sizes"],
-            "exp.bed": file_pair[0]["file_name"],
-            "control.bed": file_pair[1]["file_name"],
-            "prefix": prefix
+            "chrom.sizes": run.genome.get_chrom_sizes()["name"],
+            "control.bed": call_pair[0].full_name,
+            "exp.bed": call_pair[1].full_name,
+            "prefix": call_pair[0].prefix
         }
-        chr_fasta = []
         additional_inputs = {
-            "read.dist" : self.module.workflow_jobs['gem_callpeak'].raw_files.keys()[0],
-            "chr_fasta": self.run_data.genomes[run["genome"]]["chr_fasta"]
+            "read.dist": self.module.workflow_jobs["gem_callpeak"].raw_files.keys()[0],
+            "chr_fasta": [fasta["name"] for fasta in run.genome.get_chr_fasta_files()]
         }
-        sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
-        experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
-        control_sample_ids = file_pair[1]["control_sample_ids"]
-        file_data = [
-            [
-                {"file_name": "%s_results_sorted.narrowPeak" % (prefix,),
-                 "save_result" : True,
-                 "result_type" : "peak"}
-            ]
-        ]
-        previous_jobs = file_pair[0]["all_jobs"]
-        outputs = self.construct_outputs(file_data, markers, all_markers, prefix, sample_data, experiment_sample_ids, control_sample_ids, previous_jobs)
-        return markers, inputs, additional_inputs, outputs
+        return (markers, inputs, additional_inputs)
 
-    def _spp(self, run, file_pair):
+    def _spp(self, run, result):
         """
         :param run: The run to generate jobs for
-        :type run: dictionary
-        :param file_pair: The tuple of input files.
-        :type file_pair: tuple
-
-        Creates inputs, additional_inputs, and outputs for spp peak calling.
-        The file pair should contain the experiment file first, and the
-        control file second.
+        :type run: :py:class:chipathlon.run.Run
+        :param result: The result to generate jobs for.
+        :type result: :py:class:chipathlon.result.Result
         """
         markers = {"tool": "spp"}
-        all_markers = file_pair[0]["all_markers"]
-        all_markers["peak_call"] = markers
-        prefix = "%s_spp" % (file_pair[0]["prefix"],)
+        call_pair = self.call_pairs[result.full_name]
         inputs = {
-            "exp.bed": file_pair[0]["file_name"],
-            "control.bed": file_pair[1]["file_name"]
+            "control.bed": call_pair[0].full_name,
+            "exp.bed": call_pair[1].full_name
         }
         additional_inputs = {}
-        sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
-        experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
-        control_sample_ids = file_pair[1]["control_sample_ids"]
-        file_data = [
-            [
-                {"file_name": "%s_exp.tagAlign" % (prefix,)}
-            ],
-            [
-                {"file_name": "%s_control.tagAlign" % (prefix,)}
-            ],
-            [
-                {"file_name": "%s_results.narrowPeak.gz" % (prefix,)},
-                {"file_name": "%s_results.pdf" % (prefix,)},
-                {"file_name": "%s_results.ccscore" % (prefix,)}
-            ],
-            [
-                {"file_name": "%s_results_sorted.narrowPeak" % (prefix,),
-                 "save_result" : True,
-                 "result_type" : "peak"
-                }
-            ]
-        ]
-        previous_jobs = file_pair[0]["all_jobs"]
-        outputs = self.construct_outputs(file_data, markers, all_markers, prefix, sample_data, experiment_sample_ids, control_sample_ids, previous_jobs)
-        return markers, inputs, additional_inputs, outputs
+        return (markers, inputs, additional_inputs)
 
-    def _macs2(self, run, file_pair):
+    def _macs2(self, run, result):
         """
         :param run: The run to generate jobs for
-        :type run: dictionary
-        :param file_pair: The tuple of input files.
-        :type file_pair: tuple
-
-        Creates inputs, additional_inputs, and outputs for macs2 peak calling.
-        The file pair should contain the experiment file first, and the
-        control file second.
+        :type run: :py:class:chipathlon.run.Run
+        :param result: The result to generate jobs for.
+        :type result: :py:class:chipathlon.result.Result
         """
         markers = {"tool": "macs2"}
-        all_markers = file_pair[0]["all_markers"]
-        all_markers["peak_call"] = markers
-        prefix = "%s_macs2" % (file_pair[0]["prefix"],)
+        call_pair = self.call_pairs[result.full_name]
         inputs = {
-            "exp.bed": file_pair[0]["file_name"],
-            "control.bed": file_pair[1]["file_name"],
-            "prefix": prefix
+            "control.bed": call_pair[0].full_name,
+            "exp.bed": call_pair[1].full_name,
+            "prefix": call_pair[0].prefix
        }
         additional_inputs = {}
-        sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
-        experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
-        control_sample_ids = file_pair[1]["control_sample_ids"]
-        file_data = [
-            [
-                {"file_name": "%s_peaks.narrowPeak" % (prefix,)},
-                {"file_name": "%s_peaks.xls" % (prefix,)},
-                {"file_name": "%s_summits.bed" % (prefix,)}
-            ],
-            [
-                {"file_name": "%s_results_sorted.narrowPeak" % (prefix,),
-                 "save_result" : True,
-                 "result_type" : "peak"}
-            ]
-        ]
-        previous_jobs = file_pair[0]["all_jobs"]
-        outputs = self.construct_outputs(file_data, markers, all_markers, prefix, sample_data, experiment_sample_ids, control_sample_ids, previous_jobs)
-        return markers, inputs, additional_inputs, outputs
+        return (markers, inputs, additional_inputs)
 
     def _make_call_pairs(self, run, file_list):
         """
@@ -280,14 +153,90 @@ class PeakCallGenerator(ModuleGenerator):
             control_files += peak_data[experiment_id]["control"]
         return zip(experiment_files, control_files)
 
-    def parse_run(self, run_index):
+    def _make_call_pairs(self, run, result_list):
         """
-        :param run_index: The index of the run in the yaml file.
-        :type run_index: int
-
-        Generate necessary params for a single run.
-        """
-        run = self.run_data.runs[run_index]
-        for file_pair in self._make_call_pairs(run, self.run_data.get_output_files(run_index, "remove_duplicates", "no_dups_chr.bed")):
-            markers, inputs, additional_inputs, outputs = self.generate_calls[run["peak"]](run, file_pair)
-            yield markers, inputs, additional_inputs, outputs
+        :param run: The run currently being processed.
+        :type run: :py:class:chipathlon.run.Run
+        :param result_list: A list of results to pair.
+        :type result_list: list
+
+        Construct a list of result tuples where the first entry is the
+        control result and the second is the signal result.
+        If we have more of one type of result than the other, we randomly
+        re-use results until we have the same number of signals & controls.
+        """
+        control_results = []
+        signal_results = []
+        for result in result_list:
+            if len(result.control_samples) > 0:
+                control_results.append(result)
+            elif len(result.signal_samples) > 0:
+                signal_results.append(result)
+        num_control = len(control_results)
+        num_signal = len(signal_results)
+        if num_control < num_signal:
+            while len(control_results) < num_signal:
+                # Want to randomly select only from the original controls
+                control_results.append(random.choice(control_results[:num_control]))
+        elif num_signal < num_control:
+            while len(signal_results) < num_control:
+                # Want to randomly select only from the original signals
+                signal_results.append(random.choice(signal_results[:num_signal]))
+        return zip(control_results, signal_results)
+
+    def get_markers(self, run):
+        return {"tool": run.peak}
+
+    def create_final_results(self, run):
+        """
+        :param run: The target run to generate jobs for.
+        :type run: :py:class:chipathlon.run.Run
+        """
+        remove_duplicates_results = run.get_results("remove_duplicates", "no_dups_chr.bed")
+        module_markers = {"peak_call": self.get_markers(run)}
+        module_jobs = [self.workflow_jobs[job_name] for job_name in self.module.get_job_names(module_markers["peak_call"])]
+
+        for paired_result in self._make_call_pairs(run, remove_duplicates_results):
+            markers = dict(paired_result[0].all_markers, **module_markers)
+            prev_result_jobs = list(set(paired_result[0].all_jobs).union(paired_result[1].all_jobs))
+            result = Result(
+                "results_sorted.narrowPeak",
+                paired_result[0].control_samples + paired_result[1].control_samples,
+                paired_result[0].signal_samples + paired_result[1].signal_samples,
+                markers,
+                prev_result_jobs + module_jobs,
+                should_save=True
+            )
+            run.add_result("peak_call", result)
+            self.call_pairs[result.full_name] = paired_result
+
+        return run.get_results("peak_call", "results_sorted.narrowPeak")
+
+    def find_prev_results(self, run, result):
+        """
+        :param run: The target run to generate jobs for.
+        :type run: :py:class:chipathlon.run.Run
+        :param result: The target result to create jobs for.
+        :type result: :py:class:chipathlon.result.Result
+        """
+        remove_duplicate_results = run.get_results("remove_duplicates", "no_dups_chr.bed")
+        prev_results = []
+        control_accessions = result.get_accessions("control")
+        signal_accessions = result.get_accessions("signal")
+        for prev_result in remove_duplicate_results:
+            if (set(prev_result.get_accessions("control")).issubset(control_accessions) and
+                    set(prev_result.get_accessions("signal")).issubset(signal_accessions)):
+                prev_results.append(prev_result)
+        return prev_results
+
+    def parse_result(self, run, result):
+        """
+        :param run: The target run to generate jobs for.
+        :type run: :py:class:chipathlon.run.Run
+        :param result: The target result to create jobs for.
+        :type result: :py:class:chipathlon.result.Result
+        """
+        prev_results = self.get_prev_results(run, result)
+        markers, inputs, additional_inputs = self.generate_calls[run.peak](run, result)
+        results = self.create_results(run, result)
+        return markers, inputs, additional_inputs, self.get_outputs(results)
diff --git a/chipathlon/workflow.py b/chipathlon/workflow.py
index a9a9ad2..1c3ce6f 100644
--- a/chipathlon/workflow.py
+++ b/chipathlon/workflow.py
@@ -215,12 +215,14 @@ class Workflow(object):
 
     def _load_generators(self):
         self.align_gen = AlignGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["align"], self.workflow_jobs, self.basepath, debug=self.debug)
         self.remove_dup_gen = RemoveDuplicatesGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["remove_duplicates"], self.workflow_jobs, self.basepath, debug=self.debug)
+        self.peak_call_gen = PeakCallGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["peak_call"], self.workflow_jobs, self.basepath, debug=self.debug)
         return
 
     def _generate_jobs(self):
         for run in self.runs:
             self.align_gen.generate(run)
             self.remove_dup_gen.generate(run)
+            self.peak_call_gen.generate(run)
         return
 
     # When generating actual modules, IF results already exist for that job:
-- 
GitLab
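
For reference, the balancing strategy implemented by the new _make_call_pairs
can be sketched on its own: pad the shorter of the two result lists by
re-sampling (with replacement) from its original members, then zip the lists
into (control, signal) pairs. The sketch below mirrors that logic outside the
chipathlon classes; the names pair_controls_and_signals, controls, and
signals are illustrative only and are not part of the codebase:

    import random

    def pair_controls_and_signals(controls, signals):
        """Pad the shorter list by re-sampling its original members at
        random, then pair each control with a signal."""
        controls, signals = list(controls), list(signals)
        n_control, n_signal = len(controls), len(signals)
        # Slicing to the original length means padded copies are never
        # themselves candidates for re-selection.
        while len(controls) < n_signal:
            controls.append(random.choice(controls[:n_control]))
        while len(signals) < n_control:
            signals.append(random.choice(signals[:n_signal]))
        # Control first, signal second, matching _make_call_pairs.
        return list(zip(controls, signals))

    # Example: two controls, three signals -> one control is re-used.
    print(pair_controls_and_signals(["c1", "c2"], ["s1", "s2", "s3"]))

As in the patch, both sides are assumed non-empty; random.choice on an empty
list would raise an IndexError.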