Commit cf52cf6d authored by aknecht2's avatar aknecht2
Browse files

Updated input data handling, added paired end read handling.

parent 831a5b33
......@@ -234,6 +234,22 @@ class Workflow(object):
self.err += "DB error: %s.\n" % (msg, )
return
def _make_set(self, file_list):
return_data = []
find = {}
for f in file_list:
if "paired_with" in f:
if f["@id"] in find:
return_data.append(
(f, find[f["@id"]]) if f["paired_end"] == 1
else (find[f["@id"]], f)
)
else:
find[f["paired_with"]] = f
else:
return_data.append((f,))
return return_data
def _add_download(self):
    """Register fastq downloads and pair experiment sets with control sets.

    For every run in ``self.run_data["runs"]`` the experiment and control
    file lists are grouped with ``self._make_set``. Each experiment set is
    then paired with a randomly selected control set, so that the number
    of control sets used equals the number of experiment sets. The
    resulting ``(experiment_set, control_set)`` pairs are stored in
    ``run["input_sets"]``.

    Each file in a pair is registered through ``self._add_file`` under the
    name ``EXPID_ACCESSION.fastq.gz`` unless that name is already in
    ``self.files``. A pair is recorded in ``run["input_sets"]`` only when
    at least one of its files was newly registered.
    """
    # Remember, experiment always paired with randomly selected control.
    # For alignment doesn't really matter, but we set up the groundwork.
    # NOTE(review): one line of the original comment is elided by the diff
    # hunk boundary in the view this was recovered from.
    for run in self.run_data["runs"]:
        run["input_sets"] = []
        experiment_data = self._make_set(run["data"]["experiment"])
        control_base = self._make_set(run["data"]["control"])
        n_experiment = len(experiment_data)
        n_control = len(control_base)
        # NOTE(review): with experiments present but no controls,
        # random.choice raises IndexError -- confirm callers validate this.
        if n_experiment < n_control:
            control_data = [
                random.choice(control_base) for _ in range(n_experiment)
            ]
        else:
            # random.shuffle works in place and returns None, so shuffle a
            # copy first and only then extend it; using its return value in
            # an expression raises TypeError.
            control_data = list(control_base)
            random.shuffle(control_data)
            # Top up with random repeats when experiments outnumber
            # controls (no-op when the counts are equal).
            control_data += [
                random.choice(control_base)
                for _ in range(n_experiment - n_control)
            ]
        for pair in zip(experiment_data, control_data):
            append_pair = False
            for treatment in pair:
                for f in treatment:
                    # For each fastq, have Pegasus do the http fetching.
                    # Files are named EXPID_ACCESSION.fastq.gz
                    name = "%s_%s.fastq.gz" % (run["experiment"], f["accession"])
                    if name not in self.files:
                        append_pair = True
                        self._add_file(name, "http://" + f["url"], "dummylocal")
            if append_pair:
                run["input_sets"].append(pair)
def _add_align(self):
    """Add one alignment job per file tuple of every run's input sets.

    Each entry of ``run["input_sets"]`` is a pair of file tuples. A tuple
    of length 2 is aligned as a paired-end read (``download_1.fastq`` and
    ``download_2.fastq``); any other tuple is aligned as single-end using
    its first file. Jobs are created through
    ``self.modules["align"].add_jobs``.
    """
    for run in self.run_data["runs"]:
        input_files = {}
        additional_files = {}
        # NOTE(review): the diff view this method was recovered from elides
        # three lines at this point (hunk boundary). They are not reproduced
        # here and must be carried over from the full file.
        for ext in chipathlon.conf.genomes[run["align"]]["additional_files"]:
            additional_files["ref_genome." + ext] = self.run_data["genomes"][run["align"]]["grch38p6_files"]["additional_files"][ext]
        for pair in run["input_sets"]:
            for file_tuple in pair:
                first = file_tuple[0]
                # Build fresh dicts per job so a "download_2.fastq" entry
                # from an earlier paired job cannot leak into a later
                # single-end job.
                job_inputs = dict(input_files)
                job_inputs["download_1.fastq"] = "%s_%s.fastq.gz" % (run["experiment"], first["accession"])
                if len(file_tuple) == 2:
                    second = file_tuple[1]
                    job_markers = {"read_end": "paired"}
                    prefix = "%s_%s_%s" % (run["experiment"], first["accession"], second["accession"])
                    job_inputs["download_2.fastq"] = "%s_%s.fastq.gz" % (run["experiment"], second["accession"])
                else:
                    job_markers = {"read_end": "single"}
                    # Take the accession from the tuple itself; no loop
                    # variable named ``f`` exists in this scope.
                    prefix = "%s_%s" % (run["experiment"], first["accession"])
                self.modules["align"].add_jobs(
                    self.dax, self.jobs, self.files, prefix,
                    job_markers, job_inputs, additional_files
                )
def _create_setup(self):
......
Supports Markdown
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment