# idr_generator.py
from module_generator import ModuleGenerator
from chipathlon.result import Result
import collections

class IdrGenerator(ModuleGenerator):
    """
    Module generator for the "idr" module: pairs up peak-call results from
    the replicates listed in ``run.idr`` and creates one ``sorted_idr.bed``
    result per peak-call output file.

    :param dax: The workflow graph object
    :type dax: Pegasus.DAX3.ADAG
    :param master_jobs: The dictionary mapping job name -> pegasus job object.
    :type master_jobs: dict
    :param master_files: The dictionary mapping file name -> pegasus file object.
    :type master_files: dict
    :param mdb: A MongoDB database class for fetching sample metadata.
    :type mdb: :py:class:`~chipathlon.db.MongoDB`
    :param workflow_module: The actual module being used.
    :type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
    :param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
    :type workflow_jobs: dict
    :param base_path: Base location of the workflow, used to save metadata files.
    :type base_path: str
    :param save_db: Whether or not we want to save results to the database.
        True by default.
    :type save_db: bool
    :param rewrite: Whether or not to rewrite existing files.  If true, it will
        ignore files in Mongo and recreate them.  If false, it will download
        files based on the latest available completed job.
    :type rewrite: bool
    :param debug: If true, prints out params for each job & module.
    :type debug: bool
    """

    # MUSiC writes one "sorted_scale_<scale>_all.bed" file per scale; the set
    # of scales depends on the peak type.
    _MUSIC_SCALES = {
        "narrow": [129, 194, 291],
        "punctate": [129, 194, 291, 437, 656, 985, 1477, 2216],
        "broad": [1459, 2189, 3284, 4926, 7389, 11084, 16626],
    }

    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
                 workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
        super(IdrGenerator, self).__init__(
            dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
            base_path, save_db=save_db, rewrite=rewrite, debug=debug
        )
        self.module_name = "idr"
        self.result_dict = {}

        # peak caller -> peak type -> list of peak_call output file names that
        # are fed into IDR (one IDR result is produced per file name).
        self.output_files = collections.defaultdict(dict)
        self.output_files["peakranger"]["narrow"] = ["region_sorted.bed", "summit_sorted.bed"]
        self.output_files["ccat"]["broad"] = ["region_sorted.bed", "peak_sorted.bed"]
        self.output_files["gem"]["narrow"] = ["results_GEM_sorted.bed", "results_GPS_sorted.bed"]
        self.output_files["spp"]["narrow"] = self.output_files["spp"]["broad"] = ["results_sorted.bed"]
        self.output_files["macs2"]["narrow"] = self.output_files["macs2"]["broad"] = ["results_sorted.bed"]
        for peak_type, scales in self._MUSIC_SCALES.items():
            self.output_files["music"][peak_type] = [
                "sorted_scale_%s_all.bed" % (scale,) for scale in scales
            ]
        self.output_files["zerone"]["broad"] = ["results_sorted.bed"]
        self.output_files["hiddendomains"]["broad"] = ["results_sorted.bed"]
        self.output_files["pepr"]["broad"] = ["results_sorted.bed"]
        self.output_files["pepr"]["sharp"] = ["results_sorted.bed"]

        if debug:
            print("[LOADING GENERATOR] IdrGenerator")

    @staticmethod
    def _file_prefix(file_name):
        """
        Build the prefix added to an IDR result from its input file name:
        an underscore followed by every "_"-separated part of ``file_name``
        except the last one (e.g. "results_GEM_sorted.bed" -> "_results_GEM").
        """
        return "_" + "_".join(file_name.split("_")[:-1])

    @staticmethod
    def _merge_unique(first_jobs, second_jobs):
        """
        Return the jobs of ``first_jobs`` followed by those of ``second_jobs``
        with duplicates removed, keeping first-seen order so the resulting
        job list is deterministic (a plain set union is not).
        """
        seen = set()
        merged = []
        for job in list(first_jobs) + list(second_jobs):
            if job not in seen:
                seen.add(job)
                merged.append(job)
        return merged

    @staticmethod
    def _require_pair(results, description):
        """
        Return the first two entries of ``results`` as a tuple.

        :raises ValueError: If ``results`` has fewer than two entries; IDR
            needs one result per replicate, so this indicates the run's IDR
            accessions did not match the available peak-call results.
        """
        if len(results) < 2:
            raise ValueError(
                "IDR requires two replicate results for %s, found %d."
                % (description, len(results))
            )
        return results[0], results[1]

    def _make_idr_pairs(self, run, result_list):
        """
        For each accession in ``run.idr``, pick the first result in
        ``result_list`` whose signal accessions contain it.

        :param run: The target run.
        :type run: :py:class:`~chipathlon.run.Run`
        :param result_list: Candidate peak-call results.
        :type result_list: list
        :returns: The matching results, in ``run.idr`` order.  Accessions with
            no matching result are skipped, so the list may be shorter than
            ``run.idr``.
        :rtype: list
        """
        potential_files = []
        for idr_accession in run.idr:
            for result in result_list:
                if idr_accession in result.get_accessions("signal"):
                    potential_files.append(result)
                    break
        return potential_files

    def create_final_results(self, run):
        """
        Create one ``sorted_idr.bed`` result per peak-call output file of
        ``run`` and register each on the run under the "idr" module.

        :param run: The target run to generate jobs for.
        :type run: :py:class:`~chipathlon.run.Run`
        :returns: ``run.get_results("idr", "sorted_idr.bed")`` after the new
            results have been added.
        :raises ValueError: If fewer than two peak-call results match the
            run's IDR accessions for some output file.
        """
        module_jobs = [self.workflow_jobs[job_name] for job_name in self.module.get_job_names({})]

        for result_dict in self._find_prev_results(run):
            file_name = result_dict["file_name"]
            first, second = self._require_pair(
                self._make_idr_pairs(run, result_dict["results"]),
                "peak-call file %r" % (file_name,)
            )
            # Start from the first replicate's markers and add an (empty)
            # entry for this module; an existing "idr" key in all_markers wins.
            markers = {"idr": {}}
            markers.update(first.all_markers)
            prev_result_jobs = self._merge_unique(first.all_jobs, second.all_jobs)
            result = Result(
                "sorted_idr.bed",
                first.control_samples + second.control_samples,
                first.signal_samples + second.signal_samples,
                markers,
                prev_result_jobs + module_jobs,
                should_save=True,
                last_result=True
            )
            prefix = self._file_prefix(file_name)
            result.add_to_prefix(prefix)
            # Remember which peak-call file this result came from so that
            # find_prev_results can locate the same input pair later.
            result.add_meta("prev_result_name", file_name)
            result.add_meta("add_prefix", prefix)
            run.add_result("idr", result)

        return run.get_results("idr", "sorted_idr.bed")

    def _find_prev_results(self, run):
        """
        Collect the peak-call results that can be fed into IDR for ``run``.

        :param run: The target run.
        :type run: :py:class:`~chipathlon.run.Run`
        :returns: One dict per expected peak-call output file, with keys
            ``"results"`` (``run.get_results("peak_call", file_name)``) and
            ``"file_name"``.
        :rtype: list
        :raises KeyError: If no IDR inputs are defined for the run's peak
            caller / peak type combination.
        """
        # .get() avoids inserting empty entries into the defaultdict for
        # unknown peak callers.
        file_names = self.output_files.get(run.peak, {}).get(run.peak_type)
        if file_names is None:
            raise KeyError(
                "IdrGenerator has no inputs for peak caller %r with peak type %r"
                % (run.peak, run.peak_type)
            )
        return [
            {"results": run.get_results("peak_call", file_name), "file_name": file_name}
            for file_name in file_names
        ]

    def find_prev_results(self, run, result):
        """
        Find the pair of peak-call results an IDR ``result`` was built from,
        using the ``prev_result_name`` meta stored by create_final_results.

        :param run: The target run to generate jobs for.
        :type run: :py:class:`~chipathlon.run.Run`
        :param result: The target result to create jobs for.
        :type result: :py:class:`~chipathlon.result.Result`
        :returns: The matching results from ``_make_idr_pairs``, or an empty
            list if no peak-call file matches ``prev_result_name``.
        :rtype: list
        """
        prev_result_name = result.get_meta("prev_result_name")
        for result_dict in self._find_prev_results(run):
            if result_dict["file_name"] == prev_result_name:
                return self._make_idr_pairs(run, result_dict["results"])
        return []

    def parse_result(self, run, result):
        """
        Build the markers, inputs, and outputs for the IDR job of ``result``.

        :param run: The target run to generate jobs for.
        :type run: :py:class:`~chipathlon.run.Run`
        :param result: The target result to create jobs for.
        :type result: :py:class:`~chipathlon.result.Result`
        :returns: ``(markers, inputs, outputs)`` where ``inputs`` maps the two
            sorted sample bed inputs to the replicate results' full names.
        :raises ValueError: If fewer than two previous results are found.
        """
        first, second = self._require_pair(
            self.get_prev_results(run, result),
            "result %r" % (result.get_meta("prev_result_name"),)
        )
        markers = {}
        inputs = {
            "sorted_sample_1.bed": first.full_name,
            "sorted_sample_2.bed": second.full_name
        }
        results = self.create_results(run, result)
        return markers, inputs, self.get_outputs(results)