Commit dc6a2061 authored by aknecht2

All Python scripts now conform to the PEP8 standard (except E501).

parent 740e5fb7
@@ -4,6 +4,7 @@ import sys
import traceback
from pprint import pprint
class MongoDB(object):
def __init__(self, host, username, password):
@@ -18,7 +19,7 @@ class MongoDB(object):
self.gfs = gridfs.GridFS(self.db)
return
def load_bed(self, collection, result_id, bed_file, attributes = {}):
def load_bed(self, collection, result_id, bed_file, attributes={}):
return
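The tightened signature still carries a mutable default (attributes={}); PEP8 itself does not flag this, but the dict is created once at definition time and shared across calls. A minimal sketch of the usual None-sentinel pattern (standalone and hypothetical, not the project's actual method):

# Hypothetical standalone version of the same signature:
def load_bed(collection, result_id, bed_file, attributes=None):
    if attributes is None:
        attributes = {}  # fresh dict on every call
    attributes["bed_file"] = bed_file
    return attributes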
......
@@ -2,9 +2,9 @@
import chipathlon
import argparse
parser = argparse.ArgumentParser(description = "Download target file.")
parser.add_argument("--url", dest="url", required = True, help="Target url.")
parser.add_argument("--local_path", dest="local_path", required = True, help="Local path to file.")
parser = argparse.ArgumentParser(description="Download target file.")
parser.add_argument("--url", dest="url", required=True, help="Target url.")
parser.add_argument("--local_path", dest="local_path", required=True, help="Local path to file.")
parser.add_argument("--url_type", dest="url_type", default="ftp://", help="Type of url to access.")
parser.add_argument("--retries", dest="retries", default=3, type=int, help="Number of retries.")
parser.add_argument("--overwrite", dest="overwrite", default=False, action="store_true", help="Overwrite local file if exists.")
@@ -14,9 +14,9 @@ args = parser.parse_args()
chipathlon.utils.downloadFile(
args.url,
args.local_path,
urltype = args.url_type,
retries = args.retries,
overwrite = args.overwrite,
checkmd5 = (not not args.md5),
md5 = args.md5
urltype=args.url_type,
retries=args.retries,
overwrite=args.overwrite,
checkmd5=(not not args.md5),
md5=args.md5
)
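The double negation in checkmd5=(not not args.md5) coerces the optional md5 value to a boolean; bool() says the same thing directly. A one-line equivalent, assuming args.md5 is None or a digest string:

# Equivalent and clearer; args.md5 is None when --md5 is not supplied:
checkmd5 = bool(args.md5)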
@@ -5,7 +5,7 @@ import chipathlon.db
import chipathlon.workflow
import argparse
parser = argparse.ArgumentParser(description = "Perform a join between the experiment and sample collections.")
parser = argparse.ArgumentParser(description="Perform a join between the experiment and sample collections.")
parser.add_argument("--password", dest="password", required=True, help="Database user password.")
parser.add_argument("--username", dest="username", default="aknecht", required=True, help="Database user.")
parser.add_argument("--host", dest="host", default="hcc-anvil-241-41.unl.edu", required=True, help="Database host.")
@@ -13,6 +13,7 @@ args = parser.parse_args()
mdb = chipathlon.db.MongoDB(args.host, args.username, args.password)
# Shamelessly stolen from: https://svn.blender.org/svnroot/bf-blender/trunk/blender/build_files/scons/tools/bcolors.py
class bcolors(object):
HEADER = '\033[95m'
@@ -24,6 +25,7 @@ class bcolors(object):
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def print_test(name, success):
if success:
print bcolors.OKBLUE + name + ": Success" + bcolors.ENDC
@@ -31,6 +33,7 @@ def print_test(name, success):
print bcolors.FAIL + name + ": Failure" + bcolors.ENDC
return
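The bcolors constants are raw ANSI escape sequences, and print_test simply brackets its message between a color code and the ENDC reset. The same idea in a self-contained sketch (using the standard bcolors values):

# ANSI escapes color terminal output; '\033[0m' resets attributes.
OKBLUE = '\033[94m'
FAIL = '\033[91m'
ENDC = '\033[0m'

def demo_print_test(name, success):
    color, label = (OKBLUE, "Success") if success else (FAIL, "Failure")
    print(color + name + ": " + label + ENDC)

demo_print_test("demo", True)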
# Test 1, valid inputs
def yaml_job_test_1():
yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params1.yaml")
@@ -38,6 +41,7 @@ def yaml_job_test_1():
print_test("Yaml_Job_Test_1", arg_list == ['mem', '-M', '-t 1', 'lfn1', 'lfn2', 'lfn3'])
return
# Test2, invalid arguments
def yaml_job_test_2():
yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params2.yaml")
@@ -45,12 +49,14 @@ def yaml_job_test_2():
print_test("Yaml_Job_Test_2", yj.err != err)
return
# Test3, non-yaml input file
def yaml_job_test_3():
yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params3.yaml")
print_test("Yaml_Job_Test_3", not yj.valid())
return
# Test4, ill-defined input file
def yaml_job_test_4():
yj = chipathlon.yaml_job.YamlJob("bwa_align_paired", "test/yaml_job/params4.yaml")
@@ -58,48 +64,54 @@ def yaml_job_test_4():
print_test("Yaml_Job_Test_4", yj.err != err)
return
# Test5, valid exp, with 2 experiment and 2 control samples
def exp_files_test_1():
valid, msg, data = mdb.get_samples("ENCSR000BSE")
print_test("Exp_Files_Test_1", valid and msg == "Succesfully retrieved input files for experiment with id 'ENCSR000BSE'." and len(data["control"]) == 2 and len(data["experiment"]) == 2)
return
# Test6, invalid exp id
def exp_files_test_2():
valid, msg, data = mdb.get_samples("NOT_AN_ID")
print_test("Exp_Files_Test_2", not valid and msg == "Experiment with id 'NOT_AN_ID' does not exist.")
return
# Test7, invalid metadata
def exp_files_test_3():
valid, msg, data = mdb.get_samples("ENCSR329RIP")
print_test("Exp_Files_Test_3", not valid and msg == "Experiment with id 'ENCSR329RIP' does not have all required metadata (assembly, target, no revoked_files).")
return
# Test8, multiple control experiments
def exp_files_test_4():
valid, msg, data = mdb.get_samples("ENCSR000CWZ")
print_test("Exp_Files_Test_4", valid and msg == "Succesfully retrieved input files for experiment with id 'ENCSR000CWZ'." and len(data["control"]) == 4 and len(data["experiment"]) == 2)
return
# Test9, multiple control experiments, no possible control_inputs
def exp_files_test_5():
valid, msg, data = mdb.get_samples("ENCSR000DKB")
print_test("Exp_Files_Test_5", not valid and msg == "Experiment with id 'ENCSR000DKB' has '0' possible control inputs, and '3' possible experiment inputs.")
return
def workflow_test_1():
workflow = chipathlon.workflow.Workflow("test/run/", "test/run/run.yaml", "test/run/param.yaml", args.host, args.username, args.password)
workflow.generate()
return
def valid_samples():
has_samples, total = mdb.check_valid_samples()
print_test("Verify_All_Samples", has_samples == total)
return
# tests = [
# yaml_job_test_1,
# yaml_job_test_2,
......
@@ -4,14 +4,16 @@ import urllib2
import os
import traceback
def progress(current, end, length = 20):
def progress(current, end, length=20):
percent = float(current) / end
hashes = "#" * int(round(percent * length))
spaces = " " * (length - len(hashes))
sys.stdout.write("\rProcessed %s / %s entries. [%s] %s%%" % (current, end, hashes + spaces, int(round(percent * 100))))
sys.stdout.flush()
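progress() redraws a single terminal line via the carriage return in "\r..." rather than printing one line per update. A small usage sketch, assuming the function above is in scope:

import time

for i in range(1, 101):
    time.sleep(0.01)   # stand-in for real work
    progress(i, 100)   # rewrites "[####    ] 45%" in place
print                  # finish with a newline, as the scripts below do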
def downloadFile(url, localpath, urltype = "ftp://", retries = 3, overwrite = False, checkmd5 = False, md5 = None):
def downloadFile(url, localpath, urltype="ftp://", retries=3, overwrite=False, checkmd5=False, md5=None):
success = False
if url[:7] == "http://":
urltype = ""
......
@@ -11,16 +11,10 @@ import chipathlon
from pprint import pprint
from Pegasus.DAX3 import *
class Workflow(object):
def __init__(self, jobhome, run_file, param_file, host, username, password):
"""
:param jobhome: The base directory for job submission.
:type jobhome: str
Creates a Workflow class based on the input directory. Only loads and
validates the config file by default.
"""
# Initialize db connection
self.mdb = chipathlon.db.MongoDB(host, username, password)
# Jobname info & err
@@ -65,19 +59,19 @@ class Workflow(object):
sys.exit(1)
return
def load_executables(self, os_type = "linux", arch = "x86_64"):
def load_executables(self, os_type="linux", arch="x86_64"):
# Load wrapper scripts for commands that need to be loaded from module
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_wrappers)):
for f in files:
ex_name = f.split("_")[0]
self.executables[ex_name] = Executable(name = ex_name, os = os_type, arch = arch)
self.executables[ex_name] = Executable(name=ex_name, os=os_type, arch=arch)
self.executables[ex_name].addPFN(PFN("file://%s/%s" % (root, f), "condorpool"))
self.dax.addExecutable(self.executables[ex_name])
break
# Load actual scripts
for root, dirs, files in os.walk("%s/scripts" % (os.path.dirname(os.path.realpath(__file__)),)):
for f in files:
self.executables[f] = Executable(name = f, os = os_type, arch = arch)
self.executables[f] = Executable(name=f, os=os_type, arch=arch)
self.executables[f].addPFN(PFN("file://%s/%s" % (root, f), "condorpool"))
self.dax.addExecutable(self.executables[f])
return
@@ -128,7 +122,7 @@ class Workflow(object):
break
return
def _addFile(self, name, inout, site = "condorpool", path = None):
def _addFile(self, name, inout, site="condorpool", path=None):
"""
:param name: Name of the file to add.
:type name: str
@@ -140,78 +134,12 @@ class Workflow(object):
self.files[inout][name] = {"file": File(name), "path": ""}
if inout == "input":
self.files[inout][name]["path"] = path if path else name
self.files[inout][name]["file"].addPFN(PFN("file://" + path.replace(" ","%20"), site))
self.files[inout][name]["file"].addPFN(PFN("file://" + path.replace(" ", "%20"), site))
self.dax.addFile(self.files[inout][name]["file"])
# elif inout == "output":
# self.map.write("%s file://%s pool=\"local\"\n" % (name, self.basepath + "/output/" + name))
return
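The space-only escaping in path.replace(" ", "%20") leaves other URL-special characters (such as '#') intact. If paths can contain them, urllib.quote is the general tool; a hedged alternative for building the PFN:

import urllib

path = "/work/my dir/sample #1.bed"       # illustrative path
pfn_url = "file://" + urllib.quote(path)  # '/' stays, ' ' and '#' are escaped
# -> file:///work/my%20dir/sample%20%231.bed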
def _addJob(self, jobname, executable, inputs, outputs, arguments, dependencies = None, dax = None, label = None, walltime = None):
"""
:param jobname: Name of the job to add.
:type jobname: str
:param executable: Executable to run.
:type executable: str
:param inputs: Information for all necessary inputs. See formatting information below.
:type inputs: dict
:param outputs: Information for all necessary outputs. See formatting information below.
:type outputs: dict
:param arguments: Dictionary of arguments to add.
:type arguments: dict
:param dependencies: List of jobnames to be dependent upon.
:type dependencies: list
Input / Output dicts are formatted the same way:
{
FNAME1: {
"file": PATH,
"transfer": True / False
},
FNAME2: {
"file": PATH,
"transfer": True / False
},
...
}
The keys of the dictionary represent the internal naming of an
individual file within the dax. The path definition should
either be the full path to the input file, or the full name
of the output file (output paths are NOT created at runtime).
The transfer argument simply represents if you want the file to
be transferred to the output folder. As such, it is not required
for input files.
"""
dax = dax if dax else self.dax
if not self._isJob(jobname, dax):
self.jobs[dax][jobname] = Job(executable)
dax.addJob(self.jobs[dax][jobname])
for key in inputs:
self.jobs[dax][jobname].uses(inputs[key]["file"], link = Link.INPUT)
for key in outputs:
self._addFile(os.path.basename(key), "output")
self.jobs[dax][jobname].uses(outputs[key]["file"], link = Link.OUTPUT, transfer = outputs[key]["transfer"])
arglist = []
for arg in arguments:
arglist.append(arg)
if str(arguments[arg]) in inputs:
arglist.append(inputs[str(arguments[arg])]["file"])
elif str(arguments[arg]) in outputs:
arglist.append(outputs[str(arguments[arg])]["file"])
else:
arglist.append(str(arguments[arg]))
self.jobs[dax][jobname].addArguments(*arglist)
if dependencies:
for depend in dependencies:
dax.depends(child = self.jobs[dax][jobname], parent = self.jobs[dax][depend])
if label:
self.jobs[dax][jobname].profile(Namespace.PEGASUS, "label", label)
if walltime:
self.jobs[dax][jobname].profile(Namespace.GLOBUS, "maxwalltime", walltime)
return
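The docstring removed above is the only description of the inputs/outputs dict shape _addJob expected, so for reference, here is a hypothetical call in that shape (job name, files, and arguments are illustrative, not from the project):

from Pegasus.DAX3 import File

inputs = {
    "reads.fastq": {"file": File("reads.fastq"), "transfer": False},
}
outputs = {
    "aligned.sam": {"file": File("aligned.sam"), "transfer": True},
}
# Argument values matching an input/output key get swapped for the File object:
arguments = {"-i": "reads.fastq", "-o": "aligned.sam"}
# workflow._addJob("align_1", workflow.executables["bwa"], inputs, outputs,
#                  arguments, dependencies=["download_1"], walltime=120)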
def _loadNotify(self, config):
self.dax.invoke(When.AT_END, self.basepath + "/input/notify.sh")
with open(self.basepath + "/input/notify.sh", "w") as wh:
@@ -244,7 +172,6 @@ class Workflow(object):
wh.write(pegasusrc)
return
def _createSites(self, config):
"""
Creates the pegasus site catalog. input/sites.xml
@@ -270,7 +197,7 @@ class Workflow(object):
val = ":".join(config["profile"][namespace][key]) if "path" in key.lower() else config["profile"][namespace][key]
sites += """\n\t<profile namespace="%s" key="%s">%s</profile> """ % (namespace, key, val)
sites += "</site></sitecatalog>"
sites = sites.replace("\n","")
sites = sites.replace("\n", "")
wh.write("\n".join([line for line in xml.dom.minidom.parseString(sites).toprettyxml().split('\n') if line.strip()]))
return
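The sites catalog is assembled as one flat string and then round-tripped through xml.dom.minidom purely for indentation, with the comprehension dropping the blank lines toprettyxml() emits. The trick in isolation:

import xml.dom.minidom

raw = '<sitecatalog><site handle="local"><profile namespace="env" key="PATH">/usr/bin</profile></site></sitecatalog>'
pretty = xml.dom.minidom.parseString(raw).toprettyxml()
# toprettyxml() inserts whitespace-only lines between elements; strip them:
print "\n".join(line for line in pretty.split("\n") if line.strip())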
@@ -310,7 +237,6 @@ class Workflow(object):
os.chmod(self.basepath + "/input/submit.sh", 0755)
return
def write(self):
with open(self.basepath + "/input/chipathlon.dax", "w") as wh:
self.dax.writeXML(wh)
......
@@ -11,6 +11,7 @@ import chipathlon
from pprint import pprint
from Pegasus.DAX3 import *
class WorkflowModule(object):
def __init__(self, module_yaml, yaml_jobs):
@@ -48,6 +49,5 @@ class WorkflowModule(object):
# We always have a list!
for item in data:
if item not in self.yaml_jobs:
pass
return
@@ -6,6 +6,7 @@ import chipathlon
import chipathlon.conf
from pprint import pprint
class YamlJob(object):
def __init__(self, base_yaml, param_file):
@@ -68,7 +69,7 @@ class YamlJob(object):
if not arg_info["changeable"] and arg in self.params["arguments"]:
self.err += "Unchangeable argument '%s' specified in params file.\n" % (arg,)
if arg_info["required"] and arg_info["changeable"] and "default" not in arg_info:
if not arg in self.params["arguments"]:
if arg not in self.params["arguments"]:
self.err += "Required argument '%s' does not have a default, and is not defined in params file.\n" % (arg,)
if self.params["arguments"] is not None:
for arg in self.params["arguments"]:
@@ -80,7 +81,7 @@ class YamlJob(object):
valid_files = True
for ftype, flist in zip(["inputs", "additional_inputs", "outputs"], [input_files, additional_inputs, output_files]):
if (len(flist) == len(self.base[self.jobname][ftype])):
for i,f in enumerate(flist):
for i, f in enumerate(flist):
if not f["name"].split(".")[1] in chipathlon.conf.file_extensions[self.base[self.jobname][ftype][i]]:
self.err += "File '%s' is not of type '%s'. Should match one of '%s' extensions.\n" % (f, self.base[self.jobname][ftype][i], chipathlon.conf.file_extensions[self.base[self.jobname]["inputs"][i]])
valid_files = False
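Note that f["name"].split(".")[1] looks at the token after the first dot, so a name like sample.sorted.bed would be validated as "sorted" rather than "bed". If multi-dot names can occur, the last token is probably the intended extension; a hedged helper (assuming chipathlon.conf.file_extensions lists bare suffixes like "bed"):

def extension(name):
    # Last dotted token: "sample.sorted.bed" -> "bed"
    return name.rsplit(".", 1)[-1]

print extension("sample.sorted.bed")  # bed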
@@ -104,7 +105,7 @@ class YamlJob(object):
for f in input_files + additional_inputs:
job.uses(f["file"], link=Link.INPUT)
for f in output_files:
job.uses(f["file"], link=Link.OUTPUT, transfer = True)
job.uses(f["file"], link=Link.OUTPUT, transfer=True)
job.addArguments(*self._create_arg_list(input_files, output_files))
else:
print self.err
......
@@ -3,7 +3,7 @@ import argparse
from pprint import pprint
import gridfs
parser = argparse.ArgumentParser(description = "Perform a join between the experiment and sample collections.")
parser = argparse.ArgumentParser(description="Perform a join between the experiment and sample collections.")
parser.add_argument("--password", dest="password", required=True, help="Database user password.")
parser.add_argument("--username", dest="username", default="aknecht", required=True, help="Database user.")
parser.add_argument("--host", dest="host", default="hcc-anvil-241-41.unl.edu", required=True, help="Database host.")
@@ -20,7 +20,7 @@ gfs = gridfs.GridFS(db)
# Put a file
with open("join_example.py", "r") as rh:
gfs.put(rh, filename = "foo", attribute1 = "asdf", attribute2 = "asdf")
gfs.put(rh, filename="foo", attribute1="asdf", attribute2="asdf")
# Find files
cursor = gfs.find({
......
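gfs.put() stores arbitrary keyword attributes alongside the file, and gfs.find() filters on them like an ordinary collection query. A self-contained sketch of reading the file back (connection details are placeholders):

import gridfs
from pymongo import MongoClient

db = MongoClient("localhost").chipseq   # placeholder host and database
gfs = gridfs.GridFS(db)
for grid_out in gfs.find({"filename": "foo", "attribute1": "asdf"}):
    print grid_out.read()               # streams the stored contents back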
@@ -2,7 +2,7 @@ from pymongo import MongoClient
import argparse
from pprint import pprint
parser = argparse.ArgumentParser(description = "Perform a join between the experiment and sample collections.")
parser = argparse.ArgumentParser(description="Perform a join between the experiment and sample collections.")
parser.add_argument("--password", dest="password", required=True, help="Database user password.")
parser.add_argument("--username", dest="username", default="aknecht", required=True, help="Database user.")
parser.add_argument("--host", dest="host", default="hcc-anvil-241-41.unl.edu", required=True, help="Database host.")
@@ -24,7 +24,7 @@ cursor = db.experiments.aggregate([
"as": "samples"
}
}
## Potentially more aggregation steps here.
# Potentially more aggregation steps here.
])
for document in cursor:
......
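For reference, the complete shape of a $lookup stage joining samples onto experiments; the localField/foreignField values here are guesses based on the experiment_id/uuid pairing used in the import script below:

# Hypothetical pipeline; field names are assumptions.
pipeline = [
    {
        "$lookup": {
            "from": "samples",
            "localField": "uuid",
            "foreignField": "experiment_id",
            "as": "samples"
        }
    }
    # Potentially more aggregation steps here.
]
# cursor = db.experiments.aggregate(pipeline)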
@@ -8,12 +8,12 @@ import requests
import urlparse
import os.path
parser = argparse.ArgumentParser(description = "Download raw JSON for all experiments.")
parser.add_argument("-o","--output-dir",dest="outputdir",default=os.getcwd(),help="Output directory. (default: %(default)s)")
parser = argparse.ArgumentParser(description="Download raw JSON for all experiments.")
parser.add_argument("-o", "--output-dir", dest="outputdir", default=os.getcwd(), help="Output directory. (default: %(default)s)")
args = parser.parse_args()
encode_baseurl = "https://www.encodeproject.org/experiments/"
json_arg = {'format':'json'}
json_arg = {'format': 'json'}
# Use the all-experiments file to get the experiment IDs
with open("data/meta_clean.json", "r") as rh:
@@ -21,17 +21,17 @@ with open("data/meta_clean.json", "r") as rh:
exp_ids = []
for exp in data:
exp_ids.append(exp['accession'])
exp_ids.append(exp['accession'])
# Loop through the IDs and use the exp ID to download the full JSON file.
total = len(exp_ids)
for i,exp_id in enumerate(exp_ids):
exp_url = urlparse.urljoin(encode_baseurl,exp_id)
r = requests.get(exp_url,params=json_arg)
fh = open(os.path.join(args.outputdir,exp_id+'.json'),'w')
fh.write(r.text)
fh.close()
for i, exp_id in enumerate(exp_ids):
exp_url = urlparse.urljoin(encode_baseurl, exp_id)
r = requests.get(exp_url, params=json_arg)
fh = open(os.path.join(args.outputdir, exp_id+'.json'), 'w')
fh.write(r.text)
fh.close()
progress(i,total)
progress(total,total)
progress(i, total)
progress(total, total)
print
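The reindented loop still opens the output file by hand; a with block guarantees the handle closes even if the write raises, and io.open with an explicit encoding avoids UnicodeEncodeError, since requests returns unicode text on Python 2. A hedged variant of the loop body (names taken from the surrounding script):

import io

r = requests.get(exp_url, params=json_arg)
with io.open(os.path.join(args.outputdir, exp_id + ".json"), "w", encoding="utf-8") as fh:
    fh.write(r.text)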
@@ -8,12 +8,12 @@ from chipathlon.utils import progress
import os
import os.path
parser = argparse.ArgumentParser(description = "Read per-experiment JSON files and create experiment and samples collections.")
parser.add_argument("-p","--password", dest="password", required=True, help="Database user password.")
parser.add_argument("-u","--username", dest="username", default="aknecht", required=True, help="Database user. (default: %(default)s)")
parser.add_argument("-H","--host", dest="host", default="hcc-anvil-175-9.unl.edu", help="Database host. (default: %(default)s)")
parser.add_argument("-i","--input-dir",dest="inputdir",default=os.getcwd(),help="Directory containing per-experiment JSON files. (default: %(default)s)")
parser.add_argument("-d","--drop", dest="drop", default=False, action="store_true", help="Drop data if it exists. (default: %(default)s)")
parser = argparse.ArgumentParser(description="Read per-experiment JSON files and create experiment and samples collections.")
parser.add_argument("-p", "--password", dest="password", required=True, help="Database user password.")
parser.add_argument("-u", "--username", dest="username", default="aknecht", required=True, help="Database user. (default: %(default)s)")
parser.add_argument("-H", "--host", dest="host", default="hcc-anvil-175-9.unl.edu", help="Database host. (default: %(default)s)")
parser.add_argument("-i", "--input-dir", dest="inputdir", default=os.getcwd(), help="Directory containing per-experiment JSON files. (default: %(default)s)")
parser.add_argument("-d", "--drop", dest="drop", default=False, action="store_true", help="Drop data if it exists. (default: %(default)s)")
args = parser.parse_args()
@@ -22,25 +22,25 @@ db = client.chipseq
db.authenticate(args.username, args.password, mechanism="SCRAM-SHA-1")
if args.drop:
db.experiments.drop()
db.samples.drop()
db.experiments.drop()
db.samples.drop()
# Insert all experiment JSON files into DB
json_filelist = os.listdir(args.inputdir)
for i,json_file in enumerate(json_filelist):
exp_file_contents = open(os.path.join(args.inputdir,json_file)).read()
# '$' is a reserved character (operator) in MongoDB. Some experiments have keys that start
# with $, which breaks the insert. Replace with _$ everywhere as a quick-and-dirty workaround.
exp_file_contents = exp_file_contents.replace('$','_$')
exp_json_data = json.loads(exp_file_contents)
db.experiments.insert_one(exp_json_data)
progress(i,len(json_filelist))
for i, json_file in enumerate(json_filelist):
exp_file_contents = open(os.path.join(args.inputdir, json_file)).read()
# '$' is a reserved character (operator) in MongoDB. Some experiments have keys that start
# with $, which breaks the insert. Replace with _$ everywhere as a quick-and-dirty workaround.
exp_file_contents = exp_file_contents.replace('$', '_$')
exp_json_data = json.loads(exp_file_contents)
db.experiments.insert_one(exp_json_data)
progress(i, len(json_filelist))
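The string-level replace('$', '_$') also rewrites dollar signs that happen to appear inside values, not just in keys. If that distinction matters, renaming only the offending keys is more surgical; a hedged sketch:

def sanitize_keys(obj):
    # Rename only keys starting with '$'; leave values untouched.
    if isinstance(obj, dict):
        return dict(
            (("_" + k) if k.startswith("$") else k, sanitize_keys(v))
            for k, v in obj.items()
        )
    if isinstance(obj, list):
        return [sanitize_keys(v) for v in obj]
    return obj

# exp_json_data = sanitize_keys(json.loads(exp_file_contents))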
organism = {
"mm": "mouse",
"hg": "human",
"ce": "celegans"
"mm": "mouse",
"hg": "human",
"ce": "celegans"
}
# Find the experiments we want to create the samples collection.
@@ -49,11 +49,11 @@ cursor = db.experiments.find({
"revoked_files.0": {"$exists": False},
"assembly.0": {"$exists": True},
"assembly.1": {"$exists": False}
},no_cursor_timeout=True)
}, no_cursor_timeout=True)
total = cursor.count()
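The paired $exists checks on positional array paths encode "exactly one assembly and no revoked files": assembly.0 must exist while assembly.1 must not. The idiom in isolation:

# Matches documents whose "assembly" array has exactly one element:
query = {
    "assembly.0": {"$exists": True},   # at least one entry
    "assembly.1": {"$exists": False},  # but not a second
}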
# Create the samples collection using the 'files' value from each experiment
for i,document in enumerate(cursor):
for i, document in enumerate(cursor):
for f in document["files"]:
doc = {}
doc["experiment_id"] = document["uuid"]
@@ -68,9 +68,9 @@ for i,document in enumerate(cursor):
doc["transcription_factor"] = document["target"]["@id"].split("/")[2].split("-")[0]
doc["cell_type"] = document["biosample_term_name"]
doc["filename"] = os.path.split(f['href'])[-1]
doc["url"] = os.path.join("encodeproject.org","files",f["accession"],"@@download")
doc["url"] = os.path.join("encodeproject.org", "files", f["accession"], "@@download")
result = db.samples.insert_one(doc)
db.samples.update_one({'_id': result.inserted_id},{"$set": f})
db.samples.update_one({'_id': result.inserted_id}, {"$set": f})
progress(i, total)
progress(total, total)
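os.path.join happens to produce the right string on POSIX, but the value being built here is a URL, not a filesystem path, and the same call would use backslashes on Windows. Joining with "/" explicitly says what is meant; a hedged equivalent that keeps the scheme-less convention the downloader expects:

doc["url"] = "/".join(["encodeproject.org", "files", f["accession"], "@@download"])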
......