add dgiot bridge

This commit is contained in:
lsxredrain 2021-06-16 15:54:03 +09:00
parent fa25f6387a
commit 9abf0668fb
4 changed files with 178 additions and 60 deletions

View File

@ -22,7 +22,7 @@
-include_lib("dgiot/include/logger.hrl").
%% API
-export([get_name/2, add2/5, add/4, add/5, spec/0, spec/1, delete/2, delete/3, do_event/4, do_event/5, do_message/4, do_message/3, call/3, call/4, call2/3, call2/4, start_link/1]).
-export([get_name/2, add2/5, add/4, add/5, spec/0, spec/1, delete/2, delete/3, status/1, do_event/4, do_event/5, do_message/4, do_message/3, call/3, call/4, call2/3, call2/4, start_link/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -69,6 +69,12 @@ spec() ->
spec(SerName) ->
{dgiot_channelx_mgr, {dgiot_channelx_mgr, start_link, [SerName]}, permanent, 5000, supervisor, [dgiot_channelx_mgr]}.
status(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, status);
status(Id) ->
gen_statem:call(name(Id), status).
name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).
do_event(ChannelType, ChannelId, EventId, Event) ->
do_event(ChannelType, ChannelId, EventId, Event, 5000).

View File

@ -16,6 +16,7 @@
-module(dgiot_channelx_sup).
-author("johnliu").
-include_lib("dgiot/include/logger.hrl").
-behaviour(supervisor).
%% API
@ -116,6 +117,7 @@ init([sup, ServerName, ChannelType, ChannelId, Mod]) ->
init([ChannelType, ChannelId, Mod]) ->
Name = binary_to_atom(dgiot_channelx:get_name(ChannelType, ChannelId), utf8),
?LOG(error,"Name ~p",[Name]),
case dgiot_data:lookup({Name, channel}) of
{ok, {_, ChannelArgs}} ->
Size = maps:get(<<"Size">>, ChannelArgs, 5),

View File

@ -18,6 +18,10 @@
-module(dgiot_mqtt).
-author("jonhliu").
-include("dgiot_mqtt.hrl").
-include_lib("dgiot/include/logger.hrl").
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
-export([has_routes/1
, subscribe/1
, unsubscribe/1
@ -26,7 +30,9 @@
, shared_sub/3
, shared_unsub/3
, get_payload/1
, get_topic/1]).
, get_topic/1
, get_channel/1
, republish/2]).
has_routes(Topic) ->
emqx_router:has_routes(Topic).
@ -68,3 +74,71 @@ get_payload(Msg) ->
get_topic(Msg) ->
Msg#message.topic.
get_channel(#{
?BINDING_KEYS := #{
'Params' := Params
}}) ->
maps:get(<<"channel">>, Params, <<"">>);
get_channel(_) ->
<<"">>.
republish(Selected, #{
qos := QoS, flags := Flags, timestamp := Timestamp,
?BINDING_KEYS := #{
'_Id' := ActId,
'Params' := _Params,
'TopicTks' := TopicTks,
'PayloadTks' := PayloadTks
} = Bind}) ->
?LOG(error,"Selected ~p ",[Selected]),
TargetQoS = maps:get('TargetQoS',Bind,0),
Msg = #message{
id = emqx_guid:gen(),
qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end,
from = ActId,
flags = Flags,
headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = emqx_rule_utils:proc_tmpl(PayloadTks, Selected),
timestamp = Timestamp
},
?LOG(error,"Msg ~p ",[Msg]),
_ = emqx_broker:safe_publish(Msg),
emqx_rule_metrics:inc_actions_success(ActId),
emqx_metrics:inc_msg(Msg);
republish(Selected, #{
?BINDING_KEYS := #{
'_Id' := ActId,
'Params' := _Params,
'TopicTks' := TopicTks,
'PayloadTks' := PayloadTks
} = Bind}) ->
?LOG(error,"Selected ~p ",[Selected]),
TargetQoS = maps:get('TargetQoS',Bind,0),
Msg = #message{
id = emqx_guid:gen(),
qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end,
from = ActId,
flags = #{dup => false, retain => false},
headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = emqx_rule_utils:proc_tmpl(PayloadTks, Selected),
timestamp = erlang:system_time(millisecond)
},
?LOG(error,"Msg ~p ",[Msg]),
_ = emqx_broker:safe_publish(Msg),
emqx_rule_metrics:inc_actions_success(ActId),
emqx_metrics:inc_msg(Msg);
republish(_Selected, Envs = #{
topic := Topic,
headers := #{republish_by := ActId},
?BINDING_KEYS := #{'_Id' := ActId}
}) ->
?LOG(error, " msg topic: ~p, target topic: ~p",
[Topic, ?bound_v('TargetTopic', Envs)]),
emqx_rule_metrics:inc_actions_error(?bound_v('_Id', Envs)).

View File

@ -17,10 +17,9 @@
-module(dgiot_bridge_actions).
-author("zwx").
-include_lib("dgiot/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
-define(RESOURCE_TYPE, 'dgiot_resource').
-define(RESOURCE_TYPE_DGIOT, 'dgiot_resource').
-define(RESOURCE_CONFIG_SPEC, #{
channel => #{
@ -30,95 +29,132 @@
default => <<"">>,
title => #{
en => <<"DGIOT Channel ID">>,
zh => <<"数蛙通道ID"/utf8>>
zh => <<"物联网通道ID"/utf8>>
},
description => #{
en => <<"DGIOT Channel ID">>,
zh => <<"数蛙通道ID"/utf8>>}
zh => <<"物联网通道ID"/utf8>>}
}
}).
-define(ACTION_DATA_SPEC, #{
'$resource' => ?ACTION_PARAM_RESOURCE,
target_topic => #{
order => 1,
type => string,
required => true,
default => <<"thing/${productid}/${clientid}/post">>,
title => #{en => <<"Target Topic">>,
zh => <<"目的主题"/utf8>>},
description => #{en => <<"To which topic the message will be republished">>,
zh => <<"重新发布消息到哪个主题"/utf8>>}
},
target_qos => #{
order => 2,
type => number,
enum => [-1, 0, 1, 2],
required => true,
default => 0,
title => #{en => <<"Target QoS">>,
zh => <<"目的 QoS"/utf8>>},
description => #{en => <<"The QoS Level to be uses when republishing the message. Set to -1 to use the original QoS">>,
zh => <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS"/utf8>>}
},
payload_tmpl => #{
order => 3,
type => string,
input => textarea,
required => false,
default => <<"${payload}">>,
title => #{en => <<"Payload Template">>,
zh => <<"消息内容模板"/utf8>>},
description => #{en => <<"The payload template, variable interpolation is supported">>,
zh => <<"消息内容模板,支持变量"/utf8>>}
}
}).
-define(ACTION_PARAM_RESOURCE, #{
type => string,
required => true,
title => #{en => <<"Resource ID">>, zh => <<"资源 ID"/utf8>>},
description => #{en => <<"Bind a resource to this action">>,
zh => <<"给动作绑定一个资源"/utf8>>}
}).
-resource_type(#{
name => ?RESOURCE_TYPE,
provider => dgiot,
name => ?RESOURCE_TYPE_DGIOT,
create => on_resource_create,
status => on_get_resource_status,
destroy => on_resource_destroy,
params => ?RESOURCE_CONFIG_SPEC,
title => #{en => <<"DGIOT Channel">>, zh => <<"数蛙通道"/utf8>>},
description => #{en => <<"MQTT message to DGIOT Channel">>, zh => <<"MQTT消息桥接到数蛙通道"/utf8>>}
title => #{en => <<"DGIOT Bridge">>, zh => <<"DGIOT Bridge"/utf8>>},
description => #{en => <<"MQTT Message Bridge">>, zh => <<"MQTT 消息桥接"/utf8>>}
}).
-rule_action(#{name => dgiot,
category => data_forward,
for => '$any',
types => [],
types => [?RESOURCE_TYPE_DGIOT],
create => on_action_create_dgiot,
params => ?RESOURCE_CONFIG_SPEC,
title => #{en => <<"DGIOT Channel">>,
zh => <<"数蛙通道"/utf8>>},
description => #{en => <<"Republish a MQTT message to DGIOT Channel">>,
zh => <<"重新发布消息到数蛙通道"/utf8>>}
params => ?ACTION_DATA_SPEC,
title => #{en => <<"DGIOT CHANNEL">>,
zh => <<"数蛙物联网通道"/utf8>>},
description => #{en => <<"Republish a MQTT message to dgiot channel">>,
zh => <<"重新发布消息到物联网通道"/utf8>>}
}).
-export([
on_resource_create/2,
on_get_resource_status/2,
on_resource_destroy/2
-export([on_resource_create/2
, on_get_resource_status/2
, on_resource_destroy/2
]).
%% callbacks for rule engine
-export([on_action_create_dgiot/2
, on_action_dgiot/2
]).
-export([on_action_dgiot/2
]).
-spec(on_resource_create(binary(), map()) -> map()).
on_resource_create(ResId, #{<<"channel">> := _ObjectId} = Params) ->
?LOG(error, "on_resource_destroy ~p,~p", [ResId, Params]),
#{<<"channel">> => false}.
%% application:ensure_all_started(dgiot_parse),
%% case dgiot_parse:get_object(<<"Channel">>, ObjectId) of
%% {ok, Channel} ->
%% #{<<"channel">> => Channel};
%% {error, #{<<"code">> := 101, <<"error">> := <<"Object not found.">>}} ->
%% case catch emqx_rule_registry:remove_resource(ResId) of
%% {'EXIT', {{throw, {dependency_exists, {rule, RuleId}}}, _}} ->
%% ok = emqx_rule_registry:remove_rule(RuleId),
%% ok = emqx_rule_registry:remove_resource(ResId);
%% _ ->
%% ok
%% end,
%% #{<<"channel">> => false};
%% {error, Reason} ->
%% ?LOG(error, "Resource create error, Channel:~s, Reason:~p", [ObjectId, Reason]),
%% #{<<"channel">> => false}
%% end.
on_resource_create(ResId, Conf) ->
?LOG(error, "ResId ~p, Conf ~p", [ResId, Conf]),
ChannelId = maps:get(<<"channel">>, Conf, <<"">>),
#{<<"channel">> => ChannelId}.
on_get_resource_status(ResId, #{<<"channel">> := Channel} = Params) ->
?LOG(error, "on_resource_destroy ~p,~p", [ResId, Params]),
case Channel of
false ->
#{is_alive => false};
-spec(on_get_resource_status(ResId :: binary(), Params :: map()) -> Status :: map()).
on_get_resource_status(_ResId, _Conf) ->
#{is_alive => true}.
on_resource_destroy(ResId, Conf) ->
?LOG(error, "on_resource_destroy ~p,~p", [ResId, Conf]),
case catch emqx_rule_registry:remove_resource(ResId) of
{'EXIT', {{throw, {dependency_exists, {rule, RuleId}}}, _}} ->
ok = emqx_rule_registry:remove_rule(RuleId),
ok = emqx_rule_registry:remove_resource(ResId);
_ ->
#{is_alive => true}
ok
end.
on_resource_destroy(ResId, Params) ->
?LOG(error, "on_resource_destroy ~p,~p", [ResId, Params]),
ok.
%%------------------------------------------------------------------------------
%% Action 'republish'
%% Action 'dgiot'
%%------------------------------------------------------------------------------
-spec on_action_create_dgiot(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
on_action_create_dgiot(Id, #{<<"channel">> := #{<<"cType">> := CType, <<"objectId">> := ChannelId}} = Params) ->
?LOG(error, "Id ~p", [Id, Params]),
fun(Msg, Env) ->
dgiot_channelx:do_message(CType, ChannelId, {rule, Msg, Env}, 60000)
end.
on_action_create_dgiot(_Id, Params = #{
<<"target_topic">> := TargetTopic,
<<"target_qos">> := _TargetQoS,
<<"payload_tmpl">> := PayloadTmpl
}) ->
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic),
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
?LOG(error, " msg topic: ~p, payload: ~p",
[TopicTks, PayloadTks]),
Params.
-spec on_action_dgiot(selected_data(), env_vars()) -> any().
on_action_dgiot(Selected, Envs ) ->
Channel = dgiot_mqtt:get_channel(Envs),
?LOG(error, "[dgiot] recursively Channel: ~p", [Channel]),
dgiot_mqtt:republish(Selected, Envs).
on_action_dgiot(Selected, _Envs) ->
?LOG(debug, "[republish] republish to, Payload: ~p", [Selected]).