aibrix和envoy gateway的交互

aibrix通过github.com/envoyproxy/go-control-plane包与envoy gateway进行交互。

梳理所有使用 github.com/envoyproxy/go-control-plane 包的代码逻辑:
以下代码片段和实际项目代码有所出入,主要体现主流程。

一、核心交互文件

1. cmd/plugins/main.go - Gateway插件入口

// File: E:\src\aibrix\cmd\plugins\main.go
import (
    extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
    // ... 其他导入
)

func main() {
    // 创建gRPC服务器
    grpcServer := grpc.NewServer()
    
    // 注册External Processing服务
    extProcPb.RegisterExternalProcessorServer(grpcServer, gatewayServer)
    
    // 启动服务监听
    grpcServer.Serve(lis)
}

作用: 启动Gateway插件的gRPC服务,实现Envoy的External Processing协议(一个gRPC API)


2. pkg/plugins/gateway/gateway.go - 核心处理逻辑

// File: E:\src\aibrix\pkg\plugins\gateway\gateway.go
import (
    configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
    envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
)

// 实现ExternalProcessor接口
func (s *Server) Process(stream extProcPb.ExternalProcessor_ProcessServer) error {
    for {
         // 接收来自Envoy的请求
        req, err := stream.Recv()
        if err == io.EOF {
          // Envoy关闭gRPC stream,成功结束处理。
          return nil
        }
        if err != nil {
          return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
        }
        // 根据请求类型处理
        switch v := req.Request.(type) {
        case *extProcPb.ProcessingRequest_RequestHeaders:
            // 处理请求头
            resp := s.handleRequestHeaders(v.RequestHeaders)
            
        case *extProcPb.ProcessingRequest_RequestBody:
            // 处理请求体
            resp := s.handleRequestBody(v.RequestBody)
            
        case *extProcPb.ProcessingRequest_ResponseHeaders:
            // 处理响应头
            resp := s.handleResponseHeaders(v.ResponseHeaders)
        }
        stream.Send(resp)
    }
}

核心流程:

  1. 建立与Envoy的双向流式gRPC连接,这个stream中,他们之间进行了多轮交互。

  2. 接收Envoy发送的处理请求

  3. 根据请求信息类型(Headers/Body/Response)执行不同逻辑

  4. 返回处理响应给Envoy

  5. 处理完这几轮交互后,Envoy关闭这个gRPC stream, 退出Process函数。


3. pkg/plugins/gateway/gateway_req_headers.go - 请求头处理

// File: E:\src\aibrix\pkg\plugins\gateway\gateway_req_headers.go
func (s *Server) handleRequestHeaders(
    headers *extProcPb.HttpHeaders,
) *extProcPb.ProcessingResponse {
    
    // 提取路由策略
    routingStrategy, _ := getRoutingStrategy(headers.Headers.Headers)
    
    // 返回响应对象,用于告诉Envoy继续处理请求体
    return &extProcPb.ProcessingResponse{
		Response: &extProcPb.ProcessingResponse_RequestHeaders{
			RequestHeaders: &extProcPb.HeadersResponse{
				Response: &extProcPb.CommonResponse{
					HeaderMutation: &extProcPb.HeaderMutation{
						SetHeaders: []*configPb.HeaderValueOption{
							{
								Header: &configPb.HeaderValue{
									Key:      HeaderWentIntoReqHeaders,
									RawValue: []byte("true"),
								},
							},
						},
					},
					ClearRouteCache: true,
				},
			},
		},
	}
}

关键操作:

  • 提取自定义Header(如routing-strategy),在变量中保存相关信息,在后续的交互中发送给Envoy

  • 提取Header:user、authorization,在变量中保存相关信息,在后续的交互中发送给Envoy

  • 设置响应的Header,返回响应Envoy的对象。


4. pkg/plugins/gateway/gateway_req_body.go - 请求体处理与路由决策

// File: E:\src\aibrix\pkg\plugins\gateway\gateway_req_body.go
func (s *Server) handleRequestBody(
    body *extProcPb.HttpBody,
) *extProcPb.ProcessingResponse {
    
    // 1. 解析请求体(OpenAI格式)
    model, message, stream := validateRequestBody(body.Body)
    
    // 2. 执行路由算法
    targetPod := s.routingAlgorithm.Route(
        model, 
        message, 
        routingStrategy,
    )
    
    // 3. 设置目标Pod的Header
    return &extProcPb.ProcessingResponse{
        Response: &extProcPb.ProcessingResponse_RequestBody{
            RequestBody: &extProcPb.BodyResponse{
                Response: &extProcPb.CommonResponse{
                    HeaderMutation: &extProcPb.HeaderMutation{

                        SetHeaders: [ ]*configPb.HeaderValueOption{

                            {
                                Header: &configPb.HeaderValue{
                                    Key:   "target-pod",
                                    Value: targetPod.IP,
                                },
                            },
                        },
                    },
                },
            },
        },
    }
}

核心逻辑:

  1. 解析OpenAI API请求(chat/completions或completions)

  2. 提取model、message等信息

  3. 调用路由算法选择目标Pod

  4. 通过Header告诉Envoy转发到哪个Pod


二、完整交互流程

Client Envoy GatewayPlugin Router InferencePod POST /v1/chat/completions ProcessingRequest_RequestHeaders 验证用户、提取路由策略 ProcessingResponse(继续处理Body) ProcessingRequest_RequestBody 解析OpenAI请求 执行路由算法 返回目标Pod ProcessingResponse(设置target-pod Header) 转发请求到选定Pod 返回推理结果 ProcessingRequest_ResponseHeaders 记录指标、更新缓存状态 ProcessingResponse(继续) 返回最终响应 Client Envoy GatewayPlugin Router InferencePod

Router是aibrix的一个代码包

三、关键数据结构

Envoy网关定义的的protobuf
调用方向:aibrix的plugin实现此API,Envoy网关发起调用。

ProcessingRequest (Envoy → Plugin)

message ProcessingRequest {
  oneof request {
    HttpHeaders request_headers = 1;
    HttpBody request_body = 2;
    HttpHeaders response_headers = 3;
    HttpBody response_body = 4;
  }
}

ProcessingResponse (Plugin → Envoy)

message ProcessingResponse {
  oneof response {
    HeadersResponse request_headers = 1;
    BodyResponse request_body = 2;
    HeadersResponse response_headers = 3;
    ImmediateResponse immediate_response = 4;
  }
}

总结

AIBrix使用Envoy的External Processing扩展机制实现智能路由:

  1. 请求拦截: 在请求到达后端前拦截

  2. 智能路由: 基于KV缓存、负载等因素选择最优Pod

  3. 透明代理: 对客户端完全透明

  4. 指标收集: 收集响应指标用于后续决策

  5. 错误处理: 统一的错误响应机制

这种架构实现了控制平面(Go)与数据平面(Envoy)的解耦,提供了灵活的扩展能力。

四、扩展说明:

Router 是 AIBrix 的一个 Go 包

Router 不是独立的服务,它是 AIBrix Gateway Plugin 内部的一个代码模块。


架构层次

┌─────────────────────────────────────────────────────┐
│  Kubernetes Pod: envoy-gateway-system               │
│  ┌───────────────────────────────────────────────┐  │
│  │  Container 1: Envoy Proxy (C++)               │  │
│  │  - 监听 80/443 端口                           │  │
│  │  - 处理HTTP请求                               │  │
│  └───────────────────────────────────────────────┘  │
│                                                     │
│  ┌───────────────────────────────────────────────┐  │
│  │  Container 2: Shutdown Manager                │  │
│  └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘
                        ↓ gRPC调用
┌─────────────────────────────────────────────────────┐
│  Kubernetes Pod: aibrix-system                      │
│  ┌───────────────────────────────────────────────┐  │
│  │  Container: aibrix-gateway-plugins (Go)       │  │
│  │  ┌─────────────────────────────────────────┐  │  │
│  │  │  main.go (gRPC Server)                  │  │  │
│  │  │  - 实现 ExternalProcessor 接口          │  │  │
│  │  └─────────────────────────────────────────┘  │  │
│  │                                               │  │
│  │  ┌─────────────────────────────────────────┐  │  │
│  │  │  gateway.Server                         │  │  │
│  │  │  - handleRequestHeaders()               │  │  │
│  │  │  - handleRequestBody()                  │  │  │
│  │  │  - handleResponseHeaders()              │  │  │
│  │  └─────────────────────────────────────────┘  │  │
│  │                                               │  │
│  │  ┌─────────────────────────────────────────┐  │  │
│  │  │  Router (代码包)                        │  │  │ ← 这里!
│  │  │  pkg/plugins/gateway/algorithms/        │  │  │
│  │  │  - prefix_cache.go                      │  │  │
│  │  │  - least_request.go                     │  │  │
│  │  │  - throughput.go                        │  │  │
│  │  └─────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘


代码结构证明

1. Router 是一个接口定义
// File: E:\src\aibrix\pkg\types\router.go
package types

// Router 只是一个接口,不是服务
type Router interface {
    Route(ctx *RoutingContext) (*Pod, error)
}

// QueueRouter 也是接口
type QueueRouter interface {
    Router
    GetQueueStatus() QueueStatus
}


2. Router 的具体实现都是 Go 包
// File: E:\src\aibrix\pkg\plugins\gateway\algorithms\prefix_cache.go
package algorithms

import (
    "github.com/vllm-project/aibrix/pkg/types"
    "github.com/vllm-project/aibrix/pkg/cache"
)

// PrefixCacheRouter 是一个结构体,不是服务
type PrefixCacheRouter struct {
    cache cache.KVCacheManager
}

// 实现 Router 接口
func (r *PrefixCacheRouter) Route(
    ctx *types.RoutingContext,
) (*types.Pod, error) {
    // 路由逻辑
    return selectedPod, nil
}

// 注册到全局 Registry
func init() {
    Register(types.PrefixCache, NewPrefixCacheRouter)
}


3. Router 在 Gateway Plugin 中被调用
// File: E:\src\aibrix\pkg\plugins\gateway\gateway_req_body.go
package gateway

import (
    "github.com/vllm-project/aibrix/pkg/plugins/gateway/algorithms"
    "github.com/vllm-project/aibrix/pkg/types"
)

type Server struct {
    // Router 是 Server 的一个字段
    router types.Router
}

func (s *Server) handleRequestBody(
    body *extProcPb.HttpBody,
) *extProcPb.ProcessingResponse {
    
    // 1. 解析请求
    model, message := parseRequest(body.Body)
    
    // 2. 调用 Router (函数调用,不是服务调用)
    targetPod, err := s.router.Route(&types.RoutingContext{
        Model:   model,
        Message: message,
    })
    
    // 3. 返回结果给 Envoy
    return buildResponse(targetPod)
}


部署架构

查看实际运行的 Pod
# File: E:\src\aibrix\docs\source\features\gateway-plugins.rst
$ kubectl get pods -n aibrix-system

NAME                                        READY   STATUS    RESTARTS   AGE
aibrix-gateway-plugins-6bd9fcd5b9-2bwpr     1/1     Running   0          22m  ← 这个Pod包含Router代码
aibrix-controller-manager-fb4495448-j9k6g   1/1     Running   0          22m
aibrix-metadata-service-9d4cd7f77-mq7tr     1/1     Running   0          22m

$ kubectl get pods -n envoy-gateway-system

NAME                                                      READY   STATUS    RESTARTS   AGE
envoy-aibrix-system-aibrix-eg-903790dc-84ccfcbc6b-hw2lq   2/2     Running   0          13m  ← Envoy Proxy
envoy-gateway-7c7659ffc9-rvm5s                            1/1     Running   0          16m


代码包结构

pkg/
├── plugins/
│   └── gateway/
│       ├── gateway.go              ← Server 主逻辑
│       ├── gateway_req_body.go     ← 调用 Router
│       └── algorithms/             ← Router 实现包
│           ├── prefix_cache.go     ← PrefixCacheRouter
│           ├── least_request.go    ← LeastRequestRouter
│           ├── throughput.go       ← ThroughputRouter
│           └── registry.go         ← Router 注册表
│
├── types/
│   └── router.go                   ← Router 接口定义
│
└── cache/
    └── model.go                    ← KVCacheManager (Router依赖)


配置文件证明

# File: E:\src\aibrix\config\manager\manager.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: aibrix-gateway-plugins
  namespace: aibrix-system
spec:
  template:
    spec:
      containers:
      - name: gateway-plugins
        image: aibrix/gateway-plugins:latest
        command:
        - /aibrix-gateway-plugins  ← 单个二进制文件
        env:
        - name: AIBRIX_ROUTING_ALGORITHM
          value: "prefix-cache"     ← 配置默认Router算法


总结

  • ✅ Router 是 代码包,不是服务

  • ✅ Router 运行在 aibrix-gateway-plugins Pod

  • ✅ Router 通过 函数调用 被使用,不是 gRPC/HTTP 调用

  • ✅ 多个 Router 实现可以在运行时切换 (通过 Header)

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐