物联网 基于netty控制报文结构(连接与握手)
简述
MQTT 协议中 CONNECT 和 CONNACK 报文的编解码,展示客户端如何发起连接请求、服务端如何解析并返回连接确认
源码(netty-sample-04-connect)
https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-04
核心概念
- CONNECT:客户端 → 服务端
告诉服务端:“我是谁(ClientId),心跳间隔多少,是否清理会话”
- CONNACK:服务端 → 客户端
回复:“连接成功(返回码0)” 或 失败原因(1~5)
简易代码
server代码
package com.jysemel.iot;
import com.jysemel.iot.handler.SimpleServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class SimpleConnectServer {
public static void main(String[] args) throws Exception {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new SimpleServerHandler());
}
});
b.bind(1883).sync().channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
client代码
package com.jysemel.iot;
import com.jysemel.iot.handler.SimpleClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class SimpleConnectClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new SimpleClientHandler());
}
});
Channel ch = b.connect("127.0.0.1", 1883).sync().channel();
// 发送模拟的 CONNECT 字符串
ch.writeAndFlush("CONNECT:clientId=test,keepAlive=60,cleanSession=1\n");
ch.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
报文结构回顾
CONNECT(客户端 → 服务端)
| 固定头 | 可变头 | 有效载荷 |
|---|---|---|
| 类型=1,标志位=0,剩余长度(变长编码) | 协议名 "MQTT"(2字节长度+4字节数据)协议级别(1字节,MQTT 3.1.1 为 4) 连接标志(1字节) 保活时间(2字节) | 客户端标识符(UTF-8 字符串,必须) 遗嘱主题(可选) 遗嘱消息(可选) 用户名(可选) 密码(可选) |
CONNACK(服务端 → 客户端)
| 固定头 | 可变头 |
|---|---|
| 类型=2,标志位=0,剩余长度=2 | 连接确认标志(1字节,当前仅 bit0 = 会话存在标志) 返回码(1字节:0=成功,1=协议不可接受,2=标识符被拒绝,3=服务不可用,4=用户名/密码错误,5=未授权) |
剩余长度变长编码:每个字节最高位表示是否还有后续字节,低 7 位表示数值,最多 4 字节
有效载荷结构
在网络协议中,有效载荷 指的是报文中 真正要传输的业务数据,不包括协议本身的控制字段(如固定头、可变头、长度字段等
快递单 = 固定头 + 可变头(包含收件人地址、包裹重量、物流编号等控制信息)
包裹里的商品 = 有效载荷(你真正想寄给别人的东西)
真正 MQTT 二进制编码
最简 CONNECT(只包含 ClientId)的十六进制
固定头:0x10(类型1,标志0) + 剩余长度
可变头:
00 04 M Q T T(协议名) + 04(协议级别) +
02(连接标志:Clean Session=1) + 00 3C(Keep Alive=60秒)
有效载荷:00 04 t e s t(ClientId = "test")
部分源码
服务端
package com.jysemel.iot;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class MqttConnectServer {
public static void main(String[] args) throws Exception {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 2) return;
in.markReaderIndex();
byte first = in.readByte();
int type = (first >> 4) & 0x0F;
if (type != 1) { in.resetReaderIndex(); return; }
// 这里需要完整解码,为了方便直接调用上面的 decodeConnect 方法
// 实际应保存解码结果,但本例省略
System.out.println("收到 CONNECT,回复 CONNACK");
ctx.writeAndFlush(Unpooled.wrappedBuffer(buildConnack((byte)0)));
}
});
}
});
b.bind(1883).sync().channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
// 包含 buildConnack 方法
public static byte[] buildConnack(byte returnCode) {
ByteBuf buf = Unpooled.buffer();
buf.writeByte(0x20); // 类型2,flag=0
buf.writeByte(0x02); // 剩余长度=2
buf.writeByte(0x00); // 会话存在标志 = 0
buf.writeByte(returnCode); // 0=成功
byte[] result = new byte[buf.readableBytes()];
buf.readBytes(result);
buf.release();
return result;
}
}
客户端
package com.jysemel.iot;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.StandardCharsets;
public class MqttConnectClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
byte[] connect = buildMinimalConnect("testClient");
ctx.writeAndFlush(Unpooled.wrappedBuffer(connect));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
byte first = buf.readByte();
int type = (first >> 4) & 0x0F;
if (type == 2) {
buf.readByte(); // 剩余长度
byte session = buf.readByte();
byte code = buf.readByte();
System.out.println("CONNACK received, returnCode=" + code);
}
ctx.close();
}
});
}
});
b.connect("127.0.0.1", 1883).sync().channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
// 包含 buildMinimalConnect 方法
public static byte[] buildMinimalConnect(String clientId) {
ByteBuf buf = Unpooled.buffer();
// 固定头(先写类型,剩余长度占位)
buf.writeByte(0x10); // 类型1,flag=0
int lenPos = buf.writerIndex();
buf.writeByte(0); // 剩余长度占位
// 可变头
buf.writeShort(4); // 协议名长度
buf.writeBytes("MQTT".getBytes());
buf.writeByte(4); // 协议级别 4
buf.writeByte(0x02); // 连接标志:Clean Session=1,其他0
buf.writeShort(60); // Keep Alive 60秒
// 有效载荷:ClientId
byte[] idBytes = clientId.getBytes(StandardCharsets.UTF_8);
buf.writeShort(idBytes.length);
buf.writeBytes(idBytes);
// 计算并回填剩余长度(从可变头开始到结尾的字节数)
int remainingLen = buf.writerIndex() - (lenPos + 1);
buf.setByte(lenPos, remainingLen);
byte[] result = new byte[buf.readableBytes()];
buf.readBytes(result);
buf.release();
return result;
}
}
