Implementing a Worker Pool for Asynchronous Tasks in Jest
This challenge focuses on building a robust worker pool mechanism to manage and execute a set of asynchronous tasks efficiently. You'll be tasked with creating a system that can distribute work among a fixed number of workers, handle task completion and errors, and manage the overall lifecycle of the pool. This is a fundamental pattern for improving performance and resource utilization in applications dealing with concurrent operations.
Problem Description
You need to implement a WorkerPool class in TypeScript that manages a pool of worker processes (simulated as asynchronous functions for this challenge) to execute a series of tasks. The WorkerPool should:
- Limit Concurrency: Execute a maximum of maxWorkers tasks concurrently.
- Queue Tasks: If more tasks are submitted than can be run concurrently, they should be queued and executed as workers become available.
- Handle Task Results: Successfully completed tasks should have their results returned.
- Handle Task Errors: Errors thrown by a task, or rejections of the promise it returns, should be caught and reported to the caller through the promise returned by run.
- Graceful Shutdown: Provide a method to gracefully shut down the pool, waiting for all currently running and queued tasks to complete before resolving.
For this challenge, "workers" will be represented by asynchronous functions that simulate work (e.g., using setTimeout).
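One way to picture the first three requirements is a fixed number of worker loops that each claim the next unclaimed task until none remain. The sketch below is only an illustration under that assumption, not the required solution: `Task` and `runWithLimit` are hypothetical names introduced here, and shutdown handling is left out entirely.

```typescript
// A task is any zero-argument function that returns a promise.
type Task<T> = () => Promise<T>;

// Hypothetical helper, not part of the challenge's required API.
async function runWithLimit<T>(tasks: Task<T>[], maxWorkers: number): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0; // index of the next task that no worker has claimed yet

  // Each worker loop claims one index at a time and stores the result
  // at that same index, so output order matches submission order.
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  // Never start more loops than there are tasks.
  const workerCount = Math.min(maxWorkers, tasks.length);
  const workers = Array.from({ length: workerCount }, () => worker());

  // Promise.all rejects as soon as any worker loop rejects,
  // which surfaces the first task failure to the caller.
  await Promise.all(workers);
  return results;
}
```

In this sketch the "queue" is implicit: tasks that have not been claimed yet simply wait at indices at or beyond `next`. Note that after one task fails, the other worker loops keep claiming tasks; whether that is acceptable depends on how strictly you read the requirements.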
Examples
Example 1: Basic Task Execution
// Assume WorkerPool is imported and initialized
const pool = new WorkerPool(2); // Max 2 workers
const task1 = () => new Promise(resolve => setTimeout(() => resolve('Result 1'), 100));
const task2 = () => new Promise(resolve => setTimeout(() => resolve('Result 2'), 50));
const task3 = () => new Promise(resolve => setTimeout(() => resolve('Result 3'), 75));
const results = await pool.run([task1, task2, task3]);
// Expected Output: ['Result 1', 'Result 2', 'Result 3']
// The order of results in the array should match the order of tasks submitted.
// With a first-in, first-out queue, task1 and task2 start immediately;
// task3 starts once one of them finishes (task2, after about 50 ms).
Example 2: Handling Task Errors
const pool = new WorkerPool(1); // Max 1 worker
const successfulTask = () => new Promise(resolve => setTimeout(() => resolve('Success'), 50));
const failingTask = () => new Promise((_, reject) => setTimeout(() => reject(new Error('Task Failed')), 100));
const anotherSuccessfulTask = () => new Promise(resolve => setTimeout(() => resolve('Another Success'), 75));
try {
  await pool.run([successfulTask, failingTask, anotherSuccessfulTask]);
} catch (error) {
  // Expected Output: Error: Task Failed
  // The pool should reject when any task within the run() call rejects.
  // The promise returned by run() should reject with the error of the first failing task.
}
Example 3: Graceful Shutdown
const pool = new WorkerPool(2); // Max 2 workers
const longTask1 = () => new Promise(resolve => setTimeout(() => resolve('Long Task 1 Done'), 500));
const longTask2 = () => new Promise(resolve => setTimeout(() => resolve('Long Task 2 Done'), 600));
const shortTask = () => new Promise(resolve => setTimeout(() => resolve('Short Task Done'), 100));
// Submit tasks
const taskPromises = pool.run([longTask1, longTask2, shortTask]);
// Immediately after submitting, initiate shutdown
await pool.shutdown();
// The promise stored in taskPromises still resolves even though shutdown was called:
// shutdown waits for every task submitted before it to finish.
const results = await taskPromises;
// Expected Output: ['Long Task 1 Done', 'Long Task 2 Done', 'Short Task Done']
Constraints
- The WorkerPool class should accept maxWorkers (a positive integer) in its constructor.
- Tasks are represented as functions that return a Promise.
- The run method should accept an array of task functions and return a Promise that resolves with an array of results in the same order as the input tasks, or rejects with the error of the first task that fails.
- The shutdown method should return a Promise that resolves when all tasks (currently running and queued) have completed.
- The pool should not accept new tasks once shutdown has been called; tasks submitted before the call still run to completion.
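The constraints above can be sketched as a small class. This is one possible shape under stated assumptions, not a reference solution: the private members (`active`, `queue`, `closed`, `idleWaiters`, `submit`, `drain`) are hypothetical names, and the sketch assumes that calling run after shutdown should reject, which the constraints do not spell out.

```typescript
type Task<T> = () => Promise<T>;

class WorkerPool {
  private readonly maxWorkers: number;
  private active = 0;                           // tasks currently executing
  private queue: Array<() => void> = [];        // starters for tasks waiting for a slot
  private closed = false;                       // set once shutdown() has been called
  private idleWaiters: Array<() => void> = [];  // resolvers of pending shutdown() promises

  constructor(maxWorkers: number) {
    if (!Number.isInteger(maxWorkers) || maxWorkers < 1) {
      throw new RangeError('maxWorkers must be a positive integer');
    }
    this.maxWorkers = maxWorkers;
  }

  run<T>(tasks: Task<T>[]): Promise<T[]> {
    // Assumption: submissions after shutdown are refused with a rejection.
    if (this.closed) {
      return Promise.reject(new Error('WorkerPool has been shut down'));
    }
    // Promise.all keeps input order and rejects with the first failure.
    return Promise.all(tasks.map((task) => this.submit(task)));
  }

  shutdown(): Promise<void> {
    this.closed = true;
    if (this.active === 0 && this.queue.length === 0) return Promise.resolve();
    return new Promise<void>((resolve) => this.idleWaiters.push(resolve));
  }

  // Queue one task and return a promise for its individual outcome.
  private submit<T>(task: Task<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const start = () => {
        this.active++;
        const release = () => {
          this.active--;
          this.drain();
        };
        // Promise.resolve().then(task) turns a synchronous throw into a rejection.
        Promise.resolve().then(task).then(resolve, reject).then(release);
      };
      this.queue.push(start);
      this.drain();
    });
  }

  // Start queued tasks while slots are free; wake shutdown() callers when idle.
  private drain(): void {
    while (this.active < this.maxWorkers && this.queue.length > 0) {
      const start = this.queue.shift()!;
      start();
    }
    if (this.active === 0 && this.queue.length === 0) {
      for (const resolve of this.idleWaiters.splice(0)) resolve();
    }
  }
}
```

A design point worth noting: in this sketch, tasks already queued keep running after one task in the same run call fails. The returned promise still rejects with the first error, but the pool itself stays usable.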
Notes
- Consider how to manage the queue of pending tasks.
- Think about how to track which worker is executing which task and when a worker becomes free.
- The run method should collect results in the correct order. You might need to store results in an array indexed by the original task position.
- For the shutdown method, ensure that any tasks already submitted to the pool (even if not yet started) are processed before the shutdown promise resolves.
- You can simulate the "worker" behavior by simply invoking the task function and handling its promise. The challenge is in orchestrating these invocations.
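When checking an implementation against these notes, it can help to observe how many tasks actually overlap. The helper below is a hypothetical test utility (the names `createConcurrencyProbe`, `wrap`, and `peak` are assumptions made for this sketch): it wraps each task so that a counter goes up when the task starts and down when it settles.

```typescript
type Task<T> = () => Promise<T>;

// Hypothetical test helper: records the highest number of wrapped tasks
// that were running at the same time.
function createConcurrencyProbe() {
  let active = 0;
  let peak = 0;
  return {
    wrap<T>(task: Task<T>): Task<T> {
      return async () => {
        active++;
        peak = Math.max(peak, active);
        try {
          return await task();
        } finally {
          active--; // runs whether the task resolved or rejected
        }
      };
    },
    get peak(): number {
      return peak;
    },
  };
}
```

In a Jest test, one could pass `tasks.map((t) => probe.wrap(t))` to the pool's run method and then assert something like `expect(probe.peak).toBeLessThanOrEqual(maxWorkers)`.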