Commit 831f8a51 authored by aknecht2's avatar aknecht2
Browse files

Fixed whitespace errors and re-updated pep8 compliance.

parent 646bda8b
......@@ -20,17 +20,20 @@ file_extensions = {
"bed": ["bed", "narrowPeak", "broadPeak"],
"bwa_genome": ["amb", "ann", "bwt", "pac", "sa"],
"bowtie2_genome": ["1.bt2", "2.bt2", "3.bt2", "4.bt2", "rev.1.bt2", "rev.2.bt2"],
"quality" : ["quality"]
"quality": ["quality"]
}
# list of resources that can be specified per job (step) in
# the workflow and corresponding Pegasus profile info
resources = { "walltime" :{ "namespace" : "globus",
"key" : "maxwalltime"
},
"memory" : { "namespace" : "condor",
"key" : "request_memory"
}
resources = {
"walltime": {
"namespace": "globus",
"key": "maxwalltime"
},
"memory": {
"namespace": "condor",
"key": "request_memory"
}
}
# param keys
......
......@@ -47,6 +47,8 @@ def downloadFile(url, localpath, urltype="ftp://", retries=3, overwrite=True, ch
else:
print "File already exists, skipping download.\n"
return
# http://pythoncentral.io/how-to-check-if-a-string-is-a-number-in-python-including-unicode/
def is_number(s):
try:
......
......@@ -15,6 +15,7 @@ import random
from pprint import pprint
from Pegasus.DAX3 import *
class Workflow(object):
def __init__(self, jobhome, run_file, param_file, config_file, host, username, password):
......@@ -250,10 +251,10 @@ class Workflow(object):
if exp_name not in self.files and control_name not in self.files:
run["input_sets"].append(pair)
for f in pair:
# For each fastq, have Pegasus do the http fetching.
# Files are named EXPID_ACCESSION.fastq.gz
# For each fastq, have Pegasus do the http fetching.
# Files are named EXPID_ACCESSION.fastq.gz
output_name = "%s_%s.fastq.gz" % (run["experiment"], f["accession"])
self._add_file(output_name,"http://" + f["url"],"dummylocal")
self._add_file(output_name, "http://" + f["url"], "dummylocal")
return
def _add_align(self):
......@@ -307,7 +308,7 @@ class Workflow(object):
pegasus.catalog.site.file = %s/sites.xml
pegasus.condor.logs.symlink = false
pegasus.transfer.links=true
pegasus.transfer.links=true
pegasus.data.configuration = sharedfs
""" % (os.path.join(self.basepath, "input"),))
wh.write(pegasusrc)
......@@ -324,17 +325,17 @@ class Workflow(object):
<directory type="local-storage" path="%s">
<file-server operation="all" url="file://%s" />
</directory>""" % (
os.path.join(self.basepath, "work"),
os.path.join(self.basepath, "work"),
os.path.join(self.basepath, "output"),
os.path.join(self.basepath, "output")
)
os.path.join(self.basepath, "work"),
os.path.join(self.basepath, "work"),
os.path.join(self.basepath, "output"),
os.path.join(self.basepath, "output")
)
# NEED TO DETERMINE HOW TO READ IN THIS INFO
for namespace in self.config["profile"]:
for key in self.config["profile"][namespace]:
sites += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (namespace, key, self.config["profile"][namespace][key])
sites +="""</site>
sites += """</site>
<site handle="dummylocal" arch="x86_64" os="LINUX">
<directory type="shared-scratch" path="%s">
<file-server operation="all" url="file://%s" />
......
......@@ -44,7 +44,7 @@ class WorkflowJob(object):
self._validate_input()
if self.valid():
self._validate_arguments()
self._validate_resource_params()
self._validate_resource_params()
return
def _validate_input(self):
......@@ -84,21 +84,21 @@ class WorkflowJob(object):
return
def _validate_resource_params(self):
"""
Validate memory/walltime values supplied in the params file
are present and numeric.
"""
for resource_type in chipathlon.conf.resources.keys():
try:
if not is_number(self.params[resource_type]):
self.err += "Resource specification of type '%s' for '%s' must be numeric.\n" \
% (resource_type,self.jobname)
except TypeError:
self.err += "Resource specification of type '%s' for '%s' is missing a value.\n" \
% (resource_type,self.jobname)
except KeyError:
pass
return
"""
Validate memory/walltime values supplied in the params file
are present and numeric.
"""
for resource_type in chipathlon.conf.resources.keys():
try:
if not is_number(self.params[resource_type]):
self.err += "Resource specification of type '%s' for '%s' must be numeric.\n" \
% (resource_type, self.jobname)
except TypeError:
self.err += "Resource specification of type '%s' for '%s' is missing a value.\n" \
% (resource_type, self.jobname)
except KeyError:
pass
return
def _check_files(self, input_files, additional_inputs, output_files):
valid_files = True
......@@ -130,25 +130,25 @@ class WorkflowJob(object):
for f in output_files:
job.uses(f["file"], link=Link.OUTPUT, transfer=True)
job.addArguments(*self._create_arg_list(input_files, output_files))
self._add_job_resources(job)
self._add_job_resources(job)
return job
else:
print self.err
return None
def _add_job_resources(self,job):
"""
Add the job's resources (memory, walltime, etc.) via Pegasus
Profiles.
"""
for resource_type in chipathlon.conf.resources.keys():
try:
resource_value = self.params[resource_type]
except KeyError:
resource_value = self.base[self.jobname][resource_type]
ns = chipathlon.conf.resources[resource_type]["namespace"]
key = chipathlon.conf.resources[resource_type]["key"]
job.profile(ns,key,resource_value)
def _add_job_resources(self, job):
"""
Add the job's resources (memory, walltime, etc.) via Pegasus
Profiles.
"""
for resource_type in chipathlon.conf.resources.keys():
try:
resource_value = self.params[resource_type]
except KeyError:
resource_value = self.base[self.jobname][resource_type]
ns = chipathlon.conf.resources[resource_type]["namespace"]
key = chipathlon.conf.resources[resource_type]["key"]
job.profile(ns, key, resource_value)
def _create_arg_list(self, input_files, output_files):
"""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment