diff --git a/chipathlon/conf.py b/chipathlon/conf.py
index c0a961f3f66f129c94d63b50bb65c9b9d0b225bf..f33f87a775bbe5e294f3973622602a4c06bdcc3b 100644
--- a/chipathlon/conf.py
+++ b/chipathlon/conf.py
@@ -167,3 +167,12 @@ genomes = {
         "additional_files": file_extensions["bowtie2_genome"]
     }
 }
+
+config_file = {
+    "required_keys": [
+        "chipathlon_bin", "idr_bin", "pegasus_home", "email"
+    ],
+    "optional_keys": [
+        "arch", "os"
+    ]
+}
diff --git a/chipathlon/pegasus_config.py b/chipathlon/pegasus_config.py
deleted file mode 100644
index 7c0acaa108e2c720c457ab5ff762499389e016c0..0000000000000000000000000000000000000000
--- a/chipathlon/pegasus_config.py
+++ /dev/null
@@ -1,164 +0,0 @@
-import yaml
-import os
-import xml.dom.minidom
-
-class PegasusConfig(object):
-    """
-    :param config_file: Path to configuration yaml file used for pegasus.
-    :type config_file: str
-    :param base_path: Base path of workflow generation.
-    :type base_path: str
-
-    The PegasusConfig loads all information from the config file to create
-    the proper sites.xml and conf.rc pegasus needs to run. Additionally,
-    the PegasusConfig class holds information about relevant sites.
-    """
-
-    def __init__(self, config_file, base_path):
-        self.config_file = config_file
-        self.base_path = base_path
-        self.errors = []
-        self.site_file = "%s/input/sites.xml" % (base_path,)
-        self.properties_file = "%s/input/properties.conf" % (base_path,)
-
-        try:
-            with open(config_file, "r") as rh:
-                self.config = yaml.load(rh)
-                self.validate()
-        except yaml.YAMLError as exc:
-            self.errors.append("Error parsing config template file '%s': %s.\n" % (config_file, exc))
-        return
-
-    def __getitem__(self, key):
-        return self.config[key]
-
-    def is_valid(self):
-        """
-        :returns: Whether or not the run is valid.
-        """
-        return len(self.errors) == 0
-
-    def get_error_string(self):
-        """
-        :returns: The errors as a newline separated string.
-        """
-        return "\n".join(self.errors)
-
-    def validate(self):
-        """
-        Validates the passed in configuration file. Many values passed in
-        cannot be validated before run-time however. Be careful.
-        """
-        self._validate_notify()
-        self._validate_sites()
-        if "install_paths" in self.config:
-            if len(self.config["install_paths"]) < 0:
-                self.errors.append("Error parsing config template file '%s': At least 1 install_path needs to be specified." % (self.config_file,))
-        else:
-            self.errors.append("Error parsing config template file '%s': install_paths must be included." % (self.config_file,))
-        return
-
-    def _validate_sites(self):
-        if "sites" in self.config:
-            if len(self.config["sites"].keys()) > 0:
-                for site in self.config["sites"]:
-                    if "dirs" in self.config["sites"][site]:
-                        if len(self.config["sites"][site]["dirs"].keys()) == 0:
-                            self.errors.append("Error parsing config template file '%s': Site '%s' has 0 dirs defined." % (self.config_file, site))
-                    else:
-                        self.errors.append("Error parsing config template file '%s': Site '%s' does not have any dirs defined." % (self.config_file, site))
-            else:
-                self.errors.append("Error parsing config template file '%s': At least 1 site must be defined." % (self.config_file,))
-        else:
-            self.errors.append("Error parsing config template file '%s': sites must be defined in config." % (self.config_file,))
-        return
-
-    def _validate_notify(self):
-        if "notify" in self.config:
-            if "pegasus_home" in self.config["notify"]:
-                if not os.path.isdir(self.config["notify"]["pegasus_home"]):
-                    self.errors.append("Error parsing config template file '%s': pegasus_home path '%s' does not exist." % (config_file, self.config["notify"]["pegasus_home"]))
-            else:
-                self.errors.append("Error parsing config template file '%s': pegasus_home must be defined under 'notify'." % (config_file,))
-            if not "email" in self.config["notify"]:
-                self.errors.append("Error parsing config template file '%s': email must be defined under 'notify'." % (config_file,))
-        else:
-            self.errors.append("Error parsing config template file '%s': notify must be defined at root level of config." % (config_file,))
-        return
-
-    def create_sites(self):
-        """
-        Creates the correct sites.xml file from the input configuration data.
-        """
-        site_string = """<sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">\n"""
-        for site_name, site_info in self.config["sites"].iteritems():
-            # Load initial site info
-            site_string += """<site handle="%s" arch="%s" os="%s">\n""" % (
-                site_name,
-                site_info.get("arch", "x86_64"),
-                site_info.get("os", "LINUX")
-            )
-            # Load each directory for the site
-            for dir_type, dir_path in site_info["dirs"].iteritems():
-                site_string += """\
-                <directory type="%s" path="%s">
-                    <file-server operation="all" url="file://%s" />
-                </directory>
-                """ % (dir_type, dir_path.replace("%base_path%", self.base_path), dir_path.replace("%base_path%", self.base_path))
-            # Grids is required for the BRIDGES setup,
-            # conditionally add those to the sites
-            if "grids" in site_info:
-                site_string += "\n"
-                for grid in site_info["grids"]:
-                    site_string += """<grid type="%s" contact="%s" scheduler="%s" jobtype="%s" />\n""" % (
-                        grid.get("type", "batch"),
-                        grid.get("contact"),
-                        grid.get("scheduler", "slurm"),
-                        grid.get("jobtype")
-                    )
-            # Pass profile information directly through
-            if "profile" in site_info:
-                site_string += "\n"
-                for namespace in site_info["profile"]:
-                    for key, value in site_info["profile"][namespace].iteritems():
-                        if key == "PATH":
-                            value = value + ":%s/jobs/scripts/" % (os.path.dirname(os.path.abspath(__file__)),)
-                        site_string += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (
-                            namespace,
-                            key,
-                            value
-                        )
-                # If PATH is not specified, we need to add a key for it
-                if "env" not in site_info["profile"] and "PATH" not in site_info["profile"]["env"]:
-                    site_string += """\n\t<profile namespace="env" key="PATH">%s/jobs/scripts/</profile> """ % (
-                        os.path.dirname(os.path.abspath(__file__))
-                    )
-
-            site_string += "</site>\n"
-
-        site_string += "</sitecatalog>\n"
-
-        with open(self.site_file, "w") as wh:
-            wh.write("\n".join([line for line in xml.dom.minidom.parseString(site_string).toprettyxml().split("\n") if line.strip()]))
-
-        return
-
-    def create_properties(self):
-        """
-        Creates the correct properties.conf file from the input configuration data.
-        """
-        properties_string = """\
-        pegasus.catalog.site = XML
-        pegasus.catalog.site.file = %s
-
-        pegasus.dir.useTimestamp = true
-        pegasus.condor.logs.symlink = false
-        pegasus.data.configuration = sharedfs
-        pegasus.transfer.links = true
-        """ % (self.site_file,)
-        if "properties" in self.config:
-            for key, value in self.config["properties"].iteritems():
-                properties_string += "%s = %s\n" % (key, value)
-        with open(self.properties_file, "w") as wh:
-            wh.write(properties_string)
-        return
diff --git a/chipathlon/workflow.py b/chipathlon/workflow.py
index 93283842e8294ecc0d9fee5b4b3d13c7331d6b0b..abd11bc72893581c983f55ba5020c65bab694337 100644
--- a/chipathlon/workflow.py
+++ b/chipathlon/workflow.py
@@ -22,8 +22,8 @@ from Pegasus.DAX3 import *
 
 class Workflow(object):
     """
-    :param jobhome: The base directory to generate the workflow in.
-    :type jobhome: str
+    :param job_home: The base directory to generate the workflow in.
+    :type job_home: str
     :param run_file: The run yaml file defining controls, samples, alignment
         tool, peak calling tool, peak calling type, and which samples to
         run idr on.
@@ -55,7 +55,9 @@ class Workflow(object):
         executables.
     """
 
-    def __init__(self, jobhome, run_file, param_file, config_file, host, username, password, execute_site="local", output_site=["local"], rewrite=False, debug=False):
+    def __init__(self, job_home, run_file, param_file, config_file,
+            properties_file, host, username, password, execute_site="local",
+            output_site=["local"], rewrite=False, debug=False):
         # debug mode, print out additional information
         self.debug = debug
         self.execute_site = execute_site
@@ -67,15 +69,19 @@ class Workflow(object):
         # Initialize db connection
         self.mdb = chipathlon.db.MongoDB(host, username, password)
         # Jobname info & err
-        self.jobhome = os.path.abspath(jobhome)
-        self.base_path = self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
-        self.jobname = os.path.basename(os.path.dirname(self.jobhome + "/"))
+        self.job_home = os.path.abspath(job_home)
+        self.base_path = self.job_home + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
+        self.jobname = os.path.basename(os.path.dirname(self.job_home + "/"))
         self.errors = []
         # add new results even if they exist
         self.rewrite = rewrite
         # Input file info
         self.run_file = run_file
         self.param_file = param_file
+
+        self.properties_file = properties_file
+        if not os.path.isfile(self.properties_file):
+            self.errors.append("Provided pegasus properties file '%s' does not exist." % (self.properties_file,))
         self._load_config(config_file)
         # Dax specific info
         self.dax = ADAG(self.jobname)
@@ -152,19 +158,22 @@ class Workflow(object):
         return
 
     def _load_config(self, config_file):
-        self.config = PegasusConfig(config_file, self.base_path)
-        if self.config.is_valid():
-            if self.output_site not in self.config["sites"]:
-                self.errors.append("Specified output site '%s' is not defined in config file '%s'." % (self.output_site, config_file))
-            if self.execute_site not in self.config["sites"]:
-                self.errors.append("Specified execute site '%s' is not defined in config file '%s'." % (self.execute_site, config_file))
-        else:
-            for error in self.config.errors:
-                self.errors.append(error)
+        try:
+            with open(config_file, "r") as rh:
+                self.config = yaml.load(rh)
+            for key in chipathlon.conf.config_file["required_keys"]:
+                if key not in self.config:
+                    self.errors.append("Config file '%s' does not have required key '%s'." % (config_file, key))
+            all_keys = chipathlon.conf.config_file["optional_keys"] + chipathlon.conf.config_file["required_keys"]
+            for key in self.config:
+                if key not in all_keys:
+                    self.errors.append("Config file '%s' has invalid key '%s' specified, should be one of: %s." % (config_file, key, all_keys))
+        except yaml.YAMLError as ye:
+            self.errors.append("Error reading config file '%s': %s" % (config_file, ye))
         return
 
-    def _add_executable(self, name, path, os_type="linux", arch="x86_64", site="local"):
-        self.executables[name] = Executable(name=name, os=os_type.lower(), arch=arch)
+    def _add_executable(self, name, path, os_type="linux", arch="x86_64", site="local", installed=True):
+        self.executables[name] = Executable(name=name, os=os_type.lower(), arch=arch, installed=installed)
         self.executables[name].addPFN(PFN(os.path.join("file://", path), site))
         self.dax.addExecutable(self.executables[name])
         if self.debug:
@@ -174,30 +183,23 @@ class Workflow(object):
     def _load_executables(self, os_type="linux", arch="x86_64"):
         # Load actual executables don't need no wrappers
        for executable in chipathlon.conf.executables:
-            found = False
-            for install_path in self.config["install_paths"]:
-                if os.path.isfile(os.path.join(install_path, executable)):
-                    self._add_executable(
-                        executable,
-                        os.path.join(install_path, executable),
-                        site=self.execute_site,
-                        arch=self.config["sites"][self.execute_site].get("arch", "x86_64"),
-                        os_type=self.config["sites"][self.execute_site].get("os", "linux")
-                    )
-                    found = True
-                    break
-            if not found:
-                self.errors.append("Could not find executable %s" % (executable,))
+            self._add_executable(
+                executable,
+                os.path.join(self.config["idr_bin"] if executable == "idr" else self.config["chipathlon_bin"], executable),
+                site=self.execute_site,
+                arch=self.config.get("arch", "x86_64"),
+                os_type=self.config.get("os", "linux")
+            )
         # Load actual scripts
         for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_scripts)):
             for f in files:
                 if not os.path.splitext(f)[1] == '.pyc':
                     self._add_executable(
-                        f,
-                        os.path.join(root, f),
+                        f,
+                        os.path.join(self.config["chipathlon_bin"], f),
                         site=self.execute_site,
-                        arch=self.config["sites"][self.execute_site].get("arch", "x86_64"),
-                        os_type=self.config["sites"][self.execute_site].get("os", "linux")
+                        arch=self.config.get("arch", "x86_64"),
+                        os_type=self.config.get("os", "linux")
                     )
             break
         # Handle necessary installed scripts
@@ -206,8 +208,8 @@ class Workflow(object):
                 cmd,
                 os.path.join(chipathlon.conf.system_path, cmd),
                 site=self.execute_site,
-                arch=self.config["sites"][self.execute_site].get("arch", "x86_64"),
-                os_type=self.config["sites"][self.execute_site].get("os", "linux")
+                arch=self.config.get("arch", "x86_64"),
+                os_type=self.config.get("os", "linux")
             )
         return
 
@@ -314,15 +316,13 @@ class Workflow(object):
            notify = textwrap.dedent("""\
                 #!/bin/bash
                 %s/notification/email -t %s --report=pegasus-analyzer
-            """ % (self.config["notify"]["pegasus_home"], self.config["notify"]["email"]))
+            """ % (self.config["pegasus_home"], self.config["email"]))
             wh.write(notify)
         os.chmod(notify_path, 0755)
         self.dax.invoke(When.AT_END, notify_path)
         return
 
     def _create_pegasus_files(self):
-        self.config.create_properties()
-        self.config.create_sites()
         self._create_submit()
         self._create_dax()
         return
@@ -342,7 +342,7 @@ class Workflow(object):
             --dax %s \\
             --randomdir \\
         """ % (
-            self.config.properties_file,
+            self.properties_file,
             self.execute_site,
             self.output_site,
             os.path.join(self.base_path, "work"),