WebSocket Real-time ข้อมูล IoT แบบสดๆ

WebSocket Real-time ข้อมูล IoT แบบสดๆ

Showkhun · Workshop ·

Branch: workshop/dev-06-websocket Phase: Development — Real-time Layer Repository: https://github.com/kangana1024/iot-workshop


Hook: ทำไมต้องเป็น Real-time?

น้องๆ เคยเห็น Dashboard ที่ค่าอุณหภูมิมันอัปเดตแบบสดๆ โดยไม่ต้อง refresh ไหม? นั่นแหละ WebSocket! วันนี้เราจะมาทำให้ IoT Dashboard ของเราสดใสแบบนั้นกัน มาลุยกันเลยดีกว่า (ノ◕ヮ◕)ノ*:・゚✧


สิ่งที่น้องๆ จะได้เรียนรู้

  • ทำไม ต้อง WebSocket แทน Polling ธรรมดา
  • Hub Pattern คืออะไร และทำไมถึงต้องใช้
  • Room System — ให้ client subscribe เฉพาะ device ที่สนใจ
  • Fan-out ข้อมูล Sensor จาก MQTT ไปยัง Dashboard
  • จัดการ Connection Lifecycle และ Heartbeat อย่างถูกต้อง
  • ตัวอย่าง TypeScript Client ที่ใช้งานได้จริงใน React

WHY ก่อน HOW: ทำไมต้องเป็น WebSocket?

ปัญหาของ Polling แบบเดิม

ลองนึกภาพว่าน้องโทรหาเพื่อนทุกๆ 1 วินาทีเพื่อถามว่า “มีข่าวอะไรไหม?” — นั่นคือ HTTP Polling มันทำงานได้ แต่สิ้นเปลืองมาก!

HTTP Polling (แบบเดิม):
Client → "มีข้อมูลไหม?" → Server → "ยังไม่มี"  (ทุก 1 วิ)
Client → "มีข้อมูลไหม?" → Server → "ยังไม่มี"  (ทุก 1 วิ)
Client → "มีข้อมูลไหม?" → Server → "มีแล้ว!"   (ทุก 1 วิ)

วิธีที่ดีกว่า: WebSocket

WebSocket เหมือนกับการ โทรหากันแล้วไม่วาง ทั้งสองฝ่ายส่งข้อความหากันได้ตลอดโดยไม่ต้องต่อสายใหม่ทุกครั้ง

WebSocket (แบบใหม่):
Client ─── เชื่อมต่อครั้งเดียว ───────────────── Server
       ←←← ข้อมูลมาถึงเมื่อไหร่ก็ส่งทันที ←←←

ผลลัพธ์คือ: Latency ต่ำลง, CPU ลดลง, ประสบการณ์ผู้ใช้ดีขึ้นมาก


Architecture: ภาพรวมก่อนลงมือ

Mermaid Diagram

หรือถ้าอยากเห็น flow ของ Hub ภายในชัดๆ:

Mermaid Diagram


Hub Pattern คืออะไร? (อธิบายแบบภาษาคน)

ลองนึกถึง ห้องแชทในออฟฟิศ สักห้องนึง:

  • Hub = พนักงานต้อนรับ ที่รู้ว่าทุกคนอยู่ห้องไหน
  • Connection = คนทำงาน แต่ละคน
  • Room = ห้องประชุม (ห้อง sensor-001, ห้อง building-a, ห้อง global)
  • Broadcast = ประกาศเสียงตามสาย ไปยังห้องที่ตรงกัน

เมื่อมีข้อมูลจาก sensor มา Hub จะกระจายไปยังทุก Connection ที่อยู่ในห้องที่ตรงกัน — เรียกว่า Fan-out


WebSocket Message Types

internal/websocket/message.go

ก่อนเขียน code เราต้องออกแบบ “ภาษา” ที่ Client กับ Server จะคุยกัน เหมือนกับว่าเราตกลงกันว่า “ถ้าอยากสมัคร ส่ง subscribe มา ถ้าอยากออก ส่ง unsubscribe มา”

package websocket

import "time"

// MessageType defines the type of WebSocket message
type MessageType string

const (
	// Server → Client
	MsgTypeTelemetry    MessageType = "telemetry"     // Sensor data update
	MsgTypeDeviceStatus MessageType = "device_status" // Device online/offline
	MsgTypeAlert        MessageType = "alert"          // Alert triggered
	MsgTypeError        MessageType = "error"          // Error notification
	MsgTypePong         MessageType = "pong"           // Heartbeat response

	// Client → Server
	MsgTypeSubscribe   MessageType = "subscribe"   // Join a room
	MsgTypeUnsubscribe MessageType = "unsubscribe" // Leave a room
	MsgTypePing        MessageType = "ping"        // Heartbeat request
)

// Message is the envelope for all WebSocket communication
type Message struct {
	Type      MessageType `json:"type"`
	Room      string      `json:"room,omitempty"`
	Payload   interface{} `json:"payload,omitempty"`
	Timestamp time.Time   `json:"timestamp"`
	RequestID string      `json:"request_id,omitempty"`
}

// SubscribeRequest is the payload for subscribe/unsubscribe messages
type SubscribeRequest struct {
	Rooms []string `json:"rooms"` // e.g. ["device:sensor-001", "group:building-a"]
}

// TelemetryPayload is the data pushed to subscribed clients
type TelemetryPayload struct {
	DeviceID  string                 `json:"device_id"`
	Timestamp time.Time              `json:"timestamp"`
	Fields    map[string]interface{} `json:"fields"`
	Tags      map[string]string      `json:"tags,omitempty"`
}

// DeviceStatusPayload represents a device status change event
type DeviceStatusPayload struct {
	DeviceID  string    `json:"device_id"`
	Status    string    `json:"status"`
	Timestamp time.Time `json:"timestamp"`
}

// AlertPayload represents a triggered alert
type AlertPayload struct {
	AlertID   string    `json:"alert_id"`
	DeviceID  string    `json:"device_id"`
	RuleName  string    `json:"rule_name"`
	Severity  string    `json:"severity"`
	Field     string    `json:"field"`
	Value     float64   `json:"value"`
	Threshold float64   `json:"threshold"`
	Timestamp time.Time `json:"timestamp"`
}

// RoomPrefix constants
const (
	RoomPrefixDevice = "device:"
	RoomPrefixGroup  = "group:"
	RoomGlobal       = "global"
)

// DeviceRoom returns the room name for a specific device
func DeviceRoom(deviceID string) string {
	return RoomPrefixDevice + deviceID
}

// GroupRoom returns the room name for a device group
func GroupRoom(groupID string) string {
	return RoomPrefixGroup + groupID
}

เรื่องน่ารู้: เราใช้ prefix แยก namespace ของ Room เพื่อป้องกันชื่อชน เช่น device:sensor-001 vs group:building-a — เหมือนกับ path ใน URL เลย


Connection: จัดการ Client แต่ละคน

internal/websocket/connection.go

Connection คือ “ตัวแทน” ของ client แต่ละคนในระบบ มีหน้าที่หลักสองอย่าง:

  • readPump — คอยอ่านข้อความที่ client ส่งมา (subscribe, ping, ฯลฯ)
  • writePump — คอยส่งข้อมูลไปให้ client เมื่อมีข้อมูลใหม่
     ┌─────────────────────────────────────────┐
     │            Connection                   │
     │  readPump ─────────────► Hub.incoming   │
     │  writePump ◄──── send channel ◄── Hub   │
     │  pingTicker ────────────────────────►   │ (ส่ง ping ทุก 54 วิ)
     └─────────────────────────────────────────┘
package websocket

import (
	"encoding/json"
	"sync"
	"time"

	"github.com/gofiber/websocket/v2"
	"go.uber.org/zap"
)

const (
	// Time allowed to write a message to the client
	writeWait = 10 * time.Second

	// Time allowed to read the next pong message from the client
	pongWait = 60 * time.Second

	// Send pings to client with this period — must be less than pongWait
	pingPeriod = (pongWait * 9) / 10

	// Maximum message size in bytes
	maxMessageSize = 8192

	// Buffer size for the send channel
	sendBufferSize = 256
)

// Connection represents a single WebSocket client connection
type Connection struct {
	id     string
	userID string
	conn   *websocket.Conn
	hub    *Hub
	send   chan []byte
	rooms  map[string]struct{}
	mu     sync.RWMutex
	log    *zap.Logger
}

// newConnection creates a new Connection
func newConnection(id, userID string, conn *websocket.Conn, hub *Hub, log *zap.Logger) *Connection {
	return &Connection{
		id:     id,
		userID: userID,
		conn:   conn,
		hub:    hub,
		send:   make(chan []byte, sendBufferSize),
		rooms:  make(map[string]struct{}),
		log:    log,
	}
}

// ID returns the connection's unique ID
func (c *Connection) ID() string { return c.id }

// UserID returns the authenticated user's ID
func (c *Connection) UserID() string { return c.userID }

// JoinRoom adds this connection to a room
func (c *Connection) JoinRoom(room string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.rooms[room] = struct{}{}
}

// LeaveRoom removes this connection from a room
func (c *Connection) LeaveRoom(room string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.rooms, room)
}

// InRoom returns true if the connection is subscribed to the given room
func (c *Connection) InRoom(room string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	_, ok := c.rooms[room]
	return ok
}

// Rooms returns a snapshot of all rooms this connection is in
func (c *Connection) Rooms() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	rooms := make([]string, 0, len(c.rooms))
	for r := range c.rooms {
		rooms = append(rooms, r)
	}
	return rooms
}

// SendMessage enqueues a message for sending (non-blocking)
func (c *Connection) SendMessage(msg *Message) bool {
	msg.Timestamp = time.Now()
	data, err := json.Marshal(msg)
	if err != nil {
		c.log.Error("Failed to marshal WebSocket message", zap.Error(err))
		return false
	}

	select {
	case c.send <- data:
		return true
	default:
		// Channel is full — client is too slow, will be disconnected
		c.log.Warn("WebSocket send buffer full, dropping connection",
			zap.String("connection_id", c.id),
		)
		return false
	}
}

// writePump pumps messages from the hub to the WebSocket connection
func (c *Connection) writePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))

			if !ok {
				// Hub closed the channel
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
				c.log.Debug("WebSocket write error",
					zap.String("conn_id", c.id),
					zap.Error(err),
				)
				return
			}

		case <-ticker.C:
			// Send ping to check if client is still alive
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}

// readPump reads messages from the WebSocket connection and processes them
func (c *Connection) readPump() {
	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()

	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))

	// Reset deadline whenever we receive a pong
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		_, data, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err,
				websocket.CloseGoingAway,
				websocket.CloseAbnormalClosure,
			) {
				c.log.Debug("WebSocket unexpected close",
					zap.String("conn_id", c.id),
					zap.Error(err),
				)
			}
			break
		}

		// Parse incoming message
		var msg Message
		if err := json.Unmarshal(data, &msg); err != nil {
			c.SendMessage(&Message{
				Type:    MsgTypeError,
				Payload: map[string]string{"error": "invalid JSON"},
			})
			continue
		}

		// Dispatch message to hub
		c.hub.incoming <- &IncomingMessage{
			Conn:    c,
			Message: &msg,
		}
	}
}

Tip จากพี่โชว์: pingPeriod = (pongWait * 9) / 10 คือ 54 วินาที — เราส่ง ping ก่อนที่ deadline จะหมด เพื่อให้แน่ใจว่า connection ยังมีชีวิตอยู่ เหมือนกับการเช็คชื่อในห้องเรียน ถ้าไม่ตอบก็โดนลบออกจากระบบ!


Hub: ศูนย์กลางของทุกอย่าง

internal/websocket/hub.go

นี่คือหัวใจของระบบ Hub เปรียบเหมือน ตู้โทรศัพท์ส่วนกลาง ที่รู้ว่าแต่ละ connection อยู่ในห้องไหน และกระจายข้อความไปให้ถูกคน

package websocket

import (
	"context"
	"fmt"
	"sync"
	"time"

	"go.uber.org/zap"
)

// IncomingMessage wraps a message with its source connection
type IncomingMessage struct {
	Conn    *Connection
	Message *Message
}

// BroadcastRequest represents a request to broadcast to a room
type BroadcastRequest struct {
	Room    string
	Message *Message
}

// Hub manages all active WebSocket connections and message routing
type Hub struct {
	// Registered connections
	connections map[string]*Connection
	mu          sync.RWMutex

	// Rooms: room name → set of connection IDs
	rooms   map[string]map[string]struct{}
	roomsMu sync.RWMutex

	// Channels for connection lifecycle and messaging
	register   chan *Connection
	unregister chan *Connection
	incoming   chan *IncomingMessage
	broadcast  chan *BroadcastRequest

	log *zap.Logger
}

// NewHub creates a new Hub
func NewHub(log *zap.Logger) *Hub {
	return &Hub{
		connections: make(map[string]*Connection),
		rooms:       make(map[string]map[string]struct{}),
		register:    make(chan *Connection, 100),
		unregister:  make(chan *Connection, 100),
		incoming:    make(chan *IncomingMessage, 1000),
		broadcast:   make(chan *BroadcastRequest, 1000),
		log:         log,
	}
}

// Run starts the hub event loop — must be called in a goroutine
func (h *Hub) Run(ctx context.Context) {
	h.log.Info("WebSocket hub started")
	defer h.log.Info("WebSocket hub stopped")

	for {
		select {
		case conn := <-h.register:
			h.handleRegister(conn)

		case conn := <-h.unregister:
			h.handleUnregister(conn)

		case msg := <-h.incoming:
			h.handleIncoming(msg)

		case req := <-h.broadcast:
			h.handleBroadcast(req)

		case <-ctx.Done():
			h.closeAll()
			return
		}
	}
}

func (h *Hub) handleRegister(conn *Connection) {
	h.mu.Lock()
	h.connections[conn.id] = conn
	h.mu.Unlock()

	h.log.Info("WebSocket client connected",
		zap.String("conn_id", conn.id),
		zap.String("user_id", conn.userID),
	)

	// Send welcome message
	conn.SendMessage(&Message{
		Type: "connected",
		Payload: map[string]interface{}{
			"connection_id": conn.id,
			"server_time":   time.Now().UTC(),
		},
	})
}

func (h *Hub) handleUnregister(conn *Connection) {
	h.mu.Lock()
	if _, ok := h.connections[conn.id]; ok {
		delete(h.connections, conn.id)
		close(conn.send)
	}
	h.mu.Unlock()

	// Remove from all rooms
	for _, room := range conn.Rooms() {
		h.removeFromRoom(conn.id, room)
	}

	h.log.Info("WebSocket client disconnected",
		zap.String("conn_id", conn.id),
		zap.String("user_id", conn.userID),
	)
}

func (h *Hub) handleIncoming(msg *IncomingMessage) {
	conn := msg.Conn

	switch msg.Message.Type {
	case MsgTypePing:
		conn.SendMessage(&Message{
			Type:      MsgTypePong,
			RequestID: msg.Message.RequestID,
		})

	case MsgTypeSubscribe:
		req, ok := parseSubscribeRequest(msg.Message.Payload)
		if !ok {
			conn.SendMessage(&Message{
				Type:    MsgTypeError,
				Payload: map[string]string{"error": "invalid subscribe payload"},
			})
			return
		}

		for _, room := range req.Rooms {
			if !isValidRoom(room) {
				continue
			}
			conn.JoinRoom(room)
			h.addToRoom(conn.id, room)
		}

		conn.SendMessage(&Message{
			Type:      "subscribed",
			RequestID: msg.Message.RequestID,
			Payload:   map[string]interface{}{"rooms": conn.Rooms()},
		})

		h.log.Debug("Client subscribed to rooms",
			zap.String("conn_id", conn.id),
			zap.Strings("rooms", req.Rooms),
		)

	case MsgTypeUnsubscribe:
		req, ok := parseSubscribeRequest(msg.Message.Payload)
		if !ok {
			return
		}

		for _, room := range req.Rooms {
			conn.LeaveRoom(room)
			h.removeFromRoom(conn.id, room)
		}

		conn.SendMessage(&Message{
			Type:      "unsubscribed",
			RequestID: msg.Message.RequestID,
			Payload:   map[string]interface{}{"rooms": conn.Rooms()},
		})
	}
}

func (h *Hub) handleBroadcast(req *BroadcastRequest) {
	h.roomsMu.RLock()
	connIDs, ok := h.rooms[req.Room]
	if !ok {
		h.roomsMu.RUnlock()
		return
	}
	// Copy to avoid holding lock during send
	targets := make([]string, 0, len(connIDs))
	for id := range connIDs {
		targets = append(targets, id)
	}
	h.roomsMu.RUnlock()

	h.mu.RLock()
	defer h.mu.RUnlock()

	sent := 0
	for _, id := range targets {
		conn, ok := h.connections[id]
		if !ok {
			continue
		}
		if conn.SendMessage(req.Message) {
			sent++
		}
	}

	h.log.Debug("Broadcast to room",
		zap.String("room", req.Room),
		zap.Int("recipients", sent),
	)
}

// BroadcastToRoom sends a message to all connections in a room
func (h *Hub) BroadcastToRoom(room string, msg *Message) {
	select {
	case h.broadcast <- &BroadcastRequest{Room: room, Message: msg}:
	default:
		h.log.Warn("Broadcast channel full, dropping message",
			zap.String("room", room),
		)
	}
}

// BroadcastTelemetry fans out sensor data to all relevant rooms
func (h *Hub) BroadcastTelemetry(payload *TelemetryPayload) {
	msg := &Message{
		Type:    MsgTypeTelemetry,
		Payload: payload,
	}

	// 1. Broadcast to device-specific room
	h.BroadcastToRoom(DeviceRoom(payload.DeviceID), msg)

	// 2. Broadcast to global room (all data)
	h.BroadcastToRoom(RoomGlobal, msg)
}

// BroadcastDeviceStatus notifies subscribers of a device status change
func (h *Hub) BroadcastDeviceStatus(payload *DeviceStatusPayload) {
	msg := &Message{
		Type:    MsgTypeDeviceStatus,
		Payload: payload,
	}
	h.BroadcastToRoom(DeviceRoom(payload.DeviceID), msg)
	h.BroadcastToRoom(RoomGlobal, msg)
}

// Stats returns current hub statistics
func (h *Hub) Stats() map[string]interface{} {
	h.mu.RLock()
	connCount := len(h.connections)
	h.mu.RUnlock()

	h.roomsMu.RLock()
	roomCount := len(h.rooms)
	h.roomsMu.RUnlock()

	return map[string]interface{}{
		"connections": connCount,
		"rooms":       roomCount,
	}
}

func (h *Hub) addToRoom(connID, room string) {
	h.roomsMu.Lock()
	defer h.roomsMu.Unlock()

	if _, ok := h.rooms[room]; !ok {
		h.rooms[room] = make(map[string]struct{})
	}
	h.rooms[room][connID] = struct{}{}
}

func (h *Hub) removeFromRoom(connID, room string) {
	h.roomsMu.Lock()
	defer h.roomsMu.Unlock()

	if members, ok := h.rooms[room]; ok {
		delete(members, connID)
		if len(members) == 0 {
			delete(h.rooms, room)
		}
	}
}

func (h *Hub) closeAll() {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, conn := range h.connections {
		close(conn.send)
	}
	h.connections = make(map[string]*Connection)
}

func isValidRoom(room string) bool {
	if room == RoomGlobal {
		return true
	}
	if len(room) > len(RoomPrefixDevice) && room[:len(RoomPrefixDevice)] == RoomPrefixDevice {
		return true
	}
	if len(room) > len(RoomPrefixGroup) && room[:len(RoomPrefixGroup)] == RoomPrefixGroup {
		return true
	}
	return false
}

func parseSubscribeRequest(payload interface{}) (*SubscribeRequest, bool) {
	if payload == nil {
		return nil, false
	}
	// The payload comes as map[string]interface{} after JSON unmarshal
	m, ok := payload.(map[string]interface{})
	if !ok {
		return nil, false
	}
	rawRooms, ok := m["rooms"]
	if !ok {
		return nil, false
	}
	rooms, ok := rawRooms.([]interface{})
	if !ok {
		return nil, false
	}
	result := &SubscribeRequest{}
	for _, r := range rooms {
		if s, ok := r.(string); ok {
			result.Rooms = append(result.Rooms, s)
		}
	}
	return result, true
}

จุดสำคัญที่ต้องเข้าใจ: Hub ทำงานใน single goroutine (event loop) แต่ Connection แต่ละอันมี goroutine ของตัวเอง การส่งข้อความผ่าน channel จึงทำให้ thread-safe โดยไม่ต้อง lock ที่ Hub เอง ฉลาดมากเลย!


WebSocket Handler: ประตูเชื่อมโลกภายนอก

internal/handler/websocket_handler.go

Handler ทำหน้าที่รับ HTTP request ที่ขอ upgrade เป็น WebSocket แล้วส่งต่อให้ Hub จัดการ เหมือนพนักงานต้อนรับที่ check-in ลูกค้าแล้วพาไปห้องที่ถูกต้อง

package handler

import (
	"fmt"
	"time"

	"github.com/gofiber/fiber/v2"
	fiberws "github.com/gofiber/websocket/v2"
	"github.com/google/uuid"
	"go.uber.org/zap"

	appws "github.com/kangana1024/iot-workshop/backend/internal/websocket"
)

// WebSocketHandler handles WebSocket upgrade and connection management
type WebSocketHandler struct {
	hub *appws.Hub
	log *zap.Logger
}

// NewWebSocketHandler creates a new WebSocketHandler
func NewWebSocketHandler(hub *appws.Hub, log *zap.Logger) *WebSocketHandler {
	return &WebSocketHandler{hub: hub, log: log}
}

// Upgrade godoc
// @Summary      WebSocket upgrade
// @Description  Upgrades HTTP to WebSocket for real-time data streaming
// @Tags         realtime
// @Param        token  query  string  true  "JWT token for authentication"
// @Router       /api/v1/ws [get]
func (h *WebSocketHandler) Upgrade(c *fiber.Ctx) error {
	// Check if request is a WebSocket upgrade
	if !fiberws.IsWebSocketUpgrade(c) {
		return fiber.ErrUpgradeRequired
	}

	// Store user info in locals before upgrade (locals persist into WS handler)
	userID := c.Locals("user_id")
	c.Locals("ws_user_id", fmt.Sprintf("%v", userID))

	return c.Next()
}

// Handle is the WebSocket handler after upgrade
func (h *WebSocketHandler) Handle(c *fiberws.Conn) {
	userID, _ := c.Locals("ws_user_id").(string)
	connID := uuid.New().String()

	h.log.Info("New WebSocket connection",
		zap.String("conn_id", connID),
		zap.String("user_id", userID),
		zap.String("remote_addr", c.RemoteAddr().String()),
	)

	conn := appws.NewConnection(connID, userID, c, h.hub, h.log)

	// Register with hub
	h.hub.Register(conn)

	// Start read and write pumps
	go conn.WritePump()
	conn.ReadPump() // Blocks until connection closes
}

// Stats returns current WebSocket hub statistics
func (h *WebSocketHandler) Stats(c *fiber.Ctx) error {
	return c.JSON(h.hub.Stats())
}

Router Registration

// In router.go — add WebSocket routes

import (
	fiberws "github.com/gofiber/websocket/v2"
)

// WebSocket route (เพิ่มใน Setup function)
app.Use("/api/v1/ws", h.WebSocket.Upgrade)
app.Get("/api/v1/ws", fiberws.New(h.WebSocket.Handle))
app.Get("/api/v1/ws/stats", h.WebSocket.Stats)

เชื่อม MQTT กับ WebSocket Hub

internal/mqtt/service.go — เพิ่ม Hub Integration

นี่คือส่วนที่ “มัด” ทุกอย่างเข้าด้วยกัน เมื่อ MQTT รับข้อมูลจาก device มา เราจะกระจายต่อไปยัง WebSocket clients ที่รอรับอยู่ทันที

// เพิ่ม hub field ใน MQTTService
type MQTTService struct {
	client        *Client
	topics        *Topics
	sensorUsecase *usecase.SensorUsecase
	deviceUsecase *usecase.DeviceUsecase
	hub           *websocket.Hub  // ← เพิ่ม
	log           *zap.Logger
}

// อัปเดต handleTelemetry — หลังจาก ingest แล้ว broadcast ผ่าน WebSocket
func (s *MQTTService) handleTelemetry(ctx context.Context, topic string, payload []byte) {
	// ... (code เดิม) ...

	// After successful ingestion, fan-out to WebSocket clients
	if result.Accepted > 0 {
		wsPayload := &websocket.TelemetryPayload{
			DeviceID:  data.DeviceID,
			Timestamp: time.Now(),
			Fields:    data.Fields,
			Tags:      data.Tags,
		}
		s.hub.BroadcastTelemetry(wsPayload)
	}
}

// อัปเดต handleStatus — broadcast device status changes
func (s *MQTTService) handleStatus(ctx context.Context, topic string, payload []byte) {
	// ... (code เดิม) ...

	// Broadcast status change via WebSocket
	s.hub.BroadcastDeviceStatus(&websocket.DeviceStatusPayload{
		DeviceID:  deviceID,
		Status:    msg.Status,
		Timestamp: msg.Timestamp,
	})
}

ทดสอบ WebSocket กัน!

มาลองดูว่าระบบที่เราสร้างมาทำงานได้จริงไหม น้องๆ จะได้เห็นภาพชัดขึ้นมากเลย

ทดสอบด้วย websocat

# ติดตั้ง websocat
cargo install websocat
# หรือ
brew install websocat

# เชื่อมต่อ (ส่ง JWT token ผ่าน query parameter)
TOKEN="your-jwt-token-here"
websocat "ws://localhost:8080/api/v1/ws?token=$TOKEN"

Subscribe และรับข้อมูล

// ส่ง: Subscribe ไปยัง device ที่สนใจ
{
  "type": "subscribe",
  "request_id": "req-001",
  "payload": {
    "rooms": ["device:sensor-temp-001", "device:sensor-temp-002", "global"]
  }
}

// รับ: Confirmation
{
  "type": "subscribed",
  "request_id": "req-001",
  "payload": {
    "rooms": ["device:sensor-temp-001", "device:sensor-temp-002", "global"]
  },
  "timestamp": "2026-03-26T10:00:00Z"
}

// รับ: Real-time telemetry (เมื่อ device ส่งข้อมูล)
{
  "type": "telemetry",
  "timestamp": "2026-03-26T10:00:05Z",
  "payload": {
    "device_id": "sensor-temp-001",
    "timestamp": "2026-03-26T10:00:05Z",
    "fields": {
      "temperature": 28.7,
      "humidity": 64.3
    }
  }
}

// รับ: Device status change
{
  "type": "device_status",
  "timestamp": "2026-03-26T10:01:00Z",
  "payload": {
    "device_id": "sensor-temp-001",
    "status": "offline",
    "timestamp": "2026-03-26T10:01:00Z"
  }
}

Heartbeat: ชีพจรของ Connection

Heartbeat เหมือนกับการที่หมอเอาเครื่องวัด EKG ไปติดคนไข้ ถ้าสัญญาณหายไปก็แปลว่ามีปัญหา!

// ส่ง Ping
{ "type": "ping", "request_id": "heartbeat-001" }

// รับ Pong
{ "type": "pong", "request_id": "heartbeat-001", "timestamp": "2026-03-26T10:00:30Z" }

JavaScript Client Example

สำหรับ Admin Panel (Vite + React) เราสร้าง client class ที่ auto-reconnect และจัดการ event ต่างๆ ให้พร้อมใช้งาน:

// lib/websocket.ts

export type WSMessage = {
  type: string;
  room?: string;
  payload?: unknown;
  timestamp: string;
  request_id?: string;
};

export class IoTWebSocket {
  private ws: WebSocket | null = null;
  private reconnectDelay = 1000;
  private maxReconnectDelay = 30000;
  private heartbeatInterval: number | null = null;
  private listeners = new Map<string, Set<(payload: unknown) => void>>();

  constructor(private url: string, private token: string) {}

  connect(): void {
    this.ws = new WebSocket(`${this.url}?token=${this.token}`);

    this.ws.onopen = () => {
      console.log('WebSocket connected');
      this.reconnectDelay = 1000;
      this.startHeartbeat();
    };

    this.ws.onmessage = (event) => {
      try {
        const msg: WSMessage = JSON.parse(event.data);
        this.dispatch(msg.type, msg.payload);
      } catch (e) {
        console.error('Failed to parse WS message', e);
      }
    };

    this.ws.onclose = () => {
      console.log('WebSocket disconnected, reconnecting...');
      this.stopHeartbeat();
      setTimeout(() => this.connect(), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
    };

    this.ws.onerror = (err) => {
      console.error('WebSocket error', err);
    };
  }

  subscribe(rooms: string[]): void {
    this.send({
      type: 'subscribe',
      request_id: `sub-${Date.now()}`,
      payload: { rooms },
    });
  }

  unsubscribe(rooms: string[]): void {
    this.send({
      type: 'unsubscribe',
      payload: { rooms },
    });
  }

  on(messageType: string, handler: (payload: unknown) => void): () => void {
    if (!this.listeners.has(messageType)) {
      this.listeners.set(messageType, new Set());
    }
    this.listeners.get(messageType)!.add(handler);

    // Return unsubscribe function
    return () => {
      this.listeners.get(messageType)?.delete(handler);
    };
  }

  private send(msg: Partial<WSMessage>): void {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(msg));
    }
  }

  private dispatch(type: string, payload: unknown): void {
    this.listeners.get(type)?.forEach((handler) => handler(payload));
  }

  private startHeartbeat(): void {
    this.heartbeatInterval = window.setInterval(() => {
      this.send({ type: 'ping', request_id: `ping-${Date.now()}` });
    }, 30000);
  }

  private stopHeartbeat(): void {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  disconnect(): void {
    this.stopHeartbeat();
    this.ws?.close(1000, 'Client disconnecting');
  }
}

// ตัวอย่างการใช้งานใน React component:
//
// const ws = new IoTWebSocket('ws://localhost:8080/api/v1/ws', token);
// ws.connect();
// ws.subscribe(['device:sensor-temp-001', 'global']);
//
// const unsubscribe = ws.on('telemetry', (payload) => {
//   const data = payload as TelemetryPayload;
//   setTemperature(data.fields.temperature as number);
// });
//
// return () => {
//   unsubscribe();
//   ws.disconnect();
// };

สังเกตไหม? Reconnect logic ใช้ Exponential Backoff — เริ่มที่ 1 วิ, แล้วก็ 2, 4, 8, … จนสูงสุด 30 วิ เพื่อไม่ให้ client ไป hammer server เมื่อมันล่ม ฉลาดดีเนอะ!


Load Testing

# ติดตั้ง ws-load-test หรือใช้ k6
# ทดสอบ 1000 concurrent connections

k6 run --vus 1000 --duration 60s - <<'EOF'
import ws from 'k6/ws';
import { check } from 'k6';

export default function () {
  const url = 'ws://localhost:8080/api/v1/ws?token=test-token';
  const res = ws.connect(url, {}, function (socket) {
    socket.on('open', () => {
      socket.send(JSON.stringify({
        type: 'subscribe',
        payload: { rooms: ['global'] }
      }));
    });

    socket.on('message', (data) => {
      const msg = JSON.parse(data);
      check(msg, { 'valid message': (m) => m.type !== undefined });
    });

    socket.setTimeout(() => socket.close(), 30000);
  });

  check(res, { 'connected': (r) => r && r.status === 101 });
}
EOF

สรุปสิ่งที่ทำไปทั้งหมด

(づ。◕‿‿◕。)づ ยินดีด้วย! น้องๆ ผ่าน WebSocket Layer มาแล้ว!

ในตอนนี้เราได้ทำอะไรไปบ้าง:

  • Upgrade HTTP เป็น WebSocket ด้วย Fiber WebSocket middleware — ประตูเปิดแล้ว!
  • Hub Pattern จัดการ Connections และ Rooms อย่างมีประสิทธิภาพด้วย Goroutines และ Channels
  • Room System ให้ client subscribe เฉพาะ device/group ที่ต้องการ ไม่ต้องรับทุกอย่าง
  • Fan-out telemetry จาก MQTT ไปยัง WebSocket clients ที่ subscribe ไว้แบบ real-time
  • Connection Lifecycle ครบถ้วน: Register, Unregister, Write Pump, Read Pump
  • Heartbeat ด้วย Ping/Pong ป้องกัน dead connections ที่ค้างในระบบ
  • TypeScript WebSocket client ที่ auto-reconnect พร้อมใช้ใน Admin Panel

Workshop Complete! ขอบคุณทุกคนที่ทนมาถึงตรงนี้

เราได้สร้าง IoT Backend ครบชุดแล้ว:

  1. #4 Fiber Bootstrap — Foundation, Config, Middleware
  2. #5 MongoDB Models — Domain Models, Repository Pattern
  3. #6 Device API — CRUD REST API สำหรับจัดการ Devices
  4. #7 Sensor Ingestion — รับและเก็บข้อมูล Sensor ลง InfluxDB
  5. #8 MQTT Integration — รับข้อมูลจาก Devices แบบ Real-time
  6. #9 WebSocket — Push ข้อมูลไปยัง Dashboard แบบ Real-time ← ตรงนี้เอง!

Next Step: มาลุยกันต่อ!

Backend พร้อมแล้ว! ขั้นตอนถัดไปคือสร้าง LynxJS Mobile App และ Vite Admin Panel ที่จะใช้ WebSocket API ที่เราเพิ่งสร้างมา หน้า Dashboard จะได้ขยับตัวเลขแบบ real-time สวยงามมาก รอดูกันได้เลย!