From 1eadb3acd12b3b6c7c818771999f2b5c3fc8b9a7 Mon Sep 17 00:00:00 2001 From: AvantLiu Date: Sat, 9 Oct 2021 12:55:34 +0800 Subject: [PATCH] feat: trace --- apps/dgiot/src/transport/dgiot_tcp_server.erl | 42 ++- apps/dgiot/src/utils/dgiot_tracer.erl | 14 +- apps/dgiot_meter/priv/dgiot_meter.metrics | 40 ++- apps/dgiot_meter/src/dgiot_meter_app.erl | 1 + apps/dgiot_meter/src/dgiot_meter_tcp.erl | 339 ++++++------------ .../src/handler/dgiot_tdengine_handler.erl | 2 +- apps/dgiot_topo/src/dgiot_topo.erl | 4 +- 7 files changed, 179 insertions(+), 263 deletions(-) diff --git a/apps/dgiot/src/transport/dgiot_tcp_server.erl b/apps/dgiot/src/transport/dgiot_tcp_server.erl index cde5f44f..50f2f17a 100644 --- a/apps/dgiot/src/transport/dgiot_tcp_server.erl +++ b/apps/dgiot/src/transport/dgiot_tcp_server.erl @@ -99,7 +99,7 @@ handle_info({tcp_passive, _Sock}, State) -> %% add register function handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{register = false, buff = Buff, socket = Sock} = ChildState} = State) -> - dgiot_metrics:inc(dgiot_bridge,<<"tcp_server_recv">>,1), + dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_recv">>, 1), Binary = iolist_to_binary(Data), NewBin = case binary:referenced_byte_size(Binary) of @@ -118,14 +118,14 @@ handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{register = false, Port = dgiot_utils:get_port(Sock), dgiot_cm:insert_channel_info(ClientId,#{ip => Ip, port => Port,online => dgiot_datetime:now_microsecs()},[{tcp_recv, 1}]), {noreply, State#state{child = NewChild, incoming_bytes = Cnt}, hibernate}; - {noreply, NewChild} -> + {noreply, NewChild} -> {noreply, State#state{child = NewChild, incoming_bytes = Cnt}, hibernate}; {stop, Reason, NewChild} -> {stop, Reason, State#state{child = NewChild}} end; handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{buff = Buff, socket = Sock} = ChildState} = State) -> - dgiot_metrics:inc(dgiot_bridge,<<"tcp_server_recv">>,1), + dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_recv">>, 1), Binary = iolist_to_binary(Data), NewBin = case binary:referenced_byte_size(Binary) of @@ -135,9 +135,15 @@ handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{buff = Buff, socke Binary end, write_log(ChildState#tcp.log, <<"RECV">>, NewBin), - ?LOG(info,"ChildState ~p",[ChildState]), + ?LOG(info, "ChildState ~p", [ChildState]), Cnt = byte_size(NewBin), NewChildState = ChildState#tcp{buff = <<>>}, + case NewChildState of + #tcp{clientid = CliendId, register = true}-> + dgiot_tracer:check_trace(CliendId, CliendId, Binary); + _-> + pass + end, case Mod:handle_info({tcp, <>}, NewChildState) of {noreply, NewChild} -> {noreply, State#state{child = NewChild, incoming_bytes = Cnt}, hibernate}; @@ -146,25 +152,25 @@ handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{buff = Buff, socke end; handle_info({shutdown, Reason}, #state{child = #tcp{clientid = CliendId, register = true} = ChildState} = State) -> - ?LOG(error,"shutdown, ~p, ~p~n", [Reason, ChildState#tcp.state]), - dgiot_cm:connection_closed(CliendId), + ?LOG(error, "shutdown, ~p, ~p~n", [Reason, ChildState#tcp.state]), + dgiot_cm:unregister_channel(CliendId), write_log(ChildState#tcp.log, <<"ERROR">>, list_to_binary(io_lib:format("~w", [Reason]))), {stop, normal, State#state{child = ChildState#tcp{socket = undefined}}}; handle_info({shutdown, Reason}, #state{child = ChildState} = State) -> - ?LOG(error,"shutdown, ~p, ~p~n", [Reason, ChildState#tcp.state]), + ?LOG(error, "shutdown, ~p, ~p~n", [Reason, ChildState#tcp.state]), write_log(ChildState#tcp.log, <<"ERROR">>, list_to_binary(io_lib:format("~w", [Reason]))), {stop, normal, State#state{child = ChildState#tcp{socket = undefined}}}; handle_info({tcp_error, _Sock, Reason}, #state{child = ChildState} = State) -> - ?LOG(error,"tcp_error, ~p, ~p~n", [Reason, ChildState#tcp.state]), + ?LOG(error, "tcp_error, ~p, ~p~n", [Reason, ChildState#tcp.state]), write_log(ChildState#tcp.log, <<"ERROR">>, list_to_binary(io_lib:format("~w", [Reason]))), {stop, {shutdown, Reason}, State}; handle_info({tcp_closed, Sock}, #state{mod = Mod, child = #tcp{socket = Sock} = ChildState} = State) -> write_log(ChildState#tcp.log, <<"ERROR">>, <<"tcp_closed">>), - ?LOG(error,"tcp_closed ~p", [ChildState#tcp.state]), + ?LOG(error, "tcp_closed ~p", [ChildState#tcp.state]), case Mod:handle_info(tcp_closed, ChildState) of {noreply, NewChild} -> {stop, normal, State#state{child = NewChild#tcp{socket = undefined}}}; @@ -181,12 +187,12 @@ handle_info(Info, #state{mod = Mod, child = ChildState} = State) -> end. terminate(Reason, #state{mod = Mod, child = #tcp{clientid = CliendId, register = true} = ChildState}) -> - dgiot_cm:connection_closed(CliendId), - dgiot_metrics:dec(dgiot_bridge,<<"tcp_server">>, 1), + dgiot_cm:unregister_channel(CliendId), + dgiot_metrics:dec(dgiot_bridge, <<"tcp_server">>, 1), Mod:terminate(Reason, ChildState); terminate(Reason, #state{mod = Mod, child = ChildState}) -> - dgiot_metrics:dec(dgiot_bridge,<<"tcp_server">>,1), + dgiot_metrics:dec(dgiot_bridge, <<"tcp_server">>, 1), Mod:terminate(Reason, ChildState). code_change(OldVsn, #state{mod = Mod, child = ChildState} = State, Extra) -> @@ -197,6 +203,17 @@ code_change(OldVsn, #state{mod = Mod, child = ChildState} = State, Extra) -> %%% Internal functions %%%=================================================================== +send(#tcp{clientid = CliendId, register = true, transport = Transport, socket = Socket, log = Log}, Payload) -> + dgiot_tracer:check_trace(CliendId, CliendId, Payload), + dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_send">>, 1), + write_log(Log, <<"SEND">>, Payload), + case Socket == undefined of + true -> + {error, disconnected}; + false -> + Transport:send(Socket, Payload) + end; + send(#tcp{transport = Transport, socket = Socket, log = Log}, Payload) -> dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_send">>, 1), write_log(Log, <<"SEND">>, Payload), @@ -207,6 +224,7 @@ send(#tcp{transport = Transport, socket = Socket, log = Log}, Payload) -> Transport:send(Socket, Payload) end. + rate_limit({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). diff --git a/apps/dgiot/src/utils/dgiot_tracer.erl b/apps/dgiot/src/utils/dgiot_tracer.erl index b709e99f..18686bb2 100644 --- a/apps/dgiot/src/utils/dgiot_tracer.erl +++ b/apps/dgiot/src/utils/dgiot_tracer.erl @@ -83,23 +83,23 @@ get_trace({Type, Id}) when is_list(Id) -> get_trace({Type, Id}) when is_atom(Id) -> get_trace({Type, atom_to_binary(Id)}); get_trace({clientid, ClientId}) -> - ets:member(?DGIOT_CLIENT_TRACE, ClientId); + ets:member(?DGIOT_CLIENT_TRACE, ClientId); get_trace({topic, Topic}) -> - lists:any(fun({emqx_topic_trace,TopicFilter, _}) -> + lists:any(fun({emqx_topic_trace, TopicFilter, _}) -> emqx_topic:match(Topic, TopicFilter) end, ets:tab2list(?DGIOT_TOPIC_TRACE)); get_trace(_) -> false. -check_trace(From, Topic,Payload) -> +check_trace(From, Topic, Payload) -> case get_trace({clientid, From}) of true -> BinClientId = dgiot_utils:to_binary(From), - dgiot_mqtt:publish(self(), <<"logger_trace/trace/", BinClientId/binary,"/", Topic/binary>>, Payload); + dgiot_mqtt:publish(self(), <<"logger_trace/trace/", BinClientId/binary, "/", Topic/binary>>, Payload); false -> case get_trace({topic, Topic}) of true -> - dgiot_mqtt:publish(self(), <<"logger_trace/trace/", Topic/binary>>, Payload); + dgiot_mqtt:publish(self(), <<"logger_trace/trace/", Topic/binary>>, Payload); false -> false end @@ -122,7 +122,7 @@ insert(?DGIOT_CLIENT_TRACE, Key, Value) -> end, case get(?DGIOT_CLIENT_TRACE, Key) of not_find -> - insert_(?DGIOT_CLIENT_TRACE,#?DGIOT_CLIENT_TRACE{key = Key, value = Value}); + insert_(?DGIOT_CLIENT_TRACE, #?DGIOT_CLIENT_TRACE{key = Key, value = Value}); _ -> pass end, insert_(?DGIOT_CLIENT_TRACE, #?DGIOT_CLIENT_TRACE{key = Key, value = Value}); @@ -136,7 +136,7 @@ insert(?DGIOT_TOPIC_TRACE, Key, Value) -> end, case get(?DGIOT_TOPIC_TRACE, Key) of not_find -> - insert_(?DGIOT_TOPIC_TRACE,#?DGIOT_TOPIC_TRACE{key = Key, value = Value}); + insert_(?DGIOT_TOPIC_TRACE, #?DGIOT_TOPIC_TRACE{key = Key, value = Value}); _ -> pass end, insert_(?DGIOT_TOPIC_TRACE, #?DGIOT_TOPIC_TRACE{key = Key, value = Value}). diff --git a/apps/dgiot_meter/priv/dgiot_meter.metrics b/apps/dgiot_meter/priv/dgiot_meter.metrics index 24bc7d8f..5c089e07 100644 --- a/apps/dgiot_meter/priv/dgiot_meter.metrics +++ b/apps/dgiot_meter/priv/dgiot_meter.metrics @@ -1,20 +1,44 @@ [ { - "name": "dtu_login", - "help": "服务收到登陆报文统计", + "name": "dtu_online", + "help": "dtu在线数量", "type": "gauge", "labels": [] }, { - "name": "dtu_login_ack", - "help": "服务发送注册报文统计", + "name": "tcp_send", + "help": "TCP采集通道发包数", "type": "gauge", "labels": [] }, { - "name": "dtu_register", - "help": "服务收到注册报文统计", + "name": "mqtt_send", + "help": "MQTT任务通道发包数", + "type": "gauge", + "labels": [] + }, + { + "name": "tcp_revc", + "help": "TCP采集通道收包数", "type": "gauge", "labels": [] - } -] \ No newline at end of file + }, + { + "name": "mqtt_revc", + "help": "MQTT任务通道收包数", + "type": "gauge", + "labels": [] + }, + { + "name": "meter_register", + "help": "电表注册成功数量", + "type": "gauge", + "labels": [] + }, + { + "name": "search_meter", + "help": "搜表次数", + "type": "gauge", + "labels": [] + } +] diff --git a/apps/dgiot_meter/src/dgiot_meter_app.erl b/apps/dgiot_meter/src/dgiot_meter_app.erl index 065aebca..f9fce085 100644 --- a/apps/dgiot_meter/src/dgiot_meter_app.erl +++ b/apps/dgiot_meter/src/dgiot_meter_app.erl @@ -28,6 +28,7 @@ %%-------------------------------------------------------------------- start(_StartType, _StartArgs) -> + dgiot_metrics:start(dgiot_meter), {ok, Sup} = dgiot_meter_sup:start_link(), {ok, Sup}. diff --git a/apps/dgiot_meter/src/dgiot_meter_tcp.erl b/apps/dgiot_meter/src/dgiot_meter_tcp.erl index 9e743f33..8e90733a 100644 --- a/apps/dgiot_meter/src/dgiot_meter_tcp.erl +++ b/apps/dgiot_meter/src/dgiot_meter_tcp.erl @@ -36,35 +36,33 @@ init(TCPState) -> {ok, TCPState}. %%设备登录报文,登陆成功后,开始搜表 -% +% handle_info({tcp, DtuAddr}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = <<>>, search = Search} = State} = TCPState) -> DTUIP = dgiot_utils:get_ip(Socket), - ?LOG(info, "GGM 334 dgiot_meter_tcp, handle_info (登录) ~p,,,~p",[DtuAddr,TCPState]), - {_,[Acc|_]} = dlt376_decoder:parse_frame(DtuAddr,[]), + {_, [Acc | _]} = dlt376_decoder:parse_frame(DtuAddr, []), #{<<"msgtype">> := Protocol} = Acc, - case Protocol of + case Protocol of ?DLT376 -> #{<<"addr">> := MeterAddr} = Acc, dgiot_meter:create_meter4G(MeterAddr, ChannelId, DTUIP), - {ProductId, _,_} = dgiot_data:get({meter, ChannelId}), + {ProductId, _, _} = dgiot_data:get({meter, ChannelId}), + {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}), Topic = <<"thing/", ProductId/binary, "/", MeterAddr/binary>>, dgiot_mqtt:subscribe(Topic), %为这个设备订阅一个mqtt - % timer:sleep(500), - % TopicCtrl = <<"thingctrl/", ProductId/binary, "/", MeterAddr/binary>>, - % dgiot_mqtt:subscribe(TopicCtrl), + dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "dtu ~p (登录)", [dgiot_utils:binary_to_hex(DtuAddr)]), {NewRef, NewStep} = {undefined, read_meter}, - {noreply, TCPState#tcp{buff = <<>>, state = State#state{dtuaddr = MeterAddr, protocol = ?DLT376,ref = NewRef, step = NewStep}}}; + DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr), + dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1), + {noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = MeterAddr, protocol = ?DLT376, ref = NewRef, step = NewStep}}}; ?DLT645 -> - % HexDtuAddr = dgiot_utils:binary_to_hex(DtuAddr), - ?LOG(info, "GGM 335 dgiot_meter_tcp, handle_info (DLT645创建) ~p,",[DtuAddr]), dgiot_meter:create_dtu(DtuAddr, ChannelId, DTUIP), {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}), Topic = <<"profile/", DtuProductId/binary, "/", DtuAddr/binary>>, dgiot_mqtt:subscribe(Topic), - {NewRef, NewStep} = + {NewRef, NewStep} = case Search of <<"nosearch">> -> - [dgiot_task:save_pnque(DtuProductId, DtuAddr, Meterproductid, Meteraddr) || #{<<"product">> := #{<<"objectId">>:=Meterproductid}, <<"devaddr">> := Meteraddr} + [dgiot_task:save_pnque(DtuProductId, DtuAddr, Meterproductid, Meteraddr) || #{<<"product">> := #{<<"objectId">> := Meterproductid}, <<"devaddr">> := Meteraddr} <- dgiot_meter:get_sub_device(DtuAddr)], {undefined, read_meter}; <<"quick">> -> @@ -74,26 +72,29 @@ handle_info({tcp, DtuAddr}, #tcp{socket = Socket, state = #state{id = ChannelId, {Ref, Step, _Payload} = dgiot_meter:search_meter(tcp, undefined, TCPState, 1), {Ref, Step} end, - {noreply, TCPState#tcp{buff = <<>>, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT645,ref = NewRef, step = NewStep}}}; + dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "dtu ~p (登录)", [dgiot_utils:binary_to_hex(DtuAddr)]), + DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr), + dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1), + {noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT645, ref = NewRef, step = NewStep}}}; _ -> - ?LOG(info, "GGM 334 dgiot_meter_tcp, handle_info (登录s失败) ~p",[dgiot_utils:binary_to_hex(DtuAddr)]), + ?LOG(info, "GGM 334 dgiot_meter_tcp, handle_info (登录s失败) ~p", [dgiot_utils:binary_to_hex(DtuAddr)]), dgiot_utils:binary_to_hex(DtuAddr) end; -%%定时器触发搜表 -handle_info(search_meter, #tcp{state = #state{ref = Ref,protocol = ?DLT645} = State} = TCPState) -> +%%定时器触发搜表 +handle_info(search_meter, #tcp{state = #state{ref = Ref, protocol = ?DLT645} = State} = TCPState) -> {NewRef, Step, _Payload} = dgiot_meter:search_meter(tcp, Ref, TCPState, 1), {noreply, TCPState#tcp{buff = <<>>, state = State#state{ref = NewRef, step = Step}}}; %%ACK报文触发搜表 -handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = DtuAddr, protocol = ?DLT645,ref = Ref, step = search_meter, search = Search} = State} = TCPState) -> - ?LOG(info, "from_dev: search_meter Buff ~p", [dgiot_utils:binary_to_hex(Buff)]), - ?LOG(info, "from_dev: parse_frame Buff ~p", [dgiot_meter:parse_frame(?DLT645, Buff, [])]), +handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = DtuAddr, protocol = ?DLT645, ref = Ref, step = search_meter, search = Search} = State} = TCPState) -> + dgiot_metrics:inc(dgiot_meter, <<"search_meter">>, 1), + {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}), + dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "dtu ~p (搜表)", [dgiot_utils:binary_to_hex(Buff)]), {Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []), lists:map(fun(X) -> case X of #{<<"addr">> := Addr} -> - ?LOG(info, "from_dev: search_meter Addr ~p", [Addr]), DTUIP = dgiot_utils:get_ip(Socket), dgiot_meter:create_meter(dgiot_utils:binary_to_hex(Addr), ChannelId, DTUIP, DtuAddr); Other -> @@ -115,31 +116,31 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt end; %%接受抄表任务命令抄表(下发指令) -handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId}} = TCPState) -> +handle_info({deliver, Topic, Msg}, #tcp{state = #state{id = _ChannelId}} = TCPState) -> Payload = dgiot_mqtt:get_payload(Msg), - dgiot_bridge:send_log(ChannelId, "Topic ~p Msg ~p", [dgiot_mqtt:get_topic(Msg), Payload]), + dgiot_metrics:inc(dgiot_meter, <<"mqtt_revc">>, 1), case jsx:is_json(Payload) of true -> case binary:split(dgiot_mqtt:get_topic(Msg), <<$/>>, [global, trim]) of - [<<"thing">>, _ProductId, _DevAddr] -> + [<<"thing">>, ProductId, DevAddr] -> + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), #tcp{state = #state{protocol = Protocol}} = TCPState, - case Protocol of + case Protocol of ?DLT376 -> [#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]), Payload1 = dgiot_meter:to_frame(ThingData), - dgiot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]), - % ?LOG(info, "GGM 214 task->dev: Payload ~p", [dgiot_utils:binary_to_hex(Payload1)]), + dgiot_tracer:check_trace(DeviceId, Topic, Payload1), dgiot_tcp_server:send(TCPState, Payload1); ?DLT645 -> [#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]), Payload1 = dgiot_meter:to_frame(ThingData), - % ?LOG(info, "GGM 216 dgiot_meter_tcp, Payload1 ~p,~p",[ChannelId,Payload1]), - dgiot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]), dgiot_tcp_server:send(TCPState, Payload1) end; - [<<"profile">>, _ProductId, _DtuAddr] -> + [<<"profile">>, ProductId, DevAddr] -> + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), case Payload of #{<<"_dgiotprotocol">> := <<"hex">>} -> + dgiot_tracer:check_trace(DeviceId, Topic, Payload), maps:fold(fun(_K, V, Acc) -> dgiot_tcp_server:send(TCPState, dgiot_utils:hex_to_binary(V)), Acc @@ -147,49 +148,44 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId}} = TCPSt _ -> pass end; - [<<"thingctrl">>, _, _DtuAddr] -> + [<<"thingctrl">>, ProductId, DevAddr] -> + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), #tcp{state = #state{protocol = Protocol}} = TCPState, - ?LOG(info, "GGM 250 dgiot_meter_tcp, thingctrl ~p",[TCPState]), - case Protocol of + case Protocol of ?DLT376 -> [#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]), - % ?LOG(info, "GGM 205 dgiot_meter_tcp, handle_info10 ~p",[ThingData]), case ThingData of % 远程控制电表拉闸、合闸 - #{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"devpass">> := DevPass,<<"apiname">> := <<"get_meter_ctrl">>} -> - % ?LOG(info, "GGM 206 dgiot_meter_tcp, handle_info9 ~p,~p,~p,~p",[DevAddr,CtrlFlag,DevPass,Protocol]), + #{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"devpass">> := DevPass, <<"apiname">> := <<"get_meter_ctrl">>} -> ThingData1 = #{ <<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"devpass">> => DevPass, - <<"protocol">> => Protocol , + <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl }, Payload1 = dgiot_meter:to_frame(ThingData1), - % diot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]), - dgiot_tcp_server:send(TCPState, Payload1); + dgiot_tcp_server:send(DeviceId, TCPState, Payload1); % 获取上次拉闸、合闸的时间(一次发送两条查询指令) - #{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"apiname">> := <<"get_meter_ctrl_status">>} -> - ?LOG(info, "GGM 235 dgiot_meter_tcp, handle_info9 ~p,~p",[DevAddr,Protocol]), + #{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"apiname">> := <<"get_meter_ctrl_status">>} -> % 上次合闸时间 ThingData1 = #{ <<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl_status - }, + }, Payload1 = dgiot_meter:to_frame(ThingData1), dgiot_tcp_server:send(TCPState, Payload1); - _-> - ?LOG(info, "GGM 236 dgiot_meter_tcp, handle_info9 ~p",[ThingData]), + _ -> + ?LOG(info, "GGM 236 dgiot_meter_tcp, handle_info9 ~p", [ThingData]), pass end; ?DLT645 -> [#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]), - ?LOG(info, "GGM 211 dgiot_meter_tcp, handle_info10 ~p",[ThingData]), case ThingData of % 远程控制电表拉闸、合闸 - #{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"devpass">> := DevPass,<<"apiname">> := <<"get_meter_ctrl">>} -> + #{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"devpass">> := DevPass, <<"apiname">> := <<"get_meter_ctrl">>} -> % ?LOG(info, "GGM 212 dgiot_meter_tcp, handle_info9 ~p,~p,~p,~p",[DevAddr,CtrlFlag,DevPass,Protocol]), ThingData1 = #{ <<"devaddr">> => DevAddr, @@ -199,21 +195,18 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId}} = TCPSt <<"apiname">> => get_meter_ctrl }, Payload1 = dgiot_meter:to_frame(ThingData1), - ?LOG(info, "GGM 213 dgiot_meter_tcp, Payload1 ~p,~p",[ChannelId,dgiot_utils:to_hex(Payload1)]), dgiot_tcp_server:send(TCPState, Payload1); % 获取上次拉闸、合闸的时间(一次发送两条查询指令) - #{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"apiname">> := <<"get_meter_ctrl_status">>} -> - ?LOG(info, "GGM 236 dgiot_meter_tcp, handle_info9 ~p,~p",[DevAddr,Protocol]), + #{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"apiname">> := <<"get_meter_ctrl_status">>} -> ThingData1 = #{ <<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl_status - }, + }, Payload1 = dgiot_meter:to_frame(ThingData1), - ?LOG(info, "GGM 237 dgiot_meter_tcp, Payload1 ~p,~p",[ChannelId,dgiot_utils:to_hex(Payload1)]), dgiot_tcp_server:send(TCPState, Payload1); - _-> + _ -> pass end end; @@ -225,55 +218,55 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId}} = TCPSt {noreply, TCPState}; -handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId,protocol = Protocol,step = Step}} = TCPState) -> - ?LOG(info, "GGM 333 Buff========ALL REST========= ~p,~p,~p~n~n~n", [ChannelId,Step,dgiot_utils:binary_to_hex(Buff)]), - case Protocol of +handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = Protocol, step = _Step}} = TCPState) -> + case Protocol of ?DLT376 -> dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]), - ?LOG(info, "GGM 140 Buff========DLT376========= ~p", [dgiot_utils:binary_to_hex(Buff)]), {Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []), - ?LOG(info, "GGM 246 Frames ~p", [Frames]), case Frames of % 返回抄表数据 - [#{<<"di">> :=<<16#01,16#01,16#01,16#10>>,<<"addr">> := Addr, <<"value">> := Value} | _] -> + [#{<<"di">> := <<16#01, 16#01, 16#01, 16#10>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/post">>, % 发送给mqtt进行数据存储 - dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value)); _ -> pass end; % 返回读取上次合闸时间 - [#{<<"di">> :=<<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr, <<"value">> := Value} | _] -> + [#{<<"di">> := <<16#1E, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), - Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, - ?LOG(info, "GGM 246 jsx:encode(Value) ~p", [jsx:encode(Value)]), - dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); + Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value)); _ -> pass end; % 返回读取上次拉闸时间 - [#{<<"di">> :=<<16#1D,16#00,16#01,16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> + [#{<<"di">> := <<16#1D, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), - Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, - ?LOG(info, "GGM 245 jsx:encode(Value) ~p", [jsx:encode(Value)]), - dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); + Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value)); _ -> pass end; % 拉闸,合闸成功 - [#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FE>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> + [#{<<"di">> := <<16#FE, 16#FE, 16#FE, 16#FE>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), - Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, - dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); + Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value)); _ -> pass end; % 拉闸,合闸失败 - [#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FD>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> + [#{<<"di">> := <<16#FE, 16#FE, 16#FE, 16#FD>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), - Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, - dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); + Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value)); _ -> pass end; _ -> pass @@ -281,193 +274,73 @@ handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId,protocol = Protocol, {noreply, TCPState#tcp{buff = Rest}}; ?DLT645 -> dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]), - ?LOG(info, "GGM 222 Meter Rev Buff========DLT645========= ~p~n~n~n", [dgiot_utils:binary_to_hex(Buff)]), {Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []), % ?LOG(info, "GGM 245 Frames ~p", [Frames]), case Frames of % 抄表数据返回 - [#{<<"command">> := 16#91,<<"di">>:=<<16#00,16#00,16#00,16#00>>,<<"addr">> := Addr, <<"value">> := Value} | _] -> + [#{<<"command">> := 16#91, <<"di">> := <<16#00, 16#00, 16#00, 16#00>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>, - % ?LOG(info, "GGM 247 jsx:encode(Value) ~p", [jsx:encode(Value)]), - dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value)); _ -> pass end; % 查询上一次合闸时间返回 - [#{<<"command">>:=16#91,<<"di">> := <<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] -> - case dgiot_data:get({meter, ChannelId}) of - {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), - Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, - Di = <<16#1E,16#00,16#01,16#01>>, - DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)}, - ?LOG(info, "GGM 248 jsx:encode(Value) ~p", [jsx:encode(DValue)]), - dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue)); - _ -> pass - end; - % 查询上一次拉闸时间返回 - [#{<<"command">>:=16#91,<<"di">> := <<16#1D,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] -> - case dgiot_data:get({meter, ChannelId}) of - {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), - Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, - Di = <<16#1D,16#00,16#01,16#01>>, - DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)}, - ?LOG(info, "GGM 249 jsx:encode(Value) ~p", [jsx:encode(DValue)]), - dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue)); - _ -> pass - end; - % 拉闸,合闸成功 - [#{<<"command">>:=16#9C, <<"addr">> := Addr} | _] -> + [#{<<"command">> := 16#91, <<"di">> := <<16#1E, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"data">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, - Di = <<16#FE,16#FE,16#FE,16#FE>>, - DValue = #{dgiot_utils:to_hex(Di)=>0 }, - ?LOG(info, "GGM 250 jsx:encode(Value) ~p", [jsx:encode(DValue)]), - dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue)); + Di = <<16#1E, 16#00, 16#01, 16#01>>, + DValue = #{dgiot_utils:to_hex(Di) => dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)}, + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue)); + _ -> pass + end; + % 查询上一次拉闸时间返回 + [#{<<"command">> := 16#91, <<"di">> := <<16#1D, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"data">> := Value} | _] -> + case dgiot_data:get({meter, ChannelId}) of + {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), + Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, + Di = <<16#1D, 16#00, 16#01, 16#01>>, + DValue = #{dgiot_utils:to_hex(Di) => dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)}, + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue)); + _ -> pass + end; + % 拉闸,合闸成功 + [#{<<"command">> := 16#9C, <<"addr">> := Addr} | _] -> + case dgiot_data:get({meter, ChannelId}) of + {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), + Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, + Di = <<16#FE, 16#FE, 16#FE, 16#FE>>, + DValue = #{dgiot_utils:to_hex(Di) => 0}, + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue)); _ -> pass end; % 拉闸,合闸失败 - [#{<<"command">>:=16#DC, <<"addr">> := Addr, <<"data">> := Value} | _] -> + [#{<<"command">> := 16#DC, <<"addr">> := Addr, <<"data">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), - Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, - Di = <<16#FE,16#FE,16#FE,16#FD>>, - DValue = #{dgiot_utils:to_hex(Di)=>dgiot_utils:to_hex(Value)}, - ?LOG(info, "GGM 251 jsx:encode(Value) ~p", [jsx:encode(DValue)]), - dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue)); + Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, + Di = <<16#FE, 16#FE, 16#FE, 16#FD>>, + DValue = #{dgiot_utils:to_hex(Di) => dgiot_utils:to_hex(Value)}, + DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr), + dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue)); _ -> pass end; _ -> pass end, {noreply, TCPState#tcp{buff = Rest}}; - _ -> - {noreply, TCPState#tcp{buff = <<0,0,0,0,0,0,0,0>>}} + _ -> + {noreply, TCPState#tcp{buff = <<0, 0, 0, 0, 0, 0, 0, 0>>}} end; -% %% 接收抄表任务的ACK报文DLT376 -% handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = ?DLT376,step = read_meter}} = TCPState) -> -% dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]), -% ?LOG(info, "GGM 140 Buff========DLT376========= ~p", [dgiot_utils:binary_to_hex(Buff)]), -% {Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []), -% ?LOG(info, "GGM 246 Frames ~p", [Frames]), -% case Frames of -% % 返回抄表数据 -% [#{<<"di">> :=<<16#01,16#01,16#01,16#10>>,<<"addr">> := Addr, <<"value">> := Value} | _] -> -% case dgiot_data:get({meter, ChannelId}) of -% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), -% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/post">>, % 发送给mqtt进行数据存储 -% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); -% _ -> pass -% end; -% % 返回读取上次合闸时间 -% [#{<<"di">> :=<<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr, <<"value">> := Value} | _] -> -% case dgiot_data:get({meter, ChannelId}) of -% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), -% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, -% ?LOG(info, "GGM 246 jsx:encode(Value) ~p", [jsx:encode(Value)]), -% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); -% _ -> pass -% end; -% % 返回读取上次拉闸时间 -% [#{<<"di">> :=<<16#1D,16#00,16#01,16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> -% case dgiot_data:get({meter, ChannelId}) of -% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), -% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, -% ?LOG(info, "GGM 245 jsx:encode(Value) ~p", [jsx:encode(Value)]), -% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); -% _ -> pass -% end; -% % 拉闸,合闸成功 -% [#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FE>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> -% case dgiot_data:get({meter, ChannelId}) of -% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), -% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, -% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); -% _ -> pass -% end; -% % 拉闸,合闸失败 -% [#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FD>>, <<"addr">> := Addr, <<"value">> := Value} | _] -> -% case dgiot_data:get({meter, ChannelId}) of -% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), -% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>, -% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); -% _ -> pass -% end; -% _ -> pass -% end, -% {noreply, TCPState#tcp{buff = Rest}}; - - - -% %% 接收抄表任务的ACK报文DLT645 -% handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = ?DLT645,step = read_meter}} = TCPState) -> -% dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]), -% ?LOG(info, "GGM 222 Meter Rev Buff========DLT645========= ~p~n~n~n", [dgiot_utils:binary_to_hex(Buff)]), -% {Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []), -% % ?LOG(info, "GGM 245 Frames ~p", [Frames]), -% case Frames of -% % 抄表数据返回 -% [#{<<"command">> := 16#91,<<"di">>:=<<16#00,16#00,16#00,16#00>>,<<"addr">> := Addr, <<"value">> := Value} | _] -> -% case dgiot_data:get({meter, ChannelId}) of -% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), -% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>, -% % ?LOG(info, "GGM 247 jsx:encode(Value) ~p", [jsx:encode(Value)]), -% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value)); -% _ -> pass -% end; -% % 查询上一次合闸时间返回 -% [#{<<"command">>:=16#91,<<"di">> := <<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] -> -% case dgiot_data:get({meter, ChannelId}) of -% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), -% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, -% Di = <<16#1E,16#00,16#01,16#01>>, -% DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)}, -% ?LOG(info, "GGM 248 jsx:encode(Value) ~p", [jsx:encode(DValue)]), -% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue)); -% _ -> pass -% end; -% % 查询上一次拉闸时间返回 -% [#{<<"command">>:=16#91,<<"di">> := <<16#1D,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] -> -% case dgiot_data:get({meter, ChannelId}) of -% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), -% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, -% Di = <<16#1D,16#00,16#01,16#01>>, -% DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)}, -% ?LOG(info, "GGM 249 jsx:encode(Value) ~p", [jsx:encode(DValue)]), -% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue)); -% _ -> pass -% end; -% % 拉闸,合闸成功 -% [#{<<"command">>:=16#9C, <<"addr">> := Addr} | _] -> -% case dgiot_data:get({meter, ChannelId}) of -% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), -% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, -% Di = <<16#FE,16#FE,16#FE,16#FE>>, -% DValue = #{dgiot_utils:to_hex(Di)=>0 }, -% ?LOG(info, "GGM 249 jsx:encode(Value) ~p", [jsx:encode(DValue)]), -% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue)); -% _ -> pass -% end; -% % 拉闸,合闸失败 -% [#{<<"command">>:=16#DC, <<"addr">> := Addr, <<"data">> := Value} | _] -> -% case dgiot_data:get({meter, ChannelId}) of -% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), -% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, -% Di = <<16#FE,16#FE,16#FE,16#FD>>, -% DValue = #{dgiot_utils:to_hex(Di)=>dgiot_utils:to_hex(Value)}, -% ?LOG(info, "GGM 250 jsx:encode(Value) ~p", [jsx:encode(DValue)]), -% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue)); -% _ -> pass -% end; -% _ -> pass -% end, -% {noreply, TCPState#tcp{buff = Rest}}; - %% 异常报文丢弃 %% {stop, TCPState} | {stop, Reason} | {ok, TCPState} | ok | stop handle_info(_Info, TCPState) -> - ?LOG(info, "GGM 999 Meter Rev Buff========ERROR========= dgiot_meter_tcp, handle_info9, ~p,~p~n~n~n ",[_Info,TCPState]), + ?LOG(info, "GGM 999 Meter Rev Buff========ERROR========= dgiot_meter_tcp, handle_info9, ~p,~p~n~n~n ", [_Info, TCPState]), {noreply, TCPState}. handle_call(_Msg, _From, TCPState) -> @@ -477,7 +350,7 @@ handle_cast(_Msg, TCPState) -> {noreply, TCPState}. terminate(_Reason, _TCPState) -> - dgiot_metrics:dec(dgiot_meter, <<"dtu_login">>, 1), + dgiot_metrics:dec(dgiot_meter, <<"dtu_online">>, 1), ok. code_change(_OldVsn, TCPState, _Extra) -> diff --git a/apps/dgiot_tdengine/src/handler/dgiot_tdengine_handler.erl b/apps/dgiot_tdengine/src/handler/dgiot_tdengine_handler.erl index 51d03415..bdf0e4df 100644 --- a/apps/dgiot_tdengine/src/handler/dgiot_tdengine_handler.erl +++ b/apps/dgiot_tdengine/src/handler/dgiot_tdengine_handler.erl @@ -374,7 +374,7 @@ get_app(ProductId, Results, DeviceId) -> end, FormattedAddress; _ -> - <<"[", Longitude/binary, ",", Latitude/binary, "]经纬度解析错误"/utf8>> + <<"[", BinV/binary, "]经纬度解析错误"/utf8>> end; _ -> <<"无GPS信息"/utf8>> diff --git a/apps/dgiot_topo/src/dgiot_topo.erl b/apps/dgiot_topo/src/dgiot_topo.erl index 98ad5b54..790fd432 100644 --- a/apps/dgiot_topo/src/dgiot_topo.erl +++ b/apps/dgiot_topo/src/dgiot_topo.erl @@ -52,7 +52,7 @@ get_topo(Arg, _Context) -> end; _ -> DeviceId = dgiot_parse:get_deviceid(ProductId, Devaddr), - case dgiot_tdengine:get_device(<<"09d0bbcf44">>, <<"9CA525B343F0">>, #{<<"keys">> => <<"last_row(*)">>, <<"limit">> => 1}) of + case dgiot_tdengine:get_device(ProductId, Devaddr, #{<<"keys">> => <<"last_row(*)">>, <<"limit">> => 1}) of {ok, #{<<"results">> := [Result | _]}} -> put({self(), td}, Result); _ -> @@ -331,7 +331,7 @@ get_gpsaddr(V) -> #{<<"baiduaddr">> := #{<<"formatted_address">> := FormattedAddress}} -> FormattedAddress; _ -> - <<"[", Longitude/binary, ",", Latitude/binary, "]经纬度解析错误"/utf8>> + <<"[", BinV/binary, "]经纬度解析错误"/utf8>> end; _ -> <<"无GPS信息"/utf8>>