【vllm】AsyncLLM 协程学习
的用法是定义一个可以被并发执行的、非阻塞的生成任务。为什么用async?vLLM 的 API 服务器需要同时处理成百上千个用户的请求。如果使用同步模式,一个用户的请求在等待 GPU 计算时,整个服务器线程就会被阻塞,其他所有用户都得排队等着,这会造成巨大的性能瓶颈。async解决了什么问题?通过async和await,当一个请求在等待 GPU 时,服务器可以立即切换去处理另一个请求,或者从 GPU
async def generate(
engine: **AsyncLLM**,
request_id: str,
prompt: PromptType,
output_kind: RequestOutputKind,
max_tokens: int,
n: int = 1,
prompt_logprobs: int | None = None,
cancel_after: int | None = None,
) -> tuple[int, str]:
# Ensure generate doesn't complete too fast for cancellation test.
await asyncio.sleep(0.2)
count = 0
sampling_params = SamplingParams(
max_tokens=max_tokens,
ignore_eos=True,
output_kind=output_kind,
temperature=0.5,
seed=33,
n=n,
prompt_logprobs=prompt_logprobs,
)
async for out in engine.generate(
request_id=request_id, prompt=prompt, sampling_params=sampling_params
):
num_tokens = sum(len(output.token_ids) for output in out.outputs)
if output_kind == RequestOutputKind.DELTA:
count += num_tokens
else:
count = num_tokens
if cancel_after is not None and count >= cancel_after:
return count, request_id
await asyncio.sleep(0.0)
return count, request_id
这是什么用法? async 没有太明白?
当然!这是一个非常好的问题,async / await 是现代高性能网络编程(比如 vLLM 的 API 服务器)的基石。不理解它,就很难明白 vLLM 为什么能同时处理那么多请求。
简单来说,async 的核心思想是:“在等待的时候,去做点别的事。”
让我们用一个生活中的比喻和一个技术上的解释来彻底搞懂它。
一、生活中的比喻:一个厨师做两道菜
想象一个厨师(代表一个程序线程)需要做两道菜:一道是“烧水”,一道是“切菜”。
1. 同步 (Synchronous) 的厨师 - 没有 async
- 把水壶放到炉子上。
- 一直站在炉子前,死死地盯着水壶,直到水烧开。 (这个过程叫阻塞 (Blocking),因为厨师被“烧水”这件事完全卡住了,做不了别的事情。)
- 水烧开后,他才拿起刀开始切菜。
缺点:效率极低。在等水开的几分钟里,厨师完全被浪费了。如果这时来了新客人点菜,他根本没法处理。
2. 异步 (Asynchronous) 的厨师 - 使用 async
- 把水壶放到炉子上。(这是一个需要等待的任务,我们称之为**“异步任务”**)
- 他对自己说:“水在烧了,等它开的时候会通知我(比如水壶会响)。在这期间,我先去做别的事。”
- 于是他转身去切菜。
- 切菜切到一半,水壶响了(异步任务完成了)。
- 他暂停切菜,去处理开水。
- 处理完开水后,再回来继续切菜。
优点:效率极高。厨师的时间没有被浪费在“等待”上。他可以同时处理多个任务的“非等待”部分。如果来了新客人,他也能立刻响应。
二、技术上的解释和代码分析
在程序中,“等待”通常指的是 I/O 操作 (Input/Output),例如:
- 等待网络请求返回数据(比如 vLLM 等待 GPU 计算出结果)。
- 等待从硬盘读取文件。
- 等待数据库查询结果。
time.sleep()这样的人为等待。
async 和 await 就是 Python 中实现“异步厨师”模式的语法。
async def:声明一个函数是“异步函数”(一个“异步菜谱”)。它不会立即执行到底,而是返回一个协程 (Coroutine) 对象。你可以把它想象成一个可以被暂停和恢复的任务。await:告诉程序:“接下来这个操作需要等待,请在这里暂停当前任务,然后让程序去做点别的(运行其他异步任务)。等这个操作完成后,再回到这里继续执行。”- 事件循环 (Event Loop):它就是那个“总调度师”,负责管理所有的异步任务。当一个任务
await暂停时,事件循环就会去找另一个已经准备就绪的任务来执行。
现在我们来逐行分析你提供的 generate 函数:
// 1. 声明这是一个异步函数
async def generate(
engine: AsyncLLM,
# ... 其他参数 ...
) -> tuple[int, str]:
// 2. 第一次等待:人为暂停
// 这里告诉事件循环:“请暂停这个 generate 任务 0.2 秒”。
// 在这 0.2 秒里,CPU 不是空闲的!事件循环可以去处理其他用户的API请求、
// 或者检查其他已经完成的GPU计算任务。
// 这使得服务器在人为延迟时也能保持响应。
await asyncio.sleep(0.2)
count = 0
sampling_params = SamplingParams(...)
// 3. 最核心的等待:异步循环
// engine.generate(...) 本身就是一个异步生成器。它会向vLLM后端提交一个生成请求。
// GPU 计算 token 需要时间,这就是一个典型的 I/O 等待。
// async for 的意思是:
// a. 向 engine 请求下一个生成块 (output chunk)。
// b. await:等待 engine 返回结果。在等待 GPU 计算的这段时间里,此 generate 任务暂停,
// 事件循环会去执行其他任务(比如处理另一个用户的请求,或者从另一个已完成的请求中取结果)。
// c. 当 GPU 计算完一小块 token 并返回 (`out`) 时,事件循环会唤醒这个任务,
// 从这里继续执行循环体内的代码。
// d. 循环体执行完毕后,回到 a,请求下一个块,然后再次 await 等待...
async for out in engine.generate(
request_id=request_id, prompt=prompt, sampling_params=sampling_params
):
# ... (处理返回的 token) ...
num_tokens = sum(len(output.token_ids) for output in out.outputs)
# ...
if cancel_after is not None and count >= cancel_after:
return count, request_id
// 4. 又一次巧妙的等待
// await asyncio.sleep(0.0) 的意思是:“立即放弃CPU控制权,让事件循环处理一下其他任务,然后马上把我加回到待执行队列里”。
// 这是一个“合作”的信号,确保即使这个 for 循环非常快,也不会长时间霸占CPU,
// 从而让其他任务(比如检查是否有取消请求)有机会执行。它能提高服务器的整体响应性。
await asyncio.sleep(0.0)
return count, request_id
总结
async def generate(...) 的用法是定义一个可以被并发执行的、非阻塞的生成任务。
-
为什么用
async?
vLLM 的 API 服务器需要同时处理成百上千个用户的请求。如果使用同步模式,一个用户的请求在等待 GPU 计算时,整个服务器线程就会被阻塞,其他所有用户都得排队等着,这会造成巨大的性能瓶颈。 -
async解决了什么问题?
通过async和await,当一个请求在等待 GPU 时,服务器可以立即切换去处理另一个请求,或者从 GPU 接收另一个已经计算好的请求结果。这样,一个单独的 Python 进程就能高效地管理大量的并发请求,最大限度地利用 GPU 资源,实现极高的吞吐量。
所以,你看到的 async 和 await 是 vLLM 实现高性能、高并发 API 服务的核心魔法。它将“等待”的时间片利用起来,让单个程序看起来像在“同时”做很多事情。
太棒了,这个问题问到了 asyncio 编程模式的一个精髓!
await asyncio.sleep(0.0) 的意思不是“等待0秒”,因为这听起来毫无意义。
它的真正含义是:“立即放弃CPU控制权,让事件循环(Event Loop)去处理一下其他待办任务,然后马上把我重新排队,尽快恢复执行。”
这是一个“合作式多任务”中的“礼让”行为。
让我们用一个更生动的比喻来解释。
比喻:一场需要合作的会议
想象你正在参加一个重要的在线会议,主持人是 事件循环 (Event Loop),每个参会者都是一个 异步任务 (Coroutine)。
这个会议有一个规则:同一时间只能有一个人说话。
-
没有
await asyncio.sleep(0.0)的情况:一个“话痨”参会者
你有一个非常复杂的观点要阐述(一个计算量很大的for循环)。你开始滔滔不绝地讲,一讲就是5分钟,中间没有任何停顿。
结果: 在这5分钟里,其他所有参会者(其他任务,比如处理一个新的API请求、检查一个任务是否被取消)都只能干等着。主持人(事件循环)也只能看着你,无法切换给别人。整个会议(你的程序)都“卡”在你一个人身上了,显得非常不响应。 -
有
await asyncio.sleep(0.0)的情况:一个“有礼貌”的参会者
你还是有那个复杂的观点要讲。但你非常有礼貌,你每讲一小段(for循环的一次迭代),就停下来对主持人说一句:“我先停一下,看看大家有没有别的事要说?如果没有,我再继续。”
这句话,就是await asyncio.sleep(0.0)。
结果:- 你“礼让”了发言权,主持人(事件循环)立刻获得控制权。
- 主持人会快速检查一下:有没有其他人举手了?(比如,有没有新的网络请求进来?有没有其他任务已经完成了等待,可以继续执行了?)
- 如果有,就让那个人说几句(执行其他任务)。
- 处理完紧急的事情后,主持人发现你还在等待发言,于是立刻把控制权交还给你,让你继续讲下一小段。
因为这个“礼让”的动作非常快(sleep(0) 意味着你不想真的“睡觉”,只是想让出控制权),所以你自己的任务几乎没有被耽误,但整个会议(程序)却变得极其响应和高效。
技术上的解释
-
合作式多任务 (Cooperative Multitasking):
asyncio的调度模式是“合作式”的。一个任务一旦开始执行,它会一直执行下去,直到它主动说“我需要等待”或“我愿意让出控制权”。这个主动的行为就是通过await关键字实现的。 -
阻塞事件循环 (Blocking the Event Loop):如果一个异步任务中包含了长时间运行的、纯CPU计算的同步代码(例如一个没有任何
await的大循环),它就会“霸占”事件循环,导致整个异步程序停止响应。这是asyncio编程中的大忌。 -
await asyncio.sleep(0.0)的作用:它就是解决上述问题的最佳工具。通过在耗时循环中插入这行代码,你手动地、周期性地将控制权交还给事件循环,让它有机会去处理其他事情。这确保了程序的公平性 (Fairness) 和 响应性 (Responsiveness)。
回到你的 vLLM 代码
async for out in engine.generate(...):
# ... 一些快速的计算 ...
# 为什么在这里要加这个?
await asyncio.sleep(0.0)
在 engine.generate 的 async for 循环中,await 已经发生在 for 循环的每次迭代开始时(等待 GPU 返回下一个 token)。那么为什么在循环体内部还要加一个 await asyncio.sleep(0.0) 呢?
这是一种防御性编程,为了应对一种可能的边缘情况:
- 如果 GPU 生成 token 的速度非常非常快,并且循环体内部的代码执行得也很快,那么这个
async for循环可能会在极短的时间内连续执行很多次。 - 虽然每次
async for都会await,但如果结果已经准备好了,这个await的等待时间可能趋近于零。 - 在这种高速运转下,这个任务仍然可能在一段时间内“霸占”CPU,导致其他不相关的任务(比如处理一个外部的“取消请求”命令)得不到及时的响应。
通过在循环体末尾加上 await asyncio.sleep(0.0),开发者强制保证了在每一次循环迭代之后,事件循环都有机会去处理其他任何待办事项。这使得服务器在极端高负载下也能保持最高的响应性,比如能立即响应一个取消请求。
总结
await asyncio.sleep(0.0) 是一个优雅的技巧,它的核心目的不是“睡眠”,而是:
向 asyncio 的事件循环发出的一个信号:“我主动让出执行权,请你去处理一下其他任务,然后尽快回来继续执行我。”
它是在 asyncio 世界里保持程序流畅、不卡顿的关键工具之一。
问题2. 虽然每次 async for 都会 await,但如果结果已经准备好了,这个 await 的等待时间可能趋近于零。
在这种高速运转下,这个任务仍然可能在一段时间内“霸占”CPU,导致其他不相关的任务(比如处理一个外部的“取消请求”命令)得不到及时的响应。 执行await 也会让出CPU 吧,与await asyncio.sleep(0.0)是一样的吧?
这是一个非常好的追问,触及了 await 行为的更深层次细节。你的理解非常接近了,但有一个关键的细微差别。
执行
await也会让出 CPU 吧,与await asyncio.sleep(0.0)是一样的吧?
答案是:不完全一样。 await asyncio.sleep(0.0) 是无条件地让出 CPU,而普通的 await 是有条件地让出。
让我们来详细解释这个关键区别。
await 的真实工作机制
当你执行 await some_coroutine() 时,asyncio 的事件循环会做以下事情:
-
检查
some_coroutine是否已经完成?- 这个协程可能代表一个 I/O 操作。事件循环会检查这个 I/O 操作是否已经有结果了。
- 例如,
await reader.read(100),事件循环会看网络缓冲区里是否已经收到了100个字节。
-
根据检查结果做决定:
-
情况 A:协程尚未完成 (The common case)
- 这通常意味着程序需要等待外部资源(网络、磁盘、GPU 等)。
- 此时,
await才会真正地暂停当前任务,并将控制权交还给事件循环。 事件循环会去执行其他准备就绪的任务。这是我们通常理解的await的行为。
-
情况 B:协程已经完成或可以立即完成 (The subtle case)
- 如果事件循环检查发现,
await的目标操作已经有结果了(比如,数据已经在缓冲区里了,或者它是一个已经完成的Future对象),那么事件循环就没有必要进行任务切换。 - 它会立即从协程中提取结果,然后让当前任务继续往下执行,而不会暂停。
- 在这种情况下,
await只是一个获取结果的动作,并没有发生控制权的转移,当前任务仍然“霸占”着 CPU。
- 如果事件循环检查发现,
-
await asyncio.sleep(0.0) 的特殊性
await asyncio.sleep(0.0) 是一个非常特殊的协程。它的设计目的就是为了触发上述的“让出”行为。
当你 await asyncio.sleep(0.0) 时,事件循环的逻辑是:
- 事件循环收到一个“睡眠0秒”的指令。
- 它会将当前任务标记为“暂停”,并把它放入一个“下一轮就绪”的队列中。
- 然后,它无条件地立即寻找并执行其他准备就绪的任务。
- 在事件循环的下一次“滴答”(tick)中,它会立刻发现之前那个“睡眠0秒”的任务可以被唤醒了,于是恢复它的执行。
关键区别就在于:await asyncio.sleep(0.0) 强制事件循环进行一次调度,保证了控制权一定会交出去一次。而普通的 await 只有在它等待的目标确实需要时间时,才会交出控制权。
回到 vLLM 的例子
# 假设 GPU 性能极好,网络延迟极低
async for out in engine.generate(...): # 1. 这里的 await
# ... 一些快速的计算 ...
await asyncio.sleep(0.0) # 2. 这里的 await
-
async for中的await:- 当循环请求下一个
out时,它会awaitengine.generate的__anext__()方法。 engine会去检查 GPU 是否已经产出了新的 token。- 如果 GPU 产出 token 的速度比 Python 代码处理的速度还要快,那么很可能每次
engine去检查时,新的 token 已经准备好了。 - 在这种情况下,
async for里的await就可能属于我们上面说的 情况 B:它直接拿到结果,不暂停任务,不让出 CPU,直接进入循环体。
- 当循环请求下一个
-
可能出现的问题:
如果这种情况连续发生,比如连续 100 次迭代,async for中的await都发现结果已就绪,那么这个for循环就会像一个普通的同步for循环一样,连续、不间断地执行 100 次。在这段时间内,它就“霸占”了事件循环。如果此时有一个外部的“取消请求”到达,事件循环根本没有机会去处理它。 -
await asyncio.sleep(0.0)的作用:
通过在循环体末尾加入这行代码,我们强制实现了**“礼让”。即使async for中的await没有让出控制权,await asyncio.sleep(0.0)也保证了在每一次循环迭代结束时,控制权都会被交还给事件循环一次**。这就给了事件循环一个“喘息”的机会,去检查和处理其他任何待办任务(比如那个重要的“取消请求”)。
总结
await 类型 |
是否让出 CPU 控制权? | 用途 |
|---|---|---|
await some_io_operation() |
有条件的:仅当 some_io_operation 未完成时才让出。 |
等待一个耗时的 I/O 操作(网络、磁盘、GPU)。 |
await asyncio.sleep(0.0) |
无条件的:总是立即让出,并在下一轮事件循环中恢复。 | 强制进行任务调度,防止长时间运行的计算任务阻塞事件循环,提高程序响应性。 |
所以,await asyncio.sleep(0.0) 是一种更强的、更明确的“让出 CPU”的指令,它被用作一种保险机制,确保即使在最高速运转的情况下,asyncio 程序的合作精神和响应性也不会被破坏。
async for 上面什么用法。 另外让出CPU, CPU执行其他线程了,还是执行当前线程的其他任务?
好的,我们来逐一分解这两个非常关键的问题。
问题一:async for 是什么用法?
async for 用于遍历一个异步迭代器(Asynchronous Iterator)。
让我们先回顾一下普通的 for 循环:
my_list = [1, 2, 3]
for item in my_list:
print(item)
这里的 my_list 是一个普通的、已经存在于内存中的列表。for 循环只是依次取出里面的元素。
现在,想象一个场景:你要获取的数据不是一次性全部准备好的,而是需要等待才能逐个产生的。比如:
- 从一个很慢的网络连接中逐行读取数据。
- 从数据库中分批次(pagination)获取大量记录。
- 在我们的 vLLM 例子中,等待 GPU 生成下一个 token 或 token 块。
这些场景就是 async for 的用武之地。
async for 的工作流程:
- 请求下一个元素:循环开始时,它会调用异步迭代器的
__anext__()方法。这个方法会立即返回一个协程 (Coroutine),表示“我正在去获取下一个元素的路上”。 await等待:async for内部隐式地对这个协程执行了await。这意味着:- 当前任务(比如
generate函数)在这里暂停。 - 事件循环获得控制权,可以去执行其他不相关的异步任务。
- 当那个“获取下一个元素”的操作完成时(比如 GPU 算出了新的 token),事件循环会唤醒这个暂停的任务。
- 当前任务(比如
- 获取元素并执行循环体:任务被唤醒后,
await结束,__anext__()返回了真正的元素值(在 vLLM 中是out对象)。然后,循环体内的代码被执行。 - 重复:循环体执行完毕后,回到第 1 步,请求再下一个元素,直到异步迭代器耗尽(发出
StopAsyncIteration信号),循环结束。
总结 async for 的用法:
它是一个语法糖,让你能像写普通 for 循环一样,去优雅地处理那些需要在每次迭代之间进行 await 等待的序列。
在你的代码中:
async for out in engine.generate(...):
# ...
engine.generate(...) 返回的就是一个异步迭代器。每一次循环,它都在 await 等待 vLLM 引擎从 GPU 计算并返回下一个文本块 out。
问题二:让出 CPU,CPU 是去执行其他线程了,还是执行当前线程的其他任务?
这是一个非常精准且重要的问题!答案是:
执行当前线程中的其他任务。
让我们来澄清一下 线程 (Thread) 和 异步任务 (Task/Coroutine) 的区别,这是理解 asyncio 的关键。
1. 线程 (Threading) - 操作系统级别的并发
- 一个 Python 进程可以包含多个线程。
- 线程是由操作系统(OS)来调度的。操作系统可以在任何时候中断一个线程,把 CPU 时间片分配给另一个线程。这被称为抢占式多任务(Preemptive Multitasking)。
- 优点:可以真正利用多核 CPU,一个线程在做计算,另一个线程可以在另一个核上做计算。
- 缺点(在 Python 中):由于全局解释器锁(GIL)的存在,同一时间只有一个线程能执行 Python 字节码。所以 Python 的多线程对于 CPU 密集型任务效果不佳,但对于 I/O 密集型任务(因为等待 I/O 时会释放 GIL)仍然有效。线程切换的开销也相对较大。
2. 异步任务 (Asyncio Tasks) - 程序级别的并发
asyncio运行在一个单一的线程里。- 在这个单线程中,有一个事件循环 (Event Loop),它负责管理一堆的异步任务 (Tasks)。
- 任务的调度是合作式(Cooperative Multitasking)的。一个任务会一直运行,直到它自己通过
await关键字主动让出控制权。 - 当一个任务通过
await让出 CPU 时,事件循环(它自己也在这个线程里运行)就会获得控制权,然后它会查看任务列表,选择一个已经准备就绪的(比如等待的 I/O 操作完成了)任务来继续执行。
所以,当 await asyncio.sleep(0.0) 执行时:
- 当前的异步任务(比如处理某个用户请求的
generate函数)被暂停。 - 控制权交给了运行在同一个线程中的事件循环。
- 事件循环检查它的任务队列,可能会去做以下事情(全都在同一个线程里!):
- 启动一个新收到的 API 请求对应的异步任务。
- 继续执行另一个之前因为等待 GPU 而暂停,但现在 GPU 结果已经返回的任务。
- 处理网络连接、发送数据等底层 I/O。
- 当事件循环发现
sleep(0.0)对应的任务可以立即恢复时(或者在处理完其他紧急事务后),它就会回到原来的任务,从await之后的那行代码继续执行。
结论:
await 导致的“让出 CPU”不是线程切换。CPU 并没有离开当前的 Python 进程或线程。它只是在这个单一线程内部,从执行一个异步任务,切换到了执行该线程内的另一个异步任务。
这种模式的开销极小,因为不涉及操作系统层面的上下文切换,只是在程序内部改变了几个指针。这正是 asyncio 在处理成千上万个 I/O 密集型并发连接(如 vLLM 的 API 服务器)时,比多线程模型更高效、更轻量的原因。
更多推荐


所有评论(0)