Commit ea63700f authored by aknecht2

Added an option to prevent saving files to the database. Updated generator handling of the save_db & rewrite arguments. Minor style fixes & comment updates. chip-gen now supports the --save-db option.
parent 560b3cd4
@@ -12,17 +12,28 @@ class AlignGenerator(ModuleGenerator):
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param workflow_module: The actual module being used.
- :type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
+ :type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
+ :param save_db: Whether or not we want to save results to the database.
+ True by default.
+ :type save_db: bool
+ :param rewrite: Whether or not to rewrite existing files. If true, it will
+ ignore files in Mongo and recreate them. If false, it will download
+ files based on the latest available completed job.
+ :type rewrite: bool
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
- def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
- super(AlignGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
+ def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
+ workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
+ super(AlignGenerator, self).__init__(
+ dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
+ base_path, save_db=save_db, rewrite=rewrite, debug=debug
+ )
self.generate_calls = {
"bwa": {
1: self._bwa_single,
......
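The truncated `generate_calls` table above is a dispatch dictionary: the aligner name (and, judging from the `1:` key, the number of read ends) selects which generator method runs. A minimal standalone sketch of the idiom, with purely illustrative names:

```python
# Standalone sketch of the dispatch-table idiom; names are illustrative
# only, and the real methods build workflow jobs instead of strings.
class AlignerDispatch(object):
    def __init__(self):
        self.generate_calls = {
            "bwa": {1: self._bwa_single},
        }

    def _bwa_single(self):
        return "single-end bwa jobs"

    def generate(self, tool, ends=1):
        # Look up the handler for this tool and end count, then call it.
        return self.generate_calls[tool][ends]()

print(AlignerDispatch().generate("bwa"))  # single-end bwa jobs
```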
@@ -12,17 +12,28 @@ class IdrGenerator(ModuleGenerator):
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param workflow_module: The actual module being used.
- :type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
+ :type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
+ :param save_db: Whether or not we want to save results to the database.
+ True by default.
+ :type save_db: bool
+ :param rewrite: Whether or not to rewrite existing files. If true, it will
+ ignore files in Mongo and recreate them. If false, it will download
+ files based on the latest available completed job.
+ :type rewrite: bool
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
- def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
- super(IdrGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
+ def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
+ workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
+ super(IdrGenerator, self).__init__(
+ dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
+ base_path, save_db=save_db, rewrite=rewrite, debug=debug
+ )
self.module_name = "idr"
self.result_dict = {}
self.output_files = {
......
@@ -18,11 +18,19 @@ class ModuleGenerator(object):
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
+ :param save_db: Whether or not we want to save results to the database.
+ True by default.
+ :type save_db: bool
+ :param rewrite: Whether or not to rewrite existing files. If true, it will
+ ignore files in Mongo and recreate them. If false, it will download
+ files based on the latest available completed job.
+ :type rewrite: bool
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
- def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
+ def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
+ workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
self.dax = dax
self.master_jobs = master_jobs
self.master_files = master_files
@@ -30,6 +38,8 @@ class ModuleGenerator(object):
self.module = workflow_module
self.workflow_jobs = workflow_jobs
self.base_path = base_path
+ self.save_db = save_db
+ self.rewrite = rewrite
self.debug = debug
self.prev_results = {}
self.save_results = []
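Each subclass forwards the new `save_db` and `rewrite` keywords unchanged into this base constructor, which stores them for later use. A minimal self-contained sketch of that forwarding pattern, using toy classes rather than the real API:

```python
# Toy illustration of the keyword forwarding used by the generator
# subclasses; the real classes take many more arguments.
class Base(object):
    def __init__(self, base_path, save_db=True, rewrite=False, debug=False):
        self.base_path = base_path
        self.save_db = save_db
        self.rewrite = rewrite
        self.debug = debug

class Child(Base):
    def __init__(self, base_path, save_db=True, rewrite=False, debug=False):
        # Pass the flags through unchanged, exactly as the subclasses do.
        super(Child, self).__init__(
            base_path, save_db=save_db, rewrite=rewrite, debug=debug
        )

c = Child("/tmp/work", save_db=False, rewrite=True)
print("save_db=%s rewrite=%s" % (c.save_db, c.rewrite))  # save_db=False rewrite=True
```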
@@ -72,6 +82,7 @@ class ModuleGenerator(object):
all_jobs = prev_results[0].all_jobs
all_markers = final_result.all_markers
results = []
+ # Iterate through the jobs from the workflow module
for job_dict in self.module.get_job_list(all_markers[self.module.name]):
job_name = job_dict.keys()[0]
job_info = job_dict[job_name]
@@ -80,6 +91,8 @@ class ModuleGenerator(object):
for logical_name, output_info in job_info["outputs"].iteritems():
# We have to explicitly compare since None evaluates to False.
if not output_info.get("final_result") == True:
+ # Create the intermediate result with all jobs &
+ # markers that have been run up to this point
result = Result(
logical_name,
final_result.control_samples,
@@ -90,6 +103,9 @@ class ModuleGenerator(object):
prefix_join=output_info.get("prefix_join"),
name_template=output_info.get("name_template")
)
+ # Some results require additional prefixes, e.g.
+ # peakranger produces two separate output files
+ # that we need to keep track of.
if final_result.get_meta("add_prefix") is not None:
result.add_to_prefix(final_result.get_meta("add_prefix"))
results.append(result)
@@ -97,7 +113,8 @@ class ModuleGenerator(object):
else:
result = run.find_result(self.module.name, logical_name, final_result)
results.append(result)
- if result.should_save:
+ # Should we save the results to the db?
+ if result.should_save and self.save_db:
self.save_results.append(result)
return results
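The changed condition is the heart of the new option: a result is queued for upload only when the workflow marks it `should_save` and the generator was constructed with `save_db=True`. A small self-contained sketch of the gate, with a toy `Result` standing in for the real class:

```python
# Toy gate mirroring `result.should_save and self.save_db`; the real
# Result class is far richer than this stand-in.
class Result(object):
    def __init__(self, should_save):
        self.should_save = should_save

def queue_for_save(results, save_db):
    # Only results flagged should_save are kept, and only if saving
    # to the database is enabled at all.
    return [r for r in results if r.should_save and save_db]

results = [Result(True), Result(False)]
print(len(queue_for_save(results, save_db=True)))   # 1
print(len(queue_for_save(results, save_db=False)))  # 0
```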
@@ -205,12 +222,10 @@ class ModuleGenerator(object):
self._download_from_gridfs(run, result)
return
- def generate(self, run, rewrite=False):
+ def generate(self, run):
"""
:param run: The target run to generate jobs for.
:type run: :py:class:`~chipathlon.run.Run`
- :param rewrite: Whether or not to rewrite data even if it exists.
- :type rewrite: bool
Generates actual workflow jobs for a particular run. The logic for
job generation is based on the fact that all output files are unique,
@@ -231,9 +246,9 @@ class ModuleGenerator(object):
"""
final_results = self.create_final_results(run)
for result in final_results:
- if rewrite or not result.exists_in_db(self.mdb, run.genome):
+ if self.rewrite or not result.exists_in_db(self.mdb, run.genome):
for prev_result in self.get_prev_results(run, result):
- if prev_result.exists_in_encode() or (not rewrite and prev_result.exists_in_db(self.mdb, run.genome)):
+ if prev_result.exists_in_encode() or (not self.rewrite and prev_result.exists_in_db(self.mdb, run.genome)):
self.add_download_job(run, prev_result)
# We only want to add jobs for the very last result in a module
# Otherwise we will get duplicate jobs.
......
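Condensed, the loop above makes two decisions, both now steered by the instance-level `rewrite` flag instead of a per-call argument. A sketch of the decision flow, with booleans standing in for the real `exists_in_encode` / `exists_in_db` checks:

```python
# Assumed semantics drawn from the diff: ENCODE-hosted inputs are always
# downloaded; otherwise rewrite forces regeneration even when a database
# copy exists.
def plan_prev_result(rewrite, in_encode, in_db):
    if in_encode or (not rewrite and in_db):
        return "download"
    return "generate"

print(plan_prev_result(rewrite=False, in_encode=False, in_db=True))  # download
print(plan_prev_result(rewrite=True, in_encode=False, in_db=True))   # generate
```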
@@ -13,17 +13,28 @@ class PeakCallGenerator(ModuleGenerator):
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param workflow_module: The actual module being used.
- :type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
+ :type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
+ :param save_db: Whether or not we want to save results to the database.
+ True by default.
+ :type save_db: bool
+ :param rewrite: Whether or not to rewrite existing files. If true, it will
+ ignore files in Mongo and recreate them. If false, it will download
+ files based on the latest available completed job.
+ :type rewrite: bool
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
- def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
- super(PeakCallGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
+ def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
+ workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
+ super(PeakCallGenerator, self).__init__(
+ dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
+ base_path, save_db=save_db, rewrite=rewrite, debug=debug
+ )
self.generate_calls = {
"gem": self._gem,
"spp": self._spp,
......
@@ -12,17 +12,28 @@ class RemoveDuplicatesGenerator(ModuleGenerator):
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param workflow_module: The actual module being used.
- :type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
+ :type workflow_module: `~chipathlon.workflow_module.WorkflowModule`
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
+ :param save_db: Whether or not we want to save results to the database.
+ True by default.
+ :type save_db: bool
+ :param rewrite: Whether or not to rewrite existing files. If true, it will
+ ignore files in Mongo and recreate them. If false, it will download
+ files based on the latest available completed job.
+ :type rewrite: bool
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
- def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
- super(RemoveDuplicatesGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
+ def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
+ workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
+ super(RemoveDuplicatesGenerator, self).__init__(
+ dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
+ base_path, save_db=save_db, rewrite=rewrite, debug=debug
+ )
self.module_name = "remove_duplicates"
self.result_dict = {}
if debug:
......
@@ -16,6 +16,7 @@ parser.add_argument("--properties", dest="properties", required=True, help="Path
parser.add_argument("--execute-site", dest="execute_site", required=True, default="local", help="Target execute site. Sites should be defined in configuration.")
parser.add_argument("--output-site", dest="output_site", required=True, default="local", help="Target output site. Site should be defined in configuration.")
parser.add_argument("--save-db", dest="save_db", default=True, type=bool, help="Whether or not to save results to the database. Default: True")
parser.add_argument("--rewrite", dest="rewrite", default=False, action="store_true", help="If specified, don't load from the database, rewrite files.")
parser.add_argument("--debug", dest="debug", default=False, action="store_true", help="Print out more information while generating.")
args = parser.parse_args()
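One caveat with the new flag: argparse's `type=bool` simply calls `bool()` on the raw string, and any non-empty string is truthy, so `--save-db False` still parses as `True`. A hedged fix, not part of this commit, is an explicit string-to-bool converter:

```python
import argparse

def str2bool(value):
    # bool("False") is True, so parse the text explicitly instead.
    lowered = value.lower()
    if lowered in ("true", "1", "yes"):
        return True
    if lowered in ("false", "0", "no"):
        return False
    raise argparse.ArgumentTypeError("Expected a boolean, got %r" % value)

parser = argparse.ArgumentParser()
parser.add_argument("--save-db", dest="save_db", default=True, type=str2bool,
                    help="Whether or not to save results to the database. Default: True")

print(parser.parse_args(["--save-db", "False"]).save_db)  # False
print(parser.parse_args([]).save_db)                      # True
```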
@@ -31,6 +32,7 @@ workflow = Workflow(
args.password,
execute_site=args.execute_site,
output_site=args.output_site,
+ save_db=args.save_db,
rewrite=args.rewrite,
debug=args.debug
)
......