workflow_module.py
import os
import argparse
import sys
import json
import datetime
import textwrap
import xml.dom.minidom
import yaml
import traceback
import re
import itertools
import chipathlon.conf
from pprint import pprint
from Pegasus.DAX3 import *


class WorkflowModule(object):
    """
    :param module_yaml: The yaml module definition file.
    :type module_yaml: str
    :param workflow_jobs: A dictionary mapping job name -> workflow job class.
    :type workflow_jobs: dict
    :param debug: A flag to print additional info.
    :type debug: bool

    The workflow module handles the inner workings of an individual module
    yaml file.  Provided with the correct workflow_jobs and a list of
    files, the workflow module creates all jobs in the correct order for
    the target module.
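
    A module definition yaml file is a nested structure whose dictionary
    keys are either job names or splits written as ``value[marker]``.  A
    minimal sketch (job and marker names here are illustrative, not the
    exact contents of align.yaml):

    .. code-block:: yaml

        align:
            - bwa[tool]:
                - single[read_end]:
                    - bwa_align_single:
                        inputs: ...
                        outputs: ...
                - paired[read_end]:
                    - bwa_align_paired:
                        inputs: ...
                        outputs: ...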
    """
    def __init__(self, module_yaml, workflow_jobs, debug=False):
        self.err = ""
        self.debug = debug
        try:
            with open(module_yaml, "r") as rh:
                # safe_load avoids constructing arbitrary python objects
                # from the yaml file.
                self.data = yaml.safe_load(rh)
        except Exception:
            self.err += "Error parsing module template yaml file %s.\n" % (module_yaml,)
            self.err += traceback.format_exc()
            print self.err
        try:
            self.name = self.data.keys()[0]
            self.markers = {}
            self.order = []
            self.workflow = {}
            self.workflow_jobs = workflow_jobs
            self._load_markers()
            if self.markers:
                self._add_workflow_keys()
            else:
                # No markers means the module is a flat list of jobs.
                self.workflow = []
            # _load_jobs copies self.markers internally before recursing.
            self._load_jobs()
        except SystemExit:
            pass
        return

    def get_job_list(self, markers):
        """
        :param markers: The splits to take for the current module.
        :type markers: dict
        :returns: A list of all jobs that will be run for this workflow with
            the provided markers.

        This function returns the list of jobs that will be run provided
        the markers.  If markers is not specified, it simply returns the
        entire workflow; otherwise, it returns the appropriate jobs.
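
        For example, using the align sketch from the class docstring
        (marker names illustrative):

        .. code-block:: python

            # Returns self.workflow["bwa"]["single"], a list of job dicts.
            module.get_job_list({"tool": "bwa", "read_end": "single"})
            # Returns the entire nested workflow structure.
            module.get_job_list({})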
        """
        if markers:
            job_list = self.workflow
            for marker in self.order:
                if marker in markers:
                    job_list = job_list[markers[marker]]
            return job_list
        else:
            return self.workflow

    def _add_workflow_keys(self, workflow=None, index=0):
        """
        :param workflow: The workflow to construct.
        :type workflow: dict
        :param index: The index of the marker in the order variable.
        :type index: int

        This function constructs the workflow dictionary with the
        correct markers as keys in a nested strucutre.  The workflow
        parameter must be passed in order for the function to run recursively
        successfully.  Using the align.yaml as an example, the following
        structure would be created:
        {
            "bwa": {
                "single": [],
                "paired": []
            },
            "bowtie2": {
                "single": [],
                "paired": []
            }
        }
        Jobs will then be added to this dictionary later for easy access when
        traversing.
        """
        workflow = self.workflow if workflow is None else workflow
        marker_key = self.order[index]
        for marker in self.markers[marker_key]:
            if index < len(self.order) - 1:
                workflow[marker] = {}
                self._add_workflow_keys(workflow[marker], index + 1)
            else:
                workflow[marker] = []
        return

    def _load_jobs(self, yaml_data=None, add_markers=None):
        """
        :param yaml_data: The workflow to load jobs into.
        :type yaml_data: list
        :param add_markers: The markers left to load.
        :type add_markers: dict

        After creating the structure of our workflow list, this function
        loads the actual jobs into their respective places.
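
        Marker keys are parsed with ``re.split``; for example (illustrative):

        .. code-block:: python

            val, marker, dummy = re.split(r"[\[\]]", "bwa[tool]")
            # val == "bwa", marker == "tool", dummy == ""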
        """
        yaml_data = self.data[self.name] if yaml_data is None else yaml_data
        add_markers = dict(self.markers) if not add_markers else add_markers
        # yaml_data will always be a list of dictionaries
        for item in yaml_data:
            for key in item.keys():
                # The individual keys will either be markers or jobs.
                if key in self.workflow_jobs:
                    # If the key is a job, we need to add the job for every
                    # possible marker combination depending on recursion depth.
                    # Iterate in self.order so each value is zipped with the
                    # marker it belongs to.
                    for marker_values in itertools.product(*[add_markers[marker] for marker in self.order]):
                        wlist = self.get_job_list(dict(zip(self.order, marker_values)))
                        wlist.append(item)
                elif "[" in key and "]" in key:
                    # Split by either [ or ]
                    val, marker, dummy = re.split(r"[\[\]]", key)
                    new_add = dict(add_markers)
                    if marker in new_add:
                        new_add[marker] = []
                    if val not in new_add[marker]:
                        new_add[marker].append(val)
                    # Recurse with the updated add_markers so only this
                    # marker value is used below this point.
                    self._load_jobs(item[key], new_add)
        return
    def _load_markers(self, yaml_data=None):
        """
        :param yaml_data: The yaml data to load markers from.
        :type yaml_data: list

        Finds all possible workflow splitting points (i.e. markers), and
        keeps track of all their values and the order they were found in.
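
        For the align example used throughout, loading would produce
        (illustrative):

        .. code-block:: python

            # self.markers == {"tool": ["bwa", "bowtie2"],
            #                  "read_end": ["single", "paired"]}
            # self.order == ["tool", "read_end"]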
        """
        yaml_data = self.data[self.name] if yaml_data is None else yaml_data
        for item in yaml_data:
            for key in item.keys():
                if key in self.workflow_jobs:
                    # Job names hold no new markers, nothing to do.
                    pass
                elif "[" in key and "]" in key:
                    val, marker, dummy = re.split(r"[\[\]]", key)
                    if marker not in self.markers:
                        self.markers[marker] = []
                        self.order.append(marker)
                    if val not in self.markers[marker]:
                        self.markers[marker].append(val)
                    self._load_markers(item[key])
        return
    def get_job_params(self, markers, job_name):
        """
        :param markers: Input markers.
        :type markers: dict
        :param job_name: The name of the job to get params for.
        :type job_name: str
        :returns: A dictionary containing inputs and outputs.

        Returns the inputs and outputs defined for the specified job & marker
        combination from the module.  The values returned are based on the
        names defined in the module template file; they are not real file
        names.
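
        An illustrative return value (logical names and params are
        hypothetical):

        .. code-block:: python

            {
                "inputs": {"download_1.fastq": {"param_name": "fastq"}},
                "outputs": {"align_1.sam": {"param_name": "sam", "final_result": True}}
            }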
        """
        job_list = self.get_job_list(markers)
        for job_dict in job_list:
            if job_name == job_dict.keys()[0]:
                return job_dict[job_name]
        return {}

    def get_job_names(self, markers):
        """
        :param markers: The input markers.
        :type markers: dict
        :returns: The names of the jobs that will be run for this module with
            the provided markers.
        """
        job_list = self.get_job_list(markers)
        return [job_dict.keys()[0] for job_dict in job_list]

    def add_jobs(self, dax, master_jobs, master_files, markers, inputs, outputs):
        """
        :param dax: The dax to add jobs to.
        :type dax: Pegasus.DAX3.ADAG
        :param master_jobs: A dictionary mapping job names -> Pegasus job objects.
        :type master_jobs: dict
        :param master_files: A dictionary mapping file names -> File objects.
        :type master_files: dict
        :param markers: The splits to take for the current module.
        :type markers: dict
        :param inputs: A dictionary mapping the logical file names defined in
            the workflow module -> the full file name.
        :type inputs: dict
        :param outputs: A dictionary mapping the logical file names defined in
            the workflow module -> the full file name.
        :type outputs: dict
        :returns: None

        Adds all the jobs in the correct order for the current workflow.
        The inputs and outputs need to be passed in for all jobs in the
        workflow.  Consider a small example of three jobs run back to back,
        with the output of each piped into the input of the next.  In that
        case we only need to pass in a single input, because the inputs of
        later jobs are the outputs of previous jobs.  We still need to pass
        in three outputs, one for each job.
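
        A sketch of a call for a single alignment step (all file names
        are illustrative):

        .. code-block:: python

            module.add_jobs(
                dax, master_jobs, master_files,
                {"tool": "bwa", "read_end": "single"},
                {"download_1.fastq": "ENCFF001AAA.fastq.gz"},
                {"align_1.sam": "run001_align_1.sam"}
            )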
        """
        valid, msg = self._check_params(master_files, markers, inputs, outputs)
        if valid:
            job_list = self.get_job_list(markers)
            for job_dict in job_list:
                job_name = job_dict.keys()[0]
                job_info = job_dict[job_name]
                # outputs is a dict!  Check the values not the keys.
                # Output files get created before a job is added, so check
                # against master_jobs instead.
                if any(output_file not in master_jobs for output_file in outputs.values()):
                    job_inputs = self._setup_job_params(master_files, job_name, job_info, markers, "inputs", inputs, outputs)
                    job_outputs = self._setup_job_params(master_files, job_name, job_info, markers, "outputs", outputs, outputs)
                    job = self.workflow_jobs[job_name].create_job(job_inputs, job_outputs)
                    if job is not None:
                        dax.addJob(job)
                    else:
                        print "JOB ERROR for '%s'.\n" % (job_dict,)
                        raise SystemExit(1)
                    # Now we add our job to the master list and check
                    # dependencies.  These dicts are passed by reference, so
                    # even though we are a little redundant here, dependency
                    # checking will be easier later and not very costly.
                    for logical_name, param_dict in job_outputs.iteritems():
                        master_jobs[param_dict["name"]] = job
                    for logical_name, param_dict in job_inputs.iteritems():
                        if param_dict["name"] in master_jobs:
                            dax.depends(child=job, parent=master_jobs[param_dict["name"]])
        else:
            print msg
        return
    def _check_params(self, master_files, markers, inputs, outputs):
        """
        :param master_files: Master file dictionary mapping file_name -> file_object.
        :type master_files: dict
        :param markers: Input markers.
        :type markers: dict
        :param inputs: All required inputs for the given markers.
        :type inputs: dict
        :param outputs: All required outputs for the given markers.
        :type outputs: dict
        :returns: A (valid, msg) tuple.

        Checks all input params to make sure they are specified and that all
        files exist in the master file dictionary.
        """
        valid_params = True
        valid_markers, msg = self._check_input_markers(markers)
        if valid_markers:
            required_inputs, required_outputs = self._load_required_params(master_files, markers)
            valid_inputs, input_msg = self._check_required_params(master_files, required_inputs, inputs, "inputs")
            valid_outputs, outputs_msg = self._check_required_params(master_files, required_outputs, outputs, "outputs")
            if not valid_inputs or not valid_outputs:
                valid_params = False
                msg = input_msg + outputs_msg
            else:
                msg = "Params for module '%s' with markers: %s validated successfully.\n" % (self.name, markers)
        else:
            valid_params = False
        return (valid_params, msg)
    def _check_input_markers(self, markers):
        """
        :param markers: The input markers to validate.
        :type markers: dict

        Validates input markers.
        """
        valid = True
        msg = ""
        for input_marker in markers:
            if input_marker in self.markers:
                if markers[input_marker] not in self.markers[input_marker]:
                    msg += "Marker '%s' does not have value '%s', should be one of %s.\n" % (
                        input_marker, markers[input_marker], self.markers[input_marker]
                    )
                    valid = False
            else:
                msg += "Marker '%s' does not exist.\n" % (input_marker,)
                valid = False
        return (valid, msg)

    def _check_required_params(self, master_files, required_param_list, arg_params, arg_type=None):
        """
        :param master_files: Master file dictionary mapping file_name -> file_object.
        :type master_files: dict
        :param required_param_list: A list of dictionaries containing required arguments.
        :type required_param_list: list
        :param arg_params: A dictionary mapping param_name -> value.
        :type arg_params: dict
        :param arg_type: The type of argument being parsed, only used for error messages.
        :type arg_type: str
        :returns: A (valid, msg) tuple.

        This function checks the required params against the passed ones.  It
        should be called for both inputs and outputs.
        """
        valid = True
        msg = ""
        for param_dict in required_param_list:
            param_name = param_dict["name"]
            if param_name in arg_params:
                if param_dict["type"] in chipathlon.conf.argument_types["file"]:
                    if arg_params[param_name] not in master_files:
                        valid = False
                        msg += "Error loading '%s' jobs.  File with name: '%s', value: '%s' does not exist in master_files.\n" % (
                            self.name, param_name, arg_params[param_name]
                        )
            else:
                valid = False
                msg += "Error loading '%s' jobs.  Required param '%s' for argument type '%s' not defined.\n" % (self.name, param_name, arg_type)
        return (valid, msg)

    def _load_required_params(self, master_files, markers):
        """
        :param master_files: Master file dictionary mapping file_name -> file_object.
        :type master_files: dict
        :param markers: Input markers.
        :type markers: dict

        This function loads all required parameters (inputs and outputs) for
        the specified markers.  It returns a tuple of (inputs, outputs).
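
        An illustrative return shape (names are hypothetical):

        .. code-block:: python

            ([{"name": "download_1.fastq", "type": "file"}],
             [{"name": "align_1.sam", "type": "file"}])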
        """
        inputs = []
        outputs = []
        job_list = self.get_job_list(markers)
        for job_dict in job_list:
            job_name = job_dict.keys()[0]
            job_info = job_dict[job_name]
            for param_type, param_list in zip(["inputs", "outputs"], [inputs, outputs]):
                if job_info[param_type] is not None:
                    for logical_name, param_values in job_info[param_type].iteritems():
                        param_name = param_values["param_name"]
                        # Make sure param is not a duplicate of a previous value
                        # and that param has not already been added as an output.
                        if (logical_name not in [param["name"] for param in param_list] and
                                logical_name not in [param["name"] for param in outputs]):
                            param_list.append({
                                "name": logical_name,
                                "type": self.workflow_jobs[job_name].get_param_info(param_name)["type"]
                            })
        return (inputs, outputs)
    def get_all_final_results(self, markers):
        """
        :param markers: The splits to take for the current workflow.
        :type markers: dict
        :returns: A list of all output files marked with final_result: True.

        Gets all the final_results for the workflow with the target markers.
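
        An illustrative entry in the returned list (names are hypothetical):

        .. code-block:: python

            {
                "job_names": ["bwa_align_single"],
                "file_name": "align_1.sam",
                "file_info": {"param_name": "sam", "final_result": True}
            }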
        """
        job_list = self.get_job_list(markers)
        final_results = []
        for i, job_dict in enumerate(job_list):
            job_name = job_dict.keys()[0]
            job_info = job_dict[job_name]
            if job_info.get("outputs"):
                for file_name, file_info in job_info["outputs"].iteritems():
                    if file_info.get("final_result"):
                        final_results.append({
                            "job_names": [job.keys()[0] for job in job_list[:i + 1]],
                            "file_info": file_info,
                            "file_name": file_name
                        })
        return final_results

    def _setup_job_params(self, master_files, job_name, job_info, markers, param_type, arg_params, outputs):
        """
        :param master_files: Master file dictionary mapping file_names -> file objects.
        :type master_files: dict
        :param job_name: The name of the job for accessing from self.workflow_jobs.
        :type job_name: str
        :param job_info: The job information (inputs and outputs).
        :type job_info: dict
        :param markers: Input markers.
        :type markers: dict
        :param param_type: The param type to set up, should be "inputs" or "outputs".
        :type param_type: str
        :param arg_params: A dictionary mapping param_name -> value.
        :type arg_params: dict
        :param outputs: All the outputs for the entire workflow module.
        :type outputs: dict

        This function sets up the required params for a job to run
        successfully.  It converts all inputs from the format expected for the
        module into the format expected for the individual jobs.  The format
        expected by jobs is a dictionary with keys as the parameter names:

        .. code-block:: python

            {
                "file_param_name": {
                    "name": "FULL-FILE-NAME",
                    "file": PegasusFileObject,
                    "transfer": True/False
                },
                "arg_param_name": {
                    "name": "arg_param_name",
                    "value": ArgValue
                },
                "list_param_name": {
                    "name": "list_param_name",
                    "value": [
                        {
                            "name": "FULL-FILE-NAME",
                            "file": PegasusFileObject
                        },
                        ...
                    ]
                }
            }
        """
        param_dict = {}
        if job_info[param_type] is not None:
            for logical_name, param_info in job_info[param_type].iteritems():
                param_name = param_info["param_name"]
                wj_param = self.workflow_jobs[job_name].get_param_info(param_name)
                # param_info is the info contained in the module whereas
                # wj_param is the info contained in the workflow job yaml.
                if wj_param["type"] in chipathlon.conf.argument_types["file"]:
                    if logical_name in outputs:
                        # This particular file is an output from a previous step!
                        # Load its information accordingly.
                        param_dict[param_name] = {
                            "name": outputs[logical_name],
                            "file": master_files[outputs[logical_name]],
                            "transfer": param_info.get("transfer", False) or param_info.get("final_result", False)
                        }
                    else:
                        param_dict[param_name] = {
                            "name": arg_params[logical_name],
                            "file": master_files[arg_params[logical_name]],
                            "transfer": param_info.get("transfer", False) or param_info.get("final_result", False)
                        }
                elif wj_param["type"] in chipathlon.conf.argument_types["list"]:
                    if wj_param["type"] == "file_list":
                        param_dict[param_name] = {
                            "name": param_name,
                            "value": [{
                                "name": val,
                                "file": master_files[val]
                            } for val in arg_params[logical_name]]
                        }
                else:
                    param_dict[param_name] = {
                        "name": param_name,
                        "value": arg_params[logical_name]
                    }
        return param_dict