物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解高性能
MqttAsyncClient 非阻塞、高吞吐MQTT 应用
源码(mqtt-simulator-sample02)
https://gitee.com/kcnf-iot/mqtt-simulator
使用 EMQX 提供的免费公共 Broker
https://www.emqx.com/zh/mqtt/public-mqtt5-broker
相关依赖
<!-- MQTT Client -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
同步 vs 异步
| 维度 | 同步 API(MqttClient) | 异步 API(MqttAsyncClient) |
|---|---|---|
| 阻塞性 | 阻塞等待执行完成 | 非阻塞,操作完回调通知 |
| 线程模型 | 当前线程等待 | 底层 IO 线程,不阻塞主线程 |
| 高并发性能 | 低,大量请求会阻塞 | 极高,适合高吞吐、多设备 |
| 适用场景 | 简单设备、单连接、低频次 | 网关、高吞吐、消息转发、服务端 |
| 核心回调 | 仅消息回调 | MqttActionListener(v3 客户端中名为 IMqttActionListener)监听每次操作的结果 |
基于MqttAsyncClient构建
部分源码
package com.jysemel.iot;
import java.nio.charset.StandardCharsets;

import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
/**
 * Demo MQTT v5 gateway built on Paho's non-blocking {@code MqttAsyncClient}.
 *
 * <p>An uplink client subscribes to {@code MqttConst.DEVICE_TOPIC}. Each received message that
 * passes the rate limit is republished by a downlink client to {@code MqttConst.FORWARD_TOPIC}
 * at QoS 1. Connect, subscribe and publish outcomes are reported on stdout/stderr through
 * action listeners instead of blocking the calling thread.
 */
public class MqttAsyncGateway {

    /** Minimum gap between two forwarded messages, in milliseconds (one message per 2 seconds). */
    private static final long PUBLISH_INTERVAL = 2000;

    /** Uplink client: receives device messages. */
    private MqttAsyncClient upClient;

    /** Downlink client: republishes the accepted messages. */
    private MqttAsyncClient downClient;

    /**
     * Wall-clock time (ms) at which the last message was accepted for forwarding; 0 until then.
     * NOTE(review): only read and written inside messageArrived; this assumes Paho invokes that
     * callback from a single thread - confirm before sharing this field more widely.
     */
    private long lastSendTime = 0;

    public static void main(String[] args) {
        new MqttAsyncGateway().start();
    }

    /**
     * Creates both clients, registers the uplink message handler and starts both connections.
     * The connect calls return immediately; their results arrive through the action listeners.
     * Any exception raised while setting up is printed and otherwise ignored (best effort).
     */
    public void start() {
        try {
            upClient = new MqttAsyncClient(
                    MqttConst.UP_BROKER, MqttConst.UP_CLIENT_ID, new MemoryPersistence());
            downClient = new MqttAsyncClient(
                    MqttConst.DOWN_BROKER, MqttConst.DOWN_CLIENT_ID, new MemoryPersistence());

            // Register the handler BEFORE connecting: connect() is asynchronous, so a message
            // could otherwise be delivered while no callback is installed yet.
            upClient.setCallback(newUplinkCallback());

            MqttConnectionOptions options = new MqttConnectionOptions();
            options.setKeepAliveInterval(10);
            options.setCleanStart(true);

            // Uplink: subscribe to the device topic as soon as the connection is established.
            upClient.connect(options, null,
                    actionListener("✅ 上行连接成功", "❌ 上行连接失败", this::subscribeDeviceTopic));
            // Downlink: nothing to do on success beyond reporting it.
            downClient.connect(options, null,
                    actionListener("✅ 下行连接成功", "❌ 下行连接失败", null));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the uplink callback: decodes each payload as UTF-8, applies the rate limit and
     * forwards the accepted messages downstream.
     */
    private MqttCallback newUplinkCallback() {
        return new MqttCallback() {
            @Override
            public void messageArrived(String topic, MqttMessage message) {
                // Decode with an explicit charset; the platform default may differ per host.
                String content = new String(message.getPayload(), StandardCharsets.UTF_8).trim();

                // Rate limit: drop anything that arrives within PUBLISH_INTERVAL of the last
                // accepted message.
                long now = System.currentTimeMillis();
                if (now - lastSendTime < PUBLISH_INTERVAL) {
                    System.out.println("⏱ 频率限制,跳过转发:" + content);
                    return;
                }
                lastSendTime = now;

                System.out.println("\n📥 收到:" + topic + " -> " + content);
                forwardToDownstream(content);
            }

            @Override
            public void disconnected(MqttDisconnectResponse disconnectResponse) {
                // Surface the disconnect instead of silently ignoring it.
                System.err.println("⚠ 上行连接断开:" + disconnectResponse);
            }

            @Override
            public void mqttErrorOccurred(MqttException exception) {
                // Surface client errors instead of silently ignoring them.
                System.err.println("⚠ 上行客户端错误:" + exception);
            }

            @Override
            public void deliveryComplete(IMqttToken token) {}

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {}

            @Override
            public void authPacketArrived(int reasonCode, MqttProperties properties) {}
        };
    }

    /** Subscribes the uplink client to {@code MqttConst.DEVICE_TOPIC} at QoS 1. */
    private void subscribeDeviceTopic() {
        try {
            upClient.subscribe(MqttConst.DEVICE_TOPIC, 1, null,
                    actionListener("✅ 订阅成功:" + MqttConst.DEVICE_TOPIC, "❌ 订阅失败", null));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Publishes {@code content} (UTF-8 encoded) to {@code MqttConst.FORWARD_TOPIC} at QoS 1
     * without blocking. The message is dropped with an error line when the downlink is not
     * connected, which can happen while its asynchronous connect is still in flight.
     *
     * @param content text payload to forward
     */
    private void forwardToDownstream(String content) {
        if (downClient == null || !downClient.isConnected()) {
            System.err.println("❌ 转发失败:下行未连接");
            return;
        }
        try {
            MqttMessage msg = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
            msg.setQos(1);
            downClient.publish(MqttConst.FORWARD_TOPIC, msg, null,
                    actionListener("📤 转发完成:" + content, "❌ 转发失败", null));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates an action listener that prints {@code successMessage} to stdout on success and
     * {@code failureMessage} (followed by the cause, when one is supplied) to stderr on failure.
     *
     * @param successMessage line printed when the operation succeeds
     * @param failureMessage line printed when the operation fails
     * @param afterSuccess   optional follow-up run after the success line; may be {@code null}
     */
    private static MqttActionListener actionListener(
            String successMessage, String failureMessage, Runnable afterSuccess) {
        return new MqttActionListener() {
            @Override
            public void onSuccess(IMqttToken token) {
                System.out.println(successMessage);
                if (afterSuccess != null) {
                    afterSuccess.run();
                }
            }

            @Override
            public void onFailure(IMqttToken token, Throwable exception) {
                // Keep the cause visible; the bare message alone gives no hint why it failed.
                System.err.println(
                        exception == null ? failureMessage : failureMessage + ":" + exception);
            }
        };
    }
}
演示结果

