
[feat] Add RLHF rollout integration support (verl)#549

Open
sijyang wants to merge 12 commits into main from sijyang/verl_dev

Conversation

@sijyang sijyang commented Apr 13, 2026

Overview

This PR enables ATOM to serve as a rollout backend for verl, a distributed RLHF training framework. In RLHF training, the system alternates between two phases:

  1. Training phase — the trainer updates model weights via gradient descent (handled by verl)
  2. Rollout phase — the inference engine generates responses using the latest weights (handled by ATOM)

This requires ATOM to support a new lifecycle that traditional serving doesn't need: receiving weight updates from an external trainer, dynamically releasing/reclaiming GPU memory between phases, and coordinating these operations across multiple DP ranks and TP ranks.

The integration is designed as a plugin layer (atom/rollout/) that extends ATOM's existing engine without modifying its core inference path. All changes to existing files are purely incremental additions (new methods, new fields, new message types); no existing behavior is altered.

Architecture

verl trainer (PyTorch DDP)
        │
ATOMHttpServer (verl side, per-node)
        │ ZMQ RPC
AsyncLLMEngine (atom/rollout/async_engine.py)
├── sleep() → release KV cache, free GPU memory for training
├── wake_up() → reallocate KV cache, ready for generation
├── load_weights() → receive updated weights via CUDA IPC
└── generate() → standard ATOM inference with logprobs
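The lifecycle above alternates between training and rollout phases. A minimal sketch of that loop, with a stand-in engine object (the `FakeEngine` stub, call ordering, and `rollout_phase` helper are illustrative assumptions; only the `sleep`/`wake_up`/`load_weights`/`generate` method names come from this PR):

```python
class FakeEngine:
    """Illustrative stand-in for AsyncLLMEngine; records call ordering only."""

    def __init__(self):
        self.sleeping = True   # engine starts parked so the trainer owns the GPU
        self.version = 0
        self.calls = []

    def wake_up(self):
        # Real engine: reallocate KV cache here.
        self.sleeping = False
        self.calls.append("wake_up")

    def sleep(self):
        # Real engine: release KV cache, return GPU memory for training.
        self.sleeping = True
        self.calls.append("sleep")

    def load_weights(self, version):
        # Real engine: receive updated weights via CUDA IPC.
        self.version = version
        self.calls.append(f"load_weights:{version}")

    def generate(self, prompts):
        assert not self.sleeping, "must wake_up() before generate()"
        self.calls.append("generate")
        return [f"response[{p}]@v{self.version}" for p in prompts]


def rollout_phase(engine, step, prompts):
    """One rollout phase: reclaim memory, sync weights, generate, then park."""
    engine.wake_up()
    engine.load_weights(version=step)
    outputs = engine.generate(prompts)
    engine.sleep()
    return outputs
```

Between calls to `rollout_phase`, the trainer runs its gradient step while the engine sleeps.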

Weight Synchronization

Weight transfer uses CUDA IPC (weight_sync.py → weight_updater.py) for zero-copy GPU-to-GPU transfer via cudaIpcGetMemHandle/cudaIpcOpenMemHandle. Weights are packed into a GPU buffer, and IPC handles are sent to ModelRunner subprocesses. On multi-GPU setups (DP>1), per-GPU buffers ensure same-device IPC.

Weights are accumulated into fixed-size buckets and flushed incrementally, keeping peak memory overhead bounded regardless of model size.
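The bucketed flush can be modeled in a few lines (a sketch of the accounting only; the bucket size, `flush` callback, and byte bookkeeping are assumptions, not ATOM's actual API):

```python
def flush_in_buckets(named_sizes, bucket_bytes, flush):
    """Accumulate (name, nbytes) weight entries and flush a bucket whenever
    the next entry would push it past bucket_bytes, so peak staging memory
    stays bounded regardless of total model size."""
    bucket, used = [], 0
    for name, nbytes in named_sizes:
        if bucket and used + nbytes > bucket_bytes:
            flush(bucket)
            bucket, used = [], 0
        bucket.append(name)
        used += nbytes
    if bucket:
        flush(bucket)  # final partial bucket
```

An entry larger than `bucket_bytes` still goes through as a bucket of one, which keeps the scheme total.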

Weight Update Pipeline

weight_updater.py handles the ModelRunner side of weight loading:

  • Maps incoming parameter names to ATOM's internal weight names (handling TP sharding, column/row parallel splits)
  • Supports packed weights (e.g., QKV fused) by slicing incoming tensors to correct offsets
  • Handles FP8 requantization — when the model uses FP8, incoming FP16/BF16 weights are quantized in-place with updated scales
  • Clears KV cache after weight update to prevent stale cache from previous weights
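The packed-weight slicing plus TP-sharding arithmetic can be illustrated as follows (a sketch: the `[Q; K; V]` row layout along dim 0 and contiguous per-rank shards are assumptions about the fused format, not ATOM's exact mapping code):

```python
def fused_qkv_shard(q_rows, kv_rows, which, tp_rank, tp_size):
    """Row range a given TP rank loads from an incoming fused [Q; K; V]
    tensor split along dim 0. q_rows/kv_rows are the total output rows for
    Q and for each of K and V; each component is sharded independently."""
    base = {"q": 0, "k": q_rows, "v": q_rows + kv_rows}[which]
    size = q_rows if which == "q" else kv_rows
    per_rank = size // tp_size
    start = base + tp_rank * per_rank
    return start, start + per_rank
```

For example, with `q_rows=8`, `kv_rows=4`, `tp_size=2`, rank 1 loads rows 10..12 of the fused tensor for K: the K block starts at row 8 and each rank owns 2 of its 4 rows.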

GPU Memory Lifecycle

memory_manager.py manages the sleep/wake cycle:

  • Sleep: deallocate KV cache blocks → torch.cuda.empty_cache() → memory returned to PyTorch/ROCm for trainer
  • Wake: empty_cache() → recalculate available blocks → reallocate KV cache → ready for inference
  • Each DP rank manages its own KV cache independently
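The block arithmetic behind the sleep/wake cycle looks roughly like this (a toy model with no GPU calls; the block size, utilization factor, and external free-bytes query are illustrative assumptions):

```python
class KVCacheLifecycle:
    """Toy model of the sleep/wake block accounting; no GPU calls."""

    def __init__(self, block_bytes):
        self.block_bytes = block_bytes
        self.num_blocks = 0
        self.sleeping = True

    def wake(self, free_bytes, utilization=0.9):
        # Recalculate how many KV blocks fit in the memory currently free,
        # mirroring the recompute-on-wake step: capacity can differ between
        # cycles because the trainer's residual allocations vary.
        self.num_blocks = int(free_bytes * utilization) // self.block_bytes
        self.sleeping = False
        return self.num_blocks

    def sleep(self):
        # Drop all blocks; the real path also calls torch.cuda.empty_cache()
        # so the caching allocator returns the memory to the device.
        self.num_blocks = 0
        self.sleeping = True
```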

DP Isolation

model_runner_ext.py (RLHFModelRunner) extends ATOM's ModelRunner for DP-isolated execution. Each DP rank's ModelRunners form an independent NCCL world scoped to TP only, with correct physical-to-logical device mapping and NCCL binding patches for ROCm multi-GPU setups.
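The physical-to-logical mapping for one DP rank's TP-only world can be sketched as below (assuming contiguous GPU assignment per DP rank, an illustration rather than the PR's exact binding logic):

```python
def tp_world_devices(dp_rank, tp_size):
    """Physical GPU ids owned by one DP rank's TP-only NCCL world. Logical
    (local) rank i inside that world maps to physical device
    dp_rank * tp_size + i, so no collective ever crosses DP boundaries."""
    return [dp_rank * tp_size + i for i in range(tp_size)]
```

Each DP rank then initializes its own NCCL communicator over just these devices instead of one world spanning all GPUs.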

Changes

New files (atom/rollout/)

File Purpose
__init__.py Package exports
async_engine.py AsyncLLMEngine wrapper (sleep/wake/load_weights API)
engine_utility.py Utility command handlers (update_weights, release/resume_memory)
memory_manager.py GPU memory lifecycle (KV cache alloc/release, weight discard/resume)
model_runner_ext.py RLHFModelRunner with DP isolation, NCCL device binding patch
weight_sync.py Weight transfer via CUDA IPC (per-GPU buffers)
weight_updater.py Weight update logic (packed weights, FP8 requantize, TP sharding)

Incremental changes to existing files

  • engine_core.py: utility queue, sleep mode, UTILITY_RESPONSE message type, DP sleep state sync
  • engine_core_mgr.py: utility_response_queue, broadcast_utility_command, broadcast_utility_command_sync
  • llm_engine.py: request_ids and logprobs support in add_request/generate/postprocess
  • async_proc.py: TP-rank barrier for safe weight update buffer reuse
  • scheduler.py: logprobs tracking in ScheduledBatch/ScheduledBatchOutput
  • sequence.py: request_id, return_logprobs, logprobs fields
  • sampling_params.py: logprobs parameter
  • config.py: runner_qualname, compilation_config dict→object conversion

@sijyang sijyang closed this Apr 13, 2026
@sijyang sijyang changed the title Sijyang/verl dev Add RLHF rollout integration support (verl) Apr 13, 2026
@sijyang sijyang reopened this Apr 13, 2026
@sijyang sijyang changed the title Add RLHF rollout integration support (verl) [feat]: Add RLHF rollout integration support (verl) Apr 13, 2026
@sijyang sijyang changed the title [feat]: Add RLHF rollout integration support (verl) [feat] Add RLHF rollout integration support (verl) Apr 13, 2026
shutdown = shutdown or self.pull_and_process_input_queue()
if shutdown:
break
if self._is_sleeping:

Why do we need such a sleep? I think it's not a good idea to sleep in our engine core.

# agree to skip model execution together — MoE expert routing and
# dummy_execution also contain DP-wide collectives that would hang
# if only some cores participated.
local_sleeping = self._is_sleeping

Same concern here.

# Recapture CUDA graphs after KV cache re-allocation (addresses changed)
self._recapture_cudagraphs_if_needed()

def _recapture_cudagraphs_if_needed(self) -> None:

Will this be triggered frequently? CUDA graph capture is time-consuming overhead, so it shouldn't be constantly recaptured.

return req_ids_out, processed_out, logprobs_map


class RLHFModelRunner(ModelRunner, WeightUpdaterMixin, MemoryManagerMixin):

Please find a better way to reuse the parent ModelRunner in ATOM.

logger = logging.getLogger("atom")


class LogprobsTokenIDProcessor(tokenIDProcessor):

ATOM's TokenIDProcessor should be enough; can we move logprobs into TokenIDProcessor and remove this class?
