diff --git a/chipathlon/conf.py b/chipathlon/conf.py index a6110dba46a5ed4625cc7d0ac78d1a580a2f940b..656e1d72b855551e8c25a91b7542a437fc4979da 100644 --- a/chipathlon/conf.py +++ b/chipathlon/conf.py @@ -67,6 +67,10 @@ resources = { "cores": { "namespace": "pegasus", "key": "cores" + }, + "nodes": { + "namespace": "pegasus", + "key": "nodes" } } diff --git a/chipathlon/db.py b/chipathlon/db.py index 3cb7ec628e1d5ae417f35efc5b3b490f266fd54f..3526b411398367b0d3081917ca0cdc07f239ba43 100644 --- a/chipathlon/db.py +++ b/chipathlon/db.py @@ -4,6 +4,7 @@ import gridfs import sys import traceback import os +import itertools from pprint import pprint @@ -62,7 +63,7 @@ class MongoDB(object): else: msg = "Not all input ids are valid. The following are invalid: " for id_list, valid_list in zip([control_ids, experiment_ids], [valid_controls, valid_experiments]): - msg += ",".join([id_list[i] for i, valid in enumerate(valid_list) if not valid]) + msg += ", ".join([id_list[i] for i, valid in enumerate(valid_list) if not valid]) else: msg = "Specified output_file %s does not exist." % (output_file,) return (False, msg, None) @@ -78,27 +79,30 @@ class MongoDB(object): # then insert with insert_many() print "loading bed_data..." with open(bed_file, "r") as rh: - bed_data = [ - { - "result_id": result_id, - "chr": line_info[0], - "start": line_info[1], - "end": line_info[2], - "name": line_info[3], - "score": line_info[4], - "strand": line_info[5] - } - for line in rh.readlines() - for line_info in (line.split(),) - ] - try: - print "bed data loaded, inserting." - self.db.bed.insert_many(bed_data) - return (True, "Bed file successfully inserted.", result_id) - except pymongo.errors.OperationFailure as e: - valid = False - msg = "Error inserting bed_file %s: %s" % (bed_file, e) - return (valid, msg, None) + valid, msg = True, "Bed file successfully inserted." 
+ # Lazy load files in specified line chunk size ~100k lines + n_lines = 100000 + line_set = list(itertools.islice(rh, n_lines)) + while line_set: + try: + self.db.bed.insert_many([ + { + "result_id": result_id, + "chr": line_info[0], + "start": line_info[1], + "end": line_info[2], + "name": line_info[3], + "score": line_info[4], + "strand": line_info[5] + } + for line in line_set + for line_info in (line.split(),) + ]) + except pymongo.errors.OperationFailure as e: + valid = False + msg = "Error inserting bed_file %s: %s" % (bed_file, e) + line_set = list(itertools.islice(rh, n_lines)) + return (valid, msg, result_id) def save_peak(self, peak_file, control_ids, experiment_ids, additional_data = {}): # Create result_entry for peak_file @@ -108,30 +112,34 @@ class MongoDB(object): # Data is in a 10 column format # chr, start, end, name, score, strand, signal_value, p_value, q_value, summit with open(peak_file, "r") as rh: - peak_data = [ - { - "result_id": result_id, - "chr": line_info[0], - "start": line_info[1], - "end": line_info[2], - "name": line_info[3], - "score": line_info[4], - "strand": line_info[5], - "signal_value": line_info[6], - "p_value": line_info[7], - "q_value": line_info[8], - "summit": line_info[9] - } - for line in rh.readlines() - for line_info in (line.split(),) - ] - try: - self.db.peak.insert_many(peak_data) - return (True, "Peak file successfully inserted.", result_id) - except pymongo.errors.OperationFailure as e: - valid = False - msg = "Error inserting peak_file %s: %s" % (peak_file, e) - return (valid, msg, None) + valid, msg = True, "Peak file successfully inserted." 
+ # Lazy load files in specified line chunk size ~10k lines + n_lines = 10000 + line_set = list(itertools.islice(rh, n_lines)) + while line_set: + try: + self.db.peak.insert_many([ + { + "result_id": result_id, + "chr": line_info[0], + "start": line_info[1], + "end": line_info[2], + "name": line_info[3], + "score": line_info[4], + "strand": line_info[5], + "signal_value": line_info[6], + "p_value": line_info[7], + "q_value": line_info[8], + "summit": line_info[9] + } + for line in line_set + for line_info in (line.split(),) + ]) + except pymongo.errors.OperationFailure as e: + valid = False + msg = "Error inserting peak_file %s: %s" % (peak_file, e) + line_set = list(itertools.islice(rh, n_lines)) + return (valid, msg, result_id) def is_valid_sample(self, sample_accession): try: diff --git a/chipathlon/jobs/params/bedtools_bam_to_bed.yaml b/chipathlon/jobs/params/bedtools_bam_to_bed.yaml index c578d48c761fb33affce02f5e6a4bc27b356c2d5..df9a59af77e7e61ab8f01760393e2008b0b185c0 100644 --- a/chipathlon/jobs/params/bedtools_bam_to_bed.yaml +++ b/chipathlon/jobs/params/bedtools_bam_to_bed.yaml @@ -20,3 +20,4 @@ bedtools_bam_to_bed: walltime: 2000 memory: 2000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/bowtie2_align_paired.yaml b/chipathlon/jobs/params/bowtie2_align_paired.yaml index f2960c0321863a18658862b67e51c9c1c7b243cc..56f7906470092d109b6fab297692a843063869ee 100644 --- a/chipathlon/jobs/params/bowtie2_align_paired.yaml +++ b/chipathlon/jobs/params/bowtie2_align_paired.yaml @@ -56,3 +56,4 @@ bowtie2_align_paired: walltime: 2000 memory: 2000 cores: 8 + nodes: 1 diff --git a/chipathlon/jobs/params/bowtie2_align_single.yaml b/chipathlon/jobs/params/bowtie2_align_single.yaml index da7c4d968c63808d1e88633ac5139eb310cb636a..6855b4d7729154e61175e65299d1fd98051ff303 100644 --- a/chipathlon/jobs/params/bowtie2_align_single.yaml +++ b/chipathlon/jobs/params/bowtie2_align_single.yaml @@ -49,3 +49,4 @@ bowtie2_align_single: walltime: 2000 memory: 2000 cores: 8 + 
nodes: 1 diff --git a/chipathlon/jobs/params/bwa_align_paired.yaml b/chipathlon/jobs/params/bwa_align_paired.yaml index 1940cff09b8013afcaaef17fa7189707f9b8a455..36025348b74850b56d5d080ee163c18231a6b459 100644 --- a/chipathlon/jobs/params/bwa_align_paired.yaml +++ b/chipathlon/jobs/params/bwa_align_paired.yaml @@ -50,3 +50,4 @@ bwa_align_paired: walltime: 2000 memory: 2000 cores: 8 + nodes: 1 diff --git a/chipathlon/jobs/params/bwa_align_single.yaml b/chipathlon/jobs/params/bwa_align_single.yaml index b5f8ef801c679cb0d58dff2ae96de30c3e003682..5b40ffd312d06389fcce9a5f763b8e45d4bacd23 100644 --- a/chipathlon/jobs/params/bwa_align_single.yaml +++ b/chipathlon/jobs/params/bwa_align_single.yaml @@ -60,3 +60,4 @@ bwa_align_single: walltime: 2000 memory: 2000 cores: 8 + nodes: 1 diff --git a/chipathlon/jobs/params/bwa_sai_to_sam.yaml b/chipathlon/jobs/params/bwa_sai_to_sam.yaml index cd37a4bbf8a2727f54a5e63bd95e50053db408e4..382e606c19b4d4c2f7146d61a3e7fa2ff94f6607 100644 --- a/chipathlon/jobs/params/bwa_sai_to_sam.yaml +++ b/chipathlon/jobs/params/bwa_sai_to_sam.yaml @@ -46,3 +46,4 @@ bwa_sai_to_sam: walltime: 2000 memory: 2000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/cat_awk_sort_peaks.yaml b/chipathlon/jobs/params/cat_awk_sort_peaks.yaml index 46f32da5e3449c2879bd98f72bc90959b09ea8df..ded9fd21972599394ee3ec378911d390e21d19a7 100644 --- a/chipathlon/jobs/params/cat_awk_sort_peaks.yaml +++ b/chipathlon/jobs/params/cat_awk_sort_peaks.yaml @@ -19,3 +19,4 @@ cat_awk_sort_peaks: walltime: 2000 memory: 2000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/cp_bed_tagalign.yaml b/chipathlon/jobs/params/cp_bed_tagalign.yaml index e5256ecda4d8def0f5550a04abe583434def5a9f..8dcebd76052a3078c945a54754230182296d2fa6 100644 --- a/chipathlon/jobs/params/cp_bed_tagalign.yaml +++ b/chipathlon/jobs/params/cp_bed_tagalign.yaml @@ -19,3 +19,4 @@ cp_bed_tagalign: walltime: 2000 memory: 2000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/db_save_result.yaml 
b/chipathlon/jobs/params/db_save_result.yaml index 887ce4aba4befc82dfcfe167a83802a779371f21..2c2b74e349464d4e1f6469c01705d9c467030c12 100644 --- a/chipathlon/jobs/params/db_save_result.yaml +++ b/chipathlon/jobs/params/db_save_result.yaml @@ -42,3 +42,4 @@ db_save_result: walltime: 2000 memory: 16000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/macs2_callpeak.yaml b/chipathlon/jobs/params/macs2_callpeak.yaml index f0c48d585b550dd2bc934f668b19a1c06ee06124..fbee879475098aba97d5d4633d2714d1cf2a518a 100644 --- a/chipathlon/jobs/params/macs2_callpeak.yaml +++ b/chipathlon/jobs/params/macs2_callpeak.yaml @@ -75,3 +75,4 @@ macs2_callpeak: walltime: 2000 memory: 8000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/picard_mark_duplicates.yaml b/chipathlon/jobs/params/picard_mark_duplicates.yaml index 04baad5ee0f8a221f530f9bba7e1f97756afd1b3..2c9d96046ae5bb5ca70d400846f3b43fa68b5d31 100644 --- a/chipathlon/jobs/params/picard_mark_duplicates.yaml +++ b/chipathlon/jobs/params/picard_mark_duplicates.yaml @@ -47,3 +47,4 @@ picard_mark_duplicates: walltime: 2000 memory: 8000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/picard_sort_sam.yaml b/chipathlon/jobs/params/picard_sort_sam.yaml index e1d874948ac9f8c1d74d43897f6dbce5cf34bbfa..d9bb2d79928d9c143fff9e93740aace5a96d71f6 100644 --- a/chipathlon/jobs/params/picard_sort_sam.yaml +++ b/chipathlon/jobs/params/picard_sort_sam.yaml @@ -30,3 +30,4 @@ picard_sort_sam: walltime: 2000 memory: 8000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/r_spp_nodups.yaml b/chipathlon/jobs/params/r_spp_nodups.yaml index 1d03155b3f73076fcaafc35a4a38737472eb07d6..d350bc072a5f3d1bfc61851d1eaed6c63f810c1e 100644 --- a/chipathlon/jobs/params/r_spp_nodups.yaml +++ b/chipathlon/jobs/params/r_spp_nodups.yaml @@ -67,3 +67,4 @@ r_spp_nodups: walltime: 2000 memory: 16000 cores: 8 + nodes: 1 diff --git a/chipathlon/jobs/params/samtools_filter_bam.yaml b/chipathlon/jobs/params/samtools_filter_bam.yaml index 
33f5e08e3f568c6ae1f240eead3f8929707022a2..1ee86a07f32a9d294fc47460f853535be58de450 100644 --- a/chipathlon/jobs/params/samtools_filter_bam.yaml +++ b/chipathlon/jobs/params/samtools_filter_bam.yaml @@ -38,3 +38,4 @@ samtools_filter_bam: walltime: 2000 memory: 2000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/samtools_remove_duplicates.yaml b/chipathlon/jobs/params/samtools_remove_duplicates.yaml index c610bc3f3e6385c00516849a02c1afb9ebceeb06..75fd0f74ae4f830f07f78bc184316821286798f2 100644 --- a/chipathlon/jobs/params/samtools_remove_duplicates.yaml +++ b/chipathlon/jobs/params/samtools_remove_duplicates.yaml @@ -33,3 +33,4 @@ samtools_remove_duplicates: walltime: 2000 memory: 2000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/samtools_sam_to_bam.yaml b/chipathlon/jobs/params/samtools_sam_to_bam.yaml index 18d72d438c68181fa85ea9d4e41739973aa885ff..91ff0943f9a4fcdddca153ee5adada255f9c6534 100644 --- a/chipathlon/jobs/params/samtools_sam_to_bam.yaml +++ b/chipathlon/jobs/params/samtools_sam_to_bam.yaml @@ -28,3 +28,4 @@ samtools_sam_to_bam: walltime: 2000 memory: 2000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/sort_awk_sort_peaks.yaml b/chipathlon/jobs/params/sort_awk_sort_peaks.yaml index 1b81ae02f46711f31cc8160e1d1b585a32c3816d..18cf70dec3369dfbc09eba0e66de05d15557d1b3 100644 --- a/chipathlon/jobs/params/sort_awk_sort_peaks.yaml +++ b/chipathlon/jobs/params/sort_awk_sort_peaks.yaml @@ -19,3 +19,4 @@ sort_awk_sort_peaks: walltime: 2000 memory: 2000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/params/zcat_awk_sort_peaks.yaml b/chipathlon/jobs/params/zcat_awk_sort_peaks.yaml index f0e42fd05f70d474ec6fe0c28e8606de5bc3f029..fbd1db1adef3fa6c787800f266e428298e5dbb3b 100644 --- a/chipathlon/jobs/params/zcat_awk_sort_peaks.yaml +++ b/chipathlon/jobs/params/zcat_awk_sort_peaks.yaml @@ -19,3 +19,4 @@ zcat_awk_sort_peaks: walltime: 2000 memory: 2000 cores: 1 + nodes: 1 diff --git a/chipathlon/jobs/scripts/sort_macs2.sh 
b/chipathlon/jobs/scripts/sort_macs2.sh index 87439ccaaba90036b741d0f08890c1fb179734e0..106b42819ed6fe51ea22fd6815175128b10ea570 100755 --- a/chipathlon/jobs/scripts/sort_macs2.sh +++ b/chipathlon/jobs/scripts/sort_macs2.sh @@ -1,3 +1,3 @@ #!/bin/bash -/bin/sort -k 8gr,8gr "$1" | awk 'BEGIN{OFS="\t"}{$r="PEAK_"NR; print $0;}' > "$2" +/bin/sort -k 8gr,8gr "$1" | awk 'BEGIN{OFS="\t"}{$4="PEAK_"NR; print $0;}' > "$2" diff --git a/chipathlon/tester.py b/chipathlon/tester.py index 8acabceeae177fb1c5c3ae352ffbbce12246ec04..b831dbf296195b31b4a8bcb730f0b737e15899ed 100644 --- a/chipathlon/tester.py +++ b/chipathlon/tester.py @@ -40,21 +40,27 @@ class InsertResultsTests(DatabaseTestCase): valid, msg, result_id = self.mdb.save_bed( "test/insert_tests/sample.bed", [], - ["ENCSR000BKT"], + ["ENCFF000RCB"], {"align_tool": "bowtie2", "read_end": "single"} ) - self.result_ids.append(result_id) + if valid: + self.result_ids.append(result_id) + else: + print msg self.assertTrue(valid) return def test_insert_peak(self): valid, msg, result_id = self.mdb.save_peak( "test/insert_tests/sample.narrowPeak", - ["ENCSR000BLJ"], - ["ENCSR000BKT"], + ["ENCFF000RCB"], + ["ENCFF000RCB"], {"align_tool": "bwa", "read_end": "single", "peak_tool": "macs2"} ) - self.result_ids.append(result_id) + if valid: + self.result_ids.append(result_id) + else: + print msg self.assertTrue(valid) return