砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • nodejs
    • npm
    • webpack
    • 微信
    • 正则
    • uniapp
  • java

    • java基础
    • jdk体系
    • jvm
    • spring
    • spring_cloud
    • spring_boot
    • 分库分表
    • zookeeper
  • python

    • python基础
    • python高级
    • python框架
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • docker
  • k8s
  • devops
  • nginx
  • 元宇宙
  • 区块链
  • 物联网
  • linux
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
      • 物联网 MQTT简易版Broker基于Protobuf
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
      • Protobuf实战第二篇
      • Protobuf实战第三篇
    • emqx

      • emqx入门
    • mica

      • mica入门
  • 物联网 MQTT简易版Broker,基于spring-boot socket
    • 流程演示
    • 源码
    • 添加依赖
    • socket broker
    • 消息发布入口
    • 订阅入口
    • 演示结果

物联网 MQTT简易版Broker,基于spring-boot socket

流程演示

发布者发送: PUBLISH#topic1#hello
    ↓
Broker 收到消息 (handlePublish)
    ↓
查找 topic1 的所有订阅者 (PrintWriter 集合)
    ↓
遍历每个订阅者的 PrintWriter
    ↓
writer.println("MSG#topic1#hello")  ← 你选中的代码
    ↓
数据写入 Socket 输出流 → 网络传输 → 订阅端 Socket 输入流
    ↓
订阅端的 in.readLine() 返回这行数据
    ↓
订阅端收到消息

源码

spring-boot-iot-sample-broker[https://gitee.com/kcnf-webrtc/iot-sample/tree/master/spring-boot-iot-sample/spring-boot-iot-sample-broker]

添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.jysemel.iot</groupId>
        <artifactId>iot-sample</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>spring-boot-iot-sample</artifactId>
    <packaging>pom</packaging>

    <modules>
        <module>spring-boot-iot-sample-broker</module>
        <module>spring-boot-iot-protobuf-api</module>
        <module>spring-boot-iot-sample-broker-protobuf</module>
        <module>spring-boot-iot-sample-broker-protobuf-python</module>
    </modules>


    <!-- pom.xml -->
    <dependencies>
        <!-- 用于处理异步连接,也可以直接用 Java 线程池 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

    </dependencies>

    <dependencyManagement>
        <!-- pom.xml -->
        <dependencies>
            <!-- Protobuf 支持 -->
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>3.21.12</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

socket broker

package com.jysemel.iot.socket;


import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@Component
public class MockMqttSocketServer {

    private ServerSocket serverSocket;
    private final ExecutorService clientExecutor = Executors.newCachedThreadPool();

    @Value("${socket.server.port:18888}")
    private int port;

    @PostConstruct
    public void start() throws IOException {
        serverSocket = new ServerSocket(port);
        log.info("普通 Socket Broker 服务端启动,端口:{},模拟 MQTT 发布/订阅", port);

        // 单独线程接受连接
        new Thread(() -> {
            while (!serverSocket.isClosed()) {
                try {
                    Socket socket = serverSocket.accept();
                    clientExecutor.submit(new ClientHandler(socket));
                } catch (IOException e) {
                    if (!serverSocket.isClosed()) log.warn("接受连接异常", e);
                }
            }
        }).start();
    }

    @PreDestroy
    public void stop() throws IOException {
        if (serverSocket != null) serverSocket.close();
        clientExecutor.shutdown();
    }
}
package com.jysemel.iot.socket;


import com.jysemel.iot.Constant;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class ClientHandler implements Runnable {


    // 全局订阅表:topic -> Set<PrintWriter>
    private static final Map<String, Set<PrintWriter>> SUBSCRIPTIONS = new ConcurrentHashMap<>();
    private final Socket socket;
    private PrintWriter out;
    private BufferedReader in;
    private String clientId;

    public ClientHandler(Socket socket) {
        this.socket = socket;
        this.clientId = socket.getRemoteSocketAddress().toString();
    }

    @Override
    public void run() {
        MDC.put(Constant.TRACE_ID_KEY, UUID.randomUUID().toString().replace("-", "").substring(0, 16));
        try {
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            out.println("简易模拟 SOCKET BROKER (MQTT-LIKE)");
            log.info("新客户端连接: {}", clientId);

            String line;
            while ((line = in.readLine()) != null) {
                if (line.startsWith("SUBSCRIBE#")) {
                    handleSubscribe(line);
                    out.println("SUBSCRIBE");
                } else if (line.startsWith("PUBLISH#")) {
                    handlePublish(line);
                    out.println("PUBLISH");
                } else if (line.equals("PING")) {
                    out.println("PONG");
                } else if (line.equals("DISCONNECT")) {
                    removeAllSubscriptions();
                    break;
                } else {
                    out.println("ERROR: Unknown command");
                }
            }
        } catch (IOException e) {
            log.warn("客户端{}连接异常: {}", clientId, e.getMessage());
        } finally {
            removeAllSubscriptions();
            closeQuietly();
            log.info("客户端{}已断开", clientId);
            MDC.clear();
        }
    }

    private void handleSubscribe(String line) {
        String topic = line.substring("SUBSCRIBE#".length());
        SUBSCRIPTIONS.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet()).add(out);
        clientId = socket.getRemoteSocketAddress().toString() + ":" + topic;
        out.println("SUBACK: subscribed to " + topic);
        log.info("客户端 {} 订阅主题: {}", clientId, topic);
    }

    private void handlePublish(String line) {
        // 格式: PUBLISH#topic#payload
        String[] parts = line.split("#", 3);
        if (parts.length < 3) {
            out.println("ERROR: PUBLISH format error");
            return;
        }
        String topic = parts[1];
        String payload = parts[2];
        log.info("收到发布: topic={}, payload={}", topic, payload);

        // 模拟 MQTT 的 QoS 0(最多一次,无确认),直接转发给所有订阅者
        Set<PrintWriter> subscribers = SUBSCRIPTIONS.getOrDefault(topic, Collections.emptySet());
        if (subscribers.isEmpty()) {
            out.println("WARN: no subscribers for " + topic);
        } else {
            String msg = "MSG#" + topic + "#" + payload;
            for (PrintWriter writer : subscribers) {
                writer.println(msg);
            }
            out.println("PUBACK: published to " + subscribers.size() + " clients");
        }
    }

    private void removeAllSubscriptions() {
        // 从所有主题中移除当前客户端
        SUBSCRIPTIONS.values().forEach(set -> set.remove(out));
        log.info("客户端{}断开,已清理订阅", clientId);
    }

    private void closeQuietly() {
        try { if (in != null) in.close(); } catch (IOException ignored) {}
        try { if (out != null) out.close(); } catch (Exception ignored) {}
        try { if (socket != null) socket.close(); } catch (IOException ignored) {}
    }
}

消息发布入口

package com.jysemel.iot.controller;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.jysemel.iot.Constant;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Slf4j
@RestController
public class TestPublishController {

    @Value("${mock.client.port:18888}")
    private Integer mockPort;

    @Value("${mock.client.ip:127.0.0.1}")
    private String mockIp;


    // 通过 HTTP 触发本机模拟客户端行为,方便验证,但也可以直接用 telnet
    @PostMapping("/publish")
    public String publish(@RequestBody String message) {
        JSONObject jsonObject = JSON.parseObject(message);
        MDC.put(Constant.TRACE_ID_KEY, UUID.randomUUID()+"");
        try (Socket client = new Socket(mockIp, mockPort);
             PrintWriter out = new PrintWriter(client.getOutputStream(), true);
             BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()))) {

            String welcomeMsg = in.readLine();
            log.info("模拟推送报文 {}", jsonObject.toString());
            log.info("模拟推送请求端口 {}", client.getPort());
            log.info("模拟推送欢迎消息 {}", welcomeMsg);

            out.println("PUBLISH#" + jsonObject.getString("topic") + "#" + jsonObject.getString("payload") + "-" + MDC.get(Constant.TRACE_ID_KEY));

            String publishResponse = in.readLine();
            log.info("模拟推送PUBLISH响应 {}", publishResponse);

            Thread.sleep(TimeUnit.SECONDS.toMillis(5));

            return "Published: " + publishResponse;
        } catch (Exception e) {
            log.error("发布消息失败", e);
            return "Error: " + e.getMessage();
        }
    }

}

订阅入口

package com.jysemel.iot.controller;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j
@RestController
public class TestSubscribeController {

    private static final Map<String, ExecutorService> SUBSCRIBER_POOLS = new ConcurrentHashMap<>();
    private static final Map<String, Future<?>> SUBSCRIBER_FUTURES = new ConcurrentHashMap<>();


    @Value("${mock.client.port:18888}")
    private Integer mockPort;

    @Value("${mock.client.ip:127.0.0.1}")
    private String mockIp;

    @PostMapping("/subscribe")
    public String subscribe(@RequestBody String message) {
        JSONObject jsonObject = JSON.parseObject(message);
        String topic = jsonObject.getString("topic");
        String clientId = jsonObject.getString("clientId", "subscriber-" + System.currentTimeMillis());
        if (topic == null || topic.isEmpty()) {
            return "Error: topic is required";
        }
        // 如果已经存在相同clientId的订阅,先取消
        if (SUBSCRIBER_FUTURES.containsKey(clientId)) {
            unsubscribeById(clientId);
        }
        // 创建新的订阅任务
        ExecutorService executor = Executors.newSingleThreadExecutor();
        SUBSCRIBER_POOLS.put(clientId, executor);
        Future<?> future = executor.submit(() -> {
            try (Socket subscriberSocket = new Socket(mockIp, mockPort);
                PrintWriter out = new PrintWriter(subscriberSocket.getOutputStream(), true);
                BufferedReader in = new BufferedReader(new InputStreamReader(subscriberSocket.getInputStream()))) {
                String welcomeMsg = in.readLine();
                log.info("订阅客户端 {} 连接成功,欢迎消息: {}", clientId, welcomeMsg);
                // 发送订阅命令
                out.println("SUBSCRIBE#" + topic);
                String subAck = in.readLine();
                log.info("订阅客户端 {} 收到 SUBACK: {}", clientId, subAck);
                // 持续监听消息
                String msg;
                while ((msg = in.readLine()) != null && !Thread.currentThread().isInterrupted()) {
                    if (msg.startsWith("MSG#")) {
                        String[] parts = msg.split("#", 3);
                        if (parts.length >= 3) {
                            String msgTopic = parts[1];
                            String msgPayload = parts[2];
                            log.info("订阅客户端 {} 收到消息 - Topic: {}, Payload: {}", clientId, msgTopic, msgPayload);
                        }
                    } else if (msg.equals("PONG")) {
                        log.debug("订阅客户端 {} 收到 PONG", clientId);
                    }
                }
            } catch (IOException e) {
                if (!Thread.currentThread().isInterrupted()) {
                    log.error("订阅客户端 {} 连接异常", clientId, e);
                }
            } finally {
                log.info("订阅客户端 {} 已断开", clientId);
                SUBSCRIBER_POOLS.remove(clientId);
                SUBSCRIBER_FUTURES.remove(clientId);
            }
        });
        SUBSCRIBER_FUTURES.put(clientId, future);
        return "Subscribed client: " + clientId + " to topic: " + topic;
    }

    @PostMapping("/unsubscribe")
    public String unsubscribe(@RequestBody String message) {
        JSONObject jsonObject = JSON.parseObject(message);
        String clientId = jsonObject.getString("clientId");
        if (clientId == null || clientId.isEmpty()) {
            return "Error: clientId is required";
        }
        return unsubscribeById(clientId);
    }

    private String unsubscribeById(String clientId) {
        Future<?> future = SUBSCRIBER_FUTURES.remove(clientId);
        if (future != null) {
            future.cancel(true);
        }
        ExecutorService executor = SUBSCRIBER_POOLS.remove(clientId);
        if (executor != null) {
            executor.shutdownNow();
        }
        log.info("取消订阅客户端: {}", clientId);
        return "Unsubscribed client: " + clientId;
    }

    @GetMapping("/subscribers")
    public String getSubscribers() {
        return "Active subscribers: " + SUBSCRIBER_FUTURES.keySet();
    }

}

演示结果

  • 注册订阅(http://127.0.0.1:8081/subscribe)
{
    "topic":"test",
    "payload":"test"
}
  • 发布信息(http://127.0.0.1:8081/publish)
{
    "topic":"test",
    "payload":"test"
}
  • 结果 img
最近更新: 2026/5/8 15:40
Contributors: jysemel, kcnf, Copilot
Prev
物联网 MQTT订阅性能优势
Next
物联网 MQTT简易版Broker基于Protobuf