mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-12-02 04:08:54 +08:00
feat: td add tags
This commit is contained in:
parent
2d657bb7f3
commit
430fcf95d6
@ -70,6 +70,7 @@ lookup_prod(ProductId) ->
|
||||
save(Product) ->
|
||||
Product1 = format_product(Product),
|
||||
#{<<"productId">> := ProductId} = Product1,
|
||||
dgiot_data:delete(?DGIOT_PRODUCT, ProductId),
|
||||
dgiot_data:insert(?DGIOT_PRODUCT, ProductId, Product1),
|
||||
save_keys(ProductId),
|
||||
save_control(ProductId),
|
||||
@ -192,17 +193,34 @@ save_device_thingtype(ProductId) ->
|
||||
|
||||
%% 物模型标识符
|
||||
save_product_identifier(ProductId) ->
|
||||
delete_product_identifier(ProductId),
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props} = Thing}} ->
|
||||
Tags = maps:get(<<"tags">>, Thing, []),
|
||||
lists:map(
|
||||
fun(#{<<"identifier">> := Identifie} = Prop) ->
|
||||
dgiot_data:insert(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifie, identifie}, Prop)
|
||||
end, Props);
|
||||
end, Props ++ Tags);
|
||||
|
||||
_Error ->
|
||||
[]
|
||||
end.
|
||||
|
||||
delete_product_identifier(ProductId) ->
|
||||
Fun =
|
||||
fun
|
||||
({Key, _}) ->
|
||||
case Key of
|
||||
{ProductId, _, identifie} ->
|
||||
dgiot_data:delete(?DGIOT_PRODUCT_IDENTIFIE, Key);
|
||||
_ ->
|
||||
pass
|
||||
end;
|
||||
(_) ->
|
||||
pass
|
||||
end,
|
||||
dgiot_data:loop(?DGIOT_PRODUCT_IDENTIFIE, Fun).
|
||||
|
||||
get_product_identifier(ProductId, Identifie) ->
|
||||
case dgiot_data:get(?DGIOT_PRODUCT_IDENTIFIE, {ProductId, Identifie, identifie}) of
|
||||
not_find ->
|
||||
|
@ -30,6 +30,7 @@ init_ets() ->
|
||||
|
||||
%% 设备类型
|
||||
save_product_enum(ProductId) ->
|
||||
delete_product_enum(ProductId),
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
lists:map(
|
||||
@ -47,6 +48,22 @@ save_product_enum(ProductId) ->
|
||||
_Error ->
|
||||
[]
|
||||
end.
|
||||
|
||||
delete_product_enum(ProductId) ->
|
||||
Fun =
|
||||
fun
|
||||
({Key, _}) ->
|
||||
case Key of
|
||||
{ProductId, device_thing, _} ->
|
||||
dgiot_data:delete(?MODULE, Key);
|
||||
_ ->
|
||||
pass
|
||||
end;
|
||||
(_) ->
|
||||
pass
|
||||
end,
|
||||
dgiot_data:loop(?MODULE, Fun).
|
||||
|
||||
get_reverse(Spec) ->
|
||||
maps:fold(
|
||||
fun(K, V, Acc) ->
|
||||
|
@ -211,25 +211,23 @@ get_fields(Table) ->
|
||||
format_sql(ProductId, DevAddr, Data) ->
|
||||
case dgiot_bridge:get_product_info(ProductId) of
|
||||
{ok, #{<<"thing">> := Properties}} ->
|
||||
NewValues =
|
||||
Fields =
|
||||
case dgiot_data:get({ProductId, ?TABLEDESCRIBE}) of
|
||||
Results when length(Results) > 0 ->
|
||||
get_sqls(Data, ProductId, Properties, Results);
|
||||
_ ->
|
||||
" "
|
||||
""
|
||||
end,
|
||||
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
|
||||
TdChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"TD">>, <<"TD资源通道"/utf8>>),
|
||||
DB = dgiot_tdengine:get_database(TdChannelId, ProductId),
|
||||
TableName = ?Table(DeviceId),
|
||||
Using1 = <<" using ", DB/binary, "_", ProductId/binary>>,
|
||||
TagFields = <<" TAGS ('_", DevAddr/binary, "')">>,
|
||||
<<"INSERT INTO ", DB/binary, TableName/binary, Using1/binary, TagFields/binary, " VALUES", NewValues/binary, ";">>;
|
||||
<<"INSERT INTO ", DB/binary, TableName/binary, Using1/binary, Fields/binary, ";">>;
|
||||
_ ->
|
||||
<<"show database;">>
|
||||
end.
|
||||
|
||||
|
||||
get_sqls(Data, ProductId, Properties, Results) ->
|
||||
get_sqls(Data, ProductId, Properties, Results, <<"">>).
|
||||
|
||||
@ -238,43 +236,51 @@ get_sqls([], _ProductId, _Properties, _Results, Acc) ->
|
||||
|
||||
get_sqls([Data | Rest], ProductId, Properties, Results, Acc) ->
|
||||
Now = maps:get(<<"createdat">>, Data, now),
|
||||
Sql = get_sql(Results, Data, Now),
|
||||
Sql = get_sql(Results, ProductId, Data, Now),
|
||||
get_sqls(Rest, ProductId, Properties, Results, <<Acc/binary, Sql/binary>>).
|
||||
|
||||
get_sql(Results, Values, Now) ->
|
||||
get_sql(Results, Values, Now, " (").
|
||||
get_sql(Results, ProductId, Values, Now) ->
|
||||
get_sql(Results, ProductId, Values, Now, {"", ""}).
|
||||
|
||||
get_sql([], _Values, _Now, Acc) ->
|
||||
list_to_binary(Acc ++ ")");
|
||||
get_sql([], _ProductId, _Values, _Now, {TagAcc, Acc}) ->
|
||||
list_to_binary(" TAGS(" ++ TagAcc ++ ") VALUES(" ++ Acc ++ ")");
|
||||
|
||||
get_sql([Column | Results], Values, Now, Acc) ->
|
||||
get_sql([Column | Results], ProductId, Values, Now, {TagAcc, Acc}) ->
|
||||
NewAcc =
|
||||
case Column of
|
||||
#{<<"Note">> := <<"TAG">>} ->
|
||||
Acc;
|
||||
#{<<"note">> := <<"TAG">>} ->
|
||||
Acc;
|
||||
#{<<"Field">> := Field, <<"Note">> := <<"TAG">>} ->
|
||||
{get_value(Field, Values, ProductId, TagAcc), Acc};
|
||||
#{<<"field">> := Field, <<"note">> := <<"TAG">>} ->
|
||||
{get_value(Field, Values, ProductId, TagAcc), Acc};
|
||||
#{<<"Field">> := <<"createdat">>} ->
|
||||
Acc ++ dgiot_utils:to_list(Now);
|
||||
{TagAcc, Acc ++ dgiot_utils:to_list(Now)};
|
||||
#{<<"field">> := <<"createdat">>} ->
|
||||
Acc ++ dgiot_utils:to_list(Now);
|
||||
{TagAcc, Acc ++ dgiot_utils:to_list(Now)};
|
||||
#{<<"Field">> := Field} ->
|
||||
Value = maps:get(Field, Values, null),
|
||||
case Value of
|
||||
{NewValue, text} ->
|
||||
Acc ++ ",\'" ++ dgiot_utils:to_list(NewValue) ++ "\'";
|
||||
_ ->
|
||||
Acc ++ "," ++ dgiot_utils:to_list(Value)
|
||||
end;
|
||||
{TagAcc, get_value(Field, Values, ProductId, Acc)};
|
||||
#{<<"field">> := Field} ->
|
||||
Value = maps:get(Field, Values, null),
|
||||
case Value of
|
||||
{NewValue, text} ->
|
||||
Acc ++ ",\'" ++ dgiot_utils:to_list(NewValue) ++ "\'";
|
||||
_ ->
|
||||
Acc ++ "," ++ dgiot_utils:to_list(Value)
|
||||
end;
|
||||
{TagAcc, get_value(Field, Values, ProductId, Acc)};
|
||||
_ ->
|
||||
Acc
|
||||
{TagAcc, Acc}
|
||||
end,
|
||||
get_sql(Results, Values, Now, NewAcc).
|
||||
get_sql(Results, ProductId, Values, Now, NewAcc).
|
||||
|
||||
|
||||
get_value(Field, Values, ProductId, Acc) ->
|
||||
Value = dgiot_tdengine_field:check_value(maps:get(Field, Values, null), ProductId, Field),
|
||||
case Value of
|
||||
{NewValue, text} ->
|
||||
case Acc of
|
||||
"" ->
|
||||
",\'" ++ dgiot_utils:to_list(NewValue) ++ "\'";
|
||||
_ ->
|
||||
Acc ++ ",\'" ++ dgiot_utils:to_list(NewValue) ++ "\'"
|
||||
end;
|
||||
_ ->
|
||||
case Acc of
|
||||
"" ->
|
||||
dgiot_utils:to_list(Value);
|
||||
_ ->
|
||||
Acc ++ "," ++ dgiot_utils:to_list(Value)
|
||||
end
|
||||
end.
|
||||
|
@ -227,7 +227,7 @@ handle_message({data, Product, DevAddr, Data, Context}, #state{id = ChannelId} =
|
||||
%% 数据与产品,设备地址分离
|
||||
handle_message({sql, Sql}, #state{id = ChannelId} = State) ->
|
||||
dgiot_metrics:inc(dgiot_tdengine, <<"tdengine_recv">>, 1),
|
||||
dgiot_tdengine:batch_sql(ChannelId, Sql),
|
||||
dgiot_tdengine:batch_sql(ChannelId, Sql),
|
||||
{ok, State};
|
||||
|
||||
%% 规则引擎导入
|
||||
@ -264,10 +264,10 @@ handle_info(Message, State) ->
|
||||
io:format("~s ~p Message = ~p.~n", [?FILE, ?LINE, Message]),
|
||||
{ok, State}.
|
||||
|
||||
do_save([ProductId, DevAddr, Data, _Context], #state{id = ChannelId} = State) ->
|
||||
do_save([ProductId, DevAddr, Data, _Context], State) ->
|
||||
dgiot_device:save(ProductId, DevAddr),
|
||||
Object = dgiot_tdengine:format_data(ChannelId, ProductId, DevAddr, Data),
|
||||
dgiot_tdengine:batch(ChannelId, Object),
|
||||
Sql = dgiot_tdengine:format_sql(ProductId, DevAddr, [Data]),
|
||||
dgiot_tdengine_adapter:save_sql(ProductId, Sql),
|
||||
{ok, State}.
|
||||
|
||||
do_check(ChannelId, ProductIds, Config) ->
|
||||
|
@ -19,30 +19,30 @@
|
||||
-include("dgiot_tdengine.hrl").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
|
||||
-export([add_field/4, get_field/1, check_fields/2, check_fields/3, get_time/2, check_value/3, get_field_type/1]).
|
||||
-export([add_field/5, get_field/1, check_fields/2, check_fields/3, get_time/2, check_value/3, get_field_type/1]).
|
||||
|
||||
add_field(#{<<"type">> := <<"enum">>}, Database, TableName, LowerIdentifier) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " INT;">>;
|
||||
add_field(#{<<"type">> := <<"file">>} = Spec, Database, TableName, LowerIdentifier) ->
|
||||
add_field(#{<<"type">> := <<"enum">>}, Database, TableName, LowerIdentifier, FieldType) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD ", FieldType/binary, " ", LowerIdentifier/binary, " INT;">>;
|
||||
add_field(#{<<"type">> := <<"file">>} = Spec, Database, TableName, LowerIdentifier, FieldType) ->
|
||||
Size = integer_to_binary(min(maps:get(<<"size">>, Spec, 50), 200)),
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " NCHAR(", Size/binary, ");">>;
|
||||
add_field(#{<<"type">> := <<"text">>} = Spec, Database, TableName, LowerIdentifier) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD ", FieldType/binary, " ", LowerIdentifier/binary, " NCHAR(", Size/binary, ");">>;
|
||||
add_field(#{<<"type">> := <<"text">>} = Spec, Database, TableName, LowerIdentifier, FieldType) ->
|
||||
Size = integer_to_binary(min(maps:get(<<"size">>, Spec, 50), 200)),
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " NCHAR(", Size/binary, ");">>;
|
||||
add_field(#{<<"type">> := <<"url">>} = Spec, Database, TableName, LowerIdentifier) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD ", FieldType/binary, " ", LowerIdentifier/binary, " NCHAR(", Size/binary, ");">>;
|
||||
add_field(#{<<"type">> := <<"url">>} = Spec, Database, TableName, LowerIdentifier, FieldType) ->
|
||||
Size = integer_to_binary(min(maps:get(<<"size">>, Spec, 50), 200)),
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " NCHAR((", Size/binary, ");">>;
|
||||
add_field(#{<<"type">> := <<"geopoint">>} = Spec, Database, TableName, LowerIdentifier) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD ", FieldType/binary, " ", LowerIdentifier/binary, " NCHAR((", Size/binary, ");">>;
|
||||
add_field(#{<<"type">> := <<"geopoint">>} = Spec, Database, TableName, LowerIdentifier, FieldType) ->
|
||||
Size = integer_to_binary(min(maps:get(<<"size">>, Spec, 50), 200)),
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " NCHAR(", Size/binary, ");">>;
|
||||
add_field(#{<<"type">> := <<"image">>}, Database, TableName, LowerIdentifier) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " BIGINT;">>;
|
||||
add_field(#{<<"type">> := <<"date">>}, Database, TableName, LowerIdentifier) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " TIMESTAMP;">>;
|
||||
add_field(#{<<"type">> := <<"long">>}, Database, TableName, LowerIdentifier) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " BIGINT;">>;
|
||||
add_field(#{<<"type">> := Type}, Database, TableName, LowerIdentifier) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD COLUMN ", LowerIdentifier/binary, " ", Type/binary, ";">>.
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD ", FieldType/binary, " ", LowerIdentifier/binary, " NCHAR(", Size/binary, ");">>;
|
||||
add_field(#{<<"type">> := <<"image">>}, Database, TableName, LowerIdentifier, FieldType) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD ", FieldType/binary, " ", LowerIdentifier/binary, " BIGINT;">>;
|
||||
add_field(#{<<"type">> := <<"date">>}, Database, TableName, LowerIdentifier, FieldType) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD ", FieldType/binary, " ", LowerIdentifier/binary, " TIMESTAMP;">>;
|
||||
add_field(#{<<"type">> := <<"long">>}, Database, TableName, LowerIdentifier, FieldType) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD ", FieldType/binary, " ", LowerIdentifier/binary, " BIGINT;">>;
|
||||
add_field(#{<<"type">> := Type}, Database, TableName, LowerIdentifier, FieldType) ->
|
||||
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD ", FieldType/binary, " ", LowerIdentifier/binary, " ", Type/binary, ";">>.
|
||||
|
||||
%% https://www.taosdata.com/cn/documentation/taos-sql#data-type
|
||||
%% # 类型 Bytes 说明
|
||||
|
@ -71,7 +71,11 @@ format_keep(Query) ->
|
||||
|
||||
|
||||
create_table(#{<<"tableName">> := TableName, <<"using">> := STbName, <<"tags">> := Tags} = _Query, #{<<"channel">> := Channel} = _Context) ->
|
||||
TagFields = list_to_binary(dgiot_utils:join(",", Tags, fun dgiot_tdengine_select:format_value/1)),
|
||||
TagFields =
|
||||
list_to_binary(dgiot_utils:join(",", lists:foldr(
|
||||
fun({TagName, #{<<"type">> := TType}}, Acc) ->
|
||||
[<<TagName/binary, " ", TType/binary>> | Acc]
|
||||
end, [], Tags))),
|
||||
<<"_", ProductId/binary>> = TableName,
|
||||
DataBase = dgiot_tdengine:get_database(Channel, ProductId),
|
||||
<<"CREATE TABLE IF NOT EXISTS ", DataBase/binary, TableName/binary, " USING ", STbName/binary, " TAGS (", TagFields/binary, ");">>;
|
||||
@ -128,27 +132,35 @@ alter_table(#{<<"tableName">> := TableName}, #{<<"channel">> := Channel} = Conte
|
||||
|
||||
get_addSql(ProductId, TdColumn, Database, TableName) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props} = Thing}} ->
|
||||
Tags = maps:get(<<"tags">>, Thing, []),
|
||||
lists:foldl(fun(Prop, Acc) ->
|
||||
case Prop of
|
||||
#{<<"dataType">> := #{<<"type">> := Type} = DataType, <<"identifier">> := Identifier, <<"isstorage">> := true} ->
|
||||
#{<<"dataType">> := #{<<"type">> := Type} = DataType, <<"identifier">> := Identifier, <<"moduleType">> := ModuleType, <<"isstorage">> := true} ->
|
||||
LowerIdentifier = list_to_binary(string:to_lower(binary_to_list(Identifier))),
|
||||
LowerType = dgiot_tdengine_field:get_field_type(Type),
|
||||
FieldType = get_fieldtype(ModuleType),
|
||||
case maps:find(LowerIdentifier, TdColumn) of
|
||||
error ->
|
||||
Acc ++ [dgiot_tdengine_field:add_field(DataType, Database, TableName, LowerIdentifier)];
|
||||
Acc ++ [dgiot_tdengine_field:add_field(DataType, Database, TableName, LowerIdentifier, FieldType)];
|
||||
{ok, LowerType} ->
|
||||
Acc;
|
||||
_ ->
|
||||
%% 类型改变, 先删除列, 再重新添加
|
||||
DROP = <<"ALTER TABLE ", Database/binary, TableName/binary, " DROP COLUMN ", LowerIdentifier/binary, ";">>,
|
||||
ADD = dgiot_tdengine_field:add_field(DataType, Database, TableName, LowerIdentifier),
|
||||
DROP = <<"ALTER TABLE ", Database/binary, TableName/binary, " DROP ", FieldType/binary, " ", LowerIdentifier/binary, ";">>,
|
||||
ADD = dgiot_tdengine_field:add_field(DataType, Database, TableName, LowerIdentifier, FieldType),
|
||||
Acc ++ [DROP, ADD]
|
||||
end;
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, [], Props);
|
||||
end, [], Props ++ Tags);
|
||||
_ ->
|
||||
[]
|
||||
end.
|
||||
|
||||
get_fieldtype(<<"tags">>) ->
|
||||
<<"TAG">>;
|
||||
get_fieldtype(_) ->
|
||||
<<"COLUMN">>.
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user