Commit 47190424 authored by aknecht2

Added bed & peak file saving functionality to the database class. Updated the testing script to use the unittest module. Updated the experiment_from_samples script to only print out unique values. Added a list of current tools to conf.
parent 716ec5b1
@@ -22,6 +22,18 @@ system_commands = [
    "awk"
]

# Current align tools
align_tools = [
    "bwa",
    "bowtie2"
]

# Current peak calling tools
peak_tools = [
    "spp",
    "macs2"
]

# File extensions
file_extensions = {
    "genome_index": ["fa", "fna"],
...
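These lists give the rest of the pipeline a single place to check which tools a run may request. A minimal sketch of that kind of guard, assuming the chipathlon.conf module path used later in this commit (the validate_tools helper itself is illustrative, not part of the commit):

import chipathlon.conf

def validate_tools(align_tool, peak_tool):
    # Reject any tool name that is not in the configured lists
    if align_tool not in chipathlon.conf.align_tools:
        raise ValueError("Unsupported align tool '%s', expected one of %s." % (align_tool, chipathlon.conf.align_tools))
    if peak_tool not in chipathlon.conf.peak_tools:
        raise ValueError("Unsupported peak tool '%s', expected one of %s." % (peak_tool, chipathlon.conf.peak_tools))
    return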
from pymongo import MongoClient
import pymongo.errors
import gridfs
import sys
import traceback
import os
from pprint import pprint
@@ -19,10 +21,131 @@ class MongoDB(object):
        self.gfs = gridfs.GridFS(self.db)
        return

-    def load_bed(self, collection, result_id, bed_file, attributes={}):
-        return

    def delete_result(self, result_id):
        # Make sure result exists
        cursor = self.db.results.find({
            "_id": result_id
        })
        if cursor.count() == 1:
            result = cursor.next()
            self.gfs.delete(result["gridfs_id"])
            self.db[result["result_type"]].delete_many({"result_id": result["_id"]})
            self.db.results.delete_one({"_id": result["_id"]})
        else:
            print "result_id %s doesn't exist." % (result_id,)
        return
    def create_result(self, output_file, control_ids, experiment_ids, result_type, additional_data={}, gfs_attributes={}):
        # Make sure output_file exists
        if os.path.isfile(output_file):
            # Make sure that all control_ids & experiment_ids are valid
            valid_controls = [self.is_valid_experiment(cid) for cid in control_ids]
            valid_experiments = [self.is_valid_experiment(eid) for eid in experiment_ids]
            if all(valid_controls) and all(valid_experiments):
                # First, we load the output file into gfs
                with open(output_file, "r") as rh:
                    # Calling put returns the gfs id
                    gridfs_id = self.gfs.put(rh, filename=os.path.basename(output_file), **gfs_attributes)
                # Now, we create the actual result entry by combining all necessary info
                result_entry = {
                    "gridfs_id": gridfs_id,
                    "control_ids": control_ids,
                    "experiment_ids": experiment_ids,
                    "result_type": result_type
                }
                # Add additional attributes into the result_entry
                result_entry.update(additional_data)
                # Insert the entry into the database, and return the id
                result = self.db.results.insert_one(result_entry)
                return (True, "Result created successfully.", result.inserted_id)
            else:
                msg = "Not all input ids are valid. The following are invalid: "
                for id_list, valid_list in zip([control_ids, experiment_ids], [valid_controls, valid_experiments]):
                    for i, valid in enumerate(valid_list):
                        if not valid:
                            msg += id_list[i] + ", "
        else:
            msg = "Specified output_file %s does not exist." % (output_file,)
        return (False, msg, None)
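Taken together, a successful create_result call stores the file in GridFS and inserts one document into the results collection. A hypothetical call and the rough shape of the stored document, given an mdb = MongoDB(host, username, password) handle; the accession and attributes here are placeholders:

valid, msg, result_id = mdb.create_result(
    "output/encode_test.bed",
    [],                             # control_ids
    ["ENCSR000XXX"],                # experiment_ids (placeholder accession)
    "bed",
    additional_data={"align_tool": "bwa"},
    gfs_attributes={"file_type": "bed"}
)
# Roughly what ends up in db.results:
# {
#     "_id": ObjectId(...),          <- returned as result_id
#     "gridfs_id": ObjectId(...),    <- the file stored in GridFS
#     "control_ids": [],
#     "experiment_ids": ["ENCSR000XXX"],
#     "result_type": "bed",
#     "align_tool": "bwa"
# }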
    def save_bed(self, bed_file, control_ids, experiment_ids, additional_data={}):
        # Create result_entry for bed_file
        valid, msg, result_id = self.create_result(bed_file, control_ids, experiment_ids, "bed", additional_data, gfs_attributes={"file_type": "bed"})
        if valid:
            # Now we load the actual bed data into the bed collection.
            # Data is in a six column format:
            # chr, start, end, name, score, strand
            # Load data using a list comprehension over lines,
            # then insert with insert_many()
            with open(bed_file, "r") as rh:
                bed_data = [
                    {
                        "result_id": result_id,
                        "chr": line_info[0],
                        "start": line_info[1],
                        "end": line_info[2],
                        "name": line_info[3],
                        "score": line_info[4],
                        "strand": line_info[5]
                    }
                    for line in rh.readlines()
                    for line_info in (line.split(),)
                ]
            try:
                self.db.bed.insert_many(bed_data)
                return (True, "Bed file successfully inserted.", result_id)
            except pymongo.errors.OperationFailure as e:
                valid = False
                msg = "Error inserting bed_file %s: %s" % (bed_file, e)
        return (valid, msg, None)
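Since line.split() does no type conversion, every field — including start, end, and score — is stored as a string. For example, one BED line maps to one document in the bed collection roughly like this (coordinates are made up):

# "chr1\t1000\t1500\tpeak_1\t750\t+"  ->
# {
#     "result_id": result_id,
#     "chr": "chr1",
#     "start": "1000",
#     "end": "1500",
#     "name": "peak_1",
#     "score": "750",
#     "strand": "+"
# }

Storing coordinates as strings keeps the loader simple, but any range query on start/end would need an int conversion first.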
    def save_peak(self, peak_file, control_ids, experiment_ids, additional_data={}):
        # Create result_entry for peak_file
        valid, msg, result_id = self.create_result(peak_file, control_ids, experiment_ids, "peak", additional_data, gfs_attributes={"file_type": os.path.splitext(peak_file)[1][1:]})
        if valid:
            # Now we load the actual peak data into the collection.
            # Data is in a 10 column format:
            # chr, start, end, name, score, strand, signal_value, p_value, q_value, summit
            with open(peak_file, "r") as rh:
                peak_data = [
                    {
                        "result_id": result_id,
                        "chr": line_info[0],
                        "start": line_info[1],
                        "end": line_info[2],
                        "name": line_info[3],
                        "score": line_info[4],
                        "strand": line_info[5],
                        "signal_value": line_info[6],
                        "p_value": line_info[7],
                        "q_value": line_info[8],
                        "summit": line_info[9]
                    }
                    for line in rh.readlines()
                    for line_info in (line.split(),)
                ]
            try:
                self.db.peak.insert_many(peak_data)
                return (True, "Peak file successfully inserted.", result_id)
            except pymongo.errors.OperationFailure as e:
                valid = False
                msg = "Error inserting peak_file %s: %s" % (peak_file, e)
        return (valid, msg, None)
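The gfs_attributes derivation above just takes the file extension without its leading dot, so a narrowPeak or broadPeak file tags itself. A quick worked example, using the sample path from the tests below:

import os

# splitext splits off the extension; [1] takes it, [1:] drops the dot
os.path.splitext("test/insert_tests/sample.narrowPeak")[1]      # ".narrowPeak"
os.path.splitext("test/insert_tests/sample.narrowPeak")[1][1:]  # "narrowPeak"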
    def is_valid_experiment(self, experiment_id):
        try:
            cursor = self.db.experiments.find({
                "target": {"$exists": True},
                "revoked_files.0": {"$exists": False},
                "@id": "/experiments/%s/" % (experiment_id,)
            })
            if cursor.count() == 1:
                return True
        except pymongo.errors.OperationFailure as e:
            print "Error with experiment_id %s: %s" % (experiment_id, e)
        return False
    def check_valid_samples(self):
        cursor = self.db.experiments.aggregate([
            {
@@ -74,95 +197,82 @@ class MongoDB(object):
        valid = True
        msg = ""
        data = {}
-        # First, check to make sure the target experiment exists.
-        check = self.db.experiments.find({
-            "@id": "/experiments/%s/" % (experiment_id,)
-        })
-        if check.count() == 1:
-            # Next, we check that all metadata is defined
-            check2 = self.db.experiments.find({
-                "target": {"$exists": True},
-                "revoked_files.0": {"$exists": False},
-                "@id": "/experiments/%s/" % (experiment_id,)
-            })
-            if check2.count() == 1:
+        # First, check to make sure the target experiment is valid
+        if self.is_valid_experiment(experiment_id):
            # Next, we check that there is at least 1 possible control
            check3 = self.db.experiments.find({
                "target": {"$exists": True},
                "revoked_files.0": {"$exists": False},
                "assembly.0": {"$exists": True},
                "assembly.1": {"$exists": False},
                "possible_controls.0": {"$exists": True},
                "@id": "/experiments/%s/" % (experiment_id,)
            })
            if check3.count() == 1:
                # Complicated aggregation pipeline does the following steps:
                # 1. Find the experiment that matches the given id
                # 2. Join samples into the collection by exp_id
                # 3. Iterate through possible_controls
                # 4. Join possible_control data into control_exps
                # 5. Iterate through control_exps
                # 6. Join samples into the control_exps by exp_id
                # 7. Re-aggregate all data into arrays
                pipeline = [
                    {
                        "$match": {
                            "target": {"$exists": True},
                            "revoked_files.0": {"$exists": False},
                            "assembly.0": {"$exists": True},
                            "assembly.1": {"$exists": False},
                            "possible_controls.0": {"$exists": True},
                            "@id": "/experiments/%s/" % (experiment_id,)
                        }
                    },
                    {
                        "$lookup": {
                            "from": "samples",
                            "localField": "uuid",
                            "foreignField": "experiment_id",
                            "as": "samples"
                        }
                    },
                    {
                        "$unwind": "$possible_controls"
                    },
                    {
                        "$lookup": {
                            "from": "samples",
                            "localField": "possible_controls.uuid",
                            "foreignField": "experiment_id",
                            "as": "possible_controls.samples"
                        }
                    },
                    {
                        "$group": {
                            "_id": "$_id",
                            "possible_controls": {"$push": "$possible_controls"},
                            "samples": {"$push": "$samples"}
                        }
                    }
                ]
                cursor = self.db.experiments.aggregate(pipeline)
                # We should have only 1 document
                document = cursor.next()
                control_inputs = [sample for control in document["possible_controls"] for sample in control["samples"] if ("file_type" in sample and sample["file_type"] == "fastq")]
                experiment_inputs = [sample for sample in document["samples"][0] if ("file_type" in sample and sample["file_type"] == "fastq")]
                if (len(control_inputs) > 0 and len(experiment_inputs) > 0):
                    msg = "Successfully retrieved input files for experiment with id '%s'.\n" % (experiment_id,)
                    data = {
                        "control": control_inputs,
                        "experiment": experiment_inputs
                    }
                else:
                    valid = False
                    msg = "Experiment with id '%s' has %s possible control inputs, and %s possible experiment inputs.\n" % (experiment_id, len(control_inputs), len(experiment_inputs))
            else:
                valid = False
                msg = "Experiment with id '%s' does not have possible_controls.\n" % (experiment_id,)
        else:
            valid = False
-            msg = "Experiment with id '%s' does not have all required metadata (target exists and no revoked_files).\n" % (experiment_id,)
-        else:
-            valid = False
-            msg = "Experiment with id '%s' does not exist.\n" % (experiment_id,)
+            msg = "Experiment with id '%s' is not valid! It may not exist, or it may be missing required metadata.\n" % (experiment_id,)
        return (valid, msg, data)
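A caller only needs the (valid, msg, data) contract; a minimal sketch of consuming it, reusing an accession from the old test script further down (mdb is a MongoDB instance):

valid, msg, data = mdb.get_samples("ENCSR000BSE")
if valid:
    # Each entry is a sample document that passed the file_type == "fastq" filter
    control_fastqs = data["control"]
    experiment_fastqs = data["experiment"]
else:
    print msg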
import chipathlon.db
import argparse

-parser = argparse.ArgumentParser(description="Download target file.")
-parser.add_argument("-u", "--url", dest="url", required=True, help="Target url.")
-parser.add_argument("-p", "--path", dest="path", required=True, help="Local path to file.")
-parser.add_argument("-t", "--url_type", dest="url_type", default="ftp://", help="Type of url to access.")
-parser.add_argument("-r", "--retries", dest="retries", default=3, type=int, help="Number of retries.")
-parser.add_argument("-n", "--overwrite", dest="overwrite", default=True, action="store_false", help="Don't overwrite local file if it exists.")
-parser.add_argument("-m", "--md5", dest="md5", help="Check md5 value against passed value.")
+parser = argparse.ArgumentParser(description="Insert a bed file into the database.")
+parser.add_argument("-p", "--password", dest="password", required=True, help="Database user password.")
+parser.add_argument("-u", "--username", dest="username", required=True, help="Database user.")
+# Long option only: argparse reserves -h for --help.
+parser.add_argument("--host", dest="host", required=True, help="Database host.")
+parser.add_argument("-f", "--file", dest="file", required=True, help="Path to bed file.")
+parser.add_argument("-c", "--controls", dest="control_ids", required=True, nargs="+", help="List of control ids.")
+parser.add_argument("-e", "--experiments", dest="experiment_ids", required=True, nargs="+", help="List of experiment/signal ids.")
+parser.add_argument("-a", "--additional", dest="additional")
args = parser.parse_args()
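The diff ends at parse_args, so the script body is not shown; presumably it wires these arguments into save_bed roughly as follows (this continuation is a guess, and it ignores --additional, whose format is not shown):

# Hypothetical continuation -- not part of this diff:
mdb = chipathlon.db.MongoDB(args.host, args.username, args.password)
valid, msg, result_id = mdb.save_bed(args.file, args.control_ids, args.experiment_ids)
print msg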
@@ -4,6 +4,7 @@ import chipathlon.workflow_job
import chipathlon.db
import chipathlon.workflow
import argparse
import unittest

parser = argparse.ArgumentParser(description="Perform a join between the experiment and sample collections.")
parser.add_argument("--password", dest="password", help="Database user password.")
@@ -13,118 +14,52 @@ args = parser.parse_args()
mdb = chipathlon.db.MongoDB(args.host, args.username, args.password)
-# Shamelessly stolen from: https://svn.blender.org/svnroot/bf-blender/trunk/blender/build_files/scons/tools/bcolors.py
-class bcolors(object):
-    HEADER = '\033[95m'
-    OKBLUE = '\033[94m'
-    OKGREEN = '\033[92m'
-    WARNING = '\033[93m'
-    FAIL = '\033[91m'
-    ENDC = '\033[0m'
-    BOLD = '\033[1m'
-    UNDERLINE = '\033[4m'
-
-def print_test(name, success):
-    if success:
-        print bcolors.OKBLUE + name + ": Success" + bcolors.ENDC
-    else:
-        print bcolors.FAIL + name + ": Failure" + bcolors.ENDC
-    return
-
-# Test 1, valid inputs
-def yaml_job_test_1():
-    yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params1.yaml")
-    arg_list = yj.create_arg_list(["bwa_index.fna", "bwa_1.fastq", "bwa_2.fastq"], ["lfn1", "lfn2", "lfn3"])
-    print_test("Yaml_Job_Test_1", arg_list == ['mem', '-M', '-t 1', 'lfn1', 'lfn2', 'lfn3'])
-    return
-
-# Test 2, invalid arguments
-def yaml_job_test_2():
-    yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params2.yaml")
-    err = "Unchangeable argument '-M' specified in params file."
-    print_test("Yaml_Job_Test_2", yj.err != err)
-    return
-
-# Test 3, non-yaml input file
-def yaml_job_test_3():
-    yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params3.yaml")
-    print_test("Yaml_Job_Test_3", not yj.valid())
-    return
-
-# Test 4, ill-defined input file
-def yaml_job_test_4():
-    yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params4.yaml")
-    err = "Specified key 'batman' does not exist for job 'bwa_align_paired'."
-    print_test("Yaml_Job_Test_4", yj.err != err)
-    return
-
-# Test 5, valid exp, with 2 experiment and 2 control samples
-def exp_files_test_1():
-    valid, msg, data = mdb.get_samples("ENCSR000BSE")
-    print_test("Exp_Files_Test_1", valid and msg == "Successfully retrieved input files for experiment with id 'ENCSR000BSE'." and len(data["control"]) == 2 and len(data["experiment"]) == 2)
-    return
-
-# Test 6, invalid exp id
-def exp_files_test_2():
-    valid, msg, data = mdb.get_samples("NOT_AN_ID")
-    print_test("Exp_Files_Test_2", not valid and msg == "Experiment with id 'NOT_AN_ID' does not exist.")
-    return
-
-# Test 7, invalid metadata
-def exp_files_test_3():
-    valid, msg, data = mdb.get_samples("ENCSR329RIP")
-    print_test("Exp_Files_Test_3", not valid and msg == "Experiment with id 'ENCSR329RIP' does not have all required metadata (assembly, target, no revoked_files).")
-    return
-
-# Test 8, multiple control experiments
-def exp_files_test_4():
-    valid, msg, data = mdb.get_samples("ENCSR000CWZ")
-    print_test("Exp_Files_Test_4", valid and msg == "Successfully retrieved input files for experiment with id 'ENCSR000CWZ'." and len(data["control"]) == 4 and len(data["experiment"]) == 2)
-    return
-
-# Test 9, multiple control experiments, no possible control_inputs
-def exp_files_test_5():
-    valid, msg, data = mdb.get_samples("ENCSR000DKB")
-    print_test("Exp_Files_Test_5", not valid and msg == "Experiment with id 'ENCSR000DKB' has '0' possible control inputs, and '3' possible experiment inputs.")
-    return
-
-def workflow_test_1():
-    workflow = chipathlon.workflow.Workflow("test/run/", "test/run/run.yaml", "test/run/param.yaml", "test/run/config.yaml", args.host, args.username, args.password)
-    workflow.generate()
-    return
-
-def valid_samples():
-    has_samples, total = mdb.check_valid_samples()
-    print_test("Verify_All_Samples", has_samples == total)
-    return
-
-# tests = [
-#     yaml_job_test_1,
-#     yaml_job_test_2,
-#     yaml_job_test_3,
-#     yaml_job_test_4,
-#     exp_files_test_1,
-#     exp_files_test_2,
-#     exp_files_test_3,
-#     exp_files_test_4,
-#     exp_files_test_5,
-#     valid_samples
-# ]
-# for test in tests:
-#     test()
-workflow_test_1()

class DatabaseTestCase(unittest.TestCase):

    def setUp(self):
        self.mdb = chipathlon.db.MongoDB(args.host, args.username, args.password)
        return

    def tearDown(self):
        return

class InsertResultsTests(DatabaseTestCase):

    def setUp(self):
        super(self.__class__, self).setUp()
        self.result_ids = []
        return

    def tearDown(self):
        for result_id in self.result_ids:
            self.mdb.delete_result(result_id)
        super(self.__class__, self).tearDown()
        return

    def test_insert_bed(self):
        valid, msg, result_id = self.mdb.save_bed(
            "test/insert_tests/sample.bed",
            [],
            ["ENCSR000BKT"],
            {"align_tool": "bowtie2", "read_end": "single"}
        )
        self.result_ids.append(result_id)
        self.assertTrue(valid)
        return

    def test_insert_peak(self):
        valid, msg, result_id = self.mdb.save_peak(
            "test/insert_tests/sample.narrowPeak",
            ["ENCSR000BLJ"],
            ["ENCSR000BKT"],
            {"align_tool": "bwa", "read_end": "single", "peak_tool": "macs2"}
        )
        self.result_ids.append(result_id)
        self.assertTrue(valid)
        return

# Load all insert tests, currently for bed & narrowPeak files
insert_suite = unittest.TestLoader().loadTestsFromTestCase(InsertResultsTests)

# Run all insertion tests
unittest.TextTestRunner(verbosity=2).run(insert_suite)
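Because the suite is built and run through TextTestRunner rather than unittest.main(), the connection settings still come from argparse; an invocation would look something like the line below (host and credentials are placeholders, and the script's filename is not shown in this diff):

# python <test_script>.py --host mongo.example.org --username chipuser --password secret

Using unittest.main() here would fight with the custom flags, since it also parses sys.argv — one plausible reason for driving the runner manually.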
@@ -18,6 +18,7 @@ mdb = chipathlon.db.MongoDB(args.host, args.username, args.password)
write_data = []
counts = {}
experiment_ids = []
final_ids = []

for c in ["TF_4cells", "HM_4cells"]:
    counts[c] = {}
@@ -51,6 +52,8 @@ for c in ["TF_4cells", "HM_4cells"]:
    for exp in experiment_ids:
        valid, msg, data = mdb.get_samples(exp)
        if valid:
            if exp not in final_ids:
                final_ids.append(exp)
            for x in ["control", "experiment"]:
                for sample in data[x]:
                    counts[c][x] += 1
@@ -63,4 +66,4 @@ with open("all_files.list", "w") as wh:
        wh.write(row + "\n")

print "File counts: %s." % (counts,)
-print "Final experiment_ids: %s." % (experiment_ids,)
+print "Final experiment_ids: %s." % (final_ids,)
#!python
import chipathlon.conf
import yaml
import argparse
parser = argparse.ArgumentParser(description="Write a sample run file from a list of experiments.")
parser.add_argument("-o", "--output", dest="output", required=True, help="Output file to write.")
parser.add_argument("-i", "--id", dest="ids", required=True, nargs="+", help="List of ids to use.")
args = parser.parse_args()
data = {
    "runs": [],
    "genomes": {
        "bwa": {
            "grch38p6": "/work/ladunga/SHARED/genomes/grch38p6/gca000001405_21_grch38p6_genomic.fna"
        },
        "bowtie2": {
            "grch38p6": "/work/ladunga/SHARED/genomes/grch38p6/GCA_000001405.21_GRCh38.p6_genomic.fna"
        }
    }
}

for exp_id in args.ids:
    for align_tool in chipathlon.conf.align_tools:
        for peak_tool in chipathlon.conf.peak_tools:
            data["runs"].append({