feat: add concentrator

This commit is contained in:
U-JOHNLIU\jonhl 2022-01-25 18:39:02 +08:00
parent f7c2fb776a
commit 96d4e3400f
14 changed files with 381 additions and 47 deletions

View File

@ -15,6 +15,45 @@
%%--------------------------------------------------------------------
-author("johnliu").
-ifndef(DGIOT_MQTT_HRL).
-define(DGIOT_MQTT_HRL, true).
%%--------------------------------------------------------------------
%% Common
%%--------------------------------------------------------------------
-define(Otherwise, true).
%%--------------------------------------------------------------------
%% Banner
%%--------------------------------------------------------------------
-define(PROTOCOL_VERSION, "MQTT/5.0").
-define(ERTS_MINIMUM_REQUIRED, "10.0").
%%--------------------------------------------------------------------
%% Configs
%%--------------------------------------------------------------------
-define(NO_PRIORITY_TABLE, none).
%%--------------------------------------------------------------------
%% Topics' prefix: $SYS | $queue | $share
%%--------------------------------------------------------------------
%% System topic
-define(SYSTOP, <<"$SYS/">>).
%% Queue topic
-define(QUEUE, <<"$queue/">>).
%%--------------------------------------------------------------------
%% Message and Delivery
%%--------------------------------------------------------------------
-record(subscription, {topic, subid, subopts}).
%% See 'Application Message' in MQTT Version 5.0
-record(message, {
%% Global unique message ID
@ -24,15 +63,15 @@
%% Message from
from :: atom() | binary(),
%% Message flags
flags = #{} :: dgiot_types:flags(),
flags = #{} :: emqx_types:flags(),
%% Message headers. May contain any metadata. e.g. the
%% protocol version number, username, peerhost or
%% the PUBLISH properties (MQTT 5.0).
headers = #{} :: dgiot_types:headers(),
headers = #{} :: emqx_types:headers(),
%% Topic that the message is published to
topic :: dgiot_types:topic(),
topic :: emqx_types:topic(),
%% Message Payload
payload :: dgiot_types:payload(),
payload :: emqx_types:payload(),
%% Timestamp (Unit: millisecond)
timestamp :: integer()
}).
@ -41,3 +80,56 @@
sender :: pid(), %% Sender of the delivery
message :: #message{} %% The message delivered
}).
%%--------------------------------------------------------------------
%% Route
%%--------------------------------------------------------------------
-record(route, {
topic :: binary(),
dest :: node() | {binary(), node()}
}).
%%--------------------------------------------------------------------
%% Plugin
%%--------------------------------------------------------------------
-record(plugin, {
name :: atom(),
dir :: string() | undefined,
descr :: string(),
vendor :: string() | undefined,
active = false :: boolean(),
info = #{} :: map(),
type :: atom()
}).
%%--------------------------------------------------------------------
%% Command
%%--------------------------------------------------------------------
-record(command, {
name :: atom(),
action :: atom(),
args = [] :: list(),
opts = [] :: list(),
usage :: string(),
descr :: string()
}).
%%--------------------------------------------------------------------
%% Banned
%%--------------------------------------------------------------------
-record(banned, {
who :: {clientid, binary()}
| {peerhost, inet:ip_address()}
| {username, binary()}
| {ip_address, inet:ip_address()},
by :: binary(),
reason :: binary(),
at :: integer(),
until :: integer()
}).
-endif.

View File

@ -50,6 +50,7 @@ start_client_sup(ChannleId, Opts0) ->
{error, _Reason, _} -> []
end.
%% @private
channel_opts( #{
<<"scheme">> := Scheme,

View File

@ -78,7 +78,7 @@ on_provider_loaded(_Req, Md) ->
#{name => <<"message.publish">>},
#{name => <<"message.delivered">>},
#{name => <<"message.acked">>}
%#{name => <<"message.dropped">>}
#{name => <<"message.dropped">>}
]}, Md}.
-spec on_provider_unloaded(dgiot_exhook_pb:provider_unloaded_request(), grpc:metadata())
-> {ok, dgiot_exhook_pb:empty_success(), grpc:metadata()}

View File

@ -26,6 +26,7 @@
% --------------------------------------------------------------------
-define(METER, <<"METER">>).
-define(CONCENTRATOR, <<"CONCENTRATOR">>).
-define(DLT645, <<"DLT645">>).
-define(DLT376, <<"DLT376">>).
@ -41,6 +42,77 @@
%% Internal Header File
%DIR define
-define(DIR_DOWN, 0).
-define(DIR_UP, 1).
%PRM define
-define(PRM_SLAVER, 0).
-define(PRM_MASTER, 1).
%FCB define
-define(FCB_FALSE, 0).
-define(FCB_TRUE, 1).
%ACD define
-define(ACD_FALSE, 0).
-define(ACD_TRUE, 1).
%FCV define
-define(FCV_FALSE, 0).
-define(FCV_TRUE, 1).
%FIRN define
-define(FIRN_MULTI_MID, 0).
-define(FIRN_MULTI_END, 1).
-define(FIRN_MULTI_FIR, 2).
-define(FIRN_SINGLE, 3).
%CON define
-define(CON_FALSE, 0).
-define(CON_TRUE, 1).
%TPV define
-define(TPV_FALSE, 0).
-define(TPV_TRUE, 1).
%AFN define
-define(AFN_CONFIRM_OR_DENY, 16#00).
-define(AFN_RESET_COMMAND, 16#01).
-define(AFN_LINK_CHECK, 16#02).
-define(AFN_RELAY_COMMAND, 16#03).
-define(AFN_WRITE_PARAM, 16#04).
-define(AFN_CONTROL_COMMAND, 16#05).
-define(AFN_IDENTITI_PSW, 16#06).
-define(AFN_CASCADE_REPORT, 16#08).
-define(AFN_TERMINAL_PARAM, 16#09).
-define(AFN_READ_PARAM, 16#0A).
-define(AFN_GW_TASK_DATA, 16#1B).
-define(AFN_READ_CURRENT_DATA, 16#0C).
-define(AFN_READ_HISTORY_DATA, 16#0D).
-define(AFN_READ_EVENT_RECORD, 16#0E).
-define(AFN_READ_FILE_TRANSFER, 16#0F).
-define(AFN_RELAY_TRANSFER, 16#10).
-define(AFN_READ_TASK_DATA, 16#12).
-define(AFN_READ_WARN_DATA, 16#13).
-define(AFN_CASCADE_COMMAND, 16#14).
-define(AFN_USER_DEFINE_DATA, 16#15).
%LFN define
-define(LFN_SLAVER_CON, 0).
-define(LFN_SLAVER_RESP_USERDATA, 8).
-define(LFN_SLAVER_RESP_NODATA, 9).
-define(LFN_SLAVER_REQ_AND_RESP_LIKN, 11).
-define(LFN_MASTER_CONFIRM, 0).
-define(LFN_MASTER_SEND_AND_CON, 1).
-define(LFN_MASTER_SEND_AND_NOASWER, 4).
-define(LFN_MASTER_REQ_AND_RESP_LIKN, 9).
-define(LFN_MASTER_REQ_AND_RESP_1, 10).
-define(LFN_MASTER_REQ_AND_RESP_2, 11).
%% @doc dlt376 COMMAND.
-define(DLT376_MS_READ_DATA, 16#5B).
-define(DLT376_MS_READ_DATA_AFN, 16#0C).

View File

@ -115,7 +115,7 @@ create_meter4G(MeterAddr, ChannelId, DTUIP) ->
get_sub_device(DtuAddr) ->
Query = #{<<"keys">> => [<<"devaddr">>, <<"product">>],
Query = #{<<"keys">> => [<<"devaddr">>, <<"product">>,<<"route">>],
<<"where">> => #{<<"route.", DtuAddr/binary>> => #{<<"$regex">> => <<".+">>}},
<<"order">> => <<"devaddr">>, <<"limit">> => 256},
case dgiot_parse:query_object(<<"Device">>, Query) of
@ -132,7 +132,6 @@ parse_frame(?DLT645, Buff, Opts) ->
parse_frame(?DLT376, Buff, Opts) ->
{Rest, Frames} = dlt376_decoder:parse_frame(Buff, Opts),
% ?LOG(warning,"GGM 170 dgiot_meter parse_frame:~p", [Frames]),
{Rest, lists:foldl(fun(X, Acc) ->
Acc ++ [maps:without([<<"diff">>, <<"send_di">>], X)]
end, [], Frames)}.
@ -309,7 +308,7 @@ search_meter(tcp, _Ref, TCPState, 0) ->
<<"msgtype">> => ?DLT645,
<<"addr">> => dlt645_proctol:reverse(<<16#AA, 16#AA, 16#AA, 16#AA, 16#AA, 16#AA>>),
<<"command">> => ?DLT645_MS_READ_DATA,
<<"di">> => dlt645_proctol:reverse(<<0, 0, 0, 0>>)
<<"di">> => dlt645_proctol:reverse(<<0, 0, 0, 0>>) %%
}),
?LOG(info, "Payload ~p", [dgiot_utils:binary_to_hex(Payload)]),
dgiot_tcp_server:send(TCPState, Payload),
@ -357,7 +356,7 @@ search_meter(1) ->
<<"msgtype">> => ?DLT645,
<<"addr">> => <<Flag:8, 16#AA, 16#AA, 16#AA, 16#AA, 16#AA>>,
<<"command">> => ?DLT645_MS_READ_DATA,
<<"di">> => dlt645_proctol:reverse(<<0, 0, 0, 0>>)})
<<"di">> => dlt645_proctol:reverse(<<0, 0, 0, 0>>)}) %%
end;
search_meter(_) ->

View File

@ -47,8 +47,14 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
{Protocol, DtuAddr} =
case Buff of
<<16#68, _:4/bytes, 16#68, _A1:8/bytes, _Rest/binary>> ->
{_, [Acc | _]} = dlt376_decoder:parse_frame(NewBuff, []),
#{<<"msgtype">> := Protocol1, <<"addr">> := MeterAddr} = Acc,
{_, [Acc | _]} = dlt376_decoder:parse_frame(Buff, []), %% NewBuff
#{<<"msgtype">> := Protocol1, <<"con">> := Con, <<"addr">> := MeterAddr} = Acc,
case Con of
1 ->
Frame = maps:get(<<"frame">>, Acc, <<>>),
dgiot_tcp_server:send(TCPState, Frame);
_ -> ok
end,
dgiot_meter:create_meter4G(MeterAddr, ChannelId, DTUIP),
{Protocol1, MeterAddr};
_ ->
@ -62,8 +68,23 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "from dev ~p (登录)", [dgiot_utils:binary_to_hex(Buff)]),
{NewRef, NewStep} = {undefined, read_meter},
DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
case Search of
<<"nosearch">> ->
lists:map(fun(X) ->
case X of
#{<<"product">> := #{<<"objectId">> := MeterProductid}, <<"devaddr">> := Meteraddr,<<"route">> := Route} ->
dgiot_bridge:send_log(ChannelId, MeterProductid, Meteraddr, "save taskque MeterProductid ~p Meteraddr ~p", [MeterProductid, Meteraddr]),
dgiot_data:insert({concentrator, MeterProductid, Meteraddr}, Route),
dgiot_task:save_pnque(DtuProductId, DtuAddr, MeterProductid, Meteraddr);
_ ->
pass
end
end, dgiot_meter:get_sub_device(DtuAddr));
_ ->
pass
end,
dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT376, ref = NewRef, step = NewStep}}};
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT376,ref = NewRef, step = NewStep}}};
?DLT645 ->
dgiot_meter:create_dtu(DtuAddr, ChannelId, DTUIP),
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
@ -81,7 +102,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
pass
end
end, dgiot_meter:get_sub_device(DtuAddr)),
{undefined, read_meter};
{undefined, read_meter};
<<"quick">> ->
dgiot_meter:search_meter(tcp, undefined, TCPState, 0),
{undefined, search_meter};
@ -89,9 +110,9 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
{Ref, Step, _Payload} = dgiot_meter:search_meter(tcp, undefined, TCPState, 1),
{Ref, Step}
end,
dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "from dev ~p (登录)", [dgiot_utils:binary_to_hex(DtuAddr)]),
DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "from dev ~p (登录)", [dgiot_utils:binary_to_hex(DtuAddr)]),
DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT645, ref = NewRef, step = NewStep}}}
end;
@ -134,6 +155,12 @@ handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = Protocol
?DLT376 ->
dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
{Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []),
case Frames of
[#{<<"con">> := 1} = Frame | _] ->
Frame1 = maps:get(<<"frame">>, Frame, <<16#68>>),
dgiot_tcp_server:send(TCPState, Frame1);
_ -> ok
end,
dlt376_decoder:process_message(Frames, ChannelId),
{noreply, TCPState#tcp{buff = Rest}};
?DLT645 ->
@ -152,10 +179,12 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = _ChannelId}} = TCPS
case jsx:is_json(Payload) of
true ->
case binary:split(dgiot_mqtt:get_topic(Msg), <<$/>>, [global, trim]) of
[<<"thing">>, _ProductId, _DevAddr] ->
[<<"thing">>, ProductId, DevAddr] ->
#tcp{state = #state{protocol = Protocol}} = TCPState,
case Protocol of
?DLT376 ->
Route = dgiot_data:get({concentrator, ProductId, DevAddr}),
?LOG(info,"Route ~p ",[Route]),
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
Payload1 = dgiot_meter:to_frame(ThingData),
dgiot_tcp_server:send(TCPState, Payload1);

View File

@ -47,24 +47,27 @@ parse_frame(<<Rest/binary>> = Bin, Acc, _Opts) when byte_size(Rest) == 15 ->
%% 68 32 00 32 00 68 C9 00 10 01 00 00 02 70 00 00 01 00 4D 16
parse_frame(<<16#68, _:16, L2_low:6, _:2, L2_high:8, 16#68, C:8, A1:2/bytes, A2:2/bytes, A3:1/bytes, AFN:8, SEQ:8,Rest/binary>> = Bin, Acc, Opts) ->
Len = L2_high * 255 + L2_low,
DLen = Len -8,
DLen = Len - 8,
case byte_size(Rest) -2 >= DLen of
true ->
case Rest of
<<UserZone:DLen/bytes, Crc:8, 16#16, Rest1/binary>> ->
CheckBuf = <<C:8, A1:2/bytes,A2:2/bytes,A3:1/bytes, AFN:8, SEQ:8, UserZone/binary>>,
CheckCrc = dgiot_utils:get_parity(CheckBuf),
<<_Tpv:1, _FIRN:2, CON:1, _:4>> = <<SEQ:8>>,
% BinA = dgiot_utils:to_binary(A),
Acc1 =
case CheckCrc =:= Crc of
true ->
Frame = #{
% <<"addr">> => <<"16#00,16#00",dlt645_proctol:reverse(A1)/binary,dlt645_proctol:reverse(A2)/binary>>,
<<"addr">> => dgiot_utils:binary_to_hex(dlt376_proctol:encode_of_addr(A1,A2)), %dlt376_proctol:concrat_binary(dlt645_proctol:reverse(A1),dlt645_proctol:reverse(A2)),
<<"addr">> => dgiot_utils:binary_to_hex(dlt376_proctol:encode_of_addr(A1,A2)),
<<"command">> => C,
<<"afn">> => AFN,
<<"datalen">> => DLen,
<<"msgtype">> => ?DLT376
<<"msgtype">> => ?DLT376,
<<"con">> => CON,
<<"concentrator">> => <<A1:2/bytes,A2:2/bytes,A3:1/bytes>>
},
case catch (parse_userzone(UserZone, Frame, Opts)) of
{'EXIT', Reason} ->
@ -97,31 +100,42 @@ parse_userzone(UserZone, #{<<"msgtype">> := ?DLT376} = Frame, _Opts) ->
to_frame(#{
% <<"msgtype">> := ?DLT376,
<<"command">> := C,
<<"addr">> := Addr,
%% <<"addr">> := Addr, %%?? Addr是6位字符
<<"concentrator">> := Addr,
<<"afn">> := AFN
} = Msg) ->
} = Msg) ->
{ok, UserZone} = get_userzone(Msg),
Len = (byte_size(UserZone) + 8) * 4 + 2,
Crc = dgiot_utils:get_parity(<<C:8, Addr:5/bytes, AFN:8, 16#71, UserZone/binary>>),
Crc = dgiot_utils:get_parity(<<C:8, Addr:5/bytes, AFN:8, 16#61, UserZone/binary>>),
<<
16#68,
Len:8,
16#00,
Len:8,
16#00,
Len:16/little,
Len:16/little,
16#68,
C:8,
Addr:5/bytes,
AFN:8,
16#71,
16#61,
UserZone/binary,
Crc:8,
16#16
>>.
% DLT376
check_Command(State = #{<<"command">> := 16#C9, <<"afn">> := 16#02}) ->
State;
% DLT376
check_Command(State = #{<<"afn">> := 16#02, <<"data">> := <<16#00, 16#00, 16#01, 16#00, _Version/binary>>}) ->
Frame = to_frame(State#{<<"command">> => 11,
<<"afn">> => 0,
<<"di">> => <<"00000400">>,
<<"data">> => <<"020000010000">>}),
State#{<<"frame">> => Frame};
% DLT376
check_Command(State = #{<<"afn">> := 16#02, <<"data">> := <<16#00, 16#00, 16#04, 16#00, _Time/binary>>}) ->
Frame = to_frame(State#{<<"command">> => 11,
<<"afn">> => 0,
<<"di">> => <<"00000400">>,
<<"data">> => <<"020000040000">>}),
State#{<<"frame">> => Frame};
% DLT376
check_Command(State = #{<<"command">> := 16#88, <<"afn">> := 16#0C}) ->
@ -140,9 +154,27 @@ check_Command(State = #{<<"command">> := 16#88, <<"afn">> := 16#0C}) ->
_ ->
State
end;
% DLT376
check_Command(State = #{<<"afn">> := 16#0C}) ->
Data = maps:get(<<"data">>, State, <<>>),
case Data of
<<Di:4/bytes,DTime:5/bytes,DNum:1/bytes,DValue:5/bytes,_/bytes>> ->
State1 = #{
<<"di">> => Di,
<<"time">> =>dgiot_utils:to_hex(DTime),
<<"valuenum">> => DNum,
<<"value">> => #{dgiot_utils:to_hex(Di)=>binary_to_value_dlt376_bcd(DValue)},
<<"addr">> => maps:get(<<"addr">>, State, <<>>)
},
State1;
_ ->
State
end;
% DLT376 穿
check_Command(State = #{<<"command">> := 16#88, <<"afn">> := 16#10}) ->
% check_Command(State = #{<<"command">> := 16#88, <<"afn">> := 16#10}) ->
check_Command(State = #{<<"afn">> := 16#10}) ->
Data = maps:get(<<"data">>, State, <<>>),
case Data of
<<_:4/bytes,_:1/bytes,DLen2:8,DLen1:8,Rest/bytes>> ->
@ -153,11 +185,11 @@ check_Command(State = #{<<"command">> := 16#88, <<"afn">> := 16#10}) ->
?LOG(warning,"GGM 160 check_Command:~p", [Frames]),
case Frames of
%
[#{<<"command">>:=16#9C} | _] ->
[#{<<"command">> := 16#9C} | _] ->
Di = <<16#FE,16#FE,16#FE,16#FE>>,
State1 = #{
<<"di">> => Di,%
<<"value">> => #{dgiot_utils:to_hex(Di)=>0 },
<<"value">> => #{dgiot_utils:to_hex(Di) => 0 },
<<"addr">> => maps:get(<<"addr">>, State, <<>>)
},
State1;

View File

@ -1,4 +1,12 @@
## Password hash.
##
## Value: plain | md5 | sha | sha256 | sha512
auth.password_hash = sha256
auth.password_hash = sha256
##--------------------------------------------------------------------
## grpc Plugin
##--------------------------------------------------------------------
iot_dgiot_mqtt.listener = 31888
iot_dgiot_mqtt.heartbeat = 20

View File

@ -4,4 +4,15 @@
{mapping, "auth.password_hash", "dgiot_mqtt.password_hash", [
{default, sha256},
{datatype, {enum, [plain, md5, sha, sha256, sha512]}}
]}.
]}.
{mapping, "iot_dgiot_mqtt.listener", "dgiot_mqtt.listener", [
{default, 31888},
{datatype, [integer, ip]}
]}.
{mapping, "iot_dgiot_mqtt.heartbeat", "dgiot_mqtt.heartbeat", [
{default, 180},
{datatype, integer}
]}.

View File

@ -1,4 +1,22 @@
{deps, []}.
{plugins,
[{grpc_plugin, {git, "https://gitee.com/fastdgiot/grpc_plugin", {tag, "v0.10.1"}}}
]}.
{deps,
[{grpc, {git, "https://gitee.com/fastdgiot/grpc-erl", {tag, "0.6.4"}}}
]}.
{grpc,
[ {type, all}
, {protos, ["priv/protos/"]}
, {out_dir, "src/"}
, {gpb_opts, [{module_name_suffix, "_pb"}]}
]}.
{provider_hooks,
[{pre, [{compile, {grpc, gen}},
{clean, {grpc, clean}}]}
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
@ -22,3 +40,4 @@
{emqtt, {git, "https://gitee.com/fastdgiot/emqtt", {tag, "1.2.3"}}}]}
]}
]}.

View File

@ -1,7 +1,7 @@
{application, dgiot_mqtt,
[{description, "DGIOT MQTT"},
{vsn, "4.3.0"},
{applications, [kernel, stdlib, dgiot]},
{applications, [kernel, stdlib, grpc, dgiot]},
{modules, []},
{env, []},
{mod, {dgiot_mqtt_app, []}},

View File

@ -39,6 +39,7 @@ start(_StartType, _StartArgs) ->
{ok, Sup} = dgiot_mqtt_sup:start_link(),
_ = load_auth_hook(),
_ = load_acl_hook(),
%% _ = load_publish_hook(),
{ok, Sup}.
@ -48,6 +49,7 @@ stop(_State) ->
prep_stop(State) ->
emqx:unhook('client.authenticate', fun dgiot_mqtt_auth:check/3),
emqx:unhook('client.check_acl', fun dgiot_mqtt_acl:check_acl/5),
%% emqx:unhook('message.publish', fun dgiot_mqtt_message:on_message_publish/3),
State.
load_auth_hook() ->
@ -55,3 +57,8 @@ load_auth_hook() ->
load_acl_hook() ->
emqx:hook('client.check_acl', fun dgiot_mqtt_acl:check_acl/5, [#{}]).
%%load_publish_hook() ->
%% emqx:hook('message.publish', fun dgiot_mqtt_message:on_message_publish/3, [#{}]).

View File

@ -16,14 +16,9 @@
-module(dgiot_mqtt_auth).
%%-include("emqx_auth_mnesia.hrl").
%%
%%-include_lib("emqx/include/emqx.hrl").
%%-include_lib("emqx/include/logger.hrl").
%%-include_lib("emqx/include/types.hrl").
%%
%%-include_lib("stdlib/include/ms_transform.hrl").
%%
-include_lib("dgiot/include/logger.hrl").
-include("dgiot_mqtt.hrl").
-define(TABLE, emqx_user).
%% Auth callbacks
-export([
@ -34,7 +29,7 @@
check(#{username := Username}, AuthResult, _)
when Username == <<"anonymous">> orelse Username == undefined orelse Username == <<>> ->
io:format("~s ~p Username: ~p~n", [?FILE, ?LINE, Username]),
{stop, AuthResult#{anonymous => true, auth_result => success}};
{ok, AuthResult#{anonymous => true, auth_result => success}};
%% clientid password token
check(#{clientid := Token, username := UserId, password := Token}, AuthResult, #{hash_type := _HashType}) ->

View File

@ -0,0 +1,69 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_mqtt_message).
-include_lib("dgiot/include/logger.hrl").
-include_lib("dgiot/include/dgiot_mqtt.hrl").
%% ACL Callbacks
-export([
on_message_publish/2
]).
-define(EMPTY_USERNAME, <<"">>).
on_message_publish(Message = #message{from = ClientId,
topic = <<"$dg/",_Rest/binary>>, payload = _Payload, headers = _Headers}, _State) ->
%% Topic = <<"$dg/",Rest/binary>>,
%% Username = maps:get(username, Headers, ?EMPTY_USERNAME),
io:format("~s ~p ClientId: ~p~n", [?FILE, ?LINE, ClientId]),
{ok, Message};
%% case catch luerl:call_function([on_message_publish], [ClientId, Username, Topic, Payload, QoS, Retain], _State) of
%% {'EXIT', St} ->
%% ?LOG(error, "Failed to execute function on_message_publish(), which has syntax error, St=~p", [St]),
%% {ok, Message};
%% {[false], _St} ->
%% {stop, Message};
%% {[NewTopic, NewPayload, NewQos, NewRetain], _St} ->
%% ?LOG(debug, "Lua function on_message_publish() return ~p", [{NewTopic, NewPayload, NewQos, NewRetain}]),
%% {ok, Message#message{topic = NewTopic, payload = NewPayload,
%% qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}};
%% Other ->
%% ?LOG(error, "Topic=~p, lua function on_message_publish caught exception, ~p", [Topic, Other]),
%% {ok, Message}
%% end;
on_message_publish(Message, _State) ->
%% ignore topics starting with $
{ok, Message}.
%%on_message_delivered(#{}, #message{topic = <<$$, _Rest/binary>>}, _State) ->
%% %% ignore topics starting with $
%% ok;
%%on_message_delivered(#{clientid := ClientId, username := Username},
%% Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = Flags = #{retain := Retain}},
%% _State) ->
%% ok.
%%
%%on_message_acked(#{}, #message{topic = <<$$, _Rest/binary>>}, _State) ->
%% %% ignore topics starting with $
%% ok;
%%on_message_acked(#{clientid := ClientId, username := Username},
%% Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = #{retain := Retain}}, _State) ->
%% ?LOG(debug, "Message acked by client(~s): ~s~n",
%% [ClientId, emqx_message:format(Message)]),
%% ok.