Commit cce2a4e9 authored by aknecht2
Browse files

Reduced the cluster size for the extraction step. Updated the SQL aggregate step to work with OSG.

parent c5487cb7
...@@ -623,6 +623,7 @@ class ImageProcessor(Workflow): ...@@ -623,6 +623,7 @@ class ImageProcessor(Workflow):
exInput = {} exInput = {}
clusternum = {} clusternum = {}
meancluster = {} meancluster = {}
excluster = {}
if "maxwalltime" in self.config: if "maxwalltime" in self.config:
if "images" in self.config["maxwalltime"]: if "images" in self.config["maxwalltime"]:
...@@ -639,6 +640,7 @@ class ImageProcessor(Workflow): ...@@ -639,6 +640,7 @@ class ImageProcessor(Workflow):
jobnum = -1 jobnum = -1
clusternum[type] = 0 clusternum[type] = 0
meancluster[type] = 0 meancluster[type] = 0
excluster[type] = 0
exNames = self.workflow["extract"]["workflows"][type]["depends"] exNames = self.workflow["extract"]["workflows"][type]["depends"]
for infile in [x for x in self.files[self.dax][type]["input"] if "." + x.split(".")[1] in conf.imageExtensions]: for infile in [x for x in self.files[self.dax][type]["input"] if "." + x.split(".")[1] in conf.imageExtensions]:
if "cluster" in self.config: if "cluster" in self.config:
...@@ -648,8 +650,10 @@ class ImageProcessor(Workflow): ...@@ -648,8 +650,10 @@ class ImageProcessor(Workflow):
clusternum[type] += 1 clusternum[type] += 1
exDep[type].append([]) exDep[type].append([])
exInput[type].append({}) exInput[type].append({})
elif ((clusternum[type] * 100 + jobnum) % int(self.config["cluster"] * 0.3)) == 0: if ((clusternum[type] * 100 + jobnum) % int(self.config["cluster"] * 0.3)) == 0:
meancluster[type] += 1 meancluster[type] += 1
if (jobnum % 50) == 0:
excluster[type] += 1
extension = "." + infile.split(".")[1] extension = "." + infile.split(".")[1]
realname = self.files[self.dax][type]["input"][infile]["path"].split("/")[-1].split(".")[0] realname = self.files[self.dax][type]["input"][infile]["path"].split("/")[-1].split(".")[0]
derivedPath = self.files[self.dax][type]["input"][infile]["derivedPath"] derivedPath = self.files[self.dax][type]["input"][infile]["derivedPath"]
...@@ -658,15 +662,15 @@ class ImageProcessor(Workflow): ...@@ -658,15 +662,15 @@ class ImageProcessor(Workflow):
inputs = self._loadJobInputs(job, type, derivedPath, extension) inputs = self._loadJobInputs(job, type, derivedPath, extension)
if job["name"] in exNames: if job["name"] in exNames:
outputs = self._loadJobOutputs(job, type, derivedPath, True) outputs = self._loadJobOutputs(job, type, derivedPath, True)
exDep[type][clusternum[type]].append(jobname) exDep[type][excluster[type]].append(jobname)
reqFile = derivedPath + "_" + self.workflow["extract"]["workflows"][type]["inputs"][0] + ".png" reqFile = derivedPath + "_" + self.workflow["extract"]["workflows"][type]["inputs"][0] + ".png"
exInput[type][clusternum[type]][reqFile] = {"file": reqFile, "transfer": save} exInput[type][excluster[type]][reqFile] = {"file": reqFile, "transfer": save}
if "--dimfromroi" in self.workflow["extract"]["workflows"][type]["arguments"]: if "--dimfromroi" in self.workflow["extract"]["workflows"][type]["arguments"]:
if os.path.isfile(self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"]): if os.path.isfile(self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"]):
roiFile = os.path.basename(self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"]) roiFile = os.path.basename(self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"])
else: else:
roiFile = derivedPath + "_" + self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"] + ".json" roiFile = derivedPath + "_" + self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"] + ".json"
exInput[type][clusternum[type]][roiFile] = {"file": roiFile, "transfer": save} exInput[type][excluster[type]][roiFile] = {"file": roiFile, "transfer": save}
else: else:
outputs = self._loadJobOutputs(job, type, derivedPath, save) outputs = self._loadJobOutputs(job, type, derivedPath, save)
depends = [derivedPath + "_" + depend for depend in job["depends"]] if "depends" in job else [] depends = [derivedPath + "_" + depend for depend in job["depends"]] if "depends" in job else []
...@@ -681,7 +685,7 @@ class ImageProcessor(Workflow): ...@@ -681,7 +685,7 @@ class ImageProcessor(Workflow):
aggIn = {} aggIn = {}
aggIn2 = {} aggIn2 = {}
for type in self.workflow["workflows"]: for type in self.workflow["workflows"]:
for q in range(0, clusternum[type] + 1): for q in range(0, excluster[type] + 1):
arguments = self.workflow["extract"]["workflows"][type]["arguments"] arguments = self.workflow["extract"]["workflows"][type]["arguments"]
if "--input" in arguments: if "--input" in arguments:
del arguments["--input"] del arguments["--input"]
...@@ -696,8 +700,8 @@ class ImageProcessor(Workflow): ...@@ -696,8 +700,8 @@ class ImageProcessor(Workflow):
arguments["--inputs"] = " ".join([x for x in exInput[type][q].keys() if ".png" in x]) arguments["--inputs"] = " ".join([x for x in exInput[type][q].keys() if ".png" in x])
self._addFile(type + str(q) + ".db", type, "output") self._addFile(type + str(q) + ".db", type, "output")
self._addFile(type + str(q) + "_2.db", type, "output") self._addFile(type + str(q) + "_2.db", type, "output")
self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, exDep[type][q], walltime = 300) self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, exDep[type][q], walltime = 180)
self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, [], dax = self.exdax, walltime = 300) self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, [], dax = self.exdax, walltime = 180)
maprc.write(type + str(q) + ".db" + " file://" + self.basepath + "/output/" + type + str(q) + ".db" + " pool=\"local\"\n") maprc.write(type + str(q) + ".db" + " file://" + self.basepath + "/output/" + type + str(q) + ".db" + " pool=\"local\"\n")
binDep.append(type + "_extract" + str(q)) binDep.append(type + "_extract" + str(q))
aggIn[type + str(q) + ".db"] = {"file": type + str(q) + ".db", "transfer": False} aggIn[type + str(q) + ".db"] = {"file": type + str(q) + ".db", "transfer": False}
...@@ -723,7 +727,7 @@ class ImageProcessor(Workflow): ...@@ -723,7 +727,7 @@ class ImageProcessor(Workflow):
map[type] = group map[type] = group
for type in self.workflow["workflows"]: for type in self.workflow["workflows"]:
for q in range(0, clusternum[type] + 1): for q in range(0, excluster[type] + 1):
arguments = {} arguments = {}
arguments["--db"] = "db" arguments["--db"] = "db"
arguments["--copydb"] = "copydb" arguments["--copydb"] = "copydb"
...@@ -731,8 +735,8 @@ class ImageProcessor(Workflow): ...@@ -731,8 +735,8 @@ class ImageProcessor(Workflow):
exInput[type][q]["db"] = {"file": type + str(q) + ".db", "transfer": False} exInput[type][q]["db"] = {"file": type + str(q) + ".db", "transfer": False}
exInput[type][q]["binfile"] = {"file": map[type] + "_hist_bins.json", "transfer": False} exInput[type][q]["binfile"] = {"file": map[type] + "_hist_bins.json", "transfer": False}
arguments["--bins"] = "binfile" arguments["--bins"] = "binfile"
self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {"copydb": {"file": type + str(q) + "_2.db", "transfer": False}}, arguments, ["bin_creation"], walltime = 480) self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {"copydb": {"file": type + str(q) + "_2.db", "transfer": False}}, arguments, ["bin_creation"], walltime = 300)
self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {"copydb": {"file": type + str(q) + "_2.db", "transfer": False}}, arguments, ["bin_creation"], dax = self.exdax, walltime = 480) self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {"copydb": {"file": type + str(q) + "_2.db", "transfer": False}}, arguments, ["bin_creation"], dax = self.exdax, walltime = 300)
binDep.append(type + "_extractBins" + str(q)) binDep.append(type + "_extractBins" + str(q))
aggIn["db"] = {"file": "img2.db", "transfer": False} aggIn["db"] = {"file": "img2.db", "transfer": False}
self._addJob("sql_aggregate2", "ih-sql-aggregate", aggIn2, {"img3.db": {"file": "img3.db", "transfer": True}}, {"--db": "db", "--output": "img3.db", "--inputs": " ".join([aggIn[x]["file"] for x in aggIn if x != "db"])}, binDep, walltime = 180) self._addJob("sql_aggregate2", "ih-sql-aggregate", aggIn2, {"img3.db": {"file": "img3.db", "transfer": True}}, {"--db": "db", "--output": "img3.db", "--inputs": " ".join([aggIn[x]["file"] for x in aggIn if x != "db"])}, binDep, walltime = 180)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment