物联网 MQTT简易版Broker,基于spring-boot socket
流程演示
发布者发送: PUBLISH#topic1#hello
↓
Broker 收到消息 (handlePublish)
↓
查找 topic1 的所有订阅者 (PrintWriter 集合)
↓
遍历每个订阅者的 PrintWriter
↓
writer.println("MSG#topic1#hello") ← 你选中的代码
↓
数据写入 Socket 输出流 → 网络传输 → 订阅端 Socket 输入流
↓
订阅端的 in.readLine() 返回这行数据
↓
订阅端收到消息
源码
spring-boot-iot-sample-broker[https://gitee.com/kcnf-webrtc/iot-sample/tree/master/spring-boot-iot-sample/spring-boot-iot-sample-broker]
添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.jysemel.iot</groupId>
<artifactId>iot-sample</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>spring-boot-iot-sample</artifactId>
<packaging>pom</packaging>
<modules>
<module>spring-boot-iot-sample-broker</module>
<module>spring-boot-iot-protobuf-api</module>
<module>spring-boot-iot-sample-broker-protobuf</module>
<module>spring-boot-iot-sample-broker-protobuf-python</module>
</modules>
<!-- pom.xml -->
<dependencies>
<!-- 用于处理异步连接,也可以直接用 Java 线程池 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<!-- pom.xml -->
<dependencies>
<!-- Protobuf 支持 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.21.12</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
socket broker
package com.jysemel.iot.socket;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
@Component
public class MockMqttSocketServer {
private ServerSocket serverSocket;
private final ExecutorService clientExecutor = Executors.newCachedThreadPool();
@Value("${socket.server.port:18888}")
private int port;
@PostConstruct
public void start() throws IOException {
serverSocket = new ServerSocket(port);
log.info("普通 Socket Broker 服务端启动,端口:{},模拟 MQTT 发布/订阅", port);
// 单独线程接受连接
new Thread(() -> {
while (!serverSocket.isClosed()) {
try {
Socket socket = serverSocket.accept();
clientExecutor.submit(new ClientHandler(socket));
} catch (IOException e) {
if (!serverSocket.isClosed()) log.warn("接受连接异常", e);
}
}
}).start();
}
@PreDestroy
public void stop() throws IOException {
if (serverSocket != null) serverSocket.close();
clientExecutor.shutdown();
}
}
package com.jysemel.iot.socket;
import com.jysemel.iot.Constant;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ClientHandler implements Runnable {
// 全局订阅表:topic -> Set<PrintWriter>
private static final Map<String, Set<PrintWriter>> SUBSCRIPTIONS = new ConcurrentHashMap<>();
private final Socket socket;
private PrintWriter out;
private BufferedReader in;
private String clientId;
public ClientHandler(Socket socket) {
this.socket = socket;
this.clientId = socket.getRemoteSocketAddress().toString();
}
@Override
public void run() {
MDC.put(Constant.TRACE_ID_KEY, UUID.randomUUID().toString().replace("-", "").substring(0, 16));
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("简易模拟 SOCKET BROKER (MQTT-LIKE)");
log.info("新客户端连接: {}", clientId);
String line;
while ((line = in.readLine()) != null) {
if (line.startsWith("SUBSCRIBE#")) {
handleSubscribe(line);
out.println("SUBSCRIBE");
} else if (line.startsWith("PUBLISH#")) {
handlePublish(line);
out.println("PUBLISH");
} else if (line.equals("PING")) {
out.println("PONG");
} else if (line.equals("DISCONNECT")) {
removeAllSubscriptions();
break;
} else {
out.println("ERROR: Unknown command");
}
}
} catch (IOException e) {
log.warn("客户端{}连接异常: {}", clientId, e.getMessage());
} finally {
removeAllSubscriptions();
closeQuietly();
log.info("客户端{}已断开", clientId);
MDC.clear();
}
}
private void handleSubscribe(String line) {
String topic = line.substring("SUBSCRIBE#".length());
SUBSCRIPTIONS.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet()).add(out);
clientId = socket.getRemoteSocketAddress().toString() + ":" + topic;
out.println("SUBACK: subscribed to " + topic);
log.info("客户端 {} 订阅主题: {}", clientId, topic);
}
private void handlePublish(String line) {
// 格式: PUBLISH#topic#payload
String[] parts = line.split("#", 3);
if (parts.length < 3) {
out.println("ERROR: PUBLISH format error");
return;
}
String topic = parts[1];
String payload = parts[2];
log.info("收到发布: topic={}, payload={}", topic, payload);
// 模拟 MQTT 的 QoS 0(最多一次,无确认),直接转发给所有订阅者
Set<PrintWriter> subscribers = SUBSCRIPTIONS.getOrDefault(topic, Collections.emptySet());
if (subscribers.isEmpty()) {
out.println("WARN: no subscribers for " + topic);
} else {
String msg = "MSG#" + topic + "#" + payload;
for (PrintWriter writer : subscribers) {
writer.println(msg);
}
out.println("PUBACK: published to " + subscribers.size() + " clients");
}
}
private void removeAllSubscriptions() {
// 从所有主题中移除当前客户端
SUBSCRIPTIONS.values().forEach(set -> set.remove(out));
log.info("客户端{}断开,已清理订阅", clientId);
}
private void closeQuietly() {
try { if (in != null) in.close(); } catch (IOException ignored) {}
try { if (out != null) out.close(); } catch (Exception ignored) {}
try { if (socket != null) socket.close(); } catch (IOException ignored) {}
}
}
消息发布入口
package com.jysemel.iot.controller;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.jysemel.iot.Constant;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Slf4j
@RestController
public class TestPublishController {
@Value("${mock.client.port:18888}")
private Integer mockPort;
@Value("${mock.client.ip:127.0.0.1}")
private String mockIp;
// 通过 HTTP 触发本机模拟客户端行为,方便验证,但也可以直接用 telnet
@PostMapping("/publish")
public String publish(@RequestBody String message) {
JSONObject jsonObject = JSON.parseObject(message);
MDC.put(Constant.TRACE_ID_KEY, UUID.randomUUID()+"");
try (Socket client = new Socket(mockIp, mockPort);
PrintWriter out = new PrintWriter(client.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()))) {
String welcomeMsg = in.readLine();
log.info("模拟推送报文 {}", jsonObject.toString());
log.info("模拟推送请求端口 {}", client.getPort());
log.info("模拟推送欢迎消息 {}", welcomeMsg);
out.println("PUBLISH#" + jsonObject.getString("topic") + "#" + jsonObject.getString("payload") + "-" + MDC.get(Constant.TRACE_ID_KEY));
String publishResponse = in.readLine();
log.info("模拟推送PUBLISH响应 {}", publishResponse);
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
return "Published: " + publishResponse;
} catch (Exception e) {
log.error("发布消息失败", e);
return "Error: " + e.getMessage();
}
}
}
订阅入口
package com.jysemel.iot.controller;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
@RestController
public class TestSubscribeController {
private static final Map<String, ExecutorService> SUBSCRIBER_POOLS = new ConcurrentHashMap<>();
private static final Map<String, Future<?>> SUBSCRIBER_FUTURES = new ConcurrentHashMap<>();
@Value("${mock.client.port:18888}")
private Integer mockPort;
@Value("${mock.client.ip:127.0.0.1}")
private String mockIp;
@PostMapping("/subscribe")
public String subscribe(@RequestBody String message) {
JSONObject jsonObject = JSON.parseObject(message);
String topic = jsonObject.getString("topic");
String clientId = jsonObject.getString("clientId", "subscriber-" + System.currentTimeMillis());
if (topic == null || topic.isEmpty()) {
return "Error: topic is required";
}
// 如果已经存在相同clientId的订阅,先取消
if (SUBSCRIBER_FUTURES.containsKey(clientId)) {
unsubscribeById(clientId);
}
// 创建新的订阅任务
ExecutorService executor = Executors.newSingleThreadExecutor();
SUBSCRIBER_POOLS.put(clientId, executor);
Future<?> future = executor.submit(() -> {
try (Socket subscriberSocket = new Socket(mockIp, mockPort);
PrintWriter out = new PrintWriter(subscriberSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(subscriberSocket.getInputStream()))) {
String welcomeMsg = in.readLine();
log.info("订阅客户端 {} 连接成功,欢迎消息: {}", clientId, welcomeMsg);
// 发送订阅命令
out.println("SUBSCRIBE#" + topic);
String subAck = in.readLine();
log.info("订阅客户端 {} 收到 SUBACK: {}", clientId, subAck);
// 持续监听消息
String msg;
while ((msg = in.readLine()) != null && !Thread.currentThread().isInterrupted()) {
if (msg.startsWith("MSG#")) {
String[] parts = msg.split("#", 3);
if (parts.length >= 3) {
String msgTopic = parts[1];
String msgPayload = parts[2];
log.info("订阅客户端 {} 收到消息 - Topic: {}, Payload: {}", clientId, msgTopic, msgPayload);
}
} else if (msg.equals("PONG")) {
log.debug("订阅客户端 {} 收到 PONG", clientId);
}
}
} catch (IOException e) {
if (!Thread.currentThread().isInterrupted()) {
log.error("订阅客户端 {} 连接异常", clientId, e);
}
} finally {
log.info("订阅客户端 {} 已断开", clientId);
SUBSCRIBER_POOLS.remove(clientId);
SUBSCRIBER_FUTURES.remove(clientId);
}
});
SUBSCRIBER_FUTURES.put(clientId, future);
return "Subscribed client: " + clientId + " to topic: " + topic;
}
@PostMapping("/unsubscribe")
public String unsubscribe(@RequestBody String message) {
JSONObject jsonObject = JSON.parseObject(message);
String clientId = jsonObject.getString("clientId");
if (clientId == null || clientId.isEmpty()) {
return "Error: clientId is required";
}
return unsubscribeById(clientId);
}
private String unsubscribeById(String clientId) {
Future<?> future = SUBSCRIBER_FUTURES.remove(clientId);
if (future != null) {
future.cancel(true);
}
ExecutorService executor = SUBSCRIBER_POOLS.remove(clientId);
if (executor != null) {
executor.shutdownNow();
}
log.info("取消订阅客户端: {}", clientId);
return "Unsubscribed client: " + clientId;
}
@GetMapping("/subscribers")
public String getSubscribers() {
return "Active subscribers: " + SUBSCRIBER_FUTURES.keySet();
}
}
演示结果
- 注册订阅(http://127.0.0.1:8081/subscribe)
{
"topic":"test",
"payload":"test"
}
- 发布信息(http://127.0.0.1:8081/publish)
{
"topic":"test",
"payload":"test"
}
- 结果

