From 3b0d8746e8702fc03f0788c9a4b8fb9b00f21de2 Mon Sep 17 00:00:00 2001 From: majialong Date: Wed, 3 Dec 2025 00:31:26 +0800 Subject: [PATCH] KAFKA-18952: Fix flaky test in MonitorableSinkIntegrationTest --- .../MonitorableSinkIntegrationTest.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java index c37206902687d..e9d74836e0101 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java @@ -58,6 +58,7 @@ public class MonitorableSinkIntegrationTest { private static final int NUM_TASKS = 1; private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60); private static final long CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(30); + private static final long METRICS_CONVERGENCE_DURATION_MS = TimeUnit.SECONDS.toMillis(5); private EmbeddedConnectStandalone connect; private ConnectorHandle connectorHandle; @@ -119,11 +120,19 @@ public void testMonitorableSinkConnectorAndTask() throws Exception { // check task metric metrics = connect.connectMetrics().metrics().metrics(); - MetricName taskMetric = MonitorableSinkConnector.MonitorableSinkTask.metricsName; - assertTrue(metrics.containsKey(taskMetric)); - assertEquals(CONNECTOR_NAME, taskMetric.tags().get("connector")); - assertEquals("0", taskMetric.tags().get("task")); - assertEquals((double) NUM_RECORDS_PRODUCED, metrics.get(taskMetric).metricValue()); + MetricName taskMetricName = MonitorableSinkConnector.MonitorableSinkTask.metricsName; + assertTrue(metrics.containsKey(taskMetricName)); + assertEquals(CONNECTOR_NAME, taskMetricName.tags().get("connector")); + assertEquals("0", taskMetricName.tags().get("task")); + + KafkaMetric taskMetric = metrics.get(taskMetricName); + // The metric value may not be updated immediately after awaitRecords() returns, + // because MonitorableSinkTask.count is incremented after TestableSinkTask.put() + // which triggers the latch countdown. Use waitForCondition to handle this race condition. + waitForCondition( + () -> (double) NUM_RECORDS_PRODUCED == (double) taskMetric.metricValue(), + METRICS_CONVERGENCE_DURATION_MS, + "Task metric did not converge to expected value in time."); connect.deleteConnector(CONNECTOR_NAME); connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, @@ -132,7 +141,7 @@ public void testMonitorableSinkConnectorAndTask() throws Exception { // verify connector and task metrics have been deleted metrics = connect.connectMetrics().metrics().metrics(); assertFalse(metrics.containsKey(connectorMetric)); - assertFalse(metrics.containsKey(taskMetric)); + assertFalse(metrics.containsKey(taskMetricName)); } /**