workflow_job.py 26.3 KB
Newer Older
1
import yaml
2
import os
3
import sys
4
import re
5
from Pegasus.DAX3 import *
6
7
import chipathlon
import chipathlon.conf
8
from chipathlon.utils import is_number
9
from pprint import pprint
10

11

12
class WorkflowJob(object):
13
14
15
    """
    :param base_file: The yaml job definition file.
    :type base_file: str
16
17
    :param params: A dictionary of values for overwriting default arguments.
    :type params: dict
18
19
    :param debug: A debug flag to print out additional information.
    :type debug: bool
aknecht2's avatar
aknecht2 committed
20
21
22

    The WorkflowJob contains all information necessary to create a single job.
    Parameters and resources are loaded from a yaml file.
23
    """
24

25
26
    def __init__(self, base_file, params, debug=False):
        """
        :param base_file: The yaml job definition file.
        :type base_file: str
        :param params: A dictionary of values for overwriting default arguments.
        :type params: dict
        :param debug: A debug flag to print out additional information.
        :type debug: bool

        Load a single job definition from yaml and validate it; any
        problems found are collected in self.errors.
        """
        self.errors = []
        self.debug = debug
        # raw_files maps basename -> {"path": ...} for on-disk files that
        # must be staged into the workflow.
        self.raw_files = {}
        self.params = {} if params is None else params
        self._load_base_file(base_file)
        self.validate()
        return

aknecht2's avatar
aknecht2 committed
34
35
36
    def __repr__(self):
        return "[job_name=%s, job_command=%s]" % (self.job_name, self.command)

37
38
39
40
41
42
    def _load_base_file(self, base_file):
        """
        :param base_file: Path to the yaml job definition file.
        :type base_file: str

        Load the yaml job definition.  On success sets self.job_name,
        self.job_data, and self.command; on failure appends to self.errors.
        """
        if os.path.isfile(base_file):
            with open(base_file, "r") as rh:
                try:
                    # safe_load avoids arbitrary object construction from
                    # yaml (yaml.load without an explicit Loader is unsafe
                    # and deprecated in newer PyYAML).  Job templates are
                    # plain mappings, so safe_load is sufficient.
                    base = yaml.safe_load(rh)
                    # The file contains a single top-level key: the job name.
                    # next(iter(...)) works on both py2 and py3, unlike
                    # .keys()[0].
                    self.job_name = next(iter(base))
                    self.job_data = base[self.job_name]
                    self.command = self.job_data["command"]
                except yaml.YAMLError as ye:
                    self.errors.append("Error parsing job template yaml file '%s': %s." % (base_file, ye))
        else:
            self.errors.append("Job template file '%s' does not exist." % (base_file,))
        return

54
    def is_valid(self):
55
        """
56
        Checks if the run is valid... Pretty straightforward.
57
        """
58
        return len(self.errors) == 0
59

60
    def get_error_string(self):
61
        """
62
        Returns the errors as a newline separated string.
63
        """
64
        return "\n".join(self.errors)
65

66
    def validate(self):
        """
        Run the validation phases in order, stopping as soon as one of
        them records errors:

        1. Validate that the input yaml meets the base standard.
        2. Validate the input / output file definitions.
        3. Validate the resource specifications.
        4. Ensure all arguments are valid and reference existing inputs / outputs.
        """
        validation_steps = (
            self._validate_base_yaml,
            self._validate_files,
            self._validate_resources,
            self._validate_arguments,
        )
        for step in validation_steps:
            if not self.is_valid():
                break
            step()
        return

77
    def _check_keys(self, data, required_keys, optional_keys, error_prefix="Key Error"):
78
        """
79
80
81
82
83
84
        :param data: The dictionary to validate
        :type data: dict
        :param required_keys: A list of required keys for the dictionary.
        :type required_keys: list
        :param optional_keys: A list of optional keys that can be in the dictionary.
        :type optional_keys: list
85
86
87
88
89
90
        :param error_prefix: A prefix to add to error messages.
        :type error_prefix: str

        Checks that all required keys are specified in the dictionary.
        Checks that all keys in the dictionary are defined in either
        required or optional keys.
91
        """
92
        valid = True
93
94
95
        all_keys = required_keys + optional_keys
        for key in required_keys:
            if key not in data:
96
97
                self.errors.append("[Error parsing job %s] %s: Required key '%s' not defined." % (self, error_prefix, key))
                valid = False
98
99
        for key in data:
            if key not in all_keys:
100
101
102
                self.errors.append("[Error parsing job %s] %s: Specified key '%s' is not valid." % (self, error_prefix, key))
                valid = False
        return valid
103
104

    def _validate_base_yaml(self):
        """
        Validate the top-level structure of the job yaml against the
        required / optional keys defined in chipathlon.conf.job_keys.
        On success, caches the declared input / output param names for
        later argument-reference validation.
        """
        self._check_keys(self.job_data, chipathlon.conf.job_keys["required"], chipathlon.conf.job_keys["optional"], "Base Yaml Error")
        if self.is_valid():
            # Guard against an explicitly empty (None) inputs / outputs
            # section -- other validators already treat None as "no files".
            # Materialize lists so the two can later be concatenated
            # (py3 dict views do not support +).
            self.valid_inputs = list(self.job_data["inputs"] or {})
            self.valid_outputs = list(self.job_data["outputs"] or {})
        return

    def _validate_files(self):
        """
        Validate every file definition in the inputs / outputs sections of
        the job.  Checks that each param has the required keys, and that
        any file-typed param declares a known file_type.
        """
        for file_dict_type in ["inputs", "outputs"]:
            if self.job_data[file_dict_type] is not None:
                # .items() instead of .iteritems() for py2/py3 compatibility.
                for param_name, file_dict in self.job_data[file_dict_type].items():
                    self._check_keys(
                        file_dict,
                        chipathlon.conf.job_inout_keys["required"],
                        chipathlon.conf.job_inout_keys["optional"],
                        file_dict_type
                    )
                    # Validate file types for inputs / outputs that are files
                    if "type" in file_dict and file_dict["type"] in chipathlon.conf.argument_types["file"]:
                        if "file_type" in file_dict:
                            if not file_dict["file_type"] in chipathlon.conf.file_extensions:
                                self.errors.append("[Error parsing job %s]: Param '%s' has invalid file_type: '%s'.  \
                                    Should be one of : %s." % (self, param_name, file_dict["file_type"], chipathlon.conf.file_extensions))
                        else:
                            self.errors.append("[Error parsing job %s]: Param '%s' is a file, and must have a 'file_type' specified." % (self, param_name))
        return
138
139

    def _validate_resources(self):
        """
        Validate that each resource defined in chipathlon.conf.resources
        (memory / walltime / cores / nodes) is present in the job
        definition and numeric, and that any overrides supplied through
        params are numeric as well.
        """
        for resource_type in chipathlon.conf.resources.keys():
            # Check that default value provided exists / is valid
            if resource_type not in self.job_data:
                self.errors.append("[Error parsing job %s] Resource specification for '%s' is missing." % (self, resource_type))
            elif not is_number(self.job_data[resource_type]):
                self.errors.append("[Error parsing job %s] Resource specification for '%s' must be numeric." % (self, resource_type))
            # Check if provided param values exist / are valid
            if resource_type in self.params and not is_number(self.params[resource_type]):
                self.errors.append("[Error parsing job %s] Resource specification for '%s' passed through params must be numeric." % (self, resource_type))
        return

157
    def _validate_arguments(self):
        """
        Loop through each argument & validate it, a few things happen here:

        1. Verify the structure of the arguments.  For the full list of required
           and optional keys see chipathlon.conf.argument_keys
        2. Verify the value of the arguments.  Make sure file references are
           valid and file types exist.
        3. Verify that passed in arguments from the params file are valid.
        """
        all_args = []
        for arg_dict in self.job_data["arguments"]:
            # Each entry is a single-key dict: {arg_name: arg_info}.
            # next(iter(...)) works on both py2 and py3, unlike .keys()[0].
            arg_name = next(iter(arg_dict))
            all_args.append(arg_name)
            arg_info = arg_dict[arg_name]
            # Check that arguments have the correct definition
            if self._check_keys(arg_info, chipathlon.conf.argument_keys["required"], chipathlon.conf.argument_keys["optional"], "Argument '%s' Error" % (arg_name,)):
                valid_arg, msg = self._is_valid_arg(arg_name, arg_info)
                if valid_arg:
                    # If the argument is a rawfile, add it to the raw_file list for use in the DAX
                    # This is for files that are on disk and need to be staged
                    if arg_info["type"] == "rawfile":
                        arg_value = self._get_arg_value(arg_name, arg_info)
                        self.raw_files[os.path.basename(arg_value)] = {"path": arg_value}
                    # Sometimes we have a folder full of files we need to include,
                    # in this case, use the rawfolder argument to stage all files
                    # within the folder.
                    elif arg_info["type"] == "rawfolder":
                        arg_value = self._get_arg_value(arg_name, arg_info)
                        # Only the top level of the folder is staged; os.walk
                        # is deliberately broken out of after one iteration.
                        for root, dirs, files in os.walk(arg_value):
                            for f in files:
                                self.raw_files[f] = {"path": "%s/%s" % (arg_value, f)}
                            break
                else:
                    self.errors.append(msg)
        if self.params.get("arguments") is not None:
            for arg_name in self.params["arguments"]:
                if arg_name not in all_args:
                    self.errors.append("[Error parsing job %s] Argument '%s' specified in params does not exist." % (self, arg_name))
        return
196
197
198

    def _is_valid_arg(self, arg_name, arg_info):
        """
199
200
201
202
        :param arg_name: Name of the argument.
        :type arg_name: str
        :param arg_info: Argument descriptors.
        :type arg_info: dict
203

204
205
        Checks if an argument is valid.  Argument descriptors have the following
        implication rules:
206
207
        changeable & has_value & no default => must be in params
        not changeable & has_value => required and default
208
        not changeable & not_required => Optional argument, but not passed in from params
209
210
211
        """
        # First we check our implication rules
        if arg_info["changeable"]:
212
213
            if arg_info["required"] and "default" not in arg_info and (self.params.get("arguments") is None or arg_name not in self.params["arguments"]):
                return (False, "[Error parsing %s job file] Argument '%s' is changeable and has no default.  Argument must be supplied through params." % (self.job_name, arg_name))
214
215
        elif arg_info["required"]:
            if arg_info["has_value"] and "default" not in arg_info:
216
                return (False, "[Error parsing %s job file] Argument '%s' is unchangeable and required but has no default." % (self.job_name, arg_name))
217
218
219
220
221
        # If all our implication rules pass, we check the values
        return self._has_valid_arg_value(arg_name, arg_info)

    def _has_valid_arg_value(self, arg_name, arg_info):
        """
        :param arg_name: Name of the argument.
        :type arg_name: str
        :param arg_info: Argument descriptors.
        :type arg_info: dict
        :returns: A (valid, error_message) tuple; error_message is None on success.

        Validates the given argument value.

        If it's numeric make sure the value is appropriate.
        If it's a rawfile make sure the input path exists / validate file_type.
        If it's a file make sure it references a file list.
        If it references a file list make sure it's a valid index.  Since all
        input and outputs are in list format, make sure the index is within
        the size of the list.
        """
        arg_value = self._get_arg_value(arg_name, arg_info)
        if arg_info["type"] == "numeric" and not is_number(arg_value):
            return (False, "[Error parsing job %s]: Argument '%s' with value '%s' must be numeric." % (self, arg_name, arg_value))
        elif arg_info["type"] in chipathlon.conf.argument_types["file"]:
            # If the argument is a rawfile, validate it's extension & existance
            if arg_info["type"] == "rawfile":
                # NOTE(review): this indexes arg_info["file_type"] directly;
                # a rawfile argument without a file_type key would raise
                # KeyError here -- confirm the yaml schema guarantees it.
                if os.path.isfile(arg_value):
                    if not arg_value.endswith(tuple(chipathlon.conf.file_extensions[arg_info["file_type"]])):
                        return (False, "[Error parsing job %s] Argument '%s' with file path '%s' is not of file type '%s'. \
                                    Should match one of these extensions: %s." % (
                                        self,
                                        arg_name,
                                        arg_value,
                                        arg_info["file_type"],
                                        chipathlon.conf.file_extensions[arg_info["file_type"]]
                                    )
                                )
                else:
                    return (False, "[Error parsing job %s]: Argument '%s' is a rawfile, however the specified path '%s' does not exist. " % (self, arg_name, arg_value))
            # If the argument is a 'regular' file, we need to make sure that it
            # references one of the keys of the inputs / outputs
            elif not arg_value.startswith("$"):
                return (False, "[Error parsing job %s]: Argument '%s' has value '%s'.  File references must start with a '$'." % (self, arg_name, arg_value))

        # "$name" references must match a declared input or output param
        # (valid_inputs / valid_outputs are cached by _validate_base_yaml).
        # NOTE(review): assumes arg_value is a string here; a numeric value
        # supplied through params would break .startswith -- confirm.
        if arg_value.startswith("$"):
            if not any([arg_value[1:] == ref for ref in (self.valid_inputs + self.valid_outputs)]):
                return (False, "[Error parsing job %s]: Argument '%s' has reference '%s'.  No such input / output exists." % (self, arg_name, arg_value))
        return (True, None)

265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
    def _get_arg_value(self, arg_name, arg_info):
        """
        :param arg_name: Name of the argument.
        :type arg_name: str
        :param arg_info: Argument descriptors.
        :type arg_info: dict

        Get the value of the given argument depending on the argument
        descriptors.  Return value is based on the descriptors:
        changeable = false & has_value = false -> arg_name
        changeable = false & has_value = true -> default
        changeable = true & param = true -> param
        changeable = true & param = false -> default
        """
        if arg_info["changeable"]:
            if self.params.get("arguments") is not None and arg_name in self.params["arguments"]:
                return self.params["arguments"][arg_name]
            elif "path" in arg_info:
                return self._get_path(arg_info["path"]) + arg_info["default"]
            else:
                return arg_info["default"]
        else:
            if arg_info["has_value"]:
                return arg_info["default"]
            elif "path" in arg_info:
                return self._get_path(arg_info["path"]) + arg_name
            else:
                return arg_name
        return

    def _get_path(self, dir_name):
        """
        Helper function to get the correct path to a resource file.
        Used for default file arguments, like the config files provided
        to gem and ccat.
        """
        return os.path.join(os.path.dirname(os.path.realpath(__file__)), "jobs", dir_name) + "/"

303
304
305
306
307
308
309
310
311
312
    def add_executable(self, executable):
        """
        :param executable: Executable to use for the job.
        :type executable: Pegasus.DAX3.Executable

        Setter function for job executable.
        """
        self.executable = executable
        return

313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
    def get_db_arguments(self):
        """
        Returns a dictionary of arguments to be inserted into the database.
        The dictionary will be arg_name -> arg_value entries.
        """
        arguments = {}
        for arg_dict in self.job_data["arguments"]:
            # root key is actual argument name
            arg_name = arg_dict.keys()[0]
            arg_info = arg_dict[arg_name]
            # We only want to show changeable args to the end user
            if arg_info["changeable"] and arg_info["has_value"]:
                arguments[arg_name] = self._get_arg_value(arg_name, arg_info)
        return arguments

328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
    def _is_param_required(self, param_name):
        """
        :param param_name: The input/output key to test.
        :type param_name: str

        Check to see if a particular input/output is required.  An input/output
        is required iff it is interpolated in the arguments section by a
        non-changeable required argument.
        """
        for arg_dict in self.job_data["arguments"]:
            # root key is actual argument name
            arg_name = arg_dict.keys()[0]
            arg_info = arg_dict[arg_name]
            if arg_name == "$%s" % (param_name,) or "$%s" % (param_name,) in arg_info["default"]:
                return arg_info["required"] and not arg_info["changeable"]
        return False

    def _params_are_valid(self, inputs, outputs):
        """
        :param inputs: Input params for the job.
        :type inputs: dict
        :param outputs: Output params for the job.
        :type outputs: dict
        :returns: True when the passed params match the job definition.
        :rtype: bool

        Validate the passed arguments & files.  The keys of the inputs / outputs
        passed in should match the job definition, and files should have
        correctly matching extensions.
        """
        for param_type, passed_param_dict in zip(["inputs", "outputs"], [inputs, outputs]):
            # In theory there could be a job that doesn't need inputs...
            if self.job_data[param_type] is not None:
                # Loop through all required params to make sure they are provided
                for param_name, base_param_info in self.job_data[param_type].items():
                    if self._is_param_required(param_name):
                        if param_name in passed_param_dict:
                            if base_param_info["type"] in chipathlon.conf.argument_types["file"]:
                                # We need to check the FULL file name of the passed in file to
                                # validate the extensions correctly.
                                if not passed_param_dict[param_name]["name"].endswith(tuple(chipathlon.conf.file_extensions[base_param_info["file_type"]])):
                                    self.errors.append("Job creation error %s: File %s specified for param %s is not of \
                                        type %s.  Should match one of these extensions: %s." % (
                                            self,
                                            passed_param_dict[param_name]["name"],
                                            param_name,
                                            base_param_info["file_type"],
                                            chipathlon.conf.file_extensions[base_param_info["file_type"]]
                                        )
                                    )
                        else:
                            self.errors.append("Job creation error %s: %s is a required %s paramater, but was not passed in." \
                                % (self, param_name, param_type))
                # Now we loop through all passed in params to make sure they are valid
                for param_name, passed_param_info in passed_param_dict.items():
                    if param_name in self.job_data[param_type]:
                        base_param_info = self.job_data[param_type][param_name]
                        if self.job_data[param_type][param_name]["type"] in chipathlon.conf.argument_types["file"]:
                            # We need to check the FULL file name of the passed in file to
                            # validate the extensions correctly.
                            if not passed_param_info["name"].endswith(tuple(chipathlon.conf.file_extensions[base_param_info["file_type"]])):
                                # BUG FIX: the original referenced the
                                # undefined names 'param' and 'param_info'
                                # below, raising NameError whenever this
                                # error path was hit.
                                self.errors.append("Job creation error %s: File %s specified for param %s is not of \
                                    type %s.  Should match one of these extensions: %s." % (
                                        self,
                                        passed_param_info["name"],
                                        param_name,
                                        base_param_info["file_type"],
                                        chipathlon.conf.file_extensions[base_param_info["file_type"]]
                                    )
                                )
                    else:
                        self.errors.append("Job creation error %s: Specified param %s does not exist in job." % (self, param_name))
        return self.is_valid()
399

400
    def create_job(self, inputs, outputs):
        """
        :param inputs: Input params for the job.
        :type inputs: dict
        :param outputs: Output params for the job.
        :type outputs: dict
        :returns: The job created or None if unsuccessful

        Creates an actual Pegasus.Dax3.Job instance from the provided
        information.  Inputs & outputs should be dictionaries with keys that
        correspond to the param information in the workflow_job yaml file.

        **inputs**

        .. code-block:: python

            {
                "param-name": {
                    "name": "full-file-name",
                    "type": "file",
                    "file": pegasus-file-object
                },
                "param-name": {
                    "name": "argument-name",
                    "type": "argument",
                    "value": argument-value
                },
                "param-name": {
                    "name": "full-file-name",
                    "type": "file_list",
                    "value": [
                        ...
                    ]
                },
                ...
            }

        **outputs**

        .. code-block:: python

            {
                "param-name": {
                    "name": "full-file-name",
                    "type": "file",
                    "transfer": True/False,
                    "file": pegasus-file-object
                },
                ...
            }
        """
        if self._params_are_valid(inputs, outputs):
            job = Job(self.executable)
            for param_name, param_info in inputs.items():
                if param_info["type"] == "file_list":
                    for f in param_info["value"]:
                        job.uses(f["file"], link=Link.INPUT)
                elif param_info["type"] in chipathlon.conf.argument_types["file"]:
                    job.uses(param_info["file"], link=Link.INPUT)
            for output_name, output_info in outputs.items():
                job.uses(output_info["file"], link=Link.OUTPUT, transfer=output_info["transfer"])
                # Redirect stdout / stderr
                # BUG FIX: the original referenced the undefined name
                # 'output_file' here, raising NameError for stdout/stderr
                # outputs.
                if output_info["type"] == "stdout":
                    job.setStdout(output_info["name"])
                elif output_info["type"] == "stderr":
                    job.setStderr(output_info["name"])
            # Compute the argument list once; the original computed it twice
            # and always printed a stray debug line.
            arg_list = self._create_arg_list(inputs, outputs)
            if self.debug:
                print("Created arg_list: %s" % (arg_list,))
            job.addArguments(*arg_list)
            self._add_job_resources(job)
            return job
        else:
            print(self.get_error_string())
            return None
472

473
474
    def _add_job_resources(self, job):
        """
        :param job: The job instance to add resources to.
        :type job: Pegasus.Dax3.Job

        Add the job's resources (memory, walltime, etc.) via Pegasus
        Profiles.  A value passed through params overrides the default
        from the job definition.
        """
        for resource_type in chipathlon.conf.resources.keys():
            resource_info = chipathlon.conf.resources[resource_type]
            # Params take precedence over the yaml-defined default.
            resource_value = self.params.get(resource_type, self.job_data[resource_type])
            job.profile(resource_info["namespace"], resource_info["key"], resource_value)
        return
490

491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
    def _interpolate_value(self, inputs, outputs, arg_name, arg_info):
        """
        :param inputs: Input params for the job.
        :type inputs: dict
        :param outputs: Output params for the job.
        :type outputs: dict
        :param arg_name: The name of the arg to convert.
        :type arg_name: str
        :param arg_info: Argument info from the yaml.
        :type arg_info: dict

        This helper method converts an argument into its proper value, meaning
        it can convert a reference string like "$target_file" into an actual
        Pegasus File instance, or interpolate the proper arguments that are
        passed in.
        """
        arg_value = self._get_arg_value(arg_name, arg_info)
        add_value = arg_value
        # Values starting with "$" are references into inputs / outputs.
        if arg_value.startswith("$"):
            arg_value = arg_value[1:]
            if arg_info["type"] == "rawfile":
                # Raw files were registered in self.raw_files during argument
                # validation; swap in the staged Pegasus File object.
                add_value = self.raw_files[os.path.basename(arg_value)]["file"]
            elif arg_info["type"] == "file_list":
                # Lists can only be loaded from inputs
                # NOTE(review): this reads the "values" key, while create_job
                # iterates param_info["value"] for file_list params -- one of
                # the two is likely wrong; confirm against callers.
                add_value = []
                for file_dict in inputs[arg_value]["values"]:
                    add_value.append(file_dict["file"])
            elif arg_info["type"] in chipathlon.conf.argument_types["file"]:
                # A single file reference may name either an input or an output.
                add_value = (inputs if arg_value in inputs else outputs)[arg_value]["file"]
            else:
                # Plain (non-file) references take their value from inputs.
                add_value = inputs[arg_value]["value"]
        return add_value

524
525
526
    def _create_arg_list(self, inputs, outputs):
        """
        :param inputs: Input params for the job.
        :type inputs: dict
        :param outputs: Output params for the job.
        :type outputs: dict
        :returns: The argument list for the Pegasus job.
        :rtype: list

        Inputs / outputs should be dictionaries like the ones described in
        the create_job documentation.

        This function creates the necessary argument list for pegasus.
        This includes all arguments the executable needs to run,
        and the correct file objects passed in the correct order.
        To add arguments to the pegasus job use: job.addArguments(*arg_list)
        """
        arg_list = []
        # Java tools need to have -Xmx specified...
        if self.executable in chipathlon.conf.java_tools:
            arg_list.append("-Xmx%sM" % (self.params.get("memory", self.job_data["memory"])))
        # Go through each argument
        for arg_dict in self.job_data["arguments"]:
            # Single-key dict; next(iter(...)) works on both py2 and py3,
            # unlike .keys()[0].
            arg_name = next(iter(arg_dict))
            arg_info = arg_dict[arg_name]
            # Need to figure out 2 things:
            # 1. Should we add the argument name?
            # 2. What's the correct value to add?
            add_value = self._interpolate_value(inputs, outputs, arg_name, arg_info)
            # Now we actually add the argument
            if arg_info["has_value"]:
                # If it's a file we want to add the actual Pegasus.File instance
                if isinstance(add_value, File):
                    arg_list.append(arg_name)
                    arg_list.append(add_value)
                # If it's a list we want to add each of the Pegasus.File instances
                elif isinstance(add_value, list):
                    arg_list.append(arg_name)
                    for f in add_value:
                        arg_list.append(f)
                # Otherwise, add stuff as a string
                else:
                    arg_list.append("%s %s" % (arg_name, add_value))
            else:
                arg_list.append(add_value)
        return arg_list