diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index d4c2b2c8754b..6bb740339f78 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -81,6 +81,7 @@ import org.apache.pinot.common.version.PinotVersion; import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory; +import org.apache.pinot.core.routing.MultiClusterRoutingContext; import org.apache.pinot.core.transport.ListenerConfig; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.core.util.ListenerConfigUtil; @@ -150,6 +151,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable { protected HelixAdmin _helixAdmin; protected ZkHelixPropertyStore _propertyStore; protected HelixDataAccessor _helixDataAccessor; + protected TableCache _tableCache; protected PinotMetricsRegistry _metricsRegistry; protected BrokerMetrics _brokerMetrics; protected BrokerRoutingManager _routingManager; @@ -310,12 +312,7 @@ public void start() Utils.logVersions(); LOGGER.info("Connecting spectator Helix manager"); - _spectatorHelixManager = - HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.SPECTATOR, _zkServers); - _spectatorHelixManager.connect(); - _helixAdmin = _spectatorHelixManager.getClusterManagmentTool(); - _propertyStore = _spectatorHelixManager.getHelixPropertyStore(); - _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor(); + initSpectatorHelixManager(); LOGGER.info("Setting up broker request handler"); // Set up metric registry and broker metrics @@ -336,8 +333,8 @@ public void start() // Set up request handling classes _serverRoutingStatsManager = new 
ServerRoutingStatsManager(_brokerConf, _brokerMetrics); _serverRoutingStatsManager.init(); - _routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf); - _routingManager.init(_spectatorHelixManager); + initRoutingManager(); + final PinotConfiguration factoryConf = _brokerConf.subset(Broker.ACCESS_CONTROL_CONFIG_PREFIX); // Adding cluster name to the config so that it can be used by the AccessControlFactory factoryConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, _brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME)); @@ -353,7 +350,7 @@ public void start() FunctionRegistry.init(); boolean caseInsensitive = _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE); - TableCache tableCache = new ZkTableCache(_propertyStore, caseInsensitive); + _tableCache = new ZkTableCache(_propertyStore, caseInsensitive); LOGGER.info("Initializing Broker Event Listener Factory"); BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX)); @@ -382,6 +379,9 @@ public void start() org.apache.pinot.spi.config.instance.InstanceType.BROKER); _threadAccountant.startWatcherTask(); + // TODO: Hook multiClusterRoutingContext into request handlers subsequently. + MultiClusterRoutingContext multiClusterRoutingContext = getMultiClusterRoutingContext(); + // Create Broker request handler. 
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId()); BrokerRequestIdGenerator requestIdGenerator = new BrokerRequestIdGenerator(); @@ -391,7 +391,7 @@ public void start() if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) { singleStageBrokerRequestHandler = new GrpcBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager, - _accessControlFactory, _queryQuotaManager, tableCache, _failureDetector, _threadAccountant); + _accessControlFactory, _queryQuotaManager, _tableCache, _failureDetector, _threadAccountant); } else { // Default request handler type, i.e. netty NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX); @@ -402,7 +402,7 @@ public void start() } singleStageBrokerRequestHandler = new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager, - _accessControlFactory, _queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, + _accessControlFactory, _queryQuotaManager, _tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager, _failureDetector, _threadAccountant); } MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null; @@ -416,7 +416,7 @@ public void start() queryDispatcher = createQueryDispatcher(_brokerConf); multiStageBrokerRequestHandler = new MultiStageBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager, - _accessControlFactory, _queryQuotaManager, tableCache, _multiStageQueryThrottler, _failureDetector, + _accessControlFactory, _queryQuotaManager, _tableCache, _multiStageQueryThrottler, _failureDetector, _threadAccountant); } TimeSeriesRequestHandler timeSeriesRequestHandler = null; @@ -424,7 +424,7 @@ public void start() Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be enabled to use time-series engine"); timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager, - _accessControlFactory, _queryQuotaManager, tableCache, queryDispatcher, _threadAccountant); + _accessControlFactory, _queryQuotaManager, _tableCache, queryDispatcher, _threadAccountant); } LOGGER.info("Initializing PinotFSFactory"); @@ -471,6 +471,45 @@ public void start() } LOGGER.info("Initializing cluster change mediator"); + initClusterChangeMediator(); + + LOGGER.info("Connecting participant Helix manager"); + _participantHelixManager = + HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers); + // Register state model factory + _participantHelixManager.getStateMachineEngine() + .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), + new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _routingManager, + _queryQuotaManager)); + // Register user-define message handler factory + _participantHelixManager.getMessagingService() + .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), + new BrokerUserDefinedMessageHandlerFactory(_routingManager, _queryQuotaManager)); + _participantHelixManager.connect(); + updateInstanceConfigAndBrokerResourceIfNeeded(); + _brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, + () -> _participantHelixManager.isConnected() ? 
1L : 0L); + _participantHelixManager.addPreConnectCallback( + () -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L)); + + // Initializing Groovy execution security + GroovyFunctionEvaluator.configureGroovySecurity( + _brokerConf.getProperty(CommonConstants.Groovy.GROOVY_QUERY_STATIC_ANALYZER_CONFIG, + _brokerConf.getProperty(CommonConstants.Groovy.GROOVY_ALL_STATIC_ANALYZER_CONFIG))); + + // Register the service status handler + registerServiceStatusHandler(); + + _isStarting = false; + _brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS, + System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS); + + _defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE); + + LOGGER.info("Finish starting Pinot broker"); + } + + protected void initClusterChangeMediator() throws Exception { for (ClusterChangeHandler clusterConfigChangeHandler : _clusterConfigChangeHandlers) { clusterConfigChangeHandler.init(_spectatorHelixManager); } @@ -515,41 +554,27 @@ public void start() if (!_liveInstanceChangeHandlers.isEmpty()) { _spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator); } + } - LOGGER.info("Connecting participant Helix manager"); - _participantHelixManager = - HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers); - // Register state model factory - _participantHelixManager.getStateMachineEngine() - .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), - new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _routingManager, - _queryQuotaManager)); - // Register user-define message handler factory - _participantHelixManager.getMessagingService() - .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), - new BrokerUserDefinedMessageHandlerFactory(_routingManager, _queryQuotaManager)); - 
_participantHelixManager.connect(); - updateInstanceConfigAndBrokerResourceIfNeeded(); - _brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, - () -> _participantHelixManager.isConnected() ? 1L : 0L); - _participantHelixManager.addPreConnectCallback( - () -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L)); - - // Initializing Groovy execution security - GroovyFunctionEvaluator.configureGroovySecurity( - _brokerConf.getProperty(CommonConstants.Groovy.GROOVY_QUERY_STATIC_ANALYZER_CONFIG, - _brokerConf.getProperty(CommonConstants.Groovy.GROOVY_ALL_STATIC_ANALYZER_CONFIG))); - - // Register the service status handler - registerServiceStatusHandler(); - - _isStarting = false; - _brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS, - System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS); + protected void initRoutingManager() throws Exception { + _routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf); + _routingManager.init(_spectatorHelixManager); + } - _defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE); + protected void initSpectatorHelixManager() throws Exception { + _spectatorHelixManager = + HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.SPECTATOR, _zkServers); + _spectatorHelixManager.connect(); + _helixAdmin = _spectatorHelixManager.getClusterManagmentTool(); + _propertyStore = _spectatorHelixManager.getHelixPropertyStore(); + _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor(); + } - LOGGER.info("Finish starting Pinot broker"); + /** + * Can be overridden to inject a custom MultiClusterRoutingContext from MultiClusterBrokerStarter. 
+ */ + protected MultiClusterRoutingContext getMultiClusterRoutingContext() { + return null; } /** diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/MultiClusterHelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/MultiClusterHelixBrokerStarter.java new file mode 100644 index 000000000000..8c40f3a04998 --- /dev/null +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/MultiClusterHelixBrokerStarter.java @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.broker.broker.helix; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.HelixConstants.ChangeType; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.pinot.broker.routing.manager.MultiClusterRoutingManager; +import org.apache.pinot.broker.routing.manager.RemoteClusterBrokerRoutingManager; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.config.provider.ZkTableCache; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.core.routing.MultiClusterRoutingContext; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants.Helix; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Multi-cluster broker starter that extends the base Helix broker functionality + * to support federation across multiple Pinot clusters. 
+ * + * This class handles: + * - Connection to remote clusters via separate ZooKeeper instances + * - Federated routing across primary and remote clusters + * - Cross-cluster query federation + * - Cluster change monitoring for remote clusters + */ +@SuppressWarnings("unused") +public class MultiClusterHelixBrokerStarter extends BaseBrokerStarter { + private static final Logger LOGGER = LoggerFactory.getLogger(MultiClusterHelixBrokerStarter.class); + + // Remote cluster configuration + protected List _remoteClusterNames; + protected Map _remoteZkServers; + protected String _remoteInstanceId; + + // Remote cluster Helix managers and routing + protected Map _remoteSpectatorHelixManager; + protected Map _remoteRoutingManagers; + protected MultiClusterRoutingManager _multiClusterRoutingManager; + protected MultiClusterRoutingContext _multiClusterRoutingContext; + protected Map _remoteClusterChangeMediator; + + public MultiClusterHelixBrokerStarter() { + } + + @Override + public void init(PinotConfiguration brokerConf) + throws Exception { + super.init(brokerConf); + _remoteInstanceId = _instanceId + "_remote"; + initRemoteClusterNamesAndZk(brokerConf); + } + + @Override + public void start() + throws Exception { + LOGGER.info("[multi-cluster] Starting multi-cluster broker"); + super.start(); + // build routing tables for remote clusters + initRemoteClusterRouting(); + LOGGER.info("[multi-cluster] Multi-cluster broker started successfully"); + } + + @Override + protected void initSpectatorHelixManager() throws Exception { + super.initSpectatorHelixManager(); + try { + initRemoteClusterSpectatorHelixManagers(); + } catch (Exception e) { + LOGGER.error("[multi-cluster] Failed to initialize remote cluster spectator Helix managers", e); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1); + } + } + + @Override + protected void initRoutingManager() throws Exception { + super.initRoutingManager(); + try { + 
initRemoteClusterFederatedRoutingManager(); + } catch (Exception e) { + LOGGER.error("[multi-cluster] Failed to initialize remote cluster federated routing manager", e); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1); + } + } + + @Override + protected void initClusterChangeMediator() throws Exception { + super.initClusterChangeMediator(); + try { + initRemoteClusterChangeMediator(); + } catch (Exception e) { + LOGGER.error("[multi-cluster] Failed to initialize remote cluster change mediator", e); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1); + } + } + + private void initRemoteClusterSpectatorHelixManagers() throws Exception { + if (_remoteZkServers == null || _remoteZkServers.isEmpty()) { + LOGGER.info("[multi-cluster] No remote ZK servers configured - skipping spectator Helix manager init"); + return; + } + + LOGGER.info("[multi-cluster] Initializing spectator Helix managers for {} remote clusters", + _remoteZkServers.size()); + _remoteSpectatorHelixManager = new HashMap<>(); + + for (Map.Entry entry : _remoteZkServers.entrySet()) { + String clusterName = entry.getKey(); + String zkServers = entry.getValue(); + try { + HelixManager helixManager = HelixManagerFactory.getZKHelixManager( + clusterName, _instanceId, InstanceType.SPECTATOR, zkServers); + helixManager.connect(); + _remoteSpectatorHelixManager.put(clusterName, helixManager); + LOGGER.info("[multi-cluster] Connected to remote cluster '{}' at ZK: {}", clusterName, zkServers); + } catch (Exception e) { + LOGGER.error("[multi-cluster] Failed to connect to cluster '{}' at ZK: {}", clusterName, zkServers, e); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1); + } + } + + if (_remoteSpectatorHelixManager.isEmpty()) { + LOGGER.warn("[multi-cluster] Failed to connect to any remote clusters - " + + "multi-cluster will not be functional"); + } else { + LOGGER.info("[multi-cluster] 
Connected to {}/{} remote clusters: {}", _remoteSpectatorHelixManager.size(), + _remoteZkServers.size(), _remoteSpectatorHelixManager.keySet()); + } + } + + protected void stopRemoteClusterComponents() { + if (_remoteClusterChangeMediator != null) { + _remoteClusterChangeMediator.values().forEach(ClusterChangeMediator::stop); + } + if (_remoteRoutingManagers != null) { + for (RemoteClusterBrokerRoutingManager routingManager : _remoteRoutingManagers.values()) { + routingManager.shutdown(); + } + } + if (_remoteSpectatorHelixManager != null) { + _remoteSpectatorHelixManager.values().forEach(HelixManager::disconnect); + } + } + + private void initRemoteClusterNamesAndZk(PinotConfiguration brokerConf) { + LOGGER.info("[multi-cluster] Initializing remote cluster configuration"); + String remoteClusterNames = brokerConf.getProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES); + + if (remoteClusterNames == null || remoteClusterNames.trim().isEmpty()) { + LOGGER.info("[multi-cluster] No remote cluster configured - multi-cluster mode disabled"); + return; + } + + _remoteClusterNames = Arrays.asList(remoteClusterNames.replaceAll("\\s+", "").split(",")); + if (_remoteClusterNames.isEmpty()) { + LOGGER.warn("[multi-cluster] Remote cluster names list is empty after parsing"); + return; + } + LOGGER.info("[multi-cluster] Configured remote cluster names: {}", _remoteClusterNames); + + _remoteZkServers = new HashMap<>(); + for (String name : _remoteClusterNames) { + String zkConfig = String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS, name); + String zkServers = brokerConf.getProperty(zkConfig); + + if (zkServers == null || zkServers.trim().isEmpty()) { + LOGGER.error("[multi-cluster] Missing ZooKeeper configuration for cluster '{}', expected: {}", name, zkConfig); + continue; + } + _remoteZkServers.put(name, zkServers.replaceAll("\\s+", "")); + } + + if (_remoteZkServers.isEmpty()) { + LOGGER.error("[multi-cluster] No valid ZooKeeper configurations found - multi-cluster will not be 
functional"); + _remoteClusterNames = null; + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1); + } else { + LOGGER.info("[multi-cluster] Initialized {} remote cluster(s): {}", _remoteZkServers.size(), + _remoteZkServers.keySet()); + } + } + + private void initRemoteClusterFederatedRoutingManager() { + if (_remoteSpectatorHelixManager == null || _remoteSpectatorHelixManager.isEmpty()) { + LOGGER.info("[multi-cluster] No remote spectator Helix managers - skipping federated routing manager init"); + return; + } + + LOGGER.info("[multi-cluster] Initializing federated routing manager for {} clusters", + _remoteSpectatorHelixManager.size()); + _remoteRoutingManagers = new HashMap<>(); + + for (Map.Entry entry : _remoteSpectatorHelixManager.entrySet()) { + String clusterName = entry.getKey(); + try { + RemoteClusterBrokerRoutingManager routingManager = + new RemoteClusterBrokerRoutingManager(clusterName, _brokerMetrics, _serverRoutingStatsManager, _brokerConf); + routingManager.init(entry.getValue()); + _remoteRoutingManagers.put(clusterName, routingManager); + } catch (Exception e) { + LOGGER.error("[multi-cluster] Failed to initialize routing manager for cluster '{}'", clusterName, e); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1); + } + } + + if (_remoteRoutingManagers.isEmpty()) { + LOGGER.error("[multi-cluster] Failed to initialize any routing managers - federated routing unavailable"); + } else { + _multiClusterRoutingManager = new MultiClusterRoutingManager(_routingManager, + new ArrayList<>(_remoteRoutingManagers.values())); + LOGGER.info("[multi-cluster] Created federated routing manager with {}/{} remote clusters", + _remoteRoutingManagers.size(), _remoteSpectatorHelixManager.size()); + } + } + + private void initRemoteClusterFederationProvider(TableCache primaryTableCache, boolean caseInsensitive) { + if (_multiClusterRoutingManager == null) { + LOGGER.info("[multi-cluster] 
Federation is not enabled - FederationProvider will be null"); + _multiClusterRoutingContext = null; + return; + } + + Map tableCacheMap = new HashMap<>(); + tableCacheMap.put(_clusterName, primaryTableCache); + + if (_remoteSpectatorHelixManager == null || _remoteSpectatorHelixManager.isEmpty()) { + LOGGER.info("[multi-cluster] No remote spectator Helix managers - " + + "creating provider with primary cluster only"); + _multiClusterRoutingContext = null; + return; + } + + LOGGER.info("[multi-cluster] Initializing federation provider with {} remote clusters", + _remoteSpectatorHelixManager.size()); + + for (Map.Entry entry : _remoteSpectatorHelixManager.entrySet()) { + String clusterName = entry.getKey(); + try { + TableCache remoteCache = new ZkTableCache(entry.getValue().getHelixPropertyStore(), caseInsensitive); + tableCacheMap.put(clusterName, remoteCache); + } catch (Exception e) { + LOGGER.error("[multi-cluster] Failed to create table cache for cluster '{}'", clusterName, e); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1); + } + } + + _multiClusterRoutingContext = new MultiClusterRoutingContext(tableCacheMap, _routingManager, + _multiClusterRoutingManager); + LOGGER.info("[multi-cluster] Created federation provider with {}/{} clusters (1 primary + {} remote)", + tableCacheMap.size(), _remoteSpectatorHelixManager.size() + 1, tableCacheMap.size() - 1); + } + + private void initRemoteClusterRouting() { + if (_remoteRoutingManagers == null || _remoteRoutingManagers.isEmpty()) { + LOGGER.info("[multi-cluster] No remote routing managers - skipping routing table initialization"); + return; + } + + LOGGER.info("[multi-cluster] Initializing routing tables for {} remote clusters", + _remoteRoutingManagers.size()); + int initialized = 0; + + for (Map.Entry entry : _remoteRoutingManagers.entrySet()) { + try { + entry.getValue().determineRoutingChangeForTables(); + initialized++; + } catch (Exception e) { + 
LOGGER.error("[multi-cluster] Failed to initialize routing tables for cluster '{}'", entry.getKey(), e); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1); + } + } + + LOGGER.info("[multi-cluster] Initialized routing tables for {}/{} remote clusters", + initialized, _remoteRoutingManagers.size()); + } + + private void initRemoteClusterChangeMediator() throws Exception { + if (_remoteSpectatorHelixManager == null || _remoteSpectatorHelixManager.isEmpty()) { + LOGGER.info("[multi-cluster] No remote spectator Helix managers - skipping cluster change mediator init"); + return; + } + + if (_remoteRoutingManagers == null || _remoteRoutingManagers.isEmpty()) { + LOGGER.error("[multi-cluster] Remote routing managers not initialized - " + + "cannot create cluster change mediators"); + return; + } + + LOGGER.info("[multi-cluster] Initializing cluster change mediators for {} remote clusters", + _remoteSpectatorHelixManager.size()); + _remoteClusterChangeMediator = new HashMap<>(); + + for (String clusterName : _remoteSpectatorHelixManager.keySet()) { + RemoteClusterBrokerRoutingManager routingManager = _remoteRoutingManagers.get(clusterName); + if (routingManager == null) { + LOGGER.error("[multi-cluster] Routing manager not found for cluster '{}' - skipping mediator setup", + clusterName); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1); + continue; + } + + try { + Map> handlers = new HashMap<>(); + handlers.put(ChangeType.CLUSTER_CONFIG, new ArrayList<>()); + handlers.put(ChangeType.IDEAL_STATE, Collections.singletonList(routingManager)); + handlers.put(ChangeType.EXTERNAL_VIEW, Collections.singletonList(routingManager)); + handlers.put(ChangeType.INSTANCE_CONFIG, Collections.singletonList(routingManager)); + handlers.put(ChangeType.RESOURCE_CONFIG, Collections.singletonList(routingManager)); + + ClusterChangeMediator mediator = new ClusterChangeMediator(handlers, _brokerMetrics); + 
mediator.start(); + _remoteClusterChangeMediator.put(clusterName, mediator); + + HelixManager helixManager = _remoteSpectatorHelixManager.get(clusterName); + helixManager.addIdealStateChangeListener(mediator); + helixManager.addExternalViewChangeListener(mediator); + helixManager.addInstanceConfigChangeListener(mediator); + helixManager.addClusterfigChangeListener(mediator); + } catch (Exception e) { + LOGGER.error("[multi-cluster] Failed to initialize cluster change mediator for cluster '{}'", clusterName, e); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_CLUSTER_BROKER_STARTUP_FAILURE, 1); + } + } + + LOGGER.info("[multi-cluster] Initialized {}/{} cluster change mediators", _remoteClusterChangeMediator.size(), + _remoteSpectatorHelixManager.size()); + } + + @Override + protected MultiClusterRoutingContext getMultiClusterRoutingContext() { + initRemoteClusterFederationProvider(_tableCache, + _brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE)); + return _multiClusterRoutingContext; + } + + @Override + public void stop() { + LOGGER.info("[multi-cluster] Shutting down multi-cluster broker"); + super.stop(); + stopRemoteClusterComponents(); + LOGGER.info("[multi-cluster] Multi-cluster broker shut down successfully"); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 93c4270eb7c4..14c8f8297e34 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -178,6 +178,9 @@ public class BrokerMeter implements AbstractMetrics.Meter { public static final BrokerMeter HELIX_ZOOKEEPER_RECONNECTS = create("HELIX_ZOOKEEPER_RECONNECTS", "reconnects", true); + public static final BrokerMeter MULTI_CLUSTER_BROKER_STARTUP_FAILURE = create( + "MULTI_CLUSTER_BROKER_STARTUP_FAILURE", 
"failureCount", true); + public static final BrokerMeter REQUEST_DROPPED_DUE_TO_ACCESS_ERROR = create( "REQUEST_DROPPED_DUE_TO_ACCESS_ERROR", "requestsDropped", false); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java new file mode 100644 index 000000000000..1f139da56acf --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/MultiClusterRoutingContext.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.routing; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.config.provider.TableCache; + + +/** + * A generic class which provides the dependencies for federation routing. + * This class is responsible for managing routing managers and providing the appropriate + * routing manager based on query options (e.g., whether federation is enabled). + */ +public class MultiClusterRoutingContext { + // Maps clusterName to TableCache. Includes the local and all remote clusters. 
+ private final Map _tableCacheMap; + + // Local + private final RoutingManager _localRoutingManager; + + // Federated routing manager (for federated queries, may be null if federation is not configured) + @Nullable + private final RoutingManager _multiClusterRoutingManager; + + /** + * Constructor for FederationProvider with routing managers. + * + * @param tableCacheMap Map of cluster name to TableCache + * @param localRoutingManager Local routing manager for non-federated queries + * @param multiClusterRoutingManager Multi cluster routing manager for cross-cluster queries (can be null) + */ + public MultiClusterRoutingContext(Map tableCacheMap, RoutingManager localRoutingManager, + @Nullable RoutingManager multiClusterRoutingManager) { + _tableCacheMap = tableCacheMap; + _localRoutingManager = localRoutingManager; + _multiClusterRoutingManager = multiClusterRoutingManager; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java new file mode 100644 index 000000000000..e9ac332e2e99 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/multicluster/MultiClusterIntegrationTest.java @@ -0,0 +1,490 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.multicluster; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.commons.io.FileUtils; +import org.apache.http.HttpStatus; +import org.apache.pinot.broker.broker.helix.BaseBrokerStarter; +import org.apache.pinot.broker.broker.helix.MultiClusterHelixBrokerStarter; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.ZkStarter; +import org.apache.pinot.controller.BaseControllerStarter; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; +import org.apache.pinot.integration.tests.ClusterTest; +import org.apache.pinot.server.starter.helix.BaseServerStarter; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Broker; 
+import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.NetUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test that starts two separate Pinot clusters, each with its own local ZooKeeper, controller,
+ * broker and server. Every broker is created through {@link #createBrokerStarter()} (a
+ * {@link MultiClusterHelixBrokerStarter}) and is configured with the other cluster's name and ZooKeeper URL as
+ * its remote cluster.
+ */
+public class MultiClusterIntegrationTest extends ClusterTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MultiClusterIntegrationTest.class);
+  // Built once and reused by parseCountResult() instead of constructing a mapper per call.
+  private static final JsonMapper JSON_MAPPER = JsonMapper.builder().build();
+
+  protected static final String SCHEMA_FILE = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
+  protected static final String TIME_COLUMN = "DaysSinceEpoch";
+  // TODO: N clusters instead of 2 in future iterations.
+  protected static final String CLUSTER_1_NAME = "DualIsolatedCluster1";
+  protected static final String CLUSTER_2_NAME = "DualIsolatedCluster2";
+  protected static final ClusterConfig CLUSTER_1_CONFIG = new ClusterConfig(CLUSTER_1_NAME, 30000);
+  protected static final ClusterConfig CLUSTER_2_CONFIG = new ClusterConfig(CLUSTER_2_NAME, 40000);
+
+  protected ClusterComponents _cluster1;
+  protected ClusterComponents _cluster2;
+  protected List<File> _cluster1AvroFiles;
+  protected List<File> _cluster2AvroFiles;
+
+  /**
+   * Creates the working directories and starts ZooKeeper, controller, broker and server for both clusters.
+   * Both controllers are started before any broker so that each broker can be given the other cluster's
+   * ZooKeeper URL.
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    LOGGER.info("Setting up MultiClusterIntegrationTest");
+
+    // Initialize cluster components
+    _cluster1 = new ClusterComponents();
+    _cluster2 = new ClusterComponents();
+
+    // Setup directories
+    setupDirectories();
+
+    // Start ZooKeeper instances for both clusters
+    startZookeeper(_cluster1);
+    startZookeeper(_cluster2);
+
+    // Start controllers for both clusters
+    startControllerInit(_cluster1, CLUSTER_1_CONFIG);
+    startControllerInit(_cluster2, CLUSTER_2_CONFIG);
+
+    // Start brokers and servers for both clusters
+    // Note: Each cluster's broker is configured to know about the other cluster as remote
+    startCluster(_cluster1, _cluster2, CLUSTER_1_CONFIG);
+    startCluster(_cluster2, _cluster1, CLUSTER_2_CONFIG);
+
+    LOGGER.info("MultiClusterIntegrationTest setup complete");
+  }
+
+  // TODO: Add more tests for cross-cluster queries in subsequent iterations.
+  /**
+   * Verifies that both brokers are {@link MultiClusterHelixBrokerStarter} instances and that a
+   * {@code COUNT(*)} query against each cluster's own broker returns the 100 rows loaded into that cluster.
+   */
+  @Test
+  public void testMultiClusterBrokerStartsAndIsQueryable() throws Exception {
+    LOGGER.info("Testing that multi-cluster broker starts successfully and is queryable");
+
+    // Verify both clusters' brokers are running (MultiClusterHelixBrokerStarter)
+    assertNotNull(_cluster1._brokerStarter, "Cluster 1 broker should be started");
+    assertNotNull(_cluster2._brokerStarter, "Cluster 2 broker should be started");
+    assertTrue(_cluster1._brokerStarter instanceof MultiClusterHelixBrokerStarter,
+        "Cluster 1 broker should be MultiClusterHelixBrokerStarter");
+    assertTrue(_cluster2._brokerStarter instanceof MultiClusterHelixBrokerStarter,
+        "Cluster 2 broker should be MultiClusterHelixBrokerStarter");
+
+    // Setup a test table on both clusters
+    String testTableName = "multicluster_test_table";
+    createSchemaAndTableOnBothClusters(testTableName);
+
+    // Create and load test data into both clusters
+    _cluster1AvroFiles = createAvroData(100, 1);
+    _cluster2AvroFiles = createAvroData(100, 2);
+
+    loadDataIntoCluster(_cluster1AvroFiles, testTableName, _cluster1);
+    loadDataIntoCluster(_cluster2AvroFiles, testTableName, _cluster2);
+
+    // Verify cluster 1 is queryable
+    String query = "SELECT COUNT(*) FROM " + testTableName;
+    String result1 = executeQuery(query, _cluster1);
+    assertNotNull(result1, "Query result from cluster 1 should not be null");
+    long count1 = parseCountResult(result1);
+    assertEquals(count1, 100, "Cluster 1 should have 100 records");
+
+    // Verify cluster 2 is queryable
+    String result2 = executeQuery(query, _cluster2);
+    assertNotNull(result2, "Query result from cluster 2 should not be null");
+    long count2 = parseCountResult(result2);
+    assertEquals(count2, 100, "Cluster 2 should have 100 records");
+
+    LOGGER.info("Multi-cluster broker test passed: both clusters started and queryable");
+  }
+
+  /** Returns the multi-cluster broker starter used for every broker started by this test. */
+  @Override
+  protected BaseBrokerStarter createBrokerStarter() {
+    return new MultiClusterHelixBrokerStarter();
+  }
+
+  /** Static description of a cluster: its Helix cluster name and the port from which port probing starts. */
+  protected static class ClusterConfig {
+    final String _name;
+    final int _basePort;
+
+    ClusterConfig(String name, int basePort) {
+      _name = name;
+      _basePort = basePort;
+    }
+  }
+
+  /** Mutable holder for the running components, chosen ports, URLs and working directories of one cluster. */
+  protected static class ClusterComponents {
+    ZkStarter.ZookeeperInstance _zkInstance;
+    BaseControllerStarter _controllerStarter;
+    BaseBrokerStarter _brokerStarter;
+    BaseServerStarter _serverStarter;
+    int _controllerPort;
+    int _brokerPort;
+    int _serverPort;
+    String _zkUrl;
+    String _controllerBaseApiUrl;
+    File _tempDir;
+    File _segmentDir;
+    File _tarDir;
+  }
+
+  /** Creates (and empties) the working directories of both clusters. */
+  protected void setupDirectories() throws Exception {
+    setupClusterDirectories(_cluster1, "cluster1");
+    setupClusterDirectories(_cluster2, "cluster2");
+  }
+
+  /**
+   * Points the cluster at {@code <system temp dir>/<clusterPrefix>_<test class simple name>} with
+   * {@code segmentDir} and {@code tarDir} sub-directories, and makes sure all three exist and are empty.
+   */
+  private void setupClusterDirectories(ClusterComponents cluster, String clusterPrefix) throws Exception {
+    cluster._tempDir = new File(FileUtils.getTempDirectory(), clusterPrefix + "_" + getClass().getSimpleName());
+    cluster._segmentDir = new File(cluster._tempDir, "segmentDir");
+    cluster._tarDir = new File(cluster._tempDir, "tarDir");
+    TestUtils.ensureDirectoriesExistAndEmpty(cluster._tempDir, cluster._segmentDir, cluster._tarDir);
+  }
+
+  /** Starts a local ZooKeeper server for the cluster and records its URL. */
+  protected void startZookeeper(ClusterComponents cluster) throws Exception {
+    cluster._zkInstance = ZkStarter.startLocalZkServer();
+    cluster._zkUrl = cluster._zkInstance.getZkUrl();
+  }
+
+  /** Picks the controller port (probing from the configured base port) and starts the controller. */
+  protected void startControllerInit(ClusterComponents cluster, ClusterConfig config) throws Exception {
+    cluster._controllerPort = findAvailablePort(config._basePort);
+    startController(cluster, config);
+  }
+
+  /**
+   * Starts the broker and then the server of {@code cluster}. Port probing for the broker starts 1000 above
+   * the controller port, and for the server 1000 above the broker port.
+   *
+   * @param cluster cluster whose broker and server are started
+   * @param remoteCluster cluster that the broker is told about as its remote cluster
+   * @param config name and base port of {@code cluster}
+   */
+  protected void startCluster(ClusterComponents cluster, ClusterComponents remoteCluster,
+      ClusterConfig config) throws Exception {
+    cluster._brokerPort = findAvailablePort(cluster._controllerPort + 1000);
+    startBroker(cluster, remoteCluster, config);
+    cluster._serverPort = findAvailablePort(cluster._brokerPort + 1000);
+    startServerWithMSE(cluster, config);
+  }
+
+  /** Configures and starts the controller, then records its base API URL on {@code cluster}. */
+  protected void startController(ClusterComponents cluster, ClusterConfig config) throws Exception {
+    Map<String, Object> controllerConfig = new HashMap<>();
+    controllerConfig.put(ControllerConf.ZK_STR, cluster._zkUrl);
+    controllerConfig.put(ControllerConf.HELIX_CLUSTER_NAME, config._name);
+    controllerConfig.put(ControllerConf.CONTROLLER_HOST, ControllerTest.LOCAL_HOST);
+    controllerConfig.put(ControllerConf.CONTROLLER_PORT, cluster._controllerPort);
+    controllerConfig.put(ControllerConf.DATA_DIR, cluster._tempDir.getAbsolutePath());
+    controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR, cluster._tempDir.getAbsolutePath());
+    controllerConfig.put(ControllerConf.DISABLE_GROOVY, false);
+    controllerConfig.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, false);
+    controllerConfig.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+
+    cluster._controllerStarter = createControllerStarter();
+    cluster._controllerStarter.init(new PinotConfiguration(controllerConfig));
+    cluster._controllerStarter.start();
+    cluster._controllerBaseApiUrl = "http://localhost:" + cluster._controllerPort;
+  }
+
+  /**
+   * Configures and starts the broker of {@code cluster}. The remote cluster name is whichever of the two
+   * known cluster names is not {@code config._name}, and its ZooKeeper URL is taken from
+   * {@code remoteCluster}.
+   */
+  protected void startBroker(ClusterComponents cluster, ClusterComponents remoteCluster,
+      ClusterConfig config) throws Exception {
+    PinotConfiguration brokerConfig = new PinotConfiguration();
+    brokerConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, cluster._zkUrl);
+    String remoteClusterName = CLUSTER_1_NAME.equalsIgnoreCase(config._name) ? CLUSTER_2_NAME : CLUSTER_1_NAME;
+    brokerConfig.setProperty(Helix.CONFIG_OF_REMOTE_CLUSTER_NAMES, remoteClusterName);
+    brokerConfig.setProperty(String.format(Helix.CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS, remoteClusterName),
+        remoteCluster._zkUrl);
+    brokerConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, config._name);
+    brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_HOSTNAME, ControllerTest.LOCAL_HOST);
+    brokerConfig.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, cluster._brokerPort);
+    brokerConfig.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
+    brokerConfig.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
+    brokerConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+    cluster._brokerStarter = createBrokerStarter();
+    cluster._brokerStarter.init(brokerConfig);
+    cluster._brokerStarter.start();
+  }
+
+  /**
+   * Configures and starts the server of {@code cluster} with the multi-stage engine flag enabled.
+   *
+   * <p>The admin API, Netty and gRPC ports are probed one after another, each probe starting just above the
+   * previously chosen port, so the three ports are always distinct even when some candidates are occupied.
+   */
+  protected void startServerWithMSE(ClusterComponents cluster, ClusterConfig config) throws Exception {
+    int adminApiPort = findAvailablePort(cluster._serverPort);
+    int nettyPort = findAvailablePort(adminApiPort + 1);
+    int grpcPort = findAvailablePort(nettyPort + 1);
+
+    PinotConfiguration serverConfig = new PinotConfiguration();
+    serverConfig.setProperty(Helix.CONFIG_OF_ZOOKEEPER_SERVER, cluster._zkUrl);
+    serverConfig.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, config._name);
+    serverConfig.setProperty(Helix.KEY_OF_SERVER_NETTY_HOST, ControllerTest.LOCAL_HOST);
+    serverConfig.setProperty(Server.CONFIG_OF_INSTANCE_DATA_DIR, cluster._tempDir + "/dataDir");
+    serverConfig.setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, cluster._tempDir + "/segmentTar");
+    serverConfig.setProperty(Server.CONFIG_OF_SEGMENT_FORMAT_VERSION, "v3");
+    serverConfig.setProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, false);
+    serverConfig.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, adminApiPort);
+    serverConfig.setProperty(Helix.KEY_OF_SERVER_NETTY_PORT, nettyPort);
+    serverConfig.setProperty(Server.CONFIG_OF_GRPC_PORT, grpcPort);
+    serverConfig.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    serverConfig.setProperty(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+    serverConfig.setProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
+
+    cluster._serverStarter = createServerStarter();
+    cluster._serverStarter.init(serverConfig);
+    cluster._serverStarter.start();
+  }
+
+  /**
+   * Delegates to {@link NetUtils#findOpenPort(int)} and rethrows any failure as a {@link RuntimeException}
+   * that names the starting port.
+   */
+  protected int findAvailablePort(int basePort) {
+    try {
+      return NetUtils.findOpenPort(basePort);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to find available port starting from " + basePort, e);
+    }
+  }
+
+  /** Writes {@code dataSize} generated records for the given cluster into a single Avro file. */
+  protected List<File> createAvroData(int dataSize, int clusterId) throws Exception {
+    return createAvroDataMultipleSegments(dataSize, clusterId, 1);
+  }
+
+  /**
+   * Writes {@code totalDataSize} generated records into {@code numSegments} Avro files under the cluster's
+   * temp directory (cluster 1's when {@code clusterId == 1}, otherwise cluster 2's). Every file receives
+   * {@code totalDataSize / numSegments} records; the last file also receives the remainder.
+   *
+   * @return the Avro files, in segment order
+   */
+  protected List<File> createAvroDataMultipleSegments(int totalDataSize, int clusterId, int numSegments)
+      throws Exception {
+    Schema schema = createSchema(SCHEMA_FILE);
+    org.apache.avro.Schema avroSchema = createAvroSchema(schema);
+    File tempDir = (clusterId == 1) ? _cluster1._tempDir : _cluster2._tempDir;
+    List<File> avroFiles = new ArrayList<>();
+
+    for (int segment = 0; segment < numSegments; segment++) {
+      File avroFile = new File(tempDir, "cluster" + clusterId + "_data_segment" + segment + ".avro");
+      try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+        writer.create(avroSchema, avroFile);
+        int start = segment * (totalDataSize / numSegments);
+        int end = (segment == numSegments - 1) ? totalDataSize : (segment + 1) * (totalDataSize / numSegments);
+        for (int i = start; i < end; i++) {
+          GenericData.Record record = new GenericData.Record(avroSchema);
+          for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+            record.put(fieldSpec.getName(), generateFieldValue(fieldSpec.getName(), i, clusterId,
+                fieldSpec.getDataType()));
+          }
+          writer.append(record);
+        }
+      }
+      avroFiles.add(avroFile);
+    }
+    return avroFiles;
+  }
+
+  /** Builds an Avro record schema named {@code myRecord} with one primitive field per Pinot field spec. */
+  private org.apache.avro.Schema createAvroSchema(Schema schema) {
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    List<org.apache.avro.Schema.Field> fields = new ArrayList<>();
+
+    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+      org.apache.avro.Schema.Type avroType = getAvroType(fieldSpec.getDataType());
+      fields.add(new org.apache.avro.Schema.Field(fieldSpec.getName(),
+          org.apache.avro.Schema.create(avroType), null, null));
+    }
+    avroSchema.setFields(fields);
+    return avroSchema;
+  }
+
+  /** Maps a Pinot data type to the matching Avro primitive type; anything not listed becomes STRING. */
+  private org.apache.avro.Schema.Type getAvroType(FieldSpec.DataType type) {
+    switch (type) {
+      case INT: return org.apache.avro.Schema.Type.INT;
+      case LONG: return org.apache.avro.Schema.Type.LONG;
+      case FLOAT: return org.apache.avro.Schema.Type.FLOAT;
+      case DOUBLE: return org.apache.avro.Schema.Type.DOUBLE;
+      case BOOLEAN: return org.apache.avro.Schema.Type.BOOLEAN;
+      default: return org.apache.avro.Schema.Type.STRING;
+    }
+  }
+
+  /**
+   * Generates a deterministic value for one field of record {@code index}. LONG, FLOAT, DOUBLE and BOOLEAN
+   * values are derived from {@code index + clusterId * 10000} and therefore differ between clusters; INT and
+   * the default (string) values depend only on {@code index} and are identical across clusters.
+   */
+  private Object generateFieldValue(String fieldName, int index, int clusterId, FieldSpec.DataType dataType) {
+    int baseValue = index + (clusterId * 10000);
+    switch (dataType) {
+      case INT: return index + 10000;
+      case LONG: return (long) baseValue;
+      case FLOAT: return (float) (baseValue + 0.1);
+      case DOUBLE: return (double) (baseValue + 0.1);
+      case BOOLEAN: return (baseValue % 2) == 0;
+      default: return "cluster_" + fieldName + "_" + index;
+    }
+  }
+
+  /**
+   * Builds OFFLINE segments from the Avro files into the cluster's segment/tar directories (emptied first)
+   * and uploads them to the cluster's controller.
+   */
+  protected void loadDataIntoCluster(List<File> avroFiles, String tableName, ClusterComponents cluster)
+      throws Exception {
+    cleanDirectories(cluster._segmentDir, cluster._tarDir);
+    Schema schema = createSchema(SCHEMA_FILE);
+    schema.setSchemaName(tableName);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(tableName)
+        .setTimeColumnName(TIME_COLUMN)
+        .build();
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0,
+        cluster._segmentDir, cluster._tarDir);
+    uploadSegmentsToCluster(tableName, cluster._tarDir, cluster._controllerBaseApiUrl);
+    // Fixed pause after the upload before callers start querying.
+    Thread.sleep(2000);
+  }
+
+  /** Empties the given directories on a best-effort basis; a failure is logged and does not abort the test. */
+  private void cleanDirectories(File... dirs) {
+    for (File dir : dirs) {
+      try {
+        FileUtils.cleanDirectory(dir);
+      } catch (IOException e) {
+        LOGGER.warn("Failed to clean directory {}", dir, e);
+      }
+    }
+  }
+
+  /**
+   * Uploads every file found in {@code tarDir} to {@code <controllerBaseApiUrl>/segments} as an OFFLINE
+   * segment of {@code tableName}, asserting an HTTP 200 for each upload.
+   */
+  protected void uploadSegmentsToCluster(String tableName, File tarDir, String controllerBaseApiUrl) throws Exception {
+    File[] segmentTarFiles = tarDir.listFiles();
+    assertNotNull(segmentTarFiles);
+    assertTrue(segmentTarFiles.length > 0);
+
+    URI uploadSegmentHttpURI = URI.create(controllerBaseApiUrl + "/segments");
+
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      for (File segmentTarFile : segmentTarFiles) {
+        int status = fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
+            segmentTarFile.getName(), segmentTarFile, List.of(), tableName, TableType.OFFLINE)
+            .getStatusCode();
+        assertEquals(status, HttpStatus.SC_OK);
+      }
+    }
+
+    // Fixed pause after the last upload.
+    Thread.sleep(3000);
+  }
+
+  /** Registers the test schema (renamed to {@code tableName}) and an OFFLINE table config with the controller. */
+  protected void createSchemaAndTableForCluster(String tableName, String controllerBaseApiUrl) throws IOException {
+    Schema schema = createSchema(SCHEMA_FILE);
+    schema.setSchemaName(tableName);
+    addSchemaToCluster(schema, controllerBaseApiUrl);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(tableName)
+        .setTimeColumnName(TIME_COLUMN)
+        .build();
+    addTableConfigToCluster(tableConfig, controllerBaseApiUrl);
+  }
+
+  /** Drops any existing table/schema named {@code tableName} on both clusters, then recreates them. */
+  protected void createSchemaAndTableOnBothClusters(String tableName) throws Exception {
+    dropTableAndSchemaIfExists(tableName, _cluster1._controllerBaseApiUrl);
+    dropTableAndSchemaIfExists(tableName, _cluster2._controllerBaseApiUrl);
+    createSchemaAndTableForCluster(tableName, _cluster1._controllerBaseApiUrl);
+    createSchemaAndTableForCluster(tableName, _cluster2._controllerBaseApiUrl);
+  }
+
+  /** Sends DELETE requests for the table and then the schema; failures of either request are tolerated. */
+  protected void dropTableAndSchemaIfExists(String tableName, String controllerBaseApiUrl) {
+    dropResource(controllerBaseApiUrl + "/tables/" + tableName);
+    dropResource(controllerBaseApiUrl + "/schemas/" + tableName);
+  }
+
+  /** Best-effort DELETE of {@code url}; a failure is logged at debug level and otherwise ignored. */
+  private void dropResource(String url) {
+    try {
+      ControllerTest.sendDeleteRequest(url);
+    } catch (Exception e) {
+      LOGGER.debug("Ignoring failed DELETE of {}", url, e);
+    }
+  }
+
+  /** POSTs the schema JSON to {@code <controllerBaseApiUrl>/schemas} and asserts a non-null response. */
+  protected void addSchemaToCluster(Schema schema, String controllerBaseApiUrl) throws IOException {
+    String url = controllerBaseApiUrl + "/schemas";
+    String schemaJson = schema.toPrettyJsonString();
+    String response = ControllerTest.sendPostRequest(url, schemaJson);
+    assertNotNull(response);
+  }
+
+  /** POSTs the table config JSON to {@code <controllerBaseApiUrl>/tables} and asserts a non-null response. */
+  protected void addTableConfigToCluster(TableConfig tableConfig, String controllerBaseApiUrl) throws IOException {
+    String url = controllerBaseApiUrl + "/tables";
+    String tableConfigJson = JsonUtils.objectToPrettyString(tableConfig);
+    String response = ControllerTest.sendPostRequest(url, tableConfigJson);
+    assertNotNull(response);
+  }
+
+  /**
+   * POSTs {@code {"sql": query}} to the {@code /query/sql} endpoint of the cluster's broker.
+   *
+   * @return the raw response body
+   */
+  protected String executeQuery(String query, ClusterComponents cluster) throws Exception {
+    Map<String, String> payload = Map.of("sql", query);
+    String url = "http://localhost:" + cluster._brokerPort + "/query/sql";
+    return ControllerTest.sendPostRequest(url, JsonUtils.objectToPrettyString(payload));
+  }
+
+  /**
+   * Extracts the first cell of the first row of {@code resultTable.rows} from a broker response as a long.
+   *
+   * @return the parsed value, or 0 when the response cannot be parsed or contains no such cell (the reason
+   *     is logged so that a resulting assertion failure can be diagnosed)
+   */
+  protected long parseCountResult(String result) {
+    try {
+      JsonNode rows = JSON_MAPPER.readTree(result).path("resultTable").path("rows");
+      if (rows.isArray() && rows.size() > 0) {
+        JsonNode firstRow = rows.get(0);
+        if (firstRow.isArray() && firstRow.size() > 0) {
+          return Long.parseLong(firstRow.get(0).asText());
+        }
+      }
+      LOGGER.warn("No count value found in broker response: {}", result);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to parse count from broker response: {}", result, e);
+    }
+    return 0;
+  }
+
+  /**
+   * Loads a Pinot schema from a classpath resource, asserting that the resource exists. The stream is closed
+   * before returning.
+   */
+  protected Schema createSchema(String schemaFileName) throws IOException {
+    // try-with-resources skips close() for a null resource, so the assertion below can still report it.
+    try (InputStream schemaInputStream = getClass().getClassLoader().getResourceAsStream(schemaFileName)) {
+      assertNotNull(schemaInputStream, "Schema file not found: " + schemaFileName);
+      return Schema.fromInputStream(schemaInputStream);
+    }
+  }
+
+  /** Stops both clusters. Runs even when setup or a test failed so that started components are released. */
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    stopCluster(_cluster1);
+    stopCluster(_cluster2);
+  }
+
+  /**
+   * Stops server, broker, controller and ZooKeeper (in that order) and deletes the temp directory. Each step
+   * runs independently: a failure in one is logged and does not prevent the remaining steps.
+   */
+  private void stopCluster(ClusterComponents cluster) {
+    if (cluster == null) {
+      return;
+    }
+    if (cluster._serverStarter != null) {
+      stopQuietly("server", () -> cluster._serverStarter.stop());
+    }
+    if (cluster._brokerStarter != null) {
+      stopQuietly("broker", () -> cluster._brokerStarter.stop());
+    }
+    if (cluster._controllerStarter != null) {
+      stopQuietly("controller", () -> cluster._controllerStarter.stop());
+    }
+    if (cluster._zkInstance != null) {
+      stopQuietly("zookeeper", () -> ZkStarter.stopLocalZkServer(cluster._zkInstance));
+    }
+    FileUtils.deleteQuietly(cluster._tempDir);
+  }
+
+  /** Runs one shutdown step, logging (instead of propagating) any exception it throws. */
+  private static void stopQuietly(String component, ShutdownStep step) {
+    try {
+      step.run();
+    } catch (Exception e) {
+      LOGGER.warn("Error stopping cluster component: {}", component, e);
+    }
+  }
+
+  /** A shutdown action that is allowed to throw a checked exception. */
+  @FunctionalInterface
+  private interface ShutdownStep {
+    void run() throws Exception;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index b2fc58b11c27..3004cafdc732 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -251,6 +251,9 @@ public static class Instance { @Deprecated(since = "1.5.0", forRemoval = true) public static final String CONFIG_OF_ZOOKEEPR_SERVER = "pinot.zk.server"; + public static final String CONFIG_OF_REMOTE_CLUSTER_NAMES = "pinot.remote.cluster.names"; + public static final String CONFIG_OF_REMOTE_ZOOKEEPER_SERVERS = "pinot.remote.zk.server.%s"; + public static final String CONFIG_OF_PINOT_CONTROLLER_STARTABLE_CLASS = "pinot.controller.startable.class"; public static final String CONFIG_OF_PINOT_BROKER_STARTABLE_CLASS = "pinot.broker.startable.class"; public static final String
CONFIG_OF_PINOT_SERVER_STARTABLE_CLASS = "pinot.server.startable.class";