Telegraf Pipeline: สายพานข้อมูล IoT
Telegraf Pipeline: สายพานข้อมูล IoT
สวัสดีน้องๆ! พี่โชว์มาแล้ว วันนี้เราจะมาประกอบ “สายพานส่งข้อมูล” ที่ทำให้ข้อมูลจากเซนเซอร์ในออฟฟิศไหลไปถึง InfluxDB โดยอัตโนมัติ ไม่ต้องเขียน code เชื่อมเองสักบรรทัดเดียว มาลุยกัน! (ง •̀_•́)ง
ก่อนลุย: ทำไมเราถึงต้องการ Telegraf?
ลองนึกภาพนะ น้องๆ เปิดร้านกาแฟ แล้วมีลูกค้า (sensor) เดินเข้ามาสั่งโน่นนี่ตลอดเวลา ถ้าน้องไม่มีพนักงานรับออเดอร์ น้องก็ต้องวิ่งรับเองทุกโต๊ะ นั่นก็คือการเขียน code รับข้อมูลจากทุก sensor เอง
Telegraf คือ “พนักงานรับออเดอร์มืออาชีพ” ที่รับข้อมูลจาก MQTT, แปลง JSON, ปรับแต่งข้อมูล แล้วส่งไป InfluxDB — ทั้งหมดแค่แก้ config ไฟล์เดียว
ทำไมถึงสำคัญ?
- ไม่ต้องเขียน consumer code เอง ลดความผิดพลาด
- Config-driven ทำให้ทีมที่ไม่ใช่ dev แก้ได้ง่าย
- มี plugin ให้เลือกกว่า 300 ตัว รองรับทุก use case
- Handle back pressure และ retry ให้อัตโนมัติ
สิ่งที่น้องๆ จะได้จาก workshop นี้
- เข้าใจ flow ข้อมูลจาก MQTT → Telegraf → InfluxDB ทั้งหมด
- ตั้งค่า MQTT Consumer รับ 3 topic patterns (sensor / status / alert)
- ใช้ JSON v2 Parser แกะ payload ซับซ้อน
- ปั้นข้อมูลด้วย Processor Plugins (rename / converter / regex / starlark / dedup)
- Route ข้อมูลไปยัง retention policy ที่เหมาะสม
- Tune performance ให้รับ load จริงได้
ภาพรวม: สายพานข้อมูลของเรา
ก่อนลงมือ ขอให้น้องๆ เห็น big picture ก่อน เปรียบง่ายๆ ว่า Telegraf คือโรงงาน มี “สายพาน” หลายช่วง:
Input (รับวัตถุดิบ) → Parser (แกะกล่อง) → Processor (แปรรูป) → Output (ส่งสินค้า)
Note: Processor แต่ละตัวใช้
namepassกรองเฉพาะ measurement ที่ตัวเองรับผิดชอบ เหมือนสายพานโรงงานที่แต่ละสถานีทำงานเฉพาะผลิตภัณฑ์ของตัวเอง
MQTT Topic Structure: ทบทวนก่อนลุย
iot/sensors/{device_id}/data → ค่าเซนเซอร์ (อุณหภูมิ, ความชื้น, CO2)
iot/devices/{device_id}/status → สถานะอุปกรณ์ (online/offline, battery)
iot/devices/{device_id}/alert → alert จากอุปกรณ์
iot/system/heartbeat → สุขภาพระบบ
ตัวอย่าง Payload จริงๆ ที่เราจะรับ
Sensor Data (iot/sensors/device_001/data):
{
"device_id": "device_001",
"location": "floor_1",
"building": "A",
"timestamp": 1711411200,
"readings": [
{"type": "temperature", "value": 25.3, "unit": "celsius"},
{"type": "humidity", "value": 62.5, "unit": "percent"},
{"type": "co2", "value": 450, "unit": "ppm"}
]
}
Device Status (iot/devices/device_001/status):
{
"device_id": "device_001",
"location": "floor_1",
"status": "online",
"battery_level": 87,
"signal_strength": -65,
"firmware_version": "1.2.3",
"uptime_seconds": 86400,
"timestamp": 1711411200
}
Step 1: MQTT Consumer Input Plugin
ทำไมต้องมี Consumer แยกกัน 3 ตัว?
เหมือนร้านสะดวกซื้อที่มีพนักงานดูแลแต่ละโซน — พนักงานโซนอาหารไม่ต้องไปยุ่งกับโซนเครื่องดื่ม แยก consumer ทำให้เรา config แต่ละ topic ได้อิสระ เช่น alert ใช้ QoS 2 (exactly-once) เพราะพลาดไม่ได้ แต่ sensor data ใช้ QoS 1 ก็พอ
สร้างไฟล์ deployments/telegraf/conf.d/mqtt.conf:
# ─────────────────────────────────────────────────────
# INPUT: MQTT Consumer - Sensor Data
# ─────────────────────────────────────────────────────
[[inputs.mqtt_consumer]]
## MQTT broker URLs
servers = ["$MQTT_BROKER"]
## Topics to subscribe to
topics = [
"iot/sensors/+/data",
]
## QoS subscription level
qos = 1
## Connection timeout
connection_timeout = "30s"
keepalive = "60s"
## Authentication
username = "$MQTT_USERNAME"
password = "$MQTT_PASSWORD"
## Client ID (unique per agent instance)
client_id = "telegraf-sensor-consumer"
## Message routing
## ชื่อ measurement ที่จะเขียนลง InfluxDB (ก่อน processor จะ rename)
data_format = "json_v2"
topic_tag = "mqtt_topic"
## แปลง topic path เป็น tag
## iot/sensors/device_001/data → device_id = device_001
[[inputs.mqtt_consumer.topic_parsing]]
topic = "iot/sensors/+/data"
measurement = "_/_/_/_"
tags = "_/sensors/<device_id>/_"
fields = ""
## JSON v2 Parser Configuration
[[inputs.mqtt_consumer.json_v2]]
measurement_name = "raw_sensor_data"
timestamp_path = "timestamp"
timestamp_format = "unix"
timestamp_timezone = "UTC"
# Tags จาก JSON payload
[[inputs.mqtt_consumer.json_v2.tag]]
path = "device_id"
rename = "device_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "location"
rename = "location"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "building"
rename = "building"
# อ่าน readings array - temperature
[[inputs.mqtt_consumer.json_v2.object]]
path = "readings"
tags = ["type"]
disable_prepend_keys = true
[inputs.mqtt_consumer.json_v2.object.renames]
value = "sensor_value"
unit = "sensor_unit"
# ─────────────────────────────────────────────────────
# INPUT: MQTT Consumer - Device Status
# ─────────────────────────────────────────────────────
[[inputs.mqtt_consumer]]
servers = ["$MQTT_BROKER"]
topics = ["iot/devices/+/status"]
qos = 1
connection_timeout = "30s"
username = "$MQTT_USERNAME"
password = "$MQTT_PASSWORD"
client_id = "telegraf-status-consumer"
data_format = "json_v2"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "iot/devices/+/status"
measurement = "_/_/_/_"
tags = "_/devices/<device_id>/_"
[[inputs.mqtt_consumer.json_v2]]
measurement_name = "device_status"
timestamp_path = "timestamp"
timestamp_format = "unix"
timestamp_timezone = "UTC"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "device_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "location"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "status"
rename = "online_status"
[[inputs.mqtt_consumer.json_v2.field]]
path = "battery_level"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "signal_strength"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "uptime_seconds"
type = "int"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "firmware_version"
# ─────────────────────────────────────────────────────
# INPUT: MQTT Consumer - Device Alerts
# ─────────────────────────────────────────────────────
[[inputs.mqtt_consumer]]
servers = ["$MQTT_BROKER"]
topics = ["iot/devices/+/alert"]
qos = 2 # exactly-once สำหรับ alerts
connection_timeout = "30s"
username = "$MQTT_USERNAME"
password = "$MQTT_PASSWORD"
client_id = "telegraf-alert-consumer"
data_format = "json_v2"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "iot/devices/+/alert"
measurement = "_/_/_/_"
tags = "_/devices/<device_id>/_"
[[inputs.mqtt_consumer.json_v2]]
measurement_name = "device_alerts"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "device_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "alert_type"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "severity"
[[inputs.mqtt_consumer.json_v2.field]]
path = "message"
type = "string"
[[inputs.mqtt_consumer.json_v2.field]]
path = "value"
type = "float"
เรื่อง
topic_parsing: ส่วนนี้เจ๋งมาก! มันแกะiot/sensors/device_001/dataออกมาเป็น tagdevice_id = device_001ให้อัตโนมัติ ไม่ต้องไป regex เอง
Step 2: Processor Plugins — แปรรูปข้อมูล
ทำไม Processor ถึงสำคัญ?
ลองนึกถึงโรงงานน้ำผลไม้ — ผลไม้ดิบ (raw data) ที่รับมาจาก MQTT ยังเอาไปขายไม่ได้ ต้องผ่านสายพานแปรรูปก่อน: ล้าง (rename), คัดขนาด (converter type), ตัดแต่ง (regex) แล้วถึงได้ผลิตภัณฑ์พร้อมวาง shelf
สร้างไฟล์ deployments/telegraf/conf.d/processors.conf:
# ─────────────────────────────────────────────────────
# PROCESSOR: Rename - แปลงชื่อ measurement และ fields
# ─────────────────────────────────────────────────────
## ย้าย raw_sensor_data → sensor_data และ rename fields
[[processors.rename]]
namepass = ["raw_sensor_data"]
[[processors.rename.replace]]
measurement = "raw_sensor_data"
dest = "sensor_data"
[[processors.rename.replace]]
field = "sensor_value"
dest = "value"
[[processors.rename.replace]]
tag = "type"
dest = "measurement_type"
# ─────────────────────────────────────────────────────
# PROCESSOR: Converter - แปลง type ของ fields
# ─────────────────────────────────────────────────────
[[processors.converter]]
namepass = ["sensor_data"]
[processors.converter.fields]
## แปลงค่าที่อาจมาเป็น string ให้เป็น float
float = ["value"]
[[processors.converter]]
namepass = ["device_status"]
[processors.converter.fields]
float = ["battery_level", "signal_strength"]
integer = ["uptime_seconds"]
# ─────────────────────────────────────────────────────
# PROCESSOR: Regex - แปลง tag values
# ─────────────────────────────────────────────────────
## ทำให้ online_status เป็นตัวพิมพ์เล็กทั้งหมด
[[processors.regex]]
namepass = ["device_status"]
[[processors.regex.tags]]
key = "online_status"
pattern = "^(.+)$"
replacement = "${1}"
result_key = "status_normalized"
## แยก location ออกเป็น building code จาก format "building_A_floor_1"
[[processors.regex]]
namepass = ["sensor_data", "device_status"]
[[processors.regex.tags]]
key = "location"
pattern = "^(floor_\\d+)$"
replacement = "${1}"
result_key = "floor"
# ─────────────────────────────────────────────────────
# PROCESSOR: Starlark - Logic ที่ซับซ้อน
# (ใช้สำหรับ calculated fields)
# ─────────────────────────────────────────────────────
[[processors.starlark]]
namepass = ["sensor_data"]
source = '''
def apply(metric):
# คำนวณ Heat Index จาก temperature และ humidity
mtype = metric.tags.get("measurement_type", "")
if mtype != "temperature":
return metric
# เพิ่ม tag สำหรับ alert threshold
val = metric.fields.get("value", 0.0)
if val > 35.0:
metric.tags["temp_zone"] = "critical"
elif val > 30.0:
metric.tags["temp_zone"] = "warning"
elif val > 15.0:
metric.tags["temp_zone"] = "normal"
else:
metric.tags["temp_zone"] = "cold"
return metric
'''
# ─────────────────────────────────────────────────────
# PROCESSOR: Dedup - กรอง duplicate records
# ─────────────────────────────────────────────────────
[[processors.dedup]]
namepass = ["device_status"]
## หากค่าเหมือนกันทุก field ภายใน 5 นาที ให้ drop
dedup_interval = "5m"
เรื่อง Starlark — นี่คือ processor ที่ทรงพลังที่สุดในชุด! มันให้เราเขียน Python-like script เพื่อ logic ซับซ้อนได้ ในที่นี้เราใช้มันแปะ tag temp_zone ตามค่าอุณหภูมิ ทำให้ query ใน Grafana ง่ายขึ้นมาก แทนที่จะต้องเขียน threshold logic ซ้ำในทุก dashboard
Step 3: Multiple Measurement Routing
ทำไมต้อง route แยก?
เปรียบเหมือนไปรษณีย์: จดหมายธรรมดา (sensor data) กับพัสดุด่วนพิเศษ (alerts) ต้องไปคนละสาย คนละ SLA alert ต้องเก็บไว้นาน 1 ปีเพราะใช้ตรวจสอบ compliance แต่ sensor data raw เก็บแค่ 30 วันก็พอ การแยก output ทำให้จัดการ retention policy ได้ถูกต้อง
# ─────────────────────────────────────────────────────
# OUTPUT: InfluxDB - Sensor Data (raw_30d retention)
# ─────────────────────────────────────────────────────
[[outputs.influxdb]]
## กรองเฉพาะ measurements ที่เกี่ยวกับ sensor
namepass = ["sensor_data"]
urls = ["$INFLUXDB_URL"]
database = "iot_data"
retention_policy = "raw_30d"
username = "$INFLUXDB_USER"
password = "$INFLUXDB_PASSWORD"
timeout = "10s"
content_encoding = "gzip"
write_consistency = "any"
## Tag ที่ไม่ต้องการส่งไป InfluxDB
tagexclude = ["mqtt_topic", "host"]
# ─────────────────────────────────────────────────────
# OUTPUT: InfluxDB - Device Status
# ─────────────────────────────────────────────────────
[[outputs.influxdb]]
namepass = ["device_status"]
urls = ["$INFLUXDB_URL"]
database = "iot_data"
retention_policy = "raw_30d"
username = "$INFLUXDB_USER"
password = "$INFLUXDB_PASSWORD"
timeout = "10s"
content_encoding = "gzip"
tagexclude = ["mqtt_topic", "host"]
# ─────────────────────────────────────────────────────
# OUTPUT: InfluxDB - Device Alerts (เก็บนานกว่า)
# ─────────────────────────────────────────────────────
[[outputs.influxdb]]
namepass = ["device_alerts"]
urls = ["$INFLUXDB_URL"]
database = "iot_data"
## Alerts เก็บใน retention policy ที่นานกว่า (1 ปี)
retention_policy = "hourly_1y"
username = "$INFLUXDB_USER"
password = "$INFLUXDB_PASSWORD"
timeout = "10s"
tagexclude = ["mqtt_topic", "host"]
# ─────────────────────────────────────────────────────
# OUTPUT: File (Debug - ดู metric ที่ไม่ได้ route)
# ─────────────────────────────────────────────────────
[[outputs.file]]
## Catch-all: log metrics ที่ไม่ match
namedrop = ["sensor_data", "device_status", "device_alerts"]
files = ["stdout"]
data_format = "json"
Step 4: Performance Tuning
ทำไมต้อง tune ก่อน production?
ค่า default ของ Telegraf ออกแบบมาสำหรับ general use ไม่ใช่ IoT ที่มี sensor ส่งข้อมูลทุก 10 วินาที หากไม่ tune แล้วมี device เพิ่มขึ้นเรื่อยๆ Telegraf อาจ drop metrics หรือ InfluxDB รับไม่ทัน เหมือนร้านกาแฟที่ไม่เตรียม queue ไว้รับลูกค้าชั่วโมงเร่งด่วน
อัปเดต deployments/telegraf/telegraf.conf ส่วน [agent]:
[agent]
## ─── Timing ────────────────────────────────
## ความถี่ในการ collect metrics
interval = "10s"
## ปัดเวลาให้ตรงกับ interval (เช่น :00, :10, :20)
round_interval = true
## ─── Batching ───────────────────────────────
## จำนวน metrics ต่อ write request
metric_batch_size = 5000
## buffer size ก่อน drop (ป้องกัน back pressure)
metric_buffer_limit = 50000
## ─── Flushing ───────────────────────────────
## ความถี่ในการ flush buffer ไปยัง output
flush_interval = "10s"
## jitter เพื่อกระจาย load (0 = ไม่มี jitter)
flush_jitter = "3s"
## ─── Precision ──────────────────────────────
## ความละเอียดของ timestamp (s=second, ms=millisecond, us=microsecond, ns=nanosecond)
## สำหรับ IoT sensor data ที่ sample ทุก 10s ใช้ "s" พอ
precision = "1s"
## ─── Concurrency ────────────────────────────
## จำนวน goroutine สำหรับ collection
## 0 = default (เท่ากับจำนวน input plugins)
## collection_jitter จะกระจาย collection time
collection_jitter = "2s"
## ─── Logging ────────────────────────────────
debug = false
quiet = false
## log level: error, warn, info, debug
logfile = ""
logfile_rotation_interval = "0d"
logfile_rotation_max_size = "100mb"
logfile_rotation_max_archives = 5
เรื่อง flush_jitter: ค่า 3s นี้สำคัญมาก! ถ้า Telegraf หลายตัวรัน parallel แล้ว flush พร้อมกันทุก 10s ทุกตัว InfluxDB จะโดน spike load กระชากทุก 10 วินาที การใส่ jitter ทำให้แต่ละ instance flush ในเวลาต่างกันเล็กน้อย เหมือนการจัดคิวรถในลานจอด ไม่ให้ทุกคันออกพร้อมกัน
Environment Variables สำหรับ Telegraf
สร้างหรืออัปเดต deployments/.env ให้มี:
# InfluxDB
INFLUXDB_URL=http://influxdb:8086
INFLUXDB_USER=telegraf
INFLUXDB_USER_PASSWORD=telegraf123_secure
# MQTT
MQTT_BROKER=tcp://mosquitto:1883
MQTT_USERNAME=iot_client
MQTT_PASSWORD=mqtt_secure_pass
Step 5: ทดสอบ Pipeline
ทดสอบด้วย mosquitto_pub
# ส่ง sensor data จำลอง
docker exec iot-mosquitto mosquitto_pub \
-h localhost \
-t "iot/sensors/device_001/data" \
-u iot_client -P mqtt_secure_pass \
-m '{
"device_id": "device_001",
"location": "floor_1",
"building": "A",
"timestamp": '"$(date +%s)"',
"readings": [
{"type": "temperature", "value": 26.8, "unit": "celsius"},
{"type": "humidity", "value": 65.2, "unit": "percent"}
]
}'
# ส่ง device status จำลอง
docker exec iot-mosquitto mosquitto_pub \
-h localhost \
-t "iot/devices/device_001/status" \
-u iot_client -P mqtt_secure_pass \
-m '{
"device_id": "device_001",
"location": "floor_1",
"status": "online",
"battery_level": 85,
"signal_strength": -62,
"firmware_version": "1.2.3",
"uptime_seconds": 3600,
"timestamp": '"$(date +%s)"'
}'
ตรวจสอบข้อมูลใน InfluxDB
# ดู measurements ที่มีอยู่
curl -G "http://localhost:8086/query" \
-u admin:admin123 \
--data-urlencode "db=iot_data" \
--data-urlencode "q=SHOW MEASUREMENTS"
# Query sensor_data
curl -G "http://localhost:8086/query" \
-u admin:admin123 \
--data-urlencode "db=iot_data" \
--data-urlencode "q=SELECT * FROM sensor_data WHERE time > now() - 5m ORDER BY time DESC LIMIT 20"
# Query device_status
curl -G "http://localhost:8086/query" \
-u admin:admin123 \
--data-urlencode "db=iot_data" \
--data-urlencode "q=SELECT * FROM device_status WHERE time > now() - 5m"
# ดู tag values ของ device_id
curl -G "http://localhost:8086/query" \
-u admin:admin123 \
--data-urlencode "db=iot_data" \
--data-urlencode "q=SHOW TAG VALUES FROM sensor_data WITH KEY = device_id"
ดู Telegraf Metrics (self-monitoring)
# ดู Telegraf logs
docker logs iot-telegraf -f --tail 50
# ดู internal metrics ของ Telegraf ใน InfluxDB
curl -G "http://localhost:8086/query" \
-u admin:admin123 \
--data-urlencode "db=iot_data" \
--data-urlencode "q=SELECT * FROM internal_agent WHERE time > now() - 5m"
Step 6: Script ทดสอบแบบ Bulk
อยากรู้ว่าระบบรับ load จริงได้ไหม? มาสร้าง simulator กัน!
สร้าง scripts/simulate_sensors.sh:
#!/bin/bash
# Simulate IoT sensor data
# Usage: ./scripts/simulate_sensors.sh [num_devices] [interval_seconds]
NUM_DEVICES=${1:-5}
INTERVAL=${2:-10}
MQTT_HOST="localhost"
MQTT_USER="iot_client"
MQTT_PASS="mqtt_secure_pass"
LOCATIONS=("floor_1" "floor_2" "floor_3" "basement")
BUILDINGS=("A" "B" "C")
echo "Starting simulation: ${NUM_DEVICES} devices, every ${INTERVAL}s"
echo "Press Ctrl+C to stop"
while true; do
for i in $(seq 1 $NUM_DEVICES); do
DEVICE_ID="device_$(printf '%03d' $i)"
LOC=${LOCATIONS[$((RANDOM % ${#LOCATIONS[@]}))]}
BUILDING=${BUILDINGS[$((RANDOM % ${#BUILDINGS[@]}))]}
# Random sensor values
TEMP=$(echo "scale=1; 20 + $((RANDOM % 200)) / 10" | bc)
HUMID=$(echo "scale=1; 40 + $((RANDOM % 400)) / 10" | bc)
CO2=$((350 + RANDOM % 650))
BATTERY=$((60 + RANDOM % 40))
SIGNAL=$((-90 + RANDOM % 40))
# Publish sensor data
mosquitto_pub \
-h "$MQTT_HOST" \
-t "iot/sensors/${DEVICE_ID}/data" \
-u "$MQTT_USER" -P "$MQTT_PASS" \
-m "{
\"device_id\": \"${DEVICE_ID}\",
\"location\": \"${LOC}\",
\"building\": \"${BUILDING}\",
\"timestamp\": $(date +%s),
\"readings\": [
{\"type\": \"temperature\", \"value\": ${TEMP}, \"unit\": \"celsius\"},
{\"type\": \"humidity\", \"value\": ${HUMID}, \"unit\": \"percent\"},
{\"type\": \"co2\", \"value\": ${CO2}, \"unit\": \"ppm\"}
]
}" &
# Publish device status (ทุก 5 iterations)
if (( i % 5 == 0 )); then
mosquitto_pub \
-h "$MQTT_HOST" \
-t "iot/devices/${DEVICE_ID}/status" \
-u "$MQTT_USER" -P "$MQTT_PASS" \
-m "{
\"device_id\": \"${DEVICE_ID}\",
\"location\": \"${LOC}\",
\"status\": \"online\",
\"battery_level\": ${BATTERY},
\"signal_strength\": ${SIGNAL},
\"firmware_version\": \"1.2.3\",
\"uptime_seconds\": $((RANDOM % 86400)),
\"timestamp\": $(date +%s)
}" &
fi
done
wait # รอ background jobs ทั้งหมด
echo "[$(date '+%H:%M:%S')] Published data for ${NUM_DEVICES} devices"
sleep "$INTERVAL"
done
chmod +x scripts/simulate_sensors.sh
./scripts/simulate_sensors.sh 10 5 # 10 devices, ทุก 5 วินาที
Recap: สรุปสิ่งที่เราทำวันนี้
[MQTT]──►[Consumer]──►[Parser]──►[Processor]──►[InfluxDB]
╔═══════════════════════════════════════════════════╗
║ วันนี้เราประกอบสายพานทั้งหมดครบแล้ว! \(^o^)/ ║
╚═══════════════════════════════════════════════════╝
| Component | รายละเอียด |
|---|---|
| MQTT Consumer | รับข้อมูลจาก 3 topic patterns (sensor/status/alert) |
| JSON v2 Parser | parse payload และแยก tags/fields |
| Processor: rename | เปลี่ยนชื่อ measurement และ fields |
| Processor: converter | แปลง field types ให้ถูกต้อง |
| Processor: regex | transform tag values |
| Processor: starlark | calculated fields (temp_zone) |
| Processor: dedup | กรอง duplicate records ภายใน 5 นาที |
| Output routing | แยก measurement ไปยัง retention policy ที่เหมาะสม |
| Performance tuning | batch size 5000, buffer 50000, flush 10s |
Key Takeaway: Telegraf ทำให้เราไม่ต้องเขียน data pipeline code เองเลย ทุกอย่างเป็น config ทำให้ maintainable มากขึ้น และทีมที่ไม่ได้เป็น developer ก็สามารถเข้ามาช่วยดูแลได้
Next Step
ตอนนี้ข้อมูลไหลเข้า InfluxDB ได้แล้ว ขั้นต่อไปคือการตั้ง alerting อัตโนมัติด้วย Kapacitor เมื่อค่าเซนเซอร์เกิน threshold!
- ก่อนหน้า: IoT Workshop #10: TICK Stack Setup
- ถัดไป: IoT Workshop #12: Kapacitor Alerting