// mapReduce.js (1.54 KiB), authored by Brady James Garvin
import { Worker } from 'worker_threads';
/**
 * A worker thread whose entry point is an ES module.
 *
 * The worker is started from a one-line `data:` URL module that does nothing
 * but import the requested module by its absolute URL.
 */
class ModuleWorker extends Worker {
  /**
   * @param {string} path - Module specifier; relative specifiers are resolved
   *   against this file's own URL (`import.meta.url`).
   */
  constructor(path) {
    const code = `import '${new URL(path, import.meta.url).href}';`;
    super(new URL(`data:text/javascript,${encodeURIComponent(code)}`));
  }

  /**
   * Post `data` to the worker and wait for its next reply.
   *
   * The reply is whatever the worker posts next, so overlapping dispatches on
   * the same worker would all observe the same message; keep at most one
   * dispatch in flight per worker.
   *
   * @param {*} data - Value to send with `postMessage`.
   * @returns {Promise<*>} Resolves with the next message from the worker.
   *   Rejects with the worker's error if it throws, or with an `Error` if the
   *   worker exits before replying.
   */
  dispatch(data) {
    this.postMessage(data);
    return new Promise((resolve, reject) => {
      // Whichever event arrives first settles the promise; the other two
      // listeners are removed so they do not pile up across dispatches.
      const settle = (action, value) => {
        this.off('message', onMessage);
        this.off('error', onError);
        this.off('exit', onExit);
        action(value);
      };
      const onMessage = (message) => settle(resolve, message);
      // Without an 'error' listener the failure would surface as an uncaught
      // exception on the main thread instead of reaching the caller.
      const onError = (error) => settle(reject, error);
      const onExit = (exitCode) => settle(
        reject,
        new Error(`Worker exited with code ${exitCode} before replying.`),
      );
      this.on('message', onMessage);
      this.on('error', onError);
      this.on('exit', onExit);
    });
  }
}
/**
 * Fold an iterable into a single value, left to right.
 *
 * @param {*} identityElement - Starting value; returned unchanged when the
 *   sequence is empty.
 * @param {function(*, *): *} combineMonoidElements - Called as
 *   `(accumulated, element)`; its return value becomes the new accumulation.
 * @param {Iterable<*>} sequence - Elements to combine, in iteration order.
 * @returns {*} The accumulated value after the last element.
 */
function iterativeReduce(identityElement, combineMonoidElements, sequence) {
  return [...sequence].reduce(
    // Wrap the combiner so it only ever sees two arguments; `reduce` would
    // otherwise also pass the index and the array.
    (accumulated, element) => combineMonoidElements(accumulated, element),
    identityElement,
  );
}
/**
 * Rebuild a worker's reply as an element of the monoid.
 *
 * Objects are copied onto a fresh object that shares the identity element's
 * prototype. Primitives (and `null`) are returned as they are, since copying
 * them onto a wrapper object would discard their value.
 */
function reviveThreadResult(identityElement, threadResult) {
  if (threadResult === null || typeof threadResult !== 'object') {
    return threadResult;
  }
  return Object.assign(Object.create(Object.getPrototypeOf(identityElement)), threadResult);
}

/**
 * Split `sequence` into `workerCount` contiguous slices, hand each slice to
 * its own worker thread, and combine the workers' replies into one value.
 *
 * Each worker runs `./worker.js` and receives `{ proceduresURL, data }`, where
 * `data` is its slice copied into a plain array. What the worker computes is
 * defined in that module, not here — presumably a map-and-reduce of the slice
 * using the procedures module; verify against `worker.js`.
 *
 * @param {string} proceduresURL - Specifier of a module exporting
 *   `IDENTITY_ELEMENT` and `combineMonoidElements`; it is imported here and
 *   also forwarded to every worker.
 * @param {Array<*>} sequence - Input elements; must provide `length` and
 *   `slice(low, high)`.
 * @param {number} workerCount - Number of worker threads; a positive integer.
 * @returns {Promise<*>} The replies combined, in slice order, with
 *   `combineMonoidElements`, starting from `IDENTITY_ELEMENT`.
 * @throws {RangeError} If `workerCount` is not a positive integer.
 */
export async function parallelMapReduce(proceduresURL, sequence, workerCount) {
  // `console.assert` only logs; a zero count would silently ignore the whole
  // sequence and a numeric string would under-provision workers.
  if (!Number.isInteger(workerCount) || workerCount <= 0) {
    throw new RangeError(
      `Cannot distribute work to a nonpositive or non-integer (${workerCount}) number of threads.`,
    );
  }
  const {
    IDENTITY_ELEMENT,
    combineMonoidElements,
  } = await import(proceduresURL);
  const workers = [];
  let replies;
  try {
    for (let workerIndex = 0; workerIndex < workerCount; ++workerIndex) {
      workers.push(new ModuleWorker('./worker.js'));
    }
    const jobs = workers.map((worker, workerIndex) => {
      // Slice boundaries are floored so the slices tile the sequence exactly,
      // differing in size by at most one element.
      const low = Math.floor(workerIndex * sequence.length / workerCount);
      const high = Math.floor((workerIndex + 1) * sequence.length / workerCount);
      return worker.dispatch({
        proceduresURL,
        data: [...sequence.slice(low, high)],
      });
    });
    replies = await Promise.all(jobs);
  } finally {
    // Live workers keep the process running, so stop them on failure too.
    await Promise.all(workers.map((worker) => worker.terminate()));
  }
  // Replies arrive without their class prototype, so restore it before
  // combining.
  const threadResults = replies.map((reply) => reviveThreadResult(IDENTITY_ELEMENT, reply));
  return iterativeReduce(IDENTITY_ELEMENT, combineMonoidElements, threadResults);
}