Semaphored Wait Group

2020-04-29 • edited 2022-01-05 • 4 minutes to read

One of the first things you might like to try when starting your journey with Go is its concurrency patterns. You will probably start using goroutines to run things “in the background”, and you will also get to know channels, which allow goroutines to communicate safely.

When you want to spawn many goroutines, you will surely get to know WaitGroups, which let you wait for them all to finish. There is also the defer statement, which helps you not forget to clean up after a function has finished running (and remember, deferred calls are executed in reverse order).

Let’s start with a simple example:

package main

import (
	"log"
	"sync"
	"time"
)

func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done()                  // deferred first, so it runs last
	defer log.Printf("#%d done", id) // deferred second, so it runs first
	log.Printf("#%d starting", id)
	time.Sleep(time.Second)
}

func main() {
	var wg sync.WaitGroup
	for i := 1; i <= 100; i++ {
		wg.Add(1)
		go worker(i, &wg)
	}
	wg.Wait()
	log.Printf("all done")
}

This works well, but what if we had a million items to process, and the actual work was a memory-heavy operation? We don’t want to start a million goroutines at once. We need something to limit the number of goroutines running at the same time.

This is where a semaphore comes in handy. We’ll define how many concurrent workers we want, and the semaphore will not let a new goroutine start until a slot is free.

Using a Channel to Create a Simple Semaphore

Let’s say we want a maximum of 5 workers running at any time. We’ll need a buffered channel, and we do not really care what type of values it holds (an empty struct{} would work just as well, but bool reads fine).

sem := make(chan bool, 5)

Whenever we want to start a goroutine, we’ll push a value to the channel:

		wg.Add(1)
		sem <- true // blocks while all slots are taken
		go worker(i, &wg, sem) // we need to pass the channel to the worker

When we’re done - we’ll take one value out of the channel:

func worker(id int, wg *sync.WaitGroup, sem <-chan bool) {
	defer wg.Done()
	defer log.Printf("#%d done", id)
	log.Printf("#%d starting", id)
	time.Sleep(time.Second)
	<-sem // free a slot for the next goroutine
}

Please notice that when we added the sem argument to the worker, we also converted the semaphore to a receive-only channel using the <-chan syntax.
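For completeness, here is the intermediate version assembled from the snippets above - nothing new, just everything in one place:

package main

import (
	"log"
	"sync"
	"time"
)

func worker(id int, wg *sync.WaitGroup, sem <-chan bool) {
	defer wg.Done()
	defer log.Printf("#%d done", id)
	log.Printf("#%d starting", id)
	time.Sleep(time.Second)
	<-sem // free a slot for the next goroutine
}

func main() {
	var wg sync.WaitGroup
	sem := make(chan bool, 5)
	for i := 1; i <= 100; i++ {
		wg.Add(1)
		sem <- true // blocks while all 5 slots are taken
		go worker(i, &wg, sem)
	}
	wg.Wait()
	log.Printf("all done")
}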

But wait… we do not want to change the code too much. Can we just create our own implementation of a wait group that has the desired feature?

Yes we can, and Go’s interfaces will help us do it.

Go’s Implicit Interfaces - The Ultimate Decoupling

In most other languages, an interface has to be defined before it can be implemented. In Go - it is the other way around. We can define interfaces that match our needs, and if anything (internal or external) has methods with matching signatures, Go will treat it as a type that implements the interface.

A Tour of Go: “Implicit interfaces decouple the definition of an interface from its implementation, which could then appear in any package without prearrangement.”

So, even though we cannot change the sync.WaitGroup type, we can extract an interface that matches our current requirements. We’ll have:

type WaitGroup interface {
	Add(delta int)
	Done()
	Wait()
}

and change the worker function’s signature to:

func worker(id int, wg WaitGroup)
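
Nothing else has to change: *sync.WaitGroup already has all three methods with matching signatures, so it satisfies the interface implicitly. If you want the compiler to verify that (a common Go idiom, not something this code strictly needs), you can add a compile-time assertion:

// compile-time check that *sync.WaitGroup implements our WaitGroup interface
var _ WaitGroup = (*sync.WaitGroup)(nil)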

Now we can create our own implementation of the WaitGroup interface with a built-in semaphore.

type SemaphoredWaitGroup struct {
	sem chan bool
	wg  sync.WaitGroup
}

func (s *SemaphoredWaitGroup) Add(delta int) {
	s.wg.Add(delta)
	s.sem <- true // blocks until a slot is free; we call Add(1) once per worker
}

func (s *SemaphoredWaitGroup) Done() {
	<-s.sem // free the slot before marking the work as done
	s.wg.Done()
}

func (s *SemaphoredWaitGroup) Wait() {
	s.wg.Wait()
}

There will be a separate post on how to do this in a more “Go idiomatic” way. For now, let’s stick with simple composition.

Finally, instead of using the default sync.WaitGroup, we can use our own SemaphoredWaitGroup:

	wg := SemaphoredWaitGroup{sem: make(chan bool, 5)}

and we have ourselves a limited pool of workers.
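
A minimal sketch of how main comes together - note that we pass &wg to the worker, since the methods are defined on the pointer receiver:

func main() {
	wg := SemaphoredWaitGroup{sem: make(chan bool, 5)}
	for i := 1; i <= 100; i++ {
		wg.Add(1) // blocks here while all 5 slots are busy
		go worker(i, &wg)
	}
	wg.Wait()
	log.Printf("all done")
}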

The complete example is available as a GitHub Gist and on Go Playground.

Tags: software engineering, go, concurrency, semaphores
