Commit c941a0b8 authored by aknecht2


Added the db_save_result yaml job and script. Updated workflow information tracking to create correct metadata files for saving results to the database. Adjusted conf to support yaml files and arbitrary result files. workflow_job can now return a dictionary of all arguments used.
parent db446d7d
......@@ -48,7 +48,9 @@ file_extensions = {
"qc": ["qc"],
"pdf": ["pdf"],
"ccscore": ["ccscore"],
"xls": ["xls"]
"xls": ["xls"],
"yaml": ["yaml"],
"result": ["bed", "narrowPeak", "broadPeak", "tagAlign"],
}
# list of resources that can be specified per job (step) in
......
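For orientation, a small sketch of how the new "result" and "yaml" extension groups could be used to validate a file name before saving; the check_extension helper is hypothetical, and it assumes file_extensions lives in chipathlon.conf as the hunk header suggests.

import os
from chipathlon.conf import file_extensions

def check_extension(file_path, group):
    # True if the file's extension appears in the named extension group.
    ext = os.path.splitext(file_path)[1].lstrip(".")
    return ext in file_extensions.get(group, [])

# check_extension("run1_peaks_sorted.narrowPeak", "result")  -> True
# check_extension("run1_meta.yaml", "yaml")                  -> True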
......@@ -35,7 +35,6 @@ class MongoDB(object):
print "result_id %s doesn't exist." % (result_id,)
return
def create_result(self, output_file, control_ids, experiment_ids, result_type, additional_data = {}, gfs_attributes = {}):
# Make sure output_file exists
if os.path.isfile(output_file):
......
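For context, a hedged sketch of calling create_result once the output file exists on disk; the host, credentials, accessions, and additional_data values are placeholders, not values taken from this commit.

import chipathlon.db

# Placeholder connection details and ids, for illustration only.
mdb = chipathlon.db.MongoDB("dbhost", "dbuser", "dbpass")
mdb.create_result(
    "run1_peaks_sorted.narrowPeak",   # output_file, checked with os.path.isfile
    ["ENCFF002CTRL"],                 # control_ids
    ["ENCFF001EXPT"],                 # experiment_ids
    "peak",                           # result_type
    additional_data={"align_tool": "bwa", "peak_tool": "macs2"}
)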
......@@ -57,7 +57,7 @@ peak_call:
type: file
- sort_awk_sort_peaks:
inputs:
- results.narrowPeak:
- peaks.narrowPeak:
type: file
additional_inputs: null
outputs:
......
db_save_result:
inputs:
- username:
type: argument
- password:
type: argument
- host:
type: argument
- result:
type: file
- yaml:
type: file
additional_inputs: null
outputs: null
command: db_save_result.py
arguments:
- "-u":
changeable: false
required: true
has_value: true
default: $inputs.0
- "-p":
changeable: false
required: true
has_value: true
default: $inputs.1
- "-h":
changeable: false
required: true
has_value: true
default: $inputs.2
- "-f":
changeable: false
required: true
has_value: true
default: $inputs.3
- "-m":
changeable: false
required: true
has_value: true
default: $inputs.4
walltime: 2000
memory: 2000
cores: 1
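Reading the argument spec above, each "$inputs.N" default resolves to the N-th entry of the job's inputs list, so the generated call should look roughly like the sketch below; the credentials and file names are placeholders, and the real substitution happens inside the workflow code.

inputs = ["dbuser", "dbpass", "dbhost", "run1_peaks_sorted.narrowPeak", "run1_meta.yaml"]
argv = ["db_save_result.py",
        "-u", inputs[0],   # $inputs.0 -> username
        "-p", inputs[1],   # $inputs.1 -> password
        "-h", inputs[2],   # $inputs.2 -> host
        "-f", inputs[3],   # $inputs.3 -> result file
        "-m", inputs[4]]   # $inputs.4 -> meta yaml file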
......@@ -59,6 +59,11 @@ r_spp_nodups:
required: false
has_value: true
default: 0.01
- "-p=":
changeable: true
required: false
has_value: true
default: 8
walltime: 2000
memory: 8000
cores: 1
cores: 8
import chipathlon.db
import argparse
import yaml
import os
# add_help=False keeps argparse from claiming -h, which is reused below for the database host.
parser = argparse.ArgumentParser(description="Insert a result (bed or peak) file into the database.", add_help=False)
parser.add_argument("-p", "--password", dest="password", required=True, help="Database user password.")
parser.add_argument("-u", "--username", dest="username", required=True, help="Database user.")
parser.add_argument("-h", "--host", dest="host", required=True, help="Database host.")
parser.add_argument("-f", "--file", dest="file", required=True, help="Path to result file.")
parser.add_argument("-c", "--controls", dest="control_ids", required=True, nargs="+", help="List of control ids.")
parser.add_argument("-e", "--experiments", dest="experiment_ids", required=True, nargs="+", help="List of experiment/signal ids.")
parser.add_argument("-a", "--additional", dest="additional")
parser.add_argument("-t", "--type", dest="type", required=True, help="Type of result file to save [bed, peak]")
parser.add_argument("-m", "--meta", dest="meta", required=True, help="Path to meta yaml file.")
args = parser.parse_args()
mdb = chipathlon.db.MongoDB(args.host, args.username, args.password)
if args.type() == "bed":
mdb.save_bed(args.f, args.c, args.e, {})
elif args.type() == "peak":
mdb.save_peak(args.f, args.c, args.e, {})
if os.path.isfile(args.file) and os.path.isfile(args.meta):
mdb = chipathlon.db.MongoDB(args.host, args.username, args.password)
with open(args.meta, "r") as rh:
meta = yaml.load(rh)
if meta["result_type"] == "bed":
mdb.save_bed(args.file, meta["control_ids"], meta["experiment_ids"], meta)
elif meta["result_type"] == "peak":
mdb.save_peak(args.file, meta["control_ids"], meta["experiment_ids"], meta)
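A made-up meta file of the shape this script expects; the keys mirror what _save_result in workflow.py writes, and the values are placeholders.

import yaml

example_meta = yaml.load("""
result_type: peak
control_ids: [ENCFF002CTRL]
experiment_ids: [ENCFF001EXPT]
align: {tool: bwa, read_end: single}
peak_call: {tool: macs2}
""")
# result_type routes the save to mdb.save_peak; "bed" would route to mdb.save_bed.
# The real file also carries changeable job arguments collected by get_db_arguments.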
......@@ -19,10 +19,14 @@ from Pegasus.DAX3 import *
class Workflow(object):
def __init__(self, jobhome, run_file, param_file, config_file, host, username, password, debug=False):
self.username = username
self.host = host
self.password = password
# Initialize db connection
self.mdb = chipathlon.db.MongoDB(host, username, password)
# Jobname info & err
self.jobhome = os.path.abspath(jobhome)
self.basepath = self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
self.jobname = os.path.basename(os.path.dirname(self.jobhome + "/"))
self.err = ""
# debug mode, print out additional information
......@@ -57,12 +61,12 @@ class Workflow(object):
self._load_modules()
self._load_runs()
# All required information is loaded, start queuing jobs
self._create_setup()
self._add_download()
self._add_align()
self._add_remove_duplicates()
self._add_peak_calling()
# Create pegasus important stuff
self._create_setup()
self._add_notify()
self._create_replica()
self._create_sites()
......@@ -114,18 +118,11 @@ class Workflow(object):
self.dax.addExecutable(self.executables[cmd])
if self.debug:
print "[LOAD_EXECUTABLE] %s" % (cmd,)
# Overwrite pegasus::transfer to request more walltime
"""
if self.config["profile"]["env"]["PEGASUS_HOME"]:
self.executables["pegasus::transfer"] = Executable(name = "pegasus::transfer", os=os_type, arch=arch)
self.executables["pegasus::transfer"].addPFN(PFN("file://%s" % (os.path.join(self.config["profile"]["env"]["PEGASUS_HOME"], "bin", "pegasus-transfer"),), "local"))
self.executables["pegasus::transfer"].profile("globus", "maxwalltime", 180)
self.dax.addExecutable(self.executables["pegasus::transfer"])
"""
self._raise()
return
def _load_workflow_jobs(self):
# Load each yaml_job as an actual object
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
for f in files:
with open(os.path.join(root, f), "r") as rh:
......@@ -138,12 +135,13 @@ class Workflow(object):
else:
self.err += yj.err
elif self.debug:
print "[WARNING] Skipping param file %s, corresponding executable %s bit found." % (f,)
print "[WARNING] Skipping param file %s, corresponding executable %s not found." % (f,)
break
self._raise()
return
def _load_modules(self):
# Load each yaml_module as an actual object
for root, dirs, files in os.walk(os.path.join(os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)):
for f in files:
mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.workflow_jobs)
......@@ -156,6 +154,7 @@ class Workflow(object):
return
def _load_runs(self):
# Load run data & get samples
with open(self.run_file, "r") as rh:
try:
self.run_data = yaml.load(rh)
......@@ -275,6 +274,36 @@ class Workflow(object):
return_data.append((f,))
return return_data
def _save_result(self, result_file_name, result_type, experiment_ids, control_ids, job_list, markers, prefix):
"""
Saves a results file provided:
result_file_name -- the result file name in the master file list
result_type -- the type of result to insert
control_ids -- A list of control sample ids
experiment_ids -- A list of experiment sample ids
job_list -- The jobs that have been run on this file.
markers -- The markers that have been used for this file.
prefix -- A prefix to write the meta file with.
This function will generate the correct meta-data yaml
file & save job, and add them to the dax.
"""
meta_file_name = "%s_meta.yaml" % (prefix,)
meta_file_path = os.path.join(self.basepath, "input/db_meta", meta_file_name)
meta_info = markers
meta_info["result_type"] = result_type
meta_info["control_ids"] = control_ids
meta_info["experiment_ids"] = experiment_ids
for job_name in job_list:
job_arg_dict = self.workflow_jobs[job_name].get_db_arguments()
meta_info.update(job_arg_dict)
with open(meta_file_path, "w") as wh:
yaml.dump(meta_info, wh, default_flow_style=False)
self._add_file(meta_file_name, meta_file_path, "local")
job_inputs = [self.username, self.password, self.host, self.files[result_file_name], self.files[meta_file_name]]
self.workflow_jobs["db_save_result"].create_job(job_inputs, [], [])
return
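Roughly, the meta_info dict written out for a peak result is assembled as below; the marker values, sample ids, and the single changeable argument shown are placeholders.

markers = {"align": {"tool": "bwa", "read_end": "single"},
           "peak_call": {"tool": "spp"}}
meta_info = dict(markers)
meta_info["result_type"] = "peak"
meta_info["control_ids"] = ["ENCFF002CTRL"]
meta_info["experiment_ids"] = ["ENCFF001EXPT"]
# Each job's get_db_arguments() adds its changeable options, e.g. the
# new r_spp_nodups "-p=" flag with its default of 8.
meta_info.update({"-p=": 8})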
def _add_download(self):
# Remember, each experiment is always paired with a randomly selected control.
# For alignment it doesn't really matter, but we set up the groundwork
......@@ -282,9 +311,11 @@ class Workflow(object):
# equal number of control and experiment files
self.number_of_inputs = 0
for run in self.run_data["runs"]:
run["input_sets"] = []
run["input_set"] = []
experiment_data = self._make_set(run["data"]["experiment"])
control_base = self._make_set(run["data"]["control"])
# Randomly sample control_data to match number
# of experiment samples
if len(experiment_data) < len(control_base):
control_data = [random.choice(control_base) for i in range(0, len(experiment_data))]
elif len(experiment_data) > len(control_base):
......@@ -293,9 +324,13 @@ class Workflow(object):
control_data = control_base
random.shuffle(control_data)
for pair in zip(experiment_data, control_data):
# Maintain exp_ids for later use.
# run["experiment"] only holds the correct
# signal (experiment) id, not the control id.
run["exp_ids"] = []
for treatment in pair:
for f in treatment:
name = "%s_%s.fastq.gz" % (run["experiment"], f["accession"])
name = "%s_%s.fastq.gz" % (f["dataset"].split("/")[2], f["accession"])
if name not in self.files:
# USING DOWNLOAD SCRIPT
output_file = File(name)
......@@ -312,12 +347,19 @@ class Workflow(object):
# there are to increase the number of transfer
# clusters in _create_sites
self.number_of_inputs += 1
run["input_sets"].append(pair)
run["exp_ids"].apend(f["dataset"].split("/")[2])
run["input_set"].append(pair)
return
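The renamed download files are keyed by the experiment accession parsed out of the dataset path; a quick sketch with placeholder accessions, assuming ENCODE-style dataset values of the form "/experiments/<id>/".

f = {"dataset": "/experiments/ENCSR000AAA/", "accession": "ENCFF000AAA"}
exp_id = f["dataset"].split("/")[2]                 # -> "ENCSR000AAA"
name = "%s_%s.fastq.gz" % (exp_id, f["accession"])  # -> "ENCSR000AAA_ENCFF000AAA.fastq.gz"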
def _add_align(self):
markers = {}
for run in self.run_data["runs"]:
# Set up prefix for later use in jobs
run["prefix"] = {}
# Set up markers for later use in jobs
run["markers"] = {}
# Set up names of jobs used
run["jobs"] = []
input_files = {}
additional_files = {}
markers["tool"] = run["align"]
......@@ -328,39 +370,62 @@ class Workflow(object):
elif markers["tool"] == "bowtie2":
input_files["ref_genome_prefix"] = gen_prefix
additional_files["ref_genome"] = self.run_data["genomes"][run["align"]]["grch38p6_files"]["base_file"]
# Add additional reference genome files
for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
additional_files["ref_genome." + ext] = self.run_data["genomes"][run["align"]]["grch38p6_files"]["additional_files"][ext]
for pair in run["input_sets"]:
run["prefix"] = []
for file_tuple in pair:
# For (experiment_data, control_data) in the current run
for pair in run["input_set"]:
run["prefix"]["align"] = []
# For index, (file_1, file_2) in the current data set
for i, file_tuple in enumerate(pair):
# If paired end read
if len(file_tuple) == 2:
markers["read_end"] = "paired"
prefix = "%s_%s_%s" % (run["experiment"], file_tuple[0]["accession"], file_tuple[1]["accession"])
input_files["download_1.fastq"] = "%s_%s.fastq.gz" % (run["experiment"], file_tuple[0]["accession"])
input_files["download_2.fastq"] = "%s_%s.fastq.gz" % (run["experiment"], file_tuple[1]["accession"])
prefix = "%s_%s_%s" % (run["exp_ids"][i], file_tuple[0]["accession"], file_tuple[1]["accession"])
input_files["download_1.fastq"] = "%s_%s.fastq.gz" % (run["exp_ids"][i], file_tuple[0]["accession"])
input_files["download_2.fastq"] = "%s_%s.fastq.gz" % (run["exp_ids"][i], file_tuple[1]["accession"])
else:
markers["read_end"] = "single"
prefix = "%s_%s" % (run["experiment"], file_tuple[0]["accession"])
prefix = "%s_%s" % (run["exp_ids"][i], file_tuple[0]["accession"])
input_files["download_1.fastq"] = "%s.fastq.gz" % (prefix,)
# For future tools, we need to keep track
# of prefixes since all files are tracked
# by markers.
run["prefix"].append(self.modules["align"]._get_full_prefix(prefix, markers))
self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
run["prefix"]["align"].append(self.modules["align"]._get_full_prefix(prefix, markers))
added_jobs = self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
run["jobs"]=list(set(added_jobs) | set(run["jobs"]))
# Single end reads are always peak-called against single,
# Paired end reads are always peak-called against paired,
# So we don't need to add individual align markers
run["markers"]["align"] = markers
return
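For one (placeholder) file tuple, the base prefix is built from the experiment id and the file accessions before _get_full_prefix folds in the align markers.

exp_id = "ENCSR000AAA"
paired_tuple = ({"accession": "ENCFF000AAA"}, {"accession": "ENCFF000AAB"})
single_tuple = ({"accession": "ENCFF000AAC"},)

# Paired end: experiment id plus both accessions.
paired_prefix = "%s_%s_%s" % (exp_id, paired_tuple[0]["accession"], paired_tuple[1]["accession"])
# Single end: experiment id plus the lone accession.
single_prefix = "%s_%s" % (exp_id, single_tuple[0]["accession"])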
def _add_remove_duplicates(self):
for run in self.run_data["runs"]:
for pair in run["input_sets"]:
for pair in run["input_set"]:
for i, file_tuple in enumerate(pair):
# Load our prefix from the previously saved
# version. Remove duplicates has no markers, so it won't
# update the prefix.
prefix = run["prefix"][i]
# update the prefix, or markers.
prefix = run["prefix"]["align"][i]
# Remove duplicates has everything it needs!
# Input files should be in master_files from
# the result of the align step
self.modules["remove_duplicates"].add_jobs(self.dax, self.jobs, self.files, prefix, {}, {}, {})
added_jobs = self.modules["remove_duplicates"].add_jobs(self.dax, self.jobs, self.files, prefix, {}, {}, {})
run["jobs"]=list(set(added_jobs) | set(run["jobs"]))
# We want to save each "no_dups.bed" file, we do that with
# the db_save_result job. Remember, experiment data
# is ALWAYS first in the pair.
sample_ids = [f["accession"] for f in file_tuple]
self._save_result(
"%s_no_dups.bed" % (prefix,),
"bed",
sample_ids if i == 0 else [], # Experiment ids
sample_ids if i == 1 else [], # Control ids
run["jobs"],
run["markers"],
prefix
)
return
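Because the experiment data always comes first in each pair, index 0 fills experiment_ids and index 1 fills control_ids when the no_dups.bed results are saved; a small sketch with placeholder accessions.

pair = ([{"accession": "ENCFF001EXPT"}],   # experiment side, i == 0
        [{"accession": "ENCFF002CTRL"}])   # control side, i == 1
for i, file_tuple in enumerate(pair):
    sample_ids = [f["accession"] for f in file_tuple]
    experiment_ids = sample_ids if i == 0 else []
    control_ids = sample_ids if i == 1 else []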
def _add_peak_calling(self):
......@@ -368,19 +433,33 @@ class Workflow(object):
for run in self.run_data["runs"]:
inputs = {}
markers["tool"] = run["peak"]
for pair in run["input_sets"]:
for pair in run["input_set"]:
# Now we use individual file_tuples to extract the correct
# prefix and add in as input_files
# remove_duplicates returns prefix_no_dups.bam
# REMEMBER: experiment is always first
final_prefix = run["prefix"][0] + "_" + run["prefix"][1]
inputs["exp.bed"] = run["prefix"][0] + "_no_dups.bed"
inputs["control.bed"] = run["prefix"][1] + "_no_dups.bed"
if run["peak"] == "spp":
pass
elif run["peak"] == "macs2":
inputs["prefix"] = self.modules["peak_call"]._get_full_prefix(final_prefix, markers)
self.modules["peak_call"].add_jobs(self.dax, self.jobs, self.files, final_prefix, markers, inputs, {})
final_prefix = run["prefix"]["align"][0] + "_" + run["prefix"]["align"][1]
inputs["exp.bed"] = run["prefix"]["align"][0] + "_no_dups.bed"
inputs["control.bed"] = run["prefix"]["align"][1] + "_no_dups.bed"
run["prefix"]["peak_call"] = self.modules["peak_call"]._get_full_prefix(final_prefix, markers)
if run["peak"] == "macs2":
inputs["prefix"] = run["prefix"]["peak_call"]
run["markers"]["peak_call"] = markers
added_jobs = self.modules["peak_call"].add_jobs(self.dax, self.jobs, self.files, final_prefix, markers, inputs, {})
run["jobs"] += added_jobs
# We want to save each "results_sorted.narrowPeak" file,
# we agian use the db_save_result job.
experiment_ids = [f["accession"] for f in pair[0]]
control_ids = [f["accession"] for f in pair[1]]
self._save_result(
"%s_results_sorted.narrowPeak" % (final_prefix,),
"peak",
experiment_ids,
control_ids,
run["jobs"],
run["markers"],
final_prefix
)
return
def _create_setup(self):
......@@ -388,10 +467,9 @@ class Workflow(object):
Creates the base structure for job submission. Everything is contained
within a folder based on the current timestamp.
"""
self.basepath = self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
if not os.path.exists(self.basepath):
os.makedirs(self.basepath)
for folder in ["input", "output"]:
for folder in ["input", "output", "input/db_meta"]:
if not os.path.exists(os.path.join(self.basepath, folder)):
os.makedirs(os.path.join(self.basepath, folder))
return
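The resulting layout under the job home then looks roughly like the sketch below (timestamp is illustrative); the new input/db_meta folder is where _save_result writes the per-result *_meta.yaml files.

# <jobhome>/2016-01-01_120000/
#     input/
#         db_meta/    <- *_meta.yaml files from _save_result
#     output/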
......
......@@ -122,6 +122,22 @@ class WorkflowJob(object):
valid_params = False
return valid_params
def get_db_arguments(self):
"""
Returns a dictionary of arguments to be inserted into
the database.
"""
arguments = {}
for arg_dict in self.base[self.jobname]["arguments"]:
# root key is actual argument name
arg = arg_dict.keys()[0]
arg_info = arg_dict[arg]
# We only want to show changeable args to the end user
if arg_info["changeable"] and arg_info["has_value"]:
arguments[arg] = arg_info["default"]
arguments.update(self.params)
return arguments
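A standalone mimic of that selection logic, using a made-up argument spec in the same shape as the job yaml files above; only changeable arguments that carry a value are reported, and user-supplied params win.

base_args = [{"-p=": {"changeable": True, "required": False,
                      "has_value": True, "default": 8}},
             {"-u": {"changeable": False, "required": True,
                     "has_value": True, "default": "$inputs.0"}}]
params = {"-p=": 4}   # hypothetical user override

arguments = {}
for arg_dict in base_args:
    arg = list(arg_dict.keys())[0]      # root key is the argument name
    arg_info = arg_dict[arg]
    if arg_info["changeable"] and arg_info["has_value"]:
        arguments[arg] = arg_info["default"]
arguments.update(params)
# arguments == {"-p=": 4}; fixed flags like "-u" are never exposed.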
def create_job(self, inputs, additional_inputs, output_files):
"""
- inputs should be a list of dictionaries:
......
......@@ -155,8 +155,8 @@ class WorkflowModule(object):
valid = True
msg = ""
valid, msg, module_files = self._check_params(master_files, prefix, markers, inputs, additional_files)
self._traverse_jobs(dax, master_jobs, master_files, prefix, markers, inputs, additional_files)
return
added_jobs = self._traverse_jobs(dax, master_jobs, master_files, prefix, markers, inputs, additional_files)
return added_jobs
def _setup_param_list(self, master_files, job_info, param_type, param_dict, prefix, markers):
"""
......@@ -195,6 +195,7 @@ class WorkflowModule(object):
def _traverse_jobs(self, dax, master_jobs, master_files, prefix, markers, inputs, additional_files):
added_jobs = []
job_list = self._get_data_list(self.workflow, markers)
for job_dict in job_list:
job_name = job_dict.keys()[0]
......@@ -209,6 +210,7 @@ class WorkflowModule(object):
job = self.workflow_jobs[job_name].create_job(job_inputs, job_additional, job_outputs)
if job is not None:
dax.addJob(job)
added_jobs.append(job_name)
else:
print "JOB ERROR for '%s'.\n" % (job_dict,)
# NOW, WE add our job to the master_list, and check dependencies
......@@ -220,7 +222,7 @@ class WorkflowModule(object):
for file_dict in job_inputs:
if file_dict["name"] in master_jobs:
dax.depends(child = job, parent = master_jobs[file_dict["name"]])
return
return added_jobs
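With add_jobs and _traverse_jobs now returning the names of the jobs they created, callers in workflow.py can accumulate a per-run job list without duplicates; the job names below are placeholders.

run = {"jobs": ["bwa_align_single"]}                       # hypothetical earlier jobs
added_jobs = ["samtools_sam_to_bam", "remove_duplicates"]  # hypothetical module output
run["jobs"] = list(set(added_jobs) | set(run["jobs"]))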
def _check_params(self, master_files, prefix, markers, inputs, additional_files):
valid_params = True
......