From f0699529a83d327758e5a024eaa2129a135ce84b Mon Sep 17 00:00:00 2001 From: Feng Tian Date: Wed, 8 Apr 2026 19:06:16 -0700 Subject: [PATCH 1/2] refactor existing AGP to make its utils to be resuable (#1971) Summary: We need to align AGP's internal persistent request with the window API so that regular AllGather can be converted to AGP in graph capture mode (window init + dry-run exec at capture time, SM-free CE replay at execution time). In this diff, we extract `initResources` to a public func so that window AGP can directly call it Reviewed By: dsjohns2 Differential Revision: D99514784 --- comms/ctran/algos/AllGatherP/AlgoImpl.h | 3 +++ comms/ctran/algos/AllGatherP/AllGatherP.cc | 17 +++++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/comms/ctran/algos/AllGatherP/AlgoImpl.h b/comms/ctran/algos/AllGatherP/AlgoImpl.h index d0b04033d..2fb0832c1 100644 --- a/comms/ctran/algos/AllGatherP/AlgoImpl.h +++ b/comms/ctran/algos/AllGatherP/AlgoImpl.h @@ -52,6 +52,9 @@ class AlgoImpl { } } + // Allocate pipeSync and other internal resources. + commResult_t initResources(); + private: // Wait till either the async initialization is done or hit async error. // It is called before execution scheduling any CE copy to the stream. diff --git a/comms/ctran/algos/AllGatherP/AllGatherP.cc b/comms/ctran/algos/AllGatherP/AllGatherP.cc index dacb0c545..5d04720d4 100644 --- a/comms/ctran/algos/AllGatherP/AllGatherP.cc +++ b/comms/ctran/algos/AllGatherP/AllGatherP.cc @@ -68,6 +68,16 @@ extern __global__ void ncclKernelAllGatherPInit( int* flag, CtranAlgoDeviceState* devState); +commResult_t AlgoImpl::initResources() { + void* base = nullptr; + FB_CUDACHECK( + cudaHostAlloc(&base, sizeof(GpeKernelSync), cudaHostAllocDefault)); + + resource_.pipeSync = reinterpret_cast(base); + new (resource_.pipeSync) GpeKernelSync(1 /* numWorkers */); + return commSuccess; +} + commResult_t AlgoImpl::initialize() { auto opCount = comm_->ctran_->getOpCount(); CTRAN_COLL_INFO( @@ -80,12 +90,7 @@ commResult_t AlgoImpl::initialize() { comm_, stream_); - void* base = nullptr; - FB_CUDACHECK( - cudaHostAlloc(&base, sizeof(GpeKernelSync), cudaHostAllocDefault)); - - resource_.pipeSync = reinterpret_cast(base); - new (resource_.pipeSync) GpeKernelSync(1 /* numWorkers */); + FB_COMMCHECK(initResources()); KernelConfig config = KernelConfig( KernelConfig::KernelType::ALLGATHERP_INIT, From 43cabb0b8077766dddfc383794f7f5d7116ced03 Mon Sep 17 00:00:00 2001 From: Feng Tian Date: Wed, 8 Apr 2026 19:06:16 -0700 Subject: [PATCH 2/2] fix RMATest failure and reduce testing cost (#2002) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: The NvlEnabledTestParam test was hanging because it had two NCCL communicators alive simultaneously — and it only needed one. How it happened: D95122239 migrated RMATest from ::testing::Test to NcclxBaseTest, which made SetUp() create a full NCCL communicator (this->comm) for every test. This was correct for RMATestParam and MultiWindowTestParam which use this->comm. But NvlEnabledTestParam never uses it — it creates its own communicator with parameterized backends (NVL+IB or IB-only). Nobody noticed the test body was now running with two active comms. Why it hangs: After 97 prior test cycles (each creating/destroying a comm with ~54MB pinned memory across 8 ranks), the CUDA pinned memory pool is fragmented. When NvlEnabledTestParam tries to bootstrap its second comm while the fixture's comm is still alive, one or more ranks can't complete cudaHostAlloc → ncclSocketAccept blocks waiting for that rank → all ranks hang → parent process sends SIGTERM. The fix: Override SetUp()/TearDown() in NvlEnabledTestParam to skip the unnecessary fixture comm creation. Now only one comm exists at a time, and the bootstrap completes cleanly. NOTE: we also decrease the NumIter to save test resources. And add re_timeout to avoid false alarm. Reviewed By: dolpm Differential Revision: D100051366 --- comms/ncclx/v2_28/meta/rma/tests/RMATest.cc | 69 +++++++++++++++++---- 1 file changed, 58 insertions(+), 11 deletions(-) diff --git a/comms/ncclx/v2_28/meta/rma/tests/RMATest.cc b/comms/ncclx/v2_28/meta/rma/tests/RMATest.cc index fa2e13c54..e6cc5d3d1 100644 --- a/comms/ncclx/v2_28/meta/rma/tests/RMATest.cc +++ b/comms/ncclx/v2_28/meta/rma/tests/RMATest.cc @@ -23,12 +23,7 @@ class RMATest : public NcclxBaseTest { NcclxBaseTest::SetUp(); this->comm = createNcclComm( - this->globalRank, - this->numRanks, - this->localRank, - false, - nullptr, - server.get()); + this->globalRank, this->numRanks, this->localRank, bootstrap_.get()); ASSERT_NE(this->comm, nullptr); } void TearDown() override { @@ -174,6 +169,47 @@ TEST_P(MultiWindowTestParam, multiWindow) { class RMATestParam : public RMATest, public ::testing::WithParamInterface< std::tuple> { + protected: + // Share a single communicator across all parameterized test cases. + // createNcclComm is expensive in multi-node configs (cross-node transport + // setup via socket bootstrap). Reusing it avoids per-test-case overhead. + static inline ncclComm_t shared_comm_ = nullptr; + + void SetUp() override { + setenv("NCCL_CTRAN_ENABLE", "1", 0); + setenv("NCCL_CTRAN_IB_EPOCH_LOCK_ENFORCE_CHECK", "true", 0); + NcclxBaseTest::SetUp(); + + if (shared_comm_ == nullptr) { + shared_comm_ = createNcclComm( + this->globalRank, + this->numRanks, + this->localRank, + bootstrap_.get()); + ASSERT_NE(shared_comm_, nullptr); + } + this->comm = shared_comm_; + } + + void TearDown() override { + // Barrier to sync all ranks before moving to the next test case. + // Ensures no rank starts TearDownTestSuite (ncclCommDestroy) while + // another is still using the shared comm. + if (shared_comm_ != nullptr) { + this->barrier(shared_comm_, nullptr); + } + // Don't destroy the shared comm — TearDownTestSuite handles it. + this->comm = nullptr; + EXPECT_TRUE(segments.empty()) << "Not all memory segments were freed"; + NcclxBaseTest::TearDown(); + } + + static void TearDownTestSuite() { + if (shared_comm_ != nullptr) { + ncclCommDestroy(shared_comm_); + shared_comm_ = nullptr; + } + } }; TEST_P(RMATestParam, winPutWait) { @@ -505,7 +541,7 @@ INSTANTIATE_TEST_SUITE_P( ::testing::Combine( // kNumElements, kNumIters, ctranAllReduce, bufType, userBuf ::testing::Values(8192, 8 * 1024 * 1024), - ::testing::Values(500), + ::testing::Values(50), ::testing::Values(true, false), ::testing::Values( MemAllocType::kMemNcclMemAlloc, @@ -537,7 +573,20 @@ INSTANTIATE_TEST_SUITE_P( class NvlEnabledTestParam : public RMATest, public ::testing::WithParamInterface< - std::tuple, bool>> {}; + std::tuple, bool>> { + protected: + // Skip fixture comm creation — this test creates its own comm with + // parameterized backends and never uses the fixture's this->comm. + void SetUp() override { + setenv("NCCL_CTRAN_ENABLE", "1", 0); + setenv("NCCL_CTRAN_IB_EPOCH_LOCK_ENFORCE_CHECK", "true", 0); + NcclxBaseTest::SetUp(); + } + void TearDown() override { + EXPECT_TRUE(segments.empty()) << "Not all memory segments were freed"; + NcclxBaseTest::TearDown(); + } +}; TEST_P(NvlEnabledTestParam, ncclWinGetAttributes) { const auto& [backends, expectNvlEnabled] = GetParam(); @@ -548,9 +597,7 @@ TEST_P(NvlEnabledTestParam, ncclWinGetAttributes) { this->globalRank, this->numRanks, this->localRank, - false, - nullptr, - server.get()); + bootstrap_.get()); ASSERT_NE(comm, nullptr); auto statex = comm->ctranComm_->statex_.get();