Skip to main content

Advanced Concurrency Patterns

Building on the concurrency fundamentals, this chapter explores production-grade patterns used in real-world Go applications.

Worker Pool Pattern

A worker pool limits the number of concurrent goroutines to prevent resource exhaustion.
package main

import (
    "fmt"
    "sync"
    "time"
)

// Job is one unit of work queued for the worker pool.
type Job struct {
    // ID identifies the job; workers copy it into Result.JobID.
    ID int
    // Payload is the data a worker processes.
    Payload string
}

// Result is what a worker reports back after handling one Job.
type Result struct {
    // JobID is the ID of the Job this result belongs to.
    JobID int
    // Output is the text produced for the job.
    Output string
    // Error carries a processing failure, if any; nil otherwise.
    Error error
}

// worker receives jobs until the jobs channel is closed, sends exactly one
// Result per Job on results, and calls wg.Done when it returns.
//
// id only appears in the generated output text.
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        job, open := <-jobs
        if !open {
            return
        }
        // Stand-in for real work.
        time.Sleep(100 * time.Millisecond)
        output := fmt.Sprintf("Worker %d processed: %s", id, job.Payload)
        results <- Result{JobID: job.ID, Output: output}
    }
}

// main runs 20 jobs through a pool of 5 workers and prints each result as it
// arrives.
func main() {
    const (
        numWorkers = 5
        numJobs    = 20
    )

    // Both channels can hold every job/result, so neither the producer loop
    // nor the workers ever block on a full buffer.
    var (
        jobs    = make(chan Job, numJobs)
        results = make(chan Result, numJobs)
        wg      sync.WaitGroup
    )

    // Launch the fixed set of workers.
    wg.Add(numWorkers)
    for id := 1; id <= numWorkers; id++ {
        go worker(id, jobs, results, &wg)
    }

    // Queue all jobs, then close the channel so workers stop once it drains.
    for n := 1; n <= numJobs; n++ {
        jobs <- Job{ID: n, Payload: fmt.Sprintf("task-%d", n)}
    }
    close(jobs)

    // Close results only after every worker has finished sending; this is
    // what terminates the collection loop below.
    go func() {
        wg.Wait()
        close(results)
    }()

    for r := range results {
        fmt.Printf("Job %d: %s\n", r.JobID, r.Output)
    }
}

Generic Worker Pool

// WorkerPool runs a fixed number of goroutines that apply processor to each
// submitted job of type T and publish the value of type R it returns.
//
// Lifecycle: NewWorkerPool, Start, Submit (any number of times), Close, while
// some other goroutine drains Results.
type WorkerPool[T any, R any] struct {
    workers   int
    jobs      chan T
    results   chan R
    processor func(T) R
    wg        sync.WaitGroup
}

// NewWorkerPool creates a pool with the given number of workers. Both the job
// queue and the result queue are buffered to twice the worker count.
//
// A workers value below 1 is treated as 1: zero workers would leave every
// Submit blocked forever, and a negative count would panic when sizing the
// channels.
func NewWorkerPool[T any, R any](workers int, processor func(T) R) *WorkerPool[T, R] {
    if workers < 1 {
        workers = 1
    }
    return &WorkerPool[T, R]{
        workers:   workers,
        jobs:      make(chan T, workers*2),
        results:   make(chan R, workers*2),
        processor: processor,
    }
}

// Start launches the worker goroutines. Each one processes jobs until the job
// queue is closed by Close.
func (p *WorkerPool[T, R]) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for job := range p.jobs {
                p.results <- p.processor(job)
            }
        }()
    }
}

// Submit queues a job. It blocks while the job queue is full and must not be
// called after Close (sending on the closed queue panics).
func (p *WorkerPool[T, R]) Submit(job T) {
    p.jobs <- job
}

// Close stops accepting jobs, waits for the workers to finish the queued ones,
// and then closes the results channel.
//
// Workers block when the result buffer is full, so results must be drained
// concurrently or Close will not return.
func (p *WorkerPool[T, R]) Close() {
    close(p.jobs)
    p.wg.Wait()
    close(p.results)
}

// Results returns the receive-only channel of processed values. It is closed
// by Close once all workers have exited.
func (p *WorkerPool[T, R]) Results() <-chan R {
    return p.results
}

Pipeline Pattern

A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.
// generate is pipeline stage 1: it emits nums, in order, on an unbuffered
// channel and closes the channel after the last value.
func generate(nums ...int) <-chan int {
    stream := make(chan int)
    go func() {
        for i := 0; i < len(nums); i++ {
            stream <- nums[i]
        }
        close(stream)
    }()
    return stream
}

// square is pipeline stage 2: it forwards the square of every value read from
// in and closes its output once in is closed.
func square(in <-chan int) <-chan int {
    squares := make(chan int)
    go func() {
        defer close(squares)
        for {
            v, open := <-in
            if !open {
                return
            }
            squares <- v * v
        }
    }()
    return squares
}

// filterEven is pipeline stage 3: it passes through only the even values read
// from in and closes its output once in is closed.
func filterEven(in <-chan int) <-chan int {
    evens := make(chan int)
    go func() {
        defer close(evens)
        for v := range in {
            if v%2 != 0 {
                continue // odd values are dropped
            }
            evens <- v
        }
    }()
    return evens
}

// main wires the three stages together (generate -> square -> filterEven) and
// prints whatever reaches the end of the pipeline.
func main() {
    source := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    pipeline := filterEven(square(source))

    // Ranging over the final stage drives the whole pipeline; the loop ends
    // when the close propagates through every stage.
    for value := range pipeline {
        fmt.Println(value) // 4, 16, 36, 64, 100
    }
}

Pipeline with Context (Cancellation)

// generateWithContext emits nums in order on an unbuffered channel, stopping
// early if ctx is done. The channel is closed when the goroutine exits, either
// after the last value or on cancellation.
func generateWithContext(ctx context.Context, nums ...int) <-chan int {
    stream := make(chan int)
    go func() {
        defer close(stream)
        cancelled := ctx.Done()
        for i := range nums {
            // Racing the send against cancellation keeps the goroutine from
            // blocking forever when nobody is receiving any more.
            select {
            case stream <- nums[i]:
            case <-cancelled:
                return
            }
        }
    }()
    return stream
}

// squareWithContext forwards the square of every value read from in until in
// is closed or ctx is done, then closes its output.
//
// Both the receive and the send are raced against ctx.Done(), so the goroutine
// exits on cancellation even when the upstream stage never closes in or the
// downstream consumer has stopped reading.
func squareWithContext(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-in:
                if !ok {
                    return // upstream finished
                }
                select {
                case <-ctx.Done():
                    return
                case out <- n * n:
                }
            }
        }
    }()
    return out
}

Fan-Out / Fan-In Pattern

Fan-out: Multiple goroutines read from the same channel until it’s closed. Fan-in: A function reads from multiple inputs and multiplexes onto a single channel.
// fanOut starts numWorkers workers that all read from the same input channel
// and returns their output channels, one per worker.
func fanOut(in <-chan int, numWorkers int) []<-chan int {
    outputs := make([]<-chan int, numWorkers)
    for slot := range outputs {
        outputs[slot] = worker(in)
    }
    return outputs
}

// worker squares every value it receives from in and sends the result on its
// own unbuffered output channel, which it closes once in is closed.
func worker(in <-chan int) <-chan int {
    processed := make(chan int)
    go func() {
        defer close(processed)
        for {
            v, open := <-in
            if !open {
                return
            }
            processed <- v * v
        }
    }()
    return processed
}

// fanIn merges any number of input channels into one output channel. Values
// from different inputs are interleaved in no particular order; the output is
// closed after every input has been closed and drained.
func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int)

    var pending sync.WaitGroup
    pending.Add(len(channels))

    // forward copies one input onto the shared output until that input closes.
    forward := func(src <-chan int) {
        defer pending.Done()
        for v := range src {
            merged <- v
        }
    }
    for _, src := range channels {
        go forward(src)
    }

    // The closer runs separately so fanIn can return the channel immediately.
    go func() {
        pending.Wait()
        close(merged)
    }()

    return merged
}

// FanIn merges the given channels into a single output channel of the same
// element type. Forwarding stops when ctx is done or when every input has been
// closed; in either case the output is closed once all forwarders have exited.
func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
    merged := make(chan T)

    var pending sync.WaitGroup
    pending.Add(len(channels))

    // relay moves values from one input to the output, giving up as soon as
    // the context is cancelled, whether it is waiting to receive or to send.
    relay := func(src <-chan T) {
        defer pending.Done()
        done := ctx.Done()
        for {
            var (
                val  T
                open bool
            )
            select {
            case <-done:
                return
            case val, open = <-src:
            }
            if !open {
                return
            }
            select {
            case <-done:
                return
            case merged <- val:
            }
        }
    }
    for _, src := range channels {
        go relay(src)
    }

    go func() {
        pending.Wait()
        close(merged)
    }()

    return merged
}

Semaphore Pattern

Limit concurrent access to a resource using a buffered channel as a semaphore.
// Semaphore is a counting semaphore backed by a buffered channel: every held
// permit occupies one slot in the buffer.
type Semaphore struct {
    sem chan struct{}
}

// NewSemaphore returns a semaphore that allows at most maxConcurrent permits
// to be held at once.
func NewSemaphore(maxConcurrent int) *Semaphore {
    slots := make(chan struct{}, maxConcurrent)
    return &Semaphore{sem: slots}
}

// Acquire takes a permit, blocking while all permits are in use.
func (s *Semaphore) Acquire() { s.sem <- struct{}{} }

// Release returns a permit. It blocks if no permit is currently held.
func (s *Semaphore) Release() { <-s.sem }

// TryAcquire takes a permit only if one is free right now and reports whether
// it did; it never blocks.
func (s *Semaphore) TryAcquire() bool {
    select {
    case s.sem <- struct{}{}:
        return true
    default:
    }
    return false
}

// processWithLimit calls process on every item, running at most maxConcurrent
// calls at the same time, and returns once all of them have finished.
func processWithLimit(items []string, maxConcurrent int) {
    var (
        limiter = NewSemaphore(maxConcurrent)
        pending sync.WaitGroup
    )

    for i := range items {
        pending.Add(1)
        // Acquiring before starting the goroutine makes this loop wait while
        // maxConcurrent items are already in flight.
        limiter.Acquire()
        go func(current string) {
            defer pending.Done()
            defer limiter.Release()
            process(current)
        }(items[i])
    }

    pending.Wait()
}

Using golang.org/x/sync/semaphore

import "golang.org/x/sync/semaphore"

// processWithSemaphore calls process on every item with at most maxConcurrent
// calls running at once. It returns the first error reported by a call, or the
// context error if ctx ends before all items could be started.
//
// The function never returns while goroutines it started are still running.
func processWithSemaphore(ctx context.Context, items []string, maxConcurrent int64) error {
    sem := semaphore.NewWeighted(maxConcurrent)
    g, ctx := errgroup.WithContext(ctx)

    for _, item := range items {
        item := item // Capture loop variable (required before Go 1.22)
        if err := sem.Acquire(ctx, 1); err != nil {
            // Acquire fails once ctx is done. That happens either because the
            // caller cancelled or because an earlier task failed and the group
            // cancelled its derived context. Wait for the tasks already
            // started, and prefer their error, which is the root cause, over
            // the bare context error.
            if waitErr := g.Wait(); waitErr != nil {
                return waitErr
            }
            return err
        }
        g.Go(func() error {
            defer sem.Release(1)
            return process(item)
        })
    }

    return g.Wait()
}

Rate Limiter Pattern

Control the rate of operations using a ticker.
// RateLimiter is a token-bucket limiter: a background goroutine adds one token
// per tick, up to the bucket capacity, and each Wait consumes one token.
type RateLimiter struct {
    ticker   *time.Ticker
    tokens   chan struct{}
    stop     chan struct{}
    stopOnce sync.Once // makes Stop safe to call more than once
}

// NewRateLimiter returns a limiter that refills rate tokens per second and
// holds at most burst tokens. The bucket starts full, so the first burst calls
// to Wait return immediately.
//
// rate must be positive and burst must not be negative; otherwise the function
// panics. Rates above one token per nanosecond are capped at that speed.
// Call Stop when the limiter is no longer needed to end the refill goroutine.
func NewRateLimiter(rate int, burst int) *RateLimiter {
    interval := time.Second / time.Duration(rate)
    if rate > 0 && interval == 0 {
        // More than 1e9 tokens/sec rounds down to a zero interval, which
        // time.NewTicker rejects; use the smallest valid interval instead.
        interval = time.Nanosecond
    }

    rl := &RateLimiter{
        ticker: time.NewTicker(interval),
        tokens: make(chan struct{}, burst),
        stop:   make(chan struct{}),
    }

    // Fill initial burst
    for i := 0; i < burst; i++ {
        rl.tokens <- struct{}{}
    }

    // Refill tokens
    go func() {
        for {
            select {
            case <-rl.stop:
                return
            case <-rl.ticker.C:
                select {
                case rl.tokens <- struct{}{}:
                default: // Token bucket is full; drop this tick's token
                }
            }
        }
    }()

    return rl
}

// Wait blocks until a token is available and consumes it. After Stop no new
// tokens are produced, so Wait blocks indefinitely once the bucket is empty.
func (rl *RateLimiter) Wait() {
    <-rl.tokens
}

// Stop ends the refill goroutine and stops the ticker. It is safe to call Stop
// more than once; only the first call has any effect.
func (rl *RateLimiter) Stop() {
    rl.stopOnce.Do(func() {
        close(rl.stop)
        rl.ticker.Stop()
    })
}

// main issues 20 requests through a limiter configured for 10 requests per
// second with a burst of 5, printing the time each one is admitted.
func main() {
    limiter := NewRateLimiter(10, 5)
    defer limiter.Stop()

    const totalRequests = 20
    for req := 0; req < totalRequests; req++ {
        limiter.Wait()
        admittedAt := time.Now()
        fmt.Printf("Request %d at %v\n", req, admittedAt)
    }
}

Using golang.org/x/time/rate

import "golang.org/x/time/rate"

// main performs 20 rate-limited iterations using golang.org/x/time/rate.
func main() {
    // 10 events per second with bursts of up to 5.
    limiter := rate.NewLimiter(10, 5)
    ctx := context.Background()

    for req := 0; req < 20; req++ {
        // Wait blocks until the limiter admits one more event.
        err := limiter.Wait(ctx)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Request %d\n", req)
    }
}

Circuit Breaker Pattern

Prevent cascading failures by temporarily blocking requests to a failing service.
// State is the operating mode of a CircuitBreaker.
type State int

const (
    // StateClosed lets calls through and counts consecutive failures.
    StateClosed State = iota
    // StateOpen rejects calls until timeout has passed since the last failure.
    StateOpen
    // StateHalfOpen lets trial calls through to probe whether the dependency
    // has recovered.
    StateHalfOpen
)

// ErrCircuitOpen is returned by Execute when a call is rejected because the
// breaker is open.
var ErrCircuitOpen = errors.New("circuit breaker is open")

// CircuitBreaker guards calls to an unreliable dependency. All fields are
// protected by mu.
type CircuitBreaker struct {
    mu               sync.Mutex
    state            State
    failureCount     int // failures since the last reset
    successCount     int // successes observed while half-open
    failureThreshold int
    successThreshold int
    timeout          time.Duration
    lastFailure      time.Time
}

// NewCircuitBreaker returns a closed breaker that opens after failureThreshold
// failures without an intervening success, stays open for timeout, and closes
// again after successThreshold successful trial calls.
func NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:            StateClosed,
        failureThreshold: failureThreshold,
        successThreshold: successThreshold,
        timeout:          timeout,
    }
}

// Execute runs fn unless the breaker is open, and records the outcome.
//
// It returns ErrCircuitOpen without calling fn while the breaker is open and
// timeout has not yet elapsed since the last failure; otherwise it returns
// whatever fn returns. The lock is not held while fn runs, so calls may
// overlap.
func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.Lock()
    if cb.state == StateOpen {
        if time.Since(cb.lastFailure) <= cb.timeout {
            cb.mu.Unlock()
            return ErrCircuitOpen
        }
        // The open period is over: allow trial calls.
        cb.state = StateHalfOpen
        cb.successCount = 0
    }
    cb.mu.Unlock()

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.failureCount++
        cb.lastFailure = time.Now()
        // A failed trial call re-opens the breaker immediately, whatever the
        // counter says; otherwise the threshold decides.
        if cb.state == StateHalfOpen || cb.failureCount >= cb.failureThreshold {
            cb.state = StateOpen
        }
        return err
    }

    switch cb.state {
    case StateHalfOpen:
        cb.successCount++
        if cb.successCount >= cb.successThreshold {
            cb.state = StateClosed
            cb.failureCount = 0
        }
    case StateClosed:
        cb.failureCount = 0
    case StateOpen:
        // A call admitted before the breaker tripped finished late. Its
        // success says nothing about recovery, so leave the counters alone.
    }

    return nil
}

// State returns the breaker's current state.
func (cb *CircuitBreaker) State() State {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    return cb.state
}

Error Group Pattern

errgroup manages a group of goroutines and returns the first error encountered.
import "golang.org/x/sync/errgroup"

// fetchAllURLs downloads every URL concurrently and returns the response
// bodies in the same order as urls.
//
// If any download fails, the first error is returned and the result is nil.
// Each request carries the group's context, so a failure (or cancellation of
// the caller's ctx) aborts the requests still in flight instead of letting
// them run to completion.
func fetchAllURLs(ctx context.Context, urls []string) ([][]byte, error) {
    g, ctx := errgroup.WithContext(ctx)
    results := make([][]byte, len(urls))

    for i, url := range urls {
        i, url := i, url // Capture loop variables (required before Go 1.22)
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
            if err != nil {
                return fmt.Errorf("fetching %s: %w", url, err)
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetching %s: %w", url, err)
            }
            defer resp.Body.Close()

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return fmt.Errorf("reading %s: %w", url, err)
            }
            // Each goroutine writes only its own index, so no lock is needed.
            results[i] = body
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

Error Group with Limit

// processWithLimit runs processItem for every item with at most 10 calls in
// flight and returns the first error any of them reports.
func processWithLimit(ctx context.Context, items []string) error {
    group, groupCtx := errgroup.WithContext(ctx)
    group.SetLimit(10) // Go blocks while 10 goroutines are already running

    for idx := range items {
        current := items[idx]
        group.Go(func() error {
            return processItem(groupCtx, current)
        })
    }

    return group.Wait()
}

Publish-Subscribe Pattern

// Message is one published event.
type Message struct {
    Topic   string
    Payload interface{}
}

// Subscriber is the broker's record of one registered consumer.
type Subscriber struct {
    id     string
    ch     chan Message    // delivery queue handed to the consumer
    topics map[string]bool // set of topics the consumer wants
}

// PubSub is an in-memory, topic-based message broker that is safe for
// concurrent use.
type PubSub struct {
    mu          sync.RWMutex
    subscribers map[string]*Subscriber
}

// NewPubSub returns an empty broker.
func NewPubSub() *PubSub {
    return &PubSub{
        subscribers: make(map[string]*Subscriber),
    }
}

// Subscribe registers a subscriber under id for the given topics and returns
// the channel on which matching messages are delivered. The channel buffers up
// to 100 messages.
//
// If id is already registered, the previous subscription is replaced and its
// channel is closed, so a consumer ranging over the old channel terminates
// instead of waiting forever.
func (ps *PubSub) Subscribe(id string, topics ...string) <-chan Message {
    topicMap := make(map[string]bool, len(topics))
    for _, t := range topics {
        topicMap[t] = true
    }

    sub := &Subscriber{
        id:     id,
        ch:     make(chan Message, 100),
        topics: topicMap,
    }

    ps.mu.Lock()
    defer ps.mu.Unlock()

    // Closing under the write lock is safe: Publish only sends while holding
    // the read lock, so no send can race with the close.
    if previous, ok := ps.subscribers[id]; ok {
        close(previous.ch)
    }
    ps.subscribers[id] = sub
    return sub.ch
}

// Unsubscribe removes the subscriber registered under id and closes its
// channel. Unknown ids are ignored.
func (ps *PubSub) Unsubscribe(id string) {
    ps.mu.Lock()
    defer ps.mu.Unlock()

    if sub, ok := ps.subscribers[id]; ok {
        close(sub.ch)
        delete(ps.subscribers, id)
    }
}

// Publish delivers a message to every subscriber of topic. Delivery never
// blocks: a subscriber whose buffer is full misses the message.
func (ps *PubSub) Publish(topic string, payload interface{}) {
    ps.mu.RLock()
    defer ps.mu.RUnlock()

    msg := Message{Topic: topic, Payload: payload}
    for _, sub := range ps.subscribers {
        if !sub.topics[topic] {
            continue
        }
        select {
        case sub.ch <- msg:
        default:
            // Channel full, skip
        }
    }
}

// Usage
//
// main registers two subscribers, publishes two messages, and then shuts the
// subscribers down. It waits for both consumer goroutines, so every delivered
// message is printed before the program exits.
func main() {
    ps := NewPubSub()
    var wg sync.WaitGroup

    // Subscriber 1
    ch1 := ps.Subscribe("sub1", "orders", "payments")
    wg.Add(1)
    go func() {
        defer wg.Done()
        for msg := range ch1 {
            fmt.Printf("Sub1 received: %s - %v\n", msg.Topic, msg.Payload)
        }
    }()

    // Subscriber 2
    ch2 := ps.Subscribe("sub2", "orders")
    wg.Add(1)
    go func() {
        defer wg.Done()
        for msg := range ch2 {
            fmt.Printf("Sub2 received: %s - %v\n", msg.Topic, msg.Payload)
        }
    }()

    // Publish messages
    ps.Publish("orders", "Order #123")
    ps.Publish("payments", "Payment received")

    // Unsubscribing closes each channel; the consumers drain what is still
    // buffered and then leave their range loops.
    ps.Unsubscribe("sub1")
    ps.Unsubscribe("sub2")
    wg.Wait()
}

Graceful Shutdown Pattern

// main serves HTTP on :8080 until SIGINT or SIGTERM arrives, then shuts the
// server down gracefully, allowing in-flight requests up to 30 seconds.
func main() {
    server := &http.Server{Addr: ":8080"}

    // Closed by the signal handler once Shutdown has returned.
    shutdownComplete := make(chan struct{})

    go func() {
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
        <-signals

        fmt.Println("\nShutting down...")

        // Bound how long the graceful shutdown may take.
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()

        err := server.Shutdown(shutdownCtx)
        if err != nil {
            log.Printf("Server shutdown error: %v", err)
        }

        close(shutdownComplete)
    }()

    fmt.Println("Server starting on :8080")
    // ListenAndServe returns http.ErrServerClosed after Shutdown is called;
    // any other error is a real failure.
    err := server.ListenAndServe()
    if err != http.ErrServerClosed {
        log.Fatal(err)
    }

    // ListenAndServe returns as soon as Shutdown starts, so wait for the
    // handler goroutine to finish before exiting.
    <-shutdownComplete
    fmt.Println("Server stopped")
}

Sync Primitives Deep Dive

sync.Once

Execute initialization code exactly once, safely across goroutines.
// Database lazily creates a single *sql.DB handle the first time it is needed.
type Database struct {
    once sync.Once
    conn *sql.DB
}

// Connect returns the shared handle, opening it on the first call. Concurrent
// callers are serialized by sync.Once, so the open step runs at most once.
//
// It panics if sql.Open reports an error. Because sync.Once treats a panicking
// call as completed, later calls do not retry and return a nil handle.
func (db *Database) Connect() *sql.DB {
    open := func() {
        handle, err := sql.Open("postgres", "connection-string")
        if err != nil {
            panic(err)
        }
        db.conn = handle
    }
    db.once.Do(open)
    return db.conn
}

sync.Pool

Reuse objects to reduce allocations.
// bufferPool recycles *bytes.Buffer values between calls to processRequest.
var bufferPool = sync.Pool{
    New: func() any { return new(bytes.Buffer) },
}

// processRequest copies data into a pooled buffer and returns the buffer's
// contents as a string. The buffer is reset and returned to the pool before
// the function exits.
func processRequest(data []byte) string {
    scratch := bufferPool.Get().(*bytes.Buffer)
    // Deferred calls run last-in first-out: Reset happens before Put.
    defer bufferPool.Put(scratch)
    defer scratch.Reset()

    scratch.Write(data)
    // Process...
    return scratch.String()
}

sync.Map

Concurrent map optimized for specific use cases.
// cache holds the values memoized by getOrCompute.
var cache sync.Map

// getOrCompute returns the cached value for key, computing and storing it on a
// miss.
//
// Two goroutines that miss at the same time may both run compute; the first
// value stored wins and both receive it.
func getOrCompute(key string, compute func() interface{}) interface{} {
    cached, found := cache.Load(key)
    if found {
        return cached
    }

    stored, _ := cache.LoadOrStore(key, compute())
    return stored
}

sync.Cond

Wait for and signal conditions.
// Queue is an unbounded FIFO queue whose Dequeue blocks while the queue is
// empty. Access to items is guarded by cond.L.
type Queue struct {
    items []interface{}
    cond  *sync.Cond
}

// NewQueue returns an empty queue.
func NewQueue() *Queue {
    return &Queue{
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

// Enqueue appends item and wakes one goroutine blocked in Dequeue, if any.
func (q *Queue) Enqueue(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    q.items = append(q.items, item)
    q.cond.Signal() // Wake one waiting goroutine
}

// Dequeue removes and returns the oldest item, blocking until one is
// available.
func (q *Queue) Dequeue() interface{} {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()

    // Wait releases the lock while sleeping and re-acquires it on wake-up.
    // The condition is re-checked in a loop because another consumer may
    // have taken the item first.
    for len(q.items) == 0 {
        q.cond.Wait()
    }

    item := q.items[0]
    // Clear the slot before re-slicing; otherwise the backing array keeps the
    // dequeued value reachable and it cannot be garbage-collected.
    q.items[0] = nil
    q.items = q.items[1:]
    return item
}

Interview Questions

  • Unbuffered: Synchronous. Sender blocks until receiver is ready.
  • Buffered: Asynchronous up to capacity. Sender only blocks when buffer is full.
Use unbuffered for synchronization guarantees, buffered for decoupling producers and consumers.
Use context.WithTimeout or time.After with select:
select {
case result := <-resultCh:
    // The result arrived before either limit was hit.
    return result, nil
case <-ctx.Done():
    // The context was cancelled or its deadline passed; ctx.Err() says which.
    return nil, ctx.Err()
case <-time.After(5 * time.Second):
    // Fixed 5-second cap that applies regardless of the context.
    return nil, errors.New("timeout")
}
Goroutine leaks occur when a goroutine blocks forever, typically on a channel send or receive that can never complete. Prevent them by:
  • Always closing channels when done
  • Using context for cancellation
  • Using select with done channels
  • Avoiding sending to nil channels
Use sync.Map when:
  • Keys are only ever written once but read many times
  • Goroutines operate on disjoint sets of keys
Use regular map with sync.RWMutex for general-purpose concurrent access.

Summary

| Pattern | Use Case |
| --- | --- |
| Worker Pool | Limit concurrent goroutines, process jobs |
| Pipeline | Chain processing stages |
| Fan-Out/Fan-In | Distribute work, collect results |
| Semaphore | Limit access to resources |
| Rate Limiter | Control operation frequency |
| Circuit Breaker | Prevent cascading failures |
| Error Group | Manage goroutine errors |
| Pub/Sub | Event-driven communication |