Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions comms/ctran/algos/AllGatherP/AlgoImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class AlgoImpl {
}
}

// Allocate pipeSync and other internal resources.
commResult_t initResources();

private:
// Wait till either the async initialization is done or hit async error.
// It is called before execution scheduling any CE copy to the stream.
Expand Down
17 changes: 11 additions & 6 deletions comms/ctran/algos/AllGatherP/AllGatherP.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ extern __global__ void ncclKernelAllGatherPInit(
int* flag,
CtranAlgoDeviceState* devState);

commResult_t AlgoImpl::initResources() {
void* base = nullptr;
FB_CUDACHECK(
cudaHostAlloc(&base, sizeof(GpeKernelSync), cudaHostAllocDefault));

resource_.pipeSync = reinterpret_cast<GpeKernelSync*>(base);
new (resource_.pipeSync) GpeKernelSync(1 /* numWorkers */);
return commSuccess;
}

commResult_t AlgoImpl::initialize() {
auto opCount = comm_->ctran_->getOpCount();
CTRAN_COLL_INFO(
Expand All @@ -80,12 +90,7 @@ commResult_t AlgoImpl::initialize() {
comm_,
stream_);

void* base = nullptr;
FB_CUDACHECK(
cudaHostAlloc(&base, sizeof(GpeKernelSync), cudaHostAllocDefault));

resource_.pipeSync = reinterpret_cast<GpeKernelSync*>(base);
new (resource_.pipeSync) GpeKernelSync(1 /* numWorkers */);
FB_COMMCHECK(initResources());

KernelConfig config = KernelConfig(
KernelConfig::KernelType::ALLGATHERP_INIT,
Expand Down
69 changes: 58 additions & 11 deletions comms/ncclx/v2_28/meta/rma/tests/RMATest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ class RMATest : public NcclxBaseTest {
NcclxBaseTest::SetUp();

this->comm = createNcclComm(
this->globalRank,
this->numRanks,
this->localRank,
false,
nullptr,
server.get());
this->globalRank, this->numRanks, this->localRank, bootstrap_.get());
ASSERT_NE(this->comm, nullptr);
}
void TearDown() override {
Expand Down Expand Up @@ -174,6 +169,47 @@ TEST_P(MultiWindowTestParam, multiWindow) {
class RMATestParam : public RMATest,
public ::testing::WithParamInterface<
std::tuple<size_t, size_t, bool, MemAllocType, bool>> {
protected:
// Share a single communicator across all parameterized test cases.
// createNcclComm is expensive in multi-node configs (cross-node transport
// setup via socket bootstrap). Reusing it avoids per-test-case overhead.
static inline ncclComm_t shared_comm_ = nullptr;

void SetUp() override {
setenv("NCCL_CTRAN_ENABLE", "1", 0);
setenv("NCCL_CTRAN_IB_EPOCH_LOCK_ENFORCE_CHECK", "true", 0);
NcclxBaseTest::SetUp();

if (shared_comm_ == nullptr) {
shared_comm_ = createNcclComm(
this->globalRank,
this->numRanks,
this->localRank,
bootstrap_.get());
ASSERT_NE(shared_comm_, nullptr);
}
this->comm = shared_comm_;
}

void TearDown() override {
// Barrier to sync all ranks before moving to the next test case.
// Ensures no rank starts TearDownTestSuite (ncclCommDestroy) while
// another is still using the shared comm.
if (shared_comm_ != nullptr) {
this->barrier(shared_comm_, nullptr);
}
// Don't destroy the shared comm — TearDownTestSuite handles it.
this->comm = nullptr;
EXPECT_TRUE(segments.empty()) << "Not all memory segments were freed";
NcclxBaseTest::TearDown();
}

static void TearDownTestSuite() {
if (shared_comm_ != nullptr) {
ncclCommDestroy(shared_comm_);
shared_comm_ = nullptr;
}
}
};

TEST_P(RMATestParam, winPutWait) {
Expand Down Expand Up @@ -505,7 +541,7 @@ INSTANTIATE_TEST_SUITE_P(
::testing::Combine(
// kNumElements, kNumIters, ctranAllReduce, bufType, userBuf
::testing::Values(8192, 8 * 1024 * 1024),
::testing::Values(500),
::testing::Values(50),
::testing::Values(true, false),
::testing::Values(
MemAllocType::kMemNcclMemAlloc,
Expand Down Expand Up @@ -537,7 +573,20 @@ INSTANTIATE_TEST_SUITE_P(
class NvlEnabledTestParam
: public RMATest,
public ::testing::WithParamInterface<
std::tuple<std::vector<enum NCCL_CTRAN_BACKENDS>, bool>> {};
std::tuple<std::vector<enum NCCL_CTRAN_BACKENDS>, bool>> {
protected:
// Skip fixture comm creation — this test creates its own comm with
// parameterized backends and never uses the fixture's this->comm.
void SetUp() override {
setenv("NCCL_CTRAN_ENABLE", "1", 0);
setenv("NCCL_CTRAN_IB_EPOCH_LOCK_ENFORCE_CHECK", "true", 0);
NcclxBaseTest::SetUp();
}
void TearDown() override {
EXPECT_TRUE(segments.empty()) << "Not all memory segments were freed";
NcclxBaseTest::TearDown();
}
};

TEST_P(NvlEnabledTestParam, ncclWinGetAttributes) {
const auto& [backends, expectNvlEnabled] = GetParam();
Expand All @@ -548,9 +597,7 @@ TEST_P(NvlEnabledTestParam, ncclWinGetAttributes) {
this->globalRank,
this->numRanks,
this->localRank,
false,
nullptr,
server.get());
bootstrap_.get());
ASSERT_NE(comm, nullptr);

auto statex = comm->ctranComm_->statex_.get();
Expand Down
Loading