workflow_job.py 29 KB
Newer Older
1
import yaml
2
import os
3
import sys
4
import re
5
from Pegasus.DAX3 import *
6
7
import chipathlon
import chipathlon.conf
8
from chipathlon.utils import is_number
9
from pprint import pprint
10

11

12
class WorkflowJob(object):
    """
    :param base_file: The yaml job definition file.
    :type base_file: str
    :param params: A dictionary of values for overwriting default arguments.
    :type params: dict
    :param debug: A debug flag to print out additional information.
    :type debug: bool

    The WorkflowJob contains all information necessary to create a single job.
    Parameters and resources are loaded from a yaml file.
    """

    def __init__(self, base_file, params, debug=False):
        # Accumulates human-readable validation messages; the job is usable
        # only while this stays empty (see is_valid()).
        self.errors = []
        self.debug = debug
        # Maps staged file name -> {"path": ...} for rawfile / rawfolder
        # arguments, filled in during _validate_arguments.
        self.raw_files = {}
        # Callers may pass None to mean "no overrides".
        self.params = params if params is not None else {}
        self._load_base_file(base_file)
        self.validate()
        return

aknecht2's avatar
aknecht2 committed
34
35
36
    def __repr__(self):
        return "[job_name=%s, job_command=%s]" % (self.job_name, self.command)

37
38
39
40
41
42
    def _load_base_file(self, base_file):
        """
        Doesn't do much besides load in the yaml for the job definition.
        """
        if os.path.isfile(base_file):
            with open(base_file, "r") as rh:
43
                try:
44
45
46
47
                    base = yaml.load(rh)
                    self.job_name = base.keys()[0]
                    self.job_data = base[self.job_name]
                    self.command = self.job_data["command"]
48
                except yaml.YAMLError as ye:
49
                    self.errors.append("Error parsing job template yaml file '%s': %s." % (base_file, ye))
50
        else:
51
            self.errors.append("Job template file '%s' does not exist." % (base_file,))
52
53
        return

54
    def is_valid(self):
55
        """
56
        Checks if the run is valid... Pretty straightforward.
57
        """
58
        return len(self.errors) == 0
59

60
    def get_error_string(self):
61
        """
62
        Returns the errors as a newline separated string.
63
        """
64
        return "\n".join(self.errors)
65

66
    def validate(self):
67
        """
68
69
70
        1. Validate that the input yaml meets the base standard.
        2. Validate the resources meet the base standard.
        3. Ensure all arguments are valid and reference existing inputs / outputs.
71
        """
72
73
74
        for validation_step in [self._validate_base_yaml, self._validate_files, self._validate_resources, self._validate_arguments]:
            if self.is_valid():
                validation_step()
75
76
        return

77
    def _check_keys(self, data, required_keys, optional_keys, error_prefix="Key Error"):
78
        """
79
80
81
82
83
84
        :param data: The dictionary to validate
        :type data: dict
        :param required_keys: A list of required keys for the dictionary.
        :type required_keys: list
        :param optional_keys: A list of optional keys that can be in the dictionary.
        :type optional_keys: list
85
86
87
88
89
90
        :param error_prefix: A prefix to add to error messages.
        :type error_prefix: str

        Checks that all required keys are specified in the dictionary.
        Checks that all keys in the dictionary are defined in either
        required or optional keys.
91
        """
92
        valid = True
93
94
95
        all_keys = required_keys + optional_keys
        for key in required_keys:
            if key not in data:
96
97
                self.errors.append("[Error parsing job %s] %s: Required key '%s' not defined." % (self, error_prefix, key))
                valid = False
98
99
        for key in data:
            if key not in all_keys:
100
101
102
                self.errors.append("[Error parsing job %s] %s: Specified key '%s' is not valid." % (self, error_prefix, key))
                valid = False
        return valid
103
104

    def _validate_base_yaml(self):
        """
        Validates the top-level structure of the job yaml against the
        required / optional keys from chipathlon.conf.job_keys, then caches
        the valid input / output param names for later reference checks.
        """
        self._check_keys(self.job_data, chipathlon.conf.job_keys["required"], chipathlon.conf.job_keys["optional"], "Base Yaml Error")
        if self.is_valid():
            # Wrap keys() in list(): on python3 dict.keys() is a view, and
            # valid_inputs / valid_outputs are later concatenated with '+'
            # (see _has_valid_arg_value), which views do not support.
            self.valid_inputs = list(self.job_data["inputs"].keys()) if self.job_data["inputs"] is not None else []
            self.valid_outputs = list(self.job_data["outputs"].keys()) if self.job_data["outputs"] is not None else []
        return

    def _validate_files(self):
        """
        Validates every param in the inputs / outputs sections of the job
        definition.  Each must match the required / optional keys from
        chipathlon.conf.job_inout_keys, and any file-typed param must declare
        a file_type known to chipathlon.conf.file_extensions.
        """
        for file_dict_type in ["inputs", "outputs"]:
            if self.job_data[file_dict_type] is not None:
                # items() instead of iteritems(): identical here on python2,
                # and keeps the code working on python3.
                for param_name, file_dict in self.job_data[file_dict_type].items():
                    self._check_keys(
                        file_dict,
                        chipathlon.conf.job_inout_keys["required"],
                        chipathlon.conf.job_inout_keys["optional"],
                        file_dict_type
                    )
                    # Validate file types for inputs / outputs that are files
                    if "type" in file_dict and file_dict["type"] in chipathlon.conf.argument_types["file"]:
                        if "file_type" in file_dict:
                            if not file_dict["file_type"] in chipathlon.conf.file_extensions:
                                self.errors.append("[Error parsing job %s]: Param '%s' has invalid file_type: '%s'.  \
                                    Should be one of : %s." % (self, param_name, file_dict["file_type"], chipathlon.conf.file_extensions))
                        else:
                            self.errors.append("[Error parsing job %s]: Param '%s' is a file, and must have a 'file_type' specified." % (self, param_name))
        return
138
139

    def _validate_resources(self):
        """
        Validate memory/walltime/cores/nodes values supplied in the params file
        are present and numeric.
        """
        for resource_type in chipathlon.conf.resources.keys():
            # A default must exist in the job definition and be numeric.
            if resource_type not in self.job_data:
                self.errors.append("[Error parsing job %s] Resource specification for '%s' is missing." % (self, resource_type))
            elif not is_number(self.job_data[resource_type]):
                self.errors.append("[Error parsing job %s] Resource specification for '%s' must be numeric." % (self, resource_type))
            # A params override is optional, but must be numeric when given.
            if resource_type in self.params and not is_number(self.params[resource_type]):
                self.errors.append("[Error parsing job %s] Resource specification for '%s' passed through params must be numeric." % (self, resource_type))
        return

157
    def _validate_arguments(self):
158
        """
159
160
161
162
163
164
        Loop through each argument & validate it, a few things happen here:
        1. Verify the structure of the arguments.  For the full list of required
            and optional keys see chipathlon.conf.argument_keys
        2. Verify the value of the arguments.  Make sure file references are
            valid and file types exist.
        3. Verify that passed in arguments from the params file are valid.
165
        """
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
        all_args = []
        for arg_dict in self.job_data["arguments"]:
            arg_name = arg_dict.keys()[0]
            all_args.append(arg_name)
            arg_info = arg_dict[arg_name]
            # Check that arguments have the correct definition
            if self._check_keys(arg_info, chipathlon.conf.argument_keys["required"], chipathlon.conf.argument_keys["optional"], "Argument '%s' Error" % (arg_name,)):
                valid_arg, msg = self._is_valid_arg(arg_name, arg_info)
                if valid_arg:
                    # If the argument is a rawfile, add it to the raw_file list for use in the DAX
                    # This is for files that are on disk and need to be staged
                    if arg_info["type"] == "rawfile":
                        arg_value = self._get_arg_value(arg_name, arg_info)
                        self.raw_files[os.path.basename(arg_value)] = {"path": arg_value}
                    # Sometimes we have a folder full of files we need to include,
                    # in this case, use the rawfolder argument to stage all files
                    # within the folder.
                    elif arg_info["type"] == "rawfolder":
                        arg_value = self._get_arg_value(arg_name, arg_info)
185
                        dir_name = os.path.basename(os.path.dirname(arg_value + "/"))
186
187
                        for root, dirs, files in os.walk(arg_value):
                            for f in files:
188
                                self.raw_files[dir_name + "/" + f] = {"path": "%s/%s" % (arg_value, f)}
189
190
191
192
193
194
195
                            break
                else:
                    self.errors.append(msg)
        if self.params.get("arguments") is not None:
            for arg_name in self.params["arguments"]:
                if arg_name not in all_args:
                    self.errors.append("[Error parsing job %s] Argument '%s' specified in params does not exist." % (self, arg_name))
196
        return
197
198
199

    def _is_valid_arg(self, arg_name, arg_info):
        """
200
201
202
203
        :param arg_name: Name of the argument.
        :type arg_name: str
        :param arg_info: Argument descriptors.
        :type arg_info: dict
204

205
206
        Checks if an argument is valid.  Argument descriptors have the following
        implication rules:
207
208
        changeable & has_value & no default => must be in params
        not changeable & has_value => required and default
209
        not changeable & not_required => Optional argument, but not passed in from params
210
211
212
        """
        # First we check our implication rules
        if arg_info["changeable"]:
213
214
            if arg_info["required"] and "default" not in arg_info and (self.params.get("arguments") is None or arg_name not in self.params["arguments"]):
                return (False, "[Error parsing %s job file] Argument '%s' is changeable and has no default.  Argument must be supplied through params." % (self.job_name, arg_name))
215
216
        elif arg_info["required"]:
            if arg_info["has_value"] and "default" not in arg_info:
217
                return (False, "[Error parsing %s job file] Argument '%s' is unchangeable and required but has no default." % (self.job_name, arg_name))
218
219
220
221
222
        # If all our implication rules pass, we check the values
        return self._has_valid_arg_value(arg_name, arg_info)

    def _has_valid_arg_value(self, arg_name, arg_info):
        """
        :param arg_name: Name of the argument.
        :type arg_name: str
        :param arg_info: Argument descriptors.
        :type arg_info: dict

        Validates the given argument value.
        If it's numeric make sure the value is appropriate.
        If it's a rawfile make sure the input path exists / validate file_type.
        If it's a file make sure it references a file list.
        If it references a file list make sure it's a valid index.  Since all
        input and outputs are in list format, make sure the index is within
        the size of the list.

        Returns a (valid, message) tuple; message is None when valid.
        """
        arg_value = self._get_arg_value(arg_name, arg_info)
        # A None value means the argument is optional and was not supplied;
        # nothing to validate in that case.
        if arg_value is not None:
            if arg_info["type"] == "numeric":
                # We need this as two separate if statements otherwise we try
                # to interpolate numeric arguments
                if not is_number(arg_value):
                    return (False, "[Error parsing job %s]: Argument '%s' with value '%s' must be numeric." % (self, arg_name, arg_value))
            elif arg_info["type"] in chipathlon.conf.argument_types["file"]:
                # If the argument is a rawfile, validate its extension & existence
                if arg_info["type"] == "rawfile":
                    if os.path.isfile(arg_value):
                        if not arg_value.endswith(tuple(chipathlon.conf.file_extensions[arg_info["file_type"]])):
                            return (False, "[Error parsing job %s] Argument '%s' with file path '%s' is not of file type '%s'. \
                                        Should match one of these extensions: %s." % (
                                            self,
                                            arg_name,
                                            arg_value,
                                            arg_info["file_type"],
                                            chipathlon.conf.file_extensions[arg_info["file_type"]]
                                        )
                                    )
                    else:
                        return (False, "[Error parsing job %s]: Argument '%s' is a rawfile, however the specified path '%s' does not exist. " % (self, arg_name, arg_value))
                # If the argument is a 'regular' file, we need to make sure that it
                # references one of the keys of the inputs / outputs
                elif not arg_value.startswith("$"):
                    return (False, "[Error parsing job %s]: Argument '%s' has value '%s'.  File references must start with a '$'." % (self, arg_name, arg_value))
            elif isinstance(arg_value, str) and arg_value.startswith("$"):
                # "$name" references must name a declared input or output
                # (valid_inputs / valid_outputs are cached by _validate_base_yaml).
                if not any([str(arg_value)[1:] == ref for ref in (self.valid_inputs + self.valid_outputs)]):
                    return (False, "[Error parsing job %s]: Argument '%s' has reference '%s'.  No such input / output exists." % (self, arg_name, arg_value))
        return (True, None)

269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
    def _get_arg_value(self, arg_name, arg_info):
        """
        :param arg_name: Name of the argument.
        :type arg_name: str
        :param arg_info: Argument descriptors.
        :type arg_info: dict

        Get the value of the given argument depending on the argument
        descriptors.  Return value is based on the descriptors:
        changeable = false & has_value = false -> arg_name
        changeable = false & has_value = true -> default
        changeable = true & param = true -> param
        changeable = true & param = false -> default
        """
        if arg_info["changeable"]:
            if self.params.get("arguments") is not None and arg_name in self.params["arguments"]:
                return self.params["arguments"][arg_name]
            elif "path" in arg_info:
                return self._get_path(arg_info["path"]) + arg_info["default"]
            else:
289
                return arg_info.get("default")
290
291
        else:
            if arg_info["has_value"]:
292
                return arg_info.get("default")
293
294
295
296
297
298
299
300
301
302
303
304
305
306
            elif "path" in arg_info:
                return self._get_path(arg_info["path"]) + arg_name
            else:
                return arg_name
        return

    def _get_path(self, dir_name):
        """
        Helper function to get the correct path to a resource file.
        Used for default file arguments, like the config files provided
        to gem and ccat.
        """
        return os.path.join(os.path.dirname(os.path.realpath(__file__)), "jobs", dir_name) + "/"

307
308
309
310
311
312
313
314
315
316
    def add_executable(self, executable):
        """
        :param executable: Executable to use for the job.
        :type executable: Pegasus.DAX3.Executable

        Setter function for job executable.
        """
        self.executable = executable
        return

317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
    def get_db_arguments(self):
        """
        Returns a dictionary of arguments to be inserted into the database.
        The dictionary will be arg_name -> arg_value entries.
        """
        arguments = {}
        for arg_dict in self.job_data["arguments"]:
            # root key is actual argument name
            arg_name = arg_dict.keys()[0]
            arg_info = arg_dict[arg_name]
            # We only want to show changeable args to the end user
            if arg_info["changeable"] and arg_info["has_value"]:
                arguments[arg_name] = self._get_arg_value(arg_name, arg_info)
        return arguments

332
333
334
335
336
337
338
339
340
341
342
343
344
    def _is_param_required(self, param_name):
        """
        :param param_name: The input/output key to test.
        :type param_name: str

        Check to see if a particular input/output is required.  An input/output
        is required iff it is interpolated in the arguments section by a
        non-changeable required argument.
        """
        for arg_dict in self.job_data["arguments"]:
            # root key is actual argument name
            arg_name = arg_dict.keys()[0]
            arg_info = arg_dict[arg_name]
345
346
            check_val = "$%s" % (param_name,)
            if arg_info["type"] in chipathlon.conf.argument_types["file"] and (arg_name == check_val or ("default" in arg_info and check_val in arg_info["default"])):
347
348
349
350
                return arg_info["required"] and not arg_info["changeable"]
        return False

    def _params_are_valid(self, inputs, outputs):
351
        """
352
        :param inputs: Input params for the job.
353
        :type inputs: dcit
354
        :param outputs: Output params for the job.
355
        :type outputs: dcit
356

357
358
359
        Validate the passed arguments & files.  The keys of the inputs / outputs
        passed in should match the job definition, and files should have
        correctly matching extension.
360
        """
361
        for param_type, passed_param_dict in zip(["inputs", "outputs"], [inputs, outputs]):
362
            # In theory there could be a job that doesn't need inputs...
363
            if self.job_data[param_type] is not None:
364
                # Loop through all required params to make sure they are provided
365
                for param_name, base_param_info in self.job_data[param_type].iteritems():
366
                    if self._is_param_required(param_name):
367
368
                        if param_name in passed_param_dict:
                            if base_param_info["type"] in chipathlon.conf.argument_types["file"]:
369
370
                                # We need to check the FULL file name of the passed in file to
                                # validate the extensions correctly.
371
                                if not passed_param_dict[param_name]["name"].endswith(tuple(chipathlon.conf.file_extensions[base_param_info["file_type"]])):
372
373
374
                                    self.errors.append("Job creation error %s: File %s specified for param %s is not of \
                                        type %s.  Should match one of these extensions: %s." % (
                                            self,
375
                                            passed_param_dict[param_name]["name"],
376
                                            param_name,
377
378
                                            base_param_info["file_type"],
                                            chipathlon.conf.file_extensions[base_param_info["file_type"]]
379
380
381
382
383
384
                                        )
                                    )
                        else:
                            self.errors.append("Job creation error %s: %s is a required %s paramater, but was not passed in." \
                                % (self, param_name, param_type))
                # Now we loop through all passed in params to make sure they are valid
385
                for param_name, passed_param_info in passed_param_dict.iteritems():
386
                    if param_name in self.job_data[param_type]:
387
                        base_param_info = self.job_data[param_type][param_name]
388
389
390
                        if self.job_data[param_type][param_name]["type"] in chipathlon.conf.argument_types["file"]:
                            # We need to check the FULL file name of the passed in file to
                            # validate the extensions correctly.
391
                            if not passed_param_info["name"].endswith(tuple(chipathlon.conf.file_extensions[base_param_info["file_type"]])):
392
393
394
                                self.errors.append("Job creation error %s: File %s specified for param %s is not of \
                                    type %s.  Should match one of these extensions: %s." % (
                                        self,
395
                                        passed_param_info["name"],
396
                                        param_name,
397
                                        base_param_info["file_type"],
398
                                        chipathlon.conf.file_extensions[base_param_info["file_type"]]
399
400
401
402
                                    )
                                )
                    else:
                        self.errors.append("Job creation error %s: Specified param %s does not exist in job." % (self, param_name))
403
        return self.is_valid()
404

405
    def create_job(self, inputs, outputs):
        """
        :param inputs: Input params for the job.
        :type inputs: dict
        :param outputs: Output params for the job.
        :type outputs: dict
        :returns: The job created or None if unsuccessful

        Creates an actual Pegasus.Dax3.Job instance from the provided
        information.  Inputs & outputs should be dictionaries with keys that
        correspond to the param information in the workflow_job yaml file.

        **inputs**

        .. code-block:: python

            {
                "param-name": {
                    "name": "full-file-name",
                    "type": "file",
                    "file": pegasus-file-object
                },
                "param-name": {
                    "name": "argument-name",
                    "type": "argument",
                    "value": argument-value
                },
                "param-name": {
                    "name": "full-file-name",
                    "type": "file_list",
                    "value": [...]
                },
                ...
            }

        **outputs**

        .. code-block:: python

            {
                "param-name": {
                    "name": "full-file-name",
                    "type": "file",
                    "transfer": True/False,
                    "file": pegasus-file-object
                },
                ...
            }
        """
        if self._params_are_valid(inputs, outputs):
            job = Job(self.executable)
            # Stage raw files (configs etc.) collected during validation.
            for file_name, file_info in self.raw_files.items():
                job.uses(file_info["file"], link=Link.INPUT)
            for param_name, param_info in inputs.items():
                wj_param = self.get_param_info(param_name)
                if wj_param["type"] == "file_list":
                    for f in param_info["value"]:
                        job.uses(f["file"], link=Link.INPUT)
                elif wj_param["type"] in chipathlon.conf.argument_types["file"]:
                    job.uses(param_info["file"], link=Link.INPUT)
            for output_name, output_info in outputs.items():
                wj_param = self.get_param_info(output_name)
                job.uses(output_info["file"], link=Link.OUTPUT, transfer=output_info["transfer"])
                # Redirect stdout / stderr
                if wj_param["type"] == "stdout":
                    job.setStdout(output_info["name"])
                elif wj_param["type"] == "stderr":
                    job.setStderr(output_info["name"])
            # Build the argument list exactly once -- the previous version
            # computed it twice and discarded the first result.
            arg_list = self._create_arg_list(inputs, outputs)
            job.addArguments(*arg_list)
            self._add_job_resources(job)
            return job
        else:
            # Single-argument print(...) behaves identically under python2.
            print(self.get_error_string())
            return None
480

481
482
    def _add_job_resources(self, job):
        """
        :param job: The job instance to add resources to.
        :type job: Pegasus.Dax3.Job

        Attach the job's resource requirements (memory, walltime, etc.) as
        Pegasus profiles.  A value supplied through params overrides the
        default from the job definition.
        """
        for resource_type in chipathlon.conf.resources.keys():
            if resource_type in self.params:
                resource_value = self.params[resource_type]
            else:
                resource_value = self.job_data[resource_type]
            resource_conf = chipathlon.conf.resources[resource_type]
            job.profile(resource_conf["namespace"], resource_conf["key"], resource_value)
        return
498

499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
    def _interpolate_value(self, inputs, outputs, arg_name, arg_info):
        """
        :param inputs: Input params for the job.
        :type inputs: dict
        :param outputs: Output params for the job.
        :type outputs: dict
        :param arg_name: The name of the arg to convert.
        :type arg_name: str
        :param arg_info: Argument info from the yaml.
        :type arg_info: dict

        This helper method converts an argument into its proper value, meaning
        it can convert a reference string like "$target_file" into an actual
        Pegasus File instance, or interpolate the proper arguments that are
        passed in.
        """
        arg_value = self._get_arg_value(arg_name, arg_info)
        if self.debug:
            print "arg_value: %s" % (arg_value,),
        add_value = arg_value
        if (isinstance(arg_value, str) and arg_value.startswith("$")) or arg_info["type"] == "rawfile":
            # NOTE(review): the leading character is stripped for rawfile
            # values too, even though they do not start with '$'; this appears
            # harmless because only os.path.basename of the result is used --
            # confirm.
            arg_value = arg_value[1:]
            if arg_info["type"] == "rawfile":
                add_value = self.raw_files[os.path.basename(arg_value)]["file"]
            else:
                # Conditionally load from inputs / outputs
                # This will only load the dict of information though, not
                # the actual value we want
                add_value = (inputs if arg_value in inputs else outputs).get(arg_value)
                # If the value is None we want to avoid adding anything
                if add_value is not None:
                    if arg_info["type"] in chipathlon.conf.argument_types["file"]:
                        # Interpolate the Pegasus File's *name* (the string
                        # form used in the argument list).
                        add_value = add_value.get("file").name
                    else:
                        # Get the actual argument value
                        add_value = add_value.get("value")
        elif arg_info["type"] == "rawfolder":
            # We want just the folder name
            add_value = os.path.basename(os.path.dirname(arg_value + "/"))
        elif arg_info["type"] == "file_list":
            # Resolve each "$name" reference in the list to the referenced
            # param's file name.
            add_value = []
            for val in arg_value:
                file_name = val[1:]
                add_value.append((inputs if file_name in inputs else outputs)[file_name]["file"].name)
        return add_value

546
547
548
    def _create_arg_list(self, inputs, outputs):
        """
        :param inputs: Input params for the job.
        :type inputs: dict
        :param outputs: Output params for the job.
        :type outputs: dict

        Inputs / outputs should be dictionaries like the ones described in
        the create_job documentation.

        This function creates the necessary argument list for pegasus.
        This includes all arguments the executable needs to run,
        and the correct file objects passed in the correct order.
        To add arguments to the pegasus job use: job.addArguments(*arg_list)
        """
        arg_list = []
        # Java tools need to have -Xmx specified...
        if self.job_data["command"] in chipathlon.conf.java_tools:
            arg_list.append("-Xmx%sM" % (self.params.get("memory", self.job_data["memory"])))
        # Go through each argument
        for arg_dict in self.job_data["arguments"]:
            # NOTE(review): keys()[0] is python2-only (py3 keys() is a view);
            # fine while the project targets python2.
            arg_name = arg_dict.keys()[0]
            arg_info = arg_dict[arg_name]
            # Need to figure out 2 things:
            # 1. Should we add the argument name?
            # 2. What's the correct value to add?
            if self.debug:
                print "\t%s: Loading argument: %s, info: %s, " % (self, arg_name, arg_info),
            add_value = self._interpolate_value(inputs, outputs, arg_name, arg_info)
            if self.debug:
                print "Final value: %s" % (add_value)
            # Only add arguments that have a value
            if add_value is not None:
                # Need to add in the arg_name and the arg_value
                if arg_info["has_value"]:
                    # If it's a file we want to add the actual Pegasus.File instance
                    if isinstance(add_value, File):
                        arg_list.append(arg_name)
                        arg_list.append(add_value)
                    # If it's a list we want to add each of the Pegasus.File instances
                    elif isinstance(add_value, list):
                        arg_list.append(arg_name)
                        # With a separator the files collapse into one token;
                        # otherwise each file becomes its own argument.
                        if "separator" in arg_info:
                            arg_list.append(arg_info["separator"].join(add_value))
                        else:
                            for f in add_value:
                                arg_list.append(f)
                    # Otherwise, add stuff as a string
                    else:
                        arg_list.append("%s %s" % (arg_name, add_value))
                else:
                    arg_list.append(add_value)
        return arg_list
599
600
601
602
603
604

    def get_param_info(self, param_name):
        """
        :param param_name: The name of the input/output to get information for.
        :type param_name: str

605
606
        Returns the information associated with a particular input/output as a
        dictionary: the type & file_type if applicable.
607
        """
608
609
610
611
612
        for param_type in ["inputs", "outputs"]:
            if self.job_data[param_type] is not None:
                if param_name in self.job_data[param_type]:
                    return self.job_data[param_type][param_name]
        return None