Commit d1367c3b authored by aknecht2

Create save_result function in workflow. Updated workflow_job & module to provide additional necessary info to save database results.
parent c941a0b8
......@@ -289,19 +289,29 @@ class Workflow(object):
file & save job, and add them to the dax.
"""
meta_file_name = "%s_meta.yaml" % (prefix,)
meta_file_path = os.path.join(self.basepath, "input/db_meta", meta_file_name)
meta_info = markers
meta_info["result_type"] = result_type
meta_info["control_ids"] = control_ids
meta_info["experiment_ids"] = experiment_ids
for job_name in job_list:
job_arg_dict = self.workflow_jobs[job_name].get_db_arguments()
meta_info.update(job_arg_dict)
with open(meta_file_path, "w") as wh:
yaml.dump(meta_info, wh, default_flow_style=False)
self._add_file(meta_file_name, meta_file_path, "local")
job_inputs = [self.username, self.password, self.host, self.files[result_file_name], self.files[meta_file_name]]
self.workflow_jobs["db_save_result"].create_job(job_inputs, [], [])
if meta_file_name not in self.files:
meta_file_path = os.path.join(self.basepath, "input/db_meta", meta_file_name)
if self.debug:
print "[SAVE_RESULT] Result File = %s, Meta File = %s, Job list = %s" % (result_file_name, meta_file_name, job_list)
meta_info = markers
meta_info["result_type"] = result_type
meta_info["control_ids"] = control_ids
meta_info["experiment_ids"] = experiment_ids
for job_name in job_list:
job_arg_dict = self.workflow_jobs[job_name].get_db_arguments()
meta_info[job_name] = job_arg_dict
with open(meta_file_path, "w") as wh:
yaml.dump(meta_info, wh, default_flow_style=False)
self._add_file(meta_file_name, meta_file_path, "local")
job_inputs = [
{"name": "username", "type": "argument", "value": self.username},
{"name": "password", "type": "argument", "value": self.password},
{"name": "host", "type": "argument", "value": self.host},
{"name": result_file_name, "type": "file", "file": self.files[result_file_name]},
{"name": meta_file_name, "type": "file", "file": self.files[meta_file_name]}
]
save_job = self.workflow_jobs["db_save_result"].create_job(job_inputs, [], [])
self.dax.addJob(save_job)
return
def _add_download(self):
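For context, a minimal sketch of what the reworked _save_result now produces, assuming hypothetical accession IDs, argument values, and placeholder file handles (the real values come from the run configuration and the Pegasus DAX):

import yaml

# Illustration of the meta file _save_result writes and the structured
# job_inputs it now hands to db_save_result. The key names follow the
# diff above; the concrete values here are made up.
meta_info = {
    "tool": "macs2",                       # copied in from `markers`
    "result_type": "peak",
    "control_ids": ["ENCSR000XYZ"],        # hypothetical accession
    "experiment_ids": ["ENCSR000ABC"],     # hypothetical accession
    "macs2_callpeak": {"qvalue": 0.05},    # per-job dict from get_db_arguments()
}

with open("example_meta.yaml", "w") as wh:
    yaml.dump(meta_info, wh, default_flow_style=False)

# Each input is now a dict telling create_job whether it is a plain
# command-line argument or a tracked file, instead of a positional list.
job_inputs = [
    {"name": "username", "type": "argument", "value": "dbuser"},
    {"name": "password", "type": "argument", "value": "dbpass"},
    {"name": "host", "type": "argument", "value": "localhost"},
    {"name": "result.narrowPeak", "type": "file", "file": "<Pegasus File>"},  # real code stores the DAX File object
    {"name": "example_meta.yaml", "type": "file", "file": "<Pegasus File>"},
]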
......@@ -347,7 +357,7 @@ class Workflow(object):
# there are to increase the number of transfer
# clusters in _create_sites
self.number_of_inputs += 1
run["exp_ids"].apend(f["dataset"].split("/")[2])
run["exp_ids"].append(f["dataset"].split("/")[2])
run["input_set"].append(pair)
return
......@@ -392,8 +402,8 @@ class Workflow(object):
# of prefixes since all files are tracked
# by markers.
run["prefix"]["align"].append(self.modules["align"]._get_full_prefix(prefix, markers))
added_jobs = self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
run["jobs"]=list(set(added_jobs) | set(run["jobs"]))
self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
run["jobs"]=list(set(self.modules["align"].get_job_names(markers)) | set(run["jobs"]))
# Single end reads are always peak-called against single,
# Paired end reads are always peak-called against paired,
# So we don't need to add individual align markers
......@@ -411,8 +421,10 @@ class Workflow(object):
# Remove duplicates has everything it needs!
# Input files should be in master_files from
# the result of the align step
added_jobs = self.modules["remove_duplicates"].add_jobs(self.dax, self.jobs, self.files, prefix, {}, {}, {})
run["jobs"]=list(set(added_jobs) | set(run["jobs"]))
self.modules["remove_duplicates"].add_jobs(self.dax, self.jobs, self.files, prefix, {}, {}, {})
run["jobs"]=list(set(self.modules["remove_duplicates"].get_job_names({})) | set(run["jobs"]))
if run["peak"] == "macs2":
print run["jobs"]
# We want to save each "no_dups.bed" file, we do that with
# the db_save_result job. Remember, experiment data
# is ALWAYS first in the pair.
......@@ -433,6 +445,8 @@ class Workflow(object):
for run in self.run_data["runs"]:
inputs = {}
markers["tool"] = run["peak"]
if run["peak"] == "macs2":
print run["jobs"]
for pair in run["input_set"]:
# Now we use individual file_tuples to extract the correct
# prefix and add in as input_files
......@@ -445,20 +459,20 @@ class Workflow(object):
if run["peak"] == "macs2":
inputs["prefix"] = run["prefix"]["peak_call"]
run["markers"]["peak_call"] = markers
added_jobs = self.modules["peak_call"].add_jobs(self.dax, self.jobs, self.files, final_prefix, markers, inputs, {})
run["jobs"] += added_jobs
self.modules["peak_call"].add_jobs(self.dax, self.jobs, self.files, final_prefix, markers, inputs, {})
run["jobs"]=list(set(self.modules["peak_call"].get_job_names(markers)) | set(run["jobs"]))
# We want to save each "results_sorted.narrowPeak" file,
# we again use the db_save_result job.
experiment_ids = [f["accession"] for f in pair[0]]
control_ids = [f["accession"] for f in pair[1]]
self._save_result(
"%s_results_sorted.narrowPeak" % (final_prefix,),
"%s_results_sorted.narrowPeak" % (run["prefix"]["peak_call"],),
"peak",
experiment_ids,
control_ids,
run["jobs"],
run["markers"],
final_prefix
run["prefix"]["peak_call"]
)
return
......
......@@ -135,7 +135,8 @@ class WorkflowJob(object):
# We only want to show changeable args to the end user
if arg_info["changeable"] and arg_info["has_value"]:
arguments[arg] = arg_info["default"]
arguments.update(self.params)
if self.params["arguments"] is not None:
arguments.update(self.params["arguments"])
return arguments
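A sketch of the self.params shape this guard assumes: user-supplied overrides now live under an "arguments" key (which may legitimately be None) rather than at the top level as before. The neighbouring keys shown here are hypothetical:

# Hypothetical per-job params after this change.
params = {
    "arguments": {"qvalue": 0.05},   # user overrides, or None when nothing is overridden
    "walltime": 120,                 # other per-job settings stay alongside (illustrative)
}

defaults = {"qvalue": 0.01, "format": "BED"}   # changeable args with defaults
arguments = dict(defaults)
if params["arguments"] is not None:
    arguments.update(params["arguments"])
# arguments == {"qvalue": 0.05, "format": "BED"}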
def create_job(self, inputs, additional_inputs, output_files):
......
......@@ -142,6 +142,13 @@ class WorkflowModule(object):
param_list.append({"name": param_name, "type": "argument"})
return (inputs, additional_files, output_files)
def get_job_names(self, markers):
job_list = self._get_data_list(self.workflow, markers)
job_names = []
for job_dict in job_list:
job_names.append(job_dict.keys()[0])
return job_names
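A sketch of the data shape get_job_names assumes: _get_data_list yields one single-key dict per job from the module's YAML workflow description, and Workflow then merges the names into run["jobs"] with a set union so repeated add_jobs calls don't duplicate entries. Job names below are illustrative:

# Hypothetical output of _get_data_list for a set of markers.
job_list = [
    {"bowtie2_align": {"inputs": ["fastq"], "outputs": ["sam"]}},
    {"samtools_sam_to_bam": {"inputs": ["sam"], "outputs": ["bam"]}},
]

# Equivalent to the Python 2 job_dict.keys()[0] used above.
job_names = [list(job_dict)[0] for job_dict in job_list]

# How Workflow folds these into the run's job set without duplicates:
run = {"jobs": ["samtools_sam_to_bam"]}
run["jobs"] = list(set(job_names) | set(run["jobs"]))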
def add_jobs(self, dax, master_jobs, master_files, prefix, markers, inputs, additional_files):
"""
dax -- The pegasus dax object from workflow
......@@ -155,8 +162,8 @@ class WorkflowModule(object):
valid = True
msg = ""
valid, msg, module_files = self._check_params(master_files, prefix, markers, inputs, additional_files)
added_jobs = self._traverse_jobs(dax, master_jobs, master_files, prefix, markers, inputs, additional_files)
return added_jobs
self._traverse_jobs(dax, master_jobs, master_files, prefix, markers, inputs, additional_files)
return
def _setup_param_list(self, master_files, job_info, param_type, param_dict, prefix, markers):
"""
......@@ -195,7 +202,6 @@ class WorkflowModule(object):
def _traverse_jobs(self, dax, master_jobs, master_files, prefix, markers, inputs, additional_files):
added_jobs = []
job_list = self._get_data_list(self.workflow, markers)
for job_dict in job_list:
job_name = job_dict.keys()[0]
......@@ -210,7 +216,6 @@ class WorkflowModule(object):
job = self.workflow_jobs[job_name].create_job(job_inputs, job_additional, job_outputs)
if job is not None:
dax.addJob(job)
added_jobs.append(job_name)
else:
print "JOB ERROR for '%s'.\n" % (job_dict,)
# NOW, WE add our job to the master_list, and check dependencies
......@@ -222,7 +227,7 @@ class WorkflowModule(object):
for file_dict in job_inputs:
if file_dict["name"] in master_jobs:
dax.depends(child = job, parent = master_jobs[file_dict["name"]])
return added_jobs
return
def _check_params(self, master_files, prefix, markers, inputs, additional_files):
valid_params = True
......