feat: dgiot_serial_client

This commit is contained in:
dawnwinterLiu 2022-12-02 15:02:37 +08:00
parent a32eb9fea1
commit d7bd75eec1
4 changed files with 260 additions and 5 deletions

View File

@ -0,0 +1,251 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_serial_client).
-behaviour(gen_server).
-define(SERIAL, ets).
-export([
init/0,
open/1, open/2,
close/1,
getfd/1,
read/2,
write/2,
send/2,
controlling_process/2
]).
-export([start_link/2]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-record(state, {
% Original termios attributes
oattr,
port,
% PID of controlling process
pid,
% serial dev file descriptor
fd,
% device name
dev,
speed,
% interval (Unit: millisecond)
interval = 50 :: integer(), %% b2400
%% Timestamp (Unit: millisecond)
timestamp = 0 :: integer(),
%% Message from
data = <<>> :: binary(),
%%
package_recv_count = 0 :: integer(),
%%
package_send_count = 0 :: integer()
}).
init() ->
dgiot_data:init(?SERIAL).
%%--------------------------------------------------------------------
%%% API
%%--------------------------------------------------------------------
open(Dev) ->
open(Dev, []).
open(Dev, Opt) ->
start_link(Dev, Opt).
close(Ref) when is_pid(Ref) ->
catch gen_server:call(Ref, close, infinity),
ok;
close(Ref) ->
serctl:close(Ref),
ok.
getfd(Ref) when is_pid(Ref) ->
gen_server:call(Ref, fd, infinity).
read(FD, Len) when is_integer(Len) ->
serctl:read(FD, Len).
write(Serialport, Data) ->
case dgiot_data:get(?SERIAL, Serialport) of
Pid when is_pid(Pid) ->
case is_process_alive(Pid) of
true ->
gen_server:call(Pid, {write, Data}, infinity);
false ->
pass
end;
_ ->
pass
end.
send(Ref, Data) when is_pid(Ref) ->
gen_server:call(Ref, {send, Data}, infinity).
% FIXME: race condition: events can be delivered out of order
controlling_process(Ref, Pid) when is_pid(Ref), is_pid(Pid) ->
gen_server:call(Ref, {controlling_process, Pid}, infinity),
flush_events(Ref, Pid).
start_link(Dev, Opt) ->
ParentPid = self(),
gen_server:start_link(?MODULE, [ParentPid, Dev, Opt], []).
%%--------------------------------------------------------------------
%%% Callbacks
%%--------------------------------------------------------------------
init([ParentPid, Serialport, Opt]) ->
dgiot_data:insert(?SERIAL, Serialport, self()),
process_flag(trap_exit, true),
BSpeed = proplists:get_value(speed, Opt, b9600),
Flow = proplists:get_value(flow, Opt, true),
PortOpt = proplists:get_value(port_options, Opt, [stream, binary]),
Dev = <<"/dev/", Serialport/binary>>,
{ok, FD} = serctl:open(Dev),
{ok, Orig} = serctl:tcgetattr(FD),
Mode =
case proplists:get_value(mode, Opt, raw) of
raw -> serctl:mode(raw);
none -> Orig
end,
Termios = lists:foldl(
fun(Fun, Acc) -> Fun(Acc) end,
Mode,
[
fun(N) -> serctl:flow(N, Flow) end,
fun(N) -> serctl:ispeed(N, BSpeed) end,
fun(N) -> serctl:ospeed(N, BSpeed) end
]
),
<<"b", Speed/binary>> = dgiot_utils:to_binary(BSpeed),
ok = serctl:tcsetattr(FD, tcsanow, Termios),
ParentPid ! {serial_open, #{<<"pid">> => ParentPid, <<"fd">> => FD}},
{ok, #state{
oattr = Orig,
speed = dgiot_utils:to_int(Speed),
port = set_active(FD, PortOpt),
pid = ParentPid,
fd = FD,
dev = Dev
}}.
%%
%% retrieve/modify gen_server state
%%
handle_call(devname, _From, #state{dev = Dev} = State) ->
{reply, Dev, State};
handle_call(fd, _From, #state{fd = FD} = State) ->
{reply, FD, State};
handle_call({send, Data}, _From, #state{port = Port} = State) ->
Reply =
try erlang:port_command(Port, Data) of
true -> ok
catch
error:Error -> {error, Error}
end,
{reply, Reply, State};
handle_call({write, Data}, _From, #state{fd = FD} = State) ->
Reply =
try serctl:write(FD, Data) of
ok -> ok
catch
error:Error -> {error, Error}
end,
{reply, Reply, State};
handle_call(close, _From, State) ->
{stop, normal, ok, State};
handle_call({controlling_process, Pid}, {Owner, _}, #state{pid = Owner} = State) ->
link(Pid),
unlink(Owner),
{reply, ok, State#state{pid = Pid}}.
handle_cast(_Msg, State) ->
{noreply, State}.
%%
%% {active, true} mode
handle_info({Port, {data, FirstData}}, #state{port = Port, speed = Speed, data = <<>>} = State) when Speed < 9600 ->
Ms = dgiot_datetime:now_ms(),
erlang:send_after(100, self(), timeout),
{noreply, State#state{timestamp = Ms, data = iolist_to_binary([FirstData])}};
handle_info({Port, {data, Data}}, #state{port = Port, speed = Speed, data = OldData} = State) when Speed < 9600 ->
Ms = dgiot_datetime:now_ms(),
{noreply, State#state{timestamp = Ms, data = iolist_to_binary([OldData | Data])}};
handle_info({Port, {data, Data}}, #state{port = Port, pid = Pid} = State) ->
{noreply, send(Pid, Data, State)};
handle_info(timeout, #state{pid = ParentPid, data = NowData, timestamp = Timestamp} = State) ->
Ms = dgiot_datetime:now_ms(),
case Ms - Timestamp of
Interval when Interval > 15 ->
{noreply, send(ParentPid, NowData, State)};
_ ->
erlang:send_after(100, self(), timeout),
{noreply, State}
end;
% port has closed
handle_info({'EXIT', Port, _Reason}, #state{port = Port} = State) ->
{stop, shutdown, State};
% WTF?
handle_info(Info, State) ->
error_logger:error_report([wtf, Info]),
{noreply, State}.
terminate(_Reason, #state{fd = FD, port = Port, oattr = Orig}) ->
catch erlang:port_close(Port),
_ = serctl:tcsetattr(FD, tcsanow, Orig),
_ = serctl:close(FD),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
set_active(Res, Opt) ->
FD = serctl:getfd(Res),
erlang:open_port({fd, FD, FD}, Opt).
flush_events(Ref, Pid) ->
receive
{serial, Ref, _} = Event ->
Pid ! Event,
flush_events(Ref, Pid)
after 0 -> ok
end.
send(ParentPid, Data, #state{package_recv_count = Recv_count} = State) ->
ParentPid ! {serial_data, self(), Data},
State#state{data = <<>>, package_recv_count = Recv_count + 1}.

View File

@ -62,8 +62,9 @@ start(ChannelId, ChannelArgs) ->
init(?TYPE, ChannelId, ChannelArgs) ->
State = #state{id = ChannelId},
dgiot_parse_hook:subscribe(<<"Device/*">>, put, ChannelId, [<<"profile">>]),
ChildSpecs = dgiot_mock_mqtt:childspec(ChannelId, ChannelArgs),
{ok, State, ChildSpecs}.
MqttChildSpecs = dgiot_mock_mqtt:childspec(ChannelId, ChannelArgs),
TcpChildSpecs = dgiot_mock_tcp:childspec(ChannelId, ChannelArgs),
{ok, State, MqttChildSpecs ++ TcpChildSpecs}.
handle_init(State) ->
{ok, State}.

View File

@ -67,19 +67,19 @@
order => 3,
type => string,
required => true,
default => <<"9C-A5-25-**-**-**">>,
default => <<"6D-5G-8I-**-**-**">>,
title => #{
zh => <<"登录报文帧头"/utf8>>
},
description => #{
zh => <<"填写正则表达式匹配login报文, 9C-A5标识设备类型**-**-**-**为设备地址,中杆会自动去除"/utf8>>
zh => <<"填写正则表达式匹配login报文, 设备地址, 中杠会自动去除"/utf8>>
}
},
<<"dtutype">> => #{
order => 5,
type => string,
required => true,
default => <<"usr">>,
default => <<"DGIOT">>,
title => #{
zh => <<"控制器厂商"/utf8>>
},

View File

@ -274,6 +274,9 @@ set_params(Payload, _ProductId, _DevAddr) ->
%rtu modbus
parse_frame(<<>>, Acc, _State) -> {<<>>, Acc};
parse_frame(Buff, _Acc, _State) when length(Buff) < 6 ->
{<<>>, #{}};
parse_frame(<<MbAddr:8, BadCode:8, ErrorCode:8, Crc:2/binary>> = Buff, Acc,
#{<<"addr">> := DtuAddr} = State) ->
CheckCrc = dgiot_utils:crc16(<<MbAddr:8, BadCode:8, ErrorCode:8>>),