以下是对 **动态超时调整**、**复杂模块依赖取消** 以及 **性能调试** 的详细解析,结合 WPF 应用(`MainView.xaml`)、Prism 框架和之前的任务取消机制。
动态超时调整允许根据运行时条件(如用户输入、系统负载或任务优先级)动态修改任务的超时时间。在你的应用中,`DataService` 的监控任务可以根据 `Global.OilTemp` 的值或用户配置调整超时时间。
以下是对动态超时调整、复杂模块依赖取消以及性能调试的详细解析,结合 WPF 应用(`MainView.xaml`)、Prism 框架和之前的任务取消机制。这些高级功能将进一步提升你的应用在处理后台任务(如监控 `Global.OilTemp`)、事件处理(如 `messageBox` 更新)和模块化架构中的灵活性、健壮性和性能。以下内容基于之前的实现,重点探讨动态超时、模块依赖取消和性能调试的最佳实践,提供具体实现和示例代码。
1. 动态超时调整
1.1 概述
动态超时调整允许根据运行时条件(如用户输入、系统负载或任务优先级)动态修改任务的超时时间。在你的应用中,`DataService` 的监控任务可能需要根据 `Global.OilTemp` 的值或用户配置调整超时时间,以优化资源使用或响应紧急情况。
- 目标:
  - 根据条件(如油温超限)动态调整 `CancellationTokenSource` 的超时。
  - 更新 `messageBox` 显示超时状态。
  - 确保任务取消和资源清理。
1.2 实现
步骤 1:DataService 支持动态超时
扩展 `DataService` 以接受动态超时参数,并通过事件通知超时调整。
using Prism.Events;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace PowerCycling.Services
{
    /// <summary>
    /// Periodically samples <c>Global.OilTemp</c> and reports the readings through the
    /// Prism event aggregator (<c>MessageEvent</c> and <c>TimeoutAdjustEvent</c>).
    /// </summary>
    public class DataService
    {
        /// <summary>Oil temperature above which a reading is reported as critical.</summary>
        private const int CriticalOilTemp = 100;

        /// <summary>Pause between two consecutive samples.</summary>
        private static readonly TimeSpan SampleInterval = TimeSpan.FromMilliseconds(500);

        /// <summary>Extra pause applied before a critical reading is reported.</summary>
        private static readonly TimeSpan CriticalReportDelay = TimeSpan.FromMilliseconds(1000);

        private readonly IEventAggregator _eventAggregator;

        /// <summary>Creates the service.</summary>
        /// <param name="eventAggregator">Aggregator used to publish status and timeout events.</param>
        /// <exception cref="ArgumentNullException"><paramref name="eventAggregator"/> is null.</exception>
        public DataService(IEventAggregator eventAggregator)
        {
            _eventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator));
        }

        /// <summary>
        /// Runs the monitoring loop on the thread pool until <paramref name="cancellationToken"/>
        /// is cancelled. Cancellation and unexpected failures are not rethrown; they are
        /// reported as a <c>MessageEvent</c> instead.
        /// </summary>
        /// <param name="cancellationToken">Token that stops the loop (explicit cancel or timeout).</param>
        /// <param name="timeoutAdjuster">Maps an oil temperature to the timeout that should apply.</param>
        /// <returns>A task that completes once monitoring has stopped.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="timeoutAdjuster"/> is null.</exception>
        public async Task MonitorAsync(CancellationToken cancellationToken, Func<double, TimeSpan> timeoutAdjuster)
        {
            if (timeoutAdjuster == null)
            {
                throw new ArgumentNullException(nameof(timeoutAdjuster));
            }

            try
            {
                await Task.Run(() => MonitorLoopAsync(cancellationToken, timeoutAdjuster), cancellationToken);
            }
            catch (OperationCanceledException)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(("Monitoring cancelled or timed out.", 0));
            }
            catch (Exception ex)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(($"Error: {ex.Message}", 0));
            }
        }

        /// <summary>Sampling loop; always ends by throwing <see cref="OperationCanceledException"/>.</summary>
        private async Task MonitorLoopAsync(CancellationToken cancellationToken, Func<double, TimeSpan> timeoutAdjuster)
        {
            // Last timeout announced through TimeoutAdjustEvent; null until the first announcement.
            TimeSpan? announcedTimeout = null;

            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();
                await Task.Delay(SampleInterval, cancellationToken);

                bool critical = Global.OilTemp > CriticalOilTemp;
                if (critical)
                {
                    await Task.Delay(CriticalReportDelay, cancellationToken);
                }

                // Read once so the message and the timeout decision use the same value.
                var oilTemp = Global.OilTemp;
                if (critical)
                {
                    _eventAggregator.GetEvent<MessageEvent>().Publish(($"Critical: OilTemp {oilTemp}°C!", 2));
                }
                else
                {
                    _eventAggregator.GetEvent<MessageEvent>().Publish(($"OilTemp: {oilTemp}°C", 0));
                }

                // Announce a timeout only when it changes. Re-publishing on every critical sample
                // would restart the subscriber's countdown each time, so it could never elapse.
                // Once a timeout has been announced, normal readings are evaluated too, so the
                // longer timeout is restored after the temperature recovers.
                if (critical || announcedTimeout.HasValue)
                {
                    TimeSpan timeout = timeoutAdjuster(oilTemp);
                    if (announcedTimeout != timeout)
                    {
                        announcedTimeout = timeout;
                        _eventAggregator.GetEvent<TimeoutAdjustEvent>().Publish(timeout);
                    }
                }
            }
        }
    }
}
- `timeoutAdjuster`:根据 `Global.OilTemp` 动态计算超时时间。
- `TimeoutAdjustEvent`:新事件,用于通知超时调整。
步骤 2:定义 TimeoutAdjustEvent
using Prism.Events;
namespace PowerCycling.Events
{
    /// <summary>
    /// Prism pub/sub event whose payload is the new timeout to apply.
    /// <c>TimeSpan</c> is fully qualified because this file has no <c>using System;</c>.
    /// </summary>
    public class TimeoutAdjustEvent : PubSubEvent<System.TimeSpan> { }
}
步骤 3:MainViewModel 动态调整超时
`MainViewModel` 监听 `TimeoutAdjustEvent` 并更新 `CancellationTokenSource`。
using Prism.Commands;
using Prism.Events;
using Prism.Mvvm;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace PowerCycling.ViewModels
{
    /// <summary>
    /// View model for the main view. It queues prioritised status messages received from the
    /// event aggregator, shows them through <see cref="Message"/> at a throttled rate, and
    /// reacts to module-cancel and timeout-adjust events.
    /// </summary>
    public class MainViewModel : BindableBase
    {
        private const int CriticalOilTemp = 100;
        private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);
        private static readonly TimeSpan CriticalTimeout = TimeSpan.FromSeconds(10);
        private static readonly TimeSpan DisplayDelay = TimeSpan.FromMilliseconds(500);
        private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(100);

        private readonly IEventAggregator _eventAggregator;

        // NOTE(review): ConcurrentPriorityQueue<T> is not a .NET base-class-library type;
        // presumably it is supplied by the project — confirm.
        private readonly ConcurrentPriorityQueue<(string Message, int Priority)> _messageQueue;

        // A single CTS for the lifetime of the view model. Timeout changes go through
        // CancelAfter, so every task that already holds the token observes the new deadline.
        private readonly CancellationTokenSource _cts = new CancellationTokenSource(DefaultTimeout);
        private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
        private readonly TimeSpan _throttleInterval = TimeSpan.FromSeconds(2);
        private readonly SubscriptionToken _messageSubscriptionToken;
        private readonly SubscriptionToken _cancelSubscriptionToken;
        private readonly SubscriptionToken _timeoutSubscriptionToken;
        private readonly Task _worker;
        private string _message;
        private DateTime _lastProcessedTime; // UTC
        private int _stopped;                // 0 = running, 1 = StopMonitoring already ran

        /// <summary>Text currently shown to the user.</summary>
        public string Message
        {
            get => _message;
            set => SetProperty(ref _message, value);
        }

        /// <summary>Stops monitoring in this view model.</summary>
        public DelegateCommand StopMonitoringCommand { get; }

        /// <summary>Asks every module to cancel, then stops monitoring here.</summary>
        public DelegateCommand CancelAllModulesCommand { get; }

        /// <summary>Subscribes to the events and starts the background worker.</summary>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        public MainViewModel(IEventAggregator eventAggregator, DataService dataService)
        {
            if (dataService == null)
            {
                throw new ArgumentNullException(nameof(dataService));
            }

            _eventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator));

            // Comparer is fully qualified because System.Collections.Generic is not imported here.
            // Reversed comparison: presumably makes larger Priority values dequeue first — confirm
            // against the queue implementation.
            _messageQueue = new ConcurrentPriorityQueue<(string Message, int Priority)>(
                System.Collections.Generic.Comparer<(string Message, int Priority)>.Create(
                    (a, b) => b.Priority.CompareTo(a.Priority)));

            _messageSubscriptionToken = _eventAggregator.GetEvent<MessageEvent>().Subscribe(OnMessageReceived, ThreadOption.BackgroundThread);
            _cancelSubscriptionToken = _eventAggregator.GetEvent<ModuleCancelEvent>().Subscribe(OnModuleCancel, ThreadOption.BackgroundThread);
            _timeoutSubscriptionToken = _eventAggregator.GetEvent<TimeoutAdjustEvent>().Subscribe(OnTimeoutAdjust, ThreadOption.BackgroundThread);

            StopMonitoringCommand = new DelegateCommand(StopMonitoring);
            CancelAllModulesCommand = new DelegateCommand(CancelAllModules);

            _worker = Task.Run(() => ProcessMessageQueueAsync(dataService));
        }

        /// <summary>Timeout policy handed to the data service: shorter while the oil is too hot.</summary>
        private static TimeSpan SelectTimeout(double oilTemp) =>
            oilTemp > CriticalOilTemp ? CriticalTimeout : DefaultTimeout;

        /// <summary>Sets <see cref="Message"/> on the UI dispatcher (directly if no application is running).</summary>
        private Task ShowMessageAsync(string text)
        {
            var dispatcher = System.Windows.Application.Current?.Dispatcher;
            if (dispatcher == null)
            {
                Message = text;
                return Task.CompletedTask;
            }

            return dispatcher.InvokeAsync(() => { Message = text; }).Task;
        }

        /// <summary>Event handler: enqueues an incoming message for throttled display.</summary>
        private void OnMessageReceived((string Message, int Priority) message)
        {
            // Returns void because Subscribe expects an Action<T>.
            try
            {
                _messageQueue.Enqueue(message);
            }
            catch (Exception ex)
            {
                _ = ShowMessageAsync($"Error: {ex.Message}");
            }
        }

        /// <summary>
        /// Background worker: starts oil-temperature monitoring and drains the message queue,
        /// showing at most one message per throttle interval, until the token is cancelled.
        /// </summary>
        private async Task ProcessMessageQueueAsync(DataService dataService)
        {
            CancellationToken token = _cts.Token;
            Task monitorTask = null;
            try
            {
                // Started but not awaited here: MonitorAsync only completes after cancellation,
                // so awaiting it first would keep the queue below from ever being drained.
                monitorTask = dataService.MonitorAsync(token, SelectTimeout);

                while (!token.IsCancellationRequested)
                {
                    if (_messageQueue.TryDequeue(out var message))
                    {
                        TimeSpan sinceLast = DateTime.UtcNow - _lastProcessedTime;
                        if (sinceLast < _throttleInterval)
                        {
                            await Task.Delay(_throttleInterval - sinceLast, token);
                        }

                        await _semaphore.WaitAsync(token);
                        try
                        {
                            await Task.Delay(DisplayDelay, token);
                            await ShowMessageAsync(message.Message);

                            // Stamp after the message is shown so the throttle measures display-to-display time.
                            _lastProcessedTime = DateTime.UtcNow;
                        }
                        finally
                        {
                            _semaphore.Release();
                        }
                    }

                    await Task.Delay(PollInterval, token);
                }
            }
            catch (OperationCanceledException)
            {
                await ShowMessageAsync("Monitoring cancelled or timed out.");
            }
            catch (Exception ex)
            {
                await ShowMessageAsync($"Error: {ex.Message}");
            }

            if (monitorTask != null)
            {
                // MonitorAsync reports its own failures; this only waits for it to wind down.
                await monitorTask;
            }
        }

        /// <summary>Event handler: re-arms the timeout of the running tasks.</summary>
        private void OnTimeoutAdjust(TimeSpan newTimeout)
        {
            try
            {
                // CancelAfter re-arms the existing source. Replacing the source would leave
                // running tasks on the old token, which could then never time out.
                _cts.CancelAfter(newTimeout);
                _ = ShowMessageAsync($"Timeout adjusted to {newTimeout.TotalSeconds} seconds.");
            }
            catch (ObjectDisposedException)
            {
                // Cleanup already ran; nothing left to adjust.
            }
            catch (Exception ex)
            {
                _ = ShowMessageAsync($"Error adjusting timeout: {ex.Message}");
            }
        }

        /// <summary>Event handler: stops monitoring when this view model or all modules are addressed.</summary>
        private void OnModuleCancel(string moduleName)
        {
            if (moduleName == nameof(MainViewModel) || moduleName == "All")
            {
                StopMonitoring();
            }
        }

        /// <summary>Cancels the background work and unsubscribes; later calls are no-ops.</summary>
        private void StopMonitoring()
        {
            if (Interlocked.Exchange(ref _stopped, 1) != 0)
            {
                return;
            }

            CancelAndUnsubscribe();
            _ = ShowMessageAsync("Monitoring stopped.");
        }

        /// <summary>Publishes the global cancel request and stops local monitoring.</summary>
        private void CancelAllModules()
        {
            try
            {
                _eventAggregator.GetEvent<ModuleCancelEvent>().Publish("All");
                StopMonitoring();
            }
            catch (Exception ex)
            {
                _ = ShowMessageAsync($"Error cancelling modules: {ex.Message}");
            }
        }

        /// <summary>Requests cancellation and removes all event subscriptions.</summary>
        private void CancelAndUnsubscribe()
        {
            try
            {
                _cts.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The source was already released by Cleanup.
            }

            _eventAggregator.GetEvent<MessageEvent>().Unsubscribe(_messageSubscriptionToken);
            _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
            _eventAggregator.GetEvent<TimeoutAdjustEvent>().Unsubscribe(_timeoutSubscriptionToken);
        }

        /// <summary>Stops all background work and releases the owned resources.</summary>
        public void Cleanup()
        {
            CancelAndUnsubscribe();

            // Dispose only after the worker has unwound: it may still be waiting on the token
            // or about to release the semaphore.
            _worker.ContinueWith(
                _ =>
                {
                    _cts.Dispose();
                    _semaphore.Dispose();
                },
                TaskScheduler.Default);
        }
    }
}
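- 动态超时:`timeoutAdjuster` 根据油温调整超时(>100°C 时为 10 秒,否则 30 秒)。
- `Interlocked.Exchange`:线程安全地替换 `_cts`,并释放旧的 CTS。注意:替换 `_cts` 不会影响已经拿到旧令牌的任务(旧 CTS 被释放后,其超时也不再触发);若要让运行中的任务响应新的超时,应在同一个 CTS 上调用 `CancelAfter(newTimeout)`。
- `TimeoutAdjustEvent`:通知超时调整并更新 `messageBox`。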
2. 复杂模块依赖取消
2.1 概述
复杂模块依赖取消涉及模块之间的依赖关系(如 `SettingsModule` 依赖 `CoreModule`),需要确保取消操作按依赖顺序传播。例如,`CoreModule` 取消后,`SettingsModule` 也应取消。
- 目标:
  - 定义模块依赖关系,确保依赖模块在主模块取消后停止。
  - 使用 `ModuleCancelEvent` 协调取消顺序。
  - 更新 `messageBox` 显示模块取消状态。
2.2 实现
步骤 1:定义模块依赖
在 `App.xaml.cs` 中明确模块依赖:
using Prism.Ioc;
using Prism.Modularity;
using Prism.Unity;
using System.Windows;
namespace PowerCycling
{
    /// <summary>
    /// Prism application bootstrapper: creates the shell window, registers the modules and
    /// broadcasts a global cancel request when the application exits.
    /// </summary>
    public partial class App : PrismApplication
    {
        /// <summary>Resolves the main window from the container.</summary>
        protected override Window CreateShell()
        {
            return Container.Resolve<MainView>();
        }

        /// <summary>Registers application-level services with the container.</summary>
        protected override void RegisterTypes(IContainerRegistry containerRegistry)
        {
            // Register other services here.
        }

        /// <summary>Adds the modules and declares that SettingsModule depends on CoreModule.</summary>
        protected override void ConfigureModuleCatalog(IModuleCatalog moduleCatalog)
        {
            moduleCatalog.AddModule<CoreModule>();

            // NOTE(review): an OnDemand module is presumably only initialized once something
            // loads it explicitly — confirm that SettingsModule is requested somewhere.
            moduleCatalog.AddModule<SettingsModule>(InitializationMode.OnDemand, dependsOn: new[] { nameof(CoreModule) });
        }

        /// <summary>Asks every module to cancel its background work before shutting down.</summary>
        protected override void OnExit(ExitEventArgs e)
        {
            // Fully qualified because this file does not import Prism.Events.
            var eventAggregator = Container.Resolve<Prism.Events.IEventAggregator>();
            eventAggregator.GetEvent<ModuleCancelEvent>().Publish("All");
            base.OnExit(e);
        }
    }
}
- `dependsOn`:`SettingsModule` 依赖 `CoreModule`,确保加载顺序。
步骤 2:CoreModule
`CoreModule` 管理核心服务并响应取消:
using Prism.Ioc;
using Prism.Modularity;
using Prism.Events;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace PowerCycling.Modules
{
    /// <summary>
    /// Core module: registers <c>DataService</c>, runs oil-temperature monitoring and, when it
    /// is cancelled, tells the dependent <c>SettingsModule</c> to cancel as well.
    /// </summary>
    public class CoreModule : IModule
    {
        private const int CriticalOilTemp = 100;
        private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);
        private static readonly TimeSpan CriticalTimeout = TimeSpan.FromSeconds(10);

        private readonly IEventAggregator _eventAggregator;
        private readonly CancellationTokenSource _cts = new CancellationTokenSource(DefaultTimeout);
        private SubscriptionToken _cancelSubscriptionToken;
        private CancellationTokenRegistration _cancelRegistration;

        /// <summary>Creates the module.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventAggregator"/> is null.</exception>
        public CoreModule(IEventAggregator eventAggregator)
        {
            _eventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator));
        }

        /// <summary>Subscribes to cancel requests and starts monitoring in the background.</summary>
        public void OnInitialized(IContainerProvider containerProvider)
        {
            // Subscribe before starting work so an early cancel request is not missed.
            _cancelSubscriptionToken = _eventAggregator.GetEvent<ModuleCancelEvent>().Subscribe(OnModuleCancel, ThreadOption.BackgroundThread);

            CancellationToken token = _cts.Token;

            // The callback runs exactly once, whether the source is cancelled explicitly or by
            // its own timeout, so dependents are notified on both paths and never twice.
            _cancelRegistration = token.Register(NotifyCancelled);

            var dataService = containerProvider.Resolve<DataService>();
            Task.Run(() => dataService.MonitorAsync(
                token,
                temp => temp > CriticalOilTemp ? CriticalTimeout : DefaultTimeout));
        }

        /// <summary>Registers the services owned by this module.</summary>
        public void RegisterTypes(IContainerRegistry containerRegistry)
        {
            containerRegistry.RegisterSingleton<DataService>();
        }

        /// <summary>Reports the cancellation and forwards it to the dependent module.</summary>
        private void NotifyCancelled()
        {
            _eventAggregator.GetEvent<MessageEvent>().Publish(("CoreModule cancelled.", 0));

            // Tell the dependent module to cancel.
            _eventAggregator.GetEvent<ModuleCancelEvent>().Publish(nameof(SettingsModule));
        }

        /// <summary>Event handler: cancels this module when it, or every module, is addressed.</summary>
        private void OnModuleCancel(string moduleName)
        {
            if (moduleName != nameof(CoreModule) && moduleName != "All")
            {
                return;
            }

            try
            {
                // Cancelling an already-cancelled source is a no-op, so repeated requests do not
                // re-notify the dependents.
                _cts.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The CTS has already been released; ignore.
            }
        }

        /// <summary>Stops the module silently and releases its resources.</summary>
        public void Cleanup()
        {
            try
            {
                // Drop the callback first so tearing down does not publish cancel notifications.
                _cancelRegistration.Dispose();

                if (!_cts.IsCancellationRequested)
                {
                    _cts.Cancel();
                }

                _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
            }
            finally
            {
                _cts.Dispose();
            }
        }
    }
}
- 依赖取消:`CoreModule` 取消后,发布 `ModuleCancelEvent` 通知 `SettingsModule`。
步骤 3:SettingsModule
`SettingsModule` 响应 `CoreModule` 的取消:
using Prism.Ioc;
using Prism.Modularity;
using Prism.Regions;
using Prism.Events;
using System.Threading;
using System.Threading.Tasks;
namespace PowerCycling.Modules
{
    /// <summary>
    /// Settings module: registers the settings view and runs a background task that stops
    /// when the module, or every module, is asked to cancel.
    /// </summary>
    public class SettingsModule : IModule
    {
        private static readonly System.TimeSpan HeartbeatInterval = System.TimeSpan.FromMilliseconds(1000);

        private readonly IRegionManager _regionManager;
        private readonly IEventAggregator _eventAggregator;
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
        private SubscriptionToken _cancelSubscriptionToken;

        /// <summary>Creates the module.</summary>
        /// <exception cref="System.ArgumentNullException">An argument is null.</exception>
        public SettingsModule(IRegionManager regionManager, IEventAggregator eventAggregator)
        {
            _regionManager = regionManager ?? throw new System.ArgumentNullException(nameof(regionManager));
            _eventAggregator = eventAggregator ?? throw new System.ArgumentNullException(nameof(eventAggregator));
        }

        /// <summary>Registers the view, subscribes to cancel requests and starts the background task.</summary>
        public void OnInitialized(IContainerProvider containerProvider)
        {
            _regionManager.RegisterViewWithRegion("ContentRegion", typeof(SettingsView));
            _cancelSubscriptionToken = _eventAggregator.GetEvent<ModuleCancelEvent>().Subscribe(OnModuleCancel, ThreadOption.BackgroundThread);

            CancellationToken token = _cts.Token;
            Task.Run(() => SettingsTaskAsync(token));
        }

        /// <summary>Registers the navigation target of this module.</summary>
        public void RegisterTypes(IContainerRegistry containerRegistry)
        {
            containerRegistry.RegisterForNavigation<SettingsView>();
        }

        /// <summary>
        /// Publishes a heartbeat message once per interval until cancelled, then publishes the
        /// single "cancelled" notification for this module.
        /// </summary>
        private async Task SettingsTaskAsync(CancellationToken cancellationToken)
        {
            try
            {
                while (true)
                {
                    // Always leave through the exception so the notification below is
                    // published on every cancellation path.
                    cancellationToken.ThrowIfCancellationRequested();
                    await Task.Delay(HeartbeatInterval, cancellationToken);
                    _eventAggregator.GetEvent<MessageEvent>().Publish(($"SettingsModule running.", 0));
                }
            }
            catch (System.OperationCanceledException)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(("SettingsModule cancelled.", 0));
            }
        }

        /// <summary>Event handler: cancels the background task when this module, or every module, is addressed.</summary>
        private void OnModuleCancel(string moduleName)
        {
            if (moduleName != nameof(SettingsModule) && moduleName != "All")
            {
                return;
            }

            try
            {
                // The task reports the cancellation itself. Publishing here as well would
                // duplicate the message, and repeat it for every further cancel request.
                _cts.Cancel();
            }
            catch (System.ObjectDisposedException)
            {
                // The CTS has already been released; ignore.
            }
        }

        /// <summary>Stops the background task and releases the module's resources.</summary>
        public void Cleanup()
        {
            try
            {
                if (!_cts.IsCancellationRequested)
                {
                    _cts.Cancel();
                }

                _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
            }
            finally
            {
                _cts.Dispose();
            }
        }
    }
}
- 依赖取消:`SettingsModule` 在接收到 `ModuleCancelEvent`(来自 `CoreModule` 或全局)时取消任务。
3. 性能调试
3.1 概述
性能调试旨在识别和优化任务调度、取消和事件处理的性能瓶颈。你的应用涉及高频事件(如 `Global.OilTemp` 监控)和模块化任务,需要监控执行时间、内存使用和线程池负载。
- 目标:
- 记录任务执行时间和取消延迟。
- 监控线程池使用情况。
- 优化高频事件处理(如限流、优先级队列)。
3.2 实现
步骤 1:添加性能日志
扩展 `MainViewModel` 和 `DataService` 以记录性能指标。
using Prism.Commands;
using Prism.Events;
using Prism.Mvvm;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace PowerCycling.ViewModels
{
    /// <summary>
    /// View model for the main view, instrumented with timing logs. It queues prioritised
    /// status messages received from the event aggregator, shows them through
    /// <see cref="Message"/> at a throttled rate, and reacts to module-cancel and
    /// timeout-adjust events.
    /// </summary>
    public class MainViewModel : BindableBase
    {
        private const int CriticalOilTemp = 100;
        private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);
        private static readonly TimeSpan CriticalTimeout = TimeSpan.FromSeconds(10);
        private static readonly TimeSpan DisplayDelay = TimeSpan.FromMilliseconds(500);
        private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(100);

        private readonly IEventAggregator _eventAggregator;

        // NOTE(review): ConcurrentPriorityQueue<T> is not a .NET base-class-library type;
        // presumably it is supplied by the project — confirm.
        private readonly ConcurrentPriorityQueue<(string Message, int Priority)> _messageQueue;

        // A single CTS for the lifetime of the view model. Timeout changes go through
        // CancelAfter, so every task that already holds the token observes the new deadline.
        private readonly CancellationTokenSource _cts = new CancellationTokenSource(DefaultTimeout);
        private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
        private readonly TimeSpan _throttleInterval = TimeSpan.FromSeconds(2);
        private readonly SubscriptionToken _messageSubscriptionToken;
        private readonly SubscriptionToken _cancelSubscriptionToken;
        private readonly SubscriptionToken _timeoutSubscriptionToken;
        private readonly Task _worker;
        private string _message;
        private DateTime _lastProcessedTime; // UTC
        private int _stopped;                // 0 = running, 1 = StopMonitoring already ran

        /// <summary>Text currently shown to the user.</summary>
        public string Message
        {
            get => _message;
            set => SetProperty(ref _message, value);
        }

        /// <summary>Stops monitoring in this view model.</summary>
        public DelegateCommand StopMonitoringCommand { get; }

        /// <summary>Asks every module to cancel, then stops monitoring here.</summary>
        public DelegateCommand CancelAllModulesCommand { get; }

        /// <summary>Subscribes to the events and starts the background worker.</summary>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        public MainViewModel(IEventAggregator eventAggregator, DataService dataService)
        {
            if (dataService == null)
            {
                throw new ArgumentNullException(nameof(dataService));
            }

            _eventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator));

            // Comparer is fully qualified because System.Collections.Generic is not imported here.
            // Reversed comparison: presumably makes larger Priority values dequeue first — confirm
            // against the queue implementation.
            _messageQueue = new ConcurrentPriorityQueue<(string Message, int Priority)>(
                System.Collections.Generic.Comparer<(string Message, int Priority)>.Create(
                    (a, b) => b.Priority.CompareTo(a.Priority)));

            _messageSubscriptionToken = _eventAggregator.GetEvent<MessageEvent>().Subscribe(OnMessageReceived, ThreadOption.BackgroundThread);
            _cancelSubscriptionToken = _eventAggregator.GetEvent<ModuleCancelEvent>().Subscribe(OnModuleCancel, ThreadOption.BackgroundThread);
            _timeoutSubscriptionToken = _eventAggregator.GetEvent<TimeoutAdjustEvent>().Subscribe(OnTimeoutAdjust, ThreadOption.BackgroundThread);

            StopMonitoringCommand = new DelegateCommand(StopMonitoring);
            CancelAllModulesCommand = new DelegateCommand(CancelAllModules);

            _worker = Task.Run(() => ProcessMessageQueueAsync(dataService));
        }

        /// <summary>Timeout policy handed to the data service: shorter while the oil is too hot.</summary>
        private static TimeSpan SelectTimeout(double oilTemp) =>
            oilTemp > CriticalOilTemp ? CriticalTimeout : DefaultTimeout;

        /// <summary>Sets <see cref="Message"/> on the UI dispatcher (directly if no application is running).</summary>
        private Task ShowMessageAsync(string text)
        {
            var dispatcher = System.Windows.Application.Current?.Dispatcher;
            if (dispatcher == null)
            {
                Message = text;
                return Task.CompletedTask;
            }

            return dispatcher.InvokeAsync(() => { Message = text; }).Task;
        }

        /// <summary>Event handler: enqueues an incoming message for throttled display.</summary>
        private void OnMessageReceived((string Message, int Priority) message)
        {
            // Returns void because Subscribe expects an Action<T>.
            try
            {
                // One stopwatch per operation: handlers run concurrently on pool threads, and a
                // shared Stopwatch would be restarted in the middle of other measurements.
                var timer = Stopwatch.StartNew();
                _messageQueue.Enqueue(message);
                Debug.WriteLine($"Message enqueued: {message.Message}, Time: {timer.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                _ = ShowMessageAsync($"Error: {ex.Message}");
            }
        }

        /// <summary>
        /// Background worker: starts oil-temperature monitoring and drains the message queue,
        /// showing at most one message per throttle interval, until the token is cancelled.
        /// </summary>
        private async Task ProcessMessageQueueAsync(DataService dataService)
        {
            var total = Stopwatch.StartNew();
            CancellationToken token = _cts.Token;
            Task monitorTask = null;
            try
            {
                // Started but not awaited here: MonitorAsync only completes after cancellation,
                // so awaiting it first would keep the queue below from ever being drained.
                monitorTask = dataService.MonitorAsync(token, SelectTimeout);
                Debug.WriteLine($"MonitorAsync started, Time: {total.ElapsedMilliseconds}ms");

                while (!token.IsCancellationRequested)
                {
                    if (_messageQueue.TryDequeue(out var message))
                    {
                        TimeSpan sinceLast = DateTime.UtcNow - _lastProcessedTime;
                        if (sinceLast < _throttleInterval)
                        {
                            await Task.Delay(_throttleInterval - sinceLast, token);
                        }

                        await _semaphore.WaitAsync(token);
                        try
                        {
                            var timer = Stopwatch.StartNew();
                            await Task.Delay(DisplayDelay, token);
                            await ShowMessageAsync(message.Message);

                            // Stamp after the message is shown so the throttle measures display-to-display time.
                            _lastProcessedTime = DateTime.UtcNow;
                            Debug.WriteLine($"Message processed: {message.Message}, Time: {timer.ElapsedMilliseconds}ms");
                        }
                        finally
                        {
                            _semaphore.Release();
                        }
                    }

                    await Task.Delay(PollInterval, token);
                }
            }
            catch (OperationCanceledException)
            {
                await ShowMessageAsync("Monitoring cancelled or timed out.");
                Debug.WriteLine($"Monitoring cancelled, Time: {total.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                await ShowMessageAsync($"Error: {ex.Message}");
                Debug.WriteLine($"Error: {ex.Message}, Time: {total.ElapsedMilliseconds}ms");
            }

            if (monitorTask != null)
            {
                // MonitorAsync reports its own failures; this only waits for it to wind down.
                await monitorTask;
            }
        }

        /// <summary>Event handler: re-arms the timeout of the running tasks.</summary>
        private void OnTimeoutAdjust(TimeSpan newTimeout)
        {
            try
            {
                var timer = Stopwatch.StartNew();

                // CancelAfter re-arms the existing source. Replacing the source would leave
                // running tasks on the old token, which could then never time out.
                _cts.CancelAfter(newTimeout);
                _ = ShowMessageAsync($"Timeout adjusted to {newTimeout.TotalSeconds} seconds.");
                Debug.WriteLine($"Timeout adjusted to {newTimeout.TotalSeconds}s, Time: {timer.ElapsedMilliseconds}ms");
            }
            catch (ObjectDisposedException)
            {
                // Cleanup already ran; nothing left to adjust.
            }
            catch (Exception ex)
            {
                _ = ShowMessageAsync($"Error adjusting timeout: {ex.Message}");
            }
        }

        /// <summary>Event handler: stops monitoring when this view model or all modules are addressed.</summary>
        private void OnModuleCancel(string moduleName)
        {
            if (moduleName == nameof(MainViewModel) || moduleName == "All")
            {
                StopMonitoring();
            }
        }

        /// <summary>Cancels the background work and unsubscribes; later calls are no-ops.</summary>
        private void StopMonitoring()
        {
            if (Interlocked.Exchange(ref _stopped, 1) != 0)
            {
                return;
            }

            var timer = Stopwatch.StartNew();
            CancelAndUnsubscribe();
            _ = ShowMessageAsync("Monitoring stopped.");
            Debug.WriteLine($"Monitoring stopped, Time: {timer.ElapsedMilliseconds}ms");
        }

        /// <summary>Publishes the global cancel request and stops local monitoring.</summary>
        private void CancelAllModules()
        {
            try
            {
                var timer = Stopwatch.StartNew();
                _eventAggregator.GetEvent<ModuleCancelEvent>().Publish("All");
                StopMonitoring();
                Debug.WriteLine($"All modules cancelled, Time: {timer.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                _ = ShowMessageAsync($"Error cancelling modules: {ex.Message}");
            }
        }

        /// <summary>Requests cancellation and removes all event subscriptions.</summary>
        private void CancelAndUnsubscribe()
        {
            try
            {
                _cts.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The source was already released by Cleanup.
            }

            _eventAggregator.GetEvent<MessageEvent>().Unsubscribe(_messageSubscriptionToken);
            _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
            _eventAggregator.GetEvent<TimeoutAdjustEvent>().Unsubscribe(_timeoutSubscriptionToken);
        }

        /// <summary>Stops all background work and releases the owned resources.</summary>
        public void Cleanup()
        {
            CancelAndUnsubscribe();

            // Dispose only after the worker has unwound: it may still be waiting on the token
            // or about to release the semaphore.
            _worker.ContinueWith(
                _ =>
                {
                    _cts.Dispose();
                    _semaphore.Dispose();
                },
                TaskScheduler.Default);
        }
    }
}
步骤 2:DataService 添加性能日志
using Prism.Events;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace PowerCycling.Services
{
    /// <summary>
    /// Periodically samples <c>Global.OilTemp</c> and reports the readings through the
    /// Prism event aggregator (<c>MessageEvent</c> and <c>TimeoutAdjustEvent</c>), writing
    /// timing information to the debug output.
    /// </summary>
    public class DataService
    {
        /// <summary>Oil temperature above which a reading is reported as critical.</summary>
        private const int CriticalOilTemp = 100;

        /// <summary>Pause between two consecutive samples.</summary>
        private static readonly TimeSpan SampleInterval = TimeSpan.FromMilliseconds(500);

        /// <summary>Extra pause applied before a critical reading is reported.</summary>
        private static readonly TimeSpan CriticalReportDelay = TimeSpan.FromMilliseconds(1000);

        private readonly IEventAggregator _eventAggregator;

        /// <summary>Creates the service.</summary>
        /// <param name="eventAggregator">Aggregator used to publish status and timeout events.</param>
        /// <exception cref="ArgumentNullException"><paramref name="eventAggregator"/> is null.</exception>
        public DataService(IEventAggregator eventAggregator)
        {
            _eventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator));
        }

        /// <summary>
        /// Runs the monitoring loop on the thread pool until <paramref name="cancellationToken"/>
        /// is cancelled. Cancellation and unexpected failures are not rethrown; they are
        /// reported as a <c>MessageEvent</c> instead.
        /// </summary>
        /// <param name="cancellationToken">Token that stops the loop (explicit cancel or timeout).</param>
        /// <param name="timeoutAdjuster">Maps an oil temperature to the timeout that should apply.</param>
        /// <returns>A task that completes once monitoring has stopped.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="timeoutAdjuster"/> is null.</exception>
        public async Task MonitorAsync(CancellationToken cancellationToken, Func<double, TimeSpan> timeoutAdjuster)
        {
            if (timeoutAdjuster == null)
            {
                throw new ArgumentNullException(nameof(timeoutAdjuster));
            }

            // One stopwatch per call: the service can be monitored by several callers at once,
            // and a shared Stopwatch field would be restarted under a running measurement.
            var elapsed = Stopwatch.StartNew();
            try
            {
                await Task.Run(() => MonitorLoopAsync(cancellationToken, timeoutAdjuster, elapsed), cancellationToken);
            }
            catch (OperationCanceledException)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(("Monitoring cancelled or timed out.", 0));
                Debug.WriteLine($"Monitoring cancelled, Time: {elapsed.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(($"Error: {ex.Message}", 0));
                Debug.WriteLine($"Error: {ex.Message}, Time: {elapsed.ElapsedMilliseconds}ms");
            }
        }

        /// <summary>Sampling loop; always ends by throwing <see cref="OperationCanceledException"/>.</summary>
        private async Task MonitorLoopAsync(CancellationToken cancellationToken, Func<double, TimeSpan> timeoutAdjuster, Stopwatch elapsed)
        {
            // Last timeout announced through TimeoutAdjustEvent; null until the first announcement.
            TimeSpan? announcedTimeout = null;

            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();
                await Task.Delay(SampleInterval, cancellationToken);

                bool critical = Global.OilTemp > CriticalOilTemp;
                if (critical)
                {
                    await Task.Delay(CriticalReportDelay, cancellationToken);
                }

                // Read once so the message and the timeout decision use the same value.
                var oilTemp = Global.OilTemp;
                if (critical)
                {
                    _eventAggregator.GetEvent<MessageEvent>().Publish(($"Critical: OilTemp {oilTemp}°C!", 2));
                    Debug.WriteLine($"Published critical message, Time: {elapsed.ElapsedMilliseconds}ms");
                }
                else
                {
                    _eventAggregator.GetEvent<MessageEvent>().Publish(($"OilTemp: {oilTemp}°C", 0));
                }

                // Announce a timeout only when it changes. Re-publishing on every critical sample
                // would restart the subscriber's countdown each time, so it could never elapse.
                // Once a timeout has been announced, normal readings are evaluated too, so the
                // longer timeout is restored after the temperature recovers.
                if (critical || announcedTimeout.HasValue)
                {
                    TimeSpan timeout = timeoutAdjuster(oilTemp);
                    if (announcedTimeout != timeout)
                    {
                        announcedTimeout = timeout;
                        _eventAggregator.GetEvent<TimeoutAdjustEvent>().Publish(timeout);
                    }
                }
            }
        }
    }
}
步骤 3:监控线程池
添加线程池监控以优化性能:
public class MainViewModel : BindableBase
{
    /// <summary>
    /// Starts a fire-and-forget task that writes thread-pool availability to the debug output
    /// every five seconds until the view model's token is cancelled.
    /// </summary>
    private void MonitorThreadPool()
    {
        Task.Run(async () =>
        {
            try
            {
                // _cts is re-read on every pass, so a replaced source is picked up.
                while (!_cts.Token.IsCancellationRequested)
                {
                    ThreadPool.GetMaxThreads(out int maxWorker, out int maxCompletion);
                    ThreadPool.GetAvailableThreads(out int availableWorker, out int availableCompletion);
                    Debug.WriteLine($"ThreadPool: Worker={availableWorker}/{maxWorker}, Completion={availableCompletion}/{maxCompletion}");
                    await Task.Delay(5000, _cts.Token);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when monitoring is stopped or times out.
            }
            catch (ObjectDisposedException)
            {
                // The token source was disposed by Cleanup. Without this catch the unobserved
                // task would end faulted.
            }
        });
    }

    public MainViewModel(IEventAggregator eventAggregator, DataService dataService)
    {
        // ... other code ...
        MonitorThreadPool();
    }
}
- 性能日志:使用 `Stopwatch` 记录关键操作的时间(如消息处理、取消、超时调整)。
- 线程池监控:定期检查线程池状态,识别资源瓶颈。
- 优化建议:
  - 如果线程池耗尽,调整 `ThreadPool.SetMaxThreads` 或减少并发任务。
  - 使用限流(`_throttleInterval`)和优先级队列(`ConcurrentPriorityQueue`)减少事件处理开销。
4. 最佳实践总结
4.1 动态超时调整
- 实践:
  - 使用 `Func<double, TimeSpan>` 动态计算超时。
  - 通过 `TimeoutAdjustEvent` 通知调整,线程安全替换 `_cts`。
  - 更新 `messageBox` 显示超时状态。
- 应用:油温 >100°C 时缩短超时到 10 秒,否则保持 30 秒。
4.2 复杂模块依赖取消
- 实践:
  - 定义模块依赖(`SettingsModule` 依赖 `CoreModule`)。
  - 使用 `ModuleCancelEvent` 按依赖顺序取消(`CoreModule` 取消触发 `SettingsModule`)。
  - 每个模块实现 `Cleanup` 释放资源。
- 应用:`CoreModule` 取消后通知 `SettingsModule`,更新 `messageBox`。
4.3 性能调试
- 实践:
  - 使用 `Stopwatch` 记录任务执行时间。
  - 监控线程池状态(`ThreadPool.GetAvailableThreads`)。
  - 优化高频事件处理(限流、优先级队列)。
- 应用:记录消息处理和取消时间,监控线程池负载,优化 `DataService` 和 `MainViewModel`。
4.4 注意事项
- 线程安全:使用 `Interlocked.Exchange` 替换 `_cts`,确保 UI 更新通过 `Dispatcher.InvokeAsync`。
- 资源清理:释放 `_cts`、`_semaphore` 和事件订阅。
- 异常处理:捕获 `OperationCanceledException` 和 `ObjectDisposedException`。
- 性能优化:结合限流、优先级队列和线程池监控。
5. 整合到你的代码
以下是完整整合代码,支持动态超时、模块依赖取消和性能调试:
5.1 MainViewModel.cs
using Prism.Commands;
using Prism.Events;
using Prism.Mvvm;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace PowerCycling.ViewModels
{
    /// <summary>
    /// View model for the main view (integrated version). It queues prioritised status
    /// messages received from the event aggregator, shows them through <see cref="Message"/>
    /// at a throttled rate, reacts to module-cancel and timeout-adjust events, and logs
    /// timings and thread-pool availability to the debug output.
    /// </summary>
    public class MainViewModel : BindableBase
    {
        private const int CriticalOilTemp = 100;
        private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);
        private static readonly TimeSpan CriticalTimeout = TimeSpan.FromSeconds(10);
        private static readonly TimeSpan DisplayDelay = TimeSpan.FromMilliseconds(500);
        private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(100);
        private static readonly TimeSpan ThreadPoolSampleInterval = TimeSpan.FromMilliseconds(5000);

        private readonly IEventAggregator _eventAggregator;

        // NOTE(review): ConcurrentPriorityQueue<T> is not a .NET base-class-library type;
        // presumably it is supplied by the project — confirm.
        private readonly ConcurrentPriorityQueue<(string Message, int Priority)> _messageQueue;

        // A single CTS for the lifetime of the view model. Timeout changes go through
        // CancelAfter, so every task that already holds the token observes the new deadline.
        private readonly CancellationTokenSource _cts = new CancellationTokenSource(DefaultTimeout);
        private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
        private readonly TimeSpan _throttleInterval = TimeSpan.FromSeconds(2);
        private readonly SubscriptionToken _messageSubscriptionToken;
        private readonly SubscriptionToken _cancelSubscriptionToken;
        private readonly SubscriptionToken _timeoutSubscriptionToken;
        private readonly Task _worker;
        private readonly Task _threadPoolMonitor;
        private string _message;
        private DateTime _lastProcessedTime; // UTC
        private int _stopped;                // 0 = running, 1 = StopMonitoring already ran

        /// <summary>Text currently shown to the user.</summary>
        public string Message
        {
            get => _message;
            set => SetProperty(ref _message, value);
        }

        /// <summary>Stops monitoring in this view model.</summary>
        public DelegateCommand StopMonitoringCommand { get; }

        /// <summary>Asks every module to cancel, then stops monitoring here.</summary>
        public DelegateCommand CancelAllModulesCommand { get; }

        /// <summary>Subscribes to the events and starts the background tasks.</summary>
        /// <exception cref="ArgumentNullException">An argument is null.</exception>
        public MainViewModel(IEventAggregator eventAggregator, DataService dataService)
        {
            if (dataService == null)
            {
                throw new ArgumentNullException(nameof(dataService));
            }

            _eventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator));

            // Comparer is fully qualified because System.Collections.Generic is not imported here.
            // Reversed comparison: presumably makes larger Priority values dequeue first — confirm
            // against the queue implementation.
            _messageQueue = new ConcurrentPriorityQueue<(string Message, int Priority)>(
                System.Collections.Generic.Comparer<(string Message, int Priority)>.Create(
                    (a, b) => b.Priority.CompareTo(a.Priority)));

            _messageSubscriptionToken = _eventAggregator.GetEvent<MessageEvent>().Subscribe(OnMessageReceived, ThreadOption.BackgroundThread);
            _cancelSubscriptionToken = _eventAggregator.GetEvent<ModuleCancelEvent>().Subscribe(OnModuleCancel, ThreadOption.BackgroundThread);
            _timeoutSubscriptionToken = _eventAggregator.GetEvent<TimeoutAdjustEvent>().Subscribe(OnTimeoutAdjust, ThreadOption.BackgroundThread);

            StopMonitoringCommand = new DelegateCommand(StopMonitoring);
            CancelAllModulesCommand = new DelegateCommand(CancelAllModules);

            CancellationToken token = _cts.Token;
            _worker = Task.Run(() => ProcessMessageQueueAsync(dataService));
            _threadPoolMonitor = Task.Run(() => MonitorThreadPoolAsync(token));
        }

        /// <summary>Timeout policy handed to the data service: shorter while the oil is too hot.</summary>
        private static TimeSpan SelectTimeout(double oilTemp) =>
            oilTemp > CriticalOilTemp ? CriticalTimeout : DefaultTimeout;

        /// <summary>Sets <see cref="Message"/> on the UI dispatcher (directly if no application is running).</summary>
        private Task ShowMessageAsync(string text)
        {
            var dispatcher = System.Windows.Application.Current?.Dispatcher;
            if (dispatcher == null)
            {
                Message = text;
                return Task.CompletedTask;
            }

            return dispatcher.InvokeAsync(() => { Message = text; }).Task;
        }

        /// <summary>Event handler: enqueues an incoming message for throttled display.</summary>
        private void OnMessageReceived((string Message, int Priority) message)
        {
            // Returns void because Subscribe expects an Action<T>.
            try
            {
                // One stopwatch per operation: handlers run concurrently on pool threads, and a
                // shared Stopwatch would be restarted in the middle of other measurements.
                var timer = Stopwatch.StartNew();
                _messageQueue.Enqueue(message);
                Debug.WriteLine($"Message enqueued: {message.Message}, Time: {timer.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                _ = ShowMessageAsync($"Error: {ex.Message}");
            }
        }

        /// <summary>
        /// Background worker: starts oil-temperature monitoring and drains the message queue,
        /// showing at most one message per throttle interval, until the token is cancelled.
        /// </summary>
        private async Task ProcessMessageQueueAsync(DataService dataService)
        {
            var total = Stopwatch.StartNew();
            CancellationToken token = _cts.Token;
            Task monitorTask = null;
            try
            {
                // Started but not awaited here: MonitorAsync only completes after cancellation,
                // so awaiting it first would keep the queue below from ever being drained.
                monitorTask = dataService.MonitorAsync(token, SelectTimeout);
                Debug.WriteLine($"MonitorAsync started, Time: {total.ElapsedMilliseconds}ms");

                while (!token.IsCancellationRequested)
                {
                    if (_messageQueue.TryDequeue(out var message))
                    {
                        TimeSpan sinceLast = DateTime.UtcNow - _lastProcessedTime;
                        if (sinceLast < _throttleInterval)
                        {
                            await Task.Delay(_throttleInterval - sinceLast, token);
                        }

                        await _semaphore.WaitAsync(token);
                        try
                        {
                            var timer = Stopwatch.StartNew();
                            await Task.Delay(DisplayDelay, token);
                            await ShowMessageAsync(message.Message);

                            // Stamp after the message is shown so the throttle measures display-to-display time.
                            _lastProcessedTime = DateTime.UtcNow;
                            Debug.WriteLine($"Message processed: {message.Message}, Time: {timer.ElapsedMilliseconds}ms");
                        }
                        finally
                        {
                            _semaphore.Release();
                        }
                    }

                    await Task.Delay(PollInterval, token);
                }
            }
            catch (OperationCanceledException)
            {
                await ShowMessageAsync("Monitoring cancelled or timed out.");
                Debug.WriteLine($"Monitoring cancelled, Time: {total.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                await ShowMessageAsync($"Error: {ex.Message}");
                Debug.WriteLine($"Error: {ex.Message}, Time: {total.ElapsedMilliseconds}ms");
            }

            if (monitorTask != null)
            {
                // MonitorAsync reports its own failures; this only waits for it to wind down.
                await monitorTask;
            }
        }

        /// <summary>Event handler: re-arms the timeout of the running tasks.</summary>
        private void OnTimeoutAdjust(TimeSpan newTimeout)
        {
            try
            {
                var timer = Stopwatch.StartNew();

                // CancelAfter re-arms the existing source. Replacing the source would leave
                // running tasks on the old token, which could then never time out.
                _cts.CancelAfter(newTimeout);
                _ = ShowMessageAsync($"Timeout adjusted to {newTimeout.TotalSeconds} seconds.");
                Debug.WriteLine($"Timeout adjusted to {newTimeout.TotalSeconds}s, Time: {timer.ElapsedMilliseconds}ms");
            }
            catch (ObjectDisposedException)
            {
                // Cleanup already ran; nothing left to adjust.
            }
            catch (Exception ex)
            {
                _ = ShowMessageAsync($"Error adjusting timeout: {ex.Message}");
            }
        }

        /// <summary>Event handler: stops monitoring when this view model or all modules are addressed.</summary>
        private void OnModuleCancel(string moduleName)
        {
            if (moduleName == nameof(MainViewModel) || moduleName == "All")
            {
                StopMonitoring();
            }
        }

        /// <summary>Cancels the background work and unsubscribes; later calls are no-ops.</summary>
        private void StopMonitoring()
        {
            if (Interlocked.Exchange(ref _stopped, 1) != 0)
            {
                return;
            }

            var timer = Stopwatch.StartNew();
            CancelAndUnsubscribe();
            _ = ShowMessageAsync("Monitoring stopped.");
            Debug.WriteLine($"Monitoring stopped, Time: {timer.ElapsedMilliseconds}ms");
        }

        /// <summary>Publishes the global cancel request and stops local monitoring.</summary>
        private void CancelAllModules()
        {
            try
            {
                var timer = Stopwatch.StartNew();
                _eventAggregator.GetEvent<ModuleCancelEvent>().Publish("All");
                StopMonitoring();
                Debug.WriteLine($"All modules cancelled, Time: {timer.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                _ = ShowMessageAsync($"Error cancelling modules: {ex.Message}");
            }
        }

        /// <summary>Logs thread-pool availability at a fixed interval until the token is cancelled.</summary>
        private async Task MonitorThreadPoolAsync(CancellationToken token)
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    ThreadPool.GetMaxThreads(out int maxWorker, out int maxCompletion);
                    ThreadPool.GetAvailableThreads(out int availableWorker, out int availableCompletion);
                    Debug.WriteLine($"ThreadPool: Worker={availableWorker}/{maxWorker}, Completion={availableCompletion}/{maxCompletion}");
                    await Task.Delay(ThreadPoolSampleInterval, token);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when monitoring is stopped or times out.
            }
        }

        /// <summary>Requests cancellation and removes all event subscriptions.</summary>
        private void CancelAndUnsubscribe()
        {
            try
            {
                _cts.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The source was already released by Cleanup.
            }

            _eventAggregator.GetEvent<MessageEvent>().Unsubscribe(_messageSubscriptionToken);
            _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
            _eventAggregator.GetEvent<TimeoutAdjustEvent>().Unsubscribe(_timeoutSubscriptionToken);
        }

        /// <summary>Stops all background work and releases the owned resources.</summary>
        public void Cleanup()
        {
            CancelAndUnsubscribe();

            // Dispose only after both background tasks have unwound: they may still be waiting
            // on the token or about to release the semaphore.
            Task.WhenAll(_worker, _threadPoolMonitor).ContinueWith(
                _ =>
                {
                    _cts.Dispose();
                    _semaphore.Dispose();
                },
                TaskScheduler.Default);
        }
    }
}
5.2 DataService.cs
using Prism.Events;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace PowerCycling.Services
{
    /// <summary>
    /// Periodically samples <c>Global.OilTemp</c> and reports the readings through the
    /// Prism event aggregator (<c>MessageEvent</c> and <c>TimeoutAdjustEvent</c>), writing
    /// timing information to the debug output.
    /// </summary>
    public class DataService
    {
        /// <summary>Oil temperature above which a reading is reported as critical.</summary>
        private const int CriticalOilTemp = 100;

        /// <summary>Pause between two consecutive samples.</summary>
        private static readonly TimeSpan SampleInterval = TimeSpan.FromMilliseconds(500);

        /// <summary>Extra pause applied before a critical reading is reported.</summary>
        private static readonly TimeSpan CriticalReportDelay = TimeSpan.FromMilliseconds(1000);

        private readonly IEventAggregator _eventAggregator;

        /// <summary>Creates the service.</summary>
        /// <param name="eventAggregator">Aggregator used to publish status and timeout events.</param>
        /// <exception cref="ArgumentNullException"><paramref name="eventAggregator"/> is null.</exception>
        public DataService(IEventAggregator eventAggregator)
        {
            _eventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator));
        }

        /// <summary>
        /// Runs the monitoring loop on the thread pool until <paramref name="cancellationToken"/>
        /// is cancelled. Cancellation and unexpected failures are not rethrown; they are
        /// reported as a <c>MessageEvent</c> instead.
        /// </summary>
        /// <param name="cancellationToken">Token that stops the loop (explicit cancel or timeout).</param>
        /// <param name="timeoutAdjuster">Maps an oil temperature to the timeout that should apply.</param>
        /// <returns>A task that completes once monitoring has stopped.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="timeoutAdjuster"/> is null.</exception>
        public async Task MonitorAsync(CancellationToken cancellationToken, Func<double, TimeSpan> timeoutAdjuster)
        {
            if (timeoutAdjuster == null)
            {
                throw new ArgumentNullException(nameof(timeoutAdjuster));
            }

            // One stopwatch per call: the service can be monitored by several callers at once,
            // and a shared Stopwatch field would be restarted under a running measurement.
            var elapsed = Stopwatch.StartNew();
            try
            {
                await Task.Run(() => MonitorLoopAsync(cancellationToken, timeoutAdjuster, elapsed), cancellationToken);
            }
            catch (OperationCanceledException)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(("Monitoring cancelled or timed out.", 0));
                Debug.WriteLine($"Monitoring cancelled, Time: {elapsed.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(($"Error: {ex.Message}", 0));
                Debug.WriteLine($"Error: {ex.Message}, Time: {elapsed.ElapsedMilliseconds}ms");
            }
        }

        /// <summary>Sampling loop; always ends by throwing <see cref="OperationCanceledException"/>.</summary>
        private async Task MonitorLoopAsync(CancellationToken cancellationToken, Func<double, TimeSpan> timeoutAdjuster, Stopwatch elapsed)
        {
            // Last timeout announced through TimeoutAdjustEvent; null until the first announcement.
            TimeSpan? announcedTimeout = null;

            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();
                await Task.Delay(SampleInterval, cancellationToken);

                bool critical = Global.OilTemp > CriticalOilTemp;
                if (critical)
                {
                    await Task.Delay(CriticalReportDelay, cancellationToken);
                }

                // Read once so the message and the timeout decision use the same value.
                var oilTemp = Global.OilTemp;
                if (critical)
                {
                    _eventAggregator.GetEvent<MessageEvent>().Publish(($"Critical: OilTemp {oilTemp}°C!", 2));
                    Debug.WriteLine($"Published critical message, Time: {elapsed.ElapsedMilliseconds}ms");
                }
                else
                {
                    _eventAggregator.GetEvent<MessageEvent>().Publish(($"OilTemp: {oilTemp}°C", 0));
                }

                // Announce a timeout only when it changes. Re-publishing on every critical sample
                // would restart the subscriber's countdown each time, so it could never elapse.
                // Once a timeout has been announced, normal readings are evaluated too, so the
                // longer timeout is restored after the temperature recovers.
                if (critical || announcedTimeout.HasValue)
                {
                    TimeSpan timeout = timeoutAdjuster(oilTemp);
                    if (announcedTimeout != timeout)
                    {
                        announcedTimeout = timeout;
                        _eventAggregator.GetEvent<TimeoutAdjustEvent>().Publish(timeout);
                    }
                }
            }
        }
    }
}
6. 总结
- 动态超时调整:
  - 使用 `timeoutAdjuster` 根据 `Global.OilTemp` 动态调整超时。
  - 通过 `TimeoutAdjustEvent` 和 `Interlocked.Exchange` 线程安全替换 `_cts`。
  - 更新 `messageBox` 显示超时状态。
- 复杂模块依赖取消:
  - 定义模块依赖(`SettingsModule` 依赖 `CoreModule`)。
  - `CoreModule` 取消时通过 `ModuleCancelEvent` 通知 `SettingsModule`。
  - 每个模块实现 `Cleanup` 释放资源。
- 性能调试:
  - 使用 `Stopwatch` 记录任务执行和取消时间。
  - 监控线程池状态,优化资源分配。
  - 结合限流和优先级队列减少高频事件开销。
- 在你的代码中的应用:
  - `DataService` 动态调整超时,监控 `Global.OilTemp`。
  - `MainViewModel` 调度优先级消息,响应模块取消和超时调整。
  - `CoreModule` 和 `SettingsModule` 按依赖顺序取消。
  - `messageBox` 显示状态和错误信息。
如果你需要进一步优化(如更复杂的超时策略、模块间任务协调或性能分析工具整合),或者需要调试特定代码,请告诉我!
更多推荐
所有评论(0)