import os
import argparse
import sys
import json
import datetime
import textwrap
import xml.dom.minidom
import yaml
import traceback
import chipathlon
import chipathlon.conf
import chipathlon.workflow_job
import chipathlon.db
import chipathlon.workflow_module
import random
from pprint import pprint
from Pegasus.DAX3 import *


class Workflow(object):

    def __init__(self, jobhome, run_file, param_file, config_file, host, username, password):
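        """
            Sets up all workflow state.  jobhome is the base directory that the
            generated submission files are written into, run_file is the yaml
            run template, param_file is passed through to each WorkflowJob,
            config_file supplies the notify & profile settings, and host,
            username & password are the MongoDB connection credentials.
        """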
        # Initialize db connection
        self.mdb = chipathlon.db.MongoDB(host, username, password)
        # Jobname info & err
        self.jobhome = os.path.abspath(jobhome)
        self.jobname = os.path.basename(os.path.dirname(self.jobhome + "/"))
        self.err = ""
        # Input file info
        self.run_file = run_file
        self.param_file = param_file
        self.config_file = config_file
        with open(self.config_file, "r") as rh:
            self.config = yaml.load(rh)
        # Dax specific info
        self.dax = ADAG(self.jobname)
        self.executables = {}
        self.files = {}
        self.jobs = {}
        self.deps = {}
        self.workflow_jobs = {}
        self.modules = {}
        return

    def info(self):
        pprint(self.run_data)
        pprint(self.workflow_jobs)
        pprint(self.modules)
        pprint(self.executables)
        return

    def generate(self):
        try:
            self._load_executables()
            self._load_workflow_jobs()
            self._load_modules()
            self._load_runs()
            # All required information is loaded, start queuing jobs
            self._add_download()
            self._add_align()
            # Create pegasus important stuff
            self._create_setup()
            self._add_notify()
            self._create_replica()
            self._create_sites()
            self._create_submit()
            self._write()
        except SystemExit as se:
            # _raise() aborts generation once errors accumulate; report them below.
            pass
        if self.err:
            print self.err
            sys.exit(1)
        return

    def _add_file(self, name, path, site):
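        """
            Registers a logical file with the dax: the file is tracked in
            self.files by name, given a physical file name (PFN) at the
            provided path & site, and added to the dax.
        """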
        f = File(name)
        f.addPFN(PFN(path, site))
        self.files[name] = f
        self.dax.addFile(f)
        return

    def _raise(self):
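        """
            Aborts workflow generation by raising SystemExit if any errors
            have accumulated in self.err.
        """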
        if self.err:
            raise SystemExit(self.err)
        return

    def _load_executables(self, os_type="linux", arch="x86_64"):
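        """
            Walks the job_wrappers and job_scripts directories and registers
            an Executable (with a local file PFN) for each file found.
            Wrappers are keyed by the file name prefix before the first
            underscore, scripts by their full file name.
        """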
        # Load wrapper scripts for commands that need to be loaded from module
        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_wrappers)):
            for f in files:
                ex_name = f.split("_")[0]
                self.executables[ex_name] = Executable(name=ex_name, os=os_type, arch=arch)
                self.executables[ex_name].addPFN(PFN("file://%s/%s" % (root, f), "local"))
                self.dax.addExecutable(self.executables[ex_name])
            break
        # Load actual scripts
        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_scripts)):
            for f in files:
                self.executables[f] = Executable(name=f, os=os_type, arch=arch)
                self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "local"))
                self.dax.addExecutable(self.executables[f])
            break
        self._raise()
        return

    def _load_workflow_jobs(self):
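        """
            Creates a WorkflowJob for each yaml file in the job_params
            directory whose name prefix matches a loaded executable,
            accumulating any job errors in self.err.
        """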
        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
            for f in files:
                ex_name = f.split("_")[0]
                if ex_name in self.executables:
                    yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
                    if not yj.err:
                        self.workflow_jobs[yj.jobname] = yj
                    else:
                        self.err += yj.err
                else:
                    print "[INFO] Skipping param file %s, no corresponding executable found." % (f,)
            break
        self._raise()
        return

    def _load_modules(self):
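        """
            Creates a WorkflowModule (built from the loaded workflow jobs) for
            each yaml file in the job_modules directory, accumulating any
            module errors in self.err.
        """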
        for root, dirs, files in os.walk(os.path.join(os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)):
            for f in files:
                mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.workflow_jobs)
                if not mod.err:
                    self.modules[mod.name] = mod
                else:
                    self.err += mod.err
            break
        self._raise()
        return

    def _load_runs(self):
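        """
            Parses the yaml run template, validates the module choices for
            each run, pulls sample data for each experiment from MongoDB, and
            checks the genome definitions via _check_grch38.
        """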
        with open(self.run_file, "r") as rh:
            try:
                self.run_data = yaml.load(rh)
            except yaml.YAMLError as exc:
                self.err += "Error parsing run template file '%s'. %s.\n" % (self.run_file, exc)
        if not self.err:
            checked = []
            for run in self.run_data["runs"]:
                for module_name in self.modules:
                    if isinstance(self.modules[module_name], dict):
                        keys = self.modules[module_name].keys()
                        if len(keys) > 1:
                            if module_name in run:
                                if run[module_name] not in keys:
                                    self.err += "Error parsing run template file '%s'. Module '%s' has invalid entry '%s', should be one of %s.\n" % (self.run_file, module_name, run[module_name], keys)
                            else:
                                self.err += "Error parsing run template file '%s'.  Module '%s' requires an entry, should be one of %s.\n" % (self.run_file, module_name, keys)
                if "experiment" in run:
                    valid, msg, data = self.mdb.get_samples(run["experiment"])
                    if valid:
                        run["data"] = data
                    else:
                        self.err += msg
                    # This will eventually be replaced with a call to _check_genomes(run):
                    # self._check_genomes(run)
                    if run["align"] not in checked:
                        self._check_grch38(run)
                        checked.append(run["align"])
                else:
                    self.err += "Error parsing run template file '%s'.  Required key 'experiment' not defined.\n" % (self.run_file,)
        self._raise()
        return

    def _check_grch38(self, run):
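        """
            Validates the grch38p6 genome definition for the run's alignment
            tool: the base genome file must exist with a valid extension and
            all tool specific additional_files must be present.  Valid files
            are added to the dax and recorded in the run's genome entry.
        """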
        if run["align"] in self.run_data["genomes"]:
            assembly = "grch38p6"
            gen_prefix = "genome_%s_%s" % (run["align"], assembly)
            if assembly in self.run_data["genomes"][run["align"]]:
                base_file = self.run_data["genomes"][run["align"]][assembly]
                if os.path.isfile(base_file):
                    (base_file_prefix, base_file_ext) = os.path.splitext(os.path.basename(base_file))
                    base_file_ext = base_file_ext[1:]
                    base_file_name = gen_prefix + "." + base_file_ext
                    self._add_file(base_file_name, base_file, "local")
                    genome_files = self.run_data["genomes"][run["align"]][assembly + "_files"] = {}
                    genome_files["base_file"] = base_file_name
                    genome_files["additional_files"] = {}
                    if base_file_ext in chipathlon.conf.genomes[run["align"]]["base_file"]:
                        prefix = os.path.splitext(base_file)[0]
                        missing = []
                        for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
                            if not os.path.isfile(prefix + "." + ext) and not os.path.isfile(base_file + "." + ext):
                                missing.append(ext)
                            else:
                                no_ext = os.path.isfile(prefix + "." + ext)
                                name = "%s.%s%s" % (gen_prefix, "" if no_ext else base_file_ext + ".", ext)
                                path = "%s.%s" % (prefix if no_ext else base_file, ext)
                                self._add_file(name, path, "local")
                                genome_files["additional_files"][ext] = name
                        if len(missing) > 0:
                            self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
                    else:
                        self.err += "Genome defined with tool '%s' and assembly '%s' has an invalid extension.  Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
                else:
                    self.err += "Genome defined with tool '%s' and assembly '%s' has a non-existent base file '%s'.\n" % (run["align"], assembly, base_file)
            else:
                self.err += "Alignment defined with tool '%s' for assembly '%s', no corresponding genome definition.\n" % (run["align"], assembly)
        else:
            self.err += "Alignment defined with tool '%s', no corresponding genome definition.\n" % (run["align"])
        self._raise()
        return

    def _check_genomes(self, run):
        # NOTE: This function should NOT be called.
        # We will only be using the GRCh38.p6 genome.
        # Use the _check_grch38 function instead.
        valid, msg, assembly = self.mdb.get_assembly(run["experiment"])
        if valid:
            if run["align"] in self.run_data["genomes"]:
                if assembly in self.run_data["genomes"][run["align"]]:
                    base_file = self.run_data["genomes"][run["align"]][assembly]
                    if os.path.isfile(base_file):
                        if base_file.split(".", 1)[1] in chipathlon.conf.genomes[run["align"]]["base_file"]:
                            prefix = base_file.split(".", 1)[0]
                            missing = []
                            for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
                                if not os.path.isfile(prefix + "." + ext):
                                    missing.append(ext)
                            if len(missing) > 0:
                                self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
                        else:
                            self.err += "Genome defined with tool '%s' and assembly '%s' has an invalid extension.  Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
                    else:
                        self.err += "Genome defined with tool '%s' and assembly '%s' has a non-existent base file '%s'.\n" % (run["align"], assembly, base_file)
                else:
                    self.err += "Alignment defined with tool '%s' for assembly '%s', no corresponding genome definition.\n" % (run["align"], assembly)
            else:
                self.err += "Alignment defined with tool '%s', no corresponding genome definition.\n" % (run["align"])
        else:
            self.err += "DB error: %s.\n" % (msg, )
        return

    def _add_download(self):
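        """
            Builds run["input_sets"] as (experiment, control) pairs and
            registers each fastq as a dax file named EXPID_ACCESSION.fastq.gz
            with an http PFN on the dummylocal site, so Pegasus performs the
            download.
        """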
        # Each experiment file is paired with a randomly selected control file.
        # For alignment the pairing doesn't really matter, but it lays the
        # groundwork for later steps.  We select controls randomly unless there
        # is an equal number of control and experiment files.
        for run in self.run_data["runs"]:
            run["input_sets"] = []
            if (len(run["data"]["control"]) != len(run["data"]["experiment"])):
                control_data = [random.choice(run["data"]["control"]) for i in range(0, len(run["data"]["experiment"]))]
            else:
                control_data = run["data"]["control"]
            for pair in zip(run["data"]["experiment"], control_data):
                exp_name = "%s_%s.fastq.gz" % (run["experiment"], pair[0]["accession"])
                control_name = "%s_%s.fastq.gz" % (run["experiment"], pair[1]["accession"])
                if exp_name not in self.files and control_name not in self.files:
                    run["input_sets"].append(pair)
                    for f in pair:
                        # For each fastq, have Pegasus do the http fetching.
                        # Files are named EXPID_ACCESSION.fastq.gz
                        output_name = "%s_%s.fastq.gz" % (run["experiment"], f["accession"])
                        self._add_file(output_name, "http://" + f["url"], "dummylocal")
        return

    def _add_align(self):
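        """
            Queues alignment jobs through the 'align' module for every
            (experiment, control) fastq pair in each run, passing in the
            grch38p6 reference genome files and markers for the alignment
            tool & single end reads.
        """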
        markers = {}
        markers["read_end"] = "single"
        for run in self.run_data["runs"]:
            input_files = {}
            additional_files = {}
            markers["tool"] = run["align"]
            gen_prefix = "genome_%s_grch38p6" % (run["align"],)
            input_files["ref_genome"] = self.run_data["genomes"][run["align"]]["grch38p6_files"]["base_file"]
            for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
                additional_files["ref_genome." + ext] = self.run_data["genomes"][run["align"]]["grch38p6_files"]["additional_files"][ext]
            for pair in run["input_sets"]:
                for f in pair:
                    prefix = "%s_%s" % (run["experiment"], f["accession"])
                    input_files["download_1.fastq"] = "%s.fastq.gz" % (prefix,)
                    self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
        return

    def _create_setup(self):
        """
            Creates the base structure for job submission.  Everything is contained
            within a folder based on the current timestamp.
        """
        self.basepath = self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
        if not os.path.exists(self.basepath):
            os.makedirs(self.basepath)
        for folder in ["input", "output"]:
            if not os.path.exists(os.path.join(self.basepath, folder)):
                os.makedirs(os.path.join(self.basepath, folder))
        return

    def _add_notify(self):
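        """
            Writes input/notify.sh, which emails a pegasus-analyzer report
            using the notify settings from the config file, and registers it
            to run when the workflow ends.
        """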
        # NEED TO DETERMINE HOW TO READ IN THESE VARS
        notify_path = os.path.join(self.basepath, "input/notify.sh")
        with open(notify_path, "w") as wh:
            notify = textwrap.dedent("""\
                    #!/bin/bash
                    %s/notification/email -t %s --report=pegasus-analyzer
            """ % (self.config["notify"]["pegasus_home"], self.config["notify"]["email"]))
            wh.write(notify)
            os.chmod(notify_path, 0755)
        self.dax.invoke(When.AT_END, notify_path)
        return

    def _create_replica(self):
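        """
            Writes the pegasus properties file (input/conf.rc), pointing at
            the generated sites.xml and setting the transfer & data
            configuration options.
        """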
        with open(os.path.join(self.basepath, "input/conf.rc"), "w") as wh:
            pegasusrc = textwrap.dedent("""\
                pegasus.catalog.site = XML
                pegasus.catalog.site.file = %s/sites.xml

                pegasus.condor.logs.symlink = false
                pegasus.transfer.links=true
                pegasus.data.configuration = sharedfs
                """ % (os.path.join(self.basepath, "input"),))
            wh.write(pegasusrc)
        return

    def _create_sites(self):
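        """
            Writes input/sites.xml describing the 'local' and 'dummylocal'
            sites, including any profile entries defined in the config file.
        """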
        with open(os.path.join(self.basepath, "input/sites.xml"), "w") as wh:
            sites = """\
                <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
                   <site handle="local" arch="x86_64" os="LINUX">
                       <directory type="shared-scratch" path="%s">
                           <file-server operation="all" url="file://%s" />
                       </directory>
                       <directory type="local-storage" path="%s">
                           <file-server operation="all" url="file://%s" />
                       </directory>""" % (
                    os.path.join(self.basepath, "work"),
                    os.path.join(self.basepath, "work"),
                    os.path.join(self.basepath, "output"),
                    os.path.join(self.basepath, "output")
                    )
            # NEED TO DETERMINE HOW TO READ IN THIS INFO
            for namespace in self.config["profile"]:
                for key in self.config["profile"][namespace]:
                    sites += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (namespace, key, self.config["profile"][namespace][key])

            sites += """</site>
                   <site handle="dummylocal" arch="x86_64" os="LINUX">
                       <directory type="shared-scratch" path="%s">
                           <file-server operation="all" url="file://%s" />
                       </directory>
                """ % (
                        os.path.join(self.basepath, "work"),
                        os.path.join(self.basepath, "work")
                    )
            sites += "</site></sitecatalog>"
            sites = sites.replace("\n", "")
            wh.write("\n".join([line for line in xml.dom.minidom.parseString(sites).toprettyxml().split('\n') if line.strip()]))
        return

    def _create_submit(self):
        """
            Creates the Pegasus submit script (submit.sh).
        """
        with open(os.path.join(self.basepath, "input/submit.sh"), "w") as wh:
            submit = textwrap.dedent("""\
                    #!/bin/bash
                    plan=`pegasus-plan \\
                    --conf "%s" \\
                    --sites "%s" \\
                    --dir "%s" \\
                    --output-site local \\
                    --dax "%s" \\
                    --randomdir \\
                    """ % (self.basepath + "/input/conf.rc", "local", self.basepath + "/work/", self.basepath + "/input/chipathlon.dax"))
            submit += textwrap.dedent("""\
                    --submit`

                    status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
                    echo -e "#!/bin/bash
                    pegasus-status -l $status" > status.sh
                    chmod 744 status.sh

                    remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
                    echo -e "#!/bin/bash
                    pegasus-remove $remove" > remove.sh
                    chmod 744 remove.sh

                    echo "$plan"

                    """)
            wh.write(submit)
            os.chmod(self.basepath + "/input/submit.sh", 0755)
        return

    def _write(self):
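        """
            Writes the dax out to input/chipathlon.dax.
        """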
        with open(self.basepath + "/input/chipathlon.dax", "w") as wh:
            self.dax.writeXML(wh)
        return