From 9273ac3718bb1b19d0d0bd203a9acd570c52b1b5 Mon Sep 17 00:00:00 2001 From: dawnwinterLiu <1737801684@qq.com> Date: Wed, 20 Jul 2022 17:23:45 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20taskk=20=E5=A4=84=E7=90=86=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=80=BC,=20mqtt=20=E6=B7=BB=E5=8A=A0opc=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/channel/dgiot_tcp_worker.erl | 3 +- .../src/utils/dgiot_device_card.erl | 3 +- .../src/utils/dgiot_device_tdengine.erl | 2 + .../src/mock/dgiot_mqtt_channel.erl | 8 +- .../src/proctol/dgiot_dlink_proctol.erl | 36 ++++++-- .../src/proctol/dgiot_mqtt_message.erl | 16 +++- apps/dgiot_evidence/priv/python/drawxnqx.py | 59 ------------- .../src/handler/dgiot_evidence_handler.erl | 3 +- .../src/dgiot_modbusc_channel.erl | 2 +- apps/dgiot_modbus/src/modbus/modbus_tcp.erl | 7 +- .../src/handler/dgiot_opc_handler.erl | 10 +-- apps/dgiot_task/src/dgiot_task.erl | 88 +++++++++++++++---- apps/dgiot_task/src/dgiot_task_data.erl | 14 +-- .../src/utils/dgiot_tdengine_field.erl | 30 ++++--- 14 files changed, 158 insertions(+), 123 deletions(-) diff --git a/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl b/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl index d3d7bf7f..553f6182 100644 --- a/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl +++ b/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl @@ -83,7 +83,8 @@ handle_info({deliver, _, Msg}, TCPState) -> handle_info(#{<<"cmd">> := Cmd, <<"data">> := Data, <<"productId">> := ProductId}, TCPState) -> do_cmd(ProductId, Cmd, Data, TCPState); -handle_info({tcp, Buff}, #tcp{state = #state{productIds = ProductIds}} = TCPState) -> +handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, productIds = ProductIds}} = TCPState) -> + dgiot_bridge:send_log(ChannelId, "Buff ~p", [Buff]), lists:map(fun(ProductId) -> do_cmd(ProductId, tcp, Buff, TCPState) end, ProductIds), diff --git a/apps/dgiot_device/src/utils/dgiot_device_card.erl b/apps/dgiot_device/src/utils/dgiot_device_card.erl index 388c7c7d..a594edde 100644 --- a/apps/dgiot_device/src/utils/dgiot_device_card.erl +++ b/apps/dgiot_device/src/utils/dgiot_device_card.erl @@ -44,7 +44,8 @@ get_card(ProductId, Results, DeviceId, Args) -> Props = dgiot_product:get_props(ProductId, Keys), lists:foldl(fun(X, Acc) -> case X of - #{<<"name">> := Name, <<"identifier">> := Identifier, <<"dataForm">> := #{<<"protocol">> := Protocol}, <<"dataSource">> := DataSource, <<"dataType">> := #{<<"type">> := Typea} = DataType} -> + #{<<"name">> := Name, <<"identifier">> := Identifier, <<"dataForm">> := #{<<"protocol">> := Protocol}, <<"dataType">> := #{<<"type">> := Typea} = DataType} -> + DataSource = maps:get(<<"dataSource">>, X, #{}), Time = maps:get(<<"createdat">>, Result, dgiot_datetime:now_secs()), NewTime = dgiot_tdengine_field:get_time(dgiot_utils:to_binary(Time), <<"111">>), Devicetype = diff --git a/apps/dgiot_device/src/utils/dgiot_device_tdengine.erl b/apps/dgiot_device/src/utils/dgiot_device_tdengine.erl index 9ac3e9d1..16adf675 100644 --- a/apps/dgiot_device/src/utils/dgiot_device_tdengine.erl +++ b/apps/dgiot_device/src/utils/dgiot_device_tdengine.erl @@ -112,6 +112,8 @@ get_device(Channel, ProductId, DeviceId, _DevAddr, Query) -> end. %% SELECT max(day_electricity) '时间' ,max(charge_current) '日期' FROM _2d26a94cf8._c5e1093e30 WHERE createdat >= now - 1h INTERVAL(1h) limit 10; +%% SELECT spread(cumulativescale) FROM _797197ad06 WHERE createdat >= now - 1Y INTERVAL(1h); +%% SELECT last(cumulativescale) FROM _797197ad06 WHERE createdat >= now - 1Y INTERVAL(1h); get_history_data(Channel, TableName, Query) -> dgiot_tdengine:transaction(Channel, fun(Context) -> diff --git a/apps/dgiot_dlink/src/mock/dgiot_mqtt_channel.erl b/apps/dgiot_dlink/src/mock/dgiot_mqtt_channel.erl index d94afce1..5e23c4b2 100644 --- a/apps/dgiot_dlink/src/mock/dgiot_mqtt_channel.erl +++ b/apps/dgiot_dlink/src/mock/dgiot_mqtt_channel.erl @@ -92,7 +92,7 @@ start(ChannelId, ChannelArgs) -> %% 通道初始化 init(?TYPE, ChannelId, #{ - <<"product">> := [{ProductId, _Product} |_], + <<"product">> := [{_ProductId, _Product} |_], <<"auth">> := Auth, <<"count">> := Count}) -> State = #state{ @@ -100,11 +100,11 @@ init(?TYPE, ChannelId, #{ auth = Auth, count = Count }, - io:format("~s ~p ProductId ~p ~n",[?FILE, ?LINE, ProductId]), +%% io:format("~s ~p ProductId ~p ~n",[?FILE, ?LINE, ProductId]), {ok, State}; init(?TYPE, _ChannelId, _Args) -> - io:format("~s ~p _ChannelId ~p ~n",[?FILE, ?LINE, _ChannelId]), +%% io:format("~s ~p _ChannelId ~p ~n",[?FILE, ?LINE, _ChannelId]), {ok, #{}}. handle_init(State) -> @@ -115,7 +115,7 @@ handle_event(_EventId, _Event, State) -> {ok, State}. handle_message(_Message, State) -> - io:format("~s ~p _Message = ~p.~n", [?FILE, ?LINE, _Message]), +%% io:format("~s ~p _Message = ~p.~n", [?FILE, ?LINE, _Message]), {ok, State}. stop(_ChannelType, _ChannelId, _State) -> diff --git a/apps/dgiot_dlink/src/proctol/dgiot_dlink_proctol.erl b/apps/dgiot_dlink/src/proctol/dgiot_dlink_proctol.erl index c4f21b8b..2c450944 100644 --- a/apps/dgiot_dlink/src/proctol/dgiot_dlink_proctol.erl +++ b/apps/dgiot_dlink/src/proctol/dgiot_dlink_proctol.erl @@ -23,7 +23,8 @@ -export([login/3]). -export([ properties_report/3 - ,firmware_report/3 + , firmware_report/3 + , parse_payload/2 ]). @@ -42,8 +43,9 @@ properties_report(ProductId, DevAddr, Payload) -> ok. firmware_report(ProductId, DevAddr, Payload) when is_map(Payload) -> - io:format("~s ~p ProductId ~p, DevAddr ~p, Payload: ~p ~n", [?FILE, ?LINE, ProductId, DevAddr, Payload]), - dgiot_task:save_td(ProductId, DevAddr, Payload, #{}); +%% io:format("~s ~p ProductId ~p, DevAddr ~p, Payload: ~p ~n", [?FILE, ?LINE, ProductId, DevAddr, Payload]), + NewPload = parse_payload(ProductId, Payload), + dgiot_task:save_td(ProductId, DevAddr, NewPload, #{<<"interval">> => 30}); firmware_report(ProductId, DevAddr, Payload) -> lists:map(fun @@ -52,8 +54,32 @@ firmware_report(ProductId, DevAddr, Payload) -> (_) -> pass end, dgiot_bridge:get_proctol_channel(ProductId)), - io:format("~s ~p ProductId ~p, DevAddr ~p, Payload: ~p ~n", [?FILE, ?LINE, ProductId, DevAddr, Payload]), +%% io:format("~s ~p ProductId ~p, DevAddr ~p, Payload: ~p ~n", [?FILE, ?LINE, ProductId, DevAddr, Payload]), ok. login(_A, _B, _C) -> - ok. \ No newline at end of file + ok. + +parse_payload(ProductId, Payload) -> + case dgiot_product:lookup_prod(ProductId) of + {ok, #{<<"thing">> := #{<<"properties">> := Props}}} -> + lists:foldl(fun(X, Acc) -> + case X of + #{<<"identifier">> := Identifier, + <<"dataType">> := DataType} -> + Das = maps:get(<<"das">>, DataType, []), + maps:fold(fun(PK, PV, Acc1) -> + case lists:member(PK, Das) of + true -> + Acc1#{Identifier => PV}; + _ -> + Acc1#{PK => PV} + end + end, Acc, Payload); + _ -> + Acc + end + end, #{}, Props); + _Error -> + Payload + end. diff --git a/apps/dgiot_dlink/src/proctol/dgiot_mqtt_message.erl b/apps/dgiot_dlink/src/proctol/dgiot_mqtt_message.erl index a684f415..23aa7588 100644 --- a/apps/dgiot_dlink/src/proctol/dgiot_mqtt_message.erl +++ b/apps/dgiot_dlink/src/proctol/dgiot_mqtt_message.erl @@ -32,6 +32,20 @@ on_message_publish(Message = #message{topic = <<"$dg/thing/", Topic/binary>>, pa dgiot_dlink_proctol:properties_report(ProductId, DevAddr, get_payload(Payload)); [ProductId, DevAddr, <<"firmware">>, <<"report">>] -> dgiot_dlink_proctol:firmware_report(ProductId, DevAddr, get_payload(Payload)); + [DeviceId, <<"properties">>, <<"get">>, <<"request_id=", Request_id/binary>>] -> +%% 属性获取 $dg/thing/{deviceId}/properties/get/request_id={request_id} 用户 平台 + case dgiot_device:lookup(DeviceId) of + {ok, #{<<"devaddr">> := Devaddr, <<"productid">> := ProductId}} -> +%% 属性获取 $dg/device/{productId}/{deviceAddr}/properties/get/response/request_id={request_id} 平台 设备 + RequestTopic = <<"$dg/device/", ProductId/binary, "/", Devaddr/binary, "/properties/get/response/request_id=", Request_id/binary>>, +%% io:format("~s ~p Payload = ~p~n", [?FILE, ?LINE, Payload]), + dgiot_mqtt:publish(DeviceId, RequestTopic, Payload); + _ -> + pass + end; + [ProductId, DevAddr, <<"properties">>, <<"get">>, <<"response">>, _] -> +%% 属性获取 $dg/thing/{productId}/{deviceAddr}/properties/get/response/request_id={request_id} 设备 平台 + dgiot_dlink_proctol:firmware_report(ProductId, DevAddr, get_payload(Payload)); _ -> pass end, @@ -59,7 +73,7 @@ on_message_publish(Message, _State) -> %% ok. get_payload(Payload) -> - io:format("~s ~p Payload: ~p~n", [?FILE, ?LINE, Payload]), +%% io:format("~s ~p Payload: ~p~n", [?FILE, ?LINE, Payload]), case jsx:is_json(Payload) of true -> jiffy:decode(Payload, [return_maps]); diff --git a/apps/dgiot_evidence/priv/python/drawxnqx.py b/apps/dgiot_evidence/priv/python/drawxnqx.py index f7a2d192..0943e67f 100644 --- a/apps/dgiot_evidence/priv/python/drawxnqx.py +++ b/apps/dgiot_evidence/priv/python/drawxnqx.py @@ -111,15 +111,8 @@ def calculate(data_x, parameters): datay.append(parameters[2] + parameters[1] * x + parameters[0] * x * x) return datay -<<<<<<< HEAD - """完成函数的绘制""" - -======= -"""完成函数的绘制""" - ->>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7 def draw(flow1, head1, headparameters, power1, powerparameters, effect, effectparameters, params): fm = math.ceil(max(flow1)) hm = math.ceil(max(head1)) @@ -129,68 +122,27 @@ def draw(flow1, head1, headparameters, power1, powerparameters, effect, effectpa nm = math.ceil(max(effect)) nmin = math.floor(min(effect)) -<<<<<<< HEAD - fig = plt.figure(1) -======= fig = plt.figure(figsize=(9, 5)) ->>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7 host = HostAxes(fig, [0.15, 0.1, 0.65, 0.8]) par1 = ParasiteAxes(host, sharex=host) par2 = ParasiteAxes(host, sharex=host) host.parasites.append(par1) host.parasites.append(par2) -<<<<<<< HEAD - host.set_ylabel('扬程') - host.set_xlabel('流量') - host.axis['right'].set_visible(False) - par1.axis['right'].set_visible(True) - par1.set_ylabel('功率') -======= host.set_ylabel('效率(E)(%)', color="blue") host.set_xlabel('流量(Q)(m3/h)') host.axis['right'].set_visible(False) par1.axis['right'].set_visible(True) par1.set_ylabel('功率(P)(kW)', color="red") ->>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7 par1.axis['right'].major_ticklabels.set_visible(True) par1.axis['right'].label.set_visible(True) -<<<<<<< HEAD - par2.set_ylabel('扬程') -======= par2.set_ylabel('效率(E)(%)', color="blue") ->>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7 offset = (60, 0) new_axisline = par2._grid_helper.new_fixed_axis par2.axis['right2'] = new_axisline(loc='right', axes=par2, offset=offset) fig.add_axes(host) host.set_xlim(0, fm + 1) -<<<<<<< HEAD - host.set_ylim(0, hm + 5) - - host.set_xlabel('流量') - host.set_ylabel('功率') - host.set_ylabel('效率') - x = np.linspace(0, fm, 500) - y = headparameters[0] * x ** 2 + headparameters[1] * x + headparameters[2] - p1, = host.plot(x, y, label="HQ拟合曲线", color="black") - host.scatter(flow1, head1, label="HQ离散数据") - x1 = np.linspace(0, fm, 500) - y1 = powerparameters[0] * x ** 2 + powerparameters[1] * x + powerparameters[2] - p2, = par1.plot(x, y1, label="PQ拟合曲线", color="red") - par1.scatter(flow1, power1, label="PQ离散数据") - x2 = np.linspace(0, fm, 500) - y2 = effectparameters[0] * x ** 2 + effectparameters[1] * x + effectparameters[2] - p3, = par2.plot(x, y2, label="EQ拟合曲线", color="blue") - par2.scatter(flow1, effect, label="EQ离散数据") - par1.set_ylim(0, pm * 2) - par2.set_ylim(0, nm + 5) - host.legend() - par2.axis['right2'].major_ticklabels.set_color(p3.get_color()) # 刻度值颜色 - par2.axis['right2'].set_axisline_style('-|>', size=1.5) # 轴的形状色 - -======= # plt.xticks(range(0, fm + 1, 1)) host.set_ylim(0, hm + 5) @@ -236,30 +188,20 @@ def draw(flow1, head1, headparameters, power1, powerparameters, effect, effectpa ("流量" + str(fppoints[0][0]) + " m3/h", "功率" + str(fppoints[0][1]) + " kW"), ha='center', color='r') ->>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7 # 解决使用matplotliblib画图的时候出现中文或者是负号无法显示的情况 mpl.rcParams['font.sans-serif'] = ['SimHei'] mpl.rcParams['axes.unicode_minus'] = False plt.title("性能曲线拟合数据") -<<<<<<< HEAD - plt.legend(loc="best") - # 获取当前时间 - # localtime = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) - -======= plt.legend(loc=9, bbox_to_anchor=(-0.142, 1.1), borderaxespad=0., fontsize=8) # 获取当前时间 # localtime = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) ->>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7 filename = params['path'] + params['name'] # print(filename) plt.savefig(filename) # plt.show() return (filename) -<<<<<<< HEAD -======= def find_close(arr, e): low = 0 high = len(arr) - 1 @@ -279,7 +221,6 @@ def find_close(arr, e): idx += 1 return arr[idx] ->>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7 def main(argv): params = json.loads(base64.b64decode(argv).decode("utf-8")) diff --git a/apps/dgiot_evidence/src/handler/dgiot_evidence_handler.erl b/apps/dgiot_evidence/src/handler/dgiot_evidence_handler.erl index cbd415f7..aca063d3 100644 --- a/apps/dgiot_evidence/src/handler/dgiot_evidence_handler.erl +++ b/apps/dgiot_evidence/src/handler/dgiot_evidence_handler.erl @@ -992,7 +992,8 @@ python_drawxnqx(TaskId, NewData) -> _ -> 0 end, - Path = code:priv_dir(?MODULE), + {file, Here} = code:is_loaded(?MODULE), + Path = dgiot_httpc:url_join([filename:dirname(filename:dirname(Here)), "/priv"]), Python3path = Path ++ "/python/drawxnqx.py ", Filepath = application:get_env(dgiot_evidence, gofastdfs_path, <<"/data/dgiot/go_fastdfs/files/dgiot_file/pump_python/">>), PythonBody = #{<<"name">> => <>, <<"data">> => NewData, <<"path">> => Filepath, <<"dgiot_testing_equipment_flowSet">> => Dgiot_testing_equipment_flowSet}, diff --git a/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl b/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl index 8c0f3360..1324c98f 100644 --- a/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl +++ b/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl @@ -56,7 +56,7 @@ order => 2, type => integer, required => true, - default => 8080, + default => 502, title => #{ zh => <<"服务器端口"/utf8>> }, diff --git a/apps/dgiot_modbus/src/modbus/modbus_tcp.erl b/apps/dgiot_modbus/src/modbus/modbus_tcp.erl index e7fb5c38..cf3b5d43 100644 --- a/apps/dgiot_modbus/src/modbus/modbus_tcp.erl +++ b/apps/dgiot_modbus/src/modbus/modbus_tcp.erl @@ -272,13 +272,14 @@ set_params(Payload, _ProductId, _DevAddr) -> parse_frame(<<_TransactionId:16, _ProtocolId:16, _Size1:16, _Slaveid:8, _FunCode:8, DataLen:8, Data:DataLen/bytes, _/bytes>>) -> Data. +%% 00 03 64 5c parse_frame(FileName, Data) -> AtomName = dgiot_utils:to_atom(FileName), - Things = ets:match(AtomName, {'_', ['_', '_', '_', '_', '$1', '_', '_', '_', '$2', '_', '_', '_', '_', '_', '_', '$3' | '_']}), + Things = ets:match(AtomName, {'$1', ['_', '_', '_', '_', '$2', '_', '_', '_', '$3', '_', '_', '_', '_', '_', '_', '$4' | '_']}), AllData = - lists:foldl(fun([Devaddr, Address, Originaltype | _], Acc) -> + lists:foldl(fun([Number, Devaddr, Address, Originaltype | _], Acc) -> ProductId = dgiot_data:get(AtomName, {addr, Address}), - IntOffset = dgiot_utils:to_int(Address), + IntOffset = dgiot_utils:to_int(Number) - 1, Thing = #{ <<"identifier">> => Address, <<"dataSource">> => #{ diff --git a/apps/dgiot_opc/src/handler/dgiot_opc_handler.erl b/apps/dgiot_opc/src/handler/dgiot_opc_handler.erl index 1a2db4c8..bee2e4b3 100644 --- a/apps/dgiot_opc/src/handler/dgiot_opc_handler.erl +++ b/apps/dgiot_opc/src/handler/dgiot_opc_handler.erl @@ -51,23 +51,23 @@ handle(OperationID, Args, Context, Req) -> Headers = #{}, case catch do_request(OperationID, Args, Context, Req) of {ErrType, Reason} when ErrType == 'EXIT'; ErrType == error -> - ?LOG(info, "do request: ~p, ~p, ~p~n", [OperationID, Args, Reason]), + ?LOG(debug, "do request: ~p, ~p, ~p~n", [OperationID, Args, Reason]), Err = case is_binary(Reason) of true -> Reason; false -> dgiot_utils:format("~p", [Reason]) end, {500, Headers, #{<<"error">> => Err}}; ok -> - ?LOG(info, "do request: ~p, ~p ->ok ~n", [OperationID, Args]), + ?LOG(debug, "do request: ~p, ~p ->ok ~n", [OperationID, Args]), {200, Headers, #{}, Req}; {ok, Res} -> - ?LOG(info, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]), + ?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]), {200, Headers, Res, Req}; {Status, Res} -> - ?LOG(info, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]), + ?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]), {Status, Headers, Res, Req}; {Status, NewHeaders, Res} -> - ?LOG(info, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]), + ?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]), {Status, maps:merge(Headers, NewHeaders), Res, Req} end. diff --git a/apps/dgiot_task/src/dgiot_task.erl b/apps/dgiot_task/src/dgiot_task.erl index 04060d95..b749980d 100644 --- a/apps/dgiot_task/src/dgiot_task.erl +++ b/apps/dgiot_task/src/dgiot_task.erl @@ -21,7 +21,7 @@ -export([start/1, start/2, send/3, get_pnque_len/1, save_pnque/4, get_pnque/1, del_pnque/1, save_td/4, merge_cache_data/3, save_cache_data/2]). -export([get_control/3, get_collection/4, get_calculated/2, get_instruct/2, string2value/2, string2value/3]). - +-export([save_td_no_match/4]). start(ChannelId) -> lists:map(fun(Y) -> case Y of @@ -45,7 +45,7 @@ send(ProductId, DevAddr, Payload) -> end. %%获取计算值,必须返回物模型里面的数据表示,不能用寄存器地址 -get_calculated(ProductId, Ack) -> +get_calculated(ProductId, Calculated) -> case dgiot_product:lookup_prod(ProductId) of {ok, #{<<"thing">> := #{<<"properties">> := Props}}} -> lists:foldl(fun(X, Acc) -> @@ -72,13 +72,13 @@ get_calculated(ProductId, Ack) -> Acc end end - end, Ack, Props); + end, Calculated, Props); _Error -> - Ack + Calculated end. %% 主动上报 dis为[] -get_collection(ProductId, [], Payload, Ack) -> +get_collection(ProductId, [], Payload) -> case dgiot_product:lookup_prod(ProductId) of {ok, #{<<"thing">> := #{<<"properties">> := Props}}} -> lists:foldl(fun(X, Acc2) -> @@ -87,8 +87,7 @@ get_collection(ProductId, [], Payload, Ack) -> Acc2; _ -> case X of - #{<<"isstorage">> := true, - <<"dataForm">> := #{<<"strategy">> := Strategy} = DataForm, + #{<<"dataForm">> := #{<<"strategy">> := Strategy} = DataForm, <<"dataType">> := DataType, <<"identifier">> := Identifier} when Strategy =/= <<"计算值"/utf8>> -> dgiot_task_data:get_userdata(ProductId, Identifier, DataForm, DataType, Payload, Acc2); @@ -96,13 +95,13 @@ get_collection(ProductId, [], Payload, Ack) -> Acc2 end end - end, Ack, Props); + end, Payload, Props); _Error -> - Ack + Payload end; %%转换设备上报值,必须返回物模型里面的数据表示,不能用寄存器地址 -get_collection(ProductId, Dis, Payload, Ack) -> +get_collection(ProductId, Dis, Payload) -> case dgiot_product:lookup_prod(ProductId) of {ok, #{<<"thing">> := #{<<"properties">> := Props}}} -> lists:foldl(fun(Identifier, Acc1) -> @@ -112,8 +111,7 @@ get_collection(ProductId, Dis, Payload, Ack) -> Acc2; _ -> case X of - #{<<"isstorage">> := true, - <<"dataForm">> := #{<<"strategy">> := Strategy} = DataForm, + #{<<"dataForm">> := #{<<"strategy">> := Strategy} = DataForm, <<"dataType">> := DataType, <<"identifier">> := Identifier} when Strategy =/= <<"计算值"/utf8>> -> dgiot_task_data:get_userdata(ProductId, Identifier, DataForm, DataType, Payload, Acc2); @@ -122,9 +120,9 @@ get_collection(ProductId, Dis, Payload, Ack) -> end end end, Acc1, Props) - end, Ack, Dis); + end, Payload, Dis); _Error -> - Ack + Payload end. %% 获取控制值 @@ -138,6 +136,32 @@ get_control(Round, Data, Control) -> dgiot_task:string2value(Str1, <<"type">>) end. +%%获取存储值 +get_storage(ProductId, Calculated) -> + case dgiot_product:lookup_prod(ProductId) of + {ok, #{<<"thing">> := #{<<"properties">> := Props}}} -> + lists:foldl(fun(X, Acc) -> + case Acc of + error -> + Acc; + _ -> + case X of + #{<<"isstorage">> := true, <<"identifier">> := Identifier} -> + case maps:find(Identifier, Calculated) of + {ok, Value} -> + Acc#{Identifier => Value}; + _ -> + Acc + end; + _ -> + Acc + end + end + end, #{}, Props); + _Error -> + Calculated + end. + get_instruct(ProductId, Round) -> case dgiot_product:lookup_prod(ProductId) of {ok, #{<<"thing">> := #{<<"properties">> := Props}}} when length(Props) > 0 -> @@ -288,12 +312,16 @@ save_td(ProductId, DevAddr, Ack, AppData) -> 0 -> #{}; _ -> - NewAck = dgiot_task:get_collection(ProductId, [], Ack, Ack), - NewData = dgiot_task:get_calculated(ProductId, NewAck), +%% 计算上报值 + Collection = dgiot_task:get_collection(ProductId, [], Ack), +%% 计算计算值 + Calculated = dgiot_task:get_calculated(ProductId, Collection), +%% 过滤存储值 + Storage = dgiot_task:get_storage(ProductId, Calculated), Keys = dgiot_product:get_keys(ProductId), DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr), Interval = maps:get(<<"interval">>, AppData, 3), - AllData = merge_cache_data(DeviceId, NewData, Interval), + AllData = merge_cache_data(DeviceId, Storage, Interval), AllDataKey = maps:keys(AllData), case Keys -- AllDataKey of List when length(List) == 0 andalso length(AllDataKey) =/= 0 -> @@ -337,3 +365,29 @@ merge_cache_data(DeviceId, NewData, Interval) -> NewData end end. + + +save_td_no_match(ProductId, DevAddr, Ack, AppData) -> + case length(maps:to_list(Ack)) of + 0 -> + #{}; + _ -> +%% 计算上报值 + Collection = dgiot_task:get_collection(ProductId, [], Ack), +%% 计算计算值 + Calculated = dgiot_task:get_calculated(ProductId, Collection), +%% 过滤存储值 + Storage = dgiot_task:get_storage(ProductId, Calculated), + DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr), + Interval = maps:get(<<"interval">>, AppData, 3), + AllData = merge_cache_data(DeviceId, Storage, Interval), + ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>), + dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId, DeviceId, AllData}), + dgiot_tdengine_adapter:save(ProductId, DevAddr, AllData), + Channel = dgiot_product:get_taskchannel(ProductId), + dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(jsx:encode(AllData))]), + dgiot_metrics:inc(dgiot_task, <<"task_save">>, 1), + NotificationTopic = <<"$dg/user/alarm/", ProductId/binary, "/", DeviceId/binary, "/properties/report">>, + dgiot_mqtt:publish(DeviceId, NotificationTopic, jsx:encode(AllData)), + AllData + end. diff --git a/apps/dgiot_task/src/dgiot_task_data.erl b/apps/dgiot_task/src/dgiot_task_data.erl index e706b262..a295d42c 100644 --- a/apps/dgiot_task/src/dgiot_task_data.erl +++ b/apps/dgiot_task/src/dgiot_task_data.erl @@ -18,7 +18,7 @@ -include("dgiot_task.hrl"). -include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot_bridge/include/dgiot_bridge.hrl"). --export([get_userdata/6, get_datasource/4, get_ack/4]). +-export([get_userdata/6, get_datasource/4]). get_userdata(ProductId, Identifier, _DataForm, #{<<"type">> := <<"geopoint">>}, Payload, Acc) -> case maps:find(Identifier, Payload) of @@ -46,18 +46,6 @@ get_userdata(_ProductId, Identifier, #{<<"collection">> := Collection}, #{<<"typ Acc end. -get_ack(ProductId, Payload, Dis, Ack) -> - NewPayload = - maps:fold(fun(K, V, Acc) -> - case dgiot_data:get({protocol, K, ProductId}) of - not_find -> - Acc#{K => V}; - Identifier -> - Acc#{Identifier => V} - end - end, #{}, Payload), - dgiot_task:get_collection(ProductId, Dis, NewPayload, maps:merge(Ack, NewPayload)). - get_datasource(Protocol, AccessMode, Data, DataSource) -> case catch dgiot_hook:run_hook({?DGIOT_DATASOURCE, Protocol}, DataSource#{<<"accessMode">> => AccessMode, <<"data">> => Data}) of {ok, [Rtn | _]} -> diff --git a/apps/dgiot_tdengine/src/utils/dgiot_tdengine_field.erl b/apps/dgiot_tdengine/src/utils/dgiot_tdengine_field.erl index b6887cfe..d4d5f2f4 100644 --- a/apps/dgiot_tdengine/src/utils/dgiot_tdengine_field.erl +++ b/apps/dgiot_tdengine/src/utils/dgiot_tdengine_field.erl @@ -52,33 +52,39 @@ add_field(Type, Database, TableName, LowerIdentifier) -> %% 10 NCHAR 自定义 记录包含多字节字符在内的字符串,如中文字符。每个 nchar 字符占用 4 bytes 的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 \’。nchar 使用时须指定字符串大小,类型为 nchar(10) 的列表示此列的字符串最多存储 10 个 nchar 字符,会固定占用 40 bytes 的空间。如果用户字符串长度超出声明长度,将会报错。 get_field(#{<<"isstorage">> := false}) -> pass; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"int">>}}) -> +get_field(#{<<"isstorage">> := true} = Property) -> + get_field_(Property); +get_field(#{<<"isshow">> := true} = Property) -> + get_field_(Property); +get_field(_) -> + pass. +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"int">>}}) -> {Field, #{<<"type">> => <<"INT">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"image">>}}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"image">>}}) -> {Field, #{<<"type">> => <<"BIGINT">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"long">>}}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"long">>}}) -> {Field, #{<<"type">> => <<"BIGINT">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"float">>}}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"float">>}}) -> {Field, #{<<"type">> => <<"FLOAT">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"date">>}}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"date">>}}) -> {Field, #{<<"type">> => <<"TIMESTAMP">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"bool">>}}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"bool">>}}) -> {Field, #{<<"type">> => <<"BOOL">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"double">>}}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"double">>}}) -> {Field, #{<<"type">> => <<"DOUBLE">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"string">>} = Spec}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"string">>} = Spec}) -> Size = integer_to_binary(min(maps:get(<<"size">>, Spec, 10), 200)), {Field, #{<<"type">> => <<"NCHAR(", Size/binary, ")">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"text">>} = Spec}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"text">>} = Spec}) -> Size = integer_to_binary(min(maps:get(<<"size">>, Spec, 50), 200)), {Field, #{<<"type">> => <<"NCHAR(", Size/binary, ")">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"geopoint">>} = Spec}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"geopoint">>} = Spec}) -> Size = integer_to_binary(min(maps:get(<<"size">>, Spec, 50), 200)), {Field, #{<<"type">> => <<"NCHAR(", Size/binary, ")">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"enum">>, <<"specs">> := _Specs}}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"enum">>, <<"specs">> := _Specs}}) -> %% Size = integer_to_binary(maps:size(Specs)), {Field, #{<<"type">> => <<"INT">>}}; -get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"struct">>, <<"specs">> := SubFields}}) -> +get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"struct">>, <<"specs">> := SubFields}}) -> [get_field(SubField#{<<"identifier">> => ?Struct(Field, Field1)}) || #{<<"identifier">> := Field1} = SubField <- SubFields].