Commit 7c396945 authored by aknecht2

Updated yaml_job validation.  Added workflow validation for modules, yaml_jobs, and run files.  Added wrapper scripts for some commands.
parent 549e7cd0
build/
*.swp
*.pyc
*.DS_Store
MANIFEST
#!/bin/bash
module load bowtie2
bowtie2 "$@"

#!/bin/bash
module load bwa
bwa "$@"

#!/bin/bash
module load samtools
samtools "$@"
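Each wrapper loads the cluster's environment module before passing its arguments straight through, which gives Pegasus jobs one stable executable to call. As a rough sketch (the install path and site name are illustrative assumptions, not taken from this commit), such a wrapper might be registered in a DAX like so:

from Pegasus.DAX3 import ADAG, Executable, PFN

dax = ADAG("example")
# Hypothetical location for the bwa wrapper above; the real path is
# wherever jobs/wrappers/* lands after installation.
bwa = Executable(name="bwa", arch="x86_64", os="linux", installed=True)
bwa.addPFN(PFN("file:///opt/chipathlon/jobs/wrappers/bwa.sh", "condorpool"))
dax.addExecutable(bwa)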
bwa_sai_to_sam:
  arguments: null
  walltime: 2000
  memory: 2000

bwa_align_single:
  arguments:
    "-q": 5
    "-l": 32
    "-k": 2
    "-t": 1
  walltime: 2000
  memory: 2000

bwa_align_paired:
  arguments:
    "-t": 1
  walltime: 2000
  memory: 2000

samtools_sam_to_bam:
  arguments: null
  walltime: 2000
  memory: 2000
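Each block above is a per-job parameter file: default command-line arguments plus walltime and memory limits. A minimal sketch of the overlay these defaults imply, with user-supplied values winning over shipped defaults (merge_params is a hypothetical helper for illustration; the real parsing and validation live in the YamlJob class changed below):

import yaml

def merge_params(base_file, param_file):
    # Load the shipped defaults, e.g. {"bwa_align_single": {...}}.
    with open(base_file, "r") as rh:
        base = yaml.load(rh)
    jobname = list(base.keys())[0]
    params = base[jobname]
    # Overlay any user-supplied overrides for this job.
    with open(param_file, "r") as rh:
        overrides = yaml.load(rh) or {}
    params.update(overrides.get(jobname, {}))
    return jobname, params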
@@ -2,6 +2,7 @@
 import chipathlon
 import chipathlon.yaml_job
 import chipathlon.db
+import chipathlon.workflow
 import argparse
 
 parser = argparse.ArgumentParser(description = "Perform a join between the experiment and sample collections.")
@@ -87,6 +88,11 @@ def exp_files_test_5():
     print_test("Exp_Files_Test_5", not valid and msg == "Experiment with id 'ENCSR000DKB' has '0' possible control inputs, and '3' possible experiment inputs.")
     return
 
+def workflow_test_1():
+    workflow = chipathlon.workflow.Workflow("test/run/", "test/run/run.yaml", "test/run/param.yaml", args.host, args.username, args.password)
+    workflow.generate()
+    return
+
 def valid_samples():
     has_samples, total = mdb.check_valid_samples()
     print_test("Verify_All_Samples", has_samples == total)
@@ -94,17 +100,18 @@ def valid_samples():
-tests = [
-    yaml_job_test_1,
-    yaml_job_test_2,
-    yaml_job_test_3,
-    yaml_job_test_4,
-    exp_files_test_1,
-    exp_files_test_2,
-    exp_files_test_3,
-    exp_files_test_4,
-    exp_files_test_5,
-    valid_samples
-]
-for test in tests:
-    test()
+# tests = [
+# yaml_job_test_1,
+# yaml_job_test_2,
+# yaml_job_test_3,
+# yaml_job_test_4,
+# exp_files_test_1,
+# exp_files_test_2,
+# exp_files_test_3,
+# exp_files_test_4,
+# exp_files_test_5,
+# valid_samples
+# ]
+# for test in tests:
+# test()
+workflow_test_1()
@@ -5,11 +5,14 @@ import json
 import datetime
 import textwrap
 import xml.dom.minidom
+import yaml
+import traceback
+import chipathlon
 from Pegasus.DAX3 import *
 
 class Workflow(object):
 
-    def __init__(self, jobhome):
+    def __init__(self, jobhome, run_file, param_file, host, username, password):
         """
         :param jobhome: The base directory for job submission.
         :type jobhome: str
@@ -17,33 +20,101 @@ class Workflow(object):
         Creates a Workflow class based on the input directory. Only loads and
         validates the config file by default.
         """
-        self.err = ""
+        # Initialize db connection
+        self.mdb = chipathlon.db.MongoDB(host, username, password)
+        # Jobname info & err
         self.jobhome = os.path.abspath(jobhome)
         self.jobname = os.path.basename(os.path.dirname(self.jobhome + "/"))
+        self.err = ""
+        # Input file info
+        self.run_file = run_file
+        self.param_file = param_file
+        # Dax specific info
         self.dax = ADAG(self.jobname)
         self.executables = {}
-        self.files = {"input": {}, "output": {}}
+        self.files = {}
         self.jobs = {}
         self.deps = {}
+        self.yaml_jobs = {}
+        self.modules = {}
         return
 
+    def generate(self):
+        self.load_modules()
+        self.validate_run()
+        if not self.err:
+            self.load_yaml_jobs()
+            if not self.err:
+                print "Put the rest of the generating functions hurrrr."
+            else:
+                print self.err
+                sys.exit(1)
+        else:
+            print self.err
+            sys.exit(1)
+        return
+
+    def load_modules(self):
+        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)):
+            for f in files:
+                with open("%s/%s" % (root, f), "r") as rh:
+                    mdata = yaml.load(rh)
+                    root_key = mdata.keys()[0]
+                    self.modules[root_key] = mdata[root_key]
+            break
+        return
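load_modules walks only the top level of the modules directory (hence the break) and stores each file's single root key in self.modules. validate_run below then treats any dict-valued module with more than one key as a branch point the run file must resolve. The implied shape is roughly the following; the module and branch names are illustrative assumptions, with job names borrowed from the params files above:

# Assumed shape of self.modules after load_modules(); "align", "bwa",
# "bowtie2" and "bowtie2_align" are illustrative, not from this commit.
modules = {
    "align": {
        # More than one key, so each run must choose: "align: bwa", etc.
        "bwa": ["bwa_align_single", "bwa_sai_to_sam", "samtools_sam_to_bam"],
        "bowtie2": ["bowtie2_align"],
    }
}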
+    def validate_run(self):
+        try:
+            with open(self.run_file, "r") as rh:
+                self.run_data = yaml.load(rh)
+            for run in self.run_data:
+                for module_name in self.modules:
+                    if isinstance(self.modules[module_name], dict):
+                        keys = self.modules[module_name].keys()
+                        if len(keys) > 1:
+                            if module_name in run:
+                                if run[module_name] not in keys:
+                                    self.err += "Error parsing run template file '%s'. Module '%s' has invalid entry '%s', should be one of %s.\n" % (self.run_file, module_name, run[module_name], keys)
+                            else:
+                                self.err += "Error parsing run template file '%s'. Module '%s' requires an entry, should be one of %s.\n" % (self.run_file, module_name, keys)
+                if "experiment" in run:
+                    valid, msg, data = self.mdb.get_samples(run["experiment"])
+                    print valid
+                    print msg
+                    print data
+                else:
+                    self.err += "Error parsing run template file '%s'. Required key 'experiment' not defined.\n" % (self.run_file,)
+        except:
+            self.err += "Error parsing run template yaml file.\n"
+            self.err += traceback.format_exc()
+        return
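Taken together, the checks require each run to name an experiment and to pick one branch for every multi-key module. A minimal run template that would pass, assuming the illustrative "align" module above and reusing the accession from the tests:

import yaml

run_yaml = """
- experiment: ENCSR000DKB
  align: bwa
"""
run_data = yaml.load(run_yaml)
assert "experiment" in run_data[0] and run_data[0]["align"] == "bwa"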
-    def _addFile(self, name, inout, site = "condorpool", path = None, dax = None):
+    def load_yaml_jobs(self):
+        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
+            for f in files:
+                yj = chipathlon.yaml_job.YamlJob("%s/%s" % (root, f), self.param_file)
+                self.err += yj.err
+                self.yaml_jobs[yj.jobname] = yj
+            break
+        return
+
+    def _addFile(self, name, inout, site = "condorpool", path = None):
         """
         :param name: Name of the file to add.
         :type name: str
         :param path: Path to the file. Only required for input files.
         :type path: str
-        :param dax: Any dax can be passed in through this variable, otherwise use the internal dax.
-        :type dax: ADAG
 
         Adds the inputted file to the dax, as well as the internal variable self.files
         """
-        dax = dax if dax else self.dax
-        self.files[dax][inout][name] = {"file": File(name), "path": ""}
+        self.files[inout][name] = {"file": File(name), "path": ""}
         if inout == "input":
-            self.files[dax][inout][name]["path"] = path if path else name
-            self.files[dax][inout][name]["file"].addPFN(PFN("file://" + path.replace(" ","%20"), site))
-            dax.addFile(self.files[dax][inout][name]["file"])
+            self.files[inout][name]["path"] = path if path else name
+            self.files[inout][name]["file"].addPFN(PFN("file://" + path.replace(" ","%20"), site))
+            self.dax.addFile(self.files[inout][name]["file"])
         elif inout == "output":
             self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name))
         return
......
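A usage sketch for the reworked _addFile, with illustrative names and paths. Note it indexes self.files[inout], so the "input"/"output" sub-dicts must exist before the first call (the old constructor seeded them; the new one starts from an empty dict):

# workflow is an already-constructed Workflow instance; file names and
# paths here are hypothetical.
workflow.files = {"input": {}, "output": {}}
workflow._addFile("reads.fastq", "input", path="/data/reads.fastq")
workflow._addFile("aligned.sam", "output")  # written to the output map instead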
@@ -11,7 +11,7 @@ class YamlJob(object):
     def __init__(self, base_yaml, param_file):
         self.err = ""
         try:
-            with open(chipathlon.conf.job_params + "/" + base_yaml + ".yaml", "r") as rh:
+            with open(base_yaml, "r") as rh:
                 self.base = yaml.load(rh)
                 self.jobname = self.base.keys()[0]
         except:
@@ -24,6 +24,8 @@ class YamlJob(object):
         except:
             self.err += "Error parsing params yaml file.\n"
             self.err += traceback.format_exc()
+        self.inputs = self.base[self.jobname]["inputs"]
+        self.outputs = self.base[self.jobname]["outputs"]
         self.validate()
         return
@@ -68,7 +70,6 @@ class YamlJob(object):
         if arg_info["required"] and arg_info["changeable"] and "default" not in arg_info:
             if not arg in self.params["arguments"]:
                 self.err += "Required argument '%s' does not have a default, and is not defined in params file.\n" % (arg,)
-        if self.params["arguments"] is not None:
         for arg in self.params["arguments"]:
             if arg not in valid_args:
@@ -92,7 +93,7 @@ class YamlJob(object):
         """
         Create the necessary argument list for pegasus.
         To add arguments to the pegasus job use:
-        job.addArgumnets(*arg_list)
+        job.addArguments(*arg_list)
         """
         if self.check_file_names(file_names):
             arg_list = []
......
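For context, the corrected docstring now matches the actual Pegasus API; attaching a generated argument list to a job looks like this (the executable name is illustrative and the arguments are taken from the bwa_align_single defaults above):

from Pegasus.DAX3 import Job

job = Job(name="bwa")
arg_list = ["aln", "-q", "5", "-l", "32", "-k", "2", "-t", "1"]
job.addArguments(*arg_list)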
@@ -5,5 +5,8 @@ setup(
     version="1.0",
     packages=["chipathlon"],
     license="???",
+    package_data={
+        "chipathlon": ["jobs/modules/*", "jobs/params/*", "jobs/wrappers/*"]
+    },
     scripts = []
 )