add dgiot_grpc

This commit is contained in:
lsxredrain 2021-08-23 19:36:50 +08:00
parent 115464c038
commit 3f0de65394
16 changed files with 295 additions and 17534 deletions

View File

@ -297,7 +297,7 @@ do_request(Populated, Req0, State = #state{
context = Context
}) ->
Args = [OperationID, Populated, Context, Req0],
?LOG(info,"~p ~p",[OperationID,dgiot_data:get(?DGIOT_SWAGGER,OperationID)]),
?LOG(debug,"~p ~p",[OperationID,dgiot_data:get(?DGIOT_SWAGGER,OperationID)]),
Result =
case IsMock of
true ->

View File

@ -102,7 +102,7 @@ class HookProvider(exhook_pb2_grpc.HookProviderServicer):
def OnMessagePublish(self, request, context):
nmsg = request.message
nmsg.payload = b"hardcode payload by exhook-svr-python111 :)"
print(nmsg.payload)
reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
return reply

View File

@ -1,407 +0,0 @@
//------------------------------------------------------------------------------
// Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//------------------------------------------------------------------------------
syntax = "proto3";
option csharp_namespace = "dgiot.Exhook.V1";
option go_package = "dgiot.io/grpc/exhook";
option java_multiple_files = true;
option java_package = "io.dgiot.exhook";
option java_outer_classname = "DgiotExHookProto";
package dgiot.exhook.v1;
service HookProvider {
rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
}
//------------------------------------------------------------------------------
// Request & Response
//------------------------------------------------------------------------------
message ProviderLoadedRequest {
BrokerInfo broker = 1;
}
message LoadedResponse {
repeated HookSpec hooks = 1;
}
message ProviderUnloadedRequest { }
message ClientConnectRequest {
ConnInfo conninfo = 1;
// MQTT CONNECT packet's properties (MQTT v5.0)
//
// It should be empty on MQTT v3.1.1/v3.1 or others protocol
repeated Property props = 2;
}
message ClientConnackRequest {
ConnInfo conninfo = 1;
string result_code = 2;
repeated Property props = 3;
}
message ClientConnectedRequest {
ClientInfo clientinfo = 1;
}
message ClientDisconnectedRequest {
ClientInfo clientinfo = 1;
string reason = 2;
}
message ClientAuthenticateRequest {
ClientInfo clientinfo = 1;
bool result = 2;
}
message ClientCheckAclRequest {
ClientInfo clientinfo = 1;
enum AclReqType {
PUBLISH = 0;
SUBSCRIBE = 1;
}
AclReqType type = 2;
string topic = 3;
bool result = 4;
}
message ClientSubscribeRequest {
ClientInfo clientinfo = 1;
repeated Property props = 2;
repeated TopicFilter topic_filters = 3;
}
message ClientUnsubscribeRequest {
ClientInfo clientinfo = 1;
repeated Property props = 2;
repeated TopicFilter topic_filters = 3;
}
message SessionCreatedRequest {
ClientInfo clientinfo = 1;
}
message SessionSubscribedRequest {
ClientInfo clientinfo = 1;
string topic = 2;
SubOpts subopts = 3;
}
message SessionUnsubscribedRequest {
ClientInfo clientinfo = 1;
string topic = 2;
}
message SessionResumedRequest {
ClientInfo clientinfo = 1;
}
message SessionDiscardedRequest {
ClientInfo clientinfo = 1;
}
message SessionTakeoveredRequest {
ClientInfo clientinfo = 1;
}
message SessionTerminatedRequest {
ClientInfo clientinfo = 1;
string reason = 2;
}
message MessagePublishRequest {
Message message = 1;
}
message MessageDeliveredRequest {
ClientInfo clientinfo = 1;
Message message = 2;
}
message MessageDroppedRequest {
Message message = 1;
string reason = 2;
}
message MessageAckedRequest {
ClientInfo clientinfo = 1;
Message message = 2;
}
//------------------------------------------------------------------------------
// Basic data types
//------------------------------------------------------------------------------
message EmptySuccess { }
message ValuedResponse {
// The responsed value type
// - contiune: Use the responsed value and execute the next hook
// - ignore: Ignore the responsed value
// - stop_and_return: Use the responsed value and stop the chain executing
enum ResponsedType {
CONTINUE = 0;
IGNORE = 1;
STOP_AND_RETURN = 2;
}
ResponsedType type = 1;
oneof value {
// Boolean result, used on the 'client.authenticate', 'client.check_acl' hooks
bool bool_result = 3;
// Message result, used on the 'message.*' hooks
Message message = 4;
}
}
message BrokerInfo {
string version = 1;
string sysdescr = 2;
string uptime = 3;
string datetime = 4;
}
message HookSpec {
// The registered hooks name
//
// Available value:
// "client.connect", "client.connack"
// "client.connected", "client.disconnected"
// "client.authenticate", "client.check_acl"
// "client.subscribe", "client.unsubscribe"
//
// "session.created", "session.subscribed"
// "session.unsubscribed", "session.resumed"
// "session.discarded", "session.takeovered"
// "session.terminated"
//
// "message.publish", "message.delivered"
// "message.acked", "message.dropped"
string name = 1;
// The topic filters for message hooks
repeated string topics = 2;
}
message ConnInfo {
string node = 1;
string clientid = 2;
string username = 3;
string peerhost = 4;
uint32 sockport = 5;
string proto_name = 6;
string proto_ver = 7;
uint32 keepalive = 8;
}
message ClientInfo {
string node = 1;
string clientid = 2;
string username = 3;
string password = 4;
string peerhost = 5;
uint32 sockport = 6;
string protocol = 7;
string mountpoint = 8;
bool is_superuser = 9;
bool anonymous = 10;
// common name of client TLS cert
string cn = 11;
// subject of client TLS cert
string dn = 12;
}
message Message {
string node = 1;
string id = 2;
uint32 qos = 3;
string from = 4;
string topic = 5;
bytes payload = 6;
uint64 timestamp = 7;
}
message Property {
string name = 1;
string value = 2;
}
message TopicFilter {
string name = 1;
uint32 qos = 2;
}
message SubOpts {
// The QoS level
uint32 qos = 1;
// The group name for shared subscription
string share = 2;
// The Retain Handling option (MQTT v5.0)
//
// 0 = Send retained messages at the time of the subscribe
// 1 = Send retained messages at subscribe only if the subscription does
// not currently exist
// 2 = Do not send retained messages at the time of the subscribe
uint32 rh = 3;
// The Retain as Published option (MQTT v5.0)
//
// If 1, Application Messages forwarded using this subscription keep the
// RETAIN flag they were published with.
// If 0, Application Messages forwarded using this subscription have the
// RETAIN flag set to 0.
// Retained messages sent when the subscription is established have the RETAIN flag set to 1.
uint32 rap = 4;
// The No Local option (MQTT v5.0)
//
// If the value is 1, Application Messages MUST NOT be forwarded to a
// connection with a ClientID equal to the ClientID of the publishing
uint32 nl = 5;
}

View File

@ -1,324 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_exhook_handler).
-include("dgiot_grpc.hrl").
-include_lib("dgiot/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
-export([ on_client_connect/2
, on_client_connack/3
, on_client_connected/2
, on_client_disconnected/3
, on_client_authenticate/2
, on_client_check_acl/4
, on_client_subscribe/3
, on_client_unsubscribe/3
]).
%% Session Lifecircle Hooks
-export([ on_session_created/2
, on_session_subscribed/3
, on_session_unsubscribed/3
, on_session_resumed/2
, on_session_discarded/2
, on_session_takeovered/2
, on_session_terminated/3
]).
-export([ on_message_publish/1
, on_message_dropped/3
, on_message_delivered/2
, on_message_acked/2
]).
%% Utils
-export([ message/1
, stringfy/1
, merge_responsed_bool/2
, merge_responsed_message/2
, assign_to_message/2
, clientinfo/1
]).
-import(emqx_exhook,
[ cast/2
, call_fold/3
]).
%%--------------------------------------------------------------------
%% Clients
%%--------------------------------------------------------------------
on_client_connect(ConnInfo, Props) ->
Req = #{conninfo => conninfo(ConnInfo),
props => properties(Props)
},
cast('client.connect', Req).
on_client_connack(ConnInfo, Rc, Props) ->
Req = #{conninfo => conninfo(ConnInfo),
result_code => stringfy(Rc),
props => properties(Props)},
cast('client.connack', Req).
on_client_connected(ClientInfo, _ConnInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo)},
cast('client.connected', Req).
on_client_disconnected(ClientInfo, Reason, _ConnInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo),
reason => stringfy(Reason)
},
cast('client.disconnected', Req).
on_client_authenticate(ClientInfo, AuthResult) ->
%% XXX: Bool is missing more information about the atom of the result
%% So, the `Req` has missed detailed info too.
%%
%% The return value of `call_fold` just a bool, that has missed
%% detailed info too.
%%
Bool = maps:get(auth_result, AuthResult, undefined) == success,
Req = #{clientinfo => clientinfo(ClientInfo),
result => Bool
},
case call_fold('client.authenticate', Req,
fun merge_responsed_bool/2) of
{StopOrOk, #{result := Result0}} when is_boolean(Result0) ->
Result = case Result0 of true -> success; _ -> not_authorized end,
{StopOrOk, AuthResult#{auth_result => Result, anonymous => false}};
_ ->
{ok, AuthResult}
end.
on_client_check_acl(ClientInfo, PubSub, Topic, Result) ->
Bool = Result == allow,
Type = case PubSub of
publish -> 'PUBLISH';
subscribe -> 'SUBSCRIBE'
end,
Req = #{clientinfo => clientinfo(ClientInfo),
type => Type,
topic => Topic,
result => Bool
},
case call_fold('client.check_acl', Req,
fun merge_responsed_bool/2) of
{StopOrOk, #{result := Result0}} when is_boolean(Result0) ->
NResult = case Result0 of true -> allow; _ -> deny end,
{StopOrOk, NResult};
_ -> {ok, Result}
end.
on_client_subscribe(ClientInfo, Props, TopicFilters) ->
Req = #{clientinfo => clientinfo(ClientInfo),
props => properties(Props),
topic_filters => topicfilters(TopicFilters)
},
cast('client.subscribe', Req).
on_client_unsubscribe(ClientInfo, Props, TopicFilters) ->
Req = #{clientinfo => clientinfo(ClientInfo),
props => properties(Props),
topic_filters => topicfilters(TopicFilters)
},
cast('client.unsubscribe', Req).
%%--------------------------------------------------------------------
%% Session
%%--------------------------------------------------------------------
on_session_created(ClientInfo, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo)},
cast('session.created', Req).
on_session_subscribed(ClientInfo, Topic, SubOpts) ->
Req = #{clientinfo => clientinfo(ClientInfo),
topic => Topic,
subopts => maps:with([qos, share, rh, rap, nl], SubOpts)
},
cast('session.subscribed', Req).
on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
Req = #{clientinfo => clientinfo(ClientInfo),
topic => Topic
},
cast('session.unsubscribed', Req).
on_session_resumed(ClientInfo, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo)},
cast('session.resumed', Req).
on_session_discarded(ClientInfo, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo)},
cast('session.discarded', Req).
on_session_takeovered(ClientInfo, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo)},
cast('session.takeovered', Req).
on_session_terminated(ClientInfo, Reason, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo),
reason => stringfy(Reason)},
cast('session.terminated', Req).
%%--------------------------------------------------------------------
%% Message
%%--------------------------------------------------------------------
on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) ->
ok;
on_message_publish(Message) ->
Req = #{message => message(Message)},
case call_fold('message.publish', Req,
fun emqx_exhook_handler:merge_responsed_message/2) of
{StopOrOk, #{message := NMessage}} ->
{StopOrOk, assign_to_message(NMessage, Message)};
_ ->
{ok, Message}
end.
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) ->
ok;
on_message_dropped(Message, _By, Reason) ->
Req = #{message => message(Message),
reason => stringfy(Reason)
},
cast('message.dropped', Req).
on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
ok;
on_message_delivered(ClientInfo, Message) ->
Req = #{clientinfo => clientinfo(ClientInfo),
message => message(Message)
},
cast('message.delivered', Req).
on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
ok;
on_message_acked(ClientInfo, Message) ->
Req = #{clientinfo => clientinfo(ClientInfo),
message => message(Message)
},
cast('message.acked', Req).
%%--------------------------------------------------------------------
%% Types
properties(undefined) -> [];
properties(M) when is_map(M) ->
maps:fold(fun(K, V, Acc) ->
[#{name => stringfy(K),
value => stringfy(V)} | Acc]
end, [], M).
conninfo(_ConnInfo =
#{clientid := ClientId, username := Username, peername := {Peerhost, _},
sockname := {_, SockPort}, proto_name := ProtoName, proto_ver := ProtoVer,
keepalive := Keepalive}) ->
#{node => stringfy(node()),
clientid => ClientId,
username => maybe(Username),
peerhost => ntoa(Peerhost),
sockport => SockPort,
proto_name => ProtoName,
proto_ver => stringfy(ProtoVer),
keepalive => Keepalive}.
clientinfo(ClientInfo =
#{clientid := ClientId, username := Username, peerhost := PeerHost,
sockport := SockPort, protocol := Protocol, mountpoint := Mountpoiont}) ->
#{node => stringfy(node()),
clientid => ClientId,
username => maybe(Username),
password => maybe(maps:get(password, ClientInfo, undefined)),
peerhost => ntoa(PeerHost),
sockport => SockPort,
protocol => stringfy(Protocol),
mountpoint => maybe(Mountpoiont),
is_superuser => maps:get(is_superuser, ClientInfo, false),
anonymous => maps:get(anonymous, ClientInfo, true),
cn => maybe(maps:get(cn, ClientInfo, undefined)),
dn => maybe(maps:get(dn, ClientInfo, undefined))}.
message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) ->
#{node => stringfy(node()),
id => emqx_guid:to_hexstr(Id),
qos => Qos,
from => stringfy(From),
topic => Topic,
payload => Payload,
timestamp => Ts}.
assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) ->
Message#message{qos = Qos, topic = Topic, payload = Payload}.
topicfilters(Tfs) when is_list(Tfs) ->
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
ntoa(IP) ->
list_to_binary(inet_parse:ntoa(IP)).
maybe(undefined) -> <<>>;
maybe(B) -> B.
%% @private
stringfy(Term) when is_binary(Term) ->
Term;
stringfy(Term) when is_integer(Term) ->
integer_to_binary(Term);
stringfy(Term) when is_atom(Term) ->
atom_to_binary(Term, utf8);
stringfy(Term) ->
unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
%%--------------------------------------------------------------------
%% Acc funcs
%% see exhook.proto
merge_responsed_bool(_Req, #{type := 'IGNORE'}) ->
ignore;
merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}})
when is_boolean(NewBool) ->
NReq = Req#{result => NewBool},
case Type of
'CONTINUE' -> {ok, NReq};
'STOP_AND_RETURN' ->
?LOG(error,"NReq ~p",[NReq]),
{stop, NReq}
end;
merge_responsed_bool(_Req, Resp) ->
?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
ignore.
merge_responsed_message(_Req, #{type := 'IGNORE'}) ->
ignore;
merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) ->
NReq = Req#{message => NMessage},
case Type of
'CONTINUE' -> {ok, NReq};
'STOP_AND_RETURN' ->
?LOG(error,"NReq ~p",[NReq]),
{stop, NReq}
end;
merge_responsed_message(_Req, Resp) ->
?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
ignore.

View File

@ -1,325 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_exhook_server).
-include("dgiot_grpc.hrl").
-include_lib("dgiot/include/logger.hrl").
-define(CNTER, emqx_exhook_counter).
-define(PB_CLIENT_MOD, dgiot_exhook_v_1_hook_provider_client).
%% Load/Unload
-export([ load/2
, unload/1
]).
%% APIs
-export([call/3]).
%% Infos
-export([ name/1
, format/1
]).
-record(server, {
%% Server name (equal to grpc client channel name)
name :: server_name(),
%% The server started options
options :: list(),
%% gRPC channel pid
channel :: pid(),
%% Registered hook names and options
hookspec :: #{hookpoint() => map()},
%% Metrcis name prefix
prefix :: list()
}).
-type server_name() :: string().
-type server() :: #server{}.
-type hookpoint() :: 'client.connect'
| 'client.connack'
| 'client.connected'
| 'client.disconnected'
| 'client.authenticate'
| 'client.check_acl'
| 'client.subscribe'
| 'client.unsubscribe'
| 'session.created'
| 'session.subscribed'
| 'session.unsubscribed'
| 'session.resumed'
| 'session.discarded'
| 'session.takeovered'
| 'session.terminated'
| 'message.publish'
| 'message.delivered'
| 'message.acked'
| 'message.dropped'.
-export_type([server/0]).
-dialyzer({nowarn_function, [inc_metrics/2]}).
%%--------------------------------------------------------------------
%% Load/Unload APIs
%%--------------------------------------------------------------------
-spec load(atom(), list()) -> {ok, server()} | {error, term()} .
load(Name0, Opts0) ->
Name = to_list(Name0),
{SvrAddr, ClientOpts} = channel_opts(Opts0),
case emqx_exhook_sup:start_grpc_client_channel(
Name,
SvrAddr,
ClientOpts) of
{ok, _ChannPoolPid} ->
case do_init(Name) of
{ok, HookSpecs} ->
%% Reigster metrics
Prefix = lists:flatten(
io_lib:format("exhook.~s.", [Name])),
ensure_metrics(Prefix, HookSpecs),
%% Ensure hooks
ensure_hooks(HookSpecs),
{ok, #server{name = Name,
options = Opts0,
channel = _ChannPoolPid,
hookspec = HookSpecs,
prefix = Prefix }};
{error, _} = E ->
emqx_exhook_sup:stop_grpc_client_channel(Name), E
end;
{error, _} = E -> E
end.
%% @private
to_list(Name) when is_atom(Name) ->
atom_to_list(Name);
to_list(Name) when is_binary(Name) ->
binary_to_list(Name);
to_list(Name) when is_list(Name) ->
Name.
%% @private
channel_opts(Opts) ->
Scheme = proplists:get_value(scheme, Opts),
Host = proplists:get_value(host, Opts),
Port = proplists:get_value(port, Opts),
SvrAddr = format_http_uri(Scheme, Host, Port),
ClientOpts = case Scheme of
https ->
SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
#{gun_opts =>
#{transport => ssl,
transport_opts => SslOpts}};
_ -> #{}
end,
{SvrAddr, ClientOpts}.
format_http_uri(Scheme, Host0, Port) ->
Host = case is_tuple(Host0) of
true -> inet:ntoa(Host0);
_ -> Host0
end,
lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])).
-spec unload(server()) -> ok.
unload(#server{name = Name, hookspec = HookSpecs}) ->
_ = do_deinit(Name),
_ = may_unload_hooks(HookSpecs),
_ = emqx_exhook_sup:stop_grpc_client_channel(Name),
ok.
do_deinit(Name) ->
_ = do_call(Name, 'on_provider_unloaded', #{}),
ok.
do_init(ChannName) ->
Req = #{broker => maps:from_list(emqx_sys:info())},
case do_call(ChannName, 'on_provider_loaded', Req) of
{ok, InitialResp} ->
try
{ok, resovle_hookspec(maps:get(hooks, InitialResp, []))}
catch _:Reason:Stk ->
?LOG(error, "try to init ~p failed, reason: ~p, stacktrace: ~0p",
[ChannName, Reason, Stk]),
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
%% @private
resovle_hookspec(HookSpecs) when is_list(HookSpecs) ->
MessageHooks = message_hooks(),
AvailableHooks = available_hooks(),
lists:foldr(fun(HookSpec, Acc) ->
case maps:get(name, HookSpec, undefined) of
undefined -> Acc;
Name0 ->
Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end,
case lists:member(Name, AvailableHooks) of
true ->
case lists:member(Name, MessageHooks) of
true ->
Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}};
_ ->
Acc#{Name => #{}}
end;
_ -> error({unknown_hookpoint, Name})
end
end
end, #{}, HookSpecs).
ensure_metrics(Prefix, HookSpecs) ->
Keys = [list_to_atom(Prefix ++ atom_to_list(Hookpoint))
|| Hookpoint <- maps:keys(HookSpecs)],
lists:foreach(fun emqx_metrics:ensure/1, Keys).
ensure_hooks(HookSpecs) ->
lists:foreach(fun(Hookpoint) ->
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
false ->
?LOG(error, "Unknown name ~s to hook, skip it!", [Hookpoint]);
{Hookpoint, {M, F, A}} ->
emqx_hooks:put(Hookpoint, {M, F, A}),
ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
end
end, maps:keys(HookSpecs)).
may_unload_hooks(HookSpecs) ->
lists:foreach(fun(Hookpoint) ->
case ets:update_counter(?CNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of
Cnt when Cnt =< 0 ->
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
{Hookpoint, {M, F, _A}} ->
emqx_hooks:del(Hookpoint, {M, F});
_ -> ok
end,
ets:delete(?CNTER, Hookpoint);
_ -> ok
end
end, maps:keys(HookSpecs)).
format(#server{name = Name, hookspec = Hooks}) ->
io_lib:format("name=~p, hooks=~0p", [Name, Hooks]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
name(#server{name = Name}) ->
Name.
-spec call(hookpoint(), map(), server())
-> ignore
| {ok, Resp :: term()}
| {error, term()}.
call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, prefix = Prefix}) ->
GrpcFunc = hk2func(Hookpoint),
case maps:get(Hookpoint, Hooks, undefined) of
undefined -> ignore;
Opts ->
NeedCall = case lists:member(Hookpoint, message_hooks()) of
false -> true;
_ ->
#{message := #{topic := Topic}} = Req,
match_topic_filter(Topic, maps:get(topics, Opts, []))
end,
case NeedCall of
false -> ignore;
_ ->
inc_metrics(Prefix, Hookpoint),
do_call(ChannName, GrpcFunc, Req)
end
end.
%% @private
inc_metrics(IncFun, Name) when is_function(IncFun) ->
%% BACKW: e4.2.0-e4.2.2
{env, [Prefix|_]} = erlang:fun_info(IncFun, env),
inc_metrics(Prefix, Name);
inc_metrics(Prefix, Name) when is_list(Prefix) ->
emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).
-compile({inline, [match_topic_filter/2]}).
match_topic_filter(_, []) ->
true;
match_topic_filter(TopicName, TopicFilter) ->
lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
-spec do_call(string(), atom(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Fun, Req) ->
Options = #{channel => ChannName},
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]),
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
{ok, Resp, _Metadata} ->
?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]),
{ok, Resp};
{error, {Code, Msg}, _Metadata} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]),
{error, {Code, Msg}};
{error, Reason} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Reason]),
{error, Reason};
{'EXIT', {Reason, Stk}} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]),
{error, Reason}
end.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
-compile({inline, [hk2func/1]}).
hk2func('client.connect') -> 'on_client_connect';
hk2func('client.connack') -> 'on_client_connack';
hk2func('client.connected') -> 'on_client_connected';
hk2func('client.disconnected') -> 'on_client_disconnected';
hk2func('client.authenticate') -> 'on_client_authenticate';
hk2func('client.check_acl') -> 'on_client_check_acl';
hk2func('client.subscribe') -> 'on_client_subscribe';
hk2func('client.unsubscribe') -> 'on_client_unsubscribe';
hk2func('session.created') -> 'on_session_created';
hk2func('session.subscribed') -> 'on_session_subscribed';
hk2func('session.unsubscribed') -> 'on_session_unsubscribed';
hk2func('session.resumed') -> 'on_session_resumed';
hk2func('session.discarded') -> 'on_session_discarded';
hk2func('session.takeovered') -> 'on_session_takeovered';
hk2func('session.terminated') -> 'on_session_terminated';
hk2func('message.publish') -> 'on_message_publish';
hk2func('message.delivered') ->'on_message_delivered';
hk2func('message.acked') -> 'on_message_acked';
hk2func('message.dropped') ->'on_message_dropped'.
-compile({inline, [message_hooks/0]}).
message_hooks() ->
['message.publish', 'message.delivered',
'message.acked', 'message.dropped'].
-compile({inline, [available_hooks/0]}).
available_hooks() ->
['client.connect', 'client.connack', 'client.connected',
'client.disconnected', 'client.authenticate', 'client.check_acl',
'client.subscribe', 'client.unsubscribe',
'session.created', 'session.subscribed', 'session.unsubscribed',
'session.resumed', 'session.discarded', 'session.takeovered',
'session.terminated' | message_hooks()].

View File

@ -1,93 +0,0 @@
%%%-------------------------------------------------------------------
%% @doc Behaviour to implement for grpc service dgiot.exhook.v1.HookProvider.
%% @end
%%%-------------------------------------------------------------------
%% this module was generated and should not be modified manually
-module(dgiot_exhook_v_1_hook_provider_bhvr).
-callback on_provider_loaded(exhook_pb:provider_loaded_request(), grpc:metadata())
-> {ok, exhook_pb:loaded_response(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_provider_unloaded(exhook_pb:provider_unloaded_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_connect(exhook_pb:client_connect_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_connack(exhook_pb:client_connack_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_connected(exhook_pb:client_connected_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_disconnected(exhook_pb:client_disconnected_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_authenticate(exhook_pb:client_authenticate_request(), grpc:metadata())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_check_acl(exhook_pb:client_check_acl_request(), grpc:metadata())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_subscribe(exhook_pb:client_subscribe_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_client_unsubscribe(exhook_pb:client_unsubscribe_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_created(exhook_pb:session_created_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_subscribed(exhook_pb:session_subscribed_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_unsubscribed(exhook_pb:session_unsubscribed_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_resumed(exhook_pb:session_resumed_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_discarded(exhook_pb:session_discarded_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_takeovered(exhook_pb:session_takeovered_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_session_terminated(exhook_pb:session_terminated_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_message_publish(exhook_pb:message_publish_request(), grpc:metadata())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_message_delivered(exhook_pb:message_delivered_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_message_dropped(exhook_pb:message_dropped_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
-callback on_message_acked(exhook_pb:message_acked_request(), grpc:metadata())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.

View File

@ -1,445 +0,0 @@
%%%-------------------------------------------------------------------
%% @doc Client module for grpc service dgiot.exhook.v1.HookProvider.
%% @end
%%%-------------------------------------------------------------------
%% this module was generated and should not be modified manually
-module(dgiot_exhook_v_1_hook_provider_client).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("grpc/include/grpc.hrl").
-define(SERVICE, 'dgiot.exhook.v1.HookProvider').
-define(PROTO_MODULE, 'exhook_pb').
-define(MARSHAL(T), fun(I) -> ?PROTO_MODULE:encode_msg(I, T) end).
-define(UNMARSHAL(T), fun(I) -> ?PROTO_MODULE:decode_msg(I, T) end).
-define(DEF(Path, Req, Resp, MessageType),
#{path => Path,
service =>?SERVICE,
message_type => MessageType,
marshal => ?MARSHAL(Req),
unmarshal => ?UNMARSHAL(Resp)}).
-spec on_provider_loaded(exhook_pb:provider_loaded_request())
-> {ok, exhook_pb:loaded_response(), grpc:metadata()}
| {error, term()}.
on_provider_loaded(Req) ->
on_provider_loaded(Req, #{}, #{}).
-spec on_provider_loaded(exhook_pb:provider_loaded_request(), grpc:options())
-> {ok, exhook_pb:loaded_response(), grpc:metadata()}
| {error, term()}.
on_provider_loaded(Req, Options) ->
on_provider_loaded(Req, #{}, Options).
-spec on_provider_loaded(exhook_pb:provider_loaded_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:loaded_response(), grpc:metadata()}
| {error, term()}.
on_provider_loaded(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnProviderLoaded">>,
provider_loaded_request, loaded_response, <<"dgiot.exhook.v1.ProviderLoadedRequest">>),
Req, Metadata, Options).
-spec on_provider_unloaded(exhook_pb:provider_unloaded_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_provider_unloaded(Req) ->
on_provider_unloaded(Req, #{}, #{}).
-spec on_provider_unloaded(exhook_pb:provider_unloaded_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_provider_unloaded(Req, Options) ->
on_provider_unloaded(Req, #{}, Options).
-spec on_provider_unloaded(exhook_pb:provider_unloaded_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_provider_unloaded(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnProviderUnloaded">>,
provider_unloaded_request, empty_success, <<"dgiot.exhook.v1.ProviderUnloadedRequest">>),
Req, Metadata, Options).
-spec on_client_connect(exhook_pb:client_connect_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connect(Req) ->
on_client_connect(Req, #{}, #{}).
-spec on_client_connect(exhook_pb:client_connect_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connect(Req, Options) ->
on_client_connect(Req, #{}, Options).
-spec on_client_connect(exhook_pb:client_connect_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connect(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnClientConnect">>,
client_connect_request, empty_success, <<"dgiot.exhook.v1.ClientConnectRequest">>),
Req, Metadata, Options).
-spec on_client_connack(exhook_pb:client_connack_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connack(Req) ->
on_client_connack(Req, #{}, #{}).
-spec on_client_connack(exhook_pb:client_connack_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connack(Req, Options) ->
on_client_connack(Req, #{}, Options).
-spec on_client_connack(exhook_pb:client_connack_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connack(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnClientConnack">>,
client_connack_request, empty_success, <<"dgiot.exhook.v1.ClientConnackRequest">>),
Req, Metadata, Options).
-spec on_client_connected(exhook_pb:client_connected_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connected(Req) ->
on_client_connected(Req, #{}, #{}).
-spec on_client_connected(exhook_pb:client_connected_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connected(Req, Options) ->
on_client_connected(Req, #{}, Options).
-spec on_client_connected(exhook_pb:client_connected_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_connected(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnClientConnected">>,
client_connected_request, empty_success, <<"dgiot.exhook.v1.ClientConnectedRequest">>),
Req, Metadata, Options).
-spec on_client_disconnected(exhook_pb:client_disconnected_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_disconnected(Req) ->
on_client_disconnected(Req, #{}, #{}).
-spec on_client_disconnected(exhook_pb:client_disconnected_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_disconnected(Req, Options) ->
on_client_disconnected(Req, #{}, Options).
-spec on_client_disconnected(exhook_pb:client_disconnected_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_disconnected(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnClientDisconnected">>,
client_disconnected_request, empty_success, <<"dgiot.exhook.v1.ClientDisconnectedRequest">>),
Req, Metadata, Options).
-spec on_client_authenticate(exhook_pb:client_authenticate_request())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_authenticate(Req) ->
on_client_authenticate(Req, #{}, #{}).
-spec on_client_authenticate(exhook_pb:client_authenticate_request(), grpc:options())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_authenticate(Req, Options) ->
on_client_authenticate(Req, #{}, Options).
-spec on_client_authenticate(exhook_pb:client_authenticate_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_authenticate(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnClientAuthenticate">>,
client_authenticate_request, valued_response, <<"dgiot.exhook.v1.ClientAuthenticateRequest">>),
Req, Metadata, Options).
-spec on_client_check_acl(exhook_pb:client_check_acl_request())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_check_acl(Req) ->
on_client_check_acl(Req, #{}, #{}).
-spec on_client_check_acl(exhook_pb:client_check_acl_request(), grpc:options())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_check_acl(Req, Options) ->
on_client_check_acl(Req, #{}, Options).
-spec on_client_check_acl(exhook_pb:client_check_acl_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_client_check_acl(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnClientCheckAcl">>,
client_check_acl_request, valued_response, <<"dgiot.exhook.v1.ClientCheckAclRequest">>),
Req, Metadata, Options).
-spec on_client_subscribe(exhook_pb:client_subscribe_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_subscribe(Req) ->
on_client_subscribe(Req, #{}, #{}).
-spec on_client_subscribe(exhook_pb:client_subscribe_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_subscribe(Req, Options) ->
on_client_subscribe(Req, #{}, Options).
-spec on_client_subscribe(exhook_pb:client_subscribe_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_subscribe(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnClientSubscribe">>,
client_subscribe_request, empty_success, <<"dgiot.exhook.v1.ClientSubscribeRequest">>),
Req, Metadata, Options).
-spec on_client_unsubscribe(exhook_pb:client_unsubscribe_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_unsubscribe(Req) ->
on_client_unsubscribe(Req, #{}, #{}).
-spec on_client_unsubscribe(exhook_pb:client_unsubscribe_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_unsubscribe(Req, Options) ->
on_client_unsubscribe(Req, #{}, Options).
-spec on_client_unsubscribe(exhook_pb:client_unsubscribe_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_client_unsubscribe(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnClientUnsubscribe">>,
client_unsubscribe_request, empty_success, <<"dgiot.exhook.v1.ClientUnsubscribeRequest">>),
Req, Metadata, Options).
-spec on_session_created(exhook_pb:session_created_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_created(Req) ->
on_session_created(Req, #{}, #{}).
-spec on_session_created(exhook_pb:session_created_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_created(Req, Options) ->
on_session_created(Req, #{}, Options).
-spec on_session_created(exhook_pb:session_created_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_created(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnSessionCreated">>,
session_created_request, empty_success, <<"dgiot.exhook.v1.SessionCreatedRequest">>),
Req, Metadata, Options).
-spec on_session_subscribed(exhook_pb:session_subscribed_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_subscribed(Req) ->
on_session_subscribed(Req, #{}, #{}).
-spec on_session_subscribed(exhook_pb:session_subscribed_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_subscribed(Req, Options) ->
on_session_subscribed(Req, #{}, Options).
-spec on_session_subscribed(exhook_pb:session_subscribed_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_subscribed(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnSessionSubscribed">>,
session_subscribed_request, empty_success, <<"dgiot.exhook.v1.SessionSubscribedRequest">>),
Req, Metadata, Options).
-spec on_session_unsubscribed(exhook_pb:session_unsubscribed_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_unsubscribed(Req) ->
on_session_unsubscribed(Req, #{}, #{}).
-spec on_session_unsubscribed(exhook_pb:session_unsubscribed_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_unsubscribed(Req, Options) ->
on_session_unsubscribed(Req, #{}, Options).
-spec on_session_unsubscribed(exhook_pb:session_unsubscribed_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_unsubscribed(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnSessionUnsubscribed">>,
session_unsubscribed_request, empty_success, <<"dgiot.exhook.v1.SessionUnsubscribedRequest">>),
Req, Metadata, Options).
-spec on_session_resumed(exhook_pb:session_resumed_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_resumed(Req) ->
on_session_resumed(Req, #{}, #{}).
-spec on_session_resumed(exhook_pb:session_resumed_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_resumed(Req, Options) ->
on_session_resumed(Req, #{}, Options).
-spec on_session_resumed(exhook_pb:session_resumed_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_resumed(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnSessionResumed">>,
session_resumed_request, empty_success, <<"dgiot.exhook.v1.SessionResumedRequest">>),
Req, Metadata, Options).
-spec on_session_discarded(exhook_pb:session_discarded_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_discarded(Req) ->
on_session_discarded(Req, #{}, #{}).
-spec on_session_discarded(exhook_pb:session_discarded_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_discarded(Req, Options) ->
on_session_discarded(Req, #{}, Options).
-spec on_session_discarded(exhook_pb:session_discarded_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_discarded(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnSessionDiscarded">>,
session_discarded_request, empty_success, <<"dgiot.exhook.v1.SessionDiscardedRequest">>),
Req, Metadata, Options).
-spec on_session_takeovered(exhook_pb:session_takeovered_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_takeovered(Req) ->
on_session_takeovered(Req, #{}, #{}).
-spec on_session_takeovered(exhook_pb:session_takeovered_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_takeovered(Req, Options) ->
on_session_takeovered(Req, #{}, Options).
-spec on_session_takeovered(exhook_pb:session_takeovered_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_takeovered(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnSessionTakeovered">>,
session_takeovered_request, empty_success, <<"dgiot.exhook.v1.SessionTakeoveredRequest">>),
Req, Metadata, Options).
-spec on_session_terminated(exhook_pb:session_terminated_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_terminated(Req) ->
on_session_terminated(Req, #{}, #{}).
-spec on_session_terminated(exhook_pb:session_terminated_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_terminated(Req, Options) ->
on_session_terminated(Req, #{}, Options).
-spec on_session_terminated(exhook_pb:session_terminated_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_session_terminated(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnSessionTerminated">>,
session_terminated_request, empty_success, <<"dgiot.exhook.v1.SessionTerminatedRequest">>),
Req, Metadata, Options).
-spec on_message_publish(exhook_pb:message_publish_request())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_message_publish(Req) ->
on_message_publish(Req, #{}, #{}).
-spec on_message_publish(exhook_pb:message_publish_request(), grpc:options())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_message_publish(Req, Options) ->
on_message_publish(Req, #{}, Options).
-spec on_message_publish(exhook_pb:message_publish_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:valued_response(), grpc:metadata()}
| {error, term()}.
on_message_publish(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnMessagePublish">>,
message_publish_request, valued_response, <<"dgiot.exhook.v1.MessagePublishRequest">>),
Req, Metadata, Options).
-spec on_message_delivered(exhook_pb:message_delivered_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_delivered(Req) ->
on_message_delivered(Req, #{}, #{}).
-spec on_message_delivered(exhook_pb:message_delivered_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_delivered(Req, Options) ->
on_message_delivered(Req, #{}, Options).
-spec on_message_delivered(exhook_pb:message_delivered_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_delivered(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnMessageDelivered">>,
message_delivered_request, empty_success, <<"dgiot.exhook.v1.MessageDeliveredRequest">>),
Req, Metadata, Options).
-spec on_message_dropped(exhook_pb:message_dropped_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_dropped(Req) ->
on_message_dropped(Req, #{}, #{}).
-spec on_message_dropped(exhook_pb:message_dropped_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_dropped(Req, Options) ->
on_message_dropped(Req, #{}, Options).
-spec on_message_dropped(exhook_pb:message_dropped_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_dropped(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnMessageDropped">>,
message_dropped_request, empty_success, <<"dgiot.exhook.v1.MessageDroppedRequest">>),
Req, Metadata, Options).
-spec on_message_acked(exhook_pb:message_acked_request())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_acked(Req) ->
on_message_acked(Req, #{}, #{}).
-spec on_message_acked(exhook_pb:message_acked_request(), grpc:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_acked(Req, Options) ->
on_message_acked(Req, #{}, Options).
-spec on_message_acked(exhook_pb:message_acked_request(), grpc:metadata(), grpc_client:options())
-> {ok, exhook_pb:empty_success(), grpc:metadata()}
| {error, term()}.
on_message_acked(Req, Metadata, Options) ->
grpc_client:unary(?DEF(<<"/dgiot.exhook.v1.HookProvider/OnMessageAcked">>,
message_acked_request, empty_success, <<"dgiot.exhook.v1.MessageAckedRequest">>),
Req, Metadata, Options).

View File

@ -1,45 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc dgiot_grpc Protocol
-module(dgiot_grpc).
-include("dgiot_grpc.hrl").
-include_lib("dgiot/include/logger.hrl").
-export([ start_grpc_client_channel/3
, stop_grpc_client_channel/1
]).
%%--------------------------------------------------------------------
%% APIs
-spec start_grpc_client_channel(
string(),
uri_string:uri_string(),
grpc_client:options()) -> {ok, pid()} | {error, term()}.
start_grpc_client_channel(Name, SvrAddr, Options) ->
grpc_client_sup:create_channel_pool(Name, SvrAddr, Options).
-spec stop_grpc_client_channel(string()) -> ok.
stop_grpc_client_channel(Name) ->
%% Avoid crash due to hot-upgrade had unloaded
%% grpc application
try
grpc_client_sup:stop_channel_pool(Name)
catch
_:_:_ ->
ok
end.

View File

@ -21,7 +21,7 @@
-include_lib("dgiot/include/dgiot_socket.hrl").
-include_lib("dgiot/include/logger.hrl").
-include("dgiot_grpc.hrl").
-define(TYPE, <<"GRPC">>).
-define(TYPE, <<"GRPC_ClIENT">>).
%% API
-export([start/2]).
@ -42,16 +42,53 @@
}).
%%
-params(#{
<<"port">> => #{
<<"scheme">> => #{
order => 1,
type => integer,
type => string,
required => true,
default => 61888,
default => <<"http">>,
title => #{
zh => <<"端口"/utf8>>
zh => <<"通讯协议"/utf8>>
},
description => #{
zh => <<"侦听端口"/utf8>>
zh => <<"通讯协议"/utf8>>
}
},
<<"host">> => #{
order => 2,
type => string,
required => true,
default => <<"127.0.0.1">>,
title => #{
zh => <<"IP"/utf8>>
},
description => #{
zh => <<"grpc服务器IP"/utf8>>
}
},
<<"port">> => #{
order => 3,
type => integer,
required => true,
default => 7000,
title => #{
zh => <<"port"/utf8>>
},
description => #{
zh => <<"grpc服务器端口"/utf8>>
}
},
<<"model">> => #{
order => 2,
type => enum,
required => false,
default => <<"both"/utf8>>,
enum => [<<"both">>, <<"client">>, <<"server">>],
title => #{
zh => <<"启动模式"/utf8>>
},
description => #{
zh => <<"启动模式:both|client|server"/utf8>>
}
},
<<"ico">> => #{
@ -75,31 +112,30 @@ start(ChannelId, ChannelArgs) ->
dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs).
%%
init(?TYPE, ChannelId, #{
<<"port">> := _Port,
<<"product">> := _Products}) ->
init(?TYPE, ChannelId, Env) ->
State = #state{
id = ChannelId
id = ChannelId,
env = Env
},
%% %% grpc server
%% Services = #{protos => [dgiot_exhook_pb],
%% services => #{
%% 'dgiot.exhook.v1.HookProvider' => exhook_svr
%% }
%% },
%% Options = [],
%% {ok, Spec} = grpc:start_server(exhook_svr, 9000, Services, Options),
%% io:format("Start service exhook_svr on 9000 successfully!~n", []),
%% %% magic line
%% _ = exhook_svr:module_info(),
%% %% counter
%% ets:new(exhook_stats, [public, named_table, {write_concurrency, true}]),
{ok, State, []};
init(?TYPE, _ChannelId, _Args) ->
{ok, #{}, #{}}.
Port = maps:get(<<"port">>, Env),
Scheme = maps:get(<<"scheme">>, Env),
Host = dgiot_utils:to_list(maps:get(<<"host">>, Env)),
Opts0 = [{scheme, dgiot_utils:to_atom(Scheme)}, {host, Host}, {port, Port}],
case maps:get(<<"model">>, Env) of
<<"both">> ->
dgiot_grpc_server:start(ChannelId, Port, []),
application:ensure_all_started(emqx_hooks),
emqx_exhook:enable(ChannelId, Opts0);
<<"client">> ->
application:ensure_all_started(emqx_hooks),
emqx_exhook:enable(ChannelId, Opts0);
<<"server">> ->
dgiot_grpc_server:start(ChannelId, Port, [])
end,
{ok, State, []}.
handle_init(State) ->
erlang:send_after(5000, self(), start),
{ok, State}.
%% ,
@ -120,6 +156,9 @@ handle_event(EventId, Event, State) ->
?LOG(error, "EventId ~p Event ~p", [EventId, Event]),
{ok, State}.
handle_message(start, State) ->
{ok, State};
% SELECT clientid, payload, topic FROM "meter"
% SELECT clientid, disconnected_at FROM "$events/client_disconnected" WHERE username = 'dgiot'
% SELECT clientid, connected_at FROM "$events/client_connected" WHERE username = 'dgiot'
@ -134,5 +173,6 @@ handle_message({rule, #{clientid := DevAddr, payload := Payload, topic := _Topic
handle_message(_Message, State) ->
{ok, State}.
stop(_ChannelType, _ChannelId, _State) ->
stop(_ChannelType, ChannelId, _State) ->
emqx_exhook:disable(ChannelId),
ok.

View File

@ -0,0 +1,111 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc dgiot_grpc Protocol
-module(dgiot_grpc_client).
-include("dgiot_grpc.hrl").
-include_lib("dgiot/include/logger.hrl").
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
-export([ start_client_sup/2
]).
%% APIs
-export([
do_call/3
, do_init/1
, do_deinit/1]).
%%--------------------------------------------------------------------
%% APIs
%% for creating a Child spec to hang on another Application supervisor.
start_client_sup(ChannleId, Opts0) ->
%% Opts0 = [{scheme,http},{host,"127.0.0.1"},{port,9000}]
Name = dgiot_utils:to_list(ChannleId),
{URL, ClientOpts} = channel_opts(Opts0),
_ = application:ensure_all_started(gproc),
case uri_string:parse(URL) of
#{scheme := Scheme, host := Host, port := Port} ->
Server = {Scheme, Host, Port},
#{id => Name,
start => {grpc_client_sup, start_link, [Name, Server, ClientOpts]},
restart => transient,
shutdown => infinity,
type => supervisor,
modules => [grpc_client_sup]};
{error, _Reason, _} -> []
end.
%% @private
channel_opts( #{
<<"scheme">> := Scheme,
<<"port">> := Port,
<<"host">> := Host}) ->
NewScheme = dgiot_utils:to_atom(Scheme),
SvrAddr = format_http_uri(NewScheme, Host, Port),
ClientOpts = case NewScheme of
https ->
%% SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
%% #{gun_opts =>
%% #{transport => ssl,
%% transport_opts => SslOpts}};
#{};
_ -> #{}
end,
{SvrAddr, ClientOpts}.
format_http_uri(Scheme, Host0, Port) ->
Host = case is_tuple(Host0) of
true -> inet:ntoa(Host0);
_ -> dgiot_utils:to_list(Host0)
end,
lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])).
do_deinit(Name) ->
_ = do_call(Name, 'on_provider_unloaded', #{}),
ok.
do_init(ChannName) ->
Req = #{broker => maps:from_list(emqx_sys:info())},
case do_call(ChannName, 'on_provider_loaded', Req) of
{ok, _InitialResp} ->
ok;
{error, Reason} ->
{error, Reason}
end.
-spec do_call(string(), atom(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Fun, Req) ->
Options = #{channel => ChannName},
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]),
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
{ok, Resp, _Metadata} ->
?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]),
{ok, Resp};
{error, {Code, Msg}, _Metadata} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]),
{error, {Code, Msg}};
{error, Reason} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Reason]),
{error, Reason};
{'EXIT', {Reason, Stk}} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]),
{error, Reason}
end.

View File

@ -14,201 +14,209 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(exhook_svr).
-module(dgiot_grpc_server).
-behavior(dgiot_exhook_v_1_hook_provider_bhvr).
-behavior(emqx_exhook_v_1_hook_provider_bhvr).
-export([start/3]).
%% gRPC server HookProvider callbacks
-export([ on_provider_loaded/2
, on_provider_unloaded/2
, on_client_connect/2
, on_client_connack/2
, on_client_connected/2
, on_client_disconnected/2
, on_client_authenticate/2
, on_client_check_acl/2
, on_client_subscribe/2
, on_client_unsubscribe/2
, on_session_created/2
, on_session_subscribed/2
, on_session_unsubscribed/2
, on_session_resumed/2
, on_session_discarded/2
, on_session_takeovered/2
, on_session_terminated/2
, on_message_publish/2
, on_message_delivered/2
, on_message_dropped/2
, on_message_acked/2
]).
-export([on_provider_loaded/2
, on_provider_unloaded/2
, on_client_connect/2
, on_client_connack/2
, on_client_connected/2
, on_client_disconnected/2
, on_client_authenticate/2
, on_client_check_acl/2
, on_client_subscribe/2
, on_client_unsubscribe/2
, on_session_created/2
, on_session_subscribed/2
, on_session_unsubscribed/2
, on_session_resumed/2
, on_session_discarded/2
, on_session_takeovered/2
, on_session_terminated/2
, on_message_publish/2
, on_message_delivered/2
, on_message_dropped/2
, on_message_acked/2
]).
-define(PORT, 9000).
-define(NAME, ?MODULE).
start(Name, Port, Options) ->
%% grpc server
Services = #{protos => [emqx_exhook_pb],
services => #{
'emqx.exhook.v1.HookProvider' => dgiot_grpc_server
}
},
%% Options = [],
{ok, _Pid} = grpc:start_server(dgiot_utils:to_atom(Name), Port, Services, Options).
-spec on_provider_loaded(dgiot_exhook_pb:provider_loaded_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:loaded_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:loaded_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_provider_loaded(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{hooks => [
#{name => <<"client.connect">>},
#{name => <<"client.connack">>},
#{name => <<"client.connected">>},
#{name => <<"client.disconnected">>},
#{name => <<"client.authenticate">>},
#{name => <<"client.check_acl">>},
#{name => <<"client.subscribe">>},
#{name => <<"client.unsubscribe">>},
#{name => <<"session.created">>},
#{name => <<"session.subscribed">>},
#{name => <<"session.unsubscribed">>},
#{name => <<"session.resumed">>},
#{name => <<"session.discarded">>},
#{name => <<"session.takeovered">>},
#{name => <<"session.terminated">>},
#{name => <<"message.publish">>},
#{name => <<"message.delivered">>},
#{name => <<"message.acked">>}
%#{name => <<"message.dropped">>}
]}, Md}.
#{name => <<"client.connect">>},
#{name => <<"client.connack">>},
#{name => <<"client.connected">>},
#{name => <<"client.disconnected">>},
#{name => <<"client.authenticate">>},
#{name => <<"client.check_acl">>},
#{name => <<"client.subscribe">>},
#{name => <<"client.unsubscribe">>},
#{name => <<"session.created">>},
#{name => <<"session.subscribed">>},
#{name => <<"session.unsubscribed">>},
#{name => <<"session.resumed">>},
#{name => <<"session.discarded">>},
#{name => <<"session.takeovered">>},
#{name => <<"session.terminated">>},
#{name => <<"message.publish">>},
#{name => <<"message.delivered">>},
#{name => <<"message.acked">>}
%#{name => <<"message.dropped">>}
]}, Md}.
-spec on_provider_unloaded(dgiot_exhook_pb:provider_unloaded_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_provider_unloaded(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_client_connect(dgiot_exhook_pb:client_connect_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_connect(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_client_connack(dgiot_exhook_pb:client_connack_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_connack(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_client_connected(dgiot_exhook_pb:client_connected_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_connected(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_client_disconnected(dgiot_exhook_pb:client_disconnected_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_disconnected(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_client_authenticate(dgiot_exhook_pb:client_authenticate_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_authenticate(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{type => 'STOP_AND_RETURN', value => {bool_result, true}}, Md}.
-spec on_client_check_acl(dgiot_exhook_pb:client_check_acl_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_check_acl(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{type => 'STOP_AND_RETURN', value => {bool_result, true}}, Md}.
-spec on_client_subscribe(dgiot_exhook_pb:client_subscribe_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_subscribe(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_client_unsubscribe(dgiot_exhook_pb:client_unsubscribe_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_unsubscribe(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_session_created(dgiot_exhook_pb:session_created_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_created(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_session_subscribed(dgiot_exhook_pb:session_subscribed_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_subscribed(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_session_unsubscribed(dgiot_exhook_pb:session_unsubscribed_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_unsubscribed(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_session_resumed(dgiot_exhook_pb:session_resumed_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_resumed(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_session_discarded(dgiot_exhook_pb:session_discarded_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_discarded(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_session_takeovered(dgiot_exhook_pb:session_takeovered_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_takeovered(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_session_terminated(dgiot_exhook_pb:session_terminated_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_terminated(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_message_publish(dgiot_exhook_pb:message_publish_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_publish(_Req = #{message := Msg}, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
NMsg = Msg#{payload => <<"hardcode payload by exhook-svr-erlang :)">>},
{ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md}.
-spec on_message_delivered(dgiot_exhook_pb:message_delivered_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_delivered(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_message_dropped(dgiot_exhook_pb:message_dropped_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_dropped(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.
-spec on_message_acked(dgiot_exhook_pb:message_acked_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_acked(_Req, Md) ->
ets:update_counter(exhook_stats, ?FUNCTION_NAME, {2, 1}, {?FUNCTION_NAME, 0}),
{ok, #{}, Md}.

File diff suppressed because it is too large Load Diff

View File

@ -24,17 +24,20 @@
-define(CNTER, emqx_exhook_counter).
-export([ start/2
, stop/1
, prep_stop/1
]).
-export([start/2
, stop/1
, prep_stop/1
]).
%% Internal export
-export([ load_server/2
, unload_server/1
, unload_exhooks/0
, init_hooks_cnter/0
]).
-export([
load_all_servers/0
, load_server/2
, unload_all_servers/0
, unload_server/1
, unload_exhooks/0
, init_hooks_cnter/0
]).
%%--------------------------------------------------------------------
%% Application callbacks
@ -47,7 +50,7 @@ start(_StartType, _StartArgs) ->
init_hooks_cnter(),
%% Load all dirvers
load_all_servers(),
%% load_all_servers(),
%% Register CLI
emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []),
@ -69,7 +72,7 @@ stop(_State) ->
load_all_servers() ->
lists:foreach(fun({Name, Options}) ->
load_server(Name, Options)
end, application:get_env(?APP, servers, [])).
end, application:get_env(?APP, servers, [])).
unload_all_servers() ->
emqx_exhook:disable_all().
@ -82,7 +85,7 @@ unload_server(Name) ->
unload_exhooks() ->
[emqx:unhook(Name, {M, F}) ||
{Name, {M, F, _A}} <- ?ENABLED_HOOKS].
{Name, {M, F, _A}} <- ?ENABLED_HOOKS].
init_hooks_cnter() ->
try

View File

@ -6,6 +6,7 @@
{emqx_telemetry, {{enable_plugin_emqx_telemetry}}}.
{emqx_rule_engine, {{enable_plugin_emqx_rule_engine}}}.
{emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}.
{emqx_exhook, {{enable_plugin_emqx_exhook}}}.
{dgiot, {{enable_plugin_dgiot}}}.
{dgiot_parse, {{enable_plugin_dgiot_parse}}}.
{dgiot_api, {{enable_plugin_dgiot_api}}}.

View File

@ -199,6 +199,7 @@ overlay_vars_rel(RelType) ->
, {enable_plugin_emqx_recon, true}
, {enable_plugin_emqx_retainer, true}
, {enable_plugin_emqx_telemetry, true}
, {enable_plugin_emqx_exhook, true}
, {enable_plugin_dgiot, true}
, {enable_plugin_dgiot_bridge, true}
, {enable_plugin_dgiot_parse, true}
@ -313,6 +314,7 @@ relx_plugin_apps(ReleaseType) ->
, emqx_recon
, emqx_rule_engine
, emqx_sasl
, emqx_exhook
]
++ [emqx_telemetry || not is_enterprise()]
++ relx_plugin_apps_per_rel(ReleaseType)