From 10f026fd5d1b092b7ebc64add87e661bedba3e77 Mon Sep 17 00:00:00 2001
From: aknecht2 <aknecht2@unl.edu>
Date: Fri, 15 Apr 2016 14:15:39 -0500
Subject: [PATCH] Added node support.  Updated database to load files in
 chunks.

---
 chipathlon/conf.py                            |   4 +
 chipathlon/db.py                              | 100 ++++++++++--------
 .../jobs/params/bedtools_bam_to_bed.yaml      |   1 +
 .../jobs/params/bowtie2_align_paired.yaml     |   1 +
 .../jobs/params/bowtie2_align_single.yaml     |   1 +
 chipathlon/jobs/params/bwa_align_paired.yaml  |   1 +
 chipathlon/jobs/params/bwa_align_single.yaml  |   1 +
 chipathlon/jobs/params/bwa_sai_to_sam.yaml    |   1 +
 .../jobs/params/cat_awk_sort_peaks.yaml       |   1 +
 chipathlon/jobs/params/cp_bed_tagalign.yaml   |   1 +
 chipathlon/jobs/params/db_save_result.yaml    |   1 +
 chipathlon/jobs/params/macs2_callpeak.yaml    |   1 +
 .../jobs/params/picard_mark_duplicates.yaml   |   1 +
 chipathlon/jobs/params/picard_sort_sam.yaml   |   1 +
 chipathlon/jobs/params/r_spp_nodups.yaml      |   1 +
 .../jobs/params/samtools_filter_bam.yaml      |   1 +
 .../params/samtools_remove_duplicates.yaml    |   1 +
 .../jobs/params/samtools_sam_to_bam.yaml      |   1 +
 .../jobs/params/sort_awk_sort_peaks.yaml      |   1 +
 .../jobs/params/zcat_awk_sort_peaks.yaml      |   1 +
 chipathlon/jobs/scripts/sort_macs2.sh         |   2 +-
 chipathlon/tester.py                          |  16 ++-
 22 files changed, 88 insertions(+), 52 deletions(-)

diff --git a/chipathlon/conf.py b/chipathlon/conf.py
index a6110db..656e1d7 100644
--- a/chipathlon/conf.py
+++ b/chipathlon/conf.py
@@ -67,6 +67,10 @@ resources = {
     "cores": {
         "namespace": "pegasus",
         "key": "cores"
+    },
+    "nodes": {
+        "namespace": "pegasus",
+        "key": "nodes"
     }
 }
 
diff --git a/chipathlon/db.py b/chipathlon/db.py
index 3cb7ec6..3526b41 100644
--- a/chipathlon/db.py
+++ b/chipathlon/db.py
@@ -4,6 +4,7 @@ import gridfs
 import sys
 import traceback
 import os
+import itertools
 from pprint import pprint
 
 
@@ -62,7 +63,7 @@ class MongoDB(object):
             else:
                 msg = "Not all input ids are valid.  The following are invalid: "
                 for id_list, valid_list in zip([control_ids, experiment_ids], [valid_controls, valid_experiments]):
-                    msg += ",".join([id_list[i] for i, valid in enumerate(valid_list) if not valid])
+                    msg += ", ".join([id_list[i] for i, valid in enumerate(valid_list) if not valid])
         else:
             msg = "Specified output_file %s does not exist." % (output_file,)
         return (False, msg, None)
@@ -78,27 +79,30 @@ class MongoDB(object):
             # then insert with insert_many()
             print "loading bed_data..."
             with open(bed_file, "r") as rh:
-                bed_data = [
-                    {
-                        "result_id": result_id,
-                        "chr": line_info[0],
-                        "start": line_info[1],
-                        "end": line_info[2],
-                        "name": line_info[3],
-                        "score": line_info[4],
-                        "strand": line_info[5]
-                    }
-                    for line in rh.readlines()
-                    for line_info in (line.split(),)
-                ]
-                try:
-                    print "bed data loaded, inserting."
-                    self.db.bed.insert_many(bed_data)
-                    return (True, "Bed file successfully inserted.", result_id)
-                except pymongo.errors.OperationFailure as e:
-                    valid = False
-                    msg = "Error inserting bed_file %s: %s" % (bed_file, e)
-        return (valid, msg, None)
+                msg = "Bed file successfully inserted."
+                # Lazily load the file in chunks of n_lines (~100k) lines
+                n_lines = 100000
+                line_set = list(itertools.islice(rh, n_lines))
+                while line_set:
+                    try:
+                        self.db.bed.insert_many([
+                            {
+                                "result_id": result_id,
+                                "chr": line_info[0],
+                                "start": line_info[1],
+                                "end": line_info[2],
+                                "name": line_info[3],
+                                "score": line_info[4],
+                                "strand": line_info[5]
+                            }
+                            for line in line_set
+                            for line_info in (line.split(),)
+                        ])
+                    except pymongo.errors.OperationFailure as e:
+                        valid = False
+                        msg = "Error inserting bed_file %s: %s" % (bed_file, e)
+                    line_set = list(itertools.islice(rh, n_lines))
+        return (valid, msg, result_id)
 
     def save_peak(self, peak_file, control_ids, experiment_ids, additional_data = {}):
         # Create result_entry for peak_file
@@ -108,30 +112,34 @@ class MongoDB(object):
             # Data is in a 10 column format
             # chr, start, end, name, score, strand, signal_value, p_value, q_value, summit
             with open(peak_file, "r") as rh:
-                peak_data = [
-                    {
-                        "result_id": result_id,
-                        "chr": line_info[0],
-                        "start": line_info[1],
-                        "end": line_info[2],
-                        "name": line_info[3],
-                        "score": line_info[4],
-                        "strand": line_info[5],
-                        "signal_value": line_info[6],
-                        "p_value": line_info[7],
-                        "q_value": line_info[8],
-                        "summit": line_info[9]
-                    }
-                    for line in rh.readlines()
-                    for line_info in (line.split(),)
-                ]
-                try:
-                    self.db.peak.insert_many(peak_data)
-                    return (True, "Peak file successfully inserted.", result_id)
-                except pymongo.errors.OperationFailure as e:
-                    valid = False
-                    msg = "Error inserting peak_file %s: %s" % (peak_file, e)
-        return (valid, msg, None)
+                msg = "Peak file successfully inserted."
+                # Lazily load the file in chunks of n_lines (~10k) lines
+                n_lines = 10000
+                line_set = list(itertools.islice(rh, n_lines))
+                while line_set:
+                    try:
+                        self.db.peak.insert_many([
+                            {
+                                "result_id": result_id,
+                                "chr": line_info[0],
+                                "start": line_info[1],
+                                "end": line_info[2],
+                                "name": line_info[3],
+                                "score": line_info[4],
+                                "strand": line_info[5],
+                                "signal_value": line_info[6],
+                                "p_value": line_info[7],
+                                "q_value": line_info[8],
+                                "summit": line_info[9]
+                            }
+                            for line in line_set
+                            for line_info in (line.split(),)
+                        ])
+                    except pymongo.errors.OperationFailure as e:
+                        valid = False
+                        msg = "Error inserting peak_file %s: %s" % (peak_file, e)
+                    line_set = list(itertools.islice(rh, n_lines))
+        return (valid, msg, result_id)
 
     def is_valid_sample(self, sample_accession):
         try:
diff --git a/chipathlon/jobs/params/bedtools_bam_to_bed.yaml b/chipathlon/jobs/params/bedtools_bam_to_bed.yaml
index c578d48..df9a59a 100644
--- a/chipathlon/jobs/params/bedtools_bam_to_bed.yaml
+++ b/chipathlon/jobs/params/bedtools_bam_to_bed.yaml
@@ -20,3 +20,4 @@ bedtools_bam_to_bed:
   walltime: 2000
   memory: 2000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/bowtie2_align_paired.yaml b/chipathlon/jobs/params/bowtie2_align_paired.yaml
index f2960c0..56f7906 100644
--- a/chipathlon/jobs/params/bowtie2_align_paired.yaml
+++ b/chipathlon/jobs/params/bowtie2_align_paired.yaml
@@ -56,3 +56,4 @@ bowtie2_align_paired:
   walltime: 2000
   memory: 2000
   cores: 8
+  nodes: 1
diff --git a/chipathlon/jobs/params/bowtie2_align_single.yaml b/chipathlon/jobs/params/bowtie2_align_single.yaml
index da7c4d9..6855b4d 100644
--- a/chipathlon/jobs/params/bowtie2_align_single.yaml
+++ b/chipathlon/jobs/params/bowtie2_align_single.yaml
@@ -49,3 +49,4 @@ bowtie2_align_single:
   walltime: 2000
   memory: 2000
   cores: 8
+  nodes: 1
diff --git a/chipathlon/jobs/params/bwa_align_paired.yaml b/chipathlon/jobs/params/bwa_align_paired.yaml
index 1940cff..3602534 100644
--- a/chipathlon/jobs/params/bwa_align_paired.yaml
+++ b/chipathlon/jobs/params/bwa_align_paired.yaml
@@ -50,3 +50,4 @@ bwa_align_paired:
   walltime: 2000
   memory: 2000
   cores: 8
+  nodes: 1
diff --git a/chipathlon/jobs/params/bwa_align_single.yaml b/chipathlon/jobs/params/bwa_align_single.yaml
index b5f8ef8..5b40ffd 100644
--- a/chipathlon/jobs/params/bwa_align_single.yaml
+++ b/chipathlon/jobs/params/bwa_align_single.yaml
@@ -60,3 +60,4 @@ bwa_align_single:
   walltime: 2000
   memory: 2000
   cores: 8
+  nodes: 1
diff --git a/chipathlon/jobs/params/bwa_sai_to_sam.yaml b/chipathlon/jobs/params/bwa_sai_to_sam.yaml
index cd37a4b..382e606 100644
--- a/chipathlon/jobs/params/bwa_sai_to_sam.yaml
+++ b/chipathlon/jobs/params/bwa_sai_to_sam.yaml
@@ -46,3 +46,4 @@ bwa_sai_to_sam:
   walltime: 2000
   memory: 2000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/cat_awk_sort_peaks.yaml b/chipathlon/jobs/params/cat_awk_sort_peaks.yaml
index 46f32da..ded9fd2 100644
--- a/chipathlon/jobs/params/cat_awk_sort_peaks.yaml
+++ b/chipathlon/jobs/params/cat_awk_sort_peaks.yaml
@@ -19,3 +19,4 @@ cat_awk_sort_peaks:
   walltime: 2000
   memory: 2000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/cp_bed_tagalign.yaml b/chipathlon/jobs/params/cp_bed_tagalign.yaml
index e5256ec..8dcebd7 100644
--- a/chipathlon/jobs/params/cp_bed_tagalign.yaml
+++ b/chipathlon/jobs/params/cp_bed_tagalign.yaml
@@ -19,3 +19,4 @@ cp_bed_tagalign:
   walltime: 2000
   memory: 2000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/db_save_result.yaml b/chipathlon/jobs/params/db_save_result.yaml
index 887ce4a..2c2b74e 100644
--- a/chipathlon/jobs/params/db_save_result.yaml
+++ b/chipathlon/jobs/params/db_save_result.yaml
@@ -42,3 +42,4 @@ db_save_result:
   walltime: 2000
   memory: 16000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/macs2_callpeak.yaml b/chipathlon/jobs/params/macs2_callpeak.yaml
index f0c48d5..fbee879 100644
--- a/chipathlon/jobs/params/macs2_callpeak.yaml
+++ b/chipathlon/jobs/params/macs2_callpeak.yaml
@@ -75,3 +75,4 @@ macs2_callpeak:
   walltime: 2000
   memory: 8000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/picard_mark_duplicates.yaml b/chipathlon/jobs/params/picard_mark_duplicates.yaml
index 04baad5..2c9d960 100644
--- a/chipathlon/jobs/params/picard_mark_duplicates.yaml
+++ b/chipathlon/jobs/params/picard_mark_duplicates.yaml
@@ -47,3 +47,4 @@ picard_mark_duplicates:
   walltime: 2000
   memory: 8000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/picard_sort_sam.yaml b/chipathlon/jobs/params/picard_sort_sam.yaml
index e1d8749..d9bb2d7 100644
--- a/chipathlon/jobs/params/picard_sort_sam.yaml
+++ b/chipathlon/jobs/params/picard_sort_sam.yaml
@@ -30,3 +30,4 @@ picard_sort_sam:
   walltime: 2000
   memory: 8000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/r_spp_nodups.yaml b/chipathlon/jobs/params/r_spp_nodups.yaml
index 1d03155..d350bc0 100644
--- a/chipathlon/jobs/params/r_spp_nodups.yaml
+++ b/chipathlon/jobs/params/r_spp_nodups.yaml
@@ -67,3 +67,4 @@ r_spp_nodups:
   walltime: 2000
   memory: 16000
   cores: 8
+  nodes: 1
diff --git a/chipathlon/jobs/params/samtools_filter_bam.yaml b/chipathlon/jobs/params/samtools_filter_bam.yaml
index 33f5e08..1ee86a0 100644
--- a/chipathlon/jobs/params/samtools_filter_bam.yaml
+++ b/chipathlon/jobs/params/samtools_filter_bam.yaml
@@ -38,3 +38,4 @@ samtools_filter_bam:
   walltime: 2000
   memory: 2000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/samtools_remove_duplicates.yaml b/chipathlon/jobs/params/samtools_remove_duplicates.yaml
index c610bc3..75fd0f7 100644
--- a/chipathlon/jobs/params/samtools_remove_duplicates.yaml
+++ b/chipathlon/jobs/params/samtools_remove_duplicates.yaml
@@ -33,3 +33,4 @@ samtools_remove_duplicates:
   walltime: 2000
   memory: 2000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/samtools_sam_to_bam.yaml b/chipathlon/jobs/params/samtools_sam_to_bam.yaml
index 18d72d4..91ff094 100644
--- a/chipathlon/jobs/params/samtools_sam_to_bam.yaml
+++ b/chipathlon/jobs/params/samtools_sam_to_bam.yaml
@@ -28,3 +28,4 @@ samtools_sam_to_bam:
   walltime: 2000
   memory: 2000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/sort_awk_sort_peaks.yaml b/chipathlon/jobs/params/sort_awk_sort_peaks.yaml
index 1b81ae0..18cf70d 100644
--- a/chipathlon/jobs/params/sort_awk_sort_peaks.yaml
+++ b/chipathlon/jobs/params/sort_awk_sort_peaks.yaml
@@ -19,3 +19,4 @@ sort_awk_sort_peaks:
   walltime: 2000
   memory: 2000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/params/zcat_awk_sort_peaks.yaml b/chipathlon/jobs/params/zcat_awk_sort_peaks.yaml
index f0e42fd..fbd1db1 100644
--- a/chipathlon/jobs/params/zcat_awk_sort_peaks.yaml
+++ b/chipathlon/jobs/params/zcat_awk_sort_peaks.yaml
@@ -19,3 +19,4 @@ zcat_awk_sort_peaks:
   walltime: 2000
   memory: 2000
   cores: 1
+  nodes: 1
diff --git a/chipathlon/jobs/scripts/sort_macs2.sh b/chipathlon/jobs/scripts/sort_macs2.sh
index 87439cc..106b428 100755
--- a/chipathlon/jobs/scripts/sort_macs2.sh
+++ b/chipathlon/jobs/scripts/sort_macs2.sh
@@ -1,3 +1,3 @@
 #!/bin/bash
 
-/bin/sort -k 8gr,8gr "$1" | awk 'BEGIN{OFS="\t"}{$r="PEAK_"NR; print $0;}' > "$2"
+/bin/sort -k 8gr,8gr "$1" | awk 'BEGIN{OFS="\t"}{$4="PEAK_"NR; print $0;}' > "$2"
diff --git a/chipathlon/tester.py b/chipathlon/tester.py
index 8acabce..b831dbf 100644
--- a/chipathlon/tester.py
+++ b/chipathlon/tester.py
@@ -40,21 +40,27 @@ class InsertResultsTests(DatabaseTestCase):
         valid, msg, result_id = self.mdb.save_bed(
                                     "test/insert_tests/sample.bed",
                                     [],
-                                    ["ENCSR000BKT"],
+                                    ["ENCFF000RCB"],
                                     {"align_tool": "bowtie2", "read_end": "single"}
                                 )
-        self.result_ids.append(result_id)
+        if valid:
+            self.result_ids.append(result_id)
+        else:
+            print msg
         self.assertTrue(valid)
         return
 
     def test_insert_peak(self):
         valid, msg, result_id = self.mdb.save_peak(
                                     "test/insert_tests/sample.narrowPeak",
-                                    ["ENCSR000BLJ"],
-                                    ["ENCSR000BKT"],
+                                    ["ENCFF000RCB"],
+                                    ["ENCFF000RCB"],
                                     {"align_tool": "bwa", "read_end": "single", "peak_tool": "macs2"}
                                 )
-        self.result_ids.append(result_id)
+        if valid:
+            self.result_ids.append(result_id)
+        else:
+            print msg
         self.assertTrue(valid)
         return
 
-- 
GitLab