7 Concurrency Patterns ใน Go ที่คุณต้องรู้ — อัปเดต Go 1.25/1.26
7 Concurrency Patterns ใน Go ที่คุณต้องรู้
Concurrency เป็นหนึ่งในจุดแข็งที่สุดของ Go และการเข้าใจ pattern เหล่านี้เป็นสิ่งจำเป็นสำหรับการสร้างแอปพลิเคชันที่ scalable และมีประสิทธิภาพ บทความนี้รวบรวม 7 concurrency patterns ที่ใช้กันจริงในงาน production พร้อมอัปเดตให้รองรับฟีเจอร์ใหม่ใน Go 1.25 และ Go 1.26
หมายเหตุเรื่อง Go version: ตัวอย่างทั้งหมดในบทความนี้ทดสอบกับ Go 1.26 (stable release ล่าสุด ณ มีนาคม 2026) ฟีเจอร์ใหม่จะระบุ version ไว้ชัดเจน
มีอะไรใหม่ใน Go 1.25/1.26 ที่เกี่ยวกับ Concurrency?
ก่อนเข้า pattern มาดูฟีเจอร์ใหม่ที่เกี่ยวข้องกับ concurrency กันก่อน:
Go 1.25 — ฟีเจอร์เด่น
sync.WaitGroup.Go()— method ใหม่ที่ช่วยลดโค้ดซ้ำซ้อน แทนที่จะเขียนwg.Add(1); go func() { defer wg.Done(); ... }()ยาว ๆ เหลือแค่บรรทัดเดียวtesting/synctest— package ใหม่สำหรับทดสอบ concurrent code แบบ deterministic ด้วย virtual clock- Container-aware
GOMAXPROCS— runtime ปรับตัวตาม cgroup CPU limit โดยอัตโนมัติ ไม่ต้องตั้งเองอีกต่อไปเมื่อรันใน container runtime/trace.FlightRecorder— เก็บ trace ใน ring buffer แบบ lightweight สำหรับ snapshot เวลามีปัญหา
Go 1.26 — ฟีเจอร์เด่น
- Goroutine Leak Profile (experimental) — ตรวจจับ goroutine ที่ติด block ถาวรบน channel/mutex ผ่าน
/debug/pprof/goroutineleak os/signal.NotifyContext+ Cause — context ที่ถูก cancel จาก signal จะบอกได้ว่าเป็น signal อะไร ผ่านcontext.Cause(ctx)- Scheduler Metrics ใหม่ —
/sched/goroutines,/sched/threads,/sched/goroutines-createdสำหรับ monitor runtime
⚠️ เตรียมตัวสำหรับ Go 1.27:
asynctimerchanGODEBUG จะถูกลบออก — timer/ticker channel จะเป็น unbuffered เสมอ
1. Worker Pool — ควบคุมจำนวน Goroutine ที่ทำงานพร้อมกัน
Worker Pool คือ pattern ที่สร้าง goroutine จำนวนคงที่เพื่อประมวลผล task จาก queue ที่ใช้ร่วมกัน ช่วยควบคุมการใช้ทรัพยากรอย่างมีประสิทธิภาพ
แบบดั้งเดิม (ก่อน Go 1.25)
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d เริ่มงาน %d\n", id, job)
time.Sleep(time.Second)
fmt.Printf("Worker %d เสร็จงาน %d\n", id, job)
results <- job * 2
}
}
func main() {
const numJobs = 5
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
wg.Wait()
close(results)
for result := range results {
fmt.Println("ผลลัพธ์:", result)
}
}
แบบ Go 1.25+ ด้วย WaitGroup.Go()
package main
import (
"fmt"
"sync"
"time"
)
func main() {
const numJobs = 5
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// Go 1.25: ใช้ wg.Go() แทน wg.Add(1) + go func() + defer wg.Done()
for i := 1; i <= numWorkers; i++ {
id := i
wg.Go(func() {
for job := range jobs {
fmt.Printf("Worker %d เริ่มงาน %d\n", id, job)
time.Sleep(time.Second)
fmt.Printf("Worker %d เสร็จงาน %d\n", id, job)
results <- job * 2
}
})
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
wg.Wait()
close(results)
for result := range results {
fmt.Println("ผลลัพธ์:", result)
}
}
💡
wg.Go()ทำสามอย่างในบรรทัดเดียว: เรียกAdd(1), สร้าง goroutine, และเรียกDone()เมื่อ function return — ลดโอกาสลืมDone()หรือAdd()ผิดจำนวน
สถานการณ์จริง: Web server ที่รับ HTTP request เข้ามา โดยแต่ละ request จะถูกประมวลผลโดย worker จาก pool ที่กำหนดไว้ ช่วยป้องกันไม่ให้สร้าง goroutine มากเกินไปจนทรัพยากรหมด
2. Fan-Out, Fan-In — กระจายงานออก รวมผลกลับ
Fan-Out คือการสร้างหลาย goroutine เพื่อประมวลผลข้อมูลแบบขนาน ส่วน Fan-In คือการรวมผลลัพธ์จากหลาย goroutine กลับมาเป็น pipeline เดียว
package main
import (
"fmt"
"sync"
)
func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
ch <- i
fmt.Printf("Producer %d ส่งค่า %d\n", id, i)
}
}
func consumer(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for v := range in {
out <- v * 2
fmt.Printf("Consumer %d ประมวลผล %d\n", id, v)
}
}
func main() {
numProducers := 2
numConsumers := 2
input := make(chan int, 10)
output := make(chan int, 10)
var prodWg, consWg sync.WaitGroup
// Fan-Out: สร้างหลาย producer
for i := 1; i <= numProducers; i++ {
prodWg.Add(1)
go producer(i, input, &prodWg)
}
// รอ producer ทั้งหมดเสร็จแล้วปิด channel
go func() {
prodWg.Wait()
close(input)
}()
// Fan-In: สร้างหลาย consumer
for i := 1; i <= numConsumers; i++ {
consWg.Add(1)
go consumer(i, input, output, &consWg)
}
// รอ consumer ทั้งหมดเสร็จแล้วปิด output
go func() {
consWg.Wait()
close(output)
}()
for result := range output {
fmt.Println("ผลลัพธ์:", result)
}
}
เวอร์ชัน Go 1.25+ ด้วย WaitGroup.Go()
func main() {
numProducers := 2
numConsumers := 2
input := make(chan int, 10)
output := make(chan int, 10)
var prodWg, consWg sync.WaitGroup
for i := 1; i <= numProducers; i++ {
id := i
prodWg.Go(func() {
for j := 0; j < 5; j++ {
input <- j
fmt.Printf("Producer %d ส่งค่า %d\n", id, j)
}
})
}
go func() { prodWg.Wait(); close(input) }()
for i := 1; i <= numConsumers; i++ {
id := i
consWg.Go(func() {
for v := range input {
output <- v * 2
fmt.Printf("Consumer %d ประมวลผล %d\n", id, v)
}
})
}
go func() { consWg.Wait(); close(output) }()
for result := range output {
fmt.Println("ผลลัพธ์:", result)
}
}
สถานการณ์จริง: Data processing pipeline ที่แต่ละ stage ของการประมวลผลถูกจัดการโดย worker คนละชุด เช่น ระบบ ETL ที่ต้อง extract, transform และ load ข้อมูลแบบขนาน
3. Pipeline — ส่งข้อมูลผ่านหลาย stage ต่อกัน
Pipeline pattern คือการเชื่อม stage หลาย ๆ ตัวเข้าด้วยกัน โดยแต่ละ stage รับข้อมูลจาก channel, ประมวลผล แล้วส่งต่อไปยัง channel ถัดไป
package main
import "fmt"
func stage1(nums []int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func stage2(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2 // คูณสอง
}
close(out)
}()
return out
}
func stage3(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n + 1 // บวกหนึ่ง
}
close(out)
}()
return out
}
func main() {
nums := []int{1, 2, 3, 4, 5}
// เชื่อม pipeline: stage1 → stage2 → stage3
c1 := stage1(nums)
c2 := stage2(c1)
c3 := stage3(c2)
for result := range c3 {
fmt.Println(result)
// ผลลัพธ์: 3, 5, 7, 9, 11
}
}
สถานการณ์จริง: ระบบประมวลผลภาพ (image processing) ที่รูปภาพต้องผ่านหลาย stage เช่น resize → filter → encode แต่ละ stage ทำงานเป็น goroutine แยกต่างหาก ส่งผ่าน channel ต่อกัน
4. Publish-Subscribe — กระจายข้อความไปยังหลาย subscriber
Publish-Subscribe (Pub-Sub) ช่วยให้ส่งข้อความไปยัง subscriber หลายตัวพร้อมกัน เหมาะกับระบบที่ service ต่าง ๆ ต้องตอบสนองต่อ event อย่างอิสระ
package main
import (
"fmt"
"sync"
"time"
)
type PubSub struct {
mu sync.Mutex
channels map[string][]chan string
}
func NewPubSub() *PubSub {
return &PubSub{
channels: make(map[string][]chan string),
}
}
func (ps *PubSub) Subscribe(topic string) <-chan string {
ch := make(chan string, 1) // buffered เพื่อไม่ให้ Publish block
ps.mu.Lock()
ps.channels[topic] = append(ps.channels[topic], ch)
ps.mu.Unlock()
return ch
}
func (ps *PubSub) Publish(topic, msg string) {
ps.mu.Lock()
for _, ch := range ps.channels[topic] {
ch <- msg
}
ps.mu.Unlock()
}
func (ps *PubSub) Close(topic string) {
ps.mu.Lock()
for _, ch := range ps.channels[topic] {
close(ch)
}
ps.mu.Unlock()
}
func main() {
ps := NewPubSub()
subscriber1 := ps.Subscribe("news")
subscriber2 := ps.Subscribe("news")
var wg sync.WaitGroup
// Go 1.25+: ใช้ wg.Go() สำหรับ subscriber goroutine
wg.Go(func() {
for msg := range subscriber1 {
fmt.Println("Subscriber 1 ได้รับ:", msg)
}
})
wg.Go(func() {
for msg := range subscriber2 {
fmt.Println("Subscriber 2 ได้รับ:", msg)
}
})
ps.Publish("news", "ข่าวด่วน!")
ps.Publish("news", "ข่าวเพิ่มเติม!")
time.Sleep(time.Second)
ps.Close("news")
wg.Wait()
}
สถานการณ์จริง: ระบบ messaging ที่ service ต่าง ๆ subscribe รับ event ที่สนใจ เช่น ระบบ e-commerce ที่ order event ถูกส่งไป inventory service, notification service, analytics service พร้อมกัน
5. Select + Timeout — หลีกเลี่ยงการ block ไม่รู้จบ
ใช้ select ร่วมกับ timeout เพื่อป้องกันการรอ channel ตลอดกาล pattern นี้สำคัญมากสำหรับระบบที่ต้อง resilient
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan string)
go func() {
time.Sleep(2 * time.Second)
c <- "ผลลัพธ์"
}()
select {
case res := <-c:
fmt.Println("ได้รับ:", res)
case <-time.After(1 * time.Second):
fmt.Println("หมดเวลา!")
}
}
เวอร์ชันที่ดีกว่าด้วย context (แนะนำสำหรับ production)
package main
import (
"context"
"fmt"
"time"
)
func fetchData(ctx context.Context) (string, error) {
ch := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second) // จำลองงานที่ช้า
ch <- "ข้อมูลจาก API"
}()
select {
case result := <-ch:
return result, nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func main() {
// สร้าง context ที่มี timeout 1 วินาที
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
result, err := fetchData(ctx)
if err != nil {
fmt.Println("Error:", err)
// ใน Go 1.26: ดู cause จาก context.Cause(ctx) ได้ด้วย
return
}
fmt.Println("สำเร็จ:", result)
}
💡 Go 1.26 Tip: หากใช้
os/signal.NotifyContextสำหรับ graceful shutdown ตอนนี้สามารถตรวจสอบได้ว่า signal ไหนที่ cancel context ผ่านcontext.Cause(ctx)— เหมาะสำหรับ logging ว่าถูก SIGTERM หรือ SIGINT
สถานการณ์จริง: Network client ที่พยายามเชื่อมต่อกับ server และหยุดรอหาก server ไม่ตอบกลับทันเวลาที่กำหนด
6. Semaphore — จำกัดจำนวน goroutine ที่เข้าถึงทรัพยากร
Semaphore ใช้ควบคุมจำนวน goroutine ที่เข้าถึงทรัพยากรใดทรัพยากรหนึ่งพร้อมกัน ป้องกัน resource overload
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, sem chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
sem <- struct{}{} // ขอ semaphore
fmt.Printf("Worker %d เริ่มทำงาน\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d ทำงานเสร็จ\n", id)
<-sem // คืน semaphore
}
func main() {
const numWorkers = 5
const maxConcurrent = 2
sem := make(chan struct{}, maxConcurrent)
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, sem, &wg)
}
wg.Wait()
}
เวอร์ชัน Go 1.25+ ด้วย WaitGroup.Go()
func main() {
const numWorkers = 5
const maxConcurrent = 2
sem := make(chan struct{}, maxConcurrent)
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
id := i
wg.Go(func() {
sem <- struct{}{} // ขอ semaphore
fmt.Printf("Worker %d เริ่มทำงาน\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d ทำงานเสร็จ\n", id)
<-sem // คืน semaphore
})
}
wg.Wait()
}
สถานการณ์จริง: Database connection pool ที่อนุญาตให้มี connection พร้อมกันได้จำนวนจำกัด หรือระบบที่ต้องจำกัดจำนวน concurrent HTTP request ไปยัง external API
7. Rate Limiting — ควบคุมความถี่ในการประมวลผล
Rate Limiting ใช้ ticker ควบคุมความเร็วในการประมวลผล event ป้องกันไม่ให้ระบบปลายทางรับภาระมากเกินไป
package main
import (
"fmt"
"time"
)
func main() {
rate := time.Second
ticker := time.NewTicker(rate)
defer ticker.Stop()
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
for req := range requests {
<-ticker.C // รอ tick ถัดไป
fmt.Println("กำลังประมวลผล request", req)
}
}
⚠️ เตรียมตัวสำหรับ Go 1.27:
time.Timerและtime.Tickerchannel จะเป็น unbuffered เสมอ (ลบasynctimerchanGODEBUG) — ถ้าคุณมีโค้ดที่ depend กับ buffered timer channel ให้ตรวจสอบและปรับแก้ตั้งแต่ตอนนี้
สถานการณ์จริง: API gateway ที่จำกัดจำนวน request ที่ผู้ใช้สามารถส่งได้ในช่วงเวลาหนึ่ง เช่น 100 requests ต่อวินาที
เคล็ดลับ Production สำหรับ Go 1.25/1.26
1. ทดสอบ Concurrent Code ด้วย testing/synctest
Go 1.25 มี package testing/synctest ที่ช่วยให้ทดสอบ concurrent code ได้แบบ deterministic:
package main_test
import (
"testing"
"testing/synctest"
"time"
)
func TestTimeout(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ch := make(chan string)
go func() {
time.Sleep(5 * time.Second) // virtual clock — ไม่รอจริง!
ch <- "done"
}()
synctest.Wait() // รอจน goroutine ทั้งหมด block
select {
case <-ch:
t.Fatal("ไม่ควรได้รับค่า")
case <-time.After(1 * time.Second):
// ถูกต้อง — timeout ก่อน
}
})
}
2. ตรวจจับ Goroutine Leak ด้วย Go 1.26
// เปิดใช้งาน: GOEXPERIMENT=goroutineleakprofile go run main.go
import _ "net/http/pprof"
// เข้าถึงได้ที่: /debug/pprof/goroutineleak
// จะแสดง goroutine ที่ติด block ถาวรบน channel, mutex, หรือ cond
// ที่ไม่สามารถ unblock ได้อีกแล้ว (ตรวจจากผ่าน GC reachability)
3. Container-Aware GOMAXPROCS (Go 1.25)
// ใน Go 1.25+ ไม่ต้องใช้ library ภายนอกเช่น automaxprocs อีกต่อไป
// runtime จะอ่าน cgroup CPU limit โดยอัตโนมัติ
// ถ้าต้องการ reset กลับเป็นค่า container-aware default:
import "runtime"
func init() {
runtime.SetDefaultGOMAXPROCS()
}
4. Graceful Shutdown ด้วย Signal Cause (Go 1.26)
package main
import (
"context"
"fmt"
"os/signal"
"syscall"
)
func main() {
ctx, stop := signal.NotifyContext(
context.Background(),
syscall.SIGTERM, syscall.SIGINT,
)
defer stop()
<-ctx.Done()
// Go 1.26: ดูได้ว่า signal ไหนที่ cancel context
cause := context.Cause(ctx)
fmt.Printf("ปิดระบบเนื่องจาก: %v\n", cause)
// เช่น "signal: terminated" หรือ "signal: interrupt"
}
สรุป
Concurrency patterns ใน Go เป็นพื้นฐานที่สำคัญสำหรับการสร้างแอปพลิเคชันที่มีประสิทธิภาพและ scalable โดยเฉพาะใน Go 1.25/1.26 ที่มีฟีเจอร์ใหม่ช่วยให้เขียนโค้ดได้สะอาดและปลอดภัยขึ้น:
| Pattern | ใช้เมื่อ | Go 1.25+ ดีขึ้นยังไง |
|---|---|---|
| Worker Pool | ควบคุมจำนวน concurrent tasks | wg.Go() ลดโค้ดซ้ำซ้อน |
| Fan-Out/Fan-In | ประมวลผลขนานแล้วรวมผล | wg.Go() + goroutine leak detection |
| Pipeline | ประมวลผลหลาย stage ต่อกัน | synctest ทดสอบได้ง่ายขึ้น |
| Pub-Sub | กระจาย event ไปหลาย service | wg.Go() สำหรับ subscriber |
| Select + Timeout | ป้องกัน block ไม่รู้จบ | signal.NotifyContext + cause |
| Semaphore | จำกัด concurrent resource access | Container-aware GOMAXPROCS |
| Rate Limiting | ควบคุมความถี่ request | เตรียมตัวสำหรับ unbuffered timer (1.27) |
ขอบคุณที่อ่านจนจบ! ถ้าบทความนี้มีประโยชน์ อย่าลืมแชร์และติดตามเนื้อหาเกี่ยวกับ Go และการพัฒนาซอฟต์แวร์เพิ่มเติม