Hone

Parallel Task Execution with a Process Pool

Modern applications often need to perform computationally intensive tasks. To speed up these tasks, we can leverage multi-core processors by executing them in parallel. This challenge requires you to create a system that manages a pool of worker processes to execute tasks concurrently, significantly reducing the overall execution time for CPU-bound operations.

Problem Description

Your task is to implement a ProcessPool class in Python. This class should allow users to submit functions and their arguments to be executed in parallel across multiple worker processes. The ProcessPool should manage the lifecycle of these worker processes, ensuring they are created, utilized, and properly terminated.

Key Requirements:

  • Initialization: The ProcessPool should be initialized with a specified number of worker processes. If no number is provided, it should default to the number of CPU cores available on the system.
  • Task Submission: A method submit(func, *args, **kwargs) should allow users to submit a callable func along with its arguments (*args and **kwargs) to be executed by a worker process. This method should return a future-like object (you can simulate this with a simple object containing the result or an exception).
  • Result Retrieval: The future-like object returned by submit should have a get() method that blocks until the result of the task is available and then returns the result. If the task raised an exception, get() should re-raise that exception.
  • Process Management: The ProcessPool should handle the creation and termination of worker processes. A close() method should be implemented to gracefully shut down all worker processes.
  • Task Distribution: Tasks should be distributed among available worker processes. When a worker finishes a task, it should become available to pick up a new one.
  • Error Handling: Exceptions raised by the submitted functions should be captured and re-raised when get() is called on the corresponding future.

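The future-like object mentioned above can be quite small. Here is one possible sketch (the name `TaskFuture` and the use of `threading.Event` are illustrative choices, not requirements):

```python
import threading

class TaskFuture:
    """A minimal future-like object: stores either a result or an
    exception and blocks in get() until one of them has been set."""

    def __init__(self):
        self._done = threading.Event()
        self._result = None
        self._exception = None

    def set_result(self, value):
        self._result = value
        self._done.set()

    def set_exception(self, exc):
        self._exception = exc
        self._done.set()

    def get(self):
        self._done.wait()          # block until the task has finished
        if self._exception is not None:
            raise self._exception  # re-raise the task's exception
        return self._result
```

An `Event` gives you the blocking behavior of `get()` for free; any mechanism that lets the main process wait for completion would do.
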
Expected Behavior:

The ProcessPool should behave like a simplified version of Python's built-in multiprocessing.Pool. When submit is called, a task is queued. Worker processes pick up tasks from the queue, execute them, and place the results where the main process can retrieve them. The get() method on the returned future allows retrieval of these results.
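Inside each worker process, that round-trip might look like the loop below. The `(task_id, func, args, kwargs)` payload layout and the `None` shutdown sentinel are one possible design, not something the problem mandates:

```python
import multiprocessing

def worker_loop(task_queue, result_queue):
    """Runs inside each worker process: pull tasks until a None
    sentinel arrives, execute them, and ship the outcome back."""
    while True:
        task = task_queue.get()
        if task is None:          # sentinel: shut this worker down
            break
        task_id, func, args, kwargs = task
        try:
            result_queue.put((task_id, func(*args, **kwargs), None))
        except Exception as exc:  # exceptions travel back as data
            result_queue.put((task_id, None, exc))
```

Shipping exceptions back as the third element of the result tuple is what lets `get()` re-raise them in the main process later.
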

Important Edge Cases:

  • No available workers: If all workers are busy when a new task is submitted, the task should be queued and executed once a worker becomes free; submit itself should not fail.
  • Exceptions in tasks: Ensure that exceptions occurring within the worker process are properly caught and propagated back to the main process.
  • Pool closure: The close() method should wait for all currently running tasks to complete before terminating the worker processes.

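A common way to get the graceful shutdown described in the last bullet is to enqueue one sentinel per worker and then join each process. This sketch assumes workers exit their loop on a `None` sentinel, which is a design choice rather than part of the spec:

```python
import multiprocessing

class ShutdownSketch:
    """Illustrates close(): one sentinel per worker, then join.
    Because sentinels queue behind any pending tasks, every task
    submitted before close() still runs to completion."""

    def __init__(self, workers, task_queue):
        self.workers = workers        # list of multiprocessing.Process
        self.task_queue = task_queue

    def close(self):
        for _ in self.workers:
            self.task_queue.put(None)  # one shutdown signal per worker
        for p in self.workers:
            p.join()                   # wait for in-flight work to finish
```

Joining rather than terminating is what distinguishes a graceful close from a forced one.
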
Examples

Example 1:

import time

def square(x):
    time.sleep(0.1) # Simulate work
    return x * x

pool = ProcessPool(num_workers=2)
futures = []
for i in range(5):
    futures.append(pool.submit(square, i))

results = []
for future in futures:
    results.append(future.get())

pool.close()
print(results)
Output: [0, 1, 4, 9, 16]

Explanation: Five tasks to calculate the square of numbers 0 through 4 are submitted to a process pool with 2 workers. The results are retrieved and printed in the order they were submitted.

Example 2:

import time

def divide(a, b):
    time.sleep(0.1)
    return a / b

pool = ProcessPool(num_workers=2)
future1 = pool.submit(divide, 10, 2)
future2 = pool.submit(divide, 5, 0) # This will raise a ZeroDivisionError

try:
    result1 = future1.get()
    print(f"Result 1: {result1}")
    result2 = future2.get() # This will raise the exception
except ZeroDivisionError as e:
    print(f"Caught expected error: {e}")

pool.close()
Output:
Result 1: 5.0
Caught expected error: division by zero

Explanation: Two tasks are submitted. The first is successful, and its result is retrieved. The second task raises a ZeroDivisionError, which is caught when future2.get() is called.

Example 3: (Demonstrating default worker count)

import multiprocessing
import time

def greet(name):
    time.sleep(0.05)
    return f"Hello, {name}!"

# Assuming your system has at least 4 CPU cores
pool = ProcessPool() # Defaults to multiprocessing.cpu_count()
futures = []
for name in ["Alice", "Bob", "Charlie", "David"]:
    futures.append(pool.submit(greet, name))

results = [future.get() for future in futures]
pool.close()
print(sorted(results)) # Sorting for predictable output order
Output: ['Hello, Alice!', 'Hello, Bob!', 'Hello, Charlie!', 'Hello, David!']

Explanation: A process pool is created without specifying the number of workers, so it defaults to the number of CPU cores. Several greeting tasks are submitted and their results are collected.

Constraints

  • The number of worker processes (num_workers) will be a positive integer between 1 and 16.
  • The input to submit will consist of a valid Python callable and its arguments.
  • The functions submitted to the pool, along with their arguments and return values, must be picklable (multiprocessing serializes them to cross process boundaries).
  • Your implementation should not rely on external libraries beyond Python's standard library (specifically multiprocessing and queue).
  • For testing purposes, the total number of tasks submitted will not exceed 100.

Notes

  • Consider using multiprocessing.Process for creating worker processes.
  • multiprocessing.Queue can be useful for communication between the main process and worker processes (e.g., for task distribution and result collection).
  • You'll need a way for worker processes to signal completion or errors.
  • Think about how to manage the state of each task (e.g., pending, running, completed, failed).
  • The future-like object can be a simple class that stores the result or exception and has a get() method.
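Tying these hints together, the main process needs some bookkeeping that matches items coming off the result queue back to the right future. One hypothetical shape for that collector (the dict-of-futures keyed by task id is an assumption, as is the `(task_id, result, exception)` tuple layout):

```python
import queue

def collect_results(result_queue, futures, expected):
    """Drain `expected` results from result_queue and resolve the
    matching future-like objects. Each future only needs to expose
    set_result() and set_exception(); they are keyed by task id."""
    for _ in range(expected):
        task_id, result, exc = result_queue.get()
        if exc is not None:
            futures[task_id].set_exception(exc)
        else:
            futures[task_id].set_result(result)
```

In a real pool this would typically run on a background thread so that `submit` returns immediately while results are resolved as they arrive.
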