Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
9d490f5
Disabled some benchmarks and scaled
stanbrub Mar 13, 2026
47f066f
Scaled up basic math combo
stanbrub Mar 13, 2026
dea74d7
Merge branch 'deephaven:main' into gc-benchmarking
stanbrub Mar 19, 2026
15cf1f4
Added a Local Parquet Generator as opposed to going through Kafka
stanbrub Mar 20, 2026
8604111
Added local parquet generator and 1st training test
stanbrub Mar 24, 2026
83b1c11
Added more train benchmarks. Improved Local Parquet Generator
stanbrub Mar 25, 2026
c552c01
Revert BasicMathCombo
stanbrub Mar 26, 2026
62aa96a
Revert BasicMathCombo
stanbrub Mar 26, 2026
f78ca22
Reverted scale and disabled for pre-train standard tests used for pre…
stanbrub Mar 26, 2026
e5412e7
Parallelized local parquet. worked around directory link failures
stanbrub Mar 31, 2026
ff4d891
Added 1st pass at benchmark even retrieval with JFR
stanbrub Apr 1, 2026
f35ab4f
Merge branch 'deephaven:main' into gc-benchmarking
stanbrub Apr 7, 2026
25629cc
Added jfr events
stanbrub Apr 7, 2026
254cca0
Merge branch 'deephaven:main' into gc-benchmarking
stanbrub Apr 7, 2026
528c365
Added UGP events
stanbrub Apr 9, 2026
bd5ff02
Rescaled only static trained for 120 secs
stanbrub Apr 10, 2026
75449bb
Updated adhoc for local parquet env variables
stanbrub Apr 10, 2026
ec2d95e
Open up dh data dir so local parquet can work
stanbrub Apr 10, 2026
a402a54
More logging for benchmark runs
stanbrub Apr 10, 2026
4cf8357
Scaling back AggBy because of system lockup
stanbrub Apr 10, 2026
8507794
Restrict the number of parquet threads and memory for the runner
stanbrub Apr 10, 2026
c0b5e7a
Fixed NaturalJoin OOM
stanbrub Apr 11, 2026
8f1a77f
Added separate scalling for static vs inc
stanbrub Apr 22, 2026
2938992
Better separation for running static and inc. Added ugp deltas
stanbrub Apr 23, 2026
9b326e0
turn on JFR metrics
stanbrub Apr 23, 2026
7fe14cc
Turn off Inc runs
stanbrub Apr 23, 2026
5e1d59c
Added ss_log budget metric
stanbrub May 5, 2026
a1316d4
Added runner setting for auto tune cycle factor
stanbrub May 6, 2026
af3d82b
G1 inc release max
stanbrub May 15, 2026
28745a8
ParallelGC inc release max
stanbrub May 16, 2026
ac0c87f
Shenandoah GC inc release max
stanbrub May 16, 2026
9d630d9
ZGC inc release max
stanbrub May 16, 2026
dfa44df
Inc release filter min
stanbrub May 17, 2026
3ab1845
Updated for 100ms cycle at 90% min
stanbrub May 21, 2026
e0e763f
Added state log ugp times
stanbrub May 28, 2026
4fd604a
Pared down to two Filter/Nat tests
stanbrub May 28, 2026
07eacdf
Scaled for 1 sec cycles
stanbrub May 29, 2026
90e7153
Set autotune to 90% target release
stanbrub Jun 1, 2026
f18c994
User JVM 25 for adhoc
stanbrub Jun 2, 2026
8240602
Added update listener cycle times instead of server state log ones
stanbrub Jun 3, 2026
9951735
Set 20% inc release
stanbrub Jun 3, 2026
7c18a2c
Change inc release to 40%
stanbrub Jun 3, 2026
ce992b3
Change inc release to 60%
stanbrub Jun 4, 2026
fcde3fa
Change inc release to 80%
stanbrub Jun 4, 2026
ff2795a
Change inc release to 100%
stanbrub Jun 4, 2026
60839de
Run 50ms against zfc
stanbrub Jun 4, 2026
d18049c
Make release filter table smaller based on inc release factor
stanbrub Jun 4, 2026
c43719a
Change inc release to 40%
stanbrub Jun 4, 2026
45d5314
Change inc release to 20%
stanbrub Jun 4, 2026
36b1e83
Change inc release to 40%
stanbrub Jun 4, 2026
d4e0255
Change inc release to 60%
stanbrub Jun 4, 2026
1b351a5
Change inc release to 80%
stanbrub Jun 4, 2026
4f4124a
Change inc release to 100%
stanbrub Jun 5, 2026
11dbba7
Scale for Java 25
stanbrub Jun 5, 2026
c048170
Roll back changes some unwanted changes
stanbrub Jun 5, 2026
c9af58a
Switch to java 17
stanbrub Jun 8, 2026
5364601
Change to JVM 25
stanbrub Jun 9, 2026
4af7393
Rescaled for 100ms benchmarks
stanbrub Jun 10, 2026
920c89c
Changed autotune to 90% for testing
stanbrub Jun 10, 2026
37dc336
Add 1.10% inc target
stanbrub Jun 11, 2026
c6f4a00
Turn off static for now. Do inc 100p
stanbrub Jun 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/resources/adhoc-benchmark-docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
services:
deephaven:
image: ${DOCKER_IMG}
image: ghcr.io/stanbrub/server:jvm25
ports:
- "${DEEPHAVEN_PORT:-10000}:10000"
volumes:
- ./data:/data
- ./minio:/minio
environment:
- "JAVA_OPTS=-XX:+UseG1GC"
- "START_OPTS=-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler ${CONFIG_OPTS}"
- "DEEPHAVEN_HOST_OS_DIR=${ENV_DEEPHAVEN_HOST_OS_DIR}"

redpanda:
command:
Expand Down
2 changes: 1 addition & 1 deletion .github/resources/adhoc-scale-benchmark.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ schema.registry.addr=redpanda:8081
kafka.consumer.addr=redpanda:29092

# Default timeout to complete processes (Executing queries, generating records)
default.completion.timeout=10 minutes
default.completion.timeout=20 minutes

# Default data distribution for column data (random, ascending, descending, runlength)
default.data.distribution=${baseDistrib}
Expand Down
1 change: 1 addition & 0 deletions .github/scripts/manage-deephaven-remote.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ if [[ ${CONFIG_OPTS} == "<default>" ]]; then
CONFIG_OPTS="-Xmx24g"
fi
echo "CONFIG_OPTS=${CONFIG_OPTS}" > .env
echo "ENV_DEEPHAVEN_HOST_OS_DIR=${DEEPHAVEN_DIR}" >> .env

IS_BRANCH="false"
if [[ ${DOCKER_IMG} == *"@sha"*":"* ]]; then
Expand Down
2 changes: 1 addition & 1 deletion .github/scripts/run-benchmarks-remote.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ title "-- Running Benchmarks --"
set +f
cd ${RUN_DIR}
cat ${RUN_TYPE}-scale-benchmark.properties | sed 's|${baseRowCount}|'"${ROW_COUNT}|g" | sed 's|${baseDistrib}|'"${DISTRIB}|g" | sed 's|${userHome}|'"${HOME}|g" > scale-benchmark.properties
JAVA_OPTS=$(echo -Dbenchmark.profile=scale-benchmark.properties -jar deephaven-benchmark-*-standalone.jar -cp standard-tests.jar)
JAVA_OPTS=$(echo -Xmx4g -Dbenchmark.profile=scale-benchmark.properties -jar deephaven-benchmark-*-standalone.jar -cp standard-tests.jar)
set -f

if [ "${TAG_NAME}" = "Any" ]; then
Expand Down
3 changes: 2 additions & 1 deletion .github/scripts/setup-test-server-remote.sh
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ sudo docker system prune --volumes --force
sudo rm -rf ${DEEPHAVEN_DIR}

title "-- Staging Docker Resources --"
mkdir -p ${DEEPHAVEN_DIR}
mkdir -p ${DEEPHAVEN_DIR}/data
chmod 777 ${DEEPHAVEN_DIR}/data
cd ${DEEPHAVEN_DIR}
cp ${GIT_DIR}/benchmark/.github/resources/${RUN_TYPE}-benchmark-docker-compose.yml docker-compose.yml

Expand Down
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@
<file>${project.basedir}/eclipse-java-google-style.xml</file>
</eclipse>
<licenseHeader>
<content>/* Copyright (c) 2022-$YEAR Deephaven Data Labs and Patent Pending */</content>
<content>/* Copyright (c) $YEAR Deephaven Data Labs and Patent Pending */</content>
</licenseHeader>
</java>
</configuration>
Expand Down Expand Up @@ -271,6 +271,11 @@
<artifactId>kafka-protobuf-serializer</artifactId>
<version>8.1.1</version>
</dependency>
<dependency>
<groupId>blue.strategic.parquet</groupId>
<artifactId>parquet-floor</artifactId>
<version>1.64</version>
</dependency>
<dependency>
<groupId>io.deephaven</groupId>
<artifactId>deephaven-java-client-barrage-dagger</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
* practical purposes, though it is not ideal.
*/
public class CompareTestRunner {
static {
System.setProperty("root.test.package", "io.deephaven.benchmark.tests");
}
final Object testInst;
final Set<String> requiredPackages = new LinkedHashSet<>();
final Map<String, String> downloadFiles = new LinkedHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */
/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */
package io.deephaven.benchmark.tests.standard;

import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -10,6 +10,7 @@
import io.deephaven.benchmark.controller.Controller;
import io.deephaven.benchmark.controller.DeephavenDockerController;
import io.deephaven.benchmark.metric.Metrics;
import io.deephaven.benchmark.util.Log;
import io.deephaven.benchmark.util.Timer;

/**
Expand All @@ -21,17 +22,25 @@
* conventions are followed (ex. main file is "source")
*/
final public class StandardTestRunner {
static {
System.setProperty("root.test.package", "io.deephaven.benchmark.tests");
}
final Object testInst;
final List<String> supportTables = new ArrayList<>();
final List<String> setupQueries = new ArrayList<>();
final List<String> preOpQueries = new ArrayList<>();
final List<String> teardownQueries = new ArrayList<>();
final Set<String> requiredServices = new TreeSet<>(List.of("deephaven"));
private String mainTable = "source";
private Bench api;
private Controller controller;
private int rowCountFactor = 1;
private int staticFactor = 1;
private int incFactor = 1;
private int rowCountFactor = 1;
private boolean useCachedSource = true;
private boolean useLocalParquet = false;
private float incCycleFactor = 1.0f;
private long incReleaseRowCount = 1000000;

public StandardTestRunner(Object testInst) {
this.testInst = testInst;
Expand Down Expand Up @@ -96,6 +105,25 @@ public void setServices(String... services) {
requiredServices.addAll(Arrays.asList(services));
}

/**
* Set if the generated tables are loaded into memory before running the test queries.
*
* @return true if in memory source, otherwise false
*/
public void useCachedSource(boolean useMemorySource) {
this.useCachedSource = useMemorySource;
}

/**
* Set if the generated tables are created through Deephaven (i.e. real client-server) or through the local file
* system (i.e. a local copy). The default of "false" is preferred.
*
* @param useLocalParquet false to generate tables through Deephaven, otherwise false
*/
public void useLocalParquet(boolean useLocalParquet) {
this.useLocalParquet = useLocalParquet;
}

/**
* Add a query to be run directly after the main table is loaded. It is not measured. This query can transform the
* main table or supporting table, set up aggregations or updateby operations, etc.
Expand All @@ -117,6 +145,16 @@ public void addPreOpQuery(String query) {
preOpQueries.add(query);
}

/**
* Add a query to be run after everything else is done. This is useful for teardown of any resources after the test
* is run like logging, temporary files, perf table retrieval, etc.
*
* @param query the query to run after the measured operation
*/
public void addTeardownQuery(String query) {
teardownQueries.add(query);
}

/**
* The {@code scale.row.count} property supplies a default for the number of rows generated for benchmark tests.
* Given that some operations use less memory than others, scaling up the generated rows per operation is more
Expand All @@ -143,6 +181,18 @@ public void setScaleFactors(int staticFactor, int incFactor) {
this.incFactor = incFactor;
}

/**
* Set the incremental release filter to use for the incremental test. By default, this is an auto-tuning release
* filter with a target cycle factor of 1.0f.
*
* @param cycleFactor if isAutoTune is true, the target cycle factor, otherwise ignored
* @param releaseRowsCount if isAutoTune is true, ignored, otherwise the number of rows to release per cycle
*/
public void setIncReleaseFilter(float cycleFactor, long releaseRowCount) {
this.incCycleFactor = cycleFactor;
this.incReleaseRowCount = releaseRowCount;
}

/**
* Run a single operation test through the Bench API with no upper bound expected on the resulting row count
*
Expand Down Expand Up @@ -193,40 +243,42 @@ public void test(String name, long maxExpectedRowCount, String operation, String
}
}

long getWarmupRowCount() {
return (long) (api.propertyAsIntegral("warmup.row.count", "0") * rowCountFactor);
public long getGeneratedRowCount() {
return (long) (api.propertyAsIntegral("scale.row.count", "100000") * rowCountFactor);
}

long getGeneratedRowCount() {
return (long) (api.propertyAsIntegral("scale.row.count", "100000") * rowCountFactor);
long getWarmupRowCount() {
return (long) (api.propertyAsIntegral("warmup.row.count", "0") * rowCountFactor);
}

long getMaxExpectedRowCount(long expectedRowCount, long scaleFactor) {
return (expectedRowCount < 1) ? Long.MAX_VALUE : expectedRowCount;
}

String getReadOperation(int scaleFactor, long rowCount, String... loadColumns) {
var headRows = (rowCount >= getGeneratedRowCount())?"":".head(${rows})";
var headRows = (rowCount >= getGeneratedRowCount()) ? "" : ".head(${rows})";
var selectStr = useCachedSource ? "select" : "view";
if (scaleFactor > 1 && mainTable.equals("timed") && Arrays.asList(loadColumns).contains("timestamp")) {
var read = """
merge([
read('/data/timed.parquet').view(formulas=[${loadColumns}])${headRows}
bench_api_read('/data/timed.parquet').view(formulas=[${loadColumns}])${headRows}
] * ${scaleFactor}).update_view([
'timestamp=timestamp.plusMillis((long)(ii / ${rows}) * ${rows})'

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we can't use the timestamp from the file? I have a few worries about doing rowset calculation as part of the benchmark (to come up with ii).

For the actual test benchmarks, without a select we would also just prefer more/bigger parquet files to avoid the overhead of going through the merge data structures. We might even be able to get away with symlinks to have the data just repeate itself.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the "train" benchmarks, since we don't use Scale Factors, that section of code will not be hit. This is only used when we are doing merges to simulate larger data sets. So for the nightly runs, this will happen BEFORE the "select" into memory, which is not included in the measurement. But for the "train" benchmarks, we only read timestamps directly from the parquet file(s), and that only if they are used in the benchmark (like for rollingtime).

]).select()
]).${selectStr}()
""";
read = read.replace("${headRows}",headRows);
read = read.replace("${headRows}", headRows).replace("${selectStr}", selectStr);
return read.replace("${scaleFactor}", "" + scaleFactor).replace("${rows}", "" + rowCount);
}

var read = "read('/data/${mainTable}.parquet')${headRows}.select(formulas=[${loadColumns}])";
var read = "bench_api_read('/data/${mainTable}.parquet')${headRows}.${selectStr}(formulas=[${loadColumns}])";
read = (loadColumns.length == 0) ? ("empty_table(${rows})") : read;

if (scaleFactor > 1) {
read = "merge([${readTable}] * ${scaleFactor})".replace("${readTable}", read);
read = read.replace("${scaleFactor}", "" + scaleFactor);
}
return read.replace("${headRows}",headRows).replace("${rows}", "" + rowCount);
read = read.replace("${headRows}", headRows).replace("${rows}", "" + rowCount);
return read.replace("${selectStr}", selectStr);
}

String getStaticQuery(String name, String operation, long rowCount, String... loadColumns) {
Expand All @@ -241,9 +293,11 @@ String getStaticQuery(String name, String operation, long rowCount, String... lo
bench_api_metrics_start()
print('${logOperationBegin}')

begin_clock = time.time_ns()
begin_time = time.perf_counter_ns()
result = ${operation}
end_time = time.perf_counter_ns()
end_clock = time.time_ns()

print('${logOperationEnd}')
bench_api_metrics_end()
Expand All @@ -253,7 +307,10 @@ String getStaticQuery(String name, String operation, long rowCount, String... lo
double_col("elapsed_nanos", [end_time - begin_time]),
long_col("processed_row_count", [loaded_tbl_size]),
long_col("result_row_count", [result.size]),
long_col("begin_clock_nanos", [begin_clock]),
long_col("end_clock_nanos", [end_clock]),
])
${teardownQueries}
""";
var read = getReadOperation(staticFactor, rowCount, loadColumns);
return populateQuery(name, staticQuery, operation, read, loadColumns);
Expand All @@ -268,16 +325,21 @@ String getIncQuery(String name, String operation, long rowCount, String... loadC
loaded_tbl_size = ${mainTable}.size
${setupQueries}

autotune = jpy.get_type('io.deephaven.engine.table.impl.select.AutoTuningIncrementalReleaseFilter')
source_filter = autotune(0, 1000000, 1.0, True)
isat = System.getProperty('train.autotune', 'true').lower() == 'true' or ${incReleaseRowCount} <= 0
filter_name = 'AutoTuningIncrementalReleaseFilter' if isat else 'IncrementalReleaseFilter'
autotune = jpy.get_type(f'io.deephaven.engine.table.impl.select.{filter_name}')
print("******* ISAT:",isat, "FILTER:", filter_name)

source_filter = autotune(0,1000000,${incCycleFactor},True) if isat else autotune(0,${incReleaseRowCount})
${mainTable} = ${mainTable}.where(source_filter)
if right:
right_filter = autotune(0, 1010000, 1.0, True)
right_filter = autotune(0,1010000,${incCycleFactor},True) if isat else autotune(0,${incReleaseRowCount})
right = right.where(right_filter)

${preOpQueries}
bench_api_metrics_start()
print('${logOperationBegin}')
begin_clock = time.time_ns()
begin_time = time.perf_counter_ns()
result = ${operation}

Expand All @@ -291,30 +353,39 @@ String getIncQuery(String name, String operation, long rowCount, String... loadC
source_filter.waitForCompletion()

end_time = time.perf_counter_ns()
end_clock = time.time_ns()
print('${logOperationEnd}')
bench_api_metrics_end()
standard_metrics = bench_api_metrics_collect()

stats = new_table([
double_col("elapsed_nanos", [end_time - begin_time]),
long_col("processed_row_count", [loaded_tbl_size]),
long_col("result_row_count", [result.size])
long_col("result_row_count", [result.size]),
long_col("begin_clock_nanos", [begin_clock]),
long_col("end_clock_nanos", [end_clock]),
])
${teardownQueries}
print("STANDARD EVENTS: ", f'start_ns > {begin_clock}L', f'start_ns < {end_clock}L')
standard_events = standard_events.where([f'start_ns > {begin_clock}L', f'start_ns < {end_clock}L'])
""";
var read = getReadOperation(incFactor, rowCount, loadColumns);
return populateQuery(name, incQuery, operation, read, loadColumns);
}

String populateQuery(String name, String query, String operation, String read, String... loadColumns) {
query = query.replace("${readTable}", read);
query = query.replace("${mainTable}", mainTable);
query = query.replace("${loadSupportTables}", loadSupportTables());
query = query.replace("${loadColumns}", listStr(loadColumns));
query = query.replace("${setupQueries}", String.join("\n", setupQueries));
query = query.replace("${preOpQueries}", String.join("\n", preOpQueries));
query = query.replace("${operation}", operation);
query = query.replace("${teardownQueries}", String.join("\n", teardownQueries));
query = query.replace("${logOperationBegin}", getLogSnippet("Begin", name));
query = query.replace("${logOperationEnd}", getLogSnippet("End", name));
query = query.replace("${incCycleFactor}", "" + incCycleFactor);
query = query.replace("${incReleaseRowCount}", "" + incReleaseRowCount);
query = query.replace("${mainTable}", mainTable);
return query;
}

Expand All @@ -326,6 +397,7 @@ Result runTest(String name, String warmupQuery, String mainQuery) {
stopUnusedServices(requiredServices);

try {
Log.info("Running Test: %s", name);
if (getWarmupRowCount() > 0)
api.query(warmupQuery).execute();
var result = new AtomicReference<Result>();
Expand All @@ -342,6 +414,8 @@ Result runTest(String name, String warmupQuery, String mainQuery) {
metrics.set("inc.factor", incFactor);
metrics.set("row.factor", rowCountFactor);
api.metrics().add(metrics);
}).fetchAfter("standard_events", table -> {
api.events().add(table);
}).execute();
api.result().test("deephaven-engine", result.get().elapsedTime(), result.get().loadedRowCount());
return result.get();
Expand All @@ -356,7 +430,7 @@ String listStr(String... values) {
}

String loadSupportTables() {
return supportTables.stream().map(t -> t + " = read('/data/" + t + ".parquet').select()\n")
return supportTables.stream().map(t -> t + " = bench_api_read('/data/" + t + ".parquet').select()\n")
.collect(Collectors.joining(""));
}

Expand Down Expand Up @@ -435,7 +509,7 @@ boolean generateNamedTable(String name, String distribution, String[] groups) {
}

boolean generateSourceTable(String distribution, String[] groups) {
return api.table("source")
var t = api.table("source")
.add("num1", "double", "[0-4]", distribution)
.add("num2", "double", "[1-10]", distribution)
.add("key1", "string", "[1-100]", distribution)
Expand All @@ -444,8 +518,8 @@ boolean generateSourceTable(String distribution, String[] groups) {
.add("key4", "int", "[0-98]", distribution)
.add("key5", "string", "[1-1000000]", distribution)
.withRowCount(getGeneratedRowCount())
.withColumnGrouping(groups)
.generateParquet();
.withColumnGrouping(groups);
return useLocalParquet ? t.generateLocalParquet() : t.generateParquet();
}

boolean generateRightTable(String distribution, String[] groups) {
Expand All @@ -469,7 +543,7 @@ boolean generateRightTable(String distribution, String[] groups) {
boolean generateTimedTable(String distribution, String[] groups) {
long minTime = 1676557157537L;
long maxTime = minTime + getGeneratedRowCount() - 1;
return api.table("timed")
var t = api.table("timed")
.add("timestamp", "timestamp-millis", "[" + minTime + "-" + maxTime + "]", "ascending")
.add("num1", "double", "[0-4]", distribution)
.add("num2", "double", "[1-10]", distribution)
Expand All @@ -478,8 +552,8 @@ boolean generateTimedTable(String distribution, String[] groups) {
.add("key3", "int", "[0-8]", distribution)
.add("key4", "int", "[0-98]", distribution)
.withFixedRowCount(true)
.withColumnGrouping(groups)
.generateParquet();
.withColumnGrouping(groups);
return useLocalParquet ? t.generateLocalParquet() : t.generateParquet();
}

record Result(long loadedRowCount, Duration elapsedTime, long resultRowCount) {
Expand Down
Loading
Loading