mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-12-02 04:08:54 +08:00
feix 告警规则界面化修复
This commit is contained in:
parent
6f7a77d072
commit
adfaffc9a2
@ -100,9 +100,9 @@
|
||||
create => on_action_create_dgiot,
|
||||
params => ?ACTION_DATA_SPEC,
|
||||
title => #{en => <<"DGIOT CHANNEL">>,
|
||||
zh => <<"数蛙物联网通道"/utf8>>},
|
||||
zh => <<"DGIOT通道"/utf8>>},
|
||||
description => #{en => <<"Republish a MQTT message to dgiot channel">>,
|
||||
zh => <<"重新发布消息到物联网通道"/utf8>>}
|
||||
zh => <<"重新发布消息到DGIOT通道"/utf8>>}
|
||||
}).
|
||||
|
||||
|
||||
@ -194,5 +194,5 @@ on_action_dgiot(Selected, #{event := Event} = Envs) ->
|
||||
%% dgiot_umeng:add_notification(Ruleid, DevAddr, NewPayload);
|
||||
|
||||
post_rule(Msg) ->
|
||||
io:format("~s ~p Msg = ~p.~n", [?FILE, ?LINE, Msg]).
|
||||
%% ?LOG(debug, "~s ~p Msg = ~p.~n", [?FILE, ?LINE, Msg]).
|
||||
%% io:format("~s ~p Msg = ~p.~n", [?FILE, ?LINE, Msg]).
|
||||
?LOG(debug, "~s ~p Msg = ~p.~n", [?FILE, ?LINE, Msg]).
|
||||
|
@ -174,10 +174,10 @@ do_request(get_actions, _Args, _Context, _Req) ->
|
||||
%%do_request(post_rulesql, #{<<"select">> := Select, <<"from">> := From, <<"where">> := Where, <<"method">> := Method}, _Context, _Req) ->
|
||||
%% device_sql(Select, From, Where, Method);
|
||||
|
||||
do_request(post_rulesql, #{<<"trigger">> := Trigger, <<"condition">> := Condition, <<"action">> := Action, <<"ruleid">> := Ruleid, <<"description">> := Description}, _Context, _Req) ->
|
||||
do_request(post_rulesql, #{<<"trigger">> := Trigger, <<"ruleid">> := Ruleid, <<"productid">> := ProductId, <<"description">> := Description}, _Context, _Req) ->
|
||||
%% device_sql(Select, From, Where, Method);
|
||||
sql_tpl(Trigger, Condition, Action, Ruleid, Description);
|
||||
|
||||
%% sql_tpl(Trigger, Condition, Action, Ruleid, Description);
|
||||
sql_tpl(Trigger, Ruleid, ProductId, Description);
|
||||
|
||||
|
||||
do_request(get_actions_id, #{<<"id">> := RuleID}, _Context, _Req) ->
|
||||
@ -219,24 +219,12 @@ do_request(_OperationId, _Args, _Context, _Req) ->
|
||||
%%desc 消息通信数据格式
|
||||
%%doc-api https://help.aliyun.com/document_detail/73736.htm?spm=a2c4g.11186623.0.0.353d7a48CvEOwF#concept-ap3-lql-b2b
|
||||
|
||||
sql_tpl(Trigger, Condition, Action, Ruleid, Description) ->
|
||||
SELECT = generateSelect(Condition, Trigger, Action),
|
||||
FROM = generateFrom(Trigger),
|
||||
WHERE = generateWhere(Condition, Trigger, FROM),
|
||||
%% {200, #{<<"code">> => 200, <<"Action">> => Action, <<"TopicTpl">> => FROM, <<"WHERE">> => WHERE}}.
|
||||
%% SELECT payload.electricity as electricity FROM "$dg/alarm/94656917ab/157d0ff60f/#" where electricity > 20
|
||||
sql_tpl(Trigger, Ruleid, ProductId, Description) ->
|
||||
SELECT = generateSelect(Trigger),
|
||||
FROM = generateFrom(ProductId, Trigger),
|
||||
WHERE = generateWhere(Trigger),
|
||||
|
||||
%% WhereSql = lists:foldl(fun(X, Acc) ->
|
||||
%% case X of
|
||||
%% #{<<"identifier">> := Id, <<"operator">> := Op, <<"value">> := Value} ->
|
||||
%% case Acc of
|
||||
%% <<"">> ->
|
||||
%% <<Id/binary, " ", Op/binary, " ", Value/binary>>;
|
||||
%% _ ->
|
||||
%% <<Acc/binary, ", \r\n ", Id/binary, " ", Op/binary, " ", Value/binary>>
|
||||
%% end;
|
||||
%% _ -> Acc
|
||||
%% end
|
||||
%% end, <<"">>, Where),
|
||||
ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BACKEND_CHL), <<"NOTIFICATION">>, <<"dgiot_notification">>),
|
||||
DefaultSql =
|
||||
<<"SELECT", "\r\n",
|
||||
@ -245,7 +233,7 @@ sql_tpl(Trigger, Condition, Action, Ruleid, Description) ->
|
||||
" \"", FROM/binary, "\"", "\r\n",
|
||||
"WHERE", "\r\n ",
|
||||
WHERE/binary>>,
|
||||
create_rules(Ruleid, ChannelId, Description, DefaultSql, <<"/${productid}/#">>),
|
||||
create_rules(Ruleid, ChannelId, Description, DefaultSql, FROM),
|
||||
{ok, #{<<"template">> => DefaultSql}}.
|
||||
|
||||
%% 根据设备条件生成sql模板
|
||||
@ -428,7 +416,6 @@ getSelect(Select, Acc1) ->
|
||||
end
|
||||
end, Acc1, lists:flatten(maps:values(Select))).
|
||||
|
||||
generateFrom(Trigger) ->
|
||||
%% Trigger: [
|
||||
%% {
|
||||
%% label: '设备属性触发',
|
||||
@ -448,76 +435,38 @@ generateFrom(Trigger) ->
|
||||
%% },
|
||||
%% ],
|
||||
%%
|
||||
Firstfrom = lists:nth(1, maps:get(<<"items">>, Trigger)),
|
||||
Uri = maps:get(<<"uri">>, Firstfrom),
|
||||
Params = maps:get(<<"params">>, Firstfrom, #{}),
|
||||
generateFrom(ProductId, Trigger) ->
|
||||
Firstitems = lists:nth(1, maps:get(<<"items">>, Trigger)),
|
||||
Uri = maps:get(<<"uri">>, Firstitems, <<>>),
|
||||
Params = maps:get(<<"params">>, Firstitems, #{}),
|
||||
case Uri of
|
||||
<<"trigger/product/property">> ->
|
||||
case Params of
|
||||
#{<<"productKey">> := ProductId, <<"deviceName">> := DeviceId} ->
|
||||
case DeviceId of
|
||||
<<"">> ->
|
||||
<<"$dg/user/alarm/", ProductId/binary, "/#">>;
|
||||
<<"#">> ->
|
||||
<<"$dg/user/alarm/", ProductId/binary, "/#">>;
|
||||
_ ->
|
||||
<<"$dg/user/alarm/", ProductId/binary, "/", DeviceId/binary, "/#">>
|
||||
end;
|
||||
_ ->
|
||||
<<"$dg/user/alarm/test/#">>
|
||||
end;
|
||||
<<"trigger/product/event">> ->
|
||||
case Params of
|
||||
#{<<"productKey">> := ProductId, <<"deviceName">> := DeviceId} ->
|
||||
case DeviceId of
|
||||
<<"">> ->
|
||||
<<"$dg/user/alarm/", ProductId/binary, "/#">>;
|
||||
<<"#">> ->
|
||||
<<"$dg/user/alarm/", ProductId/binary, "/#">>;
|
||||
_ ->
|
||||
<<"$dg/user/alarm/", ProductId/binary, "/", DeviceId/binary, "/#">>
|
||||
end;
|
||||
_ ->
|
||||
<<"$dg/user/alarm/test/#">>
|
||||
end;
|
||||
<<"trigger/mqtt/event">> ->
|
||||
case Params of
|
||||
#{<<"productKey">> := ProductId, <<"mqtt">> := Mqtt, <<"deviceName">> := DeviceId} ->
|
||||
case ProductId of
|
||||
<<"">> ->
|
||||
<<"$events/", Mqtt/binary, "/", ProductId/binary, "/", DeviceId/binary>>;
|
||||
_ ->
|
||||
<<"$events/", Mqtt/binary, "/", ProductId/binary, "/", DeviceId/binary, "/#">>
|
||||
end;
|
||||
_ ->
|
||||
<<"$dg/user/alarm/test/#">>
|
||||
end;
|
||||
<<"trigger/timer">> ->
|
||||
<<"$dg/user/alarm/test/#">>;
|
||||
{NewProductId, NewDeviceId} =
|
||||
case Params of
|
||||
#{<<"productKey">> := <<"">>, <<"deviceName">> := <<"">>} ->
|
||||
{ProductId, <<"#">>};
|
||||
#{<<"productKey">> := <<"">>, <<"deviceName">> := <<"#">>} ->
|
||||
{ProductId, <<"#">>};
|
||||
#{<<"productKey">> := <<"">>, <<"deviceName">> := DeviceId} ->
|
||||
{ProductId, <<DeviceId/binary, "/#">>};
|
||||
#{<<"productKey">> := ProductKey, <<"deviceName">> := <<"">>} ->
|
||||
{ProductKey, <<"#">>};
|
||||
#{<<"productKey">> := ProductKey, <<"deviceName">> := <<"#">>} ->
|
||||
{ProductKey, <<"#">>};
|
||||
#{<<"productKey">> := ProductKey, <<"deviceName">> := DeviceId} ->
|
||||
{ProductKey, <<DeviceId/binary, "/#">>};
|
||||
_ ->
|
||||
{ProductId, <<"#">>}
|
||||
end,
|
||||
<<"$dg/user/alarm/", NewProductId/binary, "/", NewDeviceId/binary>>;
|
||||
_ ->
|
||||
<<"$dg/user/alarm/test/#">>
|
||||
end.
|
||||
|
||||
generateSelect(Condition, _Trigger, _FROM) ->
|
||||
generateSelect(Trigger) ->
|
||||
lists:foldl(fun(Item, Acc) ->
|
||||
case Item of
|
||||
#{<<"uri">> := <<"condition/device/deviceState">>, <<"params">> := Params} ->
|
||||
PropertyName = maps:get(<<"propertyName">>, Params, <<"test">>),
|
||||
case Acc of
|
||||
<<"">> ->
|
||||
<<"payload.", PropertyName/binary, " as ", PropertyName/binary>>;
|
||||
_ ->
|
||||
<<Acc/binary, ",\r\n ", "payload.", PropertyName/binary, " as ", PropertyName/binary>>
|
||||
end;
|
||||
#{<<"uri">> := <<"condition/device/stateContinue">>, <<"params">> := Params} ->
|
||||
PropertyName = maps:get(<<"propertyName">>, Params, <<"test">>),
|
||||
case Acc of
|
||||
<<"">> ->
|
||||
<<"payload.", PropertyName/binary, " as ", PropertyName/binary>>;
|
||||
_ ->
|
||||
<<Acc/binary, ",\r\n ", "payload.", PropertyName/binary, " as ", PropertyName/binary>>
|
||||
end;
|
||||
#{<<"uri">> := <<"condition/device/time">>, <<"params">> := Params} ->
|
||||
#{<<"uri">> := <<"trigger/product/property">>, <<"params">> := Params} ->
|
||||
PropertyName = maps:get(<<"propertyName">>, Params, <<"test">>),
|
||||
case Acc of
|
||||
<<"">> ->
|
||||
@ -528,24 +477,12 @@ generateSelect(Condition, _Trigger, _FROM) ->
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, <<"">>, maps:get(<<"items">>, Condition, [])).
|
||||
end, <<"">>, maps:get(<<"items">>, Trigger, [])).
|
||||
|
||||
generateWhere(Condition, _Trigger, _FROM) ->
|
||||
%% Condition: [
|
||||
%% {
|
||||
%% label: '状态持续时长判断',
|
||||
%% value: 'condition/device/stateContinue',
|
||||
%% },
|
||||
%% {
|
||||
%% label: '设备状态',
|
||||
%% value: 'condition/device/deviceState',
|
||||
%% },
|
||||
%% { label: '时间范围', value: 'condition/device/time' },
|
||||
%% // { label: '设备属性值', value: 'condition/device/property' },
|
||||
%% ],
|
||||
L1 = lists:foldl(fun(Item, Acc) ->
|
||||
generateWhere(Trigger) ->
|
||||
lists:foldl(fun(Item, Acc) ->
|
||||
case Item of
|
||||
#{<<"uri">> := <<"condition/device/deviceState">>, <<"params">> := Params} ->
|
||||
#{<<"uri">> := <<"trigger/product/property">>, <<"params">> := Params} ->
|
||||
CompareValue = maps:get(<<"compareValue">>, Params, <<"0">>),
|
||||
CompareType = maps:get(<<"compareType">>, Params, <<"=">>),
|
||||
PropertyName = maps:get(<<"propertyName">>, Params, <<"test">>),
|
||||
@ -559,53 +496,7 @@ generateWhere(Condition, _Trigger, _FROM) ->
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, <<"">>, maps:get(<<"items">>, Condition, [])),
|
||||
L2 = lists:foldl(fun(Item, Acc) ->
|
||||
case Item of
|
||||
#{<<"uri">> := <<"condition/device/stateContinue">>, <<"params">> := #{
|
||||
<<"state">> := State, <<"times">> := Times}} ->
|
||||
Continue = <<State/binary, " = ", Times/binary>>,
|
||||
case Acc of
|
||||
<<"">> ->
|
||||
Continue;
|
||||
_ ->
|
||||
<<Acc/binary, ",\r\n ", Continue/binary>>
|
||||
end;
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, L1, maps:get(<<"items">>, Condition, [])),
|
||||
|
||||
lists:foldl(fun(Item, Acc) ->
|
||||
case Item of
|
||||
#{<<"uri">> := <<"condition/device/time">>, <<"params">> := #{<<"time">> := Time}} ->
|
||||
{TimesStart, TimesEnd} =
|
||||
lists:foldl(fun(X, Acc1) ->
|
||||
case X of
|
||||
{Start, End} ->
|
||||
IntStart = dgiot_utils:to_int(Start),
|
||||
IntEnd = dgiot_utils:to_int(End),
|
||||
case IntStart > IntEnd of
|
||||
true ->
|
||||
{dgiot_utils:to_binary(End), dgiot_utils:to_binary(Start)};
|
||||
false ->
|
||||
{dgiot_utils:to_binary(Start), dgiot_utils:to_binary(End)}
|
||||
end;
|
||||
_ ->
|
||||
Acc1
|
||||
end
|
||||
end, {<<"1646064000000">>, <<"1649088000000">>}, Time),
|
||||
TimesLg = <<"timestamp < ", TimesStart/binary, ", \r\n ", "timestamp > ", TimesEnd/binary>>,
|
||||
case Acc of
|
||||
<<"">> ->
|
||||
TimesLg;
|
||||
_ ->
|
||||
<<Acc/binary, " , ", TimesLg/binary>>
|
||||
end;
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, L2, maps:get(<<"items">>, Condition, [])).
|
||||
end, <<"">>, maps:get(<<"items">>, Trigger, [])).
|
||||
|
||||
create_rules(RuleID, ChannelId, Description, Rawsql, Target_topic) ->
|
||||
emqx_rule_engine_api:create_resource(#{},
|
||||
@ -619,7 +510,7 @@ create_rules(RuleID, ChannelId, Description, Rawsql, Target_topic) ->
|
||||
<<"actions">> => [#{<<"name">> => <<"dgiot">>, <<"fallbacks">> => [],
|
||||
<<"params">> => #{
|
||||
<<"$resource">> => <<"resource:", ChannelId/binary>>,
|
||||
<<"channel">> => <<"数蛙物联网通道"/utf8>>,
|
||||
<<"channel">> => <<"DGIOT通道"/utf8>>,
|
||||
<<"payload_tmpl">> => <<"${payload}">>,
|
||||
<<"target_qos">> => 0,
|
||||
<<"target_topic">> => Target_topic
|
||||
|
@ -274,7 +274,7 @@ stop(ChannelType, ChannelId, _State) ->
|
||||
ok.
|
||||
|
||||
get_new_location(#{<<"longitude">> := Longitude, <<"latitude">> := Latitude}, <<"baidu">>) ->
|
||||
[Mglng, Mglat] = dgiot_gps:get_baidu_gps(Longitude, Latitude),
|
||||
[Mglng, Mglat] = dgiot_gps:get_baidu_gps(dgiot_utils:to_float(Longitude), dgiot_utils:to_float(Latitude)),
|
||||
#{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => Mglng, <<"latitude">> => Mglat};
|
||||
|
||||
get_new_location(#{<<"longitude">> := Longitude, <<"latitude">> := Latitude}, <<"GCJ">>) ->
|
||||
|
@ -74,22 +74,24 @@ save_(#{<<"objectId">> := DeviceId, <<"devaddr">> := Devaddr, <<"product">> := P
|
||||
_ -> true
|
||||
end,
|
||||
IsEnable = maps:get(<<"isEnable">>, Device, false),
|
||||
#{<<"longitude">> := Longitude, <<"latitude">> := Latitude} =
|
||||
maps:get(<<"location">>, Device, #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.161324, <<"latitude">> => 30.262441}),
|
||||
{Longitude, Latitude} =
|
||||
case maps:find(<<"location">>, Device) of
|
||||
{ok, #{<<"longitude">> := Longitude1, <<"latitude">> := Latitude1}} ->
|
||||
{Longitude1, Latitude1};
|
||||
_ ->
|
||||
{120.065714, 30.369491}
|
||||
end,
|
||||
insert_mnesia(DeviceId, dgiot_role:get_acls(Device), Status, UpdatedAt, IsEnable, ProductId, Devaddr, DeviceSecret, node(), Longitude, Latitude).
|
||||
|
||||
post(Device) ->
|
||||
put_content(Device),
|
||||
put_profile(Device),
|
||||
#{<<"longitude">> := Longitude, <<"latitude">> := Latitude} = put_location(Device),
|
||||
Devaddr = maps:get(<<"devaddr">>, Device),
|
||||
Product = maps:get(<<"product">>, Device),
|
||||
ProductId = maps:get(<<"objectId">>, Product),
|
||||
DeviceSecret = maps:get(<<"deviceSecret">>, Device, <<"oioojn">>),
|
||||
DeviceId = maps:get(<<"objectId">>, Device, dgiot_parse_id:get_deviceid(ProductId, Devaddr)),
|
||||
NewData = get_newdata(Device, ProductId),
|
||||
dgiot_parse:update_object(<<"Device">>, DeviceId, NewData),
|
||||
#{<<"longitude">> := Longitude, <<"latitude">> := Latitude} =
|
||||
maps:get(<<"location">>, NewData, #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.065463, <<"latitude">> => 30.368707}),
|
||||
Status =
|
||||
case maps:get(<<"status">>, Device, <<"OFFLINE">>) of
|
||||
<<"OFFLINE">> -> false;
|
||||
@ -98,27 +100,6 @@ post(Device) ->
|
||||
IsEnable = maps:get(<<"isEnable">>, Device, false),
|
||||
insert_mnesia(DeviceId, dgiot_role:get_acls(Device), Status, dgiot_datetime:now_secs(), IsEnable, ProductId, Devaddr, DeviceSecret, node(), Longitude, Latitude).
|
||||
|
||||
%% 新建设备时,如果没有,从产品拿默认值
|
||||
get_newdata(Device, ProductId) ->
|
||||
{Profile, Content, Location, Address} =
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, ProductInfo} ->
|
||||
Profile1 = maps:get(<<"profile">>, ProductInfo, #{}),
|
||||
Content1 = maps:get(<<"content">>, ProductInfo, #{}),
|
||||
Config = maps:get(<<"config">>, ProductInfo, #{}),
|
||||
Location1 = maps:get(<<"location">>, Config, #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.065463, <<"latitude">> => 30.368707}),
|
||||
Address1 = maps:get(<<"address">>, Config, <<>>),
|
||||
{Profile1, Content1, Location1, Address1};
|
||||
_ ->
|
||||
{#{}, #{}, #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.065463, <<"latitude">> => 30.368707}, <<>>}
|
||||
end,
|
||||
#{
|
||||
<<"profile">> => maps:get(<<"profile">>, Device, Profile),
|
||||
<<"content">> => maps:get(<<"content">>, Device, Content),
|
||||
<<"location">> => maps:get(<<"location">>, Device, Location#{<<"__type">> => <<"GeoPoint">>}),
|
||||
<<"address">> => maps:get(<<"address">>, Device, Address)
|
||||
}.
|
||||
|
||||
post(#{<<"ACL">> := _Acl} = Device, _SessionToken) ->
|
||||
dgiot_device_cache:post(Device);
|
||||
|
||||
@ -230,7 +211,7 @@ put_content(#{<<"product">> := Product, <<"objectId">> := DeviceId}) ->
|
||||
put_content(_) ->
|
||||
pass.
|
||||
|
||||
%% 根据产品的content 创建设备默认的content
|
||||
%% 根据产品的profile 创建设备默认的profile
|
||||
put_profile(#{<<"profile">> := _Content}) ->
|
||||
pass;
|
||||
put_profile(#{<<"product">> := Product, <<"objectId">> := DeviceId}) ->
|
||||
@ -244,6 +225,21 @@ put_profile(#{<<"product">> := Product, <<"objectId">> := DeviceId}) ->
|
||||
put_profile(_) ->
|
||||
pass.
|
||||
|
||||
%% 根据产品的location 创建设备默认的location
|
||||
put_location(#{<<"location">> := Location}) ->
|
||||
Location;
|
||||
put_location(#{<<"product">> := Product, <<"objectId">> := DeviceId}) ->
|
||||
ProductId = maps:get(<<"objectId">>, Product),
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"config">> := #{<<"location">> := Location, <<"address">> := Address}}} ->
|
||||
dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"location">> => Location, <<"address">> => Address}),
|
||||
Location;
|
||||
_ ->
|
||||
#{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.065714, <<"latitude">> => 30.369491}
|
||||
end;
|
||||
put_location(_) ->
|
||||
#{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.065714, <<"latitude">> => 30.369491}.
|
||||
|
||||
enable(DeviceId) ->
|
||||
case lookup(DeviceId) of
|
||||
{ok, #{<<"status">> := Status, <<"acl">> := Acl, <<"devaddr">> := Devaddr, <<"productid">> := ProductId, <<"devicesecret">> := DeviceSecret,
|
||||
|
@ -220,11 +220,12 @@ handle_message(init, #state{id = ChannelId, env = Config} = State) ->
|
||||
end;
|
||||
|
||||
%% 数据与产品,设备地址分离
|
||||
handle_message({data, Product, DevAddr, Data, Context}, State) ->
|
||||
handle_message({data, Product, DevAddr, Data, Context}, #state{id = ChannelId} = State) ->
|
||||
dgiot_metrics:inc(dgiot_tdengine, <<"tdengine_recv">>, 1),
|
||||
case catch do_save([Product, DevAddr, Data, Context], State) of
|
||||
{Err, Reason} when Err == error; Err == 'EXIT' ->
|
||||
?LOG(error, "Save to Tdengine error, ~p, ~p", [Data, Reason]),
|
||||
dgiot_bridge:send_log(ChannelId, "Save to Tdengine error, ~ts~n, ~p", [unicode:characters_to_list(jsx:encode(Data)), Reason]),
|
||||
ok;
|
||||
{ok, NewState} ->
|
||||
{ok, NewState}
|
||||
|
Loading…
Reference in New Issue
Block a user