物联网 基于netty控制报文结构(发布与接收)
简述
MQTT PUBLISH 报文 的二进制结构(固定头、可变头、有效载荷),并基于 Netty 实现一个支持 QoS 0 的发布/订阅 Broker
源码(netty-sample-04-PubSub)
https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-04
PUBLISH 报文结构
| 组成部分 | 字节数 | 内容 |
|---|---|---|
| 固定头 | 2~5 | 第一个字节:高4位=3(报文类型),低4位=标志位(DUP、QoS、RETAIN) 后续字节:剩余长度(变长编码) |
| 可变头 | 变长 | 主题名(UTF-8 字符串:2字节长度 + 主题字节) 如果 QoS > 0,则紧跟 2 字节报文标识符 |
| 有效载荷 | 变长 | 应用消息(二进制) |
剩余长度 = 可变头长度 + 有效载荷长度。
标志位说明
固定头第一个字节的低4位
| 位 | 含义 |
|---|---|
| bit 3 | DUP(重发标志) |
| bit 2-1 | QoS(00=0,01=1,10=2,11=保留) |
| bit 0 | RETAIN(保留标志) |
简易代码
服务端
package com.jysemel.iot;
import com.jysemel.iot.coder.MqttPublishDecoder;
import com.jysemel.iot.pojo.PublishMessage;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MqttServer {
public static void main(String[] args) throws Exception {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// 添加解码器:字节 -> PublishMessage
p.addLast(new MqttPublishDecoder());
// 业务处理器:处理收到的 PublishMessage
p.addLast(new SimpleChannelInboundHandler<PublishMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, PublishMessage msg) {
System.out.println("收到: topic = " + msg.getTopic() + ", payload = " + msg.getPayload());
}
});
}
});
ChannelFuture f = b.bind(1883).sync();
System.out.println("MQTT server started on port 1883");
f.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
客户端
package com.jysemel.iot;
import com.jysemel.iot.coder.MqttPublishEncoder;
import com.jysemel.iot.pojo.PublishMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MqttClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// 客户端不需要解码,只需要编码 PUBLISH
p.addLast(new MqttPublishEncoder());
p.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 连接建立后,发送一个 PUBLISH 消息
PublishMessage msg = new PublishMessage("test", "你好 MQTT");
System.out.println("发送: topic=test, payload=Hello MQTT");
ctx.writeAndFlush(msg);
}
});
}
});
ChannelFuture f = b.connect("127.0.0.1", 1883).sync();
// 等待 5 秒后关闭
Thread.sleep(5000);
f.channel().close().sync();
} finally {
group.shutdownGracefully();
}
}
}
