feat: add dgiot_client

This commit is contained in:
jhonliu 2022-05-23 10:40:43 +08:00
parent 59a7efaa1e
commit b3bb039c1b
21 changed files with 538 additions and 712 deletions

View File

@ -18,7 +18,6 @@
-define(GLOBAL_TOPIC, <<"global/dgiot">>). -define(GLOBAL_TOPIC, <<"global/dgiot">>).
-define(DCACHE, dgiotdiskcache). -define(DCACHE, dgiotdiskcache).
-define(DEFREGISTRY, dgiot_global). -define(DEFREGISTRY, dgiot_global).
-define(DGIOT_CLIENT(Id), binary_to_atom(<<"dgiot_client_", Id/binary>>)).
-define(CHILD(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}). -define(CHILD(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).
-define(CHILD2(I, Mod, Type, Args), {I, {Mod, start_link, Args}, permanent, 5000, Type, [Mod]}). -define(CHILD2(I, Mod, Type, Args), {I, {Mod, start_link, Args}, permanent, 5000, Type, [Mod]}).

View File

@ -0,0 +1,39 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-author("johnliu").
-record(dclock, {
nexttime :: non_neg_integer(), %%
endtime :: non_neg_integer(), %%
freq :: non_neg_integer(), %%
round :: non_neg_integer(), %%
rand :: boolean() %% ,
}).
-record(dclient, {
channel :: atom(), %%
client :: binary(), %%
status :: integer(), %% client的状态值
clock :: #dclock{}, %% client的闹铃
userdata %%
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% dclient的状态字
-define(DCLIENT_SUCCESS, 0). % CLIENT运行正常
-define(DCLIENT_INTIALIZED, 1). % CLIENT初始化状态

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2021-2022 DGIOT Technologies Co., Ltd. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -15,15 +15,21 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(dgiot_client). -module(dgiot_client).
-author("kenneth"). -author("johnliu").
-include("dgiot.hrl"). -include("dgiot.hrl").
-include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot/include/logger.hrl").
%% API %% API
-export([register/3, start_link/2, start/2, stop/1, stop/2, get/2, restart/2, save/2, notify/3, set_consumer/2, get_consumer/1]). -export([register/3, start_link/2, add_clock/3, notify/3, add/2, set_consumer/2, get_consumer/1]).
-export([start/2, start/3, stop/1, stop/2, stop/3, restart/2, get/2, send/4]).
-export([get_nexttime/2, send_after/4]).
-export([add_clock/3]). -type(result() :: any()). %% todo
%% @doc client的通道管理池子
-spec register(atom() | binary(), atom(), map()) -> result().
register(ChannelId, Sup, State) when is_binary(ChannelId) ->
register(binary_to_atom(ChannelId), Sup, State);
register(ChannelId, Sup, State) -> register(ChannelId, Sup, State) ->
case dgiot_data:get({client, ChannelId}) of case dgiot_data:get({client, ChannelId}) of
not_find -> not_find ->
@ -32,29 +38,69 @@ register(ChannelId, Sup, State) ->
pass pass
end, end,
set_consumer(ChannelId, 100), set_consumer(ChannelId, 100),
dgiot_data:init(?DGIOT_CLIENT(ChannelId)), dgiot_data:init(ChannelId),
dgiot_data:delete({start_client, ChannelId}), dgiot_data:delete({start_client, ChannelId}),
dgiot_data:delete({stop_client, ChannelId}), dgiot_data:delete({stop_client, ChannelId}),
ChildSpec = dgiot:child_spec(Sup, supervisor, [?DGIOT_CLIENT(ChannelId)]), ChildSpec = dgiot:child_spec(Sup, supervisor, [ChannelId]),
[ChildSpec]. [ChildSpec].
save(ChannelId, ClientId) -> %% @doc client的Pid号
dgiot_data:insert(?DGIOT_CLIENT(ChannelId), ClientId, self()). -spec add(atom() | binary(), binary()) -> result().
add(ChannelId, ClientId) when is_binary(ChannelId) ->
add(binary_to_atom(ChannelId), ClientId);
add(ChannelId, ClientId) ->
dgiot_data:insert(ChannelId, ClientId, self()).
%% @doc client
-spec start(atom() | binary(), binary()) -> result().
start(ChannelId, ClientId) when is_binary(ChannelId) ->
start(binary_to_atom(ChannelId), ClientId);
start(ChannelId, ClientId) -> start(ChannelId, ClientId) ->
start(ChannelId, ClientId, #{}).
%% @doc client,
-spec start(atom() | binary(), binary(), map()) -> result().
start(ChannelId, ClientId, Args) when is_binary(ChannelId) ->
start(binary_to_atom(ChannelId), ClientId, Args);
start(ChannelId, ClientId, Args) ->
case dgiot_data:get({client, ChannelId}) of case dgiot_data:get({client, ChannelId}) of
State when is_map(State) -> State when is_map(State) ->
supervisor:start_child(?DGIOT_CLIENT(ChannelId), [State#{<<"client">> => ClientId}]); supervisor:start_child(ChannelId, [maps:merge(State#{<<"client">> => ClientId}, Args)]);
_ -> _ ->
pass pass
end. end.
%% @doc client
-spec stop(atom() | binary()) -> result().
stop(ChannelId) when is_binary(ChannelId) ->
stop(binary_to_atom(ChannelId));
stop(ChannelId) ->
case ets:info(ChannelId) of
undefined ->
pass;
_ ->
Fun =
fun
({_Key, Pid}) when is_pid(Pid) ->
supervisor:terminate_child(ChannelId, Pid);
(_) ->
pass
end,
dgiot_data:loop(ChannelId, Fun),
dgiot_data:clear(ChannelId)
end.
%% @doc stop client
-spec stop(atom() | binary(), binary()) -> result().
stop(ChannelId, ClientId) when is_binary(ChannelId) ->
stop(binary_to_atom(ChannelId), ClientId);
stop(ChannelId, ClientId) -> stop(ChannelId, ClientId) ->
case dgiot_data:get(?DGIOT_CLIENT(ChannelId), ClientId) of case dgiot_data:get(ChannelId, ClientId) of
Pid when is_pid(Pid) -> Pid when is_pid(Pid) ->
case is_process_alive(Pid) of case is_process_alive(Pid) of
true -> true ->
supervisor:terminate_child(?DGIOT_CLIENT(ChannelId), Pid); io:format("~s ~p DtuId = ~p. Pid ~p ~n", [?FILE, ?LINE, ChannelId,Pid]),
supervisor:terminate_child(ChannelId, Pid);
_ -> _ ->
pass pass
end; end;
@ -62,25 +108,23 @@ stop(ChannelId, ClientId) ->
pass pass
end. end.
%% client %% @doc stop client
stop(ChannelId) -> -spec stop(atom() | binary(), binary(), non_neg_integer()) -> result().
case ets:info(?DGIOT_CLIENT(ChannelId)) of stop(ChannelId, ClientId, EndTime) when is_binary(ChannelId) ->
undefined -> stop(binary_to_atom(ChannelId), ClientId, EndTime);
pass; stop(ChannelId, ClientId, EndTime) ->
_ -> NowTime = dgiot_datetime:nowstamp(),
Fun = case NowTime > EndTime of
fun true ->
({_Key, Pid}) when is_pid(Pid) -> stop(ChannelId, ClientId)
supervisor:terminate_child(?DGIOT_CLIENT(ChannelId), Pid);
(_) ->
pass
end,
dgiot_data:loop(?DGIOT_CLIENT(ChannelId), Fun),
dgiot_data:clear(?DGIOT_CLIENT(ChannelId))
end. end.
%% @doc restart client
-spec restart(atom() | binary(), binary()) -> result().
restart(ChannelId, ClientId) when is_binary(ChannelId) ->
restart(binary_to_atom(ChannelId), ClientId);
restart(ChannelId, ClientId) -> restart(ChannelId, ClientId) ->
case dgiot_data:get(?DGIOT_CLIENT(ChannelId), ClientId) of case dgiot_data:get(ChannelId, ClientId) of
Pid when is_pid(Pid) -> Pid when is_pid(Pid) ->
case is_process_alive(Pid) of case is_process_alive(Pid) of
true -> true ->
@ -93,8 +137,12 @@ restart(ChannelId, ClientId) ->
start(ChannelId, ClientId) start(ChannelId, ClientId)
end. end.
%% @doc get client info
-spec get(atom() | binary(), binary()) -> result().
get(ChannelId, ClientId) when is_binary(ChannelId) ->
get(binary_to_atom(ChannelId), ClientId);
get(ChannelId, ClientId) -> get(ChannelId, ClientId) ->
case dgiot_data:get(?DGIOT_CLIENT(ChannelId), ClientId) of case dgiot_data:get(ChannelId, ClientId) of
Pid when is_pid(Pid) -> Pid when is_pid(Pid) ->
case is_process_alive(Pid) of case is_process_alive(Pid) of
true -> true ->
@ -106,8 +154,28 @@ get(ChannelId, ClientId) ->
offline offline
end. end.
%% @doc send message to client
-spec send(atom() | binary(), binary(), binary(), binary() | map()) -> result().
send(ChannelId, ClientId, Topic, Payload) when is_binary(ChannelId) ->
send(binary_to_atom(ChannelId), ClientId, Topic, Payload);
send(ChannelId, ClientId, Topic, Payload) ->
case dgiot_data:get(ChannelId, ClientId) of
Pid when is_pid(Pid) ->
case is_process_alive(Pid) of
true ->
Pid ! {dclient_ack, Topic, Payload},
ok;
false ->
fasle
end;
_ ->
fasle
end.
%% @doc client start_link
-spec start_link(atom(), map()) -> result().
start_link(Module, #{<<"channel">> := ChannelId, <<"client">> := Client} = State) -> start_link(Module, #{<<"channel">> := ChannelId, <<"client">> := Client} = State) ->
case dgiot_data:lookup(?DGIOT_CLIENT(ChannelId), Client) of case dgiot_data:lookup(dgiot_utils:to_atom(ChannelId), Client) of
{ok, Pid} when is_pid(Pid) -> {ok, Pid} when is_pid(Pid) ->
case is_process_alive(Pid) of case is_process_alive(Pid) of
true -> true ->
@ -119,20 +187,58 @@ start_link(Module, #{<<"channel">> := ChannelId, <<"client">> := Client} = State
gen_server:start_link(Module, [State], []) gen_server:start_link(Module, [State], [])
end. end.
%% @doc
-spec send_after(integer(), integer(), boolean(), any()) -> result().
send_after(RetryTime, Freq, true, Msg) ->
Seed = Freq * 200, % 20%
Rand = rand:uniform(Seed),
erlang:send_after(RetryTime * 1000 + Rand, self(), Msg);
send_after(RetryTime, _Freq, _, Msg) ->
erlang:send_after(RetryTime * 1000, self(), Msg).
get_nexttime(NextTime, Freq) ->
NowTime = dgiot_datetime:nowstamp(),
get_nexttime(NowTime, Freq, NextTime).
get_nexttime(NowTime, Freq, NextTime) when (NextTime > NowTime) ->
RetryTime = NextTime - NowTime,
erlang:send_after(RetryTime * 1000, self(), next_time),
NextTime + Freq;
get_nexttime(NowTime, Freq, NextTime) ->
get_nexttime(NowTime, Freq, NextTime + Freq).
%% @doc
-spec set_consumer(binary() | atom(), integer()) -> result().
set_consumer(ChannelId, PoolSize) when is_binary(ChannelId) ->
set_consumer(binary_to_atom(ChannelId), PoolSize);
set_consumer(ChannelId, PoolSize) -> set_consumer(ChannelId, PoolSize) ->
dgiot_data:set_consumer(?DGIOT_CLIENT(ChannelId), PoolSize). dgiot_data:set_consumer(ChannelId, PoolSize).
%% @doc
-spec get_consumer(binary() | atom()) -> result().
get_consumer(ChannelId) when is_binary(ChannelId) ->
get_consumer(binary_to_atom(ChannelId));
get_consumer(ChannelId) -> get_consumer(ChannelId) ->
dgiot_data:get_consumer(?DGIOT_CLIENT(ChannelId), 1). dgiot_data:get_consumer(ChannelId, 1).
%% , 10s %% , 10s
%% @doc
-spec add_clock(binary() | atom(), binary(), binary()) -> result().
add_clock(Channel, Start_time, End_time) when is_binary(Channel) ->
add_clock(dgiot_utils:to_atom(Channel), Start_time, End_time);
add_clock(Channel, Start_time, End_time) -> add_clock(Channel, Start_time, End_time) ->
io:format("Channel ~p, Start_time ~p, End_time ~p",[Channel, Start_time, End_time]), BinChannel = dgiot_utils:to_binary(Channel),
dgiot_cron:push(Channel, dgiot_datetime:to_localtime(Start_time), {?MODULE, notify, [Channel, start_client]}), dgiot_cron:push(BinChannel, dgiot_datetime:to_localtime(Start_time), {?MODULE, notify, [Channel, start_client]}),
dgiot_cron:push(<<Channel/binary,"_stop">>, dgiot_datetime:to_localtime(End_time), {?MODULE, notify, [Channel, stop_client]}). dgiot_cron:push(<<BinChannel/binary, "_stop">>, dgiot_datetime:to_localtime(End_time), {?MODULE, notify, [Channel, stop_client]}).
%% , 10s
%% @doc
-spec notify(any(), binary() | atom(), atom()) -> result().
notify(_Task, Channel, Type) when is_binary(Channel) ->
notify(_Task, binary_to_atom(Channel), Type);
notify(_Task, Channel, Type) -> notify(_Task, Channel, Type) ->
dgiot_channelx:do_message(Channel, Type), dgiot_channelx:do_message(atom_to_binary(Channel), Type),
timer:sleep(50), timer:sleep(50),
dgiot_data:insert({Type,Channel}, Type). dgiot_data:insert({Type, Channel}, Type).

View File

@ -28,6 +28,7 @@
, unsubscribe/1 , unsubscribe/1
, publish/3 , publish/3
, publish/4 , publish/4
, message/3
, shared_sub/3 , shared_sub/3
, shared_unsub/3 , shared_unsub/3
, get_payload/1 , get_payload/1
@ -65,6 +66,9 @@ publish(Client, Topic, Payload, check_route) ->
publish(Client, Topic, Payload, _) -> publish(Client, Topic, Payload, _) ->
publish(Client, Topic, Payload). publish(Client, Topic, Payload).
message(Client, Topic, Payload) ->
emqx_message:make(dgiot_utils:to_binary(Client), 0, Topic, Payload).
shared_sub(Group, Topic, SubPid) -> shared_sub(Group, Topic, SubPid) ->
emqx_shared_sub:subscribe(Group, Topic, SubPid). emqx_shared_sub:subscribe(Group, Topic, SubPid).

View File

@ -20,28 +20,21 @@
-include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot/include/logger.hrl").
-behaviour(gen_server). -behaviour(gen_server).
%% API %% API
-export([start_link/6, start_link/4, start_link/5, start_link/3, start_link/7, send/2]). -export([start_link/1, send/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {host, port, child = #tcp{}, mod, reconnect_times, reconnect_sleep = 30}). -record(state, {host, port, child = #tcp{}, mod, reconnect_times, reconnect_sleep = 30}).
-define(TIMEOUT, 10000). -define(TIMEOUT, 10000).
-define(TCP_OPTIONS, [binary, {active, once}, {packet, raw}, {reuseaddr, false}, {send_timeout, ?TIMEOUT}]). -define(TCP_OPTIONS, [binary, {active, once}, {packet, raw}, {reuseaddr, false}, {send_timeout, ?TIMEOUT}]).
start_link(Args) ->
dgiot_client:start_link(?MODULE, Args).
start_link(Mod, Host, Port) -> init([#{<<"host">> := Host, <<"port">> := Port, <<"mod">> := Mod, <<"child">> := ChildState} = Args]) ->
start_link(Mod, Host, Port, undefined). io:format("~s ~p ~p ~n", [?FILE, ?LINE, ChildState]),
start_link(Mod, Host, Port, Args) ->
start_link(Mod, Host, Port, max, 30, Args).
start_link(Name, Mod, Host, Port, Args) ->
start_link(Name, Mod, Host, Port, max, 30, Args).
start_link(Mod, Host, Port, ReconnectTimes, ReconnectSleep, Args) ->
start_link(undefined, Mod, Host, Port, ReconnectTimes, ReconnectSleep, Args).
start_link(Name, Mod, Host, Port, ReconnectTimes, ReconnectSleep, Args) ->
Ip = dgiot_utils:to_list(Host), Ip = dgiot_utils:to_list(Host),
Port1 = dgiot_utils:to_int(Port), Port1 = dgiot_utils:to_int(Port),
ReconnectTimes = maps:get(<<"reconnect_times">>, Args, max),
ReconnectSleep = maps:get(<<"reconnect_sleep">>, Args, 30),
State = #state{ State = #state{
mod = Mod, mod = Mod,
host = Ip, host = Ip,
@ -49,19 +42,9 @@ start_link(Name, Mod, Host, Port, ReconnectTimes, ReconnectSleep, Args) ->
reconnect_times = ReconnectTimes, %% reconnect_times = ReconnectTimes, %%
reconnect_sleep = ReconnectSleep %% reconnect_sleep = ReconnectSleep %%
}, },
case Name of
undefined ->
gen_server:start_link(?MODULE, [State, Args], []);
_ ->
gen_server:start_link({local, list_to_atom(dgiot_utils:to_list(Name))}, ?MODULE, [State, Args], [])
end.
init([#state{mod = Mod} = State, Args]) ->
?LOG(info,"Mod ~p Args ~p ",[Mod, Args]),
Transport = gen_tcp, Transport = gen_tcp,
Child = #tcp{transport = Transport, socket = undefined}, Child = #tcp{transport = Transport, socket = undefined},
case Mod:init(Child#tcp{state = Args}) of case Mod:init(Child#tcp{state = ChildState}) of
{ok, ChildState} -> {ok, ChildState} ->
NewState = State#state{ NewState = State#state{
child = ChildState child = ChildState
@ -98,17 +81,17 @@ handle_cast(Msg, #state{mod = Mod, child = ChildState} = State) ->
%% 0 %% 0
handle_info(do_connect, State) -> handle_info(do_connect, State) ->
?LOG(info,"CONNECT CLOSE ~s:~p", [State#state.host, State#state.port]), ?LOG(info, "CONNECT CLOSE ~s:~p", [State#state.host, State#state.port]),
{stop, normal, State}; {stop, normal, State};
%% 0 %% 0
handle_info(connect_stop, State) -> handle_info(connect_stop, State) ->
?LOG(info,"CONNECT CLOSE ~s:~p", [State#state.host, State#state.port]), ?LOG(info, "CONNECT CLOSE ~s:~p", [State#state.host, State#state.port]),
{stop, normal, State}; {stop, normal, State};
handle_info({connection_ready, Socket}, #state{mod = Mod, child = ChildState} = State) -> handle_info({connection_ready, Socket}, #state{mod = Mod, child = ChildState} = State) ->
NewChildState = ChildState#tcp{socket = Socket}, NewChildState = ChildState#tcp{socket = Socket},
?LOG(info,"connection_ready ~p~n", [Socket]), ?LOG(info, "connection_ready ~p~n", [Socket]),
case Mod:handle_info(connection_ready, NewChildState) of case Mod:handle_info(connection_ready, NewChildState) of
{noreply, NewChildState1} -> {noreply, NewChildState1} ->
inet:setopts(Socket, [{active, once}]), inet:setopts(Socket, [{active, once}]),

View File

@ -14,9 +14,8 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(task_sup). -module(tcp_client_sup).
-include("dgiot_task.hrl").
-behaviour(supervisor). -behaviour(supervisor).
-export([start_link/1, init/1]). -export([start_link/1, init/1]).
@ -25,16 +24,10 @@ start_link(Name) ->
supervisor:start_link({local, Name}, ?MODULE, []). supervisor:start_link({local, Name}, ?MODULE, []).
init([]) -> init([]) ->
ChildSpec = [dgiot:child_spec(dgiot_task_worker, worker)], ChildSpec = [dgiot:child_spec(dgiot_tcp_client, worker)],
{ok, {{simple_one_for_one, 5, 10}, ChildSpec}}. {ok, {{simple_one_for_one, 5, 10}, ChildSpec}}.

View File

@ -17,11 +17,7 @@
-define(AMIS, <<"AMIS">>). -define(AMIS, <<"AMIS">>).
-record(state, { -record(state, {
id, id,
env = #{}, env = #{}
dtuaddr = <<>>,
step = login,
ref = undefined,
search = <<"quick">>
}). }).
-record(task, {oldque = [], newque = [], freq = 0, heart = 0, dashboardId = <<>>, sessiontoken = <<>>}). -record(task, {oldque = [], newque = [], freq = 0, heart = 0, dashboardId = <<>>, sessiontoken = <<>>}).

View File

@ -44,18 +44,6 @@
}). }).
%% %%
-params(#{ -params(#{
<<"port">> => #{
order => 1,
type => integer,
required => true,
default => 61888,
title => #{
zh => <<"端口"/utf8>>
},
description => #{
zh => <<"侦听端口"/utf8>>
}
},
<<"ico">> => #{ <<"ico">> => #{
order => 102, order => 102,
type => string, type => string,
@ -76,27 +64,10 @@
start(ChannelId, ChannelArgs) -> start(ChannelId, ChannelArgs) ->
dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs). dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs).
%% init(?TYPE, ChannelId, _Args) ->
init(?TYPE, ChannelId, #{ dgiot_parse_hook:subscribe(<<"View">>, post, ChannelId),
<<"product">> := Products, dgiot_parse_hook:subscribe(<<"View/*">>, put, ChannelId, [<<"isEnable">>]),
<<"search">> := Search}) -> dgiot_parse_hook:subscribe(<<"View/*">>, delete, ChannelId),
lists:map(fun(X) ->
case X of
{ProductId, #{<<"ACL">> := Acl, <<"nodeType">> := 1,<<"thing">> := Thing}} ->
dgiot_data:insert({amis, ChannelId}, {ProductId, Acl, maps:get(<<"properties">>,Thing,[])});
_ ->
?LOG(info,"X ~p", [X]),
pass
end
end, Products),
dgiot_data:set_consumer(ChannelId, 20),
State = #state{
id = ChannelId,
search = Search
},
{ok, State, []};
init(?TYPE, _ChannelId, _Args) ->
{ok, #{}, #{}}. {ok, #{}, #{}}.
handle_init(State) -> handle_init(State) ->
@ -106,19 +77,19 @@ handle_init(State) ->
handle_event(_EventId, _Event, State) -> handle_event(_EventId, _Event, State) ->
{ok, State}. {ok, State}.
% SELECT clientid, payload, topic FROM "meter" handle_message({sync_parse, Pid, 'after', post, _Token, <<"View">>, QueryData}, State) ->
% SELECT clientid, disconnected_at FROM "$events/client_disconnected" WHERE username = 'dgiot' io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, QueryData]),
% SELECT clientid, connected_at FROM "$events/client_connected" WHERE username = 'dgiot' dgiot_bamis_view:post('after', QueryData),
handle_message({rule, #{clientid := DtuAddr, connected_at := _ConnectedAt}, #{peername := PeerName} = _Context}, #state{id = _ChannelId} = State) ->
?LOG(error,"DtuAddr ~p PeerName ~p",[DtuAddr,PeerName] ),
{ok, State}; {ok, State};
handle_message({rule, #{clientid := DevAddr, disconnected_at := _DisconnectedAt}, _Context}, State) -> handle_message({sync_parse, _Pid, 'after', put, _Token, <<"View">>, QueryData}, State) ->
?LOG(error,"DevAddr ~p ",[DevAddr] ), %% io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, QueryData]),
dgiot_bamis_view:put('after', QueryData),
{ok, State}; {ok, State};
handle_message({rule, #{clientid := DevAddr, payload := Payload, topic := _Topic}, _Msg}, #state{id = ChannelId} = State) -> handle_message({sync_parse, _Pid, 'after', delete, _Token, <<"View">>, ObjectId}, State) ->
?LOG(error,"DevAddr ~p Payload ~p ChannelId ~p",[DevAddr,Payload,ChannelId] ), %% io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Pid, ObjectId]),
dgiot_bamis_view:delete('after', ObjectId),
{ok, State}; {ok, State};
handle_message(_Message, State) -> handle_message(_Message, State) ->

View File

@ -0,0 +1,48 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc dgiot_bamis Protocol
-module(dgiot_bamis_view).
-include("dgiot_bamis.hrl").
-include_lib("dgiot/include/logger.hrl").
-dgiot_swagger(<<"amis">>).
-export([
post/2,
put/2,
delete/2
]).
post('before', Args) ->
%% io:format("~s ~p ~p ~p~n", [?FILE, ?LINE, Args, Id]),
Args;
post('after', Data) ->
%% io:format("~s ~p ~p~n", [?FILE, ?LINE, Data]),
Data.
put('before', Args) ->
%% io:format("~s ~p ~p ~p~n", [?FILE, ?LINE, Args, Id]),
Args;
put('after', Data) ->
%% io:format("~s ~p ~p~n", [?FILE, ?LINE, Data]),
Data.
delete('before', Args) ->
%% io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, Args, Id]),
Args;
delete('after', Data) ->
Data.

View File

@ -17,6 +17,7 @@
-behavior(dgiot_channelx). -behavior(dgiot_channelx).
-include("dgiot_bridge.hrl"). -include("dgiot_bridge.hrl").
-include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot/include/logger.hrl").
-include_lib("dgiot/include/dgiot_socket.hrl").
-define(TYPE, <<"TCPC">>). -define(TYPE, <<"TCPC">>).
-record(state, {id, env}). -record(state, {id, env}).
%% API %% API
@ -27,6 +28,10 @@
%% Channel callback %% Channel callback
-export([init/3, handle_init/1, handle_event/3, handle_message/2, stop/3]). -export([init/3, handle_init/1, handle_event/3, handle_message/2, stop/3]).
%% tcp client callback
-define(MAX_BUFF_SIZE, 10 * 1024).
-export([init/1, handle_info/2, terminate/2]).
-channel(?TYPE). -channel(?TYPE).
-channel_type(#{ -channel_type(#{
cType => ?TYPE, cType => ?TYPE,
@ -64,42 +69,6 @@
zh => <<"端口"/utf8>> zh => <<"端口"/utf8>>
} }
}, },
<<"page_index">> => #{
order => 3,
type => integer,
required => false,
default => 1,
title => #{
zh => <<"起始记录号"/utf8>>
},
description => #{
zh => <<"起始记录号"/utf8>>
}
},
<<"page_size">> => #{
order => 4,
type => integer,
required => false,
default => 1,
title => #{
zh => <<"每页记录数"/utf8>>
},
description => #{
zh => <<"每页记录数"/utf8>>
}
},
<<"total">> => #{
order => 5,
type => integer,
required => false,
default => 1,
title => #{
zh => <<"总计页数"/utf8>>
},
description => #{
zh => <<"总计页数"/utf8>>
}
},
<<"ico">> => #{ <<"ico">> => #{
order => 102, order => 102,
type => string, type => string,
@ -121,18 +90,13 @@ start(ChannelId, ChannelArgs) ->
%% %%
init(?TYPE, ChannelId, Args) -> init(?TYPE, ChannelId,
State = #state{ #{<<"ip">> := Ip, <<"port">> := Port} = Args) ->
id = ChannelId, State = #state{id = ChannelId, env = Args},
env = Args NewArgs = #{<<"ip">> => Ip, <<"port">> => Port, <<"mod">> => ?MODULE, <<"child">> => #{}},
}, {ok, State, dgiot_client:register(ChannelId, tcp_client_sup, NewArgs)}.
{ok, State, []}.
handle_init(#state{ id = ChannelId, env = Args} = State) -> handle_init(State) ->
#{<<"product">> := Products, <<"ip">> := Ip, <<"port">> := Port} = Args,
lists:map(fun({ProductId, _Opt}) ->
start_client(ChannelId, ProductId, Ip, Port, Args)
end, Products),
{ok, State}. {ok, State}.
%% , %% ,
@ -147,28 +111,97 @@ stop(ChannelType, ChannelId, _State) ->
?LOG(warning, "channel stop ~p,~p", [ChannelType, ChannelId]), ?LOG(warning, "channel stop ~p,~p", [ChannelType, ChannelId]),
ok. ok.
start_client(ChannelId, ProductId, Ip, Port,
#{<<"page_index">> := PageIndex, <<"page_size">> := PageSize, <<"total">> := Total}) -> %% tcp client callback
Success = fun(Page) -> init(TCPState) ->
lists:map(fun(X) -> {ok, TCPState}.
case X of
#{<<"devaddr">> := DevAddr} -> handle_info(connection_ready, #tcp{state = #{productid := ProductId} = _State} = TCPState) ->
dgiot_tcpc_worker:start_connect(#{ rand:seed(exs1024),
<<"channelid">> => ChannelId, Time = erlang:round(rand:uniform() * 1 + 1) * 1000,
<<"auto_reconnect">> => 10, dgiot_tcp_client:send(TCPState, <<"login">>),
<<"reconnect_times">> => 3, case do_cmd(ProductId, connection_ready, <<>>, TCPState) of
<<"ip">> => Ip, default ->
<<"port">> => Port, erlang:send_after(Time, self(), login);
<<"productid">> => ProductId, _ ->
<<"hb">> => 60, pass
<<"devaddr">> => DevAddr end,
}); {noreply, TCPState};
_ ->
ok handle_info(#{<<"cmd">> := Cmd, <<"data">> := Data, <<"productId">> := ProductId}, TCPState) ->
end case do_cmd(ProductId, Cmd, Data, TCPState) of
end, Page) default ->
end, {noreply, TCPState};
Query = #{ Result ->
<<"where">> => #{<<"product">> => ProductId} Result
}, end;
dgiot_parse_loader:start(<<"Device">>, Query, PageIndex, PageSize, Total, Success).
handle_info(tcp_closed, #tcp{state = #{productid := ProductId} = _State} = TCPState) ->
case do_cmd(ProductId, tcp_closed, <<>>, TCPState) of
default ->
{noreply, TCPState};
Result ->
Result
end;
handle_info({tcp, Buff}, #tcp{buff = Old, state = #{productid := ProductId} = _State} = TCPState) ->
Data = <<Old/binary, Buff/binary>>,
case do_cmd(ProductId, tcp, Data, TCPState) of
default ->
{noreply, TCPState};
{noreply, Bin, State} ->
{noreply, TCPState#tcp{buff = Bin, state = State}};
{stop, Reason, State} ->
{stop, Reason, TCPState#tcp{state = State}};
Result ->
Result
end;
handle_info({deliver, _Topic, Msg}, #tcp{state = State} = TCPState) ->
Payload = dgiot_mqtt:get_payload(Msg),
?LOG(info, "Client recv from mqtt Payload ~p ~n ~p~n", [Payload, State]),
%% Message =
%% case jsx:is_json(Payload) of
%% true ->
%% jsx:decode(Payload, [{labels, binary}, return_maps]);
%% false ->
%% binary_to_term(Payload)
%% end,
{noreply, TCPState};
handle_info(login, #tcp{state = #{productid := ProductId, devaddr := DevAddr, hb := Hb} = _State} = TCPState) ->
Topic = <<"mock/", ProductId/binary, "/", DevAddr/binary>>,
dgiot_mqtt:subscribe(Topic),
erlang:send_after(Hb * 1000, self(), heartbeat),
?LOG(info, "~p ", [<<"login">>]),
dgiot_tcp_client:send(TCPState, <<"login">>),
{noreply, TCPState};
handle_info(heartbeat, #tcp{state = #{devaddr := _DevAddr, hb := Hb} = _State} = TCPState) ->
erlang:send_after(Hb * 1000, self(), heartbeat),
%% ?LOG(info,"~p ",[<<"heartbeat">>]),
dgiot_tcp_client:send(TCPState, <<"heartbeat">>),
{noreply, TCPState};
handle_info(_Info, TCPState) ->
{noreply, TCPState}.
terminate(_Reason, _TCPState) ->
ok.
do_cmd(ProductId, Cmd, Data, #tcp{state = #{id := ChannelId} = State} = TCPState) ->
case dgiot_hook:run_hook({tcp, ProductId}, [Cmd, Data, State]) of
{ok, NewState} ->
{noreply, TCPState#tcp{state = NewState}};
{reply, ProductId, Payload, NewState} ->
case dgiot_tcp_server:send(TCPState#tcp{state = NewState}, Payload) of
ok ->
ok;
{error, Reason} ->
dgiot_bridge:send_log(ChannelId, ProductId, "Send Fail, ~p, CMD:~p", [Cmd, Reason])
end,
{noreply, TCPState#tcp{state = NewState}};
_ ->
default
end.

View File

@ -29,7 +29,7 @@ post('after', #{<<"objectId">> := ProductId, <<"channel">> := Channel} = QueryDa
TaskchannelId = maps:get(<<"taskchannel">>, Channel, <<"">>), TaskchannelId = maps:get(<<"taskchannel">>, Channel, <<"">>),
Otherchannel = maps:get(<<"otherchannel">>, Channel, <<"">>), Otherchannel = maps:get(<<"otherchannel">>, Channel, <<"">>),
dgiot_product:add_product_relation(lists:flatten([Otherchannel]) ++ [TdchannelId] ++ [TaskchannelId], ProductId), dgiot_product:add_product_relation(lists:flatten([Otherchannel]) ++ [TdchannelId] ++ [TaskchannelId], ProductId),
%% io:format("~s ~p ~p ~n ",[?FILE,?LINE, QueryData]), %% io:format("~s ~p ~p ~n ",[?FILE,?LINE, QueryData]),
post('after', maps:without([<<"channel">>], QueryData)); post('after', maps:without([<<"channel">>], QueryData));
post('after', #{<<"objectId">> := ProductId, <<"producttemplet">> := #{<<"objectId">> := ProducttempletId}} = _QueryData) -> post('after', #{<<"objectId">> := ProductId, <<"producttemplet">> := #{<<"objectId">> := ProducttempletId}} = _QueryData) ->

View File

@ -194,7 +194,7 @@ create_scr_device(#{
SCREENADDR = <<"SCR_", GWAddr/binary>>, SCREENADDR = <<"SCR_", GWAddr/binary>>,
{_, #{<<"objectId">> := ProductId}} = {_, #{<<"objectId">> := ProductId}} =
scan_ipc(Env#{<<"IPCMAC">> => SCREENADDR}), scan_ipc(Env#{<<"IPCMAC">> => SCREENADDR}),
{_, #{<<"objectId">> := DeviceId1}} = dgiot_device:create_device(#{ dgiot_device:create_device(#{
<<"status">> => <<"ONLINE">>, <<"status">> => <<"ONLINE">>,
<<"devaddr">> => SCREENADDR, <<"devaddr">> => SCREENADDR,
<<"name">> => SCREENADDR, <<"name">> => SCREENADDR,
@ -208,8 +208,7 @@ create_scr_device(#{
<<"className">> => <<"Device">>, <<"className">> => <<"Device">>,
<<"objectId">> => GwDeviceId <<"objectId">> => GwDeviceId
} }
}), }).
create_instruct(Acl, ProductId, DeviceId1, SCREENADDR).
create_ipc_device(#{ create_ipc_device(#{
<<"IPCMAC">> := Mac, <<"IPCMAC">> := Mac,
@ -229,7 +228,7 @@ create_ipc_device(#{
DevAddr = <<"IPC_", IPCMAC/binary>>, DevAddr = <<"IPC_", IPCMAC/binary>>,
{_, #{<<"objectId">> := ProductId}} = {_, #{<<"objectId">> := ProductId}} =
scan_ipc(Env#{<<"IPCMAC">> => DevAddr}), scan_ipc(Env#{<<"IPCMAC">> => DevAddr}),
{_, #{<<"objectId">> := DeviceId}} = {_, #{<<"objectId">> := _DeviceId}} =
dgiot_device:create_device(#{ dgiot_device:create_device(#{
<<"status">> => <<"ONLINE">>, <<"status">> => <<"ONLINE">>,
<<"devaddr">> => DevAddr, <<"devaddr">> => DevAddr,
@ -246,7 +245,6 @@ create_ipc_device(#{
<<"objectId">> => GwDeviceId <<"objectId">> => GwDeviceId
} }
}), }),
create_instruct(Acl, ProductId, DeviceId, DevAddr),
Acc#{DevAddr => IPCIP} Acc#{DevAddr => IPCIP}
end end
end, #{}, MacList). end, #{}, MacList).
@ -261,28 +259,6 @@ get_ipc(DtuAddr) ->
end end
end, #{}, dgiot_device:get_sub_device(DtuAddr)). end, #{}, dgiot_device:get_sub_device(DtuAddr)).
create_instruct(ACL, ProductId, DeviceId, DevAddr) ->
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := Thing}} ->
#{<<"properties">> := Props} = Thing,
NewProps =
lists:foldl(fun(X, Acc) ->
case X of
#{<<"name">> := <<"点播地址"/utf8>>,
<<"dataForm">> := DataForm} ->
#{<<"dataForm">> := DataForm} = X,
Acc ++ [X#{<<"dataForm">> => DataForm#{<<"address">> => <<DevAddr/binary, "/VIDEO">>}}];
_ -> Acc
end
end, [], Props),
Pn = <<DevAddr/binary, "/FFMPEG">>,
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/", Pn/binary>>,
dgiot_mqtt:subscribe(Topic),
dgiot_instruct:create(ProductId, DeviceId, Pn, ACL, <<"all">>, Thing#{
<<"properties">> => NewProps
});
_ -> pass
end.
get_path(DevAddr) -> get_path(DevAddr) ->
{file, Here} = code:is_loaded(?MODULE), {file, Here} = code:is_loaded(?MODULE),

View File

@ -173,8 +173,7 @@ create_device(DeviceId, ProductId, DTUMAC, DTUIP, Dtutype) ->
case dgiot_parse:get_object(<<"Device">>, DeviceId) of case dgiot_parse:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"devaddr">> := _GWAddr}} -> {ok, #{<<"devaddr">> := _GWAddr}} ->
dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"ip">> => DTUIP, <<"status">> => <<"ONLINE">>}), dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"ip">> => DTUIP, <<"status">> => <<"ONLINE">>}),
dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC), dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC);
create_instruct(Acl, ProductId, DeviceId);
_ -> _ ->
dgiot_device:create_device(#{ dgiot_device:create_device(#{
<<"devaddr">> => DTUMAC, <<"devaddr">> => DTUMAC,
@ -188,8 +187,7 @@ create_device(DeviceId, ProductId, DTUMAC, DTUIP, Dtutype) ->
<<"brand">> => Dtutype, <<"brand">> => Dtutype,
<<"devModel">> => DevType <<"devModel">> => DevType
}), }),
dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC), dgiot_task:save_pnque(ProductId, DTUMAC, ProductId, DTUMAC)
create_instruct(Acl, ProductId, DeviceId)
end, end,
Productname = Productname =
case dgiot_parse:get_object(<<"Product">>, ProductId) of case dgiot_parse:get_object(<<"Product">>, ProductId) of
@ -204,21 +202,3 @@ create_device(DeviceId, ProductId, DTUMAC, DTUIP, Dtutype) ->
%% ?LOG(info, "Error2 ~p ", [Error2]), %% ?LOG(info, "Error2 ~p ", [Error2]),
{<<>>, <<>>} {<<>>, <<>>}
end. end.
create_instruct(ACL, DtuProductId, DtuDevId) ->
case dgiot_product:lookup_prod(DtuProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Properties}}} ->
lists:map(fun(Y) ->
case Y of
#{<<"dataSource">> := #{<<"slaveid">> := 256}} -> %%
pass;
#{<<"dataSource">> := #{<<"slaveid">> := SlaveId}} ->
Pn = dgiot_utils:to_binary(SlaveId),
%% ?LOG(info,"DtuProductId ~p DtuDevId ~p Pn ~p ACL ~p", [DtuProductId, DtuDevId, Pn, ACL]),
%% ?LOG(info,"Y ~p", [Y]),
dgiot_instruct:create(DtuProductId, DtuDevId, Pn, ACL, <<"all">>, #{<<"properties">> => [Y]});
_ -> pass
end
end, Properties);
_ -> pass
end.

View File

@ -100,7 +100,7 @@ init(?TYPE, ChannelId, #{
<<"ip">> := Ip, <<"ip">> := Ip,
<<"port">> := Port <<"port">> := Port
} = Args) -> } = Args) ->
{FileName, MinAddr, MaxAddr} = {_FileName, _MinAddr, _MaxAddr} =
case maps:find(<<"file">>, Args) of case maps:find(<<"file">>, Args) of
{ok, FileName1} -> {ok, FileName1} ->
{MinAddr1, MaxAddr1} = dgiot_product_csv:read_csv(ChannelId, FileName1), {MinAddr1, MaxAddr1} = dgiot_product_csv:read_csv(ChannelId, FileName1),
@ -109,18 +109,18 @@ init(?TYPE, ChannelId, #{
_ -> _ ->
{<<>>, 0, 100} {<<>>, 0, 100}
end, end,
dgiot_modbusc_tcp:start_connect(#{ %% dgiot_modbusc_tcp:start_connect(#{
<<"auto_reconnect">> => 10, %% <<"auto_reconnect">> => 10,
<<"reconnect_times">> => 3, %% <<"reconnect_times">> => 3,
<<"ip">> => Ip, %% <<"ip">> => Ip,
<<"port">> => Port, %% <<"port">> => Port,
<<"channelid">> => ChannelId, %% <<"channelid">> => ChannelId,
<<"hb">> => 10, %% <<"hb">> => 10,
<<"filename">> => FileName, %% <<"filename">> => FileName,
<<"minaddr">> => MinAddr, %% <<"minaddr">> => MinAddr,
<<"maxaddr">> => MaxAddr %% <<"maxaddr">> => MaxAddr
}), %% }),
{ok, #state{id = ChannelId}, []}. {ok, #state{id = ChannelId}, dgiot_client:register(ChannelId, tcp_client_sup, #{<<"ip">> => Ip, <<"port">> => Port})}.
handle_init(State) -> handle_init(State) ->
{ok, State}. {ok, State}.

View File

@ -18,36 +18,11 @@
-include_lib("dgiot/include/dgiot_socket.hrl"). -include_lib("dgiot/include/dgiot_socket.hrl").
%% API %% API
-export([init/1, handle_info/2, terminate/2]). -export([init/1, handle_info/2, terminate/2]).
-export([start_connect/1]).
-include("dgiot_modbus.hrl"). -include("dgiot_modbus.hrl").
-include_lib("dgiot/include/dgiot.hrl"). -include_lib("dgiot/include/dgiot.hrl").
-define(MAX_BUFF_SIZE, 10 * 1024). -define(MAX_BUFF_SIZE, 10 * 1024).
-include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot/include/logger.hrl").
start_connect(_Opts =
#{
<<"auto_reconnect">> := Recon,
<<"reconnect_times">> := ReTimes,
<<"ip">> := Ip,
<<"port">> := Port,
<<"channelid">> := ChannelId,
<<"hb">> := HB,
<<"filename">> := FileName,
<<"minaddr">> := MinAddr,
<<"maxaddr">> := Maxaddr
}) ->
State = #state{
id = ChannelId,
hb = HB,
env = #{
data => <<>>,
filename => FileName,
minaddr => MinAddr,
maxaddr => Maxaddr
}
},
dgiot_tcp_client:start_link(?MODULE, Ip, Port, Recon, ReTimes, State).
init(TCPState) -> init(TCPState) ->
{ok, TCPState}. {ok, TCPState}.

View File

@ -15,7 +15,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-author("johnliu"). -author("johnliu").
-define(TYPE, <<"INSTRUCT">>).
-define(DGIOT_TASK, dgiot_task). -define(DGIOT_TASK, dgiot_task).
-define(DGIOT_PNQUE, dgiot_pnque). -define(DGIOT_PNQUE, dgiot_pnque).
-define(DGIOT_DATA_CACHE, dgiot_data_cache). -define(DGIOT_DATA_CACHE, dgiot_data_cache).

View File

@ -1,273 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dgiot_instruct).
-author("jonhl").
-include("dgiot_task.hrl").
-include_lib("dgiot/include/logger.hrl").
%% API
-export([
createsub/6,
create/6,
create_group/7,
get_instruct/4,
get_child_instruct/3
]).
createsub(ProductId, DeviceId, DtuAddr, ACL, Rotation, #{<<"parentDtu">> := ParentDtu}) ->
lists:map(fun(X) ->
#{
<<"route">> := #{DtuAddr := Pn},
<<"ACL">> := ACL,
<<"product">> := #{<<"thing">> := Thing}
} = X,
NewPn = <<DtuAddr/binary, "/", Pn/binary>>,
create(ProductId, DeviceId, NewPn, ACL, Rotation, Thing#{<<"parentDtu">> => ParentDtu})
end, dgiot_device:get_sub_device(DtuAddr)),
ok.
create(ProductId, DeviceId, Pn, ACL, Rotation, #{<<"properties">> := Props}) ->
lists:map(fun(X) ->
case X of
#{<<"dataForm">> := #{<<"strategy">> := <<"计算值"/utf8>>}} ->
pass;
#{<<"dataForm">> := #{<<"strategy">> := <<"主动上报"/utf8>>}} ->
pass;
#{<<"accessMode">> := Op, <<"dataForm">> := #{<<"address">> := Di, <<"order">> := Order} = DataForm,
<<"name">> := Name, <<"identifier">> := Identifier, <<"required">> := Enable} ->
case Di of
<<"">> -> pass;
_ ->
ObjectId = dgiot_parse_id:get_instructid(DeviceId, Pn, Di),
case dgiot_parse:get_object(<<"Instruct">>, ObjectId) of
{ok, _} ->
pass;
_ ->
Map = #{<<"ACL">> => ACL, <<"enable">> => Enable,
<<"product">> => #{
<<"__type">> => <<"Pointer">>,
<<"className">> => <<"Product">>,
<<"objectId">> => ProductId
},
<<"device">> => #{
<<"__type">> => <<"Pointer">>,
<<"className">> => <<"Device">>,
<<"objectId">> => DeviceId
},
<<"name">> => Name, <<"order">> => dgiot_utils:to_binary(Order),
<<"pn">> => Pn, <<"di">> => Di,
<<"op">> => Op, <<"interval">> => 20,
<<"duration">> => 5, <<"rotation">> => Rotation,
<<"other">> => DataForm#{<<"identifier">> => Identifier}
},
dgiot_parse:create_object(<<"Instruct">>, Map)
end
end;
Other ->
?LOG(info, "Other ~p", [Other]),
pass
end
end, Props).
create_group(ProductId, DeviceId, Group, Pn, ACL, Rotation, #{<<"properties">> := Props} = Thing) ->
lists:map(fun(X) ->
#{
<<"accessMode">> := Op,
<<"dataForm">> := #{
<<"address">> := Di
},
<<"name">> := Name,
<<"identifier">> := Identifier,
<<"required">> := Enable
} = X,
case Di of
<<"">> -> pass;
_ -> case dgiot_parse:query_object(<<"Instruct">>, #{<<"where">> => #{
<<"product">> => ProductId,
<<"device">> => DeviceId,
<<"pn">> => Pn,
<<"di">> => Di}}) of
{ok, #{<<"results">> := []}} ->
Other = maps:without([<<"properties">>], Thing),
Map = #{
<<"ACL">> => ACL,
<<"enable">> => Enable,
<<"product">> => #{
<<"__type">> => <<"Pointer">>,
<<"className">> => <<"Product">>,
<<"objectId">> => ProductId
},
<<"device">> => #{
<<"__type">> => <<"Pointer">>,
<<"className">> => <<"Device">>,
<<"objectId">> => DeviceId
},
<<"name">> => Name,
<<"order">> => Group,
<<"pn">> => Pn,
<<"di">> => Di,
<<"op">> => Op,
<<"interval">> => 30,
<<"duration">> => 5,
<<"rotation">> => Rotation,
<<"other">> => Other#{<<"identifier">> => Identifier}
},
dgiot_parse:create_object(<<"Instruct">>, Map);
_ ->
pass
end
end
end, Props).
init_que(DeviceId, Round) ->
case dgiot_parse:query_object(<<"Instruct">>, #{<<"order">> => <<"-order">>, <<"where">> => #{<<"device">> => DeviceId}}) of
{ok, #{<<"results">> := []}} ->
[];
{ok, #{<<"results">> := List}} ->
NewList = lists:foldl(
fun(X, Acc) ->
case X of
#{<<"enable">> := true, <<"op">> := Op, <<"order">> := Order, <<"pn">> := Pn, <<"di">> := Di,
<<"interval">> := Interval, <<"other">> := DataForm} ->
Identifier = maps:get(<<"accessMode">>, DataForm, <<"">>),
AccessMode = maps:get(<<"accessMode">>, DataForm, Op),
Address = maps:get(<<"address">>, DataForm, Di),
Protocol = maps:get(<<"protocol">>, DataForm, <<"">>),
ThingRound = maps:get(<<"round">>, DataForm, <<"all">>),
InstructOrder = maps:get(<<"order">>, DataForm, Order),
Data = maps:get(<<"data">>, DataForm, <<"null">>),
Control = maps:get(<<"control">>, DataForm, "%d"),
NewData = dgiot_task:get_control(Round, Data, Control),
Strategy = dgiot_utils:to_int(maps:get(<<"strategy">>, DataForm, Interval)),
case ThingRound of
<<"all">> ->
Acc ++ [{Order, {InstructOrder, Strategy, Identifier, Pn, Address, AccessMode, NewData, Protocol, ThingRound}}];
Round ->
Acc ++ [{Order, {InstructOrder, Strategy, Identifier, Pn, Address, AccessMode, NewData, Protocol, ThingRound}}];
RoundList ->
case lists:member(Round, RoundList) of
true ->
Acc ++ [{Order, {InstructOrder, Strategy, Identifier, Pn, Address, AccessMode, NewData, Protocol, ThingRound}}];
false ->
Acc
end
end;
_ -> Acc
end
end, [], List),
lists:foldl(fun(X, Acc1) ->
{_, Y} = X,
Acc1 ++ [Y]
end, [], lists:keysort(1, NewList));
_ -> []
end.
get_instruct(ProductId, _DeviceId, Round, thing) ->
get_instruct(ProductId, Round);
get_instruct(ProductId, DeviceId, Round, instruct) ->
get_que(ProductId, DeviceId, Round).
get_instruct(ProductId, Round) ->
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} when length(Props) > 0 ->
{_, NewList} = lists:foldl(fun(X, Acc) ->
{Order, List} = Acc,
case X of
#{<<"dataForm">> := #{<<"strategy">> := <<"计算值"/utf8>>}} ->
Acc;
#{<<"dataForm">> := #{<<"strategy">> := <<"主动上报"/utf8>>}} ->
Acc;
#{<<"accessMode">> := AccessMode,
<<"identifier">> := Identifier,
<<"dataType">> := #{<<"specs">> := #{<<"min">> := Min}},
<<"dataForm">> := DataForm,
<<"dataSource">> := DataSource} ->
Protocol = maps:get(<<"protocol">>, DataForm, <<"">>),
NewDataSource = dgiot_task_data:get_datasource(Protocol, DataSource),
ThingRound = maps:get(<<"round">>, DataForm, <<"all">>),
InstructOrder = maps:get(<<"order">>, DataForm, Order),
Control = maps:get(<<"control">>, DataForm, "%d"),
NewData = dgiot_task:get_control(Round, Min, Control),
Strategy = dgiot_utils:to_int(maps:get(<<"strategy">>, DataForm, 20)),
BinRound = dgiot_utils:to_binary(Round),
case ThingRound of
<<"all">> ->
{Order + 1, List ++ [{InstructOrder, Strategy, Identifier, AccessMode, NewData, Protocol, NewDataSource, ThingRound}]};
BinRound ->
{Order + 1, List ++ [{InstructOrder, Strategy, Identifier, AccessMode, NewData, Protocol, NewDataSource, ThingRound}]};
Rounds ->
RoundList = binary:split(Rounds, <<",">>, [global]),
case lists:member(BinRound, RoundList) of
true ->
{Order + 1, List ++ [{InstructOrder, Strategy, Identifier, AccessMode, NewData, Protocol, NewDataSource, ThingRound}]};
false ->
Acc
end
end;
_ ->
Acc
end
end, {1, []}, Props),
lists:keysort(1, NewList);
_ ->
[]
end.
get_child_instruct(DeviceId, Round, thing) ->
case dgiot_parse:query_object(<<"Device">>, #{<<"where">> => #{<<"parentId">> => DeviceId}}) of
{ok, #{<<"results">> := ChildDevices}} ->
lists:foldl(fun(#{<<"product">> := #{<<"objectId">> := ProductId}}, Acc) ->
Acc ++ dgiot_instruct:get_instruct(ProductId, DeviceId, Round, thing)
end, [], ChildDevices);
_ ->
[]
end.
get_que(_ProductId, DeviceId, Round) ->
case dgiot_data:get({instuct, DeviceId}) of
not_find ->
Que = init_que(DeviceId, Round),
dgiot_data:insert({instuct, DeviceId}, Que),
Que;
Que ->
NewQue = get_que_(Que, Round),
dgiot_data:insert({instuct, DeviceId}, NewQue),
NewQue
end.
get_que_(Que, Round) ->
lists:foldl(fun(X, Acc) ->
case X of
{InstructOrder, Strategy, Identifier, Address, AccessMode, NewData, Protocol, ThingRound} ->
case ThingRound of
<<"all">> ->
Acc ++ [{InstructOrder, Strategy, Identifier, Address, AccessMode, NewData, Protocol, ThingRound}];
Round ->
Acc ++ [{InstructOrder, Strategy, Identifier, Address, AccessMode, NewData, Protocol, ThingRound}];
RoundList ->
case lists:member(Round, RoundList) of
true ->
Acc ++ [{InstructOrder, Strategy, Identifier, Address, AccessMode, NewData, Protocol, ThingRound}];
false ->
Acc
end
end;
_ ->
Acc
end
end, [], Que).

View File

@ -19,8 +19,8 @@
-include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot/include/logger.hrl").
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl"). -include_lib("dgiot_bridge/include/dgiot_bridge.hrl").
-export([start/1, start/2, save_pnque/4, get_pnque/1, del_pnque/1, save_td/4]). -export([start/1, start/2, send/3, get_pnque_len/1, save_pnque/4, get_pnque/1, del_pnque/1, save_td/4]).
-export([get_control/3, get_collection/4, get_calculated/2, string2value/2, string2value/3]). -export([get_control/3, get_collection/4, get_calculated/2, get_instruct/2, string2value/2, string2value/3]).
start(ChannelId) -> start(ChannelId) ->
lists:map(fun(Y) -> lists:map(fun(Y) ->
@ -35,6 +35,15 @@ start(ChannelId) ->
start(ChannelId, ClientId) -> start(ChannelId, ClientId) ->
dgiot_client:start(ChannelId, ClientId). dgiot_client:start(ChannelId, ClientId).
send(ProductId, DevAddr, Payload) ->
case dgiot_data:get({?TYPE, ProductId}) of
not_find ->
pass;
ChannelId ->
Topic = <<"$dg/thing/", ProductId/binary, "/", DevAddr/binary, "/properties/report">>,
dgiot_client:send(ChannelId, DevAddr, Topic, Payload)
end.
%% %%
get_calculated(ProductId, Ack) -> get_calculated(ProductId, Ack) ->
case dgiot_product:lookup_prod(ProductId) of case dgiot_product:lookup_prod(ProductId) of
@ -126,6 +135,52 @@ get_control(Round, Data, Control) ->
dgiot_task:string2value(Str1, <<"type">>) dgiot_task:string2value(Str1, <<"type">>)
end. end.
get_instruct(ProductId, Round) ->
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} when length(Props) > 0 ->
{_, NewList} = lists:foldl(fun(X, Acc) ->
{Seq, List} = Acc,
case X of
#{<<"dataForm">> := #{<<"strategy">> := <<"计算值"/utf8>>}} -> %%
Acc;
#{<<"dataForm">> := #{<<"strategy">> := <<"主动上报"/utf8>>}} -> %%
Acc;
#{<<"accessMode">> := AccessMode,
<<"identifier">> := Identifier,
<<"dataType">> := #{<<"specs">> := #{<<"min">> := Min}},
<<"dataForm">> := DataForm,
<<"dataSource">> := DataSource} ->
Protocol = maps:get(<<"protocol">>, DataForm, <<"Dlink">>),
NewDataSource = dgiot_task_data:get_datasource(Protocol, DataSource), %%
Order = maps:get(<<"order">>, DataForm, Seq), %%
Control = maps:get(<<"control">>, DataForm, "%d"), %%
Data = dgiot_task:get_control(Round, Min, Control), %%
Interval = dgiot_utils:to_int(maps:get(<<"strategy">>, DataForm, 20)), %%
ThingRound = maps:get(<<"round">>, DataForm, <<"all">>), %%
BinRound = dgiot_utils:to_binary(Round), %%
case ThingRound of
<<"all">> -> %%
{Seq + 1, List ++ [{Order, Interval, Identifier, AccessMode, Data, NewDataSource}]};
BinRound ->
{Seq + 1, List ++ [{Order, Interval, Identifier, AccessMode, Data, NewDataSource}]};
Rounds ->
RoundList = binary:split(Rounds, <<",">>, [global]),
case lists:member(BinRound, RoundList) of
true ->
{Seq + 1, List ++ [{Order, Interval, Identifier, AccessMode, Data, NewDataSource}]};
false ->
Acc
end
end;
_ ->
Acc
end
end, {1, []}, Props),
lists:keysort(1, NewList);
_ ->
[]
end.
string2value(Str, <<"TEXT">>) when is_list(Str) -> string2value(Str, <<"TEXT">>) when is_list(Str) ->
%% eralng语法中. %% eralng语法中.
case string:find(Str, "%%") of case string:find(Str, "%%") of
@ -186,6 +241,14 @@ save_pnque(DtuProductId, DtuAddr, ProductId, DevAddr) ->
supervisor:start_child(?TASK_SUP(Channel), [Args#{<<"dtuid">> => DtuId}]) supervisor:start_child(?TASK_SUP(Channel), [Args#{<<"dtuid">> => DtuId}])
end. end.
get_pnque_len(DtuId) ->
case dgiot_data:get(?DGIOT_PNQUE, DtuId) of
not_find ->
0;
PnQue ->
length(PnQue)
end.
get_pnque(DtuId) -> get_pnque(DtuId) ->
case dgiot_data:get(?DGIOT_PNQUE, DtuId) of case dgiot_data:get(?DGIOT_PNQUE, DtuId) of
not_find -> not_find ->

View File

@ -20,7 +20,6 @@
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl"). -include_lib("dgiot_bridge/include/dgiot_bridge.hrl").
-include("dgiot_task.hrl"). -include("dgiot_task.hrl").
-include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot/include/logger.hrl").
-define(TYPE, <<"INSTRUCT">>).
-record(state, {id, mod, product, env = #{}}). -record(state, {id, mod, product, env = #{}}).
-dgiot_data("ets"). -dgiot_data("ets").
@ -33,7 +32,6 @@
%% %%
-channel_type(#{ -channel_type(#{
cType => ?TYPE, cType => ?TYPE,
type => ?BRIDGE_CHL, type => ?BRIDGE_CHL,
title => #{ title => #{
@ -45,24 +43,8 @@
}). }).
%% %%
-params(#{ -params(#{
<<"mode">> => #{
order => 1,
type => enum,
required => false,
default => <<"物模型指令模式"/utf8>>,
enum => [
#{<<"value">> => <<"thing">>, <<"label">> => <<"物模型指令模式"/utf8>>},
#{<<"value">> => <<"instruct">>, <<"label">> => <<"设备指令模式"/utf8>>}
],
title => #{
zh => <<"指令模式"/utf8>>
},
description => #{
zh => <<"物模型指令模式:用产品物模型来生成采集指令;设备指令模式:设备配置的独立的指令"/utf8>>
}
},
<<"freq">> => #{ <<"freq">> => #{
order => 2, order => 1,
type => integer, type => integer,
required => false, required => false,
default => 180, default => 180,
@ -74,7 +56,7 @@
} }
}, },
<<"start_time">> => #{ <<"start_time">> => #{
order => 3, order => 2,
type => string, type => string,
required => false, required => false,
default => <<"2020-03-26 10:35:10"/utf8>>, default => <<"2020-03-26 10:35:10"/utf8>>,
@ -86,7 +68,7 @@
} }
}, },
<<"end_time">> => #{ <<"end_time">> => #{
order => 4, order => 3,
type => string, type => string,
required => false, required => false,
default => <<"2025-05-28 10:35:10"/utf8>>, default => <<"2025-05-28 10:35:10"/utf8>>,
@ -123,11 +105,15 @@ start(ChannelId, ChannelArgs) ->
%% %%
init(?TYPE, ChannelId, Args) -> init(?TYPE, ChannelId, Args) ->
NewArgs = maps:with([<<"freq">>, <<"mode">>], Args), #{<<"freq">> := Freq, <<"start_time">> := Start_time, <<"end_time">> := End_time} = Args,
#{<<"start_time">> := Start_time, <<"end_time">> := End_time} = Args,
dgiot_client:add_clock(ChannelId, Start_time, End_time), dgiot_client:add_clock(ChannelId, Start_time, End_time),
State = #state{id = ChannelId}, State = #state{id = ChannelId},
{ok, State, dgiot_client:register(ChannelId, task_sup, NewArgs#{<<"channel">> => ChannelId, <<"app">> => #{}})}. {ok, State, dgiot_client:register(ChannelId, task_sup, #{
<<"channel">> => ChannelId,
<<"starttime">> => dgiot_datetime:localtime_to_unixtime(dgiot_datetime:to_localtime(Start_time)),
<<"endtime">> => dgiot_datetime:localtime_to_unixtime(dgiot_datetime:to_localtime(End_time)),
<<"freq">> => Freq
})}.
handle_init(State) -> handle_init(State) ->
{ok, State}. {ok, State}.
@ -151,7 +137,7 @@ handle_message(start_client, #state{id = ChannelId} = State) ->
handle_message(stop_client, #state{id = ChannelId} = State) -> handle_message(stop_client, #state{id = ChannelId} = State) ->
io:format("~s ~p ChannelId =~p.~n", [?FILE, ?LINE, ChannelId]), io:format("~s ~p ChannelId =~p.~n", [?FILE, ?LINE, ChannelId]),
case dgiot_data:get({stop_client, ChannelId}) of case dgiot_data:get({stop_client, binary_to_atom(ChannelId)}) of
not_find -> not_find ->
dgiot_client:stop(ChannelId); dgiot_client:stop(ChannelId);
_ -> _ ->
@ -160,8 +146,8 @@ handle_message(stop_client, #state{id = ChannelId} = State) ->
{ok, State}; {ok, State};
handle_message(check_client, #state{id = ChannelId} = State) -> handle_message(check_client, #state{id = ChannelId} = State) ->
io:format("~s ~p ChannelId =~p.~n", [?FILE, ?LINE, ChannelId]), %% io:format("~s ~p ChannelId =~p.~n", [?FILE, ?LINE, ChannelId]),
case dgiot_data:get({stop_client, ChannelId}) of case dgiot_data:get({stop_client, binary_to_atom(ChannelId)}) of
not_find -> not_find ->
dgiot_task:start(ChannelId), dgiot_task:start(ChannelId),
erlang:send_after(1000 * 60 * 1, self(), check_client); erlang:send_after(1000 * 60 * 1, self(), check_client);

View File

@ -18,7 +18,7 @@
-include("dgiot_task.hrl"). -include("dgiot_task.hrl").
-include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot/include/logger.hrl").
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl"). -include_lib("dgiot_bridge/include/dgiot_bridge.hrl").
-export([get_userdata/6, get_datasource/2]). -export([get_userdata/6, get_datasource/2, get_ack/4]).
get_userdata(ProductId, Identifier, _DataForm, #{<<"type">> := <<"geopoint">>}, Payload, Acc) -> get_userdata(ProductId, Identifier, _DataForm, #{<<"type">> := <<"geopoint">>}, Payload, Acc) ->
case maps:find(Identifier, Payload) of case maps:find(Identifier, Payload) of
@ -59,6 +59,18 @@ get_userdata(_ProductId, Identifier, #{<<"collection">> := Collection} = DataFor
end. end.
get_ack(ProductId, Payload, Dis, Ack) ->
NewPayload =
maps:fold(fun(K, V, Acc) ->
case dgiot_data:get({protocol, K, ProductId}) of
not_find ->
Acc#{K => V};
Identifier ->
Acc#{Identifier => V}
end
end, #{}, Payload),
dgiot_task:get_collection(ProductId, Dis, NewPayload, maps:merge(Ack, NewPayload)).
get_datasource(Protocol, DataSource) -> get_datasource(Protocol, DataSource) ->
case catch dgiot_hook:run_hook({datasource, Protocol}, DataSource) of case catch dgiot_hook:run_hook({datasource, Protocol}, DataSource) of
{ok, [Rtn | _]} -> {ok, [Rtn | _]} ->

View File

@ -17,42 +17,39 @@
-module(dgiot_task_worker). -module(dgiot_task_worker).
-author("johnliu"). -author("johnliu").
-include("dgiot_task.hrl"). -include("dgiot_task.hrl").
-include_lib("dgiot/include/dgiot_client.hrl").
-include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot/include/logger.hrl").
-behaviour(gen_server). -behaviour(gen_server).
%% gen_server callbacks %% gen_server callbacks
-export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([start_link/1, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(device_task, {
-record(task, {mode = thing, tid, firstid, dtuid, product, devaddr, dis = [], que, round, ref, ack = #{}, appdata = #{}, ts = 0, freq = 0, interval = 5}). pnque_len = 0 :: integer(), %%
product :: atom(), %% ID
devaddr :: binary(), %%
dique = [] :: list(), %%
interval = 3 :: integer(), %%
appdata = #{} :: map() %%
}).
-define(CHILD(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}). -define(CHILD(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).
%%%===================================================================a
%%% APIa
%%%=================================================================== %%%===================================================================
%%% API start_link(Args) ->
%%%=================================================================== dgiot_client:start_link(?MODULE, Args).
start_link(State) ->
dgiot_client:start_link(?MODULE, State).
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
init([#{<<"channel">> := ChannelId, <<"client">> := DtuId, <<"mode">> := Mode, <<"freq">> := Freq} = State]) -> init([#{<<"channel">> := ChannelId, <<"client">> := ClientId, <<"starttime">> := StartTime, <<"endtime">> := EndTime, <<"freq">> := Freq}]) ->
case dgiot_task:get_pnque(DtuId) of dgiot_client:add(ChannelId, ClientId),
not_find -> erlang:send_after(20, self(), init),
io:format("~s ~p State ~p ~n", [?FILE, ?LINE, State]), dgiot_metrics:inc(dgiot_task, <<"task">>, 1),
{stop, normal, State}; NextTime = dgiot_client:get_nexttime(StartTime, Freq),
{ProductId, DevAddr} -> io:format("~s ~p DtuId = ~p.~n", [?FILE, ?LINE, NextTime]),
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr), Dclient = #dclient{channel = ChannelId, client = ClientId, status = ?DCLIENT_INTIALIZED, clock = #dclock{nexttime = NextTime, endtime = EndTime, freq = Freq, round = 0}},
Que = dgiot_instruct:get_instruct(ProductId, DeviceId, 1, dgiot_utils:to_atom(Mode)), {ok, Dclient};
%% ChildQue = dgiot_instruct:get_child_instruct(DeviceId, 1, dgiot_utils:to_atom(Mode)),
Nowstamp = dgiot_datetime:nowstamp(),
erlang:send_after(1000, self(), init),
Topic = <<"$dg/thing/", ProductId/binary, "/", DevAddr/binary, "/properties/report">>,
dgiot_mqtt:subscribe(Topic),
dgiot_metrics:inc(dgiot_task, <<"task">>, 1),
dgiot_client:save(ChannelId, DtuId),
{ok, #task{mode = dgiot_utils:to_atom(Mode), dtuid = DtuId, product = ProductId, devaddr = DevAddr,
tid = ChannelId, firstid = DeviceId, que = Que, round = 1, ts = Nowstamp, freq = Freq}}
end;
init(A) -> init(A) ->
?LOG(error, "A ~p ", [A]). ?LOG(error, "A ~p ", [A]).
@ -75,72 +72,45 @@ handle_info(stop, State) ->
erlang:garbage_collect(self()), erlang:garbage_collect(self()),
{stop, normal, State}; {stop, normal, State};
handle_info(init, #task{dtuid = DtuId, mode = Mode, round = Round} = State) -> %%
%% io:format("~s ~p DtuId = ~p.~n", [?FILE, ?LINE, DtuId]), handle_info({change_clock, NextTime, EndTime, Freq}, #dclient{clock = Clock} = Dclient) ->
case dgiot_task:get_pnque(DtuId) of {noreply, Dclient#dclient{clock = Clock#dclock{nexttime = NextTime, endtime = EndTime, freq = Freq}}};
%% ,
handle_info(next_time, #dclient{ channel = Channel, client = Client, userdata = UserData,
clock = #dclock{round = Round, nexttime = NextTime, endtime = EndTime, freq = Freq, rand = Rand} = Clock} = Dclient) ->
io:format("~s ~p DtuId = ~p.~n", [?FILE, ?LINE, Client]),
dgiot_client:stop(Channel, Client, EndTime), %%
NewNextTime = dgiot_client:get_nexttime(NextTime, Freq),
case dgiot_task:get_pnque(Client) of
not_find -> not_find ->
?LOG(info, "DtuId ~p", [DtuId]), {noreply, Dclient#dclient{clock = Clock#dclock{ nexttime = NewNextTime}}};
{noreply, State};
{ProductId, DevAddr} -> {ProductId, DevAddr} ->
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
NewRound = Round + 1, NewRound = Round + 1,
Que = dgiot_instruct:get_instruct(ProductId, DeviceId, NewRound, dgiot_utils:to_atom(Mode)), PnQueLen = dgiot_task:get_pnque_len(Client),
erlang:send_after(1000, self(), retry), DiQue = dgiot_task:get_instruct(ProductId, NewRound),
{noreply, State#task{product = ProductId, devaddr = DevAddr, round = NewRound, firstid = DeviceId, que = Que}} dgiot_client:send_after(10, Freq, Rand, read), %
{noreply, Dclient#dclient{userdata = UserData#device_task{product = ProductId, devaddr = DevAddr, pnque_len = PnQueLen, dique = DiQue},
clock = Clock#dclock{nexttime = NewNextTime, round = NewRound}}}
end; end;
%% %%
handle_info(read, #dclient{userdata = #device_task{dique = DiQue} } = State) when length(DiQue) == 0 ->
{noreply, get_next_pn(State)};
%%
handle_info(retry, State) -> handle_info(retry, State) ->
{noreply, send_msg(State)}; {noreply, send_msg(State)};
%% %% ACK消息触发进行新的指令发送
handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = _ProductId1, devaddr = _DevAddr1, ack = Ack, que = Que} = State) when length(Que) == 0 -> handle_info({dclient_ack, Topic, Payload}, #dclient{userdata = Usedata} = State) ->
Payload = jsx:decode(dgiot_mqtt:get_payload(Msg), [return_maps]),
case binary:split(dgiot_mqtt:get_topic(Msg), <<$/>>, [global, trim]) of
[<<"$dg">>, <<"thing">>, ProductId, DevAddr, <<"properties">>, <<"report">>] ->
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p ~ts: ~ts ", [?FILE, ?LINE, unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
NewPayload =
maps:fold(fun(K, V, Acc) ->
case dgiot_data:get({protocol, K, ProductId}) of
not_find ->
Acc#{K => V};
Identifier ->
Acc#{Identifier => V}
end
end, #{}, Payload),
NewAck = dgiot_task:get_collection(ProductId, Dis, NewPayload, maps:merge(Ack, NewPayload)),
dgiot_metrics:inc(dgiot_task, <<"task_recv">>, 1),
{noreply, get_next_pn(State#task{ack = NewAck, product = ProductId, devaddr = DevAddr})};
[<<"$dg">>, <<"thing">>, _ProductId, _DevAddr, <<"events">>] -> % todo
{noreply, get_next_pn(State#task{ack = Ack})};
_ ->
{noreply, get_next_pn(State#task{ack = Ack})}
end;
%% ACK消息触发抄表指令
handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = _ProductId1, devaddr = _DevAddr1, ack = Ack} = State) ->
Payload = jsx:decode(dgiot_mqtt:get_payload(Msg), [return_maps]),
dgiot_metrics:inc(dgiot_task, <<"task_recv">>, 1), dgiot_metrics:inc(dgiot_task, <<"task_recv">>, 1),
case binary:split(dgiot_mqtt:get_topic(Msg), <<$/>>, [global, trim]) of case binary:split(Topic, <<$/>>, [global, trim]) of
[<<"$dg">>, <<"thing">>, ProductId, DevAddr, <<"properties">>, <<"report">>] -> [<<"$dg">>, <<"thing">>, ProductId, DevAddr, <<"properties">>, <<"report">>] ->
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p ~ts: ~ts ", dgiot_task:save_td(ProductId, DevAddr, Payload, #{}),
[?FILE, ?LINE, unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]), {noreply, send_msg(State#dclient{userdata = Usedata#device_task{product = ProductId, devaddr = DevAddr}})};
NewPayload =
maps:fold(fun(K, V, Acc) ->
case dgiot_data:get({protocol, K, ProductId}) of
not_find ->
Acc#{K => V};
Identifier ->
Acc#{Identifier => V}
end
end, #{}, Payload),
NewAck = dgiot_task:get_collection(ProductId, Dis, NewPayload, maps:merge(Ack, NewPayload)),
{noreply, send_msg(State#task{ack = NewAck, product = ProductId, devaddr = DevAddr})};
[<<"$dg">>, <<"thing">>, _ProductId, _DevAddr, <<"events">>] -> % todo
{noreply, get_next_pn(State#task{ack = Ack})};
_ -> _ ->
{noreply, send_msg(State#task{ack = Ack})} {noreply, send_msg(State)}
end; end;
handle_info(_Msg, State) -> handle_info(_Msg, State) ->
@ -153,17 +123,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
send_msg(#task{ref = Ref, que = Que} = State) when length(Que) == 0 -> send_msg(#dclient{channel = Channel, userdata = #device_task{product = Product, devaddr = DevAddr, dique = DisQue} = UserData} = State) ->
case Ref of {InstructOrder, Interval, _, _, _, Protocol, _, _} = lists:nth(1, DisQue),
undefined -> {NewCount, _Payload, _Dis} =
pass;
_ -> erlang:cancel_timer(Ref)
end,
get_next_pn(State);
send_msg(#task{tid = Channel, product = Product, devaddr = DevAddr, ref = Ref, que = Que} = State) ->
{InstructOrder, Interval, _, _, _, Protocol, _, _} = lists:nth(1, Que),
{NewCount, _Payload, Dis} =
lists:foldl(fun(X, {Count, Acc, Acc1}) -> lists:foldl(fun(X, {Count, Acc, Acc1}) ->
case X of case X of
{InstructOrder, _, _, _, error, _, _, _} -> {InstructOrder, _, _, _, error, _, _, _} ->
@ -177,41 +139,14 @@ send_msg(#task{tid = Channel, product = Product, devaddr = DevAddr, ref = Ref, q
_ -> _ ->
{Count, Acc, Acc1} {Count, Acc, Acc1}
end end
end, {0, [], []}, Que), end, {0, [], []}, DisQue),
%% NewDisQue = lists:nthtail(NewCount, DisQue),
case Ref of
undefined ->
pass;
_ -> erlang:cancel_timer(Ref)
end,
NewQue = lists:nthtail(NewCount, Que),
dgiot_metrics:inc(dgiot_task, <<"task_send">>, 1), dgiot_metrics:inc(dgiot_task, <<"task_send">>, 1),
State#task{que = NewQue, dis = Dis, ref = erlang:send_after(Interval * 1000, self(), retry), interval = Interval}. erlang:send_after(Interval * 1000, self(), retry),
State#dclient{userdata = UserData#device_task{dique = NewDisQue, interval = Interval}}.
get_next_pn(#task{tid = _Channel, mode = Mode, dtuid = DtuId, firstid = DeviceId, product = _ProductId, devaddr = _DevAddr, round = Round, ref = Ref, interval = Interval} = State) ->
save_td(State),
{NextProductId, NextDevAddr} = dgiot_task:get_pnque(DtuId),
NextDeviceId = dgiot_parse_id:get_deviceid(NextProductId, NextDevAddr),
Que = dgiot_instruct:get_instruct(NextProductId, NextDeviceId, Round, Mode),
%% dgiot_bridge:send_log(Channel, NextProductId, NextDevAddr, "to_dev=> ~s ~p NextProductId ~p NextDevAddr ~p NextDeviceId ~p", [?FILE, ?LINE, NextProductId, NextDevAddr, NextDeviceId]),
NextTopic = <<"$dg/device/", NextProductId/binary, "/", NextDevAddr/binary, "/properties/report">>,
dgiot_mqtt:subscribe(NextTopic),
case Ref of
undefined ->
pass;
_ -> erlang:cancel_timer(Ref)
end,
timer:sleep(20),
NewRef =
case NextDeviceId of
DeviceId ->
erlang:send_after(1000, self(), init);
_ ->
erlang:send_after(Interval * 1000 - 20, self(), retry)
end,
State#task{product = NextProductId, devaddr = NextDevAddr, que = Que, dis = [], ack = #{}, ref = NewRef}.
save_td(#task{tid = Channel, product = ProductId, devaddr = DevAddr, ack = Ack, appdata = AppData}) ->
Data = dgiot_task:save_td(ProductId, DevAddr, Ack, AppData),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "save_td=> ~s ~p ProductId ~p DevAddr ~p : ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(jsx:encode(Data))]).
get_next_pn(#dclient{client = CLient, clock = #dclock{round = Round}, userdata = UserData} = State) ->
{NextProductId, NextDevAddr} = dgiot_task:get_pnque(CLient),
DisQue = dgiot_task:get_instruct(NextProductId, Round),
NewState = State#dclient{ userdata = UserData#device_task{product = NextProductId, devaddr = NextDevAddr, dique = DisQue}},
send_msg(NewState).