mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-11-30 11:17:48 +08:00
feat: add hook token
This commit is contained in:
parent
c9f460a244
commit
b8a29552e6
@ -133,35 +133,35 @@ handle_message(check, #state{env = #{<<"offline">> := OffLine, <<"checktime">> :
|
||||
dgiot_device:sync_parse(OffLine),
|
||||
{ok, State};
|
||||
|
||||
handle_message({sync_parse, Pid, 'after', get, Header, <<"Product">>, #{<<"results">> := _Results} = ResBody}, State) ->
|
||||
handle_message({sync_parse, Pid, 'after', get, Token, <<"Product">>, #{<<"results">> := _Results} = ResBody}, State) ->
|
||||
%% io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid,Header]),
|
||||
Key = dgiot_device_static:get_count(Header),
|
||||
Key = dgiot_device_static:get_count(Token),
|
||||
timer:sleep(100),
|
||||
NewResBody = dgiot_device_static:stats(ResBody, Key),
|
||||
dgiot_parse_hook:publish(Pid, NewResBody),
|
||||
{ok, State};
|
||||
|
||||
handle_message({sync_parse, Pid, 'after', get, _Header, <<"Product">>, #{<<"objectId">> := _ObjectId} = ResBody}, State) ->
|
||||
handle_message({sync_parse, Pid, 'after', get, _Token, <<"Product">>, #{<<"objectId">> := _ObjectId} = ResBody}, State) ->
|
||||
%% io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, ObjectId]),
|
||||
dgiot_parse_hook:publish(Pid, ResBody),
|
||||
{ok, State};
|
||||
|
||||
handle_message({sync_parse, Pid, 'after', put, _Header, <<"Device">>, QueryData}, State) ->
|
||||
handle_message({sync_parse, Pid, 'after', put, _Token, <<"Device">>, QueryData}, State) ->
|
||||
io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, QueryData]),
|
||||
dgiot_device:put(QueryData),
|
||||
{ok, State};
|
||||
|
||||
handle_message({sync_parse, Pid, 'after', post, _Header, <<"Product">>, QueryData}, State) ->
|
||||
handle_message({sync_parse, Pid, 'after', post, _Token, <<"Product">>, QueryData}, State) ->
|
||||
io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, QueryData]),
|
||||
dgiot_product_hook:post('after', QueryData),
|
||||
{ok, State};
|
||||
|
||||
handle_message({sync_parse, Pid, 'after', put, _Header, <<"Product">>, QueryData}, State) ->
|
||||
handle_message({sync_parse, Pid, 'after', put, _Token, <<"Product">>, QueryData}, State) ->
|
||||
io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, QueryData]),
|
||||
dgiot_product_hook:put('after', QueryData),
|
||||
{ok, State};
|
||||
|
||||
handle_message({sync_parse, Pid, 'after', delete, _Header, <<"Product">>, ObjectId}, State) ->
|
||||
handle_message({sync_parse, Pid, 'after', delete, _Token, <<"Product">>, ObjectId}, State) ->
|
||||
io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, ObjectId]),
|
||||
dgiot_product_hook:delete('after', ObjectId),
|
||||
{ok, State};
|
||||
|
@ -22,14 +22,7 @@
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
-export([stats/2, val/2, val/3, get_count/1, get_count/3]).
|
||||
|
||||
get_count(Header) ->
|
||||
Token =
|
||||
case proplists:get_value("X-Parse-Session-Token", Header) of
|
||||
undefined ->
|
||||
proplists:get_value(<<"X-Parse-Session-Token">>, Header);
|
||||
Token1 ->
|
||||
Token1
|
||||
end,
|
||||
get_count(Token) ->
|
||||
RoleIds =
|
||||
case dgiot_auth:get_session(dgiot_utils:to_binary(Token)) of
|
||||
#{<<"roles">> := Roles} ->
|
||||
|
@ -35,7 +35,8 @@
|
||||
get_trigger/0,
|
||||
get_trigger/2,
|
||||
add_all_trigger/1,
|
||||
do_hook/2
|
||||
do_hook/2,
|
||||
notify/5
|
||||
]).
|
||||
|
||||
subscribe(Table, Method, Channel) ->
|
||||
@ -58,21 +59,21 @@ subscribe(Table, Method, Channel, Keys) ->
|
||||
add_hook(Key) ->
|
||||
Fun =
|
||||
fun
|
||||
({'after', get, Header, Class, _QueryData, ResBody}) ->
|
||||
send('after', get, Header, Class, dgiot_utils:to_map(ResBody)),
|
||||
({'after', get, Token, Class, _QueryData, ResBody}) ->
|
||||
notify('after', get, Token, Class, dgiot_utils:to_map(ResBody)),
|
||||
receive_ack(ResBody);
|
||||
({'after', get, Header, Class, _ObjectId, _QueryData, ResBody}) ->
|
||||
send('after', get, Header, Class, dgiot_utils:to_map(ResBody)),
|
||||
({'after', get, Token, Class, _ObjectId, _QueryData, ResBody}) ->
|
||||
notify('after', get, Token, Class, dgiot_utils:to_map(ResBody)),
|
||||
receive_ack(ResBody);
|
||||
({'after', post, Header, Class, QueryData, ResBody}) ->
|
||||
send('after', post, Header, Class, dgiot_utils:to_map(QueryData)),
|
||||
({'after', post, Token, Class, QueryData, ResBody}) ->
|
||||
notify('after', post, Token, Class, dgiot_utils:to_map(QueryData)),
|
||||
{ok, ResBody};
|
||||
({'after', put, Header, Class, ObjectId, QueryData, ResBody}) ->
|
||||
({'after', put, Token, Class, ObjectId, QueryData, ResBody}) ->
|
||||
Map = dgiot_utils:to_map(QueryData),
|
||||
send('after', put, Header, Class, Map#{<<"objectId">> => ObjectId}),
|
||||
notify('after', put, Token, Class, Map#{<<"objectId">> => ObjectId}),
|
||||
{ok, ResBody};
|
||||
({'after', delete, Header, Class, ObjectId, _QueryData, ResBody}) ->
|
||||
send('after', delete, Header, Class, ObjectId),
|
||||
({'after', delete, Token, Class, ObjectId, _QueryData, ResBody}) ->
|
||||
notify('after', delete, Token, Class, ObjectId),
|
||||
{ok, ResBody};
|
||||
(_) ->
|
||||
{ok, []}
|
||||
@ -97,7 +98,7 @@ receive_ack(ResBody) ->
|
||||
{ok, ResBody}
|
||||
end.
|
||||
|
||||
send(Type, Method, Header, Class, Data) ->
|
||||
notify(Type, Method, Token, Class, Data) ->
|
||||
case dgiot_data:get({sub, Class, Method}) of
|
||||
not_find ->
|
||||
pass;
|
||||
@ -105,33 +106,33 @@ send(Type, Method, Header, Class, Data) ->
|
||||
lists:map(
|
||||
fun
|
||||
({ChannelId, [<<"*">>]}) ->
|
||||
dgiot_channelx:do_message(ChannelId, {sync_parse, self(), Type, Method, Header, Class, Data});
|
||||
dgiot_channelx:do_message(ChannelId, {sync_parse, self(), Type, Method, Token, Class, Data});
|
||||
({ChannelId, Keys}) when Method == put ->
|
||||
List = maps:keys(Data),
|
||||
case List -- Keys of
|
||||
List ->
|
||||
pass;
|
||||
_ ->
|
||||
dgiot_channelx:do_message(ChannelId, {sync_parse, self(), Type, Method, Header, Class, Data})
|
||||
dgiot_channelx:do_message(ChannelId, {sync_parse, self(), Type, Method, Token, Class, Data})
|
||||
end
|
||||
end, List)
|
||||
end.
|
||||
|
||||
do_request_hook(Type, [<<"classes">>, Class, ObjectId], Method, Header, QueryData, ResBody) ->
|
||||
do_hook({<<Class/binary, "/*">>, Method}, {Type, Method, Header, Class, ObjectId, QueryData, ResBody});
|
||||
do_request_hook(Type, [<<"classes">>, Class], Method, Header, QueryData, ResBody) ->
|
||||
do_hook({Class, Method}, {Type, Method, Header, Class, QueryData, ResBody});
|
||||
do_request_hook(Type, [<<"classes">>, Class, ObjectId], Method, Token, QueryData, ResBody) ->
|
||||
do_hook({<<Class/binary, "/*">>, Method}, {Type, Method, Token, Class, ObjectId, QueryData, ResBody});
|
||||
do_request_hook(Type, [<<"classes">>, Class], Method, Token, QueryData, ResBody) ->
|
||||
do_hook({Class, Method}, {Type, Method, Token, Class, QueryData, ResBody});
|
||||
%% 批处理只做异步通知,不做同步hook
|
||||
do_request_hook(Type, [<<"batch">>], _Method, Header, QueryData, #{<<"requests">> := Requests} = ResBody) ->
|
||||
do_request_hook(Type, [<<"batch">>], _Method, Token, QueryData, #{<<"requests">> := Requests} = ResBody) ->
|
||||
lists:map(fun(Request) ->
|
||||
SubMethod = maps:get(<<"method">>, Request, <<"post">>),
|
||||
Path = maps:get(<<"path">>, Request, <<"">>),
|
||||
Body = maps:get(<<"body">>, Request, #{}),
|
||||
{match, PathList} = re:run(Path, <<"([^/]+)">>, [global, {capture, all_but_first, binary}]),
|
||||
do_request_hook(Type, lists:concat(PathList), dgiot_parse_rest:method(SubMethod), Header, QueryData, Body)
|
||||
do_request_hook(Type, lists:concat(PathList), dgiot_parse_rest:method(SubMethod), Token, QueryData, Body)
|
||||
end, Requests),
|
||||
{ok, ResBody};
|
||||
do_request_hook(_Type, _Paths, _Method, _Header, _QueryData, _ResBody) ->
|
||||
do_request_hook(_Type, _Paths, _Method, _Token, _QueryData, _ResBody) ->
|
||||
ignore.
|
||||
do_hook(Key, Args) ->
|
||||
case catch dgiot_hook:run_hook(Key, Args) of
|
||||
|
@ -358,7 +358,15 @@ do_request_after(Method0, Path, Header, NewQueryData, ResBody, Options) ->
|
||||
end,
|
||||
{match, PathList} = re:run(Path, <<"([^/]+)">>, [global, {capture, all_but_first, binary}]),
|
||||
%% io:format("~s ~p ~p ~p ~n",[?FILE, ?LINE, Path, NewQueryData]),
|
||||
dgiot_parse_hook:do_request_hook('after', lists:concat(PathList), Method, Header, NewQueryData, ResBody).
|
||||
dgiot_parse_hook:do_request_hook('after', lists:concat(PathList), Method, get_token(Header), NewQueryData, ResBody).
|
||||
|
||||
get_token(Header) ->
|
||||
case proplists:get_value("X-Parse-Session-Token", Header) of
|
||||
undefined ->
|
||||
proplists:get_value(<<"X-Parse-Session-Token">>, Header);
|
||||
Token1 ->
|
||||
Token1
|
||||
end.
|
||||
|
||||
list_join([], Sep) when is_list(Sep) -> [];
|
||||
list_join([H | T], Sep) ->
|
||||
|
@ -127,7 +127,7 @@ handle_message({check_profile, _Args}, State) ->
|
||||
%%<<\"248e9007bf\">>
|
||||
%% }
|
||||
%%#state{env = #{<<"args">> := #{<<"mode">> := <<"incremental">>}}} =
|
||||
handle_message({sync_parse, _Pid, 'after', put, _Header, <<"Device">>, QueryData}, State) ->
|
||||
handle_message({sync_parse, _Pid, 'after', put, _Token, <<"Device">>, QueryData}, State) ->
|
||||
dgiot_task_hook:put('after',QueryData),
|
||||
{ok, State};
|
||||
|
||||
|
@ -218,9 +218,10 @@ handle_event(_EventId, Event, State) ->
|
||||
?LOG(info, "channel ~p", [Event]),
|
||||
{ok, State}.
|
||||
|
||||
handle_message({sync_parse, delete, _Table, _BeforeData, AfterData, DeviceId}, State) ->
|
||||
handle_message({sync_parse, _Pid, 'after', delete, _Token, <<"Device">>, _DeviceId}, State) ->
|
||||
%%handle_message({sync_parse, delete, _Table, _BeforeData, AfterData, DeviceId}, State) ->
|
||||
%% io:format("DeviceArgs ~p~n", [jsx:decode(Args, [{labels, binary}, return_maps])]),
|
||||
dgiot_task_hook:delete('after', AfterData, DeviceId),
|
||||
%% dgiot_task_hook:delete('after', AfterData, DeviceId),
|
||||
{ok, State};
|
||||
|
||||
handle_message(_Message, State) ->
|
||||
|
Loading…
Reference in New Issue
Block a user