From 26013dad2113272eb4b3722e13838345318bb6f7 Mon Sep 17 00:00:00 2001
From: zhoubin11
Date: Thu, 4 Dec 2025 15:10:03 +0800
Subject: [PATCH] [spark] Reduce redundant loadTable

---
 .../paimon/spark/procedure/BaseProcedure.java | 23 +++++++++----------
 .../spark/procedure/CompactProcedure.java     |  7 +++---
 .../procedure/CreateGlobalIndexProcedure.java |  7 +++---
 .../spark/procedure/RescaleProcedure.java     | 13 +++++------
 .../procedure/RewriteFileIndexProcedure.java  |  7 +++---
 5 files changed, 29 insertions(+), 28 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
index 2bf38c9fc7c4..c61f0c1fa5e9 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
@@ -76,22 +76,17 @@ protected SparkUtils.CatalogAndIdentifier toCatalogAndIdentifier(
 
     protected <T> T modifyPaimonTable(
             Identifier ident, Function<org.apache.paimon.table.Table, T> func) {
-        return execute(ident, true, func);
-    }
-
-    private <T> T execute(
-            Identifier ident,
-            boolean refreshSparkCache,
-            Function<org.apache.paimon.table.Table, T> func) {
         SparkTable sparkTable = loadSparkTable(ident);
         org.apache.paimon.table.Table table = sparkTable.getTable();
-
         T result = func.apply(table);
+        refreshSparkCache(ident, sparkTable);
+        return result;
+    }
 
-        if (refreshSparkCache) {
-            refreshSparkCache(ident, sparkTable);
-        }
-
+    protected <T> T modifySparkTable(Identifier ident, Function<SparkTable, T> func) {
+        SparkTable sparkTable = loadSparkTable(ident);
+        T result = func.apply(sparkTable);
+        refreshSparkCache(ident, sparkTable);
         return result;
     }
 
@@ -114,6 +109,10 @@ protected DataSourceV2Relation createRelation(Identifier ident) {
                 loadSparkTable(ident), Option.apply(tableCatalog), Option.apply(ident));
     }
 
+    protected DataSourceV2Relation createRelation(Identifier ident, Table table) {
+        return DataSourceV2Relation.create(table, Option.apply(tableCatalog), Option.apply(ident));
+    }
+
     protected void refreshSparkCache(Identifier ident, Table table) {
         DataSourceV2Relation relation =
                 DataSourceV2Relation.create(table, Option.apply(tableCatalog), Option.apply(ident));
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 3e97c160dc04..17f2cdd4a149 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -181,9 +181,10 @@ public InternalRow[] call(InternalRow args) {
                 partitions == null || where == null,
                 "partitions and where cannot be used together.");
         String finalWhere = partitions != null ? SparkProcedureUtils.toWhere(partitions) : where;
-        return modifyPaimonTable(
+        return modifySparkTable(
                 tableIdent,
-                t -> {
+                sparkTable -> {
+                    org.apache.paimon.table.Table t = sparkTable.getTable();
                     checkArgument(t instanceof FileStoreTable);
                     FileStoreTable table = (FileStoreTable) t;
                     CoreOptions coreOptions = table.coreOptions();
@@ -195,7 +196,7 @@ public InternalRow[] call(InternalRow args) {
                             "order_by should not contain partition cols, because it is meaningless, your order_by cols are %s, and partition cols are %s",
                             sortColumns,
                             table.partitionKeys());
-                    DataSourceV2Relation relation = createRelation(tableIdent);
+                    DataSourceV2Relation relation = createRelation(tableIdent, sparkTable);
                     PartitionPredicate partitionPredicate =
                             SparkProcedureUtils.convertToPartitionPredicate(
                                     finalWhere,
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
index 965a53fcb777..f08343188a58 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
@@ -129,10 +129,11 @@ public InternalRow[] call(InternalRow args) {
 
         LOG.info("Starting to build index for table " + tableIdent + " WHERE: " + finalWhere);
 
-        return modifyPaimonTable(
+        return modifySparkTable(
                 tableIdent,
-                t -> {
+                sparkTable -> {
                     try {
+                        org.apache.paimon.table.Table t = sparkTable.getTable();
                         checkArgument(
                                 t instanceof FileStoreTable,
                                 "Only FileStoreTable supports global index creation.");
@@ -148,7 +149,7 @@ public InternalRow[] call(InternalRow args) {
                                 "Column '%s' does not exist in table '%s'.",
                                 column,
                                 tableIdent);
-                        DataSourceV2Relation relation = createRelation(tableIdent);
+                        DataSourceV2Relation relation = createRelation(tableIdent, sparkTable);
                         PartitionPredicate partitionPredicate =
                                 SparkProcedureUtils.convertToPartitionPredicate(
                                         finalWhere,
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
index 8f8a8874be81..394f3f4d0702 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
@@ -106,9 +106,10 @@ public InternalRow[] call(InternalRow args) {
                 "partitions and where cannot be used together.");
 
         String finalWhere = partitions != null ? SparkProcedureUtils.toWhere(partitions) : where;
-        return modifyPaimonTable(
+        return modifySparkTable(
                 tableIdent,
-                table -> {
+                sparkTable -> {
+                    org.apache.paimon.table.Table table = sparkTable.getTable();
                     checkArgument(table instanceof FileStoreTable);
                     FileStoreTable fileStoreTable = (FileStoreTable) table;
 
@@ -130,7 +131,7 @@ public InternalRow[] call(InternalRow args) {
                             String.valueOf(snapshot.id()));
                     fileStoreTable = fileStoreTable.copy(dynamicOptions);
 
-                    DataSourceV2Relation relation = createRelation(tableIdent);
+                    DataSourceV2Relation relation = createRelation(tableIdent, sparkTable);
                     PartitionPredicate partitionPredicate =
                             SparkProcedureUtils.convertToPartitionPredicate(
                                     finalWhere,
@@ -144,7 +145,7 @@ public InternalRow[] call(InternalRow args) {
                                 "When rescaling postpone bucket tables, you must provide the resulting bucket number.");
                     }
 
-                    execute(fileStoreTable, bucketNum, partitionPredicate, tableIdent);
+                    execute(fileStoreTable, bucketNum, partitionPredicate, relation);
 
                     InternalRow internalRow = newInternalRow(true);
                     return new InternalRow[] {internalRow};
@@ -155,9 +156,7 @@ private void execute(
             FileStoreTable table,
             @Nullable Integer bucketNum,
             PartitionPredicate partitionPredicate,
-            Identifier tableIdent) {
-        DataSourceV2Relation relation = createRelation(tableIdent);
-
+            DataSourceV2Relation relation) {
         SnapshotReader snapshotReader = table.newSnapshotReader();
         if (partitionPredicate != null) {
             snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RewriteFileIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RewriteFileIndexProcedure.java
index d45bf5796241..ea48a449fc10 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RewriteFileIndexProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RewriteFileIndexProcedure.java
@@ -96,11 +96,12 @@ public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
         String where = args.isNullAt(1) ? null : args.getString(1);
 
-        return modifyPaimonTable(
+        return modifySparkTable(
                 tableIdent,
-                t -> {
+                sparkTable -> {
+                    org.apache.paimon.table.Table t = sparkTable.getTable();
                     FileStoreTable table = (FileStoreTable) t;
-                    DataSourceV2Relation relation = createRelation(tableIdent);
+                    DataSourceV2Relation relation = createRelation(tableIdent, sparkTable);
                     PartitionPredicate partitionPredicate =
                             SparkProcedureUtils.convertToPartitionPredicate(
                                     where,