Concurrent Task Executor in Rust
This challenge asks you to implement a simple, thread-pool-based task executor in Rust. A task executor allows you to submit tasks (closures) to be executed concurrently, managing a pool of worker threads to efficiently utilize CPU resources. This is a fundamental building block for many concurrent applications.
Problem Description
You are to implement a TaskExecutor struct that manages a pool of worker threads and executes submitted tasks concurrently. The executor should:
- Accept a number of threads: The
TaskExecutorshould be initialized with a specified number of worker threads. - Submit tasks: Provide a
submitmethod that accepts a closure (aFnOnce() + Send + 'static) representing the task to be executed. The closure should beSendto allow it to be safely moved between threads and'staticto ensure it doesn't contain references to data that might be deallocated before the task completes. - Execute tasks concurrently: The submitted tasks should be executed concurrently by the worker threads in the pool.
- Graceful shutdown: Provide a
shutdownmethod that signals the executor to stop accepting new tasks and gracefully terminate the worker threads. Theshutdownmethod should return aResult<(), String>to indicate success or failure. - Handle errors: If a task panics, the executor should catch the panic and log an error message (to
stderris fine) without crashing the entire executor. The panic should not be re-raised.
The executor should use a channel (mpsc) to communicate tasks from the submitter to the worker threads. The worker threads should continuously receive tasks from the channel and execute them until the channel is closed (signaling shutdown).
Examples
Example 1:
Input:
```rust
let executor = TaskExecutor::new(4).unwrap();
executor.submit(|| {
println!("Task 1 running");
});
executor.submit(|| {
println!("Task 2 running");
});
executor.submit(|| {
println!("Task 3 running");
});
executor.shutdown().unwrap();
Output: (Order may vary due to concurrency)
Task 1 running
Task 2 running
Task 3 running
Explanation: Three tasks are submitted to the executor, which uses 4 worker threads to execute them concurrently. The shutdown method gracefully terminates the executor.
Example 2:
Input:
```rust
let executor = TaskExecutor::new(2).unwrap();
executor.submit(|| {
panic!("Task panicked!");
});
executor.submit(|| {
println!("Task 4 running");
});
executor.shutdown().unwrap();
Output: (Order may vary due to concurrency)
Task 4 running
[Error message to stderr: "Task panicked!"]
Explanation: One task panics. The executor catches the panic, logs an error message to stderr, and continues executing other tasks. The shutdown method gracefully terminates the executor.
Example 3: (Edge Case - Zero Threads)
Input:
```rust
let executor = TaskExecutor::new(0).unwrap();
executor.submit(|| {
println!("Task 5 running");
});
executor.shutdown().unwrap();
Output: (No output to stdout, but no panic)
Explanation: The executor is initialized with zero threads. The task is submitted, but never executed. The shutdown method should still succeed.
Constraints
- The number of threads passed to
TaskExecutor::new()must be greater than 0. Return an error if it is 0 or less. - The
submitmethod should not block. It should return immediately after submitting the task to the channel. - The
shutdownmethod should wait for all submitted tasks to complete before returning. - The executor should handle panics within tasks gracefully, preventing the entire executor from crashing.
- The executor should be thread-safe.
Notes
- Consider using
mpscchannels for communication between the submitter and worker threads. - Use
thread::spawnto create the worker threads. - The
FnOnce()trait allows the closure to be executed only once, which is suitable for tasks in a task executor. - Think about how to signal the worker threads to terminate gracefully when
shutdownis called. Closing the channel is a common approach. - Error handling is crucial. Ensure that panics within tasks are caught and handled appropriately.
- The
'staticlifetime bound on the closure is important to prevent dangling references.
use std::sync::mpsc;
use std::thread;
use std::sync::{Arc, Mutex};
use std::io::{self, Write};
pub struct TaskExecutor {
num_threads: usize,
sender: mpsc::Sender<Box<dyn FnOnce() + Send + 'static>>,
receiver: mpsc::Receiver<Box<dyn FnOnce() + Send + 'static>>,
shutdown_flag: Arc<Mutex<bool>>,
}
impl TaskExecutor {
pub fn new(num_threads: usize) -> Result<TaskExecutor, String> {
if num_threads == 0 {
return Err("Number of threads must be greater than 0".to_string());
}
let (sender, receiver) = mpsc::channel();
let shutdown_flag = Arc::new(Mutex::new(false));
let executor = TaskExecutor {
num_threads,
sender,
receiver,
shutdown_flag,
};
for _ in 0..num_threads {
let receiver = receiver.clone();
let shutdown_flag = Arc::clone(&shutdown_flag);
thread::spawn(move || {
loop {
match receiver.recv() {
Ok(task) => {
if shutdown_flag.lock().unwrap() {
break;
}
task();
}
Err(_) => {
// Channel closed, exit thread
break;
}
}
}
});
}
Ok(executor)
}
pub fn submit<F>(&self, task: F)
where
F: FnOnce() + Send + 'static
{
let task = Box::new(task);
self.sender.send(task).unwrap();
}
pub fn shutdown(&self) -> Result<(), String> {
{
let mut shutdown = self.shutdown_flag.lock().unwrap();
*shutdown = true;
}
drop(self.sender); // Close the channel to signal workers to exit
// Wait for all tasks to complete (not strictly necessary, but good practice)
// This is a simplified approach and might not be perfectly accurate in all scenarios.
// A more robust solution would involve tracking the number of submitted tasks and waiting for them to finish.
thread::sleep(std::time::Duration::from_millis(100)); // Give threads time to finish
Ok(())
}
}