物联网 基于netty核心实战-会话管理
简述
MQTT 的会话(Session)是服务端为每个客户端维护的状态信息,用于支持离线消息和持久订阅
源码(netty-sample-05-session)
https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-05
核心概念
- 会话是服务端为某个客户端存储的状态集合
客户端的订阅列表(主题 + QoS)
未完成的消息(QoS 1/2 的待确认消息)
客户端是否在线(用于离线消息存储)
会话管理 vs 心跳保活
会话管理和心跳保活是 MQTT 协议中两个互补但完全不同的机制,它们共同保障了物联网通信的可靠性与效率
| 维度 | 会话管理 (Session Management) | 心跳保活 (Keep Alive / PING) |
|---|---|---|
| 目标 | 解决“断线后如何记住状态” | 解决“如何及时发现断线” |
| 作用时间 | 跨连接(从一次连接到下一次重连) | 单次连接期间 |
| 存储内容 | 订阅列表、离线消息、未完成 QoS 状态 | 无状态,只传递“我还活着”的信号 |
| 触发方式 | CONNECT 报文中的 Clean Session 标志决定是否持久化 | 客户端定时发送 PINGREQ,服务端回复 PINGRESP |
| 断线后 | 若 Clean Session=0,Broker 保留会话数据;若=1,立即清除 | 心跳停止,Broker 会在一段时间后判定连接死亡 |
| 重连后 | 若 Clean Session=0,Broker 恢复会话(订阅、离线消息等) | 心跳计时器重置,从零开始继续保活 |
协同工作
┌─────────────────────────────────────────────────────────┐
│ MQTT 连接生命周期 │
├─────────────────────────────────────────────────────────┤
│ 连接建立 ──▶ 正常通信 ──▶ 断线 ──▶ 重连 │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ 会话加载 心跳保活维持 会话保存 会话恢复 │
│ (Clean (PINGREQ/ (订阅/离 (离线消息 │
│ Session) PINGRESP) 线消息) 补发) │
└─────────────────────────────────────────────────────────┘
直白的理解
时间轴 →
第一次连接:
你走进银行 → 柜员问“要办什么?”你说“存定期” → 银行记下(会话创建)
→ 办理中,柜员每隔30秒问“还在吗?”你答“在”(心跳维持)
→ 办完离开(断开连接)
离店期间:
银行保留你的“存定期”偏好(会话保存)
有人给你汇款,银行暂存(离线消息存储)
第二次连接:
你再次走进银行 → 柜员调出记录说“记得您,还是存定期吗?”(会话恢复)
→ 柜员把暂存的汇款给你(离线消息推送)
→ 重新开始每隔30秒问“还在吗?”(心跳重置)
源码
server
package com.jysemel.iot;
import com.jysemel.iot.handler.MqttSessionHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
public class MqttSessionServer {
public static void main(String[] args) throws Exception {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast("decoder", new MqttDecoder(65536));
p.addLast("encoder", MqttEncoder.INSTANCE);
p.addLast("handler", new MqttSessionHandler());
}
});
b.bind(1883).sync().channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
client
package com.jysemel.iot;
import com.jysemel.iot.handler.TestMqttClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.*;
import java.nio.charset.StandardCharsets;
public class TestMqttClient {
private Channel channel;
private String clientId;
private int messageId = 1;
public void connect(String host, int port, String clientId, boolean cleanSession) throws Exception {
this.clientId = clientId;
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast("decoder", new MqttDecoder(65536));
p.addLast("encoder", MqttEncoder.INSTANCE);
p.addLast("handler", new TestMqttClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
channel = f.channel();
// 发送 CONNECT 报文
MqttConnectMessage connectMsg = MqttMessageBuilders.connect()
.clientId(clientId)
.cleanSession(cleanSession)
.keepAlive(60)
.protocolVersion(MqttVersion.MQTT_3_1_1)
.build();
channel.writeAndFlush(connectMsg);
System.out.println("CONNECT sent, cleanSession=" + cleanSession);
}
public void subscribe(String topic, int qos) {
MqttSubscribeMessage subscribeMsg = MqttMessageBuilders.subscribe()
.messageId(messageId++)
.addSubscription(MqttQoS.valueOf(qos), topic)
.build();
channel.writeAndFlush(subscribeMsg);
System.out.println("SUBSCRIBE sent, topic=" + topic + ", qos=" + qos);
}
public void publish(String topic, String message, int qos) {
MqttPublishMessage publishMsg = MqttMessageBuilders.publish()
.topicName(topic)
.qos(MqttQoS.valueOf(qos))
.messageId(messageId++)
.payload(Unpooled.copiedBuffer(message.getBytes(StandardCharsets.UTF_8)))
.build();
channel.writeAndFlush(publishMsg);
System.out.println("PUBLISH sent, topic=" + topic + ", message=" + message);
}
public void disconnect() {
MqttMessage disconnectMsg = MqttMessageBuilders.disconnect().build();
channel.writeAndFlush(disconnectMsg);
System.out.println("DISCONNECT sent");
channel.close();
}
public static void main(String[] args) throws Exception {
TestMqttClient client = new TestMqttClient();
// 示例:使用不同的 clientId 和 cleanSession 组合来演示会话管理
String clientId = "client-001";
boolean cleanSession = false; // 设置为 false 演示持久会话
client.connect("127.0.0.1", 1883, clientId, cleanSession);
Thread.sleep(1000); // 等待连接建立
// 订阅主题
client.subscribe("test/topic", 0);
Thread.sleep(500);
// 发布消息
client.publish("test/topic", "Hello Session Management!", 0);
Thread.sleep(500);
// 断开连接
client.disconnect();
System.exit(0);
}
}
