Skip to content
Snippets Groups Projects

Resolve "Module generator improvements"

Merged aknecht2 requested to merge 8-module-generator-improvements into master
12 files
+ 886
605
Compare changes
  • Side-by-side
  • Inline
Files
12
#!/bin/bash
# Wrapper script: loads the peakranger environment module and then runs
# peakranger with whatever arguments this script was given.

# Source the Lmod init profile into the current shell (presumably this is
# what defines the `module` command used on the next line).
. /util/opt/lmod/lmod/init/profile
# Load the peakranger module into this shell's environment.
module load peakranger
# Forward all arguments unchanged; the quoted "$@" keeps each argument intact
# even when it contains whitespace.
peakranger "$@"
+ 154
157
from chipathlon.module_generator import ModuleGenerator
from chipathlon.result import Result
class AlignGenerator(ModuleGenerator):
def __init__(self, master_files, workflow_module, run_data, debug = False):
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
"""
:param master_files: The dictionary mapping file name -> file object.
:param dax: The workflow graph object
:type dax: Pegasus.DAX3.ADAG
:param master_jobs: The dictionary mapping job name -> pegasus job object.
:type master_jobs: dict
:param master_files: The dictionary mapping file name -> pegasus file object.
:type master_files: dict
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:chipathlon.db.MongoDB
:param workflow_module: The actual module being used.
:type workflow_module: chipathlon.workflow_module.WorkflowModule
:param run_data: Input sample data.
:type run_data: chipathlon.run_data.RunData
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
super(AlignGenerator, self).__init__(master_files, workflow_module, run_data, debug)
self.module_name = "align"
super(AlignGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
self.generate_calls = {
"bwa": {
1: self._bwa_single,
@@ -25,174 +33,124 @@ class AlignGenerator(ModuleGenerator):
2: self._bowtie2_paired
}
}
if debug:
print "[LOADING GENERATOR] AlignGenerator"
return
def _load_genome_additional(self, assembly, tool):
def _bwa_init(self, run):
"""
:param assembly: The assembly used for the genome. (i.e. grch38p6)
:type assembly: str
:param tool: The tool used to create the genome. (i.e. bwa)
:type tool: str
:param run: The run to load genome files for
:type run: :py:class:chipathlon.run.Run
Constructs the structure necessary for additional genome files.
Loads required genome files for bwa in to the dictionary structure
expected by the actual workflow module. Specifically, this maps:
ref_genome -> base .fa or .fna file
ref_genome.amb -> .amb file
ref_genome.ann -> .ann file
etc.
"""
genome_additional = {}
genome_files = self.run_data.get_genome_files(assembly, tool)
for ext in genome_files["additional_files"]:
genome_additional["ref_genome.%s" % (ext,)] = genome_files["additional_files"][ext]
return genome_additional
def _bwa_init(self, assembly):
"""
:param assembly: The assembly used for the genome. (i.e. grch38p6)
:type assembly: str
Creates initial inputs & additional_inputs for bwa jobs.
"""
additional_inputs = self._load_genome_additional(assembly, "bwa")
inputs = {}
inputs["ref_genome"] = self.run_data.get_genome_files(assembly, "bwa")["base_file"]
base_file = run.genome.get_base_file()
inputs = {"ref_genome": base_file["name"]}
additional_inputs = {}
for f in run.genome.get_additional_files():
ext = f["name"].split(".")[-1]
additional_inputs["ref_genome.%s" % (ext,)] = f["name"]
return (inputs, additional_inputs)
def _bowtie2_init(self, assembly):
"""
:param assembly: The assembly used for the genome. (i.e. grch38p6)
:type assembly: str
Creates initial inputs & additional_inputs for bowtie2 jobs.
"""
additional_inputs = self._load_genome_additional(assembly, "bowtie2")
additional_inputs["ref_genome"] = self.run_data.get_genome_files(assembly, "bowtie2")["base_file"]
inputs = {}
inputs["ref_genome_prefix"] = "genome_%s_bowtie2" % (assembly,)
def _bowtie2_init(self, run):
"""
:param run: The run to load genome files for
:type run: :py:class:chipathlon.run.Run
Loads required genome files for bowtie2 in to the dictionary structure
expected by the actual workflow module. Specifically, this maps:
ref_genome_prefix -> base file name
ref_genome -> base .fa or .fna file
ref_genome.1.bt2 -> .1.bt2 file
ref_genome.2.bt2 -> .2.bt2 file
e.t.c
"""
base_file = run.genome.get_base_file()
additional_files = run.genome.get_additional_files()
inputs = {"ref_genome_prefix": run.genome.file_prefix}
additional_inputs = {"ref_genome": base_file["name"]}
for f in run.genome.get_additional_files():
ext = f["name"].split(".")[-1]
additional_inputs["ref_genome.%s" % (ext,)] = f["name"]
return (inputs, additional_inputs)
def _bwa_single(self, run, file_pair):
def _bwa_single(self, run, result):
"""
:param run: The run to generate jobs for
:type run: dictionary
:param file_pair: The tuple of input files.
:type file_pair: tuple
:type run: :py:class:chipathlon.run.Run
:param result: The result to generate jobs for.
:type result: :py:class:chipathlon.result.Result
Creates inputs, additional_inputs, and outputs for bwa alignment
jobs with single end reads. Only the first file in the file
pair will be processed.
jobs with single end reads. Only the first file in prev_results will
be processed
"""
markers = {"tool": "bwa", "read_end": "single"}
inputs, additional_inputs = self._bwa_init(run["genome"])
inputs["download_1.fastq"] = file_pair[0]["file_name"]
prefix = "%s_bwa_single_%s" % (file_pair[0]["prefix"], run["genome"])
sample_data = [file_pair[0]["sample"]]
experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
control_sample_ids = file_pair[0]["control_sample_ids"]
file_data = [
[
{"file_name": "%s_align.sai" % (prefix,)}
],
[
{"file_name": "%s_align.sam" % (prefix,)},
{"file_name": "%s_align.quality" % (prefix,)}
],
[
{"file_name": "%s_align.bam" % (prefix,), "save_result": True, "result_type": "bam"}
]
]
outputs = self.construct_outputs(file_data, markers, {"align": markers}, prefix, sample_data, experiment_sample_ids, control_sample_ids, file_pair[0]["jobs"])
return markers, inputs, additional_inputs, outputs
def _bwa_paired(self, run, file_pair):
inputs, additional_inputs = self._bwa_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
return (markers, inputs, additional_inputs)
def _bwa_paired(self, run, result):
"""
:param run: The run to generate jobs for
:type run: dictionary
:param file_pair: The tuple of input files.
:type file_pair: tuple
:type run: :py:class:chipathlon.run.Run
:param result: The result to generate jobs for.
:type result: :py:class:chipathlon.result.Result
Creates inputs, additional_inputs, and outputs for bwa alignment
jobs with paired end reads.
"""
markers = {"tool": "bwa", "read_end": "paired"}
inputs, additional_inputs = self._bwa_init(run["genome"])
inputs["download_1.fastq"] = file_pair[0]["file_name"]
inputs["download_2.fastq"] = file_pair[1]["file_name"]
prefix = "%s_%s_bwa_paired_%s" % (file_pair[0]["prefix"], file_pair[1]["accession"], run["genome"])
sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
experiment_sample_ids = file_pair[0]["experiment_sample_ids"] + file_pair[1]["experiment_sample_ids"]
control_sample_ids = file_pair[0]["control_sample_ids"] + file_pair[1]["control_sample_ids"]
file_data = [
[
{"file_name": "%s_align.sam" % (prefix,)},
{"file_name": "%s_align.quality" % (prefix,)}
],
[
{"file_name": "%s_align.bam" % (prefix,), "save_result": True, "result_type": "bam"}
]
]
outputs = self.construct_outputs(file_data, markers, {"align": markers}, prefix, sample_data, experiment_sample_ids, control_sample_ids, file_pair[0]["jobs"])
return markers, inputs, additional_inputs, outputs
def _bowtie2_single(self, run, file_pair):
inputs, additional_inputs = self._bwa_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
inputs["download_2.fastq"] = prev_results[1].full_name
return (markers, inputs, additional_inputs)
def _bowtie2_single(self, run, result):
"""
:param run: The run to generate jobs for
:type run: dictionary
:param file_pair: The tuple of input files.
:type file_pair: tuple
:type run: :py:class:chipathlon.run.Run
:param result: The result to generate jobs for.
:type result: :py:class:chipathlon.result.Result
Creates inputs, additional_inputs, and outputs for bowtie2 alignment
jobs with single end reads. Only the first file in the file
pair will be processed.
jobs with single end reads. Only the first file in prev_results will
be processed.
"""
markers = {"tool": "bowtie2", "read_end": "single"}
inputs, additional_inputs = self._bowtie2_init(run["genome"])
inputs["download_1.fastq"] = file_pair[0]["file_name"]
prefix = "%s_bowtie2_single_%s" % (file_pair[0]["prefix"], run["genome"])
sample_data = [file_pair[0]["sample"]]
experiment_sample_ids = file_pair[0]["experiment_sample_ids"]
control_sample_ids = file_pair[0]["control_sample_ids"]
file_data = [
[
{"file_name": "%s_align.sam" % (prefix,)},
{"file_name": "%s_align.quality" % (prefix,)}
],
[
{"file_name": "%s_align.bam" % (prefix,), "save_result": True, "result_type": "bam"}
]
]
outputs = self.construct_outputs(file_data, markers, {"align": markers}, prefix, sample_data, experiment_sample_ids, control_sample_ids, file_pair[0]["jobs"])
return markers, inputs, additional_inputs, outputs
def _bowtie2_paired(self, run, file_pair):
inputs, additional_inputs = self._bowtie2_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
return (markers, inputs, additional_inputs)
def _bowtie2_paired(self, run, result):
"""
:param run: The run to generate jobs for
:type run: dictionary
:param file_pair: The tuple of input files.
:type file_pair: tuple
:type run: :py:class:chipathlon.run.Run
:param result: The result to generate jobs for.
:type result: :py:class:chipathlon.result.Result
Creates inputs, additional_inputs, and outputs for bowtie2 alignment
jobs with paired end reads.
"""
markers = {"tool": "bowtie2", "read_end": "paired"}
inputs, additional_inputs = self._bowtie2_init(run["genome"])
inputs["download_1.fastq"] = file_pair[0]["file_name"]
inputs["download_2.fastq"] = file_pair[1]["file_name"]
prefix = "%s_%s_bowtie2_paired_%s" % (file_pair[0]["prefix"], file_pair[1]["sample"]["accession"], run["genome"])
sample_data = [file_pair[0]["sample"], file_pair[1]["sample"]]
experiment_sample_ids = file_pair[0]["experiment_sample_ids"] + file_pair[1]["experiment_sample_ids"]
control_sample_ids = file_pair[0]["control_sample_ids"] + file_pair[1]["control_sample_ids"]
file_data = [
[
{"file_name": "%s_align.sam" % (prefix,)},
{"file_name": "%s_align.quality" % (prefix,)}
],
[
{"file_name": "%s_align.bam" % (prefix,), "save_result": True, "result_type": "bam"}
]
]
outputs = self.construct_outputs(file_data, markers, {"align": markers}, prefix, sample_data, experiment_sample_ids, control_sample_ids, file_pair[0]["jobs"])
return markers, inputs, additional_inputs, outputs
def _find_pairs(self, file_list):
"""
:param file_list: A list of fastq files and associated metadata.
inputs, additional_inputs = self._bowtie2_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
inputs["download_2.fastq"] = prev_results[1].full_name
return (markers, inputs, additional_inputs)
def _find_pairs(self, sample_list):
"""
:param sample_list: A list of samples
:type sample_list: list
Takes the list of downloaded fastq files, and associates paired end
@@ -201,28 +159,67 @@ class AlignGenerator(ModuleGenerator):
"""
align_data = []
find = {}
for fastq in file_list:
encode_meta = fastq["sample"]
if "paired_with" in encode_meta:
if encode_meta["@id"] in find:
align_data.append(
(fastq, find[encode_meta["@id"]]) if encode_meta["paired_end"] == 1
else (find[encode_meta["@id"]], fastq)
)
for sample in sample_list:
if "paired_with" in sample:
if sample["@id"] in find:
align_data.append([
sample if sample["paired_end"] == 1 else find[sample["@id"]],
find[sample["@id"]] if sample["paired_end"] == 1 else sample
])
else:
find[encode_meta["paired_with"]] = fastq
find[sample["paired_with"]] = sample
else:
align_data.append((fastq,))
align_data.append([sample])
return align_data
def parse_run(self, run_index):
def get_markers(self, run, num_samples):
    """
    :param run: The run whose genome tool is recorded in the markers.
    :type run: :py:class:chipathlon.run.Run
    :param num_samples: The number of samples being aligned together.
    :type num_samples: int
    :returns: A dictionary with the keys "tool" and "read_end".

    Builds the alignment markers.  Exactly two samples are marked as
    "paired"; any other count is marked as "single".
    """
    if num_samples == 2:
        read_end = "paired"
    else:
        read_end = "single"
    return {"tool": run.genome.tool, "read_end": read_end}
def create_final_results(self, run):
"""
:param run_index: The index of the run in the yaml file.
:type run_index: int
:param run: The target run to generate jobs for.
:type run: :py:class:chipathlon.run.Run
"""
control_samples = run.get_samples("control")
signal_samples = run.get_samples("signal")
for paired_sample in self._find_pairs(control_samples):
markers = self.get_markers(run, len(paired_sample))
result = Result("align.bam", paired_sample, [], {"align": markers}, [self.workflow_jobs[job_name] for job_name in self.module.get_job_names(markers)], should_save=True)
run.add_result("align", result)
for paired_sample in self._find_pairs(signal_samples):
markers = self.get_markers(run, len(paired_sample))
result = Result("align.bam", [], paired_sample, {"align": markers}, [self.workflow_jobs[job_name] for job_name in self.module.get_job_names(markers)], should_save=True)
run.add_result("align", result)
Generate necessary params for a single run.
return run.get_results("align", "align.bam")
def find_prev_results(self, run, result):
    """
    :param run: The target run to generate jobs for.
    :type run: :py:class:chipathlon.run.Run
    :param result: The target result to create jobs for.
    :type result: :py:class:chipathlon.result.Result
    :returns: The matching download results, in the order the run lists them.

    Selects the "encode.fastq.gz" results of the download module whose
    control accessions and signal accessions are both subsets of the
    corresponding accessions of the target result.
    """
    candidates = run.get_results("download", "encode.fastq.gz")
    wanted_control = result.get_accessions("control")
    wanted_signal = result.get_accessions("signal")

    def _is_covered(candidate):
        # Control accessions are checked first; signal is only consulted
        # when the control check passes.
        if not set(candidate.get_accessions("control")).issubset(wanted_control):
            return False
        return set(candidate.get_accessions("signal")).issubset(wanted_signal)

    return [candidate for candidate in candidates if _is_covered(candidate)]
def parse_result(self, run, result):
"""
:param run: The target run to generate jobs for.
:type run: :py:class:chipathlon.run.Run
:param result: The target result to create jobs for.
:type result: :py:class:chipathlon.result.Result
"""
run = self.run_data.runs[run_index]
for file_pair in self._find_pairs(self.run_data.get_output_files(run_index, "download", "fastq")):
markers, inputs, additional_inputs, outputs = self.generate_calls[run["align"]][len(file_pair)](run, file_pair)
yield markers, inputs, additional_inputs, outputs
prev_results = self.get_prev_results(run, result)
markers, inputs, additional_inputs = self.generate_calls[run.genome.tool][len(prev_results)](run, result)
results = self.create_results(run, result)
return markers, inputs, additional_inputs, self.get_outputs(results)
+ 7
3
@@ -52,7 +52,7 @@ class Genome(object):
return self.files.get("base_file")
def get_chrom_sizes(self):
return self.files.get("chrom_sizes")
return self.files.get("chrom.sizes")
def get_additional_files(self):
    """
    Looks up the "additional_files" entry of self.files.

    :returns: The stored entry, or None when nothing is stored under
        that key.
    """
    additional = self.files.get("additional_files")
    return additional
@@ -60,15 +60,18 @@ class Genome(object):
def get_chr_fasta_files(self):
    """
    Looks up the "chr_fasta" entry of self.files.

    :returns: The stored entry, or None when nothing is stored under
        that key.
    """
    chr_fasta = self.files.get("chr_fasta")
    return chr_fasta
def get_all_files(self):
    """
    Collects every file tracked for this genome into a single flat list.

    :returns: A new list holding the base file, the chrom sizes file, then
        the additional files, then the per-chromosome fasta files.
    :rtype: list

    The getters return None for a category that was never added.  Such a
    category contributes nothing to the list instead of raising a
    TypeError during concatenation.
    """
    all_files = [self.get_base_file(), self.get_chrom_sizes()]
    all_files.extend(self.get_additional_files() or [])
    all_files.extend(self.get_chr_fasta_files() or [])
    return all_files
def _load_prefixes(self):
self.file_prefix = "genome_%s_%s" % (self.assembly, self.tool)
self.add_file_prefix = self.file_prefix if self.tool == "bowtie2" else self.file_prefix + os.path.splitext(self.base_file)[1]
bowtie2_path_prefix, base_ext = os.path.splitext(self.base_file)
self.base_ext = base_ext
self.base_ext = base_ext[1:]
self.path_prefix = bowtie2_path_prefix if self.tool == "bowtie2" else self.base_file
return
def _add_file(self, name, path, gen_file_type="base_file"):
def _add_file(self, name, path, gen_file_type):
"""
Adds a file to the internal files list. Files are indexed
by type. There should only be a single base_file / chrom.sizes
@@ -123,4 +126,5 @@ class Genome(object):
for f in files:
if f.startswith("chr"):
self._add_file("%s_%s" % (self.add_file_prefix, f), root + "/" + f, "chr_fasta")
break
return
Loading