物联网实战:Spring Boot + Netty 搭建 MQTT | MQTT 设备模拟器
IoT 设备模拟器 - 支持 MQTT、TCP、UDP、CoAP、HTTP 多种协议



项目简介
定位
mqtt-simulator 是一个用于模拟 IoT 设备的测试工具,帮助开发者:
- 模拟多种协议(MQTT、TCP、UDP、CoAP、HTTP)的设备终端
- 通过 REST API 动态管理设备生命周期
- 进行功能测试、压力测试和协议兼容性验证
- 作为 sample-00 项目的客户端配套测试使用
核心功能
| 功能 | 说明 |
|---|---|
| 多协议支持 | MQTT (v3.1.1/v5.0)、TCP、UDP、CoAP、HTTP/WebSocket |
| 设备管理 | 动态创建设备、启动、停止、批量操作 |
| 消息发送 | 支持文本、二进制(Base64)、JSON 格式 |
| 主题订阅 | 订阅/取消订阅、接收服务器下推命令 |
| 消息日志 | 记录发送/接收消息历史,支持查询和清理 |
| 健康检查 | 设备状态监控、连接状态检测 |
| API 文档 | 集成 Swagger UI,可视化 API 操作 |
技术栈
Spring Boot 3.5 + Java 21
├── MQTT Client: Eclipse Paho MQTT v5
├── Protocol: TCP / UDP / CoAP / HTTP
├── Database: SQLite (本地存储)
├── API Doc: SpringDoc OpenAPI (Swagger UI)
└── Build: Maven
模块结构
com.jysemel.iot.simulator
├── protocol/ # 协议实现
│ ├── mqtt/ # MQTT/MQTTS 协议
│ ├── tcp/ # TCP 自定义协议
│ ├── udp/ # UDP 协议
│ ├── coap/ # CoAP 协议
│ └── http/ # HTTP/WebSocket 协议
├── controller/ # REST API 控制器
│ ├── device/ # 设备管理
│ ├── message/ # 消息测试
│ ├── sub/ # 订阅管理
│ ├── log/ # 日志查询
│ └── health/ # 健康检查
├── model/ # 数据模型
├── doa/ # 数据访问层
└── config/ # 配置类
快速开始
# 1. 启动项目
cd mqtt-simulator
mvn spring-boot:run
# 2. 访问 Swagger UI
http://localhost:8082/swagger-ui.html
# 3. 常用 API
# 创建设备: POST /api/device/add
# 启动设备: POST /api/device/start
# 发布消息: POST /api/message/publish
# 订阅主题: POST /api/sub/subscribe
1. 项目简介
1.1 这是什么?
这是一个模拟 IoT 设备的软件,可以:
- ✅ 模拟温度传感器、智能灯、开关等设备
- ✅ 通过 MQTT 协议发送/接收消息
- ✅ 提供 Web 界面实时监控
- ✅ 一键启动多个设备进行压力测试
1.2 为什么需要模拟器?
| 场景 | 说明 |
|---|---|
| 🧪 开发测试 | 没有真实硬件时,用软件模拟设备测试服务器 |
| 🎓 学习MQTT | 直观理解 MQTT 的工作原理 |
| 📊 压力测试 | 模拟100个设备同时在线,测试服务器性能 |
| 🐛 调试问题 | 复现特定场景,排查bug |
1.3 技术栈
后端:Spring Boot 3.x + Java 21
MQTT客户端:Eclipse Paho MQTT v5
实时通信:WebSocket (STOMP协议)
JSON处理:Fastjson2
简化代码:Lombok
API文档:SpringDoc OpenAPI (Swagger UI) ⭐新增
2. 快速开始
2.1 环境要求
# 检查Java版本(需要21或以上)
java -version
# 检查Maven(需要3.6+)
mvn -version
2.2 启动步骤
步骤1:准备MQTT服务器
你需要一个MQTT Broker(服务器),推荐选择:
选项A:使用公共测试服务器(最简单)
地址:tcp://broker.emqx.io:1883
无需安装,直接连接
选项B:本地安装EMQX(推荐学习用)
# macOS
brew install emqx
# Windows(下载安装包)
# https://www.emqx.io/zh/downloads
# 启动EMQX
emqx start
# 访问管理界面
# http://localhost:18083
# 默认账号:admin/public
选项C:Docker运行
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest
步骤2:编译项目
cd mqtt-simulator
mvn clean package -DskipTests
步骤3:启动应用
java -jar target/mqtt-simulator-1.0-SNAPSHOT.jar
看到以下输出表示成功:
╔══════════════════════════════════════════════════╗
║ MQTT Device Simulator Web v1.0 ║
║ IoT设备模拟测试平台 ║
║ ║
║ 访问地址: http://localhost:8080 ║
╚══════════════════════════════════════════════════╝
步骤4:访问Web界面
打开浏览器访问:http://localhost:8080
3. 核心概念
3.1 MQTT 是什么?
MQTT = Message Queuing Telemetry Transport(消息队列遥测传输)
💡 类比理解:MQTT就像"邮局系统"
- 设备 = 寄信人/收信人
- Broker(服务器) = 邮局
- Topic(主题) = 收件地址
- Message(消息) = 信件内容
3.2 核心术语
| 术语 | 英文 | 说明 | 类比 |
|---|---|---|---|
| Broker | Broker | MQTT服务器,转发消息 | 邮局 |
| Client | Client | 连接服务器的设备 | 寄信人 |
| Topic | Topic | 消息分类标签 | 收件地址 |
| Publish | Publish | 发送消息到主题 | 寄信 |
| Subscribe | Subscribe | 监听某个主题的消息 | 订阅报纸 |
| QoS | Quality of Service | 消息送达保证级别 | 快递类型 |
3.3 QoS 等级详解
QoS 0 - At most once(最多一次)
├─ 特点:发完不管,可能丢失
├─ 速度:最快
└─ 适用:传感器数据(丢几条没关系)
QoS 1 - At least once(至少一次)⭐ 推荐
├─ 特点:保证送达,可能重复
├─ 速度:中等
└─ 适用:大多数场景
QoS 2 - Exactly once(恰好一次)
├─ 特点:保证送达且不重复
├─ 速度:最慢
└─ 适用:金融交易等关键业务
3.4 主题设计
本项目使用的主题规范:
devices/{产品Key}/{设备Key}/{消息类型}
示例:
devices/smart_light/light_001/telemetry # 遥测数据上报
devices/smart_light/light_001/commands # 接收命令
devices/smart_light/light_001/status # 状态变化
通配符支持:
+/+ 单层通配符:
devices/+/light_001/telemetry → 匹配所有产品的light_001设备
# 多层通配符:
devices/# → 匹配devices下的所有消息
4. 代码结构
4.1 数据流向
【设备上报数据流程】
DeviceSimulator (定时任务)
↓ 生成遥测数据
MqttClientWrapper (发布消息)
↓ MQTT协议
MQTT Broker (转发消息)
↓
IoT平台 (接收并处理)
【服务器下发命令流程】
用户 (Web界面点击"开灯")
↓ HTTP请求
Controller (接收请求)
↓
Service (找到对应设备)
↓
MqttClientWrapper (发布命令到主题)
↓ MQTT协议
DeviceSimulator (收到消息)
↓ 执行回调
WebSocket (推送结果到前端)
↓
用户 (看到"开灯成功")
6. API 接口
🌐 Swagger UI(推荐⭐)
访问地址: http://localhost:8082/swagger-ui.html
优势:
- ✅ 可视化界面,无需安装任何工具
- ✅ 自动同步代码变更
- ✅ 一键测试API(Try it out)
- ✅ 查看请求/响应示例
- ✅ 支持参数自动补全
6.1 设备管理 API
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /api/device-management/start | 创建设备 |
| POST | /api/device-management/{key}/stop | 停止设备 |
| GET | /api/device-management/list | 获取列表 |
| GET | /api/device-management/statistics | 获取统计 |
6.2 消息测试 API
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /api/message-test/publish | 发布消息 |
| POST | /api/message-test/subscribe | 订阅主题 |
6.3 场景配置 API
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /api/scenario-config/demo | 演示场景(3个设备) |
| POST | /api/scenario-config/batch?count=10 | 批量场景 |
| POST | /api/scenario-config/stress?count=50 | 压力测试 |
| POST | /api/scenario-config/business/{type} | 业务场景 |
6.4 健康检查 API ⭐新增
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /api/health/mqtt?brokerUrl=xxx | 检查MQTT服务器连接 |
| GET | /api/health/app | 获取应用健康状态 |
| GET | /api/health/full?brokerUrl=xxx | 综合健康检查 |
| GET | /api/health/alive | 快速存活检查 |
使用示例:
# 1. 检查MQTT服务器是否启动
curl "http://localhost:8082/api/health/mqtt?brokerUrl=tcp://localhost:1883"
# 响应(成功):
{
"serverUrl": "tcp://localhost:1883",
"status": "UP",
"healthy": true,
"message": "MQTT服务器连接成功",
"responseTimeMs": 15
}
# 响应(失败):
{
"serverUrl": "tcp://localhost:1883",
"status": "DOWN",
"healthy": false,
"message": "MQTT服务器连接失败: Connection refused",
"responseTimeMs": 5023
}
# 2. 查看应用状态
curl http://localhost:8082/api/health/app
# 3. 综合健康检查
curl "http://localhost:8082/api/health/full?brokerUrl=tcp://localhost:1883"
应用场景:
- 🔍 启动前检查:确认MQTT服务器是否正常
- 📊 系统监控:定时检测服务器状态
- 🐛 故障排查:快速定位问题根源
- 🤖 自动化运维:集成到监控脚本中
6.5 WebSocket 主题
| 主题 | 方向 | 说明 |
|---|---|---|
/topic/devices | 服务器→前端 | 设备列表更新 |
/topic/messages | 服务器→前端 | 消息收发通知 |
/topic/statistics | 服务器→前端 | 统计数据更新 |
