fix: channel_log

This commit is contained in:
AvantLiu 2021-09-26 15:36:02 +08:00
parent 9fcc72ccd7
commit 1e3705876e
5 changed files with 21 additions and 21 deletions

View File

@ -159,9 +159,9 @@ do_handle(#{<<"channelId">> := ChannelId, <<"action">> := <<"start_logger">>} =
}),
case Filter of
#{<<"devaddr">> := Addr, <<"productId">> := ProductId} ->
dgiot_bridge:send_log(ChannelId, ProductId, Addr, Fmt, Args);
dgiot_bridge:send_log(ChannelId, ProductId, Addr, "Channel[~s] is Running, ProductId:~s, devaddr:~s, Log is ~s", [ChannelId, ProductId, Addr, true]);
#{<<"productId">> := ProductId} ->
dgiot_bridge:send_log(ChannelId, ProductId, Fmt, Args);
dgiot_bridge:send_log(ChannelId, ProductId, "Channel[~s] is Running, ProductId:~s, Log is ~s", [ChannelId, ProductId, true]);
_ ->
dgiot_bridge:send_log(ChannelId, Fmt, Args)
end;

View File

@ -39,7 +39,6 @@ start(Port, State) ->
%% {ok, TCPState}.
init(#tcp{state = #state{id = ChannelId}} = TCPState) ->
?LOG(info, "ChannelId ~p", [ChannelId]),
case dgiot_bridge:get_products(ChannelId) of
{ok, _TYPE, _ProductIds} ->
{ok, TCPState};
@ -50,7 +49,6 @@ init(#tcp{state = #state{id = ChannelId}} = TCPState) ->
%% 9C A5 25 CD 00 DB
%% 11 04 02 06 92 FA FE
handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, devaddr = <<>>, head = Head, len = Len, product = ProductId, dtutype = Dtutype} = State} = TCPState) ->
dgiot_bridge:send_log(ChannelId, ProductId, "DTU revice from ~p", [dgiot_utils:binary_to_hex(Buff)]),
DTUIP = dgiot_utils:get_ip(Socket),
DtuAddr = dgiot_utils:binary_to_hex(Buff),
List = dgiot_utils:to_list(DtuAddr),
@ -67,6 +65,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, de
{DevId1, Devaddr1}
end,
Topic = <<"profile/", ProductId/binary, "/", Devaddr/binary>>,
dgiot_bridge:send_log(ChannelId, ProductId, Devaddr, "DTU revice from ~p", [dgiot_utils:binary_to_hex(Buff)]),
dgiot_mqtt:subscribe(Topic),
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DevId, state = State#state{devaddr = Devaddr, deviceId = DevId}}};
_Error ->
@ -74,6 +73,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, de
{match, [Head]} when length(List1) == Len ->
create_device(DeviceId, ProductId, Buff, DTUIP, Dtutype),
Topic = <<"profile/", ProductId/binary, "/", Buff/binary>>,
dgiot_bridge:send_log(ChannelId, ProductId, Buff, "DTU revice from ~p", [dgiot_utils:binary_to_hex(Buff)]),
dgiot_mqtt:subscribe(Topic),
{noreply, TCPState#tcp{buff = <<>>, state = State#state{devaddr = Buff}}};
Error1 ->
@ -83,7 +83,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, de
end;
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, devaddr = DtuAddr, env = #{product := ProductId, pn := Pn, di := Di}, product = DtuProductId} = State} = TCPState) ->
dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "revice from ~p", [dgiot_utils:binary_to_hex(Buff)]),
dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "DtuAddr ~p revice from ~p", [DtuAddr, dgiot_utils:binary_to_hex(Buff)]),
<<H:8, L:8>> = dgiot_utils:hex_to_binary(modbus_rtu:is16(Di)),
<<Sh:8, Sl:8>> = dgiot_utils:hex_to_binary(modbus_rtu:is16(Pn)),
case modbus_rtu:parse_frame(Buff, [], #{
@ -93,9 +93,9 @@ handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, devaddr = DtuAddr,
<<"slaveId">> => Sh * 256 + Sl,
<<"address">> => H * 256 + L}) of
{_, Things} ->
?LOG(info, "Things ~p", [Things]),
%% ?LOG(info, "Things ~p", [Things]),
NewTopic = <<"thing/", DtuProductId/binary, "/", DtuAddr/binary, "/post">>,
dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "end to_task: ~p: ~p ~n", [NewTopic, jsx:encode(Things)]),
dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "DtuAddr ~p end to_task: ~p: ~p ~n", [DtuAddr, NewTopic, jsx:encode(Things)]),
DeviceId = dgiot_parse:get_deviceid(ProductId, DtuAddr),
dgiot_mqtt:publish(DeviceId, NewTopic, jsx:encode(Things));
Other ->
@ -124,7 +124,7 @@ handle_info({deliver, _, Msg}, #tcp{state = #state{id = ChannelId} = State} = TC
{noreply, TCPState};
[<<"thing">>, _ProductId, DevAddr] ->
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(Payload, [{labels, binary}, return_maps]),
?LOG(error, "ThingData ~p", [ThingData]),
%% ?LOG(error, "ThingData ~p", [ThingData]),
case ThingData of
#{<<"command">> := <<"r">>,
<<"data">> := Value,
@ -138,9 +138,9 @@ handle_info({deliver, _, Msg}, #tcp{state = #state{id = ChannelId} = State} = TC
<<"value">> => Value,
<<"productid">> => ProductId,
<<"di">> => Di}),
?LOG(error, "Datas ~p", [Datas]),
%% ?LOG(error, "Datas ~p", [Datas]),
lists:map(fun(X) ->
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, "to_device: ~p ", [dgiot_utils:binary_to_hex(X)]),
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, " to_device: ~p [~p] ", [DevAddr, dgiot_utils:binary_to_hex(X)]),
dgiot_tcp_server:send(TCPState, X),
timer:sleep(1000)
end, Datas),
@ -159,7 +159,7 @@ handle_info({deliver, _, Msg}, #tcp{state = #state{id = ChannelId} = State} = TC
<<"di">> => Di}),
%% ?LOG(error, "Datas ~p", [Datas]),
lists:map(fun(X) ->
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, "to_device: ~p ", [dgiot_utils:binary_to_hex(X)]),
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, "to_device: ~p [~p] ", [DevAddr, dgiot_utils:binary_to_hex(X)]),
dgiot_tcp_server:send(TCPState, X),
timer:sleep(1000)
end, Datas),
@ -180,7 +180,7 @@ handle_info({deliver, _, Msg}, #tcp{state = #state{id = ChannelId} = State} = TC
#{<<"_dgiotprotocol">> := <<"modbus">>} ->
Payloads = modbus_rtu:set_params(maps:without([<<"_dgiotprotocol">>], Payload), ProductId, DevAddr),
lists:map(fun(X) ->
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, "to_device: ~p ", [dgiot_utils:binary_to_hex(X)]),
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, "to_device: ~p [~p] ", [DevAddr, dgiot_utils:binary_to_hex(X)]),
dgiot_tcp_server:send(TCPState, X)
end, Payloads);
_ ->

View File

@ -80,7 +80,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, de
%% 3D302E30303030203D302E303030303530303138200D0A
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, devaddr = DevAddr, product = ProductId} = State} = TCPState) ->
dgiot_bridge:send_log(ChannelId, "revice from ~p", [Buff]),
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, "revice from ~p", [Buff]),
case dgiot_shouyincheng:parse_frame(Buff, State) of
{params, Data} ->
dgiot_tdengine_adapter:save(ProductId, DevAddr, Data);

View File

@ -140,16 +140,16 @@ handle_info(retry, State) ->
{noreply, send_msg(State)};
%%
handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = ProductId, ack = Ack, que = Que} = State) when length(Que) == 0 ->
handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = ProductId, devaddr = DevAddr, ack = Ack, que = Que} = State) when length(Que) == 0 ->
Payload = jsx:decode(dgiot_mqtt:get_payload(Msg), [return_maps]),
dgiot_bridge:send_log(Channel, "to_dev=> ~ts: ~ts ~s ~p ", [unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg)), ?FILE, ?LINE]),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "to_dev=> ~ts: ~ts ~s ~p ", [unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg)), ?FILE, ?LINE]),
NewAck = dgiot_task:get_collection(ProductId, Dis, Payload, Ack),
{noreply, get_next_pn(State#task{ack = NewAck})};
%% ACK消息触发抄表指令
handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = ProductId, ack = Ack} = State) ->
handle_info({deliver, _, Msg}, #task{tid = Channel, dis = Dis, product = ProductId, devaddr = DevAddr, ack = Ack} = State) ->
Payload = jsx:decode(dgiot_mqtt:get_payload(Msg), [return_maps]),
dgiot_bridge:send_log(Channel, "to_dev=> ~ts: ~ts ~s ~p ", [unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg)), ?FILE, ?LINE]),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "to_dev=> ~ts: ~ts ~s ~p ", [unicode:characters_to_list(dgiot_mqtt:get_topic(Msg)), unicode:characters_to_list(dgiot_mqtt:get_payload(Msg)), ?FILE, ?LINE]),
NewAck = dgiot_task:get_collection(ProductId, Dis, Payload, Ack),
{noreply, send_msg(State#task{ack = NewAck})};
@ -197,7 +197,7 @@ send_msg(#task{tid = Channel, product = Product, devaddr = DevAddr, ref = Ref, q
end, {0, [], []}, Que),
Newpayload = jsx:encode(Payload),
Topic = <<"thing/", Product/binary, "/", DevAddr/binary>>,
dgiot_bridge:send_log(Channel, "to_dev=> ~ts: ~ts ~s ~p ", [unicode:characters_to_list(Topic), unicode:characters_to_list(Newpayload), ?FILE, ?LINE]),
dgiot_bridge:send_log(Channel, Product, DevAddr, "to_dev=> ~ts: ~ts ~s ~p ", [unicode:characters_to_list(Topic), unicode:characters_to_list(Newpayload), ?FILE, ?LINE]),
dgiot_mqtt:publish(Channel, Topic, Newpayload),
%%
case Ref of
@ -249,7 +249,7 @@ save_td(#task{app = _App, tid = Channel, product = ProductId, devaddr = DevAddr,
dgiot_tdengine_adapter:save(ProductId, DevAddr, Data),
NotificationTopic = <<"notification/", ProductId/binary, "/", DevAddr/binary, "/post">>,
dgiot_mqtt:publish(DeviceId, NotificationTopic, jsx:encode(Data)),
dgiot_bridge:send_log(Channel, "from_Notification=> ~ts: ~ts ", [unicode:characters_to_list(NotificationTopic), unicode:characters_to_list(jsx:encode(Data))]);
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "from_Notification=> ~ts: ~ts ", [unicode:characters_to_list(NotificationTopic), unicode:characters_to_list(jsx:encode(Data))]);
true ->
pass
end

View File

@ -477,7 +477,7 @@ function deploy_postgres() {
install_postgres
init_postgres_database
DATA_DIR="${install_dir}/dgiot_pg_writer/data"
install_service dgiot_pg_writer "notify" "/usr/local/pgsql/12/bin/postgres -D ${DATA_DIR}" "postgres" "DATA_DIR=${DATA_DIR}"
install_service1 dgiot_pg_writer "notify" "/usr/local/pgsql/12/bin/postgres -D ${DATA_DIR}" "postgres" "DATA_DIR=${DATA_DIR}"
sleep 2
psql -U postgres -c "CREATE USER repl WITH PASSWORD '${pg_pwd}' REPLICATION;" &> /dev/null
echo -e "`date +%F_%T`: ${GREEN} deploy postgres success${NC}"