AI 查询计划生成:当强化学习接管 Join 顺序,优化器还靠谱吗?
AI 查询计划生成:当强化学习接管 Join 顺序,优化器还靠谱吗?
一、Join 顺序的搜索空间爆炸:传统优化器的穷举极限
多表 Join 的顺序选择是查询优化中最经典的 NP-Hard 问题。N 张表的 Join 顺序有 (2N-2)!! 种可能(双阶乘),10 张表就有超过 170 万种排列。传统优化器通过动态规划(DP)枚举所有可能的 Join 顺序,在 N <= 15 时尚可应对,但超过 15 张表后 DP 的内存和时间开销指数级增长,优化器不得不退回到贪心搜索,而贪心搜索无法保证找到全局最优。
更棘手的是,DP 假设各表之间的 Join 是独立的,但实际上 Join 的代价高度依赖上下文。表 A 和表 B 先 Join 的代价,取决于后续与表 C 的 Join 方式——如果 C 只需要 A 的子集,那么先过滤 A 再与 B Join 可能更优。这种"后续决策影响当前最优选择"的依赖关系,正是动态规划无法处理的盲区。
强化学习(RL)的思路是:将 Join 顺序选择建模为序列决策问题,每一步选择一张表加入 Join 序列,通过与环境交互学习最优策略。RL 不需要枚举所有可能,而是通过策略网络直接输出高概率的 Join 顺序,搜索复杂度从指数级降为多项式级。
二、基于强化学习的 Join 顺序优化架构
RL 优化器的架构分为训练阶段和推理阶段,两者解耦运行。
flowchart TB
subgraph 训练阶段
A[查询工作负载<br/>历史执行记录] --> B[环境模拟器<br/>模拟各 Join 顺序的执行代价]
B --> C[状态编码器<br/>编码当前 Join 状态]
C --> D[策略网络<br/>输出下一张表的选择概率]
D --> E[动作选择<br/>选择下一张表加入 Join]
E --> B
B --> F[奖励计算<br/>负执行耗时]
F --> G[策略梯度更新<br/>REINFORCE / PPO]
G --> D
end
subgraph 推理阶段
H[新查询输入] --> I[状态编码器<br/>编码查询特征]
I --> J[训练好的策略网络<br/>输出 Join 顺序概率分布]
J --> K[Top-K 采样<br/>取概率最高的 K 个顺序]
K --> L[代价模型评估<br/>用传统成本模型验证]
L --> M[最优 Join 顺序输出]
end
style D fill:#e53e3e,color:#fff
style G fill:#e53e3e,color:#fff
style J fill:#38a169,color:#fff
style L fill:#dd6b20,color:#fff
训练阶段:RL Agent 在模拟环境中反复尝试不同的 Join 顺序,根据执行代价获得奖励信号,通过策略梯度更新网络参数。环境模拟器使用历史执行记录构建代价模型,避免在真实数据库上试错。
推理阶段:新查询输入后,策略网络输出 Join 顺序的概率分布,取 Top-K 个候选顺序,用传统成本模型验证后选择最优。这种"RL 生成 + 成本模型验证"的混合策略,既利用了 RL 的搜索效率,又保留了成本模型的安全兜底。
三、生产级 RL Join 优化器的核心实现
3.1 查询状态编码
import torch
import torch.nn as nn
import numpy as np
from typing import List, Set
class QueryStateEncoder(nn.Module):
"""
将当前 Join 状态编码为神经网络输入向量
状态包括:已加入 Join 的表集合、剩余可选表、当前中间结果估计大小
"""
def __init__(self, num_tables: int, embed_dim: int = 64):
super().__init__()
self.num_tables = num_tables
self.embed_dim = embed_dim
# 每张表的嵌入向量,编码表的统计特征
self.table_embedding = nn.Embedding(num_tables, embed_dim)
# 全连接层:将状态特征映射为固定维度向量
self.state_fc = nn.Sequential(
nn.Linear(embed_dim * 2 + 3, 128),
nn.ReLU(),
nn.Linear(128, embed_dim)
)
def forward(self, joined_tables: List[int],
remaining_tables: List[int],
intermediate_size: float,
estimated_cost: float,
selectivity: float) -> torch.Tensor:
"""
编码当前 Join 状态
joined_tables: 已加入 Join 的表索引列表
remaining_tables: 剩余可选表索引列表
intermediate_size: 当前中间结果的估计行数
estimated_cost: 当前累计代价
selectivity: 当前累计选择率
"""
# 已加入表的嵌入均值
if len(joined_tables) > 0:
joined_emb = self.table_embedding(
torch.tensor(joined_tables, dtype=torch.long))
joined_repr = joined_emb.mean(dim=0)
else:
joined_repr = torch.zeros(self.embed_dim)
# 剩余表的嵌入均值
if len(remaining_tables) > 0:
remaining_emb = self.table_embedding(
torch.tensor(remaining_tables, dtype=torch.long))
remaining_repr = remaining_emb.mean(dim=0)
else:
remaining_repr = torch.zeros(self.embed_dim)
# 标量特征:中间结果大小、累计代价、累计选择率
scalar_feat = torch.tensor([
np.log1p(intermediate_size), # 对数缩放,避免数值溢出
np.log1p(estimated_cost),
selectivity
], dtype=torch.float32)
# 拼接所有特征
state_vec = torch.cat([joined_repr, remaining_repr, scalar_feat])
return self.state_fc(state_vec)
3.2 策略网络与动作选择
class JoinOrderPolicy(nn.Module):
"""
Join 顺序策略网络
输入当前状态,输出下一张表的选择概率分布
"""
def __init__(self, num_tables: int, embed_dim: int = 64):
super().__init__()
self.num_tables = num_tables
self.state_encoder = QueryStateEncoder(num_tables, embed_dim)
# 策略头:从状态向量输出每张表的选择概率
self.policy_head = nn.Sequential(
nn.Linear(embed_dim, 128),
nn.ReLU(),
nn.Linear(128, num_tables),
nn.Softmax(dim=-1)
)
# 价值头:估计当前状态的价值函数(用于 PPO)
self.value_head = nn.Sequential(
nn.Linear(embed_dim, 64),
nn.ReLU(),
nn.Linear(64, 1)
)
def forward(self, state: torch.Tensor,
valid_actions: List[int]) -> tuple:
"""
输出有效动作的概率分布和状态价值
valid_actions: 当前可选择的表索引列表
"""
# 编码状态
state_repr = self.state_encoder(
state['joined'], state['remaining'],
state['intermediate_size'],
state['estimated_cost'],
state['selectivity'])
# 计算所有表的概率
all_probs = self.policy_head(state_repr)
# 屏蔽无效动作(已加入 Join 的表不可再选)
mask = torch.zeros(self.num_tables)
for a in valid_actions:
mask[a] = 1.0
masked_probs = all_probs * mask
# 重新归一化
prob_sum = masked_probs.sum()
if prob_sum > 1e-8:
masked_probs = masked_probs / prob_sum
else:
# 所有有效动作概率为零时,均匀分布
for a in valid_actions:
masked_probs[a] = 1.0 / len(valid_actions)
# 状态价值
value = self.value_head(state_repr)
return masked_probs, value
def select_action(self, state: dict,
valid_actions: List[int],
epsilon: float = 0.1) -> int:
"""
epsilon-greedy 动作选择
epsilon 概率随机探索,1-epsilon 概率选择最优动作
"""
if np.random.random() < epsilon:
return np.random.choice(valid_actions)
with torch.no_grad():
probs, _ = self.forward(state, valid_actions)
# 从概率分布中采样
dist = torch.distributions.Categorical(probs)
action = dist.sample()
return action.item()
3.3 PPO 训练循环
class JoinOrderTrainer:
"""
使用 PPO 算法训练 Join 顺序策略网络
"""
def __init__(self, policy: JoinOrderPolicy,
lr: float = 3e-4,
gamma: float = 0.99,
clip_epsilon: float = 0.2):
self.policy = policy
self.optimizer = torch.optim.Adam(policy.parameters(), lr=lr)
self.gamma = gamma
self.clip_epsilon = clip_epsilon
def train_episode(self, env, num_steps: int) -> dict:
"""
训练一个 episode
env: Join 顺序环境模拟器,提供 step() 和 reset() 接口
"""
state = env.reset()
trajectories = [] # 存储 (state, action, reward, log_prob, value)
for step in range(num_steps):
valid_actions = env.get_valid_actions()
probs, value = self.policy(state, valid_actions)
# 采样动作
dist = torch.distributions.Categorical(probs)
action = dist.sample()
log_prob = dist.log_prob(action)
# 执行动作,获取奖励
next_state, reward, done = env.step(action.item())
trajectories.append({
'state': state,
'action': action,
'reward': reward,
'log_prob': log_prob,
'value': value.item(),
'valid_actions': valid_actions
})
state = next_state
if done:
break
# 计算折扣回报
returns = self._compute_returns(trajectories)
# PPO 更新
loss = self._ppo_update(trajectories, returns)
return {
'loss': loss,
'episode_reward': sum(t['reward'] for t in trajectories),
'num_steps': len(trajectories)
}
def _compute_returns(self, trajectories: list) -> list:
"""计算折扣回报"""
returns = []
G = 0
for t in reversed(trajectories):
# 奖励 = 负执行耗时(耗时越短奖励越高)
G = t['reward'] + self.gamma * G
returns.insert(0, G)
return returns
def _ppo_update(self, trajectories: list, returns: list) -> float:
"""PPO 策略梯度更新"""
total_loss = 0.0
for t, G in zip(trajectories, returns):
# 重新计算当前策略下的 log_prob
probs, value = self.policy(
t['state'], t['valid_actions'])
dist = torch.distributions.Categorical(probs)
new_log_prob = dist.log_prob(t['action'])
# 重要性采样比率
ratio = torch.exp(new_log_prob - t['log_prob'])
# 优势函数
advantage = G - t['value']
# PPO 裁剪目标
surr1 = ratio * advantage
surr2 = torch.clamp(
ratio, 1 - self.clip_epsilon,
1 + self.clip_epsilon) * advantage
policy_loss = -torch.min(surr1, surr2)
# 价值函数损失
value_loss = (G - value) ** 2
# 熵正则化:鼓励探索
entropy = dist.entropy()
loss = policy_loss + 0.5 * value_loss - 0.01 * entropy
self.optimizer.zero_grad()
loss.backward()
# 梯度裁剪,防止梯度爆炸
torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 0.5)
self.optimizer.step()
total_loss += loss.item()
return total_loss / len(trajectories)
四、RL 优化器的工程风险:训练不收敛与推理不可解释
RL Join 优化器在学术 benchmark 上表现优异,但生产落地面临严峻的工程挑战。
训练不收敛:RL 的训练稳定性远不如监督学习。策略梯度方法的方差极大,同样的超参数在不同工作负载上可能表现天差地别。PPO 的裁剪机制缓解了策略崩溃问题,但不能保证收敛到全局最优。更关键的是,训练需要大量样本——每个 episode 对应一次完整的 Join 顺序选择和代价评估,如果环境模拟器不够准确,Agent 学到的策略在真实环境中可能完全失效。
推理延迟:策略网络的前向推理需要毫秒级延迟,但传统优化器的 DP 搜索在表数较少时也是毫秒级。RL 的优势只在表数超过 15 时才显现,而大多数生产查询的 Join 表数不超过 10。在 Join 表数少的场景下,RL 的推理开销反而是负担。
策略退化:RL 策略可能收敛到局部最优——对训练工作负载表现良好,但对未见过的查询模式表现极差。这种"过拟合"在 RL 中比监督学习更难检测,因为测试集的查询模式可能完全不在训练分布内。
适用边界:RL Join 优化器适合查询模式固定、Join 表数多(>10)、且工作负载可重复的场景(如数据仓库的固定报表查询)。对于即席查询场景,RL 策略的泛化能力不足,应退回传统 DP 或贪心搜索。混合策略——RL 生成 Top-K 候选 + 传统成本模型验证——是当前最可行的生产方案。
五、总结
基于强化学习的 Join 顺序优化将组合搜索问题建模为序列决策问题,通过策略网络直接输出高概率的 Join 顺序,搜索复杂度从指数级降为多项式级。PPO 算法通过裁剪重要性采样比率和价值函数约束,提升了训练稳定性。但 RL 优化器的工程风险不容忽视:训练不收敛、推理延迟在少表场景下不占优、策略退化导致对未见查询泛化能力差。生产实践中,RL 优化器应与传统成本模型协同工作——RL 负责生成 Top-K 候选 Join 顺序,成本模型负责验证和选择最优方案。这种"RL 生成 + 成本验证"的混合架构,既利用了 RL 的搜索效率,又保留了传统优化器的安全兜底能力,是 RL 查询优化走向生产的可行路径。
更多推荐


所有评论(0)