import os
import copy
import argparse
import sys
import json
import datetime
import textwrap
import yaml
import traceback
import chipathlon
import chipathlon.conf
import chipathlon.workflow_job
import chipathlon.db
import chipathlon.workflow_module
from chipathlon.run_parser import RunParser
from chipathlon.generators.align_generator import AlignGenerator
from chipathlon.generators.remove_duplicates_generator import RemoveDuplicatesGenerator
from chipathlon.generators.peak_call_generator import PeakCallGenerator
from chipathlon.generators.idr_generator import IdrGenerator
from pprint import pprint
from Pegasus.DAX3 import *

class Workflow(object):
    """
    :param job_home: The base directory to generate the workflow in.
    :type job_home: str
    :param run_file: The run yaml file defining controls, samples, alignment
        tool, peak calling tool, peak calling type, and which samples to
        run idr on.
    :type run_file: str
    :param param_file: The param yaml file defining per-job parameter
        overrides.
    :type param_file: str
    :param properties_file: The pegasus properties file passed through to
        pegasus-plan.
    :type properties_file: str
    :param chip_bin: The directory containing the chipathlon executables
        and job scripts.
    :type chip_bin: str
    :param idr_bin: The directory containing the idr executable.
    :type idr_bin: str
    :param host: The host address of the MongoDB database.
    :type host: str
    :param username: The username to authenticate for MongoDB access.
    :type username: str
    :param password: The password to authenticate for MongoDB access.
    :type password: str
    :param execute_site: The target site to submit jobs to.  These sites should
        be defined in the sites.xml file.
    :type execute_site: str
    :param output_site: The output site to transfer files to.  This site should
        be defined in the sites.xml file.
    :type output_site: str
    :param save_db: Whether or not we want to save results to the database.
        True by default.
    :type save_db: bool
    :param email: An email address to notify when the workflow is finished.
    :type email: str
    :param rewrite: Whether or not to rewrite existing files.  If true, it will
        ignore files in Mongo and recreate them.  If false, it will download
        files based on the latest available completed job.
    :type rewrite: bool
    :param debug: A flag for printing additional information when generating
        workflows.
    :type debug: bool

    Workflow is the highest level class.  It handles interaction between the
    other classes as well as the initial loading of yaml files and
    executables.
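
    Example usage (a minimal sketch; all paths and credentials below are
    placeholders)::

        workflow = Workflow(
            "/path/to/job/home", "run.yaml", "params.yaml",
            "pegasus.properties", "/path/to/chip/bin", "/path/to/idr/bin",
            execute_site="local", output_site="local", host="localhost",
            username="mongo_user", password="mongo_pass"
        )
        workflow.generate()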
    """

    def __init__(self, job_home, run_file, param_file, properties_file,
                chip_bin, idr_bin, execute_site="local", output_site="local",
                host="localhost", username=None, password=None, save_db=True,
                email=None, rewrite=False, debug=False):
        # debug mode, print out additional information
        self.debug = debug
        # Job site information
        self.execute_site = execute_site
        self.output_site = output_site
        self.save_db = save_db
        self.email = email

        # DB information
        self.username = username
        self.host = host
        self.password = password
        # Initialize db connection
        self.mdb = chipathlon.db.MongoDB(host, username, password)
        # Jobname info & err
        self.job_home = os.path.abspath(job_home)
        self.base_path = os.path.join(self.job_home, datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S"))
        self.errors = []
        # add new results even if they exist
        self.rewrite = rewrite
        # Input file info
        self.run_file = os.path.abspath(run_file)
        self.param_file = os.path.abspath(param_file)
        self.properties_file = os.path.abspath(properties_file)
        # Validate the existence of, but not the contents of, the
        # pegasus properties file
        if not os.path.isfile(self.properties_file):
            self.errors.append("Provided pegasus properties file '%s' does not exist." % (self.properties_file,))

        self.chip_bin = chip_bin
        self.idr_bin = idr_bin

        # Dax specific info
        self.dax = ADAG(os.path.basename(os.path.dirname(self.job_home + "/")))
        self.executables = {}
        self.files = {}
        self.jobs = {}
        self.workflow_jobs = {}
        self.modules = {}

        # The steps to take when generating jobs
        self.generation_steps = [
            self._load_executables,
            self._load_workflow_jobs,
            self._load_modules,
            self._load_run_data,
            self._load_generators,
            # All required information is loaded, start queuing jobs
            self._create_setup,
            self._generate_jobs,
            # Create pegasus important stuff
            self._add_notify,
            self._create_pegasus_files
        ]
        return

    def info(self):
        """
        A helper function that prints out all currently loaded run information,
        all loaded workflow_jobs, all loaded workflow_modules, and all loaded
        executables.
        """
        pprint(self.runs)
        pprint(self.workflow_jobs)
        pprint(self.modules)
        pprint(self.executables)
        return

    def generate(self):
        """
        The function that actually generates the workflow!
        """
        for step in self.generation_steps:
            if self.is_valid():
                step()
            else:
                print self.get_error_string()
                break
        return

    def is_valid(self):
        """
        :returns: True if the workflow is valid else False.

        Whether or not the workflow is valid so far!
        """
        return len(self.errors) == 0

    def get_error_string(self):
        """
        :returns: A newline separated string of all the errors.
        """
        return "\n".join(self.errors)

    def _add_file(self, name, path, site):
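        # Register the file with the dax and track it by name so generated
        # jobs can reference it later.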
        f = File(name)
        f.addPFN(PFN(path, site))
        self.files[name] = f
        self.dax.addFile(f)
        return

    def _add_genome_file(self, gen_file_obj):
        self.files[gen_file_obj["name"]] = gen_file_obj["file"]
        self.dax.addFile(gen_file_obj["file"])
        return

    def _add_executable(self, name, path, os_type="linux", arch="x86_64", site="local", installed=True):
        self.executables[name] = Executable(name=name, os=os_type.lower(), arch=arch, installed=installed)
        # os.path.join drops the "file://" prefix when path is absolute, so
        # build the PFN url by concatenation instead.
        self.executables[name].addPFN(PFN("file://" + path, site))
        self.dax.addExecutable(self.executables[name])
        if self.debug:
            print "[LOAD_EXECUTABLE] %s" % (name,)
        return

    def _load_executables(self, os_type="linux", arch="x86_64"):
        # Load the actual executables; these don't need wrappers
        for executable in chipathlon.conf.executables:
            self._add_executable(
                executable,
                os.path.join(self.idr_bin if executable == "idr" else self.chip_bin, executable),
                site=self.execute_site,
                arch=arch,
                os_type=os_type
            )
        # Load actual scripts
        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_scripts)):
            for f in files:
                if not os.path.splitext(f)[1] == '.pyc':
                    self._add_executable(
                        f,
                        os.path.join(self.chip_bin, f),
                        site=self.execute_site,
                        arch=arch,
                        os_type=os_type,
                    )
            break
        # Handle necessary installed scripts
        for cmd in chipathlon.conf.system_commands:
            self._add_executable(
                cmd,
                os.path.join(chipathlon.conf.system_path, cmd),
                site=self.execute_site,
                arch=arch,
                os_type=os_type
            )
        return

    def _load_workflow_jobs(self):
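        # The param file maps job names to per-job overrides.  A sketch of
        # the expected structure (the job name and values are illustrative):
        #   bwa_align_single:
        #       walltime: 120
        #       memory: 4000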
        # Load in the specified param file for use in the workflow_jobs
        if os.path.isfile(self.param_file):
            with open(self.param_file, "r") as rh:
                try:
                    self.params = yaml.load(rh)
                    if self.params is None:
                        print "[WARNING] Param file is empty.  Using defaults for all jobs."
                        self.params = {}
                except yaml.YAMLError as ye:
                    self.errors.append("Error parsing param yaml file '%s': %s." % (self.param_file, ye))
        else:
            print "[WARNING] Param file does not exist.  Using defaults for all jobs."
            self.params = {}

        # Load each yaml_job as an actual object
        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
            for f in files:
                yaml_job = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.params.get(f.split(".")[0]), self.debug)
                if yaml_job.is_valid():
                    if yaml_job.command in self.executables:
                        yaml_job.add_executable(self.executables[yaml_job.command])
                        self.workflow_jobs[yaml_job.job_name] = yaml_job
                        if self.debug:
                            print "[LOAD WORKFLOW JOB] %s" % (yaml_job.job_name,)
                        for raw_file_name in yaml_job.raw_files:
                            if raw_file_name not in self.files:
                                self._add_file(raw_file_name, yaml_job.raw_files[raw_file_name]["path"], "local")
                            # The _add_file actually creates the Pegasus file class,
                            # we want to reference that within the yaml_job for housekeeping
                            yaml_job.raw_files[raw_file_name]["file"] = self.files[raw_file_name]
                    else:
                        print "[WARNING] Skipping param file %s, corresponding executable %s not found." % (f, yaml_job.command)
                else:
                    for error in yaml_job.errors:
                        self.errors.append(error)
        return

    def _load_modules(self):
        # Load each yaml_module as an actual object
        for root, dirs, files in os.walk(os.path.join(os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)):
            for f in files:
                mod = chipathlon.workflow_module.WorkflowModule(os.path.join(root, f), self.workflow_jobs, self.debug)
                if not mod.err:
                    self.modules[mod.name] = mod
                    if self.debug:
                        print "[LOAD MODULE] %s" % (mod.name,)
                else:
                    self.errors.append(mod.err)
            break
        return

    def _load_run_data(self):
        self.run_parser = RunParser(self.run_file, self.mdb)
        if self.run_parser.is_valid():
            for genome in self.run_parser.get_genomes():
                for gen_file_obj in genome.get_all_files():
                    if gen_file_obj["name"] not in self.files:
                        self._add_genome_file(gen_file_obj)
            for i, run in enumerate(self.run_parser.get_runs()):
                if self.debug:
                    print "[LOADING SAMPLES] Run #%s" % (i,)
                run.load_samples(self.mdb)
                if not run.is_valid():
                    self.errors.append("Error with Run #%s: %s" % (i, run.get_error_string()))
            self.runs = self.run_parser.get_runs()
        else:
            self.errors.append(self.run_parser.get_error_string())
        return

    def _load_generators(self):
        self.align_gen = AlignGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["align"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        self.remove_dup_gen = RemoveDuplicatesGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["remove_duplicates"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        self.peak_call_gen = PeakCallGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["peak_call"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        self.idr_gen = IdrGenerator(
            self.dax, self.jobs, self.files, self.mdb, self.modules["idr"],
            self.workflow_jobs, self.base_path, save_db=self.save_db,
            rewrite=self.rewrite, debug=self.debug
        )
        return

    def _generate_jobs(self):
        for run in self.runs:
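            # Only fastq input needs to be aligned; other input types skip
            # straight to duplicate removal.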
            if run.file_type == "fastq":
                self.align_gen.generate(run)
            self.remove_dup_gen.generate(run)
            self.peak_call_gen.generate(run)
            if run.idr is not None:
                self.idr_gen.generate(run)
        return

    def _create_setup(self):
        """
        Creates the base structure for job submission.  Everything is contained
        within a folder based on the current timestamp.
        """
        if not os.path.exists(self.base_path):
            os.makedirs(self.base_path)
        for folder in ["input", "input/db_meta"]:
            if not os.path.exists(os.path.join(self.base_path, folder)):
                os.makedirs(os.path.join(self.base_path, folder))
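        # Resulting layout (sketch):
        #   <job_home>/<timestamp>/     <- self.base_path
        #       input/                  <- dax, submit/notify scripts, db_meta/
        #       work/                   <- created by pegasus-plan at submit time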
        return

    def _add_notify(self):
        """
        Add the script to email when the workflow is finished.
        """
        if self.email is not None:
            notify_path = os.path.join(self.base_path, "input/notify.sh")
            with open(notify_path, "w") as wh:
                notify = textwrap.dedent("""\
                        #!/bin/bash
                        pegasus-email -t %s --report=pegasus-analyzer
                """ % (self.email,))
                wh.write(notify)
                os.chmod(notify_path, 0755)
            self.dax.invoke(When.AT_END, notify_path)
        return

    def _create_pegasus_files(self):
        self._create_submit()
        self._create_dax()
        return

    def _create_submit(self):
        """
        Creates the pegasus submit script.  Just a wrapper to pegasus-plan
        with the correct arguments passed through.  Additionally creates
        some easy status/remove scripts for the workflow when submitted.
        """
        with open(os.path.join(self.base_path, "input/submit.sh"), "w") as wh:
            submit = textwrap.dedent("""\
                    #!/bin/bash
                    plan=`pegasus-plan \\
                    --conf %s \\
                    --sites %s \\
                    --output-site %s \\
                    --dir %s \\
                    --dax %s \\
                    --randomdir \\
                    """ % (
                        self.properties_file,
                        self.execute_site,
                        self.output_site,
                        os.path.join(self.base_path, "work"),
                        os.path.join(self.base_path, "input/chipathlon.dax")
                    ))
            submit += textwrap.dedent("""\
                    --submit`

                    status=`echo "$plan" | grep pegasus-status | tr -s ' '| cut -d ' ' -f 6`
                    echo -e "#!/bin/bash
                    pegasus-status -l $status" > status.sh
                    chmod 744 status.sh

                    remove=`echo "$plan" | grep pegasus-remove | tr -s ' '| cut -d ' ' -f 5`
                    echo -e "#!/bin/bash
                    pegasus-remove $remove" > remove.sh
                    chmod 744 remove.sh

                    echo "$plan"

                    """)
            wh.write(submit)
            os.chmod(os.path.join(self.base_path, "input/submit.sh"), 0755)
        return

    def _create_dax(self):
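        # Serialize the in-memory ADAG into the dax xml file consumed by
        # the generated submit script (via pegasus-plan --dax).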
        with open(os.path.join(self.base_path, "input/chipathlon.dax"), "w") as wh:
            self.dax.writeXML(wh)
        return