Commit f73fdf42 authored by aknecht2

Fixed minor issues in the param YAML scripts. Updated the workflow to add jobs and create the relevant Pegasus information. The workflow can now generate download and alignment jobs, but still needs testing. Setup updated to include job scripts.
parent 2443feb3
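For context, the new _add_download method in this commit follows the standard Pegasus DAX3 pattern visible in the diff below: declare an output File, attach it to a Job that runs the download_fastq.py executable, and register both with the DAX. Here is a minimal standalone sketch of that pattern; the DAX name, paths, URL, accession, and md5 value are made-up placeholders rather than values from this commit, and the exact download_fastq.py flags may differ.

# Illustrative sketch only -- not part of the commit.
from Pegasus.DAX3 import ADAG, Executable, File, Job, Link, PFN

dax = ADAG("chipathlon")

# Register the download script as an executable (hypothetical path and site).
download_exe = Executable(name="download_fastq.py", os="linux", arch="x86_64")
download_exe.addPFN(PFN("file:///path/to/jobs/scripts/download_fastq.py", "condorpool"))
dax.addExecutable(download_exe)

# Output files are named EXPID_ACCESSION.fastq.gz (placeholder IDs here).
output_file = File("ENCSR000XXX_ENCFF000XXX.fastq.gz")

# Create the download job, wire its output into the DAX, and pass the
# source URL and expected md5 as arguments, as _add_download does.
job = Job(download_exe)
job.uses(output_file, link=Link.OUTPUT, transfer=True)
job.addArguments("-u", "https://example.org/ENCFF000XXX.fastq.gz", "-p", output_file, "-m", "d41d8cd98f00b204e9800998ecf8427e")
dax.addJob(job)

with open("chipathlon.dax", "w") as wh:
    dax.writeXML(wh)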
@@ -7,6 +7,9 @@ job_params = "jobs/params/"
# Job wrappers directory
job_wrappers = "jobs/wrappers/"
+# Job scripts directory
+job_scripts = "jobs/scripts/"
# File extensions
file_extensions = {
    "genome_index": ["fa", "fna"],
...
@@ -5,7 +5,7 @@ bedtools_bam_to_bed:
    - bed
  arguments:
    - "bamtobed":
-        changable: false
+        changeable: false
        required: true
        has_value: false
    - "-i":
...
@@ -26,7 +26,7 @@ macs2_callpeak:
        has_value: true
        default: "BED"
    - "-n":
-        changable: false
+        changeable: false
        required: true
        has_value: true
        default: $inputs.2
...
-picard_sort_sam:
+picard_mark_duplicates:
  inputs:
    - bam
+  additional_inputs: null
  outputs:
    - bam
    - qc
+  command: picard
  arguments:
    - MarkDuplicates:
        changeable: false
@@ -28,16 +30,15 @@ picard_sort_sam:
        required: false
        has_value: true
        default: "LENIENT"
    - "ASSUME_SORTED=":
        changeable: true
        required: false
        has_value: true
        default: true
    - "REMOVE_DUPLICATES=":
        changeable: false
        required: false
        has_value: true
        default: false
  walltime: 2000
  memory: 8000
@@ -27,3 +27,27 @@ samtools_sam_to_bam:
  arguments: null
  walltime: 2000
  memory: 2000
+bedtools_bam_to_bed:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+macs2_callpeak:
+  arguments: null
+  walltime: 2000
+  memory: 8000
+picard_mark_duplicates:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+picard_sort_sam:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+r_spp_nodups:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+samtools_remove_duplicates:
+  arguments: null
+  walltime: 2000
+  memory: 2000
@@ -6,9 +6,9 @@ import chipathlon.workflow
import argparse

parser = argparse.ArgumentParser(description="Perform a join between the experiment and sample collections.")
-parser.add_argument("--password", dest="password", required=True, help="Database user password.")
-parser.add_argument("--username", dest="username", default="aknecht", required=True, help="Database user.")
-parser.add_argument("--host", dest="host", default="hcc-anvil-241-41.unl.edu", required=True, help="Database host.")
+parser.add_argument("--password", dest="password", default="nerpAusIrd)griUrlobIbfaifovript4", help="Database user password.")
+parser.add_argument("--username", dest="username", default="aknecht", help="Database user.")
+parser.add_argument("--host", dest="host", default="hcc-anvil-175-9.unl.edu", help="Database host.")

args = parser.parse_args()
mdb = chipathlon.db.MongoDB(args.host, args.username, args.password)
@@ -126,4 +126,5 @@ def valid_samples():
# ]
# for test in tests:
#     test()
workflow_test_1()
@@ -11,6 +11,7 @@ import chipathlon
import chipathlon.workflow_job
import chipathlon.db
import chipathlon.workflow_module
+import random
from pprint import pprint
from Pegasus.DAX3 import *
@@ -51,6 +52,14 @@ class Workflow(object):
            self._load_runs()
            # All required information is loaded, start queuing jobs
            self._add_download()
+            self._add_align()
+            # Create pegasus important stuff
+            self._create_setup()
+            self._add_notify()
+            self._create_replica()
+            self._create_sites()
+            self._create_submit()
+            self._write()
            break
        if self.err:
            print self.err
@@ -67,7 +76,7 @@ class Workflow(object):
                self.dax.addExecutable(self.executables[ex_name])
            break
        # Load actual scripts
-        for root, dirs, files in os.walk("%s/scripts" % (os.path.dirname(os.path.realpath(__file__)),)):
+        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_scripts)):
            for f in files:
                self.executables[f] = Executable(name=f, os=os_type, arch=arch)
                self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "condorpool"))
@@ -79,11 +88,14 @@ class Workflow(object):
        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
            for f in files:
                ex_name = f.split("_")[0]
-                yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
-                if not yj.err:
-                    self.workflow_jobs[yj.jobname] = yj
+                if ex_name in self.executables:
+                    yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
+                    if not yj.err:
+                        self.workflow_jobs[yj.jobname] = yj
+                    else:
+                        self.err += yj.err
                else:
-                    self.err += yj.err
+                    print "[INFO] Skipping param file %s, no corresponding executable found." % (f,)
            break
        return
@@ -118,8 +130,10 @@ class Workflow(object):
                    run["data"] = data
                else:
                    self.err += msg
-                # MAKE SURE YOU UNCOMMENT THIS!!!
+                # This will be changed to _check_genomes(run) eventually.
                # self._check_genomes(run)
+                # UNCOMMENT THIS:
+                # self._check_grch38(run)
            else:
                self.err += "Error parsing run template file '%s'. Required key 'experiment' not defined.\n" % (self.run_file,)
        except:
@@ -127,7 +141,47 @@ class Workflow(object):
            self.err += traceback.format_exc()
        return

+    def _check_grch38(self, run):
+        if run["align"] in self.run_data["genomes"]:
+            assembly = "grch38.p6"
+            gen_prefix = "genome_%s_%s" % (run["align"], assembly)
+            if assembly in self.run_data["genomes"][run["align"]]:
+                base_file = self.run_data["genomes"][run["align"]][assembly]
+                if os.path.isfile(base_file):
+                    base_file_name = gen_prefix + base_file.split(".", 1)
+                    f = File(base_file_name)
+                    f.addPFN(PFN(base_file, "local"))
+                    self.files[base_file_name] = f
+                    if base_file.split(".", 1)[1] in chipathlon.conf.genomes[run["align"]]["base_file"]:
+                        prefix = base_file.split(".", 1)[0]
+                        missing = []
+                        for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
+                            if not os.path.isfile(prefix + "." + ext) and not os.path.isfile(base_file + "." + ext):
+                                missing.append(ext)
+                            else:
+                                f = File(gen_prefix + "." + ext)
+                                if os.path.isfile(prefix + "." + ext):
+                                    f.addPFN(prefix + "." + ext)
+                                else:
+                                    f.addPFN(base_file + "." + ext)
+                                self.files[gen_prefix + "." + ext] = f
+                                self.dax.addFile(f)
+                        if len(missing) > 0:
+                            self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
+                    else:
+                        self.err += "Genome defined with tool '%s' and assembly '%s', has invalid extension. Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
+                else:
+                    self.err += "Genome defined with tool '%s' and assembly '%s', has non-existant base file '%s'.\n" % (run["align"], assembly, base_file)
+            else:
+                self.err += "Alignment defined with tool '%s' for assembly '%s', no corresponding genome definition.\n" % (run["align"], assembly)
+        else:
+            self.err += "Alignment defined with tool '%s', no corresponding genome definition.\n" % (run["align"])
+        return
+
    def _check_genomes(self, run):
+        # NOTE: THIS function should NOT be called.
+        # We will only be using GRCH38.p6 genome.
+        # Use the _check_grch38 function instead
        valid, msg, assembly = self.mdb.get_assembly(run["experiment"])
        if valid:
            if run["align"] in self.run_data["genomes"]:
@@ -155,80 +209,118 @@ class Workflow(object):
        return

    def _add_download(self):
+        # Remember, experiment always paired with randomly selected control,
+        # For alignment doesn't really matter, but we set up the groundwork
+        # for the future. We randomly select unless there is an
+        # equal number of control and experiment files
+        self.input_sets = []
        for run in self.run_data["runs"]:
-            print "Control samples = %s, Experiment samples = %s" % (len(run["data"]["control"]), len(run["data"]["experiment"]))
+            if (len(run["data"]["control"]) != len(run["data"]["experiment"])):
+                control_data = [random.choice(run["data"]["control"]) for i in range(0, len(run["data"]["experiment"]))]
+            else:
+                control_data = run["data"]["control"]
+            for pair in zip(run["data"]["experiment"], control_data):
+                exp_name = "%s_%s.fastq.gz" % (run["experiment"], pair[0]["accession"])
+                control_name = "%s_%s.fastq.gz" % (run["experiment"], pair[1]["accession"])
+                if exp_name not in self.files and control_name not in self.files:
+                    self.input_sets.append(pair)
+                    for f in pair:
+                        # For each file we create the download job
+                        # and output file. Base output files will be
+                        # EXPID_ACCESSION.fastq.gz
+                        output_name = "%s_%s.fastq.gz" % (run["experiment"], f["accession"])
+                        output_file = File(output_name)
+                        self.files[output_name] = output_file
+                        job = Job(self.executables["download_fastq.py"])
+                        job.uses(output_file, link=Link.OUTPUT, transfer=True)
+                        job.addArguments("-u", f["url"], "-p", output_file, "-t http://", "-m", f["content_md5sum"])
+                        self.jobs[output_name] = job
+                        self.dax.addJob(job)
        return
-    def _addFile(self, name, inout, site="condorpool", path=None):
-        """
-        :param name: Name of the file to add.
-        :type name: str
-        :param path: Path to the file. Only required for input files.
-        :type path: str
-
-        Adds the inputted file to the dax, as well as the internal variable self.files
-        """
-        self.files[inout][name] = {"file": File(name), "path": ""}
-        if inout == "input":
-            self.files[inout][name]["path"] = path if path else name
-            self.files[inout][name]["file"].addPFN(PFN("file://" + path.replace(" ", "%20"), site))
-        self.dax.addFile(self.files[dax][inout][name]["file"])
-        # elif inout == "output":
-        #     self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name))
+    def _add_align(self):
+        print self.modules["align"].markers
+        markers = {}
+        markers["read_end"] = "single"
+        for run in self.run_data["runs"]:
+            input_files = {}
+            additional_files = {}
+            markers["tool"] = run["align"]
+            gen_prefix = "genome_%s_grch38.p6" % (run["align"],)
+            input_files["ref_genome"] = "%s.%s" % (gen_prefix, self.run_data["genomes"][run["align"]]["grch38.p6"])
+            for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
+                additional_files["ref_genome." + ext] = "%s.%s" % (gen_prefix, ext)
+            for pair in self.input_sets:
+                for f in pair:
+                    prefix = "%s_%s" % (run["experiment"], f["accession"])
+                    input_files["download_1.fastq"] = "%s.fastq.gz" % (prefix,)
+                    self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
+        return
+
+    def _create_setup(self):
+        """
+        Creates the base structure for job submission. Everything is contained
+        within a folder based on the current timestamp.
+        """
+        self.basepath = self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
+        if not os.path.exists(self.basepath):
+            os.makedirs(self.basepath)
+        for folder in ["input", "output"]:
+            if not os.path.exists(os.path.join(self.basepath, folder)):
+                os.makedirs(os.path.join(self.basepath, folder))
        return
-    def _loadNotify(self, config):
-        self.dax.invoke(When.AT_END, self.basepath + "/input/notify.sh")
-        with open(self.basepath + "/input/notify.sh", "w") as wh:
+    def _add_notify(self):
+        # NEED TO DETERMINE HOW TO READ IN THESE VARS
+        notify_path = os.path.join(self.basepath, "input/notify.sh")
+        with open(notify_path, "w") as wh:
            notify = textwrap.dedent("""\
                #!/bin/bash
                %s/notification/email -t %s --report=pegasus-analyzer
            """ % (config["notify"]["pegasus_home"], config["notify"]["email"]))
            wh.write(notify)
-        os.chmod(self.basepath + "/input/notify.sh", 0755)
+        os.chmod(notify_path, 0755)
+        self.dax.invoke(When.AT_END, notify_path)
        return
-    def _createReplica(self):
-        """
-        Creates the pegasus configuration replica catalog. input/conf.rc
-        """
-        with open(self.basepath + "/input/conf.rc", "w") as wh:
+    def _create_replica(self):
+        with open(os.path.join(self.basepath, "input/conf.rc"), "w") as wh:
            pegasusrc = textwrap.dedent("""\
                pegasus.catalog.site = XML
                pegasus.catalog.site.file = %s/sites.xml
                pegasus.condor.logs.symlink = false
-                pegasus.data.configuration = sharedfs
-                pegasus.dir.storage.mapper = Replica
-                pegasus.dir.storage.mapper.replica = File
-                pegasus.dir.storage.mapper.replica.file = %s/map.rc
-            """ % (self.basepath + "/input", self.basepath + "/input"))
+                pegasus.data.configuration = sharedfs
+            """ % (os.path.join(self.basepath, "input"),))
            wh.write(pegasusrc)
        return
-    def _createSites(self, config):
-        """
-        Creates the pegasus site catalog. input/sites.xml
-        """
-        with open(self.basepath + "/input/sites.xml", "w") as wh:
+    def _create_sites(self):
+        with open(os.path.join(self.basepath, "input/sites.xml"), "w") as wh:
            sites = """\
            <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
                <site handle="local" arch="x86_64" os="LINUX">
                    <directory type="shared-scratch" path="%s">
                        <file-server operation="all" url="file://%s" />
                    </directory>
                    <directory type="local-storage" path="%s">
                        <file-server operation="all" url="file://%s" />
                    </directory>
                </site>
                <site handle="condorpool" arch="x86_64" os="LINUX">
                    <directory type="shared-scratch" path="%s">
                        <file-server operation="all" url="file://%s" />
                    </directory>
-            """ % (self.basepath + "/work/", self.basepath + "/work/", self.basepath + "/output/", self.basepath + "/output/", self.basepath + "/work/", self.basepath + "/work/")
+            """ % (
+                os.path.join(self.basepath, "work"),
+                os.path.join(self.basepath, "work"),
+                os.path.join(self.basepath, "output"),
+                os.path.join(self.basepath, "output"),
+                os.path.join(self.basepath, "work"),
+                os.path.join(self.basepath, "work")
+            )
+            # NEED TO DETERMINE HOW TO READ IN THIS INFO
            for namespace in config["profile"]:
                for key in config["profile"][namespace]:
                    val = ":".join(config["profile"][namespace][key]) if "path" in key.lower() else config["profile"][namespace][key]
@@ -238,11 +330,11 @@ class Workflow(object):
            wh.write("\n".join([line for line in xml.dom.minidom.parseString(sites).toprettyxml().split('\n') if line.strip()]))
        return
-    def _createSubmit(self):
+    def _create_submit(self):
        """
        Creates the pegasus submit script. submit.sh
        """
-        with open(self.basepath + "/input/submit.sh", "w") as wh:
+        with open(os.path.join(self.basepath, "input/submit.sh"), "w") as wh:
            submit = textwrap.dedent("""\
                #!/bin/bash
                plan=`pegasus-plan \\
@@ -252,7 +344,7 @@ class Workflow(object):
                --output-site local \\
                --dax "%s" \\
                --randomdir \\
-            """ % (self.basepath + "/input/conf.rc", "condorpool", self.basepath + "/work/", self.basepath + "/input/encode.dax"))
+            """ % (self.basepath + "/input/conf.rc", "condorpool", self.basepath + "/work/", self.basepath + "/input/chipathlon.dax"))
            submit += textwrap.dedent("""\
                --submit`
@@ -267,14 +359,13 @@ class Workflow(object):
                chmod 744 remove.sh
                echo "$plan"
-                echo "Alternatively, you can use the status & remove scripts in the current directory!"
            """)
            wh.write(submit)
            os.chmod(self.basepath + "/input/submit.sh", 0755)
        return

-    def write(self):
+    def _write(self):
        with open(self.basepath + "/input/chipathlon.dax", "w") as wh:
            self.dax.writeXML(wh)
        return
@@ -16,6 +16,8 @@ class WorkflowJob(object):
            with open(base_yaml, "r") as rh:
                self.base = yaml.load(rh)
                self.jobname = self.base.keys()[0]
+                self.inputs = self.base[self.jobname]["inputs"]
+                self.outputs = self.base[self.jobname]["outputs"]
        except:
            self.err += "Error parsing job template yaml file.\n"
            self.err += traceback.format_exc()
@@ -29,8 +31,6 @@ class WorkflowJob(object):
        except:
            self.err += "Error parsing params yaml file.\n"
            self.err += traceback.format_exc()
-        self.inputs = self.base[self.jobname]["inputs"]
-        self.outputs = self.base[self.jobname]["outputs"]
        self.validate()
        return
...
@@ -123,7 +123,7 @@ class WorkflowModule(object):
            # NOW, WE add our job to the master_list, and check dependencies
            # Python ALWAYS passes by reference, so even though
            # we are a little redundant here, dependency checking
-            # will be easier later
+            # will be easier later and not very costly
            for fd in job_outputs:
                master_jobs[fd["name"]] = job
            for fd in job_inputs:
@@ -161,7 +161,3 @@ class WorkflowModule(object):
    def _get_full_name(self, prefix, markers, fname):
        return "%s_%s_%s" % (prefix, "_".join([markers[x] for x in self.order]), fname)
-
-    def _add_output_files(self, master_files, prefix, markers, output_files):
-        return
@@ -6,7 +6,7 @@ setup(
    packages=["chipathlon"],
    license="???",
    package_data={
-        "chipathlon": ["jobs/modules/*", "jobs/params/*", "jobs/wrappers/*"]
+        "chipathlon": ["jobs/modules/*", "jobs/params/*", "jobs/wrappers/*", "jobs/scripts/*"]
    },