WebSocket Real-time ข้อมูล IoT แบบสดๆ
Branch:
workshop/dev-06-websocketPhase: Development — Real-time Layer Repository: https://github.com/kangana1024/iot-workshop
Hook: ทำไมต้องเป็น Real-time?
น้องๆ เคยเห็น Dashboard ที่ค่าอุณหภูมิมันอัปเดตแบบสดๆ โดยไม่ต้อง refresh ไหม? นั่นแหละ WebSocket! วันนี้เราจะมาทำให้ IoT Dashboard ของเราสดใสแบบนั้นกัน มาลุยกันเลยดีกว่า (ノ◕ヮ◕)ノ*:・゚✧
สิ่งที่น้องๆ จะได้เรียนรู้
- ทำไม ต้อง WebSocket แทน Polling ธรรมดา
- Hub Pattern คืออะไร และทำไมถึงต้องใช้
- Room System — ให้ client subscribe เฉพาะ device ที่สนใจ
- Fan-out ข้อมูล Sensor จาก MQTT ไปยัง Dashboard
- จัดการ Connection Lifecycle และ Heartbeat อย่างถูกต้อง
- ตัวอย่าง TypeScript Client ที่ใช้งานได้จริงใน React
WHY ก่อน HOW: ทำไมต้องเป็น WebSocket?
ปัญหาของ Polling แบบเดิม
ลองนึกภาพว่าน้องโทรหาเพื่อนทุกๆ 1 วินาทีเพื่อถามว่า “มีข่าวอะไรไหม?” — นั่นคือ HTTP Polling มันทำงานได้ แต่สิ้นเปลืองมาก!
HTTP Polling (แบบเดิม):
Client → "มีข้อมูลไหม?" → Server → "ยังไม่มี" (ทุก 1 วิ)
Client → "มีข้อมูลไหม?" → Server → "ยังไม่มี" (ทุก 1 วิ)
Client → "มีข้อมูลไหม?" → Server → "มีแล้ว!" (ทุก 1 วิ)
วิธีที่ดีกว่า: WebSocket
WebSocket เหมือนกับการ โทรหากันแล้วไม่วาง ทั้งสองฝ่ายส่งข้อความหากันได้ตลอดโดยไม่ต้องต่อสายใหม่ทุกครั้ง
WebSocket (แบบใหม่):
Client ─── เชื่อมต่อครั้งเดียว ───────────────── Server
←←← ข้อมูลมาถึงเมื่อไหร่ก็ส่งทันที ←←←
ผลลัพธ์คือ: Latency ต่ำลง, CPU ลดลง, ประสบการณ์ผู้ใช้ดีขึ้นมาก
Architecture: ภาพรวมก่อนลงมือ
หรือถ้าอยากเห็น flow ของ Hub ภายในชัดๆ:
Hub Pattern คืออะไร? (อธิบายแบบภาษาคน)
ลองนึกถึง ห้องแชทในออฟฟิศ สักห้องนึง:
- Hub = พนักงานต้อนรับ ที่รู้ว่าทุกคนอยู่ห้องไหน
- Connection = คนทำงาน แต่ละคน
- Room = ห้องประชุม (ห้อง sensor-001, ห้อง building-a, ห้อง global)
- Broadcast = ประกาศเสียงตามสาย ไปยังห้องที่ตรงกัน
เมื่อมีข้อมูลจาก sensor มา Hub จะกระจายไปยังทุก Connection ที่อยู่ในห้องที่ตรงกัน — เรียกว่า Fan-out
WebSocket Message Types
internal/websocket/message.go
ก่อนเขียน code เราต้องออกแบบ “ภาษา” ที่ Client กับ Server จะคุยกัน เหมือนกับว่าเราตกลงกันว่า “ถ้าอยากสมัคร ส่ง subscribe มา ถ้าอยากออก ส่ง unsubscribe มา”
package websocket
import "time"
// MessageType defines the type of WebSocket message
type MessageType string
const (
// Server → Client
MsgTypeTelemetry MessageType = "telemetry" // Sensor data update
MsgTypeDeviceStatus MessageType = "device_status" // Device online/offline
MsgTypeAlert MessageType = "alert" // Alert triggered
MsgTypeError MessageType = "error" // Error notification
MsgTypePong MessageType = "pong" // Heartbeat response
// Client → Server
MsgTypeSubscribe MessageType = "subscribe" // Join a room
MsgTypeUnsubscribe MessageType = "unsubscribe" // Leave a room
MsgTypePing MessageType = "ping" // Heartbeat request
)
// Message is the envelope for all WebSocket communication
type Message struct {
Type MessageType `json:"type"`
Room string `json:"room,omitempty"`
Payload interface{} `json:"payload,omitempty"`
Timestamp time.Time `json:"timestamp"`
RequestID string `json:"request_id,omitempty"`
}
// SubscribeRequest is the payload for subscribe/unsubscribe messages
type SubscribeRequest struct {
Rooms []string `json:"rooms"` // e.g. ["device:sensor-001", "group:building-a"]
}
// TelemetryPayload is the data pushed to subscribed clients
type TelemetryPayload struct {
DeviceID string `json:"device_id"`
Timestamp time.Time `json:"timestamp"`
Fields map[string]interface{} `json:"fields"`
Tags map[string]string `json:"tags,omitempty"`
}
// DeviceStatusPayload represents a device status change event
type DeviceStatusPayload struct {
DeviceID string `json:"device_id"`
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
}
// AlertPayload represents a triggered alert
type AlertPayload struct {
AlertID string `json:"alert_id"`
DeviceID string `json:"device_id"`
RuleName string `json:"rule_name"`
Severity string `json:"severity"`
Field string `json:"field"`
Value float64 `json:"value"`
Threshold float64 `json:"threshold"`
Timestamp time.Time `json:"timestamp"`
}
// RoomPrefix constants
const (
RoomPrefixDevice = "device:"
RoomPrefixGroup = "group:"
RoomGlobal = "global"
)
// DeviceRoom returns the room name for a specific device
func DeviceRoom(deviceID string) string {
return RoomPrefixDevice + deviceID
}
// GroupRoom returns the room name for a device group
func GroupRoom(groupID string) string {
return RoomPrefixGroup + groupID
}
เรื่องน่ารู้: เราใช้ prefix แยก namespace ของ Room เพื่อป้องกันชื่อชน เช่น
device:sensor-001vsgroup:building-a— เหมือนกับ path ใน URL เลย
Connection: จัดการ Client แต่ละคน
internal/websocket/connection.go
Connection คือ “ตัวแทน” ของ client แต่ละคนในระบบ มีหน้าที่หลักสองอย่าง:
- readPump — คอยอ่านข้อความที่ client ส่งมา (subscribe, ping, ฯลฯ)
- writePump — คอยส่งข้อมูลไปให้ client เมื่อมีข้อมูลใหม่
┌─────────────────────────────────────────┐
│ Connection │
│ readPump ─────────────► Hub.incoming │
│ writePump ◄──── send channel ◄── Hub │
│ pingTicker ────────────────────────► │ (ส่ง ping ทุก 54 วิ)
└─────────────────────────────────────────┘
package websocket
import (
"encoding/json"
"sync"
"time"
"github.com/gofiber/websocket/v2"
"go.uber.org/zap"
)
const (
// Time allowed to write a message to the client
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the client
pongWait = 60 * time.Second
// Send pings to client with this period — must be less than pongWait
pingPeriod = (pongWait * 9) / 10
// Maximum message size in bytes
maxMessageSize = 8192
// Buffer size for the send channel
sendBufferSize = 256
)
// Connection represents a single WebSocket client connection
type Connection struct {
id string
userID string
conn *websocket.Conn
hub *Hub
send chan []byte
rooms map[string]struct{}
mu sync.RWMutex
log *zap.Logger
}
// newConnection creates a new Connection
func newConnection(id, userID string, conn *websocket.Conn, hub *Hub, log *zap.Logger) *Connection {
return &Connection{
id: id,
userID: userID,
conn: conn,
hub: hub,
send: make(chan []byte, sendBufferSize),
rooms: make(map[string]struct{}),
log: log,
}
}
// ID returns the connection's unique ID
func (c *Connection) ID() string { return c.id }
// UserID returns the authenticated user's ID
func (c *Connection) UserID() string { return c.userID }
// JoinRoom adds this connection to a room
func (c *Connection) JoinRoom(room string) {
c.mu.Lock()
defer c.mu.Unlock()
c.rooms[room] = struct{}{}
}
// LeaveRoom removes this connection from a room
func (c *Connection) LeaveRoom(room string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.rooms, room)
}
// InRoom returns true if the connection is subscribed to the given room
func (c *Connection) InRoom(room string) bool {
c.mu.RLock()
defer c.mu.RUnlock()
_, ok := c.rooms[room]
return ok
}
// Rooms returns a snapshot of all rooms this connection is in
func (c *Connection) Rooms() []string {
c.mu.RLock()
defer c.mu.RUnlock()
rooms := make([]string, 0, len(c.rooms))
for r := range c.rooms {
rooms = append(rooms, r)
}
return rooms
}
// SendMessage enqueues a message for sending (non-blocking)
func (c *Connection) SendMessage(msg *Message) bool {
msg.Timestamp = time.Now()
data, err := json.Marshal(msg)
if err != nil {
c.log.Error("Failed to marshal WebSocket message", zap.Error(err))
return false
}
select {
case c.send <- data:
return true
default:
// Channel is full — client is too slow, will be disconnected
c.log.Warn("WebSocket send buffer full, dropping connection",
zap.String("connection_id", c.id),
)
return false
}
}
// writePump pumps messages from the hub to the WebSocket connection
func (c *Connection) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// Hub closed the channel
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
c.log.Debug("WebSocket write error",
zap.String("conn_id", c.id),
zap.Error(err),
)
return
}
case <-ticker.C:
// Send ping to check if client is still alive
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// readPump reads messages from the WebSocket connection and processes them
func (c *Connection) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
// Reset deadline whenever we receive a pong
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, data, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
) {
c.log.Debug("WebSocket unexpected close",
zap.String("conn_id", c.id),
zap.Error(err),
)
}
break
}
// Parse incoming message
var msg Message
if err := json.Unmarshal(data, &msg); err != nil {
c.SendMessage(&Message{
Type: MsgTypeError,
Payload: map[string]string{"error": "invalid JSON"},
})
continue
}
// Dispatch message to hub
c.hub.incoming <- &IncomingMessage{
Conn: c,
Message: &msg,
}
}
}
Tip จากพี่โชว์:
pingPeriod = (pongWait * 9) / 10คือ 54 วินาที — เราส่ง ping ก่อนที่ deadline จะหมด เพื่อให้แน่ใจว่า connection ยังมีชีวิตอยู่ เหมือนกับการเช็คชื่อในห้องเรียน ถ้าไม่ตอบก็โดนลบออกจากระบบ!
Hub: ศูนย์กลางของทุกอย่าง
internal/websocket/hub.go
นี่คือหัวใจของระบบ Hub เปรียบเหมือน ตู้โทรศัพท์ส่วนกลาง ที่รู้ว่าแต่ละ connection อยู่ในห้องไหน และกระจายข้อความไปให้ถูกคน
package websocket
import (
"context"
"fmt"
"sync"
"time"
"go.uber.org/zap"
)
// IncomingMessage wraps a message with its source connection
type IncomingMessage struct {
Conn *Connection
Message *Message
}
// BroadcastRequest represents a request to broadcast to a room
type BroadcastRequest struct {
Room string
Message *Message
}
// Hub manages all active WebSocket connections and message routing
type Hub struct {
// Registered connections
connections map[string]*Connection
mu sync.RWMutex
// Rooms: room name → set of connection IDs
rooms map[string]map[string]struct{}
roomsMu sync.RWMutex
// Channels for connection lifecycle and messaging
register chan *Connection
unregister chan *Connection
incoming chan *IncomingMessage
broadcast chan *BroadcastRequest
log *zap.Logger
}
// NewHub creates a new Hub
func NewHub(log *zap.Logger) *Hub {
return &Hub{
connections: make(map[string]*Connection),
rooms: make(map[string]map[string]struct{}),
register: make(chan *Connection, 100),
unregister: make(chan *Connection, 100),
incoming: make(chan *IncomingMessage, 1000),
broadcast: make(chan *BroadcastRequest, 1000),
log: log,
}
}
// Run starts the hub event loop — must be called in a goroutine
func (h *Hub) Run(ctx context.Context) {
h.log.Info("WebSocket hub started")
defer h.log.Info("WebSocket hub stopped")
for {
select {
case conn := <-h.register:
h.handleRegister(conn)
case conn := <-h.unregister:
h.handleUnregister(conn)
case msg := <-h.incoming:
h.handleIncoming(msg)
case req := <-h.broadcast:
h.handleBroadcast(req)
case <-ctx.Done():
h.closeAll()
return
}
}
}
func (h *Hub) handleRegister(conn *Connection) {
h.mu.Lock()
h.connections[conn.id] = conn
h.mu.Unlock()
h.log.Info("WebSocket client connected",
zap.String("conn_id", conn.id),
zap.String("user_id", conn.userID),
)
// Send welcome message
conn.SendMessage(&Message{
Type: "connected",
Payload: map[string]interface{}{
"connection_id": conn.id,
"server_time": time.Now().UTC(),
},
})
}
func (h *Hub) handleUnregister(conn *Connection) {
h.mu.Lock()
if _, ok := h.connections[conn.id]; ok {
delete(h.connections, conn.id)
close(conn.send)
}
h.mu.Unlock()
// Remove from all rooms
for _, room := range conn.Rooms() {
h.removeFromRoom(conn.id, room)
}
h.log.Info("WebSocket client disconnected",
zap.String("conn_id", conn.id),
zap.String("user_id", conn.userID),
)
}
func (h *Hub) handleIncoming(msg *IncomingMessage) {
conn := msg.Conn
switch msg.Message.Type {
case MsgTypePing:
conn.SendMessage(&Message{
Type: MsgTypePong,
RequestID: msg.Message.RequestID,
})
case MsgTypeSubscribe:
req, ok := parseSubscribeRequest(msg.Message.Payload)
if !ok {
conn.SendMessage(&Message{
Type: MsgTypeError,
Payload: map[string]string{"error": "invalid subscribe payload"},
})
return
}
for _, room := range req.Rooms {
if !isValidRoom(room) {
continue
}
conn.JoinRoom(room)
h.addToRoom(conn.id, room)
}
conn.SendMessage(&Message{
Type: "subscribed",
RequestID: msg.Message.RequestID,
Payload: map[string]interface{}{"rooms": conn.Rooms()},
})
h.log.Debug("Client subscribed to rooms",
zap.String("conn_id", conn.id),
zap.Strings("rooms", req.Rooms),
)
case MsgTypeUnsubscribe:
req, ok := parseSubscribeRequest(msg.Message.Payload)
if !ok {
return
}
for _, room := range req.Rooms {
conn.LeaveRoom(room)
h.removeFromRoom(conn.id, room)
}
conn.SendMessage(&Message{
Type: "unsubscribed",
RequestID: msg.Message.RequestID,
Payload: map[string]interface{}{"rooms": conn.Rooms()},
})
}
}
func (h *Hub) handleBroadcast(req *BroadcastRequest) {
h.roomsMu.RLock()
connIDs, ok := h.rooms[req.Room]
if !ok {
h.roomsMu.RUnlock()
return
}
// Copy to avoid holding lock during send
targets := make([]string, 0, len(connIDs))
for id := range connIDs {
targets = append(targets, id)
}
h.roomsMu.RUnlock()
h.mu.RLock()
defer h.mu.RUnlock()
sent := 0
for _, id := range targets {
conn, ok := h.connections[id]
if !ok {
continue
}
if conn.SendMessage(req.Message) {
sent++
}
}
h.log.Debug("Broadcast to room",
zap.String("room", req.Room),
zap.Int("recipients", sent),
)
}
// BroadcastToRoom sends a message to all connections in a room
func (h *Hub) BroadcastToRoom(room string, msg *Message) {
select {
case h.broadcast <- &BroadcastRequest{Room: room, Message: msg}:
default:
h.log.Warn("Broadcast channel full, dropping message",
zap.String("room", room),
)
}
}
// BroadcastTelemetry fans out sensor data to all relevant rooms
func (h *Hub) BroadcastTelemetry(payload *TelemetryPayload) {
msg := &Message{
Type: MsgTypeTelemetry,
Payload: payload,
}
// 1. Broadcast to device-specific room
h.BroadcastToRoom(DeviceRoom(payload.DeviceID), msg)
// 2. Broadcast to global room (all data)
h.BroadcastToRoom(RoomGlobal, msg)
}
// BroadcastDeviceStatus notifies subscribers of a device status change
func (h *Hub) BroadcastDeviceStatus(payload *DeviceStatusPayload) {
msg := &Message{
Type: MsgTypeDeviceStatus,
Payload: payload,
}
h.BroadcastToRoom(DeviceRoom(payload.DeviceID), msg)
h.BroadcastToRoom(RoomGlobal, msg)
}
// Stats returns current hub statistics
func (h *Hub) Stats() map[string]interface{} {
h.mu.RLock()
connCount := len(h.connections)
h.mu.RUnlock()
h.roomsMu.RLock()
roomCount := len(h.rooms)
h.roomsMu.RUnlock()
return map[string]interface{}{
"connections": connCount,
"rooms": roomCount,
}
}
func (h *Hub) addToRoom(connID, room string) {
h.roomsMu.Lock()
defer h.roomsMu.Unlock()
if _, ok := h.rooms[room]; !ok {
h.rooms[room] = make(map[string]struct{})
}
h.rooms[room][connID] = struct{}{}
}
func (h *Hub) removeFromRoom(connID, room string) {
h.roomsMu.Lock()
defer h.roomsMu.Unlock()
if members, ok := h.rooms[room]; ok {
delete(members, connID)
if len(members) == 0 {
delete(h.rooms, room)
}
}
}
func (h *Hub) closeAll() {
h.mu.Lock()
defer h.mu.Unlock()
for _, conn := range h.connections {
close(conn.send)
}
h.connections = make(map[string]*Connection)
}
func isValidRoom(room string) bool {
if room == RoomGlobal {
return true
}
if len(room) > len(RoomPrefixDevice) && room[:len(RoomPrefixDevice)] == RoomPrefixDevice {
return true
}
if len(room) > len(RoomPrefixGroup) && room[:len(RoomPrefixGroup)] == RoomPrefixGroup {
return true
}
return false
}
func parseSubscribeRequest(payload interface{}) (*SubscribeRequest, bool) {
if payload == nil {
return nil, false
}
// The payload comes as map[string]interface{} after JSON unmarshal
m, ok := payload.(map[string]interface{})
if !ok {
return nil, false
}
rawRooms, ok := m["rooms"]
if !ok {
return nil, false
}
rooms, ok := rawRooms.([]interface{})
if !ok {
return nil, false
}
result := &SubscribeRequest{}
for _, r := range rooms {
if s, ok := r.(string); ok {
result.Rooms = append(result.Rooms, s)
}
}
return result, true
}
จุดสำคัญที่ต้องเข้าใจ: Hub ทำงานใน single goroutine (event loop) แต่ Connection แต่ละอันมี goroutine ของตัวเอง การส่งข้อความผ่าน channel จึงทำให้ thread-safe โดยไม่ต้อง lock ที่ Hub เอง ฉลาดมากเลย!
WebSocket Handler: ประตูเชื่อมโลกภายนอก
internal/handler/websocket_handler.go
Handler ทำหน้าที่รับ HTTP request ที่ขอ upgrade เป็น WebSocket แล้วส่งต่อให้ Hub จัดการ เหมือนพนักงานต้อนรับที่ check-in ลูกค้าแล้วพาไปห้องที่ถูกต้อง
package handler
import (
"fmt"
"time"
"github.com/gofiber/fiber/v2"
fiberws "github.com/gofiber/websocket/v2"
"github.com/google/uuid"
"go.uber.org/zap"
appws "github.com/kangana1024/iot-workshop/backend/internal/websocket"
)
// WebSocketHandler handles WebSocket upgrade and connection management
type WebSocketHandler struct {
hub *appws.Hub
log *zap.Logger
}
// NewWebSocketHandler creates a new WebSocketHandler
func NewWebSocketHandler(hub *appws.Hub, log *zap.Logger) *WebSocketHandler {
return &WebSocketHandler{hub: hub, log: log}
}
// Upgrade godoc
// @Summary WebSocket upgrade
// @Description Upgrades HTTP to WebSocket for real-time data streaming
// @Tags realtime
// @Param token query string true "JWT token for authentication"
// @Router /api/v1/ws [get]
func (h *WebSocketHandler) Upgrade(c *fiber.Ctx) error {
// Check if request is a WebSocket upgrade
if !fiberws.IsWebSocketUpgrade(c) {
return fiber.ErrUpgradeRequired
}
// Store user info in locals before upgrade (locals persist into WS handler)
userID := c.Locals("user_id")
c.Locals("ws_user_id", fmt.Sprintf("%v", userID))
return c.Next()
}
// Handle is the WebSocket handler after upgrade
func (h *WebSocketHandler) Handle(c *fiberws.Conn) {
userID, _ := c.Locals("ws_user_id").(string)
connID := uuid.New().String()
h.log.Info("New WebSocket connection",
zap.String("conn_id", connID),
zap.String("user_id", userID),
zap.String("remote_addr", c.RemoteAddr().String()),
)
conn := appws.NewConnection(connID, userID, c, h.hub, h.log)
// Register with hub
h.hub.Register(conn)
// Start read and write pumps
go conn.WritePump()
conn.ReadPump() // Blocks until connection closes
}
// Stats returns current WebSocket hub statistics
func (h *WebSocketHandler) Stats(c *fiber.Ctx) error {
return c.JSON(h.hub.Stats())
}
Router Registration
// In router.go — add WebSocket routes
import (
fiberws "github.com/gofiber/websocket/v2"
)
// WebSocket route (เพิ่มใน Setup function)
app.Use("/api/v1/ws", h.WebSocket.Upgrade)
app.Get("/api/v1/ws", fiberws.New(h.WebSocket.Handle))
app.Get("/api/v1/ws/stats", h.WebSocket.Stats)
เชื่อม MQTT กับ WebSocket Hub
internal/mqtt/service.go — เพิ่ม Hub Integration
นี่คือส่วนที่ “มัด” ทุกอย่างเข้าด้วยกัน เมื่อ MQTT รับข้อมูลจาก device มา เราจะกระจายต่อไปยัง WebSocket clients ที่รอรับอยู่ทันที
// เพิ่ม hub field ใน MQTTService
type MQTTService struct {
client *Client
topics *Topics
sensorUsecase *usecase.SensorUsecase
deviceUsecase *usecase.DeviceUsecase
hub *websocket.Hub // ← เพิ่ม
log *zap.Logger
}
// อัปเดต handleTelemetry — หลังจาก ingest แล้ว broadcast ผ่าน WebSocket
func (s *MQTTService) handleTelemetry(ctx context.Context, topic string, payload []byte) {
// ... (code เดิม) ...
// After successful ingestion, fan-out to WebSocket clients
if result.Accepted > 0 {
wsPayload := &websocket.TelemetryPayload{
DeviceID: data.DeviceID,
Timestamp: time.Now(),
Fields: data.Fields,
Tags: data.Tags,
}
s.hub.BroadcastTelemetry(wsPayload)
}
}
// อัปเดต handleStatus — broadcast device status changes
func (s *MQTTService) handleStatus(ctx context.Context, topic string, payload []byte) {
// ... (code เดิม) ...
// Broadcast status change via WebSocket
s.hub.BroadcastDeviceStatus(&websocket.DeviceStatusPayload{
DeviceID: deviceID,
Status: msg.Status,
Timestamp: msg.Timestamp,
})
}
ทดสอบ WebSocket กัน!
มาลองดูว่าระบบที่เราสร้างมาทำงานได้จริงไหม น้องๆ จะได้เห็นภาพชัดขึ้นมากเลย
ทดสอบด้วย websocat
# ติดตั้ง websocat
cargo install websocat
# หรือ
brew install websocat
# เชื่อมต่อ (ส่ง JWT token ผ่าน query parameter)
TOKEN="your-jwt-token-here"
websocat "ws://localhost:8080/api/v1/ws?token=$TOKEN"
Subscribe และรับข้อมูล
// ส่ง: Subscribe ไปยัง device ที่สนใจ
{
"type": "subscribe",
"request_id": "req-001",
"payload": {
"rooms": ["device:sensor-temp-001", "device:sensor-temp-002", "global"]
}
}
// รับ: Confirmation
{
"type": "subscribed",
"request_id": "req-001",
"payload": {
"rooms": ["device:sensor-temp-001", "device:sensor-temp-002", "global"]
},
"timestamp": "2026-03-26T10:00:00Z"
}
// รับ: Real-time telemetry (เมื่อ device ส่งข้อมูล)
{
"type": "telemetry",
"timestamp": "2026-03-26T10:00:05Z",
"payload": {
"device_id": "sensor-temp-001",
"timestamp": "2026-03-26T10:00:05Z",
"fields": {
"temperature": 28.7,
"humidity": 64.3
}
}
}
// รับ: Device status change
{
"type": "device_status",
"timestamp": "2026-03-26T10:01:00Z",
"payload": {
"device_id": "sensor-temp-001",
"status": "offline",
"timestamp": "2026-03-26T10:01:00Z"
}
}
Heartbeat: ชีพจรของ Connection
Heartbeat เหมือนกับการที่หมอเอาเครื่องวัด EKG ไปติดคนไข้ ถ้าสัญญาณหายไปก็แปลว่ามีปัญหา!
// ส่ง Ping
{ "type": "ping", "request_id": "heartbeat-001" }
// รับ Pong
{ "type": "pong", "request_id": "heartbeat-001", "timestamp": "2026-03-26T10:00:30Z" }
JavaScript Client Example
สำหรับ Admin Panel (Vite + React) เราสร้าง client class ที่ auto-reconnect และจัดการ event ต่างๆ ให้พร้อมใช้งาน:
// lib/websocket.ts
export type WSMessage = {
type: string;
room?: string;
payload?: unknown;
timestamp: string;
request_id?: string;
};
export class IoTWebSocket {
private ws: WebSocket | null = null;
private reconnectDelay = 1000;
private maxReconnectDelay = 30000;
private heartbeatInterval: number | null = null;
private listeners = new Map<string, Set<(payload: unknown) => void>>();
constructor(private url: string, private token: string) {}
connect(): void {
this.ws = new WebSocket(`${this.url}?token=${this.token}`);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.reconnectDelay = 1000;
this.startHeartbeat();
};
this.ws.onmessage = (event) => {
try {
const msg: WSMessage = JSON.parse(event.data);
this.dispatch(msg.type, msg.payload);
} catch (e) {
console.error('Failed to parse WS message', e);
}
};
this.ws.onclose = () => {
console.log('WebSocket disconnected, reconnecting...');
this.stopHeartbeat();
setTimeout(() => this.connect(), this.reconnectDelay);
this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
};
this.ws.onerror = (err) => {
console.error('WebSocket error', err);
};
}
subscribe(rooms: string[]): void {
this.send({
type: 'subscribe',
request_id: `sub-${Date.now()}`,
payload: { rooms },
});
}
unsubscribe(rooms: string[]): void {
this.send({
type: 'unsubscribe',
payload: { rooms },
});
}
on(messageType: string, handler: (payload: unknown) => void): () => void {
if (!this.listeners.has(messageType)) {
this.listeners.set(messageType, new Set());
}
this.listeners.get(messageType)!.add(handler);
// Return unsubscribe function
return () => {
this.listeners.get(messageType)?.delete(handler);
};
}
private send(msg: Partial<WSMessage>): void {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(msg));
}
}
private dispatch(type: string, payload: unknown): void {
this.listeners.get(type)?.forEach((handler) => handler(payload));
}
private startHeartbeat(): void {
this.heartbeatInterval = window.setInterval(() => {
this.send({ type: 'ping', request_id: `ping-${Date.now()}` });
}, 30000);
}
private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
}
disconnect(): void {
this.stopHeartbeat();
this.ws?.close(1000, 'Client disconnecting');
}
}
// ตัวอย่างการใช้งานใน React component:
//
// const ws = new IoTWebSocket('ws://localhost:8080/api/v1/ws', token);
// ws.connect();
// ws.subscribe(['device:sensor-temp-001', 'global']);
//
// const unsubscribe = ws.on('telemetry', (payload) => {
// const data = payload as TelemetryPayload;
// setTemperature(data.fields.temperature as number);
// });
//
// return () => {
// unsubscribe();
// ws.disconnect();
// };
สังเกตไหม? Reconnect logic ใช้ Exponential Backoff — เริ่มที่ 1 วิ, แล้วก็ 2, 4, 8, … จนสูงสุด 30 วิ เพื่อไม่ให้ client ไป hammer server เมื่อมันล่ม ฉลาดดีเนอะ!
Load Testing
# ติดตั้ง ws-load-test หรือใช้ k6
# ทดสอบ 1000 concurrent connections
k6 run --vus 1000 --duration 60s - <<'EOF'
import ws from 'k6/ws';
import { check } from 'k6';
export default function () {
const url = 'ws://localhost:8080/api/v1/ws?token=test-token';
const res = ws.connect(url, {}, function (socket) {
socket.on('open', () => {
socket.send(JSON.stringify({
type: 'subscribe',
payload: { rooms: ['global'] }
}));
});
socket.on('message', (data) => {
const msg = JSON.parse(data);
check(msg, { 'valid message': (m) => m.type !== undefined });
});
socket.setTimeout(() => socket.close(), 30000);
});
check(res, { 'connected': (r) => r && r.status === 101 });
}
EOF
สรุปสิ่งที่ทำไปทั้งหมด
(づ。◕‿‿◕。)づ ยินดีด้วย! น้องๆ ผ่าน WebSocket Layer มาแล้ว!
ในตอนนี้เราได้ทำอะไรไปบ้าง:
- Upgrade HTTP เป็น WebSocket ด้วย Fiber WebSocket middleware — ประตูเปิดแล้ว!
- Hub Pattern จัดการ Connections และ Rooms อย่างมีประสิทธิภาพด้วย Goroutines และ Channels
- Room System ให้ client subscribe เฉพาะ device/group ที่ต้องการ ไม่ต้องรับทุกอย่าง
- Fan-out telemetry จาก MQTT ไปยัง WebSocket clients ที่ subscribe ไว้แบบ real-time
- Connection Lifecycle ครบถ้วน: Register, Unregister, Write Pump, Read Pump
- Heartbeat ด้วย Ping/Pong ป้องกัน dead connections ที่ค้างในระบบ
- TypeScript WebSocket client ที่ auto-reconnect พร้อมใช้ใน Admin Panel
Workshop Complete! ขอบคุณทุกคนที่ทนมาถึงตรงนี้
เราได้สร้าง IoT Backend ครบชุดแล้ว:
- #4 Fiber Bootstrap — Foundation, Config, Middleware
- #5 MongoDB Models — Domain Models, Repository Pattern
- #6 Device API — CRUD REST API สำหรับจัดการ Devices
- #7 Sensor Ingestion — รับและเก็บข้อมูล Sensor ลง InfluxDB
- #8 MQTT Integration — รับข้อมูลจาก Devices แบบ Real-time
- #9 WebSocket — Push ข้อมูลไปยัง Dashboard แบบ Real-time ← ตรงนี้เอง!
Next Step: มาลุยกันต่อ!
Backend พร้อมแล้ว! ขั้นตอนถัดไปคือสร้าง LynxJS Mobile App และ Vite Admin Panel ที่จะใช้ WebSocket API ที่เราเพิ่งสร้างมา หน้า Dashboard จะได้ขยับตัวเลขแบบ real-time สวยงามมาก รอดูกันได้เลย!
Navigation
- ก่อนหน้า: IoT Workshop #8: MQTT Integration
- ถัดไป: IoT Workshop #10: TICK Stack Setup