workflow.py 18.4 KB
Newer Older
1
2
3
4
5
6
7
import os
import argparse
import sys
import json
import datetime
import textwrap
import xml.dom.minidom
8
9
10
import yaml
import traceback
import chipathlon
11
import chipathlon.workflow_job
12
13
import chipathlon.db
import chipathlon.workflow_module
14
import random
15
from pprint import pprint
16
17
18
19
from Pegasus.DAX3 import *

class Workflow(object):

20
21
22
23
    def __init__(self, jobhome, run_file, param_file, host, username, password):
        """
        Set up a new workflow generator.

        :param jobhome: Directory under which submission folders are created.
        :param run_file: Path to the yaml run template file.
        :param param_file: Path to the yaml tool-parameter file.
        :param host: MongoDB host.
        :param username: MongoDB username.
        :param password: MongoDB password.
        """
        # Database handle used later to look up experiment samples.
        self.mdb = chipathlon.db.MongoDB(host, username, password)
        # Job directory & derived job name.
        self.jobhome = os.path.abspath(jobhome)
        self.jobname = os.path.basename(os.path.dirname(self.jobhome + "/"))
        # Accumulated error text; _raise() aborts generation when non-empty.
        self.err = ""
        # Input file locations.
        self.run_file = run_file
        self.param_file = param_file
        # The DAX graph everything is registered into.
        self.dax = ADAG(self.jobname)
        # Lookup tables filled in as the workflow is assembled.
        for attr in ("executables", "files", "jobs", "deps", "workflow_jobs", "modules"):
            setattr(self, attr, {})
        return

40
41
    def info(self):
        pprint(self.run_data)
42
        pprint(self.workflow_jobs)
43
44
45
46
        pprint(self.modules)
        pprint(self.executables)
        return

47
    def generate(self):
        """
        Load all workflow components, queue jobs, and write the pegasus
        submission files.  If any stage accumulated errors the process
        prints them once and exits with status 1.
        """
        try:
            # Load tool / job / module definitions first.
            self._load_executables()
            self._load_workflow_jobs()
            self._load_modules()
            self._load_runs()
            # All required information is loaded, start queuing jobs.
            self._add_download()
            self._add_align()
            # Create pegasus submission side-files.
            self._create_setup()
            self._add_notify()
            self._create_replica()
            self._create_sites()
            self._create_submit()
            self._write()
        except SystemExit:
            # _raise() reports accumulated errors via SystemExit; swallow it
            # here and fall through so the error text is printed exactly once
            # below.  BUG FIX: the handler previously printed self.err AND the
            # block below printed it again, duplicating every error report.
            pass
        if self.err:
            print(self.err)
            sys.exit(1)
        return

aknecht2's avatar
aknecht2 committed
70
71
72
73
74
    def _raise(self):
        if self.err:
            raise SystemExit(self.err)
        return

75
    def _load_executables(self, os_type="linux", arch="x86_64"):
        """
        Register every wrapper script and job script as a Pegasus Executable
        on the condorpool site, then abort if any errors accumulated.

        :param os_type: Executable target OS (pegasus metadata).
        :param arch: Executable target architecture (pegasus metadata).
        """
        base_dir = os.path.dirname(os.path.realpath(__file__))
        # Wrapper scripts (for commands loaded from a module) are keyed by
        # tool name -- the text before the first underscore in the file name.
        self._register_executables(
            "%s/%s" % (base_dir, chipathlon.conf.job_wrappers),
            lambda f: f.split("_")[0], os_type, arch)
        # Actual job scripts are keyed by their full file name.
        self._register_executables(
            "%s/%s" % (base_dir, chipathlon.conf.job_scripts),
            lambda f: f, os_type, arch)
        self._raise()
        return

    def _register_executables(self, exe_dir, name_fn, os_type, arch):
        """Add each top-level file in exe_dir as an Executable keyed by name_fn(file)."""
        for root, dirs, files in os.walk(exe_dir):
            for f in files:
                ex_name = name_fn(f)
                self.executables[ex_name] = Executable(name=ex_name, os=os_type, arch=arch)
                self.executables[ex_name].addPFN(PFN("file://%s/%s" % (root, f), "condorpool"))
                self.dax.addExecutable(self.executables[ex_name])
            # Only the top level of the directory is scanned.
            break
        return

94
95
96
97
    def _load_workflow_jobs(self):
        """
        Build a WorkflowJob for every param file that has a matching
        executable; collect per-job errors and abort if any occurred.
        """
        param_dir = "%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)
        for root, dirs, files in os.walk(param_dir):
            for f in files:
                # Param files are named after their tool: text before the
                # first underscore must match a registered executable.
                ex_name = f.split("_")[0]
                if ex_name not in self.executables:
                    print("[INFO] Skipping param file %s, no corresponding executable found." % (f,))
                    continue
                yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
                if yj.err:
                    self.err += yj.err
                else:
                    self.workflow_jobs[yj.jobname] = yj
            # Only the top level of the directory is scanned.
            break
        self._raise()
        return

110
111
    def _load_modules(self):
        """
        Build a WorkflowModule from every module definition file; collect
        per-module errors and abort if any occurred.
        """
        module_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)
        for root, dirs, files in os.walk(module_dir):
            for f in files:
                mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.workflow_jobs)
                if mod.err:
                    self.err += mod.err
                else:
                    self.modules[mod.name] = mod
            # Only the top level of the directory is scanned.
            break
        self._raise()
        return

122
    def _load_runs(self):
        """
        Parse the yaml run template file into self.run_data and validate
        each run: module entries must be legal, 'experiment' must exist,
        samples are fetched from the DB, and each alignment tool's genome
        definition is checked once.  Aborts if any errors accumulated.
        """
        with open(self.run_file, "r") as rh:
            try:
                # NOTE(review): yaml.load without an explicit Loader can
                # construct arbitrary objects; switch to yaml.safe_load if
                # run files may come from untrusted sources.
                self.run_data = yaml.load(rh)
            except yaml.YAMLError as exc:
                self.err += "Error parsing run template file '%s'. %s.\n" % (self.run_file, exc)
        if not self.err:
            # Tracks alignment tools whose genome definitions were validated.
            checked = []
            for run in self.run_data["runs"]:
                # Modules with multiple sub-workflows require the run to pick one.
                for module_name in self.modules:
                    if isinstance(self.modules[module_name], dict):
                        keys = self.modules[module_name].keys()
                        if len(keys) > 1:
                            if module_name in run:
                                if run[module_name] not in keys:
                                    self.err += "Error parsing run template file '%s'. Module '%s' has invalid entry '%s', should be one of %s.\n" % (self.run_file, module_name, run[module_name], keys)
                            else:
                                self.err += "Error parsing run template file '%s'.  Module '%s' requires an entry, should be one of %s.\n" % (self.run_file, module_name, keys)
                if "experiment" in run:
                    valid, msg, data = self.mdb.get_samples(run["experiment"])
                    if valid:
                        run["data"] = data
                    else:
                        self.err += msg
                    # This will be changed to _check_genomes(run) eventually.
                    if run["align"] not in checked:
                        self._check_grch38(run)
                        # BUG FIX: was 'checked += run["align"]', which extends
                        # the list with the string's individual characters so
                        # the membership test above never succeeded and the
                        # genome check re-ran (and re-reported errors) per run.
                        checked.append(run["align"])
                else:
                    self.err += "Error parsing run template file '%s'.  Required key 'experiment' not defined.\n" % (self.run_file,)
        self._raise()
        return

157
158
159
160
161
162
163
    def _check_grch38(self, run):
        """
        Validate that the run's alignment tool has a usable GRCh38.p6 genome
        definition and register the genome base file plus its additional
        files (indexes etc.) with the workflow.  Aborts on accumulated errors.

        :param run: A single run dict from self.run_data["runs"].
        """
        if run["align"] in self.run_data["genomes"]:
            assembly = "grch38.p6"
            gen_prefix = "genome_%s_%s" % (run["align"], assembly)
            if assembly in self.run_data["genomes"][run["align"]]:
                base_file = self.run_data["genomes"][run["align"]][assembly]
                if os.path.isfile(base_file):
                    # BUG FIX: was 'gen_prefix + base_file.split(".", 1)[1]'
                    # with no separating ".", producing names like
                    # 'genome_bwa_grch38.p6fa' instead of
                    # 'genome_bwa_grch38.p6.fa' (the "%s.%s" scheme used for
                    # the additional files below).
                    base_file_name = gen_prefix + "." + base_file.split(".", 1)[1]
                    f = File(base_file_name)
                    f.addPFN(PFN(base_file, "local"))
                    self.files[base_file_name] = f
                    # NOTE(review): the base file is stored in self.files but
                    # never passed to self.dax.addFile(), unlike the
                    # additional files -- confirm whether that is intentional.
                    if base_file.split(".", 1)[1] in chipathlon.conf.genomes[run["align"]]["base_file"]:
                        prefix = base_file.split(".", 1)[0]
                        missing = []
                        for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
                            # Index files may be named either prefix.ext or base_file.ext.
                            if not os.path.isfile(prefix + "." + ext) and not os.path.isfile(base_file + "." + ext):
                                missing.append(ext)
                            else:
                                fnew = File(gen_prefix + "." + ext)
                                # BUG FIX: addPFN() was called with a bare path
                                # string; it requires a PFN object (compare the
                                # base-file registration above).
                                if os.path.isfile(prefix + "." + ext):
                                    fnew.addPFN(PFN(prefix + "." + ext, "local"))
                                else:
                                    fnew.addPFN(PFN(base_file + "." + ext, "local"))
                                self.files[gen_prefix + "." + ext] = fnew
                                self.dax.addFile(fnew)
                        if len(missing) > 0:
                            self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
                    else:
                        self.err += "Genome defined with tool '%s' and assembly '%s', has invalid extension.  Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
                else:
                    self.err += "Genome defined with tool '%s' and assembly '%s', has non-existant base file '%s'.\n" % (run["align"], assembly, base_file)
            else:
                self.err += "Alignment defined with tool '%s' for assembly '%s', no corresponding genome definition.\n" % (run["align"], assembly)
        else:
            self.err += "Alignment defined with tool '%s', no corresponding genome definition.\n" % (run["align"])
        self._raise()
        return

195
    def _check_genomes(self, run):
        """
        DEPRECATED -- this function should NOT be called.  Only the
        GRCh38.p6 genome is supported; use _check_grch38() instead.

        Validates the genome definition for the run's assembly as reported
        by the database, appending any problems to self.err.
        """
        valid, msg, assembly = self.mdb.get_assembly(run["experiment"])
        if not valid:
            self.err += "DB error: %s.\n" % (msg, )
            return
        if run["align"] not in self.run_data["genomes"]:
            self.err += "Alignment defined with tool '%s', no corresponding genome definition.\n" % (run["align"])
            return
        if assembly not in self.run_data["genomes"][run["align"]]:
            self.err += "Alignment defined with tool '%s' for assembly '%s', no corresponding genome definition.\n" % (run["align"], assembly)
            return
        base_file = self.run_data["genomes"][run["align"]][assembly]
        if not os.path.isfile(base_file):
            self.err += "Genome defined with tool '%s' and assembly '%s', has non-existant base file '%s'.\n" % (run["align"], assembly, base_file)
            return
        if base_file.split(".", 1)[1] not in chipathlon.conf.genomes[run["align"]]["base_file"]:
            self.err += "Genome defined with tool '%s' and assembly '%s', has invalid extension.  Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
            return
        prefix = base_file.split(".", 1)[0]
        missing = [ext for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]
                   if not os.path.isfile(prefix + "." + ext)]
        if len(missing) > 0:
            self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
        return

225
    def _add_download(self):
        """
        Create a download job for every experiment/control fastq pair.

        Each experiment file is paired with a control file.  When the two
        counts differ, controls are drawn at random (with replacement) so
        every experiment file gets a partner; otherwise files are paired in
        order.  Output files are named EXPID_ACCESSION.fastq.gz.
        """
        self.input_sets = []
        for run in self.run_data["runs"]:
            exp_files = run["data"]["experiment"]
            if len(run["data"]["control"]) != len(exp_files):
                control_files = [random.choice(run["data"]["control"]) for _ in range(0, len(exp_files))]
            else:
                control_files = run["data"]["control"]
            for pair in zip(exp_files, control_files):
                names = ["%s_%s.fastq.gz" % (run["experiment"], item["accession"]) for item in pair]
                # Skip pairs where either file was already registered.
                if names[0] in self.files or names[1] in self.files:
                    continue
                self.input_sets.append(pair)
                for item, output_name in zip(pair, names):
                    # One download job per file; output is EXPID_ACCESSION.fastq.gz.
                    output_file = File(output_name)
                    self.files[output_name] = output_file
                    job = Job(self.executables["download_fastq.py"])
                    job.uses(output_file, link=Link.OUTPUT, transfer=True)
                    job.addArguments("-u", item["url"], "-p", output_file, "-t http://", "-m", item["content_md5sum"])
                    self.jobs[output_name] = job
                    self.dax.addJob(job)
        return

255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
    def _add_align(self):
        """
        Queue alignment jobs for every downloaded fastq in each run via the
        'align' workflow module.  BUG FIX: removed a leftover debug
        statement that printed self.modules["align"].markers on every call.
        """
        markers = {"read_end": "single"}
        for run in self.run_data["runs"]:
            markers["tool"] = run["align"]
            gen_prefix = "genome_%s_grch38.p6" % (run["align"],)
            input_files = {}
            # NOTE(review): the value below comes straight from the run file's
            # genome entry (a file *path* per _check_grch38) rather than a bare
            # extension -- confirm it matches the file names registered there.
            input_files["ref_genome"] = "%s.%s" % (gen_prefix, self.run_data["genomes"][run["align"]]["grch38.p6"])
            additional_files = {}
            for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
                additional_files["ref_genome." + ext] = "%s.%s" % (gen_prefix, ext)
            for pair in self.input_sets:
                for f in pair:
                    prefix = "%s_%s" % (run["experiment"], f["accession"])
                    input_files["download_1.fastq"] = "%s.fastq.gz" % (prefix,)
                    self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
        return
273

274
275
276
277
    def _create_setup(self):
        """
            Creates the base structure for job submission.  Everything is contained
            within a folder based on the current timestamp.
278
        """
279
280
281
282
283
284
        self.basepath = self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
        if not os.path.exists(self.basepath):
            os.makedirs(self.basepath)
        for folder in ["input", "output"]:
            if not os.path.exists(os.path.join(self.basepath, folder)):
                os.makedirs(os.path.join(self.basepath, folder))
285
286
        return

287
288
289
290
    def _add_notify(self):
        """
        Write the notify.sh email-notification script, make it executable,
        and register it to run when the DAX finishes.
        """
        # NOTE(review): 'config' is not defined anywhere in this module, so
        # calling this raises NameError.  (Original comment: NEED TO
        # DETERMINE HOW TO READ IN THESE VARS.)
        notify_path = os.path.join(self.basepath, "input/notify.sh")
        with open(notify_path, "w") as wh:
            notify = textwrap.dedent("""\
                    #!/bin/bash
                    %s/notification/email -t %s --report=pegasus-analyzer
            """ % (config["notify"]["pegasus_home"], config["notify"]["email"]))
            wh.write(notify)
            os.chmod(notify_path, 0o755)
        self.dax.invoke(When.AT_END, notify_path)
        return

300
301
    def _create_replica(self):
        with open(os.path.join(self.basepath, "input/conf.rc"), "w") as wh:
302
            pegasusrc = textwrap.dedent("""\
303
304
                pegasus.catalog.site = XML
                pegasus.catalog.site.file = %s/sites.xml
305

306
                pegasus.condor.logs.symlink = false
307

308
309
                pegasus.data.configuration = sharedfs
                """ % (os.path.join(self.basepath, "input"),))
310
311
312
            wh.write(pegasusrc)
        return

313
314
    def _create_sites(self):
        """
        Write the pegasus site catalog (input/sites.xml) describing the
        'local' and 'condorpool' sites, pretty-printed via minidom.
        """
        work_dir = os.path.join(self.basepath, "work")
        out_dir = os.path.join(self.basepath, "output")
        with open(os.path.join(self.basepath, "input/sites.xml"), "w") as wh:
            sites = """\
                <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
                   <site handle="local" arch="x86_64" os="LINUX">
                       <directory type="shared-scratch" path="%s">
                           <file-server operation="all" url="file://%s" />
                       </directory>
                       <directory type="local-storage" path="%s">
                           <file-server operation="all" url="file://%s" />
                       </directory>
                   </site>
                   <site handle="condorpool" arch="x86_64" os="LINUX">
                       <directory type="shared-scratch" path="%s">
                           <file-server operation="all" url="file://%s" />
                       </directory>
                """ % (work_dir, work_dir, out_dir, out_dir, work_dir, work_dir)
            # NOTE(review): 'config' is not defined anywhere in this module,
            # so this raises NameError at runtime.  (Original comment: NEED
            # TO DETERMINE HOW TO READ IN THIS INFO.)
            for namespace in config["profile"]:
                for key in config["profile"][namespace]:
                    # PATH-like keys are colon-joined lists; others pass through.
                    val = ":".join(config["profile"][namespace][key]) if "path" in key.lower() else config["profile"][namespace][key]
                    sites += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (namespace, key, val)
            sites += "</site></sitecatalog>"
            # Flatten, re-parse, and pretty-print; drop blank lines minidom emits.
            sites = sites.replace("\n", "")
            pretty = xml.dom.minidom.parseString(sites).toprettyxml()
            wh.write("\n".join([line for line in pretty.split('\n') if line.strip()]))
        return

347
    def _create_submit(self):
348
349
350
        """
            Creates the pegasus submit script.  submit.sh
        """
351
        with open(os.path.join(self.basepath, "input/submit.sh"), "w") as wh:
352
353
354
355
356
357
358
359
360
            submit = textwrap.dedent("""\
                    #!/bin/bash
                    plan=`pegasus-plan \\
                    --conf "%s" \\
                    --sites "%s" \\
                    --dir "%s" \\
                    --output-site local \\
                    --dax "%s" \\
                    --randomdir \\
361
                    """ % (self.basepath + "/input/conf.rc", "condorpool", self.basepath + "/work/", self.basepath + "/input/chipathlon.dax"))
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
            submit += textwrap.dedent("""\
                    --submit`

                    status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
                    echo -e "#!/bin/bash
                    pegasus-status -l $status" > status.sh
                    chmod 744 status.sh

                    remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
                    echo -e "#!/bin/bash
                    pegasus-remove $remove" > remove.sh
                    chmod 744 remove.sh

                    echo "$plan"

                    """)
            wh.write(submit)
            os.chmod(self.basepath + "/input/submit.sh", 0755)
        return

382
    def _write(self):
383
        with open(self.basepath + "/input/chipathlon.dax", "w") as wh:
384
385
            self.dax.writeXML(wh)
        return