IoT Workshop #8: MQTT เชื่อม Device กับ Server

IoT Workshop #8: MQTT เชื่อม Device กับ Server

Showkhun · Workshop ·

Branch: workshop/dev-05-mqtt-broker Phase: Development — Messaging Layer Repository: https://github.com/kangana1024/iot-workshop


สวัสดีน้องๆ ทุกคน! วันนี้เราจะมาคุยเรื่องที่เป็น “หัวใจ” ของ IoT เลย นั่นก็คือ MQTT โปรโตคอลที่ทำให้ device เล็กๆ อย่าง sensor คุยกับ server ได้แบบเบาและเร็ว ถ้าพร้อมแล้ว… มาลุยกัน! (ง •̀_•́)ง


ก่อนเริ่ม: ทำไมต้องเป็น MQTT?

ลองนึกภาพนี้ — น้องๆ เคยใช้กลุ่ม LINE ไหม? ถ้าใช้ แสดงว่าเข้าใจ Pub/Sub แล้ว!

  • Publisher คือคนที่โพสต์ข้อความในกลุ่ม (device ส่งข้อมูล sensor)
  • Topic คือชื่อกลุ่ม LINE (เช่น “กลุ่มข้อมูลอุณหภูมิ”)
  • Subscriber คือคนที่อยู่ในกลุ่มและรับข้อความ (server ของเรา)
  • Broker คือเซิร์ฟเวอร์ LINE ที่กระจายข้อความ (Eclipse Mosquitto)

ข้อดีของ MQTT คือมันออกแบบมาสำหรับ เครือข่ายที่ไม่เสถียร และ device ที่กินไฟน้อย เหมาะกับ IoT มากกว่า HTTP ธรรมดา เพราะ HTTP ต้องเปิด-ปิด connection ทุกครั้ง แต่ MQTT เปิด connection ค้างไว้แล้วแชร์กันใช้

HTTP:  [Device] → Request → [Server] → Response → [Device]  (ทุกครั้งที่จะส่ง)
MQTT:  [Device] ──────────── Broker ──────────── [Server]   (connection เดียว ส่งได้เรื่อยๆ)

สิ่งที่น้องๆ จะได้เรียนรู้วันนี้

  • WHY ต้อง MQTT และ Pub/Sub Pattern คืออะไร
  • ออกแบบ Topic Structure ที่ดีสำหรับ IoT Platform
  • เชื่อมต่อ MQTT Broker ด้วย Paho Go Client
  • Subscribe รับ Telemetry Data จาก Devices
  • Publish Commands กลับไปยัง Devices
  • ใช้ QoS Levels ให้ถูกจุดประสงค์
  • Auto-Reconnection เพื่อ High Availability
  • Message Handler Pipeline

MQTT Pub/Sub Flow — ภาพรวมระบบ

ก่อนลงโค้ด เราดูภาพรวมก่อนว่าข้อมูลไหลยังไงนะ

Mermaid Diagram

เห็นมั้ยว่า Broker เปรียบเหมือน “ตู้ไปรษณีย์กลาง” ไม่มีใครต้องรู้จักกันโดยตรง Device แค่รู้จัก Broker และ Server แค่รู้จัก Broker เท่านั้น แยกส่วนกันสวยมาก


MQTT Topic Structure

ทำไมต้องออกแบบ Topic ให้ดี?

Topic Structure ที่ดีคือเหมือน โครงสร้างโฟลเดอร์ ในคอม ถ้าตั้งชื่อไฟล์แบบสะเปะสะปะตั้งแต่แรก วันนึงคุณจะหาไฟล์ไม่เจอ Topic ก็เหมือนกัน ถ้าออกแบบไว้ดี จะ query/filter ง่ายมาก

เราใช้รูปแบบ:

iot/{tenant}/{device_id}/{direction}/{type}

ตัวอย่าง:
iot/workshop/sensor-temp-001/up/telemetry    ← Device → Server (ข้อมูล sensor)
iot/workshop/sensor-temp-001/up/status       ← Device → Server (สถานะ device)
iot/workshop/sensor-temp-001/up/event        ← Device → Server (event พิเศษ)
iot/workshop/sensor-temp-001/down/command    ← Server → Device (คำสั่ง)
iot/workshop/sensor-temp-001/down/config     ← Server → Device (config update)

Wildcards:
iot/workshop/+/up/telemetry    ← Subscribe ทุก device (+ = wildcard 1 level)
iot/workshop/#                 ← Subscribe ทุกอย่างใน workshop tenant (# = wildcard หลาย level)
  • tenant ไว้แยกกลุ่มลูกค้า (multi-tenant architecture)
  • up = device ส่งขึ้น server, down = server ส่งลง device
  • type ไว้แยกว่าเป็น telemetry / status / command

internal/mqtt/topics.go

package mqtt

import "fmt"

const (
	// TenantID is the namespace for this deployment
	TenantID = "workshop"

	// Direction: up = device to server, down = server to device
	DirectionUp   = "up"
	DirectionDown = "down"

	// Message types
	TypeTelemetry = "telemetry"
	TypeStatus    = "status"
	TypeEvent     = "event"
	TypeCommand   = "command"
	TypeConfig    = "config"
	TypeAck       = "ack"
)

// Topics provides helpers for building MQTT topic strings
type Topics struct {
	tenant string
}

// NewTopics creates a Topics helper with the given tenant
func NewTopics(tenant string) *Topics {
	return &Topics{tenant: tenant}
}

// Telemetry returns the topic for device telemetry (device → server)
func (t *Topics) Telemetry(deviceID string) string {
	return fmt.Sprintf("iot/%s/%s/%s/%s", t.tenant, deviceID, DirectionUp, TypeTelemetry)
}

// Status returns the topic for device status messages
func (t *Topics) Status(deviceID string) string {
	return fmt.Sprintf("iot/%s/%s/%s/%s", t.tenant, deviceID, DirectionUp, TypeStatus)
}

// Event returns the topic for device event messages
func (t *Topics) Event(deviceID string) string {
	return fmt.Sprintf("iot/%s/%s/%s/%s", t.tenant, deviceID, DirectionUp, TypeEvent)
}

// Command returns the topic to send commands to a device
func (t *Topics) Command(deviceID string) string {
	return fmt.Sprintf("iot/%s/%s/%s/%s", t.tenant, deviceID, DirectionDown, TypeCommand)
}

// Config returns the topic to push config updates to a device
func (t *Topics) Config(deviceID string) string {
	return fmt.Sprintf("iot/%s/%s/%s/%s", t.tenant, deviceID, DirectionDown, TypeConfig)
}

// AllTelemetry returns a wildcard topic to subscribe to all device telemetry
func (t *Topics) AllTelemetry() string {
	return fmt.Sprintf("iot/%s/+/%s/%s", t.tenant, DirectionUp, TypeTelemetry)
}

// AllStatus returns a wildcard topic for all device status messages
func (t *Topics) AllStatus() string {
	return fmt.Sprintf("iot/%s/+/%s/%s", t.tenant, DirectionUp, TypeStatus)
}

// AllUp returns a wildcard that catches all uplink messages from all devices
func (t *Topics) AllUp() string {
	return fmt.Sprintf("iot/%s/+/%s/#", t.tenant, DirectionUp)
}

// ParseDeviceID extracts the device_id from a topic string
// Topic format: iot/{tenant}/{device_id}/{direction}/{type}
func ParseDeviceID(topic string) string {
	parts := splitTopic(topic)
	if len(parts) >= 3 {
		return parts[2]
	}
	return ""
}

// ParseTopicType extracts the message type from a topic string
func ParseTopicType(topic string) string {
	parts := splitTopic(topic)
	if len(parts) >= 5 {
		return parts[4]
	}
	return ""
}

func splitTopic(topic string) []string {
	var parts []string
	start := 0
	for i := 0; i < len(topic); i++ {
		if topic[i] == '/' {
			parts = append(parts, topic[start:i])
			start = i + 1
		}
	}
	parts = append(parts, topic[start:])
	return parts
}

MQTT Client — หัวใจของการเชื่อมต่อ

ทำไมต้อง Auto-Reconnect?

เรามักลืมว่า เครือข่าย IoT ไม่เสถียร device อาจอยู่กลางแจ้ง Wi-Fi อาจตัดชั่วคราว ถ้า app crash เพราะ connection หลุด ก็จบเลย

คิดง่ายๆ แบบนี้: Auto-Reconnect คือ การกดปุ่ม retry อัตโนมัติ แทนที่เราจะต้องนั่งเฝ้ากด Broker แค่บอกให้ client ลองใหม่ทุก 30 วินาทีจนกว่าจะได้

internal/mqtt/client.go

package mqtt

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	paho "github.com/eclipse/paho.mqtt.golang"
	"go.uber.org/zap"

	"github.com/kangana1024/iot-workshop/backend/internal/config"
)

// QoS levels — ระดับความมั่นใจในการส่งข้อความ
const (
	QoSAtMostOnce  byte = 0 // Fire and forget
	QoSAtLeastOnce byte = 1 // Guaranteed delivery, may duplicate
	QoSExactlyOnce byte = 2 // Guaranteed exactly once (most expensive)
)

// MessageHandler is a callback function for received messages
type MessageHandler func(ctx context.Context, topic string, payload []byte)

// Client wraps Paho MQTT client with reconnection and subscription management
type Client struct {
	paho     paho.Client
	cfg      config.MQTTConfig
	log      *zap.Logger
	topics   *Topics
	handlers map[string]MessageHandler
	mu       sync.RWMutex
	ctx      context.Context
	cancel   context.CancelFunc
}

// NewClient creates a new MQTT client and connects to the broker
func NewClient(cfg config.MQTTConfig, log *zap.Logger) (*Client, error) {
	ctx, cancel := context.WithCancel(context.Background())

	c := &Client{
		cfg:      cfg,
		log:      log,
		topics:   NewTopics(TenantID),
		handlers: make(map[string]MessageHandler),
		ctx:      ctx,
		cancel:   cancel,
	}

	opts := c.buildClientOptions()
	c.paho = paho.NewClient(opts)

	// Connect with timeout
	connectCtx, connectCancel := context.WithTimeout(ctx, cfg.ConnectTimeout)
	defer connectCancel()

	token := c.paho.Connect()
	select {
	case <-token.Done():
		if token.Error() != nil {
			cancel()
			return nil, fmt.Errorf("failed to connect to MQTT broker %s: %w", cfg.Broker, token.Error())
		}
	case <-connectCtx.Done():
		cancel()
		return nil, fmt.Errorf("connection to MQTT broker timed out after %s", cfg.ConnectTimeout)
	}

	log.Info("Connected to MQTT broker",
		zap.String("broker", cfg.Broker),
		zap.String("client_id", cfg.ClientID),
	)

	return c, nil
}

// buildClientOptions constructs the Paho client configuration
func (c *Client) buildClientOptions() *paho.ClientOptions {
	opts := paho.NewClientOptions()

	opts.AddBroker(c.cfg.Broker)
	opts.SetClientID(c.cfg.ClientID)
	opts.SetKeepAlive(c.cfg.KeepAlive)
	opts.SetConnectTimeout(c.cfg.ConnectTimeout)
	opts.SetAutoReconnect(true)
	opts.SetMaxReconnectInterval(30 * time.Second)
	opts.SetCleanSession(false) // Persist subscriptions across reconnects

	if c.cfg.Username != "" {
		opts.SetUsername(c.cfg.Username)
		opts.SetPassword(c.cfg.Password)
	}

	// Will message — published automatically if client disconnects ungracefully
	willPayload, _ := json.Marshal(map[string]interface{}{
		"status":    "offline",
		"timestamp": time.Now().UTC(),
	})
	opts.SetWill(
		fmt.Sprintf("iot/%s/server/status", TenantID),
		string(willPayload),
		QoSAtLeastOnce,
		true, // Retained
	)

	// Connection callbacks
	opts.SetOnConnectHandler(func(client paho.Client) {
		c.log.Info("MQTT connected — re-subscribing to topics")
		c.resubscribeAll()
	})

	opts.SetConnectionLostHandler(func(client paho.Client, err error) {
		c.log.Warn("MQTT connection lost, will auto-reconnect",
			zap.Error(err),
		)
	})

	opts.SetReconnectingHandler(func(client paho.Client, opts *paho.ClientOptions) {
		c.log.Info("MQTT reconnecting...")
	})

	// Default message handler for unmatched topics
	opts.SetDefaultPublishHandler(func(client paho.Client, msg paho.Message) {
		c.log.Debug("MQTT unhandled message",
			zap.String("topic", msg.Topic()),
			zap.Int("payload_size", len(msg.Payload())),
		)
	})

	return opts
}

// Subscribe registers a handler for a topic pattern
func (c *Client) Subscribe(topic string, qos byte, handler MessageHandler) error {
	c.mu.Lock()
	c.handlers[topic] = handler
	c.mu.Unlock()

	token := c.paho.Subscribe(topic, qos, func(client paho.Client, msg paho.Message) {
		c.mu.RLock()
		h, ok := c.handlers[topic]
		c.mu.RUnlock()

		if ok {
			go h(c.ctx, msg.Topic(), msg.Payload())
		}
	})

	if token.Wait() && token.Error() != nil {
		return fmt.Errorf("failed to subscribe to %s: %w", topic, token.Error())
	}

	c.log.Info("Subscribed to MQTT topic",
		zap.String("topic", topic),
		zap.Uint8("qos", qos),
	)
	return nil
}

// Publish sends a message to a topic
func (c *Client) Publish(topic string, qos byte, retained bool, payload interface{}) error {
	var data []byte
	var err error

	switch v := payload.(type) {
	case []byte:
		data = v
	case string:
		data = []byte(v)
	default:
		data, err = json.Marshal(payload)
		if err != nil {
			return fmt.Errorf("failed to marshal payload: %w", err)
		}
	}

	token := c.paho.Publish(topic, qos, retained, data)
	if token.Wait() && token.Error() != nil {
		return fmt.Errorf("failed to publish to %s: %w", topic, token.Error())
	}

	c.log.Debug("Published MQTT message",
		zap.String("topic", topic),
		zap.Int("payload_size", len(data)),
		zap.Uint8("qos", qos),
	)
	return nil
}

// Unsubscribe removes a topic subscription
func (c *Client) Unsubscribe(topics ...string) error {
	c.mu.Lock()
	for _, t := range topics {
		delete(c.handlers, t)
	}
	c.mu.Unlock()

	token := c.paho.Unsubscribe(topics...)
	if token.Wait() && token.Error() != nil {
		return fmt.Errorf("failed to unsubscribe: %w", token.Error())
	}
	return nil
}

// Disconnect gracefully disconnects from the broker
func (c *Client) Disconnect(timeout time.Duration) {
	c.cancel()
	c.paho.Disconnect(uint(timeout.Milliseconds()))
	c.log.Info("MQTT client disconnected")
}

// IsConnected returns true if the client is currently connected
func (c *Client) IsConnected() bool {
	return c.paho.IsConnected()
}

// resubscribeAll re-subscribes to all registered topics (called after reconnect)
func (c *Client) resubscribeAll() {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for topic, handler := range c.handlers {
		h := handler // capture loop variable
		t := topic
		token := c.paho.Subscribe(t, QoSAtLeastOnce, func(client paho.Client, msg paho.Message) {
			go h(c.ctx, msg.Topic(), msg.Payload())
		})
		if token.Wait() && token.Error() != nil {
			c.log.Error("Failed to re-subscribe after reconnect",
				zap.String("topic", t),
				zap.Error(token.Error()),
			)
		}
	}
}

เรื่อง QoS — เลือกให้ถูกสถานการณ์

น้องๆ อาจสงสัยว่าจะใช้ QoS ระดับไหนดี เราขอเปรียบง่ายๆ แบบนี้:

QoS 0 (At Most Once)  — เหมือนโยนกระดาษบันทึกข้ามห้อง
                         ถ้าไม่ถึงก็ช่างมัน เหมาะกับ telemetry ที่ส่งบ่อยมาก

QoS 1 (At Least Once) — เหมือนส่ง LINE แล้วรอ "tick เขียว"
                         อาจได้รับซ้ำบ้าง แต่ถึงแน่นอน เหมาะกับ status/command

QoS 2 (Exactly Once)  — เหมือนส่ง EMS พร้อมลงชื่อรับ
                         ช้าที่สุดแต่แม่นที่สุด เหมาะกับ critical commands

Message Handler Service — ตัวจัดการข้อความ

ทำไม Pipeline ถึงสำคัญ?

เวลา device ส่งข้อมูลมา เราไม่อยากให้โค้ดรับ MQTT ทำทุกอย่างเอง (parse, validate, save to DB) เพราะมัน tight coupling มาก

แทนที่จะเขียนแบบ “ทำทุกอย่างในฟังก์ชันเดียว” เราแยกหน้าที่ออกมาเป็น Pipeline:

รับ Message → Parse JSON → ตรวจสอบ DeviceID → ส่งต่อ Usecase → บันทึก DB

เหมือนสายการผลิตในโรงงาน แต่ละสถานีทำหน้าที่ของตัวเอง ไม่ยุ่งกับของคนอื่น

internal/mqtt/service.go

package mqtt

import (
	"context"
	"encoding/json"
	"time"

	"go.uber.org/zap"

	"github.com/kangana1024/iot-workshop/backend/internal/domain"
	"github.com/kangana1024/iot-workshop/backend/internal/usecase"
)

// TelemetryMessage is the expected payload format for device telemetry
type TelemetryMessage struct {
	Timestamp *time.Time             `json:"timestamp"`
	Fields    map[string]interface{} `json:"fields"`
	Tags      map[string]string      `json:"tags,omitempty"`
}

// StatusMessage is the device status update payload
type StatusMessage struct {
	Status    string    `json:"status"`
	Timestamp time.Time `json:"timestamp"`
	Message   string    `json:"message,omitempty"`
}

// CommandMessage is the payload for commands sent to devices
type CommandMessage struct {
	CommandID string                 `json:"command_id"`
	Command   string                 `json:"command"`
	Payload   map[string]interface{} `json:"payload,omitempty"`
	IssuedAt  time.Time              `json:"issued_at"`
	ExpiresAt *time.Time             `json:"expires_at,omitempty"`
}

// MQTTService orchestrates MQTT subscriptions and message processing
type MQTTService struct {
	client        *Client
	topics        *Topics
	sensorUsecase *usecase.SensorUsecase
	deviceUsecase *usecase.DeviceUsecase
	log           *zap.Logger
}

// NewMQTTService creates a new MQTTService
func NewMQTTService(
	client *Client,
	sensorUsecase *usecase.SensorUsecase,
	deviceUsecase *usecase.DeviceUsecase,
	log *zap.Logger,
) *MQTTService {
	return &MQTTService{
		client:        client,
		topics:        NewTopics(TenantID),
		sensorUsecase: sensorUsecase,
		deviceUsecase: deviceUsecase,
		log:           log,
	}
}

// Start registers all subscriptions and begins processing messages
func (s *MQTTService) Start() error {
	s.log.Info("Starting MQTT service, subscribing to topics...")

	// Subscribe to all device telemetry
	if err := s.client.Subscribe(
		s.topics.AllTelemetry(),
		QoSAtLeastOnce,
		s.handleTelemetry,
	); err != nil {
		return err
	}

	// Subscribe to all device status updates
	if err := s.client.Subscribe(
		s.topics.AllStatus(),
		QoSAtLeastOnce,
		s.handleStatus,
	); err != nil {
		return err
	}

	// Publish server online status (retained)
	if err := s.client.Publish(
		fmt.Sprintf("iot/%s/server/status", TenantID),
		QoSAtLeastOnce,
		true, // Retained — new subscribers see this immediately
		map[string]interface{}{
			"status":    "online",
			"timestamp": time.Now().UTC(),
		},
	); err != nil {
		s.log.Warn("Failed to publish server status", zap.Error(err))
	}

	s.log.Info("MQTT service started")
	return nil
}

// handleTelemetry processes telemetry messages from devices
func (s *MQTTService) handleTelemetry(ctx context.Context, topic string, payload []byte) {
	deviceID := ParseDeviceID(topic)
	if deviceID == "" {
		s.log.Warn("Could not parse device_id from telemetry topic", zap.String("topic", topic))
		return
	}

	s.log.Debug("Received telemetry",
		zap.String("device_id", deviceID),
		zap.Int("payload_size", len(payload)),
	)

	// Parse payload
	var msg TelemetryMessage
	if err := json.Unmarshal(payload, &msg); err != nil {
		s.log.Warn("Failed to parse telemetry payload",
			zap.String("device_id", deviceID),
			zap.Error(err),
			zap.ByteString("payload", payload),
		)
		return
	}

	// Construct SensorData and ingest
	data := domain.SensorData{
		DeviceID:  deviceID,
		Timestamp: msg.Timestamp,
		Fields:    msg.Fields,
		Tags:      msg.Tags,
	}

	result, err := s.sensorUsecase.Ingest(ctx, data)
	if err != nil {
		s.log.Error("Failed to ingest MQTT telemetry",
			zap.String("device_id", deviceID),
			zap.Error(err),
		)
		return
	}

	if result.Rejected > 0 {
		s.log.Warn("MQTT telemetry partially rejected",
			zap.String("device_id", deviceID),
			zap.Strings("errors", result.Errors),
		)
	}
}

// handleStatus processes device status update messages
func (s *MQTTService) handleStatus(ctx context.Context, topic string, payload []byte) {
	deviceID := ParseDeviceID(topic)
	if deviceID == "" {
		return
	}

	var msg StatusMessage
	if err := json.Unmarshal(payload, &msg); err != nil {
		s.log.Warn("Failed to parse status payload",
			zap.String("device_id", deviceID),
			zap.Error(err),
		)
		return
	}

	s.log.Info("Device status update",
		zap.String("device_id", deviceID),
		zap.String("status", msg.Status),
	)

	var newStatus domain.DeviceStatus
	switch msg.Status {
	case "online":
		newStatus = domain.DeviceStatusOnline
	case "offline":
		newStatus = domain.DeviceStatusOffline
	default:
		s.log.Warn("Unknown device status", zap.String("status", msg.Status))
		return
	}

	if err := s.deviceUsecase.UpdateDeviceStatus(ctx, deviceID, newStatus); err != nil {
		s.log.Error("Failed to update device status from MQTT",
			zap.String("device_id", deviceID),
			zap.Error(err),
		)
	}
}

// SendCommand publishes a command to a specific device
func (s *MQTTService) SendCommand(ctx context.Context, deviceID, command string, payload map[string]interface{}) error {
	cmdMsg := CommandMessage{
		CommandID: generateCommandID(),
		Command:   command,
		Payload:   payload,
		IssuedAt:  time.Now().UTC(),
	}

	topic := s.topics.Command(deviceID)
	if err := s.client.Publish(topic, QoSAtLeastOnce, false, cmdMsg); err != nil {
		return fmt.Errorf("failed to send command to device %s: %w", deviceID, err)
	}

	s.log.Info("Command sent to device",
		zap.String("device_id", deviceID),
		zap.String("command", command),
		zap.String("command_id", cmdMsg.CommandID),
	)

	return nil
}

// SendConfig pushes a configuration update to a device (retained)
func (s *MQTTService) SendConfig(ctx context.Context, deviceID string, config map[string]interface{}) error {
	topic := s.topics.Config(deviceID)
	return s.client.Publish(topic, QoSAtLeastOnce, true, config) // Retained!
}

func generateCommandID() string {
	return fmt.Sprintf("cmd-%d", time.Now().UnixNano())
}

Mosquitto Configuration

ทำไม Mosquitto ถึงเหมาะกับ Dev?

Mosquitto เป็น MQTT Broker ที่เบา ติดตั้งง่าย และฟรี เหมาะมากสำหรับ development และ production scale กลาง-เล็ก ถ้าอยากได้ scale ใหญ่ขึ้นค่อยย้ายไป EMQX หรือ HiveMQ ทีหลัง

configs/mosquitto/mosquitto.conf

# Mosquitto configuration for IoT Workshop

listener 1883
protocol mqtt

listener 9001
protocol websockets

# Allow anonymous connections in development
# In production: allow_anonymous false + password_file
allow_anonymous true

# Persistence
persistence true
persistence_location /mosquitto/data/

# Logging
log_dest stdout
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true

# Connection settings
max_connections 10000
max_keepalive 300

# Message size limit: 1MB
message_size_limit 1048576

# Queue limit for offline clients
max_queued_messages 1000
queue_qos0_messages false

หมายเหตุ: allow_anonymous true ใช้ได้ใน dev เท่านั้น! Production ต้องปิดและใส่ password_file ด้วย ไม่งั้นใครก็ต่อได้เลย (°ロ°) !


ทดสอบด้วย MQTT Client

ก่อนรัน backend จริง เราลองทดสอบผ่าน command line ก่อนได้เลย เหมือนใช้ curl แต่เป็นสำหรับ MQTT:

# ติดตั้ง mosquitto-clients
apt install mosquitto-clients  # Linux
brew install mosquitto          # macOS

# Subscribe ดูข้อมูลจาก device ทั้งหมด
mosquitto_sub -h localhost -p 1883 \
  -t "iot/workshop/+/up/#" \
  -v

# ส่ง telemetry จาก device simulator
mosquitto_pub -h localhost -p 1883 \
  -t "iot/workshop/sensor-temp-001/up/telemetry" \
  -m '{
    "timestamp": "2026-03-26T10:00:00Z",
    "fields": {
      "temperature": 29.3,
      "humidity": 72.1
    }
  }'

# ส่ง status update
mosquitto_pub -h localhost -p 1883 \
  -t "iot/workshop/sensor-temp-001/up/status" \
  -m '{"status": "online", "timestamp": "2026-03-26T10:00:00Z"}'

# ส่ง command ไปยัง device (ผ่าน API)
curl -X POST http://localhost:8080/api/v1/devices/65f1234567890abcdef12345/commands \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"command": "reboot", "payload": {"delay_seconds": 5}}'

Device Simulator ใน Go — ทดสอบโดยไม่ต้องมีฮาร์ดแวร์จริง

นี่คือสิ่งที่เราชอบมาก บางทีซื้อ sensor จริงยังไม่มา หรือทดสอบระบบโดยไม่อยากพึ่งฮาร์ดแวร์ — ก็ simulate เองได้เลย!

  .--.
 |o_o |    "เราคือ sensor ปลอม
 |:_/ |     แต่ข้อมูลที่ส่งไปนั้นจริง!"
//   \ \
(|     | )
/'\_   _/`\
\___)=(___/
// cmd/simulator/main.go

package main

import (
	"encoding/json"
	"fmt"
	"math/rand"
	"time"

	paho "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	opts := paho.NewClientOptions().
		AddBroker("tcp://localhost:1883").
		SetClientID("device-simulator-001")

	client := paho.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	defer client.Disconnect(250)

	// Announce online
	publishStatus(client, "sensor-temp-001", "online")

	// Send telemetry every 5 seconds
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		temp := 25.0 + rand.Float64()*10.0 - 5.0  // 20-35°C
		humidity := 60.0 + rand.Float64()*30.0 - 15.0 // 45-90%

		payload := map[string]interface{}{
			"timestamp": time.Now().UTC().Format(time.RFC3339),
			"fields": map[string]interface{}{
				"temperature": round(temp, 1),
				"humidity":    round(humidity, 1),
				"battery":     80 + rand.Intn(20),
			},
		}

		data, _ := json.Marshal(payload)
		topic := "iot/workshop/sensor-temp-001/up/telemetry"
		client.Publish(topic, 1, false, data)
		fmt.Printf("[%s] Sent: temp=%.1f°C humidity=%.1f%%\n",
			time.Now().Format("15:04:05"), temp, humidity)
	}
}

func publishStatus(client paho.Client, deviceID, status string) {
	payload := map[string]interface{}{
		"status":    status,
		"timestamp": time.Now().UTC(),
	}
	data, _ := json.Marshal(payload)
	topic := fmt.Sprintf("iot/workshop/%s/up/status", deviceID)
	client.Publish(topic, 1, false, data)
}

func round(val float64, places int) float64 {
	pow := math.Pow(10, float64(places))
	return math.Round(val*pow) / pow
}
# รัน simulator
go run ./cmd/simulator/main.go

สรุปที่เราทำมาวันนี้

เยี่ยมมากน้องๆ! วันนี้เราลุยมาเยอะเลย มาทบทวนกัน:

  • MQTT Pub/Sub คือ “กลุ่ม LINE” สำหรับ IoT — Publisher ส่ง, Broker กระจาย, Subscriber รับ
  • Topic Structure ที่ดี (iot/{tenant}/{device}/{direction}/{type}) ทำให้ filter และ scale ง่าย
  • Paho Go Client มี Auto-Reconnect ในตัว ตั้งค่า SetAutoReconnect(true) แล้วลืมได้เลย
  • QoS ต้องเลือกให้เหมาะ: QoS 0 สำหรับข้อมูลซ้ำได้, QoS 1 สำหรับ command/status
  • Will Message เป็น “ข้อความสั่งเสีย” — ถ้า disconnect ผิดปกติ Broker จะส่งให้อัตโนมัติ
  • Device Simulator ช่วยทดสอบโดยไม่ต้องมีฮาร์ดแวร์จริง ชีวิตดีขึ้นมาก

ขั้นตอนถัดไป

ตอนนี้ device ส่งข้อมูลมาที่ backend ได้แล้ว ขั้นต่อไปคือทำให้ Dashboard ของ User เห็นข้อมูล real-time โดยไม่ต้อง refresh หน้าเลย — นั่นคือเรื่องของ WebSocket!