Kapacitor Alerting: ให้ระบบ IoT แจ้งเตือนเองเลย

Kapacitor Alerting: ให้ระบบ IoT แจ้งเตือนเองเลย

Showkhun · Workshop ·

Kapacitor Alerting: ให้ระบบ IoT แจ้งเตือนเองเลย

Branch: workshop/dev-09-alerting Phase: Development (9/9) Repo: kangana1024/iot-workshop


เคยเจอไหมครับ? ห้อง server ร้อนไป 40 องศา แต่รู้ตอนที่ hardware พังไปแล้ว อยากให้ระบบมันส่ง LINE มาบอกก่อนได้ไหม? วันนี้เราจะมาทำแบบนั้นกันครับ — ให้ Kapacitor เป็นยามเฝ้าระวังให้น้องๆ ตลอด 24 ชั่วโมง (ʘ‿ʘ)


ก่อนเริ่ม: น้องๆ จะได้อะไรจาก workshop นี้?

หลังจากผ่าน workshop นี้ไป น้องๆ จะ:

  • เขียน TICKscript ได้ตั้งแต่บรรทัดแรก
  • ตั้ง alert สำหรับ อุณหภูมิ/ความชื้น แบบ threshold
  • จับได้ว่า device ออฟไลน์ หายไปไหน
  • ตรวจจับค่าผิดปกติด้วย Z-Score (Anomaly Detection)
  • ทำ Dead Man’s Switch — ถ้าระบบเงียบผิดปกติ ให้ตีระฆังทันที
  • ส่ง alert ไปได้ทั้ง Slack, Email, และ Webhook ไปยัง Go API

ทำไมต้อง Kapacitor? (WHY ก่อน HOW เสมอ)

ลองนึกภาพว่าน้องๆ เป็น “ยาม” โรงงาน ถ้ามีเซ็นเซอร์อุณหภูมิ 500 ตัว แล้วน้องต้องนั่งดูหน้าจอตลอดเวลา — มันไม่ไหวใช่ไหมครับ?

Kapacitor ก็คือ “ยามอัตโนมัติ” ครับ มันนั่งดูข้อมูลแทนเรา พอเจอสิ่งผิดปกติก็ตาม rule ที่เราเขียนไว้ แล้วส่ง alert ออกมาเอง

อุปมาง่ายๆ: Kapacitor เหมือน Google Form with Notification แต่สำหรับ sensor data — เราตั้ง condition ไว้ พอมีคนกรอกแบบแผนออกนอกขอบเขต ก็ได้รับ email ทันที


ภาพรวม: Kapacitor อยู่ตรงไหนในระบบ?

Mermaid Diagram

Kapacitor รับ stream จาก InfluxDB แบบ real-time แล้วรัน TICKscript ตลอดเวลา — เหมือน while(true) loop ที่ฉลาดมากครับ


TICKscript Basics: ภาษาของ Kapacitor

ก่อนลงรายละเอียด เราต้องรู้จัก syntax ก่อนนะครับ TICKscript ใช้ pipe operator (|) เพื่อเชื่อม nodes เข้าหากัน เหมือน Unix pipe เลยครับ:

stream
    |from()...
    |window()...
    |mean('value')
    |alert()
        .crit(lambda: "mean" > 35.0)
        .slack()

Node Types หลักที่ใช้บ่อย:

Nodeหน้าที่
streamรับข้อมูล real-time จาก InfluxDB subscription
batchรับข้อมูลเป็น batch ตาม schedule
from()เลือก measurement และ tags
where()filter ข้อมูล
window()กำหนด time window
mean(), max(), min()aggregate functions
alert()กำหนด alert conditions
eval()คำนวณค่าใหม่

อุปมา: stream |from() |where() |alert() เหมือน SQL SELECT FROM WHERE แต่ทำงานกับ real-time data แทน static table ครับ


Step 1: เตรียม Directory Structure

มาสร้าง folder สำหรับเก็บ TICKscripts กันก่อนเลยครับ:

mkdir -p deployments/kapacitor/scripts/{temperature,humidity,device,system}

Structure จะเป็นแบบนี้:

deployments/kapacitor/scripts/
├── temperature/
│   ├── temp_high_alert.tick
│   └── temp_rate_of_change.tick
├── humidity/
│   └── humidity_alert.tick
├── device/
│   ├── device_offline.tick
│   └── dead_mans_switch.tick
└── system/
    └── anomaly_detection.tick

Step 2: Temperature Threshold Alert

ทำไม? ห้อง server ถ้าอุณหภูมิเกิน 35°C อาจทำให้ hardware พังได้ครับ เราต้องรู้ก่อน ไม่ใช่รู้ตอนพังแล้ว

สร้างไฟล์ deployments/kapacitor/scripts/temperature/temp_high_alert.tick:

// ─────────────────────────────────────────────────
// TICKscript: Temperature High Alert
// Triggers: WARNING > 30°C, CRITICAL > 35°C
// ─────────────────────────────────────────────────

var db = 'iot_data'
var rp = 'raw_30d'
var measurement = 'sensor_data'

// ─── Thresholds ────────────────────────────────────
var warn_temp  = 30.0
var crit_temp  = 35.0

// รับ stream จาก InfluxDB
var data = stream
    |from()
        .database(db)
        .retentionPolicy(rp)
        .measurement(measurement)
        .groupBy('device_id', 'location', 'building')
    // กรองเฉพาะ temperature readings
    |where(lambda: "measurement_type" == 'temperature')
    // คำนวณค่าเฉลี่ยใน 1 นาที (ลด noise)
    |window()
        .period(1m)
        .every(30s)
    |mean('value')
        .as('mean_temp')

// ─── Alert Node ──────────────────────────────────
data
    |alert()
        .id('temperature/{{ .Tags.building }}/{{ .Tags.location }}/{{ .Tags.device_id }}')
        .message(
            '[{{ .Level }}] อุณหภูมิผิดปกติ: {{ .Tags.device_id }}\n' +
            'ตำแหน่ง: {{ .Tags.building }} - {{ .Tags.location }}\n' +
            'ค่าเฉลี่ย 1 นาที: {{ index .Fields "mean_temp" | printf "%.1f" }}°C\n' +
            'เวลา: {{ .Time.Format "2006-01-02 15:04:05 UTC" }}'
        )
        // Threshold levels
        .warn(lambda: "mean_temp" > warn_temp)
        .crit(lambda: "mean_temp" > crit_temp)
        // แจ้งเตือนเมื่อ state เปลี่ยนเท่านั้น (ไม่ spam)
        .stateChangesOnly(5m)
        // บันทึก alert history ลง InfluxDB
        .details(
            '<b>Temperature Alert</b><br>' +
            'Device: {{ .Tags.device_id }}<br>' +
            'Mean Temp: {{ index .Fields "mean_temp" | printf "%.1f" }}°C<br>' +
            'Threshold: WARN={{ .warn_temp }}°C, CRIT={{ .crit_temp }}°C'
        )
        // ─── Handlers ──────────────────────────────
        // 1. บันทึกลง InfluxDB
        .influxDBOut()
            .create()
            .database('iot_data')
            .retentionPolicy('hourly_1y')
            .measurement('kapacitor_alerts')
            .tag('alert_type', 'temperature')
        // 2. Slack
        .slack()
            .channel('#iot-alerts')
            .username('Kapacitor Temperature')
            .iconEmoji(':thermometer:')
        // 3. Webhook ไปยัง Go API
        .httpPost('go-api')

สังเกต .stateChangesOnly(5m) นะครับ — นี่คือ setting สำคัญมาก ถ้าไม่ใส่ Kapacitor จะส่ง alert ซ้ำทุก 30 วินาทีในขณะที่อุณหภูมิยังสูงอยู่ กลายเป็น spam น้องๆ ใน Slack ไปเลย (╯°□°)╯

Webhook Payload ที่ส่งไปยัง Go API

Kapacitor จะส่ง POST request พร้อม JSON body แบบนี้:

{
  "id": "temperature/A/floor_1/device_001",
  "message": "[CRITICAL] อุณหภูมิผิดปกติ: device_001...",
  "time": "2026-03-26T10:30:00Z",
  "level": "CRITICAL",
  "previousLevel": "WARNING",
  "data": {
    "series": [{
      "name": "sensor_data",
      "tags": {
        "device_id": "device_001",
        "location": "floor_1",
        "building": "A"
      },
      "columns": ["time", "mean_temp"],
      "values": [["2026-03-26T10:30:00Z", 36.2]]
    }]
  }
}

Step 3: Humidity Alert

ทำไม? ความชื้นที่สูงเกินไปทำให้วงจรไฟฟ้าสั้น ความชื้นต่ำเกินไปทำให้เกิด static electricity — ทั้งสองแบบทำลาย hardware ได้ครับ

สร้างไฟล์ deployments/kapacitor/scripts/humidity/humidity_alert.tick:

// ─────────────────────────────────────────────────
// TICKscript: Humidity Alert
// Triggers:
//   - WARNING: humidity > 70% หรือ < 30%
//   - CRITICAL: humidity > 85% หรือ < 20%
// ─────────────────────────────────────────────────

var db = 'iot_data'
var rp = 'raw_30d'

// Thresholds
var warn_high  = 70.0
var crit_high  = 85.0
var warn_low   = 30.0
var crit_low   = 20.0

var data = stream
    |from()
        .database(db)
        .retentionPolicy(rp)
        .measurement('sensor_data')
        .groupBy('device_id', 'location', 'building')
    |where(lambda: "measurement_type" == 'humidity')
    |window()
        .period(2m)
        .every(1m)
    |mean('value')
        .as('mean_humidity')

data
    |alert()
        .id('humidity/{{ .Tags.building }}/{{ .Tags.location }}/{{ .Tags.device_id }}')
        .message(
            '[{{ .Level }}] ความชื้นผิดปกติ: {{ .Tags.device_id }}\n' +
            'ตำแหน่ง: {{ .Tags.building }} - {{ .Tags.location }}\n' +
            'ค่าความชื้น: {{ index .Fields "mean_humidity" | printf "%.1f" }}%\n' +
            'เวลา: {{ .Time.Format "2006-01-02 15:04:05" }}'
        )
        .warn(lambda: "mean_humidity" > warn_high OR "mean_humidity" < warn_low)
        .crit(lambda: "mean_humidity" > crit_high OR "mean_humidity" < crit_low)
        .stateChangesOnly(10m)
        .influxDBOut()
            .create()
            .database('iot_data')
            .retentionPolicy('hourly_1y')
            .measurement('kapacitor_alerts')
            .tag('alert_type', 'humidity')
        .slack()
            .channel('#iot-alerts')
            .iconEmoji(':droplet:')
        .httpPost('go-api')

Step 4: Device Offline Detection

ทำไม? บางที device ไม่ได้พัง แค่ battery หมด หรือ WiFi ตัด แต่ถ้าเราไม่รู้ ข้อมูลก็หาย และเราจะไม่รู้ว่าห้องนั้นเป็นยังไง

อุปมา: เหมือนพนักงานเช็คอินตอนเช้าทุกวัน ถ้าวันไหนไม่เช็คอินใน 15 นาที HR จะโทรถาม — device ก็เหมือนกันครับ ถ้าไม่ส่งข้อมูลมาตามรอบ เราต้องรู้

สร้างไฟล์ deployments/kapacitor/scripts/device/device_offline.tick:

// ─────────────────────────────────────────────────
// TICKscript: Device Offline Detection
// ตรวจสอบว่า device ส่งข้อมูลครั้งสุดท้ายเมื่อไหร่
// ถ้าไม่ส่งข้อมูลภายใน 5 นาที = WARNING
// ถ้าไม่ส่งข้อมูลภายใน 15 นาที = CRITICAL
// ─────────────────────────────────────────────────

var db = 'iot_data'
var rp = 'raw_30d'

// ใช้ batch query ตรวจสอบทุก 1 นาที
batch
    |query(
        'SELECT last("battery_level") AS last_battery, ' +
        'last("signal_strength") AS last_signal ' +
        'FROM "' + db + '"."' + rp + '"."device_status" ' +
        'WHERE time > now() - 20m '
    )
        .period(1m)
        .every(1m)
        .groupBy('device_id', 'location', 'building', 1m)
    |deadman(0.0, 5m)  // ถ้าไม่มีข้อมูลใน 5 นาที = alert
        .id('device_offline/{{ .Tags.device_id }}')
        .message(
            '[{{ .Level }}] Device ออฟไลน์: {{ .Tags.device_id }}\n' +
            'ตำแหน่ง: {{ .Tags.building }} - {{ .Tags.location }}\n' +
            'ไม่พบข้อมูลใน 5 นาทีที่ผ่านมา\n' +
            'เวลา: {{ .Time.Format "2006-01-02 15:04:05" }}'
        )
        .stateChangesOnly()
        .influxDBOut()
            .create()
            .database('iot_data')
            .retentionPolicy('hourly_1y')
            .measurement('kapacitor_alerts')
            .tag('alert_type', 'device_offline')
        .slack()
            .channel('#iot-alerts')
            .iconEmoji(':red_circle:')
        .httpPost('go-api')

Step 5: Dead Man’s Switch — ระดับ System-wide

ทำไม? Device Offline ตรวจแค่ “device ตัวนึงหาย” แต่ Dead Man’s Switch ตรวจว่า “ข้อมูลทั้งระบบหายหมดเลย” — อาจแปลว่า Telegraf crash หรือ MQTT Broker ตาย ซึ่งแย่กว่ามากครับ

อุปมา: Dead Man’s Switch เหมือน “heartbeat” ของระบบ ถ้าหัวใจหยุดเต้น 3 นาที ต้องตีระฆังทันที ไม่ต้องรอให้ตายก่อน

  ___   ___   ___   ___
 /   \ /   \ /   \ /
|     |     |     |     <-- ปกติ: pulse ส่งมาเรื่อยๆ
 \___/ \___/ \___/

  ___   ___
 /   \ /   \
|     |     |           <-- ขาดหาย...
 \___/ \___/

                         ALERT! Dead Man Triggered! (x_x)

สร้างไฟล์ deployments/kapacitor/scripts/device/dead_mans_switch.tick:

// ─────────────────────────────────────────────────
// TICKscript: Dead Man's Switch
// ตรวจสอบว่า Telegraf ยังส่งข้อมูลอยู่หรือไม่
// ถ้าไม่มีข้อมูลเข้ามาเลยใน 3 นาที = CRITICAL
// (นี่คือปัญหาระบบ ไม่ใช่แค่ device เดียว)
// ─────────────────────────────────────────────────

var data = stream
    |from()
        .database('iot_data')
        .retentionPolicy('raw_30d')
        .measurement('sensor_data')

// Dead man's switch: ถ้าไม่มีข้อมูลใน 3 นาที
data
    |deadman(0.0, 3m)
        .id('system/data_pipeline/dead_man')
        .message(
            '[CRITICAL] Data Pipeline หยุดทำงาน!\n' +
            'ไม่มีข้อมูล sensor ใหม่ใน 3 นาทีที่ผ่านมา\n' +
            'กรุณาตรวจสอบ Telegraf และ MQTT Broker\n' +
            'เวลา: {{ .Time.Format "2006-01-02 15:04:05" }}'
        )
        .slack()
            .channel('#iot-critical')
            .iconEmoji(':skull:')
            .username('Kapacitor CRITICAL')
        // ส่ง email สำหรับ system-level alert
        .email('[email protected]')
            .from('[email protected]')
            .subject('[CRITICAL] IoT Data Pipeline Down')
        .httpPost('go-api')

Step 6: Anomaly Detection ด้วย Z-Score

ทำไม? Threshold alert จับได้แค่ค่าเกิน limit ที่ตั้งไว้ แต่ถ้าค่าปกติอยู่ที่ 25°C แต่วันนี้ขึ้นเป็น 32°C ซึ่งยังไม่เกิน 35°C — มันผิดปกติในแง่ “pattern” แต่ threshold alert จะไม่จับได้ครับ

อุปมา Z-Score: ลองนึกถึงคะแนนสอบ ถ้าค่าเฉลี่ยทั้งห้อง 50 คะแนน แล้วคนนึงได้ 90 — นั่นคือ “ผิดปกติมาก” แม้ 90 จะไม่ใช่ค่าที่เป็นปัญหาในตัวมันเอง Z-Score คือการวัดว่า “ผิดปกติแค่ไหนเมื่อเทียบกับประวัติที่ผ่านมา”

สร้างไฟล์ deployments/kapacitor/scripts/system/anomaly_detection.tick:

// ─────────────────────────────────────────────────
// TICKscript: Anomaly Detection (Z-Score)
// ตรวจสอบค่า sensor ที่ผิดปกติด้วยวิธีทางสถิติ
// Z-Score > 3 = ค่าผิดปกติมากกว่า 3 standard deviations
// ─────────────────────────────────────────────────

var db = 'iot_data'
var rp = 'raw_30d'

// คำนวณ rolling mean และ stddev ใน 30 นาที
var window_mean = stream
    |from()
        .database(db)
        .retentionPolicy(rp)
        .measurement('sensor_data')
        .groupBy('device_id', 'measurement_type')
    |window()
        .period(30m)
        .every(1m)
    |mean('value')
        .as('rolling_mean')

var window_std = stream
    |from()
        .database(db)
        .retentionPolicy(rp)
        .measurement('sensor_data')
        .groupBy('device_id', 'measurement_type')
    |window()
        .period(30m)
        .every(1m)
    |stddev('value')
        .as('rolling_stddev')

// Current value
var current = stream
    |from()
        .database(db)
        .retentionPolicy(rp)
        .measurement('sensor_data')
        .groupBy('device_id', 'measurement_type', 'location', 'building')
    |window()
        .period(1m)
        .every(1m)
    |last('value')
        .as('current_value')

// Join ข้อมูลเพื่อคำนวณ Z-Score
var joined = current
    |join(window_mean, window_std)
        .as('c', 'mean', 'std')
        .tolerance(1m)
        .fill('none')
    // คำนวณ Z-Score: (current - mean) / stddev
    |eval(
        lambda: (float("c.current_value") - float("mean.rolling_mean")) /
                IF(float("std.rolling_stddev") > 0.0, float("std.rolling_stddev"), 1.0)
    )
        .as('z_score')
        .keep('c.current_value', 'mean.rolling_mean', 'std.rolling_stddev')

joined
    |alert()
        .id('anomaly/{{ .Tags.device_id }}/{{ .Tags.measurement_type }}')
        .message(
            '[{{ .Level }}] ค่า Sensor ผิดปกติ (Anomaly): {{ .Tags.device_id }}\n' +
            'ประเภท: {{ .Tags.measurement_type }}\n' +
            'ค่าปัจจุบัน: {{ index .Fields "c.current_value" | printf "%.2f" }}\n' +
            'ค่าเฉลี่ย: {{ index .Fields "mean.rolling_mean" | printf "%.2f" }}\n' +
            'Z-Score: {{ index .Fields "z_score" | printf "%.2f" }}\n' +
            'ตำแหน่ง: {{ .Tags.location }}'
        )
        .warn(lambda: abs("z_score") > 2.5)
        .crit(lambda: abs("z_score") > 3.5)
        .stateChangesOnly(15m)
        .influxDBOut()
            .create()
            .database('iot_data')
            .retentionPolicy('hourly_1y')
            .measurement('kapacitor_alerts')
            .tag('alert_type', 'anomaly')
        .slack()
            .channel('#iot-alerts')
            .iconEmoji(':chart_with_upwards_trend:')
        .httpPost('go-api')

Step 7: Temperature Rate of Change Alert

ทำไม? อุณหภูมิที่เปลี่ยนเร็วผิดปกติอาจบอกว่ามีไฟไหม้ หรือ cooling system เสีย แม้ค่าปัจจุบันยังไม่เกิน threshold ครับ

สร้างไฟล์ deployments/kapacitor/scripts/temperature/temp_rate_of_change.tick:

// ─────────────────────────────────────────────────
// TICKscript: Temperature Rate of Change
// ตรวจสอบการเปลี่ยนแปลงอุณหภูมิที่รวดเร็วผิดปกติ
// WARNING: เปลี่ยนแปลง > 5°C ใน 5 นาที
// CRITICAL: เปลี่ยนแปลง > 10°C ใน 5 นาที
// ─────────────────────────────────────────────────

var db = 'iot_data'
var rp = 'raw_30d'

var data = stream
    |from()
        .database(db)
        .retentionPolicy(rp)
        .measurement('sensor_data')
        .groupBy('device_id', 'location', 'building')
    |where(lambda: "measurement_type" == 'temperature')
    |window()
        .period(5m)
        .every(1m)

// คำนวณ first และ last ใน window เพื่อหา rate of change
var first_val = data
    |first('value')
        .as('first_temp')

var last_val = data
    |last('value')
        .as('last_temp')

first_val
    |join(last_val)
        .as('first', 'last')
        .tolerance(30s)
    |eval(lambda: abs(float("last.last_temp") - float("first.first_temp")))
        .as('temp_change')
    |alert()
        .id('temp_rate/{{ .Tags.device_id }}')
        .message(
            '[{{ .Level }}] อุณหภูมิเปลี่ยนแปลงรวดเร็ว: {{ .Tags.device_id }}\n' +
            'เปลี่ยนแปลง: {{ index .Fields "temp_change" | printf "%.1f" }}°C ใน 5 นาที\n' +
            'ตำแหน่ง: {{ .Tags.building }} - {{ .Tags.location }}'
        )
        .warn(lambda: "temp_change" > 5.0)
        .crit(lambda: "temp_change" > 10.0)
        .stateChangesOnly(5m)
        .influxDBOut()
            .create()
            .database('iot_data')
            .retentionPolicy('hourly_1y')
            .measurement('kapacitor_alerts')
            .tag('alert_type', 'temp_rate_change')
        .slack()
            .channel('#iot-alerts')
            .iconEmoji(':fire:')
        .httpPost('go-api')

Step 8: Go API Webhook Handler

ฝั่ง Go backend ก็ต้องรับ payload ที่ Kapacitor ส่งมาด้วยนะครับ เพิ่ม handler ใน backend/internal/handler/alert_handler.go:

package handler

import (
    "encoding/json"
    "net/http"
    "time"
    "log"
)

// KapacitorAlertPayload คือ structure ของ payload จาก Kapacitor
type KapacitorAlertPayload struct {
    ID            string                 `json:"id"`
    Message       string                 `json:"message"`
    Time          time.Time              `json:"time"`
    Level         string                 `json:"level"`        // OK, INFO, WARNING, CRITICAL
    PreviousLevel string                 `json:"previousLevel"`
    Data          map[string]interface{} `json:"data"`
    Details       string                 `json:"details"`
}

// HandleKapacitorWebhook รับ alert จาก Kapacitor
func HandleKapacitorWebhook(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }

    var payload KapacitorAlertPayload
    if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
        log.Printf("Error decoding Kapacitor payload: %v", err)
        http.Error(w, "Bad request", http.StatusBadRequest)
        return
    }

    log.Printf("[Alert] ID=%s Level=%s -> %s Message=%s",
        payload.ID, payload.PreviousLevel, payload.Level, payload.Message)

    // บันทึกลง database
    // alertService.SaveAlert(r.Context(), &payload)

    // ส่งไปยัง WebSocket clients ที่ connected อยู่
    // wsHub.Broadcast(payload)

    // ส่ง push notification ไปยัง mobile app
    // notificationService.Send(r.Context(), &payload)

    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{"status": "received"})
}

Step 9: Deploy TICKscripts ด้วย Kapacitor CLI

ทำไม define ก่อน enable? เพราะ Kapacitor แยก “ลงทะเบียน task” กับ “เปิดใช้งาน” ออกจากกัน เหมือนการ install app แล้ว launch ทีหลังนั่นแหละครับ

# ─────────────────────────────────────────────────
# ติดตั้ง TICKscript tasks
# ─────────────────────────────────────────────────

KAPACITOR_URL="http://localhost:9092"

# 1. Temperature Alert
kapacitor define temperature_high_alert \
  -type stream \
  -tick deployments/kapacitor/scripts/temperature/temp_high_alert.tick \
  -dbrp iot_data.raw_30d

# 2. Temperature Rate of Change
kapacitor define temp_rate_of_change \
  -type stream \
  -tick deployments/kapacitor/scripts/temperature/temp_rate_of_change.tick \
  -dbrp iot_data.raw_30d

# 3. Humidity Alert
kapacitor define humidity_alert \
  -type stream \
  -tick deployments/kapacitor/scripts/humidity/humidity_alert.tick \
  -dbrp iot_data.raw_30d

# 4. Device Offline
kapacitor define device_offline_detection \
  -type batch \
  -tick deployments/kapacitor/scripts/device/device_offline.tick \
  -dbrp iot_data.raw_30d

# 5. Dead Man's Switch
kapacitor define dead_mans_switch \
  -type stream \
  -tick deployments/kapacitor/scripts/device/dead_mans_switch.tick \
  -dbrp iot_data.raw_30d

# 6. Anomaly Detection
kapacitor define anomaly_detection \
  -type stream \
  -tick deployments/kapacitor/scripts/system/anomaly_detection.tick \
  -dbrp iot_data.raw_30d

# ─────────────────────────────────────────────────
# Enable ทุก tasks
# ─────────────────────────────────────────────────
kapacitor enable temperature_high_alert
kapacitor enable temp_rate_of_change
kapacitor enable humidity_alert
kapacitor enable device_offline_detection
kapacitor enable dead_mans_switch
kapacitor enable anomaly_detection

# ─────────────────────────────────────────────────
# ตรวจสอบสถานะ
# ─────────────────────────────────────────────────
kapacitor list tasks

# ดู task details
kapacitor show temperature_high_alert

Step 10: Alert History Query

ข้อมูล alert ที่ Kapacitor บันทึกลง InfluxDB ดูได้แบบนี้ครับ:

-- ดู alert history ใน InfluxDB
-- ─────────────────────────────────────────────────

-- Alert ทั้งหมดใน 24 ชั่วโมงล่าสุด
SELECT * FROM "iot_data"."hourly_1y"."kapacitor_alerts"
WHERE time > now() - 24h
ORDER BY time DESC
LIMIT 50;

-- จำนวน alert แต่ละประเภท
SELECT COUNT(*) AS alert_count
FROM "iot_data"."hourly_1y"."kapacitor_alerts"
WHERE time > now() - 7d
GROUP BY "alert_type", time(1d)
FILL(0);

-- Critical alerts เฉพาะ
SELECT * FROM "iot_data"."hourly_1y"."kapacitor_alerts"
WHERE "level" = 'CRITICAL'
  AND time > now() - 24h
ORDER BY time DESC;

-- Alert สรุปรายอุปกรณ์
SELECT COUNT(*) AS total_alerts
FROM "iot_data"."hourly_1y"."kapacitor_alerts"
WHERE time > now() - 7d
GROUP BY "alert_type", "device_id"
ORDER BY total_alerts DESC;

Step 11: Makefile Commands สำหรับ Kapacitor

เพิ่มใน Makefile ให้ deploy ง่ายๆ ด้วยคำสั่งเดียวครับ:

# ─────────────────────────────────────────────────
# Kapacitor Tasks Management
# ─────────────────────────────────────────────────

KAPACITOR_URL ?= http://localhost:9092
TICK_DIR = deployments/kapacitor/scripts

kapacitor-deploy:
	@echo "Deploying TICKscript tasks..."
	kapacitor -url $(KAPACITOR_URL) define temperature_high_alert \
		-type stream \
		-tick $(TICK_DIR)/temperature/temp_high_alert.tick \
		-dbrp iot_data.raw_30d
	kapacitor -url $(KAPACITOR_URL) define temp_rate_of_change \
		-type stream \
		-tick $(TICK_DIR)/temperature/temp_rate_of_change.tick \
		-dbrp iot_data.raw_30d
	kapacitor -url $(KAPACITOR_URL) define humidity_alert \
		-type stream \
		-tick $(TICK_DIR)/humidity/humidity_alert.tick \
		-dbrp iot_data.raw_30d
	kapacitor -url $(KAPACITOR_URL) define device_offline_detection \
		-type batch \
		-tick $(TICK_DIR)/device/device_offline.tick \
		-dbrp iot_data.raw_30d
	kapacitor -url $(KAPACITOR_URL) define dead_mans_switch \
		-type stream \
		-tick $(TICK_DIR)/device/dead_mans_switch.tick \
		-dbrp iot_data.raw_30d
	kapacitor -url $(KAPACITOR_URL) define anomaly_detection \
		-type stream \
		-tick $(TICK_DIR)/system/anomaly_detection.tick \
		-dbrp iot_data.raw_30d
	@echo "Enabling all tasks..."
	kapacitor -url $(KAPACITOR_URL) enable temperature_high_alert
	kapacitor -url $(KAPACITOR_URL) enable temp_rate_of_change
	kapacitor -url $(KAPACITOR_URL) enable humidity_alert
	kapacitor -url $(KAPACITOR_URL) enable device_offline_detection
	kapacitor -url $(KAPACITOR_URL) enable dead_mans_switch
	kapacitor -url $(KAPACITOR_URL) enable anomaly_detection
	@echo "All tasks deployed and enabled"

kapacitor-list:
	kapacitor -url $(KAPACITOR_URL) list tasks

kapacitor-disable-all:
	kapacitor -url $(KAPACITOR_URL) disable temperature_high_alert
	kapacitor -url $(KAPACITOR_URL) disable temp_rate_of_change
	kapacitor -url $(KAPACITOR_URL) disable humidity_alert
	kapacitor -url $(KAPACITOR_URL) disable device_offline_detection
	kapacitor -url $(KAPACITOR_URL) disable dead_mans_switch
	kapacitor -url $(KAPACITOR_URL) disable anomaly_detection
	@echo "All tasks disabled"

kapacitor-logs:
	docker logs iot-kapacitor -f --tail 50

Step 12: ทดสอบ Alert System

มาลุยกันเลยครับ! ทดสอบด้วยการยิง MQTT message จำลองสถานการณ์ฉุกเฉิน:

ทดสอบ Temperature Alert

# ส่งค่าอุณหภูมิสูงมาก (Critical > 35°C)
docker exec iot-mosquitto mosquitto_pub \
  -h localhost \
  -t "iot/sensors/device_001/data" \
  -u iot_client -P mqtt_secure_pass \
  -m '{
    "device_id": "device_001",
    "location": "floor_1",
    "building": "A",
    "timestamp": '"$(date +%s)"',
    "readings": [
      {"type": "temperature", "value": 38.5, "unit": "celsius"},
      {"type": "humidity", "value": 45.0, "unit": "percent"}
    ]
  }'

echo "Sent critical temperature alert trigger"

# รอ 30 วินาทีแล้วตรวจสอบ alerts
sleep 30

# ดู alert history
curl -G "http://localhost:8086/query" \
  -u admin:admin123 \
  --data-urlencode "db=iot_data" \
  --data-urlencode "q=SELECT * FROM kapacitor_alerts WHERE time > now() - 5m ORDER BY time DESC"

ทดสอบ Dead Man’s Switch

# หยุด Telegraf ชั่วคราว
docker stop iot-telegraf

echo "Telegraf stopped. Waiting 3 minutes for Dead Man alert..."
# Kapacitor จะ trigger Dead Man's Switch ใน 3 นาที
# ตรวจสอบ Slack channel #iot-critical

# เปิด Telegraf กลับมา
sleep 180
docker start iot-telegraf
echo "Telegraf restarted"

สรุป: เราทำอะไรไปบ้างใน workshop นี้

น้องๆ ผ่านมาได้ไกลมากเลยนะครับ! มาดูว่า Alerting ครบแค่ไหน:

TICKscriptเงื่อนไขHandler
temp_high_alert> 30°C WARN, > 35°C CRITSlack + Webhook + InfluxDB
temp_rate_of_changeเปลี่ยน > 5°C/5min WARNSlack + Webhook
humidity_alert> 70% หรือ < 30% WARNSlack + Webhook + InfluxDB
device_offlineไม่มีข้อมูลใน 5 นาทีSlack + Webhook + InfluxDB
dead_mans_switchไม่มีข้อมูลใน 3 นาทีSlack + Email + Webhook
anomaly_detectionZ-Score > 2.5 WARNSlack + Webhook + InfluxDB

ตอนนี้ระบบเรามี “ยามอัตโนมัติ” แล้วนะครับ ไม่ต้องนั่งจ้องหน้าจอตลอดเวลา Kapacitor จะตาม rules ที่เราเขียนและส่ง alert มาถ้าเกิดอะไรผิดปกติ

TICK Stack Workshop เสร็จสมบูรณ์แล้วครับ!

เราได้ผ่านการ setup ครบทุกส่วนของ TICK Stack:

  1. [Workshop #10] TICK Stack Setup: Docker Compose, InfluxDB, Chronograf
  2. [Workshop #11] Telegraf Pipeline: MQTT Consumer, JSON Parsing, Processors
  3. [Workshop #12] Kapacitor Alerting: TICKscripts, Alert Handlers, Monitoring

Next Step

Workshop นี้จบ TICK Stack แล้วครับ ขั้นต่อไปเราจะไปทำ LynxJS Mobile App เพื่อให้ดู alert และ dashboard บน smartphone ได้เลย — เพราะ real-time monitoring บน mobile มันคูลกว่าแน่นอน (^_^)v