mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-12-04 21:27:39 +08:00
feat: meter optimization
This commit is contained in:
parent
8a50fa9004
commit
b599dc6e8d
@ -158,10 +158,8 @@ init(?TYPE, ChannelId, #{
|
||||
lists:map(fun(Prop) ->
|
||||
case Prop of
|
||||
#{<<"identifier">> := Identifier, <<"dataSource">> := #{<<"da">> := Da, <<"dt">> := Dt}} ->
|
||||
io:format("~s ~p ProductId ~p => Identifier ~p => dadt = ~p.~n", [?FILE, ?LINE, ProductId, Identifier, <<Da/binary, Dt/binary>>]),
|
||||
dgiot_data:insert({protocol, <<Da/binary, Dt/binary>>, ProductId}, Identifier);
|
||||
_ ->
|
||||
io:format("~s ~p Prop = ~p.~n", [?FILE, ?LINE, Prop]),
|
||||
pass
|
||||
end
|
||||
end, Props);
|
||||
|
@ -54,8 +54,6 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
|
||||
1 ->
|
||||
Frame1 = maps:get(<<"frame">>, Acc, <<>>),
|
||||
dgiot_bridge:send_log(ChannelId, " ~s ~p DLT376 login response: ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Frame1)]),
|
||||
io:format("~s ~p DLT376 login Buff = ~p.~n", [?FILE, ?LINE, NewBuff]),
|
||||
io:format("~s ~p DLT376 response login Buff = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Frame1)]),
|
||||
dgiot_tcp_server:send(TCPState, Frame1);
|
||||
_ -> pass
|
||||
end,
|
||||
@ -63,13 +61,13 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
|
||||
Frame2 = dlt376_decoder:to_frame(#{<<"command">> => 16#4B, %%读取集中器保存的电表信息,包括测量点号、表地址等
|
||||
<<"addr">> => Concentrator,
|
||||
<<"afn">> => ?AFN_READ_PARAM,
|
||||
<<"di">> => <<"00000201080001000200030004000500060007000800">>}), %%读取测量点号0001~0008的电表信息
|
||||
io:format("~s ~p DLT376 (集中器搜表) = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Frame2)]),
|
||||
<<"di">> => <<"0000020120000100020003000400050006000700080009000A000B000C000D000E000F0010001100120013001400150016001700180019001A001B001C001D001E001F002000">>}), %%读取测量点号0001~0020的电表信息
|
||||
%% io:format("~s ~p DLT376 (集中器搜表) = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Frame2)]),
|
||||
dgiot_bridge:send_log(ChannelId, "~s ~p DLT376 (集中器搜表): ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Frame2)]),
|
||||
dgiot_tcp_server:send(TCPState, Frame2),
|
||||
{Protocol1, MeterAddr};
|
||||
_ ->
|
||||
dgiot_bridge:send_log(ChannelId, "~s ~p DLT645 login ~p", [?FILE, ?LINE, NewBuff]),
|
||||
%% dgiot_bridge:send_log(ChannelId, "~s ~p DLT645 login ~p", [?FILE, ?LINE, NewBuff]),
|
||||
{?DLT645, Buff}
|
||||
end,
|
||||
case Protocol of
|
||||
@ -135,12 +133,11 @@ handle_info(search_meter, #tcp{state = #state{ref = Ref, protocol = ?DLT645} = S
|
||||
|
||||
%%ACK报文触发搜表
|
||||
handle_info({tcp, Buff}, #tcp{socket = Socket, clientid = DtuId, state = #state{id = ChannelId, dtuaddr = DtuAddr, protocol = ?DLT645, ref = Ref, step = search_meter, search = Search} = State} = TCPState) ->
|
||||
io:format("~s ~p tcp Buff = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
dgiot_bridge:send_log(ChannelId, "~s ~p from_dev=> ~p", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
%% io:format("~s ~p tcp Buff = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
dgiot_metrics:inc(dgiot_meter, <<"search_meter">>, 1),
|
||||
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
|
||||
?LOG(info, "Buff ~p", [Buff]),
|
||||
dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "from dev ~p (搜表成功)", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
%% ?LOG(info, "Buff ~p", [Buff]),
|
||||
dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, " ~s ~p DLT645(搜表成功) Buff: ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
{Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []),
|
||||
lists:map(fun(X) ->
|
||||
case X of
|
||||
@ -165,52 +162,28 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, clientid = DtuId, state = #state{
|
||||
end
|
||||
end;
|
||||
|
||||
%% 下发报文返回
|
||||
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = Protocol, env = #{product := ProductId, devaddr := DevAddr}}} = TCPState) ->
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
case Protocol of
|
||||
?DLT376 ->
|
||||
dgiot_bridge:send_log(ChannelId, "~s ~p DLT376 from_dev: ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
io:format("~s ~p Response Buff = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
{Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []),
|
||||
io:format("~s ~p parse_frame = ~p.~n", [?FILE, ?LINE, Frames]),
|
||||
dlt376_decoder:process_message(?DLT376, Frames, ChannelId),
|
||||
case get(DeviceId) of
|
||||
undefined -> pass;
|
||||
Pid ->
|
||||
Pid ! {control_msg, Frames}
|
||||
end,
|
||||
{noreply, TCPState#tcp{buff = Rest}};
|
||||
?DLT645 ->
|
||||
dgiot_bridge:send_log(ChannelId, "DLT645 from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
{Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []),
|
||||
dlt645_decoder:process_message(Frames, ChannelId),
|
||||
{noreply, TCPState#tcp{buff = Rest}};
|
||||
_ ->
|
||||
{noreply, TCPState#tcp{buff = <<0, 0, 0, 0, 0, 0, 0, 0>>}}
|
||||
end;
|
||||
|
||||
handle_info({tcp, Buff}, #tcp{socket = Socket, clientid = DtuId, state = #state{id = ChannelId, protocol = Protocol, step = _Step}} = TCPState) ->
|
||||
DTUIP = dgiot_utils:get_ip(Socket),
|
||||
case Protocol of
|
||||
?DLT376 ->
|
||||
dgiot_bridge:send_log(ChannelId, "~s ~p DLT376 from_dev: ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
io:format("~s ~p Response Buff = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
%% dgiot_bridge:send_log(ChannelId, "~s ~p DLT376 from_dev: ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
{Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []),
|
||||
io:format("~s ~p parse_frame = ~p.~n", [?FILE, ?LINE, Frames]),
|
||||
case Frames of
|
||||
[#{<<"con">> := 1, <<"frame">> := Frame} | _] ->
|
||||
dgiot_bridge:send_log(ChannelId, "~s ~p DLT376 response: ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Frame)]),
|
||||
io:format("~s ~p DLT376 send = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
dgiot_bridge:send_log(ChannelId, "~s ~p DLT376(回复确认)Frame: ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Frame)]),
|
||||
dgiot_tcp_server:send(TCPState, Frame); %%回复确认
|
||||
[#{<<"afn">> := 16#0A, <<"di">> := <<16#00, 16#00, 16#02, 16#01>>} | _] ->
|
||||
dgiot_bridge:send_log(ChannelId, "~s ~p DLT376(搜表回复)Buff: ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
io:format("~s ~p (搜表回复) parse_frame = ~p.~n", [?FILE, ?LINE, Frames]),
|
||||
dlt376_decoder:process_message(Frames, ChannelId, DTUIP, DtuId); %%注册或更新电表信
|
||||
_ ->
|
||||
dgiot_bridge:send_log(ChannelId, "~s ~p DLT376(抄表回复)Buff: ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
|
||||
io:format("~s ~p (抄表回复) parse_frame = ~p.~n", [?FILE, ?LINE, Frames]),
|
||||
dlt376_decoder:process_message(?DLT376, Frames, ChannelId)
|
||||
end,
|
||||
{noreply, TCPState#tcp{buff = Rest}};
|
||||
?DLT645 ->
|
||||
dgiot_bridge:send_log(ChannelId, "DLT645 from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
dgiot_bridge:send_log(ChannelId, "DLT645(抄表回复) Buff: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
{Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []),
|
||||
dlt645_decoder:process_message(Frames, ChannelId),
|
||||
{noreply, TCPState#tcp{buff = Rest}};
|
||||
@ -221,18 +194,13 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, clientid = DtuId, state = #state{
|
||||
handle_info({retry, Concentrator}, TCPState) ->
|
||||
Crc1 = dgiot_utils:get_parity(<<16#4B, Concentrator/binary, 16#0C, 16#60, 16#01, 16#01, 16#01, 16#10>>),
|
||||
Frame1 = <<16#68, 16#32, 16#00, 16#32, 16#00, 16#68, 16#4B, Concentrator/binary, 16#0C, 16#60, 16#01, 16#01, 16#01, 16#10, Crc1:8, 16#16>>,
|
||||
dgiot_tcp_server:send(TCPState, Frame1), %16#27,16#03,16#00,16#72,16#05,16#53
|
||||
%% UserData = <<16#4B,Concentrator/binary,16#10,16#60,16#00,16#00,16#01,16#01,16#02,16#01,16#00,16#00,16#00,16#00,16#00,16#00,16#68,16#27,16#02,16#11,16#07,16#15,16#01,16#00,16#00,16#01,16#00>>,
|
||||
%% Crc2 = dgiot_utils:get_parity(UserData),
|
||||
%% Frame2 = <<16#68,16#7E,16#00,16#7E,16#00,16#68, UserData/binary,Crc2:8,16#16>>,
|
||||
%% dgiot_tcp_server:send(TCPState, Frame2),
|
||||
dgiot_tcp_server:send(TCPState, Frame1),
|
||||
erlang:send_after(20000, self(), {retry, Concentrator}),
|
||||
{noreply, TCPState};
|
||||
|
||||
%%接受抄表任务命令抄表(下发指令)
|
||||
handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, protocol = Protocol} = State} = TCPState) ->
|
||||
Payload = dgiot_mqtt:get_payload(Msg),
|
||||
%% ?LOG(info, "Payload ~p ~n~n", [Payload]),
|
||||
dgiot_metrics:inc(dgiot_meter, <<"mqtt_revc">>, 1),
|
||||
case jsx:is_json(Payload) of
|
||||
true ->
|
||||
@ -241,14 +209,7 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, protocol
|
||||
case Protocol of
|
||||
?DLT376 ->
|
||||
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
|
||||
Payload1 = dgiot_meter:to_frame(ThingData), %%DLT645协议,需要透传转发
|
||||
io:format("~s ~p Payload1 = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Payload1)]),
|
||||
%% HexPayload = dgiot_utils:binary_to_hex(Payload1),
|
||||
%% Payload2 = dlt376_decoder:to_frame(#{<<"afn">> => 16#10,
|
||||
%% <<"command">> => 16#4A,
|
||||
%% <<"addr">> => <<16#01, 16#33, 16#48, 16#00, 16#00>>,
|
||||
%% <<"di">> => <<"00000100">>,
|
||||
%% <<"data">> => <<"026B81801000", HexPayload/binary, "00000000000000000000000000000000">>}),
|
||||
Payload1 = dgiot_meter:to_frame(ThingData),
|
||||
dgiot_bridge:send_log(ChannelId, " ~s ~p DLT376 send to DevAddr ~p => ~p", [?FILE, ?LINE, DevAddr, dgiot_utils:binary_to_hex(Payload1)]),
|
||||
dgiot_tcp_server:send(TCPState, Payload1);
|
||||
?DLT645 ->
|
||||
@ -259,22 +220,15 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, protocol
|
||||
end,
|
||||
{noreply, TCPState};
|
||||
[<<"profile">>, ProductId, DevAddr] ->
|
||||
dgiot_umeng:send_message_to3D(ProductId, DevAddr, jsx:decode(Payload)),
|
||||
case jsx:decode(Payload) of
|
||||
#{<<"pid">> := Pid, <<"deviceid">> := DeviceId} = NewPayload ->
|
||||
put(DeviceId, Pid),
|
||||
case Protocol of
|
||||
?DLT376 ->
|
||||
Payload2 = dlt376_decoder:frame_write_param(#{<<"concentrator">> => DevAddr, <<"payload">> => maps:without([<<"pid">>, <<"deviceid">>], NewPayload)}),
|
||||
?LOG(info, "Payload2:~p ~n~n", [dgiot_utils:binary_to_hex(Payload2)]),
|
||||
dgiot_tcp_server:send(TCPState, Payload2);
|
||||
?DLT645 ->
|
||||
Payload1 = dlt645_decoder:frame_write_param(#{<<"meter">> => DevAddr, <<"payload">> => jsx:decode(Payload)}),
|
||||
?LOG(info, "DLT645 Payload1 :~p ~n~n", [dgiot_utils:binary_to_hex(Payload1)]),
|
||||
dgiot_tcp_server:send(TCPState, Payload1)
|
||||
end;
|
||||
_ ->
|
||||
pass
|
||||
case Protocol of
|
||||
?DLT376 ->
|
||||
Payload2 = dlt376_decoder:frame_write_param(#{<<"concentrator">> => DevAddr, <<"payload">> => jsx:decode(Payload)}),
|
||||
dgiot_bridge:send_log(ChannelId, " ~s ~p DLT376(下发) send to DevAddr ~p => ~p", [?FILE, ?LINE, DevAddr, dgiot_utils:binary_to_hex(Payload2)]),
|
||||
dgiot_tcp_server:send(TCPState, Payload2);
|
||||
?DLT645 ->
|
||||
Payload1 = dlt645_decoder:frame_write_param(#{<<"meter">> => DevAddr, <<"payload">> => jsx:decode(Payload)}),
|
||||
dgiot_bridge:send_log(ChannelId, " ~s ~p DLT645(下发) send to DevAddr ~p => ~p", [?FILE, ?LINE, DevAddr, dgiot_utils:binary_to_hex(Payload1)]),
|
||||
dgiot_tcp_server:send(TCPState, Payload1)
|
||||
end,
|
||||
{noreply, TCPState#tcp{state = State#state{env = #{product => ProductId, devaddr => DevAddr}}}};
|
||||
[<<"thingctrl">>, _ProductId, _DevAddr] ->
|
||||
@ -283,14 +237,14 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, protocol
|
||||
?DLT376 ->
|
||||
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
|
||||
case ThingData of
|
||||
% 远程控制电表拉闸、合闸
|
||||
% 远程控制电表拉闸、合闸
|
||||
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"devpass">> := DevPass, <<"apiname">> := <<"get_meter_ctrl">>} ->
|
||||
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"devpass">> => DevPass, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl},
|
||||
Payload1 = dgiot_meter:to_frame(ThingData1),
|
||||
dgiot_tcp_server:send(TCPState, Payload1);
|
||||
% 获取上次拉闸、合闸的时间(一次发送两条查询指令)
|
||||
% 获取上次拉闸、合闸的时间(一次发送两条查询指令)
|
||||
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"apiname">> := <<"get_meter_ctrl_status">>} ->
|
||||
% 上次合闸时间
|
||||
% 上次合闸时间
|
||||
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl_status},
|
||||
Payload1 = dgiot_meter:to_frame(ThingData1),
|
||||
dgiot_tcp_server:send(TCPState, Payload1);
|
||||
@ -300,13 +254,13 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, protocol
|
||||
?DLT645 ->
|
||||
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
|
||||
case ThingData of
|
||||
% 远程控制电表拉闸、合闸
|
||||
% 远程控制电表拉闸、合闸
|
||||
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"devpass">> := DevPass, <<"apiname">> := <<"get_meter_ctrl">>} ->
|
||||
% ?LOG(info, "GGM 212 dgiot_meter_tcp, handle_info9 ~p,~p,~p,~p",[DevAddr,CtrlFlag,DevPass,Protocol]),
|
||||
% ?LOG(info, "GGM 212 dgiot_meter_tcp, handle_info9 ~p,~p,~p,~p",[DevAddr,CtrlFlag,DevPass,Protocol]),
|
||||
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"devpass">> => DevPass, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl},
|
||||
Payload1 = dgiot_meter:to_frame(ThingData1),
|
||||
dgiot_tcp_server:send(TCPState, Payload1);
|
||||
% 获取上次拉闸、合闸的时间(一次发送两条查询指令)
|
||||
% 获取上次拉闸、合闸的时间(一次发送两条查询指令)
|
||||
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"apiname">> := <<"get_meter_ctrl_status">>} ->
|
||||
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl_status},
|
||||
Payload1 = dgiot_meter:to_frame(ThingData1),
|
||||
@ -322,27 +276,9 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, protocol
|
||||
false -> {noreply, TCPState}
|
||||
end;
|
||||
|
||||
|
||||
%%handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = <<>>, search = Search} = State} = TCPState) ->
|
||||
%% DTUIP = dgiot_utils:get_ip(Socket),
|
||||
%% NewBuff =
|
||||
%% case is_binary(Buff) of
|
||||
%% true -> dgiot_utils:binary_to_hex(Buff);
|
||||
%% false -> Buff
|
||||
%% end,
|
||||
%% {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
|
||||
%% dgiot_bridge:send_log(ChannelId, DtuProductId, NewBuff, "(登录) ~p ", [NewBuff]),
|
||||
%% {Protocol, DtuAddr, NewRef, NewStep} = frame(Buff, DTUIP, DtuProductId, TCPState),
|
||||
%% DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
|
||||
%% {noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId,
|
||||
%% state = State#state{dtuaddr = DtuAddr, protocol = Protocol, ref = NewRef, step = NewStep}}};
|
||||
|
||||
|
||||
%% 异常报文丢弃
|
||||
%% {stop, TCPState} | {stop, Reason} | {ok, TCPState} | ok | stop
|
||||
handle_info(Info, TCPState) ->
|
||||
io:format("~s ~p Error Info = ~p.~n", [?FILE, ?LINE, Info]),
|
||||
io:format("~s ~p Error TCPState = ~p.~n", [?FILE, ?LINE, TCPState]),
|
||||
handle_info(_Info, TCPState) ->
|
||||
{noreply, TCPState}.
|
||||
|
||||
handle_call(_Msg, _From, TCPState) ->
|
||||
@ -357,84 +293,3 @@ terminate(_Reason, _TCPState) ->
|
||||
|
||||
code_change(_OldVsn, TCPState, _Extra) ->
|
||||
{ok, TCPState}.
|
||||
|
||||
|
||||
%%frame(Buff, DTUIP, DtuProductId, #tcp{state = #state{id = ChannelId, dtuaddr = <<>>, search = Search}} = TCPState) ->
|
||||
%% {Protocol, DtuAddr} =
|
||||
%% case Buff of
|
||||
%% <<16#68, _:4/bytes, 16#68, _A1:8/bytes, _Rest/binary>> ->
|
||||
%% {_, [Acc | _]} = dlt376_decoder:parse_frame(Buff, []), %% NewBuff
|
||||
%% #{<<"msgtype">> := Protocol1, <<"con">> := Con, <<"addr">> := MeterAddr} = Acc,
|
||||
%% Concentrator = maps:get(<<"concentrator">>, Acc, <<16#31,16#07,16#5F,16#81,16#00>>),
|
||||
%% case Con of
|
||||
%% 1 ->
|
||||
%% Frame1 = maps:get(<<"frame">>, Acc, <<>>),
|
||||
%% dgiot_tcp_server:send(TCPState, Frame1),
|
||||
%% erlang:send_after(10000, self(), {retry, Concentrator});
|
||||
%% _ -> pass
|
||||
%% end,
|
||||
%% Frame2 = dlt376_decoder:to_frame(#{<<"command">> => <<16#4B>>, %%读取集中器保存的电表信息,包括测量点号、表地址等
|
||||
%% <<"concentrator">> => Concentrator,
|
||||
%% <<"afn">> => ?AFN_READ_PARAM,
|
||||
%% <<"di">> => <<"00000201">>}),
|
||||
%% dgiot_tcp_server:send(TCPState, Frame2),
|
||||
%% dgiot_meter:create_meter4G(MeterAddr, ChannelId, DTUIP),
|
||||
%% {Protocol1, MeterAddr};
|
||||
%% _ ->
|
||||
%% {?DLT645, Buff}
|
||||
%% end,
|
||||
%% case Protocol of
|
||||
%% ?DLT376 ->
|
||||
%% {ProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
|
||||
%% Topic = <<"thing/", ProductId/binary, "/", DtuAddr/binary>>,
|
||||
%% dgiot_mqtt:subscribe(Topic), %为这个设备订阅一个mqtt
|
||||
%% dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "from dev ~p (登录)", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
%% {NewRef, NewStep} = {undefined, read_meter},
|
||||
%% DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
|
||||
%% case Search of
|
||||
%% <<"nosearch">> ->
|
||||
%% lists:map(fun(X) ->
|
||||
%% case X of
|
||||
%% #{<<"product">> := #{<<"objectId">> := MeterProductid}, <<"devaddr">> := Meteraddr,<<"route">> := Route} ->
|
||||
%% dgiot_bridge:send_log(ChannelId, MeterProductid, Meteraddr, "save taskque MeterProductid ~p Meteraddr ~p", [MeterProductid, Meteraddr]),
|
||||
%% dgiot_data:insert({concentrator, MeterProductid, Meteraddr}, Route),
|
||||
%% dgiot_task:save_pnque(DtuProductId, DtuAddr, MeterProductid, Meteraddr);
|
||||
%% _ ->
|
||||
%% pass
|
||||
%% end
|
||||
%% end, dgiot_meter:get_sub_device(DtuAddr));
|
||||
%% _ ->
|
||||
%% pass
|
||||
%% end,
|
||||
%% dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
|
||||
%% {Protocol, DtuAddr, NewRef, NewStep};
|
||||
%% ?DLT645 ->
|
||||
%% dgiot_meter:create_dtu(DtuAddr, ChannelId, DTUIP),
|
||||
%% {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
|
||||
%% Topic = <<"profile/", DtuProductId/binary, "/", DtuAddr/binary>>,
|
||||
%% dgiot_mqtt:subscribe(Topic),
|
||||
%% {NewRef, NewStep} =
|
||||
%% case Search of
|
||||
%% <<"nosearch">> ->
|
||||
%% lists:map(fun(X) ->
|
||||
%% case X of
|
||||
%% #{<<"product">> := #{<<"objectId">> := MeterProductid}, <<"devaddr">> := Meteraddr} ->
|
||||
%% dgiot_bridge:send_log(ChannelId, MeterProductid, Meteraddr, "save taskque Meteraddr ~p", [Meteraddr]),
|
||||
%% dgiot_task:save_pnque(DtuProductId, DtuAddr, MeterProductid, Meteraddr);
|
||||
%% _ ->
|
||||
%% pass
|
||||
%% end
|
||||
%% end, dgiot_meter:get_sub_device(DtuAddr)),
|
||||
%% {undefined, read_meter};
|
||||
%% <<"quick">> ->
|
||||
%% dgiot_meter:search_meter(tcp, undefined, TCPState, 0),
|
||||
%% {undefined, search_meter};
|
||||
%% _ ->
|
||||
%% {Ref, Step, _Payload} = dgiot_meter:search_meter(tcp, undefined, TCPState, 1),
|
||||
%% {Ref, Step}
|
||||
%% end,
|
||||
%% dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "from dev ~p (登录)", [dgiot_utils:binary_to_hex(DtuAddr)]),
|
||||
%% DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
|
||||
%% dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
|
||||
%% {Protocol, DtuAddr, NewRef, NewStep}
|
||||
%% end.
|
||||
|
@ -472,9 +472,7 @@ to_frame(#{
|
||||
>>;
|
||||
|
||||
to_frame(#{
|
||||
% <<"msgtype">> := ?DLT376,
|
||||
<<"command">> := C,
|
||||
%% <<"addr">> := Addr, %%?? Addr是6位字符
|
||||
<<"addr">> := Addr,
|
||||
<<"afn">> := AFN
|
||||
} = Msg) when AFN == 1 orelse AFN == 4 orelse AFN == 5 orelse AFN == 6 orelse AFN == 10 ->
|
||||
@ -500,13 +498,11 @@ to_frame(#{
|
||||
|
||||
%% 组装成封包
|
||||
to_frame(#{
|
||||
% <<"msgtype">> := ?DLT376,
|
||||
<<"command">> := C,
|
||||
%% <<"addr">> := Addr, %%?? Addr是6位字符
|
||||
<<"concentrator">> := Addr,
|
||||
<<"afn">> := AFN
|
||||
} = Msg) ->
|
||||
io:format("~s ~p Msg = ~p.~n", [?FILE, ?LINE, Msg]),
|
||||
%% io:format("~s ~p Msg = ~p.~n", [?FILE, ?LINE, Msg]),
|
||||
{ok, UserZone} = get_userzone(Msg),
|
||||
UserZone1 = <<C:8, Addr:5/bytes, AFN:8, 16#61, UserZone/binary>>,
|
||||
Len = byte_size(UserZone1) * 4 + 2,
|
||||
@ -765,37 +761,16 @@ pn_to_da(Pn) ->
|
||||
%% ConAddr = <<"000033010048">>,
|
||||
frame_write_param(#{<<"concentrator">> := ConAddr, <<"payload">> := Frame}) ->
|
||||
Length = length(maps:keys(Frame)),
|
||||
io:format("~s ~p SortFrame ~p.~n", [?FILE, ?LINE, Length]),
|
||||
{BitList, Afn, Da, Fn} =
|
||||
lists:foldl(fun(Index, {Acc, A, D, F}) ->
|
||||
case maps:find(dgiot_utils:to_binary(Index), Frame) of
|
||||
{ok, #{<<"value">> := Value, <<"dataSource">> := #{<<"afn">> := AFN, <<"da">> := Da, <<"dt">> := FN, <<"length">> := Len, <<"type">> := Type} = _DataForm}} ->
|
||||
io:format("~s ~p Value ~p. Da ~p FN ~p ~n", [?FILE, ?LINE, Value, Da, FN]),
|
||||
DA = dgiot_utils:binary_to_hex(pn_to_da(Da)),
|
||||
case Type of
|
||||
<<"bytes">> ->
|
||||
NewValue = dlt645_proctol:reverse(dgiot_utils:hex_to_binary(Value)),
|
||||
io:format("~s ~p NewValue ~p.~n", [?FILE, ?LINE, NewValue]),
|
||||
{get_values(Acc, NewValue), AFN, DA, FN};
|
||||
<<"little">> ->
|
||||
NewValue = dgiot_utils:to_int(Value),
|
||||
L = dgiot_utils:to_int(Len),
|
||||
Len1 = L * 8,
|
||||
io:format("~s ~p NewValue ~p.~n", [?FILE, ?LINE, NewValue]),
|
||||
{get_values(Acc, <<NewValue:Len1/little>>), AFN, DA, FN};
|
||||
<<"bit">> ->
|
||||
NewValue = dgiot_utils:to_int(Value),
|
||||
L = dgiot_utils:to_int(Len),
|
||||
io:format("~s ~p NewValue ~p.~n", [?FILE, ?LINE, NewValue]),
|
||||
{Acc ++ [{NewValue, L}], AFN, DA, FN}
|
||||
end;
|
||||
{ok, #{<<"value">> := Value, <<"dataSource">> := DataSource}} ->
|
||||
get_bitlist(Value, DataSource, Acc);
|
||||
_ ->
|
||||
{Acc, A, D, F}
|
||||
end
|
||||
end, {[], 0, <<>>, <<>>}, lists:seq(1, Length)),
|
||||
io:format("~s ~p BitList ~p.~n", [?FILE, ?LINE, BitList]),
|
||||
UserZone = <<<<V:BitLen>> || {V, BitLen} <- BitList>>,
|
||||
io:format("~s ~p UserZone ~p. Afn ~p ~n", [?FILE, ?LINE, UserZone, Afn]),
|
||||
UserData = add_to_userzone(UserZone, Afn, Fn),
|
||||
dlt376_decoder:to_frame(#{<<"command">> => 16#4B,
|
||||
<<"addr">> => concentrator_to_addr(ConAddr),
|
||||
@ -808,35 +783,25 @@ get_values(Acc, Data) ->
|
||||
Acc1 ++ [{V, 8}]
|
||||
end, Acc, binary_to_list(Data)).
|
||||
|
||||
%%frame_write_param(#{<<"concentrator">> := ConAddr, <<"basedata">> := Data}) ->
|
||||
%% ZZXH = to_integer(maps:get(<<"zzxh">>, Data, <<"2040">>)),
|
||||
%% CLDH = to_integer(maps:get(<<"cldh">>, Data, <<"2040">>)),
|
||||
%% BTL = to_integer(maps:get(<<"btl">>, Data, <<"3">>)),
|
||||
%% TXDKH = to_integer(maps:get(<<"txdkh">>, Data, <<"2">>)),
|
||||
%% TXXY = to_integer(maps:get(<<"txxy">>, Data, <<"30">>)),
|
||||
%% DBDZ = to_device(maps:get(<<"dbdz">>, Data, <<"000000000000">>)),
|
||||
%% TXMM = to_device(maps:get(<<"txmm">>, Data, <<"000000000000">>)),
|
||||
%% CJQ = to_device(maps:get(<<"cjq">>, Data, <<"000000000000">>)),
|
||||
%% DNFLS = to_integer(maps:get(<<"dnfls">>, Data, <<"4">>)),
|
||||
%% YHLX = to_integer(maps:get(<<"yhlx">>, Data, <<"5">>)),
|
||||
%% DBLX = to_integer(maps:get(<<"dblx">>, Data, <<"1">>)),
|
||||
%% UserZone = <<16#01, 16#00,
|
||||
%% ZZXH:16/little,
|
||||
%% CLDH:16/little,
|
||||
%% BTL:3, TXDKH:5,
|
||||
%% TXXY:8,
|
||||
%% DBDZ:6/bytes,
|
||||
%% TXMM:6/bytes,
|
||||
%% DNFLS:8,
|
||||
%% 16#0B,
|
||||
%% CJQ:6/bytes,
|
||||
%% YHLX:4, DBLX:1>>,
|
||||
%% dlt376_decoder:to_frame(#{<<"command">> => 16#4B,
|
||||
%% <<"concentrator">> => ConAddr,
|
||||
%% <<"afn">> => 16#04,
|
||||
%% <<"da">> => <<16#00, 16#00>>,
|
||||
%% <<"di">> => <<16#02, 16#01>>,
|
||||
%% <<"data">> => UserZone}).
|
||||
%% 下发指令
|
||||
get_bitlist(Value, #{<<"afn">> := AFN, <<"da">> := DA, <<"dt">> := FN, <<"length">> := Len, <<"type">> := Type}, Acc) ->
|
||||
case Type of
|
||||
<<"bytes">> ->
|
||||
NewValue = dlt645_proctol:reverse(dgiot_utils:hex_to_binary(Value)),
|
||||
%% io:format("~s ~p NewValue ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(NewValue)]),
|
||||
{get_values(Acc, NewValue), AFN, DA, FN};
|
||||
<<"little">> ->
|
||||
NewValue = dgiot_utils:to_int(Value),
|
||||
L = dgiot_utils:to_int(Len),
|
||||
Len1 = L * 8,
|
||||
%% io:format("~s ~p NewValue ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(NewValue)]),
|
||||
{get_values(Acc, <<NewValue:Len1/little>>), AFN, DA, FN};
|
||||
<<"bit">> ->
|
||||
NewValue = dgiot_utils:to_int(Value),
|
||||
L = dgiot_utils:to_int(Len),
|
||||
%% io:format("~s ~p NewValue ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(NewValue)]),
|
||||
{Acc ++ [{NewValue, L}], AFN, DA, FN}
|
||||
end.
|
||||
|
||||
add_to_userzone(UserZone, _Afn, _Fn) ->
|
||||
UserZone.
|
||||
@ -856,7 +821,7 @@ concentrator_to_addr(ConAddr) when byte_size(ConAddr) == 12 ->
|
||||
concentrator_to_addr(_ConAddr) ->
|
||||
<<16#00, 16#00, 16#00, 16#00, 16#00>>.
|
||||
|
||||
decoder_value_Rate(Index, BinDi, BinDa, BinDt, DValue, Rates, #{<<"afn">> := 16#0C, <<"value">> := _Value, <<"childvalue">> := ChildValue} = State) ->
|
||||
decoder_value_Rate(Index, BinDi, BinDa, BinDt, DValue, Rates, #{<<"afn">> := 16#0C, <<"value">> := Value, <<"childvalue">> := ChildValue} = State) ->
|
||||
NewBinDa =
|
||||
case maps:find(BinDa, ChildValue) of
|
||||
error ->
|
||||
@ -875,10 +840,10 @@ decoder_value_Rate(Index, BinDi, BinDa, BinDt, DValue, Rates, #{<<"afn">> := 16#
|
||||
Rate
|
||||
end,
|
||||
State1 = State#{
|
||||
%% <<"value">> => Value#{
|
||||
%% <<BinDi/binary, "">> => binary_to_value_dlt376_bcd(DValue),
|
||||
%% <<BinDi/binary, "0", BinIndex/binary>> => binary_to_value_dlt376_bcd(NewRate)
|
||||
%% },
|
||||
<<"value">> => Value#{
|
||||
<<BinDi/binary, "">> => binary_to_value_dlt376_bcd(DValue),
|
||||
<<BinDi/binary, "0", BinIndex/binary>> => binary_to_value_dlt376_bcd(NewRate)
|
||||
},
|
||||
<<"childvalue">> => ChildValue#{
|
||||
<<BinDa/binary, "">> => NewBinDa#{
|
||||
<<"0000", BinDt/binary>> => binary_to_value_dlt376_bcd(DValue),
|
||||
|
@ -143,12 +143,12 @@ handle_info(retry, State) ->
|
||||
{noreply, send_msg(State)};
|
||||
|
||||
%% 任务结束
|
||||
handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = ProductId1, devaddr = DevAddr1, ack = Ack, que = Que} = State) when length(Que) == 0 ->
|
||||
handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = _ProductId1, devaddr = _DevAddr1, ack = Ack, que = Que} = State) when length(Que) == 0 ->
|
||||
Payload = jsx:decode(dgiot_mqtt:get_payload(Msg), [return_maps]),
|
||||
case binary:split(dgiot_mqtt:get_topic(Msg), <<$/>>, [global, trim]) of
|
||||
[<<"thing">>, ProductId, DevAddr, <<"post">>] ->
|
||||
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p ~ts: ~ts ", [?FILE, ?LINE, unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
|
||||
io:format("~s ~p DevAddr ~p => ProductId ~p => Payload ~p.~n", [?FILE, ?LINE, DevAddr, ProductId, Payload]),
|
||||
%% io:format("~s ~p DevAddr ~p => ProductId ~p => Payload ~p.~n", [?FILE, ?LINE, DevAddr, ProductId, Payload]),
|
||||
NewPayload =
|
||||
maps:fold(fun(K, V, Acc) ->
|
||||
case dgiot_data:get({protocol, K, ProductId}) of
|
||||
@ -160,10 +160,10 @@ handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = Product
|
||||
end, #{}, Payload),
|
||||
NewAck = dgiot_task:get_collection(ProductId, Dis, NewPayload, maps:merge(Ack, NewPayload)),
|
||||
dgiot_metrics:inc(dgiot_task, <<"task_recv">>, 1),
|
||||
io:format("~s ~p DevAddr ~p => NewAck = ~p.~n", [?FILE, ?LINE, DevAddr, NewAck]),
|
||||
%% io:format("~s ~p DevAddr ~p => NewAck = ~p.~n", [?FILE, ?LINE, DevAddr, NewAck]),
|
||||
{noreply, get_next_pn(State#task{ack = NewAck, product = ProductId, devaddr = DevAddr})};
|
||||
_ ->
|
||||
io:format("~s ~p DevAddr ~p => ProductId ~p => Payload ~p.~n", [?FILE, ?LINE, DevAddr1, ProductId1, Payload]),
|
||||
%% io:format("~s ~p DevAddr ~p => ProductId ~p => Payload ~p.~n", [?FILE, ?LINE, DevAddr1, ProductId1, Payload]),
|
||||
{noreply, get_next_pn(State#task{ack = Ack})}
|
||||
end;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user