Commit 13235331 authored by aknecht2's avatar aknecht2
Browse files

Added boilerplate workflow class. Modified job yaml. Added test input/run yaml.

parent 2feb7312
......@@ -7,25 +7,25 @@ bwa_align_paired:
- sam
arguments:
- "-M":
changeable: false
required: true
value: false
changeable: false
required: true
value: false
- "-t":
changeable: true
required: false
default: 1
value: true
changeable: true
required: false
default: 1
value: true
- "$inputs.0":
changeable: false
required: true
value: false
changeable: false
required: true
value: false
- "$inputs.1":
changeable: false
required: true
value: false
changeable: false
required: true
value: false
- "$inputs.2":
changeable: false
required: true
value: false
changeable: false
required: true
value: false
walltime: 2000
memory: 2000
......@@ -8,20 +8,20 @@ bwa_sai_to_sam:
command: bwa
arguments:
- samse:
changeable: false
required: true
value: false
changeable: false
required: true
value: false
- "$inputs.0":
changeable: false
required: true
value: false
changeable: false
required: true
value: false
- "$inputs.1":
changeable: false
required: true
value: false
changeable: false
required: true
value: false
- "$inputs.2":
changeable: false
required: true
value: false
changeable: false
required: true
value: false
walltime: 2000
memory: 2000
......@@ -5,16 +5,16 @@ samtools_sam_to_bam:
- bam
arguments:
- view:
changeable: false
required: true
value: false
- "-bS"
changeable: false
required: true
value: false
- "$inputs.0"
changeable: false
required: true
value: false
changeable: false
required: true
value: false
- "-bS":
changeable: false
required: true
value: false
- "$inputs.0":
changeable: false
required: true
value: false
walltime: 2000
memory: 2000
- SRR1.fastq.gz:
filetype: fastq
organism: human
genome: hg19
celltype: k562
transcription_factor: ctcf
control: false
- SRR2.fastq.gz:
filetype: fastq
organism: human
genome: hg19
celltype: k562
control: true
bwa_sai_to_sam:
arguments: none
walltime: 2000
memory: 2000
bwa_align_single:
arguments:
- "-q": 5
- "-l": 32
- "-k": 2
- "-t": 1
walltime: 2000
memory: 2000
bwa_align_paired:
arguments:
- "-t": 1
walltime: 2000
memory: 2000
samtools_sam_to_bam:
arguments: none
walltime: 2000
memory: 2000
import sys
import hashlib
import urllib2
import os
import traceback
def progress(current, end, length = 20):
percent = float(current) / end
......@@ -6,3 +10,36 @@ def progress(current, end, length = 20):
spaces = " " * (length - len(hashes))
sys.stdout.write("\rProcessed %s / %s entries. [%s] %s%%" % (current, end, hashes + spaces, int(round(percent * 100))))
sys.stdout.flush()
def downloadFile(url, localpath, logfile, urltype = "ftp://", retries = 3, overwrite = False, checkmd5 = False, md5 = None):
    """
    Download url to localpath, appending per-attempt status lines to logfile.

    :param url: Remote url to fetch.  If it already starts with "http://",
        the urltype prefix is not prepended.
    :type url: str
    :param localpath: Destination file on disk.
    :type localpath: str
    :param logfile: Log file opened in append mode for status messages.
    :type logfile: str
    :param urltype: Scheme prepended to url when url carries none.
    :type urltype: str
    :param retries: Maximum number of download attempts.
    :type retries: int
    :param overwrite: When True, re-download even if localpath exists.
    :type overwrite: bool
    :param checkmd5: When True, validate the payload against md5 before
        writing it to disk.
    :type checkmd5: bool
    :param md5: Expected hex digest; only used when checkmd5 is True.
    :type md5: str
    :returns: True if the file is on disk (freshly downloaded or already
        present), False if every attempt failed.
    """
    with open(logfile, "a") as wh:
        success = False
        if url[:7] == "http://":
            urltype = ""
        for i in range(0, retries):
            wh.write("Attempt #%s, downloading %s\n" % (i + 1, url))
            try:
                if not os.path.isfile(localpath) or overwrite:
                    response = urllib2.urlopen(urltype + url)
                    data = response.read()
                    # Validate BEFORE touching the destination.  The previous
                    # version truncated localpath first, so a failed MD5 check
                    # left an empty file behind that the next attempt mistook
                    # for a completed download (and reported success).
                    if checkmd5 and hashlib.md5(data).hexdigest() != md5:
                        wh.write("MD5 mismatch. Retrying.\n")
                        overwrite = True
                        continue
                    # "wb": payloads (e.g. fastq.gz archives) are binary data.
                    with open(localpath, "wb") as fwh:
                        fwh.write(data)
                    success = True
                    wh.write("File saved successfully.\n")
                else:
                    wh.write("File already exists, skipping download.\n")
                    success = True
            except Exception:
                wh.write(traceback.format_exc())
                # A partial / failed write may have created the file; force a
                # re-download on the next attempt.
                overwrite = True
            if success:
                return True
    return False
import os
import argparse
import sys
import json
import datetime
import textwrap
import xml.dom.minidom
from Pegasus.DAX3 import *
class Workflow(object):
def __init__(self, jobhome):
"""
:param jobhome: The base directory for job submission.
:type jobhome: str
Creates a Workflow class based on the input directory. Only loads and
validates the config file by default.
"""
self.err = ""
self.jobhome = os.path.abspath(jobhome)
self.jobname = os.path.basename(os.path.dirname(self.jobhome + "/"))
self.dax = ADAG(self.jobname)
self.executables = {}
self.files = {"input": {}, "output": {}}
self.jobs = {}
self.deps = {}
return
def _addFile(self, name, inout, site = "condorpool", path = None, dax = None):
"""
:param name: Name of the file to add.
:type name: str
:param path: Path to the file. Only required for input files.
:type path: str
:param dax: Any dax can be passed in through this variable, otherwise use the internal dax.
:type dax: ADAG
Adds the inputted file to the dax, as well as the internal variable self.files
"""
dax = dax if dax else self.dax
self.files[dax][inout][name] = {"file": File(name), "path": ""}
if inout == "input":
self.files[dax][inout][name]["path"] = path if path else name
self.files[dax][inout][name]["file"].addPFN(PFN("file://" + path.replace(" ","%20"), site))
dax.addFile(self.files[dax][inout][name]["file"])
elif inout == "output":
self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name))
return
def _addJob(self, jobname, executable, inputs, outputs, arguments, dependencies = None, dax = None, label = None, walltime = None):
"""
:param jobname: Name of the job to add.
:type jobname: str
:param executable: Executable to run.
:type executable: str
:param inputs: Information for all necessary inputs. See formatting information below.
:type inputs: dict
:param outputs: Information for all necessary outputs. See formatting information below.
:type outputs: dict
:param arguments: Dictionary of arguments to add.
:type arguments: dict
:param dependencies: List of jobnames to be dependent upon.
:type dependencies: list
Input / Ouptut dicts are formatted the same way:
{
FNAME1: {
"file": PATH,
"transfer": True / False
},
FNAME2: {
"file": PATH,
"transfer": True / False
},
...
}
The keys of the dictionary represent the internal naming of an
individual file within the dax. The path definition should
either be the full path to the input file, or the full name
of the output file (output paths are NOT created at runtime).
The transfer argument simply represents if you want the file to
be transferred to the output folder. As such, it is not required
for input files.
"""
dax = dax if dax else self.dax
if not self._isJob(jobname, dax):
self.jobs[dax][jobname] = Job(executable)
dax.addJob(self.jobs[dax][jobname])
for key in inputs:
self.jobs[dax][jobname].uses(inputs[key]["file"], link = Link.INPUT)
for key in outputs:
self._addFile(os.path.basename(key), "output")
self.jobs[dax][jobname].uses(outputs[key]["file"], link = Link.OUTPUT, transfer = outputs[key]["transfer"])
arglist = []
for arg in arguments:
arglist.append(arg)
if str(arguments[arg]) in inputs:
arglist.append(inputs[str(arguments[arg])]["file"])
elif str(arguments[arg]) in outputs:
arglist.append(outputs[str(arguments[arg])]["file"])
else:
arglist.append(str(arguments[arg]))
self.jobs[dax][jobname].addArguments(*arglist)
if dependencies:
for depend in dependencies:
dax.depends(child = self.jobs[dax][jobname], parent = self.jobs[dax][depend])
if label:
self.jobs[dax][jobname].profile(Namespace.PEGASUS, "label", label)
if walltime:
self.jobs[dax][jobname].profile(Namespace.GLOBUS, "maxwalltime", walltime)
return
def _loadExecutables(self, os_type = "linux", arch = "x86_64"):
"""
Loads all executables from the scripts directory into the dax
"""
scriptFolder = os.path.dirname(os.path.realpath(__file__)) + "/scripts/"
if os.path.isdir(scriptFolder):
for root, dirs, files in os.walk(scriptFolder):
for ex in files:
self.executables[ex] = Executable(name = ex, os = os_type, arch = arch)
self.executables[ex].addPFN(PFN("file://" + scriptFolder + "/" + ex, "condorpool"))
self.dax.addExecutable(self.executables[ex])
break
self.executables["bwa"] = Executable(name = "bwa", os = os_type, arch = arch)
self.executables[ex].addPFN(PFN("file:///util/opt/bwa/0.7/gcc/4.4/bin/bwa", "condorpool"))
self.dax.addExecutable(self.executables["bwa"])
return
def _loadNotify(self, config):
self.dax.invoke(When.AT_END, self.basepath + "/input/notify.sh")
with open(self.basepath + "/input/notify.sh", "w") as wh:
notify = textwrap.dedent("""\
#!/bin/bash
%s/notification/email -t %s --report=pegasus-analyzer
""" % (config["notify"]["pegasus_home"], config["notify"]["email"]))
wh.write(notify)
os.chmod(self.basepath + "/input/notify.sh", 0755)
return
def _createReplica(self):
"""
Creates the pegasus configuration replica catalog. input/conf.rc
"""
with open(self.basepath + "/input/conf.rc", "w") as wh:
pegasusrc = textwrap.dedent("""\
pegasus.catalog.site = XML
pegasus.catalog.site.file = %s/sites.xml
pegasus.condor.logs.symlink = false
pegasus.data.configuration = sharedfs
pegasus.dir.storage.mapper = Replica
pegasus.dir.storage.mapper.replica = File
pegasus.dir.storage.mapper.replica.file = %s/map.rc
""" % (self.basepath + "/input", self.basepath + "/input"))
wh.write(pegasusrc)
return
def _createSites(self, config):
"""
Creates the pegasus site catalog. input/sites.xml
"""
with open(self.basepath + "/input/sites.xml", "w") as wh:
sites = """\
<sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
<site handle="local" arch="x86_64" os="LINUX">
<directory type="shared-scratch" path="%s">
<file-server operation="all" url="file://%s" />
</directory>
<directory type="local-storage" path="%s">
<file-server operation="all" url="file://%s" />
</directory>
</site>
<site handle="condorpool" arch="x86_64" os="LINUX">
<directory type="shared-scratch" path="%s">
<file-server operation="all" url="file://%s" />
</directory>
""" % (self.basepath + "/work/", self.basepath + "/work/", self.basepath + "/output/", self.basepath + "/output/", self.basepath + "/work/", self.basepath + "/work/")
for namespace in config["profile"]:
for key in config["profile"][namespace]:
val = ":".join(config["profile"][namespace][key]) if "path" in key.lower() else config["profile"][namespace][key]
sites += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (namespace, key, val)
sites += "</site></sitecatalog>"
sites = sites.replace("\n","")
wh.write("\n".join([line for line in xml.dom.minidom.parseString(sites).toprettyxml().split('\n') if line.strip()]))
return
def _createSubmit(self):
"""
Creates the pegasus submit script. submit.sh
"""
with open(self.basepath + "/input/submit.sh", "w") as wh:
submit = textwrap.dedent("""\
#!/bin/bash
plan=`pegasus-plan \\
--conf "%s" \\
--sites "%s" \\
--dir "%s" \\
--output-site local \\
--dax "%s" \\
--randomdir \\
""" % (self.basepath + "/input/conf.rc", "condorpool", self.basepath + "/work/", self.basepath + "/input/encode.dax"))
submit += textwrap.dedent("""\
--submit`
status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
echo -e "#!/bin/bash
pegasus-status -l $status" > status.sh
chmod 744 status.sh
remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
echo -e "#!/bin/bash
pegasus-remove $remove" > remove.sh
chmod 744 remove.sh
echo "$plan"
echo "Alternatively, you can use the status & remove scripts in the current directory!"
""")
wh.write(submit)
os.chmod(self.basepath + "/input/submit.sh", 0755)
return
def write(self):
with open(self.basepath + "/input/encode.dax", "w") as wh:
self.dax.writeXML(wh)
return
import yaml
from Pegasus.DAX3 import *
class YamlJob(object):
    """
    Wraps a yaml job definition file for conversion into a dax job.
    """

    def __init__(self, yaml_file):
        """
        :param yaml_file: Path to the yaml job definition to parse.
        :type yaml_file: str
        """
        with open(yaml_file, "r") as rh:
            # safe_load: job definitions are plain data, and yaml.load
            # without an explicit Loader can instantiate arbitrary python
            # objects from a crafted file.
            self.data = yaml.safe_load(rh)
        return

    def createDaxJob(self):
        # Placeholder -- dax job construction is not implemented yet.
        return
Questions
==========
Q: Should we only use control / signal data between similar experiments, i.e.,
only use control data taken from experiments within potential_controls[]?
A:
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment