mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-11-29 18:57:41 +08:00
feat: add dgiot_client for tcp client
This commit is contained in:
parent
b3bb039c1b
commit
038494d3a4
@ -17,9 +17,9 @@
|
||||
|
||||
-record(dclock, {
|
||||
nexttime :: non_neg_integer(), %% 下一次闹铃执行时间
|
||||
endtime :: non_neg_integer(), %% 闹铃结束时间
|
||||
freq :: non_neg_integer(), %% 周期闹铃提醒频率单位为秒
|
||||
round :: non_neg_integer(), %% 闹铃总计执行轮次
|
||||
count:: non_neg_integer(), %% 闹铃总计执行多少次
|
||||
round :: non_neg_integer(), %% 闹铃总计已执行轮次
|
||||
rand :: boolean() %% 闹铃任务启动是否随机错峰处理, 防止所有客户端在同一个时刻启动任务
|
||||
}).
|
||||
|
||||
@ -28,7 +28,8 @@
|
||||
client :: binary(), %% 客户端地址
|
||||
status :: integer(), %% client的状态值
|
||||
clock :: #dclock{}, %% client的闹铃
|
||||
userdata %% 用户自定义参数
|
||||
userdata :: any(), %% 用户自定义参数
|
||||
child ::any() %% 子模块参数
|
||||
}).
|
||||
|
||||
|
||||
|
@ -22,7 +22,7 @@
|
||||
%% API
|
||||
-export([register/3, start_link/2, add_clock/3, notify/3, add/2, set_consumer/2, get_consumer/1]).
|
||||
-export([start/2, start/3, stop/1, stop/2, stop/3, restart/2, get/2, send/4]).
|
||||
-export([get_nexttime/2, send_after/4]).
|
||||
-export([get_nexttime/2, send_after/4, get_count/3]).
|
||||
|
||||
-type(result() :: any()). %% todo 目前只做参数检查,不做结果检查
|
||||
|
||||
@ -110,11 +110,10 @@ stop(ChannelId, ClientId) ->
|
||||
|
||||
%% @doc stop client
|
||||
-spec stop(atom() | binary(), binary(), non_neg_integer()) -> result().
|
||||
stop(ChannelId, ClientId, EndTime) when is_binary(ChannelId) ->
|
||||
stop(binary_to_atom(ChannelId), ClientId, EndTime);
|
||||
stop(ChannelId, ClientId, EndTime) ->
|
||||
NowTime = dgiot_datetime:nowstamp(),
|
||||
case NowTime > EndTime of
|
||||
stop(ChannelId, ClientId, Count) when is_binary(ChannelId) ->
|
||||
stop(binary_to_atom(ChannelId), ClientId, Count);
|
||||
stop(ChannelId, ClientId, Count) ->
|
||||
case Count =< 0 of
|
||||
true ->
|
||||
stop(ChannelId, ClientId)
|
||||
end.
|
||||
@ -146,7 +145,7 @@ get(ChannelId, ClientId) ->
|
||||
Pid when is_pid(Pid) ->
|
||||
case is_process_alive(Pid) of
|
||||
true ->
|
||||
online;
|
||||
{ok, Pid};
|
||||
false ->
|
||||
offline
|
||||
end;
|
||||
@ -196,6 +195,15 @@ send_after(RetryTime, Freq, true, Msg) ->
|
||||
send_after(RetryTime, _Freq, _, Msg) ->
|
||||
erlang:send_after(RetryTime * 1000, self(), Msg).
|
||||
|
||||
%% @doc 获取闹铃执行次数
|
||||
-spec get_count(integer(), integer(), integer()) -> result().
|
||||
get_count(StartTime, EndTime, _Freq) when EndTime >= StartTime ->
|
||||
0;
|
||||
get_count(_StartTime, _EndTime, Freq) when Freq =< 0 ->
|
||||
0;
|
||||
get_count(StartTime, EndTime, Freq) ->
|
||||
(EndTime - StartTime) div Freq.
|
||||
|
||||
get_nexttime(NextTime, Freq) ->
|
||||
NowTime = dgiot_datetime:nowstamp(),
|
||||
get_nexttime(NowTime, Freq, NextTime).
|
||||
@ -224,13 +232,15 @@ get_consumer(ChannelId) ->
|
||||
|
||||
%% 定时检查启动, 10s
|
||||
%% @doc 添加闹铃
|
||||
-spec add_clock(binary() | atom(), binary(), binary()) -> result().
|
||||
-spec add_clock(binary() | atom(), binary() | integer(), binary() | integer()) -> result().
|
||||
add_clock(Channel, Start_time, End_time) when is_binary(Channel) ->
|
||||
add_clock(dgiot_utils:to_atom(Channel), Start_time, End_time);
|
||||
add_clock(Channel, Start_time, End_time) when is_binary(Start_time) ->
|
||||
add_clock(Channel, dgiot_datetime:to_localtime(Start_time), dgiot_datetime:to_localtime(End_time));
|
||||
add_clock(Channel, Start_time, End_time) ->
|
||||
BinChannel = dgiot_utils:to_binary(Channel),
|
||||
dgiot_cron:push(BinChannel, dgiot_datetime:to_localtime(Start_time), {?MODULE, notify, [Channel, start_client]}),
|
||||
dgiot_cron:push(<<BinChannel/binary, "_stop">>, dgiot_datetime:to_localtime(End_time), {?MODULE, notify, [Channel, stop_client]}).
|
||||
dgiot_cron:push(BinChannel, Start_time, {?MODULE, notify, [Channel, start_client]}),
|
||||
dgiot_cron:push(<<BinChannel/binary, "_stop">>, End_time, {?MODULE, notify, [Channel, stop_client]}).
|
||||
|
||||
%% 定时检查启动, 10s
|
||||
%% @doc 闹铃通知回调函数
|
||||
|
@ -18,91 +18,97 @@
|
||||
-author("johnliu").
|
||||
-include("dgiot_socket.hrl").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
-include_lib("dgiot/include/dgiot_client.hrl").
|
||||
-behaviour(gen_server).
|
||||
%% API
|
||||
-export([start_link/1, send/2]).
|
||||
-export([start_link/1, send/1, send/3]).
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
-record(state, {host, port, child = #tcp{}, mod, reconnect_times, reconnect_sleep = 30}).
|
||||
-record(connect_state, {host, port, mod, socket = undefined}).
|
||||
-define(TIMEOUT, 10000).
|
||||
-define(TCP_OPTIONS, [binary, {active, once}, {packet, raw}, {reuseaddr, false}, {send_timeout, ?TIMEOUT}]).
|
||||
|
||||
start_link(Args) ->
|
||||
dgiot_client:start_link(?MODULE, Args).
|
||||
|
||||
init([#{<<"host">> := Host, <<"port">> := Port, <<"mod">> := Mod, <<"child">> := ChildState} = Args]) ->
|
||||
io:format("~s ~p ~p ~n", [?FILE, ?LINE, ChildState]),
|
||||
init([#{<<"channel">> := ChannelId, <<"client">> := ClientId, <<"ip">> := Host, <<"port">> := Port, <<"mod">> := Mod} = Args]) ->
|
||||
Ip = dgiot_utils:to_list(Host),
|
||||
Port1 = dgiot_utils:to_int(Port),
|
||||
ReconnectTimes = maps:get(<<"reconnect_times">>, Args, max),
|
||||
ReconnectSleep = maps:get(<<"reconnect_sleep">>, Args, 30),
|
||||
State = #state{
|
||||
mod = Mod,
|
||||
host = Ip,
|
||||
port = Port1,
|
||||
reconnect_times = ReconnectTimes, %% 重连次数
|
||||
reconnect_sleep = ReconnectSleep %% 重连间隔
|
||||
},
|
||||
Transport = gen_tcp,
|
||||
Child = #tcp{transport = Transport, socket = undefined},
|
||||
case Mod:init(Child#tcp{state = ChildState}) of
|
||||
Count = maps:get(<<"count">>, Args, max),
|
||||
Freq = maps:get(<<"freq">>, Args, 30),
|
||||
UserData = #connect_state{mod = Mod, host = Ip, port = Port1},
|
||||
Clock = #dclock{freq = Freq, count = Count, round = 0},
|
||||
Dclient = #dclient{channel = ChannelId, client = ClientId, status = ?DCLIENT_INTIALIZED, clock = Clock, userdata = UserData},
|
||||
dgiot_client:add(ChannelId, ClientId),
|
||||
case Mod:init(Dclient) of
|
||||
{ok, ChildState} ->
|
||||
NewState = State#state{
|
||||
child = ChildState
|
||||
},
|
||||
{ok, do_connect(false, NewState), hibernate};
|
||||
NewState = Dclient#dclient{child = ChildState},
|
||||
do_connect(NewState),
|
||||
{ok, NewState, hibernate};
|
||||
{stop, Reason} ->
|
||||
{stop, Reason}
|
||||
end.
|
||||
|
||||
handle_call({connection_ready, Socket}, _From, #state{mod = Mod, child = ChildState} = State) ->
|
||||
NewChildState = ChildState#tcp{socket = Socket},
|
||||
case Mod:handle_info(connection_ready, NewChildState) of
|
||||
{noreply, NewChildState1} ->
|
||||
{reply, ok, State#state{child = NewChildState1}, hibernate};
|
||||
{stop, Reason, NewChildState1} ->
|
||||
{stop, Reason, {error, Reason}, State#state{child = NewChildState1}}
|
||||
handle_call({connection_ready, Socket}, _From, #dclient{channel = ChannelId, client = ClientId,
|
||||
userdata = #connect_state{mod = Mod} = UserData, child = ChildState} = Dclient) ->
|
||||
NewUserData = UserData#connect_state{socket = Socket},
|
||||
case Mod:handle_info(connection_ready, ChildState) of
|
||||
{noreply, NewChildState} ->
|
||||
{reply, ok, Dclient#dclient{child = NewChildState, userdata = NewUserData}, hibernate};
|
||||
{stop, _Reason, NewChildState} ->
|
||||
dgiot_client:stop(ChannelId, ClientId),
|
||||
{reply, _Reason, Dclient#dclient{child = NewChildState, userdata = NewUserData}}
|
||||
end;
|
||||
|
||||
handle_call(Request, From, #state{mod = Mod, child = ChildState} = State) ->
|
||||
handle_call(Request, From, #dclient{channel = ChannelId, client = ClientId,
|
||||
userdata = #connect_state{mod = Mod}, child = ChildState} = Dclient) ->
|
||||
case Mod:handle_call(Request, From, ChildState) of
|
||||
{reply, Reply, NewChildState} ->
|
||||
{reply, Reply, State#state{child = NewChildState}, hibernate};
|
||||
{reply, Reply, Dclient#dclient{child = NewChildState}, hibernate};
|
||||
{stop, Reason, NewChildState} ->
|
||||
{stop, Reason, State#state{child = NewChildState}}
|
||||
dgiot_client:stop(ChannelId, ClientId),
|
||||
{reply, Reason, Dclient#dclient{child = NewChildState}}
|
||||
end.
|
||||
|
||||
handle_cast(Msg, #state{mod = Mod, child = ChildState} = State) ->
|
||||
handle_cast(Msg, #dclient{channel = ChannelId, client = ClientId,
|
||||
userdata = #connect_state{mod = Mod}, child = ChildState} = Dclient) ->
|
||||
case Mod:handle_cast(Msg, ChildState) of
|
||||
{noreply, NewChildState} ->
|
||||
{noreply, State#state{child = NewChildState}, hibernate};
|
||||
{noreply, Dclient#dclient{child = NewChildState}, hibernate};
|
||||
{stop, Reason, NewChildState} ->
|
||||
{stop, Reason, State#state{child = NewChildState}}
|
||||
dgiot_client:stop(ChannelId, ClientId),
|
||||
{reply, Reason, Dclient#dclient{child = NewChildState}}
|
||||
end.
|
||||
|
||||
%% 连接次数为0了
|
||||
handle_info(do_connect, State) ->
|
||||
?LOG(info, "CONNECT CLOSE ~s:~p", [State#state.host, State#state.port]),
|
||||
{stop, normal, State};
|
||||
handle_info(do_connect, #dclient{channel = ChannelId, client = ClientId, clock = #dclock{count = Count}} = State) when Count =< 0 ->
|
||||
dgiot_client:stop(ChannelId, ClientId),
|
||||
{noreply, State, hibernate};
|
||||
handle_info(do_connect, #dclient{clock = #dclock{count = Count, round = Round, freq = Freq} = Clock} = State) ->
|
||||
timer:sleep(Freq * 1000),
|
||||
NewState = State#dclient{clock = Clock#dclock{count = Count - 1, round = Round + 1}},
|
||||
do_connect(NewState),
|
||||
io:format("~s ~p ~p ~n", [?FILE, ?LINE, Count]),
|
||||
{noreply, NewState, hibernate};
|
||||
|
||||
%% 连接次数为0了
|
||||
handle_info(connect_stop, State) ->
|
||||
?LOG(info, "CONNECT CLOSE ~s:~p", [State#state.host, State#state.port]),
|
||||
{stop, normal, State};
|
||||
|
||||
handle_info({connection_ready, Socket}, #state{mod = Mod, child = ChildState} = State) ->
|
||||
NewChildState = ChildState#tcp{socket = Socket},
|
||||
?LOG(info, "connection_ready ~p~n", [Socket]),
|
||||
case Mod:handle_info(connection_ready, NewChildState) of
|
||||
handle_info({connection_ready, Socket}, #dclient{userdata = #connect_state{mod = Mod}, child = ChildState} = State) ->
|
||||
case Mod:handle_info(connection_ready, ChildState) of
|
||||
{noreply, NewChildState1} ->
|
||||
inet:setopts(Socket, [{active, once}]),
|
||||
{noreply, State#state{child = NewChildState1}, hibernate};
|
||||
{noreply, State#dclient{child = NewChildState1}, hibernate};
|
||||
{stop, Reason, NewChildState1} ->
|
||||
{stop, Reason, State#state{child = NewChildState1}}
|
||||
{stop, Reason, State#dclient{child = NewChildState1}}
|
||||
end;
|
||||
|
||||
handle_info({tcp, Socket, Binary}, State) ->
|
||||
%% ?LOG(info,"Binary ~p~n", [Binary]),
|
||||
#state{mod = Mod, child = #tcp{socket = Socket} = ChildState} = State,
|
||||
%% 往tcp server 发送报文
|
||||
handle_info({send, _PayLoad}, #dclient{userdata = #connect_state{socket = undefined}} = State) ->
|
||||
{noreply, State, hibernate};
|
||||
handle_info({send, PayLoad}, #dclient{userdata = #connect_state{socket = Socket}} = State) ->
|
||||
gen_tcp:send(Socket, PayLoad),
|
||||
{noreply, State, hibernate};
|
||||
|
||||
%% 接收tcp server发送过来的报文
|
||||
handle_info({tcp, Socket, Binary}, #dclient{channel = ChannelId, client = ClientId,
|
||||
userdata = #connect_state{mod = Mod, socket = Socket}, child = ChildState} = State) ->
|
||||
NewBin =
|
||||
case binary:referenced_byte_size(Binary) of
|
||||
Large when Large > 2 * byte_size(Binary) ->
|
||||
@ -112,108 +118,75 @@ handle_info({tcp, Socket, Binary}, State) ->
|
||||
end,
|
||||
case Mod:handle_info({tcp, NewBin}, ChildState) of
|
||||
{noreply, NewChildState} ->
|
||||
inet:setopts(ChildState#tcp.socket, [{active, once}]),
|
||||
{noreply, State#state{child = NewChildState}, hibernate};
|
||||
inet:setopts(Socket, [{active, once}]),
|
||||
{noreply, State#dclient{child = NewChildState}, hibernate};
|
||||
{stop, Reason, NewChildState} ->
|
||||
{stop, Reason, State#state{child = NewChildState}}
|
||||
dgiot_client:stop(ChannelId, ClientId),
|
||||
{noreply, Reason, State#dclient{child = NewChildState}, hibernate}
|
||||
end;
|
||||
|
||||
handle_info({tcp_error, _Socket, _Reason}, #state{child = _ChildState} = State) ->
|
||||
handle_info({tcp_error, _Socket, _Reason}, State) ->
|
||||
{noreply, State, hibernate};
|
||||
|
||||
handle_info({Closed, _Sock}, #state{mod = Mod, child = #tcp{transport = Transport, socket = Socket} = ChildState} = State) when Closed == tcp_closed ->
|
||||
Transport:close(Socket),
|
||||
case Mod:handle_info(Closed, ChildState) of
|
||||
handle_info({tcp_closed, _Sock}, #dclient{channel = ChannelId, client = ClientId,
|
||||
userdata = #connect_state{mod = Mod, socket = Socket}, child = ChildState} = State) ->
|
||||
gen_tcp:close(Socket),
|
||||
case Mod:handle_info(tcp_closed, ChildState) of
|
||||
{noreply, NewChildState} ->
|
||||
NewState = State#state{child = NewChildState#tcp{socket = undefined}},
|
||||
case is_integer(NewState#state.reconnect_sleep) of
|
||||
false ->
|
||||
{stop, normal, NewState};
|
||||
true ->
|
||||
Now = erlang:system_time(second),
|
||||
Sleep =
|
||||
case get(last_closed) of
|
||||
Time when is_integer(Time) andalso Now - Time < State#state.reconnect_sleep ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end,
|
||||
put(last_closed, Now),
|
||||
{noreply, do_connect(Sleep, NewState), hibernate}
|
||||
end;
|
||||
{stop, Reason, NewChildState} ->
|
||||
{stop, Reason, State#state{child = NewChildState#tcp{socket = undefined}}}
|
||||
NewState = State#dclient{child = NewChildState},
|
||||
self() ! do_connect,
|
||||
{noreply, NewState, hibernate};
|
||||
{stop, _Reason, NewChildState} ->
|
||||
dgiot_client:stop(ChannelId, ClientId),
|
||||
{noreply, State#dclient{child = NewChildState}, hibernate}
|
||||
end;
|
||||
|
||||
handle_info(Info, #state{mod = Mod, child = ChildState} = State) ->
|
||||
handle_info(Info, #dclient{channel = ChannelId, client = ClientId, userdata = #connect_state{mod = Mod,socket = Socket}, child = ChildState} = State) ->
|
||||
case Mod:handle_info(Info, ChildState) of
|
||||
{noreply, NewChildState} ->
|
||||
{noreply, State#state{child = NewChildState}, hibernate};
|
||||
{stop, Reason, NewChildState} ->
|
||||
{stop, Reason, State#state{child = NewChildState}}
|
||||
{noreply, State#dclient{child = NewChildState}, hibernate};
|
||||
{stop, _Reason, NewChildState} ->
|
||||
gen_tcp:close(Socket),
|
||||
dgiot_client:stop(ChannelId, ClientId),
|
||||
{noreply, State#dclient{child = NewChildState}, hibernate}
|
||||
end.
|
||||
|
||||
terminate(Reason, #state{mod = Mod, child = ChildState}) ->
|
||||
terminate(Reason, #dclient{userdata = #connect_state{mod = Mod}, child = ChildState}) ->
|
||||
Mod:terminate(Reason, ChildState).
|
||||
|
||||
code_change(OldVsn, #state{mod = Mod, child = ChildState} = State, Extra) ->
|
||||
code_change(OldVsn, #dclient{userdata = #connect_state{mod = Mod}, child = ChildState} = State, Extra) ->
|
||||
{ok, NewChildState} = Mod:code_change(OldVsn, ChildState, Extra),
|
||||
{ok, State#state{child = NewChildState}}.
|
||||
{ok, State#dclient{child = NewChildState}}.
|
||||
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
send(#tcp{transport = Transport, socket = Socket, log = _Log}, Payload) ->
|
||||
case Socket == undefined of
|
||||
true ->
|
||||
{error, disconnected};
|
||||
false ->
|
||||
timer:sleep(1),
|
||||
Transport:send(Socket, Payload)
|
||||
send(Payload) ->
|
||||
self() ! {send, Payload}.
|
||||
|
||||
send(ChannelId, ClientId, Payload) ->
|
||||
case dgiot_client:get(ChannelId, ClientId) of
|
||||
{ok, Pid} ->
|
||||
Pid ! {send, Payload};
|
||||
_ ->
|
||||
pass
|
||||
end.
|
||||
|
||||
do_connect(Sleep, #state{child = TCPState} = State) ->
|
||||
Client = self(),
|
||||
NewState = State#state{
|
||||
child = TCPState#tcp{
|
||||
socket = undefined
|
||||
}
|
||||
},
|
||||
spawn(
|
||||
fun() ->
|
||||
Sleep andalso timer:sleep(State#state.reconnect_sleep * 1000),
|
||||
connect(Client, NewState)
|
||||
end),
|
||||
NewState.
|
||||
|
||||
connect(Client, #state{host = Host, port = Port, reconnect_times = Times, reconnect_sleep = Sleep} = State) ->
|
||||
case is_process_alive(Client) of
|
||||
true ->
|
||||
case gen_tcp:connect(Host, Port, ?TCP_OPTIONS, ?TIMEOUT) of
|
||||
{ok, Socket} ->
|
||||
case catch gen_server:call(Client, {connection_ready, Socket}, 5000) of
|
||||
ok ->
|
||||
inet:setopts(Socket, [{active, once}]),
|
||||
gen_tcp:controlling_process(Socket, Client);
|
||||
_ ->
|
||||
ok
|
||||
end;
|
||||
{error, Reason} ->
|
||||
case is_integer(Times) of
|
||||
true when Times - 1 > 0 ->
|
||||
Client ! {connection_error, Reason},
|
||||
timer:sleep(Sleep * 1000),
|
||||
connect(Client, State#state{reconnect_times = Times - 1});
|
||||
false when is_atom(Times) ->
|
||||
Client ! {connection_error, Reason},
|
||||
timer:sleep(Sleep * 1000),
|
||||
connect(Client, State);
|
||||
_ ->
|
||||
Client ! connect_stop
|
||||
end
|
||||
do_connect(#dclient{userdata = #connect_state{host = Host, port = Port}}) ->
|
||||
case gen_tcp:connect(Host, Port, ?TCP_OPTIONS, ?TIMEOUT) of
|
||||
{ok, Socket} ->
|
||||
case catch gen_server:call(self(), {connection_ready, Socket}, 5000) of
|
||||
ok ->
|
||||
inet:setopts(Socket, [{active, once}]),
|
||||
gen_tcp:controlling_process(Socket, self());
|
||||
_ ->
|
||||
ok
|
||||
end;
|
||||
false ->
|
||||
ok
|
||||
end.
|
||||
{error, _Reason} ->
|
||||
self() ! do_connect
|
||||
end;
|
||||
|
||||
do_connect(_) ->
|
||||
self() ! do_connect.
|
||||
|
||||
|
@ -17,7 +17,6 @@
|
||||
-behavior(dgiot_channelx).
|
||||
-include("dgiot_bridge.hrl").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
-include_lib("dgiot/include/dgiot_socket.hrl").
|
||||
-define(TYPE, <<"TCPC">>).
|
||||
-record(state, {id, env}).
|
||||
%% API
|
||||
@ -28,10 +27,6 @@
|
||||
%% Channel callback
|
||||
-export([init/3, handle_init/1, handle_event/3, handle_message/2, stop/3]).
|
||||
|
||||
%% tcp client callback
|
||||
-define(MAX_BUFF_SIZE, 10 * 1024).
|
||||
-export([init/1, handle_info/2, terminate/2]).
|
||||
|
||||
-channel(?TYPE).
|
||||
-channel_type(#{
|
||||
cType => ?TYPE,
|
||||
@ -88,12 +83,12 @@
|
||||
start(ChannelId, ChannelArgs) ->
|
||||
dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs).
|
||||
|
||||
|
||||
%% 通道初始化
|
||||
init(?TYPE, ChannelId,
|
||||
#{<<"ip">> := Ip, <<"port">> := Port} = Args) ->
|
||||
State = #state{id = ChannelId, env = Args},
|
||||
NewArgs = #{<<"ip">> => Ip, <<"port">> => Port, <<"mod">> => ?MODULE, <<"child">> => #{}},
|
||||
dgiot_client:add_clock(ChannelId, dgiot_datetime:now_secs() + 5, dgiot_datetime:now_secs() + 30),
|
||||
NewArgs = #{ <<"channel">> => ChannelId, <<"ip">> => Ip, <<"port">> => Port, <<"mod">> => dgiot_tcpc_worker, <<"count">> => 3, <<"freq">> => 10},
|
||||
{ok, State, dgiot_client:register(ChannelId, tcp_client_sup, NewArgs)}.
|
||||
|
||||
handle_init(State) ->
|
||||
@ -104,104 +99,20 @@ handle_event(_EventId, Event, State) ->
|
||||
?LOG(info, "channel ~p", [Event]),
|
||||
{ok, State}.
|
||||
|
||||
handle_message(start_client, #state{id = ChannelId} = State) ->
|
||||
io:format("~s ~p ChannelId = ~p.~n", [?FILE, ?LINE, ChannelId]),
|
||||
case dgiot_data:get({start_client, ChannelId}) of
|
||||
not_find ->
|
||||
[dgiot_client:start(ChannelId, dgiot_utils:to_binary(I)) || I <- lists:seq(1, 10)];
|
||||
_ ->
|
||||
pass
|
||||
end,
|
||||
|
||||
{ok, State};
|
||||
|
||||
handle_message(_Message, State) ->
|
||||
{ok, State}.
|
||||
|
||||
stop(ChannelType, ChannelId, _State) ->
|
||||
?LOG(warning, "channel stop ~p,~p", [ChannelType, ChannelId]),
|
||||
ok.
|
||||
|
||||
|
||||
%% tcp client callback
|
||||
init(TCPState) ->
|
||||
{ok, TCPState}.
|
||||
|
||||
handle_info(connection_ready, #tcp{state = #{productid := ProductId} = _State} = TCPState) ->
|
||||
rand:seed(exs1024),
|
||||
Time = erlang:round(rand:uniform() * 1 + 1) * 1000,
|
||||
dgiot_tcp_client:send(TCPState, <<"login">>),
|
||||
case do_cmd(ProductId, connection_ready, <<>>, TCPState) of
|
||||
default ->
|
||||
erlang:send_after(Time, self(), login);
|
||||
_ ->
|
||||
pass
|
||||
end,
|
||||
{noreply, TCPState};
|
||||
|
||||
handle_info(#{<<"cmd">> := Cmd, <<"data">> := Data, <<"productId">> := ProductId}, TCPState) ->
|
||||
case do_cmd(ProductId, Cmd, Data, TCPState) of
|
||||
default ->
|
||||
{noreply, TCPState};
|
||||
Result ->
|
||||
Result
|
||||
end;
|
||||
|
||||
handle_info(tcp_closed, #tcp{state = #{productid := ProductId} = _State} = TCPState) ->
|
||||
case do_cmd(ProductId, tcp_closed, <<>>, TCPState) of
|
||||
default ->
|
||||
{noreply, TCPState};
|
||||
Result ->
|
||||
Result
|
||||
end;
|
||||
|
||||
handle_info({tcp, Buff}, #tcp{buff = Old, state = #{productid := ProductId} = _State} = TCPState) ->
|
||||
Data = <<Old/binary, Buff/binary>>,
|
||||
case do_cmd(ProductId, tcp, Data, TCPState) of
|
||||
default ->
|
||||
{noreply, TCPState};
|
||||
{noreply, Bin, State} ->
|
||||
{noreply, TCPState#tcp{buff = Bin, state = State}};
|
||||
{stop, Reason, State} ->
|
||||
{stop, Reason, TCPState#tcp{state = State}};
|
||||
Result ->
|
||||
Result
|
||||
end;
|
||||
|
||||
handle_info({deliver, _Topic, Msg}, #tcp{state = State} = TCPState) ->
|
||||
Payload = dgiot_mqtt:get_payload(Msg),
|
||||
?LOG(info, "Client recv from mqtt Payload ~p ~n ~p~n", [Payload, State]),
|
||||
%% Message =
|
||||
%% case jsx:is_json(Payload) of
|
||||
%% true ->
|
||||
%% jsx:decode(Payload, [{labels, binary}, return_maps]);
|
||||
%% false ->
|
||||
%% binary_to_term(Payload)
|
||||
%% end,
|
||||
{noreply, TCPState};
|
||||
|
||||
|
||||
handle_info(login, #tcp{state = #{productid := ProductId, devaddr := DevAddr, hb := Hb} = _State} = TCPState) ->
|
||||
Topic = <<"mock/", ProductId/binary, "/", DevAddr/binary>>,
|
||||
dgiot_mqtt:subscribe(Topic),
|
||||
erlang:send_after(Hb * 1000, self(), heartbeat),
|
||||
?LOG(info, "~p ", [<<"login">>]),
|
||||
dgiot_tcp_client:send(TCPState, <<"login">>),
|
||||
{noreply, TCPState};
|
||||
|
||||
handle_info(heartbeat, #tcp{state = #{devaddr := _DevAddr, hb := Hb} = _State} = TCPState) ->
|
||||
erlang:send_after(Hb * 1000, self(), heartbeat),
|
||||
%% ?LOG(info,"~p ",[<<"heartbeat">>]),
|
||||
dgiot_tcp_client:send(TCPState, <<"heartbeat">>),
|
||||
{noreply, TCPState};
|
||||
|
||||
handle_info(_Info, TCPState) ->
|
||||
{noreply, TCPState}.
|
||||
|
||||
terminate(_Reason, _TCPState) ->
|
||||
ok.
|
||||
|
||||
do_cmd(ProductId, Cmd, Data, #tcp{state = #{id := ChannelId} = State} = TCPState) ->
|
||||
case dgiot_hook:run_hook({tcp, ProductId}, [Cmd, Data, State]) of
|
||||
{ok, NewState} ->
|
||||
{noreply, TCPState#tcp{state = NewState}};
|
||||
{reply, ProductId, Payload, NewState} ->
|
||||
case dgiot_tcp_server:send(TCPState#tcp{state = NewState}, Payload) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
dgiot_bridge:send_log(ChannelId, ProductId, "Send Fail, ~p, CMD:~p", [Cmd, Reason])
|
||||
end,
|
||||
{noreply, TCPState#tcp{state = NewState}};
|
||||
_ ->
|
||||
default
|
||||
end.
|
||||
ok.
|
104
apps/dgiot_bridge/src/channel/dgiot_tcpc_worker.erl
Normal file
104
apps/dgiot_bridge/src/channel/dgiot_tcpc_worker.erl
Normal file
@ -0,0 +1,104 @@
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(dgiot_tcpc_worker).
|
||||
-include("dgiot_bridge.hrl").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
-include_lib("dgiot/include/dgiot_socket.hrl").
|
||||
-record(child_state, {buff = <<>>, product, devaddr, hb = 30 }).
|
||||
|
||||
|
||||
%% tcp client callback
|
||||
-define(MAX_BUFF_SIZE, 10 * 1024).
|
||||
-export([init/1, handle_info/2, terminate/2]).
|
||||
|
||||
%% tcp client callback
|
||||
init(_Dclient) ->
|
||||
{ok, #child_state{}}.
|
||||
|
||||
handle_info(connection_ready, #child_state{product = ProductId} = ChildState) ->
|
||||
rand:seed(exs1024),
|
||||
Time = erlang:round(rand:uniform() * 1 + 1) * 1000,
|
||||
case do_cmd(ProductId, connection_ready, <<>>, ChildState) of
|
||||
default ->
|
||||
erlang:send_after(Time, self(), login);
|
||||
_ ->
|
||||
pass
|
||||
end,
|
||||
{noreply, ChildState};
|
||||
|
||||
handle_info(#{<<"cmd">> := Cmd, <<"data">> := Data, <<"productId">> := ProductId}, ChildState) ->
|
||||
case do_cmd(ProductId, Cmd, Data, ChildState) of
|
||||
default ->
|
||||
{noreply, ChildState};
|
||||
Result ->
|
||||
Result
|
||||
end;
|
||||
|
||||
handle_info(tcp_closed, #child_state{product = ProductId} = ChildState) ->
|
||||
case do_cmd(ProductId, tcp_closed, <<>>, ChildState) of
|
||||
default ->
|
||||
{noreply, ChildState};
|
||||
Result ->
|
||||
Result
|
||||
end;
|
||||
|
||||
handle_info({tcp, Buff}, #child_state{buff = Old, product = ProductId} = ChildState) ->
|
||||
Data = <<Old/binary, Buff/binary>>,
|
||||
case do_cmd(ProductId, tcp, Data, ChildState) of
|
||||
default ->
|
||||
{noreply, ChildState};
|
||||
{noreply, Bin, NewChildState} ->
|
||||
{noreply, NewChildState#child_state{buff = Bin}};
|
||||
{stop, Reason, NewChildState} ->
|
||||
{stop, Reason, NewChildState};
|
||||
Result ->
|
||||
Result
|
||||
end;
|
||||
|
||||
handle_info({deliver, _Topic, _Msg}, ChildState) ->
|
||||
{noreply, ChildState};
|
||||
|
||||
handle_info(login, #child_state{hb = Hb} = ChildState) ->
|
||||
erlang:send_after(Hb * 1000, self(), heartbeat),
|
||||
dgiot_tcp_client:send(<<"login">>),
|
||||
{noreply, ChildState};
|
||||
|
||||
handle_info(heartbeat, #child_state{hb = Hb} = TCPState) ->
|
||||
erlang:send_after(Hb * 1000, self(), heartbeat),
|
||||
dgiot_tcp_client:send(<<"heartbeat">>),
|
||||
{noreply, TCPState};
|
||||
|
||||
handle_info(_Info, TCPState) ->
|
||||
{noreply, TCPState}.
|
||||
|
||||
terminate(_Reason, _TCPState) ->
|
||||
ok.
|
||||
|
||||
do_cmd(ProductId, Cmd, Data, ChildState) ->
|
||||
case dgiot_hook:run_hook({tcp, ProductId}, [Cmd, Data, ChildState]) of
|
||||
{ok, NewChildState} ->
|
||||
{noreply, NewChildState};
|
||||
{reply, ProductId, Payload, NewChildState} ->
|
||||
case dgiot_tcp_client:send(Payload) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, _Reason} ->
|
||||
pass
|
||||
end,
|
||||
{noreply, NewChildState};
|
||||
_ ->
|
||||
default
|
||||
end.
|
@ -139,57 +139,6 @@ stop(ChannelType, ChannelId, _State) ->
|
||||
io:format("~s ~p channel stop ~p~n ~p~n", [?FILE, ?LINE, ChannelType, ChannelId]),
|
||||
ok.
|
||||
|
||||
%%start_client(ProductId, Ip, Port, #{<<"parse">> := Parse}) ->
|
||||
%% #{
|
||||
%% <<"parse_table">> := ParseTable,
|
||||
%% <<"devaddr">> := DevAddrKey,
|
||||
%% <<"page_index">> := PageIndex,
|
||||
%% <<"page_size">> := PageSize,
|
||||
%% <<"total">> := Total
|
||||
%% } = Parse,
|
||||
%% Success = fun(Page) ->
|
||||
%% lists:map(fun(#{<<"devaddr">> := DevAddr}) ->
|
||||
%% dgiot_modbusc_tcp:start_connect(#{
|
||||
%% <<"auto_reconnect">> => 10,
|
||||
%% <<"reconnect_times">> => 3,
|
||||
%% <<"ip">> => Ip,
|
||||
%% <<"port">> => Port,
|
||||
%% <<"productid">> => ProductId,
|
||||
%% <<"hb">> => 60,
|
||||
%% <<"devaddr">> => DevAddr
|
||||
%% })
|
||||
%% end, Page)
|
||||
%% end,
|
||||
%% Query = #{<<"keys">> => [DevAddrKey], <<"order">> => DevAddrKey},
|
||||
%% dgiot_parse_loader:start(ParseTable, Query, PageIndex, PageSize, Total, Success).
|
||||
%%
|
||||
%%
|
||||
%%start_timer(Time, Fun) ->
|
||||
%% spawn(fun() ->
|
||||
%% timer(Time, Fun)
|
||||
%% end).
|
||||
%%
|
||||
%%timer(Time, Fun) ->
|
||||
%% receive
|
||||
%% cancel ->
|
||||
%% void
|
||||
%% after Time ->
|
||||
%% Fun()
|
||||
%% end.
|
||||
|
||||
|
||||
%%get_app(Products) ->
|
||||
%% lists:map(fun({ProdcutId, #{<<"ACL">> := Acl}}) ->
|
||||
%% Predicate = fun(E) ->
|
||||
%% case E of
|
||||
%% <<"role:", _/binary>> -> true;
|
||||
%% _ -> false
|
||||
%% end
|
||||
%% end,
|
||||
%% [<<"role:", App/binary>> | _] = lists:filter(Predicate, maps:keys(Acl)),
|
||||
%% {ProdcutId, App}
|
||||
%% end, Products).
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -47,8 +47,10 @@ init([#{<<"channel">> := ChannelId, <<"client">> := ClientId, <<"starttime">> :=
|
||||
erlang:send_after(20, self(), init),
|
||||
dgiot_metrics:inc(dgiot_task, <<"task">>, 1),
|
||||
NextTime = dgiot_client:get_nexttime(StartTime, Freq),
|
||||
io:format("~s ~p DtuId = ~p.~n", [?FILE, ?LINE, NextTime]),
|
||||
Dclient = #dclient{channel = ChannelId, client = ClientId, status = ?DCLIENT_INTIALIZED, clock = #dclock{nexttime = NextTime, endtime = EndTime, freq = Freq, round = 0}},
|
||||
Count = dgiot_client:get_count(StartTime, EndTime, Freq),
|
||||
io:format("~s ~p ChannelId ~p ClientId ~p NextTime = ~p Freq ~p Count = ~p.~n", [?FILE, ?LINE, ChannelId, ClientId, NextTime, Freq, Count]),
|
||||
Dclient = #dclient{channel = ChannelId, client = ClientId, status = ?DCLIENT_INTIALIZED,
|
||||
clock = #dclock{nexttime = NextTime, freq = Freq, count = Count, round = 0}},
|
||||
{ok, Dclient};
|
||||
|
||||
init(A) ->
|
||||
@ -74,24 +76,24 @@ handle_info(stop, State) ->
|
||||
|
||||
%% 动态修改任务启动时间和周期
|
||||
handle_info({change_clock, NextTime, EndTime, Freq}, #dclient{clock = Clock} = Dclient) ->
|
||||
{noreply, Dclient#dclient{clock = Clock#dclock{nexttime = NextTime, endtime = EndTime, freq = Freq}}};
|
||||
{noreply, Dclient#dclient{clock = Clock#dclock{nexttime = NextTime, count = dgiot_client:get_count(NextTime, EndTime, Freq), freq = Freq}}};
|
||||
|
||||
%% 定时触发网关及网关任务, 在单个任务轮次中,要将任务在全局上做一下错峰操作
|
||||
handle_info(next_time, #dclient{ channel = Channel, client = Client, userdata = UserData,
|
||||
clock = #dclock{round = Round, nexttime = NextTime, endtime = EndTime, freq = Freq, rand = Rand} = Clock} = Dclient) ->
|
||||
clock = #dclock{round = Round, nexttime = NextTime, count = Count, freq = Freq, rand = Rand} = Clock} = Dclient) ->
|
||||
io:format("~s ~p DtuId = ~p.~n", [?FILE, ?LINE, Client]),
|
||||
dgiot_client:stop(Channel, Client, EndTime), %% 检查是否需要停止任务
|
||||
dgiot_client:stop(Channel, Client, Count), %% 检查是否需要停止任务
|
||||
NewNextTime = dgiot_client:get_nexttime(NextTime, Freq),
|
||||
case dgiot_task:get_pnque(Client) of
|
||||
not_find ->
|
||||
{noreply, Dclient#dclient{clock = Clock#dclock{ nexttime = NewNextTime}}};
|
||||
{noreply, Dclient#dclient{clock = Clock#dclock{ nexttime = NewNextTime, count = Count - 1}}};
|
||||
{ProductId, DevAddr} ->
|
||||
NewRound = Round + 1,
|
||||
PnQueLen = dgiot_task:get_pnque_len(Client),
|
||||
DiQue = dgiot_task:get_instruct(ProductId, NewRound),
|
||||
dgiot_client:send_after(10, Freq, Rand, read), % 每轮任务开始时,做一下随机开始
|
||||
{noreply, Dclient#dclient{userdata = UserData#device_task{product = ProductId, devaddr = DevAddr, pnque_len = PnQueLen, dique = DiQue},
|
||||
clock = Clock#dclock{nexttime = NewNextTime, round = NewRound}}}
|
||||
clock = Clock#dclock{nexttime = NewNextTime, count = Count - 1, round = NewRound}}}
|
||||
end;
|
||||
|
||||
%% 开始采集下一个子设备的指令集
|
||||
|
@ -66,7 +66,7 @@ do_request(Url, Authorization, Sql) ->
|
||||
Index = dgiot_utils:update_counter({tdengine, index}),
|
||||
Profile = list_to_atom(lists:concat([http, Index])),
|
||||
timer:sleep(1),
|
||||
case catch httpc:request(post, Request, [{timeout, 60000}, {connect_timeout, 60000}], [{body_format, binary}], Profile) of
|
||||
case catch httpc:request(post, Request, ?HTTPOption, ?REQUESTOption, Profile) of
|
||||
{ok, {{_HTTPVersion, StatusCode, _ReasonPhrase}, _Headers, Body}} ->
|
||||
format_body(StatusCode, Body, Formatter);
|
||||
{error, {failed_connect, _}} ->
|
||||
|
Loading…
Reference in New Issue
Block a user