diff --git a/.gitignore b/.gitignore index 052eb381..a34959a6 100644 --- a/.gitignore +++ b/.gitignore @@ -102,6 +102,7 @@ apps/dgiot_modbusx/ apps/dgiot_rtsp2ws/ apps/dgiot_sophon/ apps/dgiot_open62541/ +apps/dgiot_tdenginex/ node_modules/ package-lock.json erlang_ls.config diff --git a/apps/dgiot/src/otp/dgiot_hook.erl b/apps/dgiot/src/otp/dgiot_hook.erl index 40aff60c..09b670dc 100644 --- a/apps/dgiot/src/otp/dgiot_hook.erl +++ b/apps/dgiot/src/otp/dgiot_hook.erl @@ -29,37 +29,44 @@ add(Key, Fun) -> add(HookType, Key, Fun) -> Hooks = get_hooks(), New = case HookType of - one_for_more -> - Old = maps:get(Key, Hooks, []), - [Fun|Old]; - one_for_one -> - [Fun] - end, - dgiot_data:insert(dgiot_hook, Hooks#{ Key => New }). + one_for_more -> + Old = maps:get(Key, Hooks, []), + [Fun | Old]; + one_for_one -> + [Fun] + end, + dgiot_data:insert(dgiot_hook, Hooks#{Key => New}). remove(Key) -> Hooks = get_hooks(), - NewHooks = maps:remove(Key, Hooks), - dgiot_data:insert(dgiot_hook, NewHooks). + NewHooks = maps:remove(Key, Hooks), + dgiot_data:insert(dgiot_hook, NewHooks). run_hook(Key, Args) -> - Funs = get_hooks(Key), - case Funs of - not_find -> - {error, not_find}; - _ -> - {_NewFuns, Rtns} = lists:foldl( - fun(Fun, {Acc, R}) -> - case catch (do_hook(Fun, Args)) of - {'EXIT', Reason} -> - ?LOG(error,"do hook error, Args:~p -> ~p~n", [Args, Reason]), - {Acc, R}; - Result -> - {[Fun | Acc], [Result | R]} - end - end, {[], []}, Funs), - {ok, Rtns} - end. + Funs = get_hooks(Key), + case Funs of + not_find -> + {error, not_find}; + _ -> + Rtns = fold_hook(Funs, Args), + {ok, Rtns} + end. + +fold_hook(Funs, Args) -> + fold_hook(Funs, Args, []). +fold_hook([], _Args, Acc) -> + Acc; +fold_hook([Fun | Funs], Args, Acc) -> + NewAcc = + case catch (do_hook(Fun, Args)) of + {'EXIT', Reason} -> + ?LOG(error, "do hook error, Args:~p -> ~p~n", [Args, Reason]), + Acc; + Result -> + [Result | Acc] + end, + erlang:garbage_collect(self()), + fold_hook(Funs, Args, NewAcc). do_hook({M, F}, Args) when is_list(Args) -> apply(M, F, Args); @@ -71,14 +78,14 @@ do_hook(Fun, Args) -> Fun(Args). get_hooks(Key) -> - Hooks = get_hooks(), - maps:get(Key, Hooks, not_find). + Hooks = get_hooks(), + maps:get(Key, Hooks, not_find). get_hooks() -> - case dgiot_data:lookup(dgiot_hook) of - {ok, Hooks} -> Hooks; - {error, not_find} -> #{} - end. + case dgiot_data:lookup(dgiot_hook) of + {ok, Hooks} -> Hooks; + {error, not_find} -> #{} + end. diff --git a/apps/dgiot/src/storage/dgiot_cache_sup.erl b/apps/dgiot/src/storage/dgiot_cache_sup.erl index 5403a293..54e444a7 100644 --- a/apps/dgiot/src/storage/dgiot_cache_sup.erl +++ b/apps/dgiot/src/storage/dgiot_cache_sup.erl @@ -34,7 +34,7 @@ start_link(Opts) -> supervisor:start_link({local, ?SERVER}, ?MODULE, Opts). init(Opts) -> - MaxSize = proplists:get_value(ets_maxsize, Opts, 32 * 1024 * 1024), + MaxSize = proplists:get_value(ets_maxsize, Opts, 1024 * 1024 * 1024), Threshold = proplists:get_value(ets_threshold, Opts, 0.85), Weight = proplists:get_value(ets_weight, Opts, 30), ValOpts = [{ets_maxsize, MaxSize}, {ets_threshold, Threshold}, {checkpid, dgiot_cache_check_worker}], diff --git a/apps/dgiot/src/storage/dgiot_cache_worker.erl b/apps/dgiot/src/storage/dgiot_cache_worker.erl index 18ad1dee..b174b99c 100644 --- a/apps/dgiot/src/storage/dgiot_cache_worker.erl +++ b/apps/dgiot/src/storage/dgiot_cache_worker.erl @@ -77,7 +77,7 @@ stop(NameOrPid) -> %%%=================================================================== init(Opts) -> - MaxSize = proplists:get_value(ets_maxsize, Opts, 8 * 1024 * 1024), + MaxSize = proplists:get_value(ets_maxsize, Opts, 1024 * 1024 * 1024), Threshold = proplists:get_value(ets_threshold, Opts, 0.85), CheckPid = proplists:get_value(checkpid, Opts), ValueEts = ets:new(?MODULE, [public, named_table, {write_concurrency, true}, {read_concurrency, true}]), diff --git a/apps/dgiot/src/transport/dgiot_tcp_server.erl b/apps/dgiot/src/transport/dgiot_tcp_server.erl index 56c3891c..3eee26bf 100644 --- a/apps/dgiot/src/transport/dgiot_tcp_server.erl +++ b/apps/dgiot/src/transport/dgiot_tcp_server.erl @@ -182,7 +182,7 @@ handle_info({tcp_closed, Sock}, #state{mod = Mod, child = #tcp{clientid = Client DTUIP = dgiot_utils:get_ip(Sock), write_log(ChildState#tcp.log, <<"ERROR ", DTUIP/binary, " ", Clientid/binary>>, <<"tcp_closed">>), dgiot_metrics:dec(dgiot, <<"tcp_online">>, 1), - ?LOG(error, "tcp_closed ~p", [ChildState#tcp.state]), +%% ?LOG(error, "tcp_closed ~p", [ChildState#tcp.state]), case Mod:handle_info(tcp_closed, ChildState) of {noreply, NewChild} -> {stop, normal, State#state{child = NewChild#tcp{socket = undefined}}}; diff --git a/apps/dgiot/src/utils/dgiot_csv.erl b/apps/dgiot/src/utils/dgiot_csv.erl index 76bb76f3..bcee02bc 100644 --- a/apps/dgiot/src/utils/dgiot_csv.erl +++ b/apps/dgiot/src/utils/dgiot_csv.erl @@ -136,12 +136,12 @@ post_properties(<<"plc">>, AtomName) -> end, [], Things); post_properties(<<"dlink">>, AtomName) -> - Things = ets:match(AtomName, {'$1', ['$2', '$3', '$4', '$5', '$6', '$7', '$8', '$9', '$10', '$11' | '_']}), - lists:foldl(fun([Index, Devicetype, Name, Identifier, Key, Len, AccessMode, Min_Max, Unit, Type, Specs | _], Acc) -> + Things = ets:match(AtomName, {'$1', ['$2', '$3', '$4', '$5', '$6', '$7', '$8', '$9', '$10', '$11', '$12' | '_']}), + lists:foldl(fun([Index, Devicetype, Name, Identifier, Key, Len, Isstorage, AccessMode, Min_Max, Unit, Type, Specs | _], Acc) -> Acc++ [#{ <<"name">> => Name, <<"index">> => Index, - <<"isstorage">> => true, + <<"isstorage">> => dgiot_utils:to_int(Isstorage), <<"isshow">> => true, <<"dataForm">> => #{ <<"address">> => <<"0">>, diff --git a/apps/dgiot_bridge/src/dgiot_bridge_loader.erl b/apps/dgiot_bridge/src/dgiot_bridge_loader.erl index bcdbbc8b..b6d2163d 100644 --- a/apps/dgiot_bridge/src/dgiot_bridge_loader.erl +++ b/apps/dgiot_bridge/src/dgiot_bridge_loader.erl @@ -105,7 +105,7 @@ load_channel(Channels, Fun) -> <<"method">> => <<"GET">>, <<"path">> => <<"/classes/Product">>, <<"body">> => #{ - <<"keys">> => [<<"decoder">>, <<"ACL">>, <<"name">>, <<"channel">>, <<"dynamicReg">>, <<"devType">>, <<"nodeType">>, <<"productSecret">>, <<"config">>, <<"thing">>, <<"topics">>], + <<"keys">> => [<<"decoder">>, <<"ACL">>, <<"name">>, <<"channel">>, <<"dynamicReg">>, <<"devType">>, <<"nodeType">>, <<"productSecret">>, <<"config">>, <<"topics">>], <<"include">> => [<<"Dict">>], <<"where">> => #{ <<"$relatedTo">> => #{ diff --git a/apps/dgiot_device/include/dgiot_device.hrl b/apps/dgiot_device/include/dgiot_device.hrl index 931b85c6..4eb86cf9 100644 --- a/apps/dgiot_device/include/dgiot_device.hrl +++ b/apps/dgiot_device/include/dgiot_device.hrl @@ -16,6 +16,7 @@ -define(DGIOT_PRODUCT, dgiot_product). -define(DGIOT_PRODUCT_IDENTIFIE, dgiot_product_identifie). +-define(DGIOT_PRODUCT_STAB, dgiot_product_stab). -define(DEVICE_PROFILE, dgiot_device_profile). -define(DEVICE_DEVICE_COLOR, dgiot_device_color). -define(DGIOT_CHANNEL_SESSION, dgiot_channel_session). diff --git a/apps/dgiot_device/src/dgiot_product.erl b/apps/dgiot_device/src/dgiot_product.erl index 751db261..e7756d3b 100644 --- a/apps/dgiot_device/src/dgiot_product.erl +++ b/apps/dgiot_device/src/dgiot_product.erl @@ -25,16 +25,18 @@ -export([get_prop/1, get_props/1, get_props/2, get_unit/1, update_properties/2, update_properties/0]). -export([update_topics/0, update_product_filed/1]). -export([save_devicetype/1, get_devicetype/1, get_device_thing/2, get_productSecret/1]). --export([save_/1, get_keys/1, get_control/1, save_control/1, get_interval/1, save_device_thingtype/1, get_product_identifier/2, hook_topic/1]). +-export([save_/1, get_keys/1, get_sub_tab/1, get_control/1, save_control/1, get_interval/1, get_product_identifier/2, hook_topic/1]). init_ets() -> dgiot_data:init(?DGIOT_PRODUCT, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), dgiot_data:init(?DGIOT_PRODUCT_IDENTIFIE, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), + dgiot_data:init(?DGIOT_PRODUCT_STAB, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), dgiot_data:init(?DGIOT_CHANNEL_SESSION, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), dgiot_data:init(?DEVICE_DEVICE_COLOR, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]), dgiot_data:init(?DEVICE_PROFILE, [public, named_table, set, {write_concurrency, true}, {read_concurrency, true}]). load_all_cache({Skip}) -> + case dgiot_parsex:query_object(<<"Product">>, #{<<"limit">> => 1000, <<"skip">> => Skip}) of {ok, #{<<"results">> := Results}} when length(Results) == 0 -> load_end; @@ -47,6 +49,7 @@ load_all_cache({Skip}) -> {next, Skip} end. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% save_prod(ProductId, #{<<"thing">> := _thing} = Product) -> dgiot_data:insert(?DGIOT_PRODUCT, ProductId, Product), @@ -170,34 +173,58 @@ get_devicetype(ProductId) -> DeviceTypes end. +save_product_thing(ProductId, Identifier, undefined, Profile, DeviceType, Prop, Type, 0) -> + dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop), + save_product_thing({ProductId, device_thing, DeviceType}, #{Identifier => Type}, map), + save_product_thing({ProductId, devicetype}, [DeviceType], list), + save_product_thing({ProductId, profile_control}, #{Identifier => Profile}, map); -%% 设备类型 -save_device_thingtype(ProductId, DeviceType, NewMap) -> - case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}) of +save_product_thing(ProductId, Identifier, Key, undefined, DeviceType, Prop, Type, Isstorage) -> + dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop), + save_product_thing({ProductId, device_thing, DeviceType}, #{Identifier => Type}, map), + save_product_thing({ProductId, keys}, [Key], list), + save_product_thing({ProductId, devicetype}, [DeviceType], list), + + <> = dgiot_utils:to_md5(<>), + save_product_thing({ProductId, sub_tab}, [PId], list), + + dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {PId, Identifier, identifie}, Prop), + save_product_thing({PId, device_thing, DeviceType}, #{Identifier => Type}, map), + save_product_thing({PId, keys}, [Key], list), + save_product_thing({PId, devicetype}, [DeviceType], list), + dgiot_data:insert(?DGIOT_PRODUCT_STAB, {ProductId, Identifier, stab}, PId); + +save_product_thing(ProductId, Identifier, Key, Profile, DeviceType, Prop, Type, Isstorage) -> + + dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop), + save_product_thing({ProductId, device_thing, DeviceType}, #{Identifier => Type}, map), + save_product_thing({ProductId, keys}, [Key], list), + save_product_thing({ProductId, devicetype}, [DeviceType], list), + save_product_thing({ProductId, profile_control}, #{Identifier => Profile}, map), + + <> = dgiot_utils:to_md5(<>), + save_product_thing({ProductId, sub_tab}, [PId], list), + + dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop), + save_product_thing({PId, device_thing, DeviceType}, #{Identifier => Type}, map), + save_product_thing({PId, keys}, [Key], list), + save_product_thing({PId, devicetype}, [DeviceType], list), + save_product_thing({PId, profile_control}, #{Identifier => Profile}, map), + dgiot_data:insert(?DGIOT_PRODUCT_STAB, {ProductId, Identifier, stab}, PId). + +save_product_thing(Key, Value, list) -> + case dgiot_data:get(?DGIOT_PRODUCT, Key) of not_find -> - dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, NewMap); - Map -> - dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, dgiot_map:merge(Map, NewMap)) - end. - -save_device_thingtype(ProductId) -> - case dgiot_product:lookup_prod(ProductId) of - {ok, #{<<"thing">> := #{<<"properties">> := Props}}} -> - lists:map( - fun - (#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"dataType">> := #{<<"type">> := Type}}) -> - case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}) of - not_find -> - dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, #{Identifier => Type}); - Map -> - dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, device_thing, DeviceType}, Map#{Identifier => Type}) - end; - (_) -> - pass - end, Props); - - _Error -> - [] + dgiot_data:insert(?DGIOT_PRODUCT, Key, Value); + Values -> + dgiot_data:insert(?DGIOT_PRODUCT, Key, dgiot_utils:unique_2(Values ++ Value)) + end; +save_product_thing(Key, Value, map) -> + case dgiot_data:get(?DGIOT_PRODUCT, Key) of + not_find -> + dgiot_data:insert(?DGIOT_PRODUCT, Key, Value); + Values -> + dgiot_data:insert(?DGIOT_PRODUCT, Key, dgiot_map:merge(Values, Value)) end. %% 物模型标识符 @@ -302,34 +329,45 @@ update_product_filed(_Filed) -> end. save_(ProductId) -> - {Keys, Control, DeviceTypes} = - case dgiot_product:lookup_prod(ProductId) of - {ok, #{<<"thing">> := #{<<"properties">> := Props} = Thing}} -> - delete_product_identifier(ProductId), - Tags = maps:get(<<"tags">>, Thing, []), - lists:foldl( - fun - (#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"isstorage">> := true, <<"profile">> := Profile, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) -> - dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop), - save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}), - {Acc ++ [Identifier], Ccc#{Identifier => Profile}, Dcc ++ [DeviceType]}; - (#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"profile">> := Profile, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) -> - dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop), - save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}), - {Acc, Ccc#{Identifier => Profile}, Dcc ++ [DeviceType]}; - (#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"isstorage">> := true, <<"dataType">> := #{<<"type">> := Type}} = Prop, {Acc, Ccc, Dcc}) -> - dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifier, identifie}, Prop), - save_device_thingtype(ProductId, DeviceType, #{Identifier => Type}), - {Acc ++ [Identifier], Ccc, Dcc ++ [DeviceType]}; - (_, {Acc, Ccc, Dcc}) -> - {Acc, Ccc, Dcc} - end, {[], #{}, []}, Props ++ Tags); - _Error -> - {[], #{}, []} - end, - dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, keys}, Keys), - dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, profile_control}, Control), - dgiot_data:insert(?DGIOT_PRODUCT, {ProductId, devicetype}, dgiot_utils:unique_2(DeviceTypes)). + case dgiot_product:lookup_prod(ProductId) of + {ok, #{<<"thing">> := #{<<"properties">> := Props} = Thing}} -> + delete_product_identifier(ProductId), + Tags = maps:get(<<"tags">>, Thing, []), + fold_prop(ProductId, Props ++ Tags); + _Error -> + {[], #{}, []} + end. + +fold_prop(ProductId, Props) when length(Props) =< 40 -> + fold_prop_(ProductId, Props); + +fold_prop(ProductId, [A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, + A21, A22, A23, A24, A25, A26, A27, A28, A29, A30, A31, A32, A33, A34, A35, A36, A37, A38, A39, A40 | Tail]) -> + Head = [A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, + A21, A22, A23, A24, A25, A26, A27, A28, A29, A30, A31, A32, A33, A34, A35, A36, A37, A38, A39, A40], + ?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]), + fold_prop(ProductId, Tail), + ?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]), + fold_prop_(ProductId, Head), + ?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]), + erlang:garbage_collect(self()). + +fold_prop_(_ProductId, []) -> + ok; + +fold_prop_(ProductId, [#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"isstorage">> := Isstorage, + <<"dataType">> := #{<<"type">> := Type}} = Prop | Props]) when Isstorage > 0 -> + save_product_thing(ProductId, Identifier, Identifier, maps:get(<<"profile">>, Prop, undefined), DeviceType, Prop, Type, dgiot_utils:to_binary(Isstorage)), + fold_prop_(ProductId, Props); + +fold_prop_(ProductId, [#{<<"devicetype">> := DeviceType, <<"identifier">> := Identifier, <<"profile">> := Profile, + <<"dataType">> := #{<<"type">> := Type}} = Prop | Props]) -> + save_product_thing(ProductId, Identifier, undefined, Profile, DeviceType, Prop, Type, 0), + fold_prop_(ProductId, Props); + +fold_prop_(ProductId, [_Prop | Props]) -> + fold_prop_(ProductId, Props). + get_keys(ProductId) -> case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, keys}) of @@ -339,6 +377,14 @@ get_keys(ProductId) -> Keys end. +get_sub_tab(ProductId) -> + case dgiot_data:get(?DGIOT_PRODUCT, {ProductId, sub_tab}) of + not_find -> + []; + Keys -> + Keys + end. + get_interval(ProductId) -> case lookup_prod(ProductId) of {ok, #{<<"config">> := #{<<"interval">> := Interval}}} -> diff --git a/apps/dgiot_device/src/handler/dgiot_tdengine_handler.erl b/apps/dgiot_device/src/handler/dgiot_tdengine_handler.erl index 11588971..f1979ba9 100644 --- a/apps/dgiot_device/src/handler/dgiot_tdengine_handler.erl +++ b/apps/dgiot_device/src/handler/dgiot_tdengine_handler.erl @@ -134,14 +134,14 @@ do_request(get_echart_deviceid, #{<<"deviceid">> := DeviceId, <<"style">> := Sty {error, Error} -> {error, Error}; {ok, Channel} -> case dgiot_parsex:get_object(<<"Device">>, DeviceId) of - {ok, #{<<"objectId">> := DeviceId, <<"product">> := #{<<"objectId">> := ProductId}}} -> + {ok, #{<<"objectId">> := DeviceId, <<"devaddr">> := Devaddr, <<"product">> := #{<<"objectId">> := ProductId}}} -> case Style of <<"amis_table">> -> dgiot_device_echart:get_data_by_month(Channel, ProductId, DeviceId, Args); <<"echart_category">> -> dgiot_device_echart:get_data_by_echart_category(Channel, ProductId, DeviceId, Args); _ -> - dgiot_device_echart:get_echart_data(Channel, ProductId, DeviceId, Args) + dgiot_device_echart:get_echart_data(Channel, ProductId, DeviceId, Devaddr, Args) end; _ -> @@ -158,9 +158,9 @@ do_request(get_devicecard_deviceid, #{<<"deviceid">> := DeviceId} = Args, #{<<"s {error, Error}; {ok, Channel} -> case dgiot_parsex:get_object(<<"Device">>, DeviceId) of - {ok, #{<<"objectId">> := DeviceId, <<"product">> := #{<<"objectId">> := ProductId}}} -> + {ok, #{<<"objectId">> := DeviceId, <<"devaddr">> := Devaddr, <<"product">> := #{<<"objectId">> := ProductId}}} -> dgiot_mqtt:subscribe_route_key([<<"$dg/user/realtimecard/", DeviceId/binary, "/#">>], <<"realtimecard">>, SessionToken), - dgiot_device_card:get_device_card(Channel, ProductId, DeviceId, Args); + dgiot_device_card:get_devcard(Channel, ProductId, DeviceId, Devaddr, Args); _ -> {error, <<"not find device">>} end diff --git a/apps/dgiot_device/src/utils/dgiot_device_card.erl b/apps/dgiot_device/src/utils/dgiot_device_card.erl index 2ac42e67..b7520b11 100644 --- a/apps/dgiot_device/src/utils/dgiot_device_card.erl +++ b/apps/dgiot_device/src/utils/dgiot_device_card.erl @@ -20,7 +20,22 @@ -include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot_tdengine/include/dgiot_tdengine.hrl"). --export([get_card/5, get_device_card/4]). +-export([get_devcard/5, get_card/5, get_device_card/4]). + +get_devcard(Channel, ProductId, DeviceId, Devaddr, Args) -> + Chartdata = + case dgiot_product:get_sub_tab(ProductId) of + not_find -> + get_device_card(Channel, ProductId, DeviceId, Args); + Subs when length(Subs) > 1 -> + lists:foldl(fun(SubId, Acc) -> + SubDevid = dgiot_parse_id:get_deviceid(SubId, Devaddr), + Acc ++ get_device_card(Channel, SubId, SubDevid, Args) + end, [], Subs); + _ -> + get_device_card(Channel, ProductId, DeviceId, Args) + end, + {ok, #{<<"data">> => Chartdata}}. get_device_card(Channel, ProductId, DeviceId, Args) -> TableName = ?Table(DeviceId), @@ -31,8 +46,8 @@ get_device_card(Channel, ProductId, DeviceId, Args) -> _ -> [#{}] end, - Chartdata = get_card(ProductId, Results, DeviceId, Args, dgiot_data:get({shard_storage, ProductId})), - {ok, #{<<"data">> => Chartdata}}. + get_card(ProductId, Results, DeviceId, Args, dgiot_data:get({shard_storage, ProductId})). + decode_shard_data(Data, Result) -> case binary:split(Data, <<$,>>, [global, trim]) of diff --git a/apps/dgiot_device/src/utils/dgiot_device_echart.erl b/apps/dgiot_device/src/utils/dgiot_device_echart.erl index 0946843e..026aee20 100644 --- a/apps/dgiot_device/src/utils/dgiot_device_echart.erl +++ b/apps/dgiot_device/src/utils/dgiot_device_echart.erl @@ -20,69 +20,105 @@ -include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot_tdengine/include/dgiot_tdengine.hrl"). --export([get_echart_data/4]). +-export([get_echart_data/5, get_echart_data/4]). -export([get_data_by_month/4, get_data_by_echart_category/4, get_keys/2, get_table/2]). -get_echart_data(Channel, ProductId, DeviceId, Args) -> - Query = maps:without([<<"productid">>, <<"deviceid">>], Args), - Interval = maps:get(<<"interval">>, Args), - TableName = ?Table(DeviceId), +get_echart_data(Channel, ProductId, DeviceId, Devaddr, Args) -> {Names, Results} = - case dgiot_device_tdengine:get_history_data(Channel, ProductId, TableName, Query) of - {TdNames, {ok, #{<<"results">> := TdResults}}} -> - {TdNames, TdResults}; + case dgiot_product:get_sub_tab(ProductId) of + not_find -> + get_echart_data(Channel, ProductId, DeviceId, Args); + Subs when length(Subs) > 1 -> + TableNames = + lists:foldl(fun(SubId, Acc) -> + SubDevid = dgiot_parse_id:get_deviceid(SubId, Devaddr), + Acc ++ [SubDevid] + end, [], Subs), + get_echart_data(Channel, ProductId, TableNames, Args); _ -> - {[], []} + get_echart_data(Channel, ProductId, DeviceId, Args) end, + Interval = maps:get(<<"interval">>, Args), Chartdata = get_echart(ProductId, Results, Names, Interval), {ok, #{<<"chartData">> => Chartdata}}. -get_echart(ProductId, Results, Names, Interval) -> - Maps = dgiot_product:get_prop(ProductId), - Units = dgiot_product:get_unit(ProductId), - NewMaps = maps:merge(#{<<"createdat">> => <<"日期"/utf8>>}, Maps), - Columns = [<<"日期"/utf8>>] ++ Names, - Rows = - lists:foldl(fun(Line, Lines) -> - NewLine = - maps:fold(fun(K, V, Acc) -> - case maps:find(K, NewMaps) of - error -> - Acc; - {ok, Name} -> - case Name of - <<"日期"/utf8>> -> - NewV = dgiot_tdengine_field:get_time(V, Interval), - Acc#{Name => NewV}; - _ -> - Acc#{Name => V} - end - end - end, #{}, Line), - Lines ++ [NewLine] - end, [], Results), -%% io:format("~s ~p Rows = ~p.~n", [?FILE, ?LINE, Rows]), - ChildRows = lists:foldl(fun(X, Acc1) -> - Date = maps:get(<<"日期"/utf8>>, X, dgiot_datetime:format(dgiot_datetime:to_localtime(dgiot_datetime:now_secs()), <<"YY-MM-DD HH:NN:SS">>)), - maps:fold(fun(K1, V1, Acc) -> - case maps:find(K1, Acc) of +get_echart_data(Channel, ProductId, DeviceId, Args) -> + Query = maps:without([<<"productid">>, <<"deviceid">>], Args), + case dgiot_device_tdengine:get_history_data(Channel, ProductId, DeviceId, Query) of + {TdNames, {ok, #{<<"results">> := TdResults}}} -> + {TdNames, TdResults}; + _ -> + {[], []} + end. + + +format_line(ProductId, Interval, Line, Acc) when length(Line) > 1 -> + Acc ++ [format_line_(ProductId, Interval, Line, #{})]; +format_line(_ProductId, _Interval, _Line, Acc) -> + Acc. + +format_line_(_ProductId, _Interval, [], Acc) -> + Acc; + +format_line_(ProductId, Interval, [{<<"createdat">>, V} | Tail], Acc) -> + NewV = dgiot_tdengine_field:get_time(V, Interval), + NewAcc = Acc#{<<"日期"/utf8>> => NewV}, + format_line_(ProductId, Interval, Tail, NewAcc); + +format_line_(ProductId, Interval, [{K, V} | Tail], Acc) -> + NewAcc = + case dgiot_product:get_product_identifier(ProductId, K) of + #{<<"name">> := Name} -> + Acc#{Name => V}; + _ -> + Acc + end, + format_line_(ProductId, Interval, Tail, NewAcc). + +format_results(_ProductId, _Interval, [], Acc) -> + Acc; + +format_results(ProductId, Interval, [Line | Results], Acc) -> + NewAcc = format_line(ProductId, Interval, maps:to_list(Line), Acc), + format_results(ProductId, Interval, Results, NewAcc). + +format_rows([], Acc) -> + Acc; + +format_rows([Row | Rows], Acc) -> + Date = maps:get(<<"日期"/utf8>>, Row, dgiot_datetime:format(dgiot_datetime:to_localtime(dgiot_datetime:now_secs()), <<"YY-MM-DD HH:NN:SS">>)), + NewAcc = + maps:fold(fun(K1, V1, Acc1) -> + case maps:find(K1, Acc1) of error -> - Acc#{K1 => [#{<<"日期"/utf8>> => Date, K1 => V1}]}; + Acc1#{K1 => [#{<<"日期"/utf8>> => Date, K1 => V1}]}; {ok, V2} -> - Acc#{K1 => V2 ++ [#{<<"日期"/utf8>> => Date, K1 => V1}]} + Acc1#{K1 => V2 ++ [#{<<"日期"/utf8>> => Date, K1 => V1}]} end - end, Acc1, maps:without([<<"日期"/utf8>>], X)) - end, #{}, Rows), + end, Acc, maps:without([<<"日期"/utf8>>], Row)), + format_rows(Rows, NewAcc). + +format_childrows(_ProductId, [], Acc) -> + Acc; + +format_childrows(ProductId, [{K, V} | ChildRows], Acc) -> + Unit = + case dgiot_product:get_product_identifier(ProductId, K) of + #{<<"dataType">> := #{<<"specs">> := #{<<"unit">> := Unit1}}} -> + Unit1; + _ -> + <<"">> + end, + NewAcc = Acc ++ [#{<<"columns">> => [<<"日期"/utf8>>, K], <<"rows">> => V, <<"unit">> => Unit}], + format_childrows(ProductId, ChildRows, NewAcc). + +get_echart(ProductId, Results, Names, Interval) -> + Columns = [<<"日期"/utf8>>] ++ Names, + Rows = format_results(ProductId, Interval, Results, []), +%% io:format("~s ~p Rows = ~ts.~n", [?FILE, ?LINE, unicode:characters_to_list(dgiot_json:encode(Rows))]), + ChildRows = format_rows(Rows, #{}), %% io:format("~s ~p ChildRows = ~p.~n", [?FILE, ?LINE, ChildRows]), - Child = - maps:fold(fun(K, V, Acc) -> - Unit = - case maps:find(K, Units) of - error -> <<"">>; - {ok, Unit1} -> Unit1 - end, - Acc ++ [#{<<"columns">> => [<<"日期"/utf8>>, K], <<"rows">> => V, <<"unit">> => Unit}] - end, [], ChildRows), + Child = format_childrows(ProductId, maps:to_list(ChildRows), []), %% io:format("~s ~p Child = ~p.~n", [?FILE, ?LINE, Child]), #{<<"columns">> => Columns, <<"rows">> => Rows, <<"child">> => Child}. @@ -112,10 +148,9 @@ get_data_by_month(Channel, ProductId, DeviceId, Args) -> {ok, Sql} = maps:find(<<"sql">>, Res), {ok, Name_and_nuit} = maps:find(<<"name_and_unit">>, Res), %% 配置参数 - TableName = ?Table(DeviceId), Interval = <<"1d">>, %%传入参数获得结果 - case dgiot_device_tdengine:get_history_data2(Sql, Channel, TableName, Interval, ProductId, StartTime, EndTime) of + case dgiot_device_tdengine:get_history_data2(Sql, Channel, DeviceId, Interval, ProductId, StartTime, EndTime) of %% 判断结果并转换格式 {ok, #{<<"results">> := Results}} -> %% io:format("~s ~p Results = ~p,Name_and_nuit = ~p ~n",[?FILE,?LINE,Results,Name_and_nuit]), @@ -271,10 +306,9 @@ get_data_by_echart_category(Channel, ProductId, DeviceId, Args) -> {ok, Sql} = maps:find(<<"sql">>, Res), {ok, Names} = maps:find(<<"names">>, Res), %% 配置参数 - TableName = ?Table(DeviceId), Interval = <<"1d">>, %%传入参数获得结果 - case dgiot_device_tdengine:get_history_data2(Sql, Channel, TableName, Interval, ProductId, StartTime, EndTime) of + case dgiot_device_tdengine:get_history_data2(Sql, Channel, DeviceId, Interval, ProductId, StartTime, EndTime) of %% 判断结果并转换格式 {ok, #{<<"results">> := Results}} -> %% io:format("~s ~p Results = ~p ~n",[?FILE,?LINE,Results]), diff --git a/apps/dgiot_device/src/utils/dgiot_device_static.erl b/apps/dgiot_device/src/utils/dgiot_device_static.erl index 4f3d13be..10d4a8f8 100644 --- a/apps/dgiot_device/src/utils/dgiot_device_static.erl +++ b/apps/dgiot_device/src/utils/dgiot_device_static.erl @@ -141,7 +141,7 @@ get_realdata({Token, Realdatas}) when is_map(Realdatas) -> {error, Error} -> {error, Error}; {ok, Channel} -> - {_, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, <<"last">>, Keys), + {_, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, <<"_", ProductId/binary>>, <<"last">>, Keys), DB = dgiot_tdengine:get_database(Channel, ProductId), Sql1 = <<"select last(devaddr) as devaddr,", Newkeys/binary, " FROM ", DB/binary, "_", ProductId/binary, " group by devaddr;">>, %% io:format("Channel = ~p.~n Sql = ~p.~n", [Channel, Sql1]), diff --git a/apps/dgiot_device/src/utils/dgiot_device_tdengine.erl b/apps/dgiot_device/src/utils/dgiot_device_tdengine.erl index 7ca01f90..b0b9c0eb 100644 --- a/apps/dgiot_device/src/utils/dgiot_device_tdengine.erl +++ b/apps/dgiot_device/src/utils/dgiot_device_tdengine.erl @@ -76,27 +76,48 @@ get_device(Channel, ProductId, DeviceId, _DevAddr, Query) -> {error, Reason} end. +get_tablename(DB, [FirstDevId | _] = DeviceIds) -> + format_join(DB, ?Table(FirstDevId), DeviceIds, <<>>, <<>>); + +get_tablename(DB, DeviceId) -> + TableName = ?Table(DeviceId), + {<>, <<"1=1">>, TableName}. + +format_join(_DB, FirstTableName, [], Tcc, Wcc) -> + {Tcc, Wcc, FirstTableName}; +format_join(DB, FirstTableName, [DeviceId | DeviceIds], Tcc, Wcc) -> + TableName = ?Table(DeviceId), + {NewTcc, NewWcc} = + case Tcc of + <<>> -> + {<>, <>}; + _ -> + {<>, <>} + end, + format_join(DB, TableName, DeviceIds, NewTcc, NewWcc). + %% SELECT max(day_electricity) '时间' ,max(charge_current) '日期' FROM _2d26a94cf8._c5e1093e30 WHERE createdat >= now - 1h INTERVAL(1h) limit 10; %% SELECT spread(cumulativescale) FROM _797197ad06 WHERE createdat >= now - 1Y INTERVAL(1h); %% SELECT last(cumulativescale) FROM _797197ad06 WHERE createdat >= now - 1Y INTERVAL(1h); -get_history_data(Channel, ProductId, TableName, Query) -> +get_history_data(Channel, ProductId, DeviceId, Query) -> dgiot_tdengine:transaction(Channel, fun(Context) -> DB = dgiot_tdengine:get_database(Channel, ProductId), + {TableName, DefWhere, FirstTableName} = get_tablename(DB, DeviceId), Function = maps:get(<<"function">>, Query, <<>>), Keys = maps:get(<<"keys">>, Query, <<"*">>), Limit = dgiot_tdengine_select:format_limit(Query), Starttime = dgiot_utils:to_binary(maps:get(<<"starttime">>, Query, dgiot_datetime:now_ms() - 25920000000)), Endtime = dgiot_utils:to_binary(maps:get(<<"endtime">>, Query, dgiot_datetime:now_ms())), - {Names, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, Function, Keys), + {Names, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, FirstTableName, Function, Keys), Tail = case maps:get(<<"interval">>, Query, <<>>) of <<>> -> - <<" where createdat >= ", Starttime/binary, " AND createdat <= ", Endtime/binary, " ", Limit/binary, ";">>; + <<" where ", DefWhere/binary, " AND ", FirstTableName/binary, ".createdat >= ", Starttime/binary, " AND ", FirstTableName/binary, ".createdat <= ", Endtime/binary, " ", Limit/binary, ";">>; Interval -> - <<" where createdat >= ", Starttime/binary, " AND createdat <= ", Endtime/binary, " INTERVAL(", Interval/binary, ") ", Limit/binary, ";">> + <<" where ", DefWhere/binary, " AND ", FirstTableName/binary, ".createdat >= ", Starttime/binary, " AND ", FirstTableName/binary, ".createdat <= ", Endtime/binary, " INTERVAL(", Interval/binary, ") ", Limit/binary, ";">> end, - Sql = <<"SELECT ", Newkeys/binary, " FROM ", DB/binary, TableName/binary, Tail/binary>>, + Sql = <<"SELECT ", Newkeys/binary, " FROM ", TableName/binary, Tail/binary>>, %% io:format("~s ~p Sql = ~p.~n", [?FILE, ?LINE, Sql]), {Names, dgiot_tdengine_pool:run_sql(Context#{<<"channel">> => Channel}, execute_query, Sql)} end). @@ -112,7 +133,7 @@ get_realtime_data(Channel, ProductId, TableName, Query) -> _ -> maps:get(<<"keys">>, Query, <<"*">>) end, - {_Names, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, <<"">>, Keys), + {_Names, Newkeys} = dgiot_product_tdengine:get_keys(ProductId, TableName, <<"">>, Keys), DB = dgiot_tdengine:get_database(Channel, ProductId), case size(Newkeys) > 0 of true -> @@ -123,11 +144,12 @@ get_realtime_data(Channel, ProductId, TableName, Query) -> end end). -get_history_data2(Order, Channel, TableName, Interval, ProductId, StartTime, _EndTime) -> +get_history_data2(Order, Channel, DeviceId, Interval, ProductId, StartTime, _EndTime) -> %% io:format("~s ~p Order= ~p, Channel= ~p, TableName= ~p, TableName= ~p,~n Interval= ~p, ProductId= ~p, ~n StartTime= ~p, EndTime =~p. ~n",[?FILE,?LINE,Order, Channel, TableName, TableName, Interval, ProductId, StartTime, _EndTime]), dgiot_tdengine:transaction(Channel, fun(Context) -> DB = dgiot_tdengine:get_database(Channel, ProductId), + TableName = ?Table(DeviceId), BinStartTime = dgiot_utils:to_binary(StartTime), Tail = <<" where createdat >= ", BinStartTime/binary, " INTERVAL(", Interval/binary, ") ", ";">>, Sql = <<"SELECT ", Order/binary, " FROM ", DB/binary, TableName/binary, Tail/binary>>, @@ -137,9 +159,8 @@ get_history_data2(Order, Channel, TableName, Interval, ProductId, StartTime, _En get_gps_track(Channel, ProductId, DeviceId, Args) -> Query = Args#{<<"keys">> => <<"latitude,longitude">>}, - TableName = ?Table(DeviceId), {_Names, Results} = - case dgiot_device_tdengine:get_history_data(Channel, ProductId, TableName, Query) of + case dgiot_device_tdengine:get_history_data(Channel, ProductId, DeviceId, Query) of {TdNames, {ok, #{<<"results">> := TdResults}}} -> %% io:format("~s ~p TdResults = ~p.~n", [?FILE, ?LINE, TdResults]), NewTdResults = diff --git a/apps/dgiot_device/src/utils/dgiot_product_tdengine.erl b/apps/dgiot_device/src/utils/dgiot_product_tdengine.erl index cac1e503..7477c559 100644 --- a/apps/dgiot_device/src/utils/dgiot_product_tdengine.erl +++ b/apps/dgiot_device/src/utils/dgiot_product_tdengine.erl @@ -20,7 +20,7 @@ -include_lib("dgiot_tdengine/include/dgiot_tdengine.hrl"). -include_lib("dgiot/include/logger.hrl"). --export([get_product/2, get_products/2, get_keys/3, check_field/3, test_product/0]). +-export([get_product/2, get_products/2, get_keys/4, check_field/3, test_product/0]). -export([get_channel/1, do_channel/3, get_product_data/4]). test_product() -> @@ -123,50 +123,63 @@ get_channel(Session) -> {ok, ChannelId} end. -get_keys(ProductId, Function, <<"*">>) -> - case dgiot_product:lookup_prod(ProductId) of - {ok, #{<<"thing">> := #{<<"properties">> := Props}}} when length(Props) > 0 -> - lists:foldl(fun(X, {Names, Acc}) -> - case X of - #{<<"identifier">> := Identifier, <<"name">> := Name, <<"isstorage">> := true} -> - case Acc of - <<>> -> - {Names ++ [Name], <>}; - _ -> - {Names ++ [Name], <>} - end; +spell_sql(_ProductId, _Function, [], {Names, Acc}) -> + {Names, Acc}; +spell_sql(ProductId, Function, [Key | Keys], {Names, Acc}) -> + case dgiot_product:get_product_identifier(ProductId, Key) of + #{<<"identifier">> := Identifier, <<"name">> := Name, <<"isstorage">> := Isstorage} when Isstorage > 0 -> + {NewNames, NewAcc} = + case Acc of + <<>> -> + {Names ++ [Name], <>}; _ -> - {Names, Acc} - end - end, {[], get_defult(Function)}, Props); - _Other -> - ?LOG(debug, "~p _Other ~p", [ProductId, _Other]), + {Names ++ [Name], <>} + end, + spell_sql(ProductId, Function, Keys, {NewNames, NewAcc}); + _ -> + {Names, Acc} + end. + +get_keys(ProductId, TableName, Function, <<"*">>) -> + case dgiot_product:get_keys(ProductId) of + Keys when length(Keys) > 0 -> + spell_sql(ProductId, Function, Keys, {[], get_defult(TableName, Function)}); + _ -> {[], <<"*">>} end; -get_keys(ProductId, Function, Keys) when Keys == undefined; Keys == <<>> -> - get_keys(ProductId, Function, <<"*">>); +get_keys(ProductId, TableName, Function, Keys) when Keys == undefined; Keys == <<>> -> + get_keys(ProductId, TableName, Function, <<"*">>); -get_keys(ProductId, Function, Keys) -> +get_keys(ProductId, TableName, Function, Keys) -> List = case is_list(Keys) of true -> Keys; false -> re:split(Keys, <<",">>) end, - Maps = get_prop(ProductId), - lists:foldl(fun(X, {Names, Acc}) -> - case maps:find(X, Maps) of - error -> - {Names, Acc}; - {ok, Name} -> - case Acc of - <<>> -> - {Names ++ [Name], <>}; + case dgiot_product:get_keys(ProductId) of + TdKeys when length(TdKeys) > 0 -> + lists:foldl(fun(Key, {Names, Acc}) -> + case TdKeys -- [Key] of + TdKeys -> + {Names, Acc}; _ -> - {Names ++ [Name], <>} + case dgiot_product:get_product_identifier(ProductId, Key) of + #{<<"identifier">> := Identifier, <<"name">> := Name, <<"isstorage">> := Isstorage} when Isstorage > 0 -> + case Acc of + <<>> -> + {Names ++ [Name], <>}; + _ -> + {Names ++ [Name], <>} + end; + _ -> + {Names, Acc} + end end - end - end, {[], get_defult(Function)}, List). + end, {[], get_defult(TableName, Function)}, List); + _ -> + {[], <<"*">>} + end. %% 秒转换为分钟 check_field(_, V, #{<<"specs">> := #{<<"type">> := <<"minutes">>}}) -> @@ -232,21 +245,6 @@ check_field(<<"date">>, V, _) -> check_field(_Typea, V, _) -> dgiot_utils:to_binary(V). -get_prop(ProductId) -> - case dgiot_product:lookup_prod(ProductId) of - {ok, #{<<"thing">> := #{<<"properties">> := Props}}} -> - lists:foldl(fun(X, Acc) -> - case X of - #{<<"identifier">> := Identifier, <<"name">> := Name} -> - Acc#{Identifier => Name}; - _ -> Acc - end - end, #{}, Props); - _ -> - #{} - end. - - get_product_data(Channel, ProductId, DeviceId, Args) -> Query = maps:without([<<"productid">>, <<"deviceid">>], Args), ?LOG(info, "Channel ~p Args ~p", [Channel, Args]), @@ -268,17 +266,17 @@ get_product_data(Channel, ProductId, DeviceId, Args) -> end end. -get_defult(<<"first">>) -> - <<"first(createdat) createdat">>; -get_defult(<<"last">>) -> - <<"last(createdat) createdat">>; -get_defult(<<"count">>) -> +get_defult(TableName, <<"first">>) -> + <<"first(", TableName/binary, ".createdat) createdat">>; +get_defult(TableName, <<"last">>) -> + <<"last(", TableName/binary, ".createdat) createdat">>; +get_defult(_TableName, <<"count">>) -> <<"">>; -get_defult(<<"avg">>) -> +get_defult(_TableName, <<"avg">>) -> <<"">>; -get_defult(<<"sum">>) -> +get_defult(_TableName, <<"sum">>) -> <<"">>; -get_defult(<<"stddev">>) -> +get_defult(_TableName, <<"stddev">>) -> <<"">>; -get_defult(_) -> - <<"(createdat) createdat">>. +get_defult(TableName, _) -> + <<"(", TableName/binary, ".createdat) createdat">>. diff --git a/apps/dgiot_dlink/src/dgiot_dlink_channel.erl b/apps/dgiot_dlink/src/dgiot_dlink_channel.erl index dfa45e57..17396a23 100644 --- a/apps/dgiot_dlink/src/dgiot_dlink_channel.erl +++ b/apps/dgiot_dlink/src/dgiot_dlink_channel.erl @@ -107,14 +107,14 @@ start(ChannelId, ChannelArgs) -> init(?TYPE, ChannelId, #{<<"network">> := NetWork, <<"port">> := Port, <<"url">> := Url, <<"Size">> := PoolSize} = _ChannelArgs) -> State = #state{id = ChannelId}, Mode = case Url of - <<"product">> -> - <<"product">>; - _ -> + <<"product">> -> + <<"product">>; + _ -> %% dgiot_grpc_test:start(), - R = dgiot_grpc_client:create_channel_pool(ChannelId, Url, PoolSize), - io:format("~s ~p R ~p ~n",[?FILE, ?LINE, R]), - <<"grpc">> - end, + dgiot_grpc_client:create_channel_pool(ChannelId, Url, PoolSize), +%% io:format("~s ~p R ~p ~n",[?FILE, ?LINE, R]), + <<"grpc">> + end, {ok, State, get_child_spec(NetWork, Port, ChannelId, Mode)}. handle_init(State) -> @@ -130,15 +130,15 @@ handle_event(_EventId, _Event, State) -> {ok, State}. %% gun监测 开始 -handle_message({gun_up, _Pid, Proctol, Status, Env }, #state{id = _ChannelId} = State) -> +handle_message({gun_up, _Pid, Proctol, Status, Env}, #state{id = _ChannelId} = State) -> io:format("~s ~p gun_up = Proctol ~p. Status ~p , Env ~p ~n", [?FILE, ?LINE, Proctol, Status, Env]), {ok, State}; -handle_message({gun_error, _Pid, Proctol, Status, Env }, #state{id = _ChannelId} = State) -> +handle_message({gun_error, _Pid, Proctol, Status, Env}, #state{id = _ChannelId} = State) -> io:format("~s ~p gun_error = Proctol ~p. Status ~p , Env ~p ~n", [?FILE, ?LINE, Proctol, Status, Env]), {ok, State}; -handle_message({gun_down, _Pid, Proctol, Status, Env }, #state{id = _ChannelId} = State) -> +handle_message({gun_down, _Pid, Proctol, Status, Env}, #state{id = _ChannelId} = State) -> io:format("~s ~p gun_down = Proctol ~p. Status ~p , Env ~p ~n", [?FILE, ?LINE, Proctol, Status, Env]), {ok, State}; %% gun监测结束 diff --git a/apps/dgiot_parse/src/dgiot_parse_hook.erl b/apps/dgiot_parse/src/dgiot_parse_hook.erl index b556679f..0994ffc2 100644 --- a/apps/dgiot_parse/src/dgiot_parse_hook.erl +++ b/apps/dgiot_parse/src/dgiot_parse_hook.erl @@ -121,8 +121,15 @@ publish(Pid, Payload) -> receive_ack(ResBody) -> receive {sync_parse, NewResBody} when is_map(NewResBody) -> -%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, length(maps:to_list(NewResBody))]), - {ok, dgiot_json:encode(maps:remove(<<"id">>, NewResBody))}; + ?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]), + erlang:garbage_collect(self()), + M = maps:remove(<<"id">>, NewResBody), + erlang:garbage_collect(self()), + ?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]), + ResBody1 = jsx:encode(M), + erlang:garbage_collect(self()), + ?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]), + {ok, ResBody1}; {sync_parse, NewResBody} -> %% io:format("~s ~p ~p ~n", [?FILE, ?LINE, NewResBody]), {ok, NewResBody}; diff --git a/apps/dgiot_parse/src/dgiot_parsex.erl b/apps/dgiot_parse/src/dgiot_parsex.erl index 3835caf8..872d7847 100644 --- a/apps/dgiot_parse/src/dgiot_parsex.erl +++ b/apps/dgiot_parse/src/dgiot_parsex.erl @@ -589,6 +589,9 @@ merge_table(Name, Class, NewFields, OldFields) -> handle_response(Result) -> Fun = fun(Res, Body) -> + ?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]), + erlang:garbage_collect(self()), + ?LOG(warning, "~p", [erlang:process_info(self(), total_heap_size)]), case jsx:is_json(Body) of true -> case catch jsx:decode(Body, [{labels, binary}, return_maps]) of diff --git a/etc/emqx.conf b/etc/emqx.conf index 35c7210b..d3875f1f 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -454,7 +454,7 @@ log.to = both ## this level will be logged. ## ## Default: warning -log.level = warning +log.level = error ## The dir for log files. ##