dgiot/src/emqx_metrics.erl
2021-08-16 15:07:42 +08:00

579 lines
21 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_metrics).
-behavior(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include("emqx_mqtt.hrl").
-include("emqx.hrl").
-logger_header("[Metrics]").
-export([ start_link/0
, stop/0
]).
-export([ new/1
, new/2
, ensure/1
, ensure/2
, all/0
]).
-export([ val/1
, inc/1
, inc/2
, dec/1
, dec/2
, set/2
]).
-export([ trans/2
, trans/3
, commit/0
]).
%% Inc received/sent metrics
-export([ inc_msg/1
, inc_recv/1
, inc_sent/1
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
%% BACKW: v4.3.0
-export([ upgrade_retained_delayed_counter_type/0
]).
-export_type([metric_idx/0]).
-compile({inline, [inc/1, inc/2, dec/1, dec/2]}).
-compile({inline, [inc_recv/1, inc_sent/1]}).
-opaque(metric_idx() :: 1..1024).
-type(metric_name() :: atom() | string() | binary()).
-define(MAX_SIZE, 1024).
-define(RESERVED_IDX, 512).
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
%% Bytes sent and received
-define(BYTES_METRICS,
[{counter, 'bytes.received'}, % Total bytes received
{counter, 'bytes.sent'} % Total bytes sent
]).
%% Packets sent and received
-define(PACKET_METRICS,
[{counter, 'packets.received'}, % All Packets received
{counter, 'packets.sent'}, % All Packets sent
{counter, 'packets.connect.received'}, % CONNECT Packets received
{counter, 'packets.connack.sent'}, % CONNACK Packets sent
{counter, 'packets.connack.error'}, % CONNACK error sent
{counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent
{counter, 'packets.publish.received'}, % PUBLISH packets received
{counter, 'packets.publish.sent'}, % PUBLISH packets sent
{counter, 'packets.publish.inuse'}, % PUBLISH packet_id inuse
{counter, 'packets.publish.error'}, % PUBLISH failed for error
{counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error
{counter, 'packets.publish.dropped'}, % PUBLISH(QoS2) packets dropped
{counter, 'packets.puback.received'}, % PUBACK packets received
{counter, 'packets.puback.sent'}, % PUBACK packets sent
{counter, 'packets.puback.inuse'}, % PUBACK packet_id inuse
{counter, 'packets.puback.missed'}, % PUBACK packets missed
{counter, 'packets.pubrec.received'}, % PUBREC packets received
{counter, 'packets.pubrec.sent'}, % PUBREC packets sent
{counter, 'packets.pubrec.inuse'}, % PUBREC packet_id inuse
{counter, 'packets.pubrec.missed'}, % PUBREC packets missed
{counter, 'packets.pubrel.received'}, % PUBREL packets received
{counter, 'packets.pubrel.sent'}, % PUBREL packets sent
{counter, 'packets.pubrel.missed'}, % PUBREL packets missed
{counter, 'packets.pubcomp.received'}, % PUBCOMP packets received
{counter, 'packets.pubcomp.sent'}, % PUBCOMP packets sent
{counter, 'packets.pubcomp.inuse'}, % PUBCOMP packet_id inuse
{counter, 'packets.pubcomp.missed'}, % PUBCOMP packets missed
{counter, 'packets.subscribe.received'}, % SUBSCRIBE Packets received
{counter, 'packets.subscribe.error'}, % SUBSCRIBE error
{counter, 'packets.subscribe.auth_error'}, % SUBSCRIBE failed for not auth
{counter, 'packets.suback.sent'}, % SUBACK packets sent
{counter, 'packets.unsubscribe.received'}, % UNSUBSCRIBE Packets received
{counter, 'packets.unsubscribe.error'}, % UNSUBSCRIBE error
{counter, 'packets.unsuback.sent'}, % UNSUBACK Packets sent
{counter, 'packets.pingreq.received'}, % PINGREQ packets received
{counter, 'packets.pingresp.sent'}, % PINGRESP Packets sent
{counter, 'packets.disconnect.received'}, % DISCONNECT Packets received
{counter, 'packets.disconnect.sent'}, % DISCONNECT Packets sent
{counter, 'packets.auth.received'}, % Auth Packets received
{counter, 'packets.auth.sent'} % Auth Packets sent
]).
%% Messages sent/received and pubsub
-define(MESSAGE_METRICS,
[{counter, 'messages.received'}, % All Messages received
{counter, 'messages.sent'}, % All Messages sent
{counter, 'messages.qos0.received'}, % QoS0 Messages received
{counter, 'messages.qos0.sent'}, % QoS0 Messages sent
{counter, 'messages.qos1.received'}, % QoS1 Messages received
{counter, 'messages.qos1.sent'}, % QoS1 Messages sent
{counter, 'messages.qos2.received'}, % QoS2 Messages received
{counter, 'messages.qos2.sent'}, % QoS2 Messages sent
%% PubSub Metrics
{counter, 'messages.publish'}, % Messages Publish
{counter, 'messages.dropped'}, % Messages dropped due to no subscribers
{counter, 'messages.dropped.expired'}, % QoS2 Messages expired
{counter, 'messages.dropped.no_subscribers'}, % Messages dropped
{counter, 'messages.forward'}, % Messages forward
{counter, 'messages.retained'}, % Messages retained
{counter, 'messages.delayed'}, % Messages delayed
{counter, 'messages.delivered'}, % Messages delivered
{counter, 'messages.acked'} % Messages acked
]).
%% Delivery metrics
-define(DELIVERY_METRICS,
[{counter, 'delivery.dropped'},
{counter, 'delivery.dropped.no_local'},
{counter, 'delivery.dropped.too_large'},
{counter, 'delivery.dropped.qos0_msg'},
{counter, 'delivery.dropped.queue_full'},
{counter, 'delivery.dropped.expired'}
]).
%% Client Lifecircle metrics
-define(CLIENT_METRICS,
[{counter, 'client.connect'},
{counter, 'client.connack'},
{counter, 'client.connected'},
{counter, 'client.authenticate'},
{counter, 'client.auth.anonymous'},
{counter, 'client.check_acl'},
{counter, 'client.subscribe'},
{counter, 'client.unsubscribe'},
{counter, 'client.disconnected'}
]).
%% Session Lifecircle metrics
-define(SESSION_METRICS,
[{counter, 'session.created'},
{counter, 'session.resumed'},
{counter, 'session.takeovered'},
{counter, 'session.discarded'},
{counter, 'session.terminated'}
]).
-record(state, {next_idx = 1}).
-record(metric, {name, type, idx}).
%% @doc Start the metrics server.
-spec(start_link() -> startlink_ret()).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(stop() -> ok).
stop() -> gen_server:stop(?SERVER).
%% BACKW: v4.3.0
upgrade_retained_delayed_counter_type() ->
Ks = ['messages.retained', 'messages.delayed'],
gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity).
%%--------------------------------------------------------------------
%% Metrics API
%%--------------------------------------------------------------------
-spec(new(metric_name()) -> ok).
new(Name) ->
new(counter, Name).
-spec(new(gauge|counter, metric_name()) -> ok).
new(gauge, Name) ->
create(gauge, Name);
new(counter, Name) ->
create(counter, Name).
-spec(ensure(metric_name()) -> ok).
ensure(Name) ->
ensure(counter, Name).
-spec(ensure(gauge|counter, metric_name()) -> ok).
ensure(Type, Name) when Type =:= gauge; Type =:= counter ->
case ets:lookup(?TAB, Name) of
[] -> create(Type, Name);
_ -> ok
end.
%% @private
create(Type, Name) ->
case gen_server:call(?SERVER, {create, Type, Name}) of
{ok, _Idx} -> ok;
{error, Reason} -> error(Reason)
end.
%% @doc Get all metrics
-spec(all() -> [{metric_name(), non_neg_integer()}]).
all() ->
CRef = persistent_term:get(?MODULE),
[{Name, counters:get(CRef, Idx)}
|| #metric{name = Name, idx = Idx} <- ets:tab2list(?TAB)].
%% @doc Get metric value
-spec(val(metric_name()) -> maybe(non_neg_integer())).
val(Name) ->
case ets:lookup(?TAB, Name) of
[#metric{idx = Idx}] ->
CRef = persistent_term:get(?MODULE),
counters:get(CRef, Idx);
[] -> undefined
end.
%% @doc Increase counter
-spec(inc(metric_name()) -> ok).
inc(Name) ->
inc(Name, 1).
%% @doc Increase metric value
-spec(inc(metric_name(), pos_integer()) -> ok).
inc(Name, Value) ->
update_counter(Name, Value).
%% @doc Decrease metric value
-spec(dec(metric_name()) -> ok).
dec(Name) ->
dec(Name, 1).
%% @doc Decrease metric value
-spec(dec(metric_name(), pos_integer()) -> ok).
dec(Name, Value) ->
update_counter(Name, -Value).
%% @doc Set metric value
-spec(set(metric_name(), integer()) -> ok).
set(Name, Value) ->
CRef = persistent_term:get(?MODULE),
Idx = ets:lookup_element(?TAB, Name, 4),
counters:put(CRef, Idx, Value).
-spec(trans(inc | dec, metric_name()) -> ok).
trans(Op, Name) when Op =:= inc; Op =:= dec ->
trans(Op, Name, 1).
-spec(trans(inc | dec, metric_name(), pos_integer()) -> ok).
trans(inc, Name, Value) ->
cache(Name, Value);
trans(dec, Name, Value) ->
cache(Name, -Value).
-spec(cache(metric_name(), integer()) -> ok).
cache(Name, Value) ->
put('$metrics', case get('$metrics') of
undefined ->
#{Name => Value};
Metrics ->
maps:update_with(Name, fun(Cnt) -> Cnt + Value end, Value, Metrics)
end),
ok.
-spec(commit() -> ok).
commit() ->
case get('$metrics') of
undefined -> ok;
Metrics ->
_ = erase('$metrics'),
lists:foreach(fun update_counter/1, maps:to_list(Metrics))
end.
update_counter({Name, Value}) ->
update_counter(Name, Value).
update_counter(Name, Value) ->
CRef = persistent_term:get(?MODULE),
CIdx = case reserved_idx(Name) of
Idx when is_integer(Idx) -> Idx;
undefined ->
ets:lookup_element(?TAB, Name, 4)
end,
counters:add(CRef, CIdx, Value).
%%--------------------------------------------------------------------
%% Inc received/sent metrics
%%--------------------------------------------------------------------
-spec(inc_msg(emqx_types:massage()) -> ok).
inc_msg(Msg) ->
case Msg#message.qos of
0 -> inc('messages.qos0.received');
1 -> inc('messages.qos1.received');
2 -> inc('messages.qos2.received')
end,
inc('messages.received').
%% @doc Inc packets received.
-spec(inc_recv(emqx_types:packet()) -> ok).
inc_recv(Packet) ->
inc('packets.received'),
do_inc_recv(Packet).
do_inc_recv(?PACKET(?CONNECT)) ->
inc('packets.connect.received');
do_inc_recv(?PUBLISH_PACKET(QoS)) ->
inc('messages.received'),
case QoS of
?QOS_0 -> inc('messages.qos0.received');
?QOS_1 -> inc('messages.qos1.received');
?QOS_2 -> inc('messages.qos2.received');
_other -> ok
end,
inc('packets.publish.received');
do_inc_recv(?PACKET(?PUBACK)) ->
inc('packets.puback.received');
do_inc_recv(?PACKET(?PUBREC)) ->
inc('packets.pubrec.received');
do_inc_recv(?PACKET(?PUBREL)) ->
inc('packets.pubrel.received');
do_inc_recv(?PACKET(?PUBCOMP)) ->
inc('packets.pubcomp.received');
do_inc_recv(?PACKET(?SUBSCRIBE)) ->
inc('packets.subscribe.received');
do_inc_recv(?PACKET(?UNSUBSCRIBE)) ->
inc('packets.unsubscribe.received');
do_inc_recv(?PACKET(?PINGREQ)) ->
inc('packets.pingreq.received');
do_inc_recv(?PACKET(?DISCONNECT)) ->
inc('packets.disconnect.received');
do_inc_recv(?PACKET(?AUTH)) ->
inc('packets.auth.received');
do_inc_recv(_Packet) -> ok.
%% @doc Inc packets sent. Will not count $SYS PUBLISH.
-spec(inc_sent(emqx_types:packet()) -> ok).
inc_sent(?PUBLISH_PACKET(_QoS, <<"$SYS/", _/binary>>, _, _)) ->
ok;
inc_sent(Packet) ->
inc('packets.sent'),
do_inc_sent(Packet).
do_inc_sent(?CONNACK_PACKET(ReasonCode)) ->
(ReasonCode == ?RC_SUCCESS) orelse inc('packets.connack.error'),
(ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.connack.auth_error'),
(ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'),
inc('packets.connack.sent');
do_inc_sent(?PUBLISH_PACKET(QoS)) ->
inc('messages.sent'),
case QoS of
?QOS_0 -> inc('messages.qos0.sent');
?QOS_1 -> inc('messages.qos1.sent');
?QOS_2 -> inc('messages.qos2.sent');
_other -> ok
end,
inc('packets.publish.sent');
do_inc_sent(?PUBACK_PACKET(_PacketId, ReasonCode)) ->
(ReasonCode >= ?RC_UNSPECIFIED_ERROR) andalso inc('packets.publish.error'),
(ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.publish.auth_error'),
inc('packets.puback.sent');
do_inc_sent(?PUBREC_PACKET(_PacketId, ReasonCode)) ->
(ReasonCode >= ?RC_UNSPECIFIED_ERROR) andalso inc('packets.publish.error'),
(ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.publish.auth_error'),
inc('packets.pubrec.sent');
do_inc_sent(?PACKET(?PUBREL)) ->
inc('packets.pubrel.sent');
do_inc_sent(?PACKET(?PUBCOMP)) ->
inc('packets.pubcomp.sent');
do_inc_sent(?PACKET(?SUBACK)) ->
inc('packets.suback.sent');
do_inc_sent(?PACKET(?UNSUBACK)) ->
inc('packets.unsuback.sent');
do_inc_sent(?PACKET(?PINGRESP)) ->
inc('packets.pingresp.sent');
do_inc_sent(?PACKET(?DISCONNECT)) ->
inc('packets.disconnect.sent');
do_inc_sent(?PACKET(?AUTH)) ->
inc('packets.auth.sent');
do_inc_sent(_Packet) -> ok.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
% Create counters array
CRef = counters:new(?MAX_SIZE, [write_concurrency]),
ok = persistent_term:put(?MODULE, CRef),
% Create index mapping table
ok = emqx_tables:new(?TAB, [{keypos, 2}, {read_concurrency, true}]),
Metrics = lists:append([?BYTES_METRICS,
?PACKET_METRICS,
?MESSAGE_METRICS,
?DELIVERY_METRICS,
?CLIENT_METRICS,
?SESSION_METRICS
]),
% Store reserved indices
ok = lists:foreach(fun({Type, Name}) ->
Idx = reserved_idx(Name),
Metric = #metric{name = Name, type = Type, idx = Idx},
true = ets:insert(?TAB, Metric),
ok = counters:put(CRef, Idx, 0)
end, Metrics),
{ok, #state{next_idx = ?RESERVED_IDX + 1}, hibernate}.
handle_call({create, Type, Name}, _From, State = #state{next_idx = ?MAX_SIZE}) ->
?LOG(error, "Failed to create ~s:~s for index exceeded.", [Type, Name]),
{reply, {error, metric_index_exceeded}, State};
handle_call({create, Type, Name}, _From, State = #state{next_idx = NextIdx}) ->
case ets:lookup(?TAB, Name) of
[#metric{idx = Idx}] ->
?LOG(info, "~s already exists.", [Name]),
{reply, {ok, Idx}, State};
[] ->
Metric = #metric{name = Name, type = Type, idx = NextIdx},
true = ets:insert(?TAB, Metric),
{reply, {ok, NextIdx}, State#state{next_idx = NextIdx + 1}}
end;
handle_call({set_type_to_counter, Keys}, _From, State) ->
lists:foreach(
fun(K) ->
ets:update_element(?TAB, K, {#metric.type, counter})
end, Keys),
{reply, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
reserved_idx('bytes.received') -> 01;
reserved_idx('bytes.sent') -> 02;
%% Reserved indices of packet's metrics
reserved_idx('packets.received') -> 10;
reserved_idx('packets.sent') -> 11;
reserved_idx('packets.connect.received') -> 12;
reserved_idx('packets.connack.sent') -> 13;
reserved_idx('packets.connack.error') -> 14;
reserved_idx('packets.connack.auth_error') -> 15;
reserved_idx('packets.publish.received') -> 16;
reserved_idx('packets.publish.sent') -> 17;
reserved_idx('packets.publish.inuse') -> 18;
reserved_idx('packets.publish.error') -> 19;
reserved_idx('packets.publish.auth_error') -> 20;
reserved_idx('packets.puback.received') -> 21;
reserved_idx('packets.puback.sent') -> 22;
reserved_idx('packets.puback.inuse') -> 23;
reserved_idx('packets.puback.missed') -> 24;
reserved_idx('packets.pubrec.received') -> 25;
reserved_idx('packets.pubrec.sent') -> 26;
reserved_idx('packets.pubrec.inuse') -> 27;
reserved_idx('packets.pubrec.missed') -> 28;
reserved_idx('packets.pubrel.received') -> 29;
reserved_idx('packets.pubrel.sent') -> 30;
reserved_idx('packets.pubrel.missed') -> 31;
reserved_idx('packets.pubcomp.received') -> 32;
reserved_idx('packets.pubcomp.sent') -> 33;
reserved_idx('packets.pubcomp.inuse') -> 34;
reserved_idx('packets.pubcomp.missed') -> 35;
reserved_idx('packets.subscribe.received') -> 36;
reserved_idx('packets.subscribe.error') -> 37;
reserved_idx('packets.subscribe.auth_error') -> 38;
reserved_idx('packets.suback.sent') -> 39;
reserved_idx('packets.unsubscribe.received') -> 40;
reserved_idx('packets.unsubscribe.error') -> 41;
reserved_idx('packets.unsuback.sent') -> 42;
reserved_idx('packets.pingreq.received') -> 43;
reserved_idx('packets.pingresp.sent') -> 44;
reserved_idx('packets.disconnect.received') -> 45;
reserved_idx('packets.disconnect.sent') -> 46;
reserved_idx('packets.auth.received') -> 47;
reserved_idx('packets.auth.sent') -> 48;
reserved_idx('packets.publish.dropped') -> 49;
%% Reserved indices of message's metrics
reserved_idx('messages.received') -> 100;
reserved_idx('messages.sent') -> 101;
reserved_idx('messages.qos0.received') -> 102;
reserved_idx('messages.qos0.sent') -> 103;
reserved_idx('messages.qos1.received') -> 104;
reserved_idx('messages.qos1.sent') -> 105;
reserved_idx('messages.qos2.received') -> 106;
reserved_idx('messages.qos2.sent') -> 107;
reserved_idx('messages.publish') -> 108;
reserved_idx('messages.dropped') -> 109;
reserved_idx('messages.dropped.expired') -> 110;
reserved_idx('messages.dropped.no_subscribers') -> 111;
reserved_idx('messages.forward') -> 112;
reserved_idx('messages.retained') -> 113;
reserved_idx('messages.delayed') -> 114;
reserved_idx('messages.delivered') -> 115;
reserved_idx('messages.acked') -> 116;
reserved_idx('delivery.expired') -> 117;
reserved_idx('delivery.dropped') -> 118;
reserved_idx('delivery.dropped.no_local') -> 119;
reserved_idx('delivery.dropped.too_large') -> 120;
reserved_idx('delivery.dropped.qos0_msg') -> 121;
reserved_idx('delivery.dropped.queue_full') -> 122;
reserved_idx('delivery.dropped.expired') -> 123;
reserved_idx('client.connect') -> 200;
reserved_idx('client.connack') -> 201;
reserved_idx('client.connected') -> 202;
reserved_idx('client.authenticate') -> 203;
reserved_idx('client.enhanced_authenticate') -> 204;
reserved_idx('client.auth.anonymous') -> 205;
reserved_idx('client.check_acl') -> 206;
reserved_idx('client.subscribe') -> 207;
reserved_idx('client.unsubscribe') -> 208;
reserved_idx('client.disconnected') -> 209;
reserved_idx('session.created') -> 220;
reserved_idx('session.resumed') -> 221;
reserved_idx('session.takeovered') -> 222;
reserved_idx('session.discarded') -> 223;
reserved_idx('session.terminated') -> 224;
reserved_idx(_) -> undefined.