Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion velox/experimental/cudf-exchange/Communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ std::once_flag Communicator::onceFlag;
std::shared_ptr<Communicator> Communicator::instancePtr_ = nullptr;

/* static */
std::shared_ptr<Communicator> Communicator::initAndGet(uint16_t port) {
std::shared_ptr<Communicator> Communicator::initAndGet(uint16_t port,uint16_t offset) {
if (!FLAGS_velox_cudf_exchange) {
return nullptr;
}
std::call_once(onceFlag, [&] {
instancePtr_ = std::shared_ptr<Communicator>(new Communicator());
instancePtr_->port_ = port;
instancePtr_->portOffset_ = offset;

});
VELOX_CHECK(
instancePtr_->port_ == port,
Expand Down
19 changes: 18 additions & 1 deletion velox/experimental/cudf-exchange/Communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Communicator {
const ucxx::AmReceiverCallbackOwnerType kAmCallbackOwner = "velox";
const ucxx::AmReceiverCallbackIdType kAmCallbackId = 123;

static std::shared_ptr<Communicator> initAndGet(uint16_t port);
static std::shared_ptr<Communicator> initAndGet(uint16_t port,uint16_t offset=3);

/// @brief Method to get the Communicator reference
static std::shared_ptr<Communicator> getInstance();
Expand Down Expand Up @@ -97,6 +97,22 @@ class Communicator {
/// the endpoint has become stale since the other side has disappeared.
void removeEndpointRef(std::shared_ptr<EndpointRef> ep);

/// @brief Returns the port that the CudfExchangeServer will use.
/// @return the port

uint16_t getPort() {
  // Hand back a copy of the stored port_ member.
  const uint16_t currentPort = port_;
  return currentPort;
}

/// @brief Returns the offset of the CudfExchangeServer port from the HttpExchangeServer port.
/// This allows a client to determine the port to use for a CudfExchange given the
/// task URL on the remote worker. It is a hack, but it means we do not have to change the coordinator code.

uint16_t getPortOffset() {
  // Hand back a copy of the stored portOffset_ member.
  const uint16_t currentOffset = portOffset_;
  return currentOffset;
}


private:
Communicator() =
default; // Private constructor to prevent direct instantiation
Expand All @@ -119,6 +135,7 @@ class Communicator {
std::shared_ptr<ucxx::Worker> worker_;
std::shared_ptr<ucxx::Listener> listener_;
uint16_t port_;
uint16_t portOffset_;
std::atomic<bool> running_;
Acceptor acceptor_;

Expand Down
4 changes: 3 additions & 1 deletion velox/experimental/cudf-exchange/CudfExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ std::shared_ptr<CudfExchangeSource> CudfExchangeSource::create(
// For the time being, there's an ugly hack that just increases the port by 3.
VLOG(3) << " Creating CudfExchangeSource " << url;
const std::string host = uri.host();
int port = uri.port() + 3;
std::shared_ptr<Communicator> communicator = Communicator::getInstance();
uint16_t offset = communicator->getPortOffset();
int port = uri.port() + offset;

auto key = extractTaskAndDestinationId(uri.path());
auto source = std::shared_ptr<CudfExchangeSource>(
new CudfExchangeSource(communicator, host, port, key, queue));
Expand Down