From fdb0eca5aac2ef4d7e8dbc21c4dc7ac3c79ed077 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 30 Jun 2026 12:35:08 -0700 Subject: [PATCH 1/2] Add opt-in Workload.drainProducerRate field (default -1 = unset) Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Oren Leiman --- .../io/openmessaging/benchmark/Workload.java | 8 +++++ .../openmessaging/benchmark/WorkloadTest.java | 33 +++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 benchmark-framework/src/test/java/io/openmessaging/benchmark/WorkloadTest.java diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java index 21a6b682..ddcf8cd1 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/Workload.java @@ -67,6 +67,14 @@ public class Workload { */ public long consumerBacklogSizeGB = 0; + /** + * Producer publish rate (msg/s) to apply during the backlog-drain phase. + * Default -1 means "unset" — producers keep producerRate through the drain + * (original behavior). >= 0 throttles producers for the drain only; 0 is + * floored to ~1 msg/s by Worker.adjustPublishRate (effectively stopped). + */ + public int drainProducerRate = -1; + public int warmupDurationMinutes = 30; public int sampleRateMillis = 10000; public int testDurationMinutes; diff --git a/benchmark-framework/src/test/java/io/openmessaging/benchmark/WorkloadTest.java b/benchmark-framework/src/test/java/io/openmessaging/benchmark/WorkloadTest.java new file mode 100644 index 00000000..6c683a47 --- /dev/null +++ b/benchmark-framework/src/test/java/io/openmessaging/benchmark/WorkloadTest.java @@ -0,0 +1,33 @@ +package io.openmessaging.benchmark; + +import static org.junit.Assert.assertEquals; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.junit.Test; + +public class WorkloadTest { + + // Mirror the strict YAML workload mapper used in Benchmark.java. + private static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory()); + + private static final String BASE = + "name: drain-test\n" + + "topics: 1\n" + + "partitionsPerTopic: 1\n" + + "messageSize: 100\n" + + "producerRate: 1000\n" + + "testDurationMinutes: 1\n"; + + @Test + public void drainProducerRateDefaultsToUnset() throws Exception { + Workload w = MAPPER.readValue(BASE, Workload.class); + assertEquals(-1, w.drainProducerRate); + } + + @Test + public void drainProducerRateParsesWhenPresent() throws Exception { + Workload w = MAPPER.readValue(BASE + "drainProducerRate: 5000\n", Workload.class); + assertEquals(5000, w.drainProducerRate); + } +} From 95f0ac4ef565265e0f336bd52dfa94b43952c750 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 30 Jun 2026 12:48:37 -0700 Subject: [PATCH 2/2] Throttle producers during backlog drain when drainProducerRate is set Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Oren Leiman --- .../java/io/openmessaging/benchmark/WorkloadGenerator.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java index 03498f81..357edc09 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java @@ -391,6 +391,11 @@ private void buildAndDrainBacklog(List topics) throws IOException { log.info("--- Start draining backlog ---"); + if (workload.drainProducerRate >= 0) { + log.info("Throttling producers to {} msg/s for backlog drain", workload.drainProducerRate); + worker.adjustPublishRate(workload.drainProducerRate); + } + worker.resumeConsumers(); final long minBacklog = 1000;