Hone logo
Hone
Problems

Implement a Rust Broadcast Channel

This challenge involves implementing a broadcast channel in Rust. A broadcast channel is a communication primitive that allows multiple receivers to subscribe to a single sender. When the sender sends a message, all subscribed receivers will receive a copy of that message. This is useful for scenarios where you need to distribute information to many parts of your application concurrently without each receiver needing to poll or manage its own connection to the sender.

Problem Description

You are tasked with creating a BroadcastChannel<T> struct in Rust. This channel should enable a single sender to broadcast messages of type T to an arbitrary number of receivers. Each receiver will get its own clone of the sent message.

Key Requirements:

  1. new(): A constructor BroadcastChannel::new() to create a new, empty broadcast channel.
  2. subscribe(): A method subscribe() on BroadcastChannel that returns a receiver. Each call to subscribe() should create a new, independent receiver.
  3. send(): A method send(&self, message: T) on BroadcastChannel to send a message to all currently active receivers.
  4. Receiver Behavior: Each receiver must be able to receive messages sent after it has subscribed.
  5. Cloning: The message type T must be Clone. This is because each receiver needs its own copy of the message.
  6. Concurrency: The channel should be safe for concurrent access from multiple threads.

Expected Behavior:

  • When a message is sent, all receivers that were active before the message was sent should receive a clone of that message.
  • New receivers subscribed after a message is sent will not receive that particular message, but will receive subsequent messages.
  • If there are no receivers, send() should still succeed, but the message will be dropped.
  • The channel should handle multiple senders and receivers concurrently.

Edge Cases:

  • Sending a message when no receivers are subscribed.
  • Subscribing a new receiver after several messages have already been sent.
  • Dropping a receiver – subsequent sends should not attempt to send to the dropped receiver.

Examples

Example 1: Basic Send and Receive

// Assume BroadcastChannel<String> is implemented
use std::sync::Arc;
use std::thread;

let channel = Arc::new(BroadcastChannel::new());

let mut receiver1 = channel.subscribe();
let mut receiver2 = channel.subscribe();

// Send a message
channel.send("Hello from sender!".to_string());

// Receiver 1 gets the message
let msg1 = receiver1.recv();
assert_eq!(msg1, Some("Hello from sender!".to_string()));

// Receiver 2 gets the same message
let msg2 = receiver2.recv();
assert_eq!(msg2, Some("Hello from sender!".to_string()));

Explanation:

The sender sends "Hello from sender!". Both receiver1 and receiver2 were subscribed at the time of sending, so they both receive a clone of the message.

Example 2: New Subscriber Misses Old Messages

// Assume BroadcastChannel<i32> is implemented
use std::sync::Arc;
use std::thread;

let channel = Arc::new(BroadcastChannel::new());

channel.send(1);
channel.send(2);

let mut receiver3 = channel.subscribe(); // Subscribed after 1 and 2 were sent

// Receiver 3 only gets messages sent *after* it subscribed
let msg3_first = receiver3.recv();
assert_eq!(msg3_first, None); // No historical messages for new subscribers

channel.send(3);
let msg3_second = receiver3.recv();
assert_eq!(msg3_second, Some(3));

Explanation:

Messages 1 and 2 are sent before receiver3 is created. Therefore, receiver3 will not receive them. When message 3 is sent, receiver3 is subscribed and receives it. The recv() might return None initially if the implementation uses an internal queue that needs to be filled or if it's designed to block, but in this context, it implies no message is immediately available for receiver3 from historical sends.

Example 3: Multiple Threads and Receiver Dropping

// Assume BroadcastChannel<u32> is implemented
use std::sync::Arc;
use std::thread;
use std::time::Duration;

let channel = Arc::new(BroadcastChannel::new());

let mut receiver_main = channel.subscribe();

// Spawn a thread to send messages
let sender_handle = thread::spawn({
    let channel_clone = Arc::clone(&channel);
    move || {
        thread::sleep(Duration::from_millis(10));
        channel_clone.send(100);
        thread::sleep(Duration::from_millis(10));
        channel_clone.send(200);
    }
});

// Subscribe another receiver in a separate thread
let receiver_handle = thread::spawn({
    let channel_clone = Arc::clone(&channel);
    move || {
        let mut receiver_thread = channel_clone.subscribe();
        thread::sleep(Duration::from_millis(20)); // Ensure it subscribes after first send
        assert_eq!(receiver_thread.recv(), Some(200)); // Should get the second message
    }
});

// Main thread receives messages
assert_eq!(receiver_main.recv(), Some(100));
assert_eq!(receiver_main.recv(), Some(200));

// Explicitly drop a receiver (implicitly happens when thread ends)
// For demonstration, imagine dropping receiver_thread manually in its thread
// For this example, we rely on the thread ending to drop its receiver.

sender_handle.join().unwrap();
receiver_handle.join().unwrap();

// Send a message after one receiver (the one in receiver_handle) has presumably ended
// and its receiver has been dropped.
channel.send(300);
assert_eq!(receiver_main.recv(), Some(300));

Explanation:

The main thread subscribes and waits for messages. A sender thread sends 100 and 200. A separate thread subscribes and receives 200 (missing 100 due to subscription timing). The main thread receives both 100 and 200. After both spawned threads join (and their receivers are dropped), the main thread sends 300, which the main receiver gets.

Constraints

  • The message type T must implement Clone.
  • The channel implementation must be thread-safe.
  • The recv() method on receivers should ideally be non-blocking, returning None if no message is available. If a blocking recv() is implemented, clearly document its behavior. For this challenge, a non-blocking recv() is preferred for flexibility.
  • Memory usage should be reasonable, avoiding unbounded growth if receivers lag significantly behind senders (consider strategies like dropping old messages or blocking senders if a bounded channel is part of a more advanced requirement, though for this basic challenge, unbounded is acceptable).

Notes

  • You will likely need to use synchronization primitives like Mutex or RwLock from std::sync to protect shared state.
  • Consider how to manage the list of active receivers. When a receiver is dropped, it should be removed from the sender's list. The Arc and Weak pointers, or a custom Drop implementation on the receiver, can be useful here.
  • Think about the internal data structure for storing messages. A VecDeque might be suitable for holding sent messages.
  • The Receiver type returned by subscribe() will need to be able to retrieve messages from the channel. It might also need to hold state about which messages it has already received or a pointer back to the channel's message queue.
Loading editor...
rust