Translated by AI
Cruel Boss and Corporate Slaves: Playing with Go Channels
On a whim, I decided to review what I learned at a recent online event.
Concurrency and Data Races
The book "The Go Programming Language" defines a "data race" as follows (Chapter 9):
A data race occurs whenever two or more goroutines access the same variable concurrently and at least one of the accesses is a write.
And it lists the following three ways to avoid data races:
- Do not write to the variable; immutable structures are inherently safe for concurrency.
- Avoid accessing the variable from multiple goroutines; confine the variable to a single goroutine and share data via communication.
- Allow multiple goroutines to access the variable, but only one at a time; mutual exclusion.
The first one might be easy to understand if you think of "value objects" that often appear in Java and other languages. The third one involves something called an "invariant" and is a bit complicated[1], so I'll skip it for now (until I study it a bit more).
Anyway, today's story is about the second method.
Message Passing Using Channels
Go provides a mechanism called "channels" for communication (message-passing) between goroutines.
A channel functions as a FIFO (First-In, First-Out). Furthermore, sending and receiving to/from a channel is guaranteed to be atomic. This means there's no risk of a value disappearing due to multiple goroutines sending concurrently, or conversely, getting the same value due to multiple goroutines receiving concurrently.
When receiving from an empty channel (or an unbuffered channel), the process blocks until something enters the channel (or the channel is closed).
Conversely, when sending to a full channel (or an unbuffered channel), the process blocks until data is taken from the channel.
If you only use channels to interact with other goroutines (without using shared memory access or references/updates via methods), it can be considered concurrency-safe.
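The second method (confine the variable to one goroutine and share via communication) can be sketched in a few lines. This is a minimal illustration of my own, not code from the event: a single goroutine owns the running total, and everyone else talks to it only through channels.

```go
package main

import "fmt"

// sum owns the running total; no other goroutine ever touches it.
// Values arrive over the channel, so no mutex is needed.
func sum(in <-chan int, result chan<- int) {
	total := 0
	for v := range in { // the loop ends when the channel is closed
		total += v
	}
	result <- total
}

func main() {
	in := make(chan int)
	result := make(chan int)
	go sum(in, result)

	for i := 1; i <= 5; i++ {
		in <- i // send values instead of sharing memory
	}
	close(in) // the sender closes; sum's range loop then ends

	fmt.Println(<-result) // 15
}
```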
Short Play: The Heartless Boss and the Corporate-Slave Subordinates
With that in mind, I thought of a little skit.
There are three actors: one boss and his two subordinates. The boss is a slacker who wants to dump all the tasks onto his subordinates and go home as soon as possible. The two subordinates are corporate slaves who wait with their mouths open like baby birds until work comes down from the boss. In a sense, it's a good match (lol).
The boss created a simple task-queue type using the channel mechanism.
```go
// Queue: FIFO
type Queue struct {
    q chan int
}

// New: create a new instance
func New(size int) *Queue {
    return &Queue{make(chan int, size)}
}

// Add: enqueue
func (q *Queue) Add(s int) {
    q.q <- s
}

// Get: dequeue
func (q *Queue) Get() (int, bool) {
    n, ok := <-q.q
    return n, ok
}

// Complete: close channel
func (q *Queue) Complete() {
    close(q.q)
}
```
You might think the contents of the Queue.Get() method could just be:
```go
func (q *Queue) Get() (int, bool) {
    return <-q.q
}
```
However, this will result in a compilation error.
```go
n, ok := <-q.q
```

The two-value receive is a "[special form](https://text.baldanders.info/golang/special-forms/ "About expression evaluation by special forms | text.Baldanders.info")", so the result must be explicitly assigned to a pair of variables (int, bool) before it can be returned.
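To see the special form in action, here is a minimal sketch of my own (the helper name `get` and the values are hypothetical): the two-value form works on the left of an assignment, but `<-ch` by itself yields only a single value.

```go
package main

import "fmt"

// get demonstrates the comma-ok special form: the two-value receive is
// valid only in an assignment (or a select case), so it must be bound
// to variables before being returned.
func get(ch <-chan int) (int, bool) {
	n, ok := <-ch // ok reports whether the channel is still open
	return n, ok
	// return <-ch // compile error: <-ch is a single value here
}

func main() {
	ch := make(chan int, 1)
	ch <- 42
	n, ok := get(ch)
	fmt.Println(n, ok) // 42 true

	close(ch)
	n, ok = get(ch)
	fmt.Println(n, ok) // 0 false
}
```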
Now, using this, he registers the tasks scheduled for that day, hands them over to the subordinates, and heads home early (lol). The code would look something like this.
```go
func Manager(wg *sync.WaitGroup, tasklist []int) *Queue {
    plan := New(len(tasklist))
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer plan.Complete()
        for _, n := range tasklist {
            plan.Add(n)
            log.Printf("Manager: set Task(%d)\n", n)
        }
        log.Println("Manager: return home")
    }()
    return plan
}
```
Meanwhile, the subordinates' work would look like this.
```go
const MaxWorkers = 2

func Workers(wg *sync.WaitGroup, q *Queue) {
    for i := 0; i < MaxWorkers; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            for {
                if n, ok := q.Get(); ok {
                    log.Printf("Worker(%d): start Task(%d)\n", i, n)
                    time.Sleep(2 * time.Second) // working...
                    log.Printf("Worker(%d): end Task(%d)\n", i, n)
                } else {
                    break
                }
            }
            log.Printf("Worker(%d): return home\n", i)
        }(i + 1)
    }
}
```
The subordinates silently carry out their work until there are no tasks left. It brings a tear to the eye!
Even after a channel is closed, if there is still content inside, it can be received until the content is gone. Once the content is exhausted, ok is set to false, and it returns immediately.
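This drain-after-close behavior can be confirmed in a few lines (a standalone sketch, not from the skit code):

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch) // no more sends, but buffered values remain

	// Buffered values are still delivered after close...
	fmt.Println(<-ch) // 1
	fmt.Println(<-ch) // 2

	// ...and once drained, a receive returns immediately with ok == false.
	n, ok := <-ch
	fmt.Println(n, ok) // 0 false
}
```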
On the other hand, sending to a closed channel causes a panic. Therefore, closing the channel is basically the responsibility of the sender. However, if there are multiple sending goroutines for a single channel, another means to stop the receiving goroutines will be necessary.
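One common way to handle the multiple-senders case is the "single closer" pattern: no sender closes the channel; instead a dedicated goroutine waits for all senders (via a `sync.WaitGroup`) and performs the one and only `close`. A minimal sketch of mine (the helper `fanIn` is a hypothetical name, not from this post):

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn starts n sender goroutines that share one channel. Since no
// single sender may safely close it, a dedicated closer goroutine waits
// for all of them and performs the only close. Returns the sum received.
func fanIn(n int) int {
	ch := make(chan int)
	var wg sync.WaitGroup

	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ch <- i // senders only send; closing is someone else's job
		}(i)
	}

	go func() {
		wg.Wait() // all senders are done...
		close(ch) // ...so this single close is safe
	}()

	sum := 0
	for v := range ch { // ends once the closer closes ch
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(fanIn(3)) // 1+2+3 = 6
}
```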
Now, let's actually run this. First, the main() function would look like this.
```go
func main() {
    tasklist := []int{1, 2, 3, 4, 5}
    log.Println("Start...")
    var wg sync.WaitGroup
    plan := Manager(&wg, tasklist)
    Workers(&wg, plan)
    wg.Wait()
    log.Println("...End")
}
```
Running this results in something like the following.
```
$ go run sample1.go
2021/03/08 20:36:02 Start...
2021/03/08 20:36:02 Manager: set Task(1)
2021/03/08 20:36:02 Manager: set Task(2)
2021/03/08 20:36:02 Manager: set Task(3)
2021/03/08 20:36:02 Manager: set Task(4)
2021/03/08 20:36:02 Manager: set Task(5)
2021/03/08 20:36:02 Worker(2): start Task(1)
2021/03/08 20:36:02 Worker(1): start Task(2)
2021/03/08 20:36:02 Manager: return home
2021/03/08 20:36:04 Worker(2): end Task(1)
2021/03/08 20:36:04 Worker(2): start Task(3)
2021/03/08 20:36:04 Worker(1): end Task(2)
2021/03/08 20:36:04 Worker(1): start Task(4)
2021/03/08 20:36:06 Worker(1): end Task(4)
2021/03/08 20:36:06 Worker(1): start Task(5)
2021/03/08 20:36:06 Worker(2): end Task(3)
2021/03/08 20:36:06 Worker(2): return home
2021/03/08 20:36:08 Worker(1): end Task(5)
2021/03/08 20:36:08 Worker(1): return home
2021/03/08 20:36:08 ...End
```
It's a bit hard to follow, so let's make a sequence diagram. It might look something like this.
The boss just goes home without a second thought for his subordinates. Truly heartless (lol).
Blocking on Channel Send
Now, it seems the heartless boss has received a complaint: "Don't give the subordinates more work than their capacity at once." So, the boss adjusted the task list (channel) buffer to match the number of subordinates.
```diff
 func Manager(wg *sync.WaitGroup, tasklist []int) *Queue {
-    plan := New(len(tasklist))
+    plan := New(MaxWorkers)
     wg.Add(1)
     go func() {
         defer wg.Done()
         defer plan.Complete()
         for _, n := range tasklist {
             plan.Add(n)
             log.Printf("Manager: set Task(%d)\n", n)
         }
         log.Println("Manager: return home")
     }()
     return plan
 }
```
Let's run this. Going straight to the sequence diagram:
Well, the fact that the boss goes home early remains the same. But more than that, he didn't like having to keep monitoring the subordinates until all tasks were set.
"Wait. I don't need to watch them all the time; I can just check in occasionally (while doing my own work)!"
Non-blocking Channel Sending
So, the Queue.Add() function was rewritten as follows:
```diff
 func (q *Queue) Add(s int) error {
-    q.q <- s
+    select {
+    case q.q <- s:
+        return nil
+    default:
+        return ErrTooBusy
+    }
 }
```
If a select statement has a default clause, it falls through to default immediately, instead of waiting, whenever every case would block (the same applies to channel receives). So now, if the buffer is full, Add() returns an error right away without blocking. Using this:
```diff
 func Manager(wg *sync.WaitGroup, tasklist []int) *Queue {
     plan := New(MaxWorkers)
     wg.Add(1)
     go func() {
         defer wg.Done()
         defer plan.Complete()
-        for _, n := range tasklist {
-            plan.Add(n)
-            log.Printf("Manager: set Task(%d)\n", n)
+        offset := 0
+        for {
+            rest := false
+            for i := offset; i < len(tasklist); i++ {
+                offset = i
+                n := tasklist[i]
+                if err := plan.Add(n); err != nil {
+                    log.Printf("Manager: cannot assign Task(%d): %v\n", n, err)
+                    rest = true
+                    break
+                } else {
+                    log.Printf("Manager: set Task(%d)\n", n)
+                }
+            }
+            if rest {
+                time.Sleep(time.Second)
+            } else {
+                break
+            }
         }
         log.Println("Manager: return home")
     }()
     return plan
 }
```
Let's rewrite it like this. The key point is that if the Queue.Add() function fails, it takes a short interval and then tries again from where it left off.
Now the sequence diagram looks like this:
The heartless boss used his spare time to do his own work, slightly improving his reputation, while the corporate-slave subordinates remained corporate slaves. And they lived happily ever after.
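Incidentally, the same default trick used in Add() works on the receive side too. A minimal non-blocking receive of my own, mirroring the non-blocking send above (`tryGet` is a hypothetical name):

```go
package main

import "fmt"

// tryGet is a non-blocking receive: with a default clause, select falls
// through immediately instead of waiting on an empty channel.
func tryGet(ch <-chan int) (int, bool) {
	select {
	case n := <-ch:
		return n, true
	default:
		return 0, false // channel is empty right now
	}
}

func main() {
	ch := make(chan int, 1)

	n, ok := tryGet(ch)
	fmt.Println(n, ok) // 0 false: nothing buffered yet

	ch <- 7
	n, ok = tryGet(ch)
	fmt.Println(n, ok) // 7 true
}
```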
Goroutine Priority
When you draw concurrency in a sequence diagram, the downside is that it inevitably looks like they are "executing alternately." In reality, there is no priority among the goroutines representing the three actors; they run completely equally and concurrently. Because it is difficult to predict which goroutine will run at what timing, it is not suitable for strict real-time processing[2].
If you want to create a priority between goroutines, even indirectly, you would need to build in some other mechanism (though I think that would be quite difficult even considering things like GC).
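As a rough illustration of such an indirect mechanism, one known trick is a nested select: an outer non-blocking select checks a high-priority channel first, and only the fallback select waits on both. This gives a soft preference, not a real scheduling guarantee, and the names `high`, `low`, and `next` here are hypothetical, my own sketch:

```go
package main

import "fmt"

// next prefers high over low: the outer select (with default) drains the
// high-priority channel first; only if it is empty does the inner select
// wait on both channels. When both become ready inside the inner select,
// Go still picks a case at random -- so this is only a soft priority.
func next(high, low <-chan int) int {
	select {
	case v := <-high:
		return v
	default:
	}
	select {
	case v := <-high:
		return v
	case v := <-low:
		return v
	}
}

func main() {
	high := make(chan int, 1)
	low := make(chan int, 1)
	high <- 1
	low <- 2
	fmt.Println(next(high, low)) // 1: high wins even though low is ready
	fmt.Println(next(high, low)) // 2
}
```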
Reference Books
1. If I were to explain invariants seriously, group theory and such would come up. So here, I'll limit it to a rough explanation related to concurrency (please excuse the lack of precision). One might say "the invariant is true" or "the invariant is maintained" when the internal state of an instance, or the relationship between instances, is not broken. For example, while sorting an array, the invariant is temporarily false. If you access that array from the outside while the invariant is false, its content is undefined and not guaranteed. That's why you need to lock the entire sorting process to prevent external access. Explaining this every time depending on the situation is difficult, so it is abstracted as the "truth/falsity of the invariant." However, it's truly a case of "easier said than done," and it can be quite complex when you think about it in terms of concrete implementation code. This is likely one of the reasons why people think "concurrency (parallel processing) is difficult."
2. Real-time processing here refers to "completing divided jobs at set timings and within set periods." In embedded systems, designing jobs so that real-time processing occurs without delay can be quite tedious. Especially systems that break via hardware are hard to debug and truly tough (lol).