diff --git a/IoTGateway.ViewModel/Config/SystemConfigVMs/SystemConfigVM.cs b/IoTGateway.ViewModel/Config/SystemConfigVMs/SystemConfigVM.cs index 125aefc..0befebb 100644 --- a/IoTGateway.ViewModel/Config/SystemConfigVMs/SystemConfigVM.cs +++ b/IoTGateway.ViewModel/Config/SystemConfigVMs/SystemConfigVM.cs @@ -30,7 +30,7 @@ namespace IoTGateway.ViewModel.Config.SystemConfigVMs { base.DoEdit(updateAllFields); var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient; - myMqttClient.ConnectAsync(); + myMqttClient.StartManagedClientAsync().Wait(); } public override void DoDelete() diff --git a/IoTGateway.ViewModel/MqttServer/MqttClientVMs/MqttClientListVM.cs b/IoTGateway.ViewModel/MqttServer/MqttClientVMs/MqttClientListVM.cs index 33d5226..b9ea43e 100644 --- a/IoTGateway.ViewModel/MqttServer/MqttClientVMs/MqttClientListVM.cs +++ b/IoTGateway.ViewModel/MqttServer/MqttClientVMs/MqttClientListVM.cs @@ -9,7 +9,6 @@ using System.ComponentModel.DataAnnotations; using IoTGateway.Model; using Microsoft.Extensions.Primitives; using MQTTnet.Server; -using MQTTnet.Server.Status; using MQTTnet.Formatter; namespace IoTGateway.ViewModel.MqttClient.MqttServerVMs @@ -45,12 +44,12 @@ namespace IoTGateway.ViewModel.MqttClient.MqttServerVMs } public override void DoSearch() { - var mqttServer = Wtm.ServiceProvider.GetService(typeof(IMqttServer)) as IMqttServer; - foreach (var client in mqttServer.GetClientStatusAsync().Result) + var mqttServer = Wtm.ServiceProvider.GetService(typeof(MqttServer)) as MqttServer; + foreach (var client in mqttServer.GetClientsAsync().Result) { MqttClient_View mqttClient_ = new MqttClient_View { - ClientId = client.ClientId, + ClientId = client.Id, BytesReceived = client.BytesReceived, BytesSent = client.BytesSent, MqttProtocolVersion = client.ProtocolVersion, diff --git a/IoTGateway/IoTGateway.csproj b/IoTGateway/IoTGateway.csproj index 94e53f9..06bedf0 100644 --- a/IoTGateway/IoTGateway.csproj +++ b/IoTGateway/IoTGateway.csproj @@ -19,12 +19,11 @@ - - - - - - + + + + + diff --git a/IoTGateway/Program.cs b/IoTGateway/Program.cs index f770fc8..9e33d00 100644 --- a/IoTGateway/Program.cs +++ b/IoTGateway/Program.cs @@ -6,7 +6,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using MQTTnet.AspNetCore.Extensions; +using MQTTnet.AspNetCore; using WalkingTec.Mvvm.Core; using NLog; using NLog.Web; diff --git a/IoTGateway/Startup.cs b/IoTGateway/Startup.cs index 12a8982..d390cd7 100644 --- a/IoTGateway/Startup.cs +++ b/IoTGateway/Startup.cs @@ -12,7 +12,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.FileProviders; using Microsoft.Extensions.Options; using MQTTnet.AspNetCore; -using MQTTnet.AspNetCore.Extensions; using Plugin; using WalkingTec.Mvvm.Core; using WalkingTec.Mvvm.Core.Extensions; @@ -66,11 +65,8 @@ namespace IoTGateway options.ReloadUserFunc = ReloadUser; }); - //MQTTServer - services.AddHostedMqttServer(mqttServer => - { - mqttServer.WithoutDefaultEndpoint(); - }) + //MqttServer + services.AddHostedMqttServer(mqttServer => mqttServer.WithoutDefaultEndpoint()) .AddMqttConnectionHandler() .AddConnections(); @@ -78,7 +74,7 @@ namespace IoTGateway services.AddHostedService(); services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); + //services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); @@ -125,10 +121,15 @@ namespace IoTGateway app.UseEndpoints(endpoints => { - //MqttServerWebSocket - endpoints.MapConnectionHandler("/mqtt", options => + //MqttServer + app.UseEndpoints(endpoints => { - options.WebSockets.SubProtocolSelector = MqttSubProtocolSelector.SelectSubProtocol; + endpoints.MapMqtt("/mqtt"); + }); + + app.UseMqttServer(server => + { + // Todo: Do something with the server }); endpoints.MapControllerRoute( diff --git a/IoTGateway/iotgateway.db b/IoTGateway/iotgateway.db index c24e360..c43349b 100644 Binary files a/IoTGateway/iotgateway.db and b/IoTGateway/iotgateway.db differ diff --git a/Plugins/Plugin/DeviceService.cs b/Plugins/Plugin/DeviceService.cs index 877bb40..408b92b 100644 --- a/Plugins/Plugin/DeviceService.cs +++ b/Plugins/Plugin/DeviceService.cs @@ -19,18 +19,18 @@ namespace Plugin public List DeviceThreads = new List(); private readonly MyMqttClient _myMqttClient; private readonly UAService _uAService; - private readonly IMqttServer _mqttServer; + private readonly MqttServer _mqttServer; private readonly string _connnectSetting = IoTBackgroundService.connnectSetting; private readonly DBTypeEnum _dbType = IoTBackgroundService.DbType; + //UAService? uAService, public DeviceService(IConfiguration configRoot, DriverService drvierManager, MyMqttClient myMqttClient, - UAService uAService, IMqttServer mqttServer, ILogger logger) + MqttServer mqttServer, ILogger logger) { - if (mqttServer == null) throw new ArgumentNullException(nameof(mqttServer)); _logger = logger; DrvierManager = drvierManager; _myMqttClient = myMqttClient; - _uAService = uAService; + //_uAService = uAService; _mqttServer = mqttServer ?? throw new ArgumentNullException(nameof(mqttServer)); try { diff --git a/Plugins/Plugin/DeviceThread.cs b/Plugins/Plugin/DeviceThread.cs index 23af468..a9ec41e 100644 --- a/Plugins/Plugin/DeviceThread.cs +++ b/Plugins/Plugin/DeviceThread.cs @@ -1,11 +1,13 @@ using PluginInterface; using System.Reflection; +using System.Text; using IoTGateway.DataAccess; using IoTGateway.Model; using DynamicExpresso; using MQTTnet.Server; using Newtonsoft.Json; using Microsoft.Extensions.Logging; +using MQTTnet; namespace Plugin { @@ -26,7 +28,7 @@ namespace Plugin private bool _lastConnected; public DeviceThread(Device device, IDriver driver, string projectId, MyMqttClient myMqttClient, - IMqttServer mqttServer, ILogger logger) + MqttServer mqttServer, ILogger logger) { _myMqttClient = myMqttClient; _myMqttClient.OnExcRpc += MyMqttClient_OnExcRpc; @@ -119,13 +121,24 @@ namespace Plugin ret.CookedValue?.ToString()) { //这是设备变量列表要用的 - mqttServer.PublishAsync( - $"internal/v1/gateway/telemetry/{Device.DeviceName}/{item.Name}", - JsonConvert.SerializeObject(ret)); + var msg = new InjectedMqttApplicationMessage( + new MqttApplicationMessage() + { + Topic = + $"internal/v1/gateway/telemetry/{Device.DeviceName}/{item.Name}", + Payload = Encoding.UTF8.GetBytes( + JsonConvert.SerializeObject(ret)) + }); + mqttServer.InjectApplicationMessage(msg); //这是在线组态要用的 - mqttServer.PublishAsync( - $"v1/gateway/telemetry/{Device.DeviceName}/{item.Name}", - JsonConvert.SerializeObject(ret.CookedValue)); + msg = new InjectedMqttApplicationMessage( + new MqttApplicationMessage() + { + Topic = + $"v1/gateway/telemetry/{Device.DeviceName}/{item.Name}", + Payload = Encoding.UTF8.GetBytes( + JsonConvert.SerializeObject(ret.CookedValue)) + }); } DeviceValues[item.ID] = ret; @@ -224,7 +237,7 @@ namespace Plugin if (!writeResponse.IsSuccess) { rpcResponse.Description = writeResponse.Description; - break; + continue; } } else diff --git a/Plugins/Plugin/MyMqttClient.cs b/Plugins/Plugin/MyMqttClient.cs index bab323f..691c590 100644 --- a/Plugins/Plugin/MyMqttClient.cs +++ b/Plugins/Plugin/MyMqttClient.cs @@ -1,13 +1,10 @@ -using System.Text; -using IoTGateway.DataAccess; +using IoTGateway.DataAccess; using IoTGateway.Model; using Microsoft.Extensions.Logging; using MQTTnet; using MQTTnet.Client; -using MQTTnet.Client.Connecting; -using MQTTnet.Client.Disconnecting; -using MQTTnet.Client.Options; -using MQTTnet.Client.Receiving; +using MQTTnet.Extensions.ManagedClient; +using MQTTnet.Packets; using MQTTnet.Protocol; using Newtonsoft.Json; using PluginInterface; @@ -15,7 +12,6 @@ using PluginInterface.HuaWeiRoma; using PluginInterface.IotDB; using PluginInterface.IoTSharp; using PluginInterface.ThingsBoard; -using Quickstarts.ReferenceServer; namespace Plugin { @@ -24,113 +20,74 @@ namespace Plugin private readonly ILogger _logger; //private readonly ReferenceNodeManager? _uaNodeManager; - private SystemConfig? _systemConfig; - private IMqttClientOptions _clientOptions; + private SystemConfig _systemConfig; + private ManagedMqttClientOptions _options; public bool IsConnected => (Client.IsConnected); - private IMqttClient Client { get; set; } + private IManagedMqttClient? Client { get; set; } public event EventHandler OnExcRpc; public event EventHandler OnReceiveAttributes; + private readonly string _tbRpcTopic = "v1/gateway/rpc"; - public MyMqttClient(UAService uaService, ILogger logger) + //UAService uaService, + public MyMqttClient(ILogger logger) { _logger = logger; //_uaNodeManager = uaService.server.m_server.nodeManagers[0] as ReferenceNodeManager; - ConnectAsync(); + + StartManagedClientAsync().Wait(); } - public async Task ConnectAsync() + public async Task StartManagedClientAsync() { try { + if (Client != null) + { + Client.Dispose(); + } + Client = new MqttFactory().CreateManagedMqttClient(); await using var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType); - _systemConfig = dc.Set().FirstOrDefault(); - if (_systemConfig == null) - { - _systemConfig = new SystemConfig() - { - ID = Guid.NewGuid(), - GatewayName = "iotgateway", - ClientId = Guid.NewGuid().ToString(), - MqttIp = "localhost", - MqttPort = 1888, - MqttUName = "user", - MqttUPwd = "pwd", - IoTPlatformType = IoTPlatformType.IoTSharp - }; - dc.Set().Add(_systemConfig); - await dc.SaveChangesAsync(); - } + _systemConfig = dc.Set().First(); - var factory = new MqttFactory(); - Client = (MqttClient)factory.CreateMqttClient(); - _clientOptions = new MqttClientOptionsBuilder() - .WithClientId(_systemConfig.ClientId ?? Guid.NewGuid().ToString()) - .WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort) - .WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd) - .WithCommunicationTimeout(TimeSpan.FromSeconds(30)) - .WithKeepAlivePeriod(TimeSpan.FromSeconds(20)) + #region ClientOptions + // Setup and start a managed MQTT client. + _options = new ManagedMqttClientOptionsBuilder() + .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) + .WithMaxPendingMessages(10000) + .WithClientOptions(new MqttClientOptionsBuilder() + .WithClientId(_systemConfig.ClientId ?? Guid.NewGuid().ToString()) + .WithTcpServer(_systemConfig.MqttIp, _systemConfig.MqttPort) + .WithCredentials(_systemConfig.MqttUName, _systemConfig.MqttUPwd) + .WithTimeout(TimeSpan.FromSeconds(30)) + .WithKeepAlivePeriod(TimeSpan.FromSeconds(20)) + .Build()) .Build(); + #endregion + #region Topics - Client.ApplicationMessageReceivedHandler = - new MqttApplicationMessageReceivedHandlerDelegate(Client_ApplicationMessageReceived); - Client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(_ => OnConnected()); - Client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(_ => OnDisconnectedAsync()); - try - { - await Client.ConnectAsync(_clientOptions); - } - catch (Exception ex) - { - _logger.LogError("MQTT CONNECTING FAILED", ex); - } - - _logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES"); - } - catch (Exception ex) - { - _logger.LogError("MQTT CONNECTING FAILED", ex); - } - } - - private async Task OnDisconnectedAsync() - { - try - { - await Client.ConnectAsync(_clientOptions); - } - catch (Exception ex) - { - _logger.LogError("MQTT CONNECTING FAILED", ex); - } - } - - private readonly string _tbRpcTopic = "v1/gateway/rpc"; - - private void OnConnected() - { - if (_systemConfig != null) + List subTopics = new(); switch (_systemConfig.IoTPlatformType) { case IoTPlatformType.ThingsBoard: //{"device": "Device A", "data": {"id": $request_id, "method": "toggle_gpio", "params": {"pin":1}}} - Client.SubscribeAsync(_tbRpcTopic, MqttQualityOfServiceLevel.ExactlyOnce); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic(_tbRpcTopic).WithExactlyOnceQoS().Build()); //Message: {"id": $request_id, "device": "Device A", "value": "value1"} - Client.SubscribeAsync("v1/gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic("v1/gateway/attributes/response").WithExactlyOnceQoS().Build()); //Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}} - Client.SubscribeAsync("v1/gateway/attributes", MqttQualityOfServiceLevel.ExactlyOnce); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic("v1/gateway/attributese").WithExactlyOnceQoS().Build()); break; case IoTPlatformType.IoTSharp: - Client.SubscribeAsync("devices/+/rpc/request/+/+", MqttQualityOfServiceLevel.ExactlyOnce); - Client.SubscribeAsync("devices/+/attributes/update", MqttQualityOfServiceLevel.ExactlyOnce); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/rpc/request/+/+").WithExactlyOnceQoS().Build()); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/attributes/update").WithExactlyOnceQoS().Build()); //Message: {"device": "Device A", "data": {"attribute1": "value1", "attribute2": 42}} - Client.SubscribeAsync("devices/+/attributes/response/+", MqttQualityOfServiceLevel.ExactlyOnce); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic("devices/+/attributes/response/+").WithExactlyOnceQoS().Build()); break; case IoTPlatformType.ThingsCloud: - Client.SubscribeAsync("gateway/attributes/response", MqttQualityOfServiceLevel.ExactlyOnce); - Client.SubscribeAsync("gateway/attributes/get/response", MqttQualityOfServiceLevel.ExactlyOnce); - Client.SubscribeAsync("gateway/attributes/push", MqttQualityOfServiceLevel.ExactlyOnce); - Client.SubscribeAsync("gateway/event/response", MqttQualityOfServiceLevel.ExactlyOnce); - Client.SubscribeAsync("gateway/command/send", MqttQualityOfServiceLevel.ExactlyOnce); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/response").WithExactlyOnceQoS().Build()); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/get/response").WithExactlyOnceQoS().Build()); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/attributes/push").WithExactlyOnceQoS().Build()); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/event/response").WithExactlyOnceQoS().Build()); + subTopics.Add(new MqttTopicFilterBuilder().WithTopic("gateway/command/send").WithExactlyOnceQoS().Build()); break; case IoTPlatformType.AliCloudIoT: break; @@ -142,11 +99,34 @@ namespace Plugin break; } - _logger.LogInformation($"MQTT CONNECTED WITH SERVER "); + #endregion + Client.ConnectedAsync += Client_ConnectedAsync; + Client.DisconnectedAsync += Client_DisconnectedAsync; + Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync; + await Client.SubscribeAsync(subTopics); + await Client.StartAsync(_options); + + _logger.LogInformation("MQTT WAITING FOR APPLICATION MESSAGES"); + } + catch (Exception ex) + { + _logger.LogError($"StartManagedClientAsync FAILED, {ex}"); + } } + private Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg) + { + _logger.LogInformation($"MQTT CONNECTED WITH SERVER "); + return Task.CompletedTask; + } - private Task Client_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) + private Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) + { + _logger.LogError($"MQTT CONNECTING FAILED, {arg.ReasonString}"); + return Task.CompletedTask; + } + + private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e) { _logger.LogDebug( $"ApplicationMessageReceived Topic {e.ApplicationMessage.Topic} QualityOfServiceLevel:{e.ApplicationMessage.QualityOfServiceLevel} Retain:{e.ApplicationMessage.Retain} "); @@ -269,7 +249,7 @@ namespace Plugin private async Task ResponseTbRpcAsync(TBRpcResponse tBRpcResponse) { - await Client.PublishAsync(new MqttApplicationMessageBuilder() + await Client.EnqueueAsync(new MqttApplicationMessageBuilder() .WithTopic(_tbRpcTopic) .WithPayload(JsonConvert.SerializeObject(tBRpcResponse)) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); @@ -278,7 +258,7 @@ namespace Plugin private async Task ResponseTcRpcAsync(TCRpcRequest tCRpcResponse) { var topic = $"command/reply/{tCRpcResponse.RequestData.RequestId}"; - await Client.PublishAsync(new MqttApplicationMessageBuilder() + await Client.EnqueueAsync(new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(JsonConvert.SerializeObject(tCRpcResponse)) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); @@ -288,7 +268,7 @@ namespace Plugin { //var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}"; var topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method}/{rpcResult.ResponseId}"; - await Client.PublishAsync(new MqttApplicationMessageBuilder() + await Client.EnqueueAsync(new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(JsonConvert.SerializeObject(rpcResult)) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce).Build()); @@ -327,7 +307,7 @@ namespace Plugin try { if (Client.IsConnected) - return Client.PublishAsync(new MqttApplicationMessageBuilder() + return Client.EnqueueAsync(new MqttApplicationMessageBuilder() .WithTopic($"devices/{deviceName}/attributes").WithPayload(JsonConvert.SerializeObject(obj)) .Build()); } @@ -341,14 +321,14 @@ namespace Plugin public async Task UploadIsTelemetryDataAsync(string deviceName, object obj) { - await Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry") + await Client.EnqueueAsync(new MqttApplicationMessageBuilder().WithTopic($"devices/{deviceName}/telemetry") .WithPayload(JsonConvert.SerializeObject(obj)).Build()); } public async Task UploadTcTelemetryDataAsync(string deviceName, object obj) { var toSend = new Dictionary { { deviceName, obj } }; - await Client.PublishAsync("gateway/attributes", JsonConvert.SerializeObject(toSend)); + await Client.EnqueueAsync("gateway/attributes", JsonConvert.SerializeObject(toSend)); } public async Task UploadHwTelemetryDataAsync(Device device, object obj) @@ -374,7 +354,7 @@ namespace Plugin Devices = hwTelemetry }; - await Client.PublishAsync($"/v1/devices/{_systemConfig?.GatewayName}/datas", + await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/datas", JsonConvert.SerializeObject(hwTelemetrys)); } @@ -382,43 +362,42 @@ namespace Plugin { try { - if (_systemConfig != null) - switch (_systemConfig.IoTPlatformType) - { - case IoTPlatformType.ThingsBoard: - var tRpcResponse = new TBRpcResponse + switch (_systemConfig.IoTPlatformType) + { + case IoTPlatformType.ThingsBoard: + var tRpcResponse = new TBRpcResponse + { + DeviceName = rpcResponse.DeviceName, + RequestId = rpcResponse.RequestId, + ResponseData = new Dictionary + { { "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description } } + }; + await ResponseTbRpcAsync(tRpcResponse); + break; + case IoTPlatformType.IoTSharp: + await ResponseIsRpcAsync(new ISRpcResponse + { + DeviceId = rpcResponse.DeviceName, + Method = "Method", + ResponseId = rpcResponse.RequestId, + Data = JsonConvert.SerializeObject(new Dictionary { - DeviceName = rpcResponse.DeviceName, - RequestId = rpcResponse.RequestId, - ResponseData = new Dictionary - { { "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description } } - }; - await ResponseTbRpcAsync(tRpcResponse); - break; - case IoTPlatformType.IoTSharp: - await ResponseIsRpcAsync(new ISRpcResponse - { - DeviceId = rpcResponse.DeviceName, - Method = "Method", - ResponseId = rpcResponse.RequestId, - Data = JsonConvert.SerializeObject(new Dictionary - { - { "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description } - }) - }); - break; - case IoTPlatformType.ThingsCloud: - //官网API不需要回复的 - break; - case IoTPlatformType.AliCloudIoT: - break; - case IoTPlatformType.TencentIoTHub: - break; - case IoTPlatformType.BaiduIoTCore: - break; - case IoTPlatformType.OneNET: - break; - } + { "success", rpcResponse.IsSuccess }, { "description", rpcResponse.Description } + }) + }); + break; + case IoTPlatformType.ThingsCloud: + //官网API不需要回复的 + break; + case IoTPlatformType.AliCloudIoT: + break; + case IoTPlatformType.TencentIoTHub: + break; + case IoTPlatformType.BaiduIoTCore: + break; + case IoTPlatformType.OneNET: + break; + } } catch (Exception ex) { @@ -431,39 +410,38 @@ namespace Plugin try { string id = Guid.NewGuid().ToString(); - if (_systemConfig != null) - switch (_systemConfig.IoTPlatformType) - { - case IoTPlatformType.ThingsBoard: - //{"id": $request_id, "device": "Device A", "client": true, "key": "attribute1"} - Dictionary tbRequestData = new Dictionary - { - { "id", id }, - { "device", deviceName }, - { "client", true }, - { "key", args[0] } - }; - await Client.PublishAsync("v1/gateway/attributes/request", - JsonConvert.SerializeObject(tbRequestData), MqttQualityOfServiceLevel.ExactlyOnce); - break; - case IoTPlatformType.IoTSharp: - string topic = $"devices/{deviceName}/attributes/request/{id}"; - Dictionary keys = new Dictionary(); - keys.Add(anySide ? "anySide" : "server", string.Join(",", args)); - await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}", - MqttQualityOfServiceLevel.ExactlyOnce); - await Client.PublishAsync(topic, JsonConvert.SerializeObject(keys), - MqttQualityOfServiceLevel.ExactlyOnce); - break; - case IoTPlatformType.AliCloudIoT: - break; - case IoTPlatformType.TencentIoTHub: - break; - case IoTPlatformType.BaiduIoTCore: - break; - case IoTPlatformType.OneNET: - break; - } + switch (_systemConfig.IoTPlatformType) + { + case IoTPlatformType.ThingsBoard: + //{"id": $request_id, "device": "Device A", "client": true, "key": "attribute1"} + Dictionary tbRequestData = new Dictionary + { + { "id", id }, + { "device", deviceName }, + { "client", true }, + { "key", args[0] } + }; + await Client.EnqueueAsync("v1/gateway/attributes/request", + JsonConvert.SerializeObject(tbRequestData), MqttQualityOfServiceLevel.ExactlyOnce); + break; + case IoTPlatformType.IoTSharp: + string topic = $"devices/{deviceName}/attributes/request/{id}"; + Dictionary keys = new Dictionary(); + keys.Add(anySide ? "anySide" : "server", string.Join(",", args)); + await Client.SubscribeAsync($"devices/{deviceName}/attributes/response/{id}", + MqttQualityOfServiceLevel.ExactlyOnce); + await Client.EnqueueAsync(topic, JsonConvert.SerializeObject(keys), + MqttQualityOfServiceLevel.ExactlyOnce); + break; + case IoTPlatformType.AliCloudIoT: + break; + case IoTPlatformType.TencentIoTHub: + break; + case IoTPlatformType.BaiduIoTCore: + break; + case IoTPlatformType.OneNET: + break; + } } catch (Exception ex) { @@ -526,64 +504,63 @@ namespace Plugin { if (CanPubTelemetry(device, sendModel)) { - if (_systemConfig != null) - switch (_systemConfig.IoTPlatformType) + switch (_systemConfig.IoTPlatformType) + { + case IoTPlatformType.ThingsBoard: + await Client.EnqueueAsync("v1/gateway/telemetry", + JsonConvert.SerializeObject(sendModel)); + break; + case IoTPlatformType.IoTSharp: + foreach (var payload in sendModel[device.DeviceName]) + { + if (payload.Values != null) + await UploadIsTelemetryDataAsync(device.DeviceName, payload.Values); + } + + break; + case IoTPlatformType.ThingsCloud: + foreach (var payload in sendModel[device.DeviceName]) + { + if (payload.Values != null) + await UploadTcTelemetryDataAsync(device.DeviceName, payload.Values); + } + + break; + case IoTPlatformType.IotDB: { - case IoTPlatformType.ThingsBoard: - await Client.PublishAsync("v1/gateway/telemetry", - JsonConvert.SerializeObject(sendModel)); - break; - case IoTPlatformType.IoTSharp: - foreach (var payload in sendModel[device.DeviceName]) + foreach (var payload in sendModel[device.DeviceName]) + { + if (payload.DeviceStatus != DeviceStatusTypeEnum.Good) + continue; + + IotTsData tsData = new IotTsData() { - if (payload.Values != null) - await UploadIsTelemetryDataAsync(device.DeviceName, payload.Values); - } + device = device.DeviceName, + timestamp = payload.TS, + measurements = payload.Values?.Keys.ToList(), + values = payload.Values?.Values.ToList() + }; + await Client.EnqueueAsync(device.DeviceName, JsonConvert.SerializeObject(tsData)); + } - break; - case IoTPlatformType.ThingsCloud: - foreach (var payload in sendModel[device.DeviceName]) - { - if (payload.Values != null) - await UploadTcTelemetryDataAsync(device.DeviceName, payload.Values); - } - - break; - case IoTPlatformType.IotDB: - { - foreach (var payload in sendModel[device.DeviceName]) - { - if (payload.DeviceStatus != DeviceStatusTypeEnum.Good) - continue; - - IotTsData tsData = new IotTsData() - { - device = device.DeviceName, - timestamp = payload.TS, - measurements = payload.Values?.Keys.ToList(), - values = payload.Values?.Values.ToList() - }; - await Client.PublishAsync(device.DeviceName, JsonConvert.SerializeObject(tsData)); - } - - break; - } - case IoTPlatformType.HuaWei: - foreach (var payload in sendModel[device.DeviceName]) - { - if (payload.Values != null) - await UploadHwTelemetryDataAsync(device, payload.Values); - } - - break; - - case IoTPlatformType.AliCloudIoT: - case IoTPlatformType.TencentIoTHub: - case IoTPlatformType.BaiduIoTCore: - case IoTPlatformType.OneNET: - default: - break; + break; } + case IoTPlatformType.HuaWei: + foreach (var payload in sendModel[device.DeviceName]) + { + if (payload.Values != null) + await UploadHwTelemetryDataAsync(device, payload.Values); + } + + break; + + case IoTPlatformType.AliCloudIoT: + case IoTPlatformType.TencentIoTHub: + case IoTPlatformType.BaiduIoTCore: + case IoTPlatformType.OneNET: + default: + break; + } } //foreach (var payload in sendModel[device.DeviceName]) @@ -599,7 +576,7 @@ namespace Plugin } catch (Exception ex) { - _logger.LogError($"PublishTelemetryAsync Error", ex); + _logger.LogError($"PublishTelemetryAsync Error:{ex}"); } } @@ -607,48 +584,47 @@ namespace Plugin { try { - if (_systemConfig != null) - switch (_systemConfig.IoTPlatformType) - { - case IoTPlatformType.ThingsBoard: - case IoTPlatformType.IoTSharp: - await Client.PublishAsync("v1/gateway/connect", - JsonConvert.SerializeObject(new Dictionary - { { "device", device.DeviceName } })); - break; - case IoTPlatformType.AliCloudIoT: - break; - case IoTPlatformType.TencentIoTHub: - break; - case IoTPlatformType.BaiduIoTCore: - break; - case IoTPlatformType.OneNET: - break; - case IoTPlatformType.ThingsCloud: - await Client.PublishAsync("gateway/connect", - JsonConvert.SerializeObject(new Dictionary - { { "device", device.DeviceName } })); - break; - case IoTPlatformType.HuaWei: - var deviceOnLine = new HwDeviceOnOffLine() + switch (_systemConfig.IoTPlatformType) + { + case IoTPlatformType.ThingsBoard: + case IoTPlatformType.IoTSharp: + await Client.EnqueueAsync("v1/gateway/connect", + JsonConvert.SerializeObject(new Dictionary + { { "device", device.DeviceName } })); + break; + case IoTPlatformType.AliCloudIoT: + break; + case IoTPlatformType.TencentIoTHub: + break; + case IoTPlatformType.BaiduIoTCore: + break; + case IoTPlatformType.OneNET: + break; + case IoTPlatformType.ThingsCloud: + await Client.EnqueueAsync("gateway/connect", + JsonConvert.SerializeObject(new Dictionary + { { "device", device.DeviceName } })); + break; + case IoTPlatformType.HuaWei: + var deviceOnLine = new HwDeviceOnOffLine() + { + MId = new Random().Next(0, 1024), //命令ID + DeviceStatuses = new List() { - MId = new Random().Next(0, 1024), //命令ID - DeviceStatuses = new List() + new DeviceStatus() { - new DeviceStatus() - { - DeviceId = device.DeviceConfigs - .FirstOrDefault(x => x.DeviceConfigName == "DeviceId") - ?.Value, - Status = "ONLINE" - } + DeviceId = device.DeviceConfigs + .FirstOrDefault(x => x.DeviceConfigName == "DeviceId") + ?.Value, + Status = "ONLINE" } - }; - await Client.PublishAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update", - JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce, - retain: false); - break; - } + } + }; + await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update", + JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce, + retain: false); + break; + } } catch (Exception ex) { @@ -660,48 +636,47 @@ namespace Plugin { try { - if (_systemConfig != null) - switch (_systemConfig.IoTPlatformType) - { - case IoTPlatformType.ThingsBoard: - case IoTPlatformType.IoTSharp: - await Client.PublishAsync("v1/gateway/disconnect", - JsonConvert.SerializeObject(new Dictionary - { { "device", device.DeviceName } })); - break; - case IoTPlatformType.AliCloudIoT: - break; - case IoTPlatformType.TencentIoTHub: - break; - case IoTPlatformType.BaiduIoTCore: - break; - case IoTPlatformType.OneNET: - break; - case IoTPlatformType.ThingsCloud: - await Client.PublishAsync("gateway/disconnect", - JsonConvert.SerializeObject(new Dictionary - { { "device", device.DeviceName } })); - break; - case IoTPlatformType.HuaWei: - var deviceOnLine = new HwDeviceOnOffLine() + switch (_systemConfig.IoTPlatformType) + { + case IoTPlatformType.ThingsBoard: + case IoTPlatformType.IoTSharp: + await Client.EnqueueAsync("v1/gateway/disconnect", + JsonConvert.SerializeObject(new Dictionary + { { "device", device.DeviceName } })); + break; + case IoTPlatformType.AliCloudIoT: + break; + case IoTPlatformType.TencentIoTHub: + break; + case IoTPlatformType.BaiduIoTCore: + break; + case IoTPlatformType.OneNET: + break; + case IoTPlatformType.ThingsCloud: + await Client.EnqueueAsync("gateway/disconnect", + JsonConvert.SerializeObject(new Dictionary + { { "device", device.DeviceName } })); + break; + case IoTPlatformType.HuaWei: + var deviceOnLine = new HwDeviceOnOffLine() + { + MId = new Random().Next(0, 1024), //命令ID + DeviceStatuses = new List() { - MId = new Random().Next(0, 1024), //命令ID - DeviceStatuses = new List() + new DeviceStatus() { - new DeviceStatus() - { - DeviceId = device.DeviceConfigs - .FirstOrDefault(x => x.DeviceConfigName == "DeviceId") - ?.Value, - Status = "OFFLINE" - } + DeviceId = device.DeviceConfigs + .FirstOrDefault(x => x.DeviceConfigName == "DeviceId") + ?.Value, + Status = "OFFLINE" } - }; - await Client.PublishAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update", - JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce, - retain: false); - break; - } + } + }; + await Client.EnqueueAsync($"/v1/devices/{_systemConfig.GatewayName}/topo/update", + JsonConvert.SerializeObject(deviceOnLine), MqttQualityOfServiceLevel.AtLeastOnce, + retain: false); + break; + } } catch (Exception ex) { @@ -714,31 +689,30 @@ namespace Plugin { try { - if (_systemConfig != null) - switch (_systemConfig.IoTPlatformType) - { - case IoTPlatformType.HuaWei: - var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/add"; + switch (_systemConfig.IoTPlatformType) + { + case IoTPlatformType.HuaWei: + var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/add"; - var addDeviceDto = new HwAddDeviceDto + var addDeviceDto = new HwAddDeviceDto + { + MId = new Random().Next(0, 1024), //命令ID + }; + addDeviceDto.DeviceInfos.Add( + new DeviceInfo { - MId = new Random().Next(0, 1024), //命令ID - }; - addDeviceDto.DeviceInfos.Add( - new DeviceInfo - { - NodeId = device.DeviceName, - Name = device.DeviceName, - Description = device.Description, - ManufacturerId = "Test_n", - ProductType = "A_n" - } - ); + NodeId = device.DeviceName, + Name = device.DeviceName, + Description = device.Description, + ManufacturerId = "Test_n", + ProductType = "A_n" + } + ); - await Client.PublishAsync(topic, - JsonConvert.SerializeObject(addDeviceDto)); - break; - } + await Client.EnqueueAsync(topic, + JsonConvert.SerializeObject(addDeviceDto)); + break; + } } catch (Exception ex) { @@ -751,35 +725,34 @@ namespace Plugin { try { - if (_systemConfig != null) - switch (_systemConfig.IoTPlatformType) - { - case IoTPlatformType.HuaWei: - var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/delete"; + switch (_systemConfig.IoTPlatformType) + { + case IoTPlatformType.HuaWei: + var topic = $"/v1/devices/{_systemConfig.GatewayName}/topo/delete"; - await using (var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType)) + await using (var dc = new DataContext(IoTBackgroundService.connnectSetting, IoTBackgroundService.DbType)) + { + var deviceId = dc.Set().FirstOrDefault(x => + x.DeviceId == device.ID && x.DeviceConfigName == "DeviceId")?.Value; + + var deleteDeviceDto = new HwDeleteDeviceDto { - var deviceId = dc.Set().FirstOrDefault(x => - x.DeviceId == device.ID && x.DeviceConfigName == "DeviceId")?.Value; - - var deleteDeviceDto = new HwDeleteDeviceDto + Id = new Random().Next(0, 1024), //命令ID + DeviceId = deviceId, + RequestTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds, + Request = new() { - Id = new Random().Next(0, 1024), //命令ID - DeviceId = deviceId, - RequestTime = (long)(DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds, - Request = new() - { - ManufacturerId = "Test_n", - ManufacturerName = "Test_n", - ProductType = "A_n" - } - }; + ManufacturerId = "Test_n", + ManufacturerName = "Test_n", + ProductType = "A_n" + } + }; - await Client.PublishAsync(topic, - JsonConvert.SerializeObject(deleteDeviceDto)); - } - break; - } + await Client.EnqueueAsync(topic, + JsonConvert.SerializeObject(deleteDeviceDto)); + } + break; + } } catch (Exception ex) { diff --git a/Plugins/Plugin/Plugin.csproj b/Plugins/Plugin/Plugin.csproj index 134114c..2e23a46 100644 --- a/Plugins/Plugin/Plugin.csproj +++ b/Plugins/Plugin/Plugin.csproj @@ -1,4 +1,4 @@ - + net6.0 @@ -9,8 +9,9 @@ - - + + +