Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package se.kuseman.payloadbuilder.api.catalog;

import java.util.function.Supplier;

import se.kuseman.payloadbuilder.api.execution.IExecutionContext;
import se.kuseman.payloadbuilder.api.execution.TupleIterator;

/** A sink used on operators when the data flows into a catalog, e.g. insert into, etc. */
public interface IDatasink
{
/** Execute sink with provided input {@link TupleIterator}. */
void execute(IExecutionContext context, TupleIterator input);
/**
* Execute sink with provided input supplier. The supplier creates a fresh {@link TupleIterator} on each call, allowing sinks to decide when — and optionally how many times — to execute the
* upstream plan. Sinks that cache results (e.g. SELECT INTO with CACHE_TTL) can skip calling the supplier on a cache hit and call it again on later cache refreshes.
*/
void execute(IExecutionContext context, Supplier<TupleIterator> input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,30 @@ static TupleVector of(final Schema schema, final List<? extends ValueVector> col

for (int i = 0; i < columnSize; i++)
{
ResolvedType schemaType = schema.getColumns()
.get(i)
.getType();
Column col = schema.getColumns()
.get(i);
ResolvedType schemaType = col.getType();
ResolvedType vectorType = columns.get(i)
.type();

validate(schema.getColumns()
.get(i)
.getName(), schemaType, vectorType);
// Fast-path: Any skips validation; simple types with no sub-schema are done after a type
// equality check — avoid allocating a path list for these common cases.
if (schemaType.getType() == Type.Any)
{
continue;
}
if (schemaType.getType() != vectorType.getType())
{
throw new IllegalArgumentException("Schema type: " + schemaType.getType() + " for column: " + col.getName() + " doesn't match value vectors type " + vectorType);
}
if (schemaType.getSchema() == null)
{
continue;
}

List<String> path = new ArrayList<>();
path.add(col.getName());
validate(path, schemaType, vectorType);
}

return new TupleVector()
Expand Down Expand Up @@ -337,25 +352,8 @@ default String toCsv(int indent)
return sb.toString();
}

private static void validate(String columnPath, ResolvedType schemaType, ResolvedType vectorType)
private static void validate(List<String> columnPath, ResolvedType schemaType, ResolvedType vectorType)
{
// Don't validate Any types; they can differ, and reflection will be used at runtime
if (schemaType.getType() == Type.Any)
{
return;
}

if (schemaType.getType() != vectorType.getType())
{
throw new IllegalArgumentException("Schema type: " + schemaType.getType() + " for column: " + columnPath + " doesn't match value vectors type " + vectorType);
}

// If this is not a complex type we're done
if (schemaType.getSchema() == null)
{
return;
}

Schema vectorSchema = vectorType.getSchema();

// Skip validation of empty vector schemas
Expand All @@ -369,7 +367,7 @@ private static void validate(String columnPath, ResolvedType schemaType, Resolve

if (size != vectorSchema.getSize())
{
throw new IllegalArgumentException("Schema size: " + size + " for column: " + columnPath + " doesn't match value vectors size: " + vectorSchema.getSize());
throw new IllegalArgumentException("Schema size: " + size + " for column: " + String.join("/", columnPath) + " doesn't match value vectors size: " + vectorSchema.getSize());
}

for (int i = 0; i < size; i++)
Expand All @@ -378,7 +376,26 @@ private static void validate(String columnPath, ResolvedType schemaType, Resolve
.get(i);
Column vectorColumn = vectorSchema.getColumns()
.get(i);
validate(columnPath + "/" + schemaColumn, schemaColumn.getType(), vectorColumn.getType());
ResolvedType childSchemaType = schemaColumn.getType();
ResolvedType childVectorType = vectorColumn.getType();

if (childSchemaType.getType() == Type.Any)
{
continue;
}
if (childSchemaType.getType() != childVectorType.getType())
{
throw new IllegalArgumentException("Schema type: " + childSchemaType
.getType() + " for column: " + String.join("/", columnPath) + "/" + schemaColumn.getName() + " doesn't match value vectors type " + childVectorType);
}
if (childSchemaType.getSchema() == null)
{
continue;
}

columnPath.add(schemaColumn.getName());
validate(columnPath, childSchemaType, childVectorType);
columnPath.remove(columnPath.size() - 1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import se.kuseman.payloadbuilder.api.QualifiedName;
Expand Down Expand Up @@ -44,18 +45,19 @@ class InsertSink implements IDatasink
}

@Override
public void execute(IExecutionContext context, TupleIterator input)
public void execute(IExecutionContext context, Supplier<TupleIterator> input)
{
WriteResolveResult resolveResult = catalog.resolveForWrite(context, catalogAlias, table, options);
TupleIterator it = input.get();

//@formatter:off
try (OutputStream os = Files.newOutputStream(Path.of(resolveResult.filename()), OPENOPTIONS);
VectorWriter vectorWriter = context.getVectorWriter(resolveResult.format(), os, options))
//@formatter:on
{
while (input.hasNext())
while (it.hasNext())
{
TupleVector next = getVector(input.next());
TupleVector next = getVector(it.next());
vectorWriter.write(next);
}
}
Expand All @@ -65,7 +67,7 @@ public void execute(IExecutionContext context, TupleIterator input)
}
finally
{
input.close();
it.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import org.slf4j.Logger;
Expand Down Expand Up @@ -55,8 +56,9 @@ class InsertSink implements IDatasink
}

@Override
public void execute(IExecutionContext context, TupleIterator input)
public void execute(IExecutionContext context, Supplier<TupleIterator> inputSupplier)
{
TupleIterator input = inputSupplier.get();
String database = catalog.getDatabase(context.getSession(), catalogAlias);
SqlDialect dialect = DialectProvider.getDialect(context.getSession(), catalogAlias);
AtomicReference<Statement> currentStatemnet = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ public class KafkaCatalog extends Catalog
static final String NAME = "Kafka";

// Catalog property keys
static final String BOOTSTRAP_SERVERS = "bootstrap_servers";
static final String SCHEMA_REGISTRY_URL = "schema_registry_url";
static final String SECURITY_PROTOCOL = "security_protocol";
static final String SASL_MECHANISM = "sasl_mechanism";
static final String SASL_JAAS_CONFIG = "sasl_jaas_config";
static final String TOPIC = "topic";
public static final String BOOTSTRAP_SERVERS = "bootstrap_servers";
public static final String SCHEMA_REGISTRY_URL = "schema_registry_url";
public static final String SECURITY_PROTOCOL = "security_protocol";
public static final String SASL_MECHANISM = "sasl_mechanism";
public static final String SASL_JAAS_CONFIG = "sasl_jaas_config";
public static final String TOPIC = "topic";
private static final QualifiedName OFFSET = QualifiedName.of("offset");
private static final QualifiedName TIMESTAMP = QualifiedName.of("timestamp");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ void test_datasink_selectinto_csv() throws IOException
List.of(VectorTestUtils.vv(Type.Int, 100, 200, 300), VectorTestUtils.vv(Type.String, "hundret", "two hundret", "three hundred")));
// @format:on

sink.execute(context, new TupleIterator()
sink.execute(context, () -> new TupleIterator()
{
int index = 0;

Expand Down Expand Up @@ -283,7 +283,7 @@ void test_datasink_selectinto_txt() throws IOException
List.of(VectorTestUtils.vv(Type.Int, 100, 200, 300), VectorTestUtils.vv(Type.String, "hundret", "two hundret", "three hundred")));
// @format:on

sink.execute(context, new TupleIterator()
sink.execute(context, () -> new TupleIterator()
{
int index = 0;

Expand Down Expand Up @@ -339,7 +339,7 @@ void test_datasink_selectinto_fallback() throws IOException
List.of(VectorTestUtils.vv(Type.Int, 100, 200, 300), VectorTestUtils.vv(Type.String, "hundret", "two hundret", "three hundred")));
// @format:on

sink.execute(context, new TupleIterator()
sink.execute(context, () -> new TupleIterator()
{
int index = 0;

Expand Down Expand Up @@ -396,7 +396,7 @@ void test_datasink_selectinto_format_option() throws IOException
List.of(VectorTestUtils.vv(Type.Int, 100, 200, 300), VectorTestUtils.vv(Type.String, "hundret", "two hundret", "three hundred")));
// @format:on

sink.execute(context, new TupleIterator()
sink.execute(context, () -> new TupleIterator()
{
int index = 0;

Expand Down Expand Up @@ -452,7 +452,7 @@ void test_datasink_selectinto_json() throws IOException
List.of(VectorTestUtils.vv(Type.Int, 100, 200, 300), VectorTestUtils.vv(Type.String, "hundret", "two hundret", "three hundred")));
// @format:on

sink.execute(context, new TupleIterator()
sink.execute(context, () -> new TupleIterator()
{
int index = 0;

Expand Down Expand Up @@ -501,7 +501,7 @@ void test_datasink_insertinto() throws IOException
List.of(VectorTestUtils.vv(Type.Int, 10, 20, 30), VectorTestUtils.vv(Type.String, "ten", "twenty", "thirty åäö")));
// @format:on

sink.execute(context, TupleIterator.singleton(vector1));
sink.execute(context, () -> TupleIterator.singleton(vector1));

// Insert into again but with json
//@formatter:off
Expand All @@ -513,7 +513,7 @@ void test_datasink_insertinto() throws IOException
List.of("newIntCol2", "newStringCol2")));
//@formatter:on

sink.execute(context, TupleIterator.singleton(vector1));
sink.execute(context, () -> TupleIterator.singleton(vector1));
// CSOFF
assertEquals("""
newIntCol newStringCol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void test_qualified_tables() throws SQLException
Column myStringColumn = getStringColumn("mystring", 100);
IDatasink sink = catalog.getInsertIntoSink(context.getSession(), CATALOG_ALIAS, tableName, new InsertIntoData(0, Schema.EMPTY, emptyList(), emptyList()));
//@formatter:off
sink.execute(context, TupleIterator.singleton(TupleVector.of(Schema.of(
sink.execute(context, () -> TupleIterator.singleton(TupleVector.of(Schema.of(
intColumn,
myStringColumn),
VectorTestUtils.vv(Type.Int, 10, 20),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void before()
//@formatter:on
// CSON

selectIntoSink.execute(executionContext, TupleIterator.singleton(testData));
selectIntoSink.execute(executionContext, () -> TupleIterator.singleton(testData));
}

@Test
Expand Down Expand Up @@ -519,7 +519,7 @@ void test_datasource_table_scan_asterisk()
//@formatter:on
// CSON
context = mockExecutionContext();
insertIntoSink.execute(context, TupleIterator.singleton(testData));
insertIntoSink.execute(context, () -> TupleIterator.singleton(testData));

it = ds.execute(context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void test_clob() throws SQLException
new Option(QualifiedName.of(JdbcCatalog.COLUMN, "mynclob", JdbcCatalog.DECLARATION), new LiteralStringExpression("NCLOB"))));

//@formatter:off
sink.execute(mockExecutionContext(), TupleIterator.singleton(TupleVector.of(Schema.of(
sink.execute(mockExecutionContext(), () -> TupleIterator.singleton(TupleVector.of(Schema.of(
Column.of("myclob", Type.String, new Column.MetaData(Map.of(MetaData.NULLABLE, false))),
Column.of("mynclob", Type.String)),
List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import se.kuseman.payloadbuilder.api.execution.TupleIterator;
import se.kuseman.payloadbuilder.api.execution.TupleVector;
import se.kuseman.payloadbuilder.api.execution.ValueVector;
import se.kuseman.payloadbuilder.api.execution.vector.ITupleVectorBuilder;
import se.kuseman.payloadbuilder.api.expression.IExpression;
import se.kuseman.payloadbuilder.core.QueryException;
import se.kuseman.payloadbuilder.core.execution.QuerySession;
import se.kuseman.payloadbuilder.core.execution.TemporaryTable;
import se.kuseman.payloadbuilder.core.physicalplan.PlanUtils;

/** Sink for inserting into temporary tables. */
class SelectIntoTempTableSink implements IDatasink
Expand Down Expand Up @@ -58,7 +58,7 @@ class SelectIntoTempTableSink implements IDatasink
}

@Override
public void execute(IExecutionContext context, TupleIterator input)
public void execute(IExecutionContext context, Supplier<TupleIterator> input)
{
// Strip the # prefix, we don't want that when looking up tables
QualifiedName table = this.table.extract(1)
Expand Down Expand Up @@ -98,7 +98,30 @@ public void execute(IExecutionContext context, TupleIterator input)
}
}

Supplier<TemporaryTable> tempTableSupplier = () -> new TemporaryTable(PlanUtils.concat(context, input), indices);
Supplier<TemporaryTable> tempTableSupplier = () ->
{
// Always materialise through TupleVectorBuilder rather than using PlanUtils.concat directly.
// PlanUtils.concat has a single-batch fast-path that returns the raw TupleVector as-is.
// For a TableScan that raw vector is an anonymous inner class (TableScan$1$1) holding
// a strong reference to its outer iterator (TableScan$1) and through that to the
// ExecutionContext → QuerySession → temporaryTables chain. Storing that in
// TemporaryTable.vector retains the entire chain for the cached entry's lifetime.
TupleIterator it = input.get();
try
{
ITupleVectorBuilder builder = context.getVectorFactory()
.getTupleVectorBuilder(0);
while (it.hasNext())
{
builder.append(it.next());
}
return new TemporaryTable(builder.build(), indices);
}
finally
{
it.close();
}
};
TemporaryTable temporaryTable;
if (cacheTtl != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private ExecutionContext(ExecutionContext source)
this.variables = source.variables;
this.statementContext = new StatementContext(source.statementContext);
this.vectorFactory = source.vectorFactory;
this.expressionFactory = new ExpressionFactory();
this.expressionFactory = source.expressionFactory;
}

/** Return session */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public CatalogRegistry getCatalogRegistry()
}

/** Return variables map */
Map<String, ValueVector> getVariables()
public Map<String, ValueVector> getVariables()
{
return variables;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,14 @@ static class IndexTupleVector implements TupleVector
{
final TupleVector vector;
final ValueVector selection;
private final ValueVector[] columns;

IndexTupleVector(TupleVector vector, IntList rows)
{
this.vector = vector;
this.selection = VectorUtils.convertToSelectionVector(rows);
this.columns = new ValueVector[vector.getSchema()
.getSize()];
}

@Override
Expand All @@ -254,7 +257,13 @@ public Schema getSchema()
@Override
public ValueVector getColumn(int column)
{
return SelectedValueVector.select(vector.getColumn(column), selection);
ValueVector col = columns[column];
if (col == null)
{
col = SelectedValueVector.select(vector.getColumn(column), selection);
columns[column] = col;
}
return col;
}
}

Expand Down
Loading
Loading