Python Worker Pool Implementation
Asynchronous processing is crucial for improving the performance and responsiveness of applications, especially when dealing with I/O-bound or CPU-bound tasks. A common pattern for managing concurrent execution is the worker pool, which runs tasks concurrently on a fixed set of workers without overwhelming system resources. Note that with threads in standard CPython the gain comes mainly for I/O-bound work, because the global interpreter lock lets only one thread execute Python bytecode at a time. This challenge asks you to implement a basic worker pool in Python.
Problem Description
You are tasked with creating a WorkerPool class in Python. This class should manage a fixed number of worker threads that can execute tasks concurrently. The WorkerPool should provide a way to submit tasks to the pool and retrieve their results.
Key Requirements:
- Initialization: The WorkerPool should be initialized with a specified number of worker threads.
- Task Submission: A submit method should accept a callable (function) and its arguments. This task should be placed in a queue for workers to pick up.
- Concurrent Execution: Worker threads should continuously fetch tasks from the queue and execute them.
- Result Retrieval: Each call to submit should return a "future" object for the submitted task. This future object should allow the user to check if the task is done, get the result of the task, or retrieve any exception that occurred during execution.
- Shutdown: A shutdown method should gracefully stop all worker threads, ensuring that tasks already submitted (running or still queued, as in Example 3) are completed.
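One way to approach the result-retrieval requirement is a small future built on threading.Event. This is a minimal sketch, not a reference solution: only get() appears in the examples of this challenge, while done(), exception(), set_result(), set_exception(), the timeout parameter, and the run_into helper are names assumed here for illustration.

```python
import threading


class Future:
    """Holds the eventual result (or exception) of one submitted task."""

    def __init__(self):
        self._done = threading.Event()  # set once the task has finished
        self._result = None
        self._exception = None

    def done(self):
        """Return True once a result or exception has been recorded."""
        return self._done.is_set()

    def set_result(self, result):
        # Hypothetical helper: a worker calls this when the task succeeds.
        self._result = result
        self._done.set()

    def set_exception(self, exc):
        # Hypothetical helper: a worker calls this when the task fails.
        self._exception = exc
        self._done.set()

    def exception(self, timeout=None):
        """Block until done, then return the stored exception (or None)."""
        if not self._done.wait(timeout):
            raise TimeoutError("task did not finish in time")
        return self._exception

    def get(self, timeout=None):
        """Block until done, then return the result or re-raise the error."""
        if not self._done.wait(timeout):
            raise TimeoutError("task did not finish in time")
        if self._exception is not None:
            raise self._exception
        return self._result


def run_into(future, func, *args, **kwargs):
    """Run func and record its outcome on the future, as a worker would."""
    try:
        future.set_result(func(*args, **kwargs))
    except Exception as exc:
        future.set_exception(exc)
```

Because the result and exception are written before the event is set, a thread that returns from wait() always sees the recorded outcome. The optional timeout is a design choice of this sketch; the challenge itself only requires get() to block until the result is available.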
Expected Behavior:
- When submit is called, a task is enqueued.
- When a worker thread becomes available, it dequeues a task and executes it.
- The result (or exception) of a task should be associated with its corresponding future object.
- The shutdown method should signal workers to finish the tasks already submitted and then terminate.
Edge Cases to Consider:
- Submitting tasks to a pool that has already been shut down.
- Handling exceptions raised by tasks.
- What happens if the number of submitted tasks exceeds the capacity of the queue?
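The queue-capacity question depends on how the queue is constructed. The snippet below is a small illustration using only the standard queue module; the capacity of 2 and the task names are arbitrary choices made for the demonstration.

```python
import queue

tasks = queue.Queue(maxsize=2)  # bounded: holds at most 2 pending items

tasks.put_nowait("task-1")
tasks.put_nowait("task-2")

try:
    tasks.put_nowait("task-3")  # queue is full, so this raises immediately
    overflow = "accepted"
except queue.Full:
    overflow = "rejected"

try:
    tasks.put("task-3", timeout=0.1)  # waits up to 0.1 s for space, then raises
    timed = "accepted"
except queue.Full:
    timed = "timed out"

unbounded = queue.Queue()  # default maxsize=0 means no upper bound
for i in range(1000):
    unbounded.put_nowait(i)  # never raises queue.Full
```

With an unbounded queue, submit never blocks but memory use grows with the backlog. With a bounded queue, a plain put() blocks the submitting thread until a worker frees a slot, which acts as back-pressure.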
Examples
Example 1:
import time
def multiply(x, y):
    time.sleep(1)  # Simulate work
    return x * y
pool = WorkerPool(num_threads=2)
future1 = pool.submit(multiply, 5, 3)
future2 = pool.submit(multiply, 7, 2)
print(f"Result 1: {future1.get()}")
print(f"Result 2: {future2.get()}")
pool.shutdown()
Output:
Result 1: 15
Result 2: 14
Explanation: Two tasks (multiply(5, 3) and multiply(7, 2)) are submitted to a pool with 2 worker threads. The workers execute these tasks concurrently. future1.get() blocks until the first task is complete and returns its result. future2.get() does the same for the second task.
Example 2:
import time
def divide(x, y):
    time.sleep(0.5)
    return x / y
def greet(name):
    time.sleep(0.2)
    return f"Hello, {name}!"
pool = WorkerPool(num_threads=3)
future_div = pool.submit(divide, 10, 2)
future_greet = pool.submit(greet, "Alice")
future_error = pool.submit(divide, 10, 0) # This will raise a ZeroDivisionError
print(f"Division result: {future_div.get()}")
print(f"Greeting: {future_greet.get()}")
try:
    future_error.get()
except ZeroDivisionError as e:
    print(f"Caught expected error: {e}")
pool.shutdown()
Output:
Division result: 5.0
Greeting: Hello, Alice!
Caught expected error: division by zero
Explanation: Tasks with different types of operations and potential errors are submitted. The get() method on the future_error object successfully raises the ZeroDivisionError that occurred during task execution.
Example 3: (Shutdown behavior)
import time
def slow_task(duration):
    time.sleep(duration)
    return duration
pool = WorkerPool(num_threads=1)
future1 = pool.submit(slow_task, 3)
future2 = pool.submit(slow_task, 1)
print("Shutting down pool...")
pool.shutdown()
print("Pool shut down.")
# Submitting after shutdown should either raise an error or be ignored.
# The output below assumes the submission is ignored; a pool that raises
# RuntimeError instead would print one extra "Caught expected error" line.
try:
    pool.submit(slow_task, 1)
except RuntimeError as e:  # Example exception for submitting to a closed pool
    print(f"Caught expected error: {e}")
print(f"Result 1: {future1.get()}")
print(f"Result 2: {future2.get()}")
Output:
Shutting down pool...
Pool shut down.
Result 1: 3
Result 2: 1
Explanation: Even though shutdown() is called after future1 and future2 are submitted, the workers continue until future1 and future2 are completed before terminating.
Constraints
- The WorkerPool should use threading.Thread for its workers.
- Tasks should be managed using queue.Queue.
- The number of worker threads (num_threads) will be a positive integer between 1 and 16.
- Task functions can accept any number of positional and keyword arguments.
- The shutdown method should not block indefinitely if a task hangs.
- The get() method on a future should block until the result is available.
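The requirement that shutdown must not block forever can be met in several ways. One possible approach, sketched below, is to join the workers against a shared deadline and report which ones are still running. The join_all helper, the hanging_task stand-in, and the 0.2 second budget are assumptions for this illustration; marking the threads as daemon threads is likewise a choice of this sketch, made so that a stuck worker cannot keep the interpreter alive at exit.

```python
import threading
import time

release = threading.Event()  # lets the demo end the "hung" tasks afterwards


def hanging_task():
    # Stand-in for a task that never returns on its own.
    release.wait()


def join_all(threads, timeout):
    """Wait up to `timeout` seconds in total; return threads still running."""
    deadline = time.monotonic() + timeout
    for t in threads:
        remaining = max(0.0, deadline - time.monotonic())
        t.join(remaining)  # returns when the thread ends or time runs out
    return [t for t in threads if t.is_alive()]


workers = [threading.Thread(target=hanging_task, daemon=True) for _ in range(2)]
for w in workers:
    w.start()

start = time.monotonic()
stuck = join_all(workers, timeout=0.2)  # gives up after roughly 0.2 s overall
elapsed = time.monotonic() - start

release.set()  # clean up so the demo threads can exit
```

Thread.join(timeout) never raises on timeout, so the caller has to check is_alive() afterwards, as join_all does. Python offers no safe way to kill a thread, so a hung task can only be abandoned, not stopped.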
Notes
- You'll need to design a Future class to hold the result or exception of a task.
- Consider how to signal workers to stop. A common approach is to put special "sentinel" values into the queue.
- For exception handling, store the exception object in the Future if a task fails.
- The submit method should return an instance of your Future class.
- Think about thread safety when accessing shared resources (like the task queue and future objects).
- The queue.Queue class is thread-safe by default, which is helpful.
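The sentinel idea from the notes can be sketched as follows. This is only an illustration of the stop signal, not a full pool: the _STOP object, the worker function, and the shared results list are hypothetical names, and results are collected in a lock-guarded list instead of futures to keep the example short.

```python
import queue
import threading

_STOP = object()  # unique sentinel; cannot be confused with a real task


def worker(tasks, results, lock):
    while True:
        item = tasks.get()  # blocks until a task or a sentinel arrives
        if item is _STOP:
            break  # one sentinel stops exactly one worker
        func, args, kwargs = item
        value = func(*args, **kwargs)
        with lock:  # guard the shared results list
            results.append(value)


tasks = queue.Queue()
results, lock = [], threading.Lock()
threads = [threading.Thread(target=worker, args=(tasks, results, lock))
           for _ in range(3)]
for t in threads:
    t.start()

for n in range(10):
    tasks.put((pow, (n, 2), {}))  # real tasks first...
for _ in threads:
    tasks.put(_STOP)  # ...then one sentinel per worker

for t in threads:
    t.join()  # returns once every worker has consumed its sentinel
```

Because queue.Queue is first-in, first-out, every task enqueued before the sentinels is dequeued before any worker sees a sentinel, which gives the "finish what was submitted, then terminate" behaviour of Example 3. A real pool would also wrap the task call in try/except and store the exception on the task's future, since an uncaught exception here would end the worker thread.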