diff --git a/chipathlon/jobs/modules/align.yaml b/chipathlon/jobs/modules/align.yaml
index 26a5cc389baa4193f927481fd066995557479601..84e2bcae8578579200cf909a435d931fa8aba666 100644
--- a/chipathlon/jobs/modules/align.yaml
+++ b/chipathlon/jobs/modules/align.yaml
@@ -15,7 +15,7 @@ align:
             - bwa.sai
     - bwa_sai_to_sam:
         inputs:
-            - ref_genome.fa
+            - ref_genome
             - bwa.sai
             - download_1.fastq
         additional_inputs: null
diff --git a/chipathlon/workflow.py b/chipathlon/workflow.py
index 125a56fcd00f5de4f50245004e1337fb29336477..08ae32fdea334039dce939305bb12b6f536f6ec4 100644
--- a/chipathlon/workflow.py
+++ b/chipathlon/workflow.py
@@ -8,7 +8,7 @@ import xml.dom.minidom
 import yaml
 import traceback
 import chipathlon
-import chipathlon.yaml_job
+import chipathlon.workflow_job
 import chipathlon.db
 import chipathlon.workflow_module
 from pprint import pprint
@@ -33,30 +33,25 @@ class Workflow(object):
         self.files = {}
         self.jobs = {}
         self.deps = {}
-        self.yaml_jobs = {}
+        self.workflow_jobs = {}
         self.modules = {}
         return
 
     def info(self):
         pprint(self.run_data)
-        pprint(self.yaml_jobs)
+        pprint(self.workflow_jobs)
         pprint(self.modules)
         pprint(self.executables)
         return
 
     def generate(self):
-        self._load_yaml_jobs()
-        self._load_modules()
-        if not self.err:
-            self._load_runs()
-            if not self.err:
-                print "Put the rest of the generating functions hurrrr."
-                self._load_executables()
-                self.info()
-            else:
-                print self.err
-                sys.exit(1)
-        else:
+        while (not self.err):
+            self._load_executables()
+            self._load_workflow_jobs()
+            self._load_modules()
+            #self._load_runs()
+            break
+        if self.err:
             print self.err
             sys.exit(1)
         return
@@ -82,7 +77,7 @@
     def _load_modules(self):
         for root, dirs, files in os.walk(os.path.join(os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)):
             for f in files:
-                mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.yaml_jobs)
+                mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.workflow_jobs)
                 if not mod.err:
                     self.modules[mod.name] = mod
                 else:
@@ -145,12 +140,13 @@
             self.err += "DB error: %s.\n" % (msg, )
         return
 
-    def _load_yaml_jobs(self):
+    def _load_workflow_jobs(self):
         for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
             for f in files:
-                yj = chipathlon.yaml_job.YamlJob(os.path.join(root, f), self.param_file)
+                ex_name = f.split("_")[0]
+                yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
                 if not yj.err:
-                    self.yaml_jobs[yj.jobname] = yj
+                    self.workflow_jobs[yj.jobname] = yj
                 else:
                     self.err += yj.err
                     break
diff --git a/chipathlon/yaml_job.py b/chipathlon/workflow_job.py
similarity index 91%
rename from chipathlon/yaml_job.py
rename to chipathlon/workflow_job.py
index c7069ae5efa4601604ed7ca796e1c63ce470fc2e..1e16c88e195b3628d581f1afa8ccbee568671627 100644
--- a/chipathlon/yaml_job.py
+++ b/chipathlon/workflow_job.py
@@ -7,10 +7,11 @@ import chipathlon.conf
 from pprint import pprint
 
 
-class YamlJob(object):
+class WorkflowJob(object):
 
-    def __init__(self, base_yaml, param_file):
+    def __init__(self, base_yaml, param_file, executable):
         self.err = ""
+        self.executable = executable
         try:
             with open(base_yaml, "r") as rh:
                 self.base = yaml.load(rh)
@@ -93,9 +94,8 @@
                     valid_files = False
         return valid_files
 
-    def create_job(self, executable, input_files, additional_inputs, output_files):
+    def create_job(self, input_files, additional_inputs, output_files):
         """
-        - executable should be the pegasus class
         - input_files should be a list of dictionaries:
             [{"name": FILE_NAME, "file": FILE_OBJECT},...]
         - additional_files should be a list of dictionaries:
@@ -104,7 +104,7 @@ class YamlJob(object):
            [{"name": FILE_NAME, "file": FILE_OBJECT},...]
         """
         if self._check_files(input_files, additional_inputs, output_files):
-            job = Job(executable)
+            job = Job(self.executable)
             for f in input_files + additional_inputs:
                 job.uses(f["file"], link=Link.INPUT)
             for f in output_files:
@@ -129,19 +129,19 @@
         for argd in self.base[self.jobname]["arguments"]:
             arg = argd.keys()[0]
             arg_info = argd[arg]
-            if arg_info["changeable"] and arg_info["value"]:
+            if arg_info["changeable"] and arg_info["has_value"]:
                 if arg in self.params["arguments"]:
                     arg_list.append("%s %s" % (arg, self.params["arguments"][arg]))
                 else:
-                    arg_list.append("%s %s" % (arg, arg_info["value"]))
+                    arg_list.append("%s %s" % (arg, arg_info["default"]))
             else:
-                if arg_info["value"]:
-                    if arg_info["value"][:1] == "$":
-                        key, index = arg_info["value"].split(".")[0]
+                if arg_info["has_value"]:
+                    if arg_info["default"][:1] == "$":
+                        key, index = arg_info["default"].split(".")[0]
                         arg_list.append(arg)
                         arg_list.append(file_map[key][index]["file"])
                     else:
-                        arg_list.append("%s %s" % (arg, arg_info["value"]))
+                        arg_list.append("%s %s" % (arg, arg_info["default"]))
                 else:
                     if arg[:1] == "$":
                         key, index = arg.split(".")[0]
diff --git a/chipathlon/workflow_module.py b/chipathlon/workflow_module.py
index 07b51322ac08bbf9c7ab4204d97438ac822a4b53..5db493dee3ff5170f9f7bd04abb774281f89fb01 100644
--- a/chipathlon/workflow_module.py
+++ b/chipathlon/workflow_module.py
@@ -14,16 +14,17 @@ from Pegasus.DAX3 import *
 
 class WorkflowModule(object):
 
-    def __init__(self, module_yaml, yaml_jobs):
+    def __init__(self, module_yaml, workflow_jobs):
         # module_yaml is the module yaml file
-        # yaml_jobs is a dictionary that points from name -> actual job class
+        # workflow_jobs is a dictionary that points from name -> actual job class
         self.err = ""
         try:
             with open(module_yaml, "r") as rh:
                 self.data = yaml.load(rh)
             self.name = self.data.keys()[0]
             self.markers = {}
-            self.yaml_jobs = yaml_jobs
+            self.order = []
+            self.workflow_jobs = workflow_jobs
             self._load_markers(self.data[self.name])
         except:
             self.err += "Error parsing job template yaml file.\n"
@@ -34,12 +35,13 @@
         # Finds all potential splitting points and their values
         for item in data:
             for key in item.keys():
-                if key in self.yaml_jobs:
+                if key in self.workflow_jobs:
                     return
                 elif "[" in key and "]" in key:
                     val, marker, dummy = re.split("[\[\]]", key)
                     if marker not in self.markers:
                         self.markers[marker] = []
+                        self.order.append(marker)
                     if val not in self.markers[marker]:
                         self.markers[marker].append(val)
                     self._load_markers(item[key])
@@ -61,7 +63,7 @@
     def _load_required_files(self, data, markers, input_files = [], additional_files = [], output_files = []):
         for item in data:
             for key in item.keys():
-                if key in self.yaml_jobs:
+                if key in self.workflow_jobs:
                     for ftype, flist in zip(["inputs", "additional_inputs", "outputs"], [input_files, additional_files, output_files]):
                         if item[key][ftype] is not None:
                             for f in item[key][ftype]:
@@ -73,10 +75,92 @@
                 self._load_required_files(item[key], markers, input_files, additional_files, output_files)
         return (input_files, additional_files, output_files)
 
-    def print_required_files(self, markers):
+    def get_files(self, markers):
         valid, msg = self._check_input_markers(markers)
         if valid:
-            print self._load_required_files(self.data[self.name], markers)
+            input_files, additional_files, output_files = self._load_required_files(self.data[self.name], markers)
+            return valid, {
+                "inputs": input_files,
+                "additional_files": additional_files,
+                "output_files": output_files
+            }
         else:
-            print msg
+            return valid, msg
+
+    def add_jobs(self, dax, master_jobs, master_files, prefix, markers, input_files, additional_files):
+        """
+        dax -- The pegasus dax object from workflow
+        master_jobs -- A dictionary that maps jobname -> Job object
+        master_files -- A dicitonary that maps filename -> File object
+        prefix -- Used to prefix file and jobnames
+        markers -- Dictionary for splitting within module
+        input_files -- Dictionary of input_file name -> master_files index
+        additional_files -- dictionary of additional_file name -> master_files index
+        """
+        valid = True
+        msg = ""
+        valid, msg, module_files = self._check_files(master_files, prefix, markers, input_files, additional_files)
+        self._add_output_files(master_files, prefix, markers, output_files)
+        self._traverse_jobs(self.data[self.name], dax, master_jobs, master_files, prefix, markers, input_files, additional_files)
+        return
+
+    def _traverse_jobs(self, data, dax, master_jobs, master_files, prefix, markers, input_files, additional_files):
+        for item in data:
+            for key in item.keys():
+                if key in self.workflow_jobs:
+                    ## GENERATE JOB HERE!!! ###
+                    job_info = item[key]
+                    job_inputs = [{"name": input_files[fid], "file": master_files[input_files[fid]]} for fid in job_info["inputs"]]
+                    job_additional = [{"name": additional_files[fid], "file": master_files[additional_files[fid]]} for fid in job_info["additional_inputs"]]
+                    job_outputs = []
+                    for fid in job_info["outputs"]:
+                        full_name = self._get_full_name(prefix, markers, fid)
+                        master_files[full_name] = File(full_name)
+                        job_outputs.append({"name": full_name, "file": master_files[full_name]})
+                    job = self.workflow_jobs[key].create_job(job_inputs, job_additional, job_outputs)
+                    dax.addJob(job)
+                    # NOW, WE add our job to the master_list, and check dependencies
+                    # Python ALWAYS passes by reference, so even though
+                    # we are a little redundant here, dependency checking
+                    # will be easier later
+                    for fd in job_outputs:
+                        master_jobs[fd["name"]] = job
+                    for fd in job_inputs:
+                        if fd["name"] in master_jobs:
+                            dax.depends(child = job, parent = master_jobs[fd["name"]])
+                elif "[" in key and "]" in key:
+                    val, marker, dummy = re.split("[\[\]]", key)
+                    if markers[marker] == val:
+                        self._traverse_jobs(item[key], dax, master_jobs, master_files, prefix, markers, input_files, additional_files)
+        return
+
+    def _check_files(self, master_files, prefix, markers, input_files, additional_files):
+        valid = True
+        msg = ""
+        data = ""
+        valid, module_files = self.get_files(markers)
+        if valid:
+            data = module_files
+            for arg_files, file_key in zip([input_files, additional_files], ["inputs", "additional_files"]):
+                for fname in arg_files:
+                    if fname not in module_files[file_key]:
+                        valid = False
+                        msg += "Error loading '%s' jobs. Supplied file with name '%s' does not exist in module %s.\n" % (self.name, fname, file_key)
+                    if arg_files[fname] not in master_files:
+                        valid = False
+                        msg += "Error loading '%s' jobs. File with name '%s', value '%s', does not exist in master_files.\n" % (self.name, fname, arg_files[fname])
+                for fname in module_files[file_key]:
+                    if fname not in arg_files:
+                        valid = False
+                        msg += "Error loading '%s' jobs. Required file '%s' for %s not defined.\n" % (self.name, fname, file_key)
+        else:
+            valid = False
+            msg += "Error loading '%s' jobs. Could not load files.\n" % (self.name,)
+        return (valid, msg, data)
+
+    def _get_full_name(self, prefix, markers, fname):
+        return "%s_%s_%s" % (prefix, "_".join([markers[x] for x in self.order]), fname)
+
+    def _add_output_files(self, master_files, prefix, markers, output_files):
+        return
 
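
Usage sketch for the renamed API -- a minimal example, assuming the Pegasus DAX3 bindings are importable, and using hypothetical yaml paths, param file, executable, and output file name (in a real run, _load_workflow_jobs walks chipathlon.conf.job_params and picks the executable from Workflow.executables by the job file's name prefix). The executable is now bound when the WorkflowJob is constructed, and create_job() takes only the three lists of {"name": FILE_NAME, "file": FILE_OBJECT} dictionaries described in its docstring:

from Pegasus.DAX3 import ADAG, Executable, File

import chipathlon.workflow_job

dax = ADAG("example")
bwa = Executable(name="bwa", arch="x86_64", os="linux")

# Hypothetical paths; the executable is supplied up front instead of to create_job().
wj = chipathlon.workflow_job.WorkflowJob(
    "jobs/params/bwa_sai_to_sam_params.yaml",  # hypothetical job param yaml
    "run_params.yaml",                         # hypothetical user param file
    bwa
)

# Each entry is {"name": FILE_NAME, "file": FILE_OBJECT}, as the docstring describes.
inputs = [
    {"name": "ref_genome", "file": File("ref_genome")},
    {"name": "bwa.sai", "file": File("bwa.sai")},
    {"name": "download_1.fastq", "file": File("download_1.fastq")},
]
outputs = [{"name": "bwa.sam", "file": File("bwa.sam")}]  # hypothetical output name

job = wj.create_job(inputs, [], outputs)  # no executable argument any more
if job is not None:  # the Job is only built when _check_files() passes
    dax.addJob(job)

At the module level, WorkflowModule.add_jobs() drives the same create_job() call for every job named in the module yaml, prefixing each output file with the run prefix plus the marker values recorded in self.order, and adding a dax.depends() edge from each input file back to the job recorded for it in master_jobs.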