Hone logo
Hone
Problems

Implementing a Watch Channel in Rust

This challenge involves creating a "watch channel" mechanism in Rust. A watch channel is a communication primitive that allows multiple subscribers to receive notifications (or the latest value) from a single publisher. This is useful for scenarios where you need to broadcast state changes or events to several parts of your application without having to manually send the update to each subscriber.

Problem Description

You need to implement a WatchChannel<T> that supports the following operations:

  1. Publishing: A Publisher can send new values of type T to the channel.
  2. Subscribing: Any number of Subscribers can register to receive values.
  3. Receiving: Each Subscriber should be able to receive the latest value that has been published to the channel. If no value has been published yet, the subscriber should wait until one is available.
  4. Multiple Subscribers: The channel must correctly handle multiple subscribers, ensuring each one receives all published values from the point they subscribed onwards.

Key Requirements:

  • Asynchronous: The WatchChannel should be designed to work asynchronously, particularly for receiving values, to avoid blocking the publisher or other subscribers.
  • Latest Value Semantics: Subscribers should receive the most recent value published. If a subscriber joins after several values have been sent, they should immediately receive the latest one published before their subscription.
  • Concurrency Safe: The WatchChannel must be safe for concurrent access from multiple threads or asynchronous tasks.
  • Graceful Shutdown: Consider how subscribers might indicate they are no longer interested or how the channel might be closed. (This is a good-to-have for a more advanced version, but focus on the core functionality first).

Expected Behavior:

A subscriber joining a channel will:

  • If a value has been published, immediately receive the latest published value.
  • If no value has been published, wait until the next value is published.
  • After subscribing, receive all subsequent published values.

Edge Cases to Consider:

  • Subscribing when no values have been published.
  • Publishing values before any subscribers exist.
  • Multiple subscribers subscribing simultaneously.
  • Publishing many values in quick succession.

Examples

Example 1: Basic Publishing and Subscribing

// Imagine this scenario in an async runtime (like tokio)

// Create a watch channel for integers
let (publisher, mut subscriber1) = WatchChannel::new();

// Publish a value
publisher.publish(10);

// Subscriber 1 receives the first value
let value1 = subscriber1.recv().await; // Should be Ok(10)

// Publish another value
publisher.publish(20);

// Subscriber 1 receives the second value
let value2 = subscriber1.recv().await; // Should be Ok(20)

// ... more operations

Explanation: The publisher sends 10, and subscriber1 (which subscribed before the publish) immediately receives it. Then, 20 is published, and subscriber1 receives it.

Example 2: Subscriber Joins Late

// Imagine this scenario in an async runtime

// Create a watch channel
let (publisher, _) = WatchChannel::new(); // We don't need the first subscriber for this example

// Publish a few values
publisher.publish(100);
publisher.publish(200);
publisher.publish(300);

// Now, create a new subscriber
let mut subscriber2 = publisher.subscribe();

// Subscriber 2 should immediately receive the LATEST value (300)
let value_latest = subscriber2.recv().await; // Should be Ok(300)

// Publish another value
publisher.publish(400);

// Subscriber 2 receives the new value
let value_next = subscriber2.recv().await; // Should be Ok(400)

Explanation: subscriber2 joins after 100, 200, and 300 have been published. It immediately receives 300 (the latest). Subsequently, when 400 is published, subscriber2 receives it.

Example 3: Multiple Subscribers

// Imagine this scenario in an async runtime

// Create a watch channel
let (publisher, mut subscriber1) = WatchChannel::new();

// Publish first value
publisher.publish(5);

// Subscribe a second subscriber
let mut subscriber2 = publisher.subscribe();

// Subscriber 1 receives the first value
let s1_val1 = subscriber1.recv().await; // Should be Ok(5)

// Subscriber 2 also receives the first value
let s2_val1 = subscriber2.recv().await; // Should be Ok(5)

// Publish another value
publisher.publish(15);

// Subscriber 1 receives the second value
let s1_val2 = subscriber1.recv().await; // Should be Ok(15)

// Subscriber 2 receives the second value
let s2_val2 = subscriber2.recv().await; // Should be Ok(15)

Explanation: Both subscribers receive the values published after their respective subscription points. If they subscribe before a publish, they get that value.

Constraints

  • The WatchChannel<T> must be generic over T.
  • T must be Cloneable to allow multiple subscribers to receive copies of the same value.
  • The implementation should leverage Rust's standard library and potentially tokio or async-std for asynchronous operations (e.g., tokio::sync::watch is a good reference, but you should implement your own).
  • Focus on correctness and clarity of the asynchronous receiving mechanism.

Notes

  • Consider using an Arc<Mutex<...>> or similar for shared mutable state within the channel to ensure thread safety.
  • For the asynchronous receiving part, tokio::sync::Notify or a custom mechanism based on channels (tokio::sync::mpsc) could be useful.
  • Think about how to store the "latest" value efficiently.
  • When a subscriber receives a value, they should be able to continue receiving subsequent values until the channel is closed or they disconnect.
  • The subscribe() method on the Publisher should return a Subscriber.
  • The recv() method on the Subscriber should be an async function that returns Result<T, RecvError> where RecvError indicates the channel has been closed. (For this challenge, you can simplify and assume the channel stays open indefinitely if you prefer, but a proper RecvError is good practice).
Loading editor...
rust