
Building a Reactive Data Processor with Async Streams in Rust

Modern applications often deal with continuous streams of data, such as sensor readings, user events, or network requests. Processing these asynchronous data flows efficiently requires tools that handle each item as it arrives without blocking the rest of the application. This challenge uses Rust's async/await support and the Stream trait to build a flexible, reactive data processing pipeline.

Problem Description

Your task is to implement a system that can consume an asynchronous stream of data, perform transformations on it, and then produce a new asynchronous stream of processed results. You will be given a source Stream that emits integers. You need to create a pipeline that:

  1. Filters the incoming integers, keeping only those that are even.
  2. Transforms the remaining even integers by squaring them.
  3. Collects up to a specified number of these transformed values into a Vec.

The result should be exposed as an async function that returns a Vec of the processed integers. The exercise demonstrates how asynchronous operations compose into a data processing pipeline.

Key Requirements:

  • Use the futures crate for the Stream and StreamExt traits.
  • Implement filtering logic to select only even numbers.
  • Implement transformation logic to square the filtered numbers.
  • Implement a mechanism to stop processing after a certain number of results have been collected.
  • The function that orchestrates the stream processing should be an async function.

Expected Behavior:

The system should take an input Stream of integers and a count representing how many processed (squared even) numbers to collect. It should asynchronously iterate through the input stream, apply the filtering and transformation, and stop once count elements have been successfully processed and collected.

Edge Cases:

  • Empty Input Stream: If the input stream is empty, the resulting Vec should also be empty.
  • Not Enough Elements: If the input stream finishes before count elements are processed, the resulting Vec should contain all the elements that were successfully processed.
  • Zero Count: If count is 0, the function should return an empty Vec immediately.

Examples

Example 1:

Input Stream: stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8])

Count: 3

Output: vec![4, 16, 36]

Explanation: The stream emits 1, 2, 3, 4, 5, 6, 7, 8.

  • 1 is odd, filtered out.
  • 2 is even, squared to 4. Collected. (Count = 1)
  • 3 is odd, filtered out.
  • 4 is even, squared to 16. Collected. (Count = 2)
  • 5 is odd, filtered out.
  • 6 is even, squared to 36. Collected. (Count = 3)

We have collected 3 elements, so processing stops.
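
The trace above can be reproduced with a small synchronous sketch that uses only the standard library. It mirrors the per-item decisions rather than the async machinery; the function name `trace` and the message wording are invented for illustration.

```rust
/// Walks the input in order and records the decision made for each value,
/// stopping as soon as `count` results have been collected.
pub fn trace(input: &[i32], count: usize) -> (Vec<String>, Vec<i32>) {
    let mut log = Vec::new();
    let mut collected = Vec::new();
    for &n in input {
        // Check the limit first so that a count of 0 inspects nothing.
        if collected.len() == count {
            break;
        }
        if n % 2 != 0 {
            log.push(format!("{n} is odd, filtered out."));
            continue;
        }
        let squared = n * n;
        collected.push(squared);
        log.push(format!(
            "{n} is even, squared to {squared}. Collected. (Count = {})",
            collected.len()
        ));
    }
    (log, collected)
}
```

Calling `trace(&[1, 2, 3, 4, 5, 6, 7, 8], 3)` yields six log entries and never inspects 7 or 8, matching the point at which processing stops in the example.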

Example 2:

Input Stream: stream::iter(vec![10, 20, 30])

Count: 5

Output: vec![100, 400, 900]

Explanation: The stream emits 10, 20, 30.

  • 10 is even, squared to 100. Collected. (Count = 1)
  • 20 is even, squared to 400. Collected. (Count = 2)
  • 30 is even, squared to 900. Collected. (Count = 3)

The input stream has finished. We could not collect 5 elements, so we return the 3 elements collected.

Example 3:

Input Stream: stream::iter(vec![1, 3, 5, 7])

Count: 2

Output: vec![]

Explanation: All numbers in the input stream are odd and are filtered out. No even numbers are found, so the resulting vector is empty.

Constraints

  • The input Stream will emit i32 values.
  • The count will be a usize value.
  • The output Vec should contain i32 values.
  • The solution should be efficient and avoid unnecessary blocking.

Notes

  • You will need to add futures and tokio (for the runtime) as dependencies in your Cargo.toml.
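  • Consider using StreamExt methods like filter, map, and take. Unlike Iterator::filter, the predicate passed to StreamExt::filter must return a future that resolves to a bool.
  • The take(count) method limits the number of items a stream yields. Apply it after filter and map so that it counts processed results rather than raw inputs.
  • To collect the stream into a Vec, you can use the collect() method. It returns a future, so the result must be awaited.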
  • Remember to run your async function within a Tokio runtime.