fix: mqtt mock

This commit is contained in:
jhonliu 2022-11-14 12:14:59 +08:00
parent 33562878eb
commit ec17b48f4b
14 changed files with 122 additions and 54 deletions

View File

@ -94,7 +94,7 @@ has_routes(Topic) ->
%% clientid动态订阅topic
subscribe_mgmt(ClientId, Topic) ->
timer:sleep(100),
timer:sleep(1),
emqx_mgmt:subscribe(ClientId, [{Topic, #{qos => 0}}]).
%% clientid动态取消订阅topic
@ -104,12 +104,12 @@ unsubscribe_mgmt(ClientId, Topic) ->
subscribe(Topic) ->
Options = #{qos => 0},
timer:sleep(100),
timer:sleep(1),
emqx:subscribe(Topic, dgiot_utils:to_binary(self()), Options).
%% clientid动态订阅topic
subscribe(ClientId, TopicFilter) ->
timer:sleep(100),
timer:sleep(1),
case emqx_broker_helper:lookup_subpid(ClientId) of
Pid when is_pid(Pid) ->
subscribe(TopicFilter, ClientId, Pid, subopts());

View File

@ -31,11 +31,21 @@
%%% API
%%%===================================================================
subscribe(Client, Topic, Qos) ->
gen_server:call(Client, {subscribe, Topic, Qos}).
subscribe(#connect_state{socket = ConnPid}, Topic, Qos) ->
case ConnPid of
disconnect ->
pass;
Pid ->
emqtt:subscribe(Pid, {Topic, Qos})
end.
publish(Client, Topic, Payload, Qos) ->
gen_server:call(Client, {publish, Topic, Payload, Qos}).
publish(#connect_state{socket = ConnPid}, Topic, Payload, Qos) ->
case ConnPid of
disconnect ->
pass;
Pid ->
emqtt:publish(Pid, Topic, Payload, Qos)
end.
start_link(Args) ->
dgiot_client:start_link(?MODULE, Args).
@ -87,7 +97,7 @@ handle_cast(_Request, Dclient) ->
handle_info(connect, #dclient{userdata = #connect_state{options = Options, mod = Mod} = ConnectStat} = Dclient) ->
case connect(Options) of
{ok, ConnPid, Props} ->
case Mod:handle_info({connect, ConnPid}, Dclient#dclient{userdata = ConnectStat#connect_state{props = Props}}) of
case Mod:handle_info({connect, ConnPid}, Dclient#dclient{userdata = ConnectStat#connect_state{props = Props, socket = ConnPid}}) of
{noreply, NewDclient} ->
{noreply, NewDclient};
{stop, Reason, NewDclient} ->

View File

@ -34,7 +34,6 @@ init_ets() ->
load_cache() ->
Success = fun(Page) ->
lists:map(fun(Product) ->
%% dgiot_mnesia:insert(ObjectId, ['Product', dgiot_role:get_acls(Product), ProductSecret]),
dgiot_product:save(Product)
end, Page)
end,

View File

@ -33,8 +33,9 @@ parse_cache_Device(_ClassName) ->
%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, ClassName]),
dgiot_product:load_cache(),
Success = fun(Page) ->
lists:map(fun(Device) ->
lists:map(fun(#{<<"devaddr">> := Devaddr} = Device) ->
%% save_profile(Device),
io:format("Devaddr ~p ~n",[Devaddr]),
dgiot_device:save(Device)
end, Page)
end,
@ -43,7 +44,7 @@ parse_cache_Device(_ClassName) ->
<<"keys">> => [<<"ACL">>, <<"updatedAt">>, <<"devaddr">>, <<"status">>, <<"isEnable">>, <<"profile">>, <<"product">>, <<"location">>, <<"deviceSecret">>],
<<"where">> => #{}
},
dgiot_parse_loader:start(<<"Device">>, Query, 0, 500, 1000000, Success).
dgiot_parse_loader:start(<<"Device">>, Query, 0, 100, 1000000, Success).
save(ProductId, DevAddr) ->
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),

View File

@ -39,6 +39,7 @@ start(_StartType, _StartArgs) ->
{ok, Sup} = dgiot_dlink_sup:start_link(),
_ = load_auth_hook(),
_ = load_acl_hook(),
_ = load_offline_hook(),
_ = load_publish_hook(),
{ok, Sup}.
@ -49,6 +50,8 @@ stop(_State) ->
prep_stop(State) ->
emqx:unhook('client.authenticate', fun dgiot_mqtt_auth:check/3),
emqx:unhook('client.check_acl', fun dgiot_mqtt_acl:check_acl/5),
emqx:unhook('client.disconnected', fun dgiot_mqtt_offline:on_client_disconnected/4),
emqx:unhook('session.terminated', fun dgiot_mqtt_offline:on_session_terminated/4),
emqx:unhook('message.publish', fun dgiot_mqtt_message:on_message_publish/2),
State.
@ -59,6 +62,12 @@ load_auth_hook() ->
load_acl_hook() ->
emqx:hook('client.check_acl', fun dgiot_mqtt_acl:check_acl/5, [#{}]),
ok.
load_offline_hook() ->
emqx:hook('client.disconnected', fun dgiot_mqtt_offline:on_client_disconnected/4, [#{}]),
emqx:hook('session.terminated', fun dgiot_mqtt_offline:on_session_terminated/4, [#{}]),
ok.
load_publish_hook() ->
emqx:hook('message.publish', fun dgiot_mqtt_message:on_message_publish/2, [#{}]),
ok.

View File

@ -75,16 +75,15 @@ handle_event(_EventId, _Event, State) ->
handle_message({sync_parse, _Pid, 'after', put, _Token, <<"Device">>, #{<<"profile">> := #{<<"mock">> := #{
<<"enable">> := true, <<"transport">> := <<"mqtt">>} = Mock}, <<"objectId">> := DeviceId}},
#state{id = ChannelId} = State) ->
%% io:format("~s ~p DeviceId ~p Mock ~p", [?FILE, ?LINE, DeviceId, Mock]),
%% io:format("~s ~p DeviceId = ~p.~n", [?FILE, ?LINE, DeviceId]),
dgiot_mock_mqtt:start(ChannelId, DeviceId, Mock),
{ok, State};
handle_message({sync_parse, _Pid, 'after', put, _Token, <<"Device">>, #{<<"profile">> := #{<<"mock">> := #{<<"enable">> := false}}, <<"objectId">> := DeviceId}},
#state{id = ChannelId} = State) ->
%% io:format("~s ~p DeviceId ~p", [?FILE, ?LINE, DeviceId]),
case dgiot_device:lookup(DeviceId) of
{ok, #{<<"devaddr">> := DevAddr, <<"productid">> := ProductId}} ->
dgiot_client:stop(ChannelId, <<ProductId/binary, "_", DevAddr/binary>>, DevAddr);
dgiot_client:stop(ChannelId, <<ProductId/binary, "_", DevAddr/binary>>);
_ ->
pass
end,

View File

@ -23,7 +23,6 @@
%% API
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2, code_change/3]).
childspec(ChannelId, ChannelArgs) ->
Options = #{
host => binary_to_list(maps:get(<<"address">>, ChannelArgs, <<"127.0.0.1">>)),
@ -37,8 +36,20 @@ childspec(ChannelId, ChannelArgs) ->
dgiot_client:register(ChannelId, mqtt_client_sup, Args).
%% callback
init(#dclient{channel = ChannelId} = State) ->
{ok, State#dclient{channel = dgiot_utils:to_binary(ChannelId)}}.
init(#dclient{channel = ChannelId, child = #{<<"endtime">> := EndTime1, <<"starttime">> := StartTime1} = Child} = State) ->
Freq = dgiot_utils:to_int(maps:get(<<"freq">>, Child, 5)),
StartTime = dgiot_utils:to_int(StartTime1),
EndTime = dgiot_utils:to_int(EndTime1),
NextTime = dgiot_client:get_nexttime(StartTime, Freq + 5),
Count = dgiot_client:get_count(StartTime, EndTime, Freq),
Rand =
case maps:get(<<"rand">>, Child, true) of
true ->
dgiot_client:get_rand(Freq);
_ ->
0
end,
{ok, State#dclient{channel = dgiot_utils:to_binary(ChannelId), clock = #dclock{nexttime = NextTime + Rand, freq = Freq, count = Count, round = 0}}}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
@ -46,9 +57,11 @@ handle_call(_Request, _From, State) ->
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({connect, Client}, #dclient{channel = ChannelId, client = ClientId} = Dclient) ->
emqtt:subscribe(Client, {<<ClientId/binary, "/#">>, 1}), % cloud to edge
dgiot_bridge:send_log(ChannelId, "~s ~p ~p ~n", [?FILE, ?LINE, jsx:encode(#{<<"network">> => <<"connect">>})]),
handle_info({connect, Pid}, #dclient{channel = ChannelId, client = <<ProductId:10/binary, "_", DevAddr/binary>>} = Dclient) ->
FirmwareTopic = <<"$dg/thing/", ProductId/binary, "/", DevAddr/binary, "/firmware/report">>,
emqtt:publish(Pid, FirmwareTopic, jiffy:encode(#{<<"devaddr">> => DevAddr}), 1), % cloud to edge
ProfileTopic = <<"$dg/device/", ProductId/binary, "/", DevAddr/binary, "/profile">>,
emqtt:subscribe(Pid, ProfileTopic, 1),
update(ChannelId),
{noreply, Dclient};
@ -56,9 +69,23 @@ handle_info(disconnect, #dclient{channel = ChannelId} = Dclient) ->
dgiot_bridge:send_log(ChannelId, "~s ~p ~p ~n", [?FILE, ?LINE, jsx:encode(#{<<"network">> => <<"disconnect">>})]),
{noreply, Dclient};
handle_info(next_time, #dclient{channel = Channel, client = <<ProductId:10/binary, "_", DevAddr/binary>> = Client, userdata = UserData,
clock = #dclock{round = Round, nexttime = NextTime, count = Count, freq = Freq} = Clock} = Dclient) ->
dgiot_client:stop(Channel, Client, Count), %% ¼ì²éÊÇ·ñÐèҪֹͣÈÎÎñ
NewNextTime = dgiot_client:get_nexttime(NextTime, Freq),
NewRound = Round + 1,
PayLoad = lists:foldl(fun(Key, Acc) ->
Acc#{Key => erlang:round(rand:uniform() * 60 * 1 + 1) * 1000}
end, #{}, dgiot_product:get_keys(ProductId)),
Topic = <<"$dg/thing/", ProductId/binary, "/", DevAddr/binary, "/properties/report">>,
dgiot_mqtt_client:publish(UserData, Topic, jiffy:encode(PayLoad), 0),
{noreply, Dclient#dclient{clock = Clock#dclock{nexttime = NewNextTime, count = Count - 1, round = NewRound}}};
handle_info({publish, #{payload := Payload, topic := Topic} = _Msg}, #dclient{channel = ChannelId} = State) ->
io:format("~s ~p ChannelId ~p Topic ~p Payload ~p ~n", [?FILE, ?LINE, ChannelId, Topic, Payload]),
dgiot_bridge:send_log(ChannelId, "cloud to edge: Topic ~p Payload ~p ~n", [Topic, Payload]),
%% dgiot_bridge:send_log(ChannelId, "cloud to edge: Topic ~p Payload ~p ~n", [Topic, Payload]),
%% dgiot_mqtt:publish(ChannelId, Topic, Payload),
{noreply, State};
@ -87,11 +114,9 @@ code_change(_OldVsn, Dclient, _Extra) ->
update(ChannelId) ->
dgiot_data:insert({<<"mqtt_online">>, dlink_metrics}, dgiot_client:count(ChannelId)).
start(ChannelId, DeviceId, #{<<"auth">> := <<"ProductSecret">>}) ->
start(ChannelId, DeviceId, #{<<"auth">> := <<"ProductSecret">>} = Mock) ->
case dgiot_device:lookup(DeviceId) of
{ok, #{<<"devaddr">> := DevAddr, <<"productid">> := ProductId}} ->
Options = #{
host => "127.0.0.1",
port => 1883,
@ -100,8 +125,7 @@ start(ChannelId, DeviceId, #{<<"auth">> := <<"ProductSecret">>}) ->
password => binary_to_list(dgiot_product:get_productSecret(ProductId)),
clean_start => false
},
%% io:format("~s ~p DeviceId ~p DevAddr ~p ", [?FILE, ?LINE, DeviceId, DevAddr]),
dgiot_client:start(ChannelId, <<ProductId/binary, "_", DevAddr/binary>>, #{<<"options">> => Options});
dgiot_client:start(ChannelId, <<ProductId/binary, "_", DevAddr/binary>>, #{<<"options">> => Options, <<"child">> => Mock});
_ ->
#{}
end;

View File

@ -54,15 +54,6 @@ properties_report(ProductId, DevAddr, Payload) ->
%% json格式报文
firmware_report(ProductId, DevAddr, Payload) when is_map(Payload) ->
%% OldPayload =
%% case dgiot_hook:run_hook({dlink_firmware_report, ProductId}, {ProductId, DevAddr, Payload}) of
%% {ok, [Payload1]} ->
%% Payload1;
%% _ ->
%% Payload
%% end,
%% NewPload = parse_payload(ProductId, OldPayload),
%% dgiot_task:save_td(ProductId, DevAddr, NewPload, #{});
lists:map(fun
({ChannelId, _Ctype}) ->
dgiot_channelx:do_message(ChannelId, {dlink_firmware_report, ProductId, DevAddr, Payload});

View File

@ -52,22 +52,13 @@ do_check(#{username := <<"dgiot">>, clientid := ClientId}, _PubSub, _Topic) ->
deny
end;
%% <<"$dg/thing/uniapp/r:46d3eaec6dfeafc9d719899eae858cb7_uniapp/report">>
do_check(#{clientid := <<Token:34/binary, _Type/binary>>, username := _UserId} = _ClientInfo, publish, <<"$dg/thing/uniapp/", SessionToken:34/binary, "/", _Rest/binary>> = _Topic) ->
%% io:format("~s ~p Topic: ~p~n", [?FILE, ?LINE, _Topic]),
case Token of
SessionToken ->
allow;
_ ->
deny
end;
%% "$dg/thing/productid/devaddr/#"
do_check(#{clientid := DeviceAddr, username := ProductID} = _ClientInfo, publish, <<"$dg/thing/", ProductID:10/binary, "/", DeviceInfo/binary>> = _Topic) ->
do_check(#{clientid := <<ProductID:10/binary, "_", DeviceAddr/binary>>, username := ProductID} = _ClientInfo, publish, <<"$dg/thing/", ProductID:10/binary, "/", DeviceInfo/binary>> = _Topic) ->
%% io:format("~s ~p Topic: ~p _ClientInfo ~p~n", [?FILE, ?LINE, _Topic, _ClientInfo]),
check_device_addr(DeviceInfo, DeviceAddr);
do_check(#{clientid := <<ProductID:10/binary, "_", DeviceAddr/binary>>, username := ProductID} = _ClientInfo, publish, <<"$dg/thing/", ProductID:10/binary, "/", DeviceInfo/binary>> = _Topic) ->
%% "$dg/thing/productid/devaddr/#"
do_check(#{clientid := DeviceAddr, username := ProductID} = _ClientInfo, publish, <<"$dg/thing/", ProductID:10/binary, "/", DeviceInfo/binary>> = _Topic) ->
io:format("~s ~p Topic: ~p _ClientInfo ~p~n", [?FILE, ?LINE, _Topic, _ClientInfo]),
check_device_addr(DeviceInfo, DeviceAddr);
%% "$dg/thing/deviceid/#"

View File

@ -62,8 +62,6 @@ on_message_publish(Message = #message{topic = <<"$dg/thing/", Topic/binary>>, pa
_ ->
pass
end;
[<<"uniapp">>, Token, <<"report">>] ->
dgiot_hook:run_hook({uniapp, report}, {Token, get_payload(Payload)});
_ ->
pass
end,

View File

@ -0,0 +1,46 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_mqtt_offline).
%% ACL Callbacks
-export([on_client_disconnected/4, on_session_terminated/4, description/0]).
%% 线
on_client_disconnected(#{clientid := <<ProductID:10/binary, "_", DeviceAddr/binary>>, username := ProductID}, _ReasonCode, #{disconnected_at := _DisconnectedAt}, _State) ->
DeviceId = dgiot_parse_id:get_deviceid(ProductID, DeviceAddr),
dgiot_device:offline(DeviceId),
io:format("~s ~p ProductID = ~p DeviceAddr ~p DeviceId ~p ~n", [?FILE, ?LINE, ProductID, DeviceAddr, DeviceId]),
ok;
on_client_disconnected(_Client, _ReasonCode, _, _State) ->
ok.
on_session_terminated(#{clientid := <<ProductID:10/binary, "_", DeviceAddr/binary>>, username := ProductID}, _Reason, _SessInfo, _State) ->
DeviceId = dgiot_parse_id:get_deviceid(ProductID, DeviceAddr),
dgiot_device:offline(DeviceId),
io:format("~s ~p ProductID = ~p DeviceAddr ~p DeviceId ~p ~n", [?FILE, ?LINE, ProductID, DeviceAddr, DeviceId]),
ok;
on_session_terminated(ClientInfo, _Reason, _SessInfo, _State) ->
io:format("~s ~p ClientInfo = ~p.~n", [?FILE, ?LINE, ClientInfo]),
ok.
description() -> "Disconnected with Dlink".
%%--------------------------------------------------------------------
%% Internal functions
%%-------------------------------------------------------------------

View File

@ -863,7 +863,7 @@
"tags": [
{
"name": "factory",
"description": "factory"
"description": "数字工厂"
}
]
}

View File

@ -54,7 +54,7 @@ init([#{<<"channel">> := ChannelId, <<"client">> := ClientId, <<"starttime">> :=
_ ->
0
end,
io:format("~s ~p ChannelId ~p ClientId ~p NextTime = ~p Freq ~p Count = ~p.~n", [?FILE, ?LINE, ChannelId, ClientId, NextTime, Freq, Count]),
%% io:format("~s ~p ChannelId ~p ClientId ~p NextTime = ~p Freq ~p Count = ~p.~n", [?FILE, ?LINE, ChannelId, ClientId, NextTime, Freq, Count]),
Dclient = #dclient{channel = ChannelId, client = ClientId, status = ?DCLIENT_INTIALIZED, userdata = #device_task{},
clock = #dclock{nexttime = NextTime + Rand, freq = Freq, count = Count, round = 0}},
{ok, Dclient};