รับ Sensor Data เข้า Pipeline แบบไม่กลัวตาย
Branch:
workshop/dev-04-sensor-ingestionPhase: Development — Data Pipeline Repository: https://github.com/kangana1024/iot-workshop
เคยสังเกตมั้ยว่าเวลา Sensor ส่งข้อมูลมา บางทีส่งมาพร้อมกันเป็นร้อยๆ ตัว ถ้าเรารับแบบไม่มีระบบ มันก็เหมือนเปิดประตูบ้านแล้วปล่อยคนแปลกหน้าเข้ามาได้เลย — วุ่นวายแน่นอน วันนี้เราจะมาสร้าง Sensor Data Ingestion Pipeline ที่รับข้อมูลได้เร็ว ปลอดภัย และไม่พัง แม้ข้อมูลจะมาพร้อมกันเป็นกอง (ง่าน ง่าน) (ง่าน)
สิ่งที่น้องๆ จะได้เรียนรู้วันนี้
- ทำไม Architecture ถึงต้องแยก Layer ชัดเจน (WHY ก่อน HOW เสมอ)
- สร้าง REST API รับ Sensor Data ทั้งแบบ single และ batch
- ออกแบบ Transformation Pipeline แปลงข้อมูลดิบเป็น InfluxDB Point
- เชื่อม InfluxDB แบบ non-blocking เพื่อความเร็วสูงสุด
- ทำ Rate Limiting ป้องกัน Sensor ซนส่งข้อมูลมามากเกินไป
WHY: ทำไมต้องมี Pipeline แยกต่างหาก?
ลองนึกภาพโรงคัดแยกขยะนะ — ขยะที่เข้ามา ไม่ใช่ทุกชิ้นจะเข้าเครื่องได้ทันที ต้องผ่านสายพาน คัดแยก → ล้าง → บด → จัดเก็บ ถ้าโยนขยะเข้าเครื่องตรงๆ เครื่องพัง
Sensor Data ก็เหมือนกัน:
ข้อมูลดิบจาก Sensor
↓
[Validate] ← ข้อมูลถูกต้องมั้ย? Device มีจริงมั้ย?
↓
[Transform] ← แปลงให้ InfluxDB เข้าใจ
↓
[Write] ← ยิงเข้า DB แบบ non-blocking
↓
InfluxDB ✓
ถ้าเราไม่ทำ Pipeline แยก แล้วยัดโค้ดทุกอย่างรวมกัน — เวลา business logic เปลี่ยน เราต้องแก้ทุกที่ โอกาสพังสูงมาก
ภาพรวม Architecture ด้วย Mermaid
Flow มันชัดเลยใช่มั้ยครับ — ทุก Request ต้องผ่าน Validate → Check Device → Transform → Write และมี side effect แยกว่าอัปเดต device status ด้วย goroutine ต่างหาก ไม่ blocking main path
Domain Model — กำหนด “รูปร่าง” ของข้อมูลก่อน
ทำไมต้องทำ Domain Model ก่อน?
เหมือนก่อนสร้างบ้าน เราต้องรู้ว่าจะสร้างอะไร — ห้องมีกี่ห้อง ขนาดเท่าไหร่ ถ้าไม่วางแผน สร้างไปแล้วค่อยรู้ว่าลืมห้องน้ำ มันเสียเวลากว่าเยอะมาก
internal/domain/sensor.go
package domain
import "time"
// SensorData represents a single data point from an IoT device
type SensorData struct {
DeviceID string `json:"device_id" validate:"required,min=1,max=64"`
Timestamp *time.Time `json:"timestamp" validate:"omitempty"` // nil = use server time
Fields map[string]interface{} `json:"fields" validate:"required,min=1"`
Tags map[string]string `json:"tags" validate:"omitempty"`
}
// SensorBatch is a collection of sensor data points from multiple devices
type SensorBatch struct {
Points []SensorData `json:"points" validate:"required,min=1,max=500,dive"`
}
// SensorQueryFilter defines parameters for querying sensor data
type SensorQueryFilter struct {
DeviceID string `query:"device_id" validate:"required"`
Field string `query:"field" validate:"omitempty"`
Start time.Time `query:"start" validate:"omitempty"`
Stop time.Time `query:"stop" validate:"omitempty"`
Window string `query:"window" validate:"omitempty,oneof=1m 5m 15m 1h 6h 1d"`
Limit int `query:"limit" validate:"omitempty,min=1,max=10000"`
}
// AggregatedSensorData represents averaged/aggregated sensor values
type AggregatedSensorData struct {
Time time.Time `json:"time"`
Fields map[string]interface{} `json:"fields"`
}
// IngestResult is the response for an ingestion request
type IngestResult struct {
Accepted int `json:"accepted"`
Rejected int `json:"rejected"`
Errors []string `json:"errors,omitempty"`
}
สังเกตมั้ยว่า Timestamp เป็น pointer (*time.Time) — ถ้า nil แปลว่า Sensor ไม่ได้ส่ง timestamp มา เราก็ใช้เวลา server แทน นี่คือ defensive design ไม่บังคับ Sensor ทุกตัวต้องทำงานเหมือนกัน
InfluxDB Client — เชื่อมต่อแบบ Non-blocking
ทำไม Non-blocking Write ถึงสำคัญ?
ลองนึกถึงร้านอาหาร — ถ้าพ่อครัวต้องรอจนกว่าจะล้างจานเสร็จก่อนถึงทำอาหารจานใหม่ได้ — ร้านจะช้ามาก
Non-blocking Write ของ InfluxDB คือการ ส่งข้อมูลไปที่ buffer ก่อน แล้ว SDK จัดการ flush ให้เอง เราไม่ต้องรอ response ทุกครั้ง ทำให้ throughput สูงมาก
internal/database/influxdb.go
package database
import (
"context"
"fmt"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"go.uber.org/zap"
"github.com/kangana1024/iot-workshop/backend/internal/config"
)
// InfluxDB wraps the InfluxDB client with application-specific helpers
type InfluxDB struct {
client influxdb2.Client
writeAPI api.WriteAPI // Non-blocking write
queryAPI api.QueryAPI
cfg config.InfluxDBConfig
log *zap.Logger
errCh chan error
}
// NewInfluxDB creates and verifies an InfluxDB connection
func NewInfluxDB(cfg config.InfluxDBConfig, log *zap.Logger) (*InfluxDB, error) {
// Create client with options
options := influxdb2.DefaultOptions().
SetBatchSize(500). // Batch up to 500 points
SetFlushInterval(1000). // Flush every 1 second
SetRetryInterval(5000). // Retry failed writes after 5s
SetMaxRetries(3). // Max 3 retries
SetHTTPRequestTimeout(10) // 10 second timeout
client := influxdb2.NewClientWithOptions(cfg.URL, cfg.Token, options)
// Verify connection
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
health, err := client.Health(ctx)
if err != nil {
return nil, fmt.Errorf("failed to connect to InfluxDB: %w", err)
}
if health.Status != "pass" {
return nil, fmt.Errorf("InfluxDB is not healthy: %s", health.Status)
}
writeAPI := client.WriteAPI(cfg.Org, cfg.Bucket)
queryAPI := client.QueryAPI(cfg.Org)
db := &InfluxDB{
client: client,
writeAPI: writeAPI,
queryAPI: queryAPI,
cfg: cfg,
log: log,
errCh: make(chan error, 100),
}
// Start error handler goroutine
go db.handleWriteErrors()
log.Info("Connected to InfluxDB",
zap.String("url", cfg.URL),
zap.String("org", cfg.Org),
zap.String("bucket", cfg.Bucket),
)
return db, nil
}
// WriteAPI returns the non-blocking write API
func (db *InfluxDB) WriteAPI() api.WriteAPI {
return db.writeAPI
}
// QueryAPI returns the query API
func (db *InfluxDB) QueryAPI() api.QueryAPI {
return db.queryAPI
}
// WritePoint writes a single data point asynchronously
func (db *InfluxDB) WritePoint(p *write.Point) {
db.writeAPI.WritePoint(p)
}
// Flush forces all buffered points to be written immediately
func (db *InfluxDB) Flush() {
db.writeAPI.Flush()
}
// Close flushes and closes the connection
func (db *InfluxDB) Close() {
db.writeAPI.Flush()
db.client.Close()
db.log.Info("InfluxDB connection closed")
}
// handleWriteErrors monitors for async write errors
func (db *InfluxDB) handleWriteErrors() {
for err := range db.writeAPI.Errors() {
db.log.Error("InfluxDB write error", zap.Error(err))
}
}
// CreateBucketIfNotExists ensures the target bucket exists
func (db *InfluxDB) CreateBucketIfNotExists(ctx context.Context, name string, retentionDays int) error {
bucketsAPI := db.client.BucketsAPI()
_, err := bucketsAPI.FindBucketByName(ctx, name)
if err == nil {
return nil // Already exists
}
// Get org details
orgsAPI := db.client.OrganizationsAPI()
org, err := orgsAPI.FindOrganizationByName(ctx, db.cfg.Org)
if err != nil {
return fmt.Errorf("failed to find org: %w", err)
}
retentionSeconds := int64(retentionDays * 24 * 60 * 60)
_, err = bucketsAPI.CreateBucketWithName(ctx, org, name, domain.RetentionRule{
EverySeconds: retentionSeconds,
})
return err
}
จุดที่น่าสนใจคือ handleWriteErrors() ทำงานเป็น goroutine แยก — มันคอย listen error channel ตลอดเวลา ถ้า InfluxDB ตอบกลับมาว่า write ไม่สำเร็จ เราจะรู้ทันที แต่ไม่ได้ block main thread เลย
Data Transformation Pipeline — หัวใจหลักของระบบ
ทำไม Transformation ต้องมี Layer แยก?
เหมือนสายพานในโรงงาน — ขั้นตอนแต่ละขั้นทำงานแยกกัน ถ้า spec ของ InfluxDB เปลี่ยน (เช่น format ของ field เปลี่ยน) เราแก้แค่ SensorPipeline ไม่ต้องไปแตะ Handler หรือ Usecase เลย
ASCII Art: สายพาน Transform
[SensorData]
|
v
+-----------+
| Check TS | <-- timestamp ไม่เก่าเกิน 1hr / ไม่ล้ำอนาคตเกิน 5min
+-----------+
|
v
+-----------+
| Build Tag | <-- device_id + custom tags
+-----------+
|
v
+-----------+
| Conv Field| <-- float64, int64, bool, string เท่านั้น!
+-----------+
|
v
[InfluxDB Point] (o^^o)
internal/pipeline/sensor_pipeline.go
package pipeline
import (
"fmt"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/kangana1024/iot-workshop/backend/internal/domain"
)
const (
// MeasurementName is the InfluxDB measurement (table) name for sensor data
MeasurementName = "sensor_data"
)
// TransformResult holds the result of a single data point transformation
type TransformResult struct {
Point *write.Point
DeviceID string
Error error
}
// SensorPipeline handles transformation of raw sensor data into InfluxDB points
type SensorPipeline struct{}
// NewSensorPipeline creates a new SensorPipeline
func NewSensorPipeline() *SensorPipeline {
return &SensorPipeline{}
}
// Transform converts a single SensorData into an InfluxDB Point
func (p *SensorPipeline) Transform(data domain.SensorData) TransformResult {
// Determine timestamp
ts := time.Now().UTC()
if data.Timestamp != nil {
ts = data.Timestamp.UTC()
// Reject data that is too old (>1 hour) or in the future (>5 min)
now := time.Now()
if ts.Before(now.Add(-1 * time.Hour)) {
return TransformResult{
DeviceID: data.DeviceID,
Error: fmt.Errorf("timestamp too old: %s", ts.Format(time.RFC3339)),
}
}
if ts.After(now.Add(5 * time.Minute)) {
return TransformResult{
DeviceID: data.DeviceID,
Error: fmt.Errorf("timestamp is in the future: %s", ts.Format(time.RFC3339)),
}
}
}
// Build InfluxDB point
point := influxdb2.NewPoint(
MeasurementName,
buildTags(data),
nil,
ts,
)
// Add fields — validate and convert types
fieldCount := 0
for key, value := range data.Fields {
converted, err := convertFieldValue(key, value)
if err != nil {
return TransformResult{
DeviceID: data.DeviceID,
Error: fmt.Errorf("field '%s': %w", key, err),
}
}
point.AddField(key, converted)
fieldCount++
}
if fieldCount == 0 {
return TransformResult{
DeviceID: data.DeviceID,
Error: fmt.Errorf("no valid fields provided"),
}
}
return TransformResult{
Point: point,
DeviceID: data.DeviceID,
}
}
// TransformBatch converts a batch of SensorData into InfluxDB Points
func (p *SensorPipeline) TransformBatch(batch []domain.SensorData) []TransformResult {
results := make([]TransformResult, len(batch))
for i, data := range batch {
results[i] = p.Transform(data)
}
return results
}
// buildTags constructs the tag set for an InfluxDB point
func buildTags(data domain.SensorData) map[string]string {
tags := map[string]string{
"device_id": data.DeviceID,
}
// Merge custom tags (device_id cannot be overridden)
for k, v := range data.Tags {
if k != "device_id" {
tags[k] = v
}
}
return tags
}
// convertFieldValue normalizes field values to types InfluxDB accepts
func convertFieldValue(key string, value interface{}) (interface{}, error) {
switch v := value.(type) {
case float64:
return v, nil
case float32:
return float64(v), nil
case int:
return int64(v), nil
case int32:
return int64(v), nil
case int64:
return v, nil
case bool:
return v, nil
case string:
return v, nil
case nil:
return nil, fmt.Errorf("value is null")
default:
// JSON numbers unmarshal to float64 — handle interface{} from JSON
return nil, fmt.Errorf("unsupported type %T", value)
}
}
convertFieldValue สำคัญมาก — InfluxDB ไม่รับ type สุ่มสี่สุ่มห้า JSON unmarshal ค่าตัวเลขเป็น float64 เสมอ แต่ถ้ามีคนส่งมาเป็น int32 จาก library อื่น เราก็ convert ให้ เพื่อกัน runtime panic
Sensor Handler — ประตูรับข้อมูล
ทำไม Handler ต้องบาง?
Handler เปรียบเหมือนพนักงานต้อนรับ — งานของเขาคือ รับแขก ตรวจสอบบัตร แล้วส่งต่อ ไม่ใช่ทำทุกอย่างเอง ถ้า Handler อ้วนเกิน (มี business logic เยอะ) มันยากทดสอบมาก
internal/handler/sensor_handler.go
package handler
import (
"github.com/gofiber/fiber/v2"
"go.uber.org/zap"
"github.com/kangana1024/iot-workshop/backend/internal/domain"
"github.com/kangana1024/iot-workshop/backend/internal/usecase"
"github.com/kangana1024/iot-workshop/backend/pkg/response"
"github.com/kangana1024/iot-workshop/backend/pkg/validator"
)
// SensorHandler handles sensor data ingestion and query requests
type SensorHandler struct {
sensorUsecase *usecase.SensorUsecase
validator *validator.Validator
log *zap.Logger
}
// NewSensorHandler creates a new SensorHandler
func NewSensorHandler(su *usecase.SensorUsecase, v *validator.Validator, log *zap.Logger) *SensorHandler {
return &SensorHandler{sensorUsecase: su, validator: v, log: log}
}
// Ingest godoc
// @Summary Ingest single sensor data point
// @Description Accepts a single sensor measurement from a device
// @Tags sensors
// @Accept json
// @Produce json
// @Param body body domain.SensorData true "Sensor data point"
// @Success 202 {object} response.Response
// @Security BearerAuth
// @Router /api/v1/sensors/ingest [post]
func (h *SensorHandler) Ingest(c *fiber.Ctx) error {
var data domain.SensorData
if err := c.BodyParser(&data); err != nil {
return response.BadRequest(c, "INVALID_BODY", "Cannot parse request body", err.Error())
}
if errs := h.validator.Validate(data); errs != nil {
return response.BadRequest(c, "VALIDATION_ERROR", "Validation failed", errs)
}
result, err := h.sensorUsecase.Ingest(c.Context(), data)
if err != nil {
h.log.Error("Failed to ingest sensor data", zap.Error(err))
return response.InternalError(c, "Failed to process sensor data")
}
if result.Rejected > 0 {
return c.Status(fiber.StatusMultiStatus).JSON(response.Response{
Success: true,
Message: "Partial success",
Data: result,
})
}
return c.Status(fiber.StatusAccepted).JSON(response.Response{
Success: true,
Message: "Data accepted",
Data: result,
})
}
// IngestBatch godoc
// @Summary Ingest batch of sensor data
// @Description Accepts multiple sensor measurements in a single request (max 500 points)
// @Tags sensors
// @Accept json
// @Produce json
// @Param body body domain.SensorBatch true "Batch of sensor data"
// @Success 202 {object} response.Response
// @Security BearerAuth
// @Router /api/v1/sensors/ingest/batch [post]
func (h *SensorHandler) IngestBatch(c *fiber.Ctx) error {
var batch domain.SensorBatch
if err := c.BodyParser(&batch); err != nil {
return response.BadRequest(c, "INVALID_BODY", "Cannot parse request body", err.Error())
}
if errs := h.validator.Validate(batch); errs != nil {
return response.BadRequest(c, "VALIDATION_ERROR", "Validation failed", errs)
}
result, err := h.sensorUsecase.IngestBatch(c.Context(), batch.Points)
if err != nil {
h.log.Error("Failed to ingest batch", zap.Error(err))
return response.InternalError(c, "Failed to process batch")
}
statusCode := fiber.StatusAccepted
if result.Rejected > 0 && result.Accepted == 0 {
statusCode = fiber.StatusBadRequest
} else if result.Rejected > 0 {
statusCode = fiber.StatusMultiStatus
}
return c.Status(statusCode).JSON(response.Response{
Success: result.Accepted > 0,
Message: fmt.Sprintf("Accepted %d/%d points", result.Accepted, result.Accepted+result.Rejected),
Data: result,
})
}
// GetData godoc
// @Summary Query sensor data
// @Description Returns historical sensor data for a device
// @Tags sensors
// @Produce json
// @Param deviceId path string true "Device ID"
// @Param field query string false "Filter by field name"
// @Param start query string false "Start time (RFC3339)"
// @Param stop query string false "Stop time (RFC3339)"
// @Param window query string false "Aggregation window: 1m|5m|15m|1h|6h|1d"
// @Success 200 {object} response.Response
// @Security BearerAuth
// @Router /api/v1/sensors/{deviceId}/data [get]
func (h *SensorHandler) GetData(c *fiber.Ctx) error {
var filter domain.SensorQueryFilter
filter.DeviceID = c.Params("deviceId")
if err := c.QueryParser(&filter); err != nil {
return response.BadRequest(c, "INVALID_QUERY", "Invalid query parameters", err.Error())
}
// Default time range: last 1 hour
if filter.Start.IsZero() {
filter.Start = time.Now().Add(-1 * time.Hour)
}
if filter.Stop.IsZero() {
filter.Stop = time.Now()
}
if filter.Limit == 0 {
filter.Limit = 1000
}
data, err := h.sensorUsecase.QueryData(c.Context(), filter)
if err != nil {
h.log.Error("Failed to query sensor data", zap.Error(err))
return response.InternalError(c, "Failed to retrieve sensor data")
}
return response.OK(c, data)
}
เรียบง่ายดีมั้ยครับ — Handler ทำแค่ parse, validate, delegate แล้ว return response ไม่มี business logic ซ่อนอยู่เลย
Sensor Use Case — Brain ของระบบ
ทำไม Use Case ต้อง Avoid N+1?
สมมติน้องๆ มี Batch 100 points จาก 5 device — ถ้าเราตรวจสอบ device ทีละ request นั่นคือ 100 database calls เพื่อดึงข้อมูล 5 device ซ้ำๆ เหมือนไปซูเปอร์มาร์เก็ต 100 รอบเพื่อซื้อของ 5 อย่าง — มันไม่ make sense
Use Case ของเราแก้ด้วยการ batch verify — ดึง unique device IDs ก่อน แล้ว query ครั้งเดียว
internal/usecase/sensor_usecase.go
package usecase
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
"github.com/kangana1024/iot-workshop/backend/internal/database"
"github.com/kangana1024/iot-workshop/backend/internal/domain"
"github.com/kangana1024/iot-workshop/backend/internal/pipeline"
"github.com/kangana1024/iot-workshop/backend/internal/repository"
)
// SensorUsecase handles sensor data ingestion and querying
type SensorUsecase struct {
influxDB *database.InfluxDB
deviceRepo repository.DeviceRepository
pipeline *pipeline.SensorPipeline
log *zap.Logger
}
// NewSensorUsecase creates a new SensorUsecase
func NewSensorUsecase(
influxDB *database.InfluxDB,
deviceRepo repository.DeviceRepository,
log *zap.Logger,
) *SensorUsecase {
return &SensorUsecase{
influxDB: influxDB,
deviceRepo: deviceRepo,
pipeline: pipeline.NewSensorPipeline(),
log: log,
}
}
// Ingest validates and writes a single sensor data point
func (uc *SensorUsecase) Ingest(ctx context.Context, data domain.SensorData) (*domain.IngestResult, error) {
// Verify device exists
device, err := uc.deviceRepo.FindByDeviceID(ctx, data.DeviceID)
if err != nil {
return nil, fmt.Errorf("failed to verify device: %w", err)
}
if device == nil {
return &domain.IngestResult{
Rejected: 1,
Errors: []string{fmt.Sprintf("device '%s' not found", data.DeviceID)},
}, nil
}
// Transform to InfluxDB point
result := uc.pipeline.Transform(data)
if result.Error != nil {
return &domain.IngestResult{
Rejected: 1,
Errors: []string{result.Error.Error()},
}, nil
}
// Write to InfluxDB (non-blocking)
uc.influxDB.WritePoint(result.Point)
// Update device last_seen_at
go func() {
bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := uc.deviceRepo.UpdateStatus(bgCtx, data.DeviceID, domain.DeviceStatusOnline); err != nil {
uc.log.Warn("Failed to update device last_seen_at",
zap.String("device_id", data.DeviceID),
zap.Error(err),
)
}
}()
uc.log.Debug("Sensor data ingested",
zap.String("device_id", data.DeviceID),
zap.Int("field_count", len(data.Fields)),
)
return &domain.IngestResult{Accepted: 1}, nil
}
// IngestBatch validates and writes multiple sensor data points
func (uc *SensorUsecase) IngestBatch(ctx context.Context, points []domain.SensorData) (*domain.IngestResult, error) {
// Batch verify devices (avoid N+1 queries)
deviceIDs := uniqueDeviceIDs(points)
existingDevices, err := uc.verifyDevices(ctx, deviceIDs)
if err != nil {
return nil, err
}
result := &domain.IngestResult{}
for _, data := range points {
// Check device existence from pre-fetched map
if !existingDevices[data.DeviceID] {
result.Rejected++
result.Errors = append(result.Errors, fmt.Sprintf("device '%s' not found", data.DeviceID))
continue
}
// Transform
tr := uc.pipeline.Transform(data)
if tr.Error != nil {
result.Rejected++
result.Errors = append(result.Errors,
fmt.Sprintf("device '%s': %s", data.DeviceID, tr.Error.Error()))
continue
}
uc.influxDB.WritePoint(tr.Point)
result.Accepted++
}
// Update last_seen_at for all successfully ingested devices
if result.Accepted > 0 {
go uc.updateDevicesLastSeen(deviceIDs, existingDevices)
}
uc.log.Info("Batch ingestion complete",
zap.Int("accepted", result.Accepted),
zap.Int("rejected", result.Rejected),
)
return result, nil
}
// QueryData retrieves historical sensor data from InfluxDB
func (uc *SensorUsecase) QueryData(ctx context.Context, filter domain.SensorQueryFilter) ([]domain.AggregatedSensorData, error) {
// Build Flux query
query := uc.buildFluxQuery(filter)
uc.log.Debug("Executing Flux query",
zap.String("device_id", filter.DeviceID),
zap.String("window", filter.Window),
)
result, err := uc.influxDB.QueryAPI().Query(ctx, query)
if err != nil {
return nil, fmt.Errorf("query failed: %w", err)
}
defer result.Close()
var records []domain.AggregatedSensorData
for result.Next() {
record := result.Record()
fields := map[string]interface{}{
record.Field(): record.Value(),
}
records = append(records, domain.AggregatedSensorData{
Time: record.Time(),
Fields: fields,
})
}
if err := result.Err(); err != nil {
return nil, fmt.Errorf("query result error: %w", err)
}
return records, nil
}
// buildFluxQuery constructs a Flux query from the filter
func (uc *SensorUsecase) buildFluxQuery(f domain.SensorQueryFilter) string {
startStr := f.Start.UTC().Format(time.RFC3339)
stopStr := f.Stop.UTC().Format(time.RFC3339)
query := fmt.Sprintf(`
from(bucket: "sensor_data")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => r["_measurement"] == "sensor_data")
|> filter(fn: (r) => r["device_id"] == "%s")`,
startStr, stopStr, f.DeviceID)
if f.Field != "" {
query += fmt.Sprintf(`
|> filter(fn: (r) => r["_field"] == "%s")`, f.Field)
}
if f.Window != "" {
query += fmt.Sprintf(`
|> aggregateWindow(every: %s, fn: mean, createEmpty: false)`, f.Window)
}
query += fmt.Sprintf(`
|> limit(n: %d)
|> sort(columns: ["_time"])`, f.Limit)
return query
}
// verifyDevices checks which device IDs exist in MongoDB
func (uc *SensorUsecase) verifyDevices(ctx context.Context, deviceIDs []string) (map[string]bool, error) {
existing := make(map[string]bool)
for _, id := range deviceIDs {
device, err := uc.deviceRepo.FindByDeviceID(ctx, id)
if err != nil {
return nil, err
}
existing[id] = device != nil
}
return existing, nil
}
func (uc *SensorUsecase) updateDevicesLastSeen(deviceIDs []string, existing map[string]bool) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, id := range deviceIDs {
if existing[id] {
_ = uc.deviceRepo.UpdateStatus(ctx, id, domain.DeviceStatusOnline)
}
}
}
func uniqueDeviceIDs(points []domain.SensorData) []string {
seen := make(map[string]struct{})
var ids []string
for _, p := range points {
if _, ok := seen[p.DeviceID]; !ok {
seen[p.DeviceID] = struct{}{}
ids = append(ids, p.DeviceID)
}
}
return ids
}
Flux Query ของ InfluxDB อ่านง่ายมาก — มันเหมือน pipe ใน Linux |> คือส่งผลต่อไปยังขั้นตอนถัดไป filter → aggregate → limit → sort ลำดับเหมือนการกรองน้ำเลย
Rate Limiting — ประตูกั้นน้ำท่วม
ทำไมต้อง Rate Limit?
ลองคิดว่า Sensor มี Bug ส่งข้อมูลวนลูปไม่หยุด — ถ้าไม่มี Rate Limit เซิร์ฟเวอร์ล่มได้เลย เหมือนท่อประปาที่ไม่มีวาล์วควบคุม — เปิดสุดแล้วน้ำท่วม
internal/middleware/sensor_rate_limit.go
package middleware
import (
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/limiter"
)
// SensorRateLimit applies strict rate limiting to sensor ingestion endpoints
// Allow 1000 requests per minute per device (identified by Authorization header or IP)
func SensorRateLimit() fiber.Handler {
return limiter.New(limiter.Config{
Max: 1000,
Expiration: 1 * time.Minute,
LimiterMiddleware: limiter.SlidingWindow{},
KeyGenerator: func(c *fiber.Ctx) string {
// Rate limit per device token if available, otherwise per IP
auth := c.Get("Authorization")
if len(auth) > 7 {
return "sensor:" + auth[7:15] // First 8 chars of token
}
return "ip:" + c.IP()
},
LimitReached: func(c *fiber.Ctx) error {
return c.Status(fiber.StatusTooManyRequests).JSON(fiber.Map{
"success": false,
"error": fiber.Map{
"code": "RATE_LIMIT_EXCEEDED",
"message": "Sensor ingestion rate limit exceeded (1000 req/min)",
"retry_after": "60s",
},
})
},
})
}
สังเกตว่า Key ใช้ token ของ device ไม่ใช่ IP — เพราะ IoT Gateway หลายตัวอาจอยู่หลัง IP เดียวกัน ถ้า Rate Limit ด้วย IP อาจโดน block ทั้ง network ซึ่งไม่ fair เลย
มาลุยกัน — ทดสอบ API จริง
Single Point Ingestion
# Ingest temperature and humidity from a sensor
curl -X POST http://localhost:8080/api/v1/sensors/ingest \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $DEVICE_TOKEN" \
-d '{
"device_id": "sensor-temp-001",
"timestamp": "2026-03-26T10:00:00Z",
"fields": {
"temperature": 28.5,
"humidity": 65.2,
"pressure": 1013.25,
"battery": 87
},
"tags": {
"location": "floor-3",
"unit": "celsius"
}
}'
Batch Ingestion (สูงสุด 500 points)
curl -X POST http://localhost:8080/api/v1/sensors/ingest/batch \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-d '{
"points": [
{
"device_id": "sensor-temp-001",
"timestamp": "2026-03-26T10:00:00Z",
"fields": {"temperature": 28.5, "humidity": 65.2}
},
{
"device_id": "sensor-temp-002",
"timestamp": "2026-03-26T10:00:00Z",
"fields": {"temperature": 27.1, "humidity": 68.0}
},
{
"device_id": "sensor-motion-001",
"timestamp": "2026-03-26T10:00:01Z",
"fields": {"motion": true, "light_level": 450}
}
]
}'
Query Historical Data
# Get last 6 hours of temperature, aggregated by 5 minute windows
curl "http://localhost:8080/api/v1/sensors/sensor-temp-001/data?field=temperature&start=2026-03-26T04:00:00Z&window=5m" \
-H "Authorization: Bearer $TOKEN"
ตัวอย่าง Response ที่คาดหวัง
Successful Batch Ingestion
{
"success": true,
"message": "Accepted 3/3 points",
"data": {
"accepted": 3,
"rejected": 0
}
}
Partial Success (บางตัวไม่ผ่าน)
{
"success": true,
"message": "Accepted 2/3 points",
"data": {
"accepted": 2,
"rejected": 1,
"errors": [
"device 'sensor-unknown-999': device 'sensor-unknown-999' not found"
]
}
}
ระบบยังตอบ success: true ถึงแม้บางตัวจะ reject — เพราะ HTTP 207 Multi-Status บอกว่า “บางอย่างสำเร็จ บางอย่างไม่สำเร็จ” ซึ่ง semantically ถูกต้องกว่า 400 Bad Request
Recap — สรุปว่าวันนี้ทำอะไรไปบ้าง
วันนี้เราสร้าง Sensor Ingestion Pipeline ครบ loop แล้ว:
- REST API รองรับทั้ง single (
/ingest) และ batch (/ingest/batch) สูงสุด 500 points - Transformation Pipeline แยก Layer ชัดเจน แก้ได้ง่ายไม่กระทบส่วนอื่น
- Timestamp Validation ป้องกันข้อมูลเก่าเกิน 1 ชั่วโมง และล้ำอนาคตเกิน 5 นาที
- Non-blocking Write API ทำให้ throughput สูง ไม่ต้องรอ DB ตอบทุก request
- Device Status อัปเดตเป็น online อัตโนมัติผ่าน goroutine ไม่ block main path
- Flux Query Builder สร้าง query แบบ dynamic ตาม filter ที่รับมา
- Rate Limiting ป้องกัน Sensor ซนส่งข้อมูลมาเกิน 1000 req/min
Next Step
ตอนหน้าเราจะไปเชื่อม MQTT Broker เพราะ Sensor ในชีวิตจริงส่วนใหญ่ไม่ได้คุยผ่าน REST API — พวกมันใช้ MQTT ซึ่งเบากว่า เหมาะกับ device ที่มีแบตจำกัด
- ก่อนหน้า: IoT Workshop #6: Device Management API
- ถัดไป: IoT Workshop #8: MQTT Integration