Commit ff96a951 authored by aknecht2's avatar aknecht2
Browse files

Updated genome & run validation. More peak call fixes.

parent 3eb1cb64
......@@ -107,6 +107,12 @@ param_keys = {
"optional": ["arguments"] + resources.keys()
}
# run keys
run_keys = {
"required": ["experiment", "align", "peak", "genome"],
"optional": []
}
# workflow order
workflow = ["align", "remove_duplicates", "peak_calling"]
......
......@@ -13,7 +13,9 @@ peak_call:
- prefix:
type: file
additional_inputs: null
outputs: gem.narrowPeak
outputs:
- gem.narrowPeak:
type: file
- spp[tool]:
- cp_bed_tagalign:
inputs:
......
......@@ -6,7 +6,7 @@ gem_callpeak:
type: file
- bed:
type: file
- genome:
- genome_dir:
type: argument
- prefix:
type: argument
......
......@@ -36,7 +36,15 @@ class Workflow(object):
self.param_file = param_file
self.config_file = config_file
with open(self.config_file, "r") as rh:
self.config = yaml.load(rh)
try:
self.config = yaml.load(rh)
except yaml.YAMLError as exc:
self.err += "Error parsing config template file '%s': %s.\n" % (self.config_file, exc)
with open(self.run_file, "r") as rh:
try:
self.run_data = yaml.load(rh)
except yaml.YAMLError as exc:
self.err += "Error parsing run template file '%s': %s.\n" % (self.run_file, exc)
# Dax specific info
self.dax = ADAG(self.jobname)
self.executables = {}
......@@ -59,6 +67,7 @@ class Workflow(object):
self._load_executables()
self._load_workflow_jobs()
self._load_modules()
self._load_genomes()
self._load_runs()
# All required information is loaded, start queuing jobs
self._create_setup()
......@@ -86,6 +95,14 @@ class Workflow(object):
self.dax.addFile(f)
return
def _add_executable(self, name, path, os_type="linux", arch="x86_64", site="local"):
self.executables[name] = Executable(name=name, os=os_type, arch=arch)
self.executables[name].addPFN(PFN(os.path.join("file://", path), site))
self.dax.addExecutable(self.executables[name])
if self.debug:
print "[LOAD_EXECUTABLE] %s" % (name,)
return
def _raise(self):
if self.err:
raise SystemExit(self.err)
......@@ -95,29 +112,16 @@ class Workflow(object):
# Load wrapper scripts for commands that need to be loaded from module
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_wrappers)):
for f in files:
ex_name = f.split("_")[0]
self.executables[ex_name] = Executable(name=ex_name, os=os_type, arch=arch)
self.executables[ex_name].addPFN(PFN("file://%s/%s" % (root, f), "local"))
self.dax.addExecutable(self.executables[ex_name])
if self.debug:
print "[LOAD_EXECUTABLE] %s" % (ex_name,)
self._add_executable(f.split(0), os.path.join(root, f))
break
# Load actual scripts
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_scripts)):
for f in files:
self.executables[f] = Executable(name=f, os=os_type, arch=arch)
self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "local"))
self.dax.addExecutable(self.executables[f])
if self.debug:
print "[LOAD_EXECUTABLE] %s" % (f,)
self._add_executable(f, os.path.join(root, f))
break
# Handle necessary installed scripts
for cmd in chipathlon.conf.system_commands:
self.executables[cmd] = Executable(name=cmd, os=os_type, arch=arch)
self.executables[cmd].addPFN(PFN("file://%s/%s" % (chipathlon.conf.system_path, cmd), "local"))
self.dax.addExecutable(self.executables[cmd])
if self.debug:
print "[LOAD_EXECUTABLE] %s" % (cmd,)
self._add_executable(cmd, os.path.join(chipathlon.conf.system_path, cmd))
self._raise()
return
......@@ -160,108 +164,99 @@ class Workflow(object):
def _load_runs(self):
# Load run data & get samples
with open(self.run_file, "r") as rh:
try:
self.run_data = yaml.load(rh)
except yaml.YAMLError as exc:
self.err += "Error parsing run template file '%s'. %s.\n" % (self.run_file, exc)
if not self.err:
checked = []
for run in self.run_data["runs"]:
for module_name in self.modules:
if isinstance(self.modules[module_name], dict):
keys = self.modules[module_name].keys()
if len(keys) > 1:
if module_name in run:
if run[module_name] not in keys:
self.err += "Error parsing run template file '%s'. Module '%s' has invalid entry '%s', should be one of %s.\n" % (self.run_file, module_name, run[module_name], keys)
else:
self.err += "Error parsing run template file '%s'. Module '%s' requires an entry, should be one of %s.\n" % (self.run_file, module_name, keys)
if "experiment" in run:
valid, msg, data = self.mdb.get_samples(run["experiment"])
if valid:
run["data"] = data
else:
self.err += msg
# This will be changed to _check_genomes(run) eventually.
# self._check_genomes(run)
# UNCOMMENT THIS:
if run["align"] not in checked:
self._check_grch38(run)
checked.append(run["align"])
checked = []
for run in self.run_data["runs"]:
for module_name in self.modules:
print module_name
if isinstance(self.modules[module_name], dict):
print "umm"
keys = self.modules[module_name].keys()
if len(keys) > 1:
print "lenkeys"
if module_name in run:
print "modname in run"
if run[module_name] not in keys:
self.err += "Error parsing run template file '%s'. Module '%s' has invalid entry '%s', should be one of %s.\n" % (self.run_file, module_name, run[module_name], keys)
else:
self.err += "Error parsing run template file '%s'. Module '%s' requires an entry, should be one of %s.\n" % (self.run_file, module_name, keys)
if "experiment" in run:
valid, msg, data = self.mdb.get_samples(run["experiment"])
if valid:
run["data"] = data
else:
self.err += "Error parsing run template file '%s'. Required key 'experiment' not defined.\n" % (self.run_file,)
self.err += msg
# This will be changed to _check_genomes(run) eventually.
# self._check_genomes(run)
# UNCOMMENT THIS:
check = (run["align"], run["tool"])
if check not in checked:
valid, msg = self._valid_genome(run)
if not valid:
self.err += msg
checked.append(check)
else:
self.err += "Error parsing run template file '%s'. Required key 'experiment' not defined.\n" % (self.run_file,)
self._raise()
return
def _check_grch38(self, run):
if run["align"] in self.run_data["genomes"]:
assembly = "grch38p6"
gen_prefix = "genome_%s_%s" % (run["align"], assembly)
if assembly in self.run_data["genomes"][run["align"]]:
base_file = self.run_data["genomes"][run["align"]][assembly]
def _load_genome_files(self, base_file, base_file_name, base_file_ext, assembly, tool):
genome_files = self.run_data["genomes"][assembly][tool + "_files"] = {}
genome_files["base_file"] = base_file_name
genome_files["additional_files"] = {}
if base_file_ext in chipathlon.conf.genomes[tool]["base_file"]:
prefix = os.path.splitext(base_file)[0]
missing = []
for ext in chipathlon.conf.genomes[tool]["additional_files"]:
if not os.path.isfile(prefix + "." + ext) and not os.path.isfile(base_file + "." + ext):
missing.append(ext)
else:
no_ext = os.path.isfile(prefix + "." + ext)
name = "%s.%s%s" % (gen_prefix, "" if no_ext else base_file_ext + ".", ext)
path = "%s.%s" % (prefix if no_ext else base_file, ext)
self._add_file(name, path, "local")
genome_files["additional_files"][ext] = name
if len(missing) > 0:
self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
else:
self.err += "Genome defined with tool '%s' and assembly '%s', has invalid extension. Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
return
def _load_genomes(self):
for assembly in self.run_data["genomes"]:
# Handle chrom.sizes separately
for tool in [key for key in self.run_data["genomes"][assembly] if key != "chrom.sizes"]:
gen_prefix = "genome_%s_%s" % (tool, assembly)
base_file = self.run_data["genomes"][assembly][tool]
if os.path.isfile(base_file):
(base_file_prefix, base_file_ext) = os.path.splitext(os.path.basename(base_file))
base_file_ext = base_file_ext[1:]
base_file_name = gen_prefix + "." + base_file_ext
base_file_name = "%s.%s" % (gen_prefix, base_file_ext)
self._add_file(base_file_name, base_file, "local")
genome_files = self.run_data["genomes"][run["align"]][assembly + "_files"] = {}
genome_files["base_file"] = base_file_name
genome_files["additional_files"] = {}
if base_file_ext in chipathlon.conf.genomes[run["align"]]["base_file"]:
prefix = os.path.splitext(base_file)[0]
missing = []
for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
if not os.path.isfile(prefix + "." + ext) and not os.path.isfile(base_file + "." + ext):
missing.append(ext)
else:
no_ext = os.path.isfile(prefix + "." + ext)
name = "%s.%s%s" % (gen_prefix, "" if no_ext else base_file_ext + ".", ext)
path = "%s.%s" % (prefix if no_ext else base_file, ext)
self._add_file(name, path, "local")
genome_files["additional_files"][ext] = name
if len(missing) > 0:
self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
else:
self.err += "Genome defined with tool '%s' and assembly '%s', has invalid extension. Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
self._load_genome_files(base_file, base_file_name, base_file_ext, assembly, tool)
else:
self.err += "Genome defined with tool '%s' and assembly '%s', has non-existant base file '%s'.\n" % (run["align"], assembly, base_file)
self.err += "Genome defined with tool '%s' and assembly '%s' is missing base file %s.\n" % (tool, assembly, base_file)
# Check chrom.sizes
if "chrom.sizes" in self.run_data["genomes"][assembly]:
if os.path.isfile(self.run_data["genomes"][assembly]["chrom.sizes"]):
chrom_name = "%s_%s" % (assembly, os.path.basename(self.run_data["genomes"][assembly]["chrom.sizes"]))
self._add_file(chrom_name, self.run_data["genomes"][assembly]["chrom.sizes"], "local")
self.run_data["genomes"][assembly]["chrom.sizes_file"] = chrom_name
else:
self.err += "Genome defined with assembly '%s' is missing chrom.sizes file %s.\n" % (assembly, self.run_data["genomes"][assembly]["chrom.sizes"])
else:
self.err += "Alignment defined with tool '%s' for assembly '%s', no corresponding genome definition.\n" % (run["align"], assembly)
else:
self.err += "Alignment defined with tool '%s', no corresponding genome definition.\n" % (run["align"])
self._raise()
self.err += "Genome defined with assembly '%s' does not have definition fro chrom.sizes.\n" % (assembly,)
return
def _check_genomes(self, run):
# NOTE: THIS function should NOT be called.
# We will only be using GRCH38.p6 genome.
# Use the _check_grch38 function instead
valid, msg, assembly = self.mdb.get_assembly(run["experiment"])
if valid:
if run["align"] in self.run_data["genomes"]:
if assembly in self.run_data["genomes"][run["align"]]:
base_file = self.run_data["genomes"][run["align"]][assembly]
if os.path.isfile(base_file):
if base_file.split(".", 1)[1] in chipathlon.conf.genomes[run["align"]]["base_file"]:
prefix = base_file.split(".", 1)[0]
missing = []
for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
if not os.path.isfile(prefix + "." + ext):
missing.append(ext)
if len(missing) > 0:
self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
else:
self.err += "Genome defined with tool '%s' and assembly '%s', has invalid extension. Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
else:
self.err += "Genome defined with tool '%s' and assembly '%s', has non-existant base file '%s'.\n" % (run["align"], assembly, base_file)
else:
self.err += "Alignment defined with tool '%s' for assembly '%s', no corresponding genome definition.\n" % (run["align"], assembly)
def _valid_genome(self, run):
valid = False
if run["genome"] in self.run_data["genomes"]:
if run["align"] in self.run_data["genomes"][run["genome"]]:
valid = True
else:
self.err += "Alignment defined with tool '%s', no corresponding genome definition.\n" % (run["align"])
msg = "Alignment tool '%s' not defined for genome '%s'." % (run["align"], run["genome"])
else:
self.err += "DB error: %s.\n" % (msg, )
return
msg = "Run genome '%s' not defined in genome data." % (run["genome"],)
return (valid, msg)
def _make_set(self, file_list):
return_data = []
......@@ -458,8 +453,8 @@ class Workflow(object):
if run["peak"] == "macs2":
inputs["prefix"] = run["prefix"]["peak_call"]
elif run["peak"] == "gem":
inputs["chrom.sizes"] =
inputs["genome_dir"] =
inputs["chrom.sizes"] =
inputs["genome_dir"] =
inputs["prefix"] = run["prefix"]["peak_call"]
pass
run["markers"]["peak_call"] = markers
......
......@@ -342,7 +342,7 @@ class WorkflowJob(object):
arg_list.append(add_value)
else:
arg_list.append("%s %s" % (arg_name, add_value))
else:
arg_list.append(add_value)
return arg_list
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment