AI大模型应用开发前置知识:Python 异步编程
异步编程的本质是单线程协作式并发,依靠await主动挂起任务,在I/O等待期间处理其他任务,完全避开Python GIL限制,专门为I/O密集型场景设计。在AI大模型开发中:大模型API调用 = I/O密集 → 异步提升10~100倍性能;RAG检索 = 多库并发查询 → 异步大幅降低延迟;流式对话 = 异步生成器 → 实现逐字输出;FastAPI服务 = 异步接口 → 单进程扛1000+并发;L
我们先从最根本的问题讲起:为什么在写AI大模型应用、Web接口、网络请求时,同步代码根本无法用,而异步能直接提升10~100倍性能?
在同步代码里,只要你调用一次大模型API,程序就会死死卡住,直到模型返回结果。这期间CPU完全空闲,什么任务都处理不了。如果同时来100个用户请求,同步代码需要依次执行,总耗时会达到分钟级;而异步代码可以在等待模型返回的空隙里,处理成百上千个任务,总耗时几乎不变。
这就是异步编程在AI时代不是加分项,而是生存项的根本原因。
一、同步与异步的本质区别(最细粒度讲解)
我们先看一段纯同步、阻塞代码,它会完整展示同步程序“卡死等待”的全过程,你能清晰看到每一步执行顺序。
import time
def sync_llm_request(prompt: str, delay: float):
"""
模拟同步调用大模型API
这是一个普通函数,执行时会阻塞整个线程
"""
print(f"【同步】开始处理Prompt:{prompt}")
print(f"【同步】等待模型返回,等待时间:{delay}秒")
# 阻塞等待:程序在此处完全停止,CPU空闲,无法处理任何其他任务
time.sleep(delay)
print(f"【同步】Prompt:{prompt} 处理完成")
# 主程序执行
if __name__ == "__main__":
start = time.time()
# 串行执行两个任务,必须等第一个完全结束,才能开始第二个
sync_llm_request("介绍Python", 2)
sync_llm_request("介绍异步编程", 2)
end = time.time()
print(f"【同步】总耗时:{end - start:.2f} 秒")
运行输出
【同步】开始处理Prompt:介绍Python
【同步】等待模型返回,等待时间:2秒
【同步】Prompt:介绍Python 处理完成
【同步】开始处理Prompt:介绍异步编程
【同步】等待模型返回,等待时间:2秒
【同步】Prompt:介绍异步编程 处理完成
【同步】总耗时:4.01 秒
逐行输出解释
-
程序进入第一个函数,打印开始信息;
-
执行
time.sleep(2),这是阻塞操作,操作系统会把当前线程挂起,程序不再执行任何代码; -
2秒后,线程恢复,打印完成信息,函数返回;
-
程序才会进入第二个函数,重复上述等待过程;
-
总耗时 = 2 + 2 = 4秒,效率极低。
在真实AI大模型调用中,这个等待时间通常是3~10秒,如果有100个用户请求,同步代码需要几百秒,完全无法上线使用。
接下来我们看异步非阻塞版本,这是高性能AI服务的基础。你会看到任务不再排队,而是同时开始、同时等待、同时完成,总耗时只等于最长单个任务的耗时。
import asyncio
import time
async def async_llm_request(prompt: str, delay: float):
"""
异步大模型请求函数
async def 声明这是一个协程函数
"""
print(f"【异步】开始处理Prompt:{prompt}")
print(f"【异步】等待模型返回,等待时间:{delay}秒")
# 异步非阻塞等待:挂起当前协程,让出CPU,事件循环去执行其他任务
await asyncio.sleep(delay)
print(f"【异步】Prompt:{prompt} 处理完成")
async def main():
"""
主协程:负责创建并调度所有异步任务
"""
# create_task:将协程包装为Task对象,立刻加入事件循环调度
task1 = asyncio.create_task(async_llm_request("介绍Python", 2))
task2 = asyncio.create_task(async_llm_request("介绍异步编程", 2))
# 等待两个任务全部执行完成
await task1
await task2
if __name__ == "__main__":
start = time.time()
# asyncio.run():创建事件循环,运行主协程,结束后关闭循环
asyncio.run(main())
end = time.time()
print(f"【异步】总耗时:{end - start:.2f} 秒")
运行输出
【异步】开始处理Prompt:介绍Python
【异步】等待模型返回,等待时间:2秒
【异步】开始处理Prompt:介绍异步编程
【异步】等待模型返回,等待时间:2秒
【异步】Prompt:介绍Python 处理完成
【异步】Prompt:介绍异步编程 处理完成
【异步】总耗时:2.01 秒
逐行输出解释
-
asyncio.run(main())会在底层创建一个事件循环,这是异步程序的“调度中心”; -
asyncio.create_task()会把协程包装成Task,并立即加入调度队列,而不是等待执行; -
执行task1,打印信息,遇到
await asyncio.sleep(2); -
关键点:
await会挂起当前协程,主动把CPU使用权让给事件循环,循环立刻切换执行task2; -
task2同样挂起后,事件循环没有可执行任务,进入轻量等待;
-
2秒后,两个任务同时就绪,依次恢复执行;
-
总耗时 ≈ 2秒,与单个任务耗时一致。
这就是异步编程的核心价值:在I/O等待时,CPU不空闲,能处理其他任务。
二、协程、Task、Future、事件循环(底层原理全拆解)
1. 协程(Coroutine)
协程是可以暂停、可以恢复执行的函数,由async def定义。
-
普通函数:调用 → 执行到结束 → 返回
-
协程:调用 → 执行 →
await暂停 → 恢复 → 结束
重要:协程不能直接运行!
下面这段代码是错误的,协程不会执行,只会返回一个协程对象:
async def test():
print("hello")
test() # 错误!不会打印任何内容,只会输出 <coroutine object test at 0x...>
正确运行协程的唯一方式:
asyncio.run(test())
2. 事件循环(Event Loop)
事件循环是异步程序的大脑,它的工作流程是固定且机械的:
-
初始化任务队列;
-
从队列取出一个任务执行;
-
遇到
await,把任务挂起,放回队列; -
取下一个任务执行;
-
重复直到所有任务完成;
-
关闭循环。
从Python 3.7开始,我们不需要手动创建循环,asyncio.run()会自动完成:创建循环 → 设置为当前循环 → 运行协程 → 关闭清理。
3. Task(任务)
Task是协程的包装器,是Future的子类。
它的唯一作用:让协程能够被事件循环调度,实现并发。
-
只
await协程:串行执行,无并发; -
用
create_task包装后await:并发执行。
4. Future
Future是一个**“结果占位符”**,代表一个还没有完成的异步操作的结果。
我们在业务代码中几乎不会直接使用Future,它主要用于框架底层开发,Task继承自Future,所以我们使用Task即可。
三、并发控制三大核心API:gather / wait / as_completed(全参数详解)
这三个函数是异步并发控制的核心,用途完全不同,在AI大模型批量调用中必须精准选择。
1. asyncio.gather(最常用:批量执行,按顺序返回)
gather会并发运行所有传入的任务,等待全部完成后,按传入顺序返回结果。
它支持异常捕获,支持批量传参,是AI批量调用大模型最常用的API。
import asyncio
async def llm_task(prompt: str, wait_time: float):
await asyncio.sleep(wait_time)
return f"Prompt: {prompt} | 处理完成"
async def main():
# 并发运行3个任务,按传入顺序返回结果
results = await asyncio.gather(
llm_task("你好", 1),
llm_task("Python", 2),
llm_task("异步编程", 1.5)
)
for res in results:
print(res)
asyncio.run(main())
输出
Prompt: 你好 | 处理完成
Prompt: Python | 处理完成
Prompt: 异步编程 | 处理完成
详细说明
-
结果顺序与任务传入顺序完全一致,不会因为执行快慢改变;
-
任意一个任务抛出异常,会直接导致整个gather抛出异常;
-
可以添加
return_exceptions=True,把异常当作结果返回,不中断其他任务。
2. asyncio.wait(灵活控制:等第一个/等全部/超时)
wait可以控制等待策略,适合需要优先处理最快完成任务的场景,比如大模型多路调用,谁先返回用谁。
它返回两个集合:done(已完成任务)、pending(未完成任务)。
import asyncio
async def task(seconds):
await asyncio.sleep(seconds)
return seconds
async def main():
tasks = [
asyncio.create_task(task(1)),
asyncio.create_task(task(2)),
asyncio.create_task(task(3))
]
# 等待第一个任务完成就返回
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print("已完成任务结果:", [t.result() for t in done])
print("未完成任务数量:", len(pending))
asyncio.run(main())
输出
已完成任务结果: [1]
未完成任务数量: 2
详细说明
-
return_when=asyncio.FIRST_COMPLETED:第一个任务完成就返回; -
return_when=asyncio.ALL_COMPLETED:等待全部完成(默认); -
return_when=asyncio.FIRST_EXCEPTION:第一个异常抛出就返回; -
支持
timeout参数,超时后直接返回,未完成任务保留在pending中。
3. asyncio.as_completed(按完成顺序返回:流式处理)
as_completed会返回一个迭代器,任务谁先完成,谁先输出,非常适合大模型批量调用、流式响应场景。
import asyncio
async def task(seconds):
await asyncio.sleep(seconds)
return f"耗时{seconds}秒的任务完成"
async def main():
tasks = [task(3), task(1), task(2)]
# 按完成顺序遍历结果
for coro in asyncio.as_completed(tasks):
result = await coro
print(result)
asyncio.run(main())
输出
耗时1秒的任务完成
耗时2秒的任务完成
耗时3秒的任务完成
详细说明
-
结果顺序完全由任务执行快慢决定;
-
适合实时展示大模型返回结果,不用等待全部任务完成;
-
是AI对话、流式输出、实时检索的核心API。
四、任务控制:超时、取消、异常捕获(工程级必备)
在AI大模型调用中,超时、任务取消、异常处理是必须做的防护措施,否则一个卡住的模型请求会拖垮整个服务。
1. 超时控制:asyncio.wait_for
强制给任务设置最大等待时间,超过时间直接抛出TimeoutError。
import asyncio
async def llm_long_task():
await asyncio.sleep(5) # 模拟模型长时间不返回
async def main():
try:
# 最多等待1秒,超时直接抛出异常
await asyncio.wait_for(llm_long_task(), timeout=1)
except asyncio.TimeoutError:
print("【错误】大模型请求超时,自动断开")
asyncio.run(main())
输出
【错误】大模型请求超时,自动断开
2. 任务取消:Task.cancel()
手动取消一个正在等待或执行的任务,比如用户主动断开对话。
import asyncio
async def llm_task():
await asyncio.sleep(5)
async def main():
task = asyncio.create_task(llm_task())
task.cancel() # 立即取消任务
try:
await task
except asyncio.CancelledError:
print("【信息】任务已被用户取消")
asyncio.run(main())
输出
【信息】任务已被用户取消
五、异步高级特性:上下文管理器、生成器、信号量、队列(生产环境核心)
这部分内容是AI大模型应用开发的核心技术,覆盖流式输出、并发限流、资源管理、任务调度全场景。
1. 异步上下文管理器(自动释放模型连接)
用于自动创建/销毁资源,比如大模型客户端、数据库连接、HTTP会话,避免资源泄漏。
定义方式:实现__aenter__和__aexit__两个异步方法。
import asyncio
class AsyncAIClient:
"""模拟异步大模型客户端"""
async def __aenter__(self):
print("【连接】建立大模型API连接")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("【断开】自动关闭连接,释放资源")
async def main():
# async with 自动管理生命周期
async with AsyncAIClient() as client:
print("【使用】调用模型进行推理")
asyncio.run(main())
输出
【连接】建立大模型API连接
【使用】调用模型进行推理
【断开】自动关闭连接,释放资源
2. 异步生成器(大模型流式输出:打字机效果)
ChatGPT、文心一言的逐字返回效果,底层完全依赖异步生成器。
它可以一边生成结果,一边返回,不用等待全部内容生成完毕。
import asyncio
async def llm_stream_response(prompt: str):
"""模拟大模型流式逐字返回"""
response = f"这是针对「{prompt}」的回复"
for char in response:
await asyncio.sleep(0.1) # 模拟逐字生成
yield char # 异步生成器:逐个返回字符
async def main():
async for char in llm_stream_response("异步编程"):
print(char, end="")
asyncio.run(main())
输出
这是针对「异步编程」的回复
(输出效果为逐字打印,模拟真实AI对话)
3. 信号量 Semaphore(控制并发数,防止打爆模型API)
大模型API都有并发限制,一次性发起100个请求会直接被限流封禁。
使用Semaphore可以严格控制最大并发数。
import asyncio
# 最大并发数 = 3
semaphore = asyncio.Semaphore(3)
async def llm_api_call(task_id: int):
# 进入信号量:超过3个任务会在这里等待
async with semaphore:
print(f"任务{task_id} 开始调用模型")
await asyncio.sleep(1)
print(f"任务{task_id} 调用完成")
async def main():
tasks = [asyncio.create_task(llm_api_call(i)) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
输出规律
-
每次只会同时运行3个任务;
-
完成一个,才会进入下一个;
-
完美保护API不被限流。
4. 异步队列(AI任务调度:生产-消费模型)
用于解耦任务提交与任务执行,是AI对话系统、RAG调度、批量推理的核心组件。
import asyncio
# 异步队列
queue = asyncio.Queue(maxsize=5)
# 生产者:添加对话任务
async def producer():
for i in range(5):
await queue.put(f"用户对话任务{i}")
print(f"添加任务:用户对话任务{i}")
# 消费者:处理模型推理
async def consumer():
while True:
task = await queue.get()
print(f"处理任务:{task}")
await asyncio.sleep(1)
queue.task_done()
async def main():
await asyncio.gather(producer(), consumer())
asyncio.run(main())
输出
添加任务:用户对话任务0
添加任务:用户对话任务1
添加任务:用户对话任务2
添加任务:用户对话任务3
添加任务:用户对话任务4
处理任务:用户对话任务0
处理任务:用户对话任务1
处理任务:用户对话任务2
处理任务:用户对话任务3
处理任务:用户对话任务4
六、同步阻塞代码异步化(解决90%的实际工程问题)
很多旧库(如requests、pymysql、部分旧版模型SDK)只提供同步接口,直接在异步函数中使用会阻塞整个事件循环,导致所有任务卡住。
解决方案:使用asyncio.to_thread()将同步代码放到线程池中运行,不阻塞事件循环。
import asyncio
import time
def sync_llm_infer(prompt: str):
"""同步阻塞的模型推理函数"""
time.sleep(1)
return f"同步推理结果:{prompt}"
async def async_llm_infer(prompt: str):
"""包装为异步函数"""
# 放到线程池运行,不阻塞事件循环
result = await asyncio.to_thread(sync_llm_infer, prompt)
return result
async def main():
res = await async_llm_infer("测试异步化")
print(res)
asyncio.run(main())
输出
同步推理结果:测试异步化
重要说明
-
CPU密集任务(本地大模型推理):使用
asyncio.ProcessPoolExecutor多进程; -
I/O密集任务(同步API、数据库):使用
asyncio.to_thread; -
绝对不要在异步函数中直接调用同步阻塞函数。
七、异步编程在AI大模型开发中的核心应用(完整可运行实战)
异步批量调用大模型API(生产级代码)
这段代码可以直接用于项目,支持并发控制、超时保护、批量请求,是AI应用最常用的代码模板。
import asyncio
import aiohttp
# 并发限制
SEMAPHORE = asyncio.Semaphore(5)
async def call_llm(session: aiohttp.ClientSession, prompt: str):
"""异步调用大模型API"""
async with SEMAPHORE:
try:
# 替换为真实大模型API地址
url = "https://httpbin.org/delay/1"
payload = {"prompt": prompt}
async with session.post(url, json=payload, timeout=5) as resp:
return await resp.json()
except asyncio.TimeoutError:
return {"error": "请求超时"}
except Exception as e:
return {"error": str(e)}
async def batch_llm_call(prompts: list):
"""批量调用大模型"""
async with aiohttp.ClientSession() as session:
tasks = [call_llm(session, p) for p in prompts]
return await asyncio.gather(*tasks)
if __name__ == "__main__":
prompts = [
"介绍RAG",
"介绍Agent",
"介绍微调",
"介绍向量数据库",
"介绍Prompt工程"
]
results = asyncio.run(batch_llm_call(prompts))
for i, res in enumerate(results):
print(f"任务{i} 结果:", res)
输出
所有任务并发执行,总耗时≈1秒,而同步执行需要5秒,性能提升5倍。并发量越大,异步优势越明显。
八、异步编程终极避坑指南(高级工程师必须熟记)
-
绝对不要在async函数中使用同步I/O(
requests/time.sleep/pymysql); -
绝对不要忘记await,协程永远不会执行;
-
绝对不要嵌套asyncio.run(),会直接崩溃;
-
绝对不要在异步中使用同步锁,会导致死锁;
-
连接池必须全局唯一(
aiohttp.ClientSession只创建一次); -
CPU密集任务必须用多进程,异步无法提升CPU密集性能;
-
所有异步任务必须捕获异常,否则会静默失败;
-
大模型API必须加信号量限流+超时控制。
九、全文总结(最核心的知识闭环)
异步编程的本质是单线程协作式并发,依靠await主动挂起任务,在I/O等待期间处理其他任务,完全避开Python GIL限制,专门为I/O密集型场景设计。
在AI大模型开发中:
-
大模型API调用 = I/O密集 → 异步提升10~100倍性能;
-
RAG检索 = 多库并发查询 → 异步大幅降低延迟;
-
流式对话 = 异步生成器 → 实现逐字输出;
-
FastAPI服务 = 异步接口 → 单进程扛1000+并发;
-
LangChain/LlamaIndex = 原生异步 → 必须使用异步接口。
只要你完全吃透本文所有内容,你就已经达到Python异步编程专家级水平,足以独立开发高并发、高性能、生产级的AI大模型应用系统。
更多推荐

所有评论(0)