Commit 33b306b1 authored by aknecht2's avatar aknecht2
Browse files

Job argument creation is now actually legible AND works with the new style of...

Job argument creation is now actually legible AND works with the new style of inputs/outputs as well as job definition files.
parent edbc4f8d
...@@ -65,6 +65,11 @@ executables = [ ...@@ -65,6 +65,11 @@ executables = [
"chip-job-zcat-peak" "chip-job-zcat-peak"
] ]
# Java needs to have -Xmx specified...
java_tools = [
"gem"
]
# Peak_type validation # Peak_type validation
peak_types = { peak_types = {
"spp": ["narrow", "broad"], "spp": ["narrow", "broad"],
...@@ -136,7 +141,7 @@ resources = { ...@@ -136,7 +141,7 @@ resources = {
argument_types = { argument_types = {
"argument": ["string", "numeric"], "argument": ["string", "numeric"],
"file": ["file", "rawfile", "stdout", "stderr"], "file": ["file", "rawfile", "stdout", "stderr"],
"list": ["list"], "list": ["file_list", "argument_list"],
"folder": ["folder", "rawfolder"] "folder": ["folder", "rawfolder"]
} }
......
...@@ -19,7 +19,7 @@ download_from_encode: ...@@ -19,7 +19,7 @@ download_from_encode:
- "-m": - "-m":
type: string type: string
changeable: false changeable: false
required: true required: false
has_value: true has_value: true
default: $md5 default: $md5
- "-p": - "-p":
......
import unittest import unittest
from chipathlon.workflow_job import WorkflowJob from chipathlon.workflow_job import WorkflowJob
from Pegasus.DAX3 import *
import yaml import yaml
import os import os
...@@ -73,6 +74,96 @@ class WorkflowJobTests(unittest.TestCase): ...@@ -73,6 +74,96 @@ class WorkflowJobTests(unittest.TestCase):
self.assertTrue(job.is_valid(), msg=job.get_error_string()) self.assertTrue(job.is_valid(), msg=job.get_error_string())
return return
def test_create_job_missing_inputs(self):
job = WorkflowJob(
os.path.join(self.base_path, "download_from_encode.yaml"),
{},
debug=True
)
executable = Executable(name="ls", os="linux", arch="x86_64", installed=True)
executable.addPFN(PFN(os.path.join("file://", "/bin/ls"), "local"))
job.add_executable(executable)
pegasus_job = job.create_job(
{
"md5": {
"name": "md5",
"type": "string",
"value": "asdfasdf"
}
},
{
"downloaded_file": {
"name": "full-name.fastq",
"type": "file",
"transfer": True,
"file": File("full-name.fastq")
}
}
)
self.assertEqual(pegasus_job, None, msg=job.get_error_string())
return
def test_create_job_missing_outputs(self):
job = WorkflowJob(
os.path.join(self.base_path, "download_from_encode.yaml"),
{},
debug=True
)
executable = Executable(name="ls", os="linux", arch="x86_64", installed=True)
executable.addPFN(PFN(os.path.join("file://", "/bin/ls"), "local"))
job.add_executable(executable)
pegasus_job = job.create_job(
{
"url": {
"name": "url",
"type": "string",
"value": "some.encode.url.org/asdf"
},
"md5": {
"name": "md5",
"type": "string",
"value": "asdfasdf"
}
},
{}
)
self.assertEqual(pegasus_job, None, msg=job.get_error_string())
return
def test_create_job_valid(self):
job = WorkflowJob(
os.path.join(self.base_path, "download_from_encode.yaml"),
{},
debug=True
)
executable = Executable(name="ls", os="linux", arch="x86_64", installed=True)
executable.addPFN(PFN(os.path.join("file://", "/bin/ls"), "local"))
job.add_executable(executable)
pegasus_job = job.create_job(
{
"url": {
"name": "url",
"type": "string",
"value": "some.encode.url.org/asdf"
},
"md5": {
"name": "md5",
"type": "string",
"value": "asdfasdf"
}
},
{
"downloaded_file": {
"name": "full-name.fastq",
"type": "file",
"transfer": True,
"file": File("full-name.fastq")
}
}
)
self.assertEqual(pegasus_job, None, msg=job.get_error_string())
return
if __name__ == "__main__": if __name__ == "__main__":
unittest.TextTestRunner(verbosity=2).run( unittest.TextTestRunner(verbosity=2).run(
unittest.TestLoader().loadTestsFromTestCase(WorkflowJobTests) unittest.TestLoader().loadTestsFromTestCase(WorkflowJobTests)
......
...@@ -204,8 +204,8 @@ class WorkflowJob(object): ...@@ -204,8 +204,8 @@ class WorkflowJob(object):
Checks if an argument is valid. Argument descriptors have the following Checks if an argument is valid. Argument descriptors have the following
implication rules: implication rules:
changeable & has_value & no default => must be in params changeable & has_value & no default => must be in params
not changeable => required
not changeable & has_value => required and default not changeable & has_value => required and default
not changeable & not_required => Optional argument, but not passed in from params
""" """
# First we check our implication rules # First we check our implication rules
if arg_info["changeable"]: if arg_info["changeable"]:
...@@ -214,9 +214,6 @@ class WorkflowJob(object): ...@@ -214,9 +214,6 @@ class WorkflowJob(object):
elif arg_info["required"]: elif arg_info["required"]:
if arg_info["has_value"] and "default" not in arg_info: if arg_info["has_value"] and "default" not in arg_info:
return (False, "[Error parsing %s job file] Argument '%s' is unchangeable and required but has no default." % (self.job_name, arg_name)) return (False, "[Error parsing %s job file] Argument '%s' is unchangeable and required but has no default." % (self.job_name, arg_name))
else:
return (False, "[Error parsing %s job file] Argument '%s' is not required but is unchangeable. \
Either set argument to required, or allow it to be changeable." % (self.job_name, arg_name))
# If all our implication rules pass, we check the values # If all our implication rules pass, we check the values
return self._has_valid_arg_value(arg_name, arg_info) return self._has_valid_arg_value(arg_name, arg_info)
...@@ -356,42 +353,43 @@ class WorkflowJob(object): ...@@ -356,42 +353,43 @@ class WorkflowJob(object):
passed in should match the job definition, and files should have passed in should match the job definition, and files should have
correctly matching extension. correctly matching extension.
""" """
for param_type, param_dict in zip(["inputs", "outputs"], [inputs, outputs]): for param_type, passed_param_dict in zip(["inputs", "outputs"], [inputs, outputs]):
# In theory there could be a job that doesn't need inputs... # In theory there could be a job that doesn't need inputs...
if self.job_data[param_type] is not None: if self.job_data[param_type] is not None:
# Loop through all required params to make sure they are provided # Loop through all required params to make sure they are provided
for param_name, param_info in self.job_data[param_type].iteritems(): for param_name, base_param_info in self.job_data[param_type].iteritems():
if self._is_param_required(param_name): if self._is_param_required(param_name):
if param_name in param_dict: if param_name in passed_param_dict:
if param_info["type"] in chipathlon.conf.argument_types["file"]: if base_param_info["type"] in chipathlon.conf.argument_types["file"]:
# We need to check the FULL file name of the passed in file to # We need to check the FULL file name of the passed in file to
# validate the extensions correctly. # validate the extensions correctly.
if not param_info["name"].endswith(tuple(chipathlon.conf.file_extensions[param_info["file_type"]])): if not passed_param_dict[param_name]["name"].endswith(tuple(chipathlon.conf.file_extensions[base_param_info["file_type"]])):
self.errors.append("Job creation error %s: File %s specified for param %s is not of \ self.errors.append("Job creation error %s: File %s specified for param %s is not of \
type %s. Should match one of these extensions: %s." % ( type %s. Should match one of these extensions: %s." % (
self, self,
param_info["name"], passed_param_dict[param_name]["name"],
param_name, param_name,
param_info["file_type"], base_param_info["file_type"],
chipathlon.conf.file_extensions[param_info["file_type"]] chipathlon.conf.file_extensions[base_param_info["file_type"]]
) )
) )
else: else:
self.errors.append("Job creation error %s: %s is a required %s paramater, but was not passed in." \ self.errors.append("Job creation error %s: %s is a required %s paramater, but was not passed in." \
% (self, param_name, param_type)) % (self, param_name, param_type))
# Now we loop through all passed in params to make sure they are valid # Now we loop through all passed in params to make sure they are valid
for param_name, param_info in param_dict.iteritems(): for param_name, passed_param_info in passed_param_dict.iteritems():
if param_name in self.job_data[param_type]: if param_name in self.job_data[param_type]:
base_param_info = self.job_data[param_type][param_name]
if self.job_data[param_type][param_name]["type"] in chipathlon.conf.argument_types["file"]: if self.job_data[param_type][param_name]["type"] in chipathlon.conf.argument_types["file"]:
# We need to check the FULL file name of the passed in file to # We need to check the FULL file name of the passed in file to
# validate the extensions correctly. # validate the extensions correctly.
if not param_info["name"].endswith(tuple(chipathlon.conf.file_extensions[self.job_data[param_type][param_name]["file_type"]])): if not passed_param_info["name"].endswith(tuple(chipathlon.conf.file_extensions[base_param_info["file_type"]])):
self.errors.append("Job creation error %s: File %s specified for param %s is not of \ self.errors.append("Job creation error %s: File %s specified for param %s is not of \
type %s. Should match one of these extensions: %s." % ( type %s. Should match one of these extensions: %s." % (
self, self,
param, param,
param_name, param_name,
param_info["file_type"], base_param_info["file_type"],
chipathlon.conf.file_extensions[param_info["file_type"]] chipathlon.conf.file_extensions[param_info["file_type"]]
) )
) )
...@@ -426,6 +424,12 @@ class WorkflowJob(object): ...@@ -426,6 +424,12 @@ class WorkflowJob(object):
"type": "argument", "type": "argument",
"value": argument-value "value": argument-value
}, },
"param-name": {
"name": "full-file-name",
"type": "file_list",
"value": [
]
... ...
} }
...@@ -443,19 +447,21 @@ class WorkflowJob(object): ...@@ -443,19 +447,21 @@ class WorkflowJob(object):
""" """
if self._params_are_valid(inputs,outputs): if self._params_are_valid(inputs,outputs):
job = Job(self.executable) job = Job(self.executable)
for param in inputs: for param_name, param_info in inputs.iteritems():
if param["type"] == "file": if param_info["type"] == "file_list":
job.uses(param["file"], link=Link.INPUT) for f in param_info["value"]:
elif param["type"] == "list":
for f in param["value"]:
job.uses(f["file"], link=Link.INPUT) job.uses(f["file"], link=Link.INPUT)
for output_file in outputs: elif param_info["type"] in chipathlon.conf.argument_types["file"]:
job.uses(output_file["file"], link=Link.OUTPUT, transfer=output_file["transfer"]) job.uses(param_info["file"], link=Link.INPUT)
for output_name, output_info in outputs.iteritems():
job.uses(output_info["file"], link=Link.OUTPUT, transfer=output_info["transfer"])
# Redirect stdout / stderr # Redirect stdout / stderr
if output_file["type"] == "stdout": if output_info["type"] == "stdout":
job.setStdout(output_file["name"]) job.setStdout(output_file["name"])
elif output_file["type"] == "stderr": elif output_info["type"] == "stderr":
job.setStderr(output_file["name"]) job.setStderr(output_file["name"])
arg_list = self._create_arg_list(inputs, outputs)
print "Created arg_list: %s" % (arg_list,)
job.addArguments(*self._create_arg_list(inputs, outputs)) job.addArguments(*self._create_arg_list(inputs, outputs))
self._add_job_resources(job) self._add_job_resources(job)
return job return job
...@@ -482,77 +488,78 @@ class WorkflowJob(object): ...@@ -482,77 +488,78 @@ class WorkflowJob(object):
job.profile(ns, key, resource_value) job.profile(ns, key, resource_value)
return return
def _interpolate_value(self, inputs, outputs, arg_name, arg_info):
"""
:param inputs: Input params for the job.
:type inputs: dict
:param outputs: Output params for the job.
:type outputs: dict
:param arg_name: The name of the arg to convert.
:type arg_name: str
:param arg_info: Argument info from the yaml.
:type arg_info: dict
This helper method converts an argument into its proper value, meaning
it can convert a reference string like "$target_file" into an actual
Pegasus File instance, or interpolate the proper arguments that are
passed in.
"""
arg_value = self._get_arg_value(arg_name, arg_info)
add_value = arg_value
if arg_value.startswith("$"):
arg_value = arg_value[1:]
if arg_info["type"] == "rawfile":
add_value = self.raw_files[os.path.basename(arg_value)]["file"]
elif arg_info["type"] == "file_list":
# Lists can only be loaded from inputs
add_value = []
for file_dict in inputs[arg_value]["values"]:
add_value.append(file_dict["file"])
elif arg_info["type"] in chipathlon.conf.argument_types["file"]:
add_value = (inputs if arg_value in inputs else outputs)[arg_value]["file"]
else:
add_value = inputs[arg_value]["value"]
return add_value
def _create_arg_list(self, inputs, outputs): def _create_arg_list(self, inputs, outputs):
""" """
:param inputs: Input params for the job. :param inputs: Input params for the job.
:type inputs: list :type inputs: dict
:param outputs: Output params for the job. :param outputs: Output params for the job.
:type outputs: list :type outputs: dict
- inputs should be a list of dictionaries: Inputs / outputs should be dictionaries like the ones described in
[{"name": FILE_NAME, "type": "file", "file": FILE_OBJECT}, the create_job documentation.
{"name": ARGUMENT_NAME, "type": "argument", "value": ARGUMENT_VALUE},..]
- outputs should be a list of dictionaries:
[{"name": FILE_NAME, "type": "file", "file": FILE_OBJECT},...]
This function creates the necessary argument list for pegasus. This function creates the necessary argument list for pegasus.
This includes all arguments the executable needs to run, This includes all arguments the executable needs to run,
and the correct file objects passed in the correct order. and the correct file objects passed in the correct order.
To add arguments to the pegasus job use: job.addArguments(*arg_list) To add arguments to the pegasus job use: job.addArguments(*arg_list)
""" """
argument_map = {
"$inputs": inputs,
"$outputs": outputs
}
arg_list = [] arg_list = []
if self.executable == "gem": # Java tools need to have -Xmx specified...
if self.executable in chipathlon.conf.java_tools:
arg_list.append("-Xmx%sM" % (self.params.get("memory", self.job_data["memory"]))) arg_list.append("-Xmx%sM" % (self.params.get("memory", self.job_data["memory"])))
has_args = re.compile("\$((in)|(out))puts\.[0-9]") # Go through each argument
# Loop through each argument, which are stored as a list of dicts
for arg_dict in self.job_data["arguments"]: for arg_dict in self.job_data["arguments"]:
# root key is actual argument name
arg_name = arg_dict.keys()[0] arg_name = arg_dict.keys()[0]
arg_info = arg_dict[arg_name] arg_info = arg_dict[arg_name]
arg_value = self._get_arg_value(arg_name, arg_info) # Need to figure out 2 things:
add_name = (arg_name != arg_value) # 1. Should we add the argument name?
if arg_info["type"] == "rawfile": # 2. What's the correct value to add?
add_value = self.raw_files[os.path.basename(arg_value)]["file"] add_value = self._interpolate_value(inputs, outputs, arg_name, arg_info)
if not arg_info["has_value"]: # Now we actually add the argument
add_name = False if arg_info["has_value"]:
elif isinstance(arg_value, list): # If it's a file we want to add the actual Pegasus.File instance
add_value = []
for i, fname in enumerate(arg_value):
if "$inputs" in fname:
index = int(fname.split(".")[1])
add_value.append(argument_map["$inputs"][index]["file"])
else:
add_value.append(fname)
elif isinstance(arg_value, str) and has_args.search(arg_value) is not None:
search_group = has_args.search(arg_value).group(0)
arg_type, index = search_group.split(".")
index = int(index)
if arg_type == "$inputs":
if argument_map[arg_type][index]["type"] == "file":
add_value = argument_map[arg_type][index]["file"]
elif argument_map[arg_type][index]["type"] in chipathlon.conf.argument_types["argument"]:
add_value = arg_value
for i in range(0, len(argument_map[arg_type])):
if argument_map["$inputs"][i]["type"] in chipathlon.conf.argument_types["argument"]:
# replace is NOT in place!
add_value = add_value.replace("$inputs.%s" % (i,), str(argument_map["$inputs"][i]["value"]))
elif arg_type == "$outputs":
if argument_map[arg_type][index]["type"] == "file":
add_value = argument_map[arg_type][index]["file"]
else:
add_value = arg_value
if add_name:
if isinstance(add_value, File): if isinstance(add_value, File):
arg_list.append(arg_name) arg_list.append(arg_name)
arg_list.append(add_value) arg_list.append(add_value)
# If it's a list we want to add each of the Pegasus.File instances
elif isinstance(add_value, list): elif isinstance(add_value, list):
arg_list.append(arg_name) arg_list.append(arg_name)
for f in add_value: for f in add_value:
arg_list.append(f) arg_list.append(f)
# Otherwise, add stuff as a string
else: else:
arg_list.append("%s %s" % (arg_name, add_value)) arg_list.append("%s %s" % (arg_name, add_value))
else: else:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment