Building a Reactive Data Processor with Async Streams in Rust
Modern applications often deal with continuous streams of data, such as sensor readings, user events, or network requests. Efficiently processing these asynchronous data flows requires tools that can handle data as it arrives without blocking the entire application. This challenge focuses on leveraging Rust's async and Stream capabilities to build a flexible and reactive data processing pipeline.
Problem Description
Your task is to implement a system that can consume an asynchronous stream of data, perform transformations on it, and then produce a new asynchronous stream of processed results. You will be given a source Stream that emits integers. You need to create a pipeline that:
- Filters the incoming integers, keeping only those that are even.
- Transforms the remaining even integers by squaring them.
- Collects a specified number of these transformed values into a Vec.
The deliverable is an async function that returns a Vec of the processed integers. This approach demonstrates how to compose asynchronous operations into a data processing pipeline.
Key Requirements:
- Utilize Rust's futures crate for the Stream and StreamExt traits.
- Implement filtering logic to select only even numbers.
- Implement transformation logic to square the filtered numbers.
- Implement a mechanism to stop processing after a certain number of results have been collected.
- The main function should be an async function that orchestrates the stream processing.
Expected Behavior:
The system should take an input Stream of integers and a count representing how many processed (squared even) numbers to collect. It should asynchronously iterate through the input stream, apply the filtering and transformation, and stop once count elements have been successfully processed and collected.
Edge Cases:
- Empty Input Stream: If the input stream is empty, the resulting Vec should also be empty.
- Not Enough Elements: If the input stream finishes before count elements are processed, the resulting Vec should contain all the elements that were successfully processed.
- Zero Count: If count is 0, the function should return an empty Vec immediately.
Examples
Example 1:
Input Stream: stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8])
Count: 3
Output: vec![4, 16, 36]
Explanation: The stream emits 1, 2, 3, 4, 5, 6, 7, 8.
- 1 is odd, filtered out.
- 2 is even, squared to 4. Collected. (Count = 1)
- 3 is odd, filtered out.
- 4 is even, squared to 16. Collected. (Count = 2)
- 5 is odd, filtered out.
- 6 is even, squared to 36. Collected. (Count = 3)
We have collected 3 elements, so processing stops.
Example 2:
Input Stream: stream::iter(vec![10, 20, 30])
Count: 5
Output: vec![100, 400, 900]
Explanation: The stream emits 10, 20, 30.
- 10 is even, squared to 100. Collected. (Count = 1)
- 20 is even, squared to 400. Collected. (Count = 2)
- 30 is even, squared to 900. Collected. (Count = 3)
The input stream has finished. We could not collect 5 elements, so we return the 3 elements collected.
Example 3:
Input Stream: stream::iter(vec![1, 3, 5, 7])
Count: 2
Output: vec![]
Explanation: All numbers in the input stream are odd and are filtered out. No even numbers are found, so the resulting vector is empty.
Constraints
- The input Stream will emit i32 values.
- The count will be a usize value.
- The output Vec should contain i32 values.
- The solution should be efficient and avoid unnecessary blocking.
Notes
- You will need to add futures and tokio (for the runtime) as dependencies in your Cargo.toml.
- Consider using StreamExt methods like filter, map, and take.
- The take(count) method can be useful for limiting the number of items from the stream.
- To collect the stream into a Vec, you can use the collect() method.
- Remember to run your async function within a Tokio runtime.