Skip to content
Snippets Groups Projects
Select Git revision
  • b53fa2b3b05741f88dacb35ccfd73651b6f9f191
  • main default protected
2 results

mapReduce.js

Blame
  • mapReduce.js 1.54 KiB
    import { Worker } from 'worker_threads';
    
    class ModuleWorker extends Worker {
      constructor(path) {
        const code = `import '${new URL(path, import.meta.url).href}';`;
        super(new URL(`data:text/javascript,${encodeURIComponent(code)}`));
      }
    
      dispatch(data) {
        this.postMessage(data);
        return new Promise((resolve) => this.once('message', resolve));
      }
    }
    
    function iterativeReduce(identityElement, combineMonoidElements, sequence) {
      let result = identityElement;
      for (const element of sequence) {
        result = combineMonoidElements(result, element);
      }
      return result;
    }
    
    export async function parallelMapReduce(proceduresURL, sequence, workerCount) {
      console.assert(workerCount > 0, `Cannot distribute work to a nonpositive (${workerCount}) number of threads.`);
      const {
        IDENTITY_ELEMENT,
        combineMonoidElements,
      } = await import(proceduresURL);
      const workers = [...Array(workerCount)].map(() => new ModuleWorker('./worker.js'));
      const jobs = workers.map((worker, workerIndex) => {
        const low = Math.floor(workerIndex * sequence.length / workerCount);
        const high = Math.floor((workerIndex + 1) * sequence.length / workerCount);
        return worker.dispatch({
          proceduresURL,
          data: [...sequence.slice(low, high)],
        });
      });
      const threadResults = (await Promise.all(jobs)).map(
        (element) => Object.assign(Object.create(Object.getPrototypeOf(IDENTITY_ELEMENT)), element),
      );
      for (const worker of workers) {
        worker.terminate();
      }
      return iterativeReduce(IDENTITY_ELEMENT, combineMonoidElements, threadResults);
    }