Go Alerting Engine: ให้ระบบ IoT แจ้งเตือนเองเลย

Go Alerting Engine: ให้ระบบ IoT แจ้งเตือนเองเลย

ShowkhunWorkshop

Go Alerting Engine: ให้ระบบ IoT แจ้งเตือนเองเลย

Branch: step-09-alerting Phase: Development (9/9) — Alerting Repo: kangana1024/showkhun-workshop


เคยเจอไหมครับ? ห้อง server ร้อนไป 40 องศา แต่รู้ตอนที่ hardware พังไปแล้ว อยากให้ระบบมันส่งข้อความมาบอกก่อนได้ไหม? วันนี้เราจะทำแบบนั้นกัน — แต่แทนที่จะพึ่งเครื่องมือสำเร็จรูป เราจะ เขียน alerting engine เป็น Go เอง ฝังไปกับ ingestion path เลย (ʘ‿ʘ)

ทำไมไม่ใช้ Kapacitor/TICKscript? ตอนวาง draft แรกเราเคยคิดจะใช้ Kapacitor แต่พอลงมือทำจริง การเขียน engine เป็น Go เองให้ข้อดีหลายอย่าง: ประเมิน reading ได้ทันที ตอน ingest (ไม่ต้องรอ subscription), reuse logic เดียวกันทั้ง REST และ MQTT, unit-test ได้ละเอียด, และจัดการ rule ผ่าน REST API + MongoDB ได้เหมือน resource อื่นๆ ในระบบ — บทความนี้อ้างอิงตามโค้ดจริงใน branch step-09-alerting ทั้งหมด


ก่อนเริ่ม: น้องๆ จะได้อะไรจาก workshop นี้?

  • เข้าใจ โครงสร้าง alert rule 3 แบบ: threshold, offline, anomaly
  • เขียน evaluator แบบ pure function ที่ unit-test ได้สบาย
  • รู้ว่า engine ประเมิน reading ตอน ingest ยังไง (ไม่ต้อง poll)
  • ทำ anomaly detection ด้วย z-score จาก rolling baseline ในหน่วยความจำ
  • กัน alert spam ด้วย cooldown + เก็บ history บน Mongo แบบ TTL
  • ส่ง webhook แบบ Slack-compatible พร้อม SSRF guard
  • จัดการ rule ผ่าน REST CRUD ที่ /api/v1/alert-rules

ทำไมต้องมี Alerting Engine? (WHY ก่อน HOW เสมอ)

ลองนึกภาพน้องเป็น “ยาม” โรงงาน ถ้ามีเซ็นเซอร์ 500 ตัว แล้วน้องต้องนั่งจ้องหน้าจอตลอดเวลา — มันไม่ไหวใช่ไหมครับ?

Alerting engine คือ “ยามอัตโนมัติ” มันดูข้อมูลแทนเรา พอเจอสิ่งผิดปกติตาม rule ที่ตั้งไว้ ก็ยิงแจ้งเตือนออกมาเอง

อุปมา: เหมือน Google Form ที่ตั้ง notification แต่สำหรับ sensor data — เราตั้ง condition ไว้ พอมีค่ากรอกออกนอกขอบเขต ก็ได้รับข้อความทันที


ภาพรวม: Engine อยู่ตรงไหนในระบบ?

จุดที่ต่างจากของเก่าแบบ Kapacitor มากที่สุดคือ — engine นี้ ไม่ได้ subscribe InfluxDB แต่ถูกเรียก ตอนข้อมูลถูก ingest เลย:

graph TD
    A[📡 MQTT / REST reading] --> B[SensorService.IngestBatch]
    B -->|เขียนลง| C[(InfluxDB 2.7)]
    B -->|fan-out| D[WebSocket]
    B -->|evaluateAlerts| E[AlertEngine.Evaluate]
    E -->|โหลด rule| F[(MongoDB: alert_rules)]
    E -->|บันทึก| G[(MongoDB: alert_history TTL)]
    E -->|ยิง| H[🔔 Webhook Slack-compatible]

เพราะ engine ถูกเรียกจาก ingestion path เดียวกัน ทั้ง reading ที่มาทาง REST และ MQTT จึงถูกประเมินแบบเดียวกันเป๊ะ — โค้ดจาก backend/internal/service/sensor.go:

// evaluateAlerts รัน alert engine กับ reading ที่เพิ่งเขียนลงไป
// เป็น no-op ถ้าไม่ได้ wire engine ไว้ และ "ห้าม" ทำให้ ingestion ล้ม
func (s *SensorService) evaluateAlerts(ctx context.Context, live LiveReading) {
	if s.alerts == nil {
		return
	}
	s.alerts.Evaluate(ctx, live.DeviceID, live.Fields, live.Timestamp)
}

กฎเหล็ก: alerting เป็น side-channel — ถ้า engine พังต้องไม่ลาก ingestion ล้มตามไปด้วย (เดี๋ยวจะเห็นว่าโค้ด swallow error แล้ว log แทน)


Alert Rule: หน้าตาเป็นยังไง?

ก่อนเขียน logic เราต้องรู้จัก “rule” ก่อน โปรเจกต์รองรับ 3 type (จาก backend/internal/model/alert.go):

const (
	// threshold: ค่าทะลุเส้นที่ตั้งไว้
	AlertRuleThreshold AlertRuleType = "threshold"
	// offline: device เงียบหายไปเกินเวลาที่กำหนด
	AlertRuleOffline AlertRuleType = "offline"
	// anomaly: ค่าเบี่ยงจาก baseline เกิน z-score ที่ตั้ง
	AlertRuleAnomaly AlertRuleType = "anomaly"
)

แต่ละ type ใช้ field ใน AlertCondition ต่างกัน:

Type ใช้ field อะไร ความหมาย
threshold metric, operator, value เช่น temperature gt 35
offline offline_after_seconds ไม่มี reading ภายใน N วินาที = ออฟไลน์
anomaly metric, zscore_threshold ค่าเบี่ยงจาก baseline เกิน z ที่ตั้ง

และ rule ทั้งก้อนมี severity (info/warning/critical), device_id (เว้นว่าง = ทุก device), cooldown_seconds กับ enabled:

type AlertRule struct {
	ID          bson.ObjectID  `bson:"_id,omitempty"  json:"id"`
	Name        string         `bson:"name"           json:"name"`
	Type        AlertRuleType  `bson:"type"           json:"type"`
	Condition   AlertCondition `bson:"condition"      json:"condition"`
	Severity    AlertSeverity  `bson:"severity"       json:"severity"`
	DeviceID    string         `bson:"device_id,omitempty" json:"device_id,omitempty"`
	CooldownSeconds int        `bson:"cooldown_seconds,omitempty" json:"cooldown_seconds,omitempty"`
	Enabled     bool           `bson:"enabled"        json:"enabled"`
	// ... CreatedAt / UpdatedAt
}

Step 1: Evaluator — หัวใจที่เป็น Pure Function

ทำไม ต้องแยก evaluator ให้เป็น pure function (ไม่มี I/O, ไม่แตะนาฬิกาเอง)? เพราะมันทำให้ unit-test ได้แบบ exhaustive — ป้อน input เข้าไป ได้ output ออกมา ไม่ต้องตั้ง InfluxDB/Mongo จริง

backend/internal/service/alert_eval.go แตก logic ตาม type:

// evaluateRule เลือก strategy ตาม type ของ rule — pure ล้วน ไม่มี I/O
func evaluateRule(in EvalInput) EvalResult {
	switch in.Rule.Type {
	case model.AlertRuleThreshold:
		return evalThreshold(in)
	case model.AlertRuleOffline:
		return evalOffline(in)
	case model.AlertRuleAnomaly:
		return evalAnomaly(in)
	default:
		return EvalResult{}
	}
}

Threshold — ค่าทะลุเส้น

// evalThreshold ยิงเมื่อ metric ที่เฝ้าผ่านเงื่อนไข operator/value
func evalThreshold(in EvalInput) EvalResult {
	metric := in.Rule.Condition.Metric
	v, ok := in.Fields[metric]
	if !ok {
		return EvalResult{} // reading นี้ไม่มี metric ที่เฝ้า → ไม่ยิง
	}
	if !compare(v, in.Rule.Condition.Operator, in.Rule.Condition.Value) {
		return EvalResult{}
	}
	return EvalResult{
		Fired:  true,
		Metric: metric,
		Value:  v,
		Message: fmt.Sprintf("%s on %s: %s = %.4g %s %.4g",
			in.Rule.Name, in.DeviceID, metric, v,
			operatorSymbol(in.Rule.Condition.Operator), in.Rule.Condition.Value),
	}
}

operator รองรับ gt, gte, lt, lte, eq, neq — ครอบทุกการเทียบที่ต้องใช้

Offline — device เงียบหาย

ทำไม? บางที device ไม่ได้พัง แค่ battery หมด หรือ WiFi ตัด ถ้าเราไม่รู้ ข้อมูลก็หายเงียบๆ

อุปมา: เหมือนพนักงานเช็คอินทุกเช้า ถ้าวันไหนหายไปเกินเวลา HR ต้องโทรถาม

// evalOffline ยิงเมื่อช่วงห่างจาก reading ก่อนหน้าเกิน window ที่ตั้ง
// PrevSeen เป็นศูนย์ (reading แรกสุด) จะไม่ยิง
func evalOffline(in EvalInput) EvalResult {
	window := time.Duration(in.Rule.Condition.OfflineAfterSeconds) * time.Second
	if window <= 0 || in.PrevSeen.IsZero() {
		return EvalResult{}
	}
	gap := in.Now.Sub(in.PrevSeen)
	if gap < window {
		return EvalResult{}
	}
	return EvalResult{
		Fired: true,
		Value: gap.Seconds(),
		Message: fmt.Sprintf("%s on %s: device was silent for %s (threshold %s)",
			in.Rule.Name, in.DeviceID, gap.Round(time.Second), window),
	}
}

Anomaly — z-score กับ baseline

ทำไม? threshold จับได้แค่ค่าเกิน limit ที่ตั้ง แต่ถ้าปกติอยู่ 25°C วันนี้พุ่งเป็น 32°C (ยังไม่ถึง 35) — มันผิดปกติเชิง “pattern” แต่ threshold มองไม่เห็น

อุปมา z-score: เหมือนคะแนนสอบ ถ้าค่าเฉลี่ยห้อง 50 แล้วคนนึงได้ 90 นั่นคือ “ผิดปกติมาก” เมื่อเทียบกับเพื่อน z-score วัดว่า “เบี่ยงจากปกติกี่เท่าของ standard deviation”

// evalAnomaly ยิงเมื่อ z-score ของ reading เทียบ baseline เกิน threshold
// baseline ที่มีน้อยกว่า 2 ตัวอย่าง หรือ sd = 0 จะไม่ยิง (คำนวณ z ไม่ได้)
func evalAnomaly(in EvalInput) EvalResult {
	metric := in.Rule.Condition.Metric
	v, ok := in.Fields[metric]
	if !ok {
		return EvalResult{}
	}
	if in.Baseline.Count < 2 || in.Baseline.StdDev <= 0 {
		return EvalResult{}
	}
	threshold := in.Rule.Condition.ZScoreThreshold
	if threshold <= 0 {
		return EvalResult{}
	}
	z := (v - in.Baseline.Mean) / in.Baseline.StdDev
	if math.Abs(z) < threshold {
		return EvalResult{}
	}
	return EvalResult{Fired: true, Metric: metric, Value: v, /* ...message... */ }
}

Step 2: Engine — ร้อยทุกอย่างเข้าด้วยกัน

AlertEngine (backend/internal/service/alert.go) คือตัวที่: โหลด rule, เรียก evaluator, เช็ค cooldown, บันทึก history, ยิง notify นี่คือ Evaluate ตัวเต็ม:

// Evaluate รันทุก rule ที่เกี่ยวกับ reading — ไม่เคย return error ออก ingestion path
func (e *AlertEngine) Evaluate(ctx context.Context, deviceID string, fields map[string]float64, now time.Time) {
	if e == nil {
		return
	}
	rules, err := e.rules.ListEnabledForDevice(ctx, deviceID)
	if err != nil {
		e.logWarn("alert: list rules failed", "device_id", deviceID, "error", err)
	}

	prevSeen := e.takeLastSeen(deviceID, now) // ใช้กับ offline rule

	for i := range rules {
		rule := rules[i]
		base := e.baselineFor(deviceID, rule) // snapshot baseline สำหรับ anomaly

		res := evaluateRule(EvalInput{
			Rule: rule, DeviceID: deviceID, Fields: fields,
			Now: now, PrevSeen: prevSeen, Baseline: base,
		})
		if !res.Fired {
			continue
		}
		if e.inCooldown(ctx, rule, deviceID, now) {
			continue
		}
		e.record(ctx, rule, deviceID, res, now)
	}

	// อัปเดต baseline "หลัง" ประเมิน เพื่อให้ reading ถูกวัดกับ distribution
	// "ก่อนหน้า" ตัวมันเอง ไม่ใช่ distribution ที่รวมตัวมันเข้าไปแล้ว
	e.updateBaselines(deviceID, rules, fields)
}

จุดเด็ดคือคอมเมนต์บรรทัดสุดท้าย — อัปเดต baseline หลังประเมินเสมอ ไม่งั้น reading จะถูกวัดกับค่าเฉลี่ยที่มีตัวมันเองปนอยู่ ทำให้ z-score เพี้ยน

Rolling Baseline เก็บใน memory

anomaly ใช้ rollingStats ที่เก็บค่าล่าสุดแบบ ring buffer ขนาดคงที่ (anomalyWindow = 100) แล้วคำนวณ mean/stddev:

// anomalyWindow คือจำนวน sample ล่าสุดที่ baseline เก็บไว้
const anomalyWindow = 100

ข้อดีของการเก็บ baseline ใน memory: เร็วและไม่ต้องยิง query หนักๆ ไป InfluxDB ทุก reading — ส่วนถ้า restart แล้ว baseline หาย ก็แค่เริ่มสะสมใหม่ ไม่กระทบ “ความถูกต้อง” ของ threshold/offline เลย (มันเป็นแค่ตัวเร่ง)


Step 3: กัน Spam ด้วย Cooldown + History แบบ TTL

ถ้าไม่กันอะไรเลย พออุณหภูมิสูงค้างไว้ ทุก reading ก็จะยิง alert รัวๆ จน Slack แตก (╯°□°)╯ โปรเจกต์เลยมี cooldown ต่อ rule+device:

// inCooldown บอกว่า rule นี้เพิ่งยิงให้ device นี้ภายใน cooldown หรือยัง
// ถ้า query history พลาด ให้ถือว่า "ไม่ติด cooldown" จะได้ไม่กลืน alert เงียบๆ
func (e *AlertEngine) inCooldown(ctx context.Context, rule model.AlertRule, deviceID string, now time.Time) bool {
	if rule.CooldownSeconds <= 0 {
		return false
	}
	last, err := e.history.LastTriggeredAt(ctx, rule.ID, deviceID)
	if err != nil {
		e.logWarn("alert: cooldown lookup failed", "rule_id", rule.ID.Hex(), "error", err)
		return false
	}
	if last.IsZero() {
		return false
	}
	return now.Sub(last) < time.Duration(rule.CooldownSeconds)*time.Second
}

ทุกครั้งที่ยิง จะบันทึก AlertHistory ลง Mongo พร้อม ExpiresAt — collection มี TTL index ที่ Mongo ลบ document เก่าทิ้งให้เอง (เก็บไว้ historyTTL = 90 * 24h) ไม่ต้องเขียน cron ลบเอง:

// historyTTL คือระยะเวลาที่ alert ถูกเก็บก่อน TTL index จะลบทิ้ง
const historyTTL = 90 * 24 * time.Hour

Step 4: Notifier — Webhook แบบ Slack-compatible (+ SSRF Guard)

ทำไมต้องมี SSRF guard?

webhook URL มาจาก config ของ operator ก็จริง แต่ถ้าตั้งพลาดให้ชี้ไปที่ 169.254.169.254 (cloud metadata) หรือ host ภายใน อาจกลายเป็นช่องโหว่ SSRF ได้ engine เลย validate URL ตั้งแต่ตอนสร้าง — ถ้า resolve ไปลง private/loopback/link-local จะ ปฏิเสธไม่ยอมสตาร์ท (กัน config อันตรายตั้งแต่ต้น)

backend/internal/notify/webhook.go:

// isDisallowedIP บอกว่า ip อยู่ในช่วงที่ webhook ห้ามยิงไปหา
func isDisallowedIP(ip net.IP) bool {
	if ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() ||
		ip.IsLinkLocalMulticast() || ip.IsUnspecified() {
		return true
	}
	// บล็อก cloud metadata endpoint ชัดๆ (link-local คลุมอยู่แล้ว แต่ย้ำให้เห็นเจตนา)
	if ip.Equal(net.ParseIP("169.254.169.254")) {
		return true
	}
	return false
}

Payload ที่ส่ง — รูปแบบ Slack

notifier ส่ง JSON ที่ Slack incoming webhook กิน (text + attachments) — เด่นคือมีแต่ field ที่ไม่ใช่ข้อมูลลับ ไม่มี token/secret หลุดไปในนั้น:

func (w *WebhookNotifier) Notify(ctx context.Context, e Event) error {
	payload := slackPayload{
		Text: fmt.Sprintf("[%s] %s", strings.ToUpper(string(e.Severity)), e.Message),
		Attachments: []slackAttachment{{
			Color: severityColor(e.Severity), // critical=แดง, warning=ส้ม, info=น้ำเงิน
			Title: e.RuleName,
			Fields: []slackField{
				{Title: "Device", Value: e.DeviceID, Short: true},
				{Title: "Type",   Value: string(e.Type), Short: true},
				{Title: "Metric", Value: e.Metric, Short: true},
				{Title: "Value",  Value: fmt.Sprintf("%.4g", e.Value), Short: true},
			},
			TS: e.TriggeredAt.Unix(),
		}},
	}
	// ... marshal + POST ด้วย client ที่ "ไม่ตาม redirect" (กัน 30x เด้งไป host อื่น)
}

เกร็ด design: notifier อยู่หลัง interface Notifier และมี Multi ที่ fan-out หลาย channel ได้ — อยากเพิ่ม email/SMS ทีหลังก็ทำได้โดยไม่ต้องแตะ engine

เปิดใช้ด้วย env (ค่าเริ่มต้นว่าง = ไม่มี notifier, engine ยังทำงานแค่บันทึก history):

APP_ALERT_ENABLED=true
APP_ALERT_WEBHOOK_URL=https://hooks.slack.com/services/XXX/YYY/ZZZ
APP_ALERT_WEBHOOK_TIMEOUT=5s

Step 5: จัดการ Rule ผ่าน REST API

rule ทั้งหมดจัดการผ่าน CRUD endpoint ที่ mount ใต้ /api/v1 (backend/internal/handler/alert.go):

POST   /api/v1/alert-rules        # สร้าง rule
GET    /api/v1/alert-rules        # list (filter: enabled, type, device_id, limit, offset)
GET    /api/v1/alert-rules/:id    # ดูตัวเดียว
PUT    /api/v1/alert-rules/:id    # แก้ไข (partial update)
DELETE /api/v1/alert-rules/:id    # ลบ

ตัวอย่างสร้าง threshold rule: เตือนเมื่ออุณหภูมิ > 35°C ระดับ critical และกัน spam 5 นาที:

curl -s -X POST http://localhost:8080/api/v1/alert-rules \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Server room too hot",
    "type": "threshold",
    "severity": "critical",
    "device_id": "sensor-01",
    "cooldown_seconds": 300,
    "condition": {
      "metric": "temperature",
      "operator": "gt",
      "value": 35
    }
  }' | jq

สร้าง offline rule (เงียบเกิน 5 นาที = เตือน) และ anomaly rule (z-score > 3):

# offline: ไม่มี reading ใน 300 วินาที
curl -s -X POST http://localhost:8080/api/v1/alert-rules \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Device went silent",
    "type": "offline",
    "severity": "warning",
    "condition": { "offline_after_seconds": 300 }
  }' | jq

# anomaly: temperature เบี่ยงจาก baseline เกิน 3 sd
curl -s -X POST http://localhost:8080/api/v1/alert-rules \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Temperature anomaly",
    "type": "anomaly",
    "severity": "warning",
    "condition": { "metric": "temperature", "zscore_threshold": 3 }
  }' | jq

WHY define ก่อน enable? payload มี field enabled ถ้าไม่ส่งจะ default เป็น true — แต่ส่ง "enabled": false ก็สร้างไว้ “ปิด” ก่อนแล้วค่อยเปิดทีหลังได้ เหมือน install app ไว้แล้วค่อย launch


Step 6: ทดสอบ Alert System

มาลุยกัน! สร้าง rule แล้วยิง reading ที่เกิน threshold:

# 1) มี threshold rule (temperature > 35) อยู่แล้วจาก Step 5

# 2) ยิง reading อุณหภูมิสูง 38.5°C ผ่าน MQTT
docker exec showkhun-mosquitto mosquitto_pub \
  -h localhost \
  -t "devices/sensor-01/telemetry" \
  -m '{ "fields": { "temperature": 38.5 }, "tags": { "location": "room-a" } }'

# 3) เช็ค Slack channel — ควรเด้ง alert สีแดง [CRITICAL] ขึ้นมา
#    หรือถ้าไม่ได้ตั้ง webhook ก็ดู alert history ผ่าน log / Mongo
docker exec showkhun-mongodb mongosh iot_workshop --quiet \
  --eval 'db.alert_history.find().sort({triggered_at:-1}).limit(5).pretty()'

ทดสอบ offline ก็ง่าย — หยุดส่ง telemetry ของ device นั้นไปเกิน offline_after_seconds แล้วส่ง reading ใหม่ทีหลัง engine จะเห็นว่า gap เกิน window และยิง alert offline ออกมา


สรุป: เราทำอะไรไปบ้างใน workshop นี้

น้องๆ ผ่านมาได้ไกลมากเลยครับ! มาดูว่า Alerting ครบแค่ไหน:

ส่วนประกอบ รายละเอียด
Rule types threshold, offline, anomaly (z-score)
Evaluator pure function แยกต่อ type — unit-test ได้ exhaustive
Trigger point ประเมิน ตอน ingest (REST + MQTT ใช้ logic เดียวกัน)
Baseline rolling stats ใน memory (window 100) สำหรับ anomaly
Cooldown กัน spam ต่อ rule+device
History บันทึกบน Mongo + TTL index ลบเก่าทิ้งเอง (90 วัน)
Notifier webhook แบบ Slack-compatible + SSRF guard + no-redirect
REST CRUD /api/v1/alert-rules (create/list/get/update/delete)

ตอนนี้ระบบมี “ยามอัตโนมัติ” ที่เขียนเอง 100% แล้ว — ไม่ต้องพึ่ง Kapacitor/TICKscript, ประเมินไวตั้งแต่ตอน ingest, test ได้ทุกซอก และจัดการ rule ได้เหมือน resource อื่นในระบบ

IoT Backend Workshop เสร็จสมบูรณ์แล้วครับ!

เราได้ผ่าน data layer + alerting ครบทั้งสามตอน:

  1. [Workshop #10] InfluxDB 2.7 + Telegraf Setup: org/bucket/token, retention, downsampled
  2. [Workshop #11] Telegraf json_v2 Pipeline + Flux Query แบบ bounded/ปลอดภัย
  3. [Workshop #12] Go Alerting Engine: threshold/offline/anomaly, cooldown, webhook

Next Step

Backend จบแล้วครับ! ขั้นต่อไปเราจะไปทำ LynxJS Mobile App เพื่อดู alert และ dashboard บน smartphone — เพราะ real-time monitoring บนมือถือมันคูลกว่าแน่นอน (^_^)v