砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 入门

        • 基于netty构建入门
        • 理解粘包/拆包
        • 编解码器机制与自定义协议
        • 心跳和ack机制
        • mqtt服务demo演示
        • mqtt服务协议支持
        • mqtt服务udp支持
      • 协议规范

        • mqtt协议规范(发布/订阅模式)
        • mqtt协议规范(轻量级二进制协议)
        • mqtt协议规范(三种 QoS 等级)
        • mqtt协议规范(主题通配符订阅)
        • mqtt协议规范(遗嘱与保留消息)
      • 报文结构

        • 控制报文结构(报文分类)
        • 控制报文结构(连接与握手)
        • 控制报文结构(发布与接收)
      • 核心实战

        • 核心实战(握手与认证)
        • 核心实战(心跳保活机制)
        • 核心实战(会话管理)
    • netty-mqtt-boot

      • 物联网实战

        • 模块化设计
        • 创建工程并引入
        • 编写可托管的 Netty 服务器启动器
        • 在 Pipeline 中添加 MQTT 编解码器
        • 实现 CONNECT 与 PUBLISH 基础响应
        • 内存中维护订阅-发布映射并转发消息
        • 集成 application.yml 管理 Netty 参数
        • 增加 REST API 监控连接与订阅状态
  • 物联网 基于netty核心实战-会话管理
    • 简述
    • 源码(netty-sample-05-session)
    • 核心概念
    • 会话管理 vs 心跳保活
    • 协同工作
    • 直白的理解
    • 源码
      • server
      • client

物联网 基于netty核心实战-会话管理

简述

MQTT 的会话(Session)是服务端为每个客户端维护的状态信息,用于支持离线消息和持久订阅

源码(netty-sample-05-session)

https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-05

核心概念

  • 会话是服务端为某个客户端存储的状态集合
客户端的订阅列表(主题 + QoS)
未完成的消息(QoS 1/2 的待确认消息)
客户端是否在线(用于离线消息存储)

会话管理 vs 心跳保活

会话管理和心跳保活是 MQTT 协议中两个互补但完全不同的机制,它们共同保障了物联网通信的可靠性与效率

维度会话管理 (Session Management)心跳保活 (Keep Alive / PING)
目标解决“断线后如何记住状态”解决“如何及时发现断线”
作用时间跨连接(从一次连接到下一次重连)单次连接期间
存储内容订阅列表、离线消息、未完成 QoS 状态无状态,只传递“我还活着”的信号
触发方式CONNECT 报文中的 Clean Session 标志决定是否持久化客户端定时发送 PINGREQ,服务端回复 PINGRESP
断线后若 Clean Session=0,Broker 保留会话数据;若=1,立即清除心跳停止,Broker 会在一段时间后判定连接死亡
重连后若 Clean Session=0,Broker 恢复会话(订阅、离线消息等)心跳计时器重置,从零开始继续保活

协同工作

┌─────────────────────────────────────────────────────────┐
│                    MQTT 连接生命周期                       │
├─────────────────────────────────────────────────────────┤
│  连接建立  ──▶  正常通信  ──▶  断线  ──▶  重连           │
│      │           │             │           │            │
│      ▼           ▼             ▼           ▼            │
│  会话加载   心跳保活维持     会话保存    会话恢复         │
│  (Clean      (PINGREQ/       (订阅/离     (离线消息       │
│   Session)    PINGRESP)       线消息)      补发)          │
└─────────────────────────────────────────────────────────┘

直白的理解

时间轴 →

第一次连接:
  你走进银行 → 柜员问“要办什么?”你说“存定期” → 银行记下(会话创建)
  → 办理中,柜员每隔30秒问“还在吗?”你答“在”(心跳维持)
  → 办完离开(断开连接)

离店期间:
  银行保留你的“存定期”偏好(会话保存)
  有人给你汇款,银行暂存(离线消息存储)

第二次连接:
  你再次走进银行 → 柜员调出记录说“记得您,还是存定期吗?”(会话恢复)
  → 柜员把暂存的汇款给你(离线消息推送)
  → 重新开始每隔30秒问“还在吗?”(心跳重置)

源码

server

package com.jysemel.iot;

import com.jysemel.iot.handler.MqttSessionHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;

public class MqttSessionServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast("decoder", new MqttDecoder(65536));
                            p.addLast("encoder", MqttEncoder.INSTANCE);
                            p.addLast("handler", new MqttSessionHandler());
                        }
                    });
            b.bind(1883).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

client

package com.jysemel.iot;

import com.jysemel.iot.handler.TestMqttClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.*;

import java.nio.charset.StandardCharsets;

public class TestMqttClient {
    private Channel channel;
    private String clientId;
    private int messageId = 1;

    public void connect(String host, int port, String clientId, boolean cleanSession) throws Exception {
        this.clientId = clientId;
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast("decoder", new MqttDecoder(65536));
                        p.addLast("encoder", MqttEncoder.INSTANCE);
                        p.addLast("handler", new TestMqttClientHandler());
                    }
                });
        ChannelFuture f = b.connect(host, port).sync();
        channel = f.channel();

        // 发送 CONNECT 报文
        MqttConnectMessage connectMsg = MqttMessageBuilders.connect()
                .clientId(clientId)
                .cleanSession(cleanSession)
                .keepAlive(60)
                .protocolVersion(MqttVersion.MQTT_3_1_1)
                .build();
        channel.writeAndFlush(connectMsg);
        System.out.println("CONNECT sent, cleanSession=" + cleanSession);
    }

    public void subscribe(String topic, int qos) {
        MqttSubscribeMessage subscribeMsg = MqttMessageBuilders.subscribe()
                .messageId(messageId++)
                .addSubscription(MqttQoS.valueOf(qos), topic)
                .build();
        channel.writeAndFlush(subscribeMsg);
        System.out.println("SUBSCRIBE sent, topic=" + topic + ", qos=" + qos);
    }

    public void publish(String topic, String message, int qos) {
        MqttPublishMessage publishMsg = MqttMessageBuilders.publish()
                .topicName(topic)
                .qos(MqttQoS.valueOf(qos))
                .messageId(messageId++)
                .payload(Unpooled.copiedBuffer(message.getBytes(StandardCharsets.UTF_8)))
                .build();
        channel.writeAndFlush(publishMsg);
        System.out.println("PUBLISH sent, topic=" + topic + ", message=" + message);
    }

    public void disconnect() {
        MqttMessage disconnectMsg = MqttMessageBuilders.disconnect().build();
        channel.writeAndFlush(disconnectMsg);
        System.out.println("DISCONNECT sent");
        channel.close();
    }

    public static void main(String[] args) throws Exception {
        TestMqttClient client = new TestMqttClient();

        // 示例:使用不同的 clientId 和 cleanSession 组合来演示会话管理
        String clientId = "client-001";
        boolean cleanSession = false; // 设置为 false 演示持久会话

        client.connect("127.0.0.1", 1883, clientId, cleanSession);
        Thread.sleep(1000); // 等待连接建立

        // 订阅主题
        client.subscribe("test/topic", 0);
        Thread.sleep(500);

        // 发布消息
        client.publish("test/topic", "Hello Session Management!", 0);
        Thread.sleep(500);

        // 断开连接
        client.disconnect();
        System.exit(0);
    }
}

最近更新: 2026/5/18 11:53
Contributors: kcnf
Prev
核心实战(心跳保活机制)