Bounded Concurrency Task Executor in Go
Imagine you have a collection of tasks that need to be executed, and you want to run them concurrently to speed up the process. However, you also need to ensure that you don't overwhelm your system by running too many tasks simultaneously. This challenge requires you to build a robust mechanism for executing tasks concurrently with a defined limit on the number of active goroutines.
Problem Description
You need to implement a TaskExecutor struct in Go that can accept a slice of functions (tasks) and execute them concurrently, but with a strict limit on the maximum number of goroutines that can be active at any given time.
Key Requirements:
- NewTaskExecutor(concurrency int) *TaskExecutor: A constructor function that initializes a TaskExecutor with the specified concurrency limit.
- Execute(tasks []func() error) error: A method that takes a slice of functions, where each function represents a task to be executed.
  - Each task function returns an error.
  - The Execute method should start executing these tasks concurrently, respecting the concurrency limit.
  - If any task returns a non-nil error, the Execute method should ideally stop processing further tasks, or at least signal that an error occurred. For simplicity in this challenge, Execute should return the first error encountered. If all tasks complete successfully, it should return nil.
  - The Execute method should return only after all submitted tasks have completed, or once a task has returned an error.
- Concurrency Limit Enforcement: The TaskExecutor must ensure that no more than concurrency goroutines are running tasks at any point in time.
Expected Behavior:
- Tasks should be picked up and executed as soon as a "slot" becomes available in the concurrency pool.
- If the number of submitted tasks exceeds the concurrency limit, subsequent tasks should wait until a running task finishes.
- The order of task execution is not guaranteed, but all tasks must be processed.
- Error handling should be robust: the first error encountered should be returned.
Edge Cases:
- Zero or Negative Concurrency: NewTaskExecutor should handle cases where concurrency is zero or negative. A reasonable default or error-handling strategy should be employed (e.g., setting a minimum concurrency of 1, or returning an error). For this challenge, assume concurrency will be a positive integer.
- Empty Task List: If an empty slice of tasks is provided to Execute, the method should return nil immediately.
- Tasks with Delays: Tasks might have varying execution times. A slow task should occupy only its own slot while the remaining slots keep picking up pending tasks.
Examples
Example 1:
import (
	"fmt"
	"time"
)

func task(id int, delay time.Duration) func() error {
	return func() error {
		fmt.Printf("Task %d started\n", id)
		time.Sleep(delay)
		fmt.Printf("Task %d finished\n", id)
		return nil
	}
}
// Input:
// concurrency = 2
// tasks = [task(1, 2s), task(2, 1s), task(3, 3s), task(4, 0.5s)]
// Expected Output (example order):
// Task 1 started
// Task 2 started
// Task 2 finished
// Task 3 started
// Task 1 finished
// Task 4 started
// Task 4 finished
// Task 3 finished
// nil (returned by Execute)
// Explanation:
// With concurrency=2, Task 1 and Task 2 start.
// Task 2 finishes first. Then Task 3 starts.
// Task 1 finishes. Then Task 4 starts.
// Task 4 finishes. Then Task 3 finishes.
// All tasks are executed, and no errors occurred.
Example 2:
import (
	"errors"
	"fmt"
	"time"
)

func taskWithError(id int, delay time.Duration, err error) func() error {
	return func() error {
		fmt.Printf("Task %d started\n", id)
		time.Sleep(delay)
		fmt.Printf("Task %d finished with error\n", id)
		return err
	}
}
// Input:
// concurrency = 2
// tasks = [task(1, 1s), taskWithError(2, 0.5s, errors.New("task 2 failed")), task(3, 2s)]
// Expected Output (example order):
// Task 1 started
// Task 2 started
// Task 2 finished with error
// errors.New("task 2 failed") (returned by Execute)
// Explanation:
// Task 1 and Task 2 start.
// Task 2 finishes with an error. The Execute method immediately returns this error.
// Task 3 might have started or not depending on timing, but its result (or lack thereof) is not relevant to the returned error.
Example 3:
import (
	"fmt"
)

func shortTask(id int) func() error {
	return func() error {
		fmt.Printf("Short task %d started and finished\n", id)
		return nil
	}
}
// Input:
// concurrency = 3
// tasks = [shortTask(1), shortTask(2), shortTask(3), shortTask(4), shortTask(5)]
// Expected Output (example order):
// Short task 1 started and finished
// Short task 2 started and finished
// Short task 3 started and finished
// Short task 4 started and finished
// Short task 5 started and finished
// nil (returned by Execute)
// Explanation:
// With concurrency=3, the first three tasks start.
// As soon as one finishes, the next available task (4, then 5) starts.
// Because the tasks are short, they may finish almost immediately and their messages can appear in any order; the only guarantee is that at most 3 tasks run at a time.
Constraints
- The concurrency parameter passed to NewTaskExecutor will be an integer greater than 0.
- The tasks slice passed to Execute will contain func() error values.
- The total number of tasks can be large, so an efficient solution is preferred.
- Your solution should be idiomatic Go.
Notes
- Consider using Go's built-in concurrency primitives such as channels and sync.WaitGroup.
- Think about how to signal completion and errors across goroutines.
- A common pattern for managing bounded concurrency is using a channel as a semaphore.