This commit is contained in:
dawnwinterLiu 2022-05-05 21:23:00 +08:00
parent 72bc987177
commit f69b90f999
9 changed files with 109 additions and 105 deletions

View File

@ -81,14 +81,14 @@ do_task(#{<<"dataType">> := <<"card">>, <<"vuekey">> := <<"device_count">>, <<"t
do_task(#{<<"dataType">> := <<"card">>, <<"vuekey">> := <<"ChartStatus">>, <<"table">> := <<"Device">>, <<"query">> := Query}, #task{dashboardId = DashboardId, sessiontoken = SessionToken}) ->
Where = maps:get(<<"where">>, Query, #{}),
OnlineCount =
case dgiot_parse:query_object(<<"Device">>, Query#{<<"keys">> => [<<"count(*)">>], <<"where">> => Where#{<<"status">> => <<"ONLINE">>}}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
case dgiot_parse:query_object(<<"Device">>, Query#{<<"count">> => <<"objectId">>, <<"where">> => Where#{<<"status">> => <<"ONLINE">>}}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"count">> := OnlineCount1}} ->
OnlineCount1;
_ ->
0
end,
OfflineCount =
case dgiot_parse:query_object(<<"Device">>, Query#{<<"keys">> => [<<"count(*)">>], <<"where">> => Where#{<<"status">> => <<"OFFLINE">>}}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
case dgiot_parse:query_object(<<"Device">>, Query#{<<"count">> => <<"objectId">>, <<"where">> => Where#{<<"status">> => <<"OFFLINE">>}}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"count">> := OfflineCount2}} ->
OfflineCount2;
_ ->

View File

@ -28,6 +28,7 @@
%% Device 线线
parse_cache_Device(_ClassName) ->
%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, ClassName]),
dgiot_product:load_cache(),
Success = fun(Page) ->
lists:map(fun(Device) ->
dgiot_device:save(Device)

View File

@ -19,18 +19,17 @@
-include("dgiot_device.hrl").
-include_lib("dgiot/include/logger.hrl").
-dgiot_data("ets").
-export([init_ets/0, load_cache/1, local/1, save/1, get/1, delete/1, save_prod/2, lookup_prod/1]).
-export([init_ets/0, load_cache/0, local/1, save/1, get/1, delete/1, save_prod/2, lookup_prod/1]).
-export([parse_frame/3, to_frame/2]).
-export([create_product/2, add_product_relation/2, delete_product_relation/1]).
init_ets() ->
dgiot_data:init(?DGIOT_PRODUCT).
load_cache(ClassName) ->
io:format("~s ~p ~p ~n",[?FILE, ?LINE, ClassName]),
load_cache() ->
Success = fun(Page) ->
lists:map(fun(#{<<"objectId">> := ObjectId, <<"productSecret">> := ProductSecret} = Product) ->
dgiot_mnesia:insert(ObjectId, ['Product', dgiot_role:get_acls(Product), ProductSecret]),
lists:map(fun(Product) ->
%% dgiot_mnesia:insert(ObjectId, ['Product', dgiot_role:get_acls(Product), ProductSecret]),
dgiot_product:save(Product)
end, Page)
end,

View File

@ -195,7 +195,7 @@ sendSubscribe_test(UserId, #{<<"data">> := Data,
%%
get_wechat_index(SessionToken) ->
case dgiot_parse:query_object(<<"Device">>, #{<<"keys">> => [<<"count(*)">>], <<"limit">> => 1000}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
case dgiot_parse:query_object(<<"Device">>, #{<<"count">> => <<"objectId">>, <<"limit">> => 1000}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"count">> := Count, <<"results">> := Results}} ->
{ONLINE, OFFLINE} =
lists:foldl(fun(X, {Online, Offline}) ->
@ -209,14 +209,14 @@ get_wechat_index(SessionToken) ->
end
end, {[], []}, Results),
NotificationCount =
case dgiot_parse:query_object(<<"Notification">>, #{<<"keys">> => [<<"count(*)">>], <<"limit">> => 1}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
case dgiot_parse:query_object(<<"Notification">>, #{<<"count">> => <<"objectId">>, <<"limit">> => 1}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"count">> := NotCount, <<"results">> := _}} ->
NotCount;
_ ->
0
end,
UnPanalarmCount =
case dgiot_parse:query_object(<<"Notification">>, #{<<"keys">> => [<<"count(*)">>], <<"limit">> => 1, <<"where">> => #{<<"status">> => 0}}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
case dgiot_parse:query_object(<<"Notification">>, #{<<"count">> => <<"objectId">>, <<"limit">> => 1, <<"where">> => #{<<"status">> => 0}}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"count">> := UnCount, <<"results">> := _}} ->
UnCount;
_ ->
@ -286,7 +286,7 @@ sendTemplate() ->
end.
get_wechat_map(SessionToken) ->
case dgiot_parse:query_object(<<"Device">>, #{<<"keys">> => [<<"count(*)">>], <<"limit">> => 1000}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
case dgiot_parse:query_object(<<"Device">>, #{<<"count">> => <<"objectId">>, <<"limit">> => 1000}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"results">> := Results}} ->
NewResult =
lists:foldl(fun(X, Acc) ->
@ -327,7 +327,7 @@ get_device_info(Deviceid, SessionToken) ->
end.
%%
%% dgiot_parse:query_object(<<"Notification">>, #{<<"keys">> => [<<"count(*)">>],<<"where">> => #{<<"type">> => #{<<"$regex">> => <<"c1e44b39f0">>}}}).
%% dgiot_parse:query_object(<<"Notification">>, #{<<"count">> => <<"objectId">>,<<"where">> => #{<<"type">> => #{<<"$regex">> => <<"c1e44b39f0">>}}}).
get_notification(ProductId1, SessionToken, Order, Limit, Skip, Where) ->
Where1 =
case ProductId1 of
@ -336,64 +336,71 @@ get_notification(ProductId1, SessionToken, Order, Limit, Skip, Where) ->
ProductId2 ->
Where#{<<"type">> => #{<<"$regex">> => ProductId2}}
end,
case dgiot_parse:query_object(<<"Notification">>, #{<<"keys">> => [<<"count(*)">>], <<"order">> => Order, <<"limit">> => Limit, <<"skip">> => Skip, <<"where">> => Where1}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
case dgiot_parse:query_object(<<"Notification">>, #{<<"count">> => <<"objectId">>, <<"order">> => Order, <<"limit">> => Limit, <<"skip">> => Skip, <<"where">> => Where1}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"count">> := Count, <<"results">> := Results}} ->
NewResult =
lists:foldl(fun(X, Acc) ->
case X of
#{<<"objectId">> := ObjectId, <<"type">> := Type, <<"public">> := Public, <<"status">> := Status, <<"content">> := Content, <<"process">> := Process, <<"createdAt">> := Createdat} ->
Alertstatus = maps:get(<<"alertstatus">>, Content, true),
DeviceId = maps:get(<<"_deviceid">>, Content, <<"">>),
Productid = maps:get(<<"_productid">>, Content, <<"">>),
case dgiot_parse:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"name">> := DeviceName, <<"devaddr">> := Devaddr, <<"detail">> := Detail}} ->
Address = maps:get(<<"address">>, Detail, <<"无位置"/utf8>>),
Newdate = dgiot_datetime:format(dgiot_datetime:to_localtime(Createdat), <<"YY-MM-DD HH:NN:SS">>),
case binary:split(Type, <<$_>>, [global, trim]) of
[Productid, <<"status">>] ->
case dgiot_parse:get_object(<<"Product">>, Productid) of
{ok, #{<<"name">> := ProductName}} ->
Acc ++ [#{<<"objectId">> => ObjectId, <<"dynamicform">> => [#{<<"设备编号"/utf8>> => Devaddr}, #{<<"设备地址"/utf8>> => Address}, #{<<"报警内容"/utf8>> => <<"设备"/utf8, DeviceName/binary, "离线"/utf8>>}, #{<<"离线时间"/utf8>> => Newdate}], <<"alertstatus">> => Alertstatus, <<"productname">> => ProductName, <<"devicename">> => DeviceName, <<"process">> => Process, <<"content">> => Content, <<"public">> => Public, <<"status">> => Status, <<"createdAt">> => Createdat}];
_ ->
Acc
end;
[ProductId, AlertId] ->
case dgiot_parse:get_object(<<"Product">>, ProductId) of
{ok, #{<<"name">> := ProductName, <<"config">> := #{<<"parser">> := Parser}}} ->
lists:foldl(fun(P, Par) ->
case P of
#{<<"uid">> := AlertId, <<"config">> := #{<<"formDesc">> := FormDesc}} ->
FormD =
maps:fold(fun(Key, Value1, Form) ->
case maps:find(Key, Content) of
{ok, Value} ->
Label = maps:get(<<"label">>, Value1),
Form ++ [#{Label => Value}];
_ ->
Label = maps:get(<<"label">>, Value1),
Default = maps:get(<<"default">>, Value1, <<>>),
Form ++ [#{Label => Default}]
end
end, [], FormDesc),
Acc ++ [Par#{<<"dynamicform">> => FormD ++ [#{<<"报警时间"/utf8>> => Newdate}]}];
_Oth ->
Acc ++ [Par]
end
end, #{<<"objectId">> => ObjectId, <<"alertstatus">> => Alertstatus, <<"productname">> => ProductName, <<"devicename">> => DeviceName, <<"process">> => Process, <<"public">> => Public, <<"status">> => Status, <<"createdAt">> => Newdate}, Parser);
_Other ->
Acc
end;
_Other1 ->
Acc
end;
_ ->
Acc
end;
_Other2 ->
Acc
end
Acc ++ get_list(X)
end, [], Results),
{ok, #{<<"count">> => Count, <<"results">> => NewResult}};
_ ->
{error, <<"no device">>}
{ok, #{<<"msg">> => <<"no device">>, <<"results">> => []}}
end.
get_list(#{<<"objectId">> := ObjectId} = Result) ->
Createdat = maps:get(<<"createdAt">>, Result),
Type = maps:get(<<"type">>, Result, <<"">>),
Public = maps:get(<<"public">>, Result, true),
Status = maps:get(<<"status">>, Result, 0),
Process = maps:get(<<"process">>, Result, <<"">>),
Content = maps:get(<<"content">>, Result, #{}),
Alertstatus = maps:get(<<"alertstatus">>, Content, true),
DeviceId = maps:get(<<"_deviceid">>, Content, <<"">>),
Productid = maps:get(<<"_productid">>, Content, <<"">>),
case dgiot_parse:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"name">> := DeviceName, <<"devaddr">> := Devaddr, <<"detail">> := Detail}} ->
Address = maps:get(<<"address">>, Detail, <<"无位置"/utf8>>),
Newdate = dgiot_datetime:format(dgiot_datetime:to_localtime(Createdat), <<"YY-MM-DD HH:NN:SS">>),
case binary:split(Type, <<$_>>, [global, trim]) of
[Productid, <<"status">>] ->
case dgiot_parse:get_object(<<"Product">>, Productid) of
{ok, #{<<"name">> := ProductName}} ->
[#{<<"objectId">> => ObjectId, <<"dynamicform">> => [#{<<"设备编号"/utf8>> => Devaddr}, #{<<"设备地址"/utf8>> => Address}, #{<<"报警内容"/utf8>> => <<"设备"/utf8, DeviceName/binary, "离线"/utf8>>}, #{<<"离线时间"/utf8>> => Newdate}], <<"alertstatus">> => Alertstatus, <<"productname">> => ProductName, <<"devicename">> => DeviceName, <<"process">> => Process, <<"content">> => Content, <<"public">> => Public, <<"status">> => Status, <<"createdAt">> => Createdat}];
_ ->
[]
end;
[ProductId, AlertId] ->
get_product_not(AlertId, ProductId, Content, Newdate, ObjectId, Alertstatus, DeviceName, Process, Public, Status);
_Other1 ->
[]
end;
_ ->
[]
end.
get_product_not(AlertId, ProductId, Content, Newdate, ObjectId, Alertstatus, DeviceName, Process, Public, Status) ->
case dgiot_parse:get_object(<<"Product">>, ProductId) of
{ok, #{<<"name">> := ProductName, <<"config">> := #{<<"parser">> := Parser}}} ->
lists:foldl(fun(P, Par) ->
case P of
#{<<"uid">> := AlertId, <<"config">> := #{<<"formDesc">> := FormDesc}} ->
FormD =
maps:fold(fun(Key, Value1, Form) ->
case maps:find(Key, Content) of
{ok, Value} ->
Label = maps:get(<<"label">>, Value1),
Form ++ [#{Label => Value}];
_ ->
Label = maps:get(<<"label">>, Value1),
Default = maps:get(<<"default">>, Value1, <<>>),
Form ++ [#{Label => Default}]
end
end, [], FormDesc),
[Par#{<<"dynamicform">> => FormD ++ [#{<<"报警时间"/utf8>> => Newdate}]}];
_Oth ->
[Par]
end
end, #{<<"objectId">> => ObjectId, <<"alertstatus">> => Alertstatus, <<"productname">> => ProductName, <<"devicename">> => DeviceName, <<"process">> => Process, <<"public">> => Public, <<"status">> => Status, <<"createdAt">> => Newdate}, Parser);
_Other ->
[]
end.

View File

@ -27,14 +27,11 @@ post('before', _BeforeData) ->
post('after', _AfterData) ->
ok.
put('before', Device) ->
put('before', #{<<"id">> := DeviceId} = Device) ->
io:format("~s ~p Device = ~p.~n", [?FILE, ?LINE, Device]),
DeviceId = maps:get(<<"objectId">>, Device),
case dgiot_device:lookup(DeviceId) of
{ok, #{<<"devaddr">> := Devaddr, <<"productid">> := ProductId}} ->
Profile = maps:get(<<"profile">>, Device, #{}),
io:format("~s ~p Devaddr = ~p.~n", [?FILE, ?LINE, Devaddr]),
io:format("~s ~p ProductId = ~p.~n", [?FILE, ?LINE, ProductId]),
case dgiot_parse:get_object(<<"Product">>, ProductId) of
{ok, #{<<"name">> := ProductName, <<"thing">> := #{<<"properties">> := Properties}}} ->
NewPayLoad =

View File

@ -51,7 +51,7 @@ create_schemas(Schema) ->
create_schemas(Channel, #{<<"using">> := _STbName} = Query) ->
transaction(Channel,
fun(Context) ->
Sql = dgiot_tdengine_schema:create_table(Query,Context#{<<"channel">> => Channel}),
Sql = dgiot_tdengine_schema:create_table(Query, Context#{<<"channel">> => Channel}),
dgiot_tdengine_pool:run_sql(Context#{<<"channel">> => Channel}, execute_update, Sql)
end);
@ -60,7 +60,9 @@ create_schemas(Channel, Query) ->
transaction(Channel,
fun(Context) ->
Sql = dgiot_tdengine_schema:create_table(Query, Context#{<<"channel">> => Channel}),
dgiot_tdengine_pool:run_sql(Context#{<<"channel">> => Channel}, execute_update, Sql)
R = dgiot_tdengine_pool:run_sql(Context#{<<"channel">> => Channel}, execute_update, Sql),
dgiot_tdengine_schema:alter_table(Query, Context#{<<"channel">> => Channel}),
R
end).
%%

View File

@ -36,7 +36,7 @@ start(_Type, Ip, Port) ->
{<<"HTTP">>, list_to_binary(lists:concat(["http://", binary_to_list(Ip), ":", Port, "/rest/sql"]))}.
%% Action (DQLDMLDDLDCL)
run_sql(#{<<"driver">> := <<"HTTP">>, <<"url">> := Url, <<"username">> := UserName, <<"password">> := Password} = Context, _Action, Sql) ->
run_sql(#{<<"driver">> := <<"HTTP">>, <<"url">> := Url, <<"username">> := UserName, <<"password">> := Password} = Context, _Action, Sql) when byte_size(Sql) > 0 ->
?LOG(debug, " ~p, ~p, ~p, (~ts)", [Url, UserName, Password, unicode:characters_to_list(Sql)]),
case dgiot_tdengine_http:request(Url, UserName, Password, Sql) of
{ok, Result} ->
@ -62,4 +62,7 @@ run_sql(#{<<"driver">> := <<"mqtt">>, <<"url">> := _Url}, Action, _Sql) when Act
run_sql(#{<<"driver">> := <<"JDBC">>, <<"url">> := _Url}, Action, _Sql) when Action == execute_update; Action == execute_query ->
%% ?LOG(info,"Execute ~p (~p) ~p", [Url, byte_size(Sql), Sql]),
%% apply(ejdbc, Action, [<<"com.taosdata.jdbc.TSDBDriver">>, Sql]).
ok;
run_sql(_Context, _Action, _Sql) ->
ok.

View File

@ -19,7 +19,7 @@
-include("dgiot_tdengine.hrl").
-include_lib("dgiot/include/logger.hrl").
-export([ add_field/4, get_field/1, check_fields/2, check_fields/3, get_time/2]).
-export([add_field/4, get_field/1, check_fields/2, check_fields/3, get_time/2]).
add_field(<<"enum">>, Database, TableName, LowerIdentifier) ->
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " INT;">>;

View File

@ -75,9 +75,9 @@ create_table(#{<<"tableName">> := TableName, <<"using">> := STbName, <<"tags">>
DB1 = dgiot_tdengine_select:format_db(TableName),
<<"CREATE TABLE IF NOT EXISTS ", DB1/binary, TableName/binary, " USING ", STbName/binary, " TAGS (", TagFields/binary, ");">>;
create_table(#{<<"tableName">> := TableName, <<"fields">> := Fields0} = Query, Context) ->
create_table(#{<<"tableName">> := TableName, <<"fields">> := Fields0} = Query, _Context) ->
Database = dgiot_tdengine_select:format_db(TableName),
alter_table(Query#{<<"db">> => Database}, Context),
%% alter_table(Query#{<<"db">> => Database}, Context),
Fields =
list_to_binary(dgiot_utils:join(",", ["createdat TIMESTAMP"] ++ lists:foldr(
fun({FieldName, #{<<"type">> := Type}}, Acc) ->
@ -95,10 +95,9 @@ create_table(#{<<"tableName">> := TableName, <<"fields">> := Fields0} = Query, C
<<"CREATE TABLE IF NOT EXISTS ", Database/binary, TableName/binary, " (", Fields/binary, ") TAGS (", TagFields/binary, ");">>
end.
alter_table(#{<<"db">> := Database, <<"tableName">> := TableName}, #{<<"channel">> := Channel} = Context) ->
alter_table(#{<<"tableName">> := TableName}, #{<<"channel">> := Channel} = Context) ->
Database = dgiot_tdengine_select:format_db(TableName),
Sql1 = <<"DESCRIBE ", Database/binary, TableName/binary, ";">>,
<<"_", ProductId/binary>> = TableName,
Props = get_prop(ProductId),
case dgiot_tdengine_pool:run_sql(Context, execute_query, Sql1) of
{ok, #{<<"results">> := Results}} when length(Results) > 0 ->
TdColumn =
@ -111,41 +110,37 @@ alter_table(#{<<"db">> := Database, <<"tableName">> := TableName}, #{<<"channel"
end
end, #{}, Results),
<<"_", ProductId/binary>> = TableName,
lists:foldl(fun(Prop, _Acc1) ->
case Prop of
#{<<"dataType">> := #{<<"type">> := Type}, <<"identifier">> := Identifier, <<"isshow">> := true} ->
LowerIdentifier = list_to_binary(string:to_lower(binary_to_list(Identifier))),
case maps:find(LowerIdentifier, TdColumn) of
error ->
AddSql = dgiot_tdengine_field:add_field(Type, Database, TableName, LowerIdentifier),
dgiot_tdengine_pool:run_sql(Context#{<<"channel">> => Channel}, execute_query, AddSql);
_ ->
%% todo
pass
end;
_ ->
pass
end
end, #{}, Props),
AddSql = get_addSql(ProductId, TdColumn, Database, TableName),
dgiot_tdengine_pool:run_sql(Context#{<<"channel">> => Channel}, execute_query, AddSql),
case dgiot_tdengine_pool:run_sql(Context#{<<"channel">> => Channel}, execute_query, Sql1) of
{ok, #{<<"results">> := Results2}} ->
<<"_", ProductId/binary>> = TableName,
dgiot_data:insert({ProductId, ?TABLEDESCRIBE}, Results2);
_ ->
pass
end
end;
_ ->
pass
end.
get_prop(ProductId) ->
get_addSql(ProductId, TdColumn, Database, TableName) ->
case dgiot_data:get(dgiot_product, ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
lists:foldl(fun(X, Acc) ->
case X of
#{<<"identifier">> := Identifier, <<"name">> := Name} ->
Acc#{Identifier => Name};
_ -> Acc
lists:foldl(fun(Prop, _Acc1) ->
case Prop of
#{<<"dataType">> := #{<<"type">> := Type}, <<"identifier">> := Identifier, <<"isshow">> := true} ->
LowerIdentifier = list_to_binary(string:to_lower(binary_to_list(Identifier))),
case maps:find(LowerIdentifier, TdColumn) of
error ->
dgiot_tdengine_field:add_field(Type, Database, TableName, LowerIdentifier);
_ ->
%% todo
<<>>
end;
_ ->
<<>>
end
end, #{}, Props);
end, <<>>, Props);
_ ->
#{}
end.
<<>>
end.