Commit 6cc26ddc authored by aknecht2's avatar aknecht2
Browse files

Moved the constructor auto-doc to the correct place. Added auto-doc for...

Moved the constructor auto-doc to the correct place.  Added auto-doc for missing functions, and removed unused methods.
parent f5740173
......@@ -15,10 +15,16 @@ from Pegasus.DAX3 import *
class WorkflowModule(object):
"""
:param module_yaml: The yaml module definition file.
:type module_yaml: str
:param workflow_jobs: A dictionary mapping job name -> workflow job class.
:type workflow_jobs: dict
:param debug: A flag to print additional info.
:type debug: bool
"""
def __init__(self, module_yaml, workflow_jobs, debug = False):
# module_yaml is the module yaml file
# workflow_jobs is a dictionary that points from name -> actual job class
self.err = ""
self.debug = debug
try:
......@@ -52,7 +58,6 @@ class WorkflowModule(object):
This function returns the list of jobs that will be run provided
the markers. If markers is not specified, it simply returns the
entire workflow, otherwise, it returns the appropriate jobs.
"""
if markers:
job_list = self.workflow
......@@ -159,6 +164,17 @@ class WorkflowModule(object):
return
def get_job_params(self, markers, job_name):
"""
:param markers: Input markers.
:type markers: dict
:param job_name: The name of the job to get params for.
:type job_name: str
Returns the inputs, additional_inputs, and outputs defined for the
specified job & marker combination from the module. The values returned
will be based on the names defined in the module template file, they
will not be real file names.
"""
job_list = self.get_job_list(markers)
for job_dict in job_list:
if job_name == job_dict.keys()[0]:
......@@ -180,12 +196,22 @@ class WorkflowModule(object):
def add_jobs(self, dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs):
"""
dax -- The pegasus dax object from workflow
master_jobs -- A dictionary that maps jobname -> Job object
master_files -- A dicitonary that maps filename -> File object
markers -- Dictionary for splitting within module
inputs -- Dictionary of input_file name -> master_files index OR input argument name -> value
additional_files -- dictionary of additional_file name -> master_files index
:param dax: The dax to add jobs to.
:type dax: Pegasus.DAX3.ADAG
:param master_jobs: A dictionary mapping job names -> Pegasus job objects.
:type master_jobs: dict
:param master_files: A dictionary mapping file names -> File objects.
:type master_files: dict
:param markers: The splits to take for the current module.
:type markers: dict
:param inputs: A dictionary mapping the logical file names defined in
the the workflow module -> the full file name.
:type inputs: dict
:param additional_inputs: A dictionary mapping the logical file names
defined in the workflow module -> the full file name.
:type additional_inputs: dict
:param outputs: A dictionary mapping the logical file names defined in
the workflow module -> the full file name.
"""
valid, msg = self._check_params(master_files, markers, inputs, additional_inputs, outputs)
if valid:
......@@ -350,26 +376,14 @@ class WorkflowModule(object):
dax.depends(child = job, parent = master_jobs[file_dict["name"]])
return
def get_dependent_jobs(self, dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs):
dep_list = []
job_list = self.get_job_list(markers)
for job_dict in job_list:
job_name = job_dict.keys()[0]
job_info = job_dict[job_name]
job_inputs = self._setup_job_params(master_files, job_info, markers, "inputs", inputs, outputs)
for file_dict in job_inputs:
if file_dict["name"] in master_jobs and file_dict["name"] not in dep_list:
dep_list.append(file_dict["name"])
return dep_list
def get_final_outputs(self, markers):
job_list = self.get_job_list(markers)
job_dict = job_list[-1]
job_name = job_dict.keys()[0]
outputs = job_dict[job_name]["outputs"]
return [output.keys()[0] for output in outputs]
def get_all_final_results(self, markers):
"""
:param markers: The splits to take for the current workflow.
:type markers: dict
Gets all the final_results for the workflow with the target markers.
That is, all the outputs that are marked with final_result: True.
"""
job_list = self.get_job_list(markers)
final_results = []
for i, job_dict in enumerate(job_list):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment