砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • 环境配置
    • 微信生态
    • 正则
    • 全栈技能
  • java圈

    • java基础
    • jdk体系
    • jvm
    • spring框架
    • 分库分表
    • zookeeper
  • python技能

    • python编程
    • python数据
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • devops
  • 元宇宙
  • 区块链
  • 物联网
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • 环境配置
    • 微信生态
    • 正则
    • 全栈技能
  • java圈

    • java基础
    • jdk体系
    • jvm
    • spring框架
    • 分库分表
    • zookeeper
  • python技能

    • python编程
    • python数据
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • devops
  • 元宇宙
  • 区块链
  • 物联网
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 入门

        • 基于netty构建入门
        • 理解粘包/拆包
        • 编解码器机制与自定义协议
        • 心跳和ack机制
        • mqtt服务demo演示
        • mqtt服务协议支持
        • mqtt服务udp支持
      • 协议规范

        • mqtt协议规范(发布/订阅模式)
        • mqtt协议规范(轻量级二进制协议)
        • mqtt协议规范(三种 QoS 等级)
        • mqtt协议规范(主题通配符订阅)
        • mqtt协议规范(遗嘱与保留消息)
      • 报文结构

        • 控制报文结构(报文分类)
        • 控制报文结构(连接与握手)
        • 控制报文结构(发布与接收)
      • 核心实战

        • 核心实战(握手与认证)
        • 核心实战(心跳保活机制)
        • 核心实战(会话管理)
        • 核心实战(安全)
    • mqtt-模拟器|客户端

      • 集成Paho

        • 设备模拟器设计
        • 设备模拟器演示
        • Paho拆解入门
        • Paho拆解核心
        • Paho拆解高性能
        • 其他客户端框架比较
      • NetAssist

        • 设备模拟器设计
      • hiveMq

        • hiveMq客户端
    • netty-mqtt-boot

      • 模块化设计
      • 统一接入层
      • 消息路由与流转层
      • 核心服务层
      • 业务应用层
      • 整体项目管理
      • 测试脚手架
      • 兼容支持
    • mqtt-压测

      • mqtt-jmeter

        • 模块化设计
      • ‌emqtt-bench

        • 模块化设计
    • mqtt-规则引擎

      • MQTT规则引擎
      • sql

        • 基于sql规则
      • ice

        • 模块化设计
      • Aviator

        • 模块化设计
      • Drools

        • 模块化设计
    • mqtt-实战问题

      • MQTT分布式集群

        • 分布式集群

          • 分布式集群意义
          • 如何构建分布式集群
          • 分布式集群简易版
          • mqtt入口HAProxy模拟负载
        • ignite

          • ignite入门
          • 模块化设计
      • 实战汇总

        • 分布式集群意义
  • mqtt-netty源码拆解

    • mqtt-netty源码拆解

      • Pipeline双向链表
      • MqttEncoder.INSTANCE
      • 分布式集群简易版
      • mqtt入口HAProxy模拟负载
  • 物联网 基于netty构建mqtt协议规范(三种 QoS 等级)
    • 简述
    • 源码(netty-sample-03-qos)
    • 三种 QoS 等级
    • 协议命令演示
      • 🧩 命令交互流程图
        • 1️⃣ 订阅主题
        • 2️⃣ 发布消息(按QoS等级)
          • 🔹 QoS 0(最多一次)
          • 🔸 QoS 1(至少一次)
          • 🔹 QoS 2(仅一次)
        • 3️⃣ Broker 推送消息给订阅者
    • server代码
    • client代码
    • 运行结果

物联网 基于netty构建mqtt协议规范(三种 QoS 等级)

简述

QoS(Quality of Service)是 MQTT 协议的核心特性,定义了消息传递的可靠性等级

源码(netty-sample-03-qos)

https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-03

三种 QoS 等级

等级名称消息交付次数确认机制适用场景
0最多一次最多一次(可能丢失)无确认,不重传传感器数据、日志,可容忍少量丢失
1至少一次至少一次(可能重复)PUBLISH → PUBACK,超时重传需要可靠送达但可接受重复的控制指令
2仅一次恰好一次(不重不丢)4 步握手(PUBREC / PUBREL / PUBCOMP)计费、关键状态同步,不允许重复

协议命令演示

🧩 命令交互流程图

1️⃣ 订阅主题
┌────────┐     SUB <topic>      ┌────────┐
│ 客户端 │ ───────────────────► │ Broker │
│  (C)   │                       │  (B)   │
└────────┘                       └────────┘
     ▲                                 │
     └─────────── SUBACK ──────────────┘
2️⃣ 发布消息(按QoS等级)
🔹 QoS 0(最多一次)
┌────────┐     PUB 0 <topic> <msg>      ┌────────┐
│ 客户端 │ ───────────────────────────► │ Broker │
└────────┘                               └────────┘
(无确认,消息发出即忘)
🔸 QoS 1(至少一次)
┌────────┐     PUB 1 <topic> <msg>      ┌────────┐
│ 客户端 │ ───────────────────────────► │ Broker │
└────────┘                               └────────┘
     ▲                                         │
     └────────────── ACK <msgId> ─────────────┘
🔹 QoS 2(仅一次)
┌────────┐     PUB 2 <topic> <msg>      ┌────────┐
│ 客户端 │ ───────────────────────────► │ Broker │
└────────┘                               └────────┘
     ▲                                         │
     │            REC <msgId>                 │
     └────────────────────────────────────────┘
     
┌────────┐     REL <msgId>            ┌────────┐
│ 客户端 │ ───────────────────────────► │ Broker │
└────────┘                               └────────┘
     ▲                                         │
     │            COMP <msgId>                │
     └────────────────────────────────────────┘
3️⃣ Broker 推送消息给订阅者
┌────────┐
│ Broker │
└────────┘
    │
    │ MSG <topic> <msg> (含msgId)
    ▼
┌────────┐
│ 客户端 │
└────────┘
    │
    │ ACK <msgId> (仅QoS 1/2)
    ▼
┌────────┐
│ Broker │
└────────┘

server代码

package com.jysemel.iot;

import com.jysemel.iot.handler.SimpleBrokerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.StandardCharsets;

/**
 * QoS Broker - 简化版消息服务器
 *
 * 学习目标:
 * 1. 理解Pub/Sub(发布/订阅)模式
 * 2. 理解三种QoS级别的区别
 * 3. 掌握基本的消息确认机制
 *
 * 使用方法:
 * 1. 运行此程序启动Broker
 * 2. 运行QosClient连接到此Broker
 * 3. 使用SUB和PUB命令测试
 */
public class QosBroker {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("╔════════════════════════════════════╗");
        System.out.println("║     QoS Broker 启动中...          ║");
        System.out.println("║     端口: 8888                     ║");
        System.out.println("║     支持: QoS 0 / 1 / 2           ║");
        System.out.println("╚════════════════════════════════════╝\n");

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            // 字符串解码和编码
                            p.addLast(new StringDecoder(StandardCharsets.UTF_8));
                            p.addLast(new StringEncoder(StandardCharsets.UTF_8));
                            // 业务处理器
                            p.addLast(new SimpleBrokerHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("✓ Broker已就绪,等待客户端连接...\n");

            future.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

client代码

package com.jysemel.iot;

import com.jysemel.iot.handler.SimpleClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class QosClient {
    private String host = "127.0.0.1";
    private int port = 8888;
    private Channel channel;
    private ScheduledExecutorService scheduler;
    private boolean autoTestMode = false;

    public static void main(String[] args) throws InterruptedException {
        QosClient client = new QosClient();
        client.autoTestMode = true;
        client.connect();
    }

    public void connect() throws InterruptedException {
        System.out.println("╔════════════════════════════════════╗");
        System.out.println("║     MQTT QoS 客户端               ║");
        System.out.println("║     连接到: " + host + ":" + port);
        if (autoTestMode) {
            System.out.println("║     模式: 自动测试                  ║");
        }
        System.out.println("╚════════════════════════════════════╝\n");
        EventLoopGroup group = new NioEventLoopGroup();
        scheduler = Executors.newScheduledThreadPool(2);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new StringDecoder(StandardCharsets.UTF_8));
                            p.addLast(new StringEncoder(StandardCharsets.UTF_8));
                            p.addLast(new SimpleClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            channel = future.channel();
            System.out.println("✅ 连接成功!\n");
            startAutoTest();
            channel.closeFuture().sync();
        } finally {
            if (scheduler != null) {
                scheduler.shutdown();
            }
            group.shutdownGracefully();
        }
    }

    private void startAutoTest() {
        System.out.println("🚀 启动自动测试模式...\n");

        scheduler.schedule(() -> {
            System.out.println("📝 [测试1]     主题订阅: news");
            sendCommand("SUB news");
        }, 2, TimeUnit.SECONDS);

//        scheduler.schedule(() -> {
//            System.out.println("📝 [测试2]     向服务端发布 QoS 0 消息");
//            sendCommand("PUB 0 news Hello_QoS0");
//        }, 20, TimeUnit.SECONDS);

        scheduler.schedule(() -> {
            System.out.println("📝 [测试3]     向服务端发布 QoS 1 消息");
            sendCommand("PUB 1 news Important_QoS1");
        }, 30, TimeUnit.SECONDS);
//
//        scheduler.schedule(() -> {
//            System.out.println("📝 [测试4]   向服务端发布 QoS 2 消息");
//            sendCommand("PUB 2 news Critical_QoS2");
//        }, 40, TimeUnit.SECONDS);
//
//        scheduler.schedule(() -> {
//            System.out.println("✅ 自动测试完成!");
//            if (channel != null && channel.isActive()) {
//                channel.close();
//            }
//        }, 50, TimeUnit.SECONDS);
    }

    private void sendCommand(String command) {
        channel.writeAndFlush(command + "\n");
        System.out.println("📤 已发送命令: " + command);
    }
}


运行结果

img

img

最近更新: 2026/5/16 14:48
Contributors: kcnf
Prev
mqtt协议规范(轻量级二进制协议)
Next
mqtt协议规范(主题通配符订阅)