Commit d52a0651 authored by aknecht2's avatar aknecht2
Browse files

Updated configuration info. Strengthened yaml validator. Adjusted names of workflow modules.

parent 9c85952b
...@@ -12,3 +12,12 @@ file_extensions = { ...@@ -12,3 +12,12 @@ file_extensions = {
"bam": ["bam"], "bam": ["bam"],
"bed": ["bed", "narrowPeak", "broadPeak"] "bed": ["bed", "narrowPeak", "broadPeak"]
} }
# param keys
param_keys = {
"required": ["arguments"],
"optional": ["walltime", "memory"]
}
# workflow order
workflow = ["align", "remove_duplicates", "peak_calling"]
bwa_sai_to_sam:
arguments: None
walltime: 2000
memory: 2000
bwa_align_single:
arguments:
"-q": 5
"-l": 32
"-k": 2
"-t": 1
walltime: 2000
memory: 2000
bwa_align_paired:
arguments:
"-M": ""
walltime: 2000
memory: 2000
samtools_sam_to_bam:
arguments: None
walltime: 2000
memory: 2000
bwa_sai_to_sam:
arguments: None
walltime: 2000
memory: 2000
bwa_align_single:
arguments:
"-q": 5
"-l": 32
"-k": 2
"-t": 1
walltime: 2000
memory: 2000
bwa_align_paired:
arguments: None
walltime: 2000
memory: 2000
batman: nana
samtools_sam_to_bam:
arguments: None
walltime: 2000
memory: 2000
import chipathlon import chipathlon
import chipathlon.yaml_job import chipathlon.yaml_job
def test_yaml_job(): # Shamelessly stolen from: https://svn.blender.org/svnroot/bf-blender/trunk/blender/build_files/scons/tools/bcolors.py
yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job_1/params.yaml") class bcolors(object):
print yj.create_arg_list(["bwa_index.fna", "bwa_1.fastq", "bwa_2.fastq"]) HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def print_test(name, success):
if success:
print bcolors.OKBLUE + name + ": Success" + bcolors.ENDC
else:
print bcolors.FAIL + name + ": Failure" + bcolors.ENDC
return
# Test 1, valid inputs
def yaml_job_test_1():
yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params1.yaml")
arg_list = yj.create_arg_list(["bwa_index.fna", "bwa_1.fastq", "bwa_2.fastq"], ["lfn1", "lfn2", "lfn3"])
print_test("Yaml_Job_Test_1", arg_list == ['mem', '-M', '-t 1', 'lfn1', 'lfn2', 'lfn3'])
return
# Test2, invalid arguments
def yaml_job_test_2():
yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params2.yaml")
print_test("Yaml_Job_Test_2", not yj.valid())
return return
test_yaml_job() # Test3, non-yaml input file
def yaml_job_test_3():
yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params3.yaml")
print_test("Yaml_Job_Test_3", not yj.valid())
return
# Test4, ill-defined input file
def yaml_job_test_4():
yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params4.yaml")
print_test("Yaml_Job_Test_4", not yj.valid())
return
tests = [yaml_job_test_1, yaml_job_test_2, yaml_job_test_3, yaml_job_test_4]
for test in tests:
test()
...@@ -27,9 +27,24 @@ class YamlJob(object): ...@@ -27,9 +27,24 @@ class YamlJob(object):
self.validate() self.validate()
return return
def valid(self):
return False if self.err else True
def validate(self): def validate(self):
# Maybe unnecessary abstraction? # Maybe unnecessary abstraction?
self._validateArguments() if self.valid():
self._validate_input()
if self.valid():
self._validate_arguments()
return
def _validate_input(self):
for key in chipathlon.conf.param_keys["required"]:
if key not in self.params:
self.err += "Required key '%s' not defined for job '%s'.\n" % (key, self.jobname)
for key in self.params:
if key not in (chipathlon.conf.param_keys["required"] + chipathlon.conf.param_keys["optional"]):
self.err += "Key '%s' does not exist for job '%s'.\n" % (key, self.jobname)
return return
def _validate_arguments(self): def _validate_arguments(self):
...@@ -45,16 +60,19 @@ class YamlJob(object): ...@@ -45,16 +60,19 @@ class YamlJob(object):
for argd in self.base[self.jobname]["arguments"]: for argd in self.base[self.jobname]["arguments"]:
arg = argd.keys()[0] arg = argd.keys()[0]
arg_info = argd[arg] arg_info = argd[arg]
if not arg_info["changeable"] and arg in self.params["arguments"]: if self.params["arguments"] is not None:
self.err += "Unchangeable argument '%s' specified in params file.\n" % (arg,) if arg[:1] != "$":
valid_args.append(arg)
if not arg_info["changeable"] and arg in self.params["arguments"]:
self.err += "Unchangeable argument '%s' specified in params file.\n" % (arg,)
if arg_info["required"] and arg_info["changeable"] and "default" not in arg_info: if arg_info["required"] and arg_info["changeable"] and "default" not in arg_info:
if not arg in self.params["arguments"]: if not arg in self.params["arguments"]:
self.err += "Required argument '%s' does not have a default, and is not defined in params file.\n" % (arg,) self.err += "Required argument '%s' does not have a default, and is not defined in params file.\n" % (arg,)
if arg[:1] != "$":
valid_args.append(arg) if self.params["arguments"] is not None:
for arg in self.params["arguments"]: for arg in self.params["arguments"]:
if arg not in valid_args: if arg not in valid_args:
self.err += "Argument '%s' specified in params file does not exist.\n" % (arg,) self.err += "Argument '%s' specified in params file does not exist.\n" % (arg,)
return return
def check_file_names(self, file_names): def check_file_names(self, file_names):
...@@ -76,7 +94,7 @@ class YamlJob(object): ...@@ -76,7 +94,7 @@ class YamlJob(object):
To add arguments to the pegasus job use: To add arguments to the pegasus job use:
job.addArgumnets(*arg_list) job.addArgumnets(*arg_list)
""" """
if check_file_names(file_names): if self.check_file_names(file_names):
arg_list = [] arg_list = []
curr_input = 0 curr_input = 0
for argd in self.base[self.jobname]["arguments"]: for argd in self.base[self.jobname]["arguments"]:
...@@ -84,9 +102,9 @@ class YamlJob(object): ...@@ -84,9 +102,9 @@ class YamlJob(object):
arg_info = argd[arg] arg_info = argd[arg]
if arg_info["changeable"] and arg_info["value"]: if arg_info["changeable"] and arg_info["value"]:
if arg in self.params["arguments"]: if arg in self.params["arguments"]:
arg_list.append(arg + " " + self.params["arguments"][arg]) arg_list.append("%s %s" % (arg, self.params["arguments"][arg]))
else: else:
arg_list.append(arg + " " + arg_info["value"] arg_list.append("%s %s" % (arg, arg_info["value"]))
else: else:
if arg_info["value"]: if arg_info["value"]:
if arg_info["value"][:1] == "$": if arg_info["value"][:1] == "$":
...@@ -94,7 +112,7 @@ class YamlJob(object): ...@@ -94,7 +112,7 @@ class YamlJob(object):
arg_list.append(lfns[curr_input]) arg_list.append(lfns[curr_input])
curr_input += 1 curr_input += 1
else: else:
arg_list.append(arg + " " + arg_info["value"]) arg_list.append("%s %s" % (arg, arg_info["value"]))
else: else:
if arg[:1] == "$": if arg[:1] == "$":
arg_list.append(lfns[curr_input]) arg_list.append(lfns[curr_input])
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment