ติดตั้ง InfluxDB 2.7 + Telegraf เป็นคลังข้อมูล IoT
Branch:
step-07-influx-setup(โครงรวมจบที่step-09-alerting) Phase: Development (7/9) — Time-series Storage Repo: kangana1024/showkhun-workshop
ถ้าเรามองว่า sensor คือคนที่เก็บข้อมูล แล้วถามว่า — ข้อมูลพวกนั้นไปนอนอยู่ที่ไหน? ใครจัดเก็บ? ใครลบของเก่าทิ้งให้?
คำตอบคือ InfluxDB ครับ — time-series database ที่เป็น “คลังเก็บของ” ของระบบ IoT เรา วันนี้เราจะติดตั้ง InfluxDB 2.7 คู่กับ Telegraf ด้วย Docker Compose พร้อม config ที่ใช้ในโปรเจกต์จริง มาลุยกันเลย! (ง •̀_•́)ง
หมายเหตุก่อนเริ่ม: ตอนแรกๆ ที่วาง draft เราเคยคิดจะใช้ “TICK Stack” (InfluxDB 1.8 + Telegraf + Chronograf + Kapacitor) แต่พอลงมือทำจริง โปรเจกต์นี้เลือก InfluxDB 2.7 + Flux เป็น query language และเขียน alerting engine เป็น Go เอง (จะเล่าใน ตอนที่ 9) — Chronograf/Kapacitor เลยไม่ได้ใช้ บทความนี้จึงอ้างอิงตามโค้ดจริงทั้งหมด
น้องๆ จะได้อะไรจาก workshop นี้
- เข้าใจว่า ทำไม IoT ต้องใช้ time-series database แทน relational ธรรมดา
- รู้ความต่างของ InfluxDB 2.x (Org / Bucket / Token / Flux) กับ 1.x แบบเก่า
- ติดตั้ง InfluxDB 2.7 + Telegraf ด้วย Docker Compose ที่ pin version ครบ
- เซ็ตอัป Org, Bucket, Retention และ bucket สำหรับ downsampled แยกต่างหาก
- เข้าใจว่า backend (Go) กับ Telegraf เขียนลงคนละ measurement ได้ยังไงโดยไม่ตีกัน
- รัน + ทดสอบ connectivity ทั้งระบบ
ก่อนอื่น — ทำไมต้อง Time-series Database?
ลองนึกภาพว่าอาคารมีเซ็นเซอร์วัดอุณหภูมิ 500 ตัว ส่งข้อมูลทุก 5 วินาที
ถ้าเราเก็บลง MySQL แบบปกติ… ตารางจะพองขึ้น หลายล้านแถวต่อวัน query ช้า index พัง ชีวิตพัง
InfluxDB ถูกออกแบบมาเพื่อเรื่องนี้โดยเฉพาะ — มันรู้ว่าข้อมูลมี “เวลา” เป็นแกนหลัก เก็บได้เร็ว query ได้เร็ว และที่เด็ดสุดคือ ลบข้อมูลเก่าให้อัตโนมัติ ตาม retention ที่เราตั้งไว้ ไม่ต้องมานั่งเขียน cron ลบเอง
อุปมา: InfluxDB เหมือนตู้เก็บเอกสารอัจฉริยะ ที่รู้เองว่า “เอกสารเก่าเกิน 30 วัน ฉีกทิ้งได้” โดยไม่ต้องรอเลขาฯ มาสั่ง
InfluxDB 2.x ต่างจาก 1.x ยังไง?
อันนี้สำคัญมาก เพราะถ้าเปิด tutorial เก่าๆ ในเน็ต ส่วนใหญ่ยังเป็น 1.x ซึ่ง concept คนละเรื่องกันเลย:
| แนวคิด | InfluxDB 1.x (เก่า) | InfluxDB 2.x (ที่เราใช้) |
|---|---|---|
| หน่วยเก็บข้อมูล | database + retention policy |
bucket (รวม retention มาในตัว) |
| การยืนยันตัวตน | username / password | API token |
| การจัดกลุ่ม | ไม่มี | org (organization) |
| ภาษา query | InfluxQL (คล้าย SQL) | Flux (เป็น pipeline |>) |
ในโปรเจกต์นี้ทุกอย่างวิ่งบน 2.x หมด: เราคุยกับ InfluxDB ด้วย token, เขียน/อ่านเข้า bucket ภายใต้ org ชื่อ showkhun และ query ด้วย Flux (ตอนหน้า จะเจอ Flux เต็มๆ)
ภาพรวม Architecture
ก่อนลงมือ เราต้องรู้ว่าข้อมูลไหลยังไง:
graph LR
A[🌡️ Sensors] -->|publish| B[📡 Mosquitto MQTT]
B -->|subscribe| C[🤖 Telegraf]
B -->|subscribe| D[🖥️ Go Backend]
C -->|telegraf_sensor_data| E[(💾 InfluxDB 2.7)]
D -->|sensor_data| E
D -->|Flux query| F[📊 REST API]
สังเกตว่าข้อมูลจาก MQTT มี 2 ทางวิ่งเข้า InfluxDB ขนานกัน — ทั้ง Telegraf และ Go backend ต่าง subscribe topic เดียวกัน แต่เขียนคนละ measurement (เดี๋ยวเราจะเคลียร์เรื่องนี้กัน) ส่วนการ “อ่าน” ออกไปให้ frontend เป็นหน้าที่ของ Go backend ที่ยิง Flux query
Step 1: เลือก Image แล้ว pin version ให้ครบ
ทำไม ต้อง pin version ถึง patch? เพราะ workshop ต้อง reproduce ได้เป๊ะทุกครั้ง ถ้าปล่อย :latest วันนี้ลงได้ พรุ่งนี้ image อัปเดตแล้วพังก็ปวดหัว เหมือนสั่งกาแฟสูตรเดิมแต่บาริสต้าเปลี่ยนทุกวัน
นี่คือ version จริงที่โปรเจกต์ pin ไว้ (จาก infra/docker-compose.yml):
mongodb : mongo:8.0.16 # device registry, users, alert rules/history
mosquitto : eclipse-mosquitto:2.0.22 # MQTT broker
influxdb : influxdb:2.7.12 # time-series storage
telegraf : telegraf:1.39.0 # MQTT -> InfluxDB ingestion agent
ในตอนนี้เราโฟกัสที่ influxdb กับ telegraf เป็นหลัก (mongodb/mosquitto ตั้งไว้ตั้งแต่ตอนก่อนๆ แล้ว)
Step 2: Docker Compose สำหรับ InfluxDB 2.7
ทำไม ถึงใช้ Docker Compose? เพราะเราต้องการให้ทุก container อยู่ใน network เดียวกัน คุยกันด้วย hostname (influxdb, mosquitto) โดยไม่ต้องจำ IP — เหมือนคนในบ้านเดียวกันตะโกนเรียกชื่อกันได้เลย
จุดที่ต่างจาก InfluxDB 1.x แบบชัดเจนคือ DOCKER_INFLUXDB_INIT_* — มันคือ “setup mode” ที่ตอน container บูตครั้งแรกจะสร้าง org, bucket, user, และ admin token ให้เสร็จในก้าวเดียว ไม่ต้องเข้าไป config มือทีหลัง
# infra/docker-compose.yml (เฉพาะ 2 service ที่เกี่ยวกับตอนนี้)
name: showkhun-iot
services:
influxdb:
image: influxdb:2.7.12
container_name: showkhun-influxdb
restart: unless-stopped
ports:
- "${INFLUX_PORT:-8086}:8086"
environment:
# setup mode = บูตครั้งแรกสร้าง org/bucket/token ให้เลย
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: ${INFLUX_USERNAME:-admin}
DOCKER_INFLUXDB_INIT_PASSWORD: ${INFLUX_PASSWORD:-adminpassword}
DOCKER_INFLUXDB_INIT_ORG: ${INFLUX_ORG:-showkhun}
DOCKER_INFLUXDB_INIT_BUCKET: ${INFLUX_BUCKET:-iot_workshop}
DOCKER_INFLUXDB_INIT_RETENTION: ${INFLUX_RETENTION:-30d}
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${INFLUX_TOKEN:-dev-influx-token-change-me}
volumes:
- influx_data:/var/lib/influxdb2 # ข้อมูลจริง (2.x ใช้ /var/lib/influxdb2)
- influx_config:/etc/influxdb2 # config + token
healthcheck:
test: ["CMD", "influx", "ping"]
interval: 10s
timeout: 5s
retries: 5
start_period: 15s
telegraf:
image: telegraf:1.39.0
container_name: showkhun-telegraf
restart: unless-stopped
depends_on:
influxdb:
condition: service_healthy # รอ InfluxDB พร้อมก่อนค่อยเริ่ม
mosquitto:
condition: service_healthy
environment:
# ส่ง secret เข้าทาง env ไม่ hard-code ในไฟล์ config
INFLUX_TOKEN: ${INFLUX_TOKEN:-dev-influx-token-change-me}
INFLUX_ORG: ${INFLUX_ORG:-showkhun}
INFLUX_BUCKET: ${INFLUX_BUCKET:-iot_workshop}
MQTT_BROKER_URL: tcp://mosquitto:1883
volumes:
- ./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
volumes:
influx_data:
influx_config:
สังเกต depends_on: ... condition: service_healthy — Telegraf จะรอจน InfluxDB และ Mosquitto ผ่าน healthcheck ก่อนค่อยสตาร์ท ไม่งั้น connect ไม่ติดตั้งแต่แรก (ˊ•͈ ꇴ •͈ˋ)
Step 3: ตั้งค่า .env (Org / Bucket / Token)
ทำไม ต้องแยก secret ออกมาไว้ใน .env? เพราะ token คือกุญแจบ้าน ถ้าเผลอ commit ขึ้น git ก็เหมือนแปะกุญแจไว้หน้าประตู โปรเจกต์เลยให้ copy จาก .env.example มาเป็น .env แล้วแก้ค่าจริงเอง (.env อยู่ใน .gitignore)
สร้าง infra/.env จาก template:
cd infra
cp .env.example .env
ค่าใน .env.example (ส่วน InfluxDB) หน้าตาแบบนี้ — ใช้ค่า default สำหรับ local ได้เลย แต่ production ต้องเปลี่ยน INFLUX_TOKEN เป็น token จริงที่แข็งแรง:
# --- InfluxDB 2.x ---
INFLUX_PORT=8086
INFLUX_USERNAME=admin
INFLUX_PASSWORD=adminpassword
INFLUX_ORG=showkhun
INFLUX_BUCKET=iot_workshop
INFLUX_RETENTION=30d
# Generate a strong token for anything beyond local development.
INFLUX_TOKEN=dev-influx-token-change-me
สำคัญ:
INFLUX_TOKENตัวนี้ต้องตรงกันทั้ง 3 ที่: container ของ InfluxDB (เป็น admin token), Telegraf (เอาไว้เขียน), และ Go backend (เอาไว้ทั้งเขียนและอ่าน) — ถ้าไม่ตรง จะเขียนไม่เข้าหรืออ่านไม่ออก
Step 4: Retention และ Bucket แบบ Downsampled
นี่คือส่วนที่หลายคนพลาด เราคุยเรื่อง ทำไม ก่อนเสมอ
ทำไมต้องมี Retention?
ใน InfluxDB 2.x retention ผูกมากับ bucket เลย ไม่ได้แยกเป็น “retention policy” เหมือน 1.x — bucket หนึ่งใบมี retention หนึ่งค่า ในโปรเจกต์เราตั้ง raw bucket ไว้ 30 วัน เก็บข้อมูลละเอียด พอเลย 30 วัน InfluxDB ลบให้เอง
ทำไมต้องมี bucket “downsampled” แยก?
raw data ละเอียดมากแต่ก็กินที่มาก เราเลยอยากเก็บ “ข้อมูลสรุป” (เช่น ค่าเฉลี่ยรายชั่วโมง) ไว้นานกว่า โดยไม่ต้องแบกข้อมูลดิบ — โปรเจกต์เลยมี bucket ที่ 2 ชื่อ iot_workshop_downsampled เก็บนานถึง 1 ปี
อุปมา: raw bucket คือสมุดจดทุกนาที (เก็บ 1 เดือนพอ), downsampled bucket คือสรุปรายเดือนใส่ใน Excel (เก็บไว้เป็นปีก็ไม่หนัก)
ฝั่ง Go backend มี routine ชื่อ EnsureBuckets ที่ตอนสตาร์ทจะ สร้าง bucket ให้อัตโนมัติถ้ายังไม่มี และปรับ retention ให้ตรงถ้าเพี้ยน — เป็น idempotent เรียกกี่รอบก็ปลอดภัย โค้ดจริงจาก backend/internal/database/influx_setup.go:
// EnsureBuckets สร้าง bucket ที่ระบบต้องใช้ถ้ายังไม่มี และจัด retention ให้ตรง
// เรียกได้ทุกครั้งตอน startup (idempotent)
func (i *Influx) EnsureBuckets(ctx context.Context, cfg config.InfluxConfig) error {
specs := []BucketSpec{
{Name: cfg.Bucket, Retention: cfg.RawRetention}, // raw 30 วัน
}
// ถ้าตั้ง downsample bucket ไว้และไม่ซ้ำกับ raw → เพิ่มเข้าไปด้วย
if cfg.DownsampleBucket != "" && cfg.DownsampleBucket != cfg.Bucket {
specs = append(specs, BucketSpec{
Name: cfg.DownsampleBucket,
Retention: cfg.DownsampleRetention, // 1 ปี
})
}
orgAPI := i.client.OrganizationsAPI()
org, err := orgAPI.FindOrganizationByName(ctx, i.org)
if err != nil {
return fmt.Errorf("ensure buckets: find org %q: %w", i.org, err)
}
for _, spec := range specs {
if err := i.ensureBucket(ctx, org, spec); err != nil {
return err
}
}
return nil
}
ค่า bucket/retention เหล่านี้มาจาก config ของ backend (backend/internal/config/config.go) ซึ่ง default ตรงกับ docker-compose:
INFLUX_ORG = showkhun
INFLUX_BUCKET = iot_workshop # raw
INFLUX_DOWNSAMPLE_BUCKET = iot_workshop_downsampled # สรุป
INFLUX_RAW_RETENTION = 720h (= 30 วัน)
INFLUX_DOWNSAMPLE_RETENTION = 8760h (= 365 วัน)
INFLUX_MEASUREMENT = sensor_data
INFLUX_ENSURE_BUCKETS = true # ให้ backend สร้าง/จัด bucket ให้ตอนบูต
เกร็ด: duration ในนี้ใช้หน่วยชั่วโมง (
720h) เพราะ Go duration ไม่มีหน่วย “วัน” (d) — ก็เลยต้องคูณเอา 30×24 = 720 นั่นแหละครับ
Step 5: เข้าใจ “สองทางเขียน” — Telegraf vs Go Backend
นี่คือจุดที่ workshop นี้จงใจออกแบบให้เห็น 2 วิธี ingestion ข้างกัน เพื่อเทียบ:
graph TD
M[📡 devices/+/telemetry] --> T[🤖 Telegraf]
M --> G[🖥️ Go Backend]
T -->|measurement: telegraf_sensor_data| B[(iot_workshop bucket)]
G -->|measurement: sensor_data| B
ทั้งสองตัว subscribe topic เดียวกัน (devices/+/telemetry) และเขียนลง bucket เดียวกัน (iot_workshop) — แต่เขียนคนละ measurement ถ้าเขียน measurement เดียวกัน ทุก reading จะถูกเก็บซ้ำสองรอบทันที! โปรเจกต์เลยให้:
- Telegraf เขียน measurement
telegraf_sensor_data(pure config, ไม่มี custom code) - Go backend เขียน measurement
sensor_data(validate กับ device registry, เติม tag, fan-out ออก WebSocket)
ใน production จริงเลือกใช้ทางเดียวพอ — แต่ระหว่าง workshop รันทั้งคู่เพื่อเทียบได้เลย เพราะมันเป็น “ของเสริม” ไม่ใช่ “ของซ้ำ”
อุปมา: เหมือนมีพนักงาน 2 คนรับออเดอร์โต๊ะเดียวกัน แต่จดลงคนละสมุด — ออเดอร์ไม่ตีกัน เพราะแยกสมุดชัดเจน
Step 6: รันระบบขึ้นมา
cd infra
# สตาร์ททุก service (mongodb, mosquitto, influxdb, telegraf)
docker compose up -d
# ดูสถานะ — รอจน influxdb เป็น (healthy) ก่อน
docker compose ps
# ดู log ของ InfluxDB และ Telegraf
docker compose logs -f influxdb
docker compose logs -f telegraf
Step 7: ทดสอบ Connectivity
ถ้าทุกอย่างขึ้นครบแล้ว ลองเช็คทีละตัวครับ:
# 1) InfluxDB ตอบ ping ไหม (2.x ใช้คำสั่ง `influx ping` ในตัว container)
docker exec showkhun-influxdb influx ping
# ควรได้: OK
# 2) เช็คว่า bucket ถูกสร้างครบ (ต้องเห็น iot_workshop)
docker exec showkhun-influxdb influx bucket list \
--org showkhun \
--token dev-influx-token-change-me
# 3) ลอง query ด้วย Flux ผ่าน HTTP API ตรงๆ
curl -s --request POST http://localhost:8086/api/v2/query?org=showkhun \
--header "Authorization: Token dev-influx-token-change-me" \
--header "Accept: application/csv" \
--header "Content-type: application/vnd.flux" \
--data 'from(bucket:"iot_workshop") |> range(start:-1h) |> limit(n:5)'
ฝั่ง Go backend ก็เช็คสุขภาพ InfluxDB ผ่าน health endpoint ของมันเองได้ (โค้ด Ping อยู่ใน backend/internal/database/influx.go):
// Ping เช็คว่า InfluxDB เข้าถึงได้ — ใช้เป็น health check ของ backend
func (i *Influx) Ping(ctx context.Context) error {
if _, ok := ctx.Deadline(); !ok {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, i.timeout)
defer cancel()
}
ok, err := i.client.Ping(ctx)
if err != nil {
return fmt.Errorf("ping influx: %w", err)
}
if !ok {
return fmt.Errorf("ping influx: server reported not ready")
}
return nil
}
# health check ของ backend (รวมเช็ค influxdb อยู่ในนั้น)
curl -s http://localhost:8080/healthz | jq
สรุปสิ่งที่ทำใน Workshop นี้
_________________________________________
| Time-series storage is ready! |
| InfluxDB 2.7.12 [OK] |
| Telegraf 1.39.0 [OK] |
| org: showkhun [OK] |
| bucket: iot_workshop (30d) [OK] |
| bucket: ...downsampled (1y) [OK] |
|_________________________________________|
\(^o^)/
น้องๆ ทำอะไรไปบ้างในตอนนี้:
| สิ่งที่ทำ | รายละเอียด |
|---|---|
| Docker Compose | deploy InfluxDB 2.7.12 + Telegraf 1.39.0 พร้อม healthcheck |
| Setup mode | บูตครั้งแรกสร้าง org showkhun, bucket iot_workshop, admin token ให้เลย |
| Retention | raw 30 วัน + bucket downsampled 1 ปี (จัดให้โดย EnsureBuckets) |
| สองทางเขียน | Telegraf (telegraf_sensor_data) + Go backend (sensor_data) ไม่ตีกัน |
| Connectivity | influx ping, Flux query ผ่าน HTTP API, backend health check |
สิ่งที่เราสร้างวันนี้คือ “คลังข้อมูล” ของระบบ IoT ทั้งหมด ข้อมูลจะไหลจาก sensor ผ่าน MQTT แล้วลงไปนอนใน InfluxDB รอให้เราดึงออกมาทำ dashboard และ alert ต่อไป
ขั้นตอนต่อไป
ตอนหน้าเราจะเจาะ Telegraf Pipeline แบบ json_v2 ตัวจริงในโปรเจกต์ — ตั้งแต่ subscribe MQTT, แกะ payload, ปั้น tag/field, จนเขียนลง influxdb_v2 พร้อมเขียน Flux query ดึงข้อมูลออกมาทาง REST API ด้วย อย่าพลาดนะครับ! (ง •̀_•́)ง