mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-12-02 04:08:54 +08:00
feat: sync_dict,view
This commit is contained in:
parent
d495519c4d
commit
170e2d1f93
@ -38,7 +38,7 @@ get_opts(Path) ->
|
||||
string:sub_string(dgiot_utils:to_list(Path), Size + 1)
|
||||
end,
|
||||
{ok, Type} = application:get_env(dgiot_api, dicttype),
|
||||
DictId = dgiot_parse:get_dictid(Flag, dgiot_utils:to_binary(Type)),
|
||||
DictId = dgiot_parse:get_dictid(Flag, dgiot_utils:to_binary(Type), <<"Dict">>, <<"Dict">>),
|
||||
Data =
|
||||
case dgiot_data:get(DictId) of
|
||||
not_find ->
|
||||
|
@ -111,7 +111,7 @@ do_request(put_rule_id, #{<<"id">> := RuleID} = Params, _Context, _Req) ->
|
||||
%% OperationId:delete_rule_id
|
||||
%% 请求:DELETE /iotapi/rule/:{id}
|
||||
do_request(delete_rule_id, #{<<"id">> := RuleID}, _Context, _Req) ->
|
||||
ObjectId = dgiot_parse:get_dictid(RuleID, <<"ruleengine">>),
|
||||
ObjectId = dgiot_parse:get_dictid(RuleID, <<"ruleengine">>, <<"Rule">>, <<"Rule">>),
|
||||
dgiot_parse:del_object(<<"Dict">>, ObjectId),
|
||||
dgiot_data:delete(?DGIOT_RUlES, RuleID),
|
||||
emqx_rule_engine_api:delete_rule(#{id => RuleID}, []);
|
||||
@ -214,7 +214,7 @@ save_rule_to_dict(RuleID, Params) ->
|
||||
<<"data">> => #{<<"rule">> => jsx:encode(Rule)}
|
||||
},
|
||||
dgiot_data:insert(?DGIOT_RUlES, Dict),
|
||||
ObjectId = dgiot_parse:get_dictid(RuleID, <<"ruleengine">>),
|
||||
ObjectId = dgiot_parse:get_dictid(RuleID, <<"ruleengine">>, <<"Rule">>, <<"Rule">>),
|
||||
case dgiot_parse:get_object(<<"Dict">>, ObjectId) of
|
||||
{ok, _} ->
|
||||
dgiot_parse:update_object(<<"Dict">>, ObjectId, Dict);
|
||||
@ -247,7 +247,7 @@ get_channel(_Data) ->
|
||||
%%Acc ++ [maps:with([<<"name">>, <<"value">>, <<"caption">>, <<"meta">>, <<"type">>, <<"score">>], Data)] %% with 只取需要的字段
|
||||
%% ;结尾是分支 .结尾是结束
|
||||
get_dictLanguage(Language) ->
|
||||
Type = dgiot_parse:get_dictid(Language, <<"dict_template">>),
|
||||
Type = dgiot_parse:get_dictid(Language, <<"dict_template">>, <<"Dict">>, <<"Dict">>),
|
||||
case dgiot_parse:query_object(<<"Dict">>, #{<<"where">> => #{<<"type">> => Type}}) of
|
||||
{ok, #{<<"results">> := Results}} when length(Results) > 0 ->
|
||||
lists:foldl(fun(#{<<"data">> := Data}, Acc) ->
|
||||
@ -256,7 +256,6 @@ get_dictLanguage(Language) ->
|
||||
_ -> []
|
||||
end.
|
||||
|
||||
|
||||
sysc_rules() ->
|
||||
case dgiot_datetime:start_time() < 15 of
|
||||
true ->
|
||||
|
@ -522,7 +522,6 @@ get_app(SessionToken) ->
|
||||
|
||||
create_report(ProductParentId, Config, DevType, Name, Num, Imagurl, WordUrl, SessionToken) ->
|
||||
CategoryId = maps:get(<<"category">>, Config, <<"d6ad425529">>),
|
||||
Producttempid = maps:get(<<"producttemplet">>, Config, <<"">>),
|
||||
NewNum = dgiot_utils:to_binary(dgiot_utils:to_int(Num) + 1),
|
||||
ProductName = <<Name/binary, NewNum/binary>>,
|
||||
NodeType = 0,
|
||||
@ -554,8 +553,8 @@ create_report(ProductParentId, Config, DevType, Name, Num, Imagurl, WordUrl, Ses
|
||||
<<"netType">> => <<"Evidence">>,
|
||||
<<"devType">> => DevType,
|
||||
<<"desc">> => NewNum,
|
||||
<<"channel">> => #{<<"type">> => 1, <<"tdchannel">> => <<"24b9b4bc50">>, <<"taskchannel">> => <<"0edaeb918e">>, <<"otherchannel">> => [<<"11ed8ad9f2">>]},
|
||||
<<"category">> => #{<<"objectId">> => CategoryId, <<"__type">> => <<"Pointer">>, <<"className">> => <<"Category">>},
|
||||
<<"producttemplet">> => #{<<"objectId">> => Producttempid, <<"__type">> => <<"Pointer">>, <<"className">> => <<"ProductTemplet">>},
|
||||
<<"name">> => ProductName}, SessionToken) of
|
||||
{ok, #{<<"objectId">> := ObjectId} = Result} ->
|
||||
dgiot_parse:update_object(<<"Product">>, ProductParentId, #{<<"children">> => #{
|
||||
|
@ -266,6 +266,7 @@ do_report(Config, DevType, Name, SessionToken, FullPath, Uri) ->
|
||||
<<"devType">> => DevType,
|
||||
<<"desc">> => <<"0">>,
|
||||
<<"nodeType">> => 1,
|
||||
<<"channel">> => #{<<"type">> => 1, <<"tdchannel">> => <<"24b9b4bc50">>, <<"taskchannel">> => <<"0edaeb918e">>, <<"otherchannel">> => [<<"11ed8ad9f2">>]},
|
||||
<<"netType">> => <<"Evidence">>,
|
||||
<<"category">> => #{<<"objectId">> => CategoryId, <<"__type">> => <<"Pointer">>, <<"className">> => <<"Category">>},
|
||||
<<"producttemplet">> => #{<<"objectId">> => Producttempid, <<"__type">> => <<"Pointer">>, <<"className">> => <<"ProductTemplet">>},
|
||||
|
@ -254,12 +254,13 @@ post_group(Body, SessionToken) ->
|
||||
case dgiot_product:update_config(NewBody#{
|
||||
<<"desc">> => <<"DG-IoT设备分组"/utf8>>,
|
||||
<<"netType">> => <<"WIFI">>,
|
||||
<<"category">> => <<"IotHub">>,
|
||||
<<"category">> => #{<<"objectId">> => <<"e5a9059441">>, <<"__type">> => <<"Pointer">>, <<"className">> => <<"Category">>},
|
||||
<<"config">> => #{},
|
||||
<<"channel">> => #{<<"type">> => 1, <<"tdchannel">> => <<"24b9b4bc50">>, <<"taskchannel">> => <<"0edaeb918e">>, <<"otherchannel">> => [<<"11ed8ad9f2">>]},
|
||||
<<"thing">> => #{},
|
||||
<<"ACL">> => Acl,
|
||||
<<"name">> => ProductName,
|
||||
<<"nodeType">> => 2}, SessionToken) of
|
||||
<<"nodeType">> => 1}, SessionToken) of
|
||||
{_, #{<<"objectId">> := ProductId}} ->
|
||||
<<NewAddr:12/binary, _/binary>> = dgiot_utils:to_md5(<<ProductId/binary, Addr/binary>>),
|
||||
dgiot_device:create_device(#{
|
||||
|
@ -88,7 +88,7 @@ handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, devaddr = DtuAddr,
|
||||
dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "DtuAddr ~p revice from ~p", [DtuAddr, dgiot_utils:binary_to_hex(Buff)]),
|
||||
<<H:8, L:8>> = dgiot_utils:hex_to_binary(modbus_rtu:is16(Di)),
|
||||
<<Sh:8, Sl:8>> = dgiot_utils:hex_to_binary(modbus_rtu:is16(Pn)),
|
||||
case modbus_rtu:parse_frame(Buff, [], #{
|
||||
case modbus_rtu:parse_frame(Buff, #{}, #{
|
||||
<<"dtuproduct">> => ProductId,
|
||||
<<"channel">> => ChannelId,
|
||||
<<"dtuaddr">> => DtuAddr,
|
||||
|
@ -27,7 +27,7 @@
|
||||
build_req_message/1]
|
||||
).
|
||||
|
||||
-export([modbus_encoder/4, modbus_decoder/4, is16/1, set_params/3]).
|
||||
-export([modbus_encoder/4, modbus_decoder/5, is16/1, set_params/3]).
|
||||
|
||||
init(State) ->
|
||||
State#{<<"req">> => [], <<"ts">> => dgiot_datetime:now_ms(), <<"interval">> => 300}.
|
||||
@ -212,7 +212,7 @@ parse_frame(<<MbAddr:8, BadCode:8, ErrorCode:8, Crc:2/binary>> = Buff, Acc,
|
||||
_ -> {error, unknown_response_code}
|
||||
end,
|
||||
?LOG(info, "DtuAddr ~p Modbus ~p, BadCode ~p, Error ~p", [DtuAddr, MbAddr, BadCode, Error]),
|
||||
{<<>>, Acc};
|
||||
{<<>>, #{}};
|
||||
false ->
|
||||
parse_frame(Buff, Acc, State)
|
||||
end;
|
||||
@ -273,8 +273,7 @@ decode_data(Buff, ProductId, DtuAddr, Address, Acc) ->
|
||||
CheckCrc = dgiot_utils:crc16(CheckBuf),
|
||||
case CheckCrc =:= Crc of
|
||||
true ->
|
||||
Acc1 = Acc ++ modbus_decoder(ProductId, SlaveId, Address, UserZone),
|
||||
{Rest1, Acc1};
|
||||
{Rest1, modbus_decoder(ProductId, SlaveId, Address, UserZone, Acc)};
|
||||
false ->
|
||||
{Rest1, Acc}
|
||||
end;
|
||||
@ -300,8 +299,7 @@ get_write(ResponseData, SlaveId, FunCode, _DtuAddr, ProductId, Address, Acc) ->
|
||||
CheckCrc = dgiot_utils:crc16(CheckBuf),
|
||||
case CheckCrc =:= Crc of
|
||||
true ->
|
||||
Acc1 = Acc ++ modbus_decoder(ProductId, SlaveId, Address, UserZone),
|
||||
{<<>>, Acc1};
|
||||
{<<>>, modbus_decoder(ProductId, SlaveId, Address, UserZone, Acc)};
|
||||
false ->
|
||||
{<<>>, Acc}
|
||||
end.
|
||||
@ -434,7 +432,7 @@ list_word16_to_binary(Values) when is_list(Values) ->
|
||||
)
|
||||
).
|
||||
|
||||
modbus_decoder(ProductId, SlaveId, Address, Data) ->
|
||||
modbus_decoder(ProductId, SlaveId, Address, Data, Acc1) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
lists:foldl(fun(X, Acc) ->
|
||||
@ -462,8 +460,8 @@ modbus_decoder(ProductId, SlaveId, Address, Data) ->
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, #{}, Props);
|
||||
_ -> []
|
||||
end, Acc1, Props);
|
||||
_ -> #{}
|
||||
end.
|
||||
|
||||
modbus_encoder(ProductId, SlaveId, Address, Value) ->
|
||||
|
@ -242,7 +242,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, de
|
||||
|
||||
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, devaddr = _DtuAddr, env = #{product := _ProductId, pn := _Pn, di := _Di}, product = _DtuProductId} = State} = TCPState) ->
|
||||
dgiot_bridge:send_log(ChannelId, "revice from ~p", [dgiot_utils:binary_to_hex(Buff)]),
|
||||
%% case modbus_rtu:parse_frame(Buff, [], #{
|
||||
%% case modbus_rtu:parse_frame(Buff, #{}, #{
|
||||
%% <<"dtuproduct">> => ProductId,
|
||||
%% <<"channel">> => ChannelId,
|
||||
%% <<"dtuaddr">> => DtuAddr,
|
||||
|
@ -83,8 +83,8 @@
|
||||
-export([
|
||||
get_objectid/2,
|
||||
get_deviceid/2,
|
||||
get_dictid/3,
|
||||
get_viewid/3,
|
||||
get_dictid/4,
|
||||
get_viewid/4,
|
||||
get_shapeid/2,
|
||||
get_instruct/3,
|
||||
get_roleid/1,
|
||||
@ -145,14 +145,14 @@ get_shapeid(DeviceId, Identifier) ->
|
||||
<<ShapeId:10/binary, _/binary>> = dgiot_utils:to_md5(<<DeviceId/binary, Identifier/binary, "dgiottopo">>),
|
||||
ShapeId.
|
||||
|
||||
get_dictid(Key, Type, Class) ->
|
||||
get_dictid(Key, Type, Class, Title) ->
|
||||
#{<<"objectId">> := DeviceId} =
|
||||
dgiot_parse:get_objectid(<<"Dict">>, #{<<"key">> => Key, <<"type">> => Type, <<"class">> => Class}),
|
||||
dgiot_parse:get_objectid(<<"Dict">>, #{<<"key">> => Key, <<"type">> => Type, <<"class">> => Class, <<"title">> => Title}),
|
||||
DeviceId.
|
||||
|
||||
get_viewid(Key, Type, Class) ->
|
||||
get_viewid(Key, Type, Class,Title) ->
|
||||
#{<<"objectId">> := DeviceId} =
|
||||
dgiot_parse:get_objectid(<<"View">>, #{<<"key">> => Key, <<"type">> => Type, <<"class">> => Class}),
|
||||
dgiot_parse:get_objectid(<<"View">>, #{<<"key">> => Key, <<"type">> => Type, <<"class">> => Class,<<"title">> => Title}),
|
||||
DeviceId.
|
||||
|
||||
get_deviceid(ProductId, DevAddr) ->
|
||||
@ -318,7 +318,8 @@ get_objectid(Class, Map) ->
|
||||
Key = maps:get(<<"key">>, Map, <<"">>),
|
||||
Type = maps:get(<<"type">>, Map, <<"">>),
|
||||
Class1 = maps:get(<<"class">>, Map, <<"">>),
|
||||
<<DId:10/binary, _/binary>> = dgiot_utils:to_md5(<<"Dict", Class1/binary, Key/binary, Type/binary>>),
|
||||
Title = maps:get(<<"Title">>, Map, <<"">>),
|
||||
<<DId:10/binary, _/binary>> = dgiot_utils:to_md5(<<"Dict", Class1/binary, Key/binary, Type/binary,Title/binary>>),
|
||||
Map#{
|
||||
<<"objectId">> => DId
|
||||
};
|
||||
@ -328,7 +329,8 @@ get_objectid(Class, Map) ->
|
||||
Key = maps:get(<<"key">>, Map, <<"">>),
|
||||
Type = maps:get(<<"type">>, Map, <<"">>),
|
||||
Class2 = maps:get(<<"class">>, Map, <<"">>),
|
||||
<<VId:10/binary, _/binary>> = dgiot_utils:to_md5(<<"View", Class2/binary, Key/binary, Type/binary>>),
|
||||
Title = maps:get(<<"title">>, Map, <<"">>),
|
||||
<<VId:10/binary, _/binary>> = dgiot_utils:to_md5(<<"View", Class2/binary, Key/binary, Type/binary,Title/binary>>),
|
||||
Map#{
|
||||
<<"objectId">> => VId
|
||||
};
|
||||
|
@ -164,6 +164,8 @@ stop(#{
|
||||
|
||||
%%获取计算值,必须返回物模型里面的数据表示,不能用寄存器地址
|
||||
get_calculated(ProductId, Ack) ->
|
||||
io:format("ProductId ~p~n", [ProductId]),
|
||||
io:format("Ack ~p~n", [Ack]),
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
|
||||
lists:foldl(fun(X, Acc) ->
|
||||
@ -179,7 +181,12 @@ get_calculated(ProductId, Ack) ->
|
||||
Str = re:replace(Acc2, dgiot_utils:to_list(<<"%%", K/binary>>), "(" ++ dgiot_utils:to_list(V) ++ ")", [global, {return, list}]),
|
||||
re:replace(Str, "%s", "(" ++ dgiot_utils:to_list(V) ++ ")", [global, {return, list}])
|
||||
end, dgiot_utils:to_list(Collection), Ack),
|
||||
Acc#{Identifier => string2value(Str1, Type, Specs)};
|
||||
case string2value(Str1, Type, Specs) of
|
||||
error ->
|
||||
maps:without([Identifier], Acc);
|
||||
Value1 ->
|
||||
Acc#{Identifier => Value1}
|
||||
end;
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
@ -210,7 +217,7 @@ get_collection(ProductId, Dis, Payload, Ack) ->
|
||||
Acc2#{Identifier => Value};
|
||||
_ ->
|
||||
dgiot_data:insert({topogps, dgiot_parse:get_shapeid(ProductId, Identifier)}, <<"无GPS信息"/utf8>>),
|
||||
Acc2#{Identifier => <<"">>}
|
||||
Acc2
|
||||
end;
|
||||
#{<<"dataForm">> := #{<<"address">> := Address, <<"strategy">> := Strategy, <<"collection">> := Collection},
|
||||
<<"dataType">> := #{<<"type">> := Type, <<"specs">> := Specs},
|
||||
@ -219,13 +226,23 @@ get_collection(ProductId, Dis, Payload, Ack) ->
|
||||
{ok, Value} ->
|
||||
Str = re:replace(Collection, dgiot_utils:to_list(<<"%%", Identifier/binary>>), "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
|
||||
Str1 = re:replace(Str, "%s", "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
|
||||
Acc2#{Identifier => string2value(Str1, Type, Specs)};
|
||||
case string2value(Str1, Type, Specs) of
|
||||
error ->
|
||||
maps:without([Identifier], Acc2);
|
||||
Value1 ->
|
||||
Acc2#{Identifier => Value1}
|
||||
end;
|
||||
_ ->
|
||||
case maps:find(Address, Payload) of
|
||||
{ok, Value} ->
|
||||
Str = re:replace(Collection, dgiot_utils:to_list(<<"%%", Identifier/binary>>), "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
|
||||
Str1 = re:replace(Str, "%s", "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
|
||||
Acc2#{Identifier => string2value(Str1, Type, Specs)};
|
||||
case string2value(Str1, Type, Specs) of
|
||||
error ->
|
||||
maps:without([Identifier], Acc2);
|
||||
Value1 ->
|
||||
Acc2#{Identifier => Value1}
|
||||
end;
|
||||
_ -> Acc2
|
||||
end
|
||||
end;
|
||||
|
@ -83,7 +83,7 @@ init([#{<<"app">> := App, <<"channel">> := ChannelId, <<"dtuid">> := DtuId, <<"m
|
||||
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>,
|
||||
dgiot_mqtt:subscribe(Topic),
|
||||
AppData = maps:get(<<"appdata">>, _Args, #{}),
|
||||
dgiot_metrics:inc(dgiot_task,<<"task">>,1),
|
||||
dgiot_metrics:inc(dgiot_task, <<"task">>, 1),
|
||||
{ok, #task{mode = dgiot_utils:to_atom(Mode), app = App, dtuid = DtuId, product = ProductId, devaddr = DevAddr,
|
||||
tid = ChannelId, firstid = DeviceId, que = Que, round = 1, appdata = AppData, ts = Nowstamp, freq = Freq, endtime = Tsendtime}}
|
||||
end;
|
||||
@ -145,7 +145,7 @@ handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = Product
|
||||
Payload = jsx:decode(dgiot_mqtt:get_payload(Msg), [return_maps]),
|
||||
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "to_dev=> ~s ~p ~ts: ~ts ", [?FILE, ?LINE, unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
|
||||
NewAck = dgiot_task:get_collection(ProductId, Dis, Payload, Ack),
|
||||
dgiot_metrics:inc(dgiot_task,<<"task_recv">>,1),
|
||||
dgiot_metrics:inc(dgiot_task, <<"task_recv">>, 1),
|
||||
{noreply, get_next_pn(State#task{ack = NewAck})};
|
||||
|
||||
%% ACK消息触发抄表指令
|
||||
@ -153,14 +153,14 @@ handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = Product
|
||||
Payload = jsx:decode(dgiot_mqtt:get_payload(Msg), [return_maps]),
|
||||
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "to_dev=> ~s ~p ~ts: ~ts", [?FILE, ?LINE, unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
|
||||
NewAck = dgiot_task:get_collection(ProductId, Dis, Payload, Ack),
|
||||
dgiot_metrics:inc(dgiot_task,<<"task_recv">>,1),
|
||||
dgiot_metrics:inc(dgiot_task, <<"task_recv">>, 1),
|
||||
{noreply, send_msg(State#task{ack = NewAck})};
|
||||
|
||||
handle_info(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
dgiot_metrics:dec(dgiot_task,<<"task">>,1),
|
||||
dgiot_metrics:dec(dgiot_task, <<"task">>, 1),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
@ -210,7 +210,7 @@ send_msg(#task{tid = Channel, product = Product, devaddr = DevAddr, ref = Ref, q
|
||||
_ -> erlang:cancel_timer(Ref)
|
||||
end,
|
||||
NewQue = lists:nthtail(NewCount, Que),
|
||||
dgiot_metrics:inc(dgiot_task,<<"task_send">>,1),
|
||||
dgiot_metrics:inc(dgiot_task, <<"task_send">>, 1),
|
||||
State#task{que = NewQue, dis = Dis, ref = erlang:send_after(Interval * 1000, self(), retry)}.
|
||||
|
||||
|
||||
@ -249,20 +249,15 @@ save_td(#task{app = _App, tid = Channel, product = ProductId, devaddr = DevAddr,
|
||||
0 ->
|
||||
pass;
|
||||
_ ->
|
||||
case lists:member(error, maps:values(Data)) of
|
||||
false ->
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
Payload = jsx:encode(#{<<"thingdata">> => Data, <<"appdata">> => AppData}),
|
||||
Topic = <<"topo/", ProductId/binary, "/", DevAddr/binary, "/post">>,
|
||||
dgiot_mqtt:publish(DeviceId, Topic, Payload),
|
||||
dgiot_tdengine_adapter:save(ProductId, DevAddr, Data),
|
||||
dgiot_metrics:inc(dgiot_task,<<"task_save">>,1),
|
||||
NotificationTopic = <<"notification/", ProductId/binary, "/", DevAddr/binary, "/post">>,
|
||||
dgiot_mqtt:publish(DevAddr, NotificationTopic, jsx:encode(Data)),
|
||||
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "from_Notification=> ~s ~p ~ts: ~ts ", [?FILE, ?LINE, unicode:characters_to_list(NotificationTopic), unicode:characters_to_list(jsx:encode(Data))]);
|
||||
true ->
|
||||
pass
|
||||
end
|
||||
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
|
||||
Payload = jsx:encode(#{<<"thingdata">> => Data, <<"appdata">> => AppData}),
|
||||
Topic = <<"topo/", ProductId/binary, "/", DevAddr/binary, "/post">>,
|
||||
dgiot_mqtt:publish(DeviceId, Topic, Payload),
|
||||
dgiot_tdengine_adapter:save(ProductId, DevAddr, Data),
|
||||
dgiot_metrics:inc(dgiot_task, <<"task_save">>, 1),
|
||||
NotificationTopic = <<"notification/", ProductId/binary, "/", DevAddr/binary, "/post">>,
|
||||
dgiot_mqtt:publish(DevAddr, NotificationTopic, jsx:encode(Data)),
|
||||
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "from_Notification=> ~s ~p ~ts: ~ts ", [?FILE, ?LINE, unicode:characters_to_list(NotificationTopic), unicode:characters_to_list(jsx:encode(Data))])
|
||||
end
|
||||
end.
|
||||
|
||||
|
@ -114,12 +114,55 @@ init(?TYPE, ChannelId, #{<<"product">> := Products, <<"BRIDGEURL">> := Bridgeurl
|
||||
%% 初始化池子
|
||||
handle_init(#state{env = #{productids := ProductIds}} = State) ->
|
||||
[dgiot_mqtt:subscribe(<<"topo/", ProductId/binary, "/#">>) || ProductId <- ProductIds],
|
||||
dgiot_parse:subscribe(<<"Product">>, post),
|
||||
{ok, State}.
|
||||
|
||||
%% 通道消息处理,注意:进程池调用
|
||||
handle_event(EventId, Event, _State) ->
|
||||
?LOG(info, "channel ~p, ~p", [EventId, Event]),
|
||||
ok.
|
||||
handle_message({sync_parse, Args}, State) ->
|
||||
%% io:format("Args ~p~n", [jsx:decode(Args, [{labels, binary}, return_maps])]),
|
||||
case jsx:decode(Args, [{labels, binary}, return_maps]) of
|
||||
#{<<"producttemplet">> := #{<<"className">> := <<"ProductTemplet">>, <<"objectId">> := ProducttempletId, <<"__type">> := <<"Pointer">>}, <<"objectId">> := ObjectId} ->
|
||||
case dgiot_parse:query_object(<<"Dict">>, #{<<"where">> => #{<<"key">> => ProducttempletId, <<"class">> => <<"ProductTemplet">>}}) of
|
||||
{ok, #{<<"results">> := Dicts}} ->
|
||||
DictRequests =
|
||||
lists:foldl(fun(Dict, Acc) ->
|
||||
NewDict = maps:without([<<"createdAt">>, <<"objectId">>, <<"updatedAt">>], Dict),
|
||||
Acc ++ [#{
|
||||
<<"method">> => <<"POST">>,
|
||||
<<"path">> => <<"/classes/Dict">>,
|
||||
<<"body">> => NewDict#{
|
||||
<<"key">> => ObjectId,
|
||||
<<"class">> => <<"Product">>}
|
||||
}]
|
||||
end, [], Dicts),
|
||||
dgiot_parse:batch(DictRequests);
|
||||
_ ->
|
||||
pass
|
||||
end,
|
||||
case dgiot_parse:query_object(<<"View">>, #{<<"where">> => #{<<"key">> => ProducttempletId, <<"class">> => <<"ProductTemplet">>}}) of
|
||||
{ok, #{<<"results">> := Views}} ->
|
||||
ViewRequests =
|
||||
lists:foldl(fun(View, Acc) ->
|
||||
NewDict = maps:without([<<"createdAt">>, <<"objectId">>, <<"updatedAt">>], View),
|
||||
Acc ++ [#{
|
||||
<<"method">> => <<"POST">>,
|
||||
<<"path">> => <<"/classes/View">>,
|
||||
<<"body">> => NewDict#{
|
||||
<<"key">> => ObjectId,
|
||||
<<"class">> => <<"Product">>}
|
||||
}]
|
||||
end, [], Views),
|
||||
dgiot_parse:batch(ViewRequests);
|
||||
_ ->
|
||||
pass
|
||||
end;
|
||||
_ ->
|
||||
pass
|
||||
end,
|
||||
{ok, State};
|
||||
|
||||
handle_message({deliver, _Topic, Msg}, #state{id = ChannelId} = State) ->
|
||||
Payload = dgiot_mqtt:get_payload(Msg),
|
||||
|
@ -12,8 +12,9 @@ processor=1
|
||||
fileserver="https://dgiot-release-1306147891.cos.ap-nanjing.myqcloud.com/v4.4.0"
|
||||
updateserver="http://dgiot-1253666439.cos.ap-shanghai-fsi.myqcloud.com/dgiot_release/update"
|
||||
#https://bcrypt-generator.com/
|
||||
yum install -y openssl openssl-devel &> /dev/null
|
||||
yum install -y libstdc++-devel openssl-devel &> /dev/null
|
||||
pg_pwd=`openssl rand -hex 8 | md5sum | cut -f1 -d ' '`
|
||||
|
||||
parse_user=dgiot
|
||||
parse_pwd=`openssl rand -hex 8 | md5sum | cut -f1 -d ' '`
|
||||
parse_appid=`openssl rand -hex 8 | md5sum | cut -f1 -d ' '`
|
||||
@ -211,6 +212,7 @@ function clean_services(){
|
||||
clean_service pushgateway
|
||||
clean_service node_exporter
|
||||
clean_service postgres_exporter
|
||||
clean_service nginx
|
||||
}
|
||||
|
||||
function clean_service() {
|
||||
@ -354,7 +356,6 @@ function yum_install_postgres() {
|
||||
${csudo} yum install -y clang libicu-devel perl-ExtUtils-Embed &> /dev/null
|
||||
${csudo} yum install -y readline readline-devel &> /dev/null
|
||||
${csudo} yum install -y zlib zlib-devel &> /dev/null
|
||||
${csudo} yum install -y openssl openssl-devel &> /dev/null
|
||||
${csudo} yum install -y pam-devel libxml2-devel libxslt-devel &> /dev/null
|
||||
${csudo} yum install -y openldap-devel systemd-devel &> /dev/null
|
||||
${csudo} yum install -y tcl-devel python-devel &> /dev/null
|
||||
@ -703,7 +704,6 @@ function yum_install_erlang_otp {
|
||||
echo -e "`date +%F_%T` $LINENO: ${GREEN} yum_install_erlang_otp${NC}"
|
||||
yum install -y make gcc gcc-c++ &> /dev/null
|
||||
yum install -y kernel-devel m4 ncurses-devel &> /dev/null
|
||||
yum install -y libstdc++-devel openssl-devel &> /dev/null
|
||||
yum install -y unixODBC unixODBC-devel &> /dev/null
|
||||
yum install -y libtool-ltdl libtool-ltdl-devel &> /dev/null
|
||||
|
||||
@ -938,6 +938,7 @@ function install_grafana() {
|
||||
}
|
||||
|
||||
function install_nginx() {
|
||||
clean_service nginx
|
||||
if systemctl is-active --quiet nginx; then
|
||||
echo -e "`date +%F_%T` $LINENO: ${GREEN} nginx is running, stopping it...${NC}"
|
||||
rpm -e nginx
|
||||
|
Loading…
Reference in New Issue
Block a user