Implement a Bounded Buffer with Condition Variables in Rust
This challenge requires you to build a thread-safe bounded buffer (also known as a producer-consumer queue) in Rust. You will use std::sync::Mutex and std::sync::Condvar from the standard library to manage concurrent access to the buffer and to signal between producer and consumer threads. This is a fundamental pattern for inter-thread communication and synchronization.
Problem Description
You need to implement a BoundedBuffer<T> struct that allows multiple producer threads to add items of type T and multiple consumer threads to remove items. The buffer has a fixed capacity.
Key Requirements:
- Thread Safety: The buffer must be safe to access from multiple threads concurrently.
- Bounded Capacity: The buffer should not exceed its specified maximum capacity. Producers attempting to add an item to a full buffer should block until space becomes available.
- Availability Signaling: Consumers attempting to remove an item from an empty buffer should block until an item is available.
- Graceful Shutdown: The buffer should provide a mechanism to signal that no more items will be produced, allowing consumers to eventually terminate gracefully.
Expected Behavior:
- Producers call push(item) to add an item. If the buffer is full, the producer thread blocks.
- Consumers call pop() to remove an item. If the buffer is empty, the consumer thread blocks.
- When push is called and the buffer was empty, waiting consumers should be unblocked.
- When pop is called and the buffer was full, waiting producers should be unblocked.
- A close() method should be implemented to signal that no more items will be added. After close() is called, subsequent calls to pop() on an empty buffer should return an error or None to indicate that no more items will ever be available.
Important Edge Cases:
- Empty Buffer: Consumers should block.
- Full Buffer: Producers should block.
- Buffer becomes empty after close(): Consumers should receive a signal that no more items are coming.
- Simultaneous pushes and pops: The implementation must handle these race conditions correctly.
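The edge cases above can be summarised as a small decision table over the state observed while holding the lock. The sketch below is only an illustration: the names PopAction, PushAction, pop_action and push_action are hypothetical, and rejecting a push on a closed buffer is an assumption, since the problem statement does not say what push should do after close().

```rust
/// What a `pop()` call should do, given the state seen under the lock.
/// (Hypothetical helper type, not part of the required API.)
#[derive(Debug, PartialEq, Eq)]
enum PopAction {
    Take,         // at least one item is available
    Wait,         // empty but still open: block on the condition variable
    ReportClosed, // empty and closed: no item will ever arrive
}

/// What a `push()` call should do, given the state seen under the lock.
#[derive(Debug, PartialEq, Eq)]
enum PushAction {
    Insert,       // there is a free slot
    Wait,         // full: block until a consumer frees a slot
    ReportClosed, // assumption: pushing after close() is rejected
}

fn pop_action(len: usize, closed: bool) -> PopAction {
    match (len, closed) {
        (0, false) => PopAction::Wait,
        (0, true) => PopAction::ReportClosed,
        // Items still queued are drained even after close().
        _ => PopAction::Take,
    }
}

fn push_action(len: usize, capacity: usize, closed: bool) -> PushAction {
    if closed {
        PushAction::ReportClosed
    } else if len >= capacity {
        PushAction::Wait
    } else {
        PushAction::Insert
    }
}
```

Because a waiting thread must re-evaluate this decision every time it wakes up, the check belongs inside the wait loop rather than before it.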
Examples
Example 1: Basic Producer-Consumer Interaction
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Assume BoundedBuffer struct and methods (push, pop, close) are implemented
fn main() {
    let buffer = Arc::new(BoundedBuffer::new(5)); // Capacity of 5

    // Producer thread
    let producer_buffer = Arc::clone(&buffer);
    let producer_handle = thread::spawn(move || {
        for i in 0..10 {
            producer_buffer.push(i).unwrap();
            println!("Produced: {}", i);
            thread::sleep(Duration::from_millis(50));
        }
        producer_buffer.close(); // Signal no more items
    });

    // Consumer thread
    let consumer_buffer = Arc::clone(&buffer);
    let consumer_handle = thread::spawn(move || {
        let mut consumed_items = Vec::new();
        loop {
            match consumer_buffer.pop() {
                Ok(item) => {
                    println!("Consumed: {}", item);
                    consumed_items.push(item);
                }
                Err(_) => {
                    // Error signifies buffer is closed and empty
                    println!("Consumer finished.");
                    break;
                }
            }
        }
        consumed_items
    });

    producer_handle.join().unwrap();
    let consumed = consumer_handle.join().unwrap();

    // Expected output (order of produced/consumed might vary slightly due to concurrency,
    // but the set of consumed items should be 0-9):
    // Produced: 0
    // Consumed: 0
    // Produced: 1
    // ...
    // Produced: 9
    // Consumer finished.
    assert_eq!(consumed.len(), 10);
    for i in 0..10 {
        assert!(consumed.contains(&i));
    }
}
Example 2: Full Buffer Blocking
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Assume BoundedBuffer struct and methods (push, pop, close) are implemented
fn main() {
    let buffer = Arc::new(BoundedBuffer::new(2)); // Capacity of 2

    // Producer thread 1 (fills the buffer, then blocks on the third push)
    let producer1_buffer = Arc::clone(&buffer);
    let producer1_handle = thread::spawn(move || {
        producer1_buffer.push(100).unwrap();
        println!("Produced 100");
        producer1_buffer.push(101).unwrap();
        println!("Produced 101"); // Buffer is now full
        println!("Producer 1 attempting to push 102...");
        producer1_buffer.push(102).unwrap(); // This should block until the consumer pops
        println!("Producer 1 successfully pushed 102");
        producer1_buffer.close();
    });

    // Consumer thread
    let consumer_buffer = Arc::clone(&buffer);
    let consumer_handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(100)); // Let producer fill the buffer and block
        let item1 = consumer_buffer.pop().unwrap(); // Frees a slot, unblocking the producer
        println!("Consumed: {}", item1);
        thread::sleep(Duration::from_millis(100)); // Give producer time to push 102 and close
        let item2 = consumer_buffer.pop().unwrap();
        println!("Consumed: {}", item2);
        let item3 = consumer_buffer.pop().unwrap();
        println!("Consumed: {}", item3);
        match consumer_buffer.pop() {
            // Try to pop after close and empty
            Ok(_) => panic!("Should not receive an item after close and empty"),
            Err(_) => println!("Consumer correctly signaled end."),
        }
    });

    producer1_handle.join().unwrap();
    consumer_handle.join().unwrap();
}
// Typical output (timing-dependent; "Consumed: 100" and "Producer 1 successfully
// pushed 102" may appear in either order, but the blocking behavior should be observed):
// Produced 100
// Produced 101
// Producer 1 attempting to push 102...
// Consumed: 100
// Producer 1 successfully pushed 102
// Consumed: 101
// Consumed: 102
// Consumer correctly signaled end.
Constraints
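- The buffer capacity will be a positive integer (e.g., 1 <= capacity <= 100).
- The T type parameter can be any type that implements Send and Sync. (Strictly, Send alone is sufficient for a Mutex-guarded buffer, because Mutex<T> is Sync whenever T is Send.)
- The implementation should be efficient and avoid unnecessary busy-waiting.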
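On the trait bounds: a buffer whose items are only ever reached through a Mutex needs T: Send to be shareable across threads, and the compiler can confirm this. The following is a small compile-time probe, assuming a hypothetical field layout of one Mutex<VecDeque<T>> and two Condvars; the names Shared, assert_send, assert_sync and check_bounds are illustrative only.

```rust
use std::cell::Cell;
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

// Compile-time probes: each call only type-checks if the bound holds.
fn assert_send<S: Send>() {}
fn assert_sync<S: Sync>() {}

/// Hypothetical stand-in for the buffer's fields: guarded queue plus two condvars.
type Shared<T> = (Mutex<VecDeque<T>>, Condvar, Condvar);

fn check_bounds() {
    // Cell<i32> is Send but not Sync.
    assert_send::<Cell<i32>>();
    // assert_sync::<Cell<i32>>(); // would fail to compile

    // Wrapped in a Mutex, the shared state is nevertheless both Send and Sync,
    // so an Arc around it can be handed to several threads.
    assert_send::<Shared<Cell<i32>>>();
    assert_sync::<Shared<Cell<i32>>>();
}
```

If this compiles, the Mutex is what provides the Sync guarantee, so an implementation written as impl<T> or impl<T: Send> should accept every type the constraint allows.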
Notes
- You will need to use std::collections::VecDeque or a similar data structure for the underlying buffer storage.
- A single Mutex will likely guard both the buffer data and any state related to its fullness/emptiness.
- Two Condvars are typically used: one for signaling when the buffer is not full (for producers) and one for signaling when the buffer is not empty (for consumers).
- Consider how to handle the close() operation. A boolean flag protected by the mutex is a common approach.
- The pop() method should return Result<T, BufferClosedError> or Option<T>, where the error/None signifies that the buffer is closed and empty. You'll need to define a suitable error type or use Option.
- Pay close attention to the order of operations within your lock() and wait() calls to avoid deadlocks and starvation. Re-check the condition every time wait() returns, because Condvar wakeups can be spurious.
- When notify_one() or notify_all() is called, ensure that the state change it announces was made while holding the mutex; otherwise a waiter can miss the wakeup. Notifying while the lock is still held is a simple, safe default.
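Putting these notes together, one possible shape for the solution is sketched below. It is a minimal sketch rather than a reference implementation: the State struct, the field names, and the choice to make push return an error once the buffer is closed are assumptions not fixed by the problem statement. It relies on Condvar::wait_while, which re-checks the predicate after every wakeup.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

/// Hypothetical error type: the buffer is closed (and, for `pop`, drained).
#[derive(Debug, PartialEq, Eq)]
pub struct BufferClosedError;

/// Everything guarded by the single mutex: the queue and the closed flag.
struct State<T> {
    queue: VecDeque<T>,
    closed: bool,
}

pub struct BoundedBuffer<T> {
    state: Mutex<State<T>>,
    capacity: usize,
    not_full: Condvar,  // producers wait here
    not_empty: Condvar, // consumers wait here
}

impl<T> BoundedBuffer<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        BoundedBuffer {
            state: Mutex::new(State {
                queue: VecDeque::with_capacity(capacity),
                closed: false,
            }),
            capacity,
            not_full: Condvar::new(),
            not_empty: Condvar::new(),
        }
    }

    /// Blocks while the buffer is full.
    /// Assumption: pushing to a closed buffer returns an error.
    pub fn push(&self, item: T) -> Result<(), BufferClosedError> {
        let guard = self.state.lock().unwrap();
        // Sleep until there is room or the buffer has been closed.
        let mut guard = self
            .not_full
            .wait_while(guard, |s| s.queue.len() >= self.capacity && !s.closed)
            .unwrap();
        if guard.closed {
            return Err(BufferClosedError);
        }
        guard.queue.push_back(item);
        self.not_empty.notify_one(); // lock is still held here
        Ok(())
    }

    /// Blocks while the buffer is empty and still open.
    pub fn pop(&self) -> Result<T, BufferClosedError> {
        let guard = self.state.lock().unwrap();
        // Sleep until an item arrives or the buffer has been closed.
        let mut guard = self
            .not_empty
            .wait_while(guard, |s| s.queue.is_empty() && !s.closed)
            .unwrap();
        // An empty queue at this point means the buffer is closed and drained.
        let item = guard.queue.pop_front().ok_or(BufferClosedError)?;
        self.not_full.notify_one();
        Ok(item)
    }

    /// Marks the buffer closed and wakes every waiter so it can re-check the flag.
    pub fn close(&self) {
        let mut guard = self.state.lock().unwrap();
        guard.closed = true;
        self.not_full.notify_all();
        self.not_empty.notify_all();
    }
}
```

Wrapped in an Arc, this type can be dropped into the two examples above. close() uses notify_all rather than notify_one because every blocked producer and consumer has to observe the flag, not just one of them.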