物联网:基于 Netty 的核心实战——MQTT 握手与认证
简述
解析 MQTT CONNECT 报文的结构(固定报头、可变报头、有效载荷),并基于 Netty 内置的 MQTT 编解码器,实现一个完成连接握手与用户名/密码认证的服务端和客户端示例
源码(netty-sample-05-auth)
https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-05
协议结构
| MQTT 报文结构 | Netty 中的对应 Java 类 | 关键字段/作用 |
|---|---|---|
| 固定报头 (Fixed Header) | MqttFixedHeader | 包含报文类型 messageType(如 CONNECT),以及 isDup、qosLevel、isRetain 等标志位和剩余长度 remainingLength |
| 可变报头 (Variable Header) | MqttConnectVariableHeader | 包含协议名、版本号、连接标志(如 Clean Session)、保活时间 |
| 有效载荷 (Payload) | MqttConnectPayload | 包含实际数据,如客户端ID、遗嘱消息、用户名和密码 |
| 完整连接请求 | MqttConnectMessage | 一个对象就包含了上述三部分,可以直接在处理器中使用 |
简易demo
服务端
package com.jysemel.iot;
import com.jysemel.iot.handler.MqttAuthHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
/**
 * Demo MQTT server entry point.
 *
 * <p>Binds a Netty server to TCP port 1883 and installs, for every accepted
 * connection, an MQTT decoder, an MQTT encoder and the {@link MqttAuthHandler}
 * that carries the business logic.
 */
public class MqttAuthServer {

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if binding, or waiting on the bind/close futures, fails
     */
    public static void main(String[] args) throws Exception {
        // A single thread accepts connections; the default-sized group handles I/O.
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new MqttPipelineInitializer());

            ChannelFuture bindFuture = bootstrap.bind(1883).sync();
            System.out.println("MQTT Auth Server started on port 1883");

            // Park the main thread until the server channel is closed.
            bindFuture.channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

    /** Builds the handler pipeline for each newly accepted connection. */
    private static final class MqttPipelineInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline()
                    // (A) Netty's built-in MQTT decoder, constructed with a limit of 1024
                    //     (per Netty's API this is the maximum message size in bytes).
                    .addLast("mqttDecoder", new MqttDecoder(1024))
                    // (B) The shared singleton MQTT encoder.
                    .addLast("mqttEncoder", MqttEncoder.INSTANCE)
                    // (C) Our own business-logic handler.
                    .addLast("authHandler", new MqttAuthHandler());
        }
    }
}
客户端
package com.jysemel.iot;
import com.jysemel.iot.handler.MqttAuthClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
/**
 * Demo MQTT client entry point.
 *
 * <p>Connects to 127.0.0.1:1883 and installs an MQTT decoder, an MQTT encoder
 * and the {@link MqttAuthClientHandler} on the connection. Whatever is sent
 * after connecting is decided by that handler, which is defined elsewhere.
 */
public class MqttAuthClient {

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting, or waiting on the connect/close futures, fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(ioGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ClientPipelineInitializer());

            ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 1883).sync();

            // Park the main thread until the connection is closed.
            connectFuture.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }

    /** Builds the handler pipeline for the outgoing connection. */
    private static final class ClientPipelineInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline()
                    // Same codec setup as the server: decoder constructed with a limit of 1024.
                    .addLast("mqttDecoder", new MqttDecoder(1024))
                    .addLast("mqttEncoder", MqttEncoder.INSTANCE)
                    .addLast("clientHandler", new MqttAuthClientHandler());
        }
    }
}
