From 2bccb64790cccf67a1d6d76c359170744177771e Mon Sep 17 00:00:00 2001 From: CyberSecurityErial <2710555967@qq.com> Date: Mon, 22 Jun 2026 23:50:18 +0800 Subject: [PATCH 1/5] docs(distributed): audit all-reduce call sites for issue 112 --- docs/.nav.yml | 2 + .../deterministic_allreduce_audit.md | 117 ++++++++++++++++++ overnight_report_issue112.md | 89 +++++++++++++ 3 files changed, 208 insertions(+) create mode 100644 docs/distributed/deterministic_allreduce_audit.md create mode 100644 overnight_report_issue112.md diff --git a/docs/.nav.yml b/docs/.nav.yml index 6ba2e50..a2316c9 100644 --- a/docs/.nav.yml +++ b/docs/.nav.yml @@ -19,6 +19,8 @@ nav: - General: - contributing/* - contributing/operator-doc-template.md + - Distributed: + - distributed/* - Design Documents: - design/* - Benchmarking: diff --git a/docs/distributed/deterministic_allreduce_audit.md b/docs/distributed/deterministic_allreduce_audit.md new file mode 100644 index 0000000..c1fdd1f --- /dev/null +++ b/docs/distributed/deterministic_allreduce_audit.md @@ -0,0 +1,117 @@ +# Deterministic All-Reduce Audit + +This document audits the current RL-Kernel repository for issue +[RL-Align/RL-Kernel#112](https://github.com/RL-Align/RL-Kernel/issues/112): +deterministic NCCL all-reduce, including DP gradient all-reduce. + +Issue #112 is part of the WS2 train-inference consistency roadmap in +[RL-Align/RL-Kernel#83](https://github.com/RL-Align/RL-Kernel/issues/83). The +current repository does not expose a direct deterministic all-reduce API yet, so +the first implementation should add one as the intended entry point rather than +claiming that existing distributed paths are already deterministic. + +## Audit Commands + +The initial audit used these repository-local searches: + +```bash +rg -n "all_reduce|allreduce|reduce_scatter|all_gather|DistributedDataParallel|FSDP|deepspeed|gradient" \ + rl_engine csrc tests examples benchmarks scripts docs .github +rg -n "torch\.distributed|distributed|dist\.|process_group|ProcessGroup|nccl|NCCL|reduce|all_reduce|reduce_scatter|all_gather|gradient" \ + rl_engine csrc tests examples benchmarks scripts docs .github +rg --files rl_engine csrc tests examples benchmarks scripts docs .github +``` + +## Summary + +No direct `torch.distributed` collective call sites were found in the audited +paths. In particular, no direct calls to `all_reduce`, `reduce_scatter`, +`all_gather`, `DistributedDataParallel`, or `FSDP` were found. + +The only training-side path that can imply DP gradient synchronization today is +the optional DeepSpeed worker. That synchronization, when present, is owned by +the DeepSpeed engine called from RL-Kernel, not by an RL-Kernel collective helper. + +The repository also contains CUDA IPC uses of PyTorch's `reduce_tensor`, but +those calls serialize CUDA IPC handles for same-node weight handoff. They are +not collective reductions and do not implement NCCL all-reduce. + +## Call Site Inventory + +| Location | Kind | Train-inference consistency impact | In scope for #112 | Proposed deterministic handling | +| --- | --- | --- | --- | --- | +| `rl_engine/executors/deepspeed_trainer.py` `DeepSpeedTrainingWorker.train` calls `self.engine.backward(loss)` and `self.engine.step()` | Backward / optimizer path; possible DP gradient synchronization inside DeepSpeed | Yes, when the DeepSpeed runtime is configured for DP > 1 | Yes, as the current DP-gradient-adjacent training path | Add an RL-Kernel deterministic all-reduce helper first. For DeepSpeed, do not claim ordering control until a tested integration point exists. A focused DP gradient smoke test should compare DP=1 gradients against DP=N reduced gradients under a fixed global batch and fixed seed. | +| `rl_engine/executors/deepspeed_trainer.py` `deepspeed.initialize(...)` with `zero_optimization` and `gradient_accumulation_steps` config | Optional distributed training runtime setup | Indirect | Yes, because DeepSpeed may perform gradient communication after initialization | Document that DeepSpeed communication order is not currently controlled by RL-Kernel. Keep fallback behavior explicit when DeepSpeed is missing. | +| `tests/test_deepspeed_training_worker.py` fake DeepSpeed engine tests | Unit tests for the DeepSpeed worker contract | No direct collective behavior | Adjacent only | Existing tests prove the worker delegates to `engine.backward` and `engine.step`, but they do not validate distributed gradient ordering. Add new distributed smoke tests separately instead of extending these fake-engine tests into false coverage. | +| `rl_engine/executors/bridge.py` `VLLMIPCWeightUpdateRequestBuilder._resolve_reduce_tensor` and `IPCWeightBridge._resolve_reduce_tensor` | CUDA IPC handle serialization via `torch.multiprocessing.reductions.reduce_tensor` | No collective reduction; affects same-node weight handoff only | No | Keep out of deterministic all-reduce scope. Do not rename or classify this as all-reduce. | +| `rl_engine/executors/bridge.py` `WeightLayout.validate_supported` and `make_weight_bridge` reject multi-node/RDMA/NCCL transports | Explicit unsupported transport guards | Prevents unsupported distributed weight transport from silently succeeding | Adjacent only | Preserve the explicit blockers. If a future NCCL/RDMA transport is added, it needs its own deterministic contract and validation. | +| `docs/usage/weight-sync-bridge.md` mentions vLLM IPC or NCCL public update APIs and RDMA/NCCL as not implemented | User-facing documentation | Adjacent; weight sync rather than gradient all-reduce | Adjacent only | Keep the documentation clear that current weight bridge transports are same-node or local fallbacks. Do not cite this as NCCL all-reduce validation. | +| `rl_engine/utils/logger.py` `info_on_rank` uses `device_ctx.rank` | Rank-filtered logging utility | No numeric impact | No | No deterministic collective handling needed. | + +## Direct Collectives + +None found. + +Because the repository currently has no direct collective entry point, the next +implementation should introduce a small, explicit API for deterministic +all-reduce. That API can become the reference path for future WS2 distributed +ops and the test oracle for DP gradient synchronization experiments. + +## Determinism Contract To Implement + +A deterministic all-reduce mode should define stability only under explicit +conditions: + +- same world size; +- same global rank order; +- same input tensors; +- same dtype; +- same backend mode; +- same environment and process-group configuration. + +For a slow reference path, the concrete operation order should be: + +1. gather tensors to rank 0 in ascending global rank order; +2. accumulate on rank 0 in rank order `0, 1, ..., world_size - 1`; +3. use FP32 accumulation when configured; +4. apply `sum` or `mean` at a fixed point; +5. cast back to the original dtype when needed; +6. broadcast the final tensor from rank 0 to all ranks. + +The NCCL fast path should be opt-in and should not promise cross-version or +cross-hardware bitwise determinism unless the validation proves it. A helper may +set these environment variables before process-group initialization: + +```bash +NCCL_ALGO=Ring +NCCL_PROTO=Simple +NCCL_MIN_NCHANNELS=1 +NCCL_MAX_NCHANNELS=1 +``` + +## Unsupported Or Not Yet Covered + +- NVLink-Sharp / NVLS deterministic reduce is not implemented or validated in + the repository. +- Multi-node/RDMA weight transport is explicitly unsupported by the current + weight bridge. +- DeepSpeed DP gradient communication order is not currently controlled by + RL-Kernel. +- The current audit environment has CUDA driver visibility and an isolated + PyTorch/NCCL venv under `.codex-nightly/envs/issue112-py312`, but distributed + GPU validation has not been run yet. +- No claim is made for multi-node NCCL, RDMA, FSDP, DDP, or DeepSpeed internals. + +## Next Reviewable Step + +Add `rl_engine.distributed.deterministic_allreduce` with two modes: + +- `ordered_rank_fallback`: explicit gather, rank-ordered accumulation on rank 0, + and broadcast; +- `nccl_ring`: ordinary `torch.distributed.all_reduce` after the caller has + opted into deterministic NCCL environment configuration before process-group + initialization. + +The first tests should be small distributed smoke tests that can run with +`torchrun` on CPU/Gloo for fallback behavior and on CUDA/NCCL when the runtime is +available. diff --git a/overnight_report_issue112.md b/overnight_report_issue112.md new file mode 100644 index 0000000..9c23795 --- /dev/null +++ b/overnight_report_issue112.md @@ -0,0 +1,89 @@ +# Overnight Report - RL-Kernel Issue #112 + +## Current branch + +`cse/issue-112-deterministic-nccl` + +## Environment + +- Host: `dedicated-developjob-wtl-t1wjo-7c6d5f4d56-qzkfm` +- GPU: 8x `NVIDIA L20X` reported by `nvidia-smi` +- CUDA: driver reports CUDA 12.8, driver version 570.172.08 +- PyTorch: `torch 2.11.0+cu128` in `.codex-nightly/envs/issue112-py312` +- NCCL: `torch.cuda.nccl.version()` reports `(2, 28, 9)` +- Python: system `python3` is 3.12.3; isolated venv is `.codex-nightly/envs/issue112-py312`; `python` is not on PATH +- Commit base: `a302be4593bc1715688558a7eb3d3704bf625c4d` + +## Work completed + +- Re-checked issue #112 and roadmap issue #83. +- Created local working branch `cse/issue-112-deterministic-nccl`. +- Audited repository call sites for all-reduce, reduce-scatter, all-gather, + DDP/FSDP, DeepSpeed, NCCL, and gradient synchronization keywords. +- Added a distributed audit document. +- Created an isolated workspace-local Python environment under `.codex-nightly/envs/issue112-py312`. +- Installed PyTorch CUDA 12.8 and RL-Kernel dev/docs dependencies into that isolated environment. + +## Commits created + +- `docs(distributed): audit all-reduce call sites for issue 112` + +## Files changed + +- `docs/.nav.yml` +- `docs/distributed/deterministic_allreduce_audit.md` +- `overnight_report_issue112.md` + +## Tests run + +- `git diff --check` +- `.codex-nightly/envs/issue112-py312/bin/python -c "import torch; ..."` +- `.codex-nightly/envs/issue112-py312/bin/mkdocs build --strict -f mkdocs.yaml` + +## Test results + +- `git diff --check` passed. +- PyTorch environment check passed: CUDA available, 8 devices visible, NCCL available. +- `mkdocs build --strict -f mkdocs.yaml` passed. It emitted expected git revision-date warnings for the new uncommitted audit doc. + +## GPU validation results + +No distributed GPU validation has been run yet. `nvidia-smi` is available and reports 8 GPUs. The isolated venv can import PyTorch with CUDA and NCCL support. + +## PR split recommendation + +- PR 1: audit and deterministic all-reduce contract documentation. +- PR 2: deterministic all-reduce helper with ordered rank fallback and smoke + tests. +- PR 3: NCCL ring fast path and GPU validation report, after PyTorch/NCCL runtime + is available. + +## PR body files created + +None yet. + +## Blockers + +- `python` is not available on PATH. +- System `python3 -m venv` cannot create venvs because `ensurepip` is unavailable; used workspace-local `virtualenv` bootstrap instead. +- Current GPU model reported by `nvidia-smi` is `NVIDIA L20X`, not the H200 model + assumed by the original overnight manual. + +## Unsafe operations skipped + +- No push attempted. +- No system dependency installation attempted; all Python dependencies were installed under `.codex-nightly/`. +- No upstream branch or PR creation attempted. + +## Remaining work + +- Commit the audit phase. +- Add deterministic all-reduce helper API and focused distributed tests. +- Use `.codex-nightly/envs/issue112-py312` for GPU/NCCL validation. +- Prepare PR body drafts and patch artifacts. + +## Suggested next Codex prompt + +Continue issue #112 from `overnight_report_issue112.md`. Prioritize the next +reviewable phase: add `rl_engine.distributed.deterministic_allreduce` with an +ordered-rank fallback and focused smoke tests, then update this report. From 9a043fb18a6370c1f788031622afdea7183efa3d Mon Sep 17 00:00:00 2001 From: CyberSecurityErial <2710555967@qq.com> Date: Tue, 23 Jun 2026 00:12:34 +0800 Subject: [PATCH 2/5] feat(distributed): add deterministic all-reduce helper --- docs/distributed/deterministic_allreduce.md | 124 ++++++++++++ overnight_report_issue112.md | 82 ++++++-- rl_engine/distributed/__init__.py | 16 ++ .../distributed/deterministic_allreduce.py | 164 ++++++++++++++++ .../test_deterministic_allreduce.py | 184 ++++++++++++++++++ 5 files changed, 553 insertions(+), 17 deletions(-) create mode 100644 docs/distributed/deterministic_allreduce.md create mode 100644 rl_engine/distributed/__init__.py create mode 100644 rl_engine/distributed/deterministic_allreduce.py create mode 100644 tests/distributed/test_deterministic_allreduce.py diff --git a/docs/distributed/deterministic_allreduce.md b/docs/distributed/deterministic_allreduce.md new file mode 100644 index 0000000..a025872 --- /dev/null +++ b/docs/distributed/deterministic_allreduce.md @@ -0,0 +1,124 @@ +# Deterministic All-Reduce + +RL-Kernel exposes a small deterministic all-reduce helper for WS2 distributed +operator work. The helper is intentionally conservative: it defines an explicit +contract, provides a slow ordered-rank reference path, and keeps the NCCL fast +path opt-in. + +## Contract + +`deterministic_all_reduce(tensor, config)` reduces `tensor` in place and returns +the same tensor object. The result is expected to be stable when all of these +inputs are the same: + +- world size; +- global rank order; +- input tensor values; +- dtype; +- reduction op; +- helper mode; +- process-group and backend environment. + +For `op="mean"`, RL-Kernel performs a sum and divides by `world_size` at a +fixed point. Integer tensors are not accepted for `mean`. + +## Modes + +### `ordered_rank_fallback` + +This is the slow reference mode. Each rank contributes a tensor through +`torch.distributed.all_gather`. Rank 0 then accumulates the gathered tensors in +ascending global rank order, casts the result back to the original dtype, and +broadcasts it to every rank. + +The operation order is: + +1. make each rank input contiguous; +2. gather rank inputs in global-rank order; +3. on rank 0, accumulate rank `0, 1, ..., world_size - 1`; +4. use FP32 accumulation for floating-point tensors when configured; +5. divide once for `op="mean"`; +6. broadcast the final tensor from rank 0. + +This path is memory-heavy because every rank receives the gathered input list. It +is intended for small validation tensors, unsupported-hardware fallbacks, and +test or debug oracles. + +### `nccl_ring` + +This mode uses `torch.distributed.all_reduce` and optionally the NCCL environment +helper below. It is a fast path, not a blanket promise of bitwise determinism +across all NCCL versions and hardware. Validate it on the target machine before +claiming support. + +Call `configure_deterministic_nccl_env()` before +`torch.distributed.init_process_group` when using NCCL: + +```python +from rl_engine.distributed import configure_deterministic_nccl_env + +configure_deterministic_nccl_env(overwrite=True) +``` + +The helper sets these variables when they are unset, or overwrites them when +`overwrite=True`: + +```bash +NCCL_ALGO=Ring +NCCL_PROTO=Simple +NCCL_MIN_NCHANNELS=1 +NCCL_MAX_NCHANNELS=1 +``` + +If the process group is already initialized, the helper warns because NCCL may +have already consumed its collective configuration. + +## Fallback Behavior + +- If `torch.distributed` is unavailable, RL-Kernel raises a clear runtime error. +- If no process group is initialized and `WORLD_SIZE` is unset or `1`, the helper + returns the input tensor unchanged. +- If no process group is initialized and `WORLD_SIZE > 1`, the helper raises a + runtime error. +- If `world_size == 1`, the helper returns the input tensor unchanged. +- `async_op=True` is not implemented for the deterministic helper. + +## Smoke Tests + +Run the unit checks: + +```bash +PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 python -m pytest tests/distributed/test_deterministic_allreduce.py -q +``` + +Run the ordered fallback smoke on CPU/Gloo: + +```bash +torchrun --standalone --nproc_per_node=2 \ + tests/distributed/test_deterministic_allreduce.py \ + --backend gloo --mode ordered_rank_fallback --dtype fp32 --device cpu +``` + +Run the NCCL ring smoke on two GPUs: + +```bash +CUDA_VISIBLE_DEVICES=0,1 \ +NCCL_ALGO=Ring \ +NCCL_PROTO=Simple \ +NCCL_MIN_NCHANNELS=1 \ +NCCL_MAX_NCHANNELS=1 \ +torchrun --standalone --nproc_per_node=2 \ + tests/distributed/test_deterministic_allreduce.py \ + --backend nccl --mode nccl_ring --dtype fp32 --device cuda +``` + +The smoke test prints rank-0 JSON with `max_abs_diff`, `max_rel_diff`, +`mismatch_count`, and `bitwise_equal`. + +## Current Limitations + +- NVLS / NVLink-Sharp is not claimed by this helper. It needs a separate probe + with NCCL logs that prove NVLS was used. +- Multi-node and RDMA behavior are not validated by this document. +- DeepSpeed gradient synchronization order is not controlled by this helper + until a tested integration point is added. diff --git a/overnight_report_issue112.md b/overnight_report_issue112.md index 9c23795..e5c74f7 100644 --- a/overnight_report_issue112.md +++ b/overnight_report_issue112.md @@ -20,43 +20,90 @@ - Created local working branch `cse/issue-112-deterministic-nccl`. - Audited repository call sites for all-reduce, reduce-scatter, all-gather, DDP/FSDP, DeepSpeed, NCCL, and gradient synchronization keywords. -- Added a distributed audit document. +- Added `docs/distributed/deterministic_allreduce_audit.md`. - Created an isolated workspace-local Python environment under `.codex-nightly/envs/issue112-py312`. - Installed PyTorch CUDA 12.8 and RL-Kernel dev/docs dependencies into that isolated environment. +- Added `rl_engine.distributed.deterministic_allreduce` with: + - `configure_deterministic_nccl_env()`; + - `DeterministicAllReduceConfig`; + - `nccl_ring` fast path using `torch.distributed.all_reduce`; + - `ordered_rank_fallback` using all-gather, rank-ordered accumulation on rank 0, and broadcast. +- Added user-facing docs for deterministic all-reduce modes and fallback behavior. +- Added a torchrun-compatible distributed smoke test. ## Commits created -- `docs(distributed): audit all-reduce call sites for issue 112` +- `2bccb64 docs(distributed): audit all-reduce call sites for issue 112` +- `feat(distributed): add deterministic all-reduce helper` ## Files changed - `docs/.nav.yml` - `docs/distributed/deterministic_allreduce_audit.md` +- `docs/distributed/deterministic_allreduce.md` +- `rl_engine/distributed/__init__.py` +- `rl_engine/distributed/deterministic_allreduce.py` +- `tests/distributed/test_deterministic_allreduce.py` - `overnight_report_issue112.md` ## Tests run - `git diff --check` - `.codex-nightly/envs/issue112-py312/bin/python -c "import torch; ..."` +- `.codex-nightly/envs/issue112-py312/bin/black --check --line-length 100 rl_engine/distributed tests/distributed/test_deterministic_allreduce.py` +- `.codex-nightly/envs/issue112-py312/bin/isort --check-only --profile black --line-length 100 rl_engine/distributed tests/distributed/test_deterministic_allreduce.py` +- `.codex-nightly/envs/issue112-py312/bin/flake8 --max-line-length=100 --extend-ignore=E203,E704 rl_engine/distributed tests/distributed/test_deterministic_allreduce.py` +- `.codex-nightly/envs/issue112-py312/bin/ruff check rl_engine/distributed tests/distributed/test_deterministic_allreduce.py` +- `.codex-nightly/envs/issue112-py312/bin/python -m mypy --ignore-missing-imports rl_engine/distributed` +- `PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 .codex-nightly/envs/issue112-py312/bin/python -m pytest tests/distributed/test_deterministic_allreduce.py -q` +- `.codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=2 tests/distributed/test_deterministic_allreduce.py --backend gloo --mode ordered_rank_fallback --dtype fp32 --device cpu` +- `CUDA_VISIBLE_DEVICES=0,1 NCCL_ALGO=Ring NCCL_PROTO=Simple NCCL_MIN_NCHANNELS=1 NCCL_MAX_NCHANNELS=1 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=2 tests/distributed/test_deterministic_allreduce.py --backend nccl --mode nccl_ring --dtype fp32 --device cuda` +- `CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 NCCL_ALGO=Ring NCCL_PROTO=Simple NCCL_MIN_NCHANNELS=1 NCCL_MAX_NCHANNELS=1 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=8 tests/distributed/test_deterministic_allreduce.py --backend nccl --mode nccl_ring --dtype fp32 --device cuda` +- `CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=8 tests/distributed/test_deterministic_allreduce.py --backend nccl --mode ordered_rank_fallback --dtype fp32 --device cuda` - `.codex-nightly/envs/issue112-py312/bin/mkdocs build --strict -f mkdocs.yaml` +- `PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 .codex-nightly/envs/issue112-py312/bin/python -m pytest rl_engine/tests/test_dispatch.py -v` ## Test results - `git diff --check` passed. - PyTorch environment check passed: CUDA available, 8 devices visible, NCCL available. -- `mkdocs build --strict -f mkdocs.yaml` passed. It emitted expected git revision-date warnings for the new uncommitted audit doc. +- black, isort, flake8, ruff, and mypy passed for the new Python files. +- `tests/distributed/test_deterministic_allreduce.py`: 3 passed. +- 2-rank CPU/Gloo ordered fallback smoke passed with bitwise equality. +- 2-rank CUDA/NCCL `nccl_ring` smoke passed with bitwise equality against the ordered fallback oracle. +- 8-rank CUDA/NCCL `nccl_ring` smoke passed within tolerance against the ordered fallback oracle; it was not bitwise equal. +- 8-rank CUDA/NCCL ordered fallback smoke passed with bitwise equality. +- `mkdocs build --strict -f mkdocs.yaml` passed. It emitted expected git revision-date warnings for new uncommitted docs. +- `rl_engine/tests/test_dispatch.py`: 3 passed. ## GPU validation results -No distributed GPU validation has been run yet. `nvidia-smi` is available and reports 8 GPUs. The isolated venv can import PyTorch with CUDA and NCCL support. +Current machine is 8x `NVIDIA L20X`, not H200. No H200- or NVLS-specific claim is made. + +2-rank NCCL ring smoke result: + +```json +{"backend":"nccl","bitwise_equal":true,"device":"cuda:0","dtype":"fp32","iterations":3,"max_abs_diff":0.0,"max_rel_diff":0.0,"mismatch_count":0,"mode":"nccl_ring","op":"sum","status":"pass","world_size":2} +``` + +8-rank NCCL ring smoke result: + +```json +{"backend":"nccl","bitwise_equal":false,"device":"cuda:0","dtype":"fp32","iterations":3,"max_abs_diff":4.76837158203125e-07,"max_rel_diff":3.2424927098873013e-07,"mismatch_count":62,"mode":"nccl_ring","op":"sum","status":"pass","world_size":8} +``` + +8-rank ordered fallback smoke result: + +```json +{"backend":"nccl","bitwise_equal":true,"device":"cuda:0","dtype":"fp32","iterations":3,"max_abs_diff":0.0,"max_rel_diff":0.0,"mismatch_count":0,"mode":"ordered_rank_fallback","op":"sum","status":"pass","world_size":8} +``` ## PR split recommendation - PR 1: audit and deterministic all-reduce contract documentation. -- PR 2: deterministic all-reduce helper with ordered rank fallback and smoke - tests. -- PR 3: NCCL ring fast path and GPU validation report, after PyTorch/NCCL runtime - is available. +- PR 2: deterministic all-reduce helper with ordered rank fallback and smoke tests. +- PR 3: DP gradient fixed-step comparison against a DP=1 baseline. +- PR 4: NVLS/NVLink-Sharp probe and documentation only if hardware and logs prove it. ## PR body files created @@ -66,24 +113,25 @@ None yet. - `python` is not available on PATH. - System `python3 -m venv` cannot create venvs because `ensurepip` is unavailable; used workspace-local `virtualenv` bootstrap instead. -- Current GPU model reported by `nvidia-smi` is `NVIDIA L20X`, not the H200 model - assumed by the original overnight manual. +- Current GPU model reported by `nvidia-smi` is `NVIDIA L20X`, not the H200 model assumed by the original overnight manual. +- NVLS has not been probed or validated. +- DeepSpeed DP gradient synchronization order is not controlled by the new helper yet. ## Unsafe operations skipped - No push attempted. - No system dependency installation attempted; all Python dependencies were installed under `.codex-nightly/`. - No upstream branch or PR creation attempted. +- No sudo used. ## Remaining work -- Commit the audit phase. -- Add deterministic all-reduce helper API and focused distributed tests. -- Use `.codex-nightly/envs/issue112-py312` for GPU/NCCL validation. -- Prepare PR body drafts and patch artifacts. +- Commit the deterministic all-reduce helper phase. +- Add DP gradient all-reduce deterministic fixed-step comparison. +- Prepare PR body drafts. +- Generate patch artifacts under `.codex-nightly/artifacts` if push/PR remains unsafe. +- Probe NVLS only if the current hardware/software setup clearly supports it. ## Suggested next Codex prompt -Continue issue #112 from `overnight_report_issue112.md`. Prioritize the next -reviewable phase: add `rl_engine.distributed.deterministic_allreduce` with an -ordered-rank fallback and focused smoke tests, then update this report. +Continue issue #112 from `overnight_report_issue112.md`. Prioritize the next reviewable phase: add a DP gradient deterministic fixed-step test comparing DP=1 gradients against DP=N reduced gradients, then update this report. diff --git a/rl_engine/distributed/__init__.py b/rl_engine/distributed/__init__.py new file mode 100644 index 0000000..45d29e1 --- /dev/null +++ b/rl_engine/distributed/__init__.py @@ -0,0 +1,16 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2026 RL-Kernel Contributors + +from rl_engine.distributed.deterministic_allreduce import ( + DETERMINISTIC_NCCL_ENV, + DeterministicAllReduceConfig, + configure_deterministic_nccl_env, + deterministic_all_reduce, +) + +__all__ = [ + "DETERMINISTIC_NCCL_ENV", + "DeterministicAllReduceConfig", + "configure_deterministic_nccl_env", + "deterministic_all_reduce", +] diff --git a/rl_engine/distributed/deterministic_allreduce.py b/rl_engine/distributed/deterministic_allreduce.py new file mode 100644 index 0000000..020ccee --- /dev/null +++ b/rl_engine/distributed/deterministic_allreduce.py @@ -0,0 +1,164 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2026 RL-Kernel Contributors + +from __future__ import annotations + +import os +import warnings +from dataclasses import dataclass +from typing import Literal, Optional + +import torch +import torch.distributed as dist + +DETERMINISTIC_NCCL_ENV = { + "NCCL_ALGO": "Ring", + "NCCL_PROTO": "Simple", + "NCCL_MIN_NCHANNELS": "1", + "NCCL_MAX_NCHANNELS": "1", +} + + +@dataclass(frozen=True) +class DeterministicAllReduceConfig: + """Configuration for :func:`deterministic_all_reduce`.""" + + mode: Literal["nccl_ring", "ordered_rank_fallback"] = "nccl_ring" + op: Literal["sum", "mean"] = "sum" + force_fp32_accumulation: bool = True + async_op: bool = False + group: Optional[dist.ProcessGroup] = None + + +def configure_deterministic_nccl_env(*, overwrite: bool = False) -> dict[str, Optional[str]]: + """Configure the opt-in NCCL single-ring fast path environment. + + NCCL reads these variables during process-group initialization, so callers + should run this helper before ``torch.distributed.init_process_group``. The + helper returns the previous values so callers can log or restore them. + + Existing environment values are preserved by default. Pass + ``overwrite=True`` to force the RL-Kernel deterministic NCCL settings. + """ + + if dist.is_available() and dist.is_initialized(): + warnings.warn( + "configure_deterministic_nccl_env() was called after " + "torch.distributed was initialized; NCCL may have already read its " + "collective configuration.", + RuntimeWarning, + stacklevel=2, + ) + + previous: dict[str, Optional[str]] = {} + for key, value in DETERMINISTIC_NCCL_ENV.items(): + previous[key] = os.environ.get(key) + if overwrite or key not in os.environ: + os.environ[key] = value + elif os.environ[key] != value: + warnings.warn( + f"{key} is already set to {os.environ[key]!r}; leaving it unchanged. " + f"Pass overwrite=True to set {value!r}.", + RuntimeWarning, + stacklevel=2, + ) + return previous + + +def deterministic_all_reduce( + tensor: torch.Tensor, + config: Optional[DeterministicAllReduceConfig] = None, +) -> torch.Tensor: + """Reduce ``tensor`` in place and return it. + + ``mode="nccl_ring"`` uses ``torch.distributed.all_reduce``. Call + :func:`configure_deterministic_nccl_env` before process-group initialization + when using NCCL and single-ring/single-channel behavior is desired. + + ``mode="ordered_rank_fallback"`` gathers rank inputs, accumulates them on + rank 0 in ascending global rank order, and broadcasts the result. This path + is slow and memory-heavy, but it gives a concrete reference order for smoke + tests and unsupported hardware fallbacks. + """ + + cfg = config or DeterministicAllReduceConfig() + _validate_config(tensor, cfg) + + if cfg.async_op: + raise NotImplementedError("deterministic_all_reduce currently requires async_op=False") + + if not dist.is_available(): + raise RuntimeError("torch.distributed is unavailable in this PyTorch build") + + if not dist.is_initialized(): + if int(os.environ.get("WORLD_SIZE", "1")) > 1: + raise RuntimeError( + "torch.distributed is not initialized, but WORLD_SIZE indicates a " + "multi-rank launch" + ) + return tensor + + world_size = dist.get_world_size(group=cfg.group) + if world_size == 1: + return tensor + + if cfg.mode == "nccl_ring": + return _all_reduce_fast_path(tensor, cfg, world_size) + if cfg.mode == "ordered_rank_fallback": + return _ordered_rank_fallback(tensor, cfg, world_size) + raise ValueError(f"unsupported deterministic all-reduce mode: {cfg.mode!r}") + + +def _validate_config(tensor: torch.Tensor, cfg: DeterministicAllReduceConfig) -> None: + if not isinstance(tensor, torch.Tensor): + raise TypeError(f"tensor must be a torch.Tensor, got {type(tensor)!r}") + if cfg.op not in {"sum", "mean"}: + raise ValueError(f"unsupported reduction op: {cfg.op!r}") + if cfg.mode not in {"nccl_ring", "ordered_rank_fallback"}: + raise ValueError(f"unsupported deterministic all-reduce mode: {cfg.mode!r}") + if cfg.op == "mean" and not (tensor.is_floating_point() or tensor.is_complex()): + raise TypeError("op='mean' requires a floating-point or complex tensor") + + +def _all_reduce_fast_path( + tensor: torch.Tensor, + cfg: DeterministicAllReduceConfig, + world_size: int, +) -> torch.Tensor: + dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=cfg.group, async_op=False) + if cfg.op == "mean": + tensor.div_(world_size) + return tensor + + +def _ordered_rank_fallback( + tensor: torch.Tensor, + cfg: DeterministicAllReduceConfig, + world_size: int, +) -> torch.Tensor: + send = tensor.detach().contiguous() + gathered = [torch.empty_like(send) for _ in range(world_size)] + dist.all_gather(gathered, send, group=cfg.group) + + rank = dist.get_rank(group=cfg.group) + result = torch.empty_like(send) + if rank == 0: + accumulation_dtype = _accumulation_dtype(send, cfg.force_fp32_accumulation) + reduced = gathered[0].to(dtype=accumulation_dtype) + for rank_tensor in gathered[1:]: + reduced.add_(rank_tensor.to(dtype=accumulation_dtype)) + if cfg.op == "mean": + reduced.div_(world_size) + result.copy_(reduced.to(dtype=send.dtype)) + + dist.broadcast(result, src=0, group=cfg.group) + tensor.copy_(result.view_as(tensor)) + return tensor + + +def _accumulation_dtype(tensor: torch.Tensor, force_fp32_accumulation: bool) -> torch.dtype: + if not force_fp32_accumulation or not tensor.is_floating_point(): + return tensor.dtype + if tensor.dtype == torch.float64: + return torch.float64 + return torch.float32 diff --git a/tests/distributed/test_deterministic_allreduce.py b/tests/distributed/test_deterministic_allreduce.py new file mode 100644 index 0000000..db3d794 --- /dev/null +++ b/tests/distributed/test_deterministic_allreduce.py @@ -0,0 +1,184 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2026 RL-Kernel Contributors + +from __future__ import annotations + +import argparse +import json +import os +from typing import Any + +import pytest +import torch +import torch.distributed as dist + +from rl_engine.distributed import ( + DETERMINISTIC_NCCL_ENV, + DeterministicAllReduceConfig, + configure_deterministic_nccl_env, + deterministic_all_reduce, +) + + +def test_configure_deterministic_nccl_env_preserves_existing_value(monkeypatch): + monkeypatch.setenv("NCCL_ALGO", "Tree") + with pytest.warns(RuntimeWarning, match="NCCL_ALGO"): + previous = configure_deterministic_nccl_env() + + assert previous["NCCL_ALGO"] == "Tree" + assert os.environ["NCCL_ALGO"] == "Tree" + for key, value in DETERMINISTIC_NCCL_ENV.items(): + if key != "NCCL_ALGO": + assert os.environ[key] == value + + +def test_configure_deterministic_nccl_env_can_overwrite(monkeypatch): + monkeypatch.setenv("NCCL_ALGO", "Tree") + configure_deterministic_nccl_env(overwrite=True) + + assert os.environ["NCCL_ALGO"] == "Ring" + + +def test_single_process_without_process_group_is_noop(): + tensor = torch.tensor([1.0, 2.0, 3.0]) + + reduced = deterministic_all_reduce( + tensor, + DeterministicAllReduceConfig(mode="ordered_rank_fallback", op="mean"), + ) + + assert reduced is tensor + assert torch.equal(tensor, torch.tensor([1.0, 2.0, 3.0])) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Deterministic all-reduce smoke test") + parser.add_argument("--backend", choices=("gloo", "nccl"), default="gloo") + parser.add_argument( + "--mode", + choices=("ordered_rank_fallback", "nccl_ring"), + default="ordered_rank_fallback", + ) + parser.add_argument("--op", choices=("sum", "mean"), default="sum") + parser.add_argument("--dtype", choices=("fp32", "fp16", "bf16"), default="fp32") + parser.add_argument("--device", choices=("auto", "cpu", "cuda"), default="auto") + parser.add_argument("--numel", type=int, default=257) + parser.add_argument("--iterations", type=int, default=3) + parser.add_argument("--configure-nccl-env", action="store_true") + parser.add_argument("--rtol", type=float, default=None) + parser.add_argument("--atol", type=float, default=None) + return parser.parse_args() + + +def _dtype(name: str) -> torch.dtype: + return {"fp32": torch.float32, "fp16": torch.float16, "bf16": torch.bfloat16}[name] + + +def _device(args: argparse.Namespace) -> torch.device: + if args.device == "cpu" or args.backend == "gloo": + return torch.device("cpu") + if not torch.cuda.is_available(): + raise RuntimeError("CUDA device requested but torch.cuda.is_available() is false") + local_rank = int(os.environ.get("LOCAL_RANK", "0")) + torch.cuda.set_device(local_rank) + return torch.device("cuda", local_rank) + + +def _make_input(rank: int, dtype: torch.dtype, device: torch.device, numel: int) -> torch.Tensor: + base = torch.arange(numel, dtype=torch.float32, device=device) + values = ((base % 17) - 8.0) / 17.0 + values = values + (rank + 1) * 0.03125 + return values.to(dtype=dtype) + + +def _tolerances(dtype: torch.dtype, args: argparse.Namespace) -> tuple[float, float]: + if args.atol is not None and args.rtol is not None: + return args.atol, args.rtol + if dtype == torch.float32: + return 0.0 if args.mode == "ordered_rank_fallback" else 1.0e-6, 0.0 + if dtype == torch.bfloat16: + return 8.0e-3, 8.0e-3 + return 2.0e-3, 2.0e-3 + + +def _diff_stats(actual: torch.Tensor, expected: torch.Tensor) -> dict[str, Any]: + actual_f32 = actual.detach().to(torch.float32).cpu() + expected_f32 = expected.detach().to(torch.float32).cpu() + diff = (actual_f32 - expected_f32).abs() + denom = expected_f32.abs().clamp_min(1.0e-12) + rel = diff / denom + return { + "bitwise_equal": bool(torch.equal(actual.detach().cpu(), expected.detach().cpu())), + "max_abs_diff": float(diff.max().item()), + "max_rel_diff": float(rel.max().item()), + "mismatch_count": int((diff != 0).sum().item()), + } + + +def _assert_close(actual: torch.Tensor, expected: torch.Tensor, atol: float, rtol: float) -> None: + if not torch.allclose(actual, expected, atol=atol, rtol=rtol): + stats = _diff_stats(actual, expected) + raise AssertionError( + "deterministic all-reduce mismatch: " + f"max_abs_diff={stats['max_abs_diff']} " + f"max_rel_diff={stats['max_rel_diff']} " + f"mismatch_count={stats['mismatch_count']}" + ) + + +def _run_distributed_smoke(args: argparse.Namespace) -> None: + if args.configure_nccl_env or (args.backend == "nccl" and args.mode == "nccl_ring"): + configure_deterministic_nccl_env() + device = _device(args) + dist.init_process_group(backend=args.backend) + try: + rank = dist.get_rank() + world_size = dist.get_world_size() + dtype = _dtype(args.dtype) + atol, rtol = _tolerances(dtype, args) + previous: torch.Tensor | None = None + final_stats: dict[str, Any] = {} + + for _ in range(args.iterations): + original = _make_input(rank, dtype, device, args.numel) + candidate = original.clone() + reference = original.clone() + + deterministic_all_reduce( + candidate, + DeterministicAllReduceConfig(mode=args.mode, op=args.op), + ) + deterministic_all_reduce( + reference, + DeterministicAllReduceConfig(mode="ordered_rank_fallback", op=args.op), + ) + + _assert_close(candidate, reference, atol=atol, rtol=rtol) + if previous is not None: + _assert_close(candidate, previous, atol=atol, rtol=rtol) + previous = candidate.clone() + final_stats = _diff_stats(candidate, reference) + + if rank == 0: + print( + json.dumps( + { + "status": "pass", + "backend": args.backend, + "mode": args.mode, + "op": args.op, + "dtype": args.dtype, + "device": str(device), + "world_size": world_size, + "iterations": args.iterations, + **final_stats, + }, + sort_keys=True, + ) + ) + finally: + dist.destroy_process_group() + + +if __name__ == "__main__": + _run_distributed_smoke(_parse_args()) From 6b9672c2c041890cabf170845eeaef50b46c4915 Mon Sep 17 00:00:00 2001 From: CyberSecurityErial <2710555967@qq.com> Date: Tue, 23 Jun 2026 00:28:11 +0800 Subject: [PATCH 3/5] test(distributed): compare DP gradients against single-rank baseline --- docs/distributed/deterministic_allreduce.md | 33 ++ overnight_report_issue112.md | 43 ++- .../test_dp_gradient_determinism.py | 309 ++++++++++++++++++ 3 files changed, 376 insertions(+), 9 deletions(-) create mode 100644 tests/distributed/test_dp_gradient_determinism.py diff --git a/docs/distributed/deterministic_allreduce.md b/docs/distributed/deterministic_allreduce.md index a025872..749b47e 100644 --- a/docs/distributed/deterministic_allreduce.md +++ b/docs/distributed/deterministic_allreduce.md @@ -115,6 +115,39 @@ torchrun --standalone --nproc_per_node=2 \ The smoke test prints rank-0 JSON with `max_abs_diff`, `max_rel_diff`, `mismatch_count`, and `bitwise_equal`. +## DP Gradient Smoke Test + +The DP gradient smoke test compares a DP=1 baseline gradient on a fixed global +batch against DP=N local gradients reduced with `deterministic_all_reduce(..., +op="mean")`. + +Run the CPU/Gloo fallback test: + +```bash +torchrun --standalone --nproc_per_node=2 \ + tests/distributed/test_dp_gradient_determinism.py \ + --backend gloo --mode ordered_rank_fallback --dtype fp32 --device cpu +``` + +Run the NCCL ring test on GPUs: + +```bash +CUDA_VISIBLE_DEVICES=0,1 \ +NCCL_ALGO=Ring \ +NCCL_PROTO=Simple \ +NCCL_MIN_NCHANNELS=1 \ +NCCL_MAX_NCHANNELS=1 \ +torchrun --standalone --nproc_per_node=2 \ + tests/distributed/test_dp_gradient_determinism.py \ + --backend nccl --mode nccl_ring --dtype fp32 --device cuda +``` + +The DP smoke reports aggregate and per-parameter `max_abs_diff`, +`max_rel_diff`, `mismatch_count`, and `bitwise_equal` fields. The DP=N path is +expected to match the DP=1 baseline within tolerance; it is not required to be +bitwise equal because the model backward pass and cross-rank gradient averaging +use a different arithmetic grouping than the single-rank full-batch backward. + ## Current Limitations - NVLS / NVLink-Sharp is not claimed by this helper. It needs a separate probe diff --git a/overnight_report_issue112.md b/overnight_report_issue112.md index e5c74f7..82edc24 100644 --- a/overnight_report_issue112.md +++ b/overnight_report_issue112.md @@ -29,12 +29,16 @@ - `nccl_ring` fast path using `torch.distributed.all_reduce`; - `ordered_rank_fallback` using all-gather, rank-ordered accumulation on rank 0, and broadcast. - Added user-facing docs for deterministic all-reduce modes and fallback behavior. -- Added a torchrun-compatible distributed smoke test. +- Added a torchrun-compatible distributed all-reduce smoke test. +- Added a DP gradient fixed-step smoke test comparing a DP=1 full-batch baseline against DP=N local gradients reduced with the deterministic all-reduce helper. +- Created local PR body drafts `.pr_body_issue112_pr1.md`, `.pr_body_issue112_pr2.md`, and `.pr_body_issue112_pr3.md`. +- Generated patch artifacts under `.codex-nightly/artifacts`. ## Commits created - `2bccb64 docs(distributed): audit all-reduce call sites for issue 112` - `feat(distributed): add deterministic all-reduce helper` +- `test(distributed): compare DP gradients against single-rank baseline` ## Files changed @@ -44,6 +48,7 @@ - `rl_engine/distributed/__init__.py` - `rl_engine/distributed/deterministic_allreduce.py` - `tests/distributed/test_deterministic_allreduce.py` +- `tests/distributed/test_dp_gradient_determinism.py` - `overnight_report_issue112.md` ## Tests run @@ -62,6 +67,11 @@ - `CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=8 tests/distributed/test_deterministic_allreduce.py --backend nccl --mode ordered_rank_fallback --dtype fp32 --device cuda` - `.codex-nightly/envs/issue112-py312/bin/mkdocs build --strict -f mkdocs.yaml` - `PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 .codex-nightly/envs/issue112-py312/bin/python -m pytest rl_engine/tests/test_dispatch.py -v` +- `PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 .codex-nightly/envs/issue112-py312/bin/python -m pytest tests/distributed/test_dp_gradient_determinism.py -q` +- `.codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=2 tests/distributed/test_dp_gradient_determinism.py --backend gloo --mode ordered_rank_fallback --dtype fp32 --device cpu` +- `CUDA_VISIBLE_DEVICES=0,1 NCCL_ALGO=Ring NCCL_PROTO=Simple NCCL_MIN_NCHANNELS=1 NCCL_MAX_NCHANNELS=1 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=2 tests/distributed/test_dp_gradient_determinism.py --backend nccl --mode nccl_ring --dtype fp32 --device cuda` +- `CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 NCCL_ALGO=Ring NCCL_PROTO=Simple NCCL_MIN_NCHANNELS=1 NCCL_MAX_NCHANNELS=1 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=8 tests/distributed/test_dp_gradient_determinism.py --backend nccl --mode nccl_ring --dtype fp32 --device cuda` +- `CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=8 tests/distributed/test_dp_gradient_determinism.py --backend nccl --mode ordered_rank_fallback --dtype fp32 --device cuda` ## Test results @@ -75,6 +85,11 @@ - 8-rank CUDA/NCCL ordered fallback smoke passed with bitwise equality. - `mkdocs build --strict -f mkdocs.yaml` passed. It emitted expected git revision-date warnings for new uncommitted docs. - `rl_engine/tests/test_dispatch.py`: 3 passed. +- DP gradient unit test: 1 passed. +- 2-rank CPU/Gloo DP gradient ordered fallback smoke passed against DP=1 baseline. +- 2-rank CUDA/NCCL DP gradient `nccl_ring` smoke passed against DP=1 baseline. +- 8-rank CUDA/NCCL DP gradient `nccl_ring` smoke passed against DP=1 baseline. +- 8-rank CUDA/NCCL DP gradient ordered fallback smoke passed against DP=1 baseline. ## GPU validation results @@ -98,16 +113,30 @@ Current machine is 8x `NVIDIA L20X`, not H200. No H200- or NVLS-specific claim i {"backend":"nccl","bitwise_equal":true,"device":"cuda:0","dtype":"fp32","iterations":3,"max_abs_diff":0.0,"max_rel_diff":0.0,"mismatch_count":0,"mode":"ordered_rank_fallback","op":"sum","status":"pass","world_size":8} ``` +8-rank DP gradient NCCL ring smoke result: + +```json +{"backend":"nccl","bitwise_equal":false,"device":"cuda:0","dtype":"fp32","global_batch_size":16,"max_abs_diff":5.960464477539063e-08,"max_rel_diff":6.11946063600044e-07,"mismatch_count":81,"mode":"nccl_ring","status":"pass","world_size":8} +``` + +8-rank DP gradient ordered fallback smoke result: + +```json +{"backend":"nccl","bitwise_equal":false,"device":"cuda:0","dtype":"fp32","global_batch_size":16,"max_abs_diff":2.9802322387695312e-08,"max_rel_diff":3.059730261156801e-06,"mismatch_count":74,"mode":"ordered_rank_fallback","status":"pass","world_size":8} +``` + ## PR split recommendation - PR 1: audit and deterministic all-reduce contract documentation. - PR 2: deterministic all-reduce helper with ordered rank fallback and smoke tests. -- PR 3: DP gradient fixed-step comparison against a DP=1 baseline. +- PR 3: split DP gradient fixed-step comparison if maintainers prefer it separate from the helper. - PR 4: NVLS/NVLink-Sharp probe and documentation only if hardware and logs prove it. ## PR body files created -None yet. +- `.pr_body_issue112_pr1.md` +- `.pr_body_issue112_pr2.md` +- `.pr_body_issue112_pr3.md` ## Blockers @@ -115,7 +144,7 @@ None yet. - System `python3 -m venv` cannot create venvs because `ensurepip` is unavailable; used workspace-local `virtualenv` bootstrap instead. - Current GPU model reported by `nvidia-smi` is `NVIDIA L20X`, not the H200 model assumed by the original overnight manual. - NVLS has not been probed or validated. -- DeepSpeed DP gradient synchronization order is not controlled by the new helper yet. +- DeepSpeed DP gradient synchronization order is not controlled by the new helper yet; the DP gradient smoke uses a tiny local model rather than DeepSpeed internals. ## Unsafe operations skipped @@ -126,12 +155,8 @@ None yet. ## Remaining work -- Commit the deterministic all-reduce helper phase. -- Add DP gradient all-reduce deterministic fixed-step comparison. -- Prepare PR body drafts. -- Generate patch artifacts under `.codex-nightly/artifacts` if push/PR remains unsafe. - Probe NVLS only if the current hardware/software setup clearly supports it. ## Suggested next Codex prompt -Continue issue #112 from `overnight_report_issue112.md`. Prioritize the next reviewable phase: add a DP gradient deterministic fixed-step test comparing DP=1 gradients against DP=N reduced gradients, then update this report. +Continue issue #112 from `overnight_report_issue112.md`. Prioritize PR body drafts, patch artifact generation, and optional NVLS probing only if the hardware/software logs clearly prove support. diff --git a/tests/distributed/test_dp_gradient_determinism.py b/tests/distributed/test_dp_gradient_determinism.py new file mode 100644 index 0000000..57691b1 --- /dev/null +++ b/tests/distributed/test_dp_gradient_determinism.py @@ -0,0 +1,309 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright (c) 2026 RL-Kernel Contributors + +from __future__ import annotations + +import argparse +import json +import os +from dataclasses import dataclass +from typing import Any + +import torch +import torch.distributed as dist +import torch.nn.functional as F + +from rl_engine.distributed import ( + DeterministicAllReduceConfig, + configure_deterministic_nccl_env, + deterministic_all_reduce, +) + + +class TinyGradientModel(torch.nn.Module): + def __init__(self, input_dim: int, hidden_dim: int, output_dim: int): + super().__init__() + self.net = torch.nn.Sequential( + torch.nn.Linear(input_dim, hidden_dim), + torch.nn.ReLU(), + torch.nn.Linear(hidden_dim, output_dim), + ) + + def forward(self, inputs: torch.Tensor) -> torch.Tensor: + return self.net(inputs) + + +@dataclass(frozen=True) +class GradientStats: + bitwise_equal: bool + max_abs_diff: float + max_rel_diff: float + mismatch_count: int + + +def test_fixed_batch_is_reproducible(): + first = _fixed_batch(global_batch_size=8, input_dim=3, output_dim=2) + second = _fixed_batch(global_batch_size=8, input_dim=3, output_dim=2) + + assert torch.equal(first[0], second[0]) + assert torch.equal(first[1], second[1]) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="DP gradient determinism smoke test") + parser.add_argument("--backend", choices=("gloo", "nccl"), default="gloo") + parser.add_argument( + "--mode", + choices=("ordered_rank_fallback", "nccl_ring"), + default="ordered_rank_fallback", + ) + parser.add_argument("--dtype", choices=("fp32", "fp16", "bf16"), default="fp32") + parser.add_argument("--device", choices=("auto", "cpu", "cuda"), default="auto") + parser.add_argument("--global-batch-size", type=int, default=16) + parser.add_argument("--input-dim", type=int, default=7) + parser.add_argument("--hidden-dim", type=int, default=13) + parser.add_argument("--output-dim", type=int, default=5) + parser.add_argument("--seed", type=int, default=2026) + parser.add_argument("--configure-nccl-env", action="store_true") + parser.add_argument("--rtol", type=float, default=None) + parser.add_argument("--atol", type=float, default=None) + return parser.parse_args() + + +def _dtype(name: str) -> torch.dtype: + return {"fp32": torch.float32, "fp16": torch.float16, "bf16": torch.bfloat16}[name] + + +def _device(args: argparse.Namespace) -> torch.device: + if args.device == "cpu" or args.backend == "gloo": + return torch.device("cpu") + if not torch.cuda.is_available(): + raise RuntimeError("CUDA device requested but torch.cuda.is_available() is false") + local_rank = int(os.environ.get("LOCAL_RANK", "0")) + torch.cuda.set_device(local_rank) + return torch.device("cuda", local_rank) + + +def _set_deterministic_controls(seed: int) -> None: + torch.manual_seed(seed) + if torch.cuda.is_available(): + torch.cuda.manual_seed_all(seed) + torch.backends.cuda.matmul.allow_tf32 = False + torch.backends.cudnn.allow_tf32 = False + torch.use_deterministic_algorithms(True, warn_only=True) + + +def _fixed_batch( + *, + global_batch_size: int, + input_dim: int, + output_dim: int, +) -> tuple[torch.Tensor, torch.Tensor]: + inputs = torch.linspace( + -1.0, + 1.0, + steps=global_batch_size * input_dim, + dtype=torch.float32, + ).reshape(global_batch_size, input_dim) + targets = torch.cos( + torch.linspace( + -0.7, + 0.9, + steps=global_batch_size * output_dim, + dtype=torch.float32, + ) + ).reshape(global_batch_size, output_dim) + return inputs, targets + + +def _make_model( + *, + input_dim: int, + hidden_dim: int, + output_dim: int, + seed: int, + dtype: torch.dtype, + device: torch.device, +) -> TinyGradientModel: + torch.manual_seed(seed) + model = TinyGradientModel(input_dim, hidden_dim, output_dim) + return model.to(device=device, dtype=dtype) + + +def _compute_gradients( + model: torch.nn.Module, + inputs: torch.Tensor, + targets: torch.Tensor, + *, + dtype: torch.dtype, + device: torch.device, +) -> dict[str, torch.Tensor]: + model.zero_grad(set_to_none=True) + batch_inputs = inputs.to(device=device, dtype=dtype) + batch_targets = targets.to(device=device, dtype=dtype) + predictions = model(batch_inputs) + loss = F.mse_loss(predictions.float(), batch_targets.float(), reduction="mean") + loss.backward() + return { + name: parameter.grad.detach().clone() + for name, parameter in model.named_parameters() + if parameter.grad is not None + } + + +def _reduce_gradients(model: torch.nn.Module, mode: str) -> dict[str, torch.Tensor]: + reduced: dict[str, torch.Tensor] = {} + for name, parameter in model.named_parameters(): + if parameter.grad is None: + continue + deterministic_all_reduce( + parameter.grad, + DeterministicAllReduceConfig(mode=mode, op="mean"), + ) + reduced[name] = parameter.grad.detach().clone() + return reduced + + +def _stats(actual: torch.Tensor, expected: torch.Tensor) -> GradientStats: + actual_f32 = actual.detach().to(torch.float32).cpu() + expected_f32 = expected.detach().to(torch.float32).cpu() + diff = (actual_f32 - expected_f32).abs() + rel = diff / expected_f32.abs().clamp_min(1.0e-12) + return GradientStats( + bitwise_equal=bool(torch.equal(actual.detach().cpu(), expected.detach().cpu())), + max_abs_diff=float(diff.max().item()), + max_rel_diff=float(rel.max().item()), + mismatch_count=int((diff != 0).sum().item()), + ) + + +def _tolerances(dtype: torch.dtype, args: argparse.Namespace) -> tuple[float, float]: + if args.atol is not None and args.rtol is not None: + return args.atol, args.rtol + if dtype == torch.float32: + return 1.0e-5, 1.0e-5 + if dtype == torch.bfloat16: + return 2.0e-2, 2.0e-2 + return 5.0e-3, 5.0e-3 + + +def _compare_gradients( + actual: dict[str, torch.Tensor], + expected: dict[str, torch.Tensor], + *, + atol: float, + rtol: float, +) -> tuple[GradientStats, list[dict[str, Any]]]: + if set(actual) != set(expected): + raise AssertionError( + f"gradient key mismatch: actual={sorted(actual)}, expected={sorted(expected)}" + ) + + global_stats = GradientStats(True, 0.0, 0.0, 0) + parameters: list[dict[str, Any]] = [] + for name in sorted(actual): + param_stats = _stats(actual[name], expected[name]) + parameters.append({"name": name, **param_stats.__dict__}) + global_stats = GradientStats( + bitwise_equal=global_stats.bitwise_equal and param_stats.bitwise_equal, + max_abs_diff=max(global_stats.max_abs_diff, param_stats.max_abs_diff), + max_rel_diff=max(global_stats.max_rel_diff, param_stats.max_rel_diff), + mismatch_count=global_stats.mismatch_count + param_stats.mismatch_count, + ) + if not torch.allclose(actual[name], expected[name], atol=atol, rtol=rtol): + raise AssertionError( + f"gradient mismatch for {name}: " + f"max_abs_diff={param_stats.max_abs_diff} " + f"max_rel_diff={param_stats.max_rel_diff} " + f"mismatch_count={param_stats.mismatch_count}" + ) + return global_stats, parameters + + +def _run_distributed_smoke(args: argparse.Namespace) -> None: + if args.configure_nccl_env or (args.backend == "nccl" and args.mode == "nccl_ring"): + configure_deterministic_nccl_env() + device = _device(args) + _set_deterministic_controls(args.seed) + dist.init_process_group(backend=args.backend) + try: + rank = dist.get_rank() + world_size = dist.get_world_size() + if args.global_batch_size % world_size != 0: + raise ValueError("global batch size must be divisible by world size") + + dtype = _dtype(args.dtype) + atol, rtol = _tolerances(dtype, args) + inputs, targets = _fixed_batch( + global_batch_size=args.global_batch_size, + input_dim=args.input_dim, + output_dim=args.output_dim, + ) + + baseline_model = _make_model( + input_dim=args.input_dim, + hidden_dim=args.hidden_dim, + output_dim=args.output_dim, + seed=args.seed, + dtype=dtype, + device=device, + ) + baseline_grads = _compute_gradients( + baseline_model, + inputs, + targets, + dtype=dtype, + device=device, + ) + + local_batch_size = args.global_batch_size // world_size + start = rank * local_batch_size + end = start + local_batch_size + dp_model = _make_model( + input_dim=args.input_dim, + hidden_dim=args.hidden_dim, + output_dim=args.output_dim, + seed=args.seed, + dtype=dtype, + device=device, + ) + _compute_gradients( + dp_model, + inputs[start:end], + targets[start:end], + dtype=dtype, + device=device, + ) + reduced_grads = _reduce_gradients(dp_model, args.mode) + global_stats, parameter_stats = _compare_gradients( + reduced_grads, + baseline_grads, + atol=atol, + rtol=rtol, + ) + + if rank == 0: + print( + json.dumps( + { + "status": "pass", + "backend": args.backend, + "mode": args.mode, + "dtype": args.dtype, + "device": str(device), + "world_size": world_size, + "global_batch_size": args.global_batch_size, + "atol": atol, + "rtol": rtol, + **global_stats.__dict__, + "parameters": parameter_stats, + }, + sort_keys=True, + ) + ) + finally: + dist.destroy_process_group() + + +if __name__ == "__main__": + _run_distributed_smoke(_parse_args()) From 6403bfa741f6674c8aaa2b12c61838be8ab88175 Mon Sep 17 00:00:00 2001 From: CyberSecurityErial <2710555967@qq.com> Date: Tue, 23 Jun 2026 00:48:42 +0800 Subject: [PATCH 4/5] docs(distributed): clarify validation hardware note --- overnight_report_issue112.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/overnight_report_issue112.md b/overnight_report_issue112.md index 82edc24..50222bd 100644 --- a/overnight_report_issue112.md +++ b/overnight_report_issue112.md @@ -7,7 +7,7 @@ ## Environment - Host: `dedicated-developjob-wtl-t1wjo-7c6d5f4d56-qzkfm` -- GPU: 8x `NVIDIA L20X` reported by `nvidia-smi` +- GPU: physical machine confirmed by user as H200; `nvidia-smi` labels devices as `NVIDIA L20X` - CUDA: driver reports CUDA 12.8, driver version 570.172.08 - PyTorch: `torch 2.11.0+cu128` in `.codex-nightly/envs/issue112-py312` - NCCL: `torch.cuda.nccl.version()` reports `(2, 28, 9)` @@ -93,7 +93,7 @@ ## GPU validation results -Current machine is 8x `NVIDIA L20X`, not H200. No H200- or NVLS-specific claim is made. +Physical machine is user-confirmed H200, while `nvidia-smi` labels devices as `NVIDIA L20X`. No NVLS- or H200-specific support claim is made from this label mismatch. 2-rank NCCL ring smoke result: @@ -142,7 +142,7 @@ Current machine is 8x `NVIDIA L20X`, not H200. No H200- or NVLS-specific claim i - `python` is not available on PATH. - System `python3 -m venv` cannot create venvs because `ensurepip` is unavailable; used workspace-local `virtualenv` bootstrap instead. -- Current GPU model reported by `nvidia-smi` is `NVIDIA L20X`, not the H200 model assumed by the original overnight manual. +- Hardware label mismatch: physical machine is user-confirmed H200, while `nvidia-smi` labels devices as `NVIDIA L20X`. - NVLS has not been probed or validated. - DeepSpeed DP gradient synchronization order is not controlled by the new helper yet; the DP gradient smoke uses a tiny local model rather than DeepSpeed internals. From ee8c6c37dfbb985db9d23e35c4f4f928d4a0efd7 Mon Sep 17 00:00:00 2001 From: CyberSecurityErial <2710555967@qq.com> Date: Tue, 23 Jun 2026 01:39:53 +0800 Subject: [PATCH 5/5] refactor(distributed): tighten all-reduce review surface --- docs/distributed/deterministic_allreduce.md | 141 +++++---------- .../deterministic_allreduce_audit.md | 123 +++---------- overnight_report_issue112.md | 162 ----------------- .../distributed/deterministic_allreduce.py | 95 ++++------ .../test_deterministic_allreduce.py | 168 +++++++++++++++--- .../test_dp_gradient_determinism.py | 48 ++++- 6 files changed, 291 insertions(+), 446 deletions(-) delete mode 100644 overnight_report_issue112.md diff --git a/docs/distributed/deterministic_allreduce.md b/docs/distributed/deterministic_allreduce.md index 749b47e..86f3d59 100644 --- a/docs/distributed/deterministic_allreduce.md +++ b/docs/distributed/deterministic_allreduce.md @@ -1,58 +1,42 @@ # Deterministic All-Reduce -RL-Kernel exposes a small deterministic all-reduce helper for WS2 distributed -operator work. The helper is intentionally conservative: it defines an explicit -contract, provides a slow ordered-rank reference path, and keeps the NCCL fast -path opt-in. +RL-Kernel provides a small all-reduce helper for distributed smoke tests and +future WS2 integration work. It has two modes: -## Contract - -`deterministic_all_reduce(tensor, config)` reduces `tensor` in place and returns -the same tensor object. The result is expected to be stable when all of these -inputs are the same: +- `torch_all_reduce`: calls `torch.distributed.all_reduce`. +- `ordered_rank_reference`: gathers all rank tensors, accumulates them on process-group rank 0 in process-group rank order, then broadcasts the result. -- world size; -- global rank order; -- input tensor values; -- dtype; -- reduction op; -- helper mode; -- process-group and backend environment. +The helper reduces the input tensor in place and returns it. -For `op="mean"`, RL-Kernel performs a sum and divides by `world_size` at a -fixed point. Integer tensors are not accepted for `mean`. +## Contract -## Modes +Results are expected to be stable only when the world size, process-group rank +order, inputs, dtype, operation, backend, and environment are unchanged. -### `ordered_rank_fallback` +`op="mean"` performs a sum and divides by world size at a fixed point. Integer +tensors are rejected for `mean`. -This is the slow reference mode. Each rank contributes a tensor through -`torch.distributed.all_gather`. Rank 0 then accumulates the gathered tensors in -ascending global rank order, casts the result back to the original dtype, and -broadcasts it to every rank. +## Ordered-Rank Reference -The operation order is: +`ordered_rank_reference` is a reference path, not a high-performance transport. +It uses `all_gather` and `broadcast`, so the active backend must support those +collectives for the tensor device. The operation order is: 1. make each rank input contiguous; -2. gather rank inputs in global-rank order; -3. on rank 0, accumulate rank `0, 1, ..., world_size - 1`; -4. use FP32 accumulation for floating-point tensors when configured; +2. gather tensors in process-group rank order; +3. accumulate on process-group rank 0 in that order; +4. optionally accumulate floating-point inputs in FP32; 5. divide once for `op="mean"`; -6. broadcast the final tensor from rank 0. - -This path is memory-heavy because every rank receives the gathered input list. It -is intended for small validation tensors, unsupported-hardware fallbacks, and -test or debug oracles. +6. broadcast from process-group rank 0. -### `nccl_ring` +This mode is meant for small tensors in tests, debug runs, and reference +comparisons. -This mode uses `torch.distributed.all_reduce` and optionally the NCCL environment -helper below. It is a fast path, not a blanket promise of bitwise determinism -across all NCCL versions and hardware. Validate it on the target machine before -claiming support. +## Torch All-Reduce -Call `configure_deterministic_nccl_env()` before -`torch.distributed.init_process_group` when using NCCL: +`torch_all_reduce` is a thin wrapper around `torch.distributed.all_reduce`. For +NCCL runs, callers may set best-effort ring settings before process-group +initialization: ```python from rl_engine.distributed import configure_deterministic_nccl_env @@ -60,8 +44,7 @@ from rl_engine.distributed import configure_deterministic_nccl_env configure_deterministic_nccl_env(overwrite=True) ``` -The helper sets these variables when they are unset, or overwrites them when -`overwrite=True`: +The helper writes: ```bash NCCL_ALGO=Ring @@ -70,36 +53,25 @@ NCCL_MIN_NCHANNELS=1 NCCL_MAX_NCHANNELS=1 ``` -If the process group is already initialized, the helper warns because NCCL may -have already consumed its collective configuration. +These settings do not prove bitwise determinism. Validate on the target machine +before making a hardware-specific claim. -## Fallback Behavior +## Behavior -- If `torch.distributed` is unavailable, RL-Kernel raises a clear runtime error. -- If no process group is initialized and `WORLD_SIZE` is unset or `1`, the helper - returns the input tensor unchanged. -- If no process group is initialized and `WORLD_SIZE > 1`, the helper raises a - runtime error. -- If `world_size == 1`, the helper returns the input tensor unchanged. -- `async_op=True` is not implemented for the deterministic helper. +- `world_size == 1`: returns the input tensor unchanged. +- no initialized process group and `WORLD_SIZE <= 1`: returns the input tensor unchanged. +- no initialized process group and `WORLD_SIZE > 1`: raises `RuntimeError`. +- `async_op=True`: raises `NotImplementedError`. ## Smoke Tests -Run the unit checks: +Unit and CPU/Gloo smoke checks: ```bash PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 python -m pytest tests/distributed/test_deterministic_allreduce.py -q ``` -Run the ordered fallback smoke on CPU/Gloo: - -```bash -torchrun --standalone --nproc_per_node=2 \ - tests/distributed/test_deterministic_allreduce.py \ - --backend gloo --mode ordered_rank_fallback --dtype fp32 --device cpu -``` - -Run the NCCL ring smoke on two GPUs: +Manual NCCL all-reduce smoke: ```bash CUDA_VISIBLE_DEVICES=0,1 \ @@ -109,49 +81,20 @@ NCCL_MIN_NCHANNELS=1 \ NCCL_MAX_NCHANNELS=1 \ torchrun --standalone --nproc_per_node=2 \ tests/distributed/test_deterministic_allreduce.py \ - --backend nccl --mode nccl_ring --dtype fp32 --device cuda + --backend nccl --mode torch_all_reduce --dtype fp32 --device cuda ``` -The smoke test prints rank-0 JSON with `max_abs_diff`, `max_rel_diff`, -`mismatch_count`, and `bitwise_equal`. - -## DP Gradient Smoke Test - -The DP gradient smoke test compares a DP=1 baseline gradient on a fixed global -batch against DP=N local gradients reduced with `deterministic_all_reduce(..., -op="mean")`. - -Run the CPU/Gloo fallback test: +DP gradient smoke compares a fixed DP=1 full-batch gradient with DP=N local +gradients reduced by this helper: ```bash torchrun --standalone --nproc_per_node=2 \ tests/distributed/test_dp_gradient_determinism.py \ - --backend gloo --mode ordered_rank_fallback --dtype fp32 --device cpu + --backend gloo --mode ordered_rank_reference --dtype fp32 --device cpu ``` -Run the NCCL ring test on GPUs: - -```bash -CUDA_VISIBLE_DEVICES=0,1 \ -NCCL_ALGO=Ring \ -NCCL_PROTO=Simple \ -NCCL_MIN_NCHANNELS=1 \ -NCCL_MAX_NCHANNELS=1 \ -torchrun --standalone --nproc_per_node=2 \ - tests/distributed/test_dp_gradient_determinism.py \ - --backend nccl --mode nccl_ring --dtype fp32 --device cuda -``` - -The DP smoke reports aggregate and per-parameter `max_abs_diff`, -`max_rel_diff`, `mismatch_count`, and `bitwise_equal` fields. The DP=N path is -expected to match the DP=1 baseline within tolerance; it is not required to be -bitwise equal because the model backward pass and cross-rank gradient averaging -use a different arithmetic grouping than the single-rank full-batch backward. - -## Current Limitations +## Limitations -- NVLS / NVLink-Sharp is not claimed by this helper. It needs a separate probe - with NCCL logs that prove NVLS was used. -- Multi-node and RDMA behavior are not validated by this document. -- DeepSpeed gradient synchronization order is not controlled by this helper - until a tested integration point is added. +- NVLS / NVLink-Sharp is not implemented or claimed here. +- Multi-node and RDMA behavior are not validated here. +- DeepSpeed gradient synchronization is not controlled by this helper yet. diff --git a/docs/distributed/deterministic_allreduce_audit.md b/docs/distributed/deterministic_allreduce_audit.md index c1fdd1f..8ca9edc 100644 --- a/docs/distributed/deterministic_allreduce_audit.md +++ b/docs/distributed/deterministic_allreduce_audit.md @@ -1,117 +1,46 @@ # Deterministic All-Reduce Audit -This document audits the current RL-Kernel repository for issue -[RL-Align/RL-Kernel#112](https://github.com/RL-Align/RL-Kernel/issues/112): -deterministic NCCL all-reduce, including DP gradient all-reduce. +This audit records the distributed communication points relevant to +[RL-Align/RL-Kernel#112](https://github.com/RL-Align/RL-Kernel/issues/112). -Issue #112 is part of the WS2 train-inference consistency roadmap in -[RL-Align/RL-Kernel#83](https://github.com/RL-Align/RL-Kernel/issues/83). The -current repository does not expose a direct deterministic all-reduce API yet, so -the first implementation should add one as the intended entry point rather than -claiming that existing distributed paths are already deterministic. - -## Audit Commands - -The initial audit used these repository-local searches: +## Search ```bash rg -n "all_reduce|allreduce|reduce_scatter|all_gather|DistributedDataParallel|FSDP|deepspeed|gradient" \ rl_engine csrc tests examples benchmarks scripts docs .github rg -n "torch\.distributed|distributed|dist\.|process_group|ProcessGroup|nccl|NCCL|reduce|all_reduce|reduce_scatter|all_gather|gradient" \ rl_engine csrc tests examples benchmarks scripts docs .github -rg --files rl_engine csrc tests examples benchmarks scripts docs .github ``` ## Summary -No direct `torch.distributed` collective call sites were found in the audited -paths. In particular, no direct calls to `all_reduce`, `reduce_scatter`, -`all_gather`, `DistributedDataParallel`, or `FSDP` were found. - -The only training-side path that can imply DP gradient synchronization today is -the optional DeepSpeed worker. That synchronization, when present, is owned by -the DeepSpeed engine called from RL-Kernel, not by an RL-Kernel collective helper. - -The repository also contains CUDA IPC uses of PyTorch's `reduce_tensor`, but -those calls serialize CUDA IPC handles for same-node weight handoff. They are -not collective reductions and do not implement NCCL all-reduce. - -## Call Site Inventory - -| Location | Kind | Train-inference consistency impact | In scope for #112 | Proposed deterministic handling | -| --- | --- | --- | --- | --- | -| `rl_engine/executors/deepspeed_trainer.py` `DeepSpeedTrainingWorker.train` calls `self.engine.backward(loss)` and `self.engine.step()` | Backward / optimizer path; possible DP gradient synchronization inside DeepSpeed | Yes, when the DeepSpeed runtime is configured for DP > 1 | Yes, as the current DP-gradient-adjacent training path | Add an RL-Kernel deterministic all-reduce helper first. For DeepSpeed, do not claim ordering control until a tested integration point exists. A focused DP gradient smoke test should compare DP=1 gradients against DP=N reduced gradients under a fixed global batch and fixed seed. | -| `rl_engine/executors/deepspeed_trainer.py` `deepspeed.initialize(...)` with `zero_optimization` and `gradient_accumulation_steps` config | Optional distributed training runtime setup | Indirect | Yes, because DeepSpeed may perform gradient communication after initialization | Document that DeepSpeed communication order is not currently controlled by RL-Kernel. Keep fallback behavior explicit when DeepSpeed is missing. | -| `tests/test_deepspeed_training_worker.py` fake DeepSpeed engine tests | Unit tests for the DeepSpeed worker contract | No direct collective behavior | Adjacent only | Existing tests prove the worker delegates to `engine.backward` and `engine.step`, but they do not validate distributed gradient ordering. Add new distributed smoke tests separately instead of extending these fake-engine tests into false coverage. | -| `rl_engine/executors/bridge.py` `VLLMIPCWeightUpdateRequestBuilder._resolve_reduce_tensor` and `IPCWeightBridge._resolve_reduce_tensor` | CUDA IPC handle serialization via `torch.multiprocessing.reductions.reduce_tensor` | No collective reduction; affects same-node weight handoff only | No | Keep out of deterministic all-reduce scope. Do not rename or classify this as all-reduce. | -| `rl_engine/executors/bridge.py` `WeightLayout.validate_supported` and `make_weight_bridge` reject multi-node/RDMA/NCCL transports | Explicit unsupported transport guards | Prevents unsupported distributed weight transport from silently succeeding | Adjacent only | Preserve the explicit blockers. If a future NCCL/RDMA transport is added, it needs its own deterministic contract and validation. | -| `docs/usage/weight-sync-bridge.md` mentions vLLM IPC or NCCL public update APIs and RDMA/NCCL as not implemented | User-facing documentation | Adjacent; weight sync rather than gradient all-reduce | Adjacent only | Keep the documentation clear that current weight bridge transports are same-node or local fallbacks. Do not cite this as NCCL all-reduce validation. | -| `rl_engine/utils/logger.py` `info_on_rank` uses `device_ctx.rank` | Rank-filtered logging utility | No numeric impact | No | No deterministic collective handling needed. | - -## Direct Collectives - -None found. - -Because the repository currently has no direct collective entry point, the next -implementation should introduce a small, explicit API for deterministic -all-reduce. That API can become the reference path for future WS2 distributed -ops and the test oracle for DP gradient synchronization experiments. +No direct `torch.distributed` all-reduce, reduce-scatter, all-gather, DDP, or +FSDP call sites were found in RL-Kernel source code. The current DP-gradient +communication risk is indirect: `DeepSpeedTrainingWorker` delegates backward and +optimizer behavior to the optional DeepSpeed engine. -## Determinism Contract To Implement - -A deterministic all-reduce mode should define stability only under explicit -conditions: - -- same world size; -- same global rank order; -- same input tensors; -- same dtype; -- same backend mode; -- same environment and process-group configuration. - -For a slow reference path, the concrete operation order should be: - -1. gather tensors to rank 0 in ascending global rank order; -2. accumulate on rank 0 in rank order `0, 1, ..., world_size - 1`; -3. use FP32 accumulation when configured; -4. apply `sum` or `mean` at a fixed point; -5. cast back to the original dtype when needed; -6. broadcast the final tensor from rank 0 to all ranks. - -The NCCL fast path should be opt-in and should not promise cross-version or -cross-hardware bitwise determinism unless the validation proves it. A helper may -set these environment variables before process-group initialization: - -```bash -NCCL_ALGO=Ring -NCCL_PROTO=Simple -NCCL_MIN_NCHANNELS=1 -NCCL_MAX_NCHANNELS=1 -``` +CUDA IPC uses of `torch.multiprocessing.reductions.reduce_tensor` are not +collective reductions. They serialize CUDA IPC handles for same-node weight +handoff. -## Unsupported Or Not Yet Covered +## Inventory -- NVLink-Sharp / NVLS deterministic reduce is not implemented or validated in - the repository. -- Multi-node/RDMA weight transport is explicitly unsupported by the current - weight bridge. -- DeepSpeed DP gradient communication order is not currently controlled by - RL-Kernel. -- The current audit environment has CUDA driver visibility and an isolated - PyTorch/NCCL venv under `.codex-nightly/envs/issue112-py312`, but distributed - GPU validation has not been run yet. -- No claim is made for multi-node NCCL, RDMA, FSDP, DDP, or DeepSpeed internals. +| Location | Kind | In scope for #112 | Handling | +| --- | --- | --- | --- | +| `rl_engine/executors/deepspeed_trainer.py` `DeepSpeedTrainingWorker.train` | Backward / optimizer delegation to DeepSpeed | Yes, indirectly | Do not claim control over DeepSpeed communication order until a tested integration point exists. | +| `rl_engine/executors/deepspeed_trainer.py` `deepspeed.initialize(...)` | Optional distributed runtime setup | Yes, indirectly | Keep missing-DeepSpeed behavior explicit. Any future integration must document the DeepSpeed hook used for gradient reduction. | +| `tests/test_deepspeed_training_worker.py` fake engine tests | Unit tests for worker delegation | Adjacent | These tests prove delegation only; they do not validate distributed gradient ordering. | +| `rl_engine/executors/bridge.py` CUDA IPC `reduce_tensor` use | CUDA IPC handle serialization | No | Keep out of all-reduce scope. | +| `rl_engine/executors/bridge.py` multi-node/RDMA/NCCL transport blockers | Unsupported weight transport guards | Adjacent | Preserve explicit blockers until a tested transport exists. | +| `rl_engine/utils/logger.py` `info_on_rank` | Rank-filtered logging | No | No numeric reduction behavior. | -## Next Reviewable Step +## Entry Point -Add `rl_engine.distributed.deterministic_allreduce` with two modes: +New distributed code should route through `rl_engine.distributed` so the +all-reduce contract and fallback/reference behavior stay testable in one place. -- `ordered_rank_fallback`: explicit gather, rank-ordered accumulation on rank 0, - and broadcast; -- `nccl_ring`: ordinary `torch.distributed.all_reduce` after the caller has - opted into deterministic NCCL environment configuration before process-group - initialization. +## Not Covered -The first tests should be small distributed smoke tests that can run with -`torchrun` on CPU/Gloo for fallback behavior and on CUDA/NCCL when the runtime is -available. +- NVLS / NVLink-Sharp. +- Multi-node or RDMA collectives. +- DeepSpeed internal gradient synchronization order. diff --git a/overnight_report_issue112.md b/overnight_report_issue112.md deleted file mode 100644 index 50222bd..0000000 --- a/overnight_report_issue112.md +++ /dev/null @@ -1,162 +0,0 @@ -# Overnight Report - RL-Kernel Issue #112 - -## Current branch - -`cse/issue-112-deterministic-nccl` - -## Environment - -- Host: `dedicated-developjob-wtl-t1wjo-7c6d5f4d56-qzkfm` -- GPU: physical machine confirmed by user as H200; `nvidia-smi` labels devices as `NVIDIA L20X` -- CUDA: driver reports CUDA 12.8, driver version 570.172.08 -- PyTorch: `torch 2.11.0+cu128` in `.codex-nightly/envs/issue112-py312` -- NCCL: `torch.cuda.nccl.version()` reports `(2, 28, 9)` -- Python: system `python3` is 3.12.3; isolated venv is `.codex-nightly/envs/issue112-py312`; `python` is not on PATH -- Commit base: `a302be4593bc1715688558a7eb3d3704bf625c4d` - -## Work completed - -- Re-checked issue #112 and roadmap issue #83. -- Created local working branch `cse/issue-112-deterministic-nccl`. -- Audited repository call sites for all-reduce, reduce-scatter, all-gather, - DDP/FSDP, DeepSpeed, NCCL, and gradient synchronization keywords. -- Added `docs/distributed/deterministic_allreduce_audit.md`. -- Created an isolated workspace-local Python environment under `.codex-nightly/envs/issue112-py312`. -- Installed PyTorch CUDA 12.8 and RL-Kernel dev/docs dependencies into that isolated environment. -- Added `rl_engine.distributed.deterministic_allreduce` with: - - `configure_deterministic_nccl_env()`; - - `DeterministicAllReduceConfig`; - - `nccl_ring` fast path using `torch.distributed.all_reduce`; - - `ordered_rank_fallback` using all-gather, rank-ordered accumulation on rank 0, and broadcast. -- Added user-facing docs for deterministic all-reduce modes and fallback behavior. -- Added a torchrun-compatible distributed all-reduce smoke test. -- Added a DP gradient fixed-step smoke test comparing a DP=1 full-batch baseline against DP=N local gradients reduced with the deterministic all-reduce helper. -- Created local PR body drafts `.pr_body_issue112_pr1.md`, `.pr_body_issue112_pr2.md`, and `.pr_body_issue112_pr3.md`. -- Generated patch artifacts under `.codex-nightly/artifacts`. - -## Commits created - -- `2bccb64 docs(distributed): audit all-reduce call sites for issue 112` -- `feat(distributed): add deterministic all-reduce helper` -- `test(distributed): compare DP gradients against single-rank baseline` - -## Files changed - -- `docs/.nav.yml` -- `docs/distributed/deterministic_allreduce_audit.md` -- `docs/distributed/deterministic_allreduce.md` -- `rl_engine/distributed/__init__.py` -- `rl_engine/distributed/deterministic_allreduce.py` -- `tests/distributed/test_deterministic_allreduce.py` -- `tests/distributed/test_dp_gradient_determinism.py` -- `overnight_report_issue112.md` - -## Tests run - -- `git diff --check` -- `.codex-nightly/envs/issue112-py312/bin/python -c "import torch; ..."` -- `.codex-nightly/envs/issue112-py312/bin/black --check --line-length 100 rl_engine/distributed tests/distributed/test_deterministic_allreduce.py` -- `.codex-nightly/envs/issue112-py312/bin/isort --check-only --profile black --line-length 100 rl_engine/distributed tests/distributed/test_deterministic_allreduce.py` -- `.codex-nightly/envs/issue112-py312/bin/flake8 --max-line-length=100 --extend-ignore=E203,E704 rl_engine/distributed tests/distributed/test_deterministic_allreduce.py` -- `.codex-nightly/envs/issue112-py312/bin/ruff check rl_engine/distributed tests/distributed/test_deterministic_allreduce.py` -- `.codex-nightly/envs/issue112-py312/bin/python -m mypy --ignore-missing-imports rl_engine/distributed` -- `PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 .codex-nightly/envs/issue112-py312/bin/python -m pytest tests/distributed/test_deterministic_allreduce.py -q` -- `.codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=2 tests/distributed/test_deterministic_allreduce.py --backend gloo --mode ordered_rank_fallback --dtype fp32 --device cpu` -- `CUDA_VISIBLE_DEVICES=0,1 NCCL_ALGO=Ring NCCL_PROTO=Simple NCCL_MIN_NCHANNELS=1 NCCL_MAX_NCHANNELS=1 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=2 tests/distributed/test_deterministic_allreduce.py --backend nccl --mode nccl_ring --dtype fp32 --device cuda` -- `CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 NCCL_ALGO=Ring NCCL_PROTO=Simple NCCL_MIN_NCHANNELS=1 NCCL_MAX_NCHANNELS=1 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=8 tests/distributed/test_deterministic_allreduce.py --backend nccl --mode nccl_ring --dtype fp32 --device cuda` -- `CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=8 tests/distributed/test_deterministic_allreduce.py --backend nccl --mode ordered_rank_fallback --dtype fp32 --device cuda` -- `.codex-nightly/envs/issue112-py312/bin/mkdocs build --strict -f mkdocs.yaml` -- `PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 .codex-nightly/envs/issue112-py312/bin/python -m pytest rl_engine/tests/test_dispatch.py -v` -- `PYTEST_DISABLE_PLUGIN_AUTOLOAD=1 .codex-nightly/envs/issue112-py312/bin/python -m pytest tests/distributed/test_dp_gradient_determinism.py -q` -- `.codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=2 tests/distributed/test_dp_gradient_determinism.py --backend gloo --mode ordered_rank_fallback --dtype fp32 --device cpu` -- `CUDA_VISIBLE_DEVICES=0,1 NCCL_ALGO=Ring NCCL_PROTO=Simple NCCL_MIN_NCHANNELS=1 NCCL_MAX_NCHANNELS=1 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=2 tests/distributed/test_dp_gradient_determinism.py --backend nccl --mode nccl_ring --dtype fp32 --device cuda` -- `CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 NCCL_ALGO=Ring NCCL_PROTO=Simple NCCL_MIN_NCHANNELS=1 NCCL_MAX_NCHANNELS=1 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=8 tests/distributed/test_dp_gradient_determinism.py --backend nccl --mode nccl_ring --dtype fp32 --device cuda` -- `CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 .codex-nightly/envs/issue112-py312/bin/torchrun --standalone --nproc_per_node=8 tests/distributed/test_dp_gradient_determinism.py --backend nccl --mode ordered_rank_fallback --dtype fp32 --device cuda` - -## Test results - -- `git diff --check` passed. -- PyTorch environment check passed: CUDA available, 8 devices visible, NCCL available. -- black, isort, flake8, ruff, and mypy passed for the new Python files. -- `tests/distributed/test_deterministic_allreduce.py`: 3 passed. -- 2-rank CPU/Gloo ordered fallback smoke passed with bitwise equality. -- 2-rank CUDA/NCCL `nccl_ring` smoke passed with bitwise equality against the ordered fallback oracle. -- 8-rank CUDA/NCCL `nccl_ring` smoke passed within tolerance against the ordered fallback oracle; it was not bitwise equal. -- 8-rank CUDA/NCCL ordered fallback smoke passed with bitwise equality. -- `mkdocs build --strict -f mkdocs.yaml` passed. It emitted expected git revision-date warnings for new uncommitted docs. -- `rl_engine/tests/test_dispatch.py`: 3 passed. -- DP gradient unit test: 1 passed. -- 2-rank CPU/Gloo DP gradient ordered fallback smoke passed against DP=1 baseline. -- 2-rank CUDA/NCCL DP gradient `nccl_ring` smoke passed against DP=1 baseline. -- 8-rank CUDA/NCCL DP gradient `nccl_ring` smoke passed against DP=1 baseline. -- 8-rank CUDA/NCCL DP gradient ordered fallback smoke passed against DP=1 baseline. - -## GPU validation results - -Physical machine is user-confirmed H200, while `nvidia-smi` labels devices as `NVIDIA L20X`. No NVLS- or H200-specific support claim is made from this label mismatch. - -2-rank NCCL ring smoke result: - -```json -{"backend":"nccl","bitwise_equal":true,"device":"cuda:0","dtype":"fp32","iterations":3,"max_abs_diff":0.0,"max_rel_diff":0.0,"mismatch_count":0,"mode":"nccl_ring","op":"sum","status":"pass","world_size":2} -``` - -8-rank NCCL ring smoke result: - -```json -{"backend":"nccl","bitwise_equal":false,"device":"cuda:0","dtype":"fp32","iterations":3,"max_abs_diff":4.76837158203125e-07,"max_rel_diff":3.2424927098873013e-07,"mismatch_count":62,"mode":"nccl_ring","op":"sum","status":"pass","world_size":8} -``` - -8-rank ordered fallback smoke result: - -```json -{"backend":"nccl","bitwise_equal":true,"device":"cuda:0","dtype":"fp32","iterations":3,"max_abs_diff":0.0,"max_rel_diff":0.0,"mismatch_count":0,"mode":"ordered_rank_fallback","op":"sum","status":"pass","world_size":8} -``` - -8-rank DP gradient NCCL ring smoke result: - -```json -{"backend":"nccl","bitwise_equal":false,"device":"cuda:0","dtype":"fp32","global_batch_size":16,"max_abs_diff":5.960464477539063e-08,"max_rel_diff":6.11946063600044e-07,"mismatch_count":81,"mode":"nccl_ring","status":"pass","world_size":8} -``` - -8-rank DP gradient ordered fallback smoke result: - -```json -{"backend":"nccl","bitwise_equal":false,"device":"cuda:0","dtype":"fp32","global_batch_size":16,"max_abs_diff":2.9802322387695312e-08,"max_rel_diff":3.059730261156801e-06,"mismatch_count":74,"mode":"ordered_rank_fallback","status":"pass","world_size":8} -``` - -## PR split recommendation - -- PR 1: audit and deterministic all-reduce contract documentation. -- PR 2: deterministic all-reduce helper with ordered rank fallback and smoke tests. -- PR 3: split DP gradient fixed-step comparison if maintainers prefer it separate from the helper. -- PR 4: NVLS/NVLink-Sharp probe and documentation only if hardware and logs prove it. - -## PR body files created - -- `.pr_body_issue112_pr1.md` -- `.pr_body_issue112_pr2.md` -- `.pr_body_issue112_pr3.md` - -## Blockers - -- `python` is not available on PATH. -- System `python3 -m venv` cannot create venvs because `ensurepip` is unavailable; used workspace-local `virtualenv` bootstrap instead. -- Hardware label mismatch: physical machine is user-confirmed H200, while `nvidia-smi` labels devices as `NVIDIA L20X`. -- NVLS has not been probed or validated. -- DeepSpeed DP gradient synchronization order is not controlled by the new helper yet; the DP gradient smoke uses a tiny local model rather than DeepSpeed internals. - -## Unsafe operations skipped - -- No push attempted. -- No system dependency installation attempted; all Python dependencies were installed under `.codex-nightly/`. -- No upstream branch or PR creation attempted. -- No sudo used. - -## Remaining work - -- Probe NVLS only if the current hardware/software setup clearly supports it. - -## Suggested next Codex prompt - -Continue issue #112 from `overnight_report_issue112.md`. Prioritize PR body drafts, patch artifact generation, and optional NVLS probing only if the hardware/software logs clearly prove support. diff --git a/rl_engine/distributed/deterministic_allreduce.py b/rl_engine/distributed/deterministic_allreduce.py index 020ccee..6e2de84 100644 --- a/rl_engine/distributed/deterministic_allreduce.py +++ b/rl_engine/distributed/deterministic_allreduce.py @@ -21,9 +21,9 @@ @dataclass(frozen=True) class DeterministicAllReduceConfig: - """Configuration for :func:`deterministic_all_reduce`.""" + """Options for :func:`deterministic_all_reduce`.""" - mode: Literal["nccl_ring", "ordered_rank_fallback"] = "nccl_ring" + mode: Literal["torch_all_reduce", "ordered_rank_reference"] = "torch_all_reduce" op: Literal["sum", "mean"] = "sum" force_fp32_accumulation: bool = True async_op: bool = False @@ -31,21 +31,11 @@ class DeterministicAllReduceConfig: def configure_deterministic_nccl_env(*, overwrite: bool = False) -> dict[str, Optional[str]]: - """Configure the opt-in NCCL single-ring fast path environment. - - NCCL reads these variables during process-group initialization, so callers - should run this helper before ``torch.distributed.init_process_group``. The - helper returns the previous values so callers can log or restore them. - - Existing environment values are preserved by default. Pass - ``overwrite=True`` to force the RL-Kernel deterministic NCCL settings. - """ + """Set best-effort NCCL ring settings before process-group init.""" if dist.is_available() and dist.is_initialized(): warnings.warn( - "configure_deterministic_nccl_env() was called after " - "torch.distributed was initialized; NCCL may have already read its " - "collective configuration.", + "NCCL environment was configured after torch.distributed initialization", RuntimeWarning, stacklevel=2, ) @@ -55,10 +45,10 @@ def configure_deterministic_nccl_env(*, overwrite: bool = False) -> dict[str, Op previous[key] = os.environ.get(key) if overwrite or key not in os.environ: os.environ[key] = value - elif os.environ[key] != value: + continue + if os.environ[key] != value: warnings.warn( - f"{key} is already set to {os.environ[key]!r}; leaving it unchanged. " - f"Pass overwrite=True to set {value!r}.", + f"{key} is {os.environ[key]!r}; expected {value!r}", RuntimeWarning, stacklevel=2, ) @@ -69,58 +59,41 @@ def deterministic_all_reduce( tensor: torch.Tensor, config: Optional[DeterministicAllReduceConfig] = None, ) -> torch.Tensor: - """Reduce ``tensor`` in place and return it. - - ``mode="nccl_ring"`` uses ``torch.distributed.all_reduce``. Call - :func:`configure_deterministic_nccl_env` before process-group initialization - when using NCCL and single-ring/single-channel behavior is desired. - - ``mode="ordered_rank_fallback"`` gathers rank inputs, accumulates them on - rank 0 in ascending global rank order, and broadcasts the result. This path - is slow and memory-heavy, but it gives a concrete reference order for smoke - tests and unsupported hardware fallbacks. - """ + """Reduce ``tensor`` in place and return it.""" cfg = config or DeterministicAllReduceConfig() - _validate_config(tensor, cfg) + _validate(tensor, cfg) if cfg.async_op: - raise NotImplementedError("deterministic_all_reduce currently requires async_op=False") - + raise NotImplementedError("async deterministic all-reduce is not implemented") if not dist.is_available(): - raise RuntimeError("torch.distributed is unavailable in this PyTorch build") - + raise RuntimeError("torch.distributed is unavailable") if not dist.is_initialized(): if int(os.environ.get("WORLD_SIZE", "1")) > 1: - raise RuntimeError( - "torch.distributed is not initialized, but WORLD_SIZE indicates a " - "multi-rank launch" - ) + raise RuntimeError("torch.distributed is not initialized") return tensor world_size = dist.get_world_size(group=cfg.group) if world_size == 1: return tensor - if cfg.mode == "nccl_ring": - return _all_reduce_fast_path(tensor, cfg, world_size) - if cfg.mode == "ordered_rank_fallback": - return _ordered_rank_fallback(tensor, cfg, world_size) - raise ValueError(f"unsupported deterministic all-reduce mode: {cfg.mode!r}") + if cfg.mode == "torch_all_reduce": + return _torch_all_reduce(tensor, cfg, world_size) + return _ordered_rank_reference(tensor, cfg, world_size) -def _validate_config(tensor: torch.Tensor, cfg: DeterministicAllReduceConfig) -> None: +def _validate(tensor: torch.Tensor, cfg: DeterministicAllReduceConfig) -> None: if not isinstance(tensor, torch.Tensor): raise TypeError(f"tensor must be a torch.Tensor, got {type(tensor)!r}") + if cfg.mode not in {"torch_all_reduce", "ordered_rank_reference"}: + raise ValueError(f"unsupported all-reduce mode: {cfg.mode!r}") if cfg.op not in {"sum", "mean"}: raise ValueError(f"unsupported reduction op: {cfg.op!r}") - if cfg.mode not in {"nccl_ring", "ordered_rank_fallback"}: - raise ValueError(f"unsupported deterministic all-reduce mode: {cfg.mode!r}") if cfg.op == "mean" and not (tensor.is_floating_point() or tensor.is_complex()): raise TypeError("op='mean' requires a floating-point or complex tensor") -def _all_reduce_fast_path( +def _torch_all_reduce( tensor: torch.Tensor, cfg: DeterministicAllReduceConfig, world_size: int, @@ -131,7 +104,7 @@ def _all_reduce_fast_path( return tensor -def _ordered_rank_fallback( +def _ordered_rank_reference( tensor: torch.Tensor, cfg: DeterministicAllReduceConfig, world_size: int, @@ -140,24 +113,34 @@ def _ordered_rank_fallback( gathered = [torch.empty_like(send) for _ in range(world_size)] dist.all_gather(gathered, send, group=cfg.group) - rank = dist.get_rank(group=cfg.group) result = torch.empty_like(send) - if rank == 0: - accumulation_dtype = _accumulation_dtype(send, cfg.force_fp32_accumulation) - reduced = gathered[0].to(dtype=accumulation_dtype) - for rank_tensor in gathered[1:]: - reduced.add_(rank_tensor.to(dtype=accumulation_dtype)) + if dist.get_rank(group=cfg.group) == 0: + dtype = _accumulation_dtype(send, cfg.force_fp32_accumulation) + reduced = gathered[0].to(dtype=dtype) + for item in gathered[1:]: + reduced.add_(item.to(dtype=dtype)) if cfg.op == "mean": reduced.div_(world_size) result.copy_(reduced.to(dtype=send.dtype)) - dist.broadcast(result, src=0, group=cfg.group) + dist.broadcast(result, src=_group_root_global_rank(cfg.group), group=cfg.group) tensor.copy_(result.view_as(tensor)) return tensor -def _accumulation_dtype(tensor: torch.Tensor, force_fp32_accumulation: bool) -> torch.dtype: - if not force_fp32_accumulation or not tensor.is_floating_point(): +def _group_root_global_rank(group: Optional[dist.ProcessGroup]) -> int: + if group is None: + return 0 + try: + return int(dist.get_global_rank(group, 0)) + except AttributeError as exc: + raise RuntimeError( + "custom process groups require torch.distributed.get_global_rank" + ) from exc + + +def _accumulation_dtype(tensor: torch.Tensor, force_fp32: bool) -> torch.dtype: + if not force_fp32 or not tensor.is_floating_point(): return tensor.dtype if tensor.dtype == torch.float64: return torch.float64 diff --git a/tests/distributed/test_deterministic_allreduce.py b/tests/distributed/test_deterministic_allreduce.py index db3d794..9a4c71b 100644 --- a/tests/distributed/test_deterministic_allreduce.py +++ b/tests/distributed/test_deterministic_allreduce.py @@ -6,6 +6,9 @@ import argparse import json import os +import subprocess +import sys +from pathlib import Path from typing import Any import pytest @@ -44,20 +47,101 @@ def test_single_process_without_process_group_is_noop(): reduced = deterministic_all_reduce( tensor, - DeterministicAllReduceConfig(mode="ordered_rank_fallback", op="mean"), + DeterministicAllReduceConfig(mode="ordered_rank_reference", op="mean"), ) assert reduced is tensor assert torch.equal(tensor, torch.tensor([1.0, 2.0, 3.0])) +def test_ordered_rank_reference_gloo_smoke_runs_under_torchrun(): + if not (dist.is_available() and dist.is_gloo_available()): + pytest.skip("Gloo is unavailable") + + repo = Path(__file__).resolve().parents[2] + env = os.environ.copy() + env["PYTHONPATH"] = f"{repo}{os.pathsep}{env.get('PYTHONPATH', '')}" + cmd = [ + sys.executable, + "-m", + "torch.distributed.run", + "--standalone", + "--nproc_per_node=2", + str(Path(__file__).resolve()), + "--backend", + "gloo", + "--mode", + "ordered_rank_reference", + "--dtype", + "fp32", + "--device", + "cpu", + "--iterations", + "2", + ] + completed = subprocess.run( + cmd, + cwd=repo, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + timeout=120, + check=False, + ) + + assert completed.returncode == 0, completed.stdout + assert '"status": "pass"' in completed.stdout + + +def test_ordered_rank_reference_reverse_group_runs_under_torchrun(): + if not (dist.is_available() and dist.is_gloo_available()): + pytest.skip("Gloo is unavailable") + + repo = Path(__file__).resolve().parents[2] + env = os.environ.copy() + env["PYTHONPATH"] = f"{repo}{os.pathsep}{env.get('PYTHONPATH', '')}" + cmd = [ + sys.executable, + "-m", + "torch.distributed.run", + "--standalone", + "--nproc_per_node=2", + str(Path(__file__).resolve()), + "--backend", + "gloo", + "--mode", + "ordered_rank_reference", + "--dtype", + "fp32", + "--device", + "cpu", + "--iterations", + "2", + "--reverse-group", + ] + completed = subprocess.run( + cmd, + cwd=repo, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + timeout=120, + check=False, + ) + + assert completed.returncode == 0, completed.stdout + assert '"status": "pass"' in completed.stdout + + def _parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Deterministic all-reduce smoke test") + parser = argparse.ArgumentParser(description="all-reduce smoke test") parser.add_argument("--backend", choices=("gloo", "nccl"), default="gloo") parser.add_argument( "--mode", - choices=("ordered_rank_fallback", "nccl_ring"), - default="ordered_rank_fallback", + choices=("ordered_rank_reference", "torch_all_reduce"), + default="ordered_rank_reference", ) parser.add_argument("--op", choices=("sum", "mean"), default="sum") parser.add_argument("--dtype", choices=("fp32", "fp16", "bf16"), default="fp32") @@ -65,6 +149,7 @@ def _parse_args() -> argparse.Namespace: parser.add_argument("--numel", type=int, default=257) parser.add_argument("--iterations", type=int, default=3) parser.add_argument("--configure-nccl-env", action="store_true") + parser.add_argument("--reverse-group", action="store_true") parser.add_argument("--rtol", type=float, default=None) parser.add_argument("--atol", type=float, default=None) return parser.parse_args() @@ -78,7 +163,7 @@ def _device(args: argparse.Namespace) -> torch.device: if args.device == "cpu" or args.backend == "gloo": return torch.device("cpu") if not torch.cuda.is_available(): - raise RuntimeError("CUDA device requested but torch.cuda.is_available() is false") + raise RuntimeError("CUDA requested but unavailable") local_rank = int(os.environ.get("LOCAL_RANK", "0")) torch.cuda.set_device(local_rank) return torch.device("cuda", local_rank) @@ -87,15 +172,37 @@ def _device(args: argparse.Namespace) -> torch.device: def _make_input(rank: int, dtype: torch.dtype, device: torch.device, numel: int) -> torch.Tensor: base = torch.arange(numel, dtype=torch.float32, device=device) values = ((base % 17) - 8.0) / 17.0 - values = values + (rank + 1) * 0.03125 - return values.to(dtype=dtype) + return (values + (rank + 1) * 0.03125).to(dtype=dtype) + + +def _group_rank_order(group: dist.ProcessGroup | None, device: torch.device) -> list[int]: + rank = torch.tensor([dist.get_rank()], dtype=torch.int64, device=device) + gathered = [torch.empty_like(rank) for _ in range(dist.get_world_size(group=group))] + dist.all_gather(gathered, rank, group=group) + return [int(item.item()) for item in gathered] + + +def _expected_reduce( + rank_order: list[int], + dtype: torch.dtype, + device: torch.device, + numel: int, + op: str, +) -> torch.Tensor: + acc_dtype = torch.float32 if dtype != torch.float64 else torch.float64 + reduced = _make_input(rank_order[0], dtype, device, numel).to(dtype=acc_dtype) + for rank in rank_order[1:]: + reduced.add_(_make_input(rank, dtype, device, numel).to(dtype=acc_dtype)) + if op == "mean": + reduced.div_(len(rank_order)) + return reduced.to(dtype=dtype) def _tolerances(dtype: torch.dtype, args: argparse.Namespace) -> tuple[float, float]: if args.atol is not None and args.rtol is not None: return args.atol, args.rtol if dtype == torch.float32: - return 0.0 if args.mode == "ordered_rank_fallback" else 1.0e-6, 0.0 + return (0.0, 0.0) if args.mode == "ordered_rank_reference" else (1.0e-6, 0.0) if dtype == torch.bfloat16: return 8.0e-3, 8.0e-3 return 2.0e-3, 2.0e-3 @@ -105,8 +212,7 @@ def _diff_stats(actual: torch.Tensor, expected: torch.Tensor) -> dict[str, Any]: actual_f32 = actual.detach().to(torch.float32).cpu() expected_f32 = expected.detach().to(torch.float32).cpu() diff = (actual_f32 - expected_f32).abs() - denom = expected_f32.abs().clamp_min(1.0e-12) - rel = diff / denom + rel = diff / expected_f32.abs().clamp_min(1.0e-12) return { "bitwise_equal": bool(torch.equal(actual.detach().cpu(), expected.detach().cpu())), "max_abs_diff": float(diff.max().item()), @@ -118,46 +224,50 @@ def _diff_stats(actual: torch.Tensor, expected: torch.Tensor) -> dict[str, Any]: def _assert_close(actual: torch.Tensor, expected: torch.Tensor, atol: float, rtol: float) -> None: if not torch.allclose(actual, expected, atol=atol, rtol=rtol): stats = _diff_stats(actual, expected) - raise AssertionError( - "deterministic all-reduce mismatch: " - f"max_abs_diff={stats['max_abs_diff']} " - f"max_rel_diff={stats['max_rel_diff']} " - f"mismatch_count={stats['mismatch_count']}" - ) + raise AssertionError(f"all-reduce mismatch: {stats}") def _run_distributed_smoke(args: argparse.Namespace) -> None: - if args.configure_nccl_env or (args.backend == "nccl" and args.mode == "nccl_ring"): + if args.configure_nccl_env or (args.backend == "nccl" and args.mode == "torch_all_reduce"): configure_deterministic_nccl_env() + device = _device(args) dist.init_process_group(backend=args.backend) try: rank = dist.get_rank() world_size = dist.get_world_size() + group = None + if args.reverse_group: + group = dist.new_group(ranks=list(reversed(range(world_size)))) + dtype = _dtype(args.dtype) atol, rtol = _tolerances(dtype, args) + rank_order = _group_rank_order(group, device) + expected = _expected_reduce(rank_order, dtype, device, args.numel, args.op) previous: torch.Tensor | None = None - final_stats: dict[str, Any] = {} + stats: dict[str, Any] = {} for _ in range(args.iterations): - original = _make_input(rank, dtype, device, args.numel) - candidate = original.clone() - reference = original.clone() - + candidate = _make_input(rank, dtype, device, args.numel) + reference = candidate.clone() deterministic_all_reduce( candidate, - DeterministicAllReduceConfig(mode=args.mode, op=args.op), + DeterministicAllReduceConfig(mode=args.mode, op=args.op, group=group), ) deterministic_all_reduce( reference, - DeterministicAllReduceConfig(mode="ordered_rank_fallback", op=args.op), + DeterministicAllReduceConfig( + mode="ordered_rank_reference", + op=args.op, + group=group, + ), ) - - _assert_close(candidate, reference, atol=atol, rtol=rtol) + _assert_close(candidate, expected, atol, rtol) + _assert_close(reference, expected, atol, rtol) if previous is not None: - _assert_close(candidate, previous, atol=atol, rtol=rtol) + _assert_close(candidate, previous, atol, rtol) previous = candidate.clone() - final_stats = _diff_stats(candidate, reference) + stats = _diff_stats(candidate, expected) if rank == 0: print( @@ -171,7 +281,7 @@ def _run_distributed_smoke(args: argparse.Namespace) -> None: "device": str(device), "world_size": world_size, "iterations": args.iterations, - **final_stats, + **stats, }, sort_keys=True, ) diff --git a/tests/distributed/test_dp_gradient_determinism.py b/tests/distributed/test_dp_gradient_determinism.py index 57691b1..744e161 100644 --- a/tests/distributed/test_dp_gradient_determinism.py +++ b/tests/distributed/test_dp_gradient_determinism.py @@ -6,9 +6,13 @@ import argparse import json import os +import subprocess +import sys from dataclasses import dataclass +from pathlib import Path from typing import Any +import pytest import torch import torch.distributed as dist import torch.nn.functional as F @@ -49,13 +53,51 @@ def test_fixed_batch_is_reproducible(): assert torch.equal(first[1], second[1]) +def test_dp_gradient_gloo_smoke_runs_under_torchrun(): + if not (dist.is_available() and dist.is_gloo_available()): + pytest.skip("Gloo is unavailable") + + repo = Path(__file__).resolve().parents[2] + env = os.environ.copy() + env["PYTHONPATH"] = f"{repo}{os.pathsep}{env.get('PYTHONPATH', '')}" + cmd = [ + sys.executable, + "-m", + "torch.distributed.run", + "--standalone", + "--nproc_per_node=2", + str(Path(__file__).resolve()), + "--backend", + "gloo", + "--mode", + "ordered_rank_reference", + "--dtype", + "fp32", + "--device", + "cpu", + ] + completed = subprocess.run( + cmd, + cwd=repo, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + timeout=120, + check=False, + ) + + assert completed.returncode == 0, completed.stdout + assert '"status": "pass"' in completed.stdout + + def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="DP gradient determinism smoke test") parser.add_argument("--backend", choices=("gloo", "nccl"), default="gloo") parser.add_argument( "--mode", - choices=("ordered_rank_fallback", "nccl_ring"), - default="ordered_rank_fallback", + choices=("ordered_rank_reference", "torch_all_reduce"), + default="ordered_rank_reference", ) parser.add_argument("--dtype", choices=("fp32", "fp16", "bf16"), default="fp32") parser.add_argument("--device", choices=("auto", "cpu", "cuda"), default="auto") @@ -221,7 +263,7 @@ def _compare_gradients( def _run_distributed_smoke(args: argparse.Namespace) -> None: - if args.configure_nccl_env or (args.backend == "nccl" and args.mode == "nccl_ring"): + if args.configure_nccl_env or (args.backend == "nccl" and args.mode == "torch_all_reduce"): configure_deterministic_nccl_env() device = _device(args) _set_deterministic_controls(args.seed)