    import os
    import argparse
    import sys
    import json
    import datetime
    import textwrap
    import xml.dom.minidom
    import yaml
    import traceback
    import re
    import itertools
    import chipathlon.conf
    from pprint import pprint
    from Pegasus.DAX3 import *
    
    
    class WorkflowModule(object):
    
        def __init__(self, module_yaml, workflow_jobs, debug = False):
            # module_yaml is the path to the module's yaml definition file.
            # workflow_jobs is a dictionary mapping name -> the actual job class.
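            # A hypothetical instantiation (file and job names are illustrative,
            # not taken from this repository):
            #   module = WorkflowModule("align.yaml", {"bwa_align_single": bwa_job}, debug = True)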
            self.err = ""
            self.debug = debug
            try:
                with open(module_yaml, "r") as rh:
                    self.data = yaml.safe_load(rh)
            except Exception:
                self.err += "Error parsing module template yaml file %s.\n" % (module_yaml, )
                self.err += traceback.format_exc()
            try:
                self.name = self.data.keys()[0]
                self.markers = {}
                self.order = []
                self.workflow = {}
                self.workflow_jobs = workflow_jobs
                self._load_markers()
                if self.markers:
                    self._add_workflow_keys()
                else:
                    self.workflow = []
            # _load_jobs defaults to dict(self.markers), a copy, so the
            # module's marker dict is never mutated.
                self._load_jobs()
            except SystemExit:
                pass
            return
    
        def _get_job_list(self, markers):
            """
            :param markers: A dictionary containing marker values for the module.
            :type markers: dict
    
            Returns the list of jobs to run for the given markers.  If markers
            is empty, the entire workflow is returned; otherwise, only the jobs
            nested under the matching marker values are returned.
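
            For example, with hypothetical markers for an alignment module:

                self._get_job_list({"tool": "bwa", "read_end": "single"})
                # returns the job list stored at self.workflow["bwa"]["single"]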
    
            """
            if markers:
                job_list = self.workflow
                for marker in self.order:
                    if marker in markers:
                        job_list = job_list[markers[marker]]
                return job_list
            else:
                return self.workflow
    
        def _add_workflow_keys(self, workflow = None, index = 0):
            """
            :param workflow: The workflow to construct.
            :type workflow: dict
            :param index: The index of the marker in the order variable.
            :type index: int
    
            This function constructs the workflow dictionary with the
            correct markers as keys in a nested structure.  The workflow
            parameter is passed explicitly so that the function can recurse
            into nested sub-dictionaries.  Using align.yaml as an example,
            the following structure would be created:
            {
                "bwa": {
                    "single": [],
                    "paired": []
                },
                "bowtie2": {
                    "single": [],
                    "paired": []
                }
            }
            Jobs will then be added to this dictionary later for easy access when
            traversing.
            """
            workflow = self.workflow if workflow is None else workflow
            marker_key = self.order[index]
            for marker in self.markers[marker_key]:
                if index < len(self.order) - 1:
                    workflow[marker] = {}
                    self._add_workflow_keys(workflow[marker], index + 1)
                else:
                    workflow[marker] = []
            return
    
        def _load_jobs(self, yaml_data = None, add_markers = None):
            """
            :param yaml_data: The yaml data to load jobs from.
            :type yaml_data: list
            :param add_markers: The marker values still applicable at this recursion depth.
            :type add_markers: dict
    
            After creating the structure of our workflow list, this function
            loads the actual jobs into their respective places.
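
            Keys in the yaml are either job names or "value[marker]" splits.
            A minimal sketch of the expected nesting (names illustrative,
            following the align.yaml example):

                - bwa[tool]:
                    - single[read_end]:
                        - bwa_align_single:
                            inputs: null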
            """
            yaml_data = self.data[self.name] if yaml_data is None else yaml_data
            add_markers = dict(self.markers) if not add_markers else add_markers
            # yaml_data will always be a list of dictionaries
            for item in yaml_data:
                for key in item.keys():
                    # The individual key values will either be markers or jobs.
                    if key in self.workflow_jobs:
                        # If the key is a job, we need to add the job for every
                        # remaining combination of marker values.  Iterate in
                        # self.order so each value lines up with its marker.
                        for marker_values in itertools.product(*[add_markers[marker] for marker in self.order]):
                            wlist = self._get_job_list(dict(zip(self.order, marker_values)))
                            wlist.append(item)
                    elif "[" in key and "]" in key:
                        # Split by either [ or ]
                        val, marker, dummy = re.split("[\[\]]", key)
                        new_add = dict(add_markers)
                        if marker in new_add:
                            new_add[marker] = []
                        if val not in new_add:
                            new_add[marker].append(val)
                        # Recurse with updated add_markers
                        self._load_jobs(item[key], new_add)
            return
    
    
        def _load_markers(self, yaml_data = None):
            """
            :param yaml_data: The yaml_data to load markers from.
            :type yaml_data: list
    
            Finds all possible workflow splitting points (i.e. markers),
            recording each marker's possible values and the order in which
            the markers were found.
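
            For the align.yaml example shown in _add_workflow_keys, this would
            produce (marker names illustrative):

                self.markers = {"tool": ["bwa", "bowtie2"], "read_end": ["single", "paired"]}
                self.order = ["tool", "read_end"]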
            """
            yaml_data = self.data[self.name] if yaml_data is None else yaml_data
            for item in yaml_data:
                for key in item.keys():
                    if key in self.workflow_jobs:
                        return
                    elif "[" in key and "]" in key:
                        val, marker, dummy = re.split("[\[\]]", key)
                        if marker not in self.markers:
                            self.markers[marker] = []
                            self.order.append(marker)
                        if val not in self.markers[marker]:
                            self.markers[marker].append(val)
                    self._load_markers(item[key])
            return
    
        def get_job_names(self, markers):
            """
            :param markers: The input markers.
            :type markers: dict
    
            Returns a list of job names that match the provided markers.
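
            For example (job and marker names illustrative):

                self.get_job_names({"tool": "bwa", "read_end": "single"})
                # -> ["bwa_align_single"]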
            """
            job_list = self._get_job_list(markers)
            job_names = []
            for job_dict in job_list:
                job_names.append(job_dict.keys()[0])
            return job_names
    
        def add_jobs(self, dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs):
            """
            :param dax: The Pegasus dax object from the workflow.
            :type dax: Pegasus.DAX3.ADAG
            :param master_jobs: Dictionary mapping output file name -> the Job object that produces it.
            :type master_jobs: dict
            :param master_files: Dictionary mapping file name -> File object.
            :type master_files: dict
            :param markers: Dictionary of markers for splitting within the module.
            :type markers: dict
            :param inputs: Dictionary of input file name -> master_files index OR input argument name -> value.
            :type inputs: dict
            :param additional_inputs: Dictionary of additional file name -> master_files index.
            :type additional_inputs: dict
            :param outputs: Dictionary of output file name -> master_files index.
            :type outputs: dict
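
            A hypothetical call (all file and argument names illustrative):

                module.add_jobs(dax, master_jobs, master_files,
                    {"tool": "bwa", "read_end": "single"},
                    {"ref_genome": "grch38.fa"},
                    {},
                    {"align_out": "encode_1.sai"})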
            """
            valid, msg = self._check_params(master_files, markers, inputs, additional_inputs, outputs)
            if valid:
                self._traverse_jobs(dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs)
            else:
                print msg
            return
    
        def _check_params(self, master_files, markers, inputs, additional_inputs, outputs):
            """
            :param master_files: Master file dictionary mapping file_name -> file_object.
            :type master_files: dict
            :param markers: Input markers.
            :type markers: dict
            :param inputs: All required inputs for the given markers.
            :type inputs: dict
            :param additional_inputs: All required additional_inputs for the given markers.
            :type additional_inputs: dict
            :param outputs: All required outputs for the given markers.
            :type outputs: dict
    
            Checks all input params to make sure they are specified, and all files
            exist in the master file dictionary.
            """
            valid_params = True
            valid_markers, msg = self._check_input_markers(markers)
            if valid_markers:
                required_inputs, required_additional_inputs, required_outputs = self._load_required_params(master_files, markers)
                valid_inputs, input_msg = self._check_required_params(master_files, required_inputs, inputs, "inputs")
                valid_additional, additional_msg = self._check_required_params(master_files, required_additional_inputs, additional_inputs, "additional_inputs")
                valid_outputs, outputs_msg = self._check_required_params(master_files, required_outputs, outputs, "outputs")
                if not valid_inputs or not valid_additional or not valid_outputs:
                    valid_params = False
                    msg = input_msg + additional_msg + outputs_msg
                else:
                    msg = "Params for module '%s' with markers: %s validated successfully.\n" % (self.name, markers)
            else:
                valid_params = False
            return (valid_params, msg)
    
        def _check_input_markers(self, markers):
            """"
            :param markers: The input markers to validate.
            :type markers: dict
    
            Validates input markers.
            """
            valid = True
            msg = ""
            for input_marker in markers:
                if input_marker in self.markers:
                    if markers[input_marker] not in self.markers[input_marker]:
                        msg += "Marker '%s' does not have value '%s', should be one of %s.\n" % (input_marker, markers[input_marker], self.markers[marker])
                        valid = False
                else:
                    msg += "Marker '%s' does not exist.\n" % (input_marker,)
                    valid = False
            return (valid, msg)
    
        def _check_required_params(self, master_files, required_param_list, arg_params, arg_type):
            """
            :param master_files: Master file dictionary mapping file_name -> file_object.
            :type master_files: dict
            :param required_param_list: A list of dictionaries containing required arguments.
            :type required_param_list: list
            :param arg_params: A dictionary mapping param_name -> value.
            :type arg_params: dict
            :param arg_type: The type of argument being parsed, only used for error messages.
            :type arg_type: str
    
            This function checks the required params against the passed ones.  This
            function should be called for each required param type: inputs,
            additional_inputs, and outputs.
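
            Each entry of required_param_list is expected to look like
            (names illustrative):

                {"name": "ref_genome", "type": "file"}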
            """
            valid = True
            msg = ""
            for param_dict in required_param_list:
                param_name = param_dict["name"]
                if param_name in arg_params:
                    if param_dict["type"] in chipathlon.conf.argument_types["file"]:
                        if arg_params[param_name] not in master_files:
                            valid = False
                            msg += "Error loading '%s' jobs.  File with name: '%s', value: '%s' does not exist in master_files.\n" % (self.name, param_name, arg_params[param_name])
                else:
                    valid = False
                    msg += "Error loading '%s' jobs.  Supplied param '%s' for argument type '%s' not defined.\n" % (self.name, param_name, arg_type)
            return (valid, msg)
    
        def _load_required_params(self, master_files, markers):
            """
            :param master_files: Master file dictionary mapping file_name -> file_object.
            :type master_files: dict
            :param markers: Input markers.
            :type markers: dict
    
            This function loads all required parameters (inputs, additional_inputs,
            and outputs) for the specified markers.  It returns a tuple of three
            elements corresponding to these required params.
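
            A hypothetical return value:

                ([{"name": "ref_genome", "type": "file"}],
                 [],
                 [{"name": "align_out", "type": "file"}])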
            """
            inputs = []
            additional_inputs = []
            outputs = []
            job_list = self._get_job_list(markers)
            for job_dict in job_list:
                job_name = job_dict.keys()[0]
                job_info = job_dict[job_name]
                for param_type, param_list in zip(["inputs", "additional_inputs", "outputs"], [inputs, additional_inputs, outputs]):
                    if job_info[param_type] is not None:
                        for param_dict in job_info[param_type]:
                            param_name = param_dict.keys()[0]
                            param_values = param_dict[param_name]
                            # Make sure param is not a duplicate of a previous value
                            # and that param has not already been added as an output.
                            if param_name not in [param["name"] for param in param_list] and param_name not in [param["name"] for param in outputs]:
                                param_list.append({"name": param_name, "type": param_values["type"]})
            return (inputs, additional_inputs, outputs)
    
    
        def _traverse_jobs(self, dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs):
            """
            :param dax: The Pegasus dax object to add jobs to.
            :type dax: Pegasus.DAX3.ADAG
            :param master_jobs: Master job dictionary mapping job_names -> job objects.
            :type master_jobs: dict
            :param master_files: Master file dictionary mapping file_names -> file objects.
            :type master_files: dict
            :param markers: Input markers.
            :type markers: dict
            :param inputs: All required inputs for the given markers.
            :type inputs: dict
            :param additional_inputs: All required additional_inputs for the given markers.
            :type additional_inputs: dict
            :param outputs: All required outputs for the given markers.
            :type outputs: dict
    
            Walk through the jobs in the module and add them to the workflow.
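
            Every output file name is recorded in master_jobs as produced by
            its job, so any later job whose input name already appears in
            master_jobs gains a dax.depends edge on that producer.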
            """
            job_list = self._get_job_list(markers)
            for job_dict in job_list:
                job_name = job_dict.keys()[0]
                job_info = job_dict[job_name]
                if any(output_file not in master_files for output_file in outputs):
                    job_inputs = self._setup_job_params(master_files, job_info, markers, "inputs", inputs, outputs)
                    job_additional = self._setup_job_params(master_files, job_info, markers, "additional_inputs", additional_inputs, outputs)
                    job_outputs = self._setup_job_params(master_files, job_info, markers, "outputs", outputs, outputs)
                    job = self.workflow_jobs[job_name].create_job(job_inputs, job_additional, job_outputs)
                    if job is not None:
                        dax.addJob(job)
                    else:
                        print "JOB ERROR for '%s'.\n" % (job_dict,)
                    # Now add our job to the master list and check dependencies.
                    # Python passes object references, so the same Job instance is
                    # shared in master_jobs; this slight redundancy makes dependency
                    # checking easier later at very little cost.
                    for file_dict in job_outputs:
                        master_jobs[file_dict["name"]] = job
                    for file_dict in job_inputs:
                        if file_dict["name"] in master_jobs:
                            dax.depends(child = job, parent = master_jobs[file_dict["name"]])
            return
    
        def _setup_job_params(self, master_files, job_info, markers, param_type, arg_params, outputs):
            """
            :param master_files: Master file dictionary mapping file_names -> file objects.
            :type master_files: dict
            :param job_info: The job information (inputs, additional_inputs, and outputs)
            :type job_info: dict
            :param markers: Input markers.
            :type markers: dict
            :param param_type: The param type to set up; one of "inputs", "additional_inputs", or "outputs".
            :type param_type: str
            :param arg_params: A dictionary mapping param_name -> value.
            :type arg_params: dict
            :param outputs: Dictionary mapping output param names -> file names, so
                intra-module dependencies resolve to the correct output files.
            :type outputs: dict
    
            This function sets up the required params for a job to run
            successfully.  Primarily, it converts the format from the dictionary
            style passed into the module, to the list style expected for job
            creation.
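
            For a file param the resulting entry looks like (names illustrative):

                {"name": "encode_1.sai", "file": master_files["encode_1.sai"], "type": "file"}

            and for a non-file argument:

                {"name": "quality", "value": "30", "type": "string"}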
            """
            param_list = []
            if job_info[param_type] is not None:
                for job_dict in job_info[param_type]:
                    param_name = job_dict.keys()[0]
                    param_info = job_dict[param_name]
                    if param_info["type"] in chipathlon.conf.argument_types["file"]:
                        if param_name in outputs:
                            param_list.append({
                                "name": outputs[param_name],
                                "file": master_files[outputs[param_name]],
                                "type": param_info["type"]
                            })
                        else:
                            param_list.append({
                                "name": arg_params[param_name],
                                "file": master_files[arg_params[param_name]],
                                "type": param_info["type"]
                            })
                    else:
                        param_list.append({
                            "name": param_name,
                            "value": arg_params[param_name],
                            "type": param_info["type"]
                        })
            return param_list