Commit a8c9c1af authored by aknecht2

Created remove_duplicates generator and added it to workflow job creation.

parent e85e9aab
1 merge request: !11 Resolve "Module generator improvements"
from chipathlon.module_generator import ModuleGenerator
from chipathlon.result import Result
class RemoveDuplicatesGenerator(ModuleGenerator):
    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
        """
        :param dax: The workflow graph object.
        :type dax: Pegasus.DAX3.ADAG
        :param master_jobs: The dictionary mapping job name -> Pegasus job object.
        :type master_jobs: dict
        :param master_files: The dictionary mapping file name -> Pegasus file object.
        :type master_files: dict
        :param mdb: A MongoDB database class for fetching sample metadata.
        :type mdb: :py:class:`chipathlon.db.MongoDB`
        :param workflow_module: The actual module being used.
        :type workflow_module: chipathlon.workflow_module.WorkflowModule
        :param workflow_jobs: The dictionary mapping workflow_job name -> workflow_job instance.
        :type workflow_jobs: dict
        :param base_path: Base location of the workflow, used to save metadata files.
        :type base_path: str
        :param debug: If true, prints out params for each job & module.
        :type debug: bool
        """
        super(RemoveDuplicatesGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
        self.module_name = "remove_duplicates"
        self.result_dict = {}
        return

    def create_final_results(self, run):
        """
        Creates one final no_dups_chr.bed result for each align.bam result
        of the target run, then returns the new results.

        :param run: The target run to generate jobs for.
        :type run: :py:class:`chipathlon.run.Run`
        """
        for align_result in run.get_results("align", "align.bam"):
            result = Result(
                "no_dups_chr.bed",
                align_result.control_samples,
                align_result.signal_samples,
                {"remove_duplicates": {}},
                [self.workflow_jobs[job_name] for job_name in self.module.get_job_names({})],
                should_save=True
            )
            run.add_result("remove_duplicates", result)
        return run.get_results("remove_duplicates", "no_dups_chr.bed")

    def find_prev_results(self, run, result):
        """
        Returns the align.bam results whose control and signal accessions
        are subsets of the target result's accessions.

        :param run: The target run to generate jobs for.
        :type run: :py:class:`chipathlon.run.Run`
        :param result: The target result to create jobs for.
        :type result: :py:class:`chipathlon.result.Result`
        """
        align_results = run.get_results("align", "align.bam")
        prev_results = []
        control_accessions = result.get_accessions("control")
        signal_accessions = result.get_accessions("signal")
        for prev_result in align_results:
            if (set(prev_result.get_accessions("control")).issubset(control_accessions) and
                    set(prev_result.get_accessions("signal")).issubset(signal_accessions)):
                prev_results.append(prev_result)
        return prev_results
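
find_prev_results only accepts a previous align result when both its control and signal accession sets are contained in the target result's accessions. The same rule in isolation, with made-up accession IDs:

# Made-up accession IDs; containment must hold for BOTH sets.
target_control = {"ENCFF001", "ENCFF002"}
target_signal = {"ENCFF003"}
candidates = [
    ({"ENCFF001"}, {"ENCFF003"}),              # subset of both -> match
    ({"ENCFF001", "ENCFF999"}, {"ENCFF003"}),  # extra control accession -> no match
]
matches = [(ctl, sig) for ctl, sig in candidates
           if ctl.issubset(target_control) and sig.issubset(target_signal)]
print(matches)  # [({'ENCFF001'}, {'ENCFF003'})]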

    def parse_result(self, run, result):
        """
        Builds the markers, inputs, additional inputs, and outputs needed
        to generate jobs for the target result.

        :param run: The target run to generate jobs for.
        :type run: :py:class:`chipathlon.run.Run`
        :param result: The target result to create jobs for.
        :type result: :py:class:`chipathlon.result.Result`
        """
        prev_results = self.get_prev_results(run, result)
        markers = {}
        inputs = {"align.bam": prev_results[0].full_name}
        additional_inputs = {}
        results = self.create_results(run, result)
        return markers, inputs, additional_inputs, self.get_outputs(results)
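
The three methods above read like hooks for a template method on ModuleGenerator. The base class is not part of this commit, so the generate() flow sketched below is an assumption for orientation, not the library's actual implementation:

# Hypothetical sketch of the base-class control flow; method names other
# than the three hooks overridden above are assumptions.
class ModuleGeneratorSketch(object):
    def generate(self, run):
        for result in self.create_final_results(run):
            self.find_prev_results(run, result)  # locate upstream align.bam results
            markers, inputs, additional_inputs, outputs = self.parse_result(run, result)
            self._create_jobs(markers, inputs, additional_inputs, outputs)

    # Overridden by subclasses such as RemoveDuplicatesGenerator:
    def create_final_results(self, run):
        raise NotImplementedError

    def find_prev_results(self, run, result):
        raise NotImplementedError

    def parse_result(self, run, result):
        raise NotImplementedError

    def _create_jobs(self, markers, inputs, additional_inputs, outputs):
        pass  # hypothetical: the real generator would add Pegasus DAX jobs here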
@@ -214,11 +214,13 @@ class Workflow(object):
    def _load_generators(self):
        self.align_gen = AlignGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["align"], self.workflow_jobs, self.basepath, debug=self.debug)
        self.remove_dup_gen = RemoveDuplicatesGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["remove_duplicates"], self.workflow_jobs, self.basepath, debug=self.debug)
        return

    def _generate_jobs(self):
        for run in self.runs:
            self.align_gen.generate(run)
            self.remove_dup_gen.generate(run)
        return
# When generating actual modules, IF results already exist for that job:
......
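
The truncated comment above gestures at result reuse. One plausible use of the result_dict initialized in __init__ is as a memo table so existing results are not regenerated; the helper below is a hypothetical sketch of that idea, not code from this commit.

# Hypothetical memoization helper; not part of the chipathlon codebase.
def get_or_create(result_dict, key, build):
    """Reuse the result cached under `key`, else build and cache it."""
    if key not in result_dict:
        result_dict[key] = build()
    return result_dict[key]

cache = {}
first = get_or_create(cache, ("no_dups_chr.bed", ("ENCFF001",)), lambda: object())
second = get_or_create(cache, ("no_dups_chr.bed", ("ENCFF001",)), lambda: object())
assert first is second  # the second request reuses the cached result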