diff --git a/chipathlon/conf.py b/chipathlon/conf.py
index c43734ef4bd3882f0520c08b6b6e70c5e87a3fd3..4fe73b73ae02524e6a6ce4d96952b80b51b8a530 100644
--- a/chipathlon/conf.py
+++ b/chipathlon/conf.py
@@ -7,6 +7,9 @@ job_params = "jobs/params/"
 # Job wrappers directory
 job_wrappers = "jobs/wrappers/"
 
+# Job scripts directory
+job_scripts = "jobs/scripts/"
+
 # File extensions
 file_extensions = {
     "genome_index": ["fa", "fna"],
diff --git a/chipathlon/jobs/params/bedtools_bam_to_bed.yaml b/chipathlon/jobs/params/bedtools_bam_to_bed.yaml
index fc37a2de40456e0ff03dd15ccce74474db6a3321..07ecadf4454b093022afafcf2c70b5e7e749f599 100644
--- a/chipathlon/jobs/params/bedtools_bam_to_bed.yaml
+++ b/chipathlon/jobs/params/bedtools_bam_to_bed.yaml
@@ -5,7 +5,7 @@ bedtools_bam_to_bed:
     - bed
   arguments:
     - "bamtobed":
-        changable: false
+        changeable: false
         required: true
         has_value: false
     - "-i":
diff --git a/chipathlon/jobs/params/macs2_callpeak.yaml b/chipathlon/jobs/params/macs2_callpeak.yaml
index d86ed627ea190dc80b48fa8ecd326e578c0c7d85..efcc93b776d2fc2c66bbea6abdb42fb6391feb64 100644
--- a/chipathlon/jobs/params/macs2_callpeak.yaml
+++ b/chipathlon/jobs/params/macs2_callpeak.yaml
@@ -26,7 +26,7 @@ macs2_callpeak:
         has_value: true
         default: "BED"
     - "-n":
-        changable: false
+        changeable: false
         required: true
         has_value: true
         default: $inputs.2
diff --git a/chipathlon/jobs/params/picard_mark_duplicates.yaml b/chipathlon/jobs/params/picard_mark_duplicates.yaml
index 5a299e35c301bdae357be7376558d854a520645c..0d5e30549458dca5fbe5924cd47b29b7067b8061 100644
--- a/chipathlon/jobs/params/picard_mark_duplicates.yaml
+++ b/chipathlon/jobs/params/picard_mark_duplicates.yaml
@@ -1,9 +1,11 @@
-picard_sort_sam:
+picard_mark_duplicates:
   inputs:
     - bam
+  additional_inputs: null
   outputs:
     - bam
     - qc
+  command: picard
   arguments:
     - MarkDuplicates:
         changeable: false
@@ -28,16 +30,15 @@ picard_sort_sam:
         required: false
         has_value: true
         default: "LENIENT"
-    - "ASSUME_SORTED=": 
+    - "ASSUME_SORTED=":
         changeable: true
         required: false
         has_value: true
         default: true
-    - "REMOVE_DUPLICATES=": 
+    - "REMOVE_DUPLICATES=":
         changeable: false
         required: false
         has_value: true
         default: false
-
   walltime: 2000
   memory: 8000
diff --git a/chipathlon/test/run/param.yaml b/chipathlon/test/run/param.yaml
index 66eec9f9ac637b385784a6d85d8ced9c13de01f3..346d669f472d5f87358027c3e0c0b776456b7217 100644
--- a/chipathlon/test/run/param.yaml
+++ b/chipathlon/test/run/param.yaml
@@ -27,3 +27,27 @@ samtools_sam_to_bam:
   arguments: null
   walltime: 2000
   memory: 2000
+bedtools_bam_to_bed:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+macs2_callpeak:
+  arguments: null
+  walltime: 2000
+  memory: 8000
+picard_mark_duplicates:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+picard_sort_sam:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+r_spp_nodups:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+samtools_remove_duplicates:
+  arguments: null
+  walltime: 2000
+  memory: 2000
diff --git a/chipathlon/tester.py b/chipathlon/tester.py
index 9d55db5cb2e10f7bed3b2a5ba497202a8429891e..c52ba72eeefa0447b7f59c9cd17306830aa942e1 100644
--- a/chipathlon/tester.py
+++ b/chipathlon/tester.py
@@ -6,9 +6,9 @@ import chipathlon.workflow
 import argparse
 
 parser = argparse.ArgumentParser(description="Perform a join between the experiment and sample collections.")
-parser.add_argument("--password", dest="password", required=True, help="Database user password.")
-parser.add_argument("--username", dest="username", default="aknecht", required=True, help="Database user.")
-parser.add_argument("--host", dest="host", default="hcc-anvil-241-41.unl.edu", required=True, help="Database host.")
+parser.add_argument("--password", dest="password", default="nerpAusIrd)griUrlobIbfaifovript4", help="Database user password.")
+parser.add_argument("--username", dest="username", default="aknecht", help="Database user.")
+parser.add_argument("--host", dest="host", default="hcc-anvil-175-9.unl.edu", help="Database host.")
 args = parser.parse_args()
 
 mdb = chipathlon.db.MongoDB(args.host, args.username, args.password)
@@ -126,4 +126,5 @@ def valid_samples():
 # ]
 # for test in tests:
 #     test()
 
+workflow_test_1()
diff --git a/chipathlon/workflow.py b/chipathlon/workflow.py
index 3bb88de577b5c0c7ff0081bd1bea85644591ff7b..b6fa748e8d23c48e2591433c061dd4f1f227ab2c 100644
--- a/chipathlon/workflow.py
+++ b/chipathlon/workflow.py
@@ -11,6 +11,7 @@ import chipathlon
 import chipathlon.workflow_job
 import chipathlon.db
 import chipathlon.workflow_module
+import random
 from pprint import pprint
 from Pegasus.DAX3 import *
 
@@ -51,6 +52,14 @@ class Workflow(object):
             self._load_runs()
             # All required information is loaded, start queuing jobs
             self._add_download()
+            self._add_align()
+            # Create pegasus important stuff
+            self._create_setup()
+            self._add_notify()
+            self._create_replica()
+            self._create_sites()
+            self._create_submit()
+            self._write()
             break
         if self.err:
             print self.err
@@ -67,7 +76,7 @@ class Workflow(object):
                 self.dax.addExecutable(self.executables[ex_name])
             break
         # Load actual scripts
-        for root, dirs, files in os.walk("%s/scripts" % (os.path.dirname(os.path.realpath(__file__)),)):
+        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_scripts)):
            for f in files:
                 self.executables[f] = Executable(name=f, os=os_type, arch=arch)
                 self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "condorpool"))
@@ -79,11 +88,14 @@ class Workflow(object):
         for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
             for f in files:
                 ex_name = f.split("_")[0]
-                yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
-                if not yj.err:
-                    self.workflow_jobs[yj.jobname] = yj
+                if ex_name in self.executables:
+                    yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
+                    if not yj.err:
+                        self.workflow_jobs[yj.jobname] = yj
+                    else:
+                        self.err += yj.err
                 else:
-                    self.err += yj.err
+                    print "[INFO] Skipping param file %s, no corresponding executable found." % (f,)
             break
         return
 
@@ -118,8 +130,10 @@ class Workflow(object):
                         run["data"] = data
                     else:
                         self.err += msg
-                    # MAKE SURE YOU UNCOMMENT THIS!!!
+                    # This will be changed to _check_genomes(run) eventually.
                     # self._check_genomes(run)
+                    # UNCOMMENT THIS:
+                    # self._check_grch38(run)
                 else:
                    self.err += "Error parsing run template file '%s'. Required key 'experiment' not defined.\n" % (self.run_file,)
             except:
@@ -127,7 +141,47 @@ class Workflow(object):
         except:
             self.err += traceback.format_exc()
         return
+    def _check_grch38(self, run):
+        if run["align"] in self.run_data["genomes"]:
+            assembly = "grch38.p6"
+            gen_prefix = "genome_%s_%s" % (run["align"], assembly)
+            if assembly in self.run_data["genomes"][run["align"]]:
+                base_file = self.run_data["genomes"][run["align"]][assembly]
+                if os.path.isfile(base_file):
+                    base_file_name = gen_prefix + base_file.split(".", 1)
+                    f = File(base_file_name)
+                    f.addPFN(PFN(base_file, "local"))
+                    self.files[base_file_name] = f
+                    if base_file.split(".", 1)[1] in chipathlon.conf.genomes[run["align"]]["base_file"]:
+                        prefix = base_file.split(".", 1)[0]
+                        missing = []
+                        for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
+                            if not os.path.isfile(prefix + "." + ext) and not os.path.isfile(base_file + "." + ext):
+                                missing.append(ext)
+                            else:
+                                f = File(gen_prefix + "." + ext)
+                                if os.path.isfile(prefix + "." + ext):
+                                    f.addPFN(prefix + "." + ext)
+                                else:
+                                    f.addPFN(base_file + "." + ext)
+                                self.files[gen_prefix + "." + ext] = f
+                                self.dax.addFile(f)
+                        if len(missing) > 0:
+                            self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
+                    else:
+                        self.err += "Genome defined with tool '%s' and assembly '%s', has invalid extension. Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
+                else:
+                    self.err += "Genome defined with tool '%s' and assembly '%s', has non-existant base file '%s'.\n" % (run["align"], assembly, base_file)
+            else:
+                self.err += "Alignment defined with tool '%s' for assembly '%s', no corresponding genome definition.\n" % (run["align"], assembly)
+        else:
+            self.err += "Alignment defined with tool '%s', no corresponding genome definition.\n" % (run["align"])
+        return
+
     def _check_genomes(self, run):
+        # NOTE: THIS function should NOT be called.
+        # We will only be using GRCH38.p6 genome.
+        # Use the _check_grch38 function instead
         valid, msg, assembly = self.mdb.get_assembly(run["experiment"])
         if valid:
             if run["align"] in self.run_data["genomes"]:
@@ -155,80 +209,118 @@ class Workflow(object):
         return
 
     def _add_download(self):
+        # Remember, experiment always paired with randomly selected control,
+        # For alignment doesn't really matter, but we set up the groundwork
+        # for the future. We randomly select unless there is an
+        # equal number of control and experiment files
+        self.input_sets = []
         for run in self.run_data["runs"]:
-            print "Control samples = %s, Experiment samples = %s" % (len(run["data"]["control"]), len(run["data"]["experiment"]))
+            if (len(run["data"]["control"]) != len(run["data"]["experiment"])):
+                control_data = [random.choice(run["data"]["control"]) for i in range(0, len(run["data"]["experiment"]))]
+            else:
+                control_data = run["data"]["control"]
+            for pair in zip(run["data"]["experiment"], control_data):
+                exp_name = "%s_%s.fastq.gz" % (run["experiment"], pair[0]["accession"])
+                control_name = "%s_%s.fastq.gz" % (run["experiment"], pair[1]["accession"])
+                if exp_name not in self.files and control_name not in self.files:
+                    self.input_sets.append(pair)
+                    for f in pair:
+                        # For each file we create the download job
+                        # and output file. Base output files will be
+                        # EXPID_ACCESSION.fastq.gz
+                        output_name = "%s_%s.fastq.gz" % (run["experiment"], f["accession"])
+                        output_file = File(output_name)
+                        self.files[output_name] = output_file
+                        job = Job(self.executables["download_fastq.py"])
+                        job.uses(output_file, link=Link.OUTPUT, transfer=True)
+                        job.addArguments("-u", f["url"], "-p", output_file, "-t http://", "-m", f["content_md5sum"])
+                        self.jobs[output_name] = job
+                        self.dax.addJob(job)
         return
 
-    def _addFile(self, name, inout, site="condorpool", path=None):
-        """
-        :param name: Name of the file to add.
-        :type name: str
-        :param path: Path to the file. Only required for input files.
-        :type path: str
+    def _add_align(self):
+        print self.modules["align"].markers
+        markers = {}
+        markers["read_end"] = "single"
+        for run in self.run_data["runs"]:
+            input_files = {}
+            additional_files = {}
+            markers["tool"] = run["align"]
+            gen_prefix = "genome_%s_grch38.p6" % (run["align"],)
+            input_files["ref_genome"] = "%s.%s" % (gen_prefix, self.run_data["genomes"][run["align"]]["grch38.p6"])
+            for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
+                additional_files["ref_genome." + ext] = "%s.%s" % (gen_prefix, ext)
+            for pair in self.input_sets:
+                for f in pair:
+                    prefix = "%s_%s" % (run["experiment"], f["accession"])
+                    input_files["download_1.fastq"] = "%s.fastq.gz" % (prefix,)
+                    self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
+        return
 
-        Adds the inputted file to the dax, as well as the internal variable self.files
+    def _create_setup(self):
+        """
+        Creates the base structure for job submission. Everything is contained
+        within a folder based on the current timestamp.
         """
-        self.files[inout][name] = {"file": File(name), "path": ""}
-        if inout == "input":
-            self.files[inout][name]["path"] = path if path else name
-            self.files[inout][name]["file"].addPFN(PFN("file://" + path.replace(" ", "%20"), site))
-            self.dax.addFile(self.files[dax][inout][name]["file"])
-        # elif inout == "output":
-        #     self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name))
+        self.basepath = self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
+        if not os.path.exists(self.basepath):
+            os.makedirs(self.basepath)
+        for folder in ["input", "output"]:
+            if not os.path.exists(os.path.join(self.basepath, folder)):
+                os.makedirs(os.path.join(self.basepath, folder))
         return
 
-    def _loadNotify(self, config):
-        self.dax.invoke(When.AT_END, self.basepath + "/input/notify.sh")
-        with open(self.basepath + "/input/notify.sh", "w") as wh:
+    def _add_notify(self):
+        # NEED TO DETERMINE HOW TO READ IN THESE VARS
+        notify_path = os.path.join(self.basepath, "input/notify.sh")
+        with open(notify_path, "w") as wh:
             notify = textwrap.dedent("""\
                 #!/bin/bash
                 %s/notification/email -t %s --report=pegasus-analyzer
            """ % (config["notify"]["pegasus_home"], config["notify"]["email"]))
             wh.write(notify)
-        os.chmod(self.basepath + "/input/notify.sh", 0755)
+        os.chmod(notify_path, 0755)
+        self.dax.invoke(When.AT_END, notify_path)
         return
 
-    def _createReplica(self):
-        """
-        Creates the pegasus configuration replica catalog. input/conf.rc
-        """
-        with open(self.basepath + "/input/conf.rc", "w") as wh:
+    def _create_replica(self):
+        with open(os.path.join(self.basepath, "input/conf.rc"), "w") as wh:
             pegasusrc = textwrap.dedent("""\
-                    pegasus.catalog.site = XML
-                    pegasus.catalog.site.file = %s/sites.xml
+                pegasus.catalog.site = XML
+                pegasus.catalog.site.file = %s/sites.xml
 
-                    pegasus.condor.logs.symlink = false
-
-                    pegasus.data.configuration = sharedfs
-
-                    pegasus.dir.storage.mapper = Replica
-                    pegasus.dir.storage.mapper.replica = File
-                    pegasus.dir.storage.mapper.replica.file = %s/map.rc
-            """ % (self.basepath + "/input", self.basepath + "/input"))
+                pegasus.condor.logs.symlink = false
+                pegasus.data.configuration = sharedfs
+            """ % (os.path.join(self.basepath, "input"),))
             wh.write(pegasusrc)
         return
 
-    def _createSites(self, config):
-        """
-        Creates the pegasus site catalog. input/sites.xml
-        """
-        with open(self.basepath + "/input/sites.xml", "w") as wh:
+    def _create_sites(self):
+        with open(os.path.join(self.basepath, "input/sites.xml"), "w") as wh:
             sites = """\
-            <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
-            <site handle="local" arch="x86_64" os="LINUX">
-            <directory type="shared-scratch" path="%s">
-            <file-server operation="all" url="file://%s" />
-            </directory>
-            <directory type="local-storage" path="%s">
-            <file-server operation="all" url="file://%s" />
-            </directory>
-            </site>
-            <site handle="condorpool" arch="x86_64" os="LINUX">
-            <directory type="shared-scratch" path="%s">
-            <file-server operation="all" url="file://%s" />
-            </directory>
-            """ % (self.basepath + "/work/", self.basepath + "/work/", self.basepath + "/output/", self.basepath + "/output/", self.basepath + "/work/", self.basepath + "/work/")
+                <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
+                <site handle="local" arch="x86_64" os="LINUX">
+                    <directory type="shared-scratch" path="%s">
+                        <file-server operation="all" url="file://%s" />
+                    </directory>
+                    <directory type="local-storage" path="%s">
+                        <file-server operation="all" url="file://%s" />
+                    </directory>
+                </site>
+                <site handle="condorpool" arch="x86_64" os="LINUX">
+                    <directory type="shared-scratch" path="%s">
+                        <file-server operation="all" url="file://%s" />
+                    </directory>
+            """ % (
+                os.path.join(self.basepath, "work"),
+                os.path.join(self.basepath, "work"),
+                os.path.join(self.basepath, "output"),
+                os.path.join(self.basepath, "output"),
+                os.path.join(self.basepath, "work"),
+                os.path.join(self.basepath, "work")
+            )
 
+            # NEED TO DETERMINE HOW TO READ IN THIS INFO
             for namespace in config["profile"]:
                 for key in config["profile"][namespace]:
                     val = ":".join(config["profile"][namespace][key]) if "path" in key.lower() else config["profile"][namespace][key]
@@ -238,11 +330,11 @@ class Workflow(object):
             wh.write("\n".join([line for line in xml.dom.minidom.parseString(sites).toprettyxml().split('\n') if line.strip()]))
         return
 
-    def _createSubmit(self):
+    def _create_submit(self):
         """
         Creates the pegasus submit script. submit.sh
         """
-        with open(self.basepath + "/input/submit.sh", "w") as wh:
+        with open(os.path.join(self.basepath, "input/submit.sh"), "w") as wh:
             submit = textwrap.dedent("""\
                 #!/bin/bash
                 plan=`pegasus-plan \\
@@ -252,7 +344,7 @@ class Workflow(object):
                 --output-site local \\
                 --dax "%s" \\
                 --randomdir \\
-            """ % (self.basepath + "/input/conf.rc", "condorpool", self.basepath + "/work/", self.basepath + "/input/encode.dax"))
+            """ % (self.basepath + "/input/conf.rc", "condorpool", self.basepath + "/work/", self.basepath + "/input/chipathlon.dax"))
             submit += textwrap.dedent("""\
                 --submit`
 
@@ -267,14 +359,13 @@ class Workflow(object):
                 chmod 744 remove.sh
 
                 echo "$plan"
-                echo "Alternatively, you can use the status & remove scripts in the current directory!"
             """)
             wh.write(submit)
         os.chmod(self.basepath + "/input/submit.sh", 0755)
         return
 
-    def write(self):
+    def _write(self):
         with open(self.basepath + "/input/chipathlon.dax", "w") as wh:
             self.dax.writeXML(wh)
         return
 
diff --git a/chipathlon/workflow_job.py b/chipathlon/workflow_job.py
index 2258456501bcde2ec1040d3f668d40f7e15be172..c4d84dfb22ed52dc8e753e34596cf78ad982c08b 100644
--- a/chipathlon/workflow_job.py
+++ b/chipathlon/workflow_job.py
@@ -16,6 +16,8 @@ class WorkflowJob(object):
             with open(base_yaml, "r") as rh:
                 self.base = yaml.load(rh)
                 self.jobname = self.base.keys()[0]
+                self.inputs = self.base[self.jobname]["inputs"]
+                self.outputs = self.base[self.jobname]["outputs"]
         except:
             self.err += "Error parsing job template yaml file.\n"
             self.err += traceback.format_exc()
@@ -29,8 +31,6 @@
         except:
             self.err += "Error parsing params yaml file.\n"
             self.err += traceback.format_exc()
-        self.inputs = self.base[self.jobname]["inputs"]
-        self.outputs = self.base[self.jobname]["outputs"]
         self.validate()
         return
 
diff --git a/chipathlon/workflow_module.py b/chipathlon/workflow_module.py
index ae752a9253d6925f1667743633f5e1f6ab863e5d..20d680ec4b6505e75af7dff8079806c54d111231 100644
--- a/chipathlon/workflow_module.py
+++ b/chipathlon/workflow_module.py
@@ -123,7 +123,7 @@
         # NOW, WE add our job to the master_list, and check dependencies
         # Python ALWAYS passes by reference, so even though
         # we are a little redundant here, dependency checking
-        # will be easier later
+        # will be easier later and not very costly
         for fd in job_outputs:
             master_jobs[fd["name"]] = job
         for fd in job_inputs:
@@ -161,7 +161,3 @@
 
     def _get_full_name(self, prefix, markers, fname):
         return "%s_%s_%s" % (prefix, "_".join([markers[x] for x in self.order]), fname)
-
-    def _add_output_files(self, master_files, prefix, markers, output_files):
-
-        return
diff --git a/setup.py b/setup.py
index 92e6586f8d7a3283fa297ad2fd9e0829e51c737a..779ef150ca31c05378b1e34232b7d78859ae6713 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@ setup(
     packages=["chipathlon"],
     license="???",
     package_data={
-        "chipathlon": ["jobs/modules/*", "jobs/params/*", "jobs/wrappers/*"]
+        "chipathlon": ["jobs/modules/*", "jobs/params/*", "jobs/wrappers/*", "jobs/scripts/*"]
    },
     scripts = []
 )