# pegasus_config.py
import yaml
import os
import xml.dom.minidom

class PegasusConfig(object):
    """
    :param config_file: Path to configuration yaml file used for pegasus.
    :type config_file: str
    :param base_path: Base path of workflow generation.
    :type base_path: str

    The PegasusConfig loads all information from the config file to create
    the proper sites.xml and properties.conf pegasus needs to run.
    Additionally, the PegasusConfig class holds information about relevant
    sites.

    Problems found while loading or validating the config are collected in
    ``self.errors`` instead of being raised; check :meth:`is_valid` and
    :meth:`get_error_string` after construction.  A config file that cannot
    be opened still raises the underlying I/O error from ``open``.
    """

    def __init__(self, config_file, base_path):
        self.config_file = config_file
        self.base_path = base_path
        self.errors = []
        self.site_file = "%s/input/sites.xml" % (base_path,)
        self.properties_file = "%s/input/properties.conf" % (base_path,)
        # Always define config so __getitem__ and the create_* methods do not
        # hit an AttributeError when parsing fails below.
        self.config = {}

        try:
            with open(config_file, "r") as rh:
                # NOTE(security): this used to be yaml.load(rh) with no
                # Loader, which can construct arbitrary Python objects and is
                # rejected by recent PyYAML releases.  safe_load only accepts
                # plain YAML types; configs relying on python-specific tags
                # will now be reported as parse errors.
                loaded = yaml.safe_load(rh)
        except yaml.YAMLError as exc:
            self.errors.append("Error parsing config template file '%s': %s.\n" % (config_file, exc))
            return

        if not isinstance(loaded, dict):
            # An empty file loads as None, a bare scalar/list as str/list;
            # none of these can be validated key by key.
            self.errors.append("Error parsing config template file '%s': top level of config must be a mapping." % (config_file,))
            return

        self.config = loaded
        self.validate()

    def __getitem__(self, key):
        return self.config[key]

    def is_valid(self):
        """
        :returns: Whether or not the run is valid.
        """
        return len(self.errors) == 0

    def get_error_string(self):
        """
        :returns: The errors as a newline separated string.
        """
        return "\n".join(self.errors)

    def validate(self):
        """
        Validates the passed in configuration file.  Many values passed in
        cannot be validated before run-time however.  Be careful.

        Each failed check appends a message to ``self.errors``.
        """
        self._validate_notify()
        self._validate_sites()
        if "install_paths" not in self.config:
            self.errors.append("Error parsing config template file '%s': install_paths must be included." % (self.config_file,))
        elif not self.config["install_paths"]:
            # The previous check was `len(...) < 0`, which can never be true,
            # so an empty install_paths slipped through.
            self.errors.append("Error parsing config template file '%s': At least 1 install_path needs to be specified." % (self.config_file,))

    def _validate_sites(self):
        """
        Checks that at least one site is defined and that every site has a
        non-empty ``dirs`` section.
        """
        if "sites" not in self.config:
            self.errors.append("Error parsing config template file '%s': sites must be defined in config." % (self.config_file,))
            return
        sites = self.config["sites"]
        if not sites:
            self.errors.append("Error parsing config template file '%s': At least 1 site must be defined." % (self.config_file,))
            return
        for site in sites:
            # A site key with no body loads as None; treat it as empty.
            site_info = sites[site] or {}
            if "dirs" not in site_info:
                self.errors.append("Error parsing config template file '%s': Site '%s' does not have any dirs defined." % (self.config_file, site))
            elif not site_info["dirs"]:
                self.errors.append("Error parsing config template file '%s': Site '%s' has 0 dirs defined." % (self.config_file, site))

    def _validate_notify(self):
        """
        Checks that the ``notify`` section exists, defines ``email`` and
        defines a ``pegasus_home`` that is an existing directory.
        """
        if "notify" not in self.config:
            self.errors.append("Error parsing config template file '%s': notify must be defined at root level of config." % (self.config_file,))
            return
        # A notify key with no body loads as None; treat it as empty.
        notify = self.config["notify"] or {}
        if "pegasus_home" not in notify:
            self.errors.append("Error parsing config template file '%s': pegasus_home must be defined under 'notify'." % (self.config_file,))
        elif not os.path.isdir(notify["pegasus_home"]):
            self.errors.append("Error parsing config template file '%s': pegasus_home path '%s' does not exist." % (self.config_file, notify["pegasus_home"]))
        if "email" not in notify:
            self.errors.append("Error parsing config template file '%s': email must be defined under 'notify'." % (self.config_file,))

    def create_sites(self):
        """
        Creates the correct sites.xml file from the input configuration data.

        The catalog is assembled as a string, parsed with minidom and written
        pretty-printed (blank lines removed) to ``self.site_file``.  Config
        values are interpolated as-is, so they must already be XML-safe.
        """
        # Directory of job scripts that sits next to this module; it is
        # appended to (or supplied as) the env PATH profile of every site
        # that has a profile section.
        scripts_dir = "%s/jobs/scripts/" % (os.path.dirname(os.path.abspath(__file__)),)
        chunks = ["""<sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">\n"""]
        for site_name, site_info in self.config["sites"].items():
            # Load initial site info
            chunks.append("""<site handle="%s" arch="%s" os="%s">\n""" % (
                site_name,
                site_info.get("arch", "x86_64"),
                site_info.get("os", "LINUX")
            ))
            # Load each directory for the site
            for dir_type, dir_path in site_info["dirs"].items():
                resolved = dir_path.replace("%base_path%", self.base_path)
                chunks.append(
                    '<directory type="%s" path="%s">\n'
                    '<file-server operation="all" url="file://%s" />\n'
                    '</directory>\n' % (dir_type, resolved, resolved)
                )
            # Grids is required for the BRIDGES setup,
            # conditionally add those to the sites
            if "grids" in site_info:
                for grid in site_info["grids"]:
                    chunks.append("""<grid type="%s" contact="%s" scheduler="%s" jobtype="%s" />\n""" % (
                        grid.get("type", "batch"),
                        grid.get("contact"),
                        grid.get("scheduler", "slurm"),
                        grid.get("jobtype")
                    ))
            # Pass profile information directly through
            if "profile" in site_info:
                profile = site_info["profile"]
                for namespace in profile:
                    for key, value in profile[namespace].items():
                        if key == "PATH":
                            value = "%s:%s" % (value, scripts_dir)
                        chunks.append("""<profile namespace="%s" key="%s">%s</profile>\n""" % (
                            namespace,
                            key,
                            value
                        ))
                # If PATH is not specified, we need to add a key for it.
                # (The old condition joined the two tests with `and`, which
                # raised KeyError without an env namespace and never added
                # PATH when env existed.)
                env_profile = profile.get("env") or {}
                if "PATH" not in env_profile:
                    chunks.append("""<profile namespace="env" key="PATH">%s</profile>\n""" % (scripts_dir,))
            chunks.append("</site>\n")
        chunks.append("</sitecatalog>\n")

        pretty = xml.dom.minidom.parseString("".join(chunks)).toprettyxml()
        with open(self.site_file, "w") as wh:
            # toprettyxml emits whitespace-only lines for the whitespace
            # between elements; drop them.
            wh.write("\n".join([line for line in pretty.split("\n") if line.strip()]))

    def create_properties(self):
        """
        Creates the correct properties.conf file from the input configuration data.

        Writes the fixed pegasus settings followed by every key/value pair
        from the optional ``properties`` section of the config, one
        ``key = value`` per line and without leading indentation.
        """
        lines = [
            "pegasus.catalog.site = XML",
            "pegasus.catalog.site.file = %s" % (self.site_file,),
            "",
            "pegasus.dir.useTimestamp = true",
            "pegasus.condor.logs.symlink = false",
            "pegasus.data.configuration = sharedfs",
            "pegasus.transfer.links = true",
        ]
        for key, value in (self.config.get("properties") or {}).items():
            lines.append("%s = %s" % (key, value))
        with open(self.properties_file, "w") as wh:
            wh.write("\n".join(lines) + "\n")