feat: add dgiot_rule

This commit is contained in:
jhonliu 2022-12-08 11:39:15 +08:00
parent b5c6e0d5d9
commit aee14b9136
7 changed files with 198 additions and 6 deletions

View File

@ -0,0 +1,11 @@
-compile({parse_transform, dgiot_rule_actions_trans}).
-type selected_data() :: map().
-type env_vars() :: map().
-type bindings() :: list({atom(), term()}).
-define(BINDING_KEYS, '__bindings__').
-define(bound_v(Key, ENVS0),
maps:get(Key,
maps:get(?BINDING_KEYS, ENVS0, #{}))).

View File

@ -0,0 +1,181 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(APP, dgiot_rule_engine).
-define(KV_TAB, '@rule_engine_db').
-type(maybe(T) :: T | undefined).
-type(rule_id() :: binary()).
-type(rule_name() :: binary()).
-type(resource_id() :: binary()).
-type(action_instance_id() :: binary()).
-type(action_name() :: atom()).
-type(resource_type_name() :: atom()).
-type(category() :: data_persist| data_forward | offline_msgs | debug | other).
-type(descr() :: #{en := binary(), zh => binary()}).
-type(mf() :: {Module::atom(), Fun::atom()}).
-type(hook() :: atom() | 'any').
-type(topic() :: binary()).
-type(resource_status() :: #{ alive := boolean()
, atom() => binary() | atom() | list(binary()|atom())
}).
-define(descr, #{en => <<>>, zh => <<>>}).
-record(action,
{ name :: action_name()
, category :: category()
, for :: hook()
, app :: atom()
, types = [] :: list(resource_type_name())
, module :: module()
, on_create :: mf()
, on_destroy :: maybe(mf())
, hidden = false :: boolean()
, params_spec :: #{atom() => term()} %% params specs
, title = ?descr :: descr()
, description = ?descr :: descr()
}).
-record(action_instance,
{ id :: action_instance_id()
, name :: action_name()
, fallbacks :: list(#action_instance{})
, args :: #{binary() => term()} %% the args got from API for initializing action_instance
}).
-record(rule,
{ id :: rule_id()
, for :: list(topic())
, rawsql :: binary()
, is_foreach :: boolean()
, fields :: list()
, doeach :: term()
, incase :: list()
, conditions :: tuple()
, on_action_failed :: continue | stop
, actions :: list(#action_instance{})
, enabled :: boolean()
, created_at :: integer() %% epoch in millisecond precision
, description :: binary()
, state = normal :: atom()
}).
-record(resource,
{ id :: resource_id()
, type :: resource_type_name()
, config :: #{} %% the configs got from API for initializing resource
, created_at :: integer() | undefined %% epoch in millisecond precision
, description :: binary()
}).
-record(resource_type,
{ name :: resource_type_name()
, provider :: atom()
, params_spec :: #{atom() => term()} %% params specs
, on_create :: mf()
, on_status :: mf()
, on_destroy :: mf()
, title = ?descr :: descr()
, description = ?descr :: descr()
}).
-record(rule_hooks,
{ hook :: atom()
, rule_id :: rule_id()
}).
-record(resource_params,
{ id :: resource_id()
, params :: #{} %% the params got after initializing the resource
, status = #{is_alive => false} :: #{is_alive := boolean(), atom() => term()}
}).
-record(action_instance_params,
{ id :: action_instance_id()
%% the params got after initializing the action
, params :: #{}
%% the Func/Bindings got after initializing the action
, apply :: fun((Data::map(), Envs::map()) -> any())
| #{mod := module(), bindings := #{atom() => term()}}
}).
%% Arithmetic operators
-define(is_arith(Op), (Op =:= '+' orelse
Op =:= '-' orelse
Op =:= '*' orelse
Op =:= '/' orelse
Op =:= 'div')).
%% Compare operators
-define(is_comp(Op), (Op =:= '=' orelse
Op =:= '=~' orelse
Op =:= '>' orelse
Op =:= '<' orelse
Op =:= '<=' orelse
Op =:= '>=' orelse
Op =:= '<>' orelse
Op =:= '!=')).
%% Logical operators
-define(is_logical(Op), (Op =:= 'and' orelse Op =:= 'or')).
-define(RAISE(_EXP_, _ERROR_),
?RAISE(_EXP_, _ = do_nothing, _ERROR_)).
-define(RAISE(_EXP_, _EXP_ON_FAIL_, _ERROR_),
fun() ->
try (_EXP_)
catch _EXCLASS_:_EXCPTION_:_ST_ ->
_EXP_ON_FAIL_,
throw(_ERROR_)
end
end()).
-define(CLUSTER_CALL(Func, Args), ?CLUSTER_CALL(Func, Args, ok)).
-define(CLUSTER_CALL(Func, Args, ResParttern),
fun() -> case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, Func, Args, 30000) of
{ResL, []} ->
case lists:filter(fun(ResParttern) -> false; (_) -> true end, ResL) of
[] -> ResL;
ErrL ->
?LOG(error, "cluster_call error found, ResL: ~p", [ResL]),
throw({Func, ErrL})
end;
{ResL, BadNodes} ->
?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]),
throw({Func, {failed_on_nodes, BadNodes}})
end end()).
%% Tables
-define(RULE_TAB, emqx_rule).
-define(ACTION_TAB, emqx_rule_action).
-define(ACTION_INST_PARAMS_TAB, emqx_action_instance_params).
-define(RES_TAB, emqx_resource).
-define(RES_PARAMS_TAB, emqx_resource_params).
-define(RULE_HOOKS, emqx_rule_hooks).
-define(RES_TYPE_TAB, emqx_resource_type).

View File

@ -16,7 +16,7 @@
-module(dgiot_rule_engine).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("dgiot/include/rule_engine.hrl").
-include_lib("dgiot/include/logger.hrl").
-export([create_rule/1
@ -161,4 +161,4 @@ test() ->
" end "
"FROM abc">>,
dgiot_rule_sqlparser:parse_select(Sql),
create_rule(#{rawsql => Sql}).
create_rule(#{rawsql => Sql}).

View File

@ -18,7 +18,7 @@
-behaviour(supervisor).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("dgiot/include/rule_engine.hrl").
-export([start_link/0]).

View File

@ -18,7 +18,7 @@
-behaviour(gen_server).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("dgiot/include/rule_engine.hrl").
-include_lib("dgiot/include/logger.hrl").
-include_lib("stdlib/include/qlc.hrl").
-define(DGIOT_RULE_TAB, dgiot_rule).

View File

@ -16,7 +16,7 @@
-module(dgiot_rule_runtime).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("dgiot/include/rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("dgiot/include/logger.hrl").

View File

@ -16,7 +16,7 @@
-module(dgiot_rule_sqlparser).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("dgiot/include/rule_engine.hrl").
-export([parse_select/1]).