优化Mqtt推送

This commit is contained in:
iioter 2022-10-13 14:50:50 +08:00
parent c22792d877
commit c261efcb07
5 changed files with 152 additions and 136 deletions

View File

@ -30,7 +30,7 @@ namespace IoTGateway.ViewModel.Config.SystemConfigVMs
{ {
base.DoEdit(updateAllFields); base.DoEdit(updateAllFields);
var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient; var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient;
myMqttClient.StartManagedClientAsync().Wait(); myMqttClient.StartClientAsync().Wait();
} }
public override void DoDelete() public override void DoDelete()

Binary file not shown.

BIN
IoTGateway/wwwroot/3d.zip Normal file

Binary file not shown.

View File

@ -3,8 +3,6 @@ using IoTGateway.Model;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Packets;
using MQTTnet.Protocol; using MQTTnet.Protocol;
using Newtonsoft.Json; using Newtonsoft.Json;
using PluginInterface; using PluginInterface;
@ -12,7 +10,6 @@ using PluginInterface.HuaWeiRoma;
using PluginInterface.IotDB; using PluginInterface.IotDB;
using PluginInterface.IoTSharp; using PluginInterface.IoTSharp;
using PluginInterface.ThingsBoard; using PluginInterface.ThingsBoard;
using System.Xml.Linq;
namespace Plugin namespace Plugin
{ {
@ -22,9 +19,9 @@ namespace Plugin
//private readonly ReferenceNodeManager? _uaNodeManager; //private readonly ReferenceNodeManager? _uaNodeManager;
private SystemConfig _systemConfig; private SystemConfig _systemConfig;
private ManagedMqttClientOptions _options; private MqttClientOptions _options;
public bool IsConnected => (Client.IsConnected); public bool IsConnected => (Client.IsConnected);
private IManagedMqttClient? Client { get; set; } private IMqttClient Client { get; set; }
public event EventHandler<RpcRequest> OnExcRpc; public event EventHandler<RpcRequest> OnExcRpc;
public event EventHandler<ISAttributeResponse> OnReceiveAttributes; public event EventHandler<ISAttributeResponse> OnReceiveAttributes;
private readonly string _tbRpcTopic = "v1/gateway/rpc"; private readonly string _tbRpcTopic = "v1/gateway/rpc";
@ -35,10 +32,10 @@ namespace Plugin
_logger = logger; _logger = logger;
//_uaNodeManager = uaService.server.m_server.nodeManagers[0] as ReferenceNodeManager; //_uaNodeManager = uaService.server.m_server.nodeManagers[0] as ReferenceNodeManager;
StartManagedClientAsync().Wait(); StartClientAsync().Wait();
} }
public async Task StartManagedClientAsync() public async Task StartClientAsync()
{ {
try try
{ {
@ -46,50 +43,71 @@ namespace Plugin
{ {
Client.Dispose(); Client.Dispose();
} }
Client = new MqttFactory().CreateManagedMqttClient(); Client = new MqttFactory().CreateMqttClient();
await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType); await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType);
_systemConfig = dc.Set<SystemConfig>().First(); _systemConfig = dc.Set<SystemConfig>().First();
#region ClientOptions #region ClientOptions
// Setup and start a managed MQTT client. // Setup and start a managed MQTT client.
_options = new ManagedMqttClientOptionsBuilder() _options = new MqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) .WithClientId(string.IsNullOrWhiteSpace(_systemConfig.ClientId)
.WithMaxPendingMessages(10000) ? Guid.NewGuid().ToString()
.WithClientOptions(new MqttClientOptionsBuilder() : _systemConfig.ClientId)
.WithClientId(string.IsNullOrWhiteSpace( _systemConfig.ClientId) ? Guid.NewGuid().ToString() : _systemConfig.ClientId)
.WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort) .WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort)
.WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd) .WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd)
.WithTimeout(TimeSpan.FromSeconds(30)) .WithTimeout(TimeSpan.FromSeconds(30))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(20)) .WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
.Build()) .Build();
.Build();
#endregion #endregion
#region Topics
List<MqttTopicFilter> subTopics = new(); Client.ConnectedAsync += Client_ConnectedAsync;
Client.DisconnectedAsync += Client_DisconnectedAsync;
Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
try
{
await Client.ConnectAsync(_options);
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT CONNECTING FAILED");
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"StartManagedClientAsync FAILED ");
}
}
private async Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
_logger.LogInformation($"MQTT CONNECTED WITH SERVER ");
#region Topics
try
{
switch (_systemConfig.IoTPlatformType) switch (_systemConfig.IoTPlatformType)
{ {
case IoTPlatformType.ThingsBoard: case IoTPlatformType.ThingsBoard:
//{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}} //{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}}
subTopics.Add(new MqttTopicFilterBuilder().WithTopic(_tbRpcTopic).WithExactlyOnceQoS().Build()); await Client.SubscribeAsync(_tbRpcTopic, MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"id": $request_id, "device": "Device A", "value": "value1"} //Message: {"id": $request_id, "device": "Device A", "value": "value1"}
subTopics.Add(new MqttTopicFilterBuilder().WithTopic("v1/gateway/attributes/response").WithExactlyOnceQoS().Build()); await Client.SubscribeAsync("v1/gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}} //Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
subTopics.Add(new MqttTopicFilterBuilder().WithTopic("v1/gateway/attributese").WithExactlyOnceQoS().Build()); await Client.SubscribeAsync("v1/gateway/attributes", MqttQualityOfServiceLevel.ExactlyOnce);
break; break;
case IoTPlatformType.IotDB:
case IoTPlatformType.IoTSharp: case IoTPlatformType.IoTSharp:
subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/rpc/request/+/+").WithExactlyOnceQoS().Build()); case IoTPlatformType.IotDB:
subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/attributes/update").WithExactlyOnceQoS().Build()); await Client.SubscribeAsync("devices/+/rpc/request/+/+", MqttQualityOfServiceLevel.ExactlyOnce);
await Client.SubscribeAsync("devices/+/attributes/update", MqttQualityOfServiceLevel.ExactlyOnce);
//Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}} //Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}}
subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/attributes/response/+").WithExactlyOnceQoS().Build()); await Client.SubscribeAsync("devices/+/attributes/response/+", MqttQualityOfServiceLevel.ExactlyOnce);
break; break;
case IoTPlatformType.ThingsCloud: case IoTPlatformType.ThingsCloud:
subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/response").WithExactlyOnceQoS().Build()); await Client.SubscribeAsync("gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce);
subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/get/response").WithExactlyOnceQoS().Build()); await Client.SubscribeAsync("gateway/attributes/get/response", MqttQualityOfServiceLevel.ExactlyOnce);
subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/push").WithExactlyOnceQoS().Build()); await Client.SubscribeAsync("gateway/attributes/push", MqttQualityOfServiceLevel.ExactlyOnce);
subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/event/response").WithExactlyOnceQoS().Build()); await Client.SubscribeAsync("gateway/event/response", MqttQualityOfServiceLevel.ExactlyOnce);
subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/command/send").WithExactlyOnceQoS().Build()); await Client.SubscribeAsync("gateway/command/send", MqttQualityOfServiceLevel.ExactlyOnce);
break; break;
case IoTPlatformType.AliCloudIoT: case IoTPlatformType.AliCloudIoT:
break; break;
@ -100,34 +118,26 @@ namespace Plugin
case IoTPlatformType.OneNET: case IoTPlatformType.OneNET:
break; break;
} }
#endregion
Client.ConnectedAsync += Client_ConnectedAsync;
Client.DisconnectedAsync += Client_DisconnectedAsync;
Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
await Client.SubscribeAsync(subTopics);
await Client.StartAsync(_options);
_logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES");
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"StartManagedClientAsync FAILED, {ex}"); _logger.LogError(ex, "MQTT Subscribe FAILED");
}
#endregion
}
private async Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
try
{
await Client.ConnectAsync(_options);
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT CONNECTING FAILED");
} }
} }
private Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
_logger.LogInformation($"MQTT CONNECTED WITH SERVER ");
return Task.CompletedTask;
}
private Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
_logger.LogError($"MQTT CONNECTING FAILED, {arg.ReasonString}");
return Task.CompletedTask;
}
private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e) private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{ {
_logger.LogDebug( _logger.LogDebug(
@ -154,8 +164,7 @@ namespace Plugin
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError( _logger.LogError(
$"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}", ex, $"ClientId:{e.ClientId} Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
ex);
} }
return Task.CompletedTask; return Task.CompletedTask;
@ -184,9 +193,8 @@ namespace Plugin
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError( _logger.LogError(ex,
$"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}", $"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
ex);
} }
} }
@ -211,9 +219,8 @@ namespace Plugin
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError( _logger.LogError(ex,
$"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}", $"ReceiveTbRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
ex);
} }
} }
@ -246,15 +253,14 @@ namespace Plugin
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError( _logger.LogError(ex,
$"ReceiveIsRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}", $"ReceiveIsRpc:Topic:{e.ApplicationMessage.Topic},Payload:{e.ApplicationMessage.ConvertPayloadToString()}");
ex);
} }
} }
private async Task ResponseTbRpcAsync(TBRpcResponse tBRpcResponse) private async Task ResponseTbRpcAsync(TBRpcResponse tBRpcResponse)
{ {
await Client.EnqueueAsync(new MqttApplicationMessageBuilder() await Client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(_tbRpcTopic) .WithTopic(_tbRpcTopic)
.WithPayload(JsonConvert.SerializeObject(tBRpcResponse)) .WithPayload(JsonConvert.SerializeObject(tBRpcResponse))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
@ -263,7 +269,7 @@ namespace Plugin
private async Task ResponseTcRpcAsync(TCRpcRequest tCRpcResponse) private async Task ResponseTcRpcAsync(TCRpcRequest tCRpcResponse)
{ {
var topic = $"command/reply/{tCRpcResponse.RequestData.RequestId}"; var topic = $"command/reply/{tCRpcResponse.RequestData.RequestId}";
await Client.EnqueueAsync(new MqttApplicationMessageBuilder() await Client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic) .WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(tCRpcResponse)) .WithPayload(JsonConvert.SerializeObject(tCRpcResponse))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
@ -273,7 +279,7 @@ namespace Plugin
{ {
//var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}"; //var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}";
var topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}"; var topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}";
await Client.EnqueueAsync(new MqttApplicationMessageBuilder() await Client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(topic) .WithTopic(topic)
.WithPayload(JsonConvert.SerializeObject(rpcResult)) .WithPayload(JsonConvert.SerializeObject(rpcResult))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
@ -312,13 +318,13 @@ namespace Plugin
try try
{ {
if (Client.IsConnected) if (Client.IsConnected)
return Client.EnqueueAsync(new MqttApplicationMessageBuilder() return Client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic($"devices/{deviceName}/attributes").WithPayload(JsonConvert.SerializeObject(obj)) .WithTopic($"devices/{deviceName}/attributes").WithPayload(JsonConvert.SerializeObject(obj))
.Build()); .Build());
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"Device:{deviceName} UploadAttributeAsync Failed,{ex}"); _logger.LogError(ex, $"Device:{deviceName} UploadAttributeAsync Failed");
} }
return Task.CompletedTask; return Task.CompletedTask;
@ -326,14 +332,15 @@ namespace Plugin
public async Task UploadIsTelemetryDataAsync(string deviceName, object obj) public async Task UploadIsTelemetryDataAsync(string deviceName, object obj)
{ {
await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry") await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry")
.WithPayload(JsonConvert.SerializeObject(obj)).Build()); .WithPayload(JsonConvert.SerializeObject(obj)).Build());
} }
public async Task UploadTcTelemetryDataAsync(string deviceName, object obj) public async Task UploadTcTelemetryDataAsync(string deviceName, object obj)
{ {
var toSend = new Dictionary<string, object> { { deviceName, obj } }; var toSend = new Dictionary<string, object> { { deviceName, obj } };
await Client.EnqueueAsync("gateway/attributes", JsonConvert.SerializeObject(toSend)); await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/attributes")
.WithPayload(JsonConvert.SerializeObject(toSend)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
} }
public async Task UploadHwTelemetryDataAsync(Device device, object obj) public async Task UploadHwTelemetryDataAsync(Device device, object obj)
@ -359,8 +366,8 @@ namespace Plugin
Devices = hwTelemetry Devices = hwTelemetry
}; };
await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/datas", await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/datas")
JsonConvert.SerializeObject(hwTelemetrys)); .WithPayload(JsonConvert.SerializeObject(hwTelemetrys)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
} }
public async Task ResponseRpcAsync(RpcResponse rpcResponse) public async Task ResponseRpcAsync(RpcResponse rpcResponse)
@ -406,7 +413,7 @@ namespace Plugin
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"ResponseRpc Error,{rpcResponse}", ex); _logger.LogError(ex, $"ResponseRpc Error,{rpcResponse}");
} }
} }
@ -426,8 +433,8 @@ namespace Plugin
{ "client", true }, { "client", true },
{ "key", args[0] } { "key", args[0] }
}; };
await Client.EnqueueAsync("v1/gateway/attributes/request", await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/attributes/request")
JsonConvert.SerializeObject(tbRequestData), MqttQualityOfServiceLevel.ExactlyOnce); .WithPayload(JsonConvert.SerializeObject(tbRequestData)).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
case IoTPlatformType.IoTSharp: case IoTPlatformType.IoTSharp:
string topic = $"devices/{deviceName}/attributes/request/{id}"; string topic = $"devices/{deviceName}/attributes/request/{id}";
@ -435,8 +442,9 @@ namespace Plugin
keys.Add(anySide ? "anySide" : "server", string.Join(",", args)); keys.Add(anySide ? "anySide" : "server", string.Join(",", args));
await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}", await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}",
MqttQualityOfServiceLevel.ExactlyOnce); MqttQualityOfServiceLevel.ExactlyOnce);
await Client.EnqueueAsync(topic, JsonConvert.SerializeObject(keys), await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
MqttQualityOfServiceLevel.ExactlyOnce); .WithPayload(JsonConvert.SerializeObject(keys))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
case IoTPlatformType.AliCloudIoT: case IoTPlatformType.AliCloudIoT:
break; break;
@ -450,7 +458,7 @@ namespace Plugin
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"RequestAttributes:{deviceName}", ex); _logger.LogError(ex, $"RequestAttributes:{deviceName}");
} }
} }
@ -512,8 +520,9 @@ namespace Plugin
switch (_systemConfig.IoTPlatformType) switch (_systemConfig.IoTPlatformType)
{ {
case IoTPlatformType.ThingsBoard: case IoTPlatformType.ThingsBoard:
await Client.EnqueueAsync("v1/gateway/telemetry", await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/telemetry")
JsonConvert.SerializeObject(sendModel)); .WithPayload(JsonConvert.SerializeObject(sendModel))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
case IoTPlatformType.IoTSharp: case IoTPlatformType.IoTSharp:
foreach (var payload in sendModel[device.DeviceName]) foreach (var payload in sendModel[device.DeviceName])
@ -532,24 +541,26 @@ namespace Plugin
break; break;
case IoTPlatformType.IotDB: case IoTPlatformType.IotDB:
{
foreach (var payload in sendModel[device.DeviceName])
{ {
if (payload.DeviceStatus != DeviceStatusTypeEnum.Good) foreach (var payload in sendModel[device.DeviceName])
continue;
IotTsData tsData = new IotTsData()
{ {
device = _systemConfig.GatewayName + device.DeviceName, if (payload.DeviceStatus != DeviceStatusTypeEnum.Good)
timestamp = payload.TS, continue;
measurements = payload.Values?.Keys.ToList(),
values = payload.Values?.Values.ToList()
};
await Client.EnqueueAsync(_systemConfig.GatewayName + device.DeviceName, JsonConvert.SerializeObject(tsData));
}
break; IotTsData tsData = new IotTsData()
} {
device = _systemConfig.GatewayName + device.DeviceName,
timestamp = payload.TS,
measurements = payload.Values?.Keys.ToList(),
values = payload.Values?.Values.ToList()
};
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(_systemConfig.GatewayName + device.DeviceName)
.WithPayload(JsonConvert.SerializeObject(tsData))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
break;
}
case IoTPlatformType.HuaWei: case IoTPlatformType.HuaWei:
foreach (var payload in sendModel[device.DeviceName]) foreach (var payload in sendModel[device.DeviceName])
{ {
@ -581,14 +592,14 @@ namespace Plugin
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"PublishTelemetryAsync Error:{ex}"); _logger.LogError(ex, $"PublishTelemetryAsync Error");
} }
} }
private readonly DateTime _tsStartDt = new(1970, 1, 1); private readonly DateTime _tsStartDt = new(1970, 1, 1);
private readonly List<string> iotDbOnLineMeasurement = new() { "online" }; private readonly List<string> _iotDbOnLineMeasurement = new() { "online" };
private readonly List<object> iotDbOnLine = new() { true }; private readonly List<object> _iotDbOnLine = new() { true };
private readonly List<object> iotDbOffLine = new() { false }; private readonly List<object> _iotDbOffLine = new() { false };
public async Task DeviceConnected(Device device) public async Task DeviceConnected(Device device)
{ {
try try
@ -597,9 +608,10 @@ namespace Plugin
{ {
case IoTPlatformType.ThingsBoard: case IoTPlatformType.ThingsBoard:
case IoTPlatformType.IoTSharp: case IoTPlatformType.IoTSharp:
await Client.EnqueueAsync("v1/gateway/connect", await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("v1/gateway/connect")
JsonConvert.SerializeObject(new Dictionary<string, string> .WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", device.DeviceName } })); { { "device", device.DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
case IoTPlatformType.AliCloudIoT: case IoTPlatformType.AliCloudIoT:
break; break;
@ -610,20 +622,22 @@ namespace Plugin
case IoTPlatformType.OneNET: case IoTPlatformType.OneNET:
break; break;
case IoTPlatformType.ThingsCloud: case IoTPlatformType.ThingsCloud:
await Client.EnqueueAsync("gateway/connect", await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("gateway/connect")
JsonConvert.SerializeObject(new Dictionary<string, string> .WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", device.DeviceName } })); { { "device", device.DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
case IoTPlatformType.IotDB: case IoTPlatformType.IotDB:
IotTsData onlineData = new IotTsData() IotTsData onlineData = new IotTsData()
{ {
device = _systemConfig.GatewayName + device.DeviceName, device = _systemConfig.GatewayName + device.DeviceName,
timestamp = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds, timestamp = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds,
measurements = iotDbOnLineMeasurement, measurements = _iotDbOnLineMeasurement,
values = iotDbOnLine values = _iotDbOnLine
}; };
await Client.EnqueueAsync(_systemConfig.GatewayName + device.DeviceName, await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(_systemConfig.GatewayName + device.DeviceName)
JsonConvert.SerializeObject(onlineData)); .WithPayload(JsonConvert.SerializeObject(onlineData))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
case IoTPlatformType.HuaWei: case IoTPlatformType.HuaWei:
var deviceOnLine = new HwDeviceOnOffLine() var deviceOnLine = new HwDeviceOnOffLine()
@ -640,15 +654,15 @@ namespace Plugin
} }
} }
}; };
await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update", await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/topo/update")
JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce, .WithPayload(JsonConvert.SerializeObject(deviceOnLine))
retain: false); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"DeviceConnected:{device.DeviceName}", ex); _logger.LogError(ex, $"DeviceConnected:{device.DeviceName}");
} }
} }
@ -660,9 +674,10 @@ namespace Plugin
{ {
case IoTPlatformType.ThingsBoard: case IoTPlatformType.ThingsBoard:
case IoTPlatformType.IoTSharp: case IoTPlatformType.IoTSharp:
await Client.EnqueueAsync("v1/gateway/disconnect", await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"v1/gateway/disconnect")
JsonConvert.SerializeObject(new Dictionary<string, string> .WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", device.DeviceName } })); { { "device", device.DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
case IoTPlatformType.AliCloudIoT: case IoTPlatformType.AliCloudIoT:
break; break;
@ -673,20 +688,22 @@ namespace Plugin
case IoTPlatformType.OneNET: case IoTPlatformType.OneNET:
break; break;
case IoTPlatformType.ThingsCloud: case IoTPlatformType.ThingsCloud:
await Client.EnqueueAsync("gateway/disconnect", await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"gateway/disconnect")
JsonConvert.SerializeObject(new Dictionary<string, string> .WithPayload(JsonConvert.SerializeObject(new Dictionary<string, string>
{ { "device", device.DeviceName } })); { { "device", device.DeviceName } }))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
case IoTPlatformType.IotDB: case IoTPlatformType.IotDB:
IotTsData onlineData = new IotTsData() IotTsData onlineData = new IotTsData()
{ {
device = _systemConfig.GatewayName + device.DeviceName, device = _systemConfig.GatewayName + device.DeviceName,
timestamp = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds, timestamp = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds,
measurements = iotDbOnLineMeasurement, measurements = _iotDbOnLineMeasurement,
values = iotDbOffLine values = _iotDbOffLine
}; };
await Client.EnqueueAsync(_systemConfig.GatewayName + device.DeviceName, await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(_systemConfig.GatewayName + device.DeviceName)
JsonConvert.SerializeObject(onlineData)); .WithPayload(JsonConvert.SerializeObject(onlineData))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
case IoTPlatformType.HuaWei: case IoTPlatformType.HuaWei:
var deviceOnLine = new HwDeviceOnOffLine() var deviceOnLine = new HwDeviceOnOffLine()
@ -703,15 +720,15 @@ namespace Plugin
} }
} }
}; };
await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update", await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"/v1/devices/{_systemConfig.GatewayName}/topo/update")
JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce, .WithPayload(JsonConvert.SerializeObject(deviceOnLine))
retain: false); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"DeviceDisconnected:{device.DeviceName}", ex); _logger.LogError(ex, $"DeviceDisconnected:{device.DeviceName}");
} }
} }
@ -738,15 +755,15 @@ namespace Plugin
ProductType = "A_n" ProductType = "A_n"
} }
); );
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
await Client.EnqueueAsync(topic, .WithPayload(JsonConvert.SerializeObject(addDeviceDto))
JsonConvert.SerializeObject(addDeviceDto)); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
break; break;
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"DeviceAdded:{device.DeviceName}", ex); _logger.LogError(ex, $"DeviceAdded:{device.DeviceName}");
} }
} }
@ -776,16 +793,16 @@ namespace Plugin
ProductType = "A_n" ProductType = "A_n"
} }
}; };
await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic)
await Client.EnqueueAsync(topic, .WithPayload(JsonConvert.SerializeObject(deleteDeviceDto))
JsonConvert.SerializeObject(deleteDeviceDto)); .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build());
} }
break; break;
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError($"DeviceAdded:{device.DeviceName}", ex); _logger.LogError(ex, $"DeviceAdded:{device.DeviceName}");
} }
} }
} }

View File

@ -11,7 +11,6 @@
<PackageReference Include="Mono.Options" Version="6.12.0.148" /> <PackageReference Include="Mono.Options" Version="6.12.0.148" />
<PackageReference Include="MQTTnet" Version="4.1.1.318" /> <PackageReference Include="MQTTnet" Version="4.1.1.318" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.4.370.12" /> <PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Server" Version="1.4.370.12" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.1.1.318" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>