Commit 9a4900f1 authored by aknecht2

Merge branch '17-idr' into 'master'

Resolve "IDR"

Closes #17 & #23

See merge request !21
parents 8dbbdf44 399afe1f
import string
# Module directory
job_modules = "jobs/modules/"
......@@ -61,7 +63,7 @@ file_extensions = {
"ccat_conf": ["txt", "conf"],
"log": ["log"]
}
file_extensions["any"] = [extension for ext_list in file_extensions.values() for extension in ext_list]
file_extensions["any"] = list(string.ascii_lowercase)
# list of resources that can be specified per job (step) in
# the workflow and corresponding Pegasus profile info
......
......@@ -39,20 +39,22 @@ class MongoDB(object):
self.gfs = gridfs.GridFS(self.db)
return
def delete_result(self, result_id):
def delete_result(self, result, genome):
"""
:param result_id: ID of result to delete
:type result_id: ObjectId
:param result: The result to delete.
:type result: :py:class:`~chipathlon.result.Result`
:param genome: The genome to find information from.
:type genome: :py:class:`~chipathlon.genome.Genome`
Deletes a result and its corresponding gridfs entry.
"""
result_id = self.get_result_id(result, genome)
cursor = self.db.results.find({
"_id": result_id
})
if cursor.count() == 1:
result = cursor.next()
self.gfs.delete(result["gridfs_id"])
self.db[result["result_type"]].delete_many({"result_id": result["_id"]})
self.db.results.delete_one({"_id": result["_id"]})
else:
print "result_id %s doesn't exist." % (result_id,)
......@@ -98,6 +100,22 @@ class MongoDB(object):
print "Error checking result [%s]: %s" % (file_name, e)
return False
def get_result_id(self, result, genome):
"""
:param result: The result to check.
:type result: :py:class:`~chipathlon.result.Result`
:param genome: The genome to find information from.
:type genome: :py:class:`~chipathlon.genome.Genome`
Get the id of a result.
"""
try:
cursor = self.db.results.find(self._get_result_query(result, genome))
if cursor.count() == 1:
return cursor.next()["_id"]
except pymongo.errors.OperationFailure as e:
print "Error getting result id [%s]: %s" % (file_name, e)
return None
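A minimal usage sketch (illustrative only, assuming an already-connected MongoDB wrapper instance named mdb and a populated result/genome pair):
# Hypothetical usage: resolve a stored result's id, then delete it
# along with its gridfs payload -- delete_result now performs the
# same get_result_id lookup internally.
result_id = mdb.get_result_id(result, genome)
if result_id is not None:
    mdb.delete_result(result, genome)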
def get_result(self, result, genome):
"""
......
from chipathlon.module_generator import ModuleGenerator
from module_generator import ModuleGenerator
from chipathlon.result import Result
class AlignGenerator(ModuleGenerator):
......@@ -185,12 +185,28 @@ class AlignGenerator(ModuleGenerator):
for paired_sample in self._find_pairs(control_samples):
markers = self.get_markers(run, len(paired_sample))
result = Result("align.bam", paired_sample, [], {"align": markers}, [self.workflow_jobs[job_name] for job_name in self.module.get_job_names(markers)], should_save=True)
result = Result(
"align.bam",
paired_sample,
[],
{"align": markers},
[self.workflow_jobs[job_name] for job_name in self.module.get_job_names(markers)],
should_save=True,
last_result=True
)
run.add_result("align", result)
for paired_sample in self._find_pairs(signal_samples):
markers = self.get_markers(run, len(paired_sample))
result = Result("align.bam", [], paired_sample, {"align": markers}, [self.workflow_jobs[job_name] for job_name in self.module.get_job_names(markers)], should_save=True)
result = Result(
"align.bam",
[],
paired_sample,
{"align": markers},
[self.workflow_jobs[job_name] for job_name in self.module.get_job_names(markers)],
should_save=True,
last_result=True
)
run.add_result("align", result)
return run.get_results("align", "align.bam")
......
from chipathlon.module_generator import ModuleGenerator
from module_generator import ModuleGenerator
class DownloadGenerator(ModuleGenerator):
......
from module_generator import ModuleGenerator
from chipathlon.result import Result
class IdrGenerator(ModuleGenerator):
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
"""
:param dax: The workflow graph object.
:type dax: Pegasus.DAX3.ADAG
:param master_jobs: The dictionary mapping job name -> pegasus job object.
:type master_jobs: dict
:param master_files: The dictionary mapping file name -> pegasus file object.
:type master_files: dict
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param workflow_module: The actual module being used.
:type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
super(IdrGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
self.module_name = "idr"
self.result_dict = {}
self.output_files = {
"peakranger": ["region_sorted.bed", "summit_sorted.bed"],
"ccat": ["region_sorted.bed", "peak_sorted.bed"],
"gem": ["results_GEM_sorted.bed", "results_GPS_sorted.bed"],
"spp": ["results_sorted.bed"],
"macs2": ["results_sorted.bed"]
}
if debug:
print "[LOADING GENERATOR] IdrGenerator"
return
def _make_idr_pairs(self, run, result_list):
"""
:param run: The target run to read IDR accessions from.
:type run: :py:class:`~chipathlon.run.Run`
:param result_list: Previous results to pair up by signal accession.
:type result_list: list
For each accession in run.idr, selects the first result whose
signal accessions contain it.
"""
potential_files = []
for idr_accession in run.idr:
for result in result_list:
if idr_accession in result.get_accessions("signal"):
potential_files.append(result)
break
return potential_files
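The pairing above keeps, for each accession listed in run.idr, the first previous result whose signal accessions include it. A self-contained sketch of the same selection logic, using stand-in objects (FakeResult and the accession values are invented for illustration):
# Stand-in result objects exposing only the accession lookup used
# by _make_idr_pairs.
class FakeResult(object):
    def __init__(self, accessions):
        self.accessions = accessions
    def get_accessions(self, sample_type):
        return self.accessions

results = [FakeResult(["ENCFF001AAA"]), FakeResult(["ENCFF002BBB"])]
idr_accessions = ["ENCFF002BBB", "ENCFF001AAA"]
pairs = []
for accession in idr_accessions:
    for result in results:
        if accession in result.get_accessions("signal"):
            pairs.append(result)
            break
# pairs now lists the ENCFF002BBB result first, matching run.idr order.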
def create_final_results(self, run):
"""
:param run: The target run to generate jobs for.
:type run: :py:class:`~chipathlon.run.Run`
"""
module_jobs = [self.workflow_jobs[job_name] for job_name in self.module.get_job_names({})]
peak_results = self._find_prev_results(run)
for result_dict in peak_results:
idr_pair = self._make_idr_pairs(run, result_dict["results"])
markers = dict({"idr": {}}, **idr_pair[0].all_markers)
prev_result_jobs = list(set(idr_pair[0].all_jobs).union(idr_pair[1].all_jobs))
result = Result(
"sorted_idr.bed",
idr_pair[0].control_samples + idr_pair[1].control_samples,
idr_pair[0].signal_samples + idr_pair[1].signal_samples,
markers,
prev_result_jobs + module_jobs,
should_save=True,
last_result=True
)
prefix = "_" + "_".join(result_dict["file_name"].split("_")[:-1])
result.add_to_prefix(prefix)
result.add_meta("prev_result_name", result_dict["file_name"])
result.add_meta("add_prefix", prefix)
run.add_result("idr", result)
return run.get_results("idr", "sorted_idr.bed")
def _find_prev_results(self, run):
return [{
"results": run.get_results("peak_call", output_file),
"file_name": output_file
} for output_file in self.output_files[run.peak]]
def find_prev_results(self, run, result):
"""
:param run: The target run to generate jobs for.
:type run: :py:class:`~chipathlon.run.Run`
:param result: The target result to create jobs for.
:type result: :py:class:`~chipathlon.result.Result`
"""
prev_result_name = result.get_meta("prev_result_name")
for result_dict in self._find_prev_results(run):
if result_dict["file_name"] == prev_result_name:
return self._make_idr_pairs(run, result_dict["results"])
return []
def parse_result(self, run, result):
"""
:param run: The target run to generate jobs for.
:type run: :py:class:`~chipathlon.run.Run`
:param result: The target result to create jobs for.
:type result: :py:class:`~chipathlon.result.Result`
"""
result_pair = self.get_prev_results(run, result)
markers = {}
inputs = {
"sorted_sample_1.bed": result_pair[0].full_name,
"sorted_sample_2.bed": result_pair[1].full_name
}
additional_inputs = {}
results = self.create_results(run, result)
return markers, inputs, additional_inputs, self.get_outputs(results)
......@@ -32,6 +32,7 @@ class ModuleGenerator(object):
self.base_path = base_path
self.debug = debug
self.prev_results = {}
self.save_results = []
pprint.PrettyPrinter(indent=4)
return
......@@ -74,25 +75,35 @@ class ModuleGenerator(object):
all_jobs = prev_results[0].all_jobs
all_markers = final_result.all_markers
results = []
for job_dict in self.module.get_job_list(all_markers[self.module.name])[:-1]:
for job_dict in self.module.get_job_list(all_markers[self.module.name]):
job_name = job_dict.keys()[0]
job_info = job_dict[job_name]
all_jobs.append(self.workflow_jobs[job_name])
for output_info in job_info["outputs"]:
logical_name = output_info.keys()[0]
result = Result(
logical_name,
final_result.control_samples,
final_result.signal_samples,
all_markers,
all_jobs,
should_save=output_info[logical_name]["save_result"] if "save_result" in output_info[logical_name] else False,
prefix_join=output_info[logical_name].get("prefix_join"),
name_template=output_info[logical_name].get("name_template")
)
results.append(result)
run.add_result(self.module.name, result)
results.append(final_result)
for output_dict in job_info["outputs"]:
logical_name = output_dict.keys()[0]
output_info = output_dict[logical_name]
# We compare against True explicitly since .get() returns None for a
# missing key, and None evaluates to False.
if output_info.get("final_result") is not True:
result = Result(
logical_name,
final_result.control_samples,
final_result.signal_samples,
all_markers,
all_jobs,
should_save=output_info.get("save_result"),
prefix_join=output_info.get("prefix_join"),
name_template=output_info.get("name_template")
)
if final_result.get_meta("add_prefix") is not None:
result.add_to_prefix(final_result.get_meta("add_prefix"))
results.append(result)
run.add_result(self.module.name, result)
else:
result = run.find_result(self.module.name, logical_name, final_result)
results.append(result)
if result.should_save:
self.save_results.append(result)
return results
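A minimal sketch of the output dictionaries this loop consumes, with the shape assumed from the yaml changes further down (each output is a single-key dict whose value may carry the final_result flag):
# Illustrative outputs list: the final_result flag routes an output to
# run.find_result instead of constructing a fresh intermediate Result.
outputs = [
    {"idr.bed": {"type": "file"}},
    {"sorted_idr.bed": {"type": "file", "final_result": True}},
]
for output_dict in outputs:
    logical_name = list(output_dict.keys())[0]
    output_info = output_dict[logical_name]
    if output_info.get("final_result") is not True:
        print("intermediate output: %s" % logical_name)
    else:
        print("final output: %s" % logical_name)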
def get_outputs(self, results):
......@@ -123,7 +134,7 @@ class ModuleGenerator(object):
{"name": "id", "type": "string", "value": db_result["gridfs_id"]}
]
outputs = [
{"name": result.full_name, "type": "file", "file": result.pegasus_file}
{"name": result.full_name, "type": "file", "file": result.pegasus_file, "transfer": False}
]
download_job = self.workflow_jobs["download_from_gridfs"].create_job(inputs, [], outputs)
if download_job is not None:
......@@ -146,7 +157,7 @@ class ModuleGenerator(object):
{"name": "md5", "type": "string", "value": sample["md5sum"]}
]
outputs = [
{"name": result.full_name, "type": "file", "file": result.pegasus_file}
{"name": result.full_name, "type": "file", "file": result.pegasus_file, "transfer": False}
]
download_job = self.workflow_jobs["download_from_encode"].create_job(inputs, [], outputs)
if download_job is not None:
......@@ -199,7 +210,10 @@ class ModuleGenerator(object):
for prev_result in self.get_prev_results(run, result):
if prev_result.exists_in_encode() or (not rewrite and prev_result.exists_in_db(self.mdb, run.genome)):
self.add_download_job(run, prev_result)
self.add_jobs(run, result)
# We only want to add jobs for the very last result in a module,
# otherwise we will get duplicate jobs.
if result.last_result:
self.add_jobs(run, result)
return
def add_jobs(self, run, result):
......@@ -216,8 +230,9 @@ class ModuleGenerator(object):
if self.debug:
self._print_parse(run, markers, inputs, additional_inputs, outputs)
self.module.add_jobs(self.dax, self.master_jobs, self.master_files, markers, inputs, additional_inputs, outputs)
if result.should_save:
result.save_to_db(self.mdb, self.dax, self.master_files, self.master_jobs, run.genome, self.workflow_jobs["db_save_result"], self.base_path)
while len(self.save_results) > 0:
res = self.save_results.pop()
res.save_to_db(self.mdb, self.dax, self.master_files, self.master_jobs, run.genome, self.workflow_jobs["db_save_result"], self.base_path)
return
def parse_result(self, run, result):
......
from chipathlon.module_generator import ModuleGenerator
from module_generator import ModuleGenerator
from chipathlon.result import Result
import random
......@@ -217,28 +217,31 @@ class PeakCallGenerator(ModuleGenerator):
"""
remove_duplicates_results = run.get_results("remove_duplicates", "no_dups_chr.bed")
module_markers = {"peak_call": self.get_markers(run)}
module_jobs = [self.workflow_jobs[job_name] for job_name in self.module.get_job_names(module_markers["peak_call"])]
final_module_outputs = self.module.get_final_outputs(self.get_markers(run))
if len(final_module_outputs) == 1:
final_result_name = final_module_outputs[0]
else:
final_result_name = "results_sorted.bed"
for paired_result in self._make_call_pairs(run, remove_duplicates_results):
markers = dict(paired_result[0].all_markers, **module_markers)
prev_result_jobs = list(set(paired_result[0].all_jobs).union(paired_result[1].all_jobs))
result = Result(
final_result_name,
paired_result[0].control_samples + paired_result[1].control_samples,
paired_result[0].signal_samples + paired_result[1].signal_samples,
markers,
prev_result_jobs + module_jobs,
should_save=True
)
run.add_result("peak_call", result)
self.call_pairs[result.full_name] = paired_result
all_result_names = []
final_results = self.module.get_all_final_results(self.get_markers(run))
for i, final_result in enumerate(final_results):
final_result_name = final_result["file_name"]
if final_result_name not in all_result_names:
all_result_names.append(final_result_name)
module_jobs = [self.workflow_jobs[job_name] for job_name in final_result["job_names"]]
return run.get_results("peak_call", final_result_name)
for paired_result in self._make_call_pairs(run, remove_duplicates_results):
markers = dict(paired_result[0].all_markers, **module_markers)
prev_result_jobs = list(set(paired_result[0].all_jobs).union(paired_result[1].all_jobs))
result = Result(
final_result_name,
paired_result[0].control_samples + paired_result[1].control_samples,
paired_result[0].signal_samples + paired_result[1].signal_samples,
markers,
prev_result_jobs + module_jobs,
should_save=True,
last_result=(i == len(final_results) - 1)
)
run.add_result("peak_call", result)
self.call_pairs[result.full_name] = paired_result
return [result for name in all_result_names for result in run.get_results("peak_call", name)]
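The loop assumes get_all_final_results returns one dictionary per final output, carrying the output's file name and the jobs that produce it. An illustrative return value for the gem marker (shape inferred from the usage above, not taken from WorkflowModule itself):
# Assumed shape of self.module.get_all_final_results(...) for gem:
final_results = [
    {"file_name": "results_GEM_sorted.bed", "job_names": ["gem_callpeak", "sort_awk_sort_peaks"]},
    {"file_name": "results_GPS_sorted.bed", "job_names": ["gem_callpeak", "sort_awk_sort_peaks"]},
]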
def find_prev_results(self, run, result):
"""
......
from chipathlon.module_generator import ModuleGenerator
from module_generator import ModuleGenerator
from chipathlon.result import Result
class RemoveDuplicatesGenerator(ModuleGenerator):
......@@ -43,7 +43,8 @@ class RemoveDuplicatesGenerator(ModuleGenerator):
align_result.signal_samples,
markers,
align_result.all_jobs + module_jobs,
should_save=True
should_save=True,
last_result=True
)
run.add_result("remove_duplicates", result)
return run.get_results("remove_duplicates", "no_dups_chr.bed")
......
......@@ -131,3 +131,4 @@ align:
outputs:
- align.bam:
type: file
final_result: true
idr:
- idr:
inputs:
- sorted_sample_1.bed:
type: file
- sorted_sample_2.bed:
type: file
additional_inputs: null
outputs:
- idr.bed:
type: file
- sort_awk_sort_peaks:
inputs:
- idr.bed:
type: file
additional_inputs: null
outputs:
- sorted_idr.bed:
type: file
final_result: true
......@@ -10,7 +10,7 @@ peak_call:
type: string
additional_inputs: null
outputs:
- details.txt:
- details:
type: file
- region.bed:
type: file
......@@ -26,14 +26,16 @@ peak_call:
outputs:
- region_sorted.bed:
type: file
final_result: true
- peakranger_format_bed:
inputs:
- summit.bed:
type: file
additional_inputs: null
outputs:
- results_sorted.bed:
- summit_sorted.bed:
type: file
final_result: true
- ccat[tool]:
- ccat_callpeak:
inputs:
......@@ -69,15 +71,16 @@ peak_call:
outputs:
- region_sorted.bed:
type: file
save_result: true
final_result: true
- ccat_format_bed:
inputs:
- significant.peak:
type: file
additional_inputs: null
outputs:
- results_sorted.bed:
- peak_sorted.bed:
type: file
final_result: true
- gem[tool]:
- gem_callpeak:
inputs:
......@@ -111,6 +114,7 @@ peak_call:
outputs:
- results_GEM_sorted.bed:
type: file
final_result: true
- sort_awk_sort_peaks:
inputs:
- GPS_events.narrowPeak:
......@@ -119,6 +123,7 @@ peak_call:
outputs:
- results_GPS_sorted.bed:
type: file
final_result: true
- spp[tool]:
- cp:
inputs:
......@@ -158,6 +163,7 @@ peak_call:
outputs:
- results_sorted.bed:
type: file
final_result: true
- macs2[tool]:
- macs2_callpeak:
inputs:
......@@ -171,7 +177,7 @@ peak_call:
outputs:
- peaks.narrowPeak:
type: file
- results.xls:
- peaks.xls:
type: file
- summits.bed:
type: file
......@@ -183,3 +189,4 @@ peak_call:
outputs:
- results_sorted.bed:
type: file
final_result: true
......@@ -49,3 +49,4 @@ remove_duplicates:
outputs:
- no_dups_chr.bed:
type: file
final_result: true
idr:
inputs:
- name: sample_1
type: file
file_type: bed
- name: sample_2
type: file
file_type: bed
additional_inputs: null
outputs:
- name: idr_bed
type: file
file_type: bed
command: idr
arguments:
- "--samples":
type: list
changeable: false
required: true
has_value: true
default:
- $inputs.0
- $inputs.1
- "--rank":
type: numeric
changeable: true
required: true
has_value: true
default: 5
- "--input-file-type":
type: string
changeable: false
required: true
has_value: true
default: bed
- "--output-file":
type: file
changeable: false
required: true
has_value: true
default: $outputs.0
walltime: 2000
memory: 16000
cores: 1
nodes: 1
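Given the argument definitions above, the job should expand to an invocation roughly like the following, with stand-in file names for the resolved $inputs/$outputs references:
# Approximate generated command (file names illustrative):
idr --samples sorted_sample_1.bed sorted_sample_2.bed --rank 5 \
    --input-file-type bed --output-file idr.bed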
#!/bin/bash
. /util/opt/lmod/lmod/init/profile
# idr uses python 3, and will break if a python 2 module
# or stale PYTHONPATH is still loaded.
module unload python
unset PYTHONPATH
module load idr
idr "$@"
......@@ -10,7 +10,7 @@ class Result(object):
run on the result file up to this point.
"""
def __init__(self, logical_name, control_samples, signal_samples, all_markers, all_jobs, should_save=False, prefix_join=None, name_template=None):
def __init__(self, logical_name, control_samples, signal_samples, all_markers, all_jobs, should_save=False, prefix_join=None, name_template=None, last_result=False):
"""
:param logical_name: The unique name of the file as presented in the module yaml
:type logical_name: string
......@@ -28,6 +28,8 @@ class Result(object):
:type prefix_join: str
:param name_template: A template to load the correct name of the result
:type name_template: str
:param last_result: Whether this result is the last result in a module.
:type last_result: bool
The result class is for managing all intermediate output files.
It also helps manage checking if a result already exists for the
......@@ -44,13 +46,17 @@ class Result(object):
self.control_samples = control_samples
self.signal_samples = signal_samples
self.all_jobs = all_jobs
self.should_save = should_save
# Normalize to a strict boolean -- should_save may be passed as None
# via output_info.get("save_result").
self.should_save = bool(should_save)
self.name_template = name_template
self.prefix_join = prefix_join
self.last_result = last_result
self.meta = {}
self._load_prefix()
self._load_full_name(name_template, prefix_join)