import os
import copy
import argparse
import sys
import json
import datetime
import textwrap
import yaml
import traceback
import chipathlon
import chipathlon.conf
import chipathlon.workflow_job
import chipathlon.db
import chipathlon.workflow_module
from chipathlon.run_parser import RunParser
from chipathlon.generators.align_generator import AlignGenerator
from chipathlon.generators.remove_duplicates_generator import RemoveDuplicatesGenerator
from chipathlon.generators.peak_call_generator import PeakCallGenerator
from chipathlon.generators.idr_generator import IdrGenerator
from pprint import pprint
from Pegasus.DAX3 import *

class Workflow(object):
    """
    :param job_home: The base directory to generate the workflow in.
    :type job_home: str
    :param run_file: The run yaml file defining controls, samples, alignment
        tool, peak calling tool, peak calling type, and which samples to
        run idr on.
    :type run_file: str
    :param param_file: The param yaml file defining tool specific parameters.
        Defaults are used for any jobs not specified.
    :type param_file: str
    :param config_file: The config yaml file defining system variables.
    :type config_file: str
    :param properties_file: The pegasus properties file, passed to
        pegasus-plan via its --conf option.
    :type properties_file: str
    :param host: The host address of the MongoDB database.
    :type host: str
    :param username: The username to authenticate for MongoDB access.
    :type username: str
    :param password: The password to authenticate for MongoDB access.
    :type password: str
    :param execute_site: A list of sites to submit jobs to.  These sites should
        be defined in the configuration file.
    :type execute_site: list
    :param output_site: The output site to transfer files to.  This site should
        be defined in the configuration file.
    :type output_site: str
    :param save_db: Whether or not we want to save results to the database.
        True by default.
    :type save_db: bool
    :param rewrite: Whether or not to rewrite existing files.  If true, it will
        ignore files in Mongo and recreate them.  If false, it will download
        files based on the latest available completed job.
    :type rewrite: bool
    :param debug: A flag for printing additional information when generating
        workflows.
    :type debug: bool

    Workflow is the highest level class.  It handles interaction between the
    other classes as well as the initial loading of yaml files and
    executables.
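
    Example (illustrative paths and credentials, assuming a local MongoDB
    and Pegasus install)::

        workflow = Workflow(
            "/path/to/job_home", "run.yaml", "params.yaml", "config.yaml",
            "pegasus.properties", "localhost", "mongo_user", "mongo_pass",
            execute_site="local", output_site="local"
        )
        workflow.generate()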
    """

    def __init__(self, job_home, run_file, param_file, config_file,
                properties_file, host, username, password, execute_site="local",
                output_site="local", save_db=True, rewrite=False, debug=False):
        # debug mode, print out additional information
        self.debug = debug
        # Job site information
        self.execute_site = execute_site
        self.output_site = output_site
        self.save_db = save_db

        # DB information
        self.username = username
        self.host = host
        self.password = password
        # Initialize db connection
        self.mdb = chipathlon.db.MongoDB(host, username, password)
        # Job name & error info
        self.job_home = os.path.abspath(job_home)
        self.base_path = os.path.join(self.job_home, datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S"))
        self.errors = []
        # Add new results even if they already exist
        self.rewrite = rewrite
        # Input file info
        self.run_file = os.path.abspath(run_file)
        self.param_file = os.path.abspath(param_file)
        self.properties_file = os.path.abspath(properties_file)
        self.config_file = os.path.abspath(config_file)

        # Dax specific info
        self.dax = ADAG(os.path.basename(os.path.dirname(self.job_home + "/")))
        self.executables = {}
        self.files = {}
        self.jobs = {}
        self.workflow_jobs = {}
        self.modules = {}

        # The steps to take when generating jobs
        self.generation_steps = [
            self._load_config,
            self._load_executables,
            self._load_workflow_jobs,
            self._load_modules,
            self._load_run_data,
            self._load_generators,
            # All required information is loaded, start queuing jobs
            self._create_setup,
            self._generate_jobs,
            # Create the Pegasus-specific files (notify script, submit, dax)
            self._add_notify,
            self._create_pegasus_files
        ]
        return

    def info(self):
        """
        A helper function that prints out all currently loaded run information,
        all loaded workflow_jobs, all loaded workflow_modules, and all loaded
        executables.
        """
        pprint(self.runs)
        pprint(self.workflow_jobs)
        pprint(self.modules)
        pprint(self.executables)
        return

    def generate(self):
        """
        The function that actually generates the workflow!
        """
        for step in self.generation_steps:
            if self.is_valid():
                step()
            else:
                print self.get_error_string()
                break
        return

    def is_valid(self):
        """
        :returns: True if the workflow is valid so far, False otherwise.
        """
        return len(self.errors) == 0

    def get_error_string(self):
        """
        :returns: A newline separated string of all the errors.
        """
        return "\n".join(self.errors)

    def _add_file(self, name, path, site):
        f = File(name)
        f.addPFN(PFN(path, site))
        self.files[name] = f
        self.dax.addFile(f)
        return

    def _add_genome_file(self, gen_file_obj):
        self.files[gen_file_obj["name"]] = gen_file_obj["file"]
        self.dax.addFile(gen_file_obj["file"])
        return

    def _load_config(self):
        # Validate input config.yaml file
        # required & optional keys can be found in conf.py
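        # A minimal config.yaml sketch (illustrative values; the authoritative
        # key lists live in chipathlon.conf.config_file):
        #
        #   chipathlon_bin: /path/to/chipathlon/bin
        #   idr_bin: /path/to/idr/bin
        #   pegasus_home: /usr/share/pegasus
        #   email: user@example.com
        #   arch: x86_64
        #   os: linux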
        try:
            with open(self.config_file, "r") as rh:
                self.config = yaml.load(rh)
            for key in chipathlon.conf.config_file["required_keys"]:
                if key not in self.config:
                    self.errors.append("Config file '%s' does not have required key '%s'." % (self.config_file, key))
            all_keys = chipathlon.conf.config_file["optional_keys"] + chipathlon.conf.config_file["required_keys"]
            for key in self.config:
                if key not in all_keys:
                    self.errors.append("Config file '%s' has invalid key '%s' specified, should be one of: %s." % (self.config_file, key, all_keys))
        except yaml.YAMLError as ye:
            self.errors.append("Error reading config file '%s': %s" % (self.config_file, ye))

        # Validate the existence of, but not the contents of, the
        # pegasus properties file
        if not os.path.isfile(self.properties_file):
            self.errors.append("Provided pegasus properties file '%s' does not exist." % (self.properties_file,))
        return

    def _add_executable(self, name, path, os_type="linux", arch="x86_64", site="local", installed=True):
        self.executables[name] = Executable(name=name, os=os_type.lower(), arch=arch, installed=installed)
        self.executables[name].addPFN(PFN(os.path.join("file://", path), site))
        self.dax.addExecutable(self.executables[name])
        if self.debug:
            print "[LOAD_EXECUTABLE] %s" % (name,)
        return

    def _load_executables(self, os_type="linux", arch="x86_64"):
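        # Executables come from three places: prebuilt binaries listed in
        # chipathlon.conf.executables, the job scripts shipped alongside this
        # module, and system commands expected to exist on the execute site.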
        # Load executables that are run directly, without wrapper scripts
        for executable in chipathlon.conf.executables:
            self._add_executable(
                executable,
                os.path.join(self.config["idr_bin"] if executable == "idr" else self.config["chipathlon_bin"], executable),
                site=self.execute_site,
                arch=self.config.get("arch", "x86_64"),
                os_type=self.config.get("os", "linux")
            )
        # Load actual scripts
        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_scripts)):
            for f in files:
                if not os.path.splitext(f)[1] == '.pyc':
                    self._add_executable(
                        f,
                        os.path.join(self.config["chipathlon_bin"], f),
                        site=self.execute_site,
                        arch=self.config.get("arch", "x86_64"),
                        os_type=self.config.get("os", "linux")
                    )
            break
        # Handle necessary installed system commands
        for cmd in chipathlon.conf.system_commands:
            self._add_executable(
                cmd,
                os.path.join(chipathlon.conf.system_path, cmd),
                site=self.execute_site,
                arch=self.config.get("arch", "x86_64"),
                os_type=self.config.get("os", "linux")
            )
        return

    def _load_workflow_jobs(self):
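        # Entries in the param file override each job's yaml defaults; they
        # are keyed by job name (the yaml file name minus its extension).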
        # Load in the specified param file for use in the workflow_jobs
        if os.path.isfile(self.param_file):
            with open(self.param_file, "r") as rh:
                try:
                    self.params = yaml.load(rh)
                    if self.params is None:
                        print "[WARNING] Param file is empty.  Using defaults for all jobs."
                        self.params = {}
                except yaml.YAMLError as ye:
                    self.errors.append("Error parsing param yaml file '%s': %s." % (self.param_file, ye))
        else:
            print "[WARNING] Param file does not exist.  Using defaults for all jobs."
            self.params = {}

        # Load each yaml_job as an actual object
        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
            for f in files:
                yaml_job = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.params.get(f.split(".")[0]), self.debug)
                if yaml_job.is_valid():
                    if yaml_job.command in self.executables:
                        yaml_job.add_executable(self.executables[yaml_job.command])
                        self.workflow_jobs[yaml_job.job_name] = yaml_job
                        if self.debug:
                            print "[LOAD WORKFLOW JOB] %s" % (yaml_job.job_name,)
                        for raw_file_name in yaml_job.raw_files:
                            if raw_file_name not in self.files:
                                self._add_file(raw_file_name, yaml_job.raw_files[raw_file_name]["path"], "local")
                            # _add_file creates the actual Pegasus File object;
                            # keep a reference to it on the yaml_job for housekeeping
                            yaml_job.raw_files[raw_file_name]["file"] = self.files[raw_file_name]
                    else:
                        print "[WARNING] Skipping param file %s, corresponding executable %s not found." % (f, yaml_job.command)
                else:
                    for error in yaml_job.errors:
                        self.errors.append(error)
        return

    def _load_modules(self):
        # Load each yaml_module as an actual object
        for root, dirs, files in os.walk(os.path.join(os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)):
            for f in files:
                mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.workflow_jobs, self.debug)
                if not mod.err:
                    self.modules[mod.name] = mod
                    if self.debug:
                        print "[LOAD MODULE] %s" % (mod.name,)
                else:
                    self.errors += mod.err
            break
        return

    def _load_run_data(self):
        self.run_parser = RunParser(self.run_file, self.mdb)
        if self.run_parser.is_valid():
            for genome in self.run_parser.get_genomes():
                for gen_file_obj in genome.get_all_files():
                    if gen_file_obj["name"] not in self.files:
                        self._add_genome_file(gen_file_obj)
            for i, run in enumerate(self.run_parser.get_runs()):
                if self.debug:
                    print "[LOADING SAMPLES] Run #%s" % (i,)
                run.load_samples(self.mdb)
                if not run.is_valid():
                    self.errors.append("Error with Run #%s: %s" % (i, run.get_error_string()))
            self.runs = self.run_parser.get_runs()
        else:
            self.errors.append(self.run_parser.get_error_string())
        return

    def _load_generators(self):
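        # All four generators share the same dax, job, and file dictionaries,
        # so jobs queued by later generators can depend on the outputs of
        # earlier ones.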
        self.align_gen = AlignGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["align"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        self.remove_dup_gen = RemoveDuplicatesGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["remove_duplicates"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        self.peak_call_gen = PeakCallGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["peak_call"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        self.idr_gen = IdrGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["idr"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        return

    def _generate_jobs(self):
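        # Pipeline order: align (fastq inputs only) -> remove duplicates ->
        # peak call -> idr (only for runs that request it).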
        for run in self.runs:
            if run.file_type == "fastq":
                self.align_gen.generate(run)
            self.remove_dup_gen.generate(run)
            self.peak_call_gen.generate(run)
            if run.idr is not None:
                self.idr_gen.generate(run)
        return

    def _create_setup(self):
        """
        Creates the base structure for job submission.  Everything is contained
        within a folder based on the current timestamp.
        """
        if not os.path.exists(self.base_path):
            os.makedirs(self.base_path)
        for folder in ["input", "input/db_meta"]:
            if not os.path.exists(os.path.join(self.base_path, folder)):
                os.makedirs(os.path.join(self.base_path, folder))
        return

    def _add_notify(self):
        """
        Add the script to email when the workflow is finished.
        """
        notify_path = os.path.join(self.base_path, "input/notify.sh")
        with open(notify_path, "w") as wh:
            notify = textwrap.dedent("""\
                    #!/bin/bash
                    %s/notification/email -t %s --report=pegasus-analyzer
            """ % (self.config["pegasus_home"], self.config["email"]))
            wh.write(notify)
            os.chmod(notify_path, 0755)
        self.dax.invoke(When.AT_END, notify_path)
        return

    def _create_pegasus_files(self):
        self._create_submit()
        self._create_dax()
        return

    def _create_submit(self):
        """
        Creates the pegasus submit script.  Just a wrapper to pegasus-plan
        with the correct arguments passed through.  Additionally creates
        some easy status/remove scripts for the workflow when submitted.
        """
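        # pegasus-plan prints suggested pegasus-status / pegasus-remove
        # commands; the generated script scrapes them from $plan to create
        # convenience status.sh and remove.sh scripts.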
        with open(os.path.join(self.base_path, "input/submit.sh"), "w") as wh:
            submit = textwrap.dedent("""\
                    #!/bin/bash
                    plan=`pegasus-plan \\
                    --conf %s \\
                    --sites %s \\
                    --output-site %s \\
                    --dir %s \\
                    --dax %s \\
                    --randomdir \\
                    """ % (
                        self.properties_file,
                        self.execute_site,
                        self.output_site,
                        os.path.join(self.base_path, "work"),
                        os.path.join(self.base_path, "input/chipathlon.dax")
                    ))
            submit += textwrap.dedent("""\
                    --submit`

                    status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
                    echo -e "#!/bin/bash
                    pegasus-status -l $status" > status.sh
                    chmod 744 status.sh

                    remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
                    echo -e "#!/bin/bash
                    pegasus-remove $remove" > remove.sh
                    chmod 744 remove.sh

                    echo "$plan"

                    """)
            wh.write(submit)
            os.chmod(os.path.join(self.base_path, "input/submit.sh"), 0755)
        return

    def _create_dax(self):
        with open(os.path.join(self.base_path, "input/chipathlon.dax"), "w") as wh:
            self.dax.writeXML(wh)
        return