# Asynchronous Data Stream Processing
Asynchronous generators are powerful tools for handling streams of data in a non-blocking manner. They are particularly useful when dealing with I/O-bound operations, such as fetching data from external APIs, reading from large files, or processing real-time events. This challenge will guide you through creating and utilizing an async generator to process a simulated stream of data.
## Problem Description
Your task is to create an asynchronous generator function that simulates fetching data in chunks and processing it. The generator should yield processed data items one by one as they become available, allowing the consumer to iterate over the stream without blocking.
### Requirements
- Create an async generator function: This function should be named `process_data_stream` and accept an iterable of raw data items (e.g., a list of dictionaries or strings).
- Simulate asynchronous fetching: For each raw data item, introduce an artificial delay using `asyncio.sleep()` to mimic network latency or I/O operations.
- Process data: After the simulated fetch, process each data item. For this challenge, processing can be as simple as converting it to uppercase if it's a string, or adding a "processed" key if it's a dictionary.
- Yield processed items: The async generator should `yield` each processed data item.
- Handle potential errors: Although not strictly required for this basic version, consider how you might gracefully handle errors during the processing of a single item (e.g., skipping it and continuing). For this challenge, assume successful processing.
- Consume the async generator: Write a separate async function that consumes the `process_data_stream` generator, iterating over the yielded processed items and printing them.
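The error-handling point above can be sketched as follows. This is an optional variant, not the required solution; the name `process_data_stream_safe` and the choice of catching `AttributeError` are illustrative assumptions:

```python
import asyncio

async def process_data_stream_safe(raw_data_items):
    """Variant that skips items that fail to process, keeping the stream alive."""
    for item in raw_data_items:
        await asyncio.sleep(0.05)  # Simulate async fetch
        try:
            # Deliberately strict processing: anything without .upper() raises.
            processed_item = item.upper()
        except AttributeError:
            # Skip the bad item and continue with the rest of the stream.
            continue
        yield processed_item

async def demo():
    # 42 has no .upper(), so it is skipped rather than crashing the stream.
    results = [x async for x in process_data_stream_safe(["hello", 42, "world"])]
    print(results)  # ['HELLO', 'WORLD']

asyncio.run(demo())
```

The key design choice is `continue` inside the loop: the generator swallows the failure for one item and moves on, so the consumer never sees the exception.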
### Expected Behavior
The `process_data_stream` function will take a collection of raw data, simulate fetching each item with a delay, process it, and yield the result. The consuming function will then receive these processed items and display them in the order they are yielded.
### Edge Cases
- Empty input stream: The generator should handle an empty iterable gracefully, yielding nothing.
- Varying data types: The processing logic should be robust enough to handle different (but expected) data types within the input stream.
## Examples
### Example 1
```python
import asyncio

async def process_data_stream(raw_data_items):
    for item in raw_data_items:
        await asyncio.sleep(0.1)  # Simulate async fetch
        if isinstance(item, str):
            processed_item = item.upper()
        elif isinstance(item, dict):
            processed_item = item.copy()  # Avoid modifying original
            processed_item["processed"] = True
        else:
            processed_item = item  # Handle other types as is
        yield processed_item

async def consume_stream(data_source):
    print("Starting to consume stream...")
    async for processed_item in process_data_stream(data_source):
        print(f"Received: {processed_item}")
    print("Finished consuming stream.")

# --- Running the example ---
async def main():
    input_data = ["hello", "world", {"id": 1, "name": "item1"}, "python"]
    await consume_stream(input_data)

if __name__ == "__main__":
    asyncio.run(main())
```
Output:
```
Starting to consume stream...
Received: HELLO
Received: WORLD
Received: {'id': 1, 'name': 'item1', 'processed': True}
Received: PYTHON
Finished consuming stream.
```
Explanation: The `process_data_stream` generator takes a list containing strings and a dictionary. Each item is fetched with a 0.1-second delay. Strings are converted to uppercase, and the dictionary gets an additional "processed" key. The `consume_stream` function iterates through the yielded items and prints them as they are received.
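As a sanity check (not part of the challenge), you can time consumption to confirm that items arrive sequentially, roughly 0.1 seconds apart, since each `await asyncio.sleep(0.1)` completes before the corresponding `yield`. The name `timed_consume` is illustrative:

```python
import asyncio
import time

async def process_data_stream(raw_data_items):
    for item in raw_data_items:
        await asyncio.sleep(0.1)  # Simulated fetch delay
        yield item.upper() if isinstance(item, str) else item

async def timed_consume():
    start = time.monotonic()
    async for item in process_data_stream(["a", "b", "c"]):
        # Elapsed time grows by ~0.1 s per item.
        print(f"{time.monotonic() - start:.2f}s: {item}")

asyncio.run(timed_consume())
```

This also illustrates that an async generator does not run ahead of its consumer: each delay is awaited only when the consumer requests the next item.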
### Example 2: Empty Stream
```python
import asyncio

async def process_data_stream(raw_data_items):
    for item in raw_data_items:
        await asyncio.sleep(0.1)
        if isinstance(item, str):
            processed_item = item.upper()
        else:
            processed_item = item
        yield processed_item

async def consume_stream(data_source):
    print("Starting to consume stream...")
    count = 0
    async for processed_item in process_data_stream(data_source):
        print(f"Received: {processed_item}")
        count += 1
    print(f"Finished consuming stream. Processed {count} items.")

# --- Running the example ---
async def main():
    input_data = []
    await consume_stream(input_data)

if __name__ == "__main__":
    asyncio.run(main())
```
Output:
```
Starting to consume stream...
Finished consuming stream. Processed 0 items.
```
Explanation: When provided with an empty list, the `process_data_stream` generator yields nothing, and the consuming function correctly reports that no items were processed.
## Constraints
- The input `raw_data_items` will be an iterable (e.g., list, tuple).
- Items within `raw_data_items` will be either strings or dictionaries.
- The simulated delay (`asyncio.sleep`) for each item should be between 0.05 and 0.2 seconds.
- The `consume_stream` function should use `async for` to iterate over the async generator.
- The total number of items in the input stream will not exceed 1000.
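One way to satisfy the 0.05–0.2 second delay constraint without hard-coding a single value is to draw a per-item delay at random. This is an illustrative choice, not a requirement; a fixed delay such as 0.1 seconds is equally valid:

```python
import asyncio
import random

async def process_data_stream(raw_data_items):
    for item in raw_data_items:
        # Per-item delay drawn uniformly from the allowed 0.05-0.2 s range.
        delay = random.uniform(0.05, 0.2)
        await asyncio.sleep(delay)
        yield item.upper() if isinstance(item, str) else item

async def demo():
    print([x async for x in process_data_stream(["a", "b"])])  # ['A', 'B']

asyncio.run(demo())
```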
## Notes
- Remember to import the `asyncio` module.
- An async generator is defined using `async def` and `yield`.
- Consuming an async generator requires an `async for` loop.
- The `asyncio.run()` function is typically used to start the top-level async function.
- Consider the difference between `yield` and `return` in async generators. `yield` pauses execution and returns a value, while `return` would terminate the generator.
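The `yield`/`return` distinction can be seen directly in a small sketch. Note that only a bare `return` is allowed in an async generator (returning a value is a syntax error); it simply ends iteration, which the consumer observes as the loop finishing. The name `first_n` is illustrative:

```python
import asyncio

async def first_n(raw_data_items, n):
    """Yields at most n items; a bare `return` ends the stream early."""
    count = 0
    for item in raw_data_items:
        if count >= n:
            return  # Terminates the generator; `async for` stops here
        await asyncio.sleep(0.05)
        yield item  # Pauses here until the consumer asks for the next item
        count += 1

async def demo():
    print([x async for x in first_n(["a", "b", "c", "d"], 2)])  # ['a', 'b']

asyncio.run(demo())
```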