Python Background Task Scheduler

This challenge focuses on implementing background tasks in Python, a crucial skill for improving application responsiveness and handling long-running operations without blocking the main program flow. You will create a system that can schedule and execute tasks asynchronously.

Problem Description

Your task is to build a simple background task scheduler. The scheduler should be able to:

  1. Register tasks: Allow users to register functions to be executed in the background.
  2. Schedule tasks: Provide a mechanism to schedule these registered tasks to run either once at a specified future time or repeatedly at a fixed interval.
  3. Execute tasks: Run scheduled tasks in a separate thread or process to avoid blocking the main application.
  4. Handle task completion/errors: Provide a way to track the status of tasks (e.g., running, completed, failed) and potentially capture their output or exceptions.

Key Requirements:

  • The scheduler should be thread-safe.
  • Tasks should be executed in a non-blocking manner.
  • The scheduler should support one-time execution at a specific time.
  • The scheduler should support periodic execution at a fixed interval.
  • You should be able to retrieve the results of a completed task (if any).
  • Exceptions raised by tasks should be caught and reported.
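One way to satisfy the status, result, and exception requirements is to keep a small record per task behind a lock. The sketch below is only an illustration: `TaskStatus`, `TaskRecord`, and `TaskRegistry` are hypothetical names, not part of any required interface, and a single lock is assumed to be enough for this challenge's scale.

```python
import threading
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class TaskStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


@dataclass
class TaskRecord:
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[BaseException] = None


class TaskRegistry:
    """Hypothetical helper: stores per-task state behind a single lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._records = {}

    def create(self) -> str:
        """Register a new PENDING task and return its unique ID."""
        task_id = str(uuid.uuid4())
        with self._lock:
            self._records[task_id] = TaskRecord()
        return task_id

    def run(self, task_id, func, *args):
        """Execute func, recording its status and its result or exception."""
        with self._lock:
            self._records[task_id].status = TaskStatus.RUNNING
        try:
            result = func(*args)  # the lock is not held while the task runs
        except Exception as exc:
            with self._lock:
                record = self._records[task_id]
                record.status, record.error = TaskStatus.FAILED, exc
        else:
            with self._lock:
                record = self._records[task_id]
                record.status, record.result = TaskStatus.COMPLETED, result

    def get(self, task_id) -> TaskRecord:
        """Return the stored record for task_id."""
        with self._lock:
            return self._records[task_id]
```

In a full scheduler, `run` would be called from a worker thread, so the caller that scheduled the task never waits on it.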

Expected Behavior:

When a task is scheduled, it should be picked up by a background worker and executed. The main program should not wait for the task to complete. If a task is scheduled to run at a future time, it should start executing at or after that time, never before. Periodic tasks should repeat until explicitly stopped; the examples in this challenge simply let them run for a few seconds.
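The "at or after that time" behavior can be sketched with a min-heap of due times and a condition variable that one background thread sleeps on. This is a minimal illustration under stated assumptions, not a reference solution: `TimerLoop` and `call_at` are hypothetical names, due times are `time.monotonic()` values (a `datetime` run time would first need converting, for example by adding `(run_time - datetime.now()).total_seconds()` to the current monotonic time), and each due task gets its own thread for simplicity where a worker pool might suit 100 concurrent tasks better.

```python
import heapq
import itertools
import threading
import time


class TimerLoop:
    """Hypothetical dispatcher: starts callables at or after their due time."""

    def __init__(self):
        self._heap = []                 # entries: (due, seq, func)
        self._seq = itertools.count()   # tie-breaker for equal due times
        self._cond = threading.Condition()
        self._stopped = False
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def call_at(self, due, func):
        """Queue func to start once time.monotonic() reaches due."""
        with self._cond:
            heapq.heappush(self._heap, (due, next(self._seq), func))
            self._cond.notify()         # wake the loop to re-check the earliest entry

    def stop(self):
        """Stop the dispatcher thread; queued tasks that are not yet due are dropped."""
        with self._cond:
            self._stopped = True
            self._cond.notify()
        self._thread.join()

    def _loop(self):
        while True:
            with self._cond:
                while not self._stopped:
                    if self._heap:
                        delay = self._heap[0][0] - time.monotonic()
                        if delay <= 0:
                            break       # earliest task is due
                        self._cond.wait(timeout=delay)
                    else:
                        self._cond.wait()
                if self._stopped:
                    return
                _, _, func = heapq.heappop(self._heap)
            # Start the task outside the lock so call_at is never blocked by it.
            threading.Thread(target=func, daemon=True).start()
```

Because the loop re-checks the remaining delay after every wake-up, a task is never started early, and a newly queued task with an earlier due time is noticed immediately.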

Edge Cases to Consider:

  • Scheduling a task with invalid arguments.
  • Scheduling a task that raises an exception.
  • Scheduling tasks at very close or overlapping times.
  • The scheduler starting and stopping gracefully.
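For the invalid-arguments case, one option is to reject a bad request at scheduling time rather than letting it fail later in a worker. The helpers below are a hedged sketch: `validate_one_time` and `validate_interval` are hypothetical names, and the check relies on `inspect.signature`, which cannot introspect some built-in callables (it raises `ValueError` for those).

```python
import inspect
from datetime import datetime, timedelta


def validate_one_time(func, run_time, args=(), kwargs=None):
    """Hypothetical pre-flight check; raises before anything is queued."""
    if not callable(func):
        raise TypeError("func must be callable")
    if not isinstance(run_time, datetime):
        raise TypeError("run_time must be a datetime")
    # bind() raises TypeError if args/kwargs do not fit func's signature.
    inspect.signature(func).bind(*args, **(kwargs or {}))


def validate_interval(interval):
    """Hypothetical check for the interval passed to a periodic schedule."""
    if not isinstance(interval, timedelta):
        raise TypeError("interval must be a timedelta")
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
```

Whether a run time in the past should be rejected or run immediately is left open here; the problem statement does not specify it.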

Examples

Example 1: One-Time Task Scheduling

import time
from datetime import datetime, timedelta

# Assume MyScheduler is your implemented scheduler class

def say_hello(name):
    print(f"Hello, {name}!")
    return f"Greeting for {name} sent."

scheduler = MyScheduler()
now = datetime.now()
run_time = now + timedelta(seconds=2)

# Schedule the task to run in 2 seconds
task_id = scheduler.schedule_one_time(say_hello, run_time, args=["Alice"])

print(f"Task scheduled with ID: {task_id}")

# Simulate the main program doing other work
time.sleep(3)

# Later, check the status or result
result = scheduler.get_task_result(task_id)
print(f"Task result: {result}")

# Expected Output (the greeting appears about 2 seconds after scheduling):
# Task scheduled with ID: <some_unique_id>
# Hello, Alice!
# Task result: Greeting for Alice sent.

Example 2: Periodic Task Scheduling

import time
from datetime import timedelta

# Assume MyScheduler is your implemented scheduler class

def count_up(start, step):
    print(f"Counting: {start}")
    return start + step

scheduler = MyScheduler()

# Schedule a task to run every 1 second, starting immediately
task_id = scheduler.schedule_periodic(count_up, timedelta(seconds=1), args=[0, 1])

print(f"Periodic task scheduled with ID: {task_id}")

# Let it run for a few seconds
time.sleep(3.5)

# To stop a periodic task (implementation detail for the challenge)
# scheduler.stop_task(task_id)

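# Expected Output (the number of runs may vary slightly due to timing).
# The same args ([0, 1]) are passed on every run, so each run prints 0
# unless your scheduler feeds the return value into the next run:
# Periodic task scheduled with ID: <some_unique_id>
# Counting: 0
# Counting: 0
# Counting: 0
# Counting: 0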

Example 3: Task with Exception Handling

import time
from datetime import datetime, timedelta

# Assume MyScheduler is your implemented scheduler class

def divide_by_zero(x):
    return x / 0

scheduler = MyScheduler()
now = datetime.now()
run_time = now + timedelta(seconds=1)

task_id = scheduler.schedule_one_time(divide_by_zero, run_time, args=[10])

print(f"Task with potential error scheduled: {task_id}")

time.sleep(2)

# Check status and error
status = scheduler.get_task_status(task_id)
error = scheduler.get_task_error(task_id)

print(f"Task status: {status}")
print(f"Task error: {error!r}")  # !r prints the exception's repr, matching the output below

# Expected Output:
# Task with potential error scheduled: <some_unique_id>
# Task status: FAILED
# Task error: ZeroDivisionError('division by zero')

Constraints

  • The scheduler should be able to manage at least 100 concurrent tasks.
  • One-time tasks can be scheduled up to 24 hours in the future.
  • Periodic task intervals can be as small as 0.1 seconds.
  • The schedule_periodic method should accept a timedelta object for the interval.
  • Task IDs should be unique identifiers (e.g., UUIDs).
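With intervals as small as 0.1 seconds, how the next run time is computed starts to matter: adding the interval to the time a run finished lets delays accumulate, while adding it to the previous due time keeps runs on a fixed grid. The function below sketches the second approach. `next_due` is a hypothetical helper, times are assumed to be `time.monotonic()` style floats, and skipping missed slots (rather than running them back to back) is a design choice the problem statement does not dictate.

```python
import math
from datetime import timedelta


def next_due(previous_due: float, interval: timedelta, now: float) -> float:
    """Return the next run time on the fixed grid that starts at previous_due.

    If one or more slots were missed (the task or the scheduler ran late),
    they are skipped instead of being executed in a burst.
    """
    step = interval.total_seconds()
    due = previous_due + step
    if due < now:
        # Jump forward by however many whole steps are needed to catch up.
        due += math.ceil((now - due) / step) * step
    return due
```

For example, with a 1 second interval, a previous due time of 10.0, and a current time of 13.5, the slots at 11.0, 12.0, and 13.0 are skipped and the next run is due at 14.0.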

Notes

  • You'll likely need to use Python's threading or multiprocessing modules for executing tasks in the background.
  • A way to manage a queue of tasks to be executed is essential.
  • Consider how you will store task states (e.g., pending, running, completed, failed), results, and exceptions.
  • For schedule_periodic, consider adding a way to stop a recurring task (Example 2 hints at a stop_task method); the examples simply let periodic tasks run for a few seconds.
  • Think about how you'll handle tasks that might take a long time to complete – your scheduler should not block.
  • This challenge encourages you to explore different approaches for task management and synchronization.
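As a starting point for the notes above, the standard library's `concurrent.futures.ThreadPoolExecutor` already covers non-blocking execution and result or exception capture. The sketch below shows only that building block, not a scheduler: the task functions are illustrative stand-ins, and timing and periodic behavior would still have to be layered on top.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    time.sleep(0.2)  # stand-in for a long-running task
    return x * x


def divide_by_zero(x):
    return x / 0


with ThreadPoolExecutor(max_workers=4) as pool:
    started = time.monotonic()
    ok = pool.submit(slow_square, 7)       # returns a Future immediately
    bad = pool.submit(divide_by_zero, 10)
    submit_cost = time.monotonic() - started  # time spent submitting, not running

    value = ok.result()        # blocks until the task finishes
    error = bad.exception()    # the exception the task raised, returned rather than re-raised

print(f"result={value}, error={error!r}")
```

A `Future` carries roughly the same information the challenge asks you to track (pending, running, finished, result, exception), so it can either back your own status model or serve as a reference for designing one.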