diff --git a/CHANGELOG.md b/CHANGELOG.md index 31a52396..cf7a7afa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * add task metircs ([79cfd2d](https://github.com/dgiot/dgiot/commit/79cfd2dc45e8a396dff7351eaf40ab5e07be83c5)) * add tcp metircs ([f8e19b1](https://github.com/dgiot/dgiot/commit/f8e19b1b241200b5d1324d689f4e2d9536e74a4d)) * add tcp transparent ([b1f091c](https://github.com/dgiot/dgiot/commit/b1f091cba9fa143fcc8040a6fa8848ac89412e96)) +* add tdengie metrics ([c38b046](https://github.com/dgiot/dgiot/commit/c38b046f1939bf6d59a106a62b2b1f3c86a89127)) ### Performance Improvements diff --git a/apps/dgiot/src/otp/dgiot_logger.erl b/apps/dgiot/src/otp/dgiot_logger.erl index 9d4e708f..b6108751 100644 --- a/apps/dgiot/src/otp/dgiot_logger.erl +++ b/apps/dgiot/src/otp/dgiot_logger.erl @@ -63,7 +63,6 @@ debug(Format, Args) -> debug(Metadata, Format, Args) when is_map(Metadata) -> emqx_logger:debug(Format, Args, Metadata). - -spec(info(unicode:chardata()) -> ok). info(Msg) -> emqx_logger:info(Msg). @@ -121,7 +120,6 @@ critical(Metadata, Format, Args) when is_map(Metadata) -> %% 获取module日志等级 logger:get_module_level(dgiot) %% 设置module日志等级 logger:set_module_level(dgiot_wechat,debug) set_loglevel(<<"system">>, <<"dgiot">>, Level) -> - emqx_logger:set_log_level(dgiot_utils:to_atom(Level)); set_loglevel(<<"app">>, Name, Level) -> diff --git a/apps/dgiot/src/utils/dgiot_tracer.erl b/apps/dgiot/src/utils/dgiot_tracer.erl new file mode 100644 index 00000000..4e091ccb --- /dev/null +++ b/apps/dgiot/src/utils/dgiot_tracer.erl @@ -0,0 +1,178 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(dgiot_tracer). + +-export([ + add_trace/1, + del_trace/1, + check_trace/3 +]). + +%% Mnesia bootstrap +-export([mnesia/1]). +-define(DGIOT_CLIENT_TRACE, dgiot_client_trace). +-define(DGIOT_TOPIC_TRACE, dgiot_topic_trace). +-record(dgiot_client_trace, {key :: binary(), value}). +-record(dgiot_topic_trace, {key :: binary(), value}). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +%% APIs +%% @doc Create or replicate topics table. +-spec(mnesia(boot | copy) -> ok). +mnesia(boot) -> + %% Optimize storage + StoreProps = [{ets, [{read_concurrency, true}, + {write_concurrency, true} + ]}], + ok = ekka_mnesia:create_table(?DGIOT_TOPIC_TRACE, [ + {ram_copies, [node()]}, + {record_name, ?DGIOT_TOPIC_TRACE}, + {attributes, record_info(fields, ?DGIOT_TOPIC_TRACE)}, + {type, ordered_set}, + {storage_properties, StoreProps}]), + ok = ekka_mnesia:create_table(?DGIOT_CLIENT_TRACE, [ + {ram_copies, [node()]}, + {record_name, ?DGIOT_CLIENT_TRACE}, + {attributes, record_info(fields, ?DGIOT_CLIENT_TRACE)}, + {type, ordered_set}, + {storage_properties, StoreProps}]); +mnesia(copy) -> + %% Copy topics table + ok = ekka_mnesia:copy_table(?DGIOT_TOPIC_TRACE, ram_copies), + ok = ekka_mnesia:copy_table(?DGIOT_CLIENT_TRACE, ram_copies). + +add_trace({Type, Id}) when is_list(Id) -> + add_trace({Type, list_to_binary(Id)}); +add_trace({Type, Id}) when is_atom(Id) -> + add_trace({Type, atom_to_binary(Id)}); +add_trace({clientid, ClientId}) -> + insert(?DGIOT_CLIENT_TRACE, ClientId, clientid); +add_trace({topic, TopicFilter}) -> + insert(?DGIOT_TOPIC_TRACE, TopicFilter, topic); +add_trace(_) -> + ignore. + +del_trace({Type, Id}) when is_list(Id) -> + del_trace({Type, list_to_binary(Id)}); +del_trace({Type, Id}) when is_atom(Id) -> + del_trace({Type, atom_to_binary(Id)}); +del_trace({clientid, ClientId}) -> + delete(?DGIOT_CLIENT_TRACE, ClientId); +del_trace({topic, TopicFilter}) -> + delete(?DGIOT_TOPIC_TRACE, TopicFilter); +del_trace(_) -> + ignore. +get_trace({Type, Id}) when is_list(Id) -> + get_trace({Type, list_to_binary(Id)}); +get_trace({Type, Id}) when is_atom(Id) -> + get_trace({Type, atom_to_binary(Id)}); +get_trace({clientid, ClientId}) -> + ets:member(?DGIOT_CLIENT_TRACE, ClientId); +get_trace({topic, Topic}) -> + lists:any(fun({emqx_topic_trace,TopicFilter, _}) -> + emqx_topic:match(Topic, TopicFilter) + end, ets:tab2list(?DGIOT_TOPIC_TRACE)); +get_trace(_) -> + false. + +check_trace(From, Topic,Payload) -> + case get_trace({clientid, From}) of + true -> + BinClientId = dgiot_utils:to_binary(From), + dgiot_mqtt:publish(self(), <<"logger_trace/trace/", BinClientId/binary, Topic/binary>>, Payload); + false -> + case get_trace({topic, Topic}) of + true -> + dgiot_mqtt:publish(self(), <<"logger_trace/trace/", Topic/binary>>, Payload); + false -> + false + end + end. + + +%%-------------------------------------------------------------------- +%% Mnesia APIs +%%-------------------------------------------------------------------- +lookup(Tab, Key) -> + Result = mnesia:transaction(fun mnesia:read/1, [{Tab, Key}]), + result(Result). + +insert(?DGIOT_CLIENT_TRACE, Key, Value) -> + case lookup(?DGIOT_CLIENT_TRACE, Key) of + {ok, _} -> + delete(?DGIOT_CLIENT_TRACE, Key); + _ -> + pass + end, + case get(?DGIOT_CLIENT_TRACE, Key) of + not_find -> + insert_(?DGIOT_CLIENT_TRACE,#?DGIOT_CLIENT_TRACE{key = Key, value = Value}); + _ -> pass + end, + insert_(?DGIOT_CLIENT_TRACE, #?DGIOT_CLIENT_TRACE{key = Key, value = Value}); + +insert(?DGIOT_TOPIC_TRACE, Key, Value) -> + case lookup(?DGIOT_TOPIC_TRACE, Key) of + {ok, _} -> + delete(?DGIOT_TOPIC_TRACE, Key); + _ -> + pass + end, + case get(?DGIOT_TOPIC_TRACE, Key) of + not_find -> + insert_(?DGIOT_TOPIC_TRACE,#?DGIOT_TOPIC_TRACE{key = Key, value = Value}); + _ -> pass + end, + insert_(?DGIOT_TOPIC_TRACE, #?DGIOT_TOPIC_TRACE{key = Key, value = Value}). + +insert_(?DGIOT_CLIENT_TRACE, Record) -> + F = fun() -> + mnesia:write(?DGIOT_CLIENT_TRACE, Record, write) + end, + Result = mnesia:transaction(F), + result(Result); + +insert_(?DGIOT_TOPIC_TRACE, Record) -> + F = fun() -> + mnesia:write(?DGIOT_TOPIC_TRACE, Record, write) + end, + Result = mnesia:transaction(F), + result(Result). + +get(Name, Key) -> + case ets:lookup(Name, Key) of + [] -> not_find; + [{Key, Value} | _] -> Value; + [Value | _] -> Value + end. + +delete(TAB, Key) -> + F = + fun() -> + mnesia:delete({TAB, Key}) + end, + Result = mnesia:transaction(F), + result(Result). + + +result({atomic, ok}) -> true; +result({atomic, []}) -> {error, empty}; +result({aborted, Reason}) -> {error, Reason}; +result({atomic, Result}) -> {ok, Result}; +result(Result) -> Result. diff --git a/apps/dgiot_api/src/handler/dgiot_system_handler.erl b/apps/dgiot_api/src/handler/dgiot_system_handler.erl index dc101e56..cb688516 100644 --- a/apps/dgiot_api/src/handler/dgiot_system_handler.erl +++ b/apps/dgiot_api/src/handler/dgiot_system_handler.erl @@ -240,43 +240,21 @@ do_request(get_trace, _Args, _Context, _Req) -> {200, #{<<"code">> => 200, <<"data">> => NewData}}; %% traces 概要: traces 描述:启动,停止traces -do_request(post_trace, #{<<"action">> := Action, <<"tracetype">> := Tracetype, <<"handle">> := Handle, <<"deviceid">> := DeviceId, <<"order">> := Order, <<"level">> := Level}, _Context, _Req) -> - case length(emqx_tracer:lookup_traces()) > 10 of - false -> - Rtn = - case Action of - <<"start">> -> - {ok, #{<<"results">> := [#{<<"objectId">> := HandleId} | _]}} = dgiot_parse:query_object(<<"LogLevel">>, #{<<"where">> => #{<<"name">> => <<"dgiot_handle">>, <<"type">> => <<"dgiot_handle">>}}), - dgiot_parse:create_object(<<"LogLevel">>, #{ - <<"level">> => Level, - <<"parent">> => #{ - <<"__type">> => <<"Pointer">>, - <<"className">> => <<"LogLevel">>, - <<"objectId">> => HandleId - }, - <<"name">> => Handle, - <<"deviceid">> => DeviceId, - <<"type">> => <<"trace">>, - <<"order">> => Order, - <<"topic">> => Handle, - <<"path">> => <<"tracelog/", Handle/binary, ".txt">> - }), - emqx_tracer:start_trace({dgiot_utils:to_atom(Tracetype), Handle}, dgiot_utils:to_atom(Level), get_tracelog(<>)); - <<"stop">> -> - LoglevelId = dgiot_parse:get_loglevelid(Handle, <<"trace">>), - dgiot_parse:del_object(<<"LogLevel">>, LoglevelId), - emqx_tracer:stop_trace({dgiot_utils:to_atom(Tracetype), Handle}); - _Other -> - {error, _Other} - end, - case Rtn of - ok -> - {200, #{<<"code">> => 200, <<"msg">> => <<"SUCCESS">>}}; - {error, Reason} -> - {400, #{<<"code">> => 400, <<"error">> => dgiot_utils:format("~p", [Reason])}} - end; - true -> - {400, #{<<"code">> => 400, <<"error">> => <<"trace超出限制了,限制10个"/utf8>>}} +do_request(post_trace, #{<<"action">> := Action, <<"tracetype">> := Tracetype, <<"handle">> := Handle}, _Context, _Req) -> + Rtn = + case Action of + <<"start">> -> + dgiot_tracer:add_trace({dgiot_utils:to_atom(Tracetype), Handle}); + <<"stop">> -> + dgiot_tracer:del_trace({dgiot_utils:to_atom(Tracetype), Handle}); + _Other -> + {error, _Other} + end, + case Rtn of + ok -> + {200, #{<<"code">> => 200, <<"msg">> => <<"SUCCESS">>}}; + {error, Reason} -> + {400, #{<<"code">> => 400, <<"error">> => dgiot_utils:format("~p", [Reason])}} end; @@ -342,9 +320,3 @@ format_val(Mod, Schema) -> end, Acc, Methods) end, [], Paths), {Tpl, [{mod, Mod}, {apis, Apis}], [{api, record_info(fields, api)}]}. - -get_tracelog(Handle) -> - {file, Here} = code:is_loaded(?MODULE), - Dir = filename:dirname(filename:dirname(Here)), - Filename = filename:join([Dir, "priv/www/tracelog/", Handle]), - dgiot_utils:to_list(Filename). diff --git a/apps/dgiot_parse/src/sever/dgiot_parse_channel.erl b/apps/dgiot_parse/src/sever/dgiot_parse_channel.erl index d972f043..7017ec83 100644 --- a/apps/dgiot_parse/src/sever/dgiot_parse_channel.erl +++ b/apps/dgiot_parse/src/sever/dgiot_parse_channel.erl @@ -168,6 +168,7 @@ init(?TYPE, Channel, Cfg) -> %% 初始化池子 handle_init(State) -> emqx_hooks:add('logger.send', {?MODULE, send, []}), + emqx_hooks:add('mqtt_publish.trace', {dgiot_tracer, check_trace, []}), {ok, State}. handle_message(config, #state{cfg = Cfg} = State) -> diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 07d77fa3..43f22424 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -21,16 +21,6 @@ -logger_header("[Tracer]"). -%% Mnesia bootstrap --export([mnesia/1]). --define(EMQX_CLIENT_TRACE, emqx_client_trace). --define(EMQX_TOPIC_TRACE, emqx_topic_trace). --record(?EMQX_CLIENT_TRACE, {key :: binary(), value}). --record(?EMQX_TOPIC_TRACE, {key :: binary(), value}). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - %% APIs -export([trace/2 , start_trace/3 @@ -82,12 +72,7 @@ trace(publish, #message{topic = <<"logger_trace", _/binary>>}) -> ignore; trace(publish, #message{from = From, topic = Topic, payload = Payload}) when is_binary(From); is_atom(From) -> - case check_trace(From, Topic) of - true -> - emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, " ~0p ~p", [Payload]); - _ -> - ignore - end. + emqx_hooks:run('mqtt_publish.trace',[From, Topic, Payload]). %% @doc Start to trace clientid or topic. -spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}). @@ -132,7 +117,6 @@ install_trace_handler(Who, Level, LogFile) -> {fun filter_by_meta_key/2, Who}}]}) of ok -> - add_trace(Who), ?LOG(info, "Start trace for ~p", [Who]); {error, Reason} -> ?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]), @@ -142,7 +126,6 @@ install_trace_handler(Who, Level, LogFile) -> uninstall_trance_handler(Who) -> case logger:remove_handler(handler_id(Who)) of ok -> - del_trace(Who), ?LOG(info, "Stop trace for ~p", [Who]); {error, Reason} -> ?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]), @@ -184,76 +167,3 @@ handler_name(Bin) -> hashstr(Bin) -> binary_to_list(emqx_misc:bin2hexstr_A_F(Bin)). - - -%% @doc Create or replicate topics table. --spec(mnesia(boot | copy) -> ok). -mnesia(boot) -> - %% Optimize storage - StoreProps = [{ets, [{read_concurrency, true}, - {write_concurrency, true} - ]}], - ok = ekka_mnesia:create_table(?EMQX_TOPIC_TRACE, [ - {ram_copies, [node()]}, - {record_name, ?EMQX_TOPIC_TRACE}, - {attributes, record_info(fields, ?EMQX_TOPIC_TRACE)}, - {type, ordered_set}, - {storage_properties, StoreProps}]), - ok = ekka_mnesia:create_table(?EMQX_CLIENT_TRACE, [ - {ram_copies, [node()]}, - {record_name, ?EMQX_CLIENT_TRACE}, - {attributes, record_info(fields, ?EMQX_CLIENT_TRACE)}, - {type, ordered_set}, - {storage_properties, StoreProps}]); -mnesia(copy) -> - %% Copy topics table - ok = ekka_mnesia:copy_table(?EMQX_TOPIC_TRACE, ram_copies), - ok = ekka_mnesia:copy_table(?EMQX_CLIENT_TRACE, ram_copies). - -add_trace({Type, Id}) when is_list(Id) -> - add_trace({Type, list_to_binary(Id)}); -add_trace({Type, Id}) when is_atom(Id) -> - add_trace({Type, atom_to_binary(Id)}); -add_trace({clientid, ClientId}) -> - ets:insert(?EMQX_CLIENT_TRACE, {ClientId, clientid}); -add_trace({topic, TopicFilter}) -> - ets:insert(?EMQX_TOPIC_TRACE, {TopicFilter, topic}); -add_trace(_) -> - ignore. - -del_trace({Type, Id}) when is_list(Id) -> - del_trace({Type, list_to_binary(Id)}); -del_trace({Type, Id}) when is_atom(Id) -> - del_trace({Type, atom_to_binary(Id)}); -del_trace({clientid, ClientId}) -> - ets:delete_object(?EMQX_CLIENT_TRACE, {ClientId,clientid}); -del_trace({topic, TopicFilter}) -> - ets:delete_object(?EMQX_TOPIC_TRACE, {TopicFilter,topic}); -del_trace(_) -> - ignore. - -get_trace({Type, Id}) when is_list(Id) -> - get_trace({Type, list_to_binary(Id)}); -get_trace({Type, Id}) when is_atom(Id) -> - get_trace({Type, atom_to_binary(Id)}); -get_trace({clientid, ClientId}) -> - ets:member(?EMQX_CLIENT_TRACE, {ClientId, clientid}); -get_trace({topic, Topic}) -> - lists:any(fun({TopicFilter, _}) -> - emqx_topic:match(Topic, TopicFilter) - end, ets:tab2list(?EMQX_TOPIC_TRACE)); -get_trace(_) -> - false. - -check_trace(From, Topic) -> - case get_trace({clientid, From}) of - true -> - true; - false -> - case get_trace({topic, Topic}) of - true -> - true; - false -> - false - end - end.