From a92db8c17af7cdcf39eddd14641aeee2b4e75a9e Mon Sep 17 00:00:00 2001 From: Marcus Henriksson Date: Tue, 2 Jun 2026 14:19:43 +0200 Subject: [PATCH] perf!: fix GC regressions and memory leak introduced in > 1.11.0 Performance fixes (allocation hot spots): - TupleVector.validate: replace eager string concatenation on each recursive schema descent with a List path stack; string is only joined when an exception is thrown (eliminated 248 GB/s of byte[] allocation in production JFR) - HashMatch, NestedLoop: return cached schema field from getSchema() instead of calling joinSchema() on every probe/iteration; both operators stored the schema in the constructor but the public override recomputed it each time - TemporaryTable.IndexTupleVector: cache selected columns to avoid recreating SelectedValueVector (+ int[] copy) on every getColumn call - ExecutionContext.copy: share the stateless ExpressionFactory instance instead of allocating a new one per NestedLoop outer-row iteration BREAKING CHANGE: Memory leak fix: - IDatasink.execute signature changed from TupleIterator to Supplier so sinks can re-execute the upstream plan on demand (cache hit skips execution; cache refresh calls input.get() for a fresh iterator each time, matching old versions behaviour) - InsertInto: removed LazyTupleIterator; passes () -> input.execute(context) as the supplier, forwards estimatedBatchCount/estimatedRowCount, guards against sinks that forget to close the iterator - SelectIntoTempTableSink: materialise result through TupleVectorBuilder instead of relying on PlanUtils.concat's single-batch fast-path which returned the raw TableScan$1$1 anonymous TupleVector; that vector held a strong reference via this$1 -> TableScan$1 -> val$context -> ExecutionContext -> QuerySession -> temporaryTables, retaining the entire execution context chain for the lifetime of the cached entry - AInMemoryCache: document that expired entries reload asynchronously for alwaysLoadAsync=false; the Supplier API change ensures async reload always has a re-executable supplier --- .../payloadbuilder/api/catalog/IDatasink.java | 9 +- .../api/execution/TupleVector.java | 69 ++++--- .../payloadbuilder/catalog/fs/InsertSink.java | 10 +- .../catalog/jdbc/InsertSink.java | 4 +- .../catalog/kafka/KafkaCatalog.java | 12 +- .../catalog/fs/FilesystemCatalogTest.java | 14 +- .../catalog/jdbc/ASqlServerTest.java | 2 +- .../catalog/jdbc/BaseJDBCTest.java | 4 +- .../catalog/jdbc/Oracle21xTest.java | 2 +- .../system/SelectIntoTempTableSink.java | 29 ++- .../core/execution/ExecutionContext.java | 2 +- .../core/execution/QuerySession.java | 2 +- .../core/execution/TemporaryTable.java | 11 +- .../core/physicalplan/HashMatch.java | 3 +- .../core/physicalplan/InsertInto.java | 175 ++++++++---------- .../core/physicalplan/NestedLoop.java | 3 +- .../cache/InMemoryTempTableCacheTest.java | 16 +- .../system/SelectIntoTempTableSinkTest.java | 72 +++++++ .../core/physicalplan/HashMatchTest.java | 18 ++ .../core/physicalplan/InsertIntoTest.java | 75 ++++---- .../core/physicalplan/NestedLoopTest.java | 16 ++ .../core/planning/QueryPlannerTest.java | 3 +- .../core/test/TestHarnessRunner.java | 8 +- 23 files changed, 358 insertions(+), 201 deletions(-) create mode 100644 payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/catalog/system/SelectIntoTempTableSinkTest.java diff --git a/payloadbuilder-api/src/main/java/se/kuseman/payloadbuilder/api/catalog/IDatasink.java b/payloadbuilder-api/src/main/java/se/kuseman/payloadbuilder/api/catalog/IDatasink.java index 8d0a0af71..5474dbf3e 100644 --- a/payloadbuilder-api/src/main/java/se/kuseman/payloadbuilder/api/catalog/IDatasink.java +++ b/payloadbuilder-api/src/main/java/se/kuseman/payloadbuilder/api/catalog/IDatasink.java @@ -1,11 +1,16 @@ package se.kuseman.payloadbuilder.api.catalog; +import java.util.function.Supplier; + import se.kuseman.payloadbuilder.api.execution.IExecutionContext; import se.kuseman.payloadbuilder.api.execution.TupleIterator; /** A sink used on operators when the data flows into catalog. Ie. insert into. etc. */ public interface IDatasink { - /** Execute sink with provided input {@link TupleIterator}. */ - void execute(IExecutionContext context, TupleIterator input); + /** + * Execute sink with provided input supplier. The supplier creates a fresh {@link TupleIterator} on each call, allowing sinks to decide when — and optionally how many times — to execute the + * upstream plan. Sinks that cache results (e.g. SELECT INTO with CACHE_TTL) can skip calling the supplier on a cache hit and call it again on later cache refreshes. + */ + void execute(IExecutionContext context, Supplier input); } diff --git a/payloadbuilder-api/src/main/java/se/kuseman/payloadbuilder/api/execution/TupleVector.java b/payloadbuilder-api/src/main/java/se/kuseman/payloadbuilder/api/execution/TupleVector.java index de5c183d6..51c39cfda 100644 --- a/payloadbuilder-api/src/main/java/se/kuseman/payloadbuilder/api/execution/TupleVector.java +++ b/payloadbuilder-api/src/main/java/se/kuseman/payloadbuilder/api/execution/TupleVector.java @@ -166,15 +166,30 @@ static TupleVector of(final Schema schema, final List col for (int i = 0; i < columnSize; i++) { - ResolvedType schemaType = schema.getColumns() - .get(i) - .getType(); + Column col = schema.getColumns() + .get(i); + ResolvedType schemaType = col.getType(); ResolvedType vectorType = columns.get(i) .type(); - validate(schema.getColumns() - .get(i) - .getName(), schemaType, vectorType); + // Fast-path: Any skips validation; simple types with no sub-schema are done after a type + // equality check — avoid allocating a path list for these common cases. + if (schemaType.getType() == Type.Any) + { + continue; + } + if (schemaType.getType() != vectorType.getType()) + { + throw new IllegalArgumentException("Schema type: " + schemaType.getType() + " for column: " + col.getName() + " doesn't match value vectors type " + vectorType); + } + if (schemaType.getSchema() == null) + { + continue; + } + + List path = new ArrayList<>(); + path.add(col.getName()); + validate(path, schemaType, vectorType); } return new TupleVector() @@ -337,25 +352,8 @@ default String toCsv(int indent) return sb.toString(); } - private static void validate(String columnPath, ResolvedType schemaType, ResolvedType vectorType) + private static void validate(List columnPath, ResolvedType schemaType, ResolvedType vectorType) { - // Don't validate any types, they can differ, reflection will be used runtime - if (schemaType.getType() == Type.Any) - { - return; - } - - if (schemaType.getType() != vectorType.getType()) - { - throw new IllegalArgumentException("Schema type: " + schemaType.getType() + " for column: " + columnPath + " doesn't match value vectors type " + vectorType); - } - - // If this is not a complex type we're done - if (schemaType.getSchema() == null) - { - return; - } - Schema vectorSchema = vectorType.getSchema(); // Skip validation of empty vector schemas @@ -369,7 +367,7 @@ private static void validate(String columnPath, ResolvedType schemaType, Resolve if (size != vectorSchema.getSize()) { - throw new IllegalArgumentException("Schema size: " + size + " for column: " + columnPath + " doesn't match value vectors size: " + vectorSchema.getSize()); + throw new IllegalArgumentException("Schema size: " + size + " for column: " + String.join("/", columnPath) + " doesn't match value vectors size: " + vectorSchema.getSize()); } for (int i = 0; i < size; i++) @@ -378,7 +376,26 @@ private static void validate(String columnPath, ResolvedType schemaType, Resolve .get(i); Column vectorColumn = vectorSchema.getColumns() .get(i); - validate(columnPath + "/" + schemaColumn, schemaColumn.getType(), vectorColumn.getType()); + ResolvedType childSchemaType = schemaColumn.getType(); + ResolvedType childVectorType = vectorColumn.getType(); + + if (childSchemaType.getType() == Type.Any) + { + continue; + } + if (childSchemaType.getType() != childVectorType.getType()) + { + throw new IllegalArgumentException("Schema type: " + childSchemaType + .getType() + " for column: " + String.join("/", columnPath) + "/" + schemaColumn.getName() + " doesn't match value vectors type " + childVectorType); + } + if (childSchemaType.getSchema() == null) + { + continue; + } + + columnPath.add(schemaColumn.getName()); + validate(columnPath, childSchemaType, childVectorType); + columnPath.remove(columnPath.size() - 1); } } } diff --git a/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/InsertSink.java b/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/InsertSink.java index cf09e1ab7..7203230f0 100644 --- a/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/InsertSink.java +++ b/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/fs/InsertSink.java @@ -8,6 +8,7 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.List; +import java.util.function.Supplier; import java.util.stream.IntStream; import se.kuseman.payloadbuilder.api.QualifiedName; @@ -44,18 +45,19 @@ class InsertSink implements IDatasink } @Override - public void execute(IExecutionContext context, TupleIterator input) + public void execute(IExecutionContext context, Supplier input) { WriteResolveResult resolveResult = catalog.resolveForWrite(context, catalogAlias, table, options); + TupleIterator it = input.get(); //@formatter:off try (OutputStream os = Files.newOutputStream(Path.of(resolveResult.filename()), OPENOPTIONS); VectorWriter vectorWriter = context.getVectorWriter(resolveResult.format(), os, options)) //@formatter:on { - while (input.hasNext()) + while (it.hasNext()) { - TupleVector next = getVector(input.next()); + TupleVector next = getVector(it.next()); vectorWriter.write(next); } } @@ -65,7 +67,7 @@ public void execute(IExecutionContext context, TupleIterator input) } finally { - input.close(); + it.close(); } } diff --git a/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/jdbc/InsertSink.java b/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/jdbc/InsertSink.java index d9a6e102f..93b3cc503 100644 --- a/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/jdbc/InsertSink.java +++ b/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/jdbc/InsertSink.java @@ -12,6 +12,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.IntStream; import org.slf4j.Logger; @@ -55,8 +56,9 @@ class InsertSink implements IDatasink } @Override - public void execute(IExecutionContext context, TupleIterator input) + public void execute(IExecutionContext context, Supplier inputSupplier) { + TupleIterator input = inputSupplier.get(); String database = catalog.getDatabase(context.getSession(), catalogAlias); SqlDialect dialect = DialectProvider.getDialect(context.getSession(), catalogAlias); AtomicReference currentStatemnet = new AtomicReference<>(); diff --git a/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalog.java b/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalog.java index 2e62148c0..e4921375e 100644 --- a/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalog.java +++ b/payloadbuilder-catalog/src/main/java/se/kuseman/payloadbuilder/catalog/kafka/KafkaCatalog.java @@ -21,12 +21,12 @@ public class KafkaCatalog extends Catalog static final String NAME = "Kafka"; // Catalog property keys - static final String BOOTSTRAP_SERVERS = "bootstrap_servers"; - static final String SCHEMA_REGISTRY_URL = "schema_registry_url"; - static final String SECURITY_PROTOCOL = "security_protocol"; - static final String SASL_MECHANISM = "sasl_mechanism"; - static final String SASL_JAAS_CONFIG = "sasl_jaas_config"; - static final String TOPIC = "topic"; + public static final String BOOTSTRAP_SERVERS = "bootstrap_servers"; + public static final String SCHEMA_REGISTRY_URL = "schema_registry_url"; + public static final String SECURITY_PROTOCOL = "security_protocol"; + public static final String SASL_MECHANISM = "sasl_mechanism"; + public static final String SASL_JAAS_CONFIG = "sasl_jaas_config"; + public static final String TOPIC = "topic"; private static final QualifiedName OFFSET = QualifiedName.of("offset"); private static final QualifiedName TIMESTAMP = QualifiedName.of("timestamp"); diff --git a/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogTest.java b/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogTest.java index 6cd9b81c3..1d8b39d8c 100644 --- a/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogTest.java +++ b/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/fs/FilesystemCatalogTest.java @@ -228,7 +228,7 @@ void test_datasink_selectinto_csv() throws IOException List.of(VectorTestUtils.vv(Type.Int, 100, 200, 300), VectorTestUtils.vv(Type.String, "hundret", "two hundret", "three hundred"))); // @format:on - sink.execute(context, new TupleIterator() + sink.execute(context, () -> new TupleIterator() { int index = 0; @@ -283,7 +283,7 @@ void test_datasink_selectinto_txt() throws IOException List.of(VectorTestUtils.vv(Type.Int, 100, 200, 300), VectorTestUtils.vv(Type.String, "hundret", "two hundret", "three hundred"))); // @format:on - sink.execute(context, new TupleIterator() + sink.execute(context, () -> new TupleIterator() { int index = 0; @@ -339,7 +339,7 @@ void test_datasink_selectinto_fallback() throws IOException List.of(VectorTestUtils.vv(Type.Int, 100, 200, 300), VectorTestUtils.vv(Type.String, "hundret", "two hundret", "three hundred"))); // @format:on - sink.execute(context, new TupleIterator() + sink.execute(context, () -> new TupleIterator() { int index = 0; @@ -396,7 +396,7 @@ void test_datasink_selectinto_format_option() throws IOException List.of(VectorTestUtils.vv(Type.Int, 100, 200, 300), VectorTestUtils.vv(Type.String, "hundret", "two hundret", "three hundred"))); // @format:on - sink.execute(context, new TupleIterator() + sink.execute(context, () -> new TupleIterator() { int index = 0; @@ -452,7 +452,7 @@ void test_datasink_selectinto_json() throws IOException List.of(VectorTestUtils.vv(Type.Int, 100, 200, 300), VectorTestUtils.vv(Type.String, "hundret", "two hundret", "three hundred"))); // @format:on - sink.execute(context, new TupleIterator() + sink.execute(context, () -> new TupleIterator() { int index = 0; @@ -501,7 +501,7 @@ void test_datasink_insertinto() throws IOException List.of(VectorTestUtils.vv(Type.Int, 10, 20, 30), VectorTestUtils.vv(Type.String, "ten", "twenty", "thirty åäö"))); // @format:on - sink.execute(context, TupleIterator.singleton(vector1)); + sink.execute(context, () -> TupleIterator.singleton(vector1)); // Insert into again but with json //@formatter:off @@ -513,7 +513,7 @@ void test_datasink_insertinto() throws IOException List.of("newIntCol2", "newStringCol2"))); //@formatter:on - sink.execute(context, TupleIterator.singleton(vector1)); + sink.execute(context, () -> TupleIterator.singleton(vector1)); // CSOFF assertEquals(""" newIntCol newStringCol diff --git a/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/ASqlServerTest.java b/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/ASqlServerTest.java index b80c576ec..8e08da110 100644 --- a/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/ASqlServerTest.java +++ b/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/ASqlServerTest.java @@ -64,7 +64,7 @@ void test_qualified_tables() throws SQLException Column myStringColumn = getStringColumn("mystring", 100); IDatasink sink = catalog.getInsertIntoSink(context.getSession(), CATALOG_ALIAS, tableName, new InsertIntoData(0, Schema.EMPTY, emptyList(), emptyList())); //@formatter:off - sink.execute(context, TupleIterator.singleton(TupleVector.of(Schema.of( + sink.execute(context, () -> TupleIterator.singleton(TupleVector.of(Schema.of( intColumn, myStringColumn), VectorTestUtils.vv(Type.Int, 10, 20), diff --git a/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/BaseJDBCTest.java b/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/BaseJDBCTest.java index c100cd385..a2be033a9 100644 --- a/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/BaseJDBCTest.java +++ b/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/BaseJDBCTest.java @@ -265,7 +265,7 @@ void before() //@formatter:on // CSON - selectIntoSink.execute(executionContext, TupleIterator.singleton(testData)); + selectIntoSink.execute(executionContext, () -> TupleIterator.singleton(testData)); } @Test @@ -519,7 +519,7 @@ void test_datasource_table_scan_asterisk() //@formatter:on // CSON context = mockExecutionContext(); - insertIntoSink.execute(context, TupleIterator.singleton(testData)); + insertIntoSink.execute(context, () -> TupleIterator.singleton(testData)); it = ds.execute(context); diff --git a/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/Oracle21xTest.java b/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/Oracle21xTest.java index 92d2826e7..add170419 100644 --- a/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/Oracle21xTest.java +++ b/payloadbuilder-catalog/src/test/java/se/kuseman/payloadbuilder/catalog/jdbc/Oracle21xTest.java @@ -71,7 +71,7 @@ void test_clob() throws SQLException new Option(QualifiedName.of(JdbcCatalog.COLUMN, "mynclob", JdbcCatalog.DECLARATION), new LiteralStringExpression("NCLOB")))); //@formatter:off - sink.execute(mockExecutionContext(), TupleIterator.singleton(TupleVector.of(Schema.of( + sink.execute(mockExecutionContext(), () -> TupleIterator.singleton(TupleVector.of(Schema.of( Column.of("myclob", Type.String, new Column.MetaData(Map.of(MetaData.NULLABLE, false))), Column.of("mynclob", Type.String)), List.of( diff --git a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/catalog/system/SelectIntoTempTableSink.java b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/catalog/system/SelectIntoTempTableSink.java index 6eb1d76e4..d53deb5fa 100644 --- a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/catalog/system/SelectIntoTempTableSink.java +++ b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/catalog/system/SelectIntoTempTableSink.java @@ -20,11 +20,11 @@ import se.kuseman.payloadbuilder.api.execution.TupleIterator; import se.kuseman.payloadbuilder.api.execution.TupleVector; import se.kuseman.payloadbuilder.api.execution.ValueVector; +import se.kuseman.payloadbuilder.api.execution.vector.ITupleVectorBuilder; import se.kuseman.payloadbuilder.api.expression.IExpression; import se.kuseman.payloadbuilder.core.QueryException; import se.kuseman.payloadbuilder.core.execution.QuerySession; import se.kuseman.payloadbuilder.core.execution.TemporaryTable; -import se.kuseman.payloadbuilder.core.physicalplan.PlanUtils; /** Sink for inserting into temporary tables. */ class SelectIntoTempTableSink implements IDatasink @@ -58,7 +58,7 @@ class SelectIntoTempTableSink implements IDatasink } @Override - public void execute(IExecutionContext context, TupleIterator input) + public void execute(IExecutionContext context, Supplier input) { // Strip the # prefix, we don't want that when looking up tables QualifiedName table = this.table.extract(1) @@ -98,7 +98,30 @@ public void execute(IExecutionContext context, TupleIterator input) } } - Supplier tempTableSupplier = () -> new TemporaryTable(PlanUtils.concat(context, input), indices); + Supplier tempTableSupplier = () -> + { + // Always materialise through TupleVectorBuilder rather than using PlanUtils.concat directly. + // PlanUtils.concat has a single-batch fast-path that returns the raw TupleVector as-is. + // For a TableScan that raw vector is an anonymous inner class (TableScan$1$1) holding + // a strong reference to its outer iterator (TableScan$1) and through that to the + // ExecutionContext → QuerySession → temporaryTables chain. Storing that in + // TemporaryTable.vector retains the entire chain for the cached entry's lifetime. + TupleIterator it = input.get(); + try + { + ITupleVectorBuilder builder = context.getVectorFactory() + .getTupleVectorBuilder(0); + while (it.hasNext()) + { + builder.append(it.next()); + } + return new TemporaryTable(builder.build(), indices); + } + finally + { + it.close(); + } + }; TemporaryTable temporaryTable; if (cacheTtl != null) { diff --git a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/ExecutionContext.java b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/ExecutionContext.java index 3411933ef..8634a1cef 100644 --- a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/ExecutionContext.java +++ b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/ExecutionContext.java @@ -52,7 +52,7 @@ private ExecutionContext(ExecutionContext source) this.variables = source.variables; this.statementContext = new StatementContext(source.statementContext); this.vectorFactory = source.vectorFactory; - this.expressionFactory = new ExpressionFactory(); + this.expressionFactory = source.expressionFactory; } /** Return session */ diff --git a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/QuerySession.java b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/QuerySession.java index ab65d37b3..448890d9a 100644 --- a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/QuerySession.java +++ b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/QuerySession.java @@ -318,7 +318,7 @@ public CatalogRegistry getCatalogRegistry() } /** Return variables map */ - Map getVariables() + public Map getVariables() { return variables; } diff --git a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/TemporaryTable.java b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/TemporaryTable.java index 81a48019e..4d2c0bfa7 100644 --- a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/TemporaryTable.java +++ b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/execution/TemporaryTable.java @@ -232,11 +232,14 @@ static class IndexTupleVector implements TupleVector { final TupleVector vector; final ValueVector selection; + private final ValueVector[] columns; IndexTupleVector(TupleVector vector, IntList rows) { this.vector = vector; this.selection = VectorUtils.convertToSelectionVector(rows); + this.columns = new ValueVector[vector.getSchema() + .getSize()]; } @Override @@ -254,7 +257,13 @@ public Schema getSchema() @Override public ValueVector getColumn(int column) { - return SelectedValueVector.select(vector.getColumn(column), selection); + ValueVector col = columns[column]; + if (col == null) + { + col = SelectedValueVector.select(vector.getColumn(column), selection); + columns[column] = col; + } + return col; } } diff --git a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/HashMatch.java b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/HashMatch.java index b409ed93d..2af39aa18 100644 --- a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/HashMatch.java +++ b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/HashMatch.java @@ -151,7 +151,8 @@ public Map getDescribeProperties(IExecutionContext context) @Override public Schema getSchema() { - return getSchema(false); + return schema != null ? schema + : getSchema(false); } private Schema getSchema(boolean cartesian) diff --git a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/InsertInto.java b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/InsertInto.java index bfe411c2d..b2888f372 100644 --- a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/InsertInto.java +++ b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/InsertInto.java @@ -60,17 +60,78 @@ public boolean hasWritableOutput() @Override public TupleIterator execute(IExecutionContext context) { - LazyTupleIterator iterator = new LazyTupleIterator(context); + int[] rowCount = new int[1]; + boolean[] closed = new boolean[1]; + TupleIterator[] underlying = new TupleIterator[1]; + try { - datasink.execute(context, iterator); + datasink.execute(context, () -> + { + TupleIterator it = input.execute(context); + underlying[0] = it; + return new TupleIterator() + { + @Override + public int estimatedBatchCount() + { + return it.estimatedBatchCount(); + } + + @Override + public int estimatedRowCount() + { + return it.estimatedRowCount(); + } + + @Override + public boolean hasNext() + { + return it.hasNext(); + } + + @Override + public TupleVector next() + { + TupleVector next = it.next(); + if (insertColumns != null + && next.getSchema() + .getSize() != insertColumns.size()) + { + throw new QueryException("Insert column count doesn't match input column count. Insert columns: " + insertColumns + + ", input columns: " + + next.getSchema() + .getColumns()); + } + rowCount[0] += next.getRowCount(); + return hasAsteriskSchemaOrInput ? createProxyVector(context, next) + : next; + } + + @Override + public void close() + { + if (!closed[0]) + { + closed[0] = true; + it.close(); + } + } + }; + }); } finally { - // Make sure we always close the iterator to guard against bad implementations - iterator.close(); + // Guard against datasink implementations that forget to close the iterator + if (underlying[0] != null + && !closed[0]) + { + closed[0] = true; + underlying[0].close(); + } } - ((StatementContext) context.getStatementContext()).setRowCount(iterator.rowCount); + + ((StatementContext) context.getStatementContext()).setRowCount(rowCount[0]); return TupleIterator.EMPTY; } @@ -113,107 +174,27 @@ else if (obj instanceof InsertInto that) return false; } - /** - * Lazy iterator that executes the input on first access. This to allow for {@link IDatasink}'s to handle caches etc. without the need for executing anything if not needed. - */ - private class LazyTupleIterator implements TupleIterator + private TupleVector createProxyVector(IExecutionContext context, TupleVector tv) { - private final IExecutionContext context; - private TupleIterator iterator; - private int rowCount; - private boolean closed; - - LazyTupleIterator(IExecutionContext context) - { - this.context = context; - } - - @Override - public int estimatedBatchCount() - { - return getIterator().estimatedBatchCount(); - } - - @Override - public int estimatedRowCount() - { - return getIterator().estimatedRowCount(); - } - - @Override - public void close() - { - if (!closed - && iterator != null) - { - iterator.close(); - closed = true; - } - } - - @Override - public boolean hasNext() - { - return getIterator().hasNext(); - } - - @Override - public TupleVector next() + return new TupleVector() { - TupleVector next = getIterator().next(); - - if (insertColumns != null - && next.getSchema() - .getSize() != insertColumns.size()) + @Override + public Schema getSchema() { - throw new QueryException("Insert column count doesn't match input column count. Insert columns: " + insertColumns - + ", input columns: " - + next.getSchema() - .getColumns()); + return SchemaUtils.rewriteSchema(tv.getSchema(), (StatementContext) context.getStatementContext()); } - rowCount += next.getRowCount(); - // Create a proxy vector if the input is asterisk to have a proper schema with meta etc. - // that is only known runtime - if (hasAsteriskSchemaOrInput) + @Override + public int getRowCount() { - next = createProxyVector(next); + return tv.getRowCount(); } - return next; - } - - private TupleIterator getIterator() - { - if (iterator == null) + @Override + public ValueVector getColumn(int column) { - iterator = input.execute(context); + return tv.getColumn(column); } - return iterator; - } - - private TupleVector createProxyVector(TupleVector tv) - { - return new TupleVector() - { - @Override - public Schema getSchema() - { - return SchemaUtils.rewriteSchema(tv.getSchema(), (StatementContext) context.getStatementContext()); - } - - @Override - public int getRowCount() - { - return tv.getRowCount(); - } - - @Override - public ValueVector getColumn(int column) - { - return tv.getColumn(column); - } - }; - } + }; } } diff --git a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/NestedLoop.java b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/NestedLoop.java index 4d196a852..4aac715a3 100644 --- a/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/NestedLoop.java +++ b/payloadbuilder-core/src/main/java/se/kuseman/payloadbuilder/core/physicalplan/NestedLoop.java @@ -259,7 +259,8 @@ public Map getDescribeProperties(IExecutionContext context) @Override public Schema getSchema() { - return getSchema(false); + return schema != null ? schema + : getSchema(false); } private Schema getSchema(boolean cartesian) diff --git a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/cache/InMemoryTempTableCacheTest.java b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/cache/InMemoryTempTableCacheTest.java index fa6b168c5..53213b149 100644 --- a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/cache/InMemoryTempTableCacheTest.java +++ b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/cache/InMemoryTempTableCacheTest.java @@ -65,17 +65,17 @@ else if (call == 2) Thread.sleep(150); - // Key expired and reload is performed and we get the stale "old" data back + // Key expired — background reload triggered, stale value returned immediately table = cacheImpl.computeIfAbsent(QualifiedName.of("table"), Duration.ofMillis(100), supplier); assertSame(first, table.getTupleVector()); assertEquals(1, cacheImpl.getCacheHits()); assertEquals(1, cacheImpl.getCacheMisses()); assertEquals(1, cacheImpl.getCacheStaleHits()); - // Let executor get value + // Let background reload complete Thread.sleep(10); - // A second call will yield the new result + // Fresh value now available table = cacheImpl.computeIfAbsent(QualifiedName.of("table"), Duration.ofMillis(100), supplier); assertSame(second, table.getTupleVector()); assertEquals(2, cacheImpl.getCacheHits()); @@ -84,17 +84,16 @@ else if (call == 2) Thread.sleep(150); - // Key expired and reload will fail and we should get second result back + // Key expired — reload will fail, stale value kept table = cacheImpl.computeIfAbsent(QualifiedName.of("table"), Duration.ofMillis(100), supplier); assertSame(second, table.getTupleVector()); assertEquals(2, cacheImpl.getCacheHits()); assertEquals(1, cacheImpl.getCacheMisses()); assertEquals(2, cacheImpl.getCacheStaleHits()); - // Let executor get value + // Let background reload attempt complete (it fails, second is kept) Thread.sleep(10); - // A second call will still yield second result table = cacheImpl.computeIfAbsent(QualifiedName.of("table"), Duration.ofMillis(100), supplier); assertSame(second, table.getTupleVector()); assertEquals(2, cacheImpl.getCacheHits()); @@ -103,17 +102,16 @@ else if (call == 2) Thread.sleep(150); - // Key expired and reload will kick in and we should get second result back + // Key expired again — background reload with third table = cacheImpl.computeIfAbsent(QualifiedName.of("table"), Duration.ofMillis(100), supplier); assertSame(second, table.getTupleVector()); assertEquals(2, cacheImpl.getCacheHits()); assertEquals(1, cacheImpl.getCacheMisses()); assertEquals(4, cacheImpl.getCacheStaleHits()); - // Let executor get value + // Let background reload complete Thread.sleep(10); - // A second call will yield the new result table = cacheImpl.computeIfAbsent(QualifiedName.of("table"), Duration.ofMillis(100), supplier); assertSame(third, table.getTupleVector()); assertEquals(3, cacheImpl.getCacheHits()); diff --git a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/catalog/system/SelectIntoTempTableSinkTest.java b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/catalog/system/SelectIntoTempTableSinkTest.java new file mode 100644 index 000000000..95ea8063f --- /dev/null +++ b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/catalog/system/SelectIntoTempTableSinkTest.java @@ -0,0 +1,72 @@ +package se.kuseman.payloadbuilder.core.catalog.system; + +import static java.util.Collections.emptyList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import se.kuseman.payloadbuilder.api.QualifiedName; +import se.kuseman.payloadbuilder.api.catalog.Column.Type; +import se.kuseman.payloadbuilder.api.catalog.Schema; +import se.kuseman.payloadbuilder.api.execution.TupleVector; +import se.kuseman.payloadbuilder.core.execution.QuerySession; +import se.kuseman.payloadbuilder.core.execution.TemporaryTable; +import se.kuseman.payloadbuilder.core.physicalplan.APhysicalPlanTest; +import se.kuseman.payloadbuilder.test.VectorTestUtils; + +/** Test of {@link SelectIntoTempTableSink} */ +class SelectIntoTempTableSinkTest extends APhysicalPlanTest +{ + @Test + void test_vector_is_materialised_not_raw_scan_vector() + { + // Regression: SelectIntoTempTableSink previously used PlanUtils.concat which has a + // single-batch fast-path returning the raw TupleVector (e.g. TableScan$1$1) directly. + // That anonymous class holds this$1 -> outer iterator -> val$context -> ExecutionContext + // -> QuerySession -> temporaryTables, retaining the entire execution context chain for + // the lifetime of the cached TemporaryTable. + // The fix forces materialisation through TupleVectorBuilder so the vector is a + // TupleVector$4 (from TupleVector.of) with no reference to any ExecutionContext. + Schema schema = Schema.of(col("col1", Type.Int)); + TupleVector data = TupleVector.of(schema, List.of(VectorTestUtils.vv(Type.Int, 1, 2, 3))); + + SelectIntoTempTableSink sink = new SelectIntoTempTableSink(QualifiedName.of("#", "table"), emptyList(), true); + sink.execute(context, () -> new se.kuseman.payloadbuilder.api.execution.TupleIterator() + { + boolean hasNext = true; + + @Override + public boolean hasNext() + { + return hasNext; + } + + @Override + public TupleVector next() + { + hasNext = false; + return data; + } + }); + + TemporaryTable table = ((QuerySession) context.getSession()).getTemporaryTable(QualifiedName.of("table")); + TupleVector vector = table.getTupleVector(); + + // The vector must be a materialised TupleVector (from TupleVector.of / TupleVectorBuilder), + // NOT the raw TupleVector passed in (which could be a scan anonymous class holding a context). + assertFalse(vector == data, "TemporaryTable must not store the raw input TupleVector — it must be materialised"); + + // The enclosing class of the materialised vector is TupleVector (from TupleVector.of()). + // A raw TableScan$1$1 would have enclosing class TableScan$1 -> TableScan. + assertSame(TupleVector.class, vector.getClass() + .getEnclosingClass(), "TemporaryTable vector must be a TupleVector.of() instance with no external context references"); + + // Data is intact + assertEquals(3, vector.getRowCount()); + VectorTestUtils.assertVectorsEquals(VectorTestUtils.vv(Type.Int, 1, 2, 3), vector.getColumn(0)); + } +} diff --git a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/HashMatchTest.java b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/HashMatchTest.java index 446cb7a02..9df952129 100644 --- a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/HashMatchTest.java +++ b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/HashMatchTest.java @@ -59,6 +59,24 @@ protected IPhysicalPlan createIndexLeftJoin(IPhysicalPlan outer, IPhysicalPlan i return new HashMatch(0, outer, inner, List.of(ce("col1")), List.of(ce("col3")), predicate, populateAlias, true, true); } + @Test + void test_getSchema_returns_cached_instance() + { + // Regression: getSchema() previously called joinSchema() on every invocation, + // allocating a new Schema on every probe operation. Verify same instance returned. + Schema outerSchema = Schema.of(Column.of("col1", Type.Int)); + Schema innerSchema = Schema.of(Column.of("col3", Type.String)); + IPhysicalPlan plan = createInnerJoin(scanVectors(schemaLessDS(() -> + { + }, false), outerSchema), scanVectors(schemaLessDS(() -> + { + }, false), innerSchema), (tv, ctx) -> ValueVector.literalBoolean(true, tv.getRowCount()), null); + + Schema first = plan.getSchema(); + Schema second = plan.getSchema(); + org.junit.jupiter.api.Assertions.assertSame(first, second, "getSchema() must return the same Schema instance on repeated calls"); + } + @Test void test_join_integer_against_string_hash_outer() { diff --git a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/InsertIntoTest.java b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/InsertIntoTest.java index c74060b7a..9cfcb4ae8 100644 --- a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/InsertIntoTest.java +++ b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/InsertIntoTest.java @@ -1,8 +1,6 @@ package se.kuseman.payloadbuilder.core.physicalplan; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -11,15 +9,12 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import se.kuseman.payloadbuilder.api.QualifiedName; import se.kuseman.payloadbuilder.api.catalog.Column; import se.kuseman.payloadbuilder.api.catalog.IDatasink; import se.kuseman.payloadbuilder.api.catalog.ResolvedType; import se.kuseman.payloadbuilder.api.catalog.Schema; -import se.kuseman.payloadbuilder.api.execution.IExecutionContext; import se.kuseman.payloadbuilder.api.execution.TupleIterator; import se.kuseman.payloadbuilder.api.execution.TupleVector; import se.kuseman.payloadbuilder.api.execution.ValueVector; @@ -44,52 +39,68 @@ void test_that_input_is_closed() .withMetaData(new Column.MetaData(Map.of("key", "value"))) .build()); - // Setut a runtime schema that should be used to verify schema rewriting + // Set a runtime schema that should be used to verify schema rewriting context.getStatementContext() .setRuntimeSchema(1, expectedSchema); IPhysicalPlan plan = Mockito.mock(IPhysicalPlan.class); TupleIterator iterator = Mockito.mock(TupleIterator.class); - when(iterator.hasNext()).thenReturn(true); + when(iterator.hasNext()).thenReturn(true, false); when(iterator.next()).thenReturn(vector); + when(iterator.estimatedBatchCount()).thenReturn(3); + when(iterator.estimatedRowCount()).thenReturn(100); // Asterisk schema of input when(plan.getSchema()).thenReturn(Schema.of(ast("*", table))); when(plan.execute(context)).thenReturn(iterator); - IDatasink sink = Mockito.mock(IDatasink.class); - doAnswer(new Answer() + // The sink receives a Supplier — call input.get() to execute the upstream plan, + // then consume the resulting iterator. InsertInto must close the iterator. + TupleVector[] captured = new TupleVector[1]; + IDatasink sink = (ctx, input) -> { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable - { - // Call iterator methods to verify that the lazy iterator catches them - ((TupleIterator) invocation.getArgument(1)).hasNext(); - TupleVector vector = ((TupleIterator) invocation.getArgument(1)).next(); - ((TupleIterator) invocation.getArgument(1)).estimatedBatchCount(); - ((TupleIterator) invocation.getArgument(1)).estimatedRowCount(); - - assertEquals(expectedSchema, vector.getSchema()); - assertEquals(3, vector.getRowCount()); - VectorTestUtils.assertVectorsEquals(ValueVector.literalInt(10, 3), vector.getColumn(0)); - - // We skip close to make sure that InsertInto does that for us if implementation forget - return null; - } - }).when(sink) - .execute(any(IExecutionContext.class), any(TupleIterator.class)); + TupleIterator it = input.get(); + // Verify wrapper forwards estimation methods to the underlying iterator + assertEquals(3, it.estimatedBatchCount()); + assertEquals(100, it.estimatedRowCount()); + it.hasNext(); + captured[0] = it.next(); + // Intentionally skip close() to verify InsertInto closes it + }; InsertInto into = new InsertInto(0, plan, null, sink); - into.execute(context); - verify(sink).execute(any(IExecutionContext.class), any(TupleIterator.class)); + // Verify the proxy vector applied the asterisk schema rewrite + assertEquals(expectedSchema, captured[0].getSchema()); + assertEquals(3, captured[0].getRowCount()); + VectorTestUtils.assertVectorsEquals(ValueVector.literalInt(10, 3), captured[0].getColumn(0)); + verify(plan).getSchema(); verify(plan).execute(context); - verify(iterator).hasNext(); - verify(iterator).next(); verify(iterator).estimatedBatchCount(); verify(iterator).estimatedRowCount(); + verify(iterator).hasNext(); + verify(iterator).next(); verify(iterator).close(); - verifyNoMoreInteractions(plan, sink, iterator); + verifyNoMoreInteractions(plan, iterator); + } + + @Test + void test_sink_can_skip_execution_on_cache_hit() + { + IPhysicalPlan plan = Mockito.mock(IPhysicalPlan.class); + when(plan.getSchema()).thenReturn(Schema.EMPTY); + + // Sink that does NOT call input.get() — simulates a cache hit + IDatasink sink = (ctx, input) -> + { + }; + + InsertInto into = new InsertInto(0, plan, null, sink); + into.execute(context); + + // Plan was never executed since sink did not call input.get() + verify(plan).getSchema(); + verifyNoMoreInteractions(plan); } } diff --git a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/NestedLoopTest.java b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/NestedLoopTest.java index bd0a4b945..f8258aa3e 100644 --- a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/NestedLoopTest.java +++ b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/physicalplan/NestedLoopTest.java @@ -49,6 +49,22 @@ class NestedLoopTest extends AJoinTest */ private Set outerReferences = asSet(col("col", ResolvedType.of(Type.Any), new TableSourceReference(0, TableSourceReference.Type.TABLE, "", QualifiedName.of("table"), "t"))); + @Test + void test_getSchema_returns_cached_instance() + { + // Regression: getSchema() previously called joinSchema() on every invocation, + // allocating a new Schema on every outer-row iteration. Verify same instance returned. + IPhysicalPlan plan = NestedLoop.innerJoin(0, scanVectors(schemaLessDS(() -> + { + }, false), outerSchema), scanVectors(schemaLessDS(() -> + { + }, false), innerSchema), null, false); + + Schema first = plan.getSchema(); + Schema second = plan.getSchema(); + org.junit.jupiter.api.Assertions.assertSame(first, second, "getSchema() must return the same Schema instance on repeated calls"); + } + @Disabled @Test void measure() diff --git a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/planning/QueryPlannerTest.java b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/planning/QueryPlannerTest.java index c7b5b7024..0b3608162 100644 --- a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/planning/QueryPlannerTest.java +++ b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/planning/QueryPlannerTest.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.function.Supplier; import org.apache.commons.lang3.tuple.Triple; import org.assertj.core.api.Assertions; @@ -1209,7 +1210,7 @@ void test_temp_table() Schema.of(col("col1", ResolvedType.INT)), List.of(ValueVector.literalInt(1, 1)))), null, new IDatasink() { @Override - public void execute(IExecutionContext context, TupleIterator input) + public void execute(IExecutionContext context, Supplier input) { } }); diff --git a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/test/TestHarnessRunner.java b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/test/TestHarnessRunner.java index 341a1f029..6eb1e6dc5 100644 --- a/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/test/TestHarnessRunner.java +++ b/payloadbuilder-core/src/test/java/se/kuseman/payloadbuilder/core/test/TestHarnessRunner.java @@ -630,9 +630,9 @@ public IDatasink getInsertIntoSink(IQuerySession session, String catalogAlias, Q return new IDatasink() { @Override - public void execute(IExecutionContext context, TupleIterator input) + public void execute(IExecutionContext context, Supplier input) { - TupleVector vector = PlanUtils.concat(context, input); + TupleVector vector = PlanUtils.concat(context, input.get()); int rowCount = vector.getRowCount(); int colCount = vector.getSchema() .getSize(); @@ -682,9 +682,9 @@ public IDatasink getSelectIntoSink(IQuerySession session, String catalogAlias, Q return new IDatasink() { @Override - public void execute(IExecutionContext context, TupleIterator input) + public void execute(IExecutionContext context, Supplier input) { - TupleVector vector = PlanUtils.concat(context, input); + TupleVector vector = PlanUtils.concat(context, input.get()); Schema schema = vector.getSchema(); // Set the runtime data of the test table