Commit 12d2ea5a authored by aknecht2's avatar aknecht2

Merge branch '3-method-auto-doc' into 'master'

Resolve "Method Auto Doc"

Closes #3

See merge request !25
parents 1d2ebea3 da85b698
......@@ -12,8 +12,6 @@ import hashlib
class MongoDB(object):
def __init__(self, host, username, password, debug=False):
"""
:param host: The host address of the MongoDB database.
:type host: str
......@@ -21,9 +19,16 @@ class MongoDB(object):
:type username: str
:param password: The password for the user.
:type password: str
:param debug: If true print out debug messages
:param debug: A flag for printing additional messages.
:type debug: bool
This class is used to manage all interactions with the encode metadata.
The metadata can be very unruly and difficult to deal with. There
are several helper functions within this class to make some database
operations much easier.
"""
def __init__(self, host, username, password, debug=False):
self.debug = debug
self.host = host
self.username = username
......@@ -48,6 +53,12 @@ class MongoDB(object):
:type key: Any hashable
:param data: The data to add to the cache.
:type data: Object
Adds a data result to the internal cache. This is used to speed up
requests that are identical. We may have multiple runs that use
identical control / signal files but change around the alignment or
peak calling tools. In these cases we don't want to request info
from the database multiple times for the same data.
"""
if function not in self.cache:
self.cache[function] = {}
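As a quick sketch of how the cache described above is meant to be used: it is keyed first by function name, then by request key. The call site and accession below are hypothetical, and get_from_cache is assumed to return None on a miss, as the lookup code suggests.

# Hypothetical usage of the two-level cache.
data = mdb.get_from_cache("get_sample", "ENCF0001")
if data is None:
    data = run_expensive_query("ENCF0001")  # placeholder for a real database request
    mdb.add_to_cache("get_sample", "ENCF0001", data)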
......@@ -60,6 +71,8 @@ class MongoDB(object):
:type function: str
:param key: The key to get from the cache.
:type key: Any hashable
Gets a data item from the internal cache.
"""
if function in self.cache:
if key in self.cache[function]:
......@@ -69,9 +82,9 @@ class MongoDB(object):
def delete_result(self, result, genome):
"""
:param result: The result to delete
:type result: :py:class:~chipathlon.result.Result
:type result: :py:class:`~chipathlon.result.Result`
:param genome: The genome to find information from.
:type genome: :py:meth:~chipathlon.genome.Genome
:type genome: :py:class:`~chipathlon.genome.Genome`
Deletes a result and its corresponding gridfs entry.
"""
......@@ -114,11 +127,13 @@ class MongoDB(object):
def result_exists(self, result, genome):
"""
:param result: The result to check.
:type result: :py:meth:~chipathlon.result.Result
:type result: :py:class:`~chipathlon.result.Result`
:param genome: The genome to find information from.
:type genome: :py:meth:~chipathlon.genome.Genome
:type genome: :py:class:`~chipathlon.genome.Genome`
Check if a result exists.
Check if a result exists in the database. The genome parameter
is required since some files have been aligned to a particular
genome, or use individual chromosome fasta or size files for
peak calling.
"""
try:
cursor = self.db.results.find(self._get_result_query(result, genome))
......@@ -130,11 +145,12 @@ class MongoDB(object):
def get_result_id(self, result, genome):
"""
:param result: The result to check.
:type result: :py:meth:~chipathlon.result.Result
:type result: :py:class:`~chipathlon.result.Result`
:param genome: The genome to find information from.
:type genome: :py:meth:~chipathlon.genome.Genome
:type genome: :py:class:`~chipathlon.genome.Genome`
:returns: The id found or None
Get the id of a result.
Get the id of a result in the database.
"""
try:
cursor = self.db.results.find(self._get_result_query(result, genome))
......@@ -177,8 +193,11 @@ class MongoDB(object):
:param gfs_attributes: Additional metadata to store in gridfs.
:type gfs_attributes: dict
Saves a result file into mongodb and also creates the corresponding
gridfs file.
Saves a result entry into MongoDB and uploads the file into gridfs.
The only difference between additional_data and gfs_attributes is
where the metadata is stored. Both store key-value pairs of
information: the additional_data information is stored in the result
entry, while the gfs_attributes information is stored in gridfs.
"""
# Make sure output_file exists
if os.path.isfile(output_file):
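To make the distinction concrete, a small illustration of the two metadata dicts; the keys and values here are hypothetical examples, not fields the module requires.

# Both are plain key-value dicts; they differ only in where they land.
additional_data = {"align_tool": "bwa"}  # stored in the result entry
gfs_attributes = {"file_type": "bam"}    # stored on the gridfs file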
......@@ -218,6 +237,7 @@ class MongoDB(object):
"""
:param sample_accession: The accession number to check.
:type sample_accession: str
:returns: Whether or not the sample is valid.
Ensures that a sample with the accession specified actually exists.
"""
......@@ -235,6 +255,7 @@ class MongoDB(object):
"""
:param experiment_accession: The accession number to check.
:type experiment_accession: str
:returns: Whether or not the experiment is valid.
Ensures that an experiment with the accession specified actually exists.
"""
......@@ -252,15 +273,15 @@ class MongoDB(object):
def fetch_from_gridfs(self, gridfs_id, filename, checkmd5=True):
"""
:param gridfs_id: GridFS _id of file to get.
:type gridfs_id: bson.objectid.ObjectId
:type gridfs_id: :py:class:`bson.objectid.ObjectId`
:param filename: Filename to save file to.
:type filename: str
:param checkmd5: Whether or not to validate the md5 of the result
:type checkmd5: bool
Fetch the file with the corresponding id and save it under the
specified 'filename'. If checkmd5 is specified, validate that the saved
file has a correct md5 value.
specified 'filename'. If checkmd5 is specified, validate that the
saved file has a correct md5 value.
"""
try:
gridfs_file = self.gfs.get(gridfs_id)
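Since hashlib is already imported at the top of this file, the checkmd5 validation can be pictured as comparing the digest gridfs stored at upload time against a streamed digest of the saved file. The helper below is an illustrative sketch, not the module's actual code.

import hashlib

def file_md5(filename, chunk_size=4096):
    # Stream the file in chunks so large gridfs downloads are not
    # read into memory all at once.
    md5 = hashlib.md5()
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            md5.update(chunk)
    return md5.hexdigest()

# gridfs records an md5 for each uploaded file (gridfs_file.md5), so
# validation amounts to: file_md5(filename) == gridfs_file.md5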
......@@ -298,10 +319,13 @@ class MongoDB(object):
"""
:param accession: The accession number of the target sample
:type accession: string
:param file_type: The file type of the target sample should be [fastq|bam]
:param file_type: The file type of the target sample.
:type file_type: string
Gets the associated sample based on accession number and file_type
Gets the associated sample based on accession number and file_type.
For loading input files for workflows, the file_type should be fastq
or bam. Other file types can be specified for loading additional files
saved in the experiment metadata.
"""
valid = True
msg = ""
......
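The (valid, msg, sample) return convention is visible in Run._load_sample_data later in this diff; a direct call looks roughly like the following, with an illustrative accession.

# get_sample returns a (valid, msg, sample) tuple.
valid, msg, sample = mdb.get_sample("ENCF0001", "fastq")
if not valid:
    print(msg)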
......@@ -3,8 +3,6 @@ import chipathlon.conf
from Pegasus.DAX3 import File, PFN
class Genome(object):
def __init__(self, assembly, tool, base_file, chrom_sizes):
"""
:param assembly: Version of genome used for building, e.g. hg19, grch38p6, mm9
:type assembly: string
......@@ -14,7 +12,13 @@ class Genome(object):
:type base_file: string
:param chrom_sizes: Chromosome sizes file.
:type chrom_sizes: string
The Genome class handles loading and validating genome files on disk.
It serves as a helper class to make managing genome input files
much easier.
"""
def __init__(self, assembly, tool, base_file, chrom_sizes):
self.assembly = assembly
self.tool = tool
self.base_file = base_file
......@@ -38,7 +42,7 @@ class Genome(object):
def is_valid(self):
"""
Checks if the run is valid.
Checks if the genome is valid.
"""
return len(self.errors) == 0
......@@ -49,18 +53,40 @@ class Genome(object):
return "\n".join(self.errors)
def get_base_file(self):
"""
:returns: The full name of the base file.
"""
return self.files.get("base_file")
def get_chrom_sizes(self):
"""
:returns: The full name of the chromosome sizes file.
"""
return self.files.get("chrom.sizes")
def get_additional_files(self):
"""
:returns: A list of all additional genome indices.
For bwa returns the file names of the .amb, .ann, .bwt, .pac, and .sa
files. For bowtie2 returns the file names of the .1.bt2, .2.bt2,
.3.bt2, .4.bt2, .rev.1.bt2, and .rev.2.bt2 files.
"""
return self.files.get("additional_files")
def get_chr_fasta_files(self):
"""
:returns: A list of all individual chromosome fasta files (if they exist)
"""
return self.files.get("chr_fasta")
def get_all_files(self):
"""
:returns: A list of all files.
The base file, the chromosome sizes file, all additional genome indices,
and all individual chromosome fasta files.
"""
return [self.get_base_file(), self.get_chrom_sizes()] + self.get_additional_files() + self.get_chr_fasta_files()
def _load_prefixes(self):
......
......@@ -4,13 +4,6 @@ import yaml
from Pegasus.DAX3 import File, PFN
class Result(object):
"""
A class containing information about a result file.
i.e. MongoDB metadata, as well as all jobs and arguments
run on the result file up to this point.
"""
def __init__(self, logical_name, control_samples, signal_samples, all_markers, all_jobs, should_save=False, prefix_join=None, name_template=None, last_result=False):
"""
:param logical_name: The unique name of the file as presented in the module yaml
:type logical_name: string
......@@ -31,16 +24,19 @@ class Result(object):
:param last_result: A boolean to determine if result is the last result in a module.
:type last_result: boolean
The result class is for managing all intermediate output files.
It also helps manage checking if a result already exists for the
purpose of generators creating jobs.
The result class is for managing all intermediate output files. A result
contains all information about a file -- all MongoDB metadata, as well as
jobs and their arguments that have been run on the file up to this point.
The full name of the file will be a prefix plus the given logical name.
Consider a bwa paired end read alignment where our two paired ends
are control files with accessions ENCF0001 and ENCF0002 respectively.
The prefix would be computed as ENCF0001_ENCF0002_bwa_paired_.
The final output file from align would be ENCF0001_ENCF0002_bwa_paired_align.bam
The final output file from align would be
ENCF0001_ENCF0002_bwa_paired_align.bam.
"""
def __init__(self, logical_name, control_samples, signal_samples, all_markers, all_jobs, should_save=False, prefix_join=None, name_template=None, last_result=False):
self.logical_name = logical_name
self.all_markers = all_markers
self.control_samples = control_samples
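A minimal sketch of the prefix computation the class docstring walks through; the join logic here is a reconstruction of the example, not the class's actual code, which depends on prefix_join and name_template.

# Reconstruct the docstring's example: two control paired ends
# aligned with bwa.
accessions = ["ENCF0001", "ENCF0002"]
markers = ["bwa", "paired"]
prefix = "_".join(accessions + markers) + "_"
full_name = prefix + "align.bam"
# -> "ENCF0001_ENCF0002_bwa_paired_align.bam"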
......@@ -96,7 +92,9 @@ class Result(object):
:param val: The meta value.
:type val: Any object
Adds a metadata value.
Adds a metadata value. This function is fairly generic, but sometimes
we want to attach additional information to result files that is not
part of the existing encode metadata.
"""
self.meta[key] = val
return
......@@ -106,7 +104,9 @@ class Result(object):
:param key: The meta key.
:type key: hashable
Gets a stored metadata value.
Gets a stored metadata value. This function is fairly generic, but
sometimes we want to attach additional information to result files
that is not part of the existing encode metadata.
"""
return self.meta.get(key)
......@@ -127,8 +127,7 @@ class Result(object):
:type mdb: MongoDB
:param genome: The genome class containing genomic information
:type genome: Genome
Checks whether or not the result exists in the database.
:returns: Whether or not the result exists in the database.
"""
return mdb.result_exists(self, genome)
......
......@@ -3,14 +3,9 @@ from chipathlon.result import Result
from pprint import pprint
class Run(object):
"""
A class representing an individual smallest workflow.
"""
def __init__(self, genome, peak, signals, controls, file_type, peak_type=None, idr=None):
"""
:param genome: The genome class containing alignment, assembly and file information.
:type genome: :py:class:chipathlon.genome.Genome
:type genome: :py:class:`~chipathlon.genome.Genome`
:param peak: The peak calling method to be used.
:type peak: string
:param signals: List of accessions for signal samples
......@@ -23,7 +18,13 @@ class Run(object):
:type peak_type: string
:param idr: A list of two signal accessions corresponding to which peak results to run with idr.
:type idr: boolean
A run contains everything necessary for a single workflow to be run
from start to finish: the definitions of the tools to use, as well as
the lists of control and signal input files to run them on.
"""
def __init__(self, genome, peak, signals, controls, file_type, peak_type=None, idr=None):
self.genome = genome
self.peak = peak
self.signals = signals
......@@ -71,14 +72,14 @@ class Run(object):
def _load_sample_data(self, mdb, accession, sample_type):
"""
Loads records from the database for an individual accession
:param mdb: MongoDB instance to fetch samples from.
:type mdb: :py:class:chipathlon.db.MongoDB
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:param accession: Accession to load data for.
:type accession: string
:param sample_type: Whether the sample is a signal or control file. Should be [signal|control]
:type sample_type: string
Loads records from the database for an individual accession.
"""
if accession is not None:
valid, msg, sample = mdb.get_sample(accession, self.file_type)
......@@ -97,10 +98,11 @@ class Run(object):
def load_samples(self, mdb):
"""
:param mdb: MongoDB class instance
:type mdb: :py:class:chipathlon.db.MongoDB
:type mdb: :py:class:`~chipathlon.db.MongoDB`
:returns: None
Loads samples based on the accessions defined in self.signals and
self.controls
self.controls.
"""
for accession in self.signals:
self._load_sample_data(mdb, accession, "signal")
......@@ -110,13 +112,13 @@ class Run(object):
def is_valid(self):
"""
Checks if the run is valid.
:returns: Whether or not the run is valid.
"""
return len(self.errors) == 0
def get_error_string(self):
"""
Returns the errors as a newline separated string.
:returns: The errors as a newline separated string.
"""
return "\n".join(self.errors)
......@@ -124,22 +126,22 @@ class Run(object):
"""
:param sample_type: Type of samples to retrieve, either control or signal
:type sample_type: str
:returns: The samples if they exist, else None
Gets the MongoDB metadata associated with all samples for the run.
"""
if sample_type in self.samples:
return self.samples[sample_type]
else:
return None
return self.samples.get(sample_type)
def add_result(self, module_name, result):
"""
:param module_name: The name of the module (i.e. align, remove_duplicates, peak_call...) to add a result for.
:type module_name: string
:param result: The result class containing necessary metadata information about the module output file.
:type result: :py:class:chipathlon.result.Result
:type result: :py:class:`~chipathlon.result.Result`
Adds a result to the current run indexed by module_name.
Adds a result to the current run, indexed by module_name and logical
name. For example, if you add the final result from the align module
(align.bam), it will be indexed under the align module name and the
align.bam logical name.
"""
if module_name not in self.results:
self.results[module_name] = {}
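Given the two-level indexing the docstring describes, usage looks roughly like the following; the run and result variables are hypothetical.

# Results are stored as self.results[module_name][logical_name].
run.add_result("align", align_result)  # indexed under ("align", "align.bam")
bam_results = run.get_results("align", "align.bam")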
......@@ -155,9 +157,10 @@ class Run(object):
:type module_name: string
:param logical_file_name: The unique file name as seen in the module yaml file. I.e. align.bam
:type logical_file_name: string
:returns: A list of results, or an empty list if no results are found.
Returns a result based on module_name and logical_file_name.
Returns None if no such result exists.
Returns all results that match the provided module and logical file
name. If no results exist, get_results returns an empty list.
"""
if module_name in self.results:
if logical_file_name in self.results[module_name]:
......@@ -171,11 +174,13 @@ class Run(object):
:param logical_file_name: The unique file name as seen in the module yaml file. I.e. align.bam
:type logical_file_name: string
:param final_result: The final result of the module w/ markers
:type final_result: :py:class:~chipathlon.result.Result
:type final_result: :py:class:`~chipathlon.result.Result`
:returns: A :py:class:`~chipathlon.result.Result` or None
Returns a result that matches the metadata of the final result exactly.
We are guaranteed either a single result or no result.
That is, the markers taken across modules are the same, the
signal and control accessions are the same, and the file
prefixes match exactly.
"""
results = self.get_results(module_name, logical_file_name)
for result in results:
......
......@@ -10,8 +10,6 @@ from pprint import pprint
class WorkflowJob(object):
def __init__(self, base_file, params, debug=False):
"""
:param base_file: The yaml job definition file.
:type base_file: str
......@@ -19,7 +17,12 @@ class WorkflowJob(object):
:type param_file: str
:param debug: A debug flag to print out additional information.
:type debug: bool
The WorkflowJob contains all information necessary to create a single job.
Parameters and resources are loaded from a yaml file.
"""
def __init__(self, base_file, params, debug=False):
self.errors = []
self.debug = debug
self.raw_files = {}
......@@ -50,7 +53,7 @@ class WorkflowJob(object):
def is_valid(self):
"""
Checks if the run is valid.
Checks if the job is valid.
"""
return len(self.errors) == 0
......@@ -353,6 +356,7 @@ class WorkflowJob(object):
:type additional_inputs: list
:param outputs: Output params for the job.
:type outputs: list
:returns: The job created or None if unsuccessful
Creates an actual Pegasus.DAX3.Job instance from the provided
information. Inputs, additional_inputs, and outputs should
......
......@@ -15,10 +15,21 @@ from Pegasus.DAX3 import *
class WorkflowModule(object):
"""
:param module_yaml: The yaml module definition file.
:type module_yaml: str
:param workflow_jobs: A dictionary mapping job name -> workflow job class.
:type workflow_jobs: dict
:param debug: A flag to print additional info.
:type debug: bool
The workflow module handles the inner workings of an individual module
yaml file. Provided with the correct workflow_jobs and a list of
files, the workflow module creates all jobs in the correct order for
the target module.
"""
def __init__(self, module_yaml, workflow_jobs, debug = False):
# module_yaml is the module yaml file
# workflow_jobs is a dictionary that points from name -> actual job class
self.err = ""
self.debug = debug
try:
......@@ -46,13 +57,14 @@ class WorkflowModule(object):
def get_job_list(self, markers):
"""
:param markers: A dictionary containing marker values for the module.
:param markers: The splits to take for the current module.
:type markers: dict
:returns: A list of all jobs that will be run for this workflow with
the provided markers.
This function returns the list of jobs that will be run given the
provided markers. If markers is not specified, it simply returns the
entire workflow; otherwise, it returns the appropriate jobs.
"""
if markers:
job_list = self.workflow
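A sketch of what a markers dict might look like; the key names below are assumptions for illustration, since the actual marker keys come from the module yaml files.

# Hypothetical markers selecting one split per branch point in the module.
markers = {"tool": "bwa", "read_end": "paired"}
job_list = module.get_job_list(markers)  # only the bwa paired-end jobs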
......@@ -159,6 +171,18 @@ class WorkflowModule(object):
return
def get_job_params(self, markers, job_name):
"""
:param markers: Input markers.
:type markers: dict
:param job_name: The name of the job to get params for.
:type job_name: str
:returns: A dictionary containing inputs, additional_inputs, and outputs.
Returns the inputs, additional_inputs, and outputs defined for the
specified job & marker combination from the module. The values returned
will be based on the names defined in the module template file; they
will not be real file names.
"""
job_list = self.get_job_list(markers)
for job_dict in job_list:
if job_name == job_dict.keys()[0]:
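Usage, roughly; the job name here is hypothetical, and as the docstring notes the returned names are logical names from the module yaml, not real file names.

params = module.get_job_params(markers, "bwa_align")  # hypothetical job name
# params -> {"inputs": [...], "additional_inputs": [...], "outputs": [...]}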
......@@ -169,8 +193,8 @@ class WorkflowModule(object):
"""
:param markers: The input markers.
:type markers: dict
Returns a list of job names that match the provided markers.
:returns: The names of the jobs that will be run for this module with
the provided markers.
"""
job_list = self.get_job_list(markers)
job_names = []
......@@ -180,12 +204,32 @@ class WorkflowModule(object):
def add_jobs(self, dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs):
"""
dax -- The pegasus dax object from workflow
master_jobs -- A dictionary that maps jobname -> Job object
master_files -- A dicitonary that maps filename -> File object
markers -- Dictionary for splitting within module
inputs -- Dictionary of input_file name -> master_files index OR input argument name -> value
additional_files -- dictionary of additional_file name -> master_files index
:param dax: The dax to add jobs to.
:type dax: Pegasus.DAX3.ADAG
:param master_jobs: A dictionary mapping job names -> Pegasus job objects.
:type master_jobs: dict
:param master_files: A dictionary mapping file names -> File objects.
:type master_files: dict
:param markers: The splits to take for the current module.
:type markers: dict
:param inputs: A dictionary mapping the logical file names defined in
the workflow module -> the full file name.
:type inputs: dict
:param additional_inputs: A dictionary mapping the logical file names
defined in the workflow module -> the full file name.
:type additional_inputs: dict
:param outputs: A dictionary mapping the logical file names defined in
the workflow module -> the full file name.
:returns: None
Adds all the jobs in the correct order for the current workflow.
The inputs, additional_inputs, and outputs need to be passed in
for all jobs in the workflow. Consider a small example of three
jobs run back to back, with the output of each piped into the
input of the next. Here we only need to pass in a single input,
because the later jobs consume the outputs of the earlier jobs,
but we still need to pass in three outputs, one for each job.
"""
valid, msg = self._check_params(master_files, markers, inputs, additional_inputs, outputs)
if valid:
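To make the three-job example in the docstring concrete, a hedged sketch; all logical and full file names below are illustrative.

# Three chained jobs: each job's output feeds the next job's input, so
# only one real input is needed, but all three outputs must be named.
inputs = {"raw.fastq": "ENCF0001.fastq"}
outputs = {
    "align.bam": "ENCF0001_bwa_single_align.bam",
    "no_dups.bam": "ENCF0001_bwa_single_no_dups.bam",
    "peaks.bed": "ENCF0001_bwa_single_peaks.bed",
}
module.add_jobs(dax, master_jobs, master_files, markers, inputs, {}, outputs)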
......@@ -350,26 +394,14 @@ class WorkflowModule(object):
dax.depends(child = job, parent = master_jobs[file_dict["name"]])
return
def get_dependent_jobs(self, dax, master_jobs, master_files, markers, inputs, additional_inputs, outputs):
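"""
Returns the names of input files for this module (with the given
markers) that are produced by jobs already present in master_jobs,
i.e. the dependencies of this module's jobs.
"""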
dep_list = []
job_list = self.get_job_list(markers)
for job_dict in job_list:
job_name = job_dict.keys()[0]
job_info = job_dict[job_name]
job_inputs = self._setup_job_params(master_files, job_info, markers, "inputs", inputs, outputs)
for file_dict in job_inputs:
if file_dict["name"] in master_jobs and file_dict["name"] not in dep_list:
dep_list.append(file_dict["name"])
return dep_list
def get_final_outputs(self, markers):
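"""
Returns the logical names of the output files of the final job
in the module for the given markers.
"""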
job_list = self.get_job_list(markers)
job_dict = job_list[-1]
job_name = job_dict.keys()[0]
outputs = job_dict[job_name]["outputs"]
return [output.keys()[0] for output in outputs]
def get_all_final_results(self, markers):
"""
:param markers: The splits to take for the current workflow.
:type markers: dict
:returns: A list of all output files marked with final_result: True
Gets all the final_results for the workflow with the target markers.
"""
job_list = self.get_job_list(markers)
final_results = []
for i, job_dict in enumerate(job_list):
......
MongoDB
==============
MongoDB Class
^^^^^^^^^^^^^^
.. autoclass:: chipathlon.db.MongoDB
:members:
Genome
==============
Genome Class
^^^^^^^^^^^^^
.. autoclass:: chipathlon.genome.Genome
:members:
Result
==============
Result Class
^^^^^^^^^^^^^
.. autoclass:: chipathlon.result.Result
:members:
Run
==============
Run Class