Kapacitor Alerting: ให้ระบบ IoT แจ้งเตือนเองเลย
Kapacitor Alerting: ให้ระบบ IoT แจ้งเตือนเองเลย
Branch:
workshop/dev-09-alertingPhase: Development (9/9) Repo: kangana1024/iot-workshop
เคยเจอไหมครับ? ห้อง server ร้อนไป 40 องศา แต่รู้ตอนที่ hardware พังไปแล้ว อยากให้ระบบมันส่ง LINE มาบอกก่อนได้ไหม? วันนี้เราจะมาทำแบบนั้นกันครับ — ให้ Kapacitor เป็นยามเฝ้าระวังให้น้องๆ ตลอด 24 ชั่วโมง (ʘ‿ʘ)
ก่อนเริ่ม: น้องๆ จะได้อะไรจาก workshop นี้?
หลังจากผ่าน workshop นี้ไป น้องๆ จะ:
- เขียน TICKscript ได้ตั้งแต่บรรทัดแรก
- ตั้ง alert สำหรับ อุณหภูมิ/ความชื้น แบบ threshold
- จับได้ว่า device ออฟไลน์ หายไปไหน
- ตรวจจับค่าผิดปกติด้วย Z-Score (Anomaly Detection)
- ทำ Dead Man’s Switch — ถ้าระบบเงียบผิดปกติ ให้ตีระฆังทันที
- ส่ง alert ไปได้ทั้ง Slack, Email, และ Webhook ไปยัง Go API
ทำไมต้อง Kapacitor? (WHY ก่อน HOW เสมอ)
ลองนึกภาพว่าน้องๆ เป็น “ยาม” โรงงาน ถ้ามีเซ็นเซอร์อุณหภูมิ 500 ตัว แล้วน้องต้องนั่งดูหน้าจอตลอดเวลา — มันไม่ไหวใช่ไหมครับ?
Kapacitor ก็คือ “ยามอัตโนมัติ” ครับ มันนั่งดูข้อมูลแทนเรา พอเจอสิ่งผิดปกติก็ตาม rule ที่เราเขียนไว้ แล้วส่ง alert ออกมาเอง
อุปมาง่ายๆ: Kapacitor เหมือน Google Form with Notification แต่สำหรับ sensor data — เราตั้ง condition ไว้ พอมีคนกรอกแบบแผนออกนอกขอบเขต ก็ได้รับ email ทันที
ภาพรวม: Kapacitor อยู่ตรงไหนในระบบ?
Kapacitor รับ stream จาก InfluxDB แบบ real-time แล้วรัน TICKscript ตลอดเวลา — เหมือน while(true) loop ที่ฉลาดมากครับ
TICKscript Basics: ภาษาของ Kapacitor
ก่อนลงรายละเอียด เราต้องรู้จัก syntax ก่อนนะครับ TICKscript ใช้ pipe operator (|) เพื่อเชื่อม nodes เข้าหากัน เหมือน Unix pipe เลยครับ:
stream
|from()...
|window()...
|mean('value')
|alert()
.crit(lambda: "mean" > 35.0)
.slack()
Node Types หลักที่ใช้บ่อย:
| Node | หน้าที่ |
|---|---|
stream | รับข้อมูล real-time จาก InfluxDB subscription |
batch | รับข้อมูลเป็น batch ตาม schedule |
from() | เลือก measurement และ tags |
where() | filter ข้อมูล |
window() | กำหนด time window |
mean(), max(), min() | aggregate functions |
alert() | กำหนด alert conditions |
eval() | คำนวณค่าใหม่ |
อุปมา:
stream |from() |where() |alert()เหมือน SQLSELECT FROM WHEREแต่ทำงานกับ real-time data แทน static table ครับ
Step 1: เตรียม Directory Structure
มาสร้าง folder สำหรับเก็บ TICKscripts กันก่อนเลยครับ:
mkdir -p deployments/kapacitor/scripts/{temperature,humidity,device,system}
Structure จะเป็นแบบนี้:
deployments/kapacitor/scripts/
├── temperature/
│ ├── temp_high_alert.tick
│ └── temp_rate_of_change.tick
├── humidity/
│ └── humidity_alert.tick
├── device/
│ ├── device_offline.tick
│ └── dead_mans_switch.tick
└── system/
└── anomaly_detection.tick
Step 2: Temperature Threshold Alert
ทำไม? ห้อง server ถ้าอุณหภูมิเกิน 35°C อาจทำให้ hardware พังได้ครับ เราต้องรู้ก่อน ไม่ใช่รู้ตอนพังแล้ว
สร้างไฟล์ deployments/kapacitor/scripts/temperature/temp_high_alert.tick:
// ─────────────────────────────────────────────────
// TICKscript: Temperature High Alert
// Triggers: WARNING > 30°C, CRITICAL > 35°C
// ─────────────────────────────────────────────────
var db = 'iot_data'
var rp = 'raw_30d'
var measurement = 'sensor_data'
// ─── Thresholds ────────────────────────────────────
var warn_temp = 30.0
var crit_temp = 35.0
// รับ stream จาก InfluxDB
var data = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement(measurement)
.groupBy('device_id', 'location', 'building')
// กรองเฉพาะ temperature readings
|where(lambda: "measurement_type" == 'temperature')
// คำนวณค่าเฉลี่ยใน 1 นาที (ลด noise)
|window()
.period(1m)
.every(30s)
|mean('value')
.as('mean_temp')
// ─── Alert Node ──────────────────────────────────
data
|alert()
.id('temperature/{{ .Tags.building }}/{{ .Tags.location }}/{{ .Tags.device_id }}')
.message(
'[{{ .Level }}] อุณหภูมิผิดปกติ: {{ .Tags.device_id }}\n' +
'ตำแหน่ง: {{ .Tags.building }} - {{ .Tags.location }}\n' +
'ค่าเฉลี่ย 1 นาที: {{ index .Fields "mean_temp" | printf "%.1f" }}°C\n' +
'เวลา: {{ .Time.Format "2006-01-02 15:04:05 UTC" }}'
)
// Threshold levels
.warn(lambda: "mean_temp" > warn_temp)
.crit(lambda: "mean_temp" > crit_temp)
// แจ้งเตือนเมื่อ state เปลี่ยนเท่านั้น (ไม่ spam)
.stateChangesOnly(5m)
// บันทึก alert history ลง InfluxDB
.details(
'<b>Temperature Alert</b><br>' +
'Device: {{ .Tags.device_id }}<br>' +
'Mean Temp: {{ index .Fields "mean_temp" | printf "%.1f" }}°C<br>' +
'Threshold: WARN={{ .warn_temp }}°C, CRIT={{ .crit_temp }}°C'
)
// ─── Handlers ──────────────────────────────
// 1. บันทึกลง InfluxDB
.influxDBOut()
.create()
.database('iot_data')
.retentionPolicy('hourly_1y')
.measurement('kapacitor_alerts')
.tag('alert_type', 'temperature')
// 2. Slack
.slack()
.channel('#iot-alerts')
.username('Kapacitor Temperature')
.iconEmoji(':thermometer:')
// 3. Webhook ไปยัง Go API
.httpPost('go-api')
สังเกต .stateChangesOnly(5m) นะครับ — นี่คือ setting สำคัญมาก ถ้าไม่ใส่ Kapacitor จะส่ง alert ซ้ำทุก 30 วินาทีในขณะที่อุณหภูมิยังสูงอยู่ กลายเป็น spam น้องๆ ใน Slack ไปเลย (╯°□°)╯
Webhook Payload ที่ส่งไปยัง Go API
Kapacitor จะส่ง POST request พร้อม JSON body แบบนี้:
{
"id": "temperature/A/floor_1/device_001",
"message": "[CRITICAL] อุณหภูมิผิดปกติ: device_001...",
"time": "2026-03-26T10:30:00Z",
"level": "CRITICAL",
"previousLevel": "WARNING",
"data": {
"series": [{
"name": "sensor_data",
"tags": {
"device_id": "device_001",
"location": "floor_1",
"building": "A"
},
"columns": ["time", "mean_temp"],
"values": [["2026-03-26T10:30:00Z", 36.2]]
}]
}
}
Step 3: Humidity Alert
ทำไม? ความชื้นที่สูงเกินไปทำให้วงจรไฟฟ้าสั้น ความชื้นต่ำเกินไปทำให้เกิด static electricity — ทั้งสองแบบทำลาย hardware ได้ครับ
สร้างไฟล์ deployments/kapacitor/scripts/humidity/humidity_alert.tick:
// ─────────────────────────────────────────────────
// TICKscript: Humidity Alert
// Triggers:
// - WARNING: humidity > 70% หรือ < 30%
// - CRITICAL: humidity > 85% หรือ < 20%
// ─────────────────────────────────────────────────
var db = 'iot_data'
var rp = 'raw_30d'
// Thresholds
var warn_high = 70.0
var crit_high = 85.0
var warn_low = 30.0
var crit_low = 20.0
var data = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement('sensor_data')
.groupBy('device_id', 'location', 'building')
|where(lambda: "measurement_type" == 'humidity')
|window()
.period(2m)
.every(1m)
|mean('value')
.as('mean_humidity')
data
|alert()
.id('humidity/{{ .Tags.building }}/{{ .Tags.location }}/{{ .Tags.device_id }}')
.message(
'[{{ .Level }}] ความชื้นผิดปกติ: {{ .Tags.device_id }}\n' +
'ตำแหน่ง: {{ .Tags.building }} - {{ .Tags.location }}\n' +
'ค่าความชื้น: {{ index .Fields "mean_humidity" | printf "%.1f" }}%\n' +
'เวลา: {{ .Time.Format "2006-01-02 15:04:05" }}'
)
.warn(lambda: "mean_humidity" > warn_high OR "mean_humidity" < warn_low)
.crit(lambda: "mean_humidity" > crit_high OR "mean_humidity" < crit_low)
.stateChangesOnly(10m)
.influxDBOut()
.create()
.database('iot_data')
.retentionPolicy('hourly_1y')
.measurement('kapacitor_alerts')
.tag('alert_type', 'humidity')
.slack()
.channel('#iot-alerts')
.iconEmoji(':droplet:')
.httpPost('go-api')
Step 4: Device Offline Detection
ทำไม? บางที device ไม่ได้พัง แค่ battery หมด หรือ WiFi ตัด แต่ถ้าเราไม่รู้ ข้อมูลก็หาย และเราจะไม่รู้ว่าห้องนั้นเป็นยังไง
อุปมา: เหมือนพนักงานเช็คอินตอนเช้าทุกวัน ถ้าวันไหนไม่เช็คอินใน 15 นาที HR จะโทรถาม — device ก็เหมือนกันครับ ถ้าไม่ส่งข้อมูลมาตามรอบ เราต้องรู้
สร้างไฟล์ deployments/kapacitor/scripts/device/device_offline.tick:
// ─────────────────────────────────────────────────
// TICKscript: Device Offline Detection
// ตรวจสอบว่า device ส่งข้อมูลครั้งสุดท้ายเมื่อไหร่
// ถ้าไม่ส่งข้อมูลภายใน 5 นาที = WARNING
// ถ้าไม่ส่งข้อมูลภายใน 15 นาที = CRITICAL
// ─────────────────────────────────────────────────
var db = 'iot_data'
var rp = 'raw_30d'
// ใช้ batch query ตรวจสอบทุก 1 นาที
batch
|query(
'SELECT last("battery_level") AS last_battery, ' +
'last("signal_strength") AS last_signal ' +
'FROM "' + db + '"."' + rp + '"."device_status" ' +
'WHERE time > now() - 20m '
)
.period(1m)
.every(1m)
.groupBy('device_id', 'location', 'building', 1m)
|deadman(0.0, 5m) // ถ้าไม่มีข้อมูลใน 5 นาที = alert
.id('device_offline/{{ .Tags.device_id }}')
.message(
'[{{ .Level }}] Device ออฟไลน์: {{ .Tags.device_id }}\n' +
'ตำแหน่ง: {{ .Tags.building }} - {{ .Tags.location }}\n' +
'ไม่พบข้อมูลใน 5 นาทีที่ผ่านมา\n' +
'เวลา: {{ .Time.Format "2006-01-02 15:04:05" }}'
)
.stateChangesOnly()
.influxDBOut()
.create()
.database('iot_data')
.retentionPolicy('hourly_1y')
.measurement('kapacitor_alerts')
.tag('alert_type', 'device_offline')
.slack()
.channel('#iot-alerts')
.iconEmoji(':red_circle:')
.httpPost('go-api')
Step 5: Dead Man’s Switch — ระดับ System-wide
ทำไม? Device Offline ตรวจแค่ “device ตัวนึงหาย” แต่ Dead Man’s Switch ตรวจว่า “ข้อมูลทั้งระบบหายหมดเลย” — อาจแปลว่า Telegraf crash หรือ MQTT Broker ตาย ซึ่งแย่กว่ามากครับ
อุปมา: Dead Man’s Switch เหมือน “heartbeat” ของระบบ ถ้าหัวใจหยุดเต้น 3 นาที ต้องตีระฆังทันที ไม่ต้องรอให้ตายก่อน
___ ___ ___ ___
/ \ / \ / \ /
| | | | <-- ปกติ: pulse ส่งมาเรื่อยๆ
\___/ \___/ \___/
___ ___
/ \ / \
| | | <-- ขาดหาย...
\___/ \___/
ALERT! Dead Man Triggered! (x_x)
สร้างไฟล์ deployments/kapacitor/scripts/device/dead_mans_switch.tick:
// ─────────────────────────────────────────────────
// TICKscript: Dead Man's Switch
// ตรวจสอบว่า Telegraf ยังส่งข้อมูลอยู่หรือไม่
// ถ้าไม่มีข้อมูลเข้ามาเลยใน 3 นาที = CRITICAL
// (นี่คือปัญหาระบบ ไม่ใช่แค่ device เดียว)
// ─────────────────────────────────────────────────
var data = stream
|from()
.database('iot_data')
.retentionPolicy('raw_30d')
.measurement('sensor_data')
// Dead man's switch: ถ้าไม่มีข้อมูลใน 3 นาที
data
|deadman(0.0, 3m)
.id('system/data_pipeline/dead_man')
.message(
'[CRITICAL] Data Pipeline หยุดทำงาน!\n' +
'ไม่มีข้อมูล sensor ใหม่ใน 3 นาทีที่ผ่านมา\n' +
'กรุณาตรวจสอบ Telegraf และ MQTT Broker\n' +
'เวลา: {{ .Time.Format "2006-01-02 15:04:05" }}'
)
.slack()
.channel('#iot-critical')
.iconEmoji(':skull:')
.username('Kapacitor CRITICAL')
// ส่ง email สำหรับ system-level alert
.email('[email protected]')
.from('[email protected]')
.subject('[CRITICAL] IoT Data Pipeline Down')
.httpPost('go-api')
Step 6: Anomaly Detection ด้วย Z-Score
ทำไม? Threshold alert จับได้แค่ค่าเกิน limit ที่ตั้งไว้ แต่ถ้าค่าปกติอยู่ที่ 25°C แต่วันนี้ขึ้นเป็น 32°C ซึ่งยังไม่เกิน 35°C — มันผิดปกติในแง่ “pattern” แต่ threshold alert จะไม่จับได้ครับ
อุปมา Z-Score: ลองนึกถึงคะแนนสอบ ถ้าค่าเฉลี่ยทั้งห้อง 50 คะแนน แล้วคนนึงได้ 90 — นั่นคือ “ผิดปกติมาก” แม้ 90 จะไม่ใช่ค่าที่เป็นปัญหาในตัวมันเอง Z-Score คือการวัดว่า “ผิดปกติแค่ไหนเมื่อเทียบกับประวัติที่ผ่านมา”
สร้างไฟล์ deployments/kapacitor/scripts/system/anomaly_detection.tick:
// ─────────────────────────────────────────────────
// TICKscript: Anomaly Detection (Z-Score)
// ตรวจสอบค่า sensor ที่ผิดปกติด้วยวิธีทางสถิติ
// Z-Score > 3 = ค่าผิดปกติมากกว่า 3 standard deviations
// ─────────────────────────────────────────────────
var db = 'iot_data'
var rp = 'raw_30d'
// คำนวณ rolling mean และ stddev ใน 30 นาที
var window_mean = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement('sensor_data')
.groupBy('device_id', 'measurement_type')
|window()
.period(30m)
.every(1m)
|mean('value')
.as('rolling_mean')
var window_std = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement('sensor_data')
.groupBy('device_id', 'measurement_type')
|window()
.period(30m)
.every(1m)
|stddev('value')
.as('rolling_stddev')
// Current value
var current = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement('sensor_data')
.groupBy('device_id', 'measurement_type', 'location', 'building')
|window()
.period(1m)
.every(1m)
|last('value')
.as('current_value')
// Join ข้อมูลเพื่อคำนวณ Z-Score
var joined = current
|join(window_mean, window_std)
.as('c', 'mean', 'std')
.tolerance(1m)
.fill('none')
// คำนวณ Z-Score: (current - mean) / stddev
|eval(
lambda: (float("c.current_value") - float("mean.rolling_mean")) /
IF(float("std.rolling_stddev") > 0.0, float("std.rolling_stddev"), 1.0)
)
.as('z_score')
.keep('c.current_value', 'mean.rolling_mean', 'std.rolling_stddev')
joined
|alert()
.id('anomaly/{{ .Tags.device_id }}/{{ .Tags.measurement_type }}')
.message(
'[{{ .Level }}] ค่า Sensor ผิดปกติ (Anomaly): {{ .Tags.device_id }}\n' +
'ประเภท: {{ .Tags.measurement_type }}\n' +
'ค่าปัจจุบัน: {{ index .Fields "c.current_value" | printf "%.2f" }}\n' +
'ค่าเฉลี่ย: {{ index .Fields "mean.rolling_mean" | printf "%.2f" }}\n' +
'Z-Score: {{ index .Fields "z_score" | printf "%.2f" }}\n' +
'ตำแหน่ง: {{ .Tags.location }}'
)
.warn(lambda: abs("z_score") > 2.5)
.crit(lambda: abs("z_score") > 3.5)
.stateChangesOnly(15m)
.influxDBOut()
.create()
.database('iot_data')
.retentionPolicy('hourly_1y')
.measurement('kapacitor_alerts')
.tag('alert_type', 'anomaly')
.slack()
.channel('#iot-alerts')
.iconEmoji(':chart_with_upwards_trend:')
.httpPost('go-api')
Step 7: Temperature Rate of Change Alert
ทำไม? อุณหภูมิที่เปลี่ยนเร็วผิดปกติอาจบอกว่ามีไฟไหม้ หรือ cooling system เสีย แม้ค่าปัจจุบันยังไม่เกิน threshold ครับ
สร้างไฟล์ deployments/kapacitor/scripts/temperature/temp_rate_of_change.tick:
// ─────────────────────────────────────────────────
// TICKscript: Temperature Rate of Change
// ตรวจสอบการเปลี่ยนแปลงอุณหภูมิที่รวดเร็วผิดปกติ
// WARNING: เปลี่ยนแปลง > 5°C ใน 5 นาที
// CRITICAL: เปลี่ยนแปลง > 10°C ใน 5 นาที
// ─────────────────────────────────────────────────
var db = 'iot_data'
var rp = 'raw_30d'
var data = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement('sensor_data')
.groupBy('device_id', 'location', 'building')
|where(lambda: "measurement_type" == 'temperature')
|window()
.period(5m)
.every(1m)
// คำนวณ first และ last ใน window เพื่อหา rate of change
var first_val = data
|first('value')
.as('first_temp')
var last_val = data
|last('value')
.as('last_temp')
first_val
|join(last_val)
.as('first', 'last')
.tolerance(30s)
|eval(lambda: abs(float("last.last_temp") - float("first.first_temp")))
.as('temp_change')
|alert()
.id('temp_rate/{{ .Tags.device_id }}')
.message(
'[{{ .Level }}] อุณหภูมิเปลี่ยนแปลงรวดเร็ว: {{ .Tags.device_id }}\n' +
'เปลี่ยนแปลง: {{ index .Fields "temp_change" | printf "%.1f" }}°C ใน 5 นาที\n' +
'ตำแหน่ง: {{ .Tags.building }} - {{ .Tags.location }}'
)
.warn(lambda: "temp_change" > 5.0)
.crit(lambda: "temp_change" > 10.0)
.stateChangesOnly(5m)
.influxDBOut()
.create()
.database('iot_data')
.retentionPolicy('hourly_1y')
.measurement('kapacitor_alerts')
.tag('alert_type', 'temp_rate_change')
.slack()
.channel('#iot-alerts')
.iconEmoji(':fire:')
.httpPost('go-api')
Step 8: Go API Webhook Handler
ฝั่ง Go backend ก็ต้องรับ payload ที่ Kapacitor ส่งมาด้วยนะครับ เพิ่ม handler ใน backend/internal/handler/alert_handler.go:
package handler
import (
"encoding/json"
"net/http"
"time"
"log"
)
// KapacitorAlertPayload คือ structure ของ payload จาก Kapacitor
type KapacitorAlertPayload struct {
ID string `json:"id"`
Message string `json:"message"`
Time time.Time `json:"time"`
Level string `json:"level"` // OK, INFO, WARNING, CRITICAL
PreviousLevel string `json:"previousLevel"`
Data map[string]interface{} `json:"data"`
Details string `json:"details"`
}
// HandleKapacitorWebhook รับ alert จาก Kapacitor
func HandleKapacitorWebhook(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var payload KapacitorAlertPayload
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
log.Printf("Error decoding Kapacitor payload: %v", err)
http.Error(w, "Bad request", http.StatusBadRequest)
return
}
log.Printf("[Alert] ID=%s Level=%s -> %s Message=%s",
payload.ID, payload.PreviousLevel, payload.Level, payload.Message)
// บันทึกลง database
// alertService.SaveAlert(r.Context(), &payload)
// ส่งไปยัง WebSocket clients ที่ connected อยู่
// wsHub.Broadcast(payload)
// ส่ง push notification ไปยัง mobile app
// notificationService.Send(r.Context(), &payload)
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{"status": "received"})
}
Step 9: Deploy TICKscripts ด้วย Kapacitor CLI
ทำไม define ก่อน enable? เพราะ Kapacitor แยก “ลงทะเบียน task” กับ “เปิดใช้งาน” ออกจากกัน เหมือนการ install app แล้ว launch ทีหลังนั่นแหละครับ
# ─────────────────────────────────────────────────
# ติดตั้ง TICKscript tasks
# ─────────────────────────────────────────────────
KAPACITOR_URL="http://localhost:9092"
# 1. Temperature Alert
kapacitor define temperature_high_alert \
-type stream \
-tick deployments/kapacitor/scripts/temperature/temp_high_alert.tick \
-dbrp iot_data.raw_30d
# 2. Temperature Rate of Change
kapacitor define temp_rate_of_change \
-type stream \
-tick deployments/kapacitor/scripts/temperature/temp_rate_of_change.tick \
-dbrp iot_data.raw_30d
# 3. Humidity Alert
kapacitor define humidity_alert \
-type stream \
-tick deployments/kapacitor/scripts/humidity/humidity_alert.tick \
-dbrp iot_data.raw_30d
# 4. Device Offline
kapacitor define device_offline_detection \
-type batch \
-tick deployments/kapacitor/scripts/device/device_offline.tick \
-dbrp iot_data.raw_30d
# 5. Dead Man's Switch
kapacitor define dead_mans_switch \
-type stream \
-tick deployments/kapacitor/scripts/device/dead_mans_switch.tick \
-dbrp iot_data.raw_30d
# 6. Anomaly Detection
kapacitor define anomaly_detection \
-type stream \
-tick deployments/kapacitor/scripts/system/anomaly_detection.tick \
-dbrp iot_data.raw_30d
# ─────────────────────────────────────────────────
# Enable ทุก tasks
# ─────────────────────────────────────────────────
kapacitor enable temperature_high_alert
kapacitor enable temp_rate_of_change
kapacitor enable humidity_alert
kapacitor enable device_offline_detection
kapacitor enable dead_mans_switch
kapacitor enable anomaly_detection
# ─────────────────────────────────────────────────
# ตรวจสอบสถานะ
# ─────────────────────────────────────────────────
kapacitor list tasks
# ดู task details
kapacitor show temperature_high_alert
Step 10: Alert History Query
ข้อมูล alert ที่ Kapacitor บันทึกลง InfluxDB ดูได้แบบนี้ครับ:
-- ดู alert history ใน InfluxDB
-- ─────────────────────────────────────────────────
-- Alert ทั้งหมดใน 24 ชั่วโมงล่าสุด
SELECT * FROM "iot_data"."hourly_1y"."kapacitor_alerts"
WHERE time > now() - 24h
ORDER BY time DESC
LIMIT 50;
-- จำนวน alert แต่ละประเภท
SELECT COUNT(*) AS alert_count
FROM "iot_data"."hourly_1y"."kapacitor_alerts"
WHERE time > now() - 7d
GROUP BY "alert_type", time(1d)
FILL(0);
-- Critical alerts เฉพาะ
SELECT * FROM "iot_data"."hourly_1y"."kapacitor_alerts"
WHERE "level" = 'CRITICAL'
AND time > now() - 24h
ORDER BY time DESC;
-- Alert สรุปรายอุปกรณ์
SELECT COUNT(*) AS total_alerts
FROM "iot_data"."hourly_1y"."kapacitor_alerts"
WHERE time > now() - 7d
GROUP BY "alert_type", "device_id"
ORDER BY total_alerts DESC;
Step 11: Makefile Commands สำหรับ Kapacitor
เพิ่มใน Makefile ให้ deploy ง่ายๆ ด้วยคำสั่งเดียวครับ:
# ─────────────────────────────────────────────────
# Kapacitor Tasks Management
# ─────────────────────────────────────────────────
KAPACITOR_URL ?= http://localhost:9092
TICK_DIR = deployments/kapacitor/scripts
kapacitor-deploy:
@echo "Deploying TICKscript tasks..."
kapacitor -url $(KAPACITOR_URL) define temperature_high_alert \
-type stream \
-tick $(TICK_DIR)/temperature/temp_high_alert.tick \
-dbrp iot_data.raw_30d
kapacitor -url $(KAPACITOR_URL) define temp_rate_of_change \
-type stream \
-tick $(TICK_DIR)/temperature/temp_rate_of_change.tick \
-dbrp iot_data.raw_30d
kapacitor -url $(KAPACITOR_URL) define humidity_alert \
-type stream \
-tick $(TICK_DIR)/humidity/humidity_alert.tick \
-dbrp iot_data.raw_30d
kapacitor -url $(KAPACITOR_URL) define device_offline_detection \
-type batch \
-tick $(TICK_DIR)/device/device_offline.tick \
-dbrp iot_data.raw_30d
kapacitor -url $(KAPACITOR_URL) define dead_mans_switch \
-type stream \
-tick $(TICK_DIR)/device/dead_mans_switch.tick \
-dbrp iot_data.raw_30d
kapacitor -url $(KAPACITOR_URL) define anomaly_detection \
-type stream \
-tick $(TICK_DIR)/system/anomaly_detection.tick \
-dbrp iot_data.raw_30d
@echo "Enabling all tasks..."
kapacitor -url $(KAPACITOR_URL) enable temperature_high_alert
kapacitor -url $(KAPACITOR_URL) enable temp_rate_of_change
kapacitor -url $(KAPACITOR_URL) enable humidity_alert
kapacitor -url $(KAPACITOR_URL) enable device_offline_detection
kapacitor -url $(KAPACITOR_URL) enable dead_mans_switch
kapacitor -url $(KAPACITOR_URL) enable anomaly_detection
@echo "All tasks deployed and enabled"
kapacitor-list:
kapacitor -url $(KAPACITOR_URL) list tasks
kapacitor-disable-all:
kapacitor -url $(KAPACITOR_URL) disable temperature_high_alert
kapacitor -url $(KAPACITOR_URL) disable temp_rate_of_change
kapacitor -url $(KAPACITOR_URL) disable humidity_alert
kapacitor -url $(KAPACITOR_URL) disable device_offline_detection
kapacitor -url $(KAPACITOR_URL) disable dead_mans_switch
kapacitor -url $(KAPACITOR_URL) disable anomaly_detection
@echo "All tasks disabled"
kapacitor-logs:
docker logs iot-kapacitor -f --tail 50
Step 12: ทดสอบ Alert System
มาลุยกันเลยครับ! ทดสอบด้วยการยิง MQTT message จำลองสถานการณ์ฉุกเฉิน:
ทดสอบ Temperature Alert
# ส่งค่าอุณหภูมิสูงมาก (Critical > 35°C)
docker exec iot-mosquitto mosquitto_pub \
-h localhost \
-t "iot/sensors/device_001/data" \
-u iot_client -P mqtt_secure_pass \
-m '{
"device_id": "device_001",
"location": "floor_1",
"building": "A",
"timestamp": '"$(date +%s)"',
"readings": [
{"type": "temperature", "value": 38.5, "unit": "celsius"},
{"type": "humidity", "value": 45.0, "unit": "percent"}
]
}'
echo "Sent critical temperature alert trigger"
# รอ 30 วินาทีแล้วตรวจสอบ alerts
sleep 30
# ดู alert history
curl -G "http://localhost:8086/query" \
-u admin:admin123 \
--data-urlencode "db=iot_data" \
--data-urlencode "q=SELECT * FROM kapacitor_alerts WHERE time > now() - 5m ORDER BY time DESC"
ทดสอบ Dead Man’s Switch
# หยุด Telegraf ชั่วคราว
docker stop iot-telegraf
echo "Telegraf stopped. Waiting 3 minutes for Dead Man alert..."
# Kapacitor จะ trigger Dead Man's Switch ใน 3 นาที
# ตรวจสอบ Slack channel #iot-critical
# เปิด Telegraf กลับมา
sleep 180
docker start iot-telegraf
echo "Telegraf restarted"
สรุป: เราทำอะไรไปบ้างใน workshop นี้
น้องๆ ผ่านมาได้ไกลมากเลยนะครับ! มาดูว่า Alerting ครบแค่ไหน:
| TICKscript | เงื่อนไข | Handler |
|---|---|---|
temp_high_alert | > 30°C WARN, > 35°C CRIT | Slack + Webhook + InfluxDB |
temp_rate_of_change | เปลี่ยน > 5°C/5min WARN | Slack + Webhook |
humidity_alert | > 70% หรือ < 30% WARN | Slack + Webhook + InfluxDB |
device_offline | ไม่มีข้อมูลใน 5 นาที | Slack + Webhook + InfluxDB |
dead_mans_switch | ไม่มีข้อมูลใน 3 นาที | Slack + Email + Webhook |
anomaly_detection | Z-Score > 2.5 WARN | Slack + Webhook + InfluxDB |
ตอนนี้ระบบเรามี “ยามอัตโนมัติ” แล้วนะครับ ไม่ต้องนั่งจ้องหน้าจอตลอดเวลา Kapacitor จะตาม rules ที่เราเขียนและส่ง alert มาถ้าเกิดอะไรผิดปกติ
TICK Stack Workshop เสร็จสมบูรณ์แล้วครับ!
เราได้ผ่านการ setup ครบทุกส่วนของ TICK Stack:
- [Workshop #10] TICK Stack Setup: Docker Compose, InfluxDB, Chronograf
- [Workshop #11] Telegraf Pipeline: MQTT Consumer, JSON Parsing, Processors
- [Workshop #12] Kapacitor Alerting: TICKscripts, Alert Handlers, Monitoring
Next Step
Workshop นี้จบ TICK Stack แล้วครับ ขั้นต่อไปเราจะไปทำ LynxJS Mobile App เพื่อให้ดู alert และ dashboard บน smartphone ได้เลย — เพราะ real-time monitoring บน mobile มันคูลกว่าแน่นอน (^_^)v
Navigation
- ก่อนหน้า: IoT Workshop #11: Telegraf Data Pipeline
- ถัดไป: IoT Workshop #13: LynxJS Mobile App Setup
- แผนการ Workshop ทั้งหมด: IoT Workshop Master Plan