Python Background Task Scheduler
This challenge focuses on implementing background tasks in Python, a crucial skill for improving application responsiveness and handling long-running operations without blocking the main program flow. You will create a system that can schedule and execute tasks asynchronously.
Problem Description
Your task is to build a simple background task scheduler. The scheduler should be able to:
- Register tasks: Allow users to register functions to be executed in the background.
- Schedule tasks: Provide a mechanism to schedule these registered tasks to run at a specified time in the future or with a recurring interval.
- Execute tasks: Run scheduled tasks in a separate thread or process to avoid blocking the main application.
- Handle task completion/errors: Provide a way to track the status of tasks (e.g., running, completed, failed) and potentially capture their output or exceptions.
Key Requirements:
- The scheduler should be thread-safe.
- Tasks should be executed in a non-blocking manner.
- The scheduler should support one-time execution at a specific time.
- The scheduler should support periodic execution at a fixed interval.
- You should be able to retrieve the results of a completed task (if any).
- Exceptions raised by tasks should be caught and reported.
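One possible way to meet the status, result, and exception requirements is a small lock-protected record store. The sketch below is only an illustration: TaskStatus, TaskRecord, and TaskRegistry are hypothetical names, not part of any required API, and your design may differ.

```python
import threading
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, Optional


class TaskStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


@dataclass
class TaskRecord:
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[BaseException] = None


class TaskRegistry:
    """Hypothetical helper: a lock-protected map from task ID to TaskRecord."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._records: Dict[str, TaskRecord] = {}

    def create(self) -> str:
        task_id = str(uuid.uuid4())
        with self._lock:
            self._records[task_id] = TaskRecord()
        return task_id

    def run(self, task_id: str, func: Callable[..., Any], *args: Any) -> None:
        """Execute func, recording its status and either its result or its exception."""
        with self._lock:
            self._records[task_id].status = TaskStatus.RUNNING
        try:
            # Call the task outside the lock so other threads are not blocked.
            result = func(*args)
        except Exception as exc:
            with self._lock:
                record = self._records[task_id]
                record.status, record.error = TaskStatus.FAILED, exc
        else:
            with self._lock:
                record = self._records[task_id]
                record.status, record.result = TaskStatus.COMPLETED, result

    def get(self, task_id: str) -> TaskRecord:
        with self._lock:
            return self._records[task_id]
```

In this sketch a worker thread would call run(), while the main thread reads records through get(). The lock is held only while a record is updated, never while the task itself runs.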
Expected Behavior:
When a task is scheduled, it should be picked up by a background worker and executed. The main program should not wait for the task to complete. If a task is scheduled to run at a future time, it should only start executing at or after that time. Periodic tasks should repeat indefinitely until explicitly stopped (though for this challenge, we'll assume they run a finite number of times for demonstration).
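The behavior described above can be sketched with a heap ordered by due time and a condition variable that wakes a dispatcher thread. This is a minimal sketch under stated assumptions: TimedQueue and its methods are hypothetical names, delays are given in seconds, and each due task is handed to its own thread for simplicity.

```python
import heapq
import itertools
import threading
import time
from typing import Any, Callable, List, Optional, Tuple


class TimedQueue:
    """Hypothetical sketch: runs callables at or after their due time."""

    def __init__(self) -> None:
        self._heap: List[Tuple[float, int, Callable[[], Any], Optional[float]]] = []
        self._counter = itertools.count()  # tie-breaker for entries with equal due times
        self._cond = threading.Condition()
        self._stopped = False
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def add(self, func: Callable[[], Any], delay: float,
            interval: Optional[float] = None) -> None:
        """Queue func to run after `delay` seconds, repeating every `interval` if given."""
        entry = (time.monotonic() + delay, next(self._counter), func, interval)
        with self._cond:
            heapq.heappush(self._heap, entry)
            self._cond.notify()  # wake the loop in case this entry is now the earliest

    def stop(self) -> None:
        """Stop dispatching; tasks that already started are left to finish."""
        with self._cond:
            self._stopped = True
            self._cond.notify()
        self._thread.join()

    def _loop(self) -> None:
        while True:
            with self._cond:
                while not self._stopped:
                    if self._heap:
                        wait = self._heap[0][0] - time.monotonic()
                        if wait <= 0:
                            break  # the earliest entry is due
                        self._cond.wait(timeout=wait)
                    else:
                        self._cond.wait()
                if self._stopped:
                    return
                due, _, func, interval = heapq.heappop(self._heap)
                if interval is not None:
                    # Reschedule relative to the due time, not the finish time.
                    heapq.heappush(
                        self._heap, (due + interval, next(self._counter), func, interval)
                    )
            # Run outside the lock, in its own thread, so a slow task delays nothing else.
            threading.Thread(target=func, daemon=True).start()
```

The caller returns from add() immediately, and the dispatcher sleeps until the earliest entry is due rather than polling. A fuller solution would likely replace the thread-per-task hand-off with a bounded worker pool.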
Edge Cases to Consider:
- Scheduling a task with invalid arguments.
- Scheduling a task that raises an exception.
- Scheduling tasks at very close or overlapping times.
- The scheduler starting and stopping gracefully.
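For the first edge case, one option is to reject bad input at scheduling time instead of letting it fail later inside a worker thread. The helper below is a hedged sketch: validate_schedule_request and its parameters are hypothetical, and the rules it enforces (a callable task, a datetime run time, a positive timedelta interval, matching argument count) are assumptions about what "invalid" means here.

```python
import inspect
from datetime import datetime, timedelta
from typing import Any, Callable, Optional, Sequence


def validate_schedule_request(
    func: Callable[..., Any],
    args: Sequence[Any] = (),
    run_time: Optional[datetime] = None,
    interval: Optional[timedelta] = None,
) -> None:
    """Hypothetical helper: raise early if a scheduling request is malformed."""
    if not callable(func):
        raise TypeError(f"task must be callable, got {type(func).__name__}")
    if run_time is not None and not isinstance(run_time, datetime):
        raise TypeError("run_time must be a datetime")
    if interval is not None:
        if not isinstance(interval, timedelta):
            raise TypeError("interval must be a timedelta")
        if interval <= timedelta(0):
            raise ValueError("interval must be positive")
    try:
        signature = inspect.signature(func)
    except (TypeError, ValueError):
        return  # some built-in callables expose no signature; skip the arity check
    try:
        signature.bind(*args)  # checks the argument count without calling func
    except TypeError as exc:
        raise TypeError(f"arguments do not match {func!r}: {exc}") from exc
```

A scheduler could call this at the top of schedule_one_time and schedule_periodic, so the caller sees the error immediately on the scheduling thread.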
Examples
Example 1: One-Time Task Scheduling
import time
from datetime import datetime, timedelta
# Assume MyScheduler is your implemented scheduler class
def say_hello(name):
    print(f"Hello, {name}!")
    return f"Greeting for {name} sent."
scheduler = MyScheduler()
now = datetime.now()
run_time = now + timedelta(seconds=2)
# Schedule the task to run in 2 seconds
task_id = scheduler.schedule_one_time(say_hello, run_time, args=["Alice"])
print(f"Task scheduled with ID: {task_id}")
# Simulate the main program doing other work
time.sleep(3)
# Later, check the status or result
result = scheduler.get_task_result(task_id)
print(f"Task result: {result}")
# Expected Output (the greeting appears after about 2 seconds, the result after 3):
# Task scheduled with ID: <some_unique_id>
# Hello, Alice!
# Task result: Greeting for Alice sent.
Example 2: Periodic Task Scheduling
import time
from datetime import timedelta
# Assume MyScheduler is your implemented scheduler class
def count_up(start, step):
    print(f"Counting: {start}")
    return start + step
scheduler = MyScheduler()
# Schedule a task to run every 1 second, starting immediately
task_id = scheduler.schedule_periodic(count_up, timedelta(seconds=1), args=[0, 1])
print(f"Periodic task scheduled with ID: {task_id}")
# Let it run for a few seconds
time.sleep(3.5)
# To stop a periodic task (implementation detail for the challenge)
# scheduler.stop_task(task_id)
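# Expected Output (will vary slightly due to timing).
# Note: the count increases only if the scheduler passes each return value
# back in as the next start; with fixed args, every line would read "Counting: 0".
# Periodic task scheduled with ID: <some_unique_id>
# Counting: 0
# Counting: 1
# Counting: 2
# Counting: 3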
Example 3: Task with Exception Handling
import time
from datetime import datetime, timedelta
# Assume MyScheduler is your implemented scheduler class
def divide_by_zero(x):
    return x / 0
scheduler = MyScheduler()
now = datetime.now()
run_time = now + timedelta(seconds=1)
task_id = scheduler.schedule_one_time(divide_by_zero, run_time, args=[10])
print(f"Task with potential error scheduled: {task_id}")
time.sleep(2)
# Check status and error
status = scheduler.get_task_status(task_id)
error = scheduler.get_task_error(task_id)
print(f"Task status: {status}")
print(f"Task error: {error}")
# Expected Output:
# Task with potential error scheduled: <some_unique_id>
# Task status: FAILED
# Task error: ZeroDivisionError('division by zero')
Constraints
- The scheduler should be able to manage at least 100 concurrent tasks.
- Task scheduling times can be up to 24 hours in the future.
- Periodic task intervals can be as small as 0.1 seconds.
- The schedule_periodic function should accept a timedelta object for the interval.
- Task IDs should be unique identifiers (e.g., UUIDs).
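Two of these constraints map directly onto the standard library, as the short sketch below shows. The helper names new_task_id and next_run_times are hypothetical. Spacing runs from the first run time, rather than from when each run finishes, is one assumed way to keep short intervals such as 0.1 seconds from drifting.

```python
import uuid
from datetime import timedelta
from typing import List


def new_task_id() -> str:
    """Return a random UUID string to use as a task ID."""
    return str(uuid.uuid4())


def next_run_times(first_run: float, interval: timedelta, count: int) -> List[float]:
    """Return `count` run times spaced from the first run, so delays do not accumulate."""
    step = interval.total_seconds()  # e.g. timedelta(milliseconds=100) gives 0.1
    return [first_run + n * step for n in range(count)]


print(new_task_id())
print(next_run_times(10.0, timedelta(seconds=1), 3))  # [10.0, 11.0, 12.0]
```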
Notes
- You'll likely need to use Python's threading or multiprocessing modules for executing tasks in the background.
- A way to manage a queue of tasks to be executed is essential.
- Consider how you will store task states (e.g., pending, running, completed, failed), results, and exceptions.
- For schedule_periodic, you may want to add a mechanism to stop the recurring task, although for simplicity in this challenge, the examples show them running for a duration.
- Think about how you'll handle tasks that might take a long time to complete; your scheduler should not block.
- This challenge encourages you to explore different approaches for task management and synchronization.
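As one approach worth exploring, the standard library's concurrent.futures module wraps a thread pool and captures each task's result or exception in a Future. It is not named in the notes above, so treat this as an optional alternative to hand-rolled threading. The divide function and the max_workers value are purely illustrative.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def divide(x, y):
    return x / y


# max_workers=8 is an arbitrary illustrative value, not a recommendation.
with ThreadPoolExecutor(max_workers=8) as pool:
    ok: Future = pool.submit(divide, 10, 2)   # returns immediately
    bad: Future = pool.submit(divide, 10, 0)  # the exception is stored, not raised here

    print(ok.result(timeout=5))               # 5.0
    print(repr(bad.exception(timeout=5)))     # the captured ZeroDivisionError
```

A pool handles the non-blocking execution and error capture, but it has no notion of "run at time T" or "repeat every N seconds", so the timing logic would still be yours to build on top.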