Commit 6a8ae431 authored by aknecht2
Browse files

Updated data transfer threads, job number, and walltime.

parent 738e3744
......@@ -101,6 +101,12 @@ class Workflow(object):
self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "local"))
self.dax.addExecutable(self.executables[f])
break
# Overwrite pegasus::transfer so that transfer jobs request more walltime.
# This is only done when PEGASUS_HOME is configured, because the
# executable is registered at <PEGASUS_HOME>/bin/pegasus-transfer.
if config["profile"]["env"]["PEGASUS_HOME"]:
    transfer_exe = Executable(name="pegasus::transfer", os=os_type, arch=arch)
    # Join all three path components *before* formatting the URL; the PFN
    # is registered on the "local" site like the other executables above.
    transfer_path = os.path.join(
        config["profile"]["env"]["PEGASUS_HOME"], "bin", "pegasus-transfer"
    )
    transfer_exe.addPFN(PFN("file://%s" % (transfer_path,), "local"))
    # NOTE(review): globus maxwalltime is presumably in minutes -- confirm
    # against the Pegasus profile documentation.
    transfer_exe.profile("globus", "maxwalltime", 180)
    self.executables["pegasus::transfer"] = transfer_exe
    self.dax.addExecutable(transfer_exe)
self._raise()
return
......@@ -257,6 +263,7 @@ class Workflow(object):
# For alignment doesn't really matter, but we set up the groundwork
# for the future. We randomly select unless there is an
# equal number of control and experiment files
self.number_of_inputs = 0
for run in self.run_data["runs"]:
run["input_sets"] = []
experiment_data = self._make_set(run["data"]["experiment"])
......@@ -274,6 +281,10 @@ class Workflow(object):
name = "%s_%s.fastq.gz" % (run["experiment"], f["accession"])
if name not in self.files:
self._add_file(name, "http://" + f["url"], "dummylocal")
# We additionally keep track of how many input files
# there are to increase the number of transfer
# clusters in _create_sites
self.number_of_inputs += 1
run["input_sets"].append(pair)
return
......@@ -401,7 +412,14 @@ class Workflow(object):
os.path.join(self.basepath, "output")
)
# NEED TO DETERMINE HOW TO READ IN THIS INFO
# change.dir moves execute directory to the work directory created by pegasus
sites += """\n\t<profile key="change.dir" namespace="pegasus">true</profile>"""
# Request one remote stage-in cluster per two input files. The value is
# clamped to at least 1 so that workflows with zero or one input file do
# not write a cluster count of 0 into the site catalog.
sites += """\n\t<profile key="stagein.remote.clusters" namespace="pegasus">%s</profile>""" % (
    max(1, self.number_of_inputs // 2),
)
# Increase the number of threads for transfer jobs.
sites += """\n\t<profile key="transfer.threads" namespace="pegasus">4</profile>"""
for namespace in self.config["profile"]:
for key in self.config["profile"][namespace]:
sites += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (namespace, key, self.config["profile"][namespace][key])
......@@ -411,6 +429,7 @@ class Workflow(object):
<directory type="shared-scratch" path="%s">
<file-server operation="all" url="file://%s" />
</directory>
""" % (
os.path.join(self.basepath, "work"),
os.path.join(self.basepath, "work")
......
Supports Markdown
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment