大模型推理引擎vLLM(17): vllm/vllm/model_executor/layers/fused_moe/modular_kernel.py代码阅读笔记
·
文章目录
0 引言
把vllm/vllm/model_executor/layers/fused_moe/modular_kernel.py代码看了下,简单做一下笔记。
1 整体理解
#
# This file defines a set of base classes used to make MoE kernels more modular.
# The goal is to be able to utilize different communication mechanisms with
# any fused MoE kernel without needing to have combinatoric implementations.
#
# The fused moe kernels are broken down into the following components:
#
# [Router] → [Quantize-Dispatch] → [Permute-Experts-Unpermute] → [Combine]
#
# Each component will be independent of the others except for
# [Quantize-Dispatch] and `[Combine] (see below). The components can then be
# mixed and matched with so that DP+EP can be supported easily for multiple
# MoE kernel implementations.
#
# The following main classes are defined:
# * FusedMoEPrepareAndFinalize - an abstract base class for preparation of MoE
# inputs (e.g. quantization, distribution) and finalization of Moe outputs.
# The prepare method must take care of any needed quantization and the
# finalize method must apply weights and do the final reduction of the output.
# * FusedMoEPermuteExpertsUnpermute - an abstract base class for the main fused
# MoE operation. One important feature to note is that this class does not
# apply topk weights or reduce the final output.
# * FusedMoEModularKernel - an interface class that combines a
# FusedMoEPrepareAndFinalize and a FusedMoEPermuteExpertsUnpermute to
# provide the standard fused MoE kernel interface.
#
# [Quantize-Prepare] and [Finalize] functionality are bundled into a single
# class `FusedMoEPrepareAndFinalize` since they could use collective
# communication mechanisms that need to be consistent.
个人理解:
- 这里面其实就是说,这个文件就是为了让moe 的核函数更加模块化,目标是如果使用了
不同的通信方法,那么核函数也不需要再去做什么适配。 - 然后为了实现上面的目的,这里将MOE相当于进行了拆分,拆成了下面几个部分,
[Router] → [Quantize-Dispatch] → [Permute-Experts-Unpermute] → [Combine],然后这里面的每个模块基本上是独立的,其中Router看起来没在这个文件中实现,在其他代码中我记得deepseek_v3.py中有个什么gate()函数,Router应该是在那里做的。 - 然后这个文件实现了三个类:
- FusedMoEPrepareAndFinalize:这个类其实就相当于是做了前处理和后处,前处理包括什么量化和分发,EP通信,后处理应该就是按照topk_weights做加权求和。
- FusedMoEPermuteExpertsUnpermute:这个类相当于是做了最核心的计算,就是矩阵乘法+激活函数+矩阵乘法。
- FusedMoEModularKernel:这个类相当于就是一个接口类,上面的两个类是在这个接口类里面的。
基本上就是上面说的情况。
2 _moe_problem_size
def _moe_problem_size(
a1: torch.Tensor,
w1: torch.Tensor,
w2: torch.Tensor,
topk_ids: torch.Tensor,
) -> tuple[int, int, int, int, int]:
"""
Extract the MoE problem size from the given tensor arguments:
- a: The hidden states, input to the MoE layer.
- w1: The first set of expert weights.
- w2: The second set of expert weights.
- topk_ids: The topk ids.
Note: extracting the problem shape from the weight and activation tensors is
not obvious. It needs to be done this way specifically due to subtle issues
with particular kernels, e.g. the int4 kernels divide the trailing dimension
by two, so it's not "correct" to extract N or K from the trailing dimension
of w1 or w2. Similarly, some kernels transpose the weights, so this needs
to be kept in mind.
"""
assert w1.dim() == 3 and w2.dim() == 3
E, N, _ = w1.size()
K = w2.size(1)
if a1.dim() == 2:
# Make sure we are using the correct a1 (pre-permute).
assert topk_ids.size(0) == a1.size(0), \
f"{topk_ids.size(0)} != {a1.size(0)}"
M = a1.size(0)
else:
assert a1.dim() == 3
assert a1.size(0) == E, f"{a1.size(0)} == {E}"
M = a1.size(1) # This is max_num_tokens
assert topk_ids.dim() == 2
topk = topk_ids.size(1)
return E, M, N, K, topk
个人理解:这个函数就是根据输入的几个tensor,获取了下面几个变量应该是
- E:应该是专家个数
- M: 看着像是token数量
- N:MLP里面第一个矩阵乘法之后的维度
- K:MLP里面第二个矩阵乘法之后的维度,这里应该就是恢复到了hidden_size了吧。
topk:可能是每个token选中的专家个数,topk_ids里面应该是每个token选中的哪些专家,
3 class FusedMoEActivationFormat(Enum)
class FusedMoEActivationFormat(Enum):
"""
The standard activation format (num_tokens, hidden dim).
"""
Standard = "standard",
"""
The batched experts format (num experts, max tokens per expert, hidden dim)
"""
BatchedExperts = "batched_experts",
这就是枚举类,里面就是说了两种格式,
- 一种是就直接是所有token就正常的格式,
- 另一种是将token根据专家分拆了,
4 class FusedMoEPrepareAndFinalize(ABC)----负责前处理和后处理
# TODO: pass FusedMoEParallelConfig in as ctor parameter?
class FusedMoEPrepareAndFinalize(ABC):
"""
An abstract base class for the [Quantize-Prepare] and [Finalize] steps
described above.
"""
@abstractmethod
def prepare(
self,
a1: torch.Tensor,
a1_scale: Optional[torch.Tensor],
a2_scale: Optional[torch.Tensor],
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
num_experts: int,
expert_map: Optional[torch.Tensor],
apply_router_weight_on_input: bool,
quant_config: FusedMoEQuantConfig,
) -> tuple[torch.Tensor, Optional[torch.Tensor], Optional[torch.Tensor],
Optional[torch.Tensor], Optional[torch.Tensor]]:
"""
Perform any quantization (and/or) dispatching needed
for this kernel.
- a1: The (unquantized) input to the MoE layer.
- a1_scale: Optional scales for a1
- a2_scale: Optional scales for the second MoE gemm. Required to make
sure the quantization is consistent for both gemms.
- topk_ids: The topk ids.
- topk_weights: The topk weights.
- num_experts: The total number of experts in the global expert space.
- expert_map: A tensor mapping expert indices from the global expert
space to the local expert space of the expert parallel shard.
- apply_router_weight_on_input: When True, apply the weights to the
activations, before quantization + dispatching.
Returns a tuple of:
- quantized + dispatched a.
- quantized + dispatched a1_scales.
- Optional tensor as big as number of local experts that contains the
number of tokens assigned to each local expert.
- Optional dispatched expert topk IDs
- Optional dispatched expert topk weight
"""
raise NotImplementedError
@abstractmethod
def finalize(
self,
output: torch.Tensor,
fused_expert_output: torch.Tensor,
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
apply_router_weight_on_input: bool,
) -> None:
"""
Perform any combine plus apply weights and perform a reduction on the
fused experts output.
- output: The output tensor, written in place. Must be (M, K) shape.
- fused_expert_output: The unweighted, unreduced output of the fused
experts, it will have (M, topk, K) shape.
- topk_weights: The weights to be applied to the fused_experts_output.
- topk_ids: The topk_ids.
- apply_router_weight_on_input: When False, apply the weights to
fused_expert_output.
"""
raise NotImplementedError
@property
@abstractmethod
def activation_format(self) -> FusedMoEActivationFormat:
"""
A property indicating the output format of the activations for the
'prepare' method.
"""
raise NotImplementedError
@abstractmethod
def topk_indices_dtype(self) -> Optional[torch.dtype]:
"""
The PrepareFinalize All2All implementations generally constrain the
dtype of the topk_ids they support. This function returns the
required topk indices dtype so it can be respected.
Return None if there are no such restrictions.
"""
raise NotImplementedError
@abstractmethod
def max_num_tokens_per_rank(self) -> Optional[int]:
"""
Some PrepareFinalize All2All implementations are batched. Meaning,
they can processes only as set of tokens at a time. This
function returns the batch size i.e the maximum number of tokens
the implementation can process at a time.
Return None if there are no such restrictions.
"""
raise NotImplementedError
@abstractmethod
def num_dispatchers(self) -> int:
raise NotImplementedError
4.1 prepare函数
个人理解:这就是所谓的前处理,主要就是传入是是原始的tensor,然后在这个函数里面可能是会做量化,以及将token根据不同的专家进行分拆,并且可能根据通信分发到不同的rank上进行处理,输入的几个参数是下面的
- a1: torch.Tensor:这个应该就是要做MOE的原始数据
- a1_scale: Optional[torch.Tensor]:可能是第一个矩阵的量化参数
- a2_scale: Optional[torch.Tensor]:可能是第二个矩阵的量化参数
- topk_weights: torch.Tensor:这就是Router输出的权重得分吧,就是softmax之后的那个
- topk_ids: torch.Tensor:这就是不同token对应要分到那些专家的对应关系
- num_experts: int:专家个数吧
- expert_map: Optional[torch.Tensor]:这个应该是专家要分到哪些rank的对应关系
- apply_router_weight_on_input: bool:是否先乘以权重再做其他处理。
- quant_config: FusedMoEQuantConfig:
4.2 finalize函数
个人理解:这个函数其实就是对每个token,将前面不同专家的输出结果做加权求和,然后最终输出是在output。
4.3 其他几个函数
剩下的其他几个函数相当于是获取一些属性,或者一些参数,配置,
- activation_format:告诉外部 prepare 输出的激活是 Standard 还是 BatchedExperts 布局。
- topk_indices_dtype:告诉外部 topk_ids 需要是什么 dtype,没要求就 None。
- max_num_tokens_per_rank:告诉外部这个实现一次在每个 rank 上最多能处理多少 token,没上限就 None。
5 class FusedMoEPermuteExpertsUnpermute(ABC): 负责具体的计算
5.1 一堆获取属性的函数----不看了
5.2 def workspace_shapes(
@abstractmethod
def workspace_shapes(
self,
a: torch.Tensor,
aq: torch.Tensor,
M: int,
N: int,
K: int,
topk: int,
global_num_experts: int,
local_num_experts: int,
) -> tuple[tuple[int, ...], tuple[int, ...], tuple[int, ...], torch.dtype]:
"""
Compute the shapes for the temporary and final outputs of the two gemms
and activation in the fused expert function. Since the gemms are
independent, the workspace for the first gemm can be shared with the
workspace for the last gemm.
Returns a tuple of:
- workspace13 shape tuple: must be large enough to hold the
result of either expert gemm.
- workspace2 shape tuple: must be large enough to hold the
result of the activation function.
- output shape tuple: must be exact size of the final gemm output.
- Workspace type: The dtype to use for the workspace tensors.
- Note: in order for activation chunking to work, the first dimension
of each tuple must be the number of tokens.
"""
raise NotImplementedError
个人理解:
- 其实就是根据输入的那些什么M N K以及输入tensor,计算出来moe中的两次矩阵乘法的维度信息,计算出来激活函数的维度信息,计算出来整个moeoutput的输出信息。
- 这里由于前后两次矩阵乘法是独立的,所以这里两个矩阵乘法用了同一个workspace。
- 还有一个注意事项,
Note: in order for activation chunking to work, the first dimension of each tuple must be the number of tokens.,这个意思就是第一维永远都是token个数,这样方便根据token做chunk分块。
5.3 def apply(
@abstractmethod
def apply(
self,
output: torch.Tensor,
hidden_states: torch.Tensor,
w1: torch.Tensor,
w2: torch.Tensor,
topk_ids: torch.Tensor,
activation: str,
global_num_experts: int,
expert_map: Optional[torch.Tensor],
w1_scale: Optional[torch.Tensor],
w2_scale: Optional[torch.Tensor],
w1_zp: Optional[torch.Tensor],
w2_zp: Optional[torch.Tensor],
a1q_scale: Optional[torch.Tensor],
a2_scale: Optional[torch.Tensor],
workspace13: torch.Tensor,
workspace2: torch.Tensor,
expert_num_tokens: Optional[torch.Tensor],
):
"""
This function computes the intermediate result of a Mixture of Experts
(MoE) layer using two sets of weights, w1 and w2.
Parameters:
- output: (torch.Tensor): The unweighted, unreduced output tensor.
- hidden_states: (torch.Tensor): The (quantized) input tensor to the MoE
layer.
- w1 (torch.Tensor): The first set of expert weights.
- w2 (torch.Tensor): The second set of expert weights.
- topk_ids (torch.Tensor): A map of row to expert id.
- activation (str): The activation function to apply after the first
MoE layer.
- global_num_experts (int): The total number of experts in the global
expert space.
- expert_map (Optional[torch.Tensor]): A tensor mapping expert indices
from the global expert space to the local expert space of the expert
parallel shard.
- w1_scale (Optional[torch.Tensor]): Optional scale to be used for w1.
- w2_scale (Optional[torch.Tensor]): Optional scale to be used for w2.
- w1_zp (Optional[torch.Tensor]): Optional zero points to be used for
w1.
- w2_zp (Optional[torch.Tensor]): Optional zero points to be used for
w2.
- a1q_scale (Optional[torch.Tensor]): Optional quantized scale to be
used for a1.
- a2_scale (Optional[torch.Tensor]): Optional scale to be used for a2.
- workspace13 (torch.Tensor): A scratch tensor used for gemm outputs
must be large enough to hold output of either MoE gemm.
- workspace2 (torch.Tensor): A scratch tensor used for the activation
function.
- expert_num_tokens: An optional tensor containing the number of tokens
assigned to each expert when using batched experts format input.
"""
raise NotImplementedError
个人理解:
- 这个函数就是真正做MLP计算的函数了,将前面做完prepare(量化、分发、 EP通信)之后的数据,然后他做一次矩阵乘法+激活函数+第二次矩阵乘法.
- 这里的output是未做加权求和的结果,加权求和是在finalize做的。
6 class FusedMoEModularKernel(torch.nn.Module):
个人理解:
-
整体作用:这是把前面两个抽象类真正“串起来”的封装类:
成员里会持有一个 FusedMoEPrepareAndFinalize 实例 + 一个 FusedMoEPermuteExpertsUnpermute 实例;然后调用forward函数做具体的计算,其中forward里面其实就是调用了这两个子类的具体函数。 -
在 forward(…) 里调用顺序固定为:
-
prepare_finalize.prepare(…)(量化 + 分发 + EP 通信);
-
_fused_experts(函数,
- 调用fused_experts.workspace_shapes(…) 算 workspace 形状并分配workspace13/workspace2/fused_out;
- 其实这里面有个看起来容易混淆的东西,就是_fused_experts函数和fused_experts子类,他们的关系是这样的, _fused_experts 这个函数负责:分 chunk、分配 workspace,然后在循环里间接调用子类实例 self.fused_experts.apply(…),做真正的 w1/silu/w2 计算。
-
prepare_finalize.finalize(…)(乘topk_weights、做加权求和,写入最终 output)。
-
更多推荐


所有评论(0)