import yaml
import os
import sys
import re
from Pegasus.DAX3 import *
import chipathlon
import chipathlon.conf
from chipathlon.utils import is_number
from pprint import pprint


class WorkflowJob(object):
    """
    :param base_file: The yaml job definition file.
    :type base_file: str
    :param params: A dictionary of values for overwriting default arguments.
    :type params: dict
    :param debug: A debug flag to print out additional information.
    :type debug: bool

    The WorkflowJob contains all information necessary to create a single job.
    Parameters and resources are loaded from a yaml file.
    """

    def __init__(self, base_file, params, debug=False):
        # Accumulates human-readable error strings; the job is valid iff
        # this list stays empty (see is_valid()).
        self.errors = []
        self.debug = debug
        # Maps staged-file name -> {"path": ...}; populated while validating
        # rawfile / rawfolder arguments so the DAX generator can stage them.
        self.raw_files = {}
        self.params = params if params is not None else {}
        self._load_base_file(base_file)
        self.validate()
        return

    def __repr__(self):
        return "[job_name=%s, job_command=%s]" % (self.job_name, self.command)

    def _load_base_file(self, base_file):
        """
        Doesn't do much besides load in the yaml for the job definition.
        Sets self.job_name, self.job_data and self.command from the single
        top-level key of the yaml document; parse failures are recorded in
        self.errors rather than raised.
        """
        if os.path.isfile(base_file):
            with open(base_file, "r") as rh:
                try:
                    # NOTE(review): yaml.load without an explicit Loader can
                    # construct arbitrary Python objects; job templates are
                    # presumably trusted -- confirm, or prefer yaml.safe_load.
                    base = yaml.load(rh)
                    # The yaml file has exactly one top-level key: the job name.
                    # (Python 2 idiom: dict.keys() returns a list.)
                    self.job_name = base.keys()[0]
                    self.job_data = base[self.job_name]
                    self.command = self.job_data["command"]
                except yaml.YAMLError as ye:
                    self.errors.append("Error parsing job template yaml file '%s': %s." % (base_file, ye))
        else:
            self.errors.append("Job template file '%s' does not exist." % (base_file,))
        return

    def is_valid(self):
        """
        Checks if the run is valid... Pretty straightforward.
        """
        return len(self.errors) == 0

    def get_error_string(self):
        """
        Returns the errors as a newline separated string.
        """
        return "\n".join(self.errors)

    def validate(self):
        """
        1. Validate that the input yaml meets the base standard.
        2. Validate the resources meet the base standard.
        3. Ensure all arguments are valid and reference existing inputs / outputs.
        """
        # Each step only runs if all previous steps produced no errors, since
        # later steps depend on structures the earlier ones verified
        # (e.g. _validate_arguments needs valid_inputs from _validate_base_yaml).
        for validation_step in [self._validate_base_yaml, self._validate_files, self._validate_resources, self._validate_arguments]:
            if self.is_valid():
                validation_step()
        return

    def _check_keys(self, data, required_keys, optional_keys, error_prefix="Key Error"):
        """
        :param data: The dictionary to validate
        :type data: dict
        :param required_keys: A list of required keys for the dictionary.
        :type required_keys: list
        :param optional_keys: A list of optional keys that can be in the dictionary.
        :type optional_keys: list
        :param error_prefix: A prefix to add to error messages.
        :type error_prefix: str

        Checks that all required keys are specified in the dictionary.  Checks
        that all keys in the dictionary are defined in either required or
        optional keys.  Returns True iff both checks pass; failures are also
        appended to self.errors.
        """
        valid = True
        all_keys = required_keys + optional_keys
        for key in required_keys:
            if key not in data:
                self.errors.append("[Error parsing job %s] %s: Required key '%s' not defined." % (self, error_prefix, key))
                valid = False
        for key in data:
            if key not in all_keys:
                self.errors.append("[Error parsing job %s] %s: Specified key '%s' is not valid." % (self, error_prefix, key))
                valid = False
        return valid

    def _validate_base_yaml(self):
        """
        Pretty straightforward... Just validating the structure of the job
        yaml file against chipathlon.conf.job_keys.
        """
        self._check_keys(self.job_data, chipathlon.conf.job_keys["required"], chipathlon.conf.job_keys["optional"], "Base Yaml Error")
        if self.is_valid():
            # Cache the declared input/output param names; "$ref" style
            # argument values are validated against these lists later.
            self.valid_inputs = self.job_data["inputs"].keys() if self.job_data["inputs"] is not None else []
            self.valid_outputs = self.job_data["outputs"].keys() if self.job_data["outputs"] is not None else []
        return

    def _validate_files(self):
        """
        Validates all the files in the inputs / outputs list for the current
        job.  Ensures that the params have appropriate definitions, and that
        every file-typed param declares a known file_type.
        """
        for file_dict_type in ["inputs", "outputs"]:
            if self.job_data[file_dict_type] is not None:
                for param_name, file_dict in self.job_data[file_dict_type].iteritems():
                    self._check_keys(
                        file_dict,
                        chipathlon.conf.job_inout_keys["required"],
                        chipathlon.conf.job_inout_keys["optional"],
                        file_dict_type
                    )
                    # Validate file types for inputs / outputs that are files
                    if "type" in file_dict and file_dict["type"] in chipathlon.conf.argument_types["file"]:
                        if "file_type" in file_dict:
                            if not file_dict["file_type"] in chipathlon.conf.file_extensions:
                                self.errors.append("[Error parsing job %s]: Param '%s' has invalid file_type: '%s'. \
                                    Should be one of : %s." % (self, param_name, file_dict["file_type"], chipathlon.conf.file_extensions))
                        else:
                            self.errors.append("[Error parsing job %s]: Param '%s' is a file, and must have a 'file_type' specified." % (self, param_name))

    def _validate_resources(self):
        """
        Validate memory/walltime/cores/nodes values supplied in the params
        file are present and numeric.  Defaults must exist in the job yaml;
        param overrides are optional but must be numeric when given.
        """
        for resource_type in chipathlon.conf.resources.keys():
            # Check that default value provided exists / is valid
            if resource_type in self.job_data:
                if not is_number(self.job_data[resource_type]):
                    self.errors.append("[Error parsing job %s] Resource specification for '%s' must be numeric." % (self, resource_type))
            else:
                self.errors.append("[Error parsing job %s] Resource specification for '%s' is missing." % (self, resource_type))
            # Check if provided param values exist / are valid
            if resource_type in self.params:
                if not is_number(self.params[resource_type]):
                    self.errors.append("[Error parsing job %s] Resource specification for '%s' passed through params must be numeric." % (self, resource_type))
        return

    def _validate_arguments(self):
        """
        Loop through each argument & validate it, a few things happen here:

        1. Verify the structure of the arguments.  For the full list of
           required and optional keys see chipathlon.conf.argument_keys
        2. Verify the value of the arguments.  Make sure file references are
           valid and file types exist.
        3. Verify that passed in arguments from the params file are valid.
        """
        all_args = []
        for arg_dict in self.job_data["arguments"]:
            # Each list entry is a single-key dict: {arg_name: arg_info}.
            arg_name = arg_dict.keys()[0]
            all_args.append(arg_name)
            arg_info = arg_dict[arg_name]
            # Check that arguments have the correct definition
            if self._check_keys(arg_info, chipathlon.conf.argument_keys["required"], chipathlon.conf.argument_keys["optional"], "Argument '%s' Error" % (arg_name,)):
                valid_arg, msg = self._is_valid_arg(arg_name, arg_info)
                if valid_arg:
                    # If the argument is a rawfile, add it to the raw_file list for use in the DAX
                    # This is for files that are on disk and need to be staged
                    if arg_info["type"] == "rawfile":
                        arg_value = self._get_arg_value(arg_name, arg_info)
                        self.raw_files[os.path.basename(arg_value)] = {"path": arg_value}
                    # Sometimes we have a folder full of files we need to include,
                    # in this case, use the rawfolder argument to stage all files
                    # within the folder.
                    elif arg_info["type"] == "rawfolder":
                        arg_value = self._get_arg_value(arg_name, arg_info)
                        dir_name = os.path.basename(os.path.dirname(arg_value + "/"))
                        for root, dirs, files in os.walk(arg_value):
                            for f in files:
                                self.raw_files[dir_name + "/" + f] = {"path": "%s/%s" % (arg_value, f)}
                            # Only stage the top level of the folder -- stop
                            # os.walk after the first directory.
                            break
                else:
                    self.errors.append(msg)
        # Any argument passed through params must correspond to a declared arg.
        if self.params.get("arguments") is not None:
            for arg_name in self.params["arguments"]:
                if arg_name not in all_args:
                    self.errors.append("[Error parsing job %s] Argument '%s' specified in params does not exist." % (self, arg_name))
        return

    def _is_valid_arg(self, arg_name, arg_info):
        """
        :param arg_name: Name of the argument.
        :type arg_name: str
        :param arg_info: Argument descriptors.
        :type arg_info: dict

        Checks if an argument is valid.  Argument descriptors have the
        following implication rules:

            changeable & has_value & no default => must be in params
            not changeable & has_value => required and default
            not changeable & not_required => Optional argument, but not passed in from params

        Returns a (bool, message-or-None) tuple.
        """
        # First we check our implication rules
        if arg_info["changeable"]:
            # Changeable + required + no default: the value can only come
            # from params, so it must actually be there.
            if arg_info["required"] and "default" not in arg_info and (self.params.get("arguments") is None or arg_name not in self.params["arguments"]):
                return (False, "[Error parsing %s job file] Argument '%s' is changeable and has no default. Argument must be supplied through params." % (self.job_name, arg_name))
        elif arg_info["required"]:
            # Unchangeable + required + takes a value: a default is mandatory
            # since nothing else can supply the value.
            if arg_info["has_value"] and "default" not in arg_info:
                return (False, "[Error parsing %s job file] Argument '%s' is unchangeable and required but has no default." % (self.job_name, arg_name))
        # If all our implication rules pass, we check the values
        return self._has_valid_arg_value(arg_name, arg_info)

    def _has_valid_arg_value(self, arg_name, arg_info):
        """
        :param arg_name: Name of the argument.
        :type arg_name: str
        :param arg_info: Argument descriptors.
        :type arg_info: dict

        Validates the given argument value.  If it's numeric make sure the
        value is appropriate.  If it's a rawfile make sure the input path
        exists / validate file_type.  If it's a file make sure it references
        a file list.  If it references a file list make sure it's a valid
        index.  Since all input and outputs are in list format, make sure
        the index is within the size of the list.
        """
        arg_value = self._get_arg_value(arg_name, arg_info)
        if arg_value is not None:
            if arg_info["type"] == "numeric":
                # We need this as two separate if statements otherwise we try
                # to interpolate numeric arguments
                if not is_number(arg_value):
                    return (False, "[Error parsing job %s]: Argument '%s' with value '%s' must be numeric."
                            % (self, arg_name, arg_value))
            elif arg_info["type"] in chipathlon.conf.argument_types["file"]:
                # If the argument is a rawfile, validate it's extension & existence
                if arg_info["type"] == "rawfile":
                    if os.path.isfile(arg_value):
                        if not arg_value.endswith(tuple(chipathlon.conf.file_extensions[arg_info["file_type"]])):
                            return (False, "[Error parsing job %s] Argument '%s' with file path '%s' is not of file type '%s'. \
                                Should match one of these extensions: %s." % (
                                    self,
                                    arg_name,
                                    arg_value,
                                    arg_info["file_type"],
                                    chipathlon.conf.file_extensions[arg_info["file_type"]]
                                )
                            )
                    else:
                        return (False, "[Error parsing job %s]: Argument '%s' is a rawfile, however the specified path '%s' does not exist. " % (self, arg_name, arg_value))
                # If the argument is a 'regular' file, we need to make sure that it
                # references one of the keys of the inputs / outputs
                elif not arg_value.startswith("$"):
                    return (False, "[Error parsing job %s]: Argument '%s' has value '%s'. File references must start with a '$'." % (self, arg_name, arg_value))
                # NOTE(review): nesting of this branch reconstructed from a
                # flattened source -- assumed to be part of the file-type
                # chain so "$ref" file arguments get their reference checked.
                elif isinstance(arg_value, str) and arg_value.startswith("$"):
                    if not any([str(arg_value)[1:] == ref for ref in (self.valid_inputs + self.valid_outputs)]):
                        return (False, "[Error parsing job %s]: Argument '%s' has reference '%s'. No such input / output exists." % (self, arg_name, arg_value))
        return (True, None)

    def _get_arg_value(self, arg_name, arg_info):
        """
        :param arg_name: Name of the argument.
        :type arg_name: str
        :param arg_info: Argument descriptors.
        :type arg_info: dict

        Get the value of the given argument depending on the argument
        descriptors.  Return value is based on the descriptors:

            changeable = false & has_value = false -> arg_name
            changeable = false & has_value = true -> default
            changeable = true & param = true -> param
            changeable = true & param = false -> default
        """
        if arg_info["changeable"]:
            # Params override the default when supplied.
            if self.params.get("arguments") is not None and arg_name in self.params["arguments"]:
                return self.params["arguments"][arg_name]
            elif "path" in arg_info:
                # Default is a bundled resource file relative to jobs/<path>/.
                return self._get_path(arg_info["path"]) + arg_info["default"]
            else:
                return arg_info.get("default")
        else:
            if arg_info["has_value"]:
                return arg_info.get("default")
            elif "path" in arg_info:
                return self._get_path(arg_info["path"]) + arg_name
            else:
                # A bare flag: the "value" is the argument name itself.
                return arg_name
        # NOTE(review): unreachable -- every branch above returns.
        return

    def _get_path(self, dir_name):
        """
        Helper function to get the correct path to a resource file.  Used for
        default file arguments, like the config files provided to gem and ccat.
        Returns an absolute path ending with '/'.
        """
        return os.path.join(os.path.dirname(os.path.realpath(__file__)), "jobs", dir_name) + "/"

    def add_executable(self, executable):
        """
        :param executable: Executable to use for the job.
        :type executable: Pegasus.DAX3.Executable

        Setter function for job executable.
        """
        self.executable = executable
        return

    def get_db_arguments(self):
        """
        Returns a dictionary of arguments to be inserted into the database.
        The dictionary will be arg_name -> arg_value entries.
        """
        arguments = {}
        for arg_dict in self.job_data["arguments"]:
            # root key is actual argument name
            arg_name = arg_dict.keys()[0]
            arg_info = arg_dict[arg_name]
            # We only want to show changeable args to the end user
            if arg_info["changeable"] and arg_info["has_value"]:
                arguments[arg_name] = self._get_arg_value(arg_name, arg_info)
        return arguments

    def _is_param_required(self, param_name):
        """
        :param param_name: The input/output key to test.
        :type param_name: str

        Check to see if a particular input/output is required.  An
        input/output is required iff it is interpolated in the arguments
        section by a non-changeable required argument.
        """
        for arg_dict in self.job_data["arguments"]:
            # root key is actual argument name
            arg_name = arg_dict.keys()[0]
            arg_info = arg_dict[arg_name]
            # The param is referenced as "$param_name" either directly as the
            # argument name or inside the argument's default value.
            check_val = "$%s" % (param_name,)
            if arg_info["type"] in chipathlon.conf.argument_types["file"] and (arg_name == check_val or ("default" in arg_info and check_val in arg_info["default"])):
                return arg_info["required"] and not arg_info["changeable"]
        return False

    def _params_are_valid(self, inputs, outputs):
        """
        :param inputs: Input params for the job.
        :type inputs: dict
        :param outputs: Output params for the job.
        :type outputs: dict

        Validate the passed arguments & files.  The keys of the inputs /
        outputs passed in should match the job definition, and files should
        have correctly matching extension.  Errors are appended to
        self.errors; returns self.is_valid().
        """
        for param_type, passed_param_dict in zip(["inputs", "outputs"], [inputs, outputs]):
            # In theory there could be a job that doesn't need inputs...
            if self.job_data[param_type] is not None:
                # Loop through all required params to make sure they are provided
                for param_name, base_param_info in self.job_data[param_type].iteritems():
                    if self._is_param_required(param_name):
                        if param_name in passed_param_dict:
                            if base_param_info["type"] in chipathlon.conf.argument_types["file"]:
                                # We need to check the FULL file name of the passed in file to
                                # validate the extensions correctly.
                                if not passed_param_dict[param_name]["name"].endswith(tuple(chipathlon.conf.file_extensions[base_param_info["file_type"]])):
                                    self.errors.append("Job creation error %s: File %s specified for param %s is not of \
                                        type %s. Should match one of these extensions: %s." % (
                                            self,
                                            passed_param_dict[param_name]["name"],
                                            param_name,
                                            base_param_info["file_type"],
                                            chipathlon.conf.file_extensions[base_param_info["file_type"]]
                                        )
                                    )
                        else:
                            self.errors.append("Job creation error %s: %s is a required %s paramater, but was not passed in."
                                               % (self, param_name, param_type))
                # Now we loop through all passed in params to make sure they are valid
                for param_name, passed_param_info in passed_param_dict.iteritems():
                    if param_name in self.job_data[param_type]:
                        base_param_info = self.job_data[param_type][param_name]
                        if self.job_data[param_type][param_name]["type"] in chipathlon.conf.argument_types["file"]:
                            # We need to check the FULL file name of the passed in file to
                            # validate the extensions correctly.
                            if not passed_param_info["name"].endswith(tuple(chipathlon.conf.file_extensions[base_param_info["file_type"]])):
                                self.errors.append("Job creation error %s: File %s specified for param %s is not of \
                                    type %s. Should match one of these extensions: %s." % (
                                        self,
                                        passed_param_info["name"],
                                        param_name,
                                        base_param_info["file_type"],
                                        chipathlon.conf.file_extensions[base_param_info["file_type"]]
                                    )
                                )
                    else:
                        self.errors.append("Job creation error %s: Specified param %s does not exist in job." % (self, param_name))
        return self.is_valid()

    def create_job(self, inputs, outputs):
        """
        :param inputs: Input params for the job.
        :type inputs: list
        :param outputs: Output params for the job.
        :type outputs: list
        :returns: The job created or None if unsuccessful

        Creates an actual Pegasus.Dax3.Job instance from the provided
        information.  Inputs & outputs should be dictionaries with keys that
        correspond to the param information in the workflow_job yaml file.

        **inputs**

        .. code-block:: python

            {
                "param-name": {
                    "name": "full-file-name",
                    "type": "file",
                    "file": pegasus-file-object
                },
                "param-name": {
                    "name": "argument-name",
                    "type": "argument",
                    "value": argument-value
                },
                "param-name": {
                    "name": "full-file-name",
                    "type": "file_list",
                    "value": [ ]
                },
                ...
            }

        **outputs**

        .. code-block:: python

            {
                "param-name": {
                    "name": "full-file-name",
                    "type": "file",
                    "transfer": True/False,
                    "file": pegasus-file-object
                },
                ...
            }
        """
        if self._params_are_valid(inputs, outputs):
            job = Job(self.executable)
            # Raw files are staged from disk -- always inputs.
            # NOTE(review): raw_files entries are created with only a "path"
            # key in _validate_arguments; the "file" key is presumably added
            # by the DAX generator before create_job is called -- confirm.
            for file_name, file_info in self.raw_files.iteritems():
                job.uses(file_info["file"], link=Link.INPUT)
            for param_name, param_info in inputs.iteritems():
                wj_param = self.get_param_info(param_name)
                if wj_param["type"] == "file_list":
                    for f in param_info["value"]:
                        job.uses(f["file"], link=Link.INPUT)
                elif wj_param["type"] in chipathlon.conf.argument_types["file"]:
                    job.uses(param_info["file"], link=Link.INPUT)
            for output_name, output_info in outputs.iteritems():
                wj_param = self.get_param_info(output_name)
                job.uses(output_info["file"], link=Link.OUTPUT, transfer=output_info["transfer"])
                # Redirect stdout / stderr
                if wj_param["type"] == "stdout":
                    job.setStdout(output_info["name"])
                elif wj_param["type"] == "stderr":
                    job.setStderr(output_info["name"])
            # NOTE(review): the argument list is built twice here; arg_list
            # is never used.
            arg_list = self._create_arg_list(inputs, outputs)
            job.addArguments(*self._create_arg_list(inputs, outputs))
            self._add_job_resources(job)
            return job
        else:
            print self.get_error_string()
            return None
        # NOTE(review): unreachable -- both branches above return.
        return None

    def _add_job_resources(self, job):
        """
        :param job: The job instance to add resources to.
        :type job: Pegasus.Dax3.Job

        Add the job's resources (memory, walltime, etc.) via Pegasus Profiles.
        Values from params override the defaults in the job yaml.
        """
        for resource_type in chipathlon.conf.resources.keys():
            try:
                resource_value = self.params[resource_type]
            except KeyError:
                # Fall back to the default declared in the job template.
                resource_value = self.job_data[resource_type]
            ns = chipathlon.conf.resources[resource_type]["namespace"]
            key = chipathlon.conf.resources[resource_type]["key"]
            job.profile(ns, key, resource_value)
        return

    def _interpolate_value(self, inputs, outputs, arg_name, arg_info):
        """
        :param inputs: Input params for the job.
        :type inputs: dict
        :param outputs: Output params for the job.
        :type outputs: dict
        :param arg_name: The name of the arg to convert.
        :type arg_name: str
        :param arg_info: Argument info from the yaml.
        :type arg_info: dict

        This helper method converts an argument into its proper value,
        meaning it can convert a reference string like "$target_file" into an
        actual Pegasus File instance, or interpolate the proper arguments
        that are passed in.  Returns None when the argument resolves to
        nothing (callers skip such arguments).
        """
        arg_value = self._get_arg_value(arg_name, arg_info)
        if self.debug:
            print "arg_value: %s" % (arg_value,),
        add_value = arg_value
        if (isinstance(arg_value, str) and arg_value.startswith("$")) or arg_info["type"] == "rawfile":
            # Strip the leading "$" (for rawfiles this harmlessly strips the
            # first path character -- only the basename is used below).
            arg_value = arg_value[1:]
            if arg_info["type"] == "rawfile":
                add_value = self.raw_files[os.path.basename(arg_value)]["file"]
            else:
                # Conditionally load from inputs / outputs
                # This will only load the dict of information though, not
                # the actual value we want
                add_value = (inputs if arg_value in inputs else outputs).get(arg_value)
                # If the value is None we want to avoid adding anything
                if add_value is not None:
                    if arg_info["type"] in chipathlon.conf.argument_types["file"]:
                        # Want the actual Pegasus file obj
                        add_value = add_value.get("file").name
                    else:
                        # Get the actual argument value
                        add_value = add_value.get("value")
        elif arg_info["type"] == "rawfolder":
            # We want just the folder name
            add_value = os.path.basename(os.path.dirname(arg_value + "/"))
        elif arg_info["type"] == "file_list":
            # Resolve each "$ref" in the list to the name of its Pegasus File.
            add_value = []
            for val in arg_value:
                file_name = val[1:]
                add_value.append((inputs if file_name in inputs else outputs)[file_name]["file"].name)
        return add_value

    def _create_arg_list(self, inputs, outputs):
        """
        :param inputs: Input params for the job.
        :type inputs: dict
        :param outputs: Output params for the job.
        :type outputs: dict

        Inputs / outputs should be dictionaries like the ones described in
        the create_job documentation.  This function creates the necessary
        argument list for pegasus.  This includes all arguments the
        executable needs to run, and the correct file objects passed in the
        correct order.  To add arguments to the pegasus job use:

            job.addArguments(*arg_list)
        """
        arg_list = []
        # Java tools need to have -Xmx specified...
        if self.job_data["command"] in chipathlon.conf.java_tools:
            arg_list.append("-Xmx%sM" % (self.params.get("memory", self.job_data["memory"])))
        # Go through each argument
        for arg_dict in self.job_data["arguments"]:
            arg_name = arg_dict.keys()[0]
            arg_info = arg_dict[arg_name]
            # Need to figure out 2 things:
            # 1. Should we add the argument name?
            # 2. What's the correct value to add?
            if self.debug:
                print "\t%s: Loading argument: %s, info: %s, " % (self, arg_name, arg_info),
            add_value = self._interpolate_value(inputs, outputs, arg_name, arg_info)
            if self.debug:
                print "Final value: %s" % (add_value)
            # Only add arguments that have a value
            if add_value is not None:
                # Need to add in the arg_name and the arg_value
                if arg_info["has_value"]:
                    # If it's a file we want to add the actual Pegasus.File instance
                    if isinstance(add_value, File):
                        arg_list.append(arg_name)
                        arg_list.append(add_value)
                    # If it's a list we want to add each of the Pegasus.File instances
                    elif isinstance(add_value, list):
                        arg_list.append(arg_name)
                        if "separator" in arg_info:
                            arg_list.append(arg_info["separator"].join(add_value))
                        else:
                            for f in add_value:
                                arg_list.append(f)
                    # Otherwise, add stuff as a string
                    else:
                        arg_list.append("%s %s" % (arg_name, add_value))
                else:
                    # Flag-style argument (or bare value): no name/value pair.
                    arg_list.append(add_value)
        return arg_list

    def get_param_info(self, param_name):
        """
        :param param_name: The name of the input/output to get information for.
        :type param_name: str

        Returns the information associated with a particular input/output as
        a dictionary: the type & file_type if applicable.  Returns None if no
        such param exists.
        """
        for param_type in ["inputs", "outputs"]:
            if self.job_data[param_type] is not None:
                if param_name in self.job_data[param_type]:
                    return self.job_data[param_type][param_name]
        return None