Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.pinot.common.version.PinotVersion;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
Expand Down Expand Up @@ -150,6 +151,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected HelixAdmin _helixAdmin;
protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
protected HelixDataAccessor _helixDataAccessor;
protected TableCache _tableCache;
protected PinotMetricsRegistry _metricsRegistry;
protected BrokerMetrics _brokerMetrics;
protected BrokerRoutingManager _routingManager;
Expand Down Expand Up @@ -310,12 +312,7 @@ public void start()
Utils.logVersions();

LOGGER.info("Connecting spectator Helix manager");
_spectatorHelixManager =
HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.SPECTATOR, _zkServers);
_spectatorHelixManager.connect();
_helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
_propertyStore = _spectatorHelixManager.getHelixPropertyStore();
_helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
initSpectatorHelixManager();

LOGGER.info("Setting up broker request handler");
// Set up metric registry and broker metrics
Expand All @@ -336,8 +333,8 @@ public void start()
// Set up request handling classes
_serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf, _brokerMetrics);
_serverRoutingStatsManager.init();
_routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf);
_routingManager.init(_spectatorHelixManager);
initRoutingManager();

final PinotConfiguration factoryConf = _brokerConf.subset(Broker.ACCESS_CONTROL_CONFIG_PREFIX);
// Adding cluster name to the config so that it can be used by the AccessControlFactory
factoryConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, _brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME));
Expand All @@ -353,7 +350,7 @@ public void start()
FunctionRegistry.init();
boolean caseInsensitive =
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
TableCache tableCache = new ZkTableCache(_propertyStore, caseInsensitive);
_tableCache = new ZkTableCache(_propertyStore, caseInsensitive);

LOGGER.info("Initializing Broker Event Listener Factory");
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
Expand Down Expand Up @@ -382,6 +379,9 @@ public void start()
org.apache.pinot.spi.config.instance.InstanceType.BROKER);
_threadAccountant.startWatcherTask();

// TODO: Hook multiClusterRoutingContext into request handlers subsequently.
MultiClusterRoutingContext multiClusterRoutingContext = getMultiClusterRoutingContext();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this initialized in base broker starter?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to initialize this in BaseBrokerStarter since it would be passed to all the request handlers subsequently. Alternatively, we could separate out initializing the request handlers in a separate method which can be overriden by MultiClusterHelixBrokerStarter. (The only thing there is we would be passing nulls for multiClusterRoutingContext in BaseBrokerStarter regardless).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I thought about this. I think we can leave this be. In the next PR, can you change the local var to an Optional.ofNullable? Just so it is clear for other folks that this will be null in almost all cases (except federated)


// Create Broker request handler.
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
BrokerRequestIdGenerator requestIdGenerator = new BrokerRequestIdGenerator();
Expand All @@ -391,7 +391,7 @@ public void start()
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, _failureDetector, _threadAccountant);
_accessControlFactory, _queryQuotaManager, _tableCache, _failureDetector, _threadAccountant);
} else {
// Default request handler type, i.e. netty
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
Expand All @@ -402,7 +402,7 @@ public void start()
}
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, nettyDefaults, tlsDefaults,
_accessControlFactory, _queryQuotaManager, _tableCache, nettyDefaults, tlsDefaults,
_serverRoutingStatsManager, _failureDetector, _threadAccountant);
}
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
Expand All @@ -416,15 +416,15 @@ public void start()
queryDispatcher = createQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, _multiStageQueryThrottler, _failureDetector,
_accessControlFactory, _queryQuotaManager, _tableCache, _multiStageQueryThrottler, _failureDetector,
_threadAccountant);
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be enabled to use time-series engine");
timeSeriesRequestHandler =
new TimeSeriesRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, queryDispatcher, _threadAccountant);
_accessControlFactory, _queryQuotaManager, _tableCache, queryDispatcher, _threadAccountant);
}

LOGGER.info("Initializing PinotFSFactory");
Expand Down Expand Up @@ -471,6 +471,45 @@ public void start()
}

LOGGER.info("Initializing cluster change mediator");
initClusterChangeMediator();

LOGGER.info("Connecting participant Helix manager");
_participantHelixManager =
HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers);
// Register state model factory
_participantHelixManager.getStateMachineEngine()
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _routingManager,
_queryQuotaManager));
// Register user-define message handler factory
_participantHelixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
new BrokerUserDefinedMessageHandlerFactory(_routingManager, _queryQuotaManager));
_participantHelixManager.connect();
updateInstanceConfigAndBrokerResourceIfNeeded();
_brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
() -> _participantHelixManager.isConnected() ? 1L : 0L);
_participantHelixManager.addPreConnectCallback(
() -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));

// Initializing Groovy execution security
GroovyFunctionEvaluator.configureGroovySecurity(
_brokerConf.getProperty(CommonConstants.Groovy.GROOVY_QUERY_STATIC_ANALYZER_CONFIG,
_brokerConf.getProperty(CommonConstants.Groovy.GROOVY_ALL_STATIC_ANALYZER_CONFIG)));

// Register the service status handler
registerServiceStatusHandler();

_isStarting = false;
_brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);

_defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);

LOGGER.info("Finish starting Pinot broker");
}

protected void initClusterChangeMediator() throws Exception {
for (ClusterChangeHandler clusterConfigChangeHandler : _clusterConfigChangeHandlers) {
clusterConfigChangeHandler.init(_spectatorHelixManager);
}
Expand Down Expand Up @@ -515,41 +554,27 @@ public void start()
if (!_liveInstanceChangeHandlers.isEmpty()) {
_spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
}
}

LOGGER.info("Connecting participant Helix manager");
_participantHelixManager =
HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers);
// Register state model factory
_participantHelixManager.getStateMachineEngine()
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _routingManager,
_queryQuotaManager));
// Register user-define message handler factory
_participantHelixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
new BrokerUserDefinedMessageHandlerFactory(_routingManager, _queryQuotaManager));
_participantHelixManager.connect();
updateInstanceConfigAndBrokerResourceIfNeeded();
_brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
() -> _participantHelixManager.isConnected() ? 1L : 0L);
_participantHelixManager.addPreConnectCallback(
() -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));

// Initializing Groovy execution security
GroovyFunctionEvaluator.configureGroovySecurity(
_brokerConf.getProperty(CommonConstants.Groovy.GROOVY_QUERY_STATIC_ANALYZER_CONFIG,
_brokerConf.getProperty(CommonConstants.Groovy.GROOVY_ALL_STATIC_ANALYZER_CONFIG)));

// Register the service status handler
registerServiceStatusHandler();

_isStarting = false;
_brokerMetrics.addTimedValue(BrokerTimer.STARTUP_SUCCESS_DURATION_MS,
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
protected void initRoutingManager() throws Exception {
_routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf);
_routingManager.init(_spectatorHelixManager);
}

_defaultClusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
protected void initSpectatorHelixManager() throws Exception {
_spectatorHelixManager =
HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.SPECTATOR, _zkServers);
_spectatorHelixManager.connect();
_helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
_propertyStore = _spectatorHelixManager.getHelixPropertyStore();
_helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
}

LOGGER.info("Finish starting Pinot broker");
/**
* Can be overridden to inject a custom MultiClusterRoutingContext from MultiClusterBrokerStarter.
*/
protected MultiClusterRoutingContext getMultiClusterRoutingContext() {
return null;
}

/**
Expand Down
Loading
Loading