Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Senior Go developer skill covering concurrency patterns, gRPC microservices, generics, pprof optimization, and idiomatic testing.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
references/concurrency.md
1# Concurrency Patterns23## Goroutine Lifecycle Management45```go6package main78import (9"context"10"fmt"11"sync"12"time"13)1415// Worker pool with bounded concurrency16type WorkerPool struct {17workers int18tasks chan func()19wg sync.WaitGroup20}2122func NewWorkerPool(workers int) *WorkerPool {23wp := &WorkerPool{24workers: workers,25tasks: make(chan func(), workers*2), // Buffered channel26}27wp.start()28return wp29}3031func (wp *WorkerPool) start() {32for i := 0; i < wp.workers; i++ {33wp.wg.Add(1)34go func() {35defer wp.wg.Done()36for task := range wp.tasks {37task()38}39}()40}41}4243func (wp *WorkerPool) Submit(task func()) {44wp.tasks <- task45}4647func (wp *WorkerPool) Shutdown() {48close(wp.tasks)49wp.wg.Wait()50}51```5253## Channel Patterns5455```go56// Generator pattern57func generateNumbers(ctx context.Context, max int) <-chan int {58out := make(chan int)59go func() {60defer close(out)61for i := 0; i < max; i++ {62select {63case out <- i:64case <-ctx.Done():65return66}67}68}()69return out70}7172// Fan-out, fan-in pattern73func fanOut(ctx context.Context, input <-chan int, workers int) []<-chan int {74channels := make([]<-chan int, workers)75for i := 0; i < workers; i++ {76channels[i] = process(ctx, input)77}78return channels79}8081func process(ctx context.Context, input <-chan int) <-chan int {82out := make(chan int)83go func() {84defer close(out)85for val := range input {86select {87case out <- val * 2:88case <-ctx.Done():89return90}91}92}()93return out94}9596func fanIn(ctx context.Context, channels ...<-chan int) <-chan int {97out := make(chan int)98var wg sync.WaitGroup99100for _, ch := range channels {101wg.Add(1)102go func(c <-chan int) {103defer wg.Done()104for val := range c {105select {106case out <- val:107case <-ctx.Done():108return109}110}111}(ch)112}113114go func() {115wg.Wait()116close(out)117}()118119return out120}121```122123## Select Statement Patterns124125```go126// Timeout pattern127func fetchWithTimeout(ctx context.Context, url string) (string, error) {128result := make(chan string, 1)129errCh := make(chan error, 1)130131go func() {132// Simulate network call133time.Sleep(100 * time.Millisecond)134result <- "data from " + url135}()136137select {138case res := <-result:139return res, nil140case err := <-errCh:141return "", err142case <-time.After(50 * time.Millisecond):143return "", fmt.Errorf("timeout")144case <-ctx.Done():145return "", ctx.Err()146}147}148149// Done channel pattern for graceful shutdown150type Server struct {151done chan struct{}152}153154func (s *Server) Shutdown() {155close(s.done)156}157158func (s *Server) Run(ctx context.Context) {159ticker := time.NewTicker(1 * time.Second)160defer ticker.Stop()161162for {163select {164case <-ticker.C:165fmt.Println("tick")166case <-s.done:167fmt.Println("shutting down")168return169case <-ctx.Done():170fmt.Println("context cancelled")171return172}173}174}175```176177## Sync Primitives178179```go180import "sync"181182// Mutex for protecting shared state183type Counter struct {184mu sync.Mutex185count int186}187188func (c *Counter) Increment() {189c.mu.Lock()190defer c.mu.Unlock()191c.count++192}193194func (c *Counter) Value() int {195c.mu.Lock()196defer c.mu.Unlock()197return c.count198}199200// RWMutex for read-heavy workloads201type Cache struct {202mu sync.RWMutex203items map[string]string204}205206func (c *Cache) Get(key string) (string, bool) {207c.mu.RLock()208defer c.mu.RUnlock()209val, ok := c.items[key]210return val, ok211}212213func (c *Cache) Set(key, value string) {214c.mu.Lock()215defer c.mu.Unlock()216c.items[key] = value217}218219// sync.Once for initialization220type Service struct {221once sync.Once222config *Config223}224225func (s *Service) getConfig() *Config {226s.once.Do(func() {227s.config = loadConfig() // Only called once228})229return s.config230}231```232233## Rate Limiting and Backpressure234235```go236import "golang.org/x/time/rate"237238// Token bucket rate limiter239type RateLimiter struct {240limiter *rate.Limiter241}242243func NewRateLimiter(rps int) *RateLimiter {244return &RateLimiter{245limiter: rate.NewLimiter(rate.Limit(rps), rps),246}247}248249func (rl *RateLimiter) Process(ctx context.Context, item string) error {250if err := rl.limiter.Wait(ctx); err != nil {251return err252}253// Process item254return nil255}256257// Semaphore pattern for limiting concurrency258type Semaphore struct {259slots chan struct{}260}261262func NewSemaphore(n int) *Semaphore {263return &Semaphore{264slots: make(chan struct{}, n),265}266}267268func (s *Semaphore) Acquire() {269s.slots <- struct{}{}270}271272func (s *Semaphore) Release() {273<-s.slots274}275276func (s *Semaphore) Do(fn func()) {277s.Acquire()278defer s.Release()279fn()280}281```282283## Pipeline Pattern284285```go286// Stage-based processing pipeline287func pipeline(ctx context.Context, input <-chan int) <-chan int {288// Stage 1: Square numbers289stage1 := make(chan int)290go func() {291defer close(stage1)292for num := range input {293select {294case stage1 <- num * num:295case <-ctx.Done():296return297}298}299}()300301// Stage 2: Filter even numbers302stage2 := make(chan int)303go func() {304defer close(stage2)305for num := range stage1 {306if num%2 == 0 {307select {308case stage2 <- num:309case <-ctx.Done():310return311}312}313}314}()315316return stage2317}318```319320## Quick Reference321322| Pattern | Use Case | Key Points |323|---------|----------|------------|324| Worker Pool | Bounded concurrency | Limit goroutines, reuse workers |325| Fan-out/Fan-in | Parallel processing | Distribute work, merge results |326| Pipeline | Stream processing | Chain transformations |327| Rate Limiter | API throttling | Control request rate |328| Semaphore | Resource limits | Cap concurrent operations |329| Done Channel | Graceful shutdown | Signal completion |330