
Mastering Concurrency: Implementing a Worker Pool with sync.WaitGroup

Concurrency is a fundamental aspect of modern software development, enabling applications to perform multiple tasks simultaneously, improving responsiveness and efficiency. In Go, sync.WaitGroup is a powerful tool for coordinating goroutines, ensuring that a program waits for a collection of goroutines to finish their execution before proceeding. This challenge will test your understanding of sync.WaitGroup by having you implement a basic worker pool pattern.
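Before any pool is involved, the Add/Done/Wait cycle can be seen on its own. The sketch below squares a slice concurrently, one goroutine per element. The helper name squareAll is hypothetical. Each goroutine writes only to its own slice element, so no extra locking is needed, and because every Done happens before the Wait it unblocks returns, the results are safe to read after wg.Wait().

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll (hypothetical helper) squares each element in its own goroutine
// and uses a WaitGroup to wait until every goroutine has finished.
func squareAll(nums []int) []int {
	results := make([]int, len(nums))
	var wg sync.WaitGroup
	for i, n := range nums {
		wg.Add(1) // register the goroutine before starting it
		go func(i, n int) {
			defer wg.Done()    // mark this goroutine as finished
			results[i] = n * n // each goroutine writes only its own slot
		}(i, n)
	}
	wg.Wait() // block until the counter drops back to zero
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4}))
}
```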

Problem Description

Your task is to create a Go program that simulates a simple worker pool. You will have a fixed number of worker goroutines that process a list of tasks. Each task will take a variable amount of time to complete. The main goroutine must wait for all worker goroutines to finish processing all tasks before exiting.

What needs to be achieved:

  1. Define a set of tasks: These tasks can be represented as simple functions or closures that perform some operation and potentially print a message.
  2. Create a worker pool: Implement a mechanism where a predetermined number of goroutines (workers) are launched.
  3. Distribute tasks to workers: Tasks should be assigned to available workers for processing. A channel can be used to distribute tasks.
  4. Coordinate completion: Use sync.WaitGroup to ensure the main goroutine waits until all tasks are completed by all workers.
  5. Graceful shutdown: Ensure that workers can detect when there are no more tasks and exit their goroutine.
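The five steps above can be sketched in a single function. This is one possible design, not the required solution: the hypothetical runPool helper counts worker goroutines in the WaitGroup (rather than individual tasks, as the examples below do), so wg.Wait() also confirms that every worker has left its loop after the channel is closed. It assumes Go 1.19 or later for atomic.Int64.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// Task is one unit of work (step 1); it receives the ID of the worker running it.
type Task func(workerID int)

// runPool (hypothetical) starts numWorkers goroutines (step 2), feeds them
// tasks over a channel (step 3), and waits on a WaitGroup that counts
// workers (step 4). Closing the channel ends each worker's range loop
// (step 5). It returns how many tasks were executed.
func runPool(numWorkers int, tasks []Task) int {
	var wg sync.WaitGroup
	var executed atomic.Int64
	taskCh := make(chan Task)

	for id := 1; id <= numWorkers; id++ {
		wg.Add(1) // count the worker before it starts
		go func(workerID int) {
			defer wg.Done() // runs once the channel is closed and drained
			for task := range taskCh {
				task(workerID)
				executed.Add(1)
			}
		}(id)
	}

	for _, t := range tasks {
		taskCh <- t // blocks until some worker is ready to receive
	}
	close(taskCh) // no more tasks: workers exit their loops
	wg.Wait()     // every worker goroutine has returned
	return int(executed.Load())
}

func main() {
	tasks := make([]Task, 5)
	for i := range tasks {
		taskID := i
		tasks[i] = func(workerID int) {
			d := time.Duration(rand.Intn(451)+50) * time.Millisecond // 50-500ms
			time.Sleep(d)
			fmt.Printf("Worker %d finished task %d in %s\n", workerID, taskID, d)
		}
	}
	n := runPool(3, tasks)
	fmt.Printf("All tasks completed! (%d executed)\n", n)
}
```

With this design, an empty task list simply closes the channel at once, and the workers return without doing any work.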

Key Requirements:

  • You must use sync.WaitGroup to synchronize the completion of worker goroutines.
  • The program should launch a configurable number of worker goroutines.
  • The program should process a configurable list of tasks.
  • Each task should simulate some work by sleeping for a random duration.
  • The main goroutine must not exit until all tasks are processed.

Expected Behavior: The program should launch the specified number of workers. Workers will pick up tasks, process them (simulated by sleeping), and report completion. The main goroutine will wait for all tasks to be completed and then print an "All tasks completed!" message before exiting. The order in which tasks are completed might vary due to concurrency.

Edge Cases to Consider:

  • No tasks: What happens if an empty list of tasks is provided? The program should still exit gracefully.
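  • Zero workers: In designs like the examples below, nothing ever receives from the task channel, so with at least one task the program blocks either on a send or forever in wg.Wait(). Once no goroutine can make progress, the Go runtime aborts with "fatal error: all goroutines are asleep - deadlock!". For this challenge, assume at least one worker.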
  • Tasks taking longer than expected: The WaitGroup should correctly handle tasks with varying durations.

Examples

Example 1:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task represents a unit of work
type Task func()

func main() {
	numWorkers := 3
	numTasks := 5

	var wg sync.WaitGroup
	tasks := make(chan Task, numTasks)

	// Launch worker goroutines
	for i := 1; i <= numWorkers; i++ {
		go func(workerID int) {
			for task := range tasks {
				task()
			}
		}(i)
	}

	// Add tasks to the channel
	for i := 0; i < numTasks; i++ {
		wg.Add(1) // Increment the WaitGroup counter for each task
		taskID := i // Capture loop variable (required before Go 1.22, harmless after)
		tasks <- func() {
			defer wg.Done() // Decrement the WaitGroup counter when the task is done
			fmt.Printf("Processing task %d...\n", taskID)
			time.Sleep(time.Duration(rand.Intn(300)+50) * time.Millisecond)
			fmt.Printf("Task %d completed.\n", taskID)
		}
	}
	close(tasks) // Close the tasks channel to signal no more tasks will be sent

	wg.Wait() // Wait for all tasks to complete

	fmt.Println("All tasks completed!")
}

Input: The code above is a self-contained example. The effective "input" is the pair of parameters numWorkers and numTasks.

Output (will vary due to concurrency and random sleep durations):

Processing task 0...
Processing task 1...
Processing task 2...
Task 1 completed.
Processing task 3...
Task 0 completed.
Processing task 4...
Task 2 completed.
Task 3 completed.
Task 4 completed.
All tasks completed!

Explanation: The main function sets up numWorkers and numTasks and creates a sync.WaitGroup. Each worker goroutine receives tasks with for task := range tasks; the loop ends once the channel has been closed and drained. Before sending each task, main calls wg.Add(1), and inside the task's closure wg.Done() is deferred so the counter is decremented when the task finishes. wg.Wait() blocks the main goroutine until the counter reaches zero. Note that here the WaitGroup counts tasks, not workers: wg.Wait() guarantees that every task has finished, not that every worker goroutine has returned.

Example 2:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	numWorkers := 2
	numTasks := 3

	var wg sync.WaitGroup
	taskChan := make(chan int, numTasks) // Channel to send task IDs

	// Start workers
	for i := 1; i <= numWorkers; i++ {
		go func(workerID int) {
			for taskID := range taskChan {
				fmt.Printf("Worker %d: Starting task %d\n", workerID, taskID)
				time.Sleep(time.Duration(rand.Intn(451)+50) * time.Millisecond) // Simulate 50-500ms of work
				fmt.Printf("Worker %d: Finished task %d\n", workerID, taskID)
				wg.Done() // Signal completion of this specific task
			}
		}(i)
	}

	// Send tasks to the channel
	for i := 0; i < numTasks; i++ {
		wg.Add(1) // Increment counter for each task
		taskChan <- i
	}
	close(taskChan) // No more tasks will be sent

	wg.Wait() // Wait for all tasks to be processed
	fmt.Println("All tasks have been successfully processed.")
}

Input: Similar to Example 1, this is a self-contained program. The effective "input" parameters are numWorkers = 2 and numTasks = 3.

Output (will vary):

Worker 1: Starting task 0
Worker 2: Starting task 1
Worker 1: Finished task 0
Worker 1: Starting task 2
Worker 2: Finished task 1
Worker 1: Finished task 2
All tasks have been successfully processed.

Explanation: This example uses a channel of integers to represent task IDs. Each worker receives a task ID, performs simulated work, and then calls wg.Done(). The wg.Add(1) call happens before sending the task ID into the channel. The close(taskChan) signals to the workers that no more tasks will arrive. wg.Wait() ensures all wg.Done() calls corresponding to wg.Add(1) have occurred.
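In Example 2 each worker prints its own progress. If the main goroutine should report completion instead, a common idiom is a results channel that is closed only after wg.Wait() returns. The sketch below uses a hypothetical Result type and collect helper, and it counts workers rather than tasks: once every worker has returned, no further sends on results are possible, so closing it is safe.

```go
package main

import (
	"fmt"
	"sync"
)

// Result (hypothetical) reports which worker handled which task.
type Result struct {
	WorkerID int
	TaskID   int
}

// collect (hypothetical) runs numWorkers workers over task IDs
// 0..numTasks-1 and returns one Result per task.
func collect(numWorkers, numTasks int) []Result {
	taskCh := make(chan int, numTasks) // buffered so sending never blocks main
	results := make(chan Result)
	var wg sync.WaitGroup

	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for taskID := range taskCh {
				results <- Result{WorkerID: workerID, TaskID: taskID}
			}
		}(w)
	}

	for i := 0; i < numTasks; i++ {
		taskCh <- i
	}
	close(taskCh)

	go func() {
		wg.Wait()      // all workers have returned...
		close(results) // ...so no further sends can happen
	}()

	var out []Result
	for r := range results { // ends when results is closed
		out = append(out, r)
	}
	return out
}

func main() {
	for _, r := range collect(2, 3) {
		fmt.Printf("Worker %d: Finished task %d\n", r.WorkerID, r.TaskID)
	}
	fmt.Println("All tasks have been successfully processed.")
}
```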

Constraints

  • The number of worker goroutines will be between 1 and 10.
  • The number of tasks will be between 0 and 50.
  • The simulated work duration for each task will be between 50ms and 500ms.
  • The solution must use sync.WaitGroup to manage goroutine completion.
  • Standard Go libraries are permitted.

Notes

  • Consider how to signal to the worker goroutines that there are no more tasks to process. Closing the task channel is a common and idiomatic way to do this.
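  • Call wg.Add() before the goroutine or task that will call wg.Done() can start running: before the go statement, or before sending the task on the channel as the examples do. Otherwise wg.Wait() may observe a zero counter and return early.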
  • wg.Done() is typically deferred within the goroutine or the function that represents the work unit.
  • Be mindful of race conditions when accessing shared variables, although this challenge primarily focuses on WaitGroup synchronization, not complex shared state management.
  • The order of output messages will likely differ between runs due to the nature of concurrent execution.
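The notes on wg.Add() placement and on shared variables can be seen together in the sketch below. The hypothetical countByWorker helper calls wg.Add(1) in the launching goroutine before each go statement (if a worker called Add itself, wg.Wait() could run first, see a zero counter, and return early), and it protects a shared map with a sync.Mutex. Running it with go run -race is a quick way to check for data races; without the Lock/Unlock pair, the concurrent map writes would typically be reported.

```go
package main

import (
	"fmt"
	"sync"
)

// countByWorker (hypothetical) runs numTasks no-op tasks on numWorkers
// workers and returns how many tasks each worker handled.
func countByWorker(numWorkers, numTasks int) map[int]int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex // guards counts, which all workers share
		counts = make(map[int]int)
	)
	tasks := make(chan int)

	for w := 1; w <= numWorkers; w++ {
		wg.Add(1) // Add before `go`, never inside the worker itself
		go func(workerID int) {
			defer wg.Done()
			for range tasks {
				mu.Lock()
				counts[workerID]++
				mu.Unlock()
			}
		}(w)
	}
	for i := 0; i < numTasks; i++ {
		tasks <- i
	}
	close(tasks)
	wg.Wait() // after Wait returns, all writes to counts are visible here
	return counts
}

func main() {
	counts := countByWorker(3, 20)
	total := 0
	for id, n := range counts {
		fmt.Printf("Worker %d handled %d tasks\n", id, n)
		total += n
	}
	fmt.Println("total:", total)
}
```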