智慧农业项目
本文将通过一个精准灌溉系统项目,演示农业物联网系统的完整开发流程,涵盖传感器网络、LoRa 通信、智能控制等关键技术。
项目概述
项目目标
搭建一个智慧农业精准灌溉系统,实现以下功能:
- 土壤温湿度监测
- 气象数据采集
- 自动灌溉控制
- 作物生长分析
- 远程监控与管理
系统架构
┌─────────────────────────────────────────────────────────┐
│ 应用层 │
│ 农场管理 App │ 数据分析 │ 灌溉决策 │ 报表系统 │
├─────────────────────────────────────────────────────────┤
│ 云平台 │
│ 数据存储 │ 气象服务 │ 农业模型 │ AI 决策 │
├─────────────────────────────────────────────────────────┤
│ 网关层 │
│ LoRa 网关 │ 数据汇聚 │ 边缘计算 │
├─────────────────────────────────────────────────────────┤
│ 感知层 │
│ 土壤传感器 │ 气象站 │ 水泵控制器 │ 阀门控制器 │
└─────────────────────────────────────────────────────────┘
传感器网络
土壤传感器节点
硬件组成:
- MCU:ESP32 / STM32L4
- 传感器:电容式土壤湿度传感器、DS18B20 温度传感器
- 通信:LoRa 模块(SX1276)
- 电源:太阳能 + 锂电池
节点代码:
#include <Arduino.h>
#include <LoRa.h>
#include <OneWire.h>
#include <DallasTemperature.h>
#define SOIL_PIN 34
#define TEMP_PIN 4
#define LORA_SS 5
#define LORA_RST 14
#define LORA_DIO0 26
OneWire oneWire(TEMP_PIN);
DallasTemperature tempSensor(&oneWire);
void setup() {
Serial.begin(115200);
LoRa.setPins(LORA_SS, LORA_RST, LORA_DIO0);
if (!LoRa.begin(433E6)) {
Serial.println("LoRa init failed");
while (1);
}
tempSensor.begin();
}
void loop() {
// 读取土壤湿度
int soilRaw = analogRead(SOIL_PIN);
float soilMoisture = map(soilRaw, 0, 4095, 0, 100);
// 读取土壤温度
tempSensor.requestTemperatures();
float soilTemp = tempSensor.getTempCByIndex(0);
// 发送数据
sendSensorData(soilMoisture, soilTemp);
// 低功耗睡眠
esp_sleep_enable_timer_wakeup(300 * 1000000); // 5分钟
esp_deep_sleep_start();
}
void sendSensorData(float moisture, float temp) {
String data = String(moisture) + "," + String(temp);
LoRa.beginPacket();
LoRa.print(data);
LoRa.endPacket();
Serial.println("Sent: " + data);
}
气象站节点
硬件组成:
- 风速传感器
- 风向传感器
- 雨量传感器
- BME280(温度、湿度、气压)
- 光照传感器
气象数据处理:
# weather_station.py
import json
from datetime import datetime
class WeatherStation:
def __init__(self):
self.sensors = {
'wind_speed': 0,
'wind_direction': 0,
'rainfall': 0,
'temperature': 0,
'humidity': 0,
'pressure': 0,
'light': 0
}
def read_all(self):
return {
'timestamp': datetime.now().isoformat(),
'wind_speed': self.read_wind_speed(),
'wind_direction': self.read_wind_direction(),
'rainfall': self.read_rainfall(),
'temperature': self.read_temperature(),
'humidity': self.read_humidity(),
'pressure': self.read_pressure(),
'light': self.read_light()
}
def calculate_et0(self, data):
"""计算参考作物蒸散量(ET0)"""
# 使用 Penman-Monteith 公式
# 简化版本
temp = data['temperature']
humidity = data['humidity']
wind_speed = data['wind_speed']
radiation = data['light'] * 0.1 # 简化辐射计算
# ET0 计算公式(简化)
et0 = 0.0023 * (temp + 17.8) * (temp_max - temp_min) ** 0.5 * radiation / (temp + 273.15)
return et0
LoRa 通信
网关配置
# lora_gateway.py
import serial
import json
from datetime import datetime
class LoRaGateway:
def __init__(self, port='/dev/ttyUSB0', baudrate=9600):
self.serial = serial.Serial(port, baudrate)
self.nodes = {}
def listen(self):
while True:
if self.serial.in_waiting:
line = self.serial.readline().decode().strip()
self.process_message(line)
def process_message(self, message):
try:
# 解析 LoRa 数据
parts = message.split(',')
if len(parts) >= 3:
node_id = parts[0]
moisture = float(parts[1])
temperature = float(parts[2])
data = {
'node_id': node_id,
'timestamp': datetime.now().isoformat(),
'soil_moisture': moisture,
'soil_temperature': temperature
}
self.nodes[node_id] = data
self.send_to_cloud(data)
except Exception as e:
print(f"Error processing message: {e}")
def send_to_cloud(self, data):
# 发送到云平台
pass
LoRaWAN 接入
# lorawan_client.py
import struct
from lora import LoRa
class LoRaWANClient:
def __init__(self, dev_eui, app_eui, app_key):
self.dev_eui = dev_eui
self.app_eui = app_eui
self.app_key = app_key
self.lora = LoRa()
def join(self):
"""OTAA 入网"""
self.lora.join(mode='OTAA',
dev_eui=self.dev_eui,
app_eui=self.app_eui,
app_key=self.app_key)
def send(self, data, port=1, confirmed=False):
"""发送数据"""
payload = self.encode_payload(data)
self.lora.send(payload, port=port, confirmed=confirmed)
def encode_payload(self, data):
"""编码数据为二进制"""
# 使用 Cayenne LPP 格式
payload = b''
# 土壤湿度
payload += struct.pack('>BBH', 1, 1, int(data['moisture'] * 100))
# 温度
payload += struct.pack('>BBh', 2, 2, int(data['temperature'] * 10))
return payload
灌溉控制
智能灌溉算法
# irrigation_control.py
from datetime import datetime, timedelta
class IrrigationController:
def __init__(self):
self.zones = {}
self.schedules = []
self.rules = []
def add_zone(self, zone_id, valve_pin, area):
self.zones[zone_id] = {
'valve_pin': valve_pin,
'area': area,
'status': 'off',
'last_irrigation': None
}
def evaluate(self, zone_id, sensor_data, weather_data):
"""评估灌溉需求"""
zone = self.zones[zone_id]
# 获取土壤湿度
soil_moisture = sensor_data.get('soil_moisture', 50)
# 获取天气预报
rain_forecast = weather_data.get('rain_forecast', 0)
# 计算灌溉需求
need_irrigation = False
irrigation_duration = 0
# 规则1:土壤湿度低于阈值
if soil_moisture < 30:
need_irrigation = True
irrigation_duration = 30 # 分钟
# 规则2:未来有雨,减少灌溉
if rain_forecast > 5: # 预报降雨量 > 5mm
irrigation_duration *= 0.5
# 规则3:高温天气增加灌溉
if sensor_data.get('temperature', 25) > 30:
irrigation_duration *= 1.2
# 规则4:避免中午灌溉
current_hour = datetime.now().hour
if 11 <= current_hour <= 14:
need_irrigation = False
return {
'need_irrigation': need_irrigation,
'duration': irrigation_duration,
'reason': self._get_reason(soil_moisture, rain_forecast)
}
def execute_irrigation(self, zone_id, duration):
"""执行灌溉"""
zone = self.zones[zone_id]
# 开启阀门
self._open_valve(zone['valve_pin'])
zone['status'] = 'on'
# 定时关闭
# 实际实现中应使用定时器
time.sleep(duration * 60)
# 关闭阀门
self._close_valve(zone['valve_pin'])
zone['status'] = 'off'
zone['last_irrigation'] = datetime.now()
def _get_reason(self, moisture, rain):
if moisture < 30:
return "土壤湿度不足"
if rain > 5:
return "预计降雨,减少灌溉"
return "正常灌溉"
阀门控制器
// valve_controller.cpp
#include <Arduino.h>
#define VALVE_PIN_1 25
#define VALVE_PIN_2 26
#define VALVE_PIN_3 27
#define VALVE_PIN_4 14
class ValveController {
private:
int pins[4] = {VALVE_PIN_1, VALVE_PIN_2, VALVE_PIN_3, VALVE_PIN_4};
bool states[4] = {false, false, false, false};
public:
void init() {
for (int i = 0; i < 4; i++) {
pinMode(pins[i], OUTPUT);
digitalWrite(pins[i], LOW);
}
}
void open(int zone) {
if (zone >= 0 && zone < 4) {
digitalWrite(pins[zone], HIGH);
states[zone] = true;
}
}
void close(int zone) {
if (zone >= 0 && zone < 4) {
digitalWrite(pins[zone], LOW);
states[zone] = false;
}
}
void closeAll() {
for (int i = 0; i < 4; i++) {
digitalWrite(pins[i], LOW);
states[i] = false;
}
}
bool getState(int zone) {
if (zone >= 0 && zone < 4) {
return states[zone];
}
return false;
}
};
数据分析
作物生长模型
# crop_model.py
import numpy as np
from datetime import datetime
class CropGrowthModel:
def __init__(self, crop_type):
self.crop_type = crop_type
self.growth_stages = self._get_growth_stages()
def _get_growth_stages(self):
stages = {
'wheat': [
{'name': '播种期', 'days': (0, 10), 'water_need': 0.5},
{'name': '出苗期', 'days': (10, 30), 'water_need': 0.7},
{'name': '分蘖期', 'days': (30, 60), 'water_need': 1.0},
{'name': '拔节期', 'days': (60, 90), 'water_need': 1.2},
{'name': '抽穗期', 'days': (90, 110), 'water_need': 1.3},
{'name': '灌浆期', 'days': (110, 140), 'water_need': 1.0},
{'name': '成熟期', 'days': (140, 160), 'water_need': 0.5}
]
}
return stages.get(self.crop_type, [])
def get_current_stage(self, planting_date):
days = (datetime.now() - planting_date).days
for stage in self.growth_stages:
if stage['days'][0] <= days < stage['days'][1]:
return stage
return {'name': '未知', 'water_need': 1.0}
def calculate_water_need(self, planting_date, et0):
stage = self.get_current_stage(planting_date)
crop_coefficient = stage['water_need']
# 作物需水量 = ET0 × 作物系数
water_need = et0 * crop_coefficient
return {
'stage': stage['name'],
'water_need_mm': water_need,
'crop_coefficient': crop_coefficient
}
产量预测
# yield_prediction.py
import numpy as np
from sklearn.linear_model import LinearRegression
class YieldPredictor:
def __init__(self):
self.model = LinearRegression()
self.is_trained = False
def extract_features(self, data):
features = []
for record in data:
features.append([
record['total_irrigation'],
record['avg_temperature'],
record['total_rainfall'],
record['avg_soil_moisture'],
record['growing_days']
])
return np.array(features)
def train(self, X, y):
self.model.fit(X, y)
self.is_trained = True
def predict(self, current_data):
if not self.is_trained:
return None
X = self.extract_features([current_data])
prediction = self.model.predict(X)[0]
return {
'predicted_yield': prediction,
'unit': 'kg/ha'
}
农场管理平台
Web 界面
<!DOCTYPE html>
<html>
<head>
<title>智慧农业管理平台</title>
<script src="https://cdn.jsdelivr.net/npm/vue@3"></script>
<script src="https://cdn.jsdelivr.net/npm/echarts@5"></script>
</head>
<body>
<div id="app">
<div class="dashboard">
<div class="header">
<h1>智慧农业管理平台</h1>
</div>
<div class="map-view">
<div class="zone" v-for="zone in zones" :key="zone.id"
:style="{left: zone.x + '%', top: zone.y + '%'}">
<div class="zone-name">{{ zone.name }}</div>
<div class="zone-moisture">
湿度: {{ zone.moisture }}%
</div>
<div class="zone-status" :class="zone.irrigating ? 'on' : 'off'">
{{ zone.irrigating ? '灌溉中' : '待机' }}
</div>
</div>
</div>
<div class="control-panel">
<div class="irrigation-control">
<h3>灌溉控制</h3>
<div v-for="zone in zones" :key="zone.id">
<span>{{ zone.name }}</span>
<button @click="toggleIrrigation(zone.id)">
{{ zone.irrigating ? '停止' : '开始' }}
</button>
</div>
</div>
<div class="weather-info">
<h3>气象信息</h3>
<div>温度: {{ weather.temperature }}°C</div>
<div>湿度: {{ weather.humidity }}%</div>
<div>风速: {{ weather.windSpeed }} m/s</div>
<div>降雨预报: {{ weather.rainForecast }} mm</div>
</div>
</div>
<div class="charts">
<div id="moisture-chart" style="height: 300px;"></div>
<div id="irrigation-chart" style="height: 300px;"></div>
</div>
</div>
</div>
<script>
const app = Vue.createApp({
data() {
return {
zones: [],
weather: {},
moistureChart: null,
irrigationChart: null
}
},
mounted() {
this.initCharts();
this.loadData();
},
methods: {
initCharts() {
this.moistureChart = echarts.init(
document.getElementById('moisture-chart')
);
this.irrigationChart = echarts.init(
document.getElementById('irrigation-chart')
);
},
async toggleIrrigation(zoneId) {
await fetch(`/api/irrigation/${zoneId}/toggle`, {
method: 'POST'
});
},
async loadData() {
const response = await fetch('/api/data');
const data = await response.json();
this.zones = data.zones;
this.weather = data.weather;
}
}
});
app.mount('#app');
</script>
</body>
</html>
部署方案
现场部署
# docker-compose.yml
version: '3'
services:
lora-gateway:
build: ./gateway
devices:
- /dev/ttyUSB0:/dev/ttyUSB0
edge-server:
build: ./edge
ports:
- "8080:8080"
volumes:
- ./data:/data
controller:
build: ./controller
privileged: true
小结
智慧农业项目展示了物联网技术在农业领域的应用,包括传感器网络、LoRa 通信、智能灌溉控制和数据分析。通过本项目,你可以掌握农业物联网系统的完整开发流程。
下一步,我们将查看速查表,快速回顾物联网开发的关键知识点。