Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/.nav.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ nav:
- General:
- contributing/*
- contributing/operator-doc-template.md
- Distributed:
- distributed/*
- Design Documents:
- design/*
- Benchmarking:
Expand Down
100 changes: 100 additions & 0 deletions docs/distributed/deterministic_allreduce.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Deterministic All-Reduce

RL-Kernel provides a small all-reduce helper for distributed smoke tests and
future WS2 integration work. It has two modes:

- `torch_all_reduce`: calls `torch.distributed.all_reduce`.
- `ordered_rank_reference`: gathers all rank tensors, accumulates them on process-group rank 0 in process-group rank order, then broadcasts the result.

The helper reduces the input tensor in place and returns it.

## Contract

Results are expected to be stable only when the world size, process-group rank
order, inputs, dtype, operation, backend, and environment are unchanged.

`op="mean"` performs a sum and divides by world size at a fixed point. Integer
tensors are rejected for `mean`.

## Ordered-Rank Reference

`ordered_rank_reference` is a reference path, not a high-performance transport.
It uses `all_gather` and `broadcast`, so the active backend must support those
collectives for the tensor device. The operation order is:

1. make each rank input contiguous;
2. gather tensors in process-group rank order;
3. accumulate on process-group rank 0 in that order;
4. optionally accumulate floating-point inputs in FP32;
5. divide once for `op="mean"`;
6. broadcast from process-group rank 0.

This mode is meant for small tensors in tests, debug runs, and reference
comparisons.

## Torch All-Reduce

`torch_all_reduce` is a thin wrapper around `torch.distributed.all_reduce`. For
NCCL runs, callers may set best-effort ring settings before process-group
initialization:

```python
from rl_engine.distributed import configure_deterministic_nccl_env

configure_deterministic_nccl_env(overwrite=True)
```

The helper writes:

```bash
NCCL_ALGO=Ring
NCCL_PROTO=Simple
NCCL_MIN_NCHANNELS=1
NCCL_MAX_NCHANNELS=1
```

These settings do not prove bitwise determinism. Validate on the target machine
before making a hardware-specific claim.

## Behavior

- `world_size == 1`: returns the input tensor unchanged.
- no initialized process group and `WORLD_SIZE <= 1`: returns the input tensor unchanged.
- no initialized process group and `WORLD_SIZE > 1`: raises `RuntimeError`.
- `async_op=True`: raises `NotImplementedError`.

## Smoke Tests

Unit and CPU/Gloo smoke checks:

```bash
PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 python -m pytest tests/distributed/test_deterministic_allreduce.py -q
```

Manual NCCL all-reduce smoke:

```bash
CUDA_VISIBLE_DEVICES=0,1 \
NCCL_ALGO=Ring \
NCCL_PROTO=Simple \
NCCL_MIN_NCHANNELS=1 \
NCCL_MAX_NCHANNELS=1 \
torchrun --standalone --nproc_per_node=2 \
tests/distributed/test_deterministic_allreduce.py \
--backend nccl --mode torch_all_reduce --dtype fp32 --device cuda
```

DP gradient smoke compares a fixed DP=1 full-batch gradient with DP=N local
gradients reduced by this helper:

```bash
torchrun --standalone --nproc_per_node=2 \
tests/distributed/test_dp_gradient_determinism.py \
--backend gloo --mode ordered_rank_reference --dtype fp32 --device cpu
```

## Limitations

- NVLS / NVLink-Sharp is not implemented or claimed here.
- Multi-node and RDMA behavior are not validated here.
- DeepSpeed gradient synchronization is not controlled by this helper yet.
46 changes: 46 additions & 0 deletions docs/distributed/deterministic_allreduce_audit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Deterministic All-Reduce Audit

This audit records the distributed communication points relevant to
[RL-Align/RL-Kernel#112](https://github.com/RL-Align/RL-Kernel/issues/112).

## Search

```bash
rg -n "all_reduce|allreduce|reduce_scatter|all_gather|DistributedDataParallel|FSDP|deepspeed|gradient" \
rl_engine csrc tests examples benchmarks scripts docs .github
rg -n "torch\.distributed|distributed|dist\.|process_group|ProcessGroup|nccl|NCCL|reduce|all_reduce|reduce_scatter|all_gather|gradient" \
rl_engine csrc tests examples benchmarks scripts docs .github
```

## Summary

No direct `torch.distributed` all-reduce, reduce-scatter, all-gather, DDP, or
FSDP call sites were found in RL-Kernel source code. The current DP-gradient
communication risk is indirect: `DeepSpeedTrainingWorker` delegates backward and
optimizer behavior to the optional DeepSpeed engine.

CUDA IPC uses of `torch.multiprocessing.reductions.reduce_tensor` are not
collective reductions. They serialize CUDA IPC handles for same-node weight
handoff.

## Inventory

| Location | Kind | In scope for #112 | Handling |
| --- | --- | --- | --- |
| `rl_engine/executors/deepspeed_trainer.py` `DeepSpeedTrainingWorker.train` | Backward / optimizer delegation to DeepSpeed | Yes, indirectly | Do not claim control over DeepSpeed communication order until a tested integration point exists. |
| `rl_engine/executors/deepspeed_trainer.py` `deepspeed.initialize(...)` | Optional distributed runtime setup | Yes, indirectly | Keep missing-DeepSpeed behavior explicit. Any future integration must document the DeepSpeed hook used for gradient reduction. |
| `tests/test_deepspeed_training_worker.py` fake engine tests | Unit tests for worker delegation | Adjacent | These tests prove delegation only; they do not validate distributed gradient ordering. |
| `rl_engine/executors/bridge.py` CUDA IPC `reduce_tensor` use | CUDA IPC handle serialization | No | Keep out of all-reduce scope. |
| `rl_engine/executors/bridge.py` multi-node/RDMA/NCCL transport blockers | Unsupported weight transport guards | Adjacent | Preserve explicit blockers until a tested transport exists. |
| `rl_engine/utils/logger.py` `info_on_rank` | Rank-filtered logging | No | No numeric reduction behavior. |

## Entry Point

New distributed code should route through `rl_engine.distributed` so the
all-reduce contract and fallback/reference behavior stay testable in one place.

## Not Covered

- NVLS / NVLink-Sharp.
- Multi-node or RDMA collectives.
- DeepSpeed internal gradient synchronization order.
16 changes: 16 additions & 0 deletions rl_engine/distributed/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2026 RL-Kernel Contributors

from rl_engine.distributed.deterministic_allreduce import (
DETERMINISTIC_NCCL_ENV,
DeterministicAllReduceConfig,
configure_deterministic_nccl_env,
deterministic_all_reduce,
)

__all__ = [
"DETERMINISTIC_NCCL_ENV",
"DeterministicAllReduceConfig",
"configure_deterministic_nccl_env",
"deterministic_all_reduce",
]
147 changes: 147 additions & 0 deletions rl_engine/distributed/deterministic_allreduce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2026 RL-Kernel Contributors

from __future__ import annotations

import os
import warnings
from dataclasses import dataclass
from typing import Literal, Optional

import torch
import torch.distributed as dist

DETERMINISTIC_NCCL_ENV = {
"NCCL_ALGO": "Ring",
"NCCL_PROTO": "Simple",
"NCCL_MIN_NCHANNELS": "1",
"NCCL_MAX_NCHANNELS": "1",
}


@dataclass(frozen=True)
class DeterministicAllReduceConfig:
"""Options for :func:`deterministic_all_reduce`."""

mode: Literal["torch_all_reduce", "ordered_rank_reference"] = "torch_all_reduce"
op: Literal["sum", "mean"] = "sum"
force_fp32_accumulation: bool = True
async_op: bool = False
group: Optional[dist.ProcessGroup] = None


def configure_deterministic_nccl_env(*, overwrite: bool = False) -> dict[str, Optional[str]]:
"""Set best-effort NCCL ring settings before process-group init."""

if dist.is_available() and dist.is_initialized():
warnings.warn(
"NCCL environment was configured after torch.distributed initialization",
RuntimeWarning,
stacklevel=2,
)

previous: dict[str, Optional[str]] = {}
for key, value in DETERMINISTIC_NCCL_ENV.items():
previous[key] = os.environ.get(key)
if overwrite or key not in os.environ:
os.environ[key] = value
continue
if os.environ[key] != value:
warnings.warn(
f"{key} is {os.environ[key]!r}; expected {value!r}",
RuntimeWarning,
stacklevel=2,
)
return previous


def deterministic_all_reduce(
tensor: torch.Tensor,
config: Optional[DeterministicAllReduceConfig] = None,
) -> torch.Tensor:
"""Reduce ``tensor`` in place and return it."""

cfg = config or DeterministicAllReduceConfig()
_validate(tensor, cfg)

if cfg.async_op:
raise NotImplementedError("async deterministic all-reduce is not implemented")
if not dist.is_available():
raise RuntimeError("torch.distributed is unavailable")
if not dist.is_initialized():
if int(os.environ.get("WORLD_SIZE", "1")) > 1:
raise RuntimeError("torch.distributed is not initialized")
return tensor

world_size = dist.get_world_size(group=cfg.group)
if world_size == 1:
return tensor

if cfg.mode == "torch_all_reduce":
return _torch_all_reduce(tensor, cfg, world_size)
return _ordered_rank_reference(tensor, cfg, world_size)


def _validate(tensor: torch.Tensor, cfg: DeterministicAllReduceConfig) -> None:
if not isinstance(tensor, torch.Tensor):
raise TypeError(f"tensor must be a torch.Tensor, got {type(tensor)!r}")
if cfg.mode not in {"torch_all_reduce", "ordered_rank_reference"}:
raise ValueError(f"unsupported all-reduce mode: {cfg.mode!r}")
if cfg.op not in {"sum", "mean"}:
raise ValueError(f"unsupported reduction op: {cfg.op!r}")
if cfg.op == "mean" and not (tensor.is_floating_point() or tensor.is_complex()):
raise TypeError("op='mean' requires a floating-point or complex tensor")


def _torch_all_reduce(
tensor: torch.Tensor,
cfg: DeterministicAllReduceConfig,
world_size: int,
) -> torch.Tensor:
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=cfg.group, async_op=False)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

Does torch.distributed.all_reduce with the NCCL backend require contiguous tensors, and what happens with a non-contiguous input?

💡 Result:

Yes, torch.distributed.all_reduce using the NCCL backend requires input tensors to be contiguous [1]. In the PyTorch C++ implementation for the NCCL backend, explicit checks are performed to ensure that all input and output tensors are contiguous [1]. If a non-contiguous tensor is passed to a collective operation that requires contiguity, these checks typically raise a runtime error (e.g., a TORCH_CHECK failure) to prevent undefined behavior or data corruption [1]. The requirement for contiguity exists because NCCL expects data buffers to be stored in a single, continuous block of memory to perform efficient, high-performance collective communications across devices [2]. When a tensor is non-contiguous (e.g., due to slicing or permuting), its memory layout does not match the flat memory representation that NCCL requires [2]. While some higher-level PyTorch collective functions may implicitly handle non-contiguous tensors by performing a clone/copy into a contiguous buffer before passing them to the backend [3], core distributed primitives often mandate that the user provide contiguous tensors [2][1]. Users are generally expected to ensure their tensors are contiguous, often by calling.contiguous on the tensor before passing it to an all_reduce operation [2]. Passing non-contiguous tensors to operations that do not explicitly perform this check can lead to incorrect computational results or internal NCCL errors [4][5].

Citations:


🏁 Script executed:

wc -l rl_engine/distributed/deterministic_allreduce.py

Repository: RL-Align/RL-Kernel

Length of output: 209


🏁 Script executed:

cat -n rl_engine/distributed/deterministic_allreduce.py

Repository: RL-Align/RL-Kernel

Length of output: 6068


🏁 Script executed:

rg -n "deterministic_all_reduce" --type py -A 3 -B 1

Repository: RL-Align/RL-Kernel

Length of output: 156


🏁 Script executed:

rg "deterministic_all_reduce" -n

Repository: RL-Align/RL-Kernel

Length of output: 156


🏁 Script executed:

fd -type f -name "*.py" | xargs rg "from.*deterministic_allreduce|import.*deterministic_allreduce" -l

Repository: RL-Align/RL-Kernel

Length of output: 361


🏁 Script executed:

cat -n rl_engine/distributed/__init__.py

Repository: RL-Align/RL-Kernel

Length of output: 696


🏁 Script executed:

rg "deterministic_all_reduce" --all-types -B 2 -A 3

Repository: RL-Align/RL-Kernel

Length of output: 570


🏁 Script executed:

find . -type f -name "*.py" -exec grep -l "deterministic_all_reduce" {} \;

Repository: RL-Align/RL-Kernel

Length of output: 347


🏁 Script executed:

find . -type f -name "*.py" -exec grep -l "from rl_engine.distributed import\|import rl_engine.distributed" {} \;

Repository: RL-Align/RL-Kernel

Length of output: 260


🏁 Script executed:

cat -n ./tests/distributed/test_deterministic_allreduce.py

Repository: RL-Align/RL-Kernel

Length of output: 12024


🏁 Script executed:

cat -n ./tests/distributed/test_dp_gradient_determinism.py

Repository: RL-Align/RL-Kernel

Length of output: 13969


Fix contiguity handling in _torch_all_reduce to match _ordered_rank_reference.

_ordered_rank_reference defensively calls tensor.detach().contiguous() before collective operations (line 112), but _torch_all_reduce passes tensor directly to dist.all_reduce at line 101. The NCCL backend requires contiguous tensors and will raise a runtime error if a non-contiguous tensor (e.g., from gradient operations or slicing) is passed. Add .contiguous() handling to _torch_all_reduce to ensure consistent and robust behavior across both code paths.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rl_engine/distributed/deterministic_allreduce.py` at line 101, The
`_torch_all_reduce` function at line 101 passes a tensor directly to
`dist.all_reduce` without ensuring it is contiguous, while the
`_ordered_rank_reference` function at line 112 properly calls
`tensor.detach().contiguous()` before collective operations. To maintain
consistency and avoid runtime errors from the NCCL backend (which requires
contiguous tensors), modify the tensor argument in `_torch_all_reduce` to call
`.contiguous()` on it before passing to the `dist.all_reduce` call, matching the
defensive approach used in `_ordered_rank_reference`.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

torch_all_reduce can silently give wrong results for non-contiguous tensors. I reproduced it with a 2-rank Gloo run on a base[:, ::2] view. Could we make it use a contiguous buffer and copy back, or reject non-contiguous inputs? A small smoke test would cover this.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this. I added a contiguity check for torch_all_reduce and a 2-rank Gloo smoke test covering a base[:, ::2] view in a new pr.

if cfg.op == "mean":
tensor.div_(world_size)
return tensor
Comment on lines +96 to +104

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

Does torch.distributed.all_reduce with NCCL accumulate in the input tensor's native dtype (e.g. float16) or in float32?

💡 Result:

When using torch.distributed.all_reduce with the NCCL backend, the accumulation precision generally depends on the specific implementation and configuration rather than a blanket rule, though it frequently defaults to the input tensor's native data type (e.g., float16 or bfloat16) to ensure communication efficiency [1][2]. However, there are important nuances to this behavior: 1. Standard NCCL Accumulation: In standard NCCL collective operations, data is typically communicated and accumulated in its native dtype [1][2]. If you pass float16 tensors to all_reduce, the summation is performed using float16 precision [1]. 2. High-Precision Symmetric Accumulation: PyTorch supports a feature known as symmetric memory / symmetric kernels for certain NCCL collectives (like all_reduce). When this is active, NCCL may automatically perform internal reductions with higher precision (e.g., converting float16 or bfloat16 inputs to float32 for accumulation) before casting the result back to the original format [3]. This is designed to improve numerical accuracy without requiring user code changes [3]. 3. Specialized Accumulation for Low-Precision Types: For specific low-precision types supported by NCCL's device-level APIs (such as fp8), NCCL explicitly accumulates in a wider, more precise type (e.g., float16) during the reduction process [4][5]. 4. Algorithmic Impact: Beyond just the dtype, the order of summation—which is affected by the NCCL algorithm selected (e.g., Ring vs. Tree)—can also significantly impact numerical precision and determinism [6][7][2]. Ring-based algorithms, which are often used for peak bandwidth, may exhibit lower numerical precision compared to tree-based algorithms because they do not balance the operands as effectively [7][2]. If numerical stability is a primary concern for your application, it is generally recommended to ensure your tensors are in float32 before calling all_reduce, or to ensure that the environment is configured to utilize high-precision kernels if available [1][2].

Citations:


🏁 Script executed:

# First, let's examine the full context of the file in question
wc -l rl_engine/distributed/deterministic_allreduce.py

Repository: RL-Align/RL-Kernel

Length of output: 209


🏁 Script executed:

# Search for force_fp32_accumulation usage in the file
rg "force_fp32_accumulation" rl_engine/distributed/deterministic_allreduce.py -A 2 -B 2

Repository: RL-Align/RL-Kernel

Length of output: 628


🏁 Script executed:

# Look for the _ordered_rank_reference function to see how it uses force_fp32_accumulation
rg "_ordered_rank_reference" rl_engine/distributed/deterministic_allreduce.py -A 10 -B 2

Repository: RL-Align/RL-Kernel

Length of output: 1231


🏁 Script executed:

# Check if .contiguous() is used in _torch_all_reduce or elsewhere
rg "contiguous" rl_engine/distributed/deterministic_allreduce.py -A 2 -B 2

Repository: RL-Align/RL-Kernel

Length of output: 356


🏁 Script executed:

# Get the full implementation to understand the context better
cat -n rl_engine/distributed/deterministic_allreduce.py | head -130 | tail -50

Repository: RL-Align/RL-Kernel

Length of output: 2369


🏁 Script executed:

# Find and examine the _accumulation_dtype function
rg "_accumulation_dtype" rl_engine/distributed/deterministic_allreduce.py -A 5 -B 2

Repository: RL-Align/RL-Kernel

Length of output: 762


🏁 Script executed:

# Check the DeterministicAllReduceConfig dataclass definition completely
rg "class DeterministicAllReduceConfig" rl_engine/distributed/deterministic_allreduce.py -A 15

Repository: RL-Align/RL-Kernel

Length of output: 819


force_fp32_accumulation is silently ignored in _torch_all_reduce mode.

The config defaults force_fp32_accumulation=True, and _ordered_rank_reference honors it (line 118), but _torch_all_reduce never reads it. Since NCCL's default behavior accumulates in the tensor's native dtype (float16/bfloat16), the fp32-accumulation guarantee implied by the config is not provided in this mode. Either honor the flag (e.g., upcast → all_reduce → downcast) or emit a warning.

Additionally, _torch_all_reduce passes the tensor directly to dist.all_reduce without calling .contiguous(), whereas _ordered_rank_reference explicitly calls it (line 112). NCCL requires contiguous buffers; this inconsistency may cause issues.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rl_engine/distributed/deterministic_allreduce.py` around lines 96 - 104, The
_torch_all_reduce function has two issues: it ignores the
cfg.force_fp32_accumulation configuration flag (which defaults to True) and does
not call .contiguous() on the tensor before passing it to dist.all_reduce. To
fix this, follow the pattern used in _ordered_rank_reference by (1) checking
cfg.force_fp32_accumulation and if True, upcasting the tensor to float32 before
the dist.all_reduce call, then downcasting it back to its original dtype after
the operation, and (2) ensure the tensor is contiguous by calling .contiguous()
on it before passing to dist.all_reduce, matching the behavior in
_ordered_rank_reference.



def _ordered_rank_reference(
tensor: torch.Tensor,
cfg: DeterministicAllReduceConfig,
world_size: int,
) -> torch.Tensor:
send = tensor.detach().contiguous()
gathered = [torch.empty_like(send) for _ in range(world_size)]
dist.all_gather(gathered, send, group=cfg.group)

result = torch.empty_like(send)
if dist.get_rank(group=cfg.group) == 0:
dtype = _accumulation_dtype(send, cfg.force_fp32_accumulation)
reduced = gathered[0].to(dtype=dtype)
for item in gathered[1:]:
reduced.add_(item.to(dtype=dtype))
if cfg.op == "mean":
reduced.div_(world_size)
result.copy_(reduced.to(dtype=send.dtype))

dist.broadcast(result, src=_group_root_global_rank(cfg.group), group=cfg.group)
tensor.copy_(result.view_as(tensor))
return tensor


def _group_root_global_rank(group: Optional[dist.ProcessGroup]) -> int:
if group is None:
return 0
try:
return int(dist.get_global_rank(group, 0))
except AttributeError as exc:
raise RuntimeError(
"custom process groups require torch.distributed.get_global_rank"
) from exc


def _accumulation_dtype(tensor: torch.Tensor, force_fp32: bool) -> torch.dtype:
if not force_fp32 or not tensor.is_floating_point():
return tensor.dtype
if tensor.dtype == torch.float64:
return torch.float64
return torch.float32
Loading
Loading