Hone logo
Hone
Problems

Concurrent Task Executor in Rust

This challenge asks you to implement a simple, thread-pool-based task executor in Rust. A task executor allows you to submit tasks (closures) to be executed concurrently, managing a pool of worker threads to efficiently utilize CPU resources. This is a fundamental building block for many concurrent applications.

Problem Description

You are to implement a TaskExecutor struct that manages a pool of worker threads and executes submitted tasks concurrently. The executor should:

  1. Accept a number of threads: The TaskExecutor should be initialized with a specified number of worker threads.
  2. Submit tasks: Provide a submit method that accepts a closure (a FnOnce() + Send + 'static) representing the task to be executed. The closure should be Send to allow it to be safely moved between threads and 'static to ensure it doesn't contain references to data that might be deallocated before the task completes.
  3. Execute tasks concurrently: The submitted tasks should be executed concurrently by the worker threads in the pool.
  4. Graceful shutdown: Provide a shutdown method that signals the executor to stop accepting new tasks and gracefully terminate the worker threads. The shutdown method should return a Result<(), String> to indicate success or failure.
  5. Handle errors: If a task panics, the executor should catch the panic and log an error message (to stderr is fine) without crashing the entire executor. The panic should not be re-raised.

The executor should use a channel (mpsc) to communicate tasks from the submitter to the worker threads. The worker threads should continuously receive tasks from the channel and execute them until the channel is closed (signaling shutdown).

Examples

Example 1:

Input:
```rust
let executor = TaskExecutor::new(4).unwrap();
executor.submit(|| {
    println!("Task 1 running");
});
executor.submit(|| {
    println!("Task 2 running");
});
executor.submit(|| {
    println!("Task 3 running");
});
executor.shutdown().unwrap();

Output: (Order may vary due to concurrency)

Task 1 running
Task 2 running
Task 3 running

Explanation: Three tasks are submitted to the executor, which uses 4 worker threads to execute them concurrently. The shutdown method gracefully terminates the executor.

Example 2:

Input:
```rust
let executor = TaskExecutor::new(2).unwrap();
executor.submit(|| {
    panic!("Task panicked!");
});
executor.submit(|| {
    println!("Task 4 running");
});
executor.shutdown().unwrap();

Output: (Order may vary due to concurrency)

Task 4 running
[Error message to stderr: "Task panicked!"]

Explanation: One task panics. The executor catches the panic, logs an error message to stderr, and continues executing other tasks. The shutdown method gracefully terminates the executor.

Example 3: (Edge Case - Zero Threads)

Input:
```rust
let executor = TaskExecutor::new(0).unwrap();
executor.submit(|| {
    println!("Task 5 running");
});
executor.shutdown().unwrap();

Output: (No output to stdout, but no panic)

Explanation: The executor is initialized with zero threads. The task is submitted, but never executed. The shutdown method should still succeed.

Constraints

  • The number of threads passed to TaskExecutor::new() must be greater than 0. Return an error if it is 0 or less.
  • The submit method should not block. It should return immediately after submitting the task to the channel.
  • The shutdown method should wait for all submitted tasks to complete before returning.
  • The executor should handle panics within tasks gracefully, preventing the entire executor from crashing.
  • The executor should be thread-safe.

Notes

  • Consider using mpsc channels for communication between the submitter and worker threads.
  • Use thread::spawn to create the worker threads.
  • The FnOnce() trait allows the closure to be executed only once, which is suitable for tasks in a task executor.
  • Think about how to signal the worker threads to terminate gracefully when shutdown is called. Closing the channel is a common approach.
  • Error handling is crucial. Ensure that panics within tasks are caught and handled appropriately.
  • The 'static lifetime bound on the closure is important to prevent dangling references.
use std::sync::mpsc;
use std::thread;
use std::sync::{Arc, Mutex};
use std::io::{self, Write};

pub struct TaskExecutor {
    num_threads: usize,
    sender: mpsc::Sender<Box<dyn FnOnce() + Send + 'static>>,
    receiver: mpsc::Receiver<Box<dyn FnOnce() + Send + 'static>>,
    shutdown_flag: Arc<Mutex<bool>>,
}

impl TaskExecutor {
    pub fn new(num_threads: usize) -> Result<TaskExecutor, String> {
        if num_threads == 0 {
            return Err("Number of threads must be greater than 0".to_string());
        }

        let (sender, receiver) = mpsc::channel();
        let shutdown_flag = Arc::new(Mutex::new(false));

        let executor = TaskExecutor {
            num_threads,
            sender,
            receiver,
            shutdown_flag,
        };

        for _ in 0..num_threads {
            let receiver = receiver.clone();
            let shutdown_flag = Arc::clone(&shutdown_flag);
            thread::spawn(move || {
                loop {
                    match receiver.recv() {
                        Ok(task) => {
                            if shutdown_flag.lock().unwrap() {
                                break;
                            }
                            task();
                        }
                        Err(_) => {
                            // Channel closed, exit thread
                            break;
                        }
                    }
                }
            });
        }

        Ok(executor)
    }

    pub fn submit<F>(&self, task: F)
        where
            F: FnOnce() + Send + 'static
    {
        let task = Box::new(task);
        self.sender.send(task).unwrap();
    }

    pub fn shutdown(&self) -> Result<(), String> {
        {
            let mut shutdown = self.shutdown_flag.lock().unwrap();
            *shutdown = true;
        }

        drop(self.sender); // Close the channel to signal workers to exit

        // Wait for all tasks to complete (not strictly necessary, but good practice)
        // This is a simplified approach and might not be perfectly accurate in all scenarios.
        // A more robust solution would involve tracking the number of submitted tasks and waiting for them to finish.
        thread::sleep(std::time::Duration::from_millis(100)); // Give threads time to finish

        Ok(())
    }
}
Loading editor...
rust