# yaml_job.py
import yaml
import traceback
import sys
from Pegasus.DAX3 import *
import chipathlon
import chipathlon.conf
from pprint import pprint

class YamlJob(object):
    """
    A chipathlon job built from a YAML job template plus a YAML params file.

    The template is read from ``chipathlon.conf.job_params + "/" + base_yaml
    + ".yaml"``.  The section of the params file keyed by the template's
    top-level job name holds this job's params.  Problems are accumulated as
    text in ``self.err`` rather than raised; use ``valid()`` to check for them.
    """

    def __init__(self, base_yaml, param_file):
        """
        Load the job template and its params, then validate them.

        :param base_yaml: Template name (no directory, no ``.yaml`` suffix),
            looked up under ``chipathlon.conf.job_params``.
        :param param_file: Path to a YAML file; its entry keyed by the
            template's job name becomes ``self.params``.
        """
        self.err = ""
        # Pre-set so these attributes exist even when parsing fails part way.
        self.base = None
        self.jobname = None
        self.params = None
        try:
            with open(chipathlon.conf.job_params + "/" + base_yaml + ".yaml", "r") as rh:
                # safe_load: bare yaml.load() requires an explicit Loader on
                # PyYAML >= 6, and can build arbitrary Python objects before that.
                self.base = yaml.safe_load(rh)
            # The job name is the top-level key of the template (presumably the
            # template defines exactly one job -- only the first key is used).
            self.jobname = list(self.base.keys())[0]
        except Exception:
            self.err += "Error parsing job template yaml file.\n"
            self.err += traceback.format_exc()
        # Without a job name there is no params section to look up; trying
        # anyway would only append a misleading second error.
        if self.jobname is not None:
            try:
                with open(param_file, "r") as rh:
                    params = yaml.safe_load(rh)
                self.params = params[self.jobname]
            except Exception:
                self.err += "Error parsing params yaml file.\n"
                self.err += traceback.format_exc()
        self.validate()

    def valid(self):
        """Return True when no errors have been recorded in ``self.err``."""
        return not self.err

    def validate(self):
        """
        Run the params checks, stopping at the first stage that reports errors.

        Nothing is checked if loading already failed.  Argument checks only run
        once the params keys themselves are valid.
        """
        if self.valid():
            self._validate_input()
            if self.valid():
                self._validate_arguments()

    @staticmethod
    def _is_placeholder(token):
        """Return True when *token* is a string starting with '$' (an input-file slot)."""
        return hasattr(token, "startswith") and token.startswith("$")

    def _supplied_arguments(self):
        """Return the params-file ``arguments`` mapping, or {} when it is absent or null."""
        return (self.params or {}).get("arguments") or {}

    def _validate_input(self):
        """Check the job's params keys against ``chipathlon.conf.param_keys``."""
        params = self.params or {}
        required = chipathlon.conf.param_keys["required"]
        # Built once instead of concatenating the two lists for every key.
        allowed = set(required) | set(chipathlon.conf.param_keys["optional"])
        for key in required:
            if key not in params:
                self.err += "Required key '%s' not defined for job '%s'.\n" % (key, self.jobname)
        for key in params:
            if key not in allowed:
                self.err += "Key '%s' does not exist for job '%s'.\n" % (key, self.jobname)

    def _validate_arguments(self):
        """
        Cross-check the template's arguments against the params file.

        Records an error when:
          * an unchangeable argument is set in the params file,
          * a required, changeable argument has no default and is not set
            in the params file,
          * the params file sets an argument the template does not define
            (template arguments starting with '$' are input-file slots and
            can never be set from params).
        """
        supplied = self._supplied_arguments()
        valid_args = set()
        for argd in self.base[self.jobname]["arguments"]:
            # Each template entry is a single-key mapping: {arg_name: arg_info}.
            arg = list(argd.keys())[0]
            arg_info = argd[arg]
            if not self._is_placeholder(arg):
                valid_args.add(arg)
            if not arg_info["changeable"] and arg in supplied:
                self.err += "Unchangeable argument '%s' specified in params file.\n" % (arg,)
            if arg_info["required"] and arg_info["changeable"] and "default" not in arg_info:
                if arg not in supplied:
                    self.err += "Required argument '%s' does not have a default, and is not defined in params file.\n" % (arg,)
        for arg in supplied:
            if arg not in valid_args:
                self.err += "Argument '%s' specified in params file does not exist.\n" % (arg,)

    def check_file_names(self, file_names):
        """
        Check that *file_names* line up with the template's ``inputs`` types.

        Each name's extension (the text between its first and second '.') must
        be listed in ``chipathlon.conf.file_extensions`` for the input type at
        the same position.  Errors are appended to ``self.err``.

        :returns: True if the count and every extension match, else False.
        """
        inputs = self.base[self.jobname]["inputs"]
        if len(file_names) != len(inputs):
            self.err += "Number of file_names '%s' must match number of expected input files.\n" % (file_names,)
            return False
        valid_files = True
        for f, input_type in zip(file_names, inputs):
            allowed = chipathlon.conf.file_extensions[input_type]
            # A name with no '.' has no extension; report it instead of crashing.
            ext = f.split(".")[1] if "." in f else None
            if ext is None or ext not in allowed:
                self.err += "File '%s' is not of type '%s'.  Should match one of '%s' extensions.\n" % (f, input_type, allowed)
                valid_files = False
        return valid_files

    def create_arg_list(self, file_names, lfns):
        """
        Create the necessary argument list for pegasus.

        To add the arguments to a pegasus job use::

            job.addArguments(*arg_list)

        :param file_names: Input file names, validated by ``check_file_names``.
        :param lfns: Values substituted, in order, for each template argument
            (or argument value) starting with '$'.  Presumably pegasus input
            files -- TODO confirm; an IndexError is raised if there are fewer
            entries than placeholders.
        :returns: The argument list, or None (after printing ``self.err``)
            when the file names do not validate.
        """
        if not self.check_file_names(file_names):
            print(self.err)
            return None
        supplied = self._supplied_arguments()
        arg_list = []
        curr_input = 0
        for argd in self.base[self.jobname]["arguments"]:
            arg = list(argd.keys())[0]
            arg_info = argd[arg]
            value = arg_info["value"]
            if arg_info["changeable"] and value:
                # A value from the params file overrides the template's value.
                if arg in supplied:
                    arg_list.append("%s %s" % (arg, supplied[arg]))
                else:
                    arg_list.append("%s %s" % (arg, value))
            elif value:
                if self._is_placeholder(value):
                    # Flag whose value is an input file: emit flag, then the file.
                    arg_list.append(arg)
                    arg_list.append(lfns[curr_input])
                    curr_input += 1
                else:
                    arg_list.append("%s %s" % (arg, value))
            elif self._is_placeholder(arg):
                # Bare positional input file.
                arg_list.append(lfns[curr_input])
                curr_input += 1
            else:
                # Bare flag with no value.
                arg_list.append(arg)
        return arg_list