[WS2][distributed] Add deterministic all-reduce contract and smoke tests#181
[WS2][distributed] Add deterministic all-reduce contract and smoke tests#181CyberSecurityErial wants to merge 6 commits into
Conversation
📝 WalkthroughWalkthroughIntroduces the ChangesDeterministic All-Reduce Package
Sequence Diagram(s)sequenceDiagram
actor Caller
participant deterministic_all_reduce
participant dist as torch.distributed
participant rank0 as Rank 0 accumulator
Caller->>deterministic_all_reduce: tensor, DeterministicAllReduceConfig
alt mode = torch_all_reduce
deterministic_all_reduce->>dist: all_reduce(tensor, op=SUM)
dist-->>deterministic_all_reduce: reduced tensor
deterministic_all_reduce->>deterministic_all_reduce: tensor /= world_size (if mean)
else mode = ordered_rank_reference
deterministic_all_reduce->>dist: all_gather(tensor_list, tensor)
dist-->>deterministic_all_reduce: list of per-rank tensors
deterministic_all_reduce->>rank0: accumulate in rank order (FP32 if forced)
rank0->>rank0: /= world_size (if mean)
rank0->>dist: broadcast(result, src=global_root)
dist-->>deterministic_all_reduce: broadcast result on all ranks
deterministic_all_reduce->>deterministic_all_reduce: tensor.copy_(result)
end
deterministic_all_reduce-->>Caller: tensor (in-place)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Reviews may only be requested from collaborators. The workflow uses a hardcoded reviewer list, and at least one listed user is not a collaborator of RL-Align/RL-Kernel. This happens before any code/test validation from this PR. I noticed #177 removes .github/workflows/auto-reviewer.yml, which should address this repository-level issue. Could maintainers ignore/rerun this check after the workflow fix, or mark it non-blocking for this PR? |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (2)
docs/distributed/deterministic_allreduce.md (2)
19-33: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low valueDocument the optional
force_fp32_accumulationflag for reproducibility.The algorithm description mentions "optionally accumulate floating-point inputs in FP32" (line 28), but does not explain the corresponding
force_fp32_accumulationconfiguration option. Users seeking bitwise reproducibility across different hardware or precision contexts may want to know when and why to enable this flag. Consider adding a brief note or cross-reference to the config documentation.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/distributed/deterministic_allreduce.md` around lines 19 - 33, The Ordered-Rank Reference section describes an optional FP32 accumulation step but does not document the corresponding `force_fp32_accumulation` configuration flag. Add a brief explanation or note in the Ordered-Rank Reference section that clarifies what the `force_fp32_accumulation` flag does, when users should enable it (specifically for achieving bitwise reproducibility across different hardware or precision contexts), and optionally cross-reference the configuration documentation if it exists elsewhere. This should be inserted after the step list or in a separate paragraph to help users understand this reproducibility option.
37-57: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low valueClarify when to use
configure_deterministic_nccl_env()vs. manual env var setting.Lines 37–45 recommend calling
configure_deterministic_nccl_env(overwrite=True)in user code, while the smoke-test example at lines 74–85 manually sets the same env vars before runningtorchrun. A clarifying note explaining that these are two alternative approaches (programmatic in-process initialization vs. shell environment setup) would help users understand which to use in their context.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/distributed/deterministic_allreduce.md` around lines 37 - 57, The documentation shows the configure_deterministic_nccl_env() approach for programmatic NCCL setup but does not clarify how it relates to the manual environment variable setting approach shown in the smoke-test example later in the document. Add a clarifying note in the section discussing configure_deterministic_nccl_env() that explicitly states these are two alternative approaches: one using programmatic in-process initialization with the function call, and one using shell environment setup before running torchrun. Include guidance on which approach to use based on user context, such as when to choose programmatic setup versus manual env var configuration.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rl_engine/distributed/deterministic_allreduce.py`:
- Around line 96-104: The _torch_all_reduce function has two issues: it ignores
the cfg.force_fp32_accumulation configuration flag (which defaults to True) and
does not call .contiguous() on the tensor before passing it to dist.all_reduce.
To fix this, follow the pattern used in _ordered_rank_reference by (1) checking
cfg.force_fp32_accumulation and if True, upcasting the tensor to float32 before
the dist.all_reduce call, then downcasting it back to its original dtype after
the operation, and (2) ensure the tensor is contiguous by calling .contiguous()
on it before passing to dist.all_reduce, matching the behavior in
_ordered_rank_reference.
- Line 101: The `_torch_all_reduce` function at line 101 passes a tensor
directly to `dist.all_reduce` without ensuring it is contiguous, while the
`_ordered_rank_reference` function at line 112 properly calls
`tensor.detach().contiguous()` before collective operations. To maintain
consistency and avoid runtime errors from the NCCL backend (which requires
contiguous tensors), modify the tensor argument in `_torch_all_reduce` to call
`.contiguous()` on it before passing to the `dist.all_reduce` call, matching the
defensive approach used in `_ordered_rank_reference`.
In `@tests/distributed/test_deterministic_allreduce.py`:
- Around line 26-42: Create a pytest fixture that snapshots the complete
os.environ state before each test executes and restores it fully after the test
completes. Apply this fixture to both
test_configure_deterministic_nccl_env_preserves_existing_value and
test_configure_deterministic_nccl_env_can_overwrite functions to prevent
environment variable mutations from leaking across tests and causing
order-dependent or environment-dependent test failures.
---
Nitpick comments:
In `@docs/distributed/deterministic_allreduce.md`:
- Around line 19-33: The Ordered-Rank Reference section describes an optional
FP32 accumulation step but does not document the corresponding
`force_fp32_accumulation` configuration flag. Add a brief explanation or note in
the Ordered-Rank Reference section that clarifies what the
`force_fp32_accumulation` flag does, when users should enable it (specifically
for achieving bitwise reproducibility across different hardware or precision
contexts), and optionally cross-reference the configuration documentation if it
exists elsewhere. This should be inserted after the step list or in a separate
paragraph to help users understand this reproducibility option.
- Around line 37-57: The documentation shows the
configure_deterministic_nccl_env() approach for programmatic NCCL setup but does
not clarify how it relates to the manual environment variable setting approach
shown in the smoke-test example later in the document. Add a clarifying note in
the section discussing configure_deterministic_nccl_env() that explicitly states
these are two alternative approaches: one using programmatic in-process
initialization with the function call, and one using shell environment setup
before running torchrun. Include guidance on which approach to use based on user
context, such as when to choose programmatic setup versus manual env var
configuration.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1e17992c-707e-431d-9a23-72a83d09394b
📒 Files selected for processing (7)
docs/.nav.ymldocs/distributed/deterministic_allreduce.mddocs/distributed/deterministic_allreduce_audit.mdrl_engine/distributed/__init__.pyrl_engine/distributed/deterministic_allreduce.pytests/distributed/test_deterministic_allreduce.pytests/distributed/test_dp_gradient_determinism.py
| def _torch_all_reduce( | ||
| tensor: torch.Tensor, | ||
| cfg: DeterministicAllReduceConfig, | ||
| world_size: int, | ||
| ) -> torch.Tensor: | ||
| dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=cfg.group, async_op=False) | ||
| if cfg.op == "mean": | ||
| tensor.div_(world_size) | ||
| return tensor |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟡 Minor
🧩 Analysis chain
🌐 Web query:
Does torch.distributed.all_reduce with NCCL accumulate in the input tensor's native dtype (e.g. float16) or in float32?
💡 Result:
When using torch.distributed.all_reduce with the NCCL backend, the accumulation precision generally depends on the specific implementation and configuration rather than a blanket rule, though it frequently defaults to the input tensor's native data type (e.g., float16 or bfloat16) to ensure communication efficiency [1][2]. However, there are important nuances to this behavior: 1. Standard NCCL Accumulation: In standard NCCL collective operations, data is typically communicated and accumulated in its native dtype [1][2]. If you pass float16 tensors to all_reduce, the summation is performed using float16 precision [1]. 2. High-Precision Symmetric Accumulation: PyTorch supports a feature known as symmetric memory / symmetric kernels for certain NCCL collectives (like all_reduce). When this is active, NCCL may automatically perform internal reductions with higher precision (e.g., converting float16 or bfloat16 inputs to float32 for accumulation) before casting the result back to the original format [3]. This is designed to improve numerical accuracy without requiring user code changes [3]. 3. Specialized Accumulation for Low-Precision Types: For specific low-precision types supported by NCCL's device-level APIs (such as fp8), NCCL explicitly accumulates in a wider, more precise type (e.g., float16) during the reduction process [4][5]. 4. Algorithmic Impact: Beyond just the dtype, the order of summation—which is affected by the NCCL algorithm selected (e.g., Ring vs. Tree)—can also significantly impact numerical precision and determinism [6][7][2]. Ring-based algorithms, which are often used for peak bandwidth, may exhibit lower numerical precision compared to tree-based algorithms because they do not balance the operands as effectively [7][2]. If numerical stability is a primary concern for your application, it is generally recommended to ensure your tensors are in float32 before calling all_reduce, or to ensure that the environment is configured to utilize high-precision kernels if available [1][2].
Citations:
- 1: https://discuss.pytorch.org/t/does-nccl-allreduce-use-fp16/141461
- 2: half precision reduction accumulation in fp32? NVIDIA/nccl#1026
- 3: https://docs.pytorch.org/docs/2.11/_sources/distributed.md.txt
- 4: https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/device_reducecopy.html
- 5: https://docs.nvidia.com/deeplearning/nccl/archives/nccl_2303/user-guide/docs/api/device_reducecopy.html
- 6: https://github.com/sgl-project/sglang/blob/efee62ef/sgl-kernel/tests/test_amd_nccl_allreduce_determinism.py
- 7: [Bug] NCCL all_reduce failed with A800 when NCCL_ALGO uses Ring NVIDIA/nccl#1055
🏁 Script executed:
# First, let's examine the full context of the file in question
wc -l rl_engine/distributed/deterministic_allreduce.pyRepository: RL-Align/RL-Kernel
Length of output: 209
🏁 Script executed:
# Search for force_fp32_accumulation usage in the file
rg "force_fp32_accumulation" rl_engine/distributed/deterministic_allreduce.py -A 2 -B 2Repository: RL-Align/RL-Kernel
Length of output: 628
🏁 Script executed:
# Look for the _ordered_rank_reference function to see how it uses force_fp32_accumulation
rg "_ordered_rank_reference" rl_engine/distributed/deterministic_allreduce.py -A 10 -B 2Repository: RL-Align/RL-Kernel
Length of output: 1231
🏁 Script executed:
# Check if .contiguous() is used in _torch_all_reduce or elsewhere
rg "contiguous" rl_engine/distributed/deterministic_allreduce.py -A 2 -B 2Repository: RL-Align/RL-Kernel
Length of output: 356
🏁 Script executed:
# Get the full implementation to understand the context better
cat -n rl_engine/distributed/deterministic_allreduce.py | head -130 | tail -50Repository: RL-Align/RL-Kernel
Length of output: 2369
🏁 Script executed:
# Find and examine the _accumulation_dtype function
rg "_accumulation_dtype" rl_engine/distributed/deterministic_allreduce.py -A 5 -B 2Repository: RL-Align/RL-Kernel
Length of output: 762
🏁 Script executed:
# Check the DeterministicAllReduceConfig dataclass definition completely
rg "class DeterministicAllReduceConfig" rl_engine/distributed/deterministic_allreduce.py -A 15Repository: RL-Align/RL-Kernel
Length of output: 819
force_fp32_accumulation is silently ignored in _torch_all_reduce mode.
The config defaults force_fp32_accumulation=True, and _ordered_rank_reference honors it (line 118), but _torch_all_reduce never reads it. Since NCCL's default behavior accumulates in the tensor's native dtype (float16/bfloat16), the fp32-accumulation guarantee implied by the config is not provided in this mode. Either honor the flag (e.g., upcast → all_reduce → downcast) or emit a warning.
Additionally, _torch_all_reduce passes the tensor directly to dist.all_reduce without calling .contiguous(), whereas _ordered_rank_reference explicitly calls it (line 112). NCCL requires contiguous buffers; this inconsistency may cause issues.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rl_engine/distributed/deterministic_allreduce.py` around lines 96 - 104, The
_torch_all_reduce function has two issues: it ignores the
cfg.force_fp32_accumulation configuration flag (which defaults to True) and does
not call .contiguous() on the tensor before passing it to dist.all_reduce. To
fix this, follow the pattern used in _ordered_rank_reference by (1) checking
cfg.force_fp32_accumulation and if True, upcasting the tensor to float32 before
the dist.all_reduce call, then downcasting it back to its original dtype after
the operation, and (2) ensure the tensor is contiguous by calling .contiguous()
on it before passing to dist.all_reduce, matching the behavior in
_ordered_rank_reference.
| cfg: DeterministicAllReduceConfig, | ||
| world_size: int, | ||
| ) -> torch.Tensor: | ||
| dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=cfg.group, async_op=False) |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟡 Minor
🧩 Analysis chain
🌐 Web query:
Does torch.distributed.all_reduce with the NCCL backend require contiguous tensors, and what happens with a non-contiguous input?
💡 Result:
Yes, torch.distributed.all_reduce using the NCCL backend requires input tensors to be contiguous [1]. In the PyTorch C++ implementation for the NCCL backend, explicit checks are performed to ensure that all input and output tensors are contiguous [1]. If a non-contiguous tensor is passed to a collective operation that requires contiguity, these checks typically raise a runtime error (e.g., a TORCH_CHECK failure) to prevent undefined behavior or data corruption [1]. The requirement for contiguity exists because NCCL expects data buffers to be stored in a single, continuous block of memory to perform efficient, high-performance collective communications across devices [2]. When a tensor is non-contiguous (e.g., due to slicing or permuting), its memory layout does not match the flat memory representation that NCCL requires [2]. While some higher-level PyTorch collective functions may implicitly handle non-contiguous tensors by performing a clone/copy into a contiguous buffer before passing them to the backend [3], core distributed primitives often mandate that the user provide contiguous tensors [2][1]. Users are generally expected to ensure their tensors are contiguous, often by calling.contiguous on the tensor before passing it to an all_reduce operation [2]. Passing non-contiguous tensors to operations that do not explicitly perform this check can lead to incorrect computational results or internal NCCL errors [4][5].
Citations:
- 1: https://github.com/pytorch/pytorch/blob/main/torch/csrc/cuda/nccl.cpp
- 2: [RFC] Communication primitives for non-contiguous tensors pytorch/pytorch#177427
- 3: Functional Collectives shouldn't assert contiguous inputs pytorch/pytorch#177902
- 4: Distributed 'gather' with the NCCL backend returns wrong results on noncontiguous tensors. pytorch/pytorch#159548
- 5:
ncclInternalError: Internal check failedwhen usingirecvwith non-contiguous tensor in NCCL backend NVIDIA/nccl#1655
🏁 Script executed:
wc -l rl_engine/distributed/deterministic_allreduce.pyRepository: RL-Align/RL-Kernel
Length of output: 209
🏁 Script executed:
cat -n rl_engine/distributed/deterministic_allreduce.pyRepository: RL-Align/RL-Kernel
Length of output: 6068
🏁 Script executed:
rg -n "deterministic_all_reduce" --type py -A 3 -B 1Repository: RL-Align/RL-Kernel
Length of output: 156
🏁 Script executed:
rg "deterministic_all_reduce" -nRepository: RL-Align/RL-Kernel
Length of output: 156
🏁 Script executed:
fd -type f -name "*.py" | xargs rg "from.*deterministic_allreduce|import.*deterministic_allreduce" -lRepository: RL-Align/RL-Kernel
Length of output: 361
🏁 Script executed:
cat -n rl_engine/distributed/__init__.pyRepository: RL-Align/RL-Kernel
Length of output: 696
🏁 Script executed:
rg "deterministic_all_reduce" --all-types -B 2 -A 3Repository: RL-Align/RL-Kernel
Length of output: 570
🏁 Script executed:
find . -type f -name "*.py" -exec grep -l "deterministic_all_reduce" {} \;Repository: RL-Align/RL-Kernel
Length of output: 347
🏁 Script executed:
find . -type f -name "*.py" -exec grep -l "from rl_engine.distributed import\|import rl_engine.distributed" {} \;Repository: RL-Align/RL-Kernel
Length of output: 260
🏁 Script executed:
cat -n ./tests/distributed/test_deterministic_allreduce.pyRepository: RL-Align/RL-Kernel
Length of output: 12024
🏁 Script executed:
cat -n ./tests/distributed/test_dp_gradient_determinism.pyRepository: RL-Align/RL-Kernel
Length of output: 13969
Fix contiguity handling in _torch_all_reduce to match _ordered_rank_reference.
_ordered_rank_reference defensively calls tensor.detach().contiguous() before collective operations (line 112), but _torch_all_reduce passes tensor directly to dist.all_reduce at line 101. The NCCL backend requires contiguous tensors and will raise a runtime error if a non-contiguous tensor (e.g., from gradient operations or slicing) is passed. Add .contiguous() handling to _torch_all_reduce to ensure consistent and robust behavior across both code paths.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rl_engine/distributed/deterministic_allreduce.py` at line 101, The
`_torch_all_reduce` function at line 101 passes a tensor directly to
`dist.all_reduce` without ensuring it is contiguous, while the
`_ordered_rank_reference` function at line 112 properly calls
`tensor.detach().contiguous()` before collective operations. To maintain
consistency and avoid runtime errors from the NCCL backend (which requires
contiguous tensors), modify the tensor argument in `_torch_all_reduce` to call
`.contiguous()` on it before passing to the `dist.all_reduce` call, matching the
defensive approach used in `_ordered_rank_reference`.
| def test_configure_deterministic_nccl_env_preserves_existing_value(monkeypatch): | ||
| monkeypatch.setenv("NCCL_ALGO", "Tree") | ||
| with pytest.warns(RuntimeWarning, match="NCCL_ALGO"): | ||
| previous = configure_deterministic_nccl_env() | ||
|
|
||
| assert previous["NCCL_ALGO"] == "Tree" | ||
| assert os.environ["NCCL_ALGO"] == "Tree" | ||
| for key, value in DETERMINISTIC_NCCL_ENV.items(): | ||
| if key != "NCCL_ALGO": | ||
| assert os.environ[key] == value | ||
|
|
||
|
|
||
| def test_configure_deterministic_nccl_env_can_overwrite(monkeypatch): | ||
| monkeypatch.setenv("NCCL_ALGO", "Tree") | ||
| configure_deterministic_nccl_env(overwrite=True) | ||
|
|
||
| assert os.environ["NCCL_ALGO"] == "Ring" |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
Env mutations leak across tests and make the preserve-test environment-dependent.
configure_deterministic_nccl_env writes the DETERMINISTIC_NCCL_ENV keys directly into os.environ, but monkeypatch only tracks/restores NCCL_ALGO. Consequently:
- The remaining keys (e.g.
NCCL_PROTO, etc.) persist into later tests and into the subprocess smoke tests (which copyos.environ), causing order-dependent state. - Line 33-35 asserts every non-
NCCL_ALGOkey equals its default. If any of those vars is already set to a different value in the runtime (common on real GPU/CI nodes),configure_deterministic_nccl_envpreserves the preset value, and this assertion fails — making the test environment-dependent rather than hermetic.
Snapshot/restore the full environment for these two tests.
🧹 Suggested fixture to isolate env state
`@pytest.fixture`
def _isolated_env():
snapshot = dict(os.environ)
try:
yield
finally:
os.environ.clear()
os.environ.update(snapshot)
def test_configure_deterministic_nccl_env_preserves_existing_value(monkeypatch, _isolated_env):
...
def test_configure_deterministic_nccl_env_can_overwrite(monkeypatch, _isolated_env):
...🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/distributed/test_deterministic_allreduce.py` around lines 26 - 42,
Create a pytest fixture that snapshots the complete os.environ state before each
test executes and restores it fully after the test completes. Apply this fixture
to both test_configure_deterministic_nccl_env_preserves_existing_value and
test_configure_deterministic_nccl_env_can_overwrite functions to prevent
environment variable mutations from leaking across tests and causing
order-dependent or environment-dependent test failures.
| cfg: DeterministicAllReduceConfig, | ||
| world_size: int, | ||
| ) -> torch.Tensor: | ||
| dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=cfg.group, async_op=False) |
There was a problem hiding this comment.
torch_all_reduce can silently give wrong results for non-contiguous tensors. I reproduced it with a 2-rank Gloo run on a base[:, ::2] view. Could we make it use a contiguous buffer and copy back, or reject non-contiguous inputs? A small smoke test would cover this.
There was a problem hiding this comment.
Thanks for catching this. I added a contiguity check for torch_all_reduce and a 2-rank Gloo smoke test covering a base[:, ::2] view in a new pr.
Summary
Part of #112.
This PR adds the first deterministic all-reduce slice for WS2:
rl_engine.distributedall-reduce helper;API
deterministic_all_reduce(tensor, config)reduces the input tensor in place.Modes:
torch_all_reduce: usestorch.distributed.all_reduce.ordered_rank_reference: gathers rank-local tensors, accumulates them on process-group rank 0 in process-group rank order, then broadcasts the result.The NCCL environment helper is best-effort and must run before process-group initialization. It does not claim NVLS or hardware-wide bitwise determinism.
Validation
Results:
Manual CUDA/NCCL smoke on the local 2-rank CUDA 12.8 environment:
Results:
Limitations
Summary by CodeRabbit