AI 查询计划生成:当强化学习接管 Join 顺序,优化器还靠谱吗?

一、Join 顺序的搜索空间爆炸:传统优化器的穷举极限

多表 Join 的顺序选择是查询优化中最经典的 NP-Hard 问题。N 张表的 Join 顺序有 (2N-2)!! 种可能(双阶乘),10 张表就有超过 170 万种排列。传统优化器通过动态规划(DP)枚举所有可能的 Join 顺序,在 N <= 15 时尚可应对,但超过 15 张表后 DP 的内存和时间开销指数级增长,优化器不得不退回到贪心搜索,而贪心搜索无法保证找到全局最优。

更棘手的是,DP 假设各表之间的 Join 是独立的,但实际上 Join 的代价高度依赖上下文。表 A 和表 B 先 Join 的代价,取决于后续与表 C 的 Join 方式——如果 C 只需要 A 的子集,那么先过滤 A 再与 B Join 可能更优。这种"后续决策影响当前最优选择"的依赖关系,正是动态规划无法处理的盲区。

强化学习(RL)的思路是:将 Join 顺序选择建模为序列决策问题,每一步选择一张表加入 Join 序列,通过与环境交互学习最优策略。RL 不需要枚举所有可能,而是通过策略网络直接输出高概率的 Join 顺序,搜索复杂度从指数级降为多项式级。

二、基于强化学习的 Join 顺序优化架构

RL 优化器的架构分为训练阶段和推理阶段,两者解耦运行。

flowchart TB
    subgraph 训练阶段
        A[查询工作负载<br/>历史执行记录] --> B[环境模拟器<br/>模拟各 Join 顺序的执行代价]
        B --> C[状态编码器<br/>编码当前 Join 状态]
        C --> D[策略网络<br/>输出下一张表的选择概率]
        D --> E[动作选择<br/>选择下一张表加入 Join]
        E --> B
        B --> F[奖励计算<br/>负执行耗时]
        F --> G[策略梯度更新<br/>REINFORCE / PPO]
        G --> D
    end

    subgraph 推理阶段
        H[新查询输入] --> I[状态编码器<br/>编码查询特征]
        I --> J[训练好的策略网络<br/>输出 Join 顺序概率分布]
        J --> K[Top-K 采样<br/>取概率最高的 K 个顺序]
        K --> L[代价模型评估<br/>用传统成本模型验证]
        L --> M[最优 Join 顺序输出]
    end

    style D fill:#e53e3e,color:#fff
    style G fill:#e53e3e,color:#fff
    style J fill:#38a169,color:#fff
    style L fill:#dd6b20,color:#fff

训练阶段:RL Agent 在模拟环境中反复尝试不同的 Join 顺序,根据执行代价获得奖励信号,通过策略梯度更新网络参数。环境模拟器使用历史执行记录构建代价模型,避免在真实数据库上试错。

推理阶段:新查询输入后,策略网络输出 Join 顺序的概率分布,取 Top-K 个候选顺序,用传统成本模型验证后选择最优。这种"RL 生成 + 成本模型验证"的混合策略,既利用了 RL 的搜索效率,又保留了成本模型的安全兜底。

三、生产级 RL Join 优化器的核心实现

3.1 查询状态编码

import torch
import torch.nn as nn
import numpy as np
from typing import List, Set

class QueryStateEncoder(nn.Module):
    """
    将当前 Join 状态编码为神经网络输入向量
    状态包括:已加入 Join 的表集合、剩余可选表、当前中间结果估计大小
    """

    def __init__(self, num_tables: int, embed_dim: int = 64):
        super().__init__()
        self.num_tables = num_tables
        self.embed_dim = embed_dim

        # 每张表的嵌入向量,编码表的统计特征
        self.table_embedding = nn.Embedding(num_tables, embed_dim)

        # 全连接层:将状态特征映射为固定维度向量
        self.state_fc = nn.Sequential(
            nn.Linear(embed_dim * 2 + 3, 128),
            nn.ReLU(),
            nn.Linear(128, embed_dim)
        )

    def forward(self, joined_tables: List[int],
                remaining_tables: List[int],
                intermediate_size: float,
                estimated_cost: float,
                selectivity: float) -> torch.Tensor:
        """
        编码当前 Join 状态
        joined_tables: 已加入 Join 的表索引列表
        remaining_tables: 剩余可选表索引列表
        intermediate_size: 当前中间结果的估计行数
        estimated_cost: 当前累计代价
        selectivity: 当前累计选择率
        """
        # 已加入表的嵌入均值
        if len(joined_tables) > 0:
            joined_emb = self.table_embedding(
                torch.tensor(joined_tables, dtype=torch.long))
            joined_repr = joined_emb.mean(dim=0)
        else:
            joined_repr = torch.zeros(self.embed_dim)

        # 剩余表的嵌入均值
        if len(remaining_tables) > 0:
            remaining_emb = self.table_embedding(
                torch.tensor(remaining_tables, dtype=torch.long))
            remaining_repr = remaining_emb.mean(dim=0)
        else:
            remaining_repr = torch.zeros(self.embed_dim)

        # 标量特征:中间结果大小、累计代价、累计选择率
        scalar_feat = torch.tensor([
            np.log1p(intermediate_size),  # 对数缩放,避免数值溢出
            np.log1p(estimated_cost),
            selectivity
        ], dtype=torch.float32)

        # 拼接所有特征
        state_vec = torch.cat([joined_repr, remaining_repr, scalar_feat])
        return self.state_fc(state_vec)

3.2 策略网络与动作选择

class JoinOrderPolicy(nn.Module):
    """
    Join 顺序策略网络
    输入当前状态,输出下一张表的选择概率分布
    """

    def __init__(self, num_tables: int, embed_dim: int = 64):
        super().__init__()
        self.num_tables = num_tables
        self.state_encoder = QueryStateEncoder(num_tables, embed_dim)

        # 策略头:从状态向量输出每张表的选择概率
        self.policy_head = nn.Sequential(
            nn.Linear(embed_dim, 128),
            nn.ReLU(),
            nn.Linear(128, num_tables),
            nn.Softmax(dim=-1)
        )

        # 价值头:估计当前状态的价值函数(用于 PPO)
        self.value_head = nn.Sequential(
            nn.Linear(embed_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, state: torch.Tensor,
                valid_actions: List[int]) -> tuple:
        """
        输出有效动作的概率分布和状态价值
        valid_actions: 当前可选择的表索引列表
        """
        # 编码状态
        state_repr = self.state_encoder(
            state['joined'], state['remaining'],
            state['intermediate_size'],
            state['estimated_cost'],
            state['selectivity'])

        # 计算所有表的概率
        all_probs = self.policy_head(state_repr)

        # 屏蔽无效动作(已加入 Join 的表不可再选)
        mask = torch.zeros(self.num_tables)
        for a in valid_actions:
            mask[a] = 1.0
        masked_probs = all_probs * mask

        # 重新归一化
        prob_sum = masked_probs.sum()
        if prob_sum > 1e-8:
            masked_probs = masked_probs / prob_sum
        else:
            # 所有有效动作概率为零时,均匀分布
            for a in valid_actions:
                masked_probs[a] = 1.0 / len(valid_actions)

        # 状态价值
        value = self.value_head(state_repr)

        return masked_probs, value

    def select_action(self, state: dict,
                      valid_actions: List[int],
                      epsilon: float = 0.1) -> int:
        """
        epsilon-greedy 动作选择
        epsilon 概率随机探索,1-epsilon 概率选择最优动作
        """
        if np.random.random() < epsilon:
            return np.random.choice(valid_actions)

        with torch.no_grad():
            probs, _ = self.forward(state, valid_actions)
            # 从概率分布中采样
            dist = torch.distributions.Categorical(probs)
            action = dist.sample()
            return action.item()

3.3 PPO 训练循环

class JoinOrderTrainer:
    """
    使用 PPO 算法训练 Join 顺序策略网络
    """

    def __init__(self, policy: JoinOrderPolicy,
                 lr: float = 3e-4,
                 gamma: float = 0.99,
                 clip_epsilon: float = 0.2):
        self.policy = policy
        self.optimizer = torch.optim.Adam(policy.parameters(), lr=lr)
        self.gamma = gamma
        self.clip_epsilon = clip_epsilon

    def train_episode(self, env, num_steps: int) -> dict:
        """
        训练一个 episode
        env: Join 顺序环境模拟器,提供 step() 和 reset() 接口
        """
        state = env.reset()
        trajectories = []  # 存储 (state, action, reward, log_prob, value)

        for step in range(num_steps):
            valid_actions = env.get_valid_actions()
            probs, value = self.policy(state, valid_actions)

            # 采样动作
            dist = torch.distributions.Categorical(probs)
            action = dist.sample()
            log_prob = dist.log_prob(action)

            # 执行动作,获取奖励
            next_state, reward, done = env.step(action.item())

            trajectories.append({
                'state': state,
                'action': action,
                'reward': reward,
                'log_prob': log_prob,
                'value': value.item(),
                'valid_actions': valid_actions
            })

            state = next_state
            if done:
                break

        # 计算折扣回报
        returns = self._compute_returns(trajectories)

        # PPO 更新
        loss = self._ppo_update(trajectories, returns)

        return {
            'loss': loss,
            'episode_reward': sum(t['reward'] for t in trajectories),
            'num_steps': len(trajectories)
        }

    def _compute_returns(self, trajectories: list) -> list:
        """计算折扣回报"""
        returns = []
        G = 0
        for t in reversed(trajectories):
            # 奖励 = 负执行耗时(耗时越短奖励越高)
            G = t['reward'] + self.gamma * G
            returns.insert(0, G)
        return returns

    def _ppo_update(self, trajectories: list, returns: list) -> float:
        """PPO 策略梯度更新"""
        total_loss = 0.0

        for t, G in zip(trajectories, returns):
            # 重新计算当前策略下的 log_prob
            probs, value = self.policy(
                t['state'], t['valid_actions'])
            dist = torch.distributions.Categorical(probs)
            new_log_prob = dist.log_prob(t['action'])

            # 重要性采样比率
            ratio = torch.exp(new_log_prob - t['log_prob'])

            # 优势函数
            advantage = G - t['value']

            # PPO 裁剪目标
            surr1 = ratio * advantage
            surr2 = torch.clamp(
                ratio, 1 - self.clip_epsilon,
                1 + self.clip_epsilon) * advantage
            policy_loss = -torch.min(surr1, surr2)

            # 价值函数损失
            value_loss = (G - value) ** 2

            # 熵正则化:鼓励探索
            entropy = dist.entropy()

            loss = policy_loss + 0.5 * value_loss - 0.01 * entropy

            self.optimizer.zero_grad()
            loss.backward()
            # 梯度裁剪,防止梯度爆炸
            torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 0.5)
            self.optimizer.step()

            total_loss += loss.item()

        return total_loss / len(trajectories)

四、RL 优化器的工程风险:训练不收敛与推理不可解释

RL Join 优化器在学术 benchmark 上表现优异,但生产落地面临严峻的工程挑战。

训练不收敛:RL 的训练稳定性远不如监督学习。策略梯度方法的方差极大,同样的超参数在不同工作负载上可能表现天差地别。PPO 的裁剪机制缓解了策略崩溃问题,但不能保证收敛到全局最优。更关键的是,训练需要大量样本——每个 episode 对应一次完整的 Join 顺序选择和代价评估,如果环境模拟器不够准确,Agent 学到的策略在真实环境中可能完全失效。

推理延迟:策略网络的前向推理需要毫秒级延迟,但传统优化器的 DP 搜索在表数较少时也是毫秒级。RL 的优势只在表数超过 15 时才显现,而大多数生产查询的 Join 表数不超过 10。在 Join 表数少的场景下,RL 的推理开销反而是负担。

策略退化:RL 策略可能收敛到局部最优——对训练工作负载表现良好,但对未见过的查询模式表现极差。这种"过拟合"在 RL 中比监督学习更难检测,因为测试集的查询模式可能完全不在训练分布内。

适用边界:RL Join 优化器适合查询模式固定、Join 表数多(>10)、且工作负载可重复的场景(如数据仓库的固定报表查询)。对于即席查询场景,RL 策略的泛化能力不足,应退回传统 DP 或贪心搜索。混合策略——RL 生成 Top-K 候选 + 传统成本模型验证——是当前最可行的生产方案。

五、总结

基于强化学习的 Join 顺序优化将组合搜索问题建模为序列决策问题,通过策略网络直接输出高概率的 Join 顺序,搜索复杂度从指数级降为多项式级。PPO 算法通过裁剪重要性采样比率和价值函数约束,提升了训练稳定性。但 RL 优化器的工程风险不容忽视:训练不收敛、推理延迟在少表场景下不占优、策略退化导致对未见查询泛化能力差。生产实践中,RL 优化器应与传统成本模型协同工作——RL 负责生成 Top-K 候选 Join 顺序,成本模型负责验证和选择最优方案。这种"RL 生成 + 成本验证"的混合架构,既利用了 RL 的搜索效率,又保留了传统优化器的安全兜底能力,是 RL 查询优化走向生产的可行路径。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐