Commit d2b21636 authored by aknecht2

Added download script. Updated job yaml. Updated module yaml. Added executable loading to workflow code. Updated database validation for new inputs.
parent 7c396945
......@@ -4,6 +4,9 @@ job_modules = "jobs/modules/"
# Job params directory
job_params = "jobs/params/"
# Job wrappers directory
job_wrappers = "jobs/wrappers/"
# File extensions
file_extensions = {
"fna": ["fna"],
......
......@@ -34,8 +34,8 @@ class MongoDB(object):
},
{
"$lookup": {
"from": "samples",
"localField": "_id",
"from": "samples_test",
"localField": "uuid",
"foreignField": "experiment_id",
"as": "samples"
}
......@@ -85,7 +85,7 @@ class MongoDB(object):
# 5. Iterate through control_exps
# 6. Join samples into the control_exps by exp_id
# 7. Re-aggregate all data into arrays
cursor = self.db.experiments.aggregate([
pipeline = [
{
"$match": {
"target": {"$exists": True},
......@@ -98,8 +98,8 @@ class MongoDB(object):
},
{
"$lookup": {
"from": "samples",
"localField": "_id",
"from": "samples_test",
"localField": "uuid",
"foreignField": "experiment_id",
"as": "samples"
}
......@@ -109,36 +109,25 @@ class MongoDB(object):
},
{
"$lookup": {
"from": "experiments",
"localField": "possible_controls",
"foreignField": "@id",
"as": "control_exps"
}
},
{
"$unwind": "$control_exps"
},
{
"$lookup": {
"from": "samples",
"localField": "control_exps._id",
"from": "samples_test",
"localField": "possible_controls.uuid",
"foreignField": "experiment_id",
"as": "control_exps.samples"
"as": "possible_controls.samples"
}
},
{
"$group": {
"_id": "$_id",
"possible_controls": {"$push": "$possible_controls"},
"control_exps": {"$push": "$control_exps"},
"samples": {"$push": "$samples"}
}
}
])
]
cursor = self.db.experiments.aggregate(pipeline)
# We should have only 1 document
document = cursor.next()
control_inputs = [sample for control in document["control_exps"] for sample in control["samples"] if ("filetype" in sample and sample["filetype"] == "fastq")]
experiment_inputs = [sample for sample in document["samples"][0] if ("filetype" in sample and sample["filetype"] == "fastq")]
control_inputs = [sample for control in document["possible_controls"] for sample in control["samples"] if ("file_type" in sample and sample["file_type"] == "fastq")]
experiment_inputs = [sample for sample in document["samples"][0] if ("file_type" in sample and sample["file_type"] == "fastq")]
if (len(control_inputs) > 0 and len(experiment_inputs) > 0):
msg = "Succesfully retrieved input files for experiment with id '%s'." % (experiment_id,)
data = {
......@@ -147,7 +136,7 @@ class MongoDB(object):
}
else:
valid = False
msg = "Experiment with id '%s' has '%s' possible control inputs, and '%s' possible experiment inputs." % (experiment_id, len(control_inputs), len(experiment_inputs))
msg = "Experiment with id '%s' has %s possible control inputs, and %s possible experiment inputs." % (experiment_id, len(control_inputs), len(experiment_inputs))
else:
valid = False
msg = "Experiment with id '%s' does not have possible_controls." % (experiment_id,)
......
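The aggregation above is consumed as a (valid, msg, data) triple, the same contract load_runs relies on further down. A minimal sketch of a caller, assuming a hypothetical MongoDB(...) construction and reusing the ENCSR000BSE accession from the run template below:

# Sketch only: constructor arguments are hypothetical; get_samples and its
# (valid, msg, data) return shape are taken from the surrounding code.
mdb = MongoDB()
valid, msg, data = mdb.get_samples("ENCSR000BSE")
if valid:
    print data  # the selected fastq control and experiment inputs
else:
    print msg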
align:
bwa:
- single:
- bwa_align_single
- bwa_sai_to_sam
- bwa_align_single:
inputs:
- ref_genome
- download_1.fastq
additional_inputs:
- ref_genome.amb
- ref_genome.ann
- ref_genome.bwt
- ref_genome.pac
- ref_genome.sa
outputs:
- bwa.sai
- bwa_sai_to_sam:
inputs:
- ref_genome.fa
- bwa.sai
- download_1.fastq
additional_inputs: null
outputs:
- align.sam
paired:
- bwa_align_paired
- samtools_sam_to_bam
- bwa_align_paired:
inputs:
- ref_genome.fa
- download_1.fastq
- download_2.fastq
additional_inputs:
- ref_genome.amb
- ref_genome.ann
- ref_genome.bwt
- ref_genome.pac
- ref_genome.sa
outputs:
- align.sam
- samtools_sam_to_bam:
inputs:
- align.sam
additional_inputs: null
outputs:
- align.bam
bowtie2:
- single: bowtie2_align_single
paired: bowtie2_align_paired
- single:
- bowtie2_align_single
paired:
- bowtie2_align_paired
- samtools_sam_to_bam
bwa_align_paired:
inputs:
- fna
-
- fna
- fa
- fastq
- fastq
additional_inputs:
- fa.amb
- fa.ann
- fa.bwt
- fa.pac
- fa.sa
outputs:
- sam
command: bwa
......
bwa_align_single:
inputs:
- fna
-
- fna
- fa
- fastq
additional_inputs:
- fa.amb
- fa.ann
- fa.bwt
- fa.pac
- fa.sa
outputs:
- sai
command: bwa
......
bwa_sai_to_sam:
inputs:
- fna
- fa
- sai
- fastq
outputs:
......
#!/usr/bin/python
import chipathlon
import chipathlon.utils
import argparse
parser = argparse.ArgumentParser(description = "Download target file.")
parser.add_argument("--url", dest="url", required = True, help="Target url.")
parser.add_argument("--local_path", dest="local_path", required = True, help="Local path to file.")
parser.add_argument("--url_type", dest="url_type", default="ftp://", help="Type of url to access.")
parser.add_argument("--retries", dest="retries", default=3, type=int, help="Number of retries.")
parser.add_argument("--overwrite", dest="overwrite", default=False, action="store_true", help="Overwrite local file if exists.")
parser.add_argument("--md5", dest="md5", help="Check md5 value against passed value.")
args = parser.parse_args()
chipathlon.utils.downloadFile(
args.url,
args.local_path,
urltype = args.url_type,
retries = args.retries,
overwrite = args.overwrite,
checkmd5 = (not not args.md5),
md5 = args.md5
)
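If an md5 check is wanted, the value passed to --md5 must be the hex digest of the expected file contents, since downloadFile compares hashlib.md5(data).hexdigest() against it. A hedged, illustrative way to produce that digest (the file name is hypothetical):

import hashlib

# Illustrative only: compute the hex digest of a reference copy of the file
# so it can be supplied to the wrapper via --md5.
with open("reference_copy.fastq", "rb") as fh:
    print hashlib.md5(fh.read()).hexdigest()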
- run1:
experiment: "ENCSR000BSE"
- experiment: "ENCSR000BSE"
align: bwa
peak: spp
- run2:
experiment: "ENCSR000BSE"
- experiment: "ENCSR000BSE"
align: bowtie2
peak: spp
......@@ -11,35 +11,34 @@ def progress(current, end, length = 20):
sys.stdout.write("\rProcessed %s / %s entries. [%s] %s%%" % (current, end, hashes + spaces, int(round(percent * 100))))
sys.stdout.flush()
def downloadFile(url, localpath, logfile, urltype = "ftp://", retries = 3, overwrite = False, checkmd5 = False, md5 = None):
    with open(logfile, "a") as wh:
        success = False
        if url[:7] == "http://":
            urltype = ""
        for i in range(0, retries):
            wh.write("Attempt #%s, downloading %s\n" % (i + 1, url))
            try:
                if not os.path.isfile(localpath) or overwrite:
                    with open(localpath, "w") as fwh:
                        response = urllib2.urlopen(urltype + url)
                        data = response.read()
                        if checkmd5:
                            if hashlib.md5(data).hexdigest() == md5:
                                fwh.write(data)
                                success = True
                                wh.write("File saved successfully.\n")
                            else:
                                wh.write("MD5 mismatch. Retrying.\n")
                        else:
                            fwh.write(data)
                            success = True
                            wh.write("File saved successfully.\n")
                elif os.path.isfile(localpath) and not overwrite:
                    wh.write("File already exists, skipping download.\n")
                    success = True
            except Exception as e:
                wh.write(traceback.format_exc())
                overwrite = True
            if success:
                return
    return

def downloadFile(url, localpath, urltype = "ftp://", retries = 3, overwrite = False, checkmd5 = False, md5 = None):
    success = False
    if url[:7] == "http://":
        urltype = ""
    for i in range(0, retries):
        print("Attempt #%s, downloading %s\n" % (i + 1, url))
        try:
            if not os.path.isfile(localpath) or overwrite:
                with open(localpath, "w") as fwh:
                    response = urllib2.urlopen(urltype + url)
                    data = response.read()
                    if checkmd5:
                        if hashlib.md5(data).hexdigest() == md5:
                            fwh.write(data)
                            success = True
                            print("File saved successfully.\n")
                        else:
                            print("MD5 mismatch. Retrying.\n")
                    else:
                        fwh.write(data)
                        success = True
                        print("File saved successfully.\n")
            elif os.path.isfile(localpath) and not overwrite:
                print("File already exists, skipping download.\n")
                success = True
        except Exception as e:
            print(traceback.format_exc())
            overwrite = True
        if success:
            return
    return
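A minimal usage sketch of the rewritten function, matching how the new download wrapper script calls it; the URL and local path here are hypothetical:

import chipathlon.utils

# Sketch only: a non-http URL gets the default "ftp://" prefix prepended;
# with checkmd5 = False no digest comparison is made.
chipathlon.utils.downloadFile(
    "example.org/sample.fastq",
    "/tmp/sample.fastq",
    retries = 3,
    overwrite = False,
    checkmd5 = False
)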
......@@ -8,6 +8,7 @@ import xml.dom.minidom
import yaml
import traceback
import chipathlon
from pprint import pprint
from Pegasus.DAX3 import *
class Workflow(object):
......@@ -39,14 +40,22 @@ class Workflow(object):
self.modules = {}
return
def info(self):
pprint(self.run_data)
pprint(self.yaml_jobs)
pprint(self.modules)
pprint(self.executables)
return
def generate(self):
self.load_modules()
self.validate_run()
self.load_runs()
if not self.err:
self.load_yaml_jobs()
if not self.err:
print "Put the rest of the generating functions hurrrr."
self.load_executables()
# self.info()
else:
print self.err
......@@ -56,6 +65,23 @@ class Workflow(object):
sys.exit(1)
return
def load_executables(self, os_type = "linux", arch = "x86_64"):
# Load wrapper scripts for commands that need to be loaded from module
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_wrappers)):
for f in files:
ex_name = f.split("_")[0]
self.executables[ex_name] = Executable(name = ex_name, os = os_type, arch = arch)
self.executables[ex_name].addPFN(PFN("file://%s/%s" % (root, f), "condorpool"))
self.dax.addExecutable(self.executables[ex_name])
break
# Load actual scripts
for root, dirs, files in os.walk("%s/scripts" % (os.path.dirname(os.path.realpath(__file__)),)):
for f in files:
self.executables[f] = Executable(name = f, os = os_type, arch = arch)
self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "condorpool"))
self.dax.addExecutable(self.executables[f])
return
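Note the naming convention load_executables relies on: a wrapper's executable name is everything before the first underscore in its file name. Purely as an illustration (these wrapper file names are hypothetical):

# Illustrative only: how the f.split("_")[0] convention maps wrapper files
# to the executable names registered in the DAX.
for f in ["bwa_wrapper.sh", "samtools_wrapper.sh"]:
    print f.split("_")[0]   # -> bwa, then samtools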
def load_modules(self):
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)):
for f in files:
......@@ -66,7 +92,7 @@ class Workflow(object):
break
return
def validate_run(self):
def load_runs(self):
try:
with open(self.run_file, "r") as rh:
self.run_data = yaml.load(rh)
......@@ -82,9 +108,10 @@ class Workflow(object):
self.err += "Error parsing run template file '%s'. Module '%s' requires an entry, should be one of %s.\n" % (self.run_file, module_name, keys)
if "experiment" in run:
valid, msg, data = self.mdb.get_samples(run["experiment"])
print valid
print msg
print data
if valid:
run["data"] = data
else:
self.err += msg
else:
self.err += "Error parsing run template file '%s'. Required key 'experiment' not defined.\n" % (self.run_file,)
except:
......@@ -115,8 +142,8 @@ class Workflow(object):
self.files[inout][name]["path"] = path if path else name
self.files[inout][name]["file"].addPFN(PFN("file://" + path.replace(" ","%20"), site))
self.dax.addFile(self.files[dax][inout][name]["file"])
elif inout == "output":
self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name))
# elif inout == "output":
# self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name))
return
def _addJob(self, jobname, executable, inputs, outputs, arguments, dependencies = None, dax = None, label = None, walltime = None):
......@@ -183,22 +210,7 @@ class Workflow(object):
self.jobs[dax][jobname].profile(Namespace.GLOBUS, "maxwalltime", walltime)
return
def _loadExecutables(self, os_type = "linux", arch = "x86_64"):
"""
Loads all executables from the scripts directory into the dax
"""
scriptFolder = os.path.dirname(os.path.realpath(__file__)) + "/scripts/"
if os.path.isdir(scriptFolder):
for root, dirs, files in os.walk(scriptFolder):
for ex in files:
self.executables[ex] = Executable(name = ex, os = os_type, arch = arch)
self.executables[ex].addPFN(PFN("file://" + scriptFolder + "/" + ex, "condorpool"))
self.dax.addExecutable(self.executables[ex])
break
self.executables["bwa"] = Executable(name = "bwa", os = os_type, arch = arch)
self.executables[ex].addPFN(PFN("file:///util/opt/bwa/0.7/gcc/4.4/bin/bwa", "condorpool"))
self.dax.addExecutable(self.executables["bwa"])
return
def _loadNotify(self, config):
self.dax.invoke(When.AT_END, self.basepath + "/input/notify.sh")
......