[feat] Add RLHF rollout integration support (verl)#549
Open
Force-pushed from ea9dc91 to 10bab61 (commit messages truncated in the page view):
- …integration (TP+DP)
- … ModelRunner with DP isolation handling
- …ion parameters and comments across multiple files
```python
shutdown = shutdown or self.pull_and_process_input_queue()
if shutdown:
    break
if self._is_sleeping:
```
Contributor
Why do we need such a sleep? I don't think it's a good idea to sleep in our engine core.
```python
# agree to skip model execution together — MoE expert routing and
# dummy_execution also contain DP-wide collectives that would hang
# if only some cores participated.
local_sleeping = self._is_sleeping
```
```python
# Recapture CUDA graphs after KV cache re-allocation (addresses changed)
self._recapture_cudagraphs_if_needed()


def _recapture_cudagraphs_if_needed(self) -> None:
```
Contributor
Will this be triggered frequently? CUDA graph capture is time-consuming, so it shouldn't be constantly recaptured.
```python
return req_ids_out, processed_out, logprobs_map


class RLHFModelRunner(ModelRunner, WeightUpdaterMixin, MemoryManagerMixin):
```
Contributor
Find a better way to reuse the parent ModelRunner in atom.
```python
logger = logging.getLogger("atom")


class LogprobsTokenIDProcessor(tokenIDProcessor):
```
Contributor
Using atom's TokenIDProcessor should be enough; can we move logprobs into TokenIDProcessor and remove this class?
Overview
This PR enables ATOM to serve as a rollout backend for verl, a distributed RLHF training framework. In RLHF training, the system alternates between two phases: rollout (generating responses with the current policy) and training (updating the policy on the collected rollouts).
This requires ATOM to support a new lifecycle that traditional serving doesn't need: receiving weight updates from an external trainer, dynamically releasing and reclaiming GPU memory between phases, and coordinating these operations across multiple DP ranks and TP ranks.
The integration is designed as a plugin layer (atom/rollout/) that extends ATOM's existing engine without modifying its core inference path. All changes to existing files are purely incremental additions (new methods, new fields, new message types); no existing behavior is altered.

Architecture
verl trainer (PyTorch DDP)
│
ATOMHttpServer (verl side, per-node)
│ ZMQ RPC
AsyncLLMEngine (atom/rollout/async_engine.py)
├── sleep() → release KV cache, free GPU memory for training
├── wake_up() → reallocate KV cache, ready for generation
├── load_weights() → receive updated weights via CUDA IPC
└── generate() → standard ATOM inference with logprobs
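The call sequence above can be sketched as a minimal stub. This is not ATOM's implementation — only the four documented entry points (sleep, wake_up, load_weights, generate) are taken from the PR; the stub class, its fields, and rlhf_step are illustrative assumptions showing how a trainer would drive one iteration.

```python
# Illustrative stub mirroring the documented AsyncLLMEngine lifecycle.
# The real engine lives in atom/rollout/async_engine.py; everything here
# besides the four method names is a hypothetical stand-in.
class AsyncLLMEngineStub:
    def __init__(self):
        self.kv_cache_allocated = True
        self.calls = []

    def sleep(self):
        # Release KV cache so the trainer can use the GPU memory.
        self.kv_cache_allocated = False
        self.calls.append("sleep")

    def wake_up(self):
        # Reallocate KV cache before the next generation phase.
        self.kv_cache_allocated = True
        self.calls.append("wake_up")

    def load_weights(self, named_tensors):
        # In ATOM this receives updated weights via CUDA IPC handles.
        self.calls.append("load_weights")

    def generate(self, prompts):
        assert self.kv_cache_allocated, "must wake_up() before generate()"
        self.calls.append("generate")
        return [p + " <rollout>" for p in prompts]


def rlhf_step(engine, prompts, new_weights):
    """One RLHF iteration: free memory, train, resync weights, roll out."""
    engine.sleep()                    # free GPU memory for the trainer
    # ... trainer runs its optimizer steps here ...
    engine.wake_up()                  # KV cache reallocated
    engine.load_weights(new_weights)  # push updated policy weights
    return engine.generate(prompts)   # collect rollouts with new policy
```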
Weight Synchronization
Weight transfer uses CUDA IPC (weight_sync.py → weight_updater.py) for zero-copy GPU-to-GPU transfer via cudaIpcGetMemHandle/cudaIpcOpenMemHandle. Weights are packed into a GPU buffer, and IPC handles are sent to ModelRunner subprocesses. On multi-GPU setups (DP>1), per-GPU buffers ensure same-device IPC.

Weights are accumulated into fixed-size buckets and flushed incrementally, keeping peak memory overhead bounded regardless of model size.
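The bucketing scheme can be sketched as follows. This is a hedged illustration of the accumulate-and-flush pattern, not ATOM's actual code: the function name, the (name, nbytes) representation, and flush_fn are all assumptions; in the real pipeline a flush would copy tensors into the IPC GPU buffer and send the handle to the ModelRunner subprocesses.

```python
# Sketch of fixed-size bucketed weight flushing: append weights to a
# bucket and flush whenever the next tensor would overflow it, so peak
# staging memory stays bounded by bucket_bytes regardless of model size.
def bucketed_flush(weights, bucket_bytes, flush_fn):
    """weights: iterable of (name, nbytes); flush_fn receives one bucket."""
    bucket, used = [], 0
    for name, nbytes in weights:
        if used + nbytes > bucket_bytes and bucket:
            flush_fn(bucket)          # e.g. ship this bucket via CUDA IPC
            bucket, used = [], 0
        bucket.append(name)
        used += nbytes
    if bucket:
        flush_fn(bucket)              # flush the tail bucket
```

An oversized tensor still gets a bucket of its own, so the scheme degrades gracefully rather than failing on large individual weights.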
Weight Update Pipeline
weight_updater.py handles the ModelRunner side of weight loading.

GPU Memory Lifecycle
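On wake-up, the available KV cache block count has to be recomputed from whatever memory the trainer returned. The arithmetic below is an illustrative sketch under stated assumptions: on a real system free_bytes would come from torch.cuda.mem_get_info(), and the function name and utilization parameter are hypothetical, not ATOM's config keys.

```python
# Illustrative KV-block recalculation after empty_cache(): take the
# currently free device memory, reserve a utilization fraction of it,
# and divide by the per-block footprint.
def recalc_kv_blocks(free_bytes, block_bytes, gpu_memory_utilization=0.9):
    usable = int(free_bytes * gpu_memory_utilization)
    return usable // block_bytes
```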
memory_manager.py manages the sleep/wake cycle:
- sleep(): torch.cuda.empty_cache() → memory returned to PyTorch/ROCm for the trainer
- wake_up(): empty_cache() → recalculate available blocks → reallocate KV cache → ready for inference

DP Isolation
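Scoping each DP rank's NCCL world to its own TP ranks amounts to a simple rank/device mapping, sketched below. These helpers are illustrative assumptions, not ATOM's actual API; the real mapping (including the ROCm NCCL binding patches) lives in model_runner_ext.py.

```python
# Hypothetical sketch of TP-scoped rank grouping: with ranks laid out
# contiguously per DP replica, each DP rank's independent NCCL world is
# just its slice of tp_size consecutive global ranks.
def tp_group_ranks(dp_rank, tp_size):
    """Global ranks belonging to dp_rank's TP-only NCCL world."""
    base = dp_rank * tp_size
    return list(range(base, base + tp_size))


def local_device(global_rank, tp_size):
    """Physical-to-logical device index inside a TP group."""
    return global_rank % tp_size
```

With dp=2, tp=4 this puts ranks 0–3 and 4–7 in disjoint worlds, so a collective in one replica can never block on the other — which is what lets a single DP rank sleep or reload weights independently.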
model_runner_ext.py (RLHFModelRunner) extends ATOM's ModelRunner for DP-isolated execution. Each DP rank's ModelRunners form an independent NCCL world scoped to TP only, with correct physical-to-logical device mapping and NCCL binding patches for ROCm multi-GPU setups.

Changes
New files (atom/rollout/):
- __init__.py
- async_engine.py
- engine_utility.py
- memory_manager.py
- model_runner_ext.py
- weight_sync.py
- weight_updater.py

Incremental changes to existing files