fix: fix mqtt trace bug

This commit is contained in:
lsxredrain 2021-10-09 01:10:08 +08:00
parent c38b046f19
commit 1a53ebec4e
6 changed files with 196 additions and 136 deletions

View File

@ -13,6 +13,7 @@
* add task metircs ([79cfd2d](https://github.com/dgiot/dgiot/commit/79cfd2dc45e8a396dff7351eaf40ab5e07be83c5))
* add tcp metircs ([f8e19b1](https://github.com/dgiot/dgiot/commit/f8e19b1b241200b5d1324d689f4e2d9536e74a4d))
* add tcp transparent ([b1f091c](https://github.com/dgiot/dgiot/commit/b1f091cba9fa143fcc8040a6fa8848ac89412e96))
* add tdengie metrics ([c38b046](https://github.com/dgiot/dgiot/commit/c38b046f1939bf6d59a106a62b2b1f3c86a89127))
### Performance Improvements

View File

@ -63,7 +63,6 @@ debug(Format, Args) ->
debug(Metadata, Format, Args) when is_map(Metadata) ->
emqx_logger:debug(Format, Args, Metadata).
-spec(info(unicode:chardata()) -> ok).
info(Msg) ->
emqx_logger:info(Msg).
@ -121,7 +120,6 @@ critical(Metadata, Format, Args) when is_map(Metadata) ->
%% module日志等级 logger:get_module_level(dgiot)
%% module日志等级 logger:set_module_level(dgiot_wechat,debug)
set_loglevel(<<"system">>, <<"dgiot">>, Level) ->
emqx_logger:set_log_level(dgiot_utils:to_atom(Level));
set_loglevel(<<"app">>, Name, Level) ->

View File

@ -0,0 +1,178 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_tracer).
-export([
add_trace/1,
del_trace/1,
check_trace/3
]).
%% Mnesia bootstrap
-export([mnesia/1]).
-define(DGIOT_CLIENT_TRACE, dgiot_client_trace).
-define(DGIOT_TOPIC_TRACE, dgiot_topic_trace).
-record(dgiot_client_trace, {key :: binary(), value}).
-record(dgiot_topic_trace, {key :: binary(), value}).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% APIs
%% @doc Create or replicate topics table.
-spec(mnesia(boot | copy) -> ok).
mnesia(boot) ->
%% Optimize storage
StoreProps = [{ets, [{read_concurrency, true},
{write_concurrency, true}
]}],
ok = ekka_mnesia:create_table(?DGIOT_TOPIC_TRACE, [
{ram_copies, [node()]},
{record_name, ?DGIOT_TOPIC_TRACE},
{attributes, record_info(fields, ?DGIOT_TOPIC_TRACE)},
{type, ordered_set},
{storage_properties, StoreProps}]),
ok = ekka_mnesia:create_table(?DGIOT_CLIENT_TRACE, [
{ram_copies, [node()]},
{record_name, ?DGIOT_CLIENT_TRACE},
{attributes, record_info(fields, ?DGIOT_CLIENT_TRACE)},
{type, ordered_set},
{storage_properties, StoreProps}]);
mnesia(copy) ->
%% Copy topics table
ok = ekka_mnesia:copy_table(?DGIOT_TOPIC_TRACE, ram_copies),
ok = ekka_mnesia:copy_table(?DGIOT_CLIENT_TRACE, ram_copies).
add_trace({Type, Id}) when is_list(Id) ->
add_trace({Type, list_to_binary(Id)});
add_trace({Type, Id}) when is_atom(Id) ->
add_trace({Type, atom_to_binary(Id)});
add_trace({clientid, ClientId}) ->
insert(?DGIOT_CLIENT_TRACE, ClientId, clientid);
add_trace({topic, TopicFilter}) ->
insert(?DGIOT_TOPIC_TRACE, TopicFilter, topic);
add_trace(_) ->
ignore.
del_trace({Type, Id}) when is_list(Id) ->
del_trace({Type, list_to_binary(Id)});
del_trace({Type, Id}) when is_atom(Id) ->
del_trace({Type, atom_to_binary(Id)});
del_trace({clientid, ClientId}) ->
delete(?DGIOT_CLIENT_TRACE, ClientId);
del_trace({topic, TopicFilter}) ->
delete(?DGIOT_TOPIC_TRACE, TopicFilter);
del_trace(_) ->
ignore.
get_trace({Type, Id}) when is_list(Id) ->
get_trace({Type, list_to_binary(Id)});
get_trace({Type, Id}) when is_atom(Id) ->
get_trace({Type, atom_to_binary(Id)});
get_trace({clientid, ClientId}) ->
ets:member(?DGIOT_CLIENT_TRACE, ClientId);
get_trace({topic, Topic}) ->
lists:any(fun({emqx_topic_trace,TopicFilter, _}) ->
emqx_topic:match(Topic, TopicFilter)
end, ets:tab2list(?DGIOT_TOPIC_TRACE));
get_trace(_) ->
false.
check_trace(From, Topic,Payload) ->
case get_trace({clientid, From}) of
true ->
BinClientId = dgiot_utils:to_binary(From),
dgiot_mqtt:publish(self(), <<"logger_trace/trace/", BinClientId/binary, Topic/binary>>, Payload);
false ->
case get_trace({topic, Topic}) of
true ->
dgiot_mqtt:publish(self(), <<"logger_trace/trace/", Topic/binary>>, Payload);
false ->
false
end
end.
%%--------------------------------------------------------------------
%% Mnesia APIs
%%--------------------------------------------------------------------
lookup(Tab, Key) ->
Result = mnesia:transaction(fun mnesia:read/1, [{Tab, Key}]),
result(Result).
insert(?DGIOT_CLIENT_TRACE, Key, Value) ->
case lookup(?DGIOT_CLIENT_TRACE, Key) of
{ok, _} ->
delete(?DGIOT_CLIENT_TRACE, Key);
_ ->
pass
end,
case get(?DGIOT_CLIENT_TRACE, Key) of
not_find ->
insert_(?DGIOT_CLIENT_TRACE,#?DGIOT_CLIENT_TRACE{key = Key, value = Value});
_ -> pass
end,
insert_(?DGIOT_CLIENT_TRACE, #?DGIOT_CLIENT_TRACE{key = Key, value = Value});
insert(?DGIOT_TOPIC_TRACE, Key, Value) ->
case lookup(?DGIOT_TOPIC_TRACE, Key) of
{ok, _} ->
delete(?DGIOT_TOPIC_TRACE, Key);
_ ->
pass
end,
case get(?DGIOT_TOPIC_TRACE, Key) of
not_find ->
insert_(?DGIOT_TOPIC_TRACE,#?DGIOT_TOPIC_TRACE{key = Key, value = Value});
_ -> pass
end,
insert_(?DGIOT_TOPIC_TRACE, #?DGIOT_TOPIC_TRACE{key = Key, value = Value}).
insert_(?DGIOT_CLIENT_TRACE, Record) ->
F = fun() ->
mnesia:write(?DGIOT_CLIENT_TRACE, Record, write)
end,
Result = mnesia:transaction(F),
result(Result);
insert_(?DGIOT_TOPIC_TRACE, Record) ->
F = fun() ->
mnesia:write(?DGIOT_TOPIC_TRACE, Record, write)
end,
Result = mnesia:transaction(F),
result(Result).
get(Name, Key) ->
case ets:lookup(Name, Key) of
[] -> not_find;
[{Key, Value} | _] -> Value;
[Value | _] -> Value
end.
delete(TAB, Key) ->
F =
fun() ->
mnesia:delete({TAB, Key})
end,
Result = mnesia:transaction(F),
result(Result).
result({atomic, ok}) -> true;
result({atomic, []}) -> {error, empty};
result({aborted, Reason}) -> {error, Reason};
result({atomic, Result}) -> {ok, Result};
result(Result) -> Result.

View File

@ -240,43 +240,21 @@ do_request(get_trace, _Args, _Context, _Req) ->
{200, #{<<"code">> => 200, <<"data">> => NewData}};
%% traces : traces :traces
do_request(post_trace, #{<<"action">> := Action, <<"tracetype">> := Tracetype, <<"handle">> := Handle, <<"deviceid">> := DeviceId, <<"order">> := Order, <<"level">> := Level}, _Context, _Req) ->
case length(emqx_tracer:lookup_traces()) > 10 of
false ->
Rtn =
case Action of
<<"start">> ->
{ok, #{<<"results">> := [#{<<"objectId">> := HandleId} | _]}} = dgiot_parse:query_object(<<"LogLevel">>, #{<<"where">> => #{<<"name">> => <<"dgiot_handle">>, <<"type">> => <<"dgiot_handle">>}}),
dgiot_parse:create_object(<<"LogLevel">>, #{
<<"level">> => Level,
<<"parent">> => #{
<<"__type">> => <<"Pointer">>,
<<"className">> => <<"LogLevel">>,
<<"objectId">> => HandleId
},
<<"name">> => Handle,
<<"deviceid">> => DeviceId,
<<"type">> => <<"trace">>,
<<"order">> => Order,
<<"topic">> => Handle,
<<"path">> => <<"tracelog/", Handle/binary, ".txt">>
}),
emqx_tracer:start_trace({dgiot_utils:to_atom(Tracetype), Handle}, dgiot_utils:to_atom(Level), get_tracelog(<<Handle/binary, ".txt">>));
<<"stop">> ->
LoglevelId = dgiot_parse:get_loglevelid(Handle, <<"trace">>),
dgiot_parse:del_object(<<"LogLevel">>, LoglevelId),
emqx_tracer:stop_trace({dgiot_utils:to_atom(Tracetype), Handle});
_Other ->
{error, _Other}
end,
case Rtn of
ok ->
{200, #{<<"code">> => 200, <<"msg">> => <<"SUCCESS">>}};
{error, Reason} ->
{400, #{<<"code">> => 400, <<"error">> => dgiot_utils:format("~p", [Reason])}}
end;
true ->
{400, #{<<"code">> => 400, <<"error">> => <<"trace超出限制了限制10个"/utf8>>}}
do_request(post_trace, #{<<"action">> := Action, <<"tracetype">> := Tracetype, <<"handle">> := Handle}, _Context, _Req) ->
Rtn =
case Action of
<<"start">> ->
dgiot_tracer:add_trace({dgiot_utils:to_atom(Tracetype), Handle});
<<"stop">> ->
dgiot_tracer:del_trace({dgiot_utils:to_atom(Tracetype), Handle});
_Other ->
{error, _Other}
end,
case Rtn of
ok ->
{200, #{<<"code">> => 200, <<"msg">> => <<"SUCCESS">>}};
{error, Reason} ->
{400, #{<<"code">> => 400, <<"error">> => dgiot_utils:format("~p", [Reason])}}
end;
@ -342,9 +320,3 @@ format_val(Mod, Schema) ->
end, Acc, Methods)
end, [], Paths),
{Tpl, [{mod, Mod}, {apis, Apis}], [{api, record_info(fields, api)}]}.
get_tracelog(Handle) ->
{file, Here} = code:is_loaded(?MODULE),
Dir = filename:dirname(filename:dirname(Here)),
Filename = filename:join([Dir, "priv/www/tracelog/", Handle]),
dgiot_utils:to_list(Filename).

View File

@ -168,6 +168,7 @@ init(?TYPE, Channel, Cfg) ->
%%
handle_init(State) ->
emqx_hooks:add('logger.send', {?MODULE, send, []}),
emqx_hooks:add('mqtt_publish.trace', {dgiot_tracer, check_trace, []}),
{ok, State}.
handle_message(config, #state{cfg = Cfg} = State) ->

View File

@ -21,16 +21,6 @@
-logger_header("[Tracer]").
%% Mnesia bootstrap
-export([mnesia/1]).
-define(EMQX_CLIENT_TRACE, emqx_client_trace).
-define(EMQX_TOPIC_TRACE, emqx_topic_trace).
-record(?EMQX_CLIENT_TRACE, {key :: binary(), value}).
-record(?EMQX_TOPIC_TRACE, {key :: binary(), value}).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% APIs
-export([trace/2
, start_trace/3
@ -82,12 +72,7 @@ trace(publish, #message{topic = <<"logger_trace", _/binary>>}) ->
ignore;
trace(publish, #message{from = From, topic = Topic, payload = Payload})
when is_binary(From); is_atom(From) ->
case check_trace(From, Topic) of
true ->
emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}}, " ~0p ~p", [Payload]);
_ ->
ignore
end.
emqx_hooks:run('mqtt_publish.trace',[From, Topic, Payload]).
%% @doc Start to trace clientid or topic.
-spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}).
@ -132,7 +117,6 @@ install_trace_handler(Who, Level, LogFile) ->
{fun filter_by_meta_key/2, Who}}]})
of
ok ->
add_trace(Who),
?LOG(info, "Start trace for ~p", [Who]);
{error, Reason} ->
?LOG(error, "Start trace for ~p failed, error: ~p", [Who, Reason]),
@ -142,7 +126,6 @@ install_trace_handler(Who, Level, LogFile) ->
uninstall_trance_handler(Who) ->
case logger:remove_handler(handler_id(Who)) of
ok ->
del_trace(Who),
?LOG(info, "Stop trace for ~p", [Who]);
{error, Reason} ->
?LOG(error, "Stop trace for ~p failed, error: ~p", [Who, Reason]),
@ -184,76 +167,3 @@ handler_name(Bin) ->
hashstr(Bin) ->
binary_to_list(emqx_misc:bin2hexstr_A_F(Bin)).
%% @doc Create or replicate topics table.
-spec(mnesia(boot | copy) -> ok).
mnesia(boot) ->
%% Optimize storage
StoreProps = [{ets, [{read_concurrency, true},
{write_concurrency, true}
]}],
ok = ekka_mnesia:create_table(?EMQX_TOPIC_TRACE, [
{ram_copies, [node()]},
{record_name, ?EMQX_TOPIC_TRACE},
{attributes, record_info(fields, ?EMQX_TOPIC_TRACE)},
{type, ordered_set},
{storage_properties, StoreProps}]),
ok = ekka_mnesia:create_table(?EMQX_CLIENT_TRACE, [
{ram_copies, [node()]},
{record_name, ?EMQX_CLIENT_TRACE},
{attributes, record_info(fields, ?EMQX_CLIENT_TRACE)},
{type, ordered_set},
{storage_properties, StoreProps}]);
mnesia(copy) ->
%% Copy topics table
ok = ekka_mnesia:copy_table(?EMQX_TOPIC_TRACE, ram_copies),
ok = ekka_mnesia:copy_table(?EMQX_CLIENT_TRACE, ram_copies).
add_trace({Type, Id}) when is_list(Id) ->
add_trace({Type, list_to_binary(Id)});
add_trace({Type, Id}) when is_atom(Id) ->
add_trace({Type, atom_to_binary(Id)});
add_trace({clientid, ClientId}) ->
ets:insert(?EMQX_CLIENT_TRACE, {ClientId, clientid});
add_trace({topic, TopicFilter}) ->
ets:insert(?EMQX_TOPIC_TRACE, {TopicFilter, topic});
add_trace(_) ->
ignore.
del_trace({Type, Id}) when is_list(Id) ->
del_trace({Type, list_to_binary(Id)});
del_trace({Type, Id}) when is_atom(Id) ->
del_trace({Type, atom_to_binary(Id)});
del_trace({clientid, ClientId}) ->
ets:delete_object(?EMQX_CLIENT_TRACE, {ClientId,clientid});
del_trace({topic, TopicFilter}) ->
ets:delete_object(?EMQX_TOPIC_TRACE, {TopicFilter,topic});
del_trace(_) ->
ignore.
get_trace({Type, Id}) when is_list(Id) ->
get_trace({Type, list_to_binary(Id)});
get_trace({Type, Id}) when is_atom(Id) ->
get_trace({Type, atom_to_binary(Id)});
get_trace({clientid, ClientId}) ->
ets:member(?EMQX_CLIENT_TRACE, {ClientId, clientid});
get_trace({topic, Topic}) ->
lists:any(fun({TopicFilter, _}) ->
emqx_topic:match(Topic, TopicFilter)
end, ets:tab2list(?EMQX_TOPIC_TRACE));
get_trace(_) ->
false.
check_trace(From, Topic) ->
case get_trace({clientid, From}) of
true ->
true;
false ->
case get_trace({topic, Topic}) of
true ->
true;
false ->
false
end
end.