From b8a29552e620208cd893d544422a85c17ed9f7fa Mon Sep 17 00:00:00 2001 From: jhonliu <34489690@qq.com> Date: Thu, 5 May 2022 08:51:11 +0800 Subject: [PATCH] feat: add hook token --- .../dgiot_device/src/dgiot_device_channel.erl | 14 +++--- .../src/utils/dgiot_device_static.erl | 9 +--- apps/dgiot_parse/src/dgiot_parse_hook.erl | 43 ++++++++++--------- apps/dgiot_parse/src/dgiot_parse_rest.erl | 10 ++++- apps/dgiot_task/src/dgiot_profile_channel.erl | 2 +- apps/dgiot_task/src/dgiot_task_channel.erl | 5 ++- 6 files changed, 43 insertions(+), 40 deletions(-) diff --git a/apps/dgiot_device/src/dgiot_device_channel.erl b/apps/dgiot_device/src/dgiot_device_channel.erl index 91eb6923..0d76a101 100644 --- a/apps/dgiot_device/src/dgiot_device_channel.erl +++ b/apps/dgiot_device/src/dgiot_device_channel.erl @@ -133,35 +133,35 @@ handle_message(check, #state{env = #{<<"offline">> := OffLine, <<"checktime">> : dgiot_device:sync_parse(OffLine), {ok, State}; -handle_message({sync_parse, Pid, 'after', get, Header, <<"Product">>, #{<<"results">> := _Results} = ResBody}, State) -> +handle_message({sync_parse, Pid, 'after', get, Token, <<"Product">>, #{<<"results">> := _Results} = ResBody}, State) -> %% io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid,Header]), - Key = dgiot_device_static:get_count(Header), + Key = dgiot_device_static:get_count(Token), timer:sleep(100), NewResBody = dgiot_device_static:stats(ResBody, Key), dgiot_parse_hook:publish(Pid, NewResBody), {ok, State}; -handle_message({sync_parse, Pid, 'after', get, _Header, <<"Product">>, #{<<"objectId">> := _ObjectId} = ResBody}, State) -> +handle_message({sync_parse, Pid, 'after', get, _Token, <<"Product">>, #{<<"objectId">> := _ObjectId} = ResBody}, State) -> %% io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, ObjectId]), dgiot_parse_hook:publish(Pid, ResBody), {ok, State}; -handle_message({sync_parse, Pid, 'after', put, _Header, <<"Device">>, QueryData}, State) -> +handle_message({sync_parse, Pid, 'after', put, _Token, <<"Device">>, QueryData}, State) -> io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, QueryData]), dgiot_device:put(QueryData), {ok, State}; -handle_message({sync_parse, Pid, 'after', post, _Header, <<"Product">>, QueryData}, State) -> +handle_message({sync_parse, Pid, 'after', post, _Token, <<"Product">>, QueryData}, State) -> io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, QueryData]), dgiot_product_hook:post('after', QueryData), {ok, State}; -handle_message({sync_parse, Pid, 'after', put, _Header, <<"Product">>, QueryData}, State) -> +handle_message({sync_parse, Pid, 'after', put, _Token, <<"Product">>, QueryData}, State) -> io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, QueryData]), dgiot_product_hook:put('after', QueryData), {ok, State}; -handle_message({sync_parse, Pid, 'after', delete, _Header, <<"Product">>, ObjectId}, State) -> +handle_message({sync_parse, Pid, 'after', delete, _Token, <<"Product">>, ObjectId}, State) -> io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, ObjectId]), dgiot_product_hook:delete('after', ObjectId), {ok, State}; diff --git a/apps/dgiot_device/src/utils/dgiot_device_static.erl b/apps/dgiot_device/src/utils/dgiot_device_static.erl index a771e550..f5a2bdc6 100644 --- a/apps/dgiot_device/src/utils/dgiot_device_static.erl +++ b/apps/dgiot_device/src/utils/dgiot_device_static.erl @@ -22,14 +22,7 @@ -include_lib("dgiot/include/logger.hrl"). -export([stats/2, val/2, val/3, get_count/1, get_count/3]). -get_count(Header) -> - Token = - case proplists:get_value("X-Parse-Session-Token", Header) of - undefined -> - proplists:get_value(<<"X-Parse-Session-Token">>, Header); - Token1 -> - Token1 - end, +get_count(Token) -> RoleIds = case dgiot_auth:get_session(dgiot_utils:to_binary(Token)) of #{<<"roles">> := Roles} -> diff --git a/apps/dgiot_parse/src/dgiot_parse_hook.erl b/apps/dgiot_parse/src/dgiot_parse_hook.erl index c72fa95c..aab9603c 100644 --- a/apps/dgiot_parse/src/dgiot_parse_hook.erl +++ b/apps/dgiot_parse/src/dgiot_parse_hook.erl @@ -35,7 +35,8 @@ get_trigger/0, get_trigger/2, add_all_trigger/1, - do_hook/2 + do_hook/2, + notify/5 ]). subscribe(Table, Method, Channel) -> @@ -58,21 +59,21 @@ subscribe(Table, Method, Channel, Keys) -> add_hook(Key) -> Fun = fun - ({'after', get, Header, Class, _QueryData, ResBody}) -> - send('after', get, Header, Class, dgiot_utils:to_map(ResBody)), + ({'after', get, Token, Class, _QueryData, ResBody}) -> + notify('after', get, Token, Class, dgiot_utils:to_map(ResBody)), receive_ack(ResBody); - ({'after', get, Header, Class, _ObjectId, _QueryData, ResBody}) -> - send('after', get, Header, Class, dgiot_utils:to_map(ResBody)), + ({'after', get, Token, Class, _ObjectId, _QueryData, ResBody}) -> + notify('after', get, Token, Class, dgiot_utils:to_map(ResBody)), receive_ack(ResBody); - ({'after', post, Header, Class, QueryData, ResBody}) -> - send('after', post, Header, Class, dgiot_utils:to_map(QueryData)), + ({'after', post, Token, Class, QueryData, ResBody}) -> + notify('after', post, Token, Class, dgiot_utils:to_map(QueryData)), {ok, ResBody}; - ({'after', put, Header, Class, ObjectId, QueryData, ResBody}) -> + ({'after', put, Token, Class, ObjectId, QueryData, ResBody}) -> Map = dgiot_utils:to_map(QueryData), - send('after', put, Header, Class, Map#{<<"objectId">> => ObjectId}), + notify('after', put, Token, Class, Map#{<<"objectId">> => ObjectId}), {ok, ResBody}; - ({'after', delete, Header, Class, ObjectId, _QueryData, ResBody}) -> - send('after', delete, Header, Class, ObjectId), + ({'after', delete, Token, Class, ObjectId, _QueryData, ResBody}) -> + notify('after', delete, Token, Class, ObjectId), {ok, ResBody}; (_) -> {ok, []} @@ -97,7 +98,7 @@ receive_ack(ResBody) -> {ok, ResBody} end. -send(Type, Method, Header, Class, Data) -> +notify(Type, Method, Token, Class, Data) -> case dgiot_data:get({sub, Class, Method}) of not_find -> pass; @@ -105,33 +106,33 @@ send(Type, Method, Header, Class, Data) -> lists:map( fun ({ChannelId, [<<"*">>]}) -> - dgiot_channelx:do_message(ChannelId, {sync_parse, self(), Type, Method, Header, Class, Data}); + dgiot_channelx:do_message(ChannelId, {sync_parse, self(), Type, Method, Token, Class, Data}); ({ChannelId, Keys}) when Method == put -> List = maps:keys(Data), case List -- Keys of List -> pass; _ -> - dgiot_channelx:do_message(ChannelId, {sync_parse, self(), Type, Method, Header, Class, Data}) + dgiot_channelx:do_message(ChannelId, {sync_parse, self(), Type, Method, Token, Class, Data}) end end, List) end. -do_request_hook(Type, [<<"classes">>, Class, ObjectId], Method, Header, QueryData, ResBody) -> - do_hook({<>, Method}, {Type, Method, Header, Class, ObjectId, QueryData, ResBody}); -do_request_hook(Type, [<<"classes">>, Class], Method, Header, QueryData, ResBody) -> - do_hook({Class, Method}, {Type, Method, Header, Class, QueryData, ResBody}); +do_request_hook(Type, [<<"classes">>, Class, ObjectId], Method, Token, QueryData, ResBody) -> + do_hook({<>, Method}, {Type, Method, Token, Class, ObjectId, QueryData, ResBody}); +do_request_hook(Type, [<<"classes">>, Class], Method, Token, QueryData, ResBody) -> + do_hook({Class, Method}, {Type, Method, Token, Class, QueryData, ResBody}); %% 批处理只做异步通知,不做同步hook -do_request_hook(Type, [<<"batch">>], _Method, Header, QueryData, #{<<"requests">> := Requests} = ResBody) -> +do_request_hook(Type, [<<"batch">>], _Method, Token, QueryData, #{<<"requests">> := Requests} = ResBody) -> lists:map(fun(Request) -> SubMethod = maps:get(<<"method">>, Request, <<"post">>), Path = maps:get(<<"path">>, Request, <<"">>), Body = maps:get(<<"body">>, Request, #{}), {match, PathList} = re:run(Path, <<"([^/]+)">>, [global, {capture, all_but_first, binary}]), - do_request_hook(Type, lists:concat(PathList), dgiot_parse_rest:method(SubMethod), Header, QueryData, Body) + do_request_hook(Type, lists:concat(PathList), dgiot_parse_rest:method(SubMethod), Token, QueryData, Body) end, Requests), {ok, ResBody}; -do_request_hook(_Type, _Paths, _Method, _Header, _QueryData, _ResBody) -> +do_request_hook(_Type, _Paths, _Method, _Token, _QueryData, _ResBody) -> ignore. do_hook(Key, Args) -> case catch dgiot_hook:run_hook(Key, Args) of diff --git a/apps/dgiot_parse/src/dgiot_parse_rest.erl b/apps/dgiot_parse/src/dgiot_parse_rest.erl index fe6aeac8..07b9254c 100644 --- a/apps/dgiot_parse/src/dgiot_parse_rest.erl +++ b/apps/dgiot_parse/src/dgiot_parse_rest.erl @@ -358,7 +358,15 @@ do_request_after(Method0, Path, Header, NewQueryData, ResBody, Options) -> end, {match, PathList} = re:run(Path, <<"([^/]+)">>, [global, {capture, all_but_first, binary}]), %% io:format("~s ~p ~p ~p ~n",[?FILE, ?LINE, Path, NewQueryData]), - dgiot_parse_hook:do_request_hook('after', lists:concat(PathList), Method, Header, NewQueryData, ResBody). + dgiot_parse_hook:do_request_hook('after', lists:concat(PathList), Method, get_token(Header), NewQueryData, ResBody). + +get_token(Header) -> + case proplists:get_value("X-Parse-Session-Token", Header) of + undefined -> + proplists:get_value(<<"X-Parse-Session-Token">>, Header); + Token1 -> + Token1 + end. list_join([], Sep) when is_list(Sep) -> []; list_join([H | T], Sep) -> diff --git a/apps/dgiot_task/src/dgiot_profile_channel.erl b/apps/dgiot_task/src/dgiot_profile_channel.erl index a3baa7b5..e65e5f18 100644 --- a/apps/dgiot_task/src/dgiot_profile_channel.erl +++ b/apps/dgiot_task/src/dgiot_profile_channel.erl @@ -127,7 +127,7 @@ handle_message({check_profile, _Args}, State) -> %%<<\"248e9007bf\">> %% } %%#state{env = #{<<"args">> := #{<<"mode">> := <<"incremental">>}}} = -handle_message({sync_parse, _Pid, 'after', put, _Header, <<"Device">>, QueryData}, State) -> +handle_message({sync_parse, _Pid, 'after', put, _Token, <<"Device">>, QueryData}, State) -> dgiot_task_hook:put('after',QueryData), {ok, State}; diff --git a/apps/dgiot_task/src/dgiot_task_channel.erl b/apps/dgiot_task/src/dgiot_task_channel.erl index fee1c981..2c05a360 100644 --- a/apps/dgiot_task/src/dgiot_task_channel.erl +++ b/apps/dgiot_task/src/dgiot_task_channel.erl @@ -218,9 +218,10 @@ handle_event(_EventId, Event, State) -> ?LOG(info, "channel ~p", [Event]), {ok, State}. -handle_message({sync_parse, delete, _Table, _BeforeData, AfterData, DeviceId}, State) -> +handle_message({sync_parse, _Pid, 'after', delete, _Token, <<"Device">>, _DeviceId}, State) -> +%%handle_message({sync_parse, delete, _Table, _BeforeData, AfterData, DeviceId}, State) -> %% io:format("DeviceArgs ~p~n", [jsx:decode(Args, [{labels, binary}, return_maps])]), - dgiot_task_hook:delete('after', AfterData, DeviceId), +%% dgiot_task_hook:delete('after', AfterData, DeviceId), {ok, State}; handle_message(_Message, State) ->