workflow_module.py

import os
import argparse
import sys
import json
import datetime
import textwrap
import xml.dom.minidom
import yaml
import traceback
import re
import itertools
import chipathlon.conf

from pprint import pprint
from Pegasus.DAX3 import *


class WorkflowModule(object):
    """
    :param module_yaml: The yaml module definition file.
    :type module_yaml: str
    :param workflow_jobs: A dictionary mapping job name -> workflow job class.
    :type workflow_jobs: dict
    :param debug: A flag to print additional info.
    :type debug: bool

    The workflow module handles the inner workings of an individual module
    yaml file.  Provided with the correct workflow_jobs and a list of
    files, the workflow module creates all jobs in the correct order for
    the target module.
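
    A minimal usage sketch (the yaml path and the marker names shown
    here are hypothetical):

    .. code-block:: python

        # workflow_jobs maps job name -> workflow job class, built
        # elsewhere from the job yaml definitions.
        module = WorkflowModule("modules/align.yaml", workflow_jobs)
        # Jobs for one particular split of the workflow:
        job_names = module.get_job_names({"tool": "bwa", "read_end": "single"})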
    """

    def __init__(self, module_yaml, workflow_jobs, debug=False):
        self.err = ""
        self.debug = debug
        try:
            with open(module_yaml, "r") as rh:
                # safe_load avoids constructing arbitrary python objects
                # from the yaml; module templates are plain data.
                self.data = yaml.safe_load(rh)
        except Exception:
            self.err += "Error parsing module template yaml file %s.\n" % (module_yaml,)
            self.err += traceback.format_exc()
            print self.err
        try:
            self.name = self.data.keys()[0]
            self.markers = {}
            self.order = []
            self.workflow = {}
            self.workflow_jobs = workflow_jobs
            self._load_markers()
            if self.markers:
                self._add_workflow_keys()
            else:
                self.workflow = []
            # _load_jobs copies self.markers internally, so the original
            # marker dictionary is left untouched.
            self._load_jobs()
        except SystemExit:
            pass
        return

    def get_job_list(self, markers):
        """
        :param markers: The splits to take for the current module.
        :type markers: dict

        :returns: A list of all jobs that will be run for this workflow with
            the provided markers.

        This function returns the list of jobs to run for the provided
        markers.  If markers is not specified, it returns the entire
        workflow; otherwise it returns the jobs stored under the matching
        marker branch.
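
        For example, with hypothetical marker names from an align module:

        .. code-block:: python

            # Returns only the jobs for the bwa / single-end branch.
            module.get_job_list({"tool": "bwa", "read_end": "single"})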
        """
        if markers:
            job_list = self.workflow
            for marker in self.order:
                if marker in markers:
                    job_list = job_list[markers[marker]]
            return job_list
        else:
            return self.workflow

    def _add_workflow_keys(self, workflow=None, index=0):
        """
        :param workflow: The workflow to construct.
        :type workflow: dict
        :param index: The index of the marker in the order variable.
        :type index: int

        This function constructs the workflow dictionary with the
        correct markers as keys in a nested structure.  The workflow
        parameter is threaded through the recursive calls so that each
        level fills in its own sub-dictionary.  Using the align.yaml as
        an example, the following structure would be created:

        .. code-block:: python

            {
                "bwa": {
                    "single": [],
                    "paired": []
                },
                "bowtie2": {
                    "single": [],
                    "paired": []
                }
            }

        Jobs will then be added to this dictionary later for easy access
        when traversing.
        """
        workflow = self.workflow if workflow is None else workflow
        marker_key = self.order[index]
        for marker in self.markers[marker_key]:
            if index < len(self.order) - 1:
                workflow[marker] = {}
                self._add_workflow_keys(workflow[marker], index + 1)
            else:
                workflow[marker] = []
        return

    def _load_jobs(self, yaml_data=None, add_markers=None):
        """
        :param yaml_data: The workflow to load jobs into.
        :type yaml_data: list
        :param add_markers: The markers left to load.
        :type add_markers: dict

        After creating the structure of our workflow list, this function
        loads the actual jobs into their respective places.
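
        As a sketch, the parsed yaml data for a module might look like
        this (job and marker names are illustrative):

        .. code-block:: python

            # self.data[self.name] -- a list of job / marker dictionaries.
            [
                {"bwa[tool]": [
                    {"single[read_end]": [
                        {"bwa_align_single": {"inputs": None, "outputs": None}}
                    ]}
                ]}
            ]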
        """
        yaml_data = self.data[self.name] if yaml_data is None else yaml_data
        add_markers = dict(self.markers) if add_markers is None else add_markers
        # yaml_data will always be a list of dictionaries
        for item in yaml_data:
            for key in item.keys():
                # The individual key values will either be markers or jobs.
                if key in self.workflow_jobs:
                    # If the key is a job, we need to add the job for every
                    # possible marker combination at this recursion depth.
                    # Iterate values in self.order so they line up with the
                    # marker names in the zip below.
                    for marker_values in itertools.product(*[add_markers[marker] for marker in self.order]):
                        wlist = self.get_job_list(dict(zip(self.order, marker_values)))
                        wlist.append(item)
                elif "[" in key and "]" in key:
                    # Split by either [ or ].
                    val, marker, dummy = re.split(r"[\[\]]", key)
                    # Restrict this marker to the single value we are
                    # descending into before recursing.
                    new_add = dict(add_markers)
                    new_add[marker] = [val]
                    # Recurse with updated add_markers
                    self._load_jobs(item[key], new_add)
        return

    def _load_markers(self, yaml_data=None):
        """
        :param yaml_data: The yaml_data to load markers from.
        :type yaml_data: list

        Finds all possible workflow splitting points (i.e. markers),
        recording each marker's possible values and the order in which
        the markers were found.
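
        As an illustrative sketch (marker names are hypothetical), an
        align module split by tool and read_end would produce:

        .. code-block:: python

            self.markers  # {"tool": ["bwa", "bowtie2"], "read_end": ["single", "paired"]}
            self.order    # ["tool", "read_end"]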
        """
        yaml_data = self.data[self.name] if yaml_data is None else yaml_data
        for item in yaml_data:
            for key in item.keys():
                if key in self.workflow_jobs:
                    pass
                elif "[" in key and "]" in key:
                    val, marker, dummy = re.split(r"[\[\]]", key)
                    if marker not in self.markers:
                        self.markers[marker] = []
                        self.order.append(marker)
                    if val not in self.markers[marker]:
                        self.markers[marker].append(val)
                    self._load_markers(item[key])
        return

    def get_job_params(self, markers, job_name):
        """
        :param markers: Input markers.
        :type markers: dict
        :param job_name: The name of the job to get params for.
        :type job_name: str
        :returns: A dictionary containing inputs and outputs.

        Returns the inputs and outputs defined for the
        specified job & marker combination from the module.  The values
        returned are based on the names defined in the module template
        file; they are not real file names.
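
        A sketch of the returned structure (logical and param names are
        illustrative):

        .. code-block:: python

            {
                "inputs": {"ref_genome": {"param_name": "fasta"}},
                "outputs": {"align_out": {"param_name": "sam"}}
            }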
        """
        job_list = self.get_job_list(markers)
        for job_dict in job_list:
            if job_name == job_dict.keys()[0]:
                return job_dict[job_name]
        return {}

    def get_job_names(self, markers):
        """
        :param markers: The input markers.
        :type markers: dict
        :returns: The names of the jobs that will be run for this module with
            the provided markers.
        """
        job_list = self.get_job_list(markers)
        job_names = []
        for job_dict in job_list:
            job_names.append(job_dict.keys()[0])
        return job_names

    def add_jobs(self, dax, master_jobs, master_files, markers, inputs, outputs):
        """
        :param dax: The dax to add jobs to.
        :type dax: Pegasus.DAX3.ADAG
        :param master_jobs: A dictionary mapping job names -> Pegasus job objects.
        :type master_jobs: dict
        :param master_files: A dictionary mapping file names -> File objects.
        :type master_files: dict
        :param markers: The splits to take for the current module.
        :type markers: dict
        :param inputs: A dictionary mapping the logical file names defined in
            the workflow module -> the full file name.
        :type inputs: dict
        :param outputs: A dictionary mapping the logical file names defined in
            the workflow module -> the full file name.
        :returns: None

        Adds all the jobs in the correct order for the current workflow.
        The inputs and outputs must be passed in for all jobs in the
        workflow.  Consider a small example of three jobs run back to
        back, with the output of each piped into the input of the next:
        only a single input needs to be passed in, because later jobs
        consume the outputs of earlier ones, but an output still needs
        to be passed for each of the three jobs.
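
        A sketch of a call (marker names and file names are hypothetical):

        .. code-block:: python

            module.add_jobs(
                dax, master_jobs, master_files,
                {"tool": "bwa", "read_end": "single"},
                inputs={"ref_genome": "hg19.fa"},
                outputs={"align_out": "sample1_bwa_single.sam"}
            )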
        """
        valid, msg = self._check_params(master_files, markers, inputs, outputs)
        if valid:
            job_list = self.get_job_list(markers)
            for job_dict in job_list:
                job_name = job_dict.keys()[0]
                job_info = job_dict[job_name]
                # outputs is a dict!  Check the values, not the keys.
                # Output files get created before a job is added, so check
                # against master_jobs instead.
                if any(output_file not in master_jobs for output_file in outputs.values()):
                    job_inputs = self._setup_job_params(master_files, job_name, job_info, markers, "inputs", inputs, outputs)
                    job_outputs = self._setup_job_params(master_files, job_name, job_info, markers, "outputs", outputs, outputs)
                    job = self.workflow_jobs[job_name].create_job(job_inputs, job_outputs)
                    if job is not None:
                        dax.addJob(job)
                    else:
                        print "JOB ERROR for '%s'.\n" % (job_dict,)
                        raise SystemExit(1)
                    # Now add our job to the master list and check dependencies.
                    # master_jobs is shared by reference, so although this is
                    # slightly redundant, it makes dependency checking easier
                    # later and is not very costly.
                    for logical_name, param_dict in job_outputs.iteritems():
                        master_jobs[param_dict["name"]] = job
                    for logical_name, param_dict in job_inputs.iteritems():
                        if param_dict["name"] in master_jobs:
                            dep = Dependency(child=job, parent=master_jobs[param_dict["name"]])
                            # Prevent duplicate dependencies from being created
                            if not dax.hasDependency(dep):
                                dax.addDependency(dep)
        else:
            print msg
        return

    def _check_params(self, master_files, markers, inputs, outputs):
        """
        :param master_files: Master file dictionary mapping file_name -> file_object.
        :type master_files: dict
        :param markers: Input markers.
        :type markers: dict
        :param inputs: All required inputs for the given markers.
        :type inputs: dict
        :param outputs: All required outputs for the given markers.
        :type outputs: dict

        Checks all input params to make sure they are specified and that
        all the files exist in the master file dictionary.  Returns a
        tuple (valid, msg).
        """
        valid_params = True
        valid_markers, msg = self._check_input_markers(markers)
        if valid_markers:
            required_inputs, required_outputs = self._load_required_params(master_files, markers)
            valid_inputs, input_msg = self._check_required_params(master_files, required_inputs, inputs, "inputs")
            valid_outputs, outputs_msg = self._check_required_params(master_files, required_outputs, outputs, "outputs")
            if not valid_inputs or not valid_outputs:
                valid_params = False
                msg = input_msg + outputs_msg
            else:
                msg = "Params for module '%s' with markers: %s validated successfully.\n" % (self.name, markers)
        else:
            valid_params = False
        return (valid_params, msg)

    def _check_input_markers(self, markers):
        """"
        :param markers: The input markers to validate.
        :type markers: dict

        Validates input markers.
        """
        valid = True
        msg = ""
        for input_marker in markers:
            if input_marker in self.markers:
                if markers[input_marker] not in self.markers[input_marker]:
                    msg += "Marker '%s' does not have value '%s', should be one of %s.\n" % (
                        input_marker, markers[input_marker], self.markers[input_marker]
                    )
                    valid = False
            else:
                msg += "Marker '%s' does not exist.\n" % (input_marker,)
                valid = False
        return (valid, msg)

    def _check_required_params(self, master_files, required_param_list, arg_params, arg_type=None):
        """
        :param master_files: Master file dictionary mapping file_name -> file_object.
        :type master_files: dict
        :param required_param_list: A list of dictionaries containing required arguments.
        :type required_param_list: list
        :param arg_params: A dictionary mapping param_name -> value.
        :type arg_params: dict
        :param arg_type: The type of argument being parsed, only used for error messages.
        :type arg_type: str

        This function checks the required params against the passed ones.  This
        function should be called for both inputs and outputs.
        """
        valid = True
        msg = ""
        for param_dict in required_param_list:
            param_name = param_dict["name"]
            if param_name in arg_params:
                if param_dict["type"] in chipathlon.conf.argument_types["file"]:
                    if arg_params[param_name] not in master_files:
                        valid = False
                        msg += "Error loading '%s' jobs.  File with name: '%s', value: '%s' does not exist in master_files.\n" % (
                            self.name, param_name, arg_params[param_name]
                        )
            else:
                valid = False
                msg += "Error loading '%s' jobs.  Required param '%s' for argument type '%s' not defined.\n" % (self.name, param_name, arg_type)
        return (valid, msg)

    def _load_required_params(self, master_files, markers):
        """
        :param master_files: Master file dictionary mapping file_name -> file_object.
        :type master_files: dict
        :param markers: Input markers.
        :type markers: dict

        This function loads all required parameters (inputs and outputs) for
        the specified markers.  It returns a tuple corresponding to these
        required params.
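
        A sketch of the return value (names are illustrative):

        .. code-block:: python

            (
                [{"name": "ref_genome", "type": "file"}],  # required inputs
                [{"name": "align_out", "type": "file"}]    # required outputs
            )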
        """
        inputs = []
        outputs = []
        job_list = self.get_job_list(markers)
        for job_dict in job_list:
            job_name = job_dict.keys()[0]
            job_info = job_dict[job_name]
            for param_type, param_list in zip(["inputs", "outputs"], [inputs, outputs]):
                if job_info[param_type] is not None:
                    for logical_name, param_values in job_info[param_type].iteritems():
                        param_name = param_values["param_name"]
                        # Make sure param is not a duplicate of a previous value
                        # and that param has not already been added as an output.
                        if logical_name not in [param["name"] for param in param_list] and logical_name not in [param["name"] for param in outputs]:
                            param_list.append({
                                "name": logical_name,
                                "type": self.workflow_jobs[job_name].get_param_info(param_name)["type"]
                            })
        return (inputs, outputs)

    def get_all_final_results(self, markers):
        """
        :param markers: The splits to take for the current workflow.
        :type markers: dict
        :returns: A list of all output files marked with final_result: True

        Gets all the final_results for the workflow with the target markers.
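
        A sketch of one returned entry (job and file names are
        illustrative):

        .. code-block:: python

            {
                "job_names": ["bwa_align_single", "samtools_sam_to_bam"],
                "file_name": "align_out",
                "file_info": {"param_name": "bam", "final_result": True}
            }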
        """
        job_list = self.get_job_list(markers)
        final_results = []
        for i, job_dict in enumerate(job_list):
            job_name = job_dict.keys()[0]
            job_info = job_dict[job_name]
            if job_info.get("outputs"):
                for file_name, file_info in job_info["outputs"].iteritems():
                    if file_info.get("final_result"):
                        final_results.append({
                            "job_names": [job.keys()[0] for job in job_list[:i + 1]],
                            "file_info": file_info,
                            "file_name": file_name
                        })
        return final_results

    def _setup_job_params(self, master_files, job_name, job_info, markers, param_type, arg_params, outputs):
        """
        :param master_files: Master file dictionary mapping file_names -> file objects.
        :type master_files: dict
        :param job_name: The name of the job for accessing from self.workflow_jobs
        :type job_name: str
        :param job_info: The job information (inputs and outputs)
        :type job_info: dict
        :param markers: Input markers.
        :type markers: dict
        :param param_type: The param type to set up; should be "inputs" or "outputs".
        :type param_type: str
        :param arg_params: A dictionary mapping param_name -> value.
        :type arg_params: dict
        :param outputs: All the outputs for the entire workflow module.
        :type outputs: dict

        This function sets up the required params for a job to run
        successfully.  It converts all inputs from the format expected for the
        module into the format expected for the individual jobs.  The format
        expected by jobs is a dictionary with keys as the parameter names:

        .. code-block:: python

            {
                "file_param_name": {
                    "name": "FULL-FILE-NAME",
                    "file": PegasusFileObject,
                    "transfer": True/False
                },
                "arg_param_name": {
                    "name": "arg_param_name",
                    "value": ArgValue
                },
                "list_param_name": {
                    "name": "list_param_name",
                    "value": [
                        {
                            "name": "FULL-FILE-NAME"
                            "file": PegasusFileObject
                        },
                        ...
                    ]
                }
            }
        """
        param_dict = {}
        if job_info[param_type] is not None:
            for logical_name, param_info in job_info[param_type].iteritems():
                param_name = param_info["param_name"]
                wj_param = self.workflow_jobs[job_name].get_param_info(param_name)
                # param_info is the info contained in the module whereas
                # wj_param is the info contained in the workflow job yaml.
                if wj_param["type"] in chipathlon.conf.argument_types["file"]:
                    if logical_name in outputs:
                        # This particular file is an output from a previous step!
                        # Load its information accordingly.
                        param_dict[param_name] = {
                            "name": outputs[logical_name],
                            "file": master_files[outputs[logical_name]],
                            "transfer": param_info.get("transfer", False) or param_info.get("final_result", False)
                        }
                    else:
                        param_dict[param_name] = {
                            "name": arg_params[logical_name],
                            "file": master_files[arg_params[logical_name]],
                            "transfer": param_info.get("transfer", False) or param_info.get("final_result", False)
                        }
                elif wj_param["type"] in chipathlon.conf.argument_types["list"]:
                    if wj_param["type"] == "file_list":
                        param_dict[param_name] = {
                            "name": param_name,
                            "value": [{
                                "name": val,
                                "file": master_files[val]
                            } for val in arg_params[logical_name]]
                        }
                else:
                    param_dict[param_name] = {
                        "name": param_name,
                        "value": arg_params[logical_name]
                    }
        return param_dict