第20章 异步编程

文章目录
第20章 异步编程
异步编程是现代软件开发中的重要范式,它允许程序在等待I/O操作(如网络请求、文件读写等)时执行其他任务,从而大大提高程序的并发性能和资源利用率。Rust的异步编程模型以其零成本抽象和高性能而闻名,本章将深入探讨Rust异步编程的各个方面。
20.1 async/await语法
Rust通过async和await关键字提供了直观的异步编程语法,让开发者能够以近乎同步的方式编写异步代码,同时保持高性能。
异步函数和异步块
在Rust中,任何函数都可以通过添加async关键字转变为异步函数。异步函数在调用时不会立即执行,而是返回一个实现了Future trait的类型。
// 基本的异步函数
async fn simple_async_function() -> u32 {
println!("Starting async function");
42
}
// 异步方法
struct AsyncProcessor;
impl AsyncProcessor {
async fn process_data(&self, data: &str) -> String {
format!("Processed: {}", data)
}
}
// 异步块
async fn async_block_example() {
let result = async {
println!("Inside async block");
"Hello from async block"
}.await;
println!("{}", result);
}
// 在trait中使用异步方法(需要async-trait crate或使用nightly特性)
// 使用async-trait crate的示例
// #[async_trait]
// trait DataFetcher {
// async fn fetch(&self) -> Result<String, Box<dyn Error>>;
// }
fn demonstrate_async_basics() {
// 异步函数返回Future,需要执行器来运行
let future = simple_async_function();
println!("Async function returned a Future: {:?}", future);
// 在实际应用中,我们会使用运行时来执行这些future
}
await关键字和Future执行
await关键字用于等待异步操作的完成。它只能在异步上下文中使用,并且会挂起当前任务直到Future完成。
use std::time::Duration;
use tokio::time::sleep;
// 模拟异步操作
async fn simulated_async_work(name: &str, duration_ms: u64) -> String {
println!("[{}] Starting work", name);
sleep(Duration::from_millis(duration_ms)).await;
println!("[{}] Work completed", name);
format!("Result from {}", name)
}
// 顺序执行异步操作
async fn sequential_execution() {
println!("=== Sequential Execution ===");
let start = std::time::Instant::now();
let result1 = simulated_async_work("Task 1", 500).await;
println!("Got: {}", result1);
let result2 = simulated_async_work("Task 2", 300).await;
println!("Got: {}", result2);
let result3 = simulated_async_work("Task 3", 200).await;
println!("Got: {}", result3);
let duration = start.elapsed();
println!("Total time: {:?}", duration);
}
// 使用join并发执行
async fn concurrent_execution() {
println!("\n=== Concurrent Execution ===");
let start = std::time::Instant::now();
// 使用tokio::join!并发执行多个future
let (result1, result2, result3) = tokio::join!(
simulated_async_work("Task A", 500),
simulated_async_work("Task B", 300),
simulated_async_work("Task C", 200)
);
println!("Got: {}, {}, {}", result1, result2, result3);
let duration = start.elapsed();
println!("Total time: {:?}", duration);
}
// 使用select处理多个异步操作
async fn selective_execution() {
println!("\n=== Selective Execution ===");
let mut fast_task = simulated_async_work("Fast Task", 100);
let mut slow_task = simulated_async_work("Slow Task", 500);
tokio::select! {
result = &mut fast_task => {
println!("Fast task finished first: {}", result);
}
result = &mut slow_task => {
println!("Slow task finished first: {}", result);
}
}
// 另一个任务可能还在运行,我们可以选择等待它
// 或者直接丢弃它(会被取消)
}
#[tokio::main]
async fn main() {
sequential_execution().await;
concurrent_execution().await;
selective_execution().await;
}
错误处理 in Async Code
异步代码中的错误处理与同步代码类似,但有一些特殊的考虑因素。
use std::io;
use tokio::fs;
// 异步错误处理
async fn read_file_async(filename: &str) -> Result<String, io::Error> {
let content = fs::read_to_string(filename).await?;
Ok(content)
}
// 组合多个异步操作的错误处理
async fn process_multiple_files() -> Result<(), Box<dyn std::error::Error>> {
// 使用?在异步函数中传播错误
let file1 = read_file_async("file1.txt").await?;
let file2 = read_file_async("file2.txt").await?;
println!("File1: {}", file1);
println!("File2: {}", file2);
Ok(())
}
// 使用try_join!处理并发操作的错误
async fn process_files_concurrently() -> Result<(), Box<dyn std::error::Error>> {
let (result1, result2) = tokio::try_join!(
read_file_async("file1.txt"),
read_file_async("file2.txt")
)?;
println!("File1: {}", result1);
println!("File2: {}", result2);
Ok(())
}
// 超时处理
async fn operation_with_timeout() -> Result<String, Box<dyn std::error::Error>> {
use tokio::time::timeout;
let slow_operation = async {
sleep(Duration::from_secs(10)).await;
"Operation completed".to_string()
};
// 设置5秒超时
match timeout(Duration::from_secs(5), slow_operation).await {
Ok(result) => Ok(result),
Err(_) => Err("Operation timed out".into()),
}
}
// 重试机制
async fn retry_async_operation<F, T, E>(
mut operation: F,
max_retries: usize,
) -> Result<T, E>
where
F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
E: std::fmt::Debug,
{
for attempt in 0..=max_retries {
match operation().await {
Ok(result) => return Ok(result),
Err(e) if attempt == max_retries => return Err(e),
Err(e) => {
println!("Attempt {} failed: {:?}, retrying...", attempt + 1, e);
sleep(Duration::from_millis(100 * 2u64.pow(attempt as u32))).await;
}
}
}
unreachable!()
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 错误处理示例
match read_file_async("nonexistent.txt").await {
Ok(content) => println!("File content: {}", content),
Err(e) => println!("Error reading file: {}", e),
}
// 超时示例
match operation_with_timeout().await {
Ok(result) => println!("Success: {}", result),
Err(e) => println!("Error: {}", e),
}
// 重试示例
let mut attempts = 0;
let result = retry_async_operation(
|| {
attempts += 1;
Box::pin(async move {
if attempts < 3 {
Err("Temporary failure")
} else {
Ok("Success")
}
})
},
3,
).await;
println!("Retry result: {:?}", result);
Ok(())
}
20.2 Future trait执行
理解Future trait是掌握Rust异步编程的关键。Future代表了一个可能还没有就绪的值,它构成了Rust异步编程的基础。
Future trait详解
Future trait定义了异步计算的基本接口。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
// 自定义Future实现
struct SimpleTimer {
duration: Duration,
elapsed: bool,
}
impl SimpleTimer {
fn new(duration: Duration) -> Self {
Self {
duration,
elapsed: false,
}
}
}
impl Future for SimpleTimer {
type Output = &'static str;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.elapsed {
Poll::Ready("Timer finished!")
} else {
// 在实际实现中,这里会设置一个waker来在时间到时唤醒任务
// 这里简化实现,直接标记为就绪
self.elapsed = true;
// 克隆waker以便在定时器完成后唤醒任务
let waker = cx.waker().clone();
// 在实际场景中,我们会在这里启动一个定时器
// 当定时器到期时调用waker.wake()
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
waker.wake();
});
Poll::Pending
}
}
}
// 组合Future
struct AndThen<F1, F2> {
first: F1,
second: Option<F2>,
}
impl<F1, F2> AndThen<F1, F2> {
fn new(first: F1, second: F2) -> Self {
Self {
first,
second: Some(second),
}
}
}
impl<F1, F2, A, B, E> Future for AndThen<F1, F2>
where
F1: Future<Output = Result<A, E>>,
F2: Future<Output = Result<B, E>>,
{
type Output = Result<B, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(second) = &mut self.second {
// 轮询第一个future
match Pin::new(&mut self.first).poll(cx) {
Poll::Ready(Ok(_)) => {
// 第一个future成功完成,开始轮询第二个
let second_future = self.second.take().unwrap();
Pin::new(&mut Some(second_future)).poll(cx)
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
} else {
// 这种情况不应该发生
panic!("AndThen future in invalid state");
}
}
}
// 使用自定义Future
async fn use_custom_future() {
println!("Starting custom timer...");
let result = SimpleTimer::new(Duration::from_secs(1)).await;
println!("Custom timer: {}", result);
}
#[tokio::main]
async fn main() {
use_custom_future().await;
}
Pin和内存安全
Pin是Rust异步编程中的关键概念,它确保了Future在内存中的位置不会改变,这对于自引用结构体至关重要。
use std::marker::PhantomPinned;
use std::pin::Pin;
// 自引用结构体示例
struct SelfReferential {
data: String,
pointer_to_data: *const String,
_pin: PhantomPinned, // 标记为!Unpin
}
impl SelfReferential {
fn new(data: String) -> Self {
Self {
data,
pointer_to_data: std::ptr::null(),
_pin: PhantomPinned,
}
}
fn init(self: Pin<&mut Self>) {
let self_ptr: *const String = &self.data;
unsafe {
let this = self.get_unchecked_mut();
this.pointer_to_data = self_ptr;
}
}
fn get_data(self: Pin<&Self>) -> &str {
unsafe {
&*self.pointer_to_data
}
}
}
// 为SelfReferential实现Future
impl Future for SelfReferential {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready("SelfReferential future completed")
}
}
// Pin的实用示例
async fn pin_examples() {
// 栈上固定
let mut value = SelfReferential::new("hello".to_string());
let mut pinned_value = unsafe { Pin::new_unchecked(&mut value) };
pinned_value.as_mut().init();
println!("Data through pointer: {}", pinned_value.as_ref().get_data());
// 堆上固定
let boxed_value = Box::pin(SelfReferential::new("world".to_string()));
// 使用pin!宏(Rust 1.68+)
// let pinned_value = pin!(SelfReferential::new("hello".to_string()));
}
// 处理Unpin类型
async fn unpin_examples() {
// 大多数类型都是Unpin的,可以自由移动
let mut x = 5;
let pinned_x = Pin::new(&mut x);
// Unpin类型可以被安全地移动
let mut y = 10;
let pinned_y = Pin::new(&mut y);
// 即使被pin住,Unpin类型仍然可以移动
std::mem::swap(&mut x, &mut y);
}
#[tokio::main]
async fn main() {
pin_examples().await;
unpin_examples().await;
}
手动实现复杂的Future
对于高级用例,我们可能需要手动实现复杂的Future。
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
// 异步通道实现
struct AsyncChannel<T> {
queue: Arc<Mutex<VecDeque<T>>>,
wakers: Arc<Mutex<Vec<std::task::Waker>>>,
}
impl<T> AsyncChannel<T> {
fn new() -> Self {
Self {
queue: Arc::new(Mutex::new(VecDeque::new())),
wakers: Arc::new(Mutex::new(Vec::new())),
}
}
fn send(&self, value: T) {
let mut queue = self.queue.lock().unwrap();
queue.push_back(value);
// 唤醒一个等待的接收者
let mut wakers = self.wakers.lock().unwrap();
if let Some(waker) = wakers.pop() {
waker.wake();
}
}
fn try_receive(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
queue.pop_front()
}
}
// 接收Future
struct ReceiveFuture<T> {
channel: Arc<AsyncChannel<T>>,
}
impl<T> ReceiveFuture<T> {
fn new(channel: &AsyncChannel<T>) -> Self {
Self {
channel: Arc::clone(&channel.queue),
}
}
}
impl<T> Future for ReceiveFuture<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 尝试立即接收
if let Some(value) = self.channel.try_receive() {
return Poll::Ready(value);
}
// 没有数据可用,注册waker
let mut wakers = self.channel.wakers.lock().unwrap();
wakers.push(cx.waker().clone());
Poll::Pending
}
}
// 使用自定义通道
async fn use_async_channel() {
let channel = Arc::new(AsyncChannel::new());
let channel_clone = Arc::clone(&channel);
// 生产者任务
let producer = tokio::spawn(async move {
for i in 0..5 {
channel_clone.send(i);
println!("Sent: {}", i);
sleep(Duration::from_millis(100)).await;
}
});
// 消费者任务
let consumer = tokio::spawn(async move {
for _ in 0..5 {
let receive_future = ReceiveFuture::new(&channel);
let value = receive_future.await;
println!("Received: {}", value);
}
});
let _ = tokio::join!(producer, consumer);
}
#[tokio::main]
async fn main() {
use_async_channel().await;
}
20.3 异步运行时选择
Rust的标准库只提供了Future trait,实际的异步执行需要依赖第三方运行时。目前最流行的运行时是tokio和async-std。
Tokio运行时
Tokio是功能最全面的异步运行时,提供了完整的异步I/O、定时器、同步原语等。
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, Mutex, RwLock};
use std::sync::Arc;
// Tokio TCP服务器示例
async fn tcp_server() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on 127.0.0.1:8080");
// 使用共享状态
let connection_count = Arc::new(Mutex::new(0u32));
loop {
let (socket, addr) = listener.accept().await?;
println!("Accepted connection from: {}", addr);
let count = Arc::clone(&connection_count);
tokio::spawn(async move {
// 更新连接计数
let mut guard = count.lock().await;
*guard += 1;
println!("Active connections: {}", *guard);
drop(guard); // 尽早释放锁
if let Err(e) = handle_client(socket).await {
eprintln!("Error handling client: {}", e);
}
// 连接关闭,减少计数
let mut guard = count.lock().await;
*guard -= 1;
println!("Active connections: {}", *guard);
});
}
}
async fn handle_client(mut socket: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
let mut buf = [0; 1024];
loop {
let n = socket.read(&mut buf).await?;
if n == 0 {
return Ok(());
}
// 回显接收到的数据
socket.write_all(&buf[0..n]).await?;
}
}
// Tokio通道示例
async fn channel_example() {
let (tx, mut rx) = mpsc::channel(32);
// 生产者任务
let producer = tokio::spawn(async move {
for i in 0..10 {
tx.send(i).await.expect("Failed to send");
sleep(Duration::from_millis(100)).await;
}
});
// 消费者任务
let consumer = tokio::spawn(async move {
while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
});
tokio::join!(producer, consumer);
}
// Tokio同步原语
async fn synchronization_examples() {
// Mutex示例
let mutex = Arc::new(Mutex::new(0));
let mut handles = vec![];
for i in 0..10 {
let mutex_clone = Arc::clone(&mutex);
let handle = tokio::spawn(async move {
let mut guard = mutex_clone.lock().await;
*guard += i;
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
let value = mutex.lock().await;
println!("Final mutex value: {}", *value);
// RwLock示例
let rwlock = Arc::new(RwLock::new(String::new()));
let reader_lock = Arc::clone(&rwlock);
let writer = tokio::spawn(async move {
let mut guard = rwlock.write().await;
*guard = "Hello, World!".to_string();
});
let reader = tokio::spawn(async move {
// 等待写入完成
sleep(Duration::from_millis(10)).await;
let guard = reader_lock.read().await;
println!("Read: {}", *guard);
});
tokio::join!(writer, reader);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== Tokio Channel Example ===");
channel_example().await;
println!("\n=== Tokio Synchronization Example ===");
synchronization_examples().await;
// 注意:TCP服务器示例被注释掉了,以免阻塞测试
// println!("\n=== Starting TCP Server ===");
// tcp_server().await?;
Ok(())
}
Async-std运行时
Async-std提供了与标准库类似的API,但所有阻塞操作都是异步的。
// 需要在Cargo.toml中添加async-std依赖
// async-std = "1.12"
use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;
// Async-std TCP服务器
async fn async_std_server() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8081").await?;
println!("Async-std server listening on 127.0.0.1:8081");
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let mut stream = stream?;
task::spawn(async move {
let mut buf = [0; 1024];
loop {
let n = stream.read(&mut buf).await.unwrap_or(0);
if n == 0 {
break;
}
stream.write_all(&buf[0..n]).await.unwrap();
}
});
}
Ok(())
}
// Async-std文件操作
async fn async_file_operations() -> Result<(), Box<dyn std::error::Error>> {
use async_std::fs;
// 异步写入文件
let mut file = fs::File::create("test.txt").await?;
file.write_all(b"Hello, async-std!").await?;
// 异步读取文件
let content = fs::read_to_string("test.txt").await?;
println!("File content: {}", content);
// 清理
fs::remove_file("test.txt").await?;
Ok(())
}
// 手动运行async-std
fn run_async_std_examples() -> Result<(), Box<dyn std::error::Error>> {
// 使用async_std::main属性宏更简单
// 这里展示手动运行的方式
task::block_on(async {
println!("=== Async-std File Operations ===");
async_file_operations().await?;
// 注意:服务器示例被注释掉了
// println!("\n=== Starting Async-std TCP Server ===");
// async_std_server().await?;
Ok(())
})
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
run_async_std_examples()
}
选择运行时和性能考虑
在选择运行时和设计异步应用时,需要考虑多个因素。
use std::time::Instant;
// 性能基准测试
async fn benchmark_async_operations() {
const OPERATIONS: usize = 10000;
// 测试生成大量任务的开销
let start = Instant::now();
let mut handles = Vec::with_capacity(OPERATIONS);
for i in 0..OPERATIONS {
let handle = tokio::spawn(async move {
i * 2
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
let duration = start.elapsed();
println!("Spawned {} tasks in {:?}", OPERATIONS, duration);
// 测试异步通道性能
let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
let producer = tokio::spawn(async move {
for i in 0..OPERATIONS {
tx.send(i).await.unwrap();
}
});
let consumer = tokio::spawn(async move {
let mut count = 0;
while let Some(_) = rx.recv().await {
count += 1;
}
count
});
let start = Instant::now();
let (_, received_count) = tokio::join!(producer, consumer);
let duration = start.elapsed();
println!("Processed {} messages in {:?}", received_count, duration);
}
// 运行时配置
fn configure_runtime() {
// Tokio运行时配置
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4) // 配置工作线程数
.enable_io() // 启用I/O
.enable_time() // 启用定时器
.thread_name("my-tokio-worker")
.build()
.unwrap();
rt.block_on(async {
println!("Running on custom configured runtime");
benchmark_async_operations().await;
});
}
// 选择策略
fn runtime_selection_guidance() {
println!("\n=== Runtime Selection Guidance ===");
println!("Choose Tokio when you need:");
println!("- Full-featured async ecosystem");
println!("- High-performance networking");
println!("- Advanced features like tracing, metrics");
println!("- Production-grade reliability");
println!("\nChoose async-std when you need:");
println!("- Standard library-like API");
println!("- Simplicity and ease of use");
println!("- Good performance for general use cases");
println!("\nConsider smol for:");
println!("- Minimal dependencies");
println!("- Embedded systems");
println!("- Custom runtime needs");
}
#[tokio::main]
async fn main() {
println!("=== Runtime Performance Benchmark ===");
benchmark_async_operations().await;
println!("\n=== Custom Runtime Configuration ===");
configure_runtime();
runtime_selection_guidance();
}
20.4 构建高性能异步应用
构建高性能异步应用需要仔细考虑任务调度、资源管理和错误处理等方面。
任务调度和负载均衡
use tokio::sync::mpsc;
use std::collections::VecDeque;
// 简单的负载均衡器
struct LoadBalancer<T> {
workers: Vec<mpsc::Sender<T>>,
next_worker: usize,
}
impl<T> LoadBalancer<T>
where
T: Send + 'static,
{
fn new(worker_count: usize, buffer_size: usize) -> Self {
let mut workers = Vec::with_capacity(worker_count);
for i in 0..worker_count {
let (tx, mut rx) = mpsc::channel(buffer_size);
// 启动工作线程
tokio::spawn(async move {
while let Some(task) = rx.recv().await {
println!("Worker {} processing task: {:?}", i, task);
// 模拟工作负载
sleep(Duration::from_millis(100)).await;
}
});
workers.push(tx);
}
Self {
workers,
next_worker: 0,
}
}
async fn dispatch(&mut self, task: T) -> Result<(), mpsc::error::SendError<T>> {
let worker_index = self.next_worker;
self.next_worker = (self.next_worker + 1) % self.workers.len();
self.workers[worker_index].send(task).await
}
}
// 工作窃取调度器
struct WorkStealingScheduler<T> {
queues: Vec<Mutex<VecDeque<T>>>,
}
impl<T> WorkStealingScheduler<T>
where
T: Send + 'static,
{
fn new(worker_count: usize) -> Self {
let mut queues = Vec::with_capacity(worker_count);
for _ in 0..worker_count {
queues.push(Mutex::new(VecDeque::new()));
}
Self { queues }
}
async fn push(&self, worker_id: usize, task: T) {
let mut queue = self.queues[worker_id].lock().await;
queue.push_back(task);
}
async fn pop(&self, worker_id: usize) -> Option<T> {
// 首先尝试自己的队列
if let Some(task) = self.queues[worker_id].lock().await.pop_front() {
return Some(task);
}
// 工作窃取:尝试其他队列
for i in 0..self.queues.len() {
if i != worker_id {
if let Some(task) = self.queues[i].lock().await.pop_back() { // 从尾部窃取
return Some(task);
}
}
}
None
}
}
async fn demonstrate_scheduling() {
println!("=== Load Balancer Example ===");
let mut load_balancer = LoadBalancer::new(4, 10);
for i in 0..20 {
load_balancer.dispatch(format!("Task {}", i)).await.unwrap();
}
// 等待任务完成
sleep(Duration::from_secs(3)).await;
println!("\n=== Work Stealing Example ===");
let scheduler = Arc::new(WorkStealingScheduler::new(4));
// 启动工作线程
let mut handles = Vec::new();
for worker_id in 0..4 {
let scheduler = Arc::clone(&scheduler);
let handle = tokio::spawn(async move {
for task_id in 0..5 {
scheduler.push(worker_id, format!("Worker {} Task {}", worker_id, task_id)).await;
}
// 处理任务
while let Some(task) = scheduler.pop(worker_id).await {
println!("Worker {} got: {}", worker_id, task);
sleep(Duration::from_millis(50)).await;
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::main]
async fn main() {
demonstrate_scheduling().await;
}
连接池和资源管理
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Semaphore;
// 简单的连接池
struct ConnectionPool<T> {
connections: Mutex<Vec<T>>,
semaphore: Semaphore,
max_size: usize,
current_size: AtomicUsize,
}
impl<T> ConnectionPool<T>
where
T: Default + Send + 'static,
{
fn new(max_size: usize) -> Self {
Self {
connections: Mutex::new(Vec::new()),
semaphore: Semaphore::new(max_size),
max_size,
current_size: AtomicUsize::new(0),
}
}
async fn get_connection(&self) -> PooledConnection<T> {
// 等待可用许可
let permit = self.semaphore.acquire().await.unwrap();
// 尝试从池中获取连接
if let Some(connection) = self.connections.lock().await.pop() {
return PooledConnection {
connection: Some(connection),
pool: self,
_permit: permit,
};
}
// 池为空,创建新连接
let current = self.current_size.fetch_add(1, Ordering::SeqCst);
if current < self.max_size {
PooledConnection {
connection: Some(T::default()),
pool: self,
_permit: permit,
}
} else {
self.current_size.fetch_sub(1, Ordering::SeqCst);
panic!("Connection pool exhausted");
}
}
fn return_connection(&self, connection: T) {
self.connections.lock().blocking_lock().push(connection);
}
}
// 池化连接
struct PooledConnection<'a, T> {
connection: Option<T>,
pool: &'a ConnectionPool<T>,
_permit: tokio::sync::SemaphorePermit<'a>,
}
impl<'a, T> std::ops::Deref for PooledConnection<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.connection.as_ref().unwrap()
}
}
impl<'a, T> std::ops::DerefMut for PooledConnection<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.connection.as_mut().unwrap()
}
}
impl<'a, T> Drop for PooledConnection<'a, T> {
fn drop(&mut self) {
if let Some(connection) = self.connection.take() {
self.pool.return_connection(connection);
}
}
}
// 模拟数据库连接
#[derive(Default, Debug)]
struct DatabaseConnection {
id: u32,
}
static CONNECTION_ID: AtomicUsize = AtomicUsize::new(1);
impl Default for DatabaseConnection {
fn default() -> Self {
let id = CONNECTION_ID.fetch_add(1, Ordering::SeqCst) as u32;
Self { id }
}
}
impl DatabaseConnection {
async fn query(&self, sql: &str) -> String {
sleep(Duration::from_millis(50)).await;
format!("Result from connection {}: {}", self.id, sql)
}
}
async fn demonstrate_connection_pool() {
println!("=== Connection Pool Example ===");
let pool = Arc::new(ConnectionPool::<DatabaseConnection>::new(3));
let mut handles = Vec::new();
// 启动多个任务同时使用连接池
for i in 0..10 {
let pool = Arc::clone(&pool);
let handle = tokio::spawn(async move {
let connection = pool.get_connection().await;
let result = connection.query(&format!("SELECT * FROM table_{}", i)).await;
println!("Task {}: {}", i, result);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
println!("All tasks completed");
}
#[tokio::main]
async fn main() {
demonstrate_connection_pool().await;
}
背压和流量控制
use tokio::sync::broadcast;
// 带背压的生产者-消费者模式
struct BoundedProcessor<T> {
processor_tx: mpsc::Sender<T>,
capacity: usize,
}
impl<T> BoundedProcessor<T>
where
T: Send + 'static,
{
fn new<F>(capacity: usize, processor: F) -> Self
where
F: Fn(T) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static,
{
let (tx, mut rx) = mpsc::channel(capacity);
// 启动处理器任务
tokio::spawn(async move {
while let Some(item) = rx.recv().await {
processor(item).await;
}
});
Self {
processor_tx: tx,
capacity,
}
}
async fn process(&self, item: T) -> Result<(), mpsc::error::SendError<T>> {
self.processor_tx.send(item).await
}
fn capacity(&self) -> usize {
self.capacity
}
}
// 自适应速率限制器
struct AdaptiveRateLimiter {
semaphore: Semaphore,
max_permits: usize,
current_permits: AtomicUsize,
}
impl AdaptiveRateLimiter {
fn new(initial_permits: usize) -> Self {
Self {
semaphore: Semaphore::new(initial_permits),
max_permits: initial_permits,
current_permits: AtomicUsize::new(initial_permits),
}
}
async fn acquire(&self) -> tokio::sync::SemaphorePermit {
self.semaphore.acquire().await.unwrap()
}
fn adjust_capacity(&self, new_capacity: usize) {
let current = self.current_permits.load(Ordering::SeqCst);
if new_capacity > current {
// 增加许可
let additional = new_capacity - current;
self.semaphore.add_permits(additional);
self.current_permits.store(new_capacity, Ordering::SeqCst);
} else if new_capacity < current {
// 减少许可(更复杂,通常需要更精细的实现)
println!("Reducing capacity from {} to {}", current, new_capacity);
}
self.max_permits = new_capacity;
}
}
// 监控和指标收集
struct MetricsCollector {
request_count: AtomicUsize,
error_count: AtomicUsize,
latency_sum: AtomicUsize,
request_times: Mutex<Vec<Instant>>,
}
impl MetricsCollector {
fn new() -> Self {
Self {
request_count: AtomicUsize::new(0),
error_count: AtomicUsize::new(0),
latency_sum: AtomicUsize::new(0),
request_times: Mutex::new(Vec::new()),
}
}
fn record_request(&self) -> Instant {
self.request_count.fetch_add(1, Ordering::SeqCst);
Instant::now()
}
fn record_completion(&self, start: Instant, success: bool) {
let latency = start.elapsed().as_millis() as usize;
self.latency_sum.fetch_add(latency, Ordering::SeqCst);
if !success {
self.error_count.fetch_add(1, Ordering::SeqCst);
}
}
fn get_metrics(&self) -> Metrics {
let total_requests = self.request_count.load(Ordering::SeqCst);
let errors = self.error_count.load(Ordering::SeqCst);
let total_latency = self.latency_sum.load(Ordering::SeqCst);
let avg_latency = if total_requests > 0 {
total_latency / total_requests
} else {
0
};
Metrics {
total_requests,
errors,
avg_latency,
error_rate: if total_requests > 0 {
errors as f64 / total_requests as f64
} else {
0.0
},
}
}
}
#[derive(Debug)]
struct Metrics {
total_requests: usize,
errors: usize,
avg_latency: usize,
error_rate: f64,
}
async fn demonstrate_backpressure() {
println!("=== Backpressure and Flow Control ===");
// 创建有界处理器
let processor = Arc::new(BoundedProcessor::new(5, |item: usize| {
Box::pin(async move {
println!("Processing item: {}", item);
sleep(Duration::from_millis(100)).await;
})
}));
// 创建速率限制器
let rate_limiter = Arc::new(AdaptiveRateLimiter::new(3));
// 创建指标收集器
let metrics = Arc::new(MetricsCollector::new());
let mut handles = Vec::new();
// 生产者任务
for i in 0..20 {
let processor = Arc::clone(&processor);
let rate_limiter = Arc::clone(&rate_limiter);
let metrics = Arc::clone(&metrics);
let handle = tokio::spawn(async move {
let start = metrics.record_request();
// 获取速率限制许可
let _permit = rate_limiter.acquire().await;
match processor.process(i).await {
Ok(()) => {
metrics.record_completion(start, true);
}
Err(e) => {
metrics.record_completion(start, false);
eprintln!("Failed to process item {}: {}", i, e);
}
}
});
handles.push(handle);
}
// 动态调整容量
let rate_limiter_clone = Arc::clone(&rate_limiter);
let adjust_handle = tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
println!("Increasing capacity to 10");
rate_limiter_clone.adjust_capacity(10);
});
// 等待所有任务完成
for handle in handles {
handle.await.unwrap();
}
adjust_handle.await.unwrap();
// 输出指标
println!("Final metrics: {:?}", metrics.get_metrics());
}
#[tokio::main]
async fn main() {
demonstrate_backpressure().await;
}
总结
本章深入探讨了Rust异步编程的各个方面:
- async/await语法:直观的异步编程接口,让开发者能够以同步风格编写异步代码
- Future trait执行:理解Future的工作原理、Pin的内存安全保证以及手动实现复杂Future
- 异步运行时选择:比较Tokio和async-std等主流运行时,了解各自的优势和适用场景
- 高性能异步应用构建:任务调度、连接池、背压控制等高级技术
Rust的异步编程模型提供了零成本抽象的承诺,意味着异步代码在性能上可以与手动编写的回调代码相媲美,同时保持了更高的可读性和可维护性。
掌握异步编程是构建高性能网络服务、并发数据处理系统和其他I/O密集型应用的关键。通过合理运用本章介绍的技术和模式,你可以构建出既高效又可靠的异步Rust应用。
更多推荐



所有评论(0)