Telegraf Pipeline: สายพานข้อมูล IoT

Telegraf Pipeline: สายพานข้อมูล IoT

Showkhun · Workshop ·

Telegraf Pipeline: สายพานข้อมูล IoT

สวัสดีน้องๆ! พี่โชว์มาแล้ว วันนี้เราจะมาประกอบ “สายพานส่งข้อมูล” ที่ทำให้ข้อมูลจากเซนเซอร์ในออฟฟิศไหลไปถึง InfluxDB โดยอัตโนมัติ ไม่ต้องเขียน code เชื่อมเองสักบรรทัดเดียว มาลุยกัน! (ง •̀_•́)ง


ก่อนลุย: ทำไมเราถึงต้องการ Telegraf?

ลองนึกภาพนะ น้องๆ เปิดร้านกาแฟ แล้วมีลูกค้า (sensor) เดินเข้ามาสั่งโน่นนี่ตลอดเวลา ถ้าน้องไม่มีพนักงานรับออเดอร์ น้องก็ต้องวิ่งรับเองทุกโต๊ะ นั่นก็คือการเขียน code รับข้อมูลจากทุก sensor เอง

Telegraf คือ “พนักงานรับออเดอร์มืออาชีพ” ที่รับข้อมูลจาก MQTT, แปลง JSON, ปรับแต่งข้อมูล แล้วส่งไป InfluxDB — ทั้งหมดแค่แก้ config ไฟล์เดียว

ทำไมถึงสำคัญ?

  • ไม่ต้องเขียน consumer code เอง ลดความผิดพลาด
  • Config-driven ทำให้ทีมที่ไม่ใช่ dev แก้ได้ง่าย
  • มี plugin ให้เลือกกว่า 300 ตัว รองรับทุก use case
  • Handle back pressure และ retry ให้อัตโนมัติ

สิ่งที่น้องๆ จะได้จาก workshop นี้

  • เข้าใจ flow ข้อมูลจาก MQTT → Telegraf → InfluxDB ทั้งหมด
  • ตั้งค่า MQTT Consumer รับ 3 topic patterns (sensor / status / alert)
  • ใช้ JSON v2 Parser แกะ payload ซับซ้อน
  • ปั้นข้อมูลด้วย Processor Plugins (rename / converter / regex / starlark / dedup)
  • Route ข้อมูลไปยัง retention policy ที่เหมาะสม
  • Tune performance ให้รับ load จริงได้

ภาพรวม: สายพานข้อมูลของเรา

ก่อนลงมือ ขอให้น้องๆ เห็น big picture ก่อน เปรียบง่ายๆ ว่า Telegraf คือโรงงาน มี “สายพาน” หลายช่วง:

Input (รับวัตถุดิบ) → Parser (แกะกล่อง) → Processor (แปรรูป) → Output (ส่งสินค้า)

Mermaid Diagram

Note: Processor แต่ละตัวใช้ namepass กรองเฉพาะ measurement ที่ตัวเองรับผิดชอบ เหมือนสายพานโรงงานที่แต่ละสถานีทำงานเฉพาะผลิตภัณฑ์ของตัวเอง


MQTT Topic Structure: ทบทวนก่อนลุย

iot/sensors/{device_id}/data      → ค่าเซนเซอร์ (อุณหภูมิ, ความชื้น, CO2)
iot/devices/{device_id}/status    → สถานะอุปกรณ์ (online/offline, battery)
iot/devices/{device_id}/alert     → alert จากอุปกรณ์
iot/system/heartbeat              → สุขภาพระบบ

ตัวอย่าง Payload จริงๆ ที่เราจะรับ

Sensor Data (iot/sensors/device_001/data):

{
  "device_id": "device_001",
  "location": "floor_1",
  "building": "A",
  "timestamp": 1711411200,
  "readings": [
    {"type": "temperature", "value": 25.3, "unit": "celsius"},
    {"type": "humidity", "value": 62.5, "unit": "percent"},
    {"type": "co2", "value": 450, "unit": "ppm"}
  ]
}

Device Status (iot/devices/device_001/status):

{
  "device_id": "device_001",
  "location": "floor_1",
  "status": "online",
  "battery_level": 87,
  "signal_strength": -65,
  "firmware_version": "1.2.3",
  "uptime_seconds": 86400,
  "timestamp": 1711411200
}

Step 1: MQTT Consumer Input Plugin

ทำไมต้องมี Consumer แยกกัน 3 ตัว?

เหมือนร้านสะดวกซื้อที่มีพนักงานดูแลแต่ละโซน — พนักงานโซนอาหารไม่ต้องไปยุ่งกับโซนเครื่องดื่ม แยก consumer ทำให้เรา config แต่ละ topic ได้อิสระ เช่น alert ใช้ QoS 2 (exactly-once) เพราะพลาดไม่ได้ แต่ sensor data ใช้ QoS 1 ก็พอ

สร้างไฟล์ deployments/telegraf/conf.d/mqtt.conf:

# ─────────────────────────────────────────────────────
# INPUT: MQTT Consumer - Sensor Data
# ─────────────────────────────────────────────────────
[[inputs.mqtt_consumer]]
  ## MQTT broker URLs
  servers = ["$MQTT_BROKER"]

  ## Topics to subscribe to
  topics = [
    "iot/sensors/+/data",
  ]

  ## QoS subscription level
  qos = 1

  ## Connection timeout
  connection_timeout = "30s"
  keepalive = "60s"

  ## Authentication
  username = "$MQTT_USERNAME"
  password = "$MQTT_PASSWORD"

  ## Client ID (unique per agent instance)
  client_id = "telegraf-sensor-consumer"

  ## Message routing
  ## ชื่อ measurement ที่จะเขียนลง InfluxDB (ก่อน processor จะ rename)
  data_format = "json_v2"
  topic_tag = "mqtt_topic"

  ## แปลง topic path เป็น tag
  ## iot/sensors/device_001/data → device_id = device_001
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "iot/sensors/+/data"
    measurement = "_/_/_/_"
    tags = "_/sensors/<device_id>/_"
    fields = ""

  ## JSON v2 Parser Configuration
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "raw_sensor_data"
    timestamp_path = "timestamp"
    timestamp_format = "unix"
    timestamp_timezone = "UTC"

    # Tags จาก JSON payload
    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "device_id"
      rename = "device_id"

    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "location"
      rename = "location"

    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "building"
      rename = "building"

    # อ่าน readings array - temperature
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "readings"
      tags = ["type"]
      disable_prepend_keys = true

      [inputs.mqtt_consumer.json_v2.object.renames]
        value = "sensor_value"
        unit  = "sensor_unit"

# ─────────────────────────────────────────────────────
# INPUT: MQTT Consumer - Device Status
# ─────────────────────────────────────────────────────
[[inputs.mqtt_consumer]]
  servers = ["$MQTT_BROKER"]
  topics = ["iot/devices/+/status"]
  qos = 1
  connection_timeout = "30s"
  username = "$MQTT_USERNAME"
  password = "$MQTT_PASSWORD"
  client_id = "telegraf-status-consumer"
  data_format = "json_v2"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "iot/devices/+/status"
    measurement = "_/_/_/_"
    tags = "_/devices/<device_id>/_"

  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "device_status"
    timestamp_path = "timestamp"
    timestamp_format = "unix"
    timestamp_timezone = "UTC"

    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "device_id"

    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "location"

    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "status"
      rename = "online_status"

    [[inputs.mqtt_consumer.json_v2.field]]
      path = "battery_level"
      type = "float"

    [[inputs.mqtt_consumer.json_v2.field]]
      path = "signal_strength"
      type = "float"

    [[inputs.mqtt_consumer.json_v2.field]]
      path = "uptime_seconds"
      type = "int"

    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "firmware_version"

# ─────────────────────────────────────────────────────
# INPUT: MQTT Consumer - Device Alerts
# ─────────────────────────────────────────────────────
[[inputs.mqtt_consumer]]
  servers = ["$MQTT_BROKER"]
  topics = ["iot/devices/+/alert"]
  qos = 2  # exactly-once สำหรับ alerts
  connection_timeout = "30s"
  username = "$MQTT_USERNAME"
  password = "$MQTT_PASSWORD"
  client_id = "telegraf-alert-consumer"
  data_format = "json_v2"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "iot/devices/+/alert"
    measurement = "_/_/_/_"
    tags = "_/devices/<device_id>/_"

  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "device_alerts"

    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "device_id"
    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "alert_type"
    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "severity"
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "message"
      type = "string"
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "value"
      type = "float"

เรื่อง topic_parsing: ส่วนนี้เจ๋งมาก! มันแกะ iot/sensors/device_001/data ออกมาเป็น tag device_id = device_001 ให้อัตโนมัติ ไม่ต้องไป regex เอง


Step 2: Processor Plugins — แปรรูปข้อมูล

ทำไม Processor ถึงสำคัญ?

ลองนึกถึงโรงงานน้ำผลไม้ — ผลไม้ดิบ (raw data) ที่รับมาจาก MQTT ยังเอาไปขายไม่ได้ ต้องผ่านสายพานแปรรูปก่อน: ล้าง (rename), คัดขนาด (converter type), ตัดแต่ง (regex) แล้วถึงได้ผลิตภัณฑ์พร้อมวาง shelf

สร้างไฟล์ deployments/telegraf/conf.d/processors.conf:

# ─────────────────────────────────────────────────────
# PROCESSOR: Rename - แปลงชื่อ measurement และ fields
# ─────────────────────────────────────────────────────

## ย้าย raw_sensor_data → sensor_data และ rename fields
[[processors.rename]]
  namepass = ["raw_sensor_data"]

  [[processors.rename.replace]]
    measurement = "raw_sensor_data"
    dest = "sensor_data"

  [[processors.rename.replace]]
    field = "sensor_value"
    dest = "value"

  [[processors.rename.replace]]
    tag = "type"
    dest = "measurement_type"

# ─────────────────────────────────────────────────────
# PROCESSOR: Converter - แปลง type ของ fields
# ─────────────────────────────────────────────────────

[[processors.converter]]
  namepass = ["sensor_data"]

  [processors.converter.fields]
    ## แปลงค่าที่อาจมาเป็น string ให้เป็น float
    float = ["value"]

[[processors.converter]]
  namepass = ["device_status"]

  [processors.converter.fields]
    float  = ["battery_level", "signal_strength"]
    integer = ["uptime_seconds"]

# ─────────────────────────────────────────────────────
# PROCESSOR: Regex - แปลง tag values
# ─────────────────────────────────────────────────────

## ทำให้ online_status เป็นตัวพิมพ์เล็กทั้งหมด
[[processors.regex]]
  namepass = ["device_status"]

  [[processors.regex.tags]]
    key = "online_status"
    pattern = "^(.+)$"
    replacement = "${1}"
    result_key = "status_normalized"

## แยก location ออกเป็น building code จาก format "building_A_floor_1"
[[processors.regex]]
  namepass = ["sensor_data", "device_status"]

  [[processors.regex.tags]]
    key = "location"
    pattern = "^(floor_\\d+)$"
    replacement = "${1}"
    result_key = "floor"

# ─────────────────────────────────────────────────────
# PROCESSOR: Starlark - Logic ที่ซับซ้อน
# (ใช้สำหรับ calculated fields)
# ─────────────────────────────────────────────────────

[[processors.starlark]]
  namepass = ["sensor_data"]
  source = '''
def apply(metric):
    # คำนวณ Heat Index จาก temperature และ humidity
    mtype = metric.tags.get("measurement_type", "")
    if mtype != "temperature":
        return metric

    # เพิ่ม tag สำหรับ alert threshold
    val = metric.fields.get("value", 0.0)
    if val > 35.0:
        metric.tags["temp_zone"] = "critical"
    elif val > 30.0:
        metric.tags["temp_zone"] = "warning"
    elif val > 15.0:
        metric.tags["temp_zone"] = "normal"
    else:
        metric.tags["temp_zone"] = "cold"

    return metric
'''

# ─────────────────────────────────────────────────────
# PROCESSOR: Dedup - กรอง duplicate records
# ─────────────────────────────────────────────────────

[[processors.dedup]]
  namepass = ["device_status"]
  ## หากค่าเหมือนกันทุก field ภายใน 5 นาที ให้ drop
  dedup_interval = "5m"

เรื่อง Starlark — นี่คือ processor ที่ทรงพลังที่สุดในชุด! มันให้เราเขียน Python-like script เพื่อ logic ซับซ้อนได้ ในที่นี้เราใช้มันแปะ tag temp_zone ตามค่าอุณหภูมิ ทำให้ query ใน Grafana ง่ายขึ้นมาก แทนที่จะต้องเขียน threshold logic ซ้ำในทุก dashboard


Step 3: Multiple Measurement Routing

ทำไมต้อง route แยก?

เปรียบเหมือนไปรษณีย์: จดหมายธรรมดา (sensor data) กับพัสดุด่วนพิเศษ (alerts) ต้องไปคนละสาย คนละ SLA alert ต้องเก็บไว้นาน 1 ปีเพราะใช้ตรวจสอบ compliance แต่ sensor data raw เก็บแค่ 30 วันก็พอ การแยก output ทำให้จัดการ retention policy ได้ถูกต้อง

# ─────────────────────────────────────────────────────
# OUTPUT: InfluxDB - Sensor Data (raw_30d retention)
# ─────────────────────────────────────────────────────
[[outputs.influxdb]]
  ## กรองเฉพาะ measurements ที่เกี่ยวกับ sensor
  namepass = ["sensor_data"]

  urls = ["$INFLUXDB_URL"]
  database = "iot_data"
  retention_policy = "raw_30d"
  username = "$INFLUXDB_USER"
  password = "$INFLUXDB_PASSWORD"
  timeout = "10s"
  content_encoding = "gzip"
  write_consistency = "any"

  ## Tag ที่ไม่ต้องการส่งไป InfluxDB
  tagexclude = ["mqtt_topic", "host"]

# ─────────────────────────────────────────────────────
# OUTPUT: InfluxDB - Device Status
# ─────────────────────────────────────────────────────
[[outputs.influxdb]]
  namepass = ["device_status"]

  urls = ["$INFLUXDB_URL"]
  database = "iot_data"
  retention_policy = "raw_30d"
  username = "$INFLUXDB_USER"
  password = "$INFLUXDB_PASSWORD"
  timeout = "10s"
  content_encoding = "gzip"

  tagexclude = ["mqtt_topic", "host"]

# ─────────────────────────────────────────────────────
# OUTPUT: InfluxDB - Device Alerts (เก็บนานกว่า)
# ─────────────────────────────────────────────────────
[[outputs.influxdb]]
  namepass = ["device_alerts"]

  urls = ["$INFLUXDB_URL"]
  database = "iot_data"
  ## Alerts เก็บใน retention policy ที่นานกว่า (1 ปี)
  retention_policy = "hourly_1y"
  username = "$INFLUXDB_USER"
  password = "$INFLUXDB_PASSWORD"
  timeout = "10s"

  tagexclude = ["mqtt_topic", "host"]

# ─────────────────────────────────────────────────────
# OUTPUT: File (Debug - ดู metric ที่ไม่ได้ route)
# ─────────────────────────────────────────────────────
[[outputs.file]]
  ## Catch-all: log metrics ที่ไม่ match
  namedrop = ["sensor_data", "device_status", "device_alerts"]
  files = ["stdout"]
  data_format = "json"

Step 4: Performance Tuning

ทำไมต้อง tune ก่อน production?

ค่า default ของ Telegraf ออกแบบมาสำหรับ general use ไม่ใช่ IoT ที่มี sensor ส่งข้อมูลทุก 10 วินาที หากไม่ tune แล้วมี device เพิ่มขึ้นเรื่อยๆ Telegraf อาจ drop metrics หรือ InfluxDB รับไม่ทัน เหมือนร้านกาแฟที่ไม่เตรียม queue ไว้รับลูกค้าชั่วโมงเร่งด่วน

อัปเดต deployments/telegraf/telegraf.conf ส่วน [agent]:

[agent]
  ## ─── Timing ────────────────────────────────
  ## ความถี่ในการ collect metrics
  interval = "10s"

  ## ปัดเวลาให้ตรงกับ interval (เช่น :00, :10, :20)
  round_interval = true

  ## ─── Batching ───────────────────────────────
  ## จำนวน metrics ต่อ write request
  metric_batch_size = 5000

  ## buffer size ก่อน drop (ป้องกัน back pressure)
  metric_buffer_limit = 50000

  ## ─── Flushing ───────────────────────────────
  ## ความถี่ในการ flush buffer ไปยัง output
  flush_interval = "10s"

  ## jitter เพื่อกระจาย load (0 = ไม่มี jitter)
  flush_jitter = "3s"

  ## ─── Precision ──────────────────────────────
  ## ความละเอียดของ timestamp (s=second, ms=millisecond, us=microsecond, ns=nanosecond)
  ## สำหรับ IoT sensor data ที่ sample ทุก 10s ใช้ "s" พอ
  precision = "1s"

  ## ─── Concurrency ────────────────────────────
  ## จำนวน goroutine สำหรับ collection
  ## 0 = default (เท่ากับจำนวน input plugins)
  ## collection_jitter จะกระจาย collection time
  collection_jitter = "2s"

  ## ─── Logging ────────────────────────────────
  debug = false
  quiet = false
  ## log level: error, warn, info, debug
  logfile = ""
  logfile_rotation_interval = "0d"
  logfile_rotation_max_size = "100mb"
  logfile_rotation_max_archives = 5

เรื่อง flush_jitter: ค่า 3s นี้สำคัญมาก! ถ้า Telegraf หลายตัวรัน parallel แล้ว flush พร้อมกันทุก 10s ทุกตัว InfluxDB จะโดน spike load กระชากทุก 10 วินาที การใส่ jitter ทำให้แต่ละ instance flush ในเวลาต่างกันเล็กน้อย เหมือนการจัดคิวรถในลานจอด ไม่ให้ทุกคันออกพร้อมกัน

Environment Variables สำหรับ Telegraf

สร้างหรืออัปเดต deployments/.env ให้มี:

# InfluxDB
INFLUXDB_URL=http://influxdb:8086
INFLUXDB_USER=telegraf
INFLUXDB_USER_PASSWORD=telegraf123_secure

# MQTT
MQTT_BROKER=tcp://mosquitto:1883
MQTT_USERNAME=iot_client
MQTT_PASSWORD=mqtt_secure_pass

Step 5: ทดสอบ Pipeline

ทดสอบด้วย mosquitto_pub

# ส่ง sensor data จำลอง
docker exec iot-mosquitto mosquitto_pub \
  -h localhost \
  -t "iot/sensors/device_001/data" \
  -u iot_client -P mqtt_secure_pass \
  -m '{
    "device_id": "device_001",
    "location": "floor_1",
    "building": "A",
    "timestamp": '"$(date +%s)"',
    "readings": [
      {"type": "temperature", "value": 26.8, "unit": "celsius"},
      {"type": "humidity", "value": 65.2, "unit": "percent"}
    ]
  }'

# ส่ง device status จำลอง
docker exec iot-mosquitto mosquitto_pub \
  -h localhost \
  -t "iot/devices/device_001/status" \
  -u iot_client -P mqtt_secure_pass \
  -m '{
    "device_id": "device_001",
    "location": "floor_1",
    "status": "online",
    "battery_level": 85,
    "signal_strength": -62,
    "firmware_version": "1.2.3",
    "uptime_seconds": 3600,
    "timestamp": '"$(date +%s)"'
  }'

ตรวจสอบข้อมูลใน InfluxDB

# ดู measurements ที่มีอยู่
curl -G "http://localhost:8086/query" \
  -u admin:admin123 \
  --data-urlencode "db=iot_data" \
  --data-urlencode "q=SHOW MEASUREMENTS"

# Query sensor_data
curl -G "http://localhost:8086/query" \
  -u admin:admin123 \
  --data-urlencode "db=iot_data" \
  --data-urlencode "q=SELECT * FROM sensor_data WHERE time > now() - 5m ORDER BY time DESC LIMIT 20"

# Query device_status
curl -G "http://localhost:8086/query" \
  -u admin:admin123 \
  --data-urlencode "db=iot_data" \
  --data-urlencode "q=SELECT * FROM device_status WHERE time > now() - 5m"

# ดู tag values ของ device_id
curl -G "http://localhost:8086/query" \
  -u admin:admin123 \
  --data-urlencode "db=iot_data" \
  --data-urlencode "q=SHOW TAG VALUES FROM sensor_data WITH KEY = device_id"

ดู Telegraf Metrics (self-monitoring)

# ดู Telegraf logs
docker logs iot-telegraf -f --tail 50

# ดู internal metrics ของ Telegraf ใน InfluxDB
curl -G "http://localhost:8086/query" \
  -u admin:admin123 \
  --data-urlencode "db=iot_data" \
  --data-urlencode "q=SELECT * FROM internal_agent WHERE time > now() - 5m"

Step 6: Script ทดสอบแบบ Bulk

อยากรู้ว่าระบบรับ load จริงได้ไหม? มาสร้าง simulator กัน!

สร้าง scripts/simulate_sensors.sh:

#!/bin/bash
# Simulate IoT sensor data
# Usage: ./scripts/simulate_sensors.sh [num_devices] [interval_seconds]

NUM_DEVICES=${1:-5}
INTERVAL=${2:-10}
MQTT_HOST="localhost"
MQTT_USER="iot_client"
MQTT_PASS="mqtt_secure_pass"

LOCATIONS=("floor_1" "floor_2" "floor_3" "basement")
BUILDINGS=("A" "B" "C")

echo "Starting simulation: ${NUM_DEVICES} devices, every ${INTERVAL}s"
echo "Press Ctrl+C to stop"

while true; do
  for i in $(seq 1 $NUM_DEVICES); do
    DEVICE_ID="device_$(printf '%03d' $i)"
    LOC=${LOCATIONS[$((RANDOM % ${#LOCATIONS[@]}))]}
    BUILDING=${BUILDINGS[$((RANDOM % ${#BUILDINGS[@]}))]}

    # Random sensor values
    TEMP=$(echo "scale=1; 20 + $((RANDOM % 200)) / 10" | bc)
    HUMID=$(echo "scale=1; 40 + $((RANDOM % 400)) / 10" | bc)
    CO2=$((350 + RANDOM % 650))
    BATTERY=$((60 + RANDOM % 40))
    SIGNAL=$((-90 + RANDOM % 40))

    # Publish sensor data
    mosquitto_pub \
      -h "$MQTT_HOST" \
      -t "iot/sensors/${DEVICE_ID}/data" \
      -u "$MQTT_USER" -P "$MQTT_PASS" \
      -m "{
        \"device_id\": \"${DEVICE_ID}\",
        \"location\": \"${LOC}\",
        \"building\": \"${BUILDING}\",
        \"timestamp\": $(date +%s),
        \"readings\": [
          {\"type\": \"temperature\", \"value\": ${TEMP}, \"unit\": \"celsius\"},
          {\"type\": \"humidity\",    \"value\": ${HUMID}, \"unit\": \"percent\"},
          {\"type\": \"co2\",         \"value\": ${CO2},  \"unit\": \"ppm\"}
        ]
      }" &

    # Publish device status (ทุก 5 iterations)
    if (( i % 5 == 0 )); then
      mosquitto_pub \
        -h "$MQTT_HOST" \
        -t "iot/devices/${DEVICE_ID}/status" \
        -u "$MQTT_USER" -P "$MQTT_PASS" \
        -m "{
          \"device_id\": \"${DEVICE_ID}\",
          \"location\": \"${LOC}\",
          \"status\": \"online\",
          \"battery_level\": ${BATTERY},
          \"signal_strength\": ${SIGNAL},
          \"firmware_version\": \"1.2.3\",
          \"uptime_seconds\": $((RANDOM % 86400)),
          \"timestamp\": $(date +%s)
        }" &
    fi
  done

  wait  # รอ background jobs ทั้งหมด
  echo "[$(date '+%H:%M:%S')] Published data for ${NUM_DEVICES} devices"
  sleep "$INTERVAL"
done
chmod +x scripts/simulate_sensors.sh
./scripts/simulate_sensors.sh 10 5  # 10 devices, ทุก 5 วินาที

Recap: สรุปสิ่งที่เราทำวันนี้

  [MQTT]──►[Consumer]──►[Parser]──►[Processor]──►[InfluxDB]
     ╔═══════════════════════════════════════════════════╗
     ║  วันนี้เราประกอบสายพานทั้งหมดครบแล้ว! \(^o^)/   ║
     ╚═══════════════════════════════════════════════════╝
Componentรายละเอียด
MQTT Consumerรับข้อมูลจาก 3 topic patterns (sensor/status/alert)
JSON v2 Parserparse payload และแยก tags/fields
Processor: renameเปลี่ยนชื่อ measurement และ fields
Processor: converterแปลง field types ให้ถูกต้อง
Processor: regextransform tag values
Processor: starlarkcalculated fields (temp_zone)
Processor: dedupกรอง duplicate records ภายใน 5 นาที
Output routingแยก measurement ไปยัง retention policy ที่เหมาะสม
Performance tuningbatch size 5000, buffer 50000, flush 10s

Key Takeaway: Telegraf ทำให้เราไม่ต้องเขียน data pipeline code เองเลย ทุกอย่างเป็น config ทำให้ maintainable มากขึ้น และทีมที่ไม่ได้เป็น developer ก็สามารถเข้ามาช่วยดูแลได้


Next Step

ตอนนี้ข้อมูลไหลเข้า InfluxDB ได้แล้ว ขั้นต่อไปคือการตั้ง alerting อัตโนมัติด้วย Kapacitor เมื่อค่าเซนเซอร์เกิน threshold!