diff --git a/examples/qwen2.5-vl-7B-math/rlvr_math_lora.yaml b/examples/qwen2.5-vl-7B-math/rlvr_math_lora.yaml deleted file mode 100644 index c8ab855e..00000000 --- a/examples/qwen2.5-vl-7B-math/rlvr_math_lora.yaml +++ /dev/null @@ -1,143 +0,0 @@ -defaults: - - ../config/deepspeed_zero@_here_ - - ../config/deepspeed_zero2@_here_ - - ../config/deepspeed_zero3@_here_ - - ../config/deepspeed_zero3_cpuoffload@_here_ - -hydra: - run: - dir: . - output_subdir: null - -exp_name: "qwen2_5_vl_7B_math_config" -seed: 42 -logging_dir: ./output/logs -output_dir: ./output - -checkpoint_config: - type: file_system - output_dir: /data/cpfs_0/rl_examples/models/${exp_name} - -track_with: tensorboard -tracker_kwargs: - log_dir: /data/oss_bucket_0/shidie/llm/tensorboard/roll_exp/rlvr_math - -save_steps: 20 -logging_steps: 1 -eval_steps: 1 -resume_from_checkpoint: false - -rollout_batch_size: 512 -num_return_sequences_in_group: 8 -is_num_return_sequences_expand: true -prompt_length: 1024 -response_length: 4096 -generate_opt_level: 0 - -ppo_epochs: 1 -value_clip: 0.5 -reward_clip: 10 -advantage_clip: 10.0 -whiten_advantages: false -init_kl_coef: 0.0 -adv_estimator: "grpo" -use_kl_loss: true -kl_loss_coef: 1.0e-2 - -# lora -lora_target: o_proj,q_proj,k_proj,v_proj -lora_rank: 32 -lora_alpha: 32 - -pretrain: Qwen/Qwen2.5-VL-7B-Instruct - -actor_train: - model_args: - attn_implementation: fa2 - # Recomputed tensor size does not match for LoRA with Zero3 when activating checkpointing, See https://github.com/huggingface/transformers/issues/34928 for details - disable_gradient_checkpointing: true - dtype: bf16 - lora_target: ${lora_target} - lora_rank: ${lora_rank} - lora_alpha: ${lora_alpha} - model_type: ~ - training_args: - learning_rate: 1.0e-5 - weight_decay: 1.0e-2 - per_device_train_batch_size: 1 - gradient_accumulation_steps: 64 - warmup_steps: 0 - num_train_epochs: 50 - data_args: - template: qwen2-vl - # use leonardPKU/GEOQA_R1V_Train_8K as dataset - # download to ./data/geoqa_data from https://huggingface.co/datasets/leonardPKU/GEOQA_R1V_Train_8K - file_name: ./data/geoqa_data/ - dataset_dir: ./ - preprocessing_num_workers: 16 - strategy_args: - strategy_name: deepspeed_train - strategy_config: ${deepspeed_zero3} - device_mapping: list(range(0,16)) - infer_batch_size: 8 - -actor_infer: - model_args: - disable_gradient_checkpointing: true - dtype: bf16 - lora_target: ${lora_target} - lora_rank: ${lora_rank} - lora_alpha: ${lora_alpha} - generating_args: - max_new_tokens: ${response_length} - top_p: 0.99 - top_k: 100 - num_beams: 1 - temperature: 0.99 - num_return_sequences: ${num_return_sequences_in_group} - data_args: - template: qwen2-vl - strategy_args: - strategy_name: vllm - strategy_config: - gpu_memory_utilization: 0.8 - block_size: 16 - # mm preprocessor cache mismatch error occured in vllm084 - enable_prefix_caching: false - num_gpus_per_worker: 1 - device_mapping: list(range(0,16)) - infer_batch_size: 32 - -reference: - model_args: - attn_implementation: fa2 - disable_gradient_checkpointing: true - dtype: bf16 - # In transformers>=4.50.0, if model.from_pretrained with auto device_map, None - # tp_plan (and tp_plan of model is not None) and WORLD_SIZE>1, TP would be used. 
- # Thus using device_map=0 to disable HF transformers parallel, otherwise use - # zero3 for reference model - device_map: "cuda:0" - model_type: ~ - data_args: - template: qwen2-vl - strategy_args: - strategy_name: hf_infer - strategy_config: ~ - device_mapping: list(range(0,16)) - infer_batch_size: 8 - -rewards: - math_rule: - # vl pipeline support MathRuleRewardWorker only, at present. - # Support for rewards in other domains will be retained for future implementation. - worker_cls: roll.pipeline.rlvr.rewards.math_rule_reward_worker.MathRuleRewardWorker - model_args: - model_name_or_path: ${pretrain} - data_args: - template: qwen2-vl - strategy_args: - strategy_name: hf_infer - strategy_config: ~ - world_size: 16 - infer_batch_size: 4 \ No newline at end of file diff --git a/examples/qwen2.5-vl-7B-math/rlvr_math_megatron.yaml b/examples/qwen2.5-vl-7B-math/rlvr_math_megatron.yaml deleted file mode 100644 index 0d3c45a0..00000000 --- a/examples/qwen2.5-vl-7B-math/rlvr_math_megatron.yaml +++ /dev/null @@ -1,153 +0,0 @@ -defaults: - - ../config/deepspeed_zero@_here_ - - ../config/deepspeed_zero2@_here_ - - ../config/deepspeed_zero3@_here_ - - ../config/deepspeed_zero3_cpuoffload@_here_ - -hydra: - run: - dir: . - output_subdir: null - -exp_name: "qwen2_5_vl_7B_math_config" -seed: 42 -logging_dir: ./output/logs -output_dir: ./output - -checkpoint_config: - type: file_system - output_dir: /data/cpfs_0/yuzhao/models - -track_with: tensorboard -tracker_kwargs: - log_dir: /data/oss_bucket_0/yuzhao/llm/tensorboard - -save_steps: 20 -logging_steps: 1 -eval_steps: 1 -resume_from_checkpoint: false - -rollout_batch_size: 512 -num_return_sequences_in_group: 8 -is_num_return_sequences_expand: true -prompt_length: 1024 -response_length: 4096 -generate_opt_level: 0 - -ppo_epochs: 1 -value_clip: 0.5 -reward_clip: 10 -advantage_clip: 10.0 -whiten_advantages: false -init_kl_coef: 0.0 -adv_estimator: "grpo" -use_kl_loss: true -kl_loss_coef: 1.0e-2 - -pretrain: Qwen/Qwen2.5-VL-7B-Instruct - -#validation: -# data_args: -# template: qwen2-vl -# file_name: ./data/geoqa_data/ -# dataset_dir: ./ -# generating_args: -# max_new_tokens: ${response_length} -# top_p: 0.99 -# top_k: 100 -# num_beams: 1 -# temperature: 0.99 -# num_return_sequences: 1 -# eval_steps: 10 - -actor_train: - model_args: - disable_gradient_checkpointing: false - dtype: bf16 - model_type: ~ - training_args: - learning_rate: 1.0e-6 - weight_decay: 1.0e-2 - per_device_train_batch_size: 4 - gradient_accumulation_steps: 256 - warmup_steps: 0 - num_train_epochs: 50 - data_args: - template: qwen2-vl - # use leonardPKU/GEOQA_R1V_Train_8K as dataset - # download to ./data/geoqa_data from https://huggingface.co/datasets/leonardPKU/GEOQA_R1V_Train_8K - file_name: ./data/geoqa_data/ - dataset_dir: ./ - preprocessing_num_workers: 16 - strategy_args: - strategy_name: megatron_train - strategy_config: - sequence_parallel: true - tensor_model_parallel_size: 4 - context_parallel_size: 1 - expert_model_parallel_size: 1 - pipeline_model_parallel_size: 1 - overlap_grad_reduce: true - use_distributed_optimizer: true - bf16: true - device_mapping: list(range(0,16)) - infer_batch_size: 8 - -actor_infer: - model_args: - disable_gradient_checkpointing: true - dtype: bf16 - generating_args: - max_new_tokens: ${response_length} - top_p: 0.99 - top_k: 100 - num_beams: 1 - temperature: 0.99 - num_return_sequences: ${num_return_sequences_in_group} - data_args: - template: qwen2-vl - strategy_args: - strategy_name: vllm - strategy_config: - gpu_memory_utilization: 0.9 - 
block_size: 16 - disable_mm_preprocessor_cache: true # RAM leak: https://github.com/vllm-project/vllm/issues/15085 - # mm preprocessor cache mismatch error occured in vllm084 - enable_prefix_caching: false - num_gpus_per_worker: 1 - device_mapping: list(range(0,16)) - infer_batch_size: 32 - -reference: - model_args: - disable_gradient_checkpointing: true - dtype: bf16 - model_type: ~ - data_args: - template: qwen2-vl - strategy_args: - strategy_name: megatron_infer - strategy_config: - sequence_parallel: true - tensor_model_parallel_size: 1 - context_parallel_size: 1 - pipeline_model_parallel_size: 1 - expert_model_parallel_size: 1 - bf16: true - device_mapping: list(range(0,16)) - infer_batch_size: 8 - -rewards: - # vl pipeline support MathRuleRewardWorker only, at present. - # Support for rewards in other domains will be retained for future implementation. - math_rule: - worker_cls: roll.pipeline.rlvr.rewards.math_rule_reward_worker.MathRuleRewardWorker - model_args: - model_name_or_path: ${pretrain} - data_args: - template: qwen2-vl - strategy_args: - strategy_name: hf_infer - strategy_config: ~ - world_size: 16 - infer_batch_size: 4 diff --git a/examples/qwen2.5-vl-7B-math/rlvr_math_megatron_amd.yaml b/examples/qwen2.5-vl-7B-math/rlvr_math_megatron_amd.yaml deleted file mode 100644 index 89f26657..00000000 --- a/examples/qwen2.5-vl-7B-math/rlvr_math_megatron_amd.yaml +++ /dev/null @@ -1,150 +0,0 @@ -defaults: - - ../config/deepspeed_zero@_here_ - - ../config/deepspeed_zero2@_here_ - - ../config/deepspeed_zero3@_here_ - - ../config/deepspeed_zero3_cpuoffload@_here_ - -hydra: - run: - dir: . - output_subdir: null - -exp_name: "qwen2_5_vl_7B_math_config" -seed: 42 -logging_dir: ./output/logs -output_dir: ./output - -checkpoint_config: - type: file_system - output_dir: /data/cpfs_0/yuzhao/models - -track_with: tensorboard -tracker_kwargs: - log_dir: /data/oss_bucket_0/yuzhao/llm/tensorboard - -save_steps: 20 -logging_steps: 1 -eval_steps: 1 -resume_from_checkpoint: false - -rollout_batch_size: 512 -num_return_sequences_in_group: 8 -is_num_return_sequences_expand: true -prompt_length: 1024 -response_length: 4096 -generate_opt_level: 0 - -ppo_epochs: 1 -value_clip: 0.5 -reward_clip: 10 -advantage_clip: 10.0 -whiten_advantages: false -init_kl_coef: 0.0 -adv_estimator: "grpo" -use_kl_loss: true -kl_loss_coef: 1.0e-2 - -pretrain: Qwen/Qwen2.5-VL-7B-Instruct - -#validation: -# data_args: -# template: qwen2-vl -# file_name: ./data/geoqa_data/ -# dataset_dir: ./ -# generating_args: -# max_new_tokens: ${response_length} -# top_p: 0.99 -# top_k: 100 -# num_beams: 1 -# temperature: 0.99 -# num_return_sequences: 1 - -actor_train: - model_args: - disable_gradient_checkpointing: false - dtype: bf16 - model_type: ~ - training_args: - learning_rate: 1.0e-6 - weight_decay: 1.0e-2 - per_device_train_batch_size: 4 - gradient_accumulation_steps: 256 - warmup_steps: 0 - num_train_epochs: 50 - data_args: - template: qwen2-vl - # use leonardPKU/GEOQA_R1V_Train_8K as dataset - # download to ./data/geoqa_data from https://huggingface.co/datasets/leonardPKU/GEOQA_R1V_Train_8K - file_name: ./data/geoqa_data/ - dataset_dir: ./ - preprocessing_num_workers: 16 - strategy_args: - strategy_name: megatron_train - strategy_config: - sequence_parallel: true - tensor_model_parallel_size: 4 - context_parallel_size: 1 - expert_model_parallel_size: 1 - pipeline_model_parallel_size: 1 - overlap_grad_reduce: true - use_distributed_optimizer: true - bf16: true - device_mapping: list(range(0,16)) - infer_batch_size: 8 - 
-actor_infer: - model_args: - disable_gradient_checkpointing: true - dtype: bf16 - generating_args: - max_new_tokens: ${response_length} - top_p: 0.99 - top_k: 100 - num_beams: 1 - temperature: 0.99 - num_return_sequences: ${num_return_sequences_in_group} - data_args: - template: qwen2-vl - strategy_args: - strategy_name: vllm - strategy_config: - gpu_memory_utilization: 0.6 - block_size: 16 - disable_mm_preprocessor_cache: true # RAM leak: https://github.com/vllm-project/vllm/issues/15085 - num_gpus_per_worker: 1 - device_mapping: list(range(0,16)) - infer_batch_size: 32 - -reference: - model_args: - disable_gradient_checkpointing: true - dtype: bf16 - model_type: ~ - data_args: - template: qwen2-vl - strategy_args: - strategy_name: megatron_infer - strategy_config: - sequence_parallel: true - tensor_model_parallel_size: 1 - context_parallel_size: 1 - pipeline_model_parallel_size: 1 - expert_model_parallel_size: 1 - bf16: true - device_mapping: list(range(0,16)) - infer_batch_size: 8 - -rewards: - # vl pipeline support MathRuleRewardWorker only, at present. - # Support for rewards in other domains will be retained for future implementation. - math_rule: - worker_cls: roll.pipeline.rlvr.rewards.math_rule_reward_worker.MathRuleRewardWorker - model_args: - model_name_or_path: ${pretrain} - data_args: - template: qwen2-vl - strategy_args: - strategy_name: hf_infer - strategy_config: ~ - world_size: 16 - infer_batch_size: 4 diff --git a/examples/qwen2.5-vl-7B-math/rlvr_math_zero3.yaml b/examples/qwen2.5-vl-7B-math/rlvr_math_zero3.yaml deleted file mode 100644 index 2499ab15..00000000 --- a/examples/qwen2.5-vl-7B-math/rlvr_math_zero3.yaml +++ /dev/null @@ -1,131 +0,0 @@ -defaults: - - ../config/deepspeed_zero@_here_ - - ../config/deepspeed_zero2@_here_ - - ../config/deepspeed_zero3@_here_ - - ../config/deepspeed_zero3_cpuoffload@_here_ - -hydra: - run: - dir: . 
- output_subdir: null - -exp_name: "qwen2_5_vl_7B_math_config" -seed: 42 -logging_dir: ./output/logs -output_dir: ./output - -checkpoint_config: - type: file_system - output_dir: /data/cpfs_0/yuzhao/models - -track_with: tensorboard -tracker_kwargs: - log_dir: /data/oss_bucket_0/yuzhao/llm/tensorboard - -save_steps: 20 -logging_steps: 1 -eval_steps: 1 -resume_from_checkpoint: false - -rollout_batch_size: 512 -num_return_sequences_in_group: 8 -is_num_return_sequences_expand: true -prompt_length: 1024 -response_length: 4096 -generate_opt_level: 0 - -ppo_epochs: 1 -value_clip: 0.5 -reward_clip: 10 -advantage_clip: 10.0 -whiten_advantages: false -init_kl_coef: 0.0 -adv_estimator: "grpo" -use_kl_loss: true -kl_loss_coef: 1.0e-2 - -pretrain: Qwen/Qwen2.5-VL-7B-Instruct - -actor_train: - model_args: - attn_implementation: fa2 - disable_gradient_checkpointing: false - dtype: bf16 - model_type: ~ - training_args: - learning_rate: 1.0e-6 - weight_decay: 1.0e-2 - per_device_train_batch_size: 4 - gradient_accumulation_steps: 64 - warmup_steps: 0 - num_train_epochs: 50 - data_args: - template: qwen2-vl - # use leonardPKU/GEOQA_R1V_Train_8K as dataset - # download to ./data/geoqa_data from https://huggingface.co/datasets/leonardPKU/GEOQA_R1V_Train_8K - file_name: ./data/geoqa_data/ - dataset_dir: ./ - preprocessing_num_workers: 16 - strategy_args: - strategy_name: deepspeed_train - strategy_config: ${deepspeed_zero3} - device_mapping: list(range(0,16)) - infer_batch_size: 8 - -actor_infer: - model_args: - disable_gradient_checkpointing: true - dtype: bf16 - generating_args: - max_new_tokens: ${response_length} - top_p: 0.99 - top_k: 100 - num_beams: 1 - temperature: 0.99 - num_return_sequences: ${num_return_sequences_in_group} - data_args: - template: qwen2-vl - strategy_args: - strategy_name: vllm - strategy_config: - gpu_memory_utilization: 0.9 - block_size: 16 - # mm preprocessor cache mismatch error occured in vllm084 - enable_prefix_caching: false - num_gpus_per_worker: 1 - device_mapping: list(range(0,16)) - infer_batch_size: 32 - -reference: - model_args: - attn_implementation: fa2 - disable_gradient_checkpointing: true - dtype: bf16 - # In transformers>=4.50.0, if model.from_pretrained with auto device_map, None - # tp_plan (and tp_plan of model is not None) and WORLD_SIZE>1, TP would be used. - # Thus using device_map=0 to disable HF transformers parallel, otherwise use - # zero3 for reference model - device_map: "cuda:0" - model_type: ~ - data_args: - template: qwen2-vl - strategy_args: - strategy_name: hf_infer - strategy_config: ~ - device_mapping: list(range(0,16)) - infer_batch_size: 8 - -rewards: - math_rule: - # vl pipeline support MathRuleRewardWorker only, at present. - # Support for rewards in other domains will be retained for future implementation. 
-    worker_cls: roll.pipeline.rlvr.rewards.math_rule_reward_worker.MathRuleRewardWorker
-    model_args:
-      model_name_or_path: ${pretrain}
-    data_args:
-      template: qwen2-vl
-    strategy_args:
-      strategy_name: hf_infer
-      strategy_config: ~
-    world_size: 16
-    infer_batch_size: 4
diff --git a/examples/qwen2.5-vl-7B-math/run_vl_rlvr_pipeline.sh b/examples/qwen2.5-vl-7B-math/run_vl_rlvr_pipeline.sh
deleted file mode 100755
index 7b95b652..00000000
--- a/examples/qwen2.5-vl-7B-math/run_vl_rlvr_pipeline.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/bash
-set +x
-
-CONFIG_PATH=$(basename $(dirname $0))
-python examples/start_rlvr_vlmath_pipeline.py --config_path $CONFIG_PATH --config_name rlvr_math_megatron
diff --git a/examples/qwen3-vl-4B-rlvr_megatron/rlvr_megatron_80G.yaml b/examples/qwen3-vl-4B-rlvr_megatron/rlvr_megatron_80G.yaml
index 0e1900e6..76a4166b 100644
--- a/examples/qwen3-vl-4B-rlvr_megatron/rlvr_megatron_80G.yaml
+++ b/examples/qwen3-vl-4B-rlvr_megatron/rlvr_megatron_80G.yaml
@@ -73,7 +73,7 @@ actor_train:
     learning_rate: 1.0e-6
     weight_decay: 1.0e-2
     per_device_train_batch_size: 2
-    gradient_accumulation_steps: 64
+    gradient_accumulation_steps: 128
     warmup_steps: 0
     num_train_epochs: 50
   data_args:
diff --git a/examples/start_rlvr_vlmath_pipeline.py b/examples/start_rlvr_vlmath_pipeline.py
deleted file mode 100644
index cb7cb41b..00000000
--- a/examples/start_rlvr_vlmath_pipeline.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import argparse
-
-from dacite import from_dict
-from hydra.experimental import compose, initialize
-from omegaconf import OmegaConf
-
-from roll.distributed.scheduler.initialize import init
-from roll.pipeline.rlvr.rlvr_math_vlm_pipeline import RLVRConfig, RLVRMathVLMPipeline
-
-
-def main():
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--config_path", help="The path of the main configuration file", default="config")
-    parser.add_argument(
-        "--config_name", help="The name of the main configuration file (without extension).", default="sppo_config"
-    )
-    args = parser.parse_args()
-
-    initialize(config_path=args.config_path, job_name="app")
-    cfg = compose(config_name=args.config_name)
-
-    print(OmegaConf.to_yaml(cfg, resolve=True))
-
-    ppo_config = from_dict(data_class=RLVRConfig, data=OmegaConf.to_container(cfg, resolve=True))
-
-    init()
-
-    pipeline = RLVRMathVLMPipeline(pipeline_config=ppo_config)
-
-    pipeline.run()
-
-
-if __name__ == "__main__":
-    main()
diff --git a/roll/distributed/strategy/deepspeed_strategy.py b/roll/distributed/strategy/deepspeed_strategy.py
index f81fb19d..58b7e1b4 100644
--- a/roll/distributed/strategy/deepspeed_strategy.py
+++ b/roll/distributed/strategy/deepspeed_strategy.py
@@ -159,6 +159,9 @@ def forward_step(
         position_ids = data.batch["position_ids"]
         forward_args = data.meta_info.get("forward_args", {})
         if position_ids.dim() == 3:
+            # same as megatron to be compatible with fsdp packing which change position_ids.size(1) to 4
+            if position_ids.size(1) == 4:
+                position_ids = position_ids[:, 1:, :].contiguous()  # (bsz, 4, seqlen) -> (bsz, 3, seqlen)
             # qwen2vl mrope, maybe use a placeholder and let model generate position_ids
             position_ids = position_ids.transpose(0, 1)  # (bsz, 3, seqlen) -> (3, bsz, seqlen)
         if "multi_modal_inputs" in data.non_tensor_batch:
diff --git a/roll/pipeline/rlvr/rlvr_math_vlm_pipeline.py b/roll/pipeline/rlvr/rlvr_math_vlm_pipeline.py
deleted file mode 100644
index fcc12e0d..00000000
--- a/roll/pipeline/rlvr/rlvr_math_vlm_pipeline.py
+++ /dev/null
@@ -1,699 +0,0 @@
-import json
-import os
-import uuid
-from typing
import Any, Dict, List, Optional - -import ray -import torch -import datasets -import PIL.Image as Image -from transformers import ProcessorMixin, AutoConfig -from transformers.image_utils import load_images -from transformers.models.qwen2_vl.image_processing_qwen2_vl import smart_resize -from datasets import load_dataset, load_from_disk -from codetiming import Timer -from ray.util.timer import _Timer -from torch.utils.data import DataLoader -from tqdm import tqdm -import numpy as np - -from roll.datasets.collator import DataCollatorWithPaddingForMM -from roll.distributed.executor.cluster import Cluster -from roll.distributed.scheduler.generate_scheduler import GenerateScheduler -from roll.distributed.scheduler.protocol import DataProto -from roll.models.model_providers import default_processor_provider -from roll.pipeline.base_pipeline import BasePipeline -from roll.pipeline.rlvr.rlvr_config import RLVRConfig -from roll.utils.checkpoint_manager import download_model -from roll.utils.constants import GENERATE_SCHEDULER_NAME, RAY_NAMESPACE -from roll.utils.functionals import ( - apply_kl_penalty, - compute_advantage, - reduce_metrics, - masked_mean, - RunningMoments, - compute_clip_fraction, - group_reward_norm, - expand_to_token_level, - get_sample_level_mask -) -from roll.utils.kl_controller import get_kl_controller -from roll.utils.logging import get_logger - -from .rlvr_vlm_pipeline import format_prompt, process_images, get_extra_data_provider - -logger = get_logger() - - -def is_lora_training(pipeline_config: RLVRConfig) -> bool: - if pipeline_config.actor_train.model_args.lora_target is None: - return False - assert pipeline_config.actor_train.strategy_args.strategy_name == "deepspeed_train", ( - "LoRA only supports deepspeed_train" - ) - return True - - -def encode_function(data_i, processor, prompt_key, answer_key, image_key): - image_flag = [True] * len(data_i[prompt_key]) - image_list = [] - for idx, image in enumerate(data_i[image_key]): - if image is None: - image_flag[idx] = False - try: - image_out = load_images(image if isinstance(image, (list, tuple)) else [image], timeout=None) - except Exception as e: - image_out = [Image.new("RGB", (224, 224), (255, 255, 255))] - logger.error(f"Failed to get image due to {e}") - # since infer-image use pil image as input while train-engine use - # processed data, process image here to make them use same image - image_out = process_images(image_out, processor) - image_list.append(image_out) - text_list = [] - for idx, instruct in enumerate(data_i[prompt_key]): - # provide prompt_image_token if image_token in prompt - text = format_prompt(instruct, processor, use_image=image_flag[idx], prompt_image_token=None) - text_list.append(text) - encodings = { - # for area seperated validation, no need currently - "tag": [""] * len(data_i[prompt_key]), - "prompt": text_list, - # no need to extract currently, answer can be by math_verify.parse - "ground_truth": [solution for solution in data_i[answer_key]], - "image": image_list, - "image_flag": image_flag, - } - return encodings - - -FILEEXT2TYPE = { - "arrow": "arrow", - "csv": "csv", - "json": "json", - "jsonl": "json", - "parquet": "parquet", - "txt": "text", -} - - -def get_dataset(data_args, encode_function, processor, features=None, get_eval=False): - cache_path = getattr(data_args, "cache_path", None) - if cache_path: - cache_path = os.path.join(cache_path, "val" if get_eval else "train") - if cache_path and os.path.exists(cache_path): - dataset = load_from_disk(cache_path) - return dataset - 
data_path = None - data_name = data_args.file_name - data_files = [] - dataset_dir = getattr(data_args, "dataset_dir", ".") - local_path: str = os.path.join(dataset_dir, data_name) - if os.path.isdir(local_path): - for file_name in os.listdir(local_path): - data_files.append(os.path.join(local_path, file_name)) - if data_path is None: - data_path = FILEEXT2TYPE.get(file_name.split(".")[-1], None) - elif data_path != FILEEXT2TYPE.get(file_name.split(".")[-1], None): - raise ValueError("File types should be identical.") - elif os.path.isfile(local_path): # is file - data_files.append(local_path) - data_path = FILEEXT2TYPE.get(local_path.split(".")[-1], None) - else: - raise ValueError("File not found.") - dataset = load_dataset(path=data_path, data_files=data_files)["train"] - remove_columns = list(dataset.features.keys() - features.keys()) - # TODO: add fileds into config dataclass, actually these config attrs cannot - # be used temporarily and equal to hard-code - prompt_key = getattr(data_args, "prompt") if getattr(data_args, "prompt", None) else "problem" - answer_key = getattr(data_args, "response") if getattr(data_args, "response", None) else "solution" - image_key = getattr(data_args, "image") if getattr(data_args, "image", None) else "image" - print(f"Begin : {dataset}") - dataset = dataset.map( - lambda data: encode_function(data, processor, prompt_key, answer_key, image_key), - batched=True, - batch_size=100, - num_proc=32, - features=features, - remove_columns=remove_columns, - desc="Encoding dataset", - ) - print(f"Encoding: {dataset}") - if cache_path: - dataset.save_to_disk(cache_path) - return dataset - - -def get_dataloader(dataset, batch_size, data_collator): - dataloader = DataLoader( - dataset=dataset, - batch_size=batch_size, - shuffle=True, - drop_last=True, - num_workers=4, # larger shm for bigger num_workers - collate_fn=data_collator, - ) - return dataloader - - -class RLVRMathVLMPipeline(BasePipeline): - """This pipeline is deprecated and use `RLVRPipeline` instead""" - - def __init__(self, pipeline_config: RLVRConfig): - logger.warning(f"`{self.__class__.__name__}` is deprecated, and use `RLVRPipeline` instead") - super().__init__(pipeline_config) - self.pipeline_config = pipeline_config - self.is_lora = is_lora_training(self.pipeline_config) - - self.processor = default_processor_provider(self.pipeline_config.actor_train.model_args) - # set max_pixels to avoid image token num is larger than prompt length - self.processor.image_processor.max_pixels, self.processor.image_processor.min_pixels = ( - getattr(self.pipeline_config.actor_train.model_args, "max_pixels", 768 * 768), - getattr(self.pipeline_config.actor_train.model_args, "min_pixels", 56 * 56), - ) - self.tokenizer = self.processor.tokenizer - self.tokenizer.padding_side = "left" - # regularized data filed - features = datasets.Features( - { - # only support single image temporarily since sglang usage - # "image": datasets.Image(decode=True), - "image": datasets.Sequence(feature=datasets.Image(decode=True)), - "prompt": datasets.Value("string"), - "ground_truth": datasets.Value("string"), - # for text and multi-modal mixed data usage, indicating valid image - "image_flag": datasets.Value("bool"), - # for area seperated validation, dummy currently - "tag": datasets.Value("string"), - } - ) - dataset = get_dataset( - self.pipeline_config.actor_train.data_args, encode_function, self.processor, features, get_eval=False - ) - val_dataset = None - if self.pipeline_config.validation and 
self.pipeline_config.validation.data_args: - val_dataset = get_dataset( - self.pipeline_config.validation.data_args, encode_function, self.processor, features, get_eval=True - ) - - data_collator = DataCollatorWithPaddingForMM( - tokenizer=self.tokenizer, - processor=self.processor, - extra_data_provider=get_extra_data_provider( - self.pipeline_config.actor_train.model_args.model_name_or_path, processor=self.processor - ), - max_length=self.pipeline_config.prompt_length, - padding="max_length", - ) - self.dataloader = get_dataloader(dataset, self.pipeline_config.rollout_batch_size, data_collator) - self.val_dataloader = None - if val_dataset: - self.val_dataloader = get_dataloader(val_dataset, len(val_dataset), data_collator) - max_steps = len(self.dataloader) * self.pipeline_config.actor_train.training_args.num_train_epochs - self.pipeline_config.set_max_steps(max_steps=max_steps) - - self.actor_train: Any = Cluster( - name=self.pipeline_config.actor_train.name, - worker_cls=self.pipeline_config.actor_train.worker_cls, - resource_manager=self.resource_manager, - worker_config=self.pipeline_config.actor_train, - ) - self.actor_infer: Any = Cluster( - name=self.pipeline_config.actor_infer.name, - worker_cls=self.pipeline_config.actor_infer.worker_cls, - resource_manager=self.resource_manager, - worker_config=self.pipeline_config.actor_infer, - ) - # use unwrapped model as reference for lora training - if not self.is_lora and self.pipeline_config.enable_reference: - self.reference: Any = Cluster( - name=self.pipeline_config.reference.name, - worker_cls=self.pipeline_config.reference.worker_cls, - resource_manager=self.resource_manager, - worker_config=self.pipeline_config.reference, - ) - self.rewards: Dict[str, Any] = { - key: Cluster( - name=f"reward-{key}", - worker_cls=worker_config.worker_cls, - resource_manager=self.resource_manager, - worker_config=worker_config, - ) - for key, worker_config in self.pipeline_config.rewards.items() - } - self.reward: Any = self.rewards[list(self.rewards.keys())[0]] - if self.pipeline_config.adv_estimator == "gae": - self.critic: Any = Cluster( - name=self.pipeline_config.critic.name, - worker_cls=self.pipeline_config.critic.worker_cls, - resource_manager=self.resource_manager, - worker_config=self.pipeline_config.critic, - ) - - self.generate_scheduler = GenerateScheduler.options( - name=f"{GENERATE_SCHEDULER_NAME}_{self.actor_infer.cluster_name}", - get_if_exists=True, - namespace=RAY_NAMESPACE, - ).remote() - - self.kl_ctrl = get_kl_controller( - init_kl_coef=self.pipeline_config.init_kl_coef, - target_kl=self.pipeline_config.target_kl, - kl_horizon=self.pipeline_config.kl_horizon, - ) - - refs = [] - refs.extend(self.actor_infer.initialize(pipeline_config=self.pipeline_config, blocking=False)) - ray.get(refs) - - refs = [] - if not self.is_lora and self.pipeline_config.enable_reference: - refs.extend(self.reference.initialize(pipeline_config=self.pipeline_config, blocking=False)) - refs.extend(self.reward.initialize(pipeline_config=self.pipeline_config, blocking=False)) - ray.get(refs) - - refs: List[ray.ObjectRef] = [] - refs.extend(self.actor_train.initialize(pipeline_config=self.pipeline_config, blocking=False)) - if self.pipeline_config.adv_estimator == "gae": - refs.extend(self.critic.initialize(pipeline_config=self.pipeline_config, blocking=False)) - ray.get(refs) - - self.set_model_update_pair( - src_cluster=self.actor_train, - tgt_cluster=self.actor_infer, - frequency=self.pipeline_config.actor_train.model_update_frequency, - ) - - if 
self.pipeline_config.adv_estimator == "gae": - self.set_checkpoint_clusters(self.actor_train, self.critic) - else: - self.set_checkpoint_clusters(self.actor_train) - - self.running = RunningMoments() - - @torch.no_grad() - def run(self): - global_step = 0 - - # throughput for tokens per second - tps_timer = _Timer(window_size=5) - actor_infer_timer = _Timer(window_size=5) - actor_infer_response_timer = _Timer(window_size=5) - actor_train_timer = _Timer(window_size=5) - - for epoch in range(int(self.pipeline_config.actor_train.training_args.num_train_epochs)): - logger.info(f"epoch {epoch} start...") - for batch_dict in tqdm(self.dataloader): - if global_step <= self.state.step: - global_step += 1 - continue - - logger.info(f"pipeline step {global_step} start...") - - metrics = {} - with tps_timer: - if self.pipeline_config.adv_estimator == "gae": - self.critic.offload_states(blocking=True) - self.actor_train.offload_states(blocking=True) - model_update_metrics: Dict = self.model_update(global_step) - metrics.update(model_update_metrics) - - if self.val_dataloader and global_step % self.pipeline_config.eval_steps == 0: - metrics.update(self.val()) - - batch_dict: Dict - batch: DataProto = DataProto.from_single_dict(batch_dict) - batch.meta_info = { - "global_step": global_step, - # mark here to make megatron get_data_input broadcast with non_batch_tensor - "_broadcast_non_tensor_batch": True, - } - - with actor_infer_timer, actor_infer_response_timer: - # donot support hf/deepspeed infer generate which use - # multi_modal_inputs tensors - gen_batch = batch.pop( - batch_keys=["input_ids", "attention_mask", "position_ids"], - non_tensor_batch_keys=( - ["multi_modal_data"] if "multi_modal_data" in batch.non_tensor_batch else [] - ), - ) - gen_batch.meta_info = {"global_step": global_step} - generate_output: DataProto = ray.get( - self.generate_scheduler.generate.remote( - data=gen_batch, - actor_cluster=self.actor_infer, - pipeline_config=self.pipeline_config, - ), - timeout=self.pipeline_config.rpc_timeout, - ) - metrics.update(reduce_metrics(generate_output.meta_info.pop("metrics", {}))) - - # generate_output is repeated by num_return_sequences, thus - # reset batch.batch before union to make batch size same, - batch.batch = generate_output.batch - batch = batch.union(generate_output) - - # repeat num_return_sequences for fields not in gen_batch - # which has been repeated in generate_scheduler - for key, value in batch.non_tensor_batch.items(): - batch.non_tensor_batch[key] = np.repeat( - value, self.actor_infer.worker_config.generating_args.num_return_sequences - ) - batch.non_tensor_batch['sample_uuid'] = np.array([str(uuid.uuid4()) for _ in range(batch.batch.shape[0])], dtype=object) - batch.meta_info["loss_mask_keys"] = ["response_mask", "final_response_mask"] - - with Timer(name="cal_ref_log_probs_reward", logger=None) as cal_timer: - if self.pipeline_config.enable_reference: - if self.is_lora: - batch.meta_info["disable_adapter"] = True - batch.meta_info["is_offload_states"] = False - ref_log_probs_refs: List[ray.ObjectRef] = self.actor_train.compute_log_probs( - batch, blocking=False - ) - else: - ref_log_probs_refs: List[ray.ObjectRef] = self.reference.compute_log_probs( - batch, blocking=False - ) - ref_log_probs = DataProto.materialize_concat(data_refs=ref_log_probs_refs) - metrics.update(reduce_metrics(ref_log_probs.meta_info.pop("metrics", {}))) - ref_log_probs.rename(old_keys="log_probs", new_keys="ref_log_probs") - batch = batch.union(ref_log_probs) - rewards_refs: 
List[ray.ObjectRef] = self.reward.compute_rewards(batch, blocking=False) - rewards = DataProto.materialize_concat(data_refs=rewards_refs) - metrics.update(reduce_metrics(rewards.meta_info.pop("metrics", {}))) - batch = batch.union(rewards) - metrics["time/ref_log_probs_values_reward"] = cal_timer.last - - with Timer(name="get_sample_level_mask", logger=None) as get_sample_level_mask_timer: - batch, mask_metrics = get_sample_level_mask(batch, self.pipeline_config) - metrics.update(mask_metrics) - metrics["time/get_sample_level_mask"] = get_sample_level_mask_timer.last - - with Timer(name="cal_old_log_probs_values", logger=None) as cal_old_logpb_timer: - if self.is_lora: - batch.meta_info["disable_adapter"] = False - batch.meta_info["is_offload_states"] = False - if self.pipeline_config.adv_estimator == "gae": - values_refs: List[ray.ObjectRef] = self.critic.compute_values(batch, blocking=False) - - if self.pipeline_config.enable_old_logprobs_recompute: - old_log_probs_refs: List[ray.ObjectRef] = self.actor_train.compute_log_probs( - batch, blocking=False - ) - old_log_probs = DataProto.materialize_concat(data_refs=old_log_probs_refs) - batch.batch["old_log_probs"] = old_log_probs.batch["log_probs"] - metrics.update(reduce_metrics(old_log_probs.meta_info.pop("metrics", {}))) - else: - # Use zeros when optimization is enabled - batch.batch["old_log_probs"] = torch.zeros_like(batch.batch["attention_mask"][:, 1:]) - - if self.pipeline_config.adv_estimator == "gae": - values = DataProto.materialize_concat(data_refs=values_refs) - batch = batch.union(values) - metrics.update(reduce_metrics(values.meta_info.pop("metrics", {}))) - - # Mock ref_log_probs using old_log_probs if reference is disabled - if not self.pipeline_config.enable_reference: - batch.batch["ref_log_probs"] = batch.batch["old_log_probs"].clone() - - metrics["time/old_log_probs"] = cal_old_logpb_timer.last - - with Timer(name="adv", logger=None) as timer: - if self.pipeline_config.use_reward_scaling: - self.running.update(batch.batch["response_level_rewards"]) - reward_scaling_factor = ( - self.running.std + torch.finfo(batch.batch["response_level_rewards"].dtype).eps - ) - if self.pipeline_config.use_reward_norm: - batch.batch["response_level_rewards"] = ( - batch.batch["response_level_rewards"] - self.running.mean - ) / reward_scaling_factor - else: - batch.batch["response_level_rewards"] /= ( - reward_scaling_factor # do not -= mean since advantage will be normalized again - ) - - if self.pipeline_config.reward_clip: - reward_clip_frac = compute_clip_fraction( - values=batch.batch["response_level_rewards"], - clip_max=self.pipeline_config.reward_clip, - clip_min=-self.pipeline_config.reward_clip, - ) - metrics["critic/reward_clip_frac"] = reward_clip_frac - batch.batch["response_level_rewards"] = torch.clamp( - batch.batch["response_level_rewards"], - min=-self.pipeline_config.reward_clip, - max=self.pipeline_config.reward_clip, - ) - - if self.pipeline_config.adv_estimator == "grpo": - batch = group_reward_norm( - batch, - n_sample=self.pipeline_config.actor_infer.generating_args.num_return_sequences, - div_std=True, - ) - - if not self.pipeline_config.use_kl_loss: # not grpo's kl loss - batch, kl_metrics = apply_kl_penalty( - data=batch, kl_ctrl=self.kl_ctrl, kl_penalty=self.pipeline_config.kl_penalty - ) - else: - token_level_rewards = expand_to_token_level(data=batch) - batch.batch["token_level_rewards"] = token_level_rewards - kl_metrics = {} - - if self.pipeline_config.reward_clip: - reward_clip_frac = 
compute_clip_fraction( - values=batch.batch["token_level_rewards"], - clip_max=self.pipeline_config.reward_clip, - clip_min=-self.pipeline_config.reward_clip, - ) - metrics["critic/token_reward_clip_frac"] = reward_clip_frac - batch.batch["token_level_rewards"] = torch.clamp( - batch.batch["token_level_rewards"], - min=-self.pipeline_config.reward_clip, - max=self.pipeline_config.reward_clip, - ) - - batch = compute_advantage( - data=batch, - gamma=self.pipeline_config.gamma, - lambd=self.pipeline_config.lambd, - adv_estimator=self.pipeline_config.adv_estimator, - advantage_clip=self.pipeline_config.advantage_clip, - whiten_advantages=self.pipeline_config.whiten_advantages, - whiten_rewards=self.pipeline_config.whiten_rewards, - ) - metrics.update(reduce_metrics(batch.meta_info.pop("metrics", {}))) - - metrics.update(kl_metrics) - metrics["time/adv"] = timer.last - - if self.pipeline_config.adv_estimator == "gae": - critic_train_metrics_refs: List[ray.ObjectRef] = self.critic.train_step(batch, blocking=False) - - with actor_train_timer: - # implement critic warmup - if not hasattr(self, "critic") or self.pipeline_config.critic_warmup <= global_step: - # update actor - actor_train_metrics_refs = self.actor_train.train_step(batch, blocking=False) - actor_train_metrics: DataProto = DataProto.materialize_concat( - data_refs=actor_train_metrics_refs - ) - metrics.update(reduce_metrics(actor_train_metrics.meta_info.pop("metrics", {}))) - - if self.pipeline_config.adv_estimator == "gae": - critic_train_metrics = DataProto.materialize_concat(data_refs=critic_train_metrics_refs) - metrics.update(reduce_metrics(critic_train_metrics.meta_info.pop("metrics", {}))) - - tps_timer.push_units_processed(n=torch.sum(batch.batch["attention_mask"]).detach().item()) - actor_infer_timer.push_units_processed(n=torch.sum(batch.batch["attention_mask"]).detach().item()) - actor_infer_response_timer.push_units_processed( - n=torch.sum(batch.batch["response_mask"]).detach().item() - ) - actor_train_timer.push_units_processed(n=torch.sum(batch.batch["attention_mask"]).detach().item()) - - data_metrics = compute_data_metrics(batch=batch) - metrics.update(data_metrics) - metrics["system/tps"] = tps_timer.mean_throughput - metrics["system/actor_infer/tps"] = actor_infer_timer.mean_throughput - metrics["system/actor_infer/response/tps"] = actor_infer_response_timer.mean_throughput - metrics["system/actor_train/tps"] = actor_train_timer.mean_throughput - metrics["system/tps_gpu"] = tps_timer.mean_throughput / self.resource_manager.num_gpus - metrics["system/actor_infer/tps_gpu"] = actor_infer_timer.mean_throughput / self.actor_infer.world_size - metrics["system/actor_infer//response/tps_gpu"] = ( - actor_infer_response_timer.mean_throughput / self.actor_infer.world_size - ) - metrics["system/actor_train/tps_gpu"] = actor_train_timer.mean_throughput / self.actor_train.world_size - metrics["system/actor_infer/tps_dp"] = actor_infer_timer.mean_throughput / self.actor_infer.dp_size - metrics["system/actor_infer/response/tps_dp"] = ( - actor_infer_response_timer.mean_throughput / self.actor_infer.dp_size - ) - metrics["system/actor_train/tps_dp"] = actor_train_timer.mean_throughput / self.actor_train.dp_size - metrics["system/samples"] = (global_step + 1) * batch.batch.shape[0] - - # do ckpt - self.state.step = global_step - self.state.log_history.append(metrics) - - self.do_checkpoint(global_step=global_step) - - self.tracker.log(values=metrics, step=global_step) - - if global_step % self.pipeline_config.logging_steps == 0: - 
if int(os.environ.get("RAY_PROFILING", "0")): - timeline_dir = os.path.join(self.pipeline_config.profiler_output_dir, "timeline") - os.makedirs(timeline_dir, exist_ok=True) - ray.timeline( - filename=os.path.join(timeline_dir, f"timeline-step-{global_step}.json"), - ) - - prompt_ids = generate_output.batch["prompts"] - response_ids = generate_output.batch["responses"] - - generate_res = [] - # skip_special_tokens=True would output without image token, maybe do not skip - prompts = self.tokenizer.batch_decode(prompt_ids, skip_special_tokens=True) - responses = self.tokenizer.batch_decode(response_ids, skip_special_tokens=True) - for prompt, prompt_id, response, response_id in zip( - prompts, - prompt_ids, - responses, - response_ids, - ): - generate_res.append( - { - "prompt": prompt, - # "prompt_id": prompt_id.tolist(), - "response": response, - # "response_id": response_id.tolist(), - } - ) - logger.info(json.dumps(generate_res[:10], ensure_ascii=False)) - logger.info(json.dumps(metrics, ensure_ascii=False)) - - logger.info(f"pipeline step {global_step} finished") - global_step += 1 - - if global_step >= self.pipeline_config.max_steps: - logger.info(f"pipeline step {global_step} finished, reached max steps: {self.pipeline_config.max_steps}") - return - - logger.info(f"epoch {epoch} finished") - logger.info("pipeline complete!") - - @torch.no_grad() - def val(self): - # throughput for tokens per second - tps_timer = _Timer(window_size=5) - metrics = {} - epoch_batch = [] - for batch_dict in tqdm(self.val_dataloader): - with tps_timer: - batch_dict: Dict - batch: DataProto = DataProto.from_single_dict(batch_dict) - gen_batch = batch.pop( - batch_keys=["input_ids", "attention_mask", "position_ids"], - non_tensor_batch_keys=["multi_modal_data"] if "multi_modal_data" in batch.non_tensor_batch else [], - ) - gen_batch.meta_info["is_offload_states"] = False - generate_output: DataProto = ray.get( - self.generate_scheduler.generate.remote( - data=gen_batch, - actor_cluster=self.actor_infer, - pipeline_config=self.pipeline_config, - ), - timeout=self.pipeline_config.rpc_timeout, - ) - batch.batch = generate_output.batch - batch = batch.union(generate_output) - - for key, value in batch.non_tensor_batch.items(): - batch.non_tensor_batch[key] = np.repeat( - value, self.actor_infer.worker_config.generating_args.num_return_sequences - ) - - with Timer(name="cal_reward", logger=None) as cal_timer: - rewards = ray.get(self.reward.workers[0].compute_rewards.remote(batch)) - batch = batch.union(rewards) - logger.info( - json.dumps( - {"val_correct/mean": (batch.batch["scores"] == 1).detach().float().mean().item()}, - ensure_ascii=False, - ) - ) - epoch_batch.append(batch) - - if len(epoch_batch) == 0: - logger.info(f"len(self.val_dataloader): {len(self.val_dataloader)}, skip val...") - return {} - - epoch_batch = DataProto.concat(epoch_batch) - logger.info(f"total eval information: {epoch_batch}") - logger.info(f"total eval information --- scores mean: {epoch_batch.batch['scores'].mean().item()} " - f"scores: {epoch_batch.batch['scores'].tolist()}") - metrics[ f"val_correct/mean"] = (epoch_batch.batch["scores"] == 1).detach().float().mean().item() - return metrics - - -def compute_data_metrics(batch): - sequence_score = batch.batch["scores"] - sequence_reward = batch.batch["token_level_rewards"].sum(-1) - sequence_reward_mean = batch.batch["token_level_rewards"].mean(-1) - - max_response_length = batch.batch["responses"].shape[-1] - advantages = batch.batch["advantages"] - prompt_mask = 
batch.batch["prompt_mask"].bool()
-    response_mask = batch.batch["response_mask"][:, 1:].bool()
-    raw_advantages = batch.batch["raw_advantages"]
-    prompt_length = prompt_mask.sum(-1).float()  # (batch_size,)
-    response_length = response_mask.sum(-1).float()  # (batch_size,)
-    returns = batch.batch["returns"]
-
-    metrics = {
-        # correct
-        "critic/correct/mean": (sequence_score == 1).detach().float().mean().item(),
-        # score
-        "critic/score/mean": torch.mean(sequence_score).detach().item(),
-        "critic/score/max": torch.max(sequence_score).detach().item(),
-        "critic/score/min": torch.min(sequence_score).detach().item(),
-        # reward
-        "critic/rewards/mean": torch.mean(sequence_reward).detach().item(),
-        "critic/rewards/max": torch.max(sequence_reward).detach().item(),
-        "critic/rewards/min": torch.min(sequence_reward).detach().item(),
-        "critic/rewards_mean/mean": torch.mean(sequence_reward_mean).detach().item(),
-        "critic/rewards_mean/max": torch.max(sequence_reward_mean).detach().item(),
-        "critic/rewards_mean/min": torch.min(sequence_reward_mean).detach().item(),
-        # adv
-        "critic/advantages/mean": masked_mean(advantages, response_mask).detach().item(),
-        "critic/advantages/max": torch.max(advantages[response_mask]).detach().item(),
-        "critic/advantages/min": torch.min(advantages[response_mask]).detach().item(),
-        # raw_adv
-        "critic/raw_advantages/mean": masked_mean(raw_advantages, response_mask).detach().item(),
-        "critic/raw_advantages/max": torch.max(raw_advantages[response_mask]).detach().item(),
-        "critic/raw_advantages/min": torch.min(raw_advantages[response_mask]).detach().item(),
-        # returns
-        "critic/returns/mean": masked_mean(returns, response_mask).detach().item(),
-        "critic/returns/max": torch.max(returns[response_mask]).detach().item(),
-        "critic/returns/min": torch.min(returns[response_mask]).detach().item(),
-        # response length
-        "tokens/response_length/mean": torch.mean(response_length).detach().item(),
-        "tokens/response_length/max": torch.max(response_length).detach().item(),
-        "tokens/response_length/min": torch.min(response_length).detach().item(),
-        # prompt length
-        "tokens/prompt_length/mean": torch.mean(prompt_length).detach().item(),
-        "tokens/prompt_length/max": torch.max(prompt_length).detach().item(),
-        "tokens/prompt_length/min": torch.min(prompt_length).detach().item(),
-    }
-
-    if "values" in batch.batch.keys():
-        values = batch.batch["values"]
-        # values
-        metrics.update(
-            {
-                "critic/values/mean": masked_mean(values, response_mask).detach().item(),
-                "critic/values/max": torch.max(values[response_mask]).detach().item(),
-                "critic/values/min": torch.min(values[response_mask]).detach().item(),
-            }
-        )
-    return metrics
diff --git a/roll/pipeline/rlvr/rlvr_vlm_pipeline.py b/roll/pipeline/rlvr/rlvr_vlm_pipeline.py
index 19a46280..807e0a29 100644
--- a/roll/pipeline/rlvr/rlvr_vlm_pipeline.py
+++ b/roll/pipeline/rlvr/rlvr_vlm_pipeline.py
@@ -43,8 +43,8 @@ from roll.utils.kl_controller import get_kl_controller
 from roll.utils.logging import get_logger
 from roll.utils.metrics.metrics_manager import MetricsManager
-from roll.utils.packages import is_transformers_version_greater_than
 from roll.utils.offload_states import OffloadStateType
+from roll.utils.train_infer_corrections import apply_train_infer_correction_to_batch
 
 logger = get_logger()
 
@@ -643,6 +643,10 @@ def run(self):
                 batch_grouped: Dict[str, DataProto] = batch.group_by("domain")
                 metrics_mgr.add_domain_all_metrics(global_step, batch_grouped)
 
+                if self.pipeline_config.enable_old_logprobs_recompute:
+                    batch, corr_metrics = apply_train_infer_correction_to_batch(self.pipeline_config, batch)
+                    metrics_mgr.add_metrics(corr_metrics)
+
                 with Timer(name="step_train", logger=None) as step_train_timer:
                     if self.pipeline_config.adv_estimator == "gae":
                         critic_train_metrics_refs: List[ray.ObjectRef] = self.critic.train_step(batch, blocking=False)
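
Note on the mrope shape handling added in roll/distributed/strategy/deepspeed_strategy.py above: when packing produces a 4-row position_ids tensor, the extra leading row is dropped before the channel-first transpose that qwen2vl mrope expects. Below is a minimal, self-contained sketch of that shape logic with dummy tensors; it is an illustration of the slicing and transpose only, not the strategy code itself.

import torch

# Dummy shapes only; in the pipeline the tensors come from the DataProto batch.
bsz, seqlen = 2, 8

# FSDP packing can yield position_ids of shape (bsz, 4, seqlen); plain mrope uses (bsz, 3, seqlen).
position_ids = torch.arange(seqlen).expand(bsz, 4, seqlen).clone()

if position_ids.dim() == 3:
    if position_ids.size(1) == 4:
        # Drop the packing row: (bsz, 4, seqlen) -> (bsz, 3, seqlen)
        position_ids = position_ids[:, 1:, :].contiguous()
    # Channel-first layout for qwen2vl mrope: (bsz, 3, seqlen) -> (3, bsz, seqlen)
    position_ids = position_ids.transpose(0, 1)

assert position_ids.shape == (3, bsz, seqlen)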