C# 异步编程详解
要点速览:理解async/await状态机(编译器将async方法转换为状态机);合理使用ConfigureAwait(库代码使用false,UI代码根据需要选择);正确处理异常和取消(避免未观察到的异常和资源泄漏);避免阻塞异步代码(不要使用.Result或.Wait());考虑使用ValueTask(对于可能同步完成的高频调用);利用异步流(处理大量异步数据);注意线程上下文(了解执行在哪个线程上恢复)。
一、异步编程基础概念
1.1 为什么要异步编程?
- 避免UI线程阻塞:保持界面响应
- 提高吞吐量:I/O密集型操作不阻塞线程
- 资源高效利用:更少的线程处理更多请求
1.2 同步 vs 异步示例
csharp
// Synchronous download: the calling thread is blocked for the whole request.
public string DownloadString(string url)
{
    using (var webClient = new WebClient())
    {
        // Does not return until the complete response body has been received.
        string body = webClient.DownloadString(url);
        return body;
    }
}
// Asynchronous download: no thread is blocked while the request is in flight.
// One HttpClient is shared by all calls; creating and disposing a client per
// request can exhaust sockets under load.
private static readonly HttpClient s_downloadClient = new HttpClient();

/// <summary>Downloads the resource at <paramref name="url"/> as a string.</summary>
/// <param name="url">Address of the resource to fetch.</param>
/// <returns>A task that completes with the response body.</returns>
public async Task<string> DownloadStringAsync(string url)
{
    // await suspends this method and hands control back to the caller;
    // execution resumes here once the response body is available.
    return await s_downloadClient.GetStringAsync(url);
}
二、async/await 基础语法
2.1 基本结构
csharp
public class AsyncBasics
{
    /// <summary>
    /// Canonical shape of an async method: the <c>async</c> modifier, a
    /// Task-like return type (Task, Task&lt;T&gt;, ValueTask&lt;T&gt;; void is
    /// discouraged) and a name ending in "Async".
    /// </summary>
    /// <returns>The value produced by the simulated operation.</returns>
    public async Task<int> GetDataAsync()
    {
        // await suspends here until the inner operation has finished.
        return await SomeAsyncOperation();
    }

    /// <summary>Simulates one second of asynchronous work and yields 42.</summary>
    private async Task<int> SomeAsyncOperation()
    {
        const int simulatedLatencyMs = 1000;
        await Task.Delay(simulatedLatencyMs);
        return 42;
    }
}
2.2 三种返回类型
csharp
public class ReturnTypes
{
    /// <summary>Task&lt;T&gt;: an async method that produces a value.</summary>
    public async Task<string> GetNameAsync()
    {
        await Task.Delay(100);
        const string name = "Alice";
        return name;
    }

    /// <summary>Task: an async method with no result.</summary>
    /// <param name="message">Text written to the console after the delay.</param>
    public async Task LogAsync(string message)
    {
        await Task.Delay(100);
        Console.WriteLine(message);
    }

    /// <summary>
    /// ValueTask&lt;T&gt;: a lightweight return type for operations that may
    /// complete synchronously.
    /// </summary>
    /// <param name="useCache">When true, a cached result is returned if one exists.</param>
    public async ValueTask<int> CalculateAsync(bool useCache)
    {
        bool mustCompute = !useCache || cachedResult == null;
        if (mustCompute)
        {
            return await ComputeAsync(); // asynchronous path
        }

        return cachedResult.Value; // synchronous path
    }
}
三、Task 深入理解
3.1 Task 状态机
csharp
public class TaskStates
{
    /// <summary>
    /// Creates one task in each of four states and prints every status.
    /// </summary>
    /// <returns>An already-completed task; nothing here needs to be awaited.</returns>
    public Task DemonstrateTaskStates()
    {
        Task completed = Task.CompletedTask;                               // already finished
        Task delayed = Task.Delay(1000);                                   // still pending
        Task faulted = Task.FromException(new Exception("Error"));         // failed
        Task canceled = Task.FromCanceled(new CancellationToken(true));    // canceled

        Console.WriteLine($"task1状态: {completed.Status}"); // RanToCompletion
        Console.WriteLine($"task2状态: {delayed.Status}");   // WaitingForActivation
        Console.WriteLine($"task3状态: {faulted.Status}");   // Faulted
        Console.WriteLine($"task4状态: {canceled.Status}");  // Canceled

        // Reading Exception marks the failure as observed, so it is not later
        // reported through TaskScheduler.UnobservedTaskException.
        _ = faulted.Exception;

        // The method contains no await, so it returns a completed task directly
        // instead of carrying an unnecessary async state machine.
        return Task.CompletedTask;
    }
}
3.2 Task 创建方式
csharp
public class TaskCreation
{
    /// <summary>Task.Run: offloads CPU-bound work to the thread pool.</summary>
    public async Task UsingTaskRun()
    {
        _ = await Task.Run(() =>
        {
            // Runs on a thread-pool thread. Thread.Sleep stands in for
            // CPU-bound work here; real asynchronous waits use Task.Delay.
            Thread.Sleep(1000);
            return ComplexCalculation();
        });
    }

    /// <summary>
    /// Task.Factory.StartNew: finer control over creation options and scheduler.
    /// </summary>
    public async Task UsingTaskFactory()
    {
        Task<string> task = Task.Factory.StartNew(
            () => "Hello",
            CancellationToken.None,
            TaskCreationOptions.LongRunning, // hint: long-running work
            TaskScheduler.Default);

        // Await the task so that completion and any exception are observed
        // instead of being dropped as fire-and-forget work.
        _ = await task;
    }

    /// <summary>
    /// Returns an already-completed task on a cache hit; no async/await needed.
    /// </summary>
    public Task<string> GetCachedData()
    {
        if (cache.TryGetValue("key", out var data))
        {
            return Task.FromResult(data);
        }

        return FetchDataAsync();
    }
}
四、异常处理
4.1 异步异常捕获
csharp
public class AsyncExceptionHandling
{
    /// <summary>
    /// Awaits a failing operation and routes the failure to the matching handler.
    /// </summary>
    public async Task HandleExceptionsAsync()
    {
        try
        {
            await DangerousOperationAsync();
        }
        catch (HttpRequestException networkError)
        {
            // Network-level failure.
            Console.WriteLine($"网络错误: {networkError.Message}");
        }
        catch (OperationCanceledException)
        {
            // The operation was canceled.
            Console.WriteLine("操作被取消");
        }
        catch (AggregateException aggregate)
        {
            // Reached only when an AggregateException itself is thrown; awaiting
            // a task normally rethrows just its first inner exception.
            var inner = aggregate.InnerExceptions;
            for (int index = 0; index < inner.Count; index++)
            {
                Console.WriteLine($"内部异常: {inner[index].Message}");
            }
        }
    }

    /// <summary>Simulates a network call that fails after a short delay.</summary>
    private async Task DangerousOperationAsync()
    {
        await Task.Delay(100);
        throw new HttpRequestException("模拟网络错误");
    }
}
4.2 多个任务异常处理
csharp
public class MultipleTaskExceptionHandling
{
    /// <summary>
    /// Shows two ways of dealing with several failing tasks: awaiting each one
    /// separately, and awaiting Task.WhenAll and then reading its Exception.
    /// </summary>
    public async Task HandleMultipleTasksAsync()
    {
        Task first = Task.Run(() => throw new Exception("任务1失败"));
        Task second = Task.Run(() => throw new Exception("任务2失败"));

        // Approach 1: handle each task on its own.
        try
        {
            await first;
        }
        catch
        {
            /* handle the failure of the first task */
        }

        try
        {
            await second;
        }
        catch
        {
            /* handle the failure of the second task */
        }

        // Approach 2: await the combined task, then inspect its aggregate exception.
        Task combined = Task.WhenAll(first, second);
        try
        {
            await combined;
        }
        catch
        {
            // await surfaces one exception; the combined task carries all of them.
            AggregateException aggregate = combined.Exception;
            if (aggregate == null)
            {
                return;
            }

            foreach (Exception failure in aggregate.InnerExceptions)
            {
                Console.WriteLine(failure.Message);
            }
        }
    }
}
五、取消操作
5.1 CancellationToken 使用
csharp
public class CancellationDemo
{
    // Shared client: creating one HttpClient per request can exhaust sockets.
    private static readonly HttpClient s_httpClient = new HttpClient();

    /// <summary>
    /// Downloads a resource, honouring <paramref name="cancellationToken"/> for
    /// both the request and the body read.
    /// </summary>
    /// <param name="cancellationToken">Token that aborts the download.</param>
    /// <remarks>Cancellation is reported on the console and not rethrown.</remarks>
    public async Task DownloadWithCancellationAsync(
        CancellationToken cancellationToken = default)
    {
        try
        {
            // The response owns the connection and content stream, so dispose it.
            using var response = await s_httpClient.GetAsync(
                "https://api.example.com/data",
                cancellationToken);

            // Explicit checkpoint between the two I/O steps.
            cancellationToken.ThrowIfCancellationRequested();

            // Pass the token here as well, otherwise a slow body read cannot be aborted.
            _ = await response.Content.ReadAsStringAsync(cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("下载被取消");
            // Release any additional resources here.
        }
    }

    /// <summary>Runs the long operation with a five-second timeout.</summary>
    public async Task ProcessWithTimeoutAsync()
    {
        // The source cancels itself once the timeout elapses.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        try
        {
            await LongRunningOperationAsync(cts.Token);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("操作超时");
        }
    }

    /// <summary>A cancelable operation made of 100 short steps.</summary>
    /// <param name="cancellationToken">Checked before and during every step.</param>
    /// <exception cref="OperationCanceledException">The token was canceled.</exception>
    public async Task LongRunningOperationAsync(CancellationToken cancellationToken)
    {
        for (int i = 0; i < 100; i++)
        {
            // Cooperative cancellation checkpoint.
            cancellationToken.ThrowIfCancellationRequested();

            // Simulated work; the delay itself is cancelable too.
            await Task.Delay(100, cancellationToken);
            Console.WriteLine($"进度: {i}%");
        }
    }
}
六、高级模式
6.1 ConfigureAwait
csharp
public class ConfigureAwaitDemo
{
    // Shared client: creating one HttpClient per request can exhaust sockets.
    private static readonly HttpClient s_httpClient = new HttpClient();

    /// <summary>
    /// Library-style method: every await uses ConfigureAwait(false), so the
    /// continuation does not have to resume on the captured context. This avoids
    /// the cost of marshalling back and lowers the risk of deadlock when a
    /// caller blocks on the returned task.
    /// </summary>
    /// <returns>The response body.</returns>
    public async Task<string> GetDataWithoutContextAsync()
    {
        // Dispose the response so its connection and content are released.
        using var response = await s_httpClient.GetAsync("https://api.example.com")
            .ConfigureAwait(false); // do not resume on the original context

        return await response.Content.ReadAsStringAsync()
            .ConfigureAwait(false);
    }

    /// <summary>Fetches the data and shows it in the text box.</summary>
    public async Task UpdateUIAsync()
    {
        var data = await GetDataWithoutContextAsync();

        // The control must be updated on the UI context, so go through the dispatcher.
        await Dispatcher.InvokeAsync(() =>
        {
            textBox.Text = data;
        });
    }
}
6.2 ValueTask 优化
csharp
public class ValueTaskOptimization
{
    private string cachedResult;
    private DateTime cacheTime;

    /// <summary>
    /// Returns the cached value while it is younger than five minutes;
    /// otherwise fetches a new value and stores it with the current time.
    /// </summary>
    /// <returns>The cached or freshly fetched value.</returns>
    public async ValueTask<string> GetDataOptimizedAsync()
    {
        bool cacheIsFresh = cachedResult != null
            && DateTime.UtcNow - cacheTime < TimeSpan.FromMinutes(5);

        if (!cacheIsFresh)
        {
            // Slow path: go to the network and refresh the cache.
            cachedResult = await FetchFromNetworkAsync();
            cacheTime = DateTime.UtcNow;
        }

        // On a cache hit this line is reached without awaiting anything.
        return cachedResult;
    }
}
6.3 IAsyncEnumerable(C# 8.0+)
csharp
public class AsyncStreams
{
    /// <summary>Async stream that yields 0..9, one item every 100 ms.</summary>
    /// <param name="cancellationToken">
    /// Aborts the stream. [EnumeratorCancellation] also routes a token supplied
    /// through <c>WithCancellation</c> into this parameter; without it that
    /// token would be ignored.
    /// </param>
    /// <exception cref="OperationCanceledException">The token was canceled.</exception>
    public async IAsyncEnumerable<int> GenerateSequenceAsync(
        [System.Runtime.CompilerServices.EnumeratorCancellation]
        CancellationToken cancellationToken = default)
    {
        for (int i = 0; i < 10; i++)
        {
            await Task.Delay(100, cancellationToken); // simulated asynchronous work
            yield return i;
        }
    }

    /// <summary>Consumes the stream twice: plainly, then with a cancellation token.</summary>
    public async Task ConsumeAsyncStreamAsync()
    {
        // await foreach pulls the items one at a time as they become available.
        await foreach (var number in GenerateSequenceAsync())
        {
            Console.WriteLine(number);
        }

        // With a cancellation token; the source is disposed when the method ends.
        using var cts = new CancellationTokenSource();
        await foreach (var item in GenerateSequenceAsync()
            .WithCancellation(cts.Token))
        {
            // process each item
        }
    }
}
七、并行与并发
7.1 Task.WhenAll 和 Task.WhenAny
csharp
public class ParallelAsync
{
    /// <summary>Starts all downloads at once and processes the results together.</summary>
    public async Task WhenAllDemoAsync()
    {
        var urls = new[] { "url1", "url2", "url3" };

        // Task.WhenAll enumerates the query once, which starts every download.
        var downloadTasks = urls.Select(url => DownloadAsync(url));
        var results = await Task.WhenAll(downloadTasks);

        // Every result is available at this point.
        foreach (var result in results)
        {
            Process(result);
        }
    }

    /// <summary>Races three sources and uses whichever finishes first.</summary>
    public async Task WhenAnyDemoAsync()
    {
        var tasks = new[]
        {
            DownloadFromSource1Async(),
            DownloadFromSource2Async(),
            DownloadFromSource3Async()
        };

        // Wait for whichever task finishes first.
        var firstFinishedTask = await Task.WhenAny(tasks);
        var result = await firstFinishedTask;

        // The remaining tasks keep running.
        foreach (var task in tasks)
        {
            if (!task.IsCompleted)
            {
                // cancel (needs a token passed to the source) or ignore
            }
        }
    }

    /// <summary>Processes 100 items with at most five operations in flight.</summary>
    public async Task ProcessWithConcurrencyLimitAsync()
    {
        using var semaphore = new SemaphoreSlim(5); // at most 5 concurrent operations

        // The operation must start only after a slot is acquired. Creating the
        // tasks first and gating only the await starts all of them at once,
        // which makes the limit ineffective.
        var limitedTasks = Enumerable.Range(1, 100).Select(async i =>
        {
            await semaphore.WaitAsync();
            try
            {
                return await ProcessItemAsync(i);
            }
            finally
            {
                semaphore.Release();
            }
        });

        var results = await Task.WhenAll(limitedTasks);
    }
}
八、异步模式与最佳实践
8.1 异步初始化模式
csharp
/// <summary>
/// Lazily starts an asynchronous factory on first access and hands the same
/// task to every caller.
/// </summary>
/// <typeparam name="T">Type of the lazily produced value.</typeparam>
public class AsyncLazy<T>
{
    private readonly Lazy<Task<T>> lazyTask;

    /// <param name="factory">Asynchronous factory run on first access to <see cref="Value"/>.</param>
    public AsyncLazy(Func<Task<T>> factory) => lazyTask = new Lazy<Task<T>>(factory);

    /// <summary>The shared task; await it to obtain the value.</summary>
    public Task<T> Value
    {
        get { return lazyTask.Value; }
    }
}
// Usage example
public class DataService
{
    // The connection is created and initialised on first use only.
    private readonly AsyncLazy<DatabaseConnection> connection =
        new AsyncLazy<DatabaseConnection>(OpenConnectionAsync);

    /// <summary>Factory for the lazy connection: construct, initialise, return.</summary>
    private static async Task<DatabaseConnection> OpenConnectionAsync()
    {
        var created = new DatabaseConnection();
        await created.InitializeAsync();
        return created;
    }

    /// <summary>Obtains the shared connection; initialisation runs at most once.</summary>
    public async Task ProcessDataAsync()
    {
        var conn = await connection.Value;
        // use the connection
    }
}
8.2 异步锁
csharp
/// <summary>
/// Mutual exclusion for async code, built on a SemaphoreSlim with one permit.
/// </summary>
public class AsyncLock
{
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

    /// <summary>Waits asynchronously until the lock is free and acquires it.</summary>
    /// <param name="cancellationToken">Abandons the wait when canceled.</param>
    /// <returns>A handle whose Dispose releases the lock.</returns>
    /// <exception cref="OperationCanceledException">The wait was canceled.</exception>
    public async Task<IDisposable> LockAsync(CancellationToken cancellationToken = default)
    {
        await semaphore.WaitAsync(cancellationToken);
        return new Releaser(semaphore);
    }

    /// <summary>Releases the lock exactly once, however often it is disposed.</summary>
    private sealed class Releaser : IDisposable
    {
        private SemaphoreSlim semaphore;

        public Releaser(SemaphoreSlim semaphore) => this.semaphore = semaphore;

        // Exchange clears the reference atomically, so a second Dispose finds
        // null and does nothing. Releasing twice would exceed the maximum count
        // and throw SemaphoreFullException.
        public void Dispose() => Interlocked.Exchange(ref semaphore, null)?.Release();
    }
}
// Usage example
public class ResourceManager
{
    private readonly AsyncLock asyncLock = new AsyncLock();

    /// <summary>Runs the critical operation while holding the async lock.</summary>
    public async Task UpdateResourceAsync()
    {
        // The handle is disposed when the method exits, which releases the lock.
        using IDisposable heldLock = await asyncLock.LockAsync();

        // critical section
        await CriticalOperationAsync();
    }
}
九、性能注意事项
9.1 避免常见陷阱
csharp
// Side-by-side examples of common async mistakes and their corrected forms.
// The "wrong" members are intentionally left as written.
public class PerformancePitfalls
{
// Wrong: blocks synchronously on asynchronous code.
public string GetDataBad()
{
return GetDataAsync().Result; // may deadlock
}
// Right: stay asynchronous and await the task.
public async Task<string> GetDataGood()
{
return await GetDataAsync();
}
// Wrong: async void (acceptable only for event handlers).
public async void BadMethod()
{
await Task.Delay(100);
// Exceptions cannot be caught by the caller, and the caller cannot tell when this finishes.
}
// Right: return Task so the caller can await and observe failures.
public async Task GoodMethod()
{
await Task.Delay(100);
}
// Avoid making code asynchronous when nothing in it is.
public async Task<string> UnnecessaryAsync()
{
// Nothing here needs to be asynchronous.
var data = GetFromCache();
return await Task.FromResult(data); // redundant
}
// Right: return a completed task directly when the value is already available.
public Task<string> BetterMethod()
{
if (TryGetCached(out var data))
return Task.FromResult(data);
return GetFromNetworkAsync();
}
}
十、调试与诊断
10.1 异步调用堆栈
csharp
public class AsyncDebugging
{
    /// <summary>Runs the step chain and prints the full details of any failure.</summary>
    public async Task ComplexAsyncOperation()
    {
        try
        {
            await Step1Async();
        }
        catch (Exception failure)
        {
            // ToString() includes the stack trace, not just the message.
            string details = failure.ToString();
            Console.WriteLine(details);
        }
    }

    /// <summary>First step: a short delay, then the second step.</summary>
    public async Task Step1Async()
    {
        await Task.Delay(100);
        await Step2Async();
    }
}
// 使用Visual Studio的并行堆栈窗口
// 使用System.Diagnostics.Activity跟踪异步流
10.2 异步日志记录
csharp
public class AsyncLogger
{
    /// <summary>
    /// Runs the operation with a fresh "OperationId" property pushed onto the
    /// log context for the duration of the call.
    /// </summary>
    public async Task LogWithContextAsync()
    {
        Guid correlationId = Guid.NewGuid();

        // The pushed property is removed when the scope is disposed at method exit.
        using var propertyScope = LogContext.PushProperty("OperationId", correlationId);
        await PerformOperationAsync();
    }
}
总结
关键要点:
- 理解async/await状态机:编译器将async方法转换为状态机
- 合理使用ConfigureAwait:库代码使用false,UI代码根据需要选择
- 正确处理异常和取消:避免未观察到的异常和资源泄漏
- 避免阻塞异步代码:不要使用.Result或.Wait()
- 考虑使用ValueTask:对于可能同步完成的高频调用
- 利用异步流:处理大量异步数据
- 注意线程上下文:了解执行在哪个线程上恢复
最佳实践:
- 异步方法名以Async结尾
- 避免async void(除了事件处理程序)
- 在库代码中始终使用ConfigureAwait(false)
- 及时处理取消请求
- 考虑并发限制,避免资源耗尽
- 使用合适的异步模式(如异步初始化、异步锁)
通过深入理解这些概念,你可以编写出高效、可维护且健壮的异步C#代码。
补充详解
一、异步编程的底层原理
1.1 状态机实现原理
csharp
// The compiler rewrites every async method into a state machine.
public class AsyncStateMachineDemo
{
    /// <summary>The async method as written by the developer.</summary>
    public async Task<int> ComputeAsync()
    {
        var result1 = await Step1Async();
        var result2 = await Step2Async(result1);
        return result1 + result2;
    }

    /// <summary>
    /// Simplified, hand-written equivalent of what the compiler produces for
    /// <see cref="ComputeAsync"/>. States used in this sketch: 0 = not started,
    /// 1 = resumed after Step1Async, 2 = resumed after Step2Async, -2 = finished.
    /// </summary>
    [StructLayout(LayoutKind.Auto)]
    private struct ComputeAsyncStateMachine : IAsyncStateMachine
    {
        public int _state;
        public AsyncTaskMethodBuilder<int> _builder;

        // The instance whose method is running. It must be the enclosing class:
        // a struct cannot contain a field of its own type.
        public AsyncStateMachineDemo _this;

        private TaskAwaiter<int> _awaiter1;
        private TaskAwaiter<int> _awaiter2;
        private int _result1; // local hoisted into a field so it survives suspension

        void IAsyncStateMachine.MoveNext()
        {
            int result;
            try
            {
                switch (_state)
                {
                    case 0:
                        // Start Step1Async and suspend if it has not finished yet.
                        _awaiter1 = _this.Step1Async().GetAwaiter();
                        if (!_awaiter1.IsCompleted)
                        {
                            _state = 1;
                            _builder.AwaitUnsafeOnCompleted(ref _awaiter1, ref this);
                            return;
                        }
                        goto case 1;

                    case 1:
                        // Step1Async is done: take its result and start Step2Async.
                        _result1 = _awaiter1.GetResult();
                        _awaiter2 = _this.Step2Async(_result1).GetAwaiter();
                        if (!_awaiter2.IsCompleted)
                        {
                            _state = 2;
                            _builder.AwaitUnsafeOnCompleted(ref _awaiter2, ref this);
                            return;
                        }
                        goto case 2;

                    case 2:
                        // Step2Async is done: compute the method's return value.
                        result = _result1 + _awaiter2.GetResult();
                        break;

                    default:
                        throw new InvalidOperationException("Invalid state machine state.");
                }
            }
            catch (Exception ex)
            {
                // Failures are stored on the task instead of escaping MoveNext.
                _state = -2;
                _builder.SetException(ex);
                return;
            }

            _state = -2;
            _builder.SetResult(result);
        }

        // Required by IAsyncStateMachine; forwards the boxed machine to the builder.
        void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
        {
            _builder.SetStateMachine(stateMachine);
        }
    }
}
1.2 GetAwaiter和Awaiter模式
csharp
// NOTE(review): the nested CustomAwaitable<T> shares its enclosing type's name,
// which the compiler appears to reject (CS0542) — confirm and consider renaming
// this container.
public class CustomAwaitable
{
    /// <summary>
    /// Awaiter over a Task&lt;TResult&gt;: supplies the IsCompleted / OnCompleted /
    /// GetResult members that <c>await</c> requires.
    /// </summary>
    public class CustomTaskAwaiter<TResult> : INotifyCompletion
    {
        private readonly Task<TResult> _task;

        public bool IsCompleted => _task.IsCompleted;

        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public CustomTaskAwaiter(Task<TResult> task) =>
            _task = task ?? throw new ArgumentNullException(nameof(task));

        /// <summary>Returns the task's result, or rethrows its exception.</summary>
        public TResult GetResult() => _task.GetAwaiter().GetResult();

        /// <summary>Schedules <paramref name="continuation"/> to run once the task completes.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
        public void OnCompleted(Action continuation)
        {
            if (continuation == null)
            {
                throw new ArgumentNullException(nameof(continuation));
            }

            // Name the scheduler explicitly so the continuation does not depend
            // on whichever scheduler happens to be current at the call site.
            _task.ContinueWith(
                _ => continuation(),
                System.Threading.CancellationToken.None,
                TaskContinuationOptions.None,
                TaskScheduler.Default);
        }
    }

    /// <summary>Awaitable wrapper: exposes GetAwaiter so instances can be awaited.</summary>
    public class CustomAwaitable<T>
    {
        private readonly Task<T> _task;

        public CustomAwaitable(Task<T> task) => _task = task;

        public CustomTaskAwaiter<T> GetAwaiter() =>
            new CustomTaskAwaiter<T>(_task);
    }
}
二、高级异步模式
2.1 异步缓存模式
csharp
/// <summary>
/// Caches the task produced by an asynchronous factory, one entry per key, so
/// the factory runs at most once per key.
/// </summary>
public class AsyncCache<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _cache;
    private readonly Func<TKey, Task<TValue>> _valueFactory;

    /// <param name="valueFactory">Produces the value for a key that is not cached yet.</param>
    /// <exception cref="ArgumentNullException"><paramref name="valueFactory"/> is null.</exception>
    public AsyncCache(Func<TKey, Task<TValue>> valueFactory)
    {
        _valueFactory = valueFactory ?? throw new ArgumentNullException(nameof(valueFactory));
        _cache = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();
    }

    /// <summary>Returns the cached task for <paramref name="key"/>, creating it on first use.</summary>
    public Task<TValue> GetAsync(TKey key)
    {
        // GetOrAdd may build several Lazy wrappers under contention, but only
        // the stored one is ever evaluated, so the factory runs once per key.
        return _cache.GetOrAdd(key, k =>
            new Lazy<Task<TValue>>(() => _valueFactory(k), true)
        ).Value;
    }

    /// <summary>Variant whose entries expire a fixed time after creation.</summary>
    public class AsyncCacheWithExpiration<TKey, TValue>
    {
        // Initialised here; previously the field stayed null and the first
        // lookup threw NullReferenceException.
        private readonly ConcurrentDictionary<TKey, CacheItem> _cache =
            new ConcurrentDictionary<TKey, CacheItem>();
        private readonly TimeSpan _expiration;

        /// <param name="expiration">
        /// Lifetime of an entry. The default (zero) makes every entry expire
        /// immediately, so each call invokes the factory.
        /// </param>
        public AsyncCacheWithExpiration(TimeSpan expiration = default)
        {
            _expiration = expiration;
        }

        private class CacheItem
        {
            public Task<TValue> Task { get; set; }
            public DateTime ExpiresAt { get; set; }
            public bool IsExpired => DateTime.UtcNow >= ExpiresAt;
        }

        /// <summary>
        /// Returns the unexpired entry for <paramref name="key"/>, or calls
        /// <paramref name="factory"/> and stores its task as the new entry.
        /// </summary>
        public async Task<TValue> GetOrAddAsync(TKey key, Func<TKey, Task<TValue>> factory)
        {
            if (_cache.TryGetValue(key, out var item) && !item.IsExpired)
            {
                return await item.Task;
            }

            // Concurrent callers that miss at the same time each call the
            // factory; the last entry written wins.
            var newItem = new CacheItem
            {
                Task = factory(key),
                ExpiresAt = DateTime.UtcNow.Add(_expiration)
            };
            _cache[key] = newItem;
            return await newItem.Task;
        }
    }
}
2.2 异步批处理模式
csharp
/// <summary>
/// Collects items into batches and passes each batch to a callback once it is
/// full or once no item has arrived for the configured timeout.
/// </summary>
public class AsyncBatchProcessor<T>
{
    private readonly int _batchSize;
    private readonly TimeSpan _batchTimeout;
    private readonly Func<List<T>, Task> _processBatch;
    private readonly object _lock = new object();
    private List<T> _currentBatch;
    private Timer _timeoutTimer;

    /// <param name="batchSize">Number of items that triggers an immediate flush.</param>
    /// <param name="batchTimeout">Idle time after the last item before a partial batch is flushed.</param>
    /// <param name="processBatch">Callback that receives each completed batch.</param>
    /// <exception cref="ArgumentNullException"><paramref name="processBatch"/> is null.</exception>
    public AsyncBatchProcessor(
        int batchSize,
        TimeSpan batchTimeout,
        Func<List<T>, Task> processBatch)
    {
        _batchSize = batchSize;
        _batchTimeout = batchTimeout;
        _processBatch = processBatch ?? throw new ArgumentNullException(nameof(processBatch));
        _currentBatch = new List<T>(batchSize);
    }

    /// <summary>
    /// Adds an item. If that fills the batch, the batch is processed and the
    /// returned task completes when the callback does.
    /// </summary>
    public async Task AddAsync(T item)
    {
        List<T> fullBatch = null;
        lock (_lock)
        {
            _currentBatch.Add(item);
            if (_currentBatch.Count >= _batchSize)
            {
                fullBatch = DetachBatchLocked();
            }
            else
            {
                // Every new item restarts the idle timeout.
                RestartTimerLocked();
            }
        }

        // Invoke the callback only after the lock is released, so user code
        // never runs while other producers are blocked on the lock.
        if (fullBatch != null)
        {
            await _processBatch(fullBatch);
        }
    }

    /// <summary>Flushes whatever has been collected so far (the timer path).</summary>
    private async Task ProcessBatchAsync()
    {
        List<T> batchToProcess;
        lock (_lock)
        {
            batchToProcess = DetachBatchLocked();
        }

        if (batchToProcess.Count > 0)
        {
            await _processBatch(batchToProcess);
        }
    }

    // Swaps in an empty batch and stops the timer. Caller must hold _lock.
    private List<T> DetachBatchLocked()
    {
        List<T> detached = _currentBatch;
        _currentBatch = new List<T>(_batchSize);
        _timeoutTimer?.Dispose();
        _timeoutTimer = null;
        return detached;
    }

    // Replaces the timer with a fresh one-shot timer. Caller must hold _lock.
    private void RestartTimerLocked()
    {
        _timeoutTimer?.Dispose();
        _timeoutTimer = new Timer(
            state =>
            {
                // Fire-and-forget: there is no caller to await a timer flush.
                // A failing callback surfaces via TaskScheduler.UnobservedTaskException.
                _ = ProcessBatchAsync();
            },
            null,
            _batchTimeout,
            Timeout.InfiniteTimeSpan);
    }
}
2.3 异步调度器
csharp
/// <summary>
/// Task scheduler that runs queued tasks on the thread pool with a fixed
/// upper bound on concurrency.
/// </summary>
public class AsyncTaskScheduler : TaskScheduler
{
    private readonly ConcurrentQueue<Task> _taskQueue = new();
    private readonly SemaphoreSlim _semaphore; // one permit per allowed worker
    private readonly CancellationTokenSource _cts = new();
    private readonly int _maxDegreeOfParallelism;
    private int _activeTasks;

    /// <param name="maxDegreeOfParallelism">Maximum number of tasks executing at once (at least 1).</param>
    /// <exception cref="ArgumentOutOfRangeException">The value is less than 1.</exception>
    public AsyncTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        }

        _maxDegreeOfParallelism = maxDegreeOfParallelism;
        _semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
    }

    /// <inheritdoc />
    public override int MaximumConcurrencyLevel => _maxDegreeOfParallelism;

    protected override void QueueTask(Task task)
    {
        _taskQueue.Enqueue(task);
        TryStartWorker();
    }

    // Starts a worker if a slot is free. Nothing here can throw into the caller,
    // unlike an async void pump, whose exceptions would crash the process.
    private void TryStartWorker()
    {
        if (_cts.IsCancellationRequested || !_semaphore.Wait(0))
        {
            // Canceled, or every slot is busy; a running worker drains the queue.
            return;
        }

        ThreadPool.QueueUserWorkItem(_ => DrainQueue());
    }

    // Runs queued tasks until the queue is empty, then gives the slot back.
    private void DrainQueue()
    {
        try
        {
            // A task is dequeued only while a slot is held, so it cannot be lost.
            while (!_cts.IsCancellationRequested &&
                   _taskQueue.TryDequeue(out var task))
            {
                Interlocked.Increment(ref _activeTasks);
                try
                {
                    TryExecuteTask(task);
                }
                finally
                {
                    Interlocked.Decrement(ref _activeTasks);
                }
            }
        }
        finally
        {
            _semaphore.Release();
        }

        // A task may have been queued after the loop saw an empty queue but
        // before the slot was released; make sure it is picked up.
        if (!_taskQueue.IsEmpty)
        {
            TryStartWorker();
        }
    }

    protected override bool TryExecuteTaskInline(
        Task task,
        bool taskWasPreviouslyQueued)
    {
        // Inline execution is never allowed, so the concurrency limit always holds.
        return false;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _taskQueue.ToArray();
    }
}
三、异步与并发集合
3.1 异步生产者消费者队列
csharp
/// <summary>
/// Bounded asynchronous producer/consumer queue. Producers wait while the
/// queue is full, and consumers wait while it is empty.
/// </summary>
/// <remarks>
/// Built on a bounded channel from System.Threading.Channels, which supplies
/// back-pressure, completion and async enumeration. A hand-rolled semaphore
/// version has to solve waking waiters on completion and cannot
/// <c>yield return</c> inside a try/catch.
/// </remarks>
public class AsyncProducerConsumerQueue<T> : IAsyncEnumerable<T>
{
    private readonly System.Threading.Channels.Channel<T> _channel;

    /// <param name="maxCapacity">Maximum number of buffered items (at least 1).</param>
    public AsyncProducerConsumerQueue(int maxCapacity)
    {
        _channel = System.Threading.Channels.Channel.CreateBounded<T>(
            new System.Threading.Channels.BoundedChannelOptions(maxCapacity)
            {
                // Producers wait for space instead of dropping items.
                FullMode = System.Threading.Channels.BoundedChannelFullMode.Wait
            });
    }

    /// <summary>Adds an item, waiting for free space if necessary.</summary>
    /// <exception cref="InvalidOperationException">The queue has been completed.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was canceled.</exception>
    public async Task EnqueueAsync(T item, CancellationToken ct = default)
    {
        try
        {
            await _channel.Writer.WriteAsync(item, ct);
        }
        catch (System.Threading.Channels.ChannelClosedException)
        {
            throw new InvalidOperationException("队列已关闭");
        }
    }

    /// <summary>Removes the next item, waiting until one is available.</summary>
    /// <exception cref="InvalidOperationException">The queue is completed and empty.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was canceled.</exception>
    public async Task<T> DequeueAsync(CancellationToken ct = default)
    {
        try
        {
            return await _channel.Reader.ReadAsync(ct);
        }
        catch (System.Threading.Channels.ChannelClosedException)
        {
            throw new InvalidOperationException("队列已关闭且为空");
        }
    }

    /// <summary>
    /// Marks the queue as complete: further enqueues fail, buffered items can
    /// still be consumed, and waiting consumers are released once it is empty.
    /// Calling it more than once has no effect.
    /// </summary>
    public void Complete()
    {
        _channel.Writer.TryComplete();
    }

    /// <summary>Enumerates items as they arrive; ends once the queue is completed and drained.</summary>
    public IAsyncEnumerator<T> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        return _channel.Reader
            .ReadAllAsync(cancellationToken)
            .GetAsyncEnumerator(cancellationToken);
    }
}
3.2 异步限流队列
csharp
/// <summary>
/// Queue whose consumer starts at most <c>maxItemsPerInterval</c> items per
/// <c>interval</c>.
/// </summary>
public class ThrottledAsyncQueue<T>
{
    private readonly Channel<T> _channel;
    private readonly TimeSpan _interval;
    private readonly int _maxItemsPerInterval;

    /// <param name="interval">Length of one rate-limit window (positive).</param>
    /// <param name="maxItemsPerInterval">Items allowed to start per window (at least 1).</param>
    /// <exception cref="ArgumentOutOfRangeException">An argument is out of range.</exception>
    public ThrottledAsyncQueue(
        TimeSpan interval,
        int maxItemsPerInterval)
    {
        if (interval <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(interval));
        }

        if (maxItemsPerInterval < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxItemsPerInterval));
        }

        _interval = interval;
        _maxItemsPerInterval = maxItemsPerInterval;
        _channel = Channel.CreateUnbounded<T>(
            new UnboundedChannelOptions
            {
                SingleWriter = false,
                SingleReader = true
            });
    }

    /// <summary>
    /// Consumes items until the queue is completed or <paramref name="ct"/> is
    /// canceled, starting <paramref name="processor"/> for each item within the
    /// rate limit. Processors run fire-and-forget and are not awaited here.
    /// </summary>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was canceled.</exception>
    public async Task ProcessAsync(
        Func<T, Task> processor,
        CancellationToken ct = default)
    {
        var reader = _channel.Reader;

        // The maximum count caps the permits at one window's worth, so idle
        // windows cannot pile up permits and allow a burst above the limit.
        var semaphore = new SemaphoreSlim(_maxItemsPerInterval, _maxItemsPerInterval);
        var resetTimer = new Timer(_ => Refill(semaphore), null, _interval, _interval);
        try
        {
            await foreach (var item in reader.ReadAllAsync(ct))
            {
                await semaphore.WaitAsync(ct);
                _ = ProcessItemAsync(item, processor);
            }
        }
        finally
        {
            // The semaphore is deliberately left undisposed: a timer callback
            // may still be in flight when the timer is disposed.
            resetTimer.Dispose();
        }
    }

    // Tops the permits back up to the per-interval maximum.
    private void Refill(SemaphoreSlim semaphore)
    {
        int missing = _maxItemsPerInterval - semaphore.CurrentCount;
        if (missing <= 0)
        {
            return;
        }

        try
        {
            semaphore.Release(missing);
        }
        catch (SemaphoreFullException)
        {
            // Overlapping timer callbacks refilled at the same time; already full.
        }
    }

    private async Task ProcessItemAsync(
        T item,
        Func<T, Task> processor)
    {
        try
        {
            await processor(item);
        }
        catch (Exception ex)
        {
            // Best effort: report the failure but keep the pipeline running.
            System.Diagnostics.Trace.TraceError(ex.ToString());
        }
    }

    /// <summary>Adds an item to the queue.</summary>
    public ValueTask EnqueueAsync(T item, CancellationToken ct = default)
    {
        return _channel.Writer.WriteAsync(item, ct);
    }

    /// <summary>
    /// Signals that no more items will be enqueued, so <see cref="ProcessAsync"/>
    /// returns after the remaining items have been started.
    /// </summary>
    public void Complete()
    {
        _channel.Writer.TryComplete();
    }
}
四、异步性能优化
4.1 避免不必要的async/await
csharp
public class AsyncPerformanceOptimizations
{
    /// <summary>Case 1: a plain pass-through returns the inner task as-is.</summary>
    public Task<string> GetDataAsync() => FetchDataAsync();

    /// <summary>Case 2: a cache hit returns a completed task with no state machine.</summary>
    public Task<int> GetCachedValueAsync(int key)
    {
        if (!_cache.TryGetValue(key, out var value))
        {
            return GetFromSourceAsync(key);
        }

        return Task.FromResult(value);
    }

    /// <summary>Case 3: ValueTask avoids a Task allocation on the synchronous path.</summary>
    public async ValueTask<DateTime> GetCurrentTimeAsync(bool useUtc)
    {
        if (!useUtc)
        {
            await Task.Yield(); // asynchronous path
            return DateTime.Now;
        }

        return DateTime.UtcNow; // synchronous path
    }
}
4.2 使用Pipelines进行高性能I/O
csharp
public class AsyncPipelineExample
{
    /// <summary>
    /// Streams a file through a Pipe: one task fills the pipe from the file
    /// while another reads and processes the buffered data.
    /// </summary>
    /// <param name="filePath">Path of the file to process.</param>
    public async Task ProcessFileWithPipelineAsync(string filePath)
    {
        // Open for asynchronous I/O so ReadAsync does not block a thread.
        await using var fileStream = new FileStream(
            filePath,
            FileMode.Open,
            FileAccess.Read,
            FileShare.Read,
            bufferSize: 4096,
            FileOptions.Asynchronous | FileOptions.SequentialScan);

        var pipe = new Pipe();
        Task writing = FillPipeAsync(fileStream, pipe.Writer);
        Task reading = ReadPipeAsync(pipe.Reader);
        await Task.WhenAll(writing, reading);
    }

    /// <summary>Copies the stream into the pipe until end of stream or reader completion.</summary>
    private async Task FillPipeAsync(Stream stream, PipeWriter writer)
    {
        const int minimumBufferSize = 512;
        Exception error = null;
        try
        {
            while (true)
            {
                Memory<byte> memory = writer.GetMemory(minimumBufferSize);
                int bytesRead = await stream.ReadAsync(memory);
                if (bytesRead == 0)
                {
                    break; // end of stream
                }

                writer.Advance(bytesRead);
                FlushResult result = await writer.FlushAsync();
                if (result.IsCompleted)
                {
                    break; // the reader stopped consuming
                }
            }
        }
        catch (Exception ex)
        {
            error = ex;
            throw;
        }
        finally
        {
            // Always complete the writer, passing on any failure; otherwise the
            // reader would wait forever for data that never arrives.
            await writer.CompleteAsync(error);
        }
    }

    /// <summary>Reads from the pipe and processes each buffer until the writer completes.</summary>
    private async Task ReadPipeAsync(PipeReader reader)
    {
        Exception error = null;
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync();
                var buffer = result.Buffer;

                await ProcessBufferAsync(buffer);

                // Everything up to the end of the buffer has been consumed.
                reader.AdvanceTo(buffer.End);
                if (result.IsCompleted)
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            error = ex;
            throw;
        }
        finally
        {
            // Completing the reader (with the error, if any) unblocks a writer
            // that is waiting in FlushAsync.
            await reader.CompleteAsync(error);
        }
    }
}
4.3 异步对象池
csharp
/// <summary>
/// Pool of at most <c>maxSize</c> objects that are created lazily by an
/// asynchronous factory.
/// </summary>
public class AsyncObjectPool<T> where T : class
{
    private readonly ConcurrentQueue<T> _pool = new();
    private readonly Func<Task<T>> _factory;
    private readonly Action<T> _reset;
    private readonly SemaphoreSlim _semaphore; // one permit per object that may be rented
    private int _createdCount;
    private readonly int _maxSize;

    /// <param name="maxSize">Maximum number of objects that may exist at once.</param>
    /// <param name="factory">Creates a new object when the pool has none idle.</param>
    /// <param name="reset">Optional action applied to a pooled object before it is handed out again.</param>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
    public AsyncObjectPool(
        int maxSize,
        Func<Task<T>> factory,
        Action<T> reset = null)
    {
        _maxSize = maxSize;
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
        _reset = reset;
        _semaphore = new SemaphoreSlim(maxSize);
    }

    /// <summary>
    /// Rents an object, waiting while all <c>maxSize</c> objects are in use.
    /// Dispose the returned handle to give the object back.
    /// </summary>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was canceled while waiting.</exception>
    /// <exception cref="InvalidOperationException">The pool cannot create another object.</exception>
    public async Task<PooledObject<T>> RentAsync(CancellationToken ct = default)
    {
        await _semaphore.WaitAsync(ct);

        bool countedNew = false;     // this call incremented _createdCount
        bool dequeuedExisting = false; // this call removed an object from the pool
        try
        {
            if (_pool.TryDequeue(out var item))
            {
                dequeuedExisting = true;
                _reset?.Invoke(item);
                return new PooledObject<T>(item, Return);
            }

            countedNew = true;
            if (Interlocked.Increment(ref _createdCount) > _maxSize)
            {
                throw new InvalidOperationException("对象池已满");
            }

            item = await _factory();
            return new PooledObject<T>(item, Return);
        }
        catch
        {
            // Roll back so a failure does not shrink the pool permanently:
            // the object was never created or is being dropped, and the permit
            // taken above must be returned.
            if (countedNew || dequeuedExisting)
            {
                Interlocked.Decrement(ref _createdCount);
            }

            _semaphore.Release();
            throw;
        }
    }

    // Puts the object back and frees its permit for the next renter.
    private void Return(T item)
    {
        _pool.Enqueue(item);
        _semaphore.Release();
    }
}

/// <summary>
/// Handle for a rented object. Disposing it returns the object to its pool,
/// and repeated disposal is harmless.
/// </summary>
public readonly struct PooledObject<T> : IDisposable where T : class
{
    // Shared by every copy of this struct, so disposing any copy is seen by all.
    private readonly Lease _lease;

    /// <summary>The rented object.</summary>
    /// <exception cref="ObjectDisposedException">The handle is disposed or was default-initialised.</exception>
    public T Value => _lease?.Value ?? throw new ObjectDisposedException(nameof(PooledObject<T>));

    /// <param name="value">The rented object.</param>
    /// <param name="returnAction">Invoked once with <paramref name="value"/> when the handle is disposed.</param>
    public PooledObject(T value, Action<T> returnAction)
    {
        _lease = new Lease(value, returnAction);
    }

    public void Dispose()
    {
        _lease?.Release();
    }

    private sealed class Lease
    {
        private T _value;
        private readonly Action<T> _returnAction;

        public Lease(T value, Action<T> returnAction)
        {
            _value = value;
            _returnAction = returnAction;
        }

        public T Value => Volatile.Read(ref _value);

        public void Release()
        {
            // Exchange guarantees the object is handed back exactly once.
            T value = Interlocked.Exchange(ref _value, null);
            if (value != null)
            {
                _returnAction?.Invoke(value);
            }
        }
    }
}
五、异步异常高级处理
5.1 异常筛选器
csharp
public class AsyncExceptionFilters
{
    /// <summary>
    /// Uses exception filters (<c>when</c>) to handle only specific HTTP status
    /// codes and to log every other exception before rethrowing it.
    /// </summary>
    public async Task ProcessWithExceptionFilterAsync()
    {
        try
        {
            await DangerousOperationAsync();
        }
        catch (HttpRequestException ex)
            when (ex.StatusCode == HttpStatusCode.NotFound
                  || ex.StatusCode == HttpStatusCode.TooManyRequests)
        {
            if (ex.StatusCode == HttpStatusCode.NotFound)
            {
                // 404 only
                await HandleNotFoundAsync();
            }
            else
            {
                // rate limiting (429)
                await HandleRateLimitAsync(ex);
            }
        }
        catch (Exception ex)
            when (LogException(ex)) // a filter may call a method
        {
            // Entered because LogException returned true; pass the exception on.
            throw;
        }
    }

    /// <summary>Writes the exception message to the console.</summary>
    /// <returns>Always true, so the filter that calls it matches.</returns>
    private bool LogException(Exception ex)
    {
        Console.WriteLine($"记录异常: {ex.Message}");
        return true;
    }
}
5.2 聚合异常解包
csharp
public class AggregateExceptionUnwrapping
{
    /// <summary>
    /// Runs several tasks, then reports the first failure, every failure, and
    /// the state of each faulted task.
    /// </summary>
    public async Task HandleMultipleExceptionsGracefullyAsync()
    {
        var tasks = new List<Task>
        {
            Task.Run(() => throw new InvalidOperationException("错误1")),
            Task.Run(() => throw new ArgumentException("错误2")),
            Task.Run(() => Task.CompletedTask)
        };

        // Keep the combined task: it is the only place where all failures are available.
        Task all = Task.WhenAll(tasks);
        try
        {
            await all;
        }
        catch (Exception ex)
        {
            // await rethrows only the first inner exception, never the aggregate.
            Console.WriteLine($"第一个错误: {ex.Message}");

            // The complete set of failures is on the combined task.
            if (all.Exception != null)
            {
                foreach (var innerEx in all.Exception.Flatten().InnerExceptions)
                {
                    Console.WriteLine($"错误: {innerEx.Message}");
                }
            }
        }

        // Inspect the individual tasks.
        foreach (var task in tasks)
        {
            if (task.IsFaulted)
            {
                Console.WriteLine($"任务失败: {task.Exception?.Message}");
            }
        }
    }
}
六、异步与依赖注入
6.1 异步工厂模式
csharp
/// <summary>Factory that creates its product asynchronously.</summary>
/// <typeparam name="T">Type of the created object.</typeparam>
public interface IAsyncFactory<T>
{
    /// <summary>Creates a new instance.</summary>
    Task<T> CreateAsync();
}

/// <summary>Creates database connections that are already open.</summary>
public class AsyncDatabaseConnectionFactory : IAsyncFactory<IDbConnection>
{
    private readonly string _connectionString;

    /// <param name="connectionString">Connection string used for every created connection.</param>
    public AsyncDatabaseConnectionFactory(string connectionString)
    {
        _connectionString = connectionString;
    }

    /// <summary>Creates a connection and opens it.</summary>
    /// <returns>An open connection, which the caller must dispose.</returns>
    public async Task<IDbConnection> CreateAsync()
    {
        var connection = new SqlConnection(_connectionString);
        try
        {
            await connection.OpenAsync();
            return connection;
        }
        catch
        {
            // The caller never receives the connection if opening fails,
            // so it has to be disposed here.
            connection.Dispose();
            throw;
        }
    }
}
// Register the factory in the DI container as a singleton.
services.AddSingleton<IAsyncFactory<IDbConnection>>(
    _ => new AsyncDatabaseConnectionFactory(connectionString));
6.2 异步初始化组件
csharp
/// <summary>Contract for components that expose an asynchronous initialization step.</summary>
public interface IAsyncInitializable
{
/// <summary>Performs the component's initialization.</summary>
/// <param name="ct">Cancellation token for the initialization; optional.</param>
Task InitializeAsync(CancellationToken ct = default);
}
public class DatabaseService : IAsyncInitializable
{
    private IDbConnection _connection;

    /// <summary>Creates the connection, then seeds the data.</summary>
    /// <param name="ct">Cancellation token passed on to both steps.</param>
    public async Task InitializeAsync(CancellationToken ct = default)
    {
        IDbConnection created = await CreateConnectionAsync(ct);
        _connection = created;
        await SeedDataAsync(ct);
    }

    /// <summary>Initializes every registered component at application start-up.</summary>
    public class StartupInitializer
    {
        private readonly IEnumerable<IAsyncInitializable> _services;

        /// <param name="services">Components to initialize.</param>
        public StartupInitializer(IEnumerable<IAsyncInitializable> services) => _services = services;

        /// <summary>Starts all initializations and waits for all of them to finish.</summary>
        public async Task InitializeAllAsync(CancellationToken ct = default)
        {
            await Task.WhenAll(_services.Select(service => service.InitializeAsync(ct)));
        }
    }
}
七、异步调试技巧
7.1 可视化异步调用
csharp
public class AsyncDebuggingHelpers
{
    /// <summary>
    /// Awaits <paramref name="task"/> and writes start, completion (with elapsed
    /// time) or failure messages to the console.
    /// </summary>
    /// <param name="task">The operation to observe.</param>
    /// <param name="operationName">Label used in the console messages.</param>
    /// <returns>The task's result; failures are logged and rethrown.</returns>
    [DebuggerStepThrough]
    public static async Task<T> WithDebugInfo<T>(
        Task<T> task,
        string operationName)
    {
        Console.WriteLine($"开始异步操作: {operationName}");
        Stopwatch timer = Stopwatch.StartNew();
        try
        {
            T outcome = await task;
            string completionMessage = $"完成异步操作 {operationName}, 耗时: {timer.Elapsed}";
            Console.WriteLine(completionMessage);
            return outcome;
        }
        catch (Exception failure)
        {
            string failureMessage = $"异步操作 {operationName} 失败: {failure.Message}";
            Console.WriteLine(failureMessage);
            throw;
        }
    }

    /// <summary>
    /// Same as <see cref="WithDebugInfo{T}"/>, with the operation name built
    /// from compiler-supplied caller information.
    /// </summary>
    public static async Task<T> TraceAsync<T>(
        Task<T> task,
        [CallerMemberName] string caller = "",
        [CallerFilePath] string file = "",
        [CallerLineNumber] int line = 0)
    {
        string fileName = Path.GetFileName(file);
        return await WithDebugInfo(task, $"{caller} at {fileName}:{line}");
    }
}
7.2 异步死锁检测
csharp
/// <summary>
/// Tracks operations that are in flight and periodically reports those that
/// have been running for more than five minutes as possible deadlocks.
/// </summary>
public class AsyncDeadlockDetector : IDisposable
{
    // Keyed by a unique per-operation id. Task.CurrentId or the thread id would
    // collide for overlapping operations, because async code often has no
    // current task and shares threads.
    private readonly ConcurrentDictionary<long, DeadlockInfo> _activeOperations = new();
    private readonly Timer _detectionTimer;
    private long _nextOperationId;

    private class DeadlockInfo
    {
        public string Operation { get; set; }
        public DateTime StartedAt { get; set; }
        public StackTrace CreationStackTrace { get; set; }
    }

    /// <summary>Starts the detector; a check runs every 30 seconds.</summary>
    public AsyncDeadlockDetector()
    {
        _detectionTimer = new Timer(CheckForDeadlocks, null,
            TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));
    }

    /// <summary>Number of operations currently being tracked.</summary>
    public int ActiveOperationCount => _activeOperations.Count;

    /// <summary>Registers an operation as started.</summary>
    /// <param name="operationName">Name shown in deadlock reports.</param>
    /// <returns>A handle; dispose it when the operation finishes.</returns>
    public IDisposable BeginOperation(string operationName)
    {
        long id = Interlocked.Increment(ref _nextOperationId);
        _activeOperations[id] = new DeadlockInfo
        {
            Operation = operationName,
            StartedAt = DateTime.UtcNow,
            CreationStackTrace = new StackTrace(true)
        };
        return new OperationScope(this, id);
    }

    /// <summary>Stops the periodic check.</summary>
    public void Dispose()
    {
        _detectionTimer.Dispose();
    }

    private void CheckForDeadlocks(object state)
    {
        var now = DateTime.UtcNow;
        var timeout = TimeSpan.FromMinutes(5);
        foreach (var kvp in _activeOperations)
        {
            if (now - kvp.Value.StartedAt > timeout)
            {
                Console.WriteLine($"检测到可能死锁: {kvp.Value.Operation}");
                Console.WriteLine($"创建堆栈: {kvp.Value.CreationStackTrace}");

                // Break only under a debugger, so unattended processes are not interrupted.
                if (Debugger.IsAttached)
                {
                    Debugger.Break();
                }
            }
        }
    }

    // Removes its operation from the tracking table on first Dispose.
    private sealed class OperationScope : IDisposable
    {
        private AsyncDeadlockDetector _owner;
        private readonly long _id;

        public OperationScope(AsyncDeadlockDetector owner, long id)
        {
            _owner = owner;
            _id = id;
        }

        public void Dispose()
        {
            Interlocked.Exchange(ref _owner, null)?._activeOperations.TryRemove(_id, out _);
        }
    }
}
八、跨平台异步注意事项
8.1 平台特定的异步行为
csharp
public class PlatformAwareAsync
{
#if WINDOWS
    /// <summary>Compiled only when the WINDOWS symbol is defined.</summary>
    public async Task WindowsSpecificAsync()
    {
        // Windows-only APIs would be used here.
        await Task.CompletedTask;
    }
#elif LINUX
    /// <summary>Compiled only when the LINUX symbol is defined.</summary>
    public async Task LinuxSpecificAsync()
    {
        // Linux-specific file I/O handling would go here.
        await Task.Delay(1);
    }
#endif

    /// <summary>Chooses the implementation at run time from the current operating system.</summary>
    public async Task PlatformAwareOperationAsync()
    {
        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            await WindowsOptimizedAsync();
            return;
        }

        if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
        {
            await LinuxOptimizedAsync();
        }
    }
}
8.2 异步与线程文化
csharp
public class CultureAwareAsync
{
    /// <summary>
    /// Runs <paramref name="operation"/> and afterwards restores the culture
    /// and UI culture that were current when this method was called.
    /// </summary>
    /// <param name="operation">The operation to run.</param>
    public static async Task WithCultureContextAsync(Func<Task> operation)
    {
        // Read and write through CultureInfo: after an await the continuation
        // may run on another thread, so the ambient culture is what matters,
        // not a particular Thread object.
        var originalCulture = System.Globalization.CultureInfo.CurrentCulture;
        var originalUICulture = System.Globalization.CultureInfo.CurrentUICulture;
        try
        {
            await operation();
        }
        finally
        {
            System.Globalization.CultureInfo.CurrentCulture = originalCulture;
            System.Globalization.CultureInfo.CurrentUICulture = originalUICulture;
        }
    }

    /// <summary>
    /// Runs <paramref name="operation"/> on the thread pool under the execution
    /// context captured at the call site.
    /// </summary>
    /// <param name="operation">The operation to run.</param>
    /// <returns>The operation's result.</returns>
    public static async Task<T> WithExecutionContextAsync<T>(Func<Task<T>> operation)
    {
        // Capture returns null when context flow is suppressed.
        var executionContext = ExecutionContext.Capture();
        return await Task.Run(async () =>
        {
            // Restore rejects null, so skip it when nothing was captured.
            if (executionContext != null)
            {
                ExecutionContext.Restore(executionContext);
            }

            return await operation();
        });
    }
}
九、未来发展趋势
9.1 .NET 7+ 异步改进
csharp
public class DotNet7AsyncFeatures
{
    /// <summary>1. Static abstract interface members may return tasks.</summary>
    public interface IAsyncProcessor<T>
    {
        static abstract Task<T> ProcessAsync(T input);
    }

    /// <summary>2. PeriodicTimer: an awaitable timer, here ticking once per second.</summary>
    /// <param name="cancellationToken">Stops the loop; without it the loop would never end.</param>
    /// <exception cref="OperationCanceledException">The token was canceled while waiting for a tick.</exception>
    public async Task ModernTimerAsync(CancellationToken cancellationToken = default)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
        while (await timer.WaitForNextTickAsync(cancellationToken))
        {
            // work performed on every tick
            await ProcessTickAsync();
        }
    }

    /// <summary>3. An async method returning ValueTask.</summary>
    public async ValueTask<int> OptimizedValueTaskAsync()
    {
        await Task.Yield();
        return 42;
    }
}
9.2 异步与自定义AsyncMethodBuilder
csharp
// Opts this method into the pooling ValueTask builder. This is done with
// [AsyncMethodBuilder] and does not involve Source Generators. The type is
// fully qualified so the attribute binds to the framework's builder and not to
// a same-named placeholder type in this file.
[AsyncMethodBuilder(typeof(System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder<>))]
public async ValueTask<int> GeneratedAsyncMethod()
{
    await Task.Delay(100);
    return 42;
}
// Placeholder for a custom AsyncMethodBuilder intended to reduce allocations.
// NOTE(review): the body is empty, so this type cannot serve as a method builder as written.
// NOTE(review): System.Runtime.CompilerServices appears to ship a type with this same name — confirm which one is intended.
public class PoolingAsyncValueTaskMethodBuilder<TResult>
{
// A custom implementation that pools state machines would go here.
}
十、实用工具类汇总
10.1 异步工具类
csharp
/// <summary>Small helpers for retrying, timing out and delaying asynchronous work.</summary>
public static class AsyncHelpers
{
    /// <summary>
    /// Runs <paramref name="operation"/> up to <paramref name="maxRetries"/>
    /// times (always at least once), waiting between attempts.
    /// </summary>
    /// <param name="operation">The operation to attempt.</param>
    /// <param name="maxRetries">Total number of attempts.</param>
    /// <param name="delayProvider">Maps a failure to the wait before the next attempt; null means no wait.</param>
    /// <param name="cancellationToken">Stops further retries and aborts the wait between attempts.</param>
    /// <returns>The result of the first successful attempt.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
    /// <remarks>The exception from the final attempt is propagated unchanged.</remarks>
    public static async Task<T> RetryAsync<T>(
        Func<Task<T>> operation,
        int maxRetries,
        Func<Exception, TimeSpan> delayProvider,
        CancellationToken cancellationToken = default)
    {
        if (operation == null)
        {
            throw new ArgumentNullException(nameof(operation));
        }

        int totalAttempts = Math.Max(maxRetries, 1);
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception ex)
                when (attempt < totalAttempts && !cancellationToken.IsCancellationRequested)
            {
                // Not the last attempt: wait, then go round again. The final
                // attempt's exception bypasses this filter and reaches the caller.
                TimeSpan delay = delayProvider?.Invoke(ex) ?? TimeSpan.Zero;
                await Task.Delay(delay, cancellationToken);
            }
        }
    }

    /// <summary>Awaits <paramref name="task"/>, giving up after <paramref name="timeout"/>.</summary>
    /// <returns>The task's result if it finishes in time.</returns>
    /// <exception cref="TimeoutException">The task did not finish within the timeout.</exception>
    /// <remarks>The underlying task is not canceled on timeout; it keeps running.</remarks>
    public static async Task<T> WithTimeoutAsync<T>(
        Task<T> task,
        TimeSpan timeout)
    {
        // The source exists only to stop the delay timer early. The timeout
        // itself comes from Task.Delay, so it is armed exactly once.
        using var cts = new CancellationTokenSource();
        Task timeoutTask = Task.Delay(timeout, cts.Token);
        Task completedTask = await Task.WhenAny(task, timeoutTask);
        if (completedTask == timeoutTask)
        {
            throw new TimeoutException();
        }

        cts.Cancel(); // release the delay timer
        return await task;
    }

    /// <summary>Waits for <paramref name="delay"/>; returns immediately for zero or negative values.</summary>
    /// <param name="delay">How long to wait.</param>
    /// <param name="ct">Aborts the wait.</param>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="ct"/> was canceled; the exception carries that token.
    /// </exception>
    public static async Task DelayAsync(
        TimeSpan delay,
        CancellationToken ct = default)
    {
        if (delay <= TimeSpan.Zero)
        {
            return;
        }

        // Task.Delay already implements a cancelable timer and reports the
        // canceling token, which a hand-built Timer + TaskCompletionSource lost.
        await Task.Delay(delay, ct);
    }
}
总结补充
关键补充点:
- 深入理解状态机:async/await本质是编译器生成的复杂状态机
- 自定义Awaitable:可以实现自己的可等待类型
- 高级模式应用:批处理、缓存、调度等复杂场景
- 性能极致优化:避免分配、使用ValueTask、对象池
- 异常处理进阶:异常筛选器、聚合异常解包
- 并发集合集成:与Channel、管道等新API结合
- 跨平台兼容性:不同平台的异步行为差异
- 调试诊断工具:可视化、死锁检测等实用工具
- 依赖注入集成:异步工厂、异步初始化模式
- 未来发展趋势:.NET新版本异步改进
更多推荐


所有评论(0)