砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • 环境配置
    • 微信生态
    • 正则
    • 全栈技能
  • java圈

    • java基础
    • jdk体系
    • jvm
    • spring框架
    • 分库分表
    • zookeeper
  • python技能

    • python编程
    • python数据
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • devops
  • 元宇宙
  • 区块链
  • 物联网
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • 环境配置
    • 微信生态
    • 正则
    • 全栈技能
  • java圈

    • java基础
    • jdk体系
    • jvm
    • spring框架
    • 分库分表
    • zookeeper
  • python技能

    • python编程
    • python数据
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • devops
  • 元宇宙
  • 区块链
  • 物联网
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 入门

        • 基于netty构建入门
        • 理解粘包/拆包
        • 编解码器机制与自定义协议
        • 心跳和ack机制
        • mqtt服务demo演示
        • mqtt服务协议支持
        • mqtt服务udp支持
      • 协议规范

        • mqtt协议规范(发布/订阅模式)
        • mqtt协议规范(轻量级二进制协议)
        • mqtt协议规范(三种 QoS 等级)
        • mqtt协议规范(主题通配符订阅)
        • mqtt协议规范(遗嘱与保留消息)
      • 报文结构

        • 控制报文结构(报文分类)
        • 控制报文结构(连接与握手)
        • 控制报文结构(发布与接收)
      • 核心实战

        • 核心实战(握手与认证)
        • 核心实战(心跳保活机制)
        • 核心实战(会话管理)
        • 核心实战(安全)
    • mqtt-模拟器|客户端

      • 集成Paho

        • 设备模拟器设计
        • 设备模拟器演示
        • Paho拆解入门
        • Paho拆解核心
        • Paho拆解高性能
        • 其他客户端框架比较
      • NetAssist

        • 设备模拟器设计
      • hiveMq

        • hiveMq客户端
    • netty-mqtt-boot

      • 模块化设计
      • 统一接入层
      • 消息路由与流转层
      • 核心服务层
      • 业务应用层
      • 整体项目管理
      • 测试脚手架
      • 兼容支持
    • mqtt-压测

      • mqtt-jmeter

        • 模块化设计
      • ‌emqtt-bench

        • 模块化设计
    • mqtt-规则引擎

      • MQTT规则引擎
      • sql

        • 基于sql规则
      • ice

        • 模块化设计
      • Aviator

        • 模块化设计
      • Drools

        • 模块化设计
    • mqtt-实战问题

      • MQTT分布式集群

        • 分布式集群

          • 分布式集群意义
          • 如何构建分布式集群
          • 分布式集群简易版
          • mqtt入口HAProxy模拟负载
        • ignite

          • ignite入门
          • 模块化设计
      • 实战汇总

        • 分布式集群意义
  • mqtt-netty源码拆解

    • mqtt-netty源码拆解

      • Pipeline双向链表
      • MqttEncoder.INSTANCE
      • 分布式集群简易版
      • mqtt入口HAProxy模拟负载
  • 物联网实战|Spring Boot + hiveMQ搭建MQTT客户端 | MQTT 设备模拟器
    • 源码(mqtt-simulator-hivemq-00)
    • 添加 HiveMQ MQTT 客户端的核心依赖
    • 配置 MQTT 客户端
    • 发布消息
    • 订阅消息
    • 启动
    • 总结
      • 客户端ID唯一性
      • 连接可靠性
      • API选择
      • 线程模型
    • 验证结果

物联网实战|Spring Boot + hiveMQ搭建MQTT客户端 | MQTT 设备模拟器

HiveMQ 客户端提供了三种 API 风格:阻塞(Blocking)、异步(Async)和响应式(Reactor)

源码(mqtt-simulator-hivemq-00)

https://gitee.com/kcnf-iot/mqtt-simulator

添加 HiveMQ MQTT 客户端的核心依赖

<dependency>
    <groupId>com.hivemq</groupId>
    <artifactId>hivemq-mqtt-client</artifactId>
    <version>1.3.10</version>
</dependency>

配置 MQTT 客户端

package com.jysemel.iot.client;

import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.UUID;

@Slf4j
@Configuration
public class MqttConfig {

    @Bean
    public Mqtt5AsyncClient mqtt5AsyncClient() {
        // 使用 Builder 模式创建异步客户端
        Mqtt5AsyncClient client = Mqtt5Client.builder()
                .identifier("client" + UUID.randomUUID()) // 设置客户端ID,需唯一
                .serverHost("127.0.0.1") // Broker 地址
                .serverPort(1883) // 默认非加密端口
                .buildAsync(); // 构建异步客户端

        // 连接到 Broker
        client.connect()
                .whenComplete((connAck, throwable) -> {
                    if (throwable != null) {
                        System.err.println("MQTT 连接失败: " + throwable.getMessage());
                        // 可以在这里添加重试逻辑
                    } else {
                        System.out.println("MQTT 连接成功!");
                    }
                });
        return client;
    }
}

发布消息

package com.jysemel.iot.client;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

@Service
public class MqttPublishService {

    @Autowired
    private Mqtt5AsyncClient mqttClient;


    public void publish(String topic, String payload) {
        mqttClient.publishWith()
                .topic(topic)
                .qos(MqttQos.AT_LEAST_ONCE) // QoS 1: 至少一次
                .payload(payload.getBytes(StandardCharsets.UTF_8))
                .send()
                .whenComplete((publish, throwable) -> {
                    if (throwable != null) {
                        System.err.println("消息发布失败: " + throwable.getMessage());
                    } else {
                        System.out.println("消息发布成功: " + payload);
                    }
                });
    }
}

订阅消息

package com.jysemel.iot.client;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

@Service
public class MqttSubscribeService {


    @Autowired
    private Mqtt5AsyncClient mqttClient;


    // 使用 @PostConstruct 在 Bean 初始化后自动订阅
    @PostConstruct
    public void subscribe() {
        mqttClient.subscribeWith()
                .topicFilter("test/topic") // 订阅的主题
                .qos(MqttQos.AT_LEAST_ONCE)
                .callback(publish -> {
                    // 处理收到的消息
                    String payload = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
                    System.out.println("收到消息,主题: " + publish.getTopic() + ", 内容: " + payload);
                })
                .send()
                .whenComplete((subAck, throwable) -> {
                    if (throwable != null) {
                        System.err.println("订阅失败: " + throwable.getMessage());
                    } else {
                        System.out.println("订阅主题 'test/topic' 成功!");
                    }
                });
    }
}

启动

package com.jysemel.iot;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MqttDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(MqttDemoApplication.class, args);
    }
}

总结

客户端ID唯一性

确保每个连接到 Broker 的客户端ID都是唯一的,否则连接会被拒绝

连接可靠性

生产环境中,务必处理连接失败、断开重连等异常情况。HiveMQ 客户端内置了自动重连功能,你可以通过 .automaticReconnectWithDefaultConfig() 等方法进行配置

API选择

  • 阻塞(Blocking):适用于简单场景,但会阻塞当前线程
  • 异步(Async):基于 CompletableFuture,适合大多数 Spring Boot 应用
  • 响应式(Reactive):如果你使用 Project Reactor(如 Spring WebFlux),可以选择此 API 以获得更好的流式处理能力

线程模型

HiveMQ 客户端基于 Netty,其回调方法通常在 Netty 的 IO 线程中执行。因此,不要在回调方法中执行耗时操作,应将其提交到业务线程池处理,以避免阻塞 IO 线程

验证结果

img

img

img

最近更新: 2026/6/27 11:05
Contributors: kcnf