[Feature] Kafka ActionStateStore should clean up pruned ActionState records #691

@da-daken

Search before asking

  • I searched in the issues and found nothing similar.

Description

Problem

ActionState is persisted to the external ActionStateStore to support durable execution recovery. The current production implementation of the store is backed by Kafka.
When a checkpoint completes, the following call is made for each completed (key, sequence number) entry:

actionStateStore.pruneState(entry.getKey(), entry.getValue());

The intent is to prune the action states of a Flink key up to the completed sequence number. However, the current KafkaActionStateStore.pruneState(...) implementation only removes matching entries from the in-memory actionStates cache; it does not delete the corresponding records from the Kafka topic. Once a successful checkpoint covers the completed sequence number, these historical records are no longer needed for recovery, yet they can accumulate in the Kafka topic indefinitely.
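To make the gap concrete, here is a minimal, self-contained Java model of it. Everything in it is hypothetical: `PruneGapDemo`, `CacheOnlyStore`, the `<flinkKey>_<sequenceNumber>` format for `stateKey`, and the list standing in for the Kafka topic are illustrative stand-ins, not the project's actual `KafkaActionStateStore` code. It also assumes that pruning "up to" the completed sequence number is inclusive.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the reported gap; none of these names are the project's real classes.
class PruneGapDemo {

    // One record in the simulated topic (a stand-in for a Kafka record).
    record TopicRecord(String key, String value) {}

    // Hypothetical stateKey format "<flinkKey>_<sequenceNumber>" (an assumption).
    static String stateKey(String flinkKey, long seq) {
        return flinkKey + "_" + seq;
    }

    // Hypothetical store with an in-memory cache and an append-only "topic".
    static class CacheOnlyStore {
        final Map<String, Map<Long, String>> cache = new HashMap<>();
        final List<TopicRecord> topic = new ArrayList<>();

        void put(String flinkKey, long seq, String state) {
            cache.computeIfAbsent(flinkKey, k -> new HashMap<>()).put(seq, state);
            topic.add(new TopicRecord(stateKey(flinkKey, seq), state));
        }

        // Mirrors the reported behavior: only the cache is pruned.
        // Assumes "up to" is inclusive of completedSeq.
        void pruneState(String flinkKey, long completedSeq) {
            Map<Long, String> states = cache.get(flinkKey);
            if (states != null) {
                states.keySet().removeIf(seq -> seq <= completedSeq);
            }
            // Nothing is appended to `topic`, so the pruned states stay there.
        }
    }

    public static void main(String[] args) {
        CacheOnlyStore store = new CacheOnlyStore();
        for (long seq = 1; seq <= 3; seq++) {
            store.put("user-42", seq, "state-" + seq);
        }
        store.pruneState("user-42", 2); // checkpoint covers seq 1 and 2
        System.out.println("cache entries: " + store.cache.get("user-42").size()); // cache entries: 1
        System.out.println("topic records: " + store.topic.size()); // topic records: 3
    }
}
```

Running it leaves one entry in the cache but three records in the topic: the two pruned states are gone from memory yet still sit in the log.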

Solution

After a checkpoint has completed, pruneState(...) should also delete the pruned records from the Kafka topic backing the KafkaActionStateStore, not only from the in-memory cache. Because the Kafka topic is configured with cleanup.policy=compact, writing a tombstone record (key = stateKey, value = null) allows Kafka log compaction to eventually remove all historical records for that ActionState key.

producer.send(new ProducerRecord<>(topic, stateKey, null));
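A minimal sketch of the proposed change, under a hypothetical in-memory model: `pruneState` removes the covered entries from the cache and appends one tombstone per pruned key, at the point where the real store would call `producer.send(new ProducerRecord<>(topic, stateKey, null))`. The class names and the `<flinkKey>_<sequenceNumber>` key format are assumptions for illustration. The `compact` helper is a deliberately simplified stand-in for Kafka log compaction (latest value per key wins, tombstoned keys disappear); real compaction runs asynchronously and retains tombstones for `delete.retention.ms` before dropping them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the proposed fix; names and key format are illustrative assumptions.
class TombstonePruneDemo {

    // One record in the simulated topic; value == null marks a tombstone.
    record TopicRecord(String key, String value) {}

    // Hypothetical stateKey format "<flinkKey>_<sequenceNumber>" (an assumption).
    static String stateKey(String flinkKey, long seq) {
        return flinkKey + "_" + seq;
    }

    static class TombstoningStore {
        final Map<String, Map<Long, String>> cache = new HashMap<>();
        final List<TopicRecord> topic = new ArrayList<>();

        void put(String flinkKey, long seq, String state) {
            cache.computeIfAbsent(flinkKey, k -> new HashMap<>()).put(seq, state);
            topic.add(new TopicRecord(stateKey(flinkKey, seq), state));
        }

        // Called only after a checkpoint completes; prunes inclusively up to completedSeq.
        void pruneState(String flinkKey, long completedSeq) {
            Map<Long, String> states = cache.get(flinkKey);
            if (states == null) {
                return;
            }
            // Collect first so the map is not modified while iterating over it.
            List<Long> pruned = new ArrayList<>();
            for (long seq : states.keySet()) {
                if (seq <= completedSeq) {
                    pruned.add(seq);
                }
            }
            for (long seq : pruned) {
                states.remove(seq);
                // In the real store this would be:
                // producer.send(new ProducerRecord<>(topic, stateKey, null));
                topic.add(new TopicRecord(stateKey(flinkKey, seq), null));
            }
        }
    }

    // Simplified compaction: the latest record per key wins and tombstoned keys are dropped.
    // Real Kafka compacts asynchronously and keeps tombstones for delete.retention.ms first.
    static Map<String, String> compact(List<TopicRecord> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (TopicRecord r : log) {
            latest.put(r.key(), r.value());
        }
        latest.values().removeIf(Objects::isNull);
        return latest;
    }

    public static void main(String[] args) {
        TombstoningStore store = new TombstoningStore();
        for (long seq = 1; seq <= 3; seq++) {
            store.put("user-42", seq, "state-" + seq);
        }
        store.pruneState("user-42", 2); // checkpoint covers seq 1 and 2
        System.out.println("topic records: " + store.topic.size()); // topic records: 5
        System.out.println("after compaction: " + compact(store.topic)); // after compaction: {user-42_3=state-3}
    }
}
```

Because tombstones are only written from `pruneState`, which runs after a checkpoint completes, records whose sequence numbers are not yet covered by a checkpoint stay in the topic and remain available for recovery.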

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Assignees

No one assigned

    Labels

    feature ([Issue Type] New features or improvements to existing features), priority/major (Default priority of the PR or issue)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests