
Approaches To Manage Concurrent Workloads Like Worker Pools And Pipelines

Introduction

Go provides great and convenient ways to write high-performance concurrent programs. Tasks can execute concurrently, and even in parallel when the program runs on a machine with multiple cores: since Go 1.5, GOMAXPROCS defaults to the number of logical CPUs available to the program.

While Go’s concurrency primitives are easy to work with (that is, they are easy to create and start using), they don’t prevent us developers from writing incorrect or buggy code. They should be used with great care, and ideally combined into concurrency patterns that fit the different use cases and contexts in which we solve our problems concurrently.

Concurrency patterns in Go are different ways to put Go’s concurrency primitives together to build useful structures and get our code to respond well to a lot of things happening at once. Below are four popular concurrency patterns in Go that help handle large workloads concurrently in a safe and elegant way.

This post will explain these patterns with simple examples and walk you through the code as well as the decision-making behind writing it.

The Patterns

The Worker Pool pattern

```go
package main

import (
	"fmt"
	"sync"
)

// worker doubles each number received from the jobs channel and sends the result to the results channel.
func worker(id int, wg *sync.WaitGroup, jobs <-chan int, results chan<- int) {
	defer wg.Done()
	for j := range jobs {
		fmt.Println("worker", id, "processing job", j)
		results <- j * 2
	}
}

func main() {
	const (
		numJobs, numWorkers = 5, 3
	)
	var (
		jobs     = make(chan int, numJobs)
		results  = make(chan int, numJobs)
		workerWg = new(sync.WaitGroup)
	)
	workerWg.Add(numWorkers)

	// Create a pool of 3 workers
	for w := 1; w <= numWorkers; w++ {
		go worker(w, workerWg, jobs, results)
	}

	go func() {
		defer close(results)
		workerWg.Wait()
	}()

	// Add jobs to the queue
	go func() {
		defer close(jobs)
		for j := 1; j <= numJobs; j++ {
			jobs <- j
		}
	}()

	// Collect results from the workers
	for res := range results {
		fmt.Println("RESULT:", res)
	}
}
```

The disadvantages of this pattern

Limited scalability The worker pool pattern, with a fixed number of workers, can be limited in terms of scalability. If the workload increases beyond the capacity of the worker pool, performance may suffer. One solution to this problem is to use a dynamic worker pool. In a dynamic worker pool, the number of workers varies based on the workload. When there are more tasks to be processed, the pool increases the number of workers, and when the workload decreases, it reduces the number of workers.

To implement a dynamic worker pool, we can use a combination of channels and goroutines. We can create a channel to receive tasks and another channel to send results. We can also create a goroutine that listens to the task channel and assigns tasks to available workers. Each worker is a goroutine that receives a task from the worker channel, processes it, and sends the result back to the result channel. To dynamically adjust the number of workers, we can use a separate goroutine that monitors the workload and adjusts the number of workers accordingly. By using a dynamic worker pool, we can achieve better scalability and utilization of resources. However, it requires careful tuning of parameters such as the workload threshold and the rate of worker creation/destruction to avoid overloading the system or creating too many unnecessary goroutines.
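
A minimal sketch of the idea, assuming a simple queue-length threshold as the scaling signal (the process function, the threshold, and the tick interval are illustrative; scaling down and capping the pool size are omitted for brevity):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// process is a stand-in for the real work a worker would do.
func process(task int) int { return task * 2 }

func main() {
	tasks := make(chan int, 100)
	results := make(chan int, 100)

	var wg sync.WaitGroup
	spawn := func() {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks {
				results <- process(t)
			}
		}()
	}

	// Start with a small fixed pool.
	for i := 0; i < 2; i++ {
		spawn()
	}

	// Monitor goroutine: add a worker whenever the queue backs up
	// beyond a threshold. A real implementation would also scale
	// down and cap the pool size.
	stopMonitor := make(chan struct{})
	go func() {
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-stopMonitor:
				return
			case <-ticker.C:
				if len(tasks) > 10 { // workload threshold (needs tuning)
					spawn()
				}
			}
		}
	}()

	for i := 1; i <= 50; i++ {
		tasks <- i
	}
	close(tasks)
	close(stopMonitor) // stop scaling before waiting on the pool
	wg.Wait()
	close(results)

	for r := range results {
		fmt.Println(r)
	}
}
```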

Task prioritization The worker pool pattern does not provide a built-in mechanism for task prioritization. This means that all tasks are treated equally, regardless of their importance or urgency. We can introduce a priority queue data structure to the workers pool pattern. A priority queue is a data structure that stores elements with associated priorities and allows for efficient retrieval of the element with the highest priority.
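
A minimal sketch of such a queue using container/heap from the standard library (the Task type and its Priority field are illustrative); a dispatcher goroutine would pop the highest-priority task and hand it to an idle worker:

```go
package main

import (
	"container/heap"
	"fmt"
)

// Task is an illustrative job with a priority; higher runs first.
type Task struct {
	Name     string
	Priority int
}

// taskQueue implements heap.Interface as a max-heap on Priority.
type taskQueue []Task

func (q taskQueue) Len() int           { return len(q) }
func (q taskQueue) Less(i, j int) bool { return q[i].Priority > q[j].Priority }
func (q taskQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *taskQueue) Push(x any)        { *q = append(*q, x.(Task)) }
func (q *taskQueue) Pop() any {
	old := *q
	n := len(old)
	t := old[n-1]
	*q = old[:n-1]
	return t
}

func main() {
	q := &taskQueue{}
	heap.Init(q)
	heap.Push(q, Task{"cleanup", 1})
	heap.Push(q, Task{"payment", 9})
	heap.Push(q, Task{"email", 5})

	// A dispatcher would pop the highest-priority task and hand it to a worker.
	for q.Len() > 0 {
		t := heap.Pop(q).(Task)
		fmt.Println("dispatching:", t.Name)
	}
}
```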

Resource management Managing resources such as memory and CPU usage can be challenging with the worker pool pattern. Since the number of workers is fixed, it can be difficult to optimize resource usage for different types of workloads. If a worker takes too long to complete a task, we can use a timeout mechanism so that the task is timed out and reassigned to another worker. This ensures that no worker is blocked for an extended period of time and helps maintain the overall performance of the system.
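
A minimal sketch of such a per-task timeout (the runWithTimeout helper and the slow task are illustrative; note that the abandoned goroutine still runs to completion in the background, so the task itself should ideally also honor cancellation):

```go
package main

import (
	"fmt"
	"time"
)

// runWithTimeout executes task and gives up after d, so the worker can
// move on; the caller may then reassign the task to another worker.
func runWithTimeout(task func() int, d time.Duration) (int, bool) {
	resCh := make(chan int, 1) // buffered so the task goroutine never leaks blocked
	go func() { resCh <- task() }()

	select {
	case res := <-resCh:
		return res, true
	case <-time.After(d):
		return 0, false // timed out; any late result is discarded
	}
}

func main() {
	slow := func() int { time.Sleep(2 * time.Second); return 42 }

	if res, ok := runWithTimeout(slow, 500*time.Millisecond); ok {
		fmt.Println("result:", res)
	} else {
		fmt.Println("task timed out; reassign to another worker")
	}
}
```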

The Fan-out/Fan-in pattern

Fan-out/fan-in is a pattern for parallelizing work across multiple goroutines. The idea is to split the work into smaller chunks and distribute them across a pool of workers. Once all the workers have finished processing their chunks, the results are collected and combined.

The fan-out part of the pattern involves distributing work among multiple worker goroutines. These goroutines work concurrently, each handling a portion of the tasks. This approach helps to increase throughput and process large datasets more efficiently. The fan-in aspect of the pattern involves collecting the results from the worker goroutines and combining them into a single output. This process is typically done using a dedicated goroutine that listens to the individual output channels of the workers, merges the results, and sends them to a single output channel.

The fan-out, fan-in pattern is particularly useful in situations where tasks can be divided into smaller, independent units and processed concurrently. This pattern not only improves application performance but also enhances code maintainability and readability by separating the concerns of distributing tasks and aggregating results.

```go
package main

import (
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"time"
)

// simulateDownload simulates downloading a file and returns its content.
func simulateDownload(url string) string {
	time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
	return fmt.Sprintf("Content of %s", url)
}

// downloader downloads a list of URLs and returns the content.
func downloader(urls []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, url := range urls {
			out <- simulateDownload(url)
		}
	}()
	return out
}

// worker processes the content and returns the number of words.
func worker(in <-chan string) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for content := range in {
			fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000000 -0700 MST"))
			fmt.Printf("Processing content: %s\\n", content)
			words := strings.Fields(content)
			out <- len(words)
		}
	}()
	return out
}

// merger merges the results from multiple workers.
func merger(ins ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(ins))

	for _, in := range ins {
		go func(in <-chan int) {
			defer wg.Done()
			for n := range in {
				fmt.Printf("Merging result: %d\\n\\n", n)
				out <- n
			}
		}(in)
	}

	go func() {
		defer close(out)
		wg.Wait()
	}()

	return out
}

func main() {
	rand.Seed(time.Now().UnixNano())

	urls := []string{
		"<https://example.com/file1.txt>",
		"<https://example.com/file2.txt>",
		"<https://example.com/file3.txt>",
		"<https://example.com/file4.txt>",
		"<https://example.com/file5.txt>",
	}

	downloadStream := downloader(urls)
	numWorkers := 3

	workerChannels := make([]<-chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		workerChannels[i] = worker(downloadStream)
	}

	merged := merger(workerChannels...)

	totalWordCount := 0
	for count := range merged {
		totalWordCount += count
	}

	fmt.Printf("Total word count: %d\\n", totalWordCount)
}
```

Here’s a breakdown of the code:

  1. Import the necessary packages.
  2. Define the simulateDownload(url string) function, which simulates downloading a file from the provided URL and returns its content as a string.
  3. Define the downloader(urls []string) function, which takes a slice of URLs and returns a channel that sends the content of each URL. It launches a goroutine that iterates over the URLs, simulates the download, and sends the content through the channel. The channel is closed after all URLs have been processed. This is the fan-out part: the downloader function creates a single downloadStream channel that sends the content of each downloaded file. Later, we create multiple worker goroutines that listen to this shared channel, effectively fanning out the work to be done concurrently.
  4. Define the worker(in <-chan string) function, which takes a channel of strings as input and returns a channel of integers. It launches a goroutine that reads the content from the input channel, prints the processing timestamp and content, counts the number of words in the content, and sends the count through the output channel. The output channel is closed after all content has been processed.
  5. Define the merger(ins ...<-chan int) function, which takes a variadic parameter of integer channels and returns a single integer channel. It merges the input channels into one output channel. A sync.WaitGroup is used to wait for all input channels to be processed, after which the output channel is closed. This is the fan-in part: the merger function combines the results from multiple worker goroutines by listening to their individual output channels, and it waits for all of them to complete before closing its output channel.
  6. In the main function, seed the random generator, define a slice of URLs, and create a download stream by calling the downloader() function.
  7. Define the number of workers, create a slice of worker channels, and start the worker goroutines with the download stream as input.
  8. Merge the worker channels using the merger() function.
  9. Iterate over the merged channel to compute the total word count.
  10. Print the total word count.

The disadvantages of this pattern

Resource consumption Creating too many goroutines can lead to excessive resource consumption, which can cause performance issues or even crashes. This is particularly true if the sub-tasks are short-lived and the overhead of creating and managing goroutines outweighs the benefits. To avoid excessive resource consumption, you can limit the number of goroutines that are created at any given time. One way to do this is to use a worker pool, where a fixed number of goroutines are created upfront and then used to process tasks as they become available.

Increased complexity The fan-out/fan-in pattern can add complexity to your code, especially if you need to handle errors or timeouts, and it can make it harder to reason about the behavior of your program. To handle errors and timeouts, you can use the context package to propagate cancellation signals to all the goroutines involved in the pattern. This helps ensure that resources are released promptly and that your program doesn’t hang indefinitely.
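
A minimal sketch of cancellation in a fan-out/fan-in shape, assuming a deadline as the cancellation trigger (the squaring worker and the timeout value are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker stops either when its input is drained or when ctx is cancelled.
func worker(ctx context.Context, in <-chan int, out chan<- int) {
	for {
		select {
		case <-ctx.Done():
			return
		case n, ok := <-in:
			if !ok {
				return
			}
			select {
			case out <- n * n:
			case <-ctx.Done():
				return
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	in := make(chan int)
	out := make(chan int)

	// Fan out to three workers sharing one input channel.
	for i := 0; i < 3; i++ {
		go worker(ctx, in, out)
	}

	// Unbounded producer; cancellation is what stops it.
	go func() {
		defer close(in)
		for i := 1; ; i++ {
			select {
			case in <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Fan in until the deadline fires; remaining goroutines exit via ctx.
	for {
		select {
		case n := <-out:
			fmt.Println(n)
		case <-ctx.Done():
			fmt.Println("cancelled:", ctx.Err())
			return
		}
	}
}
```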

Synchronization overhead Coordinating the results of multiple goroutines can introduce synchronization overhead, which can slow down your program and increase the likelihood of race conditions or deadlocks. If possible, design your program so that synchronization is only necessary when aggregating the results of the sub-tasks. For example, you can use channels to pass data between goroutines instead of shared memory, which reduces the likelihood of race conditions or deadlocks.

The Pipeline pattern

A pipeline is a series of stages in which the output of one stage becomes the input of the next. In its simplest form, each stage is just a function call:

```go
package main

import "fmt"

func main() {
	input := 1

	// Example 1:
	fmt.Println(multiply(add(input, 1), 2)) // (1 + 1) * 2 = 4

	// Example 2:
	// We can rearrange the stages to get a different result
	fmt.Println(add(multiply(input, 2), 1)) // (1 * 2) + 1 = 3
}

func add(x int, y int) int { return x + y }

func multiply(x int, y int) int { return x * y }
```


### The benefit of a pipeline is evident

- It separates the concerns of each stage in the pipeline. Each stage is responsible for one and only one thing.
- The stages are modular and allow us to mix and match how stages are combined.

The stages in the example above run sequentially: each can only begin after the previous stage has processed all the data. Leveraging goroutines and channels, the stages can instead run and process data concurrently. First, we transform our `add` and `multiply` functions to take in an `inputCh` and output a result channel.

```go
package main

import "fmt"

func generator(doneCh chan struct{}, input []int) chan int {
	inputCh := make(chan int)

	go func() {
		defer close(inputCh)

		for _, data := range input {
			select {
			case <-doneCh:
				return
			case inputCh <- data:
			}
		}
	}()

	return inputCh
}

func add(doneCh chan struct{}, inputCh chan int) chan int {
	addRes := make(chan int)

	go func() {
		defer close(addRes)

		for data := range inputCh {
			result := data + 1

			select {
			case <-doneCh:
				return
			case addRes <- result:
			}
		}
	}()

	return addRes
}

func multiply(doneCh chan struct{}, inputCh chan int) chan int {
	multiplyRes := make(chan int)

	go func() {
		defer close(multiplyRes)

		for data := range inputCh {
			result := data * 2

			select {
			case <-doneCh:
				return
			case multiplyRes <- result:
			}
		}
	}()

	return multiplyRes
}

func main() {
	input := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

	doneCh := make(chan struct{})
	defer close(doneCh)

	inputCh := generator(doneCh, input)

	resultCh := multiply(doneCh, add(doneCh, inputCh))

	for res := range resultCh {
		fmt.Println(res)
	}
}
```

Here’s a breakdown of the code:

  1. We create a data stream using the generator function
  2. We create a doneCh and pass it to all Goroutines for explicit cancellation
  3. We then chain the add and multiply stages together
  4. Whenever the add stage finishes processing an input, it immediately passes the result to the multiply stage for further processing

Each stage processes the data concurrently and immediately passes it to the next stage once it’s done. Moreover, the multiply and the add stage can be mixed and matched to produce different results.

The disadvantages of this pattern

Increased complexity The pipeline pattern can become complex when dealing with multiple stages and channels. This complexity can make it difficult to debug and maintain the code. To reduce the complexity of the pipeline pattern, it is important to keep each stage simple and focused on a specific task. This will make it easier to debug and maintain the code.

Blocking If one stage of the pipeline is blocked, it can cause the entire pipeline to block, which can lead to performance issues and slow down the processing of data. To avoid blocking, buffered channels can be used, allowing the pipeline to continue processing data even if one stage is temporarily blocked.

Data loss If the pipeline is not designed properly, it can result in data loss. Sends on an unbuffered channel block rather than drop values, but a stage that exits early (for example, on cancellation) while producers are still sending can discard in-flight data. Buffered channels and careful shutdown ordering help ensure that data is not lost when a stage is not ready to receive it.

Error handling Proper error handling should be implemented in each stage of the pipeline to handle any errors that may occur. This helps prevent the pipeline from crashing and losing data.
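
One common approach is to pass a small struct that carries either a value or an error between stages. A minimal sketch (the result type and the safeDivide stage are illustrative):

```go
package main

import "fmt"

// result carries either a value or an error between stages,
// so a failure in one stage doesn't crash the pipeline.
type result struct {
	value int
	err   error
}

// safeDivide is a stage that can fail; errors ride the channel
// instead of being dropped.
func safeDivide(in <-chan int, divisor int) <-chan result {
	out := make(chan result)
	go func() {
		defer close(out)
		for n := range in {
			if divisor == 0 {
				out <- result{err: fmt.Errorf("divide %d by zero", n)}
				continue
			}
			out <- result{value: n / divisor}
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, n := range []int{10, 20, 30} {
			in <- n
		}
	}()

	for r := range safeDivide(in, 2) {
		if r.err != nil {
			fmt.Println("stage error:", r.err)
			continue
		}
		fmt.Println("value:", r.value)
	}
}
```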

The Semaphore pattern

The semaphore pattern restricts how many goroutines can access a shared resource at the same time. In Go, a semaphore can be implemented with a buffered channel whose capacity is the maximum number of concurrent requests:

```go
type Semaphore struct {
	semaCh chan struct{}
}

func NewSemaphore(maxReq int) *Semaphore {
	return &Semaphore{
		semaCh: make(chan struct{}, maxReq),
	}
}

func (s *Semaphore) Acquire() { s.semaCh <- struct{}{} }

func (s *Semaphore) Release() { <-s.semaCh }
```


1. `NewSemaphore` creates a `Semaphore` backed by a buffered channel with a capacity of `maxReq`
2. When a Goroutine calls `Acquire`, we send an empty struct to `semaCh`
3. When the buffered channel is full, calls to `Acquire` block
4. When a Goroutine calls `Release`, an empty struct is received from the channel, freeing space in the buffer for subsequent `Acquire` calls

Let’s take a look at an example:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Semaphore and NewSemaphore are defined in the snippet above.

func main() {
	var wg sync.WaitGroup
	semaphore := NewSemaphore(2)

	for idx := 0; idx < 10; idx++ {
		wg.Add(1)

		go func(taskID int) {
			defer wg.Done()

			semaphore.Acquire()
			defer semaphore.Release()

			msg := fmt.Sprintf(
				"%s Running worker %d",
				time.Now().Format("15:04:05"),
				taskID,
			)
			fmt.Println(msg)

			time.Sleep(1 * time.Second)
		}(idx)
	}

	wg.Wait()
}
```

Here’s a breakdown of the code:

  1. We create a semaphore with the capacity of 2
  2. We spawn ten Goroutines to process a certain task
  3. Each Goroutine acquires a semaphore before processing
  4. Since there are ten tasks and the maximum number of concurrent tasks is 2, the total time needed to process all tasks will be five seconds (Each task takes one second)
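
As a side note, if you need context-aware acquisition or weighted resources, a ready-made implementation exists in the golang.org/x/sync/semaphore package; a small usage sketch:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	sem := semaphore.NewWeighted(2) // at most 2 units held at once

	for i := 0; i < 5; i++ {
		// Blocks until a unit is free; returns early if ctx is cancelled.
		if err := sem.Acquire(ctx, 1); err != nil {
			break
		}
		go func(id int) {
			defer sem.Release(1)
			fmt.Println("running task", id)
			time.Sleep(1 * time.Second)
		}(i)
	}

	// Acquiring the full weight waits for every holder to release.
	_ = sem.Acquire(ctx, 2)
	fmt.Println("all tasks done")
}
```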

The disadvantages of this pattern

Deadlocks One of the main disadvantages of the Semaphore pattern is the potential for deadlocks. Deadlocks occur when two or more goroutines are waiting for each other to release a resource, so that none of them can proceed. To avoid deadlocks, it is important to ensure that all resources are released after they have been used. In Golang, this can be achieved by using the defer statement to ensure that resources are always released, even if an error occurs.

Starvation Another disadvantage of the Semaphore pattern is the potential for starvation. Starvation occurs when a process is unable to access a shared resource because other processes are constantly accessing it.

To avoid starvation, it is important to implement a fair scheduling algorithm that ensures that all processes have equal access to the shared resource. In Golang, this can be achieved by using a sync.Mutex to lock the shared resource and a sync.Cond to signal when the resource is available.
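
A minimal sketch of that idea; the resource type here is illustrative, and note that sync.Cond wakes waiters roughly in arrival order, which helps fairness but is not a strict guarantee:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// resource is guarded by mu; cond lets waiters sleep until it's free.
type resource struct {
	mu   sync.Mutex
	cond *sync.Cond
	busy bool
}

func newResource() *resource {
	r := &resource{}
	r.cond = sync.NewCond(&r.mu)
	return r
}

func (r *resource) acquire() {
	r.mu.Lock()
	for r.busy {
		r.cond.Wait() // releases mu while waiting, reacquires on wake
	}
	r.busy = true
	r.mu.Unlock()
}

func (r *resource) release() {
	r.mu.Lock()
	r.busy = false
	r.mu.Unlock()
	r.cond.Signal() // wake one waiter
}

func main() {
	r := newResource()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			r.acquire()
			defer r.release()
			fmt.Println("goroutine", id, "using resource")
			time.Sleep(100 * time.Millisecond)
		}(i)
	}
	wg.Wait()
}
```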

Performance Overhead The Semaphore pattern can also introduce performance overhead due to the additional synchronization mechanisms required to control access to the shared resource. To minimize performance overhead, it is important to use the Semaphore pattern only when necessary and to carefully consider the number of resources that need to be shared. In Golang, this can be achieved by using buffered channels to limit the number of goroutines that can access the shared resource at any given time.

Considering the number of goroutines

The operating system schedules threads to run against available processors, and the Go runtime schedules goroutines to run within logical processors, each bound to a single operating system thread. Even with a single logical processor and operating system thread, hundreds of thousands of goroutines can be scheduled to run concurrently with amazing efficiency and performance. Early versions of Go defaulted to a single logical processor; since Go 1.5, GOMAXPROCS defaults to the number of logical CPUs, and you can tune it yourself via the GOMAXPROCS environment variable or the runtime.GOMAXPROCS function.
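
A quick illustration of reading and setting the value (both calls are in the standard runtime package):

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	// A non-positive argument reports the current setting without changing it.
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
	fmt.Println("NumCPU:    ", runtime.NumCPU())

	// Pin the runtime to 2 logical processors; the previous value is returned.
	prev := runtime.GOMAXPROCS(2)
	fmt.Println("previous:  ", prev)
}
```

The same can be done without code changes by running the program with the environment variable set, e.g. `GOMAXPROCS=2 go run main.go`.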

Running very many goroutines that each open files or sockets can also run into the operating system’s file descriptor limit. It’s important to note that increasing the file descriptor limit can have performance implications, as each open file consumes system resources. Therefore, it’s generally recommended to only increase the limit if your application requires it, and to monitor resource usage carefully.
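
On Unix-like systems you can inspect (and, within the hard cap, raise) the limit from Go itself. A sketch, assuming the standard syscall package on a Unix platform:

```go
package main

import (
	"fmt"
	"syscall"
)

func main() {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		fmt.Println("getrlimit:", err)
		return
	}
	fmt.Printf("open-file limit: soft=%d hard=%d\n", rl.Cur, rl.Max)

	// Raise the soft limit up to the hard cap; no extra privileges are
	// needed as long as we stay at or below rl.Max.
	rl.Cur = rl.Max
	if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		fmt.Println("setrlimit:", err)
	}
}
```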

When scaling up the number of goroutines, keep the following in mind:

  1. Deadlocks and Race Conditions: When multiple goroutines access shared resources concurrently, it can lead to deadlocks and race conditions. These issues can be difficult to debug and fix, especially when dealing with a large number of goroutines.
  2. Resource Limitations: Running a large number of goroutines can also lead to resource limitations, such as running out of memory or hitting file descriptor limits. It’s important to monitor resource usage and adjust the number of goroutines accordingly.
  3. Design Considerations: When designing an application that uses a large number of goroutines, it’s important to consider the overall architecture and ensure that it is scalable and maintainable. This may involve breaking up tasks into smaller, more manageable pieces or using a distributed system architecture.

Conclusion

Go’s concurrency primitives are easy to pick up but just as easy to misuse. Patterns like the worker pool, fan-out/fan-in, pipeline, and semaphore give us well-understood ways to structure large concurrent workloads safely and elegantly. Each comes with its own trade-offs around scalability, complexity, blocking, and resource usage, so pick the pattern (and the tuning) that fits your workload.