"""
This file is part of Image Harvest.
Image Harvest is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Image Harvest is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Image Harvest.  If not, see <http://www.gnu.org/licenses/>.
"""

import os
import conf
import sys
import json
import re
import csv
import datetime
import sqlite3
import shutil
import errno
import textwrap
import copy
import ih.validator
import getpass
import xml.dom.minidom
from Pegasus.DAX3 import *


class Workflow(object):
    """
        Generic workflow creation class.
    """
    def __init__(self, jobhome, basename = None):
        """
            :param jobhome: The base directory for job submission.
            :type jobhome: str

            Creates a Workflow class based on the input directory.  Only loads and
            validates the config file by default.
        """
        self.err = ""
        self.jobhome = os.path.abspath(jobhome)
        self.jobname = os.path.basename(os.path.dirname(self.jobhome + "/"))
        self.basename = basename
        self.dax = ADAG(self.jobname)
        self.executables = {}
        self.files = {self.dax: {}}
        self.jobs = {self.dax: {}}
        self.deps = {}
        return
    def _loadInputFiles(self):
        """
            Loads the input files into the appropriate variables
            This should be overloaded.
        """
        return

    def _isFile(self, name, dax, imtype, inout):
        """
            Convenience function to check if a given file exists.
        """
        if name in self.files[dax][imtype][inout]:
            return True
        return False
    def _isJob(self, name, dax):
        """
            Convenience function to check if a given job exists.
        """
        if name in self.jobs[dax]:
            return True
        return False
    def _isExecutable(self, name):
        """
            Convenience function to check if a given executable exists.
        """
        if name in self.executables:
            return True
        return False
    def _createSetup(self):
        """
            Creates the base structure for job submission.  Everything is contained
            within a folder based on the current timestamp.
        """
        self.basepath = self.jobhome + "/" + self.basename if self.basename else self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
        if not os.path.exists(self.basepath):
            os.makedirs(self.basepath)
        if not os.path.exists(self.basepath + "/input"):
            os.makedirs(self.basepath + "/input")
        if not os.path.exists(self.basepath + "/output"):
            os.makedirs(self.basepath + "/output")
        if not os.path.exists(self.basepath + "input/templates"):
            os.makedirs(self.basepath + "/input/templates")
        if not os.path.exists(self.basepath + "/input/rawfiles"):
            os.makedirs(self.basepath + "/input/rawfiles")
        if "osg" in self.config:
            if not os.path.exists(self.basepath + "/staging"):
                os.makedirs(self.basepath + "/staging")
        return
    def _addFile(self, name, imtype, inout, path = None, dax = None, derivedPath = None):
        """
            Adds the given file to the dax, as well as to the internal variable self.files.
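            The optional derivedPath (an experiment/id/date/imtype/imgname prefix) is stored
            for input images so that later jobs can reconstruct derived file names.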
        """
        dax = dax if dax else self.dax
        if not self._isFile(name, dax, imtype, inout):
            self.files[dax][imtype][inout][name] = {"file": File(name), "path": ""}
            if inout == "input":
                self.files[dax][imtype][inout][name]["path"] = path if path else name
                self.files[dax][imtype][inout][name]["file"].addPFN(PFN("file://" + path.replace(" ","%20"), "local"))
                if derivedPath:
                    self.files[dax][imtype][inout][name]["derivedPath"] = derivedPath
                dax.addFile(self.files[dax][imtype][inout][name]["file"])
        return
    def _addJob(self, jobname, executable, inputs, outputs, arguments, dependencies = None, dax = None, label = None, walltime = None):
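        """
            Registers a job in the given dax (defaults to self.dax) if it does not already exist.
            All inputs and outputs are declared on the job, the argument map is flattened into an
            argument list (values that name an input or output are replaced with the matching File
            object), and optional dependencies, a Pegasus clustering label, and a GLOBUS maxwalltime
            profile are attached.  When running on OSG the job is wrapped by osg-wrapper.sh, the
            packaged ih tarball is added as an input, and the first argument points at the real
            script inside the ih-<version>/scripts/ directory shipped in that tarball.
        """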
        dax = dax if dax else self.dax
        if not self._isJob(jobname, dax):
            if "osg" in self.config:
                self.jobs[dax][jobname] = Job("osg-wrapper.sh")
            else:
                self.jobs[dax][jobname] = Job(executable)
            dax.addJob(self.jobs[dax][jobname])
            if "osg" in self.config:
                self.jobs[dax][jobname].uses("ih.tar.gz", link = Link.INPUT)
            for key in inputs:
                self.jobs[dax][jobname].uses(inputs[key]["file"], link = Link.INPUT)
            for key in outputs:
                self.jobs[dax][jobname].uses(outputs[key]["file"], link = Link.OUTPUT, transfer = outputs[key]["transfer"])
            arglist = []
            if "osg" in self.config:
                arglist.append("./ih-" + self.config["version"] + "/scripts/" + executable)
            for arg in arguments:
                arglist.append(arg)
                if str(arguments[arg]) in inputs:
                    arglist.append(inputs[str(arguments[arg])]["file"])
                elif str(arguments[arg]) in outputs:
                    arglist.append(outputs[str(arguments[arg])]["file"])
                else:
                    if "osg" in self.config:
                        arglist.append("'" + str(arguments[arg]) + "'")
                    else:
                        arglist.append(str(arguments[arg]))
            self.jobs[dax][jobname].addArguments(*arglist)
            if dependencies:
                for depend in dependencies:
                    dax.depends(child = self.jobs[dax][jobname], parent = self.jobs[dax][depend])
            if label:
                self.jobs[dax][jobname].profile(Namespace.PEGASUS, "label", label)
            if walltime:
                self.jobs[dax][jobname].profile(Namespace.GLOBUS, "maxwalltime", walltime)
        return
    def create(self):
        """
            Creates a new pegasus submission directory based on the current timestamp,
            and populates it with all required information to submit a pegasus job.
            This function should be overloaded.
        """
        return


class Statistics(Workflow):
    """
        A class specifically for statistics workflow
        validation and submission.
    """
    def __init__(self, jobhome, basename, validOnly = False):
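        """
            :param jobhome: The base directory for job submission.
            :type jobhome: str
            :param basename: The existing submission folder (under jobhome) whose output db is used.
            :type basename: str
            :param validOnly: If True, only validate the input files and exit.
            :type validOnly: bool
        """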
        super(Statistics, self).__init__(jobhome, basename)
        self.configfile = jobhome + "/input/" + conf.configFile
        self.statsfile = jobhome + "/input/" + conf.statsFile
        self.dbfile = jobhome + "/" + basename + "/output/" + conf.outputdb
        self.validOnly = validOnly
        self.rawFiles = {}
        self._loadInputFiles()
        return
    def _getImageTypes(self):
        return [x[0] for x in self.db.execute("select distinct imtype from images")]


    def _loadInputFiles(self):
        self.validator = ih.validator.Statistics(self.statsfile, self.configfile, self.dbfile)
        if not self.validator.isValid() or self.validOnly:
            self.validator.printErrors()
            sys.exit()
        else:
            self.stats = self.validator.workflow.data
            self.db = self.validator.db.conn
            self.config = self.validator.config.data
            self.rawFiles = self.validator.workflow.rawFiles
        return
    def _copyFiles(self):
        """
            Copies all input files to the correct location.
        """
        if not os.path.exists(self.basepath + "/input/stats"):
            os.makedirs(self.basepath + "/input/stats")
        shutil.copyfile(self.statsfile, self.basepath + "/input/templates/" + os.path.basename(self.statsfile))
        return
    def _loadFiles(self, loc):
        """
            Loads all files (input and output) into the dax and the self.files variable
        """
        self.files[self.dax] = {}
        self.files[self.dax]["raw"] = {"input": {}, "output": {}}
        self._addFile("output.db", "raw", "input", self.basepath + "/output/output.db")
        for type in self.stats["workflows"]:
            self.files[self.dax][type] = {"input": {}, "output": {}}
            for f in self.rawFiles[type]:
                shutil.copyfile(f, self.basepath + "/input/rawfiles/" + os.path.basename(f))
                self._addFile(os.path.basename(f), type, "input", self.basepath + "/input/rawfiles/" + os.path.basename(f))
        return
    def _loadExecutables(self, os="linux", arch="x86_64"):
        """
            Loads all executables (as specified in the conf) into
            the dax, as well as the internal self.executables variable
        """
        for ex in conf.valid:
            if "osg" in self.config:
                self.executables[ex] = Executable(name=ex, os=os, arch=arch, installed=False)
            else:
                self.executables[ex] = Executable(name=ex, os=os, arch=arch)
            self.executables[ex].addPFN(PFN("file://" + self.config["installdir"] + "/" + ex, "local"))
            #if "cluster" in self.config:
            #     self.executables[ex].addProfile(Profile(Namespace.PEGASUS, "clusters.size", self.config["cluster"]))
            self.dax.addExecutable(self.executables[ex])
        return
    def _loadNotify(self):
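        """
            If notification is configured, writes input/stats/notify.sh, which emails a
            pegasus-analyzer report to the configured address, and registers it to run
            when the workflow ends.
        """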
        if "notify" in self.config:
            self.dax.invoke(When.AT_END, self.basepath + "/input/stats/notify.sh")
            with open(self.basepath + "/input/stats/notify.sh", "w") as wh:
                notify = textwrap.dedent("""\
                        #!/bin/bash
                        %s/notification/email -t %s --report=pegasus-analyzer
                """ % (self.config["notify"]["pegasus_home"], self.config["notify"]["email"]))
                wh.write(notify)
            os.chmod(self.basepath + "/input/stats/notify.sh", 0755)
        return
    def _loadJobInputs(self, job, type, save = False):
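        """
            Builds the input file map for a statistics job.  Every job reads the shared
            output.db; raw files referenced by the job are added by their basename.
        """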
        inputs = {"output.db": {"file": "output.db", "transfer": True}}
        for i,input in enumerate(job["inputs"]):
            ftype = conf.valid[job["executable"]]["inputs"][i]
            if input in self.rawFiles[type]:
                inputs[input] = {"file": os.path.basename(input), "transfer": save}
        return inputs

    def _createDax(self, loc):
        """
            Loads all jobs into the dax, and then writes the dax
            to input/loc/stats.dax
        """
        serial = []
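        # serial carries the last job of the previously processed workflow so that jobs
        # without an explicit "depends" entry are chained serially across workflows.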
        if "maxwalltime" in self.config:
            if "stats" in self.config["maxwalltime"]:
                maxwalltime = self.config["maxwalltime"]["stats"]
            else:
                maxwalltime = None
        else:
            maxwalltime = None
        for type in self.stats["workflows"]:
            for job in self.stats["workflows"][type]:
                jobname = type + "_" + job["name"]
                job["arguments"]["--intable"] = type + "_" + job["arguments"]["--intable"] if job["arguments"]["--intable"] != "images" else job["arguments"]["--intable"]
                job["arguments"]["--outtable"] = type + "_" + job["arguments"]["--outtable"]
                job["arguments"]["--db"] = "output.db"
                inputs = self._loadJobInputs(job, type)
                outputs = {}
                depends = [type + "_" + depend for depend in job["depends"]] if "depends" in job else serial
                if self.stats["workflows"][type].index(job) == len(self.stats["workflows"][type]) - 1:
                    serial = [jobname]
                else:
                    serial = []
                self._addJob(jobname, job["executable"], inputs, outputs, job["arguments"], depends, walltime = maxwalltime)
        with open(self.basepath + "/" + loc + "/stats.dax", "w") as wh:
            self.dax.writeXML(wh)
        return
    def _createReplica(self, loc):
        """
            Creates the pegasus configuration replica catalog.  input/conf.rc
        """
        with open(self.basepath + "/" + loc + "/conf.rc", "w") as wh:
            pegasusrc = textwrap.dedent("""\
                        pegasus.catalog.site = XML
                        pegasus.catalog.site.file = %s/sites.xml
                        pegasus.condor.logs.symlink = false
                        pegasus.transfer.links = true
                        pegasus.data.configuration = %s
                        """ % (self.basepath + "/" + loc))
            wh.write(pegasusrc)
        return
    def _createSites(self, loc):
        """
            Creates the pegasus site catalog.  input/sites.xml
        """
        with open(self.basepath + "/" + loc + "/sites.xml", "w") as wh:
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
            if "osg" in self.config:
                sites = """\
                <sitecatalog version="3.0" xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-3.0.xsd">
                    <site handle="local" arch="x86_64" os="LINUX">
                            <head-fs>
                                <scratch>
                                        <shared>
                                            <file-server protocol="file" url="file://" mount-point="%s"/>
                                            <internal-mount-point mount-point="%s"/>
                                        </shared>
                                    </scratch>
                                <storage>
                                        <shared>
                                            <file-server protocol="file" url="file://" mount-point="%s"/>
                                            <internal-mount-point mount-point="%s"/>
                                        </shared>
                                    </storage>
                            </head-fs>
                    """ % (self.basepath + "/work/stats/", self.basepath + "/work/stats/", self.basepath + "/output/", self.basepath + "/output/", self.basepath)
                sites += """\
                    </site>
                    <site handle="condorpool" arch="x86_64" os="LINUX">
                            <head-fs>
                                    <scratch />
                                    <storage />
                            </head-fs>
                """
            else:
                sites = """\
                 <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
                            <site  handle="local" arch="x86_64" os="LINUX">
                                <directory type="shared-scratch" path="%s">
                                    <file-server operation="all" url="file://%s" />
                                </directory>
                                <directory type="local-storage" path="%s">
                                    <file-server operation="all" url="file://%s" />
                                </directory>
                            """ % (self.basepath + "/work/stats/", self.basepath + "/work/stats/", self.basepath + "/output/", self.basepath + "/output/")
            for namespace in self.config["profile"]:
                for key in self.config["profile"][namespace]:
                    val = ":".join(self.config["profile"][namespace][key]) if "path" in key.lower() else self.config["profile"][namespace][key]
                    sites += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (namespace, key, val)
            sites += "</site></sitecatalog>"
            sites = sites.replace("\n","")
            wh.write("\n".join([line for line in xml.dom.minidom.parseString(sites).toprettyxml().split('\n') if line.strip()]))
        return
    def _createSubmit(self, loc):
        """
            Creates the pegasus submit script.  submit.sh
        """
        with open(self.basepath + "/" + loc + "/submit.sh", "w") as wh:
            submit = textwrap.dedent("""\
                    #!/bin/bash
                    %s
                    plan=`pegasus-plan \\
                    --conf "%s" \\
                    --sites "%s" \\
                    --dir "%s" \\
                    --output-site local \\
                    --dax "%s" \\
                    --randomdir \\""" % ("module unload python/2.7" if "osg" in self.config else "", self.basepath + "/" + loc + "/conf.rc", "condorpool" if "osg" in self.config else "local", self.basepath + "/work/stats", self.basepath + "/" + loc + "/stats.dax"))
            if "cluster" in self.config:
                submit += """--cluster horizontal \\\n"""
            submit += textwrap.dedent("""\
                    --submit`
                    status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
                    echo -e "#!/bin/bash
                    pegasus-status -l $status" > status.sh
                    chmod 744 status.sh
                    remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
                    echo -e "#!/bin/bash
                    pegasus-remove $remove" > remove.sh
                    chmod 744 remove.sh
                    echo "$plan"
                    echo "Alternatively, you can use the status & remove scripts in the current directory!"
                    """)
            wh.write(submit)
            os.chmod(self.basepath + "/" + loc + "/submit.sh", 0755)
        return
    def create(self):
        """
            Creates a new pegasus submission directory based on the current timestamp,
            and populates it with all required information to submit a pegasus job.
        """
        loc = "/input/stats/"
        self._createSetup()
        self._copyFiles()
        self._loadFiles(loc)
        self._loadExecutables()
        self._loadNotify()
        self._createDax(loc)
        self._createReplica(loc)
        self._createSites(loc)
        self._createSubmit(loc)
        return





class ImageProcessor(Workflow):
    """
        A class specifically for image processing workflow
        validation and submission.
    """
    def __init__(self, jobhome, basename = None, validOnly = False):
        """
            :param jobhome: The base directory for job submission.
            :type jobhome: str
            Creates a Workflow class based on the input directory.  Loads and validates all input files,
            and exits if something doesn't exist or is defined inappropriately.
        """
        super(ImageProcessor, self).__init__(jobhome, basename)
        self.validOnly = validOnly
        self.workfile = jobhome + "/input/" + conf.workflowFile
        self.metafile = jobhome + "/input/" + conf.dbFile
        self.configfile = jobhome + "/input/" + conf.configFile
        self._loadInputFiles()
        self.exdax = ADAG("extract-" + self.jobname)
        self.jobs[self.exdax] = {}
        self.files[self.exdax] = {}
        return
    def _loadInputFiles(self):
        """
            Loads the input files into the appropriate variables
        """
        self.validator = ih.validator.ImageProcessor(self.workfile, self.configfile, self.metafile)
        if not self.validator.isValid() or self.validOnly:
            self.validator.printErrors()
            sys.exit()
        else:
            self.workflow = self.validator.workflow.data
            self.metadata = self.validator.db.conn
            self.config = self.validator.config.data
            self.rawFiles = self.validator.workflow.rawFiles
        return
    def _getImageTypes(self):
        """
            Convenience function to get all image types from the database.
        """
        return [x[0] for x in self.metadata.execute("select distinct imtype from images")]
    def _copyFiles(self):
        """
            Copies all input files to the correct location.
        """
        if not os.path.exists(self.basepath + "/input/imgproc"):
            os.makedirs(self.basepath + "/input/imgproc")
        shutil.copyfile(self.workfile, self.basepath + "/input/templates/" + os.path.basename(self.workfile))
        shutil.copyfile(self.configfile, self.basepath + "/input/templates/" + os.path.basename(self.configfile))
        shutil.copyfile(self.metafile, self.basepath + "/output/output.db")
        for type in self.rawFiles:
            for input in self.rawFiles[type]:
                shutil.copyfile(input, self.basepath + "/input/rawfiles/" + os.path.basename(input))
        if "osg" in self.config:
            shutil.copyfile(self.config["osg"]["tarball"], self.basepath + "/input/rawfiles/" + os.path.basename(self.config["osg"]["tarball"]))
        return
    def _loadFiles(self, loc):
        """
            Loads all files (input and output) into the dax and the self.files variable
        """
        with open(self.basepath + "/input/rawfiles/extract.json", "w") as wh:
            json.dump(self.workflow["extract"], wh)
        with open(self.basepath + "/" + loc + "/map.rc", "w") as wh:
            self.exInput = {"db": "img.db", "extract.json": "extract.json"}
            self.files[self.dax] = {}
            self.files[self.exdax] = {}
            self.files[self.exdax]["all"] = {"input": {}, "output": {}}
            self.files[self.dax]["raw"] = {"input": {}, "output": {}}
            self.files[self.exdax]["raw"] = {"input": {}, "output": {}}
            if "osg" in self.config:
                self._addFile("ih.tar.gz", "raw", "input", self.basepath + "/input/rawfiles/ih-" + self.config["version"] + ".tar.gz")
                self._addFile("ih.tar.gz", "raw", "input", self.basepath + "/input/rawfiles/ih-" + self.config["version"] + ".tar.gz", dax = self.exdax)
            self._addFile("img.db", "raw", "input", self.basepath + "/output/output.db")
            self._addFile("img.db", "all", "input", self.basepath + "/output/output.db", dax = self.exdax)
            self._addFile("img2.db", "raw", "output", self.basepath + "/output/img2.db")
            self._addFile("img2.db", "raw", "output", self.basepath + "/output/img2.db", dax = self.exdax)
            self._addFile("img3.db", "raw", "output", self.basepath + "/output/img3.db")
            self._addFile("img3.db", "raw", "output", self.basepath + "/output/img3.db", dax = self.exdax)
            self._addFile("img.log", "raw", "output", self.basepath + "/output/imgproc.log")
            self._addFile("extract.json", "raw", "input", self.basepath + "/input/rawfiles/extract.json")
            self._addFile("extract.json", "all", "input", self.basepath + "/input/rawfiles/extract.json", dax = self.exdax)
            for type in self.workflow["workflows"]:
                self.files[self.dax][type] = {"input": {}, "output": {}}
                for input in self.rawFiles[type]:
                    self._addFile(os.path.basename(input), type, "input", self.basepath + "/input/rawfiles/" + os.path.basename(input))
                wh.write("img.log file://" + self.basepath + "/output/imgproc.log pool=\"local\"\n")
                wh.write("img2.db file://" + self.basepath + "/output/img2.db pool=\"local\"\n")
                wh.write("img3.db file://" + self.basepath + "/output/img3.db pool=\"local\"\n")
                inputImages = [self.workflow["workflows"][type][0]["inputs"][i] for i,x in enumerate(conf.valid[self.workflow["workflows"][type][0]["executable"]]["inputs"]) if x == "image"]
                outputImages = [self.workflow["workflows"][type][z]["outputs"][i] for z in range(0, len(self.workflow["workflows"][type])) for i,x in enumerate(conf.valid[self.workflow["workflows"][type][z]["executable"]]["outputs"]) if x == "image"]
                outputFiles = dict((self.workflow["workflows"][type][z]["outputs"][i], conf.fileExtensions[conf.valid[self.workflow["workflows"][type][z]["executable"]]["outputs"][i]]) for z in range(0, len(self.workflow["workflows"][type])) for i,x in enumerate(conf.valid[self.workflow["workflows"][type][z]["executable"]]["outputs"]) if x != "image" and x != "none" and len(self.workflow["workflows"][type][z]["outputs"]) > i)
                for row in self.metadata.execute("select pegasusid, experiment, id, date, imgname, path from images where imtype=?", (type,)):
                    realname = row["path"].split("/")[-1].split(".")[0]
                    #derivedPath = row["experiment"].replace(" ","%20") + "/" + row["id"].replace(" ","%20") + "/" + row["date"].replace(" ","%20") + "/" + type + "/" + row["imgname"].replace(" ","%20") + "/"
                    derivedPath = row["experiment"].replace(" ","") + "/" + row["id"].replace(" ","") + "/" + row["date"].replace(" ","") + "/" + type + "/" + row["imgname"].replace(" ","") + "/"
                    for input in inputImages:
                        if "osg" in self.config:
                            self._addFile(row["pegasusid"] + "_" + input + "." + row["path"].split(".")[1], type, "input", row["path"], derivedPath = derivedPath + row["pegasusid"])
                        else:
                            self._addFile(derivedPath + row["pegasusid"] + "_" + input + "." + row["path"].split(".")[1], type, "input", row["path"], derivedPath = derivedPath + row["pegasusid"])
                    for output in outputImages:
                        if output in self.workflow["extract"]["workflows"][type]["inputs"]:
                            self.exInput[derivedPath + row["pegasusid"] + "_" + output + ".png"] = derivedPath + row["pegasusid"] + "_" + output + ".png"
                            self._addFile(derivedPath + row["pegasusid"] + "_" + output + ".png", "all", "input", self.basepath + "/output/" + derivedPath + row["pegasusid"] + "_" + output + ".png", dax = self.exdax)
                        self._addFile(derivedPath + row["pegasusid"] + "_" + output + ".png", type, "output")
                        wh.write(derivedPath + row["pegasusid"] + "_" + output + ".png file://" + self.basepath + "/output/" + derivedPath + row["pegasusid"].replace(" ","%20") + "_" + output + ".png pool=\"local\"\n")
                    for output in outputFiles:
                        outname = output + outputFiles[output]
                        self._addFile(derivedPath + row["pegasusid"] + "_" + outname, type, "output")
                        wh.write(derivedPath + row["pegasusid"] + "_" + outname + " file://" + self.basepath + "/output/" + derivedPath.replace("_","%20") + outname + " pool=\"local\"\n")
        return
    def _loadNotify(self):
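        """
            If notification is configured, writes input/imgproc/notify.sh, which emails a
            pegasus-analyzer report to the configured address, and registers it to run at
            the end of both the main and the extract-only workflows.
        """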
        if "notify" in self.config:
            self.dax.invoke(When.AT_END, self.basepath + "/input/imgproc/notify.sh")
            self.exdax.invoke(When.AT_END, self.basepath + "/input/imgproc/notify.sh")
            with open(self.basepath + "/input/imgproc/notify.sh", "w") as wh:
                notify = textwrap.dedent("""\
                        #!/bin/bash
                        %s/notification/email -t %s --report=pegasus-analyzer
                """ % (self.config["notify"]["pegasus_home"], self.config["notify"]["email"]))
                wh.write(notify)
            os.chmod(self.basepath + "/input/imgproc/notify.sh", 0755)
        return
    def _loadExecutables(self, os="linux", arch="x86_64"):
        """
            Loads all executables (as specified in the conf) into
            the dax, as well as the internal self.executables variable
        """
        for ex in conf.valid:
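            # On OSG the executables are marked installed = False so Pegasus stages them from
            # the local install directory; otherwise they are assumed present on the site.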
            if "osg" in self.config:
                self.executables[ex] = Executable(name=ex, os=os, arch=arch, installed = False)
            else:
                self.executables[ex] = Executable(name=ex, os=os, arch=arch)
            self.executables[ex].addPFN(PFN("file://" + self.config["installdir"] + "/" + ex, "local"))
            #if "cluster" in self.config:
            #    val = int(self.config["cluster"] * conf.valid[ex]["weight"]) if "weight" in conf.valid[ex] else self.config["cluster"]
            #    self.executables[ex].addProfile(Profile(Namespace.PEGASUS, "clusters.size", val))
            self.dax.addExecutable(self.executables[ex])
            self.exdax.addExecutable(self.executables[ex])
        return
    def _loadJobInputs(self, job, type, basename, extension, save = False):
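        """
            Builds the input file map for a single image-processing job.  Raw workflow files are
            referenced by basename; intermediate images use the derived per-image prefix, with the
            first job of a workflow keeping the original image extension.  On OSG the "base" input
            drops its derived directory prefix.
        """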
        inputs = {}
        for i,input in enumerate(job["inputs"]):
            ftype = conf.valid[job["executable"]]["inputs"][i]
            if input in self.rawFiles[type]:
                inputs[input] = {"file": os.path.basename(input), "transfer": save}
            else:
                ex = extension if job["name"] == self.workflow["workflows"][type][0]["name"] else conf.fileExtensions[ftype]
                if "osg" in self.config:
                    if input == "base":
                        inputs[input] = {"file": os.path.basename(basename) + "_" + input + ex, "transfer": save}
                    else:
                        inputs[input] = {"file": basename + "_" + input + ex, "transfer": save}
                else:
                    inputs[input] = {"file": basename + "_" + input + ex, "transfer": save}
        return inputs

    def _loadJobOutputs(self, job, type, basename, save):
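        """
            Builds the output file map for a job: every output that is not "none" gets the derived
            per-image prefix plus the extension for its declared type, with the given transfer flag.
        """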
        outputs = {}
        for i,output in enumerate(job["outputs"]):
            if output != "none":
                outputs[output] = {"file": basename + "_" + output + conf.fileExtensions[conf.valid[job["executable"]]["outputs"][i]], "transfer": save}
        return outputs
    #def _loadJobOutputs(self, job, type, basename):
    #    return dict((output, basename + "_" + output + conf.fileExtensions[conf.valid[job["executable"]]["outputs"][i]]) for i,output in enumerate(job["outputs"]) if output != "none")
    def _createDax(self, loc):
        """
            Loads all jobs into the dax, and then writes the dax
            to input/workflow.dax
        """
        exDep = {}
        exInput = {}
        clusternum = {}
        meancluster = {}
        excluster = {}
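        # Per-image-type counters: clusternum groups jobs into horizontal clusters of
        # config["cluster"] images, meancluster keeps the heavier ih-meanshift jobs in
        # smaller clusters, and excluster starts a new extract chunk roughly every 50 images,
        # with exDep/exInput collecting the jobs and files each extract chunk depends on.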
        if "maxwalltime" in self.config:
            if "images" in self.config["maxwalltime"]:
                maxwalltime = self.config["maxwalltime"]["images"]
            else:
                maxwalltime = None
        else:
            maxwalltime = None
        save = True if "save-steps" in self.workflow["options"] else False
        for type in self.workflow["workflows"]:
            exDep[type] = [[]]
            exInput[type] = [{}]
            jobnum = -1
            skipFirst = True
            clusternum[type] = 0
            meancluster[type] = 0
            excluster[type] = 0
            exNames = self.workflow["extract"]["workflows"][type]["depends"]
            for infile in [x for x in self.files[self.dax][type]["input"] if "." + x.split(".")[1] in conf.imageExtensions]:
                if "cluster" in self.config:
                    jobnum += 1
                    if jobnum == self.config["cluster"]:
                        jobnum = 0
                        clusternum[type] += 1
                    if ((clusternum[type] * 100 + jobnum) % int(self.config["cluster"] * 0.3)) == 0:
                        meancluster[type] += 1
                    if (jobnum % 50) == 0 and not skipFirst:
                        exDep[type].append([])
                        exInput[type].append({})
                        excluster[type] += 1
                    skipFirst = False
                extension = "." + infile.split(".")[1]
                realname = self.files[self.dax][type]["input"][infile]["path"].split("/")[-1].split(".")[0]
                derivedPath = self.files[self.dax][type]["input"][infile]["derivedPath"]
                for stepnum,job in enumerate(self.workflow["workflows"][type]):
                    jobname = derivedPath + "_" + job["name"]
                    inputs = self._loadJobInputs(job, type, derivedPath, extension)
                    if job["name"] in exNames:
                        outputs = self._loadJobOutputs(job, type, derivedPath, True)
                        exDep[type][excluster[type]].append(jobname)
                        reqFile = derivedPath + "_" + self.workflow["extract"]["workflows"][type]["inputs"][0] + ".png"
                        exInput[type][excluster[type]][reqFile] = {"file": reqFile, "transfer": save}
                        if "--dimfromroi" in self.workflow["extract"]["workflows"][type]["arguments"]:
                            if os.path.isfile(self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"]):
                                roiFile = os.path.basename(self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"])
                            else:
                                roiFile = derivedPath + "_" + self.workflow["extract"]["workflows"][type]["arguments"]["--dimfromroi"] + ".json"
                            exInput[type][excluster[type]][roiFile] = {"file": roiFile, "transfer": save}
                    else:
                        outputs = self._loadJobOutputs(job, type, derivedPath, save)
                    depends = [derivedPath + "_" + depend for depend in job["depends"]] if "depends" in job else []
                    if job["executable"] == "ih-meanshift":
                        self._addJob(jobname, job["executable"], inputs, outputs, job["arguments"], depends, label = type + "_step" + str(stepnum) + "_cluster" + str(meancluster[type]) if "cluster" in self.config else None, walltime = maxwalltime)
                    else:
                        self._addJob(jobname, job["executable"], inputs, outputs, job["arguments"], depends, label = type + "_step" + str(stepnum) + "_cluster" + str(clusternum[type]) if "cluster" in self.config else None, walltime = maxwalltime)


        maprc = open(self.basepath + "/" + loc + "/map.rc", "a")
        binDep = []
        aggIn = {}
        aggIn2 = {}
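        # Each extract chunk writes its own small db; the ih-sql-aggregate jobs below merge
        # them into img2.db (and, after histogram binning, into img3.db).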
        for type in self.workflow["workflows"]:
            for q in range(0, excluster[type] + 1):
                arguments = self.workflow["extract"]["workflows"][type]["arguments"]
                if "--input" in arguments:
                    del arguments["--input"]
                if "--dimfromroi" in arguments:
                    del arguments["--dimfromroi"]
                    arguments["--dimfromroi"] = " ".join([x for x in exInput[type][q].keys() if ".json" in x])
                if "histogram-bin" in self.workflow["extract"]:
                    if type in [imtype for key in self.workflow["extract"]["histogram-bin"]["--group"] for imtype in self.workflow["extract"]["histogram-bin"]["--group"][key]]:
                        arguments["--colors"] = ""
                arguments["--db"] = "db"
                arguments["--createdb"] = ""
                arguments["--inputs"] = " ".join([x for x in exInput[type][q].keys() if ".png" in x])
                self._addFile(type + str(q) + ".db", type, "output")
                self._addFile(type + str(q) + "_2.db", type, "output")
                self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, exDep[type][q], walltime = 180)
                self._addJob(type + "_extract" + str(q), "ih-extract-multi", exInput[type][q], {"db": {"file": type + str(q) + ".db", "transfer": False}}, arguments, [], dax = self.exdax, walltime = 180)
                maprc.write(type + str(q) + ".db" + " file://" + self.basepath + "/output/" + type + str(q) + ".db" + " pool=\"local\"\n")
                binDep.append(type + "_extract" + str(q))
                aggIn[type + str(q) + ".db"] = {"file": type + str(q) + ".db", "transfer": False}
                aggIn2[type + str(q) + "_2.db"] = {"file": type + str(q) + "_2.db", "transfer": False}
        aggIn["db"] = {"file": "img.db", "transfer": False}
        self._addJob("sql_aggregate1", "ih-sql-aggregate", aggIn, {"img2.db": {"file": "img2.db", "transfer": False if "histogram-bin" in self.workflow["extract"] else True}}, {"--db": "db", "--output": "img2.db", "--inputs": " ".join([aggIn[x]["file"] for x in aggIn if x != "db"])}, binDep, walltime = 180)
        self._addJob("sql_aggregate1", "ih-sql-aggregate", aggIn, {"img2.db": {"file": "img2.db", "transfer": False if "histogram-bin" in self.workflow["extract"] else True}}, {"--db": "db", "--output": "img2.db", "--inputs": " ".join([aggIn[x]["file"] for x in aggIn if x != "db"])}, binDep, dax = self.exdax, walltime = 180)

        if "histogram-bin" in self.workflow["extract"]:
            outputs = {}
            for name in self.workflow["extract"]["histogram-bin"]["--group"]:
                self._addFile(name + "_hist_bins.json", "raw", "output")
                maprc.write(name + "_hist_bins.json" + " file://" + self.basepath + "/output/" + name + "_hist_bins.json" + " pool=\"local\"\n")
                outputs[name + "_hist_bins.json"] = {"file": name + "_hist_bins.json", "transfer": True}
            self._addJob("bin_creation", "ih-stats-histogram-bin", {"db": {"file": "img2.db", "transfer": True}, "extract.json": {"file": "extract.json", "transfer": False}}, outputs, {"--db": "db", "--options": "extract.json", "--intable": "images", "--outtable": "histogramBins", "--jsonwrite": "", "--overwrite": ""}, ["sql_aggregate1"])
            self._addJob("bin_creation", "ih-stats-histogram-bin", {"db": {"file": "img2.db", "transfer": True}, "extract.json": {"file": "extract.json", "transfer": False}}, outputs, {"--db": "db", "--options": "extract.json", "--intable": "images", "--outtable": "histogramBins", "--jsonwrite": "", "--overwrite": ""}, ["sql_aggregate1"], dax = self.exdax)
            binDep = []
            map = {}
            for group in self.workflow["extract"]["histogram-bin"]["--group"]:
                for type in self.workflow["extract"]["histogram-bin"]["--group"][group]:
                    map[type] = group
            for type in self.workflow["workflows"]:
                for q in range(0, excluster[type] + 1):
                    arguments = {}
                    arguments["--db"] = "db"
                    arguments["--copydb"] = "copydb"
                    arguments["--inputs"] = " ".join(exInput[type][q].keys())
                    exInput[type][q]["db"] = {"file": type + str(q) + ".db", "transfer": False}
                    exInput[type][q]["binfile"] = {"file": map[type] + "_hist_bins.json", "transfer": False}
                    arguments["--bins"] = "binfile"
                    self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {"copydb": {"file": type + str(q) + "_2.db", "transfer": False}}, arguments, ["bin_creation"], walltime = 300)
                    self._addJob(type + "_extractBins" + str(q), "ih-extract-multi", exInput[type][q], {"copydb": {"file": type + str(q) + "_2.db", "transfer": False}}, arguments, ["bin_creation"], dax = self.exdax, walltime = 300)
                    binDep.append(type + "_extractBins" + str(q))
            aggIn2["db"] = {"file": "img2.db", "transfer": False}
            self._addJob("sql_aggregate2", "ih-sql-aggregate", aggIn2, {"img3.db": {"file": "img3.db", "transfer": True}}, {"--db": "db", "--output": "img3.db", "--inputs": " ".join([aggIn2[x]["file"] for x in aggIn2 if x != "db"])}, binDep, walltime = 180)
            self._addJob("sql_aggregate2", "ih-sql-aggregate", aggIn2, {"img3.db": {"file": "img3.db", "transfer": True}}, {"--db": "db", "--output": "img3.db", "--inputs": " ".join([aggIn2[x]["file"] for x in aggIn2 if x != "db"])}, binDep, dax = self.exdax, walltime = 180)
        z = 2 if "histogram-bin" in self.workflow["extract"] else 1
        indb = "img3.db" if "histogram-bin" in self.workflow["extract"] else "img.db"
        self._addJob("error-log", "ih-error-log", {"db": {"file": indb, "transfer": True}}, {"output": {"file": "img.log", "transfer": True}}, {"--db": "db", "--output": "output"}, ["sql_aggregate" + str(z)])
        with open(self.basepath + "/" + loc + "/workflow.dax", "w") as wh:
            self.dax.writeXML(wh)
        return
    def _createExtract(self, loc):
        """
            Creates the extraction step only dax!
        """
        #self._addJob("extractOnly", "ih-extract-all", self.exInput, {}, {"--db": "db", "--options": "extract.json"}, dax = self.exdax)
        with open(self.basepath + "/" + loc + "/extract.dax", "w") as wh:
            self.exdax.writeXML(wh)
        return
    def _createReplica(self, loc):
        """
            Creates the pegasus configuration replica catalog.  input/conf.rc
        """
        with open(self.basepath + "/" + loc + "/conf.rc", "w") as wh:
            pegasusrc = textwrap.dedent("""\
                        pegasus.catalog.site = XML
                        pegasus.catalog.site.file = %s/sites.xml
                        pegasus.condor.logs.symlink = false
                        pegasus.transfer.links = true
                        pegasus.data.configuration = %s
                        pegasus.dir.storage.mapper = Replica
                        pegasus.dir.storage.mapper.replica = File
                        pegasus.dir.storage.mapper.replica.file = %s/map.rc
                        """ % (self.basepath + "/" + loc, "nonsharedfs" if "osg" in self.config else "sharedfs", self.basepath + "/" + loc))
            if "osg" in self.config:
                pegasusrc += textwrap.dedent("""\
                    pegasus.stagein.clusters = 4
                    pegasus.stageout.clusters = 4
                    pegasus.transfer.threads = 4
                    pegasus.transfer.lite.threads = 4

                    dagman.maxjobs = 200
                """)
            wh.write(pegasusrc)
        return
    def _createSites(self, loc):
        """
            Creates the pegasus site catalog.  input/sites.xml
        """
        with open(self.basepath + "/" + loc + "/sites.xml", "w") as wh:
            if "osg" in self.config:
                userName = getpass.getuser()
                sites = """\
                <sitecatalog version="4.0" xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd">
                    <site handle="local" arch="x86_64" os="LINUX">
                        <directory type="shared-scratch" path="%s">
                            <file-server operation="all" url="file://%s" />
                        </directory>
                        <directory type="local-storage" path="%s">
                            <file-server operation="all" url="file://%s" />
                        </directory>
                        <profile namespace="pegasus" key="SSH_PRIVATE_KEY">%s</profile>
                    </site>
                    <site handle="stash" arch="x86_64" os="LINUX">
                        <directory type="shared-scratch" path="%s">
                            <!-- <file-server operation="get" url="stash://%s"/> -->
                            <file-server operation="all" url="scp://%s@login02.osgconnect.net/%s"/>
                        </directory>
                    </site>
                    <site handle="isi_workflow" arch="x86_64" os="LINUX">
                        <directory type="shared-scratch" path="/nfs/ccg4/scratch-purge-no-backups/workflow.isi.edu/scratch2/%s/scratch">
                            <file-server operation="get" url="http://workflow.isi.edu/scratch2/%s/scratch"/>
                            <file-server operation="put" url="scp://%s@workflow.isi.edu/nfs/ccg4/scratch-purge-no-backups/workflow.isi.edu/scratch2/%s/scratch"/>
                        </directory>
                    </site>
                    <site handle="condorpool" arch="x86_64" os="LINUX">


                    """ % (self.basepath + "/work/imgproc/", self.basepath + "/work/imgproc/", self.basepath + "/output/", self.basepath + "/output/", self.config["osg"]["ssh"], self.basepath + "/staging/", "/".join((self.basepath + "/staging/").split("/")[2:]), userName, self.basepath + "/staging/", userName, userName, userName, userName)
            else:
                sites = """\
                 <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
                            <site  handle="local" arch="x86_64" os="LINUX">
                                <directory type="shared-scratch" path="%s">
                                    <file-server operation="all" url="file://%s" />
                                </directory>
                                <directory type="local-storage" path="%s">
                                    <file-server operation="all" url="file://%s" />
                                </directory>
                            """ % (self.basepath + "/work/imgproc/", self.basepath + "/work/imgproc/", self.basepath + "/output/", self.basepath + "/output/")
            for namespace in self.config["profile"]:
                for key in self.config["profile"][namespace]:
                    val = ":".join(self.config["profile"][namespace][key]) if "path" in key.lower() else self.config["profile"][namespace][key]
                    sites += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (namespace, key, val)
            sites += "</site></sitecatalog>"
            sites = sites.replace("\n","")
            wh.write("\n".join([line for line in xml.dom.minidom.parseString(sites).toprettyxml().split('\n') if line.strip()]))
        return
    def _createSubmit(self, loc):
        """
            Creates the pegasus submit script.  submit.sh
        """
        with open(self.basepath + "/" + loc + "/submit.sh", "w") as wh:
            submit = textwrap.dedent("""\
                    #!/bin/bash
                    if [ "$1" = "--extract-only" ] || [ "$1" = "--extract" ] || [ "$1" = "-e" ]; then
                        DAX="%s"
                    else
                        DAX="%s"
                    fi
                    plan=`pegasus-plan \\
                    --conf "%s" \\
                    --sites "%s" \\
                    --dir "%s" \\
                    --output-site local \\
                    --dax "$DAX" \\
                    --randomdir \\
                    """ % (self.basepath + "/" + loc + "/extract.dax", self.basepath + "/" + loc + "/workflow.dax", self.basepath + "/" + loc + "/conf.rc", "condorpool" if "osg" in self.config else "local", self.basepath + "/work/imgproc"))
            if "cluster" in self.config:
                submit += """--cluster label \\\n"""
            if "osg" in self.config:
                submit += """--staging-site stash \\\n"""
                submit += """--staging isi_workflow \\\n"""
                submit += """--cleanup leaf \\\n"""
            submit += textwrap.dedent("""\
                    --submit`

                    status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
                    echo -e "#!/bin/bash
                    pegasus-status -l $status" > status.sh
                    chmod 744 status.sh

                    remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
                    echo -e "#!/bin/bash
                    pegasus-remove $remove" > remove.sh
                    chmod 744 remove.sh

                    echo "$plan"
                    echo "Alternatively, you can use the status & remove scripts in the current directory!"

                    """)
            wh.write(submit)
            os.chmod(self.basepath + "/" + loc + "/submit.sh", 0755)
        return

    def create(self):
        """
            Creates a new pegasus submission directory based on the current timestamp,
            and populates it with all required information to submit a pegasus job.
        """
        print "Generating workflow.  Please wait."
        loc = "/input/imgproc/"
        self._createSetup()
        self._copyFiles()
        self._loadFiles(loc)
        self._loadExecutables()
        self._loadNotify()
        self._createDax(loc)
        self._createExtract(loc)
        self._createReplica(loc)
        self._createSites(loc)
        self._createSubmit(loc)
        return


class ImageLoader:
    """
        Crawls an image directory tree described by a JSON template and loads
        the resulting image metadata into the input images.db sqlite database.
    """

    def __init__(self, template, output, validOnly = False, overwrite = False):
        """
            :param template: Path to input template file, should be valid json.
            :type template: str
            :param output: Path to the job directory; images.db and crawl.log live in its input folder.
            :type output: str
            :param validOnly: If True, only validate the template and exit.
            :type validOnly: bool
            :param overwrite: If True, overwrite the existing images table when writing.
            :type overwrite: bool
        """
        self.templatePath = os.path.abspath(template)
        self.err = ""
        self.output = output + "/input/"
        try:
            open(self.output + "/images.db")
        except IOError:
            self.err += "Path Error: Cannot open output db file. \n"
        try:
            self.log = open(self.output + "/crawl.log", "w")
        except IOError:
            self.err += "Path Error: Cannot open log file. \n"
        self.validator = ih.validator.ImageLoader(self.templatePath)
        if not self.validator.isValid() or validOnly:
            self.validator.printErrors()
            sys.exit()
        self.overwrite = overwrite
        self.template = self.validator.data
        self.template["order"].insert(0, "pegasusid")

        self.count = {}
        self.depth = len(self.template["path"].split("/")) - 1
        self.structure = self._dictTemplate()
        self.data = []
        return

    def _success(self):
        if self.overwrite:
            print "Data crawled!"
            print "Use ih-run to submit your jobs."
        else:
            print "Directory setup successful!"
            print "Define the required templates in " + self.output + "."
            print "Then use ih-run to submit your jobs."
        return

    def _dictTemplate(self):
        """
            Creates the first of two intermediary dictionaries for crawling.  This maps
            each identifier in the path to its depth, along with the operators needed to
            split out its value at that depth.
        """
        m = [x for x in re.findall(r"%(\w+)%", self.template["path"]) if x != "null"]
        sp = self.template["path"].split("/")
        d = {}
        for key in m:
            d[key] = {}
            d[key]["depth"] = [n for n,x in enumerate(sp) if key in x][0]
            d[key]["split"] = []
            val = sp[d[key]["depth"]]
            for operator in [" ", "_", "-", "."]:
                if operator in val:
                    b = val.split(operator)
                    x = [n for n,x in enumerate(b) if key in x][0]
                    d[key]["split"].append({"operator": operator, "index": x})
                    val = b[x]
        return d

    def _loadStructure(self, splitPath):
        """
            Creates the second of two intermediary dictionaries for crawling.
            This utilizes the template created in _dictTemplate, and creates
            a dictionary with actual values instead of structure.
        """
        d ={}
        for key in self.structure:
            val = splitPath[self.structure[key]["depth"]]
            for operator in self.structure[key]["split"]:
                val = val.split(operator["operator"])[operator["index"]]
            d[key] = val
        return d

    def _loadDataFile(self, dict, structure, key):
        """
            Loads data from a data file.  If the specified key isn't in the file,
            write 'UNKNOWN' instead.
        """
        keyVal = self._convertReferences(structure, self.template["data"][key]["key"])
        if keyVal in self.filedata[self.template["data"][key]["value"]]:
            dict[key] = self.filedata[self.template["data"][key]["value"]][keyVal]
        else:
            dict[key] = "UNKNOWN"
        return

    def _loadDataValue(self, dict, structure, key):
        """
            Loads a data value.
        """
        val = self._convertReferences(structure, self.template["data"][key]["value"])
        if "case" in self.template["data"][key]:
            if self.template["data"][key]["case"] == "lower":
                val = val.lower()
            elif self.template["data"][key]["case"] == "upper":
                val = val.upper()

        if "translate" in self.template["data"][key]:
            if val in self.template["translations"][key]:
                val = self.template["translations"][key][val]
        dict[key] = val

        return

    def _loadDataDate(self, dict, structure, key):
        """
            Loads a date.
        """
        val = self._convertReferences(structure, self.template["data"][key]["value"])
        format = "".join([x.replace(x, "%" + x) if x in conf.dateFormat else x for x in self.template["data"][key]["format"]])
        val = datetime.datetime.strptime(val, format).strftime("%Y-%m-%d")
        dict[key] = val
        return

    def _convertReferences(self, structure, val):
        """
            Replaces all references ('%identifier%') with their actual values.
        """
        idList = [x for x in re.findall(r"%(\w+)%", val) if x != "null"]
        for identifier in idList:
            if identifier in structure:
                val = val.replace("%" + identifier + "%", structure[identifier])
        return val

    def _loadData(self, path):
        """
            Loads all data for a particular path.
        """
        final = {}
        splitPath = path.split("/")
        structure = self._loadStructure(splitPath)
        for key in self.template["data"]:
            # Dispatch on the entry's "type" to the appropriate loader, which
            # writes its result into final.
            {
                "value": self._loadDataValue,
                "file": self._loadDataFile,
                "date": self._loadDataDate
            }[self.template["data"][key]["type"]](final, structure, key)
        final["path"] = path
        if final["imtype"] not in self.count:
            self.count[final["imtype"]] = 1
        else:
            self.count[final["imtype"]] += 1
        final["pegasusid"] = final["imtype"] + str(self.count[final["imtype"]])
        return final

    def _loadFiles(self):
        """
            Reads all data from all specified files.
        """
        self.filedata = {}
        for key in self.template["data"]:
            if self.template["data"][key]["type"] == "file":
                if self.template["data"][key]["value"] not in self.filedata:
                    self.filedata[self.template["data"][key]["value"]] = {}
                    with open(self.template["data"][key]["value"], "r") as rh:
                        for line in rh.readlines():
                            info = line.strip().split(",")
                            self.filedata[self.template["data"][key]["value"]][info[self.template["data"][key]["keyColumn"]]] = info[self.template["data"][key]["valueColumn"]]
        return

    def crawl(self):
        """
            Recursively walks through directories in base, and loads
            the appropriate data.
        """
        self._loadFiles()
        for root, dirs, files in os.walk(self.template["base"]):
            arr = root.split("/")
            depth = len(arr)
            if (depth == self.depth):
                for f in files:
                    if f[-len(self.template["filetype"]):] == self.template["filetype"] and f[0] != ".":
                        try:
                            d = self._loadData(root + "/" + f)
                            self.data.append(d)
                        except Exception as e:
                            self.log.write("Could not load file from path: '%s/%s'\n" % (root, f))
        return

    def write(self):
        """
            Writes the data loaded from crawl into the images table of the sqlite
            database (images.db), with columns based on 'order'.
        """
        if self.data:
            conn = sqlite3.connect(self.output + "/images.db")
            tablestr = "(pegasusid PRIMARY KEY"
            for x in self.template["order"]:
                if x != "pegasusid":
                    tablestr += "," + str(x)
            tablestr += ")"
            if self.overwrite:
                conn.execute("DROP TABLE IF EXISTS images")
                conn.commit()
            conn.execute("CREATE TABLE images " + tablestr)
            writedata = []
            for row in self.data:
                writedata.append(tuple([str(row[x]) for x in self.template["order"]]))
            query = "insert into images " + str(tuple([str(x) for x in self.template["order"]])) + " values (" + ("?," * len(self.template["order"]))[:-1] + ")"
            conn.executemany(query, writedata)
            conn.commit()
            if not self.overwrite:
                shutil.copyfile(self.templatePath, self.output + "/crawl.json")
        else:
            print "Call crawl first!"
        return

class DirectorySetup:
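    # Example usage (hypothetical path):
    #   DirectorySetup("/path/to/newjob").setup()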

    def __init__(self, jobhome):
        """
            :param jobhome: The base directory to setup a job.
            :type jobhome: str
        """
        if not os.path.isdir(os.path.dirname(os.path.dirname(os.path.abspath(jobhome) + "/"))):
            print "Can't create job folder, path doesn't exist!"
            sys.exit()
        self.jobhome = jobhome
        return

    def _makeDir(self, dirpath):
        """
            Makes a directory if it doesn't exist.  Catches and
            prints out common errors.
        """
        try:
            os.makedirs(dirpath)
        except OSError as e:
            raise Exception({
                errno.EEXIST: "Directory " + dirpath + " already exists!",
                errno.EACCES: "You don't have permission to create directory "  + dirpath + ".",
            }.get(e.errno, "Error for directory " + dirpath + " error:" + str(e)))
        return

    def _makeFile(self, filepath):
        """
            Opens a file to create it, uses append mode
            so it doesn't overwrite any existing files.
        """
        try:
            open(filepath, "a")
        except OSError as e:
            raise Exception({

            }.get(e, "Unsepcified error for file " + filepath))
        return

    def setup(self):
        """
            Creates the directory structure needed to submit jobs,
            and creates empty template files needed for job submission.
        """
        self._makeDir(self.jobhome)
        self._makeDir(self.jobhome + "/input/")
        for fname in conf.jobFiles:
            self._makeFile(self.jobhome + "/input/" + fname)
        return