# idr_generator.py
from module_generator import ModuleGenerator
from chipathlon.result import Result

class IdrGenerator(ModuleGenerator):
    """Workflow generator for the IDR (Irreproducible Discovery Rate) module.

    Pairs up previous peak-call results for the replicate accessions listed in
    a run's ``idr`` attribute and registers the final ``sorted_idr.bed``
    results on the run.
    """

    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
        """
        :param dax: The workflow graph object
        :type dax: Pegasus.DAX3.ADAG
        :param master_jobs: The dictionary mapping job name -> pegasus job object.
        :type master_jobs: dict
        :param master_files: The dictionary mapping file name -> pegasus file object.
        :type master_files: dict
        :param mdb: A MongoDB database class for fetching sample meta data.
        :type mdb: :py:class:chipathlon.db.MongoDB
        :param workflow_module: The actual module being used.
        :type workflow_module: chipathlon.workflow_module.WorkflowModule
        :param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
        :type workflow_jobs: dict
        :param base_path: Base location of the workflow, used to save metadata files.
        :type base_path: str
        :param debug: If true, prints out params for each job & module.
        :type debug: bool
        """
        super(IdrGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
        self.module_name = "idr"
        self.result_dict = {}
        # Maps peak-calling tool name -> the bed output files that tool
        # produces which are eligible as IDR inputs.  Keyed by run.peak.
        self.output_files = {
            "peakranger": ["region_sorted.bed", "summit_sorted.bed"],
            "ccat": ["region_sorted.bed", "peak_sorted.bed"],
            "gem": ["results_GEM_sorted.bed", "results_GPS_sorted.bed"],
            "spp": ["results_sorted.bed"],
            "zerone": ["results_sorted_final.bed"],
            "hiddendomains": ["results_final.bed"],
            "macs2": ["results_sorted.bed"],
            "pepr": ["pepr_result.bed"]
        }
        if debug:
            # Single-argument print(...) behaves identically under Python 2
            # (where the parentheses are plain grouping) and Python 3.
            print("[LOADING GENERATOR] IdrGenerator")

    def _make_idr_pairs(self, run, result_list):
        """Select from result_list the results whose "signal" accessions
        contain one of the run's idr accessions.

        One result is taken per idr accession (first match wins, input order
        preserved), so for a standard two-replicate IDR run this returns the
        pair of results to compare.

        :param run: The run whose ``idr`` accession list drives the pairing.
        :param result_list: Candidate results from the peak_call step.
        :rtype: list
        """
        paired = []
        for idr_accession in run.idr:
            for result in result_list:
                if idr_accession in result.get_accessions("signal"):
                    paired.append(result)
                    break
        return paired

    def create_final_results(self, run):
        """Create and register the final ``sorted_idr.bed`` results for a run.

        For each output file of the run's peak caller, the matching pair of
        previous results is combined into a single new Result carrying both
        results' samples, markers, and job lists.

        :param run: The target run to generate jobs for.
        :type run: :py:class:`~chipathlon.run.Run`
        """
        module_jobs = [self.workflow_jobs[job_name] for job_name in self.module.get_job_names({})]

        for result_dict in self._find_prev_results(run):
            idr_pair = self._make_idr_pairs(run, result_dict["results"])
            # Markers from the first result of the pair plus an empty "idr"
            # entry for this module.
            markers = dict({"idr": {}}, **idr_pair[0].all_markers)
            prev_result_jobs = list(set(idr_pair[0].all_jobs).union(idr_pair[1].all_jobs))
            result = Result(
                "sorted_idr.bed",
                idr_pair[0].control_samples + idr_pair[1].control_samples,
                idr_pair[0].signal_samples + idr_pair[1].signal_samples,
                markers,
                prev_result_jobs + module_jobs,
                should_save=True,
                last_result=True
            )
            # Prefix the new result with the source file name minus its
            # trailing extension-ish token, and remember the source so
            # find_prev_results() can locate the pair again later.
            prefix = "_" + "_".join(result_dict["file_name"].split("_")[:-1])
            result.add_to_prefix(prefix)
            result.add_meta("prev_result_name", result_dict["file_name"])
            result.add_meta("add_prefix", prefix)
            run.add_result("idr", result)

        return run.get_results("idr", "sorted_idr.bed")

    def _find_prev_results(self, run):
        """Return [{"results": [...], "file_name": str}, ...] for each output
        file produced by the run's peak caller (run.peak)."""
        return [{
            "results": run.get_results("peak_call", output_file),
            "file_name": output_file
        } for output_file in self.output_files[run.peak]]

    def find_prev_results(self, run, result):
        """Find the pair of peak_call results that a given idr result was
        built from, using the "prev_result_name" metadata stored at creation.

        :param run: The target run to generate jobs for.
        :type run: :py:class:chipathlon.run.Run
        :param result: The target result to create jobs for.
        :type result: :py:class:chipathlon.result.Result
        :rtype: list
        """
        prev_result_name = result.get_meta("prev_result_name")
        for result_dict in self._find_prev_results(run):
            if result_dict["file_name"] == prev_result_name:
                return self._make_idr_pairs(run, result_dict["results"])
        return []

    def parse_result(self, run, result):
        """Build the (markers, inputs, additional_inputs, outputs) tuple used
        to construct the idr jobs for a result.

        :param run: The target run to generate jobs for.
        :type run: :py:class:chipathlon.run.Run
        :param result: The target result to create jobs for.
        :type result: :py:class:chipathlon.result.Result
        """
        # NOTE(review): get_prev_results is not defined in this class (only
        # find_prev_results / _find_prev_results are) — presumably it is a
        # base-class wrapper on ModuleGenerator; confirm, otherwise this
        # raises AttributeError at runtime.
        result_pair = self.get_prev_results(run, result)
        markers = {}
        inputs = {
            "sorted_sample_1.bed": result_pair[0].full_name,
            "sorted_sample_2.bed": result_pair[1].full_name
        }
        additional_inputs = {}
        results = self.create_results(run, result)
        return markers, inputs, additional_inputs, self.get_outputs(results)