# workflow.py
import os
import copy
import argparse
import sys
import json
import datetime
import textwrap
import yaml
import traceback
import chipathlon
import chipathlon.conf
import chipathlon.workflow_job
import chipathlon.db
import chipathlon.workflow_module
from chipathlon.run_parser import RunParser
from chipathlon.generators.align_generator import AlignGenerator
from chipathlon.generators.remove_duplicates_generator import RemoveDuplicatesGenerator
from chipathlon.generators.peak_call_generator import PeakCallGenerator
from chipathlon.generators.idr_generator import IdrGenerator
from pprint import pprint
from Pegasus.DAX3 import *

class Workflow(object):
    """
    Workflow is the highest level class.  It handles interaction between the
    other classes as well as the initial loading of yaml files and
    executables.

    :param job_home: The base directory to generate the workflow in.
    :type job_home: str
    :param run_file: The run yaml file defining controls, samples, alignment
        tool, peak calling tool, peak calling type, and which samples to
        run idr on.
    :type run_file: str
    :param param_file: The param yaml file defining tool specific arguments;
        jobs use their built-in defaults for anything left unspecified.
    :type param_file: str
    :param config_file: The config yaml file defining system variables.
    :type config_file: str
    :param properties_file: The pegasus properties file to use when planning
        the workflow.
    :type properties_file: str
    :param execute_site: The site to submit jobs to.  This site should
        be defined in the configuration file.
    :type execute_site: str
    :param output_site: The output site to transfer files to.  This site should
        be defined in the configuration file.
    :type output_site: str
    :param host: The host address of the MongoDB database.
    :type host: str
    :param username: The username to authenticate for MongoDB access.
    :type username: str
    :param password: The password to authenticate for MongoDB access.
    :type password: str
    :param save_db: Whether or not we want to save results to the database.
        True by default.
    :type save_db: bool
    :param rewrite: Whether or not to rewrite existing files.  If true, it will
        ignore files in Mongo and recreate them.  If false, it will download
        files based on the latest available completed job.
    :type rewrite: bool
    :param debug: A flag for printing additional information when generating
        workflows.
    :type debug: bool
    """
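    # Typical usage (a minimal sketch; all paths here are hypothetical):
    #
    #   workflow = Workflow(
    #       "/scratch/jobs", "run.yaml", "params.yaml", "config.yaml",
    #       "pegasus.properties", execute_site="local", output_site="local"
    #   )
    #   workflow.generate()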

    def __init__(self, job_home, run_file, param_file, config_file,
                properties_file, execute_site="local", output_site="local",
                host="localhost", username=None, password=None, save_db=True,
                rewrite=False, debug=False):
        # Debug mode, print out additional information
        self.debug = debug

        # Job site information
        self.execute_site = execute_site
        self.output_site = output_site
        self.save_db = save_db

        # DB information
        self.username = username
        self.host = host
        self.password = password
        # Initialize db connection
        self.mdb = chipathlon.db.MongoDB(host, username, password)

        # Job naming info & error list
        self.job_home = os.path.abspath(job_home)
        self.base_path = os.path.join(self.job_home, datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S"))
        self.errors = []
        # Add new results even if they already exist
        self.rewrite = rewrite

        # Input file info
        self.run_file = os.path.abspath(run_file)
        self.param_file = os.path.abspath(param_file)
        self.properties_file = os.path.abspath(properties_file)
        self.config_file = os.path.abspath(config_file)

        # Dax specific info.  The dax is named after the job_home folder.
        self.dax = ADAG(os.path.basename(os.path.dirname(self.job_home + "/")))
        self.executables = {}
        self.files = {}
        self.jobs = {}
        self.workflow_jobs = {}
        self.modules = {}

        # The steps to take when generating jobs
        self.generation_steps = [
            self._load_config,
            self._load_executables,
            self._load_workflow_jobs,
            self._load_modules,
            self._load_run_data,
            self._load_generators,
            # All required information is loaded, start queuing jobs
            self._create_setup,
            self._generate_jobs,
            # Create the files pegasus needs to plan & submit
            self._add_notify,
            self._create_pegasus_files
        ]
        return

    def info(self):
        """
        A helper function that prints out all currently loaded run information,
        all loaded workflow_jobs, all loaded workflow_modules, and all loaded
        executables.
        """
        pprint(self.runs)
        pprint(self.workflow_jobs)
        pprint(self.modules)
        pprint(self.executables)
        return

    def generate(self):
        """
        The function that actually generates the workflow!  Each generation
        step runs in order, stopping at the first step that produces errors.
        """
        for step in self.generation_steps:
            if self.is_valid():
                step()
            else:
                print self.get_error_string()
                break
        return

    def is_valid(self):
        """
        :returns: True if the workflow is valid else False.

        Whether or not the workflow is valid so far!
        """
        return len(self.errors) == 0

    def get_error_string(self):
        """
        :returns: A newline separated string of all the errors.
        """
        return "\n".join(self.errors)

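    # Creates the Pegasus File entry for a raw input file and registers it
    # with both the internal file dict and the dax.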
    def _add_file(self, name, path, site):
        f = File(name)
        f.addPFN(PFN(path, site))
        self.files[name] = f
        self.dax.addFile(f)
        return

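    # Genome file objects already carry a Pegasus File (built by the run
    # parser), so it only needs to be tracked and added to the dax.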
    def _add_genome_file(self, gen_file_obj):
        self.files[gen_file_obj["name"]] = gen_file_obj["file"]
        self.dax.addFile(gen_file_obj["file"])
        return

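    # A config.yaml consumed by _load_config might look like the following
    # (a sketch; the authoritative key lists live in chipathlon/conf.py):
    #
    #   chipathlon_bin: /path/to/chipathlon/bin
    #   idr_bin: /path/to/idr/bin
    #   pegasus_home: /usr/share/pegasus
    #   email: user@example.com
    #   arch: x86_64
    #   os: linux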
    def _load_config(self):
        # Validate the input config.yaml file.
        # Required & optional keys can be found in conf.py
        try:
            with open(self.config_file, "r") as rh:
                self.config = yaml.load(rh)
            for key in chipathlon.conf.config_file["required_keys"]:
                if key not in self.config:
                    self.errors.append("Config file '%s' does not have required key '%s'." % (self.config_file, key))
            all_keys = chipathlon.conf.config_file["optional_keys"] + chipathlon.conf.config_file["required_keys"]
            for key in self.config:
                if key not in all_keys:
                    self.errors.append("Config file '%s' has invalid key '%s' specified, should be one of: %s." % (self.config_file, key, all_keys))
        except yaml.YAMLError as ye:
            self.errors.append("Error reading config file '%s': %s" % (self.config_file, ye))
        except IOError as ioe:
            self.errors.append("Could not open config file '%s': %s" % (self.config_file, ioe))

        # Validate the existence of, but not the contents of, the
        # pegasus properties file
        if not os.path.isfile(self.properties_file):
            self.errors.append("Provided pegasus properties file '%s' does not exist." % (self.properties_file,))
        return

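    # Registers a single executable with the dax.  Pegasus identifies each
    # executable by a PFN (physical file name) url at a given site.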
    def _add_executable(self, name, path, os_type="linux", arch="x86_64", site="local", installed=True):
        self.executables[name] = Executable(name=name, os=os_type.lower(), arch=arch, installed=installed)
        # Use string concatenation for the PFN url: os.path.join would drop
        # the "file://" scheme when given an absolute path.
        self.executables[name].addPFN(PFN("file://" + path, site))
        self.dax.addExecutable(self.executables[name])
        if self.debug:
            print "[LOAD_EXECUTABLE] %s" % (name,)
        return

    def _load_executables(self, os_type="linux", arch="x86_64"):
        # Load actual executables -- these don't need wrappers
        for executable in chipathlon.conf.executables:
            self._add_executable(
                executable,
                os.path.join(self.config["idr_bin"] if executable == "idr" else self.config["chipathlon_bin"], executable),
                site=self.execute_site,
                arch=self.config.get("arch", "x86_64"),
                os_type=self.config.get("os", "linux")
            )
        # Load actual scripts
        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_scripts)):
            for f in files:
                if not os.path.splitext(f)[1] == '.pyc':
                    self._add_executable(
                        f,
                        os.path.join(self.config["chipathlon_bin"], f),
                        site=self.execute_site,
                        arch=self.config.get("arch", "x86_64"),
                        os_type=self.config.get("os", "linux")
                    )
            break
        # Handle necessary installed system commands
        for cmd in chipathlon.conf.system_commands:
            self._add_executable(
                cmd,
                os.path.join(chipathlon.conf.system_path, cmd),
                site=self.execute_site,
                arch=self.config.get("arch", "x86_64"),
                os_type=self.config.get("os", "linux")
            )
        return

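    # The params file maps each yaml job definition name (the file name up to
    # its first '.') to overrides for that job, e.g. a hypothetical sketch:
    #
    #   some_job_name:
    #       <job specific overrides, as understood by WorkflowJob>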
    def _load_workflow_jobs(self):
        # Load in the specified param file for use in the workflow_jobs
        if os.path.isfile(self.param_file):
            with open(self.param_file, "r") as rh:
                try:
                    self.params = yaml.load(rh)
                    if self.params is None:
                        print "[WARNING] Param file is empty.  Using defaults for all jobs."
                        self.params = {}
                except yaml.YAMLError as ye:
                    self.errors.append("Error parsing param yaml file '%s': %s." % (self.param_file, ye))
        else:
            print "[WARNING] Param file does not exist.  Using defaults for all jobs."
            self.params = {}

        # Load each yaml_job as an actual object
        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
            for f in files:
                yaml_job = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.params.get(f.split(".")[0]), self.debug)
                if yaml_job.is_valid():
                    if yaml_job.command in self.executables:
                        yaml_job.add_executable(self.executables[yaml_job.command])
                        self.workflow_jobs[yaml_job.job_name] = yaml_job
                        if self.debug:
                            print "[LOAD WORKFLOW JOB] %s" % (yaml_job.job_name,)
                        for raw_file_name in yaml_job.raw_files:
                            if raw_file_name not in self.files:
                                self._add_file(raw_file_name, yaml_job.raw_files[raw_file_name]["path"], "local")
                            # The _add_file call creates the Pegasus File object;
                            # we reference it within the yaml_job for housekeeping.
                            yaml_job.raw_files[raw_file_name]["file"] = self.files[raw_file_name]
                    else:
                        print "[WARNING] Skipping param file %s, corresponding executable %s not found." % (f, yaml_job.command)
                else:
                    for error in yaml_job.errors:
                        self.errors.append(error)
        return

    def _load_modules(self):
        # Load each yaml_module as an actual object
        for root, dirs, files in os.walk(os.path.join(os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)):
            for f in files:
                mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.workflow_jobs, self.debug)
                if not mod.err:
                    self.modules[mod.name] = mod
                    if self.debug:
                        print "[LOAD MODULE] %s" % (mod.name,)
                else:
                    self.errors += mod.err
            break
        return

    def _load_run_data(self):
        self.run_parser = RunParser(self.run_file, self.mdb)
        if self.run_parser.is_valid():
            for genome in self.run_parser.get_genomes():
                for gen_file_obj in genome.get_all_files():
                    if gen_file_obj["name"] not in self.files:
                        self._add_genome_file(gen_file_obj)
            for i, run in enumerate(self.run_parser.get_runs()):
                if self.debug:
                    print "[LOADING SAMPLES] Run #%s" % (i,)
                run.load_samples(self.mdb)
                if not run.is_valid():
                    self.errors.append("Error with Run #%s: %s" % (i, run.get_error_string()))
            self.runs = self.run_parser.get_runs()
        else:
            self.errors.append(self.run_parser.get_error_string())
        return

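    # Each generator queues the jobs for one stage of the pipeline
    # (align -> remove_duplicates -> peak_call -> idr); they all share the
    # same dax, job/file dicts, and database connection.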
    def _load_generators(self):
        self.align_gen = AlignGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["align"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        self.remove_dup_gen = RemoveDuplicatesGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["remove_duplicates"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        self.peak_call_gen = PeakCallGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["peak_call"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        self.idr_gen = IdrGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["idr"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        return

    def _generate_jobs(self):
        for run in self.runs:
            # Only runs starting from fastq files need alignment; every run
            # goes through duplicate removal and peak calling, and idr runs
            # only when requested.
            if run.file_type == "fastq":
                self.align_gen.generate(run)
            self.remove_dup_gen.generate(run)
            self.peak_call_gen.generate(run)
            if run.idr is not None:
                self.idr_gen.generate(run)
        return

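    # The resulting layout (base_path is job_home plus a timestamp):
    #
    #   <job_home>/<YYYY-MM-DD_HHMMSS>/
    #       input/          (dax, submit.sh, notify.sh, db_meta/)
    #       work/           (created later by pegasus-plan)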
    def _create_setup(self):
        """
        Creates the base structure for job submission.  Everything is contained
        within a folder based on the current timestamp.
        """
        if not os.path.exists(self.base_path):
            os.makedirs(self.base_path)
        for folder in ["input", "input/db_meta"]:
            if not os.path.exists(os.path.join(self.base_path, folder)):
                os.makedirs(os.path.join(self.base_path, folder))
        return

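    # notify.sh is attached to the dax with When.AT_END, so pegasus runs it
    # (emailing self.config["email"]) once the workflow finishes.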
    def _add_notify(self):
        """
        Add the script to email when the workflow is finished.
        """
        notify_path = os.path.join(self.base_path, "input/notify.sh")
        with open(notify_path, "w") as wh:
            notify = textwrap.dedent("""\
                    #!/bin/bash
                    %s/notification/email -t %s --report=pegasus-analyzer
            """ % (self.config["pegasus_home"], self.config["email"]))
            wh.write(notify)
            os.chmod(notify_path, 0755)
        self.dax.invoke(When.AT_END, notify_path)
        return

    def _create_pegasus_files(self):
        self._create_submit()
        self._create_dax()
        return

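    # Once generated, the workflow is submitted by running the created
    # script (a sketch):
    #
    #   cd <base_path>/input && ./submit.sh
    #
    # status.sh and remove.sh are then written to the submission directory.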
    def _create_submit(self):
        """
        Creates the pegasus submit script.  Just a wrapper to pegasus-plan
        with the correct arguments passed through.  Additionally creates
        some easy status/remove scripts for the workflow when submitted.
        """
        with open(os.path.join(self.base_path, "input/submit.sh"), "w") as wh:
            submit = textwrap.dedent("""\
                    #!/bin/bash
                    plan=`pegasus-plan \\
                    --conf %s \\
                    --sites %s \\
                    --output-site %s \\
                    --dir %s \\
                    --dax %s \\
                    --randomdir \\
                    """ % (
                        self.properties_file,
                        self.execute_site,
                        self.output_site,
                        os.path.join(self.base_path, "work"),
                        os.path.join(self.base_path, "input/chipathlon.dax")
                    ))
            submit += textwrap.dedent("""\
                    --submit`

                    status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
                    echo -e "#!/bin/bash
                    pegasus-status -l $status" > status.sh
                    chmod 744 status.sh

                    remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
                    echo -e "#!/bin/bash
                    pegasus-remove $remove" > remove.sh
                    chmod 744 remove.sh

                    echo "$plan"
                    """)
            wh.write(submit)
            os.chmod(os.path.join(self.base_path, "input/submit.sh"), 0755)
        return

    def _create_dax(self):
        with open(os.path.join(self.base_path, "input/chipathlon.dax"), "w") as wh:
            self.dax.writeXML(wh)
        return