To analyze verl's implementation of DAPO, start from the entry scripts (.sh and .py). The relevant files live under ./recipe/dapo/, with the following structure:
.
├── config
│ ├── dapo_megatron_trainer.yaml
│ └── dapo_trainer.yaml
├── dapo_ray_trainer.py
├── main_dapo.py
├── prepare_dapo_data.sh
├── README.md
├── run_dapo_qwen2.5_32b.sh
The overall execution is split across two core modules:
- main_dapo.py: initializes data loading, builds and initializes the actor_rollout model and the rm model, and loads the reward_manager component.
- dapo_ray_trainer.py: carries the core reinforcement-learning (RL) training loop.
The concrete execution logic is as follows:
The input batch is repeated so that each question q is sampled n times. For every sample, the log-probability, the corresponding reward_score, and the advantage are recorded.
To guarantee training quality, the sampled results are filtered: if all generations for a question q score 1, or all score 0, that group is discarded and new questions are sampled until the number of qualifying prompts reaches train_prompt_bsz. Note that the generation batch size is set to gen_prompt_bsz = 3 × train_prompt_bsz; over-sampling the initial queries avoids the situation where too few qualifying samples remain to fill a training batch.
Training then proceeds through the following hierarchy (see the sketch after this list):
- each mini_batch of data performs one model parameter update;
- each micro_batch of data runs a forward pass (with token-mean loss) and back-propagates its gradients.
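To make the dynamic-sampling loop described above concrete, here is a small self-contained toy sketch; the batch sizes and the scoring stub are invented for illustration and are not verl code:

import random

# Toy illustration of DAPO-style dynamic sampling (hypothetical numbers and helpers).
TRAIN_PROMPT_BSZ = 8                    # prompts required for one training step
GEN_PROMPT_BSZ = 3 * TRAIN_PROMPT_BSZ   # over-sample prompts to survive filtering
N_ROLLOUTS = 4                          # responses sampled per prompt


def rollout_scores(prompt_id: int) -> list:
    """Stand-in for sampling N_ROLLOUTS responses and scoring each one 0 or 1."""
    return [random.randint(0, 1) for _ in range(N_ROLLOUTS)]


kept_prompts = []
num_gen_batches = 0
while len(kept_prompts) < TRAIN_PROMPT_BSZ:
    num_gen_batches += 1
    for pid in range(GEN_PROMPT_BSZ):
        scores = rollout_scores(pid)
        # Discard groups whose scores are all 0 or all 1: their group-normalized
        # advantages would be zero and carry no learning signal.
        if max(scores) != min(scores):
            kept_prompts.append((pid, scores))
        if len(kept_prompts) >= TRAIN_PROMPT_BSZ:
            break

print(f"collected {len(kept_prompts)} prompts from {num_gen_batches} generation batch(es)")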
Below is an excerpt from main_dapo.py:
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Note: the main function is not merged with ray_trainer here, because ray_trainer may also be called by other main programs.
"""
import os
import socket
import hydra
import ray
from omegaconf import OmegaConf
from verl.trainer.ppo.reward import load_reward_manager
from verl.utils.device import is_cuda_available
from .dapo_ray_trainer import RayDAPOTrainer

@hydra.main(config_path="config", config_name="dapo_trainer", version_base=None)
def main(config):
    run_ppo(config)

#################################################################
# Main entry point for RL training
#################################################################
def run_ppo(config) -> None:
    if not ray.is_initialized():
        # Initialization for a local ray cluster.
        # Environment variables that control parallelism behavior and log verbosity.
        env_vars = {
            "TOKENIZERS_PARALLELISM": "true",
            "NCCL_DEBUG": "WARN",
            "VLLM_LOGGING_LEVEL": "WARN",
        }
        default_runtime_env = {"env_vars": env_vars}
        # Merge the default runtime env with any user-provided runtime_env.
        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
        # Update runtime_env in the ray init kwargs.
        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
        print(f"ray init kwargs: {ray_init_kwargs}")
        # Initialize the Ray distributed runtime.
        ray.init(**OmegaConf.to_container(ray_init_kwargs))
    try:
        # Decide whether to enable the Nsight Systems profiler.
        if (
            is_cuda_available
            and config.global_profiler.tool == "nsys"
            and OmegaConf.select(config.global_profiler, "steps") is not None
            and len(OmegaConf.select(config.global_profiler, "steps")) > 0
        ):
            # Extract the Nsight controller options.
            nsight_options = OmegaConf.to_container(
                config.global_profiler.global_tool_config.nsys.controller_nsight_options
            )
            # Create the remote task runner with the nsight runtime env.
            runner = TaskRunner.options(runtime_env={"nsight": nsight_options}).remote()
        else:
            # Default case: start a plain remote task runner.
            runner = TaskRunner.remote()
        # Launch the remote task and wait for it to finish.
        ray.get(runner.run.remote(config))
    finally:
        # Make sure the Ray runtime is shut down when the program exits.
        if ray.is_initialized():
            ray.shutdown()

@ray.remote(num_cpus=1)  # make sure the main task is not scheduled on the head node
class TaskRunner:
    def run(self, config):
        # Local imports used inside the remote task.
        from pprint import pprint

        from verl.single_controller.ray import RayWorkerGroup
        from verl.utils import hf_processor, hf_tokenizer
        from verl.utils.fs import copy_to_local

        # Print the hostname and PID of the node executing this task.
        print(f"TaskRunner hostname: {socket.gethostname()}, PID: {os.getpid()}")
        # Print the fully resolved config (with interpolations expanded).
        pprint(OmegaConf.to_container(config, resolve=True))
        OmegaConf.resolve(config)

        # Download the model checkpoint from HDFS to local disk.
        local_path = copy_to_local(config.actor_rollout_ref.model.path)

        # Instantiate the tokenizer and the processor
        # (the processor is used for multimodal LLMs and may be None).
        tokenizer = hf_tokenizer(local_path)
        processor = hf_processor(local_path, use_fast=True)
        #################################################################
        # Load the actor worker
        #################################################################
        # Choose the distributed training strategy (FSDP/FSDP2 or Megatron).
        if config.actor_rollout_ref.actor.strategy in {"fsdp", "fsdp2"}:
            assert config.critic.strategy in {"fsdp", "fsdp2"}

        if config.actor_rollout_ref.actor.strategy == "megatron":
            assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
            from verl.workers.megatron_workers import ActorRolloutRefWorker, CriticWorker

            ray_worker_group_cls = RayWorkerGroup
        elif config.actor_rollout_ref.actor.strategy in {"fsdp", "fsdp2"}:
            from verl.workers.fsdp_workers import ActorRolloutRefWorker, CriticWorker

            ray_worker_group_cls = RayWorkerGroup
        else:
            raise NotImplementedError

        from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role

        role_worker_mapping = {
            Role.ActorRollout: ray.remote(ActorRolloutRefWorker),
            Role.Critic: ray.remote(CriticWorker),
        }

        global_pool_id = "global_pool"
        resource_pool_spec = {
            global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
        }
        mapping = {
            Role.ActorRollout: global_pool_id,
            Role.Critic: global_pool_id,
        }
        # Configure the reference model if KL is used in the reward or as a loss term.
        if config.algorithm.use_kl_in_reward or config.actor_rollout_ref.actor.use_kl_loss:
            role_worker_mapping[Role.RefPolicy] = ray.remote(ActorRolloutRefWorker)
            mapping[Role.RefPolicy] = global_pool_id

        # Import and register the reward-model worker if a model-based RM is enabled.
        if config.reward_model.enable:
            if config.reward_model.strategy == "megatron":
                from verl.workers.megatron_workers import RewardModelWorker
            elif config.reward_model.strategy in {"fsdp", "fsdp2"}:
                from verl.workers.fsdp_workers import RewardModelWorker
            else:
                raise NotImplementedError
            role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
            mapping[Role.RewardModel] = global_pool_id
        # Notes on the multi-source reward design:
        # - For rule-based RMs, return the predefined score directly.
        # - For model-based RMs, call the corresponding model to score the response.
        # - If the input is generated code with test cases, submit it to a sandbox for execution.
        # - Finally, fuse the reward signals from all sources.
        # - The reward type is selected by a tag field in each data sample.
        #################################################################
        # Initialize the reward manager, which computes reward scores from the input data
        #################################################################
        reward_fn = load_reward_manager(
            config,
            tokenizer,
            0,
            # ... (remaining keyword arguments omitted in this excerpt)
        )
Let us first look at the implementation behind from verl.trainer.ppo.reward import load_reward_manager. The launch script verl/recipe/dapo/run_dapo_qwen2.5_32b.sh defines the reward-related parameters.
The key reward settings are:
enable_overlong_buffer=True                 # enable the over-long buffer for handling over-long responses
overlong_buffer_len=$((1024 * 4))           # length of the over-long buffer
overlong_penalty_factor=1.0                 # penalty factor for over-long responses
reward_model.reward_manager=dapo
reward_model.overlong_buffer.enable=${enable_overlong_buffer}
reward_model.overlong_buffer.len=${overlong_buffer_len}
reward_model.overlong_buffer.penalty_factor=${overlong_penalty_factor}
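To get a feel for what these settings do, here is a self-contained sketch of the length-dependent penalty they configure; the max_resp_len default below is an assumed example value rather than one taken from the script:

def overlong_penalty(valid_response_length: int,
                     max_resp_len: int = 20480,        # assumed example value
                     overlong_buffer_len: int = 4096,  # matches overlong_buffer_len above
                     penalty_factor: float = 1.0) -> float:
    """Soft penalty that ramps linearly from 0 to -penalty_factor inside the buffer zone."""
    expected_len = max_resp_len - overlong_buffer_len
    exceed_len = valid_response_length - expected_len
    # No penalty while the response stays below expected_len;
    # a linear penalty applies once it enters the over-long buffer.
    return min(-exceed_len / overlong_buffer_len * penalty_factor, 0)


print(overlong_penalty(10000))  # 0     -> well within the expected length
print(overlong_penalty(18432))  # -0.5  -> halfway into the 4096-token buffer
print(overlong_penalty(20480))  # -1.0  -> at the hard limit, full penalty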
Next, look at the core function load_reward_manager in verl/trainer/ppo/reward.py:
def load_reward_manager(
    config: DictConfig, tokenizer: Any, num_examine: int, **reward_kwargs: Any
) -> AbstractRewardManager:
    """
    Load and initialize a reward manager instance according to the config.

    Args:
        config: PPO trainer config object containing the reward_model fields.
        tokenizer: Tokenizer used for text processing.
        num_examine: Number of samples to print for inspection.
        **reward_kwargs: Extra keyword arguments forwarded to the reward manager.

    Returns:
        An instance of the specified reward manager type, a subclass of AbstractRewardManager.
    """
    # Try to fetch a custom reward function from the config.
    # A user-defined reward manager can be registered via custom_reward_fn.
This function dynamically builds the corresponding reward manager from the supplied configuration and supports plugging in user-defined reward logic. Key parameters such as the maximum response length and the over-long-response handling strategy are all passed in through config and combined with the tokenizer and other runtime arguments during initialization.
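As a side note, registering a user-defined reward manager (so that reward_model.reward_manager can select it) might look roughly like the hypothetical sketch below; only the verl.workers.reward_manager.register entry point is taken from this walkthrough, while the class name and body are illustrative:

import torch

from verl.workers.reward_manager import register  # registration entry point referenced later in this post


@register("my_rm")  # hypothetical name, selected via reward_model.reward_manager=my_rm
class MyRewardManager:
    """Illustrative skeleton; a real manager should follow AbstractRewardManager, like DAPORewardManager below."""

    def __init__(self, tokenizer, num_examine, compute_score=None, reward_fn_key="data_source", **kwargs):
        self.tokenizer = tokenizer
        self.num_examine = num_examine
        self.compute_score = compute_score
        self.reward_fn_key = reward_fn_key

    def __call__(self, data, return_dict: bool = False):
        # Place one scalar score on the last valid token of each response
        # (the same convention DAPORewardManager uses).
        reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32)
        # ... fill reward_tensor using self.compute_score ...
        return {"reward_tensor": reward_tensor, "reward_extra_info": {}} if return_dict else reward_tensor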
Back in the main training flow, this reward manager is then used to build the RayDAPOTrainer instance:
        resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)

        trainer = RayDAPOTrainer(
            config=config,
            tokenizer=tokenizer,
            processor=processor,
            role_worker_mapping=role_worker_mapping,
            resource_pool_manager=resource_pool_manager,
            ray_worker_group_cls=ray_worker_group_cls,
            reward_fn=reward_fn,
            val_reward_fn=val_reward_fn,
        )
        trainer.init_workers()
        trainer.fit()


if __name__ == "__main__":
    main()
Note: validation always uses the function-based reward model. Before training starts, all distributed workers are initialized via init_workers(), and .fit() then launches the DAPO RL training loop.
Returning to the body of load_reward_manager:

    reward_manager_name = config.reward_model.get("reward_manager", "naive")
    reward_manager_cls = get_reward_manager_cls(reward_manager_name)

    # Decide whether to use the sandbox-fusion mechanism based on the config.
    sandbox_config = config.reward_model.get("sandbox_fusion")
    sandbox_url = sandbox_config.get("url") if sandbox_config else None
    memory_limit_mb = sandbox_config.get("memory_limit_mb", 1024) if sandbox_config else 1024
    # If a sandbox URL is configured, create a shared manager and a concurrency semaphore.
    if sandbox_url:
        sandbox_manager = multiprocessing.Manager()
        _concurrent_semaphore = sandbox_manager.Semaphore(sandbox_config.get("max_concurrent", 64))
        final_compute_score = partial(
            default_compute_score,
            sandbox_fusion_url=sandbox_url,
            concurrent_semaphore=_concurrent_semaphore,
            memory_limit_mb=memory_limit_mb,
        )
    else:
        final_compute_score = default_compute_score

    # Use the custom scoring function if one is defined, otherwise fall back to the default.
    compute_score = get_custom_reward_fn(config)
    final_compute_score = compute_score or final_compute_score

    # Note: a custom reward manager must be imported and registered in advance
    # via verl.workers.reward_manager.register.
    # The built-in reward manager types live under `verl/workers/reward_manager/`:
    # - naive: NaiveRewardManager
    # - prime: PrimeRewardManager
    # - batch: BatchRewardManager
    # - dapo:  DAPORewardManager
    # By default reward_manager is "naive" (NaiveRewardManager);
    # here the resolved reward_manager_cls is the DAPO type.

    # Instantiate and return the reward manager with the given arguments.
    return reward_manager_cls(
        tokenizer=tokenizer,
        num_examine=num_examine,
        compute_score=final_compute_score,
        reward_fn_key=config.data.reward_fn_key,
        **reward_kwargs,
    )
At this point we need to know which class dapo's reward_manager_cls actually resolves to, because the reward computation depends on the batch data. So before digging into the reward manager, we set that part aside for a moment (the dapo reward_manager_cls is defined in verl/verl/workers/reward_manager/dapo.py). Let us first look at how the batch is sampled in dapo_ray_trainer.py, and then come back to the details of the reward computation.
dapo_ray_trainer.py
#################################################################
# RayDAPOTrainer inherits from RayPPOTrainer.
# fit(): runs the DAPO training loop, which mainly consists of:
#   (1) dynamic sampling
#   (2) overlong soft reward computation
#   (3) token-level loss handling
#################################################################
        curr_step_profile = (
            self.global_steps in self.config.global_profiler.steps
            if self.config.global_profiler.steps is not None
            else False
        )
        next_step_profile = False
        prev_step_profile = False
        timing_raw = defaultdict(float)
        batch = None
        num_prompt_in_batch = 0
        num_gen_batches = 0

        for epoch in range(self.config.trainer.total_epochs):
            for batch_dict in self.train_dataloader:
                metrics = {}

                with marked_timer("start_profile", timing_raw):
                    self._start_profiling(
                        curr_step_profile
                        if not self.config.global_profiler.profile_continuous_steps
                        else (not prev_step_profile and curr_step_profile)
                    )

                new_batch: DataProto = DataProto.from_single_dict(batch_dict)
                num_gen_batches += 1

                if "multi_modal_data" in new_batch.non_tensor_batch.keys():
                    gen_batch = new_batch.pop(
                        batch_keys=["input_ids", "attention_mask", "position_ids"],
                        non_tensor_batch_keys=["raw_prompt_ids", "multi_modal_data"],
                    )
                else:
                    gen_batch = new_batch.pop(
                        batch_keys=["input_ids", "attention_mask", "position_ids"],
                        non_tensor_batch_keys=["raw_prompt_ids"],
                    )

                gen_batch_output = gen_batch.repeat(
                    repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True
                )
                is_last_step = self.global_steps >= self.total_training_steps

                with marked_timer("step", timing_raw):
                    # Generate a batch of rollouts.
                    with marked_timer("gen", timing_raw, "red"):
                        gen_batch_output = self.actor_rollout_wg.generate_sequences(gen_batch_output)
                        timing_raw.update(gen_batch_output.meta_info["timing"])
                        gen_batch_output.meta_info.pop("timing", None)
                    # When REMAX is used as the advantage estimator, an extra greedy rollout
                    # is generated and used as the baseline in the advantage computation.
                    if self.config.algorithm.adv_estimator == AdvantageEstimator.REMAX:
                        with marked_timer("gen_max", timing_raw, "red"):
                            gen_baseline_batch = deepcopy(gen_batch)
                            # Greedy decoding for the baseline output (do_sample=False).
                            gen_baseline_batch.meta_info["do_sample"] = False
                            gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
                            new_batch = new_batch.union(gen_baseline_output)

                            # Compute reward-model scores on the merged batch.
                            rm_scores = None
                            if self.use_rm and "rm_scores" not in new_batch.batch.keys():
                                rm_scores = self.rm_wg.compute_rm_score(new_batch)
                                new_batch = new_batch.union(rm_scores)

                            # Compute the reward baseline on the new batch.
                            reward_baseline_tensor, _ = compute_reward(new_batch, self.reward_fn)
                            reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)

                            # Collect the keys that should be removed from the batch.
                            keys_to_pop = set(gen_baseline_output.batch.keys())
                            if rm_scores is not None:
                                keys_to_pop.update(rm_scores.batch.keys())
                            new_batch.pop(batch_keys=list(keys_to_pop))

                            # Store the computed reward baselines in the batch.
                            new_batch.batch["reward_baselines"] = reward_baseline_tensor

                            del rm_scores, gen_baseline_batch, gen_baseline_output
                    # At this point the size of new_batch equals the generation prompt batch size (gen_prompt_bsz).
                    # Assign a unique identifier (uid) to every prompt.
                    # The uid is needed later when normalizing rewards within each group of
                    # samples that belong to the same original query.
                    new_batch.non_tensor_batch["uid"] = np.array(
                        [str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object
                    )
                    # Repeat the relevant fields (mainly uid) so that they stay aligned with
                    # the n responses generated per prompt during rollout.
                    new_batch = new_batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True)
                    # After sampling, merge the rollout results into new_batch.
                    new_batch = new_batch.union(gen_batch_output)

                    # Compute rewards; both model-based and function-based RMs are supported.
                    # The reward model scores first, then reward_fn fuses the model output
                    # with the rule-based results.
                    if self.use_rm and "rm_scores" not in new_batch.batch.keys():
                        reward_tensor = self.rm_wg.compute_rm_score(new_batch)
                        new_batch = new_batch.union(reward_tensor)

                    # Compute the reward of every sampled response in new_batch with the configured self.reward_fn.
                    reward_tensor, reward_extra_infos_dict = compute_reward(new_batch, self.reward_fn)
                    new_batch.batch["token_level_scores"] = reward_tensor

                    if reward_extra_infos_dict:
                        new_batch.non_tensor_batch.update(
                            {k: np.array(v) for k, v in reward_extra_infos_dict.items()}
                        )
                    # Apply the KL penalty if enabled in the config.
                    if self.config.algorithm.use_kl_in_reward:
                        new_batch, kl_metrics = apply_kl_penalty(
                            new_batch,
                            kl_ctrl=self.kl_ctrl_in_reward,
                            kl_penalty=self.config.algorithm.kl_penalty,
                        )
                        metrics.update(kl_metrics)  # Note: may be overwritten when multiple generation batches are used
                    else:
                        new_batch.batch["token_level_rewards"] = new_batch.batch["token_level_scores"]
                    #################################################################
                    # DAPO dynamic sample filtering (filter groups)
                    #################################################################
                    if not self.config.algorithm.filter_groups.enable:
                        batch = new_batch
                    else:
                        # If the number of kept prompts is smaller than the training batch size,
                        # the current generation batch is not enough on its own.
                        metric_name = self.config.algorithm.filter_groups.metric
                        if metric_name == "seq_final_reward":
                            # Turn sequence final rewards into numpy for the filtering step.
                            new_batch.non_tensor_batch["seq_final_reward"] = (
                                new_batch.batch["token_level_rewards"].sum(dim=-1).numpy()
                            )
                        elif metric_name == "seq_reward":
                            new_batch.non_tensor_batch["seq_reward"] = (
                                new_batch.batch["token_level_scores"].sum(dim=-1).numpy()
                            )

                        # Data structure: {uid: [r1, r2, r3, ..., rn], uid: [...], ...}
                        # It records the rewards of all samples drawn for each prompt.
Collect the sequence rewards of all trajectories, grouped by prompt:
                        prompt_uid2metric_vals = defaultdict(list)
                        for uid, metric_val in zip(
                            new_batch.non_tensor_batch["uid"],
                            new_batch.non_tensor_batch[metric_name],
                            strict=True,
                        ):
                            prompt_uid2metric_vals[uid].append(metric_val)
Compute the standard deviation of the rewards for each question q:
                        prompt_uid2metric_std = {}
                        for prompt_uid, metric_vals in prompt_uid2metric_vals.items():
                            prompt_uid2metric_std[prompt_uid] = np.std(metric_vals)
Keep the prompt UIDs whose reward standard deviation is non-zero, and also keep prompts with only a single sample (so they are not filtered out just for having one trajectory):
                        kept_prompt_uids = [
                            uid
                            for uid, std in prompt_uid2metric_std.items()
                            if std > 0 or len(prompt_uid2metric_vals[uid]) == 1
                        ]
Accumulate the number of valid prompts kept from the current batch:
                        num_prompt_in_batch += len(kept_prompt_uids)
Record the indices of the kept trajectories within the current batch:
                        kept_traj_idxs = []
                        for idx, traj_from_prompt_uid in enumerate(new_batch.non_tensor_batch["uid"]):
                            if traj_from_prompt_uid in kept_prompt_uids:
                                kept_traj_idxs.append(idx)
Select the kept trajectories from the current batch by index:
                        new_batch = new_batch[kept_traj_idxs]
Accumulate the filtered batch into the main batch used for training:
                        batch = new_batch if batch is None else DataProto.concat([batch, new_batch])
Read the minimum trainable batch size (counted in prompts) from the config:
                        prompt_bsz = self.config.data.train_batch_size
If the number of valid prompts collected so far is still below the required training batch size, keep generating with the next batch:
                        if num_prompt_in_batch < prompt_bsz:
                            print(f"{num_prompt_in_batch=} < {prompt_bsz=}")
                            max_num_gen_batches = self.config.algorithm.filter_groups.max_num_gen_batches
                            if max_num_gen_batches <= 0 or num_gen_batches < max_num_gen_batches:
                                print(f"{num_gen_batches=}. Keep generating...")
                                self.gen_steps += 1
                                is_last_step = self.global_steps >= self.total_training_steps
                                continue
                            else:
                                raise ValueError(
                                    f"{num_gen_batches=} >= {max_num_gen_batches=}."
                                    + " Generated too many. Please check if your data are too difficult."
                                    + " You could also try set max_num_gen_batches=0 to enable endless trials."
                                )
Once the number of valid prompts reaches (or exceeds) the required training batch size, align the batch:
                        else:
                            # Align the batch: keep exactly train_batch_size * n trajectories.
                            traj_bsz = self.config.data.train_batch_size * self.config.actor_rollout_ref.rollout.n
                            batch = batch[:traj_bsz]

                    # Compute the response mask and store it in the batch.
                    batch.batch["response_mask"] = compute_response_mask(batch)
                    # Balance the number of valid tokens across DP ranks.
                    # Note: this usually reorders the data inside the batch.
                    # It does not affect the uid-based advantage computation,
                    # but it can affect the loss because the mini-batch composition changes.
                    # TODO: decouple DP balancing from mini-batch grouping.
                    if self.config.trainer.balance_batch:
                        self._balance_batch(batch, metrics=metrics)

                    # Count the global number of tokens per sample.
                    batch.meta_info["global_token_num"] = torch.sum(batch.batch["attention_mask"], dim=-1).tolist()
                    # Recompute the token-level log-probabilities of the filtered batch under the
                    # old (rollout) policy; they are used later for the importance-sampling ratio.
                    with marked_timer("old_log_prob", timing_raw, "blue"):
                        old_log_prob = self.actor_rollout_wg.compute_log_prob(batch)
                        entropys = old_log_prob.batch["entropys"]
                        response_masks = batch.batch["response_mask"]
                        loss_agg_mode = self.config.actor_rollout_ref.actor.loss_agg_mode  # "token_mean" for DAPO
                        # Aggregate the entropy with the configured aggregation mode.
                        entropy_agg = agg_loss(loss_mat=entropys, loss_mask=response_masks, loss_agg_mode=loss_agg_mode)
                        old_log_prob_metrics = {"actor/entropy": entropy_agg.detach().item()}
                        metrics.update(old_log_prob_metrics)
                        # Drop the entropy field once it has been consumed.
                        old_log_prob.batch.pop("entropys")
                        # Merge the old log-probs into the main batch.
                        batch = batch.union(old_log_prob)
                    # If a reference policy is used, compute its log-probabilities as well.
                    if self.use_reference_policy:
                        with marked_timer("ref", timing_raw, "olive"):
                            ref_log_prob = self.ref_policy_wg.compute_ref_log_prob(batch)
                            batch = batch.union(ref_log_prob)

                    # If a critic is used, compute value estimates.
                    if self.use_critic:
                        with marked_timer("values", timing_raw, "cyan"):
                            values = self.critic_wg.compute_values(batch)
                            batch = batch.union(values)

                    # Compute rollout IS weights and mismatch metrics (inherited from RayPPOTrainer).
                    batch, is_metrics = self.compute_rollout_importance_weights_and_add_to_batch(batch)
                    # IS and mismatch metrics already carry the mismatch/ prefix.
                    metrics.update(is_metrics)
                    #################################################################
                    # Compute advantages
                    #################################################################
                    with marked_timer("adv", timing_raw, "brown"):
                        # Compute advantages; this runs on the driver process.
                        norm_adv_by_std_in_grpo = self.config.algorithm.get("norm_adv_by_std_in_grpo", True)
                        batch = compute_advantage(
                            batch,
                            adv_estimator=self.config.algorithm.adv_estimator,
                            gamma=self.config.algorithm.gamma,
                            lam=self.config.algorithm.lam,
                            num_repeat=self.config.actor_rollout_ref.rollout.n,
                            norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo,
                        )
                    if self.use_critic:
                        with marked_timer("update_critic", timing_raw, "pink"):
                            critic_output = self.critic_wg.update_critic(batch)
                            critic_output_metrics = reduce_metrics(critic_output.meta_info["metrics"])
                            metrics.update(critic_output_metrics)

                    # Critic warmup: only start updating the actor after critic_warmup steps.
                    if self.config.trainer.critic_warmup <= self.global_steps:
                        #################################################################
                        # Update the actor model (the batch covers train_prompt_bsz prompts).
                        # Each mini_batch performs one parameter update (using the
                        # accumulated gradients); each micro_batch accumulates gradients once.
                        #################################################################
                        with marked_timer("update_actor", timing_raw, "red"):
                            actor_output = self.actor_rollout_wg.update_actor(batch)
                            actor_output_metrics = reduce_metrics(actor_output.meta_info["metrics"])
                            metrics.update(actor_output_metrics)
                    # Log rollout generations if enabled.
                    rollout_data_dir = self.config.trainer.get("rollout_data_dir", None)
                    if rollout_data_dir:
                        self._log_rollout_data(batch, reward_extra_infos_dict, timing_raw, rollout_data_dir)

                    # validate
                    if (
                        self.val_reward_fn is not None
                        and self.config.trainer.test_freq > 0
                        and (is_last_step or self.global_steps % self.config.trainer.test_freq == 0)
                    ):
                        with marked_timer("testing", timing_raw, "green"):
                            val_metrics: dict = self._validate()
                            if is_last_step:
                                last_val_metrics = val_metrics
                        metrics.update(val_metrics)

                    if self.config.trainer.save_freq > 0 and (
                        is_last_step or self.global_steps % self.config.trainer.save_freq == 0
                    ):
                        with marked_timer("save_checkpoint", timing_raw, "green"):
                            self._save_checkpoint()
                    with marked_timer("stop_profile", timing_raw):
                        next_step_profile = (
                            self.global_steps + 1 in self.config.global_profiler.steps
                            if self.config.global_profiler.steps is not None
                            else False
                        )
                        self._stop_profiling(
                            curr_step_profile and not next_step_profile
                            if self.config.global_profiler.profile_continuous_steps
                            else curr_step_profile
                        )
                        prev_step_profile = curr_step_profile
                        curr_step_profile = next_step_profile
                # Gather performance and data-related metrics
                metrics.update(compute_data_metrics(batch=batch, use_critic=self.use_critic))
                metrics.update(compute_timing_metrics(batch=batch, timing_raw=timing_raw))
                metrics.update(compute_throughout_metrics(batch=batch, timing_raw=timing_raw, n_gpus=n_gpus))
                # Reset timing measurements after collection
                timing_raw = defaultdict(float)
                metrics["train/num_gen_batches"] = num_gen_batches

                # Clear batch variables for the next iteration
                batch = None
                num_prompt_in_batch = 0
                num_gen_batches = 0

                # TODO: implement actual tflpo and theoretical tflpo
                # TODO: make a canonical logger that supports various backend
                logger.log(data=metrics, step=self.global_steps)

                if is_last_step:
                    pprint(f"Final validation metrics: {last_val_metrics}")
                    progress_bar.close()
                    return

                # Update training progress indicators
                progress_bar.update(1)
                self.global_steps += 1
                self.gen_steps += 1
        # Check if a checkpoint directory already exists for the current step
        checkpoint_dir = os.path.join(self.config.trainer.default_local_dir, f"global_step_{self.global_steps}")
        if not os.path.exists(checkpoint_dir):
            # Create a final checkpoint if it is missing
            timing_raw = defaultdict(float)
            with marked_timer("save_checkpoint", timing_raw, "green"):
                self._save_checkpoint()
            metrics = {f"timing/{k}": v for k, v in timing_raw.items()}
            logger.log(data=metrics, step=self.global_steps)
Next, let us look at how the DAPO reward manager is implemented. Compared with PPO, its key difference is the overlong_buffer mechanism, which shapes the reward according to the response length.
The source file is located at:
verl/verl/workers/reward_manager/dapo.py
#################################################################
# Register DAPORewardManager under the name "dapo"; it can later be retrieved via
# reward_manager_cls = get_reward_manager_cls(reward_manager_name)
#################################################################
@register("dapo")
class DAPORewardManager(AbstractRewardManager):
    """The reward manager."""

    def __init__(
        self,
        tokenizer,
        num_examine,
        compute_score=None,
        reward_fn_key="data_source",
        max_resp_len=None,
        overlong_buffer_cfg=None,
    ) -> None:
        self.tokenizer = tokenizer
        self.num_examine = num_examine  # number of batches of decoded responses to print to the console
        self.compute_score = compute_score or default_compute_score
        self.reward_fn_key = reward_fn_key
        self.overlong_buffer_cfg = overlong_buffer_cfg
        self.max_resp_len = max_resp_len

        if self.overlong_buffer_cfg is not None:
            assert self.max_resp_len is not None, (
                f"max_resp_len must be provided when {overlong_buffer_cfg=}, but got None"
            )
            assert self.max_resp_len >= self.overlong_buffer_cfg.len, (
                "max_resp_len must be no smaller than overlong_buffer.len"
            )
This is the main body of the DAPO reward manager. Its key logic lives in the __call__ method, which produces the reward signal from the input data.
    #################################################################
    # Core __call__ method of the DAPO reward manager
    #################################################################
    def __call__(self, data: DataProto, return_dict: bool = False):
        """We will expand this function gradually based on the available datasets."""
        # If the batch already contains rm_scores, return them directly;
        # otherwise compute the scores with the scoring function.
        if "rm_scores" in data.batch.keys():
            if return_dict:
                reward_extra_keys = data.meta_info.get("reward_extra_keys", [])
                reward_extra_info = {
                    key: data.non_tensor_batch[key] for key in reward_extra_keys
                }
                return {
                    "reward_tensor": data.batch["rm_scores"],
                    "reward_extra_info": reward_extra_info,
                }
            else:
                return data.batch["rm_scores"]
        reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32)
        reward_extra_info = defaultdict(list)
        already_print_data_sources = {}

        for i in range(len(data)):
            data_item = data[i]  # DataProtoItem

            prompt_ids = data_item.batch["prompts"]
            prompt_length = prompt_ids.shape[-1]

            # Note: prompt_ids are left-padded,
            # while response_ids are right-padded.
            valid_prompt_length = data_item.batch["attention_mask"][:prompt_length].sum()
            valid_prompt_ids = prompt_ids[-valid_prompt_length:]

            response_ids = data_item.batch["responses"]
            valid_response_length = data_item.batch["attention_mask"][prompt_length:].sum()
            valid_response_ids = response_ids[:valid_response_length]

            # Decode the prompt and the response.
            prompt_str = self.tokenizer.decode(valid_prompt_ids, skip_special_tokens=True)
            response_str = self.tokenizer.decode(valid_response_ids, skip_special_tokens=True)
            eos_token = self.tokenizer.eos_token
            if response_str.endswith(eos_token):
                response_str = response_str[: -len(eos_token)]

            ground_truth = data_item.non_tensor_batch["reward_model"]["ground_truth"]
            data_source = data_item.non_tensor_batch[self.reward_fn_key]
            extra_info = data_item.non_tensor_batch.get("extra_info", {})
            rollout_reward_scores = data_item.non_tensor_batch.get("reward_scores", {})
            extra_info["rollout_reward_scores"] = rollout_reward_scores
            result = self.compute_score(
                data_source=data_source,
                solution_str=response_str,
                ground_truth=ground_truth,
                extra_info=extra_info,
            )

            score: float
            if isinstance(result, dict):
                score = result["score"]
                # Store all returned information, including the raw reward.
                for key, value in result.items():
                    reward_extra_info[key].append(value)
            else:
                score = result
                reward_extra_info["acc"].append(score)

            reward = score
            # Compute the over-long (soft) reward.
            if self.overlong_buffer_cfg.enable:
                overlong_buffer_len = self.overlong_buffer_cfg.len
                expected_len = self.max_resp_len - overlong_buffer_len
                exceed_len = valid_response_length - expected_len
                overlong_penalty_factor = self.overlong_buffer_cfg.penalty_factor
                overlong_reward = min(-exceed_len / overlong_buffer_len * overlong_penalty_factor, 0)
                reward += overlong_reward
                if self.overlong_buffer_cfg.log:
                    reward_extra_info["overlong_reward"].append(overlong_reward)
                    reward_extra_info["overlong"].append(overlong_reward < 0)

            # Write the scalar reward onto the last valid response token.
            reward_tensor[i, valid_response_length - 1] = reward
            if data_source not in already_print_data_sources:
                already_print_data_sources[data_source] = 0

            if already_print_data_sources[data_source] < self.num_examine:
                already_print_data_sources[data_source] += 1
                print("[prompt]", prompt_str)
                print("[response]", response_str)
                print("[ground_truth]", ground_truth)
                if isinstance(result, dict):
                    for key, value in result.items():
                        print(f"[{key}]", value)
                else:
                    print("[score]", score)

        if return_dict:
            return {
                "reward_tensor": reward_tensor,
                "reward_extra_info": reward_extra_info,
            }
        else:
            return reward_tensor
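
As a final illustration of the reward placement convention above, here is a tiny self-contained example; the lengths and reward values are made up:

import torch

# The scalar reward of each response is written onto its last valid token,
# so token_level_scores is zero everywhere except at that position.
batch_size, max_resp_len = 2, 8
valid_response_lengths = [5, 3]   # hypothetical per-sample valid response lengths
rewards = [1.0, -0.39]            # e.g. a correct answer vs. a wrong answer with an over-long penalty

reward_tensor = torch.zeros(batch_size, max_resp_len, dtype=torch.float32)
for i, (length, reward) in enumerate(zip(valid_response_lengths, rewards)):
    reward_tensor[i, length - 1] = reward

print(reward_tensor)
# row 0: reward 1.0 at position 4; row 1: reward -0.39 at position 2; all other entries are 0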

