物联网 基于netty心跳和ack机制
编解码器
TCP 长连接在物理断开后,双方无法主动感知连接状态,导致大量无效连接占用服务器资源,称为"假死"
Netty 的心跳机制核心是 IdleStateHandler:
它基于定时任务与事件驱动模型,
当 Channel 在指定时间内没有执行读、写操作时,触发 IdleStateEvent,
由业务层决定发送心跳包或关闭连接
源码
netty-sample-00[https://gitee.com/kcnf-webrtc/iot-sample/tree/master/netty/netty-sample-00]
心跳机制原理与配置
IdleStateHandler 的构造函数参数,可按需将不关注的参数设为 0
| 参数 | 含义 | 触发条件 |
|---|---|---|
| readerIdleTime | 读空闲时间 | 指定时间内未收到数据(即未调用 channelRead()) |
| writerIdleTime | 写空闲时间 | 指定时间内未发送数据(即未调用 write() 或 writeAndFlush()) |
| allIdleTime | 全空闲时间 | 指定时间内既无读也无写 |
客户端 vs 服务端的心跳职责对比
| 角色 | 配置 | 职责 | 处理逻辑 |
|---|---|---|---|
| 客户端 | new IdleStateHandler(0, 30, 0) | 主动发送心跳包 | WRITER_IDLE 时发送 PING,收到 PONG 后认为连接正常 |
| 服务端 | new IdleStateHandler(45, 0, 0) | 检测客户端是否存活 | READER_IDLE 累计 2~3 次无数据则关闭连接 |
心跳间隔建议 15–30 秒,读空闲超时设为心跳间隔的 2–3 倍(45–90 秒),
可结合随机抖动避免大量连接同时发送心跳。
超时时间需根据实际网络 RTT 调整,
建议公式为「典型 RTT × 3 + 心跳间隔 × 2」,避免频繁误触发
ACK 确认机制
TCP 保证传输到达对端操作系统缓冲区,但不保证业务层已处理消息。ACK 机制需要在应用层实现消息确认,确保消息真正被处理
┌────────┐ ┌────────┐ ┌────────┐
│ Client │ │ Server │ │ Peer │
└───┬────┘ └───┬────┘ └───┬────┘
│ ─────业务消息────→ │ │
│ ←─────ACK───────── │ │
│ │ ────业务消息────→ │
│ │ ←─────ACK──────── │
│ ←──────── 最终确认 ────────── │
ACK 机制的核心组件
| 组件 | 作用 |
|---|---|
| 消息 ID | 每条消息唯一标识,用于确认和重传去重 |
| 未确认消息表 | ConcurrentHashMap<msgId, 发送时间> 存储待确认消息 |
| 超时检测 | 定时扫描,移除超时未确认的消息并进行重传 |
| 重传策略 | 指数退避(2^n 秒) + 最大重试次数 |
| 幂等性 | 服务端存储已处理的消息 ID,避免重复处理 |
server 端
package com.jysemel.iot;
import com.jysemel.iot.code.HeartbeatDecoder;
import com.jysemel.iot.code.HeartbeatEncoder;
import com.jysemel.iot.handler.ServerHeartbeatHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class HeartbeatServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// ① 编解码器:将字节流转为HeartbeatMessage对象
p.addLast(new HeartbeatDecoder());
p.addLast(new HeartbeatEncoder());
// ② 读空闲检测:10秒未收到客户端消息则触发
p.addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
// ③ 业务处理器:处理心跳和业务消息
p.addLast(new ServerHeartbeatHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(8182).sync();
System.out.println("========================================");
System.out.println(" 心跳服务器启动成功,监听端口: 8182");
System.out.println("========================================");
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
client 端
package com.jysemel.iot;
import com.jysemel.iot.code.HeartbeatDecoder;
import com.jysemel.iot.code.HeartbeatEncoder;
import com.jysemel.iot.handler.ClientHeartbeatHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class HeartbeatClient {
private final String host;
private final int port;
private Channel channel;
public HeartbeatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void connect() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// ① 编解码器
p.addLast(new HeartbeatDecoder());
p.addLast(new HeartbeatEncoder());
// ② 写空闲检测:5秒无数据发送则触发心跳
p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
// ③ 心跳处理器:处理心跳和ACK
p.addLast(new ClientHeartbeatHandler());
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
channel = future.channel();
System.out.println("========================================");
System.out.println(" 客户端连接成功: " + host + ":" + port);
System.out.println("========================================");
// 阻塞直到连接关闭
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
HeartbeatClient client = new HeartbeatClient("127.0.0.1", 8182);
client.connect();
}
}
