from pymongo import MongoClient
import pymongo.errors
import gridfs
import sys
import traceback
import os
import itertools
import time
import collections
import chipathlon.conf
from pprint import pprint
import hashlib
from chipathlon.utils import progress
import bson

def download_from_gridfs(host, gridfs_id, local_path, username=None, password=None, retries=3, overwrite=True, checkmd5=False):
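    """
    :param host: The host address of the MongoDB database.
    :type host: str
    :param gridfs_id: The GridFS _id of the file to download.
    :type gridfs_id: str
    :param local_path: The local path to save the downloaded file to.
    :type local_path: str
    :param username: The username of the account for the MongoDB database.
    :type username: str
    :param password: The password for the user.
    :type password: str
    :param retries: The number of times to retry a failed download.
    :type retries: int
    :param overwrite: Whether or not to overwrite an existing local file.
    :type overwrite: bool
    :param checkmd5: Whether or not to validate the md5 of the downloaded file.
    :type checkmd5: bool

    Module level helper that connects to the database and downloads a single
    gridfs file, retrying failed downloads.  Returns True on success,
    False otherwise.
    """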
    mdb = MongoDB(host, username, password)
    if not os.path.isfile(local_path) or overwrite:
        for i in range(0, retries):
            print "Attempt #%s, downloading file with ID '%s' to '%s'" % (i + 1, gridfs_id, local_path)
            if mdb.fetch_from_gridfs(bson.objectid.ObjectId(gridfs_id), local_path, checkmd5):
                return True
            else:
                print "Download attempt #%s from GridFS failed, retrying..." % (i + 1)
    else:
         print "File already exists, skipping download.\n"
    return False

class MongoDB(object):
    """
    :param host: The host address of the MongoDB database.
    :type host: str
    :param username: The username of the account for the MongoDB database.
    :type username: str
    :param password: The password for the user.
    :type password: str
    :param debug: A flag for printing additional messages.
    :type debug: bool

    This class is used to manage all interactions with the encode metadata.
    The metadata can be very unruly and difficult to deal with.  There
    are several helper functions within this class to make some database
    operations much easier.
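
    A minimal usage sketch (hypothetical connection values and accession)::

        mdb = MongoDB(host="localhost", username="user", password="pass")
        print mdb.is_valid_experiment("ENCSR000DKB")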
    """

    def __init__(self, host="localhost", username=None, password=None, debug=False):
        self.debug = debug
        self.host = host
        self.username = username
        self.password = password
        self.client = MongoClient(host)
        self.db = self.client.chipseq
        self.cache = collections.defaultdict(dict)
        if username and password:
            try:
                self.db.authenticate(username, password, mechanism="SCRAM-SHA-1")
            except pymongo.errors.PyMongoError:
                print "Could not authenticate to db %s!" % (host,)
                print traceback.format_exc()
                sys.exit(1)
        self.gfs = gridfs.GridFS(self.db)
        return

    def add_cache(self, accession, file_type, data):
        """
        :param accession: The accession of the file to store.
        :type accession: str
        :param file_type: The type of file to store.
        :type file_type: str
        :param data: The data to add to the cache.
        :type data: Object

        Adds a data result to the internal cache.  This is used to speed up
        requests that are identical.  We may have multiple runs that use
        identical control / signal files but change around the alignment or
        peak calling tools.  In these cases we don't want to request info
        from the database multiple times for the same data.
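
        A minimal sketch (hypothetical accession and data)::

            mdb.add_cache("ENCFF000ABC", "fastq", {"accession": "ENCFF000ABC"})
            mdb.get_cache("ENCFF000ABC", "fastq")  # returns the stored dict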
        """
        self.cache[accession][file_type] = data
        return

    def get_cache(self, accession, file_type):
        """
        :param accession: The accession of the file to retrieve.
        :type accession: str
        :param file_type: The type of file to retrieve.
        :type file_type: str

        Gets a data item from the internal cache.
        """
        if accession in self.cache:
            return self.cache[accession].get(file_type)
        return None

    def delete_result(self, result, genome):
        """
        :param result: The result to delete.
        :type result: :py:class:`~chipathlon.result.Result`
        :param genome: The genome to find information from.
        :type genome: :py:class:`~chipathlon.genome.Genome`

        Deletes a result and its corresponding gridfs entry.
        """
        result_id = self.get_result_id(result, genome)
        cursor = self.db.results.find({
            "_id": result_id
        })
        if cursor.count() == 1:
            result_doc = cursor.next()
            self.gfs.delete(result_doc["gridfs_id"])
            self.db.results.delete_one({"_id": result_doc["_id"]})
        else:
            print "result_id %s doesn't exist." % (result_id,)
        return

    def _get_result_query(self, result, genome):
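        """
        Builds the mongo query used to match an existing result document.
        The result_type, assembly, and file_name must match exactly, all
        control / signal sample accessions must be present, and every job
        argument must match.  Empty accession lists and jobs without
        arguments only check for key existence.
        """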
        query = {
            "result_type": result.file_type,
            "assembly": genome.assembly,
            "timestamp": {"$exists": True},
            "file_name": result.full_name
        }
        # In the case that there are 0 samples we just want to check for existence.
        control_sample_accessions = result.get_accessions("control")
        signal_sample_accessions = result.get_accessions("signal")
        query["control_sample_accessions"] = {"$all": control_sample_accessions} if (len(control_sample_accessions) > 0) else {"$exists": True}
        query["signal_sample_accessions"] = {"$all": signal_sample_accessions} if (len(signal_sample_accessions) > 0) else {"$exists": True}
        for job in result.all_jobs:
            job_args = job.get_db_arguments()
            arg_keys = job_args.keys()
            if len(arg_keys) == 0:
                query[job.job_name] = {"$exists": True}
            else:
                for arg_name in arg_keys:
                    query[job.job_name + "." + arg_name] = job_args[arg_name]
        if self.debug:
            print "Result query: %s" % (query,)
        return query

    def result_exists(self, result, genome):
        """
        :param result: The result to check.
        :type result: :py:class:`~chipathlon.result.Result`
        :param genome: The genome to find information from.
        :type genome: :py:class:`~chipathlon.genome.Genome`

        Check if a result exists in the database.  The genome parameter
        is required since some files have been aligned or use individual
        chromosome fasta or size files for peak calling.
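
        A minimal sketch (assuming result and genome objects have already
        been constructed)::

            if not mdb.result_exists(result, genome):
                print "Result not yet in the database."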
        """
        try:
            cursor = self.db.results.find(self._get_result_query(result, genome))
            return cursor.count() > 0
        except pymongo.errors.OperationFailure as e:
            print "Error checking result [%s]: %s" % (file_name, e)
        return False

    def get_result_id(self, result, genome):
        """
        :param result: The result to check.
        :type result: :py:class:`~chipathlon.result.Result`
        :param genome: The genome to find information from.
        :type genome: :py:class:`~chipathlon.genome.Genome`
        :returns: The id found or None.

        Get the id of a result in the database.
        """
        try:
            cursor = self.db.results.find(self._get_result_query(result, genome))
            if cursor.count() == 1:
                return cursor.next()["_id"]
        except pymongo.errors.OperationFailure as e:
            print "Error getting result id [%s]: %s" % (file_name, e)
        return None

    def get_result(self, result, genome):
        """
        :param result: The result to check.
        :type result: :py:class:`~chipathlon.result.Result`
        :param genome: The genome to find information from.
        :type genome: :py:class:`~chipathlon.genome.Genome`

        Get the metadata for the result from the database.  If multiple results
        exist, the most recently saved result is returned.
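
        A minimal sketch (assuming result and genome objects have already
        been constructed)::

            result_doc = mdb.get_result(result, genome)
            if result_doc is not None:
                print result_doc["gridfs_id"]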
        """
        try:
            cursor = self.db.results.find(self._get_result_query(result, genome))
            if cursor.count() > 0:
                return cursor.sort("timestamp", pymongo.DESCENDING).next()
        except pymongo.errors.OperationFailure as e:
            print "Error checking result [%s]: %s" % (file_name, e)
        return None

    def save_result(self, output_file, control_sample_accessions, signal_sample_accessions, result_type, additional_data=None, gfs_attributes=None):
        """
        :param output_file: The path to the result to save.
        :type output_file: str
        :param control_sample_accessions: A list of control accessions.
        :type control_sample_accessions: list
        :param signal_sample_accessions: A list of signal accessions.
        :type signal_sample_accessions: list
        :param result_type: The type of the result file.
        :type result_type: str
        :param additional_data: Additional metadata to store in mongo.
        :type additional_data: dict
        :param gfs_attributes: Additional metadata to store in gridfs.
        :type gfs_attributes: dict

        Saves a result entry into MongoDB and uploads the file into gridfs.
        The only difference between additional_data and gfs_attributes is
        where the metadata is stored.  Both store key / value pairs of
        information; the additional_data information is stored in the result
        entry, the gfs_attributes information is stored in gridfs.
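
        A minimal sketch (hypothetical file, accessions, and metadata)::

            success, msg, result_id = mdb.save_result(
                "/path/to/output.bed",
                ["ENCSR000ABC"],
                ["ENCSR000XYZ"],
                "bed",
                additional_data={"align_tool": "bwa"}
            )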
        """
        # Guard against shared mutable default arguments
        additional_data = additional_data if additional_data is not None else {}
        gfs_attributes = gfs_attributes if gfs_attributes is not None else {}
        # Make sure output_file exists
        if os.path.isfile(output_file):
            # Make sure that all control_sample_accessions & signal_sample_accessions are valid
            # REMEMBER, these are ids for control & experiment SAMPLES
            valid_controls = [self.is_valid_sample(cid) for cid in control_sample_accessions]
            valid_experiments = [self.is_valid_sample(eid) for eid in signal_sample_accessions]
            if all(valid_controls) and all(valid_experiments):
                gfs_attributes["file_type"] = result_type
                # First, we load the output file into gfs
                with open(output_file, "r") as rh:
                    # Calling put returns the gfs id
                    gridfs_id = self.gfs.put(rh, filename=os.path.basename(output_file), **gfs_attributes)
                # Now, we create the actual result entry by combining all necessary info
                result_entry = {
                    "gridfs_id": gridfs_id,
                    "control_sample_accessions": control_sample_accessions,
                    "signal_sample_accessions": signal_sample_accessions,
                    "result_type": result_type,
                    "file_name": output_file,
                    "timestamp": time.time()
                }
                # Add additional attributes into the result_entry
                result_entry.update(additional_data)
                # Insert the entry into the database, and return the id
                result = self.db.results.insert_one(result_entry)
                return (True, "Result created successfully.", result.inserted_id)
            else:
                msg = "Not all input ids are valid.  The following are invalid: "
243
                for id_list, valid_list in zip([control_sample_accessions, signal_sample_accessions], [valid_controls, valid_experiments]):
244
                    msg += ", ".join([id_list[i] for i, valid in enumerate(valid_list) if not valid])
        else:
            msg = "Specified output_file %s does not exist." % (output_file,)
        return (False, msg, None)

    def is_valid_sample(self, sample_accession):
        """
        :param sample_accession: The accession number to check.
        :type sample_accession: str
        :returns: Whether or not the sample is valid.

        Ensures that a sample with the accession specified actually exists.
        """
        try:
            cursor = self.db.samples.find({
                "accession": sample_accession
            })
            if cursor.count() == 1:
                return True
        except pymongo.errors.OperationFailure as e:
            print "Error with sample_accession %s: %s" % (sample_accession, e)
        return False

    def is_valid_experiment(self, experiment_accession):
        """
        :param experiment_accession: The accession number to check.
        :type experiment_accession: str
        :returns: Whether or not the experiment is valid.

        Ensures that an experiment with the accession specified actually exists.
        """
        try:
            cursor = self.db.experiments.find({
                "target": {"$exists": True},
                "@id": "/experiments/%s/" % (experiment_accession,)
            })
            if cursor.count() == 1:
                return True
        except pymongo.errors.OperationFailure as e:
            print "Error with experiment_accession %s: %s" % (experiment_accession, e)
        return False

    def fetch_from_gridfs(self, gridfs_id, filename, checkmd5=True):
        """
        :param gridfs_id: GridFS _id of file to get.
        :type gridfs_id: :py:class:`bson.objectid.ObjectId`
        :param filename: Filename to save file to.
        :type filename: str
        :param checkmd5: Whether or not to validate the md5 of the result.
        :type checkmd5: bool

        Fetch the file with the corresponding id and save it under the
        specified 'filename'.  If checkmd5 is specified, validate that the
        saved file has a correct md5 value.
        """
        try:
            gridfs_file = self.gfs.get(gridfs_id)
            gridfs_md5 = gridfs_file.md5
        except gridfs.errors.NoFile as e:
            print "Error fetching file from GridFS!\nNo file with ID '%s'" % (gridfs_id)
            print e
            sys.exit(1)

        try:
            output_fh = open(filename, 'wb')
        except IOError as e:
            print "Error creating GridFS output file '%s':" % (filename)
            print "[Errno %s] %s" % (e.errno, e.strerror)
            sys.exit(1)

        hash_md5 = hashlib.md5()
        for chunk in gridfs_file:
            output_fh.write(chunk)
            hash_md5.update(chunk)

        output_fh.close()
        gridfs_file.close()

        if checkmd5:
            if gridfs_md5 == hash_md5.hexdigest():
                return True
            else:
                print "MD5 mismatch saving file from GridFS to '%s'" % (filename)
                return False
        else:
            return True

    def get_sample(self, accession, file_type):
        """
        :param accession: The accession number of the target sample
        :type accession: str
        :param file_type: The file type of the target sample.
        :type file_type: str
        :returns: A tuple (valid, msg, data)

        Gets the associated sample based on accession number and file_type.
        For loading input files for workflows the file_type should be fastq
        or bam.  Other file types can be specified for loading additional files
        saved in the experiment metadata.
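
        A minimal sketch (hypothetical accession)::

            valid, msg, sample = mdb.get_sample("ENCFF000ABC", "fastq")
            if valid:
                print sample["file_type"]  # -> "fastq"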
        """
        valid = True
        msg = ""
        data = {}
        check_cache = self.get_cache(accession, file_type)
        if check_cache is not None:
            msg = "Retrieved data from cache."
            data = check_cache
        else:
            cursor = self.db.samples.find({
                "accession": accession,
                "file_type": file_type
            })
            if cursor.count() == 1:
                data = cursor.next()
                self.add_cache(accession, file_type, data)
            else:
                valid = False
                msg = "Found %s files with accession: %s, file_type: %s. Should only be 1." % (
                    cursor.count(),
                    accession,
                    file_type
                )
        return (valid, msg, data)

    def clean_gfs(self):
        """
        This function finds all files stored in gridfs that are not currently
        referenced by any result file and removes them.
        A clean database is a happy database.
        """
        cursor = self.db.results.aggregate([
            {
                "$group": {
                    "_id": 1,
                    "valid_ids": {"$push": "$gridfs_id"}
                }
            }
        ])
        # Doc contains all our valid ids (guard against an empty results collection)
        id_doc = next(cursor, {"valid_ids": []})
        # Find all fs.files documents
        gfs_cursor = self.db.fs.files.find({
            "_id": {
                "$nin": id_doc["valid_ids"]
            }
        })
        # Iterate through file, delete fs.chunks then fs.files
        total_files = gfs_cursor.count()
        print "Found %s unused gridfs files.  Preparing to delete...." % (total_files,)
        for i, fs_file in enumerate(gfs_cursor):
            progress(i, total_files)
            self.db.fs.chunks.remove({
                "files_id": fs_file["_id"]
            })
            self.db.fs.files.remove({
                "_id": fs_file["_id"]
            })
        progress(total_files, total_files)
        return

    def get_samples(self, experiment_accession, file_type):
        """
        :param experiment_accession: Accession number of the experiment to grab samples from.
        :type experiment_accession: str
        :param file_type: File type of samples to grab, usually fastq or bam.
        :type file_type: str
        :returns: A tuple (valid, msg, data)

        Validates and gets samples for the given experiment.  Experiments must
        have control and signal samples of the provided file_type to be
        considered valid.  Returns a tuple with three values (valid, msg, data):
        valid -- Whether or not the accession / file_type combo is a valid experiment.
        msg -- Why it is or is not valid.
        data -- A dictionary containing a list of all control / signal sample documents.

        The data dictionary has two keys, "control" and "signal", each one containing
        a list of all metadata related to the experiment samples.  The sample metadata
        is taken directly from Mongo.
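
        A minimal sketch (hypothetical experiment accession)::

            valid, msg, data = mdb.get_samples("ENCSR000DKB", "fastq")
            if valid:
                print len(data["control"]), len(data["signal"])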
        """
        valid = True
        msg = ""
        data = {}
        # First, check to make sure the target experiment is valid
        if self.is_valid_experiment(experiment_accession):
            # Next, we check that there is at least 1 possible control
            control_check = self.db.experiments.find({
                "target": {"$exists": True},
                "possible_controls.0": {"$exists": True},
                "@id": "/experiments/%s/" % (experiment_accession,)
            })
            if control_check.count() == 1:
                # Complicated aggregation pipeline does the following steps:
                # 1. Find the experiment that matches the given id
                # 2. Join samples into the collection by exp_id
                # 3. Iterate through possible_controls
                # 4. Join possible_control data into control_exps
                # 5. Iterate through control_exps
                # 6. Join samples into the control_exps by exp_id
                # 7. Re-aggregate all data into arrays
                pipeline = [
                    {
                        "$match": {
                            "target": {"$exists": True},
                            "possible_controls.0": {"$exists": True},
                            "@id": "/experiments/%s/" % (experiment_accession,)
                        }
                    },
                    {
                        "$lookup": {
                            "from": "samples",
                            "localField": "uuid",
                            "foreignField": "experiment_id",
                            "as": "samples"
                        }
                    },
                    {
                        "$unwind": "$possible_controls"
                    },
                    {
                        "$lookup": {
                            "from": "samples",
                            "localField": "possible_controls.uuid",
                            "foreignField": "experiment_id",
                            "as": "possible_controls.samples"
                        }
                    },
                    {
                        "$group": {
                            "_id": "$_id",
                            "possible_controls": {"$push": "$possible_controls"},
                            "samples": {"$push": "$samples"}
                        }
                    }
                ]
                cursor = self.db.experiments.aggregate(pipeline)
                # We should have only 1 document
                document = cursor.next()
                control_inputs = [sample for control in document["possible_controls"] for sample in control["samples"] if ("file_type" in sample and sample["file_type"] == file_type)]
                signal_inputs = [sample for sample in document["samples"][0] if ("file_type" in sample and sample["file_type"] == file_type)]
                if (len(control_inputs) > 0 and len(signal_inputs) > 0):
                    msg = "Succesfully retrieved input files for experiment with id '%s'.\n" % (experiment_accession,)
                    data = {
                        "control": control_inputs,
                        "signal": signal_inputs
                    }
                else:
                    valid = False
                    msg = "Experiment with id '%s' has %s possible control inputs, and %s possible signal inputs.\n" % (experiment_accession, len(control_inputs), len(signal_inputs))
            else:
                valid = False
                msg = "Experiment with id '%s' does not have possible_controls.\n" % (experiment_accession,)
        else:
            valid = False
            msg = "Experiment with id '%s' is not valid!  It may not exist, or it may be missing required metadata.\n" % (experiment_accession,)
        return (valid, msg, data)