Eating up a Golang Channel
Go's channels are great for real concurrent programs. By real, I mean server or daemon processes that are supposed to run or listen permanently, without downtime. The posts on the Go blog (1) (2) are very useful for learning the best practices and understanding the merits of channels.
On the other hand, when I use Go as a C substitute for my algorithms, I deal with bulk data. Often, I have to process an array of data as fast as possible. Although concurrency is not parallelism, I use goroutines to better utilize the 8 (pseudo-)cores of my CPU. A system similar to the map-reduce framework often comes in handy: multiple heavy processors and multiple filters. This blog post about pipelines recommends quit channels for graceful shutdowns, but that shutdown is not as graceful as I want. As such, using that system gives me no guarantee of completion.
Let me elaborate with an example. Here are three functions:
- processor iterates over its assigned slice, processes each element, and sends the processed value to the out channel. When it completes its slice, it signals wg.Done().
- filter reads a value from the in channel and adds it to the final list if it is eligible.
- main is glue code that starts a goroutine for filter, shares the data between two processors, and prints out the final values when the processors are finished.
func filter(final *[]Type, in chan *Type) {
	for {
		v := <-in
		if verify(v) {
			// Dereference the slice pointer before appending.
			*final = append(*final, *v)
		}
	}
}

func processor(data []int, out chan *Type, wg *sync.WaitGroup) {
	for _, v := range data {
		out <- timeConsumingOperation(v)
	}
	wg.Done()
}

func main() {
	data := readData()
	pipe := make(chan *Type, 10)
	final := make([]Type, 0, 100)
	var wg sync.WaitGroup
	go filter(&final, pipe)
	half := len(data) / 2
	wg.Add(2)
	go processor(data[:half], pipe, &wg)
	go processor(data[half:], pipe, &wg)
	wg.Wait() // waits for the processors only, not for filter
	fmt.Println(final)
}

Well, as experienced gophers can easily see, it has a problem: it exits prematurely. We don't wait for filter to finish.
Monitoring the conclusion of filter is not trivial. Since it’s in an infinite loop, we cannot just use sync.WaitGroup. Let’s start with the question: when should it say it is finished?
- When all the processors are complete and there is no data in the channel.

The task of signaling filter about the completion of the processors looks like a perfect fit for quit channels, as mentioned in the blog post. We can introduce a select statement and a quit channel and let filter deal with its own completion. However, I couldn't find an elegant solution using a quit channel that is also safe from race conditions.
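To make the difficulty concrete, here is a minimal sketch of the race, not the code from the pipelines post. The helper drainWithQuit and the channel sizes are hypothetical. When both the data channel and the quit channel are ready, select picks one of them pseudo-randomly, so the receiver can return while values are still waiting in the buffer.

```go
package main

import "fmt"

// drainWithQuit is a hypothetical helper: it receives from in until the
// quit case is selected and returns how many values it consumed.
func drainWithQuit(in <-chan int, quit <-chan struct{}) int {
	received := 0
	for {
		select {
		case <-in:
			received++
		case <-quit:
			// select may choose this case even while in still holds data.
			return received
		}
	}
}

func main() {
	const total = 100
	in := make(chan int, total)
	for i := 0; i < total; i++ {
		in <- i
	}
	quit := make(chan struct{})
	close(quit) // the "processors" are done, so quit is signalled

	received := drainWithQuit(in, quit)
	// Whatever is left in the buffer was never seen by the receiver.
	fmt.Printf("received=%d left in buffer=%d\n", received, len(in))
}
```

The exact split varies from run to run, which is the problem: the quit signal says nothing about whether the buffer has been emptied.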
I turned to collective knowledge and searched for an elegant solution. I came across this very nice answer on Stack Overflow and learned that the receive operator returns two values: the next value on the channel and the channel's status. Here is the table of possible values for v, ok := <-myChannel:
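| | len(myChannel) == 0 | len(myChannel) > 0 |
|---|---|---|
| Channel open | Block | v == value, ok == true |
| Channel closed | v == ZeroValue, ok == false | v == value, ok == true |

Note that values still buffered when the channel is closed are delivered normally with ok == true; ok becomes false only once the channel is both closed and empty.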
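The four cases can be checked with a short program. This is only a sketch: describe is a hypothetical helper that uses a select with a default case so that the blocking case can be reported instead of hanging. According to the Go specification, ok is true whenever the value came from a successful send, even if the channel has been closed since, and false only for the zero value produced by a closed, empty channel.

```go
package main

import "fmt"

// describe is a hypothetical helper: it attempts a non-blocking receive
// on ch and reports what the receive operator observed.
func describe(ch chan int) string {
	select {
	case v, ok := <-ch:
		return fmt.Sprintf("v=%d ok=%t", v, ok)
	default:
		return "would block"
	}
}

func main() {
	ch := make(chan int, 2)

	fmt.Println("open, empty:     ", describe(ch)) // would block

	ch <- 7
	fmt.Println("open, buffered:  ", describe(ch)) // v=7 ok=true

	ch <- 8
	close(ch)
	fmt.Println("closed, buffered:", describe(ch)) // v=8 ok=true
	fmt.Println("closed, empty:   ", describe(ch)) // v=0 ok=false
}
```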
Therefore, signalling of the completion of processors can be done over one channel, elegantly.
Final code looks like this:
func filter(final *[]Type, in chan *Type, complete chan bool) {
	for {
		v, ok := <-in
		if !ok && v == nil { // NEW
			complete <- true // NEW
			return // NEW: stop instead of looping on the closed channel
		} // NEW
		if verify(v) {
			*final = append(*final, *v)
		}
	}
}

func processor(data []int, out chan *Type, wg *sync.WaitGroup) {
	for _, v := range data {
		out <- timeConsumingOperation(v)
	}
	wg.Done()
}

func main() {
	data := readData()
	pipe := make(chan *Type, 10)
	complete := make(chan bool) // NEW
	final := make([]Type, 0, 100)
	var wg sync.WaitGroup
	go filter(&final, pipe, complete) // NEW: pass the completion channel
	half := len(data) / 2
	wg.Add(2)
	go processor(data[:half], pipe, &wg)
	go processor(data[half:], pipe, &wg)
	wg.Wait()
	close(pipe) // NEW
	<-complete // NEW
	fmt.Println(final)
}

PS: While I was preparing the example, I realized that the zero value is often a legitimate value for some types, especially built-ins like int and float. I'm not yet sure how to handle them. Maybe I'll return to it in a later post.
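One possible way to handle zero values, offered as a sketch rather than a definitive answer: rely on ok alone, or equivalently on a for ... range loop over the channel, which ends exactly when the channel is closed and drained. The received value is then never inspected to detect completion, so a legitimate 0 is processed like any other value. In the example below, square stands in for timeConsumingOperation, the evenness test stands in for verify, and run is a hypothetical wrapper around the body of main.

```go
package main

import (
	"fmt"
	"sync"
)

// square is a hypothetical stand-in for timeConsumingOperation.
func square(v int) int { return v * v }

// processor sends the processed values of its slice to out.
func processor(data []int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, v := range data {
		out <- square(v)
	}
}

// filter keeps even values (a stand-in for verify). The range loop ends
// only when in is closed and empty, so a received 0 is ordinary data.
func filter(in <-chan int, complete chan<- []int) {
	var final []int
	for v := range in {
		if v%2 == 0 {
			final = append(final, v)
		}
	}
	complete <- final
}

// run wires two processors to one filter and waits for both stages.
func run(data []int) []int {
	pipe := make(chan int, 10)
	complete := make(chan []int)
	go filter(pipe, complete)

	var wg sync.WaitGroup
	half := len(data) / 2
	wg.Add(2)
	go processor(data[:half], pipe, &wg)
	go processor(data[half:], pipe, &wg)
	wg.Wait()   // all values have been sent
	close(pipe) // lets filter's range loop terminate
	return <-complete
}

func main() {
	// The squares are 0, 1, 4, 9, 16, 25; three of them are even,
	// including the zero value itself.
	fmt.Println(len(run([]int{0, 1, 2, 3, 4, 5}))) // prints 3
}
```

Returning the result slice over the complete channel also removes the shared pointer to final, at the cost of a slightly different signature than the one used in the post.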






