
Asynchronous Data Stream Processing

Asynchronous generators are powerful tools for handling streams of data in a non-blocking manner. They are particularly useful when dealing with I/O-bound operations, such as fetching data from external APIs, reading from large files, or processing real-time events. This challenge will guide you through creating and utilizing an async generator to process a simulated stream of data.
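Before diving into the challenge, here is a minimal sketch of the pattern in isolation (the `ticker` name and delay are illustrative, not part of the challenge):

```python
import asyncio

async def ticker(n):
    """A minimal async generator: yields n incrementing values,
    pausing asynchronously between each one."""
    for i in range(n):
        await asyncio.sleep(0.01)  # non-blocking pause; other tasks may run here
        yield i

async def main():
    async for value in ticker(3):
        print(value)  # prints 0, 1, 2 on separate lines

asyncio.run(main())
```

The key difference from a regular generator is that the body may `await` between yields, so the consumer's `async for` loop never blocks the event loop while waiting for the next item.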

Problem Description

Your task is to create an asynchronous generator function that simulates fetching data in chunks and processing it. The generator should yield processed data items one by one as they become available, allowing the consumer to iterate over the stream without blocking.

Requirements:

  1. Create an async generator function: This function should be named process_data_stream and accept an iterable of raw data items (e.g., a list of dictionaries or strings).
  2. Simulate asynchronous fetching: For each raw data item, introduce an artificial delay using asyncio.sleep() to mimic network latency or I/O operations.
  3. Process data: After the simulated fetch, process each data item. For this challenge, processing can be as simple as converting it to uppercase if it's a string, or adding a "processed" key if it's a dictionary.
  4. Yield processed items: The async generator should yield each processed data item.
  5. Handle potential errors: Although not strictly required for this basic version, consider how you might gracefully handle errors during the processing of a single item (e.g., skipping it and continuing). For this challenge, assume successful processing.
  6. Consume the async generator: Write a separate async function that consumes the process_data_stream generator, iterating over the yielded processed items and printing them.
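For requirement 5, one possible approach (not required by the challenge) is to catch per-item failures inside the generator and skip the offending item, so one bad record does not kill the whole stream. A sketch, using `AttributeError` from calling `.upper()` on a non-string as a stand-in for any processing error:

```python
import asyncio

async def process_data_stream(raw_data_items):
    for item in raw_data_items:
        await asyncio.sleep(0.05)  # simulate the async fetch
        try:
            processed = item.upper()  # fails for non-strings, standing in for any per-item error
        except AttributeError:
            continue  # skip the bad item and keep the stream going
        yield processed

async def main():
    async for processed in process_data_stream(["ok", 42, "also ok"]):
        print(processed)  # prints OK, then ALSO OK; 42 is skipped

asyncio.run(main())
```

Note that the `try`/`except` wraps only the processing step, not the `yield` itself, so exceptions thrown into the generator by the consumer are not accidentally swallowed.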

Expected Behavior: The process_data_stream function will take a collection of raw data, simulate fetching each item with a delay, process it, and yield the result. The consuming function will then receive these processed items and display them in the order they are yielded.

Edge Cases:

  • Empty input stream: The generator should handle an empty iterable gracefully, yielding nothing.
  • Varying data types: The processing logic should be robust enough to handle different (but expected) data types within the input stream.

Examples

Example 1:

import asyncio

async def process_data_stream(raw_data_items):
    for item in raw_data_items:
        await asyncio.sleep(0.1)  # Simulate async fetch
        if isinstance(item, str):
            processed_item = item.upper()
        elif isinstance(item, dict):
            processed_item = item.copy() # Avoid modifying original
            processed_item["processed"] = True
        else:
            processed_item = item # Handle other types as is
        yield processed_item

async def consume_stream(data_source):
    print("Starting to consume stream...")
    async for processed_item in process_data_stream(data_source):
        print(f"Received: {processed_item}")
    print("Finished consuming stream.")

# --- Running the example ---
async def main():
    input_data = ["hello", "world", {"id": 1, "name": "item1"}, "python"]
    await consume_stream(input_data)

if __name__ == "__main__":
    asyncio.run(main())

Output:

Starting to consume stream...
Received: HELLO
Received: WORLD
Received: {'id': 1, 'name': 'item1', 'processed': True}
Received: PYTHON
Finished consuming stream.

Explanation: The process_data_stream generator takes a list of strings and a dictionary. Each item is fetched with a 0.1-second delay. Strings are converted to uppercase, and the dictionary gets an additional "processed" key. The consume_stream function iterates through the yielded items and prints them as they are received.

Example 2: Empty Stream

import asyncio

async def process_data_stream(raw_data_items):
    for item in raw_data_items:
        await asyncio.sleep(0.1)
        if isinstance(item, str):
            processed_item = item.upper()
        else:
            processed_item = item
        yield processed_item

async def consume_stream(data_source):
    print("Starting to consume stream...")
    count = 0
    async for processed_item in process_data_stream(data_source):
        print(f"Received: {processed_item}")
        count += 1
    print(f"Finished consuming stream. Processed {count} items.")

# --- Running the example ---
async def main():
    input_data = []
    await consume_stream(input_data)

if __name__ == "__main__":
    asyncio.run(main())

Output:

Starting to consume stream...
Finished consuming stream. Processed 0 items.

Explanation: When provided with an empty list, the process_data_stream generator yields nothing, and the consuming function correctly reports that no items were processed.

Constraints

  • The input raw_data_items will be an iterable (e.g., list, tuple).
  • Items within raw_data_items will be either strings or dictionaries.
  • The simulated delay (asyncio.sleep) for each item should be between 0.05 and 0.2 seconds.
  • The consume_stream function should use async for to iterate over the async generator.
  • The total number of items in the input stream will not exceed 1000.
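If you want the delay to vary per item while staying inside the constraint's window, one simple option is `random.uniform` (a sketch; the examples above use a fixed 0.1 s, which also satisfies the constraint):

```python
import asyncio
import random

async def process_data_stream(raw_data_items):
    for item in raw_data_items:
        # Pick a per-item delay inside the 0.05-0.2 s window from the constraints
        await asyncio.sleep(random.uniform(0.05, 0.2))
        yield item.upper() if isinstance(item, str) else item

async def main():
    async for processed in process_data_stream(["a", "b"]):
        print(processed)  # prints A, then B

asyncio.run(main())
```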

Notes

  • Remember to import the asyncio module.
  • An async generator is defined using async def and yield.
  • Consuming an async generator requires an async for loop.
  • The asyncio.run() function is typically used to start the top-level async function.
  • Consider the difference between yield and return in async generators. yield pauses execution and hands a value to the consumer, while a bare return terminates the generator early. Returning a value from an async generator is a syntax error.
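The last note can be seen in a small sketch (the `take_until_stop` name and "STOP" sentinel are illustrative):

```python
import asyncio

async def take_until_stop(items):
    for item in items:
        if item == "STOP":
            return  # a bare return ends the async generator immediately
        await asyncio.sleep(0.01)
        yield item  # yield pauses here and hands the value to the consumer

async def main():
    received = []
    async for item in take_until_stop(["a", "b", "STOP", "c"]):
        received.append(item)
    print(received)  # ['a', 'b'] -- "c" is never reached

asyncio.run(main())
```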