Commit 71b2ef14 authored by aknecht2's avatar aknecht2


Renamed yaml_job to workflow_job.  Updated validation to support has_value & default.  Workflow_module class can now generate jobs.
parent 46efb5d6
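
The has_value/default change below is easiest to see as data. Here is a sketch of the two argument schemas as Python dicts; the field names come straight from the diff, while the -q flag and its value are invented for illustration:

# Old schema (before this commit): one overloaded "value" key served both as
# the "does this flag take a value?" test and as the fallback value itself.
old_arg = {"-q": {"changeable": True, "value": 5}}

# New schema: "has_value" answers the yes/no question, and "default" holds the
# fallback used when the run's param file supplies no override.
new_arg = {"-q": {"changeable": True, "has_value": True, "default": 5}}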
@@ -15,7 +15,7 @@ align:
- bwa.sai
- bwa_sai_to_sam:
inputs:
- ref_genome.fa
- ref_genome
- bwa.sai
- download_1.fastq
additional_inputs: null
......
@@ -8,7 +8,7 @@ import xml.dom.minidom
import yaml
import traceback
import chipathlon
import chipathlon.yaml_job
import chipathlon.workflow_job
import chipathlon.db
import chipathlon.workflow_module
from pprint import pprint
@@ -33,30 +33,25 @@ class Workflow(object):
self.files = {}
self.jobs = {}
self.deps = {}
self.yaml_jobs = {}
self.workflow_jobs = {}
self.modules = {}
return
def info(self):
pprint(self.run_data)
pprint(self.yaml_jobs)
pprint(self.workflow_jobs)
pprint(self.modules)
pprint(self.executables)
return
def generate(self):
self._load_yaml_jobs()
self._load_modules()
if not self.err:
self._load_runs()
if not self.err:
print "Put the rest of the generating functions hurrrr."
while (not self.err):
self._load_executables()
self.info()
else:
print self.err
sys.exit(1)
else:
self._load_workflow_jobs()
self._load_modules()
#self._load_runs()
break
if self.err:
print self.err
sys.exit(1)
return
@@ -82,7 +77,7 @@ class Workflow(object):
def _load_modules(self):
for root, dirs, files in os.walk(os.path.join(os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)):
for f in files:
mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.yaml_jobs)
mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.workflow_jobs)
if not mod.err:
self.modules[mod.name] = mod
else:
@@ -145,12 +140,13 @@ class Workflow(object):
self.err += "DB error: %s.\n" % (msg, )
return
def _load_yaml_jobs(self):
def _load_workflow_jobs(self):
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
for f in files:
yj = chipathlon.yaml_job.YamlJob(os.path.join(root, f), self.param_file)
ex_name = f.split("_")[0]
yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
if not yj.err:
self.yaml_jobs[yj.jobname] = yj
self.workflow_jobs[yj.jobname] = yj
else:
self.err += yj.err
break
......
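The new constructor wiring above resolves each job's executable from its params filename: the token before the first underscore. A minimal sketch of that lookup, with hypothetical filenames; only the split itself comes from the diff:

# Hypothetical params files; the executable name is everything before the
# first underscore, matching ex_name = f.split("_")[0] above.
param_files = ["bwa_align_single.yaml", "samtools_sam_to_bam.yaml"]
for f in param_files:
    ex_name = f.split("_")[0]
    print("%s -> %s" % (f, ex_name))
# bwa_align_single.yaml -> bwa
# samtools_sam_to_bam.yaml -> samtools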
@@ -7,10 +7,11 @@ import chipathlon.conf
from pprint import pprint
class YamlJob(object):
class WorkflowJob(object):
def __init__(self, base_yaml, param_file):
def __init__(self, base_yaml, param_file, executable):
self.err = ""
self.executable = executable
try:
with open(base_yaml, "r") as rh:
self.base = yaml.load(rh)
@@ -93,9 +94,8 @@ class YamlJob(object):
valid_files = False
return valid_files
def create_job(self, executable, input_files, additional_inputs, output_files):
def create_job(self, input_files, additional_inputs, output_files):
"""
- executable should be the pegasus class
- input_files should be a list of dictionaries:
[{"name": FILE_NAME, "file": FILE_OBJECT},...]
- additional_inputs should be a list of dictionaries:
@@ -104,7 +104,7 @@ class YamlJob(object):
[{"name": FILE_NAME, "file": FILE_OBJECT},...]
"""
if self._check_files(input_files, additional_inputs, output_files):
job = Job(executable)
job = Job(self.executable)
for f in input_files + additional_inputs:
job.uses(f["file"], link=Link.INPUT)
for f in output_files:
@@ -129,19 +129,19 @@ class YamlJob(object):
for argd in self.base[self.jobname]["arguments"]:
arg = argd.keys()[0]
arg_info = argd[arg]
if arg_info["changeable"] and arg_info["value"]:
if arg_info["changeable"] and arg_info["has_value"]:
if arg in self.params["arguments"]:
arg_list.append("%s %s" % (arg, self.params["arguments"][arg]))
else:
arg_list.append("%s %s" % (arg, arg_info["value"]))
arg_list.append("%s %s" % (arg, arg_info["default"]))
else:
if arg_info["value"]:
if arg_info["value"][:1] == "$":
key, index = arg_info["value"].split(".")[0]
if arg_info["has_value"]:
if arg_info["default"][:1] == "$":
key, index = arg_info["default"].split(".")
arg_list.append(arg)
arg_list.append(file_map[key][index]["file"])
else:
arg_list.append("%s %s" % (arg, arg_info["value"]))
arg_list.append("%s %s" % (arg, arg_info["default"]))
else:
if arg[:1] == "$":
key, index = arg.split(".")
......
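Taken together, the WorkflowJob changes above mean the executable lives on the job object and argument resolution goes through has_value/default. A standalone sketch of the changeable branch; resolve_arg is a hypothetical helper, not a method from the diff:

def resolve_arg(arg, arg_info, params):
    # Changeable arguments that take a value: the run's param file wins,
    # otherwise fall back to the YAML "default".
    if arg_info["changeable"] and arg_info["has_value"]:
        if arg in params.get("arguments", {}):
            return "%s %s" % (arg, params["arguments"][arg])
        return "%s %s" % (arg, arg_info["default"])
    # Everything else is passed through unchanged in this sketch.
    return arg

entry = {"changeable": True, "has_value": True, "default": 5}
print(resolve_arg("-q", entry, {}))                         # -q 5
print(resolve_arg("-q", entry, {"arguments": {"-q": 10}}))  # -q 10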
@@ -14,16 +14,17 @@ from Pegasus.DAX3 import *
class WorkflowModule(object):
def __init__(self, module_yaml, yaml_jobs):
def __init__(self, module_yaml, workflow_jobs):
# module_yaml is the module yaml file
# yaml_jobs is a dictionary that points from name -> actual job class
# workflow_jobs is a dictionary that points from name -> actual job class
self.err = ""
try:
with open(module_yaml, "r") as rh:
self.data = yaml.load(rh)
self.name = self.data.keys()[0]
self.markers = {}
self.yaml_jobs = yaml_jobs
self.order = []
self.workflow_jobs = workflow_jobs
self._load_markers(self.data[self.name])
except:
self.err += "Error parsing job template yaml file.\n"
@@ -34,12 +35,13 @@ class WorkflowModule(object):
# Finds all potential splitting points and their values
for item in data:
for key in item.keys():
if key in self.yaml_jobs:
if key in self.workflow_jobs:
return
elif "[" in key and "]" in key:
val, marker, dummy = re.split("[\[\]]", key)
if marker not in self.markers:
self.markers[marker] = []
self.order.append(marker)
if val not in self.markers[marker]:
self.markers[marker].append(val)
self._load_markers(item[key])
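Marker keys in the module YAML have the form value[marker], and the new self.order list records markers in discovery order so generated file names can be prefixed deterministically later. A quick sketch of the split on a hypothetical key:

import re

# "single[read_end]" means: take this branch when markers["read_end"] == "single".
key = "single[read_end]"
val, marker, dummy = re.split(r"[\[\]]", key)
print("%s %s" % (val, marker))  # single read_end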
@@ -61,7 +63,7 @@ class WorkflowModule(object):
def _load_required_files(self, data, markers, input_files = [], additional_files = [], output_files = []):
for item in data:
for key in item.keys():
if key in self.yaml_jobs:
if key in self.workflow_jobs:
for ftype, flist in zip(["inputs", "additional_inputs", "outputs"], [input_files, additional_files, output_files]):
if item[key][ftype] is not None:
for f in item[key][ftype]:
@@ -73,10 +75,92 @@ class WorkflowModule(object):
self._load_required_files(item[key], markers, input_files, additional_files, output_files)
return (input_files, additional_files, output_files)
def print_required_files(self, markers):
def get_files(self, markers):
valid, msg = self._check_input_markers(markers)
if valid:
print self._load_required_files(self.data[self.name], markers)
input_files, additional_files, output_files = self._load_required_files(self.data[self.name], markers)
return valid, {
"inputs": input_files,
"additional_files": additional_files,
"output_files": output_files
}
else:
print msg
return valid, msg
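get_files now returns its result instead of printing it. A hypothetical call, assuming a Workflow instance w whose loaders have run and an "align" module with a read_end marker; the file names echo the YAML hunk at the top of this commit:

valid, result = w.modules["align"].get_files({"read_end": "single"})
if valid:
    print(result["inputs"])  # e.g. ["ref_genome", "download_1.fastq"]
else:
    print(result)            # on failure, result is the error message string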
def add_jobs(self, dax, master_jobs, master_files, prefix, markers, input_files, additional_files):
"""
dax -- The Pegasus DAX object from the workflow
master_jobs -- A dictionary that maps job name -> Job object
master_files -- A dictionary that maps file name -> File object
prefix -- Used to prefix file and job names
markers -- Dictionary for splitting within the module
input_files -- Dictionary of input_file name -> master_files index
additional_files -- Dictionary of additional_file name -> master_files index
"""
valid = True
msg = ""
valid, msg, module_files = self._check_files(master_files, prefix, markers, input_files, additional_files)
self._add_output_files(master_files, prefix, markers, module_files["output_files"])
self._traverse_jobs(self.data[self.name], dax, master_jobs, master_files, prefix, markers, input_files, additional_files)
return
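A hypothetical add_jobs call, to show the argument shapes: the DAX comes from Pegasus, master_files maps names to File objects, and input_files/additional_files map module file names to master_files keys. Every concrete name here is invented:

from Pegasus.DAX3 import ADAG, File

# w is a hypothetical Workflow instance whose loaders have already run.
dax = ADAG("chipathlon-run")
master_jobs = {}
master_files = {"run1_download_1.fastq": File("run1_download_1.fastq"),
                "ref_genome": File("ref_genome")}
w.modules["align"].add_jobs(
    dax, master_jobs, master_files,
    "run1",                          # prefix for generated file/job names
    {"read_end": "single"},          # marker selection within the module
    {"download_1.fastq": "run1_download_1.fastq",
     "ref_genome": "ref_genome"},    # module input name -> master_files key
    {},                              # additional_files (none here)
)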
def _traverse_jobs(self, data, dax, master_jobs, master_files, prefix, markers, input_files, additional_files):
for item in data:
for key in item.keys():
if key in self.workflow_jobs:
## GENERATE JOB HERE!!! ###
job_info = item[key]
job_inputs = [{"name": input_files[fid], "file": master_files[input_files[fid]]} for fid in job_info["inputs"]]
job_additional = [{"name": additional_files[fid], "file": master_files[additional_files[fid]]} for fid in job_info["additional_inputs"]]
job_outputs = []
for fid in job_info["outputs"]:
full_name = self._get_full_name(prefix, markers, fid)
master_files[full_name] = File(full_name)
job_outputs.append({"name": full_name, "file": master_files[full_name]})
job = self.workflow_jobs[key].create_job(job_inputs, job_additional, job_outputs)
dax.addJob(job)
# Now we add our job to master_jobs and check dependencies.
# The dicts are shared with the caller (mutated in place), so even though
# we are a little redundant here, dependency checking
# will be easier later.
for fd in job_outputs:
master_jobs[fd["name"]] = job
for fd in job_inputs:
if fd["name"] in master_jobs:
dax.depends(child = job, parent = master_jobs[fd["name"]])
elif "[" in key and "]" in key:
val, marker, dummy = re.split("[\[\]]", key)
if markers[marker] == val:
self._traverse_jobs(item[key], dax, master_jobs, master_files, prefix, markers, input_files, additional_files)
return
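The dependency rule in _traverse_jobs is worth spelling out: a job is registered in master_jobs under every file name it outputs, and any later job consuming one of those names gains a DAX edge. A tiny self-contained simulation of that bookkeeping, with invented names:

master_jobs = {}

def register_outputs(job, output_names):
    # The producing job becomes findable under each of its output files.
    for name in output_names:
        master_jobs[name] = job

def parents_for(input_names):
    # Every known producer of one of our inputs becomes a parent edge.
    return [master_jobs[n] for n in input_names if n in master_jobs]

register_outputs("bwa_job", ["run1_single_bwa.sai"])
print(parents_for(["run1_single_bwa.sai", "ref_genome"]))  # ['bwa_job']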
def _check_files(self, master_files, prefix, markers, input_files, additional_files):
valid = True
msg = ""
data = ""
valid, module_files = self.get_files(markers)
if valid:
data = module_files
for arg_files, file_key in zip([input_files, additional_files], ["inputs", "additional_files"]):
for fname in arg_files:
if fname not in module_files[file_key]:
valid = False
msg += "Error loading '%s' jobs. Supplied file with name '%s' does not exist in module %s.\n" % (self.name, fname, file_key)
if arg_files[fname] not in master_files:
valid = False
msg += "Error loading '%s' jobs. File with name '%s', value '%s', does not exist in master_files.\n" % (self.name, fname, arg_files[fname])
for fname in module_files[file_key]:
if fname not in arg_files:
valid = False
msg += "Error loading '%s' jobs. Required file '%s' for %s not defined.\n" % (self.name, fname, file_key)
else:
valid = False
msg += "Error loading '%s' jobs. Could not load files.\n" % (self.name,)
return (valid, msg, data)
def _get_full_name(self, prefix, markers, fname):
return "%s_%s_%s" % (prefix, "_".join([markers[x] for x in self.order]), fname)
def _add_output_files(self, master_files, prefix, markers, output_files):
return
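_get_full_name is where self.order pays off: output names get the run prefix plus each marker's value in discovery order. A sketch with hypothetical values, mirroring the one-line formula above:

prefix = "run1"
order = ["read_end"]                  # discovery order from _load_markers
markers = {"read_end": "single"}
fname = "bwa.sai"

full_name = "%s_%s_%s" % (prefix, "_".join([markers[x] for x in order]), fname)
print(full_name)  # run1_single_bwa.sai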