From dd0271e9daa01894f13b3355e3a59c7198a38567 Mon Sep 17 00:00:00 2001 From: Sean Rooney Date: Wed, 1 Oct 2025 15:50:44 +0200 Subject: [PATCH] added offset --- .../cudf-exchange/Communicator.cpp | 4 +++- .../experimental/cudf-exchange/Communicator.h | 19 ++++++++++++++++++- .../cudf-exchange/CudfExchangeSource.cpp | 4 +++- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/velox/experimental/cudf-exchange/Communicator.cpp b/velox/experimental/cudf-exchange/Communicator.cpp index 5727a425d71..e18ea0d060a 100644 --- a/velox/experimental/cudf-exchange/Communicator.cpp +++ b/velox/experimental/cudf-exchange/Communicator.cpp @@ -29,13 +29,15 @@ std::once_flag Communicator::onceFlag; std::shared_ptr Communicator::instancePtr_ = nullptr; /* static */ -std::shared_ptr Communicator::initAndGet(uint16_t port) { +std::shared_ptr Communicator::initAndGet(uint16_t port,uint16_t offset) { if (!FLAGS_velox_cudf_exchange) { return nullptr; } std::call_once(onceFlag, [&] { instancePtr_ = std::shared_ptr(new Communicator()); instancePtr_->port_ = port; + instancePtr_->portOffset_ = offset; + }); VELOX_CHECK( instancePtr_->port_ == port, diff --git a/velox/experimental/cudf-exchange/Communicator.h b/velox/experimental/cudf-exchange/Communicator.h index 5ea99f88db4..597c4696a6e 100644 --- a/velox/experimental/cudf-exchange/Communicator.h +++ b/velox/experimental/cudf-exchange/Communicator.h @@ -53,7 +53,7 @@ class Communicator { const ucxx::AmReceiverCallbackOwnerType kAmCallbackOwner = "velox"; const ucxx::AmReceiverCallbackIdType kAmCallbackId = 123; - static std::shared_ptr initAndGet(uint16_t port); + static std::shared_ptr initAndGet(uint16_t port,uint16_t offset=3); /// @brief Method to get the Communicator reference static std::shared_ptr getInstance(); @@ -97,6 +97,22 @@ class Communicator { /// the endpoint has become stale since the other side has disappeared. void removeEndpointRef(std::shared_ptr ep); + /// @brief the port that the CudfExchangeServer will us + /// @return the port + + uint16_t getPort() { + return port_; + } + + /// @brief the offset that the CudfExchangeServer is from the HttpExchangeServer + /// This allows a client to determine the port to use to do a CudfExchange given the + /// task URL on the remote worker. It is a hack but allows us not to have change the coordinator code + + uint16_t getPortOffset () { + return portOffset_; + } + + private: Communicator() = default; // Private constructor to prevent direct instantiation @@ -119,6 +135,7 @@ class Communicator { std::shared_ptr worker_; std::shared_ptr listener_; uint16_t port_; + uint16_t portOffset_; std::atomic running_; Acceptor acceptor_; diff --git a/velox/experimental/cudf-exchange/CudfExchangeSource.cpp b/velox/experimental/cudf-exchange/CudfExchangeSource.cpp index 2d567776c0a..3c9a7f1637f 100644 --- a/velox/experimental/cudf-exchange/CudfExchangeSource.cpp +++ b/velox/experimental/cudf-exchange/CudfExchangeSource.cpp @@ -52,8 +52,10 @@ std::shared_ptr CudfExchangeSource::create( // For the time being, there's an ugly hack that just increases the port by 3. VLOG(3) << " Creating CudfExchangeSource " << url; const std::string host = uri.host(); - int port = uri.port() + 3; std::shared_ptr communicator = Communicator::getInstance(); + uint16_t offset = communicator->getPortOffset(); + int port = uri.port() + offset; + auto key = extractTaskAndDestinationId(uri.path()); auto source = std::shared_ptr( new CudfExchangeSource(communicator, host, port, key, queue));