JiangShan-web/MQTT接口.md
2025-05-22 16:20:13 +08:00

576 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 在线离线接口
# 主题:
/productid/deviceid/status/post
# 原始http推送接口
http://121.41.45.13:8025/receive
abcdefghijkmlnopqrstuvwxyz
# 新的http推送接口
http://121.36.111.27:8080/bridge/get
# 消息格式json格式具体字段如下其中status取值4离线3在线
{"status":3,"isShadow":1,"rssi":-51}
# onenet平台上下线信息
{msg={"dev_name":"1","at":1742873510747,"pid":"JYr2f72uSJ","type":2,"status":1}, signature=25BcZvEQ+kk10rfB9gLnnQ==, time=1742873510772, id=5614fc489df3433f9891d6d4c7c3e5e7, nonce=b1R7hjgp}
"status":1上线 0下线
# 物模型推送接口: 将数据推送至FASTBEE端
/productid/deviceid/property/post
# 消息格式json格式具体字段如下
[{
"id": "co2",
"value": "1",
"remark": ""
}]
# 规则引擎仅作于设备状态更新
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.util.NumberUtil;
Long productId = 136 // 固定 productId
String sysTopic = "" // 系统主题
String sysPayload = "" // 系统数据格式
// 1. 获取原始内容
String payload = msgContext.getPayload()
msgContext.logger.info("原始数据:" + payload)
// 2. ✅ 只提取 msg 内容
try {
// ✅ 使用 Groovy 兼容的正则写法,提取 msg 内的数据
String msgData = (payload =~ /msg=\{(.+?)\}/)[0][1]
msgContext.logger.info("提取后的 msg 数据:" + msgData)
// ✅ 解析提取后的数据
JSONObject msgObj = JSONUtil.parseObj("{" + msgData + "}")
// ✅ 提取 dev_name 作为 serialNumber
String serialNumber = msgObj.getStr("dev_name", "unknown")
// ✅ 解析 status 并进行转换
Integer status = msgObj.getInt("status", -1)
Integer convertedStatus = (status == 0) ? 4 : (status == 1) ? 3 : status
// ✅ 构造转换后的 sysPayload
JSONObject newObj = new JSONObject()
newObj.put("status", convertedStatus)
newObj.put("isShadow", 1)
newObj.put("rssi", msgObj.getInt("rssi", -51)) // 默认 rssi = -51
sysPayload = newObj.toString()
sysTopic = "/" + productId + "/" + serialNumber + "/status/post"
} catch (Exception e) {
msgContext.logger.error("数据解析失败:", e)
sysPayload = "{}"
}
// 3. 打印调试信息
msgContext.logger.info("新主题:" + sysTopic)
msgContext.logger.info("新内容:" + sysPayload)
msgContext.logger.info("NewTopic 长度: " + sysTopic.length());
// 4. 设置新的数据
msgContext.setTopic(sysTopic)
msgContext.setPayload(sysPayload)
msgContext.logger.info("输出:" + msgContext.getTopic());
// 执行Action动作参数(脚本由系统自动生成)
msgContext.setData("mqttBridgeID", 8);
# 定制设备状态
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.util.NumberUtil;
String sysTopic = "/special/device" // 固定主题
String sysPayload = "" // 系统数据格式
// 1. 获取原始内容
String payload = msgContext.getPayload()
msgContext.logger.info(" 原始数据:" + payload)
// 2. ✅ 只提取 msg 内容
try {
// ✅ 使用 Groovy 兼容的正则写法,提取 msg 内的数据
String msgData = (payload =~ /msg=\{(.+?)\}/)[0][1]
msgContext.logger.info(" 提取后的 msg 数据:" + msgData)
// ✅ 解析提取后的数据
JSONObject msgObj = JSONUtil.parseObj("{" + msgData + "}")
// ✅ 提取 dev_name 作为 serialNumber
String serialNumber = msgObj.getStr("dev_name", "unknown")
// ✅ 解析 status 并进行转换 (0=上线,1=下线)
Integer status = msgObj.getInt("status", -1)
Integer convertedStatus = (status == 0) ? 0 : (status == 1) ? 1 : status
// ✅ 构造转换后的 JSON payload
JSONObject newObj = new JSONObject()
newObj.put("serialNumber", serialNumber)
newObj.put("status", convertedStatus)
sysPayload = newObj.toString()
} catch (Exception e) {
msgContext.logger.error(" 数据解析失败:", e)
// 错误时返回默认 JSON
JSONObject errorObj = new JSONObject()
errorObj.put("serialNumber", "error")
errorObj.put("status", -1)
sysPayload = errorObj.toString()
}
// 3. 打印调试信息
msgContext.logger.info(" 新主题:" + sysTopic)
msgContext.logger.info(" 新内容:" + sysPayload)
// 4. 设置新的数据
msgContext.setTopic(sysTopic)
msgContext.setPayload(sysPayload)
msgContext.logger.info(" 输出:" + msgContext.getTopic());
// 执行Action动作参数(脚本由系统自动生成)
msgContext.setData("mqttBridgeID", 8);
# ONENET 物模型
# 物模型
{
msg = {
"notifyType": "property",
"productId": "ebm1xlK9QJ",
"messageType": "notify",
"data": {
"id": "1",
"params": {
"time": {
"time": 1747031766274,
"value": 3
},
"press": {
"time": 1747031766273,
"value": 4
},
"remove": {
"time": 1747031766273,
"value": 2
}
}
},
"deviceName": "D1231XS9KI9X"
},
signature = YoYfkfdyNiYfPmJPLzz5wg == ,
time = 1747031766259,
id = e20abecf848c408fb381bc03a23ebfd7,
nonce = xQLnbAzQ
}
[
{
"id": "params",
"remark": "",
"value": "3"
},
{
"id": "press",
"remark": "",
"value": "3"
},
{
"id": "remove",
"remark": "",
"value": "3"
}
]
[
{
"id": "press",
"remark": "",
"value": "3"
}
]
{
"id": "123",
"version": "1.0",
"params": {
"time": {
"value": 2,
"time": 1706673129818
},
"press": {
"value": 3,
"time": 1706673129818
},
"remove": {
"value": 2
"time": 1747031766273
}
}
}
{
"id": "1234567890123",
"version": "1.0",
"params": {
"press": { "value": 4 },
"remove": { "value": 2 },
"time": { "value": 3 }
}
}
# mqttfx
# 物模型规则脚本
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.util.NumberUtil;
Long productId = 136
String sysTopic = ""
String sysPayload = "[]"
try {
// 1. 获取原始payload
String payload = msgContext.getPayload().trim()
// 2. 安全提取msg JSON兼容各种格式
int msgStart = payload.indexOf('"msg":') + 6
int jsonStart = payload.indexOf('{', msgStart)
int jsonEnd = payload.lastIndexOf('}')
String msgJsonStr = payload.substring(jsonStart, jsonEnd + 1)
msgContext.logger.info(" 提取的msg JSON: " + msgJsonStr)
// 3. 解析JSON
JSONObject msgObj = JSONUtil.parseObj(msgJsonStr)
// 4. 获取设备名(确保字段名正确)
String serialNumber = msgObj.getStr("deviceName", "unknown")
// 5. 处理物模型数据(完全兼容的遍历方式)
JSONObject data = msgObj.getJSONObject("data")
JSONObject params = data.getJSONObject("params")
JSONArray propertyArray = new JSONArray()
// 使用兼容的遍历方式替代each闭包
Set<String> keys = params.keySet()
keys.each { key ->
JSONObject valueObj = params.getJSONObject(key)
if (valueObj != null) {
JSONObject prop = new JSONObject()
prop.set("id", key)
prop.set("remark", "")
prop.set("value", valueObj.getStr("value"))
propertyArray.add(prop)
}
}
// 6. 设置输出
sysPayload = propertyArray.toString()
sysTopic = "/${productId}/${serialNumber}/property/post"
} catch (Exception e) {
msgContext.logger.error(" 处理异常: " + e.getMessage())
} finally {
// 确保有效输出
msgContext.setTopic(sysTopic ?: "/${productId}/unknown/property/post")
msgContext.setPayload(sysPayload ?: "[]")
msgContext.setData("mqttBridgeID", 8)
// 验证日志
msgContext.logger.info("=== 最终结果验证 ===")
msgContext.logger.info(" 设备名: " + (sysTopic.split("/")[2] ?: "null"))
msgContext.logger.info("Payload 内容: " + sysPayload)
}
# 2合1脚本
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.util.NumberUtil;
// 公共配置
Long productId = 136
String sysTopic = ""
String sysPayload = ""
try {
// 1. 获取原始payload
String payload = msgContext.getPayload().trim()
msgContext.logger.info(" 原始数据:" + payload)
// 2. 提取msg内容
String msgContent = payload
if (payload.startsWith("{")) {
// 尝试直接解析为JSON
try {
JSONObject payloadObj = JSONUtil.parseObj(payload)
if (payloadObj.containsKey("msg")) {
msgContent = payloadObj.getStr("msg")
}
} catch (Exception e) {
// 可能是msg={...}格式
if (payload.contains("msg={")) {
int start = payload.indexOf("msg={") + 4
int end = payload.lastIndexOf("}") + 1
msgContent = payload.substring(start, end)
}
}
} else if (payload.contains("msg={")) {
// 处理msg={"key":"value"}格式
int start = payload.indexOf("msg={") + 4
int end = payload.lastIndexOf("}") + 1
msgContent = payload.substring(start, end)
} else if (payload.contains("msg=")) {
// 处理msg=JSON格式不带大括号
int start = payload.indexOf("msg=") + 4
msgContent = payload.substring(start)
}
// 3. 解析msg内容
JSONObject parsedObj = JSONUtil.parseObj(msgContent)
msgContext.logger.info(" 解析后的JSON对象" + parsedObj)
// 4. 判断数据类型并处理
if (parsedObj.containsKey("notifyType") && "property".equals(parsedObj.getStr("notifyType"))) {
// 物模型数据处理 =========================================
msgContext.logger.info(" 检测到物模型数据格式")
// 获取设备名
String serialNumber = parsedObj.getStr("deviceName", "unknown")
// 处理物模型数据
JSONObject data = parsedObj.getJSONObject("data")
JSONObject params = data.getJSONObject("params")
JSONArray propertyArray = new JSONArray()
params.keySet().each { key ->
JSONObject valueObj = params.getJSONObject(key)
if (valueObj != null) {
JSONObject prop = new JSONObject()
prop.set("id", key)
prop.set("remark", "")
prop.set("value", valueObj.getStr("value"))
propertyArray.add(prop)
}
}
// 设置物模型输出
sysPayload = propertyArray.toString()
sysTopic = "/${productId}/${serialNumber}/property/post"
} else if (parsedObj.containsKey("dev_name") || parsedObj.containsKey("status")) {
// 设备状态处理 ===========================================
msgContext.logger.info(" 检测到设备状态数据格式")
// 提取设备信息
String serialNumber = parsedObj.getStr("dev_name", "unknown")
Integer status = parsedObj.getInt("status", -1)
Integer convertedStatus = (status == 0) ? 0 : (status == 1) ? 1 : status
// 构造状态JSON
JSONObject statusObj = new JSONObject()
statusObj.put("serialNumber", serialNumber)
statusObj.put("status", convertedStatus)
// 设置状态输出
sysPayload = statusObj.toString()
sysTopic = "/special/device"
} else {
throw new Exception("无法识别的数据格式")
}
} catch (Exception e) {
msgContext.logger.error(" 处理异常: " + e.getMessage())
// 创建错误响应
JSONObject errorObj = new JSONObject()
errorObj.put("error", e.getMessage())
errorObj.put("originalData", msgContext.getPayload())
sysPayload = errorObj.toString()
sysTopic = "/error/unknown"
} finally {
// 确保有效输出
msgContext.setTopic(sysTopic)
msgContext.setPayload(sysPayload)
msgContext.setData("mqttBridgeID", 8)
// 记录最终结果
msgContext.logger.info("=== 处理结果 ===")
msgContext.logger.info(" 主题: " + sysTopic)
msgContext.logger.info("Payload: " + sysPayload)
}
# 压力传感器的id
{msg={at=1747123666940, imei=868256050123875, type=1, ds_id=3323_0_5700, value=1538.0, dev_id=2415928615}, msg_signature=mpYC8EvR7lB14AzqteyjhA==, nonce=nPyzGpMV}
{"msg":{"at":1747142877978,"imei":"868256050017168","type":1,"ds_id":"3323_0_5700","value":-1.0,"dev_id":2442383559},"msg_signature":"MHjUOUPj0GGfpKpyBZ6BzQ==","nonce":"TdTKIcVq"}
# 上下线消息
{msg={at=1747126859912, login_type=10, imei=868256050123875, type=2, dev_id=2442383559, status=1}, msg_signature=Z/cRScM2TLOUeaptE1hllg==, nonce=BSy0fFIY}
{"msg":{"at":1747126859912,"login_type":10,"imei":"868256050123875","type":2,"dev_id":2442383559,"status":1},"msg_signature":"Z/cRScM2TLOUeaptE1hllg==","nonce":"BSy0fFIY"}
# 移动消息
{"msg":{"at":1747275785411,"imei":"868256050099802","type":1,"ds_id":"30100_0_6500","value":"01","dev_id":2442561775},"msg_signature":"2dRxxL4t9HDW1EmRwIGhfQ==","nonce":"hM31WR88"}
脚本
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.hutool.core.util.NumberUtil;
// 默认 topic
String sysTopic = "/special/device";
String sysPayload = "";
// 获取原始 payload
String payload = msgContext.getPayload();
msgContext.logger.info("原始 payload" + payload);
try {
// 提取 msg 内容
def matcher = (payload =~ /"msg":\{(.+?)\}/);
if (!matcher.find()) {
throw new Exception("未找到 msg 字段");
}
String msgData = matcher[0][1];
msgContext.logger.info("提取出的 msg 内容:" + msgData);
// 将 msgData 转为 JSON 字符串key=value → "key":"value"
String jsonLike = msgData.replaceAll(/(\w+)=([^,}]+)/) { all, k, v ->
"\"${k}\":\"${v}\""
};
jsonLike = "{${jsonLike}}";
msgContext.logger.info("转换为 JSON 格式字符串:" + jsonLike);
JSONObject msgObj = JSONUtil.parseObj(jsonLike);
// 判断是否是物模型数据(包含 ds_id 字段)
if (msgObj.containsKey("ds_id")) {
String dsid = msgObj.getStr("ds_id");
if ("3323_0_5700" != dsid) {
msgContext.logger.info("不处理的 ds_id" + dsid);
return; // 直接跳过不处理
}
// ✅ 物模型处理
String imei = msgObj.getStr("imei", "unknown");
String valueStr = msgObj.getStr("value", "0");
Double valueNum = NumberUtil.parseNumber(msgObj.getStr("value", "0")).doubleValue();
JSONArray array = new JSONArray();
JSONObject data = new JSONObject();
data.put("id", "press");
data.put("remark", "");
data.put("value", valueNum);
array.add(data);
sysPayload = array.toString();
sysTopic = "/138/${imei}/property/post";
} else {
// ✅ 在线离线数据处理
String imei = msgObj.getStr("imei", "unknown");
Integer status = NumberUtil.parseInt(msgObj.getStr("status", "-1"));
Integer loginType = NumberUtil.parseInt(msgObj.getStr("login_type", "-1"));
Long timestamp = msgObj.getLong("at", 0);
JSONObject newPayload = new JSONObject();
newPayload.put("serialNumber", imei);
newPayload.put("status", status);
newPayload.put("loginType", loginType);
newPayload.put("timestamp", timestamp);
sysPayload = newPayload.toString();
sysTopic = "/special/device";
}
} catch (Exception e) {
msgContext.logger.error("解析出错:", e);
JSONObject errorPayload = new JSONObject();
errorPayload.put("serialNumber", "error");
errorPayload.put("status", -1);
sysPayload = errorPayload.toString();
sysTopic = "/special/device";
}
// 设置 topic 和 payload
msgContext.setTopic(sysTopic);
msgContext.setPayload(sysPayload);
// 打印调试信息
msgContext.logger.info("新主题:" + sysTopic);
msgContext.logger.info("新内容:" + sysPayload);
msgContext.logger.info("输出:" + msgContext.getTopic());
// 可选设置
// 执行Action动作参数(脚本由系统自动生成)
msgContext.setData("mqttBridgeID", 7);
// 执行Action动作参数(脚本由系统自动生成)