From 83f65f21011abef0e44b37684ba31d1929c5b68e Mon Sep 17 00:00:00 2001 From: Washington L Braga Jr Date: Wed, 11 Jun 2025 17:13:58 -0300 Subject: [PATCH 1/3] Support skipping segment kafka configs Conflicts: bulkerapp/app/topic_manager.go --- bulkerapp/app/topic_manager.go | 31 +++++-------------------------- kafkabase/kafka_config.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/bulkerapp/app/topic_manager.go b/bulkerapp/app/topic_manager.go index 0d0bf4ff..960ef7bc 100644 --- a/bulkerapp/app/topic_manager.go +++ b/bulkerapp/app/topic_manager.go @@ -339,11 +339,7 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics } tm.allTopics = allTopics tm.staleTopics = staleTopics - err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions, - map[string]string{ - "retention.ms": fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000), - "segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000), - }) + err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions, tm.config.TopicConfig("destination")) if err != nil { metrics.TopicManagerError("destination-topic_error").Inc() tm.SystemErrorf("Failed to create destination topic [%s]: %v", tm.config.KafkaDestinationsTopicName, err) @@ -352,22 +348,13 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata, nonEmptyTopics metrics.TopicManagerError("destination-topic_error").Inc() tm.SystemErrorf("Failed to create multi-threaded destination topic [%s]: %v", tm.config.KafkaDestinationsTopicName, err) } - err = tm.ensureTopic(tm.config.KafkaDestinationsDeadLetterTopicName, 1, map[string]string{ - "cleanup.policy": "delete,compact", - "retention.ms": fmt.Sprint(tm.config.KafkaDeadTopicRetentionHours * 60 * 60 * 1000), - "segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000), - }) + err = tm.ensureTopic(tm.config.KafkaDestinationsDeadLetterTopicName, 1, tm.config.TopicConfig("dead")) if err != nil { metrics.TopicManagerError("destination-topic_error").Inc() tm.SystemErrorf("Failed to create destination dead letter topic [%s]: %v", tm.config.KafkaDestinationsDeadLetterTopicName, err) } destinationsRetryTopicName := tm.config.KafkaDestinationsRetryTopicName - err = tm.ensureTopic(destinationsRetryTopicName, 1, map[string]string{ - "cleanup.policy": "delete,compact", - "segment.bytes": fmt.Sprint(tm.config.KafkaRetryTopicSegmentBytes), - "retention.ms": fmt.Sprint(tm.config.KafkaRetryTopicRetentionHours * 60 * 60 * 1000), - "segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000), - }) + err = tm.ensureTopic(destinationsRetryTopicName, 1, tm.config.TopicConfig("retry")) if err != nil { metrics.TopicManagerError("destination-topic_error").Inc() tm.SystemErrorf("Failed to create destination retry topic [%s]: %v", destinationsRetryTopicName, err) @@ -602,11 +589,7 @@ func (tm *TopicManager) createDestinationTopic(topic string, config map[string]s errorType = "unknown stream mode" return tm.NewError("Unknown stream mode: %s for topic: %s", mode, topic) } - topicConfig := map[string]string{ - "retention.ms": fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000), - "segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000), - "compression.type": tm.config.KafkaTopicCompression, - } + topicConfig := tm.config.TopicConfig("destination") utils.MapPutAll(topicConfig, config) topicRes, err := tm.kaftaAdminClient.CreateTopics(context.Background(), []kafka.TopicSpecification{ { @@ -645,11 +628,7 @@ func (tm *TopicManager) createTopic(topic string, partitions int, config map[str metrics.TopicManagerCreate(topic, "", "", "", "success", "").Inc() } }() - topicConfig := map[string]string{ - "compression.type": tm.config.KafkaTopicCompression, - "retention.ms": fmt.Sprint(tm.config.KafkaTopicRetentionHours * 60 * 60 * 1000), - "segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000), - } + topicConfig := tm.config.TopicConfig("destination") utils.MapPutAll(topicConfig, config) topicRes, err := tm.kaftaAdminClient.CreateTopics(context.Background(), []kafka.TopicSpecification{ { diff --git a/kafkabase/kafka_config.go b/kafkabase/kafka_config.go index 83c717b2..13f43447 100644 --- a/kafkabase/kafka_config.go +++ b/kafkabase/kafka_config.go @@ -24,6 +24,7 @@ type KafkaConfig struct { KafkaTopicCompression string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"` KafkaTopicRetentionHours int `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"48"` KafkaTopicSegmentHours int `mapstructure:"KAFKA_TOPIC_SEGMENT_HOURS" default:"24"` + KafkaAllowSegmentConfig bool `mapstructure:"KAFKA_ALLOW_SEGMENT_CONFIG" default:"true"` KafkaTopicPrefix string `mapstructure:"KAFKA_TOPIC_PREFIX" default:""` KafkaFetchMessageMaxBytes int `mapstructure:"KAFKA_FETCH_MESSAGE_MAX_BYTES" default:"1048576"` @@ -91,6 +92,34 @@ func (ac *KafkaConfig) GetKafkaConfig() *kafka.ConfigMap { return kafkaConfig } +func (c *KafkaConfig) TopicConfig(mode string) map[string]string { + config := map[string]string{} + + switch mode { + case "retry": + config["retention.ms"] = fmt.Sprint(c.KafkaTopicRetentionHours * 60 * 60 * 1000) + config["cleanup.policy"] = "delete,compact" + + if c.KafkaAllowSegmentConfig { + config["segment.bytes"] = fmt.Sprint(c.KafkaRetryTopicSegmentBytes) + config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000) + } + case "dead": + config["retention.ms"] = fmt.Sprint(c.KafkaDeadTopicRetentionHours * 60 * 60 * 1000) + config["cleanup.policy"] = "delete,compact" + if c.KafkaAllowSegmentConfig { + config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000) + } + default: + config["compression.type"] = c.KafkaTopicCompression + config["retention.ms"] = fmt.Sprint(c.KafkaTopicRetentionHours * 60 * 60 * 1000) + if c.KafkaAllowSegmentConfig { + config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000) + } + } + return config +} + func (c *KafkaConfig) PostInit(settings *appbase.AppSettings) error { return nil } From d7d6fd239023ef7f7b249232b6dd279e1f33d6f7 Mon Sep 17 00:00:00 2001 From: Washington L Braga Jr Date: Wed, 11 Jun 2025 18:24:43 -0300 Subject: [PATCH 2/3] use TopicConfig on batch and stream consumer --- bulkerapp/app/batch_consumer.go | 2 +- bulkerapp/app/stream_consumer.go | 2 +- bulkerapp/app/topic_manager.go | 9 --------- 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/bulkerapp/app/batch_consumer.go b/bulkerapp/app/batch_consumer.go index 98392e6e..cbf1988a 100644 --- a/bulkerapp/app/batch_consumer.go +++ b/bulkerapp/app/batch_consumer.go @@ -259,7 +259,7 @@ func (bc *BatchConsumerImpl) processFailed(firstPosition *kafka.TopicPartition, } }() - err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.topicManager.RetryTopicConfig()) + err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.topicManager.config.TopicConfig("retry")) if err != nil { return counters, fmt.Errorf("failed to create retry topic %s: %v", bc.retryTopic, err) } diff --git a/bulkerapp/app/stream_consumer.go b/bulkerapp/app/stream_consumer.go index 23ac88a2..4c13b0fb 100644 --- a/bulkerapp/app/stream_consumer.go +++ b/bulkerapp/app/stream_consumer.go @@ -282,7 +282,7 @@ func (sc *StreamConsumerImpl) start() { metrics.ConnectionMessageStatuses(sc.destination.Id(), sc.tableName, "deadLettered").Inc() failedTopic = sc.config.KafkaDestinationsDeadLetterTopicName } else { - err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.topicManager.RetryTopicConfig()) + err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.topicManager.config.TopicConfig("retry")) if err != nil { sc.Errorf("failed to create retry topic %s: %v", sc.retryTopic, err) } diff --git a/bulkerapp/app/topic_manager.go b/bulkerapp/app/topic_manager.go index 960ef7bc..c45cce78 100644 --- a/bulkerapp/app/topic_manager.go +++ b/bulkerapp/app/topic_manager.go @@ -658,15 +658,6 @@ func (tm *TopicManager) createTopic(topic string, partitions int, config map[str return nil } -func (tm *TopicManager) RetryTopicConfig() map[string]string { - return map[string]string{ - "cleanup.policy": "delete,compact", - "segment.bytes": fmt.Sprint(tm.config.KafkaRetryTopicSegmentBytes), - "retention.ms": fmt.Sprint(tm.config.KafkaRetryTopicRetentionHours * 60 * 60 * 1000), - "segment.ms": fmt.Sprint(tm.config.KafkaTopicSegmentHours * 60 * 60 * 1000), - } -} - func (tm *TopicManager) Refresh() { select { case tm.refreshChan <- true: From b5d58a0e9017aa940e094a1202259a58773eda5a Mon Sep 17 00:00:00 2001 From: Washington L Braga Jr Date: Sun, 28 Sep 2025 11:26:37 -0300 Subject: [PATCH 3/3] set compression type for all modes --- bulkerapp/app/batch_consumer.go | 2 +- bulkerapp/app/stream_consumer.go | 2 +- kafkabase/kafka_config.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bulkerapp/app/batch_consumer.go b/bulkerapp/app/batch_consumer.go index cbf1988a..47826dd2 100644 --- a/bulkerapp/app/batch_consumer.go +++ b/bulkerapp/app/batch_consumer.go @@ -259,7 +259,7 @@ func (bc *BatchConsumerImpl) processFailed(firstPosition *kafka.TopicPartition, } }() - err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.topicManager.config.TopicConfig("retry")) + err = bc.topicManager.ensureTopic(bc.retryTopic, 1, bc.config.TopicConfig("retry")) if err != nil { return counters, fmt.Errorf("failed to create retry topic %s: %v", bc.retryTopic, err) } diff --git a/bulkerapp/app/stream_consumer.go b/bulkerapp/app/stream_consumer.go index 4c13b0fb..f83c0a1c 100644 --- a/bulkerapp/app/stream_consumer.go +++ b/bulkerapp/app/stream_consumer.go @@ -282,7 +282,7 @@ func (sc *StreamConsumerImpl) start() { metrics.ConnectionMessageStatuses(sc.destination.Id(), sc.tableName, "deadLettered").Inc() failedTopic = sc.config.KafkaDestinationsDeadLetterTopicName } else { - err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.topicManager.config.TopicConfig("retry")) + err = sc.topicManager.ensureTopic(sc.retryTopic, 1, sc.config.TopicConfig("retry")) if err != nil { sc.Errorf("failed to create retry topic %s: %v", sc.retryTopic, err) } diff --git a/kafkabase/kafka_config.go b/kafkabase/kafka_config.go index 13f43447..32a9fe7f 100644 --- a/kafkabase/kafka_config.go +++ b/kafkabase/kafka_config.go @@ -94,10 +94,11 @@ func (ac *KafkaConfig) GetKafkaConfig() *kafka.ConfigMap { func (c *KafkaConfig) TopicConfig(mode string) map[string]string { config := map[string]string{} + config["compression.type"] = c.KafkaTopicCompression switch mode { case "retry": - config["retention.ms"] = fmt.Sprint(c.KafkaTopicRetentionHours * 60 * 60 * 1000) + config["retention.ms"] = fmt.Sprint(c.KafkaRetryTopicRetentionHours * 60 * 60 * 1000) config["cleanup.policy"] = "delete,compact" if c.KafkaAllowSegmentConfig { @@ -111,7 +112,6 @@ func (c *KafkaConfig) TopicConfig(mode string) map[string]string { config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000) } default: - config["compression.type"] = c.KafkaTopicCompression config["retention.ms"] = fmt.Sprint(c.KafkaTopicRetentionHours * 60 * 60 * 1000) if c.KafkaAllowSegmentConfig { config["segment.ms"] = fmt.Sprint(c.KafkaTopicSegmentHours * 60 * 60 * 1000)