Commit 7c07157c authored by aknecht2

Small adjustments to workflow & workflow_module for compatibility.

parent 7ad5cb80
@@ -267,15 +267,12 @@ class Workflow(object):
                 control_data = control_base
             random.shuffle(control_data)
             for pair in zip(experiment_data, control_data):
-                append_pair = False
                 for treatment in pair:
                     for f in treatment:
                         name = "%s_%s.fastq.gz" % (run["experiment"], f["accession"])
                         if name not in self.files:
-                            append_pair = True
                             self._add_file(name, "http://" + f["url"], "dummylocal")
-                if append_pair:
-                    run["input_sets"].append(pair)
+                run["input_sets"].append(pair)
         return
 
     def _add_align(self):
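
The first hunk changes which experiment/control pairs end up in run["input_sets"]: previously a pair was only kept when it introduced at least one unseen file, now every pair is kept and the "name not in self.files" check only prevents duplicate file registration. A minimal sketch of that behaviour, assuming simplified stand-ins for the run dict and the workflow's file registry (collect_input_sets and the flat files dict below are illustrative placeholders, not the project's actual API):

def collect_input_sets(run, experiment_data, control_data, files):
    # Pair experiment and control file tuples; `files` maps file names to
    # URLs and stands in for the workflow's file registry.
    for pair in zip(experiment_data, control_data):
        for treatment in pair:
            for f in treatment:
                name = "%s_%s.fastq.gz" % (run["experiment"], f["accession"])
                if name not in files:
                    # Register each file only once...
                    files[name] = "http://" + f["url"]
        # ...but always keep the pair itself, even when all of its files
        # were already registered by an earlier pair (the removed
        # `append_pair` guard used to drop such pairs).
        run["input_sets"].append(pair)

# The second pair reuses an already-registered control file but is still kept.
run = {"experiment": "EXP001", "input_sets": []}
exp = [({"accession": "A1", "url": "host/a1"},), ({"accession": "A2", "url": "host/a2"},)]
ctl = [({"accession": "C1", "url": "host/c1"},), ({"accession": "C1", "url": "host/c1"},)]
files = {}
collect_input_sets(run, exp, ctl, files)
assert len(run["input_sets"]) == 2 and len(files) == 3
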
@@ -295,12 +292,11 @@ class Workflow(object):
                 prefix = "%s_%s_%s" % (run["experiment"], file_tuple[0]["accession"], file_tuple[1]["accession"])
                 input_files["download_1.fastq"] = "%s_%s.fastq.gz" % (run["experiment"], file_tuple[0]["accession"])
                 input_files["download_2.fastq"] = "%s_%s.fastq.gz" % (run["experiment"], file_tuple[1]["accession"])
-                self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
             else:
                 markers["read_end"] = "single"
                 prefix = "%s_%s" % (run["experiment"], file_tuple[0]["accession"])
                 input_files["download_1.fastq"] = "%s.fastq.gz" % (prefix,)
-                self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
+            self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
         return
 
     def _create_setup(self):
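
The second hunk hoists the duplicated add_jobs call out of the paired/single if/else, so both branches only prepare prefix, markers, and input_files and the align module is invoked once afterwards. A sketch of that dispatch with placeholder names (queue_alignment and submit_align are hypothetical; the real call is self.modules["align"].add_jobs with the full argument list shown above):

def queue_alignment(run, file_tuple, submit_align):
    # Build the run-specific prefix/markers/input_files, then hand
    # everything to a single submit call after the branch.
    markers = {}
    input_files = {}
    if len(file_tuple) == 2:
        markers["read_end"] = "paired"  # assumed; the real assignment sits above the hunk
        prefix = "%s_%s_%s" % (run["experiment"],
                               file_tuple[0]["accession"],
                               file_tuple[1]["accession"])
        input_files["download_1.fastq"] = "%s_%s.fastq.gz" % (run["experiment"], file_tuple[0]["accession"])
        input_files["download_2.fastq"] = "%s_%s.fastq.gz" % (run["experiment"], file_tuple[1]["accession"])
    else:
        markers["read_end"] = "single"
        prefix = "%s_%s" % (run["experiment"], file_tuple[0]["accession"])
        input_files["download_1.fastq"] = "%s.fastq.gz" % (prefix,)
    # One call site instead of one per branch: any later change to the
    # argument list only has to be made in one place.
    submit_align(prefix, markers, input_files)

Collapsing the two calls into one is behaviour-preserving here because both branches fall through to the same statement with the same arguments.
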
@@ -188,24 +188,26 @@ class WorkflowModule(object):
             job_info = job_dict[job_name]
             job_inputs = []
             job_additional = []
-            job_inputs = self._setup_param_list(master_files, job_info, "inputs", inputs, prefix, markers)
-            job_additional = self._setup_param_list(master_files, job_info, "additional_inputs", additional_files, prefix, markers)
-            job_outputs = self._setup_param_list(master_files, job_info, "outputs", {}, prefix, markers)
-            job = self.workflow_jobs[job_name].create_job(job_inputs, job_additional, job_outputs)
-            if job is not None:
-                dax.addJob(job)
-            else:
-                print "JOB ERROR for '%s'.\n" % (key,)
-            # NOW, WE add our job to the master_list, and check dependencies
-            # Python ALWAYS passes by reference, so even though
-            # we are a little redundant here, dependency checking
-            # will be easier later and not very costly
-            for file_dict in job_outputs:
-                master_jobs[file_dict["name"]] = job
-                master_jobs[prefix + "_" + self.name]
-            for file_dict in job_inputs:
-                if file_dict["name"] in master_jobs:
-                    dax.depends(child = job, parent = master_jobs[file_dict["name"]])
+            # If all outputs have already been added, we don't add the job
+            if not any([self._get_full_name(prefix, markers, file_name) in master_files for file_name in job_info["outputs"]]):
+                job_inputs = self._setup_param_list(master_files, job_info, "inputs", inputs, prefix, markers)
+                job_additional = self._setup_param_list(master_files, job_info, "additional_inputs", additional_files, prefix, markers)
+                job_outputs = self._setup_param_list(master_files, job_info, "outputs", {}, prefix, markers)
+                job = self.workflow_jobs[job_name].create_job(job_inputs, job_additional, job_outputs)
+                if job is not None:
+                    dax.addJob(job)
+                else:
+                    print "JOB ERROR for '%s'.\n" % (key,)
+                # NOW, WE add our job to the master_list, and check dependencies
+                # Python ALWAYS passes by reference, so even though
+                # we are a little redundant here, dependency checking
+                # will be easier later and not very costly
+                for file_dict in job_outputs:
+                    master_jobs[file_dict["name"]] = job
+                    master_jobs[prefix + "_" + self.name]
+                for file_dict in job_inputs:
+                    if file_dict["name"] in master_jobs:
+                        dax.depends(child = job, parent = master_jobs[file_dict["name"]])
         return
 
     def _check_files(self, master_files, prefix, markers, inputs, additional_files):
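
The WorkflowModule change wraps job creation in a guard: _setup_param_list, create_job, and the dependency wiring now only run when none of the job's outputs are already present in master_files, so calling add_jobs again for an overlapping prefix does not insert duplicate jobs into the DAX. A dictionary-only sketch of the guard plus the master_jobs/depends bookkeeping (the Dag class and add_module_job below are illustrative placeholders, not the Pegasus DAX API or the module's real helpers):

class Dag(object):
    # Tiny stand-in for the DAX object: records jobs and edges.
    def __init__(self):
        self.jobs = []
        self.edges = []  # (parent_job, child_job) pairs

    def add_job(self, job):
        self.jobs.append(job)

    def depends(self, child, parent):
        self.edges.append((parent, child))


def add_module_job(dag, job_name, inputs, outputs, master_files, master_jobs):
    # Create a job unless one of its outputs was already produced.
    # `master_files` maps file names to their producer's name, and
    # `master_jobs` maps produced file names to the job objects themselves.
    if any(name in master_files for name in outputs):
        # An earlier call already produced (part of) this output set;
        # creating the job again would duplicate work in the DAG.
        return None
    job = {"name": job_name, "inputs": inputs, "outputs": outputs}
    dag.add_job(job)
    for name in outputs:
        master_files[name] = job_name   # register the produced file
        master_jobs[name] = job         # remember which job produces it
    for name in inputs:
        if name in master_jobs:         # input produced by an earlier job
            dag.depends(child=job, parent=master_jobs[name])
    return job


# Usage: the second, overlapping "align" call is skipped.
dag, mf, mj = Dag(), {}, {}
add_module_job(dag, "align", ["a.fastq"], ["a.bam"], mf, mj)
add_module_job(dag, "sort", ["a.bam"], ["a.sorted.bam"], mf, mj)
assert add_module_job(dag, "align", ["a.fastq"], ["a.bam"], mf, mj) is None
assert len(dag.jobs) == 2 and len(dag.edges) == 1
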