物联网实战:Spring Boot + Netty 搭建 MQTT
源码(sample-00)
https://gitee.com/kcnf-iot/mqtt-boot/tree/master/sample-00
目录
项目简介
MQTT Boot 是一个基于 Spring Boot 和 Netty 构建的轻量级 IoT 物联网平台框架。它提供了完整的设备接入、消息路由、规则引擎和设备管理能力,支持多种物联网协议(MQTT、CoAP、HTTP/WebSocket、自定义TCP协议)。
核心特性
- 多协议接入: 支持 MQTT、CoAP、HTTP/WebSocket 及自定义 TCP 协议
- 高性能通信: 基于 Netty 构建的高性能网络通信层
- 灵活路由: Trie 树实现的主题匹配与消息分发
- 设备管理: 设备注册、认证授权、设备影子
- 规则引擎: 基于条件的规则触发与动作执行
- 数据存储: 支持 Redis 缓存、TDengine 时序数据库、MySQL 元数据
- 模块化设计: 清晰的六层架构,便于扩展和维护
技术架构
技术栈
| 技术 | 版本 | 用途 |
|---|---|---|
| Java | 21 | 运行时环境 |
| Spring Boot | 3.5.11 | 应用框架 |
| Netty | 最新版 | 网络通信框架 |
| Maven | 3.x | 依赖管理 |
| Redis | - | 缓存与订阅关系存储 |
| Caffeine | - | 本地缓存 |
| JWT | 0.11.5 | 认证令牌 |
| Lombok | 1.18.34 | 代码简化 |
| Fastjson2 | 2.0.62 | JSON 处理 |
模块架构
mqtt-boot (根项目)
└── sample-00 (示例项目)
├── sample-00-iot-common # 公共模块:模型、常量、工具类
├── sample-00-iot-access # 接入层:Netty服务器、协议适配
├── sample-00-iot-routing # 路由层:主题匹配、消息分发
├── sample-00-iot-core # 核心层:设备管理、规则引擎、设备影子
├── sample-00-iot-business # 业务层:REST API、WebSocket、告警、OTA
└── sample-00-iot-bootstrap # 启动模块:Spring Boot主类、配置
数据流向
设备端 → Access层(协议解析) → Routing层(主题匹配) → Core层(规则引擎/持久化) → Business层(API推送)
↓
外部系统(TDengine/MySQL/Redis)
环境准备
必需软件
JDK 21
# 验证Java版本 java -version # 应输出类似内容: # openjdk version "21.0.x" 2026-xx-xx LTSMaven 3.6+
# 验证Maven版本 mvn -versionRedis (可选但推荐)
# 安装Redis (Ubuntu) sudo apt-get install redis-server # 启动Redis redis-server # 或使用Docker docker run -d -p 6379:6379 redis:latestIDE推荐: IntelliJ IDEA 2024+ (支持Java 21)
可选组件
- TDengine: 时序数据存储
- MySQL: 元数据存储
- Docker: 容器化部署
快速开始
1. 克隆项目
git clone <repository-url>
cd mqtt-boot
2. 编译项目
# 在项目根目录执行
mvn clean install
3. 配置应用
编辑 sample-00/sample-00-iot-bootstrap/src/main/resources/application.yml:
# 修改Redis连接(如果使用)
spring:
data:
redis:
host: localhost
port: 6379
# 修改MQTT服务器端口
mqtt:
server:
port: 1883
4. 启动应用
# 方式1: 使用Maven
cd sample-00/sample-00-iot-bootstrap
mvn spring-boot:run
# 方式2: 打包后运行
mvn clean package
java -jar target/sample-00-iot-bootstrap-1.0-SNAPSHOT.jar
# 方式3: IDE中直接运行
# 运行 IotPlatformApplication.main()
5. 验证启动
启动成功后会看到以下输出:
IoT物联网平台启动成功!
检查服务状态:
- REST API: http://localhost:8081/iot
- MQTT服务器: localhost:1883
6. 测试连接
使用 MQTT 客户端工具(如 MQTT.fx、MQTT Explorer)连接:
Host: localhost
Port: 1883
Protocol: MQTT v3.1.1
发布/订阅测试主题:
Topic: test/topic
Payload: {"message": "Hello IoT"}
模块详解
1. common 模块 - 公共基础
职责: 提供跨模块共享的数据结构、工具类和常量定义
包结构:
com.jysemel.iot.common
├── model # 统一消息模型
│ ├── InternalMessage # 内部统一消息对象
│ ├── DeviceIdentity # 设备身份标识
│ └── ProtocolType # 协议类型枚举
├── exception # 通用异常类
│ ├── IotException
│ ├── DeviceOfflineException
│ └── ProtocolNotSupportException
├── constants # 常量定义
│ ├── TopicConstants # 主题常量
│ └── RedisKeys # Redis键名常量
└── util # 工具类
├── JsonUtil # JSON处理工具
└── NettyUtil # Netty辅助工具
关键类说明:
InternalMessage: 统一的内部消息格式,所有协议转换后的消息都使用此格式DeviceIdentity: 设备唯一标识(产品Key + 设备Key)ProtocolType: 支持的协议类型枚举(MQTT, CoAP, HTTP, TCP_CUSTOM)
使用示例:
// 创建设备身份
DeviceIdentity identity = new DeviceIdentity("product001", "device001");
// 创建内部消息
InternalMessage message = InternalMessage.builder()
.deviceId(identity)
.topic("sensors/temperature")
.payload("{\"temp\": 25.5}")
.qos(1)
.timestamp(System.currentTimeMillis())
.build();
2. access 模块 - 接入层
职责: 基于 Netty 启动 TCP/UDP 服务,处理多协议接入
核心功能:
- 多端口监听,支持不同协议绑定
- 首包嗅探自动识别协议
- 协议编解码器插件化
- 连接管理(channelId ↔ deviceId 映射)
- 心跳超时检测
- TLS/SSL 支持
- 流量整形(令牌桶限流)
包结构:
com.jysemel.iot.access
├── server # Netty服务器
│ ├── IotNettyServer # 启动Netty服务
│ └── IotServerInitializer # ChannelPipeline初始化
├── protocol # 协议适配器
│ ├── ProtocolAdapter # 适配器接口
│ ├── MqttProtocolAdapter # MQTT协议实现
│ ├── CoapProtocolAdapter # CoAP协议实现
│ ├── HttpWsProtocolAdapter # HTTP/WebSocket实现
│ ├── CustomTcpProtocolAdapter # 自定义TCP协议
│ └── ProtocolDetector # 首包嗅探器
├── codec # 编解码器
│ └── MqttCodec # MQTT编解码
├── connection # 连接管理
│ ├── ConnectionManager # 连接管理接口
│ └── InMemoryConnectionManager # 内存实现
├── heartbeat # 心跳检测
│ ├── HeartbeatHandler # 心跳处理器
│ └── HeartbeatCallback # 超时回调
├── security # 安全控制
│ ├── SslContextFactory # TLS配置
│ ├── RateLimiterInterceptor # 限流拦截器
│ └── AclPreFilter # ACL前置校验
└── event # 事件定义
├── DeviceOnlineEvent # 设备上线事件
└── DeviceOfflineEvent # 设备下线事件
工作流程:
- 启动阶段:
IotNettyServer创建 EventLoopGroup,绑定端口 - 连接建立: 新连接到达,
IotServerInitializer初始化 Pipeline - 协议识别:
ProtocolDetector分析首包字节特征,确定协议类型 - 协议解析: 对应的
ProtocolAdapter解析消息,转换为InternalMessage - 连接注册:
ConnectionManager记录 channelId 与 deviceId 的映射 - 心跳监控:
HeartbeatHandler检测空闲连接,超时触发离线事件
扩展现有协议:
// 1. 实现协议适配器
@Component
public class MyProtocolAdapter implements ProtocolAdapter {
@Override
public ProtocolType supportProtocol() {
return ProtocolType.MY_CUSTOM;
}
@Override
public InternalMessage decode(ByteBuf buffer) {
// 解析自定义协议字节流
// ...
return message;
}
@Override
public ByteBuf encode(InternalMessage message) {
// 编码为自定义协议字节流
// ...
return buffer;
}
}
// 2. 在 ProtocolDetector 中添加识别规则
private ProtocolType detectProtocol(ByteBuf buffer) {
byte firstByte = buffer.getByte(0);
if (firstByte == 0xAA) { // 自定义协议魔数
return ProtocolType.MY_CUSTOM;
}
// ... 其他协议判断
}
3. routing 模块 - 路由层
职责: 维护主题订阅关系,实现消息的路由分发
核心功能:
- Trie 树实现的主题匹配(支持
+和#通配符) - 订阅关系管理(Redis + Caffeine 两级缓存)
- 上行消息分发:匹配订阅者并推送
- 下行消息处理:查找目标设备 Channel
- 保留消息存储
包结构:
com.jysemel.iot.routing
├── matcher # 主题匹配
│ ├── TopicMatcher # 匹配接口
│ └── TrieTopicMatcher # Trie树实现
├── subscription # 订阅管理
│ ├── SubscriptionManager # 管理接口
│ ├── RedisSubscriptionManager # Redis实现
│ └── Subscription # 订阅实体
├── dispatcher # 消息分发
│ ├── UpstreamDispatcher # 上行分发器
│ ├── DownstreamDispatcher # 下行分发器
│ └── RetainedMessageStore # 保留消息存储
├── cache # 缓存
│ └── LocalSubscriptionCache # 本地缓存
└── config
└── RoutingAutoConfiguration # 自动配置
主题匹配规则:
+: 单层通配符,匹配任意一个层级sensors/+/temperature匹配sensors/room1/temperature- 不匹配
sensors/room1/floor1/temperature
#: 多层通配符,匹配剩余所有层级(必须在末尾)sensors/#匹配sensors/room1/temperature和sensors/room1/floor1/humidity
订阅流程:
// 设备订阅主题
Subscription sub = new Subscription(deviceId, "sensors/+/temperature", QoS.AT_LEAST_ONCE);
subscriptionManager.subscribe(sub);
// 发布消息时匹配订阅者
List<Subscription> matchedSubs = topicMatcher.match("sensors/room1/temperature");
// 返回所有匹配的订阅关系
消息分发:
// 上行消息分发(设备 → 服务端)
upstreamDispatcher.dispatch(internalMessage);
// 1. 匹配主题订阅者
// 2. 通过事件总线推送到Core层
// 3. 在线设备直接推送
// 下行消息分发(服务端 → 设备)
downstreamDispatcher.dispatch(targetDeviceId, message);
// 1. 查询ConnectionManager获取Channel
// 2. 在线则直接发送
// 3. 离线则触发持久化暂存
4. core 模块 - 核心层
职责: 设备管理、认证授权、规则引擎、数据持久化、设备影子
核心功能:
- 设备注册与物模型管理
- X.509证书/JWT Token 认证
- ACL 动态权限控制
- 规则引擎(条件触发与动作执行)
- 时序数据写入 TDengine
- 元数据写入 MySQL
- 设备影子(期望状态 vs 上报状态)
包结构:
com.jysemel.iot.core
├── device # 设备管理
│ ├── DeviceRegistry # 设备注册/查询
│ ├── DeviceService # 设备业务服务
│ ├── model
│ │ ├── DeviceInfo # 设备实体
│ │ └── ProductModel # 物模型
│ └── repository
│ └── DeviceRepository # 数据访问
├── auth # 认证授权
│ ├── AuthenticationManager # 认证接口
│ ├── AuthorizationManager # ACL授权接口
│ └── token
│ └── JwtTokenProvider # JWT令牌生成/验证
├── rule # 规则引擎
│ ├── RuleEngine # 规则引擎接口
│ ├── DroolsRuleEngine # Drools实现
│ ├── ActionExecutor # 动作执行器
│ └── model
│ └── Rule # 规则定义
├── shadow # 设备影子
│ ├── DeviceShadowManager # 影子管理
│ ├── ShadowStore # Redis存储
│ └── ShadowSyncService # 状态同步
├── storage # 数据存储
│ ├── TimeSeriesWriter # 时序数据写入
│ ├── MetadataCache # 元数据缓存
│ └── DatabaseConfig # 数据库配置
└── event
└── CoreEventConsumer # 事件消费者
设备注册示例:
// 注册新设备
DeviceInfo device = DeviceInfo.builder()
.productKey("sensor_product")
.deviceKey("temp_sensor_001")
.deviceName("温度传感器001")
.protocolType(ProtocolType.MQTT)
.authType(AuthType.JWT)
.build();
deviceRegistry.register(device);
规则引擎使用:
// 定义规则
Rule rule = Rule.builder()
.ruleId("alert_high_temp")
.name("高温告警")
.condition("temperature > 80")
.action(ActionType.NOTIFY)
.actionConfig("{\"target\": \"admin@example.com\", \"method\": \"email\"}")
.enabled(true)
.build();
ruleEngine.addRule(rule);
// 规则触发(自动)
// 当设备上报 temperature > 80 时,自动发送邮件通知
设备影子操作:
// 更新期望状态(云端下发)
shadowManager.updateDesired(deviceId, "{\"targetTemp\": 25}");
// 更新上报状态(设备上报)
shadowManager.updateReported(deviceId, "{\"currentTemp\": 23}");
// 获取差异
ShadowDiff diff = shadowManager.getDiff(deviceId);
// diff = {desired: {targetTemp: 25}, reported: {currentTemp: 23}}
// 设备上线时同步期望状态
shadowSyncService.syncOnConnect(deviceId);
5. business 模块 - 业务层
职责: 提供面向用户的 REST API、WebSocket 推送、管理后台
核心功能:
- 设备管理 REST API
- 设备影子控制台 API
- 告警接收与推送(钉钉/邮件)
- OTA 升级管理
- WebSocket 实时数据推送
- OpenAPI 网关
包结构:
com.jysemel.iot.business
├── controller # REST控制器
│ ├── DeviceController # 设备管理API
│ ├── ShadowController # 影子控制台API
│ ├── AlertController # 告警API
│ └── OtaController # OTA升级API
├── dashboard # 实时监控
│ ├── WebSocketHandler # WebSocket推送
│ └── DashboardService # 大屏数据服务
├── alert # 告警管理
│ ├── AlertService # 告警服务
│ └── AlertRuleListener # 告警规则监听
├── ota # OTA升级
│ ├── OtaManager # 固件管理
│ ├── OtaProcessor # 升级分发
│ └── model
│ └── Firmware # 固件实体
└── openapi # 开放API
├── OpenApiGateway # API网关
└── dto
└── ApiResponse # 统一响应格式
REST API 示例:
# 查询设备列表
GET http://localhost:8081/iot/api/devices?page=1&size=20
Authorization: Bearer <jwt-token>
# 创建设备
POST http://localhost:8081/iot/api/devices
Content-Type: application/json
{
"productKey": "sensor_product",
"deviceKey": "temp_sensor_002",
"deviceName": "温度传感器002"
}
# 查询设备影子
GET http://localhost:8081/iot/api/shadow/{deviceId}
# 更新期望状态
PUT http://localhost:8081/iot/api/shadow/{deviceId}/desired
{
"targetTemp": 25,
"mode": "auto"
}
WebSocket 实时推送:
// 前端连接WebSocket
const ws = new WebSocket('ws://localhost:8081/iot/ws/dashboard');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('收到实时数据:', data);
// 更新仪表盘图表
};
// 订阅特定设备数据
ws.send(JSON.stringify({
type: 'subscribe',
deviceId: 'temp_sensor_001'
}));
告警推送配置:
// 钉钉告警
AlertService.sendDingTalk(webhookUrl, alertMessage);
// 邮件告警
AlertService.sendEmail(toAddress, subject, content);
6. bootstrap 模块 - 启动模块
职责: 提供 Spring Boot 启动类,配置组件扫描和外部化配置
包结构:
com.jysemel.iot.bootstrap
├── IotPlatformApplication # Spring Boot主类
└── config # 配置类
├── NettyConfig # Netty服务器配置
├── EventBusConfig # 事件总线配置
└── RedisConfig # Redis配置
启动类:
@EnableAsync // 启用异步支持
@EnableScheduling // 启用定时任务
@SpringBootApplication(scanBasePackages = "com.jysemel.iot")
public class IotPlatformApplication {
public static void main(String[] args) {
SpringApplication.run(IotPlatformApplication.class, args);
System.out.println("IoT物联网平台启动成功!");
}
}
核心功能
1. 多协议接入
MQTT 协议
标准 MQTT v3.1.1 协议支持:
# 使用 mosquitto_pub 测试
mosquitto_pub -h localhost -p 1883 -t "test/topic" -m '{"data": "hello"}'
# 使用 mosquitto_sub 订阅
mosquitto_sub -h localhost -p 1883 -t "test/#"
自定义 TCP 协议
定义自己的二进制协议格式:
帧格式:
| 魔数(1B) | 版本号(1B) | 命令字(2B) | 数据长度(4B) | 数据(NB) | CRC(2B) |
示例:
AA 01 00 01 00 00 00 0A [数据...] [CRC]
实现适配器:
@Component
public class CustomTcpProtocolAdapter implements ProtocolAdapter {
@Override
public ProtocolType supportProtocol() {
return ProtocolType.TCP_CUSTOM;
}
@Override
public InternalMessage decode(ByteBuf buffer) {
// 读取魔数
byte magic = buffer.readByte();
if (magic != 0xAA) {
throw new ProtocolNotSupportException("Invalid magic number");
}
// 读取版本号、命令字、数据长度
byte version = buffer.readByte();
short command = buffer.readShort();
int length = buffer.readInt();
// 读取数据
byte[] data = new byte[length];
buffer.readBytes(data);
// 读取CRC
short crc = buffer.readShort();
// 转换为InternalMessage
return InternalMessage.builder()
.payload(new String(data, StandardCharsets.UTF_8))
.timestamp(System.currentTimeMillis())
.build();
}
}
2. 设备认证与授权
JWT Token 认证
// 生成Token
String token = jwtTokenProvider.generateToken(deviceIdentity);
// 验证Token
boolean valid = jwtTokenProvider.validateToken(token);
DeviceIdentity identity = jwtTokenProvider.getIdentityFromToken(token);
ACL 权限控制
// 设置ACL规则
authorizationManager.setAcl(deviceId, Arrays.asList(
new AclRule("sensors/+/temperature", Permission.SUBSCRIBE),
new AclRule("devices/" + deviceId + "/commands", Permission.PUBLISH)
));
// 检查权限
boolean allowed = authorizationManager.checkPermission(deviceId,
"sensors/room1/temperature", Permission.PUBLISH);
3. 规则引擎
规则定义(JSON格式)
{
"ruleId": "temp_alert",
"name": "温度超限告警",
"description": "当温度超过80度时发送告警",
"condition": "temperature > 80 AND humidity < 30",
"actions": [
{
"type": "NOTIFY",
"config": {
"method": "email",
"target": "admin@example.com"
}
},
{
"type": "INVOKE_API",
"config": {
"url": "http://external-system.com/api/alert",
"method": "POST"
}
}
],
"enabled": true
}
内置函数
规则条件支持以下函数:
avg(field, window): 窗口平均值max(field, window): 窗口最大值min(field, window): 窗口最小值count(field, window): 窗口计数rate(field, window): 变化率
示例:
avg(temperature, 5m) > 75 // 5分钟平均温度超过75度
rate(temperature, 1m) > 10 // 1分钟内温度变化超过10度
4. 设备影子
设备影子用于存储设备的期望状态和上报状态,解决设备离线时的状态同步问题。
影子文档结构:
{
"deviceId": "temp_sensor_001",
"desired": {
"targetTemp": 25,
"mode": "auto",
"updateTime": "2026-05-19 10:00:00"
},
"reported": {
"currentTemp": 23,
"batteryLevel": 85,
"updateTime": "2026-05-19 10:05:00"
},
"metadata": {
"desired": {
"targetTemp": { "timestamp": 1716091200000 }
},
"reported": {
"currentTemp": { "timestamp": 1716091500000 }
}
}
}
同步流程:
- 设备上线时,Core层调用
ShadowSyncService.syncOnConnect() - 比较
desired和reported的差异 - 将差异通过下行消息发送给设备
- 设备调整状态后上报新的
reported - 持续同步直到两者一致
5. OTA 升级
升级流程
1. 上传固件 → Business层(OtaManager)
2. 创建升级任务 → 选择目标设备
3. 推送升级通知 → Routing层 → Access层 → 设备
4. 设备下载固件 → 验证签名 → 安装
5. 设备重启 → 上报新版本号
6. 更新升级状态 → 完成任务
API 示例
# 上传固件
POST http://localhost:8081/iot/api/ota/firmware
Content-Type: multipart/form-data
firmware=@firmware_v1.2.bin
version=1.2
description="修复温度传感器bug"
# 创建升级任务
POST http://localhost:8081/iot/api/ota/task
{
"firmwareId": "fw_001",
"deviceIds": ["temp_sensor_001", "temp_sensor_002"],
"strategy": "BATCH", // BATCH:批量, GRAY:灰度
"batchSize": 10
}
# 查询升级状态
GET http://localhost:8081/iot/api/ota/task/{taskId}/status
配置说明
application.yml 完整配置
# 服务器配置
server:
port: 8081
servlet:
context-path: /iot
# Spring配置
spring:
application:
name: iot-platform
# Redis配置
data:
redis:
host: localhost
port: 6379
password: # Redis密码(如有)
database: 0
timeout: 3000ms
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: -1ms
# JSON序列化配置
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
# MQTT服务器配置
mqtt:
server:
port: 1883 # MQTT监听端口
boss-threads: 1 # Boss线程数
worker-threads: 8 # Worker线程数
heartbeat-timeout: 120 # 心跳超时时间(秒)
version: MQTT_3_1_1 # MQTT协议版本
# Netty配置
netty:
so:
keep-alive: true # TCP保活
tcp-no-delay: true # 禁用Nagle算法
rcv-buf: 65536 # 接收缓冲区大小
snd-buf: 65536 # 发送缓冲区大小
# 日志配置
logging:
level:
root: info
com.jysemel.iot: debug # IoT模块DEBUG级别
io.netty: info
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n'
file:
name: logs/iot-platform.log
max-size: 100MB
max-history: 30
# JWT配置
jwt:
secret: iot-platform-secret-key-change-in-production-environment
expiration: 86400000 # Token有效期24小时(毫秒)
# TDengine配置(可选)
tdengine:
enabled: false
url: jdbc:TAOS://localhost:6030/iot_db
username: root
password: taosdata
# MySQL配置(可选)
mysql:
enabled: false
url: jdbc:mysql://localhost:3306/iot_db?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT+8
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
生产环境建议
修改JWT密钥:
jwt: secret: <使用强随机字符串>启用TLS/SSL:
mqtt: server: ssl-enabled: true cert-path: /path/to/cert.pem key-path: /path/to/key.pem调整线程池:
mqtt: server: boss-threads: 2 worker-threads: 16 # 根据CPU核心数调整启用数据库:
tdengine: enabled: true mysql: enabled: true配置日志滚动:
logging: file: max-size: 500MB max-history: 90
开发指南
添加新协议支持
步骤1: 在 common 模块添加协议类型
// ProtocolType.java
public enum ProtocolType {
MQTT, COAP, HTTP, TCP_CUSTOM, MY_NEW_PROTOCOL
}
步骤2: 在 access 模块实现协议适配器
@Component
public class MyNewProtocolAdapter implements ProtocolAdapter {
@Override
public ProtocolType supportProtocol() {
return ProtocolType.MY_NEW_PROTOCOL;
}
@Override
public InternalMessage decode(ByteBuf buffer) {
// 实现解码逻辑
}
@Override
public ByteBuf encode(InternalMessage message) {
// 实现编码逻辑
}
}
步骤3: 在 ProtocolDetector 添加识别规则
private ProtocolType detectProtocol(ByteBuf buffer) {
byte firstByte = buffer.getByte(0);
// 添加新协议识别
if (firstByte == 0xBB) {
return ProtocolType.MY_NEW_PROTOCOL;
}
// ... 其他协议判断
}
步骤4: 在 IotServerInitializer 添加Handler
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加协议识别
pipeline.addLast("protocolDetector", new ProtocolDetector());
// 根据协议添加对应Handler
pipeline.addLast("myProtocolHandler", new MyProtocolHandler());
// ... 其他Handler
}
添加新规则动作
步骤1: 定义动作类型
// ActionType.java
public enum ActionType {
NOTIFY, INVOKE_API, STORE_DATA, MY_CUSTOM_ACTION
}
步骤2: 实现动作执行器
@Component
public class MyCustomActionExecutor implements ActionExecutor {
@Override
public boolean support(ActionType type) {
return type == ActionType.MY_CUSTOM_ACTION;
}
@Override
public void execute(Rule rule, InternalMessage message) {
// 实现自定义动作逻辑
log.info("执行自定义动作: {}", rule.getRuleId());
// 例如:调用第三方服务
String apiUrl = rule.getActionConfig().get("url");
httpClient.post(apiUrl, buildPayload(message));
}
}
步骤3: 在规则中使用
{
"ruleId": "custom_rule",
"condition": "temperature > 80",
"actions": [
{
"type": "MY_CUSTOM_ACTION",
"config": {
"url": "http://third-party.com/api/action",
"param1": "value1"
}
}
]
}
添加新的REST API
步骤1: 创建Controller
@RestController
@RequestMapping("/api/custom")
public class CustomController {
@Autowired
private CustomService customService;
@GetMapping("/data")
public ApiResponse<List<DataVO>> getData(@RequestParam String deviceId) {
List<DataVO> data = customService.queryData(deviceId);
return ApiResponse.success(data);
}
@PostMapping("/command")
public ApiResponse<Void> sendCommand(@RequestBody CommandDTO command) {
customService.sendCommand(command);
return ApiResponse.success();
}
}
步骤2: 实现Service
@Service
public class CustomService {
@Autowired
private DownstreamDispatcher downstreamDispatcher;
public void sendCommand(CommandDTO command) {
InternalMessage message = InternalMessage.builder()
.deviceId(new DeviceIdentity(command.getProductKey(), command.getDeviceKey()))
.topic("devices/" + command.getDeviceKey() + "/commands")
.payload(JsonUtil.toJson(command.getData()))
.build();
downstreamDispatcher.dispatch(message.getDeviceId(), message);
}
}
单元测试
测试协议编解码:
@SpringBootTest
class MqttProtocolAdapterTest {
@Autowired
private MqttProtocolAdapter adapter;
@Test
void testDecodeConnectPacket() {
ByteBuf buffer = Unpooled.buffer();
// 构造MQTT CONNECT报文
buffer.writeBytes(mqttConnectBytes);
InternalMessage message = adapter.decode(buffer);
assertNotNull(message);
assertEquals(ProtocolType.MQTT, message.getProtocolType());
}
@Test
void testEncodePublishPacket() {
InternalMessage message = InternalMessage.builder()
.topic("test/topic")
.payload("Hello")
.build();
ByteBuf buffer = adapter.encode(message);
assertNotNull(buffer);
assertTrue(buffer.readableBytes() > 0);
}
}
测试主题匹配:
@SpringBootTest
class TrieTopicMatcherTest {
@Autowired
private TopicMatcher topicMatcher;
@Test
void testSingleWildcard() {
topicMatcher.subscribe("sensors/+/temperature", "device1");
List<String> matched = topicMatcher.match("sensors/room1/temperature");
assertTrue(matched.contains("device1"));
}
@Test
void testMultiWildcard() {
topicMatcher.subscribe("sensors/#", "device1");
List<String> matched = topicMatcher.match("sensors/room1/floor1/temperature");
assertTrue(matched.contains("device1"));
}
}
常见问题
1. 启动失败:端口被占用
错误信息:
Address already in use: bind
解决方案:
# 查看端口占用
netstat -ano | findstr :1883
taskkill /PID <pid> /F
# 或修改配置文件中的端口
mqtt:
server:
port: 1884
2. Redis连接失败
错误信息:
io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
解决方案:
确认Redis已启动
redis-cli ping # 应返回 PONG检查配置
spring: data: redis: host: localhost # 确认地址正确 port: 6379如果不需要Redis,可以暂时禁用相关功能
3. 设备连接后立即断开
可能原因:
- 心跳超时时间设置过短
- 认证失败
- ACL权限不足
排查步骤:
检查日志
tail -f logs/iot-platform.log | grep "device"验证认证信息
// 确认JWT Token有效 boolean valid = jwtTokenProvider.validateToken(token);检查心跳配置
mqtt: server: heartbeat-timeout: 120 # 增加超时时间
4. 消息无法送达
排查清单:
- [ ] 设备是否在线(检查ConnectionManager)
- [ ] 主题是否正确(检查TopicMatcher匹配结果)
- [ ] 订阅关系是否存在(检查SubscriptionManager)
- [ ] Channel是否有效(检查Channel.isActive())
调试代码:
// 检查设备连接状态
boolean online = connectionManager.isOnline(deviceId);
log.info("Device {} online: {}", deviceId, online);
// 检查订阅关系
List<Subscription> subs = subscriptionManager.getSubscriptionsByTopic(topic);
log.info("Subscriptions for {}: {}", topic, subs.size());
// 检查Channel状态
Channel channel = connectionManager.getChannel(deviceId);
if (channel != null) {
log.info("Channel active: {}", channel.isActive());
}
5. 规则引擎不触发
常见原因:
规则未启用
{ "enabled": true // 确保为true }条件表达式语法错误
// 正确: temperature > 80 // 错误: temperature > eighty字段名称不匹配
// 规则条件: temperature > 80 // 消息载荷: {"temp": 85} // 字段名不一致
调试方法:
// 启用规则引擎调试日志
logging:
level:
com.jysemel.iot.core.rule: debug
// 手动触发规则测试
ruleEngine.testRule(ruleId, testMessage);
6. 性能优化建议
高并发场景:
调整Netty线程池
mqtt: server: worker-threads: 16 # CPU核心数 * 2启用连接池
spring: data: redis: lettuce: pool: max-active: 20 max-idle: 10使用本地缓存
// Caffeine缓存配置 @Bean public Cache<String, DeviceInfo> deviceCache() { return Caffeine.newBuilder() .maximumSize(10000) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); }异步处理
@Async public void processMessage(InternalMessage message) { // 耗时操作 }批量写入
// 批量写入时序数据 timeSeriesWriter.batchWrite(messages);
7. 日志级别调整
开发环境:
logging:
level:
com.jysemel.iot: debug
io.netty.handler.codec.mqtt: debug
生产环境:
logging:
level:
com.jysemel.iot: info
io.netty: warn
临时调试特定模块:
# 通过Actuator动态调整(需引入spring-boot-starter-actuator)
curl -X POST http://localhost:8081/iot/actuator/loggers/com.jysemel.iot.access \
-H "Content-Type: application/json" \
-d '{"configuredLevel": "DEBUG"}'
附录
A. 项目结构总览
mqtt-boot/
├── pom.xml # 父POM
├── README.md # 项目说明
├── TUTORIAL.md # 本教程
├── sample-00/ # 示例项目
│ ├── pom.xml
│ ├── sample-00-iot-common/ # 公共模块
│ │ ├── pom.xml
│ │ ├── README.md
│ │ └── src/main/java/com/jysemel/iot/common/
│ ├── sample-00-iot-access/ # 接入层
│ │ ├── pom.xml
│ │ ├── README.md
│ │ └── src/main/java/com/jysemel/iot/access/
│ ├── sample-00-iot-routing/ # 路由层
│ │ ├── pom.xml
│ │ ├── README.md
│ │ └── src/main/java/com/jysemel/iot/routing/
│ ├── sample-00-iot-core/ # 核心层
│ │ ├── pom.xml
│ │ ├── README.md
│ │ └── src/main/java/com/jysemel/iot/core/
│ ├── sample-00-iot-business/ # 业务层
│ │ ├── pom.xml
│ │ ├── README.md
│ │ └── src/main/java/com/jysemel/iot/business/
│ └── sample-00-iot-bootstrap/ # 启动模块
│ ├── pom.xml
│ ├── README.md
│ └── src/
│ ├── main/java/com/jysemel/iot/bootstrap/
│ └── main/resources/
│ └── application.yml
└── mqtt-simulator/ # MQTT模拟器(测试工具)
└── pom.xml
B. 依赖关系图
bootstrap (依赖所有模块)
├── business
│ ├── core
│ │ ├── routing
│ │ │ └── common
│ │ └── access
│ │ └── common
│ └── common
└── common
C. 端口说明
| 端口 | 用途 | 配置项 |
|---|---|---|
| 8081 | REST API | server.port |
| 1883 | MQTT服务器 | mqtt.server.port |
| 6379 | Redis | spring.data.redis.port |
D. 常用命令
# 编译项目
mvn clean install
# 跳过测试编译
mvn clean install -DskipTests
# 运行指定模块
cd sample-00/sample-00-iot-bootstrap
mvn spring-boot:run
# 打包
mvn clean package
# 查看依赖树
mvn dependency:tree
# 清理构建产物
mvn clean
结语
本教程详细介绍了 MQTT Boot IoT 平台的架构、功能和使用方法。通过这个框架,你可以快速构建自己的物联网平台,支持海量设备接入和实时数据处理。
