砍材农夫砍材农夫
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • 环境配置
    • 微信生态
    • 正则
    • 全栈技能
  • java圈

    • java基础
    • jdk体系
    • jvm
    • spring框架
    • 分库分表
    • zookeeper
  • python技能

    • python编程
    • python数据
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • devops
  • 元宇宙
  • 区块链
  • 物联网
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 微信记账小程序
  • java
  • redis
  • mysql
  • 场景类
  • 框架类
  • vuepress搭建
  • hexo搭建
  • 云图
  • llm wiki

    • 基于karpathy
    • gradle
  • 常用工具

    • git
    • gradle
    • Zadig
    • it-tools
    • 开源推荐
    • curl
  • 大前端

    • 环境配置
    • 微信生态
    • 正则
    • 全栈技能
  • java圈

    • java基础
    • jdk体系
    • jvm
    • spring框架
    • 分库分表
    • zookeeper
  • python技能

    • python编程
    • python数据
  • 算法

    • 算法
  • 网关

    • spring_cloud_gateway
    • openresty
  • 高可用

    • 秒杀
    • 分布式
    • 缓存一致
  • MQ

    • MQ
    • rabbitMQ
    • rocketMQ
    • kafka
  • 其它

    • 设计模式
    • 领域驱动(ddd)
  • 关系型数据库

    • mysql5.0
    • mysql8.0
  • 非关系型数据库

    • redis
    • mongoDB
  • 分布式/其他

    • ShardingSphere
    • 区块链
  • 向量数据库

    • M3E
    • OPEN AI
  • Jmeter
  • fiddler
  • wireshark
  • AI入门
  • AI大模型
  • AI插件
  • AI集成框架
  • 相关算法
  • AI训练师
  • 量化交易
  • AIoT
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 相关运营
  • devops
  • 元宇宙
  • 区块链
  • 物联网
  • webrtc
  • web3.0
  • gitee
  • github
  • infoq
  • osc
  • 砍材工具
  • 关于
  • 中考
  • 投资
  • 保险
  • 思
  • 首页
    • 开发板介绍
    • micropython环境搭建
    • esp32开发板
    • 面包板
    • 万能表使用
  • 面包板
    • 点灯
  • esp32
    • 点亮开发板led灯
    • 点亮外接led
    • 点亮外接oled文字
    • 红外传感器
    • 红外传感器+olde
    • esp32+面包板
  • MQTT编程

    • MQTT入门

      • 物联网 MQTT
      • 物联网 MQTT和Socket
      • 物联网 MQTT订阅性能优势
      • 物联网 MQTT简易版Broker
    • HiveMQ

      • hivemq实战入门
    • Protobuf

      • Protobuf入门
      • Protobuf入门+梳理
      • Protobuf实战第一篇
    • emqx

      • emqx入门
    • mica

      • mica入门
    • netty

      • 入门

        • 基于netty构建入门
        • 理解粘包/拆包
        • 编解码器机制与自定义协议
        • 心跳和ack机制
        • mqtt服务demo演示
        • mqtt服务协议支持
        • mqtt服务udp支持
      • 协议规范

        • mqtt协议规范(发布/订阅模式)
        • mqtt协议规范(轻量级二进制协议)
        • mqtt协议规范(三种 QoS 等级)
        • mqtt协议规范(主题通配符订阅)
        • mqtt协议规范(遗嘱与保留消息)
      • 报文结构

        • 控制报文结构(报文分类)
        • 控制报文结构(连接与握手)
        • 控制报文结构(发布与接收)
      • 核心实战

        • 核心实战(握手与认证)
        • 核心实战(心跳保活机制)
        • 核心实战(会话管理)
        • 核心实战(安全)
    • mqtt-模拟器

      • 集成Paho

        • 设备模拟器设计
        • 设备模拟器演示
        • Paho拆解入门
        • Paho拆解核心
        • Paho拆解高性能
        • 其他客户端框架比较
      • NetAssist

        • 设备模拟器设计
    • netty-mqtt-boot

      • 模块化设计
      • 统一接入层
      • 消息路由与流转层
      • 核心服务层
      • 业务应用层
      • 整体项目管理
      • 测试脚手架
      • 兼容支持
    • mqtt-压测

      • mqtt-jmeter

        • 模块化设计
      • ‌emqtt-bench

        • 模块化设计
    • mqtt-规则引擎

      • MQTT规则引擎
      • sql

        • 基于sql规则
      • ice

        • 模块化设计
      • Aviator

        • 模块化设计
      • Drools

        • 模块化设计
  • 物联网实战:MQTT平台 | MQTT规则引擎 | 基于sql规则引擎
  • MQTT SQL 规则引擎示例项目 (mqtt-rule-sql-sample-00)
    • 源码(mqtt-rule-sql-sample-00)
    • 项目概述
    • 目录结构
    • 技术架构
      • 核心架构分层
    • 数据库设计
      • 表结构
        • 1. rule_config(规则配置表)
        • 2. device_data(设备消息表)
      • 初始化数据
    • 核心模块详解
      • 1. MQTT Broker 模块
        • MqttBrokerServer(Broker 服务器)
        • MqttBrokerHandler(消息处理器)
        • SubscriptionManager(订阅管理器)
      • 2. 规则引擎模块
        • SqlRuleEngine(规则引擎入口)
        • RuleCache(规则缓存)
        • RuleExecutor(规则执行器)
      • 3. 规则解析模块
        • SqlRuleParser(SQL 解析器)
        • TopicMatcher(主题匹配器)
        • ConditionEvaluator(条件评估器)
      • 4. 动作调度模块
        • ActionDispatcher(动作调度器)
    • 核心流程
      • 消息处理完整流程
    • 使用示例
      • 示例 1: 温度告警
      • 示例 2: 设备状态监控
      • 示例 3: 湿度数据记录(无条件)
    • 测试模块
    • 配置说明
    • 扩展建议
      • 1. 增强条件评估
      • 2. 增强动作调度
      • 3. 规则管理
      • 4. 性能优化
    • 总结

物联网实战:MQTT平台 | MQTT规则引擎 | 基于sql规则引擎

基于sql规则引擎不是执行数据库 SQL,而是 用 SQL 语法做表达式过滤

MQTT SQL 规则引擎示例项目 (mqtt-rule-sql-sample-00)

源码(mqtt-rule-sql-sample-00)

https://gitee.com/kcnf-iot/mqtt-rule

项目概述

本项目是一个基于 MQTT 和 SQL 的物联网规则引擎示例,实现了一个轻量级的 MQTT Broker 与规则引擎的结合体。支持使用 SQL 语法定义数据处理规则,通过主题匹配和条件评估自动执行相应动作。

核心功能:

  • 基于 Netty 的 MQTT Broker 实现
  • SQL 语法定义的规则引擎
  • 主题通配符匹配(+ 和 #)
  • 条件表达式评估(>, <, =, AND)
  • 数据持久化(SQLite)
  • 规则执行日志

目录结构

mqtt-rule-sql-sample-00/
├── src/
│   ├── main/
│   │   ├── java/com/jysemel/iot/
│   │   │   ├── entity/              # 数据库实体
│   │   │   ├── mqtt/                # MQTT Broker 模块
│   │   │   ├── repository/          # JPA 数据访问
│   │   │   └── rule/                # 规则引擎核心
│   │   │       ├── action/          # 动作调度
│   │   │       ├── cache/           # 规则缓存
│   │   │       ├── executor/        # 规则执行
│   │   │       ├── matcher/         # 匹配器
│   │   │       ├── model/           # 数据模型
│   │   │       ├── parser/          # SQL 解析
│   │   │       └── SqlRuleEngine.java
│   │   └── resources/
│   │       ├── db/                  # 数据库初始化脚本
│   │       └── application.yml      # 应用配置
│   └── test/                        # 测试用例
└── pom.xml

技术架构

核心架构分层

┌─────────────────────────────────────────────────────────┐
│                   消息接入层 (MQTT Broker)               │
│  ┌──────────────────┐  ┌─────────────────────────────┐  │
│  │ MqttBrokerServer │  │    MqttBrokerHandler        │  │
│  └──────────────────┘  └─────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────┐
│                   规则引擎层                             │
│  ┌──────────────────┐  ┌─────────────────────────────┐  │
│  │   RuleCache      │  │      RuleExecutor           │  │
│  └──────────────────┘  └─────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────┐
│                 规则解析层 (Parser)                      │
│  ┌──────────────────┐  ┌─────────────────────────────┐  │
│  │ SqlRuleParser    │  │  TopicMatcher/ConditionEval │  │
│  └──────────────────┘  └─────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────┐
│                  动作调度层 (Action)                     │
│              ┌─────────────────────┐                     │
│              │  ActionDispatcher   │                     │
│              └─────────────────────┘                     │
└─────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────┐
│                   数据持久化层 (JPA)                     │
│  ┌──────────────────────────────────────────────────┐  │
│  │  rule_config / device_data / rule_log            │  │
│  └──────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘

数据库设计

表结构

项目使用 SQLite 数据库,定义了 3 张核心表:

1. rule_config(规则配置表)

字段类型说明
idINTEGER主键,自增
rule_nameVARCHAR(200)规则名称
topic_patternVARCHAR(500)主题匹配表达式
condition_exprTEXT条件表达式
enabledINTEGER启用状态:0-禁用,1-启用
priorityINTEGER优先级
created_atDATETIME创建时间
updated_atDATETIME更新时间

2. device_data(设备消息表)

字段类型说明
idINTEGER主键,自增
client_idVARCHAR(200)设备ID
topicVARCHAR(500)消息主题
payloadTEXT消息内容
matched_rule_idINTEGER匹配的规则ID
created_atDATETIME入库时间

初始化数据

项目启动时会初始化 3 条示例规则:

-- 规则1: 温度告警 - 温度大于30触发
INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('温度告警', 'sensor/temperature/+', 'temperature > 30', 1, 10);

-- 规则2: 设备状态监控 - 离线时触发
INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('设备状态监控', 'device/+/status', 'status = ''offline''', 1, 5);

-- 规则3: 湿度数据记录 - 无条件记录所有数据
INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('湿度数据记录', 'sensor/humidity/+', '', 1, 1);

核心模块详解

1. MQTT Broker 模块

MqttBrokerServer(Broker 服务器)

负责启动 Netty 服务,监听 1883 端口:

@Component
public class MqttBrokerServer {
    private static final int PORT = 1883;

    @PostConstruct
    public void start() throws InterruptedException {
        // 加载规则
        sqlRuleEngine.loadRules();
        
        // 启动 Netty 服务
        new ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(...)
            .bind(PORT)
            .sync();
    }
}

MqttBrokerHandler(消息处理器)

处理 MQTT 协议报文:

  • CONNECT - 处理客户端连接
  • PUBLISH - 处理消息发布(核心流程)
  • SUBSCRIBE - 处理主题订阅
  • UNSUBSCRIBE - 处理取消订阅
  • PINGREQ - 处理心跳
  • DISCONNECT - 处理断开连接

核心 PUBLISH 处理流程:

private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage msg) {
    String topic = msg.variableHeader().topicName();
    String payloadStr = new String(payload, StandardCharsets.UTF_8);
    String clientId = clientIdMap.getOrDefault(ctx.channel(), "unknown");

    // 1. 规则引擎处理
    sqlRuleEngine.process(topic, clientId, payloadStr);
    // 2. 消息转发
    subscriptionManager.publishToSubscribers(topic, payload, packetId);
}

SubscriptionManager(订阅管理器)

管理客户端订阅关系,支持主题通配符匹配的消息转发:

public void publishToSubscribers(String topic, byte[] payload, int packetId) {
    for (Map.Entry<String, Set<Channel>> entry : subscriptions.entrySet()) {
        String filter = entry.getKey();
        if (matchTopic(topic, filter)) { // 主题匹配
            // 转发消息给订阅者
            for (Channel ch : entry.getValue()) {
                ch.writeAndFlush(publishMsg.retain());
            }
        }
    }
}

2. 规则引擎模块

SqlRuleEngine(规则引擎入口)

规则引擎的统一入口,协调各模块工作:

@Service
public class SqlRuleEngine {
    // 从数据库加载规则
    public void loadRules() {
        List<RuleConfig> configs = ruleConfigRepository.findAllEnabledRules();
        List<SqlRule> rules = configs.stream().map(this::toSqlRule).collect(Collectors.toList());
        ruleCache.loadRules(rules);
    }

    // 处理消息
    public void process(String topic, String clientId, String payload) {
        // 规则匹配与执行
        List<RuleMatchResult> results = ruleExecutor.execute(topic, payload);
        // 动作调度
        actionDispatcher.dispatch(results, clientId);
    }
}

RuleCache(规则缓存)

缓存已启用的规则并预解析 SQL 条件表达式:

public class RuleCache {
    private final Map<Long, SqlRule> rules = new ConcurrentHashMap<>();
    private final Map<Long, RuleParseResult> parseResults = new ConcurrentHashMap<>();

    public void loadRules(List<SqlRule> sqlRules) {
        for (SqlRule rule : sqlRules) {
            if (rule.getEnabled() == 1) {
                rules.put(rule.getId(), rule);
                // 预解析条件表达式
                if (rule.getConditionExpr() != null && !rule.getConditionExpr().isEmpty()) {
                    Expression whereExpr = SqlRuleParser.parseCondition(rule.getConditionExpr());
                    parseResults.put(rule.getId(), RuleParseResult.builder().whereExpr(whereExpr).build());
                }
            }
        }
    }
}

RuleExecutor(规则执行器)

执行规则匹配,核心流程分 3 步:

public List<RuleMatchResult> execute(String topic, String payload) {
    List<RuleMatchResult> results = new ArrayList<>();
    for (SqlRule rule : ruleCache.getRules().values()) {
        RuleMatchResult result = executeSingleRule(rule, topic, payload);
        if (result.isMatched()) {
            results.add(result);
        }
    }
    return results;
}

private RuleMatchResult executeSingleRule(SqlRule rule, String topic, String payload) {
    // 1. 主题匹配 (FROM)
    if (!TopicMatcher.match(topic, rule.getTopicPattern())) {
        return RuleMatchResult.builder().matched(false).build();
    }
    
    // 2. 条件评估 (WHERE)
    boolean conditionMatched = ConditionEvaluator.evaluate(jsonMsg, parseResult.getWhereExpr());
    
    // 3. 字段提取 (SELECT)
    Map<String, Object> extractedFields = ConditionEvaluator.extractFields(jsonMsg, selectFields);
    
    return RuleMatchResult.builder()
        .matched(conditionMatched)
        .extractedFields(extractedFields)
        .build();
}

3. 规则解析模块

SqlRuleParser(SQL 解析器)

解析完整 SQL 语句或仅解析 WHERE 条件:

public static RuleParseResult parse(String sql) {
    Select select = (Select) CCJSqlParserUtil.parse(sql);
    PlainSelect plainSelect = (PlainSelect) select.getSelectBody();
    
    String fromTopic = plainSelect.getFromItem().toString();
    Expression whereExpr = plainSelect.getWhere();
    List<String> selectFields = extractSelectFields(plainSelect);
    
    return RuleParseResult.builder()
        .fromTopic(fromTopic)
        .whereExpr(whereExpr)
        .selectFields(selectFields)
        .build();
}

public static Expression parseCondition(String conditionExpr) {
    return CCJSqlParserUtil.parseCondExpression(conditionExpr);
}

TopicMatcher(主题匹配器)

支持 MQTT 主题通配符匹配:

  • + - 匹配当前层级任意值,如 sensor/+/temperature
  • # - 匹配剩余所有层级,如 sensor/#
public static boolean match(String msgTopic, String topicPattern) {
    String[] msgLevels = msgTopic.split("/");
    String[] filterLevels = topicPattern.split("/");
    
    for (int i = 0; i < filterLevels.length; i++) {
        if ("#".equals(filterLevels[i])) {
            return true; // # 匹配剩余所有
        } else if ("+".equals(filterLevels[i])) {
            continue; // + 匹配当前层级
        } else if (!filterLevels[i].equals(msgLevels[i])) {
            return false;
        }
    }
    return msgLevels.length == filterLevels.length;
}

ConditionEvaluator(条件评估器)

评估条件表达式,支持:

  • 比较操作:>, <, =
  • 逻辑操作:AND
public static boolean evaluate(JsonNode jsonMsg, Expression whereExpr) {
    if (whereExpr instanceof GreaterThan) {
        return evaluateGreaterThan(jsonMsg, (GreaterThan) whereExpr);
    } else if (whereExpr instanceof MinorThan) {
        return evaluateMinorThan(jsonMsg, (MinorThan) whereExpr);
    } else if (whereExpr instanceof EqualsTo) {
        return evaluateEqualsTo(jsonMsg, (EqualsTo) whereExpr);
    } else if (whereExpr instanceof AndExpression) {
        return evaluate(left) && evaluate(right);
    }
    return false;
}

4. 动作调度模块

ActionDispatcher(动作调度器)

执行规则匹配成功后的动作:

public void dispatch(List<RuleMatchResult> results, String clientId) {
    for (RuleMatchResult result : results) {
        if (result.isMatched()) {
            executeActions(result, clientId);
        }
    }
}

private void executeActions(RuleMatchResult result, String clientId) {
    // 动作1: 保存消息记录
    saveDeviceData(result, clientId);
    // 动作2: 转发消息
    publishMessage(result);
}

核心流程

消息处理完整流程

设备发布消息
    ↓
MqttBrokerHandler.handlePublish()
    ↓
SqlRuleEngine.process(topic, clientId, payload)
    ↓
RuleExecutor.execute()
    ├─→ TopicMatcher.match()          // 1. 主题匹配
    ├─→ ConditionEvaluator.evaluate() // 2. 条件评估
    └─→ ConditionEvaluator.extractFields() // 3. 字段提取
    ↓
ActionDispatcher.dispatch()
    ├─→ 保存 device_data
    ├─→ 转发消息
    └─→ 记录 rule_log
    ↓
SubscriptionManager.publishToSubscribers()
    ↓
订阅者收到消息

使用示例

示例 1: 温度告警

规则配置:

INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('温度告警', 'sensor/temperature/+', 'temperature > 30', 1, 10);

测试数据:

{
  "temperature": 35,
  "deviceId": "dev001",
  "humidity": 60
}

发布主题: sensor/temperature/dev001

执行结果:

  1. 主题匹配:sensor/temperature/dev001 匹配 sensor/temperature/+ ✓
  2. 条件评估:temperature > 30 → 35 > 30 → true ✓
  3. 规则触发,保存数据到数据库

示例 2: 设备状态监控

规则配置:

INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('设备状态监控', 'device/+/status', 'status = ''offline''', 1, 5);

测试数据:

{
  "status": "offline",
  "deviceId": "dev001",
  "timestamp": 1780295586732
}

发布主题: device/dev001/status

执行结果:

  1. 主题匹配:device/dev001/status 匹配 device/+/status ✓
  2. 条件评估:status = 'offline' → true ✓
  3. 规则触发

示例 3: 湿度数据记录(无条件)

规则配置:

INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('湿度数据记录', 'sensor/humidity/+', '', 1, 1);

测试数据:

{
  "humidity": 55,
  "deviceId": "dev001"
}

发布主题: sensor/humidity/dev001

执行结果:

  1. 主题匹配:✓
  2. 无条件,自动匹配 ✓
  3. 所有数据都保存到数据库

测试模块

项目提供了完整的单元测试,覆盖所有核心功能:


配置说明

application.yml

server:
  port: 8084

spring:
  application:
    name: mqtt-rule-sql-sample-00
  datasource:
    url: jdbc:sqlite:data/rule.db
    driver-class-name: org.sqlite.JDBC
  jpa:
    database-platform: org.hibernate.community.dialect.SQLiteDialect
    hibernate:
      ddl-auto: update
    show-sql: true

扩展建议

1. 增强条件评估

  • 支持更多操作符:<=, >=, !=, OR, NOT
  • 支持字符串操作:LIKE, IN
  • 支持函数调用:ABS(), ROUND()

2. 增强动作调度

  • 支持自定义动作类型
  • 支持动作配置化(JSON)
  • 支持 Webhook 通知

3. 规则管理

  • 提供 REST API 管理规则
  • 支持规则热更新
  • 支持规则版本管理

4. 性能优化

  • 规则分组(按主题前缀)
  • 规则优先级排序
  • 异步规则执行

总结

本项目展示了一个清晰、可扩展的 MQTT 规则引擎架构,通过分层设计实现了高内聚低耦合。核心特点:

  1. 职责清晰:Broker 只负责 MQTT 协议,规则引擎专注于规则处理
  2. 可扩展:各模块通过接口解耦,易于扩展
  3. 易测试:提供完整的单元测试和集成测试
  4. 轻量级:基于 SQLite,无需复杂的数据库配置

适合作为物联网数据处理、设备监控、告警系统的基础框架。

最近更新: 2026/6/8 09:07
Contributors: kcnf