砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 入门

        • 基于netty构建入门
        • 理解粘包/拆包
        • 编解码器机制与自定义协议
        • 心跳和ack机制
        • mqtt服务demo演示
        • mqtt服务协议支持
        • mqtt服务udp支持
      • 协议规范

        • mqtt协议规范(发布/订阅模式)
        • mqtt协议规范(轻量级二进制协议)
        • mqtt协议规范(三种 QoS 等级)
        • mqtt协议规范(主题通配符订阅)
        • mqtt协议规范(遗嘱与保留消息)
      • 报文结构

        • 控制报文结构(报文分类)
        • 控制报文结构(连接与握手)
        • 控制报文结构(发布与接收)
      • 核心实战

        • 核心实战(握手与认证)
        • 核心实战(心跳保活机制)
        • 核心实战(会话管理)
        • 核心实战(安全)
    • mqtt-模拟器

      • 集成Paho

        • 设备模拟器设计
        • 设备模拟器演示
        • Paho拆解入门
        • Paho拆解核心
        • Paho拆解高性能
        • 其他客户端框架比较
      • NetAssist

        • 设备模拟器设计
    • netty-mqtt-boot

      • 模块化设计
      • 物联网一体化设计
      • 统一接入层
      • 消息路由与流转层
      • 核心服务层
      • 业务应用层
      • 整体项目管理
      • 测试脚手架
      • 兼容支持
    • mqtt-压测

      • mqtt-jmeter

        • 模块化设计
      • ‌emqtt-bench

        • 模块化设计
    • mqtt-规则引擎

      • MQTT规则引擎
      • ice

        • 模块化设计
      • Aviator

        • 模块化设计
      • Drools

        • 模块化设计
  • 物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解核心点
    • 源码(mqtt-simulator-sample02)
    • 搭载 emqx 公共免费的broker
    • 相关依赖
    • 配置核心
      • MqttConnectionOptions 连接配置(核心)
      • 消息发布核心
      • 订阅主题
      • MqttCallback 回调接口(必须实现)
      • 异常处理
    • 源码展示
    • 演示结果
    • 核心点总结

物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解核心点

连接配置、发布 / 订阅、回调、异常处理、断线重连、温湿度模拟场景

源码(mqtt-simulator-sample02)

https://gitee.com/kcnf-iot/mqtt-simulator

搭载 emqx 公共免费的broker

https://www.emqx.com/zh/mqtt/public-mqtt5-broker

相关依赖

<!-- MQTT Client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>

配置核心

MqttConnectionOptions 连接配置(核心)

所有连接参数都通过这个类设置,同步 / 异步通用:

方法作用说明
setKeepAliveInterval (秒)心跳间隔客户端定时发心跳,防止连接被断开
setConnectionTimeout (秒)连接超时连接服务器超时时间,0 = 永不超时
setCleanStart(true/false)清理会话true = 断开后清除订阅 / 消息;false = 离线消息持久化
setUserName/setPassword认证服务器需要鉴权时使用
setAutomaticReconnect自动重连同步 API 推荐手动重连,更可控

消息发布核心

  • QoS 0:最多一次(发完即忘,不保证到达)
  • QoS 1:至少一次(保证到达,可能重复)
  • QoS 2: exactly once( exactly once,无重复无丢失)
  • Retained:保留消息,新订阅者立即收到最后一条消息

订阅主题

  • 单主题:sensor/temp
  • 通配符 +:单层匹配 sensor/+/data
  • 通配符 #:多层匹配 sensor/#
  • 支持多主题 + 不同 QoS 同时订阅

MqttCallback 回调接口(必须实现)

  • connectionLost:连接丢失(触发重连)
  • messageArrived:收到消息(核心)
  • deliveryComplete:消息发布完成确认

异常处理

所有 MQTT 操作都会抛出 MqttException,通过 getReasonCode() 获取错误码排查问题。

源码展示

package com.jysemel.iot;

import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;

import java.nio.charset.StandardCharsets;
import java.util.Random;

/**
 * 规范 + 高性能 + 生产可靠
 * MQTT v5 发布者(自动重连、心跳保活、无资源泄漏)
 */
public class MqttSensorPublisherReConnect {

    private static MqttClient client;
    private static MqttConnectionOptions options;

    private static final Random random = new Random();
    private static final int KEEP_ALIVE = 10;          // 心跳10秒(规范)
    private static final int PUB_INTERVAL = 2000;      // 发布间隔2秒
    private static final int RECONNECT_INTERVAL = 2000;// 重连间隔2秒(避免频繁攻击服务器)

    public static void main(String[] args) {
        startPublishLoop();
        simulateAbortDisconnect(); // 测试用:模拟异常断开
    }

    // ====================== 【规范】发布主循环(高性能、低CPU) ======================
    private static void startPublishLoop() {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // 【规范】连接状态校验 + 自动重连
                    if (client == null || !client.isConnected()) {
                        closeClient();  // 【规范】先释放旧资源
                        connect();      // 重建连接
                    }

                    // 【规范】发布消息
                    publishSensorData();

                } catch (Exception e) {
                    System.err.println("【异常】连接异常,准备重连...");
                    client = null; // 标记重连
                }

                // 【性能】固定间隔,降低CPU占用
                sleep(PUB_INTERVAL);
            }
        }, "mqtt-publish-thread").start();
    }

    // ====================== 【规范】发布业务 ======================
    private static void publishSensorData() throws MqttException {
        String temp = "温度:" + String.format("%.1f", 20 + random.nextFloat() * 10);
        MqttMessage msg = new MqttMessage(temp.getBytes(StandardCharsets.UTF_8));
        msg.setQos(1);          // 【规范】工业级QoS
        msg.setRetained(false); // 【规范】传感器数据不保留
        client.publish(MqttConst.TOPIC_TEMP, msg);
        System.out.println("已发布:" + temp);
    }

    // ====================== 【规范】连接方法 ======================
    private static void connect() {
        try {
            client = new MqttClient(MqttConst.BROKER, MqttConst.PUBLISH_CLIENT_ID, new MemoryPersistence());
            options = new MqttConnectionOptions();

            // MQTT v5 标准连接配置
            options.setKeepAliveInterval(KEEP_ALIVE);
            options.setConnectionTimeout(5);
            options.setCleanStart(true);
            options.setUserName(MqttConst.USERNAME);
            options.setPassword(MqttConst.PASSWORD.getBytes(StandardCharsets.UTF_8));

            client.connect(options);
            System.out.println("【发布者】连接成功 ✅");

        } catch (Exception e) {
            System.err.println("连接失败," + (RECONNECT_INTERVAL / 1000) + "秒后重试...");
            sleep(RECONNECT_INTERVAL);
        }
    }

    // ====================== 【规范】关闭客户端(防止资源泄漏) ======================
    private static void closeClient() {
        if (client != null) {
            try {
                if (client.isConnected()) {
                    client.disconnect();
                }
                client.close();
            } catch (MqttException ignored) {}
        }
    }

    // ====================== 【测试】模拟异常断开 ======================
    private static void simulateAbortDisconnect() {
        new Thread(() -> {
            sleep(10000);
            System.err.println("\n===== 模拟异常断开 =====");
            try {
                if (client != null && client.isConnected()) {
                    client.disconnectForcibly(0);
                }
            } catch (Exception ignored) {}
        }, "mqtt-simulate-disconnect").start();
    }

    // ====================== 【工具】睡眠方法 ======================
    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

演示结果

img

核心点总结

  • 同步 API:MqttClient 是核心类,所有操作阻塞执行,简单易控
  • 连接配置:MqttConnectionOptions 管理心跳、超时、会话、认证
  • 发布:支持单条 / 批量、QoS0/1/2、保留消息
  • 订阅:支持单主题、多主题、通配符(+/#)、自定义 QoS
  • 回调:MqttCallback 处理连接丢失、消息接收、发布确认
  • 异常:捕获 MqttException,通过错误码定位问题
  • 重连:连接丢失后循环重连 + 重新订阅,保证服务可用性
最近更新: 2026/6/3 11:47
Contributors: kcnf
Prev
Paho拆解入门
Next
Paho拆解高性能