fix: task and meter

This commit is contained in:
jhonliu 2022-05-20 10:29:01 +08:00
parent 8a2108f71a
commit 2a36b88007
4 changed files with 23 additions and 70 deletions

View File

@ -122,8 +122,8 @@ create_meter4G(MeterAddr, MDa, ChannelId, DTUIP, DtuId, DtuAddr) ->
},
dgiot_device:create_device(Requests),
DeviceId = dgiot_parse_id:get_deviceid(ProductId, MeterAddr),
dgiot_data:insert({metetda, DeviceId}, {dgiot_utils:to_binary(MDa), DtuAddr}),
Topic = <<"$dg/device/", ProductId/binary, "/", MeterAddr/binary, "/profile">>,
dgiot_data:insert({metertda, DeviceId}, {dgiot_utils:to_binary(MDa), DtuAddr}),
Topic = <<"$dg/device/", ProductId/binary, "/", MeterAddr/binary, "/properties/report">>,
dgiot_mqtt:subscribe(Topic),
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
dgiot_task:save_pnque(DtuProductId, DtuAddr, ProductId, MeterAddr);

View File

@ -75,7 +75,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
case Protocol of
?DLT376 ->
{ProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
Topic = <<"thing/", ProductId/binary, "/", DtuAddr/binary>>,
Topic = <<"$dg/device/", ProductId/binary, "/", DtuAddr/binary, "/properties/report">>,
dgiot_mqtt:subscribe(Topic), %mqtt
%% $dg/device/{productId}/{deviceAddr}/profile
Topic2 = <<"$dg/device/", ProductId/binary, "/", DtuAddr/binary, "/profile">>,
@ -212,20 +212,20 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, protocol
case jsx:is_json(Payload) of
true ->
case binary:split(dgiot_mqtt:get_topic(Msg), <<$/>>, [global, trim]) of
[<<"thing">>, ProductId, DevAddr] ->
[<<"$dg">>, <<"device">>, ProductId, DevAddr, <<"properties">>, <<"report">>] ->
case Protocol of
?DLT376 ->
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
case dgiot_data:get({metetda, DeviceId}) of
case dgiot_data:get({metertda, DeviceId}) of
not_find ->
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
Payload1 = dgiot_meter:to_frame(ThingData),
DataSource = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
Payload1 = dgiot_meter:to_frame(#{<<"devaddr">> => DevAddr, <<"dataSource">> => DataSource}),
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, " ~s ~p DLT376 send to DevAddr ~p => ~p", [?FILE, ?LINE, DevAddr, dgiot_utils:binary_to_hex(Payload1)]),
dgiot_tcp_server:send(TCPState, Payload1);
{Da, Dtuaddr} ->
DA = dgiot_utils:binary_to_hex(dlt376_decoder:pn_to_da(dgiot_utils:to_int(Da))),
[#{<<"thingdata">> := #{<<"dataSource">> := DataSource} = ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
Payload1 = dgiot_meter:to_frame(ThingData#{<<"devaddr">> => Dtuaddr, <<"dataSource">> => DataSource#{<<"da">> => DA}}),
DataSource = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
Payload1 = dgiot_meter:to_frame(#{<<"protocol">> => Protocol, <<"devaddr">> => Dtuaddr, <<"dataSource">> => DataSource#{<<"da">> => DA}}),
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, " ~s ~p DLT376 send to DevAddr ~p => ~p", [?FILE, ?LINE, DevAddr, dgiot_utils:binary_to_hex(Payload1)]),
dgiot_tcp_server:send(TCPState, Payload1)
end;
@ -249,45 +249,6 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, protocol
dgiot_tcp_server:send(TCPState, Payload1)
end,
{noreply, TCPState#tcp{state = State#state{env = #{product => ProductId, devaddr => DevAddr}}}};
[<<"thingctrl">>, _ProductId, _DevAddr] ->
#tcp{state = #state{protocol = Protocol}} = TCPState,
case Protocol of
?DLT376 ->
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
case ThingData of
%
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"devpass">> := DevPass, <<"apiname">> := <<"get_meter_ctrl">>} ->
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"devpass">> => DevPass, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl},
Payload1 = dgiot_meter:to_frame(ThingData1),
dgiot_tcp_server:send(TCPState, Payload1);
%
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"apiname">> := <<"get_meter_ctrl_status">>} ->
%
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl_status},
Payload1 = dgiot_meter:to_frame(ThingData1),
dgiot_tcp_server:send(TCPState, Payload1);
_ ->
pass
end;
?DLT645 ->
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
case ThingData of
%
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"devpass">> := DevPass, <<"apiname">> := <<"get_meter_ctrl">>} ->
% ?LOG(info, "GGM 212 dgiot_meter_tcp, handle_info9 ~p,~p,~p,~p",[DevAddr,CtrlFlag,DevPass,Protocol]),
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"devpass">> => DevPass, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl},
Payload1 = dgiot_meter:to_frame(ThingData1),
dgiot_tcp_server:send(TCPState, Payload1);
%
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"apiname">> := <<"get_meter_ctrl_status">>} ->
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl_status},
Payload1 = dgiot_meter:to_frame(ThingData1),
dgiot_tcp_server:send(TCPState, Payload1);
_ ->
pass
end
end,
{noreply, TCPState};
_ ->
{noreply, TCPState}
end;

View File

@ -668,7 +668,7 @@ send_childvalue(DeviceId, ChildValue) ->
case dgiot_parse:query_object(<<"Device">>, #{<<"where">> => #{<<"parentId">> => DeviceId}}) of
{ok, #{<<"results">> := ChildDevices}} ->
lists:foldl(fun(#{<<"objectId">> := ChildId, <<"devaddr">> := Devaddr, <<"product">> := #{<<"objectId">> := ProductId}}, _Acc) ->
case dgiot_data:get({metetda, ChildId}) of
case dgiot_data:get({metertda, ChildId}) of
not_find ->
pass;
{Da, _Dtuaddr} ->
@ -677,7 +677,7 @@ send_childvalue(DeviceId, ChildValue) ->
error ->
pass;
{ok, Value} ->
Topic = <<"thing/", ProductId/binary, "/", Devaddr/binary, "/post">>, % mqtt进行数据存储
Topic = <<"$dg/thing/", ProductId/binary, "/", Devaddr/binary, "/post">>, % mqtt进行数据存储
DeviceId1 = dgiot_parse_id:get_deviceid(ProductId, Devaddr),
dgiot_mqtt:publish(DeviceId1, Topic, jsx:encode(Value)),
timer:sleep(1 * 1000)

View File

@ -23,7 +23,7 @@
%% gen_server callbacks
-export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(task, {mode = thing, tid, app, firstid, dtuid, product, devaddr, dis = [], que, round, ref, ack = #{}, appdata = #{}, ts = 0, freq = 0, interval = 5}).
-record(task, {mode = thing, tid, firstid, dtuid, product, devaddr, dis = [], que, round, ref, ack = #{}, appdata = #{}, ts = 0, freq = 0, interval = 5}).
-define(CHILD(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).
%%%===================================================================
@ -35,7 +35,7 @@ start_link(State) ->
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([#{<<"app">> := App, <<"channel">> := ChannelId, <<"client">> := DtuId, <<"mode">> := Mode, <<"freq">> := Freq} = State]) ->
init([#{<<"channel">> := ChannelId, <<"client">> := DtuId, <<"mode">> := Mode, <<"freq">> := Freq} = State]) ->
case dgiot_task:get_pnque(DtuId) of
not_find ->
io:format("~s ~p State ~p ~n", [?FILE, ?LINE, State]),
@ -48,11 +48,10 @@ init([#{<<"app">> := App, <<"channel">> := ChannelId, <<"client">> := DtuId, <<"
erlang:send_after(1000, self(), init),
Topic = <<"$dg/thing/", ProductId/binary, "/", DevAddr/binary, "/#">>,
dgiot_mqtt:subscribe(Topic),
AppData = maps:get(<<"appdata">>, State, #{}),
dgiot_metrics:inc(dgiot_task, <<"task">>, 1),
dgiot_client:save(ChannelId, DtuId),
{ok, #task{mode = dgiot_utils:to_atom(Mode), app = App, dtuid = DtuId, product = ProductId, devaddr = DevAddr,
tid = ChannelId, firstid = DeviceId, que = Que, round = 1, appdata = AppData, ts = Nowstamp, freq = Freq}}
{ok, #task{mode = dgiot_utils:to_atom(Mode), dtuid = DtuId, product = ProductId, devaddr = DevAddr,
tid = ChannelId, firstid = DeviceId, que = Que, round = 1, ts = Nowstamp, freq = Freq}}
end;
init(A) ->
@ -124,7 +123,8 @@ handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = _Produc
dgiot_metrics:inc(dgiot_task, <<"task_recv">>, 1),
case binary:split(dgiot_mqtt:get_topic(Msg), <<$/>>, [global, trim]) of
[<<"$dg">>, <<"thing">>, ProductId, DevAddr, <<"properties">>, <<"report">>] ->
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p ~ts: ~ts ", [?FILE, ?LINE, unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p ~ts: ~ts ",
[?FILE, ?LINE, unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
NewPayload =
maps:fold(fun(K, V, Acc) ->
case dgiot_data:get({protocol, K, ProductId}) of
@ -160,24 +160,17 @@ send_msg(#task{ref = Ref, que = Que} = State) when length(Que) == 0 ->
end,
get_next_pn(State);
send_msg(#task{tid = Channel, product = Product, devaddr = DevAddr, ref = Ref, que = Que, appdata = AppData} = State) ->
send_msg(#task{tid = Channel, product = Product, devaddr = DevAddr, ref = Ref, que = Que} = State) ->
{InstructOrder, Interval, _, _, _, Protocol, _, _} = lists:nth(1, Que),
{NewCount, Payload, Dis} =
lists:foldl(fun(X, {Count, Acc, Acc1}) ->
case X of
{InstructOrder, _, _, _, error, _, _, _} ->
{Count + 1, Acc, Acc1};
{InstructOrder, _, Identifier1, AccessMode, NewData, Protocol, DataSource, _} ->
Payload1 = #{
<<"appdata">> => AppData,
<<"thingdata">> => #{
<<"product">> => Product,
<<"devaddr">> => DevAddr,
<<"command">> => AccessMode,
<<"protocol">> => Protocol,
<<"dataSource">> => DataSource#{<<"data">> => NewData}
}
},
{InstructOrder, _, Identifier1, _AccessMode, NewData, Protocol, DataSource, _} ->
Payload1 = DataSource#{<<"data">> => NewData},
Topic = <<"$dg/device/", Product/binary, "/", DevAddr/binary, "/properties/report">>,
dgiot_mqtt:publish(Channel, Topic, jsx:encode(Payload1)),
{Count + 1, Acc ++ [Payload1], Acc1 ++ [Identifier1]};
_ ->
{Count, Acc, Acc1}
@ -186,7 +179,6 @@ send_msg(#task{tid = Channel, product = Product, devaddr = DevAddr, ref = Ref, q
Newpayload = jsx:encode(Payload),
Topic = <<"$dg/device/", Product/binary, "/", DevAddr/binary, "/properties/report">>,
dgiot_bridge:send_log(Channel, Product, DevAddr, "to_dev=> ~s ~p ~ts: ~ts", [?FILE, ?LINE, unicode:characters_to_list(Topic), unicode:characters_to_list(Newpayload)]),
dgiot_mqtt:publish(Channel, Topic, Newpayload),
%%
case Ref of
undefined ->
@ -220,7 +212,7 @@ get_next_pn(#task{tid = _Channel, mode = Mode, dtuid = DtuId, firstid = DeviceId
end,
State#task{product = NextProductId, devaddr = NextDevAddr, que = Que, dis = [], ack = #{}, ref = NewRef}.
save_td(#task{app = _App, tid = Channel, product = ProductId, devaddr = DevAddr, ack = Ack, appdata = AppData}) ->
save_td(#task{tid = Channel, product = ProductId, devaddr = DevAddr, ack = Ack, appdata = AppData}) ->
Data = dgiot_task:save_td(ProductId, DevAddr, Ack, AppData),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "save_td=> ~s ~p ProductId ~p DevAddr ~p : ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(jsx:encode(Data))]).