砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 入门

        • 基于netty构建入门
        • 理解粘包/拆包
        • 编解码器机制与自定义协议
        • 心跳和ack机制
        • mqtt服务demo演示
        • mqtt服务协议支持
        • mqtt服务udp支持
      • 协议规范

        • mqtt协议规范(发布/订阅模式)
        • mqtt协议规范(轻量级二进制协议)
        • mqtt协议规范(三种 QoS 等级)
        • mqtt协议规范(主题通配符订阅)
        • mqtt协议规范(遗嘱与保留消息)
      • 报文结构

        • 控制报文结构(报文分类)
        • 控制报文结构(连接与握手)
        • 控制报文结构(发布与接收)
      • 核心实战

        • 核心实战(握手与认证)
        • 核心实战(心跳保活机制)
        • 核心实战(会话管理)
        • 核心实战(安全)
    • mqtt-模拟器

      • 集成Paho

        • 设备模拟器设计
        • 设备模拟器演示
        • Paho拆解入门
        • Paho拆解核心
        • Paho拆解高性能
        • 其他客户端框架比较
      • NetAssist

        • 设备模拟器设计
    • netty-mqtt-boot

      • 模块化设计
      • 物联网一体化设计
      • 统一接入层
      • 消息路由与流转层
      • 核心服务层
      • 业务应用层
      • 整体项目管理
      • 测试脚手架
      • 兼容支持
    • mqtt-压测

      • mqtt-jmeter

        • 模块化设计
      • ‌emqtt-bench

        • 模块化设计
    • mqtt-规则引擎

      • MQTT规则引擎
      • ice

        • 模块化设计
      • Aviator

        • 模块化设计
      • Drools

        • 模块化设计
  • 物联网实战:Spring Boot MQTT | 模拟器Paho客户端sdk入门
    • 源码(mqtt-simulator-sample00)
    • 搭载 emqx 公共免费的broker
    • 相关依赖
    • 通用约定
    • 消息下发
    • 消息订阅
    • 注意事项
    • 演示结果

物联网实战:Spring Boot MQTT | 模拟器Paho客户端sdk入门

IoT 设备模拟器 - 支持 MQTT、TCP、UDP、CoAP、HTTP 多种协议

源码(mqtt-simulator-sample00)

https://gitee.com/kcnf-iot/mqtt-simulator

搭载 emqx 公共免费的broker

https://www.emqx.com/zh/mqtt/public-mqtt5-broker

相关依赖

<!-- MQTT Client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>

通用约定

package com.jysemel.iot;

public class ClientId {

    public static String clientId_1 = "jysemel-test-0001";
    public static String clientId_2 = "jysemel-test-0002";

    public static String topic = "demo-test-hello";

    // 免费的公共broker服务
    public static String brokerUrl = "tcp://broker.emqx.io:1883";
}

消息下发

package com.jysemel.iot;

import lombok.SneakyThrows;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttMessage;

/**
 * MQTT 5.0 消息发布示例类
 * 用于演示如何连接到 MQTT Broker 并发布消息
 */
public class MqttPublishDemo {

    @SneakyThrows
    public static void main(String[] args) {
        // 获取固定的客户端ID
        String clientId = ClientId.clientId_1;
        MqttClient client = null;
        
        try {
            // 创建 MQTT 客户端实例
            client = new MqttClient(ClientId.brokerUrl, clientId);

            // 配置 MQTT 连接选项
            MqttConnectionOptions options = new MqttConnectionOptions();
            // 设置为全新会话,不保留历史数据
            options.setCleanStart(true);
            // 启用自动重连功能
            options.setAutomaticReconnect(true);
            // 连接超时时间设置为10秒
            options.setConnectionTimeout(10);
            // 心跳间隔设置为20秒
            options.setKeepAliveInterval(20);

            // 连接到 Broker
            client.connect(options);
            System.out.println("已连接: " + clientId);
            Thread.sleep(5000);
            // 循环发布5条消息
            for (int i = 0; i < 5; i++) {
                // 构造消息内容,包含序号和时间戳
                String payload = "消息 " + i + " 时间: " + System.currentTimeMillis();
                // 创建 MQTT 消息对象
                MqttMessage msg = new MqttMessage(payload.getBytes());
                // 设置消息服务质量等级为1(至少送达一次)
                msg.setQos(1);
                // 发布消息到指定主题
                client.publish(ClientId.topic, msg);
                System.out.println("已发布: " + payload);
                // 暂停1秒后发布下一条
                Thread.sleep(1000);
            }

            // 断开与 Broker 的连接
            client.disconnect();
            System.out.println("已断开连接");

        } finally {
            // 确保资源被正确释放
            if (client != null) {
                client.close();
            }
        }
    }
}

消息订阅

package com.jysemel.iot;

import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

import java.nio.charset.StandardCharsets;

/**
 * MQTT 5.0 消息订阅示例类
 * 实现了 MqttCallback 接口,用于接收和处理 MQTT 消息
 */
public class MqttSubscribeDemo implements MqttCallback {

    // MQTT 客户端实例
    private final MqttClient client;

    /**
     * 构造函数
     * @param broker MQTT Broker 地址
     * @param clientId 客户端ID
     * @throws MqttException MQTT异常
     */
    public MqttSubscribeDemo(String broker, String clientId) throws MqttException {
        // 创建 MQTT 客户端实例
        this.client = new MqttClient(broker, clientId);
        // 将当前类注册为回调处理器
        this.client.setCallback(this);
    }

    /**
     * 启动订阅服务
     * @throws Exception 异常
     */
    public void start() throws Exception {
        // 配置连接选项
        MqttConnectionOptions options = new MqttConnectionOptions();
        // 设置为全新会话
        options.setCleanStart(true);
        options.setAutomaticReconnect(true); // v5 自动重连

        System.out.println("正在连接到 Broker...");
        // 连接到 Broker
        client.connect(options);
        System.out.println("已连接,客户端ID: " + client.getClientId());

        // 订阅主题(QoS 1)
        client.subscribe(ClientId.topic, 1);
        System.out.println("已订阅主题: " + ClientId.topic);
    }

    /**
     * 停止订阅服务并释放资源
     */
    public void stop() {
        try {
            // 如果仍连接,先断开
            if (client.isConnected()) {
                client.disconnect();
                System.out.println("已断开连接。");
            }
            // 关闭客户端,释放资源
            client.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    // ===== MqttCallback 实现 =====

    /**
     * 消息到达回调
     * @param topic 消息主题
     * @param message 消息对象
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // 将消息字节数组转换为字符串
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        // 打印接收到的消息信息
        System.out.printf("📩 收到主题 %s 的消息: %s (服务质量: %d)%n", topic, payload, message.getQos());
    }

    /**
     * 断开连接回调
     * @param disconnectResponse 断开连接响应
     */
    @Override
    public void disconnected(MqttDisconnectResponse disconnectResponse) {
        System.out.println("⚠️ 已断开连接: " + disconnectResponse.getReturnCode());
    }

    /**
     * MQTT 错误发生回调
     * @param exception MQTT 异常
     */
    @Override
    public void mqttErrorOccurred(MqttException exception) {
        System.err.println("❌ 错误: " + exception.getMessage());
    }

    /**
     * 消息发布完成回调
     * @param token 发布令牌
     */
    @Override
    public void deliveryComplete(IMqttToken token) {
        // 发布确认(仅当使用异步发布时才会触发)
    }

    /**
     * 连接完成回调
     * @param reconnect 是否是重连
     * @param serverURI 服务器URI
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        System.out.println("🔄 连接完成。是否重连: " + reconnect);
    }

    /**
     * 认证数据包到达回调
     * @param reasonCode 原因码
     * @param properties 属性
     */
    @Override
    public void authPacketArrived(int reasonCode, MqttProperties properties) {
        // 扩展认证(一般不用)
    }

    // ===== 主程序入口 =====
    public static void main(String[] args) throws Exception {
        // Broker 地址
        String broker = ClientId.brokerUrl;
        // 使用固定的客户端ID
        String clientId = ClientId.clientId_2;
        // 创建订阅示例实例
        MqttSubscribeDemo demo = new MqttSubscribeDemo(broker, clientId);
        // 启动订阅
        demo.start();
        System.out.println("正在监听消息... (按回车键退出)");
        System.in.read();           // 阻塞,等待用户按键
        // 停止订阅
        demo.stop();
    }
}

注意事项

  • topic和clientID 规范

jysemel-test-0001 demo-test-hello

  • Publish和Subscribe

模拟不同的两个总端通信,所以clientID保持不一样

演示结果

imgimg

最近更新: 2026/6/3 11:47
Contributors: kcnf
Prev
设备模拟器演示
Next
Paho拆解核心