Commit a5da5bef authored by aknecht2's avatar aknecht2
Browse files

Adjust the input format of the align.yaml module. Fixed up minor issues...

Adjust the input format of the align.yaml module. Fixed up minor issues across the workflow job, module, and generator to get jobs generating correctly.
parent 5637ed06
......@@ -73,13 +73,11 @@ class ModuleGenerator(object):
all_markers = final_result.all_markers
results = []
for job_dict in self.module.get_job_list(all_markers[self.module.name]):
print job_dict
job_name = job_dict.keys()[0]
job_info = job_dict[job_name]
all_jobs.append(self.workflow_jobs[job_name])
if job_info.get("outputs"):
for wj_name, output_info in job_info["outputs"].iteritems():
logical_name = output_info["name"]
for logical_name, output_info in job_info["outputs"].iteritems():
# We have to explicitly compare since None evaluates to False.
if not output_info.get("final_result") == True:
result = Result(
......@@ -99,7 +97,6 @@ class ModuleGenerator(object):
else:
result = run.find_result(self.module.name, logical_name, final_result)
results.append(result)
print result
if result.should_save:
self.save_results.append(result)
......
......@@ -3,123 +3,124 @@ align:
- single[read_end]:
- bwa_align_single:
inputs:
base_genome_file:
name: ref_genome
fastq1:
name: download_1.fastq
genome.fna.amb:
name: ref_genome.amb
genome.fna.ann:
name: ref_genome.ann
genome.fna.bwt:
name: ref_genome.bwt
genome.fna.pac:
name: ref_genome.pac
genome.fna.sa:
name: ref_genome.sa
ref_genome:
param_name: base_genome_file
download_1.fastq:
param_name: fastq1
ref_genome.amb:
param_name: genome.fna.amb
ref_genome.ann:
param_name: genome.fna.ann
ref_genome.bwt:
param_name: genome.fna.bwt
ref_genome.pac:
param_name: genome.fna.pac
ref_genome.sa:
param_name: genome.fna.sa
outputs:
output_sai:
name: align.sai
align.sai:
param_name: output_sai
- bwa_sai_to_sam:
inputs:
base_genome_file:
name: ref_genome
input_sai:
name: align.sai
genome.fna.amb:
name: ref_genome.amb
genome.fna.ann:
name: ref_genome.ann
genome.fna.bwt:
name: ref_genome.bwt
genome.fna.pac:
name: ref_genome.pac
genome.fna.sa:
name: ref_genome.sa
ref_genome:
param_name: base_genome_file
align.sai:
param_name: input_sai
ref_genome.amb:
param_name: genome.fna.amb
ref_genome.ann:
param_name: genome.fna.ann
ref_genome.bwt:
param_name: genome.fna.bwt
ref_genome.pac:
param_name: genome.fna.pac
ref_genome.sa:
param_name: genome.fna.sa
outputs:
output_sam:
name: align.sam
align.sam:
param_name: output_sam
- paired[read_end]:
- bwa_align_paired:
inputs:
base_genome_file:
name: ref_genome
fastq1:
name: download_1.fastq
fastq2:
name: download_2.fastq
genome.fna.amb:
name: ref_genome.amb
genome.fna.ann:
name: ref_genome.ann
genome.fna.bwt:
name: ref_genome.bwt
genome.fna.pac:
name: ref_genome.pac
genome.fna.sa:
name: ref_genome.sa
ref_genome:
param_name: base_genome_file
download_1.fastq:
param_name: fastq1
download_2.fastq:
param_name: fastq2
ref_genome.amb:
param_name: genome.fna.amb
ref_genome.ann:
param_name: genome.fna.ann
ref_genome.bwt:
param_name: genome.fna.bwt
ref_genome.pac:
param_name: genome.fna.pac
ref_genome.sa:
param_name: genome.fna.sa
outputs:
output_sai: align.sai
align.sai:
param_name: output_sai
- bowtie2[tool]:
- single[read_end]:
- bowtie2_align_single:
inputs:
genome_prefix:
name: ref_genome_prefix
fastq1:
name: download_1.fastq
base_genome_file:
name: ref_genome
genome.1.bt2:
name: ref_genome.1.bt2
genome.2.bt2:
name: ref_genome.2.bt2
genome.3.bt2:
name: ref_genome.3.bt2
genome.4.bt2:
name: ref_genome.4.bt2
genome.rev.1.bt2:
name: ref_genome.rev.1.bt2
genome.rev.2.bt2:
name: ref_genome.rev.2.bt2
ref_genome_prefix:
param_name: genome_prefix
download_1.fastq:
param_name: fastq1
ref_genome:
param_name: base_genome_file
ref_genome.1.bt2:
param_name: genome.1.bt2
ref_genome.2.bt2:
param_name: genome.2.bt2
ref_genome.3.bt2:
param_name: genome.3.bt2
ref_genome.4.bt2:
param_name: genome.4.bt2
ref_genome.rev.1.bt2:
param_name: genome.rev.1.bt2
ref_genome.rev.2.bt2:
param_name: genome.rev.2.bt2
outputs:
output_sam:
name: align.sam
fastq_quality:
name: align.quality
align.sam:
param_name: output_sam
align.quality:
param_name: fastq_quality
- paired[read_end]:
- bowtie2_align_paired:
inputs:
genome_prefix:
name: ref_genome_prefix
fastq1:
name: download_1.fastq
fastq2:
name: download_2.fastq
base_genome_file:
name: ref_genome
genome.1.bt2:
name: ref_genome.1.bt2
genome.2.bt2:
name: ref_genome.2.bt2
genome.3.bt2:
name: ref_genome.3.bt2
genome.4.bt2:
name: ref_genome.4.bt2
genome.rev.1.bt2:
name: ref_genome.rev.1.bt2
genome.rev.2.bt2:
name: ref_genome.rev.2.bt2
ref_genome_prefix:
param_name: genome_prefix
download_1.fastq:
param_name: fastq1
download_2.fastq:
param_name: fastq2
ref_genome:
param_name: base_genome_file
ref_genome.1.bt2:
param_name: genome.1.bt2
ref_genome.2.bt2:
param_name: genome.2.bt2
ref_genome.3.bt2:
param_name: genome.3.bt2
ref_genome.4.bt2:
param_name: genome.4.bt2
ref_genome.rev.1.bt2:
param_name: genome.rev.1.bt2
ref_genome.rev.2.bt2:
param_name: genome.rev.2.bt2
outputs:
output_sam:
name: align.sam
fastq_quality:
name: align.quality
align.sam:
param_name: output_sam
align.quality:
param_name: fastq_quality
- samtools_sam_to_bam:
inputs:
align_sam:
name: align.sam
align.sam:
param_name: align_sam
outputs:
align_bam:
name: align.bam
align.bam:
param_name: align_bam
final_result: true
......@@ -108,8 +108,8 @@ class WorkflowJob(object):
"""
self._check_keys(self.job_data, chipathlon.conf.job_keys["required"], chipathlon.conf.job_keys["optional"], "Base Yaml Error")
if self.is_valid():
self.valid_inputs = self.job_data["inputs"].keys()
self.valid_outputs = self.job_data["outputs"].keys()
self.valid_inputs = self.job_data["inputs"].keys() if self.job_data["inputs"] is not None else []
self.valid_outputs = self.job_data["outputs"].keys() if self.job_data["outputs"] is not None else []
return
def _validate_files(self):
......@@ -340,7 +340,8 @@ class WorkflowJob(object):
# root key is actual argument name
arg_name = arg_dict.keys()[0]
arg_info = arg_dict[arg_name]
if arg_name == "$%s" % (param_name,) or "$%s" % (param_name,) in arg_info["default"]:
check_val = "$%s" % (param_name,)
if arg_info["type"] in chipathlon.conf.argument_types["file"] and (arg_name == check_val or ("default" in arg_info and check_val in arg_info["default"])):
return arg_info["required"] and not arg_info["changeable"]
return False
......@@ -389,10 +390,10 @@ class WorkflowJob(object):
self.errors.append("Job creation error %s: File %s specified for param %s is not of \
type %s. Should match one of these extensions: %s." % (
self,
param,
passed_param_info["name"],
param_name,
base_param_info["file_type"],
chipathlon.conf.file_extensions[param_info["file_type"]]
chipathlon.conf.file_extensions[base_param_info["file_type"]]
)
)
else:
......@@ -461,9 +462,9 @@ class WorkflowJob(object):
job.uses(output_info["file"], link=Link.OUTPUT, transfer=output_info["transfer"])
# Redirect stdout / stderr
if wj_param["type"] == "stdout":
job.setStdout(output_file["name"])
job.setStdout(output_info["name"])
elif wj_param["type"] == "stderr":
job.setStderr(output_file["name"])
job.setStderr(output_info["name"])
arg_list = self._create_arg_list(inputs, outputs)
job.addArguments(*self._create_arg_list(inputs, outputs))
self._add_job_resources(job)
......@@ -509,7 +510,7 @@ class WorkflowJob(object):
"""
arg_value = self._get_arg_value(arg_name, arg_info)
add_value = arg_value
if arg_value.startswith("$"):
if arg_info["type"] != "numeric" and arg_value.startswith("$"):
arg_value = arg_value[1:]
if arg_info["type"] == "rawfile":
add_value = self.raw_files[os.path.basename(arg_value)]["file"]
......@@ -587,4 +588,8 @@ class WorkflowJob(object):
Returns the information associated with a particular input/output.
"""
return self.job_data["inputs"].get(param_name, self.job_data["outputs"].get(param_name))
for param_type in ["inputs", "outputs"]:
if self.job_data[param_type] is not None:
if param_name in self.job_data[param_type]:
return self.job_data[param_type][param_name]
return None
......@@ -330,19 +330,17 @@ class WorkflowModule(object):
job_info = job_dict[job_name]
for param_type, param_list in zip(["inputs", "outputs"], [inputs, outputs]):
if job_info[param_type] is not None:
for param_dict in job_info[param_type]:
param_name = param_dict.keys()[0]
param_values = param_dict[param_name]
for logical_name, param_values in job_info[param_type].iteritems():
param_name = param_values["param_name"]
# Make sure param is not a duplicate of a previous value
# and that param has not already been added as an output.
if param_name not in [param["name"] for param in param_list] and param_name not in [param["name"] for param in outputs]:
if logical_name not in [param["name"] for param in param_list] and logical_name not in [param["name"] for param in outputs]:
param_list.append({
"name": param_name,
"name": logical_name,
"type": self.workflow_jobs[job_name].get_param_info(param_name)["type"]
})
return (inputs, outputs)
def _traverse_jobs(self, dax, master_jobs, master_files, markers, inputs, outputs):
"""
:param dax: The Pegasus dax object to add jobs to.
......@@ -379,11 +377,11 @@ class WorkflowModule(object):
# Python ALWAYS passes by reference, so even though
# we are a little redundant here, dependency checking
# will be easier later and not very costly
for file_dict in job_outputs:
master_jobs[file_dict["name"]] = job
for file_dict in job_inputs:
if file_dict["name"] in master_jobs:
dax.depends(child = job, parent = master_jobs[file_dict["name"]])
for logical_name, param_dict in job_outputs.iteritems():
master_jobs[param_dict["name"]] = job
for logical_name, param_dict in job_inputs.iteritems():
if param_dict["name"] in master_jobs:
dax.depends(child=job, parent=master_jobs[param_dict["name"]])
return
def get_all_final_results(self, markers):
......@@ -435,38 +433,37 @@ class WorkflowModule(object):
"""
param_dict = {}
if job_info[param_type] is not None:
for job_dict in job_info[param_type]:
param_name = job_dict.keys()[0]
param_info = job_dict[param_name]
for logical_name, param_info in job_info[param_type].iteritems():
param_name = param_info["param_name"]
wj_param = self.workflow_jobs[job_name].get_param_info(param_name)
# param_info is the info contained in the module whereas
# wj_param is the info contained in the workflow job yaml.
if wj_param["type"] in chipathlon.conf.argument_types["file"]:
if param_name in outputs:
if logical_name in outputs:
param_dict[param_name] = {
"name": outputs[param_name],
"file": master_files[outputs[param_name]],
"name": outputs[logical_name],
"file": master_files[outputs[logical_name]],
"transfer": param_info.get("transfer", False) or param_info.get("final_result", False)
}
else:
param_dict[param_name] = {
"name": arg_params[param_name],
"file": master_files[arg_params[param_name]],
"name": arg_params[logical_name],
"file": master_files[arg_params[logical_name]],
"transfer": param_info.get("transfer", False) or param_info.get("final_result", False)
}
elif wj_param["type"] in chipathlon.conf.argument_types["list"]:
if wj_param["type"] == "file_list":
sub_list = []
param_dict[param_name] = {
"name": arg_params[param_name],
"name": arg_params[logical_name],
"value": [{
"name": val,
"file": master_files[val]
} for val in arg_params[param_name]]
} for val in arg_params[logical_name]]
}
else:
param_dict[param_name] = {
"name": param_name,
"value": arg_params[param_name]
"value": arg_params[logical_name]
}
return param_dict
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment