feat: add dgiot_mqtt_worker

This commit is contained in:
U-JOHNLIU\jonhl 2022-03-21 14:54:16 +08:00
parent 96b88dcbde
commit 522f7605c0
6 changed files with 106 additions and 141 deletions

View File

@ -25,5 +25,3 @@
-define(AUTH_CHL, 5).
-define(defult_CHL, 5).
-define(DGIOT_UDPC_WORK, dgiot_udpc_work).

View File

@ -18,9 +18,8 @@
-define(TYPE, <<"MQTTC">>).
-author("kenneth").
-record(state, {id, client = disconnect}).
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl").
-include("dgiot_bridge.hrl").
-include_lib("dgiot/include/logger.hrl").
-include("dgiot_dlink.hrl").
%% API
-dgiot_data("ets").
@ -28,8 +27,8 @@
-export([start/2]).
-export([init/3, handle_event/3, handle_message/2, handle_init/1, stop/3]).
-export([init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]).
-define(DGIOT_MQTT_WORK, dgiot_mqtt_work).
%%
-channel_type(#{
@ -140,23 +139,10 @@ start(ChannelId, ChannelArgs) ->
%%
init(?TYPE, ChannelId, ChannelArgs) ->
Options = [
{host, binary_to_list(maps:get(<<"address">>, ChannelArgs))},
{port, maps:get(<<"port">>, ChannelArgs)},
{clientid, ChannelId},
{ssl, maps:get(<<"ssl">>, ChannelArgs, false)},
{username, binary_to_list(maps:get(<<"username">>, ChannelArgs))},
{password, binary_to_list(maps:get(<<"password">>, ChannelArgs))},
{clean_start, maps:get(<<"clean_start">>, ChannelArgs, false)}
],
State = #state{
id = ChannelId
},
Specs = [
{dgiot_mqtt_client, {dgiot_mqtt_client, start_link, [?MODULE, [State], Options]}, permanent, 5000, worker, [dgiot_mqtt_client]}
],
{ok, State, Specs}.
{ok, State, dgiot_mqttc_worker:childSpec(ChannelId, State, ChannelArgs)}.
%%
handle_init(State) ->
@ -175,70 +161,3 @@ handle_message(Message, State) ->
stop(ChannelType, ChannelId, _State) ->
?LOG(info, "channel stop ~p,~p", [ChannelType, ChannelId]),
ok.
%% mqtt client hook
init([State]) ->
{ok, State#state{}}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({connect, Client}, #state{id = ChannelId} = State) ->
emqtt:subscribe(Client, {<<"bridge/#">>, 1}),
case dgiot_bridge:get_products(ChannelId) of
{ok, _Type, ProductIds} ->
case ProductIds of
[] -> pass;
_ ->
lists:map(fun(ProductId) ->
%% dgiot_product:load(ProductId),
emqtt:subscribe(Client, {<<"bridge/thing/", ProductId/binary, "/#">>, 1}),
dgiot_mqtt:subscribe(<<"forward/thing/", ProductId/binary, "/+/post">>),
dgiot_mqtt:publish(ChannelId, <<"thing/", ProductId/binary>>, jsx:encode(#{<<"network">> => <<"connect">>}))
end, ProductIds)
end,
?LOG(info, "connect ~p sub ~n", [Client]);
_ -> pass
end,
{noreply, State#state{client = Client}};
handle_info(disconnect, #state{id = ChannelId} = State) ->
case dgiot_bridge:get_products(ChannelId) of
{ok, _Type, ProductIds} ->
case ProductIds of
[] -> pass;
_ ->
lists:map(fun(ProductId) ->
dgiot_mqtt:publish(ChannelId, <<"thing/", ProductId/binary>>, jsx:encode(#{<<"network">> => <<"disconnect">>}))
end, ProductIds)
end;
_ -> pass
end,
{noreply, State#state{client = disconnect}};
handle_info({publish, #{payload := Payload, topic := <<"bridge/", Topic/binary>>} = _Msg}, #state{id = ChannelId} = State) ->
dgiot_mqtt:publish(ChannelId, Topic, Payload),
{noreply, State};
handle_info({deliver, _, Msg}, #state{client = Client} = State) ->
case dgiot_mqtt:get_topic(Msg) of
<<"forward/", Topic/binary>> ->
emqtt:publish(Client, Topic, dgiot_mqtt:get_payload(Msg));
_ -> pass
end,
{noreply, State};
handle_info(Info, State) ->
?LOG(info, "unkknow ~p~n", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -0,0 +1,100 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_mqttc_worker).
-author("johnliu").
-record(state, {id, client = disconnect}).
-include_lib("dgiot/include/logger.hrl").
-include("dgiot_bridge.hrl").
-export([childSpec/3, init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]).
childSpec(ChannelId, State, ChannelArgs) ->
Options = [
{host, binary_to_list(maps:get(<<"address">>, ChannelArgs))},
{port, maps:get(<<"port">>, ChannelArgs)},
{clientid, ChannelId},
{ssl, maps:get(<<"ssl">>, ChannelArgs, false)},
{username, binary_to_list(maps:get(<<"username">>, ChannelArgs))},
{password, binary_to_list(maps:get(<<"password">>, ChannelArgs))},
{clean_start, maps:get(<<"clean_start">>, ChannelArgs, false)}
],
[{dgiot_mqtt_client, {dgiot_mqtt_client, start_link, [?MODULE, [State], Options]}, permanent, 5000, worker, [dgiot_mqtt_client]}].
%% mqtt client hook
init([State]) ->
{ok, State#state{}}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({connect, Client}, #state{id = ChannelId} = State) ->
emqtt:subscribe(Client, {<<"bridge/#">>, 1}),
case dgiot_bridge:get_products(ChannelId) of
{ok, _Type, ProductIds} ->
case ProductIds of
[] -> pass;
_ ->
lists:map(fun(ProductId) ->
%% dgiot_product:load(ProductId),
emqtt:subscribe(Client, {<<"bridge/thing/", ProductId/binary, "/#">>, 1}),
dgiot_mqtt:subscribe(<<"forward/thing/", ProductId/binary, "/+/post">>),
dgiot_mqtt:publish(ChannelId, <<"thing/", ProductId/binary>>, jsx:encode(#{<<"network">> => <<"connect">>}))
end, ProductIds)
end,
?LOG(info, "connect ~p sub ~n", [Client]);
_ -> pass
end,
{noreply, State#state{client = Client}};
handle_info(disconnect, #state{id = ChannelId} = State) ->
case dgiot_bridge:get_products(ChannelId) of
{ok, _Type, ProductIds} ->
case ProductIds of
[] -> pass;
_ ->
lists:map(fun(ProductId) ->
dgiot_mqtt:publish(ChannelId, <<"thing/", ProductId/binary>>, jsx:encode(#{<<"network">> => <<"disconnect">>}))
end, ProductIds)
end;
_ -> pass
end,
{noreply, State#state{client = disconnect}};
handle_info({publish, #{payload := Payload, topic := <<"bridge/", Topic/binary>>} = _Msg}, #state{id = ChannelId} = State) ->
dgiot_mqtt:publish(ChannelId, Topic, Payload),
{noreply, State};
handle_info({deliver, _, Msg}, #state{client = Client} = State) ->
case dgiot_mqtt:get_topic(Msg) of
<<"forward/", Topic/binary>> ->
emqtt:publish(Client, Topic, dgiot_mqtt:get_payload(Msg));
_ -> pass
end,
{noreply, State};
handle_info(Info, State) ->
?LOG(info, "unkknow ~p~n", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -18,11 +18,14 @@
-author("johnliu").
-include_lib("dgiot/include/dgiot_socket.hrl").
-include_lib("dgiot/include/logger.hrl").
%% API
-export([init/1, handle_info/2, terminate/2]).
-export([start_connect/1]).
-define(MAX_BUFF_SIZE, 10 * 1024).
-define(DGIOT_UDPC_WORK, dgiot_udpc_work).
-record(state, {
productid,
devaddr,

View File

@ -14,7 +14,6 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-define(DGIOT_MQTT_WORK, dgiot_mqtt_work).
-define(GROUP_TOPIC(Di, Mid), <<"group/", Di/binary,"/",Mid/binary>>).
-define(COMSUMER_KEY(Di), <<"comsumer/",Di/binary>>).

View File

@ -1,54 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc dgiot_mqttc_sup
-module(dgiot_mqttc_sup).
-behaviour(supervisor).
%% API
-export([start_link/2, start_sup/2]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type, Args), {I, {I, start_link, Args}, transient, 5000, Type, [I]}).
%%--------------------------------------------------------------------
%% API functions
%%--------------------------------------------------------------------
start_link(Name, Mod) ->
supervisor:start_link({local, Name}, ?MODULE, [Mod]).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([Mod]) ->
{ok, { {simple_one_for_one, 5, 10}, [?CHILD(Mod, worker, [])]}}.
start_sup(Name, Mod) ->
case whereis(Name) of
undefined ->
Args = [Name, Mod],
supervisor:start_child(dgiot_mqttc_sup, {Name, {dgiot_mqttc_sup, start_link, Args}, permanent, 5000, supervisor, [Name]});
Pid ->
{ok, Pid}
end.