规则引擎 - MrHKing/mmqtt GitHub Wiki
通过SQL进行规则转发
- 选择发布到topic/#的消息,然后选择所有字段:
SELECT * FROM "topic/#"
- 查询专门字段SQL
SELECT this.payload.value, this.payload.deviceName FROM "topic/#"
- 条件查询SQL
SELECT this.payload.value, this.payload.deviceName FROM "topic/#" WHERE this.payload.deviceName like 'abc%'
- 滚动窗口
- 滑动窗口
- MySql\Sqlserver\Postgresql\Tdengine 插入数据库模板通用字段
属性 | 说明 |
---|---|
uuid | uuid字符串 |
date | yyyy-MM-dd格式日期 |
datetime | yyyy-MM-dd HH:mm:ss格式日期 |
utc | yyyy-MM-dd'T'HH:mm:ssZ格式日期 |
timestamp | long 格式日期 |
username | 发送消息的账户 |
topic | 发送topic |
topic[no] | 例:/topic1/topic2/topic3/... |
INSERT INTO test (uuid, date, datetime, utc, timestamp, topic, name) VALUES ('<json.uuid>',<json.date>,<json.datetime>,<json.utc>,<json.timestamp>,<json.topic>,<json.topic1>)
- 添加kafka资源
- 添加规则引擎
SELECT * FROM "topic/#"
- 测试
- Topic: topic/test
- MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
- 插入结果
{"address":"","qos":0,"payload":{"msg":{"name":"test","value":"12321"}},"topic":"topic/test"}
MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
规则SQL Demo:
SELECT * FROM "topic/#"
Mysql SQL Demo如下:
INSERT INTO history (name, value) VALUES ('<json.msg.name>','<json.msg.value>')
MQTT 客户端上传数组Json Demo 如下:
{
"device": "sss-sss",
"list": [{
"name": "test",
"value": "12321"
}, {
"name": "test1",
"value": "222"
}]
}
规则SQL Demo:
SELECT * FROM "topic/#"
Mysql SQL Demo如下:
<json.list:{x | INSERT INTO my_test (name,value,device) VALUES ('<x.name>','<x.value>','<json.device>');}>
MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
规则SQL Demo:
SELECT * FROM "topic/#"
Mysql SQL Demo如下:
INSERT INTO history (name, value) VALUES ('<json.msg.name>','<json.msg.value>')
MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
规则SQL Demo:
SELECT * FROM "topic/#"
Mysql SQL Demo如下:
INSERT INTO history (name, value) VALUES ('<json.msg.name>','<json.msg.value>'})
MQTT 客户端上传 Json Demo 如下:
{
"device": "sss-sss",
"list": [{
"name": "test",
"value": "12321"
}, {
"name": "test1",
"value": "222"
}]
}
规则SQL Demo:
SELECT * FROM "topic/#"
influx Demo如下:
[<json.list:{x | {"measurement": h2o_feet, "tags": {"device": "<json.deivce>", "name": "<x.name>"},"fields": { "value": "<x.value>"}}, }>]