Commit 0119ee2a authored by aknecht2's avatar aknecht2
Browse files

Added genome validation. Added module loading with minor validation. Added...

Added genome validation.  Added module loading with minor validation.  Added required file detection in modules.
parent dc6a2061
...@@ -15,7 +15,7 @@ file_extensions = { ...@@ -15,7 +15,7 @@ file_extensions = {
"bam": ["bam"], "bam": ["bam"],
"bed": ["bed", "narrowPeak", "broadPeak"], "bed": ["bed", "narrowPeak", "broadPeak"],
"bwa_genome": ["amb", "ann", "bwt", "pac", "sac"], "bwa_genome": ["amb", "ann", "bwt", "pac", "sac"],
"bowtie2_genome": [".bt2"] "bowtie2_genome": ["1.bt2", "2.bt2", "3.bt2", "4.bt2", "rev.1.bt2", "rev.2.bt2"]
} }
# param keys # param keys
...@@ -26,3 +26,15 @@ param_keys = { ...@@ -26,3 +26,15 @@ param_keys = {
# workflow order # workflow order
workflow = ["align", "remove_duplicates", "peak_calling"] workflow = ["align", "remove_duplicates", "peak_calling"]
# genome info
genomes = {
"bwa": {
"base_file": file_extensions["genome_index"],
"additional_files": file_extensions["bwa_genome"]
},
"bowtie2": {
"base_file": file_extensions["genome_index"],
"additional_files": file_extensions["bowtie2_genome"]
}
}
...@@ -50,6 +50,26 @@ class MongoDB(object): ...@@ -50,6 +50,26 @@ class MongoDB(object):
has_samples += 1 has_samples += 1
return (has_samples, total) return (has_samples, total)
def get_assembly(self, experiment_id):
valid = True
msg = ""
data = ""
cursor = self.db.experiments.find({
"target": {"$exists": True},
"revoked_files.0": {"$exists": False},
"assembly.0": {"$exists": True},
"assembly.1": {"$exists": False},
"@id": "/experiments/%s/" % (experiment_id,)
})
if cursor.count() == 1:
document = cursor.next()
data = document["assembly"][0]
msg = "Succesfully retrieved assembly for experiment with id '%s'." % (experiment_id,)
else:
valid = False
msg = "Experiment with id '%s' does not exist." % (experiment_id,)
return (valid, msg, data)
def get_samples(self, experiment_id): def get_samples(self, experiment_id):
valid = True valid = True
msg = "" msg = ""
......
...@@ -65,9 +65,9 @@ align: ...@@ -65,9 +65,9 @@ align:
- ref_genome.rev.2.bt2 - ref_genome.rev.2.bt2
outputs: outputs:
- align.sam - align.sam
- samtools_sam_to_bam: - samtools_sam_to_bam:
inputs: inputs:
- align.sam - align.sam
additional_inputs: null additional_inputs: null
outputs: outputs:
- align.bam - align.bam
peak:
macs2:
- run_macs2
- sort_macs2
spp:
- run_spp
- unzip_spp
duplicates:
- samtools_filter
- picard_sort
- picard_remove_duplicates
- bedtools_bam_to_bed
bwa_align_paired: bowtie2_align_paired:
inputs: inputs:
- genome_index - genome_index
- fastq - fastq
......
bwa_align_single: bowtie2_align_single:
inputs: inputs:
- genome_index - genome_index
- fastq - fastq
......
...@@ -15,6 +15,14 @@ bwa_align_paired: ...@@ -15,6 +15,14 @@ bwa_align_paired:
"-t": 1 "-t": 1
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
bowtie2_align_single:
arguments: null
walltime: 2000
memory: 2000
bowtie2_align_paired:
arguments: null
walltime: 2000
memory: 2000
samtools_sam_to_bam: samtools_sam_to_bam:
arguments: null arguments: null
walltime: 2000 walltime: 2000
......
...@@ -8,6 +8,9 @@ import xml.dom.minidom ...@@ -8,6 +8,9 @@ import xml.dom.minidom
import yaml import yaml
import traceback import traceback
import chipathlon import chipathlon
import chipathlon.yaml_job
import chipathlon.db
import chipathlon.workflow_module
from pprint import pprint from pprint import pprint
from Pegasus.DAX3 import * from Pegasus.DAX3 import *
...@@ -42,14 +45,14 @@ class Workflow(object): ...@@ -42,14 +45,14 @@ class Workflow(object):
return return
def generate(self): def generate(self):
self.load_modules() self._load_yaml_jobs()
self.load_runs() self._load_modules()
if not self.err: if not self.err:
self.load_yaml_jobs() self._load_runs()
if not self.err: if not self.err:
print "Put the rest of the generating functions hurrrr." print "Put the rest of the generating functions hurrrr."
self.load_executables() self._load_executables()
# self.info() self.info()
else: else:
print self.err print self.err
...@@ -59,7 +62,7 @@ class Workflow(object): ...@@ -59,7 +62,7 @@ class Workflow(object):
sys.exit(1) sys.exit(1)
return return
def load_executables(self, os_type="linux", arch="x86_64"): def _load_executables(self, os_type="linux", arch="x86_64"):
# Load wrapper scripts for commands that need to be loaded from module # Load wrapper scripts for commands that need to be loaded from module
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_wrappers)): for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_wrappers)):
for f in files: for f in files:
...@@ -74,23 +77,25 @@ class Workflow(object): ...@@ -74,23 +77,25 @@ class Workflow(object):
self.executables[f] = Executable(name=f, os=os_type, arch=arch) self.executables[f] = Executable(name=f, os=os_type, arch=arch)
self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "condorpool")) self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "condorpool"))
self.dax.addExecutable(self.executables[f]) self.dax.addExecutable(self.executables[f])
break
return return
def load_modules(self): def _load_modules(self):
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)): for root, dirs, files in os.walk(os.path.join(os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)):
for f in files: for f in files:
with open("%s/%s" % (root, f), "r") as rh: mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.yaml_jobs)
mdata = yaml.load(rh) if not mod.err:
root_key = mdata.keys()[0] self.modules[mod.name] = mod
self.modules[root_key] = mdata[root_key] else:
self.err += mod.err
break break
return return
def load_runs(self): def _load_runs(self):
try: try:
with open(self.run_file, "r") as rh: with open(self.run_file, "r") as rh:
self.run_data = yaml.load(rh) self.run_data = yaml.load(rh)
for run in self.run_data: for run in self.run_data["runs"]:
for module_name in self.modules: for module_name in self.modules:
if isinstance(self.modules[module_name], dict): if isinstance(self.modules[module_name], dict):
keys = self.modules[module_name].keys() keys = self.modules[module_name].keys()
...@@ -106,6 +111,7 @@ class Workflow(object): ...@@ -106,6 +111,7 @@ class Workflow(object):
run["data"] = data run["data"] = data
else: else:
self.err += msg self.err += msg
self._check_genomes(run)
else: else:
self.err += "Error parsing run template file '%s'. Required key 'experiment' not defined.\n" % (self.run_file,) self.err += "Error parsing run template file '%s'. Required key 'experiment' not defined.\n" % (self.run_file,)
except: except:
...@@ -113,12 +119,41 @@ class Workflow(object): ...@@ -113,12 +119,41 @@ class Workflow(object):
self.err += traceback.format_exc() self.err += traceback.format_exc()
return return
def load_yaml_jobs(self): def _check_genomes(self, run):
valid, msg, assembly = self.mdb.get_assembly(run["experiment"])
if valid:
if run["align"] in self.run_data["genomes"]:
if assembly in self.run_data["genomes"][run["align"]]:
base_file = self.run_data["genomes"][run["align"]][assembly]
if os.path.isfile(base_file):
if base_file.split(".", 1)[1] in chipathlon.conf.genomes[run["align"]]["base_file"]:
prefix = base_file.split(".", 1)[0]
missing = []
for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
if not os.path.isfile(prefix + "." + ext):
missing.append(ext)
if len(missing) > 0:
self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
else:
self.err += "Genome defined with tool '%s' and assembly '%s', has invalid extension. Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
else:
self.err += "Genome defined with tool '%s' and assembly '%s', has non-existant base file '%s'.\n" % (run["align"], assembly, base_file)
else:
self.err += "Alignment defined with tool '%s' for assembly '%s', no corresponding genome definition.\n" % (run["align"], assembly)
else:
self.err += "Alignment defined with tool '%s', no corresponding genome definition.\n" % (run["align"])
else:
self.err += "DB error: %s.\n" % (msg, )
return
def _load_yaml_jobs(self):
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)): for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
for f in files: for f in files:
yj = chipathlon.yaml_job.YamlJob("%s/%s" % (root, f), self.param_file) yj = chipathlon.yaml_job.YamlJob(os.path.join(root, f), self.param_file)
self.err += yj.err if not yj.err:
self.yaml_jobs[yj.jobname] = yj self.yaml_jobs[yj.jobname] = yj
else:
self.err += yj.err
break break
return return
......
...@@ -7,12 +7,12 @@ import textwrap ...@@ -7,12 +7,12 @@ import textwrap
import xml.dom.minidom import xml.dom.minidom
import yaml import yaml
import traceback import traceback
import chipathlon import re
from pprint import pprint from pprint import pprint
from Pegasus.DAX3 import * from Pegasus.DAX3 import *
class WorkflowModule(Object): class WorkflowModule(object):
def __init__(self, module_yaml, yaml_jobs): def __init__(self, module_yaml, yaml_jobs):
# module_yaml is the module yaml file # module_yaml is the module yaml file
...@@ -22,32 +22,61 @@ class WorkflowModule(Object): ...@@ -22,32 +22,61 @@ class WorkflowModule(Object):
with open(module_yaml, "r") as rh: with open(module_yaml, "r") as rh:
self.data = yaml.load(rh) self.data = yaml.load(rh)
self.name = self.data.keys()[0] self.name = self.data.keys()[0]
self.input_files = [] self.markers = {}
self.additional_inputs = []
self.output_files = []
self.splits = {}
self.workflow = {}
self.yaml_jobs = yaml_jobs self.yaml_jobs = yaml_jobs
self._construct() self._load_markers(self.data[self.name])
except: except:
self.err += "Error parsing job template yaml file.\n" self.err += "Error parsing job template yaml file.\n"
self.err += traceback.format_exc() self.err += traceback.format_exc()
return return
def _construct(self): def _load_markers(self, data):
# Here's where things get complicated. # Finds all potential splitting points and their values
# We need to maintain a complex structure for item in data:
# of actual jobs, and where they split. for key in item.keys():
# From there, we will be able to if key in self.yaml_jobs:
# specify exactly what inputs a module needs return
# to build correctly. elif "[" in key and "]" in key:
for key in self.data[self.name]: val, marker, dummy = re.split("[\[\]]", key)
self.workflow[key] = self._construct_mod(self.data[self.name][key]) if marker not in self.markers:
self.markers[marker] = []
if val not in self.markers[marker]:
self.markers[marker].append(val)
self._load_markers(item[key])
return return
def _construct_mod(self, data): def _check_input_markers(self, markers):
# We always have a list! valid = True
msg = ""
for marker in markers:
if marker in self.markers:
if markers[marker] not in self.markers[marker]:
msg += "Marker '%s' does not have value '%s', should be one of %s.\n" % (marker, markers[marker], self.markers[marker])
valid = False
else:
msg += "Marker '%s' does not exist.\n" % (marker,)
valid = False
return (valid, msg)
def _load_required_files(self, data, markers, input_files = [], additional_files = [], output_files = []):
for item in data: for item in data:
if item not in self.yaml_jobs: for key in item.keys():
pass if key in self.yaml_jobs:
for ftype, flist in zip(["inputs", "additional_inputs", "outputs"], [input_files, additional_files, output_files]):
if item[key][ftype] is not None:
for f in item[key][ftype]:
if f not in flist and f not in output_files:
flist.append(f)
elif "[" in key and "]" in key:
val, marker, dummy = re.split("[\[\]]", key)
if markers[marker] == val:
self._load_required_files(item[key], markers, input_files, additional_files, output_files)
return (input_files, additional_files, output_files)
def print_required_files(self, markers):
valid, msg = self._check_input_markers(markers)
if valid:
print self._load_required_files(self.data[self.name], markers)
else:
print msg
return return
...@@ -21,7 +21,10 @@ class YamlJob(object): ...@@ -21,7 +21,10 @@ class YamlJob(object):
try: try:
with open(param_file, "r") as rh: with open(param_file, "r") as rh:
params = yaml.load(rh) params = yaml.load(rh)
self.params = params[self.jobname] if self.jobname in params:
self.params = params[self.jobname]
else:
self.err += "Params file does not contain definition for '%s'.\n" % (self.jobname,)
except: except:
self.err += "Error parsing params yaml file.\n" self.err += "Error parsing params yaml file.\n"
self.err += traceback.format_exc() self.err += traceback.format_exc()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment