Hone

Bounded Concurrency Task Executor in Go

Imagine you have a collection of tasks that need to be executed, and you want to run them concurrently to speed up the process. However, you also need to ensure that you don't overwhelm your system by running too many tasks simultaneously. This challenge requires you to build a robust mechanism for executing tasks concurrently with a defined limit on the number of active goroutines.

Problem Description

You need to implement a TaskExecutor struct in Go that can accept a slice of functions (tasks) and execute them concurrently, but with a strict limit on the maximum number of goroutines that can be active at any given time.

Key Requirements:

  1. NewTaskExecutor(concurrency int) *TaskExecutor: A constructor function that initializes a TaskExecutor with a specified concurrency limit.
  2. Execute(tasks []func() error) error: A method that takes a slice of functions, where each function represents a task to be executed.
    • Each task function returns an error.
    • The Execute method should start executing these tasks concurrently, respecting the concurrency limit.
    • If a task returns a non-nil error, Execute should ideally stop starting further tasks, or at least signal that an error occurred. For this challenge, it is enough that Execute returns the first error encountered; if all tasks complete successfully, it returns nil.
    • Execute should return only after all submitted tasks have completed, or once a task has returned an error.
  3. Concurrency Limit Enforcement: The TaskExecutor must ensure that no more than concurrency goroutines (the limit passed to NewTaskExecutor) are running tasks at any point in time.

Expected Behavior:

  • Tasks should be picked up and executed as soon as a "slot" becomes available in the concurrency pool.
  • If the number of submitted tasks exceeds the concurrency limit, subsequent tasks should wait until a running task finishes.
  • The order of task execution is not guaranteed, but all tasks must be processed.
  • Error handling should be robust: the first error encountered should be returned.

Edge Cases:

  • Zero or Negative Concurrency: In general, NewTaskExecutor should handle a zero or negative concurrency, either with a reasonable default (e.g., a minimum concurrency of 1) or by returning an error. For this challenge, however, you may assume concurrency is a positive integer.
  • Empty Task List: If an empty slice of tasks is provided to Execute, the method should return nil immediately.
  • Tasks with Delays: Tasks may have varying execution times. A long-running task should occupy only its own slot, so that other tasks keep cycling through the remaining slots.
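A sketch of the "minimum concurrency of 1" strategy for the constructor, assuming the executor stores its limit in a hypothetical concurrency field. Clamping matters if that limit later sizes a buffered channel used as a semaphore: make panics on a negative size, and with a zero-capacity channel the first acquire (a send) would block forever, because no task goroutine would ever be started to receive from it.

```go
package main

import "fmt"

// TaskExecutor runs tasks with at most concurrency of them active at once.
// Only the constructor is sketched here; the field name is an assumption.
type TaskExecutor struct {
	concurrency int
}

// NewTaskExecutor returns an executor with the given concurrency limit.
// Assumption: non-positive values are clamped to 1 (tasks run one at a
// time) instead of being rejected with an error.
func NewTaskExecutor(concurrency int) *TaskExecutor {
	if concurrency < 1 {
		concurrency = 1
	}
	return &TaskExecutor{concurrency: concurrency}
}

func main() {
	for _, n := range []int{-2, 0, 1, 4} {
		fmt.Printf("NewTaskExecutor(%d).concurrency = %d\n", n, NewTaskExecutor(n).concurrency)
	}
}
```

The empty-list edge case can be handled in the same spirit, by having Execute return nil before doing any other work when len(tasks) == 0.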

Examples

Example 1:

import (
	"fmt"
	"time"
)

func task(id int, delay time.Duration) func() error {
	return func() error {
		fmt.Printf("Task %d started\n", id)
		time.Sleep(delay)
		fmt.Printf("Task %d finished\n", id)
		return nil
	}
}

// Input:
// concurrency = 2
// tasks = [task(1, 2s), task(2, 1s), task(3, 3s), task(4, 0.5s)]

// Expected Output (example order):
// Task 1 started
// Task 2 started
// Task 2 finished
// Task 3 started
// Task 1 finished
// Task 4 started
// Task 4 finished
// Task 3 finished
// nil (returned by Execute)

// Explanation:
// With concurrency=2, Task 1 and Task 2 start.
// Task 2 finishes first. Then Task 3 starts.
// Task 1 finishes. Then Task 4 starts.
// Task 4 finishes. Then Task 3 finishes.
// All tasks are executed, and no errors occurred.

Example 2:

import (
	"errors"
	"fmt"
	"time"
)

func taskWithError(id int, delay time.Duration, err error) func() error {
	return func() error {
		fmt.Printf("Task %d started\n", id)
		time.Sleep(delay)
		fmt.Printf("Task %d finished with error\n", id)
		return err
	}
}

// Input:
// concurrency = 2
// tasks = [task(1, 1s), taskWithError(2, 0.5s, errors.New("task 2 failed")), task(3, 2s)]

// Expected Output (example order):
// Task 1 started
// Task 2 started
// Task 2 finished with error
// errors.New("task 2 failed") (returned by Execute)

// Explanation:
// Task 1 and Task 2 start.
// Task 2 finishes with an error. The Execute method immediately returns this error.
// Task 3 might have started or not depending on timing, but its result (or lack thereof) is not relevant to the returned error.

Example 3:

import "fmt"

func shortTask(id int) func() error {
	return func() error {
		fmt.Printf("Short task %d started and finished\n", id)
		return nil
	}
}

// Input:
// concurrency = 3
// tasks = [shortTask(1), shortTask(2), shortTask(3), shortTask(4), shortTask(5)]

// Expected Output (example order):
// Short task 1 started and finished
// Short task 2 started and finished
// Short task 3 started and finished
// Short task 4 started and finished
// Short task 5 started and finished
// nil (returned by Execute)

// Explanation:
// With concurrency=3, the first three tasks start.
// As soon as one finishes, the next available task (4, then 5) starts.
// Because the tasks are very short, they may finish almost immediately and the order of their output can vary between runs, but at most 3 tasks run at any one time.

Constraints

  • The concurrency parameter passed to NewTaskExecutor will be an integer greater than 0.
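  • Every element of the tasks slice passed to Execute is a func() error.
  • The total number of tasks can be large, so prefer a solution whose goroutine count and memory use stay bounded by the concurrency limit rather than growing with the number of tasks.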
  • Your solution should be idiomatic Go.

Notes

  • Consider using Go's built-in concurrency primitives such as channels and sync.WaitGroup.
  • Think about how to signal completion and errors across goroutines.
  • A common pattern for managing bounded concurrency is using a channel as a semaphore.