Commit 10f026fd authored by aknecht2's avatar aknecht2
Browse files

Added node support. Updated database to load files in chunks.

parent 73a0f8c1
...@@ -67,6 +67,10 @@ resources = { ...@@ -67,6 +67,10 @@ resources = {
"cores": { "cores": {
"namespace": "pegasus", "namespace": "pegasus",
"key": "cores" "key": "cores"
},
"nodes": {
"namespace": "pegasus",
"key": "nodes"
} }
} }
......
...@@ -4,6 +4,7 @@ import gridfs ...@@ -4,6 +4,7 @@ import gridfs
import sys import sys
import traceback import traceback
import os import os
import itertools
from pprint import pprint from pprint import pprint
...@@ -62,7 +63,7 @@ class MongoDB(object): ...@@ -62,7 +63,7 @@ class MongoDB(object):
else: else:
msg = "Not all input ids are valid. The following are invalid: " msg = "Not all input ids are valid. The following are invalid: "
for id_list, valid_list in zip([control_ids, experiment_ids], [valid_controls, valid_experiments]): for id_list, valid_list in zip([control_ids, experiment_ids], [valid_controls, valid_experiments]):
msg += ",".join([id_list[i] for i, valid in enumerate(valid_list) if not valid]) msg += ", ".join([id_list[i] for i, valid in enumerate(valid_list) if not valid])
else: else:
msg = "Specified output_file %s does not exist." % (output_file,) msg = "Specified output_file %s does not exist." % (output_file,)
return (False, msg, None) return (False, msg, None)
...@@ -78,27 +79,30 @@ class MongoDB(object): ...@@ -78,27 +79,30 @@ class MongoDB(object):
# then insert with insert_many() # then insert with insert_many()
print "loading bed_data..." print "loading bed_data..."
with open(bed_file, "r") as rh: with open(bed_file, "r") as rh:
bed_data = [ msg = "Bed file successfully inserted."
{ # Lazy load files in specified line chunk size ~100k lines
"result_id": result_id, n_lines = 100000
"chr": line_info[0], line_set = list(itertools.islice(rh, n_lines))
"start": line_info[1], while line_set:
"end": line_info[2], try:
"name": line_info[3], self.db.bed.insert_many([
"score": line_info[4], {
"strand": line_info[5] "result_id": result_id,
} "chr": line_info[0],
for line in rh.readlines() "start": line_info[1],
for line_info in (line.split(),) "end": line_info[2],
] "name": line_info[3],
try: "score": line_info[4],
print "bed data loaded, inserting." "strand": line_info[5]
self.db.bed.insert_many(bed_data) }
return (True, "Bed file successfully inserted.", result_id) for line in line_set
except pymongo.errors.OperationFailure as e: for line_info in (line.split(),)
valid = False ])
msg = "Error inserting bed_file %s: %s" % (bed_file, e) except pymongo.errors.OperationFailure as e:
return (valid, msg, None) valid = False
msg = "Error inserting bed_file %s: %s" % (bed_file, e)
line_set = list(itertools.islice(rh, n_lines))
return (valid, msg, result_id)
def save_peak(self, peak_file, control_ids, experiment_ids, additional_data = {}): def save_peak(self, peak_file, control_ids, experiment_ids, additional_data = {}):
# Create result_entry for peak_file # Create result_entry for peak_file
...@@ -108,30 +112,34 @@ class MongoDB(object): ...@@ -108,30 +112,34 @@ class MongoDB(object):
# Data is in a 10 column format # Data is in a 10 column format
# chr, start, end, name, score, strand, signal_value, p_value, q_value, summit # chr, start, end, name, score, strand, signal_value, p_value, q_value, summit
with open(peak_file, "r") as rh: with open(peak_file, "r") as rh:
peak_data = [ msg = "Peak file successfully inserted."
{ # Lazy load files in specified line chunk size ~100k lines
"result_id": result_id, n_lines = 10000
"chr": line_info[0], line_set = list(itertools.islice(rh, n_lines))
"start": line_info[1], while line_set:
"end": line_info[2], try:
"name": line_info[3], self.db.peak.insert_many([
"score": line_info[4], {
"strand": line_info[5], "result_id": result_id,
"signal_value": line_info[6], "chr": line_info[0],
"p_value": line_info[7], "start": line_info[1],
"q_value": line_info[8], "end": line_info[2],
"summit": line_info[9] "name": line_info[3],
} "score": line_info[4],
for line in rh.readlines() "strand": line_info[5],
for line_info in (line.split(),) "signal_value": line_info[6],
] "p_value": line_info[7],
try: "q_value": line_info[8],
self.db.peak.insert_many(peak_data) "summit": line_info[9]
return (True, "Peak file successfully inserted.", result_id) }
except pymongo.errors.OperationFailure as e: for line in line_set
valid = False for line_info in (line.split(),)
msg = "Error inserting peak_file %s: %s" % (peak_file, e) ])
return (valid, msg, None) except pymongo.errors.OperationFailure as e:
valid = False
msg = "Error inserting peak_file %s: %s" % (peak_file, e)
line_set = list(itertools.islice(rh, n_lines))
return (valid, msg, result_id)
def is_valid_sample(self, sample_accession): def is_valid_sample(self, sample_accession):
try: try:
......
...@@ -20,3 +20,4 @@ bedtools_bam_to_bed: ...@@ -20,3 +20,4 @@ bedtools_bam_to_bed:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 1 cores: 1
nodes: 1
...@@ -56,3 +56,4 @@ bowtie2_align_paired: ...@@ -56,3 +56,4 @@ bowtie2_align_paired:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 8 cores: 8
nodes: 1
...@@ -49,3 +49,4 @@ bowtie2_align_single: ...@@ -49,3 +49,4 @@ bowtie2_align_single:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 8 cores: 8
nodes: 1
...@@ -50,3 +50,4 @@ bwa_align_paired: ...@@ -50,3 +50,4 @@ bwa_align_paired:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 8 cores: 8
nodes: 1
...@@ -60,3 +60,4 @@ bwa_align_single: ...@@ -60,3 +60,4 @@ bwa_align_single:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 8 cores: 8
nodes: 1
...@@ -46,3 +46,4 @@ bwa_sai_to_sam: ...@@ -46,3 +46,4 @@ bwa_sai_to_sam:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 1 cores: 1
nodes: 1
...@@ -19,3 +19,4 @@ cat_awk_sort_peaks: ...@@ -19,3 +19,4 @@ cat_awk_sort_peaks:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 1 cores: 1
nodes: 1
...@@ -19,3 +19,4 @@ cp_bed_tagalign: ...@@ -19,3 +19,4 @@ cp_bed_tagalign:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 1 cores: 1
nodes: 1
...@@ -42,3 +42,4 @@ db_save_result: ...@@ -42,3 +42,4 @@ db_save_result:
walltime: 2000 walltime: 2000
memory: 16000 memory: 16000
cores: 1 cores: 1
nodes: 1
...@@ -75,3 +75,4 @@ macs2_callpeak: ...@@ -75,3 +75,4 @@ macs2_callpeak:
walltime: 2000 walltime: 2000
memory: 8000 memory: 8000
cores: 1 cores: 1
nodes: 1
...@@ -47,3 +47,4 @@ picard_mark_duplicates: ...@@ -47,3 +47,4 @@ picard_mark_duplicates:
walltime: 2000 walltime: 2000
memory: 8000 memory: 8000
cores: 1 cores: 1
nodes: 1
...@@ -30,3 +30,4 @@ picard_sort_sam: ...@@ -30,3 +30,4 @@ picard_sort_sam:
walltime: 2000 walltime: 2000
memory: 8000 memory: 8000
cores: 1 cores: 1
nodes: 1
...@@ -67,3 +67,4 @@ r_spp_nodups: ...@@ -67,3 +67,4 @@ r_spp_nodups:
walltime: 2000 walltime: 2000
memory: 16000 memory: 16000
cores: 8 cores: 8
nodes: 1
...@@ -38,3 +38,4 @@ samtools_filter_bam: ...@@ -38,3 +38,4 @@ samtools_filter_bam:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 1 cores: 1
nodes: 1
...@@ -33,3 +33,4 @@ samtools_remove_duplicates: ...@@ -33,3 +33,4 @@ samtools_remove_duplicates:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 1 cores: 1
nodes: 1
...@@ -28,3 +28,4 @@ samtools_sam_to_bam: ...@@ -28,3 +28,4 @@ samtools_sam_to_bam:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 1 cores: 1
nodes: 1
...@@ -19,3 +19,4 @@ sort_awk_sort_peaks: ...@@ -19,3 +19,4 @@ sort_awk_sort_peaks:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 1 cores: 1
nodes: 1
...@@ -19,3 +19,4 @@ zcat_awk_sort_peaks: ...@@ -19,3 +19,4 @@ zcat_awk_sort_peaks:
walltime: 2000 walltime: 2000
memory: 2000 memory: 2000
cores: 1 cores: 1
nodes: 1
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment