物联网 基于netty构建mqtt服务udp支持
简述
应用中分别启动TCP和UDP两个服务,实现协议的分离与消息互通
源码
netty-sample-02[https://gitee.com/kcnf-webrtc/iot-sample/tree/master/netty/netty-sample-02]
代码
package com.jysemel.iot;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
public class DualProtocolServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
EventLoopGroup udpGroup = new NioEventLoopGroup(); // UDP工作组
try {
// --- 1. MQTT over TCP 服务 (端口1883) ---
ServerBootstrap tcpBootstrap = new ServerBootstrap();
tcpBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new MqttDecoder(64 * 1024));
p.addLast(MqttEncoder.INSTANCE);
p.addLast("mqttHandler", new MqttServerHandler()); // MQTT业务逻辑
}
});
ChannelFuture tcpFuture = tcpBootstrap.bind(1883).sync();
System.out.println("[TCP] MQTT Broker 启动, 端口 1883");
// --- 2. UDP 服务 (端口8888) 接收自定义传感器数据 ---
Bootstrap udpBootstrap = new Bootstrap();
udpBootstrap.group(udpGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) {
ch.pipeline().addLast("udpHandler", new UdpServerHandler());
}
});
ChannelFuture udpFuture = udpBootstrap.bind(8888).sync();
System.out.println("[UDP] 自定义传感器服务启动, 端口 8888");
tcpFuture.channel().closeFuture().sync();
udpFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
udpGroup.shutdownGracefully();
}
}
}
package com.jysemel.iot;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import java.nio.charset.StandardCharsets;
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
ByteBuf content = packet.content();
String udpData = content.toString(StandardCharsets.UTF_8);
System.out.println("[UDP] Received: " + udpData);
// 将UDP数据转换为MQTT消息,并广播给所有MQTT客户端
// 这里固定发布到 "udp/sensor" 主题
MqttServerHandler.broadcast("udp/sensor", udpData);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("UDP error: " + cause.getMessage());
}
}
