mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-12-02 04:08:54 +08:00
feat: add dgiot_mqttc
This commit is contained in:
parent
28379d8688
commit
410bd10fd0
@ -24,7 +24,9 @@
|
||||
-export([register/3, unregister/1, start_link/2, add_clock/3, notify/3, add/2, set_consumer/2, get_consumer/1]).
|
||||
-export([start/2, start/3, stop/1, stop/2, stop/3, restart/2, get/2, send/4, count/1]).
|
||||
-export([get_time/1, get_nexttime/2, get_count/3, get_rand/1]).
|
||||
-export([get_que/2, save_que/3, start_que/1, get_pnque_len/2, save_pnque/5, get_pnque/2, del_pnque/2, start_pnque/2]).
|
||||
-export([get_que/2, save_que/3, start_que/1, stop_que/1]).
|
||||
-export([get_pnque_len/2, save_pnque/5, get_pnque/2, del_pnque/2, start_pnque/2, stop_pnque/2]).
|
||||
|
||||
-type(result() :: any()). %% todo 目前只做参数检查,不做结果检查
|
||||
|
||||
%% @doc 注册client的通道管理池子
|
||||
@ -115,6 +117,26 @@ start_pnque(ChannelId, ClinetId) ->
|
||||
end, PnQue)
|
||||
end.
|
||||
|
||||
stop_pnque(ChannelId, ClinetId) when is_atom(ChannelId) ->
|
||||
stop_pnque(dgiot_utils:to_binary(ChannelId), ClinetId);
|
||||
stop_pnque(ChannelId, ClinetId) ->
|
||||
case dgiot_data:get(?DCLINET_PNQUE(ChannelId), ClinetId) of
|
||||
not_find ->
|
||||
not_find;
|
||||
PnQue ->
|
||||
lists:map(
|
||||
fun
|
||||
({ProductId, DevAddr}) ->
|
||||
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
|
||||
%% io:format("~s ~p ChannelId ~p, Type ~p , ClinetId ~p ~n",[?FILE, ?LINE, ChannelId, Type, ClinetId]),
|
||||
dgiot_client:stop(<<ProductId/binary, "_", ChannelId/binary>>, DeviceId,
|
||||
#{<<"child">> => #{<<"productid">> => ProductId, <<"devaddr">> => DevAddr, <<"dtuid">> => ClinetId}});
|
||||
(_) ->
|
||||
pass
|
||||
end, PnQue),
|
||||
del_pnque(ChannelId, ClinetId)
|
||||
end.
|
||||
|
||||
save_que(ChannelId, ProductId, DevAddr) ->
|
||||
DtuId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_data:insert(?DCLINET_QUE(ChannelId), DtuId, {ProductId, DevAddr}).
|
||||
@ -133,6 +155,18 @@ start_que(ChannelId) ->
|
||||
end,
|
||||
dgiot_data:loop(?DCLINET_QUE(ChannelId), Fun).
|
||||
|
||||
stop_que(ChannelId) ->
|
||||
Fun = fun
|
||||
({ClientId, _Value}) ->
|
||||
dgiot_client:stop_pnque(ChannelId, ClientId),
|
||||
dgiot_client:stop(ChannelId, ClientId);
|
||||
(_) ->
|
||||
pass
|
||||
end,
|
||||
dgiot_data:loop(?DCLINET_QUE(ChannelId), Fun),
|
||||
timer:sleep(3000),
|
||||
dgiot_data:destroy(?DCLINET_QUE(ChannelId)).
|
||||
|
||||
%% @doc 在通道管理池子中增加client的Pid号
|
||||
-spec add(atom() | binary(), binary()) -> result().
|
||||
add(ChannelId, ClientId) when is_binary(ChannelId) ->
|
||||
@ -287,7 +321,7 @@ count(ChannelId) ->
|
||||
undefined ->
|
||||
0;
|
||||
Info ->
|
||||
proplists:get_value(size,Info)
|
||||
proplists:get_value(size, Info)
|
||||
end.
|
||||
|
||||
%% @doc 做一下全局的错峰处理
|
||||
@ -319,7 +353,7 @@ get_nexttime(NowTime, Freq, NextTime) when (NextTime =< NowTime) ->
|
||||
erlang:send_after(RetryTime * 1000, self(), next_time),
|
||||
NowTime + RetryTime + Freq;
|
||||
|
||||
get_nexttime(NowTime, Freq, NextTime) ->
|
||||
get_nexttime(NowTime, Freq, NextTime) ->
|
||||
RetryTime = NextTime - NowTime,
|
||||
erlang:send_after(RetryTime * 1000, self(), next_time),
|
||||
NextTime + Freq.
|
||||
|
@ -17,6 +17,10 @@
|
||||
-module(dgiot_metrics).
|
||||
-include_lib("dgiot/include/dgiot.hrl").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
-dgiot_data("ets").
|
||||
-export([init_ets/0]).
|
||||
-define(DGIOT_METRICS_ETS, dgiot_metrics_ets).
|
||||
|
||||
-export([counter/2, counter/3, counter/4, gauge/2, gauge/3, gauge/4, summary/2, summary/3, summary/4, histogram/3, histogram/2, histogram/4]).
|
||||
-export([counter_reset/1, counter_reset/2, counter_reset/3]).
|
||||
-export([gauge_reset/1, gauge_reset/2, gauge_reset/3]).
|
||||
@ -25,7 +29,7 @@
|
||||
|
||||
-export([init_metrics/1, collect_metrics/4]).
|
||||
-export([start_metrics/1, inc/3, inc/4, inc/5, dec/3, dec/4, dec/5]).
|
||||
-export([start/1, check_metrics/0]).
|
||||
-export([start/1, check_metrics/0, reset_metrics/1]).
|
||||
|
||||
-route_path("/metrics/:Registry").
|
||||
-export([init/2]).
|
||||
@ -37,6 +41,9 @@ init(Req0, ?MODULE) ->
|
||||
}, dgiot_utils:to_binary(Data), Req0),
|
||||
{ok, Req, ?MODULE}.
|
||||
|
||||
init_ets() ->
|
||||
dgiot_data:init(?DGIOT_METRICS_ETS).
|
||||
|
||||
counter(Name, Value) when Value > 0 ->
|
||||
counter(Name, [], Value);
|
||||
counter(Name, Value) when Value < 0 ->
|
||||
@ -105,63 +112,71 @@ histogram_reset(Registry, Name, LabelValues) ->
|
||||
%%新增统计函数
|
||||
inc(Registry, Name, Value) ->
|
||||
{ok, Count} =
|
||||
case dgiot_data:lookup({Name, Registry}) of
|
||||
case dgiot_data:lookup(?DGIOT_METRICS_ETS, {Name, Registry}) of
|
||||
{error, not_find} ->
|
||||
{ok, 0};
|
||||
{ok, Count1} ->
|
||||
{ok, Count1}
|
||||
end,
|
||||
dgiot_data:insert({Name, Registry}, Count + Value).
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, Count + Value).
|
||||
|
||||
inc(Registry, Name, Value, Total, rate) ->
|
||||
{ok, Count} =
|
||||
case dgiot_data:lookup({Name, Registry}) of
|
||||
{error, not_find} -> {ok, 0};
|
||||
{ok, Count1} -> {ok, Count1}
|
||||
case dgiot_data:lookup(?DGIOT_METRICS_ETS, {Name, Registry}) of
|
||||
{error, not_find} ->
|
||||
{ok, 0};
|
||||
{ok, Count1} ->
|
||||
{ok, Count1}
|
||||
end,
|
||||
case (Count + Value) >= Total of
|
||||
OldValue = round(Count * Total / 100),
|
||||
NewValue = OldValue + Value,
|
||||
case NewValue >= Total of
|
||||
true ->
|
||||
dgiot_data:insert({Name, Registry}, 100);
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, 100);
|
||||
_ ->
|
||||
case Total > 0 of
|
||||
true -> dgiot_data:insert({Name, Registry}, round(100 * (Count + Value) / Total));
|
||||
_ -> dgiot_data:insert({Name, Registry}, 0)
|
||||
true ->
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, round(100 * NewValue) / Total);
|
||||
_ ->
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, 0)
|
||||
end
|
||||
end;
|
||||
|
||||
inc(Registry, Name, Value, Total, max) ->
|
||||
{ok, Count} =
|
||||
case dgiot_data:lookup({Name, Registry}) of
|
||||
{error, not_find} -> {ok, 0};
|
||||
{ok, Count1} -> {ok, Count1}
|
||||
case dgiot_data:lookup(?DGIOT_METRICS_ETS, {Name, Registry}) of
|
||||
{error, not_find} ->
|
||||
{ok, 0};
|
||||
{ok, Count1} ->
|
||||
{ok, Count1}
|
||||
end,
|
||||
case (Count + Value) >= Total of
|
||||
true ->
|
||||
dgiot_data:insert({Name, Registry}, Total);
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, Total);
|
||||
_ ->
|
||||
dgiot_data:insert({Name, Registry}, (Count + Value))
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, (Count + Value))
|
||||
end.
|
||||
|
||||
inc(Registry, Name, Value, average) ->
|
||||
{ok, Count} =
|
||||
case dgiot_data:lookup({Name, Registry}) of
|
||||
case dgiot_data:lookup(?DGIOT_METRICS_ETS, {Name, Registry}) of
|
||||
{error, not_find} -> {ok, 0};
|
||||
{ok, Count1} -> {ok, Count1}
|
||||
end,
|
||||
dgiot_data:insert({Name, Registry}, (Count + Value) / 2);
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, (Count + Value) / 2);
|
||||
|
||||
inc(Registry, Name, Label, Value) ->
|
||||
{ok, Map} =
|
||||
case dgiot_data:lookup({Name, Registry}) of
|
||||
case dgiot_data:lookup(?DGIOT_METRICS_ETS, {Name, Registry}) of
|
||||
{error, not_find} -> {ok, #{}};
|
||||
{ok, Map1} -> {ok, Map1}
|
||||
end,
|
||||
Count = maps:get(Label, Map, 0),
|
||||
dgiot_data:insert({Name, Registry}, Map#{Label => Count + Value}).
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, Map#{Label => Count + Value}).
|
||||
|
||||
dec(Registry, Name, Value) ->
|
||||
{ok, Count} =
|
||||
case dgiot_data:lookup({Name, Registry}) of
|
||||
case dgiot_data:lookup(?DGIOT_METRICS_ETS, {Name, Registry}) of
|
||||
{error, not_find} -> {ok, 0};
|
||||
{ok, Count1} -> {ok, Count1}
|
||||
end,
|
||||
@ -172,33 +187,32 @@ dec(Registry, Name, Value) ->
|
||||
false ->
|
||||
Count - Value
|
||||
end,
|
||||
dgiot_data:insert({Name, Registry}, NewCount).
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, NewCount).
|
||||
|
||||
dec(Registry, Name, Value, Total, rate) ->
|
||||
{ok, Count} =
|
||||
case dgiot_data:lookup({Name, Registry}) of
|
||||
{error, not_find} -> {ok, 0};
|
||||
{ok, Count1} -> {ok, Count1}
|
||||
case dgiot_data:lookup(?DGIOT_METRICS_ETS, {Name, Registry}) of
|
||||
{error, not_find} ->
|
||||
{ok, 0};
|
||||
{ok, Count1} ->
|
||||
{ok, Count1}
|
||||
end,
|
||||
NewCount =
|
||||
case Count > Value of
|
||||
true ->
|
||||
Count - Value;
|
||||
false ->
|
||||
0
|
||||
end,
|
||||
case Total > 0 of
|
||||
true ->
|
||||
dgiot_data:insert({Name, Registry}, round(100 * NewCount / Total));
|
||||
OldValue = round(Count * Total / 100),
|
||||
NewValue = OldValue - Value,
|
||||
case NewValue =< 0 of
|
||||
false when Total > 0 ->
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, round(100 * NewValue / Total));
|
||||
_ ->
|
||||
dgiot_data:insert({Name, Registry}, 0)
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, 0)
|
||||
end.
|
||||
|
||||
dec(Registry, Name, Label, Value) ->
|
||||
{ok, Map} =
|
||||
case dgiot_data:lookup({Name, Registry}) of
|
||||
{error, not_find} -> {ok, #{}};
|
||||
{ok, Map1} -> {ok, Map1}
|
||||
case dgiot_data:lookup(?DGIOT_METRICS_ETS, {Name, Registry}) of
|
||||
{error, not_find} ->
|
||||
{ok, #{}};
|
||||
{ok, Map1} ->
|
||||
{ok, Map1}
|
||||
end,
|
||||
Count = maps:get(Label, Map, 0),
|
||||
NewCount =
|
||||
@ -208,7 +222,7 @@ dec(Registry, Name, Label, Value) ->
|
||||
false ->
|
||||
0
|
||||
end,
|
||||
dgiot_data:insert({Name, Registry}, Map#{Label => NewCount}).
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, Map#{Label => NewCount}).
|
||||
|
||||
start(Registry) ->
|
||||
start_metrics(Registry).
|
||||
@ -216,18 +230,40 @@ start(Registry) ->
|
||||
start_metrics(Registry) ->
|
||||
dgiot_stats:new(Registry).
|
||||
|
||||
reset_metrics(Registry) ->
|
||||
Fun =
|
||||
fun({{Name, Registry1}, _V1}) when Registry1 == Registry ->
|
||||
case dgiot_data:get(?DGIOT_METRICS_ETS, {Name, Registry}) of
|
||||
Values when is_map(Values) ->
|
||||
LabelValues =
|
||||
maps:fold(fun(K, _V, Acc) ->
|
||||
Acc#{K => 0}
|
||||
end, #{}, Values),
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, LabelValues);
|
||||
_ ->
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, 0)
|
||||
end;
|
||||
(_) ->
|
||||
pass
|
||||
end,
|
||||
dgiot_data:loop(?DGIOT_METRICS_ETS, Fun).
|
||||
|
||||
init_metrics(#{name := Name, registry := Registry, labels := Labels}) ->
|
||||
case Labels of
|
||||
[] ->
|
||||
dgiot_data:insert({Name, Registry}, 0);
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, 0);
|
||||
[#{<<"values">> := Values}] ->
|
||||
dgiot_data:insert({Name, Registry}, lists:foldl(fun(Label, Acc) -> Acc#{Label => 0} end, #{}, Values))
|
||||
LabelsValue =
|
||||
lists:foldl(
|
||||
fun(Label, Acc) ->
|
||||
Acc#{Label => 0}
|
||||
end, #{}, Values),
|
||||
dgiot_data:insert(?DGIOT_METRICS_ETS, {Name, Registry}, LabelsValue)
|
||||
end.
|
||||
|
||||
|
||||
% Module = ?MODULE,
|
||||
collect_metrics(_Instance, Registry, Name, _Labels) ->
|
||||
{ok, Map} = dgiot_data:lookup({Name, Registry}),
|
||||
{ok, Map} = dgiot_data:lookup(?DGIOT_METRICS_ETS, {Name, Registry}),
|
||||
case is_map(Map) of
|
||||
true ->
|
||||
maps:fold(
|
||||
|
@ -360,17 +360,21 @@ new_registry(Registry, #{<<"type">> := Type, <<"name">> := Name, <<"help">> := H
|
||||
collect_metrics(Type, Labels, Spec)
|
||||
end.
|
||||
|
||||
% 只增不减 计数器
|
||||
collect_metrics(<<"counter">>, Labels, Spec) ->
|
||||
prometheus_counter:new(maps:to_list(Spec)),
|
||||
do_callback(Spec#{labels => Labels});
|
||||
% 可增可减 仪表盘
|
||||
collect_metrics(<<"gauge">>, Labels, Spec) ->
|
||||
prometheus_gauge:new(maps:to_list(Spec)),
|
||||
do_callback(Spec#{labels => Labels});
|
||||
collect_metrics(<<"summary">>, Labels, Spec) ->
|
||||
prometheus_summary:new(maps:to_list(Spec)),
|
||||
do_callback(Spec#{labels => Labels});
|
||||
% 直方图
|
||||
collect_metrics(<<"histogram">>, Labels, Spec) ->
|
||||
prometheus_histogram:new(maps:to_list(Spec)),
|
||||
do_callback(Spec#{labels => Labels});
|
||||
% 摘要型
|
||||
collect_metrics(<<"summary">>, Labels, Spec) ->
|
||||
prometheus_summary:new(maps:to_list(Spec)),
|
||||
do_callback(Spec#{labels => Labels}).
|
||||
|
||||
do_callback(Spec) ->
|
||||
@ -380,7 +384,7 @@ do_callback(Spec) ->
|
||||
load_config(File) ->
|
||||
case file:read_file(File) of
|
||||
{ok, Bin} ->
|
||||
{ok, jsx:decode(Bin, [{labels, binary}, return_maps])};
|
||||
{ok, jiffy:decode(Bin, [return_maps])};
|
||||
_ ->
|
||||
{error, <<"readfile failed">>}
|
||||
end.
|
||||
|
209
apps/dgiot/src/rules/dgiot_rule_maps.erl
Normal file
209
apps/dgiot/src/rules/dgiot_rule_maps.erl
Normal file
@ -0,0 +1,209 @@
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(dgiot_rule_maps).
|
||||
|
||||
-export([ nested_get/2
|
||||
, nested_get/3
|
||||
, nested_put/3
|
||||
, range_gen/2
|
||||
, range_get/3
|
||||
, atom_key_map/1
|
||||
, unsafe_atom_key_map/1
|
||||
]).
|
||||
|
||||
nested_get(Key, Data) ->
|
||||
nested_get(Key, Data, undefined).
|
||||
|
||||
nested_get({var, Key}, Data, Default) ->
|
||||
general_map_get({key, Key}, Data, Data, Default);
|
||||
nested_get({path, Path}, Data, Default) when is_list(Path) ->
|
||||
do_nested_get(Path, Data, Data, Default).
|
||||
|
||||
do_nested_get([Key|More], Data, OrgData, Default) ->
|
||||
case general_map_get(Key, Data, OrgData, undefined) of
|
||||
undefined -> Default;
|
||||
Val -> do_nested_get(More, Val, OrgData, Default)
|
||||
end;
|
||||
do_nested_get([], Val, _OrgData, _Default) ->
|
||||
Val.
|
||||
|
||||
nested_put(Key, Val, Data) when not is_map(Data),
|
||||
not is_list(Data) ->
|
||||
nested_put(Key, Val, #{});
|
||||
nested_put(_, undefined, Map) ->
|
||||
Map;
|
||||
nested_put({var, Key}, Val, Map) ->
|
||||
general_map_put({key, Key}, Val, Map, Map);
|
||||
nested_put({path, Path}, Val, Map) when is_list(Path) ->
|
||||
do_nested_put(Path, Val, Map, Map).
|
||||
|
||||
do_nested_put([Key|More], Val, Map, OrgData) ->
|
||||
SubMap = general_map_get(Key, Map, OrgData, undefined),
|
||||
general_map_put(Key, do_nested_put(More, Val, SubMap, OrgData), Map, OrgData);
|
||||
do_nested_put([], Val, _Map, _OrgData) ->
|
||||
Val.
|
||||
|
||||
general_map_get(Key, Map, OrgData, Default) ->
|
||||
general_find(Key, Map, OrgData,
|
||||
fun
|
||||
({equivalent, {_EquiKey, Val}}) -> Val;
|
||||
({found, {_Key, Val}}) -> Val;
|
||||
(not_found) -> Default
|
||||
end).
|
||||
|
||||
general_map_put(_Key, undefined, Map, _OrgData) ->
|
||||
Map;
|
||||
general_map_put(Key, Val, Map, OrgData) ->
|
||||
general_find(Key, Map, OrgData,
|
||||
fun
|
||||
({equivalent, {EquiKey, _Val}}) -> do_put(EquiKey, Val, Map, OrgData);
|
||||
(_) -> do_put(Key, Val, Map, OrgData)
|
||||
end).
|
||||
|
||||
general_find(KeyOrIndex, Data, OrgData, Handler) when is_binary(Data) ->
|
||||
try dgiot_json:decode(Data, [return_maps]) of
|
||||
Json -> general_find(KeyOrIndex, Json, OrgData, Handler)
|
||||
catch
|
||||
_:_ -> Handler(not_found)
|
||||
end;
|
||||
general_find({key, Key}, Map, _OrgData, Handler) when is_map(Map) ->
|
||||
case maps:find(Key, Map) of
|
||||
{ok, Val} -> Handler({found, {{key, Key}, Val}});
|
||||
error when is_atom(Key) ->
|
||||
%% the map may have an equivalent binary-form key
|
||||
BinKey = emqx_rule_utils:bin(Key),
|
||||
case maps:find(BinKey, Map) of
|
||||
{ok, Val} -> Handler({equivalent, {{key, BinKey}, Val}});
|
||||
error -> Handler(not_found)
|
||||
end;
|
||||
error when is_binary(Key) ->
|
||||
try %% the map may have an equivalent atom-form key
|
||||
AtomKey = list_to_existing_atom(binary_to_list(Key)),
|
||||
case maps:find(AtomKey, Map) of
|
||||
{ok, Val} -> Handler({equivalent, {{key, AtomKey}, Val}});
|
||||
error -> Handler(not_found)
|
||||
end
|
||||
catch error:badarg ->
|
||||
Handler(not_found)
|
||||
end;
|
||||
error ->
|
||||
Handler(not_found)
|
||||
end;
|
||||
general_find({key, _Key}, _Map, _OrgData, Handler) ->
|
||||
Handler(not_found);
|
||||
general_find({index, {const, Index0}} = IndexP, List, _OrgData, Handler) when is_list(List) ->
|
||||
handle_getnth(Index0, List, IndexP, Handler);
|
||||
general_find({index, Index0} = IndexP, List, OrgData, Handler) when is_list(List) ->
|
||||
Index1 = nested_get(Index0, OrgData),
|
||||
handle_getnth(Index1, List, IndexP, Handler);
|
||||
general_find({index, _}, List, _OrgData, Handler) when not is_list(List) ->
|
||||
Handler(not_found).
|
||||
|
||||
do_put({key, Key}, Val, Map, _OrgData) when is_map(Map) ->
|
||||
maps:put(Key, Val, Map);
|
||||
do_put({key, Key}, Val, Data, _OrgData) when not is_map(Data) ->
|
||||
#{Key => Val};
|
||||
do_put({index, {const, Index}}, Val, List, _OrgData) ->
|
||||
setnth(Index, List, Val);
|
||||
do_put({index, Index0}, Val, List, OrgData) ->
|
||||
Index1 = nested_get(Index0, OrgData),
|
||||
setnth(Index1, List, Val).
|
||||
|
||||
setnth(_, Data, Val) when not is_list(Data) ->
|
||||
setnth(head, [], Val);
|
||||
setnth(head, List, Val) when is_list(List) -> [Val | List];
|
||||
setnth(head, _List, Val) -> [Val];
|
||||
setnth(tail, List, Val) when is_list(List) -> List ++ [Val];
|
||||
setnth(tail, _List, Val) -> [Val];
|
||||
setnth(I, List, _Val) when not is_integer(I) -> List;
|
||||
setnth(0, List, _Val) -> List;
|
||||
setnth(I, List, _Val) when is_integer(I), I > 0 ->
|
||||
do_setnth(I, List, _Val);
|
||||
setnth(I, List, _Val) when is_integer(I), I < 0 ->
|
||||
lists:reverse(do_setnth(-I, lists:reverse(List), _Val)).
|
||||
|
||||
do_setnth(1, [_|Rest], Val) -> [Val|Rest];
|
||||
do_setnth(I, [E|Rest], Val) -> [E|setnth(I-1, Rest, Val)];
|
||||
do_setnth(_, [], _Val) -> [].
|
||||
|
||||
getnth(0, _) ->
|
||||
{error, not_found};
|
||||
getnth(I, L) when I > 0 ->
|
||||
do_getnth(I, L);
|
||||
getnth(I, L) when I < 0 ->
|
||||
do_getnth(-I, lists:reverse(L)).
|
||||
|
||||
do_getnth(I, L) ->
|
||||
try {ok, lists:nth(I, L)}
|
||||
catch error:_ -> {error, not_found}
|
||||
end.
|
||||
|
||||
handle_getnth(Index, List, IndexPattern, Handler) ->
|
||||
case getnth(Index, List) of
|
||||
{ok, Val} ->
|
||||
Handler({found, {IndexPattern, Val}});
|
||||
{error, _} ->
|
||||
Handler(not_found)
|
||||
end.
|
||||
|
||||
range_gen(Begin, End) ->
|
||||
lists:seq(Begin, End).
|
||||
|
||||
range_get(Begin, End, List) when is_list(List) ->
|
||||
do_range_get(Begin, End, List);
|
||||
range_get(_, _, _NotList) ->
|
||||
error({range_get, non_list_data}).
|
||||
|
||||
do_range_get(Begin, End, List) ->
|
||||
TotalLen = length(List),
|
||||
BeginIndex = index(Begin, TotalLen),
|
||||
EndIndex = index(End, TotalLen),
|
||||
lists:sublist(List, BeginIndex, (EndIndex - BeginIndex + 1)).
|
||||
|
||||
index(0, _) -> error({invalid_index, 0});
|
||||
index(Index, _) when Index > 0 -> Index;
|
||||
index(Index, Len) when Index < 0 ->
|
||||
Len + Index + 1.
|
||||
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% atom key map
|
||||
%%%-------------------------------------------------------------------
|
||||
atom_key_map(BinKeyMap) when is_map(BinKeyMap) ->
|
||||
maps:fold(
|
||||
fun(K, V, Acc) when is_binary(K) ->
|
||||
Acc#{binary_to_existing_atom(K, utf8) => atom_key_map(V)};
|
||||
(K, V, Acc) when is_list(K) ->
|
||||
Acc#{list_to_existing_atom(K) => atom_key_map(V)};
|
||||
(K, V, Acc) when is_atom(K) ->
|
||||
Acc#{K => atom_key_map(V)}
|
||||
end, #{}, BinKeyMap);
|
||||
atom_key_map(ListV) when is_list(ListV) ->
|
||||
[atom_key_map(V) || V <- ListV];
|
||||
atom_key_map(Val) -> Val.
|
||||
|
||||
unsafe_atom_key_map(BinKeyMap) when is_map(BinKeyMap) ->
|
||||
maps:fold(
|
||||
fun(K, V, Acc) when is_binary(K) ->
|
||||
Acc#{binary_to_atom(K, utf8) => unsafe_atom_key_map(V)};
|
||||
(K, V, Acc) when is_list(K) ->
|
||||
Acc#{list_to_atom(K) => unsafe_atom_key_map(V)};
|
||||
(K, V, Acc) when is_atom(K) ->
|
||||
Acc#{K => unsafe_atom_key_map(V)}
|
||||
end, #{}, BinKeyMap);
|
||||
unsafe_atom_key_map(ListV) when is_list(ListV) ->
|
||||
[unsafe_atom_key_map(V) || V <- ListV];
|
||||
unsafe_atom_key_map(Val) -> Val.
|
@ -25,7 +25,7 @@
|
||||
, clear_rule_payload/0
|
||||
]).
|
||||
|
||||
-import(emqx_rule_maps,
|
||||
-import(dgiot_rule_maps,
|
||||
[ nested_get/2
|
||||
, range_gen/2
|
||||
, range_get/3
|
||||
|
@ -18,7 +18,7 @@
|
||||
-behavior(dgiot_channelx).
|
||||
-define(TYPE, <<"MQTT">>).
|
||||
-author("johnliu").
|
||||
-record(state, {id, auth = <<"ProductSecret"/utf8>>, devaddr, deviceId}).
|
||||
-record(state, {id, auth = <<"ProductSecret"/utf8>>, count, devaddr, deviceId}).
|
||||
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
|
||||
@ -44,12 +44,31 @@
|
||||
type => string,
|
||||
required => true,
|
||||
default => <<"ProductSecret"/utf8>>,
|
||||
enum => [<<"ProductSecret"/utf8>>, <<"DeviceSecret"/utf8>>, <<"DeviceCert"/utf8>>],
|
||||
type => string,
|
||||
required => false,
|
||||
default => #{<<"value">> => <<"ProductSecret">>, <<"label">> => <<"一型一密"/utf8>>},
|
||||
enum => [
|
||||
#{<<"value">> => <<"ProductSecret">>, <<"label">> => <<"一型一密"/utf8>>},
|
||||
#{<<"value">> => <<"DeviceSecret">>, <<"label">> => <<"一机一密"/utf8>>},
|
||||
#{<<"value">> => <<"DeviceCert">>, <<"label">> => <<"设备证书"/utf8>>}
|
||||
],
|
||||
title => #{
|
||||
zh => <<"设备授权"/utf8>>
|
||||
},
|
||||
description => #{
|
||||
zh => <<"设备授权:一型一密:ProductSecret 一机一密: DeviceSecret 设备证书:DeviceCert "/utf8>>
|
||||
zh => <<"设备授权"/utf8>>
|
||||
}
|
||||
},
|
||||
<<"count">> => #{
|
||||
order => 2,
|
||||
type => integer,
|
||||
required => true,
|
||||
default => 10,
|
||||
title => #{
|
||||
zh => <<"压测数量"/utf8>>
|
||||
},
|
||||
description => #{
|
||||
zh => <<"压测数量:数量为-1时, 从设备档案数据库获取Device, 否则自动生成device"/utf8>>
|
||||
}
|
||||
},
|
||||
<<"ico">> => #{
|
||||
@ -68,71 +87,33 @@
|
||||
}
|
||||
}).
|
||||
|
||||
|
||||
start(ChannelId, ChannelArgs) ->
|
||||
dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs).
|
||||
|
||||
%% 通道初始化
|
||||
init(?TYPE, ChannelId, #{
|
||||
<<"product">> := Products,
|
||||
<<"auth">> := Auth}) ->
|
||||
%% io:format("Products = ~p.~n", [Products]),
|
||||
lists:map(fun(X) ->
|
||||
case X of
|
||||
{ProductId, #{<<"ACL">> := Acl, <<"thing">> := Thing}} ->
|
||||
dgiot_data:insert({mqttd, ProductId}, {Acl, maps:get(<<"properties">>, Thing, [])}),
|
||||
%% 创建连接规则
|
||||
ConRawsql = <<"SELECT clientid, connected_at FROM \"$events/client_connected\" WHERE username = '", ProductId/binary, "'">>,
|
||||
dgiot_rule_handler:create_rules(<<"rule:connected_", ProductId/binary>>, ChannelId, <<"创建连接规则"/utf8>>, ConRawsql, <<"/${productid}/#">>),
|
||||
%% 创建断开连接规则
|
||||
DisRawsql = <<"SELECT clientid, disconnected_at FROM \"$events/client_disconnected\" WHERE username = '", ProductId/binary, "'">>,
|
||||
dgiot_rule_handler:create_rules(<<"rule:disconnected_", ProductId/binary>>, ChannelId, <<"断开连接规则"/utf8>>, DisRawsql, <<"/${productid}/#">>),
|
||||
%% 创建上传数据规则
|
||||
MetaRawsql = <<"SELECT payload.msg as msg,clientid,'", ProductId/binary, "' as productid FROM \"/", ProductId/binary, "/#\" WHERE username = '", ProductId/binary, "'">>,
|
||||
dgiot_rule_handler:create_rules(<<"rule:metadata_", ProductId/binary>>, ChannelId, <<"派生物模型上报规则"/utf8>>, MetaRawsql, <<"/${productid}/#">>);
|
||||
_ ->
|
||||
io:format("~s ~p X = ~p.~n", [?FILE, ?LINE, X]),
|
||||
pass
|
||||
end
|
||||
end, Products),
|
||||
dgiot_data:set_consumer(ChannelId, 20),
|
||||
<<"product">> := [{ProductId, _Product} |_],
|
||||
<<"auth">> := Auth,
|
||||
<<"count">> := Count}) ->
|
||||
State = #state{
|
||||
id = ChannelId,
|
||||
auth = Auth
|
||||
auth = Auth,
|
||||
count = Count
|
||||
},
|
||||
dgiot_rule_handler:sysc_rules(),
|
||||
emqx_rule_engine_api:list_rules(#{}, []),
|
||||
io:format("~s ~p ProductId ~p ~n",[?FILE, ?LINE, ProductId]),
|
||||
{ok, State};
|
||||
|
||||
init(?TYPE, _ChannelId, _Args) ->
|
||||
io:format("~s ~p _ChannelId ~p ~n",[?FILE, ?LINE, _ChannelId]),
|
||||
{ok, #{}}.
|
||||
|
||||
handle_init(State) ->
|
||||
{ok, State}.
|
||||
|
||||
% SELECT clientid, payload, topic FROM "meter"
|
||||
% SELECT clientid, disconnected_at FROM "$events/client_disconnected" WHERE username = 'dgiot'
|
||||
% SELECT clientid, connected_at FROM "$events/client_connected" WHERE username = 'dgiot'
|
||||
handle_event('client.connected', {rule, #{clientid := DeviceId, connected_at := _ConnectedAt, peername := PeerName}, _Context}, #state{id = _ChannelId} = State) ->
|
||||
[DTUIP, _] = binary:split(PeerName, <<$:>>, [global, trim]),
|
||||
updat_device(DeviceId, DTUIP, <<"ONLINE">>),
|
||||
dgiot_device:online(DeviceId),
|
||||
{ok, State};
|
||||
|
||||
|
||||
handle_event('client.disconnected', {rule, #{clientid := DeviceId, disconnected_at := _DisconnectedAt, peername := PeerName}, _Context}, State) ->
|
||||
[DTUIP, _] = binary:split(PeerName, <<$:>>, [global, trim]),
|
||||
updat_device(DeviceId, DTUIP, <<"OFFLINE">>),
|
||||
dgiot_device:offline(DeviceId),
|
||||
{ok, State};
|
||||
|
||||
%% 通道消息处理,注意:进程池调用
|
||||
handle_event(_EventId, _Event, State) ->
|
||||
{ok, State}.
|
||||
|
||||
handle_message({rule, _Msg, _Context}, State) ->
|
||||
{ok, State};
|
||||
|
||||
handle_message(_Message, State) ->
|
||||
io:format("~s ~p _Message = ~p.~n", [?FILE, ?LINE, _Message]),
|
||||
{ok, State}.
|
||||
@ -140,15 +121,3 @@ handle_message(_Message, State) ->
|
||||
stop(_ChannelType, _ChannelId, _State) ->
|
||||
ok.
|
||||
|
||||
%% 更新设备
|
||||
updat_device(DeviceId, DTUIP, Status) ->
|
||||
case dgiot_parse:get_object(<<"Device">>, DeviceId) of
|
||||
{ok, _Result} ->
|
||||
Body = #{
|
||||
<<"ip">> => DTUIP,
|
||||
<<"status">> => Status},
|
||||
dgiot_parse:update_object(<<"Device">>, DeviceId, Body);
|
||||
_R ->
|
||||
pass
|
||||
end.
|
||||
|
||||
|
@ -13,61 +13,43 @@
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(zeta_mqttc).
|
||||
-module(dlink_mqttc).
|
||||
-author("johnliu").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
-include_lib("dgiot/include/dgiot_client.hrl").
|
||||
|
||||
-record(zeta, {tid}).
|
||||
|
||||
-export([childspec/2, star_client/2]).
|
||||
-export([childspec/2, start_client/2]).
|
||||
|
||||
%% API
|
||||
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
childspec(ChannelId, #{<<"host">> := Host, <<"port">> := Port}) ->
|
||||
#{<<"apiKey">> := Key, <<"apiSecret">> := Secret} = zeta_config:get_config(ChannelId, <<"zeta_auth">>),
|
||||
childspec(ChannelId, #{<<"host">> := Host, <<"port">> := Port,
|
||||
<<"username">> := UserName, <<"password">> := Password}) ->
|
||||
Options = #{
|
||||
host => dgiot_utils:to_list(Host),
|
||||
port => Port,
|
||||
username => dgiot_utils:to_list(Key),
|
||||
password => dgiot_utils:to_list(Secret),
|
||||
username => dgiot_utils:to_list(UserName),
|
||||
password => dgiot_utils:to_list(Password),
|
||||
proto_ver => v3,
|
||||
keepalive => 60,
|
||||
clean_start => true
|
||||
},
|
||||
Args = #{
|
||||
<<"channel">> => get_channel(ChannelId),
|
||||
<<"channel">> => ChannelId,
|
||||
<<"mod">> => ?MODULE,
|
||||
<<"options">> => Options
|
||||
},
|
||||
dgiot_client:register(get_channel(ChannelId), mqtt_client_sup, Args).
|
||||
dgiot_client:register(ChannelId, mqtt_client_sup, Args).
|
||||
|
||||
get_channel(ChannelId) when is_atom(ChannelId) ->
|
||||
get_channel(dgiot_utils:to_binary(ChannelId));
|
||||
get_channel(ChannelId) ->
|
||||
ProductId = get_productid(),
|
||||
dgiot_utils:to_atom(<<ProductId/binary, "_", ChannelId/binary>>).
|
||||
|
||||
get_productid() ->
|
||||
<<"a51704b2cf">>. % zeta订阅压测
|
||||
|
||||
%%推送协议:MQTT
|
||||
%%➢ 连接地址:服务器 IP
|
||||
%%➢ 连接端口:1883
|
||||
%%➢ 连接方式:TCP
|
||||
%%➢ 认证参数
|
||||
%%用户名:api_key(企业编码)
|
||||
%%密码:api_secret(企业秘钥)
|
||||
%%clientID:api_key:api_secret: + 三位随机数字。 如:api_key:api_secret125
|
||||
%%注:服务器 IP 由平台运营商提供,企业编码、企业秘钥可在 ZETA 网管平台 -> 权限管理 -> 企业信息中获得
|
||||
star_client(_ChannelId, 0) ->
|
||||
start_client(_ChannelId, 0) ->
|
||||
pass;
|
||||
star_client(ChannelId, Count) ->
|
||||
#{<<"apiKey">> := Key, <<"apiSecret">> := Secret} = zeta_config:get_config(ChannelId, <<"zeta_auth">>),
|
||||
ClientId = list_to_binary(lists:concat([binary_to_list(Key), ":", binary_to_list(Secret), ":", io_lib:format("~3.10.0B", [Count])])),
|
||||
dgiot_client:start(get_channel(ChannelId), ClientId),
|
||||
star_client(ChannelId, Count - 1).
|
||||
start_client(ChannelId, Count) ->
|
||||
ClientId = <<"">>,
|
||||
dgiot_client:start(ChannelId, ClientId),
|
||||
start_client(ChannelId, Count - 1).
|
||||
|
||||
|
||||
%% callback
|
||||
@ -128,4 +110,4 @@ code_change(_OldVsn, Dclient, _Extra) ->
|
||||
{ok, Dclient}.
|
||||
|
||||
update(ChannelId) ->
|
||||
dgiot_data:insert({<<"mqtt_online">>, zeta_metrics}, dgiot_client:count(ChannelId)).
|
||||
dgiot_data:insert({<<"mqtt_online">>, dlink_metrics}, dgiot_client:count(ChannelId)).
|
@ -21,7 +21,10 @@
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
|
||||
-export([login/3]).
|
||||
-export([properties_report/3]).
|
||||
-export([
|
||||
properties_report/3
|
||||
,firmware_report/3
|
||||
]).
|
||||
|
||||
|
||||
properties_report(ProductId, DevAddr, Payload) when is_map(Payload) ->
|
||||
@ -31,7 +34,21 @@ properties_report(ProductId, DevAddr, Payload) when is_map(Payload) ->
|
||||
properties_report(ProductId, DevAddr, Payload) ->
|
||||
lists:map(fun
|
||||
({ChannelId, _Ctype}) ->
|
||||
dgiot_channelx:do_message(ChannelId, {dlink_device_report, ProductId, DevAddr, Payload});
|
||||
dgiot_channelx:do_message(ChannelId, {dlink_properties_report, ProductId, DevAddr, Payload});
|
||||
(_) ->
|
||||
pass
|
||||
end, dgiot_bridge:get_proctol_channel(ProductId)),
|
||||
io:format("~s ~p ProductId ~p, DevAddr ~p, Payload: ~p ~n", [?FILE, ?LINE, ProductId, DevAddr, Payload]),
|
||||
ok.
|
||||
|
||||
firmware_report(ProductId, DevAddr, Payload) when is_map(Payload) ->
|
||||
io:format("~s ~p ProductId ~p, DevAddr ~p, Payload: ~p ~n", [?FILE, ?LINE, ProductId, DevAddr, Payload]),
|
||||
dgiot_task:save_td(ProductId, DevAddr, Payload, #{});
|
||||
|
||||
firmware_report(ProductId, DevAddr, Payload) ->
|
||||
lists:map(fun
|
||||
({ChannelId, _Ctype}) ->
|
||||
dgiot_channelx:do_message(ChannelId, {dlink_firmware_report, ProductId, DevAddr, Payload});
|
||||
(_) ->
|
||||
pass
|
||||
end, dgiot_bridge:get_proctol_channel(ProductId)),
|
||||
|
@ -27,20 +27,14 @@
|
||||
-define(EMPTY_USERNAME, <<"">>).
|
||||
|
||||
on_message_publish(Message = #message{topic = <<"$dg/thing/", Topic/binary>>, payload = Payload, from = _ClientId, headers = _Headers}, _State) ->
|
||||
case re:split(Topic, <<"/">>) of
|
||||
[ProductId, DevAddr, <<"properties">>, <<"report">>] ->
|
||||
NewPayload =
|
||||
case jsx:is_json(Payload) of
|
||||
true ->
|
||||
jiffy:decode(Payload,[return_maps]);
|
||||
false ->
|
||||
Payload
|
||||
end,
|
||||
%% io:format("~s ~p NewPayload: ~p~n", [?FILE, ?LINE, NewPayload]),
|
||||
dgiot_dlink_proctol:properties_report(ProductId, DevAddr, NewPayload);
|
||||
_ ->
|
||||
pass
|
||||
end,
|
||||
case re:split(Topic, <<"/">>) of
|
||||
[ProductId, DevAddr, <<"properties">>, <<"report">>] ->
|
||||
dgiot_dlink_proctol:properties_report(ProductId, DevAddr, get_payload(Payload));
|
||||
[ProductId, DevAddr, <<"firmware">>, <<"report">>] ->
|
||||
dgiot_dlink_proctol:firmware_report(ProductId, DevAddr, get_payload(Payload));
|
||||
_ ->
|
||||
pass
|
||||
end,
|
||||
{ok, Message};
|
||||
|
||||
on_message_publish(Message, _State) ->
|
||||
@ -63,3 +57,12 @@ on_message_publish(Message, _State) ->
|
||||
%% ?LOG(debug, "Message acked by client(~s): ~s~n",
|
||||
%% [ClientId, emqx_message:format(Message)]),
|
||||
%% ok.
|
||||
|
||||
get_payload(Payload) ->
|
||||
io:format("~s ~p Payload: ~p~n", [?FILE, ?LINE, Payload]),
|
||||
case jsx:is_json(Payload) of
|
||||
true ->
|
||||
jiffy:decode(Payload, [return_maps]);
|
||||
false ->
|
||||
Payload
|
||||
end.
|
Loading…
Reference in New Issue
Block a user