feat:task

This commit is contained in:
dawnwinterLiu 2024-04-11 17:03:05 +08:00
parent 9f132d9e3a
commit f71b83ead5
5 changed files with 18 additions and 64 deletions

View File

@ -164,8 +164,7 @@ handle_message({rule, #{clientid := DevAddr, payload := Payload, topic := _Topic
handle_message(_Message, State) ->
{ok, State}.
stop(_ChannelType, ChannelId, _State) ->
dgiot_data:delete({check_connection, ChannelId}),
stop(_ChannelType, _ChannelId, _State) ->
ok.
get_app(Products) ->

View File

@ -46,19 +46,6 @@ init(#tcp{state = #state{id = ChannelId}} = TCPState) ->
{stop, not_find_channel}
end.
handle_info(check_connection, #tcp{state = #state{id = ChannelId, hb = Hb}} = TCPState) ->
Now = dgiot_datetime:now_secs(),
case dgiot_data:get({check_connection, ChannelId}) of
OldTime when (Now - OldTime) > (Hb + 60) ->
%%
dgiot_bridge:control_channel(ChannelId, <<"disable">>, <<>>),
timer:sleep(500),
dgiot_bridge:control_channel(ChannelId, <<"enable">>, <<>>);
_ ->
pass
end,
erlang:send_after(Hb * 1000, self(), check_connection),
{noreply, TCPState};
%% 9C A5 25 CD 00 DB
%% 11 04 02 06 92 FA FE
@ -67,7 +54,6 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, de
DtuAddr = dgiot_utils:binary_to_hex(Buff),
List = dgiot_utils:to_list(DtuAddr),
List1 = dgiot_utils:to_list(Buff),
erlang:send_after(600 * 1000, self(), check_connection),
case re:run(DtuAddr, Head, [{capture, first, list}]) of
{match, [Head]} when length(List) == Len ->
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DtuAddr),
@ -102,7 +88,6 @@ handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, devaddr = DtuAddr,
<<"slaveId">> => Sh * 256 + Sl,
<<"address">> => H * 256 + L}) of
{_, Things} ->
dgiot_data:insert({check_connection, ChannelId}, dgiot_datetime:now_secs()),
NewTopic = <<"$dg/thing/", DtuProductId/binary, "/", DtuAddr/binary, "/properties/report">>,
dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "~s ~p to task ~p ~ts ", [?FILE, ?LINE, NewTopic, unicode:characters_to_list(dgiot_json:encode(Things))]),
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DtuAddr),
@ -126,7 +111,6 @@ handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, devaddr = DtuAddr,
<<"slaveId">> => SlaveId,
<<"address">> => Address}) of
{_, Things} ->
dgiot_data:insert({check_connection, ChannelId}, dgiot_datetime:now_secs()),
NewTopic = <<"$dg/thing/", DtuProductId/binary, "/", DtuAddr/binary, "/properties/report">>,
dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "~s ~p to task ~p ~ts~n ", [?FILE, ?LINE, NewTopic, unicode:characters_to_list(dgiot_json:encode(Things))]),
DeviceId = dgiot_parse_id:get_deviceid(DtuProductId, DtuAddr),
@ -158,7 +142,6 @@ handle_info({deliver, _, Msg}, #tcp{state = #state{id = ChannelId} = State} = TC
[<<"$dg">>, <<"device">>, ProductId, DevAddr, <<"properties">>] ->
case jsx:decode(Payload, [{labels, binary}, return_maps]) of
#{<<"_dgiotTaskFreq">> := Freq, <<"slaveid">> := SlaveId, <<"address">> := Address} = DataSource ->
dgiot_data:insert({check_connection, ChannelId}, dgiot_datetime:now_secs()),
Data = modbus_rtu:to_frame(DataSource),
%% io:format("~s ~p Data = ~p.~n", [?FILE, ?LINE, dgiot_utils:to_hex(Data)]),
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, "Channel sends ~p to DTU ~p", [dgiot_utils:binary_to_hex(Data), DevAddr]),
@ -197,8 +180,7 @@ handle_call(_Msg, _From, TCPState) ->
handle_cast(_Msg, TCPState) ->
{noreply, TCPState}.
terminate(_Reason, #tcp{state = #state{id = ChannelId, devaddr = DtuAddr, product = ProductId}} = _TCPState) ->
dgiot_data:delete({check_connection, ChannelId}),
terminate(_Reason, #tcp{state = #state{id = _ChannelId, devaddr = DtuAddr, product = ProductId}} = _TCPState) ->
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DtuAddr),
Taskchannel = dgiot_product_channel:get_taskchannel(ProductId),
dgiot_task:del_pnque(DeviceId),

View File

@ -19,7 +19,7 @@
-include_lib("dgiot/include/logger.hrl").
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl").
-export([start/1, start/2, send/3, get_pnque_len/1, save_pnque/4, get_pnque/1, del_pnque/1, save_td/4, merge_cache_data/3, save_cache_data/2]).
-export([start/1, send/3, get_pnque_len/1, save_pnque/4, get_pnque/1, del_pnque/1, save_td/4, merge_cache_data/3, save_cache_data/2]).
-export([get_props/1, get_control/3, get_collection/4, get_calculated/4, get_instruct/2, get_storage/2, string2value/2, string2value/3, get_statistic/7]).
-export([save_td_no_match/4, get_last_value/4]).
@ -102,31 +102,14 @@
start(ChannelId) ->
lists:map(fun(Y) ->
case Y of
{ClientId, _} ->
{ClientId, [{ProductId, _} | _]} ->
timer:sleep(1),
dgiot_data:insert({taskchannel_product, binary_to_atom(ProductId)}, ChannelId),
dgiot_client:start(ChannelId, ClientId);
_ ->
pass
end
end, ets:tab2list(?DGIOT_PNQUE)).
start(ChannelId, Products) when is_list(Products) ->
lists:map(fun({ProductId, _}) ->
dgiot_data:insert({taskchannel_product, binary_to_atom(ProductId)}, ChannelId),
Success = fun(Page) ->
lists:map(fun(#{<<"objectId">> := DeviceId}) ->
dgiot_client:start(ChannelId, DeviceId)
end, Page)
end,
Query = #{
<<"order">> => <<"updatedAt">>,
<<"keys">> => [<<"objectId">>],
<<"where">> => #{<<"product">> => ProductId}
},
dgiot_parse_loader:start(<<"Device">>, Query, 0, 100, 1000000, Success)
end, Products);
start(ChannelId, ClientId) ->
dgiot_client:start(ChannelId, ClientId).
end, ets:tab2list(dgiot_pnque)).
send(ProductId, DevAddr, Payload) ->
case dgiot_data:get({?TYPE, ProductId}) of
@ -442,13 +425,6 @@ save_pnque(DtuProductId, DtuAddr, ProductId, DevAddr) ->
Pn_que ->
New_Pn_que = dgiot_utils:unique_2(Pn_que ++ [{ProductId, DevAddr}]),
dgiot_data:insert(?DGIOT_PNQUE, DtuId, New_Pn_que)
end,
case dgiot_data:get({task_args, DtuProductId}) of
not_find ->
pass;
#{<<"channel">> := Channel} = Args ->
%% io:format("Args ~p.~n", [Args]),
supervisor:start_child(?TASK_SUP(Channel), [Args#{<<"dtuid">> => DtuId}])
end.
get_pnque_len(DtuId) ->

View File

@ -137,12 +137,13 @@ handle_event(_EventId, Event, State) ->
?LOG(info, "channel ~p", [Event]),
{ok, State}.
handle_message(start_client, #state{id = ChannelId, products = Products} = State) ->
handle_message(start_client, #state{id = ChannelId, products = _Products} = State) ->
%% io:format("~s ~p ChannelId = ~p.~n", [?FILE, ?LINE, ChannelId]),
case dgiot_data:get({start_client, ChannelId}) of
case dgiot_data:get({start_client, binary_to_atom(ChannelId)}) of
not_find ->
dgiot_task:start(ChannelId, Products),
erlang:send_after(1000 * 60 * 1, self(), check_client);
dgiot_task:start(ChannelId),
dgiot_data:insert({start_client, ChannelId}, ChannelId),
erlang:send_after(1000 * 60 * 1, self(), check_newdevice);
_ ->
pass
end,
@ -150,20 +151,16 @@ handle_message(start_client, #state{id = ChannelId, products = Products} = State
handle_message(stop_client, #state{id = ChannelId} = State) ->
%% io:format("~s ~p ChannelId = ~p.~n", [?FILE, ?LINE, ChannelId]),
case dgiot_data:get({stop_client, binary_to_atom(ChannelId)}) of
not_find ->
dgiot_client:stop(ChannelId);
_ ->
pass
end,
dgiot_client:stop(ChannelId),
dgiot_data:insert({stop_client, ChannelId}, ChannelId),
{ok, State};
handle_message(check_client, #state{id = ChannelId, products = Products} = State) ->
handle_message(check_newdevice, #state{id = ChannelId, products = _Products} = State) ->
%% io:format("~s ~p time ~p ChannelId = ~p.~n", [?FILE, ?LINE, dgiot_datetime:format(dgiot_datetime:now_secs(), <<"YY-MM-DD HH:NN:SS">>), ChannelId]),
case dgiot_data:get({stop_client, binary_to_atom(ChannelId)}) of
not_find ->
dgiot_task:start(ChannelId, Products),
erlang:send_after(1000 * 60 * 1, self(), check_client);
dgiot_task:start(ChannelId),
erlang:send_after(1000 * 60 * 1, self(), check_newdevice);
_ ->
pass
end,

View File

@ -148,7 +148,7 @@ send_msg(#dclient{channel = ChannelId, clock = #dclock{freq = Freq}, userdata =
Payload = dgiot_json:encode(DataSource#{<<"identifier">> => Identifier1, <<"_dgiotTaskFreq">> => Freq}),
%% io:format("~s ~p DataSource = ~p.~n", [?FILE, ?LINE, DataSource]),
dgiot_mqtt:publish(dgiot_utils:to_binary(ChannelId), Topic, Payload),
dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), Product, DevAddr, "~s ~p to dev => ~ts: ~ts", [?FILE, ?LINE, unicode:characters_to_list(Topic), unicode:characters_to_list(dgiot_json:encode(DataSource))]),
dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), Product, DevAddr, "~s ~p to dev => ~ts: ~ts", [?FILE, ?LINE, unicode:characters_to_list(Topic), unicode:characters_to_list(Payload)]),
{Count + 1, Acc ++ [DataSource], Acc1 ++ [Identifier1]};
_ ->
{Count, Acc, Acc1}