From 11ee5730d0805fa8058b9f5058e8bb18bcdc07c3 Mon Sep 17 00:00:00 2001 From: dawnwinterLiu <1737801684@qq.com> Date: Tue, 5 Dec 2023 20:38:08 +0800 Subject: [PATCH] fix: alter --- apps/dgiot/src/otp/dgiot_metrics.erl | 10 +- apps/dgiot/src/utils/dgiot_csv.erl | 1 + apps/dgiot/src/utils/dgiot_datetime.erl | 2 +- apps/dgiot_bamis/src/dgiot_bamis.erl | 2 +- .../src/channel/dgiot_mqttc_worker.erl | 1 + apps/dgiot_device/src/dgiot_device.erl | 1 - .../dgiot_device/src/dgiot_device_channel.erl | 1 + .../dgiot_device/src/dgiot_device_profile.erl | 2 +- .../src/utils/dgiot_device_cache.erl | 5 +- .../src/utils/dgiot_product_csv.erl | 5 +- apps/dgiot_dlink/priv/json/Notification.json | 139 ++++++++++++++++-- .../src/dgiot_modbusc_channel.erl | 74 +++++++++- apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl | 27 ++-- apps/dgiot_modbus/src/dgiot_modbusrtu_tcp.erl | 1 - apps/dgiot_modbus/src/modbus/modbus_tcp.erl | 101 +++---------- apps/dgiot_parse/src/dgiot_parse_hook.erl | 2 +- apps/dgiot_task/src/dgiot_task.erl | 2 +- .../src/pools/dgiot_tdengine_pool.erl | 15 +- .../src/utils/dgiot_tdengine_field.erl | 2 + dgiot_install.sh | 2 +- 20 files changed, 266 insertions(+), 129 deletions(-) diff --git a/apps/dgiot/src/otp/dgiot_metrics.erl b/apps/dgiot/src/otp/dgiot_metrics.erl index a1a053c6..d99b3d65 100644 --- a/apps/dgiot/src/otp/dgiot_metrics.erl +++ b/apps/dgiot/src/otp/dgiot_metrics.erl @@ -28,7 +28,7 @@ -export([histogram_reset/1, histogram_reset/2, histogram_reset/3]). -export([init_metrics/1, collect_metrics/4]). --export([start_metrics/1, inc/3, inc/4, inc/5, dec/3, dec/4, dec/5]). +-export([start_metrics/1, get/2, inc/3, inc/4, inc/5, dec/3, dec/4, dec/5]). -export([start/1, check_metrics/0, reset_metrics/1]). -route_path("/metrics/:Registry"). @@ -109,6 +109,14 @@ histogram_reset(Name, LabelValues) -> histogram_reset(Registry, Name, LabelValues) -> prometheus_histogram:reset(Registry, Name, LabelValues). +get(Registry, Name) -> + case dgiot_data:lookup(?DGIOT_METRICS_ETS, {Name, Registry}) of + {error, not_find} -> + not_find; + {ok, Count1} -> + Count1 + end. + %%新增统计函数 inc(Registry, Name, Value) -> {ok, Count} = diff --git a/apps/dgiot/src/utils/dgiot_csv.erl b/apps/dgiot/src/utils/dgiot_csv.erl index 4abe8cff..6ee68861 100644 --- a/apps/dgiot/src/utils/dgiot_csv.erl +++ b/apps/dgiot/src/utils/dgiot_csv.erl @@ -55,6 +55,7 @@ save_csv_ets(Module, FilePath) -> case dgiot_httpc:download(Url, DownloadPath) of {ok, saved_to_file} -> AtomName = dgiot_utils:to_atom(FileName), + dgiot_data:delete(AtomName), dgiot_data:init(AtomName), put(count, -1), Fun = fun(X) -> diff --git a/apps/dgiot/src/utils/dgiot_datetime.erl b/apps/dgiot/src/utils/dgiot_datetime.erl index 8598f60f..a3db6f04 100644 --- a/apps/dgiot/src/utils/dgiot_datetime.erl +++ b/apps/dgiot/src/utils/dgiot_datetime.erl @@ -190,7 +190,7 @@ to_localtime(Time) when is_tuple(Time) -> Time; to_localtime(NowStamp) when is_integer(NowStamp) -> unixtime_to_localtime(NowStamp); -to_localtime(<>) -> +to_localtime(<>) -> Data = {{to_int(Y), to_int(M), to_int(D)}, {to_int(H), to_int(N), to_int(S)}}, Ms = localtime_to_unixtime(Data) + timezone() * 60 * 60, unixtime_to_localtime(Ms); diff --git a/apps/dgiot_bamis/src/dgiot_bamis.erl b/apps/dgiot_bamis/src/dgiot_bamis.erl index 7941f72a..a327e7bf 100644 --- a/apps/dgiot_bamis/src/dgiot_bamis.erl +++ b/apps/dgiot_bamis/src/dgiot_bamis.erl @@ -53,7 +53,7 @@ get({'before', '*', Args}) -> end end, #{}, Args), Basic = format(NewData), -%% io:format("~s ~p Basic: ~p ~n", [?FILE, ?LINE, Basic]), +%% io:format("~s ~p Basic = ~p.~n", [?FILE, ?LINE, Basic]), Basic; get({'after', #{<<"results">> := Response, <<"count">> := Count} = _Data}) -> diff --git a/apps/dgiot_bridge/src/channel/dgiot_mqttc_worker.erl b/apps/dgiot_bridge/src/channel/dgiot_mqttc_worker.erl index 1635f909..3d1fedd6 100644 --- a/apps/dgiot_bridge/src/channel/dgiot_mqttc_worker.erl +++ b/apps/dgiot_bridge/src/channel/dgiot_mqttc_worker.erl @@ -92,6 +92,7 @@ handle_info({sub, Client, ProductId, DevAddr}, State) -> %% #{client_pid => <0.11482.0>, dup => false, packet_id => undefined, payload => <<"{ \"msg\": \"Hello, World!\" }">>, topic =><<"$dg/device/5392ccb3d7/00E0B45BFB4F_usb6-ai15/test">>, properties => undefined,qos => 0, retain => false} handle_info({publish, Topic, Payload}, #dclient{channel = ChannelId} = State) -> dgiot_bridge:send_log(ChannelId, "~s ~p cloud to edge Topic ~p Payload ~p ~n", [?FILE, ?LINE, Topic, Payload]), + dgiot_mqtt:publish(ChannelId, <<"cloud2edge/", Topic/binary>>, Payload), {noreply, State}; handle_info({dclient_ack, Topic, Payload}, #dclient{client = ClientId, channel = ChannelId} = State) -> diff --git a/apps/dgiot_device/src/dgiot_device.erl b/apps/dgiot_device/src/dgiot_device.erl index cfbc8902..19308c26 100644 --- a/apps/dgiot_device/src/dgiot_device.erl +++ b/apps/dgiot_device/src/dgiot_device.erl @@ -190,7 +190,6 @@ create_device(#{<<"status">> := Status, <<"brand">> := Brand, <<"devModel">> := Batch_name = dgiot_utils:to_list(Y) ++ dgiot_utils:to_list(M) ++ dgiot_utils:to_list(D), <> = dgiot_utils:to_md5(dgiot_utils:random()), NewDevice = Device#{ - <<"location">> => maps:get(<<"location">>, Device, #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.161324, <<"latitude">> => 30.262441}), <<"basedata">> => maps:get(<<"basedata">>, Device, #{}), <<"content">> => maps:get(<<"content">>, Device, #{}), <<"profile">> => maps:get(<<"profile">>, Device, #{}), diff --git a/apps/dgiot_device/src/dgiot_device_channel.erl b/apps/dgiot_device/src/dgiot_device_channel.erl index f754a322..352c9b95 100644 --- a/apps/dgiot_device/src/dgiot_device_channel.erl +++ b/apps/dgiot_device/src/dgiot_device_channel.erl @@ -259,6 +259,7 @@ handle_message({sync_parse, Pid, 'before', post, Token, <<"Device">>, #{<<"locat MapType = maps:get(<<"mapType">>, Cookie, <<"baidu">>), NewLocation = dgiot_gps:towgs84(Location, MapType), NewQueryData = QueryData#{<<"location">> => NewLocation}, + dgiot_device:put(NewQueryData), dgiot_parse_hook:publish(Pid, NewQueryData), {ok, State}; diff --git a/apps/dgiot_device/src/dgiot_device_profile.erl b/apps/dgiot_device/src/dgiot_device_profile.erl index a951bf64..0c85eac6 100644 --- a/apps/dgiot_device/src/dgiot_device_profile.erl +++ b/apps/dgiot_device/src/dgiot_device_profile.erl @@ -112,7 +112,7 @@ encode_profile(ProductId, Profile) -> case maps:find(Identifier, Profile) of {ok, V} -> Acc#{ - Index => #{ + dgiot_utils:to_int(Index) => #{ <<"value">> => V, <<"identifier">> => Identifier, <<"name">> => Name, diff --git a/apps/dgiot_device/src/utils/dgiot_device_cache.erl b/apps/dgiot_device/src/utils/dgiot_device_cache.erl index 32cca321..b96ba656 100644 --- a/apps/dgiot_device/src/utils/dgiot_device_cache.erl +++ b/apps/dgiot_device/src/utils/dgiot_device_cache.erl @@ -141,6 +141,9 @@ put(Device) -> {ok, #{<<"status">> := Status, <<"state">> := State, <<"acl">> := Acl, <<"isEnable">> := IsEnable, <<"devaddr">> := Devaddr, <<"time">> := Oldtime, <<"productid">> := ProductId, <<"devicesecret">> := DeviceSecret, <<"node">> := Node, <<"longitude">> := Longitude, <<"latitude">> := Latitude}} -> NewIsEnable = maps:get(<<"isEnable">>, Device, IsEnable), + Location = maps:get(<<"location">>, Device, #{<<"longitude">> => Longitude, <<"latitude">> => Latitude}), + NewLongitude = maps:get(<<"longitude">>, Location, Longitude), + NewLatitude = maps:get(<<"latitude">>, Location, Latitude), {NewStatus, Now} = check_time(Status, Device, Oldtime), NewAcl = case maps:find(<<"ACL">>, Device) of @@ -149,7 +152,7 @@ put(Device) -> _ -> dgiot_role:get_acls(Device) end, - insert_mnesia(DeviceId, NewAcl, NewStatus, State, Now, NewIsEnable, ProductId, Devaddr, DeviceSecret, Node, Longitude, Latitude); + insert_mnesia(DeviceId, NewAcl, NewStatus, State, Now, NewIsEnable, ProductId, Devaddr, DeviceSecret, Node, NewLongitude, NewLatitude); _ -> pass end. diff --git a/apps/dgiot_device/src/utils/dgiot_product_csv.erl b/apps/dgiot_device/src/utils/dgiot_product_csv.erl index bcfe4ffa..93f4226a 100644 --- a/apps/dgiot_device/src/utils/dgiot_product_csv.erl +++ b/apps/dgiot_device/src/utils/dgiot_product_csv.erl @@ -198,6 +198,7 @@ post_properties(Things, AtomName, ProductId, ProductName) -> <<"dataType">> => #{ <<"das">> => [], <<"type">> => to_lower(Type), + <<"size">> => 99, <<"specs">> => #{ <<"min">> => Min, <<"max">> => Max, @@ -244,10 +245,12 @@ get_accessmode(_AccessMode) -> get_min_max(Min_Max) -> case binary:split(Min_Max, <<$->>, [global, trim]) of + [<<>>, Min, Max] -> + {-dgiot_utils:to_int(Min), dgiot_utils:to_int(Max)}; [Min, Max] -> {dgiot_utils:to_int(Min), dgiot_utils:to_int(Max)}; _ -> - {0, 999} + {-65535, 65535} end. %%get_operatetype(Operatetype) -> diff --git a/apps/dgiot_dlink/priv/json/Notification.json b/apps/dgiot_dlink/priv/json/Notification.json index 8677339d..fa81292c 100644 --- a/apps/dgiot_dlink/priv/json/Notification.json +++ b/apps/dgiot_dlink/priv/json/Notification.json @@ -43,7 +43,7 @@ } ], "type": "radios", - "value": "true" + "value": "false" }, { "horizontal": null, @@ -71,16 +71,8 @@ "value": "" }, { - "label": "质检", - "value": "c5858f08bf" - }, - { - "label": "开发者(测试)", - "value": "ccf5456562" - }, - { - "label": "运维部(test)", - "value": "9470abe2e7" + "label": "智慧车间", + "value": "8f7191ca2d" } ], "readOnly": true, @@ -225,7 +217,7 @@ } ], "type": "radios", - "value": "true" + "value": "false" }, { "hint": "", @@ -394,7 +386,7 @@ } ], "type": "radios", - "value": "true" + "value": "false" }, { "autoComplete": false, @@ -601,6 +593,127 @@ } ], "title": "工单配置" + }, + { + "body": [ + { + "api": { + "data": null, + "dataType": "json", + "headers": { + "dgiotReplace": "parse_view_objectid", + "store": "localStorage" + }, + "method": "put", + "requestAdaptor": "return {\r\n ...api,\r\n data:{\r\n meta:api.data \r\n }\r\n}", + "url": "/iotapi/classes/View/parse_view_objectid" + }, + "body": [ + { + "checkAll": false, + "id": "u:de68a1c03290", + "joinValues": true, + "label": "是否推送", + "mode": "horizontal", + "name": "otherpush.ispush", + "options": [ + { + "label": "推送", + "value": "true" + }, + { + "label": "不推送", + "value": "false" + } + ], + "type": "radios", + "value": "false" + }, + { + "checkAll": false, + "id": "u:4fcc58963f83", + "label": "推送类型", + "mode": "horizontal", + "name": "otherpush.type", + "options": [ + { + "label": "mqtt", + "value": "mqtt" + }, + { + "label": "api", + "value": "api" + } + ], + "size": "lg", + "type": "select", + "value": "mqtt" + }, + { + "hint": "", + "id": "u:32dded637284", + "label": "第三方topic", + "mode": "horizontal", + "name": "otherpush.topic", + "size": "lg", + "type": "input-text", + "value": "/devWar/up", + "visibleOn": "this.otherpush.type == \"mqtt\"" + }, + { + "hint": "", + "id": "u:616e61bbae74", + "label": "第三方api", + "mode": "horizontal", + "name": "otherpush.api", + "size": "lg", + "type": "input-text", + "value": "http://127.0.0.1/devWar/up", + "visibleOn": "this.otherpush.type == \"api\"" + }, + { + "checkAll": false, + "id": "u:450ee0bc94e2", + "label": "告警等级", + "mode": "horizontal", + "name": "otherpush.level", + "options": [ + { + "label": "一级", + "value": "一级" + }, + { + "label": "二级", + "value": "二级" + }, + { + "label": "三级", + "value": "三级" + } + ], + "size": "lg", + "type": "select", + "value": "一级" + }, + { + "checkAll": false, + "id": "u:073a9a76df1c", + "label": "告警描述", + "mode": "horizontal", + "name": "otherpush.description", + "size": "lg", + "type": "input-text" + } + ], + "id": "u:4ae24134beb4", + "initApi": "", + "multiLine": false, + "name": "alarm_form", + "title": "", + "type": "form" + } + ], + "title": "第三方推送" } ], "type": "tabs" diff --git a/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl b/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl index 6a360608..2606b119 100644 --- a/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl +++ b/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl @@ -64,20 +64,61 @@ zh => <<"服务器端口"/utf8>> } }, - <<"filepath">> => #{ + <<"slaveid">> => #{ order => 3, - type => upload, + type => integer, required => true, - default => <<"">>, + default => 1, title => #{ - zh => <<"csv文件"/utf8>> + zh => <<"从机地址"/utf8>> }, description => #{ - zh => <<"上传csv点位文件"/utf8>> + zh => <<"从机地址"/utf8>> + } + }, + <<"function">> => #{ + order => 4, + type => string, + required => true, + default => #{<<"value">> => <<"readHregs">>, <<"label">> => <<"0X03:读保持寄存器"/utf8>>}, + enum => [#{<<"value">> => <<"readCoils">>, <<"label">> => <<"0X01:读线圈"/utf8>>}, + #{<<"value">> => <<"readInputs">>, <<"label">> => <<"0X02:读离散量输入"/utf8>>}, + #{<<"value">> => <<"readHregs">>, <<"label">> => <<"0X03:读保持寄存器"/utf8>>}, + #{<<"value">> => <<"readIregs">>, <<"label">> => <<"0X04:读输入寄存器"/utf8>>} + ], + title => #{ + zh => <<"功能码"/utf8>> + }, + description => #{ + zh => <<"功能码"/utf8>> + } + }, + <<"address">> => #{ + order => 5, + type => integer, + required => true, + default => 0, + title => #{ + zh => <<"起始地址"/utf8>> + }, + description => #{ + zh => <<"起始地址"/utf8>> + } + }, + <<"quantity">> => #{ + order => 6, + type => integer, + required => true, + default => 100, + title => #{ + zh => <<"长度"/utf8>> + }, + description => #{ + zh => <<"长度"/utf8>> } }, <<"freq">> => #{ - order => 4, + order => 7, type => integer, required => true, default => 60, @@ -88,6 +129,18 @@ zh => <<"采集频率/秒"/utf8>> } }, + <<"filepath">> => #{ + order => 8, + type => upload, + required => true, + default => <<"">>, + title => #{ + zh => <<"csv文件"/utf8>> + }, + description => #{ + zh => <<"上传csv点位文件"/utf8>> + } + }, <<"ico">> => #{ order => 102, type => string, @@ -111,6 +164,10 @@ start(ChannelId, ChannelArgs) -> init(?TYPE, ChannelId, #{ <<"ip">> := Ip, <<"port">> := Port, + <<"slaveid">> := SlaveId, + <<"function">> := Function, + <<"address">> := Address, + <<"quantity">> := Quantity, <<"freq">> := Freq, <<"Size">> := Size } = Args) -> @@ -128,6 +185,10 @@ init(?TYPE, ChannelId, #{ <<"port">> => Port, <<"mod">> => dgiot_modbusc_tcp, <<"child">> => #{ + slaveid => SlaveId, + function => Function, + address => dgiot_utils:to_int(Address), + quantity => Quantity, filename => FileName, data => <<>>, freq => Freq, @@ -160,6 +221,7 @@ handle_message(_Message, State) -> {ok, State}. stop(ChannelType, ChannelId, _State) -> + dgiot_client:stop(ChannelId), dgiot_data:delete({start_client, ChannelId}), ?LOG(info, "channel stop ~p,~p", [ChannelType, ChannelId]), ok. diff --git a/apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl b/apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl index cee0694c..dd7e05f6 100644 --- a/apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl +++ b/apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl @@ -42,12 +42,11 @@ handle_info(connection_ready, #dclient{child = ChildState} = Dclient) -> handle_info(tcp_closed, #dclient{child = ChildState} = Dclient) -> {noreply, Dclient#dclient{child = ChildState}}; -handle_info(read, #dclient{channel = ChannelId, client = ClientId, child = #{minaddr := MinAddr, maxaddr := Maxaddr} = ChildState} = Dclient) -> -%% _Address1 = modbus_tcp:get_addr(ChannelId, MinAddr, Maxaddr, 124), - Address = maps:get(di, ChildState, MinAddr), +handle_info(read, #dclient{channel = ChannelId, client = ClientId, child = #{slaveid := SlaveId, function := Operatetype, address := StartAddr, maxaddr := Maxaddr} = ChildState} = Dclient) -> + Address = maps:get(di, ChildState, StartAddr), Step = maps:get(step, ChildState, 100), Registersnumber = - case Address + Step >= Maxaddr of + case (Address + Step) >= Maxaddr of true -> Maxaddr - Address + 1; _ -> @@ -56,28 +55,29 @@ handle_info(read, #dclient{channel = ChannelId, client = ClientId, child = #{min DataSource = #{ <<"registersnumber">> => Registersnumber, - <<"slaveid">> => <<"0X01">>, - <<"operatetype">> => <<"readHregs">>, + <<"slaveid">> => SlaveId, + <<"operatetype">> => Operatetype, <<"address">> => Address }, Data = modbus_tcp:to_frame(DataSource), dgiot_tcp_client:send(ChannelId, ClientId, Data), -%% io:format("~s ~p Read Data = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Data)]), - dgiot_bridge:send_log(ChannelId, "Channel sends ~p to DTU", [dgiot_utils:binary_to_hex(Data)]), + dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), "~p Channel sends ~p to DTU", [dgiot_datetime:format("YYYY-MM-DD HH:NN:SS"), dgiot_utils:binary_to_hex(Data)]), %% io:format("~s ~p Address = ~p.~n", [?FILE, ?LINE, Address]), - {noreply, Dclient#dclient{child = ChildState#{minaddr => MinAddr, maxaddr => Maxaddr, di => Address, step => Step}}}; + {noreply, Dclient#dclient{child = ChildState#{di => Address, step => Step}}}; handle_info({tcp, Buff}, #dclient{channel = ChannelId, - child = #{freq := Freq, minaddr := MinAddr, maxaddr := Maxaddr, di := Address, filename := FileName, data := OldData, step := Step} = ChildState} = Dclient) -> - dgiot_bridge:send_log(ChannelId, "returns [~p] to Channel", [dgiot_utils:binary_to_hex(Buff)]), + child = #{freq := Freq, maxaddr := Maxaddr, di := Address, filename := FileName, address := StartAddr, data := OldData, step := Step} = ChildState} = Dclient) -> +%% io:format("~s ~p Buff = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]), Data = modbus_tcp:parse_frame(Buff), case Address + Step >= Maxaddr of true -> EndData = <>, %% io:format("~s ~p EndData = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(EndData)]), - modbus_tcp:parse_frame(FileName, EndData), + dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), "~p recv EndData ~p", [dgiot_datetime:format("YYYY-MM-DD HH:NN:SS"), dgiot_utils:binary_to_hex(EndData)]), + AllData = modbus_tcp:parse_frame(StartAddr, FileName, EndData), + dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), "~p AllData ~ts", [dgiot_datetime:format("YYYY-MM-DD HH:NN:SS"), unicode:characters_to_list(jsx:encode(AllData))]), erlang:send_after(Freq * 1000, self(), read), - {noreply, Dclient#dclient{child = ChildState#{di => MinAddr, data => <<>>}}}; + {noreply, Dclient#dclient{child = ChildState#{di => StartAddr, data => <<>>}}}; _ -> erlang:send_after(1 * 1500, self(), read), {noreply, Dclient#dclient{child = ChildState#{di => Address + Step, data => <>}}} @@ -91,4 +91,3 @@ handle_info(_Info, #dclient{child = _ChildState} = Dclient) -> terminate(_Reason, _Dclient) -> ok. - diff --git a/apps/dgiot_modbus/src/dgiot_modbusrtu_tcp.erl b/apps/dgiot_modbus/src/dgiot_modbusrtu_tcp.erl index d31e7f04..74814419 100644 --- a/apps/dgiot_modbus/src/dgiot_modbusrtu_tcp.erl +++ b/apps/dgiot_modbus/src/dgiot_modbusrtu_tcp.erl @@ -181,7 +181,6 @@ create_device(DeviceId, ProductId, DTUMAC, DTUIP, Dtutype) -> <<"product">> => ProductId, <<"ACL">> => Acl, <<"status">> => <<"ONLINE">>, - <<"location">> => #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => 120.161324, <<"latitude">> => 30.262441}, <<"brand">> => Dtutype, <<"devModel">> => DevType }), diff --git a/apps/dgiot_modbus/src/modbus/modbus_tcp.erl b/apps/dgiot_modbus/src/modbus/modbus_tcp.erl index fb40198a..d665feaf 100644 --- a/apps/dgiot_modbus/src/modbus/modbus_tcp.erl +++ b/apps/dgiot_modbus/src/modbus/modbus_tcp.erl @@ -24,7 +24,6 @@ -export([ init/1, parse_frame/1, - parse_frame/2, parse_frame/3, to_frame/1, build_req_message/1, @@ -272,14 +271,15 @@ set_params(Payload, _ProductId, _DevAddr) -> parse_frame(<<_TransactionId:16, _ProtocolId:16, _Size1:16, _Slaveid:8, _FunCode:8, DataLen:8, Data:DataLen/bytes, _/bytes>>) -> Data. -%% 00 03 64 5c -parse_frame(FileName, Data) -> +%% 00000001000200030004000540C0000041000000412000004140000041600000418000004190000041A0000041B0000041C0000041D0000041E000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000063 +%% 3F800000400000004080000040C0000041000000412000004140000041600000418000004190000041A0000041B0000041C0000041D0000041E0000041F00000420000004208000000000000000000000000000000000000000000000000000000000000000000000000000000000000426000004268000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000630000000000000000000000000000006B +parse_frame(StartAddr, FileName, Data) -> AtomName = dgiot_utils:to_atom(FileName), Things = ets:match(AtomName, {'$1', ['_', '_', '_', '_', '$2', '_', '_', '_', '$3', '_', '_', '_', '_', '_', '_', '$4' | '_']}), AllData = - lists:foldl(fun([Number, Devaddr, Address, Originaltype | _], Acc) -> + lists:foldl(fun([_Number, Devaddr, Address, Originaltype | _], Acc) -> ProductId = dgiot_data:get(AtomName, {addr, Address}), - IntOffset = dgiot_utils:to_int(Number) - 1, + IntOffset = (dgiot_utils:to_int(Address) + 1) - StartAddr, Thing = #{ <<"identifier">> => Address, <<"dataSource">> => #{ @@ -287,7 +287,6 @@ parse_frame(FileName, Data) -> <<"originaltype">> => Originaltype }}, IntLen = get_len(1, Originaltype), - Value = case IntOffset of 0 -> @@ -303,11 +302,13 @@ parse_frame(FileName, Data) -> null end; _ -> - NewIntOffset = get_len(IntOffset, Originaltype), +%% NewIntOffset = get_len(IntOffset, Originaltype), + NewIntOffset = IntOffset * 2, case Data of <<_:NewIntOffset/binary, V:IntLen/binary, _/binary>> -> case format_value(V, Thing, #{}) of {Value1, _Rest} -> +%% io:format("~s ~p ~p => ~p => ~p => ~p => ~p => ~p ~n", [?FILE, ?LINE, Address, IntOffset, NewIntOffset, IntLen, V, Value1]), Value1; _ -> V @@ -316,18 +317,25 @@ parse_frame(FileName, Data) -> null end end, - case Value of null -> Acc; _ -> -%% io:format("~s ~p Devaddr ~p. => Value = ~p.~n", [?FILE, ?LINE, Devaddr, Value]), - NewData = change_data(ProductId, #{Address => dgiot_utils:to_float(Value, 3)}), - dgiot_task:save_td(ProductId, Devaddr, NewData, #{<<"interval">> => 30}), - Acc ++ [NewData] + NewData = change_data(ProductId, #{Address => Value}), + {ProductId, Devaddr, OldData} = maps:get(<>, Acc, {ProductId, Devaddr, #{}}), + Acc#{<> => {ProductId, Devaddr, maps:merge(OldData, NewData)}} end - end, [], Things), - AllData. + end, #{}, Things), +%% io:format("~s ~p AllData = ~p.~n", [?FILE, ?LINE, AllData]), + NewAllData = + maps:fold(fun + (_, {ProductId1, Devaddr1, Ack}, Ncc) -> + NewData = dgiot_task:save_td(ProductId1, Devaddr1, Ack, #{<<"interval">> => 30}), + Ncc#{Devaddr1 => NewData}; + (_, _, Ncc) -> + Ncc + end, #{}, AllData), + NewAllData. change_data(ProductId, Data) -> case dgiot_product:lookup_prod(ProductId) of @@ -362,71 +370,6 @@ change_data(ProductId, Data) -> Data end. -%rtu modbus -parse_frame(<<>>, Acc, _State) -> {<<>>, Acc}; - -parse_frame(<> = Buff, Acc, - #{<<"addr">> := DtuAddr} = State) -> - CheckCrc = dgiot_utils:crc16(<>), - case CheckCrc =:= Crc of - true -> - Error = case ErrorCode of - ?ILLEGAL_FUNCTION -> {error, illegal_function}; - ?ILLEGAL_DATA_ADDRESS -> {error, illegal_data_address}; - ?ILLEGAL_DATA_VALUE -> {error, illegal_data_value}; - ?SLAVE_DEVICE_FAILURE -> {error, slave_device_failure}; - ?ACKNOWLEDGE -> {error, acknowledge}; - ?SLAVE_DEVICE_BUSY -> {error, slave_device_busy}; - ?NEGATIVE_ACKNOWLEDGE -> {error, negative_acknowledge}; - ?MEMORY_PARITY_ERROR -> {error, memory_parity_error}; - ?GATEWAY_PATH_UNAVAILABLE -> {error, gateway_path_unavailable}; - ?GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND -> {error, gateway_target_device_failed_to_respond}; - _ -> {error, unknown_response_code} - end, - ?LOG(info, "DtuAddr ~p Modbus ~p, BadCode ~p, Error ~p", [DtuAddr, MbAddr, BadCode, Error]), - {<<>>, #{}}; - false -> - parse_frame(Buff, Acc, State) - end; - -%% modbustcp -%% Buff = <<"000100000006011000000001">>, -parse_frame(<<_TransactionId:16, _ProtocolId:16, Size:16, _ResponseData:Size/bytes>> = Buff, Acc, #{<<"dtuproduct">> := ProductId, <<"address">> := Address} = State) -> - io:format("~s ~p Buff = ~p.~n", [?FILE, ?LINE, Buff]), - case decode_data(Buff, ProductId, Address, Acc) of - {Rest1, Acc1} -> - parse_frame(Rest1, Acc1, State); - [Buff, Acc] -> - [Buff, Acc] - end; - -%% 传感器直接做为dtu物模型的一个指标 -parse_frame(<> = Buff, Acc, #{<<"dtuproduct">> := ProductId, <<"slaveId">> := SlaveId, <<"dtuaddr">> := _DtuAddr, <<"address">> := Address} = State) -> - case decode_data(Buff, ProductId, Address, Acc) of - {Rest1, Acc1} -> - parse_frame(Rest1, Acc1, State); - [Buff, Acc] -> - [Buff, Acc] - end; - -%% 传感器独立建产品,做为子设备挂载到dtu上面 -parse_frame(<> = Buff, Acc, #{<<"dtuaddr">> := DtuAddr, <<"slaveId">> := SlaveId, <<"address">> := Address} = State) -> - case dgiot_device:get_subdevice(DtuAddr, dgiot_utils:to_binary(SlaveId)) of - not_find -> - [<<>>, Acc]; - [ProductId, _DevAddr] -> - case decode_data(Buff, ProductId, Address, Acc) of - {Rest1, Acc1} -> - parse_frame(Rest1, Acc1, State); - [Buff, Acc] -> - {Buff, Acc} - end - end; -%rtu modbus -parse_frame(_Other, Acc, _State) -> - io:format("~s ~p _Other = ~p.~n", [?FILE, ?LINE, _Other]), - {error, Acc}. - decode_data(<<_TransactionId:16, _ProtocolId:16, _Size1:16, Slaveid:8, _FunCode:8, DataLen:8, Data:DataLen/bytes>>, ProductId, Address, Acc) -> {<<>>, modbus_tcp_decoder(ProductId, Slaveid, Address, Data, Acc)}; diff --git a/apps/dgiot_parse/src/dgiot_parse_hook.erl b/apps/dgiot_parse/src/dgiot_parse_hook.erl index 34c5f33d..0b8cd933 100644 --- a/apps/dgiot_parse/src/dgiot_parse_hook.erl +++ b/apps/dgiot_parse/src/dgiot_parse_hook.erl @@ -358,7 +358,7 @@ do_put_(Id, Args, Token, Tail) -> {ok, NewArgs} = receive_put(Args), case dgiot_parse:get_object(ClassName, Id) of {ok, Class} -> - Keys = maps:keys(maps:without([<<"id">>, <<"ACL">>], NewArgs)), + Keys = maps:keys(maps:with([<<"profile">>], NewArgs)), dgiot_map:merge(maps:with(Keys, Class), maps:without([<<"id">>], NewArgs)); _ -> maps:without([<<"id">>], NewArgs) diff --git a/apps/dgiot_task/src/dgiot_task.erl b/apps/dgiot_task/src/dgiot_task.erl index 9be4599a..1f40c047 100644 --- a/apps/dgiot_task/src/dgiot_task.erl +++ b/apps/dgiot_task/src/dgiot_task.erl @@ -314,7 +314,7 @@ del_pnque(DtuId) -> save_td(ProductId, DevAddr, Ack, _AppData) -> Topic = <<"$dg/thing/", ProductId/binary, "/", DevAddr/binary, "/properties/report">>, - dgiot_mqtt:send(ProductId, DevAddr, Topic, Ack), + dgiot_mqttc_channel:send(ProductId, DevAddr, Topic, Ack), case length(maps:to_list(Ack)) of 0 -> #{}; diff --git a/apps/dgiot_tdengine/src/pools/dgiot_tdengine_pool.erl b/apps/dgiot_tdengine/src/pools/dgiot_tdengine_pool.erl index 7778c0a3..6acd2627 100644 --- a/apps/dgiot_tdengine/src/pools/dgiot_tdengine_pool.erl +++ b/apps/dgiot_tdengine/src/pools/dgiot_tdengine_pool.erl @@ -26,7 +26,7 @@ login(ChannelId, #{<<"username">> := UserName, %% WebSocket insert_sql(#{<<"driver">> := <<"WS">>, <<"ws_pid">> := ConnPid, <<"ws_ref">> := StreamRef} = Context, _Action, Sql) when byte_size(Sql) > 0 -> -%% io:format("~s ~p Sql = ~p.~n", [?FILE, ?LINE, Sql]), +%% io:format("~s ~p insert_sql = ~p.~n", [?FILE, ?LINE, Sql]), Req_id = case get(req_id) of undefined -> @@ -47,7 +47,8 @@ insert_sql(#{<<"driver">> := <<"WS">>, <<"ws_pid">> := ConnPid, <<"ws_ref">> := #{<<"code">> := _} = R -> case maps:get(<<"channel">>, Context, <<"">>) of <<"">> -> - ?LOG(debug, "Execute (~ts) ", [unicode:characters_to_list(Sql)]); +%% ?LOG(debug, "Execute (~ts) ", [unicode:characters_to_list(Sql)]); + pass; ChannelId -> dgiot_bridge:send_log(ChannelId, "Execute (~ts) ~p", [unicode:characters_to_list(Sql), jsx:encode(R)]) end, @@ -67,14 +68,15 @@ insert_sql(_Context, _Action, _Sql) -> %% Action 用来区分数据库操作语句类型(DQL、DML、DDL、DCL) run_sql(#{<<"url">> := Url, <<"username">> := UserName, <<"password">> := Password} = Context, _Action, Sql) when byte_size(Sql) > 0 -> - ?LOG(debug, " ~p, ~p, ~p, (~ts)", [Url, UserName, Password, unicode:characters_to_list(Sql)]), -%% io:format("~s ~p Sql = ~p.~n", [?FILE, ?LINE, Sql]), +%% ?LOG(debug, " ~p, ~p, ~p, (~ts)", [Url, UserName, Password, unicode:characters_to_list(Sql)]), +%% io:format("~s ~p run_sql = ~p.~n", [?FILE, ?LINE, Sql]), case dgiot_tdengine_http:request(Url, UserName, Password, Sql) of {ok, #{<<"code">> := 0, <<"column_meta">> := Column_meta, <<"data">> := Data} = Result} -> NewData = get_data(Column_meta, Data), case maps:get(<<"channel">>, Context, <<"">>) of <<"">> -> - ?LOG(debug, "Execute ~p (~ts) ~p", [Url, unicode:characters_to_list(Sql), NewData]); +%% ?LOG(debug, "Execute ~p (~ts) ~p", [Url, unicode:characters_to_list(Sql), NewData]); + pass; ChannelId -> dgiot_bridge:send_log(ChannelId, "Execute ~p (~ts) ~p", [Url, unicode:characters_to_list(Sql), jsx:encode(NewData)]) end, @@ -84,7 +86,8 @@ run_sql(#{<<"url">> := Url, <<"username">> := UserName, <<"password">> := Passwo {ok, Result} -> case maps:get(<<"channel">>, Context, <<"">>) of <<"">> -> - ?LOG(debug, "Execute ~p (~ts) ~p", [Url, unicode:characters_to_list(Sql), Result]); +%% ?LOG(debug, "Execute ~p (~ts) ~p", [Url, unicode:characters_to_list(Sql), Result]); + pass; ChannelId -> dgiot_bridge:send_log(ChannelId, "Execute ~p (~ts) ~p", [Url, unicode:characters_to_list(Sql), jsx:encode(Result)]) end, diff --git a/apps/dgiot_tdengine/src/utils/dgiot_tdengine_field.erl b/apps/dgiot_tdengine/src/utils/dgiot_tdengine_field.erl index 6d1ca42f..66dfbd9d 100644 --- a/apps/dgiot_tdengine/src/utils/dgiot_tdengine_field.erl +++ b/apps/dgiot_tdengine/src/utils/dgiot_tdengine_field.erl @@ -161,6 +161,8 @@ check_field(Data, #{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := check_field(_, _) -> undefined. +check_validate({_, text}, _) -> + true; check_validate(null, _) -> true; check_validate(Value, #{<<"max">> := Max, <<"min">> := Min}) when is_integer(Max), is_integer(Min) -> diff --git a/dgiot_install.sh b/dgiot_install.sh index 74588068..c8aa8646 100755 --- a/dgiot_install.sh +++ b/dgiot_install.sh @@ -995,7 +995,7 @@ function install_postgres_exporter() { echo -e "$(date +%F_%T) $LINENO: ${GREEN} pg_stat_statements has installed${NC}" else sudo -u postgres /usr/local/pgsql/12/bin/psql -U postgres -d parse -c "CREATE EXTENSION pg_stat_statements SCHEMA public;" &>/dev/null - sudo -u postgres /usr/local/pgsql/12/bin/psql -U postgres -d parse -c "CREATE USER postgres_exporter PASSWORD 'password';" &>/dev/null + sudo -u postgres /usr/local/pgsql/12/bin/psql -U postgres -d parse -c "CREATE USER postgres_exporter PASSWORD '${pg_pwd}';" &>/dev/null sudo -u postgres /usr/local/pgsql/12/bin/psql -U postgres -d parse -c "ALTER USER postgres_exporter SET SEARCH_PATH TO postgres_exporter,pg_catalog;" &>/dev/null sudo -u postgres /usr/local/pgsql/12/bin/psql -U postgres -d parse -c "CREATE SCHEMA postgres_exporter AUTHORIZATION postgres_exporter;" &>/dev/null sudo -u postgres /usr/local/pgsql/12/bin/psql -U postgres -d parse -c "CREATE FUNCTION postgres_exporter.f_select_pg_stat_activity()