aibrix和envoy gateway的交互
AIBrix使用Envoy的扩展机制实现智能路由:✅请求拦截: 在请求到达后端前拦截✅智能路由: 基于KV缓存、负载等因素选择最优Pod✅透明代理: 对客户端完全透明✅指标收集: 收集响应指标用于后续决策✅错误处理: 统一的错误响应机制这种架构实现了控制平面(Go)与数据平面(Envoy)的解耦,提供了灵活的扩展能力。// Router 只是一个接口,不是服务// QueueRouter 也是接口
aibrix和envoy gateway的交互
aibrix通过github.com/envoyproxy/go-control-plane
包与envoy gateway进行交互。
梳理所有使用 github.com/envoyproxy/go-control-plane
包的代码逻辑:
以下代码片段和实际项目代码有所出入,主要体现主流程。
一、核心交互文件
1. cmd/plugins/main.go - Gateway插件入口
// File: E:\src\aibrix\cmd\plugins\main.go
import (
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
// ... 其他导入
)
func main() {
// 创建gRPC服务器
grpcServer := grpc.NewServer()
// 注册External Processing服务
extProcPb.RegisterExternalProcessorServer(grpcServer, gatewayServer)
// 启动服务监听
grpcServer.Serve(lis)
}
作用: 启动Gateway插件的gRPC服务,实现Envoy的External Processing协议(一个gRPC API)
2. pkg/plugins/gateway/gateway.go - 核心处理逻辑
// File: E:\src\aibrix\pkg\plugins\gateway\gateway.go
import (
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
)
// 实现ExternalProcessor接口
func (s *Server) Process(stream extProcPb.ExternalProcessor_ProcessServer) error {
for {
// 接收来自Envoy的请求
req, err := stream.Recv()
if err == io.EOF {
// Envoy关闭gRPC stream,成功结束处理。
return nil
}
if err != nil {
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
}
// 根据请求类型处理
switch v := req.Request.(type) {
case *extProcPb.ProcessingRequest_RequestHeaders:
// 处理请求头
resp := s.handleRequestHeaders(v.RequestHeaders)
case *extProcPb.ProcessingRequest_RequestBody:
// 处理请求体
resp := s.handleRequestBody(v.RequestBody)
case *extProcPb.ProcessingRequest_ResponseHeaders:
// 处理响应头
resp := s.handleResponseHeaders(v.ResponseHeaders)
}
stream.Send(resp)
}
}
核心流程:
-
建立与Envoy的双向流式gRPC连接,这个stream中,他们之间进行了多轮交互。
-
接收Envoy发送的处理请求
-
根据请求信息类型(Headers/Body/Response)执行不同逻辑
-
返回处理响应给Envoy
-
处理完这几轮交互后,Envoy关闭这个gRPC stream, 退出Process函数。
3. pkg/plugins/gateway/gateway_req_headers.go - 请求头处理
// File: E:\src\aibrix\pkg\plugins\gateway\gateway_req_headers.go
func (s *Server) handleRequestHeaders(
headers *extProcPb.HttpHeaders,
) *extProcPb.ProcessingResponse {
// 提取路由策略
routingStrategy, _ := getRoutingStrategy(headers.Headers.Headers)
// 返回响应对象,用于告诉Envoy继续处理请求体
return &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_RequestHeaders{
RequestHeaders: &extProcPb.HeadersResponse{
Response: &extProcPb.CommonResponse{
HeaderMutation: &extProcPb.HeaderMutation{
SetHeaders: []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Key: HeaderWentIntoReqHeaders,
RawValue: []byte("true"),
},
},
},
},
ClearRouteCache: true,
},
},
},
}
}
关键操作:
-
提取自定义Header(如
routing-strategy
),在变量中保存相关信息,在后续的交互中发送给Envoy -
提取Header:user、authorization,在变量中保存相关信息,在后续的交互中发送给Envoy
-
设置响应的Header,返回响应Envoy的对象。
4. pkg/plugins/gateway/gateway_req_body.go - 请求体处理与路由决策
// File: E:\src\aibrix\pkg\plugins\gateway\gateway_req_body.go
func (s *Server) handleRequestBody(
body *extProcPb.HttpBody,
) *extProcPb.ProcessingResponse {
// 1. 解析请求体(OpenAI格式)
model, message, stream := validateRequestBody(body.Body)
// 2. 执行路由算法
targetPod := s.routingAlgorithm.Route(
model,
message,
routingStrategy,
)
// 3. 设置目标Pod的Header
return &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_RequestBody{
RequestBody: &extProcPb.BodyResponse{
Response: &extProcPb.CommonResponse{
HeaderMutation: &extProcPb.HeaderMutation{
SetHeaders: [ ]*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Key: "target-pod",
Value: targetPod.IP,
},
},
},
},
},
},
},
}
}
核心逻辑:
-
解析OpenAI API请求(chat/completions或completions)
-
提取model、message等信息
-
调用路由算法选择目标Pod
-
通过Header告诉Envoy转发到哪个Pod
二、完整交互流程
Router是aibrix的一个代码包
三、关键数据结构
Envoy网关定义的的protobuf
调用方向:aibrix的plugin实现此API,Envoy网关发起调用。
ProcessingRequest (Envoy → Plugin)
message ProcessingRequest {
oneof request {
HttpHeaders request_headers = 1;
HttpBody request_body = 2;
HttpHeaders response_headers = 3;
HttpBody response_body = 4;
}
}
ProcessingResponse (Plugin → Envoy)
message ProcessingResponse {
oneof response {
HeadersResponse request_headers = 1;
BodyResponse request_body = 2;
HeadersResponse response_headers = 3;
ImmediateResponse immediate_response = 4;
}
}
总结
AIBrix使用Envoy的External Processing扩展机制实现智能路由:
-
✅ 请求拦截: 在请求到达后端前拦截
-
✅ 智能路由: 基于KV缓存、负载等因素选择最优Pod
-
✅ 透明代理: 对客户端完全透明
-
✅ 指标收集: 收集响应指标用于后续决策
-
✅ 错误处理: 统一的错误响应机制
这种架构实现了控制平面(Go)与数据平面(Envoy)的解耦,提供了灵活的扩展能力。
四、扩展说明:
Router 是 AIBrix 的一个 Go 包
Router 不是独立的服务,它是 AIBrix Gateway Plugin 内部的一个代码模块。
架构层次
┌─────────────────────────────────────────────────────┐
│ Kubernetes Pod: envoy-gateway-system │
│ ┌───────────────────────────────────────────────┐ │
│ │ Container 1: Envoy Proxy (C++) │ │
│ │ - 监听 80/443 端口 │ │
│ │ - 处理HTTP请求 │ │
│ └───────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────┐ │
│ │ Container 2: Shutdown Manager │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
↓ gRPC调用
┌─────────────────────────────────────────────────────┐
│ Kubernetes Pod: aibrix-system │
│ ┌───────────────────────────────────────────────┐ │
│ │ Container: aibrix-gateway-plugins (Go) │ │
│ │ ┌─────────────────────────────────────────┐ │ │
│ │ │ main.go (gRPC Server) │ │ │
│ │ │ - 实现 ExternalProcessor 接口 │ │ │
│ │ └─────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────┐ │ │
│ │ │ gateway.Server │ │ │
│ │ │ - handleRequestHeaders() │ │ │
│ │ │ - handleRequestBody() │ │ │
│ │ │ - handleResponseHeaders() │ │ │
│ │ └─────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────┐ │ │
│ │ │ Router (代码包) │ │ │ ← 这里!
│ │ │ pkg/plugins/gateway/algorithms/ │ │ │
│ │ │ - prefix_cache.go │ │ │
│ │ │ - least_request.go │ │ │
│ │ │ - throughput.go │ │ │
│ │ └─────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
代码结构证明
1. Router 是一个接口定义
// File: E:\src\aibrix\pkg\types\router.go
package types
// Router 只是一个接口,不是服务
type Router interface {
Route(ctx *RoutingContext) (*Pod, error)
}
// QueueRouter 也是接口
type QueueRouter interface {
Router
GetQueueStatus() QueueStatus
}
2. Router 的具体实现都是 Go 包
// File: E:\src\aibrix\pkg\plugins\gateway\algorithms\prefix_cache.go
package algorithms
import (
"github.com/vllm-project/aibrix/pkg/types"
"github.com/vllm-project/aibrix/pkg/cache"
)
// PrefixCacheRouter 是一个结构体,不是服务
type PrefixCacheRouter struct {
cache cache.KVCacheManager
}
// 实现 Router 接口
func (r *PrefixCacheRouter) Route(
ctx *types.RoutingContext,
) (*types.Pod, error) {
// 路由逻辑
return selectedPod, nil
}
// 注册到全局 Registry
func init() {
Register(types.PrefixCache, NewPrefixCacheRouter)
}
3. Router 在 Gateway Plugin 中被调用
// File: E:\src\aibrix\pkg\plugins\gateway\gateway_req_body.go
package gateway
import (
"github.com/vllm-project/aibrix/pkg/plugins/gateway/algorithms"
"github.com/vllm-project/aibrix/pkg/types"
)
type Server struct {
// Router 是 Server 的一个字段
router types.Router
}
func (s *Server) handleRequestBody(
body *extProcPb.HttpBody,
) *extProcPb.ProcessingResponse {
// 1. 解析请求
model, message := parseRequest(body.Body)
// 2. 调用 Router (函数调用,不是服务调用)
targetPod, err := s.router.Route(&types.RoutingContext{
Model: model,
Message: message,
})
// 3. 返回结果给 Envoy
return buildResponse(targetPod)
}
部署架构
查看实际运行的 Pod
# File: E:\src\aibrix\docs\source\features\gateway-plugins.rst
$ kubectl get pods -n aibrix-system
NAME READY STATUS RESTARTS AGE
aibrix-gateway-plugins-6bd9fcd5b9-2bwpr 1/1 Running 0 22m ← 这个Pod包含Router代码
aibrix-controller-manager-fb4495448-j9k6g 1/1 Running 0 22m
aibrix-metadata-service-9d4cd7f77-mq7tr 1/1 Running 0 22m
$ kubectl get pods -n envoy-gateway-system
NAME READY STATUS RESTARTS AGE
envoy-aibrix-system-aibrix-eg-903790dc-84ccfcbc6b-hw2lq 2/2 Running 0 13m ← Envoy Proxy
envoy-gateway-7c7659ffc9-rvm5s 1/1 Running 0 16m
代码包结构
pkg/
├── plugins/
│ └── gateway/
│ ├── gateway.go ← Server 主逻辑
│ ├── gateway_req_body.go ← 调用 Router
│ └── algorithms/ ← Router 实现包
│ ├── prefix_cache.go ← PrefixCacheRouter
│ ├── least_request.go ← LeastRequestRouter
│ ├── throughput.go ← ThroughputRouter
│ └── registry.go ← Router 注册表
│
├── types/
│ └── router.go ← Router 接口定义
│
└── cache/
└── model.go ← KVCacheManager (Router依赖)
配置文件证明
# File: E:\src\aibrix\config\manager\manager.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: aibrix-gateway-plugins
namespace: aibrix-system
spec:
template:
spec:
containers:
- name: gateway-plugins
image: aibrix/gateway-plugins:latest
command:
- /aibrix-gateway-plugins ← 单个二进制文件
env:
- name: AIBRIX_ROUTING_ALGORITHM
value: "prefix-cache" ← 配置默认Router算法
总结
-
✅ Router 是 代码包,不是服务
-
✅ Router 运行在 aibrix-gateway-plugins Pod 中
-
✅ Router 通过 函数调用 被使用,不是 gRPC/HTTP 调用
-
✅ 多个 Router 实现可以在运行时切换 (通过 Header)
更多推荐
所有评论(0)