mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-11-30 19:28:09 +08:00
feat: trace
This commit is contained in:
parent
39a1cc93ec
commit
1eadb3acd1
@ -99,7 +99,7 @@ handle_info({tcp_passive, _Sock}, State) ->
|
||||
|
||||
%% add register function
|
||||
handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{register = false, buff = Buff, socket = Sock} = ChildState} = State) ->
|
||||
dgiot_metrics:inc(dgiot_bridge,<<"tcp_server_recv">>,1),
|
||||
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_recv">>, 1),
|
||||
Binary = iolist_to_binary(Data),
|
||||
NewBin =
|
||||
case binary:referenced_byte_size(Binary) of
|
||||
@ -118,14 +118,14 @@ handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{register = false,
|
||||
Port = dgiot_utils:get_port(Sock),
|
||||
dgiot_cm:insert_channel_info(ClientId,#{ip => Ip, port => Port,online => dgiot_datetime:now_microsecs()},[{tcp_recv, 1}]),
|
||||
{noreply, State#state{child = NewChild, incoming_bytes = Cnt}, hibernate};
|
||||
{noreply, NewChild} ->
|
||||
{noreply, NewChild} ->
|
||||
{noreply, State#state{child = NewChild, incoming_bytes = Cnt}, hibernate};
|
||||
{stop, Reason, NewChild} ->
|
||||
{stop, Reason, State#state{child = NewChild}}
|
||||
end;
|
||||
|
||||
handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{buff = Buff, socket = Sock} = ChildState} = State) ->
|
||||
dgiot_metrics:inc(dgiot_bridge,<<"tcp_server_recv">>,1),
|
||||
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_recv">>, 1),
|
||||
Binary = iolist_to_binary(Data),
|
||||
NewBin =
|
||||
case binary:referenced_byte_size(Binary) of
|
||||
@ -135,9 +135,15 @@ handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{buff = Buff, socke
|
||||
Binary
|
||||
end,
|
||||
write_log(ChildState#tcp.log, <<"RECV">>, NewBin),
|
||||
?LOG(info,"ChildState ~p",[ChildState]),
|
||||
?LOG(info, "ChildState ~p", [ChildState]),
|
||||
Cnt = byte_size(NewBin),
|
||||
NewChildState = ChildState#tcp{buff = <<>>},
|
||||
case NewChildState of
|
||||
#tcp{clientid = CliendId, register = true}->
|
||||
dgiot_tracer:check_trace(CliendId, CliendId, Binary);
|
||||
_->
|
||||
pass
|
||||
end,
|
||||
case Mod:handle_info({tcp, <<Buff/binary, NewBin/binary>>}, NewChildState) of
|
||||
{noreply, NewChild} ->
|
||||
{noreply, State#state{child = NewChild, incoming_bytes = Cnt}, hibernate};
|
||||
@ -146,25 +152,25 @@ handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{buff = Buff, socke
|
||||
end;
|
||||
|
||||
handle_info({shutdown, Reason}, #state{child = #tcp{clientid = CliendId, register = true} = ChildState} = State) ->
|
||||
?LOG(error,"shutdown, ~p, ~p~n", [Reason, ChildState#tcp.state]),
|
||||
dgiot_cm:connection_closed(CliendId),
|
||||
?LOG(error, "shutdown, ~p, ~p~n", [Reason, ChildState#tcp.state]),
|
||||
dgiot_cm:unregister_channel(CliendId),
|
||||
write_log(ChildState#tcp.log, <<"ERROR">>, list_to_binary(io_lib:format("~w", [Reason]))),
|
||||
{stop, normal, State#state{child = ChildState#tcp{socket = undefined}}};
|
||||
|
||||
handle_info({shutdown, Reason}, #state{child = ChildState} = State) ->
|
||||
?LOG(error,"shutdown, ~p, ~p~n", [Reason, ChildState#tcp.state]),
|
||||
?LOG(error, "shutdown, ~p, ~p~n", [Reason, ChildState#tcp.state]),
|
||||
write_log(ChildState#tcp.log, <<"ERROR">>, list_to_binary(io_lib:format("~w", [Reason]))),
|
||||
{stop, normal, State#state{child = ChildState#tcp{socket = undefined}}};
|
||||
|
||||
|
||||
handle_info({tcp_error, _Sock, Reason}, #state{child = ChildState} = State) ->
|
||||
?LOG(error,"tcp_error, ~p, ~p~n", [Reason, ChildState#tcp.state]),
|
||||
?LOG(error, "tcp_error, ~p, ~p~n", [Reason, ChildState#tcp.state]),
|
||||
write_log(ChildState#tcp.log, <<"ERROR">>, list_to_binary(io_lib:format("~w", [Reason]))),
|
||||
{stop, {shutdown, Reason}, State};
|
||||
|
||||
handle_info({tcp_closed, Sock}, #state{mod = Mod, child = #tcp{socket = Sock} = ChildState} = State) ->
|
||||
write_log(ChildState#tcp.log, <<"ERROR">>, <<"tcp_closed">>),
|
||||
?LOG(error,"tcp_closed ~p", [ChildState#tcp.state]),
|
||||
?LOG(error, "tcp_closed ~p", [ChildState#tcp.state]),
|
||||
case Mod:handle_info(tcp_closed, ChildState) of
|
||||
{noreply, NewChild} ->
|
||||
{stop, normal, State#state{child = NewChild#tcp{socket = undefined}}};
|
||||
@ -181,12 +187,12 @@ handle_info(Info, #state{mod = Mod, child = ChildState} = State) ->
|
||||
end.
|
||||
|
||||
terminate(Reason, #state{mod = Mod, child = #tcp{clientid = CliendId, register = true} = ChildState}) ->
|
||||
dgiot_cm:connection_closed(CliendId),
|
||||
dgiot_metrics:dec(dgiot_bridge,<<"tcp_server">>, 1),
|
||||
dgiot_cm:unregister_channel(CliendId),
|
||||
dgiot_metrics:dec(dgiot_bridge, <<"tcp_server">>, 1),
|
||||
Mod:terminate(Reason, ChildState);
|
||||
|
||||
terminate(Reason, #state{mod = Mod, child = ChildState}) ->
|
||||
dgiot_metrics:dec(dgiot_bridge,<<"tcp_server">>,1),
|
||||
dgiot_metrics:dec(dgiot_bridge, <<"tcp_server">>, 1),
|
||||
Mod:terminate(Reason, ChildState).
|
||||
|
||||
code_change(OldVsn, #state{mod = Mod, child = ChildState} = State, Extra) ->
|
||||
@ -197,6 +203,17 @@ code_change(OldVsn, #state{mod = Mod, child = ChildState} = State, Extra) ->
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
send(#tcp{clientid = CliendId, register = true, transport = Transport, socket = Socket, log = Log}, Payload) ->
|
||||
dgiot_tracer:check_trace(CliendId, CliendId, Payload),
|
||||
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_send">>, 1),
|
||||
write_log(Log, <<"SEND">>, Payload),
|
||||
case Socket == undefined of
|
||||
true ->
|
||||
{error, disconnected};
|
||||
false ->
|
||||
Transport:send(Socket, Payload)
|
||||
end;
|
||||
|
||||
send(#tcp{transport = Transport, socket = Socket, log = Log}, Payload) ->
|
||||
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_send">>, 1),
|
||||
write_log(Log, <<"SEND">>, Payload),
|
||||
@ -207,6 +224,7 @@ send(#tcp{transport = Transport, socket = Socket, log = Log}, Payload) ->
|
||||
Transport:send(Socket, Payload)
|
||||
end.
|
||||
|
||||
|
||||
rate_limit({Rate, Burst}) ->
|
||||
esockd_rate_limit:new(Rate, Burst).
|
||||
|
||||
|
@ -83,23 +83,23 @@ get_trace({Type, Id}) when is_list(Id) ->
|
||||
get_trace({Type, Id}) when is_atom(Id) ->
|
||||
get_trace({Type, atom_to_binary(Id)});
|
||||
get_trace({clientid, ClientId}) ->
|
||||
ets:member(?DGIOT_CLIENT_TRACE, ClientId);
|
||||
ets:member(?DGIOT_CLIENT_TRACE, ClientId);
|
||||
get_trace({topic, Topic}) ->
|
||||
lists:any(fun({emqx_topic_trace,TopicFilter, _}) ->
|
||||
lists:any(fun({emqx_topic_trace, TopicFilter, _}) ->
|
||||
emqx_topic:match(Topic, TopicFilter)
|
||||
end, ets:tab2list(?DGIOT_TOPIC_TRACE));
|
||||
get_trace(_) ->
|
||||
false.
|
||||
|
||||
check_trace(From, Topic,Payload) ->
|
||||
check_trace(From, Topic, Payload) ->
|
||||
case get_trace({clientid, From}) of
|
||||
true ->
|
||||
BinClientId = dgiot_utils:to_binary(From),
|
||||
dgiot_mqtt:publish(self(), <<"logger_trace/trace/", BinClientId/binary,"/", Topic/binary>>, Payload);
|
||||
dgiot_mqtt:publish(self(), <<"logger_trace/trace/", BinClientId/binary, "/", Topic/binary>>, Payload);
|
||||
false ->
|
||||
case get_trace({topic, Topic}) of
|
||||
true ->
|
||||
dgiot_mqtt:publish(self(), <<"logger_trace/trace/", Topic/binary>>, Payload);
|
||||
dgiot_mqtt:publish(self(), <<"logger_trace/trace/", Topic/binary>>, Payload);
|
||||
false ->
|
||||
false
|
||||
end
|
||||
@ -122,7 +122,7 @@ insert(?DGIOT_CLIENT_TRACE, Key, Value) ->
|
||||
end,
|
||||
case get(?DGIOT_CLIENT_TRACE, Key) of
|
||||
not_find ->
|
||||
insert_(?DGIOT_CLIENT_TRACE,#?DGIOT_CLIENT_TRACE{key = Key, value = Value});
|
||||
insert_(?DGIOT_CLIENT_TRACE, #?DGIOT_CLIENT_TRACE{key = Key, value = Value});
|
||||
_ -> pass
|
||||
end,
|
||||
insert_(?DGIOT_CLIENT_TRACE, #?DGIOT_CLIENT_TRACE{key = Key, value = Value});
|
||||
@ -136,7 +136,7 @@ insert(?DGIOT_TOPIC_TRACE, Key, Value) ->
|
||||
end,
|
||||
case get(?DGIOT_TOPIC_TRACE, Key) of
|
||||
not_find ->
|
||||
insert_(?DGIOT_TOPIC_TRACE,#?DGIOT_TOPIC_TRACE{key = Key, value = Value});
|
||||
insert_(?DGIOT_TOPIC_TRACE, #?DGIOT_TOPIC_TRACE{key = Key, value = Value});
|
||||
_ -> pass
|
||||
end,
|
||||
insert_(?DGIOT_TOPIC_TRACE, #?DGIOT_TOPIC_TRACE{key = Key, value = Value}).
|
||||
|
@ -1,20 +1,44 @@
|
||||
[
|
||||
{
|
||||
"name": "dtu_login",
|
||||
"help": "服务收到登陆报文统计",
|
||||
"name": "dtu_online",
|
||||
"help": "dtu在线数量",
|
||||
"type": "gauge",
|
||||
"labels": []
|
||||
},
|
||||
{
|
||||
"name": "dtu_login_ack",
|
||||
"help": "服务发送注册报文统计",
|
||||
"name": "tcp_send",
|
||||
"help": "TCP采集通道发包数",
|
||||
"type": "gauge",
|
||||
"labels": []
|
||||
},
|
||||
{
|
||||
"name": "dtu_register",
|
||||
"help": "服务收到注册报文统计",
|
||||
"name": "mqtt_send",
|
||||
"help": "MQTT任务通道发包数",
|
||||
"type": "gauge",
|
||||
"labels": []
|
||||
},
|
||||
{
|
||||
"name": "tcp_revc",
|
||||
"help": "TCP采集通道收包数",
|
||||
"type": "gauge",
|
||||
"labels": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "mqtt_revc",
|
||||
"help": "MQTT任务通道收包数",
|
||||
"type": "gauge",
|
||||
"labels": []
|
||||
},
|
||||
{
|
||||
"name": "meter_register",
|
||||
"help": "电表注册成功数量",
|
||||
"type": "gauge",
|
||||
"labels": []
|
||||
},
|
||||
{
|
||||
"name": "search_meter",
|
||||
"help": "搜表次数",
|
||||
"type": "gauge",
|
||||
"labels": []
|
||||
}
|
||||
]
|
||||
|
@ -28,6 +28,7 @@
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
dgiot_metrics:start(dgiot_meter),
|
||||
{ok, Sup} = dgiot_meter_sup:start_link(),
|
||||
{ok, Sup}.
|
||||
|
||||
|
@ -36,35 +36,33 @@ init(TCPState) ->
|
||||
{ok, TCPState}.
|
||||
|
||||
%%设备登录报文,登陆成功后,开始搜表
|
||||
%
|
||||
%
|
||||
handle_info({tcp, DtuAddr}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = <<>>, search = Search} = State} = TCPState) ->
|
||||
DTUIP = dgiot_utils:get_ip(Socket),
|
||||
?LOG(info, "GGM 334 dgiot_meter_tcp, handle_info (登录) ~p,,,~p",[DtuAddr,TCPState]),
|
||||
{_,[Acc|_]} = dlt376_decoder:parse_frame(DtuAddr,[]),
|
||||
{_, [Acc | _]} = dlt376_decoder:parse_frame(DtuAddr, []),
|
||||
#{<<"msgtype">> := Protocol} = Acc,
|
||||
case Protocol of
|
||||
case Protocol of
|
||||
?DLT376 ->
|
||||
#{<<"addr">> := MeterAddr} = Acc,
|
||||
dgiot_meter:create_meter4G(MeterAddr, ChannelId, DTUIP),
|
||||
{ProductId, _,_} = dgiot_data:get({meter, ChannelId}),
|
||||
{ProductId, _, _} = dgiot_data:get({meter, ChannelId}),
|
||||
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
|
||||
Topic = <<"thing/", ProductId/binary, "/", MeterAddr/binary>>,
|
||||
dgiot_mqtt:subscribe(Topic), %为这个设备订阅一个mqtt
|
||||
% timer:sleep(500),
|
||||
% TopicCtrl = <<"thingctrl/", ProductId/binary, "/", MeterAddr/binary>>,
|
||||
% dgiot_mqtt:subscribe(TopicCtrl),
|
||||
dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "dtu ~p (登录)", [dgiot_utils:binary_to_hex(DtuAddr)]),
|
||||
{NewRef, NewStep} = {undefined, read_meter},
|
||||
{noreply, TCPState#tcp{buff = <<>>, state = State#state{dtuaddr = MeterAddr, protocol = ?DLT376,ref = NewRef, step = NewStep}}};
|
||||
DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
|
||||
dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
|
||||
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = MeterAddr, protocol = ?DLT376, ref = NewRef, step = NewStep}}};
|
||||
?DLT645 ->
|
||||
% HexDtuAddr = dgiot_utils:binary_to_hex(DtuAddr),
|
||||
?LOG(info, "GGM 335 dgiot_meter_tcp, handle_info (DLT645创建) ~p,",[DtuAddr]),
|
||||
dgiot_meter:create_dtu(DtuAddr, ChannelId, DTUIP),
|
||||
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
|
||||
Topic = <<"profile/", DtuProductId/binary, "/", DtuAddr/binary>>,
|
||||
dgiot_mqtt:subscribe(Topic),
|
||||
{NewRef, NewStep} =
|
||||
{NewRef, NewStep} =
|
||||
case Search of
|
||||
<<"nosearch">> ->
|
||||
[dgiot_task:save_pnque(DtuProductId, DtuAddr, Meterproductid, Meteraddr) || #{<<"product">> := #{<<"objectId">>:=Meterproductid}, <<"devaddr">> := Meteraddr}
|
||||
[dgiot_task:save_pnque(DtuProductId, DtuAddr, Meterproductid, Meteraddr) || #{<<"product">> := #{<<"objectId">> := Meterproductid}, <<"devaddr">> := Meteraddr}
|
||||
<- dgiot_meter:get_sub_device(DtuAddr)],
|
||||
{undefined, read_meter};
|
||||
<<"quick">> ->
|
||||
@ -74,26 +72,29 @@ handle_info({tcp, DtuAddr}, #tcp{socket = Socket, state = #state{id = ChannelId,
|
||||
{Ref, Step, _Payload} = dgiot_meter:search_meter(tcp, undefined, TCPState, 1),
|
||||
{Ref, Step}
|
||||
end,
|
||||
{noreply, TCPState#tcp{buff = <<>>, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT645,ref = NewRef, step = NewStep}}};
|
||||
dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "dtu ~p (登录)", [dgiot_utils:binary_to_hex(DtuAddr)]),
|
||||
DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
|
||||
dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
|
||||
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT645, ref = NewRef, step = NewStep}}};
|
||||
_ ->
|
||||
?LOG(info, "GGM 334 dgiot_meter_tcp, handle_info (登录s失败) ~p",[dgiot_utils:binary_to_hex(DtuAddr)]),
|
||||
?LOG(info, "GGM 334 dgiot_meter_tcp, handle_info (登录s失败) ~p", [dgiot_utils:binary_to_hex(DtuAddr)]),
|
||||
dgiot_utils:binary_to_hex(DtuAddr)
|
||||
end;
|
||||
|
||||
%%定时器触发搜表
|
||||
handle_info(search_meter, #tcp{state = #state{ref = Ref,protocol = ?DLT645} = State} = TCPState) ->
|
||||
%%定时器触发搜表
|
||||
handle_info(search_meter, #tcp{state = #state{ref = Ref, protocol = ?DLT645} = State} = TCPState) ->
|
||||
{NewRef, Step, _Payload} = dgiot_meter:search_meter(tcp, Ref, TCPState, 1),
|
||||
{noreply, TCPState#tcp{buff = <<>>, state = State#state{ref = NewRef, step = Step}}};
|
||||
|
||||
%%ACK报文触发搜表
|
||||
handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = DtuAddr, protocol = ?DLT645,ref = Ref, step = search_meter, search = Search} = State} = TCPState) ->
|
||||
?LOG(info, "from_dev: search_meter Buff ~p", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
?LOG(info, "from_dev: parse_frame Buff ~p", [dgiot_meter:parse_frame(?DLT645, Buff, [])]),
|
||||
handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = DtuAddr, protocol = ?DLT645, ref = Ref, step = search_meter, search = Search} = State} = TCPState) ->
|
||||
dgiot_metrics:inc(dgiot_meter, <<"search_meter">>, 1),
|
||||
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
|
||||
dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "dtu ~p (搜表)", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
{Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []),
|
||||
lists:map(fun(X) ->
|
||||
case X of
|
||||
#{<<"addr">> := Addr} ->
|
||||
?LOG(info, "from_dev: search_meter Addr ~p", [Addr]),
|
||||
DTUIP = dgiot_utils:get_ip(Socket),
|
||||
dgiot_meter:create_meter(dgiot_utils:binary_to_hex(Addr), ChannelId, DTUIP, DtuAddr);
|
||||
Other ->
|
||||
@ -115,31 +116,31 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
|
||||
end;
|
||||
|
||||
%%接受抄表任务命令抄表(下发指令)
|
||||
handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId}} = TCPState) ->
|
||||
handle_info({deliver, Topic, Msg}, #tcp{state = #state{id = _ChannelId}} = TCPState) ->
|
||||
Payload = dgiot_mqtt:get_payload(Msg),
|
||||
dgiot_bridge:send_log(ChannelId, "Topic ~p Msg ~p", [dgiot_mqtt:get_topic(Msg), Payload]),
|
||||
dgiot_metrics:inc(dgiot_meter, <<"mqtt_revc">>, 1),
|
||||
case jsx:is_json(Payload) of
|
||||
true ->
|
||||
case binary:split(dgiot_mqtt:get_topic(Msg), <<$/>>, [global, trim]) of
|
||||
[<<"thing">>, _ProductId, _DevAddr] ->
|
||||
[<<"thing">>, ProductId, DevAddr] ->
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
#tcp{state = #state{protocol = Protocol}} = TCPState,
|
||||
case Protocol of
|
||||
case Protocol of
|
||||
?DLT376 ->
|
||||
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
|
||||
Payload1 = dgiot_meter:to_frame(ThingData),
|
||||
dgiot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
|
||||
% ?LOG(info, "GGM 214 task->dev: Payload ~p", [dgiot_utils:binary_to_hex(Payload1)]),
|
||||
dgiot_tracer:check_trace(DeviceId, Topic, Payload1),
|
||||
dgiot_tcp_server:send(TCPState, Payload1);
|
||||
?DLT645 ->
|
||||
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
|
||||
Payload1 = dgiot_meter:to_frame(ThingData),
|
||||
% ?LOG(info, "GGM 216 dgiot_meter_tcp, Payload1 ~p,~p",[ChannelId,Payload1]),
|
||||
dgiot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
|
||||
dgiot_tcp_server:send(TCPState, Payload1)
|
||||
end;
|
||||
[<<"profile">>, _ProductId, _DtuAddr] ->
|
||||
[<<"profile">>, ProductId, DevAddr] ->
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
case Payload of
|
||||
#{<<"_dgiotprotocol">> := <<"hex">>} ->
|
||||
dgiot_tracer:check_trace(DeviceId, Topic, Payload),
|
||||
maps:fold(fun(_K, V, Acc) ->
|
||||
dgiot_tcp_server:send(TCPState, dgiot_utils:hex_to_binary(V)),
|
||||
Acc
|
||||
@ -147,49 +148,44 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId}} = TCPSt
|
||||
_ ->
|
||||
pass
|
||||
end;
|
||||
[<<"thingctrl">>, _, _DtuAddr] ->
|
||||
[<<"thingctrl">>, ProductId, DevAddr] ->
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
#tcp{state = #state{protocol = Protocol}} = TCPState,
|
||||
?LOG(info, "GGM 250 dgiot_meter_tcp, thingctrl ~p",[TCPState]),
|
||||
case Protocol of
|
||||
case Protocol of
|
||||
?DLT376 ->
|
||||
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
|
||||
% ?LOG(info, "GGM 205 dgiot_meter_tcp, handle_info10 ~p",[ThingData]),
|
||||
case ThingData of
|
||||
% 远程控制电表拉闸、合闸
|
||||
#{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"devpass">> := DevPass,<<"apiname">> := <<"get_meter_ctrl">>} ->
|
||||
% ?LOG(info, "GGM 206 dgiot_meter_tcp, handle_info9 ~p,~p,~p,~p",[DevAddr,CtrlFlag,DevPass,Protocol]),
|
||||
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"devpass">> := DevPass, <<"apiname">> := <<"get_meter_ctrl">>} ->
|
||||
ThingData1 = #{
|
||||
<<"devaddr">> => DevAddr,
|
||||
<<"ctrlflag">> => CtrlFlag,
|
||||
<<"devpass">> => DevPass,
|
||||
<<"protocol">> => Protocol ,
|
||||
<<"protocol">> => Protocol,
|
||||
<<"apiname">> => get_meter_ctrl
|
||||
},
|
||||
Payload1 = dgiot_meter:to_frame(ThingData1),
|
||||
% diot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
|
||||
dgiot_tcp_server:send(TCPState, Payload1);
|
||||
dgiot_tcp_server:send(DeviceId, TCPState, Payload1);
|
||||
% 获取上次拉闸、合闸的时间(一次发送两条查询指令)
|
||||
#{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"apiname">> := <<"get_meter_ctrl_status">>} ->
|
||||
?LOG(info, "GGM 235 dgiot_meter_tcp, handle_info9 ~p,~p",[DevAddr,Protocol]),
|
||||
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"apiname">> := <<"get_meter_ctrl_status">>} ->
|
||||
% 上次合闸时间
|
||||
ThingData1 = #{
|
||||
<<"devaddr">> => DevAddr,
|
||||
<<"ctrlflag">> => CtrlFlag,
|
||||
<<"protocol">> => Protocol,
|
||||
<<"apiname">> => get_meter_ctrl_status
|
||||
},
|
||||
},
|
||||
Payload1 = dgiot_meter:to_frame(ThingData1),
|
||||
dgiot_tcp_server:send(TCPState, Payload1);
|
||||
_->
|
||||
?LOG(info, "GGM 236 dgiot_meter_tcp, handle_info9 ~p",[ThingData]),
|
||||
_ ->
|
||||
?LOG(info, "GGM 236 dgiot_meter_tcp, handle_info9 ~p", [ThingData]),
|
||||
pass
|
||||
end;
|
||||
?DLT645 ->
|
||||
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
|
||||
?LOG(info, "GGM 211 dgiot_meter_tcp, handle_info10 ~p",[ThingData]),
|
||||
case ThingData of
|
||||
% 远程控制电表拉闸、合闸
|
||||
#{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"devpass">> := DevPass,<<"apiname">> := <<"get_meter_ctrl">>} ->
|
||||
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"devpass">> := DevPass, <<"apiname">> := <<"get_meter_ctrl">>} ->
|
||||
% ?LOG(info, "GGM 212 dgiot_meter_tcp, handle_info9 ~p,~p,~p,~p",[DevAddr,CtrlFlag,DevPass,Protocol]),
|
||||
ThingData1 = #{
|
||||
<<"devaddr">> => DevAddr,
|
||||
@ -199,21 +195,18 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId}} = TCPSt
|
||||
<<"apiname">> => get_meter_ctrl
|
||||
},
|
||||
Payload1 = dgiot_meter:to_frame(ThingData1),
|
||||
?LOG(info, "GGM 213 dgiot_meter_tcp, Payload1 ~p,~p",[ChannelId,dgiot_utils:to_hex(Payload1)]),
|
||||
dgiot_tcp_server:send(TCPState, Payload1);
|
||||
% 获取上次拉闸、合闸的时间(一次发送两条查询指令)
|
||||
#{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"apiname">> := <<"get_meter_ctrl_status">>} ->
|
||||
?LOG(info, "GGM 236 dgiot_meter_tcp, handle_info9 ~p,~p",[DevAddr,Protocol]),
|
||||
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"apiname">> := <<"get_meter_ctrl_status">>} ->
|
||||
ThingData1 = #{
|
||||
<<"devaddr">> => DevAddr,
|
||||
<<"ctrlflag">> => CtrlFlag,
|
||||
<<"protocol">> => Protocol,
|
||||
<<"apiname">> => get_meter_ctrl_status
|
||||
},
|
||||
},
|
||||
Payload1 = dgiot_meter:to_frame(ThingData1),
|
||||
?LOG(info, "GGM 237 dgiot_meter_tcp, Payload1 ~p,~p",[ChannelId,dgiot_utils:to_hex(Payload1)]),
|
||||
dgiot_tcp_server:send(TCPState, Payload1);
|
||||
_->
|
||||
_ ->
|
||||
pass
|
||||
end
|
||||
end;
|
||||
@ -225,55 +218,55 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId}} = TCPSt
|
||||
{noreply, TCPState};
|
||||
|
||||
|
||||
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId,protocol = Protocol,step = Step}} = TCPState) ->
|
||||
?LOG(info, "GGM 333 Buff========ALL REST========= ~p,~p,~p~n~n~n", [ChannelId,Step,dgiot_utils:binary_to_hex(Buff)]),
|
||||
case Protocol of
|
||||
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = Protocol, step = _Step}} = TCPState) ->
|
||||
case Protocol of
|
||||
?DLT376 ->
|
||||
dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
?LOG(info, "GGM 140 Buff========DLT376========= ~p", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
{Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []),
|
||||
?LOG(info, "GGM 246 Frames ~p", [Frames]),
|
||||
case Frames of
|
||||
% 返回抄表数据
|
||||
[#{<<"di">> :=<<16#01,16#01,16#01,16#10>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
[#{<<"di">> := <<16#01, 16#01, 16#01, 16#10>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/post">>, % 发送给mqtt进行数据存储
|
||||
dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
|
||||
_ -> pass
|
||||
end;
|
||||
% 返回读取上次合闸时间
|
||||
[#{<<"di">> :=<<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
[#{<<"di">> := <<16#1E, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
?LOG(info, "GGM 246 jsx:encode(Value) ~p", [jsx:encode(Value)]),
|
||||
dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
|
||||
_ -> pass
|
||||
end;
|
||||
% 返回读取上次拉闸时间
|
||||
[#{<<"di">> :=<<16#1D,16#00,16#01,16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
[#{<<"di">> := <<16#1D, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
?LOG(info, "GGM 245 jsx:encode(Value) ~p", [jsx:encode(Value)]),
|
||||
dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
|
||||
_ -> pass
|
||||
end;
|
||||
% 拉闸,合闸成功
|
||||
[#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FE>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
[#{<<"di">> := <<16#FE, 16#FE, 16#FE, 16#FE>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
|
||||
_ -> pass
|
||||
end;
|
||||
% 拉闸,合闸失败
|
||||
[#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FD>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
[#{<<"di">> := <<16#FE, 16#FE, 16#FE, 16#FD>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
|
||||
_ -> pass
|
||||
end;
|
||||
_ -> pass
|
||||
@ -281,193 +274,73 @@ handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId,protocol = Protocol,
|
||||
{noreply, TCPState#tcp{buff = Rest}};
|
||||
?DLT645 ->
|
||||
dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
?LOG(info, "GGM 222 Meter Rev Buff========DLT645========= ~p~n~n~n", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
{Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []),
|
||||
% ?LOG(info, "GGM 245 Frames ~p", [Frames]),
|
||||
case Frames of
|
||||
% 抄表数据返回
|
||||
[#{<<"command">> := 16#91,<<"di">>:=<<16#00,16#00,16#00,16#00>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
[#{<<"command">> := 16#91, <<"di">> := <<16#00, 16#00, 16#00, 16#00>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>,
|
||||
% ?LOG(info, "GGM 247 jsx:encode(Value) ~p", [jsx:encode(Value)]),
|
||||
dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
|
||||
_ -> pass
|
||||
end;
|
||||
% 查询上一次合闸时间返回
|
||||
[#{<<"command">>:=16#91,<<"di">> := <<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
Di = <<16#1E,16#00,16#01,16#01>>,
|
||||
DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
|
||||
?LOG(info, "GGM 248 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
|
||||
dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
|
||||
_ -> pass
|
||||
end;
|
||||
% 查询上一次拉闸时间返回
|
||||
[#{<<"command">>:=16#91,<<"di">> := <<16#1D,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
Di = <<16#1D,16#00,16#01,16#01>>,
|
||||
DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
|
||||
?LOG(info, "GGM 249 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
|
||||
dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
|
||||
_ -> pass
|
||||
end;
|
||||
% 拉闸,合闸成功
|
||||
[#{<<"command">>:=16#9C, <<"addr">> := Addr} | _] ->
|
||||
[#{<<"command">> := 16#91, <<"di">> := <<16#1E, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"data">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
Di = <<16#FE,16#FE,16#FE,16#FE>>,
|
||||
DValue = #{dgiot_utils:to_hex(Di)=>0 },
|
||||
?LOG(info, "GGM 250 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
|
||||
dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
|
||||
Di = <<16#1E, 16#00, 16#01, 16#01>>,
|
||||
DValue = #{dgiot_utils:to_hex(Di) => dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue));
|
||||
_ -> pass
|
||||
end;
|
||||
% 查询上一次拉闸时间返回
|
||||
[#{<<"command">> := 16#91, <<"di">> := <<16#1D, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"data">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
Di = <<16#1D, 16#00, 16#01, 16#01>>,
|
||||
DValue = #{dgiot_utils:to_hex(Di) => dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue));
|
||||
_ -> pass
|
||||
end;
|
||||
% 拉闸,合闸成功
|
||||
[#{<<"command">> := 16#9C, <<"addr">> := Addr} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
Di = <<16#FE, 16#FE, 16#FE, 16#FE>>,
|
||||
DValue = #{dgiot_utils:to_hex(Di) => 0},
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue));
|
||||
_ -> pass
|
||||
end;
|
||||
% 拉闸,合闸失败
|
||||
[#{<<"command">>:=16#DC, <<"addr">> := Addr, <<"data">> := Value} | _] ->
|
||||
[#{<<"command">> := 16#DC, <<"addr">> := Addr, <<"data">> := Value} | _] ->
|
||||
case dgiot_data:get({meter, ChannelId}) of
|
||||
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
Di = <<16#FE,16#FE,16#FE,16#FD>>,
|
||||
DValue = #{dgiot_utils:to_hex(Di)=>dgiot_utils:to_hex(Value)},
|
||||
?LOG(info, "GGM 251 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
|
||||
dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
|
||||
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
Di = <<16#FE, 16#FE, 16#FE, 16#FD>>,
|
||||
DValue = #{dgiot_utils:to_hex(Di) => dgiot_utils:to_hex(Value)},
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue));
|
||||
_ -> pass
|
||||
end;
|
||||
_ -> pass
|
||||
end,
|
||||
{noreply, TCPState#tcp{buff = Rest}};
|
||||
_ ->
|
||||
{noreply, TCPState#tcp{buff = <<0,0,0,0,0,0,0,0>>}}
|
||||
_ ->
|
||||
{noreply, TCPState#tcp{buff = <<0, 0, 0, 0, 0, 0, 0, 0>>}}
|
||||
end;
|
||||
|
||||
% %% 接收抄表任务的ACK报文DLT376
|
||||
% handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = ?DLT376,step = read_meter}} = TCPState) ->
|
||||
% dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
% ?LOG(info, "GGM 140 Buff========DLT376========= ~p", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
% {Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []),
|
||||
% ?LOG(info, "GGM 246 Frames ~p", [Frames]),
|
||||
% case Frames of
|
||||
% % 返回抄表数据
|
||||
% [#{<<"di">> :=<<16#01,16#01,16#01,16#10>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
% case dgiot_data:get({meter, ChannelId}) of
|
||||
% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/post">>, % 发送给mqtt进行数据存储
|
||||
% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
% _ -> pass
|
||||
% end;
|
||||
% % 返回读取上次合闸时间
|
||||
% [#{<<"di">> :=<<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
% case dgiot_data:get({meter, ChannelId}) of
|
||||
% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
% ?LOG(info, "GGM 246 jsx:encode(Value) ~p", [jsx:encode(Value)]),
|
||||
% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
% _ -> pass
|
||||
% end;
|
||||
% % 返回读取上次拉闸时间
|
||||
% [#{<<"di">> :=<<16#1D,16#00,16#01,16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
% case dgiot_data:get({meter, ChannelId}) of
|
||||
% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
% ?LOG(info, "GGM 245 jsx:encode(Value) ~p", [jsx:encode(Value)]),
|
||||
% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
% _ -> pass
|
||||
% end;
|
||||
% % 拉闸,合闸成功
|
||||
% [#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FE>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
% case dgiot_data:get({meter, ChannelId}) of
|
||||
% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
% _ -> pass
|
||||
% end;
|
||||
% % 拉闸,合闸失败
|
||||
% [#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FD>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
% case dgiot_data:get({meter, ChannelId}) of
|
||||
% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
|
||||
% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
% _ -> pass
|
||||
% end;
|
||||
% _ -> pass
|
||||
% end,
|
||||
% {noreply, TCPState#tcp{buff = Rest}};
|
||||
|
||||
|
||||
|
||||
% %% 接收抄表任务的ACK报文DLT645
|
||||
% handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = ?DLT645,step = read_meter}} = TCPState) ->
|
||||
% dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
% ?LOG(info, "GGM 222 Meter Rev Buff========DLT645========= ~p~n~n~n", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
% {Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []),
|
||||
% % ?LOG(info, "GGM 245 Frames ~p", [Frames]),
|
||||
% case Frames of
|
||||
% % 抄表数据返回
|
||||
% [#{<<"command">> := 16#91,<<"di">>:=<<16#00,16#00,16#00,16#00>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
|
||||
% case dgiot_data:get({meter, ChannelId}) of
|
||||
% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>,
|
||||
% % ?LOG(info, "GGM 247 jsx:encode(Value) ~p", [jsx:encode(Value)]),
|
||||
% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
|
||||
% _ -> pass
|
||||
% end;
|
||||
% % 查询上一次合闸时间返回
|
||||
% [#{<<"command">>:=16#91,<<"di">> := <<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] ->
|
||||
% case dgiot_data:get({meter, ChannelId}) of
|
||||
% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
% Di = <<16#1E,16#00,16#01,16#01>>,
|
||||
% DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
|
||||
% ?LOG(info, "GGM 248 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
|
||||
% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
|
||||
% _ -> pass
|
||||
% end;
|
||||
% % 查询上一次拉闸时间返回
|
||||
% [#{<<"command">>:=16#91,<<"di">> := <<16#1D,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] ->
|
||||
% case dgiot_data:get({meter, ChannelId}) of
|
||||
% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
% Di = <<16#1D,16#00,16#01,16#01>>,
|
||||
% DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
|
||||
% ?LOG(info, "GGM 249 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
|
||||
% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
|
||||
% _ -> pass
|
||||
% end;
|
||||
% % 拉闸,合闸成功
|
||||
% [#{<<"command">>:=16#9C, <<"addr">> := Addr} | _] ->
|
||||
% case dgiot_data:get({meter, ChannelId}) of
|
||||
% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
% Di = <<16#FE,16#FE,16#FE,16#FE>>,
|
||||
% DValue = #{dgiot_utils:to_hex(Di)=>0 },
|
||||
% ?LOG(info, "GGM 249 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
|
||||
% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
|
||||
% _ -> pass
|
||||
% end;
|
||||
% % 拉闸,合闸失败
|
||||
% [#{<<"command">>:=16#DC, <<"addr">> := Addr, <<"data">> := Value} | _] ->
|
||||
% case dgiot_data:get({meter, ChannelId}) of
|
||||
% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
|
||||
% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
|
||||
% Di = <<16#FE,16#FE,16#FE,16#FD>>,
|
||||
% DValue = #{dgiot_utils:to_hex(Di)=>dgiot_utils:to_hex(Value)},
|
||||
% ?LOG(info, "GGM 250 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
|
||||
% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
|
||||
% _ -> pass
|
||||
% end;
|
||||
% _ -> pass
|
||||
% end,
|
||||
% {noreply, TCPState#tcp{buff = Rest}};
|
||||
|
||||
%% 异常报文丢弃
|
||||
%% {stop, TCPState} | {stop, Reason} | {ok, TCPState} | ok | stop
|
||||
handle_info(_Info, TCPState) ->
|
||||
?LOG(info, "GGM 999 Meter Rev Buff========ERROR========= dgiot_meter_tcp, handle_info9, ~p,~p~n~n~n ",[_Info,TCPState]),
|
||||
?LOG(info, "GGM 999 Meter Rev Buff========ERROR========= dgiot_meter_tcp, handle_info9, ~p,~p~n~n~n ", [_Info, TCPState]),
|
||||
{noreply, TCPState}.
|
||||
|
||||
handle_call(_Msg, _From, TCPState) ->
|
||||
@ -477,7 +350,7 @@ handle_cast(_Msg, TCPState) ->
|
||||
{noreply, TCPState}.
|
||||
|
||||
terminate(_Reason, _TCPState) ->
|
||||
dgiot_metrics:dec(dgiot_meter, <<"dtu_login">>, 1),
|
||||
dgiot_metrics:dec(dgiot_meter, <<"dtu_online">>, 1),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, TCPState, _Extra) ->
|
||||
|
@ -374,7 +374,7 @@ get_app(ProductId, Results, DeviceId) ->
|
||||
end,
|
||||
FormattedAddress;
|
||||
_ ->
|
||||
<<"[", Longitude/binary, ",", Latitude/binary, "]经纬度解析错误"/utf8>>
|
||||
<<"[", BinV/binary, "]经纬度解析错误"/utf8>>
|
||||
end;
|
||||
_ ->
|
||||
<<"无GPS信息"/utf8>>
|
||||
|
@ -52,7 +52,7 @@ get_topo(Arg, _Context) ->
|
||||
end;
|
||||
_ ->
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, Devaddr),
|
||||
case dgiot_tdengine:get_device(<<"09d0bbcf44">>, <<"9CA525B343F0">>, #{<<"keys">> => <<"last_row(*)">>, <<"limit">> => 1}) of
|
||||
case dgiot_tdengine:get_device(ProductId, Devaddr, #{<<"keys">> => <<"last_row(*)">>, <<"limit">> => 1}) of
|
||||
{ok, #{<<"results">> := [Result | _]}} ->
|
||||
put({self(), td}, Result);
|
||||
_ ->
|
||||
@ -331,7 +331,7 @@ get_gpsaddr(V) ->
|
||||
#{<<"baiduaddr">> := #{<<"formatted_address">> := FormattedAddress}} ->
|
||||
FormattedAddress;
|
||||
_ ->
|
||||
<<"[", Longitude/binary, ",", Latitude/binary, "]经纬度解析错误"/utf8>>
|
||||
<<"[", BinV/binary, "]经纬度解析错误"/utf8>>
|
||||
end;
|
||||
_ ->
|
||||
<<"无GPS信息"/utf8>>
|
||||
|
Loading…
Reference in New Issue
Block a user