fix log bug

This commit is contained in:
lsxredrain 2021-08-27 18:29:59 +08:00
parent 50d6fb5406
commit 6b4a362a87
8 changed files with 77 additions and 86 deletions

View File

@ -26,7 +26,6 @@
start(_StartType, _StartArgs) ->
dgiot_data:init(),
dgiot_data:search_data(),
emqx_hooks:add('logger.send', {dgiot_logger, send, []}),
dgiot_datetime:start_time(),
start_mnesia(),
dgiot:init_plugins(),

View File

@ -20,7 +20,6 @@
-export([
set_loglevel/3,
send/2,
test/0]).
%% Logs
-export([debug/1
@ -97,7 +96,6 @@ error(Format, Args) ->
error(Metadata, Format, Args) when is_map(Metadata) ->
emqx_logger:error(Format, Args, Metadata).
-spec(critical(unicode:chardata()) -> ok).
critical(Msg) ->
logger:critical(Msg).
@ -110,65 +108,6 @@ critical(Format, Args) ->
critical(Metadata, Format, Args) when is_map(Metadata) ->
logger:critical(Format, Args, Metadata).
send(Meta, Payload) when is_list(Payload) ->
send(Meta, iolist_to_binary(Payload));
send(#{error_logger := _Error_logger}, Payload) ->
case jsx:is_json(Payload) of
true ->
Map = jiffy:decode(Payload, [return_maps]),
Mfa = maps:get(<<"mfa">>, Map, <<"all">>),
Line = get_line(Map),
Topic = <<"$SYS/log/", Mfa/binary, "/", Line/binary>>,
NewMap = maps:with([<<"time">>, <<"pid">>, <<"msg">>, <<"mfa">>, <<"line">>, <<"level">>, <<"clientid">>, <<"topic">>, <<"peername">>], Map),
dgiot_mqtt:publish(Mfa, Topic, get_body(NewMap, [error_logger]));
false ->
Topic1 = <<"$SYS/error_logger/all">>,
dgiot_mqtt:publish(self(), Topic1, Payload)
end;
send(#{topic := _Topic} = Meta, Payload) ->
send(Meta#{domain => [topic_trace]}, Payload);
send(#{clientid := _ClientId} = Meta, Payload) ->
send(Meta#{domain => [clientid_trace]}, Payload);
send(Meta, Payload) ->
Map = jiffy:decode(Payload, [return_maps]),
Mfa = maps:get(<<"mfa">>, Map, <<"all">>),
Domain = maps:get(domain, Meta, [public_log]),
TraceTopic =
case maps:find(<<"topic">>, Map) of
{ok, TraceTopic1} ->
<<TraceTopic1/binary, "/">>;
_ -> <<"">>
end,
Topic =
case maps:find(<<"clientid">>, Map) of
{ok, ClientId1} ->
<<"$SYS/trace/", TraceTopic/binary, ClientId1/binary>>;
_ ->
Line = get_line(Map),
<<"$SYS/log/", Mfa/binary, "/", Line/binary>>
end,
NewMap = maps:with([<<"time">>, <<"pid">>, <<"msg">>, <<"mfa">>, <<"line">>, <<"level">>, <<"clientid">>, <<"topic">>, <<"peername">>], Map),
dgiot_mqtt:publish(Mfa, Topic, jiffy:encode(get_body(NewMap, Domain))).
get_body(#{<<"msg">> := Msg} = Map, Domain) when is_map(Msg) ->
Map#{<<"type">> => <<"json">>, <<"domain">> => Domain};
get_body(Map, Domain) ->
Map#{<<"type">> => <<"text">>, <<"domain">> => Domain}.
get_line(Map) ->
case maps:find(<<"line">>, Map) of
{ok, Line1} ->
dgiot_utils:to_binary(Line1);
_ -> <<"0">>
end.
%% emqx_logger:get_primary_log_level().
%% emqx_logger:set_log_level(debug).

View File

@ -31,10 +31,10 @@
-author("johnliu").
-include("dgiot_cron.hrl").
-export([init/0, init/1, init/2, destroy/1]).
-export([init/0, init/1, init/2, destroy/1,size/1]).
-export([insert/2, save/2, delete/1, match/1, match_object/2, match_limit/2, match_safe_do/3, match_object/3, match_delete/1, select/2, lookup/1, page/6, update_counter/2]).
-export([insert/3, delete/2, match/2, match/3, match_delete/2, match_limit/3, match_safe_do/4, lookup/2, search/2, search/3, dets_search/2, dets_search/3, loop/2, dets_loop/3, update_counter/3]).
-export([set_consumer/2, set_consumer/3, get_consumer/2, get/1, get/2, clear/1,search_data/0]).
-export([set_consumer/2, set_consumer/3, get_consumer/2, get/1, get/2, clear/1, search_data/0]).
-define(DB, dgiot_data).
-define(ETS, ets).
-define(DETS, dets).
@ -77,8 +77,6 @@ init(Name, Options) ->
Name
end.
-spec(destroy(ets:tab()) -> ok).
%% Delete the ets table.
destroy(Tab) ->
case ?ETS:info(Tab, name) of
undefined -> ok;
@ -87,6 +85,14 @@ destroy(Tab) ->
ok
end.
size(Name) ->
case ?ETS:info(Name) of
undefined -> -1;
Info ->
{size, Size} = lists:keyfind(size, 1, Info), Size
end.
insert(Key, Value) ->
insert(?DB, Key, Value).
insert(Name, Key, Value) ->

View File

@ -137,10 +137,9 @@ init([Name, Options]) ->
end,
put(last, erlang:system_time(second)),
load_from_dets(Name),
save_to_dets(Options),
%% save_to_dets(Options),
{ok, #state{name = Name, opts = Options}}.
handle_call({insert, Objects}, _From, State) ->
Reply = insert(State#state.name, Objects),
{reply, Reply, State};
@ -171,9 +170,9 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(save_to_dets, #state{opts = Options} = State) ->
handle_info(save_to_dets, #state{opts = _Options} = State) ->
check_save(State),
save_to_dets(Options),
%% save_to_dets(Options),
{noreply, State};
handle_info(save, State) ->

View File

@ -49,7 +49,7 @@ unsubscribe(Topic) ->
-spec(publish(Client :: binary(), Topic :: binary(), Payload :: binary())
-> ok | {error, Reason :: any()}).
publish(Client, Topic, Payload) ->
timer:sleep(5),
timer:sleep(10),
Msg = emqx_message:make(dgiot_utils:to_binary(Client), 0, Topic, Payload),
emqx:publish(Msg),
ok.

View File

@ -21,8 +21,8 @@
-behavior(dgiot_channelx).
-dgiot_data("ets").
-export([
init_ets/0
]).
init_ets/0,
send/2]).
-export([get_config/0, get_config/1]).
-export([start/0, start/2, init/3, handle_init/1, handle_event/3, handle_message/2, stop/3, handle_save/1]).
@ -129,11 +129,6 @@
}
}).
init_ets() ->
dgiot_data:init(?DGIOT_PARSE_ETS),
dgiot_data:init(?ROLE_USER_ETS),
dgiot_data:init(?ROLE_PARENT_ETS),
dgiot_data:init(?USER_ROLE_ETS).
start() ->
Cfg = #{
@ -155,7 +150,6 @@ start(Channel, Cfg) ->
%%
init(?TYPE, Channel, Cfg) ->
State = #state{channel = Channel, cfg = Cfg},
dgiot_data:init(?CACHE(Channel)),
Opts = [?CACHE(Channel), #{
auto_save => application:get_env(dgiot_parse, cache_auto_save, 3000),
size => application:get_env(dgiot_parse, cache_max_size, 50000),
@ -166,24 +160,22 @@ init(?TYPE, Channel, Cfg) ->
Specs = [
{dgiot_dcache, {dgiot_dcache, start_link, Opts}, permanent, 5000, worker, [dgiot_dcache]}
],
dgiot_parse:load_LogLevel(),
{ok, State, Specs}.
%%
handle_init(State) ->
emqx_hooks:add('logger.send', {?MODULE, send, []}),
{ok, State}.
handle_message(config, #state{cfg = Cfg} = State) ->
%% dgiot_parse_cache:save_to_cache(#{<<"method">> => <<"POST">>, <<"path">> => <<"/classes/Log">>,
%% <<"body">> => get_body(NewMap, [error_logger])});
{reply, {ok, Cfg}, State};
handle_message(_Message, State) ->
{ok, State}.
%% ,
handle_event(full, _From, #state{channel = _Channel}) ->
%% dgiot_dcache:save_to_disk(?CACHE(Channel)),
handle_event(full, _From, #state{channel = Channel}) ->
dgiot_dcache:save_to_disk(?CACHE(Channel)),
ok;
handle_event(_EventId, _Event, _State) ->
@ -203,3 +195,57 @@ get_config() ->
get_config(Channel) ->
dgiot_channelx:call(?TYPE, Channel, config).
init_ets() ->
dgiot_data:init(?DGIOT_PARSE_ETS),
dgiot_data:init(?ROLE_USER_ETS),
dgiot_data:init(?ROLE_PARENT_ETS),
dgiot_data:init(?USER_ROLE_ETS).
send(Meta, Payload) when is_list(Payload) ->
send(Meta, iolist_to_binary(Payload));
send(#{error_logger := _Error_logger, mfa := {M, F, A}} = _Meta, Payload) ->
Mfa = <<(atom_to_binary(M, utf8))/binary, $/, (atom_to_binary(F, utf8))/binary, $/, (integer_to_binary(A))/binary>>,
Topic = <<"logger_trace/", Mfa/binary>>,
dgiot_mqtt:publish(Mfa, Topic, Payload),
dgiot_parse_cache:save_to_cache(#{<<"method">> => <<"POST">>, <<"path">> => <<"/classes/Log">>, <<"body">> => Payload});
send(#{mfa := _MFA} = _Meta, Payload) ->
Map = jiffy:decode(Payload, [return_maps]),
Mfa = dgiot_utils:to_binary(maps:get(<<"mfa">>, Map, <<"all">>)),
TraceTopic =
case maps:find(<<"topic">>, Map) of
{ok, TraceTopic1} ->
<<TraceTopic1/binary, "/">>;
_ -> <<"">>
end,
Topic =
case maps:find(<<"clientid">>, Map) of
{ok, ClientId1} ->
<<"logger_trace/trace/", TraceTopic/binary, ClientId1/binary>>;
_ ->
Line =
case maps:find(<<"line">>, Map) of
{ok, Line1} ->
dgiot_utils:to_binary(Line1);
_ ->
<<"0">>
end,
<<"logger_trace/log/", Mfa/binary, "/", Line/binary>>
end,
dgiot_mqtt:publish(Mfa, Topic, Payload),
dgiot_parse_cache:save_to_cache(#{
<<"method">> => <<"POST">>,
<<"path">> => <<"/classes/Log">>,
<<"body">> => get_body(Map)});
send(_Meta, Payload) ->
dgiot_mqtt:publish(<<"all">>, <<"logger_trace/other">>, Payload),
ok.
get_body(#{<<"msg">> := Msg} = Map) when is_map(Msg) ->
Map#{<<"type">> => <<"json">>, <<"msg">> => jiffy:encode(Msg)};
get_body(Map) ->
Map#{<<"type">> => <<"text">>}.

View File

@ -64,7 +64,7 @@ format(Msg, Meta, Config) ->
end,
Data = maps:without([report_cb], Data0),
Payload = jiffy:encode(json_obj(Data, Config)),
%% emqx_hooks:run('logger.send',[Meta, Payload]),
emqx_hooks:run('logger.send',[Meta, Payload]),
Payload.
format_msg({string, Chardata}, Meta, Config) ->

View File

@ -67,12 +67,14 @@
trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
%% Do not trace '$SYS' publish
ignore;
trace(publish, #message{topic = <<"logger_trace", _/binary>>}) ->
%% Do not trace '$SYS' publish
ignore;
trace(publish, #message{from = From, topic = Topic, payload = Payload})
when is_binary(From); is_atom(From) ->
emqx_logger:info(#{topic => Topic,
mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} },
"PUBLISH to ~s: ~0p", [Topic, Payload]).
%% @doc Start to trace clientid or topic.
-spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}).
start_trace(Who, all, LogFile) ->