Go Worker Pool Implementation
Concurrency is a fundamental aspect of modern software development. In Go, goroutines provide a lightweight way to achieve concurrency. A common pattern to manage a large number of concurrent tasks efficiently is the worker pool. This challenge asks you to implement a generic worker pool in Go to process a collection of tasks concurrently.
Problem Description
Your task is to design and implement a WorkerPool struct in Go that can manage a fixed number of worker goroutines. The worker pool should be able to accept a stream of tasks, distribute them among the available workers, and collect their results.
Key Requirements:
- Generic Task and Result Types: The worker pool should be able to handle any type of task and produce any type of result, which calls for Go generics (type parameters, available since Go 1.18).
- Fixed Number of Workers: The pool should be initialized with a specific number of worker goroutines.
- Task Submission: A method to submit tasks to the pool. Tasks should be processed as they arrive, as long as there are available workers.
- Result Collection: A mechanism to collect the results of the completed tasks. The order of results does not necessarily need to match the order of task submission, but all results must be returned.
- Graceful Shutdown: A method to signal the pool to stop accepting new tasks and to wait for all currently running tasks to complete before returning.
- Error Handling: If a task function returns an error, the worker pool should be able to capture and report these errors.
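Here is a minimal sketch of how the requirements above could fit together, assuming Go 1.18+ generics. The names `WorkerPool`, `NewWorkerPool`, and `ErrPoolClosed`, and the `queueSize` parameter, are hypothetical choices for illustration. `Submit(fn, task)` follows the call shape used in the examples, and `Shutdown` returns both results and errors, matching the two-value form that Example 2 assumes. Panic recovery is left out to keep the sketch short.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrPoolClosed is returned by Submit once Shutdown has been called
// (hypothetical name for this sketch).
var ErrPoolClosed = errors.New("worker pool is shut down")

// job pairs a task value with the function that processes it.
type job[T, R any] struct {
	fn   func(T) (R, error)
	task T
}

// WorkerPool runs submitted jobs on a fixed set of goroutines.
type WorkerPool[T, R any] struct {
	jobs    chan job[T, R]
	wg      sync.WaitGroup
	closeMu sync.RWMutex // guards closed and the close of jobs
	closed  bool
	resMu   sync.Mutex // guards results and errs
	results []R
	errs    []error
}

// NewWorkerPool starts n workers; queueSize is the task buffer capacity.
func NewWorkerPool[T, R any](n, queueSize int) (*WorkerPool[T, R], error) {
	if n < 1 {
		return nil, fmt.Errorf("need at least one worker, got %d", n)
	}
	p := &WorkerPool[T, R]{jobs: make(chan job[T, R], queueSize)}
	p.wg.Add(n)
	for i := 0; i < n; i++ {
		go p.work()
	}
	return p, nil
}

// work pulls jobs until the channel is closed and drained.
func (p *WorkerPool[T, R]) work() {
	defer p.wg.Done()
	for j := range p.jobs {
		r, err := j.fn(j.task)
		p.resMu.Lock()
		if err != nil {
			p.errs = append(p.errs, err)
		} else {
			p.results = append(p.results, r)
		}
		p.resMu.Unlock()
	}
}

// Submit enqueues a task, blocking while the buffer is full.
func (p *WorkerPool[T, R]) Submit(fn func(T) (R, error), task T) error {
	p.closeMu.RLock()
	defer p.closeMu.RUnlock()
	if p.closed {
		return ErrPoolClosed // sending on the closed channel would panic
	}
	p.jobs <- job[T, R]{fn: fn, task: task}
	return nil
}

// Shutdown stops intake, waits for queued and running jobs, and returns
// all collected results and errors. Calling it twice is safe.
func (p *WorkerPool[T, R]) Shutdown() ([]R, []error) {
	p.closeMu.Lock()
	if !p.closed {
		p.closed = true
		close(p.jobs)
	}
	p.closeMu.Unlock()
	p.wg.Wait()
	p.resMu.Lock()
	defer p.resMu.Unlock()
	return p.results, p.errs
}

func main() {
	pool, err := NewWorkerPool[int, int](2, 16)
	if err != nil {
		panic(err)
	}
	square := func(n int) (int, error) { return n * n, nil }
	for _, n := range []int{1, 2, 3, 4, 5} {
		if err := pool.Submit(square, n); err != nil {
			panic(err)
		}
	}
	results, errs := pool.Shutdown()
	fmt.Println(len(results), len(errs)) // 5 0
}
```

The sketch uses two separate mutexes on purpose. `Submit` may block on a full buffer while holding `closeMu`, but workers record outcomes under `resMu`, so they keep draining the queue and the blocked `Submit` eventually proceeds.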
Expected Behavior:
When tasks are submitted, they should be picked up by any available worker. If all workers are busy, tasks should be queued internally and processed when a worker becomes free. Upon shutdown, the pool should not accept new tasks, and all in-progress tasks should finish.
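A buffered channel queues tasks up to a fixed capacity. If the queue should instead grow without limit, so that a submitter never waits even when every worker is busy, one common Go idiom is a forwarding goroutine that buffers values in a slice. The sketch below is one possible approach, not a required part of the design, and `unboundedQueue` is a hypothetical name. A nil channel switches the send case off while the buffer is empty.

```go
package main

import "fmt"

// unboundedQueue forwards values from in to the returned channel, buffering
// them in a slice so senders on in never wait for a free worker. After in is
// closed, the remaining buffered values are still delivered, then out is closed.
func unboundedQueue[T any](in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var buf []T
		for in != nil || len(buf) > 0 {
			// Operations on a nil channel block forever, disabling that case.
			var send chan T
			var next T
			if len(buf) > 0 {
				send = out
				next = buf[0]
			}
			select {
			case v, ok := <-in:
				if !ok {
					in = nil // input closed: stop receiving, keep draining buf
					continue
				}
				buf = append(buf, v)
			case send <- next:
				buf = buf[1:]
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int) // unbuffered: sends would block without the queue
	out := unboundedQueue(in)
	for i := 1; i <= 5; i++ {
		in <- i // completes even though nobody is reading out yet
	}
	close(in)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	fmt.Println(got) // [1 2 3 4 5]
}
```

The trade-off is memory, since nothing limits how far the buffer can grow. When an upper bound on pending tasks is known, a plain buffered channel of that capacity is simpler.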
Edge Cases to Consider:
- Submitting tasks to a pool that has already been shut down.
- Submitting zero tasks.
- A task function that panics.
- A worker pool with zero workers (the constraints require at least one worker, so this case can be rejected when the pool is created).
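For a task that panics, one option is to run each task through a wrapper that uses `recover` to turn the panic into an ordinary error. The pool can then report it like any other task error. An unrecovered panic in any goroutine terminates the whole program, so without such a wrapper a single bad task would take down every worker. `safeCall` is a hypothetical helper name. It relies on a deferred function assigning to a named return value.

```go
package main

import "fmt"

// safeCall runs fn(task) and converts a panic into an ordinary error, so one
// misbehaving task cannot crash its worker goroutine (and with it the whole
// program). The deferred function can overwrite err because it is a named result.
func safeCall[T, R any](fn func(T) (R, error), task T) (res R, err error) {
	defer func() {
		if p := recover(); p != nil {
			err = fmt.Errorf("task panicked: %v", p)
		}
	}()
	return fn(task)
}

func main() {
	risky := func(n int) (int, error) {
		if n == 0 {
			panic("boom")
		}
		return 10 / n, nil
	}
	fmt.Println(safeCall(risky, 2)) // 5 <nil>
	fmt.Println(safeCall(risky, 0)) // 0 task panicked: boom
}
```

The other edge cases need less machinery. With zero tasks, shutdown should simply return empty results. Submitting after shutdown should be rejected by checking a closed flag under a lock before sending, because a send on a closed channel panics.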
Examples
Example 1: Let's assume tasks are integers and the worker processes them by squaring them.
Input Tasks: []int{1, 2, 3, 4, 5}
Number of Workers: 2
// Task: square a number
square := func(n int) (int, error) {
	return n * n, nil
}
// Submit tasks
pool.Submit(square, 1)
pool.Submit(square, 2)
pool.Submit(square, 3)
pool.Submit(square, 4)
pool.Submit(square, 5)
// Shutdown and collect results
results := pool.Shutdown()
Output Results (order may vary): [1, 4, 9, 16, 25]
Explanation: The pool's 2 workers process the 5 tasks concurrently, at most two at a time. For instance, worker 1 might handle tasks 1, 3, and 5 while worker 2 handles tasks 2 and 4. Any task submitted while both workers are busy waits in the queue until one of them becomes free. All results are eventually collected.
Example 2: Task that might return an error.
Input Tasks: []int{10, 0, 5}
Worker Function:
divideByTen := func(n int) (float64, error) {
	if n == 0 {
		return 0, fmt.Errorf("cannot divide by zero")
	}
	return float64(n) / 10.0, nil
}
Number of Workers: 1
// Submit tasks
pool.Submit(divideByTen, 10)
pool.Submit(divideByTen, 0)
pool.Submit(divideByTen, 5)
// Shutdown and collect results
results, errs := pool.Shutdown() // assuming a Shutdown that returns both results and errors
Expected Output (structure will depend on your design for collecting results/errors):
Results: [1.0, 0.5]
Errors: [cannot divide by zero] (The exact format of error reporting depends on your implementation.)
Explanation: The first task divideByTen(10) successfully returns 1.0. The second task divideByTen(0) returns an error. The third task divideByTen(5) returns 0.5. The worker pool collects both successful results and any errors encountered.
Constraints
- The number of workers (N) will be between 1 and 100 (inclusive).
- The number of tasks submitted (M) can be up to 1000.
- Task functions will take a single argument of type T and return a value of type R and an error.
- The worker pool must be implemented using only the Go standard library.
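Because the constraints bound both N and M, a constructor could validate the worker count and size the task buffer to the maximum number of tasks. Then submitting up to 1000 tasks never blocks, even if no worker has started yet. The constants and `newTaskQueue` below are hypothetical, derived directly from the limits above.

```go
package main

import "fmt"

// Limits taken from the Constraints section.
const (
	minWorkers = 1
	maxWorkers = 100
	maxTasks   = 1000
)

// newTaskQueue validates the worker count and returns a task channel whose
// buffer can hold every task the constraints allow (hypothetical helper).
func newTaskQueue[T any](workers int) (chan T, error) {
	if workers < minWorkers || workers > maxWorkers {
		return nil, fmt.Errorf("workers must be in [%d, %d], got %d", minWorkers, maxWorkers, workers)
	}
	return make(chan T, maxTasks), nil
}

func main() {
	q, err := newTaskQueue[int](2)
	fmt.Println(cap(q), err) // 1000 <nil>
	_, err = newTaskQueue[int](0)
	fmt.Println(err) // workers must be in [1, 100], got 0
}
```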
Notes
- Consider using Go channels for communication between the main goroutine and the worker goroutines, and among the workers themselves.
- A sync.WaitGroup can be helpful to manage the completion of worker goroutines.
- Think about how you will manage the internal queue of tasks if all workers are busy.
- How will you handle the collection of results to ensure all are gathered without blocking indefinitely?
- Consider using context.Context for cancellation signals if you want to add more sophisticated cancellation capabilities beyond just shutdown.