feat: add status

This commit is contained in:
jhonliu 2023-08-09 19:45:23 +08:00
parent 4d59ebb5d8
commit 37fa2f0627
5 changed files with 70 additions and 3 deletions

View File

@ -0,0 +1,56 @@
[
{
"name": "mqttc_online",
"help": "mqtt客户端在线数据统计",
"type": "gauge",
"labels": []
},
{
"name": "mqttc_recv",
"help": "mqtt客户端收包数据统计",
"type": "gauge",
"labels": []
},
{
"name": "mqttc_send",
"help": "mqtt客户端发包数据统计",
"type": "gauge",
"labels": []
},
{
"name": "tcpc_online",
"help": "tcp客户端在线数据统计",
"type": "gauge",
"labels": []
},
{
"name": "tcpc_recv",
"help": "tcp客户端收包数据统计",
"type": "gauge",
"labels": []
},
{
"name": "tcpc_send",
"help": "tcp客户端发包数据统计",
"type": "gauge",
"labels": []
},
{
"name": "tcp_online",
"help": "tcp服务端在线数据统计",
"type": "gauge",
"labels": []
},
{
"name": "tcp_recv",
"help": "tcp服务端收包数据统计",
"type": "gauge",
"labels": []
},
{
"name": "tcp_send",
"help": "tcp服务端发包数据统计",
"type": "gauge",
"labels": []
}
]

View File

@ -33,6 +33,7 @@ start(_StartType, _StartArgs) ->
dgiot_cron_timer:start(),
start_plugin(Sup),
dgiot_pushgateway:start_link(),
dgiot_metrics:start(dgiot),
{ok, Sup}.
stop(_State) ->

View File

@ -63,7 +63,7 @@ init([#{<<"channel">> := ChannelId, <<"client">> := ClientId, <<"mod">> := Mod,
{ok, NewDclient} ->
process_flag(trap_exit, true),
rand:seed(exs1024),
Time = erlang:round(rand:uniform() * 4 + 1) * 1000,
Time = erlang:round(rand:uniform() * 200 + 1) * 20,
erlang:send_after(Time, self(), connect),
{ok, NewDclient, hibernate};
{stop, Reason} ->
@ -97,6 +97,7 @@ handle_cast(_Request, Dclient) ->
handle_info(connect, #dclient{userdata = #connect_state{options = Options, mod = Mod} = ConnectStat} = Dclient) ->
case connect(Options) of
{ok, ConnPid, Props} ->
dgiot_metrics:inc(dgiot, <<"mqttc_online">>, 1),
case Mod:handle_info({connect, ConnPid}, Dclient#dclient{userdata = ConnectStat#connect_state{props = Props, socket = ConnPid}}) of
{noreply, NewDclient} ->
{noreply, NewDclient};
@ -114,6 +115,7 @@ handle_info(connect, #dclient{userdata = #connect_state{options = Options, mod =
end;
handle_info({publish, #{topic := Topic, payload := Payload} = _Message}, #dclient{userdata = #connect_state{mod = Mod}} = Dclient) ->
dgiot_metrics:dec(dgiot, <<"mqttc_recv">>, 1),
case Mod:handle_info({publish, Topic, Payload}, Dclient) of
{noreply, NewDclient} ->
{noreply, NewDclient};
@ -122,6 +124,7 @@ handle_info({publish, #{topic := Topic, payload := Payload} = _Message}, #dclien
end;
handle_info({disconnected, shutdown, tcp_closed}, Dclient) ->
dgiot_metrics:dec(dgiot, <<"mqttc_online">>, 1),
{noreply, Dclient#dclient{userdata = #connect_state{socket = disconnect}}};
handle_info({'EXIT', _Conn, Reason}, #dclient{userdata = #connect_state{mod = Mod} = ConnectState} = Dclient) ->
@ -136,6 +139,7 @@ handle_info({'EXIT', _Conn, Reason}, #dclient{userdata = #connect_state{mod = Mo
end,
{noreply, NewDclient#dclient{userdata = ConnectState#connect_state{socket = disconnect}}};
{stop, Reason, NewDclient} ->
dgiot_metrics:dec(dgiot, <<"mqttc_online">>, 1),
{stop, Reason, NewDclient#dclient{userdata = ConnectState#connect_state{socket = disconnect}}}
end;

View File

@ -96,6 +96,7 @@ handle_info(do_connect, #dclient{userdata = #connect_state{count = Count, freq =
{noreply, NewDclient, hibernate};
handle_info({connection_ready, Socket}, #dclient{userdata = #connect_state{mod = Mod} = UserData} = Dclient) ->
dgiot_metrics:inc(dgiot, <<"tcpc_online">>, 1),
case Mod:handle_info(connection_ready, Dclient#dclient{userdata = UserData#connect_state{socket = Socket}}) of
{noreply, NewDclient} ->
inet:setopts(Socket, [{active, once}]),
@ -110,11 +111,13 @@ handle_info({send, _PayLoad}, #dclient{userdata = #connect_state{socket = undefi
handle_info({send, PayLoad}, #dclient{userdata = #connect_state{host = _Ip, port = _Port, socket = Socket}} = Dclient) ->
%% io:format("~s ~p ~p send to from ~p:~p : ~p ~n", [?FILE, ?LINE, self(), _Ip, _Port, dgiot_utils:to_hex(PayLoad)]),
gen_tcp:send(Socket, PayLoad),
dgiot_metrics:inc(dgiot, <<"tcpc_send">>, 1),
{noreply, Dclient, hibernate};
%% tcp server发送过来的报文
handle_info({tcp, Socket, Binary}, #dclient{userdata = #connect_state{host = _Ip, port = _Port, mod = Mod}} = Dclient) ->
%% io:format("~s ~p recv from ~p:~p ~p ~n", [?FILE, ?LINE, _Ip, _Port, dgiot_utils:to_hex(Binary)]),
dgiot_metrics:inc(dgiot, <<"tcpc_recv">>, 1),
NewBin =
case binary:referenced_byte_size(Binary) of
Large when Large > 2 * byte_size(Binary) ->
@ -135,6 +138,7 @@ handle_info({tcp_error, _Socket, _Reason}, Dclient) ->
{noreply, Dclient, hibernate};
handle_info({tcp_closed, Socket}, #dclient{channel = ChannelId, client = ClientId, userdata = #connect_state{mod = Mod}} = Dclient) ->
dgiot_metrics:dec(dgiot, <<"tcpc_online">>, 1),
gen_tcp:close(Socket),
case Mod:handle_info(tcp_closed, Dclient) of
{noreply, NewDclient} ->

View File

@ -55,6 +55,7 @@ start_link(Transport, Sock, Mod, Opts, State) ->
init(Mod, Transport, Opts, Sock0, State) ->
case Transport:wait(Sock0) of
{ok, Sock} ->
dgiot_metrics:inc(dgiot, <<"tcp_online">>, 1),
ChildState = #tcp{socket = Sock, register = false, transport = Transport, state = State},
case Mod:init(ChildState) of
{ok, NewChildState} ->
@ -103,7 +104,7 @@ handle_info({tcp_passive, _Sock}, State) ->
%% add register function
handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{register = false, buff = Buff, socket = Sock} = ChildState} = State) ->
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_recv">>, 1),
dgiot_metrics:inc(dgiot, <<"tcp_recv">>, 1),
Binary = iolist_to_binary(Data),
NewBin =
case binary:referenced_byte_size(Binary) of
@ -129,7 +130,7 @@ handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{register = false,
end;
handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{buff = Buff, socket = Sock} = ChildState} = State) ->
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_recv">>, 1),
dgiot_metrics:inc(dgiot, <<"tcp_recv">>, 1),
Binary = iolist_to_binary(Data),
NewBin =
case binary:referenced_byte_size(Binary) of
@ -174,6 +175,7 @@ handle_info({tcp_error, _Sock, Reason}, #state{child = ChildState} = State) ->
handle_info({tcp_closed, Sock}, #state{mod = Mod, child = #tcp{socket = Sock} = ChildState} = State) ->
write_log(ChildState#tcp.log, <<"ERROR">>, <<"tcp_closed">>),
dgiot_metrics:dec(dgiot, <<"tcp_online">>, 1),
?LOG(error, "tcp_closed ~p", [ChildState#tcp.state]),
case Mod:handle_info(tcp_closed, ChildState) of
{noreply, NewChild} ->