物联网实战:MQTT平台 | MQTT规则引擎 | 基于sql规则引擎
基于sql规则引擎不是执行数据库 SQL,而是 用 SQL 语法做表达式过滤
MQTT SQL 规则引擎示例项目 (mqtt-rule-sql-sample-00)
源码(mqtt-rule-sql-sample-00)
https://gitee.com/kcnf-iot/mqtt-rule
项目概述
本项目是一个基于 MQTT 和 SQL 的物联网规则引擎示例,实现了一个轻量级的 MQTT Broker 与规则引擎的结合体。支持使用 SQL 语法定义数据处理规则,通过主题匹配和条件评估自动执行相应动作。
核心功能:
- 基于 Netty 的 MQTT Broker 实现
- SQL 语法定义的规则引擎
- 主题通配符匹配(
+和#) - 条件表达式评估(
>,<,=,AND) - 数据持久化(SQLite)
- 规则执行日志
目录结构
mqtt-rule-sql-sample-00/
├── src/
│ ├── main/
│ │ ├── java/com/jysemel/iot/
│ │ │ ├── entity/ # 数据库实体
│ │ │ ├── mqtt/ # MQTT Broker 模块
│ │ │ ├── repository/ # JPA 数据访问
│ │ │ └── rule/ # 规则引擎核心
│ │ │ ├── action/ # 动作调度
│ │ │ ├── cache/ # 规则缓存
│ │ │ ├── executor/ # 规则执行
│ │ │ ├── matcher/ # 匹配器
│ │ │ ├── model/ # 数据模型
│ │ │ ├── parser/ # SQL 解析
│ │ │ └── SqlRuleEngine.java
│ │ └── resources/
│ │ ├── db/ # 数据库初始化脚本
│ │ └── application.yml # 应用配置
│ └── test/ # 测试用例
└── pom.xml
技术架构
核心架构分层
┌─────────────────────────────────────────────────────────┐
│ 消息接入层 (MQTT Broker) │
│ ┌──────────────────┐ ┌─────────────────────────────┐ │
│ │ MqttBrokerServer │ │ MqttBrokerHandler │ │
│ └──────────────────┘ └─────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 规则引擎层 │
│ ┌──────────────────┐ ┌─────────────────────────────┐ │
│ │ RuleCache │ │ RuleExecutor │ │
│ └──────────────────┘ └─────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 规则解析层 (Parser) │
│ ┌──────────────────┐ ┌─────────────────────────────┐ │
│ │ SqlRuleParser │ │ TopicMatcher/ConditionEval │ │
│ └──────────────────┘ └─────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 动作调度层 (Action) │
│ ┌─────────────────────┐ │
│ │ ActionDispatcher │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 数据持久化层 (JPA) │
│ ┌──────────────────────────────────────────────────┐ │
│ │ rule_config / device_data / rule_log │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
数据库设计
表结构
项目使用 SQLite 数据库,定义了 3 张核心表:
1. rule_config(规则配置表)
| 字段 | 类型 | 说明 |
|---|---|---|
| id | INTEGER | 主键,自增 |
| rule_name | VARCHAR(200) | 规则名称 |
| topic_pattern | VARCHAR(500) | 主题匹配表达式 |
| condition_expr | TEXT | 条件表达式 |
| enabled | INTEGER | 启用状态:0-禁用,1-启用 |
| priority | INTEGER | 优先级 |
| created_at | DATETIME | 创建时间 |
| updated_at | DATETIME | 更新时间 |
2. device_data(设备消息表)
| 字段 | 类型 | 说明 |
|---|---|---|
| id | INTEGER | 主键,自增 |
| client_id | VARCHAR(200) | 设备ID |
| topic | VARCHAR(500) | 消息主题 |
| payload | TEXT | 消息内容 |
| matched_rule_id | INTEGER | 匹配的规则ID |
| created_at | DATETIME | 入库时间 |
初始化数据
项目启动时会初始化 3 条示例规则:
-- 规则1: 温度告警 - 温度大于30触发
INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('温度告警', 'sensor/temperature/+', 'temperature > 30', 1, 10);
-- 规则2: 设备状态监控 - 离线时触发
INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('设备状态监控', 'device/+/status', 'status = ''offline''', 1, 5);
-- 规则3: 湿度数据记录 - 无条件记录所有数据
INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('湿度数据记录', 'sensor/humidity/+', '', 1, 1);
核心模块详解
1. MQTT Broker 模块
MqttBrokerServer(Broker 服务器)
负责启动 Netty 服务,监听 1883 端口:
@Component
public class MqttBrokerServer {
private static final int PORT = 1883;
@PostConstruct
public void start() throws InterruptedException {
// 加载规则
sqlRuleEngine.loadRules();
// 启动 Netty 服务
new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(...)
.bind(PORT)
.sync();
}
}
MqttBrokerHandler(消息处理器)
处理 MQTT 协议报文:
CONNECT- 处理客户端连接PUBLISH- 处理消息发布(核心流程)SUBSCRIBE- 处理主题订阅UNSUBSCRIBE- 处理取消订阅PINGREQ- 处理心跳DISCONNECT- 处理断开连接
核心 PUBLISH 处理流程:
private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage msg) {
String topic = msg.variableHeader().topicName();
String payloadStr = new String(payload, StandardCharsets.UTF_8);
String clientId = clientIdMap.getOrDefault(ctx.channel(), "unknown");
// 1. 规则引擎处理
sqlRuleEngine.process(topic, clientId, payloadStr);
// 2. 消息转发
subscriptionManager.publishToSubscribers(topic, payload, packetId);
}
SubscriptionManager(订阅管理器)
管理客户端订阅关系,支持主题通配符匹配的消息转发:
public void publishToSubscribers(String topic, byte[] payload, int packetId) {
for (Map.Entry<String, Set<Channel>> entry : subscriptions.entrySet()) {
String filter = entry.getKey();
if (matchTopic(topic, filter)) { // 主题匹配
// 转发消息给订阅者
for (Channel ch : entry.getValue()) {
ch.writeAndFlush(publishMsg.retain());
}
}
}
}
2. 规则引擎模块
SqlRuleEngine(规则引擎入口)
规则引擎的统一入口,协调各模块工作:
@Service
public class SqlRuleEngine {
// 从数据库加载规则
public void loadRules() {
List<RuleConfig> configs = ruleConfigRepository.findAllEnabledRules();
List<SqlRule> rules = configs.stream().map(this::toSqlRule).collect(Collectors.toList());
ruleCache.loadRules(rules);
}
// 处理消息
public void process(String topic, String clientId, String payload) {
// 规则匹配与执行
List<RuleMatchResult> results = ruleExecutor.execute(topic, payload);
// 动作调度
actionDispatcher.dispatch(results, clientId);
}
}
RuleCache(规则缓存)
缓存已启用的规则并预解析 SQL 条件表达式:
public class RuleCache {
private final Map<Long, SqlRule> rules = new ConcurrentHashMap<>();
private final Map<Long, RuleParseResult> parseResults = new ConcurrentHashMap<>();
public void loadRules(List<SqlRule> sqlRules) {
for (SqlRule rule : sqlRules) {
if (rule.getEnabled() == 1) {
rules.put(rule.getId(), rule);
// 预解析条件表达式
if (rule.getConditionExpr() != null && !rule.getConditionExpr().isEmpty()) {
Expression whereExpr = SqlRuleParser.parseCondition(rule.getConditionExpr());
parseResults.put(rule.getId(), RuleParseResult.builder().whereExpr(whereExpr).build());
}
}
}
}
}
RuleExecutor(规则执行器)
执行规则匹配,核心流程分 3 步:
public List<RuleMatchResult> execute(String topic, String payload) {
List<RuleMatchResult> results = new ArrayList<>();
for (SqlRule rule : ruleCache.getRules().values()) {
RuleMatchResult result = executeSingleRule(rule, topic, payload);
if (result.isMatched()) {
results.add(result);
}
}
return results;
}
private RuleMatchResult executeSingleRule(SqlRule rule, String topic, String payload) {
// 1. 主题匹配 (FROM)
if (!TopicMatcher.match(topic, rule.getTopicPattern())) {
return RuleMatchResult.builder().matched(false).build();
}
// 2. 条件评估 (WHERE)
boolean conditionMatched = ConditionEvaluator.evaluate(jsonMsg, parseResult.getWhereExpr());
// 3. 字段提取 (SELECT)
Map<String, Object> extractedFields = ConditionEvaluator.extractFields(jsonMsg, selectFields);
return RuleMatchResult.builder()
.matched(conditionMatched)
.extractedFields(extractedFields)
.build();
}
3. 规则解析模块
SqlRuleParser(SQL 解析器)
解析完整 SQL 语句或仅解析 WHERE 条件:
public static RuleParseResult parse(String sql) {
Select select = (Select) CCJSqlParserUtil.parse(sql);
PlainSelect plainSelect = (PlainSelect) select.getSelectBody();
String fromTopic = plainSelect.getFromItem().toString();
Expression whereExpr = plainSelect.getWhere();
List<String> selectFields = extractSelectFields(plainSelect);
return RuleParseResult.builder()
.fromTopic(fromTopic)
.whereExpr(whereExpr)
.selectFields(selectFields)
.build();
}
public static Expression parseCondition(String conditionExpr) {
return CCJSqlParserUtil.parseCondExpression(conditionExpr);
}
TopicMatcher(主题匹配器)
支持 MQTT 主题通配符匹配:
+- 匹配当前层级任意值,如sensor/+/temperature#- 匹配剩余所有层级,如sensor/#
public static boolean match(String msgTopic, String topicPattern) {
String[] msgLevels = msgTopic.split("/");
String[] filterLevels = topicPattern.split("/");
for (int i = 0; i < filterLevels.length; i++) {
if ("#".equals(filterLevels[i])) {
return true; // # 匹配剩余所有
} else if ("+".equals(filterLevels[i])) {
continue; // + 匹配当前层级
} else if (!filterLevels[i].equals(msgLevels[i])) {
return false;
}
}
return msgLevels.length == filterLevels.length;
}
ConditionEvaluator(条件评估器)
评估条件表达式,支持:
- 比较操作:
>,<,= - 逻辑操作:
AND
public static boolean evaluate(JsonNode jsonMsg, Expression whereExpr) {
if (whereExpr instanceof GreaterThan) {
return evaluateGreaterThan(jsonMsg, (GreaterThan) whereExpr);
} else if (whereExpr instanceof MinorThan) {
return evaluateMinorThan(jsonMsg, (MinorThan) whereExpr);
} else if (whereExpr instanceof EqualsTo) {
return evaluateEqualsTo(jsonMsg, (EqualsTo) whereExpr);
} else if (whereExpr instanceof AndExpression) {
return evaluate(left) && evaluate(right);
}
return false;
}
4. 动作调度模块
ActionDispatcher(动作调度器)
执行规则匹配成功后的动作:
public void dispatch(List<RuleMatchResult> results, String clientId) {
for (RuleMatchResult result : results) {
if (result.isMatched()) {
executeActions(result, clientId);
}
}
}
private void executeActions(RuleMatchResult result, String clientId) {
// 动作1: 保存消息记录
saveDeviceData(result, clientId);
// 动作2: 转发消息
publishMessage(result);
}
核心流程
消息处理完整流程
设备发布消息
↓
MqttBrokerHandler.handlePublish()
↓
SqlRuleEngine.process(topic, clientId, payload)
↓
RuleExecutor.execute()
├─→ TopicMatcher.match() // 1. 主题匹配
├─→ ConditionEvaluator.evaluate() // 2. 条件评估
└─→ ConditionEvaluator.extractFields() // 3. 字段提取
↓
ActionDispatcher.dispatch()
├─→ 保存 device_data
├─→ 转发消息
└─→ 记录 rule_log
↓
SubscriptionManager.publishToSubscribers()
↓
订阅者收到消息
使用示例
示例 1: 温度告警
规则配置:
INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('温度告警', 'sensor/temperature/+', 'temperature > 30', 1, 10);
测试数据:
{
"temperature": 35,
"deviceId": "dev001",
"humidity": 60
}
发布主题: sensor/temperature/dev001
执行结果:
- 主题匹配:
sensor/temperature/dev001匹配sensor/temperature/+✓ - 条件评估:
temperature > 30→ 35 > 30 → true ✓ - 规则触发,保存数据到数据库
示例 2: 设备状态监控
规则配置:
INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('设备状态监控', 'device/+/status', 'status = ''offline''', 1, 5);
测试数据:
{
"status": "offline",
"deviceId": "dev001",
"timestamp": 1780295586732
}
发布主题: device/dev001/status
执行结果:
- 主题匹配:
device/dev001/status匹配device/+/status✓ - 条件评估:
status = 'offline'→ true ✓ - 规则触发
示例 3: 湿度数据记录(无条件)
规则配置:
INSERT INTO rule_config (rule_name, topic_pattern, condition_expr, enabled, priority)
VALUES ('湿度数据记录', 'sensor/humidity/+', '', 1, 1);
测试数据:
{
"humidity": 55,
"deviceId": "dev001"
}
发布主题: sensor/humidity/dev001
执行结果:
- 主题匹配:✓
- 无条件,自动匹配 ✓
- 所有数据都保存到数据库
测试模块
项目提供了完整的单元测试,覆盖所有核心功能:
配置说明
application.yml
server:
port: 8084
spring:
application:
name: mqtt-rule-sql-sample-00
datasource:
url: jdbc:sqlite:data/rule.db
driver-class-name: org.sqlite.JDBC
jpa:
database-platform: org.hibernate.community.dialect.SQLiteDialect
hibernate:
ddl-auto: update
show-sql: true
扩展建议
1. 增强条件评估
- 支持更多操作符:
<=,>=,!=,OR,NOT - 支持字符串操作:
LIKE,IN - 支持函数调用:
ABS(),ROUND()
2. 增强动作调度
- 支持自定义动作类型
- 支持动作配置化(JSON)
- 支持 Webhook 通知
3. 规则管理
- 提供 REST API 管理规则
- 支持规则热更新
- 支持规则版本管理
4. 性能优化
- 规则分组(按主题前缀)
- 规则优先级排序
- 异步规则执行
总结
本项目展示了一个清晰、可扩展的 MQTT 规则引擎架构,通过分层设计实现了高内聚低耦合。核心特点:
- 职责清晰:Broker 只负责 MQTT 协议,规则引擎专注于规则处理
- 可扩展:各模块通过接口解耦,易于扩展
- 易测试:提供完整的单元测试和集成测试
- 轻量级:基于 SQLite,无需复杂的数据库配置
适合作为物联网数据处理、设备监控、告警系统的基础框架。
