
Backend Development

An IoT backend handles device connection management, data storage, business-logic processing, and other core functions. This article covers the key technologies and practical methods of IoT backend development.

Backend Architecture

Overall Architecture

┌───────────────────────────────────────────────────────────────┐
│                       Application Layer                       │
│   API Gateway │ Business Services │ Analytics │ Visualization │
├───────────────────────────────────────────────────────────────┤
│                         Service Layer                         │
│  Device Service │ Data Service │ Rule Service │ Alert Service │
├───────────────────────────────────────────────────────────────┤
│                        Middleware Layer                       │
│   Message Queue │ Cache │ Time-Series DB │ Relational DB      │
├───────────────────────────────────────────────────────────────┤
│                         Access Layer                          │
│       MQTT Broker │ HTTP Server │ WebSocket Server            │
└───────────────────────────────────────────────────────────────┘

Technology Choices

Layer                   Options
Access layer            EMQX, Mosquitto, Nginx
Message queue           Kafka, RabbitMQ, Redis Pub/Sub
Time-series database    InfluxDB, TimescaleDB, TDengine
Relational database     PostgreSQL, MySQL
Cache                   Redis
Backend framework       Node.js, Spring Boot, FastAPI

Message Middleware

MQTT Broker

EMQX configuration example:

# emqx.conf
listeners.tcp.default {
  bind = "0.0.0.0:1883"
  max_connections = 1024000
}

listeners.ssl.default {
  bind = "0.0.0.0:8883"
  ssl_options {
    cacertfile = "/etc/emqx/certs/cacert.pem"
    certfile = "/etc/emqx/certs/cert.pem"
    keyfile = "/etc/emqx/certs/key.pem"
  }
}

EMQX rule engine:

-- Forward temperature data to the database
SELECT
  payload.temperature as temperature,
  payload.humidity as humidity,
  clientid as device_id,
  timestamp as ts
FROM "sensors/+/data"
WHERE payload.temperature > 0
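To make the SELECT/WHERE semantics of the rule above concrete, here is a hypothetical plain-Python equivalent (the function name and field handling are illustrative, not part of EMQX):

```python
# Hypothetical sketch: the EMQX rule above, expressed as a plain Python
# function. Returns the selected record, or None when the WHERE clause
# filters the message out.
def apply_rule(clientid, payload, timestamp):
    if payload.get("temperature", 0) <= 0:
        return None  # WHERE payload.temperature > 0 failed
    return {
        "temperature": payload["temperature"],
        "humidity": payload.get("humidity"),
        "device_id": clientid,
        "ts": timestamp,
    }
```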

Kafka

Kafka architecture:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Producer   │────▶│   Broker    │────▶│  Consumer   │
│(device data)│     │(msg storage)│     │(processing) │
└─────────────┘     └─────────────┘     └─────────────┘
                    ┌──────┴──────┐
                    │   Topics    │
                    │ • telemetry │
                    │ • events    │
                    │ • commands  │
                    └─────────────┘

Python Kafka producer:

from kafka import KafkaProducer
import json
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_telemetry(device_id, data):
    topic = f'telemetry.{device_id}'
    producer.send(topic, {
        'device_id': device_id,
        'data': data,
        'timestamp': time.time()
    })
    producer.flush()
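Note that a topic per device (`telemetry.{device_id}`) can explode the topic count for large fleets. A common alternative is one shared topic with the device ID as the message key, so Kafka's partitioner keeps per-device ordering. A hypothetical sketch (the topic name and helper are assumptions, not from the text above):

```python
import json
import time

TOPIC = "telemetry"  # single shared topic (hypothetical name)

def build_record(device_id, data, now=None):
    """Build (topic, key, value) for a single-topic, keyed layout.
    Messages with the same key always land on the same partition,
    preserving per-device ordering without per-device topics."""
    now = now if now is not None else time.time()
    value = json.dumps({"device_id": device_id, "data": data, "timestamp": now})
    return TOPIC, device_id.encode("utf-8"), value.encode("utf-8")
```

Usage: `topic, key, value = build_record(device_id, data)` then `producer.send(topic, key=key, value=value)`, with a producer configured without a value serializer (the value is already bytes).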

Python Kafka consumer:

from kafka import KafkaConsumer
import json

# kafka-python does not expand wildcards in a topic list; subscribe
# with a regex pattern instead.
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='iot-processor',
    auto_offset_reset='latest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
consumer.subscribe(pattern='^telemetry\\..*')

for message in consumer:
    data = message.value
    process_telemetry(data)
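The loop above delegates to `process_telemetry`, which is not defined in the text. One possible sketch, assuming the message shape produced by the earlier producer example (validate, then normalize into a flat record):

```python
# Hypothetical sketch of process_telemetry: validate the decoded message
# and flatten it into a record ready for storage. Field names follow the
# producer example above; malformed messages are dropped (None).
def process_telemetry(message):
    device_id = message.get("device_id")
    data = message.get("data") or {}
    if not device_id or "temperature" not in data:
        return None  # drop malformed messages
    return {
        "device_id": device_id,
        "temperature": float(data["temperature"]),
        "humidity": float(data.get("humidity", 0.0)),
        "ts": float(message.get("timestamp", 0.0)),
    }
```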

Data Storage

Time-Series Databases

InfluxDB:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# The org parameter is required by the InfluxDB 2.x write/query APIs.
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="your-org")
write_api = client.write_api(write_options=SYNCHRONOUS)

def write_telemetry(device_id, temperature, humidity):
    point = Point("sensor_data") \
        .tag("device_id", device_id) \
        .field("temperature", temperature) \
        .field("humidity", humidity)
    write_api.write(bucket="iot-bucket", record=point)

def query_data(device_id, start_time, end_time):
    query = f'''
    from(bucket: "iot-bucket")
      |> range(start: {start_time}, stop: {end_time})
      |> filter(fn: (r) => r["_measurement"] == "sensor_data")
      |> filter(fn: (r) => r["device_id"] == "{device_id}")
    '''
    return client.query_api().query(query)

TimescaleDB:

-- Create the time-series table
CREATE TABLE sensor_data (
  time TIMESTAMPTZ NOT NULL,
  device_id TEXT NOT NULL,
  temperature DOUBLE PRECISION,
  humidity DOUBLE PRECISION
);

-- Convert it into a hypertable
SELECT create_hypertable('sensor_data', 'time');

-- Create a continuous aggregate view
CREATE MATERIALIZED VIEW sensor_hourly
WITH (timescaledb.continuous) AS
SELECT device_id,
       time_bucket('1 hour', time) AS bucket,
       AVG(temperature) AS avg_temp,
       AVG(humidity) AS avg_humidity
FROM sensor_data
GROUP BY device_id, time_bucket('1 hour', time);

-- Query recent data
SELECT * FROM sensor_data
WHERE device_id = 'sensor-001'
  AND time > NOW() - INTERVAL '24 hours'
ORDER BY time DESC;
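`time_bucket('1 hour', time)` truncates each timestamp down to the start of its bucket. A rough Python analogue for epoch-seconds timestamps (TimescaleDB additionally handles intervals, time zones, and offsets):

```python
# Rough analogue of TimescaleDB's time_bucket() for epoch seconds:
# truncate each timestamp down to its bucket's start.
def time_bucket(width_seconds, epoch_seconds):
    return epoch_seconds - (epoch_seconds % width_seconds)
```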

Relational Database

PostgreSQL device management:

-- Device table
CREATE TABLE devices (
  id SERIAL PRIMARY KEY,
  device_id VARCHAR(64) UNIQUE NOT NULL,
  name VARCHAR(128),
  device_type VARCHAR(64),
  status VARCHAR(32) DEFAULT 'offline',
  last_seen TIMESTAMPTZ,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Device properties table
CREATE TABLE device_properties (
  id SERIAL PRIMARY KEY,
  device_id VARCHAR(64) REFERENCES devices(device_id),
  key VARCHAR(64),
  value TEXT,
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

Redis Cache

import redis
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def cache_device_status(device_id, status):
    key = f'device:{device_id}:status'
    redis_client.setex(key, 300, json.dumps(status))

def get_device_status(device_id):
    key = f'device:{device_id}:status'
    data = redis_client.get(key)
    return json.loads(data) if data else None

def cache_device_list(devices):
    redis_client.setex('devices:all', 60, json.dumps(devices))
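The functions above follow the cache-aside pattern: read the cache first, fall back to the source of truth, then populate the cache. A minimal, store-agnostic sketch (the helper name and `stats` parameter are illustrative; in production the redis client plays the role of `cache`):

```python
# Hypothetical cache-aside sketch. `cache` is any dict-like store and
# `loader` fetches from the source of truth (e.g. a database query).
def get_with_cache(cache, key, loader, stats=None):
    if key in cache:
        if stats is not None:
            stats["hits"] = stats.get("hits", 0) + 1
        return cache[key]           # cache hit: skip the loader entirely
    value = loader(key)             # cache miss: hit the source of truth
    cache[key] = value              # populate for subsequent reads
    if stats is not None:
        stats["misses"] = stats.get("misses", 0) + 1
    return value
```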

API Design

RESTful API

Device management API:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class Device(BaseModel):
    device_id: str
    name: str
    device_type: str
    status: Optional[str] = 'offline'

class Telemetry(BaseModel):
    temperature: float
    humidity: float

# device_service and telemetry_service are application service objects
# (implementations not shown here).

@app.post('/api/v1/devices')
async def create_device(device: Device):
    return await device_service.create(device)

@app.get('/api/v1/devices/{device_id}')
async def get_device(device_id: str):
    device = await device_service.get(device_id)
    if not device:
        raise HTTPException(status_code=404, detail='Device not found')
    return device

@app.post('/api/v1/devices/{device_id}/telemetry')
async def send_telemetry(device_id: str, telemetry: Telemetry):
    await telemetry_service.save(device_id, telemetry)
    return {'status': 'ok'}

@app.get('/api/v1/devices/{device_id}/telemetry')
async def get_telemetry(device_id: str, start: str, end: str):
    return await telemetry_service.query(device_id, start, end)

WebSocket Real-Time Push

import json

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket('/ws/devices/{device_id}')
async def device_websocket(websocket: WebSocket, device_id: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            await handle_device_message(device_id, message)
    except WebSocketDisconnect:
        await handle_disconnect(device_id)
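The endpoint above only receives; pushing server-side updates additionally needs a registry of open sockets per device. A hypothetical sketch (the class is an assumption, not FastAPI API; it only requires that the socket object has an async `send_text()` method):

```python
# Hypothetical sketch: track open sockets per device and broadcast
# telemetry updates to every subscriber of that device.
class ConnectionManager:
    def __init__(self):
        self.connections = {}  # device_id -> list of socket objects

    def connect(self, device_id, socket):
        self.connections.setdefault(device_id, []).append(socket)

    def disconnect(self, device_id, socket):
        if socket in self.connections.get(device_id, []):
            self.connections[device_id].remove(socket)

    async def broadcast(self, device_id, text):
        for socket in self.connections.get(device_id, []):
            await socket.send_text(text)
```

In the endpoint, one would call `manager.connect(device_id, websocket)` after `websocket.accept()`, `manager.disconnect(...)` on `WebSocketDisconnect`, and `manager.broadcast(...)` from the telemetry pipeline.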

Rule Engine

Rule Configuration

class RuleEngine:
    def __init__(self):
        self.rules = []

    def add_rule(self, rule):
        self.rules.append(rule)

    def evaluate(self, data):
        results = []
        for rule in self.rules:
            if rule.match(data):
                results.append(rule.execute(data))
        return results

class Rule:
    def __init__(self, name, condition, actions):
        self.name = name
        self.condition = condition
        self.actions = actions

    def match(self, data):
        # Caution: eval() is only acceptable when conditions come from
        # trusted configuration, never from user input.
        return eval(self.condition, {}, data)

    def execute(self, data):
        for action in self.actions:
            action(data)
        return self.name

engine = RuleEngine()
engine.add_rule(Rule(
    name='high_temperature_alert',
    condition='temperature > 30',
    actions=[send_alert, log_event]
))
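Because `eval()` executes arbitrary code, a safer variant restricts conditions to a (field, operator, threshold) triple. A hypothetical sketch of such a rule (the class name is an assumption; it drops the string-expression flexibility in exchange for safety):

```python
import operator

# Whitelist of comparison operators; anything else raises KeyError
# at rule-construction time rather than executing arbitrary code.
OPS = {">": operator.gt, ">=": operator.ge,
       "<": operator.lt, "<=": operator.le, "==": operator.eq}

class SafeRule:
    def __init__(self, name, field, op, threshold, actions=()):
        self.name = name
        self.field = field
        self.op = OPS[op]
        self.threshold = threshold
        self.actions = list(actions)

    def match(self, data):
        return self.field in data and self.op(data[self.field], self.threshold)

    def execute(self, data):
        for action in self.actions:
            action(data)
        return self.name
```

Usage: `SafeRule('high_temperature_alert', 'temperature', '>', 30, actions=[send_alert])` is a drop-in replacement for the string-condition rule above.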

Alert Handling

from datetime import datetime

class AlertManager:
    def __init__(self):
        self.alerts = {}

    async def create_alert(self, device_id, alert_type, message):
        alert = {
            'device_id': device_id,
            'type': alert_type,
            'message': message,
            'timestamp': datetime.now(),
            'status': 'active'
        }
        await self.notify(alert)
        return alert

    async def notify(self, alert):
        await send_email(alert)
        await send_sms(alert)
        await push_notification(alert)
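As written, `create_alert` notifies on every call, so a flapping sensor would flood email, SMS, and push channels. One common refinement is to suppress duplicates within a cooldown window; a hypothetical sketch (the class and the injected clock are assumptions for testability, not from the text above):

```python
# Hypothetical sketch: suppress duplicate alerts for the same
# (device, alert type) pair within a cooldown window. `now` is passed
# in as epoch seconds so the logic is easy to test.
class AlertThrottle:
    def __init__(self, cooldown_seconds=300):
        self.cooldown = cooldown_seconds
        self.last_sent = {}  # (device_id, alert_type) -> last send time

    def should_send(self, device_id, alert_type, now):
        key = (device_id, alert_type)
        last = self.last_sent.get(key)
        if last is not None and now - last < self.cooldown:
            return False  # still inside the cooldown window
        self.last_sent[key] = now
        return True
```

`AlertManager.notify` could then be gated on `throttle.should_send(...)` before fanning out to the channels.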

Data Analytics

Real-Time Stream Processing

from kafka import KafkaConsumer

def process_stream():
    consumer = KafkaConsumer(...)
    consumer.subscribe(pattern='^telemetry\\..*')

    for message in consumer:
        data = message.value

        if data['temperature'] > 30:
            trigger_alert(data)

        if is_anomaly(data):
            log_anomaly(data)

        update_statistics(data)
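`is_anomaly` is left undefined above. One simple implementation is a rolling z-score check: flag a reading whose distance from the recent mean exceeds k standard deviations. A hypothetical sketch (the class, window size, and threshold are illustrative choices, not from the original text):

```python
from collections import deque
import math

# Hypothetical rolling z-score anomaly detector. Keeps the last
# `window` readings; a value more than `k` standard deviations from
# their mean is flagged as anomalous.
class ZScoreDetector:
    def __init__(self, window=20, k=3.0):
        self.values = deque(maxlen=window)
        self.k = k

    def is_anomaly(self, value):
        if len(self.values) >= 3:  # need some history before judging
            mean = sum(self.values) / len(self.values)
            var = sum((v - mean) ** 2 for v in self.values) / len(self.values)
            std = math.sqrt(var)
            anomalous = std > 0 and abs(value - mean) > self.k * std
        else:
            anomalous = False
        self.values.append(value)  # the value joins the history either way
        return anomalous
```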

Data Aggregation

def aggregate_hourly(device_id):
    query = '''
        SELECT
          time_bucket('1 hour', time) as hour,
          AVG(temperature) as avg_temp,
          MAX(temperature) as max_temp,
          MIN(temperature) as min_temp,
          COUNT(*) as count
        FROM sensor_data
        WHERE device_id = %s
          AND time > NOW() - INTERVAL '24 hours'
        GROUP BY hour
        ORDER BY hour
    '''
    return execute_query(query, device_id)

Deployment and Operations

Docker Deployment

version: '3'
services:
  mqtt-broker:
    image: emqx/emqx:latest
    ports:
      - "1883:1883"
      - "18083:18083"

  timescaledb:
    image: timescale/timescaledb:latest-pg15
    environment:
      POSTGRES_PASSWORD: password
    volumes:
      - timescale-data:/var/lib/postgresql/data

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  api-server:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - mqtt-broker
      - timescaledb
      - redis

volumes:
  timescale-data:

Monitoring and Alerting

# Prometheus configuration (prometheus.yml)
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'emqx'
    static_configs:
      - targets: ['emqx:8081']

  - job_name: 'api-server'
    static_configs:
      - targets: ['api-server:8000']

# Alerting rules (a separate rules file, referenced via rule_files)
groups:
  - name: iot-alerts
    rules:
      - alert: HighTemperature
        expr: sensor_temperature > 30
        for: 5m
        annotations:
          summary: "High temperature detected"

Summary

IoT backend development spans message middleware, data storage, API design, rule engines, and more. Choosing an appropriate technology stack, designing a scalable architecture, and implementing reliable data processing are the keys to building an IoT backend system.

Next, we will put this knowledge into practice with a hands-on smart-home project.