物联网 基于netty编解码器机制与自定义协议
编解码器
Netty 中网络传输的数据本质是 字节(ByteBuf),而业务逻辑通常操作 对象(如 POJO、字符串、JSON)
编码器(Encoder):将业务对象 → 字节,以便写出到网络
解码器(Decoder):将字节 → 业务对象,以便业务 Handler 处理
TCP 流式特性带来的粘包/拆包问题,必须在 解码阶段 解决
Netty 提供了丰富的解码器基类,其中最重要的就是 ByteToMessageDecoder
源码
netty-sample-00[https://gitee.com/kcnf-webrtc/iot-sample/tree/master/netty/netty-sample-00]
核心编解码器类层次
| 抽象类 | 作用 |
|---|---|
| ByteToMessageDecoder | 将入站字节解码为多个对象(可解决粘包/拆包)。需要实现 decode() 方法,将累积的字节解码成业务对象并放入 |
| MessageToByteEncoder | 将出站对象编码为字节。实现 encode() 方法。 |
| ReplayingDecoder | ByteToMessageDecoder 的子类,它通过状态机简化了解码逻辑,但性能略低(使用方便的断言方式)。 |
| MessageToMessageDecoder | 将一种消息类型解码为另一种(如 JSON 字符串 → POJO)。 |
协议格式
+----------+----------------+
| 长度(4B) | 数据(变长) |
+----------+----------------+
代码
server 端
package com.jysemel.iot;
import com.jysemel.iot.code.CustomProtocolDecoder;
import com.jysemel.iot.code.CustomProtocolEncoder;
import com.jysemel.iot.handler.ServerBusinessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class CustomProtocolServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加自定义解码器(解决粘包/拆包)
pipeline.addLast(new CustomProtocolDecoder());
// 添加自定义编码器(用于回显)
pipeline.addLast(new CustomProtocolEncoder());
// 业务 Handler
pipeline.addLast(new ServerBusinessHandler());
}
});
ChannelFuture future = bootstrap.bind(8182).sync();
System.out.println("自定义协议服务器启动,端口 8182");
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
client 端
package com.jysemel.iot;
import com.jysemel.iot.code.CustomProtocolDecoder;
import com.jysemel.iot.code.CustomProtocolEncoder;
import com.jysemel.iot.handler.ClientBusinessHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class CustomProtocolClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CustomProtocolDecoder());
pipeline.addLast(new CustomProtocolEncoder());
pipeline.addLast(new ClientBusinessHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 8182).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
