Commit 9135d7b2 authored by aknecht2

Refactored module code. Updated job error handling. Added config file. Updated .gitignore.

parent 831f8a51
......@@ -2,4 +2,6 @@ build/
*.swp
*.pyc
*.DS_Store
dist/
chipathlon.egg-info/
MANIFEST
notify:
pegasus_home: "/usr/share/pegasus/"
email: "avi@kurtknecht.com"
profile:
pegasus:
style: "glite"
condor:
grid_resource: "pbs"
universe: "vanilla"
env:
PATH: "/work/walia/common/pymodules/anaconda/bin"
PYTHONPATH: "/work/walia/common/pymodules/anaconda/lib/python2.7/site-packages/"
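For context, a minimal sketch of how a driver script might read the new config file. The file name and the load_chipathlon_config helper are hypothetical, and the key nesting is assumed from the indentation shown above:

    import yaml

    def load_chipathlon_config(config_path):
        # Hypothetical helper: parse the yaml config added in this commit
        # and hand the resulting dict to the workflow generator.
        with open(config_path, "r") as rh:
            return yaml.load(rh)

    config = load_chipathlon_config("config.yaml")  # path is illustrative
    # Assuming the nesting shown above:
    #   config["profile"]["pegasus"]["style"]  -> "glite"
    #   config["profile"]["env"]["PATH"]       -> anaconda bin directory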
......@@ -51,3 +51,7 @@ samtools_remove_duplicates:
arguments: null
walltime: 2000
memory: 2000
samtools_filter_bam:
arguments: null
walltime: 2000
memory: 2000
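The walltime/memory entries presumably get attached to generated jobs as resource profiles (see _add_job_resources further down, whose body is not shown in this diff). A rough sketch of what that could look like with the Pegasus DAX3 profile API; the globus maxwalltime/maxmemory keys are an assumption, not taken from this commit:

    from Pegasus.DAX3 import Profile, Namespace

    def add_resource_profiles(job, params):
        # Illustrative only: map the yaml walltime (minutes) and memory (MB)
        # values onto globus profiles of a DAX3 job.
        if params.get("walltime") is not None:
            job.addProfile(Profile(Namespace.GLOBUS, "maxwalltime", str(params["walltime"])))
        if params.get("memory") is not None:
            job.addProfile(Profile(Namespace.GLOBUS, "maxmemory", str(params["memory"])))
        return job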
......@@ -65,7 +65,7 @@ class Workflow(object):
self._create_submit()
self._write()
except SystemExit as se:
print self.err
pass
if self.err:
print self.err
sys.exit(1)
......
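The new handling above follows a simple pattern: validation methods append to self.err and raise SystemExit, the constructor swallows the exception, and the top level prints whatever accumulated and exits non-zero. A condensed standalone sketch of that pattern (class and method names are illustrative):

    import sys

    class ErrorAccumulator(object):
        def __init__(self):
            self.err = ""

        def _raise(self):
            # Abort the current step as soon as any error text has accumulated.
            if self.err:
                raise SystemExit(self.err)

        def generate(self):
            try:
                self.err += "example validation failure\n"
                self._raise()
            except SystemExit:
                pass
            if self.err:
                print self.err
                sys.exit(1)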
......@@ -20,7 +20,7 @@ class WorkflowJob(object):
self.inputs = self.base[self.jobname]["inputs"]
self.outputs = self.base[self.jobname]["outputs"]
except:
self.err += "Error parsing job template yaml file.\n"
self.err += "Error parsing job template yaml file %s.\n" % (base_yaml, )
self.err += traceback.format_exc()
try:
with open(param_file, "r") as rh:
......@@ -35,16 +35,19 @@ class WorkflowJob(object):
self.validate()
return
def valid(self):
return False if self.err else True
def validate(self):
# Maybe unnecessary abstraction?
if self.valid():
self._validate_input()
if self.valid():
if not self.err:
try:
self._validate_input()
self._validate_arguments()
self._validate_resource_params()
self._validate_resource_params()
except SystemExit as se:
return
return
def _raise(self):
if self.err:
raise SystemExit(self.err)
return
def _validate_input(self):
......@@ -54,6 +57,7 @@ class WorkflowJob(object):
for key in self.params:
if key not in (chipathlon.conf.param_keys["required"] + chipathlon.conf.param_keys["optional"]):
self.err += "Specified key '%s' does not exist for job '%s'.\n" % (key, self.jobname)
self._raise()
return
def _validate_arguments(self):
......@@ -81,6 +85,7 @@ class WorkflowJob(object):
for arg in self.params["arguments"]:
if arg not in valid_args:
self.err += "Argument '%s' specified in params file does not exist.\n" % (arg,)
self._raise()
return
def _validate_resource_params(self):
......@@ -98,6 +103,7 @@ class WorkflowJob(object):
% (resource_type, self.jobname)
except KeyError:
pass
self._raise()
return
def _check_files(self, input_files, additional_inputs, output_files):
......@@ -133,7 +139,7 @@ class WorkflowJob(object):
self._add_job_resources(job)
return job
else:
print self.err
return None
return None
def _add_job_resources(self, job):
......
......@@ -8,6 +8,7 @@ import xml.dom.minidom
import yaml
import traceback
import re
import itertools
from pprint import pprint
from Pegasus.DAX3 import *
......@@ -21,16 +22,69 @@ class WorkflowModule(object):
try:
with open(module_yaml, "r") as rh:
self.data = yaml.load(rh)
except:
self.err += "Error parsing module template yaml file %s.\n" % (module_yaml, )
self.err += traceback.format_exc()
try:
self.name = self.data.keys()[0]
self.markers = {}
self.current_markers = {}
self.order = []
self.workflow = {}
self.workflow_jobs = workflow_jobs
self._load_markers(self.data[self.name])
except:
self.err += "Error parsing job template yaml file.\n"
self.err += traceback.format_exc()
# Deep copy self.order, want to maintain it
self._add_workflow_keys(self.workflow, list(self.order))
self._load_jobs(self.data[self.name], add_markers = dict(self.markers))
pprint(self.workflow)
except SystemExit as se:
pass
return
def _get_data_list(self, data, markers):
if markers:
rdata = data
for marker in self.order:
if marker in markers:
rdata = rdata[markers[marker]]
return rdata
else:
return data
return
def _add_workflow_keys(self, workflow, order, index = 0):
marker_key = order[index]
for marker in self.markers[marker_key]:
if index < len(order) - 1:
workflow[marker] = {}
new_index = index + 1
self._add_workflow_keys(workflow[marker], order, new_index)
else:
workflow[marker] = []
return
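To make the recursion above concrete, here is a self-contained version with hypothetical marker names showing the nested dict it builds (one dict level per marker in order, with an empty job list at each leaf):

    order = ["align", "peak"]
    markers = {"align": ["bwa", "bowtie"], "peak": ["macs2", "spp"]}

    def add_workflow_keys(workflow, order, markers, index=0):
        # Nest one dict level per marker; the innermost level holds job lists.
        for value in markers[order[index]]:
            if index < len(order) - 1:
                workflow[value] = {}
                add_workflow_keys(workflow[value], order, markers, index + 1)
            else:
                workflow[value] = []

    workflow = {}
    add_workflow_keys(workflow, order, markers)
    # workflow == {"bwa": {"macs2": [], "spp": []}, "bowtie": {"macs2": [], "spp": []}}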
def _load_jobs(self, data, add_markers = {}):
# If we find a job, we need to add it for each
# possible marker combination found up to that point.
# If we find a marker, we need to split the workflow
# for all possible combinations
for item in data:
for key in item.keys():
if key in self.workflow_jobs:
for values in itertools.product(*[key_value[1] for key_value in add_markers.iteritems()]):
wlist = self._get_data_list(self.workflow, dict(zip(add_markers.keys(), values)))
wlist.append(item)
elif "[" in key and "]" in key:
val, marker, dummy = re.split("[\[\]]", key)
new_add = dict(add_markers)
if marker not in new_add:
new_add[marker] = []
if val not in new_add[marker]:
new_add[marker].append(val)
self._load_jobs(item[key], new_add)
return
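The fan-out in _load_jobs comes from itertools.product: one value is picked per marker seen so far, and the job is appended under every resulting combination. A condensed illustration with the same hypothetical markers as above (not the module code itself):

    import itertools

    order = ["align", "peak"]
    add_markers = {"align": ["bwa", "bowtie"], "peak": ["macs2", "spp"]}

    # Four combinations: (bwa, macs2), (bwa, spp), (bowtie, macs2), (bowtie, spp).
    for values in itertools.product(*[add_markers[marker] for marker in order]):
        combo = dict(zip(order, values))
        print combo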
def _load_markers(self, data):
# Finds all potential splitting points and their values
for item in data:
......@@ -60,20 +114,18 @@ class WorkflowModule(object):
valid = False
return (valid, msg)
def _load_required_files(self, data, markers, input_files = [], additional_files = [], output_files = [], prefix = None, master_files = {}):
for item in data:
for key in item.keys():
if key in self.workflow_jobs:
for ftype, flist in zip(["inputs", "additional_inputs", "outputs"], [input_files, additional_files, output_files]):
if item[key][ftype] is not None:
for f in item[key][ftype]:
if f not in flist and f not in output_files:
if prefix is None or self._get_full_name(prefix, markers, f) in master_files:
flist.append(f)
elif "[" in key and "]" in key:
val, marker, dummy = re.split("[\[\]]", key)
if markers[marker] == val:
self._load_required_files(item[key], markers, input_files, additional_files, output_files, prefix, master_files)
def _load_required_files(self, markers, prefix = None, master_files = {}):
input_files = []
additional_files = []
output_files = []
job_list = self._get_data_list(self.workflow, markers)
for item in job_list:
job_info = item[item.keys()[0]]
for file_type, file_list in zip(["inputs", "additional_inputs", "outputs"], [input_files, additional_files, output_files]):
if job_info[file_type] is not None:
for file_id in job_info[file_type]:
if file_id not in file_list and file_id not in output_files:
if prefix is None or self._get_full_name(prefix, markers, file_id) in master_files:
file_list.append(file_id)
return (input_files, additional_files, output_files)
def get_files(self, markers, prefix = None, master_files = {}):
......@@ -104,51 +156,50 @@ class WorkflowModule(object):
self._traverse_jobs(self.data[self.name], dax, master_jobs, master_files, prefix, markers, input_files, additional_files)
return
def _traverse_jobs(self, data, dax, master_jobs, master_files, prefix, markers, input_files, additional_files):
for item in data:
for key in item.keys():
if key in self.workflow_jobs:
## GENERATE JOB HERE!!! ###
job_info = item[key]
job_inputs = []
job_additional = []
for ftype, fdict, joblist in zip(["inputs", "additional_inputs"], [input_files, additional_files], [job_inputs, job_additional]):
if job_info[ftype] is not None:
for fid in job_info[ftype]:
full_name = self._get_full_name(prefix, markers, fid)
if full_name in master_files:
joblist.append({
"name": full_name,
"file": master_files[full_name]
})
else:
joblist.append({
"name": fdict[fid],
"file": master_files[fdict[fid]]
})
job_outputs = []
for fid in job_info["outputs"]:
full_name = self._get_full_name(prefix, markers, fid)
master_files[full_name] = File(full_name)
job_outputs.append({"name": full_name, "file": master_files[full_name]})
job = self.workflow_jobs[key].create_job(job_inputs, job_additional, job_outputs)
if job is not None:
dax.addJob(job)
else:
print "JOB ERROR for '%s'.\n" % (key,)
# NOW, WE add our job to the master_list, and check dependencies
# Python ALWAYS passes by reference, so even though
# we are a little redundant here, dependency checking
# will be easier later and not very costly
for fd in job_outputs:
master_jobs[fd["name"]] = job
for fd in job_inputs:
if fd["name"] in master_jobs:
dax.depends(child = job, parent = master_jobs[fd["name"]])
elif "[" in key and "]" in key:
val, marker, dummy = re.split("[\[\]]", key)
if markers[marker] == val:
self._traverse_jobs(item[key], dax, master_jobs, master_files, prefix, markers, input_files, additional_files)
def _setup_file_list(self, master_files, job_info, file_type, file_dict, prefix, markers):
file_list = []
if job_info[file_type] is not None:
for file_id in job_info[file_type]:
full_name = self._get_full_name(prefix, markers, file_id)
if full_name in master_files:
file_list.append({
"name": full_name,
"file": master_files[full_name]
})
else:
file_list.append({
"name": file_dict[file_id],
"file": master_files[file_dict[file_id]]
})
return file_list
def _traverse_jobs(self, dax, master_jobs, master_files, prefix, markers, input_files, additional_files):
job_list = self._get_data_list(self.workflow, markers)
for item in job_list:
job_name = item.keys()[0]
job_info = item[job_name]
job_inputs = self._setup_file_list(master_files, job_info, "inputs", input_files, prefix, markers)
job_additional = self._setup_file_list(master_files, job_info, "additional_inputs", additional_files, prefix, markers)
job_outputs = []
for file_id in job_info["outputs"]:
full_name = self._get_full_name(prefix, markers, file_id)
master_files[full_name] = File(full_name)
job_outputs.append({"name": full_name, "file": master_files[full_name]})
job = self.workflow_jobs[job_name].create_job(job_inputs, job_additional, job_outputs)
if job is not None:
dax.addJob(job)
else:
print "JOB ERROR for '%s'.\n" % (key,)
# NOW, WE add our job to the master_list, and check dependencies
# Python ALWAYS passes by reference, so even though
# we are a little redundant here, dependency checking
# will be easier later and not very costly
for file_dict in job_outputs:
master_jobs[file_dict["name"]] = job
for file_dict in job_inputs:
if file_dict["name"] in master_jobs:
dax.depends(child = job, parent = master_jobs[file_dict["name"]])
return
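Dependency checking in the new _traverse_jobs relies on master_jobs mapping each output file name to the job that produced it; when a later job lists that file as an input, a parent/child edge is added. A rough standalone sketch of that idea with the DAX3 API (job and file names are made up):

    from Pegasus.DAX3 import ADAG, Job, File, Link

    dax = ADAG("example")
    master_jobs = {}

    # Producer: an alignment job writes out.bam.
    out_bam = File("out.bam")
    align_job = Job("align")
    align_job.uses(out_bam, link=Link.OUTPUT)
    dax.addJob(align_job)
    master_jobs["out.bam"] = align_job      # remember who produced the file

    # Consumer: a filtering job reads out.bam, so it must run after the producer.
    filter_job = Job("filter")
    filter_job.uses(out_bam, link=Link.INPUT)
    dax.addJob(filter_job)
    if "out.bam" in master_jobs:
        dax.depends(child=filter_job, parent=master_jobs["out.bam"])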
def _check_files(self, master_files, prefix, markers, input_files, additional_files):
......