From d2b21636843bdb8f1178308687cf4a9b170772ea Mon Sep 17 00:00:00 2001 From: aknecht2 <aknecht2@unl.edu> Date: Wed, 2 Mar 2016 16:04:47 -0600 Subject: [PATCH] Added download script. Updated job yaml. Updated module yaml. Add executable loading to workflow code. Updated database validation for new inputs. --- chipathlon/conf.py | 3 + chipathlon/db.py | 37 +++++------- chipathlon/jobs/modules/align.yaml | 49 ++++++++++++++-- chipathlon/jobs/params/bwa_align_paired.yaml | 10 +++- chipathlon/jobs/params/bwa_align_single.yaml | 10 +++- chipathlon/jobs/params/bwa_sai_to_sam.yaml | 2 +- chipathlon/scripts/download_fastq.py | 22 +++++++ chipathlon/test/run/run.yaml | 6 +- chipathlon/utils.py | 59 ++++++++++--------- chipathlon/workflow.py | 60 ++++++++++++-------- 10 files changed, 167 insertions(+), 91 deletions(-) create mode 100644 chipathlon/scripts/download_fastq.py diff --git a/chipathlon/conf.py b/chipathlon/conf.py index fedfae0..372f47a 100644 --- a/chipathlon/conf.py +++ b/chipathlon/conf.py @@ -4,6 +4,9 @@ job_modules = "jobs/modules/" # Job params directory job_params = "jobs/params/" +# Job wrappers directory +job_wrappers = "jobs/wrappers/" + # File extensions file_extensions = { "fna": ["fna"], diff --git a/chipathlon/db.py b/chipathlon/db.py index 8846474..622bd81 100644 --- a/chipathlon/db.py +++ b/chipathlon/db.py @@ -34,8 +34,8 @@ class MongoDB(object): }, { "$lookup": { - "from": "samples", - "localField": "_id", + "from": "samples_test", + "localField": "uuid", "foreignField": "experiment_id", "as": "samples" } @@ -85,7 +85,7 @@ class MongoDB(object): # 5. Iterate through control_exps # 6. Join samples into the control_exps by exp_id # 7. 
Re-aggregate all data into arrays - cursor = self.db.experiments.aggregate([ + pipeline = [ { "$match": { "target": {"$exists": True}, @@ -98,8 +98,8 @@ class MongoDB(object): }, { "$lookup": { - "from": "samples", - "localField": "_id", + "from": "samples_test", + "localField": "uuid", "foreignField": "experiment_id", "as": "samples" } @@ -109,36 +109,25 @@ class MongoDB(object): }, { "$lookup": { - "from": "experiments", - "localField": "possible_controls", - "foreignField": "@id", - "as": "control_exps" - } - }, - { - "$unwind": "$control_exps" - }, - { - "$lookup": { - "from": "samples", - "localField": "control_exps._id", + "from": "samples_test", + "localField": "possible_controls.uuid", "foreignField": "experiment_id", - "as": "control_exps.samples" + "as": "possible_controls.samples" } }, { "$group": { "_id": "$_id", "possible_controls": {"$push": "$possible_controls"}, - "control_exps": {"$push": "$control_exps"}, "samples": {"$push": "$samples"} } } - ]) + ] + cursor = self.db.experiments.aggregate(pipeline) # We should have only 1 document document = cursor.next() - control_inputs = [sample for control in document["control_exps"] for sample in control["samples"] if ("filetype" in sample and sample["filetype"] == "fastq")] - experiment_inputs = [sample for sample in document["samples"][0] if ("filetype" in sample and sample["filetype"] == "fastq")] + control_inputs = [sample for control in document["possible_controls"] for sample in control["samples"] if ("file_type" in sample and sample["file_type"] == "fastq")] + experiment_inputs = [sample for sample in document["samples"][0] if ("file_type" in sample and sample["file_type"] == "fastq")] if (len(control_inputs) > 0 and len(experiment_inputs) > 0): msg = "Succesfully retrieved input files for experiment with id '%s'." 
% (experiment_id,) data = { @@ -147,7 +136,7 @@ class MongoDB(object): } else: valid = False - msg = "Experiment with id '%s' has '%s' possible control inputs, and '%s' possible experiment inputs." % (experiment_id, len(control_inputs), len(experiment_inputs)) + msg = "Experiment with id '%s' has %s possible control inputs, and %s possible experiment inputs." % (experiment_id, len(control_inputs), len(experiment_inputs)) else: valid = False msg = "Experiment with id '%s' does not have possible_controls." % (experiment_id,) diff --git a/chipathlon/jobs/modules/align.yaml b/chipathlon/jobs/modules/align.yaml index 78dc927..b418a90 100644 --- a/chipathlon/jobs/modules/align.yaml +++ b/chipathlon/jobs/modules/align.yaml @@ -1,12 +1,49 @@ align: bwa: - single: - - bwa_align_single - - bwa_sai_to_sam + - bwa_align_single: + inputs: + - ref_genome + - download_1.fastq + additional_inputs: + - ref_genome.amb + - ref_genome.ann + - ref_genome.bwt + - ref_genome.pac + - ref_genome.sa + outputs: + - bwa.sai + - bwa_sai_to_sam: + inputs: + - ref_genome.fa + - bwa.sai + - download_1.fastq + additional_inputs: null + outputs: + - align.sam paired: - - bwa_align_paired - - samtools_sam_to_bam + - bwa_align_paired: + inputs: + - ref_genome.fa + - download_1.fastq + - download_2.fastq + additional_inputs: + - ref_genome.amb + - ref_genome.ann + - ref_genome.bwt + - ref_genome.pac + - ref_genome.sa + outputs: + - align.sam + - samtools_sam_to_bam: + inputs: + - align.sam + additional_inputs: null + outputs: + - align.bam bowtie2: - - single: bowtie2_align_single - paired: bowtie2_align_paired + - single: + - bowtie2_align_single + paired: + - bowtie2_align_paired - samtools_sam_to_bam diff --git a/chipathlon/jobs/params/bwa_align_paired.yaml b/chipathlon/jobs/params/bwa_align_paired.yaml index 156bc7d..da1cdd6 100644 --- a/chipathlon/jobs/params/bwa_align_paired.yaml +++ b/chipathlon/jobs/params/bwa_align_paired.yaml @@ -1,8 +1,16 @@ bwa_align_paired: inputs: - - fna + - + - fna + - 
fa - fastq - fastq + additional_inputs: + - fa.amb + - fa.ann + - fa.bwt + - fa.pac + - fa.sa outputs: - sam command: bwa diff --git a/chipathlon/jobs/params/bwa_align_single.yaml b/chipathlon/jobs/params/bwa_align_single.yaml index d911a43..368ad02 100644 --- a/chipathlon/jobs/params/bwa_align_single.yaml +++ b/chipathlon/jobs/params/bwa_align_single.yaml @@ -1,7 +1,15 @@ bwa_align_single: inputs: - - fna + - + - fna + - fa - fastq + additional_inputs: + - fa.amb + - fa.ann + - fa.bwt + - fa.pac + - fa.sa outputs: - sai command: bwa diff --git a/chipathlon/jobs/params/bwa_sai_to_sam.yaml b/chipathlon/jobs/params/bwa_sai_to_sam.yaml index 60a920b..cdae736 100644 --- a/chipathlon/jobs/params/bwa_sai_to_sam.yaml +++ b/chipathlon/jobs/params/bwa_sai_to_sam.yaml @@ -1,6 +1,6 @@ bwa_sai_to_sam: inputs: - - fna + - fa - sai - fastq outputs: diff --git a/chipathlon/scripts/download_fastq.py b/chipathlon/scripts/download_fastq.py new file mode 100644 index 0000000..9287276 --- /dev/null +++ b/chipathlon/scripts/download_fastq.py @@ -0,0 +1,22 @@ +#!/usr/bin/python +import chipathlon +import argparse + +parser = argparse.ArgumentParser(description = "Download target file.") +parser.add_argument("--url", dest="url", required = True, help="Target url.") +parser.add_argument("--local_path", dest="local_path", required = True, help="Local path to file.") +parser.add_argument("--url_type", dest="url_type", default="ftp://", help="Type of url to access.") +parser.add_argument("--retries", dest="retries", default=3, type=int, help="Number of retries.") +parser.add_argument("--overwrite", dest="overwrite", default=False, action="store_true", help="Overwrite local file if exists.") +parser.add_argument("--md5", dest="md5", help="Check md5 value against passed value.") +args = parser.parse_args() + +chipathlon.utils.downloadFile( + args.url, + args.local_path, + urltype = args.url_type, + retries = args.retries, + overwrite = args.overwrite, + checkmd5 = (not not args.md5), + md5 = 
args.md5 +) diff --git a/chipathlon/test/run/run.yaml b/chipathlon/test/run/run.yaml index dce3a54..6234c7b 100644 --- a/chipathlon/test/run/run.yaml +++ b/chipathlon/test/run/run.yaml @@ -1,8 +1,6 @@ -- run1: - experiment: "ENCSR000BSE" +- experiment: "ENCSR000BSE" align: bwa peak: spp -- run2: - experiment: "ENCSR000BSE" +- experiment: "ENCSR000BSE" align: bowtie2 peak: spp diff --git a/chipathlon/utils.py b/chipathlon/utils.py index cd2acd2..0594c74 100644 --- a/chipathlon/utils.py +++ b/chipathlon/utils.py @@ -11,35 +11,34 @@ def progress(current, end, length = 20): sys.stdout.write("\rProcessed %s / %s entries. [%s] %s%%" % (current, end, hashes + spaces, int(round(percent * 100)))) sys.stdout.flush() -def downloadFile(url, localpath, logfile, urltype = "ftp://", retries = 3, overwrite = False, checkmd5 = False, md5 = None): - with open(logfile, "a") as wh: - success = False - if url[:7] == "http://": - urltype = "" - for i in range(0, retries): - wh.write("Attempt #%s, downloading %s\n" % (i + 1, url)) - try: - if not os.path.isfile(localpath) or overwrite: - with open(localpath, "w") as fwh: - response = urllib2.urlopen(urltype + url) - data = response.read() - if checkmd5: - if hashlib.md5(data).hexdigest() == md5: - fwh.write(data) - success = True - wh.write("File saved successfully.\n") - else: - wh.write("MD5 mismatch. 
Retrying.\n") - else: +def downloadFile(url, localpath, urltype = "ftp://", retries = 3, overwrite = False, checkmd5 = False, md5 = None): + success = False + if url[:7] == "http://": + urltype = "" + for i in range(0, retries): + print("Attempt #%s, downloading %s\n" % (i + 1, url)) + try: + if not os.path.isfile(localpath) or overwrite: + with open(localpath, "w") as fwh: + response = urllib2.urlopen(urltype + url) + data = response.read() + if checkmd5: + if hashlib.md5(data).hexdigest() == md5: fwh.write(data) success = True - wh.write("File saved successfully.\n") - elif os.path.isfile(localpath) and not overwrite: - wh.write("File already exists, skipping download.\n") - success = True - except Exception as e: - wh.write(traceback.format_exc()) - overwrite = True - if success: - return - return + print("File saved successfully.\n") + else: + print("MD5 mismatch. Retrying.\n") + else: + fwh.write(data) + success = True + print("File saved successfully.\n") + elif os.path.isfile(localpath) and not overwrite: + print("File already exists, skipping download.\n") + success = True + except Exception as e: + print(traceback.format_exc()) + overwrite = True + if success: + return + return diff --git a/chipathlon/workflow.py b/chipathlon/workflow.py index cdb23cc..df8c0ff 100644 --- a/chipathlon/workflow.py +++ b/chipathlon/workflow.py @@ -8,6 +8,7 @@ import xml.dom.minidom import yaml import traceback import chipathlon +from pprint import pprint from Pegasus.DAX3 import * class Workflow(object): @@ -39,14 +40,22 @@ class Workflow(object): self.modules = {} return + def info(self): + pprint(self.run_data) + pprint(self.yaml_jobs) + pprint(self.modules) + pprint(self.executables) + return + def generate(self): self.load_modules() - self.validate_run() + self.load_runs() if not self.err: self.load_yaml_jobs() if not self.err: print "Put the rest of the generating functions hurrrr." 
- + self.load_executables() + # self.info() else: print self.err @@ -56,6 +65,23 @@ class Workflow(object): sys.exit(1) return + def load_executables(self, os_type = "linux", arch = "x86_64"): + # Load wrapper scripts for commands that need to be loaded from module + for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_wrappers)): + for f in files: + ex_name = f.split("_")[0] + self.executables[ex_name] = Executable(name = ex_name, os = os_type, arch = arch) + self.executables[ex_name].addPFN(PFN("file://%s/%s" % (root, f), "condorpool")) + self.dax.addExecutable(self.executables[ex_name]) + break + # Load actual scripts + for root, dirs, files in os.walk("%s/scripts" % (os.path.dirname(os.path.realpath(__file__)),)): + for f in files: + self.executables[f] = Executable(name = f, os = os_type, arch = arch) + self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "condorpool")) + self.dax.addExecutable(self.executables[f]) + return + def load_modules(self): for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_modules)): for f in files: @@ -66,7 +92,7 @@ class Workflow(object): break return - def validate_run(self): + def load_runs(self): try: with open(self.run_file, "r") as rh: self.run_data = yaml.load(rh) @@ -82,9 +108,10 @@ class Workflow(object): self.err += "Error parsing run template file '%s'. Module '%s' requires an entry, should be one of %s.\n" % (self.run_file, module_name, keys) if "experiment" in run: valid, msg, data = self.mdb.get_samples(run["experiment"]) - print valid - print msg - print data + if valid: + run["data"] = data + else: + self.err += msg else: self.err += "Error parsing run template file '%s'. 
Required key 'experiment' not defined.\n" % (self.run_file,) except: @@ -115,8 +142,8 @@ class Workflow(object): self.files[inout][name]["path"] = path if path else name self.files[inout][name]["file"].addPFN(PFN("file://" + path.replace(" ","%20"), site)) self.dax.addFile(self.files[dax][inout][name]["file"]) - elif inout == "output": - self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name)) + # elif inout == "output": + # self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name)) return def _addJob(self, jobname, executable, inputs, outputs, arguments, dependencies = None, dax = None, label = None, walltime = None): @@ -183,22 +210,7 @@ class Workflow(object): self.jobs[dax][jobname].profile(Namespace.GLOBUS, "maxwalltime", walltime) return - def _loadExecutables(self, os_type = "linux", arch = "x86_64"): - """ - Loads all executables from the scripts directory into the dax - """ - scriptFolder = os.path.dirname(os.path.realpath(__file__)) + "/scripts/" - if os.path.isdir(scriptFolder): - for root, dirs, files in os.walk(scriptFolder): - for ex in files: - self.executables[ex] = Executable(name = ex, os = os_type, arch = arch) - self.executables[ex].addPFN(PFN("file://" + scriptFolder + "/" + ex, "condorpool")) - self.dax.addExecutable(self.executables[ex]) - break - self.executables["bwa"] = Executable(name = "bwa", os = os_type, arch = arch) - self.executables[ex].addPFN(PFN("file:///util/opt/bwa/0.7/gcc/4.4/bin/bwa", "condorpool")) - self.dax.addExecutable(self.executables["bwa"]) - return + def _loadNotify(self, config): self.dax.invoke(When.AT_END, self.basepath + "/input/notify.sh") -- GitLab