物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解核心点
连接配置、发布 / 订阅、回调、异常处理、断线重连、温湿度模拟场景
源码(mqtt-simulator-sample02)
https://gitee.com/kcnf-iot/mqtt-simulator
搭载 emqx 公共免费的broker
https://www.emqx.com/zh/mqtt/public-mqtt5-broker
相关依赖
<!-- MQTT Client -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
配置核心
MqttConnectionOptions 连接配置(核心)
所有连接参数都通过这个类设置,同步 / 异步通用:
| 方法 | 作用 | 说明 |
|---|---|---|
| setKeepAliveInterval (秒) | 心跳间隔 | 客户端定时发心跳,防止连接被断开 |
| setConnectionTimeout (秒) | 连接超时 | 连接服务器超时时间,0 = 永不超时 |
| setCleanStart(true/false) | 清理会话 | true = 断开后清除订阅 / 消息;false = 离线消息持久化 |
| setUserName/setPassword | 认证 | 服务器需要鉴权时使用 |
| setAutomaticReconnect | 自动重连 | 同步 API 推荐手动重连,更可控 |
消息发布核心
- QoS 0:最多一次(发完即忘,不保证到达)
- QoS 1:至少一次(保证到达,可能重复)
- QoS 2: exactly once( exactly once,无重复无丢失)
- Retained:保留消息,新订阅者立即收到最后一条消息
订阅主题
- 单主题:
sensor/temp - 通配符
+:单层匹配sensor/+/data - 通配符
#:多层匹配sensor/# - 支持多主题 + 不同 QoS 同时订阅
MqttCallback 回调接口(必须实现)
connectionLost:连接丢失(触发重连)messageArrived:收到消息(核心)deliveryComplete:消息发布完成确认
异常处理
所有 MQTT 操作都会抛出
MqttException,通过getReasonCode()获取错误码排查问题。
源码展示
package com.jysemel.iot;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import java.nio.charset.StandardCharsets;
import java.util.Random;
/**
* 规范 + 高性能 + 生产可靠
* MQTT v5 发布者(自动重连、心跳保活、无资源泄漏)
*/
public class MqttSensorPublisherReConnect {
private static MqttClient client;
private static MqttConnectionOptions options;
private static final Random random = new Random();
private static final int KEEP_ALIVE = 10; // 心跳10秒(规范)
private static final int PUB_INTERVAL = 2000; // 发布间隔2秒
private static final int RECONNECT_INTERVAL = 2000;// 重连间隔2秒(避免频繁攻击服务器)
public static void main(String[] args) {
startPublishLoop();
simulateAbortDisconnect(); // 测试用:模拟异常断开
}
// ====================== 【规范】发布主循环(高性能、低CPU) ======================
private static void startPublishLoop() {
new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
// 【规范】连接状态校验 + 自动重连
if (client == null || !client.isConnected()) {
closeClient(); // 【规范】先释放旧资源
connect(); // 重建连接
}
// 【规范】发布消息
publishSensorData();
} catch (Exception e) {
System.err.println("【异常】连接异常,准备重连...");
client = null; // 标记重连
}
// 【性能】固定间隔,降低CPU占用
sleep(PUB_INTERVAL);
}
}, "mqtt-publish-thread").start();
}
// ====================== 【规范】发布业务 ======================
private static void publishSensorData() throws MqttException {
String temp = "温度:" + String.format("%.1f", 20 + random.nextFloat() * 10);
MqttMessage msg = new MqttMessage(temp.getBytes(StandardCharsets.UTF_8));
msg.setQos(1); // 【规范】工业级QoS
msg.setRetained(false); // 【规范】传感器数据不保留
client.publish(MqttConst.TOPIC_TEMP, msg);
System.out.println("已发布:" + temp);
}
// ====================== 【规范】连接方法 ======================
private static void connect() {
try {
client = new MqttClient(MqttConst.BROKER, MqttConst.PUBLISH_CLIENT_ID, new MemoryPersistence());
options = new MqttConnectionOptions();
// MQTT v5 标准连接配置
options.setKeepAliveInterval(KEEP_ALIVE);
options.setConnectionTimeout(5);
options.setCleanStart(true);
options.setUserName(MqttConst.USERNAME);
options.setPassword(MqttConst.PASSWORD.getBytes(StandardCharsets.UTF_8));
client.connect(options);
System.out.println("【发布者】连接成功 ✅");
} catch (Exception e) {
System.err.println("连接失败," + (RECONNECT_INTERVAL / 1000) + "秒后重试...");
sleep(RECONNECT_INTERVAL);
}
}
// ====================== 【规范】关闭客户端(防止资源泄漏) ======================
private static void closeClient() {
if (client != null) {
try {
if (client.isConnected()) {
client.disconnect();
}
client.close();
} catch (MqttException ignored) {}
}
}
// ====================== 【测试】模拟异常断开 ======================
private static void simulateAbortDisconnect() {
new Thread(() -> {
sleep(10000);
System.err.println("\n===== 模拟异常断开 =====");
try {
if (client != null && client.isConnected()) {
client.disconnectForcibly(0);
}
} catch (Exception ignored) {}
}, "mqtt-simulate-disconnect").start();
}
// ====================== 【工具】睡眠方法 ======================
private static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
演示结果

核心点总结
- 同步 API:MqttClient 是核心类,所有操作阻塞执行,简单易控
- 连接配置:MqttConnectionOptions 管理心跳、超时、会话、认证
- 发布:支持单条 / 批量、QoS0/1/2、保留消息
- 订阅:支持单主题、多主题、通配符(+/#)、自定义 QoS
- 回调:MqttCallback 处理连接丢失、消息接收、发布确认
- 异常:捕获 MqttException,通过错误码定位问题
- 重连:连接丢失后循环重连 + 重新订阅,保证服务可用性
