Commit d0f38192 authored by aknecht2

Finished resolving merge conflicts. There were a lot.

parents 3d21b860 ff84032d
......@@ -9,5 +9,3 @@ examples/all_files.list
chipathlon/test/auth.yaml
meta/*.json
meta/data/*.json
doc/source/_static
doc/source/_templates
......@@ -37,11 +37,47 @@ peak_tools = [
"gem",
"peakranger",
"ccat",
"music",
"zerone",
"hiddendomains",
"pepr"
]
executables = [
"bedtools",
"samtools",
"idr",
"picard",
"bwa",
"bowtie2",
"macs2",
"gem",
"peakranger",
"MUSIC",
"CCAT",
"PePr",
"hiddenDomains",
"zerone",
"run_spp_nodups",
"chip-job-cat-peak",
"chip-job-ccat-format-bed",
"chip-job-chr-convert",
"chip-job-save-result",
"chip-job-download-encode",
"chip-job-download-gridfs",
"chip-job-music",
"chip-job-peakranger-format",
"chip-job-sort-peak",
"chip-job-zcat-peak",
"chip-job-zerone-add-cols",
"chip-job-hd-add-cols"
]
# Java needs to have -Xmx specified...
java_tools = [
"gem"
]
# Peak_type validation
peak_types = {
"spp": ["narrow", "broad"],
......@@ -49,9 +85,10 @@ peak_types = {
"gem": ["narrow"],
"peakranger": ["narrow"],
"ccat": ["broad"],
"music": ["narrow", "punctate", "broad"],
"zerone": ["broad"],
"hiddendomains": ["broad"],
"pepr": ["narrow", "broad"]
"pepr": ["narrow", "broad"],
}
# File extensions
......@@ -62,6 +99,7 @@ file_extensions = {
"sam": ["sam"],
"bam": ["bam"],
"bed": ["bed", "peak", "region", "narrowPeak", "broadPeak", "tagAlign", "narrowPeak.gz", "broadPeak.gz"],
"bin": ["bin"],
"bwa_genome": ["amb", "ann", "bwt", "pac", "sa"],
"bowtie2_genome": ["1.bt2", "2.bt2", "3.bt2", "4.bt2", "rev.1.bt2", "rev.2.bt2"],
"quality": ["quality"],
......@@ -114,7 +152,8 @@ resources = {
argument_types = {
"argument": ["string", "numeric"],
"file": ["file", "rawfile", "stdout", "stderr"],
"list": ["list"]
"list": ["file_list", "argument_list"],
"folder": ["folder", "rawfolder"]
}
# Defines information about arguments
......@@ -125,21 +164,21 @@ argument_keys = {
# workflow_job keys
job_keys = {
"required": ["inputs", "additional_inputs", "outputs", "command", "arguments"] + resources.keys(),
"required": ["inputs", "outputs", "command", "arguments"] + resources.keys(),
"optional": []
}
job_inout_keys = {
"required": ["type"],
"optional": ["file_type"]
}
# param keys
param_keys = {
"required": [],
"optional": ["arguments"] + resources.keys()
}
file_list_keys = {
"required": ["name", "type"],
"optional": ["file_type"]
}
# workflow order
workflow = ["align", "remove_duplicates", "peak_calling"]
......@@ -154,3 +193,12 @@ genomes = {
"additional_files": file_extensions["bowtie2_genome"]
}
}
config_file = {
"required_keys": [
"chipathlon_bin", "idr_bin", "pegasus_home", "email"
],
"optional_keys": [
"arch", "os"
]
}
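
The dictionaries above back validation elsewhere in the package (per the inline comments). A minimal sketch of how the peak_types mapping could be checked is shown here; the validate_peak_type helper and its error message are hypothetical and not part of this commit.

import chipathlon.conf

def validate_peak_type(tool, peak_type):
    # Keys in peak_types are lowercase tool names, each mapped to the
    # peak_type values that tool supports.
    supported = chipathlon.conf.peak_types.get(tool.lower(), [])
    if peak_type not in supported:
        raise ValueError("%s does not support peak_type '%s', expected one of %s" % (tool, peak_type, supported))

validate_peak_type("music", "punctate")  # music allows narrow, punctate, and broad
validate_peak_type("ccat", "broad")      # ccat only allows broad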
......@@ -6,10 +6,25 @@ import traceback
import os
import itertools
import time
import collections
import chipathlon.conf
from pprint import pprint
import hashlib
from chipathlon.utils import progress
import bson
def download_from_gridfs(host, gridfs_id, local_path, username=None, password=None, retries=3, overwrite=True, checkmd5=False):
mdb = chipathlon.db.MongoDB(host, username, password)
if not os.path.isfile(local_path) or overwrite:
for i in range(0, retries):
print "Attempt #%s, downloading file with ID '%s' to '%s'" % (i + 1, gridfs_id, local_path)
if mdb.fetch_from_gridfs(bson.objectid.ObjectId(gridfs_id), local_path, checkmd5):
return True
else:
print "Download attempt #%s from GridFS failed, retrying..." % (i + 1)
else:
print "File already exists, skipping download.\n"
return False
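
A hypothetical call to the module-level helper above; the ObjectId string and output path are placeholders, and a reachable MongoDB host is assumed.

from chipathlon.db import download_from_gridfs

success = download_from_gridfs(
    "localhost", "507f1f77bcf86cd799439011", "/tmp/result.bed",
    retries=3, overwrite=True, checkmd5=True
)
if not success:
    print "All download attempts failed."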
class MongoDB(object):
"""
......@@ -28,29 +43,30 @@ class MongoDB(object):
operations much easier.
"""
def __init__(self, host, username, password, debug=False):
def __init__(self, host="localhost", username=None, password=None, debug=False):
self.debug = debug
self.host = host
self.username = username
self.password = password
self.client = MongoClient(host)
self.db = self.client.chipseq
self.cache = {}
try:
self.db.authenticate(username, password, mechanism="SCRAM-SHA-1")
except:
print("Could not authenticate to db %s!" % (host,))
print traceback.format_exc()
sys.exit(1)
self.cache = collections.defaultdict(dict)
if username and password:
try:
self.db.authenticate(username, password, mechanism="SCRAM-SHA-1")
except:
print("Could not authenticate to db %s!" % (host,))
print traceback.format_exc()
sys.exit(1)
self.gfs = gridfs.GridFS(self.db)
return
def add_cache(self, function, key, data):
def add_cache(self, accession, file_type, data):
"""
:param function: The function name
:type function: str
:param key: The key to add the cache entry under.
:type key: Any hashable
:param accession: The accession of the file to store.
:type accession: str
:param file_type: The type of file to store.
:type file_type: str
:param data: The data to add to the cache.
:type data: Object
......@@ -60,23 +76,20 @@ class MongoDB(object):
peak calling tools. In these cases we don't want to request info
from the database multiple times for the same data.
"""
if function not in self.cache:
self.cache[function] = {}
self.cache[function][key] = data
self.cache[accession][file_type] = data
return
def get_cache(self, function, key):
def get_cache(self, accession, file_type):
"""
:param function: The function name
:type function: str
:param key: The key to get from the cache.
:type key: Any hashable
:param accession: The accession of the file to retrieve.
:type accession: str
:param file_type: The type of file to retrieve.
:type file_type: str
Gets a data item from the internal cache.
"""
if function in self.cache:
if key in self.cache[function]:
return self.cache[function][key]
if accession in self.cache:
return self.cache[accession].get(file_type)
return None
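
A short sketch of the accession/file_type keyed cache in use; the accession and cached payload are placeholder values, and a reachable MongoDB host is assumed for the constructor.

from chipathlon.db import MongoDB

mdb = MongoDB(host="localhost")
mdb.add_cache("ENCFF000XXX", "fastq", {"note": "placeholder sample document"})
mdb.get_cache("ENCFF000XXX", "fastq")  # -> the cached dict
mdb.get_cache("ENCFF000XXX", "bam")    # -> None, this file_type was never cached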
def delete_result(self, result, genome):
......@@ -321,6 +334,7 @@ class MongoDB(object):
:type accession: string
:param file_type: The file type of the target sample.
:type file_type: string
:returns: A tuple (valid, msg, data)
Gets the associated sample based on accession number and file_type.
For loading input files for workflows the file_type should be fastq
......@@ -330,7 +344,7 @@ class MongoDB(object):
valid = True
msg = ""
data = {}
check_cache = self.get_cache("get_sample", accession)
check_cache = self.get_cache(accession, file_type)
if check_cache is not None:
msg = "Retrieved data from cache."
data = check_cache
......@@ -341,7 +355,7 @@ class MongoDB(object):
})
if cursor.count() == 1:
data = cursor.next()
self.add_cache("get_sample", accession, data)
self.add_cache(accession, file_type, data)
else:
valid = False
msg = "Found %s files with accession: %s, file_type: %s. Should only be 1." % (
......@@ -351,12 +365,49 @@ class MongoDB(object):
)
return (valid, msg, data)
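
For illustration, the (valid, msg, data) tuple documented above might be consumed like this, given an mdb instance of this class; the accession is a placeholder.

valid, msg, data = mdb.get_sample("ENCFF000XXX", "fastq")
if not valid:
    print msg  # e.g. zero or multiple matching files were found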
def clean_gfs(self):
"""
This function finds all files stored in gridfs that are not currently
referenced by any result file and removes them.
A clean database is a happy database.
"""
cursor = self.db.results.aggregate([
{
"$group": {
"_id": 1,
"valid_ids": {"$push": "$gridfs_id"}
}
}
])
# Doc contains all our valid ids
id_doc = cursor.next()
# Find all fs.files documents
gfs_cursor = self.db.fs.files.find({
"_id": {
"$nin": id_doc["valid_ids"]
}
})
# Iterate through file, delete fs.chunks then fs.files
total_files = gfs_cursor.count()
print "Found %s unused gridfs files. Preparing to delete...." % (total_files,)
for i, fs_file in enumerate(gfs_cursor):
progress(i, total_files)
self.db.fs.chunks.remove({
"files_id": fs_file["_id"]
})
self.db.fs.files.remove({
"_id": fs_file["_id"]
})
progress(total_files, total_files)
return
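
A maintenance routine like this would normally be invoked directly on an instance, assuming a reachable MongoDB host:

MongoDB(host="localhost").clean_gfs()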
def get_samples(self, experiment_accession, file_type):
"""
:param experiment_accession: Accession number of the experiment to grab samples from.
:type experiment_accession: str
:param file_type: File type of samples to grab, usually fastq or bam.
:type file_type: str
:returns: A tuple (valid, msg, data)
Validates and gets samples for the given experiment. Experiments must
have control and signal samples of the provided file_type to be
......
......@@ -2,27 +2,38 @@ from module_generator import ModuleGenerator
from chipathlon.result import Result
class AlignGenerator(ModuleGenerator):
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
"""
:param dax: The workflow graph object
:type dax: Pegasus.DAX3.ADAG
:param master_jobs: The dictionary mapping job name -> pegasus job object.
:type master_jobs: dict
:param master_files: The dictionary mapping file name -> pegasus file object.
:type master_files: dict
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:chipathlon.db.MongoDB
:param workflow_module: The actual module being used.
:type workflow_module: chipathlon.workflow_module.WorkflowModule
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
super(AlignGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
"""
:param dax: The workflow graph object
:type dax: Pegasus.DAX3.ADAG
:param master_jobs: The dictionary mapping job name -> pegasus job object.
:type master_jobs: dict
:param master_files: The dictionary mapping file name -> pegasus file object.
:type master_files: dict
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param workflow_module: The actual module being used.
:type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param save_db: Whether or not we want to save results to the database.
True by default.
:type save_db: bool
:param rewrite: Whether or not to rewrite existing files. If true, it will
ignore files in Mongo and recreate them. If false, it will download
files based on the latest available completed job.
:type rewrite: bool
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
super(AlignGenerator, self).__init__(
dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
base_path, save_db=save_db, rewrite=rewrite, debug=debug
)
self.generate_calls = {
"bwa": {
1: self._bwa_single,
......@@ -40,7 +51,7 @@ class AlignGenerator(ModuleGenerator):
def _bwa_init(self, run):
"""
:param run: The run to load genome files for
:type run: :py:class:chipathlon.run.Run
:type run: :py:class:`~chipathlon.run.Run`
Loads required genome files for bwa into the dictionary structure
expected by the actual workflow module. Specifically, this maps:
......@@ -51,16 +62,15 @@ class AlignGenerator(ModuleGenerator):
"""
base_file = run.genome.get_base_file()
inputs = {"ref_genome": base_file["name"]}
additional_inputs = {}
for f in run.genome.get_additional_files():
ext = f["name"].split(".")[-1]
additional_inputs["ref_genome.%s" % (ext,)] = f["name"]
return (inputs, additional_inputs)
inputs["ref_genome.%s" % (ext,)] = f["name"]
return inputs
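
With the merge, the separate additional_inputs dictionary is gone and every genome file is keyed into the single inputs dict. For a hypothetical genome whose base file is hg38.fa, _bwa_init would return roughly the following; the file names are illustrative, and the index extensions come from conf.file_extensions["bwa_genome"]:

{
    "ref_genome": "hg38.fa",
    "ref_genome.amb": "hg38.fa.amb",
    "ref_genome.ann": "hg38.fa.ann",
    "ref_genome.bwt": "hg38.fa.bwt",
    "ref_genome.pac": "hg38.fa.pac",
    "ref_genome.sa": "hg38.fa.sa"
}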
def _bowtie2_init(self, run):
"""
:param run: The run to load genome files for
:type run: :py:class:chipathlon.run.Run
:type run: :py:class:`~chipathlon.run.Run`
Loads required genome files for bowtie2 into the dictionary structure
expected by the actual workflow module. Specifically, this maps:
......@@ -72,81 +82,83 @@ class AlignGenerator(ModuleGenerator):
"""
base_file = run.genome.get_base_file()
additional_files = run.genome.get_additional_files()
inputs = {"ref_genome_prefix": run.genome.file_prefix}
additional_inputs = {"ref_genome": base_file["name"]}
inputs = {
"ref_genome_prefix": run.genome.file_prefix,
"ref_genome": base_file["name"]
}
for f in run.genome.get_additional_files():
ext = ".".join(f["name"].split(".")[-3 if "rev" in f["name"] else -2:])
additional_inputs["ref_genome.%s" % (ext,)] = f["name"]
return (inputs, additional_inputs)
inputs["ref_genome.%s" % (ext,)] = f["name"]
return inputs
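
Likewise for bowtie2, where the reverse index files keep their last three name components so the keys stay distinct. A hypothetical hg38 index would produce roughly:

{
    "ref_genome_prefix": "hg38",
    "ref_genome": "hg38.fa",
    "ref_genome.1.bt2": "hg38.1.bt2",
    "ref_genome.2.bt2": "hg38.2.bt2",
    "ref_genome.3.bt2": "hg38.3.bt2",
    "ref_genome.4.bt2": "hg38.4.bt2",
    "ref_genome.rev.1.bt2": "hg38.rev.1.bt2",
    "ref_genome.rev.2.bt2": "hg38.rev.2.bt2"
}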
def _bwa_single(self, run, result):
"""
:param run: The run to generate jobs for
:type run: :py:class:chipathlon.run.Run
:type run: :py:class:`~chipathlon.run.Run`
:param result: The result to generate jobs for.
:type result: :py:class:chipathlon.result.Result
:type result: :py:class:`~chipathlon.result.Result`
Creates the markers and inputs for bwa alignment jobs with single-end
reads. Only the first file in prev_results will be processed.
"""
markers = {"tool": "bwa", "read_end": "single"}
inputs, additional_inputs = self._bwa_init(run)
inputs = self._bwa_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
return (markers, inputs, additional_inputs)
return (markers, inputs)
def _bwa_paired(self, run, result):
"""
:param run: The run to generate jobs for
:type run: :py:class:chipathlon.run.Run
:type run: :py:class:`~chipathlon.run.Run`
:param result: The result to generate jobs for.
:type result: :py:class:chipathlon.result.Result
:type result: :py:class:`~chipathlon.result.Result`
Creates the markers and inputs for bwa alignment jobs with paired-end
reads.
"""
markers = {"tool": "bwa", "read_end": "paired"}
inputs, additional_inputs = self._bwa_init(run)
inputs = self._bwa_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
inputs["download_2.fastq"] = prev_results[1].full_name
return (markers, inputs, additional_inputs)
return (markers, inputs)
def _bowtie2_single(self, run, result):
"""
:param run: The run to generate jobs for
:type run: :py:class:chipathlon.run.Run
:type run: :py:class:`~chipathlon.run.Run`
:param result: The result to generate jobs for.
:type result: :py:class:chipathlon.result.Result
:type result: :py:class:`~chipathlon.result.Result`
Creates the markers and inputs for bowtie2 alignment jobs with single-end
reads. Only the first file in prev_results will be processed.
"""
markers = {"tool": "bowtie2", "read_end": "single"}
inputs, additional_inputs = self._bowtie2_init(run)
inputs = self._bowtie2_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
return (markers, inputs, additional_inputs)
return (markers, inputs)
def _bowtie2_paired(self, run, result):
"""
:param run: The run to generate jobs for
:type run: :py:class:chipathlon.run.Run
:type run: :py:class:`~chipathlon.run.Run`
:param result: The result to generate jobs for.
:type result: :py:class:chipathlon.result.Result
:type result: :py:class:`~chipathlon.result.Result`
Creates the markers and inputs for bowtie2 alignment jobs with paired-end
reads.
"""
markers = {"tool": "bowtie2", "read_end": "paired"}
inputs, additional_inputs = self._bowtie2_init(run)
inputs = self._bowtie2_init(run)
prev_results = self.get_prev_results(run, result)
inputs["download_1.fastq"] = prev_results[0].full_name
inputs["download_2.fastq"] = prev_results[1].full_name
return (markers, inputs, additional_inputs)
return (markers, inputs)
def _find_pairs(self, sample_list):
"""
......@@ -178,7 +190,7 @@ class AlignGenerator(ModuleGenerator):
def create_final_results(self, run):
"""
:param run: The target run to generate jobs for.
:type run: :py:class:chipathlon.run.Run
:type run: :py:class:`~chipathlon.run.Run`
"""
control_samples = run.get_samples("control")
signal_samples = run.get_samples("signal")
......@@ -214,9 +226,9 @@ class AlignGenerator(ModuleGenerator):
def find_prev_results(self, run, result):
"""
:param run: The target run to generate jobs for.
:type run: :py:class:chipathlon.run.Run
:type run: :py:class:`~chipathlon.run.Run`
:param result: The target result to create jobs for.
:type result: :py:class:chipathlon.result.Result
:type result: :py:class:`~chipathlon.result.Result`
"""
download_samples = run.get_results("download", "encode.fastq.gz")
prev_results = []
......@@ -231,11 +243,11 @@ class AlignGenerator(ModuleGenerator):
def parse_result(self, run, result):
"""
:param run: The target run to generate jobs for.
:type run: :py:class:chipathlon.run.Run
:type run: :py:class:`~chipathlon.run.Run`
:param result: The target result to create jobs for.
:type result: :py:class:chipathlon.result.Result
:type result: :py:class:`~chipathlon.result.Result`
"""
prev_results = self.get_prev_results(run, result)
markers, inputs, additional_inputs = self.generate_calls[run.genome.tool][len(prev_results)](run, result)
markers, inputs = self.generate_calls[run.genome.tool][len(prev_results)](run, result)
results = self.create_results(run, result)
return markers, inputs, additional_inputs, self.get_outputs(results)
return markers, inputs, self.get_outputs(results)
from module_generator import ModuleGenerator
from chipathlon.result import Result
import collections
class IdrGenerator(ModuleGenerator):
"""
:param dax: The workflow graph object
:type dax: Pegasus.DAX3.ADAG
:param master_jobs: The dictionary mapping job name -> pegasus job object.
:type master_jobs: dict
:param master_files: The dictionary mapping file name -> pegasus file object.
:type master_files: dict
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param workflow_module: The actual module being used.
:type workflow_module: :py:class:`~chipathlon.workflow_module.WorkflowModule`
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param save_db: Whether or not we want to save results to the database.
True by default.
:type save_db: bool
:param rewrite: Whether or not to rewrite existing files. If true, it will
ignore files in Mongo and recreate them. If false, it will download
files based on the latest available completed job.
:type rewrite: bool
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug=False):
"""
:param dax: The workflow graph object
:type dax: Pegasus.DAX3.ADAG
:param master_jobs: The dictionary mapping job name -> pegasus job object.
:type master_jobs: dict
:param master_files: The dictionary mapping file name -> pegasus file object.
:type master_files: dict
:param mdb: A MongoDB database class for fetching sample meta data.
:type mdb: :py:class:chipathlon.db.MongoDB
:param workflow_module: The actual module being used.
:type workflow_module: chipathlon.workflow_module.WorkflowModule
:param workflow_jobs: Dictionary mapping workflow_job name -> workflow_job instance
:type workflow_jobs: dict
:param base_path: Base location of the workflow, used to save metadata files.
:type base_path: str
:param debug: If true, prints out params for each job & module.
:type debug: bool
"""
super(IdrGenerator, self).__init__(dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs, base_path, debug)
def __init__(self, dax, master_jobs, master_files, mdb, workflow_module,
workflow_jobs, base_path, save_db=True, rewrite=False, debug=False):
super(IdrGenerator, self).__init__(
dax, master_jobs, master_files, mdb, workflow_module, workflow_jobs,
base_path, save_db=save_db, rewrite=rewrite, debug=debug
)
self.module_name = "idr"
self.result_dict = {}
self.output_files = {
"peakranger": ["region_sorted.bed", "summit_sorted.bed"],
"ccat": ["region_sorted.bed", "peak_sorted.bed"],
"gem": ["results_GEM_sorted.bed", "results_GPS_sorted.bed"],
"spp": ["results_sorted.bed"],
"zerone": ["results_sorted_final.bed"],
"hiddendomains": ["results_final.bed"],
"macs2": ["results_sorted.bed"],
"pepr": ["pepr_result.bed"]
}
self.output_files = collections.defaultdict(dict)
self.output_files["peakranger"]["narrow"] = ["region_sorted.bed", "summit_sorted.bed"]
self.output_files["ccat"]["broad"] = ["region_sorted.bed", "peak_sorted.bed"]
self.output_files["gem"]["narrow"] = ["results_GEM_sorted.bed", "results_GPS_sorted.bed"]
self.output_files["spp"]["narrow"] = self.output_files["spp"]["broad"] = ["results_sorted.bed"]
self.output_files["macs2"]["narrow"] = self.output_files["macs2"]["broad"] = ["results_sorted.bed"]
self.output_files["music"]["narrow"] = ["sorted_scale_%s_all.bed" % (i,) for i in [129, 194, 291]]
self.output_files["music"]["punctate"] = ["sorted_scale_%s_all.bed" % (i,) for i in [129, 194, 291, 437, 656, 985, 1477, 2216]]
self.output_files["music"]["broad"] = ["sorted_scale_%s_all.bed" % (i,) for i in [1459, 2189, 3284, 4926, 7389, 11084, 16626]]
self.output_files["zerone"]["broad"] = ["results_sorted.bed"]
self.output_files["hiddendomains"]["broad"] = ["results_sorted.bed"]
self.output_files["pepr"]["broad"] = ["results_sorted.bed"]
self.output_files["pepr"]["narrow"] = ["results_sorted.bed"]
if debug:
print "[LOADING GENERATOR] IdrGenerator"