我们先从最根本的问题讲起:为什么在写AI大模型应用、Web接口、网络请求时,同步代码根本无法用,而异步能直接提升10~100倍性能?

在同步代码里,只要你调用一次大模型API,程序就会死死卡住,直到模型返回结果。这期间CPU完全空闲,什么任务都处理不了。如果同时来100个用户请求,同步代码需要依次执行,总耗时会达到分钟级;而异步代码可以在等待模型返回的空隙里,处理成百上千个任务,总耗时几乎不变。

这就是异步编程在AI时代不是加分项,而是生存项的根本原因。


一、同步与异步的本质区别(最细粒度讲解)

我们先看一段纯同步、阻塞代码,它会完整展示同步程序“卡死等待”的全过程,你能清晰看到每一步执行顺序。


import time

def sync_llm_request(prompt: str, delay: float):
    """
    模拟同步调用大模型API
    这是一个普通函数,执行时会阻塞整个线程
    """
    print(f"【同步】开始处理Prompt:{prompt}")
    print(f"【同步】等待模型返回,等待时间:{delay}秒")
    
    # 阻塞等待:程序在此处完全停止,CPU空闲,无法处理任何其他任务
    time.sleep(delay)
    
    print(f"【同步】Prompt:{prompt} 处理完成")

# 主程序执行
if __name__ == "__main__":
    start = time.time()
    
    # 串行执行两个任务,必须等第一个完全结束,才能开始第二个
    sync_llm_request("介绍Python", 2)
    sync_llm_request("介绍异步编程", 2)
    
    end = time.time()
    print(f"【同步】总耗时:{end - start:.2f} 秒")

运行输出


【同步】开始处理Prompt:介绍Python
【同步】等待模型返回,等待时间:2秒
【同步】Prompt:介绍Python 处理完成
【同步】开始处理Prompt:介绍异步编程
【同步】等待模型返回,等待时间:2秒
【同步】Prompt:介绍异步编程 处理完成
【同步】总耗时:4.01 秒

逐行输出解释

  1. 程序进入第一个函数,打印开始信息;

  2. 执行time.sleep(2),这是阻塞操作,操作系统会把当前线程挂起,程序不再执行任何代码;

  3. 2秒后,线程恢复,打印完成信息,函数返回;

  4. 程序才会进入第二个函数,重复上述等待过程;

  5. 总耗时 = 2 + 2 = 4秒,效率极低。

在真实AI大模型调用中,这个等待时间通常是3~10秒,如果有100个用户请求,同步代码需要几百秒,完全无法上线使用。


接下来我们看异步非阻塞版本,这是高性能AI服务的基础。你会看到任务不再排队,而是同时开始、同时等待、同时完成,总耗时只等于最长单个任务的耗时。


import asyncio
import time

async def async_llm_request(prompt: str, delay: float):
    """
    异步大模型请求函数
    async def 声明这是一个协程函数
    """
    print(f"【异步】开始处理Prompt:{prompt}")
    print(f"【异步】等待模型返回,等待时间:{delay}秒")
    
    # 异步非阻塞等待:挂起当前协程,让出CPU,事件循环去执行其他任务
    await asyncio.sleep(delay)
    
    print(f"【异步】Prompt:{prompt} 处理完成")

async def main():
    """
    主协程:负责创建并调度所有异步任务
    """
    # create_task:将协程包装为Task对象,立刻加入事件循环调度
    task1 = asyncio.create_task(async_llm_request("介绍Python", 2))
    task2 = asyncio.create_task(async_llm_request("介绍异步编程", 2))
    
    # 等待两个任务全部执行完成
    await task1
    await task2

if __name__ == "__main__":
    start = time.time()
    
    # asyncio.run():创建事件循环,运行主协程,结束后关闭循环
    asyncio.run(main())
    
    end = time.time()
    print(f"【异步】总耗时:{end - start:.2f} 秒")

运行输出


【异步】开始处理Prompt:介绍Python
【异步】等待模型返回,等待时间:2秒
【异步】开始处理Prompt:介绍异步编程
【异步】等待模型返回,等待时间:2秒
【异步】Prompt:介绍Python 处理完成
【异步】Prompt:介绍异步编程 处理完成
【异步】总耗时:2.01 秒

逐行输出解释

  1. asyncio.run(main()) 会在底层创建一个事件循环,这是异步程序的“调度中心”;

  2. asyncio.create_task() 会把协程包装成Task,并立即加入调度队列,而不是等待执行;

  3. 执行task1,打印信息,遇到await asyncio.sleep(2)

  4. 关键点await挂起当前协程,主动把CPU使用权让给事件循环,循环立刻切换执行task2;

  5. task2同样挂起后,事件循环没有可执行任务,进入轻量等待;

  6. 2秒后,两个任务同时就绪,依次恢复执行;

  7. 总耗时 ≈ 2秒,与单个任务耗时一致。

这就是异步编程的核心价值:在I/O等待时,CPU不空闲,能处理其他任务


二、协程、Task、Future、事件循环(底层原理全拆解)

1. 协程(Coroutine)

协程是可以暂停、可以恢复执行的函数,由async def定义。

  • 普通函数:调用 → 执行到结束 → 返回

  • 协程:调用 → 执行 → await暂停 → 恢复 → 结束

重要:协程不能直接运行!

下面这段代码是错误的,协程不会执行,只会返回一个协程对象:


async def test():
    print("hello")

test()  # 错误!不会打印任何内容,只会输出 <coroutine object test at 0x...>

正确运行协程的唯一方式:


asyncio.run(test())

2. 事件循环(Event Loop)

事件循环是异步程序的大脑,它的工作流程是固定且机械的:

  1. 初始化任务队列;

  2. 从队列取出一个任务执行;

  3. 遇到await,把任务挂起,放回队列;

  4. 取下一个任务执行;

  5. 重复直到所有任务完成;

  6. 关闭循环。

从Python 3.7开始,我们不需要手动创建循环,asyncio.run()会自动完成:创建循环 → 设置为当前循环 → 运行协程 → 关闭清理。

3. Task(任务)

Task是协程的包装器,是Future的子类。

它的唯一作用:让协程能够被事件循环调度,实现并发

  • await协程:串行执行,无并发;

  • create_task包装后await:并发执行。

4. Future

Future是一个**“结果占位符”**,代表一个还没有完成的异步操作的结果。

我们在业务代码中几乎不会直接使用Future,它主要用于框架底层开发,Task继承自Future,所以我们使用Task即可。


三、并发控制三大核心API:gather / wait / as_completed(全参数详解)

这三个函数是异步并发控制的核心,用途完全不同,在AI大模型批量调用中必须精准选择。

1. asyncio.gather(最常用:批量执行,按顺序返回)

gather会并发运行所有传入的任务,等待全部完成后,按传入顺序返回结果

它支持异常捕获,支持批量传参,是AI批量调用大模型最常用的API。


import asyncio

async def llm_task(prompt: str, wait_time: float):
    await asyncio.sleep(wait_time)
    return f"Prompt: {prompt} | 处理完成"

async def main():
    # 并发运行3个任务,按传入顺序返回结果
    results = await asyncio.gather(
        llm_task("你好", 1),
        llm_task("Python", 2),
        llm_task("异步编程", 1.5)
    )
    
    for res in results:
        print(res)

asyncio.run(main())

输出


Prompt: 你好 | 处理完成
Prompt: Python | 处理完成
Prompt: 异步编程 | 处理完成

详细说明

  • 结果顺序与任务传入顺序完全一致,不会因为执行快慢改变;

  • 任意一个任务抛出异常,会直接导致整个gather抛出异常;

  • 可以添加return_exceptions=True,把异常当作结果返回,不中断其他任务。

2. asyncio.wait(灵活控制:等第一个/等全部/超时)

wait可以控制等待策略,适合需要优先处理最快完成任务的场景,比如大模型多路调用,谁先返回用谁。

它返回两个集合:done(已完成任务)、pending(未完成任务)。


import asyncio

async def task(seconds):
    await asyncio.sleep(seconds)
    return seconds

async def main():
    tasks = [
        asyncio.create_task(task(1)),
        asyncio.create_task(task(2)),
        asyncio.create_task(task(3))
    ]
    
    # 等待第一个任务完成就返回
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    
    print("已完成任务结果:", [t.result() for t in done])
    print("未完成任务数量:", len(pending))

asyncio.run(main())

输出


已完成任务结果: [1]
未完成任务数量: 2

详细说明

  • return_when=asyncio.FIRST_COMPLETED:第一个任务完成就返回;

  • return_when=asyncio.ALL_COMPLETED:等待全部完成(默认);

  • return_when=asyncio.FIRST_EXCEPTION:第一个异常抛出就返回;

  • 支持timeout参数,超时后直接返回,未完成任务保留在pending中。

3. asyncio.as_completed(按完成顺序返回:流式处理)

as_completed会返回一个迭代器,任务谁先完成,谁先输出,非常适合大模型批量调用、流式响应场景。


import asyncio

async def task(seconds):
    await asyncio.sleep(seconds)
    return f"耗时{seconds}秒的任务完成"

async def main():
    tasks = [task(3), task(1), task(2)]
    
    # 按完成顺序遍历结果
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())

输出


耗时1秒的任务完成
耗时2秒的任务完成
耗时3秒的任务完成

详细说明

  • 结果顺序完全由任务执行快慢决定;

  • 适合实时展示大模型返回结果,不用等待全部任务完成;

  • 是AI对话、流式输出、实时检索的核心API。


四、任务控制:超时、取消、异常捕获(工程级必备)

在AI大模型调用中,超时、任务取消、异常处理是必须做的防护措施,否则一个卡住的模型请求会拖垮整个服务。

1. 超时控制:asyncio.wait_for

强制给任务设置最大等待时间,超过时间直接抛出TimeoutError


import asyncio

async def llm_long_task():
    await asyncio.sleep(5)  # 模拟模型长时间不返回

async def main():
    try:
        # 最多等待1秒,超时直接抛出异常
        await asyncio.wait_for(llm_long_task(), timeout=1)
    except asyncio.TimeoutError:
        print("【错误】大模型请求超时,自动断开")

asyncio.run(main())

输出


【错误】大模型请求超时,自动断开

2. 任务取消:Task.cancel()

手动取消一个正在等待或执行的任务,比如用户主动断开对话。


import asyncio

async def llm_task():
    await asyncio.sleep(5)

async def main():
    task = asyncio.create_task(llm_task())
    task.cancel()  # 立即取消任务
    
    try:
        await task
    except asyncio.CancelledError:
        print("【信息】任务已被用户取消")

asyncio.run(main())

输出


【信息】任务已被用户取消

五、异步高级特性:上下文管理器、生成器、信号量、队列(生产环境核心)

这部分内容是AI大模型应用开发的核心技术,覆盖流式输出、并发限流、资源管理、任务调度全场景。

1. 异步上下文管理器(自动释放模型连接)

用于自动创建/销毁资源,比如大模型客户端、数据库连接、HTTP会话,避免资源泄漏。

定义方式:实现__aenter____aexit__两个异步方法。


import asyncio

class AsyncAIClient:
    """模拟异步大模型客户端"""
    async def __aenter__(self):
        print("【连接】建立大模型API连接")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("【断开】自动关闭连接,释放资源")

async def main():
    # async with 自动管理生命周期
    async with AsyncAIClient() as client:
        print("【使用】调用模型进行推理")

asyncio.run(main())

输出


【连接】建立大模型API连接
【使用】调用模型进行推理
【断开】自动关闭连接,释放资源

2. 异步生成器(大模型流式输出:打字机效果)

ChatGPT、文心一言的逐字返回效果,底层完全依赖异步生成器。

它可以一边生成结果,一边返回,不用等待全部内容生成完毕。


import asyncio

async def llm_stream_response(prompt: str):
    """模拟大模型流式逐字返回"""
    response = f"这是针对「{prompt}」的回复"
    for char in response:
        await asyncio.sleep(0.1)  # 模拟逐字生成
        yield char  # 异步生成器:逐个返回字符

async def main():
    async for char in llm_stream_response("异步编程"):
        print(char, end="")

asyncio.run(main())

输出


这是针对「异步编程」的回复

(输出效果为逐字打印,模拟真实AI对话)

3. 信号量 Semaphore(控制并发数,防止打爆模型API)

大模型API都有并发限制,一次性发起100个请求会直接被限流封禁。

使用Semaphore可以严格控制最大并发数。


import asyncio

# 最大并发数 = 3
semaphore = asyncio.Semaphore(3)

async def llm_api_call(task_id: int):
    # 进入信号量:超过3个任务会在这里等待
    async with semaphore:
        print(f"任务{task_id} 开始调用模型")
        await asyncio.sleep(1)
        print(f"任务{task_id} 调用完成")

async def main():
    tasks = [asyncio.create_task(llm_api_call(i)) for i in range(10)]
    await asyncio.gather(*tasks)

asyncio.run(main())

输出规律

  • 每次只会同时运行3个任务

  • 完成一个,才会进入下一个;

  • 完美保护API不被限流。

4. 异步队列(AI任务调度:生产-消费模型)

用于解耦任务提交与任务执行,是AI对话系统、RAG调度、批量推理的核心组件。


import asyncio

# 异步队列
queue = asyncio.Queue(maxsize=5)

# 生产者:添加对话任务
async def producer():
    for i in range(5):
        await queue.put(f"用户对话任务{i}")
        print(f"添加任务:用户对话任务{i}")

# 消费者:处理模型推理
async def consumer():
    while True:
        task = await queue.get()
        print(f"处理任务:{task}")
        await asyncio.sleep(1)
        queue.task_done()

async def main():
    await asyncio.gather(producer(), consumer())

asyncio.run(main())

输出


添加任务:用户对话任务0
添加任务:用户对话任务1
添加任务:用户对话任务2
添加任务:用户对话任务3
添加任务:用户对话任务4
处理任务:用户对话任务0
处理任务:用户对话任务1
处理任务:用户对话任务2
处理任务:用户对话任务3
处理任务:用户对话任务4

六、同步阻塞代码异步化(解决90%的实际工程问题)

很多旧库(如requestspymysql、部分旧版模型SDK)只提供同步接口,直接在异步函数中使用会阻塞整个事件循环,导致所有任务卡住。

解决方案:使用asyncio.to_thread()将同步代码放到线程池中运行,不阻塞事件循环。


import asyncio
import time

def sync_llm_infer(prompt: str):
    """同步阻塞的模型推理函数"""
    time.sleep(1)
    return f"同步推理结果:{prompt}"

async def async_llm_infer(prompt: str):
    """包装为异步函数"""
    # 放到线程池运行,不阻塞事件循环
    result = await asyncio.to_thread(sync_llm_infer, prompt)
    return result

async def main():
    res = await async_llm_infer("测试异步化")
    print(res)

asyncio.run(main())

输出


同步推理结果:测试异步化

重要说明

  • CPU密集任务(本地大模型推理):使用asyncio.ProcessPoolExecutor多进程;

  • I/O密集任务(同步API、数据库):使用asyncio.to_thread

  • 绝对不要在异步函数中直接调用同步阻塞函数。


七、异步编程在AI大模型开发中的核心应用(完整可运行实战)

异步批量调用大模型API(生产级代码)

这段代码可以直接用于项目,支持并发控制、超时保护、批量请求,是AI应用最常用的代码模板。


import asyncio
import aiohttp

# 并发限制
SEMAPHORE = asyncio.Semaphore(5)

async def call_llm(session: aiohttp.ClientSession, prompt: str):
    """异步调用大模型API"""
    async with SEMAPHORE:
        try:
            # 替换为真实大模型API地址
            url = "https://httpbin.org/delay/1"
            payload = {"prompt": prompt}
            
            async with session.post(url, json=payload, timeout=5) as resp:
                return await resp.json()
        
        except asyncio.TimeoutError:
            return {"error": "请求超时"}
        except Exception as e:
            return {"error": str(e)}

async def batch_llm_call(prompts: list):
    """批量调用大模型"""
    async with aiohttp.ClientSession() as session:
        tasks = [call_llm(session, p) for p in prompts]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    prompts = [
        "介绍RAG",
        "介绍Agent",
        "介绍微调",
        "介绍向量数据库",
        "介绍Prompt工程"
    ]
    
    results = asyncio.run(batch_llm_call(prompts))
    
    for i, res in enumerate(results):
        print(f"任务{i} 结果:", res)

输出

所有任务并发执行,总耗时≈1秒,而同步执行需要5秒,性能提升5倍。并发量越大,异步优势越明显。


八、异步编程终极避坑指南(高级工程师必须熟记)

  1. 绝对不要在async函数中使用同步I/Orequests/time.sleep/pymysql);

  2. 绝对不要忘记await,协程永远不会执行;

  3. 绝对不要嵌套asyncio.run(),会直接崩溃;

  4. 绝对不要在异步中使用同步锁,会导致死锁;

  5. 连接池必须全局唯一aiohttp.ClientSession只创建一次);

  6. CPU密集任务必须用多进程,异步无法提升CPU密集性能;

  7. 所有异步任务必须捕获异常,否则会静默失败;

  8. 大模型API必须加信号量限流+超时控制


九、全文总结(最核心的知识闭环)

异步编程的本质是单线程协作式并发,依靠await主动挂起任务,在I/O等待期间处理其他任务,完全避开Python GIL限制,专门为I/O密集型场景设计

在AI大模型开发中:

  • 大模型API调用 = I/O密集 → 异步提升10~100倍性能;

  • RAG检索 = 多库并发查询 → 异步大幅降低延迟;

  • 流式对话 = 异步生成器 → 实现逐字输出;

  • FastAPI服务 = 异步接口 → 单进程扛1000+并发;

  • LangChain/LlamaIndex = 原生异步 → 必须使用异步接口。

只要你完全吃透本文所有内容,你就已经达到Python异步编程专家级水平,足以独立开发高并发、高性能、生产级的AI大模型应用系统。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐