From 96b88dcbde70d2d301a3a61137dec5c44930d9ff Mon Sep 17 00:00:00 2001 From: "U-JOHNLIU\\jonhl" Date: Mon, 21 Mar 2022 14:06:03 +0800 Subject: [PATCH] fix: add dgiot http and tcp worker --- .../src/channel/dgiot_http_channel.erl | 54 +-- .../src/channel/dgiot_http_worker.erl | 79 +++++ .../src/channel/dgiot_tcp_channel.erl | 290 +-------------- .../src/channel/dgiot_tcp_worker.erl | 329 ++++++++++++++++++ apps/dgiot_dlink/src/dgiot_mqttc_channel.erl | 18 +- dgiot_install.sh | 41 +++ 6 files changed, 463 insertions(+), 348 deletions(-) create mode 100644 apps/dgiot_bridge/src/channel/dgiot_http_worker.erl create mode 100644 apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl diff --git a/apps/dgiot_bridge/src/channel/dgiot_http_channel.erl b/apps/dgiot_bridge/src/channel/dgiot_http_channel.erl index 4e188beb..6c8575cd 100644 --- a/apps/dgiot_bridge/src/channel/dgiot_http_channel.erl +++ b/apps/dgiot_bridge/src/channel/dgiot_http_channel.erl @@ -26,9 +26,6 @@ -export([start/2]). -export([init/3, handle_event/3, handle_message/2, stop/3]). --export([init/2]). - - %% 注册通道类型 -channel_type(#{ cType => ?TYPE, @@ -86,37 +83,12 @@ start(ChannelId, ChannelArgs) -> dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs). %% 通道初始化 -init(?TYPE, ChannelId, #{<<"port">> := Port} = ChannelArgs) -> +init(?TYPE, ChannelId, ChannelArgs) -> State = #state{ id = ChannelId, env = maps:without([<<"port">>,<<"path">>,<<"product">>,<<"behaviour">>], ChannelArgs) }, - Name = dgiot_channelx:get_name(?TYPE, ChannelId), - Opts = [ - {ip, {0, 0, 0, 0}}, - {port, Port} - ], - SSL = maps:with([<<"cacertfile">>, <<"certfile">>, <<"keyfile">>], ChannelArgs), - {Transport, TransportOpts} = - case maps:to_list(SSL) of - [] -> - {ranch_tcp, Opts}; - SslOpts = [_ | _] -> - {ranch_ssl, Opts ++ SslOpts} - end, - Route = get_route(maps:get(<<"path">>, ChannelArgs, <<>>)), - Dispatch = cowboy_router:compile([ - {'_', [ - {Route, ?MODULE, State} - ]} - ]), - CowboyOpts = #{ - env =>#{ - dispatch => Dispatch - } - }, - ChildSpec = ranch:child_spec(Name, 300, Transport, TransportOpts, cowboy_clear, CowboyOpts), - {ok, State, ChildSpec}. + {ok, State, dgiot_http_worker:childSpec(?TYPE, ChannelId, ChannelArgs)}. %% 通道消息处理,注意:进程池调用 @@ -131,25 +103,3 @@ handle_message(Message, State) -> stop(ChannelType, ChannelId, _State) -> ?LOG(info,"channel stop ~p,~p", [ChannelType, ChannelId]), ok. - -%% ====== http callback ====== -init(Req, #state{ id = ChannelId, env = Env} = State) -> - {ok, _Type, ProductIds} = dgiot_bridge:get_products(ChannelId), - case dgiot_bridge:apply_channel(ChannelId, ProductIds, handle_info, [{http, Req}], Env) of - {ok, NewEnv} -> - Req1 = cowboy_req:reply(200, #{}, <<"code not exist">>, Req), - {ok, Req1, State#state{env = NewEnv}}; - {reply, _ProductId, {HTTPCode, Reply}, NewEnv} -> - Req1 = cowboy_req:reply(HTTPCode, #{}, Reply, Req), - {ok, Req1, State#state{env = NewEnv}}; - {reply, _ProductId, {HTTPCode, Header, Reply}, NewEnv} -> - Req1 = cowboy_req:reply(HTTPCode, Header, Reply, Req), - {ok, Req1, State#state{env = NewEnv}} - end. - -get_route(<<"http://", Path>>) -> - get_route(Path); -get_route(Path) when is_binary(Path) -> - binary_to_list(Path); -get_route(_) -> - "/[...]". diff --git a/apps/dgiot_bridge/src/channel/dgiot_http_worker.erl b/apps/dgiot_bridge/src/channel/dgiot_http_worker.erl new file mode 100644 index 00000000..a992ad32 --- /dev/null +++ b/apps/dgiot_bridge/src/channel/dgiot_http_worker.erl @@ -0,0 +1,79 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(dgiot_http_worker). +-include("dgiot_bridge.hrl"). +-include_lib("dgiot/include/logger.hrl"). +-author("johnliu"). + +-record(state, {id, env}). + +%% API +-export([childSpec/3, + init/2]). + +childSpec(Type, ChannelId, #{<<"port">> := Port} = ChannelArgs) -> + State = #state{ + id = ChannelId, + env = maps:without([<<"port">>,<<"path">>,<<"product">>,<<"behaviour">>], ChannelArgs) + }, + Name = dgiot_channelx:get_name(Type, ChannelId), + Opts = [ + {ip, {0, 0, 0, 0}}, + {port, Port} + ], + SSL = maps:with([<<"cacertfile">>, <<"certfile">>, <<"keyfile">>], ChannelArgs), + {Transport, TransportOpts} = + case maps:to_list(SSL) of + [] -> + {ranch_tcp, Opts}; + SslOpts = [_ | _] -> + {ranch_ssl, Opts ++ SslOpts} + end, + Route = get_route(maps:get(<<"path">>, ChannelArgs, <<>>)), + Dispatch = cowboy_router:compile([ + {'_', [ + {Route, ?MODULE, State} + ]} + ]), + CowboyOpts = #{ + env =>#{ + dispatch => Dispatch + } + }, + ranch:child_spec(Name, 300, Transport, TransportOpts, cowboy_clear, CowboyOpts). + +%% ====== http callback ====== +init(Req, #state{ id = ChannelId, env = Env} = State) -> + {ok, _Type, ProductIds} = dgiot_bridge:get_products(ChannelId), + case dgiot_bridge:apply_channel(ChannelId, ProductIds, handle_info, [{http, Req}], Env) of + {ok, NewEnv} -> + Req1 = cowboy_req:reply(200, #{}, <<"hello word">>, Req), + {ok, Req1, State#state{env = NewEnv}}; + {reply, _ProductId, {HTTPCode, Reply}, NewEnv} -> + Req1 = cowboy_req:reply(HTTPCode, #{}, Reply, Req), + {ok, Req1, State#state{env = NewEnv}}; + {reply, _ProductId, {HTTPCode, Header, Reply}, NewEnv} -> + Req1 = cowboy_req:reply(HTTPCode, Header, Reply, Req), + {ok, Req1, State#state{env = NewEnv}} + end. + +get_route(<<"http://", Path>>) -> + get_route(Path); +get_route(Path) when is_binary(Path) -> + binary_to_list(Path); +get_route(_) -> + "/[...]". \ No newline at end of file diff --git a/apps/dgiot_bridge/src/channel/dgiot_tcp_channel.erl b/apps/dgiot_bridge/src/channel/dgiot_tcp_channel.erl index a9c16494..797ab0cd 100644 --- a/apps/dgiot_bridge/src/channel/dgiot_tcp_channel.erl +++ b/apps/dgiot_bridge/src/channel/dgiot_tcp_channel.erl @@ -42,8 +42,6 @@ %% Channel callback -export([init/3, handle_init/1, handle_event/3, handle_message/2, stop/3]). -%% TCP callback --export([init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]). %% 注册通道类型 -channel_type(#{ @@ -128,7 +126,7 @@ init(?TYPE, ChannelId, #{ dtutype = Dtutype, buff_size = maps:get(<<"buff_size">>, Args, 1024000) }, - {ok, State, dgiot_tcp_server:child_spec(?MODULE, Port, State)}. + {ok, State, dgiot_tcp_worker:child_spec(Port, State)}. handle_init(State) -> {ok, State}. @@ -146,246 +144,6 @@ stop(ChannelType, ChannelId, _State) -> ?LOG(warning, "Channel[~p,~p] stop", [ChannelType, ChannelId]), ok. -%% ======================= -%% {ok, State} | {stop, Reason} -init(#tcp{state = #state{id = ChannelId} = State} = TCPState) -> - case dgiot_bridge:get_products(ChannelId) of - {ok, ?TYPE, ProductIds} -> - NewTcpState = TCPState#tcp{ - log = log_fun(ChannelId) - }, - do_product(init, [ChannelId], NewTcpState#tcp{ - state = State#state{ - env = #{}, - product = ProductIds - } - }); - {error, not_find} -> - {stop, not_find_channel} - end. - -handle_info({deliver, _, Msg}, TCPState) -> - Payload = dgiot_mqtt:get_payload(Msg), - Topic = dgiot_mqtt:get_topic(Msg), - case binary:split(Topic, <<$/>>, [global, trim]) of - [<<"thing">>, ProductId, DevAddr, <<"tcp">>, <<"hex">>] -> - DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), - dgiot_device:save_log(DeviceId, Payload, ['tcp_send']), - dgiot_tcp_server:send(TCPState, dgiot_utils:hex_to_binary(dgiot_utils:trim_string(Payload))), - {noreply, TCPState}; - _ -> - case jsx:is_json(Payload) of - true -> - case jsx:decode(Payload, [{labels, binary}, return_maps]) of - #{<<"cmd">> := <<"send">>} = Cmd -> - handle_info(Cmd, TCPState); - Info -> - handle_info({mqtt, Topic, Info}, TCPState) - end; - false -> - handle_info({mqtt, Topic, Payload}, TCPState) - end - end; - -%% 对于TCP转一道,向下发的命令 -handle_info(#{<<"cmd">> := <<"send">>} = Cmd, #tcp{state = #state{id = ChannelId}} = TCPState) -> - case do_product(to_frame, [maps:without([<<"cmd">>], Cmd)], TCPState) of - {ok, NewTCPState} -> - {noreply, NewTCPState}; - {reply, ProductId, Payload, NewTCPState} -> - case dgiot_tcp_server:send(TCPState, Payload) of - ok -> - ok; - {error, Reason} -> - dgiot_bridge:send_log(ChannelId, ProductId, "Send Fail, ~p, CMD:~p", [Cmd, Reason]) - end, - {noreply, NewTCPState} - end; - -handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, devaddr = <<>>, head = Head, len = Len, product = Products, dtutype = Dtutype} = State} = TCPState) -> - DTUIP = dgiot_utils:get_ip(Socket), - case check_login(Head, Len, Buff) of - <<>> -> - {noreply, TCPState#tcp{buff = <<>>}}; - DtuAddr -> - NewProductId = get_productid(ChannelId, Products, Head, Dtutype), - DeviceId = dgiot_parse:get_deviceid(NewProductId, DtuAddr), - case create_device(DeviceId, NewProductId, DtuAddr, DTUIP, Dtutype) of - {<<>>, <<>>} -> - {noreply, TCPState#tcp{buff = <<>>}}; - {_, _} -> - NewProducts = dgiot_utils:unique_1(Products ++ [NewProductId]), - dgiot_bridge:send_log(ChannelId, NewProductId, DtuAddr, "DeviceId ~p DTU revice from ~p", [DeviceId, DtuAddr]), - {noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DeviceId, - state = State#state{devaddr = DtuAddr, product = NewProducts, deviceId = DeviceId}}} - end - end; - -handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, product = Products, deviceId = DeviceId}} = TCPState) -> - dgiot_device:save_log(DeviceId, dgiot_utils:binary_to_hex(Buff), ['tcp_receive']), - case decode(Buff, Products, TCPState) of - {ok, [], NewTCPState} -> - {noreply, NewTCPState#tcp{buff = <<>>}}; - {ok, Frames, #tcp{state = #state{product = ProductId}} = NewTCPState} -> - dgiot_bridge:send_log(ChannelId, ProductId, "~s", [jsx:encode(Frames)]), -%% Module:do(ChannelId, ProductId, Frames), - handle_frames(Frames, NewTCPState), - {noreply, NewTCPState#tcp{buff = <<>>}}; - {stop, _Reason} -> - {noreply, TCPState#tcp{buff = <<>>}} - end; - -%% {stop, TCPState} | {stop, Reason} | {ok, TCPState} | ok | stop -handle_info(Info, TCPState) -> - case do_product(handle_info, [Info], TCPState) of - {ok, NewTCPState} -> - {noreply, NewTCPState}; - {stop, Reason, NewTCPState} -> - {stop, Reason, NewTCPState} - end. - -handle_call(_Msg, _From, TCPState) -> - {reply, ok, TCPState}. - -handle_cast(_Msg, TCPState) -> - {noreply, TCPState}. - -terminate(_Reason, _TCPState) -> - ok. - -code_change(_OldVsn, TCPState, _Extra) -> - {ok, TCPState}. - - -%% ======================= - -log_fun(ChannelId) -> - fun(Type, Buff) -> - Data = - case Type of - <<"ERROR">> -> Buff; - _ -> <<<> || <> <= Buff, Y <- integer_to_list(X, 16)>> - end, - dgiot_bridge:send_log(ChannelId, "~s", [<>]) - end. - -send_fun(TCPState) -> - fun(Payload) -> - dgiot_tcp_server:send(TCPState, Payload) - end. - -%% 如果是多个产品,先用报文依次解析,只要有一个能解开,则认为此连接为此产品的, -%% 否则一直解析下去,如果一个产品都没有解析成功,则缓存到下一次解析,直到缓存 -%% 如果超过了BuffSize,TCP断开, -decode(Payload, [], _TCPState) when byte_size(Payload) > ?MAX_BUFF_SIZE -> - {stop, buff_size_limit}; -decode(Payload, [], TCPState) -> - {ok, [], TCPState#tcp{buff = Payload}}; -decode(Payload, [ProductId | Products], #tcp{state = #state{env = Env} = State} = TCPState) -> - case dgiot_bridge:parse_frame(ProductId, Payload, Env) of - {error, function_not_exported} -> - decode(Payload, Products, TCPState); - {error, Reason} -> - {stop, Reason}; - {Rest, Messages, NewEnv} when length(Messages) > 0 -> - {ok, Messages, TCPState#tcp{buff = Rest, state = State#state{env = NewEnv, product = ProductId}}}; - {Rest, Messages} when length(Messages) > 0 -> - {ok, Messages, TCPState#tcp{buff = Rest, state = State#state{product = ProductId}}}; - _ -> - decode(Payload, Products, TCPState) - end; - -decode(Payload, ProductId, #tcp{state = #state{env = Env} = State} = TCPState) -> - case dgiot_bridge:parse_frame(ProductId, Payload, Env) of - {error, Reason} -> - {stop, Reason}; - {Rest, Messages} -> - {ok, Messages, TCPState#tcp{buff = Rest, state = State#state{product = ProductId}}}; - {Rest, Messages, NewEnv} -> - {ok, Messages, TCPState#tcp{buff = Rest, state = State#state{env = NewEnv, product = ProductId}}} - end. - -do_product(Fun, Args, #tcp{state = #state{product = ProductIds, id = ChannelId, env = Env}} = TCPState) -> - case dgiot_bridge:apply_channel(ChannelId, ProductIds, Fun, Args, Env#{<<"send">> => send_fun(TCPState)}) of - {ok, NewEnv} -> - {ok, update_state(NewEnv, TCPState)}; - {stop, Reason, NewEnv} -> - {stop, Reason, update_state(NewEnv, TCPState)}; - {reply, ProductId, Reply, NewEnv} -> - {reply, ProductId, Reply, update_state(NewEnv, TCPState)} - end. - -handle_frames([], TCPState) -> - {noreply, TCPState}; -handle_frames([Frame | Frames], TCPState) -> - case do_product(handle_info, [{message, Frame}], TCPState) of - {ok, NewTCPState} -> - handle_frames(Frames, NewTCPState); - {stop, Reason, NewTCPState} -> - {stop, Reason, NewTCPState} - end. - -update_state(Env, #tcp{state = State} = TCPState) -> - TCPState#tcp{state = State#state{env = maps:without([<<"send">>], Env)}}. - -create_device(DeviceId, ProductId, DTUMAC, DTUIP, Dtutype) -> - case dgiot_parse:get_object(<<"Product">>, ProductId) of - {ok, #{<<"ACL">> := Acl, <<"devType">> := DevType}} -> - case dgiot_parse:get_object(<<"Device">>, DeviceId) of - {ok, #{<<"devaddr">> := _GWAddr}} -> - dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"ip">> => DTUIP, <<"status">> => <<"ONLINE">>}), - dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC), - create_instruct(Acl, ProductId, DeviceId); - _ -> - dgiot_device:create_device(#{ - <<"devaddr">> => DTUMAC, - <<"name">> => <>, - <<"ip">> => DTUIP, - <<"isEnable">> => true, - <<"product">> => ProductId, - <<"ACL">> => Acl, - <<"status">> => <<"ONLINE">>, - <<"location">> => #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.161324, <<"latitude">> => 30.262441}, - <<"brand">> => Dtutype, - <<"devModel">> => DevType - }), - dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC), - create_instruct(Acl, ProductId, DeviceId) - end, - Productname = - case dgiot_parse:get_object(<<"Product">>, ProductId) of - {ok, #{<<"name">> := Productname1}} -> - Productname1; - _ -> - <<"">> - end, - ?MLOG(info, #{<<"deviceid">> => DeviceId, <<"devaddr">> => DTUMAC, <<"productid">> => ProductId, <<"productname">> => Productname, <<"devicename">> => <>}, ['online']), - dgiot_device:sub_topic(DeviceId, <<"tcp/hex">>), - dgiot_device:save_log(DeviceId, dgiot_utils:binary_to_hex(DTUMAC), ['tcp_receive']), - {DeviceId, DTUMAC}; - Error2 -> - ?LOG(info, "Error2 ~p ", [Error2]), - {<<>>, <<>>} - end. - - -create_instruct(ACL, DtuProductId, DtuDevId) -> - case dgiot_product:lookup_prod(DtuProductId) of - {ok, #{<<"thing">> := #{<<"properties">> := Properties}}} -> - lists:map(fun(Y) -> - case Y of - #{<<"dataSource">> := #{<<"slaveid">> := 256}} -> %%不做指令 - pass; - #{<<"dataSource">> := #{<<"slaveid">> := SlaveId}} -> - Pn = dgiot_utils:to_binary(SlaveId), - dgiot_instruct:create(DtuProductId, DtuDevId, Pn, ACL, <<"all">>, #{<<"properties">> => [Y]}); - _ -> pass - end - end, Properties); - _ -> pass - end. - - get_header(Regular) -> lists:foldl(fun(X, {Header, Len}) -> case X of @@ -394,48 +152,4 @@ get_header(Regular) -> _ -> {Header ++ X, Len + length(X)} end end, {[], 0}, - re:split(dgiot_utils:to_list(Regular), "-", [{return, list}])). - -get_productid(ChannelId, Products, Head, Dtutype) -> - NewProductId = dgiot_parse:get_productid(<<"DGIOTHUB"/utf8>>, dgiot_utils:to_binary(Head), dgiot_utils:to_binary(Dtutype)), - case lists:member(NewProductId, Products) of - false -> - {ok, Acl} = dgiot_bridge:get_acl(ChannelId), - case dgiot_parse:get_object(<<"Product">>, NewProductId) of - {ok, _} -> - pass; - _ -> - Product = #{ - <<"name">> => dgiot_utils:to_binary(Dtutype), - <<"devType">> => dgiot_utils:to_binary(Head), - <<"category">> => <<"DGIOTHUB"/utf8>>, - <<"ACL">> => Acl, - <<"netType">> => <<"NB-IOT">>, - <<"nodeType">> => 3, - <<"config">> => #{}, - <<"thing">> => #{}, - <<"productSecret">> => dgiot_utils:random() - }, - dgiot_parse:create_object(<<"Product">>, Product), - pass - end; - true -> - pass - end, - NewProductId. - -check_login(Head, Len, Addr) -> - HexAddr = dgiot_utils:binary_to_hex(Addr), - HexList = dgiot_utils:to_list(HexAddr), - List = dgiot_utils:to_list(Addr), - case re:run(HexAddr, Head, [{capture, first, list}]) of - {match, [Head]} when length(HexList) == Len -> - HexAddr; - _Error -> - case re:run(Addr, Head, [{capture, first, list}]) of - {match, [Head]} when length(List) == Len -> - Addr; - _Error1 -> - <<>> - end - end. + re:split(dgiot_utils:to_list(Regular), "-", [{return, list}])). \ No newline at end of file diff --git a/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl b/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl new file mode 100644 index 00000000..5ab3141b --- /dev/null +++ b/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl @@ -0,0 +1,329 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(dgiot_tcp_worker). +-author("kenneth"). +-include("dgiot_bridge.hrl"). +-include_lib("dgiot/include/dgiot_socket.hrl"). +-include_lib("dgiot/include/logger.hrl"). + +-define(TYPE, <<"TCP">>). +-define(MAX_BUFF_SIZE, 1024). +-record(state, { + id, + buff_size = 1024000, + devaddr = <<>>, + heartcount = 0, + head = "xxxxxx0eee", + len = 0, + app = <<>>, + product = <<>>, + deviceId = <<>>, + env = #{}, + dtutype = <<>> +}). + + + +%% TCP callback +-export([child_spec/2, init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]). + +child_spec(Port, State) -> + dgiot_tcp_server:child_spec(?MODULE, Port, State). + +%% ======================= +%% {ok, State} | {stop, Reason} +init(#tcp{state = #state{id = ChannelId} = State} = TCPState) -> + case dgiot_bridge:get_products(ChannelId) of + {ok, ?TYPE, ProductIds} -> + NewTcpState = TCPState#tcp{ + log = log_fun(ChannelId) + }, + do_product(init, [ChannelId], NewTcpState#tcp{ + state = State#state{ + env = #{}, + product = ProductIds + } + }); + {error, not_find} -> + {stop, not_find_channel} + end. + +handle_info({deliver, _, Msg}, TCPState) -> + Payload = dgiot_mqtt:get_payload(Msg), + Topic = dgiot_mqtt:get_topic(Msg), + case binary:split(Topic, <<$/>>, [global, trim]) of + [<<"thing">>, ProductId, DevAddr, <<"tcp">>, <<"hex">>] -> + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_device:save_log(DeviceId, Payload, ['tcp_send']), + dgiot_tcp_server:send(TCPState, dgiot_utils:hex_to_binary(dgiot_utils:trim_string(Payload))), + {noreply, TCPState}; + _ -> + case jsx:is_json(Payload) of + true -> + case jsx:decode(Payload, [{labels, binary}, return_maps]) of + #{<<"cmd">> := <<"send">>} = Cmd -> + handle_info(Cmd, TCPState); + Info -> + handle_info({mqtt, Topic, Info}, TCPState) + end; + false -> + handle_info({mqtt, Topic, Payload}, TCPState) + end + end; + +%% 对于TCP转一道,向下发的命令 +handle_info(#{<<"cmd">> := <<"send">>} = Cmd, #tcp{state = #state{id = ChannelId}} = TCPState) -> + case do_product(to_frame, [maps:without([<<"cmd">>], Cmd)], TCPState) of + {ok, NewTCPState} -> + {noreply, NewTCPState}; + {reply, ProductId, Payload, NewTCPState} -> + case dgiot_tcp_server:send(TCPState, Payload) of + ok -> + ok; + {error, Reason} -> + dgiot_bridge:send_log(ChannelId, ProductId, "Send Fail, ~p, CMD:~p", [Cmd, Reason]) + end, + {noreply, NewTCPState} + end; + +handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, devaddr = <<>>, head = Head, len = Len, product = Products, dtutype = Dtutype} = State} = TCPState) -> + DTUIP = dgiot_utils:get_ip(Socket), + dgiot_tcp_server:send(TCPState, <<"echo">>), + case check_login(Head, Len, Buff) of + <<>> -> + {noreply, TCPState#tcp{buff = <<>>}}; + DtuAddr -> + NewProductId = get_productid(ChannelId, Products, Head, Dtutype), + DeviceId = dgiot_parse:get_deviceid(NewProductId, DtuAddr), + case create_device(DeviceId, NewProductId, DtuAddr, DTUIP, Dtutype) of + {<<>>, <<>>} -> + {noreply, TCPState#tcp{buff = <<>>}}; + {_, _} -> + NewProducts = dgiot_utils:unique_1(Products ++ [NewProductId]), + dgiot_bridge:send_log(ChannelId, NewProductId, DtuAddr, "DeviceId ~p DTU revice from ~p", [DeviceId, DtuAddr]), + {noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DeviceId, + state = State#state{devaddr = DtuAddr, product = NewProducts, deviceId = DeviceId}}} + end + end; + +handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, product = Products, deviceId = DeviceId}} = TCPState) -> + dgiot_device:save_log(DeviceId, dgiot_utils:binary_to_hex(Buff), ['tcp_receive']), + case decode(Buff, Products, TCPState) of + {ok, [], NewTCPState} -> + {noreply, NewTCPState#tcp{buff = <<>>}}; + {ok, Frames, #tcp{state = #state{product = ProductId}} = NewTCPState} -> + dgiot_bridge:send_log(ChannelId, ProductId, "~s", [jsx:encode(Frames)]), +%% Module:do(ChannelId, ProductId, Frames), + handle_frames(Frames, NewTCPState), + {noreply, NewTCPState#tcp{buff = <<>>}}; + {stop, _Reason} -> + {noreply, TCPState#tcp{buff = <<>>}} + end; + +%% {stop, TCPState} | {stop, Reason} | {ok, TCPState} | ok | stop +handle_info(Info, TCPState) -> + case do_product(handle_info, [Info], TCPState) of + {ok, NewTCPState} -> + {noreply, NewTCPState}; + {stop, Reason, NewTCPState} -> + {stop, Reason, NewTCPState} + end. + +handle_call(_Msg, _From, TCPState) -> + {reply, ok, TCPState}. + +handle_cast(_Msg, TCPState) -> + {noreply, TCPState}. + +terminate(_Reason, _TCPState) -> + ok. + +code_change(_OldVsn, TCPState, _Extra) -> + {ok, TCPState}. + + +%% ======================= + +log_fun(ChannelId) -> + fun(Type, Buff) -> + Data = + case Type of + <<"ERROR">> -> Buff; + _ -> <<<> || <> <= Buff, Y <- integer_to_list(X, 16)>> + end, + dgiot_bridge:send_log(ChannelId, "~s", [<>]) + end. + +send_fun(TCPState) -> + fun(Payload) -> + dgiot_tcp_server:send(TCPState, Payload) + end. + +%% 如果是多个产品,先用报文依次解析,只要有一个能解开,则认为此连接为此产品的, +%% 否则一直解析下去,如果一个产品都没有解析成功,则缓存到下一次解析,直到缓存 +%% 如果超过了BuffSize,TCP断开, +decode(Payload, [], _TCPState) when byte_size(Payload) > ?MAX_BUFF_SIZE -> + {stop, buff_size_limit}; +decode(Payload, [], TCPState) -> + {ok, [], TCPState#tcp{buff = Payload}}; +decode(Payload, [ProductId | Products], #tcp{state = #state{env = Env} = State} = TCPState) -> + case dgiot_bridge:parse_frame(ProductId, Payload, Env) of + {error, function_not_exported} -> + decode(Payload, Products, TCPState); + {error, Reason} -> + {stop, Reason}; + {Rest, Messages, NewEnv} when length(Messages) > 0 -> + {ok, Messages, TCPState#tcp{buff = Rest, state = State#state{env = NewEnv, product = ProductId}}}; + {Rest, Messages} when length(Messages) > 0 -> + {ok, Messages, TCPState#tcp{buff = Rest, state = State#state{product = ProductId}}}; + _ -> + decode(Payload, Products, TCPState) + end; + +decode(Payload, ProductId, #tcp{state = #state{env = Env} = State} = TCPState) -> + case dgiot_bridge:parse_frame(ProductId, Payload, Env) of + {error, Reason} -> + {stop, Reason}; + {Rest, Messages} -> + {ok, Messages, TCPState#tcp{buff = Rest, state = State#state{product = ProductId}}}; + {Rest, Messages, NewEnv} -> + {ok, Messages, TCPState#tcp{buff = Rest, state = State#state{env = NewEnv, product = ProductId}}} + end. + +do_product(Fun, Args, #tcp{state = #state{product = ProductIds, id = ChannelId, env = Env}} = TCPState) -> + case dgiot_bridge:apply_channel(ChannelId, ProductIds, Fun, Args, Env#{<<"send">> => send_fun(TCPState)}) of + {ok, NewEnv} -> + {ok, update_state(NewEnv, TCPState)}; + {stop, Reason, NewEnv} -> + {stop, Reason, update_state(NewEnv, TCPState)}; + {reply, ProductId, Reply, NewEnv} -> + {reply, ProductId, Reply, update_state(NewEnv, TCPState)} + end. + +handle_frames([], TCPState) -> + {noreply, TCPState}; +handle_frames([Frame | Frames], TCPState) -> + case do_product(handle_info, [{message, Frame}], TCPState) of + {ok, NewTCPState} -> + handle_frames(Frames, NewTCPState); + {stop, Reason, NewTCPState} -> + {stop, Reason, NewTCPState} + end. + +update_state(Env, #tcp{state = State} = TCPState) -> + TCPState#tcp{state = State#state{env = maps:without([<<"send">>], Env)}}. + +create_device(DeviceId, ProductId, DTUMAC, DTUIP, Dtutype) -> + case dgiot_parse:get_object(<<"Product">>, ProductId) of + {ok, #{<<"ACL">> := Acl, <<"devType">> := DevType}} -> + case dgiot_parse:get_object(<<"Device">>, DeviceId) of + {ok, #{<<"devaddr">> := _GWAddr}} -> + dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"ip">> => DTUIP, <<"status">> => <<"ONLINE">>}), + dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC), + create_instruct(Acl, ProductId, DeviceId); + _ -> + dgiot_device:create_device(#{ + <<"devaddr">> => DTUMAC, + <<"name">> => <>, + <<"ip">> => DTUIP, + <<"isEnable">> => true, + <<"product">> => ProductId, + <<"ACL">> => Acl, + <<"status">> => <<"ONLINE">>, + <<"location">> => #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.161324, <<"latitude">> => 30.262441}, + <<"brand">> => Dtutype, + <<"devModel">> => DevType + }), + dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC), + create_instruct(Acl, ProductId, DeviceId) + end, + Productname = + case dgiot_parse:get_object(<<"Product">>, ProductId) of + {ok, #{<<"name">> := Productname1}} -> + Productname1; + _ -> + <<"">> + end, + ?MLOG(info, #{<<"deviceid">> => DeviceId, <<"devaddr">> => DTUMAC, <<"productid">> => ProductId, <<"productname">> => Productname, <<"devicename">> => <>}, ['online']), + dgiot_device:sub_topic(DeviceId, <<"tcp/hex">>), + dgiot_device:save_log(DeviceId, dgiot_utils:binary_to_hex(DTUMAC), ['tcp_receive']), + {DeviceId, DTUMAC}; + Error2 -> + ?LOG(info, "Error2 ~p ", [Error2]), + {<<>>, <<>>} + end. + + +create_instruct(ACL, DtuProductId, DtuDevId) -> + case dgiot_product:lookup_prod(DtuProductId) of + {ok, #{<<"thing">> := #{<<"properties">> := Properties}}} -> + lists:map(fun(Y) -> + case Y of + #{<<"dataSource">> := #{<<"slaveid">> := 256}} -> %%不做指令 + pass; + #{<<"dataSource">> := #{<<"slaveid">> := SlaveId}} -> + Pn = dgiot_utils:to_binary(SlaveId), + dgiot_instruct:create(DtuProductId, DtuDevId, Pn, ACL, <<"all">>, #{<<"properties">> => [Y]}); + _ -> pass + end + end, Properties); + _ -> pass + end. + +get_productid(ChannelId, Products, Head, Dtutype) -> + NewProductId = dgiot_parse:get_productid(<<"DGIOTHUB"/utf8>>, dgiot_utils:to_binary(Head), dgiot_utils:to_binary(Dtutype)), + case lists:member(NewProductId, Products) of + false -> + {ok, Acl} = dgiot_bridge:get_acl(ChannelId), + case dgiot_parse:get_object(<<"Product">>, NewProductId) of + {ok, _} -> + pass; + _ -> + Product = #{ + <<"name">> => dgiot_utils:to_binary(Dtutype), + <<"devType">> => dgiot_utils:to_binary(Head), + <<"category">> => <<"DGIOTHUB"/utf8>>, + <<"ACL">> => Acl, + <<"netType">> => <<"NB-IOT">>, + <<"nodeType">> => 3, + <<"config">> => #{}, + <<"thing">> => #{}, + <<"productSecret">> => dgiot_utils:random() + }, + dgiot_parse:create_object(<<"Product">>, Product), + pass + end; + true -> + pass + end, + NewProductId. + +check_login(Head, Len, Addr) -> + HexAddr = dgiot_utils:binary_to_hex(Addr), + HexList = dgiot_utils:to_list(HexAddr), + List = dgiot_utils:to_list(Addr), + case re:run(HexAddr, Head, [{capture, first, list}]) of + {match, [Head]} when length(HexList) == Len -> + HexAddr; + _Error -> + case re:run(Addr, Head, [{capture, first, list}]) of + {match, [Head]} when length(List) == Len -> + Addr; + _Error1 -> + <<>> + end + end. diff --git a/apps/dgiot_dlink/src/dgiot_mqttc_channel.erl b/apps/dgiot_dlink/src/dgiot_mqttc_channel.erl index 21a4be53..3c33e9a0 100644 --- a/apps/dgiot_dlink/src/dgiot_mqttc_channel.erl +++ b/apps/dgiot_dlink/src/dgiot_mqttc_channel.erl @@ -34,12 +34,12 @@ %% 注册通道类型 -channel_type(#{ cType => ?TYPE, - type => ?PROTOCOL_CHL, + type => ?BRIDGE_CHL, title => #{ - zh => <<"MQTT资源通道"/utf8>> + zh => <<"MQTT桥接通道"/utf8>> }, description => #{ - zh => <<"MQTT资源通道"/utf8>> + zh => <<"MQTT桥接通道"/utf8>> } }). %% 注册通道参数 @@ -72,7 +72,7 @@ order => 3, type => string, required => true, - default => <<"test"/utf8>>, + default => <<"anonymous"/utf8>>, title => #{ zh => <<"用户名"/utf8>> }, @@ -93,7 +93,7 @@ } }, <<"ssl">> => #{ - order => 6, + order => 5, type => boolean, required => true, default => false, @@ -105,7 +105,7 @@ } }, <<"clean_start">> => #{ - order => 7, + order => 6, type => boolean, required => true, default => false, @@ -188,13 +188,14 @@ handle_cast(_Request, State) -> {noreply, State}. handle_info({connect, Client}, #state{id = ChannelId} = State) -> + emqtt:subscribe(Client, {<<"bridge/#">>, 1}), case dgiot_bridge:get_products(ChannelId) of {ok, _Type, ProductIds} -> case ProductIds of [] -> pass; _ -> lists:map(fun(ProductId) -> -%% dgiot_product:load(ProductId), +%% dgiot_product:load(ProductId), emqtt:subscribe(Client, {<<"bridge/thing/", ProductId/binary, "/#">>, 1}), dgiot_mqtt:subscribe(<<"forward/thing/", ProductId/binary, "/+/post">>), dgiot_mqtt:publish(ChannelId, <<"thing/", ProductId/binary>>, jsx:encode(#{<<"network">> => <<"connect">>})) @@ -225,7 +226,8 @@ handle_info({publish, #{payload := Payload, topic := <<"bridge/", Topic/binary>> handle_info({deliver, _, Msg}, #state{client = Client} = State) -> case dgiot_mqtt:get_topic(Msg) of - <<"forward/", Topic/binary>> -> emqtt:publish(Client, Topic, dgiot_mqtt:get_payload(Msg)); + <<"forward/", Topic/binary>> -> + emqtt:publish(Client, Topic, dgiot_mqtt:get_payload(Msg)); _ -> pass end, {noreply, State}; diff --git a/dgiot_install.sh b/dgiot_install.sh index be72367f..8ea7f496 100644 --- a/dgiot_install.sh +++ b/dgiot_install.sh @@ -1167,6 +1167,41 @@ function make_ssl() { fi } +function build_dashboard_lite() { + if [ ! -d ${script_dir}/node-v16.5.0-linux-x64/bin/ ]; then + if [ ! -f ${script_dir}/node-v16.5.0-linux-x64.tar.gz ]; then + wget https://dgiotdev-1308220533.cos.ap-nanjing.myqcloud.com/node-v16.5.0-linux-x64.tar.gz &> /dev/null + tar xvf node-v16.5.0-linux-x64.tar.gz &> /dev/null + if [ ! -f usr/bin/node ]; then + rm /usr/bin/node -rf + fi + ln -s ${script_dir}/node-v16.5.0-linux-x64/bin/node /usr/bin/node + ${script_dir}/node-v16.5.0-linux-x64/bin/npm i -g yarn --registry=https://registry.npmmirror.com + ${script_dir}/node-v16.5.0-linux-x64/bin/yarn config set registry https://registry.npmmirror.com + ${script_dir}/node-v16.5.0-linux-x64/bin/yarn -v + ${script_dir}/node-v16.5.0-linux-x64/bin/yarn get registry + sudo /bin/dd if=/dev/zero of=/var/swap.1 bs=1M count=1024 + sudo /sbin/mkswap /var/swap.1 + sudo /sbin/swapon /var/swap.1 + fi + fi + + cd ${script_dir}/ + if [ ! -d ${script_dir}/dgiot_dashboard_lite/ ]; then + git clone -b master https://gitee.com/dgiiot/dgiot-dashboard-lite.git dgiot_dashboard_lite + fi + + cd ${script_dir}/dgiot_dashboard_lite + git reset --hard + git pull + + export PATH=$PATH:/usr/local/bin:${script_dir}/node-v16.5.0-linux-x64/bin/ + rm ${script_dir}/dgiot_dashboard_lite/dist/ -rf + ${script_dir}/node-v16.5.0-linux-x64/bin/yarn install + ${script_dir}/node-v16.5.0-linux-x64/bin/yarn build + echo "not build" + } + function build_dashboard() { if [ ! -d ${script_dir}/node-v16.5.0-linux-x64/bin/ ]; then if [ ! -f ${script_dir}/node-v16.5.0-linux-x64.tar.gz ]; then @@ -1241,6 +1276,10 @@ function pre_build_dgiot() { cp ${script_dir}/dgiot_dashboard/dist/ ${script_dir}/$plugin/apps/dgiot_api/priv/www -rf fi + if [ -d ${script_dir}/dgiot_dashboard_lite/dist ]; then + cp ${script_dir}/dgiot_dashboard_lite/dist/ ${script_dir}/$plugin/apps/dgiot_api/priv/www/lite -rf + fi + if [ -d ${script_dir}/dgiot/emqx/rel/ ]; then rm ${script_dir}/dgiot/emqx/rel -rf fi @@ -1274,6 +1313,7 @@ function post_build_dgiot() { function devops() { build_dashboard + build_dashboard_lite pre_build_dgiot make post_build_dgiot @@ -1281,6 +1321,7 @@ function devops() { function ci() { build_dashboard + build_dashboard_lite pre_build_dgiot make ci post_build_dgiot