Commit 220af4bf authored by aknecht2

Merge branch '35-workflow-job-parsing-update'

Heavily updated the yaml file parsing for two primary reasons:
1. Make it clearer what's actually going on underneath.
2. Allow for optional arguments.
parents cb56d4fb 09ac0215
@@ -65,6 +65,11 @@ executables = [
"chip-job-zcat-peak"
]
# Java needs to have -Xmx specified...
java_tools = [
"gem"
]
# Peak_type validation
peak_types = {
"spp": ["narrow", "broad"],
@@ -136,33 +141,33 @@ resources = {
argument_types = {
"argument": ["string", "numeric"],
"file": ["file", "rawfile", "stdout", "stderr"],
"list": ["list"],
"list": ["file_list", "argument_list"],
"folder": ["folder", "rawfolder"]
}
# Defines information about arguments
argument_keys = {
"required": ["type", "changeable", "has_value"],
"optional": ["required", "default", "file_type", "path"]
"required": ["type", "changeable", "has_value", "required"],
"optional": ["default", "path", "file_type"]
}
# workflow_job keys
job_keys = {
"required": ["inputs", "additional_inputs", "outputs", "command", "arguments"] + resources.keys(),
"required": ["inputs", "outputs", "command", "arguments"] + resources.keys(),
"optional": []
}
job_inout_keys = {
"required": ["type"],
"optional": ["file_type"]
}
# param keys
param_keys = {
"required": [],
"optional": ["arguments"] + resources.keys()
}
file_list_keys = {
"required": ["name", "type"],
"optional": ["file_type"]
}
# workflow order
workflow = ["align", "remove_duplicates", "peak_calling"]
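To illustrate the intent behind the reworked key lists (an editorial sketch, not code from this commit): splitting each definition into required and optional keys lets one generic loop validate any yaml entry. The validate_argument helper and its error messages below are hypothetical.

# Hypothetical helper showing how the required/optional split in
# argument_keys could drive validation of a single yaml argument entry.
def validate_argument(arg_name, arg_info, argument_keys):
    errors = []
    # Every required key must be present.
    for key in argument_keys["required"]:
        if key not in arg_info:
            errors.append("Argument '%s' is missing required key '%s'." % (arg_name, key))
    # Anything else must come from the optional list.
    for key in arg_info:
        if key not in argument_keys["required"] + argument_keys["optional"]:
            errors.append("Argument '%s' has unknown key '%s'." % (arg_name, key))
    return errors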
@@ -6,9 +6,11 @@ import traceback
import os
import itertools
import time
import collections
import chipathlon.conf
from pprint import pprint
import hashlib
from chipathlon.utils import progress
class MongoDB(object):
@@ -35,7 +37,7 @@ class MongoDB(object):
self.password = password
self.client = MongoClient(host)
self.db = self.client.chipseq
self.cache = {}
self.cache = collections.defaultdict(dict)
try:
self.db.authenticate(username, password, mechanism="SCRAM-SHA-1")
except:
@@ -45,12 +47,12 @@ class MongoDB(object):
self.gfs = gridfs.GridFS(self.db)
return
def add_cache(self, function, key, data):
def add_cache(self, accession, file_type, data):
"""
:param function: The function name
:type function: str
:param key: The key to add the cache entry under.
:type key: Any hashable
:param accession: The accession of the file to store.
:type accession: str
:param file_type: The type of file to store.
:type file_type: str
:param data: The data to add to the cache.
:type data: Object
@@ -60,23 +62,20 @@ class MongoDB(object):
peak calling tools. In these cases we don't want to request info
from the database multiple times for the same data.
"""
if function not in self.cache:
self.cache[function] = {}
self.cache[function][key] = data
self.cache[accession][file_type] = data
return
def get_cache(self, function, key):
def get_cache(self, accession, file_type):
"""
:param function: The function name
:type function: str
:param key: The key to get from the cache.
:type key: Any hashable
:param accession: The accession of the file to retrieve.
:type accession: str
:param file_type: The type of file to retrieve.
:type file_type: str
Gets a data item from the internal cache.
"""
if function in self.cache:
if key in self.cache[function]:
return self.cache[function][key]
if accession in self.cache:
return self.cache[accession].get(file_type)
return None
def delete_result(self, result, genome):
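The cache refactor above keys entries by accession and file type, and collections.defaultdict(dict) removes the old "create the inner dict if missing" boilerplate. A minimal standalone sketch of the same pattern (hypothetical accession, outside the MongoDB class):

import collections

# accession -> file_type -> cached document
cache = collections.defaultdict(dict)

def add_cache(accession, file_type, data):
    # defaultdict builds the inner dict the first time an accession is seen.
    cache[accession][file_type] = data

def get_cache(accession, file_type):
    # Plain membership test so a lookup miss does not create an empty inner dict.
    if accession in cache:
        return cache[accession].get(file_type)
    return None

add_cache("ENCFF000XYZ", "fastq", {"md5sum": "abc123"})
print get_cache("ENCFF000XYZ", "fastq")  # -> {'md5sum': 'abc123'}
print get_cache("ENCFF000XYZ", "bam")    # -> None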
@@ -331,7 +330,7 @@ class MongoDB(object):
valid = True
msg = ""
data = {}
check_cache = self.get_cache("get_sample", accession)
check_cache = self.get_cache(accession, file_type)
if check_cache is not None:
msg = "Retrieved data from cache."
data = check_cache
@@ -342,7 +341,7 @@ class MongoDB(object):
})
if cursor.count() == 1:
data = cursor.next()
self.add_cache("get_sample", accession, data)
self.add_cache(accession, file_type, data)
else:
valid = False
msg = "Found %s files with accession: %s, file_type: %s. Should only be 1." % (
@@ -352,6 +351,42 @@ class MongoDB(object):
)
return (valid, msg, data)
def clean_gfs(self):
"""
This function finds all files stored in gridfs that are not currently
referenced by any result file and removes them.
A clean database is a happy database.
"""
cursor = self.db.results.aggregate([
{
"$group": {
"_id": 1,
"valid_ids": {"$push": "$gridfs_id"}
}
}
])
# Doc contains all our valid ids
id_doc = cursor.next()
# Find all fs.files documents
gfs_cursor = self.db.fs.files.find({
"_id": {
"$nin": id_doc["valid_ids"]
}
})
# Iterate through file, delete fs.chunks then fs.files
total_files = gfs_cursor.count()
print "Found %s unused gridfs files. Preparing to delete...." % (total_files,)
for i, fs_file in enumerate(gfs_cursor):
progress(i, total_files)
self.db.fs.chunks.remove({
"files_id": fs_file["_id"]
})
self.db.fs.files.remove({
"_id": fs_file["_id"]
})
progress(total_files, total_files)
return
def get_samples(self, experiment_accession, file_type):
"""
:param experiment_accession: Accession number of the experiment to grab samples from.
@@ -12,17 +12,28 @@ class AlignGenerator(ModuleGenerator):
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param workflow_module: The actual module being used.
:type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
:type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param save_db: Whether or not we want to save results to the database.
True by default.
:type save_db: bool
:param rewrite: Whether or not to rewrite existing files. If true, it will
ignore files in Mongo and recreate them. If false, it will download
files based on the latest available completed job.
:type rewrite: bool
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
super(AlignGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
super(AlignGenerator, self).__init__(
dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
base_path, save_db=save_db, rewrite=rewrite, debug=debug
)
self.generate_calls = {
"bwa": {
1: self._bwa_single,
@@ -51,11 +62,10 @@ class AlignGenerator(ModuleGenerator):
"""
base_file = run.genome.get_base_file()
inputs = {"ref_genome": base_file["name"]}
additional_inputs = {}
for f in run.genome.get_additional_files():
ext = f["name"].split(".")[-1]
additional_inputs["ref_genome.%s" % (ext,)] = f["name"]
return (inputs, additional_inputs)
inputs["ref_genome.%s" % (ext,)] = f["name"]
return inputs
def _bowtie2_init(self, run):
"""
@@ -72,12 +82,14 @@ class AlignGenerator(ModuleGenerator):
"""
base_file = run.genome.get_base_file()
additional_files = run.genome.get_additional_files()
inputs = {"ref_genome_prefix": run.genome.file_prefix}
additional_inputs = {"ref_genome": base_file["name"]}
inputs = {
"ref_genome_prefix": run.genome.file_prefix,
"ref_genome": base_file["name"]
}
for f in run.genome.get_additional_files():
ext = ".".join(f["name"].split(".")[-3 if "rev" in f["name"] else -2:])
additional_inputs["ref_genome.%s" % (ext,)] = f["name"]
return (inputs, additional_inputs)
inputs["ref_genome.%s" % (ext,)] = f["name"]
return inputs
def _bwa_single(self, run, result):
"""
@@ -91,10 +103,10 @@ class AlignGenerator(ModuleGenerator):
be processed
"""
markers = {"tool": "bwa", "read_end": "single"}
inputs, additional_inputs = self._bwa_init(run)
inputs = self._bwa_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
return (markers, inputs, additional_inputs)
return (markers, inputs)
def _bwa_paired(self, run, result):
"""
@@ -107,11 +119,11 @@ class AlignGenerator(ModuleGenerator):
jobs with paired end reads.
"""
markers = {"tool": "bwa", "read_end": "paired"}
inputs, additional_inputs = self._bwa_init(run)
inputs = self._bwa_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
inputs["download_2.fastq"] = prev_results[1].full_name
return (markers, inputs, additional_inputs)
return (markers, inputs)
def _bowtie2_single(self, run, result):
"""
@@ -125,10 +137,10 @@ class AlignGenerator(ModuleGenerator):
be processed.
"""
markers = {"tool": "bowtie2", "read_end": "single"}
inputs, additional_inputs = self._bowtie2_init(run)
inputs = self._bowtie2_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
return (markers, inputs, additional_inputs)
return (markers, inputs)
def _bowtie2_paired(self, run, result):
@@ -142,11 +154,11 @@ class AlignGenerator(ModuleGenerator):
jobs with paired end reads.
"""
markers = {"tool": "bowtie2", "read_end": "paired"}
inputs, additional_inputs = self._bowtie2_init(run)
inputs = self._bowtie2_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
inputs["download_2.fastq"] = prev_results[1].full_name
return (markers, inputs, additional_inputs)
return (markers, inputs)
def _find_pairs(self, sample_list):
"""
@@ -236,6 +248,6 @@ class AlignGenerator(ModuleGenerator):
:type result: :py:class:`~chipathlon.result.Result`
"""
prev_results = self.get_prev_results(run, result)
markers, inputs, additional_inputs = self.generate_calls[run.genome.tool][len(prev_results)](run, result)
markers, inputs = self.generate_calls[run.genome.tool][len(prev_results)](run, result)
results = self.create_results(run, result)
return markers, inputs, additional_inputs, self.get_outputs(results)
return markers, inputs, self.get_outputs(results)
@@ -12,17 +12,28 @@ class IdrGenerator(ModuleGenerator):
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param workflow_module: The actual module being used.
:type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
:type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param save_db: Whether or not we want to save results to the database.
True by default.
:type save_db: bool
:param rewrite: Whether or not to rewrite existing files. If true, it will
ignore files in Mongo and recreate them. If false, it will download
files based on the latest available completed job.
:type rewrite: bool
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
super(IdrGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
super(IdrGenerator, self).__init__(
dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
base_path, save_db=save_db, rewrite=rewrite, debug=debug
)
self.module_name = "idr"
self.result_dict = {}
self.output_files = {
@@ -114,6 +125,5 @@ class IdrGenerator(ModuleGenerator):
"sorted_sample_1.bed": result_pair[0].full_name,
"sorted_sample_2.bed": result_pair[1].full_name
}
additional_inputs = {}
results = self.create_results(run, result)
return markers, inputs, additional_inputs, self.get_outputs(results)
return markers, inputs, self.get_outputs(results)
@@ -18,11 +18,19 @@ class ModuleGenerator(object):
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param save_db: Whether or not we want to save results to the database.
True by default.
:type save_db: bool
:param rewrite: Whether or not to rewrite existing files. If true, it will
ignore files in Mongo and recreate them. If false, it will download
files based on the latest available completed job.
:type rewrite: bool
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
self.dax = dax
self.master_jobs = master_jobs
self.master_files = master_files
@@ -30,20 +38,20 @@ class ModuleGenerator(object):
self.module = workflow_module
self.workflow_jobs = workflow_jobs
self.base_path = base_path
self.save_db = save_db
self.rewrite = rewrite
self.debug = debug
self.prev_results = {}
self.save_results = []
pprint.PrettyPrinter(indent=4)
return
def _print_parse(self, run, markers, inputs, additional_inputs, outputs):
def _print_parse(self, run, markers, inputs, outputs):
"""
:param markers: The splits to take in a given module.
:type markers: dict
:param inputs: The inputs required for a given module.
:type inputs: dict
:param additional_inputs: The additional_inputs required for a given module.
:type additional_inputs: dict
:param outputs: The outputs required for a given module.
:type outputs: dict
:param genome: The genome for the run
@@ -51,12 +59,11 @@ class ModuleGenerator(object):
Prints the generated data from a single parse call in a pretty format.
"""
print "\n[%s]\n\tParsing Run: %s\n\tMarkers: %s\n\tInputs: %s\n\tAdditional: %s\n\tOutputs: %s" % (
print "\n[%s]\n\tParsing Run: %s\n\tMarkers: %s\n\tInputs: %s\n\tOutputs: %s" % (
self.__class__.__name__,
run,
markers,
inputs,
additional_inputs,
outputs
)
return
@@ -75,16 +82,17 @@ class ModuleGenerator(object):
all_jobs = prev_results[0].all_jobs
all_markers = final_result.all_markers
results = []
# Iterate through the jobs from the workflow module
for job_dict in self.module.get_job_list(all_markers[self.module.name]):
job_name = job_dict.keys()[0]
job_info = job_dict[job_name]
all_jobs.append(self.workflow_jobs[job_name])
if job_info.get("outputs"):
for output_dict in job_info["outputs"]:
logical_name = output_dict.keys()[0]
output_info = output_dict[logical_name]
# We have to explicitly compare since None evalutes to False.
for logical_name, output_info in job_info["outputs"].iteritems():
# We have to explicitly compare since None evaluates to False.
if not output_info.get("final_result") == True:
# Create the intermediate result with all jobs &
# markers that have been run up to this point
result = Result(
logical_name,
final_result.control_samples,
@@ -95,6 +103,9 @@ class ModuleGenerator(object):
prefix_join=output_info.get("prefix_join"),
name_template=output_info.get("name_template")
)
# Some results require additional prefixes i.e.
# peakranger produces two separate output files
# that we need to keep track of.
if final_result.get_meta("add_prefix") is not None:
result.add_to_prefix(final_result.get_meta("add_prefix"))
results.append(result)
@@ -102,7 +113,8 @@ class ModuleGenerator(object):
else:
result = run.find_result(self.module.name, logical_name, final_result)
results.append(result)
if result.should_save:
# Should we save the results to the db?
if result.should_save and self.save_db:
self.save_results.append(result)
return results
@@ -128,16 +140,32 @@ class ModuleGenerator(object):
Download the target result from gridfs.
"""
db_result = result.get_db_result(self.mdb, run.genome)
inputs = [
{"name": "host", "type": "string", "value": self.mdb.host},
{"name": "username", "type": "string", "value": self.mdb.username},
{"name": "password", "type": "string", "value": self.mdb.password},
{"name": "id", "type": "string", "value": db_result["gridfs_id"]}
]
outputs = [
{"name": result.full_name, "type": "file", "file": result.pegasus_file, "transfer": False}
]
download_job = self.workflow_jobs["download_from_gridfs"].create_job(inputs, [], outputs)
inputs = {
"host": {
"name": "host",
"value": self.mdb.host
},
"username": {
"name": "username",
"value": self.mdb.username
},
"password": {
"name": "password",
"value": self.mdb.password
},
"gfs_id": {
"name": "gfs_id",
"value": db_result["gridfs_id"]
}
}
outputs = {
"downloaded_result": {
"name": result.full_name,
"file": result.pegasus_file,
"transfer": False
}
}
download_job = self.workflow_jobs["download_from_gridfs"].create_job(inputs, outputs)
if download_job is not None:
self.master_files[result.full_name] = result.pegasus_file
self.dax.addJob(download_job)
@@ -153,14 +181,24 @@ class ModuleGenerator(object):
"""
# Assuming we only call on a result file with 1 accession
sample = result.control_samples[0] if len(result.control_samples) > 0 else result.signal_samples[0]
inputs = [
{"name": "url", "type": "string", "value": sample["hcc_url"] if "hcc_url" in sample else sample["url"]},
{"name": "md5", "type": "string", "value": sample["md5sum"]}
]
outputs = [
{"name": result.full_name, "type": "file", "file": result.pegasus_file, "transfer": False}
]
download_job = self.workflow_jobs["download_from_encode"].create_job(inputs, [], outputs)
inputs = {
"url": {
"name": "url",
"value": sample["hcc_url"] if "hcc_url" in sample else sample["url"]
},
"md5": {
"name": "md5",
"value": sample["md5sum"]
}
}
outputs = {
"downloaded_file": {
"name": result.full_name,
"file": result.pegasus_file,
"transfer": False
}
}
download_job = self.workflow_jobs["download_from_encode"].create_job(inputs, outputs)
if download_job is not None:
self.master_files[result.full_name] = result.pegasus_file
self.dax.addJob(download_job)
@@ -184,12 +222,10 @@ class ModuleGenerator(object):
self._download_from_gridfs(run, result)
return
def generate(self, run, rewrite=False):
def generate(self, run):
"""
:param run: The target run to generate jobs for.
:type run: :py:class:`~chipathlon.run.Run`
:param rewrite: Whether or not to rewrite data even if it exists.
:type rewrite: bool
Generates actual workflow jobs for a particular run. The logic for
job generation is based on the fact that all output files are unique,
@@ -210,14 +246,16 @@ class ModuleGenerator(object):
"""
final_results = self.create_final_results(run)
for result in final_results:
if rewrite or not result.exists_in_db(self.mdb, run.genome):
if self.rewrite or not result.exists_in_db(self.mdb, run.genome):
for prev_result in self.get_prev_results(run, result):
if prev_result.exists_in_encode() or (not rewrite and prev_result.exists_in_db(self.mdb, run.genome)):
if prev_result.exists_in_encode() or (not self.rewrite and prev_result.exists_in_db(self.mdb, run.genome)):
self.add_download_job(run, prev_result)
# We only want to add jobs for the very last result in a module
# Otherwise we will get duplicate jobs.
if result.last_result:
self.add_jobs(run, result)
else:
print "Skipping jobs for run: %s, result: %s" % (run, result)
return
def add_jobs(self, run, result):
@@ -230,10 +268,10 @@ class ModuleGenerator(object):
This function is used to generate all jobs
necessary to create a particular result file.
"""
markers, inputs, additional_inputs, outputs = self.parse_result(run, result)
markers, inputs, outputs = self.parse_result(run, result)
if self.debug:
self._print_parse(run, markers, inputs, additional_inputs, outputs)
self.module.add_jobs(self.dax, self.master_jobs, self.master_files, markers, inputs, additional_inputs, outputs)
self._print_parse(run, markers, inputs, outputs)
self.module.add_jobs(self.dax, self.master_jobs, self.master_files, markers, inputs, outputs)
while len(self.save_results) > 0:
res = self.save_results.pop()
res.save_to_db(self.mdb, self.dax, self.master_files, self.master_jobs, run.genome, self.workflow_jobs["db_save_result"], self.base_path)
@@ -248,7 +286,7 @@ class ModuleGenerator(object):
This function should return everything necessary for
actually creating jobs from the module i.e.
markers, inputs, additional_inputs, and outputs
markers, inputs, and outputs
markers -- Should be a dictionary that has key value pairs for the
current module splits that should be taken. For example, the align
@@ -260,8 +298,6 @@ class ModuleGenerator(object):
'align.bam' as input, so the inputs dictionary should be
{"align.bam": "full_file_name.bam"}
additional_inputs -- Same as inputs, not directly used by the jobs.
outputs -- Same as inputs except defines the outputs created by the
jobs within the module.
"""
@@ -13,17 +13,28 @@ class PeakCallGenerator(ModuleGenerator):
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param workflow_module: The actual module being used.
:type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
:type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param save_db: Whether or not we want to save results to the database.
True by default.
:type save_db: bool
:param rewrite: Whether or not to rewrite existing files. If true, it will
ignore files in Mongo and recreate them. If false, it will download
files based on the latest available completed job.
:type rewrite: bool