-
Notifications
You must be signed in to change notification settings - Fork 34
[WS2][distributed] Add deterministic all-reduce contract and smoke tests #181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2bccb64
9a043fb
6b9672c
6403bfa
ee8c6c3
165cefa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| # Deterministic All-Reduce | ||
|
|
||
| RL-Kernel provides a small all-reduce helper for distributed smoke tests and | ||
| future WS2 integration work. It has two modes: | ||
|
|
||
| - `torch_all_reduce`: calls `torch.distributed.all_reduce`. | ||
| - `ordered_rank_reference`: gathers all rank tensors, accumulates them on process-group rank 0 in process-group rank order, then broadcasts the result. | ||
|
|
||
| The helper reduces the input tensor in place and returns it. | ||
|
|
||
| ## Contract | ||
|
|
||
| Results are expected to be stable only when the world size, process-group rank | ||
| order, inputs, dtype, operation, backend, and environment are unchanged. | ||
|
|
||
| `op="mean"` performs a sum and divides by world size at a fixed point. Integer | ||
| tensors are rejected for `mean`. | ||
|
|
||
| ## Ordered-Rank Reference | ||
|
|
||
| `ordered_rank_reference` is a reference path, not a high-performance transport. | ||
| It uses `all_gather` and `broadcast`, so the active backend must support those | ||
| collectives for the tensor device. The operation order is: | ||
|
|
||
| 1. make each rank input contiguous; | ||
| 2. gather tensors in process-group rank order; | ||
| 3. accumulate on process-group rank 0 in that order; | ||
| 4. optionally accumulate floating-point inputs in FP32; | ||
| 5. divide once for `op="mean"`; | ||
| 6. broadcast from process-group rank 0. | ||
|
|
||
| This mode is meant for small tensors in tests, debug runs, and reference | ||
| comparisons. | ||
|
|
||
| ## Torch All-Reduce | ||
|
|
||
| `torch_all_reduce` is a thin wrapper around `torch.distributed.all_reduce`. For | ||
| NCCL runs, callers may set best-effort ring settings before process-group | ||
| initialization: | ||
|
|
||
| ```python | ||
| from rl_engine.distributed import configure_deterministic_nccl_env | ||
|
|
||
| configure_deterministic_nccl_env(overwrite=True) | ||
| ``` | ||
|
|
||
| The helper writes: | ||
|
|
||
| ```bash | ||
| NCCL_ALGO=Ring | ||
| NCCL_PROTO=Simple | ||
| NCCL_MIN_NCHANNELS=1 | ||
| NCCL_MAX_NCHANNELS=1 | ||
| ``` | ||
|
|
||
| These settings do not prove bitwise determinism. Validate on the target machine | ||
| before making a hardware-specific claim. | ||
|
|
||
| ## Behavior | ||
|
|
||
| - `world_size == 1`: returns the input tensor unchanged. | ||
| - no initialized process group and `WORLD_SIZE <= 1`: returns the input tensor unchanged. | ||
| - no initialized process group and `WORLD_SIZE > 1`: raises `RuntimeError`. | ||
| - `async_op=True`: raises `NotImplementedError`. | ||
|
|
||
| ## Smoke Tests | ||
|
|
||
| Unit and CPU/Gloo smoke checks: | ||
|
|
||
| ```bash | ||
| PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 python -m pytest tests/distributed/test_deterministic_allreduce.py -q | ||
| ``` | ||
|
|
||
| Manual NCCL all-reduce smoke: | ||
|
|
||
| ```bash | ||
| CUDA_VISIBLE_DEVICES=0,1 \ | ||
| NCCL_ALGO=Ring \ | ||
| NCCL_PROTO=Simple \ | ||
| NCCL_MIN_NCHANNELS=1 \ | ||
| NCCL_MAX_NCHANNELS=1 \ | ||
| torchrun --standalone --nproc_per_node=2 \ | ||
| tests/distributed/test_deterministic_allreduce.py \ | ||
| --backend nccl --mode torch_all_reduce --dtype fp32 --device cuda | ||
| ``` | ||
|
|
||
| DP gradient smoke compares a fixed DP=1 full-batch gradient with DP=N local | ||
| gradients reduced by this helper: | ||
|
|
||
| ```bash | ||
| torchrun --standalone --nproc_per_node=2 \ | ||
| tests/distributed/test_dp_gradient_determinism.py \ | ||
| --backend gloo --mode ordered_rank_reference --dtype fp32 --device cpu | ||
| ``` | ||
|
|
||
| ## Limitations | ||
|
|
||
| - NVLS / NVLink-Sharp is not implemented or claimed here. | ||
| - Multi-node and RDMA behavior are not validated here. | ||
| - DeepSpeed gradient synchronization is not controlled by this helper yet. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| # Deterministic All-Reduce Audit | ||
|
|
||
| This audit records the distributed communication points relevant to | ||
| [RL-Align/RL-Kernel#112](https://github.com/RL-Align/RL-Kernel/issues/112). | ||
|
|
||
| ## Search | ||
|
|
||
| ```bash | ||
| rg -n "all_reduce|allreduce|reduce_scatter|all_gather|DistributedDataParallel|FSDP|deepspeed|gradient" \ | ||
| rl_engine csrc tests examples benchmarks scripts docs .github | ||
| rg -n "torch\.distributed|distributed|dist\.|process_group|ProcessGroup|nccl|NCCL|reduce|all_reduce|reduce_scatter|all_gather|gradient" \ | ||
| rl_engine csrc tests examples benchmarks scripts docs .github | ||
| ``` | ||
|
|
||
| ## Summary | ||
|
|
||
| No direct `torch.distributed` all-reduce, reduce-scatter, all-gather, DDP, or | ||
| FSDP call sites were found in RL-Kernel source code. The current DP-gradient | ||
| communication risk is indirect: `DeepSpeedTrainingWorker` delegates backward and | ||
| optimizer behavior to the optional DeepSpeed engine. | ||
|
|
||
| CUDA IPC uses of `torch.multiprocessing.reductions.reduce_tensor` are not | ||
| collective reductions. They serialize CUDA IPC handles for same-node weight | ||
| handoff. | ||
|
|
||
| ## Inventory | ||
|
|
||
| | Location | Kind | In scope for #112 | Handling | | ||
| | --- | --- | --- | --- | | ||
| | `rl_engine/executors/deepspeed_trainer.py` `DeepSpeedTrainingWorker.train` | Backward / optimizer delegation to DeepSpeed | Yes, indirectly | Do not claim control over DeepSpeed communication order until a tested integration point exists. | | ||
| | `rl_engine/executors/deepspeed_trainer.py` `deepspeed.initialize(...)` | Optional distributed runtime setup | Yes, indirectly | Keep missing-DeepSpeed behavior explicit. Any future integration must document the DeepSpeed hook used for gradient reduction. | | ||
| | `tests/test_deepspeed_training_worker.py` fake engine tests | Unit tests for worker delegation | Adjacent | These tests prove delegation only; they do not validate distributed gradient ordering. | | ||
| | `rl_engine/executors/bridge.py` CUDA IPC `reduce_tensor` use | CUDA IPC handle serialization | No | Keep out of all-reduce scope. | | ||
| | `rl_engine/executors/bridge.py` multi-node/RDMA/NCCL transport blockers | Unsupported weight transport guards | Adjacent | Preserve explicit blockers until a tested transport exists. | | ||
| | `rl_engine/utils/logger.py` `info_on_rank` | Rank-filtered logging | No | No numeric reduction behavior. | | ||
|
|
||
| ## Entry Point | ||
|
|
||
| New distributed code should route through `rl_engine.distributed` so the | ||
| all-reduce contract and fallback/reference behavior stay testable in one place. | ||
|
|
||
| ## Not Covered | ||
|
|
||
| - NVLS / NVLink-Sharp. | ||
| - Multi-node or RDMA collectives. | ||
| - DeepSpeed internal gradient synchronization order. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| # Copyright (c) 2026 RL-Kernel Contributors | ||
|
|
||
| from rl_engine.distributed.deterministic_allreduce import ( | ||
| DETERMINISTIC_NCCL_ENV, | ||
| DeterministicAllReduceConfig, | ||
| configure_deterministic_nccl_env, | ||
| deterministic_all_reduce, | ||
| ) | ||
|
|
||
| __all__ = [ | ||
| "DETERMINISTIC_NCCL_ENV", | ||
| "DeterministicAllReduceConfig", | ||
| "configure_deterministic_nccl_env", | ||
| "deterministic_all_reduce", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| # Copyright (c) 2026 RL-Kernel Contributors | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import os | ||
| import warnings | ||
| from dataclasses import dataclass | ||
| from typing import Literal, Optional | ||
|
|
||
| import torch | ||
| import torch.distributed as dist | ||
|
|
||
| DETERMINISTIC_NCCL_ENV = { | ||
| "NCCL_ALGO": "Ring", | ||
| "NCCL_PROTO": "Simple", | ||
| "NCCL_MIN_NCHANNELS": "1", | ||
| "NCCL_MAX_NCHANNELS": "1", | ||
| } | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class DeterministicAllReduceConfig: | ||
| """Options for :func:`deterministic_all_reduce`.""" | ||
|
|
||
| mode: Literal["torch_all_reduce", "ordered_rank_reference"] = "torch_all_reduce" | ||
| op: Literal["sum", "mean"] = "sum" | ||
| force_fp32_accumulation: bool = True | ||
| async_op: bool = False | ||
| group: Optional[dist.ProcessGroup] = None | ||
|
|
||
|
|
||
| def configure_deterministic_nccl_env(*, overwrite: bool = False) -> dict[str, Optional[str]]: | ||
| """Set best-effort NCCL ring settings before process-group init.""" | ||
|
|
||
| if dist.is_available() and dist.is_initialized(): | ||
| warnings.warn( | ||
| "NCCL environment was configured after torch.distributed initialization", | ||
| RuntimeWarning, | ||
| stacklevel=2, | ||
| ) | ||
|
|
||
| previous: dict[str, Optional[str]] = {} | ||
| for key, value in DETERMINISTIC_NCCL_ENV.items(): | ||
| previous[key] = os.environ.get(key) | ||
| if overwrite or key not in os.environ: | ||
| os.environ[key] = value | ||
| continue | ||
| if os.environ[key] != value: | ||
| warnings.warn( | ||
| f"{key} is {os.environ[key]!r}; expected {value!r}", | ||
| RuntimeWarning, | ||
| stacklevel=2, | ||
| ) | ||
| return previous | ||
|
|
||
|
|
||
| def deterministic_all_reduce( | ||
| tensor: torch.Tensor, | ||
| config: Optional[DeterministicAllReduceConfig] = None, | ||
| ) -> torch.Tensor: | ||
| """Reduce ``tensor`` in place and return it.""" | ||
|
|
||
| cfg = config or DeterministicAllReduceConfig() | ||
| _validate(tensor, cfg) | ||
|
|
||
| if cfg.async_op: | ||
| raise NotImplementedError("async deterministic all-reduce is not implemented") | ||
| if not dist.is_available(): | ||
| raise RuntimeError("torch.distributed is unavailable") | ||
| if not dist.is_initialized(): | ||
| if int(os.environ.get("WORLD_SIZE", "1")) > 1: | ||
| raise RuntimeError("torch.distributed is not initialized") | ||
| return tensor | ||
|
|
||
| world_size = dist.get_world_size(group=cfg.group) | ||
| if world_size == 1: | ||
| return tensor | ||
|
|
||
| if cfg.mode == "torch_all_reduce": | ||
| return _torch_all_reduce(tensor, cfg, world_size) | ||
| return _ordered_rank_reference(tensor, cfg, world_size) | ||
|
|
||
|
|
||
| def _validate(tensor: torch.Tensor, cfg: DeterministicAllReduceConfig) -> None: | ||
| if not isinstance(tensor, torch.Tensor): | ||
| raise TypeError(f"tensor must be a torch.Tensor, got {type(tensor)!r}") | ||
| if cfg.mode not in {"torch_all_reduce", "ordered_rank_reference"}: | ||
| raise ValueError(f"unsupported all-reduce mode: {cfg.mode!r}") | ||
| if cfg.op not in {"sum", "mean"}: | ||
| raise ValueError(f"unsupported reduction op: {cfg.op!r}") | ||
| if cfg.op == "mean" and not (tensor.is_floating_point() or tensor.is_complex()): | ||
| raise TypeError("op='mean' requires a floating-point or complex tensor") | ||
|
|
||
|
|
||
| def _torch_all_reduce( | ||
| tensor: torch.Tensor, | ||
| cfg: DeterministicAllReduceConfig, | ||
| world_size: int, | ||
| ) -> torch.Tensor: | ||
| dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=cfg.group, async_op=False) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. torch_all_reduce can silently give wrong results for non-contiguous tensors. I reproduced it with a 2-rank Gloo run on a base[:, ::2] view. Could we make it use a contiguous buffer and copy back, or reject non-contiguous inputs? A small smoke test would cover this.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for catching this. I added a contiguity check for |
||
| if cfg.op == "mean": | ||
| tensor.div_(world_size) | ||
| return tensor | ||
|
Comment on lines
+96
to
+104
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🎯 Functional Correctness | 🟡 Minor 🧩 Analysis chain🌐 Web query:
💡 Result: When using torch.distributed.all_reduce with the NCCL backend, the accumulation precision generally depends on the specific implementation and configuration rather than a blanket rule, though it frequently defaults to the input tensor's native data type (e.g., float16 or bfloat16) to ensure communication efficiency [1][2]. However, there are important nuances to this behavior: 1. Standard NCCL Accumulation: In standard NCCL collective operations, data is typically communicated and accumulated in its native dtype [1][2]. If you pass float16 tensors to all_reduce, the summation is performed using float16 precision [1]. 2. High-Precision Symmetric Accumulation: PyTorch supports a feature known as symmetric memory / symmetric kernels for certain NCCL collectives (like all_reduce). When this is active, NCCL may automatically perform internal reductions with higher precision (e.g., converting float16 or bfloat16 inputs to float32 for accumulation) before casting the result back to the original format [3]. This is designed to improve numerical accuracy without requiring user code changes [3]. 3. Specialized Accumulation for Low-Precision Types: For specific low-precision types supported by NCCL's device-level APIs (such as fp8), NCCL explicitly accumulates in a wider, more precise type (e.g., float16) during the reduction process [4][5]. 4. Algorithmic Impact: Beyond just the dtype, the order of summation—which is affected by the NCCL algorithm selected (e.g., Ring vs. Tree)—can also significantly impact numerical precision and determinism [6][7][2]. Ring-based algorithms, which are often used for peak bandwidth, may exhibit lower numerical precision compared to tree-based algorithms because they do not balance the operands as effectively [7][2]. If numerical stability is a primary concern for your application, it is generally recommended to ensure your tensors are in float32 before calling all_reduce, or to ensure that the environment is configured to utilize high-precision kernels if available [1][2]. Citations:
🏁 Script executed: # First, let's examine the full context of the file in question
wc -l rl_engine/distributed/deterministic_allreduce.pyRepository: RL-Align/RL-Kernel Length of output: 209 🏁 Script executed: # Search for force_fp32_accumulation usage in the file
rg "force_fp32_accumulation" rl_engine/distributed/deterministic_allreduce.py -A 2 -B 2Repository: RL-Align/RL-Kernel Length of output: 628 🏁 Script executed: # Look for the _ordered_rank_reference function to see how it uses force_fp32_accumulation
rg "_ordered_rank_reference" rl_engine/distributed/deterministic_allreduce.py -A 10 -B 2Repository: RL-Align/RL-Kernel Length of output: 1231 🏁 Script executed: # Check if .contiguous() is used in _torch_all_reduce or elsewhere
rg "contiguous" rl_engine/distributed/deterministic_allreduce.py -A 2 -B 2Repository: RL-Align/RL-Kernel Length of output: 356 🏁 Script executed: # Get the full implementation to understand the context better
cat -n rl_engine/distributed/deterministic_allreduce.py | head -130 | tail -50Repository: RL-Align/RL-Kernel Length of output: 2369 🏁 Script executed: # Find and examine the _accumulation_dtype function
rg "_accumulation_dtype" rl_engine/distributed/deterministic_allreduce.py -A 5 -B 2Repository: RL-Align/RL-Kernel Length of output: 762 🏁 Script executed: # Check the DeterministicAllReduceConfig dataclass definition completely
rg "class DeterministicAllReduceConfig" rl_engine/distributed/deterministic_allreduce.py -A 15Repository: RL-Align/RL-Kernel Length of output: 819
The config defaults Additionally, 🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| def _ordered_rank_reference( | ||
| tensor: torch.Tensor, | ||
| cfg: DeterministicAllReduceConfig, | ||
| world_size: int, | ||
| ) -> torch.Tensor: | ||
| send = tensor.detach().contiguous() | ||
| gathered = [torch.empty_like(send) for _ in range(world_size)] | ||
| dist.all_gather(gathered, send, group=cfg.group) | ||
|
|
||
| result = torch.empty_like(send) | ||
| if dist.get_rank(group=cfg.group) == 0: | ||
| dtype = _accumulation_dtype(send, cfg.force_fp32_accumulation) | ||
| reduced = gathered[0].to(dtype=dtype) | ||
| for item in gathered[1:]: | ||
| reduced.add_(item.to(dtype=dtype)) | ||
| if cfg.op == "mean": | ||
| reduced.div_(world_size) | ||
| result.copy_(reduced.to(dtype=send.dtype)) | ||
|
|
||
| dist.broadcast(result, src=_group_root_global_rank(cfg.group), group=cfg.group) | ||
| tensor.copy_(result.view_as(tensor)) | ||
| return tensor | ||
|
|
||
|
|
||
| def _group_root_global_rank(group: Optional[dist.ProcessGroup]) -> int: | ||
| if group is None: | ||
| return 0 | ||
| try: | ||
| return int(dist.get_global_rank(group, 0)) | ||
| except AttributeError as exc: | ||
| raise RuntimeError( | ||
| "custom process groups require torch.distributed.get_global_rank" | ||
| ) from exc | ||
|
|
||
|
|
||
| def _accumulation_dtype(tensor: torch.Tensor, force_fp32: bool) -> torch.dtype: | ||
| if not force_fp32 or not tensor.is_floating_point(): | ||
| return tensor.dtype | ||
| if tensor.dtype == torch.float64: | ||
| return torch.float64 | ||
| return torch.float32 | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🩺 Stability & Availability | 🟡 Minor
🧩 Analysis chain
🌐 Web query:
Does torch.distributed.all_reduce with the NCCL backend require contiguous tensors, and what happens with a non-contiguous input?💡 Result:
Yes, torch.distributed.all_reduce using the NCCL backend requires input tensors to be contiguous [1]. In the PyTorch C++ implementation for the NCCL backend, explicit checks are performed to ensure that all input and output tensors are contiguous [1]. If a non-contiguous tensor is passed to a collective operation that requires contiguity, these checks typically raise a runtime error (e.g., a TORCH_CHECK failure) to prevent undefined behavior or data corruption [1]. The requirement for contiguity exists because NCCL expects data buffers to be stored in a single, continuous block of memory to perform efficient, high-performance collective communications across devices [2]. When a tensor is non-contiguous (e.g., due to slicing or permuting), its memory layout does not match the flat memory representation that NCCL requires [2]. While some higher-level PyTorch collective functions may implicitly handle non-contiguous tensors by performing a clone/copy into a contiguous buffer before passing them to the backend [3], core distributed primitives often mandate that the user provide contiguous tensors [2][1]. Users are generally expected to ensure their tensors are contiguous, often by calling.contiguous on the tensor before passing it to an all_reduce operation [2]. Passing non-contiguous tensors to operations that do not explicitly perform this check can lead to incorrect computational results or internal NCCL errors [4][5].
Citations:
ncclInternalError: Internal check failedwhen usingirecvwith non-contiguous tensor in NCCL backend NVIDIA/nccl#1655🏁 Script executed:
Repository: RL-Align/RL-Kernel
Length of output: 209
🏁 Script executed:
Repository: RL-Align/RL-Kernel
Length of output: 6068
🏁 Script executed:
rg -n "deterministic_all_reduce" --type py -A 3 -B 1Repository: RL-Align/RL-Kernel
Length of output: 156
🏁 Script executed:
rg "deterministic_all_reduce" -nRepository: RL-Align/RL-Kernel
Length of output: 156
🏁 Script executed:
Repository: RL-Align/RL-Kernel
Length of output: 361
🏁 Script executed:
Repository: RL-Align/RL-Kernel
Length of output: 696
🏁 Script executed:
rg "deterministic_all_reduce" --all-types -B 2 -A 3Repository: RL-Align/RL-Kernel
Length of output: 570
🏁 Script executed:
Repository: RL-Align/RL-Kernel
Length of output: 347
🏁 Script executed:
Repository: RL-Align/RL-Kernel
Length of output: 260
🏁 Script executed:
Repository: RL-Align/RL-Kernel
Length of output: 12024
🏁 Script executed:
Repository: RL-Align/RL-Kernel
Length of output: 13969
Fix contiguity handling in
_torch_all_reduceto match_ordered_rank_reference._ordered_rank_referencedefensively callstensor.detach().contiguous()before collective operations (line 112), but_torch_all_reducepassestensordirectly todist.all_reduceat line 101. The NCCL backend requires contiguous tensors and will raise a runtime error if a non-contiguous tensor (e.g., from gradient operations or slicing) is passed. Add.contiguous()handling to_torch_all_reduceto ensure consistent and robust behavior across both code paths.🤖 Prompt for AI Agents