規(guī)則引擎包含場景聯(lián)動和規(guī)則腳本,由場景表/腳本表/場景腳本表和場景設(shè)備表組成。場景表存儲場景基本信息和規(guī)則鏈,腳本表存儲腳本信息和腳本數(shù)據(jù),場景腳本記錄觸發(fā)器和執(zhí)行動作和腳本表對應(yīng),場景設(shè)備表記錄場景關(guān)聯(lián)的所有設(shè)備和產(chǎn)品。
場景聯(lián)動
一、場景規(guī)則鏈:
規(guī)則數(shù)據(jù)示例:
IF(AND(T1,T2,T3),THEN(A1,A2,A3))
IF(OR(T1,T2,T3),WHEN(A1,A2,A3))
說明:
AND OR 中必須包含兩個以上組件
NOT 只能包含一個觸發(fā)器
THEN 串行,順序執(zhí)行
WHEN 并行,同時執(zhí)行
二、場景腳本格式:
String json ={
"cond": 3,
"delay": 3,
"deviceNums": "D1ELV3A5TOJS",
"id": "temperature",
"operator": "between",
"productId": 41,
"purpose": 2,
"sceneId": 63,
"scriptId": "T1754931118680969216",
"silent": 1,
"source": 1,
"type": 1,
"value": "10-50"
};
sceneContext.process(json);
關(guān)鍵字段說明:
- source: 1=設(shè)備觸發(fā) 3=產(chǎn)品觸發(fā) 4=告警
- type: 1=屬性, 2=功能,3=事件,4=設(shè)備升級,5=設(shè)備上線,6=設(shè)備下線
- delay: 延時,例如等于1表示延時1秒執(zhí)行,不存儲的定時任務(wù)
- silent: 靜默時間,例如等于1表示靜默一分鐘,設(shè)備一分鐘內(nèi)只會上報第一次的執(zhí)行動作(包含告警),使用redis存儲。
- purpose:腳本用途,1=數(shù)據(jù)流(上報/下發(fā)),2=觸發(fā)器,3=執(zhí)行動作
- cond:觸發(fā)條件,1=任意條件,2=所有條件,3=不滿足
- operator:操作符,大于/等于/小于/包含/不包含/在...之間等
- scriptId:腳本ID,系統(tǒng)生成D開頭為數(shù)據(jù)流,T開頭為觸發(fā)器,A開頭為執(zhí)行動作
三、場景腳本對象
分為觸發(fā)器對象和執(zhí)行動作對象,執(zhí)行動作必填字段少于觸發(fā)器。執(zhí)行動作中的物模型不包含只讀和監(jiān)測數(shù)據(jù)。
觸發(fā)器所需字段:
productId: 0,
productName: '',
deviceCount: 0,
deviceNums: [],
source: 1, // 觸發(fā)源,1=設(shè)備,2=定時,3=產(chǎn)品
type: 1, // 類型,1=屬性, 2=功能,3=事件,4=設(shè)備升級,5=設(shè)備上線,6=設(shè)備下線
parentId: '', // 物模父級id
parentName: '',
parentModel: null, // 父級物模型,除對象類型和對象數(shù)組類型外的普通類型,父級物模型與物模型相同
model: null, // 物模型
operator: '=',
id: '',
name: '',
value: '', // between操作符時,值=值A(chǔ)-值B
valueA: '',
valueB: '',
arrayIndex: '', // 索引,數(shù)組才有
arrayIndexName: '',
isAdvance: 0, // 自定義CRON
cronExpression: '', // cron表達(dá)式
timerTimeValue: '', // 時間
timerWeekValue: [1, 2, 3, 4, 5, 6, 7], // 星期
執(zhí)行動作所需字段:
productId: 0,
productName: '',
deviceCount: 0,
deviceNums: [],
source: 4, // 1=設(shè)備,3=產(chǎn)品,4=告警
type: 2, // 類型
parentId: '', // 物模父級id
parentName: '',
parentModel: null, // 父級物模型,除對象類型和對象數(shù)組類型外的普通類型,父級物模型與物模型相同
model: null, // 物模型
id: '',
name: '',
value: '',
arrayIndex: '', // 索引,數(shù)組才有
arrayIndexName: '',
規(guī)則腳本
一、腳本語言
平臺腳本語言目前使用Groovy,語法跟Java類似,差異如下:
Groovy | Java |
---|---|
編譯為JVM字節(jié)碼,兼容Java平臺 | 在JDK上開發(fā)并在JVM上運(yùn)行 |
用作編程和腳本語言 | 用作編程和面向?qū)ο蟮恼Z言 |
默認(rèn)訪問修飾符 public | 默認(rèn)訪問修飾符包 |
為類成員自動生成 getter 和 setter | 需要為字段提供 getter 和 setter 方法,特別是如果遵循 Java Beans命名約定 |
分號是可選的 | 分號是強(qiáng)制性的 |
默認(rèn)導(dǎo)入常用包 | 默認(rèn)只導(dǎo)入Java.lang.* 包 |
一切都是對象并且僅使用對象,因此沒有自動裝箱或拆箱的概念 | 具有原始數(shù)據(jù)類型和包裝類來隱式或顯式執(zhí)行裝箱和拆箱 |
不需要任何 main 方法或方法的入口點(diǎn)來運(yùn)行類或任何程序 | 需要類內(nèi)部的 main 方法來運(yùn)行程序 |
二、消息轉(zhuǎn)發(fā)
文檔將從下述步驟簡單介紹消息轉(zhuǎn)發(fā)過程:
- 消息轉(zhuǎn)發(fā)上下文執(zhí)行方法:腳本執(zhí)行后端代碼定義的方法類
- 執(zhí)行規(guī)則引擎方法類: 調(diào)用腳本執(zhí)行類
- 前端展示配置,腳本定義
- 平臺使用腳本的示例,用法
1.消息轉(zhuǎn)發(fā)上下文執(zhí)行方法(可選)
@ScriptBean("msgContextService") 定義該類可以在上下文使用,使用方法是 msgContextService.porcess()
/*
* 規(guī)則引擎上下文執(zhí)行方法
* @author gsb
*/
@Component
@Slf4j
@ScriptBean("msgContextService")
public class MsgContextService {
private final RedisCache redisCache;
public MsgContextService(RedisCache redisCache){
this.redisCache = redisCache;
}
private void process(String serialNumber){
//執(zhí)行的業(yè)務(wù)邏輯
}
}
2.執(zhí)行規(guī)則引擎方法類
先根據(jù)設(shè)備編號獲取到 產(chǎn)品ID、協(xié)議編
號等信息,并緩存到redis,然后根據(jù)產(chǎn)品ID,等信息查詢 iot_scripe表中是否存在要執(zhí)行的腳本,如果有,則執(zhí)行腳本。類定義如下:
/**
* 執(zhí)行規(guī)則引擎
* @author gsb
*/
@Component
@Slf4j
public class RuleProcess {
@Resource
private FlowExecutor flowExecutor;
@Resource
private IScriptService scriptService;
@Resource
private RedisCache redisCache;
@Resource
private IProductService productService;
}
執(zhí)行引擎腳本方法:
/**
* 規(guī)則引擎腳本處理
* @param topic
* @param payload
* @param event 1=設(shè)備上報 2=平臺下發(fā) 3=設(shè)備上線 4=設(shè)備下線 (其他可以增加設(shè)備完成主題訂閱之類)
* @return
*/
public MsgContext processRuleScript(String serialNumber, int event, String topic, String payload) {
ProductCode productCode = getDeviceDetail(serialNumber);
if (Objects.isNull(productCode)){
return null;
}
// 查詢數(shù)據(jù)流腳本組件
ScriptCondition scriptCondition = new ScriptCondition();
scriptCondition.setProductId(productCode.getProductId());
scriptCondition.setScriptEvent(event); // 事件 1=設(shè)備上報 2=平臺下發(fā) 3=設(shè)備上線 4=設(shè)備下線
scriptCondition.setScriptPurpose(1); // 腳本用途:數(shù)據(jù)流=1
String[] scriptIds = scriptService.selectRuleScriptIdArray(scriptCondition);
MsgContext context = new MsgContext(topic, payload, serialNumber, productCode.getProductId(), productCode.getProtocolCode());
//如果查詢不到腳本,則認(rèn)為是不用處理
if (Objects.isNull(scriptIds) || scriptIds.length == 0) {
return null;
}
// 動態(tài)構(gòu)造Chain和EL表達(dá)式
String el = String.join(",", scriptIds); // THEN(a,b,c,d)
LiteFlowChainELBuilder.createChain().setChainName("dataChain").setEL("THEN(" + el + ")").build();
// 執(zhí)行規(guī)則腳本
LiteflowResponse response = flowExecutor.execute2Resp("dataChain", null, context);
if (!response.isSuccess()) {
log.error("規(guī)則腳本執(zhí)行發(fā)生錯誤:" + response.getMessage());
}
return context;
}
獲取產(chǎn)品ID,協(xié)議編號等信息方法
/**
* 查詢產(chǎn)品id,協(xié)議編號,緩存到redis,后續(xù)查詢協(xié)議的地方替換數(shù)據(jù)庫查詢
*
* @param serialNumber
*/
public ProductCode getDeviceDetail(String serialNumber) {
ProductCode productCode;
String cacheKey = RedisKeyBuilder.buildDeviceMsgCacheKey(serialNumber);
if (redisCache.containsKey(cacheKey)) {
Object cacheObject = redisCache.getCacheObject(cacheKey);
return JSON.parseObject(cacheObject.toString(), ProductCode.class);
}
productCode = productService.getProtocolBySerialNumber(serialNumber);
String jsonString = JSON.toJSONString(productCode);
redisCache.setCacheObject(cacheKey, jsonString);
return productCode;
}
3. 前端配置頁面如下:

5.平臺使用腳本的示例,用法
設(shè)備上報:設(shè)備上報是指設(shè)備端上報數(shù)據(jù)到云端,設(shè)備端定義的數(shù)據(jù)或者topic都可能跟系統(tǒng)有差別,通過消息轉(zhuǎn)發(fā)可以調(diào)整主題名稱和消息內(nèi)容,適配到設(shè)備的主題和內(nèi)容。我們看看當(dāng)前平臺調(diào)用設(shè)備上報執(zhí)行腳本的地方:
- 使用emqx作為MQTT服務(wù)器時, 在MQTT客戶端橋接消息的回調(diào)方法類(MqttService)中調(diào)用
/**
* 消息回調(diào)方法
* @param topic 主題
* @param mqttMessage 消息體
*/
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
String message = new String(mqttMessage.getPayload());
//..此處省略代碼../
//這里默認(rèn)設(shè)備編號長度超過9位
String[] split = topic.split("/");
String clientId = Arrays.stream(split).filter(imei -> imei.length() > 9).findFirst().get();
// 規(guī)則引擎腳本處理,完成后返回結(jié)果
MsgContext context = ruleProcess.processRuleScript(clientId,1, topic, message);
if (!Objects.isNull(context)){
topic = context.getTopic();
message = context.getPayload();
}
- 使用netty-MQTT作為MQTT服務(wù)器時,在類MqttPublish中sendToMQ調(diào)用
/**
* 消息推送
* @param message 推送消息
*/
@SneakyThrows
public void sendToMQ(MqttPublishMessage message,String clientId) {
/*獲取topic*/
String topicName = message.variableHeader().topicName();
byte[] source = ByteBufUtil.getBytes(message.content());
//..此處省略代碼../
// 規(guī)則引擎腳本處理,完成后返回結(jié)果
MsgContext context = ruleProcess.processRuleScript(clientId,1, topicName, new String(source));
if (!Objects.isNull(context)){
reportBo.setTopicName(context.getTopic());
reportBo.setData(context.getPayload().getBytes(StandardCharsets.UTF_8));
}
//..此處省略代碼../
}