Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Comparator;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
* Base class for TopN Function.
Expand Down Expand Up @@ -209,23 +210,28 @@ protected boolean checkSortKeyInBufferRange(RowData sortKey, TopNBuffer buffer)
return buffer.checkSortKeyInBufferRange(sortKey, getDefaultTopNSize());
}

protected void registerMetric(long heapSize) {
registerMetric(heapSize, requestCount, hitCount);
protected void registerMetric(LongSupplier heapSizeSupplier) {
registerMetric(heapSizeSupplier, () -> requestCount, () -> hitCount);
}

protected void registerMetric(long heapSize, long requestCount, long hitCount) {
protected void registerMetric(
LongSupplier heapSizeSupplier,
LongSupplier requestCountSupplier,
LongSupplier hitCountSupplier) {
getRuntimeContext()
.getMetricGroup()
.<Double, Gauge<Double>>gauge(
"topn.cache.hitRate",
() ->
requestCount == 0
? 1.0
: Long.valueOf(hitCount).doubleValue() / requestCount);
() -> {
long requests = requestCountSupplier.getAsLong();
return requests == 0
? 1.0
: (double) hitCountSupplier.getAsLong() / requests;
});

getRuntimeContext()
.getMetricGroup()
.<Long, Gauge<Long>>gauge("topn.cache.size", () -> heapSize);
.<Long, Gauge<Long>>gauge("topn.cache.size", heapSizeSupplier::getAsLong);
}

protected void collectInsert(
Expand Down Expand Up @@ -386,8 +392,8 @@ public void accHitCount() {
hitCount++;
}

protected void registerMetric(long heapSize) {
topNFunction.registerMetric(heapSize, requestCount, hitCount);
protected void registerMetric(LongSupplier heapSizeSupplier) {
topNFunction.registerMetric(heapSizeSupplier, () -> requestCount, () -> hitCount);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void open(OpenContext openContext) throws Exception {
dataState = getRuntimeContext().getMapState(mapStateDescriptor);

// metrics
registerMetric(cacheSize);
registerMetric(() -> kvRowKeyMap.size() * getDefaultTopNSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public AppendOnlyTopNHelper(AbstractTopNFunction topNFunction, long cacheSize, l
}

public void registerMetric() {
registerMetric(kvSortedMap.size() * topNSize);
registerMetric(() -> kvSortedMap.size() * topNSize);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@ public void flushAllCacheToState() throws Exception {
public abstract void flushBufferToState(RowData currentKey, RowData value) throws Exception;

public void registerMetric() {
registerMetric(kvCache.size() * topNSize);
registerMetric(() -> kvCache.size() * topNSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.runtime.operators.rank;

import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction;
Expand All @@ -29,6 +30,8 @@

import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assumptions.assumeFalse;

/** Tests for {@link AppendOnlyTopNFunction} and {@link AsyncStateAppendOnlyTopNFunction}. */
class AppendOnlyTopNFunctionTest extends TopNFunctionTestBase {
Expand Down Expand Up @@ -95,4 +98,33 @@ void testVariableRankRange() throws Exception {
assertorWithoutRowNumber.assertOutputEquals(
"output wrong.", expectedOutput, testHarness.getOutput());
}

/**
 * Verifies that {@code topn.cache.size} reflects the live cache state instead of staying at 0
 * for {@link AppendOnlyTopNFunction}.
 */
@TestTemplate
void testCacheMetricsReflectLiveState() throws Exception {
    // async harness does not allow injecting a custom metric group; sync path covers the fix.
    assumeFalse(enableAsyncState);
    AbstractTopNFunction func =
            createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), true, false);
    InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
    // try-with-resources guarantees the harness (and its mock environment) is released even
    // when an assertion below fails, instead of leaking across test templates.
    try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
            createTestHarnessWithMetrics(func, metricGroup)) {
        testHarness.open();
        // no partition cached yet -> cache size is 0.
        assertThat(readCacheSizeMetric(metricGroup)).isZero();
        // no requests issued yet -> hit rate falls back to 1.0.
        assertThat(readCacheHitRateMetric(metricGroup)).isEqualTo(1.0);

        testHarness.processElement(insertRecord("book", 1L, 12));
        testHarness.processElement(insertRecord("book", 2L, 19));
        testHarness.processElement(insertRecord("fruit", 4L, 33));
        // 2 partitions cached -> live size = 2 * topN(2) = 4.
        assertThat(readCacheSizeMetric(metricGroup)).isEqualTo(4L);
        // hit rate is now driven by live counters and lies in [0.0, 1.0].
        assertThat(readCacheHitRateMetric(metricGroup)).isBetween(0.0, 1.0);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.operators.rank;

import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.rank.async.AsyncStateFastTop1Function;
Expand All @@ -31,6 +32,8 @@
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assumptions.assumeFalse;

/** Tests for {@link FastTop1Function} and {@link AsyncStateFastTop1Function}. */
public class FastTop1FunctionTest extends TopNFunctionTestBase {
Expand Down Expand Up @@ -376,4 +379,32 @@ void testConstantRankRangeWithOffset() throws Exception {
void testOutputRankNumberWithVariableRankRange() throws Exception {
// skip
}

/**
 * Verifies that {@code topn.cache.size} reflects the live cache state instead of staying at 0
 * for {@link FastTop1Function}.
 */
@TestTemplate
void testCacheMetricsReflectLiveState() throws Exception {
    // async harness does not allow injecting a custom metric group; sync path covers the fix.
    assumeFalse(enableAsyncState);
    AbstractTopNFunction func =
            createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 1), true, false);
    InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
    // try-with-resources guarantees the harness (and its mock environment) is released even
    // when an assertion below fails, instead of leaking across test templates.
    try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
            createTestHarnessWithMetrics(func, metricGroup)) {
        testHarness.open();
        // no partition cached yet -> cache size is 0.
        assertThat(readCacheSizeMetric(metricGroup)).isZero();
        // no requests issued yet -> hit rate falls back to 1.0.
        assertThat(readCacheHitRateMetric(metricGroup)).isEqualTo(1.0);

        testHarness.processElement(insertRecord("book", 1L, 12));
        testHarness.processElement(insertRecord("fruit", 4L, 33));
        // 2 partitions cached -> live size = 2 * topN(1) = 2.
        assertThat(readCacheSizeMetric(metricGroup)).isEqualTo(2L);
        // hit rate is now driven by live counters and lies in [0.0, 1.0].
        assertThat(readCacheHitRateMetric(metricGroup)).isBetween(0.0, 1.0);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@
package org.apache.flink.table.runtime.operators.rank;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
Expand Down Expand Up @@ -53,11 +60,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assumptions.assumeFalse;

Expand Down Expand Up @@ -425,4 +434,44 @@ abstract AbstractTopNFunction createFunction(

/** TODO remove this method after all rank function support async state. */
abstract boolean supportedAsyncState();

// shared helpers for verifying cache metrics in subclasses.

static final String CACHE_SIZE_METRIC = "topn.cache.size";
static final String CACHE_HIT_RATE_METRIC = "topn.cache.hitRate";

/**
 * Builds a synchronous keyed test harness whose operator metrics are routed into the given
 * {@link InterceptingOperatorMetricGroup}, so tests can assert on registered gauges.
 */
OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarnessWithMetrics(
        AbstractTopNFunction rankFunction, InterceptingOperatorMetricGroup operatorMetricGroup)
        throws Exception {
    KeyedProcessOperator<RowData, RowData, RowData> rankOperator =
            new KeyedProcessOperator<>(rankFunction);
    rankFunction.setKeyContext(rankOperator);
    // Task-level group that hands every operator lookup the intercepting group under test.
    InterceptingTaskMetricGroup interceptingTaskGroup =
            new InterceptingTaskMetricGroup() {
                @Override
                public InternalOperatorMetricGroup getOrAddOperator(
                        OperatorID id, String name, Map<String, String> additionalVariables) {
                    return operatorMetricGroup;
                }
            };
    MockEnvironment mockEnvironment =
            new MockEnvironmentBuilder().setMetricGroup(interceptingTaskGroup).build();
    return new KeyedOneInputStreamOperatorTestHarness<>(
            rankOperator, keySelector, keySelector.getProducedType(), mockEnvironment);
}

@SuppressWarnings("unchecked")
static long readCacheSizeMetric(InterceptingOperatorMetricGroup metricGroup) {
    // Fetch the registered cache-size gauge and report its current value.
    final Gauge<Long> sizeGauge = (Gauge<Long>) metricGroup.get(CACHE_SIZE_METRIC);
    assertThat(sizeGauge).as("topn.cache.size gauge should be registered").isNotNull();
    return sizeGauge.getValue();
}

@SuppressWarnings("unchecked")
static double readCacheHitRateMetric(InterceptingOperatorMetricGroup metricGroup) {
    // Fetch the registered hit-rate gauge and report its current value.
    final Gauge<Double> hitRateGauge = (Gauge<Double>) metricGroup.get(CACHE_HIT_RATE_METRIC);
    assertThat(hitRateGauge).as("topn.cache.hitRate gauge should be registered").isNotNull();
    return hitRateGauge.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.runtime.operators.rank;

import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;

Expand All @@ -30,6 +31,7 @@
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link UpdatableTopNFunction}. */
class UpdatableTopNFunctionTest extends TopNFunctionTestBase {
Expand Down Expand Up @@ -251,4 +253,31 @@ void testSortKeyChangesWhenNotOutputRankNumberAndNotGenerateUpdateBefore() throw
assertorWithRowNumber.assertOutputEquals(
"output wrong.", expectedOutput, testHarness.getOutput());
}

/**
 * Verifies that {@code topn.cache.size} reflects the live cache state instead of being stuck at
 * the configured {@code cacheSize} constant for {@link UpdatableTopNFunction}.
 */
@TestTemplate
void testCacheMetricsReflectLiveState() throws Exception {
    AbstractTopNFunction func =
            createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), true, false);
    InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
    // try-with-resources guarantees the harness (and its mock environment) is released even
    // when an assertion below fails, instead of leaking across test templates.
    try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
            createTestHarnessWithMetrics(func, metricGroup)) {
        testHarness.open();
        // no partition cached yet -> live size is 0 (not the configured cacheSize).
        assertThat(readCacheSizeMetric(metricGroup)).isZero();
        // no requests issued yet -> hit rate falls back to 1.0.
        assertThat(readCacheHitRateMetric(metricGroup)).isEqualTo(1.0);

        testHarness.processElement(insertRecord("book", 2L, 19));
        testHarness.processElement(insertRecord("book", 3L, 16));
        testHarness.processElement(insertRecord("fruit", 1L, 33));
        // 2 partitions cached -> live size = 2 * topN(2) = 4.
        assertThat(readCacheSizeMetric(metricGroup)).isEqualTo(4L);
        // hit rate is now driven by live counters and lies in [0.0, 1.0].
        assertThat(readCacheHitRateMetric(metricGroup)).isBetween(0.0, 1.0);
    }
}
}