并发编程模式(如生产者-消费者、任务分区、发布-订阅等)可以帮助我们更好地组织多线程代码,提高可维护性、性能和健壮性

张开发
2026/4/7 22:51:54 15 分钟阅读

分享文章

并发编程模式(如生产者-消费者、任务分区、发布-订阅等)可以帮助我们更好地组织多线程代码,提高可维护性、性能和健壮性
基于之前的线程同步优化代码,我将进一步引入并发编程模式,以更结构化和可扩展的方式优化加热控制逻辑。并发编程模式(如生产者-消费者、任务分区、发布-订阅等)可以帮助我们更好地组织多线程代码,提高可维护性、性能和健壮性。在加热控制场景中,适合的模式包括任务分区(为每个加热板分配独立任务)和发布-订阅(用于状态更新和错误通知)。优化后的代码(基于并发编程模式)using System; using System.Collections.Concurrent; using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; public class HeatController { // 从配置获取加热等待时间(默认6分钟),转换为毫秒并计算循环次数 private const int CHECK_INTERVAL_MS = 100; // 每次检查间隔100毫秒 private readonly ChannelHeatBoardStatus _statusChannel; // 发布-订阅通道 private readonly ConcurrentBagstring _errorMessages; // 线程安全错误收集 private readonly SemaphoreSlim _hardwareAccessLock; // 控制硬件访问 private readonly CancellationTokenSource _internalCts; // 内部取消令牌 private readonly int _maxConcurrency; // 最大并发度 public HeatController() { _statusChannel = Channel.CreateBoundedHeatBoardStatus(new BoundedChannelOptions(100)); _errorMessages = new ConcurrentBagstring(); _hardwareAccessLock = new SemaphoreSlim(1, 1); _internalCts = new CancellationTokenSource(); _maxConcurrency = Environment.ProcessorCount / 2; // 限制并发线程 } public async Task HeatControlAsync(CancellationToken externalCancellationToken = default) { using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(externalCancellationToken, _internalCts.Token); var cancellationToken = linkedCts.Token; int heatWaitTimeMinutes = ConfigHelper.GetAppSetting("HeatWaitTime", 6); int maxWaitIterations = (heatWaitTimeMinutes * 60 * 1000) / CHECK_INTERVAL_MS; if (!double.TryParse(ParameterMap["Tolerance"].Value, out double tolerance) || !double.TryParse(SetTem, out double targetTemp)) { SystemNotifEvent.HardErrorNotifEvent(this, $"{Name}: 参数解析失败。", ""); return; } // 启动状态监听任务 var statusListenerTask = ListenForStatusAsync(cancellationToken); // 任务分区:为每个加热板分配检查任务 var checkTasks = StartTemperatureChecksAsync(tolerance, targetTemp, cancellationToken); while (maxWaitIterations 0 !IsHeatingComplete() HardwareConnectStatus == HardwareConnectStatus.InUsing) { cancellationToken.ThrowIfCancellationRequested(); if (_errorMessages.TryTake(out var error)) { SystemNotifEvent.HardErrorNotifEvent(this, $"{Name}: {error}", ""); _internalCts.Cancel(); // 触发内部取消 break; } maxWaitIterations--; await Task.Delay(CHECK_INTERVAL_MS, cancellationToken); } // 清理:取消任务并等待完成 _internalCts.Cancel(); await Task.WhenAll(checkTasks.Concat(new[] { statusListenerTask }).ToArray()); if (maxWaitIterations = 0) { SystemNotifEvent.HardErrorNotifEvent(this, $"{Name}: 加热温度异常,超时失败。", ""); } } private async Task StartTemperatureChecksAsync(double tolerance, double targetTemp, CancellationToken cancellationToken) { int length = Math.Min(intArr.Length, HeatBoardRealTimeState.TempValue.Length); var activeBoards = Enumerable.Range(0, length).Where(i = intArr[i] == 1).ToList(); if (!activeBoards.Any()) { await _statusChannel.Writer.WriteAsync(new HeatBoardStatus { IsComplete = true }, cancellationToken); return; } using var semaphore = new SemaphoreSlim(_maxConcurrency); // 任务分区:为每个加热板分配独立任务 var tasks = activeBoards.Select(async index = { while (!cancellationToken.IsCancellationRequested) { await semaphore.WaitAsync(cancellationToken); try { await _hardwareAccessLock.WaitAsync(cancellationToken); double currentTemp; try { currentTemp = HeatBoardRealTimeState.TempValue[index]; } finally { _hardwareAccessLock.Release(); }

更多文章