Posted by cxvm1234 on 2025-11-22 08:36:41

To analyze verl's implementation of DAPO, start from the entry scripts (the .sh launcher and the .py entry point). The relevant files live under ./recipe/dapo/, with the following structure:

.
├── config
│   ├── dapo_megatron_trainer.yaml
│   └── dapo_trainer.yaml
├── dapo_ray_trainer.py
├── main_dapo.py
├── prepare_dapo_data.sh
├── README.md
├── run_dapo_qwen2.5_32b.sh


The overall flow is split across two core modules:
- main_dapo.py: handles data-loading initialization, builds and initializes the actor_rollout model and the rm model, and loads the reward_manager component.
- dapo_ray_trainer.py: carries the core reinforcement learning (RL) training loop.

The execution logic is as follows:
the input batch is repeated so that each question q is sampled n times. For every sample, the system records its log-probability, its reward_score, and its advantage.

To keep the training signal useful, the sampled results are filtered: if all of a question's generated samples score 1 or all score 0, that whole group is discarded and new questions are sampled until the number of qualifying prompts reaches train_prompt_bsz. Note that the generation batch is set to gen_prompt_bsz = 3 × train_prompt_bsz; over-sampling the initial queries q avoids the situation where too few qualifying prompts are left to fill the training batch. A minimal sketch of this filtering rule follows.
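
The sketch below (plain Python with hypothetical helper names, not verl's actual API) makes the rule concrete: a prompt's group of n rewards is kept only if the rewards are not all identical, and prompts keep being consumed until train_prompt_bsz informative groups have been collected.

import numpy as np

def keep_prompt(rewards: list[float]) -> bool:
    # Drop a prompt whose n sampled rewards are all identical (e.g. all 0 or all 1):
    # such a group carries no learning signal, since its group-normalized advantages are all zero.
    return len(rewards) == 1 or np.std(rewards) > 0

def collect_training_prompts(sample_rewards, prompt_stream, train_prompt_bsz: int, n: int):
    """Keep consuming prompts until `train_prompt_bsz` informative groups are collected.

    sample_rewards(prompt, n) -> list of n scalar rewards (hypothetical helper).
    """
    kept = []
    for prompt in prompt_stream:
        rewards = sample_rewards(prompt, n)
        if keep_prompt(rewards):
            kept.append((prompt, rewards))
        if len(kept) >= train_prompt_bsz:
            break
    return kept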

Training then proceeds at two levels of granularity (a small sketch of this loop follows the list):
- each mini_batch of data triggers one model parameter update;
- each micro_batch runs a forward pass (with token-mean loss) and backpropagates, accumulating gradients.
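
A minimal sketch of that two-level update loop, using hypothetical names and PyTorch-style pseudocode rather than verl's worker code: one optimizer step per mini-batch, gradient accumulation over micro-batches, and a loss averaged over valid response tokens (token-mean).

import torch

def token_mean_loss(per_token_loss: torch.Tensor, response_mask: torch.Tensor) -> torch.Tensor:
    # Average over all valid response tokens in the micro-batch (not per sequence),
    # so longer responses contribute more tokens to the mean.
    return (per_token_loss * response_mask).sum() / response_mask.sum().clamp(min=1)

def update_actor(mini_batches, num_micro_batches: int, policy, optimizer):
    for mini_batch in mini_batches:                      # one optimizer step per mini-batch
        optimizer.zero_grad()
        for micro_batch in mini_batch.split(num_micro_batches):   # hypothetical split helper
            per_token_loss, response_mask = policy.forward_loss(micro_batch)  # hypothetical
            loss = token_mean_loss(per_token_loss, response_mask)
            (loss / num_micro_batches).backward()        # accumulate gradients
        optimizer.step()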

Below is an excerpt of main_dapo.py:

# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
注意:此处未将主函数与ray_trainer合并,因为ray_trainer可能被其他主程序调用。
"""

import os
import socket
from pprint import pprint

import hydra
import ray
from omegaconf import OmegaConf

from verl.single_controller.ray import RayWorkerGroup
from verl.trainer.ppo.reward import load_reward_manager
from verl.utils import hf_processor, hf_tokenizer
from verl.utils.device import is_cuda_available
from verl.utils.fs import copy_to_local

from .dapo_ray_trainer import RayDAPOTrainer

@hydra.main(config_path="config", config_name="dapo_trainer", version_base=None)
def main(config):
    run_ppo(config)

#################################################################
# Main entry point for RL training
#################################################################
def run_ppo(config) -> None:
    if not ray.is_initialized():
        # Initialization for a local ray cluster.
        # Environment variables controlling parallelism and log verbosity.
        default_runtime_env = {
            "env_vars": {
                "TOKENIZERS_PARALLELISM": "true",
                "NCCL_DEBUG": "WARN",
                "VLLM_LOGGING_LEVEL": "WARN",
            }
        }
        # Merge the default runtime env with the user-provided ray init config,
        # then update runtime_env inside the ray init kwargs.
        ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
        runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
        runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
        ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
        print(f"ray init kwargs: {ray_init_kwargs}")
        # Initialize the Ray distributed runtime
        ray.init(**OmegaConf.to_container(ray_init_kwargs))

    try:
        # Decide whether to enable the Nsight Systems profiler
        if (is_cuda_available and
                config.global_profiler.tool == "nsys" and
                OmegaConf.select(config.global_profiler, "steps") is not None and
                len(OmegaConf.select(config.global_profiler, "steps")) > 0):

            # Extract the controller-side Nsight options
            nsight_options = OmegaConf.to_container(
                config.global_profiler.global_tool_config.nsys.controller_nsight_options
            )
            # Create the remote task runner with the nsight runtime env
            runner = TaskRunner.options(runtime_env={"nsight": nsight_options}).remote()
        else:
            # Otherwise start a plain remote task runner
            runner = TaskRunner.remote()

        # Kick off the remote task and wait for it to finish
        ray.get(runner.run.remote(config))
    finally:
        # Make sure the Ray runtime is shut down when the program exits
        if ray.is_initialized():
            ray.shutdown()


@ray.remote(num_cpus=1)  # make sure the main task is not scheduled on the head node
class TaskRunner:
    def run(self, config):
        # Print the hostname and PID of the node executing this task
        print(f"TaskRunner hostname: {socket.gethostname()}, PID: {os.getpid()}")

        # Print the fully resolved config (with symbolic values expanded)
        pprint(OmegaConf.to_container(config, resolve=True))
        OmegaConf.resolve(config)

        # Download the model checkpoint from HDFS to the local filesystem
        local_path = copy_to_local(config.actor_rollout_ref.model.path)

        # Instantiate the tokenizer and processor
        # (the processor is used for multimodal LLMs and may be None)
        tokenizer = hf_tokenizer(local_path)
        processor = hf_processor(local_path, use_fast=True)

        #################################################################
        # Load the actor workers
        #################################################################

        # Select the distributed training backend (FSDP/FSDP2 or Megatron)
        # based on the configured strategy
        if config.actor_rollout_ref.actor.strategy in {"fsdp", "fsdp2"}:
            assert config.critic.strategy in {"fsdp", "fsdp2"}
            from verl.workers.fsdp_workers import ActorRolloutRefWorker, CriticWorker
            ray_worker_group_cls = RayWorkerGroup
        elif config.actor_rollout_ref.actor.strategy == "megatron":
            assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
            from verl.workers.megatron_workers import ActorRolloutRefWorker, CriticWorker
            ray_worker_group_cls = RayWorkerGroup
        else:
            raise NotImplementedError

from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role

role_worker_mapping = {
    Role.ActorRollout: ray.remote(ActorRolloutRefWorker),
    Role.Critic: ray.remote(CriticWorker),
}

global_pool_id = "global_pool"
resource_pool_spec = {
    global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
}
mapping = {
    Role.ActorRollout: global_pool_id,
    Role.Critic: global_pool_id,
}

# Reference model: only registered when KL is used in the reward or as a loss
if config.algorithm.use_kl_in_reward or config.actor_rollout_ref.actor.use_kl_loss:
    role_worker_mapping[Role.RefPolicy] = ray.remote(ActorRolloutRefWorker)
    mapping[Role.RefPolicy] = global_pool_id

# Import and register the reward model worker (if enabled)
if config.reward_model.enable:
    if config.reward_model.strategy == "megatron":
        from verl.workers.megatron_workers import RewardModelWorker
    elif config.reward_model.strategy in {"fsdp", "fsdp2"}:
        from verl.workers.fsdp_workers import RewardModelWorker
    else:
        raise NotImplementedError

    role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
    mapping[Role.RewardModel] = global_pool_id

# Notes on the multi-source reward function design:
# - for rule-based RMs, return the predefined score directly
# - for model-based RMs, call the corresponding model to score the response
# - for code-generation inputs with test cases, submit them to a sandbox for execution and verification
# - finally, fuse the reward signals from all sources
# - the reward type is determined by a tag field in the data sample


#################################################################
# Initialize the reward manager, which computes reward scores from the input data
#################################################################
reward_fn = load_reward_manager(
    config,
    tokenizer,
    0,
    # ... the remaining reward keyword arguments from the config are omitted in this excerpt;
    # a val_reward_fn for validation is constructed in the same way.
)

Let us first look at how `from verl.trainer.ppo.reward import load_reward_manager` is implemented. The launch script verl/recipe/dapo/run_dapo_qwen2.5_32b.sh defines the reward-related parameters.

The key reward settings are (a worked example of the resulting penalty follows the list):

  • enable_overlong_buffer=True
  • overlong_buffer_len=$((1024 * 4)) — enables a long buffer zone used to handle over-long responses
  • overlong_penalty_factor=1.0 — the over-length penalty factor
  • reward_model.reward_manager=dapo
  • reward_model.overlong_buffer.enable=${enable_overlong_buffer}
  • reward_model.overlong_buffer.len=${overlong_buffer_len}
  • reward_model.overlong_buffer.penalty_factor=${overlong_penalty_factor}
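
To see what the soft overlong penalty looks like numerically, here is a small sketch. The max_resp_len default is an assumed value for illustration only (in the recipe it corresponds to the configured maximum response length, not shown above); only overlong_buffer_len = 4096 and penalty_factor = 1.0 are taken from the list.

def overlong_reward(valid_response_length: int,
                    max_resp_len: int = 20480,        # assumed value, for illustration only
                    overlong_buffer_len: int = 4096,
                    penalty_factor: float = 1.0) -> float:
    expected_len = max_resp_len - overlong_buffer_len
    exceed_len = valid_response_length - expected_len
    # Responses shorter than expected_len get no penalty; inside the buffer the
    # penalty grows linearly from 0 down to -penalty_factor at max_resp_len.
    return min(-exceed_len / overlong_buffer_len * penalty_factor, 0.0)

# Example: a response of 18432 tokens exceeds expected_len (16384) by 2048,
# so its reward is reduced by 2048 / 4096 * 1.0 = 0.5.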

Next, look at the core function load_reward_manager in verl/trainer/ppo/reward.py:

def load_reward_manager(
    config: DictConfig, tokenizer: Any, num_examine: int, **reward_kwargs: Any
) -> AbstractRewardManager:
    """
    根据配置加载并初始化一个 reward manager 实例。

    参数说明:
    config: 包含 reward_model 相关字段的 PPO 训练器配置对象。
    tokenizer: 用于文本处理的分词器。
    num_examine: 需要检查的样本数量。
    **reward_kwargs: 传递给 reward manager 的额外关键字参数。

    返回值:
    指定类型的 reward manager 实例,继承自 AbstractRewardManager。
    """
    # 尝试根据配置获取自定义的 reward 函数
    # 用户自定义的 reward manager 可通过 custom_reward_fn 进行注册
    

This function dynamically builds the appropriate reward manager from the given config and supports plugging in user-defined reward logic. All key parameters, such as the maximum response length and the handling of over-long responses, are passed in through config and combined with the tokenizer and other runtime arguments during initialization.

The main training flow then uses this reward manager to build a RayDAPOTrainer instance:

resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)

trainer = RayDAPOTrainer(
    config=config,
    tokenizer=tokenizer,
    processor=processor,
    role_worker_mapping=role_worker_mapping,
    resource_pool_manager=resource_pool_manager,
    ray_worker_group_cls=ray_worker_group_cls,
    reward_fn=reward_fn,
    val_reward_fn=val_reward_fn,
)
trainer.init_workers()
trainer.fit()

if __name__ == "__main__":
    main()
    

Note: the validation stage always uses a function-based reward model (function-based RM). Before training starts, all distributed workers are initialized, and then .fit() launches the DAPO RL training loop.

Returning to load_reward_manager, the rest of the function body looks like this:

reward_manager_name = config.reward_model.get("reward_manager", "naive")
reward_manager_cls = get_reward_manager_cls(reward_manager_name)

# Decide whether to use the sandbox-fusion mechanism based on the config
sandbox_config = config.reward_model.get("sandbox_fusion")
sandbox_url = sandbox_config.get("url") if sandbox_config else None
memory_limit_mb = sandbox_config.get("memory_limit_mb", 1024) if sandbox_config else 1024

# If a sandbox URL is given, initialize a shared manager and a concurrency semaphore
if sandbox_url:
    sandbox_manager = multiprocessing.Manager()
    _concurrent_semaphore = sandbox_manager.Semaphore(sandbox_config.get("max_concurrent", 64))
    final_compute_score = partial(
        default_compute_score,
        sandbox_fusion_url=sandbox_url,
        concurrent_semaphore=_concurrent_semaphore,
        memory_limit_mb=memory_limit_mb,
    )
else:
    final_compute_score = default_compute_score

# Use the custom scoring function if one is defined, otherwise fall back to the default
compute_score = get_custom_reward_fn(config)
final_compute_score = compute_score or final_compute_score

# Note: a custom reward manager must be imported and registered in advance,
# via verl.workers.reward_manager.register.
# The built-in reward manager types live under `verl/workers/reward_manager/`:
# - naive: NaiveRewardManager
# - prime: PrimeRewardManager
# - batch: BatchRewardManager
# - dapo: DAPORewardManager
# By default, reward_manager is the naive type (NaiveRewardManager).

# For DAPO, reward_manager_cls resolves to the DAPO type.
# Instantiate and return the reward manager with the given arguments.
return reward_manager_cls(
    tokenizer=tokenizer,
    num_examine=num_examine,
    compute_score=final_compute_score,
    reward_fn_key=config.data.reward_fn_key,
    **reward_kwargs,
)
We still need to pin down which class the dapo reward_manager_cls actually points to, because the reward computation depends on the batch data. So before digging into the reward manager, we set it aside for the moment (the dapo reward_manager_cls is defined in verl/verl/workers/reward_manager/dapo.py). We first look at how the batch is sampled in dapo_ray_trainer.py, and then come back to the details of the reward computation.
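
As the comments above indicate, custom reward managers hook into the same registry that maps "dapo" to DAPORewardManager. Below is a minimal, hypothetical sketch of such a registration, following the registry pattern of `verl/workers/reward_manager/` (treat the exact import path as an assumption, and the body as a placeholder, not a working manager):

import torch
from verl.workers.reward_manager import register  # registry decorator (assumed import path)

@register("my_custom")
class MyCustomRewardManager:
    def __init__(self, tokenizer, num_examine, compute_score=None, reward_fn_key="data_source", **kwargs):
        self.tokenizer = tokenizer
        self.num_examine = num_examine
        self.compute_score = compute_score
        self.reward_fn_key = reward_fn_key

    def __call__(self, data, return_dict: bool = False):
        # Like the built-in managers, place the sequence-level score on the
        # last valid response token of each sample (scoring details omitted here).
        reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32)
        return {"reward_tensor": reward_tensor} if return_dict else reward_tensor

# Selecting it would then only require: reward_model.reward_manager=my_custom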

dapo_ray_trainer.py

#################################################################
# RayDAPOTrainer inherits from RayPPOTrainer.
# fit(): runs the DAPO training loop, whose distinguishing pieces are:
# (1) dynamic sampling
# (2) the overlong soft reward
# (3) token-level loss handling
#################################################################

The PPO training main loop: the driver process only needs to call the worker groups' compute functions via RPC to build the full PPO dataflow, while the lightweight advantage computation runs on the driver itself.

Import OmegaConf and the Tracking utility used for experiment logging:

from omegaconf import OmegaConf
from verl.utils.tracking import Tracking

Initialize the logger:

logger = Tracking(
    project_name=self.config.trainer.project_name,
    experiment_name=self.config.trainer.experiment_name,
    default_backend=self.config.trainer.logger,
    config=OmegaConf.to_container(self.config, resolve=True),
)

Initialize the global step and generation step counters:

self.global_steps = 0
self.gen_steps = 0

# Load a checkpoint to resume the training state
self._load_checkpoint()

# Run validation before training.
# Currently only validation via the reward_function is supported.
if self.val_reward_fn is not None and self.config.trainer.get("val_before_train", True):
    val_metrics = self._validate()
    assert val_metrics, f"{val_metrics=}"
    pprint(f"Initial validation metrics: {val_metrics}")
    logger.log(data=val_metrics, step=self.global_steps)
    # If configured as validation-only, return immediately
    if self.config.trainer.get("val_only", False):
        return

# If the rollout config sets the skip flag
if self.config.actor_rollout_ref.rollout.get("skip_rollout", False):
    rollout_skip = RolloutSkip(self.config, self.actor_rollout_wg)
    rollout_skip.wrap_generate_sequences()

# Create the training progress bar
progress_bar = tqdm(
    total=self.total_training_steps,
    initial=self.global_steps,
    desc="Training Progress",
)

# Step counting starts from 1
self.global_steps += 1
self.gen_steps += 1
last_val_metrics = None

curr_step_profile = (
    self.global_steps in self.config.global_profiler.steps
    if self.config.global_profiler.steps is not None
    else False
)
next_step_profile = False
prev_step_profile = False
timing_raw = defaultdict(float)
batch = None

num_prompt_in_batch = 0
num_gen_batches = 0


for epoch in range(self.config.trainer.total_epochs):
    for batch_dict in self.train_dataloader:
        metrics = {}

with marked_timer("start_profile", timing_raw): self._start_profiling( curr_step_profile if not self.config.global_profiler.profile_continuous_steps else (not prev_step_profile and curr_step_profile) )

new_batch: DataProto = DataProto.from_single_dict(batch_dict) num_gen_batches += 1

if "multi_modal_data" in new_batch.non_tensor_batch.keys(): gen_batch = new_batch.pop( batch_keys=["input_ids", "attention_mask", "position_ids"], non_tensor_batch_keys=["raw_prompt_ids", "multi_modal_data"] ) else: gen_batch = new_batch.pop( batch_keys=["input_ids", "attention_mask", "position_ids"], non_tensor_batch_keys=["raw_prompt_ids"] )

gen_batch_output = gen_batch.repeat( repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True )

is_last_step = self.global_steps >= self.total_training_steps

with marked_timer("step", timing_raw):
    # Generate one batch of rollouts
    with marked_timer("gen", timing_raw, "red"):
        gen_batch_output = self.actor_rollout_wg.generate_sequences(gen_batch_output)
        timing_raw.update(gen_batch_output.meta_info["timing"])
        gen_batch_output.meta_info.pop("timing", None)

# When REMAX is configured as the advantage estimator, an extra greedy rollout is
# generated to serve as the baseline for the subsequent advantage computation
if self.config.algorithm.adv_estimator == AdvantageEstimator.REMAX:
    with marked_timer("gen_max", timing_raw, "red"):
        gen_baseline_batch = deepcopy(gen_batch)
        # Generate the baseline output with greedy decoding (do_sample=False)
        gen_baseline_batch.meta_info["do_sample"] = False
        gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
        new_batch = new_batch.union(gen_baseline_output)

        # Compute reward model scores on the merged batch
        rm_scores = None
        if self.use_rm and "rm_scores" not in new_batch.batch.keys():
            rm_scores = self.rm_wg.compute_rm_score(new_batch)
            new_batch = new_batch.union(rm_scores)

        # Compute the reward baseline on the new batch
        reward_baseline_tensor, _ = compute_reward(new_batch, self.reward_fn)
        reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)

        # Determine which keys to drop from the batch again
        keys_to_pop = set(gen_baseline_output.batch.keys())
        if rm_scores is not None:
            keys_to_pop.update(rm_scores.batch.keys())
        new_batch.pop(batch_keys=list(keys_to_pop))

        # Store the computed reward baselines in the batch
        new_batch.batch["reward_baselines"] = reward_baseline_tensor
        del rm_scores, gen_baseline_batch, gen_baseline_output


# At this point new_batch has the size of the generation prompt batch (gen_prompt_bsz).
# Assign each prompt a unique identifier (uid).
# The uid is needed because the later reward normalization over the multiple samples
# of the same question must group them by the original query they belong to.

new_batch.non_tensor_batch["uid"] = np.array(
    [str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object
)

# Repeat the relevant fields of the batch (mainly the uid) so that everything stays
# aligned with the responses that were generated repeatedly during rollout
new_batch = new_batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True)

# After sampling finishes, merge the rollout outputs into new_batch
new_batch = new_batch.union(gen_batch_output)

# Compute rewards, supporting both model-based and function-based reward sources.
# First compute scores with the reward model, then call reward_fn to combine the
# model output with rule-based results.

if self.use_rm and "rm_scores" not in new_batch.batch.keys():
    reward_tensor = self.rm_wg.compute_rm_score(new_batch)
    new_batch = new_batch.union(reward_tensor)

# Compute the reward of each sampled response in new_batch with the configured self.reward_fn
reward_tensor, reward_extra_infos_dict = compute_reward(new_batch, self.reward_fn)
new_batch.batch["token_level_scores"] = reward_tensor

if reward_extra_infos_dict:
    new_batch.non_tensor_batch.update(
        {k: np.array(v) for k, v in reward_extra_infos_dict.items()}
    )

# Apply the KL divergence penalty if it is enabled in the config
if self.config.algorithm.use_kl_in_reward:
    new_batch, kl_metrics = apply_kl_penalty(
        new_batch,
        kl_ctrl=self.kl_ctrl_in_reward,
        kl_penalty=self.config.algorithm.kl_penalty
    )
    metrics.update(kl_metrics)  # NOTE: these metrics may be overridden when multiple generation batches are used
else:
    new_batch.batch["token_level_rewards"] = new_batch.batch["token_level_scores"]


#################################################################
# DAPO's dynamic sample filtering (filter groups)
#################################################################

if not self.config.algorithm.filter_groups.enable:
    batch = new_batch
else:
    # Skip the current generation batch if the number of filtered prompts is
    # smaller than the training batch size
    metric_name = self.config.algorithm.filter_groups.metric

    if metric_name == "seq_final_reward":
        # Convert the sequence-level final rewards to numpy for filtering
        new_batch.non_tensor_batch["seq_final_reward"] = \
            new_batch.batch["token_level_rewards"].sum(dim=-1).numpy()
    elif metric_name == "seq_reward":
        new_batch.non_tensor_batch["seq_reward"] = \
            new_batch.batch["token_level_scores"].sum(dim=-1).numpy()

    # Data structure example: {uid: [r1, r2, r3, ..., rn], uid: [...], ...}
    # It records the rewards of all samples drawn for each prompt

Collect the sequence rewards of every trajectory, grouped by prompt uid:

prompt_uid2metric_vals = defaultdict(list)
for uid, metric_val in zip(
    new_batch.non_tensor_batch["uid"], 
    new_batch.non_tensor_batch[metric_name], 
    strict=True
):
    prompt_uid2metric_vals[uid].append(metric_val)

Compute the standard deviation of the rewards for each question q:

prompt_uid2metric_std = {}
for prompt_uid, metric_vals in prompt_uid2metric_vals.items():
    prompt_uid2metric_std[prompt_uid] = np.std(metric_vals)

Keep the prompt uids whose reward standard deviation is non-zero, and also keep prompts that have only a single sample (so they are not dropped just for being singletons):

kept_prompt_uids = [
    uid
    for uid, std in prompt_uid2metric_std.items()
    if std > 0 or len(prompt_uid2metric_vals[uid]) == 1
]

Accumulate the number of valid prompts kept from the current batch:

num_prompt_in_batch += len(kept_prompt_uids)

Record the indices of the kept trajectories within the current batch:

kept_traj_idxs = []
for idx, traj_from_prompt_uid in enumerate(new_batch.non_tensor_batch["uid"]):
    if traj_from_prompt_uid in kept_prompt_uids:
        kept_traj_idxs.append(idx)

Extract the valid data from the current batch according to the kept trajectory indices:

new_batch = new_batch[kept_traj_idxs]

Accumulate the filtered batch into the main batch used for training:

batch = new_batch if batch is None else DataProto.concat([batch, new_batch])

Read the minimum trainable batch size (measured in number of prompts) from the config:

prompt_bsz = self.config.data.train_batch_size

If the number of accumulated valid prompts is still below the configured training batch size, keep generating with the next batch:

if num_prompt_in_batch < prompt_bsz:
    print(f"{num_prompt_in_batch=} < {prompt_bsz=}")
    max_num_gen_batches = self.config.algorithm.filter_groups.max_num_gen_batches
    if max_num_gen_batches <= 0 or num_gen_batches < max_num_gen_batches:
        print(f"{num_gen_batches=}. Keep generating...")
        self.gen_steps += 1
        is_last_step = self.global_steps >= self.total_training_steps
        continue
    else:
        raise ValueError(
            f"{num_gen_batches=} >= {max_num_gen_batches=}."
            + " Generated too many. Please check if your data are too difficult."
            + " You could also try set max_num_gen_batches=0 to enable endless trials."
        )

Once the number of accumulated valid prompts reaches the minimum training batch size, align the batch:

else:
    # Align the batch: keep exactly traj_bsz trajectories
    traj_bsz = self.config.data.train_batch_size * self.config.actor_rollout_ref.rollout.n
    batch = batch[:traj_bsz]
# Compute the response mask and add it to the batch
batch.batch["response_mask"] = compute_response_mask(batch)

# Balance the number of valid tokens across DP ranks.
# Note: this usually reorders the data inside the batch.
# It does not affect the uid-based advantage computation,
# but it may affect the loss (since the mini-batch composition changes).
# TODO: decouple DP balancing from mini-batch grouping.
if self.config.trainer.balance_batch:
    self._balance_batch(batch, metrics=metrics)

# Count the number of global tokens per sample
batch.meta_info["global_token_num"] = torch.sum(batch.batch["attention_mask"], dim=-1).tolist()


# Recompute, for every trajectory in the filtered batch, the token-level log-probs
# under the sampling (old) policy; these feed the importance-sampling ratio later on

# Recompute the old policy's log probabilities
with marked_timer("old_log_prob", timing_raw, "blue"):
    old_log_prob = self.actor_rollout_wg.compute_log_prob(batch)
    entropys = old_log_prob.batch["entropys"]
    response_masks = batch.batch["response_mask"]
    loss_agg_mode = self.config.actor_rollout_ref.actor.loss_agg_mode  # "token_mean" for DAPO

    # Aggregate the entropy values in the configured way
    entropy_agg = agg_loss(loss_mat=entropys, loss_mask=response_masks, loss_agg_mode=loss_agg_mode)
    old_log_prob_metrics = {"actor/entropy": entropy_agg.detach().item()}
    metrics.update(old_log_prob_metrics)

    # Remove the entropy field now that it has been consumed
    old_log_prob.batch.pop("entropys")

    # Merge the old log-probs back into the main batch
    batch = batch.union(old_log_prob)
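
Since DAPO sets loss_agg_mode to "token_mean", it is worth spelling out what that aggregation does compared with a per-sequence mean. A minimal sketch follows (verl's agg_loss supports more modes than the two shown here):

import torch

def agg_token_mean(loss_mat: torch.Tensor, loss_mask: torch.Tensor) -> torch.Tensor:
    # "token_mean": every valid token in the whole batch gets equal weight,
    # so longer responses contribute proportionally more to the loss.
    return (loss_mat * loss_mask).sum() / loss_mask.sum()

def agg_seq_mean(loss_mat: torch.Tensor, loss_mask: torch.Tensor) -> torch.Tensor:
    # Sequence-level mean: average within each sequence first, then across sequences,
    # so every sequence gets equal weight regardless of its length.
    per_seq = (loss_mat * loss_mask).sum(dim=-1) / loss_mask.sum(dim=-1).clamp(min=1)
    return per_seq.mean()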

# If a reference policy is enabled, compute its log probabilities as well
if self.use_reference_policy:
    with marked_timer("ref", timing_raw, "olive"):
        ref_log_prob = self.ref_policy_wg.compute_ref_log_prob(batch)
        batch = batch.union(ref_log_prob)

# If a critic model is used, estimate values
if self.use_critic:
    with marked_timer("values", timing_raw, "cyan"):
        values = self.critic_wg.compute_values(batch)
        batch = batch.union(values)

# Compute rollout IS weights and mismatch metrics (inherited from RayPPOTrainer)
batch, is_metrics = self.compute_rollout_importance_weights_and_add_to_batch(batch)
# IS and mismatch metrics already carry the mismatch/ prefix
metrics.update(is_metrics)

#################################################################
# Compute the advantage
#################################################################
with marked_timer("adv", timing_raw, "brown"):
    # compute advantages, executed on the driver process
    norm_adv_by_std_in_grpo = self.config.algorithm.get("norm_adv_by_std_in_grpo", True)
    batch = compute_advantage(
        batch,
        adv_estimator=self.config.algorithm.adv_estimator,
        gamma=self.config.algorithm.gamma,
        lam=self.config.algorithm.lam,
        num_repeat=self.config.actor_rollout_ref.rollout.n,
        norm_adv_by_std_in_grpo=norm_adv_by_std_in_grpo,
    )
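
For the GRPO-style estimator used by DAPO, the advantage computed here is essentially the group-normalized outcome reward: each response's scalar reward is standardized against the other samples of the same prompt (same uid), and the result is then broadcast over the response tokens. A minimal sketch of that normalization (not verl's compute_advantage itself; norm_adv_by_std toggles the division by the standard deviation):

import torch
from collections import defaultdict

def grpo_outcome_advantage(rewards: torch.Tensor, uids: list[str],
                           norm_adv_by_std: bool = True, eps: float = 1e-6) -> torch.Tensor:
    """rewards: (bsz,) sequence-level rewards; uids: prompt id of each sample."""
    groups = defaultdict(list)
    for i, uid in enumerate(uids):
        groups[uid].append(i)

    adv = torch.zeros_like(rewards)
    for idxs in groups.values():
        r = rewards[idxs]
        centered = r - r.mean()
        # unbiased=False keeps single-sample groups at std 0 instead of NaN
        adv[idxs] = centered / (r.std(unbiased=False) + eps) if norm_adv_by_std else centered
    return adv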

if self.use_critic:
    with marked_timer("update_critic", timing_raw, "pink"):
        critic_output = self.critic_wg.update_critic(batch)
        critic_output_metrics = reduce_metrics(critic_output.meta_info["metrics"])
        metrics.update(critic_output_metrics)

# implement critic warmup
if self.config.trainer.critic_warmup <= self.global_steps:
    #################################################################
    # Update the actor model (the batch size here is train_prompt_size).
    # Each mini_bsz triggers one model update (parameters, from the accumulated gradients).
    # Each micro_bsz accumulates gradients once.
    #################################################################
    with marked_timer("update_actor", timing_raw, "red"):
        actor_output = self.actor_rollout_wg.update_actor(batch)
        actor_output_metrics = reduce_metrics(actor_output.meta_info["metrics"])
        metrics.update(actor_output_metrics)

# Log rollout generations if enabled
rollout_data_dir = self.config.trainer.get("rollout_data_dir", None)
if rollout_data_dir:
    self._log_rollout_data(batch, reward_extra_infos_dict, timing_raw, rollout_data_dir)

# validate
if (
    self.val_reward_fn is not None
    and self.config.trainer.test_freq > 0
    and (is_last_step or self.global_steps % self.config.trainer.test_freq == 0)
):
    with marked_timer("testing", timing_raw, "green"):
        val_metrics: dict = self._validate()
        if is_last_step:
            last_val_metrics = val_metrics
        metrics.update(val_metrics)

if self.config.trainer.save_freq > 0 and (is_last_step or self.global_steps % self.config.trainer.save_freq == 0):
    with marked_timer("save_checkpoint", timing_raw, "green"):
        self._save_checkpoint()

with marked_timer("stop_profile", timing_raw):
    next_step_profile = (
        self.global_steps + 1 in self.config.global_profiler.steps
        if self.config.global_profiler.steps is not None
        else False
    )
    self._stop_profiling(
        curr_step_profile and not next_step_profile
        if self.config.global_profiler.profile_continuous_steps
        else curr_step_profile
    )
    prev_step_profile = curr_step_profile
    curr_step_profile = next_step_profile

# Gather performance and data-related metrics
# (n_gpus is derived from the resource pool manager earlier in fit(); omitted in this excerpt)
metrics.update(compute_data_metrics(batch=batch, use_critic=self.use_critic))
metrics.update(compute_timing_metrics(batch=batch, timing_raw=timing_raw))
metrics.update(compute_throughout_metrics(batch=batch, timing_raw=timing_raw, n_gpus=n_gpus))

# Reset timing measurements after collection
timing_raw = defaultdict(float)
metrics["train/num_gen_batches"] = num_gen_batches

# Clear batch variables for next iteration
batch = None
num_prompt_in_batch = 0
num_gen_batches = 0

# TODO: implement actual tflpo and theoretical tflpo
# TODO: make a canonical logger that supports various backend
logger.log(data=metrics, step=self.global_steps)

if is_last_step:
    pprint(f"Final validation metrics: {last_val_metrics}")
    progress_bar.close()
    return

# Update training progress indicators
progress_bar.update(1)
self.global_steps += 1
self.gen_steps += 1

# Check whether a checkpoint directory already exists for the current step
checkpoint_dir = os.path.join(self.config.trainer.default_local_dir, f"global_step_{self.global_steps}")
if not os.path.exists(checkpoint_dir):
    # Create the final checkpoint if it is missing
    timing_raw = defaultdict(float)
    with marked_timer("save_checkpoint", timing_raw, "green"):
        self._save_checkpoint()
    metrics = {f"timing/{k}": v for k, v in timing_raw.items()}
    logger.log(data=metrics, step=self.global_steps)

Next, let us look at how the reward manager is implemented for DAPO. Compared with PPO, the core difference is the overlong_buffer mechanism, which adjusts the reward based on the response length.


The source lives at:
verl/verl/workers/reward_manager/dapo.py

#################################################################
# DAPORewardManager is registered under the name "dapo"; it can later be fetched with
# reward_manager_cls = get_reward_manager_cls(reward_manager_name)
#################################################################
@register("dapo")
class DAPORewardManager(AbstractRewardManager):
    """The reward manager."""
    def __init__(
        self,
        tokenizer,
        num_examine,
        compute_score=None,
        reward_fn_key="data_source",
        max_resp_len=None,
        overlong_buffer_cfg=None,
    ) -> None:
        self.tokenizer = tokenizer
        self.num_examine = num_examine  # number of batches of decoded responses to print to the console
        self.compute_score = compute_score or default_compute_score
        self.reward_fn_key = reward_fn_key
        self.overlong_buffer_cfg = overlong_buffer_cfg
        self.max_resp_len = max_resp_len

        if self.overlong_buffer_cfg is not None:
            assert self.max_resp_len is not None, (
                f"max_resp_len must be provided and must not be None when {overlong_buffer_cfg=} is used"
            )
            assert self.max_resp_len >= self.overlong_buffer_cfg.len, (
                "max_resp_len must be greater than or equal to the overlong_buffer length"
            )

This is the main body of the DAPO reward manager. Its key logic lives in the __call__ method, which produces the reward signal from the input data.

#################################################################
# The core __call__ method of the DAPO reward manager
#################################################################
def __call__(self, data: DataProto, return_dict: bool = False):
    """We will expand this function gradually based on the available datasets"""
    # If the batch already contains rm_scores, return them directly;
    # otherwise compute the scores with the rm_score_fn
    if "rm_scores" in data.batch.keys():
        if return_dict:
            reward_extra_keys = data.meta_info.get("reward_extra_keys", [])
            reward_extra_info = {
                key: data.non_tensor_batch[key] for key in reward_extra_keys
            }
            return {
                "reward_tensor": data.batch["rm_scores"],
                "reward_extra_info": reward_extra_info
            }
        else:
            return data.batch["rm_scores"]
reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32)
reward_extra_info = defaultdict(list)
already_print_data_sources = {}

for i in range(len(data)):
    data_item = data[i]  # DataProtoItem
    prompt_ids = data_item.batch["prompts"]
    prompt_length = prompt_ids.shape[-1]

    # Note: prompt_ids are left-padded,
    # while response_ids are right-padded
    valid_prompt_length = data_item.batch["attention_mask"][:prompt_length].sum()
    valid_prompt_ids = prompt_ids[-valid_prompt_length:]
    
    response_ids = data_item.batch["responses"]
    valid_response_length = data_item.batch["attention_mask"][prompt_length:].sum()
    valid_response_ids = response_ids[:valid_response_length]

    # Decode the prompt and the response
    prompt_str = self.tokenizer.decode(valid_prompt_ids, skip_special_tokens=True)
    response_str = self.tokenizer.decode(valid_response_ids, skip_special_tokens=True)

    eos_token = self.tokenizer.eos_token
    if response_str.endswith(eos_token):
        response_str = response_str[: -len(eos_token)]

    ground_truth = data_item.non_tensor_batch["reward_model"]["ground_truth"]
    data_source = data_item.non_tensor_batch[self.reward_fn_key]
    extra_info = data_item.non_tensor_batch.get("extra_info", {})
    
    rollout_reward_scores = data_item.non_tensor_batch.get("reward_scores", {})
    extra_info["rollout_reward_scores"] = rollout_reward_scores

    result = self.compute_score(
        data_source=data_source,
        solution_str=response_str,
        ground_truth=ground_truth,
        extra_info=extra_info,
    )

    score: float
    if isinstance(result, dict):
        score = result["score"]
        # Keep all returned information, including the raw reward
        for key, value in result.items():
            reward_extra_info[key].append(value)
    else:
        score = result
        reward_extra_info["acc"].append(score)

    reward = score

    # Handle the overlong reward computation
    if self.overlong_buffer_cfg.enable:
        overlong_buffer_len = self.overlong_buffer_cfg.len
        expected_len = self.max_resp_len - overlong_buffer_len
        exceed_len = valid_response_length - expected_len
        overlong_penalty_factor = self.overlong_buffer_cfg.penalty_factor
        overlong_reward = min(-exceed_len / overlong_buffer_len * overlong_penalty_factor, 0)
        reward += overlong_reward

        if self.overlong_buffer_cfg.log:
            reward_extra_info["overlong_reward"].append(overlong_reward)
            reward_extra_info["overlong"].append(overlong_reward < 0)

    reward_tensor[i, valid_response_length - 1] = reward

    if data_source not in already_print_data_sources:
        already_print_data_sources[data_source] = 0

    if already_print_data_sources[data_source] < self.num_examine:
        already_print_data_sources[data_source] += 1
        print("[prompt]", prompt_str)
        print("[response]", response_str)
        print("[ground_truth]", ground_truth)

        if isinstance(result, dict):
            for key, value in result.items():
                print(f"[{key}]", value)
        else:
            print("[score]", score)

if return_dict:  
    return {  
        "reward_tensor": reward_tensor,  
        "reward_extra_info": reward_extra_info,  
    }  
else:  
    return reward_tensor