รับ Sensor Data เข้า Pipeline แบบไม่กลัวตาย

รับ Sensor Data เข้า Pipeline แบบไม่กลัวตาย

Showkhun · Workshop ·

Branch: workshop/dev-04-sensor-ingestion Phase: Development — Data Pipeline Repository: https://github.com/kangana1024/iot-workshop


เคยสังเกตมั้ยว่าเวลา Sensor ส่งข้อมูลมา บางทีส่งมาพร้อมกันเป็นร้อยๆ ตัว ถ้าเรารับแบบไม่มีระบบ มันก็เหมือนเปิดประตูบ้านแล้วปล่อยคนแปลกหน้าเข้ามาได้เลย — วุ่นวายแน่นอน วันนี้เราจะมาสร้าง Sensor Data Ingestion Pipeline ที่รับข้อมูลได้เร็ว ปลอดภัย และไม่พัง แม้ข้อมูลจะมาพร้อมกันเป็นกอง (ง่าน ง่าน) (ง่าน)


สิ่งที่น้องๆ จะได้เรียนรู้วันนี้

  • ทำไม Architecture ถึงต้องแยก Layer ชัดเจน (WHY ก่อน HOW เสมอ)
  • สร้าง REST API รับ Sensor Data ทั้งแบบ single และ batch
  • ออกแบบ Transformation Pipeline แปลงข้อมูลดิบเป็น InfluxDB Point
  • เชื่อม InfluxDB แบบ non-blocking เพื่อความเร็วสูงสุด
  • ทำ Rate Limiting ป้องกัน Sensor ซนส่งข้อมูลมามากเกินไป

WHY: ทำไมต้องมี Pipeline แยกต่างหาก?

ลองนึกภาพโรงคัดแยกขยะนะ — ขยะที่เข้ามา ไม่ใช่ทุกชิ้นจะเข้าเครื่องได้ทันที ต้องผ่านสายพาน คัดแยก → ล้าง → บด → จัดเก็บ ถ้าโยนขยะเข้าเครื่องตรงๆ เครื่องพัง

Sensor Data ก็เหมือนกัน:

ข้อมูลดิบจาก Sensor

  [Validate]   ← ข้อมูลถูกต้องมั้ย? Device มีจริงมั้ย?

  [Transform]  ← แปลงให้ InfluxDB เข้าใจ

  [Write]      ← ยิงเข้า DB แบบ non-blocking

  InfluxDB ✓

ถ้าเราไม่ทำ Pipeline แยก แล้วยัดโค้ดทุกอย่างรวมกัน — เวลา business logic เปลี่ยน เราต้องแก้ทุกที่ โอกาสพังสูงมาก


ภาพรวม Architecture ด้วย Mermaid

Mermaid Diagram

Flow มันชัดเลยใช่มั้ยครับ — ทุก Request ต้องผ่าน Validate → Check Device → Transform → Write และมี side effect แยกว่าอัปเดต device status ด้วย goroutine ต่างหาก ไม่ blocking main path


Domain Model — กำหนด “รูปร่าง” ของข้อมูลก่อน

ทำไมต้องทำ Domain Model ก่อน?

เหมือนก่อนสร้างบ้าน เราต้องรู้ว่าจะสร้างอะไร — ห้องมีกี่ห้อง ขนาดเท่าไหร่ ถ้าไม่วางแผน สร้างไปแล้วค่อยรู้ว่าลืมห้องน้ำ มันเสียเวลากว่าเยอะมาก

internal/domain/sensor.go

package domain

import "time"

// SensorData represents a single data point from an IoT device
type SensorData struct {
	DeviceID  string                 `json:"device_id"  validate:"required,min=1,max=64"`
	Timestamp *time.Time             `json:"timestamp"  validate:"omitempty"`  // nil = use server time
	Fields    map[string]interface{} `json:"fields"     validate:"required,min=1"`
	Tags      map[string]string      `json:"tags"       validate:"omitempty"`
}

// SensorBatch is a collection of sensor data points from multiple devices
type SensorBatch struct {
	Points []SensorData `json:"points" validate:"required,min=1,max=500,dive"`
}

// SensorQueryFilter defines parameters for querying sensor data
type SensorQueryFilter struct {
	DeviceID string    `query:"device_id" validate:"required"`
	Field    string    `query:"field"     validate:"omitempty"`
	Start    time.Time `query:"start"     validate:"omitempty"`
	Stop     time.Time `query:"stop"      validate:"omitempty"`
	Window   string    `query:"window"    validate:"omitempty,oneof=1m 5m 15m 1h 6h 1d"`
	Limit    int       `query:"limit"     validate:"omitempty,min=1,max=10000"`
}

// AggregatedSensorData represents averaged/aggregated sensor values
type AggregatedSensorData struct {
	Time   time.Time              `json:"time"`
	Fields map[string]interface{} `json:"fields"`
}

// IngestResult is the response for an ingestion request
type IngestResult struct {
	Accepted int      `json:"accepted"`
	Rejected int      `json:"rejected"`
	Errors   []string `json:"errors,omitempty"`
}

สังเกตมั้ยว่า Timestamp เป็น pointer (*time.Time) — ถ้า nil แปลว่า Sensor ไม่ได้ส่ง timestamp มา เราก็ใช้เวลา server แทน นี่คือ defensive design ไม่บังคับ Sensor ทุกตัวต้องทำงานเหมือนกัน


InfluxDB Client — เชื่อมต่อแบบ Non-blocking

ทำไม Non-blocking Write ถึงสำคัญ?

ลองนึกถึงร้านอาหาร — ถ้าพ่อครัวต้องรอจนกว่าจะล้างจานเสร็จก่อนถึงทำอาหารจานใหม่ได้ — ร้านจะช้ามาก

Non-blocking Write ของ InfluxDB คือการ ส่งข้อมูลไปที่ buffer ก่อน แล้ว SDK จัดการ flush ให้เอง เราไม่ต้องรอ response ทุกครั้ง ทำให้ throughput สูงมาก

internal/database/influxdb.go

package database

import (
	"context"
	"fmt"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
	"go.uber.org/zap"

	"github.com/kangana1024/iot-workshop/backend/internal/config"
)

// InfluxDB wraps the InfluxDB client with application-specific helpers
type InfluxDB struct {
	client   influxdb2.Client
	writeAPI api.WriteAPI       // Non-blocking write
	queryAPI api.QueryAPI
	cfg      config.InfluxDBConfig
	log      *zap.Logger
	errCh    chan error
}

// NewInfluxDB creates and verifies an InfluxDB connection
func NewInfluxDB(cfg config.InfluxDBConfig, log *zap.Logger) (*InfluxDB, error) {
	// Create client with options
	options := influxdb2.DefaultOptions().
		SetBatchSize(500).           // Batch up to 500 points
		SetFlushInterval(1000).      // Flush every 1 second
		SetRetryInterval(5000).      // Retry failed writes after 5s
		SetMaxRetries(3).            // Max 3 retries
		SetHTTPRequestTimeout(10)    // 10 second timeout

	client := influxdb2.NewClientWithOptions(cfg.URL, cfg.Token, options)

	// Verify connection
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	health, err := client.Health(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to InfluxDB: %w", err)
	}

	if health.Status != "pass" {
		return nil, fmt.Errorf("InfluxDB is not healthy: %s", health.Status)
	}

	writeAPI := client.WriteAPI(cfg.Org, cfg.Bucket)
	queryAPI := client.QueryAPI(cfg.Org)

	db := &InfluxDB{
		client:   client,
		writeAPI: writeAPI,
		queryAPI: queryAPI,
		cfg:      cfg,
		log:      log,
		errCh:    make(chan error, 100),
	}

	// Start error handler goroutine
	go db.handleWriteErrors()

	log.Info("Connected to InfluxDB",
		zap.String("url", cfg.URL),
		zap.String("org", cfg.Org),
		zap.String("bucket", cfg.Bucket),
	)

	return db, nil
}

// WriteAPI returns the non-blocking write API
func (db *InfluxDB) WriteAPI() api.WriteAPI {
	return db.writeAPI
}

// QueryAPI returns the query API
func (db *InfluxDB) QueryAPI() api.QueryAPI {
	return db.queryAPI
}

// WritePoint writes a single data point asynchronously
func (db *InfluxDB) WritePoint(p *write.Point) {
	db.writeAPI.WritePoint(p)
}

// Flush forces all buffered points to be written immediately
func (db *InfluxDB) Flush() {
	db.writeAPI.Flush()
}

// Close flushes and closes the connection
func (db *InfluxDB) Close() {
	db.writeAPI.Flush()
	db.client.Close()
	db.log.Info("InfluxDB connection closed")
}

// handleWriteErrors monitors for async write errors
func (db *InfluxDB) handleWriteErrors() {
	for err := range db.writeAPI.Errors() {
		db.log.Error("InfluxDB write error", zap.Error(err))
	}
}

// CreateBucketIfNotExists ensures the target bucket exists
func (db *InfluxDB) CreateBucketIfNotExists(ctx context.Context, name string, retentionDays int) error {
	bucketsAPI := db.client.BucketsAPI()

	_, err := bucketsAPI.FindBucketByName(ctx, name)
	if err == nil {
		return nil // Already exists
	}

	// Get org details
	orgsAPI := db.client.OrganizationsAPI()
	org, err := orgsAPI.FindOrganizationByName(ctx, db.cfg.Org)
	if err != nil {
		return fmt.Errorf("failed to find org: %w", err)
	}

	retentionSeconds := int64(retentionDays * 24 * 60 * 60)
	_, err = bucketsAPI.CreateBucketWithName(ctx, org, name, domain.RetentionRule{
		EverySeconds: retentionSeconds,
	})
	return err
}

จุดที่น่าสนใจคือ handleWriteErrors() ทำงานเป็น goroutine แยก — มันคอย listen error channel ตลอดเวลา ถ้า InfluxDB ตอบกลับมาว่า write ไม่สำเร็จ เราจะรู้ทันที แต่ไม่ได้ block main thread เลย


Data Transformation Pipeline — หัวใจหลักของระบบ

ทำไม Transformation ต้องมี Layer แยก?

เหมือนสายพานในโรงงาน — ขั้นตอนแต่ละขั้นทำงานแยกกัน ถ้า spec ของ InfluxDB เปลี่ยน (เช่น format ของ field เปลี่ยน) เราแก้แค่ SensorPipeline ไม่ต้องไปแตะ Handler หรือ Usecase เลย

ASCII Art: สายพาน Transform

 [SensorData]
      |
      v
 +-----------+
 | Check TS  |  <-- timestamp ไม่เก่าเกิน 1hr / ไม่ล้ำอนาคตเกิน 5min
 +-----------+
      |
      v
 +-----------+
 | Build Tag |  <-- device_id + custom tags
 +-----------+
      |
      v
 +-----------+
 | Conv Field|  <-- float64, int64, bool, string เท่านั้น!
 +-----------+
      |
      v
 [InfluxDB Point] (o^^o)

internal/pipeline/sensor_pipeline.go

package pipeline

import (
	"fmt"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api/write"

	"github.com/kangana1024/iot-workshop/backend/internal/domain"
)

const (
	// MeasurementName is the InfluxDB measurement (table) name for sensor data
	MeasurementName = "sensor_data"
)

// TransformResult holds the result of a single data point transformation
type TransformResult struct {
	Point    *write.Point
	DeviceID string
	Error    error
}

// SensorPipeline handles transformation of raw sensor data into InfluxDB points
type SensorPipeline struct{}

// NewSensorPipeline creates a new SensorPipeline
func NewSensorPipeline() *SensorPipeline {
	return &SensorPipeline{}
}

// Transform converts a single SensorData into an InfluxDB Point
func (p *SensorPipeline) Transform(data domain.SensorData) TransformResult {
	// Determine timestamp
	ts := time.Now().UTC()
	if data.Timestamp != nil {
		ts = data.Timestamp.UTC()

		// Reject data that is too old (>1 hour) or in the future (>5 min)
		now := time.Now()
		if ts.Before(now.Add(-1 * time.Hour)) {
			return TransformResult{
				DeviceID: data.DeviceID,
				Error:    fmt.Errorf("timestamp too old: %s", ts.Format(time.RFC3339)),
			}
		}
		if ts.After(now.Add(5 * time.Minute)) {
			return TransformResult{
				DeviceID: data.DeviceID,
				Error:    fmt.Errorf("timestamp is in the future: %s", ts.Format(time.RFC3339)),
			}
		}
	}

	// Build InfluxDB point
	point := influxdb2.NewPoint(
		MeasurementName,
		buildTags(data),
		nil,
		ts,
	)

	// Add fields — validate and convert types
	fieldCount := 0
	for key, value := range data.Fields {
		converted, err := convertFieldValue(key, value)
		if err != nil {
			return TransformResult{
				DeviceID: data.DeviceID,
				Error:    fmt.Errorf("field '%s': %w", key, err),
			}
		}
		point.AddField(key, converted)
		fieldCount++
	}

	if fieldCount == 0 {
		return TransformResult{
			DeviceID: data.DeviceID,
			Error:    fmt.Errorf("no valid fields provided"),
		}
	}

	return TransformResult{
		Point:    point,
		DeviceID: data.DeviceID,
	}
}

// TransformBatch converts a batch of SensorData into InfluxDB Points
func (p *SensorPipeline) TransformBatch(batch []domain.SensorData) []TransformResult {
	results := make([]TransformResult, len(batch))
	for i, data := range batch {
		results[i] = p.Transform(data)
	}
	return results
}

// buildTags constructs the tag set for an InfluxDB point
func buildTags(data domain.SensorData) map[string]string {
	tags := map[string]string{
		"device_id": data.DeviceID,
	}
	// Merge custom tags (device_id cannot be overridden)
	for k, v := range data.Tags {
		if k != "device_id" {
			tags[k] = v
		}
	}
	return tags
}

// convertFieldValue normalizes field values to types InfluxDB accepts
func convertFieldValue(key string, value interface{}) (interface{}, error) {
	switch v := value.(type) {
	case float64:
		return v, nil
	case float32:
		return float64(v), nil
	case int:
		return int64(v), nil
	case int32:
		return int64(v), nil
	case int64:
		return v, nil
	case bool:
		return v, nil
	case string:
		return v, nil
	case nil:
		return nil, fmt.Errorf("value is null")
	default:
		// JSON numbers unmarshal to float64 — handle interface{} from JSON
		return nil, fmt.Errorf("unsupported type %T", value)
	}
}

convertFieldValue สำคัญมาก — InfluxDB ไม่รับ type สุ่มสี่สุ่มห้า JSON unmarshal ค่าตัวเลขเป็น float64 เสมอ แต่ถ้ามีคนส่งมาเป็น int32 จาก library อื่น เราก็ convert ให้ เพื่อกัน runtime panic


Sensor Handler — ประตูรับข้อมูล

ทำไม Handler ต้องบาง?

Handler เปรียบเหมือนพนักงานต้อนรับ — งานของเขาคือ รับแขก ตรวจสอบบัตร แล้วส่งต่อ ไม่ใช่ทำทุกอย่างเอง ถ้า Handler อ้วนเกิน (มี business logic เยอะ) มันยากทดสอบมาก

internal/handler/sensor_handler.go

package handler

import (
	"github.com/gofiber/fiber/v2"
	"go.uber.org/zap"

	"github.com/kangana1024/iot-workshop/backend/internal/domain"
	"github.com/kangana1024/iot-workshop/backend/internal/usecase"
	"github.com/kangana1024/iot-workshop/backend/pkg/response"
	"github.com/kangana1024/iot-workshop/backend/pkg/validator"
)

// SensorHandler handles sensor data ingestion and query requests
type SensorHandler struct {
	sensorUsecase *usecase.SensorUsecase
	validator     *validator.Validator
	log           *zap.Logger
}

// NewSensorHandler creates a new SensorHandler
func NewSensorHandler(su *usecase.SensorUsecase, v *validator.Validator, log *zap.Logger) *SensorHandler {
	return &SensorHandler{sensorUsecase: su, validator: v, log: log}
}

// Ingest godoc
// @Summary      Ingest single sensor data point
// @Description  Accepts a single sensor measurement from a device
// @Tags         sensors
// @Accept       json
// @Produce      json
// @Param        body  body  domain.SensorData  true  "Sensor data point"
// @Success      202   {object}  response.Response
// @Security     BearerAuth
// @Router       /api/v1/sensors/ingest [post]
func (h *SensorHandler) Ingest(c *fiber.Ctx) error {
	var data domain.SensorData
	if err := c.BodyParser(&data); err != nil {
		return response.BadRequest(c, "INVALID_BODY", "Cannot parse request body", err.Error())
	}

	if errs := h.validator.Validate(data); errs != nil {
		return response.BadRequest(c, "VALIDATION_ERROR", "Validation failed", errs)
	}

	result, err := h.sensorUsecase.Ingest(c.Context(), data)
	if err != nil {
		h.log.Error("Failed to ingest sensor data", zap.Error(err))
		return response.InternalError(c, "Failed to process sensor data")
	}

	if result.Rejected > 0 {
		return c.Status(fiber.StatusMultiStatus).JSON(response.Response{
			Success: true,
			Message: "Partial success",
			Data:    result,
		})
	}

	return c.Status(fiber.StatusAccepted).JSON(response.Response{
		Success: true,
		Message: "Data accepted",
		Data:    result,
	})
}

// IngestBatch godoc
// @Summary      Ingest batch of sensor data
// @Description  Accepts multiple sensor measurements in a single request (max 500 points)
// @Tags         sensors
// @Accept       json
// @Produce      json
// @Param        body  body  domain.SensorBatch  true  "Batch of sensor data"
// @Success      202   {object}  response.Response
// @Security     BearerAuth
// @Router       /api/v1/sensors/ingest/batch [post]
func (h *SensorHandler) IngestBatch(c *fiber.Ctx) error {
	var batch domain.SensorBatch
	if err := c.BodyParser(&batch); err != nil {
		return response.BadRequest(c, "INVALID_BODY", "Cannot parse request body", err.Error())
	}

	if errs := h.validator.Validate(batch); errs != nil {
		return response.BadRequest(c, "VALIDATION_ERROR", "Validation failed", errs)
	}

	result, err := h.sensorUsecase.IngestBatch(c.Context(), batch.Points)
	if err != nil {
		h.log.Error("Failed to ingest batch", zap.Error(err))
		return response.InternalError(c, "Failed to process batch")
	}

	statusCode := fiber.StatusAccepted
	if result.Rejected > 0 && result.Accepted == 0 {
		statusCode = fiber.StatusBadRequest
	} else if result.Rejected > 0 {
		statusCode = fiber.StatusMultiStatus
	}

	return c.Status(statusCode).JSON(response.Response{
		Success: result.Accepted > 0,
		Message: fmt.Sprintf("Accepted %d/%d points", result.Accepted, result.Accepted+result.Rejected),
		Data:    result,
	})
}

// GetData godoc
// @Summary      Query sensor data
// @Description  Returns historical sensor data for a device
// @Tags         sensors
// @Produce      json
// @Param        deviceId  path   string  true  "Device ID"
// @Param        field     query  string  false "Filter by field name"
// @Param        start     query  string  false "Start time (RFC3339)"
// @Param        stop      query  string  false "Stop time (RFC3339)"
// @Param        window    query  string  false "Aggregation window: 1m|5m|15m|1h|6h|1d"
// @Success      200       {object}  response.Response
// @Security     BearerAuth
// @Router       /api/v1/sensors/{deviceId}/data [get]
func (h *SensorHandler) GetData(c *fiber.Ctx) error {
	var filter domain.SensorQueryFilter
	filter.DeviceID = c.Params("deviceId")

	if err := c.QueryParser(&filter); err != nil {
		return response.BadRequest(c, "INVALID_QUERY", "Invalid query parameters", err.Error())
	}

	// Default time range: last 1 hour
	if filter.Start.IsZero() {
		filter.Start = time.Now().Add(-1 * time.Hour)
	}
	if filter.Stop.IsZero() {
		filter.Stop = time.Now()
	}
	if filter.Limit == 0 {
		filter.Limit = 1000
	}

	data, err := h.sensorUsecase.QueryData(c.Context(), filter)
	if err != nil {
		h.log.Error("Failed to query sensor data", zap.Error(err))
		return response.InternalError(c, "Failed to retrieve sensor data")
	}

	return response.OK(c, data)
}

เรียบง่ายดีมั้ยครับ — Handler ทำแค่ parse, validate, delegate แล้ว return response ไม่มี business logic ซ่อนอยู่เลย


Sensor Use Case — Brain ของระบบ

ทำไม Use Case ต้อง Avoid N+1?

สมมติน้องๆ มี Batch 100 points จาก 5 device — ถ้าเราตรวจสอบ device ทีละ request นั่นคือ 100 database calls เพื่อดึงข้อมูล 5 device ซ้ำๆ เหมือนไปซูเปอร์มาร์เก็ต 100 รอบเพื่อซื้อของ 5 อย่าง — มันไม่ make sense

Use Case ของเราแก้ด้วยการ batch verify — ดึง unique device IDs ก่อน แล้ว query ครั้งเดียว

internal/usecase/sensor_usecase.go

package usecase

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/zap"

	"github.com/kangana1024/iot-workshop/backend/internal/database"
	"github.com/kangana1024/iot-workshop/backend/internal/domain"
	"github.com/kangana1024/iot-workshop/backend/internal/pipeline"
	"github.com/kangana1024/iot-workshop/backend/internal/repository"
)

// SensorUsecase handles sensor data ingestion and querying
type SensorUsecase struct {
	influxDB   *database.InfluxDB
	deviceRepo repository.DeviceRepository
	pipeline   *pipeline.SensorPipeline
	log        *zap.Logger
}

// NewSensorUsecase creates a new SensorUsecase
func NewSensorUsecase(
	influxDB *database.InfluxDB,
	deviceRepo repository.DeviceRepository,
	log *zap.Logger,
) *SensorUsecase {
	return &SensorUsecase{
		influxDB:   influxDB,
		deviceRepo: deviceRepo,
		pipeline:   pipeline.NewSensorPipeline(),
		log:        log,
	}
}

// Ingest validates and writes a single sensor data point
func (uc *SensorUsecase) Ingest(ctx context.Context, data domain.SensorData) (*domain.IngestResult, error) {
	// Verify device exists
	device, err := uc.deviceRepo.FindByDeviceID(ctx, data.DeviceID)
	if err != nil {
		return nil, fmt.Errorf("failed to verify device: %w", err)
	}
	if device == nil {
		return &domain.IngestResult{
			Rejected: 1,
			Errors:   []string{fmt.Sprintf("device '%s' not found", data.DeviceID)},
		}, nil
	}

	// Transform to InfluxDB point
	result := uc.pipeline.Transform(data)
	if result.Error != nil {
		return &domain.IngestResult{
			Rejected: 1,
			Errors:   []string{result.Error.Error()},
		}, nil
	}

	// Write to InfluxDB (non-blocking)
	uc.influxDB.WritePoint(result.Point)

	// Update device last_seen_at
	go func() {
		bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := uc.deviceRepo.UpdateStatus(bgCtx, data.DeviceID, domain.DeviceStatusOnline); err != nil {
			uc.log.Warn("Failed to update device last_seen_at",
				zap.String("device_id", data.DeviceID),
				zap.Error(err),
			)
		}
	}()

	uc.log.Debug("Sensor data ingested",
		zap.String("device_id", data.DeviceID),
		zap.Int("field_count", len(data.Fields)),
	)

	return &domain.IngestResult{Accepted: 1}, nil
}

// IngestBatch validates and writes multiple sensor data points
func (uc *SensorUsecase) IngestBatch(ctx context.Context, points []domain.SensorData) (*domain.IngestResult, error) {
	// Batch verify devices (avoid N+1 queries)
	deviceIDs := uniqueDeviceIDs(points)
	existingDevices, err := uc.verifyDevices(ctx, deviceIDs)
	if err != nil {
		return nil, err
	}

	result := &domain.IngestResult{}

	for _, data := range points {
		// Check device existence from pre-fetched map
		if !existingDevices[data.DeviceID] {
			result.Rejected++
			result.Errors = append(result.Errors, fmt.Sprintf("device '%s' not found", data.DeviceID))
			continue
		}

		// Transform
		tr := uc.pipeline.Transform(data)
		if tr.Error != nil {
			result.Rejected++
			result.Errors = append(result.Errors,
				fmt.Sprintf("device '%s': %s", data.DeviceID, tr.Error.Error()))
			continue
		}

		uc.influxDB.WritePoint(tr.Point)
		result.Accepted++
	}

	// Update last_seen_at for all successfully ingested devices
	if result.Accepted > 0 {
		go uc.updateDevicesLastSeen(deviceIDs, existingDevices)
	}

	uc.log.Info("Batch ingestion complete",
		zap.Int("accepted", result.Accepted),
		zap.Int("rejected", result.Rejected),
	)

	return result, nil
}

// QueryData retrieves historical sensor data from InfluxDB
func (uc *SensorUsecase) QueryData(ctx context.Context, filter domain.SensorQueryFilter) ([]domain.AggregatedSensorData, error) {
	// Build Flux query
	query := uc.buildFluxQuery(filter)

	uc.log.Debug("Executing Flux query",
		zap.String("device_id", filter.DeviceID),
		zap.String("window", filter.Window),
	)

	result, err := uc.influxDB.QueryAPI().Query(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("query failed: %w", err)
	}
	defer result.Close()

	var records []domain.AggregatedSensorData
	for result.Next() {
		record := result.Record()
		fields := map[string]interface{}{
			record.Field(): record.Value(),
		}
		records = append(records, domain.AggregatedSensorData{
			Time:   record.Time(),
			Fields: fields,
		})
	}

	if err := result.Err(); err != nil {
		return nil, fmt.Errorf("query result error: %w", err)
	}

	return records, nil
}

// buildFluxQuery constructs a Flux query from the filter
func (uc *SensorUsecase) buildFluxQuery(f domain.SensorQueryFilter) string {
	startStr := f.Start.UTC().Format(time.RFC3339)
	stopStr := f.Stop.UTC().Format(time.RFC3339)

	query := fmt.Sprintf(`
from(bucket: "sensor_data")
  |> range(start: %s, stop: %s)
  |> filter(fn: (r) => r["_measurement"] == "sensor_data")
  |> filter(fn: (r) => r["device_id"] == "%s")`,
		startStr, stopStr, f.DeviceID)

	if f.Field != "" {
		query += fmt.Sprintf(`
  |> filter(fn: (r) => r["_field"] == "%s")`, f.Field)
	}

	if f.Window != "" {
		query += fmt.Sprintf(`
  |> aggregateWindow(every: %s, fn: mean, createEmpty: false)`, f.Window)
	}

	query += fmt.Sprintf(`
  |> limit(n: %d)
  |> sort(columns: ["_time"])`, f.Limit)

	return query
}

// verifyDevices checks which device IDs exist in MongoDB
func (uc *SensorUsecase) verifyDevices(ctx context.Context, deviceIDs []string) (map[string]bool, error) {
	existing := make(map[string]bool)
	for _, id := range deviceIDs {
		device, err := uc.deviceRepo.FindByDeviceID(ctx, id)
		if err != nil {
			return nil, err
		}
		existing[id] = device != nil
	}
	return existing, nil
}

func (uc *SensorUsecase) updateDevicesLastSeen(deviceIDs []string, existing map[string]bool) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	for _, id := range deviceIDs {
		if existing[id] {
			_ = uc.deviceRepo.UpdateStatus(ctx, id, domain.DeviceStatusOnline)
		}
	}
}

func uniqueDeviceIDs(points []domain.SensorData) []string {
	seen := make(map[string]struct{})
	var ids []string
	for _, p := range points {
		if _, ok := seen[p.DeviceID]; !ok {
			seen[p.DeviceID] = struct{}{}
			ids = append(ids, p.DeviceID)
		}
	}
	return ids
}

Flux Query ของ InfluxDB อ่านง่ายมาก — มันเหมือน pipe ใน Linux |> คือส่งผลต่อไปยังขั้นตอนถัดไป filter → aggregate → limit → sort ลำดับเหมือนการกรองน้ำเลย


Rate Limiting — ประตูกั้นน้ำท่วม

ทำไมต้อง Rate Limit?

ลองคิดว่า Sensor มี Bug ส่งข้อมูลวนลูปไม่หยุด — ถ้าไม่มี Rate Limit เซิร์ฟเวอร์ล่มได้เลย เหมือนท่อประปาที่ไม่มีวาล์วควบคุม — เปิดสุดแล้วน้ำท่วม

internal/middleware/sensor_rate_limit.go

package middleware

import (
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/fiber/v2/middleware/limiter"
)

// SensorRateLimit applies strict rate limiting to sensor ingestion endpoints
// Allow 1000 requests per minute per device (identified by Authorization header or IP)
func SensorRateLimit() fiber.Handler {
	return limiter.New(limiter.Config{
		Max:               1000,
		Expiration:        1 * time.Minute,
		LimiterMiddleware: limiter.SlidingWindow{},
		KeyGenerator: func(c *fiber.Ctx) string {
			// Rate limit per device token if available, otherwise per IP
			auth := c.Get("Authorization")
			if len(auth) > 7 {
				return "sensor:" + auth[7:15] // First 8 chars of token
			}
			return "ip:" + c.IP()
		},
		LimitReached: func(c *fiber.Ctx) error {
			return c.Status(fiber.StatusTooManyRequests).JSON(fiber.Map{
				"success": false,
				"error": fiber.Map{
					"code":    "RATE_LIMIT_EXCEEDED",
					"message": "Sensor ingestion rate limit exceeded (1000 req/min)",
					"retry_after": "60s",
				},
			})
		},
	})
}

สังเกตว่า Key ใช้ token ของ device ไม่ใช่ IP — เพราะ IoT Gateway หลายตัวอาจอยู่หลัง IP เดียวกัน ถ้า Rate Limit ด้วย IP อาจโดน block ทั้ง network ซึ่งไม่ fair เลย


มาลุยกัน — ทดสอบ API จริง

Single Point Ingestion

# Ingest temperature and humidity from a sensor
curl -X POST http://localhost:8080/api/v1/sensors/ingest \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $DEVICE_TOKEN" \
  -d '{
    "device_id": "sensor-temp-001",
    "timestamp": "2026-03-26T10:00:00Z",
    "fields": {
      "temperature": 28.5,
      "humidity": 65.2,
      "pressure": 1013.25,
      "battery": 87
    },
    "tags": {
      "location": "floor-3",
      "unit": "celsius"
    }
  }'

Batch Ingestion (สูงสุด 500 points)

curl -X POST http://localhost:8080/api/v1/sensors/ingest/batch \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "points": [
      {
        "device_id": "sensor-temp-001",
        "timestamp": "2026-03-26T10:00:00Z",
        "fields": {"temperature": 28.5, "humidity": 65.2}
      },
      {
        "device_id": "sensor-temp-002",
        "timestamp": "2026-03-26T10:00:00Z",
        "fields": {"temperature": 27.1, "humidity": 68.0}
      },
      {
        "device_id": "sensor-motion-001",
        "timestamp": "2026-03-26T10:00:01Z",
        "fields": {"motion": true, "light_level": 450}
      }
    ]
  }'

Query Historical Data

# Get last 6 hours of temperature, aggregated by 5 minute windows
curl "http://localhost:8080/api/v1/sensors/sensor-temp-001/data?field=temperature&start=2026-03-26T04:00:00Z&window=5m" \
  -H "Authorization: Bearer $TOKEN"

ตัวอย่าง Response ที่คาดหวัง

Successful Batch Ingestion

{
  "success": true,
  "message": "Accepted 3/3 points",
  "data": {
    "accepted": 3,
    "rejected": 0
  }
}

Partial Success (บางตัวไม่ผ่าน)

{
  "success": true,
  "message": "Accepted 2/3 points",
  "data": {
    "accepted": 2,
    "rejected": 1,
    "errors": [
      "device 'sensor-unknown-999': device 'sensor-unknown-999' not found"
    ]
  }
}

ระบบยังตอบ success: true ถึงแม้บางตัวจะ reject — เพราะ HTTP 207 Multi-Status บอกว่า “บางอย่างสำเร็จ บางอย่างไม่สำเร็จ” ซึ่ง semantically ถูกต้องกว่า 400 Bad Request


Recap — สรุปว่าวันนี้ทำอะไรไปบ้าง

วันนี้เราสร้าง Sensor Ingestion Pipeline ครบ loop แล้ว:

  • REST API รองรับทั้ง single (/ingest) และ batch (/ingest/batch) สูงสุด 500 points
  • Transformation Pipeline แยก Layer ชัดเจน แก้ได้ง่ายไม่กระทบส่วนอื่น
  • Timestamp Validation ป้องกันข้อมูลเก่าเกิน 1 ชั่วโมง และล้ำอนาคตเกิน 5 นาที
  • Non-blocking Write API ทำให้ throughput สูง ไม่ต้องรอ DB ตอบทุก request
  • Device Status อัปเดตเป็น online อัตโนมัติผ่าน goroutine ไม่ block main path
  • Flux Query Builder สร้าง query แบบ dynamic ตาม filter ที่รับมา
  • Rate Limiting ป้องกัน Sensor ซนส่งข้อมูลมาเกิน 1000 req/min

Next Step

ตอนหน้าเราจะไปเชื่อม MQTT Broker เพราะ Sensor ในชีวิตจริงส่วนใหญ่ไม่ได้คุยผ่าน REST API — พวกมันใช้ MQTT ซึ่งเบากว่า เหมาะกับ device ที่มีแบตจำกัด