mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-12-02 04:08:54 +08:00
fix: taskk 处理存储值, mqtt 添加opc处理
This commit is contained in:
parent
574d37cd3e
commit
9273ac3718
@ -83,7 +83,8 @@ handle_info({deliver, _, Msg}, TCPState) ->
|
||||
handle_info(#{<<"cmd">> := Cmd, <<"data">> := Data, <<"productId">> := ProductId}, TCPState) ->
|
||||
do_cmd(ProductId, Cmd, Data, TCPState);
|
||||
|
||||
handle_info({tcp, Buff}, #tcp{state = #state{productIds = ProductIds}} = TCPState) ->
|
||||
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, productIds = ProductIds}} = TCPState) ->
|
||||
dgiot_bridge:send_log(ChannelId, "Buff ~p", [Buff]),
|
||||
lists:map(fun(ProductId) ->
|
||||
do_cmd(ProductId, tcp, Buff, TCPState)
|
||||
end, ProductIds),
|
||||
|
@ -44,7 +44,8 @@ get_card(ProductId, Results, DeviceId, Args) ->
|
||||
Props = dgiot_product:get_props(ProductId, Keys),
|
||||
lists:foldl(fun(X, Acc) ->
|
||||
case X of
|
||||
#{<<"name">> := Name, <<"identifier">> := Identifier, <<"dataForm">> := #{<<"protocol">> := Protocol}, <<"dataSource">> := DataSource, <<"dataType">> := #{<<"type">> := Typea} = DataType} ->
|
||||
#{<<"name">> := Name, <<"identifier">> := Identifier, <<"dataForm">> := #{<<"protocol">> := Protocol}, <<"dataType">> := #{<<"type">> := Typea} = DataType} ->
|
||||
DataSource = maps:get(<<"dataSource">>, X, #{}),
|
||||
Time = maps:get(<<"createdat">>, Result, dgiot_datetime:now_secs()),
|
||||
NewTime = dgiot_tdengine_field:get_time(dgiot_utils:to_binary(Time), <<"111">>),
|
||||
Devicetype =
|
||||
|
@ -112,6 +112,8 @@ get_device(Channel, ProductId, DeviceId, _DevAddr, Query) ->
|
||||
end.
|
||||
|
||||
%% SELECT max(day_electricity) '时间' ,max(charge_current) '日期' FROM _2d26a94cf8._c5e1093e30 WHERE createdat >= now - 1h INTERVAL(1h) limit 10;
|
||||
%% SELECT spread(cumulativescale) FROM _797197ad06 WHERE createdat >= now - 1Y INTERVAL(1h);
|
||||
%% SELECT last(cumulativescale) FROM _797197ad06 WHERE createdat >= now - 1Y INTERVAL(1h);
|
||||
get_history_data(Channel, TableName, Query) ->
|
||||
dgiot_tdengine:transaction(Channel,
|
||||
fun(Context) ->
|
||||
|
@ -92,7 +92,7 @@ start(ChannelId, ChannelArgs) ->
|
||||
|
||||
%% 通道初始化
|
||||
init(?TYPE, ChannelId, #{
|
||||
<<"product">> := [{ProductId, _Product} |_],
|
||||
<<"product">> := [{_ProductId, _Product} |_],
|
||||
<<"auth">> := Auth,
|
||||
<<"count">> := Count}) ->
|
||||
State = #state{
|
||||
@ -100,11 +100,11 @@ init(?TYPE, ChannelId, #{
|
||||
auth = Auth,
|
||||
count = Count
|
||||
},
|
||||
io:format("~s ~p ProductId ~p ~n",[?FILE, ?LINE, ProductId]),
|
||||
%% io:format("~s ~p ProductId ~p ~n",[?FILE, ?LINE, ProductId]),
|
||||
{ok, State};
|
||||
|
||||
init(?TYPE, _ChannelId, _Args) ->
|
||||
io:format("~s ~p _ChannelId ~p ~n",[?FILE, ?LINE, _ChannelId]),
|
||||
%% io:format("~s ~p _ChannelId ~p ~n",[?FILE, ?LINE, _ChannelId]),
|
||||
{ok, #{}}.
|
||||
|
||||
handle_init(State) ->
|
||||
@ -115,7 +115,7 @@ handle_event(_EventId, _Event, State) ->
|
||||
{ok, State}.
|
||||
|
||||
handle_message(_Message, State) ->
|
||||
io:format("~s ~p _Message = ~p.~n", [?FILE, ?LINE, _Message]),
|
||||
%% io:format("~s ~p _Message = ~p.~n", [?FILE, ?LINE, _Message]),
|
||||
{ok, State}.
|
||||
|
||||
stop(_ChannelType, _ChannelId, _State) ->
|
||||
|
@ -23,7 +23,8 @@
|
||||
-export([login/3]).
|
||||
-export([
|
||||
properties_report/3
|
||||
,firmware_report/3
|
||||
, firmware_report/3
|
||||
, parse_payload/2
|
||||
]).
|
||||
|
||||
|
||||
@ -42,8 +43,9 @@ properties_report(ProductId, DevAddr, Payload) ->
|
||||
ok.
|
||||
|
||||
firmware_report(ProductId, DevAddr, Payload) when is_map(Payload) ->
|
||||
io:format("~s ~p ProductId ~p, DevAddr ~p, Payload: ~p ~n", [?FILE, ?LINE, ProductId, DevAddr, Payload]),
|
||||
dgiot_task:save_td(ProductId, DevAddr, Payload, #{});
|
||||
%% io:format("~s ~p ProductId ~p, DevAddr ~p, Payload: ~p ~n", [?FILE, ?LINE, ProductId, DevAddr, Payload]),
|
||||
NewPload = parse_payload(ProductId, Payload),
|
||||
dgiot_task:save_td(ProductId, DevAddr, NewPload, #{<<"interval">> => 30});
|
||||
|
||||
firmware_report(ProductId, DevAddr, Payload) ->
|
||||
lists:map(fun
|
||||
@ -52,8 +54,32 @@ firmware_report(ProductId, DevAddr, Payload) ->
|
||||
(_) ->
|
||||
pass
|
||||
end, dgiot_bridge:get_proctol_channel(ProductId)),
|
||||
io:format("~s ~p ProductId ~p, DevAddr ~p, Payload: ~p ~n", [?FILE, ?LINE, ProductId, DevAddr, Payload]),
|
||||
%% io:format("~s ~p ProductId ~p, DevAddr ~p, Payload: ~p ~n", [?FILE, ?LINE, ProductId, DevAddr, Payload]),
|
||||
ok.
|
||||
|
||||
login(_A, _B, _C) ->
|
||||
ok.
|
||||
ok.
|
||||
|
||||
parse_payload(ProductId, Payload) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
lists:foldl(fun(X, Acc) ->
|
||||
case X of
|
||||
#{<<"identifier">> := Identifier,
|
||||
<<"dataType">> := DataType} ->
|
||||
Das = maps:get(<<"das">>, DataType, []),
|
||||
maps:fold(fun(PK, PV, Acc1) ->
|
||||
case lists:member(PK, Das) of
|
||||
true ->
|
||||
Acc1#{Identifier => PV};
|
||||
_ ->
|
||||
Acc1#{PK => PV}
|
||||
end
|
||||
end, Acc, Payload);
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, #{}, Props);
|
||||
_Error ->
|
||||
Payload
|
||||
end.
|
||||
|
@ -32,6 +32,20 @@ on_message_publish(Message = #message{topic = <<"$dg/thing/", Topic/binary>>, pa
|
||||
dgiot_dlink_proctol:properties_report(ProductId, DevAddr, get_payload(Payload));
|
||||
[ProductId, DevAddr, <<"firmware">>, <<"report">>] ->
|
||||
dgiot_dlink_proctol:firmware_report(ProductId, DevAddr, get_payload(Payload));
|
||||
[DeviceId, <<"properties">>, <<"get">>, <<"request_id=", Request_id/binary>>] ->
|
||||
%% 属性获取 $dg/thing/{deviceId}/properties/get/request_id={request_id} 用户 平台
|
||||
case dgiot_device:lookup(DeviceId) of
|
||||
{ok, #{<<"devaddr">> := Devaddr, <<"productid">> := ProductId}} ->
|
||||
%% 属性获取 $dg/device/{productId}/{deviceAddr}/properties/get/response/request_id={request_id} 平台 设备
|
||||
RequestTopic = <<"$dg/device/", ProductId/binary, "/", Devaddr/binary, "/properties/get/response/request_id=", Request_id/binary>>,
|
||||
%% io:format("~s ~p Payload = ~p~n", [?FILE, ?LINE, Payload]),
|
||||
dgiot_mqtt:publish(DeviceId, RequestTopic, Payload);
|
||||
_ ->
|
||||
pass
|
||||
end;
|
||||
[ProductId, DevAddr, <<"properties">>, <<"get">>, <<"response">>, _] ->
|
||||
%% 属性获取 $dg/thing/{productId}/{deviceAddr}/properties/get/response/request_id={request_id} 设备 平台
|
||||
dgiot_dlink_proctol:firmware_report(ProductId, DevAddr, get_payload(Payload));
|
||||
_ ->
|
||||
pass
|
||||
end,
|
||||
@ -59,7 +73,7 @@ on_message_publish(Message, _State) ->
|
||||
%% ok.
|
||||
|
||||
get_payload(Payload) ->
|
||||
io:format("~s ~p Payload: ~p~n", [?FILE, ?LINE, Payload]),
|
||||
%% io:format("~s ~p Payload: ~p~n", [?FILE, ?LINE, Payload]),
|
||||
case jsx:is_json(Payload) of
|
||||
true ->
|
||||
jiffy:decode(Payload, [return_maps]);
|
||||
|
@ -111,15 +111,8 @@ def calculate(data_x, parameters):
|
||||
datay.append(parameters[2] + parameters[1] * x + parameters[0] * x * x)
|
||||
return datay
|
||||
|
||||
<<<<<<< HEAD
|
||||
|
||||
"""完成函数的绘制"""
|
||||
|
||||
|
||||
=======
|
||||
"""完成函数的绘制"""
|
||||
|
||||
>>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7
|
||||
def draw(flow1, head1, headparameters, power1, powerparameters, effect, effectparameters, params):
|
||||
fm = math.ceil(max(flow1))
|
||||
hm = math.ceil(max(head1))
|
||||
@ -129,68 +122,27 @@ def draw(flow1, head1, headparameters, power1, powerparameters, effect, effectpa
|
||||
nm = math.ceil(max(effect))
|
||||
nmin = math.floor(min(effect))
|
||||
|
||||
<<<<<<< HEAD
|
||||
fig = plt.figure(1)
|
||||
=======
|
||||
fig = plt.figure(figsize=(9, 5))
|
||||
>>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7
|
||||
host = HostAxes(fig, [0.15, 0.1, 0.65, 0.8])
|
||||
par1 = ParasiteAxes(host, sharex=host)
|
||||
par2 = ParasiteAxes(host, sharex=host)
|
||||
host.parasites.append(par1)
|
||||
host.parasites.append(par2)
|
||||
<<<<<<< HEAD
|
||||
host.set_ylabel('扬程')
|
||||
host.set_xlabel('流量')
|
||||
host.axis['right'].set_visible(False)
|
||||
par1.axis['right'].set_visible(True)
|
||||
par1.set_ylabel('功率')
|
||||
=======
|
||||
host.set_ylabel('效率(E)(%)', color="blue")
|
||||
host.set_xlabel('流量(Q)(m3/h)')
|
||||
host.axis['right'].set_visible(False)
|
||||
par1.axis['right'].set_visible(True)
|
||||
par1.set_ylabel('功率(P)(kW)', color="red")
|
||||
>>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7
|
||||
|
||||
par1.axis['right'].major_ticklabels.set_visible(True)
|
||||
par1.axis['right'].label.set_visible(True)
|
||||
|
||||
<<<<<<< HEAD
|
||||
par2.set_ylabel('扬程')
|
||||
=======
|
||||
par2.set_ylabel('效率(E)(%)', color="blue")
|
||||
>>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7
|
||||
offset = (60, 0)
|
||||
new_axisline = par2._grid_helper.new_fixed_axis
|
||||
par2.axis['right2'] = new_axisline(loc='right', axes=par2, offset=offset)
|
||||
fig.add_axes(host)
|
||||
host.set_xlim(0, fm + 1)
|
||||
<<<<<<< HEAD
|
||||
host.set_ylim(0, hm + 5)
|
||||
|
||||
host.set_xlabel('流量')
|
||||
host.set_ylabel('功率')
|
||||
host.set_ylabel('效率')
|
||||
x = np.linspace(0, fm, 500)
|
||||
y = headparameters[0] * x ** 2 + headparameters[1] * x + headparameters[2]
|
||||
p1, = host.plot(x, y, label="HQ拟合曲线", color="black")
|
||||
host.scatter(flow1, head1, label="HQ离散数据")
|
||||
x1 = np.linspace(0, fm, 500)
|
||||
y1 = powerparameters[0] * x ** 2 + powerparameters[1] * x + powerparameters[2]
|
||||
p2, = par1.plot(x, y1, label="PQ拟合曲线", color="red")
|
||||
par1.scatter(flow1, power1, label="PQ离散数据")
|
||||
x2 = np.linspace(0, fm, 500)
|
||||
y2 = effectparameters[0] * x ** 2 + effectparameters[1] * x + effectparameters[2]
|
||||
p3, = par2.plot(x, y2, label="EQ拟合曲线", color="blue")
|
||||
par2.scatter(flow1, effect, label="EQ离散数据")
|
||||
par1.set_ylim(0, pm * 2)
|
||||
par2.set_ylim(0, nm + 5)
|
||||
host.legend()
|
||||
par2.axis['right2'].major_ticklabels.set_color(p3.get_color()) # 刻度值颜色
|
||||
par2.axis['right2'].set_axisline_style('-|>', size=1.5) # 轴的形状色
|
||||
|
||||
=======
|
||||
# plt.xticks(range(0, fm + 1, 1))
|
||||
host.set_ylim(0, hm + 5)
|
||||
|
||||
@ -236,30 +188,20 @@ def draw(flow1, head1, headparameters, power1, powerparameters, effect, effectpa
|
||||
("流量" + str(fppoints[0][0]) + " m3/h", "功率" + str(fppoints[0][1]) + " kW"),
|
||||
ha='center', color='r')
|
||||
|
||||
>>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7
|
||||
# 解决使用matplotliblib画图的时候出现中文或者是负号无法显示的情况
|
||||
mpl.rcParams['font.sans-serif'] = ['SimHei']
|
||||
mpl.rcParams['axes.unicode_minus'] = False
|
||||
|
||||
plt.title("性能曲线拟合数据")
|
||||
<<<<<<< HEAD
|
||||
plt.legend(loc="best")
|
||||
# 获取当前时间
|
||||
# localtime = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
|
||||
|
||||
=======
|
||||
plt.legend(loc=9, bbox_to_anchor=(-0.142, 1.1), borderaxespad=0., fontsize=8)
|
||||
# 获取当前时间
|
||||
# localtime = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
|
||||
>>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7
|
||||
filename = params['path'] + params['name']
|
||||
# print(filename)
|
||||
plt.savefig(filename)
|
||||
# plt.show()
|
||||
return (filename)
|
||||
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
def find_close(arr, e):
|
||||
low = 0
|
||||
high = len(arr) - 1
|
||||
@ -279,7 +221,6 @@ def find_close(arr, e):
|
||||
idx += 1
|
||||
|
||||
return arr[idx]
|
||||
>>>>>>> f5b55f5735c94ec9573668b5d56b78a8eef2e8f7
|
||||
|
||||
def main(argv):
|
||||
params = json.loads(base64.b64decode(argv).decode("utf-8"))
|
||||
|
@ -992,7 +992,8 @@ python_drawxnqx(TaskId, NewData) ->
|
||||
_ ->
|
||||
0
|
||||
end,
|
||||
Path = code:priv_dir(?MODULE),
|
||||
{file, Here} = code:is_loaded(?MODULE),
|
||||
Path = dgiot_httpc:url_join([filename:dirname(filename:dirname(Here)), "/priv"]),
|
||||
Python3path = Path ++ "/python/drawxnqx.py ",
|
||||
Filepath = application:get_env(dgiot_evidence, gofastdfs_path, <<"/data/dgiot/go_fastdfs/files/dgiot_file/pump_python/">>),
|
||||
PythonBody = #{<<"name">> => <<TaskId/binary, ".png">>, <<"data">> => NewData, <<"path">> => Filepath, <<"dgiot_testing_equipment_flowSet">> => Dgiot_testing_equipment_flowSet},
|
||||
|
@ -56,7 +56,7 @@
|
||||
order => 2,
|
||||
type => integer,
|
||||
required => true,
|
||||
default => 8080,
|
||||
default => 502,
|
||||
title => #{
|
||||
zh => <<"服务器端口"/utf8>>
|
||||
},
|
||||
|
@ -272,13 +272,14 @@ set_params(Payload, _ProductId, _DevAddr) ->
|
||||
parse_frame(<<_TransactionId:16, _ProtocolId:16, _Size1:16, _Slaveid:8, _FunCode:8, DataLen:8, Data:DataLen/bytes, _/bytes>>) ->
|
||||
Data.
|
||||
|
||||
%% 00 03 64 5c
|
||||
parse_frame(FileName, Data) ->
|
||||
AtomName = dgiot_utils:to_atom(FileName),
|
||||
Things = ets:match(AtomName, {'_', ['_', '_', '_', '_', '$1', '_', '_', '_', '$2', '_', '_', '_', '_', '_', '_', '$3' | '_']}),
|
||||
Things = ets:match(AtomName, {'$1', ['_', '_', '_', '_', '$2', '_', '_', '_', '$3', '_', '_', '_', '_', '_', '_', '$4' | '_']}),
|
||||
AllData =
|
||||
lists:foldl(fun([Devaddr, Address, Originaltype | _], Acc) ->
|
||||
lists:foldl(fun([Number, Devaddr, Address, Originaltype | _], Acc) ->
|
||||
ProductId = dgiot_data:get(AtomName, {addr, Address}),
|
||||
IntOffset = dgiot_utils:to_int(Address),
|
||||
IntOffset = dgiot_utils:to_int(Number) - 1,
|
||||
Thing = #{
|
||||
<<"identifier">> => Address,
|
||||
<<"dataSource">> => #{
|
||||
|
@ -51,23 +51,23 @@ handle(OperationID, Args, Context, Req) ->
|
||||
Headers = #{},
|
||||
case catch do_request(OperationID, Args, Context, Req) of
|
||||
{ErrType, Reason} when ErrType == 'EXIT'; ErrType == error ->
|
||||
?LOG(info, "do request: ~p, ~p, ~p~n", [OperationID, Args, Reason]),
|
||||
?LOG(debug, "do request: ~p, ~p, ~p~n", [OperationID, Args, Reason]),
|
||||
Err = case is_binary(Reason) of
|
||||
true -> Reason;
|
||||
false -> dgiot_utils:format("~p", [Reason])
|
||||
end,
|
||||
{500, Headers, #{<<"error">> => Err}};
|
||||
ok ->
|
||||
?LOG(info, "do request: ~p, ~p ->ok ~n", [OperationID, Args]),
|
||||
?LOG(debug, "do request: ~p, ~p ->ok ~n", [OperationID, Args]),
|
||||
{200, Headers, #{}, Req};
|
||||
{ok, Res} ->
|
||||
?LOG(info, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
|
||||
?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
|
||||
{200, Headers, Res, Req};
|
||||
{Status, Res} ->
|
||||
?LOG(info, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
|
||||
?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
|
||||
{Status, Headers, Res, Req};
|
||||
{Status, NewHeaders, Res} ->
|
||||
?LOG(info, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
|
||||
?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
|
||||
{Status, maps:merge(Headers, NewHeaders), Res, Req}
|
||||
end.
|
||||
|
||||
|
@ -21,7 +21,7 @@
|
||||
|
||||
-export([start/1, start/2, send/3, get_pnque_len/1, save_pnque/4, get_pnque/1, del_pnque/1, save_td/4, merge_cache_data/3, save_cache_data/2]).
|
||||
-export([get_control/3, get_collection/4, get_calculated/2, get_instruct/2, string2value/2, string2value/3]).
|
||||
|
||||
-export([save_td_no_match/4]).
|
||||
start(ChannelId) ->
|
||||
lists:map(fun(Y) ->
|
||||
case Y of
|
||||
@ -45,7 +45,7 @@ send(ProductId, DevAddr, Payload) ->
|
||||
end.
|
||||
|
||||
%%获取计算值,必须返回物模型里面的数据表示,不能用寄存器地址
|
||||
get_calculated(ProductId, Ack) ->
|
||||
get_calculated(ProductId, Calculated) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
lists:foldl(fun(X, Acc) ->
|
||||
@ -72,13 +72,13 @@ get_calculated(ProductId, Ack) ->
|
||||
Acc
|
||||
end
|
||||
end
|
||||
end, Ack, Props);
|
||||
end, Calculated, Props);
|
||||
_Error ->
|
||||
Ack
|
||||
Calculated
|
||||
end.
|
||||
|
||||
%% 主动上报 dis为[]
|
||||
get_collection(ProductId, [], Payload, Ack) ->
|
||||
get_collection(ProductId, [], Payload) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
lists:foldl(fun(X, Acc2) ->
|
||||
@ -87,8 +87,7 @@ get_collection(ProductId, [], Payload, Ack) ->
|
||||
Acc2;
|
||||
_ ->
|
||||
case X of
|
||||
#{<<"isstorage">> := true,
|
||||
<<"dataForm">> := #{<<"strategy">> := Strategy} = DataForm,
|
||||
#{<<"dataForm">> := #{<<"strategy">> := Strategy} = DataForm,
|
||||
<<"dataType">> := DataType,
|
||||
<<"identifier">> := Identifier} when Strategy =/= <<"计算值"/utf8>> ->
|
||||
dgiot_task_data:get_userdata(ProductId, Identifier, DataForm, DataType, Payload, Acc2);
|
||||
@ -96,13 +95,13 @@ get_collection(ProductId, [], Payload, Ack) ->
|
||||
Acc2
|
||||
end
|
||||
end
|
||||
end, Ack, Props);
|
||||
end, Payload, Props);
|
||||
_Error ->
|
||||
Ack
|
||||
Payload
|
||||
end;
|
||||
|
||||
%%转换设备上报值,必须返回物模型里面的数据表示,不能用寄存器地址
|
||||
get_collection(ProductId, Dis, Payload, Ack) ->
|
||||
get_collection(ProductId, Dis, Payload) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
lists:foldl(fun(Identifier, Acc1) ->
|
||||
@ -112,8 +111,7 @@ get_collection(ProductId, Dis, Payload, Ack) ->
|
||||
Acc2;
|
||||
_ ->
|
||||
case X of
|
||||
#{<<"isstorage">> := true,
|
||||
<<"dataForm">> := #{<<"strategy">> := Strategy} = DataForm,
|
||||
#{<<"dataForm">> := #{<<"strategy">> := Strategy} = DataForm,
|
||||
<<"dataType">> := DataType,
|
||||
<<"identifier">> := Identifier} when Strategy =/= <<"计算值"/utf8>> ->
|
||||
dgiot_task_data:get_userdata(ProductId, Identifier, DataForm, DataType, Payload, Acc2);
|
||||
@ -122,9 +120,9 @@ get_collection(ProductId, Dis, Payload, Ack) ->
|
||||
end
|
||||
end
|
||||
end, Acc1, Props)
|
||||
end, Ack, Dis);
|
||||
end, Payload, Dis);
|
||||
_Error ->
|
||||
Ack
|
||||
Payload
|
||||
end.
|
||||
|
||||
%% 获取控制值
|
||||
@ -138,6 +136,32 @@ get_control(Round, Data, Control) ->
|
||||
dgiot_task:string2value(Str1, <<"type">>)
|
||||
end.
|
||||
|
||||
%%获取存储值
|
||||
get_storage(ProductId, Calculated) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
lists:foldl(fun(X, Acc) ->
|
||||
case Acc of
|
||||
error ->
|
||||
Acc;
|
||||
_ ->
|
||||
case X of
|
||||
#{<<"isstorage">> := true, <<"identifier">> := Identifier} ->
|
||||
case maps:find(Identifier, Calculated) of
|
||||
{ok, Value} ->
|
||||
Acc#{Identifier => Value};
|
||||
_ ->
|
||||
Acc
|
||||
end;
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end
|
||||
end, #{}, Props);
|
||||
_Error ->
|
||||
Calculated
|
||||
end.
|
||||
|
||||
get_instruct(ProductId, Round) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} when length(Props) > 0 ->
|
||||
@ -288,12 +312,16 @@ save_td(ProductId, DevAddr, Ack, AppData) ->
|
||||
0 ->
|
||||
#{};
|
||||
_ ->
|
||||
NewAck = dgiot_task:get_collection(ProductId, [], Ack, Ack),
|
||||
NewData = dgiot_task:get_calculated(ProductId, NewAck),
|
||||
%% 计算上报值
|
||||
Collection = dgiot_task:get_collection(ProductId, [], Ack),
|
||||
%% 计算计算值
|
||||
Calculated = dgiot_task:get_calculated(ProductId, Collection),
|
||||
%% 过滤存储值
|
||||
Storage = dgiot_task:get_storage(ProductId, Calculated),
|
||||
Keys = dgiot_product:get_keys(ProductId),
|
||||
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
|
||||
Interval = maps:get(<<"interval">>, AppData, 3),
|
||||
AllData = merge_cache_data(DeviceId, NewData, Interval),
|
||||
AllData = merge_cache_data(DeviceId, Storage, Interval),
|
||||
AllDataKey = maps:keys(AllData),
|
||||
case Keys -- AllDataKey of
|
||||
List when length(List) == 0 andalso length(AllDataKey) =/= 0 ->
|
||||
@ -337,3 +365,29 @@ merge_cache_data(DeviceId, NewData, Interval) ->
|
||||
NewData
|
||||
end
|
||||
end.
|
||||
|
||||
|
||||
save_td_no_match(ProductId, DevAddr, Ack, AppData) ->
|
||||
case length(maps:to_list(Ack)) of
|
||||
0 ->
|
||||
#{};
|
||||
_ ->
|
||||
%% 计算上报值
|
||||
Collection = dgiot_task:get_collection(ProductId, [], Ack),
|
||||
%% 计算计算值
|
||||
Calculated = dgiot_task:get_calculated(ProductId, Collection),
|
||||
%% 过滤存储值
|
||||
Storage = dgiot_task:get_storage(ProductId, Calculated),
|
||||
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
|
||||
Interval = maps:get(<<"interval">>, AppData, 3),
|
||||
AllData = merge_cache_data(DeviceId, Storage, Interval),
|
||||
ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>),
|
||||
dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId, DeviceId, AllData}),
|
||||
dgiot_tdengine_adapter:save(ProductId, DevAddr, AllData),
|
||||
Channel = dgiot_product:get_taskchannel(ProductId),
|
||||
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(jsx:encode(AllData))]),
|
||||
dgiot_metrics:inc(dgiot_task, <<"task_save">>, 1),
|
||||
NotificationTopic = <<"$dg/user/alarm/", ProductId/binary, "/", DeviceId/binary, "/properties/report">>,
|
||||
dgiot_mqtt:publish(DeviceId, NotificationTopic, jsx:encode(AllData)),
|
||||
AllData
|
||||
end.
|
||||
|
@ -18,7 +18,7 @@
|
||||
-include("dgiot_task.hrl").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl").
|
||||
-export([get_userdata/6, get_datasource/4, get_ack/4]).
|
||||
-export([get_userdata/6, get_datasource/4]).
|
||||
|
||||
get_userdata(ProductId, Identifier, _DataForm, #{<<"type">> := <<"geopoint">>}, Payload, Acc) ->
|
||||
case maps:find(Identifier, Payload) of
|
||||
@ -46,18 +46,6 @@ get_userdata(_ProductId, Identifier, #{<<"collection">> := Collection}, #{<<"typ
|
||||
Acc
|
||||
end.
|
||||
|
||||
get_ack(ProductId, Payload, Dis, Ack) ->
|
||||
NewPayload =
|
||||
maps:fold(fun(K, V, Acc) ->
|
||||
case dgiot_data:get({protocol, K, ProductId}) of
|
||||
not_find ->
|
||||
Acc#{K => V};
|
||||
Identifier ->
|
||||
Acc#{Identifier => V}
|
||||
end
|
||||
end, #{}, Payload),
|
||||
dgiot_task:get_collection(ProductId, Dis, NewPayload, maps:merge(Ack, NewPayload)).
|
||||
|
||||
get_datasource(Protocol, AccessMode, Data, DataSource) ->
|
||||
case catch dgiot_hook:run_hook({?DGIOT_DATASOURCE, Protocol}, DataSource#{<<"accessMode">> => AccessMode, <<"data">> => Data}) of
|
||||
{ok, [Rtn | _]} ->
|
||||
|
@ -52,33 +52,39 @@ add_field(Type, Database, TableName, LowerIdentifier) ->
|
||||
%% 10 NCHAR 自定义 记录包含多字节字符在内的字符串,如中文字符。每个 nchar 字符占用 4 bytes 的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 \’。nchar 使用时须指定字符串大小,类型为 nchar(10) 的列表示此列的字符串最多存储 10 个 nchar 字符,会固定占用 40 bytes 的空间。如果用户字符串长度超出声明长度,将会报错。
|
||||
get_field(#{<<"isstorage">> := false}) ->
|
||||
pass;
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"int">>}}) ->
|
||||
get_field(#{<<"isstorage">> := true} = Property) ->
|
||||
get_field_(Property);
|
||||
get_field(#{<<"isshow">> := true} = Property) ->
|
||||
get_field_(Property);
|
||||
get_field(_) ->
|
||||
pass.
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"int">>}}) ->
|
||||
{Field, #{<<"type">> => <<"INT">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"image">>}}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"image">>}}) ->
|
||||
{Field, #{<<"type">> => <<"BIGINT">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"long">>}}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"long">>}}) ->
|
||||
{Field, #{<<"type">> => <<"BIGINT">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"float">>}}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"float">>}}) ->
|
||||
{Field, #{<<"type">> => <<"FLOAT">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"date">>}}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"date">>}}) ->
|
||||
{Field, #{<<"type">> => <<"TIMESTAMP">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"bool">>}}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"bool">>}}) ->
|
||||
{Field, #{<<"type">> => <<"BOOL">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"double">>}}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"double">>}}) ->
|
||||
{Field, #{<<"type">> => <<"DOUBLE">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"string">>} = Spec}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"string">>} = Spec}) ->
|
||||
Size = integer_to_binary(min(maps:get(<<"size">>, Spec, 10), 200)),
|
||||
{Field, #{<<"type">> => <<"NCHAR(", Size/binary, ")">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"text">>} = Spec}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"text">>} = Spec}) ->
|
||||
Size = integer_to_binary(min(maps:get(<<"size">>, Spec, 50), 200)),
|
||||
{Field, #{<<"type">> => <<"NCHAR(", Size/binary, ")">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"geopoint">>} = Spec}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"geopoint">>} = Spec}) ->
|
||||
Size = integer_to_binary(min(maps:get(<<"size">>, Spec, 50), 200)),
|
||||
{Field, #{<<"type">> => <<"NCHAR(", Size/binary, ")">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"enum">>, <<"specs">> := _Specs}}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"enum">>, <<"specs">> := _Specs}}) ->
|
||||
%% Size = integer_to_binary(maps:size(Specs)),
|
||||
{Field, #{<<"type">> => <<"INT">>}};
|
||||
get_field(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"struct">>, <<"specs">> := SubFields}}) ->
|
||||
get_field_(#{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> := <<"struct">>, <<"specs">> := SubFields}}) ->
|
||||
[get_field(SubField#{<<"identifier">> => ?Struct(Field, Field1)}) || #{<<"identifier">> := Field1} = SubField <- SubFields].
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user