feat: add dgiot_mqtt bridge

This commit is contained in:
jonhliu 2023-04-26 23:43:47 +08:00
parent 7733f72e32
commit e7f0b29567
6 changed files with 46 additions and 17 deletions

View File

@ -190,7 +190,7 @@ get_message(Selected, #{?BINDING_KEYS := #{
<<"target_topic">> := Target_topic <<"target_topic">> := Target_topic
} = Params } = Params
}} = Envs) -> }} = Envs) ->
Republish = maps:get( <<"republish">>, Params, <<"channel">>), Republish = maps:get(<<"republish">>, Params, <<"channel">>),
message(Selected, ActId, Payload_tmpl, Target_topic, Target_qos, Republish, Envs); message(Selected, ActId, Payload_tmpl, Target_topic, Target_qos, Republish, Envs);
get_message(Selected, #{?BINDING_KEYS := #{ get_message(Selected, #{?BINDING_KEYS := #{
@ -202,7 +202,7 @@ get_message(Selected, #{?BINDING_KEYS := #{
<<"target_topic">> := Target_topic <<"target_topic">> := Target_topic
} = Params } = Params
}} = Envs) -> }} = Envs) ->
Republish = maps:get( <<"republish">>, Params, <<"channel">>), Republish = maps:get(<<"republish">>, Params, <<"channel">>),
message(Selected, ActId, Payload_tmpl, Target_topic, Target_qos, Republish, Envs); message(Selected, ActId, Payload_tmpl, Target_topic, Target_qos, Republish, Envs);
get_message(_Selected, Envs) -> get_message(_Selected, Envs) ->
@ -232,7 +232,7 @@ message(Selected, ActId, Payload_tmpl, Target_topic, Target_qos, Republish, Envs
deviceid => DeviceId, deviceid => DeviceId,
republish_by => ActId, republish_by => ActId,
republish_mod => Republish, republish_mod => Republish,
'TargetQoS' => Target_qos, 'TargetQoS' => Target_qos,
topic => Topic, topic => Topic,
payload => Payload, payload => Payload,
timestamp => maps:get(timestamp, Envs, erlang:system_time(millisecond)) timestamp => maps:get(timestamp, Envs, erlang:system_time(millisecond))

View File

@ -23,7 +23,7 @@
%% API %% API
-dgiot_data("ets"). -dgiot_data("ets").
-export([init_ets/0]). -export([init_ets/0, send/3]).
-export([start/2]). -export([start/2]).
-export([init/3, handle_event/3, handle_message/2, handle_init/1, stop/3]). -export([init/3, handle_event/3, handle_message/2, handle_init/1, stop/3]).
@ -154,6 +154,12 @@ init(?TYPE, ChannelId, ChannelArgs) ->
State = #state{ State = #state{
id = ChannelId id = ChannelId
}, },
case dgiot_bridge:get_products(ChannelId) of
{ok, ?TYPE, ProductIds} ->
[dgiot_data:insert(?DGIOT_MQTT_WORK, ProductId, ChannelId) || ProductId <- ProductIds];
_ ->
pass
end,
{ok, State, dgiot_mqttc_worker:childSpec(ChannelId, ChannelArgs)}. {ok, State, dgiot_mqttc_worker:childSpec(ChannelId, ChannelArgs)}.
%% %%
@ -173,3 +179,25 @@ handle_message(Message, State) ->
stop(ChannelType, ChannelId, _State) -> stop(ChannelType, ChannelId, _State) ->
?LOG(info, "channel stop ~p,~p", [ChannelType, ChannelId]), ?LOG(info, "channel stop ~p,~p", [ChannelType, ChannelId]),
ok. ok.
send(bridge, Topic, Payload) ->
case dgiot_mqtt:has_routes(<<"bridge/#">>) of
true ->
dgiot_mqtt:publish(Topic, <<"bridge/", Topic/binary>>, Payload);
_ ->
pass
end;
send(ProductId, Topic, Payload) ->
case dgiot_data:get(?DGIOT_MQTT_WORK, ProductId) of
not_find ->
case dgiot_mqtt:has_routes(<<"forward/#">>) of
true ->
dgiot_mqtt:publish(ProductId, <<"forward/", Topic/binary>>, Payload);
_ ->
pass
end;
ChannelId ->
dgiot_channelx:do_message(ChannelId, {forward, Topic, Payload})
end.

View File

@ -65,6 +65,11 @@ handle_info({publish, #{payload := Payload, topic := <<"bridge/", Topic/binary>>
dgiot_mqtt:publish(ChannelId, Topic, Payload), dgiot_mqtt:publish(ChannelId, Topic, Payload),
{noreply, State}; {noreply, State};
handle_info({forward, Topic, Payload}, #dclient{client = Client, channel = ChannelId} = State) ->
dgiot_bridge:send_log(ChannelId, "edge to cloud: Topic ~p Payload ~p ~n", [Topic, Payload]),
emqtt:publish(Client, Topic, Payload),
{noreply, State};
handle_info({deliver, _, Msg}, #dclient{client = Client, channel = ChannelId} = State) -> handle_info({deliver, _, Msg}, #dclient{client = Client, channel = ChannelId} = State) ->
case dgiot_mqtt:get_topic(Msg) of case dgiot_mqtt:get_topic(Msg) of
<<"forward/", Topic/binary>> -> <<"forward/", Topic/binary>> ->

View File

@ -39,7 +39,8 @@ put('before', #{<<"id">> := DeviceId, <<"profile">> := UserProfile} = Device) ->
_ -> _ ->
<<"$dg/device/", ProductId/binary, "/", Devaddr/binary, "/profile">> <<"$dg/device/", ProductId/binary, "/", Devaddr/binary, "/profile">>
end, end,
dgiot_mqtt:publish(DeviceId, ProfileTopic, jsx:encode(UserProfile)); dgiot_mqtt:publish(DeviceId, ProfileTopic, jsx:encode(UserProfile)),
dgiot_mqttc_channel:send(bridge, ProfileTopic, jsx:encode(UserProfile));
_ -> _ ->
pass pass
end; end;

View File

@ -349,18 +349,10 @@ dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage) ->
%% %%
NotificationTopic = <<"$dg/user/alarm/", ProductId/binary, "/", DeviceId/binary, "/properties/report">>, NotificationTopic = <<"$dg/user/alarm/", ProductId/binary, "/", DeviceId/binary, "/properties/report">>,
dgiot_mqtt:publish(DeviceId, NotificationTopic, jsx:encode(AllData)), dgiot_mqtt:publish(DeviceId, NotificationTopic, jsx:encode(AllData)),
%% %%
case dgiot_mqtt:has_routes(<<"forward/#">>) of
true ->
ForwardTopic = <<"forward/$dg/thing/", ProductId/binary, "/", DevAddr/binary, "/properties/report">>,
dgiot_mqtt:publish(DeviceId, ForwardTopic, jsx:encode(AllData));
_ ->
pass
end,
%%
ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>), ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>),
dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId, DeviceId, AllData}), dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId, DeviceId, AllData}),
%% save td %% save td
dgiot_tdengine_adapter:save(ProductId, DevAddr, Storage), dgiot_tdengine_adapter:save(ProductId, DevAddr, Storage),
Channel = dgiot_product_channel:get_taskchannel(ProductId), Channel = dgiot_product_channel:get_taskchannel(ProductId),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(jsx:encode(Storage))]), dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(jsx:encode(Storage))]),

View File

@ -114,6 +114,7 @@ handle_info({dclient_ack, Topic, Payload}, #dclient{channel = ChannelId, userdat
[<<"$dg">>, <<"thing">>, ProductId, DevAddr, <<"properties">>, <<"report">>] -> [<<"$dg">>, <<"thing">>, ProductId, DevAddr, <<"properties">>, <<"report">>] ->
dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), ProductId, DevAddr, "~s ~p recv => ~p ~ts ", [?FILE, ?LINE, Topic, unicode:characters_to_list(jsx:encode(Payload))]), dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), ProductId, DevAddr, "~s ~p recv => ~p ~ts ", [?FILE, ?LINE, Topic, unicode:characters_to_list(jsx:encode(Payload))]),
dgiot_task:save_td(ProductId, DevAddr, Payload, #{}), dgiot_task:save_td(ProductId, DevAddr, Payload, #{}),
dgiot_mqttc_channel:send(ProductId, Topic, Payload),
{noreply, send_msg(State#dclient{userdata = Usedata#device_task{product = ProductId, devaddr = DevAddr}})}; {noreply, send_msg(State#dclient{userdata = Usedata#device_task{product = ProductId, devaddr = DevAddr}})};
_ -> _ ->
io:format("~s ~p Topic = ~p.~n", [?FILE, ?LINE, Topic]), io:format("~s ~p Topic = ~p.~n", [?FILE, ?LINE, Topic]),
@ -145,8 +146,10 @@ send_msg(#dclient{channel = ChannelId, userdata = #device_task{ref = Ref, produc
case X of case X of
{InstructOrder, _, Identifier1, DataSource} -> {InstructOrder, _, Identifier1, DataSource} ->
Topic = <<"$dg/device/", Product/binary, "/", DevAddr/binary, "/properties">>, Topic = <<"$dg/device/", Product/binary, "/", DevAddr/binary, "/properties">>,
%% io:format("~s ~p DataSource = ~p.~n", [?FILE, ?LINE, DataSource]), Payload = jsx:encode(DataSource),
dgiot_mqtt:publish(dgiot_utils:to_binary(ChannelId), Topic, jsx:encode(DataSource)), %% io:format("~s ~p DataSource = ~p.~n", [?FILE, ?LINE, DataSource]),
dgiot_mqtt:publish(dgiot_utils:to_binary(ChannelId), Topic, Payload),
dgiot_mqttc_channel:send(bridge, Topic, Payload),
dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), Product, DevAddr, "~s ~p to dev => ~ts: ~ts", [?FILE, ?LINE, unicode:characters_to_list(Topic), unicode:characters_to_list(jsx:encode(DataSource))]), dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), Product, DevAddr, "~s ~p to dev => ~ts: ~ts", [?FILE, ?LINE, unicode:characters_to_list(Topic), unicode:characters_to_list(jsx:encode(DataSource))]),
{Count + 1, Acc ++ [DataSource], Acc1 ++ [Identifier1]}; {Count + 1, Acc ++ [DataSource], Acc1 ++ [Identifier1]};
_ -> _ ->