作者:海天一色y

关键词:RLHF、Reward Model、DeepSpeed、Pairwise Ranking Loss、DeepSpeedChat

一、前言:为什么需要Reward Model?

在大型语言模型(LLM)的RLHF(Reinforcement Learning from Human Feedback)训练流程中,第二阶段——Reward Model(奖励模型)的训练是承上启下的关键环节:

  SFT (第一阶段) → Reward Model (第二阶段) → RLHF/PPO (第三阶段)
       ↑                ↑                      ↑
    模仿学习         学习人类偏好              策略优化

Reward Model 的核心目标是学习人类的价值偏好,为第三阶段的强化学习提供可靠的奖励信号。本文将深入剖析 DeepSpeedChat 中 Reward Model 的完整实现,包括数据处理、模型架构、损失函数设计以及分布式训练实战。


二、Reward Model 的数据处理机制

2.1 数据格式与来源

Reward Model 训练需要成对的偏好数据,每条数据包含:

  • Prompt:相同的输入提示

  • Chosen:人类偏好的回答(高质量)

  • Rejected:人类不喜欢的回答(低质量)

  # 数据示例格式
  {
      "prompt": "请解释什么是机器学习?",
      "chosen": "机器学习是人工智能的一个分支,它使计算机能够从数据中学习模式...",  # 优选回答
      "rejected": "机器学习就是机器在学习,具体我也不太清楚。"  # 劣选回答
  }

2.2 核心数据处理流程(data_utils.py)

create_dataset_split 函数中,针对第二阶段(train_phase == 2)有专门的处理逻辑:

  def create_dataset_split(current_dataset, raw_dataset, train_phase, tokenizer, 
                          end_of_conversation_token, max_seq_len):
      prompt_dataset = []
      chosen_dataset = []
      reject_dataset = []
      
      # ==================== 第二阶段:Reward Model 训练 ====================
      elif train_phase == 2:
          for i, tmp_data in enumerate(current_dataset):
              # 1. 提取符合人类喜好的问答对 (prompt, chosen)
              chosen_sentence = tmp_data['prompt'] + tmp_data['chosen']
              # 2. 提取不符合人类喜好的问答对 (prompt, reject)  
              reject_sentence = tmp_data['prompt'] + tmp_data['rejected']
              
              if chosen_sentence is not None and reject_sentence is not None:
                  # 3. 添加结束符,确保模型知道序列边界
                  chosen_sentence += end_of_conversation_token
                  reject_sentence += end_of_conversation_token
                  
                  # 4. Tokenizer 处理:将文本转为数字 ID
                  chosen_token = tokenizer(
                      chosen_sentence,
                      max_length=max_seq_len,
                      padding="max_length",
                      truncation=True,
                      return_tensors="pt"
                  )
                  reject_token = tokenizer(
                      reject_sentence,
                      max_length=max_seq_len,
                      padding="max_length", 
                      truncation=True,
                      return_tensors="pt"
                  )
                  
                  # 5. 提取 input_ids 和 attention_mask,去除 batch 维度
                  chosen_token["input_ids"] = chosen_token["input_ids"].squeeze(0)
                  chosen_token["attention_mask"] = chosen_token["attention_mask"].squeeze(0)
                  reject_token["input_ids"] = reject_token["input_ids"].squeeze(0)
                  reject_token["attention_mask"] = reject_token["attention_mask"].squeeze(0)
                  
                  chosen_dataset.append(chosen_token)
                  reject_dataset.append(reject_token)

2.3 数据返回格式

PromptDataset 类的 __getitem__ 方法定义了第二阶段的数据返回格式:

  def __getitem__(self, idx):
      # 第二阶段 Reward Model 训练返回数据的格式
      elif self.train_phase == 2:
          return (
              self.chosen_dataset[idx]["input_ids"],
              self.chosen_dataset[idx]["attention_mask"],
              self.reject_dataset[idx]["input_ids"], 
              self.reject_dataset[idx]["attention_mask"]
          )

关键设计:返回4个张量,分别对应 chosen 和 rejected 的 input_ids 与 attention_mask,便于后续组成 pair 计算 ranking loss。

2.4 DataCollator 的巧妙设计

在第二阶段,DataCollatorReward 类会将一个 batch 的数据组装成拼接形式

  # 假设 batch_size = 4
  # 输入数据: [chosen_1, chosen_2, chosen_3, chosen_4, rejected_1, rejected_2, rejected_3, rejected_4]
  # 实际 batch_size 变为 8,但在模型内部会再切分为两半处理

这种设计使得同一个 batch 内天然包含配对关系,便于计算 pairwise loss。


三、Reward Model 的模型架构

3.1 整体架构设计

Reward Model 由两部分组成:

  1. Base Model(主干网络):预训练的语言模型(如 Qwen、OPT、LLaMA 等)

  2. Value Head(价值头):新增的线性层,将 hidden states 映射为标量奖励值

  class RewardModel(nn.Module):
      def __init__(self, base_model, tokenizer, num_padding_at_beginning=0):
          super().__init__()
          self.config = base_model.config
          self.num_padding_at_beginning = num_padding_at_beginning
          
          # 适配不同模型的输出维度
          if hasattr(self.config, "word_embed_proj_dim"):
              # OPT 模型使用 word_embed_proj_dim
              self.v_head = nn.Linear(self.config.word_embed_proj_dim, 1, bias=False)
          else:
              # 其他模型使用 hidden_size 或 n_embd
              self.config.n_embd = self.config.hidden_size if hasattr(self.config, "hidden_size") else self.config.n_embd
              self.v_head = nn.Linear(self.config.n_embd, 1, bias=False)
          
          # 主干网络 + 价值头
          self.rwtransformer = base_model
          self.PAD_ID = tokenizer.pad_token_id

架构示意图

  Input Tokens → [Base Model] → Hidden States (batch_size, seq_len, hidden_size) 
                                      ↓
                                [V_Head Linear] 
                                      ↓
                                Rewards (batch_size, seq_len)  # 每个位置一个分数

3.2 关键超参数说明

参数 作用 典型值
num_padding_at_beginning 处理前缀 padding(如 OPT 模型) 1 或 0
hidden_size / n_embd 主干网络隐藏层维度 768/1024/4096
word_embed_proj_dim OPT 模型特殊投影维度 与 hidden_size 相同

四、核心:Pairwise Ranking Loss 的实现

4.1 损失函数的理论基础

Reward Model 使用 Bradley-Terry 模型 的变体,通过成对比较学习人类偏好:

\mathcal{L}_{rank} = -\log\sigma(r_\theta(x, y_c) - r_\theta(x, y_r))

其中:

  • y_c:人类偏好的回答(chosen)

  • y_r:人类不喜欢的回答(rejected)

  • \sigma:sigmoid 函数

直观理解:模型需要学会给 chosen 打高分,给 rejected 打低分,且差距越大越好。

4.2 关键代码实现(forward 函数)

  def forward(self, input_ids=None, past_key_values=None, attention_mask=None, ...):
      # ==================== 1. 主干网络前向传播 ====================
      transformer_outputs = self.rwtranrsformer(
          input_ids=input_ids,
          past_key_values=past_key_values,
          attention_mask=attention_mask,
          inputs_embeds=inputs_embeds,
          use_cache=use_cache,
          **kwargs
      )
      # hidden_states.shape: (batch_size * 2, max_seq_len, hidden_size)
      hidden_states = transformer_outputs[0]
      
      # ==================== 2. 价值头预测每个位置的分数 ====================
      # rewards.shape: (batch_size * 2, max_seq_len)
      rewards = self.v_head(hidden_states).squeeze(-1)
      
      # ==================== 3. 切分 chosen 和 rejected ====================
      bs = input_ids.shape[0] // 2  # 实际 batch_size 是输入的一半
      seq_len = input_ids.shape[1]
      
      chosen_ids = input_ids[:bs]
      rejected_ids = input_ids[bs:]
      chosen_rewards = rewards[:bs]
      rejected_rewards = rewards[bs:]
      
      # ==================== 4. 计算 Pairwise Ranking Loss ====================
      loss = 0
      for i in range(bs):
          chosen_id = chosen_ids[i]
          rejected_id = rejected_ids[i]
          chosen_reward = chosen_rewards[i]
          rejected_reward = rejected_rewards[i]
          
          # --- 4.1 找到 chosen 序列中第一个 pad_token 的位置 ---
          # 例如: sentence = [1,2,3,4,5,6,0,0,0,0], pad_token=0
          # c_ind = 6 (第一个 pad 的位置)
          c_inds = (chosen_id == self.PAD_ID).nonzero()
          c_ind = c_inds[self.num_padding_at_beginning].item() if len(c_inds) > self.num_padding_at_beginning else seq_len
          
          # --- 4.2 找到 rejected 序列中第一个 pad_token 的位置 ---
          r_inds = (rejected_id == self.PAD_ID).nonzero()
          r_ind = r_inds[self.num_padding_at_beginning].item() if len(r_inds) > self.num_padding_at_beginning else seq_len
          
          # --- 4.3 找到两者回答开始分叉的位置(关键!)---
          # divergence_ind: chosen 和 rejected 第一个不同的地方
          # 即 response 中两个回答自由发挥的第一个 token 的 index
          check_divergence = (chosen_id != rejected_id).nonzero()
          
          if len(check_divergence) == 0:
              end_ind = rejected_reward.size(-1)
              divergence_ind = end_ind - 1
              r_ind = c_ind
          else:
              end_ind = max(c_ind, r_ind)
              divergence_ind = check_divergence[0]
          
          # --- 4.4 提取"对齐部分"的奖励(核心!)---
          # 对齐部分 = 非 prompt 部分 + 非 padding 部分 + 长度对齐
          # 示例: 
          #   prompt: [1,2,3]
          #   chosen: [1,2,3,4,5,6,0,0,0,0]
          #   reject: [1,2,3,7,8,0,0,0,0,0]
          #   chosen_truncated: [4,5,6] (divergence_ind=3, end_ind=6)
          #   reject_truncated: [7,8,0] (补零对齐)
          
          c_truncated_reward = chosen_reward[divergence_ind:end_ind]
          r_truncated_reward = rejected_reward[divergence_ind:end_ind]
          
          # --- 4.5 取最后一个有效 token 的分数作为整体分数(策略选择)---
          # 注:DeepSpeed 论文说明这是开放性策略,也可用平均、加权平均等
          chosen_mean_scores.append(chosen_reward[c_ind - 1])
          rejected_mean_scores.append(rejected_reward[r_ind - 1])
          
          # --- 4.6 计算 Ranking Loss ---
          # 对对齐部分的每个位置计算 sigmoid(log(c_truncated - r_truncated))
          loss += -torch.nn.functional.logsigmoid(c_truncated_reward - r_truncated_reward).mean()
      
      loss = loss / bs  # 平均

4.3 对齐部分(Truncated)处理的精妙之处

  # 示例说明(max_seq_len=10, pad_token_id=0)
  prompt:          [1, 2, 3]
  chosen_sentence: [1, 2, 3, 4, 5, 6, 0, 0, 0, 0]  # 回答: [4,5,6]
  reject_sentence: [1, 2, 3, 7, 8, 0, 0, 0, 0, 0]  # 回答: [7,8]
  ​
  # 处理步骤:
  # 1. 找到 divergence_ind = 3 (prompt后的第一个位置,即4 vs 7)
  # 2. c_ind = 6 (chosen中第一个pad的位置)
  # 3. r_ind = 5 (reject中第一个pad的位置)  
  # 4. end_ind = max(6, 5) = 6
  # 5. 对齐部分:
  #    chosen_truncated: chosen_reward[3:6] → 对应 token [4,5,6] 的奖励
  #    reject_truncated: reject_reward[3:6] → 对应 token [7,8,0] 的奖励 (补零对齐)

为什么需要对齐?

  • 确保比较的是相同位置的语义内容

  • 避免 padding 部分和 prompt 部分干扰损失计算

  • 处理不同长度回答的公平比较


五、训练流程与评估指标

5.1 主训练循环(main.py)

  def main():
      # ... 初始化代码 ...
      
      # 创建 Reward Model(critic_model 是 Reward Model 的副本)
      rm_model = create_critic_model(
          model_name_or_path=args.model_name_or_path,
          tokenizer=tokenizer,
          ds_config=ds_config,
          num_padding_at_beginning=args.num_padding_at_beginning,
          rlhf_training=False,  # 第二阶段为 False,第三阶段为 True
          disable_dropout=args.disable_dropout,
          zero_stage=args.zero_stage
      )
      
      # 设置训练阶段为 2
      train_phase = 2
      
      # 创建数据集
      train_dataset, eval_dataset = create_dataset(
          args.local_rank, args.dataset_name, args.data_split,
          args.output_path, train_phase, args.seed, tokenizer,
          end_of_conversation_token, args.max_seq_len
      )
      
      # DataLoader 配置
      data_collator = DataCollatorReward()
      train_dataloader = DataLoader(
          train_dataset,
          collate_fn=data_collator,
          sampler=train_sampler,
          batch_size=args.per_device_train_batch_size
      )
      
      # 优化器与调度器
      optimizer_grouped_parameters = get_optimizer_grouped_parameters(
          rm_model, args.weight_decay, args.lora_learning_rate
      )
      optimizer = AdamOptimizer(optimizer_grouped_parameters, lr=args.learning_rate)
      lr_scheduler = get_scheduler(...)
      
      # DeepSpeed 封装
      rm_model, optimizer, _, lr_scheduler = deepspeed.initialize(
          model=rm_model,
          optimizer=optimizer,
          args=args,
          lr_scheduler=lr_scheduler,
          config=ds_config
      )
      
      # ==================== 训练循环 ====================
      for epoch in range(args.num_train_epochs):
          rm_model.train()
          mean_loss = 0
          
          for step, batch in enumerate(train_dataloader):
              batch = to_device(batch, device)
              outputs = rm_model(**batch, use_cache=False)
              
              loss = outputs["loss"]
              rm_model.backward(loss)
              rm_model.step()
              
              mean_loss += loss.item()
              
          # 每个 epoch 评估
          reward_score, acc = evaluation_reward(rm_model, eval_dataloader)
          print(f"chosen_last_scores: {reward_score}, acc: {acc}")

5.2 评估指标详解

  def evaluation_reward(model, eval_dataloader):
      model.eval()
      correct_predictions = 0
      total_predictions = 0
      scores = 0  # 累加 chosen 分数
      
      for step, batch in enumerate(eval_dataloader):
          with torch.no_grad():
              outputs = model(**batch)
              
          chosen = outputs["chosen_mean_scores"]      # (batch_size,)
          rejected = outputs["rejected_mean_scores"]  # (batch_size,)
          
          # 关键指标 1:Accuracy(排序正确率)
          # chosen 分数 > rejected 分数 视为预测正确
          correct_predictions += (chosen > rejected).sum().item()
          total_predictions += chosen.size(0)
          
          # 关键指标 2:Chosen Mean Score(越高越好)
          scores += chosen.mean().item()
      
      acc = correct_predictions / total_predictions
      scores = scores / (step + 1)
      
      return scores, acc

5.3 关键评估指标说明

指标 含义 目标值 说明
chosen_last_scores 优选回答的平均分数 越高越好 训练初期可能为负,后期应显著为正
rejected_last_scores 劣选回答的平均分数 越低越好 应与 chosen 分数拉开差距
acc 排序正确率 接近 1.0 模型正确判断偏好的比例

六、实战:单卡与多卡训练

6.1 单卡 4090 训练配置(Qwen3-0.6B)

  #!/bin/bash
  ​
  deepspeed --num_gpus 1 main.py \
      --model_name_or_path /hy-tmp/Qwen3-0.6B \
      --data_path /hy-tmp/DeepSpeedExamples-master/applications/DeepSpeedChat/dschat/utils/data/data/train.jsonl \
      --num_padding_at_beginning 1 \
      --weight_decay 0.1 \
      --dropout 0.0 \
      --gradient_accumulation_steps 4 \
      --per_device_train_batch_size 1 \
      --per_device_eval_batch_size 1 \
      --zero_stage 2 \
      --enable_tensorboard \
      --tensorboard_path ./output \
      --deepspeed \
      --output_dir ./output

训练日志示例

  *****Running training *****
  *****Evaluating reward, Epoch 0/1*****
  chosen_last_scores (higher is better): -0.13518264889717102
  rejected_last_scores (lower is better): 0.01801300048828125
  acc (higher is better): 0.4099999964237213  # 随机水平,尚未学习
  ​
  Beginning of Epoch 1/1, Total Micro Batches 2000
  Epoch 1/1 with loss 0.15901985424757004
  ​
  *****Evaluating reward, Epoch 1/1*****
  chosen_last_scores (higher is better): 15.501015663146973      # 显著提升!
  rejected_last_scores (lower is better): -23.632709503173828    # 显著降低!
  acc (higher is better): 0.9899999499320984                     # 接近完美!
  ​
  saving model ...
  验证通过! 成功加载了 311 个张量。

关键观察

  • 初始阶段:acc ≈ 0.41(接近随机),分数接近 0(未学习)

  • 训练后:acc ≈ 0.99,chosen 分数 (+15.5) >> rejected 分数 (-23.6),差距约 39 分

6.2 多卡 4×4090 训练配置(Qwen3-4B)

  #!/bin/bash
  ​
  deepspeed --num_gpus 4 main.py \
      --model_name_or_path /hy-tmp/Qwen3-4B \
      --data_path /hy-tmp/DeepSpeedExamples-master/applications/DeepSpeedChat/dschat/utils/data/data/train.jsonl \
      --num_padding_at_beginning 1 \
      --weight_decay 0.1 \
      --dropout 0.0 \
      --gradient_accumulation_steps 4 \
      --per_device_train_batch_size 1 \
      --per_device_eval_batch_size 1 \
      --zero_stage 2 \
      --enable_tensorboard \
      --tensorboard_path ./output \
      --deepspeed \
      --output_dir ./output

GPU 显存占用监控

GPU 显存占用 功率
0 38029 MiB / 49140 MiB 128W/450W
1 38037 MiB / 49140 MiB 128W/450W
2 38077 MiB / 49140 MiB 117W/450W
3 38037 MiB / 49140 MiB 102W/450W

多卡训练结果

  *****Evaluating reward, Epoch 0/1*****
  chosen_last_scores: -0.5199811458587646
  rejected_last_scores: -0.43560153245925903
  acc: 0.44999998807907104
  ​
  Beginning of Epoch 1/1, Total Micro Batches 50
  Epoch 1/1 with loss 0.46155920267105105
  ​
  *****Evaluating reward, Epoch 1/1*****
  chosen_last_scores: -18.122968673706055     # 注意:可以是负值,关键是相对大小
  rejected_last_scores: -27.339218139648438
  acc: 0.99  # 具体数值被截断,但接近 1.0

关于负值的说明:Reward Model 学习的是相对分数,绝对值可为负。关键是 chosen (-18.1) > rejected (-27.3),差距约 9.2 分


七、模型保存与加载

7.1 保存的文件结构

训练完成后,输出目录包含:

  output/
  ├── added_tokens.json          # 新增 token 配置
  ├── chat_template.jinja        # 对话模板
  ├── config.json                # 模型配置
  ├── ds_tensorboard_logs/       # 训练日志
  ├── merges.txt                 # BPE 合并规则
  ├── model.safetensors          # 模型权重(主要文件,约 1.2GB/8GB)
  ├── special_tokens_map.json   # 特殊 token 映射
  ├── tokenizer_config.json      # 分词器配置
  ├── tokenizer.json            # 分词器词汇表
  └── vocab.json                 # 词表文件

7.2 SafeTensors 格式支持

针对 Qwen3 等新型模型,DeepSpeedChat 需要适配 safetensors 格式保存:

  # 在 main.py 中的保存逻辑
  if args.global_rank == 0:
      # 标准格式保存(非 safetensors)
      # save_hf_format(rm_model, tokenizer, args)
      
      # Qwen3 适配:safetensors 格式
      save_hf_format_safetensors(rm_model, tokenizer, args)
  ​
  # ZeRO-3 阶段特殊处理
  if args.zero_stage == 3:
      save_zero_three_model(rm_model, args.output_dir)

八、进阶:与第三阶段的衔接

8.1 Reward Model 的推理模式(forward_value)

除了训练用的 forward,Reward Model 还实现了第三阶段的推理接口 forward_value

  def forward_value(self, input_ids, attention_mask, ..., return_value_only=False, prompt_length=0):
      """
      与 forward 的区别:
      - forward: 处理 chosen-rejected pair,计算 ranking loss
      - forward_value: 处理单个输入,直接返回分值(用于 PPO 的奖励计算)
      """
      transformer_outputs = self.rwtranrsformer(...)
      hidden_states = transformer_outputs[0]
      values = self.v_head(hidden_states).squeeze(-1)  # (batch_size, seq_len)
      
      if return_value_only:
          return values  # 返回每个位置的分数
      
      # 否则返回最后一个有效 token 的分数(作为整体奖励)
      chosen_end_scores = []
      for i in range(batch_size):
          input_id = input_ids[i]
          value = values[i]
          
          # 找到 prompt 后的第一个 pad
          c_inds = (input_id[prompt_length:] == self.PAD_ID).nonzero()
          c_ind = c_inds[0].item() + prompt_length if len(c_inds) > 0 else seq_len
          
          # 取 answer 最后一个 token 的分数
          chosen_end_scores.append(value[c_ind - 1])
      
      return {
          "values": values,                    # 每个位置的分数
          "chosen_end_scores": torch.stack(chosen_end_scores)  # 最终奖励
      }

8.2 Critic Model 的初始化

在第三阶段(RLHF),Critic Model 本质上是 Reward Model 的副本:

  def create_critic_model(...):
      # 1. 创建基础模型(与 Reward Model 相同架构)
      critic_model = create_hf_model(AutoModel, model_name_or_path, ...)
      
      # 2. 包装为 RewardModel 类
      critic_model = RewardModel(critic_model, tokenizer, num_padding_at_beginning)
      
      # 3. 加载第二阶段训练好的权重
      if rlhf_training:
          model_ckpt_state_dict = torch.load(model_ckpt_path, map_location='cpu')
          load_state_dict_into_model(critic_model, model_ckpt_state_dict, "", zero_stage=zero_stage)
      
      return critic_model

九、常见问题与调试技巧

9.1 评估时遇到的报错

问题:单独运行 run_eval.sh 时可能出现分布式初始化错误。

解决:临时注释掉以下两行(仅单卡评估时):

  # if torch.distributed.get_rank() == 0:
  #     print(f">Creating model from_config took {end - start} seconds")

9.2 Loss 不下降或 Acc 不提升

现象 可能原因 解决方案
Loss 震荡 学习率过高 降低 learning_rate 至 1e-5 或 5e-6
Acc stuck 在 0.5 数据质量问题 检查 chosen/rejected 是否有明显偏好差异
显存溢出 序列过长 减小 max_seq_len 或启用 gradient checkpointing
分数差距小 模型容量不足 尝试更大基座模型或增加训练轮数

9.3 ZeRO 阶段选择建议

ZeRO Stage 适用场景 显存节省 通信开销
ZeRO-1 单卡/小模型 优化器状态分片
ZeRO-2(推荐) 中等规模 优化器+梯度分片
ZeRO-3 超大模型 参数+优化器+梯度分片

十、总结

Reward Model 是 RLHF 流程中的价值锚点,其训练质量直接决定第三阶段强化学习的效果。通过本文的深入剖析,我们掌握了:

  1. 数据处理:成对偏好数据的构造与对齐策略

  2. 模型架构:Base Model + Value Head 的组合设计

  3. 损失函数:Pairwise Ranking Loss 的精妙实现与对齐处理

  4. 训练实战:从单卡到多卡的完整配置与监控

  5. 阶段衔接:如何为 RLHF/PPO 提供可靠的奖励信号

关键心得:Reward Model 学习的是相对偏好而非绝对分数,因此关注 chosen 与 rejected 的差距比关注绝对值更重要。一个优秀的 Reward Model 应该具备高准确率(acc > 0.95)和显著的分数区分度(差距 > 10 分)。


参考资源

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐