砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 基于netty构建入门
      • 理解粘包/拆包
      • 编解码器机制与自定义协议
      • 心跳和ack机制
      • mqtt服务demo演示
      • mqtt服务协议支持
      • mqtt服务udp支持
      • mqtt协议规范(发布/订阅模式)
      • mqtt协议规范(轻量级二进制协议)
      • mqtt协议规范(三种 QoS 等级)
      • mqtt协议规范(主题通配符订阅)
      • mqtt协议规范(遗嘱与保留消息)
  • 物联网 基于netty心跳和ack机制
    • 编解码器
    • 源码
    • 心跳机制原理与配置
    • 客户端 vs 服务端的心跳职责对比
    • ACK 确认机制
      • ACK 机制的核心组件
      • server 端
      • client 端

物联网 基于netty心跳和ack机制

编解码器

TCP 长连接在物理断开后,双方无法主动感知连接状态,导致大量无效连接占用服务器资源,称为"假死"

Netty 的心跳机制核心是 IdleStateHandler:
它基于定时任务与事件驱动模型,
当 Channel 在指定时间内没有执行读、写操作时,触发 IdleStateEvent,
由业务层决定发送心跳包或关闭连接

源码

netty-sample-00[https://gitee.com/kcnf-webrtc/iot-sample/tree/master/netty/netty-sample-00]

心跳机制原理与配置

IdleStateHandler 的构造函数参数,可按需将不关注的参数设为 0

参数含义触发条件
readerIdleTime读空闲时间指定时间内未收到数据(即未调用 channelRead())
writerIdleTime写空闲时间指定时间内未发送数据(即未调用 write() 或 writeAndFlush())
allIdleTime全空闲时间指定时间内既无读也无写

客户端 vs 服务端的心跳职责对比

角色配置职责处理逻辑
客户端new IdleStateHandler(0, 30, 0)主动发送心跳包WRITER_IDLE 时发送 PING,收到 PONG 后认为连接正常
服务端new IdleStateHandler(45, 0, 0)检测客户端是否存活READER_IDLE 累计 2~3 次无数据则关闭连接
心跳间隔建议 15–30 秒,读空闲超时设为心跳间隔的 2–3 倍(45–90 秒),
可结合随机抖动避免大量连接同时发送心跳。
超时时间需根据实际网络 RTT 调整,
建议公式为「典型 RTT × 3 + 心跳间隔 × 2」,避免频繁误触发

ACK 确认机制

TCP 保证传输到达对端操作系统缓冲区,但不保证业务层已处理消息。ACK 机制需要在应用层实现消息确认,确保消息真正被处理

┌────────┐         ┌────────┐         ┌────────┐
│ Client │         │ Server │         │ Peer   │
└───┬────┘         └───┬────┘         └───┬────┘
    │ ─────业务消息────→ │                  │
    │ ←─────ACK───────── │                  │
    │                    │ ────业务消息────→ │
    │                    │ ←─────ACK──────── │
    │ ←──────── 最终确认 ──────────          │

ACK 机制的核心组件

组件作用
消息 ID每条消息唯一标识,用于确认和重传去重
未确认消息表ConcurrentHashMap<msgId, 发送时间> 存储待确认消息
超时检测定时扫描,移除超时未确认的消息并进行重传
重传策略指数退避(2^n 秒) + 最大重试次数
幂等性服务端存储已处理的消息 ID,避免重复处理

server 端

package com.jysemel.iot;

import com.jysemel.iot.code.HeartbeatDecoder;
import com.jysemel.iot.code.HeartbeatEncoder;
import com.jysemel.iot.handler.ServerHeartbeatHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class HeartbeatServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            // ① 编解码器:将字节流转为HeartbeatMessage对象
                            p.addLast(new HeartbeatDecoder());
                            p.addLast(new HeartbeatEncoder());
                            
                            // ② 读空闲检测:10秒未收到客户端消息则触发
                            p.addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
                            
                            // ③ 业务处理器:处理心跳和业务消息
                            p.addLast(new ServerHeartbeatHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = bootstrap.bind(8182).sync();
            System.out.println("========================================");
            System.out.println("  心跳服务器启动成功,监听端口: 8182");
            System.out.println("========================================");
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

client 端

package com.jysemel.iot;

import com.jysemel.iot.code.HeartbeatDecoder;
import com.jysemel.iot.code.HeartbeatEncoder;
import com.jysemel.iot.handler.ClientHeartbeatHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class HeartbeatClient {

    private final String host;
    private final int port;
    private Channel channel;

    public HeartbeatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void connect() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();

                            // ① 编解码器
                            p.addLast(new HeartbeatDecoder());
                            p.addLast(new HeartbeatEncoder());

                            // ② 写空闲检测:5秒无数据发送则触发心跳
                            p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));

                            // ③ 心跳处理器:处理心跳和ACK
                            p.addLast(new ClientHeartbeatHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect(host, port).sync();
            channel = future.channel();

            System.out.println("========================================");
            System.out.println("  客户端连接成功: " + host + ":" + port);
            System.out.println("========================================");

            // 阻塞直到连接关闭
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatClient client = new HeartbeatClient("127.0.0.1", 8182);
        client.connect();
    }
}

最近更新: 2026/5/14 17:19
Contributors: kcnf
Prev
编解码器机制与自定义协议
Next
mqtt服务demo演示