mirror of
https://gitee.com/dgiiot/dgiot.git
synced 2024-11-29 18:57:41 +08:00
fix: rm files and add grpc
This commit is contained in:
parent
a79dbc7e90
commit
8a6f2b9189
@ -1,372 +0,0 @@
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc
|
||||
%%
|
||||
%% task_server:
|
||||
%%
|
||||
%%
|
||||
%% Body^@
|
||||
%%
|
||||
%% @end
|
||||
|
||||
%%
|
||||
%% 执行任务调用
|
||||
%%
|
||||
%% task_server:run(Module, Fun, Args, Seconds)
|
||||
%%
|
||||
%% 例:5秒为周期打印,Hello, zwx
|
||||
%% Name = "zwx",
|
||||
%% task_server:run(io, format, ["Hello, ~p~n", [Name]], 5).
|
||||
%%
|
||||
%%
|
||||
%% 如果要自己控制任务的起停(TaskName为元子)
|
||||
%%
|
||||
%% 开启任务
|
||||
%% task_server:run(TaskName, Module, Fun, Args, Seconds),
|
||||
%%
|
||||
%% 停止任务
|
||||
%% task_server:clean(TaskName).
|
||||
%%
|
||||
%% 停止所有任务
|
||||
%% task_server:clean().
|
||||
%%
|
||||
%% 如果要更细的控制任务,比如执行次数或运行总时间
|
||||
%% Config =
|
||||
%% [
|
||||
%% {frequency, 5}, %% 频率,必填,根据unit为判断单位
|
||||
%%
|
||||
%% {unit, 0}, %% 频率单位, 0,1,2代表频率分别为单位为分 时 天,不填单位为秒
|
||||
%%
|
||||
%% {start_time, {{2017,11,19},{15,16,0}}, %% 启动时间,非必填,不填就为当前时间
|
||||
%%
|
||||
%% {run_time, 5 * 60} %% 任务运行总时间,非必填,不填无限制周期运行下去
|
||||
%%
|
||||
%% {count, 100} %% 执行次数,非必填,不填无限制周期运行下去
|
||||
%% ]
|
||||
%%
|
||||
%% task_server:run(TaskName, Config, {Module, Fun, Args})
|
||||
%%
|
||||
%% task_server:fun(TaskName, Config, fun() -> end)
|
||||
%%
|
||||
%% 例 以当前时间,5秒周期打印
|
||||
%% Name = "zwx",
|
||||
%% Config = [{frequency, 5}],
|
||||
%%
|
||||
%% task_server:run(task_test, Config, {io, format, ["Hello, ~p~n", [Name]]})
|
||||
%% 或
|
||||
%% task_server:run(task_test1, Config, fun() -> io:format("Hello, ~p~n", [Name]) end).
|
||||
|
||||
|
||||
-module(dgiot_cron_server).
|
||||
|
||||
-behaviour(gen_server).
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
|
||||
% --------------------------------------------------------------------
|
||||
% Include files
|
||||
% --------------------------------------------------------------------
|
||||
|
||||
% --------------------------------------------------------------------
|
||||
% External exports
|
||||
% --------------------------------------------------------------------
|
||||
-export([run/3, run/4, run/5, clean/1, start_link/3]).
|
||||
|
||||
% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
%-record(state, { name, start_time, frequency, callback, end_time, tref, mod, gettime }).
|
||||
-record(config, {start_time, unit, frequency, run_time, count}).
|
||||
|
||||
-define(DETS, task_config).
|
||||
-define(DETS_PATH, "data/task.config").
|
||||
|
||||
|
||||
% Config =
|
||||
%[
|
||||
% {start_time, {{2017,11,19},{15,16,0}},
|
||||
% {frequency, 5},
|
||||
% {unit, 0},
|
||||
% {run_time, 5 * 60}
|
||||
% {count, 100}
|
||||
%]
|
||||
|
||||
% --------------------------------------------------------------------
|
||||
% External API
|
||||
% --------------------------------------------------------------------
|
||||
|
||||
|
||||
run(Module, Fun, Args, Seconds) ->
|
||||
TaskName = list_to_atom(lists:concat([task_, dgiot_datetime:nowstamp()])),
|
||||
run(TaskName, Module, Fun, Args, Seconds).
|
||||
|
||||
run(TaskName, Module, Fun, Args, Seconds) ->
|
||||
run(TaskName, [{<<"frequency">>, Seconds}], {Module, Fun, Args}).
|
||||
|
||||
run(TaskName, Config, Fun) when is_record(Config, config) ->
|
||||
NewConfig = [
|
||||
{<<"start_time">>, Config#config.start_time},
|
||||
{<<"frequency">>, Config#config.frequency},
|
||||
{<<"unit">>, Config#config.unit},
|
||||
{<<"run_time">>, Config#config.run_time},
|
||||
{<<"count">>, Config#config.count}
|
||||
],
|
||||
run(TaskName, NewConfig, Fun);
|
||||
|
||||
run(TaskName, Config, Fun) when is_list(Config) ->
|
||||
case supervisor:start_child(dgiot_task_manager_sup, [TaskName, Config, Fun]) of
|
||||
{error, Why} ->
|
||||
{error, Why};
|
||||
{ok, Pid} ->
|
||||
?LOG(info,"Task[~p, ~p] run successful!~n", [TaskName, Pid]),
|
||||
{ok, Pid}
|
||||
end.
|
||||
|
||||
clean(TaskName) ->
|
||||
gen_server:call(TaskName, clean, 500).
|
||||
|
||||
|
||||
start_link(TaskName, Config, Fun) ->
|
||||
gen_server:start_link({local, TaskName}, ?MODULE, [TaskName, Config, Fun], []).
|
||||
|
||||
|
||||
|
||||
% --------------------------------------------------------------------
|
||||
% Function: init/1
|
||||
% Description: Initiates the server
|
||||
% Returns: {ok, State} |
|
||||
% {ok, State, Timeout} |
|
||||
% ignore |
|
||||
% {stop, Reason}
|
||||
% --------------------------------------------------------------------
|
||||
init([TaskName, Config, Fun]) ->
|
||||
?LOG(info,"TaskName ~p, Config ~p, Fun ~p",[TaskName, Config, Fun]),
|
||||
case parse_config(Config) of
|
||||
{ok, {StartTime, Frequency, EndTime}} ->
|
||||
Waitting = waitting(StartTime, Frequency),
|
||||
case is_complete(Waitting, EndTime) of
|
||||
true ->
|
||||
{stop, normal};
|
||||
false ->
|
||||
{ok, #{
|
||||
<<"name">> => TaskName,
|
||||
<<"mod">> => proplists:get_value(<<"mod">>, Config),
|
||||
<<"gettime">> => proplists:get_value(<<"gettime">>, Config),
|
||||
<<"start_time">> => StartTime,
|
||||
<<"frequency">> => Frequency,
|
||||
<<"callback">> => Fun,
|
||||
<<"end_time">> => EndTime,
|
||||
<<"tref">> => {0, next_time(Waitting)}
|
||||
}}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{stop, Reason}
|
||||
end.
|
||||
|
||||
% --------------------------------------------------------------------
|
||||
% Function: handle_call/3
|
||||
% Description: Handling call messages
|
||||
% Returns: {reply, Reply, State} |
|
||||
% {reply, Reply, State, Timeout} |
|
||||
% {noreply, State} |
|
||||
% {noreply, State, Timeout} |
|
||||
% {stop, Reason, Reply, State} | (terminate/2 is called)
|
||||
% {stop, Reason, State} (terminate/2 is called)
|
||||
% --------------------------------------------------------------------
|
||||
handle_call(clean, _From, State) ->
|
||||
{stop, normal, ok, State};
|
||||
|
||||
handle_call(Request, From, #{ <<"mod">> := Mod } = State) ->
|
||||
case Mod == undefined of
|
||||
true ->
|
||||
{reply, noreply, State};
|
||||
false ->
|
||||
case catch(apply(Mod, handle_call, [Request, From, State])) of
|
||||
{'EXIT', Reason} ->
|
||||
{reply, Reason, State};
|
||||
Reply ->
|
||||
Reply
|
||||
end
|
||||
end.
|
||||
|
||||
% --------------------------------------------------------------------
|
||||
% Function: handle_cast/2
|
||||
% Description: Handling cast messages
|
||||
% Returns: {noreply, State} |
|
||||
% {noreply, State, Timeout} |
|
||||
% {stop, Reason, State} (terminate/2 is called)
|
||||
% --------------------------------------------------------------------
|
||||
handle_cast(Msg, #{ <<"mod">> := Mod } = State) ->
|
||||
case Mod == undefined of
|
||||
true ->
|
||||
{noreply, State};
|
||||
false ->
|
||||
case catch(apply(Mod, handle_cast, [Msg, State])) of
|
||||
{'EXIT', _Reason} ->
|
||||
{noreply, State};
|
||||
Reply ->
|
||||
Reply
|
||||
end
|
||||
end.
|
||||
|
||||
% --------------------------------------------------------------------
|
||||
% Function: handle_info/2
|
||||
% Description: Handling all non call/cast messages
|
||||
% Returns: {noreply, State} |
|
||||
% {noreply, State, Timeout} |
|
||||
% {stop, Reason, State} (terminate/2 is called)
|
||||
% --------------------------------------------------------------------
|
||||
handle_info({timeout, _TimerRef, do}, State) ->
|
||||
handle_info(do, State);
|
||||
|
||||
handle_info(do, State = #{
|
||||
<<"end_time">> := EndTime,
|
||||
<<"frequency">> := Frequency,
|
||||
<<"tref">> := {Pre, TRef} }) ->
|
||||
NewTRef = next_time(TRef, Frequency),
|
||||
Now = dgiot_datetime:nowstamp(),
|
||||
case Now == Pre of
|
||||
false ->
|
||||
NewState = do_task(State#{ <<"tref">> => {Now, NewTRef} }),
|
||||
case is_complete(Frequency, EndTime) of
|
||||
true ->
|
||||
{stop, normal, NewState};
|
||||
false ->
|
||||
{noreply, NewState}
|
||||
end;
|
||||
true ->
|
||||
{noreply, State#{ <<"tref">> => {Now, NewTRef} } }
|
||||
end;
|
||||
|
||||
|
||||
handle_info(Info, #{ <<"mod">> := Mod } = State) ->
|
||||
case Mod == undefined of
|
||||
true ->
|
||||
{noreply, State};
|
||||
false ->
|
||||
case catch(apply(Mod, handle_info, [Info, State])) of
|
||||
{'EXIT', _Reason} ->
|
||||
{noreply, State};
|
||||
Reply ->
|
||||
Reply
|
||||
end
|
||||
end.
|
||||
|
||||
% --------------------------------------------------------------------
|
||||
% Function: terminate/2
|
||||
% Description: Shutdown the server
|
||||
% Returns: any (ignored by gen_server)
|
||||
% --------------------------------------------------------------------
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
% --------------------------------------------------------------------
|
||||
% Func: code_change/3
|
||||
% Purpose: Convert process state when code is changed
|
||||
% Returns: {ok, NewState}
|
||||
% --------------------------------------------------------------------
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
to_int(<<V:8>>) ->
|
||||
V;
|
||||
to_int(V) when is_list(V) ->
|
||||
list_to_integer(V);
|
||||
to_int(V) when is_integer(V) ->
|
||||
V;
|
||||
to_int(V) ->
|
||||
V.
|
||||
|
||||
|
||||
is_complete(Waitting, EndTime) when is_integer(EndTime) andalso is_integer(Waitting) ->
|
||||
dgiot_datetime:nowstamp() + Waitting div 1000 > EndTime;
|
||||
is_complete(_, _) ->
|
||||
false.
|
||||
|
||||
|
||||
next_time(TRef, Timeout) ->
|
||||
_ = erlang:cancel_timer(TRef),
|
||||
next_time(Timeout).
|
||||
next_time(Timeout) ->
|
||||
erlang:send_after(Timeout, self(), do).
|
||||
%% erlang:start_timer(Timeout, self(), do).
|
||||
|
||||
|
||||
% Config =
|
||||
%[
|
||||
% {start_time, {{2017,11,19},{15,16,0}},
|
||||
% {frequency, 5},
|
||||
% {unit, 0},
|
||||
% {run_time, 5 * 60}
|
||||
% {count, 100}
|
||||
%]
|
||||
parse_config(Config) ->
|
||||
case to_int(proplists:get_value(<<"frequency">>, Config)) of
|
||||
undefined ->
|
||||
{error, frequency_is_invalid};
|
||||
Frequency ->
|
||||
NewFrequency = case to_int(proplists:get_value(<<"unit">>, Config)) of
|
||||
0 ->
|
||||
Frequency * 60 * 1000; % 分
|
||||
1 ->
|
||||
Frequency * 60 * 60 * 1000; % 时
|
||||
2 ->
|
||||
Frequency * 60 * 60 * 24 * 1000; % 天
|
||||
3 ->
|
||||
Frequency * 1000; % 秒
|
||||
_ ->
|
||||
Frequency % 毫秒
|
||||
end,
|
||||
StartTime = dgiot_datetime:localtime_to_unixtime(proplists:get_value(<<"start_time">>, Config, calendar:local_time())),
|
||||
EndTime1 = case proplists:get_value(<<"count">>, Config) of
|
||||
undefined ->
|
||||
undefined;
|
||||
Count ->
|
||||
StartTime + (Count - 1) * NewFrequency div 1000
|
||||
end,
|
||||
EndTime2 = case proplists:get_value(<<"run_time">>, Config) of
|
||||
undefined -> undefined;
|
||||
V -> StartTime + V
|
||||
end,
|
||||
EndTime = min(EndTime1, EndTime2),
|
||||
{ok, {StartTime, NewFrequency , EndTime}}
|
||||
end.
|
||||
|
||||
|
||||
do_task(State = #{ <<"name">> := TaskName, <<"callback">> := Callback, <<"tref">> := {_Now, _} }) ->
|
||||
% io:format("Task:~p, Now:~p~n", [TaskName, dgiot_datetime:unixtime_to_localtime(Now)]),
|
||||
case catch(execute(Callback)) of
|
||||
{ok, _Time} ->
|
||||
State;
|
||||
{_, Reason} ->
|
||||
?LOG(error,"Task[~p] execute error why:~p~n", [TaskName, Reason]),
|
||||
State
|
||||
end.
|
||||
|
||||
execute({M,F, A}) -> apply(M, F, A);
|
||||
execute(Callback) -> Callback().
|
||||
|
||||
waitting(StartTime, Frequency) ->
|
||||
Now = dgiot_datetime:nowstamp(),
|
||||
if
|
||||
StartTime >= Now ->
|
||||
(StartTime - Now) * 1000;
|
||||
true ->
|
||||
Frequency - (Now - StartTime) * 1000 rem Frequency
|
||||
end.
|
||||
|
||||
|
||||
|
@ -16,7 +16,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
namespace Helloworld;
|
||||
namespace dlink;
|
||||
|
||||
/**
|
||||
* The greeting service definition.
|
||||
@ -34,16 +34,16 @@ class GreeterClient extends \Grpc\BaseStub {
|
||||
|
||||
/**
|
||||
* Sends a greeting
|
||||
* @param \Helloworld\HelloRequest $argument input argument
|
||||
* @param \dlink\HelloRequest $argument input argument
|
||||
* @param array $metadata metadata
|
||||
* @param array $options call options
|
||||
* @return \Grpc\UnaryCall
|
||||
*/
|
||||
public function SayHello(\Helloworld\HelloRequest $argument,
|
||||
public function SayHello(\dlink\HelloRequest $argument,
|
||||
$metadata = [], $options = []) {
|
||||
return $this->_simpleRequest('/helloworld.Greeter/SayHello',
|
||||
return $this->_simpleRequest('/dlink.Greeter/SayHello',
|
||||
$argument,
|
||||
['\Helloworld\HelloReply', 'decode'],
|
||||
['\dlink\HelloReply', 'decode'],
|
||||
$metadata, $options);
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
namespace Helloworld;
|
||||
namespace dlink;
|
||||
|
||||
/**
|
||||
* The greeting service definition.
|
||||
@ -31,9 +31,9 @@ class GreeterStub {
|
||||
* initial metadata (if any) and status (if not ok) should be set to $context
|
||||
*/
|
||||
public function SayHello(
|
||||
\Helloworld\HelloRequest $request,
|
||||
\dlink\HelloRequest $request,
|
||||
\Grpc\ServerContext $context
|
||||
): ?\Helloworld\HelloReply {
|
||||
): ?\dlink\HelloReply {
|
||||
$context->setStatus(\Grpc\Status::unimplemented());
|
||||
return null;
|
||||
}
|
||||
@ -46,10 +46,10 @@ class GreeterStub {
|
||||
public final function getMethodDescriptors(): array
|
||||
{
|
||||
return [
|
||||
'/helloworld.Greeter/SayHello' => new \Grpc\MethodDescriptor(
|
||||
'/dlink.Greeter/SayHello' => new \Grpc\MethodDescriptor(
|
||||
$this,
|
||||
'SayHello',
|
||||
'\Helloworld\HelloRequest',
|
||||
'\dlink\HelloRequest',
|
||||
\Grpc\MethodDescriptor::UNARY_CALL
|
||||
),
|
||||
];
|
||||
|
@ -1,6 +1,6 @@
|
||||
<?php
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# source: helloworld.proto
|
||||
# source: dlink.proto
|
||||
|
||||
namespace Helloworld;
|
||||
|
||||
@ -11,7 +11,7 @@ use Google\Protobuf\Internal\GPBUtil;
|
||||
/**
|
||||
* The response message containing the greetings
|
||||
*
|
||||
* Generated from protobuf message <code>helloworld.HelloReply</code>
|
||||
* Generated from protobuf message <code>dlink.HelloReply</code>
|
||||
*/
|
||||
class HelloReply extends \Google\Protobuf\Internal\Message
|
||||
{
|
||||
|
Binary file not shown.
Binary file not shown.
@ -23,6 +23,7 @@ import dlink_pb2_grpc
|
||||
class Dlink(dlink_pb2_grpc.DlinkServicer):
|
||||
|
||||
def SayHello(self, request, context):
|
||||
print( request.name)
|
||||
return dlink_pb2.HelloReply(message='Hello, %s!' % request.name)
|
||||
|
||||
|
||||
|
@ -20,7 +20,8 @@
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
start() ->
|
||||
Services = #{protos => [dgiot_dlink_pb],
|
||||
Services = #{
|
||||
protos => [dgiot_dlink_pb],
|
||||
services => #{'dgiot.Dlink' => dgiot_dlink_server}
|
||||
},
|
||||
{ok, _} = grpc:start_server(server, 30051, Services, []).
|
||||
@ -28,14 +29,15 @@ start() ->
|
||||
stop() ->
|
||||
_ = grpc:stop_server(server).
|
||||
|
||||
%%https://grpc.io/docs/languages/php/basics/
|
||||
login() ->
|
||||
SvrAddr = "http://127.0.0.1:30051",
|
||||
SvrAddr = "http://127.0.0.1:30051",
|
||||
{ok, _} = grpc_client_sup:create_channel_pool(channel, SvrAddr, #{}).
|
||||
|
||||
logout() ->
|
||||
_ = grpc_client_sup:stop_channel_pool(channel).
|
||||
|
||||
send() ->
|
||||
Result = dgiot_dlink_client:say_hello(#{name => <<"Xiao Ming">>}, #{channel => channel}),
|
||||
io:format("Result ~p ~n",[Result]).
|
||||
Result = dgiot_dlink_client:say_hello(#{name => <<"Xiao Ming">>}, #{channel => channel}),
|
||||
io:format("Result ~p ~n", [Result]).
|
||||
|
||||
|
@ -27,6 +27,7 @@
|
||||
-export([start/2, stop/1]).
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
|
||||
dgiot_http_sup:start_link().
|
||||
|
||||
stop(_State) ->
|
||||
|
@ -1,274 +0,0 @@
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(dgiot_instruct).
|
||||
-author("jonhl").
|
||||
-include("dgiot_task.hrl").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
|
||||
%% API
|
||||
-export([
|
||||
createsub/6,
|
||||
create/6,
|
||||
create_group/7,
|
||||
get_instruct/4,
|
||||
get_child_instruct/3
|
||||
]).
|
||||
|
||||
createsub(ProductId, DeviceId, DtuAddr, ACL, Rotation, #{<<"parentDtu">> := ParentDtu}) ->
|
||||
lists:map(fun(X) ->
|
||||
#{
|
||||
<<"route">> := #{DtuAddr := Pn},
|
||||
<<"ACL">> := ACL,
|
||||
<<"product">> := #{<<"thing">> := Thing}
|
||||
} = X,
|
||||
NewPn = <<DtuAddr/binary, "/", Pn/binary>>,
|
||||
create(ProductId, DeviceId, NewPn, ACL, Rotation, Thing#{<<"parentDtu">> => ParentDtu})
|
||||
end, dgiot_device:get_sub_device(DtuAddr)),
|
||||
ok.
|
||||
|
||||
create(ProductId, DeviceId, Pn, ACL, Rotation, #{<<"properties">> := Props}) ->
|
||||
lists:map(fun(X) ->
|
||||
case X of
|
||||
#{<<"dataForm">> := #{<<"strategy">> := <<"计算值"/utf8>>}} ->
|
||||
pass;
|
||||
#{<<"dataForm">> := #{<<"strategy">> := <<"主动上报"/utf8>>}} ->
|
||||
pass;
|
||||
#{<<"accessMode">> := Op, <<"dataForm">> := #{<<"address">> := Di, <<"order">> := Order} = DataForm,
|
||||
<<"name">> := Name, <<"identifier">> := Identifier, <<"required">> := Enable} ->
|
||||
case Di of
|
||||
<<"">> -> pass;
|
||||
_ ->
|
||||
ObjectId = dgiot_parse_id:get_instructid(DeviceId, Pn, Di),
|
||||
case dgiot_parse:get_object(<<"Instruct">>, ObjectId) of
|
||||
{ok, _} ->
|
||||
pass;
|
||||
_ ->
|
||||
Map = #{<<"ACL">> => ACL, <<"enable">> => Enable,
|
||||
<<"product">> => #{
|
||||
<<"__type">> => <<"Pointer">>,
|
||||
<<"className">> => <<"Product">>,
|
||||
<<"objectId">> => ProductId
|
||||
},
|
||||
<<"device">> => #{
|
||||
<<"__type">> => <<"Pointer">>,
|
||||
<<"className">> => <<"Device">>,
|
||||
<<"objectId">> => DeviceId
|
||||
},
|
||||
<<"name">> => Name, <<"order">> => dgiot_utils:to_binary(Order),
|
||||
<<"pn">> => Pn, <<"di">> => Di,
|
||||
<<"op">> => Op, <<"interval">> => 20,
|
||||
<<"duration">> => 5, <<"rotation">> => Rotation,
|
||||
<<"other">> => DataForm#{<<"identifier">> => Identifier}
|
||||
},
|
||||
dgiot_parse:create_object(<<"Instruct">>, Map)
|
||||
end
|
||||
end;
|
||||
Other ->
|
||||
?LOG(info, "Other ~p", [Other]),
|
||||
pass
|
||||
end
|
||||
end, Props).
|
||||
|
||||
create_group(ProductId, DeviceId, Group, Pn, ACL, Rotation, #{<<"properties">> := Props} = Thing) ->
|
||||
lists:map(fun(X) ->
|
||||
#{
|
||||
<<"accessMode">> := Op,
|
||||
<<"dataForm">> := #{
|
||||
<<"address">> := Di
|
||||
},
|
||||
<<"name">> := Name,
|
||||
<<"identifier">> := Identifier,
|
||||
<<"required">> := Enable
|
||||
} = X,
|
||||
case Di of
|
||||
<<"">> -> pass;
|
||||
_ -> case dgiot_parse:query_object(<<"Instruct">>, #{<<"where">> => #{
|
||||
<<"product">> => ProductId,
|
||||
<<"device">> => DeviceId,
|
||||
<<"pn">> => Pn,
|
||||
<<"di">> => Di}}) of
|
||||
{ok, #{<<"results">> := []}} ->
|
||||
Other = maps:without([<<"properties">>], Thing),
|
||||
Map = #{
|
||||
<<"ACL">> => ACL,
|
||||
<<"enable">> => Enable,
|
||||
<<"product">> => #{
|
||||
<<"__type">> => <<"Pointer">>,
|
||||
<<"className">> => <<"Product">>,
|
||||
<<"objectId">> => ProductId
|
||||
},
|
||||
<<"device">> => #{
|
||||
<<"__type">> => <<"Pointer">>,
|
||||
<<"className">> => <<"Device">>,
|
||||
<<"objectId">> => DeviceId
|
||||
},
|
||||
<<"name">> => Name,
|
||||
<<"order">> => Group,
|
||||
<<"pn">> => Pn,
|
||||
<<"di">> => Di,
|
||||
<<"op">> => Op,
|
||||
<<"interval">> => 30,
|
||||
<<"duration">> => 5,
|
||||
<<"rotation">> => Rotation,
|
||||
<<"other">> => Other#{<<"identifier">> => Identifier}
|
||||
},
|
||||
dgiot_parse:create_object(<<"Instruct">>, Map);
|
||||
_ ->
|
||||
pass
|
||||
end
|
||||
end
|
||||
end, Props).
|
||||
|
||||
init_que(DeviceId, Round) ->
|
||||
case dgiot_parse:query_object(<<"Instruct">>, #{<<"order">> => <<"-order">>, <<"where">> => #{<<"device">> => DeviceId}}) of
|
||||
{ok, #{<<"results">> := []}} ->
|
||||
[];
|
||||
{ok, #{<<"results">> := List}} ->
|
||||
NewList = lists:foldl(
|
||||
fun(X, Acc) ->
|
||||
case X of
|
||||
#{<<"enable">> := true, <<"op">> := Op, <<"order">> := Order, <<"pn">> := Pn, <<"di">> := Di,
|
||||
<<"interval">> := Interval, <<"other">> := DataForm} ->
|
||||
Identifier = maps:get(<<"accessMode">>, DataForm, <<"">>),
|
||||
AccessMode = maps:get(<<"accessMode">>, DataForm, Op),
|
||||
Address = maps:get(<<"address">>, DataForm, Di),
|
||||
Protocol = maps:get(<<"protocol">>, DataForm, <<"">>),
|
||||
ThingRound = maps:get(<<"round">>, DataForm, <<"all">>),
|
||||
InstructOrder = maps:get(<<"order">>, DataForm, Order),
|
||||
Data = maps:get(<<"data">>, DataForm, <<"null">>),
|
||||
Control = maps:get(<<"control">>, DataForm, "%d"),
|
||||
NewData = dgiot_task:get_control(Round, Data, Control),
|
||||
Strategy = dgiot_utils:to_int(maps:get(<<"strategy">>, DataForm, Interval)),
|
||||
case ThingRound of
|
||||
<<"all">> ->
|
||||
Acc ++ [{Order, {InstructOrder, Strategy, Identifier, Pn, Address, AccessMode, NewData, Protocol, ThingRound}}];
|
||||
Round ->
|
||||
Acc ++ [{Order, {InstructOrder, Strategy, Identifier, Pn, Address, AccessMode, NewData, Protocol, ThingRound}}];
|
||||
RoundList ->
|
||||
case lists:member(Round, RoundList) of
|
||||
true ->
|
||||
Acc ++ [{Order, {InstructOrder, Strategy, Identifier, Pn, Address, AccessMode, NewData, Protocol, ThingRound}}];
|
||||
false ->
|
||||
Acc
|
||||
end
|
||||
end;
|
||||
_ -> Acc
|
||||
end
|
||||
end, [], List),
|
||||
lists:foldl(fun(X, Acc1) ->
|
||||
{_, Y} = X,
|
||||
Acc1 ++ [Y]
|
||||
end, [], lists:keysort(1, NewList));
|
||||
_ -> []
|
||||
end.
|
||||
|
||||
get_instruct(ProductId, _DeviceId, Round, thing) ->
|
||||
get_instruct(ProductId, Round);
|
||||
|
||||
get_instruct(ProductId, DeviceId, Round, instruct) ->
|
||||
get_que(ProductId, DeviceId, Round).
|
||||
|
||||
get_instruct(ProductId, Round) ->
|
||||
case dgiot_product:lookup_prod(ProductId) of
|
||||
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} when length(Props) > 0 ->
|
||||
{_, NewList} = lists:foldl(fun(X, Acc) ->
|
||||
{Order, List} = Acc,
|
||||
case X of
|
||||
#{<<"dataForm">> := #{<<"strategy">> := <<"计算值"/utf8>>}} ->
|
||||
Acc;
|
||||
#{<<"dataForm">> := #{<<"strategy">> := <<"主动上报"/utf8>>}} ->
|
||||
Acc;
|
||||
#{<<"accessMode">> := AccessMode,
|
||||
<<"identifier">> := Identifier,
|
||||
<<"dataType">> := #{<<"specs">> := #{<<"min">> := Min}},
|
||||
<<"dataForm">> := DataForm,
|
||||
<<"dataSource">> := DataSource} ->
|
||||
Protocol = maps:get(<<"protocol">>, DataForm, <<"">>),
|
||||
ThingRound = maps:get(<<"round">>, DataForm, <<"all">>),
|
||||
InstructOrder = maps:get(<<"order">>, DataForm, Order),
|
||||
Control = maps:get(<<"control">>, DataForm, "%d"),
|
||||
NewData = dgiot_task:get_control(Round, Min, Control),
|
||||
Strategy = dgiot_utils:to_int(maps:get(<<"strategy">>, DataForm, 20)),
|
||||
BinRound = dgiot_utils:to_binary(Round),
|
||||
case ThingRound of
|
||||
<<"all">> ->
|
||||
{Order + 1, List ++ [{InstructOrder, Strategy, Identifier, AccessMode, NewData, Protocol, DataSource, ThingRound}]};
|
||||
BinRound ->
|
||||
{Order + 1, List ++ [{InstructOrder, Strategy, Identifier, AccessMode, NewData, Protocol, DataSource, ThingRound}]};
|
||||
Rounds ->
|
||||
RoundList = binary:split(Rounds, <<",">>, [global]),
|
||||
case lists:member(BinRound, RoundList) of
|
||||
true ->
|
||||
{Order + 1, List ++ [{InstructOrder, Strategy, Identifier, AccessMode, NewData, Protocol, DataSource, ThingRound}]};
|
||||
false ->
|
||||
Acc
|
||||
end
|
||||
end;
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, {1, []}, Props),
|
||||
lists:keysort(1, NewList);
|
||||
_ ->
|
||||
[]
|
||||
end.
|
||||
|
||||
get_child_instruct(DeviceId, Round, thing) ->
|
||||
case dgiot_parse:query_object(<<"Device">>, #{<<"where">> => #{<<"parentId">> => DeviceId}}) of
|
||||
{ok, #{<<"results">> := ChildDevices}} ->
|
||||
lists:foldl(fun(#{<<"product">> := #{<<"objectId">> := ProductId}}, Acc) ->
|
||||
Acc ++ dgiot_instruct:get_instruct(ProductId, DeviceId, Round, thing)
|
||||
end, [], ChildDevices);
|
||||
_ ->
|
||||
[]
|
||||
end.
|
||||
|
||||
get_que(_ProductId, DeviceId, Round) ->
|
||||
case dgiot_data:get({instuct, DeviceId}) of
|
||||
not_find ->
|
||||
Que = init_que(DeviceId, Round),
|
||||
dgiot_data:insert({instuct, DeviceId}, Que),
|
||||
Que;
|
||||
Que ->
|
||||
NewQue = get_que_(Que, Round),
|
||||
dgiot_data:insert({instuct, DeviceId}, NewQue),
|
||||
NewQue
|
||||
end.
|
||||
|
||||
get_que_(Que, Round) ->
|
||||
lists:foldl(fun(X, Acc) ->
|
||||
case X of
|
||||
{InstructOrder, Strategy, Identifier, Address, AccessMode, NewData, Protocol, ThingRound} ->
|
||||
case ThingRound of
|
||||
<<"all">> ->
|
||||
Acc ++ [{InstructOrder, Strategy, Identifier, Address, AccessMode, NewData, Protocol, ThingRound}];
|
||||
Round ->
|
||||
Acc ++ [{InstructOrder, Strategy, Identifier, Address, AccessMode, NewData, Protocol, ThingRound}];
|
||||
RoundList ->
|
||||
case lists:member(Round, RoundList) of
|
||||
true ->
|
||||
Acc ++ [{InstructOrder, Strategy, Identifier, Address, AccessMode, NewData, Protocol, ThingRound}];
|
||||
false ->
|
||||
Acc
|
||||
end
|
||||
end;
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, [], Que).
|
||||
|
||||
|
@ -1,68 +0,0 @@
|
||||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(task_loader).
|
||||
-author("johnliu").
|
||||
-include("dgiot_task.hrl").
|
||||
-include_lib("dgiot/include/logger.hrl").
|
||||
|
||||
-behaviour(gen_server).
|
||||
%% API
|
||||
-export([start_link/0
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
terminate/2,
|
||||
code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-record(state, {}).
|
||||
|
||||
|
||||
-define(PAGE_SIZE, 100).
|
||||
-define(PAGE_INDEX, 1).
|
||||
-define(MAX, 0).
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
init([]) ->
|
||||
erlang:send_after(1000 * 5, self(), load),
|
||||
{ok, #state{}}.
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast(_Request, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({deliver, _Topic, Msg}, State) ->
|
||||
_Payload = binary_to_term(dgiot_mqtt:get_payload(Msg)),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
15
out/production/dgiot/emqx.app
Normal file
15
out/production/dgiot/emqx.app
Normal file
@ -0,0 +1,15 @@
|
||||
{application, emqx,
|
||||
[{id, "emqx"},
|
||||
{description, "EMQ X"},
|
||||
{vsn, "4.3.11"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},
|
||||
{mod, {emqx_app,[]}},
|
||||
{env, []},
|
||||
{licenses, ["Apache-2.0"]},
|
||||
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
|
||||
{links, [{"Homepage", "https://emqx.io/"},
|
||||
{"Github", "https://github.com/emqx/emqx"}
|
||||
]}
|
||||
]}.
|
15
out/test/dgiot/emqx.app
Normal file
15
out/test/dgiot/emqx.app
Normal file
@ -0,0 +1,15 @@
|
||||
{application, emqx,
|
||||
[{id, "emqx"},
|
||||
{description, "EMQ X"},
|
||||
{vsn, "4.3.11"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},
|
||||
{mod, {emqx_app,[]}},
|
||||
{env, []},
|
||||
{licenses, ["Apache-2.0"]},
|
||||
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
|
||||
{links, [{"Homepage", "https://emqx.io/"},
|
||||
{"Github", "https://github.com/emqx/emqx"}
|
||||
]}
|
||||
]}.
|
Loading…
Reference in New Issue
Block a user