mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-12-01 19:58:46 +08:00
feat: Optimized list cycle
This commit is contained in:
parent
40bc639088
commit
a4d32c9c16
1
.gitignore
vendored
1
.gitignore
vendored
@ -102,6 +102,7 @@ apps/dgiot_modbusx/
|
||||
apps/dgiot_rtsp2ws/
|
||||
apps/dgiot_sophon/
|
||||
apps/dgiot_open62541/
|
||||
apps/dgiot_tdenginex/
|
||||
node_modules/
|
||||
package-lock.json
|
||||
erlang_ls.config
|
||||
|
@ -29,37 +29,44 @@ add(Key, Fun) ->
|
||||
add(HookType, Key, Fun) ->
|
||||
Hooks = get_hooks(),
|
||||
New = case HookType of
|
||||
one_for_more ->
|
||||
Old = maps:get(Key, Hooks, []),
|
||||
[Fun|Old];
|
||||
one_for_one ->
|
||||
[Fun]
|
||||
end,
|
||||
dgiot_data:insert(dgiot_hook, Hooks#{ Key => New }).
|
||||
one_for_more ->
|
||||
Old = maps:get(Key, Hooks, []),
|
||||
[Fun | Old];
|
||||
one_for_one ->
|
||||
[Fun]
|
||||
end,
|
||||
dgiot_data:insert(dgiot_hook, Hooks#{Key => New}).
|
||||
|
||||
remove(Key) ->
|
||||
Hooks = get_hooks(),
|
||||
NewHooks = maps:remove(Key, Hooks),
|
||||
dgiot_data:insert(dgiot_hook, NewHooks).
|
||||
NewHooks = maps:remove(Key, Hooks),
|
||||
dgiot_data:insert(dgiot_hook, NewHooks).
|
||||
|
||||
run_hook(Key, Args) ->
|
||||
Funs = get_hooks(Key),
|
||||
case Funs of
|
||||
not_find ->
|
||||
{error, not_find};
|
||||
_ ->
|
||||
{_NewFuns, Rtns} = lists:foldl(
|
||||
fun(Fun, {Acc, R}) ->
|
||||
case catch (do_hook(Fun, Args)) of
|
||||
{'EXIT', Reason} ->
|
||||
?LOG(error,"do hook error, Args:~p -> ~p~n", [Args, Reason]),
|
||||
{Acc, R};
|
||||
Result ->
|
||||
{[Fun | Acc], [Result | R]}
|
||||
end
|
||||
end, {[], []}, Funs),
|
||||
{ok, Rtns}
|
||||
end.
|
||||
Funs = get_hooks(Key),
|
||||
case Funs of
|
||||
not_find ->
|
||||
{error, not_find};
|
||||
_ ->
|
||||
Rtns = fold_hook(Funs, Args),
|
||||
{ok, Rtns}
|
||||
end.
|
||||
|
||||
fold_hook(Funs, Args) ->
|
||||
fold_hook(Funs, Args, []).
|
||||
fold_hook([], _Args, Acc) ->
|
||||
Acc;
|
||||
fold_hook([Fun | Funs], Args, Acc) ->
|
||||
NewAcc =
|
||||
case catch (do_hook(Fun, Args)) of
|
||||
{'EXIT', Reason} ->
|
||||
?LOG(error, "do hook error, Args:~p -> ~p~n", [Args, Reason]),
|
||||
Acc;
|
||||
Result ->
|
||||
[Result | Acc]
|
||||
end,
|
||||
erlang:garbage_collect(self()),
|
||||
fold_hook(Funs, Args, NewAcc).
|
||||
|
||||
do_hook({M, F}, Args) when is_list(Args) ->
|
||||
apply(M, F, Args);
|
||||
@ -71,14 +78,14 @@ do_hook(Fun, Args) ->
|
||||
Fun(Args).
|
||||
|
||||
get_hooks(Key) ->
|
||||
Hooks = get_hooks(),
|
||||
maps:get(Key, Hooks, not_find).
|
||||
Hooks = get_hooks(),
|
||||
maps:get(Key, Hooks, not_find).
|
||||
|
||||
get_hooks() ->
|
||||
case dgiot_data:lookup(dgiot_hook) of
|
||||
{ok, Hooks} -> Hooks;
|
||||
{error, not_find} -> #{}
|
||||
end.
|
||||
case dgiot_data:lookup(dgiot_hook) of
|
||||
{ok, Hooks} -> Hooks;
|
||||
{error, not_find} -> #{}
|
||||
end.
|
||||
|
||||
|
||||
|
||||
|
@ -34,7 +34,7 @@ start_link(Opts) ->
|
||||
supervisor:start_link({local, ?SERVER}, ?MODULE, Opts).
|
||||
|
||||
init(Opts) ->
|
||||
MaxSize = proplists:get_value(ets_maxsize, Opts, 32 * 1024 * 1024),
|
||||
MaxSize = proplists:get_value(ets_maxsize, Opts, 1024 * 1024 * 1024),
|
||||
Threshold = proplists:get_value(ets_threshold, Opts, 0.85),
|
||||
Weight = proplists:get_value(ets_weight, Opts, 30),
|
||||
ValOpts = [{ets_maxsize, MaxSize}, {ets_threshold, Threshold}, {checkpid, dgiot_cache_check_worker}],
|
||||
|
@ -77,7 +77,7 @@ stop(NameOrPid) ->
|
||||
%%%===================================================================
|
||||
|
||||
init(Opts) ->
|
||||
MaxSize = proplists:get_value(ets_maxsize, Opts, 8 * 1024 * 1024),
|
||||
MaxSize = proplists:get_value(ets_maxsize, Opts, 1024 * 1024 * 1024),
|
||||
Threshold = proplists:get_value(ets_threshold, Opts, 0.85),
|
||||
CheckPid = proplists:get_value(checkpid, Opts),
|
||||
ValueEts = ets:new(?MODULE, [public, named_table, {write_concurrency, true}, {read_concurrency, true}]),
|
||||
|
@ -182,7 +182,7 @@ handle_info({tcp_closed, Sock}, #state{mod = Mod, child = #tcp{clientid = Client
|
||||
DTUIP = dgiot_utils:get_ip(Sock),
|
||||
write_log(ChildState#tcp.log, <<"ERROR ", DTUIP/binary, " ", Clientid/binary>>, <<"tcp_closed">>),
|
||||
dgiot_metrics:dec(dgiot, <<"tcp_online">>, 1),
|
||||
?LOG(error, "tcp_closed ~p", [ChildState#tcp.state]),
|
||||
%% ?LOG(error, "tcp_closed ~p", [ChildState#tcp.state]),
|
||||
case Mod:handle_info(tcp_closed, ChildState) of
|
||||
{noreply, NewChild} ->
|
||||
{stop, normal, State#state{child = NewChild#tcp{socket = undefined}}};
|
||||
|
@ -136,12 +136,12 @@ post_properties(<<"plc">>, AtomName) ->
|
||||
end, [], Things);
|
||||
|
||||
post_properties(<<"dlink">>, AtomName) ->
|
||||
Things = ets:match(AtomName, {'$1', ['$2', '$3', '$4', '$5', '$6', '$7', '$8', '$9', '$10', '$11' | '_']}),
|
||||
lists:foldl(fun([Index, Devicetype, Name, Identifier, Key, Len, AccessMode, Min_Max, Unit, Type, Specs | _], Acc) ->
|
||||
Things = ets:match(AtomName, {'$1', ['$2', '$3', '$4', '$5', '$6', '$7', '$8', '$9', '$10', '$11', '$12' | '_']}),
|
||||
lists:foldl(fun([Index, Devicetype, Name, Identifier, Key, Len, Isstorage, AccessMode, Min_Max, Unit, Type, Specs | _], Acc) ->
|
||||
Acc++ [#{
|
||||
<<"name">> => Name,
|
||||
<<"index">> => Index,
|
||||
<<"isstorage">> => true,
|
||||
<<"isstorage">> => dgiot_utils:to_int(Isstorage),
|
||||
<<"isshow">> => true,
|
||||
<<"dataForm">> => #{
|
||||
<<"address">> => <<"0">>,
|
||||
|
@ -105,7 +105,7 @@ load_channel(Channels, Fun) ->
|
||||
<<"method">> => <<"GET">>,
|
||||
<<"path">> => <<"/classes/Product">>,
|
||||
<<"body">> => #{
|
||||
<<"keys">> => [<<"decoder">>, <<"ACL">>, <<"name">>, <<"channel">>, <<"dynamicReg">>, <<"devType">>, <<"nodeType">>, <<"productSecret">>, <<"config">>, <<"thing">>, <<"topics">>],
|
||||
<<"keys">> => [<<"decoder">>, <<"ACL">>, <<"name">>, <<"channel">>, <<"dynamicReg">>, <<"devType">>, <<"nodeType">>, <<"productSecret">>, <<"config">>, <<"topics">>],
|
||||
<<"include">> => [<<"Dict">>],
|
||||
<<"where">> => #{
|
||||
<<"$relatedTo">> => #{
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
-define(DGIOT_PRODUCT, dgiot_product).
|
||||
-define(DGIOT_PRODUCT_IDENTIFIE, dgiot_product_identifie).
|
||||
-define(DGIOT_PRODUCT_STAB, dgiot_product_stab).
|
||||
-define(DEVICE_PROFILE, dgiot_device_profile).
|
||||
-define(DEVICE_DEVICE_COLOR, dgiot_device_color).
|
||||
-define(DGIOT_CHANNEL_SESSION, dgiot_channel_session).
|
||||
|
@ -25,16 +25,18 @@
|
||||
-export([get_prop/1, get_props/1, get_props/2, get_unit/1, update_properties/2, update_properties/0]).
|
||||
-export([update_topics/0, update_product_filed/1]).
|
||||
-export([save_devicetype/1, get_devicetype/1, get_device_thing/2, get_productSecret/1]).
|
||||
-export([save_/1, get_keys/1, get_control/1, save_control/1, get_interval/1, save_device_thingtype/1, get_product_identifier/2, hook_topic/1]).
|
||||
-export([save_/1, get_keys/1, get_sub_tab/1, get_control/1, save_control/1, get_interval/1, get_product_identifier/2, hook_topic/1]).
|
||||
|
||||
init_ets() ->
|
||||
dgiot_data:init(?DGIOT_PRODUCT, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
|
||||
dgiot_data:init(?DGIOT_PRODUCT_IDENTIFIE, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
|
||||
dgiot_data:init(?DGIOT_PRODUCT_STAB, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
|
||||
dgiot_data:init(?DGIOT_CHANNEL_SESSION, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
|
||||
dgiot_data:init(?DEVICE_DEVICE_COLOR, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]),
|
||||
dgiot_data:init(?DEVICE_PROFILE, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]).
|
||||
|
||||
load_all_cache({Skip}) ->
|
||||
|
||||
case dgiot_parsex:query_object(<<"Product">>, #{<<"limit">> => 1000, <<"skip">> => Skip}) of
|
||||
{ok, #{<<"results">> := Results}} when length(Results) == 0 ->
|
||||
load_end;
|
||||
@ -47,6 +49,7 @@ load_all_cache({Skip}) ->
|
||||
{next, Skip}
|
||||
end.
|
||||
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
save_prod(ProductId, #{<<"thing">> := _thing} = Product) ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, ProductId, Product),
|
||||
@ -170,34 +173,58 @@ get_devicetype(ProductId) ->
|
||||
DeviceTypes
|
||||
end.
|
||||
|
||||
save_product_thing(ProductId, Identifier, undefined, Profile, DeviceType, Prop, Type, 0) ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop),
|
||||
save_product_thing({ProductId, device_thing, DeviceType}, #{Identifier => Type}, map),
|
||||
save_product_thing({ProductId, devicetype}, [DeviceType], list),
|
||||
save_product_thing({ProductId, profile_control}, #{Identifier => Profile}, map);
|
||||
|
||||
%% 设备类型
|
||||
save_device_thingtype(ProductId, DeviceType, NewMap) ->
|
||||
case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}) of
|
||||
save_product_thing(ProductId, Identifier, Key, undefined, DeviceType, Prop, Type, Isstorage) ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop),
|
||||
save_product_thing({ProductId, device_thing, DeviceType}, #{Identifier => Type}, map),
|
||||
save_product_thing({ProductId, keys}, [Key], list),
|
||||
save_product_thing({ProductId, devicetype}, [DeviceType], list),
|
||||
|
||||
<<PId:10/binary, _/binary>> = dgiot_utils:to_md5(<<ProductId/binary, Isstorage/binary>>),
|
||||
save_product_thing({ProductId, sub_tab}, [PId], list),
|
||||
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {PId, Identifier, identifie}, Prop),
|
||||
save_product_thing({PId, device_thing, DeviceType}, #{Identifier => Type}, map),
|
||||
save_product_thing({PId, keys}, [Key], list),
|
||||
save_product_thing({PId, devicetype}, [DeviceType], list),
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_STAB, {ProductId, Identifier, stab}, PId);
|
||||
|
||||
save_product_thing(ProductId, Identifier, Key, Profile, DeviceType, Prop, Type, Isstorage) ->
|
||||
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop),
|
||||
save_product_thing({ProductId, device_thing, DeviceType}, #{Identifier => Type}, map),
|
||||
save_product_thing({ProductId, keys}, [Key], list),
|
||||
save_product_thing({ProductId, devicetype}, [DeviceType], list),
|
||||
save_product_thing({ProductId, profile_control}, #{Identifier => Profile}, map),
|
||||
|
||||
<<PId:10/binary, _/binary>> = dgiot_utils:to_md5(<<ProductId/binary, Isstorage/binary>>),
|
||||
save_product_thing({ProductId, sub_tab}, [PId], list),
|
||||
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop),
|
||||
save_product_thing({PId, device_thing, DeviceType}, #{Identifier => Type}, map),
|
||||
save_product_thing({PId, keys}, [Key], list),
|
||||
save_product_thing({PId, devicetype}, [DeviceType], list),
|
||||
save_product_thing({PId, profile_control}, #{Identifier => Profile}, map),
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_STAB, {ProductId, Identifier, stab}, PId).
|
||||
|
||||
save_product_thing(Key, Value, list) ->
|
||||
case dgiot_data:get(?DGIOT_PRODUCT, Key) of
|
||||
not_find ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, NewMap);
|
||||
Map ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, dgiot_map:merge(Map, NewMap))
|
||||
end.
|
||||
|
||||
save_device_thingtype(ProductId) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
lists:map(
|
||||
fun
|
||||
(#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"dataType">> := #{<<"type">> := Type}}) ->
|
||||
case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}) of
|
||||
not_find ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, #{Identifier => Type});
|
||||
Map ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, Map#{Identifier => Type})
|
||||
end;
|
||||
(_) ->
|
||||
pass
|
||||
end, Props);
|
||||
|
||||
_Error ->
|
||||
[]
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, Key, Value);
|
||||
Values ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, Key, dgiot_utils:unique_2(Values ++ Value))
|
||||
end;
|
||||
save_product_thing(Key, Value, map) ->
|
||||
case dgiot_data:get(?DGIOT_PRODUCT, Key) of
|
||||
not_find ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, Key, Value);
|
||||
Values ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, Key, dgiot_map:merge(Values, Value))
|
||||
end.
|
||||
|
||||
%% 物模型标识符
|
||||
@ -302,34 +329,45 @@ update_product_filed(_Filed) ->
|
||||
end.
|
||||
|
||||
save_(ProductId) ->
|
||||
{Keys, Control, DeviceTypes} =
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props} = Thing}} ->
|
||||
delete_product_identifier(ProductId),
|
||||
Tags = maps:get(<<"tags">>, Thing, []),
|
||||
lists:foldl(
|
||||
fun
|
||||
(#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"isstorage">> := true, <<"profile">> := Profile, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop),
|
||||
save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}),
|
||||
{Acc ++ [Identifier], Ccc#{Identifier => Profile}, Dcc ++ [DeviceType]};
|
||||
(#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"profile">> := Profile, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop),
|
||||
save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}),
|
||||
{Acc, Ccc#{Identifier => Profile}, Dcc ++ [DeviceType]};
|
||||
(#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"isstorage">> := true, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop),
|
||||
save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}),
|
||||
{Acc ++ [Identifier], Ccc, Dcc ++ [DeviceType]};
|
||||
(_, {Acc, Ccc, Dcc}) ->
|
||||
{Acc, Ccc, Dcc}
|
||||
end, {[], #{}, []}, Props ++ Tags);
|
||||
_Error ->
|
||||
{[], #{}, []}
|
||||
end,
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, keys}, Keys),
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, profile_control}, Control),
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, devicetype}, dgiot_utils:unique_2(DeviceTypes)).
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props} = Thing}} ->
|
||||
delete_product_identifier(ProductId),
|
||||
Tags = maps:get(<<"tags">>, Thing, []),
|
||||
fold_prop(ProductId, Props ++ Tags);
|
||||
_Error ->
|
||||
{[], #{}, []}
|
||||
end.
|
||||
|
||||
fold_prop(ProductId, Props) when length(Props) =< 40 ->
|
||||
fold_prop_(ProductId, Props);
|
||||
|
||||
fold_prop(ProductId, [A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20,
|
||||
A21, A22, A23, A24, A25, A26, A27, A28, A29, A30, A31, A32, A33, A34, A35, A36, A37, A38, A39, A40 | Tail]) ->
|
||||
Head = [A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20,
|
||||
A21, A22, A23, A24, A25, A26, A27, A28, A29, A30, A31, A32, A33, A34, A35, A36, A37, A38, A39, A40],
|
||||
?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]),
|
||||
fold_prop(ProductId, Tail),
|
||||
?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]),
|
||||
fold_prop_(ProductId, Head),
|
||||
?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]),
|
||||
erlang:garbage_collect(self()).
|
||||
|
||||
fold_prop_(_ProductId, []) ->
|
||||
ok;
|
||||
|
||||
fold_prop_(ProductId, [#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"isstorage">> := Isstorage,
|
||||
<<"dataType">> := #{<<"type">> := Type}} = Prop | Props]) when Isstorage > 0 ->
|
||||
save_product_thing(ProductId, Identifier, Identifier, maps:get(<<"profile">>, Prop, undefined), DeviceType, Prop, Type, dgiot_utils:to_binary(Isstorage)),
|
||||
fold_prop_(ProductId, Props);
|
||||
|
||||
fold_prop_(ProductId, [#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"profile">> := Profile,
|
||||
<<"dataType">> := #{<<"type">> := Type}} = Prop | Props]) ->
|
||||
save_product_thing(ProductId, Identifier, undefined, Profile, DeviceType, Prop, Type, 0),
|
||||
fold_prop_(ProductId, Props);
|
||||
|
||||
fold_prop_(ProductId, [_Prop | Props]) ->
|
||||
fold_prop_(ProductId, Props).
|
||||
|
||||
|
||||
get_keys(ProductId) ->
|
||||
case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, keys}) of
|
||||
@ -339,6 +377,14 @@ get_keys(ProductId) ->
|
||||
Keys
|
||||
end.
|
||||
|
||||
get_sub_tab(ProductId) ->
|
||||
case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, sub_tab}) of
|
||||
not_find ->
|
||||
[];
|
||||
Keys ->
|
||||
Keys
|
||||
end.
|
||||
|
||||
get_interval(ProductId) ->
|
||||
case lookup_prod(ProductId) of
|
||||
{ok, #{<<"config">> := #{<<"interval">> := Interval}}} ->
|
||||
|
@ -134,14 +134,14 @@ do_request(get_echart_deviceid, #{<<"deviceid">> := DeviceId, <<"style">> := Sty
|
||||
{error, Error} -> {error, Error};
|
||||
{ok, Channel} ->
|
||||
case dgiot_parsex:get_object(<<"Device">>, DeviceId) of
|
||||
{ok, #{<<"objectId">> := DeviceId, <<"product">> := #{<<"objectId">> := ProductId}}} ->
|
||||
{ok, #{<<"objectId">> := DeviceId, <<"devaddr">> := Devaddr, <<"product">> := #{<<"objectId">> := ProductId}}} ->
|
||||
case Style of
|
||||
<<"amis_table">> ->
|
||||
dgiot_device_echart:get_data_by_month(Channel, ProductId, DeviceId, Args);
|
||||
<<"echart_category">> ->
|
||||
dgiot_device_echart:get_data_by_echart_category(Channel, ProductId, DeviceId, Args);
|
||||
_ ->
|
||||
dgiot_device_echart:get_echart_data(Channel, ProductId, DeviceId, Args)
|
||||
dgiot_device_echart:get_echart_data(Channel, ProductId, DeviceId, Devaddr, Args)
|
||||
|
||||
end;
|
||||
_ ->
|
||||
@ -158,9 +158,9 @@ do_request(get_devicecard_deviceid, #{<<"deviceid">> := DeviceId} = Args, #{<<"s
|
||||
{error, Error};
|
||||
{ok, Channel} ->
|
||||
case dgiot_parsex:get_object(<<"Device">>, DeviceId) of
|
||||
{ok, #{<<"objectId">> := DeviceId, <<"product">> := #{<<"objectId">> := ProductId}}} ->
|
||||
{ok, #{<<"objectId">> := DeviceId, <<"devaddr">> := Devaddr, <<"product">> := #{<<"objectId">> := ProductId}}} ->
|
||||
dgiot_mqtt:subscribe_route_key([<<"$dg/user/realtimecard/", DeviceId/binary, "/#">>], <<"realtimecard">>, SessionToken),
|
||||
dgiot_device_card:get_device_card(Channel, ProductId, DeviceId, Args);
|
||||
dgiot_device_card:get_devcard(Channel, ProductId, DeviceId, Devaddr, Args);
|
||||
_ ->
|
||||
{error, <<"not find device">>}
|
||||
end
|
||||
|
@ -20,7 +20,22 @@
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
-include_lib("dgiot_tdengine/include/dgiot_tdengine.hrl").
|
||||
|
||||
-export([get_card/5, get_device_card/4]).
|
||||
-export([get_devcard/5, get_card/5, get_device_card/4]).
|
||||
|
||||
get_devcard(Channel, ProductId, DeviceId, Devaddr, Args) ->
|
||||
Chartdata =
|
||||
case dgiot_product:get_sub_tab(ProductId) of
|
||||
not_find ->
|
||||
get_device_card(Channel, ProductId, DeviceId, Args);
|
||||
Subs when length(Subs) > 1 ->
|
||||
lists:foldl(fun(SubId, Acc) ->
|
||||
SubDevid = dgiot_parse_id:get_deviceid(SubId, Devaddr),
|
||||
Acc ++ get_device_card(Channel, SubId, SubDevid, Args)
|
||||
end, [], Subs);
|
||||
_ ->
|
||||
get_device_card(Channel, ProductId, DeviceId, Args)
|
||||
end,
|
||||
{ok, #{<<"data">> => Chartdata}}.
|
||||
|
||||
get_device_card(Channel, ProductId, DeviceId, Args) ->
|
||||
TableName = ?Table(DeviceId),
|
||||
@ -31,8 +46,8 @@ get_device_card(Channel, ProductId, DeviceId, Args) ->
|
||||
_ ->
|
||||
[#{}]
|
||||
end,
|
||||
Chartdata = get_card(ProductId, Results, DeviceId, Args, dgiot_data:get({shard_storage, ProductId})),
|
||||
{ok, #{<<"data">> => Chartdata}}.
|
||||
get_card(ProductId, Results, DeviceId, Args, dgiot_data:get({shard_storage, ProductId})).
|
||||
|
||||
|
||||
decode_shard_data(Data, Result) ->
|
||||
case binary:split(Data, <<$,>>, [global, trim]) of
|
||||
|
@ -20,69 +20,105 @@
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
-include_lib("dgiot_tdengine/include/dgiot_tdengine.hrl").
|
||||
|
||||
-export([get_echart_data/4]).
|
||||
-export([get_echart_data/5, get_echart_data/4]).
|
||||
-export([get_data_by_month/4, get_data_by_echart_category/4, get_keys/2, get_table/2]).
|
||||
|
||||
get_echart_data(Channel, ProductId, DeviceId, Args) ->
|
||||
Query = maps:without([<<"productid">>, <<"deviceid">>], Args),
|
||||
Interval = maps:get(<<"interval">>, Args),
|
||||
TableName = ?Table(DeviceId),
|
||||
get_echart_data(Channel, ProductId, DeviceId, Devaddr, Args) ->
|
||||
{Names, Results} =
|
||||
case dgiot_device_tdengine:get_history_data(Channel, ProductId, TableName, Query) of
|
||||
{TdNames, {ok, #{<<"results">> := TdResults}}} ->
|
||||
{TdNames, TdResults};
|
||||
case dgiot_product:get_sub_tab(ProductId) of
|
||||
not_find ->
|
||||
get_echart_data(Channel, ProductId, DeviceId, Args);
|
||||
Subs when length(Subs) > 1 ->
|
||||
TableNames =
|
||||
lists:foldl(fun(SubId, Acc) ->
|
||||
SubDevid = dgiot_parse_id:get_deviceid(SubId, Devaddr),
|
||||
Acc ++ [SubDevid]
|
||||
end, [], Subs),
|
||||
get_echart_data(Channel, ProductId, TableNames, Args);
|
||||
_ ->
|
||||
{[], []}
|
||||
get_echart_data(Channel, ProductId, DeviceId, Args)
|
||||
end,
|
||||
Interval = maps:get(<<"interval">>, Args),
|
||||
Chartdata = get_echart(ProductId, Results, Names, Interval),
|
||||
{ok, #{<<"chartData">> => Chartdata}}.
|
||||
|
||||
get_echart(ProductId, Results, Names, Interval) ->
|
||||
Maps = dgiot_product:get_prop(ProductId),
|
||||
Units = dgiot_product:get_unit(ProductId),
|
||||
NewMaps = maps:merge(#{<<"createdat">> => <<"日期"/utf8>>}, Maps),
|
||||
Columns = [<<"日期"/utf8>>] ++ Names,
|
||||
Rows =
|
||||
lists:foldl(fun(Line, Lines) ->
|
||||
NewLine =
|
||||
maps:fold(fun(K, V, Acc) ->
|
||||
case maps:find(K, NewMaps) of
|
||||
error ->
|
||||
Acc;
|
||||
{ok, Name} ->
|
||||
case Name of
|
||||
<<"日期"/utf8>> ->
|
||||
NewV = dgiot_tdengine_field:get_time(V, Interval),
|
||||
Acc#{Name => NewV};
|
||||
_ ->
|
||||
Acc#{Name => V}
|
||||
end
|
||||
end
|
||||
end, #{}, Line),
|
||||
Lines ++ [NewLine]
|
||||
end, [], Results),
|
||||
%% io:format("~s ~p Rows = ~p.~n", [?FILE, ?LINE, Rows]),
|
||||
ChildRows = lists:foldl(fun(X, Acc1) ->
|
||||
Date = maps:get(<<"日期"/utf8>>, X, dgiot_datetime:format(dgiot_datetime:to_localtime(dgiot_datetime:now_secs()), <<"YY-MM-DD HH:NN:SS">>)),
|
||||
maps:fold(fun(K1, V1, Acc) ->
|
||||
case maps:find(K1, Acc) of
|
||||
get_echart_data(Channel, ProductId, DeviceId, Args) ->
|
||||
Query = maps:without([<<"productid">>, <<"deviceid">>], Args),
|
||||
case dgiot_device_tdengine:get_history_data(Channel, ProductId, DeviceId, Query) of
|
||||
{TdNames, {ok, #{<<"results">> := TdResults}}} ->
|
||||
{TdNames, TdResults};
|
||||
_ ->
|
||||
{[], []}
|
||||
end.
|
||||
|
||||
|
||||
format_line(ProductId, Interval, Line, Acc) when length(Line) > 1 ->
|
||||
Acc ++ [format_line_(ProductId, Interval, Line, #{})];
|
||||
format_line(_ProductId, _Interval, _Line, Acc) ->
|
||||
Acc.
|
||||
|
||||
format_line_(_ProductId, _Interval, [], Acc) ->
|
||||
Acc;
|
||||
|
||||
format_line_(ProductId, Interval, [{<<"createdat">>, V} | Tail], Acc) ->
|
||||
NewV = dgiot_tdengine_field:get_time(V, Interval),
|
||||
NewAcc = Acc#{<<"日期"/utf8>> => NewV},
|
||||
format_line_(ProductId, Interval, Tail, NewAcc);
|
||||
|
||||
format_line_(ProductId, Interval, [{K, V} | Tail], Acc) ->
|
||||
NewAcc =
|
||||
case dgiot_product:get_product_identifier(ProductId, K) of
|
||||
#{<<"name">> := Name} ->
|
||||
Acc#{Name => V};
|
||||
_ ->
|
||||
Acc
|
||||
end,
|
||||
format_line_(ProductId, Interval, Tail, NewAcc).
|
||||
|
||||
format_results(_ProductId, _Interval, [], Acc) ->
|
||||
Acc;
|
||||
|
||||
format_results(ProductId, Interval, [Line | Results], Acc) ->
|
||||
NewAcc = format_line(ProductId, Interval, maps:to_list(Line), Acc),
|
||||
format_results(ProductId, Interval, Results, NewAcc).
|
||||
|
||||
format_rows([], Acc) ->
|
||||
Acc;
|
||||
|
||||
format_rows([Row | Rows], Acc) ->
|
||||
Date = maps:get(<<"日期"/utf8>>, Row, dgiot_datetime:format(dgiot_datetime:to_localtime(dgiot_datetime:now_secs()), <<"YY-MM-DD HH:NN:SS">>)),
|
||||
NewAcc =
|
||||
maps:fold(fun(K1, V1, Acc1) ->
|
||||
case maps:find(K1, Acc1) of
|
||||
error ->
|
||||
Acc#{K1 => [#{<<"日期"/utf8>> => Date, K1 => V1}]};
|
||||
Acc1#{K1 => [#{<<"日期"/utf8>> => Date, K1 => V1}]};
|
||||
{ok, V2} ->
|
||||
Acc#{K1 => V2 ++ [#{<<"日期"/utf8>> => Date, K1 => V1}]}
|
||||
Acc1#{K1 => V2 ++ [#{<<"日期"/utf8>> => Date, K1 => V1}]}
|
||||
end
|
||||
end, Acc1, maps:without([<<"日期"/utf8>>], X))
|
||||
end, #{}, Rows),
|
||||
end, Acc, maps:without([<<"日期"/utf8>>], Row)),
|
||||
format_rows(Rows, NewAcc).
|
||||
|
||||
format_childrows(_ProductId, [], Acc) ->
|
||||
Acc;
|
||||
|
||||
format_childrows(ProductId, [{K, V} | ChildRows], Acc) ->
|
||||
Unit =
|
||||
case dgiot_product:get_product_identifier(ProductId, K) of
|
||||
#{<<"dataType">> := #{<<"specs">> := #{<<"unit">> := Unit1}}} ->
|
||||
Unit1;
|
||||
_ ->
|
||||
<<"">>
|
||||
end,
|
||||
NewAcc = Acc ++ [#{<<"columns">> => [<<"日期"/utf8>>, K], <<"rows">> => V, <<"unit">> => Unit}],
|
||||
format_childrows(ProductId, ChildRows, NewAcc).
|
||||
|
||||
get_echart(ProductId, Results, Names, Interval) ->
|
||||
Columns = [<<"日期"/utf8>>] ++ Names,
|
||||
Rows = format_results(ProductId, Interval, Results, []),
|
||||
%% io:format("~s ~p Rows = ~ts.~n", [?FILE, ?LINE, unicode:characters_to_list(dgiot_json:encode(Rows))]),
|
||||
ChildRows = format_rows(Rows, #{}),
|
||||
%% io:format("~s ~p ChildRows = ~p.~n", [?FILE, ?LINE, ChildRows]),
|
||||
Child =
|
||||
maps:fold(fun(K, V, Acc) ->
|
||||
Unit =
|
||||
case maps:find(K, Units) of
|
||||
error -> <<"">>;
|
||||
{ok, Unit1} -> Unit1
|
||||
end,
|
||||
Acc ++ [#{<<"columns">> => [<<"日期"/utf8>>, K], <<"rows">> => V, <<"unit">> => Unit}]
|
||||
end, [], ChildRows),
|
||||
Child = format_childrows(ProductId, maps:to_list(ChildRows), []),
|
||||
%% io:format("~s ~p Child = ~p.~n", [?FILE, ?LINE, Child]),
|
||||
#{<<"columns">> => Columns, <<"rows">> => Rows, <<"child">> => Child}.
|
||||
|
||||
@ -112,10 +148,9 @@ get_data_by_month(Channel, ProductId, DeviceId, Args) ->
|
||||
{ok, Sql} = maps:find(<<"sql">>, Res),
|
||||
{ok, Name_and_nuit} = maps:find(<<"name_and_unit">>, Res),
|
||||
%% 配置参数
|
||||
TableName = ?Table(DeviceId),
|
||||
Interval = <<"1d">>,
|
||||
%%传入参数获得结果
|
||||
case dgiot_device_tdengine:get_history_data2(Sql, Channel, TableName, Interval, ProductId, StartTime, EndTime) of
|
||||
case dgiot_device_tdengine:get_history_data2(Sql, Channel, DeviceId, Interval, ProductId, StartTime, EndTime) of
|
||||
%% 判断结果并转换格式
|
||||
{ok, #{<<"results">> := Results}} ->
|
||||
%% io:format("~s ~p Results = ~p,Name_and_nuit = ~p ~n",[?FILE,?LINE,Results,Name_and_nuit]),
|
||||
@ -271,10 +306,9 @@ get_data_by_echart_category(Channel, ProductId, DeviceId, Args) ->
|
||||
{ok, Sql} = maps:find(<<"sql">>, Res),
|
||||
{ok, Names} = maps:find(<<"names">>, Res),
|
||||
%% 配置参数
|
||||
TableName = ?Table(DeviceId),
|
||||
Interval = <<"1d">>,
|
||||
%%传入参数获得结果
|
||||
case dgiot_device_tdengine:get_history_data2(Sql, Channel, TableName, Interval, ProductId, StartTime, EndTime) of
|
||||
case dgiot_device_tdengine:get_history_data2(Sql, Channel, DeviceId, Interval, ProductId, StartTime, EndTime) of
|
||||
%% 判断结果并转换格式
|
||||
{ok, #{<<"results">> := Results}} ->
|
||||
%% io:format("~s ~p Results = ~p ~n",[?FILE,?LINE,Results]),
|
||||
|
@ -141,7 +141,7 @@ get_realdata({Token, Realdatas}) when is_map(Realdatas) ->
|
||||
{error, Error} ->
|
||||
{error, Error};
|
||||
{ok, Channel} ->
|
||||
{_, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, <<"last">>, Keys),
|
||||
{_, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, <<"_", ProductId/binary>>, <<"last">>, Keys),
|
||||
DB = dgiot_tdengine:get_database(Channel, ProductId),
|
||||
Sql1 = <<"select last(devaddr) as devaddr,", Newkeys/binary, " FROM ", DB/binary, "_", ProductId/binary, " group by devaddr;">>,
|
||||
%% io:format("Channel = ~p.~n Sql = ~p.~n", [Channel, Sql1]),
|
||||
|
@ -76,27 +76,48 @@ get_device(Channel, ProductId, DeviceId, _DevAddr, Query) ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
get_tablename(DB, [FirstDevId | _] = DeviceIds) ->
|
||||
format_join(DB, ?Table(FirstDevId), DeviceIds, <<>>, <<>>);
|
||||
|
||||
get_tablename(DB, DeviceId) ->
|
||||
TableName = ?Table(DeviceId),
|
||||
{<<DB/binary, TableName/binary>>, <<"1=1">>, TableName}.
|
||||
|
||||
format_join(_DB, FirstTableName, [], Tcc, Wcc) ->
|
||||
{Tcc, Wcc, FirstTableName};
|
||||
format_join(DB, FirstTableName, [DeviceId | DeviceIds], Tcc, Wcc) ->
|
||||
TableName = ?Table(DeviceId),
|
||||
{NewTcc, NewWcc} =
|
||||
case Tcc of
|
||||
<<>> ->
|
||||
{<<DB/binary, TableName/binary>>, <<FirstTableName/binary, ".createdat = ", FirstTableName/binary, ".createdat">>};
|
||||
_ ->
|
||||
{<<Tcc/binary, ", ", DB/binary, TableName/binary>>, <<Wcc/binary, " AND ", FirstTableName/binary, ".createdat = ", TableName/binary, ".createdat">>}
|
||||
end,
|
||||
format_join(DB, TableName, DeviceIds, NewTcc, NewWcc).
|
||||
|
||||
%% SELECT max(day_electricity) '时间' ,max(charge_current) '日期' FROM _2d26a94cf8._c5e1093e30 WHERE createdat >= now - 1h INTERVAL(1h) limit 10;
|
||||
%% SELECT spread(cumulativescale) FROM _797197ad06 WHERE createdat >= now - 1Y INTERVAL(1h);
|
||||
%% SELECT last(cumulativescale) FROM _797197ad06 WHERE createdat >= now - 1Y INTERVAL(1h);
|
||||
get_history_data(Channel, ProductId, TableName, Query) ->
|
||||
get_history_data(Channel, ProductId, DeviceId, Query) ->
|
||||
dgiot_tdengine:transaction(Channel,
|
||||
fun(Context) ->
|
||||
DB = dgiot_tdengine:get_database(Channel, ProductId),
|
||||
{TableName, DefWhere, FirstTableName} = get_tablename(DB, DeviceId),
|
||||
Function = maps:get(<<"function">>, Query, <<>>),
|
||||
Keys = maps:get(<<"keys">>, Query, <<"*">>),
|
||||
Limit = dgiot_tdengine_select:format_limit(Query),
|
||||
Starttime = dgiot_utils:to_binary(maps:get(<<"starttime">>, Query, dgiot_datetime:now_ms() - 25920000000)),
|
||||
Endtime = dgiot_utils:to_binary(maps:get(<<"endtime">>, Query, dgiot_datetime:now_ms())),
|
||||
{Names, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, Function, Keys),
|
||||
{Names, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, FirstTableName, Function, Keys),
|
||||
Tail =
|
||||
case maps:get(<<"interval">>, Query, <<>>) of
|
||||
<<>> ->
|
||||
<<" where createdat >= ", Starttime/binary, " AND createdat <= ", Endtime/binary, " ", Limit/binary, ";">>;
|
||||
<<" where ", DefWhere/binary, " AND ", FirstTableName/binary, ".createdat >= ", Starttime/binary, " AND ", FirstTableName/binary, ".createdat <= ", Endtime/binary, " ", Limit/binary, ";">>;
|
||||
Interval ->
|
||||
<<" where createdat >= ", Starttime/binary, " AND createdat <= ", Endtime/binary, " INTERVAL(", Interval/binary, ") ", Limit/binary, ";">>
|
||||
<<" where ", DefWhere/binary, " AND ", FirstTableName/binary, ".createdat >= ", Starttime/binary, " AND ", FirstTableName/binary, ".createdat <= ", Endtime/binary, " INTERVAL(", Interval/binary, ") ", Limit/binary, ";">>
|
||||
end,
|
||||
Sql = <<"SELECT ", Newkeys/binary, " FROM ", DB/binary, TableName/binary, Tail/binary>>,
|
||||
Sql = <<"SELECT ", Newkeys/binary, " FROM ", TableName/binary, Tail/binary>>,
|
||||
%% io:format("~s ~p Sql = ~p.~n", [?FILE, ?LINE, Sql]),
|
||||
{Names, dgiot_tdengine_pool:run_sql(Context#{<<"channel">> => Channel}, execute_query, Sql)}
|
||||
end).
|
||||
@ -112,7 +133,7 @@ get_realtime_data(Channel, ProductId, TableName, Query) ->
|
||||
_ ->
|
||||
maps:get(<<"keys">>, Query, <<"*">>)
|
||||
end,
|
||||
{_Names, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, <<"">>, Keys),
|
||||
{_Names, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, TableName, <<"">>, Keys),
|
||||
DB = dgiot_tdengine:get_database(Channel, ProductId),
|
||||
case size(Newkeys) > 0 of
|
||||
true ->
|
||||
@ -123,11 +144,12 @@ get_realtime_data(Channel, ProductId, TableName, Query) ->
|
||||
end
|
||||
end).
|
||||
|
||||
get_history_data2(Order, Channel, TableName, Interval, ProductId, StartTime, _EndTime) ->
|
||||
get_history_data2(Order, Channel, DeviceId, Interval, ProductId, StartTime, _EndTime) ->
|
||||
%% io:format("~s ~p Order= ~p, Channel= ~p, TableName= ~p, TableName= ~p,~n Interval= ~p, ProductId= ~p, ~n StartTime= ~p, EndTime =~p. ~n",[?FILE,?LINE,Order, Channel, TableName, TableName, Interval, ProductId, StartTime, _EndTime]),
|
||||
dgiot_tdengine:transaction(Channel,
|
||||
fun(Context) ->
|
||||
DB = dgiot_tdengine:get_database(Channel, ProductId),
|
||||
TableName = ?Table(DeviceId),
|
||||
BinStartTime = dgiot_utils:to_binary(StartTime),
|
||||
Tail = <<" where createdat >= ", BinStartTime/binary, " INTERVAL(", Interval/binary, ") ", ";">>,
|
||||
Sql = <<"SELECT ", Order/binary, " FROM ", DB/binary, TableName/binary, Tail/binary>>,
|
||||
@ -137,9 +159,8 @@ get_history_data2(Order, Channel, TableName, Interval, ProductId, StartTime, _En
|
||||
|
||||
get_gps_track(Channel, ProductId, DeviceId, Args) ->
|
||||
Query = Args#{<<"keys">> => <<"latitude,longitude">>},
|
||||
TableName = ?Table(DeviceId),
|
||||
{_Names, Results} =
|
||||
case dgiot_device_tdengine:get_history_data(Channel, ProductId, TableName, Query) of
|
||||
case dgiot_device_tdengine:get_history_data(Channel, ProductId, DeviceId, Query) of
|
||||
{TdNames, {ok, #{<<"results">> := TdResults}}} ->
|
||||
%% io:format("~s ~p TdResults = ~p.~n", [?FILE, ?LINE, TdResults]),
|
||||
NewTdResults =
|
||||
|
@ -20,7 +20,7 @@
|
||||
-include_lib("dgiot_tdengine/include/dgiot_tdengine.hrl").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
|
||||
-export([get_product/2, get_products/2, get_keys/3, check_field/3, test_product/0]).
|
||||
-export([get_product/2, get_products/2, get_keys/4, check_field/3, test_product/0]).
|
||||
-export([get_channel/1, do_channel/3, get_product_data/4]).
|
||||
|
||||
test_product() ->
|
||||
@ -123,50 +123,63 @@ get_channel(Session) ->
|
||||
{ok, ChannelId}
|
||||
end.
|
||||
|
||||
get_keys(ProductId, Function, <<"*">>) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} when length(Props) > 0 ->
|
||||
lists:foldl(fun(X, {Names, Acc}) ->
|
||||
case X of
|
||||
#{<<"identifier">> := Identifier, <<"name">> := Name, <<"isstorage">> := true} ->
|
||||
case Acc of
|
||||
<<>> ->
|
||||
{Names ++ [Name], <<Acc/binary, Function/binary, "(", Identifier/binary, ") ", Identifier/binary>>};
|
||||
_ ->
|
||||
{Names ++ [Name], <<Acc/binary, ", ", Function/binary, "(", Identifier/binary, ") ", Identifier/binary>>}
|
||||
end;
|
||||
spell_sql(_ProductId, _Function, [], {Names, Acc}) ->
|
||||
{Names, Acc};
|
||||
spell_sql(ProductId, Function, [Key | Keys], {Names, Acc}) ->
|
||||
case dgiot_product:get_product_identifier(ProductId, Key) of
|
||||
#{<<"identifier">> := Identifier, <<"name">> := Name, <<"isstorage">> := Isstorage} when Isstorage > 0 ->
|
||||
{NewNames, NewAcc} =
|
||||
case Acc of
|
||||
<<>> ->
|
||||
{Names ++ [Name], <<Acc/binary, Function/binary, "(", Identifier/binary, ") ", Identifier/binary>>};
|
||||
_ ->
|
||||
{Names, Acc}
|
||||
end
|
||||
end, {[], get_defult(Function)}, Props);
|
||||
_Other ->
|
||||
?LOG(debug, "~p _Other ~p", [ProductId, _Other]),
|
||||
{Names ++ [Name], <<Acc/binary, ", ", Function/binary, "(", Identifier/binary, ") ", Identifier/binary>>}
|
||||
end,
|
||||
spell_sql(ProductId, Function, Keys, {NewNames, NewAcc});
|
||||
_ ->
|
||||
{Names, Acc}
|
||||
end.
|
||||
|
||||
get_keys(ProductId, TableName, Function, <<"*">>) ->
|
||||
case dgiot_product:get_keys(ProductId) of
|
||||
Keys when length(Keys) > 0 ->
|
||||
spell_sql(ProductId, Function, Keys, {[], get_defult(TableName, Function)});
|
||||
_ ->
|
||||
{[], <<"*">>}
|
||||
end;
|
||||
|
||||
get_keys(ProductId, Function, Keys) when Keys == undefined; Keys == <<>> ->
|
||||
get_keys(ProductId, Function, <<"*">>);
|
||||
get_keys(ProductId, TableName, Function, Keys) when Keys == undefined; Keys == <<>> ->
|
||||
get_keys(ProductId, TableName, Function, <<"*">>);
|
||||
|
||||
get_keys(ProductId, Function, Keys) ->
|
||||
get_keys(ProductId, TableName, Function, Keys) ->
|
||||
List =
|
||||
case is_list(Keys) of
|
||||
true -> Keys;
|
||||
false -> re:split(Keys, <<",">>)
|
||||
end,
|
||||
Maps = get_prop(ProductId),
|
||||
lists:foldl(fun(X, {Names, Acc}) ->
|
||||
case maps:find(X, Maps) of
|
||||
error ->
|
||||
{Names, Acc};
|
||||
{ok, Name} ->
|
||||
case Acc of
|
||||
<<>> ->
|
||||
{Names ++ [Name], <<Acc/binary, Function/binary, "(", X/binary, ") ", X/binary>>};
|
||||
case dgiot_product:get_keys(ProductId) of
|
||||
TdKeys when length(TdKeys) > 0 ->
|
||||
lists:foldl(fun(Key, {Names, Acc}) ->
|
||||
case TdKeys -- [Key] of
|
||||
TdKeys ->
|
||||
{Names, Acc};
|
||||
_ ->
|
||||
{Names ++ [Name], <<Acc/binary, ", ", Function/binary, "(", X/binary, ") ", X/binary>>}
|
||||
case dgiot_product:get_product_identifier(ProductId, Key) of
|
||||
#{<<"identifier">> := Identifier, <<"name">> := Name, <<"isstorage">> := Isstorage} when Isstorage > 0 ->
|
||||
case Acc of
|
||||
<<>> ->
|
||||
{Names ++ [Name], <<Acc/binary, Function/binary, "(", Identifier/binary, ") ", Identifier/binary>>};
|
||||
_ ->
|
||||
{Names ++ [Name], <<Acc/binary, ", ", Function/binary, "(", Identifier/binary, ") ", Identifier/binary>>}
|
||||
end;
|
||||
_ ->
|
||||
{Names, Acc}
|
||||
end
|
||||
end
|
||||
end
|
||||
end, {[], get_defult(Function)}, List).
|
||||
end, {[], get_defult(TableName, Function)}, List);
|
||||
_ ->
|
||||
{[], <<"*">>}
|
||||
end.
|
||||
|
||||
%% 秒转换为分钟
|
||||
check_field(_, V, #{<<"specs">> := #{<<"type">> := <<"minutes">>}}) ->
|
||||
@ -232,21 +245,6 @@ check_field(<<"date">>, V, _) ->
|
||||
check_field(_Typea, V, _) ->
|
||||
dgiot_utils:to_binary(V).
|
||||
|
||||
get_prop(ProductId) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
lists:foldl(fun(X, Acc) ->
|
||||
case X of
|
||||
#{<<"identifier">> := Identifier, <<"name">> := Name} ->
|
||||
Acc#{Identifier => Name};
|
||||
_ -> Acc
|
||||
end
|
||||
end, #{}, Props);
|
||||
_ ->
|
||||
#{}
|
||||
end.
|
||||
|
||||
|
||||
get_product_data(Channel, ProductId, DeviceId, Args) ->
|
||||
Query = maps:without([<<"productid">>, <<"deviceid">>], Args),
|
||||
?LOG(info, "Channel ~p Args ~p", [Channel, Args]),
|
||||
@ -268,17 +266,17 @@ get_product_data(Channel, ProductId, DeviceId, Args) ->
|
||||
end
|
||||
end.
|
||||
|
||||
get_defult(<<"first">>) ->
|
||||
<<"first(createdat) createdat">>;
|
||||
get_defult(<<"last">>) ->
|
||||
<<"last(createdat) createdat">>;
|
||||
get_defult(<<"count">>) ->
|
||||
get_defult(TableName, <<"first">>) ->
|
||||
<<"first(", TableName/binary, ".createdat) createdat">>;
|
||||
get_defult(TableName, <<"last">>) ->
|
||||
<<"last(", TableName/binary, ".createdat) createdat">>;
|
||||
get_defult(_TableName, <<"count">>) ->
|
||||
<<"">>;
|
||||
get_defult(<<"avg">>) ->
|
||||
get_defult(_TableName, <<"avg">>) ->
|
||||
<<"">>;
|
||||
get_defult(<<"sum">>) ->
|
||||
get_defult(_TableName, <<"sum">>) ->
|
||||
<<"">>;
|
||||
get_defult(<<"stddev">>) ->
|
||||
get_defult(_TableName, <<"stddev">>) ->
|
||||
<<"">>;
|
||||
get_defult(_) ->
|
||||
<<"(createdat) createdat">>.
|
||||
get_defult(TableName, _) ->
|
||||
<<"(", TableName/binary, ".createdat) createdat">>.
|
||||
|
@ -107,14 +107,14 @@ start(ChannelId, ChannelArgs) ->
|
||||
init(?TYPE, ChannelId, #{<<"network">> := NetWork, <<"port">> := Port, <<"url">> := Url, <<"Size">> := PoolSize} = _ChannelArgs) ->
|
||||
State = #state{id = ChannelId},
|
||||
Mode = case Url of
|
||||
<<"product">> ->
|
||||
<<"product">>;
|
||||
_ ->
|
||||
<<"product">> ->
|
||||
<<"product">>;
|
||||
_ ->
|
||||
%% dgiot_grpc_test:start(),
|
||||
R = dgiot_grpc_client:create_channel_pool(ChannelId, Url, PoolSize),
|
||||
io:format("~s ~p R ~p ~n",[?FILE, ?LINE, R]),
|
||||
<<"grpc">>
|
||||
end,
|
||||
dgiot_grpc_client:create_channel_pool(ChannelId, Url, PoolSize),
|
||||
%% io:format("~s ~p R ~p ~n",[?FILE, ?LINE, R]),
|
||||
<<"grpc">>
|
||||
end,
|
||||
{ok, State, get_child_spec(NetWork, Port, ChannelId, Mode)}.
|
||||
|
||||
handle_init(State) ->
|
||||
@ -130,15 +130,15 @@ handle_event(_EventId, _Event, State) ->
|
||||
{ok, State}.
|
||||
|
||||
%% gun监测 开始
|
||||
handle_message({gun_up, _Pid, Proctol, Status, Env }, #state{id = _ChannelId} = State) ->
|
||||
handle_message({gun_up, _Pid, Proctol, Status, Env}, #state{id = _ChannelId} = State) ->
|
||||
io:format("~s ~p gun_up = Proctol ~p. Status ~p , Env ~p ~n", [?FILE, ?LINE, Proctol, Status, Env]),
|
||||
{ok, State};
|
||||
|
||||
handle_message({gun_error, _Pid, Proctol, Status, Env }, #state{id = _ChannelId} = State) ->
|
||||
handle_message({gun_error, _Pid, Proctol, Status, Env}, #state{id = _ChannelId} = State) ->
|
||||
io:format("~s ~p gun_error = Proctol ~p. Status ~p , Env ~p ~n", [?FILE, ?LINE, Proctol, Status, Env]),
|
||||
{ok, State};
|
||||
|
||||
handle_message({gun_down, _Pid, Proctol, Status, Env }, #state{id = _ChannelId} = State) ->
|
||||
handle_message({gun_down, _Pid, Proctol, Status, Env}, #state{id = _ChannelId} = State) ->
|
||||
io:format("~s ~p gun_down = Proctol ~p. Status ~p , Env ~p ~n", [?FILE, ?LINE, Proctol, Status, Env]),
|
||||
{ok, State};
|
||||
%% gun监测结束
|
||||
|
@ -121,8 +121,15 @@ publish(Pid, Payload) ->
|
||||
receive_ack(ResBody) ->
|
||||
receive
|
||||
{sync_parse, NewResBody} when is_map(NewResBody) ->
|
||||
%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, length(maps:to_list(NewResBody))]),
|
||||
{ok, dgiot_json:encode(maps:remove(<<"id">>, NewResBody))};
|
||||
?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]),
|
||||
erlang:garbage_collect(self()),
|
||||
M = maps:remove(<<"id">>, NewResBody),
|
||||
erlang:garbage_collect(self()),
|
||||
?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]),
|
||||
ResBody1 = jsx:encode(M),
|
||||
erlang:garbage_collect(self()),
|
||||
?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]),
|
||||
{ok, ResBody1};
|
||||
{sync_parse, NewResBody} ->
|
||||
%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, NewResBody]),
|
||||
{ok, NewResBody};
|
||||
|
@ -589,6 +589,9 @@ merge_table(Name, Class, NewFields, OldFields) ->
|
||||
handle_response(Result) ->
|
||||
Fun =
|
||||
fun(Res, Body) ->
|
||||
?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]),
|
||||
erlang:garbage_collect(self()),
|
||||
?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]),
|
||||
case jsx:is_json(Body) of
|
||||
true ->
|
||||
case catch jsx:decode(Body, [{labels, binary}, return_maps]) of
|
||||
|
@ -454,7 +454,7 @@ log.to = both
|
||||
## this level will be logged.
|
||||
##
|
||||
## Default: warning
|
||||
log.level = warning
|
||||
log.level = error
|
||||
|
||||
## The dir for log files.
|
||||
##
|
||||
|
Loading…
Reference in New Issue
Block a user