Implement a Concurrent Bounded Queue in Rust
Modern applications often require efficient ways to pass data between different threads. A common pattern for this is a queue, where one or more producer threads add items and one or more consumer threads remove them. This challenge focuses on building a bounded concurrent queue in Rust, ensuring thread safety and managing memory usage effectively.
Problem Description
Your task is to implement a thread-safe, bounded queue in Rust. This queue will allow multiple threads to concurrently push and pop elements without data races or deadlocks. The "bounded" nature means the queue has a maximum capacity, and pushing an element to a full queue should block the producer thread until space becomes available. Similarly, popping from an empty queue should block the consumer thread until an element is available.
Key Requirements:
- Thread Safety: The queue must be safe to use from multiple threads simultaneously. This implies using synchronization primitives correctly.
- Bounded Capacity: The queue must have a fixed maximum capacity, specified during its creation.
- Blocking Behavior:
  - push: If the queue is full, the calling thread must block until space is available.
  - pop: If the queue is empty, the calling thread must block until an element is available.
- Send and Sync: The queue itself and its elements must be Send and Sync where appropriate to allow for safe transfer between threads.
- API: Implement the following methods (a skeleton of this API appears after this list):
  - new(capacity: usize) -> Self: Creates a new bounded concurrent queue with the given capacity.
  - push(&self, item: T): Pushes an item into the queue. Blocks if the queue is full.
  - pop(&self) -> T: Removes and returns an item from the queue. Blocks if the queue is empty.
  - len(&self) -> usize: Returns the current number of elements in the queue.
  - is_empty(&self) -> bool: Returns true if the queue is empty, false otherwise.
  - is_full(&self) -> bool: Returns true if the queue is full, false otherwise.
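To make the required surface concrete, here is one possible skeleton of that API. Only the method signatures are dictated by the challenge; the internal fields shown (a Mutex-protected VecDeque plus two Condvars, as suggested in the notes below) are an assumption about one plausible layout, and the bodies are left as todo!() to be filled in.

use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

pub struct ConcurrentBoundedQueue<T> {
    inner: Mutex<VecDeque<T>>, // the protected storage
    not_empty: Condvar,        // consumers wait here while the queue is empty
    not_full: Condvar,         // producers wait here while the queue is full
    capacity: usize,           // fixed maximum number of elements
}

impl<T> ConcurrentBoundedQueue<T> {
    pub fn new(capacity: usize) -> Self {
        todo!("create an empty queue with the given capacity")
    }
    pub fn push(&self, item: T) {
        todo!("block while full, then enqueue the item")
    }
    pub fn pop(&self) -> T {
        todo!("block while empty, then dequeue the oldest item")
    }
    pub fn len(&self) -> usize {
        todo!()
    }
    pub fn is_empty(&self) -> bool {
        todo!()
    }
    pub fn is_full(&self) -> bool {
        todo!()
    }
}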
Expected Behavior:
Producers should be able to add items to the queue, and consumers should be able to remove them. When the queue reaches its capacity, subsequent push operations should pause until a pop operation frees up space. Conversely, when the queue becomes empty, pop operations should pause until a push operation adds an item.
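For example, assuming the ConcurrentBoundedQueue API sketched above, a producer blocked on a full queue of capacity 1 should resume as soon as a consumer frees a slot; the sleep below exists only to make the blocking observable in this sketch.

use std::sync::Arc;
use std::thread;
use std::time::Duration;

let q = Arc::new(ConcurrentBoundedQueue::new(1));
q.push(1); // the queue is now full

let q2 = Arc::clone(&q);
let producer = thread::spawn(move || {
    q2.push(2); // blocks here until the main thread pops
});

thread::sleep(Duration::from_millis(10)); // give the producer time to block
assert_eq!(q.pop(), 1); // frees a slot and wakes the blocked producer
producer.join().unwrap();
assert_eq!(q.pop(), 2); // the delayed push eventually completed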
Important Edge Cases:
- Zero Capacity: What happens if a queue is created with a capacity of 0? (Though usually a capacity of at least 1 is expected).
- Multiple Producers/Consumers: Ensure the implementation handles scenarios with multiple threads trying to push and pop concurrently (a small stress-test sketch follows this list).
- Interruption: While not explicitly required for this basic implementation, consider how a real-world system might handle thread interruption (e.g., if a thread is signaled to stop). For this challenge, assume blocking operations will eventually complete.
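For the multiple-producers/consumers case in particular, a small stress test along these lines can help; the thread and item counts are arbitrary choices for illustration, not part of the challenge.

use std::sync::Arc;
use std::thread;

let q = Arc::new(ConcurrentBoundedQueue::new(4));

// Four producers push 100 distinct items each.
let producers: Vec<_> = (0..4)
    .map(|id| {
        let q = Arc::clone(&q);
        thread::spawn(move || {
            for i in 0..100 {
                q.push(id * 100 + i);
            }
        })
    })
    .collect();

// Four consumers pop 100 items each, so every produced item is consumed.
let consumers: Vec<_> = (0..4)
    .map(|_| {
        let q = Arc::clone(&q);
        thread::spawn(move || (0..100).map(|_| q.pop()).collect::<Vec<_>>())
    })
    .collect();

for p in producers {
    p.join().unwrap();
}
let popped: usize = consumers.into_iter().map(|c| c.join().unwrap().len()).sum();
assert_eq!(popped, 400); // all 400 pushed items were consumed
assert!(q.is_empty());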
Examples
Example 1:
// Assume we have a queue `q` of capacity 2
let q = ConcurrentBoundedQueue::new(2);
// Thread 1 (Producer)
q.push(1); // q: [1]
q.push(2); // q: [1, 2] - queue is now full
// Thread 2 (Consumer)
let item1 = q.pop(); // item1 = 1, q: [2]
let item2 = q.pop(); // item2 = 2, q: [] - queue is now empty
// Thread 3 (Producer) tries to push to a full queue
// q.push(3); // This would block until space is available.
// Thread 4 (Consumer) tries to pop from an empty queue
// let item3 = q.pop(); // This would block until an item is available.
Explanation: Items are added and removed sequentially. The push and pop operations reflect the state of the queue.
Example 2: Consider a scenario with two producers and one consumer.
use std::thread;
use std::sync::Arc;
let q = Arc::new(ConcurrentBoundedQueue::new(3));
let q_clone1 = Arc::clone(&q);
let producer1 = thread::spawn(move || {
q_clone1.push("A");
q_clone1.push("B");
});
let q_clone2 = Arc::clone(&q);
let producer2 = thread::spawn(move || {
q_clone2.push("C");
q_clone2.push("D"); // This push might block if the queue fills up
});
// Allow producers to run for a bit
thread::sleep(std::time::Duration::from_millis(10));
let q_clone3 = Arc::clone(&q);
let consumer = thread::spawn(move || {
let mut items = Vec::new();
items.push(q_clone3.pop()); // First pop returns the oldest item: "A" or "C"
items.push(q_clone3.pop()); // Expecting another item
items.push(q_clone3.pop()); // Expecting a third item
items
});
producer1.join().unwrap();
producer2.join().unwrap();
let consumed_items = consumer.join().unwrap();
println!("Consumed items: {:?}", consumed_items);
// Expected output might be something like:
// Consumed items: ["A", "B", "C"] or ["C", "A", "B"] or ["A", "C", "D"], etc., depending on thread scheduling.
// The key is that exactly three items are consumed, each producer's own items appear in the
// order it pushed them (FIFO), and the fourth item remains in the queue.
Explanation: Multiple producers add items. The bounded nature ensures that neither producer can indefinitely fill the queue. The consumer will eventually receive items as they are produced. The exact order of consumed items depends on thread scheduling.
Constraints
- The capacity of the queue must be a non-negative integer. A capacity of 0 is technically valid but unlikely to be useful.
- The queue should support elements of any type T that is Send, and the queue itself should be Sync so it can be shared across threads (a compile-time check is sketched after this list).
- Performance: While explicit performance targets are not set, the implementation should be reasonably efficient for typical use cases and avoid unnecessary busy-waiting. Using standard library synchronization primitives is encouraged.
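One illustrative way to verify the Send/Sync constraint at compile time is a bound-only helper function (assert_send_sync is a made-up name for this example); if the queue is built on std primitives such as Mutex and Condvar with a Send element type, these bounds are normally satisfied automatically.

// Compiles only if ConcurrentBoundedQueue<i32> is both Send and Sync.
fn assert_send_sync<Q: Send + Sync>() {}

#[test]
fn queue_is_send_and_sync() {
    assert_send_sync::<ConcurrentBoundedQueue<i32>>();
}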
Notes
- Consider using Rust's standard library synchronization primitives such as std::sync::Mutex, std::sync::Condvar, and potentially std::collections::VecDeque or a similar collection for the underlying storage. Arc will be necessary if you want to share the queue instance across multiple threads.
- The pop method should return the oldest element in the queue (FIFO - First-In, First-Out).
- Think carefully about the conditions for waiting and notifying Condvars. When a push happens, should consumers be notified? When a pop happens, should producers be notified?
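Putting these notes together, here is a minimal sketch of one possible implementation (Mutex + Condvar + VecDeque, repeating the struct so the example is self-contained; this is not the only valid approach). The essential points are the while loops around Condvar::wait, which guard against spurious wakeups, and the cross-notification: push notifies not_empty for consumers, pop notifies not_full for producers. The assert! in new is one way to address the zero-capacity edge case.

use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

pub struct ConcurrentBoundedQueue<T> {
    inner: Mutex<VecDeque<T>>,
    not_empty: Condvar, // consumers wait here
    not_full: Condvar,  // producers wait here
    capacity: usize,
}

impl<T> ConcurrentBoundedQueue<T> {
    pub fn new(capacity: usize) -> Self {
        // One way to handle the zero-capacity edge case: reject it up front,
        // since every push on a zero-capacity queue would block forever.
        assert!(capacity > 0, "capacity must be at least 1");
        ConcurrentBoundedQueue {
            inner: Mutex::new(VecDeque::with_capacity(capacity)),
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
            capacity,
        }
    }

    pub fn push(&self, item: T) {
        let mut queue = self.inner.lock().unwrap();
        // Re-check the condition in a loop: Condvar wakeups may be spurious.
        while queue.len() == self.capacity {
            queue = self.not_full.wait(queue).unwrap();
        }
        queue.push_back(item);
        // An item is now available; wake one waiting consumer, if any.
        self.not_empty.notify_one();
    }

    pub fn pop(&self) -> T {
        let mut queue = self.inner.lock().unwrap();
        while queue.is_empty() {
            queue = self.not_empty.wait(queue).unwrap();
        }
        let item = queue.pop_front().expect("loop guarantees non-empty");
        // A slot is now free; wake one waiting producer, if any.
        self.not_full.notify_one();
        item
    }

    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().len()
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn is_full(&self) -> bool {
        self.len() == self.capacity
    }
}

With two separate condition variables, notify_one is sufficient because each push makes exactly one item available and each pop frees exactly one slot; notify_all would also be correct, just noisier.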