动态超时调整复杂模块依赖取消 以及 性能调试 的详细解析,结合 WPF 应用(MainView.xaml)、Prism 框架和之前的任务取消机制。这些高级功能将进一步提升你的应用在处理后台任务(如监控 Global.OilTemp)、事件处理(如 messageBox 更新)和模块化架构中的灵活性、健壮性和性能。以下内容基于之前的实现,重点探讨动态超时、模块依赖取消和性能调试的最佳实践,提供具体实现和示例代码。


1. 动态超时调整

1.1 概述

动态超时调整允许根据运行时条件(如用户输入、系统负载或任务优先级)动态修改任务的超时时间。在你的应用中,DataService 的监控任务可能需要根据 Global.OilTemp 的值或用户配置调整超时时间,以优化资源使用或响应紧急情况。

  • 目标
    • 根据条件(如油温超限)动态调整 CancellationTokenSource 的超时。
    • 更新 messageBox 显示超时状态。
    • 确保任务取消和资源清理。

1.2 实现

步骤 1:DataService 支持动态超时

扩展 DataService 以接受动态超时参数,并通过事件通知超时调整。

using Prism.Events;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace PowerCycling.Services
{
    public class DataService
    {
        private readonly IEventAggregator _eventAggregator;

        public DataService(IEventAggregator eventAggregator)
        {
            _eventAggregator = eventAggregator;
        }

        public async Task MonitorAsync(CancellationToken cancellationToken, Func<double, TimeSpan> timeoutAdjuster)
        {
            try
            {
                await Task.Run(async () =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        await Task.Delay(500, cancellationToken);
                        if (Global.OilTemp > 100)
                        {
                            await Task.Delay(1000, cancellationToken);
                            _eventAggregator.GetEvent<MessageEvent>().Publish(($"Critical: OilTemp {Global.OilTemp}°C!", 2));
                            // 动态调整超时(例如,油温超限时缩短超时)
                            _eventAggregator.GetEvent<TimeoutAdjustEvent>().Publish(timeoutAdjuster(Global.OilTemp));
                        }
                        else
                        {
                            _eventAggregator.GetEvent<MessageEvent>().Publish(($"OilTemp: {Global.OilTemp}°C", 0));
                        }
                    }
                }, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(("Monitoring cancelled or timed out.", 0));
            }
            catch (Exception ex)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(($"Error: {ex.Message}", 0));
            }
        }
    }
}
  • timeoutAdjuster:根据 Global.OilTemp 动态计算超时时间。
  • TimeoutAdjustEvent:新事件,用于通知超时调整。
步骤 2:定义 TimeoutAdjustEvent
using Prism.Events;

namespace PowerCycling.Events
{
    public class TimeoutAdjustEvent : PubSubEvent<TimeSpan> { }
}
步骤 3:MainViewModel 动态调整超时

MainViewModel 监听 TimeoutAdjustEvent 并更新 CancellationTokenSource

using Prism.Commands;
using Prism.Events;
using Prism.Mvvm;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace PowerCycling.ViewModels
{
    public class MainViewModel : BindableBase
    {
        private readonly IEventAggregator _eventAggregator;
        private string _message;
        private readonly ConcurrentPriorityQueue<(string Message, int Priority)> _messageQueue;
        private CancellationTokenSource _cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
        private readonly TimeSpan _throttleInterval = TimeSpan.FromSeconds(2);
        private DateTime _lastProcessedTime;
        private SubscriptionToken _messageSubscriptionToken;
        private SubscriptionToken _cancelSubscriptionToken;
        private SubscriptionToken _timeoutSubscriptionToken;

        public string Message
        {
            get => _message;
            set => SetProperty(ref _message, value);
        }

        public DelegateCommand StopMonitoringCommand { get; }
        public DelegateCommand CancelAllModulesCommand { get; }

        public MainViewModel(IEventAggregator eventAggregator, DataService dataService)
        {
            _eventAggregator = eventAggregator;
            _messageQueue = new ConcurrentPriorityQueue<(string Message, int Priority)>(Comparer<(string Message, int Priority)>.Create((a, b) => b.Priority.CompareTo(a.Priority)));
            _messageSubscriptionToken = _eventAggregator.GetEvent<MessageEvent>().Subscribe(OnMessageReceivedAsync, ThreadOption.BackgroundThread);
            _cancelSubscriptionToken = _eventAggregator.GetEvent<ModuleCancelEvent>().Subscribe(OnModuleCancel, ThreadOption.BackgroundThread);
            _timeoutSubscriptionToken = _eventAggregator.GetEvent<TimeoutAdjustEvent>().Subscribe(OnTimeoutAdjust, ThreadOption.BackgroundThread);
            StopMonitoringCommand = new DelegateCommand(StopMonitoring);
            CancelAllModulesCommand = new DelegateCommand(CancelAllModules);

            Task.Factory.StartNew(() => ProcessMessageQueueAsync(dataService), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        private async Task OnMessageReceivedAsync((string Message, int Priority) message)
        {
            try
            {
                _messageQueue.Enqueue(message);
            }
            catch (Exception ex)
            {
                await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                {
                    Message = $"Error: {ex.Message}";
                });
            }
        }

        private async Task ProcessMessageQueueAsync(DataService dataService)
        {
            try
            {
                await dataService.MonitorAsync(_cts.Token, temp => temp > 100 ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(30));

                while (!_cts.Token.IsCancellationRequested)
                {
                    if (_messageQueue.TryDequeue(out var message))
                    {
                        var now = DateTime.Now;
                        if (now - _lastProcessedTime < _throttleInterval)
                        {
                            await Task.Delay(_throttleInterval - (now - _lastProcessedTime), _cts.Token);
                        }

                        await _semaphore.WaitAsync(_cts.Token);
                        try
                        {
                            await Task.Delay(500, _cts.Token);
                            await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                            {
                                Message = message.Message;
                            });
                            _lastProcessedTime = now;
                        }
                        finally
                        {
                            _semaphore.Release();
                        }
                    }
                    await Task.Delay(100, _cts.Token);
                }
            }
            catch (OperationCanceledException)
            {
                await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                {
                    Message = "Monitoring cancelled or timed out.";
                });
            }
            catch (Exception ex)
            {
                await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                {
                    Message = $"Error: {ex.Message}";
                });
            }
        }

        private void OnTimeoutAdjust(TimeSpan newTimeout)
        {
            try
            {
                // 创建新的 CTS 并替换旧的
                var newCts = new CancellationTokenSource(newTimeout);
                Interlocked.Exchange(ref _cts, newCts)?.Dispose();
                Message = $"Timeout adjusted to {newTimeout.TotalSeconds} seconds.";
            }
            catch (Exception ex)
            {
                Message = $"Error adjusting timeout: {ex.Message}";
            }
        }

        private void OnModuleCancel(string moduleName)
        {
            if (moduleName == nameof(MainViewModel) || moduleName == "All")
            {
                StopMonitoring();
            }
        }

        private void StopMonitoring()
        {
            try
            {
                _cts.Cancel();
                _eventAggregator.GetEvent<MessageEvent>().Unsubscribe(_messageSubscriptionToken);
                _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
                _eventAggregator.GetEvent<TimeoutAdjustEvent>().Unsubscribe(_timeoutSubscriptionToken);
                Message = "Monitoring stopped.";
            }
            catch (ObjectDisposedException)
            {
                // CTS 已释放,忽略
            }
        }

        private void CancelAllModules()
        {
            try
            {
                _eventAggregator.GetEvent<ModuleCancelEvent>().Publish("All");
                StopMonitoring();
            }
            catch (Exception ex)
            {
                Message = $"Error cancelling modules: {ex.Message}";
            }
        }

        public void Cleanup()
        {
            try
            {
                if (!_cts.IsCancellationRequested)
                    _cts.Cancel();
                _eventAggregator.GetEvent<MessageEvent>().Unsubscribe(_messageSubscriptionToken);
                _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
                _eventAggregator.GetEvent<TimeoutAdjustEvent>().Unsubscribe(_timeoutSubscriptionToken);
            }
            finally
            {
                _cts.Dispose();
                _semaphore.Dispose();
            }
        }
    }
}
  • 动态超时timeoutAdjuster 根据油温调整超时(>100°C 时为 10 秒,否则 30 秒)。
  • Interlocked.Exchange:线程安全地替换 _cts,并释放旧的 CTS。
  • TimeoutAdjustEvent:通知超时调整并更新 messageBox

2. 复杂模块依赖取消

2.1 概述

复杂模块依赖取消涉及模块之间的依赖关系(如 SettingsModule 依赖 CoreModule),需要确保取消操作按依赖顺序传播。例如,CoreModule 取消后,SettingsModule 也应取消。

  • 目标
    • 定义模块依赖关系,确保依赖模块在主模块取消后停止。
    • 使用 ModuleCancelEvent 协调取消顺序。
    • 更新 messageBox 显示模块取消状态。

2.2 实现

步骤 1:定义模块依赖

App.xaml.cs 中明确模块依赖:

using Prism.Ioc;
using Prism.Modularity;
using Prism.Unity;
using System.Windows;

namespace PowerCycling
{
    public partial class App : PrismApplication
    {
        protected override Window CreateShell()
        {
            return Container.Resolve<MainView>();
        }

        protected override void RegisterTypes(IContainerRegistry containerRegistry)
        {
            // 注册其他服务
        }

        protected override void ConfigureModuleCatalog(IModuleCatalog moduleCatalog)
        {
            moduleCatalog.AddModule<CoreModule>();
            moduleCatalog.AddModule<SettingsModule>(InitializationMode.OnDemand, dependsOn: new[] { nameof(CoreModule) });
        }

        protected override void OnExit(ExitEventArgs e)
        {
            var eventAggregator = Container.Resolve<IEventAggregator>();
            eventAggregator.GetEvent<ModuleCancelEvent>().Publish("All");
            base.OnExit(e);
        }
    }
}
  • dependsOnSettingsModule 依赖 CoreModule,确保加载顺序。
步骤 2:CoreModule

CoreModule 管理核心服务并响应取消:

using Prism.Ioc;
using Prism.Modularity;
using Prism.Events;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace PowerCycling.Modules
{
    public class CoreModule : IModule
    {
        private readonly IEventAggregator _eventAggregator;
        private readonly CancellationTokenSource _cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        private SubscriptionToken _cancelSubscriptionToken;

        public CoreModule(IEventAggregator eventAggregator)
        {
            _eventAggregator = eventAggregator;
        }

        public void OnInitialized(IContainerProvider containerProvider)
        {
            var dataService = containerProvider.Resolve<DataService>();
            Task.Run(() => dataService.MonitorAsync(_cts.Token, temp => temp > 100 ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(30)));
            _cancelSubscriptionToken = _eventAggregator.GetEvent<ModuleCancelEvent>().Subscribe(OnModuleCancel, ThreadOption.BackgroundThread);
        }

        public void RegisterTypes(IContainerRegistry containerRegistry)
        {
            containerRegistry.RegisterSingleton<DataService>();
        }

        private void OnModuleCancel(string moduleName)
        {
            if (moduleName == nameof(CoreModule) || moduleName == "All")
            {
                try
                {
                    _cts.Cancel();
                    _eventAggregator.GetEvent<MessageEvent>().Publish(("CoreModule cancelled.", 0));
                    // 通知依赖模块取消
                    _eventAggregator.GetEvent<ModuleCancelEvent>().Publish(nameof(SettingsModule));
                }
                catch (ObjectDisposedException)
                {
                    // CTS 已释放,忽略
                }
            }
        }

        public void Cleanup()
        {
            try
            {
                if (!_cts.IsCancellationRequested)
                    _cts.Cancel();
                _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
            }
            finally
            {
                _cts.Dispose();
            }
        }
    }
}
  • 依赖取消CoreModule 取消后,发布 ModuleCancelEvent 通知 SettingsModule
步骤 3:SettingsModule

SettingsModule 响应 CoreModule 的取消:

using Prism.Ioc;
using Prism.Modularity;
using Prism.Regions;
using Prism.Events;
using System.Threading;
using System.Threading.Tasks;

namespace PowerCycling.Modules
{
    public class SettingsModule : IModule
    {
        private readonly IRegionManager _regionManager;
        private readonly IEventAggregator _eventAggregator;
        private readonly CancellationTokenSource _cts = new CancellationTokenSource();
        private SubscriptionToken _cancelSubscriptionToken;

        public SettingsModule(IRegionManager regionManager, IEventAggregator eventAggregator)
        {
            _regionManager = regionManager;
            _eventAggregator = eventAggregator;
        }

        public void OnInitialized(IContainerProvider containerProvider)
        {
            _regionManager.RegisterViewWithRegion("ContentRegion", typeof(SettingsView));
            _cancelSubscriptionToken = _eventAggregator.GetEvent<ModuleCancelEvent>().Subscribe(OnModuleCancel, ThreadOption.BackgroundThread);
            Task.Run(() => SettingsTaskAsync(_cts.Token));
        }

        public void RegisterTypes(IContainerRegistry containerRegistry)
        {
            containerRegistry.RegisterForNavigation<SettingsView>();
        }

        private async Task SettingsTaskAsync(CancellationToken cancellationToken)
        {
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    await Task.Delay(1000, cancellationToken);
                    _eventAggregator.GetEvent<MessageEvent>().Publish(($"SettingsModule running.", 0));
                }
            }
            catch (OperationCanceledException)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(("SettingsModule cancelled.", 0));
            }
        }

        private void OnModuleCancel(string moduleName)
        {
            if (moduleName == nameof(SettingsModule) || moduleName == "All")
            {
                try
                {
                    _cts.Cancel();
                    _eventAggregator.GetEvent<MessageEvent>().Publish(("SettingsModule cancelled.", 0));
                }
                catch (ObjectDisposedException)
                {
                    // CTS 已释放,忽略
                }
            }
        }

        public void Cleanup()
        {
            try
            {
                if (!_cts.IsCancellationRequested)
                    _cts.Cancel();
                _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
            }
            finally
            {
                _cts.Dispose();
            }
        }
    }
}
  • 依赖取消SettingsModule 在接收到 ModuleCancelEvent(来自 CoreModule 或全局)时取消任务。

3. 性能调试

3.1 概述

性能调试旨在识别和优化任务调度、取消和事件处理的性能瓶颈。你的应用涉及高频事件(如 Global.OilTemp 监控)和模块化任务,需要监控执行时间、内存使用和线程池负载。

  • 目标
    • 记录任务执行时间和取消延迟。
    • 监控线程池使用情况。
    • 优化高频事件处理(如限流、优先级队列)。

3.2 实现

步骤 1:添加性能日志

扩展 MainViewModelDataService 以记录性能指标。

using Prism.Commands;
using Prism.Events;
using Prism.Mvvm;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace PowerCycling.ViewModels
{
    public class MainViewModel : BindableBase
    {
        private readonly IEventAggregator _eventAggregator;
        private string _message;
        private readonly ConcurrentPriorityQueue<(string Message, int Priority)> _messageQueue;
        private CancellationTokenSource _cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
        private readonly TimeSpan _throttleInterval = TimeSpan.FromSeconds(2);
        private DateTime _lastProcessedTime;
        private SubscriptionToken _messageSubscriptionToken;
        private SubscriptionToken _cancelSubscriptionToken;
        private SubscriptionToken _timeoutSubscriptionToken;
        private readonly Stopwatch _stopwatch = new Stopwatch();

        public string Message
        {
            get => _message;
            set => SetProperty(ref _message, value);
        }

        public DelegateCommand StopMonitoringCommand { get; }
        public DelegateCommand CancelAllModulesCommand { get; }

        public MainViewModel(IEventAggregator eventAggregator, DataService dataService)
        {
            _eventAggregator = eventAggregator;
            _messageQueue = new ConcurrentPriorityQueue<(string Message, int Priority)>(Comparer<(string Message, int Priority)>.Create((a, b) => b.Priority.CompareTo(a.Priority)));
            _messageSubscriptionToken = _eventAggregator.GetEvent<MessageEvent>().Subscribe(OnMessageReceivedAsync, ThreadOption.BackgroundThread);
            _cancelSubscriptionToken = _eventAggregator.GetEvent<ModuleCancelEvent>().Subscribe(OnModuleCancel, ThreadOption.BackgroundThread);
            _timeoutSubscriptionToken = _eventAggregator.GetEvent<TimeoutAdjustEvent>().Subscribe(OnTimeoutAdjust, ThreadOption.BackgroundThread);
            StopMonitoringCommand = new DelegateCommand(StopMonitoring);
            CancelAllModulesCommand = new DelegateCommand(CancelAllModules);

            Task.Factory.StartNew(() => ProcessMessageQueueAsync(dataService), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        private async Task OnMessageReceivedAsync((string Message, int Priority) message)
        {
            try
            {
                _stopwatch.Restart();
                _messageQueue.Enqueue(message);
                Debug.WriteLine($"Message enqueued: {message.Message}, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                {
                    Message = $"Error: {ex.Message}";
                });
            }
        }

        private async Task ProcessMessageQueueAsync(DataService dataService)
        {
            try
            {
                _stopwatch.Restart();
                await dataService.MonitorAsync(_cts.Token, temp => temp > 100 ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(30));
                Debug.WriteLine($"MonitorAsync started, Time: {_stopwatch.ElapsedMilliseconds}ms");

                while (!_cts.Token.IsCancellationRequested)
                {
                    if (_messageQueue.TryDequeue(out var message))
                    {
                        var now = DateTime.Now;
                        if (now - _lastProcessedTime < _throttleInterval)
                        {
                            await Task.Delay(_throttleInterval - (now - _lastProcessedTime), _cts.Token);
                        }

                        await _semaphore.WaitAsync(_cts.Token);
                        try
                        {
                            _stopwatch.Restart();
                            await Task.Delay(500, _cts.Token);
                            await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                            {
                                Message = message.Message;
                            });
                            _lastProcessedTime = now;
                            Debug.WriteLine($"Message processed: {message.Message}, Time: {_stopwatch.ElapsedMilliseconds}ms");
                        }
                        finally
                        {
                            _semaphore.Release();
                        }
                    }
                    await Task.Delay(100, _cts.Token);
                }
            }
            catch (OperationCanceledException)
            {
                await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                {
                    Message = "Monitoring cancelled or timed out.";
                });
                Debug.WriteLine($"Monitoring cancelled, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                {
                    Message = $"Error: {ex.Message}";
                });
                Debug.WriteLine($"Error: {ex.Message}, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
        }

        private void OnTimeoutAdjust(TimeSpan newTimeout)
        {
            try
            {
                _stopwatch.Restart();
                var newCts = new CancellationTokenSource(newTimeout);
                Interlocked.Exchange(ref _cts, newCts)?.Dispose();
                Message = $"Timeout adjusted to {newTimeout.TotalSeconds} seconds.";
                Debug.WriteLine($"Timeout adjusted to {newTimeout.TotalSeconds}s, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                Message = $"Error adjusting timeout: {ex.Message}";
            }
        }

        private void OnModuleCancel(string moduleName)
        {
            if (moduleName == nameof(MainViewModel) || moduleName == "All")
            {
                StopMonitoring();
            }
        }

        private void StopMonitoring()
        {
            try
            {
                _stopwatch.Restart();
                _cts.Cancel();
                _eventAggregator.GetEvent<MessageEvent>().Unsubscribe(_messageSubscriptionToken);
                _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
                _eventAggregator.GetEvent<TimeoutAdjustEvent>().Unsubscribe(_timeoutSubscriptionToken);
                Message = "Monitoring stopped.";
                Debug.WriteLine($"Monitoring stopped, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (ObjectDisposedException)
            {
                // CTS 已释放,忽略
            }
        }

        private void CancelAllModules()
        {
            try
            {
                _stopwatch.Restart();
                _eventAggregator.GetEvent<ModuleCancelEvent>().Publish("All");
                StopMonitoring();
                Debug.WriteLine($"All modules cancelled, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                Message = $"Error cancelling modules: {ex.Message}";
            }
        }

        public void Cleanup()
        {
            try
            {
                if (!_cts.IsCancellationRequested)
                    _cts.Cancel();
                _eventAggregator.GetEvent<MessageEvent>().Unsubscribe(_messageSubscriptionToken);
                _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
                _eventAggregator.GetEvent<TimeoutAdjustEvent>().Unsubscribe(_timeoutSubscriptionToken);
            }
            finally
            {
                _cts.Dispose();
                _semaphore.Dispose();
            }
        }
    }
}
步骤 2:DataService 添加性能日志
using Prism.Events;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace PowerCycling.Services
{
    public class DataService
    {
        private readonly IEventAggregator _eventAggregator;
        private readonly Stopwatch _stopwatch = new Stopwatch();

        public DataService(IEventAggregator eventAggregator)
        {
            _eventAggregator = eventAggregator;
        }

        public async Task MonitorAsync(CancellationToken cancellationToken, Func<double, TimeSpan> timeoutAdjuster)
        {
            try
            {
                _stopwatch.Restart();
                await Task.Run(async () =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        await Task.Delay(500, cancellationToken);
                        if (Global.OilTemp > 100)
                        {
                            await Task.Delay(1000, cancellationToken);
                            _eventAggregator.GetEvent<MessageEvent>().Publish(($"Critical: OilTemp {Global.OilTemp}°C!", 2));
                            _eventAggregator.GetEvent<TimeoutAdjustEvent>().Publish(timeoutAdjuster(Global.OilTemp));
                            Debug.WriteLine($"Published critical message, Time: {_stopwatch.ElapsedMilliseconds}ms");
                        }
                        else
                        {
                            _eventAggregator.GetEvent<MessageEvent>().Publish(($"OilTemp: {Global.OilTemp}°C", 0));
                        }
                    }
                }, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(("Monitoring cancelled or timed out.", 0));
                Debug.WriteLine($"Monitoring cancelled, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(($"Error: {ex.Message}", 0));
                Debug.WriteLine($"Error: {ex.Message}, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
        }
    }
}
步骤 3:监控线程池

添加线程池监控以优化性能:

public class MainViewModel : BindableBase
{
    private void MonitorThreadPool()
    {
        Task.Run(async () =>
        {
            while (!_cts.Token.IsCancellationRequested)
            {
                ThreadPool.GetMaxThreads(out int maxWorker, out int maxCompletion);
                ThreadPool.GetAvailableThreads(out int availableWorker, out int availableCompletion);
                Debug.WriteLine($"ThreadPool: Worker={availableWorker}/{maxWorker}, Completion={availableCompletion}/{maxCompletion}");
                await Task.Delay(5000, _cts.Token);
            }
        });
    }

    public MainViewModel(IEventAggregator eventAggregator, DataService dataService)
    {
        // ... 其他代码 ...
        MonitorThreadPool();
    }
}
  • 性能日志:使用 Stopwatch 记录关键操作的时间(如消息处理、取消、超时调整)。
  • 线程池监控:定期检查线程池状态,识别资源瓶颈。
  • 优化建议
    • 如果线程池耗尽,调整 ThreadPool.SetMaxThreads 或减少并发任务。
    • 使用限流(_throttleInterval)和优先级队列(ConcurrentPriorityQueue)减少事件处理开销。

4. 最佳实践总结

4.1 动态超时调整

  • 实践
    • 使用 Func<double, TimeSpan> 动态计算超时。
    • 通过 TimeoutAdjustEvent 通知调整,线程安全替换 _cts
    • 更新 messageBox 显示超时状态。
  • 应用:油温 >100°C 时缩短超时到 10 秒,否则保持 30 秒。

4.2 复杂模块依赖取消

  • 实践
    • 定义模块依赖(SettingsModule 依赖 CoreModule)。
    • 使用 ModuleCancelEvent 按依赖顺序取消(CoreModule 取消触发 SettingsModule)。
    • 每个模块实现 Cleanup 释放资源。
  • 应用CoreModule 取消后通知 SettingsModule,更新 messageBox

4.3 性能调试

  • 实践
    • 使用 Stopwatch 记录任务执行时间。
    • 监控线程池状态(ThreadPool.GetAvailableThreads)。
    • 优化高频事件处理(限流、优先级队列)。
  • 应用:记录消息处理和取消时间,监控线程池负载,优化 DataServiceMainViewModel

4.4 注意事项

  • 线程安全:使用 Interlocked.Exchange 替换 _cts,确保 UI 更新通过 Dispatcher.InvokeAsync
  • 资源清理:释放 _cts_semaphore 和事件订阅。
  • 异常处理:捕获 OperationCanceledExceptionObjectDisposedException
  • 性能优化:结合限流、优先级队列和线程池监控。

5. 整合到你的代码

以下是完整整合代码,支持动态超时、模块依赖取消和性能调试:

5.1 MainViewModel.cs

using Prism.Commands;
using Prism.Events;
using Prism.Mvvm;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace PowerCycling.ViewModels
{
    public class MainViewModel : BindableBase
    {
        private readonly IEventAggregator _eventAggregator;
        private string _message;
        private readonly ConcurrentPriorityQueue<(string Message, int Priority)> _messageQueue;
        private CancellationTokenSource _cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
        private readonly TimeSpan _throttleInterval = TimeSpan.FromSeconds(2);
        private DateTime _lastProcessedTime;
        private SubscriptionToken _messageSubscriptionToken;
        private SubscriptionToken _cancelSubscriptionToken;
        private SubscriptionToken _timeoutSubscriptionToken;
        private readonly Stopwatch _stopwatch = new Stopwatch();

        public string Message
        {
            get => _message;
            set => SetProperty(ref _message, value);
        }

        public DelegateCommand StopMonitoringCommand { get; }
        public DelegateCommand CancelAllModulesCommand { get; }

        public MainViewModel(IEventAggregator eventAggregator, DataService dataService)
        {
            _eventAggregator = eventAggregator;
            _messageQueue = new ConcurrentPriorityQueue<(string Message, int Priority)>(Comparer<(string Message, int Priority)>.Create((a, b) => b.Priority.CompareTo(a.Priority)));
            _messageSubscriptionToken = _eventAggregator.GetEvent<MessageEvent>().Subscribe(OnMessageReceivedAsync, ThreadOption.BackgroundThread);
            _cancelSubscriptionToken = _eventAggregator.GetEvent<ModuleCancelEvent>().Subscribe(OnModuleCancel, ThreadOption.BackgroundThread);
            _timeoutSubscriptionToken = _eventAggregator.GetEvent<TimeoutAdjustEvent>().Subscribe(OnTimeoutAdjust, ThreadOption.BackgroundThread);
            StopMonitoringCommand = new DelegateCommand(StopMonitoring);
            CancelAllModulesCommand = new DelegateCommand(CancelAllModules);

            Task.Factory.StartNew(() => ProcessMessageQueueAsync(dataService), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            MonitorThreadPool();
        }

        private async Task OnMessageReceivedAsync((string Message, int Priority) message)
        {
            try
            {
                _stopwatch.Restart();
                _messageQueue.Enqueue(message);
                Debug.WriteLine($"Message enqueued: {message.Message}, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                {
                    Message = $"Error: {ex.Message}";
                });
            }
        }

        private async Task ProcessMessageQueueAsync(DataService dataService)
        {
            try
            {
                _stopwatch.Restart();
                await dataService.MonitorAsync(_cts.Token, temp => temp > 100 ? TimeSpan.FromSeconds(10) : TimeSpan.FromSeconds(30));
                Debug.WriteLine($"MonitorAsync started, Time: {_stopwatch.ElapsedMilliseconds}ms");

                while (!_cts.Token.IsCancellationRequested)
                {
                    if (_messageQueue.TryDequeue(out var message))
                    {
                        var now = DateTime.Now;
                        if (now - _lastProcessedTime < _throttleInterval)
                        {
                            await Task.Delay(_throttleInterval - (now - _lastProcessedTime), _cts.Token);
                        }

                        await _semaphore.WaitAsync(_cts.Token);
                        try
                        {
                            _stopwatch.Restart();
                            await Task.Delay(500, _cts.Token);
                            await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                            {
                                Message = message.Message;
                            });
                            _lastProcessedTime = now;
                            Debug.WriteLine($"Message processed: {message.Message}, Time: {_stopwatch.ElapsedMilliseconds}ms");
                        }
                        finally
                        {
                            _semaphore.Release();
                        }
                    }
                    await Task.Delay(100, _cts.Token);
                }
            }
            catch (OperationCanceledException)
            {
                await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                {
                    Message = "Monitoring cancelled or timed out.";
                });
                Debug.WriteLine($"Monitoring cancelled, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                await System.Windows.Application.Current.Dispatcher.InvokeAsync(() =>
                {
                    Message = $"Error: {ex.Message}";
                });
                Debug.WriteLine($"Error: {ex.Message}, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
        }

        private void OnTimeoutAdjust(TimeSpan newTimeout)
        {
            try
            {
                _stopwatch.Restart();
                var newCts = new CancellationTokenSource(newTimeout);
                Interlocked.Exchange(ref _cts, newCts)?.Dispose();
                Message = $"Timeout adjusted to {newTimeout.TotalSeconds} seconds.";
                Debug.WriteLine($"Timeout adjusted to {newTimeout.TotalSeconds}s, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                Message = $"Error adjusting timeout: {ex.Message}";
            }
        }

        private void OnModuleCancel(string moduleName)
        {
            if (moduleName == nameof(MainViewModel) || moduleName == "All")
            {
                StopMonitoring();
            }
        }

        private void StopMonitoring()
        {
            try
            {
                _stopwatch.Restart();
                _cts.Cancel();
                _eventAggregator.GetEvent<MessageEvent>().Unsubscribe(_messageSubscriptionToken);
                _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
                _eventAggregator.GetEvent<TimeoutAdjustEvent>().Unsubscribe(_timeoutSubscriptionToken);
                Message = "Monitoring stopped.";
                Debug.WriteLine($"Monitoring stopped, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (ObjectDisposedException)
            {
                // CTS 已释放,忽略
            }
        }

        private void CancelAllModules()
        {
            try
            {
                _stopwatch.Restart();
                _eventAggregator.GetEvent<ModuleCancelEvent>().Publish("All");
                StopMonitoring();
                Debug.WriteLine($"All modules cancelled, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                Message = $"Error cancelling modules: {ex.Message}";
            }
        }

        private void MonitorThreadPool()
        {
            Task.Run(async () =>
            {
                while (!_cts.Token.IsCancellationRequested)
                {
                    ThreadPool.GetMaxThreads(out int maxWorker, out int maxCompletion);
                    ThreadPool.GetAvailableThreads(out int availableWorker, out int availableCompletion);
                    Debug.WriteLine($"ThreadPool: Worker={availableWorker}/{maxWorker}, Completion={availableCompletion}/{maxCompletion}");
                    await Task.Delay(5000, _cts.Token);
                }
            });
        }

        public void Cleanup()
        {
            try
            {
                if (!_cts.IsCancellationRequested)
                    _cts.Cancel();
                _eventAggregator.GetEvent<MessageEvent>().Unsubscribe(_messageSubscriptionToken);
                _eventAggregator.GetEvent<ModuleCancelEvent>().Unsubscribe(_cancelSubscriptionToken);
                _eventAggregator.GetEvent<TimeoutAdjustEvent>().Unsubscribe(_timeoutSubscriptionToken);
            }
            finally
            {
                _cts.Dispose();
                _semaphore.Dispose();
            }
        }
    }
}

5.2 DataService.cs

using Prism.Events;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace PowerCycling.Services
{
    public class DataService
    {
        private readonly IEventAggregator _eventAggregator;
        private readonly Stopwatch _stopwatch = new Stopwatch();

        public DataService(IEventAggregator eventAggregator)
        {
            _eventAggregator = eventAggregator;
        }

        public async Task MonitorAsync(CancellationToken cancellationToken, Func<double, TimeSpan> timeoutAdjuster)
        {
            try
            {
                _stopwatch.Restart();
                await Task.Run(async () =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        await Task.Delay(500, cancellationToken);
                        if (Global.OilTemp > 100)
                        {
                            await Task.Delay(1000, cancellationToken);
                            _eventAggregator.GetEvent<MessageEvent>().Publish(($"Critical: OilTemp {Global.OilTemp}°C!", 2));
                            _eventAggregator.GetEvent<TimeoutAdjustEvent>().Publish(timeoutAdjuster(Global.OilTemp));
                            Debug.WriteLine($"Published critical message, Time: {_stopwatch.ElapsedMilliseconds}ms");
                        }
                        else
                        {
                            _eventAggregator.GetEvent<MessageEvent>().Publish(($"OilTemp: {Global.OilTemp}°C", 0));
                        }
                    }
                }, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(("Monitoring cancelled or timed out.", 0));
                Debug.WriteLine($"Monitoring cancelled, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
            catch (Exception ex)
            {
                _eventAggregator.GetEvent<MessageEvent>().Publish(($"Error: {ex.Message}", 0));
                Debug.WriteLine($"Error: {ex.Message}, Time: {_stopwatch.ElapsedMilliseconds}ms");
            }
        }
    }
}

6. 总结

  • 动态超时调整
    • 使用 timeoutAdjuster 根据 Global.OilTemp 动态调整超时。
    • 通过 TimeoutAdjustEventInterlocked.Exchange 线程安全替换 _cts
    • 更新 messageBox 显示超时状态。
  • 复杂模块依赖取消
    • 定义模块依赖(SettingsModule 依赖 CoreModule)。
    • CoreModule 取消时通知 SettingsModule,通过 ModuleCancelEvent
    • 每个模块实现 Cleanup 释放资源。
  • 性能调试
    • 使用 Stopwatch 记录任务执行和取消时间。
    • 监控线程池状态,优化资源分配。
    • 结合限流和优先级队列减少高频事件开销。
  • 在你的代码中的应用
    • DataService 动态调整超时,监控 Global.OilTemp
    • MainViewModel 调度优先级消息,响应模块取消和超时调整。
    • CoreModuleSettingsModule 按依赖顺序取消。
    • messageBox 显示状态和错误信息。

如果你需要进一步优化(如更复杂的超时策略、模块间任务协调或性能分析工具整合),或者需要调试特定代码,请告诉我!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐