From ea63700fb19efbc7e1f9a545a70585656e735f14 Mon Sep 17 00:00:00 2001
From: aknecht2 <aknecht2@unl.edu>
Date: Tue, 6 Jun 2017 14:02:45 -0500
Subject: [PATCH] Added option to prevent saving files to database. Updated
 generator handling of save_db & rewrite arguments. Minor style fixes &
 comment updates. Chip-gen now supports --save-db option.

---
 chipathlon/generators/align_generator.py          | 17 +++++-
 chipathlon/generators/idr_generator.py            | 17 +++++-
 chipathlon/generators/module_generator.py         | 29 ++++++---
 chipathlon/generators/peak_call_generator.py      | 17 +++++-
 .../generators/remove_duplicates_generator.py     | 17 +++++-
 chipathlon/workflow.py                            | 61 +++++++++++++------
 scripts/chip-gen                                  |  2 +
 7 files changed, 124 insertions(+), 36 deletions(-)

diff --git a/chipathlon/generators/align_generator.py b/chipathlon/generators/align_generator.py
index 5225375..617ac4a 100644
--- a/chipathlon/generators/align_generator.py
+++ b/chipathlon/generators/align_generator.py
@@ -12,17 +12,28 @@ class AlignGenerator(ModuleGenerator):
     :param mdb: A MongoDB database class for fetching sample meta data.
     :type mdb: :py:class:`~chipathlon.db.MongoDB`
     :param workflow_module: The actual module being used.
-    :type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
+    :type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
     :param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
     :type workflow_jobs: dict
     :param base_path: Base location of the workflow, used to save metadata files.
     :type base_path: str
+    :param save_db: Whether or not we want to save results to the database.
+        True by default.
+    :type save_db: bool
+    :param rewrite: Whether or not to rewrite existing files. If true, it will
+        ignore files in Mongo and recreate them. If false, it will download
+        files based on the latest available completed job.
+    :type rewrite: bool
     :param debug: If true, prints out params for each job & module.
     :type debug: bool
     """
 
-    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
-        super(AlignGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
+    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
+            workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
+        super(AlignGenerator, self).__init__(
+            dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
+            base_path, save_db=save_db, rewrite=rewrite, debug=debug
+        )
         self.generate_calls = {
             "bwa": {
                 1: self._bwa_single,
diff --git a/chipathlon/generators/idr_generator.py b/chipathlon/generators/idr_generator.py
index cb62b8e..4fb0dc0 100644
--- a/chipathlon/generators/idr_generator.py
+++ b/chipathlon/generators/idr_generator.py
@@ -12,17 +12,28 @@ class IdrGenerator(ModuleGenerator):
     :param mdb: A MongoDB database class for fetching sample meta data.
     :type mdb: :py:class:`~chipathlon.db.MongoDB`
     :param workflow_module: The actual module being used.
-    :type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
+    :type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
     :param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
     :type workflow_jobs: dict
     :param base_path: Base location of the workflow, used to save metadata files.
     :type base_path: str
+    :param save_db: Whether or not we want to save results to the database.
+        True by default.
+    :type save_db: bool
+    :param rewrite: Whether or not to rewrite existing files. If true, it will
+        ignore files in Mongo and recreate them. If false, it will download
+        files based on the latest available completed job.
+    :type rewrite: bool
     :param debug: If true, prints out params for each job & module.
     :type debug: bool
     """
 
-    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
-        super(IdrGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
+    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
+            workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
+        super(IdrGenerator, self).__init__(
+            dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
+            base_path, save_db=save_db, rewrite=rewrite, debug=debug
+        )
         self.module_name = "idr"
         self.result_dict = {}
         self.output_files = {
diff --git a/chipathlon/generators/module_generator.py b/chipathlon/generators/module_generator.py
index 3881456..c0b7f3a 100644
--- a/chipathlon/generators/module_generator.py
+++ b/chipathlon/generators/module_generator.py
@@ -18,11 +18,19 @@ class ModuleGenerator(object):
     :type workflow_jobs: dict
     :param base_path: Base location of the workflow, used to save metadata files.
     :type base_path: str
+    :param save_db: Whether or not we want to save results to the database.
+        True by default.
+    :type save_db: bool
+    :param rewrite: Whether or not to rewrite existing files. If true, it will
+        ignore files in Mongo and recreate them. If false, it will download
+        files based on the latest available completed job.
+    :type rewrite: bool
     :param debug: If true, prints out params for each job & module.
     :type debug: bool
     """
 
-    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
+    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
+            workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
         self.dax = dax
         self.master_jobs = master_jobs
         self.master_files = master_files
@@ -30,6 +38,8 @@ class ModuleGenerator(object):
         self.module = workflow_module
         self.workflow_jobs = workflow_jobs
         self.base_path = base_path
+        self.save_db = save_db
+        self.rewrite = rewrite
         self.debug = debug
         self.prev_results = {}
         self.save_results = []
@@ -72,6 +82,7 @@
         all_jobs = prev_results[0].all_jobs
         all_markers = final_result.all_markers
         results = []
+        # Iterate through the jobs from the workflow module
         for job_dict in self.module.get_job_list(all_markers[self.module.name]):
             job_name = job_dict.keys()[0]
             job_info = job_dict[job_name]
@@ -80,6 +91,8 @@
             for logical_name, output_info in job_info["outputs"].iteritems():
                 # We have to explicitly compare since None evaluates to False.
                 if not output_info.get("final_result") == True:
+                    # Create the intermediate result with all jobs &
+                    # markers that have been run up to this point
                     result = Result(
                         logical_name,
                         final_result.control_samples,
@@ -90,6 +103,9 @@
                         prefix_join=output_info.get("prefix_join"),
                         name_template=output_info.get("name_template")
                     )
+                    # Some results require additional prefixes, e.g.
+                    # peakranger produces two separate output files
+                    # that we need to keep track of.
                     if final_result.get_meta("add_prefix") is not None:
                         result.add_to_prefix(final_result.get_meta("add_prefix"))
                     results.append(result)
@@ -97,7 +113,8 @@
                 else:
                     result = run.find_result(self.module.name, logical_name, final_result)
                     results.append(result)
                 # Should we save the results to the db?
-                if result.should_save:
+                if result.should_save and self.save_db:
                     self.save_results.append(result)
         return results
@@ -205,12 +222,10 @@
             self._download_from_gridfs(run, result)
         return
 
-    def generate(self, run, rewrite=False):
+    def generate(self, run):
         """
         :param run: The target run to generate jobs for.
         :type run: :py:class:`~chipathlon.run.Run`
-        :param rewrite: Whether or not to rewrite data even if it exists.
-        :type rewrite: bool
 
         Generates actual workflow jobs for a particular run. The logic for
         job generation is based on the fact that all output files are unique,
@@ -231,9 +246,9 @@
         """
         final_results = self.create_final_results(run)
         for result in final_results:
-            if rewrite or not result.exists_in_db(self.mdb, run.genome):
+            if self.rewrite or not result.exists_in_db(self.mdb, run.genome):
                 for prev_result in self.get_prev_results(run, result):
-                    if prev_result.exists_in_encode() or (not rewrite and prev_result.exists_in_db(self.mdb, run.genome)):
+                    if prev_result.exists_in_encode() or (not self.rewrite and prev_result.exists_in_db(self.mdb, run.genome)):
                         self.add_download_job(run, prev_result)
                 # We only want to add jobs for the very last result in a module
                 # Otherwise we will get duplicate jobs.
diff --git a/chipathlon/generators/peak_call_generator.py b/chipathlon/generators/peak_call_generator.py
index 16e77cf..2c0a6f9 100644
--- a/chipathlon/generators/peak_call_generator.py
+++ b/chipathlon/generators/peak_call_generator.py
@@ -13,17 +13,28 @@ class PeakCallGenerator(ModuleGenerator):
     :param mdb: A MongoDB database class for fetching sample meta data.
     :type mdb: :py:class:`~chipathlon.db.MongoDB`
     :param workflow_module: The actual module being used.
-    :type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
+    :type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
     :param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
     :type workflow_jobs: dict
     :param base_path: Base location of the workflow, used to save metadata files.
     :type base_path: str
+    :param save_db: Whether or not we want to save results to the database.
+        True by default.
+    :type save_db: bool
+    :param rewrite: Whether or not to rewrite existing files. If true, it will
+        ignore files in Mongo and recreate them. If false, it will download
+        files based on the latest available completed job.
+    :type rewrite: bool
     :param debug: If true, prints out params for each job & module.
     :type debug: bool
     """
 
-    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
-        super(PeakCallGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
+    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
+            workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
+        super(PeakCallGenerator, self).__init__(
+            dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
+            base_path, save_db=save_db, rewrite=rewrite, debug=debug
+        )
         self.generate_calls = {
             "gem": self._gem,
             "spp": self._spp,
diff --git a/chipathlon/generators/remove_duplicates_generator.py b/chipathlon/generators/remove_duplicates_generator.py
index 6381f18..f04f6ff 100644
--- a/chipathlon/generators/remove_duplicates_generator.py
+++ b/chipathlon/generators/remove_duplicates_generator.py
@@ -12,17 +12,28 @@ class RemoveDuplicatesGenerator(ModuleGenerator):
     :param mdb: A MongoDB database class for fetching sample meta data.
     :type mdb: :py:class:`~chipathlon.db.MongoDB`
     :param workflow_module: The actual module being used.
-    :type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
+    :type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
     :param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
     :type workflow_jobs: dict
     :param base_path: Base location of the workflow, used to save metadata files.
     :type base_path: str
+    :param save_db: Whether or not we want to save results to the database.
+        True by default.
+    :type save_db: bool
+    :param rewrite: Whether or not to rewrite existing files. If true, it will
+        ignore files in Mongo and recreate them. If false, it will download
+        files based on the latest available completed job.
+    :type rewrite: bool
     :param debug: If true, prints out params for each job & module.
     :type debug: bool
     """
 
-    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
-        super(RemoveDuplicatesGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
+    def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
+            workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
+        super(RemoveDuplicatesGenerator, self).__init__(
+            dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
+            base_path, save_db=save_db, rewrite=rewrite, debug=debug
+        )
         self.module_name = "remove_duplicates"
         self.result_dict = {}
         if debug:
diff --git a/chipathlon/workflow.py b/chipathlon/workflow.py
index e9295d2..635f089 100644
--- a/chipathlon/workflow.py
+++ b/chipathlon/workflow.py
@@ -41,6 +41,9 @@ class Workflow(object):
     :param output_site: The output site to transfer files to. This site
         should be defined in the configuration file.
     :type output_site: str
+    :param save_db: Whether or not we want to save results to the database.
+        True by default.
+    :type save_db: bool
     :param rewrite: Whether or not to rewrite existing files. If true, it will
         ignore files in Mongo and recreate them. If false, it will download
         files based on the latest available completed job.
@@ -56,12 +59,15 @@ class Workflow(object):
 
     def __init__(self, job_home, run_file, param_file, config_file,
             properties_file, host, username, password, execute_site="local",
-            output_site=["local"], rewrite=False, debug=False):
+            output_site="local", save_db=True, rewrite=False, debug=False):
         # debug mode, print out additional information
         self.debug = debug
+        # Job site information
         self.execute_site = execute_site
         self.output_site = output_site
+        self.save_db = save_db
+        # DB information
         self.username = username
         self.host = host
         self.password = password
@@ -69,29 +75,27 @@ class Workflow(object):
         self.mdb = chipathlon.db.MongoDB(host, username, password)
         # Jobname info & err
         self.job_home = os.path.abspath(job_home)
-        self.base_path = self.job_home + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
-        self.jobname = os.path.basename(os.path.dirname(self.job_home + "/"))
+        self.base_path = os.path.join(self.job_home, datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S"))
         self.errors = []
         # add new results even if they exist
        self.rewrite = rewrite
         # Input file info
-        self.run_file = run_file
-        self.param_file = param_file
-
+        self.run_file = os.path.abspath(run_file)
+        self.param_file = os.path.abspath(param_file)
         self.properties_file = os.path.abspath(properties_file)
-        if not os.path.isfile(self.properties_file):
-            self.errors.append("Provided pegasus properties file '%s' does not exist." % (self.properties_file,))
-        self._load_config(config_file)
+        self.config_file = os.path.abspath(config_file)
+
         # Dax specific info
-        self.dax = ADAG(self.jobname)
+        self.dax = ADAG(os.path.basename(os.path.dirname(self.job_home + "/")))
         self.executables = {}
         self.files = {}
         self.jobs = {}
-        self.deps = {}
         self.workflow_jobs = {}
         self.modules = {}
+        # The steps to take when generating jobs
         self.generation_steps = [
+            self._load_config,
             self._load_executables,
             self._load_workflow_jobs,
             self._load_modules,
@@ -156,9 +160,11 @@ class Workflow(object):
             self.dax.addFile(gen_file_obj["file"])
         return
 
-    def _load_config(self, config_file):
+    def _load_config(self):
+        # Validate input config.yaml file
+        # required & optional keys can be found in conf.py
         try:
-            with open(config_file, "r") as rh:
+            with open(self.config_file, "r") as rh:
                 self.config = yaml.load(rh)
                 for key in chipathlon.conf.config_file["required_keys"]:
                     if key not in self.config:
@@ -169,6 +175,11 @@ class Workflow(object):
-                        self.errors.append("Config file '%s' has invalid key '%s' specified, should be one of: %s." % (config_file, key, all_keys))
+                        self.errors.append("Config file '%s' has invalid key '%s' specified, should be one of: %s." % (self.config_file, key, all_keys))
         except yaml.YAMLError as ye:
-            self.errors.append("Error reading config file '%s': %s" % (config_file, ye))
+            self.errors.append("Error reading config file '%s': %s" % (self.config_file, ye))
+
+        # Validate the existence of, but not the contents of, the
+        # pegasus properties file
+        if not os.path.isfile(self.properties_file):
+            self.errors.append("Provided pegasus properties file '%s' does not exist." % (self.properties_file,))
         return
 
     def _add_executable(self, name, path, os_type="linux", arch="x86_64", site="local", installed=True):
@@ -283,10 +294,26 @@ class Workflow(object):
         return
 
     def _load_generators(self):
-        self.align_gen = AlignGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["align"], self.workflow_jobs, self.base_path, debug=self.debug)
-        self.remove_dup_gen = RemoveDuplicatesGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["remove_duplicates"], self.workflow_jobs, self.base_path, debug=self.debug)
-        self.peak_call_gen = PeakCallGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["peak_call"], self.workflow_jobs, self.base_path, debug=self.debug)
-        self.idr_gen = IdrGenerator(self.dax, self.jobs, self.files, self.mdb, self.modules["idr"], self.workflow_jobs, self.base_path, debug=self.debug)
+        self.align_gen = AlignGenerator(
+            self.dax, self.jobs, self.files, self.mdb, self.modules["align"],
+            self.workflow_jobs, self.base_path, save_db=self.save_db,
+            rewrite=self.rewrite, debug=self.debug
+        )
+        self.remove_dup_gen = RemoveDuplicatesGenerator(
+            self.dax, self.jobs, self.files, self.mdb, self.modules["remove_duplicates"],
+            self.workflow_jobs, self.base_path, save_db=self.save_db,
+            rewrite=self.rewrite, debug=self.debug
+        )
+        self.peak_call_gen = PeakCallGenerator(
+            self.dax, self.jobs, self.files, self.mdb, self.modules["peak_call"],
+            self.workflow_jobs, self.base_path, save_db=self.save_db,
+            rewrite=self.rewrite, debug=self.debug
+        )
+        self.idr_gen = IdrGenerator(
+            self.dax, self.jobs, self.files, self.mdb, self.modules["idr"],
+            self.workflow_jobs, self.base_path, save_db=self.save_db,
+            rewrite=self.rewrite, debug=self.debug
+        )
         return
 
     def _generate_jobs(self):
diff --git a/scripts/chip-gen b/scripts/chip-gen
index 71f7dbf..8f8a11a 100644
--- a/scripts/chip-gen
+++ b/scripts/chip-gen
@@ -16,6 +16,7 @@
 parser.add_argument("--properties", dest="properties", required=True, help="Path to the pegasus properties file.")
 parser.add_argument("--execute-site", dest="execute_site", required=True, default="local", help="Target execute site. Sites should be defined in configuration.")
 parser.add_argument("--output-site", dest="output_site", required=True, default="local", help="Target output site. Site should be defined in configuration.")
+parser.add_argument("--save-db", dest="save_db", default=True, type=lambda v: str(v).lower() in ("true", "t", "yes", "1"), help="Whether or not to save results to the database (true/false). Default: True")
 parser.add_argument("--rewrite", dest="rewrite", default=False, action="store_true", help="If specified, don't load from the database, rewrite files.")
 parser.add_argument("--debug", dest="debug", default=False, action="store_true", help="Print out more information while generating.")
 args = parser.parse_args()
@@ -31,6 +32,7 @@ workflow = Workflow(
     args.password,
     execute_site=args.execute_site,
     output_site=args.output_site,
+    save_db=args.save_db,
     rewrite=args.rewrite,
     debug=args.debug
 )
-- 
GitLab