"""
This file is part of Image Harvest.
Image Harvest is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Image Harvest is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Image Harvest. If not, see <http://www.gnu.org/licenses/>.
"""
import os
import conf
import sys
import json
import re
import csv
import datetime
import sqlite3
import shutil
import errno
import textwrap
import copy
import ih.validator
import getpass
import xml.dom.minidom
from Pegasus.DAX3 import *
class Workflow(object):
"""
Generic workflow creation class.
"""
def __init__(self, jobhome, basename = None):
"""
:param jobhome: The base directory for job submission.
:type jobhome: str
Creates a Workflow class based on the input directory. Only loads and
validates the config file by default.
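Typically instantiated through a subclass, for example (path and basename
illustrative)::

    stats = Statistics("/path/to/myjob", "run1")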
"""
self.err = ""
self.jobhome = os.path.abspath(jobhome)
self.jobname = os.path.basename(os.path.dirname(self.jobhome + "/"))
self.basename = basename
self.dax = ADAG(self.jobname)
self.executables = {}
self.files = {self.dax: {}}
self.jobs = {self.dax: {}}
self.deps = {}
return
def _loadInputFiles(self):
"""
Loads the input files into the appropriate variables.
This should be overloaded.
"""
return
def _isFile(self, name, dax, imtype, inout):
"""
Convenience function to check if a given file exists.
"""
if name in self.files[dax][imtype][inout]:
return True
return False
def _isJob(self, name, dax):
"""
Convenience function to check if a given job exists.
"""
if name in self.jobs[dax]:
return True
return False
def _isExecutable(self, name):
"""
Convenience function to check if a given executable exists.
"""
if name in self.executables:
return True
return False
def _createSetup(self):
"""
Creates the base structure for job submission. Everything is contained
within a folder based on the current timestamp.
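The resulting layout (timestamp illustrative) is::

    <jobhome>/2015-06-01_120000/
        input/
            templates/
            rawfiles/
        output/
        staging/        (OSG configurations only)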
"""
self.basepath = self.jobhome + "/" + self.basename if self.basename else self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
if not os.path.exists(self.basepath):
os.makedirs(self.basepath)
if not os.path.exists(self.basepath + "/input"):
os.makedirs(self.basepath + "/input")
if not os.path.exists(self.basepath + "/output"):
os.makedirs(self.basepath + "/output")
if not os.path.exists(self.basepath + "input/templates"):
os.makedirs(self.basepath + "/input/templates")
if not os.path.exists(self.basepath + "/input/rawfiles"):
os.makedirs(self.basepath + "/input/rawfiles")
if "osg" in self.config:
if not os.path.exists(self.basepath + "/staging"):
os.makedirs(self.basepath + "/staging")
return
def _addFile(self, name, imtype, inout, path = None, dax = None, derivedPath = None):
"""
Adds the given file to the dax, as well as to the internal variable self.files.
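Example (names and paths illustrative)::

    self._addFile("plant1_base.png", "rgb", "input", "/path/to/plant1.png")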
"""
dax = dax if dax else self.dax
if not self._isFile(name, dax, imtype, inout):
self.files[dax][imtype][inout][name] = {"file": File(name), "path": ""}
if inout == "input":
self.files[dax][imtype][inout][name]["path"] = path if path else name
self.files[dax][imtype][inout][name]["file"].addPFN(PFN("file://" + path.replace(" ","%20"), "local"))
if derivedPath:
self.files[dax][imtype][inout][name]["derivedPath"] = derivedPath
dax.addFile(self.files[dax][imtype][inout][name]["file"])
return
def _addJob(self, jobname, executable, inputs, outputs, arguments, dependencies = None, dax = None, label = None, walltime = None):
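"""
Adds a job to the dax, registering its input and output files, building
its argument list, and wiring up dependencies. Optional Pegasus 'label'
and Globus 'maxwalltime' profiles are attached when given. For OSG
configurations every job is wrapped with osg-wrapper.sh and the ih
tarball is added as an input.
"""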
dax = dax if dax else self.dax
if not self._isJob(jobname, dax):
if "osg" in self.config:
self.jobs[dax][jobname] = Job("osg-wrapper.sh")
else:
self.jobs[dax][jobname] = Job(executable)
dax.addJob(self.jobs[dax][jobname])
if "osg" in self.config:
self.jobs[dax][jobname].uses("ih.tar.gz", link = Link.INPUT)
for key in inputs:
self.jobs[dax][jobname].uses(inputs[key]["file"], link = Link.INPUT)
for key in outputs:
self.jobs[dax][jobname].uses(outputs[key]["file"], link = Link.OUTPUT, transfer = outputs[key]["transfer"])
arglist = []
if "osg" in self.config:
arglist.append("./ih-" + self.config["version"] + "/scripts/" + executable)
for arg in arguments:
arglist.append(arg)
if str(arguments[arg]) in inputs:
arglist.append(inputs[str(arguments[arg])]["file"])
elif str(arguments[arg]) in outputs:
arglist.append(outputs[str(arguments[arg])]["file"])
else:
arglist.append(str(arguments[arg]))
self.jobs[dax][jobname].addArguments(*arglist)
if dependencies:
for depend in dependencies:
dax.depends(child = self.jobs[dax][jobname], parent = self.jobs[dax][depend])
if label:
self.jobs[dax][jobname].profile(Namespace.PEGASUS, "label", label)
if walltime:
self.jobs[dax][jobname].profile(Namespace.GLOBUS, "maxwalltime", walltime)
return
def create(self):
"""
Creates a new pegasus submission directory based on the current timestamp,
and populates it with all required information to submit a pegasus job.
This function should be overloaded.
"""
return
class Statistics(Workflow):
"""
A class specifically for statistics workflow
validation and submission.
"""
def __init__(self, jobhome, basename, validOnly = False):
super(Statistics, self).__init__(jobhome, basename)
self.configfile = jobhome + "/input/" + conf.configFile
self.statsfile = jobhome + "/input/" + conf.statsFile
self.dbfile = jobhome + "/" + basename + "/output/" + conf.outputdb
self.validOnly = validOnly
self.rawFiles = {}
self._loadInputFiles()
return
def _getImageTypes(self):
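"""
Convenience function to get all distinct image types from the output database.
"""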
return [x[0] for x in self.db.execute("select distinct imtype from images")]
def _loadInputFiles(self):
self.validator = ih.validator.Statistics(self.statsfile, self.configfile, self.dbfile)
if not self.validator.isValid() or self.validOnly:
self.validator.printErrors()
sys.exit()
else:
self.stats = self.validator.workflow.data
self.db = self.validator.db.conn
self.config = self.validator.config.data
self.rawFiles = self.validator.workflow.rawFiles
return
def _copyFiles(self):
"""
Copies all input files to the correct location.
"""
if not os.path.exists(self.basepath + "/input/stats"):
os.makedirs(self.basepath + "/input/stats")
shutil.copyfile(self.statsfile, self.basepath + "/input/templates/" + os.path.basename(self.statsfile))
return
def _loadFiles(self, loc):
"""
Loads all files (input and output) into the dax and the self.files variable
"""
self.files[self.dax] = {}
self.files[self.dax]["raw"] = {"input": {}, "output": {}}
self._addFile("output.db", "raw", "input", self.basepath + "/output/output.db")
for type in self.stats["workflows"]:
self.files[self.dax][type] = {"input": {}, "output": {}}
for f in self.rawFiles[type]:
shutil.copyfile(f, self.basepath + "/input/rawfiles/" + os.path.basename(f))
self._addFile(os.path.basename(f), type, "input", self.basepath + "/input/rawfiles/" + os.path.basename(f))
return
def _loadExecutables(self, os="linux", arch="x86_64"):
"""
Loads all executables (as specified in the conf) into
the dax, as well as the internal self.executables variable
"""
for ex in conf.valid:
if "osg" in self.config:
self.executables[ex] = Executable(name=ex, os=os, arch=arch, installed=False)
else:
self.executables[ex] = Executable(name=ex, os=os, arch=arch)
self.executables[ex].addPFN(PFN("file://" + self.config["installdir"] + "/" + ex, "local"))
#if "cluster" in self.config:
# self.executables[ex].addProfile(Profile(Namespace.PEGASUS, "clusters.size", self.config["cluster"]))
self.dax.addExecutable(self.executables[ex])
return
def _loadNotify(self):
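"""
If 'notify' is present in the config, writes a notify.sh script that
emails a pegasus-analyzer report when the workflow finishes, and
registers it with the dax.
"""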
if "notify" in self.config:
self.dax.invoke(When.AT_END, self.basepath + "/input/stats/notify.sh")
with open(self.basepath + "/input/stats/notify.sh", "w") as wh:
notify = textwrap.dedent("""\
#!/bin/bash
%s/notification/email -t %s --report=pegasus-analyzer
""" % (self.config["notify"]["pegasus_home"], self.config["notify"]["email"]))
wh.write(notify)
os.chmod(self.basepath + "/input/stats/notify.sh", 0755)
return
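# NOTE: this definition of _loadJobInputs is shadowed by the redefinition
# below and is never called; it also references self.workflow, which the
# Statistics class does not define (it uses self.stats).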
def _loadJobInputs(self, job, type, basename, extension, save = False):
inputs = {}
for i,input in enumerate(job["inputs"]):
ftype = conf.valid[job["executable"]]["inputs"][i]
if input in self.rawFiles[type]:
inputs[input] = {"file": os.path.basename(input), "transfer": save}
else:
ex = extension if job["name"] == self.workflow["workflows"][type][0]["name"] else conf.fileExtensions[ftype]
inputs[input] = {"file": basename + "_" + input + ex, "transfer": save}
return inputs
def _loadJobInputs(self, job, type, save = False):
inputs = {"output.db": {"file": "output.db", "transfer": True}}
for i,input in enumerate(job["inputs"]):
ftype = conf.valid[job["executable"]]["inputs"][i]
if input in self.rawFiles[type]:
inputs[input] = {"file": os.path.basename(input), "transfer": save}
return inputs
def _createDax(self, loc):
"""
Loads all jobs into the dax, and then writes the dax
to input/stats/stats.dax
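Jobs with an explicit 'depends' entry depend on those jobs; jobs without
one may fall back on the final job of the previously processed workflow
type, loosely serializing the workflow types.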
"""
serial = []
if "maxwalltime" in self.config:
if "stats" in self.config["maxwalltime"]:
maxwalltime = self.config["maxwalltime"]["stats"]
else:
maxwalltime = None
else:
maxwalltime = None
for type in self.stats["workflows"]:
for job in self.stats["workflows"][type]:
jobname = type + "_" + job["name"]
job["arguments"]["--intable"] = type + "_" + job["arguments"]["--intable"] if job["arguments"]["--intable"] != "images" else job["arguments"]["--intable"]
job["arguments"]["--outtable"] = type + "_" + job["arguments"]["--outtable"]
job["arguments"]["--db"] = "output.db"
inputs = self._loadJobInputs(job, type)
outputs = {}
depends = [type + "_" + depend for depend in job["depends"]] if "depends" in job else serial
if self.stats["workflows"][type].index(job) == len(self.stats["workflows"][type]) - 1:
serial = [jobname]
else:
serial = []
self._addJob(jobname, job["executable"], inputs, outputs, job["arguments"], depends, walltime = maxwalltime)
with open(self.basepath + "/" + loc + "/stats.dax", "w") as wh:
self.dax.writeXML(wh)
return
def _createReplica(self, loc):
"""
Creates the pegasus properties file, input/stats/conf.rc.
"""
with open(self.basepath + "/" + loc + "/conf.rc", "w") as wh:
pegasusrc = textwrap.dedent("""\
pegasus.catalog.site = XML
pegasus.catalog.site.file = %s/sites.xml
pegasus.condor.logs.symlink = false
pegasus.transfer.links = true
pegasus.data.configuration = %s
""" % (self.basepath + "/" + loc))
wh.write(pegasusrc)
return
def _createSites(self, loc):
"""
Creates the pegasus site catalog. input/sites.xml
"""
with open(self.basepath + "/" + loc + "/sites.xml", "w") as wh:
if "osg" in self.config:
sites = """\
<sitecatalog version="3.0" xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-3.0.xsd">
<site handle="local" arch="x86_64" os="LINUX">
<head-fs>
<scratch>
<shared>
<file-server protocol="file" url="file://" mount-point="%s"/>
<internal-mount-point mount-point="%s"/>
</shared>
</scratch>
<storage>
<shared>
<file-server protocol="file" url="file://" mount-point="%s"/>
<internal-mount-point mount-point="%s"/>
</shared>
</storage>
</head-fs>
""" % (self.basepath + "/work/stats/", self.basepath + "/work/stats/", self.basepath + "/output/", self.basepath + "/output/", self.basepath)
sites += """\
</site>
<site handle="condorpool" arch="x86_64" os="LINUX">
<head-fs>
<scratch />
<storage />
</head-fs>
"""
else:
sites = """\
<sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
<site handle="local" arch="x86_64" os="LINUX">
<directory type="shared-scratch" path="%s">
<file-server operation="all" url="file://%s" />
</directory>
<directory type="local-storage" path="%s">
<file-server operation="all" url="file://%s" />
</directory>
""" % (self.basepath + "/work/stats/", self.basepath + "/work/stats/", self.basepath + "/output/", self.basepath + "/output/")
for namespace in self.config["profile"]:
for key in self.config["profile"][namespace]:
val = ":".join(self.config["profile"][namespace][key]) if "path" in key.lower() else self.config["profile"][namespace][key]
sites += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (namespace, key, val)
sites += "</site></sitecatalog>"
sites = sites.replace("\n","")
wh.write("\n".join([line for line in xml.dom.minidom.parseString(sites).toprettyxml().split('\n') if line.strip()]))
return
def _createSubmit(self, loc):
"""
Creates the pegasus submit script. submit.sh
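The generated script runs pegasus-plan with the catalogs created above,
then parses the plan output to also write status.sh and remove.sh helper
scripts.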
"""
with open(self.basepath + "/" + loc + "/submit.sh", "w") as wh:
submit = textwrap.dedent("""\
#!/bin/bash
plan=`pegasus-plan \\
--conf "%s" \\
--sites "%s" \\
--dir "%s" \\
--output-site local \\
--dax "%s" \\
--randomdir \\""" % (self.basepath + "/" + loc + "/conf.rc", "condorpool" if "osg" in self.config else "local", self.basepath + "/work/stats", self.basepath + "/" + loc + "/stats.dax"))
if "cluster" in self.config:
submit += """--cluster horizontal \\\n"""
submit += textwrap.dedent("""\
--submit`
status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
echo -e "#!/bin/bash
pegasus-status -l $status" > status.sh
chmod 744 status.sh
remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
echo -e "#!/bin/bash
pegasus-remove $remove" > remove.sh
chmod 744 remove.sh
echo "$plan"
echo "Alternatively, you can use the status & remove scripts in the current directory!"
""")
wh.write(submit)
os.chmod(self.basepath + "/" + loc + "/submit.sh", 0755)
return
def create(self):
"""
Creates a new pegasus submission directory based on the current timestamp,
and populates it with all required information to submit a pegasus job.
"""
loc = "/input/stats/"
self._createSetup()
self._copyFiles()
self._loadFiles(loc)
self._loadExecutables()
self._loadNotify()
self._createDax(loc)
self._createReplica(loc)
self._createSites(loc)
self._createSubmit(loc)
return
class ImageProcessor(Workflow):
"""
A class specifically for image processing workflow
validation and submission.
"""
def __init__(self, jobhome, basename = None, validOnly = False):
"""
:param jobhome: The base directory for job submission.
:type jobhome: str
Creates a Workflow class based on the input directory. Loads and validates all input files,
and exits if something doesn't exist or is defined inappropriately.
"""
super(ImageProcessor, self).__init__(jobhome, basename)
self.validOnly = validOnly
self.workfile = jobhome + "/input/" + conf.workflowFile
self.metafile = jobhome + "/input/" + conf.dbFile
self.configfile = jobhome + "/input/" + conf.configFile
self._loadInputFiles()
self.exdax = ADAG("extract-" + self.jobname)
self.jobs[self.exdax] = {}
self.files[self.exdax] = {}
return
def _loadInputFiles(self):
"""
Loads the input files into the appropriate variables
"""
self.validator = ih.validator.ImageProcessor(self.workfile, self.configfile, self.metafile)
if not self.validator.isValid() or self.validOnly:
self.validator.printErrors()
sys.exit()
else:
self.workflow = self.validator.workflow.data
self.metadata = self.validator.db.conn
self.config = self.validator.config.data
self.rawFiles = self.validator.workflow.rawFiles
return
def _getImageTypes(self):
"""
Convenience function to get all image types from the database.
"""
return [x[0] for x in self.metadata.execute("select distinct imtype from images")]
def _copyFiles(self):
"""
Copies all input files to the correct location.
"""
if not os.path.exists(self.basepath + "/input/imgproc"):
os.makedirs(self.basepath + "/input/imgproc")
shutil.copyfile(self.workfile, self.basepath + "/input/templates/" + os.path.basename(self.workfile))
shutil.copyfile(self.configfile, self.basepath + "/input/templates/" + os.path.basename(self.configfile))
shutil.copyfile(self.metafile, self.basepath + "/output/output.db")
for type in self.rawFiles:
for input in self.rawFiles[type]:
shutil.copyfile(input, self.basepath + "/input/rawfiles/" + os.path.basename(input))
if "osg" in self.config:
shutil.copyfile(self.config["osg"]["tarball"], self.basepath + "/input/rawfiles/" + os.path.basename(self.config["osg"]["tarball"]))
return
def _loadFiles(self, loc):
"""
Loads all files (input and output) into the dax and the self.files variable
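Also writes extract.json and the map.rc output mapper, registers the
metadata database as the logical input img.db and the aggregated
databases img2.db / img3.db as outputs, and adds one input image (plus
its derived output files) per matching row in the images table.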
"""
with open(self.basepath + "/input/rawfiles/extract.json", "w") as wh:
json.dump(self.workflow["extract"], wh)
with open(self.basepath + "/" + loc + "/map.rc", "w") as wh:
self.exInput = {"db": "img.db", "extract.json": "extract.json"}
self.files[self.dax] = {}
self.files[self.exdax] = {}
self.files[self.exdax]["all"] = {"input": {}, "output": {}}
self.files[self.dax]["raw"] = {"input": {}, "output": {}}
self.files[self.exdax]["raw"] = {"input": {}, "output": {}}
if "osg" in self.config:
self._addFile("ih.tar.gz", "raw", "input", self.basepath + "/input/rawfiles/ih-" + self.config["version"] + ".tar.gz")
self._addFile("ih.tar.gz", "raw", "input", self.basepath + "/input/rawfiles/ih-" + self.config["version"] + ".tar.gz", dax = self.exdax)
self._addFile("img.db", "raw", "input", self.basepath + "/output/output.db")
self._addFile("img.db", "all", "input", self.basepath + "/output/output.db", dax = self.exdax)
self._addFile("img2.db", "raw", "output", self.basepath + "/output/img2.db")
self._addFile("img2.db", "raw", "output", self.basepath + "/output/img2.db", dax = self.exdax)
self._addFile("img3.db", "raw", "output", self.basepath + "/output/img3.db")
self._addFile("img3.db", "raw", "output", self.basepath + "/output/img3.db", dax = self.exdax)
self._addFile("img.log", "raw", "output", self.basepath + "/output/imgproc.log")
self._addFile("extract.json", "raw", "input", self.basepath + "/input/rawfiles/extract.json")
self._addFile("extract.json", "all", "input", self.basepath + "/input/rawfiles/extract.json", dax = self.exdax)
for type in self.workflow["workflows"]:
self.files[self.dax][type] = {"input": {}, "output": {}}
for input in self.rawFiles[type]:
self._addFile(os.path.basename(input), type, "input", self.basepath + "/input/rawfiles/" + os.path.basename(input))
wh.write("img.log file://" + self.basepath + "/output/imgproc.log pool=\"local\"\n")
wh.write("img3.db file://" + self.basepath + "/output/img3.db pool=\"local\"\n")
inputImages = [self.workflow["workflows"][type][0]["inputs"][i] for i,x in enumerate(conf.valid[self.workflow["workflows"][type][0]["executable"]]["inputs"]) if x == "image"]
outputImages = [self.workflow["workflows"][type][z]["outputs"][i] for z in range(0, len(self.workflow["workflows"][type])) for i,x in enumerate(conf.valid[self.workflow["workflows"][type][z]["executable"]]["outputs"]) if x == "image"]
outputFiles = dict((self.workflow["workflows"][type][z]["outputs"][i], conf.fileExtensions[conf.valid[self.workflow["workflows"][type][z]["executable"]]["outputs"][i]]) for z in range(0, len(self.workflow["workflows"][type])) for i,x in enumerate(conf.valid[self.workflow["workflows"][type][z]["executable"]]["outputs"]) if x != "image" and x != "none" and len(self.workflow["workflows"][type][z]["outputs"]) > i)
for row in self.metadata.execute("select pegasusid, experiment, id, date, imgname, path from images where imtype=?", (type,)):
realname = row["path"].split("/")[-1].split(".")[0]
#derivedPath = row["experiment"].replace(" ","%20") + "/" + row["id"].replace(" ","%20") + "/" + row["date"].replace(" ","%20") + "/" + type + "/" + row["imgname"].replace(" ","%20") + "/"
derivedPath = row["experiment"].replace(" ","") + "/" + row["id"].replace(" ","") + "/" + row["date"].replace(" ","") + "/" + type + "/" + row["imgname"].replace(" ","") + "/"
for input in inputImages:
if "osg" in self.config:
self._addFile(row["pegasusid"] + "_" + input + "." + row["path"].split(".")[1], type, "input", row["path"], derivedPath = derivedPath + row["pegasusid"])
else:
self._addFile(derivedPath + row["pegasusid"] + "_" + input + "." + row["path"].split(".")[1], type, "input", row["path"], derivedPath = derivedPath + row["pegasusid"])
for output in outputImages:
if output in self.workflow["extract"]["workflows"][type]["inputs"]:
self.exInput[derivedPath + row["pegasusid"] + "_" + output + ".png"] = derivedPath + row["pegasusid"] + "_" + output + ".png"
self._addFile(derivedPath + row["pegasusid"] + "_" + output + ".png", "all", "input", self.basepath + "/output/" + derivedPath + row["pegasusid"] + "_" + output + ".png", dax = self.exdax)
self._addFile(derivedPath + row["pegasusid"] + "_" + output + ".png", type, "output")
wh.write(derivedPath + row["pegasusid"] + "_" + output + ".png file://" + self.basepath + "/output/" + derivedPath + row["pegasusid"].replace(" ","%20") + "_" + output + ".png pool=\"local\"\n")
for output in outputFiles:
outname = output + outputFiles[output]
self._addFile(derivedPath + row["pegasusid"] + "_" + outname, type, "output")
wh.write(derivedPath + row["pegasusid"] + "_" + outname + " file://" + self.basepath + "/output/" + derivedPath.replace("_","%20") + outname + " pool=\"local\"\n")
return
def _loadNotify(self):
if "notify" in self.config:
self.dax.invoke(When.AT_END, self.basepath + "/input/imgproc/notify.sh")
self.exdax.invoke(When.AT_END, self.basepath + "/input/imgproc/notify.sh")
with open(self.basepath + "/input/imgproc/notify.sh", "w") as wh:
notify = textwrap.dedent("""\
#!/bin/bash
%s/notification/email -t %s --report=pegasus-analyzer
""" % (self.config["notify"]["pegasus_home"], self.config["notify"]["email"]))
wh.write(notify)
os.chmod(self.basepath + "/input/imgproc/notify.sh", 0755)
return
def _loadExecutables(self, os="linux", arch="x86_64"):
"""
Loads all executables (as specified in the conf) into
the dax, as well as the internal self.executables variable
"""
for ex in conf.valid:
if "osg" in self.config:
self.executables[ex] = Executable(name=ex, os=os, arch=arch, installed = False)
else:
self.executables[ex] = Executable(name=ex, os=os, arch=arch)
self.executables[ex].addPFN(PFN("file://" + self.config["installdir"] + "/" + ex, "local"))
#if "cluster" in self.config:
# val = int(self.config["cluster"] * conf.valid[ex]["weight"]) if "weight" in conf.valid[ex] else self.config["cluster"]
# self.executables[ex].addProfile(Profile(Namespace.PEGASUS, "clusters.size", val))
self.dax.addExecutable(self.executables[ex])
self.exdax.addExecutable(self.executables[ex])
return
def _loadJobInputs(self, job, type, basename, extension, save = False):
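"""
Builds the {logical name: {"file": ..., "transfer": ...}} mapping for a
job's inputs. Raw workflow files are used directly by basename; other
inputs are derived from the image's basename, the input name, and the
appropriate file extension.
"""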
inputs = {}
for i,input in enumerate(job["inputs"]):
ftype = conf.valid[job["executable"]]["inputs"][i]
if input in self.rawFiles[type]:
inputs[input] = {"file": os.path.basename(input), "transfer": save}
else:
ex = extension if job["name"] == self.workflow["workflows"][type][0]["name"] else conf.fileExtensions[ftype]
if "osg" in self.config:
if input == "base":
inputs[input] = {"file": os.path.basename(basename) + "_" + input + ex, "transfer": save}
else:
inputs[input] = {"file": basename + "_" + input + ex, "transfer": save}
else:
inputs[input] = {"file": basename + "_" + input + ex, "transfer": save}
return inputs
def _loadJobOutputs(self, job, type, basename, save):
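"""
Builds the {logical name: {"file": ..., "transfer": ...}} mapping for a
job's outputs, skipping outputs declared as "none".
"""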
outputs = {}
for i,output in enumerate(job["outputs"]):
if output != "none":
outputs[output] = {"file": basename + "_" + output + conf.fileExtensions[conf.valid[job["executable"]]["outputs"][i]], "transfer": save}
return outputs
#def _loadJobOutputs(self, job, type, basename):
# return dict((output, basename + "_" + output + conf.fileExtensions[conf.valid[job["executable"]]["outputs"][i]]) for i,output in enumerate(job["outputs"]) if output != "none")
def _createDax(self, loc):
"""
Loads all jobs into the dax, and then writes the dax
to input/imgproc/workflow.dax
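The generated graph consists of the per-image processing jobs (optionally
grouped into label clusters), per-cluster ih-extract-multi jobs, an
ih-sql-aggregate job that merges the per-cluster databases, an optional
histogram-binning pass followed by a second aggregation, and a final
ih-error-log job.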
"""
exDep = {}
exInput = {}
clusternum = {}
meancluster = {}
if "maxwalltime" in self.config:
if "images" in self.config["maxwalltime"]:
maxwalltime = self.config["maxwalltime"]["images"]
else:
maxwalltime = None
else:
maxwalltime = None
save = "save-steps" in self.workflow["options"]
for type in self.workflow["workflows"]:
exDep[type] = [[]]
exInput[type] = [{}]
jobnum = -1
clusternum[type] = 0
meancluster[type] = 0
exNames = self.workflow["extract"]["workflows"][type]["depends"]
for infile in [x for x in self.files[self.dax][type]["input"] if "." + x.split(".")[1] in conf.imageExtensions]:
if "cluster" in self.config:
jobnum += 1
if jobnum == self.config["cluster"]:
jobnum = 0
clusternum[type] += 1
exDep[type].append([])
exInput[type].append({})
elif ((clusternum[type] * 100 + jobnum) % int(self.config["cluster"] * 0.3)) == 0:
meancluster[type] += 1
extension = "." + infile.split(".")[1]
realname = self.files[self.dax][type]["input"][infile]["path"].split("/")[-1].split(".")[0]
derivedPath = self.files[self.dax][type]["input"][infile]["derivedPath"]
for stepnum,job in enumerate(self.workflow["workflows"][type]):
jobname = derivedPath + "_" + job["name"]
inputs = self._loadJobInputs(job, type, derivedPath, extension)
if job["name"] in exNames:
outputs = self._loadJobOutputs(job, type, derivedPath, True)
exDep[type][clusternum[type]].append(jobname)
reqFile = derivedPath + "_" + self.workflow["extract"]["workflows"][type]["inputs"][0] + ".png"
exInput[type][clusternum[type]][reqFile] = {"file": reqFile, "transfer": save}
if "--dimfromroi" in self.workflow["extract"]["workflows"][type]["arguments"]:
if os.path.isfile(self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"]):
roiFile = os.path.basename(self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"])
else:
roiFile = derivedPath + "_" + self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"] + ".json"
exInput[type][clusternum[type]][roiFile] = {"file": roiFile, "transfer": save}
else:
outputs = self._loadJobOutputs(job, type, derivedPath, save)
depends = [derivedPath + "_" + depend for depend in job["depends"]] if "depends" in job else []
if job["executable"] == "ih-meanshift":
self._addJob(jobname, job["executable"], inputs, outputs, job["arguments"], depends, label = type + "_step" + str(stepnum) + "_cluster" + str(meancluster[type]) if "cluster" in self.config else None, walltime = maxwalltime)
else:
self._addJob(jobname, job["executable"], inputs, outputs, job["arguments"], depends, label = type + "_step" + str(stepnum) + "_cluster" + str(clusternum[type]) if "cluster" in self.config else None, walltime = maxwalltime)
maprc = open(self.basepath + "/" + loc + "/map.rc", "a")
binDep = []
aggIn = {}
for type in self.workflow["workflows"]:
for q in range(0, clusternum[type] + 1):
arguments = self.workflow["extract"]["workflows"][type]["arguments"]
if "--input" in arguments:
del arguments["--input"]
if "--dimfromroi" in arguments:
del arguments["--dimfromroi"]
arguments["--dimfromroi"] = " ".join([x for x in exInput[type][q].keys() if ".json" in x])
if "histogram-bin" in self.workflow["extract"]:
if type in [imtype for key in self.workflow["extract"]["histogram-bin"]["--group"] for imtype in self.workflow["extract"]["histogram-bin"]["--group"][key]]:
arguments["--colors"] = ""
arguments["--db"] = "db"
arguments["--createdb"] = ""
arguments["--inputs"] = " ".join([x for x in exInput[type][q].keys() if ".png" in x])
self._addFile(type + str(q) + ".db", type, "output")
self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, exDep[type][q], walltime = 240)
self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, [], dax = self.exdax, walltime = 240)
maprc.write(type + str(q) + ".db" + " file://" + self.basepath + "/output/" + type + str(q) + ".db" + " pool=\"local\"\n")
binDep.append(type + "_extract" + str(q))
aggIn[type + str(q) + ".db"] = {"file": type + str(q) + ".db", "transfer": False}
aggIn["db"] = {"file": "img.db", "transfer": False}
self._addJob("sql_aggregate1", "ih-sql-aggregate", aggIn, {"img2.db": {"file": "img2.db", "transfer": False}}, {"--db": "db", "--output": "img2.db", "--inputs": " ".join([aggIn[x]["file"] for x in aggIn if x != "db"])}, binDep)
self._addJob("sql_aggregate1", "ih-sql-aggregate", aggIn, {"img2.db": {"file": "img2.db", "transfer": False}}, {"--db": "db", "--output": "img2.db", "--inputs": " ".join([aggIn[x]["file"] for x in aggIn if x != "db"])}, binDep, dax = self.exdax)
if "histogram-bin" in self.workflow["extract"]:
outputs = {}
for name in self.workflow["extract"]["histogram-bin"]["--group"]:
self._addFile(name + "_hist_bins.json", "raw", "output")
maprc.write(name + "_hist_bins.json" + " file://" + self.basepath + "/output/" + name + "_hist_bins.json" + " pool=\"local\"\n")
outputs[name + "_hist_bins.json"] = {"file": name + "_hist_bins.json", "transfer": True}
self._addJob("bin_creation", "ih-stats-histogram-bin", {"db": {"file": "img2.db", "transfer": True}, "extract.json": {"file": "extract.json", "transfer": False}}, outputs, {"--db": "db", "--options": "extract.json", "--intable": "images", "--outtable": "histogramBins", "--jsonwrite": "", "--overwrite": ""}, ["sql_aggregate1"])
self._addJob("bin_creation", "ih-stats-histogram-bin", {"db": {"file": "img2.db", "transfer": True}, "extract.json": {"file": "extract.json", "transfer": False}}, outputs, {"--db": "db", "--options": "extract.json", "--intable": "images", "--outtable": "histogramBins", "--jsonwrite": "", "--overwrite": ""}, ["sql_aggregate1"], dax = self.exdax)
binDep = []
map = {}
for group in self.workflow["extract"]["histogram-bin"]["--group"]:
for type in self.workflow["extract"]["histogram-bin"]["--group"][group]:
map[type] = group
for type in self.workflow["workflows"]:
for q in range(0, clusternum[type] + 1):
arguments = {}
arguments["--db"] = "db"
arguments["--inputs"] = " ".join(exInput[type][q].keys())
exInput[type][q]["db"] = {"file": type + str(q) + ".db", "transfer": False}
exInput[type][q]["binfile"] = {"file": map[type] + "_hist_bins.json", "transfer": False}
arguments["--bins"] = "binfile"
self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {}, arguments, ["bin_creation"], walltime = 240)
self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {}, arguments, ["bin_creation"], dax = self.exdax, walltime = 240)
binDep.append(type + "_extractBins" + str(q))
aggIn["db"] = {"file": "img2.db", "transfer": False}
self._addJob("sql_aggregate2", "ih-sql-aggregate", aggIn, {"img3.db": {"file": "img3.db", "transfer": True}}, {"--db": "db", "--output": "img3.db", "--inputs": " ".join([aggIn[x]["file"] for x in aggIn if x != "db"])}, binDep)
self._addJob("sql_aggregate2", "ih-sql-aggregate", aggIn, {"img3.db": {"file": "img3.db", "transfer": True}}, {"--db": "db", "--output": "img3.db", "--inputs": " ".join([aggIn[x]["file"] for x in aggIn if x != "db"])}, binDep, dax = self.exdax)
z = 2 if "histogram-bin" in self.workflow["extract"] else 1
indb = "img3.db" if "histogram-bin" in self.workflow["extract"] else "img.db"
self._addJob("error-log", "ih-error-log", {"db": {"file": indb, "transfer": True}}, {"output": {"file": "img.log", "transfer": True}}, {"--db": "db", "--output": "output"}, ["sql_aggregate" + str(z)])
with open(self.basepath + "/" + loc + "/workflow.dax", "w") as wh:
self.dax.writeXML(wh)
return
def _createExtract(self, loc):
"""
Creates the extraction-only dax.
"""
#self._addJob("extractOnly", "ih-extract-all", self.exInput, {}, {"--db": "db", "--options": "extract.json"}, dax = self.exdax)
with open(self.basepath + "/" + loc + "/extract.dax", "w") as wh:
self.exdax.writeXML(wh)
return
def _createReplica(self, loc):
"""
Creates the pegasus properties file, input/imgproc/conf.rc, including the replica mapper configuration.
"""
with open(self.basepath + "/" + loc + "/conf.rc", "w") as wh:
pegasusrc = textwrap.dedent("""\
pegasus.catalog.site = XML
pegasus.catalog.site.file = %s/sites.xml
pegasus.condor.logs.symlink = false
pegasus.transfer.links = true
pegasus.data.configuration = %s
pegasus.dir.storage.mapper = Replica
pegasus.dir.storage.mapper.replica = File
pegasus.dir.storage.mapper.replica.file = %s/map.rc
""" % (self.basepath + "/" + loc, "nonsharedfs" if "osg" in self.config else "sharedfs", self.basepath + "/" + loc))
if "osg" in self.config:
pegasusrc += textwrap.dedent("""\
pegasus.stagein.clusters = 4
pegasus.stageout.clusters = 4
pegasus.transfer.threads = 4
pegasus.transfer.lite.threads = 4
""")
wh.write(pegasusrc)
return
def _createSites(self, loc):
"""
Creates the pegasus site catalog. input/sites.xml
"""
with open(self.basepath + "/" + loc + "/sites.xml", "w") as wh:
if "osg" in self.config:
userName = getpass.getuser()
sites = """\
<sitecatalog version="4.0" xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd">
<site handle="local" arch="x86_64" os="LINUX">
<directory type="shared-scratch" path="%s">
<file-server operation="all" url="file://%s" />
</directory>
<directory type="local-storage" path="%s">
<file-server operation="all" url="file://%s" />
</directory>
<profile namespace="pegasus" key="SSH_PRIVATE_KEY">%s</profile>
</site>
<site handle="stash" arch="x86_64" os="LINUX">
<directory type="shared-scratch" path="%s">
<!-- <file-server operation="get" url="stash://%s"/> -->
<file-server operation="all" url="scp://%s@login02.osgconnect.net/%s"/>
</directory>
</site>
<site handle="isi_workflow" arch="x86_64" os="LINUX">
<directory type="shared-scratch" path="/nfs/ccg4/scratch-purge-no-backups/workflow.isi.edu/scratch2/%s/scratch">
<file-server operation="get" url="http://workflow.isi.edu/scratch2/%s/scratch"/>
<file-server operation="put" url="scp://%s@workflow.isi.edu/nfs/ccg4/scratch-purge-no-backups/workflow.isi.edu/scratch2/%s/scratch"/>
</directory>
</site>
<site handle="condorpool" arch="x86_64" os="LINUX">
""" % (self.basepath + "/work/imgproc/", self.basepath + "/work/imgproc/", self.basepath + "/output/", self.basepath + "/output/", self.config["osg"]["ssh"], self.basepath + "/staging/", "/".join((self.basepath + "/staging/").split("/")[2:]), userName, self.basepath + "/staging/", userName, userName, userName, userName)
else:
sites = """\
<sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
<site handle="local" arch="x86_64" os="LINUX">
<directory type="shared-scratch" path="%s">
<file-server operation="all" url="file://%s" />
</directory>
<directory type="local-storage" path="%s">
<file-server operation="all" url="file://%s" />
</directory>
""" % (self.basepath + "/work/imgproc/", self.basepath + "/work/imgproc/", self.basepath + "/output/", self.basepath + "/output/")
for namespace in self.config["profile"]:
for key in self.config["profile"][namespace]:
val = ":".join(self.config["profile"][namespace][key]) if "path" in key.lower() else self.config["profile"][namespace][key]
sites += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (namespace, key, val)
sites += "</site></sitecatalog>"
sites = sites.replace("\n","")
wh.write("\n".join([line for line in xml.dom.minidom.parseString(sites).toprettyxml().split('\n') if line.strip()]))
return
def _createSubmit(self, loc):
"""
Creates the pegasus submit script. submit.sh
"""
with open(self.basepath + "/" + loc + "/submit.sh", "w") as wh:
submit = textwrap.dedent("""\
#!/bin/bash
if [ "$1" = "--extract-only" ] || [ "$1" = "--extract" ] || [ "$1" = "-e" ]; then
DAX="%s"
else
DAX="%s"
fi
plan=`pegasus-plan \\
--conf "%s" \\
--sites "%s" \\
--dir "%s" \\
--output-site local \\
--dax "$DAX" \\
--randomdir \\
""" % (self.basepath + "/" + loc + "/extract.dax", self.basepath + "/" + loc + "/workflow.dax", self.basepath + "/" + loc + "/conf.rc", "condorpool" if "osg" in self.config else "local", self.basepath + "/work/imgproc"))
if "cluster" in self.config:
submit += """--cluster label \\\n"""
if "osg" in self.config:
submit += """--staging-site stash \\\n"""
submit += """--staging isi_workflow \\\n"""
submit += """--cleanup leaf \\\n"""
submit += textwrap.dedent("""\
--submit`
status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
echo -e "#!/bin/bash
pegasus-status -l $status" > status.sh
chmod 744 status.sh
remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
echo -e "#!/bin/bash
pegasus-remove $remove" > remove.sh
chmod 744 remove.sh
echo "$plan"
echo "Alternatively, you can use the status & remove scripts in the current directory!"
""")
wh.write(submit)
os.chmod(self.basepath + "/" + loc + "/submit.sh", 0755)
return
def create(self):
"""
Creates a new pegasus submission directory based on the current timestamp,
and populates it with all required information to submit a pegasus job.
"""
print "Generating workflow. Please wait."
loc = "/input/imgproc/"
self._createSetup()
self._copyFiles()
self._loadFiles(loc)
self._loadExecutables()
self._loadNotify()
self._createDax(loc)
self._createExtract(loc)
self._createReplica(loc)
self._createSites(loc)
self._createSubmit(loc)
return
class ImageLoader:
"""
This should handle all meta-data definitions
"""
def __init__(self, template, output, validOnly = False, overwrite = False):
"""
:param template: Path to input template file, should be valid json.
:type template: str
"""
self.templatePath = os.path.abspath(template)
self.err = ""
self.output = output + "/input/"
try:
open(self.output + "/images.db")
except IOError:
self.err += "Path Error: Cannot open output db file. \n"
try:
self.log = open(self.output + "/crawl.log", "w")
except IOError:
self.err += "Path Error: Cannot open log file. \n"
self.validator = ih.validator.ImageLoader(self.templatePath)
if not self.validator.isValid() or validOnly:
self.validator.printErrors()
sys.exit()
self.overwrite = overwrite
self.template = self.validator.data
self.template["order"].insert(0, "pegasusid")
self.count = {}
self.depth = len(self.template["path"].split("/")) - 1
self.structure = self._dictTemplate()
self.data = []
return
def _success(self):
if self.overwrite:
print "Data crawled!"
print "Use ih-run to submit your jobs."
else:
print "Directory setup successful!"
print "Define the required templates in " + self.output + "."
print "Then use ih-run to submit your jobs."
return
def _dictTemplate(self):
"""
Creates the first of two intermediary dictionaries for crawling. It maps
each identifier in the path template to its depth, along with the
operators needed to split out its value at that depth.
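For example (template and values illustrative), a path like
"/data/%experiment%/%id%_%date%/%imgname%" records the depth of each
identifier and the " ", "_", or "-" splits needed to isolate its value.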
"""
m = [x for x in re.findall(r"%(\w+)%", self.template["path"]) if x != "null"]
sp = self.template["path"].split("/")
d = {}
for key in m:
d[key] = {}
d[key]["depth"] = [n for n,x in enumerate(sp) if key in x][0]
d[key]["split"] = []
val = sp[d[key]["depth"]]
for operator in [" ", "_", "-"]:
if operator in val:
b = val.split(operator)
x = [n for n,x in enumerate(b) if key in x][0]
d[key]["split"].append({"operator": operator, "index": x})
val = b[x]
return d
def _loadStructure(self, splitPath):
"""
Creates the second of two intermediary dictionaries for crawling.
This utilizes the template created in _dictTemplate, and creates
a dictionary with actual values instead of structure.
"""
d ={}
for key in self.structure:
val = splitPath[self.structure[key]["depth"]]
for operator in self.structure[key]["split"]:
val = val.split(operator["operator"])[operator["index"]]
d[key] = val
return d
def _loadDataFile(self, dict, structure, key):
"""
Loads data from a data file. If the specified key isn't in the file,
write 'UNKNOWN' instead.
"""
keyVal = self._convertReferences(structure, self.template["data"][key]["key"])
if keyVal in self.filedata[self.template["data"][key]["value"]]:
dict[key] = self.filedata[self.template["data"][key]["value"]][keyVal]
else:
dict[key] = "UNKNOWN"
return
def _loadDataValue(self, dict, structure, key):
"""
Loads a data value.
"""
val = self._convertReferences(structure, self.template["data"][key]["value"])
if "case" in self.template["data"][key]:
if self.template["data"][key]["case"] == "lower":
val = val.lower()
elif self.template["data"][key]["case"] == "upper":
val = val.upper()
if "translate" in self.template["data"][key]:
if val in self.template["translations"][key]:
val = self.template["translations"][key][val]
dict[key] = val
return
def _loadDataDate(self, dict, structure, key):
"""
Loads a date.
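Characters of the template's 'format' string that appear in
conf.dateFormat are treated as strptime codes (e.g. an illustrative
"Y-m-d" becomes "%Y-%m-%d"), and the value is normalized to YYYY-MM-DD.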
"""
val = self._convertReferences(structure, self.template["data"][key]["value"])
format = "".join([x.replace(x, "%" + x) if x in conf.dateFormat else x for x in self.template["data"][key]["format"]])
val = datetime.datetime.strptime(val, format).strftime("%Y-%m-%d")
dict[key] = val
return
def _convertReferences(self, structure, val):
"""
Replaces all references ('%identifier%') with actual values.
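For example (values illustrative), "%experiment%_%id%" becomes
"exp1_plant5" when structure = {"experiment": "exp1", "id": "plant5"}.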
"""
idList = [x for x in re.findall(r"%(\w+)%", val) if x != "null"]
for identifier in idList:
if identifier in structure:
val = val.replace("%" + identifier + "%", structure[identifier])
return val
def _loadData(self, path):
"""
Loads all data for a particular path.
"""
temp = {}
final = {}
splitPath = path.split("/")
structure = self._loadStructure(splitPath)
for key in self.template["data"]:
d = {
"value": self._loadDataValue,
"file": self._loadDataFile,
"date": self._loadDataDate
}[self.template["data"][key]["type"]](final, structure, key)
final["path"] = path
if final["imtype"] not in self.count:
self.count[final["imtype"]] = 1
else:
self.count[final["imtype"]] += 1
final["pegasusid"] = final["imtype"] + str(self.count[final["imtype"]])
return final
def _loadFiles(self):
"""
Reads all data from all specified files.
"""
self.filedata = {}
for key in self.template["data"]:
if self.template["data"][key]["type"] == "file":
if self.template["data"][key]["value"] not in self.filedata:
self.filedata[self.template["data"][key]["value"]] = {}
with open(self.template["data"][key]["value"], "r") as rh:
for line in rh.readlines():
info = line.strip().split(",")
self.filedata[self.template["data"][key]["value"]][info[self.template["data"][key]["keyColumn"]]] = info[self.template["data"][key]["valueColumn"]]
return
def crawl(self):
"""
Recursively walks through directories in base, and loads
the appropriate data.
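Only files at the depth implied by the path template, matching the
configured filetype and not starting with ".", are loaded; paths that
fail to load are written to crawl.log.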
"""
self._loadFiles()
for root, dirs, files in os.walk(self.template["base"]):
arr = root.split("/")
depth = len(arr)
if (depth == self.depth):
for f in files:
if f[-len(self.template["filetype"]):] == self.template["filetype"] and f[0] != ".":
try:
d = self._loadData(root + "/" + f)
self.data.append(d)
except Exception as e:
self.log.write("Could not load file from path: '%s/%s'\n" % (root, f))
return
def write(self):
"""
Writes the data loaded from crawl into the images table of the sqlite database, with columns based on 'order'.
"""
if self.data:
conn = sqlite3.connect(self.output + "/images.db")
tablestr = "(pegasusid PRIMARY KEY"
for x in self.template["order"]:
if x != "pegasusid":
tablestr += "," + str(x)
tablestr += ")"
if self.overwrite:
conn.execute("DROP TABLE IF EXISTS images")
conn.commit()
conn.execute("CREATE TABLE images " + tablestr)
writedata = []
for row in self.data:
writedata.append(tuple([str(row[x]) for x in self.template["order"]]))
query = "insert into images " + str(tuple([str(x) for x in self.template["order"]])) + " values (" + ("?," * len(self.template["order"]))[:-1] + ")"
conn.executemany(query, writedata)
conn.commit()
if not self.overwrite:
shutil.copyfile(self.templatePath, self.output + "/crawl.json")
else:
print "Call crawl first!"
return
class DirectorySetup:
def __init__(self, jobhome):
"""
:param jobhome: The base directory to setup a job.
:type jobhome: str
"""
if not os.path.isdir(os.path.dirname(os.path.dirname(os.path.abspath(jobhome) + "/"))):
print "Can't create job folder, path doesn't exist!"
sys.exit()
self.jobhome = jobhome
return
def _makeDir(self, dirpath):
"""
Makes a directory if it doesn't exist. Catches common
errors and re-raises them with a descriptive message.
"""
try:
os.makedirs(dirpath)
except OSError as e:
raise Exception({
errno.EEXIST: "Directory " + dirpath + " already exists!",
errno.EACCES: "You don't have permission to create directory " + dirpath + ".",
}.get(e.errno, "Error for directory " + dirpath + " error:" + str(e)))
return
def _makeFile(self, filepath):
"""
Opens a file to create it, uses append mode
so it doesn't overwrite any existing files.
"""
try:
open(filepath, "a")
except (IOError, OSError) as e:
raise Exception({
}.get(e.errno, "Unspecified error for file " + filepath))
return
def setup(self):
"""
Creates the directory structure needed to submit jobs,
and creates empty template files needed for job submission.
"""
self._makeDir(self.jobhome)
self._makeDir(self.jobhome + "/input/")
for fname in conf.jobFiles:
self._makeFile(self.jobhome + "/input/" + fname)
return