跳到主要内容

智慧农业项目

本文将通过一个精准灌溉系统项目,演示农业物联网系统的完整开发流程,涵盖传感器网络、LoRa 通信、智能控制等关键技术。

项目概述

项目目标

搭建一个智慧农业精准灌溉系统,实现以下功能:

  • 土壤温湿度监测
  • 气象数据采集
  • 自动灌溉控制
  • 作物生长分析
  • 远程监控与管理

系统架构

┌─────────────────────────────────────────────────────────┐
│ 应用层 │
│ 农场管理 App │ 数据分析 │ 灌溉决策 │ 报表系统 │
├─────────────────────────────────────────────────────────┤
│ 云平台 │
│ 数据存储 │ 气象服务 │ 农业模型 │ AI 决策 │
├─────────────────────────────────────────────────────────┤
│ 网关层 │
│ LoRa 网关 │ 数据汇聚 │ 边缘计算 │
├─────────────────────────────────────────────────────────┤
│ 感知层 │
│ 土壤传感器 │ 气象站 │ 水泵控制器 │ 阀门控制器 │
└─────────────────────────────────────────────────────────┘

传感器网络

土壤传感器节点

硬件组成:

  • MCU:ESP32 / STM32L4
  • 传感器:电容式土壤湿度传感器、DS18B20 温度传感器
  • 通信:LoRa 模块(SX1276)
  • 电源:太阳能 + 锂电池

节点代码:

#include <Arduino.h>
#include <LoRa.h>
#include <OneWire.h>
#include <DallasTemperature.h>

#define SOIL_PIN 34
#define TEMP_PIN 4
#define LORA_SS 5
#define LORA_RST 14
#define LORA_DIO0 26

OneWire oneWire(TEMP_PIN);
DallasTemperature tempSensor(&oneWire);

void setup() {
Serial.begin(115200);

LoRa.setPins(LORA_SS, LORA_RST, LORA_DIO0);
if (!LoRa.begin(433E6)) {
Serial.println("LoRa init failed");
while (1);
}

tempSensor.begin();
}

void loop() {
// 读取土壤湿度
int soilRaw = analogRead(SOIL_PIN);
float soilMoisture = map(soilRaw, 0, 4095, 0, 100);

// 读取土壤温度
tempSensor.requestTemperatures();
float soilTemp = tempSensor.getTempCByIndex(0);

// 发送数据
sendSensorData(soilMoisture, soilTemp);

// 低功耗睡眠
esp_sleep_enable_timer_wakeup(300 * 1000000); // 5分钟
esp_deep_sleep_start();
}

void sendSensorData(float moisture, float temp) {
String data = String(moisture) + "," + String(temp);

LoRa.beginPacket();
LoRa.print(data);
LoRa.endPacket();

Serial.println("Sent: " + data);
}

气象站节点

硬件组成:

  • 风速传感器
  • 风向传感器
  • 雨量传感器
  • BME280(温度、湿度、气压)
  • 光照传感器

气象数据处理:

# weather_station.py
import json
from datetime import datetime

class WeatherStation:
def __init__(self):
self.sensors = {
'wind_speed': 0,
'wind_direction': 0,
'rainfall': 0,
'temperature': 0,
'humidity': 0,
'pressure': 0,
'light': 0
}

def read_all(self):
return {
'timestamp': datetime.now().isoformat(),
'wind_speed': self.read_wind_speed(),
'wind_direction': self.read_wind_direction(),
'rainfall': self.read_rainfall(),
'temperature': self.read_temperature(),
'humidity': self.read_humidity(),
'pressure': self.read_pressure(),
'light': self.read_light()
}

def calculate_et0(self, data):
"""计算参考作物蒸散量(ET0)"""
# 使用 Penman-Monteith 公式
# 简化版本
temp = data['temperature']
humidity = data['humidity']
wind_speed = data['wind_speed']
radiation = data['light'] * 0.1 # 简化辐射计算

# ET0 计算公式(简化)
et0 = 0.0023 * (temp + 17.8) * (temp_max - temp_min) ** 0.5 * radiation / (temp + 273.15)

return et0

LoRa 通信

网关配置

# lora_gateway.py
import serial
import json
from datetime import datetime

class LoRaGateway:
def __init__(self, port='/dev/ttyUSB0', baudrate=9600):
self.serial = serial.Serial(port, baudrate)
self.nodes = {}

def listen(self):
while True:
if self.serial.in_waiting:
line = self.serial.readline().decode().strip()
self.process_message(line)

def process_message(self, message):
try:
# 解析 LoRa 数据
parts = message.split(',')
if len(parts) >= 3:
node_id = parts[0]
moisture = float(parts[1])
temperature = float(parts[2])

data = {
'node_id': node_id,
'timestamp': datetime.now().isoformat(),
'soil_moisture': moisture,
'soil_temperature': temperature
}

self.nodes[node_id] = data
self.send_to_cloud(data)

except Exception as e:
print(f"Error processing message: {e}")

def send_to_cloud(self, data):
# 发送到云平台
pass

LoRaWAN 接入

# lorawan_client.py
import struct
from lora import LoRa

class LoRaWANClient:
def __init__(self, dev_eui, app_eui, app_key):
self.dev_eui = dev_eui
self.app_eui = app_eui
self.app_key = app_key
self.lora = LoRa()

def join(self):
"""OTAA 入网"""
self.lora.join(mode='OTAA',
dev_eui=self.dev_eui,
app_eui=self.app_eui,
app_key=self.app_key)

def send(self, data, port=1, confirmed=False):
"""发送数据"""
payload = self.encode_payload(data)
self.lora.send(payload, port=port, confirmed=confirmed)

def encode_payload(self, data):
"""编码数据为二进制"""
# 使用 Cayenne LPP 格式
payload = b''

# 土壤湿度
payload += struct.pack('>BBH', 1, 1, int(data['moisture'] * 100))

# 温度
payload += struct.pack('>BBh', 2, 2, int(data['temperature'] * 10))

return payload

灌溉控制

智能灌溉算法

# irrigation_control.py
from datetime import datetime, timedelta

class IrrigationController:
def __init__(self):
self.zones = {}
self.schedules = []
self.rules = []

def add_zone(self, zone_id, valve_pin, area):
self.zones[zone_id] = {
'valve_pin': valve_pin,
'area': area,
'status': 'off',
'last_irrigation': None
}

def evaluate(self, zone_id, sensor_data, weather_data):
"""评估灌溉需求"""
zone = self.zones[zone_id]

# 获取土壤湿度
soil_moisture = sensor_data.get('soil_moisture', 50)

# 获取天气预报
rain_forecast = weather_data.get('rain_forecast', 0)

# 计算灌溉需求
need_irrigation = False
irrigation_duration = 0

# 规则1:土壤湿度低于阈值
if soil_moisture < 30:
need_irrigation = True
irrigation_duration = 30 # 分钟

# 规则2:未来有雨,减少灌溉
if rain_forecast > 5: # 预报降雨量 > 5mm
irrigation_duration *= 0.5

# 规则3:高温天气增加灌溉
if sensor_data.get('temperature', 25) > 30:
irrigation_duration *= 1.2

# 规则4:避免中午灌溉
current_hour = datetime.now().hour
if 11 <= current_hour <= 14:
need_irrigation = False

return {
'need_irrigation': need_irrigation,
'duration': irrigation_duration,
'reason': self._get_reason(soil_moisture, rain_forecast)
}

def execute_irrigation(self, zone_id, duration):
"""执行灌溉"""
zone = self.zones[zone_id]

# 开启阀门
self._open_valve(zone['valve_pin'])
zone['status'] = 'on'

# 定时关闭
# 实际实现中应使用定时器
time.sleep(duration * 60)

# 关闭阀门
self._close_valve(zone['valve_pin'])
zone['status'] = 'off'
zone['last_irrigation'] = datetime.now()

def _get_reason(self, moisture, rain):
if moisture < 30:
return "土壤湿度不足"
if rain > 5:
return "预计降雨,减少灌溉"
return "正常灌溉"

阀门控制器

// valve_controller.cpp
#include <Arduino.h>

#define VALVE_PIN_1 25
#define VALVE_PIN_2 26
#define VALVE_PIN_3 27
#define VALVE_PIN_4 14

class ValveController {
private:
int pins[4] = {VALVE_PIN_1, VALVE_PIN_2, VALVE_PIN_3, VALVE_PIN_4};
bool states[4] = {false, false, false, false};

public:
void init() {
for (int i = 0; i < 4; i++) {
pinMode(pins[i], OUTPUT);
digitalWrite(pins[i], LOW);
}
}

void open(int zone) {
if (zone >= 0 && zone < 4) {
digitalWrite(pins[zone], HIGH);
states[zone] = true;
}
}

void close(int zone) {
if (zone >= 0 && zone < 4) {
digitalWrite(pins[zone], LOW);
states[zone] = false;
}
}

void closeAll() {
for (int i = 0; i < 4; i++) {
digitalWrite(pins[i], LOW);
states[i] = false;
}
}

bool getState(int zone) {
if (zone >= 0 && zone < 4) {
return states[zone];
}
return false;
}
};

数据分析

作物生长模型

# crop_model.py
import numpy as np
from datetime import datetime

class CropGrowthModel:
def __init__(self, crop_type):
self.crop_type = crop_type
self.growth_stages = self._get_growth_stages()

def _get_growth_stages(self):
stages = {
'wheat': [
{'name': '播种期', 'days': (0, 10), 'water_need': 0.5},
{'name': '出苗期', 'days': (10, 30), 'water_need': 0.7},
{'name': '分蘖期', 'days': (30, 60), 'water_need': 1.0},
{'name': '拔节期', 'days': (60, 90), 'water_need': 1.2},
{'name': '抽穗期', 'days': (90, 110), 'water_need': 1.3},
{'name': '灌浆期', 'days': (110, 140), 'water_need': 1.0},
{'name': '成熟期', 'days': (140, 160), 'water_need': 0.5}
]
}
return stages.get(self.crop_type, [])

def get_current_stage(self, planting_date):
days = (datetime.now() - planting_date).days

for stage in self.growth_stages:
if stage['days'][0] <= days < stage['days'][1]:
return stage

return {'name': '未知', 'water_need': 1.0}

def calculate_water_need(self, planting_date, et0):
stage = self.get_current_stage(planting_date)
crop_coefficient = stage['water_need']

# 作物需水量 = ET0 × 作物系数
water_need = et0 * crop_coefficient

return {
'stage': stage['name'],
'water_need_mm': water_need,
'crop_coefficient': crop_coefficient
}

产量预测

# yield_prediction.py
import numpy as np
from sklearn.linear_model import LinearRegression

class YieldPredictor:
def __init__(self):
self.model = LinearRegression()
self.is_trained = False

def extract_features(self, data):
features = []
for record in data:
features.append([
record['total_irrigation'],
record['avg_temperature'],
record['total_rainfall'],
record['avg_soil_moisture'],
record['growing_days']
])
return np.array(features)

def train(self, X, y):
self.model.fit(X, y)
self.is_trained = True

def predict(self, current_data):
if not self.is_trained:
return None

X = self.extract_features([current_data])
prediction = self.model.predict(X)[0]

return {
'predicted_yield': prediction,
'unit': 'kg/ha'
}

农场管理平台

Web 界面

<!DOCTYPE html>
<html>
<head>
<title>智慧农业管理平台</title>
<script src="https://cdn.jsdelivr.net/npm/vue@3"></script>
<script src="https://cdn.jsdelivr.net/npm/echarts@5"></script>
</head>
<body>
<div id="app">
<div class="dashboard">
<div class="header">
<h1>智慧农业管理平台</h1>
</div>

<div class="map-view">
<div class="zone" v-for="zone in zones" :key="zone.id"
:style="{left: zone.x + '%', top: zone.y + '%'}">
<div class="zone-name">{{ zone.name }}</div>
<div class="zone-moisture">
湿度: {{ zone.moisture }}%
</div>
<div class="zone-status" :class="zone.irrigating ? 'on' : 'off'">
{{ zone.irrigating ? '灌溉中' : '待机' }}
</div>
</div>
</div>

<div class="control-panel">
<div class="irrigation-control">
<h3>灌溉控制</h3>
<div v-for="zone in zones" :key="zone.id">
<span>{{ zone.name }}</span>
<button @click="toggleIrrigation(zone.id)">
{{ zone.irrigating ? '停止' : '开始' }}
</button>
</div>
</div>

<div class="weather-info">
<h3>气象信息</h3>
<div>温度: {{ weather.temperature }}°C</div>
<div>湿度: {{ weather.humidity }}%</div>
<div>风速: {{ weather.windSpeed }} m/s</div>
<div>降雨预报: {{ weather.rainForecast }} mm</div>
</div>
</div>

<div class="charts">
<div id="moisture-chart" style="height: 300px;"></div>
<div id="irrigation-chart" style="height: 300px;"></div>
</div>
</div>
</div>

<script>
const app = Vue.createApp({
data() {
return {
zones: [],
weather: {},
moistureChart: null,
irrigationChart: null
}
},
mounted() {
this.initCharts();
this.loadData();
},
methods: {
initCharts() {
this.moistureChart = echarts.init(
document.getElementById('moisture-chart')
);
this.irrigationChart = echarts.init(
document.getElementById('irrigation-chart')
);
},
async toggleIrrigation(zoneId) {
await fetch(`/api/irrigation/${zoneId}/toggle`, {
method: 'POST'
});
},
async loadData() {
const response = await fetch('/api/data');
const data = await response.json();
this.zones = data.zones;
this.weather = data.weather;
}
}
});
app.mount('#app');
</script>
</body>
</html>

部署方案

现场部署

# docker-compose.yml
version: '3'
services:
lora-gateway:
build: ./gateway
devices:
- /dev/ttyUSB0:/dev/ttyUSB0

edge-server:
build: ./edge
ports:
- "8080:8080"
volumes:
- ./data:/data

controller:
build: ./controller
privileged: true

小结

智慧农业项目展示了物联网技术在农业领域的应用,包括传感器网络、LoRa 通信、智能灌溉控制和数据分析。通过本项目,你可以掌握农业物联网系统的完整开发流程。

下一步,我们将查看速查表,快速回顾物联网开发的关键知识点。