Commit a5561568 authored by aknecht2's avatar aknecht2
Browse files

Fixed param files to reference correct commands. Updated workflow job loading...

Fixed param files to reference correct commands.  Updated workflow job loading to use commands specified within yaml files.  Added a debug mode to workflow to print out additional statements.
parent 7c797795
......@@ -6,7 +6,7 @@ cat_awk_sort_peaks:
outputs:
- bed:
type: file
command: cat_spp
command: cat_spp.sh
arguments:
- "$inputs.0":
changeable: false
......
zcat_awk_sort_peaks:
mv_bed_tagalign:
inputs:
- bed:
type: file
......@@ -6,7 +6,7 @@ zcat_awk_sort_peaks:
outputs:
- bed:
type: file
command: mv_bed_tagalign
command: mv_bed_tagalign.sh
arguments:
- "$inputs.0":
changeable: false
......
......@@ -6,7 +6,7 @@ sort_awk_sort_peaks:
outputs:
- bed:
type: file
command: sort_macs2
command: sort_macs2.sh
arguments:
- "$inputs.0":
changeable: false
......
......@@ -6,7 +6,7 @@ zcat_awk_sort_peaks:
outputs:
- bed:
type: file
command: zcat_spp
command: zcat_spp.sh
arguments:
- "$inputs.0":
changeable: false
......
......@@ -18,13 +18,15 @@ from Pegasus.DAX3 import *
class Workflow(object):
def __init__(self, jobhome, run_file, param_file, config_file, host, username, password):
def __init__(self, jobhome, run_file, param_file, config_file, host, username, password, debug=False):
# Initialize db connection
self.mdb = chipathlon.db.MongoDB(host, username, password)
# Jobname info & err
self.jobhome = os.path.abspath(jobhome)
self.jobname = os.path.basename(os.path.dirname(self.jobhome + "/"))
self.err = ""
# debug mode, print out additional information
self.debug = debug
# Input file info
self.run_file = run_file
self.param_file = param_file
......@@ -93,6 +95,8 @@ class Workflow(object):
self.executables[ex_name] = Executable(name=ex_name, os=os_type, arch=arch)
self.executables[ex_name].addPFN(PFN("file://%s/%s" % (root, f), "local"))
self.dax.addExecutable(self.executables[ex_name])
if self.debug:
print "[LOAD_EXECUTABLE] %s" % (ex_name,)
break
# Load actual scripts
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_scripts)):
......@@ -100,6 +104,8 @@ class Workflow(object):
self.executables[f] = Executable(name=f, os=os_type, arch=arch)
self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "local"))
self.dax.addExecutable(self.executables[f])
if self.debug:
print "[LOAD_EXECUTABLE] %s" % (f,)
break
# Overwrite pegasus::transfer to request more walltime
"""
......@@ -115,15 +121,17 @@ class Workflow(object):
def _load_workflow_jobs(self):
    """Create a WorkflowJob for every param file whose command is a loaded executable.

    Each file in the top level of the ``chipathlon.conf.job_params``
    directory (resolved relative to this module) is parsed as yaml.  The
    ``command`` entry under the file's first top-level key names the
    executable the job runs.  When that name is present in
    ``self.executables`` a ``WorkflowJob`` is built and stored in
    ``self.workflow_jobs`` under its ``jobname``; any error text the job
    reports is appended to ``self.err`` instead.  Param files whose command
    has no loaded executable are skipped, with a warning printed only in
    debug mode.

    Ends by calling ``self._raise()`` -- presumably this aborts when
    ``self.err`` is non-empty; confirm against its definition.
    """
    params_dir = "%s/%s" % (
        os.path.dirname(os.path.realpath(__file__)),
        chipathlon.conf.job_params
    )
    for root, dirs, files in os.walk(params_dir):
        for f in files:
            param_path = os.path.join(root, f)
            with open(param_path, "r") as rh:
                # NOTE(review): yaml.load can construct arbitrary objects.
                # These files appear to ship with the package; if they can
                # ever come from users, switch to yaml.safe_load.
                job_info = yaml.load(rh)
            # The executable is named inside the param file itself, under
            # its first top-level key, rather than derived from the filename.
            ex_name = job_info[next(iter(job_info))]["command"]
            if ex_name in self.executables:
                yj = chipathlon.workflow_job.WorkflowJob(
                    param_path, self.param_file, self.executables[ex_name]
                )
                if not yj.err:
                    self.workflow_jobs[yj.jobname] = yj
                else:
                    self.err += yj.err
            elif self.debug:
                # Both placeholders need a value; supplying only the file
                # name raised TypeError as soon as this branch was reached.
                print("[WARNING] Skipping param file %s, corresponding executable %s not found." % (f, ex_name))
        # Only the top level of the directory is scanned.
        break
    self._raise()
    return
......
......@@ -89,17 +89,20 @@ class WorkflowModule(object):
def _load_markers(self, data):
    """Record every split marker, and the values it can take, found in ``data``.

    ``data`` is iterated item by item and each item is expected to expose
    ``keys()``.  A key written as ``value[marker]`` registers ``marker`` in
    ``self.order`` the first time it is seen and appends ``value`` to
    ``self.markers[marker]`` if it is not already listed; the content stored
    under that key is then searched recursively.  As soon as a key that
    names an entry of ``self.workflow_jobs`` is met, the current call
    returns without looking at any remaining keys or items.

    Items without ``keys()`` are reported on stdout and skipped.

    :param data: Iterable of mapping-like items from a module definition.
    """
    for item in data:
        # Guard only the lookup that signals a malformed (non-mapping) item,
        # so an AttributeError raised elsewhere is not misreported as a
        # parse error.
        try:
            keys = item.keys()
        except AttributeError:
            print("Error parsing %s in %s." % (item, self.name))
            continue
        for key in keys:
            if key in self.workflow_jobs:
                return
            elif "[" in key and "]" in key:
                # "value[marker]" splits into value, marker and the (empty)
                # remainder after the closing bracket.
                val, marker, dummy = re.split(r"[\[\]]", key)
                if marker not in self.markers:
                    self.markers[marker] = []
                    self.order.append(marker)
                if val not in self.markers[marker]:
                    self.markers[marker].append(val)
                self._load_markers(item[key])
    return
def _check_input_markers(self, markers):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment