feat: 后端自动订阅设备实时在线消息

This commit is contained in:
guo 2022-06-22 18:19:05 +08:00
parent 34ee0378bb
commit 8369c20700
6 changed files with 82 additions and 29 deletions

View File

@ -26,6 +26,9 @@
-define(SUBOPTION, emqx_suboption).
-define(SUBSCRIBER, emqx_subscriber).
-define(SUBSCRIPTION, emqx_subscription).
-dgiot_data("ets").
-export([init_ets/0]).
-define(DGIOT_ROUTE_KEY, dgiot_route_key).
-export([
has_routes/1
@ -44,8 +47,37 @@
, republish/2
, get_message/2
, subopts/0
,subscribe_route_key/3
]).
init_ets() ->
dgiot_data:init(?DGIOT_ROUTE_KEY).
%%
subscribe_route_key(DeviceList,SessionToken,Route) ->
TopicKey = Route,
case dgiot_data:get(?DGIOT_ROUTE_KEY,{SessionToken, TopicKey}) of
not_find ->
pass;
OldTopic ->
lists:foldl(
fun(X, _Acc) ->
Topic = <<"$dg/user/devicestate/",X/binary,"/report">>,
dgiot_mqtt:unsubscribe(SessionToken, Topic),
[]
end, [], OldTopic
)
end,
lists:foldl(
fun(X, _Acc) ->
Topic = <<"$dg/user/devicestate/",X/binary,"/report">>,
dgiot_mqtt:subscribe(SessionToken, Topic),
[]
end, [], DeviceList
),
dgiot_data:insert(?DGIOT_ROUTE_KEY,{SessionToken, TopicKey}, DeviceList).
has_routes(Topic) ->
emqx_router:has_routes(Topic).

View File

@ -135,23 +135,26 @@ handle_message(check, #state{id = ChannelId, env = #{<<"offline">> := OffLine, <
handle_message({sync_parse, Pid, 'after', get, _Token, <<"Device">>, #{<<"results">> := Results} = ResBody}, State) ->
%% io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid,Results]),
NewResults = lists:foldl(fun(#{<<"objectId">> := DeviceId} = Device, Acc) ->
case dgiot_device:lookup(DeviceId) of
{ok, #{<<"status">> := Status, <<"isEnable">> := IsEnable, <<"longitude">> := Longitude, <<"latitude">> := Latitude, <<"time">> := Time}} ->
NewStatus =
case Status of
true ->
<<"ONLINE">>;
_ ->
<<"OFFLINE">>
end,
Location = #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => Longitude, <<"latitude">> => Latitude},
Acc ++ [Device#{<<"location">> => Location, <<"status">> => NewStatus, <<"isEnable">> => IsEnable, <<"lastOnlineTime">> => Time}];
_ ->
Acc ++ [Device]
end
end, [], Results),
{NewResults, DeviceList} = lists:foldl(
fun(#{<<"objectId">> := DeviceId} = Device, Acc) ->
{NewResult,Dev} = Acc,
case dgiot_device:lookup(DeviceId) of
{ok, #{<<"status">> := Status, <<"isEnable">> := IsEnable, <<"longitude">> := Longitude, <<"latitude">> := Latitude, <<"time">> := Time}} ->
NewStatus =
case Status of
true ->
<<"ONLINE">>;
_ ->
<<"OFFLINE">>
end,
Location = #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => Longitude, <<"latitude">> => Latitude},
{NewResult ++ [Device#{<<"location">> => Location, <<"status">> => NewStatus, <<"isEnable">> => IsEnable, <<"lastOnlineTime">> => Time}],Dev ++ [DeviceId]};
_ ->
NewResult ++ [Device]
end
end, {[], []}, Results),
SessionToken = dgiot_parse_auth:get_usersession(dgiot_utils:to_binary(_Token)),
dgiot_mqtt:subscribe_route_key(DeviceList,SessionToken,devicestate),
dgiot_parse_hook:publish(Pid, ResBody#{<<"results">> => NewResults}),
{ok, State};

View File

@ -39,15 +39,6 @@ put('before', #{<<"id">> := DeviceId, <<"profile">> := UserProfile} = Device) ->
_ ->
<<"$dg/device/", ProductId/binary, "/", Devaddr/binary, "/profile">>
end,
PowerOnCtrl = maps:get(<<"PowerOnCtrl">>, UserProfile, <<>>),
case dgiot_utils:trim_string(dgiot_utils:to_list(PowerOnCtrl)) of
"1" ->
dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"isEnable">> => true});
"0" ->
dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"isEnable">> => false});
_ ->
pass
end,
dgiot_mqtt:publish(DeviceId, ProfileTopic, jsx:encode(UserProfile));
_ ->
pass

View File

@ -139,8 +139,6 @@ put(Device) ->
{ok, <<"OFFLINE">>} -> false;
_ -> true
end,
Topic = <<"$dg/user/devicestate/", DeviceId/binary, "/", "report">>,
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(#{DeviceId => #{<<"isEnable">> => NewIsEnable}})),
NewAcl =
case maps:find(<<"ACL">>, Device) of
error ->
@ -154,6 +152,8 @@ put(Device) ->
end.
insert_mnesia(DeviceId, Acl, Status, Now, IsEnable, ProductId, Devaddr, DeviceSecret, Node, Longitude, Latitude) ->
Topic = <<"$dg/user/devicestate/",DeviceId/binary,"/report">>,
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(#{DeviceId => #{<<"state">> =>Status, <<"isEnable">> => IsEnable}})),
dgiot_mnesia:insert(DeviceId, ['Device', Acl, Status, Now, IsEnable, dgiot_utils:to_atom(ProductId), Devaddr, DeviceSecret, Node, Longitude, Latitude]).
%% profile配置

View File

@ -19,6 +19,10 @@
-include("dgiot_parse.hrl").
-include_lib("dgiot/include/logger.hrl").
-dgiot_data("ets").
-export([init_ets/0]).
-define(DGIOT_USERSESSION, dgiot_usersession).
%% API
-export([
login/2,
@ -44,6 +48,22 @@
-export([create_user/2, delete_user/2, put_user/2, disableusere/3, check_roles/1]).
-export([login_by_account/2, login_by_token/2, login_by_mail_phone/1, do_login/1]).
-export([create_user_for_app/1, get_token/1, set_cookies/3, add_acl/5]).
-export([get_usersession/1,put_usersession/1]).
init_ets() ->
dgiot_data:init(?DGIOT_USERSESSION).
get_usersession(Depart_token)->
dgiot_data:get(?DGIOT_USERSESSION,{Depart_token}).
put_usersession(SessionMap)->
[Depart_token] = maps:keys(SessionMap),
User_session = maps:get(Depart_token,SessionMap),
dgiot_data:insert(?DGIOT_USERSESSION,{Depart_token},User_session).
%%
login(UserName, Password) ->
login(?DEFAULT, UserName, Password).

View File

@ -83,7 +83,14 @@ do_request(get_token, #{<<"name">> := Name} = _Body, #{<<"sessionToken">> := Ses
<<"order">> => <<"updatedAt">>, <<"limit">> => 1,
<<"where">> => #{<<"name">> => Name}}, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"results">> := Results}} when length(Results) > 0 ->
dgiot_parse_auth:check_roles(Name);
Result = dgiot_parse_auth:check_roles(Name),
case Result of
{200, #{<<"access_token">> := Depart_token}} ->
dgiot_parse_auth:put_usersession(#{<<Depart_token/binary>> => SessionToken}),
Result;
_ ->
Result
end;
{ok, #{<<"results">> := Roles}} when length(Roles) == 0 ->
{404, #{<<"code">> => 101, <<"error">> => <<"User not found.">>}};
{error, Error} ->