Commit a49f8b8f authored by aknecht2
More style updates and comment additions.

parent ea63700f
@@ -29,7 +29,7 @@ class WorkflowModule(object):
     the target module.
     """
 
-    def __init__(self, module_yaml, workflow_jobs, debug = False):
+    def __init__(self, module_yaml, workflow_jobs, debug=False):
         self.err = ""
         self.debug = debug
         try:
@@ -77,7 +77,7 @@ class WorkflowModule(object):
             return self.workflow
         return
 
-    def _add_workflow_keys(self, workflow = None, index = 0):
+    def _add_workflow_keys(self, workflow=None, index=0):
         """
         :param workflow: The workflow to construct.
         :type workflow: dict
@@ -112,7 +112,7 @@ class WorkflowModule(object):
             workflow[marker] = []
         return
 
-    def _load_jobs(self, yaml_data = None, add_markers = {}):
+    def _load_jobs(self, yaml_data=None, add_markers={}):
         """
         :param yaml_data: The workflow to load jobs into.
         :type yaml_data: list
@@ -147,7 +147,7 @@ class WorkflowModule(object):
 
         return
 
-    def _load_markers(self, yaml_data = None):
+    def _load_markers(self, yaml_data=None):
         """
         :param yaml_data: The yaml_data to load markers from.
         :type yaml_data: list
@@ -231,7 +231,30 @@ class WorkflowModule(object):
         """
         valid, msg = self._check_params(master_files, markers, inputs, outputs)
         if valid:
-            self._traverse_jobs(dax, master_jobs, master_files, markers, inputs, outputs)
+            job_list = self.get_job_list(markers)
+            for job_dict in job_list:
+                job_name = job_dict.keys()[0]
+                job_info = job_dict[job_name]
+                # outputs is a dict! Check the values, not the keys.
+                # Output files get created before a job is added, so check against master_jobs instead.
+                if any([output_file not in master_jobs for output_file in outputs.values()]):
+                    job_inputs = self._setup_job_params(master_files, job_name, job_info, markers, "inputs", inputs, outputs)
+                    job_outputs = self._setup_job_params(master_files, job_name, job_info, markers, "outputs", outputs, outputs)
+                    job = self.workflow_jobs[job_name].create_job(job_inputs, job_outputs)
+                    if job is not None:
+                        dax.addJob(job)
+                    else:
+                        print "JOB ERROR for '%s'.\n" % (job_dict,)
+                        raise SystemExit(1)
+                    # Now add our job to the master list and check dependencies.
+                    # master_jobs is a shared, mutable dict, so even though
+                    # this is a little redundant, it makes dependency checking
+                    # easier later and is not very costly.
+                    for logical_name, param_dict in job_outputs.iteritems():
+                        master_jobs[param_dict["name"]] = job
+                    for logical_name, param_dict in job_inputs.iteritems():
+                        if param_dict["name"] in master_jobs:
+                            dax.depends(child=job, parent=master_jobs[param_dict["name"]])
         else:
             print msg
         return
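The inlined loop above leans on master_jobs doing double duty as a "which job produced this file" index: every job registers each of its output file names, and any input whose name is already registered gains a dependency edge back to its producing job. A minimal, self-contained sketch of that pattern follows; ToyDag and all job and file names are hypothetical stand-ins, not the chipathlon or Pegasus API.

.. code-block:: python

    class ToyDag(object):
        """Hypothetical stand-in for Pegasus.DAX3.ADAG; records jobs and edges."""
        def __init__(self):
            self.jobs = []
            self.edges = []

        def addJob(self, job):
            self.jobs.append(job)

        def depends(self, child, parent):
            self.edges.append((parent, child))

    def add_job(dax, master_jobs, job, job_inputs, job_outputs):
        dax.addJob(job)
        # Register every output under its full file name so later jobs
        # can look up which job produced it.
        for param_dict in job_outputs.values():
            master_jobs[param_dict["name"]] = job
        # Any input already registered was produced upstream: add an edge.
        for param_dict in job_inputs.values():
            if param_dict["name"] in master_jobs:
                dax.depends(child=job, parent=master_jobs[param_dict["name"]])

    dax = ToyDag()
    master_jobs = {}
    add_job(dax, master_jobs, "align", {}, {"bam": {"name": "sample.bam"}})
    add_job(dax, master_jobs, "peak_call",
            {"bam": {"name": "sample.bam"}},
            {"peaks": {"name": "sample.narrowPeak"}})
    assert dax.edges == [("align", "peak_call")]

This also shows why the any() guard works: once a module's jobs have run for a given set of markers, all of their output names are present in master_jobs, so re-adding the same jobs is skipped.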
@@ -277,7 +300,9 @@ class WorkflowModule(object):
         for input_marker in markers:
             if input_marker in self.markers:
                 if markers[input_marker] not in self.markers[input_marker]:
-                    msg += "Marker '%s' does not have value '%s', should be one of %s.\n" % (input_marker, markers[input_marker], self.markers[marker])
+                    msg += "Marker '%s' does not have value '%s', should be one of %s.\n" % (
+                        input_marker, markers[input_marker], self.markers[input_marker]
+                    )
                     valid = False
             else:
                 msg += "Marker '%s' does not exist.\n" % (input_marker,)
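For context, this validation assumes self.markers maps each marker name to the list of values it accepts, so the check is a pair of membership tests. A small runnable sketch with hypothetical marker names (the real ones come from the module yaml):

.. code-block:: python

    module_markers = {"tool": ["bwa", "bowtie2"], "read_end": ["single", "paired"]}

    def check_markers(module_markers, markers):
        valid, msg = True, ""
        for input_marker in markers:
            if input_marker in module_markers:
                if markers[input_marker] not in module_markers[input_marker]:
                    msg += "Marker '%s' does not have value '%s', should be one of %s.\n" % (
                        input_marker, markers[input_marker], module_markers[input_marker]
                    )
                    valid = False
            else:
                msg += "Marker '%s' does not exist.\n" % (input_marker,)
                valid = False
        return (valid, msg)

    print(check_markers(module_markers, {"tool": "star"})[1])
    # Marker 'tool' does not have value 'star', should be one of ['bwa', 'bowtie2'].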
@@ -306,7 +331,9 @@ class WorkflowModule(object):
                 if param_dict["type"] in chipathlon.conf.argument_types["file"]:
                     if arg_params[param_name] not in master_files:
                         valid = False
-                        msg += "Error loading '%s' jobs. File with name: '%s', value: '%s' does not exist in master_files.\n" % (self.name, param_name, arg_params[param_name])
+                        msg += "Error loading '%s' jobs. File with name: '%s', value: '%s' does not exist in master_files.\n" % (
+                            self.name, param_name, arg_params[param_name]
+                        )
             else:
                 valid = False
                 msg += "Error loading '%s' jobs. Required param '%s' for argument type '%s' not defined.\n" % (self.name, param_name, arg_type)
@@ -342,49 +369,6 @@ class WorkflowModule(object):
             })
         return (inputs, outputs)
 
-    def _traverse_jobs(self, dax, master_jobs, master_files, markers, inputs, outputs):
-        """
-        :param dax: The Pegasus dax object to add jobs to.
-        :type dax: Pegasus.DAX3.ADAG
-        :param master_jobs: Master job dictionary mapping job_names -> job objects.
-        :type master_jobs: dict
-        :param master_files: Master file dictionary mapping file_names -> file objects.
-        :type master_files: dict
-        :param markers: Input markers
-        :type markers: dict
-        :param inputs: All required inputs for the given markers.
-        :type inputs: dict
-        :param outputs: All required outputs for the given markers.
-        :type outputs: dict
-
-        Walk through the jobs in the module and add them to the workflow.
-        """
-        job_list = self.get_job_list(markers)
-        for job_dict in job_list:
-            job_name = job_dict.keys()[0]
-            job_info = job_dict[job_name]
-            # outputs is a dict! Check the values not the keys.
-            # output files get created before a job is added, check against master_jobs instead
-            if any([output_file not in master_jobs for output_file in outputs.values()]):
-                job_inputs = self._setup_job_params(master_files, job_name, job_info, markers, "inputs", inputs, outputs)
-                job_outputs = self._setup_job_params(master_files, job_name, job_info, markers, "outputs", outputs, outputs)
-                job = self.workflow_jobs[job_name].create_job(job_inputs, job_outputs)
-                if job is not None:
-                    dax.addJob(job)
-                else:
-                    print "JOB ERROR for '%s'.\n" % (job_dict,)
-                    raise SystemExit(1)
-                # NOW, WE add our job to the master_list, and check dependencies
-                # Python ALWAYS passes by reference, so even though
-                # we are a little redundant here, dependency checking
-                # will be easier later and not very costly
-                for logical_name, param_dict in job_outputs.iteritems():
-                    master_jobs[param_dict["name"]] = job
-                for logical_name, param_dict in job_inputs.iteritems():
-                    if param_dict["name"] in master_jobs:
-                        dax.depends(child=job, parent=master_jobs[param_dict["name"]])
-        return
 
     def get_all_final_results(self, markers):
         """
         :param markers: The splits to take for the current workflow.
@@ -426,9 +410,33 @@ class WorkflowModule(object):
         :type outputs: list
 
         This function sets up the required params for a job to run
-        successfully. Primarily, it converts the format from the dictionary
-        style passed into the module, to the list style expected for job
-        creation.
+        successfully. It converts all inputs from the format expected by the
+        module into the format expected by the individual jobs. The format
+        expected by jobs is a dictionary keyed by parameter name:
+
+        .. code-block:: python
+
+            {
+                "file_param_name": {
+                    "name": "FULL-FILE-NAME",
+                    "file": PegasusFileObject,
+                    "transfer": True/False
+                },
+                "arg_param_name": {
+                    "name": "arg_param_name",
+                    "value": ArgValue
+                },
+                "list_param_name": {
+                    "name": "list_param_name",
+                    "value": [
+                        {
+                            "name": "FULL-FILE-NAME",
+                            "file": PegasusFileObject
+                        },
+                        ...
+                    ]
+                }
+            }
         """
         param_dict = {}
         if job_info[param_type] is not None:
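As a concrete illustration of the structure documented above, here is a hypothetical params dictionary for an alignment-style job. Every file and argument name is invented, and plain strings stand in for the PegasusFileObject instances the real code stores:

.. code-block:: python

    job_params = {
        "align_input": {
            "name": "sample_1.fastq.gz",
            "file": "<PegasusFileObject for sample_1.fastq.gz>",
            "transfer": False
        },
        "quality_threshold": {
            "name": "quality_threshold",
            "value": 30
        },
        "reference": {
            "name": "reference",
            "value": [
                {"name": "hg38.fa", "file": "<PegasusFileObject for hg38.fa>"},
                {"name": "hg38.fa.fai", "file": "<PegasusFileObject for hg38.fa.fai>"}
            ]
        }
    }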
@@ -439,6 +447,8 @@ class WorkflowModule(object):
                 # wj_param is the info contained in the workflow job yaml.
                 if wj_param["type"] in chipathlon.conf.argument_types["file"]:
                     if logical_name in outputs:
+                        # This particular file is an output from a previous step!
+                        # Load its information accordingly.
                         param_dict[param_name] = {
                             "name": outputs[logical_name],
                             "file": master_files[outputs[logical_name]],
@@ -452,7 +462,6 @@ class WorkflowModule(object):
                         }
                 elif wj_param["type"] in chipathlon.conf.argument_types["list"]:
                     if wj_param["type"] == "file_list":
-                        sub_list = []
                         param_dict[param_name] = {
                             "name": param_name,
                             "value": [{
......