Hone

Python Worker Pool Implementation

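Running work concurrently can improve the throughput and responsiveness of applications, especially for I/O-bound tasks such as network or disk access. (In standard CPython builds, the global interpreter lock lets only one thread execute Python bytecode at a time, so threads give little or no speedup for CPU-bound pure-Python code.) A common pattern for managing concurrent execution is the worker pool: a fixed set of workers pulls tasks from a shared queue, so many tasks can run concurrently without creating an unbounded number of threads. This challenge asks you to implement a basic worker pool in Python.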
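For comparison, the standard library already provides a thread-based pool, concurrent.futures.ThreadPoolExecutor. A short sketch of its use shows roughly the behavior this challenge asks you to build by hand: submit() returns a future, and result() (the counterpart of get() in this challenge) blocks until the task finishes. The multiply function and the 0.1-second sleep are illustrative.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def multiply(x, y):
    time.sleep(0.1)  # simulate I/O-bound work
    return x * y


# Leaving the with-block calls shutdown(wait=True) on the executor.
with ThreadPoolExecutor(max_workers=2) as executor:
    f1 = executor.submit(multiply, 5, 3)
    f2 = executor.submit(multiply, 7, 2)
    print(f1.result(), f2.result())  # 15 14
```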

Problem Description

You are tasked with creating a WorkerPool class in Python. This class should manage a fixed number of worker threads that can execute tasks concurrently. The WorkerPool should provide a way to submit tasks to the pool and retrieve their results.

Key Requirements:

  1. Initialization: The WorkerPool should be initialized with a specified number of worker threads.
  2. Task Submission: A submit method should accept a callable (function) along with its positional and keyword arguments, and place the task in a queue for workers to pick up.
  3. Concurrent Execution: Worker threads should continuously fetch tasks from the queue and execute them.
  4. Result Retrieval: Each call to submit should return a "future" object that lets the user check whether the task is done, get its result, or retrieve any exception raised during execution.
  5. Shutdown: A shutdown method should gracefully stop all worker threads once every task submitted before the call (whether already running or still queued) has completed.
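One way to meet requirement 4 is a Future built on threading.Event, sketched below under the assumption that method names other than get() (done, exception, set_result, set_exception) are yours to choose. A worker calls set_result or set_exception exactly once; get() waits on the event and then either returns the value or re-raises the stored exception. The optional timeout parameter is an addition, not part of the stated requirements.

```python
import threading


class Future:
    """Result holder for one task. Only get() comes from the challenge;
    the other method names are illustrative."""

    def __init__(self):
        self._done = threading.Event()  # set exactly once, when the task finishes
        self._result = None
        self._exception = None

    def set_result(self, value):
        self._result = value
        self._done.set()  # wakes every thread blocked in get()

    def set_exception(self, exc):
        self._exception = exc
        self._done.set()

    def done(self):
        return self._done.is_set()

    def exception(self, timeout=None):
        if not self._done.wait(timeout):  # wait() returns False on timeout
            raise TimeoutError("task has not finished")
        return self._exception

    def get(self, timeout=None):
        exc = self.exception(timeout)
        if exc is not None:
            raise exc  # re-raise the task's failure in the caller's thread
        return self._result


# Usage: another thread "finishes the task" after 0.1 s.
f = Future()
threading.Timer(0.1, f.set_result, args=(42,)).start()
print(f.get())  # blocks about 0.1 s, then prints 42
```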

Expected Behavior:

  • When submit is called, a task is enqueued.
  • When a worker thread becomes available, it dequeues a task and executes it.
  • The result (or exception) of a task should be associated with its corresponding future object.
  • The shutdown method should signal workers to finish the tasks already submitted and then terminate.
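The expected behavior maps onto a short worker loop. The sketch below assumes tasks are queued as (func, args, kwargs, future) tuples and uses concurrent.futures.Future purely as a stand-in for the Future class you would write yourself; SENTINEL is a hypothetical stop marker. Because queue.Queue is FIFO and one sentinel per worker is queued behind the real tasks, every task submitted before shutdown runs before the workers exit.

```python
import queue
import threading
from concurrent.futures import Future  # stand-in for the Future class you design

SENTINEL = object()  # hypothetical stop marker, compared by identity


def worker(tasks: queue.Queue) -> None:
    """Dequeue (func, args, kwargs, future) items until a sentinel arrives."""
    while True:
        item = tasks.get()  # blocks while the queue is empty
        if item is SENTINEL:
            break
        func, args, kwargs, future = item
        try:
            future.set_result(func(*args, **kwargs))
        except BaseException as exc:  # keep the worker alive whatever the task raises
            future.set_exception(exc)


tasks = queue.Queue()
threads = [threading.Thread(target=worker, args=(tasks,)) for _ in range(2)]
for t in threads:
    t.start()

ok, with_kwargs, failing = Future(), Future(), Future()
tasks.put((pow, (2, 10), {}, ok))
tasks.put((int, ("ff",), {"base": 16}, with_kwargs))  # keyword arguments pass through
tasks.put((divmod, (1, 0), {}, failing))

# Shutdown: one sentinel per worker, queued behind the tasks so they finish first.
for _ in threads:
    tasks.put(SENTINEL)
for t in threads:
    t.join()

print(ok.result(), with_kwargs.result())  # 1024 255
print(type(failing.exception()).__name__)  # ZeroDivisionError
```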

Edge Cases to Consider:

  • Submitting tasks to a pool that has already been shut down.
  • Handling exceptions raised by tasks.
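  • What happens if the number of pending tasks exceeds the capacity of the queue? (A queue.Queue is unbounded by default; if it is created with a positive maxsize, put() blocks until space frees up.)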
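For the capacity question, it helps to know how queue.Queue behaves when bounded. With a positive maxsize, put() blocks until a slot frees up, while put_nowait() or put() with a timeout raises queue.Full instead. The snippet below only exercises the queue itself; whether submit() should block (back-pressure), raise, or drop the task is a design choice the challenge leaves open.

```python
import queue

bounded = queue.Queue(maxsize=2)  # a positive maxsize caps the number of pending items
bounded.put("task-1")
bounded.put("task-2")

# Nothing consumes from this queue, so a plain put() here would block forever.
try:
    bounded.put_nowait("task-3")  # equivalent to put(..., block=False)
except queue.Full:
    print("queue is full")

try:
    bounded.put("task-3", timeout=0.1)  # wait up to 0.1 s for a free slot
except queue.Full:
    print("still full after 0.1 s")
```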

Examples

Example 1:

import time

def multiply(x, y):
    time.sleep(1) # Simulate work
    return x * y

pool = WorkerPool(num_threads=2)
future1 = pool.submit(multiply, 5, 3)
future2 = pool.submit(multiply, 7, 2)

print(f"Result 1: {future1.get()}")
print(f"Result 2: {future2.get()}")

pool.shutdown()

Output:

Result 1: 15
Result 2: 14

Explanation: Two tasks (multiply(5, 3) and multiply(7, 2)) are submitted to a pool with 2 worker threads. The workers execute these tasks concurrently. future1.get() blocks until the first task is complete and returns its result. future2.get() does the same for the second task.
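With two workers, Example 1 should take roughly one second rather than two, because the two sleeps overlap. A quick check with plain threads (no WorkerPool involved) illustrates this; the run helper, the results dictionary, and the shorter 0.5-second sleep are assumptions made for the sketch.

```python
import threading
import time


def multiply(x, y):
    time.sleep(0.5)  # time.sleep releases the GIL, much like waiting on I/O
    return x * y


results = {}


def run(key, x, y):
    results[key] = multiply(x, y)  # each key is written by exactly one thread


threads = [
    threading.Thread(target=run, args=("r1", 5, 3)),
    threading.Thread(target=run, args=("r2", 7, 2)),
]
start = time.monotonic()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.monotonic() - start

print(results["r1"], results["r2"])  # 15 14
print(f"elapsed: {elapsed:.2f} s")  # close to 0.5, not 1.0
```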

Example 2:

import time

def divide(x, y):
    time.sleep(0.5)
    return x / y

def greet(name):
    time.sleep(0.2)
    return f"Hello, {name}!"

pool = WorkerPool(num_threads=3)

future_div = pool.submit(divide, 10, 2)
future_greet = pool.submit(greet, "Alice")
future_error = pool.submit(divide, 10, 0) # This will raise a ZeroDivisionError

print(f"Division result: {future_div.get()}")
print(f"Greeting: {future_greet.get()}")

try:
    future_error.get()
except ZeroDivisionError as e:
    print(f"Caught expected error: {e}")

pool.shutdown()

Output:

Division result: 5.0
Greeting: Hello, Alice!
Caught expected error: division by zero

Explanation: Tasks performing different operations, one of which fails, are submitted. Calling get() on future_error re-raises the ZeroDivisionError that occurred while the task was executing, so the caller can handle it with an ordinary try/except.
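This works because a worker can catch the exception object and hand it to the caller, who re-raises it. A minimal sketch with a plain thread and a dictionary (a hypothetical stand-in for a Future) shows that the re-raised object is the original one and that its traceback still includes the frames from the worker thread.

```python
import threading

outcome = {}  # hypothetical stand-in for a Future's result/exception slots


def run_in_worker(func, *args):
    try:
        outcome["result"] = func(*args)
    except Exception as exc:
        outcome["error"] = exc  # the exception object keeps its __traceback__


def divide(x, y):
    return x / y


worker = threading.Thread(target=run_in_worker, args=(divide, 10, 0))
worker.start()
worker.join()  # after join(), the worker has finished writing to outcome

try:
    raise outcome["error"]  # what a Future's get() does for a failed task
except ZeroDivisionError as e:
    caught = e
    print(f"Caught expected error: {e}")
```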

Example 3: (Shutdown behavior)

import time

def slow_task(duration):
    time.sleep(duration)
    return duration

pool = WorkerPool(num_threads=1)
future1 = pool.submit(slow_task, 3)
future2 = pool.submit(slow_task, 1)

print("Shutting down pool...")
pool.shutdown()
print("Pool shut down.")

# Submitting after shutdown should either raise an error (RuntimeError is used here
# as an example) or be ignored/logged. The output below assumes the submission is
# ignored; a pool that raises would also print "Caught expected error: ...".
try:
    pool.submit(slow_task, 1)
except RuntimeError as e: # Example exception for submitting to a closed pool
    print(f"Caught expected error: {e}")

print(f"Result 1: {future1.get()}")
print(f"Result 2: {future2.get()}")

Output:

Shutting down pool...
Pool shut down.
Result 1: 3
Result 2: 1

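Explanation: shutdown() is called after future1 and future2 have been submitted, so the single worker still completes both tasks (slow_task(3), then the queued slow_task(1)) before terminating. Their results remain available through get() after the pool has shut down.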

Constraints

  • The WorkerPool should use threading.Thread for its workers.
  • Tasks should be managed using queue.Queue.
  • The number of worker threads (num_threads) will be a positive integer between 1 and 16.
  • Task functions can accept any number of positional and keyword arguments.
  • The shutdown method should not block indefinitely if a task hangs (for example, join each worker with a timeout instead of waiting forever).
  • The get() method on a future should block until the result is available.
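For the constraint that shutdown must not hang forever, threading.Thread.join() accepts a timeout. The sketch below, with a hypothetical hung_task, shows that join(timeout=...) returns after the timeout even though the thread is still running, and that is_alive() is how you tell the two cases apart. Marking workers as daemon threads (an assumption about your design) means an abandoned worker does not keep the interpreter alive at exit.

```python
import threading
import time


def hung_task():
    time.sleep(3600)  # stands in for a task that never returns


worker = threading.Thread(target=hung_task, daemon=True)  # daemon: won't block interpreter exit
worker.start()

start = time.monotonic()
worker.join(timeout=0.2)  # gives up after about 0.2 s instead of waiting forever
elapsed = time.monotonic() - start

# join() always returns None, so is_alive() is how you detect that it timed out.
print(worker.is_alive())  # True
```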

Notes

  • You'll need to design a Future class to hold the result or exception of a task.
  • Consider how to signal workers to stop. A common approach is to put special "sentinel" values into the queue.
  • For exception handling, store the exception object in the Future if a task fails.
  • The submit method should return an instance of your Future class.
  • Think about thread safety when accessing shared resources (like the task queue and future objects).
  • The queue.Queue class is already thread-safe, so the queue itself needs no extra locking.
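Putting the notes together, one possible sketch of the whole pool follows. It is not a reference solution; its assumptions are: Future method names other than get() are illustrative, submit() raises RuntimeError after shutdown (one of the options Example 3 allows), shutdown() takes a hypothetical timeout parameter with an arbitrary 10-second default, and workers are daemon threads. A lock makes "check the shut-down flag, then enqueue" atomic, so no task can land behind the sentinels and be silently skipped.

```python
import queue
import threading
import time


class Future:
    """Holds one task's outcome; get() blocks until it is available."""

    def __init__(self):
        self._done = threading.Event()
        self._result = None
        self._exception = None

    def set_result(self, value):
        self._result = value
        self._done.set()  # wake every thread blocked in get()/exception()

    def set_exception(self, exc):
        self._exception = exc
        self._done.set()

    def done(self):
        return self._done.is_set()

    def exception(self, timeout=None):
        if not self._done.wait(timeout):
            raise TimeoutError("task has not finished")
        return self._exception

    def get(self, timeout=None):
        exc = self.exception(timeout)
        if exc is not None:
            raise exc  # re-raise the task's exception in the caller's thread
        return self._result


_SENTINEL = object()  # one per worker tells that worker to exit


class WorkerPool:
    def __init__(self, num_threads):
        if not 1 <= num_threads <= 16:
            raise ValueError("num_threads must be between 1 and 16")
        self._tasks = queue.Queue()  # unbounded, so put() never blocks
        self._lock = threading.Lock()  # guards _shutdown and task/sentinel ordering
        self._shutdown = False
        self._threads = [
            threading.Thread(target=self._worker, daemon=True)
            for _ in range(num_threads)
        ]
        for t in self._threads:
            t.start()

    def _worker(self):
        while True:
            item = self._tasks.get()
            if item is _SENTINEL:
                return
            func, args, kwargs, future = item
            try:
                future.set_result(func(*args, **kwargs))
            except BaseException as exc:  # never let a task kill the worker
                future.set_exception(exc)

    def submit(self, func, /, *args, **kwargs):
        with self._lock:
            if self._shutdown:
                raise RuntimeError("cannot submit to a pool that has been shut down")
            future = Future()
            self._tasks.put((func, args, kwargs, future))
        return future

    def shutdown(self, timeout=10.0):  # assumed default; tune for your workload
        with self._lock:
            if self._shutdown:
                return  # a second call is a no-op
            self._shutdown = True
            for _ in self._threads:
                self._tasks.put(_SENTINEL)  # queued behind every pending task
        # Share one deadline across all joins so the total wait stays bounded.
        deadline = None if timeout is None else time.monotonic() + timeout
        for t in self._threads:
            remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
            t.join(remaining)
```

With this sketch, Examples 1 and 2 should print the results shown. In Example 3, the late submit() raises RuntimeError, so the run would print an extra "Caught expected error: cannot submit to a pool that has been shut down" line after "Pool shut down."; a pool that silently ignores late submissions would match the listed output instead. If shutdown() times out on a hung task, that worker is abandoned and its future never completes.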