聚焦生产者-消费者模式和并行任务调度,并提供更简洁的注释和每项改进的详细解释
代码基于你的原始场景(WinForms 应用中定期更新 UI 显示 sectionS 测试结果),使用 Task 和 async/await,结合 ConcurrentQueue 实现生产者-消费者模式,Parallel.ForEach 实现并行任务调度,确保线程安全、高效且现代化,避免卡顿、死循环和资源泄漏。适合处理独立任务或大数据量集合(如 sectionS 的状态更新)。基于你的需求,我将进
基于你的需求,我将进一步优化代码,聚焦生产者-消费者模式和并行任务调度,并提供更简洁的注释和每项改进的详细解释。
代码基于你的原始场景(WinForms 应用中定期更新 UI 显示 sectionS 测试结果),使用 Task 和 async/await,结合 ConcurrentQueue 实现生产者-消费者模式,Parallel.ForEach 实现并行任务调度,确保线程安全、高效且现代化,避免卡顿、死循环和资源泄漏。
以下是完整代码示例和详细分析。
生产者-消费者模式定义:
生产者-消费者模式是一种并发设计模式,生产者生成数据并放入线程安全的缓冲区(如 ConcurrentQueue),消费者从中提取数据进行处理。
生产者和消费者异步运行,解耦逻辑,适合你的场景中状态更新(生产者)和 UI 渲染(消费者)。
在你的场景中的应用:
- 生产者:定期更新 sectionS 的测试状态,推送到 ConcurrentQueue。
- 消费者:从队列读取状态,更新 UI 的 Label 控件。
- 缓冲区:ConcurrentQueue 存储状态更新,确保线程安全。
优点:
- 解耦状态更新和 UI 渲染,降低线程竞争。
- 异步处理提高吞吐量,适合实时数据流。
- 支持取消和错误处理,增强健壮性。
并行任务调度定义:
并行任务调度通过 Parallel.ForEach 或 Task 并行执行任务,利用多核 CPU 提高性能。适合处理独立任务或大数据量集合(如 sectionS 的状态更新)。
在你的场景中的应用:
- 使用 Parallel.ForEach 并行更新 sectionS 中的状态。
- 使用 Task.WhenAll 并发运行生产者和消费者任务。
优点:
- 充分利用多核 CPU,加速大数据量处理。
- 简化并发任务管理,保持代码清晰。
异步编程最佳实践(简要回顾)
- 异步优先:使用 async/await 和 Task,避免阻塞。
- 取消支持:通过 CancellationToken 终止任务。
- 异常处理:捕获异常,记录日志,传播错误。
- UI 优化:缓存控件,批量更新,减少卡顿。
- 资源管理:释放 GDI 资源(如 Label、Font)。
- 线程安全:使用 ConcurrentDictionary 和 ConcurrentQueue。
完整优化代码(带简洁注释)以下代码实现生产者-消费者模式和并行任务调度,使用 ConcurrentQueue 和 Parallel.ForEach,注释简洁但覆盖关键逻辑。csharp
using System;
using System.Collections.Concurrent;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
// 假设的日志接口,需替换为实际实现(如 NLog)
public interface ILogger
{
void Info(string message);
void Error(string message);
}
public partial class YourForm : Form
{
// 线程安全的键值对,存储测试状态
private readonly ConcurrentDictionary<string, string> sectionS = new ConcurrentDictionary<string, string>();
// 线程安全的队列,传递状态更新
private readonly ConcurrentQueue<(string Key, string Value)> statusQueue = new ConcurrentQueue<(string, string)>();
// 缓存 Label 控件,避免重复创建
private readonly Dictionary<string, Label> labelCache = new Dictionary<string, Label>();
// 静态字体,缓存 GDI 资源
private static readonly Font LabelFont = new Font("上首漠云体", 25);
// 线程安全标志,控制任务退出
private volatile bool m_Dis;
// UI 布局参数
private int _LX = 10, _LY = 25, _YSpace = 10;
// 时间基准
private DateTime now = DateTime.Now;
// 日志记录器
private readonly ILogger m_Logger;
public YourForm(ILogger logger)
{
m_Logger = logger ?? throw new ArgumentNullException(nameof(logger));
InitializeComponent(); // 初始化 WinForms 控件
}
// 启动异步任务,支持取消
public async Task StartTestResultAsync(CancellationToken ct = default)
{
try
{
m_Logger.Info("Starting test result pipeline");
// 并发运行生产者和消费者
await Task.WhenAll(
ProduceStatusUpdatesAsync(ct),
ConsumeStatusUpdatesAsync(ct)
);
m_Logger.Info("Pipeline completed");
}
catch (OperationCanceledException)
{
m_Logger.Info("Pipeline cancelled");
}
catch (Exception ex)
{
m_Logger.Error($"StartTestResultAsync failed: {ex.Message}");
throw;
}
}
// 生产者:更新状态并入队
private async Task ProduceStatusUpdatesAsync(CancellationToken ct)
{
try
{
m_Logger.Info("Producer started");
var startTime = DateTime.Now;
while (!m_Dis && !ct.IsCancellationRequested &&
(DateTime.Now - startTime).TotalSeconds < 300) // 5 分钟超时
{
UpdateSectionStatus(); // 并行更新状态
foreach (var section in sectionS)
statusQueue.Enqueue((section.Key, section.Value)); // 入队状态
await Task.Delay(3000, ct); // 异步延迟 3 秒
}
}
catch (OperationCanceledException)
{
m_Logger.Info("Producer cancelled");
throw;
}
catch (Exception ex)
{
m_Logger.Error($"Producer failed: {ex.Message}");
throw;
}
}
// 消费者:从队列读取状态,更新 UI
private async Task ConsumeStatusUpdatesAsync(CancellationToken ct)
{
try
{
m_Logger.Info("Consumer started");
var startTime = DateTime.Now;
while (!m_Dis && !ct.IsCancellationRequested &&
(DateTime.Now - startTime).TotalSeconds < 300)
{
while (statusQueue.TryDequeue(out var status))
{
if (InvokeRequired)
await Task.Run(() => Invoke((MethodInvoker)(() => UpdateUI(status))), ct);
else
UpdateUI(status);
}
await Task.Delay(100, ct); // 异步检查队列
}
}
catch (OperationCanceledException)
{
m_Logger.Info("Consumer cancelled");
throw;
}
catch (Exception ex)
{
m_Logger.Error($"Consumer failed: {ex.Message}");
throw;
}
}
// 并行更新 sectionS 状态
private void UpdateSectionStatus()
{
try
{
Parallel.ForEach(sectionS.Keys, key =>
{
sectionS.AddOrUpdate(key, key, (k, oldValue) =>
{
if (oldValue.Contains("PASS-OVER") || oldValue.Contains("FAIL-OVER") ||
oldValue.Contains("准备中") || oldValue.Contains("下压中"))
return oldValue;
if (DateTime.Now > now.AddSeconds(8) && DateTime.Now < now.AddSeconds(20))
return "NG模式FAIL";
if (DateTime.Now > now.AddSeconds(20) && DateTime.Now < now.AddSeconds(30))
return "测试中";
if (DateTime.Now > now.AddSeconds(30))
return "OK模式PASS";
return oldValue;
});
});
}
catch (Exception ex)
{
m_Logger.Error($"UpdateSectionStatus failed: {ex.Message}");
}
}
// 更新 UI,显示状态
private void UpdateUI((string Key, string Value) status)
{
try
{
var (key, value) = status;
string str = GetStatusString(key, value);
// 获取或创建 Label
if (!labelCache.TryGetValue(key, out var label))
{
label = new Label
{
Tag = key,
AutoSize = true,
Font = LabelFont,
Location = new Point(_LX, _LY)
};
panel5.Controls.Add(label);
labelCache[key] = label;
}
// 更新 Label 属性
label.Text = str;
label.ForeColor = value.Contains("PASS") ? Color.Green :
value.Contains("FAIL") ? Color.Red : Color.White;
label.Location = new Point(_LX, _LY);
_LY += _YSpace + label.Size.Height;
// 清理无用 Label
foreach (var oldKey in labelCache.Keys.Except(sectionS.Keys).ToList())
{
if (labelCache.TryGetValue(oldKey, out var oldLabel))
{
panel5.Controls.Remove(oldLabel);
oldLabel.Dispose();
labelCache.Remove(oldKey);
}
}
}
catch (Exception ex)
{
m_Logger.Error($"UpdateUI failed for {status.Key}: {ex.Message}");
}
}
// 生成状态字符串
private string GetStatusString(string key, string value)
{
var sb = new StringBuilder(key).Append(':');
if (value.Contains("PASS-OVER"))
sb.Append("点检结果PASS");
else if (value.Contains("FAIL-OVER"))
sb.Append("点检结果FAIL");
else if (value.Contains("准备中") || value.Contains("下压中"))
sb.Append(value);
else
sb.Append(value.Contains("FAIL") ? value : value.Contains("PASS") ? value : "测试中");
return sb.ToString();
}
// 示例:启动和取消任务
private async Task ExampleUsage()
{
using var cts = new CancellationTokenSource();
try
{
var task = StartTestResultAsync(cts.Token);
await Task.Delay(10000, cts.Token); // 10 秒后取消
cts.Cancel();
await task;
}
catch (OperationCanceledException)
{
m_Logger.Info("Task cancelled");
}
catch (Exception ex)
{
m_Logger.Error($"ExampleUsage failed: {ex.Message}");
}
}
}
每项详细解释
1. 生产者-消费者模式
- 代码:csharp
csharpawait Task.WhenAll(ProduceStatusUpdatesAsync(ct), ConsumeStatusUpdatesAsync(ct));
csharpprivate async Task ProduceStatusUpdatesAsync(CancellationToken ct) { while (!m_Dis && !ct.IsCancellationRequested && ...) { UpdateSectionStatus(); foreach (var section in sectionS) statusQueue.Enqueue((section.Key, section.Value)); await Task.Delay(3000, ct); } }private async Task ConsumeStatusUpdatesAsync(CancellationToken ct) { while (!m_Dis && !ct.IsCancellationRequested && ...) { while (statusQueue.TryDequeue(out var status)) { if (InvokeRequired) await Task.Run(() => Invoke((MethodInvoker)(() => UpdateUI(status))), ct); else UpdateUI(status); } await Task.Delay(100, ct); } } - 实现:
- 生产者:每 3 秒更新 sectionS 状态,推入 ConcurrentQueue。
- 消费者:每 100ms 检查队列,提取状态更新 UI。
- 队列:ConcurrentQueue 作为线程安全的缓冲区,解耦生产和消费。
- 解释:
- 生产者生成状态数据,消费者异步处理,队列确保数据顺序传递。
- Task.WhenAll 并发运行两者,互不阻塞。
- 优点:
- 解耦逻辑,生产者无需等待 UI 渲染。
- 异步处理支持动态数据流,适合实时更新。
- ConcurrentQueue 无锁设计,性能高效。
2. 并row任务调度
- 代码:csharp
Parallel.ForEach(sectionS.Keys, key => { sectionS.AddOrUpdate(key, key, (k, oldValue) => { /* 状态逻辑 */ }); }); - 实现:
- 使用 Parallel.ForEach 并行更新 sectionS 键值对。
- Task.WhenAll 并发运行生产者和消费者任务。
- 解释:
- Parallel.ForEach 将状态更新分配到多核 CPU,加速处理。
- Task.WhenAll 确保生产者和消费者并发执行,优化资源利用。
- 优点:
- 充分利用多核 CPU,适合大数据量(如 sectionS 含数百项)。
- 简化并发管理,代码清晰。
3. 异步编程最佳实践
- 异步优先:
- code:async Task StartTestResultAsync, await Task.Delay.
- 解释:Task 调度到线程池,await 非阻塞,优化响应性。
- 优点:简化线程管理,自动回收资源。
- 取消支持:
- code:CancellationToken ct, await Task.Delay(3000, ct).
- 解释:CancellationToken 允许外部取消,Task.Delay 检查取消状态。
- 优点:优雅终止任务,适合用户交互。
- 异常处理:
- code:csharp
catch (OperationCanceledException) { m_Logger.Info("Cancelled"); throw; } catch (Exception ex) { m_Logger.Error($"Failed: {ex.Message}"); throw; } - 解释:捕获取消和一般异常,记录日志并传播。
- 优点:防止无声失败,便于调试。
- code:csharp
- UI 优化:
- code:csharp
if (!labelCache.TryGetValue(key, out var label)) { /* 创建 Label */ } label.Text = str; label.ForeColor = ...; - 解释:缓存 Label,只更新必要属性,减少重建。
- 优点:降低 UI 线程负担,防止卡顿。
- code:csharp
- 资源管理:
- code:static readonly Font LabelFont, oldLabel.Dispose().
- 解释:静态字体缓存 GDI 资源,Dispose 释放控件。
- 优点:防止句柄泄漏,适合长时间运行。
- 线程安全:
- code:ConcurrentDictionary, ConcurrentQueue, volatile bool m_Dis.
- 解释:Concurrent* 提供无锁线程安全,volatile 确保标志同步。
- 优点:避免数据竞争和死循环。
4. 简洁注释的作用
- code:如 // 线程安全的队列,传递状态更新.
- 解释:注释简洁但覆盖变量用途、方法功能和关键逻辑。
- 优点:提高可读性,方便维护和协作。
优点总结
- 生产者-消费者:ConcurrentQueue 解耦状态更新和 UI 渲染,高效处理数据流。
- 并行调度:Parallel.ForEach 和 Task.WhenAll 加速处理,利用多核 CPU。
- 现代化:Task 和 async/await 简化管理,支持取消。
- UI 优化:缓存控件,批量更新,降低卡顿。
- 资源管理:静态字体、显式释放,防止泄漏。
- 线程安全:Concurrent 结构和 volatile 确保数据一致。
注意事项
- 初始化:确保 _LX, _YSpace, now 正确设置。
- 日志:替换 ILogger 为实际框架(如 NLog)。
- 测试:验证大数据量(sectionS 含 1000 项)和高频更新。
- 监控:使用 Visual Studio 诊断工具检查性能。
- 队列管理:监控 statusQueue 长度,可添加上限(如 1000 项)。
如需进一步优化(如调整队列大小、增加并行度),请提供更多细节(如 sectionS 规模、UI 刷新频率)!
更多推荐
所有评论(0)