Producer / consumer with Goroutines

2022.04.15

Producer/consumer is a design pattern in which a producer generates data and multiple consumers receive it for processing. Today we will implement this pattern with goroutines. First we will implement a simple version where the data is processed and everything exits. Later we will upgrade it to handle any errors that occur.

Version 1

The first implementation consists of:

  1. producer
  2. n consumers
  3. wait group
  4. single channel

Producer

The producer takes a channel for sending data to the consumers, and the data itself. It iterates over the slice of integers, sends the values through the channel one by one, and closes the channel once it's done (the deferred close runs when the function returns).

func producer(buff chan int, data []int) {
	defer close(buff)
	for _, i := range data {
		fmt.Printf("Producer sending %d to channel\n", i)
		buff <- i
	}
	fmt.Println("Producer exiting")
}
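Go also lets a function declare that it only sends on a channel. A minimal sketch, assuming we are free to change the parameter type to `chan<- int`: the compiler then rejects any receive from `buff` inside `producer`, while `close` is still allowed on a send-only channel. The logging is left out, and `drain` is a hypothetical helper for trying it out.

```go
package main

import "fmt"

// producer matches the article's version 1 producer (without logging),
// except that buff is declared send-only: receiving from it would not compile.
func producer(buff chan<- int, data []int) {
	defer close(buff) // closing a send-only channel is allowed
	for _, i := range data {
		buff <- i
	}
}

// drain is a hypothetical helper: it runs producer in its own goroutine
// and collects every value until the channel is closed.
func drain(data []int) []int {
	buff := make(chan int, 3) // a bidirectional channel converts to chan<- int implicitly
	go producer(buff, data)
	var got []int
	for v := range buff {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drain([]int{1, 2, 3, 4, 5, 6})) // prints [1 2 3 4 5 6]
}
```

The caller does not change: main can keep creating a plain `chan int` and pass it in.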

Consumers

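Each consumer loops, receiving from the channel. The receive blocks until a value is available; whenever it gets one, the consumer processes it (here, processing is simulated by sleeping for one second). Once the channel has been closed and all buffered values have been received, the receive returns ok == false and the consumer returns, calling Done on the sync.WaitGroup through the deferred call.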

func consumer(idx int, buff chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		i, ok := <-buff
		if ok {
			fmt.Printf("Consumer #%d: received %d\n", idx, i)
			time.Sleep(1 * time.Second)
		} else {
			fmt.Printf("Consumer #%d: no more values to process, exiting\n", idx)
			return
		}
	}
}
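The explicit `ok` check is the long form of ranging over a channel: a `for ... range` loop over a channel keeps receiving until the channel is closed and drained. A sketch of the same consumer written that way, without the sleep; the `processed` counter and the `run` driver are hypothetical additions for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// consumer behaves like the article's version 1 consumer (minus the sleep):
// the range loop ends once buff is closed and every buffered value is received.
func consumer(idx int, buff <-chan int, wg *sync.WaitGroup, processed *int64) {
	defer wg.Done()
	for i := range buff {
		fmt.Printf("Consumer #%d: received %d\n", idx, i)
		atomic.AddInt64(processed, 1) // counter shared by all consumers
	}
	fmt.Printf("Consumer #%d: no more values to process, exiting\n", idx)
}

// run is a hypothetical driver: it starts the consumers, sends the data,
// closes the channel and returns how many values were processed.
func run(data []int, workers int) int64 {
	var wg sync.WaitGroup
	var processed int64
	buff := make(chan int, workers)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go consumer(i, buff, &wg, &processed)
	}
	for _, v := range data {
		buff <- v
	}
	close(buff)
	wg.Wait()
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println("processed:", run([]int{1, 2, 3, 4, 5, 6}, 3)) // processed: 6
}
```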

Main

We use a sync.WaitGroup to synchronize the goroutines, so that main returns only after all consumers are done. The first implementation uses a single channel to send data from the producer to the consumers. We start the producer in its own goroutine, followed by n consumers (in this case 3), calling wg.Add(1) before starting each consumer. Then main calls Wait and blocks until every consumer has called Done.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	const workersCount = 3
	var buffer = make(chan int, workersCount)
	data := []int{1, 2, 3, 4, 5, 6}

	fmt.Println("Starting producer")
	go producer(buffer, data)

	for i := 0; i < workersCount; i++ {
		wg.Add(1)
		fmt.Printf("Starting consumer #%d\n", i)
		go consumer(i, buffer, &wg)
	}
	fmt.Println("Main waiting")
	wg.Wait()
	fmt.Println("Main exiting")
}

Output

The producer is launched first, followed by the three consumers, and then main starts waiting. The producer sends all the values to the channel and exits. Each consumer repeatedly takes a value and processes it until no data is left. At that point every consumer has called Done, so Wait returns and main exits on its own. The exact interleaving of the lines below can differ from run to run, because goroutine scheduling is not deterministic.

Starting producer
Starting consumer #0
Starting consumer #1
Starting consumer #2
Main waiting
Producer sending 1 to channel
Producer sending 2 to channel
Producer sending 3 to channel
Producer sending 4 to channel
Producer sending 5 to channel
Producer sending 6 to channel
Producer exiting
Consumer #1: received 3
Consumer #0: received 2
Consumer #2: received 1
Consumer #2: received 4
Consumer #1: received 5
Consumer #0: received 6
Consumer #0: no more values to process, exiting
Consumer #2: no more values to process, exiting
Consumer #1: no more values to process, exiting
Main exiting

Version 2

In the second implementation we add error handling. Before handling each value, the producer and the consumers check whether an error has been raised anywhere. If one has, they quit.

The second implementation consists of:

  1. producer
  2. n consumers
  3. wait group
  4. two channels
  5. context

Producer

The upgraded producer now takes a context as an argument. Before sending each value to the channel, it checks whether the context has been canceled. As we will see in the upgraded consumer, the cancel function is called if an error occurs while processing values. If there is no error, the producer sends values as before.

func producer(buff chan int, ctx context.Context, data []int) {
	defer close(buff)
	for _, i := range data {
		select {
		case <-ctx.Done():
			fmt.Printf("Producer: error occurred, exiting\n")
			return
		default:
		}
		fmt.Printf("Producer sending %d to channel\n", i)
		buff <- i
	}
	fmt.Println("Producer exiting")
}
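One caveat: the context is checked only before the send, and `buff <- i` itself can block, for example when the buffer is full and every consumer has already exited. In this program that goes unnoticed because main does not wait for the producer, but in a longer-running program the goroutine would leak. A sketch of a variant, assuming the producer may wait on the send and on cancellation in one `select`; it also moves `ctx` to the first parameter, as the context package documentation recommends. `blockedProducer` is a hypothetical demo in which nobody ever reads from the channel, so the original producer would stay stuck on its first send.

```go
package main

import (
	"context"
	"fmt"
)

// producer is a variant of the article's version 2 producer. It waits on the
// send and on cancellation together, so a send that cannot proceed is
// abandoned as soon as ctx is canceled. It returns how many values it sent.
func producer(ctx context.Context, buff chan<- int, data []int) (sent int) {
	defer close(buff)
	for _, i := range data {
		select {
		case <-ctx.Done():
			fmt.Println("Producer: context canceled, exiting")
			return sent
		case buff <- i:
			sent++
		}
	}
	fmt.Println("Producer exiting")
	return sent
}

// blockedProducer is a hypothetical demo: buff is unbuffered and never read,
// so no send can ever complete; canceling the context still lets producer return.
func blockedProducer() int {
	ctx, cancel := context.WithCancel(context.Background())
	buff := make(chan int)
	result := make(chan int)
	go func() { result <- producer(ctx, buff, []int{1, 2, 3}) }()
	cancel() // stands in for a consumer that hit an error
	return <-result
}

func main() {
	fmt.Println("values sent:", blockedProducer()) // values sent: 0
}
```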

Consumers

The biggest change happens in the consumers. Their parameters now include a new channel, the context and the cancel function. The new channel carries the errors returned by the data processing function (procedure). The context tells a consumer whether any other consumer has hit an error and called cancel, and a consumer calls cancel itself when it hits one. For demonstration, an error is returned when the value equals 3.

func consumer(idx int, buff chan int, errs chan error, wg *sync.WaitGroup, ctx context.Context, cancel func()) {
	defer wg.Done()

	procedure := func(value int) error {
		if value == 3 {
			msg := fmt.Sprintf("Consumer #%d: Can't count to 3", idx)
			return errors.New(msg)
		}
		fmt.Printf("Consumer #%d: processing %d\n", idx, value)
		return nil
	}
	
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("Consumer #%d: somewhere error occurred, exiting\n", idx)
			return
		default:
		}
		i, ok := <-buff
		if ok {
			fmt.Printf("Consumer #%d: received %d\n", idx, i)
			err := procedure(i)
			if err != nil {
				fmt.Printf("Consumer #%d: procedure returned an error, exiting\n", idx)
				errs <- err
				cancel()
				return
			}
			time.Sleep(1 * time.Second)
		} else {
			fmt.Printf("Consumer #%d: no more values to process, exiting\n", idx)
			return
		}
	}
}
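The consumer has a similar gap: if it is blocked in `<-buff` when another consumer calls cancel, it only notices on its next pass through the loop, after it has received (and processed) another value or seen the channel closed. A sketch of a variant that waits on both in one `select`, using `fmt.Errorf` instead of `errors.New(fmt.Sprintf(...))`. The parameter order and the `run` driver (which includes a context-aware producer) are our assumptions, not the article's code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// consumer waits for cancellation and for the next value in the same select.
func consumer(ctx context.Context, cancel context.CancelFunc, idx int, buff <-chan int, errs chan<- error, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("Consumer #%d: context canceled, exiting\n", idx)
			return
		case i, ok := <-buff:
			if !ok {
				fmt.Printf("Consumer #%d: no more values to process, exiting\n", idx)
				return
			}
			if i == 3 { // same demonstration error as in the article
				errs <- fmt.Errorf("consumer #%d: can't count to 3", idx)
				cancel() // tell the producer and the other consumers to stop
				return
			}
			fmt.Printf("Consumer #%d: processing %d\n", idx, i)
		}
	}
}

// run is a hypothetical driver; its producer also stops once ctx is canceled.
func run(data []int, workers int) []error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	buff := make(chan int, workers)
	errs := make(chan error, workers) // each consumer sends at most one error
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go consumer(ctx, cancel, i, buff, errs, &wg)
	}
	go func() {
		defer close(buff)
		for _, v := range data {
			select {
			case <-ctx.Done():
				return
			case buff <- v:
			}
		}
	}()
	wg.Wait()
	close(errs) // no consumer can send anymore
	var out []error
	for e := range errs {
		out = append(out, e)
	}
	return out
}

func main() {
	for _, err := range run([]int{1, 2, 3, 4, 5, 6}, 3) {
		fmt.Println("Error received", err)
	}
}
```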

Main

The important additions to main are the context with its deferred call to cancel, the errs channel, and draining that channel after all consumers are done to print any errors they reported. Otherwise, main is unchanged.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	const workersCount = 3
	var buffer = make(chan int, workersCount)
	var errs = make(chan error, workersCount)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	data := []int{1, 2, 3, 4, 5, 6}

	fmt.Println("Starting producer")
	go producer(buffer, ctx, data)

	for i := 0; i < workersCount; i++ {
		wg.Add(1)
		fmt.Printf("Starting consumer #%d\n", i)
		go consumer(i, buffer, errs, &wg, ctx, cancel)
	}
	fmt.Println("Main waiting")
	wg.Wait()
	// After wg.Wait no consumer can send on errs anymore,
	// so it is safe to close it and drain the remaining errors.
	close(errs)
	for e := range errs {
		fmt.Println("Error received", e)
	}
	fmt.Println("No errors left")
}

Output

The output looks similar until consumer #0 receives the value 3. The procedure returns an error, which the consumer sends through the errs channel before calling cancel. The other consumers see the done signal on the context and exit, so the values 4, 5 and 6 are never processed. The producer checks the context too, but in this run it has already sent all the data by the time the error occurs, so it exits normally. Finally, main receives the error from the channel and prints it.

Starting producer
Starting consumer #0
Starting consumer #1
Starting consumer #2
Main waiting
Producer sending 1 to channel
Producer sending 2 to channel
Producer sending 3 to channel
Producer sending 4 to channel
Producer sending 5 to channel
Consumer #2: received 1
Consumer #2: processing 1
Consumer #1: received 2
Consumer #1: processing 2
Producer sending 6 to channel
Consumer #0: received 3
Consumer #0: procedure returned an error, exiting
Producer exiting
Consumer #1: somewhere error occurred, exiting
Consumer #2: somewhere error occurred, exiting
Error received Consumer #0: Can't count to 3
No errors left
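Since Go 1.20 (released after this post), the context package can also carry the error itself: `context.WithCancelCause` returns a cancel function that takes an error, and `context.Cause` reports the error passed to the first call. A minimal sketch of that alternative to a separate errors channel, assuming Go 1.20 or later; `firstError` is a hypothetical helper that checks each value in its own goroutine rather than the full producer/consumer setup.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// firstError is a hypothetical helper: every value is checked in its own
// goroutine, and a failing check cancels the context with the error as cause.
func firstError(values []int) error {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil) // release resources; no effect if already canceled
	var wg sync.WaitGroup
	for _, v := range values {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			if ctx.Err() != nil {
				return // another goroutine already failed
			}
			if v == 3 { // same demonstration error as in the article
				cancel(fmt.Errorf("can't count to %d", v))
			}
		}(v)
	}
	wg.Wait()
	// Cause returns the error given to the first cancel call,
	// or nil if the context was never canceled.
	return context.Cause(ctx)
}

func main() {
	fmt.Println(firstError([]int{1, 2, 3, 4, 5, 6})) // can't count to 3
}
```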

Conclusion

Go's concurrency tools have been the most fun to work with so far. Channels, contexts and wait groups make it fairly easy to coordinate concurrent code and adapt it to your needs. We first implemented a simple version of the producer/consumer pattern using a single channel and a wait group. Adding a context then let us handle errors and stop the remaining work in all goroutines once any error occurs.