Commit b6279673 authored by aknecht2


Updated workflow module parsing to support new format. Argument types are now loaded from workflow_job data instead of being repeated within the module itself.
parent 30b1838a
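At the call-site level the change looks like this; a minimal sketch, where every concrete value is illustrative rather than taken from the repository:

```python
# Before this commit, callers passed a separate additional_inputs dict:
#   module.add_jobs(dax, master_jobs, master_files, markers,
#                   inputs, additional_inputs, outputs)
# After it, former additional inputs travel in the ordinary inputs mapping:
module.add_jobs(
    dax,           # Pegasus.DAX3.ADAG being built
    master_jobs,   # dict tracking jobs already added to the workflow
    master_files,  # dict: file name -> Pegasus File object
    markers,       # e.g. {"tool": "bwa"} (hypothetical marker)
    inputs,        # e.g. {"ref_genome": "grch38.fa"}, logical name -> file name
    outputs,       # e.g. {"aligned_reads": "run1.bam"}
)
```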
@@ -176,9 +176,9 @@ class WorkflowModule(object):
         :type markers: dict
         :param job_name: The name of the job to get params for.
         :type job_name: str
-        :returns: A dictionary containing inputs, additional_inputs, and outputs.
+        :returns: A dictionary containing inputs and outputs.
 
-        Returns the inputs, additional_inputs, and outputs defined for the
+        Returns the inputs and outputs defined for the
         specified job & marker combination from the module. The values returned
         will be based on the names defined in the module template file, they
         will not be real file names.
@@ -202,7 +202,7 @@ class WorkflowModule(object):
             job_names.append(job_dict.keys()[0])
         return job_names
 
-    def add_jobs(self, dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs):
+    def add_jobs(self, dax, master_jobs, master_files, markers, inputs, outputs):
         """
         :param dax: The dax to add jobs to.
         :type dax: Pegasus.DAX3.ADAG
@@ -215,15 +215,12 @@ class WorkflowModule(object):
         :param inputs: A dictionary mapping the logical file names defined in
             the workflow module -> the full file name.
         :type inputs: dict
-        :param additional_inputs: A dictionary mapping the logical file names
-            defined in the workflow module -> the full file name.
-        :type additional_inputs: dict
         :param outputs: A dictionary mapping the logical file names defined in
             the workflow module -> the full file name.
         :returns: None
 
         Adds all the jobs in the correct order for the current workflow.
-        In this case, the inputs, additional_inputs, and outputs need to be
+        In this case, the inputs and outputs need to be
         passed in for all jobs in the workflow. If you consider a small
         example of three jobs that get run back to back, with the output of
         each getting piped into the input of the next. In this case, we only
@@ -231,14 +228,14 @@ class WorkflowModule(object):
         use the outputs of the previous jobs. We need to pass in three outputs
         still for the outputs of each of the jobs.
         """
-        valid, msg = self._check_params(master_files, markers, inputs, additional_inputs, outputs)
+        valid, msg = self._check_params(master_files, markers, inputs, outputs)
         if valid:
-            self._traverse_jobs(dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs)
+            self._traverse_jobs(dax, master_jobs, master_files, markers, inputs, outputs)
         else:
             print msg
         return
 
-    def _check_params(self, master_files, markers, inputs, additional_inputs, outputs):
+    def _check_params(self, master_files, markers, inputs, outputs):
         """
         :param master_files: Master file dictionary mapping file_name -> file_object.
         :type master_files: dict
@@ -246,8 +243,6 @@ class WorkflowModule(object):
         :type markers: dict
         :param inputs: All required inputs for the given markers.
         :type inputs: dict
-        :param additional_inputs: All required additional_inputs for the given markers.
-        :type additional_inputs: dict
         :param outputs: All required outputs for the given markers.
         :type outputs: dict
@@ -257,13 +252,12 @@ class WorkflowModule(object):
         valid_params = True
         valid_markers, msg = self._check_input_markers(markers)
         if valid_markers:
-            required_inputs, required_additional_inputs, required_outputs = self._load_required_params(master_files, markers)
+            required_inputs, required_outputs = self._load_required_params(master_files, markers)
             valid_inputs, input_msg = self._check_required_params(master_files, required_inputs, inputs, "inputs")
-            valid_additional, additional_msg = self._check_required_params(master_files, required_additional_inputs, additional_inputs, "additional_inputs")
             valid_outputs, outputs_msg = self._check_required_params(master_files, required_outputs, outputs, "outputs")
-            if not valid_inputs or not valid_additional or not valid_outputs:
+            if not valid_inputs or not valid_outputs:
                 valid_params = False
-                msg = input_msg + additional_msg + outputs_msg
+                msg = input_msg + outputs_msg
             else:
                 msg = "Params for module '%s' with markers: %s validated successfully.\n" % (self.name, markers)
         else:
@@ -289,7 +283,7 @@ class WorkflowModule(object):
             valid = False
         return (valid, msg)
 
-    def _check_required_params(self, master_files, required_param_list, arg_params, arg_type):
+    def _check_required_params(self, master_files, required_param_list, arg_params, arg_type = None):
         """
         :param master_files: Master file dictionary mapping file_name -> file_object.
         :type master_files: dict
@@ -301,8 +295,7 @@ class WorkflowModule(object):
         :type arg_type: str.
 
         This function checks the required params against the passed ones. This
-        function should be called for each required param type: inputs,
-        additional_inputs, and outputs.
+        function should be called for both inputs and outputs.
         """
         valid = True
         msg = ""
@@ -325,18 +318,17 @@ class WorkflowModule(object):
         :param markers: Input markers.
         :type markers: dict
 
-        This function loads all required parameters (inputs, additional_inputs,
-        and outputs) for the specified markers. It returns a tuple of three
-        elements corresponding to these required params.
+        This function loads all required parameters (inputs and outputs) for
+        the specified markers. It returns a tuple corresponding to these
+        required params.
         """
         inputs = []
-        additional_inputs = []
         outputs = []
         job_list = self.get_job_list(markers)
         for job_dict in job_list:
             job_name = job_dict.keys()[0]
             job_info = job_dict[job_name]
-            for param_type, param_list in zip(["inputs", "additional_inputs", "outputs"], [inputs, additional_inputs, outputs]):
+            for param_type, param_list in zip(["inputs", "outputs"], [inputs, outputs]):
                 if job_info[param_type] is not None:
                     for param_dict in job_info[param_type]:
                         param_name = param_dict.keys()[0]
@@ -344,11 +336,14 @@ class WorkflowModule(object):
                         # Make sure param is not a duplicate of a previous value
                        # and that param has not already been added as an output.
                        if param_name not in [param["name"] for param in param_list] and param_name not in [param["name"] for param in outputs]:
-                            param_list.append({"name": param_name, "type": param_values["type"]})
-        return (inputs, additional_inputs, outputs)
+                            param_list.append({
+                                "name": param_name,
+                                "type": self.workflow_jobs[job_name].get_param_info(param_name)["type"]
+                            })
+        return (inputs, outputs)
 
-    def _traverse_jobs(self, dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs):
+    def _traverse_jobs(self, dax, master_jobs, master_files, markers, inputs, outputs):
         """
         :param dax: The Pegasus dax object to add jobs to.
         :type dax: Pegasus.DAX3.ADAG
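The hunk above is the heart of the new format: a param's type is no longer copied into the module yaml (the old `param_values["type"]`) but resolved from the owning workflow job via `get_param_info`. A minimal self-contained sketch of that lookup, in the codebase's Python 2 style; `FakeWorkflowJob` and all param names below are invented for illustration:

```python
class FakeWorkflowJob(object):
    # Stands in for the real workflow job class, which now owns the
    # argument type information loaded from its own yaml definition.
    def __init__(self, params):
        self.params = params  # param_name -> {"type": ...}

    def get_param_info(self, param_name):
        return self.params[param_name]

workflow_jobs = {
    "bwa_align": FakeWorkflowJob({
        "ref_genome": {"type": "file"},
        "fastq": {"type": "file_list"},
    })
}

# Before: the module yaml repeated the type -> param_values["type"]
# After: only the name lives in the module; the job supplies the type.
print workflow_jobs["bwa_align"].get_param_info("fastq")["type"]  # file_list
```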
@@ -360,8 +355,6 @@ class WorkflowModule(object):
         :type markers: dict.
         :param inputs: All required inputs for the given markers.
         :type inputs: dict
-        :param additional_inputs: All required additional_inputs for the given markers.
-        :type additional_inputs: dict
         :param outputs: All required outputs for the given markers.
         :type outputs: dict
@@ -374,10 +367,9 @@ class WorkflowModule(object):
             # outputs is a dict! Check the values not the keys.
             # output files get created before a job is added, check against master_jobs instead
             if any([output_file not in master_jobs for output_file in outputs.values()]):
-                job_inputs = self._setup_job_params(master_files, job_info, markers, "inputs", inputs, outputs)
-                job_additional = self._setup_job_params(master_files, job_info, markers, "additional_inputs", additional_inputs, outputs)
-                job_outputs = self._setup_job_params(master_files, job_info, markers, "outputs", outputs, outputs)
-                job = self.workflow_jobs[job_name].create_job(job_inputs, job_additional, job_outputs)
+                job_inputs = self._setup_job_params(master_files, job_name, job_info, markers, "inputs", inputs, outputs)
+                job_outputs = self._setup_job_params(master_files, job_name, job_info, markers, "outputs", outputs, outputs)
+                job = self.workflow_jobs[job_name].create_job(job_inputs, job_outputs)
                 if job is not None:
                     dax.addJob(job)
                 else:
@@ -419,56 +411,62 @@ class WorkflowModule(object):
                 })
         return final_results
 
-    def _setup_job_params(self, master_files, job_info, markers, param_type, arg_params, outputs):
+    def _setup_job_params(self, master_files, job_name, job_info, markers, param_type, arg_params, outputs):
         """
         :param master_files: Master file dictionary mapping file_names -> file objects.
         :type master_files: dict
-        :param job_info: The job information (inputs, additional_inputs, and outputs)
+        :param job_name: The name of the job for accessing from self.workflow_jobs
+        :type job_name: str
+        :param job_info: The job information (inputs and outputs)
         :type job_info: dict
         :param markers: Input markers.
         :type markers: dict
-        :param param_type: The param type to setup should be inputs, additional_inputs, or outputs.
+        :param param_type: The param type to setup, should be inputs or outputs.
         :type param_type: str
         :param arg_params: A dictionary mapping param_name -> value.
         :type arg_params: dict
         :param outputs: All the outputs for the entire workflow module.
         :type outputs: list
 
         This function sets up the required params for a job to run
         successfully. Primarily, it converts the format from the dictionary
         style passed into the module, to the list style expected for job
         creation.
         """
-        param_list = []
+        param_dict = {}
         if job_info[param_type] is not None:
             for job_dict in job_info[param_type]:
                 param_name = job_dict.keys()[0]
                 param_info = job_dict[param_name]
-                if param_info["type"] in chipathlon.conf.argument_types["file"]:
+                wj_param = self.workflow_jobs[job_name].get_param_info(param_name)
+                # param_info is the info contained in the module whereas
+                # wj_param is the info contained in the workflow job yaml.
+                if wj_param["type"] in chipathlon.conf.argument_types["file"]:
                     if param_name in outputs:
-                        param_list.append({
+                        param_dict[param_name] = {
                             "name": outputs[param_name],
                             "file": master_files[outputs[param_name]],
-                            "type": param_info["type"],
                             "transfer": param_info.get("transfer", False) or param_info.get("final_result", False)
-                        })
+                        }
                     else:
-                        param_list.append({
+                        param_dict[param_name] = {
                             "name": arg_params[param_name],
                             "file": master_files[arg_params[param_name]],
-                            "type": param_info["type"]
-                        })
-                elif param_info["type"] in chipathlon.conf.argument_types["list"]:
-                    sub_list = []
-                    for val in arg_params[param_name]:
-                        sub_list.append({
-                            "name": val,
-                            "file": master_files[val],
-                            "type": "file"
-                        })
-                    param_list.append({"name": arg_params[param_name], "type": "list", "value": sub_list})
+                            "transfer": param_info.get("transfer", False) or param_info.get("final_result", False)
+                        }
+                elif wj_param["type"] in chipathlon.conf.argument_types["list"]:
+                    if wj_param["type"] == "file_list":
+                        sub_list = []
+                        param_dict[param_name] = {
+                            "name": arg_params[param_name],
+                            "value": [{
+                                "name": val,
+                                "file": master_files[val]
+                            } for val in arg_params[param_name]]
+                        }
                 else:
-                    param_list.append({
+                    param_dict[param_name] = {
                         "name": param_name,
-                        "value": arg_params[param_name],
-                        "type": param_info["type"]
-                    })
-        return param_list
+                        "value": arg_params[param_name]
+                    }
+        return param_dict
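With this last hunk, `_setup_job_params` returns a dict keyed by param name rather than an ordered list, and `create_job` accordingly receives two dicts instead of three lists. A sketch of the shapes the new branches build, using stand-in data; every concrete file name and value here is hypothetical:

```python
# Stand-in for the real master_files mapping (file name -> Pegasus File):
master_files = dict((n, "File(%s)" % n)
                    for n in ["grch38.fa", "r1.fastq", "r2.fastq"])

job_inputs = {
    # "file" type: name + Pegasus File object + transfer flag.
    "ref_genome": {
        "name": "grch38.fa",
        "file": master_files["grch38.fa"],
        "transfer": False,
    },
    # "file_list" type: the name is the list itself; value pairs each
    # entry with its File object.
    "fastq": {
        "name": ["r1.fastq", "r2.fastq"],
        "value": [{"name": n, "file": master_files[n]}
                  for n in ["r1.fastq", "r2.fastq"]],
    },
    # Any non-file argument: plain name/value pair.
    "quality": {"name": "quality", "value": 30},
}
```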