Commit 01ba72d3 authored by aknecht2's avatar aknecht2
Browse files

Added debug variable to workflow_module. Fixed intra module file dependency...

Added debug variable to workflow_module.  Fixed intra module file dependency issues, by passing output files to setup_job_params.
parent 6ec977c1
......@@ -16,10 +16,11 @@ from Pegasus.DAX3 import *
class WorkflowModule(object):
def __init__(self, module_yaml, workflow_jobs):
def __init__(self, module_yaml, workflow_jobs, debug = False):
# module_yaml is the module yaml file
# workflow_jobs is a dictionary that points from name -> actual job class
self.err = ""
self.debug = debug
try:
with open(module_yaml, "r") as rh:
self.data = yaml.load(rh)
......@@ -320,9 +321,9 @@ class WorkflowModule(object):
job_name = job_dict.keys()[0]
job_info = job_dict[job_name]
if any([output_file not in master_files for output_file in outputs]):
job_inputs = self._setup_job_params(master_files, job_info, markers, "inputs", inputs)
job_additional = self._setup_job_params(master_files, job_info, markers, "additional_inputs", additional_inputs)
job_outputs = self._setup_job_params(master_files, job_info, markers, "outputs", outputs)
job_inputs = self._setup_job_params(master_files, job_info, markers, "inputs", inputs, outputs)
job_additional = self._setup_job_params(master_files, job_info, markers, "additional_inputs", additional_inputs, outputs)
job_outputs = self._setup_job_params(master_files, job_info, markers, "outputs", outputs, outputs)
job = self.workflow_jobs[job_name].create_job(job_inputs, job_additional, job_outputs)
if job is not None:
dax.addJob(job)
......@@ -339,7 +340,7 @@ class WorkflowModule(object):
dax.depends(child = job, parent = master_jobs[file_dict["name"]])
return
def _setup_job_params(self, master_files, job_info, markers, param_type, arg_params):
def _setup_job_params(self, master_files, job_info, markers, param_type, arg_params, outputs):
"""
:param master_files: Master file dictionary mapping file_names -> file objects.
:type master_files: dict
......@@ -363,11 +364,18 @@ class WorkflowModule(object):
param_name = job_dict.keys()[0]
param_info = job_dict[param_name]
if param_info["type"] in chipathlon.conf.argument_types["file"]:
param_list.append({
"name": arg_params[param_name],
"file": master_files[arg_params[param_name]],
"type": param_info["type"]
})
if param_name in outputs:
param_list.append({
"name": outputs[param_name],
"file": master_files[outputs[param_name]],
"type": param_info["type"]
})
else:
param_list.append({
"name": arg_params[param_name],
"file": master_files[arg_params[param_name]],
"type": param_info["type"]
})
else:
param_list.append({
"name": param_name,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment