Commit e1194a80 authored by aknecht2's avatar aknecht2

Adjusted the peak call generator to match the new super class. Added peak calling to workflow generation.

parent a8c9c1af
from chipathlon.module_generator import ModuleGenerator
from chipathlon.result import Result
import random


class PeakCallGenerator(ModuleGenerator):

    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
"""
:param master_files: The dictionary mapping file name -> file object.
:param dax: The workflow graph object
:type dax: Peagasus.DAX3.ADAG
:param master_jobs: The dictionary mapping job name -> pegasus job object.
:type master_jobs: dict
:param master_files: The dictionary mapping file name -> pegasus file object.
:type master_files: dict
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:chipathlon.db.MongoDB
:param workflow_module: The actual module being used.
:type workflow_module: chipathlon.workflow_module.WorkflowModule
:param run_data: Input sample data.
:type run_data: chipathlon.run_data.RunData
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
        super(PeakCallGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
        self.module_name = "peak_call"
        self.generate_calls = {
            "gem": self._gem,
            "spp": self._spp,
@@ -23,216 +31,81 @@ class PeakCallGenerator(ModuleGenerator):
            "jamm": self._jamm,
            "ccat": self._ccat
        }
        self.call_pairs = {}
        return
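
    # A minimal sketch of how the dispatch table above is consumed (see
    # parse_result below); it assumes run.peak holds one of the dict keys:
    #
    #   builder = self.generate_calls[run.peak]   # e.g. "spp" -> self._spp
    #   markers, inputs, additional_inputs = builder(run, result)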

    def _ccat(self, run, result):
        """
        :param run: The run to generate jobs for.
        :type run: :py:class:`chipathlon.run.Run`
        :param result: The result to generate jobs for.
        :type result: :py:class:`chipathlon.result.Result`
        """
markers = {"tool": "ccat"}
all_markers = file_pair[0]["all_markers"]
all_markers["peak_call"] = markers
prefix = "%s_ccat" % (file_pair[0]["prefix"],)
inputs = {
"chrom.sizes": self.run_data.genomes[run["genome"]]["chrom.sizes"],
"exp.bed": file_pair[0]["file_name"],
"control.bed": file_pair[1]["file_name"],
"conf.txt": self.module.workflow_jobs["ccat_callpeak"].raw_files.keys()[0],
"prefix": prefix
}
additional_inputs = {}
sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
control_sample_ids = file_pair[1]["control_sample_ids"]
file_data = [
[
{"file_name": "%s.significant.peak" % (prefix,)},
{"file_name": "%s.significant.region" % (prefix,)},
{"file_name": "%s.top100000.peak" % (prefix,)},
{"file_name": "%s.log" % (prefix,)}
],
[
{"file_name": "%s_results_sorted.narrowPeak" % (prefix,),
"save_result" : True,
"result_type" : "peak"
}
]
]
previous_jobs = file_pair[0]["all_jobs"]
outputs = self.construct_outputs(file_data, markers, all_markers, prefix, sample_data, experiment_sample_ids, control_sample_ids, previous_jobs)
return markers, inputs, additional_inputs, outputs
def _jamm(self, run, file_pair):
"""
:param run: The run to generate jobs for
:type run: dictionary
:param file_pair: The tuple of input files.
:type file_pair: tuple
Creates inputs, additional_inputs, and outputs for jamm peak calling.
Jamm needs to prep inputs since it has a weird input format
"""
markers = {"tool": "jamm"}
all_markers = file_pair[0]["all_markers"]
all_markers["peak_call"] = markers
prefix = "%s_jamm" % (file_pair[0]["prefix"],)
call_pair = self.call_pairs[result.full_name]
inputs = {
"chrom.sizes": self.run_data.genomes[run["genome"]]["chrom.sizes"],
"exp.bed": file_pair[0]["file_name"],
"control.bed": file_pair[1]["file_name"],
"jamm_dir": prefix
"chrom.sizes": run.genome.get_chrom_sizes()["name"],
"control.bed": call_pair[0].full_name,
"exp.bed": call_pair[1].full_name,
"conf.txt": self.module.workflow_jobs["ccat_callpeak"].raw_files.keys()[0]
"prefix": call_pair[0].prefix
}
additional_inputs = {}
sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
control_sample_ids = file_pair[1]["control_sample_ids"]
file_data = [
[
{"file_name": "%s/sample/%s_exp.bed" % (prefix, prefix)},
{"file_name": "%s/control/%s_control.bed" % (prefix, prefix)}
],
[
{"file_name": "%s_results_sorted.narrowPeak" % (prefix,),
"save_result" : True,
"result_type" : "peak"
}
]
]
previous_jobs = file_pair[0]["all_jobs"]
outputs = self.construct_outputs(file_data, markers, all_markers, prefix, sample_data, experiment_sample_ids, control_sample_ids, previous_jobs)
return markers, inputs, additional_inputs, outputs
return (markers, inputs, additional_inputs)
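
    # The remaining _<tool> helpers follow the same assumed contract, sketched
    # here for reference: create_final_results (below) stores each
    # (control, signal) pair under the final result's name, so:
    #
    #   call_pair = self.call_pairs[result.full_name]
    #   call_pair[0]   # control Result (supplies .full_name and .prefix)
    #   call_pair[1]   # signal/experiment Result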

    def _gem(self, run, result):
        """
        :param run: The run to generate jobs for.
        :type run: :py:class:`chipathlon.run.Run`
        :param result: The result to generate jobs for.
        :type result: :py:class:`chipathlon.result.Result`
        """
markers = {"tool": "gem"}
all_markers = file_pair[0]["all_markers"]
all_markers["peak_call"] = markers
prefix = "%s_gem" % (file_pair[0]["prefix"],)
call_pair = self.call_pairs[result.full_name]
inputs = {
"chrom.sizes": self.run_data.genomes[run["genome"]]["chrom.sizes"],
"exp.bed": file_pair[0]["file_name"],
"control.bed": file_pair[1]["file_name"],
"prefix": prefix
"chrom.sizes": run.genome.get_chrom_sizes()["name"],
"control.bed": call_pair[0].full_name,
"exp.bed": call_pair[1].full_name,
"prefix": call_pair[0].prefix
}
chr_fasta = []
additional_inputs = {
"read.dist" : self.module.workflow_jobs['gem_callpeak'].raw_files.keys()[0],
"chr_fasta": self.run_data.genomes[run["genome"]]["chr_fasta"]
"read.dist": self.module.workflow_jobs["gem_callpeak"].raw_files.keys()[0],
"chr_fasta": [fasta["name"] for fasta in run.genome.get_chr_fasta_files()]
}
sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
control_sample_ids = file_pair[1]["control_sample_ids"]
file_data = [
[
{"file_name": "%s_results_sorted.narrowPeak" % (prefix,),
"save_result" : True,
"result_type" : "peak"}
]
]
previous_jobs = file_pair[0]["all_jobs"]
outputs = self.construct_outputs(file_data, markers, all_markers, prefix, sample_data, experiment_sample_ids, control_sample_ids, previous_jobs)
return markers, inputs, additional_inputs, outputs
return (markers, inputs, additional_inputs)

    def _spp(self, run, result):
        """
        :param run: The run to generate jobs for.
        :type run: :py:class:`chipathlon.run.Run`
        :param result: The result to generate jobs for.
        :type result: :py:class:`chipathlon.result.Result`
        """
markers = {"tool": "spp"}
all_markers = file_pair[0]["all_markers"]
all_markers["peak_call"] = markers
prefix = "%s_spp" % (file_pair[0]["prefix"],)
call_pair = self.call_pairs[result.full_name]
inputs = {
"exp.bed": file_pair[0]["file_name"],
"control.bed": file_pair[1]["file_name"]
"control.bed": call_pair[0].full_name,
"exp.bed": call_pair[1].full_name
}
additional_inputs = {}
sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
control_sample_ids = file_pair[1]["control_sample_ids"]
file_data = [
[
{"file_name": "%s_exp.tagAlign" % (prefix,)}
],
[
{"file_name": "%s_control.tagAlign" % (prefix,)}
],
[
{"file_name": "%s_results.narrowPeak.gz" % (prefix,)},
{"file_name": "%s_results.pdf" % (prefix,)},
{"file_name": "%s_results.ccscore" % (prefix,)}
],
[
{"file_name": "%s_results_sorted.narrowPeak" % (prefix,),
"save_result" : True,
"result_type" : "peak"
}
]
]
previous_jobs = file_pair[0]["all_jobs"]
outputs = self.construct_outputs(file_data, markers, all_markers, prefix, sample_data, experiment_sample_ids, control_sample_ids, previous_jobs)
return markers, inputs, additional_inputs, outputs
return (markers, inputs, additional_inputs)

    def _macs2(self, run, result):
        """
        :param run: The run to generate jobs for.
        :type run: :py:class:`chipathlon.run.Run`
        :param result: The result to generate jobs for.
        :type result: :py:class:`chipathlon.result.Result`
        """
markers = {"tool": "macs2"}
all_markers = file_pair[0]["all_markers"]
all_markers["peak_call"] = markers
prefix = "%s_macs2" % (file_pair[0]["prefix"],)
call_pair = self.call_pairs[result.full_name]
inputs = {
"exp.bed": file_pair[0]["file_name"],
"control.bed": file_pair[1]["file_name"],
"prefix": prefix
"control.bed": call_pair[0].full_name,
"exp.bed": call_pair[1].full_name,
"prefix": call_pair[0].prefix
}
additional_inputs = {}
sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
control_sample_ids = file_pair[1]["control_sample_ids"]
file_data = [
[
{"file_name": "%s_peaks.narrowPeak" % (prefix,)},
{"file_name": "%s_peaks.xls" % (prefix,)},
{"file_name": "%s_summits.bed" % (prefix,)}
],
[
{"file_name": "%s_results_sorted.narrowPeak" % (prefix,),
"save_result" : True,
"result_type" : "peak"}
]
]
previous_jobs = file_pair[0]["all_jobs"]
outputs = self.construct_outputs(file_data, markers, all_markers, prefix, sample_data, experiment_sample_ids, control_sample_ids, previous_jobs)
return markers, inputs, additional_inputs, outputs
return (markers, inputs, additional_inputs)

@@ -280,14 +153,90 @@ class PeakCallGenerator(ModuleGenerator):

    def _make_call_pairs(self, run, result_list):
        """
        :param run: The run currently being processed.
        :type run: :py:class:`chipathlon.run.Run`
        :param result_list: A list of results to pair.
        :type result_list: list

        Construct a list of result tuples where the first entry is a control
        result and the second is a signal result. If there are more of one
        type than the other, we randomly reuse results from the smaller side
        until the control and signal lists are the same length.
        """
        control_results = []
        signal_results = []
        for result in result_list:
            if len(result.control_samples) > 0:
                control_results.append(result)
            elif len(result.signal_samples) > 0:
                signal_results.append(result)
        num_control = len(control_results)
        num_signal = len(signal_results)
        if num_control < num_signal:
            while len(control_results) < num_signal:
                # Only select randomly from the original controls.
                control_results.append(random.choice(control_results[:num_control]))
        elif num_signal < num_control:
            while len(signal_results) < num_control:
                # Only select randomly from the original signal results.
                signal_results.append(random.choice(signal_results[:num_signal]))
        return zip(control_results, signal_results)
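
    # A worked example of the pairing above, with hypothetical results c1 and
    # s1..s3: one control and three signals pad the control side by random
    # choice from the original controls, so zip() yields
    #   [(c1, s1), (c1, s2), (c1, s3)]
    # With several original controls, the reused ones are picked at random,
    # so any original control may repeat.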

    def get_markers(self, run):
        return {"tool": run.peak}

    def create_final_results(self, run):
        """
        :param run: The target run to generate jobs for.
        :type run: :py:class:`chipathlon.run.Run`
        """
        remove_duplicates_results = run.get_results("remove_duplicates", "no_dups_chr.bed")
        module_markers = {"peak_call": self.get_markers(run)}
        module_jobs = [self.workflow_jobs[job_name] for job_name in self.module.get_job_names(module_markers["peak_call"])]
        for paired_result in self._make_call_pairs(run, remove_duplicates_results):
            markers = dict(paired_result[0].all_markers, **module_markers)
            prev_result_jobs = list(set(paired_result[0].all_jobs).union(paired_result[1].all_jobs))
            result = Result(
                "results_sorted.narrowPeak",
                paired_result[0].control_samples + paired_result[1].control_samples,
                paired_result[0].signal_samples + paired_result[1].signal_samples,
                markers,
                prev_result_jobs + module_jobs,
                should_save=True
            )
            run.add_result("peak_call", result)
            self.call_pairs[result.full_name] = paired_result
        return run.get_results("peak_call", "results_sorted.narrowPeak")

    def find_prev_results(self, run, result):
        """
        :param run: The target run to generate jobs for.
        :type run: :py:class:`chipathlon.run.Run`
        :param result: The target result to create jobs for.
        :type result: :py:class:`chipathlon.result.Result`
        """
        remove_duplicate_results = run.get_results("remove_duplicates", "no_dups_chr.bed")
        prev_results = []
        control_accessions = result.get_accessions("control")
        signal_accessions = result.get_accessions("signal")
        for prev_result in remove_duplicate_results:
            if (set(prev_result.get_accessions("control")).issubset(control_accessions) and
                    set(prev_result.get_accessions("signal")).issubset(signal_accessions)):
                prev_results.append(prev_result)
        return prev_results

    def parse_result(self, run, result):
        """
        :param run: The target run to generate jobs for.
        :type run: :py:class:`chipathlon.run.Run`
        :param result: The target result to create jobs for.
        :type result: :py:class:`chipathlon.result.Result`
        """
        prev_results = self.get_prev_results(run, result)
        markers, inputs, additional_inputs = self.generate_calls[run.peak](run, result)
        results = self.create_results(run, result)
        return markers, inputs, additional_inputs, self.get_outputs(results)
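
    # Assumed flow for one run, inferred from the methods above (the
    # generate() driver itself lives in the ModuleGenerator super class):
    #
    #   1. create_final_results(run) pairs the remove_duplicates results and
    #      registers one "results_sorted.narrowPeak" Result per pair.
    #   2. parse_result(run, result) is then called per result and returns the
    #      (markers, inputs, additional_inputs, outputs) the module consumes.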

@@ -215,12 +215,14 @@ class Workflow(object):

    def _load_generators(self):
        self.align_gen = AlignGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["align"], self.workflow_jobs, self.basepath, debug=self.debug)
        self.remove_dup_gen = RemoveDuplicatesGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["remove_duplicates"], self.workflow_jobs, self.basepath, debug=self.debug)
        self.peak_call_gen = PeakCallGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["peak_call"], self.workflow_jobs, self.basepath, debug=self.debug)
        return

    def _generate_jobs(self):
        for run in self.runs:
            self.align_gen.generate(run)
            self.remove_dup_gen.generate(run)
            self.peak_call_gen.generate(run)
        return
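
    # Note: the call order above matters. PeakCallGenerator pairs the
    # "no_dups_chr.bed" results registered by the remove_duplicates step, so
    # peak_call_gen.generate(run) is assumed to run only after
    # remove_dup_gen.generate(run) for the same run.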

    # When generating actual modules, IF results already exist for that job: