mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-12-02 04:08:54 +08:00
fix: rm dgiot_httpd_channel
This commit is contained in:
parent
ba29b9e62d
commit
c6b4b4912f
@ -171,18 +171,24 @@ start_link([Mod, State]) ->
|
||||
%%%===================================================================
|
||||
|
||||
init([Mod, ChildState]) ->
|
||||
case erlang:function_exported(Mod, handle_init, 1) of
|
||||
case erlang:module_loaded(Mod) of
|
||||
true ->
|
||||
case Mod:handle_init(ChildState) of
|
||||
{ok, NewChildState} ->
|
||||
{ok, #state{childState = NewChildState, mod = Mod}};
|
||||
{stop, Reason} ->
|
||||
{stop, Reason}
|
||||
case erlang:function_exported(Mod, handle_init, 1) of
|
||||
true ->
|
||||
case Mod:handle_init(ChildState) of
|
||||
{ok, NewChildState} ->
|
||||
{ok, #state{childState = NewChildState, mod = Mod}};
|
||||
{stop, Reason} ->
|
||||
{stop, Reason}
|
||||
end;
|
||||
false ->
|
||||
{ok, #state{childState = ChildState, mod = Mod}}
|
||||
end;
|
||||
false ->
|
||||
{ok, #state{childState = ChildState, mod = Mod}}
|
||||
{stop, <<"module not exist">>}
|
||||
end.
|
||||
|
||||
|
||||
handle_call(Request, _From, State) ->
|
||||
case catch handle_message(Request, State) of
|
||||
ok ->
|
||||
|
@ -67,7 +67,9 @@ start_plugin(Sup) ->
|
||||
lists:map(fun(X) ->
|
||||
{_Order, Mod} = X,
|
||||
case erlang:function_exported(Mod, start, 1) of
|
||||
true -> Mod:start(Sup);
|
||||
false -> pass
|
||||
true ->
|
||||
Mod:start(Sup);
|
||||
false ->
|
||||
pass
|
||||
end
|
||||
end, lists:ukeysort(1, NewAcc)).
|
||||
|
@ -53,11 +53,12 @@ start_link(Name, Mod, Host, Port, ReconnectTimes, ReconnectSleep, Args) ->
|
||||
undefined ->
|
||||
gen_server:start_link(?MODULE, [State, Args], []);
|
||||
_ ->
|
||||
gen_server:start_link({local, Name}, ?MODULE, [State, Args], [])
|
||||
gen_server:start_link({local, list_to_atom(dgiot_utils:to_list(Name))}, ?MODULE, [State, Args], [])
|
||||
end.
|
||||
|
||||
|
||||
init([#state{mod = Mod} = State, Args]) ->
|
||||
?LOG(info,"Args ~p ",[Args]),
|
||||
Transport = gen_tcp,
|
||||
Child = #tcp{transport = Transport, socket = undefined},
|
||||
case Mod:init(Child#tcp{state = Args}) of
|
||||
@ -206,10 +207,9 @@ do_connect(Sleep, #state{child = TCPState} = State) ->
|
||||
connect(Client, #state{host = Host, port = Port, reconnect_times = Times, reconnect_sleep = Sleep} = State) ->
|
||||
case is_process_alive(Client) of
|
||||
true ->
|
||||
?LOG(info,"CONNECT ~s: ~p ~p", [Host, Port, Times]),
|
||||
case gen_tcp:connect(Host, Port, ?TCP_OPTIONS, ?TIMEOUT) of
|
||||
{ok, Socket} ->
|
||||
case catch gen_server:call(Client, {connection_ready, Socket}, 50000) of
|
||||
case catch gen_server:call(Client, {connection_ready, Socket}, 5000) of
|
||||
ok ->
|
||||
inet:setopts(Socket, [{active, once}]),
|
||||
gen_tcp:controlling_process(Socket, Client);
|
||||
|
@ -398,13 +398,18 @@ do_response(Status, Headers, Body, Req0, State) when is_binary(Body) ->
|
||||
{stop, Req, State}.
|
||||
|
||||
call(Mod, Fun, Args) ->
|
||||
case erlang:function_exported(Mod, Fun, length(Args)) of
|
||||
case erlang:module_loaded(Mod) of
|
||||
true ->
|
||||
{Time, Result} = timer:tc(Mod, Fun, Args),
|
||||
get_log(Fun, Args, Time, Result),
|
||||
Result;
|
||||
case erlang:function_exported(Mod, Fun, length(Args)) of
|
||||
true ->
|
||||
{Time, Result} = timer:tc(Mod, Fun, Args),
|
||||
get_log(Fun, Args, Time, Result),
|
||||
Result;
|
||||
false ->
|
||||
no_call
|
||||
end;
|
||||
false ->
|
||||
no_call
|
||||
no_module
|
||||
end.
|
||||
|
||||
init_ets() ->
|
||||
|
@ -85,7 +85,6 @@
|
||||
start(ChannelId, ChannelArgs) ->
|
||||
dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs).
|
||||
|
||||
|
||||
%% 通道初始化
|
||||
init(?TYPE, ChannelId, #{<<"port">> := Port} = ChannelArgs) ->
|
||||
State = #state{
|
||||
@ -133,14 +132,12 @@ stop(ChannelType, ChannelId, _State) ->
|
||||
?LOG(info,"channel stop ~p,~p", [ChannelType, ChannelId]),
|
||||
ok.
|
||||
|
||||
|
||||
%% ====== http callback ======
|
||||
|
||||
init(Req, #state{ id = ChannelId, env = Env} = State) ->
|
||||
{ok, _Type, ProductIds} = dgiot_bridge:get_products(ChannelId),
|
||||
case dgiot_bridge:apply_channel(ChannelId, ProductIds, handle_info, [{http, Req}], Env) of
|
||||
{ok, NewEnv} ->
|
||||
Req1 = cowboy_req:reply(200, #{}, <<>>, Req),
|
||||
Req1 = cowboy_req:reply(200, #{}, <<"code not exist">>, Req),
|
||||
{ok, Req1, State#state{env = NewEnv}};
|
||||
{reply, _ProductId, {HTTPCode, Reply}, NewEnv} ->
|
||||
Req1 = cowboy_req:reply(HTTPCode, #{}, Reply, Req),
|
||||
|
@ -1,157 +0,0 @@
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(dgiot_httpd_channel).
|
||||
-behavior(dgiot_channelx).
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
-define(TYPE, <<"HTTPD">>).
|
||||
-include("dgiot_bridge.hrl").
|
||||
-author("kenneth").
|
||||
-record(state, {id, env}).
|
||||
%% API
|
||||
-export([start/2]).
|
||||
-export([init/3, handle_event/3, handle_message/2, stop/3]).
|
||||
|
||||
-export([init/2]).
|
||||
|
||||
|
||||
%% 注册通道类型
|
||||
-channel_type(#{
|
||||
cType => ?TYPE,
|
||||
type => 1,
|
||||
title => #{
|
||||
zh => <<"HTTPD采集通道"/utf8>>
|
||||
},
|
||||
description => #{
|
||||
zh => <<"HTTPD采集通道"/utf8>>
|
||||
}
|
||||
}).
|
||||
%% 注册通道参数
|
||||
-params(#{
|
||||
<<"port">> => #{
|
||||
order => 1,
|
||||
type => integer,
|
||||
required => true,
|
||||
default => 3080,
|
||||
title => #{
|
||||
zh => <<"端口"/utf8>>
|
||||
},
|
||||
description => #{
|
||||
zh => <<"侦听端口"/utf8>>
|
||||
}
|
||||
},
|
||||
<<"path">> => #{
|
||||
order => 2,
|
||||
type => string,
|
||||
required => true,
|
||||
default => <<"/test">>,
|
||||
title => #{
|
||||
zh => <<"路径"/utf8>>
|
||||
},
|
||||
description => #{
|
||||
zh => <<"路径"/utf8>>
|
||||
}
|
||||
},
|
||||
<<"ico">> => #{
|
||||
order => 102,
|
||||
type => string,
|
||||
required => false,
|
||||
default => <<"http://dgiot-1253666439.cos.ap-shanghai-fsi.myqcloud.com/shuwa_tech/zh/product/dgiot/channel/HTTPICO.png">>,
|
||||
title => #{
|
||||
en => <<"channel ICO">>,
|
||||
zh => <<"通道ICO"/utf8>>
|
||||
},
|
||||
description => #{
|
||||
en => <<"channel ICO">>,
|
||||
zh => <<"通道ICO"/utf8>>
|
||||
}
|
||||
}
|
||||
}).
|
||||
|
||||
start(ChannelId, ChannelArgs) ->
|
||||
dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs).
|
||||
|
||||
|
||||
%% 通道初始化
|
||||
init(?TYPE, ChannelId, #{<<"port">> := Port} = ChannelArgs) ->
|
||||
State = #state{
|
||||
id = ChannelId,
|
||||
env = maps:without([<<"port">>,<<"path">>,<<"product">>,<<"behaviour">>], ChannelArgs)
|
||||
},
|
||||
Name = dgiot_channelx:get_name(?TYPE, ChannelId),
|
||||
Opts = [
|
||||
{ip, {0, 0, 0, 0}},
|
||||
{port, Port}
|
||||
],
|
||||
SSL = maps:with([<<"cacertfile">>, <<"certfile">>, <<"keyfile">>], ChannelArgs),
|
||||
{Transport, TransportOpts} =
|
||||
case maps:to_list(SSL) of
|
||||
[] ->
|
||||
{ranch_tcp, Opts};
|
||||
SslOpts = [_ | _] ->
|
||||
{ranch_ssl, Opts ++ SslOpts}
|
||||
end,
|
||||
Route = get_route(maps:get(<<"path">>, ChannelArgs, <<>>)),
|
||||
Dispatch = cowboy_router:compile([
|
||||
{'_', [
|
||||
{Route, ?MODULE, State}
|
||||
]}
|
||||
]),
|
||||
CowboyOpts = #{
|
||||
env => #{
|
||||
dispatch => Dispatch
|
||||
}
|
||||
},
|
||||
ChildSpec = ranch:child_spec(Name, 300, Transport, TransportOpts, cowboy_clear, CowboyOpts),
|
||||
{ok, State, ChildSpec}.
|
||||
|
||||
|
||||
%% 通道消息处理,注意:进程池调用
|
||||
handle_event(EventId, Event, _State) ->
|
||||
?LOG(info, "channel ~p, ~p", [EventId, Event]),
|
||||
ok.
|
||||
|
||||
handle_message(Message, State) ->
|
||||
?LOG(info, "channel ~p ~p", [Message]),
|
||||
{ok, State}.
|
||||
|
||||
stop(ChannelType, ChannelId, _) ->
|
||||
?LOG(info, "channel stop ~p,~p", [ChannelType, ChannelId]),
|
||||
ok.
|
||||
|
||||
|
||||
%% ====== http callback ======
|
||||
|
||||
init(Req, #state{id = ChannelId, env = Env} = State) ->
|
||||
{ok, _Type, ProductIds} = dgiot_bridge:get_products(ChannelId),
|
||||
case dgiot_bridge:apply_channel(ChannelId, ProductIds, handle_info, [{http, Req}], Env) of
|
||||
{ok, NewEnv} ->
|
||||
Req1 = cowboy_req:reply(200, #{}, <<>>, Req),
|
||||
{ok, Req1, State#state{env = NewEnv}};
|
||||
{reply, _ProductId, {HTTPCode, Reply}, NewEnv} ->
|
||||
Req1 = cowboy_req:reply(HTTPCode, #{}, Reply, Req),
|
||||
{ok, Req1, State#state{env = NewEnv}};
|
||||
{reply, _ProductId, {HTTPCode, Header, Reply}, NewEnv} ->
|
||||
Req1 = cowboy_req:reply(HTTPCode, Header, Reply, Req),
|
||||
{ok, Req1, State#state{env = NewEnv}}
|
||||
end.
|
||||
|
||||
get_route(<<"http://", Path>>) ->
|
||||
get_route(Path);
|
||||
get_route(Path) when is_binary(Path) ->
|
||||
binary_to_list(Path);
|
||||
get_route(_) ->
|
||||
"/[...]".
|
@ -26,7 +26,7 @@
|
||||
-record(state, {
|
||||
productid,
|
||||
devaddr,
|
||||
hb = 60
|
||||
hb = 10
|
||||
}).
|
||||
|
||||
start_connect(#{
|
||||
@ -43,7 +43,7 @@ start_connect(#{
|
||||
devaddr = DevAddr,
|
||||
hb = HB
|
||||
},
|
||||
dgiot_tcp_client:start_link(?MODULE, Ip, Port, Recon, ReTimes, State).
|
||||
dgiot_tcp_client:start_link(DevAddr, ?MODULE, Ip, Port, Recon, ReTimes, State).
|
||||
|
||||
init(TCPState) ->
|
||||
{ok, TCPState}.
|
||||
@ -51,6 +51,8 @@ init(TCPState) ->
|
||||
handle_info(connection_ready, TCPState) ->
|
||||
rand:seed(exs1024),
|
||||
Time = erlang:round(rand:uniform() * 1 + 1) * 1000,
|
||||
?LOG(info,"Time ~p ",[Time]),
|
||||
dgiot_tcp_client:send(TCPState, <<"login">>),
|
||||
erlang:send_after(Time, self(), login),
|
||||
{noreply, TCPState};
|
||||
|
||||
@ -61,11 +63,13 @@ handle_info(login, #tcp{state = #state{productid = ProductId, devaddr = DevAddr,
|
||||
Topic = <<"mock/", ProductId/binary, "/", DevAddr/binary>>,
|
||||
dgiot_mqtt:subscribe(Topic),
|
||||
erlang:send_after(Hb * 1000, self(), heartbeat),
|
||||
?LOG(info,"~p ",[<<"login">>]),
|
||||
dgiot_tcp_client:send(TCPState, <<"login">>),
|
||||
{noreply, TCPState};
|
||||
|
||||
handle_info(heartbeat, #tcp{state = #state{devaddr = _DevAddr, hb = Hb} = _State} = TCPState) ->
|
||||
erlang:send_after(Hb * 1000, self(), heartbeat),
|
||||
?LOG(info,"~p ",[<<"heartbeat">>]),
|
||||
dgiot_tcp_client:send(TCPState, <<"heartbeat">>),
|
||||
{noreply, TCPState};
|
||||
|
||||
|
@ -105,19 +105,23 @@ apply_channel(ChannelId, ProductId, Fun, Args, Env) ->
|
||||
|
||||
apply_product(ProductId, Fun, Args) ->
|
||||
Mod = binary_to_atom(ProductId, utf8),
|
||||
case erlang:function_exported(Mod, Fun, length(Args)) of
|
||||
case erlang:module_loaded(Mod) of
|
||||
true ->
|
||||
case catch apply(Mod, Fun, Args) of
|
||||
{'EXIT', Reason} ->
|
||||
{error, Reason};
|
||||
Result ->
|
||||
Result
|
||||
case erlang:function_exported(Mod, Fun, length(Args)) of
|
||||
true ->
|
||||
case catch apply(Mod, Fun, Args) of
|
||||
{'EXIT', Reason} ->
|
||||
{error, Reason};
|
||||
Result ->
|
||||
Result
|
||||
end;
|
||||
false ->
|
||||
{error, function_not_exported}
|
||||
end;
|
||||
false ->
|
||||
{error, function_not_exported}
|
||||
end.
|
||||
|
||||
|
||||
get_product_info(ProductId) ->
|
||||
dgiot_product:local(ProductId).
|
||||
|
||||
|
@ -99,18 +99,23 @@ do_channel(_Type, CType, ChannelId, Products, Cfg) ->
|
||||
{error, not_find} ->
|
||||
{error, {CType, unknow}};
|
||||
{ok, Mod} ->
|
||||
case erlang:function_exported(Mod, start, 2) of
|
||||
case erlang:module_loaded(Mod) of
|
||||
true ->
|
||||
ProductIds = do_product(ChannelId, Products),
|
||||
dgiot_data:insert(?DGIOT_BRIDGE, {ChannelId, productIds}, {CType, ProductIds}),
|
||||
case Mod:start(ChannelId, Cfg#{<<"product">> => Products}) of
|
||||
{ok, _} ->
|
||||
dgiot_mqtt:subscribe(<<"channel/", ChannelId/binary, "/#">>),
|
||||
dgiot_bridge:send_log(ChannelId, "Channel ~s is Install Protocol ~s", [ChannelId, jsx:encode(ProductIds)]),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
dgiot_data:delete(?DGIOT_BRIDGE, {ChannelId, productIds}),
|
||||
{error, Reason}
|
||||
case erlang:function_exported(Mod, start, 2) of
|
||||
true ->
|
||||
ProductIds = do_product(ChannelId, Products),
|
||||
dgiot_data:insert(?DGIOT_BRIDGE, {ChannelId, productIds}, {CType, ProductIds}),
|
||||
case Mod:start(ChannelId, Cfg#{<<"product">> => Products}) of
|
||||
{ok, _} ->
|
||||
dgiot_mqtt:subscribe(<<"channel/", ChannelId/binary, "/#">>),
|
||||
dgiot_bridge:send_log(ChannelId, "Channel ~s is Install Protocol ~s", [ChannelId, jsx:encode(ProductIds)]),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
dgiot_data:delete(?DGIOT_BRIDGE, {ChannelId, productIds}),
|
||||
{error, Reason}
|
||||
end;
|
||||
false ->
|
||||
{error, {Mod, start_error}}
|
||||
end;
|
||||
false ->
|
||||
{error, {Mod, start_error}}
|
||||
|
Loading…
Reference in New Issue
Block a user