工业物联网项目
本文将通过一个预测性维护项目,演示工业物联网系统的完整开发流程,涵盖工业协议、边缘计算、数据分析等关键技术。
项目概述
项目目标
搭建一个工业设备预测性维护系统,实现以下功能:
- 设备状态实时监测
- 振动数据采集与分析
- 设备故障预测
- 维护工单自动生成
- 数据可视化大屏
系统架构
┌─────────────────────────────────────────────────────────┐
│ 应用层 │
│ 监控大屏 │ 预测分析 │ 维护管理 │ 报表系统 │
├─────────────────────────────────────────────────────────┤
│ 云平台 │
│ 数据湖 │ AI 模型训练 │ 大数据分析 │
├─────────────────────────────────────────────────────────┤
│ 边缘层 │
│ 边缘网关 │ 实时计算 │ 本地 AI │ 数据缓存 │
├─────────────────────────────────────────────────────────┤
│ 设备层 │
│ PLC │ CNC │ 电机 │ 振动传感器 │ 温度传感器 │
└─────────────────────────────────────────────────────────┘
工业协议
Modbus 协议
Modbus RTU 读写示例:
from pymodbus.client import ModbusSerialClient

# Modbus RTU over a serial line: 9600 baud, 8 data bits, no parity, 1 stop bit.
client = ModbusSerialClient(
    port='/dev/ttyUSB0',
    baudrate=9600,
    parity='N',
    stopbits=1,
    bytesize=8,
)

# Fail early with a clear message instead of letting the first request fail.
if not client.connect():
    raise ConnectionError("Unable to open Modbus RTU port /dev/ttyUSB0")

try:
    # Read 10 holding registers starting at address 0.
    # NOTE(review): newer pymodbus 3.x releases renamed the `unit` keyword
    # (to `slave`, later `device_id`) - confirm against the installed version.
    result = client.read_holding_registers(address=0, count=10, unit=1)
    if result.isError():
        print(f"Read failed: {result}")
    else:
        registers = result.registers
        print(f"Registers: {registers}")

    # Write a single register and report a rejected write instead of ignoring it.
    write_result = client.write_register(address=0, value=100, unit=1)
    if write_result.isError():
        print(f"Write failed: {write_result}")
finally:
    # Release the serial port even when a request raises.
    client.close()
Modbus TCP 读取示例:
from pymodbus.client import ModbusTcpClient

# Number of registers / coils requested by each read below.
ITEM_COUNT = 10

client = ModbusTcpClient('192.168.1.100', port=502)

# Fail early with a clear message instead of letting the first request fail.
if not client.connect():
    raise ConnectionError("Unable to connect to Modbus TCP server 192.168.1.100:502")

try:
    # Read input registers.
    # NOTE(review): newer pymodbus 3.x releases renamed the `unit` keyword
    # (to `slave`, later `device_id`) - confirm against the installed version.
    result = client.read_input_registers(address=0, count=ITEM_COUNT, unit=1)
    if result.isError():
        # An error response carries no usable `.registers` payload.
        print(f"Read failed: {result}")
    else:
        print(f"Input registers: {result.registers}")

    # Read coil states.
    result = client.read_coils(address=0, count=ITEM_COUNT, unit=1)
    if result.isError():
        print(f"Read failed: {result}")
    else:
        # Coils arrive packed in whole bytes, so `bits` can be longer than the
        # request; keep only the coils that were actually asked for.
        print(f"Coils: {result.bits[:ITEM_COUNT]}")
finally:
    # Close the socket even when a request raises.
    client.close()
OPC-UA 协议
OPC-UA 客户端示例:
import time

from opcua import Client

# How long the example keeps the subscription alive before tearing it down.
LISTEN_SECONDS = 5


class SubHandler:
    """Callback object handed to the OPC-UA subscription."""

    def datachange_notification(self, node, val, data):
        """Print every value change reported for a monitored node."""
        print(f"Node {node} value changed to {val}")


client = Client("opc.tcp://192.168.1.100:4840")
client.connect()
try:
    # Browse entry points of the server address space.
    root = client.get_root_node()
    objects = client.get_objects_node()

    # Read a node value once.
    temperature_node = client.get_node("ns=2;i=2")
    temperature = temperature_node.get_value()
    print(f"Temperature: {temperature}")

    # Subscribe to data changes with a 500 ms publishing interval.
    handler = SubHandler()
    sub = client.create_subscription(500, handler)
    handle = sub.subscribe_data_change(temperature_node)
    try:
        # Notifications arrive asynchronously; without this wait the session
        # would end before the handler could ever be called.
        time.sleep(LISTEN_SECONDS)
    finally:
        sub.unsubscribe(handle)
        sub.delete()
finally:
    # python-opcua's Client is shut down with disconnect(); it has no close().
    client.disconnect()
边缘网关开发
数据采集服务
# data_collector.py
import asyncio
import json
import struct
from datetime import datetime

from pymodbus.client import AsyncModbusTcpClient
class DataCollector:
    """Poll holding registers from a set of Modbus TCP devices.

    One ``AsyncModbusTcpClient`` is created per entry of *device_configs* and
    kept in ``self.devices`` under that entry's ``'name'``.
    """

    def __init__(self, device_configs):
        """Create (but do not connect) one client per configured device.

        Args:
            device_configs: Iterable of dicts with the keys ``'name'``,
                ``'host'`` and ``'port'``.
        """
        self.devices = {
            config['name']: AsyncModbusTcpClient(
                config['host'],
                port=config['port'],
            )
            for config in device_configs
        }

    async def connect_all(self):
        """Connect every configured client, one after another."""
        for client in self.devices.values():
            await client.connect()

    async def read_device(self, device_name, registers):
        """Read the described registers from one device.

        Args:
            device_name: Key into ``self.devices``.
            registers: Iterable of dicts with the keys ``'name'``,
                ``'address'``, ``'count'``, ``'unit'`` and ``'type'``.

        Returns:
            Dict with ``'device'``, ``'timestamp'`` (ISO string of
            ``datetime.now()``) and ``'data'`` mapping each register name to
            its converted value. Registers whose read reports an error are
            left out of ``'data'``.

        Raises:
            KeyError: If *device_name* is not a configured device.
        """
        client = self.devices[device_name]
        data = {}
        for reg in registers:
            # NOTE(review): newer pymodbus 3.x releases renamed the `unit`
            # keyword (to `slave`, later `device_id`) - confirm against the
            # installed version.
            result = await client.read_holding_registers(
                address=reg['address'],
                count=reg['count'],
                unit=reg['unit'],
            )
            if result.isError():
                continue
            data[reg['name']] = self._convert_value(result.registers, reg['type'])
        return {
            'device': device_name,
            'timestamp': datetime.now().isoformat(),
            'data': data,
        }

    def _convert_value(self, registers, data_type):
        """Convert a list of 16-bit register values according to *data_type*.

        ``'int16'`` returns the first register unchanged, ``'float32'``
        combines the first two registers (first register = high word,
        big-endian) into an IEEE-754 single, and any other type returns the
        register list untouched.

        Raises:
            IndexError: If *registers* holds fewer values than the type needs.
        """
        if data_type == 'int16':
            # NOTE(review): the value is returned as read (0..65535); if the
            # device stores signed values a two's-complement conversion is
            # still needed - confirm against the register map.
            return registers[0]
        if data_type == 'float32':
            # Relies on the module-level `import struct`.
            packed = struct.pack('>HH', registers[0], registers[1])
            return struct.unpack('>f', packed)[0]
        return registers
async def main():
    """Connect to both PLCs and forward one reading per device every cycle.

    Each cycle reads the temperature, vibration and speed registers from every
    configured device, hands each result to ``send_to_cloud`` and then sleeps
    for one second. The loop never terminates on its own.
    """
    collector = DataCollector([
        {'name': 'plc-1', 'host': '192.168.1.100', 'port': 502},
        {'name': 'plc-2', 'host': '192.168.1.101', 'port': 502},
    ])
    await collector.connect_all()

    # Register map shared by every device: name, start address, register
    # count, Modbus unit and how the raw words are decoded.
    register_map = [
        {'name': 'temperature', 'address': 0, 'count': 1, 'unit': 1, 'type': 'int16'},
        {'name': 'vibration', 'address': 1, 'count': 2, 'unit': 1, 'type': 'float32'},
        {'name': 'speed', 'address': 3, 'count': 1, 'unit': 1, 'type': 'int16'},
    ]

    while True:
        for name in collector.devices:
            reading = await collector.read_device(name, register_map)
            # NOTE(review): send_to_cloud is not defined in the visible code -
            # it must be provided elsewhere in this module.
            await send_to_cloud(reading)
        await asyncio.sleep(1)


asyncio.run(main())
边缘计算
# edge_analytics.py
import numpy as np
from collections import deque
import json
class VibrationAnalyzer:
    """Sliding-window vibration feature extractor with z-score anomaly flag.

    Samples are kept in a fixed-length window. Once the window is full, every
    new sample triggers a fresh analysis of the most recent ``window_size``
    samples.
    """

    def __init__(self, window_size=1024, sample_rate=1.0):
        """Set up an empty window.

        Args:
            window_size: Number of samples analysed together.
            sample_rate: Samples per second, used to scale ``dominant_freq``.
                The default of 1.0 reports frequency in cycles per sample.
        """
        self.window_size = window_size
        self.sample_rate = sample_rate
        self.buffer = deque(maxlen=window_size)
        self.baseline = None
        # Absolute z-score above which a window counts as anomalous.
        self.threshold = 3.0

    def add_sample(self, sample):
        """Append one sample; return the analysis once the window is full.

        Returns:
            The dict produced by ``analyze()`` when the window holds
            ``window_size`` samples, otherwise ``None``.
        """
        self.buffer.append(sample)
        if len(self.buffer) == self.window_size:
            return self.analyze()
        return None

    def analyze(self):
        """Compute time- and frequency-domain features of the current window.

        Returns:
            Dict with ``'rms'``, ``'peak'``, ``'crest_factor'``,
            ``'dominant_freq'`` (all ``float``) and ``'is_anomaly'``
            (``bool``). ``'is_anomaly'`` is always ``False`` while no baseline
            has been set.
        """
        data = np.array(self.buffer)

        # Time-domain features.
        rms = np.sqrt(np.mean(data**2))
        peak = np.max(np.abs(data))
        crest_factor = peak / rms if rms > 0 else 0

        # Frequency-domain feature: frequency of the largest FFT bin.
        # NOTE(review): bin 0 (DC) takes part in the argmax, so a signal with
        # a non-zero mean may report 0 here - confirm whether that is intended.
        spectrum = np.abs(np.fft.fft(data))
        freqs = np.fft.fftfreq(len(data), d=1.0 / self.sample_rate)
        dominant_freq = np.abs(freqs[np.argmax(spectrum)])

        # Anomaly detection against the stored baseline.
        is_anomaly = False
        if self.baseline is not None:
            deviation = rms - self.baseline['mean']
            spread = self.baseline['std']
            if spread > 0:
                is_anomaly = abs(deviation / spread) > self.threshold
            else:
                # A zero-spread baseline makes the z-score undefined; treat
                # any departure from the baseline mean as anomalous instead of
                # dividing by zero.
                is_anomaly = deviation != 0

        return {
            'rms': float(rms),
            'peak': float(peak),
            'crest_factor': float(crest_factor),
            'dominant_freq': float(dominant_freq),
            # Plain bool so the result stays JSON-serialisable like the
            # float fields above (numpy comparisons yield numpy.bool_).
            'is_anomaly': bool(is_anomaly),
        }

    def update_baseline(self, data):
        """Store the mean and standard deviation of *data* as the baseline.

        ``analyze()`` compares the window RMS against these two values.
        """
        self.baseline = {
            'mean': float(np.mean(data)),
            'std': float(np.std(data)),
        }
class EdgeProcessor:
    """Route per-device readings through a vibration analyzer and rules."""

    def __init__(self):
        """Start with no analyzers and no rules."""
        # device_id -> analyzer, created lazily on the first reading.
        self.analyzers = {}
        # Objects exposing match(result) and execute(result).
        self.rules = []

    def process(self, device_id, data):
        """Feed one reading into the device's analyzer and apply the rules.

        Args:
            device_id: Identifier used to pick (or create) the analyzer.
            data: Dict holding the ``'vibration'`` sample and a
                ``'timestamp'``.

        Returns:
            The analysis dict, extended with ``'device_id'`` and
            ``'timestamp'``, when the analyzer produced one; otherwise
            ``None``. Readings without a vibration value are ignored and
            also yield ``None``.

        Raises:
            KeyError: If an analysis is produced but *data* has no
                ``'timestamp'``.
        """
        sample = data.get('vibration')
        if sample is None:
            # A missing sample must not enter the window: a None in the
            # buffer would break the numeric analysis of every later window.
            return None

        if device_id not in self.analyzers:
            self.analyzers[device_id] = VibrationAnalyzer()
        result = self.analyzers[device_id].add_sample(sample)
        if result is None:
            return None

        result['device_id'] = device_id
        result['timestamp'] = data['timestamp']

        # Rule engine: run every rule whose condition matches this result.
        for rule in self.rules:
            if rule.match(result):
                rule.execute(result)
        return result
数据存储与分析
时序数据库
# timescaledb_storage.py
import psycopg2
from datetime import datetime
class TimeSeriesDB:
    """Thin wrapper around a TimescaleDB ``device_data`` hypertable.

    Every public operation runs in its own transaction: it is committed on
    success and rolled back if the statement raises, so a failed call does
    not leave the connection stuck in an aborted transaction.
    """

    def __init__(self, host, database, user, password):
        """Open the connection and make sure the table exists."""
        self.conn = psycopg2.connect(
            host=host,
            database=database,
            user=user,
            password=password,
        )
        self._init_tables()

    def _init_tables(self):
        """Create the table, hypertable and index if they are missing."""
        # `with self.conn` commits on success and rolls back on error; it
        # does not close the connection.
        with self.conn, self.conn.cursor() as cur:
            cur.execute('''
                CREATE TABLE IF NOT EXISTS device_data (
                    time TIMESTAMPTZ NOT NULL,
                    device_id TEXT NOT NULL,
                    metric_name TEXT NOT NULL,
                    value DOUBLE PRECISION
                );
                SELECT create_hypertable('device_data', 'time', if_not_exists => TRUE);
                CREATE INDEX IF NOT EXISTS idx_device_id ON device_data (device_id, time DESC);
            ''')

    def insert(self, device_id, metrics):
        """Store one row per metric, all stamped with the same instant.

        Args:
            device_id: Value for the ``device_id`` column.
            metrics: Mapping of metric name to numeric value.
        """
        # One timezone-aware timestamp for the whole reading: the rows of a
        # single call share the same instant, and the stored time no longer
        # depends on the database session's timezone setting.
        recorded_at = datetime.now().astimezone()
        rows = [
            (recorded_at, device_id, name, value)
            for name, value in metrics.items()
        ]
        if not rows:
            return
        with self.conn, self.conn.cursor() as cur:
            cur.executemany('''
                INSERT INTO device_data (time, device_id, metric_name, value)
                VALUES (%s, %s, %s, %s)
            ''', rows)

    def query(self, device_id, start_time, end_time):
        """Return ``(time, metric_name, value)`` rows, newest first.

        Args:
            device_id: Device whose rows are selected.
            start_time: Inclusive lower bound of the ``time`` column.
            end_time: Inclusive upper bound of the ``time`` column.
        """
        # Running the read inside `with self.conn` ends the implicit
        # transaction instead of leaving the session idle in transaction.
        with self.conn, self.conn.cursor() as cur:
            cur.execute('''
                SELECT time, metric_name, value
                FROM device_data
                WHERE device_id = %s
                AND time BETWEEN %s AND %s
                ORDER BY time DESC
            ''', (device_id, start_time, end_time))
            return cur.fetchall()

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()
预测模型
# prediction_model.py
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import joblib
class PredictiveMaintenanceModel:
    """Random-forest failure classifier over vibration/process features."""

    def __init__(self):
        """Create an untrained classifier and feature scaler."""
        self.model = RandomForestClassifier(n_estimators=100)
        self.scaler = StandardScaler()
        self.is_trained = False

    def extract_features(self, data):
        """Turn analysis records into a 2-D feature array.

        Args:
            data: Iterable of dicts with the keys ``'rms'``, ``'peak'``,
                ``'crest_factor'``, ``'dominant_freq'``, ``'temperature'``
                and ``'speed'``.

        Returns:
            ``numpy`` array with one row per record and the six features in
            the order listed above.

        Raises:
            KeyError: If a record lacks one of the feature keys.
        """
        return np.array([
            [
                record['rms'],
                record['peak'],
                record['crest_factor'],
                record['dominant_freq'],
                record['temperature'],
                record['speed'],
            ]
            for record in data
        ])

    def train(self, X, y):
        """Fit the scaler and the classifier on features *X* and labels *y*."""
        X_scaled = self.scaler.fit_transform(X)
        self.model.fit(X_scaled, y)
        self.is_trained = True

    def predict(self, data):
        """Predict failure for a single record.

        Args:
            data: One record in the format accepted by ``extract_features``.

        Returns:
            ``None`` while the model is untrained, otherwise a dict with
            ``'failure_predicted'`` (bool), ``'probability'`` (probability of
            the failure class) and ``'confidence'`` (largest class
            probability).
        """
        if not self.is_trained:
            return None
        X = self.extract_features([data])
        X_scaled = self.scaler.transform(X)
        prediction = self.model.predict(X_scaled)[0]
        probability = self.model.predict_proba(X_scaled)[0]
        return {
            'failure_predicted': bool(prediction),
            'probability': float(self._failure_probability(probability)),
            'confidence': float(max(probability)),
        }

    def _failure_probability(self, probability):
        """Pick the failure-class entry out of one ``predict_proba`` row.

        The columns of ``predict_proba`` follow ``model.classes_``, so the
        failure class (label 1 / True) is looked up there rather than assumed
        to sit at index 1. A model fitted on a single class has only one
        column; in that case, when that class is not the failure label, the
        failure probability is 0.0.
        """
        classes = list(getattr(self.model, 'classes_', []))
        if 1 in classes:
            return probability[classes.index(1)]
        if len(probability) > 1:
            # Labels are not 0/1: fall back to the second column.
            return probability[1]
        return 0.0

    def save(self, path):
        """Persist model, scaler and training flag to *path* with joblib."""
        joblib.dump({
            'model': self.model,
            'scaler': self.scaler,
            'is_trained': self.is_trained,
        }, path)

    def load(self, path):
        """Restore model, scaler and training flag written by ``save``.

        SECURITY: joblib files are pickle-based, so loading one can execute
        arbitrary code. Only load files from a trusted source.
        """
        data = joblib.load(path)
        self.model = data['model']
        self.scaler = data['scaler']
        self.is_trained = data['is_trained']
监控大屏
数据可视化
<!DOCTYPE html>
<html>
<head>
    <!-- Declare the encoding so the non-ASCII page text renders correctly
         regardless of server headers or browser defaults. -->
    <meta charset="utf-8">
    <title>工业设备监控大屏</title>
    <!-- Vue 3 and ECharts 5 are loaded as globals (Vue, echarts) from the CDN. -->
    <script src="https://cdn.jsdelivr.net/npm/vue@3"></script>
    <script src="https://cdn.jsdelivr.net/npm/echarts@5"></script>
</head>
<body>
<div id="app">
<div class="dashboard">
<div class="header">
<h1>工业设备预测性维护系统</h1>
<span class="time">{{ currentTime }}</span>
</div>
<div class="main-content">
<div class="left-panel">
<div class="device-list">
<h3>设备状态</h3>
<div v-for="device in devices" :key="device.id"
:class="['device-item', device.status]">
<span class="name">{{ device.name }}</span>
<span class="status">{{ device.status }}</span>
</div>
</div>
</div>
<div class="center-panel">
<div id="vibration-chart" style="height: 300px;"></div>
<div id="temperature-chart" style="height: 300px;"></div>
</div>
<div class="right-panel">
<div class="alerts">
<h3>告警信息</h3>
<div v-for="alert in alerts" :key="alert.id"
:class="['alert-item', alert.level]">
<span class="time">{{ alert.time }}</span>
<span class="message">{{ alert.message }}</span>
</div>
</div>
<div class="predictions">
<h3>故障预测</h3>
<div v-for="pred in predictions" :key="pred.device_id">
<span>{{ pred.device_id }}</span>
<span>故障概率: {{ (pred.probability * 100).toFixed(1) }}%</span>
</div>
</div>
</div>
</div>
</div>
</div>
<script>
// Root component of the dashboard: owns the clock, the two trend charts and
// the WebSocket that delivers live data.
const app = Vue.createApp({
    data() {
        return {
            currentTime: '',
            devices: [],
            alerts: [],
            predictions: [],
            vibrationChart: null,
            temperatureChart: null,
            ws: null,
            clockTimer: null
        }
    },
    mounted() {
        this.initCharts();
        this.connectWebSocket();
        this.updateTime();
        // Keep the handle so the timer can be stopped on unmount.
        this.clockTimer = setInterval(() => this.updateTime(), 1000);
    },
    beforeUnmount() {
        // Release everything acquired in mounted().
        clearInterval(this.clockTimer);
        if (this.ws) {
            this.ws.close();
        }
        if (this.vibrationChart) {
            this.vibrationChart.dispose();
        }
        if (this.temperatureChart) {
            this.temperatureChart.dispose();
        }
    },
    methods: {
        // Create both charts with an empty line series.
        initCharts() {
            // markRaw keeps Vue from wrapping the chart instances in reactive
            // proxies; ECharts manages its own internal state and is meant to
            // be used unproxied.
            this.vibrationChart = Vue.markRaw(echarts.init(
                document.getElementById('vibration-chart')
            ));
            this.temperatureChart = Vue.markRaw(echarts.init(
                document.getElementById('temperature-chart')
            ));
            this.vibrationChart.setOption({
                title: {text: '振动趋势'},
                xAxis: {type: 'category'},
                yAxis: {type: 'value'},
                series: [{type: 'line', data: []}]
            });
            this.temperatureChart.setOption({
                title: {text: '温度趋势'},
                xAxis: {type: 'category'},
                yAxis: {type: 'value'},
                series: [{type: 'line', data: []}]
            });
        },
        // Open the live-data socket and forward every JSON message.
        connectWebSocket() {
            this.ws = new WebSocket('ws://localhost:8000/ws');
            this.ws.onmessage = (event) => {
                let data;
                try {
                    data = JSON.parse(event.data);
                } catch (err) {
                    // One malformed frame must not break the message handler.
                    console.error('Discarding malformed WebSocket message', err);
                    return;
                }
                this.handleData(data);
            };
            this.ws.onerror = (event) => {
                console.error('WebSocket error', event);
            };
        },
        // Placeholder: the message schema is not defined in this file.
        handleData(data) {
            // Update device status
            // Update charts
            // Check alerts
        },
        // Refresh the clock shown in the header.
        updateTime() {
            this.currentTime = new Date().toLocaleString();
        }
    }
});
app.mount('#app');
</script>
</body>
</html>
部署架构
边缘部署
# docker-compose.edge.yml
# Compose file for the edge gateway: one service built from ./edge.
version: '3'

services:
  edge-gateway:
    build: ./edge
    ports:
      - "8080:8080"
    # Host directories mounted into the container for configuration and data.
    volumes:
      - ./config:/config
      - ./data:/data
    # Same variables as before, written in mapping form.
    environment:
      CLOUD_ENDPOINT: cloud.example.com
      DEVICE_ID: edge-gateway-001
云端部署
# docker-compose.cloud.yml
# Compose file for the cloud side: storage, messaging, API, prediction and UI.
version: '3'

services:
  timescaledb:
    image: timescale/timescaledb:latest-pg15
    environment:
      # The Postgres-based image refuses to initialise a new database without
      # a superuser password. It is read from the environment (or an .env
      # file) so no secret is committed; startup fails with this message when
      # it is missing.
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:?set POSTGRES_PASSWORD before starting}
    volumes:
      - ts-data:/var/lib/postgresql/data

  kafka:
    # NOTE(review): cp-kafka is configured through KAFKA_* environment
    # variables (listeners, node/broker id, controller or ZooKeeper settings);
    # none are set here - confirm the intended broker configuration.
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"

  api-server:
    build: ./api
    ports:
      - "8000:8000"
    depends_on:
      - timescaledb
      - kafka

  prediction-service:
    build: ./prediction
    depends_on:
      - kafka

  dashboard:
    build: ./dashboard
    ports:
      - "80:80"
    depends_on:
      - api-server

# Named volume that keeps the TimescaleDB data across container restarts.
volumes:
  ts-data:
小结
工业物联网项目相比智能家居有更高的要求,包括工业协议支持、边缘计算能力、实时数据处理和预测分析。本项目演示了完整的工业物联网系统开发流程,包括数据采集、边缘计算、数据存储、预测模型和监控大屏。
下一步,我们将学习智慧农业项目,了解农业物联网的特殊需求和解决方案。