diff --git a/CHANGELOG.md b/CHANGELOG.md
index a2600f50..635023e7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,9 @@
-# [](https://github.com/dgiot/dgiot/compare/v4.3.4...v) (2021-10-07)
+# [](https://github.com/dgiot/dgiot/compare/v4.3.4...v) (2021-10-08)
### Bug Fixes
+* add device log ([f3b173c](https://github.com/dgiot/dgiot/commit/f3b173c3f146e1ddf21038b5026dfd08ae93c95f))
* channel_log ([1e37058](https://github.com/dgiot/dgiot/commit/1e3705876eb905cab215085e7341e050ce822170))
* task_worker bug ([0396163](https://github.com/dgiot/dgiot/commit/039616312126d24e36b03cf339e0e6ffb6f3aa13))
diff --git a/apps/dgiot_meter/include/dgiot_meter.hrl b/apps/dgiot_meter/include/dgiot_meter.hrl
index ca404aeb..97b777b9 100644
--- a/apps/dgiot_meter/include/dgiot_meter.hrl
+++ b/apps/dgiot_meter/include/dgiot_meter.hrl
@@ -14,8 +14,20 @@
%% limitations under the License.
%%--------------------------------------------------------------------
+% 使用说明:
+% --------------------------------------------------------------------
+% 在 dgiot_task文件 save_pnque(DtuProductId, DtuAddr, ProductId, DevAddr) 函数,用于注册电表的远程控制的topic
+% timer:sleep(500),
+% TopicCtrl = <<"thingctrl/", ProductId/binary, "/", DevAddr/binary>>,
+% dgiot_mqtt:subscribe(TopicCtrl),
+% timer:sleep(500),
+% TopicStatus = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
+% dgiot_mqtt:subscribe(TopicStatus),
+% --------------------------------------------------------------------
+
-define(METER, <<"METER">>).
-define(DLT645, <<"DLT645">>).
+-define(DLT376, <<"DLT376">>).
-record(state, {
id,
@@ -23,11 +35,20 @@
dtuaddr = <<>>,
step = login,
ref = undefined,
- search = <<"quick">>
+ search = <<"quick">>,
+ protocol = dlt645
}).
%% Internal Header File
+%% @doc dlt376 COMMAND.
+-define(DLT376_MS_READ_DATA, 16#5B).
+-define(DLT376_MS_READ_DATA_AFN, 16#0C).
+-define(DLT376_MS_CONVERT_SEND_AFN, 16#10). %透明转发
+-define(DLT376_MS_CTRL_DEV, 16#5A).
+-define(DLT376_MS_CTRL_DEV_AFN, 16#05).
+
+
%% @doc dlt645 COMMAND.
-define(DLT645_MS_BROADCAST_DATA, 16#08).
-define(DLT645_MS_READ_DATA, 16#11).
diff --git a/apps/dgiot_meter/priv/swagger/swagger_meter.json b/apps/dgiot_meter/priv/swagger/swagger_meter.json
index 5e5898a6..6a6bfcf0 100644
--- a/apps/dgiot_meter/priv/swagger/swagger_meter.json
+++ b/apps/dgiot_meter/priv/swagger/swagger_meter.json
@@ -1,11 +1,126 @@
{
- "definitions": {},
- "paths": {
- },
"tags": [
{
- "description": "电力抄表",
- "name": "METER"
+ "name": "METER",
+ "description": "电力抄表"
}
- ]
+ ],
+ "definitions": {
+ },
+ "paths": {
+ "/meter_ctrl": {
+ "get": {
+ "security": [],
+ "summary": "对远程电表进行关闸/合闸(重要:接口调用后间隔15秒以上,避免对同一终端设备频繁操作)",
+ "description": "对远程电表进行关闸/合闸",
+ "parameters": [
+ {
+ "in": "query",
+ "name": "pid",
+ "required": false,
+ "description": "产品ID",
+ "type": "string",
+ "default": "3ea14b556a"
+ },
+ {
+ "in": "query",
+ "name": "devaddr",
+ "description": "设备地址",
+ "required": true,
+ "type": "string",
+ "default": "000003146332"
+ },
+ {
+ "in": "query",
+ "name": "ctrlflag",
+ "description": "=ture合闸, =false 开闸",
+ "required": true,
+ "type": "boolean",
+ "default": true
+ },
+ {
+ "in": "query",
+ "name": "devpass",
+ "description": "终端设备密码",
+ "required": true,
+ "type": "string",
+ "default": "000000"
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "Returns operation status"
+ },
+ "400": {
+ "description": "Bad Request"
+ },
+ "401": {
+ "description": "Unauthorized"
+ },
+ "403": {
+ "description": "Forbidden"
+ },
+ "500": {
+ "description": "Server Internal error"
+ }
+ },
+ "tags": [
+ "METER"
+ ]
+ }
+ },
+ "/meter_ctrl_status": {
+ "get": {
+ "security": [],
+ "summary": "查询拉闸/合闸的状态(重要:接口调用后间隔15秒以上,避免对同一终端设备频繁操作)",
+ "description": "返回上次拉闸/合闸的时间,连接mqtt服务(连接信息:ws://121.40.78.136:8083/mqtt)
订阅topic:thing/productid/devid/status(如 thing/3ea14b556a/000003146332/status) 获取结果
订阅mqtt主题之后,收到格式 :
{\"1D000101\":1632549207}(上次电表跳闸时间)
{\"1E000101\":1632549176}(上次电表合闸时间)
{\"FEFEFEFE\":0} 合闸、跳闸成功",
+ "parameters": [
+ {
+ "in": "query",
+ "name": "pid",
+ "required": false,
+ "description": "产品ID",
+ "type": "string",
+ "default": "3ea14b556a"
+ },
+ {
+ "in": "query",
+ "name": "ctrlflag",
+ "description": "=ture合闸, =false 开闸",
+ "required": true,
+ "type": "boolean",
+ "default": true
+ },
+ {
+ "in": "query",
+ "name": "devaddr",
+ "description": "设备地址",
+ "required": true,
+ "type": "string",
+ "default": "000003146332"
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "Returns operation status"
+ },
+ "400": {
+ "description": "Bad Request"
+ },
+ "401": {
+ "description": "Unauthorized"
+ },
+ "403": {
+ "description": "Forbidden"
+ },
+ "500": {
+ "description": "Server Internal error"
+ }
+ },
+ "tags": [
+ "METER"
+ ]
+ }
+ }
+ }
}
diff --git a/apps/dgiot_meter/src/dgiot_meter.erl b/apps/dgiot_meter/src/dgiot_meter.erl
index f6b0f923..0d361ed3 100644
--- a/apps/dgiot_meter/src/dgiot_meter.erl
+++ b/apps/dgiot_meter/src/dgiot_meter.erl
@@ -24,6 +24,7 @@
create_dtu/3,
create_dtu/4,
create_meter/4,
+ create_meter4G/3,
get_sub_device/1
]).
@@ -83,6 +84,22 @@ create_meter(MeterAddr, ChannelId, DTUIP, DtuAddr) ->
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
dgiot_task:save_pnque(DtuProductId, DtuAddr, ProductId, MeterAddr).
+create_meter4G(MeterAddr, ChannelId, DTUIP) ->
+ {ProductId, ACL, _Properties} = dgiot_data:get({meter, ChannelId}),
+ Requests = #{
+ <<"devaddr">> => MeterAddr,
+ <<"name">> => <<"Meter_", MeterAddr/binary>>,
+ <<"ip">> => DTUIP,
+ <<"isEnable">> => true,
+ <<"product">> => ProductId,
+ <<"ACL">> => ACL,
+ <<"status">> => <<"ONLINE">>,
+ <<"brand">> => <<"Meter", MeterAddr/binary>>,
+ <<"devModel">> => <<"Meter">>
+ },
+ dgiot_device:create_device(Requests),
+ dgiot_task:save_pnque(ProductId, MeterAddr, ProductId, MeterAddr).
+
get_sub_device(DtuAddr) ->
Query = #{<<"keys">> => [<<"devaddr">>, <<"product">>],
<<"where">> => #{<<"route.", DtuAddr/binary>> => #{<<"$regex">> => <<".+">>}},
@@ -93,12 +110,37 @@ get_sub_device(DtuAddr) ->
_ -> []
end.
-parse_frame(dlt645, Buff, Opts) ->
+parse_frame(?DLT645, Buff, Opts) ->
{Rest, Frames} = dlt645_decoder:parse_frame(Buff, Opts),
+ {Rest, lists:foldl(fun(X, Acc) ->
+ Acc ++ [maps:without([<<"diff">>, <<"send_di">>], X)]
+ end, [], Frames)};
+
+parse_frame(?DLT376, Buff, Opts) ->
+ {Rest, Frames} = dlt376_decoder:parse_frame(Buff, Opts),
+ % ?LOG(warning,"GGM 170 dgiot_meter parse_frame:~p", [Frames]),
{Rest, lists:foldl(fun(X, Acc) ->
Acc ++ [maps:without([<<"diff">>, <<"send_di">>], X)]
end, [], Frames)}.
+% DLT376发送抄数指令
+to_frame(#{
+ <<"devaddr">> := Addr,
+ <<"di">> := Di,
+ <<"command">> := <<"r">>,
+ <<"protocol">> := ?DLT376,
+ <<"data">> := <<"null">>
+} = Frame) ->
+ dlt376_decoder:to_frame(Frame#{
+ <<"msgtype">> => ?DLT376,
+ <<"addr">> => dlt376_proctol:decode_of_addr(dgiot_utils:hex_to_binary(Addr)),
+ <<"data">> => <<>>,
+ <<"di">> => Di,
+ <<"command">> => ?DLT376_MS_READ_DATA,
+ <<"afn">> => ?DLT376_MS_READ_DATA_AFN
+ });
+
+% DLT645 组装电表抄表指令
to_frame(#{
<<"devaddr">> := Addr,
<<"di">> := Di,
@@ -114,6 +156,127 @@ to_frame(#{
<<"command">> => ?DLT645_MS_READ_DATA
});
+
+% DLT645 组装电表控制指令
+to_frame(#{
+ <<"devaddr">> := Addr,
+ <<"ctrlflag">> := CtrlFlag,
+ <<"protocol">> := ?DLT645,
+ <<"devpass">> := DevPass,
+ <<"apiname">> := get_meter_ctrl
+} = Frame) ->
+ case CtrlFlag of
+ true ->
+ PassGrade = <<"02">>,
+ Di = <<(dgiot_utils:hex_to_binary(DevPass))/binary,(dgiot_utils:hex_to_binary(PassGrade))/binary>>,
+ Data= <<"111111111C00010101010133">>,
+ dlt645_decoder:to_frame(Frame#{
+ <<"msgtype">> => ?DLT645,
+ <<"addr">> => dlt645_proctol:reverse(dgiot_utils:hex_to_binary(Addr)),
+ <<"di">> => dlt645_proctol:reverse(Di),
+ <<"data">> => dgiot_utils:hex_to_binary(Data),
+ <<"command">> => ?DLT645_MS_FORCE_EVENT
+ });
+ false ->
+ PassGrade = <<"02">>,
+ Di = <<(dgiot_utils:hex_to_binary(DevPass))/binary,(dgiot_utils:hex_to_binary(PassGrade))/binary>>,
+ Data = <<"111111111A00010101010133">>,
+ dlt645_decoder:to_frame(Frame#{
+ <<"msgtype">> => ?DLT645,
+ <<"addr">> => dlt645_proctol:reverse(dgiot_utils:hex_to_binary(Addr)),
+ <<"data">> => dgiot_utils:hex_to_binary(Data),
+ <<"di">> => dlt645_proctol:reverse(Di),
+ <<"command">> => ?DLT645_MS_FORCE_EVENT
+ })
+ end;
+
+% DLT376 远程电表控制(透明转发)
+to_frame(#{
+ <<"devaddr">> := DevAddr,
+ <<"ctrlflag">> := CtrlFlag,
+ <<"devpass">> := DevPass,
+ <<"protocol">> := ?DLT376,
+ <<"apiname">> := get_meter_ctrl
+} = Frame) ->
+ Di = <<"00000100">>,
+ Data= <<16#02,16#6B,16#64,16#64,16#1C,16#00>>,
+ Data2 = <<16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00>>,
+ CtrlPayload = to_frame(Frame#{
+ <<"protocol">> => ?DLT645,
+ <<"devaddr">> => DevAddr,
+ <<"ctrlflag">> => CtrlFlag,
+ <<"devpass">> => DevPass,
+ <<"apiname">> => get_meter_ctrl
+ }),
+ DataNew = <>,
+ % ?LOG(info, "GGM 230 to_frame, DataNew ~p~n~n~n",[dgiot_utils:binary_to_hex(DataNew)]),
+ RetPlayload = dlt376_decoder:to_frame(Frame#{
+ <<"msgtype">> => ?DLT376,
+ <<"addr">> => dlt376_proctol:decode_of_addr(dgiot_utils:hex_to_binary(DevAddr)),
+ <<"data">> => dgiot_utils:binary_to_hex(DataNew),
+ <<"di">> => Di,
+ <<"command">> => ?DLT376_MS_READ_DATA,
+ <<"afn">> => ?DLT376_MS_CONVERT_SEND_AFN
+ }),
+ % ?LOG(info, "GGM 231 to_frame, Payload1 ~p~n~n~n",[dgiot_utils:binary_to_hex(RetPlayload)]),
+ RetPlayload;
+
+% DLT645 组装电表获取上次拉闸合闸的时间
+to_frame(#{
+ <<"devaddr">> := DevAddr,
+ <<"ctrlflag">> := CtrlFlag,
+ <<"protocol">> := ?DLT645,
+ <<"apiname">> := get_meter_ctrl_status
+} = Frame) ->
+ case CtrlFlag of
+ true ->
+ Di = <<"1E000101">>,
+ dlt645_decoder:to_frame(Frame#{
+ <<"msgtype">> => ?DLT645,
+ <<"addr">> => dlt645_proctol:reverse(dgiot_utils:hex_to_binary(DevAddr)),
+ <<"di">> => dlt645_proctol:reverse(dgiot_utils:hex_to_binary(Di)),
+ <<"data">> => <<>>,
+ <<"command">> => ?DLT645_MS_READ_DATA
+ });
+ false ->
+ Di = <<"1D000101">>,
+ dlt645_decoder:to_frame(Frame#{
+ <<"msgtype">> => ?DLT645,
+ <<"addr">> => dlt645_proctol:reverse(dgiot_utils:hex_to_binary(DevAddr)),
+ <<"di">> => dlt645_proctol:reverse(dgiot_utils:hex_to_binary(Di)),
+ <<"data">> => <<>>,
+ <<"command">> => ?DLT645_MS_READ_DATA
+ })
+ end;
+
+% DLT376 组装电表获取上次拉闸合闸的时间(透明转发)
+to_frame(#{
+ <<"devaddr">> := DevAddr,
+ <<"ctrlflag">> := CtrlFlag,
+ <<"protocol">> := ?DLT376,
+ <<"apiname">> := get_meter_ctrl_status
+} = Frame) ->
+ Di = <<"00000100">>,
+ Data= <<16#02,16#6B,16#64,16#64,16#10,16#00>>,
+ Data2 = <<16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00,16#00>>,
+ CtrlPayload = to_frame(Frame#{
+ <<"devaddr">> := DevAddr,
+ <<"ctrlflag">> := CtrlFlag,
+ <<"protocol">> := ?DLT645,
+ <<"apiname">> := get_meter_ctrl_status
+ }),
+ DataNew = <>,
+ % ?LOG(info, "GGM 260 to_frame, DataNew ~p,~n~n~n",[dgiot_utils:binary_to_hex(CtrlPayload)]),
+ RetPlayload = dlt376_decoder:to_frame(Frame#{
+ <<"msgtype">> => ?DLT376,
+ <<"addr">> => dlt376_proctol:decode_of_addr(dgiot_utils:hex_to_binary(DevAddr)),
+ <<"data">> => dgiot_utils:binary_to_hex(DataNew),
+ <<"di">> => Di,
+ <<"command">> => ?DLT376_MS_READ_DATA,
+ <<"afn">> => ?DLT376_MS_CONVERT_SEND_AFN
+ }),
+ RetPlayload;
+
to_frame(#{
<<"devaddr">> := Addr,
<<"di">> := Di,
diff --git a/apps/dgiot_meter/src/dgiot_meter_handler.erl b/apps/dgiot_meter/src/dgiot_meter_handler.erl
index fcc24010..36ff406c 100644
--- a/apps/dgiot_meter/src/dgiot_meter_handler.erl
+++ b/apps/dgiot_meter/src/dgiot_meter_handler.erl
@@ -17,7 +17,8 @@
-author("johnliu").
-behavior(dgiot_rest).
-include_lib("dgiot/include/logger.hrl").
-
+-include("dgiot_meter.hrl").
+-dgiot_rest(all).
%% API
-export([swagger_meter/0]).
-export([handle/4, check_auth/3]).
@@ -57,16 +58,16 @@ handle(OperationID, Args, Context, Req) ->
end,
{500, Headers, #{<<"error">> => Err}};
ok ->
- ?LOG(debug,"do request: ~p, ~p ->ok ~n", [OperationID, Args]),
+ ?LOG(debug, "do request: ~p, ~p ->ok ~n", [OperationID, Args]),
{200, Headers, #{}, Req};
{ok, Res} ->
- ?LOG(debug,"do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
+ ?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
{200, Headers, Res, Req};
{Status, Res} ->
- ?LOG(debug,"do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
+ ?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
{Status, Headers, Res, Req};
{Status, NewHeaders, Res} ->
- ?LOG(debug,"do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
+ ?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
{Status, maps:merge(Headers, NewHeaders), Res, Req}
end.
@@ -75,14 +76,58 @@ handle(OperationID, Args, Context, Req) ->
%%% 内部函数 Version:API版本
%%%===================================================================
+
+%% TDengine 概要: 获取当前产品下的所有设备数据 描述:获取当前产品下的所有设备数据
+%% OperationId:get_td_cid_pid
+%% 请求:GET /iotapi/td/prodcut/:productId
+do_request(get_meter_ctrl, #{
+ <<"pid">> := ProductId,
+ <<"devaddr">> := DevAddr,
+ <<"ctrlflag">> := CtrlFlag,
+ <<"devpass">> := DevPass
+}, _Context, Req0) ->
+ get_meter_ctrl(Req0, ProductId, DevAddr, CtrlFlag, DevPass);
+
+do_request(get_meter_ctrl_status, #{
+ <<"pid">> := ProductId,
+ <<"ctrlflag">> := CtrlFlag,
+ <<"devaddr">> := DevAddr
+}, _Context, _Req) ->
+ TopicCtrl = <<"thingctrl/", ProductId/binary, "/", DevAddr/binary>>,
+ ThingData = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"apiname">> => get_meter_ctrl_status},
+ Payload = [#{<<"appdata">> => #{}, <<"thingdata">> => ThingData}],
+ dgiot_mqtt:publish(DevAddr, TopicCtrl, jsx:encode(Payload));
+
+
%% 服务器不支持的API接口
do_request(_OperationId, _Args, _Context, _Req) ->
- ?LOG(info,"_OperationId:~p~n", [_OperationId]),
+ ?LOG(info, "_OperationId:~p~n", [_OperationId]),
{error, <<"Not Allowed.">>}.
-
-
-
+get_meter_ctrl(Req0, ProductId, DevAddr, CtrlFlag, DevPass) ->
+ Sendtopic = <<"thingctrl/", ProductId/binary, "/", DevAddr/binary>>,
+ ThingData = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"devpass">> => DevPass, <<"apiname">> => get_meter_ctrl},
+ Payload = [#{<<"pid">> => self(),<<"appdata">> => #{}, <<"thingdata">> => ThingData}],
+ case dgiot_mqtt:has_routes(Sendtopic) of
+ true ->
+ dgiot_mqtt:publish(DevAddr, Sendtopic, jsx:encode(Payload)),
+ receive
+ {ctrl_meter, Msg} ->
+ Resp = cowboy_req:reply(200, #{<<"content-type">> => <<"application/json">>}, jsx:encode(Msg), Req0),
+ {ok, Resp};
+ {error} ->
+ Resp = cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, <<"CTRL_METER_FAILED">>, Req0),
+ {ok, Resp}
+ after 10000 ->
+ Resp = cowboy_req:reply(200, #{
+ <<"content-type">> => <<"text/plain">>
+ }, <<"CTRL_METER_TIMEOUT">>, Req0),
+ {ok, Resp}
+ end;
+ false ->
+ Resp = cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, <<"METER_OFFLINE">>, Req0),
+ {ok, Resp}
+ end.
diff --git a/apps/dgiot_meter/src/dgiot_meter_tcp.erl b/apps/dgiot_meter/src/dgiot_meter_tcp.erl
index 36007fcc..9e743f33 100644
--- a/apps/dgiot_meter/src/dgiot_meter_tcp.erl
+++ b/apps/dgiot_meter/src/dgiot_meter_tcp.erl
@@ -31,43 +31,65 @@ start(Port, State) ->
%% tcp server start
%% {ok, State} | {stop, Reason}
init(TCPState) ->
+
dgiot_metrics:inc(dgiot_meter, <<"dtu_login">>, 1),
{ok, TCPState}.
%%设备登录报文,登陆成功后,开始搜表
+%
handle_info({tcp, DtuAddr}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = <<>>, search = Search} = State} = TCPState) ->
DTUIP = dgiot_utils:get_ip(Socket),
- HexDtuAddr = dgiot_utils:binary_to_hex(DtuAddr),
- dgiot_meter:create_dtu(HexDtuAddr, ChannelId, DTUIP),
- {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
- Topic = <<"profile/", DtuProductId/binary, "/", DtuAddr/binary>>,
- dgiot_mqtt:subscribe(Topic),
- {NewRef, NewStep} =
- case Search of
- <<"nosearch">> ->
- [dgiot_task:save_pnque(DtuProductId, DtuAddr, Meterproductid, Meteraddr) || #{<<"product">> := Meterproductid, <<"devaddr">> := Meteraddr}
- <- dgiot_meter:get_sub_device(DtuAddr)],
- {undefined, read_meter};
- <<"quick">> ->
- dgiot_meter:search_meter(tcp, undefined, TCPState, 0),
- {undefined, search_meter};
- _ ->
- {Ref, Step, _Payload} = dgiot_meter:search_meter(tcp, undefined, TCPState, 1),
- {Ref, Step}
- end,
- {noreply, TCPState#tcp{buff = <<>>, state = State#state{dtuaddr = HexDtuAddr, ref = NewRef, step = NewStep}}};
+ ?LOG(info, "GGM 334 dgiot_meter_tcp, handle_info (登录) ~p,,,~p",[DtuAddr,TCPState]),
+ {_,[Acc|_]} = dlt376_decoder:parse_frame(DtuAddr,[]),
+ #{<<"msgtype">> := Protocol} = Acc,
+ case Protocol of
+ ?DLT376 ->
+ #{<<"addr">> := MeterAddr} = Acc,
+ dgiot_meter:create_meter4G(MeterAddr, ChannelId, DTUIP),
+ {ProductId, _,_} = dgiot_data:get({meter, ChannelId}),
+ Topic = <<"thing/", ProductId/binary, "/", MeterAddr/binary>>,
+ dgiot_mqtt:subscribe(Topic), %为这个设备订阅一个mqtt
+ % timer:sleep(500),
+ % TopicCtrl = <<"thingctrl/", ProductId/binary, "/", MeterAddr/binary>>,
+ % dgiot_mqtt:subscribe(TopicCtrl),
+ {NewRef, NewStep} = {undefined, read_meter},
+ {noreply, TCPState#tcp{buff = <<>>, state = State#state{dtuaddr = MeterAddr, protocol = ?DLT376,ref = NewRef, step = NewStep}}};
+ ?DLT645 ->
+ % HexDtuAddr = dgiot_utils:binary_to_hex(DtuAddr),
+ ?LOG(info, "GGM 335 dgiot_meter_tcp, handle_info (DLT645创建) ~p,",[DtuAddr]),
+ dgiot_meter:create_dtu(DtuAddr, ChannelId, DTUIP),
+ {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
+ Topic = <<"profile/", DtuProductId/binary, "/", DtuAddr/binary>>,
+ dgiot_mqtt:subscribe(Topic),
+ {NewRef, NewStep} =
+ case Search of
+ <<"nosearch">> ->
+ [dgiot_task:save_pnque(DtuProductId, DtuAddr, Meterproductid, Meteraddr) || #{<<"product">> := #{<<"objectId">>:=Meterproductid}, <<"devaddr">> := Meteraddr}
+ <- dgiot_meter:get_sub_device(DtuAddr)],
+ {undefined, read_meter};
+ <<"quick">> ->
+ dgiot_meter:search_meter(tcp, undefined, TCPState, 0),
+ {undefined, search_meter};
+ _ ->
+ {Ref, Step, _Payload} = dgiot_meter:search_meter(tcp, undefined, TCPState, 1),
+ {Ref, Step}
+ end,
+ {noreply, TCPState#tcp{buff = <<>>, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT645,ref = NewRef, step = NewStep}}};
+ _ ->
+ ?LOG(info, "GGM 334 dgiot_meter_tcp, handle_info (登录s失败) ~p",[dgiot_utils:binary_to_hex(DtuAddr)]),
+ dgiot_utils:binary_to_hex(DtuAddr)
+ end;
-
-%%定时器触发搜表
-handle_info(search_meter, #tcp{state = #state{ref = Ref} = State} = TCPState) ->
+%%定时器触发搜表
+handle_info(search_meter, #tcp{state = #state{ref = Ref,protocol = ?DLT645} = State} = TCPState) ->
{NewRef, Step, _Payload} = dgiot_meter:search_meter(tcp, Ref, TCPState, 1),
{noreply, TCPState#tcp{buff = <<>>, state = State#state{ref = NewRef, step = Step}}};
%%ACK报文触发搜表
-handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = DtuAddr, ref = Ref, step = search_meter, search = Search} = State} = TCPState) ->
+handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = DtuAddr, protocol = ?DLT645,ref = Ref, step = search_meter, search = Search} = State} = TCPState) ->
?LOG(info, "from_dev: search_meter Buff ~p", [dgiot_utils:binary_to_hex(Buff)]),
- ?LOG(info, "from_dev: parse_frame Buff ~p", [dgiot_meter:parse_frame(dlt645, Buff, [])]),
- {Rest, Frames} = dgiot_meter:parse_frame(dlt645, Buff, []),
+ ?LOG(info, "from_dev: parse_frame Buff ~p", [dgiot_meter:parse_frame(?DLT645, Buff, [])]),
+ {Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []),
lists:map(fun(X) ->
case X of
#{<<"addr">> := Addr} ->
@@ -92,19 +114,29 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
end
end;
-%%接受抄表任务命令抄表
-handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, step = read_meter}} = TCPState) ->
+%%接受抄表任务命令抄表(下发指令)
+handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId}} = TCPState) ->
Payload = dgiot_mqtt:get_payload(Msg),
dgiot_bridge:send_log(ChannelId, "Topic ~p Msg ~p", [dgiot_mqtt:get_topic(Msg), Payload]),
case jsx:is_json(Payload) of
true ->
case binary:split(dgiot_mqtt:get_topic(Msg), <<$/>>, [global, trim]) of
[<<"thing">>, _ProductId, _DevAddr] ->
- [#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
- Payload = dgiot_meter:to_frame(ThingData),
- dgiot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
- ?LOG(info, "task->dev: Payload ~p", [dgiot_utils:binary_to_hex(Payload)]),
- dgiot_tcp_server:send(TCPState, Payload);
+ #tcp{state = #state{protocol = Protocol}} = TCPState,
+ case Protocol of
+ ?DLT376 ->
+ [#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
+ Payload1 = dgiot_meter:to_frame(ThingData),
+ dgiot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
+ % ?LOG(info, "GGM 214 task->dev: Payload ~p", [dgiot_utils:binary_to_hex(Payload1)]),
+ dgiot_tcp_server:send(TCPState, Payload1);
+ ?DLT645 ->
+ [#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
+ Payload1 = dgiot_meter:to_frame(ThingData),
+ % ?LOG(info, "GGM 216 dgiot_meter_tcp, Payload1 ~p,~p",[ChannelId,Payload1]),
+ dgiot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
+ dgiot_tcp_server:send(TCPState, Payload1)
+ end;
[<<"profile">>, _ProductId, _DtuAddr] ->
case Payload of
#{<<"_dgiotprotocol">> := <<"hex">>} ->
@@ -115,6 +147,76 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, step = r
_ ->
pass
end;
+ [<<"thingctrl">>, _, _DtuAddr] ->
+ #tcp{state = #state{protocol = Protocol}} = TCPState,
+ ?LOG(info, "GGM 250 dgiot_meter_tcp, thingctrl ~p",[TCPState]),
+ case Protocol of
+ ?DLT376 ->
+ [#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
+ % ?LOG(info, "GGM 205 dgiot_meter_tcp, handle_info10 ~p",[ThingData]),
+ case ThingData of
+ % 远程控制电表拉闸、合闸
+ #{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"devpass">> := DevPass,<<"apiname">> := <<"get_meter_ctrl">>} ->
+ % ?LOG(info, "GGM 206 dgiot_meter_tcp, handle_info9 ~p,~p,~p,~p",[DevAddr,CtrlFlag,DevPass,Protocol]),
+ ThingData1 = #{
+ <<"devaddr">> => DevAddr,
+ <<"ctrlflag">> => CtrlFlag,
+ <<"devpass">> => DevPass,
+ <<"protocol">> => Protocol ,
+ <<"apiname">> => get_meter_ctrl
+ },
+ Payload1 = dgiot_meter:to_frame(ThingData1),
+ % diot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
+ dgiot_tcp_server:send(TCPState, Payload1);
+ % 获取上次拉闸、合闸的时间(一次发送两条查询指令)
+ #{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"apiname">> := <<"get_meter_ctrl_status">>} ->
+ ?LOG(info, "GGM 235 dgiot_meter_tcp, handle_info9 ~p,~p",[DevAddr,Protocol]),
+ % 上次合闸时间
+ ThingData1 = #{
+ <<"devaddr">> => DevAddr,
+ <<"ctrlflag">> => CtrlFlag,
+ <<"protocol">> => Protocol,
+ <<"apiname">> => get_meter_ctrl_status
+ },
+ Payload1 = dgiot_meter:to_frame(ThingData1),
+ dgiot_tcp_server:send(TCPState, Payload1);
+ _->
+ ?LOG(info, "GGM 236 dgiot_meter_tcp, handle_info9 ~p",[ThingData]),
+ pass
+ end;
+ ?DLT645 ->
+ [#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
+ ?LOG(info, "GGM 211 dgiot_meter_tcp, handle_info10 ~p",[ThingData]),
+ case ThingData of
+ % 远程控制电表拉闸、合闸
+ #{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"devpass">> := DevPass,<<"apiname">> := <<"get_meter_ctrl">>} ->
+ % ?LOG(info, "GGM 212 dgiot_meter_tcp, handle_info9 ~p,~p,~p,~p",[DevAddr,CtrlFlag,DevPass,Protocol]),
+ ThingData1 = #{
+ <<"devaddr">> => DevAddr,
+ <<"ctrlflag">> => CtrlFlag,
+ <<"devpass">> => DevPass,
+ <<"protocol">> => Protocol,
+ <<"apiname">> => get_meter_ctrl
+ },
+ Payload1 = dgiot_meter:to_frame(ThingData1),
+ ?LOG(info, "GGM 213 dgiot_meter_tcp, Payload1 ~p,~p",[ChannelId,dgiot_utils:to_hex(Payload1)]),
+ dgiot_tcp_server:send(TCPState, Payload1);
+ % 获取上次拉闸、合闸的时间(一次发送两条查询指令)
+ #{<<"devaddr">> := DevAddr,<<"ctrlflag">> := CtrlFlag,<<"apiname">> := <<"get_meter_ctrl_status">>} ->
+ ?LOG(info, "GGM 236 dgiot_meter_tcp, handle_info9 ~p,~p",[DevAddr,Protocol]),
+ ThingData1 = #{
+ <<"devaddr">> => DevAddr,
+ <<"ctrlflag">> => CtrlFlag,
+ <<"protocol">> => Protocol,
+ <<"apiname">> => get_meter_ctrl_status
+ },
+ Payload1 = dgiot_meter:to_frame(ThingData1),
+ ?LOG(info, "GGM 237 dgiot_meter_tcp, Payload1 ~p,~p",[ChannelId,dgiot_utils:to_hex(Payload1)]),
+ dgiot_tcp_server:send(TCPState, Payload1);
+ _->
+ pass
+ end
+ end;
_ ->
pass
end;
@@ -122,27 +224,250 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, step = r
end,
{noreply, TCPState};
-%% 接收抄表任务的ACK报文
-handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, step = read_meter}} = TCPState) ->
- dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
- ?LOG(info, "Buff ~p", [dgiot_utils:binary_to_hex(Buff)]),
- {Rest, Frames} = dgiot_meter:parse_frame(dlt645, Buff, []),
- case Frames of
- [#{<<"addr">> := Addr, <<"value">> := Value} | _] ->
- case dgiot_data:get({meter, ChannelId}) of
- {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
- Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>,
- DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
- dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
+
+handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId,protocol = Protocol,step = Step}} = TCPState) ->
+ ?LOG(info, "GGM 333 Buff========ALL REST========= ~p,~p,~p~n~n~n", [ChannelId,Step,dgiot_utils:binary_to_hex(Buff)]),
+ case Protocol of
+ ?DLT376 ->
+ dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
+ ?LOG(info, "GGM 140 Buff========DLT376========= ~p", [dgiot_utils:binary_to_hex(Buff)]),
+ {Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []),
+ ?LOG(info, "GGM 246 Frames ~p", [Frames]),
+ case Frames of
+ % 返回抄表数据
+ [#{<<"di">> :=<<16#01,16#01,16#01,16#10>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
+ case dgiot_data:get({meter, ChannelId}) of
+ {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+ Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/post">>, % 发送给mqtt进行数据存储
+ dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+ _ -> pass
+ end;
+ % 返回读取上次合闸时间
+ [#{<<"di">> :=<<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
+ case dgiot_data:get({meter, ChannelId}) of
+ {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+ Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
+ ?LOG(info, "GGM 246 jsx:encode(Value) ~p", [jsx:encode(Value)]),
+ dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+ _ -> pass
+ end;
+ % 返回读取上次拉闸时间
+ [#{<<"di">> :=<<16#1D,16#00,16#01,16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
+ case dgiot_data:get({meter, ChannelId}) of
+ {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+ Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
+ ?LOG(info, "GGM 245 jsx:encode(Value) ~p", [jsx:encode(Value)]),
+ dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+ _ -> pass
+ end;
+ % 拉闸,合闸成功
+ [#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FE>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
+ case dgiot_data:get({meter, ChannelId}) of
+ {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+ Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
+ dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+ _ -> pass
+ end;
+ % 拉闸,合闸失败
+ [#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FD>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
+ case dgiot_data:get({meter, ChannelId}) of
+ {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+ Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
+ dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+ _ -> pass
+ end;
_ -> pass
- end;
- _ -> pass
- end,
- {noreply, TCPState#tcp{buff = Rest}};
+ end,
+ {noreply, TCPState#tcp{buff = Rest}};
+ ?DLT645 ->
+ dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
+ ?LOG(info, "GGM 222 Meter Rev Buff========DLT645========= ~p~n~n~n", [dgiot_utils:binary_to_hex(Buff)]),
+ {Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []),
+ % ?LOG(info, "GGM 245 Frames ~p", [Frames]),
+ case Frames of
+ % 抄表数据返回
+ [#{<<"command">> := 16#91,<<"di">>:=<<16#00,16#00,16#00,16#00>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
+ case dgiot_data:get({meter, ChannelId}) of
+ {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+ Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>,
+ % ?LOG(info, "GGM 247 jsx:encode(Value) ~p", [jsx:encode(Value)]),
+ dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+ _ -> pass
+ end;
+ % 查询上一次合闸时间返回
+ [#{<<"command">>:=16#91,<<"di">> := <<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] ->
+ case dgiot_data:get({meter, ChannelId}) of
+ {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+ Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
+ Di = <<16#1E,16#00,16#01,16#01>>,
+ DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
+ ?LOG(info, "GGM 248 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
+ dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
+ _ -> pass
+ end;
+ % 查询上一次拉闸时间返回
+ [#{<<"command">>:=16#91,<<"di">> := <<16#1D,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] ->
+ case dgiot_data:get({meter, ChannelId}) of
+ {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+ Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
+ Di = <<16#1D,16#00,16#01,16#01>>,
+ DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
+ ?LOG(info, "GGM 249 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
+ dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
+ _ -> pass
+ end;
+ % 拉闸,合闸成功
+ [#{<<"command">>:=16#9C, <<"addr">> := Addr} | _] ->
+ case dgiot_data:get({meter, ChannelId}) of
+ {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+ Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
+ Di = <<16#FE,16#FE,16#FE,16#FE>>,
+ DValue = #{dgiot_utils:to_hex(Di)=>0 },
+ ?LOG(info, "GGM 250 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
+ dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
+ _ -> pass
+ end;
+ % 拉闸,合闸失败
+ [#{<<"command">>:=16#DC, <<"addr">> := Addr, <<"data">> := Value} | _] ->
+ case dgiot_data:get({meter, ChannelId}) of
+ {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+ Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
+ Di = <<16#FE,16#FE,16#FE,16#FD>>,
+ DValue = #{dgiot_utils:to_hex(Di)=>dgiot_utils:to_hex(Value)},
+ ?LOG(info, "GGM 251 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
+ dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
+ _ -> pass
+ end;
+ _ -> pass
+ end,
+ {noreply, TCPState#tcp{buff = Rest}};
+ _ ->
+ {noreply, TCPState#tcp{buff = <<0,0,0,0,0,0,0,0>>}}
+ end;
+
+% %% 接收抄表任务的ACK报文DLT376
+% handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = ?DLT376,step = read_meter}} = TCPState) ->
+% dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
+% ?LOG(info, "GGM 140 Buff========DLT376========= ~p", [dgiot_utils:binary_to_hex(Buff)]),
+% {Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []),
+% ?LOG(info, "GGM 246 Frames ~p", [Frames]),
+% case Frames of
+% % 返回抄表数据
+% [#{<<"di">> :=<<16#01,16#01,16#01,16#10>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
+% case dgiot_data:get({meter, ChannelId}) of
+% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/post">>, % 发送给mqtt进行数据存储
+% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+% _ -> pass
+% end;
+% % 返回读取上次合闸时间
+% [#{<<"di">> :=<<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
+% case dgiot_data:get({meter, ChannelId}) of
+% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
+% ?LOG(info, "GGM 246 jsx:encode(Value) ~p", [jsx:encode(Value)]),
+% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+% _ -> pass
+% end;
+% % 返回读取上次拉闸时间
+% [#{<<"di">> :=<<16#1D,16#00,16#01,16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
+% case dgiot_data:get({meter, ChannelId}) of
+% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
+% ?LOG(info, "GGM 245 jsx:encode(Value) ~p", [jsx:encode(Value)]),
+% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+% _ -> pass
+% end;
+% % 拉闸,合闸成功
+% [#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FE>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
+% case dgiot_data:get({meter, ChannelId}) of
+% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
+% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+% _ -> pass
+% end;
+% % 拉闸,合闸失败
+% [#{<<"di">> :=<<16#FE,16#FE,16#FE,16#FD>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
+% case dgiot_data:get({meter, ChannelId}) of
+% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+% Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
+% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+% _ -> pass
+% end;
+% _ -> pass
+% end,
+% {noreply, TCPState#tcp{buff = Rest}};
+
+
+
+% %% 接收抄表任务的ACK报文DLT645
+% handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = ?DLT645,step = read_meter}} = TCPState) ->
+% dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
+% ?LOG(info, "GGM 222 Meter Rev Buff========DLT645========= ~p~n~n~n", [dgiot_utils:binary_to_hex(Buff)]),
+% {Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []),
+% % ?LOG(info, "GGM 245 Frames ~p", [Frames]),
+% case Frames of
+% % 抄表数据返回
+% [#{<<"command">> := 16#91,<<"di">>:=<<16#00,16#00,16#00,16#00>>,<<"addr">> := Addr, <<"value">> := Value} | _] ->
+% case dgiot_data:get({meter, ChannelId}) of
+% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>,
+% % ?LOG(info, "GGM 247 jsx:encode(Value) ~p", [jsx:encode(Value)]),
+% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(Value));
+% _ -> pass
+% end;
+% % 查询上一次合闸时间返回
+% [#{<<"command">>:=16#91,<<"di">> := <<16#1E,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] ->
+% case dgiot_data:get({meter, ChannelId}) of
+% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
+% Di = <<16#1E,16#00,16#01,16#01>>,
+% DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
+% ?LOG(info, "GGM 248 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
+% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
+% _ -> pass
+% end;
+% % 查询上一次拉闸时间返回
+% [#{<<"command">>:=16#91,<<"di">> := <<16#1D,16#00,16#01,16#01>>,<<"addr">> := Addr,<<"data">> := Value} | _] ->
+% case dgiot_data:get({meter, ChannelId}) of
+% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
+% Di = <<16#1D,16#00,16#01,16#01>>,
+% DValue = #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
+% ?LOG(info, "GGM 249 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
+% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
+% _ -> pass
+% end;
+% % 拉闸,合闸成功
+% [#{<<"command">>:=16#9C, <<"addr">> := Addr} | _] ->
+% case dgiot_data:get({meter, ChannelId}) of
+% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
+% Di = <<16#FE,16#FE,16#FE,16#FE>>,
+% DValue = #{dgiot_utils:to_hex(Di)=>0 },
+% ?LOG(info, "GGM 249 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
+% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
+% _ -> pass
+% end;
+% % 拉闸,合闸失败
+% [#{<<"command">>:=16#DC, <<"addr">> := Addr, <<"data">> := Value} | _] ->
+% case dgiot_data:get({meter, ChannelId}) of
+% {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
+% Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
+% Di = <<16#FE,16#FE,16#FE,16#FD>>,
+% DValue = #{dgiot_utils:to_hex(Di)=>dgiot_utils:to_hex(Value)},
+% ?LOG(info, "GGM 250 jsx:encode(Value) ~p", [jsx:encode(DValue)]),
+% dgiot_mqtt:publish(DevAddr, Topic, jsx:encode(DValue));
+% _ -> pass
+% end;
+% _ -> pass
+% end,
+% {noreply, TCPState#tcp{buff = Rest}};
%% 异常报文丢弃
%% {stop, TCPState} | {stop, Reason} | {ok, TCPState} | ok | stop
handle_info(_Info, TCPState) ->
+ ?LOG(info, "GGM 999 Meter Rev Buff========ERROR========= dgiot_meter_tcp, handle_info9, ~p,~p~n~n~n ",[_Info,TCPState]),
{noreply, TCPState}.
handle_call(_Msg, _From, TCPState) ->
diff --git a/apps/dgiot_meter/src/proctol/dlt376_decoder.erl b/apps/dgiot_meter/src/proctol/dlt376_decoder.erl
new file mode 100644
index 00000000..7ed822fd
--- /dev/null
+++ b/apps/dgiot_meter/src/proctol/dlt376_decoder.erl
@@ -0,0 +1,262 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 DGIOT Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(dlt376_decoder).
+-author("gugm").
+-include("dgiot_meter.hrl").
+-include_lib("dgiot/include/logger.hrl").
+-protocol([?DLT376]).
+
+%% API
+-export([parse_frame/2, to_frame/1, parse_value/2]).
+
+
+parse_frame(Buff, Opts) ->
+ parse_frame(Buff, [], Opts).
+
+parse_frame(<<>>, Acc, _Opts) ->
+ {<<>>, Acc};
+
+% 对于小于9的消息,独立decode
+parse_frame(<> = Bin, Acc, _Opts) when byte_size(Rest) == 15 ->
+ NewFrame = #{
+ <<"msgtype">> => ?DLT645
+ },
+ Acc1 = Acc ++ [NewFrame],
+ {Bin, Acc1};
+
+
+%% DLT376协议
+%% 68 32 00 32 00 68 C9 14 03 32 63 00 02 73 00 00 01 00 EB 16
+parse_frame(<<16#68, _:16, L2_low:6, _:2,L2_high:8, 16#68, C:8, A1:2/bytes,A2:2/bytes,A3:1/bytes, AFN:8, SEQ:8,Rest/binary>> = Bin, Acc, Opts) ->
+ Len = L2_high * 255 + L2_low,
+ DLen = Len -8,
+ case byte_size(Rest) -2 >= DLen of
+ true ->
+ case Rest of
+ <> ->
+ CheckBuf = <>,
+ CheckCrc = dgiot_utils:get_parity(CheckBuf),
+ % BinA = dgiot_utils:to_binary(A),
+ Acc1 =
+ case CheckCrc =:= Crc of
+ true ->
+ Frame = #{
+ % <<"addr">> => <<"16#00,16#00",dlt645_proctol:reverse(A1)/binary,dlt645_proctol:reverse(A2)/binary>>,
+ <<"addr">> => dgiot_utils:binary_to_hex(dlt376_proctol:encode_of_addr(A1,A2)), %dlt376_proctol:concrat_binary(dlt645_proctol:reverse(A1),dlt645_proctol:reverse(A2)),
+ <<"command">> => C,
+ <<"afn">> => AFN,
+ <<"datalen">> => DLen,
+ <<"msgtype">> => ?DLT376
+ },
+ case catch (parse_userzone(UserZone, Frame, Opts)) of
+ {'EXIT', Reason} ->
+ ?LOG(warning,"UserZone error,UserZone:~p, Reason:~p~n", [dgiot_utils:binary_to_hex(UserZone), Reason]),
+ Acc;
+ NewFrame ->
+ Acc ++ [NewFrame]
+ end;
+ false ->
+ Acc
+ end,
+ parse_frame(Rest1, Acc1, Opts);
+ _ ->
+ parse_frame(Rest, Acc, Opts)
+ end;
+ false ->
+ {Bin, Acc}
+ end;
+
+parse_frame(<<_:8, Rest/binary>>, Acc, Opts) when byte_size(Rest) > 50 ->
+ parse_frame(Rest, Acc, Opts);
+
+parse_frame(<>, Acc, _Opts) ->
+ {Rest, Acc}.
+
+parse_userzone(UserZone, #{<<"msgtype">> := ?DLT376} = Frame, _Opts) ->
+ check_Command(Frame#{<<"data">> => UserZone}).
+
+%% 组装成封包
+to_frame(#{
+ % <<"msgtype">> := ?DLT376,
+ <<"command">> := C,
+ <<"addr">> := Addr,
+ <<"afn">> := AFN
+} = Msg) ->
+ {ok, UserZone} = get_userzone(Msg),
+ Len = (byte_size(UserZone) + 8) * 4 + 2,
+ Crc = dgiot_utils:get_parity(<>),
+ <<
+ 16#68,
+ Len:8,
+ 16#00,
+ Len:8,
+ 16#00,
+ 16#68,
+ C:8,
+ Addr:5/bytes,
+ AFN:8,
+ 16#71,
+ UserZone/binary,
+ Crc:8,
+ 16#16
+ >>.
+
+% DLT376 链路检测,心跳数据
+check_Command(State = #{<<"command">> := 16#C9, <<"afn">> := 16#02}) ->
+ State;
+
+% DLT376 抄表返回的数据
+check_Command(State = #{<<"command">> := 16#88, <<"afn">> := 16#0C}) ->
+ Data = maps:get(<<"data">>, State, <<>>),
+ case Data of
+ <> ->
+ % {Value, Diff, TopicDI} = parse_value(dlt645_proctol:reverse(Di), Bin),
+ State1 = #{
+ <<"di">> => Di,
+ <<"time">> =>dgiot_utils:to_hex(DTime),
+ <<"valuenum">> => DNum,
+ <<"value">> => #{dgiot_utils:to_hex(Di)=>binary_to_value_dlt376_bcd(DValue) },
+ <<"addr">> => maps:get(<<"addr">>, State, <<>>)
+ },
+ State1;
+ _ ->
+ State
+ end;
+
+% DLT376 穿透转发返回
+check_Command(State = #{<<"command">> := 16#88, <<"afn">> := 16#10}) ->
+ Data = maps:get(<<"data">>, State, <<>>),
+ case Data of
+ <<_:4/bytes,_:1/bytes,DLen2:8,DLen1:8,Rest/bytes>> ->
+ DLen = DLen1 * 255 + DLen2,
+ case Rest of
+ <> ->
+ {_, Frames} = dlt645_decoder:parse_frame(DValue, []),
+ ?LOG(warning,"GGM 160 check_Command:~p", [Frames]),
+ case Frames of
+ % 拉闸、合闸返回成功
+ [#{<<"command">>:=16#9C} | _] ->
+ Di = <<16#FE,16#FE,16#FE,16#FE>>,
+ State1 = #{
+ <<"di">> => Di,%不做处理
+ <<"value">> => #{dgiot_utils:to_hex(Di)=>0 },
+ <<"addr">> => maps:get(<<"addr">>, State, <<>>)
+ },
+ State1;
+ % 拉闸、合闸返回失败
+ [#{<<"command">>:=16#DC,<<"data">> := VData} | _] ->
+ Di = <<16#FE,16#FE,16#FE,16#FD>>,
+ State1 = #{
+ <<"di">> => Di,%不做处理
+ <<"value">> => #{dgiot_utils:to_hex(Di)=>dgiot_utils:to_hex(VData) },
+ <<"addr">> => maps:get(<<"addr">>, State, <<>>)
+ },
+ State1;
+ % 查询上一次合闸时间返回
+ [#{<<"command">>:=16#91,<<"di">> := <<16#1E,16#00,16#01,16#01>>,<<"data">> := VData} | _] ->
+ Di = <<16#1E,16#00,16#01,16#01>>,
+ State1 = #{
+ <<"di">> => Di,
+ <<"value">> => #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(VData) },
+ <<"addr">> => maps:get(<<"addr">>, State, <<>>)
+ },
+ State1;
+ % 查询上一次拉闸时间返回
+ [#{<<"command">>:=16#91,<<"di">> := <<16#1D,16#00,16#01,16#01>>,<<"data">> := VData} | _] ->
+ Di = <<16#1D,16#00,16#01,16#01>>,
+ State1 = #{
+ <<"di">> => Di,
+ <<"value">> => #{dgiot_utils:to_hex(Di)=>dlt645_decoder:binary_to_dtime_dlt645_bcd(VData) },
+ <<"addr">> => maps:get(<<"addr">>, State, <<>>)
+ },
+ State1;
+ _ ->
+ pass
+ end;
+ _->
+ pass
+ end;
+ _ ->
+ State
+ end;
+
+check_Command(State) ->
+ State.
+
+
+% DLT376协议中把二进制转化成float
+binary_to_value_dlt376_bcd(BinValue) ->
+ RValue =
+ case BinValue of
+ <> ->
+ Value = V6 * 100000 + V5 * 10000 + V4 * 1000 + V3 * 100 + V2 * 10 + V1 + Vf1 * 0.1 + Vf2 * 0.01 + Vf3 * 0.001 + Vf4 * 0.0001,
+ Value;
+ _ ->
+ 0.0
+ end,
+ RValue.
+
+get_userzone(Msg) ->
+ Di = maps:get(<<"di">>, Msg, <<>>),
+ Data = maps:get(<<"data">>, Msg, <<>>),
+ Di2 = dgiot_utils:hex_to_binary(Di),
+ Data2 = dgiot_utils:hex_to_binary(Data),
+ UserZone = <>,
+ {ok, UserZone}.
+
+parse_value(Di, Data) ->
+ {DI, Diff, SendDi} =
+ case Di of
+ <<16#05, 16#06, Di3:8, Di4:8>> when (Di3 >= 1 andalso Di3 =< 8) andalso (Di4 >= 1 andalso Di4 =< 63) ->
+ {<<16#05, 16#06, Di3:8, 1>>, Di4 - 1, dgiot_utils:binary_to_hex(<<1, Di3:8, 6, 5>>)};
+ <<16#00, D2:8, 16#FF, Di4:8>> when D2 >= 1 andalso D2 =< 10 andalso (Di4 >= 1 andalso Di4 =< 12) ->
+ {<<16#00, D2:8, 16#FF, 1>>, Di4 - 1, dgiot_utils:binary_to_hex(<<1, 16#FF, D2:8, 0>>)};
+ _ ->
+ {Di, 0, dgiot_utils:binary_to_hex(dlt645_proctol:reverse(Di))}
+ end,
+ case dlt645_proctol:parse_data_to_json(DI, Data) of
+ {Key, Value} ->
+ ValueMap =
+ case jsx:is_json(Value) of
+ false ->
+ #{Key => Value};
+ true ->
+ [{K1, _V1} | _] = Value0 = jsx:decode(Value),
+ case size(K1) == 8 of
+ true ->
+ maps:from_list(Value0);
+ false ->
+ maps:from_list(lists:map(fun({K2, V2}) ->
+ <> = K2,
+ {<>, V2}
+ end, Value0))
+
+ end
+ end,
+ {ValueMap, Diff, SendDi};
+ _ -> {#{}, Diff, SendDi}
+ end.
+
+% test() ->
+% B1 = <<12, 16#68, 16#01, 16#00, 16#00, 16#00, 16#00, 16#00, 16#68, 16#91, 16#08, 16#33, 16#33, 16#3D, 16#33, 16#33, 16#33, 16#33, 16#33, 16#0C, 16#16,
+% 1, 3, 36,
+% 16#68, 16#18, 16#00, 16#18, 16#00, 16#68, 16#88, 16#00, 16#31, 16#07, 16#02, 16#00, 16#00, 16#01, 16#0c, 16#64, 16#00, 16#00, 16#00, 16#00, 16#01, 16#01, 16#58, 16#23, 16#10, 16#03, 16#16, 16#93, 16#99, 16#02, 16#07, 16#16,
+% 16#68, 16#90, 16#F0, 16#55, 16#00, 16#87>>,
+% {Rest, Frames} = parse_frame(B1, [], #{<<"vcaddr">> => <<"003107020000">>}),
+% io:format("Rest:~p~n", [Frames]),
+% B2 = <<16#00, 16#68, 16#12, 16#09, 16#00, 16#40, 16#01, 16#02, 16#00, 16#07, 16#00, 16#18, 16#11, 16#18, 16#D2, 16#16>>,
+% {Rest2, Frames2} = parse_frame(<>, [], #{vcaddr => <<"00310702">>}),
+% io:format("Rest:~p, Frames:~p~n", [Rest2, Frames2]).
diff --git a/apps/dgiot_meter/src/proctol/dlt376_proctol.erl b/apps/dgiot_meter/src/proctol/dlt376_proctol.erl
new file mode 100644
index 00000000..8bc2a3ea
--- /dev/null
+++ b/apps/dgiot_meter/src/proctol/dlt376_proctol.erl
@@ -0,0 +1,61 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 DGIOT Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc dlt645 Protocol Processor.
+-module(dlt376_proctol).
+-author("gugm").
+
+-include_lib("dgiot_meter.hrl").
+-include_lib("dgiot/include/logger.hrl").
+
+-export([
+ binary_to_hex/1,
+ concrat_binary/2,
+ encode_of_addr/2,
+ decode_of_addr/1]).
+
+% -record(di_data_A6, {di1, di2, di3, di4,di5}).
+
+
+binary_to_hex(Id) ->
+ << <> || <> <= Id, Y <- integer_to_list(X,16)>>.
+
+% binary 相连接
+concrat_binary( Acc,<<>>) -> Acc;
+concrat_binary(Acc,<> ) ->
+ concrat_binary(<>,Rest).
+
+
+split_head_bytes(<>) ->
+ {Head,Rest}.
+
+
+% 把地址转化成binary
+encode_of_addr(A1,A2) ->
+ AA = concrat_binary(dlt645_proctol:reverse(A1),dlt645_proctol:reverse(A2)),
+ AA1 = concrat_binary(<<16#00,16#00>>,AA),
+ AA1.
+
+%把binary转化成地址
+decode_of_addr(A) ->
+ {_,Rest} = split_head_bytes(A),
+ {A1,A2} = split_head_bytes(Rest),
+ AA = concrat_binary(dlt645_proctol:reverse(A1),dlt645_proctol:reverse(A2)),
+ AA1 = concrat_binary(AA,<<16#14>>),
+ AA1.
+
+
+
diff --git a/apps/dgiot_meter/src/proctol/dlt645_decoder.erl b/apps/dgiot_meter/src/proctol/dlt645_decoder.erl
index 5508c5be..e22245e5 100644
--- a/apps/dgiot_meter/src/proctol/dlt645_decoder.erl
+++ b/apps/dgiot_meter/src/proctol/dlt645_decoder.erl
@@ -20,7 +20,7 @@
-protocol([?DLT645]).
%% API
--export([parse_frame/2, to_frame/1, test/0, parse_value/2]).
+-export([parse_frame/2, to_frame/1, test/0, parse_value/2,binary_to_dtime_dlt645_bcd/1]).
parse_frame(Buff, Opts) ->
parse_frame(Buff, [], Opts).
@@ -81,7 +81,7 @@ parse_userzone(UserZone, #{<<"msgtype">> := ?DLT645} = Frame, _Opts) ->
%% 组装成封包
to_frame(#{
- <<"msgtype">> := ?DLT645,
+ % <<"msgtype">> := ?DLT645,
<<"command">> := C,
<<"addr">> := Addr
} = Msg) ->
@@ -152,11 +152,13 @@ check_Command(State = #{<<"command">> := 16#C1, <<"data">> := <>})
% -define(DLT645_MS_FORCE_EVENT_NAME, <<"1C">>).
% -define(DLT645_SM_FORCE_EVENT_NORM_NAME, <<"9C">>).
% -define(DLT645_SM_FORCE_EVENT_ERRO_NAME, <<"DC">>).
+% 远程开闸、拉闸 返回正常
check_Command(State = #{<<"command">> := 16#9C}) ->
State#{
};
+% 远程开闸、拉闸 返回正常
check_Command(State = #{<<"command">> := 16#DC}) ->
State#{
@@ -207,6 +209,29 @@ parse_value(Di, Data) ->
_ -> {#{}, Diff, SendDi}
end.
+%返回数据转化成时间(时间戳(单位:秒))如1554282344
+binary_to_dtime_dlt645_bcd(BinValue) ->
+ RValue =
+ case BinValue of
+ <> ->
+ Year = 2 * 1000 + Y1 * 10 + Y2 ,
+ Month = M1 * 10 + M2,
+ Day = D1 * 10 + D2,
+ Hour = H1 * 10 + H2,
+ Minite = MT1 * 10 + MT2,
+ Second = S1 * 10 + S2,
+ case Year of
+ 2000 ->
+ 0;
+ _ ->
+ Value = calendar:datetime_to_gregorian_seconds({{Year,Month,Day},{Hour,Minite,Second}})-719528*24*3600-8*3600,
+ Value
+ end;
+ _ ->
+ 0
+ end,
+ RValue.
+
test() ->
B1 = <<12, 16#68, 16#01, 16#00, 16#00, 16#00, 16#00, 16#00, 16#68, 16#91, 16#08, 16#33, 16#33, 16#3D, 16#33, 16#33, 16#33, 16#33, 16#33, 16#0C, 16#16,
1, 3, 36,