砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 入门

        • 基于netty构建入门
        • 理解粘包/拆包
        • 编解码器机制与自定义协议
        • 心跳和ack机制
        • mqtt服务demo演示
        • mqtt服务协议支持
        • mqtt服务udp支持
      • 协议规范

        • mqtt协议规范(发布/订阅模式)
        • mqtt协议规范(轻量级二进制协议)
        • mqtt协议规范(三种 QoS 等级)
        • mqtt协议规范(主题通配符订阅)
        • mqtt协议规范(遗嘱与保留消息)
      • 报文结构

        • 控制报文结构(报文分类)
        • 控制报文结构(连接与握手)
        • 控制报文结构(发布与接收)
      • 核心实战

        • 核心实战(握手与认证)
        • 核心实战(心跳保活机制)
        • 核心实战(会话管理)
    • netty-mqtt-boot

      • 物联网实战

        • 模块化设计
        • 创建工程并引入
        • 编写可托管的 Netty 服务器启动器
        • 在 Pipeline 中添加 MQTT 编解码器
        • 实现 CONNECT 与 PUBLISH 基础响应
        • 内存中维护订阅-发布映射并转发消息
        • 集成 application.yml 管理 Netty 参数
        • 增加 REST API 监控连接与订阅状态
  • 物联网 基于netty控制报文结构(连接与握手)
    • 简述
    • 源码(netty-sample-04-connect)
    • 核心概念
    • 简易代码
      • server代码
      • client代码
    • 报文结构回顾
      • CONNECT(客户端 → 服务端)
      • CONNACK(服务端 → 客户端)
      • 有效载荷结构
    • 真正 MQTT 二进制编码
    • 部分源码
      • 服务端
      • 客户端

物联网 基于netty控制报文结构(连接与握手)

简述

MQTT 协议中 CONNECT 和 CONNACK 报文的编解码,展示客户端如何发起连接请求、服务端如何解析并返回连接确认

源码(netty-sample-04-connect)

https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-04

核心概念

  • CONNECT:客户端 → 服务端

告诉服务端:“我是谁(ClientId),心跳间隔多少,是否清理会话”

  • CONNACK:服务端 → 客户端

回复:“连接成功(返回码0)” 或 失败原因(1~5)

简易代码

server代码

package com.jysemel.iot;

import com.jysemel.iot.handler.SimpleServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SimpleConnectServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                            ch.pipeline().addLast(new SimpleServerHandler());
                        }
                    });
            b.bind(1883).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

client代码

package com.jysemel.iot;

import com.jysemel.iot.handler.SimpleClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SimpleConnectClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                            ch.pipeline().addLast(new SimpleClientHandler());
                        }
                    });
            Channel ch = b.connect("127.0.0.1", 1883).sync().channel();
            // 发送模拟的 CONNECT 字符串
            ch.writeAndFlush("CONNECT:clientId=test,keepAlive=60,cleanSession=1\n");
            ch.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

报文结构回顾

CONNECT(客户端 → 服务端)

固定头可变头有效载荷
类型=1,标志位=0,剩余长度(变长编码)协议名 "MQTT"(2字节长度+4字节数据)
协议级别(1字节,MQTT 3.1.1 为 4)
连接标志(1字节)
保活时间(2字节)
客户端标识符(UTF-8 字符串,必须)
遗嘱主题(可选)
遗嘱消息(可选)
用户名(可选)
密码(可选)

CONNACK(服务端 → 客户端)

固定头可变头
类型=2,标志位=0,剩余长度=2连接确认标志(1字节,当前仅 bit0 = 会话存在标志)
返回码(1字节:0=成功,1=协议不可接受,2=标识符被拒绝,3=服务不可用,4=用户名/密码错误,5=未授权)

剩余长度变长编码:每个字节最高位表示是否还有后续字节,低 7 位表示数值,最多 4 字节

有效载荷结构

在网络协议中,有效载荷 指的是报文中 真正要传输的业务数据,不包括协议本身的控制字段(如固定头、可变头、长度字段等

快递单 = 固定头 + 可变头(包含收件人地址、包裹重量、物流编号等控制信息)
包裹里的商品 = 有效载荷(你真正想寄给别人的东西)

真正 MQTT 二进制编码

最简 CONNECT(只包含 ClientId)的十六进制

固定头:0x10(类型1,标志0) + 剩余长度
可变头:
    00 04 M Q T T(协议名) + 04(协议级别) + 
        02(连接标志:Clean Session=1) + 00 3C(Keep Alive=60秒)
有效载荷:00 04 t e s t(ClientId = "test")

部分源码

服务端

package com.jysemel.iot;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;


public class MqttConnectServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new ByteToMessageDecoder() {
                                @Override
                                protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
                                    if (in.readableBytes() < 2) return;
                                    in.markReaderIndex();
                                    byte first = in.readByte();
                                    int type = (first >> 4) & 0x0F;
                                    if (type != 1) { in.resetReaderIndex(); return; }
                                    // 这里需要完整解码,为了方便直接调用上面的 decodeConnect 方法
                                    // 实际应保存解码结果,但本例省略
                                    System.out.println("收到 CONNECT,回复 CONNACK");
                                    ctx.writeAndFlush(Unpooled.wrappedBuffer(buildConnack((byte)0)));
                                }
                            });
                        }
                    });
            b.bind(1883).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    // 包含 buildConnack 方法
    public static byte[] buildConnack(byte returnCode) {
        ByteBuf buf = Unpooled.buffer();
        buf.writeByte(0x20);        // 类型2,flag=0
        buf.writeByte(0x02);        // 剩余长度=2
        buf.writeByte(0x00);        // 会话存在标志 = 0
        buf.writeByte(returnCode);  // 0=成功
        byte[] result = new byte[buf.readableBytes()];
        buf.readBytes(result);
        buf.release();
        return result;
    }
}

客户端

package com.jysemel.iot;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.StandardCharsets;

public class MqttConnectClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) {
                                    byte[] connect = buildMinimalConnect("testClient");
                                    ctx.writeAndFlush(Unpooled.wrappedBuffer(connect));
                                }
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                    ByteBuf buf = (ByteBuf) msg;
                                    byte first = buf.readByte();
                                    int type = (first >> 4) & 0x0F;
                                    if (type == 2) {
                                        buf.readByte(); // 剩余长度
                                        byte session = buf.readByte();
                                        byte code = buf.readByte();
                                        System.out.println("CONNACK received, returnCode=" + code);
                                    }
                                    ctx.close();
                                }
                            });
                        }
                    });
            b.connect("127.0.0.1", 1883).sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }



    // 包含 buildMinimalConnect 方法
    public static byte[] buildMinimalConnect(String clientId) {
        ByteBuf buf = Unpooled.buffer();
        // 固定头(先写类型,剩余长度占位)
        buf.writeByte(0x10);           // 类型1,flag=0
        int lenPos = buf.writerIndex();
        buf.writeByte(0);              // 剩余长度占位

        // 可变头
        buf.writeShort(4);             // 协议名长度
        buf.writeBytes("MQTT".getBytes());
        buf.writeByte(4);              // 协议级别 4
        buf.writeByte(0x02);           // 连接标志:Clean Session=1,其他0
        buf.writeShort(60);            // Keep Alive 60秒

        // 有效载荷:ClientId
        byte[] idBytes = clientId.getBytes(StandardCharsets.UTF_8);
        buf.writeShort(idBytes.length);
        buf.writeBytes(idBytes);

        // 计算并回填剩余长度(从可变头开始到结尾的字节数)
        int remainingLen = buf.writerIndex() - (lenPos + 1);
        buf.setByte(lenPos, remainingLen);

        byte[] result = new byte[buf.readableBytes()];
        buf.readBytes(result);
        buf.release();
        return result;
    }
}

最近更新: 2026/5/17 14:22
Contributors: kcnf
Prev
控制报文结构(报文分类)
Next
控制报文结构(发布与接收)