砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 入门

        • 基于netty构建入门
        • 理解粘包/拆包
        • 编解码器机制与自定义协议
        • 心跳和ack机制
        • mqtt服务demo演示
        • mqtt服务协议支持
        • mqtt服务udp支持
      • 协议规范

        • mqtt协议规范(发布/订阅模式)
        • mqtt协议规范(轻量级二进制协议)
        • mqtt协议规范(三种 QoS 等级)
        • mqtt协议规范(主题通配符订阅)
        • mqtt协议规范(遗嘱与保留消息)
      • 报文结构

        • 控制报文结构(报文分类)
        • 控制报文结构(连接与握手)
        • 控制报文结构(发布与接收)
      • 核心实战

        • 核心实战(握手与认证)
        • 核心实战(心跳保活机制)
        • 核心实战(会话管理)
    • netty-mqtt-boot

      • 物联网实战

        • 模块化设计
        • 创建工程并引入
        • 编写可托管的 Netty 服务器启动器
        • 在 Pipeline 中添加 MQTT 编解码器
        • 实现 CONNECT 与 PUBLISH 基础响应
        • 内存中维护订阅-发布映射并转发消息
        • 集成 application.yml 管理 Netty 参数
        • 增加 REST API 监控连接与订阅状态
  • 物联网 基于netty核心实战-心跳保活机制
    • 简述
    • 源码(netty-sample-05-idle)
    • 为什么需要心跳保活机制
    • 双端超时机制的区别
    • 源码
      • server
      • client
      • 消息类型标记
    • 验证结果

物联网 基于netty核心实战-心跳保活机制

简述

心跳保活是MQTT连接的生命线——它并不是连接瞬间发起的,而是在整个长连接期间持续运行的后台守护机制

源码(netty-sample-05-idle)

https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-05

为什么需要心跳保活机制

MQTT 协议运行在 TCP/IP 之上,当网络连接长时间没有数据交互时,可能会被防火墙或中间网络设备断开,导致连接“假死”。心跳机制可以让客户端定期发送 PINGREQ 心跳包,服务端回复 PINGRESP,双方以此确认对方仍然存活

角色职责保活周期
客户端发送 PINGREQ 请求KeepAlive(如 60 秒),即客户端每 KeepAlive 秒主动发一次 PINGREQ
服务端响应 PINGRESP接收客户端任何报文(PUBLISH/PINGREQ等),若 1.5 倍 KeepAlive 内无任何报文则断开连接

双端超时机制的区别

  • 客户端按 KeepAlive 间隔主动发送 PINGREQ;若发送 PINGREQ 后一段时间内未收到 PINGRESP,则断开。
  • 服务端计算“从客户端收到的最近一次任何报文”(不限于 PINGREQ)至今的时间,若超过 1.5 倍 KeepAlive 则断开。

说明:KeepAlive 参数由客户端在 CONNECT 报文可变头中指定,服务端据此计算两种超时阈值

源码

server

package com.jysemel.iot;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MqttServer {
    private static final int PORT = 1883;
    private static final long DEFAULT_KEEP_ALIVE_SECONDS = 30;

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new MqttServerInitializer(DEFAULT_KEEP_ALIVE_SECONDS));

            ChannelFuture future = bootstrap.bind(PORT).sync();

            System.out.println("╔════════════════════════════════════════╗");
            System.out.println("║   MQTT 心跳保活服务端已启动           ║");
            System.out.println("║   监听端口: " + String.format("%-22d", PORT) + "║");
            System.out.println("║   默认KeepAlive: " + String.format("%-16d", DEFAULT_KEEP_ALIVE_SECONDS) + "║");
            System.out.println("╚════════════════════════════════════════╝");

            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("MQTT 服务端已关闭");
        }
    }
}

client

package com.jysemel.iot;

import com.jysemel.iot.handler.MqttHeartbeatClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class MqttHeartbeatClient {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 1883;
    private static final int KEEP_ALIVE_SECONDS = 10;

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();

                            pipeline.addLast("mqttDecoder", new MqttDecoder(65536));
                            pipeline.addLast("mqttEncoder", MqttEncoder.INSTANCE);

                            pipeline.addLast("idleStateHandler",
                                    new IdleStateHandler(0, KEEP_ALIVE_SECONDS, 0, TimeUnit.SECONDS));

                            pipeline.addLast("clientHandler", new MqttHeartbeatClientHandler(KEEP_ALIVE_SECONDS));
                        }
                    });

            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();

            System.out.println("╔════════════════════════════════════════╗");
            System.out.println("║   MQTT 心跳客户端已启动               ║");
            System.out.println("║   服务器: " + String.format("%-22s", HOST + ":" + PORT) + "║");
            System.out.println("║   KeepAlive: " + String.format("%-19d", KEEP_ALIVE_SECONDS) + "║");
            System.out.println("║   说明: 每" + KEEP_ALIVE_SECONDS + "秒发送一次PINGREQ              ║");
            System.out.println("╚════════════════════════════════════════╝");

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
            System.out.println("MQTT 客户端已关闭");
        }
    }
}

消息类型标记

private void handlePingReq(ChannelHandlerContext ctx) {
    System.out.println(">>> 收到 PINGREQ,回复 PINGRESP");

    MqttFixedHeader pingRespHeader = new MqttFixedHeader(
            MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
    ctx.writeAndFlush(new MqttMessage(pingRespHeader));
}
if (event.state() == IdleState.WRITER_IDLE && isConnected) {
    pingCount++;
    System.out.println(">>> [" + pingCount + "] 写空闲超时,发送 PINGREQ");

    MqttFixedHeader pingReqHeader = new MqttFixedHeader(
            MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
    ctx.writeAndFlush(new MqttMessage(pingReqHeader));
}

验证结果

img

img

最近更新: 2026/5/18 11:06
Contributors: kcnf
Prev
核心实战(握手与认证)
Next
核心实战(会话管理)