From 059bd7dcec888d6b6486ca2a2dffd9f3df303a1c Mon Sep 17 00:00:00 2001 From: Au_Miner <358671982@qq.com> Date: Thu, 30 Apr 2026 11:28:47 +0800 Subject: [PATCH] [FLINK-36779][table] Fix metric is incorrect and non-changing during the time in Rank --- .../operators/rank/AbstractTopNFunction.java | 26 ++++++---- .../operators/rank/UpdatableTopNFunction.java | 2 +- .../rank/utils/AppendOnlyTopNHelper.java | 2 +- .../operators/rank/utils/FastTop1Helper.java | 2 +- .../rank/AppendOnlyTopNFunctionTest.java | 32 ++++++++++++ .../operators/rank/FastTop1FunctionTest.java | 31 ++++++++++++ .../operators/rank/TopNFunctionTestBase.java | 49 +++++++++++++++++++ .../rank/UpdatableTopNFunctionTest.java | 29 +++++++++++ 8 files changed, 160 insertions(+), 13 deletions(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java index 6be7035611637..3e806c44060b5 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java @@ -43,6 +43,7 @@ import java.util.Comparator; import java.util.Objects; import java.util.function.Function; +import java.util.function.LongSupplier; /** * Base class for TopN Function. 
@@ -209,23 +210,28 @@ protected boolean checkSortKeyInBufferRange(RowData sortKey, TopNBuffer buffer) return buffer.checkSortKeyInBufferRange(sortKey, getDefaultTopNSize()); } - protected void registerMetric(long heapSize) { - registerMetric(heapSize, requestCount, hitCount); + protected void registerMetric(LongSupplier heapSizeSupplier) { + registerMetric(heapSizeSupplier, () -> requestCount, () -> hitCount); } - protected void registerMetric(long heapSize, long requestCount, long hitCount) { + protected void registerMetric( + LongSupplier heapSizeSupplier, + LongSupplier requestCountSupplier, + LongSupplier hitCountSupplier) { getRuntimeContext() .getMetricGroup() .<Double, Gauge<Double>>gauge( "topn.cache.hitRate", - () -> - requestCount == 0 - ? 1.0 - : Long.valueOf(hitCount).doubleValue() / requestCount); + () -> { + long requests = requestCountSupplier.getAsLong(); + return requests == 0 + ? 1.0 + : (double) hitCountSupplier.getAsLong() / requests; + }); getRuntimeContext() .getMetricGroup() - .<Long, Gauge<Long>>gauge("topn.cache.size", () -> heapSize); + .<Long, Gauge<Long>>gauge("topn.cache.size", heapSizeSupplier::getAsLong); } protected void collectInsert( @@ -386,8 +392,8 @@ public void accHitCount() { hitCount++; } - protected void registerMetric(long heapSize) { - topNFunction.registerMetric(heapSize, requestCount, hitCount); + protected void registerMetric(LongSupplier heapSizeSupplier) { + topNFunction.registerMetric(heapSizeSupplier, () -> requestCount, () -> hitCount); } } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java index cce7d6a5c7553..2362e99b16c57 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java +++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java @@ -165,7 +165,7 @@ public void open(OpenContext openContext) throws Exception { dataState = getRuntimeContext().getMapState(mapStateDescriptor); // metrics - registerMetric(cacheSize); + registerMetric(() -> kvRowKeyMap.size() * getDefaultTopNSize()); } @Override diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java index 62b9e7d470d3c..1b6d7302826f2 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java @@ -70,7 +70,7 @@ public AppendOnlyTopNHelper(AbstractTopNFunction topNFunction, long cacheSize, l } public void registerMetric() { - registerMetric(kvSortedMap.size() * topNSize); + registerMetric(() -> kvSortedMap.size() * topNSize); } @Nullable diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java index 7632dc136131b..6053f2bfd9a39 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java @@ -126,6 +126,6 @@ public void flushAllCacheToState() throws Exception { public abstract void flushBufferToState(RowData currentKey, RowData value) throws Exception; public void registerMetric() { - registerMetric(kvCache.size() * topNSize); + 
registerMetric(() -> kvCache.size() * topNSize); } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java index f124e9700e53d..14325d708aec7 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.runtime.operators.rank; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction; @@ -29,6 +30,8 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeFalse; /** Tests for {@link AppendOnlyTopNFunction} and {@link AsyncStateAppendOnlyTopNFunction}. */ class AppendOnlyTopNFunctionTest extends TopNFunctionTestBase { @@ -95,4 +98,33 @@ void testVariableRankRange() throws Exception { assertorWithoutRowNumber.assertOutputEquals( "output wrong.", expectedOutput, testHarness.getOutput()); } + + /** + * Verifies that {@code topn.cache.size} reflects the live cache state instead of staying at 0 + * for {@link AppendOnlyTopNFunction}. + */ + @TestTemplate + void testCacheMetricsReflectLiveState() throws Exception { + // async harness does not allow injecting a custom metric group; sync path covers the fix. 
+ assumeFalse(enableAsyncState); + AbstractTopNFunction func = + createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), true, false); + InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup(); + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarnessWithMetrics(func, metricGroup); + testHarness.open(); + // no partition cached yet -> cache size is 0. + assertThat(readCacheSizeMetric(metricGroup)).isZero(); + // no requests issued yet -> hit rate falls back to 1.0. + assertThat(readCacheHitRateMetric(metricGroup)).isEqualTo(1.0); + + testHarness.processElement(insertRecord("book", 1L, 12)); + testHarness.processElement(insertRecord("book", 2L, 19)); + testHarness.processElement(insertRecord("fruit", 4L, 33)); + // 2 partitions cached -> live size = 2 * topN(2) = 4. + assertThat(readCacheSizeMetric(metricGroup)).isEqualTo(4L); + // hit rate is now driven by live counters and lies in [0.0, 1.0]. + assertThat(readCacheHitRateMetric(metricGroup)).isBetween(0.0, 1.0); + testHarness.close(); + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java index 90784dcfd348c..dc1c1056855cc 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.operators.rank; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import 
org.apache.flink.table.runtime.operators.rank.async.AsyncStateFastTop1Function; @@ -31,6 +32,8 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeFalse; /** Tests for {@link FastTop1Function} and {@link AsyncStateFastTop1Function}. */ public class FastTop1FunctionTest extends TopNFunctionTestBase { @@ -376,4 +379,32 @@ void testConstantRankRangeWithOffset() throws Exception { void testOutputRankNumberWithVariableRankRange() throws Exception { // skip } + + /** + * Verifies that {@code topn.cache.size} reflects the live cache state instead of staying at 0 + * for {@link FastTop1Function}. + */ + @TestTemplate + void testCacheMetricsReflectLiveState() throws Exception { + // async harness does not allow injecting a custom metric group; sync path covers the fix. + assumeFalse(enableAsyncState); + AbstractTopNFunction func = + createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 1), true, false); + InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup(); + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarnessWithMetrics(func, metricGroup); + testHarness.open(); + // no partition cached yet -> cache size is 0. + assertThat(readCacheSizeMetric(metricGroup)).isZero(); + // no requests issued yet -> hit rate falls back to 1.0. + assertThat(readCacheHitRateMetric(metricGroup)).isEqualTo(1.0); + + testHarness.processElement(insertRecord("book", 1L, 12)); + testHarness.processElement(insertRecord("fruit", 4L, 33)); + // 2 partitions cached -> live size = 2 * topN(1) = 2. + assertThat(readCacheSizeMetric(metricGroup)).isEqualTo(2L); + // hit rate is now driven by live counters and lies in [0.0, 1.0]. 
+ assertThat(readCacheHitRateMetric(metricGroup)).isBetween(0.0, 1.0); + testHarness.close(); + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java index 2a444117c60e8..0afbc96534343 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java @@ -19,8 +19,15 @@ package org.apache.flink.table.runtime.operators.rank; import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -53,11 +60,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; import 
static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assumptions.assumeFalse; @@ -425,4 +434,44 @@ abstract AbstractTopNFunction createFunction( /** TODO remove this method after all rank function support async state. */ abstract boolean supportedAsyncState(); + + // shared helpers for verifying cache metrics in subclasses. + + static final String CACHE_SIZE_METRIC = "topn.cache.size"; + static final String CACHE_HIT_RATE_METRIC = "topn.cache.hitRate"; + + /** Creates a sync test harness that exposes the operator metric group for assertions. */ + OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarnessWithMetrics( + AbstractTopNFunction rankFunction, InterceptingOperatorMetricGroup operatorMetricGroup) + throws Exception { + KeyedProcessOperator<RowData, RowData, RowData> operator = + new KeyedProcessOperator<>(rankFunction); + rankFunction.setKeyContext(operator); + InterceptingTaskMetricGroup taskMetricGroup = + new InterceptingTaskMetricGroup() { + @Override + public InternalOperatorMetricGroup getOrAddOperator( + OperatorID id, String name, Map<String, String> additionalVariables) { + return operatorMetricGroup; + } + }; + MockEnvironment environment = + new MockEnvironmentBuilder().setMetricGroup(taskMetricGroup).build(); + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, keySelector, keySelector.getProducedType(), environment); + } + + @SuppressWarnings("unchecked") + static long readCacheSizeMetric(InterceptingOperatorMetricGroup metricGroup) { + Gauge<Long> gauge = (Gauge<Long>) metricGroup.get(CACHE_SIZE_METRIC); + assertThat(gauge).as("topn.cache.size gauge should be registered").isNotNull(); + return gauge.getValue(); + } + + @SuppressWarnings("unchecked") + static double readCacheHitRateMetric(InterceptingOperatorMetricGroup metricGroup) { + Gauge<Double> gauge = (Gauge<Double>) metricGroup.get(CACHE_HIT_RATE_METRIC); + 
assertThat(gauge).as("topn.cache.hitRate gauge should be registered").isNotNull(); + return gauge.getValue(); + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunctionTest.java index 736eba002c2f7..a33b563ead1a4 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunctionTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunctionTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.runtime.operators.rank; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; @@ -30,6 +31,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link UpdatableTopNFunction}. */ class UpdatableTopNFunctionTest extends TopNFunctionTestBase { @@ -251,4 +253,31 @@ void testSortKeyChangesWhenNotOutputRankNumberAndNotGenerateUpdateBefore() throw assertorWithRowNumber.assertOutputEquals( "output wrong.", expectedOutput, testHarness.getOutput()); } + + /** + * Verifies that {@code topn.cache.size} reflects the live cache state instead of being stuck at + * the configured {@code cacheSize} constant for {@link UpdatableTopNFunction}. 
+ */ + @TestTemplate + void testCacheMetricsReflectLiveState() throws Exception { + AbstractTopNFunction func = + createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), true, false); + InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup(); + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarnessWithMetrics(func, metricGroup); + testHarness.open(); + // no partition cached yet -> live size is 0 (not the configured cacheSize). + assertThat(readCacheSizeMetric(metricGroup)).isZero(); + // no requests issued yet -> hit rate falls back to 1.0. + assertThat(readCacheHitRateMetric(metricGroup)).isEqualTo(1.0); + + testHarness.processElement(insertRecord("book", 2L, 19)); + testHarness.processElement(insertRecord("book", 3L, 16)); + testHarness.processElement(insertRecord("fruit", 1L, 33)); + // 2 partitions cached -> live size = 2 * topN(2) = 4. + assertThat(readCacheSizeMetric(metricGroup)).isEqualTo(4L); + // hit rate is now driven by live counters and lies in [0.0, 1.0]. + assertThat(readCacheHitRateMetric(metricGroup)).isBetween(0.0, 1.0); + testHarness.close(); + } }