IoT Workshop #8: MQTT เชื่อม Device กับ Server
Branch:
workshop/dev-05-mqtt-brokerPhase: Development — Messaging Layer Repository: https://github.com/kangana1024/iot-workshop
สวัสดีน้องๆ ทุกคน! วันนี้เราจะมาคุยเรื่องที่เป็น “หัวใจ” ของ IoT เลย นั่นก็คือ MQTT โปรโตคอลที่ทำให้ device เล็กๆ อย่าง sensor คุยกับ server ได้แบบเบาและเร็ว ถ้าพร้อมแล้ว… มาลุยกัน! (ง •̀_•́)ง
ก่อนเริ่ม: ทำไมต้องเป็น MQTT?
ลองนึกภาพนี้ — น้องๆ เคยใช้กลุ่ม LINE ไหม? ถ้าใช้ แสดงว่าเข้าใจ Pub/Sub แล้ว!
- Publisher คือคนที่โพสต์ข้อความในกลุ่ม (device ส่งข้อมูล sensor)
- Topic คือชื่อกลุ่ม LINE (เช่น “กลุ่มข้อมูลอุณหภูมิ”)
- Subscriber คือคนที่อยู่ในกลุ่มและรับข้อความ (server ของเรา)
- Broker คือเซิร์ฟเวอร์ LINE ที่กระจายข้อความ (Eclipse Mosquitto)
ข้อดีของ MQTT คือมันออกแบบมาสำหรับ เครือข่ายที่ไม่เสถียร และ device ที่กินไฟน้อย เหมาะกับ IoT มากกว่า HTTP ธรรมดา เพราะ HTTP ต้องเปิด-ปิด connection ทุกครั้ง แต่ MQTT เปิด connection ค้างไว้แล้วแชร์กันใช้
HTTP: [Device] → Request → [Server] → Response → [Device] (ทุกครั้งที่จะส่ง)
MQTT: [Device] ──────────── Broker ──────────── [Server] (connection เดียว ส่งได้เรื่อยๆ)
สิ่งที่น้องๆ จะได้เรียนรู้วันนี้
- WHY ต้อง MQTT และ Pub/Sub Pattern คืออะไร
- ออกแบบ Topic Structure ที่ดีสำหรับ IoT Platform
- เชื่อมต่อ MQTT Broker ด้วย Paho Go Client
- Subscribe รับ Telemetry Data จาก Devices
- Publish Commands กลับไปยัง Devices
- ใช้ QoS Levels ให้ถูกจุดประสงค์
- Auto-Reconnection เพื่อ High Availability
- Message Handler Pipeline
MQTT Pub/Sub Flow — ภาพรวมระบบ
ก่อนลงโค้ด เราดูภาพรวมก่อนว่าข้อมูลไหลยังไงนะ
เห็นมั้ยว่า Broker เปรียบเหมือน “ตู้ไปรษณีย์กลาง” ไม่มีใครต้องรู้จักกันโดยตรง Device แค่รู้จัก Broker และ Server แค่รู้จัก Broker เท่านั้น แยกส่วนกันสวยมาก
MQTT Topic Structure
ทำไมต้องออกแบบ Topic ให้ดี?
Topic Structure ที่ดีคือเหมือน โครงสร้างโฟลเดอร์ ในคอม ถ้าตั้งชื่อไฟล์แบบสะเปะสะปะตั้งแต่แรก วันนึงคุณจะหาไฟล์ไม่เจอ Topic ก็เหมือนกัน ถ้าออกแบบไว้ดี จะ query/filter ง่ายมาก
เราใช้รูปแบบ:
iot/{tenant}/{device_id}/{direction}/{type}
ตัวอย่าง:
iot/workshop/sensor-temp-001/up/telemetry ← Device → Server (ข้อมูล sensor)
iot/workshop/sensor-temp-001/up/status ← Device → Server (สถานะ device)
iot/workshop/sensor-temp-001/up/event ← Device → Server (event พิเศษ)
iot/workshop/sensor-temp-001/down/command ← Server → Device (คำสั่ง)
iot/workshop/sensor-temp-001/down/config ← Server → Device (config update)
Wildcards:
iot/workshop/+/up/telemetry ← Subscribe ทุก device (+ = wildcard 1 level)
iot/workshop/# ← Subscribe ทุกอย่างใน workshop tenant (# = wildcard หลาย level)
- tenant ไว้แยกกลุ่มลูกค้า (multi-tenant architecture)
- up = device ส่งขึ้น server, down = server ส่งลง device
- type ไว้แยกว่าเป็น telemetry / status / command
internal/mqtt/topics.go
package mqtt
import "fmt"
const (
// TenantID is the namespace for this deployment
TenantID = "workshop"
// Direction: up = device to server, down = server to device
DirectionUp = "up"
DirectionDown = "down"
// Message types
TypeTelemetry = "telemetry"
TypeStatus = "status"
TypeEvent = "event"
TypeCommand = "command"
TypeConfig = "config"
TypeAck = "ack"
)
// Topics provides helpers for building MQTT topic strings
type Topics struct {
tenant string
}
// NewTopics creates a Topics helper with the given tenant
func NewTopics(tenant string) *Topics {
return &Topics{tenant: tenant}
}
// Telemetry returns the topic for device telemetry (device → server)
func (t *Topics) Telemetry(deviceID string) string {
return fmt.Sprintf("iot/%s/%s/%s/%s", t.tenant, deviceID, DirectionUp, TypeTelemetry)
}
// Status returns the topic for device status messages
func (t *Topics) Status(deviceID string) string {
return fmt.Sprintf("iot/%s/%s/%s/%s", t.tenant, deviceID, DirectionUp, TypeStatus)
}
// Event returns the topic for device event messages
func (t *Topics) Event(deviceID string) string {
return fmt.Sprintf("iot/%s/%s/%s/%s", t.tenant, deviceID, DirectionUp, TypeEvent)
}
// Command returns the topic to send commands to a device
func (t *Topics) Command(deviceID string) string {
return fmt.Sprintf("iot/%s/%s/%s/%s", t.tenant, deviceID, DirectionDown, TypeCommand)
}
// Config returns the topic to push config updates to a device
func (t *Topics) Config(deviceID string) string {
return fmt.Sprintf("iot/%s/%s/%s/%s", t.tenant, deviceID, DirectionDown, TypeConfig)
}
// AllTelemetry returns a wildcard topic to subscribe to all device telemetry
func (t *Topics) AllTelemetry() string {
return fmt.Sprintf("iot/%s/+/%s/%s", t.tenant, DirectionUp, TypeTelemetry)
}
// AllStatus returns a wildcard topic for all device status messages
func (t *Topics) AllStatus() string {
return fmt.Sprintf("iot/%s/+/%s/%s", t.tenant, DirectionUp, TypeStatus)
}
// AllUp returns a wildcard that catches all uplink messages from all devices
func (t *Topics) AllUp() string {
return fmt.Sprintf("iot/%s/+/%s/#", t.tenant, DirectionUp)
}
// ParseDeviceID extracts the device_id from a topic string
// Topic format: iot/{tenant}/{device_id}/{direction}/{type}
func ParseDeviceID(topic string) string {
parts := splitTopic(topic)
if len(parts) >= 3 {
return parts[2]
}
return ""
}
// ParseTopicType extracts the message type from a topic string
func ParseTopicType(topic string) string {
parts := splitTopic(topic)
if len(parts) >= 5 {
return parts[4]
}
return ""
}
func splitTopic(topic string) []string {
var parts []string
start := 0
for i := 0; i < len(topic); i++ {
if topic[i] == '/' {
parts = append(parts, topic[start:i])
start = i + 1
}
}
parts = append(parts, topic[start:])
return parts
}
MQTT Client — หัวใจของการเชื่อมต่อ
ทำไมต้อง Auto-Reconnect?
เรามักลืมว่า เครือข่าย IoT ไม่เสถียร device อาจอยู่กลางแจ้ง Wi-Fi อาจตัดชั่วคราว ถ้า app crash เพราะ connection หลุด ก็จบเลย
คิดง่ายๆ แบบนี้: Auto-Reconnect คือ การกดปุ่ม retry อัตโนมัติ แทนที่เราจะต้องนั่งเฝ้ากด Broker แค่บอกให้ client ลองใหม่ทุก 30 วินาทีจนกว่าจะได้
internal/mqtt/client.go
package mqtt
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
"go.uber.org/zap"
"github.com/kangana1024/iot-workshop/backend/internal/config"
)
// QoS levels — ระดับความมั่นใจในการส่งข้อความ
const (
QoSAtMostOnce byte = 0 // Fire and forget
QoSAtLeastOnce byte = 1 // Guaranteed delivery, may duplicate
QoSExactlyOnce byte = 2 // Guaranteed exactly once (most expensive)
)
// MessageHandler is a callback function for received messages
type MessageHandler func(ctx context.Context, topic string, payload []byte)
// Client wraps Paho MQTT client with reconnection and subscription management
type Client struct {
paho paho.Client
cfg config.MQTTConfig
log *zap.Logger
topics *Topics
handlers map[string]MessageHandler
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
// NewClient creates a new MQTT client and connects to the broker
func NewClient(cfg config.MQTTConfig, log *zap.Logger) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
cfg: cfg,
log: log,
topics: NewTopics(TenantID),
handlers: make(map[string]MessageHandler),
ctx: ctx,
cancel: cancel,
}
opts := c.buildClientOptions()
c.paho = paho.NewClient(opts)
// Connect with timeout
connectCtx, connectCancel := context.WithTimeout(ctx, cfg.ConnectTimeout)
defer connectCancel()
token := c.paho.Connect()
select {
case <-token.Done():
if token.Error() != nil {
cancel()
return nil, fmt.Errorf("failed to connect to MQTT broker %s: %w", cfg.Broker, token.Error())
}
case <-connectCtx.Done():
cancel()
return nil, fmt.Errorf("connection to MQTT broker timed out after %s", cfg.ConnectTimeout)
}
log.Info("Connected to MQTT broker",
zap.String("broker", cfg.Broker),
zap.String("client_id", cfg.ClientID),
)
return c, nil
}
// buildClientOptions constructs the Paho client configuration
func (c *Client) buildClientOptions() *paho.ClientOptions {
opts := paho.NewClientOptions()
opts.AddBroker(c.cfg.Broker)
opts.SetClientID(c.cfg.ClientID)
opts.SetKeepAlive(c.cfg.KeepAlive)
opts.SetConnectTimeout(c.cfg.ConnectTimeout)
opts.SetAutoReconnect(true)
opts.SetMaxReconnectInterval(30 * time.Second)
opts.SetCleanSession(false) // Persist subscriptions across reconnects
if c.cfg.Username != "" {
opts.SetUsername(c.cfg.Username)
opts.SetPassword(c.cfg.Password)
}
// Will message — published automatically if client disconnects ungracefully
willPayload, _ := json.Marshal(map[string]interface{}{
"status": "offline",
"timestamp": time.Now().UTC(),
})
opts.SetWill(
fmt.Sprintf("iot/%s/server/status", TenantID),
string(willPayload),
QoSAtLeastOnce,
true, // Retained
)
// Connection callbacks
opts.SetOnConnectHandler(func(client paho.Client) {
c.log.Info("MQTT connected — re-subscribing to topics")
c.resubscribeAll()
})
opts.SetConnectionLostHandler(func(client paho.Client, err error) {
c.log.Warn("MQTT connection lost, will auto-reconnect",
zap.Error(err),
)
})
opts.SetReconnectingHandler(func(client paho.Client, opts *paho.ClientOptions) {
c.log.Info("MQTT reconnecting...")
})
// Default message handler for unmatched topics
opts.SetDefaultPublishHandler(func(client paho.Client, msg paho.Message) {
c.log.Debug("MQTT unhandled message",
zap.String("topic", msg.Topic()),
zap.Int("payload_size", len(msg.Payload())),
)
})
return opts
}
// Subscribe registers a handler for a topic pattern
func (c *Client) Subscribe(topic string, qos byte, handler MessageHandler) error {
c.mu.Lock()
c.handlers[topic] = handler
c.mu.Unlock()
token := c.paho.Subscribe(topic, qos, func(client paho.Client, msg paho.Message) {
c.mu.RLock()
h, ok := c.handlers[topic]
c.mu.RUnlock()
if ok {
go h(c.ctx, msg.Topic(), msg.Payload())
}
})
if token.Wait() && token.Error() != nil {
return fmt.Errorf("failed to subscribe to %s: %w", topic, token.Error())
}
c.log.Info("Subscribed to MQTT topic",
zap.String("topic", topic),
zap.Uint8("qos", qos),
)
return nil
}
// Publish sends a message to a topic
func (c *Client) Publish(topic string, qos byte, retained bool, payload interface{}) error {
var data []byte
var err error
switch v := payload.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
data, err = json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal payload: %w", err)
}
}
token := c.paho.Publish(topic, qos, retained, data)
if token.Wait() && token.Error() != nil {
return fmt.Errorf("failed to publish to %s: %w", topic, token.Error())
}
c.log.Debug("Published MQTT message",
zap.String("topic", topic),
zap.Int("payload_size", len(data)),
zap.Uint8("qos", qos),
)
return nil
}
// Unsubscribe removes a topic subscription
func (c *Client) Unsubscribe(topics ...string) error {
c.mu.Lock()
for _, t := range topics {
delete(c.handlers, t)
}
c.mu.Unlock()
token := c.paho.Unsubscribe(topics...)
if token.Wait() && token.Error() != nil {
return fmt.Errorf("failed to unsubscribe: %w", token.Error())
}
return nil
}
// Disconnect gracefully disconnects from the broker
func (c *Client) Disconnect(timeout time.Duration) {
c.cancel()
c.paho.Disconnect(uint(timeout.Milliseconds()))
c.log.Info("MQTT client disconnected")
}
// IsConnected returns true if the client is currently connected
func (c *Client) IsConnected() bool {
return c.paho.IsConnected()
}
// resubscribeAll re-subscribes to all registered topics (called after reconnect)
func (c *Client) resubscribeAll() {
c.mu.RLock()
defer c.mu.RUnlock()
for topic, handler := range c.handlers {
h := handler // capture loop variable
t := topic
token := c.paho.Subscribe(t, QoSAtLeastOnce, func(client paho.Client, msg paho.Message) {
go h(c.ctx, msg.Topic(), msg.Payload())
})
if token.Wait() && token.Error() != nil {
c.log.Error("Failed to re-subscribe after reconnect",
zap.String("topic", t),
zap.Error(token.Error()),
)
}
}
}
เรื่อง QoS — เลือกให้ถูกสถานการณ์
น้องๆ อาจสงสัยว่าจะใช้ QoS ระดับไหนดี เราขอเปรียบง่ายๆ แบบนี้:
QoS 0 (At Most Once) — เหมือนโยนกระดาษบันทึกข้ามห้อง
ถ้าไม่ถึงก็ช่างมัน เหมาะกับ telemetry ที่ส่งบ่อยมาก
QoS 1 (At Least Once) — เหมือนส่ง LINE แล้วรอ "tick เขียว"
อาจได้รับซ้ำบ้าง แต่ถึงแน่นอน เหมาะกับ status/command
QoS 2 (Exactly Once) — เหมือนส่ง EMS พร้อมลงชื่อรับ
ช้าที่สุดแต่แม่นที่สุด เหมาะกับ critical commands
Message Handler Service — ตัวจัดการข้อความ
ทำไม Pipeline ถึงสำคัญ?
เวลา device ส่งข้อมูลมา เราไม่อยากให้โค้ดรับ MQTT ทำทุกอย่างเอง (parse, validate, save to DB) เพราะมัน tight coupling มาก
แทนที่จะเขียนแบบ “ทำทุกอย่างในฟังก์ชันเดียว” เราแยกหน้าที่ออกมาเป็น Pipeline:
รับ Message → Parse JSON → ตรวจสอบ DeviceID → ส่งต่อ Usecase → บันทึก DB
เหมือนสายการผลิตในโรงงาน แต่ละสถานีทำหน้าที่ของตัวเอง ไม่ยุ่งกับของคนอื่น
internal/mqtt/service.go
package mqtt
import (
"context"
"encoding/json"
"time"
"go.uber.org/zap"
"github.com/kangana1024/iot-workshop/backend/internal/domain"
"github.com/kangana1024/iot-workshop/backend/internal/usecase"
)
// TelemetryMessage is the expected payload format for device telemetry
type TelemetryMessage struct {
Timestamp *time.Time `json:"timestamp"`
Fields map[string]interface{} `json:"fields"`
Tags map[string]string `json:"tags,omitempty"`
}
// StatusMessage is the device status update payload
type StatusMessage struct {
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
Message string `json:"message,omitempty"`
}
// CommandMessage is the payload for commands sent to devices
type CommandMessage struct {
CommandID string `json:"command_id"`
Command string `json:"command"`
Payload map[string]interface{} `json:"payload,omitempty"`
IssuedAt time.Time `json:"issued_at"`
ExpiresAt *time.Time `json:"expires_at,omitempty"`
}
// MQTTService orchestrates MQTT subscriptions and message processing
type MQTTService struct {
client *Client
topics *Topics
sensorUsecase *usecase.SensorUsecase
deviceUsecase *usecase.DeviceUsecase
log *zap.Logger
}
// NewMQTTService creates a new MQTTService
func NewMQTTService(
client *Client,
sensorUsecase *usecase.SensorUsecase,
deviceUsecase *usecase.DeviceUsecase,
log *zap.Logger,
) *MQTTService {
return &MQTTService{
client: client,
topics: NewTopics(TenantID),
sensorUsecase: sensorUsecase,
deviceUsecase: deviceUsecase,
log: log,
}
}
// Start registers all subscriptions and begins processing messages
func (s *MQTTService) Start() error {
s.log.Info("Starting MQTT service, subscribing to topics...")
// Subscribe to all device telemetry
if err := s.client.Subscribe(
s.topics.AllTelemetry(),
QoSAtLeastOnce,
s.handleTelemetry,
); err != nil {
return err
}
// Subscribe to all device status updates
if err := s.client.Subscribe(
s.topics.AllStatus(),
QoSAtLeastOnce,
s.handleStatus,
); err != nil {
return err
}
// Publish server online status (retained)
if err := s.client.Publish(
fmt.Sprintf("iot/%s/server/status", TenantID),
QoSAtLeastOnce,
true, // Retained — new subscribers see this immediately
map[string]interface{}{
"status": "online",
"timestamp": time.Now().UTC(),
},
); err != nil {
s.log.Warn("Failed to publish server status", zap.Error(err))
}
s.log.Info("MQTT service started")
return nil
}
// handleTelemetry processes telemetry messages from devices
func (s *MQTTService) handleTelemetry(ctx context.Context, topic string, payload []byte) {
deviceID := ParseDeviceID(topic)
if deviceID == "" {
s.log.Warn("Could not parse device_id from telemetry topic", zap.String("topic", topic))
return
}
s.log.Debug("Received telemetry",
zap.String("device_id", deviceID),
zap.Int("payload_size", len(payload)),
)
// Parse payload
var msg TelemetryMessage
if err := json.Unmarshal(payload, &msg); err != nil {
s.log.Warn("Failed to parse telemetry payload",
zap.String("device_id", deviceID),
zap.Error(err),
zap.ByteString("payload", payload),
)
return
}
// Construct SensorData and ingest
data := domain.SensorData{
DeviceID: deviceID,
Timestamp: msg.Timestamp,
Fields: msg.Fields,
Tags: msg.Tags,
}
result, err := s.sensorUsecase.Ingest(ctx, data)
if err != nil {
s.log.Error("Failed to ingest MQTT telemetry",
zap.String("device_id", deviceID),
zap.Error(err),
)
return
}
if result.Rejected > 0 {
s.log.Warn("MQTT telemetry partially rejected",
zap.String("device_id", deviceID),
zap.Strings("errors", result.Errors),
)
}
}
// handleStatus processes device status update messages
func (s *MQTTService) handleStatus(ctx context.Context, topic string, payload []byte) {
deviceID := ParseDeviceID(topic)
if deviceID == "" {
return
}
var msg StatusMessage
if err := json.Unmarshal(payload, &msg); err != nil {
s.log.Warn("Failed to parse status payload",
zap.String("device_id", deviceID),
zap.Error(err),
)
return
}
s.log.Info("Device status update",
zap.String("device_id", deviceID),
zap.String("status", msg.Status),
)
var newStatus domain.DeviceStatus
switch msg.Status {
case "online":
newStatus = domain.DeviceStatusOnline
case "offline":
newStatus = domain.DeviceStatusOffline
default:
s.log.Warn("Unknown device status", zap.String("status", msg.Status))
return
}
if err := s.deviceUsecase.UpdateDeviceStatus(ctx, deviceID, newStatus); err != nil {
s.log.Error("Failed to update device status from MQTT",
zap.String("device_id", deviceID),
zap.Error(err),
)
}
}
// SendCommand publishes a command to a specific device
func (s *MQTTService) SendCommand(ctx context.Context, deviceID, command string, payload map[string]interface{}) error {
cmdMsg := CommandMessage{
CommandID: generateCommandID(),
Command: command,
Payload: payload,
IssuedAt: time.Now().UTC(),
}
topic := s.topics.Command(deviceID)
if err := s.client.Publish(topic, QoSAtLeastOnce, false, cmdMsg); err != nil {
return fmt.Errorf("failed to send command to device %s: %w", deviceID, err)
}
s.log.Info("Command sent to device",
zap.String("device_id", deviceID),
zap.String("command", command),
zap.String("command_id", cmdMsg.CommandID),
)
return nil
}
// SendConfig pushes a configuration update to a device (retained)
func (s *MQTTService) SendConfig(ctx context.Context, deviceID string, config map[string]interface{}) error {
topic := s.topics.Config(deviceID)
return s.client.Publish(topic, QoSAtLeastOnce, true, config) // Retained!
}
func generateCommandID() string {
return fmt.Sprintf("cmd-%d", time.Now().UnixNano())
}
Mosquitto Configuration
ทำไม Mosquitto ถึงเหมาะกับ Dev?
Mosquitto เป็น MQTT Broker ที่เบา ติดตั้งง่าย และฟรี เหมาะมากสำหรับ development และ production scale กลาง-เล็ก ถ้าอยากได้ scale ใหญ่ขึ้นค่อยย้ายไป EMQX หรือ HiveMQ ทีหลัง
configs/mosquitto/mosquitto.conf
# Mosquitto configuration for IoT Workshop
listener 1883
protocol mqtt
listener 9001
protocol websockets
# Allow anonymous connections in development
# In production: allow_anonymous false + password_file
allow_anonymous true
# Persistence
persistence true
persistence_location /mosquitto/data/
# Logging
log_dest stdout
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true
# Connection settings
max_connections 10000
max_keepalive 300
# Message size limit: 1MB
message_size_limit 1048576
# Queue limit for offline clients
max_queued_messages 1000
queue_qos0_messages false
หมายเหตุ:
allow_anonymous trueใช้ได้ใน dev เท่านั้น! Production ต้องปิดและใส่password_fileด้วย ไม่งั้นใครก็ต่อได้เลย (°ロ°) !
ทดสอบด้วย MQTT Client
ก่อนรัน backend จริง เราลองทดสอบผ่าน command line ก่อนได้เลย เหมือนใช้ curl แต่เป็นสำหรับ MQTT:
# ติดตั้ง mosquitto-clients
apt install mosquitto-clients # Linux
brew install mosquitto # macOS
# Subscribe ดูข้อมูลจาก device ทั้งหมด
mosquitto_sub -h localhost -p 1883 \
-t "iot/workshop/+/up/#" \
-v
# ส่ง telemetry จาก device simulator
mosquitto_pub -h localhost -p 1883 \
-t "iot/workshop/sensor-temp-001/up/telemetry" \
-m '{
"timestamp": "2026-03-26T10:00:00Z",
"fields": {
"temperature": 29.3,
"humidity": 72.1
}
}'
# ส่ง status update
mosquitto_pub -h localhost -p 1883 \
-t "iot/workshop/sensor-temp-001/up/status" \
-m '{"status": "online", "timestamp": "2026-03-26T10:00:00Z"}'
# ส่ง command ไปยัง device (ผ่าน API)
curl -X POST http://localhost:8080/api/v1/devices/65f1234567890abcdef12345/commands \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"command": "reboot", "payload": {"delay_seconds": 5}}'
Device Simulator ใน Go — ทดสอบโดยไม่ต้องมีฮาร์ดแวร์จริง
นี่คือสิ่งที่เราชอบมาก บางทีซื้อ sensor จริงยังไม่มา หรือทดสอบระบบโดยไม่อยากพึ่งฮาร์ดแวร์ — ก็ simulate เองได้เลย!
.--.
|o_o | "เราคือ sensor ปลอม
|:_/ | แต่ข้อมูลที่ส่งไปนั้นจริง!"
// \ \
(| | )
/'\_ _/`\
\___)=(___/
// cmd/simulator/main.go
package main
import (
"encoding/json"
"fmt"
"math/rand"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
)
func main() {
opts := paho.NewClientOptions().
AddBroker("tcp://localhost:1883").
SetClientID("device-simulator-001")
client := paho.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
defer client.Disconnect(250)
// Announce online
publishStatus(client, "sensor-temp-001", "online")
// Send telemetry every 5 seconds
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
temp := 25.0 + rand.Float64()*10.0 - 5.0 // 20-35°C
humidity := 60.0 + rand.Float64()*30.0 - 15.0 // 45-90%
payload := map[string]interface{}{
"timestamp": time.Now().UTC().Format(time.RFC3339),
"fields": map[string]interface{}{
"temperature": round(temp, 1),
"humidity": round(humidity, 1),
"battery": 80 + rand.Intn(20),
},
}
data, _ := json.Marshal(payload)
topic := "iot/workshop/sensor-temp-001/up/telemetry"
client.Publish(topic, 1, false, data)
fmt.Printf("[%s] Sent: temp=%.1f°C humidity=%.1f%%\n",
time.Now().Format("15:04:05"), temp, humidity)
}
}
func publishStatus(client paho.Client, deviceID, status string) {
payload := map[string]interface{}{
"status": status,
"timestamp": time.Now().UTC(),
}
data, _ := json.Marshal(payload)
topic := fmt.Sprintf("iot/workshop/%s/up/status", deviceID)
client.Publish(topic, 1, false, data)
}
func round(val float64, places int) float64 {
pow := math.Pow(10, float64(places))
return math.Round(val*pow) / pow
}
# รัน simulator
go run ./cmd/simulator/main.go
สรุปที่เราทำมาวันนี้
เยี่ยมมากน้องๆ! วันนี้เราลุยมาเยอะเลย มาทบทวนกัน:
- MQTT Pub/Sub คือ “กลุ่ม LINE” สำหรับ IoT — Publisher ส่ง, Broker กระจาย, Subscriber รับ
- Topic Structure ที่ดี (
iot/{tenant}/{device}/{direction}/{type}) ทำให้ filter และ scale ง่าย - Paho Go Client มี Auto-Reconnect ในตัว ตั้งค่า
SetAutoReconnect(true)แล้วลืมได้เลย - QoS ต้องเลือกให้เหมาะ: QoS 0 สำหรับข้อมูลซ้ำได้, QoS 1 สำหรับ command/status
- Will Message เป็น “ข้อความสั่งเสีย” — ถ้า disconnect ผิดปกติ Broker จะส่งให้อัตโนมัติ
- Device Simulator ช่วยทดสอบโดยไม่ต้องมีฮาร์ดแวร์จริง ชีวิตดีขึ้นมาก
ขั้นตอนถัดไป
ตอนนี้ device ส่งข้อมูลมาที่ backend ได้แล้ว ขั้นต่อไปคือทำให้ Dashboard ของ User เห็นข้อมูล real-time โดยไม่ต้อง refresh หน้าเลย — นั่นคือเรื่องของ WebSocket!