Merge branch 'master' of github.com:dgiot/dgiot

This commit is contained in:
guo 2022-06-21 10:52:59 +08:00
commit 5deaab53fd
11 changed files with 937 additions and 16 deletions

View File

@ -19,7 +19,7 @@ DG-IoT是国内首款轻量级开源工业物联网平台我们致力于提
## 快速体验与微信群
| 微信技术支持群 | [QQ技术支持群](https://jq.qq.com/?_wv=1027&k=LipWZvDe) | 小程序体验 |电脑端https://prod.iotn2n.com|
|:---:|:---:|:---:|:---:|
|![image](https://user-images.githubusercontent.com/51999461/172523982-c3179a1d-aa1a-46b0-b614-1e347cbab33f.png)|<img src="http://dgiot-1253666439.cos.ap-shanghai-fsi.myqcloud.com/shuwa_tech/zh/QQ%E6%8A%80%E6%9C%AF%E7%BE%A4%E4%BA%8C%E7%BB%B4%E7%A0%81.png" width = "60%" /> |<img src="http://dgiot-1253666439.cos.ap-shanghai-fsi.myqcloud.com/dgiot_release/dgiot_wechat.jpg" width = "60%" />|运维账号dgiot_admin </br> 密码: dgiot_admin </br> </br>开发账号: dgiot_dev </br> 密码: dgiot_dev |
|![image](https://user-images.githubusercontent.com/51999461/174512542-cd2ef696-17aa-4de0-994a-c5b690b5e60f.png)|<img src="http://dgiot-1253666439.cos.ap-shanghai-fsi.myqcloud.com/shuwa_tech/zh/QQ%E6%8A%80%E6%9C%AF%E7%BE%A4%E4%BA%8C%E7%BB%B4%E7%A0%81.png" width = "60%" /> |<img src="http://dgiot-1253666439.cos.ap-shanghai-fsi.myqcloud.com/dgiot_release/dgiot_wechat.jpg" width = "60%" />|运维账号dgiot_admin </br> 密码: dgiot_admin </br> </br>开发账号: dgiot_dev </br> 密码: dgiot_dev |
## 核心特色

View File

@ -74,9 +74,11 @@ init([]) ->
KernelSup = child_spec(dgiot_kernel_sup, supervisor, []),
MnesiaSup = child_spec(dgiot_mnesia_sup, supervisor, []),
CMSup = child_spec(dgiot_cm_sup, supervisor, []),
RuleSup = child_spec(dgiot_rule_engine_sup,supervisor, []),
Childs = [KernelSup]
++ [MnesiaSup]
++ [CMSup]
++ [RuleSup]
++ [child_spec(dgiot_dcache, worker, [?DCACHE]),
dgiot_channelx:spec(channelx_mgr)],

View File

@ -0,0 +1,164 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_rule_engine).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("dgiot/include/logger.hrl").
-export([create_rule/1
, update_rule/1
, delete_rule/1
, refresh_rules/0
, test/0
]).
-type(rule() :: #rule{}).
-export_type([rule/0
]).
-define(T_RETRY, 60000).
%% APIs for rules and resources
%%------------------------------------------------------------------------------
-dialyzer([{nowarn_function, [create_rule/1, rule_id/0]}]).
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
create_rule(Params = #{rawsql := Sql}) ->
case dgiot_rule_sqlparser:parse_select(Sql) of
{ok, Select} ->
RuleId = maps:get(id, Params, rule_id()),
Enabled = maps:get(enabled, Params, true),
Rule = #rule{
id = RuleId,
rawsql = Sql,
for = dgiot_rule_sqlparser:select_from(Select),
is_foreach = dgiot_rule_sqlparser:select_is_foreach(Select),
fields = dgiot_rule_sqlparser:select_fields(Select),
doeach = dgiot_rule_sqlparser:select_doeach(Select),
incase = dgiot_rule_sqlparser:select_incase(Select),
conditions = dgiot_rule_sqlparser:select_where(Select),
enabled = Enabled,
created_at = erlang:system_time(millisecond),
description = maps:get(description, Params, ""),
state = normal
},
ok = dgiot_rule_registry:add_rule(Rule),
{ok, Rule};
Reason -> {error, Reason}
end.
-spec(update_rule(#{id := binary(), _ => _}) -> {ok, rule()} | {error, {not_found, rule_id()}}).
update_rule(Params = #{id := RuleId}) ->
case dgiot_rule_registry:get_rule(RuleId) of
{ok, Rule0} ->
try may_update_rule_params(Rule0, Params) of
Rule ->
ok = dgiot_rule_registry:add_rule(Rule),
{ok, Rule}
catch
throw:Reason ->
{error, Reason}
end;
not_found ->
{error, {not_found, RuleId}}
end.
-spec(delete_rule(RuleId :: rule_id()) -> ok).
delete_rule(RuleId) ->
case dgiot_rule_registry:get_rule(RuleId) of
{ok, Rule} ->
try
_ = ?CLUSTER_CALL(clear_rule, [Rule]),
ok = dgiot_rule_registry:remove_rule(Rule)
catch
Error:Reason:ST ->
?LOG(error, "clear_rule ~p failed: ~p", [RuleId, {Error, Reason, ST}])
end;
not_found ->
ok
end.
-spec(refresh_rules() -> ok).
refresh_rules() ->
lists:foreach(fun
(#rule{enabled = true} = Rule) ->
try refresh_rule(Rule)
catch _:_ ->
dgiot_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup})
end;
(_) -> ok
end, dgiot_rule_registry:get_rules()).
refresh_rule(#rule{id = RuleId, for = Topics}) ->
ok = emqx_rule_metrics:create_rule_metrics(RuleId),
lists:foreach(fun emqx_rule_events:load/1, Topics).
-dialyzer([{nowarn_function, may_update_rule_params/2}]).
may_update_rule_params(Rule, Params = #{rawsql := SQL}) ->
case emqx_rule_sqlparser:parse_select(SQL) of
{ok, Select} ->
may_update_rule_params(
Rule#rule{
rawsql = SQL,
for = dgiot_rule_sqlparser:select_from(Select),
is_foreach = dgiot_rule_sqlparser:select_is_foreach(Select),
fields = dgiot_rule_sqlparser:select_fields(Select),
doeach = dgiot_rule_sqlparser:select_doeach(Select),
incase = dgiot_rule_sqlparser:select_incase(Select),
conditions = dgiot_rule_sqlparser:select_where(Select)
},
maps:remove(rawsql, Params));
Reason -> throw(Reason)
end;
may_update_rule_params(Rule = #rule{enabled = OldEnb, actions = Actions, state = OldState},
Params = #{enabled := NewEnb}) ->
State = case {OldEnb, NewEnb} of
{false, true} ->
_ = ?CLUSTER_CALL(refresh_rule, [Rule]),
force_changed;
{true, false} ->
_ = ?CLUSTER_CALL(clear_actions, [Actions]),
force_changed;
_NoChange -> OldState
end,
may_update_rule_params(Rule#rule{enabled = NewEnb, state = State}, maps:remove(enabled, Params));
may_update_rule_params(Rule, Params = #{description := Descr}) ->
may_update_rule_params(Rule#rule{description = Descr}, maps:remove(description, Params));
may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params
Rule.
rule_id() ->
gen_id("rule:", fun dgiot_rule_registry:get_rule/1).
gen_id(Prefix, TestFun) ->
Id = iolist_to_binary([Prefix, emqx_rule_id:gen()]),
case TestFun(Id) of
not_found -> Id;
_Res -> gen_id(Prefix, TestFun)
end.
test() ->
Sql = <<"SELECT "
" case "
" when a = 1 then a "
" when a = 2 then a "
" else a-1 "
" end "
"FROM abc">>,
dgiot_rule_sqlparser:parse_select(Sql),
create_rule(#{rawsql => Sql}).

View File

@ -0,0 +1,48 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_rule_engine_sup).
-behaviour(supervisor).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-export([start_link/0]).
-export([start_locker/0]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
Registry = #{id => dgiot_rule_registry,
start => {dgiot_rule_registry, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [dgiot_rule_registry]},
{ok, {{one_for_one, 10, 10}, [Registry]}}.
start_locker() ->
Locker = #{id => dgiot_rule_locker,
start => {dgiot_rule_locker, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [dgiot_rule_locker]},
supervisor:start_child(?MODULE, Locker).

View File

@ -0,0 +1,34 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_rule_locker).
-export([start_link/0]).
-export([ lock/1
, unlock/1
]).
start_link() ->
ekka_locker:start_link(?MODULE).
-spec(lock(binary()) -> ekka_locker:lock_result()).
lock(Id) ->
ekka_locker:acquire(?MODULE, Id, local).
-spec(unlock(binary()) -> {boolean(), [node()]}).
unlock(Id) ->
ekka_locker:release(?MODULE, Id, local).

View File

@ -0,0 +1,226 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_rule_registry).
-behaviour(gen_server).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("dgiot/include/logger.hrl").
-include_lib("stdlib/include/qlc.hrl").
-define(DGIOT_RULE_TAB, dgiot_rule).
-define(DG_KV_TAB, '@dg_rule_engine_db').
-export([start_link/0]).
%% Rule Management
-export([ get_rules/0
, get_rules_for/1
, get_rules_with_same_event/1
, get_rules_ordered_by_ts/0
, get_rule/1
, add_rule/1
, add_rules/1
, remove_rule/1
, remove_rules/1
]).
-export([ load_hooks_for_rule/1
, unload_hooks_for_rule/1
]).
%% for debug purposes
-export([dump/0]).
%% gen_server Callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-define(REGISTRY, ?MODULE).
-define(T_CALL, 10000).
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
%%------------------------------------------------------------------------------
%% @doc Create or replicate tables.
-spec(mnesia(boot | copy) -> ok).
mnesia(boot) ->
%% Optimize storage
StoreProps = [{ets, [{read_concurrency, true}]}],
%% Rule table
ok = ekka_mnesia:create_table(?DGIOT_RULE_TAB, [
{ram_copies, [node()]},
{record_name, rule},
{index, [#rule.for]},
{attributes, record_info(fields, rule)},
{storage_properties, StoreProps}]);
mnesia(copy) ->
%% Copy rule table
ok = ekka_mnesia:copy_table(?DGIOT_RULE_TAB, ram_copies).
dump() ->
io:format("Rules: ~p~n",
[ets:tab2list(?DGIOT_RULE_TAB)]).
%%------------------------------------------------------------------------------
%% Start the registry
%%------------------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
%%------------------------------------------------------------------------------
%% Rule Management
%%------------------------------------------------------------------------------
-spec(get_rules() -> list(emqx_rule_engine:rule())).
get_rules() ->
get_all_records(?DGIOT_RULE_TAB).
get_rules_ordered_by_ts() ->
F = fun() ->
Query = qlc:q([E || E <- mnesia:table(?DGIOT_RULE_TAB)]),
qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}]))
end,
{atomic, List} = mnesia:transaction(F),
List.
-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
get_rules_for(Topic) ->
[Rule || Rule = #rule{for = For} <- get_rules(),
emqx_rule_utils:can_topic_match_oneof(Topic, For)].
-spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())).
get_rules_with_same_event(Topic) ->
EventName = emqx_rule_events:event_name(Topic),
[Rule || Rule = #rule{for = For} <- get_rules(),
lists:any(fun(T) -> is_of_event_name(EventName, T) end, For)].
is_of_event_name(EventName, Topic) ->
EventName =:= emqx_rule_events:event_name(Topic).
-spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found).
get_rule(Id) ->
case mnesia:dirty_read(?RULE_TAB, Id) of
[Rule] -> {ok, Rule};
[] -> not_found
end.
-spec(add_rule(emqx_rule_engine:rule()) -> ok).
add_rule(Rule) when is_record(Rule, rule) ->
add_rules([Rule]).
-spec(add_rules(list(emqx_rule_engine:rule())) -> ok).
add_rules(Rules) ->
gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL).
-spec(remove_rule(emqx_rule_engine:rule() | rule_id()) -> ok).
remove_rule(RuleOrId) ->
remove_rules([RuleOrId]).
-spec(remove_rules(list(emqx_rule_engine:rule()) | list(rule_id())) -> ok).
remove_rules(Rules) ->
gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL).
%% @private
insert_rule(Rule) ->
_ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]),
mnesia:write(?DGIOT_RULE_TAB, Rule, write).
%% @private
delete_rule(RuleId) when is_binary(RuleId) ->
case get_rule(RuleId) of
{ok, Rule} -> delete_rule(Rule);
not_found -> ok
end;
delete_rule(Rule) ->
_ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]),
mnesia:delete_object(?DGIOT_RULE_TAB, Rule, write).
load_hooks_for_rule(#rule{for = Topics}) ->
lists:foreach(fun emqx_rule_events:load/1, Topics).
unload_hooks_for_rule(#rule{id = Id, for = Topics}) ->
lists:foreach(fun(Topic) ->
case get_rules_with_same_event(Topic) of
[#rule{id = Id}] -> %% we are now deleting the last rule
emqx_rule_events:unload(Topic);
_ -> ok
end
end, Topics).
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([]) ->
_TableId = ets:new(?DG_KV_TAB, [named_table, set, public, {write_concurrency, true},
{read_concurrency, true}]),
{ok, #{}}.
handle_call({add_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
{reply, ok, State};
handle_call({remove_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
{reply, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?LOG(error, "[RuleRegistry]: unexpected info ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%% Private functions
%%------------------------------------------------------------------------------
get_all_records(Tab) ->
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
ets:tab2list(Tab).
trans(Fun, Args) ->
case mnesia:transaction(Fun, Args) of
{atomic, Result} -> Result;
{aborted, Reason} -> error(Reason)
end.

View File

@ -0,0 +1,346 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_rule_runtime).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("dgiot/include/logger.hrl").
-export([ apply_rule/2
, apply_rules/2
, clear_rule_payload/0
]).
-import(emqx_rule_maps,
[ nested_get/2
, range_gen/2
, range_get/3
]).
-compile({no_auto_import,[alias/1]}).
-type(input() :: map()).
-type(alias() :: atom()).
-type(collection() :: {alias(), [term()]}).
-define(ephemeral_alias(TYPE, NAME),
iolist_to_binary(io_lib:format("_v_~s_~p_~p", [TYPE, NAME, erlang:system_time()]))).
-define(ActionMaxRetry, 3).
%%------------------------------------------------------------------------------
%% Apply rules
%%------------------------------------------------------------------------------
-spec(apply_rules(list(emqx_rule_engine:rule()), input()) -> ok).
apply_rules([], _Input) ->
ok;
apply_rules([#rule{enabled = false}|More], Input) ->
apply_rules(More, Input);
apply_rules([Rule = #rule{id = RuleID}|More], Input) ->
try apply_rule_discard_result(Rule, Input)
catch
%% ignore the errors if select or match failed
_:{select_and_transform_error, Error} ->
?LOG(warning, "SELECT clause exception for ~s failed: ~p",
[RuleID, Error]);
_:{match_conditions_error, Error} ->
?LOG(warning, "WHERE clause exception for ~s failed: ~p",
[RuleID, Error]);
_:{select_and_collect_error, Error} ->
?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
[RuleID, Error]);
_:{match_incase_error, Error} ->
?LOG(warning, "INCASE clause exception for ~s failed: ~p",
[RuleID, Error]);
_:Error:StkTrace ->
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
[RuleID, Error, StkTrace])
end,
apply_rules(More, Input).
apply_rule_discard_result(Rule, Input) ->
_ = apply_rule(Rule, Input),
ok.
apply_rule(Rule = #rule{id = RuleID}, Input) ->
clear_rule_payload(),
do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
do_apply_rule(#rule{is_foreach = true,
fields = Fields,
conditions = Conditions}, Input) ->
{Selected, _Collection} = ?RAISE(select_and_collect(Fields, Input),
{select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
ColumnsAndSelected = maps:merge(Input, Selected),
case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
true ->
{ok, match};
false ->
{error, nomatch}
end;
do_apply_rule(#rule{
is_foreach = false,
fields = Fields,
conditions = Conditions}, Input) ->
Selected = ?RAISE(select_and_transform(Fields, Input),
{select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}),
case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
true ->
{ok, match};
false ->
{error, nomatch}
end.
clear_rule_payload() ->
erlang:erase(rule_payload).
%% SELECT Clause
select_and_transform(Fields, Input) ->
select_and_transform(Fields, Input, #{}).
select_and_transform([], _Input, Output) ->
Output;
select_and_transform(['*'|More], Input, Output) ->
select_and_transform(More, Input, maps:merge(Output, Input));
select_and_transform([{as, Field, Alias}|More], Input, Output) ->
Val = eval(Field, Input),
select_and_transform(More,
nested_put(Alias, Val, Input),
nested_put(Alias, Val, Output));
select_and_transform([Field|More], Input, Output) ->
Val = eval(Field, Input),
Key = alias(Field),
select_and_transform(More,
nested_put(Key, Val, Input),
nested_put(Key, Val, Output)).
%% FOREACH Clause
-spec select_and_collect(list(), input()) -> {input(), collection()}.
select_and_collect(Fields, Input) ->
select_and_collect(Fields, Input, {#{}, {'item', []}}).
select_and_collect([{as, Field, {_, A} = Alias}], Input, {Output, _}) ->
Val = eval(Field, Input),
{nested_put(Alias, Val, Output), {A, ensure_list(Val)}};
select_and_collect([{as, Field, Alias}|More], Input, {Output, LastKV}) ->
Val = eval(Field, Input),
select_and_collect(More,
nested_put(Alias, Val, Input),
{nested_put(Alias, Val, Output), LastKV});
select_and_collect([Field], Input, {Output, _}) ->
Val = eval(Field, Input),
Key = alias(Field),
{nested_put(Key, Val, Output), {'item', ensure_list(Val)}};
select_and_collect([Field|More], Input, {Output, LastKV}) ->
Val = eval(Field, Input),
Key = alias(Field),
select_and_collect(More,
nested_put(Key, Val, Input),
{nested_put(Key, Val, Output), LastKV}).
%% Conditional Clauses such as WHERE, WHEN.
match_conditions({'and', L, R}, Data) ->
match_conditions(L, Data) andalso match_conditions(R, Data);
match_conditions({'or', L, R}, Data) ->
match_conditions(L, Data) orelse match_conditions(R, Data);
match_conditions({'not', Var}, Data) ->
case eval(Var, Data) of
Bool when is_boolean(Bool) ->
not Bool;
_other -> false
end;
match_conditions({in, Var, {list, Vals}}, Data) ->
lists:member(eval(Var, Data), [eval(V, Data) || V <- Vals]);
match_conditions({'fun', {_, Name}, Args}, Data) ->
apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data);
match_conditions({Op, L, R}, Data) when ?is_comp(Op) ->
compare(Op, eval(L, Data), eval(R, Data));
%%match_conditions({'like', Var, Pattern}, Data) ->
%% match_like(eval(Var, Data), Pattern);
match_conditions({}, _Data) ->
true.
%% comparing numbers against strings
compare(Op, L, R) when is_number(L), is_binary(R) ->
do_compare(Op, L, number(R));
compare(Op, L, R) when is_binary(L), is_number(R) ->
do_compare(Op, number(L), R);
compare(Op, L, R) when is_atom(L), is_binary(R) ->
do_compare(Op, atom_to_binary(L, utf8), R);
compare(Op, L, R) when is_binary(L), is_atom(R) ->
do_compare(Op, L, atom_to_binary(R, utf8));
compare(Op, L, R) ->
do_compare(Op, L, R).
do_compare('=', L, R) -> L == R;
do_compare('>', L, R) -> L > R;
do_compare('<', L, R) -> L < R;
do_compare('<=', L, R) -> L =< R;
do_compare('>=', L, R) -> L >= R;
do_compare('<>', L, R) -> L /= R;
do_compare('!=', L, R) -> L /= R;
do_compare('=~', T, F) -> emqx_topic:match(T, F).
number(Bin) ->
try binary_to_integer(Bin)
catch error:badarg -> binary_to_float(Bin)
end.
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
nested_get({path, Path}, may_decode_payload(Payload));
eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
nested_get({path, Path}, may_decode_payload(Payload));
eval({path, _} = Path, Input) ->
nested_get(Path, Input);
eval({range, {Begin, End}}, _Input) ->
range_gen(Begin, End);
eval({get_range, {Begin, End}, Data}, Input) ->
range_get(Begin, End, eval(Data, Input));
eval({var, _} = Var, Input) ->
nested_get(Var, Input);
eval({const, Val}, _Input) ->
Val;
%% unary add
eval({'+', L}, Input) ->
eval(L, Input);
%% unary subtract
eval({'-', L}, Input) ->
-(eval(L, Input));
eval({Op, L, R}, Input) when ?is_arith(Op) ->
apply_func(Op, [eval(L, Input), eval(R, Input)], Input);
eval({Op, L, R}, Input) when ?is_comp(Op) ->
compare(Op, eval(L, Input), eval(R, Input));
eval({list, List}, Input) ->
[eval(L, Input) || L <- List];
eval({'case', <<>>, CaseClauses, ElseClauses}, Input) ->
eval_case_clauses(CaseClauses, ElseClauses, Input);
eval({'case', CaseOn, CaseClauses, ElseClauses}, Input) ->
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input);
eval({'fun', {_, Name}, Args}, Input) ->
apply_func(Name, [eval(Arg, Input) || Arg <- Args], Input).
handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Input) ->
Input#{payload => may_decode_payload(Payload)};
handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Input) ->
Input#{<<"payload">> => may_decode_payload(Payload)};
handle_alias(_, Input) ->
Input.
alias({var, Var}) ->
{var, Var};
alias({const, Val}) when is_binary(Val) ->
{var, Val};
alias({list, L}) ->
{var, ?ephemeral_alias(list, length(L))};
alias({range, R}) ->
{var, ?ephemeral_alias(range, R)};
alias({get_range, _, {var, Key}}) ->
{var, Key};
alias({get_range, _, {path, Path}}) ->
{path, Path};
alias({path, Path}) ->
{path, Path};
alias({const, Val}) ->
{var, ?ephemeral_alias(const, Val)};
alias({Op, _L, _R}) when ?is_arith(Op); ?is_comp(Op) ->
{var, ?ephemeral_alias(op, Op)};
alias({'case', On, _, _}) ->
{var, ?ephemeral_alias('case', On)};
alias({'fun', Name, _}) ->
{var, ?ephemeral_alias('fun', Name)};
alias(_) ->
?ephemeral_alias(unknown, unknown).
eval_case_clauses([], ElseClauses, Input) ->
case ElseClauses of
{} -> undefined;
_ -> eval(ElseClauses, Input)
end;
eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
case match_conditions(Cond, Input) of
true ->
eval(Clause, Input);
_ ->
eval_case_clauses(CaseClauses, ElseClauses, Input)
end.
eval_switch_clauses(_CaseOn, [], ElseClauses, Input) ->
case ElseClauses of
{} -> undefined;
_ -> eval(ElseClauses, Input)
end;
eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Input) ->
ConResult = eval(Cond, Input),
case eval(CaseOn, Input) of
ConResult ->
eval(Clause, Input);
_ ->
eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input)
end.
apply_func(Name, Args, Input) when is_atom(Name) ->
do_apply_func(Name, Args, Input);
apply_func(Name, Args, Input) when is_binary(Name) ->
FunName =
try binary_to_existing_atom(Name, utf8)
catch error:badarg -> error({sql_function_not_supported, Name})
end,
do_apply_func(FunName, Args, Input).
do_apply_func(Name, Args, Input) ->
case erlang:apply(emqx_rule_funcs, Name, Args) of
Func when is_function(Func) ->
erlang:apply(Func, [Input]);
Result -> Result
end.
add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) ->
NewMetadata = maps:merge(maps:get(metadata, Input, #{}), Metadata),
Input#{metadata => NewMetadata}.
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
may_decode_payload(Payload) when is_binary(Payload) ->
case get_cached_payload() of
undefined -> safe_decode_and_cache(Payload);
DecodedP -> DecodedP
end;
may_decode_payload(Payload) ->
Payload.
get_cached_payload() ->
erlang:get(rule_payload).
cache_payload(DecodedP) ->
erlang:put(rule_payload, DecodedP),
DecodedP.
safe_decode_and_cache(MaybeJson) ->
try cache_payload(emqx_json:decode(MaybeJson, [return_maps]))
catch _:_ -> #{}
end.
ensure_list(List) when is_list(List) -> List;
ensure_list(_NotList) -> [].
nested_put(Alias, Val, Input0) ->
Input = handle_alias(Alias, Input0),
emqx_rule_maps:nested_put(Alias, Val, Input).

View File

@ -0,0 +1,108 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_rule_sqlparser).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-export([parse_select/1]).
-export([ select_fields/1
, select_is_foreach/1
, select_doeach/1
, select_incase/1
, select_from/1
, select_where/1
]).
-import(proplists, [ get_value/2
, get_value/3
]).
-record(select, {fields, from, where, is_foreach, doeach, incase}).
-opaque(select() :: #select{}).
-type(const() :: {const, number()|binary()}).
-type(variable() :: binary() | list(binary())).
-type(alias() :: binary() | list(binary())).
-type(field() :: const() | variable()
| {as, field(), alias()}
| {'fun', atom(), list(field())}).
-export_type([select/0]).
%% Dialyzer gives up on the generated code.
%% probably due to stack depth, or inlines.
-dialyzer({nowarn_function, [parse_select/1]}).
%% Parse one select statement.
-spec(parse_select(string() | binary())
-> {ok, select()} | {parse_error, term()} | {lex_error, term()}).
parse_select(Sql) ->
try case rulesql:parsetree(Sql) of
{ok, {select, Clauses}} ->
{ok, #select{
is_foreach = false,
fields = get_value(fields, Clauses),
doeach = [],
incase = {},
from = get_value(from, Clauses),
where = get_value(where, Clauses)
}};
{ok, {foreach, Clauses}} ->
{ok, #select{
is_foreach = true,
fields = get_value(fields, Clauses),
doeach = get_value(do, Clauses, []),
incase = get_value(incase, Clauses, {}),
from = get_value(from, Clauses),
where = get_value(where, Clauses)
}};
Error -> Error
end
catch
_Error:Reason:StackTrace ->
{parse_error, {Reason, StackTrace}}
end.
-spec(select_fields(select()) -> list(field())).
select_fields(#select{fields = Fields}) ->
Fields.
-spec(select_is_foreach(select()) -> boolean()).
select_is_foreach(#select{is_foreach = IsForeach}) ->
IsForeach.
-spec(select_doeach(select()) -> list(field())).
select_doeach(#select{doeach = DoEach}) ->
DoEach.
-spec(select_incase(select()) -> list(field())).
select_incase(#select{incase = InCase}) ->
InCase.
-spec(select_from(select()) -> list(binary())).
select_from(#select{from = From}) ->
From.
-spec(select_where(select()) -> tuple()).
select_where(#select{where = Where}) ->
Where.

View File

@ -159,7 +159,7 @@ handle_message({sync_parse, _Pid, 'after', post, Token, <<"Device">>, QueryData}
{ok, State};
handle_message({sync_parse, _Pid, 'after', put, _Token, <<"Device">>, QueryData}, State) ->
%% io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, QueryData]),
%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, QueryData]),
dgiot_device:put(QueryData),
{ok, State};

View File

@ -28,10 +28,9 @@ post('after', _AfterData) ->
%%
put('before', #{<<"id">> := DeviceId, <<"profile">> := UserProfile} = Device) ->
%% io:format("~s ~p Device = ~p.~n", [?FILE, ?LINE, _Device]),
dgiot_device:save_profile(Device),
case dgiot_device:lookup(DeviceId) of
{ok, #{<<"devaddr">> := Devaddr, <<"productid">> := ProductId}} ->
dgiot_device:save_profile(Device#{<<"objectId">> => DeviceId, <<"product">> => #{<<"objectId">> => ProductId}}),
ProfileTopic =
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"topics">> := #{<<"device_profile">> := ToipcTempl}}} ->

View File

@ -126,27 +126,21 @@ handle_event(_EventId, _Event, State) ->
%% todo
handle_message({sync_profile, _Pid, ProductId, DeviceAddr, DeviceProfile, Delay}, State) ->
%% io:format("~s ~p ~p ~p ~p ~p ~p ~n", [?FILE, ?LINE, Pid, ProductId, DeviceAddr, Profile, Delay]),
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DeviceAddr),
maps:fold(fun(DeviceProfileKey, UserProfileKey, Count) ->
case maps:find(DeviceProfileKey, DeviceProfile) of
{ok, DeviceProfileValue} ->
BinDeviceProfileValue = dgiot_utils:to_binary(DeviceProfileValue),
NewDeviceProfileValue = <<" ", BinDeviceProfileValue/binary>>,
case dgiot_device:get_profile(DeviceId, UserProfileKey) of
not_find ->
Count;
DeviceProfileValue ->
NewDeviceProfileValue ->
Count;
UserProfileValue ->
NewUserProfileValue = dgiot_utils:trim_string(dgiot_utils:to_list(UserProfileValue)),
NewDeviceProfileValue = dgiot_utils:trim_string(dgiot_utils:to_list(DeviceProfileValue)),
case NewDeviceProfileValue of
NewUserProfileValue ->
RealDelay = Delay * timer:seconds(Count),
erlang:send_after(RealDelay, self(), {send_profile, DeviceId, #{UserProfileKey => UserProfileValue}}),
Count + 1;
_ ->
Count
end
RealDelay = Delay * timer:seconds(Count),
erlang:send_after(RealDelay, self(), {send_profile, DeviceId, #{UserProfileKey => UserProfileValue}}),
Count + 1
end;
_ ->
Count