diff --git a/chipathlon/workflow.py b/chipathlon/workflow.py
index 635f0899c4af60d704bee496d349c6b8dc9c373f..a2b150212535dc70a7585b7f4534e91e73c890c8 100644
--- a/chipathlon/workflow.py
+++ b/chipathlon/workflow.py
@@ -328,8 +328,8 @@ class Workflow(object):
 
     def _create_setup(self):
         """
-        Creates the base structure for job submission. Everything is contained
-        within a folder based on the current timestamp.
+        Creates the base structure for job submission. Everything is contained
+        within a folder based on the current timestamp.
         """
         if not os.path.exists(self.base_path):
             os.makedirs(self.base_path)
@@ -339,6 +339,9 @@ class Workflow(object):
         return
 
     def _add_notify(self):
+        """
+        Creates the notify script used to email the user when the workflow finishes.
+        """
         notify_path = os.path.join(self.base_path, "input/notify.sh")
         with open(notify_path, "w") as wh:
             notify = textwrap.dedent("""\
@@ -357,7 +360,9 @@ class Workflow(object):
 
     def _create_submit(self):
         """
-        Creates the pegasus submit script. submit.sh
+        Creates the pegasus submit script, submit.sh: a thin wrapper around
+        pegasus-plan that passes through the correct arguments. Also creates
+        convenience status and removal scripts for the submitted workflow.
         """
         with open(os.path.join(self.base_path, "input/submit.sh"), "w") as wh:
             submit = textwrap.dedent("""\
diff --git a/chipathlon/workflow_module.py b/chipathlon/workflow_module.py
index eb0c7f6b4c01bf3a489e5697b88a4f442c7ec859..0b953558a8ba54ca3c447638d64823875ade6d70 100644
--- a/chipathlon/workflow_module.py
+++ b/chipathlon/workflow_module.py
@@ -29,7 +29,7 @@ class WorkflowModule(object):
         the target module.
         """
 
-    def __init__(self, module_yaml, workflow_jobs, debug = False):
+    def __init__(self, module_yaml, workflow_jobs, debug=False):
         self.err = ""
         self.debug = debug
         try:
@@ -77,7 +77,7 @@ class WorkflowModule(object):
             return self.workflow
         return
 
-    def _add_workflow_keys(self, workflow = None, index = 0):
+    def _add_workflow_keys(self, workflow=None, index=0):
         """
         :param workflow: The workflow to construct.
         :type workflow: dict
@@ -112,7 +112,7 @@ class WorkflowModule(object):
                 workflow[marker] = []
         return
 
-    def _load_jobs(self, yaml_data = None, add_markers = {}):
+    def _load_jobs(self, yaml_data=None, add_markers={}):
         """
         :param yaml_data: The workflow to load jobs into.
         :type yaml_data: list
@@ -147,7 +147,7 @@ class WorkflowModule(object):
 
         return
 
-    def _load_markers(self, yaml_data = None):
+    def _load_markers(self, yaml_data=None):
         """
         :param yaml_data: The yaml_data to load markers from.
         :type yaml_data: list
@@ -231,7 +231,31 @@ class WorkflowModule(object):
         """
         valid, msg = self._check_params(master_files, markers, inputs, outputs)
         if valid:
-            self._traverse_jobs(dax, master_jobs, master_files, markers, inputs, outputs)
+            job_list = self.get_job_list(markers)
+            for job_dict in job_list:
+                job_name = job_dict.keys()[0]
+                job_info = job_dict[job_name]
+                # outputs is a dict! Check the values not the keys.
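+                # (the values are full file names, which are the keys used in master_files and master_jobs)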
+                # output files get created before a job is added, check against master_jobs instead
+                if any([output_file not in master_jobs for output_file in outputs.values()]):
+                    job_inputs = self._setup_job_params(master_files, job_name, job_info, markers, "inputs", inputs, outputs)
+                    job_outputs = self._setup_job_params(master_files, job_name, job_info, markers, "outputs", outputs, outputs)
+                    job = self.workflow_jobs[job_name].create_job(job_inputs, job_outputs)
+                    if job is not None:
+                        dax.addJob(job)
+                    else:
+                        print "JOB ERROR for '%s'.\n" % (job_dict,)
+                        raise SystemExit(1)
+                    # Now add the job to master_jobs and check dependencies.
+                    # master_jobs is mutated in place, so the caller sees these
+                    # updates; the slight redundancy here makes later dependency
+                    # checking easier at very little cost.
+                    for logical_name, param_dict in job_outputs.iteritems():
+                        master_jobs[param_dict["name"]] = job
+                    for logical_name, param_dict in job_inputs.iteritems():
+                        if param_dict["name"] in master_jobs:
+                            dax.depends(child=job, parent=master_jobs[param_dict["name"]])
         else:
             print msg
         return
@@ -277,7 +300,9 @@ class WorkflowModule(object):
         for input_marker in markers:
             if input_marker in self.markers:
                 if markers[input_marker] not in self.markers[input_marker]:
-                    msg += "Marker '%s' does not have value '%s', should be one of %s.\n" % (input_marker, markers[input_marker], self.markers[marker])
+                    msg += "Marker '%s' does not have value '%s', should be one of %s.\n" % (
+                        input_marker, markers[input_marker], self.markers[input_marker]
+                    )
                     valid = False
             else:
                 msg += "Marker '%s' does not exist.\n" % (input_marker,)
@@ -306,7 +331,9 @@ class WorkflowModule(object):
                 if param_dict["type"] in chipathlon.conf.argument_types["file"]:
                     if arg_params[param_name] not in master_files:
                         valid = False
-                        msg += "Error loading '%s' jobs. File with name: '%s', value: '%s' does not exist in master_files.\n" % (self.name, param_name, arg_params[param_name])
+                        msg += "Error loading '%s' jobs. File with name: '%s', value: '%s' does not exist in master_files.\n" % (
+                            self.name, param_name, arg_params[param_name]
+                        )
                 else:
                     valid = False
                     msg += "Error loading '%s' jobs. Required param '%s' for argument type '%s' not defined.\n" % (self.name, param_name, arg_type)
@@ -342,49 +369,6 @@ class WorkflowModule(object):
             })
         return (inputs, outputs)
 
-    def _traverse_jobs(self, dax, master_jobs, master_files, markers, inputs, outputs):
-        """
-        :param dax: The Pegasus dax object to add jobs to.
-        :type dax: Pegasus.DAX3.ADAG
-        :param master_jobs: Master job dictionary mapping job_names -> job objects.
-        :type master_jobs: dict
-        :param master_files: Master file dictionary mapping file_names -> file objects.
-        :type master_files: dict
-        :param markers: Input markers
-        :type markers: dict.
-        :param inputs: All required inputs for the given markers.
-        :type inputs: dict
-        :param outputs: All required outputs for the given markers.
-        :type outputs: dict
-
-        Walk through the jobs in the modules and add the jobs to the workflow.
-        """
-        job_list = self.get_job_list(markers)
-        for job_dict in job_list:
-            job_name = job_dict.keys()[0]
-            job_info = job_dict[job_name]
-            # outputs is a dict! Check the values not the keys.
-            # output files get created before a job is added, check against master_jobs instead
-            if any([output_file not in master_jobs for output_file in outputs.values()]):
-                job_inputs = self._setup_job_params(master_files, job_name, job_info, markers, "inputs", inputs, outputs)
-                job_outputs = self._setup_job_params(master_files, job_name, job_info, markers, "outputs", outputs, outputs)
-                job = self.workflow_jobs[job_name].create_job(job_inputs, job_outputs)
-                if job is not None:
-                    dax.addJob(job)
-                else:
-                    print "JOB ERROR for '%s'.\n" % (job_dict,)
-                    raise SystemExit(1)
-                # NOW, WE add our job to the master_list, and check dependencies
-                # Python ALWAYS passes by reference, so even though
-                # we are a little redundant here, dependency checking
-                # will be easier later and not very costly
-                for logical_name, param_dict in job_outputs.iteritems():
-                    master_jobs[param_dict["name"]] = job
-                for logical_name, param_dict in job_inputs.iteritems():
-                    if param_dict["name"] in master_jobs:
-                        dax.depends(child=job, parent=master_jobs[param_dict["name"]])
-        return
-
     def get_all_final_results(self, markers):
         """
         :param markers: The splits to take for the current workflow.
@@ -426,9 +410,33 @@ class WorkflowModule(object):
         :type outputs: list
 
         This function sets up the required params for a job to run
-        successfully. Primarily, it converts the format from the dictionary
-        style passed into the module, to the list style expected for job
-        creation.
+        successfully. It converts all inputs from the format expected by the
+        module into the format expected by the individual jobs, which is a
+        dictionary keyed by parameter name:
+
+        .. code-block:: python
+
+            {
+                "file_param_name": {
+                    "name": "FULL-FILE-NAME",
+                    "file": PegasusFileObject,
+                    "transfer": True/False
+                },
+                "arg_param_name": {
+                    "name": "arg_param_name",
+                    "value": ArgValue
+                },
+                "list_param_name": {
+                    "name": "list_param_name",
+                    "value": [
+                        {
+                            "name": "FULL-FILE-NAME",
+                            "file": PegasusFileObject
+                        },
+                        ...
+                    ]
+                }
+            }
         """
         param_dict = {}
         if job_info[param_type] is not None:
@@ -439,6 +447,8 @@ class WorkflowModule(object):
             # wj_param is the info contained in the workflow job yaml.
             if wj_param["type"] in chipathlon.conf.argument_types["file"]:
                 if logical_name in outputs:
+                    # This particular file is an output from a previous step!
+                    # Load its information accordingly.
                     param_dict[param_name] = {
                         "name": outputs[logical_name],
                        "file": master_files[outputs[logical_name]],
@@ -452,7 +462,6 @@ class WorkflowModule(object):
                     }
             elif wj_param["type"] in chipathlon.conf.argument_types["list"]:
                 if wj_param["type"] == "file_list":
-                    sub_list = []
                     param_dict[param_name] = {
                         "name": param_name,
                         "value": [{
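
For reference, the bookkeeping that run() now performs inline reduces to the
pattern sketched below. This is a minimal, self-contained sketch, not
chipathlon's actual API: StubDax and StubJob are hypothetical stand-ins for the
Pegasus.DAX3 ADAG and Job objects, and bare file names stand in for the full
param_dict["name"] values.

    class StubJob(object):
        def __init__(self, name):
            self.name = name

    class StubDax(object):
        def __init__(self):
            self.jobs = []
            self.edges = []  # (parent_name, child_name) pairs

        def addJob(self, job):
            self.jobs.append(job)

        def depends(self, child, parent):
            self.edges.append((parent.name, child.name))

    def add_job(dax, master_jobs, job_name, input_files, output_files):
        # Skip the job entirely if every output already has a producer,
        # mirroring the any() check in run().
        if all(f in master_jobs for f in output_files):
            return
        job = StubJob(job_name)
        dax.addJob(job)
        # Register this job as the producer of each of its output files,
        # then depend on whichever jobs produced our input files.
        for f in output_files:
            master_jobs[f] = job
        for f in input_files:
            if f in master_jobs:
                dax.depends(child=job, parent=master_jobs[f])

    dax = StubDax()
    master_jobs = {}
    add_job(dax, master_jobs, "align", ["sample.fastq"], ["sample.bam"])
    add_job(dax, master_jobs, "peak_call", ["sample.bam"], ["sample.peaks"])
    print(dax.edges)  # [('align', 'peak_call')]

Keying master_jobs by output file name is what lets a later module depend on an
earlier one without either module holding a reference to the other's job objects.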