Cooperative Task Scheduler in Go
This challenge involves building a cooperative scheduler in Go. Cooperative scheduling is a concurrency model where tasks voluntarily yield control of the CPU to other tasks, rather than being preempted by the operating system. This approach can simplify reasoning about concurrent programs and improve performance in certain scenarios.
Problem Description
Your task is to implement a Scheduler that manages a collection of Tasks. The Scheduler should be able to start, stop, and run these tasks. Tasks will be designed to run in a cooperative manner, meaning they will periodically yield control to the scheduler.
Key Requirements:
-
Task Interface: Define an interface for a
Task. This interface should have at least two methods:Run(yield func()): This method contains the task's logic. It receives ayieldfunction as an argument, which the task must call to relinquish control to the scheduler.Stop(): This method allows the scheduler to signal to the task that it should gracefully shut down.
-
Scheduler Structure: Implement a
Schedulerstruct. It should hold a collection of running tasks. -
Scheduler Methods: Implement the following methods for the
Scheduler:NewScheduler(): A constructor to create a newSchedulerinstance.StartTask(task Task): Adds a task to the scheduler and begins its execution.StopTask(task Task): Signals a specific task to stop and removes it from the scheduler.Run(): This is the main loop of the scheduler. It iterates through the active tasks, calls theirRunmethod, and handles yielding. It should continue running until all tasks have been stopped.
-
Cooperative Yielding: The
Scheduler.Run()method should manage the execution of tasks. When a task'sRunmethod is called, the scheduler provides ayieldfunction. The task must call thisyieldfunction at appropriate points in its execution to allow other tasks to run. If a task does not yield, it can starve other tasks. Theyieldfunction provided to the task should internally signal the scheduler to move to the next task. -
Task Termination: When
StopTaskis called, the scheduler should initiate a graceful shutdown of the target task. TheRunmethod of theSchedulershould terminate when all tasks have been stopped.
Expected Behavior:
- Tasks should execute sequentially in a round-robin fashion.
- When a task calls
yield(), the scheduler should immediately pause that task and start executing the next task in the queue. - The
Scheduler.Run()method should block until all started tasks are explicitly stopped.
Edge Cases to Consider:
- What happens if a task never calls
yield()? (This is a critical aspect of cooperative scheduling – a poorly behaved task can block the entire scheduler.) - What happens if
StopTaskis called on a task that is not currently running? - What happens if
StartTaskis called multiple times with the same task instance?
Examples
Example 1: Basic Task Execution
package main
import (
"fmt"
"sync"
"time"
)
// Define a simple Task that prints messages and yields periodically.
type SimpleTask struct {
id int
message string
stopCh chan struct{}
wg sync.WaitGroup
}
func NewSimpleTask(id int, message string) *SimpleTask {
return &SimpleTask{
id: id,
message: message,
stopCh: make(chan struct{}),
}
}
func (t *SimpleTask) Run(yield func()) {
t.wg.Add(1)
defer t.wg.Done()
fmt.Printf("Task %d started.\n", t.id)
for {
select {
case <-t.stopCh:
fmt.Printf("Task %d stopping.\n", t.id)
return
default:
fmt.Printf("Task %d: %s\n", t.id, t.message)
// Simulate some work and then yield
time.Sleep(50 * time.Millisecond)
yield() // Crucial: yield control
}
}
}
func (t *SimpleTask) Stop() {
close(t.stopCh)
t.wg.Wait() // Wait for Run to finish after stopping
}
// ... (Scheduler implementation would go here) ...
func main() {
scheduler := NewScheduler() // Assuming NewScheduler is implemented
task1 := NewSimpleTask(1, "Hello from Task 1!")
task2 := NewSimpleTask(2, "Greetings from Task 2!")
scheduler.StartTask(task1)
scheduler.StartTask(task2)
// Let tasks run for a bit
go func() {
time.Sleep(500 * time.Millisecond)
scheduler.StopTask(task1)
scheduler.StopTask(task2)
}()
scheduler.Run() // This will block until all tasks are stopped
fmt.Println("Scheduler finished.")
}
Expected Output (Order may vary slightly due to scheduling):
Task 1 started.
Task 1: Hello from Task 1!
Task 2 started.
Task 2: Greetings from Task 2!
Task 1: Hello from Task 1!
Task 1: Hello from Task 1!
Task 2: Greetings from Task 2!
Task 1: Hello from Task 1!
Task 1: Hello from Task 1!
Task 2: Greetings from Task 2!
Task 1: Hello from Task 1!
Task 1: Hello from Task 1!
Task 2: Greetings from Task 2!
Task 1: Hello from Task 1!
Task 1: Hello from Task 1!
Task 2: Greetings from Task 2!
Task 1: Hello from Task 1!
Task 1 stopping.
Task 2 stopping.
Scheduler finished.
Explanation:
Task 1 and Task 2 are started. They run in a round-robin fashion, printing their messages and yielding. After a delay, Task 1 and Task 2 are signaled to stop. The Scheduler.Run() method blocks until both tasks have finished their Stop() method and exited their Run() goroutines.
Example 2: A Task That Doesn't Yield (Demonstrates Starvation)
package main
import (
"fmt"
"sync"
"time"
)
// Task that deliberately does NOT yield for a while.
type StarvingTask struct {
id int
stopCh chan struct{}
wg sync.WaitGroup
yieldOk bool // Controls whether it yields
}
func NewStarvingTask(id int, yield bool) *StarvingTask {
return &StarvingTask{
id: id,
stopCh: make(chan struct{}),
yieldOk: yield,
}
}
func (t *StarvingTask) Run(yield func()) {
t.wg.Add(1)
defer t.wg.Done()
fmt.Printf("Starving Task %d started.\n", t.id)
for i := 0; i < 5; i++ { // Run a few times
select {
case <-t.stopCh:
fmt.Printf("Starving Task %d stopping.\n", t.id)
return
default:
fmt.Printf("Starving Task %d: Iteration %d\n", t.id, i)
time.Sleep(100 * time.Millisecond) // Simulate work
if t.yieldOk {
fmt.Printf("Starving Task %d yielding.\n", t.id)
yield() // Yield if configured
} else {
fmt.Printf("Starving Task %d NOT yielding.\n", t.id)
// No yield here for demonstration
}
}
}
fmt.Printf("Starving Task %d finished its loop.\n", t.id)
}
func (t *StarvingTask) Stop() {
close(t.stopCh)
t.wg.Wait()
}
// ... (Scheduler implementation would go here) ...
func main() {
scheduler := NewScheduler()
starvingTask := NewStarvingTask(1, false) // This task will NOT yield
normalTask := NewSimpleTask(2, "I yield often!") // Use task from Example 1
scheduler.StartTask(starvingTask)
scheduler.StartTask(normalTask)
// Let them run for a short while, then stop
go func() {
time.Sleep(1 * time.Second)
scheduler.StopTask(starvingTask)
scheduler.StopTask(normalTask)
}()
scheduler.Run()
fmt.Println("Scheduler finished.")
}
Expected Output (Demonstrating Starvation):
Starving Task 1 started.
Starving Task 1: Iteration 0
Starving Task 1 NOT yielding.
Starving Task 1: Iteration 1
Starving Task 1 NOT yielding.
Starving Task 1: Iteration 2
Starving Task 1 NOT yielding.
Normal Task 2 started. // This might appear after a significant delay!
Normal Task 2: I yield often!
Normal Task 2: I yield often!
Normal Task 2: I yield often!
Normal Task 2: I yield often!
Normal Task 2: I yield often!
Starving Task 1: Iteration 3
Starving Task 1 NOT yielding.
Starving Task 1: Iteration 4
Starving Task 1 NOT yielding.
Starving Task 1 finished its loop.
Starving Task 1 stopping.
Normal Task 2: I yield often!
Normal Task 2: I yield often!
Normal Task 2: I yield often!
Normal Task 2: I yield often!
Normal Task 2: I yield often!
Normal Task 2 stopping.
Scheduler finished.
Explanation:
The starvingTask is configured not to yield. As a result, it monopolizes the scheduler's execution. The normalTask will only get a chance to run after the starvingTask completes its entire loop (or is externally stopped if its loop was infinite). This illustrates the crucial need for cooperative yielding.
Constraints
- The scheduler must manage an arbitrary number of concurrently running tasks.
- The
yieldfunction provided to a task must be idempotent (calling it multiple times without interveningRuncalls should have no ill effects beyond the initial yield). - The
Stop()method on a task should be safe to call multiple times. - The scheduler's
Run()method should not consume excessive CPU when there are no tasks to run or when tasks are yielding frequently. - The solution should use Go's standard library and concurrency primitives.
Notes
- Think carefully about how the
Schedulerwill keep track of which task is next to run. A simple slice or a ring buffer could work. - The
yieldfunction is the core of cooperative scheduling. How will theScheduler'sRunloop detect that a task has yielded? - Consider using a channel to signal between tasks and the scheduler for yielding and stopping.
- The
Stop()method should be a signal, and the task'sRun()method should detect this signal and exit cleanly. TheStop()method itself might need to wait for theRun()goroutine to finish. - This implementation of cooperative scheduling is fundamentally different from Go's goroutines and the Go runtime scheduler, which uses preemptive multitasking. This challenge focuses on building a user-level cooperative scheduler.