Implementing an Async Reader-Writer Lock in Rust
Concurrent access to shared data is a fundamental challenge in multithreaded programming. A reader-writer lock allows any number of readers to access the data simultaneously but gives each writer exclusive access. This problem focuses on implementing such a lock for asynchronous Rust, enabling efficient concurrent access to shared resources without blocking the executor thread. This is crucial for building scalable and responsive asynchronous applications where data sharing is common.
Problem Description
Your task is to implement a fair, asynchronous reader-writer lock (AsyncRwLock) in Rust. This lock should manage access to a shared data resource.
Key Requirements:
- Reader Access: Multiple asynchronous tasks can hold a read lock simultaneously.
- Writer Access: Only one asynchronous task can hold a write lock at any given time.
- Exclusivity: If a write lock is held, no read locks can be acquired, and vice-versa.
- Fairness: The lock should be fair: a continuous stream of readers must not starve waiting writers indefinitely, and a continuous stream of writers must not starve waiting readers. A common strategy is to give waiting writers priority over newly arriving readers.
- Asynchronous Operations: The lock acquisition methods (read() and write()) must be asynchronous and return Futures that resolve when the lock is acquired. They should not block the executor thread.
- Data Protection: The lock should be generic over the type of data it protects.
- Re-entrancy (Not Required): A re-entrant lock would let a task that holds a read lock acquire another read lock, or a task that holds a write lock acquire a read lock. This challenge does not require re-entrancy, so a task that tries to re-acquire a lock it already holds may deadlock.
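To illustrate the Asynchronous Operations requirement, here is a minimal sketch of a hand-written future that stores its waker and returns instead of blocking. It uses only the standard library. Shared, AcquireRead, release_writer, Flag, and demo are hypothetical names, not part of the required API, and the sketch deliberately ignores writer priority and the duplicate wakers that repeated polls would register.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical shared state: lock bookkeeping plus the wakers of parked readers.
#[derive(Default)]
struct Shared {
    writer_active: bool,
    active_readers: usize,
    parked: Vec<Waker>,
}

/// Future a hypothetical `read()` could return; resolves once no writer holds the lock.
struct AcquireRead {
    shared: Arc<Mutex<Shared>>,
}

impl Future for AcquireRead {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut s = self.shared.lock().unwrap();
        if s.writer_active {
            // Park: remember the waker and return at once instead of blocking.
            s.parked.push(cx.waker().clone());
            Poll::Pending
        } else {
            s.active_readers += 1;
            Poll::Ready(())
        }
    }
}

/// Called when the writer is done: clear the flag and wake every parked reader.
fn release_writer(shared: &Arc<Mutex<Shared>>) {
    let wakers = {
        let mut s = shared.lock().unwrap();
        s.writer_active = false;
        std::mem::take(&mut s.parked)
    };
    // Wake outside the critical section.
    for w in wakers {
        w.wake();
    }
}

/// Test waker that records whether it was woken.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Polls the future by hand: once while a writer is active, once after release.
fn demo() -> (bool, bool, bool, usize) {
    let shared = Arc::new(Mutex::new(Shared { writer_active: true, ..Default::default() }));
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(AcquireRead { shared: shared.clone() });
    let first_pending = fut.as_mut().poll(&mut cx).is_pending();
    release_writer(&shared);
    let woken = flag.0.load(Ordering::SeqCst);
    let then_ready = fut.as_mut().poll(&mut cx).is_ready();
    let readers = shared.lock().unwrap().active_readers;
    (first_pending, woken, then_ready, readers)
}
```

Here demo() stands in for an executor. A std::sync::Mutex is acceptable in this sketch only because it guards a short critical section and is never held across an .await.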
Expected Behavior:
- Tasks attempting to acquire a read lock while a writer holds the lock should be suspended (without blocking the executor thread) and woken when the writer releases the lock.
- Tasks attempting to acquire a write lock while any readers or another writer hold the lock should be suspended and woken when all other locks are released.
- When a writer releases the lock, if there are waiting writers, one writer should be prioritized. If there are no waiting writers but waiting readers, all waiting readers should be allowed to proceed.
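The rules above can be sketched as a small, synchronous admission policy that is independent of any runtime. This is one possible writer-priority interpretation, not the required design. LockState, WakeNext, and all field and method names are hypothetical.

```rust
/// Hypothetical bookkeeping for the lock; field names are illustrative.
#[derive(Debug, Default)]
struct LockState {
    active_readers: usize,
    writer_active: bool,
    waiting_writers: usize,
    waiting_readers: usize,
}

/// Who should be woken once the lock has become free.
#[derive(Debug, PartialEq)]
enum WakeNext {
    Nobody,
    OneWriter,
    AllReaders,
}

impl LockState {
    /// A reader may enter only if no writer holds the lock and none is queued.
    fn can_read(&self) -> bool {
        !self.writer_active && self.waiting_writers == 0
    }

    /// A writer may enter only if the lock is completely free.
    fn can_write(&self) -> bool {
        !self.writer_active && self.active_readers == 0
    }

    /// Release decision: prefer one queued writer, otherwise admit all queued readers.
    fn on_release(&self) -> WakeNext {
        if self.writer_active || self.active_readers > 0 {
            WakeNext::Nobody // still held, nothing to hand over yet
        } else if self.waiting_writers > 0 {
            WakeNext::OneWriter
        } else if self.waiting_readers > 0 {
            WakeNext::AllReaders
        } else {
            WakeNext::Nobody
        }
    }
}
```

Keeping this decision logic separate from the waking mechanism makes it easy to unit-test. Note that strict writer priority, as sketched here, can itself starve readers under a continuous stream of writers.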
Examples
Example 1: Basic Concurrent Reads and Writes
Imagine a shared counter that multiple tasks read from and occasionally increment.
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
// Assume AsyncRwLock is implemented and works as expected.
// For this example, we'll use a placeholder that you will replace.
use your_crate::AsyncRwLock; // Replace 'your_crate' with your crate name
#[tokio::main]
async fn main() {
    let data = Arc::new(AsyncRwLock::new(0));
    let mut handles: Vec<JoinHandle<()>> = Vec::new();
    // Spawn multiple reader tasks
    for i in 0..5 {
        let data_clone = Arc::clone(&data);
        handles.push(tokio::spawn(async move {
            println!("Reader {} trying to acquire lock", i);
            let reader_guard = data_clone.read().await;
            println!("Reader {} acquired lock, value: {}", i, *reader_guard);
            sleep(Duration::from_millis(50 + i as u64 * 10)).await; // Simulate work
            println!("Reader {} releasing lock", i);
        }));
    }
    // Spawn a writer task
    let data_clone = Arc::clone(&data);
    handles.push(tokio::spawn(async move {
        sleep(Duration::from_millis(20)).await; // Give readers a head start
        println!("Writer trying to acquire lock");
        let mut writer_guard = data_clone.write().await;
        println!("Writer acquired lock");
        *writer_guard += 10;
        println!("Writer updated value to: {}", *writer_guard);
        sleep(Duration::from_millis(100)).await; // Simulate work
        println!("Writer releasing lock");
    }));
    // Spawn more reader tasks
    for i in 5..10 {
        let data_clone = Arc::clone(&data);
        handles.push(tokio::spawn(async move {
            println!("Reader {} trying to acquire lock", i);
            let reader_guard = data_clone.read().await;
            println!("Reader {} acquired lock, value: {}", i, *reader_guard);
            sleep(Duration::from_millis(40 + i as u64 * 5)).await; // Simulate work
            println!("Reader {} releasing lock", i);
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    let final_value = *data.read().await;
    println!("Final value: {}", final_value);
    // Expected output will show interleaved reads and writes, with the writer
    // potentially blocking some reads and then the final value being updated.
    // The exact order of print statements will vary due to async scheduling.
}
Example 2: Writer Starvation Scenario
This example demonstrates how the lock should prevent a writer from being starved.
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use your_crate::AsyncRwLock; // Replace 'your_crate' with your crate name
#[tokio::main]
async fn main() {
    let data = Arc::new(AsyncRwLock::new(0));
    let mut handles: Vec<JoinHandle<()>> = Vec::new();
    // Spawn a writer that acquires the (still free) lock and holds it for a while
    let writer_data = Arc::clone(&data);
    handles.push(tokio::spawn(async move {
        println!("Writer task started");
        let mut writer_guard = writer_data.write().await;
        println!("Writer acquired lock");
        *writer_guard = 100;
        println!("Writer set value to 100");
        sleep(Duration::from_millis(50)).await; // Hold for a bit
        println!("Writer releasing lock");
    }));
    // Spawn a stream of readers that arrive *after* the writer
    sleep(Duration::from_millis(10)).await; // Give the writer time to acquire the lock
    for i in 0..3 {
        let data_clone = Arc::clone(&data);
        handles.push(tokio::spawn(async move {
            println!("Reader {} trying to acquire lock", i);
            let reader_guard = data_clone.read().await;
            println!("Reader {} acquired lock, value: {}", i, *reader_guard);
            sleep(Duration::from_millis(20)).await; // Short read
            println!("Reader {} releasing lock", i);
        }));
    }
    // Spawn another writer that should be prioritized over subsequent readers
    sleep(Duration::from_millis(70)).await; // Let the first writer and, most likely, the readers finish
    let writer2_data = Arc::clone(&data);
    handles.push(tokio::spawn(async move {
        println!("Writer 2 trying to acquire lock");
        let mut writer2_guard = writer2_data.write().await;
        println!("Writer 2 acquired lock");
        *writer2_guard = 200;
        println!("Writer 2 set value to 200");
        sleep(Duration::from_millis(50)).await;
        println!("Writer 2 releasing lock");
    }));
    for handle in handles {
        handle.await.unwrap();
    }
    let final_value = *data.read().await;
    println!("Final value: {}", final_value);
    // Expected: The second writer should acquire the lock, even if readers are waiting.
    // The final value should be 200.
}
Constraints
- The implementation should be purely in Rust.
- You must use tokio or another compatible asynchronous runtime (e.g., async-std). The examples use tokio.
- The lock must be generic over the type T it protects.
- The lock should not require unsafe code for its core functionality.
- Memory safety must be maintained at all times.
- The implementation should be reasonably efficient, aiming to minimize contention and context switching overhead.
Notes
- You'll likely need to use tokio::sync::Notify or tokio::sync::mpsc channels to signal waiting tasks when locks are released.
- Consider how to manage the state of the lock: number of active readers, presence of a writer, and queues of waiting readers and writers.
- A common strategy for fairness is to have a "writer waiting" flag. When a writer arrives, set this flag. New readers are blocked until the writer finishes, but existing readers can still finish. Once the writer finishes, clear the flag and allow waiting writers to proceed, or if no writers are waiting, allow waiting readers.
- When implementing read() and write(), you'll return a Guard type that implements Deref (and DerefMut for writers) to access the data and that releases the lock when it goes out of scope (RAII). This Guard will also need to be Send (and Sync where possible) when T is Send and Sync, because the examples hold guards across .await points inside tokio::spawn tasks, which requires the task's future to be Send.
- Think about the lifetime of the data being protected and how it relates to the lifetime of the lock guards.
- The Drop implementation for your guards is critical for releasing the lock correctly.
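The guard-related notes above can be sketched with a read guard that borrows the lock, exposes the data through Deref, and releases its slot in Drop. This is a synchronous, std-only illustration under simplifying assumptions. SketchLock, ReadGuard, read_now, reader_count, and demo_guard are hypothetical names, and read_now never waits. A write guard would additionally need DerefMut and some form of interior mutability, which this sketch leaves out.

```rust
use std::ops::Deref;
use std::sync::Mutex;

/// Hypothetical lock skeleton: the protected data plus a count of active readers.
struct SketchLock<T> {
    data: T,
    readers: Mutex<usize>,
}

/// Read guard that borrows the lock, so it cannot outlive the protected data.
struct ReadGuard<'a, T> {
    lock: &'a SketchLock<T>,
}

impl<T> SketchLock<T> {
    fn new(data: T) -> Self {
        SketchLock { data, readers: Mutex::new(0) }
    }

    /// Non-async stand-in for `read()`: registers a reader and hands out a guard.
    fn read_now<'a>(&'a self) -> ReadGuard<'a, T> {
        *self.readers.lock().unwrap() += 1;
        ReadGuard { lock: self }
    }

    fn reader_count(&self) -> usize {
        *self.readers.lock().unwrap()
    }
}

impl<'a, T> Deref for ReadGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.lock.data
    }
}

impl<'a, T> Drop for ReadGuard<'a, T> {
    fn drop(&mut self) {
        // Release the read slot. A full implementation would also wake a
        // waiting writer here once the count reaches zero.
        *self.lock.readers.lock().unwrap() -= 1;
    }
}

/// Takes two read guards, reads through both, then drops them.
fn demo_guard() -> (i32, usize, usize) {
    let lock = SketchLock::new(7);
    let a = lock.read_now();
    let b = lock.read_now();
    let seen = *a + *b;
    let while_held = lock.reader_count();
    drop(a);
    drop(b);
    (seen, while_held, lock.reader_count())
}
```

Because the guard holds a reference with lifetime 'a, the borrow checker rejects any attempt to keep a guard alive after the lock is dropped, and the count returns to zero without an explicit unlock call.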