mirror of
https://gitee.com/iioter/iotgateway.git
synced 2024-11-29 18:28:09 +08:00
优化采集线程
This commit is contained in:
parent
080994aab8
commit
78c1314627
Binary file not shown.
@ -13,6 +13,7 @@ namespace Plugin
|
||||
{
|
||||
public class DeviceThread : IDisposable
|
||||
{
|
||||
private readonly MqttServer _mqttServer;
|
||||
private readonly ILogger _logger;
|
||||
public readonly Device Device;
|
||||
public readonly IDriver Driver;
|
||||
@ -35,6 +36,7 @@ namespace Plugin
|
||||
_projectId = projectId;
|
||||
_interpreter = new Interpreter();
|
||||
_logger = logger;
|
||||
_mqttServer = mqttServer;
|
||||
Methods = Driver.GetType().GetMethods().Where(x => x.GetCustomAttribute(typeof(MethodAttribute)) != null)
|
||||
.ToList();
|
||||
if (Device.AutoStart)
|
||||
@ -44,158 +46,178 @@ namespace Plugin
|
||||
if (Device.DeviceVariables != null)
|
||||
{
|
||||
foreach (var item in Device.DeviceVariables)
|
||||
{
|
||||
item.StatusType = VaribaleStatusTypeEnum.Bad;
|
||||
if (string.IsNullOrWhiteSpace(item.Alias))
|
||||
item.Alias = string.Empty;
|
||||
}
|
||||
}
|
||||
CreateThread().Wait();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task CreateThread()
|
||||
{
|
||||
_task = await Task.Factory.StartNew(async () =>
|
||||
{
|
||||
await Task.Delay(5000);
|
||||
//上传客户端属性
|
||||
foreach (var deviceVariables in Device.DeviceVariables!.GroupBy(x => x.Alias))
|
||||
{
|
||||
_myMqttClient.UploadAttributeAsync(string.IsNullOrWhiteSpace(deviceVariables.Key)
|
||||
? Device.DeviceName
|
||||
: deviceVariables.Key,
|
||||
Device.DeviceConfigs.Where(x => x.DataSide == DataSide.ClientSide || x.DataSide == DataSide.AnySide)
|
||||
.ToDictionary(x => x.DeviceConfigName, x => x.Value));
|
||||
}
|
||||
|
||||
_task = Task.Factory.StartNew(() =>
|
||||
while (true)
|
||||
{
|
||||
Thread.Sleep(5000);
|
||||
//上传客户端属性
|
||||
foreach (var deviceVariables in Device.DeviceVariables!.GroupBy(x => x.Alias))
|
||||
if (_tokenSource.IsCancellationRequested)
|
||||
{
|
||||
myMqttClient.UploadAttributeAsync(string.IsNullOrWhiteSpace(deviceVariables.Key)
|
||||
? Device.DeviceName
|
||||
: deviceVariables.Key,
|
||||
device.DeviceConfigs.Where(x => x.DataSide == DataSide.ClientSide || x.DataSide == DataSide.AnySide)
|
||||
.ToDictionary(x => x.DeviceConfigName, x => x.Value));
|
||||
_logger.LogInformation($"停止线程:{Device.DeviceName}");
|
||||
return;
|
||||
}
|
||||
|
||||
while (true)
|
||||
resetEvent.WaitOne();
|
||||
try
|
||||
{
|
||||
if (_tokenSource.IsCancellationRequested)
|
||||
if (Driver.IsConnected)
|
||||
{
|
||||
_logger.LogInformation($"停止线程:{Device.DeviceName}");
|
||||
return;
|
||||
}
|
||||
|
||||
resetEvent.WaitOne();
|
||||
try
|
||||
{
|
||||
if (driver.IsConnected)
|
||||
foreach (var deviceVariables in Device.DeviceVariables.Where(x => x.ProtectType != ProtectTypeEnum.WriteOnly).GroupBy(x => x.Alias))
|
||||
{
|
||||
foreach (var deviceVariables in Device.DeviceVariables.Where(x => x.ProtectType != ProtectTypeEnum.WriteOnly).GroupBy(x => x.Alias))
|
||||
{
|
||||
string deviceName = string.IsNullOrWhiteSpace(deviceVariables.Key)
|
||||
? Device.DeviceName
|
||||
: deviceVariables.Key;
|
||||
string deviceName = string.IsNullOrWhiteSpace(deviceVariables.Key)
|
||||
? Device.DeviceName
|
||||
: deviceVariables.Key;
|
||||
|
||||
Dictionary<string, List<PayLoad>> sendModel = new()
|
||||
Dictionary<string, List<PayLoad>> sendModel = new()
|
||||
{ { deviceName, new() } };
|
||||
var payLoad = new PayLoad() { Values = new() };
|
||||
var payLoad = new PayLoad() { Values = new() };
|
||||
|
||||
if (deviceVariables.Any())
|
||||
if (deviceVariables.Any())
|
||||
{
|
||||
foreach (var item in deviceVariables.OrderBy(x => x.Index))
|
||||
{
|
||||
foreach (var item in deviceVariables.OrderBy(x => x.Index))
|
||||
item.Value = null;
|
||||
item.CookedValue = null;
|
||||
item.StatusType = VaribaleStatusTypeEnum.Bad;
|
||||
|
||||
await Task.Delay((int)Device.CmdPeriod);
|
||||
|
||||
var ret = new DriverReturnValueModel();
|
||||
var ioarg = new DriverAddressIoArgModel
|
||||
{
|
||||
item.Value = null;
|
||||
item.CookedValue = null;
|
||||
item.StatusType = VaribaleStatusTypeEnum.Bad;
|
||||
ID = item.ID,
|
||||
Address = item.DeviceAddress,
|
||||
ValueType = item.DataType,
|
||||
EndianType = item.EndianType
|
||||
};
|
||||
var method = Methods.FirstOrDefault(x => x.Name == item.Method);
|
||||
if (method == null)
|
||||
ret.StatusType = VaribaleStatusTypeEnum.MethodError;
|
||||
else
|
||||
ret = (DriverReturnValueModel)method.Invoke(Driver,
|
||||
new object[] { ioarg })!;
|
||||
|
||||
Thread.Sleep((int)Device.CmdPeriod);
|
||||
item.EnqueueVariable(ret.Value);
|
||||
if (ret.StatusType == VaribaleStatusTypeEnum.Good &&
|
||||
!string.IsNullOrWhiteSpace(item.Expressions?.Trim()))
|
||||
{
|
||||
var expressionText = DealMysqlStr(item.Expressions)
|
||||
.Replace("raw",
|
||||
item.Values[0] is bool
|
||||
? $"Convert.ToBoolean(\"{item.Values[0]}\")"
|
||||
: item.Values[0]?.ToString())
|
||||
.Replace("$ppv",
|
||||
item.Values[2] is bool
|
||||
? $"Convert.ToBoolean(\"{item.Values[2]}\")"
|
||||
: item.Values[2]?.ToString())
|
||||
.Replace("$pv",
|
||||
item.Values[1] is bool
|
||||
? $"Convert.ToBoolean(\"{item.Values[1]}\")"
|
||||
: item.Values[1]?.ToString());
|
||||
|
||||
var ret = new DriverReturnValueModel();
|
||||
var ioarg = new DriverAddressIoArgModel
|
||||
try
|
||||
{
|
||||
ID = item.ID,
|
||||
Address = item.DeviceAddress,
|
||||
ValueType = item.DataType,
|
||||
EndianType = item.EndianType
|
||||
};
|
||||
var method = Methods.FirstOrDefault(x => x.Name == item.Method);
|
||||
if (method == null)
|
||||
ret.StatusType = VaribaleStatusTypeEnum.MethodError;
|
||||
else
|
||||
ret = (DriverReturnValueModel)method.Invoke(Driver,
|
||||
new object[] { ioarg })!;
|
||||
|
||||
item.EnqueueVariable(ret.Value);
|
||||
if (ret.StatusType == VaribaleStatusTypeEnum.Good &&
|
||||
!string.IsNullOrWhiteSpace(item.Expressions?.Trim()))
|
||||
{
|
||||
var expressionText = DealMysqlStr(item.Expressions)
|
||||
.Replace("raw",
|
||||
item.Values[0] is bool
|
||||
? $"Convert.ToBoolean(\"{item.Values[0]}\")"
|
||||
: item.Values[0]?.ToString())
|
||||
.Replace("$ppv",
|
||||
item.Values[2] is bool
|
||||
? $"Convert.ToBoolean(\"{item.Values[2]}\")"
|
||||
: item.Values[2]?.ToString())
|
||||
.Replace("$pv",
|
||||
item.Values[1] is bool
|
||||
? $"Convert.ToBoolean(\"{item.Values[1]}\")"
|
||||
: item.Values[1]?.ToString());
|
||||
|
||||
try
|
||||
{
|
||||
ret.CookedValue = _interpreter.Eval(expressionText);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
ret.StatusType = VaribaleStatusTypeEnum.ExpressionError;
|
||||
}
|
||||
ret.CookedValue = _interpreter.Eval(expressionText);
|
||||
}
|
||||
else
|
||||
ret.CookedValue = ret.Value;
|
||||
|
||||
|
||||
if (item.IsUpload)
|
||||
payLoad.Values[item.Name] = ret.CookedValue;
|
||||
|
||||
ret.VarId = item.ID;
|
||||
|
||||
//变化了才推送到mqttserver,用于前端展示
|
||||
if ((item.Values[1] == null && item.Values[0] != null) ||
|
||||
(item.Values[1] != null && item.Values[0] != null && item.Values[1].ToString() != item.Values[0].ToString()))
|
||||
catch (Exception)
|
||||
{
|
||||
//这是设备变量列表要用的
|
||||
var msgInternal = new InjectedMqttApplicationMessage(
|
||||
new MqttApplicationMessage()
|
||||
{
|
||||
Topic = $"internal/v1/gateway/telemetry/{deviceName}/{item.Name}",
|
||||
Payload = Encoding.UTF8.GetBytes(JsonUtility.SerializeToJson(ret))
|
||||
});
|
||||
mqttServer.InjectApplicationMessage(msgInternal);
|
||||
//这是在线组态要用的
|
||||
var msgConfigure = new InjectedMqttApplicationMessage(
|
||||
new MqttApplicationMessage()
|
||||
{
|
||||
Topic = $"v1/gateway/telemetry/{deviceName}/{item.Name}",
|
||||
Payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(ret.CookedValue))
|
||||
});
|
||||
mqttServer.InjectApplicationMessage(msgConfigure);
|
||||
ret.StatusType = VaribaleStatusTypeEnum.ExpressionError;
|
||||
}
|
||||
}
|
||||
else
|
||||
ret.CookedValue = ret.Value;
|
||||
|
||||
item.Value = ret.Value;
|
||||
item.CookedValue = ret.CookedValue;
|
||||
item.Timestamp = ret.Timestamp;
|
||||
item.StatusType = ret.StatusType;
|
||||
|
||||
if (item.IsUpload)
|
||||
payLoad.Values[item.Name] = ret.CookedValue;
|
||||
|
||||
ret.VarId = item.ID;
|
||||
|
||||
//变化了才推送到mqttserver,用于前端展示
|
||||
if ((item.Values[1] == null && item.Values[0] != null) ||
|
||||
(item.Values[1] != null && item.Values[0] != null && JsonConvert.SerializeObject(item.Values[1]) != JsonConvert.SerializeObject(item.Values[0])))
|
||||
{
|
||||
//这是设备变量列表要用的
|
||||
var msgInternal = new InjectedMqttApplicationMessage(
|
||||
new MqttApplicationMessage()
|
||||
{
|
||||
Topic = $"internal/v1/gateway/telemetry/{deviceName}/{item.Name}",
|
||||
Payload = Encoding.UTF8.GetBytes(JsonUtility.SerializeToJson(ret))
|
||||
});
|
||||
_mqttServer.InjectApplicationMessage(msgInternal);
|
||||
//这是在线组态要用的
|
||||
var msgConfigure = new InjectedMqttApplicationMessage(
|
||||
new MqttApplicationMessage()
|
||||
{
|
||||
Topic = $"v1/gateway/telemetry/{deviceName}/{item.Name}",
|
||||
Payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(ret.CookedValue))
|
||||
});
|
||||
_mqttServer.InjectApplicationMessage(msgConfigure);
|
||||
}
|
||||
|
||||
payLoad.TS = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds;
|
||||
|
||||
if (deviceVariables.Where(x => x.IsUpload && x.ProtectType != ProtectTypeEnum.WriteOnly).All(x => x.StatusType == VaribaleStatusTypeEnum.Good))
|
||||
{
|
||||
payLoad.DeviceStatus = DeviceStatusTypeEnum.Good;
|
||||
sendModel[deviceName] = new List<PayLoad> { payLoad };
|
||||
myMqttClient
|
||||
.PublishTelemetryAsync(deviceName,
|
||||
Device, sendModel).Wait();
|
||||
}
|
||||
else if (deviceVariables.Any(x => x.StatusType == VaribaleStatusTypeEnum.Bad))
|
||||
_myMqttClient?.DeviceDisconnected(deviceName, Device);
|
||||
item.Value = ret.Value;
|
||||
item.CookedValue = ret.CookedValue;
|
||||
item.Timestamp = ret.Timestamp;
|
||||
item.StatusType = ret.StatusType;
|
||||
}
|
||||
|
||||
payLoad.TS = (long)(DateTime.UtcNow - _tsStartDt).TotalMilliseconds;
|
||||
|
||||
if (deviceVariables.Where(x => x.IsUpload && x.ProtectType != ProtectTypeEnum.WriteOnly).All(x => x.StatusType == VaribaleStatusTypeEnum.Good))
|
||||
{
|
||||
payLoad.DeviceStatus = DeviceStatusTypeEnum.Good;
|
||||
sendModel[deviceName] = new List<PayLoad> { payLoad };
|
||||
_myMqttClient
|
||||
.PublishTelemetryAsync(deviceName,
|
||||
Device, sendModel).Wait();
|
||||
}
|
||||
else if (deviceVariables.Any(x => x.StatusType == VaribaleStatusTypeEnum.Bad))
|
||||
_myMqttClient?.DeviceDisconnected(deviceName, Device);
|
||||
}
|
||||
|
||||
//只要有读取异常且连接正常就断开
|
||||
if (Device.DeviceVariables.Where(x => x.IsUpload && x.ProtectType != ProtectTypeEnum.WriteOnly).Any(x => x.StatusType != VaribaleStatusTypeEnum.Good) && driver.IsConnected)
|
||||
{
|
||||
driver.Close();
|
||||
driver.Dispose();
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
//只要有读取异常且连接正常就断开
|
||||
if (Device.DeviceVariables.Where(x => x.IsUpload && x.ProtectType != ProtectTypeEnum.WriteOnly).Any(x => x.StatusType != VaribaleStatusTypeEnum.Good) && Driver.IsConnected)
|
||||
{
|
||||
Driver.Close();
|
||||
Driver.Dispose();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
foreach (var deviceVariables in Device.DeviceVariables!.GroupBy(x => x.Alias))
|
||||
{
|
||||
string deviceName = string.IsNullOrWhiteSpace(deviceVariables.Key)
|
||||
? Device.DeviceName
|
||||
: deviceVariables.Key;
|
||||
|
||||
_myMqttClient?.DeviceDisconnected(deviceName, Device);
|
||||
}
|
||||
|
||||
if (Driver.Connect())
|
||||
{
|
||||
foreach (var deviceVariables in Device.DeviceVariables!.GroupBy(x => x.Alias))
|
||||
{
|
||||
@ -203,36 +225,21 @@ namespace Plugin
|
||||
? Device.DeviceName
|
||||
: deviceVariables.Key;
|
||||
|
||||
_myMqttClient?.DeviceDisconnected(deviceName, Device);
|
||||
}
|
||||
|
||||
if (driver.Connect())
|
||||
{
|
||||
foreach (var deviceVariables in Device.DeviceVariables!.GroupBy(x => x.Alias))
|
||||
{
|
||||
string deviceName = string.IsNullOrWhiteSpace(deviceVariables.Key)
|
||||
? Device.DeviceName
|
||||
: deviceVariables.Key;
|
||||
|
||||
_myMqttClient?.DeviceConnected(deviceName, Device);
|
||||
}
|
||||
_myMqttClient?.DeviceConnected(deviceName, Device);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, $"线程循环异常,{Device.DeviceName}");
|
||||
}
|
||||
|
||||
|
||||
Thread.Sleep(Device.DeviceVariables!.Any() ? (int)Driver.MinPeriod : 10000);
|
||||
}
|
||||
}, TaskCreationOptions.LongRunning);
|
||||
}
|
||||
//else
|
||||
//_myMqttClient?.DeviceDisconnected(Device);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, $"线程循环异常,{Device.DeviceName}");
|
||||
}
|
||||
|
||||
|
||||
await Task.Delay(Device.DeviceVariables!.Any() ? (int)Driver.MinPeriod : 10000);
|
||||
}
|
||||
}, TaskCreationOptions.LongRunning);
|
||||
}
|
||||
public void MyMqttClient_OnExcRpc(object? sender, RpcRequest e)
|
||||
{
|
||||
//设备名或者设备别名
|
||||
@ -249,7 +256,7 @@ namespace Plugin
|
||||
|
||||
_logger.LogInformation($"{e.DeviceName}收到RPC,{e}");
|
||||
RpcResponse rpcResponse = new()
|
||||
{ DeviceName = e.DeviceName, RequestId = e.RequestId, IsSuccess = false, Method = e.Method };
|
||||
{ DeviceName = e.DeviceName, RequestId = e.RequestId, IsSuccess = false, Method = e.Method };
|
||||
//执行写入变量RPC
|
||||
if (e.Method.ToLower() == "write")
|
||||
{
|
||||
@ -333,7 +340,7 @@ namespace Plugin
|
||||
public void StopThread()
|
||||
{
|
||||
_logger.LogInformation($"线程停止:{Device.DeviceName}");
|
||||
if (Device.DeviceVariables!=null&& Device.DeviceVariables.Any())
|
||||
if (Device.DeviceVariables != null && Device.DeviceVariables.Any())
|
||||
{
|
||||
foreach (var deviceVariables in Device.DeviceVariables.GroupBy(x => x.Alias))
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user