砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 入门

        • 基于netty构建入门
        • 理解粘包/拆包
        • 编解码器机制与自定义协议
        • 心跳和ack机制
        • mqtt服务demo演示
        • mqtt服务协议支持
        • mqtt服务udp支持
      • 协议规范

        • mqtt协议规范(发布/订阅模式)
        • mqtt协议规范(轻量级二进制协议)
        • mqtt协议规范(三种 QoS 等级)
        • mqtt协议规范(主题通配符订阅)
        • mqtt协议规范(遗嘱与保留消息)
      • 报文结构

        • 控制报文结构(报文分类)
        • 控制报文结构(连接与握手)
        • 控制报文结构(发布与接收)
      • 核心实战

        • 核心实战(握手与认证)
        • 核心实战(心跳保活机制)
        • 核心实战(会话管理)
    • netty-mqtt-boot

      • 物联网实战

        • 模块化设计
        • 创建工程并引入
        • 编写可托管的 Netty 服务器启动器
        • 在 Pipeline 中添加 MQTT 编解码器
        • 实现 CONNECT 与 PUBLISH 基础响应
        • 内存中维护订阅-发布映射并转发消息
        • 集成 application.yml 管理 Netty 参数
        • 增加 REST API 监控连接与订阅状态
  • 物联网实战:Spring Boot + Netty 搭建 MQTT
    • 源码(sample-00)
    • 目录
    • 项目简介
      • 核心特性
    • 技术架构
      • 技术栈
      • 模块架构
      • 数据流向
    • 环境准备
      • 必需软件
      • 可选组件
    • 快速开始
      • 1. 克隆项目
      • 2. 编译项目
      • 3. 配置应用
      • 4. 启动应用
      • 5. 验证启动
      • 6. 测试连接
    • 模块详解
      • 1. common 模块 - 公共基础
      • 2. access 模块 - 接入层
      • 3. routing 模块 - 路由层
      • 4. core 模块 - 核心层
      • 5. business 模块 - 业务层
      • 6. bootstrap 模块 - 启动模块
    • 核心功能
      • 1. 多协议接入
        • MQTT 协议
        • 自定义 TCP 协议
      • 2. 设备认证与授权
        • JWT Token 认证
        • ACL 权限控制
      • 3. 规则引擎
        • 规则定义(JSON格式)
        • 内置函数
      • 4. 设备影子
      • 5. OTA 升级
        • 升级流程
        • API 示例
    • 配置说明
      • application.yml 完整配置
      • 生产环境建议
    • 开发指南
      • 添加新协议支持
      • 添加新规则动作
      • 添加新的REST API
      • 单元测试
    • 常见问题
      • 1. 启动失败:端口被占用
      • 2. Redis连接失败
      • 3. 设备连接后立即断开
      • 4. 消息无法送达
      • 5. 规则引擎不触发
      • 6. 性能优化建议
      • 7. 日志级别调整
    • 附录
      • A. 项目结构总览
      • B. 依赖关系图
      • C. 端口说明
      • D. 常用命令
    • 结语

物联网实战:Spring Boot + Netty 搭建 MQTT

源码(sample-00)

https://gitee.com/kcnf-iot/mqtt-boot/tree/master/sample-00

目录

  1. 项目简介
  2. 技术架构
  3. 环境准备
  4. 快速开始
  5. 模块详解
  6. 核心功能
  7. 配置说明
  8. 开发指南
  9. 常见问题

项目简介

MQTT Boot 是一个基于 Spring Boot 和 Netty 构建的轻量级 IoT 物联网平台框架。它提供了完整的设备接入、消息路由、规则引擎和设备管理能力,支持多种物联网协议(MQTT、CoAP、HTTP/WebSocket、自定义TCP协议)。

核心特性

  • 多协议接入: 支持 MQTT、CoAP、HTTP/WebSocket 及自定义 TCP 协议
  • 高性能通信: 基于 Netty 构建的高性能网络通信层
  • 灵活路由: Trie 树实现的主题匹配与消息分发
  • 设备管理: 设备注册、认证授权、设备影子
  • 规则引擎: 基于条件的规则触发与动作执行
  • 数据存储: 支持 Redis 缓存、TDengine 时序数据库、MySQL 元数据
  • 模块化设计: 清晰的六层架构,便于扩展和维护

技术架构

技术栈

技术版本用途
Java21运行时环境
Spring Boot3.5.11应用框架
Netty最新版网络通信框架
Maven3.x依赖管理
Redis-缓存与订阅关系存储
Caffeine-本地缓存
JWT0.11.5认证令牌
Lombok1.18.34代码简化
Fastjson22.0.62JSON 处理

模块架构

mqtt-boot (根项目)
└── sample-00 (示例项目)
    ├── sample-00-iot-common      # 公共模块:模型、常量、工具类
    ├── sample-00-iot-access      # 接入层:Netty服务器、协议适配
    ├── sample-00-iot-routing     # 路由层:主题匹配、消息分发
    ├── sample-00-iot-core        # 核心层:设备管理、规则引擎、设备影子
    ├── sample-00-iot-business    # 业务层:REST API、WebSocket、告警、OTA
    └── sample-00-iot-bootstrap   # 启动模块:Spring Boot主类、配置

数据流向

设备端 → Access层(协议解析) → Routing层(主题匹配) → Core层(规则引擎/持久化) → Business层(API推送)
                                                                    ↓
                                                              外部系统(TDengine/MySQL/Redis)

环境准备

必需软件

  1. JDK 21

    # 验证Java版本
    java -version
    
    # 应输出类似内容:
    # openjdk version "21.0.x" 2026-xx-xx LTS
    
  2. Maven 3.6+

    # 验证Maven版本
    mvn -version
    
  3. Redis (可选但推荐)

    # 安装Redis (Ubuntu)
    sudo apt-get install redis-server
    
    # 启动Redis
    redis-server
    
    # 或使用Docker
    docker run -d -p 6379:6379 redis:latest
    
  4. IDE推荐: IntelliJ IDEA 2024+ (支持Java 21)

可选组件

  • TDengine: 时序数据存储
  • MySQL: 元数据存储
  • Docker: 容器化部署

快速开始

1. 克隆项目

git clone <repository-url>
cd mqtt-boot

2. 编译项目

# 在项目根目录执行
mvn clean install

3. 配置应用

编辑 sample-00/sample-00-iot-bootstrap/src/main/resources/application.yml:

# 修改Redis连接(如果使用)
spring:
  data:
    redis:
      host: localhost
      port: 6379

# 修改MQTT服务器端口
mqtt:
  server:
    port: 1883

4. 启动应用

# 方式1: 使用Maven
cd sample-00/sample-00-iot-bootstrap
mvn spring-boot:run

# 方式2: 打包后运行
mvn clean package
java -jar target/sample-00-iot-bootstrap-1.0-SNAPSHOT.jar

# 方式3: IDE中直接运行
# 运行 IotPlatformApplication.main()

5. 验证启动

启动成功后会看到以下输出:

IoT物联网平台启动成功!

检查服务状态:

  • REST API: http://localhost:8081/iot
  • MQTT服务器: localhost:1883

6. 测试连接

使用 MQTT 客户端工具(如 MQTT.fx、MQTT Explorer)连接:

Host: localhost
Port: 1883
Protocol: MQTT v3.1.1

发布/订阅测试主题:

Topic: test/topic
Payload: {"message": "Hello IoT"}

模块详解

1. common 模块 - 公共基础

职责: 提供跨模块共享的数据结构、工具类和常量定义

包结构:

com.jysemel.iot.common
├── model                    # 统一消息模型
│   ├── InternalMessage      # 内部统一消息对象
│   ├── DeviceIdentity       # 设备身份标识
│   └── ProtocolType         # 协议类型枚举
├── exception                # 通用异常类
│   ├── IotException
│   ├── DeviceOfflineException
│   └── ProtocolNotSupportException
├── constants                # 常量定义
│   ├── TopicConstants       # 主题常量
│   └── RedisKeys            # Redis键名常量
└── util                     # 工具类
    ├── JsonUtil             # JSON处理工具
    └── NettyUtil            # Netty辅助工具

关键类说明:

  • InternalMessage: 统一的内部消息格式,所有协议转换后的消息都使用此格式
  • DeviceIdentity: 设备唯一标识(产品Key + 设备Key)
  • ProtocolType: 支持的协议类型枚举(MQTT, CoAP, HTTP, TCP_CUSTOM)

使用示例:

// 创建设备身份
DeviceIdentity identity = new DeviceIdentity("product001", "device001");

// 创建内部消息
InternalMessage message = InternalMessage.builder()
    .deviceId(identity)
    .topic("sensors/temperature")
    .payload("{\"temp\": 25.5}")
    .qos(1)
    .timestamp(System.currentTimeMillis())
    .build();

2. access 模块 - 接入层

职责: 基于 Netty 启动 TCP/UDP 服务,处理多协议接入

核心功能:

  • 多端口监听,支持不同协议绑定
  • 首包嗅探自动识别协议
  • 协议编解码器插件化
  • 连接管理(channelId ↔ deviceId 映射)
  • 心跳超时检测
  • TLS/SSL 支持
  • 流量整形(令牌桶限流)

包结构:

com.jysemel.iot.access
├── server                      # Netty服务器
│   ├── IotNettyServer          # 启动Netty服务
│   └── IotServerInitializer    # ChannelPipeline初始化
├── protocol                    # 协议适配器
│   ├── ProtocolAdapter         # 适配器接口
│   ├── MqttProtocolAdapter     # MQTT协议实现
│   ├── CoapProtocolAdapter     # CoAP协议实现
│   ├── HttpWsProtocolAdapter   # HTTP/WebSocket实现
│   ├── CustomTcpProtocolAdapter # 自定义TCP协议
│   └── ProtocolDetector        # 首包嗅探器
├── codec                       # 编解码器
│   └── MqttCodec               # MQTT编解码
├── connection                  # 连接管理
│   ├── ConnectionManager       # 连接管理接口
│   └── InMemoryConnectionManager # 内存实现
├── heartbeat                   # 心跳检测
│   ├── HeartbeatHandler        # 心跳处理器
│   └── HeartbeatCallback       # 超时回调
├── security                    # 安全控制
│   ├── SslContextFactory       # TLS配置
│   ├── RateLimiterInterceptor  # 限流拦截器
│   └── AclPreFilter            # ACL前置校验
└── event                       # 事件定义
    ├── DeviceOnlineEvent       # 设备上线事件
    └── DeviceOfflineEvent      # 设备下线事件

工作流程:

  1. 启动阶段: IotNettyServer 创建 EventLoopGroup,绑定端口
  2. 连接建立: 新连接到达,IotServerInitializer 初始化 Pipeline
  3. 协议识别: ProtocolDetector 分析首包字节特征,确定协议类型
  4. 协议解析: 对应的 ProtocolAdapter 解析消息,转换为 InternalMessage
  5. 连接注册: ConnectionManager 记录 channelId 与 deviceId 的映射
  6. 心跳监控: HeartbeatHandler 检测空闲连接,超时触发离线事件

扩展现有协议:

// 1. 实现协议适配器
@Component
public class MyProtocolAdapter implements ProtocolAdapter {
    
    @Override
    public ProtocolType supportProtocol() {
        return ProtocolType.MY_CUSTOM;
    }
    
    @Override
    public InternalMessage decode(ByteBuf buffer) {
        // 解析自定义协议字节流
        // ...
        return message;
    }
    
    @Override
    public ByteBuf encode(InternalMessage message) {
        // 编码为自定义协议字节流
        // ...
        return buffer;
    }
}

// 2. 在 ProtocolDetector 中添加识别规则
private ProtocolType detectProtocol(ByteBuf buffer) {
    byte firstByte = buffer.getByte(0);
    if (firstByte == 0xAA) {  // 自定义协议魔数
        return ProtocolType.MY_CUSTOM;
    }
    // ... 其他协议判断
}

3. routing 模块 - 路由层

职责: 维护主题订阅关系,实现消息的路由分发

核心功能:

  • Trie 树实现的主题匹配(支持 + 和 # 通配符)
  • 订阅关系管理(Redis + Caffeine 两级缓存)
  • 上行消息分发:匹配订阅者并推送
  • 下行消息处理:查找目标设备 Channel
  • 保留消息存储

包结构:

com.jysemel.iot.routing
├── matcher                     # 主题匹配
│   ├── TopicMatcher            # 匹配接口
│   └── TrieTopicMatcher        # Trie树实现
├── subscription                # 订阅管理
│   ├── SubscriptionManager     # 管理接口
│   ├── RedisSubscriptionManager # Redis实现
│   └── Subscription            # 订阅实体
├── dispatcher                  # 消息分发
│   ├── UpstreamDispatcher      # 上行分发器
│   ├── DownstreamDispatcher    # 下行分发器
│   └── RetainedMessageStore    # 保留消息存储
├── cache                       # 缓存
│   └── LocalSubscriptionCache  # 本地缓存
└── config
    └── RoutingAutoConfiguration # 自动配置

主题匹配规则:

  • +: 单层通配符,匹配任意一个层级

    • sensors/+/temperature 匹配 sensors/room1/temperature
    • 不匹配 sensors/room1/floor1/temperature
  • #: 多层通配符,匹配剩余所有层级(必须在末尾)

    • sensors/# 匹配 sensors/room1/temperature 和 sensors/room1/floor1/humidity

订阅流程:

// 设备订阅主题
Subscription sub = new Subscription(deviceId, "sensors/+/temperature", QoS.AT_LEAST_ONCE);
subscriptionManager.subscribe(sub);

// 发布消息时匹配订阅者
List<Subscription> matchedSubs = topicMatcher.match("sensors/room1/temperature");
// 返回所有匹配的订阅关系

消息分发:

// 上行消息分发(设备 → 服务端)
upstreamDispatcher.dispatch(internalMessage);
// 1. 匹配主题订阅者
// 2. 通过事件总线推送到Core层
// 3. 在线设备直接推送

// 下行消息分发(服务端 → 设备)
downstreamDispatcher.dispatch(targetDeviceId, message);
// 1. 查询ConnectionManager获取Channel
// 2. 在线则直接发送
// 3. 离线则触发持久化暂存

4. core 模块 - 核心层

职责: 设备管理、认证授权、规则引擎、数据持久化、设备影子

核心功能:

  • 设备注册与物模型管理
  • X.509证书/JWT Token 认证
  • ACL 动态权限控制
  • 规则引擎(条件触发与动作执行)
  • 时序数据写入 TDengine
  • 元数据写入 MySQL
  • 设备影子(期望状态 vs 上报状态)

包结构:

com.jysemel.iot.core
├── device                      # 设备管理
│   ├── DeviceRegistry          # 设备注册/查询
│   ├── DeviceService           # 设备业务服务
│   ├── model
│   │   ├── DeviceInfo          # 设备实体
│   │   └── ProductModel        # 物模型
│   └── repository
│       └── DeviceRepository    # 数据访问
├── auth                        # 认证授权
│   ├── AuthenticationManager   # 认证接口
│   ├── AuthorizationManager    # ACL授权接口
│   └── token
│       └── JwtTokenProvider    # JWT令牌生成/验证
├── rule                        # 规则引擎
│   ├── RuleEngine              # 规则引擎接口
│   ├── DroolsRuleEngine        # Drools实现
│   ├── ActionExecutor          # 动作执行器
│   └── model
│       └── Rule                # 规则定义
├── shadow                      # 设备影子
│   ├── DeviceShadowManager     # 影子管理
│   ├── ShadowStore             # Redis存储
│   └── ShadowSyncService       # 状态同步
├── storage                     # 数据存储
│   ├── TimeSeriesWriter        # 时序数据写入
│   ├── MetadataCache           # 元数据缓存
│   └── DatabaseConfig          # 数据库配置
└── event
    └── CoreEventConsumer       # 事件消费者

设备注册示例:

// 注册新设备
DeviceInfo device = DeviceInfo.builder()
    .productKey("sensor_product")
    .deviceKey("temp_sensor_001")
    .deviceName("温度传感器001")
    .protocolType(ProtocolType.MQTT)
    .authType(AuthType.JWT)
    .build();

deviceRegistry.register(device);

规则引擎使用:

// 定义规则
Rule rule = Rule.builder()
    .ruleId("alert_high_temp")
    .name("高温告警")
    .condition("temperature > 80")
    .action(ActionType.NOTIFY)
    .actionConfig("{\"target\": \"admin@example.com\", \"method\": \"email\"}")
    .enabled(true)
    .build();

ruleEngine.addRule(rule);

// 规则触发(自动)
// 当设备上报 temperature > 80 时,自动发送邮件通知

设备影子操作:

// 更新期望状态(云端下发)
shadowManager.updateDesired(deviceId, "{\"targetTemp\": 25}");

// 更新上报状态(设备上报)
shadowManager.updateReported(deviceId, "{\"currentTemp\": 23}");

// 获取差异
ShadowDiff diff = shadowManager.getDiff(deviceId);
// diff = {desired: {targetTemp: 25}, reported: {currentTemp: 23}}

// 设备上线时同步期望状态
shadowSyncService.syncOnConnect(deviceId);

5. business 模块 - 业务层

职责: 提供面向用户的 REST API、WebSocket 推送、管理后台

核心功能:

  • 设备管理 REST API
  • 设备影子控制台 API
  • 告警接收与推送(钉钉/邮件)
  • OTA 升级管理
  • WebSocket 实时数据推送
  • OpenAPI 网关

包结构:

com.jysemel.iot.business
├── controller                  # REST控制器
│   ├── DeviceController        # 设备管理API
│   ├── ShadowController        # 影子控制台API
│   ├── AlertController         # 告警API
│   └── OtaController           # OTA升级API
├── dashboard                   # 实时监控
│   ├── WebSocketHandler        # WebSocket推送
│   └── DashboardService        # 大屏数据服务
├── alert                       # 告警管理
│   ├── AlertService            # 告警服务
│   └── AlertRuleListener       # 告警规则监听
├── ota                         # OTA升级
│   ├── OtaManager              # 固件管理
│   ├── OtaProcessor            # 升级分发
│   └── model
│       └── Firmware            # 固件实体
└── openapi                     # 开放API
    ├── OpenApiGateway          # API网关
    └── dto
        └── ApiResponse         # 统一响应格式

REST API 示例:

# 查询设备列表
GET http://localhost:8081/iot/api/devices?page=1&size=20
Authorization: Bearer <jwt-token>

# 创建设备
POST http://localhost:8081/iot/api/devices
Content-Type: application/json
{
  "productKey": "sensor_product",
  "deviceKey": "temp_sensor_002",
  "deviceName": "温度传感器002"
}

# 查询设备影子
GET http://localhost:8081/iot/api/shadow/{deviceId}

# 更新期望状态
PUT http://localhost:8081/iot/api/shadow/{deviceId}/desired
{
  "targetTemp": 25,
  "mode": "auto"
}

WebSocket 实时推送:

// 前端连接WebSocket
const ws = new WebSocket('ws://localhost:8081/iot/ws/dashboard');

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('收到实时数据:', data);
  // 更新仪表盘图表
};

// 订阅特定设备数据
ws.send(JSON.stringify({
  type: 'subscribe',
  deviceId: 'temp_sensor_001'
}));

告警推送配置:

// 钉钉告警
AlertService.sendDingTalk(webhookUrl, alertMessage);

// 邮件告警
AlertService.sendEmail(toAddress, subject, content);

6. bootstrap 模块 - 启动模块

职责: 提供 Spring Boot 启动类,配置组件扫描和外部化配置

包结构:

com.jysemel.iot.bootstrap
├── IotPlatformApplication      # Spring Boot主类
└── config                      # 配置类
    ├── NettyConfig             # Netty服务器配置
    ├── EventBusConfig          # 事件总线配置
    └── RedisConfig             # Redis配置

启动类:

@EnableAsync          // 启用异步支持
@EnableScheduling     // 启用定时任务
@SpringBootApplication(scanBasePackages = "com.jysemel.iot")
public class IotPlatformApplication {
    public static void main(String[] args) {
        SpringApplication.run(IotPlatformApplication.class, args);
        System.out.println("IoT物联网平台启动成功!");
    }
}

核心功能

1. 多协议接入

MQTT 协议

标准 MQTT v3.1.1 协议支持:

# 使用 mosquitto_pub 测试
mosquitto_pub -h localhost -p 1883 -t "test/topic" -m '{"data": "hello"}'

# 使用 mosquitto_sub 订阅
mosquitto_sub -h localhost -p 1883 -t "test/#"

自定义 TCP 协议

定义自己的二进制协议格式:

帧格式:
| 魔数(1B) | 版本号(1B) | 命令字(2B) | 数据长度(4B) | 数据(NB) | CRC(2B) |

示例:
AA 01 00 01 00 00 00 0A [数据...] [CRC]

实现适配器:

@Component
public class CustomTcpProtocolAdapter implements ProtocolAdapter {
    
    @Override
    public ProtocolType supportProtocol() {
        return ProtocolType.TCP_CUSTOM;
    }
    
    @Override
    public InternalMessage decode(ByteBuf buffer) {
        // 读取魔数
        byte magic = buffer.readByte();
        if (magic != 0xAA) {
            throw new ProtocolNotSupportException("Invalid magic number");
        }
        
        // 读取版本号、命令字、数据长度
        byte version = buffer.readByte();
        short command = buffer.readShort();
        int length = buffer.readInt();
        
        // 读取数据
        byte[] data = new byte[length];
        buffer.readBytes(data);
        
        // 读取CRC
        short crc = buffer.readShort();
        
        // 转换为InternalMessage
        return InternalMessage.builder()
            .payload(new String(data, StandardCharsets.UTF_8))
            .timestamp(System.currentTimeMillis())
            .build();
    }
}

2. 设备认证与授权

JWT Token 认证

// 生成Token
String token = jwtTokenProvider.generateToken(deviceIdentity);

// 验证Token
boolean valid = jwtTokenProvider.validateToken(token);
DeviceIdentity identity = jwtTokenProvider.getIdentityFromToken(token);

ACL 权限控制

// 设置ACL规则
authorizationManager.setAcl(deviceId, Arrays.asList(
    new AclRule("sensors/+/temperature", Permission.SUBSCRIBE),
    new AclRule("devices/" + deviceId + "/commands", Permission.PUBLISH)
));

// 检查权限
boolean allowed = authorizationManager.checkPermission(deviceId, 
    "sensors/room1/temperature", Permission.PUBLISH);

3. 规则引擎

规则定义(JSON格式)

{
  "ruleId": "temp_alert",
  "name": "温度超限告警",
  "description": "当温度超过80度时发送告警",
  "condition": "temperature > 80 AND humidity < 30",
  "actions": [
    {
      "type": "NOTIFY",
      "config": {
        "method": "email",
        "target": "admin@example.com"
      }
    },
    {
      "type": "INVOKE_API",
      "config": {
        "url": "http://external-system.com/api/alert",
        "method": "POST"
      }
    }
  ],
  "enabled": true
}

内置函数

规则条件支持以下函数:

  • avg(field, window): 窗口平均值
  • max(field, window): 窗口最大值
  • min(field, window): 窗口最小值
  • count(field, window): 窗口计数
  • rate(field, window): 变化率

示例:

avg(temperature, 5m) > 75  // 5分钟平均温度超过75度
rate(temperature, 1m) > 10 // 1分钟内温度变化超过10度

4. 设备影子

设备影子用于存储设备的期望状态和上报状态,解决设备离线时的状态同步问题。

影子文档结构:

{
  "deviceId": "temp_sensor_001",
  "desired": {
    "targetTemp": 25,
    "mode": "auto",
    "updateTime": "2026-05-19 10:00:00"
  },
  "reported": {
    "currentTemp": 23,
    "batteryLevel": 85,
    "updateTime": "2026-05-19 10:05:00"
  },
  "metadata": {
    "desired": {
      "targetTemp": { "timestamp": 1716091200000 }
    },
    "reported": {
      "currentTemp": { "timestamp": 1716091500000 }
    }
  }
}

同步流程:

  1. 设备上线时,Core层调用 ShadowSyncService.syncOnConnect()
  2. 比较 desired 和 reported 的差异
  3. 将差异通过下行消息发送给设备
  4. 设备调整状态后上报新的 reported
  5. 持续同步直到两者一致

5. OTA 升级

升级流程

1. 上传固件 → Business层(OtaManager)
2. 创建升级任务 → 选择目标设备
3. 推送升级通知 → Routing层 → Access层 → 设备
4. 设备下载固件 → 验证签名 → 安装
5. 设备重启 → 上报新版本号
6. 更新升级状态 → 完成任务

API 示例

# 上传固件
POST http://localhost:8081/iot/api/ota/firmware
Content-Type: multipart/form-data
firmware=@firmware_v1.2.bin
version=1.2
description="修复温度传感器bug"

# 创建升级任务
POST http://localhost:8081/iot/api/ota/task
{
  "firmwareId": "fw_001",
  "deviceIds": ["temp_sensor_001", "temp_sensor_002"],
  "strategy": "BATCH",  // BATCH:批量, GRAY:灰度
  "batchSize": 10
}

# 查询升级状态
GET http://localhost:8081/iot/api/ota/task/{taskId}/status

配置说明

application.yml 完整配置

# 服务器配置
server:
  port: 8081
  servlet:
    context-path: /iot

# Spring配置
spring:
  application:
    name: iot-platform
  
  # Redis配置
  data:
    redis:
      host: localhost
      port: 6379
      password:  # Redis密码(如有)
      database: 0
      timeout: 3000ms
      lettuce:
        pool:
          max-active: 8
          max-idle: 8
          min-idle: 0
          max-wait: -1ms
  
  # JSON序列化配置
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

# MQTT服务器配置
mqtt:
  server:
    port: 1883              # MQTT监听端口
    boss-threads: 1         # Boss线程数
    worker-threads: 8       # Worker线程数
    heartbeat-timeout: 120  # 心跳超时时间(秒)
    version: MQTT_3_1_1     # MQTT协议版本

# Netty配置
netty:
  so:
    keep-alive: true        # TCP保活
    tcp-no-delay: true      # 禁用Nagle算法
    rcv-buf: 65536          # 接收缓冲区大小
    snd-buf: 65536          # 发送缓冲区大小

# 日志配置
logging:
  level:
    root: info
    com.jysemel.iot: debug  # IoT模块DEBUG级别
    io.netty: info
  pattern:
    console: '%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n'
  file:
    name: logs/iot-platform.log
    max-size: 100MB
    max-history: 30

# JWT配置
jwt:
  secret: iot-platform-secret-key-change-in-production-environment
  expiration: 86400000  # Token有效期24小时(毫秒)

# TDengine配置(可选)
tdengine:
  enabled: false
  url: jdbc:TAOS://localhost:6030/iot_db
  username: root
  password: taosdata

# MySQL配置(可选)
mysql:
  enabled: false
  url: jdbc:mysql://localhost:3306/iot_db?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT+8
  username: root
  password: root
  driver-class-name: com.mysql.cj.jdbc.Driver

生产环境建议

  1. 修改JWT密钥:

    jwt:
      secret: <使用强随机字符串>
    
  2. 启用TLS/SSL:

    mqtt:
      server:
        ssl-enabled: true
        cert-path: /path/to/cert.pem
        key-path: /path/to/key.pem
    
  3. 调整线程池:

    mqtt:
      server:
        boss-threads: 2
        worker-threads: 16  # 根据CPU核心数调整
    
  4. 启用数据库:

    tdengine:
      enabled: true
    mysql:
      enabled: true
    
  5. 配置日志滚动:

    logging:
      file:
        max-size: 500MB
        max-history: 90
    

开发指南

添加新协议支持

步骤1: 在 common 模块添加协议类型

// ProtocolType.java
public enum ProtocolType {
    MQTT, COAP, HTTP, TCP_CUSTOM, MY_NEW_PROTOCOL
}

步骤2: 在 access 模块实现协议适配器

@Component
public class MyNewProtocolAdapter implements ProtocolAdapter {
    
    @Override
    public ProtocolType supportProtocol() {
        return ProtocolType.MY_NEW_PROTOCOL;
    }
    
    @Override
    public InternalMessage decode(ByteBuf buffer) {
        // 实现解码逻辑
    }
    
    @Override
    public ByteBuf encode(InternalMessage message) {
        // 实现编码逻辑
    }
}

步骤3: 在 ProtocolDetector 添加识别规则

private ProtocolType detectProtocol(ByteBuf buffer) {
    byte firstByte = buffer.getByte(0);
    
    // 添加新协议识别
    if (firstByte == 0xBB) {
        return ProtocolType.MY_NEW_PROTOCOL;
    }
    
    // ... 其他协议判断
}

步骤4: 在 IotServerInitializer 添加Handler

@Override
protected void initChannel(Channel ch) {
    ChannelPipeline pipeline = ch.pipeline();
    
    // 添加协议识别
    pipeline.addLast("protocolDetector", new ProtocolDetector());
    
    // 根据协议添加对应Handler
    pipeline.addLast("myProtocolHandler", new MyProtocolHandler());
    
    // ... 其他Handler
}

添加新规则动作

步骤1: 定义动作类型

// ActionType.java
public enum ActionType {
    NOTIFY, INVOKE_API, STORE_DATA, MY_CUSTOM_ACTION
}

步骤2: 实现动作执行器

@Component
public class MyCustomActionExecutor implements ActionExecutor {
    
    @Override
    public boolean support(ActionType type) {
        return type == ActionType.MY_CUSTOM_ACTION;
    }
    
    @Override
    public void execute(Rule rule, InternalMessage message) {
        // 实现自定义动作逻辑
        log.info("执行自定义动作: {}", rule.getRuleId());
        
        // 例如:调用第三方服务
        String apiUrl = rule.getActionConfig().get("url");
        httpClient.post(apiUrl, buildPayload(message));
    }
}

步骤3: 在规则中使用

{
  "ruleId": "custom_rule",
  "condition": "temperature > 80",
  "actions": [
    {
      "type": "MY_CUSTOM_ACTION",
      "config": {
        "url": "http://third-party.com/api/action",
        "param1": "value1"
      }
    }
  ]
}

添加新的REST API

步骤1: 创建Controller

@RestController
@RequestMapping("/api/custom")
public class CustomController {
    
    @Autowired
    private CustomService customService;
    
    @GetMapping("/data")
    public ApiResponse<List<DataVO>> getData(@RequestParam String deviceId) {
        List<DataVO> data = customService.queryData(deviceId);
        return ApiResponse.success(data);
    }
    
    @PostMapping("/command")
    public ApiResponse<Void> sendCommand(@RequestBody CommandDTO command) {
        customService.sendCommand(command);
        return ApiResponse.success();
    }
}

步骤2: 实现Service

@Service
public class CustomService {
    
    @Autowired
    private DownstreamDispatcher downstreamDispatcher;
    
    public void sendCommand(CommandDTO command) {
        InternalMessage message = InternalMessage.builder()
            .deviceId(new DeviceIdentity(command.getProductKey(), command.getDeviceKey()))
            .topic("devices/" + command.getDeviceKey() + "/commands")
            .payload(JsonUtil.toJson(command.getData()))
            .build();
        
        downstreamDispatcher.dispatch(message.getDeviceId(), message);
    }
}

单元测试

测试协议编解码:

@SpringBootTest
class MqttProtocolAdapterTest {
    
    @Autowired
    private MqttProtocolAdapter adapter;
    
    @Test
    void testDecodeConnectPacket() {
        ByteBuf buffer = Unpooled.buffer();
        // 构造MQTT CONNECT报文
        buffer.writeBytes(mqttConnectBytes);
        
        InternalMessage message = adapter.decode(buffer);
        
        assertNotNull(message);
        assertEquals(ProtocolType.MQTT, message.getProtocolType());
    }
    
    @Test
    void testEncodePublishPacket() {
        InternalMessage message = InternalMessage.builder()
            .topic("test/topic")
            .payload("Hello")
            .build();
        
        ByteBuf buffer = adapter.encode(message);
        
        assertNotNull(buffer);
        assertTrue(buffer.readableBytes() > 0);
    }
}

测试主题匹配:

@SpringBootTest
class TrieTopicMatcherTest {
    
    @Autowired
    private TopicMatcher topicMatcher;
    
    @Test
    void testSingleWildcard() {
        topicMatcher.subscribe("sensors/+/temperature", "device1");
        
        List<String> matched = topicMatcher.match("sensors/room1/temperature");
        
        assertTrue(matched.contains("device1"));
    }
    
    @Test
    void testMultiWildcard() {
        topicMatcher.subscribe("sensors/#", "device1");
        
        List<String> matched = topicMatcher.match("sensors/room1/floor1/temperature");
        
        assertTrue(matched.contains("device1"));
    }
}

常见问题

1. 启动失败:端口被占用

错误信息:

Address already in use: bind

解决方案:

# 查看端口占用
netstat -ano | findstr :1883
taskkill /PID <pid> /F

# 或修改配置文件中的端口
mqtt:
  server:
    port: 1884

2. Redis连接失败

错误信息:

io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379

解决方案:

  1. 确认Redis已启动

    redis-cli ping
    # 应返回 PONG
    
  2. 检查配置

    spring:
      data:
        redis:
          host: localhost  # 确认地址正确
          port: 6379
    
  3. 如果不需要Redis,可以暂时禁用相关功能


3. 设备连接后立即断开

可能原因:

  • 心跳超时时间设置过短
  • 认证失败
  • ACL权限不足

排查步骤:

  1. 检查日志

    tail -f logs/iot-platform.log | grep "device"
    
  2. 验证认证信息

    // 确认JWT Token有效
    boolean valid = jwtTokenProvider.validateToken(token);
    
  3. 检查心跳配置

    mqtt:
      server:
        heartbeat-timeout: 120  # 增加超时时间
    

4. 消息无法送达

排查清单:

  • [ ] 设备是否在线(检查ConnectionManager)
  • [ ] 主题是否正确(检查TopicMatcher匹配结果)
  • [ ] 订阅关系是否存在(检查SubscriptionManager)
  • [ ] Channel是否有效(检查Channel.isActive())

调试代码:

// 检查设备连接状态
boolean online = connectionManager.isOnline(deviceId);
log.info("Device {} online: {}", deviceId, online);

// 检查订阅关系
List<Subscription> subs = subscriptionManager.getSubscriptionsByTopic(topic);
log.info("Subscriptions for {}: {}", topic, subs.size());

// 检查Channel状态
Channel channel = connectionManager.getChannel(deviceId);
if (channel != null) {
    log.info("Channel active: {}", channel.isActive());
}

5. 规则引擎不触发

常见原因:

  1. 规则未启用

    {
      "enabled": true  // 确保为true
    }
    
  2. 条件表达式语法错误

    // 正确: temperature > 80
    // 错误: temperature > eighty
    
  3. 字段名称不匹配

    // 规则条件: temperature > 80
    // 消息载荷: {"temp": 85}  // 字段名不一致
    

调试方法:

// 启用规则引擎调试日志
logging:
  level:
    com.jysemel.iot.core.rule: debug

// 手动触发规则测试
ruleEngine.testRule(ruleId, testMessage);

6. 性能优化建议

高并发场景:

  1. 调整Netty线程池

    mqtt:
      server:
        worker-threads: 16  # CPU核心数 * 2
    
  2. 启用连接池

    spring:
      data:
        redis:
          lettuce:
            pool:
              max-active: 20
              max-idle: 10
    
  3. 使用本地缓存

    // Caffeine缓存配置
    @Bean
    public Cache<String, DeviceInfo> deviceCache() {
        return Caffeine.newBuilder()
            .maximumSize(10000)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();
    }
    
  4. 异步处理

    @Async
    public void processMessage(InternalMessage message) {
        // 耗时操作
    }
    
  5. 批量写入

    // 批量写入时序数据
    timeSeriesWriter.batchWrite(messages);
    

7. 日志级别调整

开发环境:

logging:
  level:
    com.jysemel.iot: debug
    io.netty.handler.codec.mqtt: debug

生产环境:

logging:
  level:
    com.jysemel.iot: info
    io.netty: warn

临时调试特定模块:

# 通过Actuator动态调整(需引入spring-boot-starter-actuator)
curl -X POST http://localhost:8081/iot/actuator/loggers/com.jysemel.iot.access \
  -H "Content-Type: application/json" \
  -d '{"configuredLevel": "DEBUG"}'

附录

A. 项目结构总览

mqtt-boot/
├── pom.xml                          # 父POM
├── README.md                        # 项目说明
├── TUTORIAL.md                      # 本教程
├── sample-00/                       # 示例项目
│   ├── pom.xml
│   ├── sample-00-iot-common/        # 公共模块
│   │   ├── pom.xml
│   │   ├── README.md
│   │   └── src/main/java/com/jysemel/iot/common/
│   ├── sample-00-iot-access/        # 接入层
│   │   ├── pom.xml
│   │   ├── README.md
│   │   └── src/main/java/com/jysemel/iot/access/
│   ├── sample-00-iot-routing/       # 路由层
│   │   ├── pom.xml
│   │   ├── README.md
│   │   └── src/main/java/com/jysemel/iot/routing/
│   ├── sample-00-iot-core/          # 核心层
│   │   ├── pom.xml
│   │   ├── README.md
│   │   └── src/main/java/com/jysemel/iot/core/
│   ├── sample-00-iot-business/      # 业务层
│   │   ├── pom.xml
│   │   ├── README.md
│   │   └── src/main/java/com/jysemel/iot/business/
│   └── sample-00-iot-bootstrap/     # 启动模块
│       ├── pom.xml
│       ├── README.md
│       └── src/
│           ├── main/java/com/jysemel/iot/bootstrap/
│           └── main/resources/
│               └── application.yml
└── mqtt-simulator/                  # MQTT模拟器(测试工具)
    └── pom.xml

B. 依赖关系图

bootstrap (依赖所有模块)
    ├── business
    │   ├── core
    │   │   ├── routing
    │   │   │   └── common
    │   │   └── access
    │   │       └── common
    │   └── common
    └── common

C. 端口说明

端口用途配置项
8081REST APIserver.port
1883MQTT服务器mqtt.server.port
6379Redisspring.data.redis.port

D. 常用命令

# 编译项目
mvn clean install

# 跳过测试编译
mvn clean install -DskipTests

# 运行指定模块
cd sample-00/sample-00-iot-bootstrap
mvn spring-boot:run

# 打包
mvn clean package

# 查看依赖树
mvn dependency:tree

# 清理构建产物
mvn clean

结语

本教程详细介绍了 MQTT Boot IoT 平台的架构、功能和使用方法。通过这个框架,你可以快速构建自己的物联网平台,支持海量设备接入和实时数据处理。

最近更新: 2026/5/20 08:59
Contributors: kcnf
Prev
模块化设计
Next
编写可托管的 Netty 服务器启动器