7 Concurrency Patterns ใน Go ที่คุณต้องรู้ — อัปเดต Go 1.25/1.26

7 Concurrency Patterns ใน Go ที่คุณต้องรู้ — อัปเดต Go 1.25/1.26

Dev Team · IT ·

7 Concurrency Patterns ใน Go ที่คุณต้องรู้

Concurrency เป็นหนึ่งในจุดแข็งที่สุดของ Go และการเข้าใจ pattern เหล่านี้เป็นสิ่งจำเป็นสำหรับการสร้างแอปพลิเคชันที่ scalable และมีประสิทธิภาพ บทความนี้รวบรวม 7 concurrency patterns ที่ใช้กันจริงในงาน production พร้อมอัปเดตให้รองรับฟีเจอร์ใหม่ใน Go 1.25 และ Go 1.26

หมายเหตุเรื่อง Go version: ตัวอย่างทั้งหมดในบทความนี้ทดสอบกับ Go 1.26 (stable release ล่าสุด ณ มีนาคม 2026) ฟีเจอร์ใหม่จะระบุ version ไว้ชัดเจน


มีอะไรใหม่ใน Go 1.25/1.26 ที่เกี่ยวกับ Concurrency?

ก่อนเข้า pattern มาดูฟีเจอร์ใหม่ที่เกี่ยวข้องกับ concurrency กันก่อน:

Go 1.25 — ฟีเจอร์เด่น

  • sync.WaitGroup.Go() — method ใหม่ที่ช่วยลดโค้ดซ้ำซ้อน แทนที่จะเขียน wg.Add(1); go func() { defer wg.Done(); ... }() ยาว ๆ เหลือแค่บรรทัดเดียว
  • testing/synctest — package ใหม่สำหรับทดสอบ concurrent code แบบ deterministic ด้วย virtual clock
  • Container-aware GOMAXPROCS — runtime ปรับตัวตาม cgroup CPU limit โดยอัตโนมัติ ไม่ต้องตั้งเองอีกต่อไปเมื่อรันใน container
  • runtime/trace.FlightRecorder — เก็บ trace ใน ring buffer แบบ lightweight สำหรับ snapshot เวลามีปัญหา

Go 1.26 — ฟีเจอร์เด่น

  • Goroutine Leak Profile (experimental) — ตรวจจับ goroutine ที่ติด block ถาวรบน channel/mutex ผ่าน /debug/pprof/goroutineleak
  • os/signal.NotifyContext + Cause — context ที่ถูก cancel จาก signal จะบอกได้ว่าเป็น signal อะไร ผ่าน context.Cause(ctx)
  • Scheduler Metrics ใหม่/sched/goroutines, /sched/threads, /sched/goroutines-created สำหรับ monitor runtime

⚠️ เตรียมตัวสำหรับ Go 1.27: asynctimerchan GODEBUG จะถูกลบออก — timer/ticker channel จะเป็น unbuffered เสมอ


1. Worker Pool — ควบคุมจำนวน Goroutine ที่ทำงานพร้อมกัน

Worker Pool คือ pattern ที่สร้าง goroutine จำนวนคงที่เพื่อประมวลผล task จาก queue ที่ใช้ร่วมกัน ช่วยควบคุมการใช้ทรัพยากรอย่างมีประสิทธิภาพ

แบบดั้งเดิม (ก่อน Go 1.25)

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker %d เริ่มงาน %d\n", id, job)
        time.Sleep(time.Second)
        fmt.Printf("Worker %d เสร็จงาน %d\n", id, job)
        results <- job * 2
    }
}

func main() {
    const numJobs = 5
    const numWorkers = 3
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    var wg sync.WaitGroup

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    wg.Wait()
    close(results)

    for result := range results {
        fmt.Println("ผลลัพธ์:", result)
    }
}

แบบ Go 1.25+ ด้วย WaitGroup.Go()

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    const numJobs = 5
    const numWorkers = 3
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    var wg sync.WaitGroup

    // Go 1.25: ใช้ wg.Go() แทน wg.Add(1) + go func() + defer wg.Done()
    for i := 1; i <= numWorkers; i++ {
        id := i
        wg.Go(func() {
            for job := range jobs {
                fmt.Printf("Worker %d เริ่มงาน %d\n", id, job)
                time.Sleep(time.Second)
                fmt.Printf("Worker %d เสร็จงาน %d\n", id, job)
                results <- job * 2
            }
        })
    }

    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    wg.Wait()
    close(results)

    for result := range results {
        fmt.Println("ผลลัพธ์:", result)
    }
}

💡 wg.Go() ทำสามอย่างในบรรทัดเดียว: เรียก Add(1), สร้าง goroutine, และเรียก Done() เมื่อ function return — ลดโอกาสลืม Done() หรือ Add() ผิดจำนวน

สถานการณ์จริง: Web server ที่รับ HTTP request เข้ามา โดยแต่ละ request จะถูกประมวลผลโดย worker จาก pool ที่กำหนดไว้ ช่วยป้องกันไม่ให้สร้าง goroutine มากเกินไปจนทรัพยากรหมด


2. Fan-Out, Fan-In — กระจายงานออก รวมผลกลับ

Fan-Out คือการสร้างหลาย goroutine เพื่อประมวลผลข้อมูลแบบขนาน ส่วน Fan-In คือการรวมผลลัพธ์จากหลาย goroutine กลับมาเป็น pipeline เดียว

package main

import (
    "fmt"
    "sync"
)

func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Printf("Producer %d ส่งค่า %d\n", id, i)
    }
}

func consumer(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for v := range in {
        out <- v * 2
        fmt.Printf("Consumer %d ประมวลผล %d\n", id, v)
    }
}

func main() {
    numProducers := 2
    numConsumers := 2
    input := make(chan int, 10)
    output := make(chan int, 10)
    var prodWg, consWg sync.WaitGroup

    // Fan-Out: สร้างหลาย producer
    for i := 1; i <= numProducers; i++ {
        prodWg.Add(1)
        go producer(i, input, &prodWg)
    }

    // รอ producer ทั้งหมดเสร็จแล้วปิด channel
    go func() {
        prodWg.Wait()
        close(input)
    }()

    // Fan-In: สร้างหลาย consumer
    for i := 1; i <= numConsumers; i++ {
        consWg.Add(1)
        go consumer(i, input, output, &consWg)
    }

    // รอ consumer ทั้งหมดเสร็จแล้วปิด output
    go func() {
        consWg.Wait()
        close(output)
    }()

    for result := range output {
        fmt.Println("ผลลัพธ์:", result)
    }
}

เวอร์ชัน Go 1.25+ ด้วย WaitGroup.Go()

func main() {
    numProducers := 2
    numConsumers := 2
    input := make(chan int, 10)
    output := make(chan int, 10)
    var prodWg, consWg sync.WaitGroup

    for i := 1; i <= numProducers; i++ {
        id := i
        prodWg.Go(func() {
            for j := 0; j < 5; j++ {
                input <- j
                fmt.Printf("Producer %d ส่งค่า %d\n", id, j)
            }
        })
    }

    go func() { prodWg.Wait(); close(input) }()

    for i := 1; i <= numConsumers; i++ {
        id := i
        consWg.Go(func() {
            for v := range input {
                output <- v * 2
                fmt.Printf("Consumer %d ประมวลผล %d\n", id, v)
            }
        })
    }

    go func() { consWg.Wait(); close(output) }()

    for result := range output {
        fmt.Println("ผลลัพธ์:", result)
    }
}

สถานการณ์จริง: Data processing pipeline ที่แต่ละ stage ของการประมวลผลถูกจัดการโดย worker คนละชุด เช่น ระบบ ETL ที่ต้อง extract, transform และ load ข้อมูลแบบขนาน


3. Pipeline — ส่งข้อมูลผ่านหลาย stage ต่อกัน

Pipeline pattern คือการเชื่อม stage หลาย ๆ ตัวเข้าด้วยกัน โดยแต่ละ stage รับข้อมูลจาก channel, ประมวลผล แล้วส่งต่อไปยัง channel ถัดไป

package main

import "fmt"

func stage1(nums []int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func stage2(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2 // คูณสอง
        }
        close(out)
    }()
    return out
}

func stage3(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n + 1 // บวกหนึ่ง
        }
        close(out)
    }()
    return out
}

func main() {
    nums := []int{1, 2, 3, 4, 5}

    // เชื่อม pipeline: stage1 → stage2 → stage3
    c1 := stage1(nums)
    c2 := stage2(c1)
    c3 := stage3(c2)

    for result := range c3 {
        fmt.Println(result)
        // ผลลัพธ์: 3, 5, 7, 9, 11
    }
}

สถานการณ์จริง: ระบบประมวลผลภาพ (image processing) ที่รูปภาพต้องผ่านหลาย stage เช่น resize → filter → encode แต่ละ stage ทำงานเป็น goroutine แยกต่างหาก ส่งผ่าน channel ต่อกัน


4. Publish-Subscribe — กระจายข้อความไปยังหลาย subscriber

Publish-Subscribe (Pub-Sub) ช่วยให้ส่งข้อความไปยัง subscriber หลายตัวพร้อมกัน เหมาะกับระบบที่ service ต่าง ๆ ต้องตอบสนองต่อ event อย่างอิสระ

package main

import (
    "fmt"
    "sync"
    "time"
)

type PubSub struct {
    mu       sync.Mutex
    channels map[string][]chan string
}

func NewPubSub() *PubSub {
    return &PubSub{
        channels: make(map[string][]chan string),
    }
}

func (ps *PubSub) Subscribe(topic string) <-chan string {
    ch := make(chan string, 1) // buffered เพื่อไม่ให้ Publish block
    ps.mu.Lock()
    ps.channels[topic] = append(ps.channels[topic], ch)
    ps.mu.Unlock()
    return ch
}

func (ps *PubSub) Publish(topic, msg string) {
    ps.mu.Lock()
    for _, ch := range ps.channels[topic] {
        ch <- msg
    }
    ps.mu.Unlock()
}

func (ps *PubSub) Close(topic string) {
    ps.mu.Lock()
    for _, ch := range ps.channels[topic] {
        close(ch)
    }
    ps.mu.Unlock()
}

func main() {
    ps := NewPubSub()

    subscriber1 := ps.Subscribe("news")
    subscriber2 := ps.Subscribe("news")

    var wg sync.WaitGroup

    // Go 1.25+: ใช้ wg.Go() สำหรับ subscriber goroutine
    wg.Go(func() {
        for msg := range subscriber1 {
            fmt.Println("Subscriber 1 ได้รับ:", msg)
        }
    })

    wg.Go(func() {
        for msg := range subscriber2 {
            fmt.Println("Subscriber 2 ได้รับ:", msg)
        }
    })

    ps.Publish("news", "ข่าวด่วน!")
    ps.Publish("news", "ข่าวเพิ่มเติม!")

    time.Sleep(time.Second)
    ps.Close("news")
    wg.Wait()
}

สถานการณ์จริง: ระบบ messaging ที่ service ต่าง ๆ subscribe รับ event ที่สนใจ เช่น ระบบ e-commerce ที่ order event ถูกส่งไป inventory service, notification service, analytics service พร้อมกัน


5. Select + Timeout — หลีกเลี่ยงการ block ไม่รู้จบ

ใช้ select ร่วมกับ timeout เพื่อป้องกันการรอ channel ตลอดกาล pattern นี้สำคัญมากสำหรับระบบที่ต้อง resilient

package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(chan string)

    go func() {
        time.Sleep(2 * time.Second)
        c <- "ผลลัพธ์"
    }()

    select {
    case res := <-c:
        fmt.Println("ได้รับ:", res)
    case <-time.After(1 * time.Second):
        fmt.Println("หมดเวลา!")
    }
}

เวอร์ชันที่ดีกว่าด้วย context (แนะนำสำหรับ production)

package main

import (
    "context"
    "fmt"
    "time"
)

func fetchData(ctx context.Context) (string, error) {
    ch := make(chan string, 1)

    go func() {
        time.Sleep(2 * time.Second) // จำลองงานที่ช้า
        ch <- "ข้อมูลจาก API"
    }()

    select {
    case result := <-ch:
        return result, nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    // สร้าง context ที่มี timeout 1 วินาที
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    result, err := fetchData(ctx)
    if err != nil {
        fmt.Println("Error:", err)
        // ใน Go 1.26: ดู cause จาก context.Cause(ctx) ได้ด้วย
        return
    }
    fmt.Println("สำเร็จ:", result)
}

💡 Go 1.26 Tip: หากใช้ os/signal.NotifyContext สำหรับ graceful shutdown ตอนนี้สามารถตรวจสอบได้ว่า signal ไหนที่ cancel context ผ่าน context.Cause(ctx) — เหมาะสำหรับ logging ว่าถูก SIGTERM หรือ SIGINT

สถานการณ์จริง: Network client ที่พยายามเชื่อมต่อกับ server และหยุดรอหาก server ไม่ตอบกลับทันเวลาที่กำหนด


6. Semaphore — จำกัดจำนวน goroutine ที่เข้าถึงทรัพยากร

Semaphore ใช้ควบคุมจำนวน goroutine ที่เข้าถึงทรัพยากรใดทรัพยากรหนึ่งพร้อมกัน ป้องกัน resource overload

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, sem chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    sem <- struct{}{} // ขอ semaphore
    fmt.Printf("Worker %d เริ่มทำงาน\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d ทำงานเสร็จ\n", id)
    <-sem // คืน semaphore
}

func main() {
    const numWorkers = 5
    const maxConcurrent = 2
    sem := make(chan struct{}, maxConcurrent)
    var wg sync.WaitGroup

    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, sem, &wg)
    }

    wg.Wait()
}

เวอร์ชัน Go 1.25+ ด้วย WaitGroup.Go()

func main() {
    const numWorkers = 5
    const maxConcurrent = 2
    sem := make(chan struct{}, maxConcurrent)
    var wg sync.WaitGroup

    for i := 1; i <= numWorkers; i++ {
        id := i
        wg.Go(func() {
            sem <- struct{}{} // ขอ semaphore
            fmt.Printf("Worker %d เริ่มทำงาน\n", id)
            time.Sleep(time.Second)
            fmt.Printf("Worker %d ทำงานเสร็จ\n", id)
            <-sem // คืน semaphore
        })
    }

    wg.Wait()
}

สถานการณ์จริง: Database connection pool ที่อนุญาตให้มี connection พร้อมกันได้จำนวนจำกัด หรือระบบที่ต้องจำกัดจำนวน concurrent HTTP request ไปยัง external API


7. Rate Limiting — ควบคุมความถี่ในการประมวลผล

Rate Limiting ใช้ ticker ควบคุมความเร็วในการประมวลผล event ป้องกันไม่ให้ระบบปลายทางรับภาระมากเกินไป

package main

import (
    "fmt"
    "time"
)

func main() {
    rate := time.Second
    ticker := time.NewTicker(rate)
    defer ticker.Stop()

    requests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        requests <- i
    }
    close(requests)

    for req := range requests {
        <-ticker.C // รอ tick ถัดไป
        fmt.Println("กำลังประมวลผล request", req)
    }
}

⚠️ เตรียมตัวสำหรับ Go 1.27: time.Timer และ time.Ticker channel จะเป็น unbuffered เสมอ (ลบ asynctimerchan GODEBUG) — ถ้าคุณมีโค้ดที่ depend กับ buffered timer channel ให้ตรวจสอบและปรับแก้ตั้งแต่ตอนนี้

สถานการณ์จริง: API gateway ที่จำกัดจำนวน request ที่ผู้ใช้สามารถส่งได้ในช่วงเวลาหนึ่ง เช่น 100 requests ต่อวินาที


เคล็ดลับ Production สำหรับ Go 1.25/1.26

1. ทดสอบ Concurrent Code ด้วย testing/synctest

Go 1.25 มี package testing/synctest ที่ช่วยให้ทดสอบ concurrent code ได้แบบ deterministic:

package main_test

import (
    "testing"
    "testing/synctest"
    "time"
)

func TestTimeout(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        ch := make(chan string)

        go func() {
            time.Sleep(5 * time.Second) // virtual clock — ไม่รอจริง!
            ch <- "done"
        }()

        synctest.Wait() // รอจน goroutine ทั้งหมด block

        select {
        case <-ch:
            t.Fatal("ไม่ควรได้รับค่า")
        case <-time.After(1 * time.Second):
            // ถูกต้อง — timeout ก่อน
        }
    })
}

2. ตรวจจับ Goroutine Leak ด้วย Go 1.26

// เปิดใช้งาน: GOEXPERIMENT=goroutineleakprofile go run main.go

import _ "net/http/pprof"

// เข้าถึงได้ที่: /debug/pprof/goroutineleak
// จะแสดง goroutine ที่ติด block ถาวรบน channel, mutex, หรือ cond
// ที่ไม่สามารถ unblock ได้อีกแล้ว (ตรวจจากผ่าน GC reachability)

3. Container-Aware GOMAXPROCS (Go 1.25)

// ใน Go 1.25+ ไม่ต้องใช้ library ภายนอกเช่น automaxprocs อีกต่อไป
// runtime จะอ่าน cgroup CPU limit โดยอัตโนมัติ

// ถ้าต้องการ reset กลับเป็นค่า container-aware default:
import "runtime"

func init() {
    runtime.SetDefaultGOMAXPROCS()
}

4. Graceful Shutdown ด้วย Signal Cause (Go 1.26)

package main

import (
    "context"
    "fmt"
    "os/signal"
    "syscall"
)

func main() {
    ctx, stop := signal.NotifyContext(
        context.Background(),
        syscall.SIGTERM, syscall.SIGINT,
    )
    defer stop()

    <-ctx.Done()

    // Go 1.26: ดูได้ว่า signal ไหนที่ cancel context
    cause := context.Cause(ctx)
    fmt.Printf("ปิดระบบเนื่องจาก: %v\n", cause)
    // เช่น "signal: terminated" หรือ "signal: interrupt"
}

สรุป

Concurrency patterns ใน Go เป็นพื้นฐานที่สำคัญสำหรับการสร้างแอปพลิเคชันที่มีประสิทธิภาพและ scalable โดยเฉพาะใน Go 1.25/1.26 ที่มีฟีเจอร์ใหม่ช่วยให้เขียนโค้ดได้สะอาดและปลอดภัยขึ้น:

Patternใช้เมื่อGo 1.25+ ดีขึ้นยังไง
Worker Poolควบคุมจำนวน concurrent taskswg.Go() ลดโค้ดซ้ำซ้อน
Fan-Out/Fan-Inประมวลผลขนานแล้วรวมผลwg.Go() + goroutine leak detection
Pipelineประมวลผลหลาย stage ต่อกันsynctest ทดสอบได้ง่ายขึ้น
Pub-Subกระจาย event ไปหลาย servicewg.Go() สำหรับ subscriber
Select + Timeoutป้องกัน block ไม่รู้จบsignal.NotifyContext + cause
Semaphoreจำกัด concurrent resource accessContainer-aware GOMAXPROCS
Rate Limitingควบคุมความถี่ requestเตรียมตัวสำหรับ unbuffered timer (1.27)

ขอบคุณที่อ่านจนจบ! ถ้าบทความนี้มีประโยชน์ อย่าลืมแชร์และติดตามเนื้อหาเกี่ยวกับ Go และการพัฒนาซอฟต์แวร์เพิ่มเติม