物联网实战|Spring Boot + hiveMQ搭建MQTT客户端 | MQTT 设备模拟器
HiveMQ 客户端提供了三种 API 风格:阻塞(Blocking)、异步(Async)和响应式(Reactor)
源码(mqtt-simulator-hivemq-00)
https://gitee.com/kcnf-iot/mqtt-simulator
添加 HiveMQ MQTT 客户端的核心依赖
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.3.10</version>
</dependency>
配置 MQTT 客户端
package com.jysemel.iot.client;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.UUID;
@Slf4j
@Configuration
public class MqttConfig {
@Bean
public Mqtt5AsyncClient mqtt5AsyncClient() {
// 使用 Builder 模式创建异步客户端
Mqtt5AsyncClient client = Mqtt5Client.builder()
.identifier("client" + UUID.randomUUID()) // 设置客户端ID,需唯一
.serverHost("127.0.0.1") // Broker 地址
.serverPort(1883) // 默认非加密端口
.buildAsync(); // 构建异步客户端
// 连接到 Broker
client.connect()
.whenComplete((connAck, throwable) -> {
if (throwable != null) {
System.err.println("MQTT 连接失败: " + throwable.getMessage());
// 可以在这里添加重试逻辑
} else {
System.out.println("MQTT 连接成功!");
}
});
return client;
}
}
发布消息
package com.jysemel.iot.client;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
@Service
public class MqttPublishService {
@Autowired
private Mqtt5AsyncClient mqttClient;
public void publish(String topic, String payload) {
mqttClient.publishWith()
.topic(topic)
.qos(MqttQos.AT_LEAST_ONCE) // QoS 1: 至少一次
.payload(payload.getBytes(StandardCharsets.UTF_8))
.send()
.whenComplete((publish, throwable) -> {
if (throwable != null) {
System.err.println("消息发布失败: " + throwable.getMessage());
} else {
System.out.println("消息发布成功: " + payload);
}
});
}
}
订阅消息
package com.jysemel.iot.client;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
@Service
public class MqttSubscribeService {
@Autowired
private Mqtt5AsyncClient mqttClient;
// 使用 @PostConstruct 在 Bean 初始化后自动订阅
@PostConstruct
public void subscribe() {
mqttClient.subscribeWith()
.topicFilter("test/topic") // 订阅的主题
.qos(MqttQos.AT_LEAST_ONCE)
.callback(publish -> {
// 处理收到的消息
String payload = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
System.out.println("收到消息,主题: " + publish.getTopic() + ", 内容: " + payload);
})
.send()
.whenComplete((subAck, throwable) -> {
if (throwable != null) {
System.err.println("订阅失败: " + throwable.getMessage());
} else {
System.out.println("订阅主题 'test/topic' 成功!");
}
});
}
}
启动
package com.jysemel.iot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MqttDemoApplication {
public static void main(String[] args) {
SpringApplication.run(MqttDemoApplication.class, args);
}
}
总结
客户端ID唯一性
确保每个连接到 Broker 的客户端ID都是唯一的,否则连接会被拒绝
连接可靠性
生产环境中,务必处理连接失败、断开重连等异常情况。HiveMQ 客户端内置了自动重连功能,你可以通过 .automaticReconnectWithDefaultConfig() 等方法进行配置
API选择
- 阻塞(Blocking):适用于简单场景,但会阻塞当前线程
- 异步(Async):基于 CompletableFuture,适合大多数 Spring Boot 应用
- 响应式(Reactive):如果你使用 Project Reactor(如 Spring WebFlux),可以选择此 API 以获得更好的流式处理能力
线程模型
HiveMQ 客户端基于 Netty,其回调方法通常在 Netty 的 IO 线程中执行。因此,不要在回调方法中执行耗时操作,应将其提交到业务线程池处理,以避免阻塞 IO 线程
验证结果



