diff --git a/.github/resources/adhoc-benchmark-docker-compose.yml b/.github/resources/adhoc-benchmark-docker-compose.yml index c02c30e3..e83c6db9 100644 --- a/.github/resources/adhoc-benchmark-docker-compose.yml +++ b/.github/resources/adhoc-benchmark-docker-compose.yml @@ -1,13 +1,15 @@ services: deephaven: - image: ${DOCKER_IMG} + image: ghcr.io/stanbrub/server:jvm25 ports: - "${DEEPHAVEN_PORT:-10000}:10000" volumes: - ./data:/data - ./minio:/minio environment: + - "JAVA_OPTS=-XX:+UseG1GC" - "START_OPTS=-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler ${CONFIG_OPTS}" + - "DEEPHAVEN_HOST_OS_DIR=${ENV_DEEPHAVEN_HOST_OS_DIR}" redpanda: command: diff --git a/.github/resources/adhoc-scale-benchmark.properties b/.github/resources/adhoc-scale-benchmark.properties index adab86ad..31a2fd15 100644 --- a/.github/resources/adhoc-scale-benchmark.properties +++ b/.github/resources/adhoc-scale-benchmark.properties @@ -15,7 +15,7 @@ schema.registry.addr=redpanda:8081 kafka.consumer.addr=redpanda:29092 # Default timeout to complete processes (Executing queries, generating records) -default.completion.timeout=10 minutes +default.completion.timeout=20 minutes # Default data distribution for column data (random, ascending, descending, runlength) default.data.distribution=${baseDistrib} diff --git a/.github/scripts/manage-deephaven-remote.sh b/.github/scripts/manage-deephaven-remote.sh index e1e49cf8..57af19cb 100755 --- a/.github/scripts/manage-deephaven-remote.sh +++ b/.github/scripts/manage-deephaven-remote.sh @@ -35,6 +35,7 @@ if [[ ${CONFIG_OPTS} == "" ]]; then CONFIG_OPTS="-Xmx24g" fi echo "CONFIG_OPTS=${CONFIG_OPTS}" > .env +echo "ENV_DEEPHAVEN_HOST_OS_DIR=${DEEPHAVEN_DIR}" >> .env IS_BRANCH="false" if [[ ${DOCKER_IMG} == *"@sha"*":"* ]]; then diff --git a/.github/scripts/run-benchmarks-remote.sh b/.github/scripts/run-benchmarks-remote.sh index b28a25c1..259915e6 100755 --- a/.github/scripts/run-benchmarks-remote.sh +++ b/.github/scripts/run-benchmarks-remote.sh @@ -43,7 +43,7 @@ title "-- Running Benchmarks --" set +f cd ${RUN_DIR} cat ${RUN_TYPE}-scale-benchmark.properties | sed 's|${baseRowCount}|'"${ROW_COUNT}|g" | sed 's|${baseDistrib}|'"${DISTRIB}|g" | sed 's|${userHome}|'"${HOME}|g" > scale-benchmark.properties -JAVA_OPTS=$(echo -Dbenchmark.profile=scale-benchmark.properties -jar deephaven-benchmark-*-standalone.jar -cp standard-tests.jar) +JAVA_OPTS=$(echo -Xmx4g -Dbenchmark.profile=scale-benchmark.properties -jar deephaven-benchmark-*-standalone.jar -cp standard-tests.jar) set -f if [ "${TAG_NAME}" = "Any" ]; then diff --git a/.github/scripts/setup-test-server-remote.sh b/.github/scripts/setup-test-server-remote.sh index 43033300..920e6c4d 100755 --- a/.github/scripts/setup-test-server-remote.sh +++ b/.github/scripts/setup-test-server-remote.sh @@ -135,7 +135,8 @@ sudo docker system prune --volumes --force sudo rm -rf ${DEEPHAVEN_DIR} title "-- Staging Docker Resources --" -mkdir -p ${DEEPHAVEN_DIR} +mkdir -p ${DEEPHAVEN_DIR}/data +chmod 777 ${DEEPHAVEN_DIR}/data cd ${DEEPHAVEN_DIR} cp ${GIT_DIR}/benchmark/.github/resources/${RUN_TYPE}-benchmark-docker-compose.yml docker-compose.yml diff --git a/pom.xml b/pom.xml index 304b6632..70a4bcfa 100644 --- a/pom.xml +++ b/pom.xml @@ -178,7 +178,7 @@ ${project.basedir}/eclipse-java-google-style.xml - /* Copyright (c) 2022-$YEAR Deephaven Data Labs and Patent Pending */ + /* Copyright (c) $YEAR Deephaven Data Labs and Patent Pending */ @@ -271,6 +271,11 @@ kafka-protobuf-serializer 8.1.1 + + blue.strategic.parquet + parquet-floor + 1.64 + io.deephaven deephaven-java-client-barrage-dagger diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java index 755633f7..b7b31fe4 100644 --- a/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java @@ -28,6 +28,9 @@ * practical purposes, though it is not ideal. */ public class CompareTestRunner { + static { + System.setProperty("root.test.package", "io.deephaven.benchmark.tests"); + } final Object testInst; final Set requiredPackages = new LinkedHashSet<>(); final Map downloadFiles = new LinkedHashMap<>(); diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java index 3ca12176..5f3da6e5 100644 --- a/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java @@ -1,4 +1,4 @@ -/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */ +/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.tests.standard; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -10,6 +10,7 @@ import io.deephaven.benchmark.controller.Controller; import io.deephaven.benchmark.controller.DeephavenDockerController; import io.deephaven.benchmark.metric.Metrics; +import io.deephaven.benchmark.util.Log; import io.deephaven.benchmark.util.Timer; /** @@ -21,17 +22,25 @@ * conventions are followed (ex. main file is "source") */ final public class StandardTestRunner { + static { + System.setProperty("root.test.package", "io.deephaven.benchmark.tests"); + } final Object testInst; final List supportTables = new ArrayList<>(); final List setupQueries = new ArrayList<>(); final List preOpQueries = new ArrayList<>(); + final List teardownQueries = new ArrayList<>(); final Set requiredServices = new TreeSet<>(List.of("deephaven")); private String mainTable = "source"; private Bench api; private Controller controller; + private int rowCountFactor = 1; private int staticFactor = 1; private int incFactor = 1; - private int rowCountFactor = 1; + private boolean useCachedSource = true; + private boolean useLocalParquet = false; + private float incCycleFactor = 1.0f; + private long incReleaseRowCount = 1000000; public StandardTestRunner(Object testInst) { this.testInst = testInst; @@ -96,6 +105,25 @@ public void setServices(String... services) { requiredServices.addAll(Arrays.asList(services)); } + /** + * Set if the generated tables are loaded into memory before running the test queries. + * + * @return true if in memory source, otherwise false + */ + public void useCachedSource(boolean useMemorySource) { + this.useCachedSource = useMemorySource; + } + + /** + * Set if the generated tables are created through Deephaven (i.e. real client-server) or through the local file + * system (i.e. a local copy). The default of "false" is preferred. + * + * @param useLocalParquet false to generate tables through Deephaven, otherwise false + */ + public void useLocalParquet(boolean useLocalParquet) { + this.useLocalParquet = useLocalParquet; + } + /** * Add a query to be run directly after the main table is loaded. It is not measured. This query can transform the * main table or supporting table, set up aggregations or updateby operations, etc. @@ -117,6 +145,16 @@ public void addPreOpQuery(String query) { preOpQueries.add(query); } + /** + * Add a query to be run after everything else is done. This is useful for teardown of any resources after the test + * is run like logging, temporary files, perf table retrieval, etc. + * + * @param query the query to run after the measured operation + */ + public void addTeardownQuery(String query) { + teardownQueries.add(query); + } + /** * The {@code scale.row.count} property supplies a default for the number of rows generated for benchmark tests. * Given that some operations use less memory than others, scaling up the generated rows per operation is more @@ -143,6 +181,18 @@ public void setScaleFactors(int staticFactor, int incFactor) { this.incFactor = incFactor; } + /** + * Set the incremental release filter to use for the incremental test. By default, this is an auto-tuning release + * filter with a target cycle factor of 1.0f. + * + * @param cycleFactor if isAutoTune is true, the target cycle factor, otherwise ignored + * @param releaseRowsCount if isAutoTune is true, ignored, otherwise the number of rows to release per cycle + */ + public void setIncReleaseFilter(float cycleFactor, long releaseRowCount) { + this.incCycleFactor = cycleFactor; + this.incReleaseRowCount = releaseRowCount; + } + /** * Run a single operation test through the Bench API with no upper bound expected on the resulting row count * @@ -193,12 +243,12 @@ public void test(String name, long maxExpectedRowCount, String operation, String } } - long getWarmupRowCount() { - return (long) (api.propertyAsIntegral("warmup.row.count", "0") * rowCountFactor); + public long getGeneratedRowCount() { + return (long) (api.propertyAsIntegral("scale.row.count", "100000") * rowCountFactor); } - long getGeneratedRowCount() { - return (long) (api.propertyAsIntegral("scale.row.count", "100000") * rowCountFactor); + long getWarmupRowCount() { + return (long) (api.propertyAsIntegral("warmup.row.count", "0") * rowCountFactor); } long getMaxExpectedRowCount(long expectedRowCount, long scaleFactor) { @@ -206,27 +256,29 @@ long getMaxExpectedRowCount(long expectedRowCount, long scaleFactor) { } String getReadOperation(int scaleFactor, long rowCount, String... loadColumns) { - var headRows = (rowCount >= getGeneratedRowCount())?"":".head(${rows})"; + var headRows = (rowCount >= getGeneratedRowCount()) ? "" : ".head(${rows})"; + var selectStr = useCachedSource ? "select" : "view"; if (scaleFactor > 1 && mainTable.equals("timed") && Arrays.asList(loadColumns).contains("timestamp")) { var read = """ merge([ - read('/data/timed.parquet').view(formulas=[${loadColumns}])${headRows} + bench_api_read('/data/timed.parquet').view(formulas=[${loadColumns}])${headRows} ] * ${scaleFactor}).update_view([ 'timestamp=timestamp.plusMillis((long)(ii / ${rows}) * ${rows})' - ]).select() + ]).${selectStr}() """; - read = read.replace("${headRows}",headRows); + read = read.replace("${headRows}", headRows).replace("${selectStr}", selectStr); return read.replace("${scaleFactor}", "" + scaleFactor).replace("${rows}", "" + rowCount); } - var read = "read('/data/${mainTable}.parquet')${headRows}.select(formulas=[${loadColumns}])"; + var read = "bench_api_read('/data/${mainTable}.parquet')${headRows}.${selectStr}(formulas=[${loadColumns}])"; read = (loadColumns.length == 0) ? ("empty_table(${rows})") : read; if (scaleFactor > 1) { read = "merge([${readTable}] * ${scaleFactor})".replace("${readTable}", read); read = read.replace("${scaleFactor}", "" + scaleFactor); } - return read.replace("${headRows}",headRows).replace("${rows}", "" + rowCount); + read = read.replace("${headRows}", headRows).replace("${rows}", "" + rowCount); + return read.replace("${selectStr}", selectStr); } String getStaticQuery(String name, String operation, long rowCount, String... loadColumns) { @@ -241,9 +293,11 @@ String getStaticQuery(String name, String operation, long rowCount, String... lo bench_api_metrics_start() print('${logOperationBegin}') + begin_clock = time.time_ns() begin_time = time.perf_counter_ns() result = ${operation} end_time = time.perf_counter_ns() + end_clock = time.time_ns() print('${logOperationEnd}') bench_api_metrics_end() @@ -253,7 +307,10 @@ String getStaticQuery(String name, String operation, long rowCount, String... lo double_col("elapsed_nanos", [end_time - begin_time]), long_col("processed_row_count", [loaded_tbl_size]), long_col("result_row_count", [result.size]), + long_col("begin_clock_nanos", [begin_clock]), + long_col("end_clock_nanos", [end_clock]), ]) + ${teardownQueries} """; var read = getReadOperation(staticFactor, rowCount, loadColumns); return populateQuery(name, staticQuery, operation, read, loadColumns); @@ -268,16 +325,21 @@ String getIncQuery(String name, String operation, long rowCount, String... loadC loaded_tbl_size = ${mainTable}.size ${setupQueries} - autotune = jpy.get_type('io.deephaven.engine.table.impl.select.AutoTuningIncrementalReleaseFilter') - source_filter = autotune(0, 1000000, 1.0, True) + isat = System.getProperty('train.autotune', 'true').lower() == 'true' or ${incReleaseRowCount} <= 0 + filter_name = 'AutoTuningIncrementalReleaseFilter' if isat else 'IncrementalReleaseFilter' + autotune = jpy.get_type(f'io.deephaven.engine.table.impl.select.{filter_name}') + print("******* ISAT:",isat, "FILTER:", filter_name) + + source_filter = autotune(0,1000000,${incCycleFactor},True) if isat else autotune(0,${incReleaseRowCount}) ${mainTable} = ${mainTable}.where(source_filter) if right: - right_filter = autotune(0, 1010000, 1.0, True) + right_filter = autotune(0,1010000,${incCycleFactor},True) if isat else autotune(0,${incReleaseRowCount}) right = right.where(right_filter) ${preOpQueries} bench_api_metrics_start() print('${logOperationBegin}') + begin_clock = time.time_ns() begin_time = time.perf_counter_ns() result = ${operation} @@ -291,6 +353,7 @@ String getIncQuery(String name, String operation, long rowCount, String... loadC source_filter.waitForCompletion() end_time = time.perf_counter_ns() + end_clock = time.time_ns() print('${logOperationEnd}') bench_api_metrics_end() standard_metrics = bench_api_metrics_collect() @@ -298,8 +361,13 @@ String getIncQuery(String name, String operation, long rowCount, String... loadC stats = new_table([ double_col("elapsed_nanos", [end_time - begin_time]), long_col("processed_row_count", [loaded_tbl_size]), - long_col("result_row_count", [result.size]) + long_col("result_row_count", [result.size]), + long_col("begin_clock_nanos", [begin_clock]), + long_col("end_clock_nanos", [end_clock]), ]) + ${teardownQueries} + print("STANDARD EVENTS: ", f'start_ns > {begin_clock}L', f'start_ns < {end_clock}L') + standard_events = standard_events.where([f'start_ns > {begin_clock}L', f'start_ns < {end_clock}L']) """; var read = getReadOperation(incFactor, rowCount, loadColumns); return populateQuery(name, incQuery, operation, read, loadColumns); @@ -307,14 +375,17 @@ String getIncQuery(String name, String operation, long rowCount, String... loadC String populateQuery(String name, String query, String operation, String read, String... loadColumns) { query = query.replace("${readTable}", read); - query = query.replace("${mainTable}", mainTable); query = query.replace("${loadSupportTables}", loadSupportTables()); query = query.replace("${loadColumns}", listStr(loadColumns)); query = query.replace("${setupQueries}", String.join("\n", setupQueries)); query = query.replace("${preOpQueries}", String.join("\n", preOpQueries)); query = query.replace("${operation}", operation); + query = query.replace("${teardownQueries}", String.join("\n", teardownQueries)); query = query.replace("${logOperationBegin}", getLogSnippet("Begin", name)); query = query.replace("${logOperationEnd}", getLogSnippet("End", name)); + query = query.replace("${incCycleFactor}", "" + incCycleFactor); + query = query.replace("${incReleaseRowCount}", "" + incReleaseRowCount); + query = query.replace("${mainTable}", mainTable); return query; } @@ -326,6 +397,7 @@ Result runTest(String name, String warmupQuery, String mainQuery) { stopUnusedServices(requiredServices); try { + Log.info("Running Test: %s", name); if (getWarmupRowCount() > 0) api.query(warmupQuery).execute(); var result = new AtomicReference(); @@ -342,6 +414,8 @@ Result runTest(String name, String warmupQuery, String mainQuery) { metrics.set("inc.factor", incFactor); metrics.set("row.factor", rowCountFactor); api.metrics().add(metrics); + }).fetchAfter("standard_events", table -> { + api.events().add(table); }).execute(); api.result().test("deephaven-engine", result.get().elapsedTime(), result.get().loadedRowCount()); return result.get(); @@ -356,7 +430,7 @@ String listStr(String... values) { } String loadSupportTables() { - return supportTables.stream().map(t -> t + " = read('/data/" + t + ".parquet').select()\n") + return supportTables.stream().map(t -> t + " = bench_api_read('/data/" + t + ".parquet').select()\n") .collect(Collectors.joining("")); } @@ -435,7 +509,7 @@ boolean generateNamedTable(String name, String distribution, String[] groups) { } boolean generateSourceTable(String distribution, String[] groups) { - return api.table("source") + var t = api.table("source") .add("num1", "double", "[0-4]", distribution) .add("num2", "double", "[1-10]", distribution) .add("key1", "string", "[1-100]", distribution) @@ -444,8 +518,8 @@ boolean generateSourceTable(String distribution, String[] groups) { .add("key4", "int", "[0-98]", distribution) .add("key5", "string", "[1-1000000]", distribution) .withRowCount(getGeneratedRowCount()) - .withColumnGrouping(groups) - .generateParquet(); + .withColumnGrouping(groups); + return useLocalParquet ? t.generateLocalParquet() : t.generateParquet(); } boolean generateRightTable(String distribution, String[] groups) { @@ -469,7 +543,7 @@ boolean generateRightTable(String distribution, String[] groups) { boolean generateTimedTable(String distribution, String[] groups) { long minTime = 1676557157537L; long maxTime = minTime + getGeneratedRowCount() - 1; - return api.table("timed") + var t = api.table("timed") .add("timestamp", "timestamp-millis", "[" + minTime + "-" + maxTime + "]", "ascending") .add("num1", "double", "[0-4]", distribution) .add("num2", "double", "[1-10]", distribution) @@ -478,8 +552,8 @@ boolean generateTimedTable(String distribution, String[] groups) { .add("key3", "int", "[0-8]", distribution) .add("key4", "int", "[0-98]", distribution) .withFixedRowCount(true) - .withColumnGrouping(groups) - .generateParquet(); + .withColumnGrouping(groups); + return useLocalParquet ? t.generateLocalParquet() : t.generateParquet(); } record Result(long loadedRowCount, Duration elapsedTime, long resultRowCount) { diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java index 5fadc8ba..4126feac 100644 --- a/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java @@ -14,6 +14,9 @@ * Test reading and writing parquet files with various data types and compression codecs. */ class FileTestRunner { + static { + System.setProperty("root.test.package", "io.deephaven.benchmark.tests"); + } final String parquetCfg = "max_dictionary_keys=1048576, max_dictionary_size=1048576, target_page_size=65536"; final Object testInst; final Set requiredServices = new TreeSet<>(List.of("deephaven")); diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java index d65014d8..c919f852 100644 --- a/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java @@ -19,6 +19,9 @@ * append/blink table types. Results are checked to ensure the correct number of rows has been processed. */ class KafkaTestRunner { + static { + System.setProperty("root.test.package", "io.deephaven.benchmark.tests"); + } final Object testInst; final Bench api; final Controller controller; diff --git a/src/it/java/io/deephaven/benchmark/tests/train/AggByTrainTest.java b/src/it/java/io/deephaven/benchmark/tests/train/AggByTrainTest.java new file mode 100644 index 00000000..5d9bd4f5 --- /dev/null +++ b/src/it/java/io/deephaven/benchmark/tests/train/AggByTrainTest.java @@ -0,0 +1,42 @@ +/* Copyright (c) 2026-2026 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.tests.train; + +import org.junit.jupiter.api.*; + +/** + * Training tests for the aggBy table operations that do aggregations (e.g. sum, std, min/max. var, avg). See + * TrainTestRunner for more information. + */ +public class AggByTrainTest { + final TrainTestRunner runner = new TrainTestRunner(this); + + void setup(double staticRowFactor, double incRowFactor) { + runner.tables(staticRowFactor, incRowFactor, "timed"); + + var setupStr = """ + from deephaven import agg + + aggs = [ + agg.sum_('Sum=num1'), agg.std('Std=num2'), agg.min_('Min=num1'), agg.max_('Max=num2'), + agg.avg('Avg=num1'), agg.var('Var=num2'), agg.count_('num1') + ] + """; + runner.addSetupQuery(setupStr); + } + + @Test + @Disabled + void aggBy0Groups() { + setup(572, 286); + var q = "timed.agg_by(aggs)"; + runner.test("AggBy- No Groups", 1, q, "num1", "num2"); + } + + @Test + void aggBy2Groups() { + setup(66, 38); + var q = "timed.agg_by(aggs, by=['key1', 'key2'])"; + runner.test("AggBy- 2 Groups 10K Unique Combos", 10100, q, "key1", "key2", "num1", "num2"); + } + +} diff --git a/src/it/java/io/deephaven/benchmark/tests/train/FilterTrainTest.java b/src/it/java/io/deephaven/benchmark/tests/train/FilterTrainTest.java new file mode 100644 index 00000000..a3721852 --- /dev/null +++ b/src/it/java/io/deephaven/benchmark/tests/train/FilterTrainTest.java @@ -0,0 +1,46 @@ +/* Copyright (c) 2026-2026 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.tests.train; + +import org.junit.jupiter.api.*; + +/** + * Standard tests for the whereIn table operation. Filters rows of data from the source table where the rows match + * column values in the filter table. + */ +public class FilterTrainTest { + final TrainTestRunner runner = new TrainTestRunner(this); + + void setup(double staticRowFactor, double incRowFactor) { + runner.tables(staticRowFactor, incRowFactor, "timed"); + var setup = """ + from deephaven.column import string_col, int_col + where_filter = new_table([ + string_col("set1", ['1', '2', '3', '4', '5', '6', '7', '8']), + string_col("set2", ['10', '20', '30', '40', '50', '60', '70', '80']), + int_col("set3", [-1, -2, -3, -4, 1, 2, 3, 4]) + ]) + """; + runner.addSetupQuery(setup); + } + + @Test + @Disabled + void filter2Cols() { + runner.setIncReleaseRowCount(3382734); + setup(815, 815); + var q = "timed.where_in(where_filter, cols=['key1 = set1']).where(['inRange(num1, 0, 100)'])"; + runner.test("Filter- 2 Cols", 0, q, "key1", "key2", "num1"); + } + + @Test + void filter3Cols() { + runner.setIncReleaseRowCount(3468549); + setup(620, 620); + var q = """ + timed.where_in(where_filter, cols=['key1 = set1', 'key2 = set2', 'key3 = set3']) \ + .where(filters=["key1 = '1'", 'inRange(num1, 0, 100)', 'key3 in -2, -1, 0, 1, 2']) + """; + runner.test("Filter- 3 Cols", 0, q, "key1", "key2", "key3", "num1"); + } + +} diff --git a/src/it/java/io/deephaven/benchmark/tests/train/FormulaTrainTest.java b/src/it/java/io/deephaven/benchmark/tests/train/FormulaTrainTest.java new file mode 100644 index 00000000..c136c639 --- /dev/null +++ b/src/it/java/io/deephaven/benchmark/tests/train/FormulaTrainTest.java @@ -0,0 +1,51 @@ +/* Copyright (c) 2026-2026 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.tests.train; + +import org.junit.jupiter.api.*; + +/** + * Training tests for the formula table operations (e.g. udf, inline). See TrainTestRunner for more + * information. + */ +public class FormulaTrainTest { + final TrainTestRunner runner = new TrainTestRunner(this); + + void setup(double staticRowFactor, double incRowFactor) { + runner.tables(staticRowFactor, incRowFactor, "timed"); + } + + @Test + void formulaUdf() { + setup(9, 9); + var setup = """ + def f_py(num1: float, num2: float) -> float: + return (num2 + num1) / 2 + def f_np(num1: np.float64, num2: np.float64) -> np.float64: + return num1 + num2 + """; + runner.addSetupQuery(setup); + var q = "timed.view(['New1 = f_py(num1, num2)','New2 = f_np(num1, num2)']).sum_by()"; + runner.test("Formula- UDF 2 Calcs", 1, q, "num1", "num2"); + } + + @Test + void formulaInline() { + setup(467, 467); + var q = "timed.view(['New1 = (float)((num2 + num1) / 2)', 'New2 = (float)(num1 + num2)']).sum_by()"; + runner.test("Formula- Inline 2 Calcs", 1, q, "num1", "num2"); + } + + @Test + @Disabled + void formulaDate() { + setup(3, 3); + var q = """ + timed.view([ + 'New1 = parseDuration(`PT4H52M14S`).toHours()', + 'New1 = parseInstant(`2023-05-31T04:52:14.001 ET`).getEpochSecond()' + ]).sum_by() + """; + runner.test("Formula- Inline 2 Dates", 1, q, "num1", "num2"); + } + +} diff --git a/src/it/java/io/deephaven/benchmark/tests/train/NaturalJoinTrainTest.java b/src/it/java/io/deephaven/benchmark/tests/train/NaturalJoinTrainTest.java new file mode 100644 index 00000000..eb2559f4 --- /dev/null +++ b/src/it/java/io/deephaven/benchmark/tests/train/NaturalJoinTrainTest.java @@ -0,0 +1,36 @@ +/* Copyright (c) 2026-2026 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.tests.train; + +import org.junit.jupiter.api.*; + +/** + * Training tests for the aggBy table operations that do joins (e.g. natural join). See TrainTestRunner for + * more information. + */ +public class NaturalJoinTrainTest { + final TrainTestRunner runner = new TrainTestRunner(this); + + void setup(double staticRowFactor, double incRowFactor) { + runner.tables(staticRowFactor, incRowFactor, "timed", "right"); + } + + @Test + @Disabled + void naturalJoinOn1Col() { + runner.setIncReleaseRowCount(448939); + setup(230, 120); + var r = "right = right.select_distinct(['r_wild'])"; + runner.addSetupQuery(r); + var q = "timed.natural_join(right, on=['key1 = r_wild'])"; + runner.test("NaturalJoin- Join On 1 Col", 0, q, "key1", "num1"); + } + + @Test + void naturalJoinOn3Cols() { + runner.setIncReleaseRowCount(52043); + setup(100, 20); + var q = "timed.natural_join(right, on=['key1 = r_wild', 'key2 = r_key2', 'key1 = r_key1'])"; + runner.test("NaturalJoin- Join On 3 Cols", 0, q, "key1", "key2", "num1"); + } + +} diff --git a/src/it/java/io/deephaven/benchmark/tests/train/OrderedTrainTest.java b/src/it/java/io/deephaven/benchmark/tests/train/OrderedTrainTest.java new file mode 100644 index 00000000..961f1c5d --- /dev/null +++ b/src/it/java/io/deephaven/benchmark/tests/train/OrderedTrainTest.java @@ -0,0 +1,43 @@ +/* Copyright (c) 2026-2026 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.tests.train; + +import org.junit.jupiter.api.*; + +/** + * Training tests for the aggBy table operations that do ordering (e.g.. median, percentile, sorted_first/last). See + * TrainTestRunner for more information. + * + */ +public class OrderedTrainTest { + final TrainTestRunner runner = new TrainTestRunner(this); + + void setup(double staticRowFactor, double incRowFactor) { + runner.tables(staticRowFactor, incRowFactor, "timed"); + + var setupStr = """ + from deephaven import agg + aggs = [ + agg.median('Median=num1'), agg.pct(0.50, ['Percentile=num1']), + agg.unique('Unique=num2'), agg.sorted_first('key4', ['num2']), + agg.sorted_last('key3', ['num1']) + ] + """; + runner.addSetupQuery(setupStr); + } + + @Test + @Disabled + void ordered0Groups() { + setup(145, 18); + var q = "timed.agg_by(aggs)"; + runner.test("Ordered- No Groups", 100, q, "key3", "key4", "num1", "num2"); + } + + @Test + void ordered2Groups() { + setup(30, 5.7); + var q = "timed.agg_by(aggs, by=['key1', 'key2'])"; + runner.test("Ordered- 2 Groups 10K Unique Combos", 10100, q, "key1", "key2", "key3", "key4", "num1", "num2"); + } + +} diff --git a/src/it/java/io/deephaven/benchmark/tests/train/TrainTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/train/TrainTestRunner.java new file mode 100644 index 00000000..08bc4fe4 --- /dev/null +++ b/src/it/java/io/deephaven/benchmark/tests/train/TrainTestRunner.java @@ -0,0 +1,248 @@ +/* Copyright (c) 2026-2026 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.tests.train; + +import java.util.*; +import io.deephaven.benchmark.tests.standard.StandardTestRunner; + +/** + * A wrapper for the Bench api that allows the running of operational training (think AOT) tests without requiring the + * boilerplate logic like imports, parquet reads, time measurement logic, etc. Each test runs two + * benchmarks; one reading from a static parquet, and the other exercising ticking tables through the + * AutotuningIncrementalReleaseFilter. This is different from the StandardTestRunner in that + * it runs more than one operation per benchmark and attempts to cover the majority of the query code base with fewer + * benchmarks. It is meant for training AOT and for "representative" benchmarks used to compare things like JDK/Python + * versions and GC types. + */ +final public class TrainTestRunner { + static final int maxRowFactor = 620; + static final float incCycleFactor = 1.0f; + static final double incReleaseFactor = 1.0f; + final Object testInst; + final List setupQueries = new ArrayList<>(); + final List teardownQueries = new ArrayList<>(); + private double staticRowFactor = 1; + private double incRowFactor = 1; + private long incReleaseRowCount = 0; + private String[] tableNames = null; + + TrainTestRunner(Object testInst) { + this.testInst = testInst; + } + + public void tables(double staticRowFactor, double incRowFactor, String... names) { + if (Math.max(staticRowFactor, incRowFactor) > maxRowFactor) + throw new IllegalArgumentException("Row factors cannot be greater than " + maxRowFactor); + this.staticRowFactor = 0; // staticRowFactor; + this.incRowFactor = incRowFactor; + tableNames = names; + } + + public void addSetupQuery(String query) { + setupQueries.add(query); + } + + public void setIncReleaseRowCount(long incReleaseRowCount) { + this.incReleaseRowCount = incReleaseRowCount; + } + + public void test(String name, long maxExpectedRowCount, String operation, String... loadColumns) { + if (staticRowFactor <= 0 && incRowFactor <= 0) + throw new IllegalStateException("At least one of staticRowFactor or incRowFactor must be > 0"); + + setupQueries.add(startJfrQuery); + teardownQueries.add(stopJfrQuery); + +// if (staticRowFactor > 0) +// test(name, maxExpectedRowCount, operation, staticRowFactor, true, loadColumns); + + if (incRowFactor > 0) { + setupQueries.add(startUgpQuery); + teardownQueries.add(stopUgpQuery); + operation += "\ntrain_ugp_listener = listen(result, train_ugp_update)"; + test(name, maxExpectedRowCount, operation, incRowFactor, false, loadColumns); + } + } + + void test(String name, long maxExpectedRowCount, String operation, double rowFactor, boolean isStatic, + String... loadColumns) { + var delegate = new StandardTestRunner(testInst); + var baseRowCount = delegate.getGeneratedRowCount(); + delegate.useCachedSource(false); + delegate.useLocalParquet(true); + delegate.setRowFactor(maxRowFactor); + delegate.tables(tableNames); + delegate.setScaleFactors(isStatic ? 1 : 0, isStatic ? 0 : 1); + delegate.setIncReleaseFilter(incCycleFactor, (long) (incReleaseRowCount * incReleaseFactor)); + + var headQuery = """ + ${mainTable} = ${mainTable}.head(${trainRowCount}) + loaded_tbl_size = ${mainTable}.size + """.replace("${trainRowCount}", String.valueOf((long) (baseRowCount * rowFactor * incReleaseFactor))); + + delegate.addSetupQuery(headQuery); + setupQueries.forEach(delegate::addSetupQuery); + teardownQueries.forEach(delegate::addTeardownQuery); + delegate.test(name, maxExpectedRowCount, operation, loadColumns); + } + + static final String startJfrQuery = """ + import jpy + Recording = jpy.get_type("jdk.jfr.Recording") + rec = Recording() + rec.setName("benchmark") + + enabled_events=['jdk.ExecutionSample','jdk.NativeMethodSample','jdk.ThreadCPULoad','jdk.GarbageCollection', + 'jdk.GCPhasePause','jdk.SafepointBegin','jdk.SafepointEnd','jdk.SafepointState', + 'jdk.ObjectAllocationInNewTLAB','jdk.ObjectAllocationOutsideTLAB'] + for n in enabled_events: + try: + rec.enable(n) + except Exception: + print(f"Event Not Enabled: {n}") + + disabled_events=['jdk.GCPhaseConcurrent','jdk.GCPhaseConcurrentMark','jdk.GCPhaseConcurrentEvacuation', + 'jdk.G1GarbageCollection','jdk.ShenandoahGarbageCollection','jdk.ZGarbageCollection','jdk.GCHeapSummary', + 'jdk.GCReferenceStatistics','jdk.GCWorkerData','jdk.GCCPUTime','jdk.GCPhasePause'] + for n in disabled_events: + try: + rec.disable(n) + except Exception: + print(f"Event Not Disabled: {n}") + + rec.start() + """; + + static final String stopJfrQuery = """ + Paths = jpy.get_type("java.nio.file.Paths") + RecordingFile = jpy.get_type("jdk.jfr.consumer.RecordingFile") + + rec.dump(Paths.get("/data/benchmark.jfr")) + rec.stop() + rec.close() + + events = RecordingFile.readAllEvents(Paths.get("/data/benchmark.jfr")) + jfr_rows = [] + + def getEventValue(ev, field): + try: + return ev.getValue(field) + except Exception: + return None + + def getNanoValue(ev, duration_field): + val = ev.getValue(duration_field) + if val is None or str(val) == "null": return 0 + if isinstance(val, int): return val + if hasattr(val, "size") and hasattr(val, "get"): + total = 0 + for i in range(val.size()): + d = val.get(i) + if d is not None and str(d) != "null": total += d.toNanos() + return total + if hasattr(val, "toNanos"): return val.toNanos() + raise TypeError(f"Unsupported JFR value type: {type(val)}") + + for i in range(events.size()): + e = events.get(i) + etype = e.getEventType().getName() + start = e.getStartTime().getEpochSecond() * 1000000000 + e.getStartTime().getNano() + + if etype == 'jdk.GarbageCollection': + duration = getNanoValue(e, 'duration') + name = 'sumOfPauses' + value = getNanoValue(e, 'sumOfPauses') + elif etype == 'jdk.GCPhasePause': + duration = getNanoValue(e, 'duration') + name = getEventValue(e, 'name') + value = duration + else: + continue + + jfr_rows.append([etype, start, duration, name, value]) + + if len(jfr_rows) > 0: + jfr_gc = new_table([ + string_col("origin", ["deephaven-engine" for r in jfr_rows]), + string_col("type", [r[0] for r in jfr_rows]), + long_col("start_ns", [r[1] for r in jfr_rows]), + long_col("duration_ns", [r[2] for r in jfr_rows]), + string_col("name", [r[3] for r in jfr_rows]), + double_col("value", [r[4] for r in jfr_rows]), + ]) + standard_events = merge([standard_events, jfr_gc]) + """; + + static final String startUgpQuery = """ + from deephaven import time_table, perfmon + from deephaven.table_listener import listen + import time + + ss_log = perfmon.server_state_log() + if 'train_ugp_listener' in globals(): train_ugp_listener.stop() + train_wall_epoch_ns = time.time_ns() + train_ugp_times = [(time.perf_counter_ns(), 0, 0)] + + def train_ugp_update(update, is_replay): + ugp_cycle_cost = 0 + if autotune: + ug = update.table.update_graph.j_update_graph + ugp_cycle_cost = System.nanoTime() - ug.cycleStartNanoTime() + train_ugp_times.append((time.perf_counter_ns(), ${mainTable}.size, ugp_cycle_cost)) + """; + + static final String stopUgpQuery = """ + if 'train_ugp_listener' in globals(): train_ugp_listener.stop() + if len(train_ugp_times) > 1: + mono_start = train_ugp_times[0][0] + ugp_rows = [] + for i in range(1, len(train_ugp_times)): + mono_prev = train_ugp_times[i - 1][0] + mono_curr = train_ugp_times[i][0] + size_prev = train_ugp_times[i - 1][1] + size_curr = train_ugp_times[i][1] + ugp_cycle_cost = train_ugp_times[i][2] + delta_ns = mono_curr - mono_prev + wall_clock_ns = train_wall_epoch_ns + (mono_curr - mono_start) + delta_rows = max(0, size_curr - size_prev) + ugp_cycle_cost = max(0, ugp_cycle_cost) + ugp_rows.append([wall_clock_ns, delta_ns, delta_rows, ugp_cycle_cost]) + + ugp_events = new_table([ + string_col("origin", ["deephaven-engine"] * len(ugp_rows)), + string_col("type", ["ugp.delta"] * len(ugp_rows)), + long_col("start_ns", [r[0] for r in ugp_rows]), + long_col("duration_ns", [r[1] for r in ugp_rows]), + string_col("name", ["duration_rows"] * len(ugp_rows)), + double_col("value", [float(r[2]) for r in ugp_rows]), + ]) + + ugp_cycle_events = new_table([ + string_col("origin", ["deephaven-engine"] * len(ugp_rows)), + string_col("type", ["ugp.cycle.cost"] * len(ugp_rows)), + long_col("start_ns", [r[0] for r in ugp_rows]), + long_col("duration_ns", [r[3] for r in ugp_rows]), + string_col("name", ["duration_rows"] * len(ugp_rows)), + double_col("value", [float(r[2]) for r in ugp_rows]), + ]) + + standard_events = merge([standard_events, ugp_events, ugp_cycle_events]) + + ss_log = perfmon.server_state_log().ungroup(["IntervalUGPCyclesTimeMicros"]).snapshot() + if ss_log.size > 0: + ss_rows = [] + for row in ss_log.iter_dict(): + start = row['IntervalStartTime'].getEpochSecond() * 1000000000 + row['IntervalStartTime'].getNano() + ss_rows.append((start, row['IntervalCollectionTimeMicros'] * 1000, + row['IntervalUGPCyclesTimeMicros'] * 1000)) + + ss_events = new_table([ + string_col("origin", ["deephaven-engine"] * len(ss_rows)), + string_col("type", ["server_state_log"] * len(ss_rows)), + long_col("start_ns", [r[0] for r in ss_rows]), + long_col("duration_ns", [r[1] for r in ss_rows]), + string_col("name", ["ugp.cycle.time"] * len(ss_rows)), + double_col("value", [r[2] for r in ss_rows]), + ]) + standard_events = merge([standard_events, ss_events]) + """; +} diff --git a/src/it/java/io/deephaven/benchmark/tests/train/UpdateByTrainTest.java b/src/it/java/io/deephaven/benchmark/tests/train/UpdateByTrainTest.java new file mode 100644 index 00000000..140f10f6 --- /dev/null +++ b/src/it/java/io/deephaven/benchmark/tests/train/UpdateByTrainTest.java @@ -0,0 +1,54 @@ +/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.tests.train; + +import org.junit.jupiter.api.*; + +/** + * Standard tests for the updateBy table operation. Combines a mixture of rolling operations and cumulative operations + */ +public class UpdateByTrainTest { + final TrainTestRunner runner = new TrainTestRunner(this); + final String noGroups = """ + avg_contains = rolling_avg_time(ts_col='timestamp',cols=['A=num1','B=num2'],rev_time='PT5S',fwd_time='PT5S') + max_before = rolling_max_tick(cols=['C=num1','D=num2'], rev_ticks=3000,fwd_ticks=-1000) + prod_after = rolling_prod_time(ts_col='timestamp',cols=['E=num1','F=num2'],rev_time='-PT1S',fwd_time='PT4S') + """; + final String group10K = """ + avg_contains = rolling_avg_time(ts_col='timestamp',cols=['A=num1','B=num2'],rev_time='PT4M',fwd_time='PT5M') + max_before = rolling_max_tick(cols=['C=num1','D=num2'], rev_ticks=30,fwd_ticks=-10) + prod_after = rolling_prod_time(ts_col='timestamp',cols=['E=num1','F=num2'],rev_time='-PT1M',fwd_time='PT4M') + """; + + void setup(double staticRowFactor, double incRowFactor) { + runner.tables(staticRowFactor, incRowFactor, "timed"); + var setup = """ + from deephaven.updateby import rolling_avg_time, rolling_max_tick, rolling_prod_time + from deephaven.updateby import ema_tick, cum_min, cum_sum + + ema_tick_op = ema_tick(decay_ticks=10000,cols=['G=num1','H=num2']) + min_op = cum_min(cols=['I=num1','J=num2']) + sum_op = cum_sum(cols=['K=num1','L=num2']) + """; + runner.addSetupQuery(setup); + } + + @Test + @Disabled + void mixedComboNoGroups() { + setup(21.8, 17); + runner.addSetupQuery(noGroups); + var q = "timed.update_by(ops=[avg_contains, max_before, prod_after, ema_tick_op, min_op, sum_op])"; + runner.test("UpdateBy- No Groups 12 Cols", 0, q, "num1", "num2", "timestamp"); + } + + @Test + void rollingCombo2Groups() { + setup(9, 0.2); + runner.addSetupQuery(group10K); + var q = """ + timed.update_by(ops=[avg_contains,max_before,prod_after,ema_tick_op,min_op,sum_op], by=['key1','key2']) + """; + runner.test("UpdateBy- 2 Groups 10K Unique Combos", 0, q, "key1", "key2", "num1", "num2", "timestamp"); + } + +} diff --git a/src/main/java/io/deephaven/benchmark/api/Bench.java b/src/main/java/io/deephaven/benchmark/api/Bench.java index c4330dd9..1d3e5ab6 100644 --- a/src/main/java/io/deephaven/benchmark/api/Bench.java +++ b/src/main/java/io/deephaven/benchmark/api/Bench.java @@ -1,8 +1,7 @@ -/* Copyright (c) 2022-2025 Deephaven Data Labs and Patent Pending */ +/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.api; import java.io.Closeable; -import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; @@ -11,9 +10,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import io.deephaven.benchmark.metric.Metrics; -import io.deephaven.benchmark.util.Filer; -import io.deephaven.benchmark.util.Ids; -import io.deephaven.benchmark.util.Timer; +import io.deephaven.benchmark.util.*; /** * The root accessor class for the API. Use Bench.create(this) in a typical JUnit test to start things off @@ -27,6 +24,7 @@ final public class Bench { * The root benchmark result directory */ static final public Path rootOutputDir = Paths.get("results"); + /** * The name of the benchmark results csv file */ @@ -35,6 +33,10 @@ final public class Bench { * The name of the benchmark metrics csv file */ static final public String metricsFileName = "benchmark-metrics.csv"; + /** + * The name of the benchmark events csv file + */ + static final public String eventsFileName = "benchmark-events.csv"; /** * The name of the benchmark platform csv file */ @@ -61,9 +63,9 @@ static public Bench create(Object testInst) { return v; } - final Object testInst; final BenchResult result; final BenchMetrics metrics; + final BenchEvents events; final BenchPlatform platform; final QueryLog queryLog; final BenchLog runLog; @@ -72,13 +74,13 @@ static public Bench create(Object testInst) { final Session session = new Session(); private boolean isClosed = false; - Bench(Class testInst) { - this.testInst = testInst; + Bench(Class testClass) { this.result = new BenchResult(outputDir); this.metrics = new BenchMetrics(outputDir); + this.events = new BenchEvents(outputDir); this.platform = new BenchPlatform(this, outputDir); - this.queryLog = new QueryLog(outputDir, testInst); - this.runLog = new BenchLog(outputDir, testInst); + this.queryLog = new QueryLog(getLogDir(testClass), testClass); + this.runLog = new BenchLog(getLogDir(testClass), testClass); } /** @@ -91,6 +93,7 @@ public void setName(String name) { throw new RuntimeException("No blank Benchmark names allowed"); this.result.setName(name); this.metrics.setName(name); + this.events.setName(name); this.queryLog.setName(name); this.runLog.setName(name); } @@ -199,6 +202,15 @@ public BenchMetrics metrics() { return metrics; } + /** + * Get the events for this Benchmark instance (e.g. test) used for collecting event values + * + * @return the events instance + */ + public BenchEvents events() { + return events; + } + /** * Get the platform for this Benchmark instance (e.g. test) used for collecting platform properties * @@ -209,9 +221,9 @@ public BenchPlatform platform() { } /** - * Get the metrics for this Benchmark instance (e.g. test) used for collecting metric values + * Get the query log for this Benchmark instance (e.g. test) used for recording queries * - * @return the metrics instance + * @return the query log instance */ public BenchLog log() { return runLog; @@ -243,6 +255,7 @@ public void close() { closeables.clear(); result.commit(); metrics.commit(); + events.commit(); platform.commit(); runLog.close(); queryLog.close(); @@ -270,6 +283,12 @@ > T addFuture(T future) { return future; } + static private Path getLogDir(Class testClass) { + var pkgRoot = profile.property("root.test.package", Bench.class.getPackageName().replaceAll("[.][^.]+$", "")); + var name = testClass.getPackageName().replaceAll(pkgRoot + '.', "") + '.' + testClass.getSimpleName(); + return Filer.createDirectory(outputDir.resolve("test-logs").resolve(name).toString()); + } + static private Path initializeOutputDirectory() { setSystemProperties(); boolean isTimestamped = profile.propertyAsBoolean("timestamp.test.results", "false"); @@ -277,11 +296,7 @@ static private Path initializeOutputDirectory() { if (isTimestamped) dir = dir.resolve(Ids.runId()); Filer.delete(dir); - try { - return Files.createDirectories(dir); - } catch (Exception ex) { - throw new RuntimeException("Failed initialize benchmark result directory: " + dir, ex); - } + return Filer.createDirectory(dir.toString()); } static private void setSystemProperties() { diff --git a/src/main/java/io/deephaven/benchmark/api/BenchEvents.java b/src/main/java/io/deephaven/benchmark/api/BenchEvents.java new file mode 100644 index 00000000..1ecef26d --- /dev/null +++ b/src/main/java/io/deephaven/benchmark/api/BenchEvents.java @@ -0,0 +1,85 @@ +/* Copyright (c) 2026-2026 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.api; + +import java.io.BufferedWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.*; +import io.deephaven.benchmark.connect.ResultTable; + +/** + * Represents the events gathered during usage of the Bench API. These can include events gather by the API or the user. + */ +final public class BenchEvents { + static final String header = "benchmark_name,origin,type,start,duration,name,value"; + final List events = new ArrayList<>(); + final Path file; + private String name = null; + + BenchEvents(Path parent) { + this(parent, Bench.eventsFileName); + } + + BenchEvents(Path parent, String resultFileName) { + this.file = parent.resolve(resultFileName); + } + + /** + * Add the results from a table as events to persist to the file system. This table must have columns defined as + * origin, type, start_ns, duration_ns, detail + * + * @param table a table containing events + * @return this instance + */ + public BenchEvents add(ResultTable table) { + for (int r = 0, rn = table.getRowCount(); r < rn; r++) { + var origin = table.getValue(r, "origin").toString(); + var type = table.getValue(r, "type").toString(); + var startNanos = table.getNumber(r, "start_ns").longValue(); + var durationNanos = table.getNumber(r, "duration_ns").longValue(); + var name = String.valueOf(table.getValue(r, "name")); + var value = table.getNumber(r, "value").doubleValue(); + var event = new Event(origin, type, startNanos, durationNanos, name, value); + events.add(event); + } + return this; + } + + /** + * Save the collected events to a csv file. + */ + public void commit() { + if (!hasHeader()) + writeLine(header, file); + + for (Event event : events) { + var line = name + ',' + event.toCsv(); + writeLine(line, file); + } + } + + void setName(String name) { + this.name = name; + } + + private boolean hasHeader() { + return Files.exists(file); + } + + static void writeLine(String line, Path file) { + try (BufferedWriter out = Files.newBufferedWriter(file, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) { + out.write(line); + out.newLine(); + } catch (Exception ex) { + throw new RuntimeException("Failed to write result to file: " + file, ex); + } + } + + record Event(String origin, String type, long startNanos, long durationNanos, String name, double value) { + String toCsv() { + return origin + "," + type + "," + startNanos + "," + durationNanos + "," + name + "," + value; + } + } + +} diff --git a/src/main/java/io/deephaven/benchmark/api/BenchLog.java b/src/main/java/io/deephaven/benchmark/api/BenchLog.java index 3f4de79b..68416595 100644 --- a/src/main/java/io/deephaven/benchmark/api/BenchLog.java +++ b/src/main/java/io/deephaven/benchmark/api/BenchLog.java @@ -26,7 +26,7 @@ final public class BenchLog { BenchLog(Path parent, Class testClass) { this.testClass = testClass; this.parent = parent; - this.logFile = getLogFile(parent, testClass); + this.logFile = parent.resolve("engine.log"); } /** diff --git a/src/main/java/io/deephaven/benchmark/api/BenchTable.java b/src/main/java/io/deephaven/benchmark/api/BenchTable.java index b00c00e3..50198c42 100644 --- a/src/main/java/io/deephaven/benchmark/api/BenchTable.java +++ b/src/main/java/io/deephaven/benchmark/api/BenchTable.java @@ -1,17 +1,17 @@ -/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */ +/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.api; import java.io.Closeable; import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import io.deephaven.benchmark.generator.*; import io.deephaven.benchmark.metric.Metrics; -import io.deephaven.benchmark.util.Ids; -import io.deephaven.benchmark.util.Log; -import io.deephaven.benchmark.util.Numbers; -import io.deephaven.benchmark.util.Timer; +import io.deephaven.benchmark.util.*; /** * Represents the configuration of table name and columns. @@ -24,7 +24,7 @@ final public class BenchTable implements Closeable { private int durationSecs = -1; private int rowPauseMillis = -1; private String compression = null; - private Generator generator = null; + private List generators = new ArrayList<>(); private boolean isFixed = false; private String defaultDistro = null; private String[] columnGrouping = null; @@ -188,7 +188,7 @@ public boolean generateParquet() { }).execute(); if (usedExistingParquet.get()) { - Log.info("Using existing table '%s' with %s rows", tableName, getRowCount()); + Log.info("\nUsing existing table '%s' with %s rows", tableName, getRowCount()); return false; } Log.info("Generating table '%s' with %s rows", tableName, getRowCount()); @@ -197,14 +197,71 @@ public boolean generateParquet() { if (rowPauseMillis < 0) withRowPause(0, ChronoUnit.MILLIS); - bench.awaitCompletion(generateWithAvro()); - Log.info("Produce Data Duration: " + timer.duration().toMillis()); + var m = bench.awaitCompletion(generateWithAvro()); + Log.info("Produce Send Rate: %.2f recs/sec", m.getValue("send.rate")); + Log.info("Produce Data Duration: %d secs", timer.duration().toSeconds()); timer = Timer.start(); q = replaceTableAndGeneratorFields(kafkaToParquetQuery); bench.query(q).execute(); - Log.info("DH Write Table Duration: " + timer.duration().toMillis()); + Log.info("DH Write Table Duration: %d secs", timer.duration().toSeconds()); + return true; + } + + /** + * Generate the table synchronously to a parquet file in the engine's data directory. If a parquet file already + * exists in the Deephaven data directory that matches this table definition, use it and skip generation. + *

+ * Note: This is the same as generateParquet() except it generates the parquet file directly to the + * engine's data directory without going through kafka. As such, it will not work when the test runner and the + * engine are not co-located. + * + * @return true if file was generated, otherwise false + */ + public boolean generateLocalParquet() { + columns.setDefaultDistribution(getDefaultDistro()); + var q = replaceTableAndGeneratorFields(useExistingParquetQuery); + + var usedExistingParquet = new AtomicBoolean(false); + var tableGenParquet = new AtomicReference(""); + var dhHostOsDir = new AtomicReference(""); + bench.query(q).fetchAfter("used_existing_parquet_" + tableName, table -> { + usedExistingParquet.set(table.getValue(0, "UsedExistingParquet").toString().equalsIgnoreCase("true")); + tableGenParquet.set(table.getValue(0, "TableGenParquet").toString()); + dhHostOsDir.set(table.getValue(0, "DhHostOsDir").toString()); + }).execute(); + + if (usedExistingParquet.get()) { + Log.info("Using existing table '%s' with %s rows", tableName, getRowCount()); + return false; + } + Log.info("Generating table '%s' with %s rows", tableName, getRowCount()); + var timer = Timer.start(); + + if (rowPauseMillis < 0) + withRowPause(0, ChronoUnit.MILLIS); + + if (dhHostOsDir.get().isEmpty()) + throw new RuntimeException("DEEPHAVEN_HOST_OS_DIR env must be set to use local parquet generation"); + + var parquetPath = (dhHostOsDir.get() + "/" + tableGenParquet.get()).replace(".parquet", ".dataset"); + var threadCount = 8; + var rowsPerThread = getRowCount() / threadCount; + var futures = new ArrayList>(threadCount); + for (int i = 0; i < threadCount; i++) { + long rows = (i < threadCount - 1) ? rowsPerThread : (getRowCount() - (rowsPerThread * i)); + var future = generateWithLocalParquet(parquetPath, String.format("%04d.parquet", i), i, rows); + futures.add(future); + } + futures.stream().forEach(future -> bench.awaitCompletion(future)); + close(); // Needed for the final parquet flushes + + bench.query(localToParquetQuery).execute(); + var durMillis = timer.duration().toMillis(); + Log.info("Produce Send Rate: %.2f recs/sec", getRowCount() / (durMillis / 1000.0)); + Log.info("Produce Data Duration: %.2f secs", durMillis / 1000.0); + Log.info("Produce Write Rate: %.2f MB/sec", Filer.getByteSize(parquetPath) * 1000.0 / durMillis / 1024 / 1024); return true; } @@ -212,29 +269,41 @@ public boolean generateParquet() { * Shutdown and cleanup any running generator */ public void close() { - if (generator != null) + for (Generator generator : generators) generator.close(); + generators.clear(); } private Future generateWithAvro() { String bootstrapServer = bench.property("client.redpanda.addr", "localhost:9092"); String schemaRegistry = "http://" + bench.property("client.schema.registry.addr", "localhost:8081"); - generator = new AvroKafkaGenerator(bootstrapServer, schemaRegistry, tableName, columns, getCompression()); - return generator.produce(getRowPause(), getRowCount(), getRunDuration()); + var gen = new AvroKafkaGenerator(bootstrapServer, schemaRegistry, tableName, columns, getCompression()); + generators.add(gen); + return gen.produce(getRowPause(), getRowCount(), getRunDuration()); } private Future generateWithJson() { String bootstrapServer = bench.property("client.redpanda.addr", "localhost:9092"); String schemaRegistry = "http://" + bench.property("client.schema.registry.addr", "localhost:8081"); - generator = new JsonKafkaGenerator(bootstrapServer, schemaRegistry, tableName, columns, getCompression()); - return generator.produce(getRowPause(), getRowCount(), getRunDuration()); + var gen = new JsonKafkaGenerator(bootstrapServer, schemaRegistry, tableName, columns, getCompression()); + generators.add(gen); + return gen.produce(getRowPause(), getRowCount(), getRunDuration()); } private Future generateWithProtobuf() { String bootstrapServer = bench.property("client.redpanda.addr", "localhost:9092"); String schemaRegistry = "http://" + bench.property("client.schema.registry.addr", "localhost:8081"); - generator = new ProtobufKafkaGenerator(bootstrapServer, schemaRegistry, tableName, columns, getCompression()); - return generator.produce(getRowPause(), getRowCount(), getRunDuration()); + var gen = new ProtobufKafkaGenerator(bootstrapServer, schemaRegistry, tableName, columns, getCompression()); + generators.add(gen); + return gen.produce(getRowPause(), getRowCount(), getRunDuration()); + } + + private Future generateWithLocalParquet(String parquetPath, String parquetPart, long startRow, + long rowCount) { + var parquetFile = Filer.createFile(parquetPath, parquetPart).toString(); + var gen = new LocalParquetGenerator(parquetFile, tableName, columns.copy(), startRow); + generators.add(gen); + return gen.produce(getRowPause(), rowCount, getRunDuration()); } private int getRowPause() { @@ -342,11 +411,18 @@ with open(path) as f: usedExisting = False matching_gen_parquet = findMatchingGenParquet(table_gen_def_text) - if matching_gen_parquet is not None and os.path.exists(str(matching_gen_parquet) + '.gen.parquet'): - os.link(str(matching_gen_parquet) + '.gen.parquet', table_parquet) + if matching_gen_parquet and os.path.exists(f"{matching_gen_parquet}.gen.parquet"): + bench_api_link(str(matching_gen_parquet) + '.gen.parquet', table_parquet) + usedExisting = True + if matching_gen_parquet and os.path.exists(f"{matching_gen_parquet}.gen.dataset"): + bench_api_link(str(matching_gen_parquet) + '.gen.dataset', table_parquet) usedExisting = True - used_existing_parquet_${table.name} = new_table([string_col("UsedExistingParquet", [str(usedExisting)])]) + used_existing_parquet_${table.name} = new_table([ + string_col("UsedExistingParquet", [str(usedExisting)]), + string_col("TableGenParquet", [table_gen_parquet]), + string_col("DhHostOsDir", [os.getenv("DEEPHAVEN_HOST_OS_DIR","")]) + ]) """; static final String kafkaToParquetQuery = """ @@ -379,7 +455,7 @@ with open(table_gen_def_file, 'w') as f: column_grouping=${column.grouping} if column_grouping: ${table.name} = ${table.name}.sort([${column.grouping}]) write(${table.name}, table_gen_parquet ${compression.codec} ${max.dict.keys} ${max.dict.bytes} ${target.page.bytes}) - os.link(table_gen_parquet, table_parquet) + bench_api_link(table_gen_parquet, table_parquet) del ${table.name} @@ -387,4 +463,20 @@ with open(table_gen_def_file, 'w') as f: garbage_collect() """; + static final String localToParquetQuery = """ + # Link an already created parquet dataset directory + import jpy, os + + if os.path.exists(table_parquet): + os.remove(table_parquet) + + with open(table_gen_def_file, 'w') as f: + f.write(table_gen_def_text) + + bench_api_link(table_gen_parquet.replace(".parquet", ".dataset"), table_parquet) + + from deephaven import garbage_collect + garbage_collect() + """; + } diff --git a/src/main/java/io/deephaven/benchmark/api/QueryLog.java b/src/main/java/io/deephaven/benchmark/api/QueryLog.java index b14b615e..bf307893 100644 --- a/src/main/java/io/deephaven/benchmark/api/QueryLog.java +++ b/src/main/java/io/deephaven/benchmark/api/QueryLog.java @@ -1,4 +1,4 @@ -/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */ +/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.api; import static java.nio.file.StandardOpenOption.*; @@ -30,7 +30,7 @@ class QueryLog implements Closeable { QueryLog(Path parent, Class testClass) { this.testClass = testClass; this.parent = parent; - this.logFile = getLogFile(parent, testClass); + this.logFile = parent.resolve("query.md"); } /** @@ -51,9 +51,9 @@ public void close() { write("## " + label + " - " + name, 2); for (int i = 0, n = queries.size(); i < n; i++) { write("### Query " + (i + 1), 1); - write("````", 1); + write("```", 1); write(queries.get(i), 0); - write("````", 2); + write("```", 2); } } @@ -93,14 +93,4 @@ private void write(String text, int newLineCount) { } } - static Path getLogFile(Path parent, Class testClass) { - Path logFile = parent.resolve("test-logs/" + testClass.getName() + ".query.md"); - try { - Files.createDirectories(logFile.getParent()); - return logFile; - } catch (Exception ex) { - throw new RuntimeException("Failed to create query log directory" + logFile.getParent(), ex); - } - } - } diff --git a/src/main/java/io/deephaven/benchmark/api/Snippets.java b/src/main/java/io/deephaven/benchmark/api/Snippets.java index 07ad6f5e..0008f0d3 100644 --- a/src/main/java/io/deephaven/benchmark/api/Snippets.java +++ b/src/main/java/io/deephaven/benchmark/api/Snippets.java @@ -1,4 +1,4 @@ -/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */ +/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.api; /** @@ -75,14 +75,26 @@ with exclusive_lock(table): /** * Initialize the container for storing benchmark metrics. Define functions for getting some MX Bean data for gc, - * jit and heap + * jit and heap. *

* ex. bench_api_metrics_init() */ static String bench_api_metrics_init = """ + import jpy + from deephaven import new_table + from deephaven.column import string_col, long_col, double_col + System = jpy.get_type('java.lang.System') def bench_api_metrics_init(): - global bench_api_metrics + global bench_api_metrics, standard_events bench_api_metrics = [] + standard_events = new_table([ + string_col("origin", []), + string_col("type", []), + long_col("start_ns", []), + long_col("duration_ns", []), + string_col("name", []), + double_col("value", []), + ]) """; /** @@ -175,7 +187,8 @@ def bench_api_metrics_add(category, name, value, note=''): * ex. bench_api_metrics_table = bench_api_metrics_collect() */ static String bench_api_metrics_collect = """ - from deephaven import input_table, empty_table, dtypes as dht + from deephaven import input_table, empty_table, new_table, dtypes as dht + from deephaven.column import string_col, long_col, double_col def bench_api_metrics_collect(): s = dht.string t = input_table({'timestamp':s,'origin':s,'category':s,'name':s,'value':s,'note':s}) @@ -186,6 +199,47 @@ def bench_api_metrics_collect(): return t """; + /** + * Make a file containing a one line reference to another file. Note: This is to get around the fact that + * Deephaven's parquet can't read from symbolic links that are directories. + *

+ * ex. bench_api_link('my_parquet_dir_or_file', 'my_link_name') + * + * @param target the table to link + * @param link_name the name to link the table to for retrieval + */ + static String bench_api_link = """ + import os, glob + def bench_api_link(target, link_name): + for f in glob.glob(link_name + '*'): + os.remove(f) + if target.endswith('.dataset'): + with open(link_name + '.link', 'w') as f: + f.write(target) + else: + os.link(target, link_name) + """; + + /** + * Read a parquet file or dataset into a Deephaven table. If the filename is a link (e.g. ".link") grab the file + * reference within it. + *

+ * ex. source = bench_api_read('/data/timed.parquet') + * + * @param file_name the name of the file containing the link reference + * @return a table containing the contents of the linked parquet file or dataset + */ + static String bench_api_read = """ + import os + from deephaven.parquet import read + def bench_api_read(file_name): + link_path = file_name + '.link' + if os.path.exists(link_path): + with open(link_path, 'r') as f: + file_name = f.read().strip() + return read(file_name) + """; + /** * Returns a query containing the api functions called by the query * @@ -206,6 +260,8 @@ static String getFunctions(String query) { defs += getFunc("bench_api_metrics_add", bench_api_metrics_add, query, defs); defs += getFunc("bench_api_metrics_collect", bench_api_metrics_collect, query, defs); defs += getFunc("bench_api_await_column_value_limit", bench_api_await_column_value_limit, query, defs); + defs += getFunc("bench_api_link", bench_api_link, query, defs); + defs += getFunc("bench_api_read", bench_api_read, query, defs); return defs; } diff --git a/src/main/java/io/deephaven/benchmark/connect/BarrageConnector.java b/src/main/java/io/deephaven/benchmark/connect/BarrageConnector.java index 632811ca..85cc3c3f 100644 --- a/src/main/java/io/deephaven/benchmark/connect/BarrageConnector.java +++ b/src/main/java/io/deephaven/benchmark/connect/BarrageConnector.java @@ -35,7 +35,8 @@ class BarrageConnector implements Connector { static { System.setProperty("thread.initialization", ""); // Remove server side initializers (e.g. DebuggingInitializer) } - static final int maxFetchCount = 1000; + static final int maxFetchCount = 100000; + static final int inboundMessageMB = 64; final private BarrageSession session; final private ConsoleSession console; final private ManagedChannel channel; @@ -243,6 +244,9 @@ private ManagedChannel getManagedChannel(String host, int port) { final ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress(host, port); channelBuilder.usePlaintext(); // channelBuilder.useTransportSecurity(); If eventually security is needed + // Increase the maximum inbound message size so large Barrage snapshots (e.g. standard_events) + // do not trip the default 4 MiB gRPC limit while prototyping benchmarks. + channelBuilder.maxInboundMessageSize(inboundMessageMB * 1024 * 1024); // 32 MiB return channelBuilder.build(); } diff --git a/src/main/java/io/deephaven/benchmark/generator/AvroKafkaGenerator.java b/src/main/java/io/deephaven/benchmark/generator/AvroKafkaGenerator.java index db662a88..e2d741e4 100644 --- a/src/main/java/io/deephaven/benchmark/generator/AvroKafkaGenerator.java +++ b/src/main/java/io/deephaven/benchmark/generator/AvroKafkaGenerator.java @@ -128,11 +128,13 @@ private Producer createProducer(String bootstrapServer, S props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); props.put("schema.registry.url", schemaRegistryUrl); - props.put(ACKS_CONFIG, "0"); + props.put(ENABLE_IDEMPOTENCE_CONFIG, "true"); + props.put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); + props.put(ACKS_CONFIG, "all"); props.put(COMPRESSION_TYPE_CONFIG, getCompression(compression)); - props.put(BATCH_SIZE_CONFIG, 16384); - props.put(BUFFER_MEMORY_CONFIG, 16384 * 4); - props.put(LINGER_MS_CONFIG, 50); + props.put(BATCH_SIZE_CONFIG, "512000"); + props.put(BUFFER_MEMORY_CONFIG, "67108864"); + props.put(LINGER_MS_CONFIG, 10); return new KafkaProducer<>(props); } diff --git a/src/main/java/io/deephaven/benchmark/generator/ColumnDefs.java b/src/main/java/io/deephaven/benchmark/generator/ColumnDefs.java index a2211eb4..8d57292f 100644 --- a/src/main/java/io/deephaven/benchmark/generator/ColumnDefs.java +++ b/src/main/java/io/deephaven/benchmark/generator/ColumnDefs.java @@ -1,4 +1,4 @@ -/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */ +/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.generator; import java.util.*; @@ -12,16 +12,16 @@ * Note: All possible data values are loaded up front to prevent object-creation during production. This can take a * considerable amount of memory for larger scales, especially for generated strings. */ -public class ColumnDefs { +final public class ColumnDefs { final int valueCacheSize; final List columns = new ArrayList<>(); private String defaultDistribution = "random"; /** - * Initialize the instance with a default cache size of 1024 + * Initialize the instance with a default cache size large enough to cover typical column value ranges */ public ColumnDefs() { - this(1024); + this(2_000_000); } ColumnDefs(int valueCacheSize) { @@ -105,6 +105,21 @@ public ColumnDefs add(String name, String type, String valueDef) { return add(name, type, valueDef, null); } + /** + * Create an independent copy of this column definitions instance. Each copy has its own Maker objects, value + * caches, and distribution functions, making it safe to use from a separate thread without contention. + * + * @return a new independent ColumnDefs with the same column definitions + */ + public ColumnDefs copy() { + var c = new ColumnDefs(valueCacheSize); + c.defaultDistribution = defaultDistribution; + for (ColumnDef col : columns) { + c.add(col.name(), col.type(), col.valueDef(), col.maker().distributionName); + } + return c; + } + /** * Get the next value for the column in the given index according to the columns defined distribution. * diff --git a/src/main/java/io/deephaven/benchmark/generator/LocalParquetGenerator.java b/src/main/java/io/deephaven/benchmark/generator/LocalParquetGenerator.java new file mode 100644 index 00000000..692e6fd8 --- /dev/null +++ b/src/main/java/io/deephaven/benchmark/generator/LocalParquetGenerator.java @@ -0,0 +1,162 @@ +/* Copyright (c) 2026-2026 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.generator; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import blue.strategic.parquet.*; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import io.deephaven.benchmark.metric.Metrics; +import io.deephaven.benchmark.util.Log; +import io.deephaven.benchmark.util.Threads; + +/** + * Generator that produces rows to a local Parquet file according to the provided column definitions. Note: This + * generator MUST generate the same row and column data in the same order and types as the non-local + * AvroKafkaGenerator when the two generators have the same column definitions. (The "same data" is defined + * by how it looks in Deephaven tables, not byte-for-byte in the files.) + */ +public class LocalParquetGenerator implements Generator { + final private ExecutorService queue = Threads.single("LocalParquetGenerator"); + final private ColumnDefs columnDefs; + final private String topic; + final private long startSeed; + final private MessageType schema; + final private File parquetFile; + final private AtomicBoolean isClosed = new AtomicBoolean(false); + private ParquetWriter writer; + + /** + * Create a local Parquet generator with the provided column definitions and output file. The column definitions + * determine the schema of the Parquet file and the data generated for each column. + * + * @param parquetFile output Parquet file path + * @param topic topic name (used for logging and schema generation) + * @param columnDefs column definitions that determine the schema and generated data + * @param startSeed starting seed for data generation + */ + public LocalParquetGenerator(String parquetFile, String topic, ColumnDefs columnDefs, long startSeed) { + this.topic = topic; + this.columnDefs = columnDefs; + this.startSeed = startSeed; + this.parquetFile = new File(parquetFile); + this.schema = MessageTypeParser.parseMessageType(getSchemaMessage(topic, columnDefs)); + try { + this.writer = ParquetWriter.writeFile(schema, this.parquetFile, createDehydrator()); + } catch (IOException ex) { + throw new RuntimeException("Failed to create Parquet writer for topic: " + topic, ex); + } + } + + /** + * Produce a maximum number of records asynchronously. + * + * @param perRecordPauseMillis wait time between each record sent + * @param maxRecordCount maximum records to produce + * @param maxDurationSecs maximum duration to produce + */ + public Future produce(int perRecordPauseMillis, long maxRecordCount, int maxDurationSecs) { + checkClosed(); + return queue.submit(() -> { + final long maxDuration = maxDurationSecs * 1000L; + final long beginTime = System.currentTimeMillis(); + final int colCount = columnDefs.getCount(); + + long recCount = startSeed; + long totalWritten = 0; + long duration = 0; + Object[] row = new Object[colCount]; + + while (!isClosed.get() && recCount < maxRecordCount) { + for (int i = 0; i < colCount; i++) { + row[i] = columnDefs.nextValue(i, recCount, maxRecordCount); + } + writer.write(row); + recCount++; + + if (++totalWritten % 10_000_000 == 0) + Log.info("Produced %s records to topic: %s", totalWritten, topic); + + duration = System.currentTimeMillis() - beginTime; + if (duration > maxDuration) + break; + } + + Log.info("Produced %s records to topic: %s", totalWritten, topic); + duration = System.currentTimeMillis() - beginTime; + return new Metrics("test-runner", "generate." + topic) + .set("duration.secs", duration / 1000.0) + .set("record.count", totalWritten) + .set("send.rate", totalWritten / (duration / 1000.0)); + }); + } + + /** + * Close the producer and shutdown any async threads created during production + */ + public void close() { + if (isClosed.get()) + return; + isClosed.set(true); + queue.shutdown(); + try { + writer.close(); + } catch (Exception ex) { + throw new RuntimeException("Failed to close Parquet writer for topic: " + topic, ex); + } + } + + private void checkClosed() { + if (isClosed.get()) + throw new RuntimeException("Generator is closed"); + } + + private Dehydrator createDehydrator() { + final String[] colNames = columnDefs.toTypeMap().keySet().toArray(new String[0]); + return (row, valueWriter) -> { + for (int i = 0; i < colNames.length; i++) { + valueWriter.write(colNames[i], row[i]); + } + }; + } + + private String getSchemaMessage(String topic, ColumnDefs fieldDefs) { + var schema = """ + message ${topic} { + ${fields} + } + """; + var fields = ""; + for (Map.Entry e : fieldDefs.toTypeMap().entrySet()) { + var name = e.getKey(); + var type = e.getValue(); + fields += String.format("required %s %s %s;\n", getFieldType(type), name, getCharEncoding(type)); + } + schema = schema.replace("${topic}", topic); + return schema.replace("${fields}", fields); + } + + private String getFieldType(String type) { + return switch (type) { + case "long" -> "int64"; + case "int" -> "int32"; + case "double" -> "double"; + case "float" -> "float"; + case "string" -> "binary"; + case "timestamp-millis" -> "int64"; + default -> throw new RuntimeException("Unsupported generator data type: " + type); + }; + } + + private String getCharEncoding(String type) { + return switch (type) { + case "string" -> "(UTF8)"; + case "timestamp-millis" -> "(TIMESTAMP(MILLIS,true))"; + default -> ""; + }; + } + +} diff --git a/src/main/java/io/deephaven/benchmark/util/Filer.java b/src/main/java/io/deephaven/benchmark/util/Filer.java index f14a85a9..30010640 100644 --- a/src/main/java/io/deephaven/benchmark/util/Filer.java +++ b/src/main/java/io/deephaven/benchmark/util/Filer.java @@ -1,4 +1,4 @@ -/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */ +/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.util; import static java.nio.file.StandardOpenOption.*; @@ -6,6 +6,8 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermissions; import java.util.Comparator; import java.util.stream.Collectors; @@ -34,6 +36,53 @@ static public void delete(Path path) { } } + /** + * Create a file with the given name in the given parent directory. Create the parent directory if it does not + * exist. Permissions are 755 for directories and 644 for files. + * + * @param parentDir the parent directory to contain the file + * @param fileName the name of the file to create + * @return the path of the created file + */ + static public Path createFile(String parentDir, String fileName) { + try { + return Files.createFile(createDirectory(parentDir).resolve(fileName), + PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--"))); + } catch (Exception ex) { + throw new RuntimeException("Failed to create file: " + fileName, ex); + } + } + + /** + * Create a directory with the given name. Create parent directories if they do not exist. Permissions are 755. + * + * @param dir the directory to create + * @return the path of the created directories + */ + static public Path createDirectory(String dir) { + try { + return Files.createDirectories(Paths.get(dir), PosixFilePermissions.asFileAttribute( + PosixFilePermissions.fromString("rwxr-xr-x"))); + } catch (Exception ex) { + throw new RuntimeException("Failed to create temp directory: " + dir, ex); + } + } + + /** + * Get the size of a file or directory in bytes. Directory sizes are calculated recursively by summing the sizes of + * all regular files contained within. + * + * @param file the file or directory to get the size of + * @return the size of the file or directory in bytes + */ + static public long getByteSize(String path) { + try { + return Files.walk(Paths.get(path)).filter(Files::isRegularFile).mapToLong(f -> f.toFile().length()).sum(); + } catch (Exception ex) { + throw new RuntimeException("Failed to get size of file: " + path, ex); + } + } + /** * Read the text of a file while preserving newlines and getting rid of carriage returns * diff --git a/src/main/resources/io/deephaven/benchmark/run/profile/queries/dashboards/benchmark_functions.dh.py b/src/main/resources/io/deephaven/benchmark/run/profile/queries/dashboards/benchmark_functions.dh.py index be362158..a68c6a54 100644 --- a/src/main/resources/io/deephaven/benchmark/run/profile/queries/dashboards/benchmark_functions.dh.py +++ b/src/main/resources/io/deephaven/benchmark/run/profile/queries/dashboards/benchmark_functions.dh.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending +# Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending # # Deephaven python functions to support Benchmark Dashboards. These functions produce basic tables, # format strings, and do calculations. The data for creating tables is downloaded and cached from @@ -6,11 +6,9 @@ # # Requirements: Deephaven 0.36.1 or greater -import os, re, glob, jpy -import deephaven.dtypes as dht +import os, re, jpy from deephaven import read_csv, merge, agg, empty_table, input_table, dtypes as dht from urllib.request import urlopen, urlretrieve -from numpy import typing as npt # Convert the given name to a name suitable for a DH column name def normalize_name(name): @@ -123,9 +121,12 @@ def convert_result(table): # Do any conversions of type or column name needed from benchmark-metrics.csv def convert_metric(table): - return table.view(['benchmark_name','origin','timestamp=(long)timestamp','name', - 'value=(double)value','note']) + return table.view(['benchmark_name','origin','timestamp=(long)timestamp','name','value=(double)value','note']) +# Do any conversions of type or column name needed from benchmark-events.csv +def convert_event(table): + return table.view(['benchmark_name','origin','type','start','duration','name','value=(double)value']) + # Do any conversions of type or column name needed from benchmark-platform.csv def convert_platform(table): return table.view(['origin','name','value']) @@ -171,6 +172,11 @@ def load_bench_results(storage_uri, category='adhoc', actor_filter=None, set_fil def load_bench_metrics(storage_uri, category='adhoc', actor_filter=None, set_filter=None): run_ids = get_run_paths(storage_uri, category, actor_filter, set_filter, 100) return merge_run_tables(storage_uri, run_ids, category, 'benchmark-metrics.csv', convert_metric) + +# Load all benchmark-events.csv data collected from the given storage, category, and filters +def load_bench_events(storage_uri, category='adhoc', actor_filter=None, set_filter=None): + run_ids = get_run_paths(storage_uri, category, actor_filter, set_filter, 100) + return merge_run_tables(storage_uri, run_ids, category, 'benchmark-events.csv', convert_event) # Load all benchmark-platform.csv data collected from the given storage, category, and filters def load_bench_platform(storage_uri, category='adhoc', actor_filter=None, set_filter=None): @@ -201,7 +207,9 @@ def load_table_or_empty(table_name, storage_uri, category='adhoc', actor_filter= return globals()[f'empty_bench_{table_name}']() # Add columns for the specified platform properties -def add_platform_values(table, pnames=[], cnames = []): +def add_platform_values(table, pnames=None, cnames=None): + pnames = pnames if pnames is not None else [] + cnames = cnames if cnames is not None else [] pnames = list(dict.fromkeys(pnames)) for pname in pnames: new_pname = normalize_name(pname) @@ -213,14 +221,16 @@ def add_platform_values(table, pnames=[], cnames = []): return table # Add columns for the specified metric properties -def add_metric_values(table, pnames=[], cnames=[]): +def add_metric_values(table, pnames=None, cnames=None): + pnames = pnames if pnames is not None else [] + cnames = cnames if cnames is not None else [] pnames = list(dict.fromkeys(pnames)) for pname in pnames: new_pname = normalize_name(pname) cnames.append(new_pname) - single_metrtics = bench_metrics.where(['name=pname']).first_by(['benchmark_name','set_id','run_id','origin']) + single_metrics = bench_metrics.where(['name=pname']).first_by(['benchmark_name','set_id','run_id','origin']) table = table.natural_join( - single_metrtics, on=['benchmark_name','set_id','run_id','origin'], joins=[new_pname+'=value'] + single_metrics, on=['benchmark_name','set_id','run_id','origin'], joins=[new_pname+'=value'] ) return table @@ -239,12 +249,14 @@ def format_columns(table,pct_cols=(),int_cols=()): # Get a percentage standard deviation for the given list of rates def rstd(rates) -> float: rates = [i for i in rates if i >= 0] + if not rates: return 0.0 mean = statistics.mean(rates) return (statistics.pstdev(rates) / mean) if mean != 0 else 0.0 # Get the zscore of one rate against a list of rates def zscore(rate, rates) -> float: rates = [i for i in rates if i >= 0] + if not rates: return 0.0 std = statistics.pstdev(rates) return ((rate - statistics.mean(rates)) / std) if std != 0 else 0.0 @@ -260,11 +272,11 @@ def rchange(rates) -> float: rates = array('l', rates) if(len(rates) < 2): return 0.0 m = statistics.mean(rates[:-1]) - return (rates[-1] - m) / m + return ((rates[-1] - m) / m) if m != 0 else 0.0 # Get the percentage gain between two values def gain(start, end) -> float: - return (end - start) / start + return ((end - start) / start) if start != 0 else 0.0 # Format a list of rates to make them easier to read in a DHC table def format_rates(rates): diff --git a/src/test/java/io/deephaven/benchmark/api/QueryLogTest.java b/src/test/java/io/deephaven/benchmark/api/QueryLogTest.java index 0d5169b6..14cc37d7 100644 --- a/src/test/java/io/deephaven/benchmark/api/QueryLogTest.java +++ b/src/test/java/io/deephaven/benchmark/api/QueryLogTest.java @@ -1,4 +1,4 @@ -/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */ +/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.api; import static org.junit.jupiter.api.Assertions.*; @@ -11,7 +11,7 @@ public class QueryLogTest { @Test public void logQuery() throws Exception { Path outParent = Paths.get(getClass().getResource("test-profile.properties").toURI()).getParent(); - Files.deleteIfExists(QueryLog.getLogFile(outParent, QueryLogTest.class)); + Files.deleteIfExists(outParent.resolve("query.md")); var qlog = new QueryLog(outParent, QueryLogTest.class); qlog.setName(getClass().getSimpleName()); @@ -37,21 +37,21 @@ public void logQuery() throws Exception { ## Test - 1st Test ### Query 1 - ```` + ``` setup test - ```` + ``` ### Query 2 - ```` + ``` query1 query line - ```` + ``` ### Query 3 - ```` + ``` query2 query line - ```` + ``` """.replace("\r", "").trim(); var text = Filer.getFileText(qlog.logFile); diff --git a/src/test/java/io/deephaven/benchmark/util/FilerTest.java b/src/test/java/io/deephaven/benchmark/util/FilerTest.java index 773cd46a..53442e79 100644 --- a/src/test/java/io/deephaven/benchmark/util/FilerTest.java +++ b/src/test/java/io/deephaven/benchmark/util/FilerTest.java @@ -1,4 +1,4 @@ -/* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */ +/* Copyright (c) 2022-2026 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.util; import static org.junit.jupiter.api.Assertions.*; @@ -48,4 +48,11 @@ public void getUrlText() throws Exception { assertEquals("One Two Three\nFour Five Six", Filer.getURLText(url), "Wrong file text"); } + @Test + public void getByteSize() throws Exception { + var p = Paths.get(getClass().getResource("filerfolder").toURI()).toFile().toString(); + assertEquals(46, Filer.getByteSize(p), "Wrong byte size"); + p = Paths.get(getClass().getResource("filertest.txt").toURI()).toFile().toString(); + assertEquals(27, Filer.getByteSize(p), "Wrong byte size"); + } } diff --git a/src/test/resources/io/deephaven/benchmark/util/filerfolder/filer1.txt b/src/test/resources/io/deephaven/benchmark/util/filerfolder/filer1.txt new file mode 100644 index 00000000..9f40cf63 --- /dev/null +++ b/src/test/resources/io/deephaven/benchmark/util/filerfolder/filer1.txt @@ -0,0 +1 @@ +This file has bytes \ No newline at end of file diff --git a/src/test/resources/io/deephaven/benchmark/util/filerfolder/folder1/filer2.txt b/src/test/resources/io/deephaven/benchmark/util/filerfolder/folder1/filer2.txt new file mode 100644 index 00000000..58da37b3 --- /dev/null +++ b/src/test/resources/io/deephaven/benchmark/util/filerfolder/folder1/filer2.txt @@ -0,0 +1 @@ +This file has bytes as well \ No newline at end of file