
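Chapter 20: Asynchronous Programming

Asynchronous programming is an important paradigm in modern software development. It lets a program do other work while it waits on I/O (network requests, file reads and writes, and so on), which improves concurrency and resource utilization. Rust's async model is known for its zero-cost abstractions and high performance, and this chapter explores it in depth.

20.1 async/await Syntax

Rust provides an intuitive asynchronous syntax through the async and await keywords, letting developers write asynchronous code that reads almost like synchronous code while keeping performance high.

Async Functions and Async Blocks

In Rust, adding the async keyword to a function turns it into an async function. Calling an async function does not run its body immediately. Instead, the call returns a value whose type implements the Future trait, and the body only runs once that future is polled, for example by awaiting it.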
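The laziness described above can be observed without any runtime. The following is a minimal sketch using only the standard library; NoopWaker, mark_started, and lazy_demo are hypothetical names introduced for illustration, and the future is polled by hand instead of by an executor.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper: a waker that does nothing, enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical async function that records when its body actually runs.
async fn mark_started(started: &AtomicBool) -> u32 {
    started.store(true, Ordering::SeqCst);
    42
}

// Returns (body ran before polling?, body ran after polling?, result of the poll).
fn lazy_demo() -> (bool, bool, Option<u32>) {
    let started = AtomicBool::new(false);

    // Calling the async function only constructs the future.
    let future = pin!(mark_started(&started));
    let ran_before_poll = started.load(Ordering::SeqCst);

    // Polling is what drives the body forward.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let result = match future.poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    };

    (ran_before_poll, started.load(Ordering::SeqCst), result)
}
```

In real code an executor such as Tokio performs the polling; the manual poll here only makes the "nothing runs until polled" behavior visible.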

// A basic async function
async fn simple_async_function() -> u32 {
    println!("Starting async function");
    42
}

// An async method
struct AsyncProcessor;

impl AsyncProcessor {
    async fn process_data(&self, data: &str) -> String {
        format!("Processed: {}", data)
    }
}

// An async block
async fn async_block_example() {
    let result = async {
        println!("Inside async block");
        "Hello from async block"
    }.await;
    
    println!("{}", result);
}

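// Async methods in traits are supported natively since Rust 1.75 (with limitations, e.g. no dyn dispatch).
// For older compilers or trait objects, the async-trait crate can be used: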
// #[async_trait]
// trait DataFetcher {
//     async fn fetch(&self) -> Result<String, Box<dyn Error>>;
// }

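fn demonstrate_async_basics() {
    // An async function returns a Future; an executor is needed to run it
    let future = simple_async_function();
    // The anonymous future type does not implement Debug, so print its size instead
    println!("Async function returned a Future of {} bytes", std::mem::size_of_val(&future));
    
    // In a real application, a runtime drives these futures to completion
}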

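The await Keyword and Future Execution

The await keyword waits for an asynchronous operation to complete. It can only be used in an async context, and it suspends the current task until the Future completes, handing the thread back to the executor instead of blocking it.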
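To make the suspension point concrete, here is a small standard-library sketch. YieldOnce, two_steps, suspension_demo, and NoopWaker are hypothetical names for illustration; the outer future is polled by hand so the pause at the await is visible.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper: a waker that does nothing.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical future that is Pending on its first poll and Ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

async fn two_steps(log: &RefCell<Vec<&'static str>>) {
    log.borrow_mut().push("before await");
    YieldOnce { yielded: false }.await; // the task is suspended here once
    log.borrow_mut().push("after await");
}

// Returns (first poll pending?, log entries after first poll,
//          second poll ready?, log entries at the end).
fn suspension_demo() -> (bool, usize, bool, usize) {
    let log = RefCell::new(Vec::new());
    let mut future = pin!(two_steps(&log));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // First poll: runs up to the await, then returns Pending.
    let first = future.as_mut().poll(&mut cx);
    let entries_after_first = log.borrow().len();

    // Second poll: resumes right after the await and finishes.
    let second = future.as_mut().poll(&mut cx);
    let entries_at_end = log.borrow().len();

    (first.is_pending(), entries_after_first, second.is_ready(), entries_at_end)
}
```

Between the two polls the calling thread is free to do anything else, which is the property an executor exploits to interleave many tasks.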

use std::time::Duration;
use tokio::time::sleep;

// Simulate an asynchronous operation
async fn simulated_async_work(name: &str, duration_ms: u64) -> String {
    println!("[{}] Starting work", name);
    sleep(Duration::from_millis(duration_ms)).await;
    println!("[{}] Work completed", name);
    format!("Result from {}", name)
}

// Run async operations sequentially
async fn sequential_execution() {
    println!("=== Sequential Execution ===");
    
    let start = std::time::Instant::now();
    
    let result1 = simulated_async_work("Task 1", 500).await;
    println!("Got: {}", result1);
    
    let result2 = simulated_async_work("Task 2", 300).await;
    println!("Got: {}", result2);
    
    let result3 = simulated_async_work("Task 3", 200).await;
    println!("Got: {}", result3);
    
    let duration = start.elapsed();
    println!("Total time: {:?}", duration);
}

// Run concurrently with join
async fn concurrent_execution() {
    println!("\n=== Concurrent Execution ===");
    
    let start = std::time::Instant::now();
    
    // Use tokio::join! to drive several futures concurrently
    let (result1, result2, result3) = tokio::join!(
        simulated_async_work("Task A", 500),
        simulated_async_work("Task B", 300),
        simulated_async_work("Task C", 200)
    );
    
    println!("Got: {}, {}, {}", result1, result2, result3);
    
    let duration = start.elapsed();
    println!("Total time: {:?}", duration);
}

// Use select to race several async operations
async fn selective_execution() {
    println!("\n=== Selective Execution ===");
    
    let fast_task = simulated_async_work("Fast Task", 100);
    let slow_task = simulated_async_work("Slow Task", 500);
    // Futures returned by async fns are !Unpin, so pin them before polling by reference
    tokio::pin!(fast_task, slow_task);
    
    tokio::select! {
        result = &mut fast_task => {
            println!("Fast task finished first: {}", result);
        }
        result = &mut slow_task => {
            println!("Slow task finished first: {}", result);
        }
    }
    
    // The other future may still be unfinished: we could keep awaiting it,
    // or simply drop it, which cancels it
}

#[tokio::main]
async fn main() {
    sequential_execution().await;
    concurrent_execution().await;
    selective_execution().await;
}

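Error Handling in Async Code

Error handling in async code closely mirrors synchronous code: the ? operator works inside async functions. A few extra concerns come up, such as propagating errors from concurrently running futures, applying timeouts, and retrying failed operations.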
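One such consideration is the error type itself: multi-threaded executors generally require spawned futures to be Send, so a boxed error usually needs the Send + Sync bounds. The sketch below uses only the standard library; BoxError, parse_number, sum_numbers, require_send, run_sum, and NoopWaker are hypothetical names, and a single manual poll stands in for a runtime because these futures never actually wait.

```rust
use std::error::Error;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical alias: a boxed error that can cross thread boundaries.
type BoxError = Box<dyn Error + Send + Sync>;

async fn parse_number(text: &str) -> Result<i64, BoxError> {
    // `?` converts ParseIntError into BoxError and returns early on failure.
    Ok(text.trim().parse::<i64>()?)
}

async fn sum_numbers(a: &str, b: &str) -> Result<i64, BoxError> {
    let x = parse_number(a).await?;
    let y = parse_number(b).await?;
    Ok(x + y)
}

// Compile-time check only: the future must be Send to pass through here.
fn require_send<F: Future + Send>(future: F) -> F {
    future
}

// Hypothetical helper: a waker that does nothing.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// These futures never wait on anything, so one poll completes them.
fn run_sum(a: &str, b: &str) -> Result<i64, String> {
    let future = pin!(require_send(sum_numbers(a, b)));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let outcome = future.poll(&mut cx);
    match outcome {
        Poll::Ready(result) => result.map_err(|e| e.to_string()),
        Poll::Pending => Err("future unexpectedly pending".to_string()),
    }
}
```

Swapping BoxError for Box<dyn Error> (without Send + Sync) would make require_send reject the future at compile time, which is the same failure a spawn call on a multi-threaded runtime would report.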

use std::io;
use std::time::Duration;
use tokio::fs;
use tokio::time::sleep;

// Async error handling
async fn read_file_async(filename: &str) -> Result<String, io::Error> {
    let content = fs::read_to_string(filename).await?;
    Ok(content)
}

// Error handling when combining several async operations
async fn process_multiple_files() -> Result<(), Box<dyn std::error::Error>> {
    // Use ? to propagate errors inside an async function
    let file1 = read_file_async("file1.txt").await?;
    let file2 = read_file_async("file2.txt").await?;
    
    println!("File1: {}", file1);
    println!("File2: {}", file2);
    
    Ok(())
}

// Use try_join! to handle errors from concurrent operations
async fn process_files_concurrently() -> Result<(), Box<dyn std::error::Error>> {
    let (result1, result2) = tokio::try_join!(
        read_file_async("file1.txt"),
        read_file_async("file2.txt")
    )?;
    
    println!("File1: {}", result1);
    println!("File2: {}", result2);
    
    Ok(())
}

// Timeout handling
async fn operation_with_timeout() -> Result<String, Box<dyn std::error::Error>> {
    use tokio::time::timeout;
    
    let slow_operation = async {
        sleep(Duration::from_secs(10)).await;
        "Operation completed".to_string()
    };
    
    // Apply a 5-second timeout
    match timeout(Duration::from_secs(5), slow_operation).await {
        Ok(result) => Ok(result),
        Err(_) => Err("Operation timed out".into()),
    }
}

// Retry mechanism with exponential backoff
async fn retry_async_operation<F, T, E>(
    mut operation: F,
    max_retries: usize,
) -> Result<T, E>
where
    F: FnMut() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, E>> + Send>>,
    E: std::fmt::Debug,
{
    for attempt in 0..=max_retries {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(e) if attempt == max_retries => return Err(e),
            Err(e) => {
                println!("Attempt {} failed: {:?}, retrying...", attempt + 1, e);
                sleep(Duration::from_millis(100 * 2u64.pow(attempt as u32))).await;
            }
        }
    }
    unreachable!()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Error handling example
    match read_file_async("nonexistent.txt").await {
        Ok(content) => println!("File content: {}", content),
        Err(e) => println!("Error reading file: {}", e),
    }
    
    // Timeout example
    match operation_with_timeout().await {
        Ok(result) => println!("Success: {}", result),
        Err(e) => println!("Error: {}", e),
    }
    
    // Retry example
    let mut attempts = 0;
    let result = retry_async_operation(
        || {
            attempts += 1;
            Box::pin(async move {
                if attempts < 3 {
                    Err("Temporary failure")
                } else {
                    Ok("Success")
                }
            })
        },
        3,
    ).await;
    
    println!("Retry result: {:?}", result);
    
    Ok(())
}

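20.2 The Future Trait and Execution

Understanding the Future trait is key to mastering async Rust. A Future represents a value that may not be ready yet, and it is the foundation of Rust's asynchronous programming model.

The Future Trait in Detail

The Future trait defines the basic interface of an asynchronous computation: a single poll method that returns Poll::Ready(value) once the result is available, or Poll::Pending after arranging for the task to be woken when it can make progress.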
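As a minimal illustration of the poll contract, the sketch below implements a future that reports Pending a fixed number of times before completing, and drives it with a simple busy-polling loop. Countdown, poll_to_completion, and NoopWaker are hypothetical names; a real executor would sleep until the waker fires instead of polling in a loop.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical future: Pending `remaining` times, then Ready.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("liftoff")
        } else {
            self.remaining -= 1;
            // Signal that polling again will make progress.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hypothetical helper: a waker that does nothing.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Busy-polls the future until it is ready; returns the output and the poll count.
fn poll_to_completion<F: Future>(future: F) -> (F::Output, usize) {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return (output, polls);
        }
    }
}
```

With remaining set to 3, the future is polled four times: three Pending results followed by one Ready.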

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

// A custom Future implementation
struct SimpleTimer {
    duration: Duration,
    elapsed: bool,
}

impl SimpleTimer {
    fn new(duration: Duration) -> Self {
        Self {
            duration,
            elapsed: false,
        }
    }
}

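impl Future for SimpleTimer {
    type Output = &'static str;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.elapsed {
            Poll::Ready("Timer finished!")
        } else {
            // Simplification: mark the timer as elapsed right away, so that the next
            // poll (triggered by the wake below) returns Ready. A production timer
            // would set this flag only once the deadline has actually passed.
            self.elapsed = true;
            
            // Clone the waker so that a background task can wake this task later
            let waker = cx.waker().clone();
            let duration = self.duration;
            
            // Spawn a helper task that sleeps for the configured duration,
            // then calls waker.wake() to get this future polled again
            tokio::spawn(async move {
                tokio::time::sleep(duration).await;
                waker.wake();
            });
            
            Poll::Pending
        }
    }
}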

// Combining Futures: run `first`, then `second` if `first` succeeded.
// Both futures must be Unpin so they can be polled through Pin::new without unsafe code.
struct AndThen<F1, F2> {
    first: Option<F1>, // None once the first future has completed
    second: F2,
}

impl<F1, F2> AndThen<F1, F2> {
    fn new(first: F1, second: F2) -> Self {
        Self {
            first: Some(first),
            second,
        }
    }
}

impl<F1, F2, A, B, E> Future for AndThen<F1, F2>
where
    F1: Future<Output = Result<A, E>> + Unpin,
    F2: Future<Output = Result<B, E>> + Unpin,
{
    type Output = Result<B, E>;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        
        // Poll the first future until it completes
        if let Some(first) = this.first.as_mut() {
            match Pin::new(first).poll(cx) {
                // The first future succeeded: drop it and move on to the second
                Poll::Ready(Ok(_)) => this.first = None,
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => return Poll::Pending,
            }
        }
        
        // The first future is done: poll the second one
        Pin::new(&mut this.second).poll(cx)
    }
}

// Using the custom Future
async fn use_custom_future() {
    println!("Starting custom timer...");
    let result = SimpleTimer::new(Duration::from_secs(1)).await;
    println!("Custom timer: {}", result);
}

#[tokio::main]
async fn main() {
    use_custom_future().await;
}

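Pin and Memory Safety

Pin is a key concept in async Rust. It guarantees that a pinned value, such as a Future, will not be moved in memory (unless its type is Unpin), which is essential for self-referential structs.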
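In everyday code, pinning usually shows up in two forms: the pin! macro for a future that lives on the stack, and Box::pin for a future stored on the heap, often as a trait object. The following standard-library sketch shows both; poll_once, pinning_demo, and NoopWaker are hypothetical helpers, and the futures are trivial so that a single poll completes them.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper: a waker that does nothing.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a pinned future once; works for concrete futures and trait objects alike.
fn poll_once<F: Future + ?Sized>(future: Pin<&mut F>) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let outcome = future.poll(&mut cx);
    match outcome {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn pinning_demo() -> Vec<Option<u32>> {
    let mut results = Vec::new();

    // Stack pinning: pin! (Rust 1.68+) pins the future in the current scope.
    let mut on_stack = pin!(async { 7 });
    results.push(poll_once(on_stack.as_mut()));

    // Heap pinning: Box::pin allows storing different future types together.
    let mut boxed: Vec<Pin<Box<dyn Future<Output = u32>>>> = Vec::new();
    boxed.push(Box::pin(async { 1 }));
    boxed.push(Box::pin(async { 2 + 3 }));
    for future in boxed.iter_mut() {
        results.push(poll_once(future.as_mut()));
    }

    results
}
```

Neither form requires unsafe code, which is why they are generally preferable to Pin::new_unchecked when they fit.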

use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};

// A self-referential struct
struct SelfReferential {
    data: String,
    pointer_to_data: *const String,
    _pin: PhantomPinned, // Marks the type as !Unpin
}

impl SelfReferential {
    fn new(data: String) -> Self {
        Self {
            data,
            pointer_to_data: std::ptr::null(),
            _pin: PhantomPinned,
        }
    }
    
    fn init(self: Pin<&mut Self>) {
        let self_ptr: *const String = &self.data;
        unsafe {
            let this = self.get_unchecked_mut();
            this.pointer_to_data = self_ptr;
        }
    }
    
    fn get_data(self: Pin<&Self>) -> &str {
        unsafe {
            &*self.pointer_to_data
        }
    }
}

// Implement Future for SelfReferential
impl Future for SelfReferential {
    type Output = &'static str;
    
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready("SelfReferential future completed")
    }
}

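// Practical Pin examples
async fn pin_examples() {
    // Pinning on the stack
    let mut value = SelfReferential::new("hello".to_string());
    // SAFETY: `value` is never moved or accessed directly after this point
    let mut pinned_value = unsafe { Pin::new_unchecked(&mut value) };
    pinned_value.as_mut().init();
    
    println!("Data through pointer: {}", pinned_value.as_ref().get_data());
    
    // Pinning on the heap
    let mut boxed_value = Box::pin(SelfReferential::new("world".to_string()));
    boxed_value.as_mut().init();
    println!("Boxed data through pointer: {}", boxed_value.as_ref().get_data());
    
    // Using the pin! macro (Rust 1.68+)
    // let pinned_value = std::pin::pin!(SelfReferential::new("hello".to_string()));
}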

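// Working with Unpin types
async fn unpin_examples() {
    // Most types are Unpin and can be moved freely
    let mut x = 5;
    let _pinned_x = Pin::new(&mut x);
    
    // For Unpin types, Pin::new is safe and needs no unsafe code
    let mut y = 10;
    let _pinned_y = Pin::new(&mut y);
    
    // Pin places no restrictions on Unpin types: once the pinned borrows end,
    // the values can still be moved
    std::mem::swap(&mut x, &mut y);
}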

#[tokio::main]
async fn main() {
    pin_examples().await;
    unpin_examples().await;
}

Manually Implementing Complex Futures

Advanced use cases sometimes call for implementing more complex Futures by hand.
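The core pattern behind most hand-written futures is shared state guarded by a lock: the future stores its waker when it has to wait, and whoever produces the value calls wake afterwards. Here is a minimal standard-library sketch of that pattern; Shared, OneShot, complete, ThreadWaker, block_on, and oneshot_demo are hypothetical names, and a plain thread stands in for the event source.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

// State shared between the future and the producer.
struct Shared<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

// Hypothetical one-shot receive future.
struct OneShot<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T> Future for OneShot<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // Checking and registering happen under one lock, so no wakeup is lost.
        let mut shared = self.shared.lock().unwrap();
        if let Some(value) = shared.value.take() {
            Poll::Ready(value)
        } else {
            shared.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Producer side: store the value, then wake the waiting task (if any).
fn complete<T>(shared: &Arc<Mutex<Shared<T>>>, value: T) {
    let waker = {
        let mut guard = shared.lock().unwrap();
        guard.value = Some(value);
        guard.waker.take()
    };
    if let Some(waker) = waker {
        waker.wake();
    }
}

// Minimal executor: parks the thread until the waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

fn oneshot_demo() -> u32 {
    let shared = Arc::new(Mutex::new(Shared { value: None, waker: None }));
    let producer = Arc::clone(&shared);
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        complete(&producer, 99);
    });
    let value = block_on(OneShot { shared });
    handle.join().unwrap();
    value
}
```

A multi-receiver channel builds on the same idea, with a queue of values and a list of wakers in place of the two Option fields.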

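use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::sleep;

// An async channel implementation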
struct AsyncChannel<T> {
    queue: Arc<Mutex<VecDeque<T>>>,
    wakers: Arc<Mutex<Vec<std::task::Waker>>>,
}

impl<T> AsyncChannel<T> {
    fn new() -> Self {
        Self {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            wakers: Arc::new(Mutex::new(Vec::new())),
        }
    }
    
    fn send(&self, value: T) {
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(value);
        
        // Wake one waiting receiver
        let mut wakers = self.wakers.lock().unwrap();
        if let Some(waker) = wakers.pop() {
            waker.wake();
        }
    }
    
    fn try_receive(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        queue.pop_front()
    }
}

// The receive Future
struct ReceiveFuture<T> {
    channel: Arc<AsyncChannel<T>>,
}

impl<T> ReceiveFuture<T> {
    // Takes the shared channel handle so the future can own a clone of it
    fn new(channel: &Arc<AsyncChannel<T>>) -> Self {
        Self {
            channel: Arc::clone(channel),
        }
    }
}

impl<T> Future for ReceiveFuture<T> {
    type Output = T;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Fast path: a value is already queued
        if let Some(value) = self.channel.try_receive() {
            return Poll::Ready(value);
        }
        
        // No data yet: register the waker, then check again so that a send
        // racing with the registration is not missed
        self.channel.wakers.lock().unwrap().push(cx.waker().clone());
        match self.channel.try_receive() {
            Some(value) => Poll::Ready(value),
            None => Poll::Pending,
        }
    }
}

// Using the custom channel
async fn use_async_channel() {
    let channel = Arc::new(AsyncChannel::new());
    let channel_clone = Arc::clone(&channel);
    
    // Producer task
    let producer = tokio::spawn(async move {
        for i in 0..5 {
            channel_clone.send(i);
            println!("Sent: {}", i);
            sleep(Duration::from_millis(100)).await;
        }
    });
    
    // Consumer task
    let consumer = tokio::spawn(async move {
        for _ in 0..5 {
            let receive_future = ReceiveFuture::new(&channel);
            let value = receive_future.await;
            println!("Received: {}", value);
        }
    });
    
    let _ = tokio::join!(producer, consumer);
}

#[tokio::main]
async fn main() {
    use_async_channel().await;
}

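20.3 Choosing an Async Runtime

Rust's standard library provides only the Future trait and related building blocks; actually executing futures requires a third-party runtime. The best-known runtimes are Tokio and async-std.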
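To show what a runtime contributes at its very core, the sketch below builds a tiny single-future executor from standard-library pieces alone. block_on, ThreadWaker, add, and compute are hypothetical names; a real runtime adds task scheduling, an I/O reactor, and timers on top of this loop.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// A waker that unparks the thread running the executor.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Drives one future to completion on the current thread.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until some event source calls wake().
            Poll::Pending => thread::park(),
        }
    }
}

async fn add(a: u32, b: u32) -> u32 {
    a + b
}

async fn compute() -> u32 {
    let x = add(1, 2).await;
    let y = add(x, 10).await;
    x * y
}
```

Calling block_on(compute()) runs the whole async call chain on the calling thread. Because nothing here ever returns Pending, the park branch is not exercised; it matters as soon as a future waits on a timer, socket, or another thread.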

The Tokio Runtime

Tokio is one of the most full-featured async runtimes, providing async I/O, timers, synchronization primitives, and more.

use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::time::sleep;

// Tokio TCP server example
async fn tcp_server() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Server listening on 127.0.0.1:8080");
    
    // Shared state
    let connection_count = Arc::new(Mutex::new(0u32));
    
    loop {
        let (socket, addr) = listener.accept().await?;
        println!("Accepted connection from: {}", addr);
        
        let count = Arc::clone(&connection_count);
        tokio::spawn(async move {
            // Increment the connection count
            let mut guard = count.lock().await;
            *guard += 1;
            println!("Active connections: {}", *guard);
            drop(guard); // Release the lock as early as possible
            
            if let Err(e) = handle_client(socket).await {
                eprintln!("Error handling client: {}", e);
            }
            
            // Connection closed: decrement the count
            let mut guard = count.lock().await;
            *guard -= 1;
            println!("Active connections: {}", *guard);
        });
    }
}

async fn handle_client(mut socket: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
    let mut buf = [0; 1024];
    
    loop {
        let n = socket.read(&mut buf).await?;
        if n == 0 {
            return Ok(());
        }
        
        // Echo the received data back
        socket.write_all(&buf[0..n]).await?;
    }
}

// Tokio channel example
async fn channel_example() {
    let (tx, mut rx) = mpsc::channel(32);
    
    // Producer task
    let producer = tokio::spawn(async move {
        for i in 0..10 {
            tx.send(i).await.expect("Failed to send");
            sleep(Duration::from_millis(100)).await;
        }
    });
    
    // Consumer task
    let consumer = tokio::spawn(async move {
        while let Some(value) = rx.recv().await {
            println!("Received: {}", value);
        }
    });
    
    tokio::join!(producer, consumer);
}

// Tokio synchronization primitives
async fn synchronization_examples() {
    // Mutex example
    let mutex = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    
    for i in 0..10 {
        let mutex_clone = Arc::clone(&mutex);
        let handle = tokio::spawn(async move {
            let mut guard = mutex_clone.lock().await;
            *guard += i;
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.await.unwrap();
    }
    
    let value = mutex.lock().await;
    println!("Final mutex value: {}", *value);
    
    // RwLock example
    let rwlock = Arc::new(RwLock::new(String::new()));
    let reader_lock = Arc::clone(&rwlock);
    
    let writer = tokio::spawn(async move {
        let mut guard = rwlock.write().await;
        *guard = "Hello, World!".to_string();
    });
    
    let reader = tokio::spawn(async move {
        // Give the writer time to finish (a sleep is a crude ordering mechanism, used here for brevity)
        sleep(Duration::from_millis(10)).await;
        let guard = reader_lock.read().await;
        println!("Read: {}", *guard);
    });
    
    tokio::join!(writer, reader);
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("=== Tokio Channel Example ===");
    channel_example().await;
    
    println!("\n=== Tokio Synchronization Example ===");
    synchronization_examples().await;
    
    // Note: the TCP server example is commented out because it would run forever
    // println!("\n=== Starting TCP Server ===");
    // tcp_server().await?;
    
    Ok(())
}

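The async-std Runtime

async-std offers an API modeled on the standard library, with asynchronous counterparts for the operations that would otherwise block.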

// Requires the async-std dependency in Cargo.toml:
// async-std = "1.12"

use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;

// async-std TCP server
async fn async_std_server() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8081").await?;
    println!("Async-std server listening on 127.0.0.1:8081");
    
    let mut incoming = listener.incoming();
    
    while let Some(stream) = incoming.next().await {
        let mut stream = stream?;
        task::spawn(async move {
            let mut buf = [0; 1024];
            
            loop {
                let n = stream.read(&mut buf).await.unwrap_or(0);
                if n == 0 {
                    break;
                }
                stream.write_all(&buf[0..n]).await.unwrap();
            }
        });
    }
    
    Ok(())
}

// async-std file operations
async fn async_file_operations() -> Result<(), Box<dyn std::error::Error>> {
    use async_std::fs;
    
    // Write a file asynchronously
    let mut file = fs::File::create("test.txt").await?;
    file.write_all(b"Hello, async-std!").await?;
    
    // Read the file asynchronously
    let content = fs::read_to_string("test.txt").await?;
    println!("File content: {}", content);
    
    // Clean up
    fs::remove_file("test.txt").await?;
    
    Ok(())
}

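// Running async-std manually
fn run_async_std_examples() -> Result<(), Box<dyn std::error::Error>> {
    // The async_std::main attribute macro is simpler (it needs the "attributes" feature);
    // this shows the manual approach instead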
    task::block_on(async {
        println!("=== Async-std File Operations ===");
        async_file_operations().await?;
        
        // Note: the server example is commented out
        // println!("\n=== Starting Async-std TCP Server ===");
        // async_std_server().await?;
        
        Ok(())
    })
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    run_async_std_examples()
}

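Runtime Selection and Performance Considerations

Several factors matter when choosing a runtime and designing an async application, including ecosystem support, task-spawning overhead, channel throughput, and how the runtime is configured.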

use std::time::Instant;

// Performance benchmark
async fn benchmark_async_operations() {
    const OPERATIONS: usize = 10000;
    
    // Measure the overhead of spawning a large number of tasks
    let start = Instant::now();
    let mut handles = Vec::with_capacity(OPERATIONS);
    
    for i in 0..OPERATIONS {
        let handle = tokio::spawn(async move {
            i * 2
        });
        handles.push(handle);
    }
    
    for handle in handles {
        let _ = handle.await;
    }
    
    let duration = start.elapsed();
    println!("Spawned {} tasks in {:?}", OPERATIONS, duration);
    
    // Measure async channel throughput
    let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
    
    // Start the clock before spawning so that all of the work is measured
    let start = Instant::now();
    
    let producer = tokio::spawn(async move {
        for i in 0..OPERATIONS {
            tx.send(i).await.unwrap();
        }
    });
    
    let consumer = tokio::spawn(async move {
        let mut count = 0;
        while let Some(_) = rx.recv().await {
            count += 1;
        }
        count
    });
    
    let (_, received_count) = tokio::join!(producer, consumer);
    let duration = start.elapsed();
    
    // A JoinHandle yields a Result; unwrap it to get the message count
    println!("Processed {} messages in {:?}", received_count.unwrap(), duration);
}

// Runtime configuration
fn configure_runtime() {
    // Build a customized Tokio runtime
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4) // Number of worker threads
        .enable_io()       // Enable the I/O driver
        .enable_time()     // Enable the timer driver
        .thread_name("my-tokio-worker")
        .build()
        .unwrap();
    
    rt.block_on(async {
        println!("Running on custom configured runtime");
        benchmark_async_operations().await;
    });
}

// Selection guidance
fn runtime_selection_guidance() {
    println!("\n=== Runtime Selection Guidance ===");
    println!("Choose Tokio when you need:");
    println!("- Full-featured async ecosystem");
    println!("- High-performance networking");
    println!("- Advanced features like tracing, metrics");
    println!("- Production-grade reliability");
    
    println!("\nChoose async-std when you need:");
    println!("- Standard library-like API");
    println!("- Simplicity and ease of use");
    println!("- Good performance for general use cases");
    
    println!("\nConsider smol for:");
    println!("- Minimal dependencies");
    println!("- Embedded systems");
    println!("- Custom runtime needs");
}

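// A plain (non-async) main: configure_runtime builds its own runtime and calls block_on,
// which panics if invoked from inside another Tokio runtime
fn main() {
    println!("=== Runtime Performance Benchmark ===");
    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(benchmark_async_operations());
    
    println!("\n=== Custom Runtime Configuration ===");
    configure_runtime();
    
    runtime_selection_guidance();
}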

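20.4 Building High-Performance Async Applications

Building a high-performance async application requires careful attention to task scheduling, resource management, and error handling.

Task Scheduling and Load Balancing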

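use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
use tokio::time::sleep;

// A simple round-robin load balancer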
struct LoadBalancer<T> {
    workers: Vec<mpsc::Sender<T>>,
    next_worker: usize,
}

impl<T> LoadBalancer<T>
where
    T: std::fmt::Debug + Send + 'static, // Debug is required to print tasks with {:?}
{
    fn new(worker_count: usize, buffer_size: usize) -> Self {
        let mut workers = Vec::with_capacity(worker_count);
        
        for i in 0..worker_count {
            let (tx, mut rx) = mpsc::channel(buffer_size);
            
            // Spawn a worker task
            tokio::spawn(async move {
                while let Some(task) = rx.recv().await {
                    println!("Worker {} processing task: {:?}", i, task);
                    // Simulate a workload
                    sleep(Duration::from_millis(100)).await;
                }
            });
            
            workers.push(tx);
        }
        
        Self {
            workers,
            next_worker: 0,
        }
    }
    
    async fn dispatch(&mut self, task: T) -> Result<(), mpsc::error::SendError<T>> {
        let worker_index = self.next_worker;
        self.next_worker = (self.next_worker + 1) % self.workers.len();
        
        self.workers[worker_index].send(task).await
    }
}

// Work-stealing scheduler
struct WorkStealingScheduler<T> {
    queues: Vec<Mutex<VecDeque<T>>>,
}

impl<T> WorkStealingScheduler<T>
where
    T: Send + 'static,
{
    fn new(worker_count: usize) -> Self {
        let mut queues = Vec::with_capacity(worker_count);
        
        for _ in 0..worker_count {
            queues.push(Mutex::new(VecDeque::new()));
        }
        
        Self { queues }
    }
    
    async fn push(&self, worker_id: usize, task: T) {
        let mut queue = self.queues[worker_id].lock().await;
        queue.push_back(task);
    }
    
    async fn pop(&self, worker_id: usize) -> Option<T> {
        // Try our own queue first
        if let Some(task) = self.queues[worker_id].lock().await.pop_front() {
            return Some(task);
        }
        
        // Work stealing: try the other queues
        for i in 0..self.queues.len() {
            if i != worker_id {
                if let Some(task) = self.queues[i].lock().await.pop_back() { // Steal from the back
                    return Some(task);
                }
            }
        }
        
        None
    }
}

async fn demonstrate_scheduling() {
    println!("=== Load Balancer Example ===");
    
    let mut load_balancer = LoadBalancer::new(4, 10);
    
    for i in 0..20 {
        load_balancer.dispatch(format!("Task {}", i)).await.unwrap();
    }
    
    // Wait for the tasks to finish
    sleep(Duration::from_secs(3)).await;
    
    println!("\n=== Work Stealing Example ===");
    
    let scheduler = Arc::new(WorkStealingScheduler::new(4));
    
    // Spawn worker tasks
    let mut handles = Vec::new();
    for worker_id in 0..4 {
        let scheduler = Arc::clone(&scheduler);
        let handle = tokio::spawn(async move {
            for task_id in 0..5 {
                scheduler.push(worker_id, format!("Worker {} Task {}", worker_id, task_id)).await;
            }
            
            // Process tasks
            while let Some(task) = scheduler.pop(worker_id).await {
                println!("Worker {} got: {}", worker_id, task);
                sleep(Duration::from_millis(50)).await;
            }
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    demonstrate_scheduling().await;
}
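The stealing rule used by the scheduler above (take from the front of your own queue, steal from the back of someone else's) can be checked in isolation. This is a synchronous sketch with hypothetical names (next_task, stealing_demo), using plain VecDeque values without locks or tasks, so only the ordering logic is shown.

```rust
use std::collections::VecDeque;

// Picks the next task for `worker`: the front of its own queue first,
// otherwise the back of the first non-empty queue of another worker.
// Returns the task together with the index of the queue it came from.
fn next_task(queues: &mut [VecDeque<u32>], worker: usize) -> Option<(u32, usize)> {
    if let Some(task) = queues[worker].pop_front() {
        return Some((task, worker));
    }
    for victim in 0..queues.len() {
        if victim != worker {
            if let Some(task) = queues[victim].pop_back() {
                return Some((task, victim));
            }
        }
    }
    None
}

fn stealing_demo() -> Vec<Option<(u32, usize)>> {
    // Worker 0 has three tasks queued, worker 1 has none.
    let mut queues = vec![VecDeque::from(vec![1, 2, 3]), VecDeque::new()];
    vec![
        next_task(&mut queues, 0), // own queue, front: task 1
        next_task(&mut queues, 1), // empty, steals from the back of queue 0: task 3
        next_task(&mut queues, 1), // steals again: task 2
        next_task(&mut queues, 1), // nothing left anywhere
    ]
}
```

Stealing from the opposite end to the owner tends to reduce contention, since the owner and the thief rarely compete for the same task.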

Connection Pooling and Resource Management

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::sleep;

// A simple connection pool
struct ConnectionPool<T> {
    // std::sync::Mutex is sufficient here: the lock is never held across an .await,
    // and it can be taken from the synchronous Drop implementation
    connections: Mutex<Vec<T>>,
    semaphore: Semaphore,
    max_size: usize,
    current_size: AtomicUsize,
}

impl<T> ConnectionPool<T> {
    // No trait bounds on this impl, so Drop for PooledConnection can call it
    fn return_connection(&self, connection: T) {
        self.connections.lock().unwrap().push(connection);
    }
}

impl<T> ConnectionPool<T>
where
    T: Default + Send + 'static,
{
    fn new(max_size: usize) -> Self {
        Self {
            connections: Mutex::new(Vec::new()),
            semaphore: Semaphore::new(max_size),
            max_size,
            current_size: AtomicUsize::new(0),
        }
    }
    
    async fn get_connection(&self) -> PooledConnection<'_, T> {
        // Wait for an available permit
        let permit = self.semaphore.acquire().await.unwrap();
        
        // Try to reuse an idle connection from the pool
        let idle = self.connections.lock().unwrap().pop();
        if let Some(connection) = idle {
            return PooledConnection {
                connection: Some(connection),
                pool: self,
                _permit: permit,
            };
        }
        
        // The pool is empty: create a new connection
        let current = self.current_size.fetch_add(1, Ordering::SeqCst);
        if current < self.max_size {
            PooledConnection {
                connection: Some(T::default()),
                pool: self,
                _permit: permit,
            }
        } else {
            self.current_size.fetch_sub(1, Ordering::SeqCst);
            panic!("Connection pool exhausted");
        }
    }
}

// A pooled connection
struct PooledConnection<'a, T> {
    connection: Option<T>,
    pool: &'a ConnectionPool<T>,
    _permit: tokio::sync::SemaphorePermit<'a>,
}

impl<'a, T> std::ops::Deref for PooledConnection<'a, T> {
    type Target = T;
    
    fn deref(&self) -> &Self::Target {
        self.connection.as_ref().unwrap()
    }
}

impl<'a, T> std::ops::DerefMut for PooledConnection<'a, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.connection.as_mut().unwrap()
    }
}

impl<'a, T> Drop for PooledConnection<'a, T> {
    fn drop(&mut self) {
        if let Some(connection) = self.connection.take() {
            self.pool.return_connection(connection);
        }
    }
}

// Simulated database connection (Default is implemented by hand below, so only Debug is derived)
#[derive(Debug)]
struct DatabaseConnection {
    id: u32,
}

static CONNECTION_ID: AtomicUsize = AtomicUsize::new(1);

impl Default for DatabaseConnection {
    fn default() -> Self {
        let id = CONNECTION_ID.fetch_add(1, Ordering::SeqCst) as u32;
        Self { id }
    }
}

impl DatabaseConnection {
    async fn query(&self, sql: &str) -> String {
        sleep(Duration::from_millis(50)).await;
        format!("Result from connection {}: {}", self.id, sql)
    }
}

async fn demonstrate_connection_pool() {
    println!("=== Connection Pool Example ===");
    
    let pool = Arc::new(ConnectionPool::<DatabaseConnection>::new(3));
    
    let mut handles = Vec::new();
    
    // Spawn several tasks that use the pool concurrently
    for i in 0..10 {
        let pool = Arc::clone(&pool);
        let handle = tokio::spawn(async move {
            let connection = pool.get_connection().await;
            let result = connection.query(&format!("SELECT * FROM table_{}", i)).await;
            println!("Task {}: {}", i, result);
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.await.unwrap();
    }
    
    println!("All tasks completed");
}

#[tokio::main]
async fn main() {
    demonstrate_connection_pool().await;
}

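Backpressure and Flow Control

use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Semaphore};
use tokio::time::sleep;

// Producer-consumer pattern with backpressure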
struct BoundedProcessor<T> {
    processor_tx: mpsc::Sender<T>,
    capacity: usize,
}

impl<T> BoundedProcessor<T>
where
    T: Send + 'static,
{
    fn new<F>(capacity: usize, processor: F) -> Self
    where
        F: Fn(T) -> std::pin::Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static,
    {
        let (tx, mut rx) = mpsc::channel(capacity);
        
        // Spawn the processor task
        tokio::spawn(async move {
            while let Some(item) = rx.recv().await {
                processor(item).await;
            }
        });
        
        Self {
            processor_tx: tx,
            capacity,
        }
    }
    
    async fn process(&self, item: T) -> Result<(), mpsc::error::SendError<T>> {
        self.processor_tx.send(item).await
    }
    
    fn capacity(&self) -> usize {
        self.capacity
    }
}

// Adaptive rate limiter
struct AdaptiveRateLimiter {
    semaphore: Semaphore,
    // Atomics let adjust_capacity take &self, so it can be called through an Arc
    max_permits: AtomicUsize,
    current_permits: AtomicUsize,
}

impl AdaptiveRateLimiter {
    fn new(initial_permits: usize) -> Self {
        Self {
            semaphore: Semaphore::new(initial_permits),
            max_permits: AtomicUsize::new(initial_permits),
            current_permits: AtomicUsize::new(initial_permits),
        }
    }
    
    async fn acquire(&self) -> tokio::sync::SemaphorePermit<'_> {
        self.semaphore.acquire().await.unwrap()
    }
    
    fn adjust_capacity(&self, new_capacity: usize) {
        let current = self.current_permits.load(Ordering::SeqCst);
        
        if new_capacity > current {
            // Add permits
            let additional = new_capacity - current;
            self.semaphore.add_permits(additional);
            self.current_permits.store(new_capacity, Ordering::SeqCst);
        } else if new_capacity < current {
            // Removing permits is more involved and usually needs a finer-grained implementation
            println!("Reducing capacity from {} to {}", current, new_capacity);
        }
        
        self.max_permits.store(new_capacity, Ordering::SeqCst);
    }
}

// Monitoring and metrics collection
struct MetricsCollector {
    request_count: AtomicUsize,
    error_count: AtomicUsize,
    latency_sum: AtomicUsize,
    request_times: Mutex<Vec<Instant>>,
}

impl MetricsCollector {
    fn new() -> Self {
        Self {
            request_count: AtomicUsize::new(0),
            error_count: AtomicUsize::new(0),
            latency_sum: AtomicUsize::new(0),
            request_times: Mutex::new(Vec::new()),
        }
    }
    
    fn record_request(&self) -> Instant {
        self.request_count.fetch_add(1, Ordering::SeqCst);
        Instant::now()
    }
    
    fn record_completion(&self, start: Instant, success: bool) {
        let latency = start.elapsed().as_millis() as usize;
        self.latency_sum.fetch_add(latency, Ordering::SeqCst);
        
        if !success {
            self.error_count.fetch_add(1, Ordering::SeqCst);
        }
    }
    
    fn get_metrics(&self) -> Metrics {
        let total_requests = self.request_count.load(Ordering::SeqCst);
        let errors = self.error_count.load(Ordering::SeqCst);
        let total_latency = self.latency_sum.load(Ordering::SeqCst);
        
        let avg_latency = if total_requests > 0 {
            total_latency / total_requests
        } else {
            0
        };
        
        Metrics {
            total_requests,
            errors,
            avg_latency,
            error_rate: if total_requests > 0 {
                errors as f64 / total_requests as f64
            } else {
                0.0
            },
        }
    }
}

#[derive(Debug)]
struct Metrics {
    total_requests: usize,
    errors: usize,
    avg_latency: usize,
    error_rate: f64,
}

async fn demonstrate_backpressure() {
    println!("=== Backpressure and Flow Control ===");
    
    // Create a bounded processor
    let processor = Arc::new(BoundedProcessor::new(5, |item: usize| {
        Box::pin(async move {
            println!("Processing item: {}", item);
            sleep(Duration::from_millis(100)).await;
        })
    }));
    
    // Create the rate limiter
    let rate_limiter = Arc::new(AdaptiveRateLimiter::new(3));
    
    // Create the metrics collector
    let metrics = Arc::new(MetricsCollector::new());
    
    let mut handles = Vec::new();
    
    // Producer tasks
    for i in 0..20 {
        let processor = Arc::clone(&processor);
        let rate_limiter = Arc::clone(&rate_limiter);
        let metrics = Arc::clone(&metrics);
        
        let handle = tokio::spawn(async move {
            let start = metrics.record_request();
            
            // Acquire a rate-limiting permit
            let _permit = rate_limiter.acquire().await;
            
            match processor.process(i).await {
                Ok(()) => {
                    metrics.record_completion(start, true);
                }
                Err(e) => {
                    metrics.record_completion(start, false);
                    eprintln!("Failed to process item {}: {}", i, e);
                }
            }
        });
        
        handles.push(handle);
    }
    
    // Adjust capacity dynamically
    let rate_limiter_clone = Arc::clone(&rate_limiter);
    let adjust_handle = tokio::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        println!("Increasing capacity to 10");
        rate_limiter_clone.adjust_capacity(10);
    });
    
    // Wait for all tasks to finish
    for handle in handles {
        handle.await.unwrap();
    }
    adjust_handle.await.unwrap();
    
    // Print the metrics
    println!("Final metrics: {:?}", metrics.get_metrics());
}

#[tokio::main]
async fn main() {
    demonstrate_backpressure().await;
}
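The essence of backpressure in the example above is the bounded channel: once the buffer is full, the producer is held back until the consumer catches up. The same effect can be seen with the standard library's synchronous bounded channel, as in the sketch below. fill_until_full is a hypothetical name, and std::sync::mpsc::sync_channel is used here as a stand-in for the async tokio::sync::mpsc channel, not as a replacement for it.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Sends items into a bounded channel without a consumer draining it.
// Returns how many items were accepted and whether a "full" signal was seen.
fn fill_until_full(capacity: usize) -> (usize, bool) {
    // Keep the receiver alive so the channel is not reported as disconnected.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    let mut saw_full = false;

    for item in 0..(capacity as u32 + 2) {
        match tx.try_send(item) {
            Ok(()) => accepted += 1,
            // The buffer is full: this is the backpressure signal.
            Err(TrySendError::Full(_)) => {
                saw_full = true;
                break;
            }
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    (accepted, saw_full)
}
```

With a capacity of 3, exactly three items are accepted before the channel reports that it is full. In the async version, send(item).await suspends the producer task at that point instead of returning an error.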

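Summary

This chapter explored the main aspects of asynchronous programming in Rust:

  1. async/await syntax: an intuitive interface that lets developers write asynchronous code in a synchronous style
  2. The Future trait and execution: how Futures work, the memory-safety guarantees of Pin, and how to implement complex Futures by hand
  3. Choosing an async runtime: comparing mainstream runtimes such as Tokio and async-std, with their strengths and use cases
  4. Building high-performance async applications: advanced techniques such as task scheduling, connection pooling, and backpressure control

Rust's async model aims for zero-cost abstraction: async code is compiled into state machines, so it can match the performance of hand-written callback code while remaining more readable and maintainable.

Mastering asynchronous programming is key to building high-performance network services, concurrent data-processing systems, and other I/O-bound applications. Applied sensibly, the techniques and patterns in this chapter help you build async Rust applications that are both efficient and reliable.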
