Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class DatasourceData
private final List<? extends ISortItem> sortItems;
private final Projection projection;
private final List<Option> options;
/** Top count hint from a TOP/LIMIT operator. -1 means no limit. Catalogs may use this to append TOP/LIMIT to their generated SQL. */
private int topCount = -1;

public DatasourceData(int nodeId, List<IPredicate> predicates, List<? extends ISortItem> sortItems, Projection projection, List<Option> options)
{
Expand Down Expand Up @@ -84,6 +86,22 @@ public List<Option> getOptions()
return options;
}

/**
* Return the top count hint pushed down from a TOP/LIMIT operator. Returns -1 if no limit is set. Catalogs that support row limiting (e.g. JDBC) may append TOP/LIMIT to their generated SQL for
* efficiency. The physical Limit operator remains in the plan and provides correctness; this is purely a performance hint.
*/
public int getTopCount()
{
return topCount;
}

/** Set the top count hint. Called by the planner when a TOP/LIMIT operator is pushed down. */
public DatasourceData withTopCount(int n)
{
this.topCount = n;
return this;
}

/**
* Convenience method for extracting a equals predicate for specific column.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private IDatasource getDataSource(IQuerySession session, String catalogAlias, Qu
.findAny()
.orElse(null);

return new JdbcDatasource(this, catalogAlias, table, seekPredicate, projection, predicates, sortItems, tableHintsOption, data.getOptions());
return new JdbcDatasource(this, catalogAlias, table, seekPredicate, projection, predicates, sortItems, tableHintsOption, data.getOptions(), data.getTopCount());
}

private Projection getOptionProjection(List<Option> options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
class JdbcDatasource implements IDatasource
{
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcDatasource.class);
static final String QUERY = "Query";
private final JdbcCatalog catalog;
private final String catalogAlias;
private final QualifiedName table;
Expand All @@ -58,9 +59,10 @@ class JdbcDatasource implements IDatasource
private final IExpression tableHintsOption;
private final List<Option> options;
private final Map<String, ColumnOption> columnOptions;
private final int topCount;

JdbcDatasource(JdbcCatalog catalog, String catalogAlias, QualifiedName table, ISeekPredicate indexPredicate, Projection projection, List<IPredicate> predicates, List<ISortItem> sortItems,
IExpression tableHintsOption, List<Option> options)
IExpression tableHintsOption, List<Option> options, int topCount)
{
this.catalog = catalog;
this.catalogAlias = catalogAlias;
Expand All @@ -72,6 +74,7 @@ class JdbcDatasource implements IDatasource
this.tableHintsOption = tableHintsOption;
this.options = options;
this.columnOptions = ColumnOption.extract(options);
this.topCount = topCount;
}

@Override
Expand All @@ -94,7 +97,7 @@ public Map<String, Object> getDescribeProperties(IExecutionContext context)
.collect(joining(",")));
}

result.put("Query", buildSql(dialect, context, true));
result.put(QUERY, buildSql(dialect, context, true));

return result;
}
Expand Down Expand Up @@ -129,6 +132,15 @@ private String buildSql(SqlDialect dialect, IExecutionContext context, boolean d
.collect(joining(","));
};
StringBuilder sb = new StringBuilder("SELECT ");
// Dialect-specific TOP at the beginning of SELECT (e.g. SQL Server TOP(n))
if (topCount >= 0)
{
String selectTop = dialect.selectTopN(topCount);
if (!selectTop.isEmpty())
{
sb.append(selectTop);
}
}
sb.append(projectionString);
sb.append(" FROM ");
sb.append(table.toString())
Expand Down Expand Up @@ -266,6 +278,14 @@ public ValueVector getValue()
}
}

// Dialect-specific trailing TOP/LIMIT (e.g. LIMIT n for MySQL/PostgreSQL, FETCH FIRST n ROWS ONLY for ANSI)
if (topCount >= 0
&& dialect.selectTopN(topCount)
.isEmpty())
{
dialect.appendTopN(sb, topCount);
}

return sb.toString();
}

Expand Down Expand Up @@ -456,6 +476,9 @@ public void close()
.getPrintWriter());
JdbcUtils.printWarnings(connection, context.getSession()
.getPrintWriter());
// Cancel any in-progress query so the server stops executing and data transfer halts
// (important when a TOP/LIMIT operator closes the iterator before all rows are consumed)
JdbcUtils.cancelQuiet(statement);
JdbcUtils.closeQuiet(connection, statement, rs);
schemaResult = null;
vectors = null;
Expand Down Expand Up @@ -502,6 +525,10 @@ private Statement getStatement() throws SQLException
statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
}

// Enable streaming so drivers that support it (SQL Server adaptive, MySQL useCursorFetch, etc.) fetch
// rows on demand. Combined with close() → cancel() this stops data transfer as soon as caller is done.
statement.setFetchSize(batchSize);

JdbcUtils.printWarnings(statement, context.getSession()
.getPrintWriter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ class MySqlMariaDbDialect implements SqlDialect
{
private static final ZoneId UTC = ZoneId.of("UTC");

@Override
public void appendTopN(StringBuilder sb, int n)
{
sb.append(" LIMIT ")
.append(n);
}

@Override
public ColumnMeta getColumnMeta(ResultSetMetaData rsmd, int jdbcType, int ordinal) throws SQLException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ class PostgreDialect implements SqlDialect
{
private static final String TIMESTAMPTZ = "TIMESTAMPTZ";

@Override
public void appendTopN(StringBuilder sb, int n)
{
sb.append(" LIMIT ")
.append(n);
}

@Override
public String getIdentifierQuoteString(Connection connection) throws SQLException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,33 @@ default String getDropTableStatement(QualifiedName qname, boolean lenient)
: "DROP TABLE %s").formatted(qname.toDotDelimited());
}

/**
* Returns the TOP-N fragment inserted right after "SELECT " in the SELECT clause (SQL Server style). Return empty string if this dialect uses a trailing clause instead.
* <p>
* Example for SQL Server: {@code "TOP(500) "}
* </p>
*/
default String selectTopN(int n)
{
return "";
}

/**
* Appends a trailing row-limiting clause to the query (MySQL/PostgreSQL/ANSI style). Only called when {@link #selectTopN(int)} returns an empty string.
* <p>
* Default: ANSI SQL {@code FETCH FIRST n ROWS ONLY}.
* </p>
*/
default void appendTopN(StringBuilder sb, int n)
{
sb.append(" FETCH FIRST ")
.append(n)
.append(" ROWS ONLY");
}

/**
* Appends join statement for provided seek keys.
*
*
* <pre>
* This is the SQL that we need
*
Expand All @@ -329,7 +353,7 @@ default String getDropTableStatement(QualifiedName qname, boolean lenient)
* SELECT 1 col1, 2 col2
* UNION SELECT 2, 4
* UNION SELECT 3, 6
*
*
* ) xx
* ON xx.col1 = y.col1
* AND xx.col2 = y.col2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ else if (type == Type.Any)
return SqlDialect.super.getColumnDeclaration(type, scale, precision);
}

@Override
public String selectTopN(int n)
{
return "TOP(" + n + ") ";
}

@Override
public String getDropTableStatement(QualifiedName qname, boolean lenient)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,36 @@ void test_datasource_index_seek_multiple_columns_asterisk_with_sortitems()
assertEquals(1, rowCount);
}

@Test
void test_datasource_table_scan_with_top_count()
{
IExecutionContext context = mockExecutionContext();
// Table has 5 rows — request only 3 via topCount push-down
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, TEST_TABLE, new DatasourceData(0, emptyList(), emptyList(), Projection.ALL, emptyList()).withTopCount(3));

// Verify the generated SQL contains the dialect-specific row-limiting clause
Map<String, Object> describe = ds.getDescribeProperties(context);
String query = String.valueOf(describe.get(JdbcDatasource.QUERY));
assertTrue(query.toLowerCase()
.contains("top")
|| query.toLowerCase()
.contains("limit")
|| query.toLowerCase()
.contains("fetch first"),
"Expected TOP/LIMIT/FETCH FIRST in query: " + query);

TupleIterator it = ds.execute(context);
int rowCount = 0;
while (it.hasNext())
{
rowCount += it.next()
.getRowCount();
}
it.close();

assertEquals(3, rowCount);
}

protected IExecutionContext mockExecutionContext()
{
return mockExecutionContext(null);
Expand Down
45 changes: 37 additions & 8 deletions payloadbuilder-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
<plugin>
<groupId>se.bjurr.gitchangelog</groupId>
<artifactId>git-changelog-maven-plugin</artifactId>
<version>1.92</version>
<version>2.2.11</version>
<executions>
<execution>
<id>GenerateChangelog</id>
Expand All @@ -145,25 +145,54 @@
{{#ifReleaseTag .}}
## [{{name}}](https://github.com/kuseman/payloadbuilder/releases/tag/{{name}}) ({{tagDate .}})

{{#ifContainsType commits type='feat'}}
{{#ifContainsBreaking commits}}
### Breaking Changes
{{#commits}}
{{#ifCommitBreaking .}}
- {{#eachCommitScope .}} **{{.}}** {{/eachCommitScope}} {{{commitDescription .}}} ([{{hash}}](https://github.com/kuseman/payloadbuilder/commit/{{hashFull}}))
{{/ifCommitBreaking}}
{{/commits}}
{{/ifContainsBreaking}}
{{#ifContainsType commits type='feat'}}
### Features

{{#commits}}
{{#ifCommitType . type='feat'}}
- {{#eachCommitScope .}} **{{.}}** {{/eachCommitScope}} {{{commitDescription .}}} ([{{hash}}](https://github.com/kuseman/payloadbuilder/commit/{{hashFull}}))
{{/ifCommitType}}
{{/commits}}
{{/ifContainsType}}

{{#ifContainsType commits type='fix'}}
{{/ifContainsType}}
{{#ifContainsType commits type='fix'}}
### Bug Fixes

{{#commits}}
{{#ifCommitType . type='fix'}}
- {{#eachCommitScope .}} **{{.}}** {{/eachCommitScope}} {{{commitDescription .}}} ([{{hash}}](https://github.com/kuseman/payloadbuilder/commit/{{hashFull}}))
{{/ifCommitType}}
{{/commits}}
{{/ifContainsType}}
{{/ifContainsType}}
{{#ifContainsType commits type='perf'}}
### Performance Improvements
{{#commits}}
{{#ifCommitType . type='perf'}}
- {{#eachCommitScope .}} **{{.}}** {{/eachCommitScope}} {{{commitDescription .}}} ([{{hash}}](https://github.com/kuseman/payloadbuilder/commit/{{hashFull}}))
{{/ifCommitType}}
{{/commits}}
{{/ifContainsType}}
{{#ifContainsType commits type='refactor'}}
### Refactoring
{{#commits}}
{{#ifCommitType . type='refactor'}}
- {{#eachCommitScope .}} **{{.}}** {{/eachCommitScope}} {{{commitDescription .}}} ([{{hash}}](https://github.com/kuseman/payloadbuilder/commit/{{hashFull}}))
{{/ifCommitType}}
{{/commits}}
{{/ifContainsType}}
{{#ifContainsType commits type='docs'}}
### Documentation
{{#commits}}
{{#ifCommitType . type='docs'}}
- {{#eachCommitScope .}} **{{.}}** {{/eachCommitScope}} {{{commitDescription .}}} ([{{hash}}](https://github.com/kuseman/payloadbuilder/commit/{{hashFull}}))
{{/ifCommitType}}
{{/commits}}
{{/ifContainsType}}

{{/ifReleaseTag}}
{{/tags}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@
*/
public class CompiledQuery
{
private final String name;
private final QueryStatement query;
private final List<Warning> warnings;

CompiledQuery(QueryStatement query, List<Warning> warnings)
CompiledQuery(QueryStatement query, List<Warning> warnings, String name)
{
this.name = name;
this.query = requireNonNull(query);
this.warnings = warnings;
}

public String getName()
{
return name;
}

/** Execute this query with provided session */
public QueryResult execute(QuerySession session)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,19 @@ public static CompiledQuery compile(CatalogRegistry registry, String query)
* </pre>
*/
public static CompiledQuery compile(QuerySession session, String query)
{
return compile(session, query, null);
}

/**
* Compile query with a stable name. The name is embedded in coverage reports, allowing test code to register a source file for it via
* {@link se.kuseman.payloadbuilder.core.execution.QueryCoverageRegistry#registerQuerySource(String, String)}.
*/
public static CompiledQuery compile(QuerySession session, String query, String queryName)
{
List<CompiledQuery.Warning> warnings = new ArrayList<>();
QueryStatement queryStatement = PARSER.parseQuery(query, warnings);
queryStatement = StatementPlanner.plan(session, queryStatement);
return new CompiledQuery(queryStatement, warnings);
queryStatement = StatementPlanner.plan(session, queryStatement, queryName);
return new CompiledQuery(queryStatement, warnings, queryName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public static void registerCacheMBean(CacheType type, String providerName, Quali
: "",
name));
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
if (mbs.isRegistered(objectName))
{
mbs.unregisterMBean(objectName);
}
mbs.registerMBean(new CacheMBeanImpl(cache), objectName);
}
catch (Exception e)
Expand Down
Loading
Loading