Reactive Data Streams with Backpressure in Python

Backpressure handling is crucial when dealing with asynchronous data streams where the producer generates data faster than the consumer can process it. This challenge asks you to implement a simplified backpressure mechanism in Python using asyncio to prevent overwhelming the consumer and ensure data integrity. Successfully implementing backpressure allows your system to gracefully handle varying data rates and avoid resource exhaustion.

Problem Description

You are tasked with creating a system that simulates a data producer and a data consumer connected by a channel. The producer generates integers sequentially, and the consumer processes them, but more slowly than the producer generates them. Your goal is to implement backpressure so that the producer slows down when the queue between them is full, so that the consumer is not overwhelmed and no data is lost.

Specifically, you need to:

  1. Producer: A function that generates integers from 1 to n (inclusive) asynchronously.
  2. Consumer: A function that consumes integers from the channel asynchronously and processes them (in this case, simply printing them).
  3. Channel (Queue): An asyncio.Queue that acts as a buffer between the producer and consumer. The queue has a limited size.
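  4. Backpressure Logic: The producer must never add an item to a full queue. If the queue is full, the producer should suspend until space becomes available. Awaiting put() on a bounded asyncio.Queue already behaves this way, so an explicit fullness check is optional. This caps the number of unprocessed items at the queue size.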
  5. Error Handling: Handle potential asyncio.CancelledError exceptions gracefully, ensuring the program exits cleanly.
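
The requirements above can be sketched as follows. This is one possible shape under stated assumptions, not a reference solution: the names SENTINEL, producer, consumer, and run_pipeline, and the 0.01-second processing delay, are illustrative choices that the problem statement does not prescribe.

```python
import asyncio

SENTINEL = None  # assumed end-of-stream marker; not prescribed by the problem


async def producer(queue: asyncio.Queue, n: int) -> None:
    """Put the integers 1..n on the queue, then the sentinel."""
    for i in range(1, n + 1):
        # put() suspends this coroutine while the queue is full.
        # That suspension is the backpressure.
        await queue.put(i)
    await queue.put(SENTINEL)


async def consumer(queue: asyncio.Queue, delay: float = 0.01) -> list:
    """Print items until the sentinel arrives; return what was processed."""
    processed = []
    while True:
        item = await queue.get()
        if item is SENTINEL:
            break
        await asyncio.sleep(delay)  # simulate slow processing
        print(item)
        processed.append(item)
    return processed


async def run_pipeline(n: int, queue_size: int) -> list:
    """Run producer and consumer concurrently over a bounded queue."""
    queue = asyncio.Queue(maxsize=queue_size)
    _, processed = await asyncio.gather(producer(queue, n), consumer(queue))
    return processed


if __name__ == "__main__":
    asyncio.run(run_pipeline(n=5, queue_size=2))
```

Returning the processed items from the consumer is only a convenience for checking the result; the challenge itself asks only for the printed output.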

Expected Behavior:

  • The producer should generate integers sequentially.
  • The consumer should process integers as they become available from the queue.
  • When the queue is full, the producer should pause until the consumer consumes items, preventing the queue from overflowing.
  • The program should terminate gracefully once the producer has generated all integers and the consumer has processed every one of them.

Examples

Example 1:

Input: n = 5, queue_size = 2

The producer generates [1, 2, 3, 4, 5] and the consumer processes them in order. Because the queue size is 2, the producer pauses whenever two items are waiting in the queue and resumes as soon as the consumer removes one, so the queue never holds more than two unprocessed items. The output will be:

1
2
3
4
5
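
To see the pauses rather than only the final output, the run can be instrumented. The sketch below is an illustration, not part of the required solution: traced_run, depths, and consumed are hypothetical names, and the 0.01-second delay is an assumed processing time. It records the queue depth right after each put so you can confirm that the depth never exceeds queue_size.

```python
import asyncio


async def traced_run(n: int, queue_size: int, delay: float = 0.01):
    """Run producer and consumer, recording queue depth after each put."""
    queue = asyncio.Queue(maxsize=queue_size)
    depths = []    # queue depth observed right after each put
    consumed = []  # items in the order the consumer handled them

    async def producer():
        for i in range(1, n + 1):
            await queue.put(i)          # suspends while the queue is full
            depths.append(queue.qsize())
        await queue.put(None)           # sentinel: no more items

    async def consumer():
        while (item := await queue.get()) is not None:
            await asyncio.sleep(delay)  # simulated slow processing
            consumed.append(item)

    await asyncio.gather(producer(), consumer())
    return depths, consumed


if __name__ == "__main__":
    depths, consumed = asyncio.run(traced_run(n=5, queue_size=2))
    print("max depth:", max(depths))
    print("consumed:", consumed)
```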

Example 2:

Input: n = 10, queue_size = 3

The producer generates [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]. The consumer processes them. The producer will pause multiple times as the queue fills up. The output will be:

1
2
3
4
5
6
7
8
9
10

Example 3: (Edge Case - Small Queue)

Input: n = 100, queue_size = 1

The producer generates integers from 1 to 100. The consumer processes them one at a time. The producer will frequently pause, demonstrating the backpressure mechanism effectively. The output will be a stream of numbers from 1 to 100, printed one per line.

Constraints

  • n (the number of integers to generate) will be between 1 and 1000 (inclusive).
  • queue_size (the maximum size of the asyncio.Queue) will be between 1 and 10 (inclusive).
  • The solution must use asyncio for asynchronous operations.
  • The solution must handle asyncio.CancelledError gracefully.
  • The solution should be reasonably efficient; avoid unnecessary delays or resource consumption.
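
One way to satisfy the asyncio.CancelledError constraint is to let the consumer loop run indefinitely and cancel it once the queue has drained. The sketch below assumes that design, which swaps the sentinel approach for Queue.join() and task_done(); the function names and the 0.01-second delay are hypothetical. The consumer catches the cancellation, does its clean-up, and re-raises so the task is still marked as cancelled.

```python
import asyncio


async def consumer(queue: asyncio.Queue, processed: list) -> None:
    """Process items forever; exit cleanly when the task is cancelled."""
    try:
        while True:
            item = await queue.get()
            await asyncio.sleep(0.01)  # simulated slow processing
            processed.append(item)
            queue.task_done()          # lets queue.join() track completion
    except asyncio.CancelledError:
        # Clean-up would go here (closing files, flushing buffers, ...).
        print("consumer cancelled, exiting cleanly")
        raise  # re-raise so the task is recorded as cancelled


async def run_with_cancellation(n: int, queue_size: int) -> list:
    queue = asyncio.Queue(maxsize=queue_size)
    processed = []
    task = asyncio.create_task(consumer(queue, processed))

    for i in range(1, n + 1):
        await queue.put(i)  # suspends while the queue is full

    await queue.join()      # wait until every item has been processed
    task.cancel()           # the consumer is now idle in queue.get()
    # return_exceptions=True collects the CancelledError instead of raising it.
    await asyncio.gather(task, return_exceptions=True)
    return processed
```

Calling asyncio.run(run_with_cancellation(5, 2)) processes all five items and then shuts the consumer down through the cancellation path.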

Notes

  • Consider using asyncio.sleep() to simulate the consumer's processing time. This will make the backpressure effect more apparent.
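  • The asyncio.Queue.full() method reports whether the queue is full. An explicit check is optional, because awaiting asyncio.Queue.put() already suspends the producer until a slot is free.
  • The asyncio.Queue.get() method is a coroutine: awaiting it suspends the consumer until an item is available, without blocking the event loop.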
  • Think about how to signal the consumer to stop when the producer is finished. Using a sentinel value (e.g., None) in the queue can be a good approach.
  • Focus on the core backpressure logic first. Beyond the required asyncio.CancelledError handling, further error handling and output formatting are secondary concerns.
  • The producer and consumer should run concurrently.
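
The queue behaviour that the notes rely on can be checked in isolation. The following sketch is illustrative only (demo and the result keys are hypothetical names). It shows that put_nowait() raises asyncio.QueueFull on a full queue, whereas an awaited put() stays suspended until get() frees a slot.

```python
import asyncio


async def demo() -> dict:
    queue = asyncio.Queue(maxsize=1)
    await queue.put(1)  # the queue is now full
    result = {"full": queue.full(), "put_nowait_raised": False}

    try:
        queue.put_nowait(2)  # the non-waiting put raises on a full queue
    except asyncio.QueueFull:
        result["put_nowait_raised"] = True

    # A waiting put() suspends until get() frees a slot.
    put_task = asyncio.create_task(queue.put(2))
    await asyncio.sleep(0)  # let put_task start and wait on the full queue
    result["put_done_before_get"] = put_task.done()

    result["first_item"] = await queue.get()  # frees a slot
    await put_task                            # the pending put can now finish
    result["put_done_after_get"] = put_task.done()
    result["second_item"] = queue.get_nowait()
    return result


if __name__ == "__main__":
    print(asyncio.run(demo()))
```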