Commit 28e76d80 authored by aknecht2's avatar aknecht2
Browse files

Updated stdout & stderr redirection. Includes adjustments in individual job...

Updated stdout & stderr redirection.  Includes adjustments in individual job yaml files, module yaml files, and workflow generation handling.
parent 685e02bf
......@@ -43,6 +43,16 @@ resources = {
}
}
# Defines the types of input / output arguments
# argument -> Any non file argument
# file -> Any file argument that isn't redirected
# stdout -> Any file argument that is redirected from stdout
# stderr -> Any file argument that is redirected from stderr
argument_types = {
"argument": ["argument"],
"file": ["file", "stdout", "stderr"]
]
# param keys
param_keys = {
"required": ["arguments"],
......
......@@ -65,7 +65,7 @@ align:
type: file
outputs:
- align.sam:
type: file
type: stdout
bowtie2[tool]:
- single[read_end]:
- bowtie2_align_single:
......@@ -91,9 +91,9 @@ align:
type: file
outputs:
- align.sam:
type: file
type: stdout
- align.quality:
type: file
type: stderr
paired[read_end]:
- bowtie2_align_paired:
inputs:
......@@ -120,9 +120,9 @@ align:
type: file
outputs:
- align.sam:
type: file
type: stdout
- align.quality:
type: file
type: stderr
- samtools_sam_to_bam:
inputs:
- align.sam:
......
......@@ -32,4 +32,4 @@ remove_duplicates:
additional_inputs: null
outputs:
- no_dups.bed:
type: file
type: stdout
......@@ -5,7 +5,7 @@ bedtools_bam_to_bed:
additional_inputs: null
outputs:
- bed:
type: file
type: stdout
command: bedtools
arguments:
- "bamtobed":
......@@ -17,11 +17,6 @@ bedtools_bam_to_bed:
required: true
has_value: true
default: $inputs.0
- ">":
changeable: false
required: true
has_value: true
default: $outputs.0
walltime: 2000
memory: 2000
cores: 1
......@@ -25,7 +25,7 @@ bowtie2_align_paired:
- sam:
type: file
- quality:
type: file
type: stderr
command: bowtie2
arguments:
- "-x":
......@@ -53,11 +53,6 @@ bowtie2_align_paired:
required: false
has_value: true
default: 8
- "2>":
changeable: false
required: true
has_value: true
default: $outputs.1
walltime: 2000
memory: 2000
cores: 8
......@@ -23,7 +23,7 @@ bowtie2_align_single:
- sam:
type: file
- quality:
type: file
type: stderr
command: bowtie2
arguments:
- "-x":
......@@ -46,11 +46,6 @@ bowtie2_align_single:
required: false
has_value: true
default: 8
- "2>":
changeable: false
required: true
has_value: true
default: $outputs.1
walltime: 2000
memory: 2000
cores: 8
......@@ -19,7 +19,7 @@ bwa_align_paired:
type: file
outputs:
- sam:
type: file
type: stderr
command: bwa
arguments:
- "mem":
......@@ -47,11 +47,6 @@ bwa_align_paired:
changeable: false
required: true
has_value: false
- "2>":
changeable: false
required: true
has_value: true
default: $outputs.0
walltime: 2000
memory: 2000
cores: 8
......@@ -139,6 +139,11 @@ class WorkflowJob(object):
job.uses(param["file"], link=Link.INPUT)
for output_file in output_files:
job.uses(output_file["file"], link=Link.OUTPUT, transfer=True)
# Redirect stdout / stderr
if output_file["type"] == "stdout":
job.setStdout(output_file["name"])
elif output_file["type"] == "stderr":
job.setStderr(output_file["name"])
job.addArguments(*self._create_arg_list(inputs, output_files))
self._add_job_resources(job)
return job
......
......@@ -9,6 +9,7 @@ import yaml
import traceback
import re
import itertools
import chipathlon.conf
from pprint import pprint
from Pegasus.DAX3 import *
......@@ -102,6 +103,8 @@ class WorkflowModule(object):
return
def _check_input_markers(self, markers):
# Ensures that valid markers are passed in before trying to generate
# a module workflow.
valid = True
msg = ""
for marker in markers:
......@@ -127,7 +130,8 @@ class WorkflowModule(object):
for param_dict in job_info[param_type]:
param_name = param_dict.keys()[0]
param = param_dict[param_name]
if param["type"] == "file":
# Make sure param is one of the file types
if param["type"] in conf.argument_types["file"]:
if param_name not in [x["name"] for x in param_list] and param_name not in [x["name"] for x in output_files]:
if prefix is None or self._get_full_name(prefix, markers, param_name) not in master_files:
param_list.append({"name": param_name, "type": "file"})
......@@ -152,12 +156,23 @@ class WorkflowModule(object):
return
def _setup_param_list(self, master_files, job_info, param_type, param_dict, prefix, markers):
"""
master_files -- A dictionary that maps filename -> File object
job_info -- A specific job dictionary from the workflow_module yaml file
param_type -- Should be inputs, additional_inputs, or outputs
param_dict -- Should be a dictionary mapping workflow_module file
names to master_file list names i.e.
align.sam -> ENCSR000GZ_FOO_bwa.._align.sam
prefix -- The module prefix created from the markers of all
Previous modules run up to that point.
markers -- Which splits to take within the workflow
"""
param_list = []
if job_info[param_type] is not None:
for job_dict in job_info[param_type]:
param_name = job_dict.keys()[0]
param = job_dict[param_name]
if param["type"] == "file":
if param["type"] in conf.argument_types["file"]:
full_name = self._get_full_name(prefix, markers, param_name)
# If file is an output file, add to master_files
if param_type == "outputs":
......@@ -165,7 +180,7 @@ class WorkflowModule(object):
param_list.append({
"name": full_name if full_name in master_files else param_dict[param_name],
"file": master_files[full_name] if full_name in master_files else master_files[param_dict[param_name]],
"type": "file"
"type": param["type"]
})
else:
param_list.append({
......@@ -218,7 +233,7 @@ class WorkflowModule(object):
}
data = module_params
for arg_params, param_key in zip([inputs, additional_files], ["inputs", "additional_files"]):
for file_dict in [file_param for file_param in module_params[param_key] if file_param["type"] == "file"]:
for file_dict in [file_param for file_param in module_params[param_key] if file_param["type"] in conf.argument_types["file"]]:
file_name = file_dict["name"]
if file_name not in module_params[param_key]:
valid_params = False
......@@ -227,7 +242,7 @@ class WorkflowModule(object):
valid_params = False
msg += "Error loading '%s' jobs. File with name '%s', value '%s', does not exist in master_files.\n" % (self.name, file_name, arg_params[file_name])
for param_dict in module_params[param_key]:
if param_dict["type"] == "file":
if param_dict["type"] in conf.argument_types["file"]:
file_name = param_dict["name"]
if file_name not in arg_params:
valid_params = False
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment