From f73fdf4206ce89be20748e2c7667df3991120d58 Mon Sep 17 00:00:00 2001
From: aknecht2 <aknecht2@unl.edu>
Date: Sun, 13 Mar 2016 17:26:38 -0500
Subject: [PATCH] Fixed minor issues in param yaml scripts. Updated workflow to
 add jobs and create the relevant Pegasus information. The workflow can now
 generate download and alignment jobs, but still needs testing. Updated setup
 to include job scripts.

---
 chipathlon/conf.py                            |   3 +
 .../jobs/params/bedtools_bam_to_bed.yaml      |   2 +-
 chipathlon/jobs/params/macs2_callpeak.yaml    |   2 +-
 .../jobs/params/picard_mark_duplicates.yaml   |   9 +-
 chipathlon/test/run/param.yaml                |  24 ++
 chipathlon/tester.py                          |   7 +-
 chipathlon/workflow.py                        | 219 +++++++++++++-----
 chipathlon/workflow_job.py                    |   4 +-
 chipathlon/workflow_module.py                 |   6 +-
 setup.py                                      |   2 +-
 10 files changed, 197 insertions(+), 81 deletions(-)

diff --git a/chipathlon/conf.py b/chipathlon/conf.py
index c43734e..4fe73b7 100644
--- a/chipathlon/conf.py
+++ b/chipathlon/conf.py
@@ -7,6 +7,9 @@ job_params = "jobs/params/"
 # Job wrappers directory
 job_wrappers = "jobs/wrappers/"
 
+# Job scripts directory
+job_scripts = "jobs/scripts/"
+
 # File extensions
 file_extensions = {
     "genome_index": ["fa", "fna"],
diff --git a/chipathlon/jobs/params/bedtools_bam_to_bed.yaml b/chipathlon/jobs/params/bedtools_bam_to_bed.yaml
index fc37a2d..07ecadf 100644
--- a/chipathlon/jobs/params/bedtools_bam_to_bed.yaml
+++ b/chipathlon/jobs/params/bedtools_bam_to_bed.yaml
@@ -5,7 +5,7 @@ bedtools_bam_to_bed:
     - bed
   arguments:
     - "bamtobed":
-        changable: false
+        changeable: false
         required: true
         has_value: false
     - "-i":
diff --git a/chipathlon/jobs/params/macs2_callpeak.yaml b/chipathlon/jobs/params/macs2_callpeak.yaml
index d86ed62..efcc93b 100644
--- a/chipathlon/jobs/params/macs2_callpeak.yaml
+++ b/chipathlon/jobs/params/macs2_callpeak.yaml
@@ -26,7 +26,7 @@ macs2_callpeak:
         has_value: true
         default: "BED"
     - "-n":
-        changable: false
+        changeable: false
         required: true
         has_value: true
         default: $inputs.2
diff --git a/chipathlon/jobs/params/picard_mark_duplicates.yaml b/chipathlon/jobs/params/picard_mark_duplicates.yaml
index 5a299e3..0d5e305 100644
--- a/chipathlon/jobs/params/picard_mark_duplicates.yaml
+++ b/chipathlon/jobs/params/picard_mark_duplicates.yaml
@@ -1,9 +1,11 @@
-picard_sort_sam:
+picard_mark_duplicates:
   inputs:
     - bam
+  additional_inputs: null
   outputs:
     - bam
     - qc
+  command: picard
   arguments:
     - MarkDuplicates:
         changeable: false
@@ -28,16 +30,15 @@ picard_sort_sam:
         required: false
         has_value: true
         default: "LENIENT"
-    - "ASSUME_SORTED=": 
+    - "ASSUME_SORTED=":
         changeable: true
         required: false
         has_value: true
         default: true
-    - "REMOVE_DUPLICATES=": 
+    - "REMOVE_DUPLICATES=":
         changeable: false
         required: false
         has_value: true
         default: false
-
   walltime: 2000
   memory: 8000
diff --git a/chipathlon/test/run/param.yaml b/chipathlon/test/run/param.yaml
index 66eec9f..346d669 100644
--- a/chipathlon/test/run/param.yaml
+++ b/chipathlon/test/run/param.yaml
@@ -27,3 +27,27 @@ samtools_sam_to_bam:
   arguments: null
   walltime: 2000
   memory: 2000
+bedtools_bam_to_bed:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+macs2_callpeak:
+  arguments: null
+  walltime: 2000
+  memory: 8000
+picard_mark_duplicates:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+picard_sort_sam:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+r_spp_nodups:
+  arguments: null
+  walltime: 2000
+  memory: 2000
+samtools_remove_duplicates:
+  arguments: null
+  walltime: 2000
+  memory: 2000
diff --git a/chipathlon/tester.py b/chipathlon/tester.py
index 9d55db5..c52ba72 100644
--- a/chipathlon/tester.py
+++ b/chipathlon/tester.py
@@ -6,9 +6,9 @@ import chipathlon.workflow
 import argparse
 
 parser = argparse.ArgumentParser(description="Perform a join between the experiment and sample collections.")
-parser.add_argument("--password", dest="password", required=True, help="Database user password.")
-parser.add_argument("--username", dest="username", default="aknecht", required=True, help="Database user.")
-parser.add_argument("--host", dest="host", default="hcc-anvil-241-41.unl.edu", required=True, help="Database host.")
+parser.add_argument("--password", dest="password", default="nerpAusIrd)griUrlobIbfaifovript4", help="Database user password.")
+parser.add_argument("--username", dest="username", default="aknecht", help="Database user.")
+parser.add_argument("--host", dest="host", default="hcc-anvil-175-9.unl.edu", help="Database host.")
 args = parser.parse_args()
 
 mdb = chipathlon.db.MongoDB(args.host, args.username, args.password)
@@ -126,4 +126,5 @@ def valid_samples():
 # ]
 # for test in tests:
 #     test()
 
+workflow_test_1()
diff --git a/chipathlon/workflow.py b/chipathlon/workflow.py
index 3bb88de..b6fa748 100644
--- a/chipathlon/workflow.py
+++ b/chipathlon/workflow.py
@@ -11,6 +11,7 @@ import chipathlon
 import chipathlon.workflow_job
 import chipathlon.db
 import chipathlon.workflow_module
+import random
 from pprint import pprint
 from Pegasus.DAX3 import *
 
@@ -51,6 +52,14 @@ class Workflow(object):
             self._load_runs()
             # All required information is loaded, start queuing jobs
            self._add_download()
+            self._add_align()
+            # Create the Pegasus configuration files
+            self._create_setup()
+            self._add_notify()
+            self._create_replica()
+            self._create_sites()
+            self._create_submit()
+            self._write()
             break
         if self.err:
             print self.err
@@ -67,7 +76,7 @@ class Workflow(object):
                 self.dax.addExecutable(self.executables[ex_name])
             break
         # Load actual scripts
-        for root, dirs, files in os.walk("%s/scripts" % (os.path.dirname(os.path.realpath(__file__)),)):
+        for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_scripts)):
             for f in files:
                 self.executables[f] = Executable(name=f, os=os_type, arch=arch)
                 self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "condorpool"))
@@ -79,11 +88,14 @@ class Workflow(object):
         for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
             for f in files:
                 ex_name = f.split("_")[0]
-                yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
-                if not yj.err:
-                    self.workflow_jobs[yj.jobname] = yj
+                if ex_name in self.executables:
+                    yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
+                    if not yj.err:
+                        self.workflow_jobs[yj.jobname] = yj
+                    else:
+                        self.err += yj.err
                 else:
-                    self.err += yj.err
+                    print "[INFO] Skipping param file %s, no corresponding executable found." % (f,)
             break
         return
 
@@ -118,8 +130,10 @@ class Workflow(object):
                     run["data"] = data
                 else:
                     self.err += msg
-                # MAKE SURE YOU UNCOMMENT THIS!!!
+                # This will be changed to _check_genomes(run) eventually.
                 # self._check_genomes(run)
+                # UNCOMMENT THIS:
+                # self._check_grch38(run)
            else:
                self.err += "Error parsing run template file '%s'. Required key 'experiment' not defined.\n" % (self.run_file,)
        except:
@@ -127,7 +141,47 @@ class Workflow(object):
             self.err += traceback.format_exc()
         return
 
+    def _check_grch38(self, run):
+        if run["align"] in self.run_data["genomes"]:
+            assembly = "grch38.p6"
+            gen_prefix = "genome_%s_%s" % (run["align"], assembly)
+            if assembly in self.run_data["genomes"][run["align"]]:
+                base_file = self.run_data["genomes"][run["align"]][assembly]
+                if os.path.isfile(base_file):
+                    base_file_name = gen_prefix + "." + base_file.split(".", 1)[1]
+                    f = File(base_file_name)
+                    f.addPFN(PFN(base_file, "local"))
+                    self.files[base_file_name] = f
+                    if base_file.split(".", 1)[1] in chipathlon.conf.genomes[run["align"]]["base_file"]:
+                        prefix = base_file.split(".", 1)[0]
+                        missing = []
+                        for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
+                            if not os.path.isfile(prefix + "." + ext) and not os.path.isfile(base_file + "." + ext):
+                                missing.append(ext)
+                            else:
+                                f = File(gen_prefix + "." + ext)
+                                if os.path.isfile(prefix + "." + ext):
+                                    f.addPFN(PFN(prefix + "." + ext, "local"))
+                                else:
+                                    f.addPFN(PFN(base_file + "." + ext, "local"))
+                                self.files[gen_prefix + "." + ext] = f
+                                self.dax.addFile(f)
+                        if len(missing) > 0:
+                            self.err += "Genome defined with tool '%s' and assembly '%s' is missing additional_files with extensions %s.\n" % (run["align"], assembly, missing)
+                    else:
+                        self.err += "Genome defined with tool '%s' and assembly '%s' has an invalid extension. Should be one of %s.\n" % (run["align"], assembly, chipathlon.conf.genomes[run["align"]]["base_file"])
+                else:
+                    self.err += "Genome defined with tool '%s' and assembly '%s' has a non-existent base file '%s'.\n" % (run["align"], assembly, base_file)
+            else:
+                self.err += "Alignment defined with tool '%s' for assembly '%s', no corresponding genome definition.\n" % (run["align"], assembly)
+        else:
+            self.err += "Alignment defined with tool '%s', no corresponding genome definition.\n" % (run["align"])
+        return
+
     def _check_genomes(self, run):
+        # NOTE: This function should NOT be called.
+        # We will only be using the GRCh38.p6 genome.
+        # Use the _check_grch38 function instead.
         valid, msg, assembly = self.mdb.get_assembly(run["experiment"])
         if valid:
             if run["align"] in self.run_data["genomes"]:
@@ -155,80 +209,118 @@ class Workflow(object):
         return
 
     def _add_download(self):
+        # Each experiment file is paired with a randomly selected control file.
+        # The pairing doesn't matter for alignment, but it lays the groundwork
+        # for later steps. Controls are chosen at random unless there are
+        # equal numbers of control and experiment files.
+        self.input_sets = []
         for run in self.run_data["runs"]:
-            print "Control samples = %s, Experiment samples = %s" % (len(run["data"]["control"]), len(run["data"]["experiment"]))
+            if (len(run["data"]["control"]) != len(run["data"]["experiment"])):
+                control_data = [random.choice(run["data"]["control"]) for i in range(0, len(run["data"]["experiment"]))]
+            else:
+                control_data = run["data"]["control"]
+            for pair in zip(run["data"]["experiment"], control_data):
+                exp_name = "%s_%s.fastq.gz" % (run["experiment"], pair[0]["accession"])
+                control_name = "%s_%s.fastq.gz" % (run["experiment"], pair[1]["accession"])
+                if exp_name not in self.files and control_name not in self.files:
+                    self.input_sets.append(pair)
+                    for f in pair:
+                        # For each file we create the download job
+                        # and output file. Base output files will be
+                        # EXPID_ACCESSION.fastq.gz
+                        output_name = "%s_%s.fastq.gz" % (run["experiment"], f["accession"])
+                        output_file = File(output_name)
+                        self.files[output_name] = output_file
+                        job = Job(self.executables["download_fastq.py"])
+                        job.uses(output_file, link=Link.OUTPUT, transfer=True)
+                        job.addArguments("-u", f["url"], "-p", output_file, "-t http://", "-m", f["content_md5sum"])
+                        self.jobs[output_name] = job
+                        self.dax.addJob(job)
         return
 
-    def _addFile(self, name, inout, site="condorpool", path=None):
-        """
-        :param name: Name of the file to add.
-        :type name: str
-        :param path: Path to the file. Only required for input files.
-        :type path: str
+    def _add_align(self):
+        print self.modules["align"].markers
+        markers = {}
+        markers["read_end"] = "single"
+        for run in self.run_data["runs"]:
+            input_files = {}
+            additional_files = {}
+            markers["tool"] = run["align"]
+            gen_prefix = "genome_%s_grch38.p6" % (run["align"],)
+            input_files["ref_genome"] = "%s.%s" % (gen_prefix, self.run_data["genomes"][run["align"]]["grch38.p6"])
+            for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
+                additional_files["ref_genome." + ext] = "%s.%s" % (gen_prefix, ext)
+            for pair in self.input_sets:
+                for f in pair:
+                    prefix = "%s_%s" % (run["experiment"], f["accession"])
+                    input_files["download_1.fastq"] = "%s.fastq.gz" % (prefix,)
+                    self.modules["align"].add_jobs(self.dax, self.jobs, self.files, prefix, markers, input_files, additional_files)
+        return
 
-        Adds the inputted file to the dax, as well as the internal variable self.files
+    def _create_setup(self):
+        """
+        Creates the base structure for job submission. Everything is contained
+        within a folder based on the current timestamp.
         """
-        self.files[inout][name] = {"file": File(name), "path": ""}
-        if inout == "input":
-            self.files[inout][name]["path"] = path if path else name
-            self.files[inout][name]["file"].addPFN(PFN("file://" + path.replace(" ", "%20"), site))
-            self.dax.addFile(self.files[dax][inout][name]["file"])
-        # elif inout == "output":
-        #     self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name))
+        self.basepath = self.jobhome + "/" + datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")
+        if not os.path.exists(self.basepath):
+            os.makedirs(self.basepath)
+        for folder in ["input", "output"]:
+            if not os.path.exists(os.path.join(self.basepath, folder)):
+                os.makedirs(os.path.join(self.basepath, folder))
         return
 
-    def _loadNotify(self, config):
-        self.dax.invoke(When.AT_END, self.basepath + "/input/notify.sh")
-        with open(self.basepath + "/input/notify.sh", "w") as wh:
+    def _add_notify(self):
+        # NEED TO DETERMINE HOW TO READ IN THESE VARS
+        notify_path = os.path.join(self.basepath, "input/notify.sh")
+        with open(notify_path, "w") as wh:
             notify = textwrap.dedent("""\
                 #!/bin/bash
                 %s/notification/email -t %s --report=pegasus-analyzer
                 """ % (config["notify"]["pegasus_home"], config["notify"]["email"]))
             wh.write(notify)
-        os.chmod(self.basepath + "/input/notify.sh", 0755)
+        os.chmod(notify_path, 0755)
+        self.dax.invoke(When.AT_END, notify_path)
         return
 
-    def _createReplica(self):
-        """
-        Creates the pegasus configuration replica catalog. input/conf.rc
-        """
-        with open(self.basepath + "/input/conf.rc", "w") as wh:
+    def _create_replica(self):
+        with open(os.path.join(self.basepath, "input/conf.rc"), "w") as wh:
             pegasusrc = textwrap.dedent("""\
-                pegasus.catalog.site = XML
-                pegasus.catalog.site.file = %s/sites.xml
+            pegasus.catalog.site = XML
+            pegasus.catalog.site.file = %s/sites.xml
 
-                pegasus.condor.logs.symlink = false
-
-                pegasus.data.configuration = sharedfs
-
-                pegasus.dir.storage.mapper = Replica
-                pegasus.dir.storage.mapper.replica = File
-                pegasus.dir.storage.mapper.replica.file = %s/map.rc
-                """ % (self.basepath + "/input", self.basepath + "/input"))
+            pegasus.condor.logs.symlink = false
+            pegasus.data.configuration = sharedfs
+            """ % (os.path.join(self.basepath, "input"),))
             wh.write(pegasusrc)
         return
 
-    def _createSites(self, config):
-        """
-        Creates the pegasus site catalog. input/sites.xml
-        """
-        with open(self.basepath + "/input/sites.xml", "w") as wh:
+    def _create_sites(self):
+        with open(os.path.join(self.basepath, "input/sites.xml"), "w") as wh:
             sites = """\
-                <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
-                <site handle="local" arch="x86_64" os="LINUX">
-                <directory type="shared-scratch" path="%s">
-                <file-server operation="all" url="file://%s" />
-                </directory>
-                <directory type="local-storage" path="%s">
-                <file-server operation="all" url="file://%s" />
-                </directory>
-                </site>
-                <site handle="condorpool" arch="x86_64" os="LINUX">
-                <directory type="shared-scratch" path="%s">
-                <file-server operation="all" url="file://%s" />
-                </directory>
-                """ % (self.basepath + "/work/", self.basepath + "/work/", self.basepath + "/output/", self.basepath + "/output/", self.basepath + "/work/", self.basepath + "/work/")
+            <sitecatalog xmlns="http://pegasus.isi.edu/schema/sitecatalog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://pegasus.isi.edu/schema/sitecatalog http://pegasus.isi.edu/schema/sc-4.0.xsd" version="4.0">
+            <site handle="local" arch="x86_64" os="LINUX">
+                <directory type="shared-scratch" path="%s">
+                    <file-server operation="all" url="file://%s" />
+                </directory>
+                <directory type="local-storage" path="%s">
+                    <file-server operation="all" url="file://%s" />
+                </directory>
+            </site>
+            <site handle="condorpool" arch="x86_64" os="LINUX">
+                <directory type="shared-scratch" path="%s">
+                    <file-server operation="all" url="file://%s" />
+                </directory>
+            """ % (
+                os.path.join(self.basepath, "work"),
+                os.path.join(self.basepath, "work"),
+                os.path.join(self.basepath, "output"),
+                os.path.join(self.basepath, "output"),
+                os.path.join(self.basepath, "work"),
+                os.path.join(self.basepath, "work")
+            )
+            # NEED TO DETERMINE HOW TO READ IN THIS INFO
             for namespace in config["profile"]:
                 for key in config["profile"][namespace]:
                     val = ":".join(config["profile"][namespace][key]) if "path" in key.lower() else config["profile"][namespace][key]
@@ -238,11 +330,11 @@ class Workflow(object):
             wh.write("\n".join([line for line in xml.dom.minidom.parseString(sites).toprettyxml().split('\n') if line.strip()]))
         return
 
-    def _createSubmit(self):
+    def _create_submit(self):
         """
         Creates the pegasus submit script. submit.sh
         """
-        with open(self.basepath + "/input/submit.sh", "w") as wh:
+        with open(os.path.join(self.basepath, "input/submit.sh"), "w") as wh:
             submit = textwrap.dedent("""\
                 #!/bin/bash
                 plan=`pegasus-plan \\
@@ -252,7 +344,7 @@ class Workflow(object):
                 --output-site local \\
                 --dax "%s" \\
                 --randomdir \\
-            """ % (self.basepath + "/input/conf.rc", "condorpool", self.basepath + "/work/", self.basepath + "/input/encode.dax"))
+            """ % (self.basepath + "/input/conf.rc", "condorpool", self.basepath + "/work/", self.basepath + "/input/chipathlon.dax"))
             submit += textwrap.dedent("""\
                 --submit`
 
@@ -267,14 +359,13 @@ class Workflow(object):
                 chmod 744 remove.sh
 
                 echo "$plan"
-
                 echo "Alternatively, you can use the status & remove scripts in the current directory!"
                 """)
             wh.write(submit)
         os.chmod(self.basepath + "/input/submit.sh", 0755)
         return
 
-    def write(self):
+    def _write(self):
         with open(self.basepath + "/input/chipathlon.dax", "w") as wh:
             self.dax.writeXML(wh)
         return
diff --git a/chipathlon/workflow_job.py b/chipathlon/workflow_job.py
index 2258456..c4d84df 100644
--- a/chipathlon/workflow_job.py
+++ b/chipathlon/workflow_job.py
@@ -16,6 +16,8 @@ class WorkflowJob(object):
             with open(base_yaml, "r") as rh:
                 self.base = yaml.load(rh)
                 self.jobname = self.base.keys()[0]
+                self.inputs = self.base[self.jobname]["inputs"]
+                self.outputs = self.base[self.jobname]["outputs"]
         except:
             self.err += "Error parsing job template yaml file.\n"
             self.err += traceback.format_exc()
@@ -29,8 +31,6 @@ class WorkflowJob(object):
         except:
             self.err += "Error parsing params yaml file.\n"
             self.err += traceback.format_exc()
-        self.inputs = self.base[self.jobname]["inputs"]
-        self.outputs = self.base[self.jobname]["outputs"]
         self.validate()
         return
 
diff --git a/chipathlon/workflow_module.py b/chipathlon/workflow_module.py
index ae752a9..20d680e 100644
--- a/chipathlon/workflow_module.py
+++ b/chipathlon/workflow_module.py
@@ -123,7 +123,7 @@ class WorkflowModule(object):
         # NOW, WE add our job to the master_list, and check dependencies
         # Python ALWAYS passes by reference, so even though
         # we are a little redundant here, dependency checking
-        # will be easier later
+        # will be easier later and not very costly
         for fd in job_outputs:
             master_jobs[fd["name"]] = job
         for fd in job_inputs:
@@ -161,7 +161,3 @@ class WorkflowModule(object):
 
     def _get_full_name(self, prefix, markers, fname):
         return "%s_%s_%s" % (prefix, "_".join([markers[x] for x in self.order]), fname)
-
-    def _add_output_files(self, master_files, prefix, markers, output_files):
-
-        return
diff --git a/setup.py b/setup.py
index 92e6586..779ef15 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@ setup(
     packages=["chipathlon"],
     license="???",
     package_data={
-        "chipathlon": ["jobs/modules/*", "jobs/params/*", "jobs/wrappers/*"]
+        "chipathlon": ["jobs/modules/*", "jobs/params/*", "jobs/wrappers/*", "jobs/scripts/*"]
     },
     scripts = []
 )
-- 
GitLab
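
Note on the pairing rule introduced in _add_download above: every experiment file is matched with a control file, and whenever the control and experiment counts differ, controls are drawn at random with replacement. The standalone sketch below restates that selection logic outside the Workflow class; the helper name and the accession values are hypothetical and only serve as an illustration of the rule, not code from the patch.

    import random

    def pair_experiment_with_control(experiment_files, control_files):
        # Mirrors the pairing rule in Workflow._add_download: when the number
        # of control files differs from the number of experiment files, controls
        # are sampled with replacement so every experiment file gets a partner;
        # otherwise the two lists are zipped positionally.
        if len(control_files) != len(experiment_files):
            control_data = [random.choice(control_files) for _ in range(len(experiment_files))]
        else:
            control_data = control_files
        return zip(experiment_files, control_data)

    # Hypothetical accession records, for illustration only.
    pairs = pair_experiment_with_control(
        [{"accession": "ENCFF000AAA"}, {"accession": "ENCFF000AAB"}],
        [{"accession": "ENCFF000CCC"}],
    )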