diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index e3497a6ff88aa..b6e995b3d08b3 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -23,13 +23,15 @@ import org.apache.kafka.common.config.{ConfigDef, ConfigResource} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.utils.{AppInfoParser, Time} import org.apache.kafka.common.{KafkaException, Uuid} +import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.metadata.KafkaConfigSchema import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata} import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble} import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.{ProcessRole, ServerSocketFactory} -import org.apache.kafka.server.config.ServerTopicConfigSynonyms +import org.apache.kafka.server.config.{QuotaConfig, ServerTopicConfigSynonyms} +import org.apache.kafka.server.metrics.ClientMetricsConfigs import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog} import org.slf4j.Logger @@ -186,6 +188,18 @@ object KafkaRaftServer { (metaPropsEnsemble, bootstrapMetadata) } + def getAllDynamicConfigNames: util.Set[String] = { + val topicConfigs = LogConfig.nonInternalConfigNames.asScala.toSet + val brokerConfigs = DynamicConfig.Broker.names.asScala.toSet + val userConfigs = QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs().names.asScala.toSet + val clientConfigs = QuotaConfig.userAndClientQuotaConfigs().names.asScala.toSet + val ipConfigs = QuotaConfig.ipConfigs.names.asScala.toSet + val clientMetricsConfigs = ClientMetricsConfigs.configDef().names.asScala.toSet + val groupConfigs = GroupConfig.configDef().names.asScala.toSet + + (topicConfigs ++ brokerConfigs ++ userConfigs ++ clientConfigs ++ ipConfigs ++ clientMetricsConfigs ++ groupConfigs).asJava + } + val configSchema = new KafkaConfigSchema(Map( ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef), ConfigResource.Type.TOPIC -> LogConfig.configDefCopy,