# workflow.py
import os
import argparse
import sys
import json
import datetime
import textwrap
import xml.dom.minidom
from Pegasus.DAX3 import *

class Workflow(object):
    """
        Builds a Pegasus DAX (through Pegasus.DAX3) together with the helper
        files that are generated next to it.

        Everything is written below ``self.basepath``:

        * input/encode.dax  -- the workflow itself (see write)
        * input/conf.rc     -- pegasus properties (see _createReplica)
        * input/sites.xml   -- site catalog (see _createSites)
        * input/map.rc      -- output file mapping (see _addFile)
        * input/notify.sh   -- end-of-workflow notification (see _loadNotify)
        * input/submit.sh   -- plan & submit script (see _createSubmit)

        Files and jobs are tracked per dax, so that the optional ``dax``
        argument of _addFile / _addJob can target a dax other than the
        internal one:

        * self.files[dax]["input" | "output"][name] -> {"file": File, "path": str}
        * self.jobs[dax][jobname] -> Job
    """

    def __init__(self, jobhome):
        """
            :param jobhome: The base directory for job submission.
            :type jobhome: str

            Creates a Workflow rooted at the given directory.  Only in-memory
            state is initialised here, nothing is written to disk.
        """
        self.err = ""
        self.jobhome = os.path.abspath(jobhome)
        # abspath() already strips a trailing slash, so the last path
        # component is the job name.
        self.jobname = os.path.basename(self.jobhome)
        # NOTE(review): every generator method reads self.basepath, but it
        # was never assigned.  Using the job home itself is an assumption --
        # confirm whether a per-run sub-directory was intended.
        self.basepath = self.jobhome
        # Handle of input/map.rc; opened on first use by _writeMapEntry.
        self.map = None
        self.dax = ADAG(self.jobname)
        self.executables = {}
        # Both registries are keyed by dax first, matching how _addFile and
        # _addJob index them.
        self.files = {self.dax: {"input": {}, "output": {}}}
        self.jobs = {self.dax: {}}
        self.deps = {}

    def _resolveDax(self, dax):
        """
            Returns the given dax, or the internal one when none is given.
        """
        return self.dax if dax is None else dax

    def _filesFor(self, dax):
        """
            Returns the {"input": {}, "output": {}} file registry of a dax,
            creating it on first use.
        """
        return self.files.setdefault(dax, {"input": {}, "output": {}})

    def _jobsFor(self, dax):
        """
            Returns the {jobname: Job} registry of a dax, creating it on
            first use.
        """
        return self.jobs.setdefault(dax, {})

    def _ensureDir(self, *parts):
        """
            Returns basepath joined with the given parts, creating the
            directory when it does not exist yet.
        """
        path = os.path.join(self.basepath, *parts)
        if not os.path.isdir(path):
            os.makedirs(path)
        return path

    def _inputPath(self, filename):
        """
            Returns the path of a file inside <basepath>/input, creating the
            input directory when needed.
        """
        return os.path.join(self._ensureDir("input"), filename)

    def _writeMapEntry(self, name):
        """
            :param name: Name of the output file to map.
            :type name: str

            Appends one line to the output mapping file, pointing the file
            at <basepath>/output/<name>.
        """
        if self.map is None:
            # NOTE(review): the original code never opened self.map.  The
            # location is taken from the map.rc path that _createReplica
            # writes into conf.rc.
            self.map = open(self._inputPath("map.rc"), "w")
        self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name))

    def _isJob(self, jobname, dax = None):
        """
            :param jobname: Name of the job to look up.
            :type jobname: str
            :param dax: Dax to look in, defaults to the internal dax.
            :type dax: ADAG
            :return: True if a job with that name was already added to the dax.
            :rtype: bool
        """
        return jobname in self.jobs.get(self._resolveDax(dax), {})

    def _addFile(self, name, inout, site = "condorpool", path = None, dax = None):
        """
            :param name: Name of the file to add.
            :type name: str
            :param inout: Either "input" or "output".
            :type inout: str
            :param site: Site the input file lives on.  Only used for input files.
            :type site: str
            :param path: Path to the file.  Only used for input files, defaults to name.
            :type path: str
            :param dax: Any dax can be passed in through this variable, otherwise use the internal dax.
            :type dax: ADAG
            :raises ValueError: If inout is neither "input" nor "output".

            Records the file in self.files.  Input files additionally get a
            file:// PFN and are added to the dax; output files get a line in
            the output mapping file instead.
        """
        if inout not in ("input", "output"):
            raise ValueError("inout must be 'input' or 'output', got %r" % (inout,))
        dax = self._resolveDax(dax)
        entry = {"file": File(name), "path": ""}
        self._filesFor(dax)[inout][name] = entry
        if inout == "input":
            # Fall back to the name itself; the fallback is also what the
            # PFN has to be built from.
            location = path if path else name
            entry["path"] = location
            entry["file"].addPFN(PFN("file://" + location.replace(" ", "%20"), site))
            dax.addFile(entry["file"])
        else:
            self._writeMapEntry(name)

    def _addJob(self, jobname, executable, inputs, outputs, arguments, dependencies = None, dax = None, label = None, walltime = None):
        """
            :param jobname: Name of the job to add.
            :type jobname: str
            :param executable: Executable to run.
            :type executable: str
            :param inputs: Information for all necessary inputs.  See formatting information below.
            :type inputs: dict
            :param outputs: Information for all necessary outputs.  See formatting information below.
            :type outputs: dict
            :param arguments: Dictionary of arguments to add.
            :type arguments: dict
            :param dependencies: List of jobnames to be dependent upon.
            :type dependencies: list
            :param dax: Any dax can be passed in through this variable, otherwise use the internal dax.
            :type dax: ADAG
            :param label: Optional value for the pegasus "label" profile.
            :type label: str
            :param walltime: Optional value for the globus "maxwalltime" profile.
            :type walltime: str
            :raises KeyError: If a dependency names a job that was not added to the dax.

            Input / Output dicts are formatted the same way:
            {
                FNAME1: {
                    "file": PATH,
                    "transfer": True / False
                },
                FNAME2: {
                    "file": PATH,
                    "transfer": True / False
                },
                ...
            }
            The keys of the dictionary represent the internal naming of an
            individual file within the dax.  The path definition should
            either be the full path to the input file, or the full name
            of the output file (output paths are NOT created at runtime).
            The transfer argument simply represents if you want the file to
            be transferred to the output folder.  As such, it is not required
            for input files.

            Arguments are emitted as "key value" pairs in the iteration order
            of the dictionary; a value that names an input or output key is
            replaced by that entry's "file".  Pass an ordered mapping when
            the argument order matters.

            Adding a jobname that already exists in the dax is a no-op.
        """
        dax = self._resolveDax(dax)
        if self._isJob(jobname, dax):
            return
        known = self._jobsFor(dax)
        # Check the parents first so a bad dependency cannot leave a
        # half-registered job behind.
        parents = list(dependencies) if dependencies else []
        missing = [depend for depend in parents if depend not in known]
        if missing:
            raise KeyError("unknown dependencies for job %s: %s" % (jobname, ", ".join(str(m) for m in missing)))

        job = Job(executable)
        known[jobname] = job
        dax.addJob(job)
        for key in inputs:
            job.uses(inputs[key]["file"], link = Link.INPUT)
        for key in outputs:
            # Register the output against the same dax the job lives in.
            self._addFile(os.path.basename(key), "output", dax = dax)
            job.uses(outputs[key]["file"], link = Link.OUTPUT, transfer = outputs[key]["transfer"])

        arglist = []
        for flag in arguments:
            value = str(arguments[flag])
            arglist.append(flag)
            if value in inputs:
                arglist.append(inputs[value]["file"])
            elif value in outputs:
                arglist.append(outputs[value]["file"])
            else:
                arglist.append(value)
        job.addArguments(*arglist)

        for depend in parents:
            dax.depends(child = job, parent = known[depend])
        if label:
            job.profile(Namespace.PEGASUS, "label", label)
        if walltime:
            job.profile(Namespace.GLOBUS, "maxwalltime", walltime)

    def _registerExecutable(self, name, path, os_type, arch):
        """
            Creates an Executable with a file:// PFN on the "condorpool"
            site, stores it in self.executables and adds it to the dax.
        """
        executable = Executable(name = name, os = os_type, arch = arch)
        executable.addPFN(PFN("file://" + path, "condorpool"))
        self.executables[name] = executable
        self.dax.addExecutable(executable)

    def _loadExecutables(self, os_type = "linux", arch = "x86_64", bwa_path = "/util/opt/bwa/0.7/gcc/4.4/bin/bwa"):
        """
            :param os_type: Operating system the executables are registered for.
            :type os_type: str
            :param arch: Architecture the executables are registered for.
            :type arch: str
            :param bwa_path: Absolute path of the bwa binary.
            :type bwa_path: str

            Loads all executables from the scripts directory (the folder
            "scripts" next to this module, top level only) into the dax,
            and additionally registers bwa.
        """
        scriptFolder = os.path.join(os.path.dirname(os.path.realpath(__file__)), "scripts")
        if os.path.isdir(scriptFolder):
            # Only the first os.walk() entry is needed: the files directly
            # inside the scripts folder.
            scripts = next(os.walk(scriptFolder), (scriptFolder, [], []))[2]
            for ex in sorted(scripts):
                self._registerExecutable(ex, os.path.join(scriptFolder, ex), os_type, arch)
        # bwa is registered regardless of what the scripts folder contains.
        self._registerExecutable("bwa", bwa_path, os_type, arch)

    def _loadNotify(self, config):
        """
            :param config: Configuration holding config["notify"]["pegasus_home"]
                and config["notify"]["email"].
            :type config: dict

            Registers input/notify.sh as an AT_END invoke of the dax and
            writes that script, which calls pegasus' notification/email
            helper for the configured address.
        """
        script = self._inputPath("notify.sh")
        self.dax.invoke(When.AT_END, script)
        template = textwrap.dedent("""\
            #!/bin/bash
            %s/notification/email -t %s --report=pegasus-analyzer
            """)
        with open(script, "w") as wh:
            wh.write(template % (config["notify"]["pegasus_home"], config["notify"]["email"]))
        # 0o755 is accepted by python 2.6+ as well as python 3.
        os.chmod(script, 0o755)

    def _createReplica(self):
        """
            Creates the pegasus properties file.  input/conf.rc

            The file points pegasus at input/sites.xml as site catalog and
            at input/map.rc as replica file of the storage mapper.
        """
        template = textwrap.dedent("""\
            pegasus.catalog.site = XML
            pegasus.catalog.site.file = %s/sites.xml

            pegasus.condor.logs.symlink = false

            pegasus.data.configuration = sharedfs

            pegasus.dir.storage.mapper = Replica
            pegasus.dir.storage.mapper.replica = File
            pegasus.dir.storage.mapper.replica.file = %s/map.rc
            """)
        inputDir = self.basepath + "/input"
        with open(self._inputPath("conf.rc"), "w") as wh:
            wh.write(template % (inputDir, inputDir))

    def _createSites(self, config):
        """
            :param config: Configuration; config["profile"][namespace][key]
                holds the profile values added to the condorpool site.
            :type config: dict
            :raises xml.parsers.expat.ExpatError: If the assembled catalog is not well-formed.

            Creates the pegasus site catalog.  input/sites.xml

            Profile values whose key contains "path" may be given as a list,
            which is joined with ":".  The file is only written once the
            catalog has been parsed successfully.
        """
        from xml.sax.saxutils import escape

        def text(value):
            # Character data: escape &, < and >.
            return escape("%s" % (value,))

        def attr(value):
            # Attribute values are double quoted, so quotes need escaping too.
            return escape("%s" % (value,), {'"': "&quot;"})

        template = """\
            <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
                <site handle="local" arch="x86_64" os="LINUX">
                    <directory type="shared-scratch" path="%(work)s">
                        <file-server operation="all" url="file://%(work)s" />
                    </directory>
                    <directory type="local-storage" path="%(output)s">
                        <file-server operation="all" url="file://%(output)s" />
                    </directory>
                </site>
                <site handle="condorpool" arch="x86_64" os="LINUX">
                    <directory type="shared-scratch" path="%(work)s">
                        <file-server operation="all" url="file://%(work)s" />
                    </directory>
            """
        pieces = [template % {
            "work": attr(self.basepath + "/work/"),
            "output": attr(self.basepath + "/output/"),
        }]
        profiles = config.get("profile") or {}
        for namespace in profiles:
            for key in profiles[namespace]:
                val = profiles[namespace][key]
                # Only real sequences are joined; joining a plain string
                # would put a ":" between every character.
                if "path" in key.lower() and isinstance(val, (list, tuple)):
                    val = ":".join("%s" % (part,) for part in val)
                pieces.append("""\n\t<profile namespace="%s" key="%s">%s</profile> """ % (attr(namespace), attr(key), text(val)))
        pieces.append("</site></sitecatalog>")
        sites = "".join(pieces).replace("\n", "")

        # Whitespace between the tags turns into blank lines when pretty
        # printed; those are dropped again.
        pretty = xml.dom.minidom.parseString(sites).toprettyxml()
        lines = [line for line in pretty.split("\n") if line.strip()]
        with open(self._inputPath("sites.xml"), "w") as wh:
            wh.write("\n".join(lines))

    def _createSubmit(self):
        """
            Creates the pegasus submit script.  input/submit.sh

            The script plans and submits input/encode.dax on the condorpool
            site and, when run, leaves status.sh and remove.sh helper
            scripts in its current working directory.
        """
        template = textwrap.dedent("""\
            #!/bin/bash
            plan=`pegasus-plan \\
            --conf "%s" \\
            --sites "%s" \\
            --dir "%s" \\
            --output-site local \\
            --dax "%s" \\
            --randomdir \\
            --submit`

            status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
            echo -e "#!/bin/bash
            pegasus-status -l $status" > status.sh
            chmod 744 status.sh

            remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
            echo -e "#!/bin/bash
            pegasus-remove $remove" > remove.sh
            chmod 744 remove.sh

            echo "$plan"
            echo "Alternatively, you can use the status & remove scripts in the current directory!"

            """)
        script = self._inputPath("submit.sh")
        with open(script, "w") as wh:
            wh.write(template % (
                self.basepath + "/input/conf.rc",
                "condorpool",
                self.basepath + "/work/",
                self.basepath + "/input/encode.dax",
            ))
        # 0o755 is accepted by python 2.6+ as well as python 3.
        os.chmod(script, 0o755)

    def write(self):
        """
            Writes the dax to input/encode.dax and flushes the output
            mapping file, if one was opened, so both are on disk afterwards.
        """
        with open(self._inputPath("encode.dax"), "w") as wh:
            self.dax.writeXML(wh)
        if self.map is not None:
            # Flushed rather than closed so that later _addFile calls can
            # still append entries.
            self.map.flush()