@@ -76,22 +76,17 @@ protected SparkUtils.CatalogAndIdentifier toCatalogAndIdentifier(
 
     protected <T> T modifyPaimonTable(
             Identifier ident, Function<org.apache.paimon.table.Table, T> func) {
-        return execute(ident, true, func);
-    }
-
-    private <T> T execute(
-            Identifier ident,
-            boolean refreshSparkCache,
-            Function<org.apache.paimon.table.Table, T> func) {
         SparkTable sparkTable = loadSparkTable(ident);
         org.apache.paimon.table.Table table = sparkTable.getTable();
-
         T result = func.apply(table);
-
-        if (refreshSparkCache) {
-            refreshSparkCache(ident, sparkTable);
-        }
-
+        refreshSparkCache(ident, sparkTable);
         return result;
     }
 
+    protected <T> T modifySparkTable(Identifier ident, Function<SparkTable, T> func) {
+        SparkTable sparkTable = loadSparkTable(ident);
+        T result = func.apply(sparkTable);
+        refreshSparkCache(ident, sparkTable);
+        return result;
+    }
+
@@ -114,6 +109,10 @@ protected DataSourceV2Relation createRelation(Identifier ident) {
                 loadSparkTable(ident), Option.apply(tableCatalog), Option.apply(ident));
     }
 
+    protected DataSourceV2Relation createRelation(Identifier ident, Table table) {
+        return DataSourceV2Relation.create(table, Option.apply(tableCatalog), Option.apply(ident));
+    }
+
     protected void refreshSparkCache(Identifier ident, Table table) {
         DataSourceV2Relation relation =
                 DataSourceV2Relation.create(table, Option.apply(tableCatalog), Option.apply(ident));
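
Taken together, these two hunks change the procedure-side contract: the callback now receives the already-loaded SparkTable, and the new createRelation(Identifier, Table) overload builds the DataSourceV2Relation from that table instead of calling loadSparkTable(ident) a second time. Below is a minimal sketch of a procedure written against the new helpers. DemoProcedure, its constructor signature, and the simplified call method are hypothetical; modifySparkTable, createRelation, and newInternalRow are taken from the diff.

import org.apache.paimon.spark.SparkTable;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;

// Hypothetical procedure, for illustration only.
public class DemoProcedure extends BaseProcedure {

    protected DemoProcedure(TableCatalog tableCatalog) {
        super(tableCatalog); // assumes BaseProcedure(TableCatalog); adjust to the real constructor
    }

    public InternalRow[] call(Identifier tableIdent) {
        return modifySparkTable(
                tableIdent,
                sparkTable -> {
                    // The Paimon table is still reachable when the callback needs it.
                    org.apache.paimon.table.Table t = sparkTable.getTable();

                    // Builds the relation from the SparkTable already in hand,
                    // instead of createRelation(tableIdent) loading it again.
                    DataSourceV2Relation relation = createRelation(tableIdent, sparkTable);

                    // ... plan and run work against `relation` and `t` ...
                    return new InternalRow[] {newInternalRow(true)};
                });
    }
}

Like modifyPaimonTable, modifySparkTable refreshes the Spark cache with the same SparkTable instance once the callback returns, so callers get the cache invalidation for free.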
@@ -181,9 +181,10 @@ public InternalRow[] call(InternalRow args) {
                 partitions == null || where == null,
                 "partitions and where cannot be used together.");
         String finalWhere = partitions != null ? SparkProcedureUtils.toWhere(partitions) : where;
-        return modifyPaimonTable(
+        return modifySparkTable(
                 tableIdent,
-                t -> {
+                sparkTable -> {
+                    org.apache.paimon.table.Table t = sparkTable.getTable();
                     checkArgument(t instanceof FileStoreTable);
                     FileStoreTable table = (FileStoreTable) t;
                     CoreOptions coreOptions = table.coreOptions();
@@ -195,7 +196,7 @@ public InternalRow[] call(InternalRow args) {
                             "order_by should not contain partition cols, because it is meaningless, your order_by cols are %s, and partition cols are %s",
                             sortColumns,
                             table.partitionKeys());
-                    DataSourceV2Relation relation = createRelation(tableIdent);
+                    DataSourceV2Relation relation = createRelation(tableIdent, sparkTable);
                     PartitionPredicate partitionPredicate =
                             SparkProcedureUtils.convertToPartitionPredicate(
                                     finalWhere,
@@ -129,10 +129,11 @@ public InternalRow[] call(InternalRow args) {
 
         LOG.info("Starting to build index for table " + tableIdent + " WHERE: " + finalWhere);
 
-        return modifyPaimonTable(
+        return modifySparkTable(
                 tableIdent,
-                t -> {
+                sparkTable -> {
                     try {
+                        org.apache.paimon.table.Table t = sparkTable.getTable();
                         checkArgument(
                                 t instanceof FileStoreTable,
                                 "Only FileStoreTable supports global index creation.");
@@ -148,7 +149,7 @@ public InternalRow[] call(InternalRow args) {
                                 "Column '%s' does not exist in table '%s'.",
                                 column,
                                 tableIdent);
-                        DataSourceV2Relation relation = createRelation(tableIdent);
+                        DataSourceV2Relation relation = createRelation(tableIdent, sparkTable);
                         PartitionPredicate partitionPredicate =
                                 SparkProcedureUtils.convertToPartitionPredicate(
                                         finalWhere,
@@ -106,9 +106,10 @@ public InternalRow[] call(InternalRow args) {
                 "partitions and where cannot be used together.");
         String finalWhere = partitions != null ? SparkProcedureUtils.toWhere(partitions) : where;
 
-        return modifyPaimonTable(
+        return modifySparkTable(
                 tableIdent,
-                table -> {
+                sparkTable -> {
+                    org.apache.paimon.table.Table table = sparkTable.getTable();
                     checkArgument(table instanceof FileStoreTable);
                     FileStoreTable fileStoreTable = (FileStoreTable) table;
 
@@ -130,7 +131,7 @@ public InternalRow[] call(InternalRow args) {
                             String.valueOf(snapshot.id()));
                     fileStoreTable = fileStoreTable.copy(dynamicOptions);
 
-                    DataSourceV2Relation relation = createRelation(tableIdent);
+                    DataSourceV2Relation relation = createRelation(tableIdent, sparkTable);
                     PartitionPredicate partitionPredicate =
                             SparkProcedureUtils.convertToPartitionPredicate(
                                     finalWhere,
@@ -144,7 +145,7 @@ public InternalRow[] call(InternalRow args) {
                                 "When rescaling postpone bucket tables, you must provide the resulting bucket number.");
                     }
 
-                    execute(fileStoreTable, bucketNum, partitionPredicate, tableIdent);
+                    execute(fileStoreTable, bucketNum, partitionPredicate, relation);
 
                     InternalRow internalRow = newInternalRow(true);
                     return new InternalRow[] {internalRow};
@@ -155,9 +156,7 @@ private void execute(
             FileStoreTable table,
             @Nullable Integer bucketNum,
             PartitionPredicate partitionPredicate,
-            Identifier tableIdent) {
-        DataSourceV2Relation relation = createRelation(tableIdent);
-
+            DataSourceV2Relation relation) {
         SnapshotReader snapshotReader = table.newSnapshotReader();
         if (partitionPredicate != null) {
             snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate);
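
The last hunk completes the thread-through: the private execute helper now accepts the DataSourceV2Relation built inside the modifySparkTable callback, where it previously rebuilt one via createRelation(tableIdent) at the cost of a second table load. A condensed, hypothetical pairing of the two sides (same imports and BaseProcedure context as the sketch above; the real code spreads this across the hunks):

// Inside the rescale callback: one load, one relation.
DataSourceV2Relation relation = createRelation(tableIdent, sparkTable);
// The helper consumes the relation instead of recreating it from the identifier.
execute(fileStoreTable, bucketNum, partitionPredicate, relation);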
@@ -96,11 +96,12 @@ public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
         String where = args.isNullAt(1) ? null : args.getString(1);
 
-        return modifyPaimonTable(
+        return modifySparkTable(
                 tableIdent,
-                t -> {
+                sparkTable -> {
+                    org.apache.paimon.table.Table t = sparkTable.getTable();
                     FileStoreTable table = (FileStoreTable) t;
-                    DataSourceV2Relation relation = createRelation(tableIdent);
+                    DataSourceV2Relation relation = createRelation(tableIdent, sparkTable);
                     PartitionPredicate partitionPredicate =
                             SparkProcedureUtils.convertToPartitionPredicate(
                                     where,