From cce2a4e90ca8fb6fd10183630b01eff486ca6cf3 Mon Sep 17 00:00:00 2001
From: aknecht2 <aknecht2@unl.edu>
Date: Mon, 30 Nov 2015 09:40:31 -0600
Subject: [PATCH] Reduced cluster size for extraction step. Updated SQL
 aggregate to work with OSG.

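Extraction jobs are now batched by a separate excluster counter that starts a
new cluster every 50 input images, rather than reusing the clusternum grouping
driven by config["cluster"], and the walltimes for the ih-extract-multi jobs
are lowered (extract: 300 -> 180, extractBins: 480 -> 300). The SQL aggregate
steps likewise iterate over excluster so their inputs line up with the smaller
extraction clusters when the workflow runs on OSG.

A minimal sketch of the new batching idea, shown standalone for illustration
only (the group_extraction_inputs helper is not part of workflow.py):

    def group_extraction_inputs(infiles, batch_size=50):
        """Assign input images to extraction clusters of batch_size images."""
        clusters = []
        for jobnum, infile in enumerate(infiles):
            if jobnum % batch_size == 0:
                clusters.append([])  # start a new extraction cluster
            clusters[-1].append(infile)
        return clusters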
---
 ih/workflow.py | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/ih/workflow.py b/ih/workflow.py
index dc5349e..1a6e386 100644
--- a/ih/workflow.py
+++ b/ih/workflow.py
@@ -623,6 +623,7 @@ class ImageProcessor(Workflow):
         exInput = {}
         clusternum = {}
         meancluster = {}
+        excluster = {}
 
         if "maxwalltime" in self.config:
             if "images" in self.config["maxwalltime"]:
@@ -639,6 +640,7 @@ class ImageProcessor(Workflow):
             jobnum = -1
             clusternum[type] = 0
             meancluster[type] = 0
+            excluster[type] = 0
             exNames = self.workflow["extract"]["workflows"][type]["depends"]
             for infile in [x for x in self.files[self.dax][type]["input"] if "." + x.split(".")[1] in conf.imageExtensions]:
                 if "cluster" in self.config:
@@ -648,8 +650,10 @@ class ImageProcessor(Workflow):
                         clusternum[type] += 1
                         exDep[type].append([])
                         exInput[type].append({})
-                    elif ((clusternum[type] * 100 + jobnum) % int(self.config["cluster"] * 0.3)) == 0:
+                    if ((clusternum[type] * 100 + jobnum) % int(self.config["cluster"] * 0.3)) == 0:
                         meancluster[type] += 1
+                    if (jobnum % 50) == 0:
+                        excluster[type] += 1
                 extension = "." + infile.split(".")[1]
                 realname = self.files[self.dax][type]["input"][infile]["path"].split("/")[-1].split(".")[0]
                 derivedPath = self.files[self.dax][type]["input"][infile]["derivedPath"]
@@ -658,15 +662,15 @@ class ImageProcessor(Workflow):
                     inputs = self._loadJobInputs(job, type, derivedPath, extension)
                     if job["name"] in exNames:
                         outputs = self._loadJobOutputs(job, type, derivedPath, True)
-                        exDep[type][clusternum[type]].append(jobname)
+                        exDep[type][excluster[type]].append(jobname)
                         reqFile = derivedPath + "_" + self.workflow["extract"]["workflows"][type]["inputs"][0] + ".png"
-                        exInput[type][clusternum[type]][reqFile] = {"file": reqFile, "transfer": save}
+                        exInput[type][excluster[type]][reqFile] = {"file": reqFile, "transfer": save}
                         if "--dimfromroi" in self.workflow["extract"]["workflows"][type]["arguments"]:
                             if os.path.isfile(self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"]):
                                 roiFile = os.path.basename(self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"])
                             else:
                                 roiFile = derivedPath + "_" + self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"] + ".json"
-                            exInput[type][clusternum[type]][roiFile] = {"file": roiFile, "transfer": save}
+                            exInput[type][excluster[type]][roiFile] = {"file": roiFile, "transfer": save}
                     else:
                         outputs = self._loadJobOutputs(job, type, derivedPath, save)
                     depends = [derivedPath + "_" + depend for depend in job["depends"]] if "depends" in job else []
@@ -681,7 +685,7 @@ class ImageProcessor(Workflow):
         aggIn = {}
         aggIn2 = {}
         for type in self.workflow["workflows"]:
-            for q in range(0, clusternum[type] + 1):
+            for q in range(0, excluster[type] + 1):
                 arguments = self.workflow["extract"]["workflows"][type]["arguments"]
                 if "--input" in arguments:
                     del arguments["--input"]
@@ -696,8 +700,8 @@ class ImageProcessor(Workflow):
                 arguments["--inputs"] = " ".join([x for x in exInput[type][q].keys() if ".png" in x])
                 self._addFile(type + str(q) + ".db", type, "output")
                 self._addFile(type + str(q) + "_2.db", type, "output")
-                self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, exDep[type][q], walltime = 300)
-                self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, [], dax = self.exdax, walltime = 300)
+                self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, exDep[type][q], walltime = 180)
+                self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, [], dax = self.exdax, walltime = 180)
                 maprc.write(type + str(q) + ".db" + " file://" + self.basepath + "/output/" + type + str(q) + ".db" + " pool=\"local\"\n")
                 binDep.append(type + "_extract" + str(q))
                 aggIn[type + str(q) + ".db"] = {"file": type + str(q) + ".db", "transfer": False}
@@ -723,7 +727,7 @@ class ImageProcessor(Workflow):
                     map[type] = group
 
             for type in self.workflow["workflows"]:
-                for q in range(0, clusternum[type] + 1):
+                for q in range(0, excluster[type] + 1):
                     arguments = {}
                     arguments["--db"] = "db"
                     arguments["--copydb"] = "copydb"
@@ -731,8 +735,8 @@ class ImageProcessor(Workflow):
                     exInput[type][q]["db"] = {"file": type + str(q) + ".db", "transfer": False}
                     exInput[type][q]["binfile"] = {"file": map[type] + "_hist_bins.json", "transfer": False}
                     arguments["--bins"] = "binfile"
-                    self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {"copydb": {"file": type + str(q) + "_2.db", "transfer": False}}, arguments, ["bin_creation"], walltime = 480)
-                    self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {"copydb": {"file": type + str(q) + "_2.db", "transfer": False}}, arguments, ["bin_creation"], dax = self.exdax, walltime = 480)
+                    self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {"copydb": {"file": type + str(q) + "_2.db", "transfer": False}}, arguments, ["bin_creation"], walltime = 300)
+                    self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {"copydb": {"file": type + str(q) + "_2.db", "transfer": False}}, arguments, ["bin_creation"], dax = self.exdax, walltime = 300)
                     binDep.append(type + "_extractBins" + str(q))
             aggIn["db"] = {"file": "img2.db", "transfer": False}
             self._addJob("sql_aggregate2", "ih-sql-aggregate", aggIn2, {"img3.db": {"file": "img3.db", "transfer": True}}, {"--db": "db", "--output": "img3.db", "--inputs": " ".join([aggIn[x]["file"] for x in aggIn if x != "db"])}, binDep, walltime = 180)
-- 
GitLab