Asynchronous Iterator with Backpressure in JavaScript
This challenge focuses on building an asynchronous iterator that incorporates backpressure. Backpressure keeps a fast producer from overwhelming a slower consumer in asynchronous operations, so that buffered data, and with it memory use, stays bounded. Your task is to implement an async iterator that respects the consumer's pull rate.
Problem Description
You are required to implement an AsyncBackpressureIterator class in JavaScript. This class should take an asynchronous data source (another async iterable) as input and provide an async iterator that limits the rate at which data is emitted to the consumer based on the consumer's pull() requests. The iterator should maintain a queue of data received from the source but not yet delivered to the consumer, ensuring that the consumer is not overwhelmed.
Key Requirements:
- Asynchronous Source: The iterator must accept an async iterable as its data source.
- Backpressure: The iterator should only emit data to the consumer when the consumer explicitly requests it via a `pull()` method.
- Queueing: Maintain a queue to buffer data received from the source that hasn't been consumed yet.
- Error Handling: Propagate errors from the source async iterable to the consumer.
- Completion: Signal completion to the consumer when the source async iterable is exhausted.
Expected Behavior:
- The `AsyncBackpressureIterator` should be constructed with an async iterable.
- The iterator should internally consume data from the source async iterable and enqueue it.
- The consumer should call the `pull()` method on the iterator to request data.
- The `pull()` method should return a promise that resolves with the next item in the queue (if available) or rejects with an error (if the source rejected).
- If the queue is empty and the source is not yet exhausted, the `pull()` method should suspend until data becomes available.
- When the source async iterable completes, the iterator should signal completion to the consumer by returning `undefined` from `pull()`.
- If the source async iterable rejects, the iterator should reject the `pull()` promise.
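One way to read the behavior listed above is the following minimal sketch. It is not a reference solution: the `maxQueueSize` parameter, the internal `pump()` loop, and the `waiters` list are assumptions introduced here for illustration, and the sketch assumes `maxQueueSize` is at least 1.

```javascript
// Hypothetical sketch: field names, pump(), and maxQueueSize are illustrative assumptions.
class AsyncBackpressureIterator {
  constructor(source, maxQueueSize = 100) {
    this.iterator = source[Symbol.asyncIterator]();
    this.maxQueueSize = maxQueueSize; // assumed to be >= 1
    this.queue = [];      // items read from the source but not yet delivered
    this.waiters = [];    // pending pull() calls waiting for data
    this.done = false;    // true once the source is exhausted
    this.failed = false;  // true once the source has thrown or rejected
    this.error = undefined;
    this.resume = null;   // wakes the pump once the queue has room again
    this.pump();
  }

  // Reads from the source until it ends, pausing while the queue is full.
  async pump() {
    try {
      while (true) {
        if (this.queue.length >= this.maxQueueSize) {
          await new Promise((resolve) => { this.resume = resolve; });
        }
        const result = await this.iterator.next();
        if (result.done) break;
        const waiter = this.waiters.shift();
        if (waiter) waiter.resolve(result.value); // a pull() is already waiting
        else this.queue.push(result.value);       // buffer for a later pull()
      }
      this.done = true;
      for (const waiter of this.waiters.splice(0)) waiter.resolve(undefined);
    } catch (err) {
      this.failed = true;
      this.error = err;
      for (const waiter of this.waiters.splice(0)) waiter.reject(err);
    }
  }

  // Resolves with the next item, resolves with undefined on completion,
  // or rejects once the source has failed and the queue is drained.
  pull() {
    if (this.queue.length > 0) {
      const value = this.queue.shift();
      if (this.resume) {
        const resume = this.resume;
        this.resume = null;
        resume(); // the queue has room again, let the pump continue
      }
      return Promise.resolve(value);
    }
    if (this.failed) return Promise.reject(this.error);
    if (this.done) return Promise.resolve(undefined);
    return new Promise((resolve, reject) => {
      this.waiters.push({ resolve, reject });
    });
  }
}
```

In this sketch the pump stops calling `next()` on the source while the queue is full, so the source is never read more than `maxQueueSize` items ahead of the consumer. Items already buffered are delivered before a source error is reported.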
Edge Cases to Consider:
- Empty source async iterable.
- Source async iterable that emits data very quickly.
- Source async iterable that rejects.
- Consumer calling `pull()` multiple times in a row.
- Consumer calling `pull()` and then cancelling the request (not required, but good to consider).
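Several of the edge cases above can be reproduced with small async generator sources. The following is a sketch only: the names `emptySource`, `fastSource`, `failingSource`, `slowSource`, and `sleep` are hypothetical helpers, not part of the challenge. A slow source is included because it makes back-to-back `pull()` calls wait for data rather than find it already buffered.

```javascript
// Hypothetical test sources; the names are illustrative, not part of the challenge.

// Completes without yielding anything.
async function* emptySource() {}

// Yields `count` integers with no delay between them.
async function* fastSource(count) {
  for (let i = 0; i < count; i++) yield i;
}

// Yields one value, then fails.
async function* failingSource() {
  yield 1;
  throw new Error("Source Error");
}

// Pauses without blocking the event loop.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Yields `count` integers, waiting `delayMs` before each one.
async function* slowSource(count, delayMs) {
  for (let i = 0; i < count; i++) {
    await sleep(delayMs);
    yield i;
  }
}
```

Each function must be called to obtain an async iterable, for example `fastSource(1000)`.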
Examples
Example 1:
Input: asyncIterable = (async function*() { yield 1; yield 2; yield 3; })()
Output: pull() -> 1, pull() -> 2, pull() -> 3, pull() -> undefined
Explanation: The iterator consumes the three values from the source and delivers them to the consumer one at a time via `pull()`. The final `pull()` returns `undefined` indicating completion.
Example 2:
Input: asyncIterable = (async function*() { yield 1; throw new Error("Source Error"); })()
Output: pull() -> 1, pull() -> Error: "Source Error"
Explanation: The iterator consumes the first value, then encounters an error from the source. The subsequent `pull()` call rejects with the same error.
Example 3: (Empty Source)
Input: asyncIterable = (async function*() {})()
Output: pull() -> undefined
Explanation: The iterator immediately signals completion because the source is empty.
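The `pull() -> value` notation in the examples can be read as a consumer loop. Below is a minimal sketch, assuming only that the object passed in exposes a `pull()` method with the behavior described above; `drain` is a hypothetical helper name.

```javascript
// Hypothetical helper: calls pull() until it resolves with undefined,
// collecting every delivered item. A rejection from pull() propagates
// to the caller of drain().
async function drain(puller) {
  const items = [];
  while (true) {
    const item = await puller.pull();
    if (item === undefined) return items; // completion signal
    items.push(item);
  }
}
```

Because `undefined` is the completion signal, a source that legitimately yields `undefined` cannot be told apart from the end of the stream under this contract.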
Constraints
- The source async iterable can emit any type of data.
- The `pull()` method should resolve or reject within a reasonable timeframe (e.g., 5 seconds). If it doesn't, consider adding a timeout mechanism (not strictly required for the core functionality, but good practice).
- The queue size should be limited to prevent unbounded memory usage. A reasonable limit is 100.
- The implementation should be efficient and avoid unnecessary memory allocations.
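The optional timeout mentioned above could be layered on top of `pull()` rather than built into it. The following is a sketch under that assumption: `pullWithTimeout` is a hypothetical helper, and it relies only on the standard `Promise.race`, `setTimeout`, and `clearTimeout`.

```javascript
// Hypothetical wrapper: rejects if pull() has not settled within timeoutMs.
function pullWithTimeout(puller, timeoutMs = 5000) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`pull() timed out after ${timeoutMs} ms`)),
      timeoutMs
    );
  });
  // Whichever settles first wins; the timer is cleared in either case.
  return Promise.race([puller.pull(), timeout]).finally(() => clearTimeout(timer));
}
```

Note that a timed-out `pull()` is not cancelled by this wrapper. The underlying request stays pending and may still receive an item later, so a real implementation would need to decide what happens to that item.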
Notes
- Consider using `async`/`await` for cleaner asynchronous code.
- Think about how to handle concurrency and ensure that the queue is accessed safely.
- The `pull()` method should be the primary interface for the consumer to interact with the iterator.
- You don't need to implement the source async iterable; you only need to create the iterator that consumes it and applies backpressure.
- Focus on the core backpressure logic and error handling. Advanced features like cancellation are not required.