Commit 55dbeaa4 authored by aknecht2

Cleaned up and added documentation for database functions.

parent 62f28488
......@@ -14,6 +14,16 @@ import hashlib
class MongoDB(object):
def __init__(self, host, username, password, debug=False):
"""
:param host: The host address of the MongoDB database.
:type host: str
:param username: The username of the account for the MongoDB database.
:type username: str
:param password: The password for the user.
:type password: str
:param debug: If true, print out debug messages.
:type debug: bool
"""
self.debug = debug
self.host = host
self.username = username
......@@ -30,7 +40,12 @@ class MongoDB(object):
return
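A minimal usage sketch for the constructor; the host and credentials below are placeholders, and the import path is an assumption since the module name is not shown in this diff:

    # Placeholders throughout -- substitute a real host and real credentials.
    from chipathlon.db import MongoDB  # assumed module path

    mdb = MongoDB("localhost", "chipathlon_user", "secret", debug=True)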
def delete_result(self, result_id):
# Make sure result exists
"""
:param result_id: ID of result to delete
:type result_id: ObjectId
Deletes a result and it's corresponding gridfs entry.
"""
cursor = self.db.results.find({
"_id": result_id
})
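A hedged deletion sketch, reusing the mdb handle from the constructor sketch above; the ObjectId is a placeholder:

    from bson.objectid import ObjectId

    # Placeholder id -- use a real _id from the results collection.
    mdb.delete_result(ObjectId("0123456789ab0123456789ab"))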
......@@ -68,6 +83,14 @@ class MongoDB(object):
return query
def result_exists(self, result, genome):
"""
:param result: The result to check.
:type result: :py:meth:~chipathlon.result.Result
:param genome: The genome to find information from.
:type genome: :py:meth:~chipathlon.genome.Genome
Check if a result exists.
"""
try:
cursor = self.db.results.find(self._get_result_query(result, genome))
return cursor.count() > 0
......@@ -77,6 +100,15 @@ class MongoDB(object):
def get_result(self, result, genome):
"""
:param result: The result to check.
:type result: :py:meth:~chipathlon.result.Result
:param genome: The genome to find information from.
:type genome: :py:meth:~chipathlon.genome.Genome
Get the metadata for the result from the database. If multiple results
exist, the most recently saved result is returned.
"""
try:
cursor = self.db.results.find(self._get_result_query(result, genome))
if cursor.count() > 0:
......@@ -85,14 +117,32 @@ class MongoDB(object):
print "Error checking result [%s]: %s" % (file_name, e)
return None
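A usage sketch pairing the two lookups above; constructing the Result and Genome objects is elided because their signatures are not part of this diff:

    # 'result' and 'genome' are assumed to be chipathlon.result.Result and
    # chipathlon.genome.Genome instances built elsewhere.
    if mdb.result_exists(result, genome):
        result_doc = mdb.get_result(result, genome)  # most recent match wins
        print result_doc["file_name"]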
def create_result(self, output_file, control_sample_ids, experiment_sample_ids, result_type, additional_data = {}, gfs_attributes = {}):
def save_result(self, output_file, control_sample_accessions, signal_sample_accessions, result_type, additional_data = {}, gfs_attributes = {}):
"""
:param output_file: The path to the result to save.
:type output_file: str
:param control_sample_accessions: A list of control accessions.
:type control_sample_accessions: list
:param signal_sample_accessions: A list of signal accessions.
:type signal_sample_accessions: list
:param result_type: The type of the result file, e.g. bed, peak, or bam.
:type result_type: str
:param additional_data: Additional metadata to store in mongo.
:type additional_data: dict
:param gfs_attributes: Additional metadata to store in gridfs.
:type gfs_attributes: dict
Saves a result file into mongodb and also creates the corresponding
gridfs file.
"""
# Make sure output_file exists
if os.path.isfile(output_file):
# Make sure that all control_sample_ids & experiment_sample_ids are valid
# Make sure that all control_sample_accessions & signal_sample_accessions are valid
# Remember, these are accessions for control & signal SAMPLES
valid_controls = [self.is_valid_sample(cid) for cid in control_sample_ids]
valid_experiments = [self.is_valid_sample(eid) for eid in experiment_sample_ids]
valid_controls = [self.is_valid_sample(cid) for cid in control_sample_accessions]
valid_experiments = [self.is_valid_sample(eid) for eid in signal_sample_accessions]
if all(valid_controls) and all(valid_experiments):
gfs_attributes["file_type"] = result_type
# First, we load the output file into gfs
with open(output_file, "r") as rh:
# Calling put returns the gfs id
......@@ -100,8 +150,8 @@ class MongoDB(object):
# Now, we create the actual result entry by combining all necessary info
result_entry = {
"gridfs_id": gridfs_id,
"control_sample_ids": control_sample_ids,
"experiment_sample_ids": experiment_sample_ids,
"control_sample_accessions": control_sample_accessions,
"signal_sample_accessions": signal_sample_accessions,
"result_type": result_type,
"file_name": output_file,
"timestamp": time.time()
......@@ -113,101 +163,19 @@ class MongoDB(object):
return (True, "Result created successfully.", result.inserted_id)
else:
msg = "Not all input ids are valid. The following are invalid: "
for id_list, valid_list in zip([control_sample_ids, experiment_sample_ids], [valid_controls, valid_experiments]):
for id_list, valid_list in zip([control_sample_accessions, signal_sample_accessions], [valid_controls, valid_experiments]):
msg += ", ".join([id_list[i] for i, valid in enumerate(valid_list) if not valid])
else:
msg = "Specified output_file %s does not exist." % (output_file,)
return (False, msg, None)
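A call sketch for save_result; the file path, accessions, and metadata below are invented placeholders, not values from any real experiment:

    valid, msg, result_id = mdb.save_result(
        "/data/output/peaks.narrowPeak",   # must exist on disk
        ["SAMPLE_CTL_001"],                # placeholder control accessions
        ["SAMPLE_SIG_001"],                # placeholder signal accessions
        "peak",
        additional_data={"peak_caller": "macs2"},
        gfs_attributes={"assembly": "hg19"},
    )
    print msg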
def save_bam(self, bam_file, control_sample_ids, experiment_sample_ids, additional_data = {}):
# Create result entry for bam files. Since bam is a binary format, the file will only
# be stored in GridFS
valid, msg, result_id = self.create_result(bam_file, control_sample_ids, experiment_sample_ids, "bam", additional_data, gfs_attributes = {"file_type": "bam"})
return (valid, msg, result_id)
def save_bed(self, bed_file, control_sample_ids, experiment_sample_ids, additional_data = {}):
# Create result_entry for bed_file
valid, msg, result_id = self.create_result(bed_file, control_sample_ids, experiment_sample_ids, "bed", additional_data, gfs_attributes = {"file_type": "bed"})
if valid:
# Now we load the actual bed data into the bed collection.
# Data is in a six column format
# chr, start, end, name, score, strand
# Load data using a list comprehension over lines,
# then insert with insert_one()
# Each document contains "n_lines" number of lines from the
# result BED file.
print "loading bed_data..."
with open(bed_file, "r") as rh:
msg = "Bed file successfully inserted."
# Lazy load files in specified line chunk size
n_lines = chipathlon.conf.result_lines_per_document
line_set = list(itertools.islice(rh, n_lines))
while line_set:
try:
result_lines = []
for line in line_set:
line_info = line.split()
line_record = {
"chr": line_info[0],
"start": line_info[1],
"end": line_info[2],
"name": line_info[3],
"score": line_info[4],
"strand": line_info[5],
}
result_lines.append(line_record)
self.db.bed.insert_one({"result_id": result_id, "result_lines": result_lines})
except pymongo.errors.OperationFailure as e:
valid = False
msg = "Error inserting bed_file %s: %s" % (bed_file, e)
line_set = list(itertools.islice(rh, n_lines))
return (valid, msg, result_id)
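The chunked-insert loop above is easy to test on its own; a standalone sketch of the same itertools.islice pattern, with the chunk size hardcoded instead of read from chipathlon.conf and a placeholder file path:

    import itertools

    n_lines = 1000  # stand-in for chipathlon.conf.result_lines_per_document
    with open("example.bed", "r") as rh:  # placeholder path
        line_set = list(itertools.islice(rh, n_lines))
        while line_set:
            # At most n_lines lines are in memory per iteration, so large
            # result files never have to be read in one shot.
            for line in line_set:
                print line.split()  # placeholder for building line_record
            line_set = list(itertools.islice(rh, n_lines))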
def save_peak(self, peak_file, control_sample_ids, experiment_sample_ids, additional_data = {}):
# Create result_entry for peak_file
valid, msg, result_id = self.create_result(peak_file, control_sample_ids, experiment_sample_ids, "peak", additional_data, gfs_attributes = {"file_type": os.path.splitext(peak_file)[1][1:]})
if valid:
# Now we load the actual peak data into the collection
# Data is in a 10 column format
# chr, start, end, name, score, strand, signal_value, p_value, q_value, summit
# Each document contains "n_lines" number of lines from the
# result peak file.
with open(peak_file, "r") as rh:
msg = "Peak file successfully inserted."
# Lazy load files in specified line chunk size
n_lines = chipathlon.conf.result_lines_per_document
line_set = list(itertools.islice(rh, n_lines))
while line_set:
try:
result_lines = []
for line in line_set:
line_info = line.split()
line_record = {
"chr": line_info[0],
"start": line_info[1],
"end": line_info[2],
"name": line_info[3],
"score": line_info[4],
"strand": line_info[5],
"signal_value": line_info[6],
"p_value": line_info[7],
"q_value": line_info[8],
"summit": line_info[9]
}
result_lines.append(line_record)
self.db.peak.insert_one({"result_id": result_id, "result_lines": result_lines})
except pymongo.errors.OperationFailure as e:
valid = False
msg = "Error inserting peak_file %s: %s" % (peak_file, e)
line_set = list(itertools.islice(rh, n_lines))
return (valid, msg, result_id)
def is_valid_sample(self, sample_accession):
"""
:param sample_accession: The accession number to check.
:type sample_accession: str
Ensures that a sample with the accession specified actually exists.
"""
try:
cursor = self.db.samples.find({
"accession": sample_accession
......@@ -218,74 +186,37 @@ class MongoDB(object):
print "Error with sample_accession %s: %s" % (sample_accession, e)
return False
def is_valid_experiment(self, experiment_id):
def is_valid_experiment(self, experiment_accession):
"""
:param experiment_accession: The accession number to check.
:type experiment_accession: str
Ensures that an experiment with the accession specified actually exists.
"""
try:
cursor = self.db.experiments.find({
"target": {"$exists": True},
"@id": "/experiments/%s/" % (experiment_id,)
"@id": "/experiments/%s/" % (experiment_accession,)
})
if cursor.count() == 1:
return True
except pymongo.errors.OperationFailure as e:
print "Error with experiment_id %s: %s" % (experiment_id, e)
print "Error with experiment_accession %s: %s" % (experiment_accession, e)
return False
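A quick validity-check sketch; both accessions are placeholders whose exact format depends on the metadata actually loaded into mongo:

    if mdb.is_valid_experiment("EXP_PLACEHOLDER") and mdb.is_valid_sample("SAMPLE_PLACEHOLDER"):
        print "Both accessions resolve to metadata records."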
def check_valid_samples(self):
cursor = self.db.experiments.aggregate([
{
"$match": {
"target": {"$exists": True},
"revoked_files.0": {"$exists": False},
"assembly.0": {"$exists": True},
"assembly.1": {"$exists": False}
}
},
{
"$lookup": {
"from": "samples",
"localField": "uuid",
"foreignField": "experiment_id",
"as": "samples"
}
}
])
total = 0
has_samples = 0
for document in cursor:
total += 1
if len(document["samples"]) > 0:
has_samples += 1
return (has_samples, total)
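A usage sketch for the coverage check above:

    has_samples, total = mdb.check_valid_samples()
    print "%s of %s usable experiments have at least one sample." % (has_samples, total)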
def get_assembly(self, experiment_id):
valid = True
msg = ""
data = ""
cursor = self.db.experiments.find({
"target": {"$exists": True},
"assembly.0": {"$exists": True},
"assembly.1": {"$exists": False},
"@id": "/experiments/%s/" % (experiment_id,)
})
if cursor.count() == 1:
document = cursor.next()
data = document["assembly"][0]
msg = "Succesfully retrieved assembly for experiment with id '%s'.\n" % (experiment_id,)
else:
valid = False
msg = "Experiment with id '%s' does not exist.\n" % (experiment_id,)
return (valid, msg, data)
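A retrieval sketch; the accession is a placeholder:

    valid, msg, assembly = mdb.get_assembly("EXP_PLACEHOLDER")
    if valid:
        print "Assembly: %s" % (assembly,)  # e.g. hg19
    else:
        print msg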
def fetch_from_gridfs(self, gridfs_id, filename, checkmd5=True):
"""
:param gridfs_id: GridFS _id of file to get.
:type gridfs_id: bson.objectid.ObjectId
:param filename: Filename to save file to.
:type filename: str
:param checkmd5: Whether or not to validate the md5 of the result
:type checkmd5: bool
Fetch the file with _id 'gridfs_id' from GridFS and save to the file 'filename'.
Fetch the file with the corresponding id and save it under the
specified 'filename'. If checkmd5 is specified, validate that the saved
file has a correct md5 value.
"""
try:
gridfs_file = self.gfs.get(gridfs_id)
gridfs_md5 = gridfs_file.md5
......@@ -345,19 +276,29 @@ class MongoDB(object):
)
return (valid, msg, data)
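The md5 validation can be reproduced outside the class; a sketch of hashing a downloaded copy in chunks (hashlib is already imported at the top of this module) and comparing it against the md5 that GridFS reports. The gridfs_id and filename are placeholders:

    import hashlib

    def file_md5(path, chunk_size=4096):
        # Stream the file so large results never sit fully in memory.
        md5 = hashlib.md5()
        with open(path, "rb") as fh:
            for chunk in iter(lambda: fh.read(chunk_size), b""):
                md5.update(chunk)
        return md5.hexdigest()

    # 'gridfs_id' is a placeholder ObjectId taken from a result entry.
    valid, msg, data = mdb.fetch_from_gridfs(gridfs_id, "local_copy.bed")
    print file_md5("local_copy.bed") == mdb.gfs.get(gridfs_id).md5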
def get_samples(self, experiment_id, file_type):
def get_samples(self, experiment_accession, file_type):
"""
:param experiment_accession: Accession number of the experiment to grab samples from.
:type experiment_accession: str
:param file_type: File type of samples to grab, usually fastq or bam.
:type file_type: str
Validates and gets samples for the given experiment. Experiments must
have control and signal samples of the provided file_type to be
considered valid.
"""
valid = True
msg = ""
data = {}
# First, check to make sure the target experiment is valid
if self.is_valid_experiment(experiment_id):
if self.is_valid_experiment(experiment_accession):
# Next, we check that there is a least 1 possible control
check3 = self.db.experiments.find({
"target": {"$exists": True},
"assembly.0": {"$exists": True},
"assembly.1": {"$exists": False},
"possible_controls.0": {"$exists": True},
"@id": "/experiments/%s/" % (experiment_id,)
"@id": "/experiments/%s/" % (experiment_accession,)
})
if check3.count() == 1:
# Complicated aggregation pipeline does the following steps:
......@@ -375,14 +316,14 @@ class MongoDB(object):
"assembly.0": {"$exists": True},
"assembly.1": {"$exists": False},
"possible_controls.0": {"$exists": True},
"@id": "/experiments/%s/" % (experiment_id,)
"@id": "/experiments/%s/" % (experiment_accession,)
}
},
{
"$lookup": {
"from": "samples",
"localField": "uuid",
"foreignField": "experiment_id",
"foreignField": "experiment_accession",
"as": "samples"
}
},
......@@ -393,7 +334,7 @@ class MongoDB(object):
"$lookup": {
"from": "samples",
"localField": "possible_controls.uuid",
"foreignField": "experiment_id",
"foreignField": "experiment_accession",
"as": "possible_controls.samples"
}
},
......@@ -411,18 +352,18 @@ class MongoDB(object):
control_inputs = [sample for control in document["possible_controls"] for sample in control["samples"] if ("file_type" in sample and sample["file_type"] == file_type)]
signal_inputs = [sample for sample in document["samples"][0] if ("file_type" in sample and sample["file_type"] == file_type)]
if (len(control_inputs) > 0 and len(signal_inputs) > 0):
msg = "Succesfully retrieved input files for experiment with id '%s'.\n" % (experiment_id,)
msg = "Succesfully retrieved input files for experiment with id '%s'.\n" % (experiment_accession,)
data = {
"control": control_inputs,
"signal": signal_inputs
}
else:
valid = False
msg = "Experiment with id '%s' has %s possible control inputs, and %s possible experiment inputs.\n" % (experiment_id, len(control_inputs), len(experiment_inputs))
msg = "Experiment with id '%s' has %s possible control inputs, and %s possible experiment inputs.\n" % (experiment_accession, len(control_inputs), len(experiment_inputs))
else:
valid = False
msg = "Experiment with id '%s' does not have possible_controls.\n" % (experiment_id,)
msg = "Experiment with id '%s' does not have possible_controls.\n" % (experiment_accession,)
else:
valid = False
msg = "Experiment with id '%s' is not valid! It may not exist, or it may be missing required metadata.\n" % (experiment_id,)
msg = "Experiment with id '%s' is not valid! It may not exist, or it may be missing required metadata.\n" % (experiment_accession,)
return (valid, msg, data)
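A call sketch for get_samples; the accession is a placeholder:

    valid, msg, data = mdb.get_samples("EXP_PLACEHOLDER", "fastq")
    if valid:
        print "%s control and %s signal samples found." % (len(data["control"]), len(data["signal"]))
    else:
        print msg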
......@@ -19,12 +19,13 @@ if os.path.isfile(args.file) and os.path.isfile(args.meta):
valid = False
msg = ""
if meta["result_type"] == "bed":
valid, msg, data = mdb.save_bed(args.file, meta["control_sample_accessions"], meta["signal_sample_accessions"], meta)
elif meta["result_type"] == "peak":
valid, msg, data = mdb.save_peak(args.file, meta["control_sample_accessions"], meta["signal_sample_accessions"], meta)
elif meta["result_type"] == "bam":
valid, msg, data = mdb.save_bam(args.file, meta["control_sample_accessions"], meta["signal_sample_accessions"], meta)
valid, msg, data = mdb.save_result(
args.file,
meta["control_sample_accessions"],
meta["signal_sample_accessions"],
meta["result_type"],
meta
)
print msg
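A hypothetical meta file for the call above, written as the Python dict it loads into; the keys are inferred from the lookups in this script and the accession values are placeholders:

    meta = {
        "result_type": "peak",
        "control_sample_accessions": ["SAMPLE_CTL_001"],
        "signal_sample_accessions": ["SAMPLE_SIG_001"],
    }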
......