fix: add dgiot http and tcp worker

This commit is contained in:
U-JOHNLIU\jonhl 2022-03-21 14:06:03 +08:00
parent d347987ee0
commit 96b88dcbde
6 changed files with 463 additions and 348 deletions

View File

@ -26,9 +26,6 @@
-export([start/2]).
-export([init/3, handle_event/3, handle_message/2, stop/3]).
-export([init/2]).
%%
-channel_type(#{
cType => ?TYPE,
@ -86,37 +83,12 @@ start(ChannelId, ChannelArgs) ->
dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs).
%%
init(?TYPE, ChannelId, #{<<"port">> := Port} = ChannelArgs) ->
init(?TYPE, ChannelId, ChannelArgs) ->
State = #state{
id = ChannelId,
env = maps:without([<<"port">>,<<"path">>,<<"product">>,<<"behaviour">>], ChannelArgs)
},
Name = dgiot_channelx:get_name(?TYPE, ChannelId),
Opts = [
{ip, {0, 0, 0, 0}},
{port, Port}
],
SSL = maps:with([<<"cacertfile">>, <<"certfile">>, <<"keyfile">>], ChannelArgs),
{Transport, TransportOpts} =
case maps:to_list(SSL) of
[] ->
{ranch_tcp, Opts};
SslOpts = [_ | _] ->
{ranch_ssl, Opts ++ SslOpts}
end,
Route = get_route(maps:get(<<"path">>, ChannelArgs, <<>>)),
Dispatch = cowboy_router:compile([
{'_', [
{Route, ?MODULE, State}
]}
]),
CowboyOpts = #{
env =>#{
dispatch => Dispatch
}
},
ChildSpec = ranch:child_spec(Name, 300, Transport, TransportOpts, cowboy_clear, CowboyOpts),
{ok, State, ChildSpec}.
{ok, State, dgiot_http_worker:childSpec(?TYPE, ChannelId, ChannelArgs)}.
%% ,
@ -131,25 +103,3 @@ handle_message(Message, State) ->
stop(ChannelType, ChannelId, _State) ->
?LOG(info,"channel stop ~p,~p", [ChannelType, ChannelId]),
ok.
%% ====== http callback ======
init(Req, #state{ id = ChannelId, env = Env} = State) ->
{ok, _Type, ProductIds} = dgiot_bridge:get_products(ChannelId),
case dgiot_bridge:apply_channel(ChannelId, ProductIds, handle_info, [{http, Req}], Env) of
{ok, NewEnv} ->
Req1 = cowboy_req:reply(200, #{}, <<"code not exist">>, Req),
{ok, Req1, State#state{env = NewEnv}};
{reply, _ProductId, {HTTPCode, Reply}, NewEnv} ->
Req1 = cowboy_req:reply(HTTPCode, #{}, Reply, Req),
{ok, Req1, State#state{env = NewEnv}};
{reply, _ProductId, {HTTPCode, Header, Reply}, NewEnv} ->
Req1 = cowboy_req:reply(HTTPCode, Header, Reply, Req),
{ok, Req1, State#state{env = NewEnv}}
end.
get_route(<<"http://", Path>>) ->
get_route(Path);
get_route(Path) when is_binary(Path) ->
binary_to_list(Path);
get_route(_) ->
"/[...]".

View File

@ -0,0 +1,79 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_http_worker).
-include("dgiot_bridge.hrl").
-include_lib("dgiot/include/logger.hrl").
-author("johnliu").
-record(state, {id, env}).
%% API
-export([childSpec/3,
init/2]).
childSpec(Type, ChannelId, #{<<"port">> := Port} = ChannelArgs) ->
State = #state{
id = ChannelId,
env = maps:without([<<"port">>,<<"path">>,<<"product">>,<<"behaviour">>], ChannelArgs)
},
Name = dgiot_channelx:get_name(Type, ChannelId),
Opts = [
{ip, {0, 0, 0, 0}},
{port, Port}
],
SSL = maps:with([<<"cacertfile">>, <<"certfile">>, <<"keyfile">>], ChannelArgs),
{Transport, TransportOpts} =
case maps:to_list(SSL) of
[] ->
{ranch_tcp, Opts};
SslOpts = [_ | _] ->
{ranch_ssl, Opts ++ SslOpts}
end,
Route = get_route(maps:get(<<"path">>, ChannelArgs, <<>>)),
Dispatch = cowboy_router:compile([
{'_', [
{Route, ?MODULE, State}
]}
]),
CowboyOpts = #{
env =>#{
dispatch => Dispatch
}
},
ranch:child_spec(Name, 300, Transport, TransportOpts, cowboy_clear, CowboyOpts).
%% ====== http callback ======
init(Req, #state{ id = ChannelId, env = Env} = State) ->
{ok, _Type, ProductIds} = dgiot_bridge:get_products(ChannelId),
case dgiot_bridge:apply_channel(ChannelId, ProductIds, handle_info, [{http, Req}], Env) of
{ok, NewEnv} ->
Req1 = cowboy_req:reply(200, #{}, <<"hello word">>, Req),
{ok, Req1, State#state{env = NewEnv}};
{reply, _ProductId, {HTTPCode, Reply}, NewEnv} ->
Req1 = cowboy_req:reply(HTTPCode, #{}, Reply, Req),
{ok, Req1, State#state{env = NewEnv}};
{reply, _ProductId, {HTTPCode, Header, Reply}, NewEnv} ->
Req1 = cowboy_req:reply(HTTPCode, Header, Reply, Req),
{ok, Req1, State#state{env = NewEnv}}
end.
get_route(<<"http://", Path>>) ->
get_route(Path);
get_route(Path) when is_binary(Path) ->
binary_to_list(Path);
get_route(_) ->
"/[...]".

View File

@ -42,8 +42,6 @@
%% Channel callback
-export([init/3, handle_init/1, handle_event/3, handle_message/2, stop/3]).
%% TCP callback
-export([init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]).
%%
-channel_type(#{
@ -128,7 +126,7 @@ init(?TYPE, ChannelId, #{
dtutype = Dtutype,
buff_size = maps:get(<<"buff_size">>, Args, 1024000)
},
{ok, State, dgiot_tcp_server:child_spec(?MODULE, Port, State)}.
{ok, State, dgiot_tcp_worker:child_spec(Port, State)}.
handle_init(State) ->
{ok, State}.
@ -146,246 +144,6 @@ stop(ChannelType, ChannelId, _State) ->
?LOG(warning, "Channel[~p,~p] stop", [ChannelType, ChannelId]),
ok.
%% =======================
%% {ok, State} | {stop, Reason}
init(#tcp{state = #state{id = ChannelId} = State} = TCPState) ->
case dgiot_bridge:get_products(ChannelId) of
{ok, ?TYPE, ProductIds} ->
NewTcpState = TCPState#tcp{
log = log_fun(ChannelId)
},
do_product(init, [ChannelId], NewTcpState#tcp{
state = State#state{
env = #{},
product = ProductIds
}
});
{error, not_find} ->
{stop, not_find_channel}
end.
handle_info({deliver, _, Msg}, TCPState) ->
Payload = dgiot_mqtt:get_payload(Msg),
Topic = dgiot_mqtt:get_topic(Msg),
case binary:split(Topic, <<$/>>, [global, trim]) of
[<<"thing">>, ProductId, DevAddr, <<"tcp">>, <<"hex">>] ->
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_device:save_log(DeviceId, Payload, ['tcp_send']),
dgiot_tcp_server:send(TCPState, dgiot_utils:hex_to_binary(dgiot_utils:trim_string(Payload))),
{noreply, TCPState};
_ ->
case jsx:is_json(Payload) of
true ->
case jsx:decode(Payload, [{labels, binary}, return_maps]) of
#{<<"cmd">> := <<"send">>} = Cmd ->
handle_info(Cmd, TCPState);
Info ->
handle_info({mqtt, Topic, Info}, TCPState)
end;
false ->
handle_info({mqtt, Topic, Payload}, TCPState)
end
end;
%% TCP转一道
handle_info(#{<<"cmd">> := <<"send">>} = Cmd, #tcp{state = #state{id = ChannelId}} = TCPState) ->
case do_product(to_frame, [maps:without([<<"cmd">>], Cmd)], TCPState) of
{ok, NewTCPState} ->
{noreply, NewTCPState};
{reply, ProductId, Payload, NewTCPState} ->
case dgiot_tcp_server:send(TCPState, Payload) of
ok ->
ok;
{error, Reason} ->
dgiot_bridge:send_log(ChannelId, ProductId, "Send Fail, ~p, CMD:~p", [Cmd, Reason])
end,
{noreply, NewTCPState}
end;
handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, devaddr = <<>>, head = Head, len = Len, product = Products, dtutype = Dtutype} = State} = TCPState) ->
DTUIP = dgiot_utils:get_ip(Socket),
case check_login(Head, Len, Buff) of
<<>> ->
{noreply, TCPState#tcp{buff = <<>>}};
DtuAddr ->
NewProductId = get_productid(ChannelId, Products, Head, Dtutype),
DeviceId = dgiot_parse:get_deviceid(NewProductId, DtuAddr),
case create_device(DeviceId, NewProductId, DtuAddr, DTUIP, Dtutype) of
{<<>>, <<>>} ->
{noreply, TCPState#tcp{buff = <<>>}};
{_, _} ->
NewProducts = dgiot_utils:unique_1(Products ++ [NewProductId]),
dgiot_bridge:send_log(ChannelId, NewProductId, DtuAddr, "DeviceId ~p DTU revice from ~p", [DeviceId, DtuAddr]),
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DeviceId,
state = State#state{devaddr = DtuAddr, product = NewProducts, deviceId = DeviceId}}}
end
end;
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, product = Products, deviceId = DeviceId}} = TCPState) ->
dgiot_device:save_log(DeviceId, dgiot_utils:binary_to_hex(Buff), ['tcp_receive']),
case decode(Buff, Products, TCPState) of
{ok, [], NewTCPState} ->
{noreply, NewTCPState#tcp{buff = <<>>}};
{ok, Frames, #tcp{state = #state{product = ProductId}} = NewTCPState} ->
dgiot_bridge:send_log(ChannelId, ProductId, "~s", [jsx:encode(Frames)]),
%% Module:do(ChannelId, ProductId, Frames),
handle_frames(Frames, NewTCPState),
{noreply, NewTCPState#tcp{buff = <<>>}};
{stop, _Reason} ->
{noreply, TCPState#tcp{buff = <<>>}}
end;
%% {stop, TCPState} | {stop, Reason} | {ok, TCPState} | ok | stop
handle_info(Info, TCPState) ->
case do_product(handle_info, [Info], TCPState) of
{ok, NewTCPState} ->
{noreply, NewTCPState};
{stop, Reason, NewTCPState} ->
{stop, Reason, NewTCPState}
end.
handle_call(_Msg, _From, TCPState) ->
{reply, ok, TCPState}.
handle_cast(_Msg, TCPState) ->
{noreply, TCPState}.
terminate(_Reason, _TCPState) ->
ok.
code_change(_OldVsn, TCPState, _Extra) ->
{ok, TCPState}.
%% =======================
log_fun(ChannelId) ->
fun(Type, Buff) ->
Data =
case Type of
<<"ERROR">> -> Buff;
_ -> <<<<Y>> || <<X:4>> <= Buff, Y <- integer_to_list(X, 16)>>
end,
dgiot_bridge:send_log(ChannelId, "~s", [<<Type/binary, " ", Data/binary>>])
end.
send_fun(TCPState) ->
fun(Payload) ->
dgiot_tcp_server:send(TCPState, Payload)
end.
%%
%%
%% BuffSizeTCP断开
decode(Payload, [], _TCPState) when byte_size(Payload) > ?MAX_BUFF_SIZE ->
{stop, buff_size_limit};
decode(Payload, [], TCPState) ->
{ok, [], TCPState#tcp{buff = Payload}};
decode(Payload, [ProductId | Products], #tcp{state = #state{env = Env} = State} = TCPState) ->
case dgiot_bridge:parse_frame(ProductId, Payload, Env) of
{error, function_not_exported} ->
decode(Payload, Products, TCPState);
{error, Reason} ->
{stop, Reason};
{Rest, Messages, NewEnv} when length(Messages) > 0 ->
{ok, Messages, TCPState#tcp{buff = Rest, state = State#state{env = NewEnv, product = ProductId}}};
{Rest, Messages} when length(Messages) > 0 ->
{ok, Messages, TCPState#tcp{buff = Rest, state = State#state{product = ProductId}}};
_ ->
decode(Payload, Products, TCPState)
end;
decode(Payload, ProductId, #tcp{state = #state{env = Env} = State} = TCPState) ->
case dgiot_bridge:parse_frame(ProductId, Payload, Env) of
{error, Reason} ->
{stop, Reason};
{Rest, Messages} ->
{ok, Messages, TCPState#tcp{buff = Rest, state = State#state{product = ProductId}}};
{Rest, Messages, NewEnv} ->
{ok, Messages, TCPState#tcp{buff = Rest, state = State#state{env = NewEnv, product = ProductId}}}
end.
do_product(Fun, Args, #tcp{state = #state{product = ProductIds, id = ChannelId, env = Env}} = TCPState) ->
case dgiot_bridge:apply_channel(ChannelId, ProductIds, Fun, Args, Env#{<<"send">> => send_fun(TCPState)}) of
{ok, NewEnv} ->
{ok, update_state(NewEnv, TCPState)};
{stop, Reason, NewEnv} ->
{stop, Reason, update_state(NewEnv, TCPState)};
{reply, ProductId, Reply, NewEnv} ->
{reply, ProductId, Reply, update_state(NewEnv, TCPState)}
end.
handle_frames([], TCPState) ->
{noreply, TCPState};
handle_frames([Frame | Frames], TCPState) ->
case do_product(handle_info, [{message, Frame}], TCPState) of
{ok, NewTCPState} ->
handle_frames(Frames, NewTCPState);
{stop, Reason, NewTCPState} ->
{stop, Reason, NewTCPState}
end.
update_state(Env, #tcp{state = State} = TCPState) ->
TCPState#tcp{state = State#state{env = maps:without([<<"send">>], Env)}}.
create_device(DeviceId, ProductId, DTUMAC, DTUIP, Dtutype) ->
case dgiot_parse:get_object(<<"Product">>, ProductId) of
{ok, #{<<"ACL">> := Acl, <<"devType">> := DevType}} ->
case dgiot_parse:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"devaddr">> := _GWAddr}} ->
dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"ip">> => DTUIP, <<"status">> => <<"ONLINE">>}),
dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC),
create_instruct(Acl, ProductId, DeviceId);
_ ->
dgiot_device:create_device(#{
<<"devaddr">> => DTUMAC,
<<"name">> => <<Dtutype/binary, DTUMAC/binary>>,
<<"ip">> => DTUIP,
<<"isEnable">> => true,
<<"product">> => ProductId,
<<"ACL">> => Acl,
<<"status">> => <<"ONLINE">>,
<<"location">> => #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.161324, <<"latitude">> => 30.262441},
<<"brand">> => Dtutype,
<<"devModel">> => DevType
}),
dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC),
create_instruct(Acl, ProductId, DeviceId)
end,
Productname =
case dgiot_parse:get_object(<<"Product">>, ProductId) of
{ok, #{<<"name">> := Productname1}} ->
Productname1;
_ ->
<<"">>
end,
?MLOG(info, #{<<"deviceid">> => DeviceId, <<"devaddr">> => DTUMAC, <<"productid">> => ProductId, <<"productname">> => Productname, <<"devicename">> => <<Dtutype/binary, DTUMAC/binary>>}, ['online']),
dgiot_device:sub_topic(DeviceId, <<"tcp/hex">>),
dgiot_device:save_log(DeviceId, dgiot_utils:binary_to_hex(DTUMAC), ['tcp_receive']),
{DeviceId, DTUMAC};
Error2 ->
?LOG(info, "Error2 ~p ", [Error2]),
{<<>>, <<>>}
end.
create_instruct(ACL, DtuProductId, DtuDevId) ->
case dgiot_product:lookup_prod(DtuProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Properties}}} ->
lists:map(fun(Y) ->
case Y of
#{<<"dataSource">> := #{<<"slaveid">> := 256}} -> %%
pass;
#{<<"dataSource">> := #{<<"slaveid">> := SlaveId}} ->
Pn = dgiot_utils:to_binary(SlaveId),
dgiot_instruct:create(DtuProductId, DtuDevId, Pn, ACL, <<"all">>, #{<<"properties">> => [Y]});
_ -> pass
end
end, Properties);
_ -> pass
end.
get_header(Regular) ->
lists:foldl(fun(X, {Header, Len}) ->
case X of
@ -394,48 +152,4 @@ get_header(Regular) ->
_ -> {Header ++ X, Len + length(X)}
end
end, {[], 0},
re:split(dgiot_utils:to_list(Regular), "-", [{return, list}])).
get_productid(ChannelId, Products, Head, Dtutype) ->
NewProductId = dgiot_parse:get_productid(<<"DGIOTHUB"/utf8>>, dgiot_utils:to_binary(Head), dgiot_utils:to_binary(Dtutype)),
case lists:member(NewProductId, Products) of
false ->
{ok, Acl} = dgiot_bridge:get_acl(ChannelId),
case dgiot_parse:get_object(<<"Product">>, NewProductId) of
{ok, _} ->
pass;
_ ->
Product = #{
<<"name">> => dgiot_utils:to_binary(Dtutype),
<<"devType">> => dgiot_utils:to_binary(Head),
<<"category">> => <<"DGIOTHUB"/utf8>>,
<<"ACL">> => Acl,
<<"netType">> => <<"NB-IOT">>,
<<"nodeType">> => 3,
<<"config">> => #{},
<<"thing">> => #{},
<<"productSecret">> => dgiot_utils:random()
},
dgiot_parse:create_object(<<"Product">>, Product),
pass
end;
true ->
pass
end,
NewProductId.
check_login(Head, Len, Addr) ->
HexAddr = dgiot_utils:binary_to_hex(Addr),
HexList = dgiot_utils:to_list(HexAddr),
List = dgiot_utils:to_list(Addr),
case re:run(HexAddr, Head, [{capture, first, list}]) of
{match, [Head]} when length(HexList) == Len ->
HexAddr;
_Error ->
case re:run(Addr, Head, [{capture, first, list}]) of
{match, [Head]} when length(List) == Len ->
Addr;
_Error1 ->
<<>>
end
end.
re:split(dgiot_utils:to_list(Regular), "-", [{return, list}])).

View File

@ -0,0 +1,329 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_tcp_worker).
-author("kenneth").
-include("dgiot_bridge.hrl").
-include_lib("dgiot/include/dgiot_socket.hrl").
-include_lib("dgiot/include/logger.hrl").
-define(TYPE, <<"TCP">>).
-define(MAX_BUFF_SIZE, 1024).
-record(state, {
id,
buff_size = 1024000,
devaddr = <<>>,
heartcount = 0,
head = "xxxxxx0eee",
len = 0,
app = <<>>,
product = <<>>,
deviceId = <<>>,
env = #{},
dtutype = <<>>
}).
%% TCP callback
-export([child_spec/2, init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]).
child_spec(Port, State) ->
dgiot_tcp_server:child_spec(?MODULE, Port, State).
%% =======================
%% {ok, State} | {stop, Reason}
init(#tcp{state = #state{id = ChannelId} = State} = TCPState) ->
case dgiot_bridge:get_products(ChannelId) of
{ok, ?TYPE, ProductIds} ->
NewTcpState = TCPState#tcp{
log = log_fun(ChannelId)
},
do_product(init, [ChannelId], NewTcpState#tcp{
state = State#state{
env = #{},
product = ProductIds
}
});
{error, not_find} ->
{stop, not_find_channel}
end.
handle_info({deliver, _, Msg}, TCPState) ->
Payload = dgiot_mqtt:get_payload(Msg),
Topic = dgiot_mqtt:get_topic(Msg),
case binary:split(Topic, <<$/>>, [global, trim]) of
[<<"thing">>, ProductId, DevAddr, <<"tcp">>, <<"hex">>] ->
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_device:save_log(DeviceId, Payload, ['tcp_send']),
dgiot_tcp_server:send(TCPState, dgiot_utils:hex_to_binary(dgiot_utils:trim_string(Payload))),
{noreply, TCPState};
_ ->
case jsx:is_json(Payload) of
true ->
case jsx:decode(Payload, [{labels, binary}, return_maps]) of
#{<<"cmd">> := <<"send">>} = Cmd ->
handle_info(Cmd, TCPState);
Info ->
handle_info({mqtt, Topic, Info}, TCPState)
end;
false ->
handle_info({mqtt, Topic, Payload}, TCPState)
end
end;
%% TCP转一道
handle_info(#{<<"cmd">> := <<"send">>} = Cmd, #tcp{state = #state{id = ChannelId}} = TCPState) ->
case do_product(to_frame, [maps:without([<<"cmd">>], Cmd)], TCPState) of
{ok, NewTCPState} ->
{noreply, NewTCPState};
{reply, ProductId, Payload, NewTCPState} ->
case dgiot_tcp_server:send(TCPState, Payload) of
ok ->
ok;
{error, Reason} ->
dgiot_bridge:send_log(ChannelId, ProductId, "Send Fail, ~p, CMD:~p", [Cmd, Reason])
end,
{noreply, NewTCPState}
end;
handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, devaddr = <<>>, head = Head, len = Len, product = Products, dtutype = Dtutype} = State} = TCPState) ->
DTUIP = dgiot_utils:get_ip(Socket),
dgiot_tcp_server:send(TCPState, <<"echo">>),
case check_login(Head, Len, Buff) of
<<>> ->
{noreply, TCPState#tcp{buff = <<>>}};
DtuAddr ->
NewProductId = get_productid(ChannelId, Products, Head, Dtutype),
DeviceId = dgiot_parse:get_deviceid(NewProductId, DtuAddr),
case create_device(DeviceId, NewProductId, DtuAddr, DTUIP, Dtutype) of
{<<>>, <<>>} ->
{noreply, TCPState#tcp{buff = <<>>}};
{_, _} ->
NewProducts = dgiot_utils:unique_1(Products ++ [NewProductId]),
dgiot_bridge:send_log(ChannelId, NewProductId, DtuAddr, "DeviceId ~p DTU revice from ~p", [DeviceId, DtuAddr]),
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DeviceId,
state = State#state{devaddr = DtuAddr, product = NewProducts, deviceId = DeviceId}}}
end
end;
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, product = Products, deviceId = DeviceId}} = TCPState) ->
dgiot_device:save_log(DeviceId, dgiot_utils:binary_to_hex(Buff), ['tcp_receive']),
case decode(Buff, Products, TCPState) of
{ok, [], NewTCPState} ->
{noreply, NewTCPState#tcp{buff = <<>>}};
{ok, Frames, #tcp{state = #state{product = ProductId}} = NewTCPState} ->
dgiot_bridge:send_log(ChannelId, ProductId, "~s", [jsx:encode(Frames)]),
%% Module:do(ChannelId, ProductId, Frames),
handle_frames(Frames, NewTCPState),
{noreply, NewTCPState#tcp{buff = <<>>}};
{stop, _Reason} ->
{noreply, TCPState#tcp{buff = <<>>}}
end;
%% {stop, TCPState} | {stop, Reason} | {ok, TCPState} | ok | stop
handle_info(Info, TCPState) ->
case do_product(handle_info, [Info], TCPState) of
{ok, NewTCPState} ->
{noreply, NewTCPState};
{stop, Reason, NewTCPState} ->
{stop, Reason, NewTCPState}
end.
handle_call(_Msg, _From, TCPState) ->
{reply, ok, TCPState}.
handle_cast(_Msg, TCPState) ->
{noreply, TCPState}.
terminate(_Reason, _TCPState) ->
ok.
code_change(_OldVsn, TCPState, _Extra) ->
{ok, TCPState}.
%% =======================
log_fun(ChannelId) ->
fun(Type, Buff) ->
Data =
case Type of
<<"ERROR">> -> Buff;
_ -> <<<<Y>> || <<X:4>> <= Buff, Y <- integer_to_list(X, 16)>>
end,
dgiot_bridge:send_log(ChannelId, "~s", [<<Type/binary, " ", Data/binary>>])
end.
send_fun(TCPState) ->
fun(Payload) ->
dgiot_tcp_server:send(TCPState, Payload)
end.
%%
%%
%% BuffSizeTCP断开
decode(Payload, [], _TCPState) when byte_size(Payload) > ?MAX_BUFF_SIZE ->
{stop, buff_size_limit};
decode(Payload, [], TCPState) ->
{ok, [], TCPState#tcp{buff = Payload}};
decode(Payload, [ProductId | Products], #tcp{state = #state{env = Env} = State} = TCPState) ->
case dgiot_bridge:parse_frame(ProductId, Payload, Env) of
{error, function_not_exported} ->
decode(Payload, Products, TCPState);
{error, Reason} ->
{stop, Reason};
{Rest, Messages, NewEnv} when length(Messages) > 0 ->
{ok, Messages, TCPState#tcp{buff = Rest, state = State#state{env = NewEnv, product = ProductId}}};
{Rest, Messages} when length(Messages) > 0 ->
{ok, Messages, TCPState#tcp{buff = Rest, state = State#state{product = ProductId}}};
_ ->
decode(Payload, Products, TCPState)
end;
decode(Payload, ProductId, #tcp{state = #state{env = Env} = State} = TCPState) ->
case dgiot_bridge:parse_frame(ProductId, Payload, Env) of
{error, Reason} ->
{stop, Reason};
{Rest, Messages} ->
{ok, Messages, TCPState#tcp{buff = Rest, state = State#state{product = ProductId}}};
{Rest, Messages, NewEnv} ->
{ok, Messages, TCPState#tcp{buff = Rest, state = State#state{env = NewEnv, product = ProductId}}}
end.
do_product(Fun, Args, #tcp{state = #state{product = ProductIds, id = ChannelId, env = Env}} = TCPState) ->
case dgiot_bridge:apply_channel(ChannelId, ProductIds, Fun, Args, Env#{<<"send">> => send_fun(TCPState)}) of
{ok, NewEnv} ->
{ok, update_state(NewEnv, TCPState)};
{stop, Reason, NewEnv} ->
{stop, Reason, update_state(NewEnv, TCPState)};
{reply, ProductId, Reply, NewEnv} ->
{reply, ProductId, Reply, update_state(NewEnv, TCPState)}
end.
handle_frames([], TCPState) ->
{noreply, TCPState};
handle_frames([Frame | Frames], TCPState) ->
case do_product(handle_info, [{message, Frame}], TCPState) of
{ok, NewTCPState} ->
handle_frames(Frames, NewTCPState);
{stop, Reason, NewTCPState} ->
{stop, Reason, NewTCPState}
end.
update_state(Env, #tcp{state = State} = TCPState) ->
TCPState#tcp{state = State#state{env = maps:without([<<"send">>], Env)}}.
create_device(DeviceId, ProductId, DTUMAC, DTUIP, Dtutype) ->
case dgiot_parse:get_object(<<"Product">>, ProductId) of
{ok, #{<<"ACL">> := Acl, <<"devType">> := DevType}} ->
case dgiot_parse:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"devaddr">> := _GWAddr}} ->
dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"ip">> => DTUIP, <<"status">> => <<"ONLINE">>}),
dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC),
create_instruct(Acl, ProductId, DeviceId);
_ ->
dgiot_device:create_device(#{
<<"devaddr">> => DTUMAC,
<<"name">> => <<Dtutype/binary, DTUMAC/binary>>,
<<"ip">> => DTUIP,
<<"isEnable">> => true,
<<"product">> => ProductId,
<<"ACL">> => Acl,
<<"status">> => <<"ONLINE">>,
<<"location">> => #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.161324, <<"latitude">> => 30.262441},
<<"brand">> => Dtutype,
<<"devModel">> => DevType
}),
dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC),
create_instruct(Acl, ProductId, DeviceId)
end,
Productname =
case dgiot_parse:get_object(<<"Product">>, ProductId) of
{ok, #{<<"name">> := Productname1}} ->
Productname1;
_ ->
<<"">>
end,
?MLOG(info, #{<<"deviceid">> => DeviceId, <<"devaddr">> => DTUMAC, <<"productid">> => ProductId, <<"productname">> => Productname, <<"devicename">> => <<Dtutype/binary, DTUMAC/binary>>}, ['online']),
dgiot_device:sub_topic(DeviceId, <<"tcp/hex">>),
dgiot_device:save_log(DeviceId, dgiot_utils:binary_to_hex(DTUMAC), ['tcp_receive']),
{DeviceId, DTUMAC};
Error2 ->
?LOG(info, "Error2 ~p ", [Error2]),
{<<>>, <<>>}
end.
create_instruct(ACL, DtuProductId, DtuDevId) ->
case dgiot_product:lookup_prod(DtuProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Properties}}} ->
lists:map(fun(Y) ->
case Y of
#{<<"dataSource">> := #{<<"slaveid">> := 256}} -> %%
pass;
#{<<"dataSource">> := #{<<"slaveid">> := SlaveId}} ->
Pn = dgiot_utils:to_binary(SlaveId),
dgiot_instruct:create(DtuProductId, DtuDevId, Pn, ACL, <<"all">>, #{<<"properties">> => [Y]});
_ -> pass
end
end, Properties);
_ -> pass
end.
get_productid(ChannelId, Products, Head, Dtutype) ->
NewProductId = dgiot_parse:get_productid(<<"DGIOTHUB"/utf8>>, dgiot_utils:to_binary(Head), dgiot_utils:to_binary(Dtutype)),
case lists:member(NewProductId, Products) of
false ->
{ok, Acl} = dgiot_bridge:get_acl(ChannelId),
case dgiot_parse:get_object(<<"Product">>, NewProductId) of
{ok, _} ->
pass;
_ ->
Product = #{
<<"name">> => dgiot_utils:to_binary(Dtutype),
<<"devType">> => dgiot_utils:to_binary(Head),
<<"category">> => <<"DGIOTHUB"/utf8>>,
<<"ACL">> => Acl,
<<"netType">> => <<"NB-IOT">>,
<<"nodeType">> => 3,
<<"config">> => #{},
<<"thing">> => #{},
<<"productSecret">> => dgiot_utils:random()
},
dgiot_parse:create_object(<<"Product">>, Product),
pass
end;
true ->
pass
end,
NewProductId.
check_login(Head, Len, Addr) ->
HexAddr = dgiot_utils:binary_to_hex(Addr),
HexList = dgiot_utils:to_list(HexAddr),
List = dgiot_utils:to_list(Addr),
case re:run(HexAddr, Head, [{capture, first, list}]) of
{match, [Head]} when length(HexList) == Len ->
HexAddr;
_Error ->
case re:run(Addr, Head, [{capture, first, list}]) of
{match, [Head]} when length(List) == Len ->
Addr;
_Error1 ->
<<>>
end
end.

View File

@ -34,12 +34,12 @@
%%
-channel_type(#{
cType => ?TYPE,
type => ?PROTOCOL_CHL,
type => ?BRIDGE_CHL,
title => #{
zh => <<"MQTT资源通道"/utf8>>
zh => <<"MQTT桥接通道"/utf8>>
},
description => #{
zh => <<"MQTT资源通道"/utf8>>
zh => <<"MQTT桥接通道"/utf8>>
}
}).
%%
@ -72,7 +72,7 @@
order => 3,
type => string,
required => true,
default => <<"test"/utf8>>,
default => <<"anonymous"/utf8>>,
title => #{
zh => <<"用户名"/utf8>>
},
@ -93,7 +93,7 @@
}
},
<<"ssl">> => #{
order => 6,
order => 5,
type => boolean,
required => true,
default => false,
@ -105,7 +105,7 @@
}
},
<<"clean_start">> => #{
order => 7,
order => 6,
type => boolean,
required => true,
default => false,
@ -188,13 +188,14 @@ handle_cast(_Request, State) ->
{noreply, State}.
handle_info({connect, Client}, #state{id = ChannelId} = State) ->
emqtt:subscribe(Client, {<<"bridge/#">>, 1}),
case dgiot_bridge:get_products(ChannelId) of
{ok, _Type, ProductIds} ->
case ProductIds of
[] -> pass;
_ ->
lists:map(fun(ProductId) ->
%% dgiot_product:load(ProductId),
%% dgiot_product:load(ProductId),
emqtt:subscribe(Client, {<<"bridge/thing/", ProductId/binary, "/#">>, 1}),
dgiot_mqtt:subscribe(<<"forward/thing/", ProductId/binary, "/+/post">>),
dgiot_mqtt:publish(ChannelId, <<"thing/", ProductId/binary>>, jsx:encode(#{<<"network">> => <<"connect">>}))
@ -225,7 +226,8 @@ handle_info({publish, #{payload := Payload, topic := <<"bridge/", Topic/binary>>
handle_info({deliver, _, Msg}, #state{client = Client} = State) ->
case dgiot_mqtt:get_topic(Msg) of
<<"forward/", Topic/binary>> -> emqtt:publish(Client, Topic, dgiot_mqtt:get_payload(Msg));
<<"forward/", Topic/binary>> ->
emqtt:publish(Client, Topic, dgiot_mqtt:get_payload(Msg));
_ -> pass
end,
{noreply, State};

View File

@ -1167,6 +1167,41 @@ function make_ssl() {
fi
}
function build_dashboard_lite() {
if [ ! -d ${script_dir}/node-v16.5.0-linux-x64/bin/ ]; then
if [ ! -f ${script_dir}/node-v16.5.0-linux-x64.tar.gz ]; then
wget https://dgiotdev-1308220533.cos.ap-nanjing.myqcloud.com/node-v16.5.0-linux-x64.tar.gz &> /dev/null
tar xvf node-v16.5.0-linux-x64.tar.gz &> /dev/null
if [ ! -f usr/bin/node ]; then
rm /usr/bin/node -rf
fi
ln -s ${script_dir}/node-v16.5.0-linux-x64/bin/node /usr/bin/node
${script_dir}/node-v16.5.0-linux-x64/bin/npm i -g yarn --registry=https://registry.npmmirror.com
${script_dir}/node-v16.5.0-linux-x64/bin/yarn config set registry https://registry.npmmirror.com
${script_dir}/node-v16.5.0-linux-x64/bin/yarn -v
${script_dir}/node-v16.5.0-linux-x64/bin/yarn get registry
sudo /bin/dd if=/dev/zero of=/var/swap.1 bs=1M count=1024
sudo /sbin/mkswap /var/swap.1
sudo /sbin/swapon /var/swap.1
fi
fi
cd ${script_dir}/
if [ ! -d ${script_dir}/dgiot_dashboard_lite/ ]; then
git clone -b master https://gitee.com/dgiiot/dgiot-dashboard-lite.git dgiot_dashboard_lite
fi
cd ${script_dir}/dgiot_dashboard_lite
git reset --hard
git pull
export PATH=$PATH:/usr/local/bin:${script_dir}/node-v16.5.0-linux-x64/bin/
rm ${script_dir}/dgiot_dashboard_lite/dist/ -rf
${script_dir}/node-v16.5.0-linux-x64/bin/yarn install
${script_dir}/node-v16.5.0-linux-x64/bin/yarn build
echo "not build"
}
function build_dashboard() {
if [ ! -d ${script_dir}/node-v16.5.0-linux-x64/bin/ ]; then
if [ ! -f ${script_dir}/node-v16.5.0-linux-x64.tar.gz ]; then
@ -1241,6 +1276,10 @@ function pre_build_dgiot() {
cp ${script_dir}/dgiot_dashboard/dist/ ${script_dir}/$plugin/apps/dgiot_api/priv/www -rf
fi
if [ -d ${script_dir}/dgiot_dashboard_lite/dist ]; then
cp ${script_dir}/dgiot_dashboard_lite/dist/ ${script_dir}/$plugin/apps/dgiot_api/priv/www/lite -rf
fi
if [ -d ${script_dir}/dgiot/emqx/rel/ ]; then
rm ${script_dir}/dgiot/emqx/rel -rf
fi
@ -1274,6 +1313,7 @@ function post_build_dgiot() {
function devops() {
build_dashboard
build_dashboard_lite
pre_build_dgiot
make
post_build_dgiot
@ -1281,6 +1321,7 @@ function devops() {
function ci() {
build_dashboard
build_dashboard_lite
pre_build_dgiot
make ci
post_build_dgiot