Asynchronous Task Execution with concurrent.futures
This challenge focuses on leveraging Python's concurrent.futures module to execute tasks concurrently. You will learn how to submit functions for execution and retrieve their results efficiently, which is crucial for improving the performance of I/O-bound or CPU-bound operations in your Python applications.
Problem Description
Your task is to create a Python script that simulates a series of independent computational tasks and executes them concurrently using concurrent.futures. You need to manage the submission of these tasks and collect their results in a way that demonstrates effective concurrency.
Key Requirements:
- Task Simulation: Define a function that simulates a time-consuming task (e.g., a CPU-bound calculation or an I/O operation like fetching data from a URL). This function should accept an argument and return a result.
- Concurrent Execution: Use either
ThreadPoolExecutororProcessPoolExecutorfromconcurrent.futuresto run multiple instances of your simulated task concurrently. - Result Collection: Collect the results from all completed tasks. The order of results might not necessarily match the order of submission.
- Error Handling: Gracefully handle potential exceptions that might occur during task execution.
Expected Behavior:
The script should launch several instances of the simulated task. As each task completes, its result should be made available. The script should then print all the collected results. If a task fails, an indication of the failure should be displayed instead of a result.
Edge Cases to Consider:
- Tasks that raise exceptions.
- A large number of tasks to be executed.
Examples
Example 1:
Input:
A list of numbers [1, 2, 3, 4, 5] and a task that squares each number after a short delay.
import concurrent.futures
import time
def square_number(number):
"""Simulates a task that squares a number after a delay."""
time.sleep(1) # Simulate work
return number * number
numbers = [1, 2, 3, 4, 5]
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Submit tasks
future_to_num = {executor.submit(square_number, num): num for num in numbers}
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_num):
num = future_to_num[future]
try:
result = future.result()
results.append(result)
print(f"Task for {num} completed with result: {result}")
except Exception as exc:
print(f"Task for {num} generated an exception: {exc}")
print(f"All results: {results}")
Output:
(The order of completion messages may vary due to concurrency. The final results list will contain all squared numbers.)
Task for 1 completed with result: 1
Task for 2 completed with result: 4
Task for 3 completed with result: 9
Task for 4 completed with result: 16
Task for 5 completed with result: 25
All results: [1, 4, 9, 16, 25]
Explanation:
Five square_number tasks are submitted. ThreadPoolExecutor with max_workers=3 ensures that no more than three tasks run at any given moment. as_completed iterates over futures as they finish. Each result is appended to the results list, and a message is printed.
Example 2:
Input:
A list of numbers [10, 20, 30, 40] and a task that divides by zero for a specific input.
import concurrent.futures
import time
def divide_by_number(number):
"""Simulates a task that might raise an exception."""
time.sleep(0.5)
if number == 20:
raise ValueError("Cannot divide by zero for this number!")
return 100 / number
numbers = [10, 20, 30, 40]
results = []
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
future_to_num = {executor.submit(divide_by_number, num): num for num in numbers}
for future in concurrent.futures.as_completed(future_to_num):
num = future_to_num[future]
try:
result = future.result()
results.append(result)
print(f"Task for {num} completed with result: {result}")
except Exception as exc:
print(f"Task for {num} generated an exception: {exc}")
print(f"All collected values (results or error indicators): {results}")
Output: (The order of completion messages may vary.)
Task for 10 completed with result: 10.0
Task for 30 completed with result: 3.3333333333333335
Task for 40 completed with result: 2.5
Task for 20 generated an exception: Cannot divide by zero for this number!
All collected values (results or error indicators): [10.0, 3.3333333333333335, 2.5]
Explanation:
Tasks are submitted to a ProcessPoolExecutor. The task for 20 raises a ValueError. The try-except block around future.result() catches this exception, and an informative message is printed. The results list will only contain successful computations.
Constraints
- The simulated task function should have a delay of at least
0.5seconds to clearly demonstrate concurrency. - The input will be a list of integers.
- You should handle at least 10 tasks concurrently.
- The maximum number of worker threads/processes can be set to
5. - Your script should successfully execute and print results or exceptions for all submitted tasks.
Notes
- Consider when to use
ThreadPoolExecutor(for I/O-bound tasks) versusProcessPoolExecutor(for CPU-bound tasks). For this challenge, either is acceptable. - The
concurrent.futures.as_completed()function is highly recommended for retrieving results as they become available, rather than waiting for all tasks to finish. - Be mindful of how exceptions are propagated and handled when calling
future.result().