物联网 基于netty构建mqtt服务demo演示
简述
基于Netty实现轻量级MQTT协议,利用了Netty的异步、非阻塞特性,将MQTT协议的核心能力(连接管理、消息路由、心跳保活等)进行模块化构建
基于Netty的MQTT服务
- 连接接入:
通过EventLoopGroup(Boss & Worker)模型高效处理网络I/O,实现单机数万至百万连接的承载
- 协议编解码:
定义MqttDecoder/Encoder解析编码MQTT报文。Netty的ChannelPipeline可灵活配置编解码器处理CONTROL报文
- 核心处理:
在ChannelHandler中实现核心逻辑
- 会话与状态管理:
> 管理客户端连接(如心跳超时检测),维护订阅树和消息缓存(Pending Messages)
- 消息路由:
> 为PUBLISH/SUBSCRIBE报文,实现高性能通配符匹配,将消息高效投递至订阅者
- QoS保证:
> 支持MQTT服务质量等级,确保消息的可靠投递
- 保留与遗嘱消息:
> 实现对持久会话的断线重连自动恢复,以及保留/遗嘱消息的处理
- 存储扩展:
根据业务需求,往往需要将上述(会话、订阅、消息)数据进行持久化或使用高性能缓存(如Redis)
源码
netty-sample-01[https://gitee.com/kcnf-webrtc/iot-sample/tree/master/netty/netty-sample-01]
添加pom依赖
<dependencies>
<!-- Netty 核心依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
项目目录结构

启动类 代码
package com.jysemel.iot;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MqttServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 启用TCP的KeepAlive,探测死连接
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加心跳检测:60秒无读事件触发IdleStateEvent
pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
// 添加MQTT解码器,指定最大报文长度为 65536 字节
pipeline.addLast("mqttDecoder", new MqttDecoder(64 * 1024));
// 添加MQTT编码器
pipeline.addLast("mqttEncoder", MqttEncoder.INSTANCE);
// 添加业务处理器
pipeline.addLast("mqttServerHandler", new MqttServerHandler());
}
});
// 绑定1883端口并启动
ChannelFuture channelFuture = serverBootstrap.bind(1883).sync();
System.out.println("🚀 Netty MQTT Broker 启动成功,端口: 1883");
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
测试结果

