砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
    • app
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 基于netty构建入门
      • 理解粘包/拆包
      • 编解码器机制与自定义协议
      • 心跳和ack机制
      • mqtt服务demo演示
      • mqtt服务协议支持
      • mqtt服务udp支持
      • mqtt协议规范(发布/订阅模式)
      • mqtt协议规范(轻量级二进制协议)
      • mqtt协议规范(三种 QoS 等级)
      • mqtt协议规范(主题通配符订阅)
      • mqtt协议规范(遗嘱与保留消息)
  • 物联网 基于netty构建mqtt服务demo演示
    • 简述
    • 基于Netty的MQTT服务
    • 源码
    • 添加pom依赖
    • 项目目录结构
    • 启动类 代码
    • 测试结果

物联网 基于netty构建mqtt服务demo演示

简述

基于Netty实现轻量级MQTT协议,利用了Netty的异步、非阻塞特性,将MQTT协议的核心能力(连接管理、消息路由、心跳保活等)进行模块化构建

基于Netty的MQTT服务

  • 连接接入:

通过EventLoopGroup(Boss & Worker)模型高效处理网络I/O,实现单机数万至百万连接的承载

  • 协议编解码:

定义MqttDecoder/Encoder解析编码MQTT报文。Netty的ChannelPipeline可灵活配置编解码器处理CONTROL报文

  • 核心处理:

在ChannelHandler中实现核心逻辑

- 会话与状态管理:
>  管理客户端连接(如心跳超时检测),维护订阅树和消息缓存(Pending Messages)

- 消息路由:
> 为PUBLISH/SUBSCRIBE报文,实现高性能通配符匹配,将消息高效投递至订阅者

- QoS保证:
> 支持MQTT服务质量等级,确保消息的可靠投递

- 保留与遗嘱消息:
> 实现对持久会话的断线重连自动恢复,以及保留/遗嘱消息的处理
  • 存储扩展:

根据业务需求,往往需要将上述(会话、订阅、消息)数据进行持久化或使用高性能缓存(如Redis)

源码

netty-sample-01[https://gitee.com/kcnf-webrtc/iot-sample/tree/master/netty/netty-sample-01]

添加pom依赖

<dependencies>
    <!-- Netty 核心依赖 -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
    </dependency>
</dependencies>

项目目录结构

img

启动类 代码

package com.jysemel.iot;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;


public class MqttServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // 启用TCP的KeepAlive,探测死连接
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加心跳检测:60秒无读事件触发IdleStateEvent
                            pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
                            // 添加MQTT解码器,指定最大报文长度为 65536 字节
                            pipeline.addLast("mqttDecoder", new MqttDecoder(64 * 1024));
                            // 添加MQTT编码器
                            pipeline.addLast("mqttEncoder", MqttEncoder.INSTANCE);
                            // 添加业务处理器
                            pipeline.addLast("mqttServerHandler", new MqttServerHandler());
                        }
                    });
            // 绑定1883端口并启动
            ChannelFuture channelFuture = serverBootstrap.bind(1883).sync();
            System.out.println("🚀 Netty MQTT Broker 启动成功,端口: 1883");
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

测试结果

img

最近更新: 2026/5/14 17:25
Contributors: kcnf
Prev
心跳和ack机制
Next
mqtt服务协议支持