workflow_module.py
import os
import argparse
import sys
import json
import datetime
import textwrap
import xml.dom.minidom
import yaml
import traceback
import re
import itertools
import chipathlon.conf
from pprint import pprint
from Pegasus.DAX3 import *
class WorkflowModule(object):
def __init__(self, module_yaml, workflow_jobs, debug = False):
# module_yaml is the module yaml file
# workflow_jobs is a dictionary that points from name -> actual job class
self.err = ""
self.debug = debug
try:
with open(module_yaml, "r") as rh:
self.data = yaml.load(rh)
except:
self.err += "Error parsing module template yaml file %s.\n" % (module_yaml, )
self.err += traceback.format_exc()
try:
self.name = self.data.keys()[0]
self.markers = {}
self.order = []
self.workflow = {}
self.workflow_jobs = workflow_jobs
self._load_markers()
if self.markers:
self._add_workflow_keys()
else:
self.workflow = []
            # _load_jobs copies self.markers internally, so the originals are not modified.
self._load_jobs()
except SystemExit as se:
pass
return
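    # Illustrative usage (a sketch, not part of the module): a WorkflowModule
    # is built from a module template yaml and a dictionary of already-created
    # job classes.  The file path and job names below are hypothetical
    # placeholders, not guaranteed to match real templates.
    #
    #   workflow_jobs = {"bwa_align_single": bwa_job, "bwa_align_paired": bwa_paired_job}
    #   module = WorkflowModule("chipathlon/jobs/modules/align.yaml", workflow_jobs, debug=True)
    #   if module.err:
    #       print module.err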
def _get_job_list(self, markers):
"""
:param markers: A dictionary containing marker values for the module.
:type markers: dict
        Returns the list of jobs that will be run for the provided markers.
        If markers is not specified, the entire workflow is returned;
        otherwise, only the jobs under the matching marker branch are returned.
"""
if markers:
job_list = self.workflow
for marker in self.order:
if marker in markers:
job_list = job_list[markers[marker]]
return job_list
else:
return self.workflow
return
def _add_workflow_keys(self, workflow = None, index = 0):
"""
:param workflow: The workflow to construct.
:type workflow: dict
:param index: The index of the marker in the order variable.
:type index: int
        This function constructs the workflow dictionary with the
        correct markers as keys in a nested structure.  The workflow
        parameter must be passed so the function can recurse into the
        nested dictionaries it creates.  Using align.yaml as an example,
        the following structure would be created:
{
"bwa": {
"single": [],
"paired": []
},
"bowtie2": {
"single": [],
"paired": []
}
}
Jobs will then be added to this dictionary later for easy access when
traversing.
"""
workflow = self.workflow if workflow is None else workflow
marker_key = self.order[index]
for marker in self.markers[marker_key]:
if index < len(self.order) - 1:
workflow[marker] = {}
self._add_workflow_keys(workflow[marker], int(index + 1))
else:
workflow[marker] = []
return
    def _load_jobs(self, yaml_data = None, add_markers = None):
"""
:param yaml_data: The workflow to load jobs into.
:type yaml_data: list
:param add_markers: The markers left to load.
:type add_markers: dict
After creating the structure of our workflow list, this function
loads the actual jobs into their respective places.
"""
yaml_data = self.data[self.name] if yaml_data is None else yaml_data
add_markers = dict(self.markers) if not add_markers else add_markers
# yaml_data will always be a list of dictionaries
for item in yaml_data:
for key in item.keys():
# The individual key values will either be markers or jobs.
if key in self.workflow_jobs:
# If the key is a job, we need to add the job for every
# possible marker depending on recursion depth.
for marker_values in itertools.product(*[marker[1] for marker in add_markers.iteritems()]):
wlist = self._get_job_list(dict(zip(self.order, marker_values)))
wlist.append(item)
elif "[" in key and "]" in key:
# Split by either [ or ]
                    val, marker, dummy = re.split(r"[\[\]]", key)
                    new_add = dict(add_markers)
                    # Restrict this marker to the single value named by the key
                    # so nested jobs are only added under the matching branch.
                    new_add[marker] = [val]
                    # Recurse with updated add_markers
                    self._load_jobs(item[key], new_add)
return
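    # A minimal sketch of the yaml layout _load_jobs expects, inferred from the
    # "val[marker]" key handling above (hypothetical -- real module templates
    # may differ):
    #
    #   align:
    #     - bwa[tool]:
    #         - single[read_end]:
    #             - bwa_align_single:
    #                 inputs: ...
    #         - paired[read_end]:
    #             - bwa_align_paired:
    #                 inputs: ...
    #     - bowtie2[tool]:
    #         - single[read_end]:
    #             - bowtie2_align_single:
    #                 inputs: ...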
def _load_markers(self, yaml_data = None):
"""
:param yaml_data: The yaml_data to load markers from.
:type yaml_data: list
        Finds all possible workflow splitting points (i.e. markers),
        recording each marker's possible values and the order in which
        the markers were found.
"""
yaml_data = self.data[self.name] if yaml_data is None else yaml_data
for item in yaml_data:
for key in item.keys():
if key in self.workflow_jobs:
return
elif "[" in key and "]" in key:
                    val, marker, dummy = re.split(r"[\[\]]", key)
if marker not in self.markers:
self.markers[marker] = []
self.order.append(marker)
if val not in self.markers[marker]:
self.markers[marker].append(val)
self._load_markers(item[key])
return
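    # Given the sketched align yaml above, _load_markers would end up with
    # something like (names come from the hypothetical example, not a real template):
    #   self.markers = {"tool": ["bwa", "bowtie2"], "read_end": ["single", "paired"]}
    #   self.order = ["tool", "read_end"]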
def get_job_names(self, markers):
"""
:param markers: The input markers.
:type markers: dict
Returns a list of job names that match the provided markers.
"""
job_list = self._get_job_list(markers)
job_names = []
for job_dict in job_list:
job_names.append(job_dict.keys()[0])
return job_names
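    # Example call (marker values match the hypothetical sketch above):
    #   module.get_job_names({"tool": "bwa", "read_end": "single"})
    #   # -> ["bwa_align_single"]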
def add_jobs(self, dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs):
"""
dax -- The pegasus dax object from workflow
master_jobs -- A dictionary that maps jobname -> Job object
master_files -- A dicitonary that maps filename -> File object
markers -- Dictionary for splitting within module
inputs -- Dictionary of input_file name -> master_files index OR input argument name -> value
additional_files -- dictionary of additional_file name -> master_files index
"""
valid, msg = self._check_params(master_files, markers, inputs, additional_inputs, outputs)
if valid:
self._traverse_jobs(dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs)
else:
print msg
return
def _check_params(self, master_files, markers, inputs, additional_inputs, outputs):
"""
:param master_files: Master file dictionary mapping file_name -> file_object.
:type master_files: dict
:param markers: Input markers.
:type markers: dict
:param inputs: All required inputs for the given markers.
:type inputs: dict
:param additional_inputs: All required additional_inputs for the given markers.
:type additional_inputs: dict
:param outputs: All required outputs for the given markers.
:type outputs: dict
Checks all input params to make sure they are specified, and all files
exist in the master file dictionary.
"""
valid_params = True
valid_markers, msg = self._check_input_markers(markers)
if valid_markers:
required_inputs, required_additional_inputs, required_outputs = self._load_required_params(master_files, markers)
valid_inputs, input_msg = self._check_required_params(master_files, required_inputs, inputs, "inputs")
valid_additional, additional_msg = self._check_required_params(master_files, required_additional_inputs, additional_inputs, "additional_inputs")
valid_outputs, outputs_msg = self._check_required_params(master_files, required_outputs, outputs, "outputs")
if not valid_inputs or not valid_additional or not valid_outputs:
valid_params = False
msg = input_msg + additional_msg + outputs_msg
else:
msg = "Params for module '%s' with markers: %s validated successfully.\n" % (self.name, markers)
else:
valid_params = False
return (valid_params, msg)
def _check_input_markers(self, markers):
""""
:param markers: The input markers to validate.
:type markers: dict
Validates input markers.
"""
valid = True
msg = ""
for input_marker in markers:
if input_marker in self.markers:
if markers[input_marker] not in self.markers[input_marker]:
msg += "Marker '%s' does not have value '%s', should be one of %s.\n" % (input_marker, markers[input_marker], self.markers[marker])
valid = False
else:
msg += "Marker '%s' does not exist.\n" % (input_marker,)
valid = False
return (valid, msg)
def _check_required_params(self, master_files, required_param_list, arg_params, arg_type):
"""
:param master_files: Master file dictionary mapping file_name -> file_object.
:type master_files: dict
:param required_param_list: A list of dictionaries containing required arguments.
:type required_param_list: list
:param arg_params: A dictionary mapping param_name -> value.
:type arg_params: dict
:param arg_type: The type of argument being parsed, only used for error messages.
:type arg_type: str.
This function checks the required params against the passed ones. This
function should be called for each required param type: inputs,
additional_inputs, and outputs.
"""
valid = True
msg = ""
for param_dict in required_param_list:
param_name = param_dict["name"]
if param_name in arg_params:
if param_dict["type"] in chipathlon.conf.argument_types["file"]:
if arg_params[param_name] not in master_files:
valid = False
msg += "Error loading '%s' jobs. File with name: '%s', value: '%s' does not exist in master_files.\n" % (self.name, param_name, arg_params[param_name])
else:
valid = False
msg += "Error loading '%s' jobs. Supplied param '%s' for argument type '%s' not defined.\n" % (self.name, param_name, arg_type)
return (valid, msg)
def _load_required_params(self, master_files, markers):
"""
:param master_files: Master file dictionary mapping file_name -> file_object.
:type master_files: dict
:param markers: Input markers.
:type markers: dict
This function loads all required parameters (inputs, additional_inputs,
and outputs) for the specified markers. It returns a tuple of three
elements corresponding to these required params.
"""
inputs = []
additional_inputs = []
outputs = []
job_list = self._get_job_list(markers)
for job_dict in job_list:
job_name = job_dict.keys()[0]
job_info = job_dict[job_name]
for param_type, param_list in zip(["inputs", "additional_inputs", "outputs"], [inputs, additional_inputs, outputs]):
if job_info[param_type] is not None:
for param_dict in job_info[param_type]:
param_name = param_dict.keys()[0]
param_values = param_dict[param_name]
# Make sure param is not a duplicate of a previous value
# and that param has not already been added as an output.
if param_name not in [param["name"] for param in param_list] and param_name not in [param["name"] for param in outputs]:
param_list.append({"name": param_name, "type": param_values["type"]})
return (inputs, additional_inputs, outputs)
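    # Shape of the returned tuple: three lists of {"name", "type"} dicts, e.g.
    # (values are illustrative only):
    #   inputs            -> [{"name": "ref_genome", "type": "file"}, ...]
    #   additional_inputs -> [{"name": "ref_genome.amb", "type": "file"}, ...]
    #   outputs           -> [{"name": "align.sam", "type": "file"}, ...]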
def _traverse_jobs(self, dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs):
"""
:param dax: The Pegasus dax object to add jobs to.
:type dax: Pegasus.DAX3.ADAG
        :param master_jobs: Master job dictionary mapping output file names -> the jobs that create them.
:type master_jobs: dict
:param master_files: Master file dictionary mapping file_names -> file objects.
:type master_files: dict
:param markers: Input markers
:type markers: dict.
:param inputs: All required inputs for the given markers.
:type inputs: dict
:param additional_inputs: All required additional_inputs for the given markers.
:type additional_inputs: dict
:param outputs: All required outputs for the given markers.
:type outputs: dict
        Walks through the jobs in the module and adds them to the workflow dax.
"""
job_list = self._get_job_list(markers)
for job_dict in job_list:
job_name = job_dict.keys()[0]
job_info = job_dict[job_name]
if any([output_file not in master_files for output_file in outputs]):
job_inputs = self._setup_job_params(master_files, job_info, markers, "inputs", inputs, outputs)
job_additional = self._setup_job_params(master_files, job_info, markers, "additional_inputs", additional_inputs, outputs)
job_outputs = self._setup_job_params(master_files, job_info, markers, "outputs", outputs, outputs)
job = self.workflow_jobs[job_name].create_job(job_inputs, job_additional, job_outputs)
if job is not None:
dax.addJob(job)
else:
print "JOB ERROR for '%s'.\n" % (job_dict,)
                # Add the job's output files to master_jobs and wire up dependencies.
                # master_jobs is a shared dict (passed by reference), so this small
                # redundancy makes dependency checking easy later and is not costly.
for file_dict in job_outputs:
master_jobs[file_dict["name"]] = job
for file_dict in job_inputs:
if file_dict["name"] in master_jobs:
dax.depends(child = job, parent = master_jobs[file_dict["name"]])
return
def _setup_job_params(self, master_files, job_info, markers, param_type, arg_params, outputs):
"""
:param master_files: Master file dictionary mapping file_names -> file objects.
:type master_files: dict
:param job_info: The job information (inputs, additional_inputs, and outputs)
:type job_info: dict
:param markers: Input markers.
:type markers: dict
:param param_type: The param type to setup should be inputs, additional_inputs, or outputs.
:type param_type: str
        :param arg_params: A dictionary mapping param_name -> value.
        :type arg_params: dict
        :param outputs: Dictionary mapping output file names -> master_files keys, used to resolve files produced earlier in the module.
        :type outputs: dict
This function sets up the required params for a job to run
successfully. Primarily, it converts the format from the dictionary
style passed into the module, to the list style expected for job
creation.
"""
param_list = []
if job_info[param_type] is not None:
for job_dict in job_info[param_type]:
param_name = job_dict.keys()[0]
param_info = job_dict[param_name]
if param_info["type"] in chipathlon.conf.argument_types["file"]:
if param_name in outputs:
param_list.append({
"name": outputs[param_name],
"file": master_files[outputs[param_name]],
"type": param_info["type"]
})
else:
param_list.append({
"name": arg_params[param_name],
"file": master_files[arg_params[param_name]],
"type": param_info["type"]
})
else:
param_list.append({
"name": param_name,
"value": arg_params[param_name],
"type": param_info["type"]
})
return param_list
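    # Shape of the param_list entries built above (names and values are
    # illustrative only):
    #   file params     -> {"name": "encode_1.sam", "file": <Pegasus File>, "type": "file"}
    #   non-file params -> {"name": "quality", "value": 30, "type": "numeric"}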