Search before asking
Description
Problem
ActionState is persisted to the external ActionStateStore to support durable execution recovery. The current production store implementation is Kafka.
When a checkpoint completes, pruneState is called with each entry's key and value:
actionStateStore.pruneState(entry.getKey(), entry.getValue());
The intent is to prune action states for a Flink key up to the completed sequence number. However, the current KafkaActionStateStore.pruneState(...) implementation only removes matching entries from the in-memory actionStates cache. It does not delete the corresponding records from the Kafka topic. These historical keys are no longer useful for recovery once the completed sequence number has been covered by a successful checkpoint, but they can still accumulate indefinitely in Kafka.
Solution
After a checkpoint has completed, KafkaActionStateStore.pruneState(...) should also delete the pruned records from the Kafka topic, not only from the in-memory cache. Because the Kafka topic is configured with cleanup.policy=compact, writing a tombstone record (key = stateKey, value = null) allows Kafka log compaction to eventually remove all historical records for that ActionState key.
producer.send(new ProducerRecord<>(topic, stateKey, null));
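A rough sketch of how the prune path could emit tombstones, in plain Java so it runs without a broker. Everything here is an assumption for illustration: the class name TombstonePruneSketch, the "<flinkKey>#<sequenceNumber>" key layout, and the BiConsumer that stands in for producer.send(new ProducerRecord<>(topic, stateKey, value)). The real KafkaActionStateStore may build its keys and cache differently.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Illustrative sketch only; names and key layout are hypothetical. */
class TombstonePruneSketch {
    // Hypothetical in-memory cache: state key -> serialized ActionState.
    private final Map<String, byte[]> actionStates = new TreeMap<>();

    // Stand-in for the Kafka producer: accept(stateKey, value).
    // A null value represents a tombstone record.
    private final BiConsumer<String, byte[]> sender;

    TombstonePruneSketch(BiConsumer<String, byte[]> sender) {
        this.sender = sender;
    }

    // Assumed key layout: "<flinkKey>#<sequenceNumber>".
    static String stateKey(String flinkKey, long seqNum) {
        return flinkKey + "#" + seqNum;
    }

    // Caches the state and writes it to the (simulated) topic.
    void put(String flinkKey, long seqNum, byte[] value) {
        String key = stateKey(flinkKey, seqNum);
        actionStates.put(key, value);
        sender.accept(key, value);
    }

    // Removes every cached entry for flinkKey with a sequence number
    // <= completedSeqNum and sends one tombstone per removed state key.
    List<String> pruneState(String flinkKey, long completedSeqNum) {
        List<String> pruned = new ArrayList<>();
        Iterator<String> it = actionStates.keySet().iterator();
        while (it.hasNext()) {
            String key = it.next();
            int sep = key.lastIndexOf('#');
            String keyPart = key.substring(0, sep);
            long seqNum = Long.parseLong(key.substring(sep + 1));
            if (keyPart.equals(flinkKey) && seqNum <= completedSeqNum) {
                it.remove();              // drop from the in-memory cache
                sender.accept(key, null); // tombstone for log compaction
                pruned.add(key);
            }
        }
        return pruned;
    }

    int cachedCount() {
        return actionStates.size();
    }
}
```

The tombstone must carry exactly the same record key that was used when the state was written; otherwise compaction will not match it against the older records. Tombstones are themselves removed only after the topic's delete.retention.ms has elapsed, so cleanup is eventual rather than immediate.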
Are you willing to submit a PR?