From 5fcca8bb8c59c9d2e5a1da14c92658be1ba7bedd Mon Sep 17 00:00:00 2001 From: tony tang Date: Tue, 25 Nov 2025 17:16:38 -0600 Subject: [PATCH 1/4] Delete dynamic config that were removed by Kafka --- .../scala/kafka/server/KafkaRaftServer.scala | 4 + .../ConfigurationControlManager.java | 11 +++ .../kafka/controller/QuorumController.java | 3 + .../kafka/image/ConfigurationDelta.java | 12 +++ .../kafka/image/ConfigurationsDelta.java | 67 ++++++++++++++++- .../kafka/metadata/KafkaConfigSchema.java | 24 ++++++ .../ConfigurationControlManagerTest.java | 28 +++++++ .../kafka/image/ConfigurationsImageTest.java | 73 +++++++++++++++++++ 8 files changed, 220 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index e3497a6ff88aa..fd6e45bc8995c 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -29,7 +29,9 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationF import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble} import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.{ProcessRole, ServerSocketFactory} +import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.server.config.ServerTopicConfigSynonyms +import org.apache.kafka.server.metrics.ClientMetricsConfigs import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog} import org.slf4j.Logger @@ -189,5 +191,7 @@ object KafkaRaftServer { val configSchema = new KafkaConfigSchema(Map( ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef), ConfigResource.Type.TOPIC -> LogConfig.configDefCopy, + ConfigResource.Type.GROUP -> GroupConfig.configDef(), + ConfigResource.Type.CLIENT_METRICS -> ClientMetricsConfigs.configDef(), ).asJava, ServerTopicConfigSynonyms.ALL_TOPIC_CONFIG_SYNONYMS) } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 4a0219d175949..881b655f8f0f8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -51,6 +51,7 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.function.Consumer; import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND; @@ -508,6 +509,16 @@ private List getParts(String value, String key, ConfigResource configRes */ public void replay(ConfigRecord record) { Type type = Type.forId(record.resourceType()); + // Filter out invalid configs + if (type != Type.UNKNOWN) { + Set validConfigNames = configSchema.validConfigNames(type); + if (!validConfigNames.isEmpty() && !validConfigNames.contains(record.name())) { + // Ignore the record if it's a removed/invalid config + log.debug("Ignoring ConfigRecord for {} with invalid/removed config name: {}", + new ConfigResource(type, record.resourceName()), record.name()); + return; + } + } ConfigResource configResource = new ConfigResource(type, record.resourceName()); TimelineHashMap configs = configData.get(configResource); if (configs == null) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 
dfde76ecba580..cf01de036a7eb 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -96,6 +96,7 @@ import org.apache.kafka.controller.metrics.QuorumControllerMetrics; import org.apache.kafka.deferred.DeferredEvent; import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.image.ConfigurationsDelta; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.FinalizedControllerFeatures; @@ -1538,6 +1539,8 @@ private QuorumController( setNodeId(nodeId). setFeatureControl(featureControl). build(); + // Initialize the config schema supplier for ConfigurationsDelta to filter invalid configs + ConfigurationsDelta.setConfigSchemaSupplier(() -> configSchema); this.producerIdControlManager = new ProducerIdControlManager.Builder(). setLogContext(logContext). setSnapshotRegistry(snapshotRegistry). diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java index dc550d8c72aa1..ca37ee0d15165 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java @@ -17,12 +17,14 @@ package org.apache.kafka.image; +import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.metadata.ConfigRecord; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; /** @@ -57,7 +59,17 @@ public void deleteAll() { public ConfigurationImage apply() { Map newData = new HashMap<>(image.data().size()); + Type resourceType = image.resource().type(); + Set validConfigNames = resourceType != Type.UNKNOWN ? + ConfigurationsDelta.getValidConfigNames(resourceType) : Set.of(); + + // Filter out invalid configs from the base image for (Entry entry : image.data().entrySet()) { + if (!validConfigNames.isEmpty() && + !validConfigNames.contains(entry.getKey())) { + continue; + } + Optional change = changes.get(entry.getKey()); if (change == null) { newData.put(entry.getKey(), entry.getValue()); diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java index 0b3fcbb386722..c7debce8d86ee 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java @@ -21,17 +21,48 @@ import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.server.common.MetadataVersion; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.function.Supplier; /** * Represents changes to the configurations in the metadata image. */ public final class ConfigurationsDelta { + /** + * Supplier for KafkaConfigSchema to get valid config names whitelist by resource type. + */ + private static volatile Supplier configSchemaSupplier = null; + + /** + * Set the supplier for KafkaConfigSchema. This should be called during initialization. 
+ */ + public static void setConfigSchemaSupplier(Supplier supplier) { + configSchemaSupplier = supplier; + } + + /** + * Get the set of valid configuration names for a given resource type. + * Returns empty set if configSchema is not initialized. + */ + static Set getValidConfigNames(Type resourceType) { + Supplier supplier = configSchemaSupplier; + if (supplier == null) { + return Set.of(); + } + KafkaConfigSchema configSchema = supplier.get(); + if (configSchema == null) { + return Set.of(); + } + return configSchema.validConfigNames(resourceType); + } + private final ConfigurationsImage image; private final Map changes = new HashMap<>(); @@ -58,8 +89,18 @@ public void handleMetadataVersionChange(MetadataVersion newVersion) { } public void replay(ConfigRecord record) { + // Filter out invalid configs when building the image + Type resourceType = Type.forId(record.resourceType()); + if (resourceType != Type.UNKNOWN) { + Set validConfigNames = getValidConfigNames(resourceType); + if (!validConfigNames.isEmpty() && !validConfigNames.contains(record.name())) { + // Ignore this record + return; + } + } + ConfigResource resource = - new ConfigResource(Type.forId(record.resourceType()), record.resourceName()); + new ConfigResource(resourceType, record.resourceName()); ConfigurationImage configImage = image.resourceData().getOrDefault(resource, new ConfigurationImage(resource, Map.of())); ConfigurationDelta delta = changes.computeIfAbsent(resource, @@ -84,7 +125,11 @@ public ConfigurationsImage apply() { ConfigResource resource = entry.getKey(); ConfigurationDelta delta = changes.get(resource); if (delta == null) { - newData.put(resource, entry.getValue()); + // Filter invalid configs from the base image + ConfigurationImage filteredImage = filterBaseImage(entry.getValue()); + if (!filteredImage.isEmpty()) { + newData.put(resource, filteredImage); + } } else { ConfigurationImage newImage = delta.apply(); if (!newImage.isEmpty()) { @@ -103,6 +148,24 @@ public ConfigurationsImage apply() { return new ConfigurationsImage(newData); } + private ConfigurationImage filterBaseImage(ConfigurationImage baseImage) { + Type resourceType = baseImage.resource().type(); + Set validConfigNames = resourceType != Type.UNKNOWN ? 
+ getValidConfigNames(resourceType) : Set.of(); + + if (validConfigNames.isEmpty()) { + return baseImage; + } + + Map filteredData = new HashMap<>(); + for (Entry entry : baseImage.data().entrySet()) { + if (validConfigNames.contains(entry.getKey())) { + filteredData.put(entry.getKey(), entry.getValue()); + } + } + return new ConfigurationImage(baseImage.resource(), filteredData); + } + @Override public String toString() { return "ConfigurationsDelta(" + diff --git a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java index 7eff72cca8a15..0707a7e84af1b 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java @@ -28,9 +28,11 @@ import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Function; import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; @@ -281,4 +283,26 @@ public int getStaticallyConfiguredMinInsyncReplicas(Map staticNodeCon minInsyncReplicasString, ConfigDef.Type.INT); } + + /** + * Get the set of valid dynamic configuration names for a given resource type. + * whitelists: + * - Topic: LogConfig.configDef + * - Broker: KafkaConfig.configDef + * - Group: GroupConfig.configDef + * - ClientMetrics: ClientMetricsConfigs.configDef + * + * @param type The resource type + * @return A set of valid configuration names for the given resource type + */ + public Set validConfigNames(ConfigResource.Type type) { + ConfigDef configDef = configDefs.getOrDefault(type, EMPTY_CONFIG_DEF); + Set validNames = new HashSet<>(); + for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) { + if (!configKey.internalConfig) { + validNames.add(configKey.name); + } + } + return Collections.unmodifiableSet(validNames); + } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 2c93d1100ecae..6705497bd0194 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -44,6 +44,7 @@ import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -153,6 +154,33 @@ public void testReplay() { assertEquals("x,y,z", manager.getTopicConfig(MYTOPIC.name(), "abc").value()); } + @Test + public void testReplayFiltersRemovedConfigs() { + // Create a schema that doesn't include removed configs + Map configDefs = new HashMap<>(); + ConfigDef topicConfigDef = new ConfigDef(); + topicConfigDef.define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc"); + configDefs.put(TOPIC, topicConfigDef); + KafkaConfigSchema testSchema = new KafkaConfigSchema(configDefs, Collections.emptyMap()); + + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setKafkaConfigSchema(testSchema). + build(); + + // Replay a removed config + manager.replay(new ConfigRecord(). + setResourceType(TOPIC.id()).setResourceName("mytopic"). 
+ setName("removed.config").setValue("value")); + assertEquals(Map.of(), manager.getConfigs(MYTOPIC), "Removed config should not be in configData"); + + // Replay a valid config + manager.replay(new ConfigRecord(). + setResourceType(TOPIC.id()).setResourceName("mytopic"). + setName("abc").setValue("x,y,z")); + assertEquals(toMap(entry("abc", "x,y,z")), manager.getConfigs(MYTOPIC), + "Valid config should be in configData"); + } + @Test public void testIncrementalAlterConfigs() { ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java index 6300e1293e826..986327df93cb7 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java @@ -17,9 +17,11 @@ package org.apache.kafka.image; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.image.writer.RecordListWriter; +import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -27,14 +29,19 @@ import org.junit.jupiter.api.Timeout; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(value = 40) @@ -137,4 +144,70 @@ private static List getImageRecords(ConfigurationsImage im image.write(writer); return writer.records(); } + + @Test + public void testRemovedConfigFiltered() { + // Create a schema that doesn't include removed configs + Map configDefs = new HashMap<>(); + ConfigDef topicConfigDef = new ConfigDef(); + topicConfigDef.define("retention.ms", ConfigDef.Type.LONG, ConfigDef.Importance.MEDIUM, "retention ms doc"); + configDefs.put(TOPIC, topicConfigDef); + KafkaConfigSchema testSchema = new KafkaConfigSchema(configDefs, Collections.emptyMap()); + ConfigurationsDelta.setConfigSchemaSupplier(() -> testSchema); + + try { + String testTopic = "test-topic"; + String removedConfig = "message.format.version"; + String validConfig = "retention.ms"; + String validConfigValue = "604800000"; + + // Test 1: Filter removed configs from base image + Map initialData = new HashMap<>(); + Map topicConfigs = new HashMap<>(); + topicConfigs.put(removedConfig, "0.10.0"); + topicConfigs.put(validConfig, validConfigValue); + initialData.put(new ConfigResource(TOPIC, testTopic), + new ConfigurationImage(new ConfigResource(TOPIC, testTopic), topicConfigs)); + + ConfigurationsImage initialImage = new ConfigurationsImage(initialData); + ConfigurationsDelta delta = new ConfigurationsDelta(initialImage); + ConfigurationsImage finalImage = delta.apply(); + + ConfigResource topicResource = new ConfigResource(TOPIC, testTopic); + ConfigurationImage topicConfig = 
finalImage.resourceData().get(topicResource); + assertNotNull(topicConfig); + assertFalse(topicConfig.data().containsKey(removedConfig), "Removed config should be filtered from base image"); + assertTrue(topicConfig.data().containsKey(validConfig), "Valid config should be present"); + assertEquals(validConfigValue, topicConfig.data().get(validConfig)); + + // Test 2: Filter removed configs when replaying records + ConfigurationsDelta delta2 = new ConfigurationsDelta(ConfigurationsImage.EMPTY); + List records = new ArrayList<>(); + records.add(new ApiMessageAndVersion( + new ConfigRecord() + .setResourceType(TOPIC.id()) + .setResourceName(testTopic) + .setName(removedConfig) + .setValue("0.10.0"), + CONFIG_RECORD.highestSupportedVersion())); + records.add(new ApiMessageAndVersion( + new ConfigRecord() + .setResourceType(TOPIC.id()) + .setResourceName(testTopic) + .setName(validConfig) + .setValue(validConfigValue), + CONFIG_RECORD.highestSupportedVersion())); + + RecordTestUtils.replayAll(delta2, records); + ConfigurationsImage finalImage2 = delta2.apply(); + + ConfigurationImage topicConfig2 = finalImage2.resourceData().get(topicResource); + assertNotNull(topicConfig2); + assertFalse(topicConfig2.data().containsKey(removedConfig), "Removed config should be filtered on replay"); + assertTrue(topicConfig2.data().containsKey(validConfig), "Valid config should be present"); + assertEquals(1, topicConfig2.data().size(), "Only valid config should remain"); + } finally { + ConfigurationsDelta.setConfigSchemaSupplier(null); + } + } } From 573e4b39e5f17830911b278d1480897673c58917 Mon Sep 17 00:00:00 2001 From: tony tang Date: Tue, 25 Nov 2025 19:12:34 -0600 Subject: [PATCH 2/4] fix --- .../java/org/apache/kafka/image/ConfigurationsImageTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java index 986327df93cb7..7e31d16004995 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java @@ -150,7 +150,7 @@ public void testRemovedConfigFiltered() { // Create a schema that doesn't include removed configs Map configDefs = new HashMap<>(); ConfigDef topicConfigDef = new ConfigDef(); - topicConfigDef.define("retention.ms", ConfigDef.Type.LONG, ConfigDef.Importance.MEDIUM, "retention ms doc"); + topicConfigDef.define("valid.config", ConfigDef.Type.LONG, ConfigDef.Importance.MEDIUM, "doc"); configDefs.put(TOPIC, topicConfigDef); KafkaConfigSchema testSchema = new KafkaConfigSchema(configDefs, Collections.emptyMap()); ConfigurationsDelta.setConfigSchemaSupplier(() -> testSchema); @@ -158,7 +158,7 @@ public void testRemovedConfigFiltered() { try { String testTopic = "test-topic"; String removedConfig = "message.format.version"; - String validConfig = "retention.ms"; + String validConfig = "valid.config"; String validConfigValue = "604800000"; // Test 1: Filter removed configs from base image From 43f676f116f72ce780c275a16564ce41e9208716 Mon Sep 17 00:00:00 2001 From: tony tang Date: Mon, 15 Dec 2025 18:12:43 -0600 Subject: [PATCH 3/4] fix --- .../scala/kafka/server/KafkaRaftServer.scala | 16 +++- .../ConfigurationControlManager.java | 11 --- .../kafka/controller/QuorumController.java | 3 - .../kafka/image/ConfigurationDelta.java | 12 --- .../kafka/image/ConfigurationsDelta.java | 67 +---------------- 
.../kafka/metadata/KafkaConfigSchema.java | 24 ------ .../ConfigurationControlManagerTest.java | 28 ------- .../kafka/image/ConfigurationsImageTest.java | 73 ------------------- 8 files changed, 15 insertions(+), 219 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index fd6e45bc8995c..967af435b68a1 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -30,7 +30,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.{ProcessRole, ServerSocketFactory} import org.apache.kafka.coordinator.group.GroupConfig -import org.apache.kafka.server.config.ServerTopicConfigSynonyms +import org.apache.kafka.server.config.{QuotaConfig, ServerTopicConfigSynonyms} import org.apache.kafka.server.metrics.ClientMetricsConfigs import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog} import org.slf4j.Logger @@ -188,10 +188,20 @@ object KafkaRaftServer { (metaPropsEnsemble, bootstrapMetadata) } + def getAllDynamicConfigNames: util.Set[String] = { + val topicConfigs = LogConfig.nonInternalConfigNames.asScala.toSet + val brokerConfigs = DynamicConfig.Broker.names.asScala.toSet + val userConfigs = QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs().names.asScala.toSet + val clientConfigs = QuotaConfig.userAndClientQuotaConfigs().names.asScala.toSet + val ipConfigs = QuotaConfig.ipConfigs.names.asScala.toSet + val clientMetricsConfigs = ClientMetricsConfigs.configDef().names.asScala.toSet + val groupConfigs = GroupConfig.configDef().names.asScala.toSet + + (topicConfigs ++ brokerConfigs ++ userConfigs ++ clientConfigs ++ ipConfigs ++ clientMetricsConfigs ++ groupConfigs).asJava + } + val configSchema = new KafkaConfigSchema(Map( ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef), ConfigResource.Type.TOPIC -> LogConfig.configDefCopy, - ConfigResource.Type.GROUP -> GroupConfig.configDef(), - ConfigResource.Type.CLIENT_METRICS -> ClientMetricsConfigs.configDef(), ).asJava, ServerTopicConfigSynonyms.ALL_TOPIC_CONFIG_SYNONYMS) } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 881b655f8f0f8..4a0219d175949 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -51,7 +51,6 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.function.Consumer; import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND; @@ -509,16 +508,6 @@ private List getParts(String value, String key, ConfigResource configRes */ public void replay(ConfigRecord record) { Type type = Type.forId(record.resourceType()); - // Filter out invalid configs - if (type != Type.UNKNOWN) { - Set validConfigNames = configSchema.validConfigNames(type); - if (!validConfigNames.isEmpty() && !validConfigNames.contains(record.name())) { - // Ignore the record if it's a removed/invalid config - log.debug("Ignoring ConfigRecord for {} with invalid/removed config name: {}", - new ConfigResource(type, record.resourceName()), record.name()); - return; - } - } ConfigResource configResource = new 
ConfigResource(type, record.resourceName()); TimelineHashMap configs = configData.get(configResource); if (configs == null) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index cf01de036a7eb..dfde76ecba580 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -96,7 +96,6 @@ import org.apache.kafka.controller.metrics.QuorumControllerMetrics; import org.apache.kafka.deferred.DeferredEvent; import org.apache.kafka.deferred.DeferredEventQueue; -import org.apache.kafka.image.ConfigurationsDelta; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.FinalizedControllerFeatures; @@ -1539,8 +1538,6 @@ private QuorumController( setNodeId(nodeId). setFeatureControl(featureControl). build(); - // Initialize the config schema supplier for ConfigurationsDelta to filter invalid configs - ConfigurationsDelta.setConfigSchemaSupplier(() -> configSchema); this.producerIdControlManager = new ProducerIdControlManager.Builder(). setLogContext(logContext). setSnapshotRegistry(snapshotRegistry). diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java index ca37ee0d15165..dc550d8c72aa1 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java @@ -17,14 +17,12 @@ package org.apache.kafka.image; -import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.metadata.ConfigRecord; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.Set; /** @@ -59,17 +57,7 @@ public void deleteAll() { public ConfigurationImage apply() { Map newData = new HashMap<>(image.data().size()); - Type resourceType = image.resource().type(); - Set validConfigNames = resourceType != Type.UNKNOWN ? - ConfigurationsDelta.getValidConfigNames(resourceType) : Set.of(); - - // Filter out invalid configs from the base image for (Entry entry : image.data().entrySet()) { - if (!validConfigNames.isEmpty() && - !validConfigNames.contains(entry.getKey())) { - continue; - } - Optional change = changes.get(entry.getKey()); if (change == null) { newData.put(entry.getKey(), entry.getValue()); diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java index c7debce8d86ee..0b3fcbb386722 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java @@ -21,48 +21,17 @@ import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; -import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.server.common.MetadataVersion; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; -import java.util.function.Supplier; /** * Represents changes to the configurations in the metadata image. 
*/ public final class ConfigurationsDelta { - /** - * Supplier for KafkaConfigSchema to get valid config names whitelist by resource type. - */ - private static volatile Supplier configSchemaSupplier = null; - - /** - * Set the supplier for KafkaConfigSchema. This should be called during initialization. - */ - public static void setConfigSchemaSupplier(Supplier supplier) { - configSchemaSupplier = supplier; - } - - /** - * Get the set of valid configuration names for a given resource type. - * Returns empty set if configSchema is not initialized. - */ - static Set getValidConfigNames(Type resourceType) { - Supplier supplier = configSchemaSupplier; - if (supplier == null) { - return Set.of(); - } - KafkaConfigSchema configSchema = supplier.get(); - if (configSchema == null) { - return Set.of(); - } - return configSchema.validConfigNames(resourceType); - } - private final ConfigurationsImage image; private final Map changes = new HashMap<>(); @@ -89,18 +58,8 @@ public void handleMetadataVersionChange(MetadataVersion newVersion) { } public void replay(ConfigRecord record) { - // Filter out invalid configs when building the image - Type resourceType = Type.forId(record.resourceType()); - if (resourceType != Type.UNKNOWN) { - Set validConfigNames = getValidConfigNames(resourceType); - if (!validConfigNames.isEmpty() && !validConfigNames.contains(record.name())) { - // Ignore this record - return; - } - } - ConfigResource resource = - new ConfigResource(resourceType, record.resourceName()); + new ConfigResource(Type.forId(record.resourceType()), record.resourceName()); ConfigurationImage configImage = image.resourceData().getOrDefault(resource, new ConfigurationImage(resource, Map.of())); ConfigurationDelta delta = changes.computeIfAbsent(resource, @@ -125,11 +84,7 @@ public ConfigurationsImage apply() { ConfigResource resource = entry.getKey(); ConfigurationDelta delta = changes.get(resource); if (delta == null) { - // Filter invalid configs from the base image - ConfigurationImage filteredImage = filterBaseImage(entry.getValue()); - if (!filteredImage.isEmpty()) { - newData.put(resource, filteredImage); - } + newData.put(resource, entry.getValue()); } else { ConfigurationImage newImage = delta.apply(); if (!newImage.isEmpty()) { @@ -148,24 +103,6 @@ public ConfigurationsImage apply() { return new ConfigurationsImage(newData); } - private ConfigurationImage filterBaseImage(ConfigurationImage baseImage) { - Type resourceType = baseImage.resource().type(); - Set validConfigNames = resourceType != Type.UNKNOWN ? 
- getValidConfigNames(resourceType) : Set.of(); - - if (validConfigNames.isEmpty()) { - return baseImage; - } - - Map filteredData = new HashMap<>(); - for (Entry entry : baseImage.data().entrySet()) { - if (validConfigNames.contains(entry.getKey())) { - filteredData.put(entry.getKey(), entry.getValue()); - } - } - return new ConfigurationImage(baseImage.resource(), filteredData); - } - @Override public String toString() { return "ConfigurationsDelta(" + diff --git a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java index 0707a7e84af1b..7eff72cca8a15 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java @@ -28,11 +28,9 @@ import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.function.Function; import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; @@ -283,26 +281,4 @@ public int getStaticallyConfiguredMinInsyncReplicas(Map staticNodeCon minInsyncReplicasString, ConfigDef.Type.INT); } - - /** - * Get the set of valid dynamic configuration names for a given resource type. - * whitelists: - * - Topic: LogConfig.configDef - * - Broker: KafkaConfig.configDef - * - Group: GroupConfig.configDef - * - ClientMetrics: ClientMetricsConfigs.configDef - * - * @param type The resource type - * @return A set of valid configuration names for the given resource type - */ - public Set validConfigNames(ConfigResource.Type type) { - ConfigDef configDef = configDefs.getOrDefault(type, EMPTY_CONFIG_DEF); - Set validNames = new HashSet<>(); - for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) { - if (!configKey.internalConfig) { - validNames.add(configKey.name); - } - } - return Collections.unmodifiableSet(validNames); - } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 6705497bd0194..2c93d1100ecae 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -44,7 +44,6 @@ import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -154,33 +153,6 @@ public void testReplay() { assertEquals("x,y,z", manager.getTopicConfig(MYTOPIC.name(), "abc").value()); } - @Test - public void testReplayFiltersRemovedConfigs() { - // Create a schema that doesn't include removed configs - Map configDefs = new HashMap<>(); - ConfigDef topicConfigDef = new ConfigDef(); - topicConfigDef.define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc"); - configDefs.put(TOPIC, topicConfigDef); - KafkaConfigSchema testSchema = new KafkaConfigSchema(configDefs, Collections.emptyMap()); - - ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). - setKafkaConfigSchema(testSchema). - build(); - - // Replay a removed config - manager.replay(new ConfigRecord(). - setResourceType(TOPIC.id()).setResourceName("mytopic"). 
- setName("removed.config").setValue("value")); - assertEquals(Map.of(), manager.getConfigs(MYTOPIC), "Removed config should not be in configData"); - - // Replay a valid config - manager.replay(new ConfigRecord(). - setResourceType(TOPIC.id()).setResourceName("mytopic"). - setName("abc").setValue("x,y,z")); - assertEquals(toMap(entry("abc", "x,y,z")), manager.getConfigs(MYTOPIC), - "Valid config should be in configData"); - } - @Test public void testIncrementalAlterConfigs() { ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java index 7e31d16004995..6300e1293e826 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java @@ -17,11 +17,9 @@ package org.apache.kafka.image; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.image.writer.RecordListWriter; -import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -29,19 +27,14 @@ import org.junit.jupiter.api.Timeout; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import static org.apache.kafka.common.config.ConfigResource.Type.BROKER; -import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(value = 40) @@ -144,70 +137,4 @@ private static List getImageRecords(ConfigurationsImage im image.write(writer); return writer.records(); } - - @Test - public void testRemovedConfigFiltered() { - // Create a schema that doesn't include removed configs - Map configDefs = new HashMap<>(); - ConfigDef topicConfigDef = new ConfigDef(); - topicConfigDef.define("valid.config", ConfigDef.Type.LONG, ConfigDef.Importance.MEDIUM, "doc"); - configDefs.put(TOPIC, topicConfigDef); - KafkaConfigSchema testSchema = new KafkaConfigSchema(configDefs, Collections.emptyMap()); - ConfigurationsDelta.setConfigSchemaSupplier(() -> testSchema); - - try { - String testTopic = "test-topic"; - String removedConfig = "message.format.version"; - String validConfig = "valid.config"; - String validConfigValue = "604800000"; - - // Test 1: Filter removed configs from base image - Map initialData = new HashMap<>(); - Map topicConfigs = new HashMap<>(); - topicConfigs.put(removedConfig, "0.10.0"); - topicConfigs.put(validConfig, validConfigValue); - initialData.put(new ConfigResource(TOPIC, testTopic), - new ConfigurationImage(new ConfigResource(TOPIC, testTopic), topicConfigs)); - - ConfigurationsImage initialImage = new ConfigurationsImage(initialData); - ConfigurationsDelta delta = new ConfigurationsDelta(initialImage); - ConfigurationsImage finalImage = delta.apply(); - - ConfigResource topicResource = new ConfigResource(TOPIC, testTopic); - ConfigurationImage topicConfig = 
finalImage.resourceData().get(topicResource); - assertNotNull(topicConfig); - assertFalse(topicConfig.data().containsKey(removedConfig), "Removed config should be filtered from base image"); - assertTrue(topicConfig.data().containsKey(validConfig), "Valid config should be present"); - assertEquals(validConfigValue, topicConfig.data().get(validConfig)); - - // Test 2: Filter removed configs when replaying records - ConfigurationsDelta delta2 = new ConfigurationsDelta(ConfigurationsImage.EMPTY); - List records = new ArrayList<>(); - records.add(new ApiMessageAndVersion( - new ConfigRecord() - .setResourceType(TOPIC.id()) - .setResourceName(testTopic) - .setName(removedConfig) - .setValue("0.10.0"), - CONFIG_RECORD.highestSupportedVersion())); - records.add(new ApiMessageAndVersion( - new ConfigRecord() - .setResourceType(TOPIC.id()) - .setResourceName(testTopic) - .setName(validConfig) - .setValue(validConfigValue), - CONFIG_RECORD.highestSupportedVersion())); - - RecordTestUtils.replayAll(delta2, records); - ConfigurationsImage finalImage2 = delta2.apply(); - - ConfigurationImage topicConfig2 = finalImage2.resourceData().get(topicResource); - assertNotNull(topicConfig2); - assertFalse(topicConfig2.data().containsKey(removedConfig), "Removed config should be filtered on replay"); - assertTrue(topicConfig2.data().containsKey(validConfig), "Valid config should be present"); - assertEquals(1, topicConfig2.data().size(), "Only valid config should remain"); - } finally { - ConfigurationsDelta.setConfigSchemaSupplier(null); - } - } } From 47f1d901d4c05dd5844f8e4216171381a704ad6f Mon Sep 17 00:00:00 2001 From: tony tang Date: Mon, 15 Dec 2025 18:31:48 -0600 Subject: [PATCH 4/4] fix --- core/src/main/scala/kafka/server/KafkaRaftServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index 967af435b68a1..b6e995b3d08b3 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -23,13 +23,13 @@ import org.apache.kafka.common.config.{ConfigDef, ConfigResource} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.utils.{AppInfoParser, Time} import org.apache.kafka.common.{KafkaException, Uuid} +import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.metadata.KafkaConfigSchema import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata} import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble} import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.{ProcessRole, ServerSocketFactory} -import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.server.config.{QuotaConfig, ServerTopicConfigSynonyms} import org.apache.kafka.server.metrics.ClientMetricsConfigs import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
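
For illustration only (not applied by any patch above): a minimal, self-contained sketch of the non-internal-config whitelist that PATCH 1 adds as KafkaConfigSchema.validConfigNames() and that PATCH 3 later reverts. The class name ValidConfigNamesSketch and the sample config names are hypothetical; the ConfigDef, ConfigDef.ConfigKey.name, and ConfigDef.ConfigKey.internalConfig APIs are the ones already used in the diffs.

import org.apache.kafka.common.config.ConfigDef;

import java.util.Set;
import java.util.stream.Collectors;

public class ValidConfigNamesSketch {
    // Collect the non-internal config names from a ConfigDef, mirroring the
    // validConfigNames() helper introduced in PATCH 1 (and reverted in PATCH 3).
    static Set<String> nonInternalNames(ConfigDef configDef) {
        return configDef.configKeys().values().stream()
                .filter(key -> !key.internalConfig)
                .map(key -> key.name)
                .collect(Collectors.toUnmodifiableSet());
    }

    public static void main(String[] args) {
        // Hypothetical topic-level schema containing a single valid config.
        ConfigDef topicDef = new ConfigDef()
                .define("retention.ms", ConfigDef.Type.LONG, ConfigDef.Importance.MEDIUM, "doc");
        Set<String> valid = nonInternalNames(topicDef);
        // A ConfigRecord named "message.format.version" is absent from the set,
        // so a replay() guard like the one in PATCH 1 would drop that record.
        System.out.println(valid.contains("retention.ms"));            // true
        System.out.println(valid.contains("message.format.version"));  // false
    }
}

PATCH 3 abandons this per-resource-type filtering and instead builds a single union of all dynamic config names (getAllDynamicConfigNames in KafkaRaftServer.scala) from LogConfig, DynamicConfig.Broker, QuotaConfig, ClientMetricsConfigs, and GroupConfig.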