Implementing a Basic Task Queue in Python
Many applications require processing tasks asynchronously or in a specific order. A common pattern to manage this is a task queue, where tasks are added to a queue and processed by worker threads or processes. This challenge asks you to implement a simple, thread-safe task queue in Python.
Problem Description
You need to create a TaskQueue class that allows tasks to be added and retrieved for processing. The queue should be thread-safe, meaning multiple threads can interact with it concurrently without data corruption or race conditions.
Key Requirements:
add_task(task): A method to add a task to the queue. A "task" can be any Python object (e.g., a string, a tuple, a custom object).get_task(): A method to retrieve and remove a task from the queue. This method should block if the queue is empty, waiting until a task becomes available.is_empty(): A method to check if the queue is currently empty.- Thread Safety: All operations on the queue must be thread-safe.
Expected Behavior:
When get_task() is called on an empty queue, the calling thread should pause execution until another thread calls add_task() and puts an item into the queue. Once a task is available, get_task() should return it and remove it from the queue.
Edge Cases:
- Adding multiple tasks and then retrieving them.
- Multiple threads trying to add tasks simultaneously.
- Multiple threads trying to retrieve tasks simultaneously when tasks are available.
- Multiple threads trying to retrieve tasks simultaneously when the queue is empty.
Examples
Example 1:
# Assume a simple TaskQueue implementation
from threading import Thread
import time
queue = TaskQueue()
def worker(name):
while True:
task = queue.get_task()
if task is None: # Sentinel value to stop worker
print(f"Worker {name} stopping.")
break
print(f"Worker {name} processing: {task}")
time.sleep(0.1) # Simulate work
# Create workers
threads = []
for i in range(2):
t = Thread(target=worker, args=(i+1,))
threads.append(t)
t.start()
# Add tasks
for i in range(5):
queue.add_task(f"Task {i+1}")
# Signal workers to stop
for _ in range(len(threads)):
queue.add_task(None)
# Wait for all threads to finish
for t in threads:
t.join()
print("All tasks processed and workers stopped.")
Expected Output (order of worker processing may vary):
Worker 1 processing: Task 1
Worker 2 processing: Task 2
Worker 1 processing: Task 3
Worker 2 processing: Task 4
Worker 1 processing: Task 5
Worker 2 stopping.
Worker 1 stopping.
All tasks processed and workers stopped.
Explanation:
Two worker threads are started. Tasks "Task 1" through "Task 5" are added to the queue. The worker threads concurrently pick up tasks, process them (simulated by time.sleep), and print their progress. Once all tasks are added, None is added as a sentinel value for each worker to signal them to exit their loop.
Example 2:
queue = TaskQueue()
print(f"Is queue empty initially? {queue.is_empty()}")
queue.add_task("First Task")
print(f"Is queue empty after adding a task? {queue.is_empty()}")
task = queue.get_task()
print(f"Retrieved task: {task}")
print(f"Is queue empty after retrieving a task? {queue.is_empty()}")
Expected Output:
Is queue empty initially? True
Is queue empty after adding a task? False
Retrieved task: First Task
Is queue empty after retrieving a task? True
Explanation: This example demonstrates the basic functionality of adding and retrieving a single task, along with checking the queue's emptiness.
Constraints
- The queue should be able to store any Python object as a task.
- The
get_task()method must block indefinitely until a task is available if the queue is empty. - The solution must be implemented using Python's standard library.
Notes
- Consider using Python's
threadingmodule for thread safety. Specifically, look intoLockandConditionobjects. - A common way to signal threads to stop is by using a "sentinel value" (like
Nonein Example 1) that is added to the queue. - Think about how to handle the waiting and notification mechanism for
get_task()when the queue is empty.