Asynchronous Stream Processing in Rust

Asynchronous streams are a powerful tool for handling data that arrives over time, such as from network connections, files, or user input. This challenge asks you to implement a simplified asynchronous stream in Rust, allowing you to process data items as they become available without blocking the main thread. This is crucial for building responsive and efficient applications.

Problem Description

You are tasked with creating a basic asynchronous stream in Rust using the async_stream crate. The stream produces a sequence of integers and is configured by start and count parameters: it generates the integers from start up to (but not including) start + count. It yields each integer after a short, configurable delay that simulates asynchronous data arrival.

Key Requirements:

  • Asynchronous Generation: The stream must be asynchronous, utilizing async and await.
  • Configurable Start and Count: The stream should accept start and count parameters to define the range of integers to generate.
  • Asynchronous Delay: Wait for a configurable delay before yielding each integer, using tokio::time::sleep.
  • Stream Termination: The stream should terminate gracefully after yielding all integers in the specified range.
  • Error Handling: The stream should not panic. If count is negative, it should return an empty stream.

Expected Behavior:

The stream should yield integers sequentially, with a delay between each integer. The total number of integers yielded should be equal to count (if count is positive).

Edge Cases to Consider:

  • count is zero: The stream should yield nothing.
  • count is negative: The stream should yield nothing.
  • start is a large number: The stream should still function correctly.
  • Concurrency: While not explicitly required, consider how your implementation might behave in a concurrent environment (though this challenge focuses on the core stream logic).
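
The zero-count and large-start cases above both come down to how the range of values is computed. As a rough sketch, assuming a hypothetical helper named value_range (not part of the challenge), the range can be built so that it is empty for a zero count and clamps at u32::MAX instead of overflowing:

```rust
use std::ops::Range;

/// Hypothetical helper (an assumption for illustration, not part of the
/// challenge): the half-open range of values the stream would yield.
fn value_range(start: u32, count: u32) -> Range<u32> {
    // `saturating_add` clamps the end at `u32::MAX` rather than overflowing,
    // which would panic in a debug build with a plain `start + count`.
    // A zero `count` gives `start..start`, which is an empty range.
    start..start.saturating_add(count)
}
```

Calling value_range(0, 3) gives the values 0, 1, 2, and value_range(10, 0) gives none. Clamping means fewer than count values are produced when start + count would exceed u32::MAX; whether to clamp, stop early, or widen to u64 is a design choice the challenge leaves open.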

Examples

Example 1:

Input: start = 0, count = 3, delay = 100ms
Output: 0, 1, 2 (each yielded after a 100ms delay)
Explanation: The stream starts at 0 and yields 3 integers (0, 1, and 2), each after a 100ms delay.

Example 2:

Input: start = 5, count = 2, delay = 500ms
Output: 5, 6 (each yielded after a 500ms delay)
Explanation: The stream starts at 5 and yields 2 integers (5 and 6), each after a 500ms delay.

Example 3:

Input: start = 10, count = -1, delay = 200ms
Output: (empty stream)
Explanation: Since count is negative, the stream yields nothing.

Constraints

  • The delay parameter should be in milliseconds (represented as u64).
  • The start parameter should be a u32.
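  • The count parameter should be a u32. Because a u32 cannot be negative, the negative-count case described above applies only if you choose a signed type such as i32 for count.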
  • The stream should be implemented using the async_stream crate.
  • The solution should compile and run without panics.
  • The solution should be reasonably efficient (avoid unnecessary allocations or computations).

Notes

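  • You'll need to add tokio and async-stream as dependencies in your Cargo.toml. Calling next() on the stream also requires a StreamExt trait, for example from the tokio-stream or futures crate.
  • Streams built with the stream! macro are not Unpin, so pin the stream (for example with tokio::pin!) before calling next() on it.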
  • The tokio::time::sleep function is used to simulate asynchronous delays.
  • Focus on the core stream logic; error handling beyond the negative count case is not required.
  • Think about how to structure your code to make it readable and maintainable. A separate function to generate the stream is a good approach.
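
The dependency note above could translate into a Cargo.toml fragment along these lines. This is a sketch under assumptions: the version numbers and feature list are plausible choices rather than requirements of the challenge, and tokio-stream is included only as one possible source of the StreamExt trait that provides next().

```toml
[dependencies]
# "macros" enables #[tokio::main], "rt-multi-thread" provides its default
# runtime, and "time" enables tokio::time::sleep.
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
async-stream = "0.3"
# Assumed choice for the Stream and StreamExt traits; the futures crate
# would work as well.
tokio-stream = "0.1"
```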
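
use async_stream::stream;
use tokio::time::{sleep, Duration};
// `Stream` is the trait being returned; `StreamExt` provides `.next()`.
// Both come from the tokio-stream crate here; the futures crate works as well.
use tokio_stream::{Stream, StreamExt};

#[tokio::main]
async fn main() {
    // Example usage (replace with your testing logic)
    let stream = generate_stream(0, 3, 100);
    // The stream is not `Unpin`, so pin it to the stack before polling it.
    tokio::pin!(stream);

    while let Some(item) = stream.next().await {
        println!("Received: {}", item);
    }
}

// A plain (non-async) function: the stream does no work until it is polled.
fn generate_stream(start: u32, count: u32, delay: u64) -> impl Stream<Item = u32> {
    // `count` is unsigned, so it can never be negative; a zero count gives an
    // empty range and therefore an empty stream. `saturating_add` avoids an
    // overflow panic when `start + count` would exceed `u32::MAX`.
    let end = start.saturating_add(count);

    stream! {
        for i in start..end {
            // Wait before each item to simulate data arriving over time.
            sleep(Duration::from_millis(delay)).await;
            yield i;
        }
    }
}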