Asynchronous Data Pipelines with Rust Streams
Asynchronous streams provide a mechanism for processing sequences of data without blocking, which is particularly useful for I/O-bound operations such as network requests or file processing. This challenge asks you to implement a simple asynchronous data pipeline using the stream API of Rust's futures crate (the futures::stream module), demonstrating the core concepts of stream processing. Completing it will give you a solid foundation for building more complex asynchronous data processing systems.
Problem Description
You are tasked with creating an asynchronous data pipeline that reads integers from a source stream, squares each one, filters out the even squares, and emits the remaining odd squares as a new stream. The pipeline should be implemented using Rust's stream API and futures for asynchronous operations.
What needs to be achieved:
- Create a function process_stream that takes an asynchronous stream of integers (impl Stream<Item = i32>) as input.
- Within process_stream, use stream combinators (map, filter) to:
  - Square each integer in the input stream.
  - Filter out even numbers (keeping only odd numbers).
- Return a new asynchronous stream of integers (impl Stream<Item = i32>) containing the odd squares.
Key Requirements:
- The solution must use Rust's stream API (the futures::stream module of the futures crate).
- The solution must use futures for asynchronous operations.
- The code should be clear, concise, and well-documented.
- The solution must handle the asynchronous nature of streams correctly, ensuring non-blocking operation.
Expected Behavior:
The process_stream function should take an input stream, process each element asynchronously, and produce a new stream containing only the odd squares of the original integers. The output stream should be lazily evaluated; elements should only be processed as they are requested by a downstream consumer.
Edge Cases to Consider:
- Empty Input Stream: The function should return an empty stream if the input stream is empty.
- Negative Numbers: The squaring operation should work correctly with negative numbers.
- Large Numbers: Consider potential integer overflow when squaring large numbers. While overflow handling isn't explicitly required, be mindful of it.
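For the large-number case, one optional approach is to square with i32::checked_mul and treat an overflowing square as absent. The sketch below uses only the standard library and plain iterators to keep the arithmetic in focus; the helper names are hypothetical, and dropping overflowing values is an assumption made here, since the challenge does not prescribe any overflow policy. The same helper could be wired into a stream through a combinator such as filter_map.

```rust
/// Squares `n`, returning `None` when the result does not fit in an `i32`.
fn checked_square(n: i32) -> Option<i32> {
    n.checked_mul(n)
}

/// Iterator-based illustration: keeps odd squares and silently drops overflowing ones.
/// (Dropping is an assumed policy; a real pipeline might report an error instead.)
fn odd_squares_checked(values: &[i32]) -> Vec<i32> {
    values
        .iter()
        .filter_map(|&n| checked_square(n))
        .filter(|sq| sq % 2 != 0)
        .collect()
}

fn main() {
    // 46_340 is the largest i32 whose square still fits: 46_340^2 = 2_147_395_600.
    println!("{:?}", checked_square(46_340)); // prints Some(2147395600)
    println!("{:?}", checked_square(46_341)); // prints None
    println!("{:?}", odd_squares_checked(&[3, 46_341, -5])); // prints [9, 25]
}
```

With a plain n * n, an overflowing square panics in debug builds and wraps in release builds by default, which is why the edge case is worth keeping in mind.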
Examples
Example 1:
Input: A stream emitting the following integers: [1, 2, 3, 4, 5]
Output: A stream emitting the following integers: [1, 9, 25]
Explanation: 1 squared is 1 (odd), 2 squared is 4 (even, filtered out), 3 squared is 9 (odd), 4 squared is 16 (even, filtered out), 5 squared is 25 (odd).
Example 2:
Input: A stream emitting the following integers: [-1, -2, 0, 2, 3]
Output: A stream emitting the following integers: [1, 9]
Explanation: -1 squared is 1 (odd), -2 squared is 4 (even, filtered out), 0 squared is 0 (even, filtered out), 2 squared is 4 (even, filtered out), 3 squared is 9 (odd).
Example 3: (Edge Case - Empty Stream)
Input: An empty stream.
Output: An empty stream.
Explanation: No input elements, so no output elements.
Constraints
- Crate Dependencies: You must use the futures crate, including its futures::stream module. No other external crates are allowed.
- Integer Type: The input and output streams must use i32.
- Performance: While not a primary focus, the solution should be reasonably efficient. Avoid unnecessary allocations or computations.
- Error Handling: No error handling is required for this challenge. Assume the input stream always provides valid integers.
Notes
- The StreamExt trait in the futures crate provides combinators like map and filter that are essential for this task. Familiarize yourself with these combinators.
- Remember that streams are lazily evaluated. The squaring and filtering operations will only be performed when a consumer requests elements from the output stream.
- Consider how to create a simple stream for testing purposes. You can use futures::stream::iter to create a stream from a vector.
- Think about the asynchronous nature of the operations. While this challenge doesn't require explicit async blocks, the stream API is designed for asynchronous contexts. Note that StreamExt::filter expects its predicate to return a future (for example via futures::future::ready), even when the check itself is synchronous.