From 1a074f47a52a63e42c3ce0d5aa9fd2237a3f13d2 Mon Sep 17 00:00:00 2001 From: dawnwinterLiu <1737801684@qq.com> Date: Fri, 17 Jun 2022 09:35:52 +0800 Subject: [PATCH 1/3] feat: streamline rules engine --- apps/dgiot/src/dgiot_sup.erl | 2 + apps/dgiot/src/rules/dgiot_rule_engine.erl | 164 +++++++++ .../dgiot/src/rules/dgiot_rule_engine_sup.erl | 48 +++ apps/dgiot/src/rules/dgiot_rule_locker.erl | 34 ++ apps/dgiot/src/rules/dgiot_rule_registry.erl | 226 ++++++++++++ apps/dgiot/src/rules/dgiot_rule_runtime.erl | 346 ++++++++++++++++++ apps/dgiot/src/rules/dgiot_rule_sqlparser.erl | 108 ++++++ 7 files changed, 928 insertions(+) create mode 100644 apps/dgiot/src/rules/dgiot_rule_engine.erl create mode 100644 apps/dgiot/src/rules/dgiot_rule_engine_sup.erl create mode 100644 apps/dgiot/src/rules/dgiot_rule_locker.erl create mode 100644 apps/dgiot/src/rules/dgiot_rule_registry.erl create mode 100644 apps/dgiot/src/rules/dgiot_rule_runtime.erl create mode 100644 apps/dgiot/src/rules/dgiot_rule_sqlparser.erl diff --git a/apps/dgiot/src/dgiot_sup.erl b/apps/dgiot/src/dgiot_sup.erl index ffd2f86e..ac47e53d 100644 --- a/apps/dgiot/src/dgiot_sup.erl +++ b/apps/dgiot/src/dgiot_sup.erl @@ -74,9 +74,11 @@ init([]) -> KernelSup = child_spec(dgiot_kernel_sup, supervisor, []), MnesiaSup = child_spec(dgiot_mnesia_sup, supervisor, []), CMSup = child_spec(dgiot_cm_sup, supervisor, []), + RuleSup = child_spec(dgiot_rule_engine_sup,supervisor, []), Childs = [KernelSup] ++ [MnesiaSup] ++ [CMSup] + ++ [RuleSup] ++ [child_spec(dgiot_dcache, worker, [?DCACHE]), dgiot_channelx:spec(channelx_mgr)], diff --git a/apps/dgiot/src/rules/dgiot_rule_engine.erl b/apps/dgiot/src/rules/dgiot_rule_engine.erl new file mode 100644 index 00000000..8f78d673 --- /dev/null +++ b/apps/dgiot/src/rules/dgiot_rule_engine.erl @@ -0,0 +1,164 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(dgiot_rule_engine). + +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). +-include_lib("dgiot/include/logger.hrl"). + +-export([create_rule/1 + , update_rule/1 + , delete_rule/1 + , refresh_rules/0 + , test/0 +]). + +-type(rule() :: #rule{}). + +-export_type([rule/0 +]). + +-define(T_RETRY, 60000). + +%% APIs for rules and resources +%%------------------------------------------------------------------------------ + +-dialyzer([{nowarn_function, [create_rule/1, rule_id/0]}]). +-spec create_rule(map()) -> {ok, rule()} | {error, term()}. +create_rule(Params = #{rawsql := Sql}) -> + case dgiot_rule_sqlparser:parse_select(Sql) of + {ok, Select} -> + RuleId = maps:get(id, Params, rule_id()), + Enabled = maps:get(enabled, Params, true), + Rule = #rule{ + id = RuleId, + rawsql = Sql, + for = dgiot_rule_sqlparser:select_from(Select), + is_foreach = dgiot_rule_sqlparser:select_is_foreach(Select), + fields = dgiot_rule_sqlparser:select_fields(Select), + doeach = dgiot_rule_sqlparser:select_doeach(Select), + incase = dgiot_rule_sqlparser:select_incase(Select), + conditions = dgiot_rule_sqlparser:select_where(Select), + enabled = Enabled, + created_at = erlang:system_time(millisecond), + description = maps:get(description, Params, ""), + state = normal + }, + ok = dgiot_rule_registry:add_rule(Rule), + {ok, Rule}; + Reason -> {error, Reason} + end. + +-spec(update_rule(#{id := binary(), _ => _}) -> {ok, rule()} | {error, {not_found, rule_id()}}). +update_rule(Params = #{id := RuleId}) -> + case dgiot_rule_registry:get_rule(RuleId) of + {ok, Rule0} -> + try may_update_rule_params(Rule0, Params) of + Rule -> + ok = dgiot_rule_registry:add_rule(Rule), + {ok, Rule} + catch + throw:Reason -> + {error, Reason} + end; + not_found -> + {error, {not_found, RuleId}} + end. + +-spec(delete_rule(RuleId :: rule_id()) -> ok). +delete_rule(RuleId) -> + case dgiot_rule_registry:get_rule(RuleId) of + {ok, Rule} -> + try + _ = ?CLUSTER_CALL(clear_rule, [Rule]), + ok = dgiot_rule_registry:remove_rule(Rule) + catch + Error:Reason:ST -> + ?LOG(error, "clear_rule ~p failed: ~p", [RuleId, {Error, Reason, ST}]) + end; + not_found -> + ok + end. + +-spec(refresh_rules() -> ok). +refresh_rules() -> + lists:foreach(fun + (#rule{enabled = true} = Rule) -> + try refresh_rule(Rule) + catch _:_ -> + dgiot_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup}) + end; + (_) -> ok + end, dgiot_rule_registry:get_rules()). + +refresh_rule(#rule{id = RuleId, for = Topics}) -> + ok = emqx_rule_metrics:create_rule_metrics(RuleId), + lists:foreach(fun emqx_rule_events:load/1, Topics). + +-dialyzer([{nowarn_function, may_update_rule_params/2}]). +may_update_rule_params(Rule, Params = #{rawsql := SQL}) -> + case emqx_rule_sqlparser:parse_select(SQL) of + {ok, Select} -> + may_update_rule_params( + Rule#rule{ + rawsql = SQL, + for = dgiot_rule_sqlparser:select_from(Select), + is_foreach = dgiot_rule_sqlparser:select_is_foreach(Select), + fields = dgiot_rule_sqlparser:select_fields(Select), + doeach = dgiot_rule_sqlparser:select_doeach(Select), + incase = dgiot_rule_sqlparser:select_incase(Select), + conditions = dgiot_rule_sqlparser:select_where(Select) + }, + maps:remove(rawsql, Params)); + Reason -> throw(Reason) + end; +may_update_rule_params(Rule = #rule{enabled = OldEnb, actions = Actions, state = OldState}, + Params = #{enabled := NewEnb}) -> + State = case {OldEnb, NewEnb} of + {false, true} -> + _ = ?CLUSTER_CALL(refresh_rule, [Rule]), + force_changed; + {true, false} -> + _ = ?CLUSTER_CALL(clear_actions, [Actions]), + force_changed; + _NoChange -> OldState + end, + may_update_rule_params(Rule#rule{enabled = NewEnb, state = State}, maps:remove(enabled, Params)); +may_update_rule_params(Rule, Params = #{description := Descr}) -> + may_update_rule_params(Rule#rule{description = Descr}, maps:remove(description, Params)); +may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params + Rule. + +rule_id() -> + gen_id("rule:", fun dgiot_rule_registry:get_rule/1). + +gen_id(Prefix, TestFun) -> + Id = iolist_to_binary([Prefix, emqx_rule_id:gen()]), + case TestFun(Id) of + not_found -> Id; + _Res -> gen_id(Prefix, TestFun) + end. + +test() -> + Sql = <<"SELECT " + " case " + " when a = 1 then a " + " when a = 2 then a " + " else a-1 " + " end " + "FROM abc">>, + dgiot_rule_sqlparser:parse_select(Sql), + create_rule(#{rawsql => Sql}). \ No newline at end of file diff --git a/apps/dgiot/src/rules/dgiot_rule_engine_sup.erl b/apps/dgiot/src/rules/dgiot_rule_engine_sup.erl new file mode 100644 index 00000000..d3cdaef7 --- /dev/null +++ b/apps/dgiot/src/rules/dgiot_rule_engine_sup.erl @@ -0,0 +1,48 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(dgiot_rule_engine_sup). + +-behaviour(supervisor). + +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). + +-export([start_link/0]). + +-export([start_locker/0]). + +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + Registry = #{id => dgiot_rule_registry, + start => {dgiot_rule_registry, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [dgiot_rule_registry]}, + {ok, {{one_for_one, 10, 10}, [Registry]}}. + +start_locker() -> + Locker = #{id => dgiot_rule_locker, + start => {dgiot_rule_locker, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [dgiot_rule_locker]}, + supervisor:start_child(?MODULE, Locker). diff --git a/apps/dgiot/src/rules/dgiot_rule_locker.erl b/apps/dgiot/src/rules/dgiot_rule_locker.erl new file mode 100644 index 00000000..370c3070 --- /dev/null +++ b/apps/dgiot/src/rules/dgiot_rule_locker.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(dgiot_rule_locker). + +-export([start_link/0]). + +-export([ lock/1 + , unlock/1 + ]). + +start_link() -> + ekka_locker:start_link(?MODULE). + +-spec(lock(binary()) -> ekka_locker:lock_result()). +lock(Id) -> + ekka_locker:acquire(?MODULE, Id, local). + +-spec(unlock(binary()) -> {boolean(), [node()]}). +unlock(Id) -> + ekka_locker:release(?MODULE, Id, local). diff --git a/apps/dgiot/src/rules/dgiot_rule_registry.erl b/apps/dgiot/src/rules/dgiot_rule_registry.erl new file mode 100644 index 00000000..c1824966 --- /dev/null +++ b/apps/dgiot/src/rules/dgiot_rule_registry.erl @@ -0,0 +1,226 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(dgiot_rule_registry). + +-behaviour(gen_server). + +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). +-include_lib("dgiot/include/logger.hrl"). +-include_lib("stdlib/include/qlc.hrl"). +-define(DGIOT_RULE_TAB, dgiot_rule). +-define(DG_KV_TAB, '@dg_rule_engine_db'). +-export([start_link/0]). + +%% Rule Management +-export([ get_rules/0 + , get_rules_for/1 + , get_rules_with_same_event/1 + , get_rules_ordered_by_ts/0 + , get_rule/1 + , add_rule/1 + , add_rules/1 + , remove_rule/1 + , remove_rules/1 + ]). + +-export([ load_hooks_for_rule/1 + , unload_hooks_for_rule/1 + ]). + +%% for debug purposes +-export([dump/0]). + +%% gen_server Callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-define(REGISTRY, ?MODULE). + +-define(T_CALL, 10000). + +%%------------------------------------------------------------------------------ +%% Mnesia bootstrap +%%------------------------------------------------------------------------------ + +%% @doc Create or replicate tables. +-spec(mnesia(boot | copy) -> ok). +mnesia(boot) -> + %% Optimize storage + StoreProps = [{ets, [{read_concurrency, true}]}], + %% Rule table + ok = ekka_mnesia:create_table(?DGIOT_RULE_TAB, [ + {ram_copies, [node()]}, + {record_name, rule}, + {index, [#rule.for]}, + {attributes, record_info(fields, rule)}, + {storage_properties, StoreProps}]); + +mnesia(copy) -> + %% Copy rule table + ok = ekka_mnesia:copy_table(?DGIOT_RULE_TAB, ram_copies). + +dump() -> + io:format("Rules: ~p~n", + [ets:tab2list(?DGIOT_RULE_TAB)]). + +%%------------------------------------------------------------------------------ +%% Start the registry +%%------------------------------------------------------------------------------ + +-spec(start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []). + +%%------------------------------------------------------------------------------ +%% Rule Management +%%------------------------------------------------------------------------------ + +-spec(get_rules() -> list(emqx_rule_engine:rule())). +get_rules() -> + get_all_records(?DGIOT_RULE_TAB). + +get_rules_ordered_by_ts() -> + F = fun() -> + Query = qlc:q([E || E <- mnesia:table(?DGIOT_RULE_TAB)]), + qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}])) + end, + {atomic, List} = mnesia:transaction(F), + List. + +-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). +get_rules_for(Topic) -> + [Rule || Rule = #rule{for = For} <- get_rules(), + emqx_rule_utils:can_topic_match_oneof(Topic, For)]. + +-spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())). +get_rules_with_same_event(Topic) -> + EventName = emqx_rule_events:event_name(Topic), + [Rule || Rule = #rule{for = For} <- get_rules(), + lists:any(fun(T) -> is_of_event_name(EventName, T) end, For)]. + +is_of_event_name(EventName, Topic) -> + EventName =:= emqx_rule_events:event_name(Topic). + +-spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found). +get_rule(Id) -> + case mnesia:dirty_read(?RULE_TAB, Id) of + [Rule] -> {ok, Rule}; + [] -> not_found + end. + +-spec(add_rule(emqx_rule_engine:rule()) -> ok). +add_rule(Rule) when is_record(Rule, rule) -> + add_rules([Rule]). + +-spec(add_rules(list(emqx_rule_engine:rule())) -> ok). +add_rules(Rules) -> + gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL). + +-spec(remove_rule(emqx_rule_engine:rule() | rule_id()) -> ok). +remove_rule(RuleOrId) -> + remove_rules([RuleOrId]). + +-spec(remove_rules(list(emqx_rule_engine:rule()) | list(rule_id())) -> ok). +remove_rules(Rules) -> + gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL). + +%% @private +insert_rule(Rule) -> + _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), + mnesia:write(?DGIOT_RULE_TAB, Rule, write). + +%% @private +delete_rule(RuleId) when is_binary(RuleId) -> + case get_rule(RuleId) of + {ok, Rule} -> delete_rule(Rule); + not_found -> ok + end; +delete_rule(Rule) -> + _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), + mnesia:delete_object(?DGIOT_RULE_TAB, Rule, write). + +load_hooks_for_rule(#rule{for = Topics}) -> + lists:foreach(fun emqx_rule_events:load/1, Topics). + +unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> + lists:foreach(fun(Topic) -> + case get_rules_with_same_event(Topic) of + [#rule{id = Id}] -> %% we are now deleting the last rule + emqx_rule_events:unload(Topic); + _ -> ok + end + end, Topics). + +%%------------------------------------------------------------------------------ +%% gen_server callbacks +%%------------------------------------------------------------------------------ + +init([]) -> + _TableId = ets:new(?DG_KV_TAB, [named_table, set, public, {write_concurrency, true}, + {read_concurrency, true}]), + {ok, #{}}. + +handle_call({add_rules, Rules}, _From, State) -> + trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), + {reply, ok, State}; + +handle_call({remove_rules, Rules}, _From, State) -> + trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), + {reply, ok, State}; + +handle_call(Req, _From, State) -> + ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]), + {noreply, State}. + +handle_info(Info, State) -> + ?LOG(error, "[RuleRegistry]: unexpected info ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%------------------------------------------------------------------------------ +%% Private functions +%%------------------------------------------------------------------------------ + +get_all_records(Tab) -> + %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). + ets:tab2list(Tab). + +trans(Fun, Args) -> + case mnesia:transaction(Fun, Args) of + {atomic, Result} -> Result; + {aborted, Reason} -> error(Reason) + end. diff --git a/apps/dgiot/src/rules/dgiot_rule_runtime.erl b/apps/dgiot/src/rules/dgiot_rule_runtime.erl new file mode 100644 index 00000000..d6df34fb --- /dev/null +++ b/apps/dgiot/src/rules/dgiot_rule_runtime.erl @@ -0,0 +1,346 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(dgiot_rule_runtime). + +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("dgiot/include/logger.hrl"). + +-export([ apply_rule/2 + , apply_rules/2 + , clear_rule_payload/0 + ]). + +-import(emqx_rule_maps, + [ nested_get/2 + , range_gen/2 + , range_get/3 + ]). + +-compile({no_auto_import,[alias/1]}). + +-type(input() :: map()). +-type(alias() :: atom()). +-type(collection() :: {alias(), [term()]}). + +-define(ephemeral_alias(TYPE, NAME), + iolist_to_binary(io_lib:format("_v_~s_~p_~p", [TYPE, NAME, erlang:system_time()]))). + +-define(ActionMaxRetry, 3). + +%%------------------------------------------------------------------------------ +%% Apply rules +%%------------------------------------------------------------------------------ +-spec(apply_rules(list(emqx_rule_engine:rule()), input()) -> ok). +apply_rules([], _Input) -> + ok; +apply_rules([#rule{enabled = false}|More], Input) -> + apply_rules(More, Input); +apply_rules([Rule = #rule{id = RuleID}|More], Input) -> + try apply_rule_discard_result(Rule, Input) + catch + %% ignore the errors if select or match failed + _:{select_and_transform_error, Error} -> + ?LOG(warning, "SELECT clause exception for ~s failed: ~p", + [RuleID, Error]); + _:{match_conditions_error, Error} -> + ?LOG(warning, "WHERE clause exception for ~s failed: ~p", + [RuleID, Error]); + _:{select_and_collect_error, Error} -> + ?LOG(warning, "FOREACH clause exception for ~s failed: ~p", + [RuleID, Error]); + _:{match_incase_error, Error} -> + ?LOG(warning, "INCASE clause exception for ~s failed: ~p", + [RuleID, Error]); + _:Error:StkTrace -> + ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", + [RuleID, Error, StkTrace]) + end, + apply_rules(More, Input). + +apply_rule_discard_result(Rule, Input) -> + _ = apply_rule(Rule, Input), + ok. + +apply_rule(Rule = #rule{id = RuleID}, Input) -> + clear_rule_payload(), + do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). + +do_apply_rule(#rule{is_foreach = true, + fields = Fields, + conditions = Conditions}, Input) -> + {Selected, _Collection} = ?RAISE(select_and_collect(Fields, Input), + {select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}), + ColumnsAndSelected = maps:merge(Input, Selected), + case ?RAISE(match_conditions(Conditions, ColumnsAndSelected), + {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of + true -> + {ok, match}; + false -> + {error, nomatch} + end; + +do_apply_rule(#rule{ + is_foreach = false, + fields = Fields, + conditions = Conditions}, Input) -> + Selected = ?RAISE(select_and_transform(Fields, Input), + {select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}), + case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)), + {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of + true -> + {ok, match}; + false -> + {error, nomatch} + end. + +clear_rule_payload() -> + erlang:erase(rule_payload). + +%% SELECT Clause +select_and_transform(Fields, Input) -> + select_and_transform(Fields, Input, #{}). + +select_and_transform([], _Input, Output) -> + Output; +select_and_transform(['*'|More], Input, Output) -> + select_and_transform(More, Input, maps:merge(Output, Input)); +select_and_transform([{as, Field, Alias}|More], Input, Output) -> + Val = eval(Field, Input), + select_and_transform(More, + nested_put(Alias, Val, Input), + nested_put(Alias, Val, Output)); +select_and_transform([Field|More], Input, Output) -> + Val = eval(Field, Input), + Key = alias(Field), + select_and_transform(More, + nested_put(Key, Val, Input), + nested_put(Key, Val, Output)). + +%% FOREACH Clause +-spec select_and_collect(list(), input()) -> {input(), collection()}. +select_and_collect(Fields, Input) -> + select_and_collect(Fields, Input, {#{}, {'item', []}}). + +select_and_collect([{as, Field, {_, A} = Alias}], Input, {Output, _}) -> + Val = eval(Field, Input), + {nested_put(Alias, Val, Output), {A, ensure_list(Val)}}; +select_and_collect([{as, Field, Alias}|More], Input, {Output, LastKV}) -> + Val = eval(Field, Input), + select_and_collect(More, + nested_put(Alias, Val, Input), + {nested_put(Alias, Val, Output), LastKV}); +select_and_collect([Field], Input, {Output, _}) -> + Val = eval(Field, Input), + Key = alias(Field), + {nested_put(Key, Val, Output), {'item', ensure_list(Val)}}; +select_and_collect([Field|More], Input, {Output, LastKV}) -> + Val = eval(Field, Input), + Key = alias(Field), + select_and_collect(More, + nested_put(Key, Val, Input), + {nested_put(Key, Val, Output), LastKV}). + +%% Conditional Clauses such as WHERE, WHEN. +match_conditions({'and', L, R}, Data) -> + match_conditions(L, Data) andalso match_conditions(R, Data); +match_conditions({'or', L, R}, Data) -> + match_conditions(L, Data) orelse match_conditions(R, Data); +match_conditions({'not', Var}, Data) -> + case eval(Var, Data) of + Bool when is_boolean(Bool) -> + not Bool; + _other -> false + end; +match_conditions({in, Var, {list, Vals}}, Data) -> + lists:member(eval(Var, Data), [eval(V, Data) || V <- Vals]); +match_conditions({'fun', {_, Name}, Args}, Data) -> + apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data); +match_conditions({Op, L, R}, Data) when ?is_comp(Op) -> + compare(Op, eval(L, Data), eval(R, Data)); +%%match_conditions({'like', Var, Pattern}, Data) -> +%% match_like(eval(Var, Data), Pattern); +match_conditions({}, _Data) -> + true. + +%% comparing numbers against strings +compare(Op, L, R) when is_number(L), is_binary(R) -> + do_compare(Op, L, number(R)); +compare(Op, L, R) when is_binary(L), is_number(R) -> + do_compare(Op, number(L), R); +compare(Op, L, R) when is_atom(L), is_binary(R) -> + do_compare(Op, atom_to_binary(L, utf8), R); +compare(Op, L, R) when is_binary(L), is_atom(R) -> + do_compare(Op, L, atom_to_binary(R, utf8)); +compare(Op, L, R) -> + do_compare(Op, L, R). + +do_compare('=', L, R) -> L == R; +do_compare('>', L, R) -> L > R; +do_compare('<', L, R) -> L < R; +do_compare('<=', L, R) -> L =< R; +do_compare('>=', L, R) -> L >= R; +do_compare('<>', L, R) -> L /= R; +do_compare('!=', L, R) -> L /= R; +do_compare('=~', T, F) -> emqx_topic:match(T, F). + +number(Bin) -> + try binary_to_integer(Bin) + catch error:badarg -> binary_to_float(Bin) + end. + +eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> + nested_get({path, Path}, may_decode_payload(Payload)); +eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) -> + nested_get({path, Path}, may_decode_payload(Payload)); +eval({path, _} = Path, Input) -> + nested_get(Path, Input); +eval({range, {Begin, End}}, _Input) -> + range_gen(Begin, End); +eval({get_range, {Begin, End}, Data}, Input) -> + range_get(Begin, End, eval(Data, Input)); +eval({var, _} = Var, Input) -> + nested_get(Var, Input); +eval({const, Val}, _Input) -> + Val; +%% unary add +eval({'+', L}, Input) -> + eval(L, Input); +%% unary subtract +eval({'-', L}, Input) -> + -(eval(L, Input)); +eval({Op, L, R}, Input) when ?is_arith(Op) -> + apply_func(Op, [eval(L, Input), eval(R, Input)], Input); +eval({Op, L, R}, Input) when ?is_comp(Op) -> + compare(Op, eval(L, Input), eval(R, Input)); +eval({list, List}, Input) -> + [eval(L, Input) || L <- List]; +eval({'case', <<>>, CaseClauses, ElseClauses}, Input) -> + eval_case_clauses(CaseClauses, ElseClauses, Input); +eval({'case', CaseOn, CaseClauses, ElseClauses}, Input) -> + eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input); +eval({'fun', {_, Name}, Args}, Input) -> + apply_func(Name, [eval(Arg, Input) || Arg <- Args], Input). + +handle_alias({path, [{key, <<"payload">>} | _]}, #{payload := Payload} = Input) -> + Input#{payload => may_decode_payload(Payload)}; +handle_alias({path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Input) -> + Input#{<<"payload">> => may_decode_payload(Payload)}; +handle_alias(_, Input) -> + Input. + +alias({var, Var}) -> + {var, Var}; +alias({const, Val}) when is_binary(Val) -> + {var, Val}; +alias({list, L}) -> + {var, ?ephemeral_alias(list, length(L))}; +alias({range, R}) -> + {var, ?ephemeral_alias(range, R)}; +alias({get_range, _, {var, Key}}) -> + {var, Key}; +alias({get_range, _, {path, Path}}) -> + {path, Path}; +alias({path, Path}) -> + {path, Path}; +alias({const, Val}) -> + {var, ?ephemeral_alias(const, Val)}; +alias({Op, _L, _R}) when ?is_arith(Op); ?is_comp(Op) -> + {var, ?ephemeral_alias(op, Op)}; +alias({'case', On, _, _}) -> + {var, ?ephemeral_alias('case', On)}; +alias({'fun', Name, _}) -> + {var, ?ephemeral_alias('fun', Name)}; +alias(_) -> + ?ephemeral_alias(unknown, unknown). + +eval_case_clauses([], ElseClauses, Input) -> + case ElseClauses of + {} -> undefined; + _ -> eval(ElseClauses, Input) + end; +eval_case_clauses([{Cond, Clause} | CaseClauses], ElseClauses, Input) -> + case match_conditions(Cond, Input) of + true -> + eval(Clause, Input); + _ -> + eval_case_clauses(CaseClauses, ElseClauses, Input) + end. + +eval_switch_clauses(_CaseOn, [], ElseClauses, Input) -> + case ElseClauses of + {} -> undefined; + _ -> eval(ElseClauses, Input) + end; +eval_switch_clauses(CaseOn, [{Cond, Clause} | CaseClauses], ElseClauses, Input) -> + ConResult = eval(Cond, Input), + case eval(CaseOn, Input) of + ConResult -> + eval(Clause, Input); + _ -> + eval_switch_clauses(CaseOn, CaseClauses, ElseClauses, Input) + end. + +apply_func(Name, Args, Input) when is_atom(Name) -> + do_apply_func(Name, Args, Input); +apply_func(Name, Args, Input) when is_binary(Name) -> + FunName = + try binary_to_existing_atom(Name, utf8) + catch error:badarg -> error({sql_function_not_supported, Name}) + end, + do_apply_func(FunName, Args, Input). + +do_apply_func(Name, Args, Input) -> + case erlang:apply(emqx_rule_funcs, Name, Args) of + Func when is_function(Func) -> + erlang:apply(Func, [Input]); + Result -> Result + end. + +add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) -> + NewMetadata = maps:merge(maps:get(metadata, Input, #{}), Metadata), + Input#{metadata => NewMetadata}. + +%%------------------------------------------------------------------------------ +%% Internal Functions +%%------------------------------------------------------------------------------ +may_decode_payload(Payload) when is_binary(Payload) -> + case get_cached_payload() of + undefined -> safe_decode_and_cache(Payload); + DecodedP -> DecodedP + end; +may_decode_payload(Payload) -> + Payload. + +get_cached_payload() -> + erlang:get(rule_payload). + +cache_payload(DecodedP) -> + erlang:put(rule_payload, DecodedP), + DecodedP. + +safe_decode_and_cache(MaybeJson) -> + try cache_payload(emqx_json:decode(MaybeJson, [return_maps])) + catch _:_ -> #{} + end. + +ensure_list(List) when is_list(List) -> List; +ensure_list(_NotList) -> []. + +nested_put(Alias, Val, Input0) -> + Input = handle_alias(Alias, Input0), + emqx_rule_maps:nested_put(Alias, Val, Input). diff --git a/apps/dgiot/src/rules/dgiot_rule_sqlparser.erl b/apps/dgiot/src/rules/dgiot_rule_sqlparser.erl new file mode 100644 index 00000000..de889698 --- /dev/null +++ b/apps/dgiot/src/rules/dgiot_rule_sqlparser.erl @@ -0,0 +1,108 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(dgiot_rule_sqlparser). + +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). + +-export([parse_select/1]). + +-export([ select_fields/1 + , select_is_foreach/1 + , select_doeach/1 + , select_incase/1 + , select_from/1 + , select_where/1 + ]). + +-import(proplists, [ get_value/2 + , get_value/3 + ]). + +-record(select, {fields, from, where, is_foreach, doeach, incase}). + +-opaque(select() :: #select{}). + +-type(const() :: {const, number()|binary()}). + +-type(variable() :: binary() | list(binary())). + +-type(alias() :: binary() | list(binary())). + +-type(field() :: const() | variable() + | {as, field(), alias()} + | {'fun', atom(), list(field())}). + +-export_type([select/0]). + +%% Dialyzer gives up on the generated code. +%% probably due to stack depth, or inlines. +-dialyzer({nowarn_function, [parse_select/1]}). + +%% Parse one select statement. +-spec(parse_select(string() | binary()) + -> {ok, select()} | {parse_error, term()} | {lex_error, term()}). +parse_select(Sql) -> + try case rulesql:parsetree(Sql) of + {ok, {select, Clauses}} -> + {ok, #select{ + is_foreach = false, + fields = get_value(fields, Clauses), + doeach = [], + incase = {}, + from = get_value(from, Clauses), + where = get_value(where, Clauses) + }}; + {ok, {foreach, Clauses}} -> + {ok, #select{ + is_foreach = true, + fields = get_value(fields, Clauses), + doeach = get_value(do, Clauses, []), + incase = get_value(incase, Clauses, {}), + from = get_value(from, Clauses), + where = get_value(where, Clauses) + }}; + Error -> Error + end + catch + _Error:Reason:StackTrace -> + {parse_error, {Reason, StackTrace}} + end. + +-spec(select_fields(select()) -> list(field())). +select_fields(#select{fields = Fields}) -> + Fields. + +-spec(select_is_foreach(select()) -> boolean()). +select_is_foreach(#select{is_foreach = IsForeach}) -> + IsForeach. + +-spec(select_doeach(select()) -> list(field())). +select_doeach(#select{doeach = DoEach}) -> + DoEach. + +-spec(select_incase(select()) -> list(field())). +select_incase(#select{incase = InCase}) -> + InCase. + +-spec(select_from(select()) -> list(binary())). +select_from(#select{from = From}) -> + From. + +-spec(select_where(select()) -> tuple()). +select_where(#select{where = Where}) -> + Where. + From 7b2a34dfb4dd97a84368d841a6e2c95ac3319bf2 Mon Sep 17 00:00:00 2001 From: h7ml Date: Fri, 17 Jun 2022 11:42:41 +0800 Subject: [PATCH 2/3] fix: device profile --- apps/dgiot_device/src/dgiot_device_channel.erl | 2 +- apps/dgiot_device/src/dgiot_device_profile.erl | 3 +-- .../dgiot_device/src/dgiot_profile_channel.erl | 18 ++++++------------ 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/apps/dgiot_device/src/dgiot_device_channel.erl b/apps/dgiot_device/src/dgiot_device_channel.erl index d2bc0f5f..66f14e2e 100644 --- a/apps/dgiot_device/src/dgiot_device_channel.erl +++ b/apps/dgiot_device/src/dgiot_device_channel.erl @@ -159,7 +159,7 @@ handle_message({sync_parse, _Pid, 'after', post, Token, <<"Device">>, QueryData} {ok, State}; handle_message({sync_parse, _Pid, 'after', put, _Token, <<"Device">>, QueryData}, State) -> -%% io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, QueryData]), +%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, QueryData]), dgiot_device:put(QueryData), {ok, State}; diff --git a/apps/dgiot_device/src/dgiot_device_profile.erl b/apps/dgiot_device/src/dgiot_device_profile.erl index fb884884..b4cbb7cd 100644 --- a/apps/dgiot_device/src/dgiot_device_profile.erl +++ b/apps/dgiot_device/src/dgiot_device_profile.erl @@ -28,10 +28,9 @@ post('after', _AfterData) -> %% 配置下发 put('before', #{<<"id">> := DeviceId, <<"profile">> := UserProfile} = Device) -> -%% io:format("~s ~p Device = ~p.~n", [?FILE, ?LINE, _Device]), - dgiot_device:save_profile(Device), case dgiot_device:lookup(DeviceId) of {ok, #{<<"devaddr">> := Devaddr, <<"productid">> := ProductId}} -> + dgiot_device:save_profile(Device#{<<"objectId">> => DeviceId, <<"product">> => #{<<"objectId">> => ProductId}}), ProfileTopic = case dgiot_product:lookup_prod(ProductId) of {ok, #{<<"topics">> := #{<<"device_profile">> := ToipcTempl}}} -> diff --git a/apps/dgiot_device/src/dgiot_profile_channel.erl b/apps/dgiot_device/src/dgiot_profile_channel.erl index 52a9c2d1..accf6532 100644 --- a/apps/dgiot_device/src/dgiot_profile_channel.erl +++ b/apps/dgiot_device/src/dgiot_profile_channel.erl @@ -126,27 +126,21 @@ handle_event(_EventId, _Event, State) -> %% todo 定时自动同步,不太好判断,通过采集通道里,通过设备登录时,检查状态来进行配置同步 handle_message({sync_profile, _Pid, ProductId, DeviceAddr, DeviceProfile, Delay}, State) -> -%% io:format("~s ~p ~p ~p ~p ~p ~p ~n", [?FILE, ?LINE, Pid, ProductId, DeviceAddr, Profile, Delay]), DeviceId = dgiot_parse_id:get_deviceid(ProductId, DeviceAddr), maps:fold(fun(DeviceProfileKey, UserProfileKey, Count) -> case maps:find(DeviceProfileKey, DeviceProfile) of {ok, DeviceProfileValue} -> + BinDeviceProfileValue = dgiot_utils:to_binary(DeviceProfileValue), + NewDeviceProfileValue = <<" ", BinDeviceProfileValue/binary>>, case dgiot_device:get_profile(DeviceId, UserProfileKey) of not_find -> Count; - DeviceProfileValue -> + NewDeviceProfileValue -> Count; UserProfileValue -> - NewUserProfileValue = dgiot_utils:trim_string(dgiot_utils:to_list(UserProfileValue)), - NewDeviceProfileValue = dgiot_utils:trim_string(dgiot_utils:to_list(DeviceProfileValue)), - case NewDeviceProfileValue of - NewUserProfileValue -> - RealDelay = Delay * timer:seconds(Count), - erlang:send_after(RealDelay, self(), {send_profile, DeviceId, #{UserProfileKey => UserProfileValue}}), - Count + 1; - _ -> - Count - end + RealDelay = Delay * timer:seconds(Count), + erlang:send_after(RealDelay, self(), {send_profile, DeviceId, #{UserProfileKey => UserProfileValue}}), + Count + 1 end; _ -> Count From 4b6f380c3ef15f1fef1eeeb460201e100000e998 Mon Sep 17 00:00:00 2001 From: wanguliux <51999461+wanguliux@users.noreply.github.com> Date: Mon, 20 Jun 2022 10:01:55 +0800 Subject: [PATCH 3/3] Update README-CN.md --- README-CN.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README-CN.md b/README-CN.md index dec1a87f..76fb7ef3 100644 --- a/README-CN.md +++ b/README-CN.md @@ -19,7 +19,7 @@ DG-IoT是国内首款轻量级开源工业物联网平台,我们致力于提 ## 快速体验与微信群 | 微信技术支持群 | [QQ技术支持群](https://jq.qq.com/?_wv=1027&k=LipWZvDe) | 小程序体验 |电脑端https://prod.iotn2n.com| |:---:|:---:|:---:|:---:| -|![image](https://user-images.githubusercontent.com/51999461/172523982-c3179a1d-aa1a-46b0-b614-1e347cbab33f.png)| ||运维账号:dgiot_admin
密码: dgiot_admin

开发账号: dgiot_dev
密码: dgiot_dev | +|![image](https://user-images.githubusercontent.com/51999461/174512542-cd2ef696-17aa-4de0-994a-c5b690b5e60f.png)| ||运维账号:dgiot_admin
密码: dgiot_admin

开发账号: dgiot_dev
密码: dgiot_dev | ## 核心特色