Commit 15dea386 authored by aknecht2's avatar aknecht2

Updated workflow_job validation. Added rawfile type. Added gem wrapper script, yaml, and chromosome conversion script.
parent 10f026fd
......@@ -51,6 +51,8 @@ file_extensions = {
"xls": ["xls"],
"yaml": ["yaml"],
"result": ["bed", "narrowPeak", "broadPeak", "tagAlign"],
"txt": ["txt"],
"chrom_sizes": ["sizes"]
}
# list of resources that can be specified per job (step) in
......@@ -76,18 +78,33 @@ resources = {
# Defines the types of input / output arguments
# argument -> Any non file argument
# string -> Any string argument
# numeric -> Numeric arguments.
# file -> Any file argument that isn't redirected
# file -> Normal file arguments.
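# rawfile -> A file argument whose value is a literal path on disk, registered with the workflow as a raw file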
# stdout -> Any file argument that is redirected from stdout
# stderr -> Any file argument that is redirected from stderr
argument_types = {
"argument": ["argument"],
"file": ["file", "stdout", "stderr"]
"argument": ["string", "numeric"],
"file": ["file", "rawfile", "stdout", "stderr"]
}
# Defines information about arguments
argument_keys = {
"required": ["type", "changeable", "has_value"],
"optional": ["required", "default"]
}
# workflow_job keys
job_keys = {
"required": ["inputs", "additional_inputs", "outputs", "command", "arguments"] + resources.keys(),
"optional": []
}
# param keys
param_keys = {
"required": ["arguments"],
"optional": resources.keys()
"required": [],
"optional": ["arguments"] + resources.keys()
}
# workflow order
......
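A quick sketch of how the updated type scheme reads (the helper below is illustrative, not part of the commit; the params entry is a hypothetical example): argument_types groups concrete types into two categories, and param_keys now makes every params-file key optional.

import chipathlon.conf

def classify_arg_type(arg_type):
    # Return the category ("argument" or "file") a declared type
    # belongs to, or None for an unknown type.
    for category, subtypes in chipathlon.conf.argument_types.iteritems():
        if arg_type in subtypes:
            return category
    return None

print classify_arg_type("rawfile")  # file
print classify_arg_type("numeric")  # argument

# Hypothetical params entry: with param_keys["required"] now empty,
# both "arguments" and any resource key may be omitted or overridden.
params = {
    "gem_callpeak": {
        "arguments": {"--k_min": 8, "--q": 2},
        "walltime": 1440,
    }
}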
......@@ -33,3 +33,11 @@ remove_duplicates:
outputs:
- no_dups.bed:
type: stdout
- chr_locus_convert.yaml:
inputs:
- no_dups.bed:
type: file
additional_inputs: null
outputs:
- no_dups_chr.bed:
type: file
db_save_result:
inputs:
- bed:
type: file
additional_inputs: null
outputs:
- bed:
type: file
command: db_save_result.py
arguments:
- "-b":
changeable: false
required: true
has_value: true
default: $inputs.0
- "-o":
changeable: false
required: true
has_value: true
default: $outputs.0
walltime: 120
memory: 4000
cores: 1
nodes: 1
gem_callpeak:
inputs:
- txt:
type: rawfile
- chrom_sizes:
type: file
- bed:
type: file
- bed:
type: file
- genome:
type: argument
- prefix:
type: argument
additional_inputs: null
outputs:
- peak:
type: file
command: gem
arguments:
- "--d":
type: file
changeable: true
required: true
has_value: true
default: $inputs.0
- "--g":
type: file
changeable: false
required: true
has_value: true
default: $inputs.1
- "--expt":
type: file
changeable: false
required: true
has_value: true
default: $inputs.2
- "--ctrl":
type: file
changeable: false
required: true
has_value: true
        default: $inputs.3
- "--genome":
        type: argument
changeable: false
required: true
has_value: true
default: $inputs.4
- "--out":
type: argument
changeable: false
required: true
has_value: true
default: $inputs.5
- "--k_min":
type: numeric
changeable: true
required: true
has_value: true
default: 6
- "--k_max":
type: numeric
changeable: true
required: false
has_value: true
default: 13
- "--q":
type: numeric
changeable: true
required: false
has_value: true
default: 1
- "--t":
type: numeric
changeable: true
required: false
has_value: true
default: 2
- "--outNP":
type: boolean
changeable: false
required: false
has_value: false
walltime: 720
memory: 16000
cores: 2
nodes: 1
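The $inputs.N defaults above are positional references into the job's declared input list. A standalone sketch of the resolution (the helper and sample file names are illustrative; the convention matches the validation code further down):

def resolve_default(value, inputs, additional_inputs, outputs):
    # Resolve "$inputs.2" style references against the supplied lists;
    # any non-reference value passes through unchanged.
    if isinstance(value, basestring) and value.startswith("$"):
        key, index = value[1:].split(".")
        lists = {"inputs": inputs, "additional_inputs": additional_inputs, "outputs": outputs}
        return lists[key][int(index)]
    return value

# For gem_callpeak, inputs are declared in the order
# txt, chrom_sizes, exp bed, control bed, genome, prefix:
inputs = ["Read_Distribution_default.txt", "hg38.sizes", "exp.bed", "control.bed", "hg38", "prefix"]
print resolve_default("$inputs.2", inputs, [], [])  # exp.bed (the --expt default)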
import pymysql
import argparse
import os
parser = argparse.ArgumentParser(description="Replace chromosome locus with number.")
parser.add_argument("-b", "--bed", dest="bed", required=True, help="Input bed file.")
parser.add_argument("-o", "--out", dest="out", required=True, help="Output file.")
parser.add_argument("-c", "--c", dest="chr", default=False, action="store_true", help="If specified convert from name to chr number, otherwise convert from chr number to name.")
parser.add_argument("-d", "--db", dest="db", default="hg38", help="Database to load conversion from.")
parser.add_argument("-s", "--server", dest="server", default="genome-mysql.cse.ucsc.edu", help="Location of mysql server.")
parser.add_argument("-u", "--user", dest="user", default="genome", help="Username for db login.")
parser.add_argument("-p", "--password", dest="password", default="", help="Password for db login.")
parser.add_argument("-t", "--table", dest="table", default="ucscToINSDC", help="Table to retrieve locus -> number info from.")
args = parser.parse_args()
if os.path.isfile(args.bed):
# Setup connection, retrieve all info
conn = pymysql.connect(host=args.server, user=args.user, password=args.password, db=args.db, cursorclass=pymysql.cursors.DictCursor)
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM %s" % (args.table,))
conn.commit()
results = cursor.fetchall()
# Results are a list of dictionaries ->
# [{"name": "CM000663.2", "chrom": "chr1"}, ...]
# Convert to a single dictionary with comprehension
# depending on directionality of the conversion
chr_map = dict([
(row["name"], row["chrom"]) if args.chr else (row["chrom"], row["name"])
for row in results
])
# Read in the input file, write the output file
with open(args.bed, "r") as rh:
with open(args.out, "w") as wh:
for line in rh.readlines():
# Bed is a 6 column format
bed_data = line.split()
if bed_data[0] in chr_map:
bed_data[0] = chr_map[bed_data[0]]
# Convert back from array to lines
wh.write("\t".join(bed_data) + "\n")
else:
print "Input file %s does not exist." % (args.bed,)
#!/bin/bash
. /util/opt/lmod/lmod/init/profile
module load java gem
gem "$@"
......@@ -124,16 +124,15 @@ class Workflow(object):
def _load_workflow_jobs(self):
# Load each yaml_job as an actual object
for root, dirs, files in os.walk("%s/%s" % (os.path.dirname(os.path.realpath(__file__)), chipathlon.conf.job_params)):
for f in files:
with open(os.path.join(root, f), "r") as rh:
job_info = yaml.load(rh)
ex_name = job_info[job_info.keys()[0]]["command"]
if ex_name in self.executables:
yj = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file, self.executables[ex_name])
if not yj.err:
self.workflow_jobs[yj.jobname] = yj
yaml_job = chipathlon.workflow_job.WorkflowJob(os.path.join(root, f), self.param_file)
if not yaml_job.err:
if yaml_job.command in self.executables:
yaml_job.add_executable(self.executables[yaml_job.command])
for raw_file in yaml_job.raw_files:
self._add_file(raw_file["name"], raw_file["path"], "local")
raw_file["file"] = self.files(raw_file["name"])
else:
self.err += yj.err
self.err += yaml_job.err
elif self.debug:
print "[WARNING] Skipping param file %s, corresponding executable %s not found." % (f,)
break
......@@ -445,19 +444,19 @@ class Workflow(object):
for run in self.run_data["runs"]:
inputs = {}
markers["tool"] = run["peak"]
if run["peak"] == "macs2":
print run["jobs"]
for pair in run["input_set"]:
# Now we use individual file_tuples to extract the correct
# prefix and add in as input_files
# remove_duplicates returns prefix_no_dups.bam
# remove_duplicates returns prefix_no_dups_chr.bed
# REMEMBER: experiment is always first
final_prefix = run["prefix"]["align"][0] + "_" + run["prefix"]["align"][1]
inputs["exp.bed"] = run["prefix"]["align"][0] + "_no_dups.bed"
inputs["control.bed"] = run["prefix"]["align"][1] + "_no_dups.bed"
inputs["exp.bed"] = run["prefix"]["align"][0] + "_no_dups_chr.bed"
inputs["control.bed"] = run["prefix"]["align"][1] + "_no_dups_chr.bed"
run["prefix"]["peak_call"] = self.modules["peak_call"]._get_full_prefix(final_prefix, markers)
if run["peak"] == "macs2":
inputs["prefix"] = run["prefix"]["peak_call"]
elif run["peak"] == "gem":
pass
run["markers"]["peak_call"] = markers
self.modules["peak_call"].add_jobs(self.dax, self.jobs, self.files, final_prefix, markers, inputs, {})
run["jobs"]=list(set(self.modules["peak_call"].get_job_names(markers)) | set(run["jobs"]))
......
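The rename from _no_dups.bed to _no_dups_chr.bed reflects the chr_locus_convert step inserted above. With illustrative align prefixes ("exp1"/"ctl1" are hypothetical), the peak call step now consumes:

run = {"prefix": {"align": ["exp1", "ctl1"]}}
inputs = {
    "exp.bed": run["prefix"]["align"][0] + "_no_dups_chr.bed",
    "control.bed": run["prefix"]["align"][1] + "_no_dups_chr.bed",
}
print inputs["exp.bed"]      # exp1_no_dups_chr.bed
print inputs["control.bed"]  # ctl1_no_dups_chr.bed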
import yaml
import traceback
import os
import sys
from Pegasus.DAX3 import *
import chipathlon
......@@ -10,56 +10,166 @@ from pprint import pprint
class WorkflowJob(object):
def __init__(self, base_yaml, param_file, executable):
def __init__(self, base_yaml, param_file):
"""
:param base_yaml: The yaml job definition file.
:param param_file: The input param file to overwrite job options.
:param executable: A pegasus executable
For more information on the workflow_job formatting,
see doc/diagrams/images/workflow_job.png
"""
# The workflow job yaml is never directly interacted with,
# only through the workflow & workflow module classes.
# As such, errors are communicated through the self.err
# variable.
self.err = ""
self.executable = executable
if os.path.isfile(base_yaml):
if os.path.isfile(param_file):
try:
with open(base_yaml, "r") as rh:
self.base = yaml.load(rh)
self.jobname = self.base.keys()[0]
self.inputs = self.base[self.jobname]["inputs"]
self.outputs = self.base[self.jobname]["outputs"]
except:
self.err += "Error parsing job template yaml file %s.\n" % (base_yaml, )
self.err += traceback.format_exc()
base = yaml.load(rh)
self.job_name = base.keys()[0]
self.job_data = base[self.job_name]
self.command = self.job_data["command"]
except yaml.YAMLError as ye:
self.err += "Error parsing job template yaml file %s.\n" % (base_yaml,)
                    self.err += str(ye)
try:
with open(param_file, "r") as rh:
params = yaml.load(rh)
if self.jobname in params:
self.params = params[self.jobname]
else:
self.err += "Params file does not contain definition for '%s'.\n" % (self.jobname,)
except:
self.err += "Error parsing params yaml file.\n"
self.err += traceback.format_exc()
if self.job_name in params:
self.params = params[self.job_name]
                except yaml.YAMLError as ye:
                    self.err += "Error parsing params yaml file %s.\n" % (param_file,)
                    self.err += str(ye)
self.raw_files = []
self.validate()
else:
self.err += "Error loading param_file %s, file does not exist." % (param_file,)
else:
self.err += "Error loading base yaml %s, file does not exist." % (base_yaml,)
return
def validate(self):
if not self.err:
try:
self._validate_input()
self._validate_base_yaml()
self._validate_params()
self._validate_arguments()
self._validate_resource_params()
except SystemExit as se:
return
return
def add_executable(self, executable):
self.executable = executable
return
def _raise(self):
if self.err:
raise SystemExit(self.err)
return
def _validate_input(self):
for key in chipathlon.conf.param_keys["required"]:
if key not in self.params:
self.err += "Required key '%s' not defined for job '%s'.\n" % (key, self.jobname)
for key in self.params:
if key not in (chipathlon.conf.param_keys["required"] + chipathlon.conf.param_keys["optional"]):
self.err += "Specified key '%s' does not exist for job '%s'.\n" % (key, self.jobname)
def _check_keys(self, d, keys, error_prefix):
"""
d -- Dictionary to check against
keys -- A dictionary containing required & optional keys:
{"required": [a, b, c], "optional": [d, e, f]}
error_prefix -- Prefix added to error messages.
"""
for key in keys["required"]:
if key not in d:
self.err += "[%s] Required key '%s' not defined for job %s." % (error_prefix, key, self.job_name)
for key in d:
if key not in (keys["required"] + keys["optional"]):
self.err += "[%s] Specified key '%s' is not valid for job %s." % (error_prefix, key, self.job_name)
return
def _validate_base_yaml(self):
"""
Ensure that the yaml job definition file meets the required standards.
"""
self._check_keys(self.job_data, chipathlon.conf.job_keys, "Base Yaml Error")
self._raise()
return
def _validate_params(self):
"""
Ensure that the params file meets the required standards.
"""
self._check_keys(self.params, chipathlon.conf.param_keys, "Param Yaml Error")
self._raise()
return
def _get_arg_value(self, arg_name, arg_info):
"""
        Get the argument value. Resolution rules:
          changeable = false, has_value = false -> arg_name (a bare flag)
          changeable = false, has_value = true  -> default
          changeable = true,  set in params     -> value from the params file
          changeable = true,  not in params     -> default
"""
if argument["changeable"]:
if arg_name in self.params["arguments"]:
return self.params["arguments"][arg_name]
else:
return arg_info["default"]
else:
if arg_info["has_value"]:
return arg_info["default"]
else:
return arg_name
return None
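A standalone illustration of the four resolution cases (a re-implementation of the logic above with the params lookup passed in explicitly; the sample argument definitions mirror the gem yaml earlier in this commit):

def get_arg_value(arg_name, arg_info, param_args):
    # Same resolution rules as _get_arg_value above.
    if arg_info["changeable"]:
        return param_args[arg_name] if arg_name in param_args else arg_info["default"]
    return arg_info["default"] if arg_info["has_value"] else arg_name

k_min = {"changeable": True, "has_value": True, "default": 6}
print get_arg_value("--k_min", k_min, {"--k_min": 8})  # 8 (param file wins)
print get_arg_value("--k_min", k_min, {})              # 6 (falls back to default)
out_np = {"changeable": False, "has_value": False}
print get_arg_value("--outNP", out_np, {})             # --outNP (bare flag)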
def _is_valid_arg(self, arg_name, arg_info):
"""
type, changeable, required, has_value, default
changeable implies has_value
changeable, has_value, no default implies in params
not changeable implies required
not changeable, has_value, implies required and default
"""
# First we check our implication rules
if arg_info["changeable"]:
if arg_info["has_value"]:
if arg_info["required"] and arg_name not in self.params["arguments"] and "default" not in arg_info:
return (False, "[Argument '%s' Error] Changeable argument does not have default defined for job %s." % (arg_name, self.job_name))
else:
return (False, "[Argument '%s' Error] Changeable argument does not have has_value=True for job %s." % (arg_name, self.job_name))
elif arg_info["required"]:
if arg_info["has_value"] and "default" not in arg_info:
return (False, "[Argument '%s' Error] No defualt value defined for job %s." % (arg_name, self.job_name))
else:
return (False, "[Argument '%s' Error] Unchangeable non-required argument defined for job %s." % (arg_name, self.job_name))
# If all our implication rules pass, we check the values
return self._has_valid_arg_value(arg_name, arg_info)
def _has_valid_arg_value(self, arg_name, arg_info):
"""
If it's numeric make sure the value is appropriate.
If it's a rawfile make sure the input path exists
If it's a file make sure it references a file list.
If it references a file list make sure it's a valid index.
"""
        arg_value = self._get_arg_value(arg_name, arg_info)
        if arg_info["type"] == "numeric" and not chipathlon.utils.is_number(arg_value):
            return (False, "[Argument '%s' Error] Numeric argument has a non-numeric value for job %s." % (arg_name, self.job_name))
        elif arg_info["type"] in chipathlon.conf.argument_types["file"]:
            if arg_info["type"] == "rawfile":
                if not os.path.isfile(arg_value):
                    return (False, "[Argument '%s' Error] Raw file path '%s' does not exist for job %s." % (arg_name, arg_value, self.job_name))
            elif not any([file_type in arg_value for file_type in ["$inputs.", "$additional_inputs.", "$outputs."]]):
                return (False, "[Argument '%s' Error] File argument has an invalid value for job %s." % (arg_name, self.job_name))
            else:
                key, index = arg_value[1:].split(".")
                if int(index) >= len(self.job_data[key] or []):
                    return (False, "[Argument '%s' Error] File argument references non-existent file index %s for job %s." % (arg_name, index, self.job_name))
        return (True, None)
def _validate_arguments(self):
"""
Here, we loop through each argument in the base_yaml file,
......@@ -70,21 +180,21 @@ class WorkflowJob(object):
'bad' entries
"""
valid_args = []
for argd in self.base[self.jobname]["arguments"]:
arg = argd.keys()[0]
arg_info = argd[arg]
if self.params["arguments"] is not None:
if arg[:1] != "$":
valid_args.append(arg)
if not arg_info["changeable"] and arg in self.params["arguments"]:
self.err += "Unchangeable argument '%s' specified in params file.\n" % (arg,)
if arg_info["required"] and arg_info["changeable"] and "default" not in arg_info:
if arg not in self.params["arguments"]:
self.err += "Required argument '%s' does not have a default, and is not defined in params file.\n" % (arg,)
        for arg_dict in self.job_data["arguments"]:
            arg_name = arg_dict.keys()[0]
            arg_info = arg_dict[arg_name]
            self._check_keys(arg_info, chipathlon.conf.argument_keys, "Argument '%s' Error" % (arg_name,))
            valid_arg, msg = self._is_valid_arg(arg_name, arg_info)
if valid_arg:
valid_args.append(arg_name)
if arg_info["type"] == "rawfile":
arg_value = self._get_arg_value(arg_name, arg_info)
self.raw_files.append({"type": "file", "path": arg_value, "name": os.path.basename(arg_value), "file": ""})
else:
self.err += msg + "\n"
if self.params["arguments"] is not None:
for arg in self.params["arguments"]:
if arg not in valid_args:
self.err += "Argument '%s' specified in params file does not exist.\n" % (arg,)
for arg_name in self.params["arguments"]:
if arg_name not in valid_args:
self.err += "[Param File Error] Argument '%s' specified in params file does not exist.\n" % (arg_name,)
self._raise()
return
......@@ -97,25 +207,33 @@ class WorkflowJob(object):
try:
if not is_number(self.params[resource_type]):
self.err += "Resource specification of type '%s' for '%s' must be numeric.\n" \
% (resource_type, self.jobname)
% (resource_type, self.job_name)
except TypeError:
self.err += "Resource specification of type '%s' for '%s' is missing a value.\n" \
% (resource_type, self.jobname)
% (resource_type, self.job_name)
except KeyError:
pass
self._raise()
return
def _add_raw_files(self, supplied_files):
file_list = []
raw_iter = iter(self.raw_files)
input_iter = iter(supplied_files)
        for arg_dict in self.job_data["arguments"]:
            arg_info = arg_dict[arg_dict.keys()[0]]
            file_list.append(raw_iter.next() if arg_info["type"] == "rawfile" else input_iter.next())
return file_list
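A quick illustration of the interleave performed by _add_raw_files (argument order and file names here are hypothetical): raw files fill the rawfile-typed slots and the supplied files fill the remaining slots, in declaration order:

arg_types = ["rawfile", "file", "file"]  # declared types, in argument order
raw_iter = iter(["Read_Distribution_default.txt"])
input_iter = iter(["exp_no_dups_chr.bed", "control_no_dups_chr.bed"])
merged = [raw_iter.next() if t == "rawfile" else input_iter.next() for t in arg_types]
print merged
# ['Read_Distribution_default.txt', 'exp_no_dups_chr.bed', 'control_no_dups_chr.bed']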
def _check_params(self, inputs, additional_inputs, output_files):
valid_params = True
for param_type, param_list in zip(["inputs", "additional_inputs", "outputs"], [inputs, additional_inputs, output_files]):
if self.base[self.jobname][param_type] is not None:
if (len(param_list) == len(self.base[self.jobname][param_type])):
            if self.job_data[param_type] is not None:
                if (len(param_list) == len(self.job_data[param_type])):
for i, param in enumerate(param_list):
if param["type"] == "file":
# Only extension check against actual files
if not any([param["name"][-len(ext):] == ext if param["type"] == "file" else True for ext in chipathlon.conf.file_extensions[self.base[self.jobname][param_type][i].keys()[0]]]):
self.err += "File '%s' is not of type '%s'. Should match one of '%s' extensions.\n" % (param["name"], self.base[self.jobname][param_type][i].keys()[0], chipathlon.conf.file_extensions[self.base[self.jobname][param_type][i].keys()[0]])
if not any([param["name"][-len(ext):] == ext if param["type"] == "file" else True for ext in chipathlon.conf.file_extensions[self.base[self.job_name][param_type][i].keys()[0]]]):
self.err += "File '%s' is not of type '%s'. Should match one of '%s' extensions.\n" % (param["name"], self.base[self.job_name][param_type][i].keys()[0], chipathlon.conf.file_extensions[self.base[self.job_name][param_type][i].keys()[0]])
valid_params = False
else:
self.err += "Number of file_names '%s' must match number of expected %s files.\n" % (len(param_list), param_type[:-1])
......@@ -128,7 +246,7 @@ class WorkflowJob(object):
the database.
"""
arguments = {}
for arg_dict in self.base[self.jobname]["arguments"]:
        for arg_dict in self.job_data["arguments"]:
# root key is actual argument name
arg = arg_dict.keys()[0]
arg_info = arg_dict[arg]
......@@ -149,6 +267,7 @@ class WorkflowJob(object):
- output_files should be a list of dictionaries:
[{"name": FILE_NAME, "type": "file", "file": FILE_OBJECT},...]
"""
inputs = self._add_raw_files(inputs)
if self._check_params(inputs, additional_inputs, output_files):
job = Job(self.executable)
for param in inputs + additional_inputs:
......@@ -177,7 +296,7 @@ class WorkflowJob(object):
try:
resource_value = self.params[resource_type]
except KeyError:
resource_value = self.base[self.jobname][resource_type]
resource_value = self.base[self.job_name][resource_type]
ns = chipathlon.conf.resources[resource_type]["namespace"]
key = chipathlon.conf.resources[resource_type]["key"]
job.profile(ns, key, resource_value)
......@@ -201,43 +320,27 @@ class WorkflowJob(object):
}
arg_list = []
# Loop through each argument, which are stored as a list of dicts
for arg_dict in self.base[self.jobname]["arguments"]:
        for arg_dict in self.job_data["arguments"]:
# root key is actual argument name
arg = arg_dict.keys()[0]
arg_name = arg_dict.keys()[0]
            arg_info = arg_dict[arg_name]
# If argument is changeable, prioritze argument from param file
if arg_info["changeable"] and arg_info["has_value"]: