Merge branch 'master' of github.com:dgiot/dgiot

This commit is contained in:
lsxredrain 2021-08-20 19:21:49 +08:00
commit a27b5c7960
15 changed files with 418 additions and 150 deletions

View File

@ -413,6 +413,90 @@
}
},
"/thing": {
"post": {
"summary": "添加物模型",
"description": "添加物模型",
"parameters": [
{
"in": "body",
"name": "data",
"description": "关联信息",
"required": true,
"schema": {
"type": "object",
"properties": {
"productid": {
"description": "产品标识",
"type": "string",
"example": "ed09e37bbb"
},
"item": {
"description": "物模型单项指标",
"type": "object",
"example": {
"name": "电压",
"index": 11,
"isshow": true,
"dataForm": {
"data": "1",
"rate": 1,
"order": 26,
"round": "all",
"offset": 0,
"address": "0X0107",
"control": "%d",
"iscount": "0",
"slaveid": "0X01",
"protocol": "modbus",
"strategy": "20",
"collection": "%s*0.1",
"countround": "all",
"operatetype": "readHregs",
"originaltype": "short16_AB",
"countstrategy": 20,
"countcollection": "%s"
},
"dataType": {
"type": "float",
"specs": {
"max": 9999,
"min": 0,
"step": 0,
"unit": "V",
"precision": 2
}
},
"required": true,
"accessMode": "r",
"devicetype": "控制器",
"identifier": "voltage"
}
}
}
}
}
],
"responses": {
"200": {
"description": "Returns success"
},
"400": {
"description": "Bad Request"
},
"401": {
"description": "Unauthorized"
},
"403": {
"description": "Forbidden"
},
"500": {
"description": "Server Internal error"
}
},
"tags": [
"Product"
]
},
"put": {
"summary": "修改物模型",
"description": "修改物模型",
@ -428,36 +512,132 @@
"productid": {
"description": "产品标识",
"type": "string",
"example": "9c5930e565"
"example": "ed09e37bbb"
},
"item": {
"description": "物模型单项指标",
"type": "object",
"example": {
"name": "出口压力",
"name": "电压",
"index": 11,
"isshow": true,
"dataForm": {
"data": "1",
"rate": 1,
"order": 26,
"round": "all",
"offset": 0,
"address": "0x0001",
"slaveid": 1,
"address": "0X0107",
"control": "%d",
"iscount": "0",
"slaveid": "0X01",
"protocol": "modbus",
"quantity": 2,
"byteorder": "big",
"operatetype": "holdingRegister",
"originaltype": "int16"
"strategy": "20",
"collection": "%s*0.1",
"countround": "all",
"operatetype": "readHregs",
"originaltype": "short16_AB",
"countstrategy": 20,
"countcollection": "%s"
},
"dataType": {
"type": "float",
"specs": {
"max": 500,
"min": -500,
"step": 0.1,
"unit": "MPa"
"max": 9999,
"min": 0,
"step": 0,
"unit": "V",
"precision": 2
}
},
"required": true,
"accessMode": "r",
"identifier": "pressure_out"
"devicetype": "控制器",
"identifier": "voltage"
}
}
}
}
}
],
"responses": {
"200": {
"description": "Returns success"
},
"400": {
"description": "Bad Request"
},
"401": {
"description": "Unauthorized"
},
"403": {
"description": "Forbidden"
},
"500": {
"description": "Server Internal error"
}
},
"tags": [
"Product"
]
},
"delete": {
"summary": "删除物模型",
"description": "删除物模型",
"parameters": [
{
"in": "body",
"name": "data",
"description": "关联信息",
"required": true,
"schema": {
"type": "object",
"properties": {
"productid": {
"description": "产品标识",
"type": "string",
"example": "ed09e37bbb"
},
"item": {
"description": "物模型单项指标",
"type": "object",
"example": {
"name": "电压",
"index": 11,
"isshow": true,
"dataForm": {
"data": "1",
"rate": 1,
"order": 26,
"round": "all",
"offset": 0,
"address": "0X0107",
"control": "%d",
"iscount": "0",
"slaveid": "0X01",
"protocol": "modbus",
"strategy": "20",
"collection": "%s*0.1",
"countround": "all",
"operatetype": "readHregs",
"originaltype": "short16_AB",
"countstrategy": 20,
"countcollection": "%s"
},
"dataType": {
"type": "float",
"specs": {
"max": 9999,
"min": 0,
"step": 0,
"unit": "V",
"precision": 2
}
},
"required": true,
"accessMode": "r",
"devicetype": "控制器",
"identifier": "voltage"
}
}
}

View File

@ -52,23 +52,23 @@ handle(OperationID, Args, Context, Req) ->
Headers = #{},
case catch do_request(OperationID, Args, Context, Req) of
{ErrType, Reason} when ErrType == 'EXIT'; ErrType == error ->
?LOG(info, "do request: ~p, ~p, ~p~n", [OperationID, Args, Reason]),
?LOG(debug, "do request: ~p, ~p, ~p~n", [OperationID, Args, Reason]),
Err = case is_binary(Reason) of
true -> Reason;
false -> dgiot_utils:format("~p", [Reason])
end,
{500, Headers, #{<<"error">> => Err}};
ok ->
?LOG(error, "do request: ~p, ~p ->ok ~n", [OperationID, Args]),
?LOG(debug, "do request: ~p, ~p ->ok ~n", [OperationID, Args]),
{200, Headers, #{}, Req};
{ok, Res} ->
?LOG(error, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
{200, Headers, Res, Req};
{Status, Res} ->
?LOG(error, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
{Status, Headers, Res, Req};
{Status, NewHeaders, Res} ->
?LOG(error, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]),
{Status, maps:merge(Headers, NewHeaders), Res, Req}
end.
@ -162,8 +162,39 @@ do_request(post_graphql, Body, #{<<"sessionToken">> := SessionToken} = _Context,
Other -> Other
end;
%% Thing : :json文件导库
%% OperationId:post_product
%% Thing : :
%% OperationId:post_thing
%% :PUT /iotapi/post_thing
do_request(post_thing, #{<<"productid">> := ProductId, <<"item">> := Item} = _Body,
#{<<"sessionToken">> := SessionToken} = _Context, _Req) ->
#{<<"identifier">> := Identifier} = Item,
case dgiot_parse:get_object(<<"Product">>, ProductId, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"thing">> := Thing}} ->
Properties = maps:get(<<"properties">>, Thing, []),
NewThing =
lists:foldl(fun(X, _Acc) ->
case X of
#{<<"identifier">> := Identifier} -> <<>>;
_ -> Item
end
end, <<>>, Properties),
case NewThing of
<<>> ->
{error, #{<<"code">> => 500, <<"msg">> => <<Identifier/binary, " already existed">>}};
X ->
NewProperties = Properties ++ [X],
dgiot_parse:update_object(<<"Product">>, ProductId,
#{<<"thing">> => Thing#{<<"properties">> => NewProperties}},
[{"X-Parse-Session-Token", SessionToken}], [{from, rest}])
end;
Error ->
{error, Error}
end;
%% Thing : :
%% OperationId:put_thing
%% :PUT /iotapi/put_thing
do_request(put_thing, #{<<"productid">> := ProductId, <<"item">> := Item} = _Body,
#{<<"sessionToken">> := SessionToken} = _Context, _Req) ->
@ -171,17 +202,63 @@ do_request(put_thing, #{<<"productid">> := ProductId, <<"item">> := Item} = _Bod
case dgiot_parse:get_object(<<"Product">>, ProductId, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"thing">> := Thing}} ->
#{<<"properties">> := Properties} = Thing,
NewProperties = lists:foldl(fun(X, Acc) ->
case X of
#{<<"identifier">> := Identifier} -> Acc ++ [Item];
_ -> Acc ++ [X]
end
end, [], Properties),
NewProperties =
lists:foldl(fun(X, Acc) ->
case X of
#{<<"identifier">> := Identifier} ->
Acc ++ [Item];
_ ->
Acc ++ [X]
end
end, [], Properties),
dgiot_parse:update_object(<<"Product">>, ProductId,
#{<<"thing">> => Thing#{<<"properties">> => NewProperties}},
[{"X-Parse-Session-Token", SessionToken}], [{from, rest}]);
Error ->
Error
{error, Error}
end;
%% Thing : :
%% OperationId:put_thing
%% :PUT /iotapi/put_thing
do_request(delete_thing, #{<<"productid">> := ProductId, <<"item">> := Item} = _Body,
#{<<"sessionToken">> := SessionToken} = _Context, _Req) ->
#{<<"identifier">> := Identifier} = Item,
case dgiot_parse:get_object(<<"Product">>, ProductId, [{"X-Parse-Session-Token", SessionToken}], [{from, rest}]) of
{ok, #{<<"thing">> := Thing}} ->
#{<<"properties">> := Properties} = Thing,
{Ids, NewProperties} =
lists:foldl(fun(X, {Ids1, Acc}) ->
case X of
#{<<"identifier">> := Identifier} ->
{Ids1, Acc};
#{<<"identifier">> := Identifier1, <<"dataForm">> := #{<<"collection">> := Collection}} ->
case binary:match(Collection, [Identifier]) of
nomatch ->
{Ids1, Acc ++ [X]};
_ ->
case Ids1 of
[] ->
{Ids1 ++ [Identifier1], Acc};
_ ->
{Ids1 ++ [<<",", Identifier1/binary>>], Acc}
end
end;
_ ->
{Ids1, Acc}
end
end, {[], []}, Properties),
case length(Ids) == 0 of
true ->
dgiot_parse:update_object(<<"Product">>, ProductId,
#{<<"thing">> => Thing#{<<"properties">> => NewProperties}},
[{"X-Parse-Session-Token", SessionToken}], [{from, rest}]);
false ->
BinIds = dgiot_utils:to_binary(Ids),
{error, #{<<"code">> => 500, <<"msg">> => <<BinIds/binary, " use ", Identifier/binary>>}}
end;
Error ->
{error, Error}
end;

View File

@ -168,7 +168,7 @@ on_action_dgiot(Selected, #{event := Event} = Envs) ->
Msg = dgiot_mqtt:get_message(Selected, Envs),
case Event of
'message.publish' ->
?LOG(info, "Msg ~p", [Msg]),
%% ?LOG(info, "Msg ~p", [Msg]),
post_rule(Msg),
case dgiot_channelx:do_message(ChannelId, {rule, Msg, Selected}) of
not_find -> dgiot_mqtt:republish(Selected, Envs);
@ -179,13 +179,10 @@ on_action_dgiot(Selected, #{event := Event} = Envs) ->
end.
%% SELECT payload, payload.dump_energy as dump_energy, clientid, 'productid' as productid FROM "notification/c1e44b39f0/868615051803274/#" WHERE dump_energy < 90
post_rule(#{metadata := #{rule_id := <<"rule:Notification_", Ruleid/binary>>}, clientid := DevAddr, payload := Payload, topic := _Topic} = Msg) ->
?LOG(info, "Msg ~p", [Msg]),
post_rule(#{metadata := #{rule_id := <<"rule:Notification_", Ruleid/binary>>}, clientid := DevAddr, payload := Payload, topic := _Topic}) ->
%% ?LOG(info, "Msg ~p", [Msg]),
NewPayload = jsx:decode(Payload, [{labels, binary}, return_maps]),
dgiot_umeng:add_notification(Ruleid, DevAddr, NewPayload);
post_rule(Msg) ->
?LOG(error, "Msg ~p", [Msg]).

View File

@ -100,21 +100,22 @@ save(ProductId, DevAddr, _Data) ->
sync_parse(OffLine) ->
Fun = fun(X) ->
?LOG(debug, "X ~p", [X]),
{_, DeviceId, V} = X,
Now = dgiot_datetime:now_secs(),
case V of
{[true, Last, _], Node} when (Now - Last) > (OffLine * 1000) ->
{[true, Last, Acl], Node} when (Now - Last) > OffLine ->
case dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"status">> => <<"OFFLINE">>}) of
{ok, _R} ->
dgiot_mnesia:insert(DeviceId, {[false, Last], Node});
dgiot_mnesia:insert(DeviceId, {[false, Last, Acl], Node});
_ ->
pass
end,
timer:sleep(50);
{[false, Last, _], Node} when (Now - Last) < (OffLine * 1000) ->
{[false, Last, Acl], Node} when (Now - Last) < OffLine ->
case dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"status">> => <<"ONLINE">>}) of
{ok, _R} ->
dgiot_mnesia:insert(DeviceId, {[true, Last], Node});
dgiot_mnesia:insert(DeviceId, {[true, Last, Acl], Node});
_ ->
pass
end,
@ -340,7 +341,7 @@ get_online(DeviceId) ->
false
end.
get_file(ProductId, DevAddr, FileUrl, Ext) ->
get_file(ProductId, DevAddr, FileUrl, Ext) ->
Name = dgiot_datetime:now_microsecs(),
FileName = dgiot_utils:to_list(Name) ++ "." ++ Ext,
case ibrowse:send_req(FileUrl, [], get) of
@ -383,7 +384,7 @@ get_file(ProductId, DevAddr, FileUrl, Ext) ->
{ok, {{"HTTP/1.1", 200, "OK"}, _, Json}} ->
case jsx:decode(dgiot_utils:to_binary(Json), [{labels, binary}, return_maps]) of
#{<<"md5">> := _Md5} ->
{ok,Name};
{ok, Name};
Error1 ->
Error1
end;

View File

@ -148,13 +148,14 @@ load() ->
%% key, % [ProductId], [ID]
%% product % ,map类型
%%}).
save_prod(ProductId, Product) ->
case dgiot_data:insert(?DGIOT_PRODUCT, ProductId, Product) of
true ->
ok;
{error, Reason} ->
{error, Reason}
end.
save_prod(ProductId, #{<<"thing">> := _thing} = Product) ->
dgiot_data:insert(?DGIOT_PRODUCT, ProductId, Product),
?LOG(debug, "product ~p", [Product]),
{ok, Product};
save_prod(_ProductId, _Product) ->
?LOG(error, "product error ~p", [_Product]),
pass.
lookup_prod(ProductId) ->
case dgiot_data:get(?DGIOT_PRODUCT, ProductId) of
@ -164,12 +165,16 @@ lookup_prod(ProductId) ->
{ok, Value}
end.
save(Product) ->
save(#{<<"thing">> := _thing} = Product) ->
Product1 = format_product(Product),
#{<<"productId">> := ProductId} = Product1,
dgiot_data:insert(?DGIOT_PRODUCT, ProductId, Product1),
?LOG(debug, "product ~p", [Product1]),
{ok, Product1}.
{ok, Product1};
save(_Product) ->
?LOG(error, "product error ~p", [_Product]),
pass.
local(ProductId) ->
case dgiot_data:lookup(?DGIOT_PRODUCT, ProductId) of
@ -183,7 +188,7 @@ delete(ProductId) ->
dgiot_data:delete(?DGIOT_PRODUCT, ProductId).
get(ProductId) ->
Keys = [<<"nodeType">>, <<"objectId">>, <<"thing">>, <<"dynamicReg">>, <<"topics">>, <<"ACL">>],
Keys = [<<"nodeType">>, <<"dynamicReg">>, <<"topics">>],
case dgiot_parse:get_object(<<"Product">>, ProductId) of
{ok, Product} ->
{ok, maps:with(Keys, Product)};
@ -378,3 +383,4 @@ parse_frame(ProductId, Bin, Opts) ->
to_frame(ProductId, Msg) ->
apply(binary_to_atom(ProductId, utf8), to_frame, [Msg]).

View File

@ -199,7 +199,7 @@ add_notification(<<"stop_", Ruleid/binary>>, DevAddr, Payload) ->
case dgiot_data:get(?NOTIFICATION, {DeviceId, Ruleid}) of
{start, _Time} ->
save_notification(Ruleid, DevAddr, Payload#{<<"alertstatus">> => false});
_->
_ ->
pass
end,
dgiot_data:insert(?NOTIFICATION, {DeviceId, Ruleid}, {stop, dgiot_datetime:now_secs()});
@ -323,7 +323,8 @@ sendSubscribe(Type, Content, UserId) ->
maps:fold(fun(Key, Value1, Form) ->
case maps:find(Key, Content) of
{ok, Value} ->
Form#{<<"thing15">> => #{<<"value">> => Value}};
BinValue = dgiot_utils:to_binary(Value),
Form#{<<"thing15">> => #{<<"value">> => BinValue}};
_ ->
Default = maps:get(<<"default">>, Value1, <<>>),
Form#{Key => #{<<"value">> => Default}}

View File

@ -120,7 +120,7 @@ sendSubscribe(UserId, Data) ->
case jsx:decode(Json, [{labels, binary}, return_maps]) of
#{<<"access_token">> := AccessToken, <<"expires_in">> := _ExpiresIn} ->
SubscribeUrl = "https://api.weixin.qq.com/cgi-bin/message/subscribe/send?access_token=" ++ dgiot_utils:to_list(AccessToken),
?LOG(info, "SubscribeUrl ~p", [SubscribeUrl]),
?LOG(debug, "SubscribeUrl ~p", [SubscribeUrl]),
Subscribe = #{<<"touser">> => OpenId,
<<"template_id">> => <<"9Fmc0vtA7vnh_HtoVtXJy_cRDOnIk1ubniO_Oe3WatU">>,
<<"page">> => <<"pages/alarm/alarm">>,
@ -129,7 +129,7 @@ sendSubscribe(UserId, Data) ->
<<"data">> => Data},
Data1 = dgiot_utils:to_list(jsx:encode(Subscribe)),
R = httpc:request(post, {SubscribeUrl, [], "application/x-www-form-urlencoded", Data1}, [{timeout, 5000}, {connect_timeout, 10000}], [{body_format, binary}]),
?LOG(info, "R ~p", [R]);
?LOG(debug, "R ~p", [R]);
_Result ->
{error, <<"not find access_token">>}
end;

View File

@ -216,7 +216,7 @@ parse_frame(<<SlaveId:8, _/binary>> = Buff, Acc, #{<<"dtuaddr">> := DtuAddr, <<"
%rtu modbus
parse_frame(_Other, Acc, _State) ->
?LOG(error, "_Other ~p", [_Other]),
{<<>>, Acc}.
{error, Acc}.
decode_data(Buff, ProductId, DtuAddr, Address, Acc) ->
<<SlaveId:8, FunCode:8, ResponseData/binary>> = Buff,

View File

@ -328,11 +328,9 @@ do_request(Method, Path, Header, Data, Options) ->
{ok, StatusCode, Headers, ResBody} ->
case do_request_after(Method, Path, Data, ResBody, Options) of
{ok, NewResBody} ->
?LOG(info, "Path ~p", [Path]),
save_cache(Method, Path, Data),
{ok, StatusCode, Headers, NewResBody};
ignore ->
?LOG(info, "Path ~p", [Path]),
save_cache(Method, Path, Data),
{ok, StatusCode, Headers, ResBody};
{error, Reason} ->
@ -394,7 +392,6 @@ do_request_hook(Type, [<<"classes">>, Class, ObjectId], Method, Data, Body) ->
do_request_hook(Type, [<<"classes">>, Class], Method, Data, Body) ->
do_hook({Class, Method}, [Type, Data, Body]);
do_request_hook(_Type, _Paths, _Method, _Data, _Body) ->
?LOG(info, "_Paths ~p", [_Paths]),
ignore.
do_hook(Key, Args) ->
case catch dgiot_hook:run_hook(Key, Args) of

View File

@ -191,7 +191,6 @@ get_calculated(ProductId, Ack) ->
%%
get_collection(ProductId, Dis, Payload, Ack) ->
%% ?LOG(info,"Payload ~p", [Payload]),
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
lists:foldl(fun(Identifier, Acc1) ->

View File

@ -60,29 +60,32 @@ stop(#{<<"channel">> := Channel, <<"dtuid">> := DtuId}) ->
%%%===================================================================
init([#{<<"app">> := App, <<"channel">> := ChannelId, <<"dtuid">> := DtuId, <<"mode">> := Mode, <<"freq">> := Freq, <<"end_time">> := Endtime} = _Args]) ->
dgiot_data:insert(?DGIOT_TASK, {ChannelId, DtuId}, self()),
%% Round = dgiot_data:get_consumer(<<"taskround/", ChannelId/binary, "/", DtuId/binary>>, 1),
{ProductId, DevAddr} = dgiot_task:get_pnque(DtuId),
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
Que = dgiot_instruct:get_instruct(ProductId, DeviceId, 1, dgiot_utils:to_atom(Mode)),
%% ?LOG(info, "Que ~p", [Que]),
Tsendtime = dgiot_datetime:localtime_to_unixtime(dgiot_datetime:to_localtime(Endtime)),
Nowstamp = dgiot_datetime:nowstamp(),
case length(Que) of
0 ->
erlang:send_after(300, self(), stop);
_ ->
case Tsendtime > Nowstamp of
true ->
erlang:send_after(1000, self(), retry);
false ->
erlang:send_after(300, self(), stop)
end
end,
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>,
dgiot_mqtt:subscribe(Topic),
AppData = maps:get(<<"appdata">>, _Args, #{}),
{ok, #task{mode = dgiot_utils:to_atom(Mode), app = App, dtuid = DtuId, product = ProductId, devaddr = DevAddr,
tid = ChannelId, firstid = DeviceId, que = Que, round = 1, appdata = AppData, ts = Nowstamp, freq = Freq, endtime = Tsendtime}};
case dgiot_task:get_pnque(DtuId) of
not_find ->
?LOG(info, "not_find ~p", [DtuId]),
pass;
{ProductId, DevAddr} ->
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
Que = dgiot_instruct:get_instruct(ProductId, DeviceId, 1, dgiot_utils:to_atom(Mode)),
Tsendtime = dgiot_datetime:localtime_to_unixtime(dgiot_datetime:to_localtime(Endtime)),
Nowstamp = dgiot_datetime:nowstamp(),
case length(Que) of
0 ->
erlang:send_after(300, self(), stop);
_ ->
case Tsendtime > Nowstamp of
true ->
erlang:send_after(1000, self(), retry);
false ->
erlang:send_after(300, self(), stop)
end
end,
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>,
dgiot_mqtt:subscribe(Topic),
AppData = maps:get(<<"appdata">>, _Args, #{}),
{ok, #task{mode = dgiot_utils:to_atom(Mode), app = App, dtuid = DtuId, product = ProductId, devaddr = DevAddr,
tid = ChannelId, firstid = DeviceId, que = Que, round = 1, appdata = AppData, ts = Nowstamp, freq = Freq, endtime = Tsendtime}}
end;
init(A) ->
?LOG(error, "A ~p ", [A]).
@ -103,29 +106,34 @@ handle_info({'EXIT', _From, Reason}, State) ->
handle_info(init, #task{dtuid = DtuId, mode = Mode, round = Round, ts = Oldstamp, freq = Freq, endtime = Tsendtime} = State) ->
dgiot_datetime:now_secs(),
{ProductId, DevAddr} = dgiot_task:get_pnque(DtuId),
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
NewRound = Round + 1,
Que = dgiot_instruct:get_instruct(ProductId, DeviceId, NewRound, dgiot_utils:to_atom(Mode)),
Nowstamp = dgiot_datetime:nowstamp(),
Newfreq = Nowstamp - Oldstamp,
case length(Que) of
0 ->
erlang:send_after(300, self(), stop);
_ ->
case Tsendtime > Nowstamp of
true ->
case Newfreq > Freq of
case dgiot_task:get_pnque(DtuId) of
not_find ->
?LOG(info, "not_find ~p", [DtuId]),
{noreply, State};
{ProductId, DevAddr} ->
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
NewRound = Round + 1,
Que = dgiot_instruct:get_instruct(ProductId, DeviceId, NewRound, dgiot_utils:to_atom(Mode)),
Nowstamp = dgiot_datetime:nowstamp(),
Newfreq = Nowstamp - Oldstamp,
case length(Que) of
0 ->
erlang:send_after(300, self(), stop);
_ ->
case Tsendtime > Nowstamp of
true ->
erlang:send_after(1000, self(), retry);
case Newfreq > Freq of
true ->
erlang:send_after(1000, self(), retry);
false ->
erlang:send_after((Freq - Newfreq) * 1000, self(), retry)
end;
false ->
erlang:send_after((Freq - Newfreq) * 1000, self(), retry)
end;
false ->
erlang:send_after(300, self(), stop)
end
end,
{noreply, State#task{product = ProductId, devaddr = DevAddr, round = NewRound, firstid = DeviceId, que = Que, ts = Nowstamp}};
erlang:send_after(300, self(), stop)
end
end,
{noreply, State#task{product = ProductId, devaddr = DevAddr, round = NewRound, firstid = DeviceId, que = Que, ts = Nowstamp}}
end;
%%
handle_info(retry, State) ->
@ -136,7 +144,6 @@ handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = Product
Payload = jsx:decode(dgiot_mqtt:get_payload(Msg), [return_maps]),
dgiot_bridge:send_log(Channel, "to_dev=> ~ts: ~ts ~s ~p ", [unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg)), ?FILE, ?LINE]),
NewAck = dgiot_task:get_collection(ProductId, Dis, Payload, Ack),
%% ?LOG(info, "NewAck ~p", [NewAck]),
{noreply, get_next_pn(State#task{ack = NewAck})};
%% ACK消息触发抄表指令
@ -144,7 +151,6 @@ handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = Product
Payload = jsx:decode(dgiot_mqtt:get_payload(Msg), [return_maps]),
dgiot_bridge:send_log(Channel, "to_dev=> ~ts: ~ts ~s ~p ", [unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg)), ?FILE, ?LINE]),
NewAck = dgiot_task:get_collection(ProductId, Dis, Payload, Ack),
%% ?LOG(info, "NewAck ~p", [NewAck]),
{noreply, send_msg(State#task{ack = NewAck})};
handle_info(_Msg, State) ->
@ -231,9 +237,6 @@ save_td(#task{app = _App, tid = Channel, product = ProductId, devaddr = DevAddr,
0 -> pass;
_ ->
Data = dgiot_task:get_calculated(ProductId, Ack),
?LOG(info,"ProductId ~p",[ProductId]),
?LOG(info,"Ack ~p ~t",[Ack]),
?LOG(info,"Data ~p",[Data]),
case length(maps:to_list(Data)) of
0 -> pass;
_ ->

View File

@ -26,7 +26,7 @@
-export([init_ets/0]).
%% API
-export([start/2, transaction/2, run_sql/3, handle_save/1, save_to_cache/2]).
-export([init/3, handle_event/3, handle_message/2, stop/3]).
-export([init/3, handle_event/3, handle_message/2, stop/3, handle_init/1]).
-export([test/1]).
%%
@ -172,31 +172,25 @@ start(ChannelId, #{
%%
init(?TYPE, ChannelId, Config) ->
case dgiot_bridge:get_products(ChannelId) of
{ok, _, ProductIds} ->
NewProducts = lists:foldl(fun(X, Acc) ->
Acc ++ dgiot_tdengine:get_products(X, ChannelId)
end, [], ProductIds),
do_check(ChannelId, dgiot_utils:unique_1(NewProducts), Config),
Opts = [?CACHE(ChannelId), #{
auto_save => application:get_env(dgiot_tdengine, cache_auto_save, 30000),
size => application:get_env(dgiot_tdengine, cache_max_size, 50000),
memory => application:get_env(dgiot_tdengine, cache_max_memory, 102400),
max_time => application:get_env(dgiot_tdengine, cache_max_time, 30),
handle => {?MODULE, handle_save, [ChannelId]}
}],
State = #state{
id = ChannelId,
env = Config,
product = dgiot_utils:unique_1(NewProducts)
},
Specs = [
{dgiot_dcache, {dgiot_dcache, start_link, Opts}, permanent, 5000, worker, [dgiot_dcache]}
],
{ok, State, Specs};
{error, not_find} ->
{stop, not_find_product}
end.
Opts = [?CACHE(ChannelId), #{
auto_save => application:get_env(dgiot_tdengine, cache_auto_save, 30000),
size => application:get_env(dgiot_tdengine, cache_max_size, 50000),
memory => application:get_env(dgiot_tdengine, cache_max_memory, 102400),
max_time => application:get_env(dgiot_tdengine, cache_max_time, 30),
handle => {?MODULE, handle_save, [ChannelId]}
}],
State = #state{
id = ChannelId,
env = Config
},
Specs = [
{dgiot_dcache, {dgiot_dcache, start_link, Opts}, permanent, 5000, worker, [dgiot_dcache]}
],
{ok, State, Specs}.
handle_init(State) ->
erlang:send_after(5000, self(), init),
{ok, State}.
%% ,
handle_event(full, _From, #state{id = Channel}) ->
@ -213,6 +207,19 @@ handle_message({rule, Msg, Context}, State) ->
?LOG(info, "Context ~p", [Context]),
handle_message({data, Msg, Context}, State);
handle_message(init, #state{id = ChannelId, env = Config} = State) ->
case dgiot_bridge:get_products(ChannelId) of
{ok, _, ProductIds} ->
NewProducts = lists:foldl(fun(X, Acc) ->
Acc ++ dgiot_tdengine:get_products(X, ChannelId)
end, [], ProductIds),
do_check(ChannelId, dgiot_utils:unique_1(NewProducts), Config),
{ok, State#state{product = NewProducts}};
{error, not_find} ->
{ok, State}
end;
%%
handle_message({data, Product, DevAddr, Data, Context}, State) ->
case catch do_save([Product, DevAddr, Data, Context], State) of
@ -319,15 +326,16 @@ save_to_tdengine(_, []) -> ok;
save_to_tdengine(Channel, Requests) ->
case dgiot_tdengine:batch(Channel, Requests) of
{ok, _Results} ->
%?LOG(info,"Batch ~p-> ~p~n", [length(Requests), Results]),
?LOG(info, "Batch ~p-> ~p~n", [length(Requests), _Results]),
ok;
{error, Reason} when Reason == timeout; Reason == disconnect ->
save_to_cache(Channel, Requests),
ok;
?LOG(error, "save cache,~p,~p~n", [Requests, Reason]),
%% save_to_cache(Channel, Requests),
pass;
{error, Reason} ->
?LOG(error, "save cache,~p,~p~n", [Requests, Reason]),
save_to_cache(Channel, Requests),
ok
%% save_to_cache(Channel, Requests),
pass
end.
@ -417,7 +425,7 @@ create_table(ChannelId, [ProductId | ProductIds], Config) ->
end
end;
{error, Reason} ->
?LOG(error, "Create Table Error, ~p", [Reason])
?LOG(error, "Create Table Error, ~p ~p", [Reason, ProductId])
end,
create_table(ChannelId, ProductIds, Config).
@ -545,7 +553,6 @@ check_field(Data, #{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> :=
_ ->
Value
end,
?LOG(info, "NewValue ~p", [NewValue]),
case check_validate(NewValue, DataType) of
true ->
NewValue;

View File

@ -22,7 +22,7 @@
-include_lib("dgiot/include/logger.hrl").
%% API
-export([swagger_tdengine/0]).
-export([swagger_tdengine/0, get_props/1]).
-export([handle/4]).
@ -241,7 +241,7 @@ get_prop(ProductId) ->
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
lists:foldl(fun(X, Acc) ->
case X of
#{<<"identifier">> := Identifier, <<"name">> := Name} ->
#{<<"identifier">> := Identifier, <<"name">> := Name, <<"isshow">> := true} ->
Acc#{Identifier => Name};
_ -> Acc
end
@ -270,7 +270,7 @@ get_props(ProductId) ->
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
lists:foldl(fun(X, Acc) ->
case X of
#{<<"name">> := Name} ->
#{<<"name">> := Name, <<"isshow">> := true} ->
Acc#{Name => X};
_ -> Acc
end
@ -303,7 +303,7 @@ get_chart(ProductId, Results, Names, Interval) ->
end, #{}, Line),
Lines ++ [NewLine]
end, [], Results),
?LOG(info, "Rows ~p", [Rows]),
?LOG(debug, "Rows ~p", [Rows]),
ChildRows = lists:foldl(fun(X, Acc1) ->
Date = maps:get(<<"日期"/utf8>>, X),
maps:fold(fun(K1, V1, Acc) ->
@ -315,7 +315,7 @@ get_chart(ProductId, Results, Names, Interval) ->
end
end, Acc1, maps:without([<<"日期"/utf8>>], X))
end, #{}, Rows),
?LOG(info, "ChildRows ~p", [ChildRows]),
?LOG(debug, "ChildRows ~p", [ChildRows]),
Child =
maps:fold(fun(K, V, Acc) ->
Unit =
@ -325,7 +325,7 @@ get_chart(ProductId, Results, Names, Interval) ->
end,
Acc ++ [#{<<"columns">> => [<<"日期"/utf8>>, K], <<"rows">> => V, <<"unit">> => Unit}]
end, [], ChildRows),
?LOG(info, "Child ~p", [Child]),
?LOG(debug, "Child ~p", [Child]),
#{<<"columns">> => Columns, <<"rows">> => Rows, <<"child">> => Child}.
get_app(ProductId, Results) ->
@ -342,7 +342,7 @@ get_app(ProductId, Results) ->
{NewV, Unit, Ico, Devicetype} =
case maps:find(Name, Props) of
error ->
{V, <<"">>, <<"">>};
{V, <<"">>, <<"">>, <<"others">>};
{ok, #{<<"dataType">> := #{<<"type">> := Type, <<"specs">> := Specs}} = Prop} ->
Devicetype1 = maps:get(<<"devicetype">>, Prop, <<"others">>),
case Type of
@ -359,7 +359,7 @@ get_app(ProductId, Results) ->
{V, Unit1, Ico1, Devicetype1}
end;
_ ->
{V, <<"">>, <<"">>, <<"">>}
{V, <<"">>, <<"">>, <<"others">>}
end,
Acc ++ [#{<<"name">> => Name, <<"number">> => NewV, <<"time">> => NewTime, <<"unit">> => Unit, <<"imgurl">> => Ico, <<"devicetype">> => Devicetype}]
end

View File

@ -9,7 +9,7 @@ The `emqx_exproto` extremly enhance the extensibility for EMQ X. It allow using
## Architecture
![EMQ X ExProto Arch](./docs/images/exproto-arch.jpg)
![](./docs/images/exproto-arch.jpg)
## Usage

View File

@ -417,7 +417,7 @@ log.to = both
## this level will be logged.
##
## Default: warning
log.level = warning
log.level = info
## The dir for log files.
##