fix: rm no use function

This commit is contained in:
jhonliu 2022-04-25 13:55:57 +08:00
parent 7504274aff
commit e35a6deb87
13 changed files with 262 additions and 942 deletions

View File

@ -14,262 +14,119 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_tcp_server).
-module(dgiot_ws_server).
-author("johnliu").
-include("dgiot_socket.hrl").
-include_lib("dgiot/include/logger.hrl").
%% API
-export([start_link/5, child_spec/3, child_spec/4, send/2]).
-export([start/0]).
%% gen_server callbacks
-export([init/5, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
start() ->
F = fun interact/2,
spawn(fun() -> start(F, 0) end).
-define(SERVER, ?MODULE).
start(F, State0) ->
{ok, Listen} = gen_tcp:listen(8000, [{packet,raw}, {reuseaddr,true}, {active, true}]),
par_connect(Listen, F, State0).
-record(state, {mod, conn_state, active_n, incoming_bytes = 0, rate_limit, limit_timer, child = #tcp{}}).
child_spec(Mod, Port, State) ->
child_spec(Mod, Port, State, []).
child_spec(Mod, Port, State, Opts) ->
Name = Mod,
ok = esockd:start(),
case dgiot_transport:get_opts(tcp, Port) of
{ok, DefActiveN, DefRateLimit, TCPOpts} ->
ActiveN = proplists:get_value(active_n, Opts, DefActiveN),
RateLimit = proplists:get_value(rate_limit, Opts, DefRateLimit),
Opts1 = lists:foldl(fun(Key, Acc) -> proplists:delete(Key, Acc) end, Opts, [active_n, rate_limit]),
NewOpts = [{active_n, ActiveN}, {rate_limit, RateLimit}] ++ Opts1,
MFArgs = {?MODULE, start_link, [Mod, NewOpts, State]},
esockd:child_spec(Name, Port, TCPOpts, MFArgs);
_ ->
[]
par_connect(Listen, F, State0) ->
case gen_tcp:accept(Listen) of
{ok, Socket} ->
spawn(fun() -> par_connect(Listen, F, State0) end),
wait(Socket, F, State0);
{error, closed} ->
{ok, Listen2} = gen_tcp:listen(8000, [{packet,raw}, {reuseaddr,true}, {active, true}]),
par_connect(Listen2, F, State0)
end.
start_link(Transport, Sock, Mod, Opts, State) ->
{ok, proc_lib:spawn_link(?MODULE, init, [Mod, Transport, Opts, Sock, State])}.
init(Mod, Transport, Opts, Sock0, State) ->
case Transport:wait(Sock0) of
{ok, Sock} ->
ChildState = #tcp{socket = Sock, register = false, transport = Transport, state = State},
case Mod:init(ChildState) of
{ok, NewChildState} ->
GState = #state{
mod = Mod,
conn_state = running,
active_n = proplists:get_value(active_n, Opts, 8),
rate_limit = rate_limit(proplists:get_value(rate_limit, Opts)),
child = NewChildState
},
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server">>, 1),
ok = activate_socket(GState),
gen_server:enter_loop(?MODULE, [], GState);
{error, Reason} ->
{stop, Reason}
end;
{error, Reason} ->
{stop, Reason}
wait(Socket, F, State0) ->
receive
{tcp, Socket, Data} ->
Key = list_to_binary(lists:last(string:tokens(hd(lists:filter(fun(S) -> lists:prefix("Sec-WebSocket-Key:", S) end, string:tokens(Data, "\r\n"))), ": "))),
Challenge = base64:encode(crypto:hash(sha, << Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)),
Handshake =
["HTTP/1.1 101 Switching Protocols\r\n",
"connection: Upgrade\r\n",
"upgrade: websocket\r\n",
"sec-websocket-accept: ", Challenge, "\r\n",
"\r\n",<<>>],
gen_tcp:send(Socket, Handshake),
send_data(Socket, "Hello, my world"),
S = self(),
Pid = spawn_link(fun() -> F(S, State0) end),
loop(Socket, Pid);
_Any ->
wait(Socket, F, State0)
end.
handle_call(Request, From, #state{mod = Mod, child = ChildState} = State) ->
case Mod:handle_call(Request, From, ChildState) of
{reply, Reply, NewChildState} ->
{reply, Reply, State#state{child = NewChildState}, hibernate};
{stop, Reason, NewChildState} ->
{stop, Reason, State#state{child = NewChildState}}
end.
handle_cast(Msg, #state{mod = Mod, child = ChildState} = State) ->
case Mod:handle_cast(Msg, ChildState) of
{noreply, NewChildState} ->
{noreply, State#state{child = NewChildState}, hibernate};
{stop, Reason, NewChildState} ->
{stop, Reason, State#state{child = NewChildState}}
end.
handle_info(activate_socket, State) ->
NewState = State#state{limit_timer = undefined, conn_state = running},
ok = activate_socket(NewState),
{noreply, NewState, hibernate};
handle_info({tcp_passive, _Sock}, State) ->
NState = ensure_rate_limit(State),
ok = activate_socket(NState),
{noreply, NState};
%% add register function
handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{register = false, buff = Buff, socket = Sock} = ChildState} = State) ->
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_recv">>, 1),
Binary = iolist_to_binary(Data),
NewBin =
case binary:referenced_byte_size(Binary) of
Large when Large > 2 * byte_size(Binary) ->
binary:copy(Binary);
_ ->
Binary
end,
write_log(ChildState#tcp.log, <<"RECV">>, NewBin),
Cnt = byte_size(NewBin),
NewChildState = ChildState#tcp{buff = <<>>},
case Mod:handle_info({tcp, <<Buff/binary, NewBin/binary>>}, NewChildState) of
{noreply, #tcp{register = true, clientid = ClientId, buff = Buff, socket = Sock} = NewChild} ->
dgiot_cm:register_channel(ClientId, self(), #{conn_mod => Mod}),
Ip = dgiot_utils:get_ip(Sock),
Port = dgiot_utils:get_port(Sock),
dgiot_cm:insert_channel_info(ClientId, #{ip => Ip, port => Port, online => dgiot_datetime:now_microsecs()}, [{tcp_recv, 1}]),
{noreply, State#state{child = NewChild, incoming_bytes = Cnt}, hibernate};
{noreply, NewChild} ->
{noreply, State#state{child = NewChild, incoming_bytes = Cnt}, hibernate};
{stop, Reason, NewChild} ->
{stop, Reason, State#state{child = NewChild}}
end;
handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{buff = Buff, socket = Sock} = ChildState} = State) ->
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_recv">>, 1),
Binary = iolist_to_binary(Data),
NewBin =
case binary:referenced_byte_size(Binary) of
Large when Large > 2 * byte_size(Binary) ->
binary:copy(Binary);
_ ->
Binary
end,
write_log(ChildState#tcp.log, <<"RECV">>, NewBin),
Cnt = byte_size(NewBin),
NewChildState = ChildState#tcp{buff = <<>>},
case NewChildState of
#tcp{clientid = CliendId, register = true} ->
dgiot_device:online(CliendId),
dgiot_tracer:check_trace(CliendId, CliendId, dgiot_utils:binary_to_hex(Binary), ?MODULE, ?LINE);
_ ->
pass
end,
case Mod:handle_info({tcp, <<Buff/binary, NewBin/binary>>}, NewChildState) of
{noreply, NewChild} ->
{noreply, State#state{child = NewChild, incoming_bytes = Cnt}, hibernate};
{stop, Reason, NewChild} ->
{stop, Reason, State#state{child = NewChild}}
end;
handle_info({shutdown, Reason}, #state{child = #tcp{clientid = CliendId, register = true} = ChildState} = State) ->
?LOG(error, "shutdown, ~p, ~p~n", [Reason, ChildState#tcp.state]),
dgiot_cm:unregister_channel(CliendId),
dgiot_device:offline(CliendId),
write_log(ChildState#tcp.log, <<"ERROR">>, list_to_binary(io_lib:format("~w", [Reason]))),
{stop, normal, State#state{child = ChildState#tcp{socket = undefined}}};
handle_info({shutdown, Reason}, #state{child = ChildState} = State) ->
?LOG(error, "shutdown, ~p, ~p~n", [Reason, ChildState#tcp.state]),
write_log(ChildState#tcp.log, <<"ERROR">>, list_to_binary(io_lib:format("~w", [Reason]))),
{stop, normal, State#state{child = ChildState#tcp{socket = undefined}}};
handle_info({tcp_error, _Sock, Reason}, #state{child = ChildState} = State) ->
?LOG(error, "tcp_error, ~p, ~p~n", [Reason, ChildState#tcp.state]),
write_log(ChildState#tcp.log, <<"ERROR">>, list_to_binary(io_lib:format("~w", [Reason]))),
{stop, {shutdown, Reason}, State};
handle_info({tcp_closed, Sock}, #state{mod = Mod, child = #tcp{socket = Sock} = ChildState} = State) ->
write_log(ChildState#tcp.log, <<"ERROR">>, <<"tcp_closed">>),
?LOG(error, "tcp_closed ~p", [ChildState#tcp.state]),
case Mod:handle_info(tcp_closed, ChildState) of
{noreply, NewChild} ->
{stop, normal, State#state{child = NewChild#tcp{socket = undefined}}};
{stop, _Reason, NewChild} ->
{stop, normal, State#state{child = NewChild#tcp{socket = undefined}}}
end;
handle_info(Info, #state{mod = Mod, child = ChildState} = State) ->
case Mod:handle_info(Info, ChildState) of
{noreply, NewChildState} ->
{noreply, State#state{child = NewChildState}, hibernate};
{stop, Reason, NewChildState} ->
{stop, Reason, State#state{child = NewChildState}}
end.
terminate(Reason, #state{mod = Mod, child = #tcp{clientid = CliendId, register = true} = ChildState}) ->
dgiot_cm:unregister_channel(CliendId),
dgiot_metrics:dec(dgiot_bridge, <<"tcp_server">>, 1),
Mod:terminate(Reason, ChildState);
terminate(Reason, #state{mod = Mod, child = ChildState}) ->
dgiot_metrics:dec(dgiot_bridge, <<"tcp_server">>, 1),
Mod:terminate(Reason, ChildState).
code_change(OldVsn, #state{mod = Mod, child = ChildState} = State, Extra) ->
{ok, NewChildState} = Mod:code_change(OldVsn, ChildState, Extra),
{ok, State#state{child = NewChildState}}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
send(#tcp{clientid = CliendId, register = true, transport = Transport, socket = Socket}, Payload) ->
dgiot_tracer:check_trace(CliendId, CliendId, dgiot_utils:binary_to_hex(Payload), ?MODULE, ?LINE),
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_send">>, 1),
case Socket == undefined of
loop(Socket, Pid) ->
receive
{tcp, Socket, Data} ->
Text = websocket_data(Data),
case Text =/= <<>> of
true ->
{error, disconnected};
Pid ! {browser, self(), ["You said: ", Text]};
false ->
Transport:send(Socket, Payload)
end;
send(#tcp{transport = Transport, socket = Socket}, Payload) ->
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_send">>, 1),
case Socket == undefined of
true ->
{error, disconnected};
false ->
Transport:send(Socket, Payload)
end.
rate_limit({Rate, Burst}) ->
esockd_rate_limit:new(Rate, Burst).
activate_socket(#state{conn_state = blocked}) ->
ok;
activate_socket(#state{child = #tcp{transport = Transport, socket = Socket}, active_n = N}) ->
TrueOrN =
case Transport:is_ssl(Socket) of
true -> true; %% Cannot set '{active, N}' for SSL:(
false -> N
end,
case Transport:setopts(Socket, [{active, TrueOrN}]) of
ok -> ok;
{error, Reason} ->
self() ! {shutdown, Reason},
ok
end.
ensure_rate_limit(State) ->
case esockd_rate_limit:check(State#state.incoming_bytes, State#state.rate_limit) of
{0, RateLimit} ->
State#state{incoming_bytes = 0, rate_limit = RateLimit};
{Pause, RateLimit} ->
%?LOG(info,"[~p] ensure_rate_limit :~p", [Pause, ensure_rate_limit]),
TRef = erlang:send_after(Pause, self(), activate_socket),
State#state{conn_state = blocked, incoming_bytes = 0, rate_limit = RateLimit, limit_timer = TRef}
end.
write_log(file, Type, Buff) ->
[Pid] = io_lib:format("~p", [self()]),
Date = dgiot_datetime:format("YYYY-MM-DD"),
Path = <<"log/tcp_server/", Date/binary, ".txt">>,
filelib:ensure_dir(Path),
Time = dgiot_datetime:format("HH:NN:SS " ++ Pid),
Data = case Type of
<<"ERROR">> -> Buff;
_ -> <<<<Y>> || <<X:4>> <= Buff, Y <- integer_to_list(X, 16)>>
end,
file:write_file(Path, <<Time/binary, " ", Type/binary, " ", Data/binary, "\r\n">>, [append]),
loop(Socket, Pid);
{tcp_closed, Socket} ->
ok;
write_log({Mod, Fun}, Type, Buff) ->
catch apply(Mod, Fun, [Type, Buff]);
write_log(Fun, Type, Buff) when is_function(Fun) ->
catch Fun(Type, Buff);
write_log(_, _, _) ->
ok.
{send, Data} ->
send_data(Socket, Data),
loop(Socket, Pid);
_Any ->
loop(Socket, Pid)
end.
interact(Browser, State) ->
receive
{browser, Browser, Str} ->
Browser ! {send, Str},
interact(Browser, State)
after 1000 ->
Browser ! {send, "clock ! tick " ++ integer_to_list(State)},
interact(Browser, State+1)
end.
%% 125
websocket_data(Data) when is_list(Data) ->
websocket_data(list_to_binary(Data));
websocket_data(<< 1:1, 0:3, 1:4, 1:1, Len:7, MaskKey:32, Rest/bits >>) when Len < 126 ->
<<End:Len/binary, _/bits>> = Rest,
Text = websocket_unmask(End, MaskKey, <<>>),
Text;
websocket_data(_) ->
<<>>.
%% Browser发过来的数据都是mask的,unmask
websocket_unmask(<<>>, _, Unmasked) ->
Unmasked;
websocket_unmask(<< O:32, Rest/bits >>, MaskKey, Acc) ->
T = O bxor MaskKey,
websocket_unmask(Rest, MaskKey, << Acc/binary, T:32 >>);
websocket_unmask(<< O:24 >>, MaskKey, Acc) ->
<< MaskKey2:24, _:8 >> = << MaskKey:32 >>,
T = O bxor MaskKey2,
<< Acc/binary, T:24 >>;
websocket_unmask(<< O:16 >>, MaskKey, Acc) ->
<< MaskKey2:16, _:16 >> = << MaskKey:32 >>,
T = O bxor MaskKey2,
<< Acc/binary, T:16 >>;
websocket_unmask(<< O:8 >>, MaskKey, Acc) ->
<< MaskKey2:8, _:24 >> = << MaskKey:32 >>,
T = O bxor MaskKey2,
<< Acc/binary, T:8 >>.
%% Client
send_data(Socket, Payload) ->
Len = iolist_size(Payload),
BinLen = payload_length_to_binary(Len),
gen_tcp:send(Socket, [<< 1:1, 0:3, 1:4, 0:1, BinLen/bits >>, Payload]).
payload_length_to_binary(N) ->
case N of
N when N =< 125 -> << N:7 >>;
N when N =< 16#ffff -> << 126:7, N:16 >>;
N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >>
end.

View File

@ -207,7 +207,7 @@ init(Req0, install) ->
init(Req0, mod) ->
Mod = dgiot_req:binding(<<"Mod">>, Req0),
Fun = dgiot_req:binding(<<"Fun">>, Req0),
?LOG(error,"Mod ~p Fun ~p",[Mod,Fun]),
%% ?LOG(error,"Mod ~p Fun ~p",[Mod,Fun]),
Req =
case catch apply(list_to_atom(binary_to_list(Mod)), list_to_atom(binary_to_list(Fun)), [Req0]) of
{Err, Reason} when Err == 'EXIT'; Err == error ->

View File

@ -1,76 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc dgiot_bamis Protocol
-module(dgiot_bamis).
-include("dgiot_bamis.hrl").
-include_lib("dgiot/include/logger.hrl").
-include_lib("dgiot/include/dgiot_mnesia.hrl").
-export([
create_amis/3,
put_amis_device/3,
del_amis_device/1,
created_amis_device/3
]).
-define(APP, ?MODULE).
del_amis_device(DeviceId) ->
dgiot_mnesia:delete(DeviceId).
%%
put_amis_device(put_amis_device, #{<<"objectId">> := Deviceid} = Body, SessionToken) ->
case dgiot_parse:get_object(<<"Device">>, Deviceid,
[{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"data">> := OldRole}} ->
dgiot_parse:update_object(<<"Device">>, Deviceid, #{
<<"data">> => maps:without([
<<"parent">>,
<<"createdAt">>,
<<"updatedAt">>], maps:merge(OldRole, Body))},
[{"X-Parse-Session-Token", SessionToken}], [{from, rest}]);
Error -> Error
end.
%%
created_amis_device(DtuAddr, ChannelId, DTUIP) ->
{ProductId, Acl, _Properties} = dgiot_data:get({amis, ChannelId}),
Requests = #{
<<"devaddr">> => DtuAddr,
<<"name">> => <<"AMIS_", DtuAddr/binary>>,
<<"ip">> => DTUIP,
<<"isEnable">> => true,
<<"product">> => ProductId,
<<"ACL">> => Acl,
<<"status">> => <<"ONLINE">>,
<<"brand">> => <<"AMIS", DtuAddr/binary>>,
<<"devModel">> => <<"AMIS">>
},
dgiot_device:create_device(Requests).
%%
create_amis(DtuAddr, ChannelId, DTUIP) ->
{ProductId, Acl, _Properties} = dgiot_data:get({amis, ChannelId}),
Requests = #{
<<"devaddr">> => DtuAddr,
<<"name">> => <<"AMIS_", DtuAddr/binary>>,
<<"ip">> => DTUIP,
<<"isEnable">> => true,
<<"product">> => ProductId,
<<"ACL">> => Acl,
<<"status">> => <<"ONLINE">>,
<<"brand">> => <<"AMIS", DtuAddr/binary>>,
<<"devModel">> => <<"AMIS">>
},
dgiot_device:create_device(Requests).

View File

@ -1,153 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_bamis_handler).
-author("johnliu").
-behavior(dgiot_rest).
-dgiot_rest(all).
-include_lib("dgiot/include/logger.hrl").
-dgiot_swagger(amis).
%% API
-export([swagger_amis/0]).
-export([handle/4]).
%% API描述
%%
%% :
%% 1. Metadata为map表示的JSON,
%% dgiot_http_server:bind(<<"/amis">>, ?MODULE, [], Metadata)
%% 2. priv/swagger/
%% dgiot_http_server:bind(<<"/swagger_amis.json">>, ?MODULE, [], priv)
swagger_amis() ->
[
dgiot_http_server:bind(<<"/swagger_amis.json">>, ?MODULE, [], priv)
].
%%%===================================================================
%%%
%%% , Context <<"user">>, version
%%%===================================================================
-spec handle(OperationID :: atom(), Args :: map(), Context :: map(), Req :: dgiot_req:req()) ->
{Status :: dgiot_req:http_status(), Body :: map()} |
{Status :: dgiot_req:http_status(), Headers :: map(), Body :: map()} |
{Status :: dgiot_req:http_status(), Headers :: map(), Body :: map(), Req :: dgiot_req:req()}.
handle(OperationID, Args, Context, Req) ->
Headers = #{},
case catch do_request(OperationID, Args, Context, Req) of
{ErrType, Reason} when ErrType == 'EXIT'; ErrType == error ->
Err = case is_binary(Reason) of
true -> Reason;
false -> dgiot_utils:format("~p", [Reason])
end,
{500, Headers, #{<<"error">> => Err}};
ok ->
?LOG(debug,"do request: ~p, ~p ->ok ~n", [OperationID, Args]),
{200, Headers, #{}, Req};
{ok, Res} ->
?LOG(debug,"do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
{200, Headers, Res, Req};
{Status, Res} ->
?LOG(debug,"do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
{Status, Headers, Res, Req};
{Status, NewHeaders, Res} ->
?LOG(info,"do request2: ~p, ~p ->~p~n", [OperationID, Args, Res]),
{Status, maps:merge(Headers, NewHeaders), Res, Req}
end.
%%%===================================================================
%%% Version:API版本
%%%===================================================================
%% iot_hub : api资源 :
%% OperationId:get_amis
%% :POST /iotapi/get_amis
do_request(get_amis, #{<<"deviceid">> := Deviceid}, _Context, _Req) ->
case dgiot_parse:get_object(<<"Device">>, Deviceid) of
{ok, #{<<"profile">> := Profile1}} ->
{ok, Profile1};
_ ->
{ok, #{}}
end;
%% iot_hub : amis设备
%% OperationId:get_amis
%% :POST /iotapi/get_amis_device
do_request(get_amis_device, #{<<"deviceid">> := Deviceid}, _Context, _Req) ->
case dgiot_parse:get_object(<<"Device">>, Deviceid) of
{ok, info} ->
{ok, info};
_ ->
{ok, #{}}
end;
%% Role模版 : amis设备
%% OperationId:put_amis_device
%% :POST /iotapi/put_amis_device
do_request(put_amis_device, #{<<"objectId">> := Deviceid} = Body, #{<<"sessionToken">> := SessionToken} = _Context, _Req0) ->
io:format("Body ~p~n", [Body]),
case Deviceid of
<<"Klht7ERlYn">> ->
{ok, #{<<"code">> => 401, <<"result">> => <<"dgiot_admin Cannot be Resignation">>}};
<<"lerYRy2jsh">> ->
{ok, #{<<"code">> => 401, <<"result">> => <<"dgiot_admin Cannot be Resignation">>}};
_ ->
dgiot_bamis_handler:put_amis_device(Body, SessionToken)
end;
%% iot_hub : amis设备
%% OperationId:del_amis_device
%% :POST /iotapi/del_amis_device
do_request(del_amis_device, #{<<"deviceid">> := Deviceid}, _Context, _Req) ->
case Deviceid of
<<"Klht7ERlYn">> ->
{ok, #{<<"code">> => 401, <<"result">> => <<"dgiot_admin Cannot be Resignation">>}};
<<"lerY给i他gitgit'x'f'b'xRy2jsh">> ->
{ok, #{<<"code">> => 401, <<"result">> => <<"dgiot_admin Cannot be Resignation">>}};
_ ->
dgiot_bamis_handler:del_amis_device(Deviceid)
end;
%% iot_hub : amis设备
%% OperationId:created_amis_device
%% :POST /iotapi/created_amis_device
do_request(created_amis_device, #{<<"deviceid">> := Deviceid}, #{<<"ChannelId">> := ChannelId},#{<<"DTUIP">> := DTUIP} = Body) ->
case Deviceid of
<<"test">> ->
{ok, #{<<"code">> => test, <<"result">> => <<"you input test">>}};
_ ->
{ok, dgiot_bamis_handler:created_amis_device(Deviceid,ChannelId,DTUIP)}
end;
%% API接口
do_request(_OperationId, _Args, _Context, _Req) ->
?LOG(info,"_OperationId:~p~n", [_OperationId]),
{error, <<"Not Allowed.">>}.

View File

@ -47,230 +47,6 @@
"AMIS"
]
}
},
"/amis": {
"get": {
"description": "amis测试",
"parameters": [
{
"name": "deviceid",
"in": "query",
"type": "string",
"default": "b1002a0317",
"description": "设备Id"
}
],
"responses": {
"200": {
"description": "Returns operation status"
},
"400": {
"description": "Bad Request"
},
"403": {
"description": "Forbidden"
},
"500": {
"description": "Server Internal error"
}
},
"summary": "amis测试",
"tags": [
"AMIS"
]
}
},
"/amis_device": {
"get": {
"description": "查询amis设备",
"parameters": [
{
"name": "deviceid",
"in": "query",
"type": "string",
"default": "b1002a0317",
"description": "设备Id"
}
],
"responses": {
"200": {
"description": "Returns operation status"
},
"400": {
"description": "Bad Request"
},
"403": {
"description": "Forbidden"
},
"500": {
"description": "Server Internal error"
}
},
"summary": "查询amis设备",
"tags": [
"AMIS"
]
},
"put": {
"description": "修改amis设备",
"parameters": [
{
"in": "body",
"name": "user",
"required": true,
"schema": {
"type": "object",
"properties": {
"deviceid": {
"type": "string",
"description": "设备Id",
"required": true
},
"name": {
"required": true,
"type": "string",
"description": "设备名称"
},
"devaddr": {
"required": true,
"type": "string",
"description": "设备地址"
}
}
}
}
],
"responses": {
"200": {
"description": "Returns operation status"
},
"400": {
"description": "Bad Request"
},
"403": {
"description": "Forbidden"
},
"500": {
"description": "Server Internal error"
}
},
"summary": "修改amis设备",
"tags": [
"AMIS"
]
},
"delete": {
"description": "删除amis设备",
"parameters": [
{
"name": "deviceid",
"in": "query",
"type": "string",
"default": "b1002a0317",
"description": "设备Id"
}
],
"responses": {
"200": {
"description": "Returns operation status"
},
"400": {
"description": "Bad Request"
},
"403": {
"description": "Forbidden"
},
"500": {
"description": "Server Internal error"
}
},
"summary": "删除amis设备",
"tags": [
"AMIS"
]
},
"post": {
"description": "创建amis设备",
"parameters": [
{
"in": "body",
"name": "user",
"required": true,
"schema": {
"type": "object",
"properties": {
"deviceid": {
"type": "string",
"description": "设备Id",
"required": true
},
"ChannelId": {
"required": true,
"type": "string",
"description": "设备通道Id"
},
"DTUIP": {
"required": true,
"type": "string",
"description": "设备DTUIP"
}
}
}
}
],
"responses": {
"200": {
"description": "Returns operation status"
},
"400": {
"description": "Bad Request"
},
"403": {
"description": "Forbidden"
},
"500": {
"description": "Server Internal error"
}
},
"summary": "创建amis设备",
"tags": [
"AMIS"
]
}
},
"/update_product": {
"post": {
"security": [],
"description": "更新设备数据",
"parameters": [
{
"in": "body",
"name": "data",
"schema": {
"type": "object",
"properties": {}
}
}
],
"responses": {
"200": {
"description": "Returns operation status"
},
"400": {
"description": "Bad Request"
},
"403": {
"description": "Forbidden"
},
"500": {
"description": "Server Internal error"
}
},
"summary": "更新设备数据",
"tags": [
"AMIS"
]
}
}
}
}

View File

@ -24,11 +24,7 @@
get/1,
post/1,
put/1,
delete/1,
create_amis/3,
put_amis_device/2,
del_amis_device/1,
created_amis_device/3
delete/1
]).
-define(APP, ?MODULE).
@ -86,54 +82,3 @@ delete({'after', Data}) ->
<<"msg">> => <<"删除成功"/utf8>>,
<<"data">> => Data
}.
del_amis_device(DeviceId) ->
dgiot_device:delete(DeviceId).
%%
put_amis_device(#{<<"objectId">> := Deviceid} = Body, SessionToken) ->
case dgiot_parse:get_object(<<"Device">>, Deviceid,
[{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"data">> := OldRole}} ->
dgiot_parse:update_object(<<"Device">>, Deviceid, #{
<<"data">> => maps:without([
<<"parent">>,
<<"createdAt">>,
<<"updatedAt">>,
<<"ACL">>,
<<"objectId">>
], maps:merge(OldRole, Body))},
[{"X-Parse-Session-Token", SessionToken}], [{from, rest}]);
Error -> Error
end.
%%
created_amis_device(DtuAddr, ChannelId, DTUIP) ->
{ProductId, Acl, _Properties} = dgiot_data:get({amis, ChannelId}),
Requests = #{
<<"devaddr">> => DtuAddr,
<<"name">> => <<"AMIS_", DtuAddr/binary>>,
<<"ip">> => DTUIP,
<<"isEnable">> => true,
<<"product">> => ProductId,
<<"ACL">> => Acl,
<<"status">> => <<"ONLINE">>,
<<"brand">> => <<"AMIS", DtuAddr/binary>>,
<<"devModel">> => <<"AMIS">>
},
dgiot_device:create_device(Requests).
%%
create_amis(DtuAddr, ChannelId, DTUIP) ->
{ProductId, Acl, _Properties} = dgiot_data:get({amis, ChannelId}),
Requests = #{
<<"devaddr">> => DtuAddr,
<<"name">> => <<"AMIS_", DtuAddr/binary>>,
<<"ip">> => DTUIP,
<<"isEnable">> => true,
<<"product">> => ProductId,
<<"ACL">> => Acl,
<<"status">> => <<"ONLINE">>,
<<"brand">> => <<"AMIS", DtuAddr/binary>>,
<<"devModel">> => <<"AMIS">>
},
dgiot_device:create_device(Requests).

View File

@ -109,10 +109,8 @@ handle_event(_EventId, _Event, State) ->
% SELECT clientid, payload, topic FROM "meter"
% SELECT clientid, disconnected_at FROM "$events/client_disconnected" WHERE username = 'dgiot'
% SELECT clientid, connected_at FROM "$events/client_connected" WHERE username = 'dgiot'
handle_message({rule, #{clientid := DtuAddr, connected_at := _ConnectedAt}, #{peername := PeerName} = _Context}, #state{id = ChannelId} = State) ->
handle_message({rule, #{clientid := DtuAddr, connected_at := _ConnectedAt}, #{peername := PeerName} = _Context}, #state{id = _ChannelId} = State) ->
?LOG(error,"DtuAddr ~p PeerName ~p",[DtuAddr,PeerName] ),
DTUIP = dgiot_utils:get_ip(PeerName),
dgiot_bamis:create_amis(DtuAddr, ChannelId, DTUIP),
{ok, State};
handle_message({rule, #{clientid := DevAddr, disconnected_at := _DisconnectedAt}, _Context}, State) ->

View File

@ -97,103 +97,6 @@ do_request(post_dashboard, Arg, Context, _Req) ->
Data = dgiot_dashboard:post_dashboard(Arg, Context),
{200, Data};
%% iot_hub : api资源 :
%% OperationId:get_amis
%% :POST /iotapi/get_amis
do_request(get_amis, #{<<"deviceid">> := Deviceid}, _Context, _Req) ->
case dgiot_parse:get_object(<<"Device">>, Deviceid) of
{ok, #{<<"profile">> := Profile}} ->
{ok, Profile};
_ ->
{error, #{}}
end;
%% iot_hub : amis设备
%% OperationId:get_amis
%% :POST /iotapi/get_amis_device
do_request(get_amis_device, #{<<"deviceid">> := Deviceid}, _Context, _Req) ->
case dgiot_parse:get_object(<<"Device">>, Deviceid) of
{ok, Info} ->
{ok, #{<<"status">> => 0, <<"msg">> => <<"success">>, <<"data">> => Info}};
_ ->
{error, #{<<"status">> => 404, <<"msg">> => <<"error">>, <<"result">> => <<"deviec info null">>}}
end;
%% Role模版 : amis设备
%% OperationId:put_amis_device
%% :POST /iotapi/put_amis_device
do_request(put_amis_device, Body, #{<<"sessionToken">> := SessionToken}, _Req0) ->
%% io:format("Body ~p~n", [Body]),
case dgiot_bamis:put_amis_device(Body, SessionToken) of
{ok, Info} ->
{ok, Info};
_ ->
{error, #{<<"code">> => 404, <<"result">> => <<"device info null">>}}
end;
%% iot_hub : amis设备
%% OperationId:del_amis_device
%% :POST /iotapi/del_amis_device
do_request(delete_amis_device, #{<<"deviceid">> := DeviceId} = _Body, #{<<"sessionToken">> := SessionToken} = _Context, _Req) ->
case dgiot_parse:del_object(<<"Device">>, DeviceId, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
ok ->
{ok, #{
<<"status">> => 200,
<<"msg">> => "delete success"
}};
_ ->
{ok, #{<<"status">> => 200, <<"msg">> => "delete error"}}
end;
%% iot_hub : amis设备
%% OperationId:created_amis_device
%% :POST /iotapi/created_amis_device
do_request(post_amis_device, #{<<"deviceid">> := Deviceid, <<"ChannelId">> := ChannelId, <<"DTUIP">> := DTUIP} = _Body, _Context, _Req) ->
case dgiot_bamis:created_amis_device(Deviceid, ChannelId, DTUIP) of
{ok, Info} ->
{ok, Info};
_ ->
{error, #{<<"code">> => 404, <<"result">> => <<"device info null">>}}
end;
do_request(post_update_product, _Body, _Context, _Req) ->
case dgiot_parse:query_object(<<"Product">>, #{<<"where">> => #{}}) of
{ok, #{<<"results">> := Products}} ->
io:format("~s ~p Products = ~p.~n", [?FILE, ?LINE, Products]),
lists:foldl(fun(Product, _Acc) ->
case Product of
#{<<"objectId">> := ProductId, <<"thing">> := #{<<"properties">> := Properties} = _Thing} ->
NewProperties =
lists:foldl(fun(X, Acc) ->
case X of
#{<<"dataForm">> := #{<<"protocol">> := <<"modbus">>, <<"data">> := Data, <<"address">> := Address,
<<"slaveid">> := Slaveid, <<"operatetype">> := Operatetype, <<"originaltype">> := Originaltype} = DataForm} ->
Acc ++ [X#{
<<"dataForm">> => maps:without([<<"address">>, <<"data">>, <<"slaveid">>, <<"operatetype">>, <<"originaltype">>],
DataForm#{<<"protocol">> => <<"MODBUSRTU">>}),
<<"dataSource">> => #{
<<"data">> => Data,
<<"address">> => Address,
<<"slaveid">> => Slaveid,
<<"_dlinkindex">> => 0,
<<"operatetype">> => Operatetype,
<<"originaltype">> => Originaltype,
<<"registersnumber">> => 0
}
}];
_ ->
Acc ++ [X]
end
end, [], Properties),
dgiot_parse:update_object(<<"Product">>, ProductId, #{<<"thing">> => #{<<"properties">> => NewProperties}});
_ ->
pass
end
end, [], Products);
_Error ->
{error, #{<<"code">> => 404, <<"result">> => <<"device info null">>}}
end;
%% API接口
do_request(_OperationId, _Args, _Context, _Req) ->
?LOG(info, "_OperationId:~p~n", [_OperationId]),

View File

@ -51,18 +51,6 @@
zh => <<"侦听端口"/utf8>>
}
},
<<"path">> => #{
order => 2,
type => string,
required => true,
default => <<"/test">>,
title => #{
zh => <<"路径"/utf8>>
},
description => #{
zh => <<"路径"/utf8>>
}
},
<<"ico">> => #{
order => 102,
type => string,

View File

@ -19,11 +19,10 @@
-include_lib("dgiot/include/logger.hrl").
-author("johnliu").
-record(state, {id, env}).
-record(state, {id, env, type = stream}).
%% API
-export([childSpec/3,
init/2]).
-export([childSpec/3]).
childSpec(Name, ChannelId, #{<<"port">> := Port} = ChannelArgs) ->
State = #state{
@ -42,10 +41,14 @@ childSpec(Name, ChannelId, #{<<"port">> := Port} = ChannelArgs) ->
SslOpts = [_ | _] ->
{ranch_ssl, Opts ++ SslOpts}
end,
Route = get_route(maps:get(<<"path">>, ChannelArgs, <<>>)),
Dispatch = cowboy_router:compile([
{'_', [
{Route, ?MODULE, State}
{"/", cowboy_static, {priv_file, dgiot_bridge, "www/index.html"}},
{"/websocket/[...]", dgiot_ws_h, State},
{"/static/[...]", cowboy_static, {priv_dir, dgiot_bridge, "www/static"}},
{"/api/[...]", dgiot_rest_h, State},
{"/http2ws/[...]", dgiot_http2ws_h, State},
{"/[...]", dgiot_http2ws_h, State}
]}
]),
CowboyOpts = #{
@ -54,25 +57,3 @@ childSpec(Name, ChannelId, #{<<"port">> := Port} = ChannelArgs) ->
}
},
ranch:child_spec(Name, 300, Transport, TransportOpts, cowboy_clear, CowboyOpts).
%% ====== http callback ======
init(Req, #state{ id = ChannelId, env = Env} = State) ->
{ok, _Type, ProductIds} = dgiot_bridge:get_products(ChannelId),
case dgiot_bridge:apply_channel(ChannelId, ProductIds, handle_info, [{http, Req}], Env) of
{ok, NewEnv} ->
Req1 = cowboy_req:reply(200, #{}, <<"hello word">>, Req),
{ok, Req1, State#state{env = NewEnv}};
{reply, _ProductId, {HTTPCode, Reply}, NewEnv} ->
Req1 = cowboy_req:reply(HTTPCode, #{}, Reply, Req),
{ok, Req1, State#state{env = NewEnv}};
{reply, _ProductId, {HTTPCode, Header, Reply}, NewEnv} ->
Req1 = cowboy_req:reply(HTTPCode, Header, Reply, Req),
{ok, Req1, State#state{env = NewEnv}}
end.
get_route(<<"http://", Path>>) ->
get_route(Path);
get_route(Path) when is_binary(Path) ->
binary_to_list(Path);
get_route(_) ->
"/[...]".

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_ffmpeg_h).
-module(dgiot_http2ws_h).
-include_lib("dgiot/include/dgiot_socket.hrl").
-include_lib("dgiot/include/logger.hrl").
@ -28,12 +28,15 @@ init(Req0, Opts) ->
{cowboy_loop, Req, Opts}.
read_body_to_websocket(Req0) ->
Key = cowboy_req:path(Req0),
case cowboy_req:read_body(Req0) of
{ok, _Data, Req} ->
{ok, Data, Req} ->
dgiot_ws_h:run_hook(Key, Data),
%% io:format("~s", [Data]),
Req;
{more, _Data, Req} ->
{more, Data, Req} ->
%% io:format("~s", [Data]),
dgiot_ws_h:run_hook(Key, Data),
read_body_to_websocket(Req)
end.

View File

@ -0,0 +1,145 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_ws_h).
-include_lib("dgiot/include/dgiot_socket.hrl").
-include_lib("dgiot/include/logger.hrl").
-export([init/2]).
-export([websocket_init/1]).
-export([websocket_handle/2]).
-export([websocket_info/2]).
-export([terminate/3]).
-export([run_hook/2]).
%%GET /websocket/test HTTP/1.1
%%Host: 127.0.0.1:9082
%%Connection: Upgrade
%%Pragma: no-cache
%%Cache-Control: no-cache
%%User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36
%%Upgrade: websocket
%%Origin: http://127.0.0.1:3080
%%Sec-WebSocket-Version: 13
%%Accept-Encoding: gzip, deflate, br
%%Accept-Language: zh-CN,zh;q=0.9
%%Sec-WebSocket-Key: D7JD3d7II0KEJKvb4qCXcQ==
%%Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
%%Sec-WebSocket-Protocol: null
init(Req0, Opts) ->
case cowboy_websocket:is_upgrade_request(Req0) of
true ->
io:format("~s ~p ~p ~n", [?FILE, ?LINE, <<"1">>]),
cowboy_req:headers(Req0),
cowboy_websocket:upgrade(Req0, Opts, cowboy_websocket, #{});
_ ->
io:format("~s ~p ~p ~n", [?FILE, ?LINE, <<"2">>]),
{cowboy_websocket, Req0, Opts}
end.
websocket_init([Req, Opts]) ->
Path = cowboy_req:path(Req),
add_hook(Path),
erlang:start_timer(1000, self(), <<"Hello!">>),
{[], Opts}.
websocket_handle({text, Msg}, State) ->
{[{text, <<"That's what she said! ", Msg/binary>>}], State};
websocket_handle(_Data, State) ->
{[], State}.
websocket_info({timeout, _Ref, Msg}, State) ->
%% erlang:start_timer(1000, self(), <<"How' you doin'?">>),
{[{text, Msg}], State};
websocket_info({http2ws, Data}, State) ->
io:format("~s ~p ~p ~n", [?FILE, ?LINE, byte_size(Data)]),
{[{binary, Data}], State};
websocket_info(_Info, State) ->
{[], State}.
add_hook(Path) ->
case re:split(Path, <<"/">>) of
[<<>>, <<"websocket">>, Rest] ->
Key = <<"/", Rest/binary>>,
case dgiot_data:get({http2ws, Key}) of
not_find ->
dgiot_data:insert({http2ws, Key}, [self()]);
Pids ->
NewPids =
lists:foldl(fun(Pid, Acc) ->
case is_process_alive(Pid) of
true ->
Acc ++ [Pid];
_ ->
Acc
end
end, [self()], Pids),
dgiot_data:insert({http2ws, Key}, dgiot_utils:unique_1(NewPids))
end;
_ ->
pass
end.
terminate(_Reason, _Req, _UnExpectedState) ->
ok.
run_hook(Key, Data) ->
case dgiot_data:get({http2ws, Key}) of
not_find ->
pass;
Pids ->
lists:map(fun(Pid) ->
case is_process_alive(Pid) of
true ->
Pid ! {http2ws, Data};
_ ->
pass
end
end, Pids)
end.
%%check_update(Req0, Opts) ->
%% case cowboy_websocket:is_upgrade_request(Req0) of
%% true ->
%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, <<"1">>]),
%% cowboy_req:headers(Req0),
%% cowboy_websocket:upgrade(Req0, Opts, cowboy_websocket, #{});
%% _ ->
%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, <<"2">>]),
%% {cowboy_websocket, Req0, Opts}
%% end.
%%handshake(Data) ->
%%Key = list_to_binary(lists:last(string:tokens(hd(lists:filter(fun(S) -> lists:prefix("Sec-WebSocket-Key:", S) end, string:tokens(Data, "\r\n"))), ": "))),
%%Challenge = base64:encode(crypto:sha(<< Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)),
%%["HTTP/1.1 101 Switching Protocols\r\n",
%%"connection: Upgrade\r\n",
%%"upgrade: websocket\r\n",
%%"sec-websocket-accept: ", Challenge, "\r\n",
%%"\r\n",<<>>].
%%
%%http11_keepalive(Config) ->
%% {ok, ConnPid} = gun:open("localhost", config(port, Config), #{
%% ws_opts => #{
%% keepalive => 100,
%% silence_pings => false
%% }
%% }),
%% {ok, _} = gun:await_up(ConnPid),
%% StreamRef = gun:ws_upgrade(ConnPid, "/", []),
%% {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
%% %% Gun sent a ping automatically, we therefore receive a pong.
%% {ws, pong} = gun:await(ConnPid, StreamRef),
%% gun:close(ConnPid).

View File

@ -24,53 +24,6 @@ docroot() ->
Root = dgiot_httpc:url_join([Dir, "/priv/"]),
Root ++ "www".
%%{ok,#{<<"results">> =>
%%[#{<<"battery_voltage">> => 11.7,<<"charge_current">> => 0,
%%<<"core_temperature">> => 37,
%%<<"createdat">> => <<"2021-06-07 18:49:42.061">>,
%%<<"day_electricity">> => 0.11,<<"dump_energy">> => 75.0,
%%<<"i_out">> => 0.0,<<"outside_temperature">> => 25,
%%<<"system_state">> => <<"0">>,<<"total_power">> => 2.1,
%%<<"v_out">> => 0.0,<<"v_solarpanel">> => 0.3}]}}
%%get_topo(Arg, _Context) ->
%% #{<<"productid">> := ProductId, <<"devaddr">> := Devaddr} = Arg,
%% Type = maps:get(<<"type">>, Arg, <<"web">>),
%% case dgiot_parse:get_object(<<"Product">>, ProductId) of
%% {ok, #{<<"config">> := #{<<"konva">> := #{<<"Stage">> := #{<<"children">> := Children} = Stage} = Konva}}} when length(Children) > 0 ->
%% case Devaddr of
%% undefined ->
%% NewChildren1 = get_children(Type, ProductId, Children, ProductId, <<"KonvatId">>, <<"Shapeid">>, <<"Identifier">>, <<"Name">>),
%% List = get_wechat(),
%% case Type of
%% <<"wechat">> ->
%% {ok, #{<<"code">> => 200, <<"message">> => <<"SUCCESS">>, <<"data">> => List}};
%% _ ->
%% {ok, #{<<"code">> => 200, <<"message">> => <<"SUCCESS">>, <<"data">> => Konva#{<<"Stage">> => Stage#{<<"children">> => NewChildren1}}}}
%% end;
%% _ ->
%% DeviceId = dgiot_parse_id:get_deviceid(ProductId, Devaddr),
%% case dgiot_tdengine:get_device(ProductId, Devaddr, #{<<"keys">> => <<"last_row(*)">>, <<"limit">> => 1}) of
%% {ok, #{<<"results">> := [Result | _]}} ->
%% put({self(), td}, Result);
%% _ ->
%% put({self(), td}, #{})
%% end,
%% NewChildren1 = get_children(Type, ProductId, Children, DeviceId, <<"KonvatId">>, <<"Shapeid">>, <<"Identifier">>, <<"Name">>),
%% List = get_wechat(),
%% case Type of
%% <<"wechat">> ->
%% {ok, #{<<"code">> => 200, <<"message">> => <<"SUCCESS">>, <<"data">> => List}};
%% _ ->
%% {ok, #{<<"code">> => 200, <<"message">> => <<"SUCCESS">>, <<"data">> => Konva#{<<"Stage">> => Stage#{<<"children">> => NewChildren1}}}}
%% end
%% end;
%% _ ->
%% {ok, #{<<"code">> => 204, <<"message">> => <<"没有组态"/utf8>>}}
%% end.
get_topo(Arg, _Context) ->
#{<<"productid">> := ProductId, <<"devaddr">> := Devaddr, <<"viewid">> := ViewId} = Arg,
Type = maps:get(<<"type">>, Arg, <<"web">>),