砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 入门

        • 基于netty构建入门
        • 理解粘包/拆包
        • 编解码器机制与自定义协议
        • 心跳和ack机制
        • mqtt服务demo演示
        • mqtt服务协议支持
        • mqtt服务udp支持
      • 协议规范

        • mqtt协议规范(发布/订阅模式)
        • mqtt协议规范(轻量级二进制协议)
        • mqtt协议规范(三种 QoS 等级)
        • mqtt协议规范(主题通配符订阅)
        • mqtt协议规范(遗嘱与保留消息)
      • 报文结构

        • 控制报文结构(报文分类)
        • 控制报文结构(连接与握手)
        • 控制报文结构(发布与接收)
      • 核心实战

        • 核心实战(握手与认证)
        • 核心实战(心跳保活机制)
        • 核心实战(会话管理)
        • 核心实战(安全)
    • mqtt-模拟器

      • 集成Paho

        • 设备模拟器设计
        • 设备模拟器演示
        • Paho拆解入门
        • Paho拆解核心
        • Paho拆解高性能
        • 其他客户端框架比较
      • NetAssist

        • 设备模拟器设计
    • netty-mqtt-boot

      • 模块化设计
      • 物联网一体化设计
      • 统一接入层
      • 消息路由与流转层
      • 核心服务层
      • 业务应用层
      • 整体项目管理
      • 测试脚手架
      • 兼容支持
    • mqtt-压测

      • mqtt-jmeter

        • 模块化设计
      • ‌emqtt-bench

        • 模块化设计
    • mqtt-规则引擎

      • MQTT规则引擎
      • ice

        • 模块化设计
      • Aviator

        • 模块化设计
      • Drools

        • 模块化设计
  • 物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解高性能
    • 源码(mqtt-simulator-sample02)
    • 搭载 emqx 公共免费的broker
    • 相关依赖
    • 同步 vs 异步
    • 基于MqttAsyncClient构建
      • 部分源码
    • 演示结果

物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解高性能

MqttAsyncClient 非阻塞、高吞吐MQTT 应用

源码(mqtt-simulator-sample02)

https://gitee.com/kcnf-iot/mqtt-simulator

搭载 emqx 公共免费的broker

https://www.emqx.com/zh/mqtt/public-mqtt5-broker

相关依赖

<!-- MQTT Client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>

同步 vs 异步

维度同步 API(MqttClient)异步 API(MqttAsyncClient)
阻塞性阻塞等待执行完成非阻塞,操作完回调通知
线程模型当前线程等待底层 IO 线程,不阻塞主线程
高并发性能低,大量请求会阻塞极高,适合高吞吐、多设备
适用场景简单设备、单连接、低频次网关、高吞吐、消息转发、服务端
核心回调仅消息回调IMqttActionListener 监听所有操作结果

基于MqttAsyncClient构建

部分源码

package com.jysemel.iot;

import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class MqttAsyncGateway {

    private MqttAsyncClient upClient;  // 上行:收设备消息
    private MqttAsyncClient downClient;// 下行:转发消息

    // ====================== 频率控制(每秒最多1条,可自己改) ======================
    private static final long PUBLISH_INTERVAL = 2000; // 2秒一条
    private long lastSendTime = 0;

    public static void main(String[] args) {
        new MqttAsyncGateway().start();
    }

    public void start() {
        try {
            // 创建两个异步客户端
            upClient = new MqttAsyncClient(MqttConst.UP_BROKER, MqttConst.UP_CLIENT_ID, new MemoryPersistence());
            downClient = new MqttAsyncClient(MqttConst.DOWN_BROKER, MqttConst.DOWN_CLIENT_ID, new MemoryPersistence());

            MqttConnectionOptions options = new MqttConnectionOptions();
            options.setKeepAliveInterval(10);
            options.setCleanStart(true);

            // 连接上行
            upClient.connect(options, null, new MqttActionListener() {
                @Override
                public void onSuccess(IMqttToken token) {
                    System.out.println("✅ 上行连接成功");
                    subscribeDeviceTopic();
                }

                @Override
                public void onFailure(IMqttToken token, Throwable exception) {
                    System.err.println("❌ 上行连接失败");
                }
            });

            // 连接下行
            downClient.connect(options, null, new MqttActionListener() {
                @Override
                public void onSuccess(IMqttToken token) {
                    System.out.println("✅ 下行连接成功");
                }

                @Override
                public void onFailure(IMqttToken token, Throwable exception) {
                    System.err.println("❌ 下行连接失败");
                }
            });

            // 消息回调
            upClient.setCallback(new MqttCallback() {
                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    String content = new String(message.getPayload()).trim();

                    // ====================== 频率控制核心 ======================
                    long now = System.currentTimeMillis();
                    if (now - lastSendTime < PUBLISH_INTERVAL) {
                        System.out.println("⏱ 频率限制,跳过转发:" + content);
                        return;
                    }
                    lastSendTime = now;

                    System.out.println("\n📥 收到:" + topic + " -> " + content);
                    forwardToDownstream(content); // 转发
                }

                @Override
                public void disconnected(MqttDisconnectResponse disconnectResponse) {}
                @Override
                public void mqttErrorOccurred(MqttException exception) {}
                @Override
                public void deliveryComplete(IMqttToken token) {}
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {}
                @Override
                public void authPacketArrived(int reasonCode, MqttProperties properties) {}
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 订阅
    private void subscribeDeviceTopic() {
        try {
            upClient.subscribe(MqttConst.DEVICE_TOPIC, 1, null, new MqttActionListener() {
                @Override
                public void onSuccess(IMqttToken token) {
                    System.out.println("✅ 订阅成功:" + MqttConst.DEVICE_TOPIC);
                }

                @Override
                public void onFailure(IMqttToken token, Throwable exception) {
                    System.err.println("❌ 订阅失败");
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 异步转发
    private void forwardToDownstream(String content) {
        try {
            MqttMessage msg = new MqttMessage(content.getBytes());
            msg.setQos(1);

            downClient.publish(MqttConst.FORWARD_TOPIC, msg, null, new MqttActionListener() {
                @Override
                public void onSuccess(IMqttToken token) {
                    System.out.println("📤 转发完成:" + content);
                }

                @Override
                public void onFailure(IMqttToken token, Throwable exception) {
                    System.err.println("❌ 转发失败");
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

演示结果

img

最近更新: 2026/6/3 11:51
Contributors: kcnf
Prev
Paho拆解核心
Next
其他客户端框架比较