Commit 9c85952b authored by aknecht2's avatar aknecht2
Browse files

Added initial yamlJob class. Added tester file for individual tests. Added...

Added initial yamlJob class.  Added tester file for individual tests.  Added missing command entry to bwa_align_paired.yaml.  Added internal configuration
parent 23ad9c33
# Directory containing the job module files
job_modules = "jobs/modules/"

# Directory containing the job parameter files
job_params = "jobs/params/"

# Accepted file-name extensions, keyed by file type
file_extensions = {
    "fna": ["fna"],
    "fastq": ["fastq", "fastq.gz"],
    "sam": ["sam"],
    "bam": ["bam"],
    "bed": ["bed", "narrowPeak", "broadPeak"],
}
...@@ -5,6 +5,7 @@ bwa_align_paired: ...@@ -5,6 +5,7 @@ bwa_align_paired:
- fastq - fastq
outputs: outputs:
- sam - sam
command: bwa
arguments: arguments:
- "mem": - "mem":
changeable: false changeable: false
......
...@@ -4,15 +4,15 @@ bwa_sai_to_sam: ...@@ -4,15 +4,15 @@ bwa_sai_to_sam:
memory: 2000 memory: 2000
bwa_align_single: bwa_align_single:
arguments: arguments:
- "-q": 5 "-q": 5
- "-l": 32 "-l": 32
- "-k": 2 "-k": 2
- "-t": 1 "-t": 1
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
bwa_align_paired: bwa_align_paired:
arguments: arguments:
- "-t": 1 "-t": 1
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
samtools_sam_to_bam: samtools_sam_to_bam:
......
import chipathlon
import chipathlon.yaml_job
def test_yaml_job():
    """Build the bwa_align_paired job and print its argument list.

    Loads the 'bwa_align_paired' job together with the params file at
    test/yaml_job_1/params.yaml, then prints whatever create_arg_list
    returns for three sample input files.
    """
    file_names = ["bwa_index.fna", "bwa_1.fastq", "bwa_2.fastq"]
    yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job_1/params.yaml")
    # create_arg_list expects both the file names and the lfns to place on
    # the command line; this standalone test uses the names for both.
    print(yj.create_arg_list(file_names, file_names))
    return


# Only run when executed as a script, so importing this file has no side effects.
if __name__ == "__main__":
    test_yaml_job()
import yaml import yaml
import traceback
import sys
from Pegasus.DAX3 import * from Pegasus.DAX3 import *
import chipathlon
import chipathlon.conf
from pprint import pprint
class YamlJob(object):
    """Combine a job template yaml file with a params yaml file.

    The template describes a single job (its top-level key is the job name)
    with its "inputs" and its "arguments".  The params file supplies
    per-job overrides under the same job name.  Problems found while
    loading or validating are accumulated as text in ``self.err`` instead
    of being raised.
    """

    def __init__(self, base_yaml, param_file):
        """Load both yaml files and validate the params against the template.

        :param base_yaml: Name of the job template, without the ".yaml"
            suffix; looked up under chipathlon.conf.job_params.
        :param param_file: Path to the params yaml file.
        """
        self.err = ""
        # Pre-set so that a failed load leaves well-defined attributes.
        self.base = None
        self.jobname = None
        self.params = None
        try:
            # NOTE(review): yaml.load can construct arbitrary Python objects;
            # consider yaml.safe_load if these files may be untrusted.
            # NOTE(review): the template is read from conf.job_params, not
            # conf.job_modules -- confirm this is the intended directory.
            with open(chipathlon.conf.job_params + "/" + base_yaml + ".yaml", "r") as rh:
                self.base = yaml.load(rh)
            self.jobname = list(self.base.keys())[0]
        except Exception:
            self.err += "Error parsing job template yaml file.\n"
            self.err += traceback.format_exc()
        try:
            with open(param_file, "r") as rh:
                params = yaml.load(rh)
            self.params = params[self.jobname]
        except Exception:
            self.err += "Error parsing params yaml file.\n"
            self.err += traceback.format_exc()
        # Validation needs both files; skip it when either failed to load.
        if not self.err:
            self.validate()
        return

    def validate(self):
        """Run every validation step, appending problems to self.err."""
        # Maybe unnecessary abstraction?
        self._validate_arguments()
        return

    def _validate_arguments(self):
        """
        Here, we loop through each argument in the base_yaml file,
        then, we ensure that unchangeable arguments aren't defined in
        the params, and that required arguments with no default
        ARE defined in the params.  Then, we loop through the list of
        supplied args in the params file, and ensure there are no
        'bad' entries.
        """
        # A params entry without an "arguments" section overrides nothing.
        supplied = self.params.get("arguments") or {}
        valid_args = []
        for argd in self.base[self.jobname]["arguments"]:
            # Each template argument is a single-key mapping: {name: info}.
            arg = list(argd.keys())[0]
            arg_info = argd[arg]
            if not arg_info["changeable"] and arg in supplied:
                self.err += "Unchangeable argument '%s' specified in params file.\n" % (arg,)
            if arg_info["required"] and arg_info["changeable"] and "default" not in arg_info:
                if arg not in supplied:
                    self.err += "Required argument '%s' does not have a default, and is not defined in params file.\n" % (arg,)
            # Names starting with "$" are placeholders, not settable arguments.
            if arg[:1] != "$":
                valid_args.append(arg)
        for arg in supplied:
            if arg not in valid_args:
                self.err += "Argument '%s' specified in params file does not exist.\n" % (arg,)
        return

    def check_file_names(self, file_names):
        """Check file_names against the input types the template expects.

        The number of names must equal the number of template inputs, and
        each name must end in one of the extensions listed for the
        corresponding input type in chipathlon.conf.file_extensions.

        :param file_names: File names, in template input order.
        :returns: True when every name is acceptable, otherwise False
            (with an explanation appended to self.err).
        """
        expected = self.base[self.jobname]["inputs"]
        if len(file_names) != len(expected):
            self.err += "Number of file_names '%s' must match number of expected input files.\n" % (file_names,)
            return False
        valid_files = True
        for f, file_type in zip(file_names, expected):
            extensions = chipathlon.conf.file_extensions[file_type]
            # Suffix matching copes with multi-part extensions ("fastq.gz"),
            # extra dots in the stem, and names with no dot at all.
            if not any(f.endswith("." + ext) for ext in extensions):
                self.err += "File '%s' is not of type '%s'. Should match one of '%s' extensions.\n" % (f, file_type, extensions)
                valid_files = False
        return valid_files

    def create_arg_list(self, file_names, lfns=None):
        """
        Create the necessary argument list for pegasus.
        To add arguments to the pegasus job use:
        job.addArguments(*arg_list)

        :param file_names: Input file names, checked with check_file_names.
        :param lfns: Entries substituted, in order, for each "$" placeholder
            in the template.  Defaults to file_names when omitted.
        :returns: The list of arguments, or None (after printing self.err)
            when the job failed to load or the file names are invalid.
        """
        if lfns is None:
            lfns = file_names
        if self.base is None or self.params is None or not self.check_file_names(file_names):
            print(self.err)
            return None
        supplied = self.params.get("arguments") or {}
        arg_list = []
        curr_input = 0
        for argd in self.base[self.jobname]["arguments"]:
            arg = list(argd.keys())[0]
            arg_info = argd[arg]
            # NOTE(review): validation looks for a "default" key while this
            # method reads "value" -- confirm which key the templates use.
            value = arg_info["value"]
            if arg_info["changeable"] and value:
                # A params override wins over the template's own value.
                chosen = supplied[arg] if arg in supplied else value
                # str() because yaml may yield numbers (e.g. "-t": 1).
                arg_list.append(arg + " " + str(chosen))
            elif value:
                if str(value)[:1] == "$":
                    # Flag whose value is the next input file.
                    arg_list.append(arg)
                    arg_list.append(lfns[curr_input])
                    curr_input += 1
                else:
                    arg_list.append(arg + " " + str(value))
            elif arg[:1] == "$":
                # Bare positional input file.
                arg_list.append(lfns[curr_input])
                curr_input += 1
            else:
                # Bare flag / sub-command with no value.
                arg_list.append(arg)
        return arg_list
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment