From e093e3dd476305be1c36e06baf85ec5925e96e4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 25 Dec 2025 11:38:37 +0800 Subject: [PATCH 1/9] [flink] Build in function --- .../org/apache/paimon/flink/FlinkCatalog.java | 10 ++++- .../flink/function/BuiltInFunctions.java | 34 +++++++++++++++++ .../flink/function/DescriptorToString.java | 37 +++++++++++++++++++ .../flink/function/PathToDescriptor.java | 36 ++++++++++++++++++ .../apache/paimon/flink/BlobTableITCase.java | 34 +++++++++++++++++ 5 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index cd259903f21a..7738f1bac2be 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -26,6 +26,7 @@ import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; +import org.apache.paimon.flink.function.BuiltInFunctions; import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; import org.apache.paimon.flink.utils.FlinkDescriptorProperties; @@ -1361,16 +1362,23 @@ public final void alterPartition( @Override public final List listFunctions(String dbName) throws CatalogException { + List functions = new ArrayList<>(BuiltInFunctions.FUNCTIONS.keySet()); try { - return catalog.listFunctions(dbName); + functions.addAll(catalog.listFunctions(dbName)); } catch (Catalog.DatabaseNotExistException e) { throw new CatalogException(e.getMessage(), e); } + return functions; } @Override public final CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException { + if (BuiltInFunctions.FUNCTIONS.containsKey(functionPath.getObjectName())) { + String builtInFunction = BuiltInFunctions.FUNCTIONS.get(functionPath.getObjectName()); + return new CatalogFunctionImpl(builtInFunction, FunctionLanguage.JAVA); + } + try { org.apache.paimon.function.Function function = catalog.getFunction(toIdentifier(functionPath)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java new file mode 100644 index 000000000000..a6a94faf6141 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.function; + +import java.util.HashMap; +import java.util.Map; + +/** Paimon flink built in functions. */ +public class BuiltInFunctions { + + public static final Map FUNCTIONS = + new HashMap() { + { + put("path_to_descriptor", PathToDescriptor.class.getName()); + put("descriptor_to_string", DescriptorToString.class.getName()); + } + }; +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java new file mode 100644 index 000000000000..fd622bf9efed --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.function; + +import org.apache.paimon.data.BlobDescriptor; + +import org.apache.flink.table.functions.ScalarFunction; + +/** Blob descriptor to string. */ +public class DescriptorToString extends ScalarFunction { + + public String eval(byte[] descriptorBytes) { + if (descriptorBytes == null) { + return null; + } + + BlobDescriptor descriptor = BlobDescriptor.deserialize(descriptorBytes); + + return descriptor.toString(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java new file mode 100644 index 000000000000..87d136f71790 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.function; + +import org.apache.paimon.data.BlobDescriptor; + +import org.apache.flink.table.functions.ScalarFunction; + +/** File path to blob descriptor. */ +public class PathToDescriptor extends ScalarFunction { + + public byte[] eval(String path) { + if (path == null) { + return null; + } + + BlobDescriptor descriptor = new BlobDescriptor(path, 0, Long.MAX_VALUE); + return descriptor.serialize(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java index 73c11c8f3b81..a96bb45eecd6 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java @@ -98,6 +98,40 @@ public void testWriteBlobAsDescriptor() throws Exception { .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData)); } + @Test + public void testWriteBlobWithBuiltInFunction() throws Exception { + byte[] blobData = new byte[1024 * 1024]; + RANDOM.nextBytes(blobData); + FileIO fileIO = new LocalFileIO(); + String uri = "file://" + warehouse + "/external_blob"; + try (OutputStream outputStream = + fileIO.newOutputStream(new org.apache.paimon.fs.Path(uri), true)) { + outputStream.write(blobData); + } + + BlobDescriptor blobDescriptor = new BlobDescriptor(uri, 0, blobData.length); + batchSql( + "INSERT INTO blob_table_descriptor VALUES (1, 'paimon', path_to_descriptor('" + + uri + + "'))"); + byte[] newDescriptorBytes = + (byte[]) batchSql("SELECT picture FROM blob_table_descriptor").get(0).getField(0); + BlobDescriptor newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes); + Options options = new Options(); + options.set("warehouse", warehouse.toString()); + CatalogContext catalogContext = CatalogContext.create(options); + UriReaderFactory uriReaderFactory = new UriReaderFactory(catalogContext); + Blob blob = + Blob.fromDescriptor( + uriReaderFactory.create(newBlobDescriptor.uri()), blobDescriptor); + assertThat(blob.toData()).isEqualTo(blobData); + URI blobUri = URI.create(blob.toDescriptor().uri()); + assertThat(blobUri.getScheme()).isNotNull(); + batchSql("ALTER TABLE blob_table_descriptor SET ('blob-as-descriptor'='false')"); + assertThat(batchSql("SELECT * FROM blob_table_descriptor")) + .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData)); + } + private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); public static String bytesToHex(byte[] bytes) { From 2537ee4fc8a8dd6fdd1dd3b2f3e3e672fdbd6598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 25 Dec 2025 13:30:19 +0800 Subject: [PATCH 2/9] [spark] Add built-in paimon-spark function --- .../org/apache/paimon/spark/SparkCatalog.java | 8 ++- .../spark/function/BuiltInFunctions.java | 36 ++++++++++++ .../function/PathToDescriptorFunction.java | 55 ++++++++++++++++++ .../function/PathToDescriptorUnbound.java | 56 +++++++++++++++++++ .../paimon/spark/sql/BlobTestBase.scala | 37 ++++++++++++ 5 files changed, 190 insertions(+), 2 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 5b5a18c80e31..8ea519875484 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -36,6 +36,7 @@ import org.apache.paimon.spark.catalog.SupportView; import org.apache.paimon.spark.catalog.functions.PaimonFunctions; import org.apache.paimon.spark.catalog.functions.V1FunctionConverter; +import org.apache.paimon.spark.function.BuiltInFunctions; import org.apache.paimon.spark.utils.CatalogUtils; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.iceberg.IcebergTable; @@ -536,12 +537,12 @@ public void renameTable(Identifier oldIdent, Identifier newIdent) @Override public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { + List result = new ArrayList<>(); + BuiltInFunctions.FUNCTIONS.keySet().forEach(fn -> result.add(Identifier.of(namespace, fn))); if (isSystemFunctionNamespace(namespace)) { - List result = new ArrayList<>(); PaimonFunctions.names().forEach(name -> result.add(Identifier.of(namespace, name))); return result.toArray(new Identifier[0]); } else if (isDatabaseFunctionNamespace(namespace)) { - List result = new ArrayList<>(); String databaseName = getDatabaseNameFromNamespace(namespace); try { catalog.listFunctions(databaseName) @@ -556,6 +557,9 @@ public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExce @Override public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { + if (BuiltInFunctions.FUNCTIONS.containsKey(ident.name())) { + return BuiltInFunctions.FUNCTIONS.get(ident.name()); + } String[] namespace = ident.namespace(); if (isSystemFunctionNamespace(namespace)) { UnboundFunction func = PaimonFunctions.load(ident.name()); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java new file mode 100644 index 000000000000..60b78d6b86e8 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.function; + +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; + +import java.util.HashMap; +import java.util.Map; + +/** Spark-paimon built in functions. */ +public class BuiltInFunctions { + + public static final Map FUNCTIONS = + new HashMap() { + { + put("path_to_descriptor", new PathToDescriptorUnbound()); + } + }; + +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java new file mode 100644 index 000000000000..5af814447685 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.function; + +import org.apache.paimon.data.BlobDescriptor; +import org.apache.spark.sql.connector.catalog.functions.ScalarFunction; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.unsafe.types.UTF8String; + +import java.io.Serializable; + +/** Function to convert file path to blob descriptor. */ +public class PathToDescriptorFunction implements ScalarFunction, Serializable { + + @Override + public DataType[] inputTypes() { + return new DataType[] { DataTypes.StringType }; + } + + @Override + public DataType resultType() { + return DataTypes.BinaryType; + } + + public byte[] invoke(UTF8String path) { + if (path == null) { + return null; + } + + BlobDescriptor descriptor = new BlobDescriptor(path.toString(), 0, Long.MAX_VALUE); + return descriptor.serialize(); + } + + @Override + public String name() { + return "path_to_descriptor"; + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java new file mode 100644 index 000000000000..d4c1d5f6a710 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.function; + +import org.apache.spark.sql.connector.catalog.functions.BoundFunction; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.StructType; + +/** Function unbound to {@link PathToDescriptorFunction}. */ +public class PathToDescriptorUnbound implements UnboundFunction { + + @Override + public BoundFunction bind(StructType inputType) { + if (inputType.fields().length != 1) { + throw new UnsupportedOperationException( + "Function 'byte_to_string' requires 1 argument, but found " + inputType.fields().length + ); + } + + if (!(inputType.fields()[0].dataType() instanceof StringType)) { + throw new UnsupportedOperationException( + "The first argument of 'byte_to_string' must be BINARY type, but found " + + inputType.fields()[0].dataType().simpleString() + ); + } + + return new PathToDescriptorFunction(); + } + + @Override + public String description() { + return "Convert file path to blob descriptor"; + } + + @Override + public String name() { + return "path_to_descriptor"; + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala index 854f2eac34d7..01351b708032 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala @@ -127,6 +127,43 @@ class BlobTestBase extends PaimonSparkTestBase { } } + test("Blob: test write blob descriptor with built-in function") { + withTable("t") { + val blobData = new Array[Byte](1024 * 1024) + RANDOM.nextBytes(blobData) + val fileIO = new LocalFileIO + val uri = "file://" + tempDBDir.toString + "/external_blob" + try { + val outputStream = fileIO.newOutputStream(new Path(uri), true) + try outputStream.write(blobData) + finally if (outputStream != null) outputStream.close() + } + + val blobDescriptor = new BlobDescriptor(uri, 0, blobData.length) + sql( + "CREATE TABLE IF NOT EXISTS t (\n" + "id STRING,\n" + "name STRING,\n" + "file_size STRING,\n" + "crc64 STRING,\n" + "modified_time STRING,\n" + "content BINARY\n" + ") \n" + + "PARTITIONED BY (ds STRING, batch STRING) \n" + + "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time' = '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' = 'true','blob-field' = 'content','blob-as-descriptor' = 'true')") + sql( + "INSERT OVERWRITE TABLE t\nPARTITION(ds= '1017',batch = 'test') VALUES \n('1','paimon','1024','12345678','20241017', path_to_descriptor('" + uri + "'))") + val newDescriptorBytes = + sql("SELECT content FROM t WHERE id = '1'").collect()(0).get(0).asInstanceOf[Array[Byte]] + val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes) + val options = new Options() + options.set("warehouse", tempDBDir.toString) + val catalogContext = CatalogContext.create(options) + val uriReaderFactory = new UriReaderFactory(catalogContext) + val blob = Blob.fromDescriptor(uriReaderFactory.create(newBlobDescriptor.uri), blobDescriptor) + assert(util.Arrays.equals(blobData, blob.toData)) + + sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='false')") + checkAnswer( + sql("SELECT id, name, content, _ROW_ID, _SEQUENCE_NUMBER FROM t WHERE id = 1"), + Seq(Row("1", "paimon", blobData, 0, 1)) + ) + } + } + test("Blob: test compaction") { withTable("t") { sql( From 84c880d8a1e21e49be87b0b47c50a2b3f8765aaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 25 Dec 2025 14:02:36 +0800 Subject: [PATCH 3/9] Fix test --- .../apache/paimon/spark/function/BuiltInFunctions.java | 1 - .../paimon/spark/function/PathToDescriptorFunction.java | 3 ++- .../paimon/spark/function/PathToDescriptorUnbound.java | 9 ++++----- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java index 60b78d6b86e8..4fc28759cd20 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java @@ -32,5 +32,4 @@ public class BuiltInFunctions { put("path_to_descriptor", new PathToDescriptorUnbound()); } }; - } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java index 5af814447685..ea6f306cc3a7 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java @@ -19,6 +19,7 @@ package org.apache.paimon.spark.function; import org.apache.paimon.data.BlobDescriptor; + import org.apache.spark.sql.connector.catalog.functions.ScalarFunction; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; @@ -31,7 +32,7 @@ public class PathToDescriptorFunction implements ScalarFunction, Seriali @Override public DataType[] inputTypes() { - return new DataType[] { DataTypes.StringType }; + return new DataType[] {DataTypes.StringType}; } @Override diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java index d4c1d5f6a710..43dfe11f04f4 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java @@ -30,15 +30,14 @@ public class PathToDescriptorUnbound implements UnboundFunction { public BoundFunction bind(StructType inputType) { if (inputType.fields().length != 1) { throw new UnsupportedOperationException( - "Function 'byte_to_string' requires 1 argument, but found " + inputType.fields().length - ); + "Function 'byte_to_string' requires 1 argument, but found " + + inputType.fields().length); } if (!(inputType.fields()[0].dataType() instanceof StringType)) { throw new UnsupportedOperationException( - "The first argument of 'byte_to_string' must be BINARY type, but found " + - inputType.fields()[0].dataType().simpleString() - ); + "The first argument of 'byte_to_string' must be BINARY type, but found " + + inputType.fields()[0].dataType().simpleString()); } return new PathToDescriptorFunction(); From faa73f74a3dc6db8dc08fea53b54c4414ab98d35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 25 Dec 2025 15:14:14 +0800 Subject: [PATCH 4/9] Fix minus --- .../extensions/RewritePaimonFunctionCommands.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala index ddbd9df5ac1b..046d8b954b79 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala @@ -25,6 +25,7 @@ import org.apache.paimon.spark.SparkCatalog.FUNCTION_DEFINITION_NAME import org.apache.paimon.spark.catalog.SupportV1Function import org.apache.paimon.spark.catalog.functions.PaimonFunctions import org.apache.paimon.spark.execution.{CreatePaimonV1FunctionCommand, DescribePaimonV1FunctionCommand, DropPaimonV1FunctionCommand} +import org.apache.paimon.spark.function.BuiltInFunctions import org.apache.paimon.spark.util.OptionUtils import org.apache.spark.sql.SparkSession @@ -143,7 +144,9 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, FunctionIdentifier, Boolean)] = { nameParts match { // Spark's built-in or tmp functions is without database name or catalog name. - case Seq(funName) if isSparkBuiltInFunction(FunctionIdentifier(funName)) => + case Seq(funName) + if isSparkBuiltInFunction(FunctionIdentifier(funName)) || isPaimonBuiltInFunction( + FunctionIdentifier(funName)) => None case Seq(funName) if isSparkTmpFunc(FunctionIdentifier(funName)) => Some(null, FunctionIdentifier(funName), true) @@ -175,6 +178,10 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) catalogManager.v1SessionCatalog.isBuiltinFunction(funcIdent) } + private def isPaimonBuiltInFunction(funcIdent: FunctionIdentifier): Boolean = { + BuiltInFunctions.FUNCTIONS.containsKey(funcIdent.funcName) + } + private def isSparkTmpFunc(funcIdent: FunctionIdentifier): Boolean = { catalogManager.v1SessionCatalog.isTemporaryFunction(funcIdent) } From 09d344ff6501be25ba6dd9d938cba778d091cb18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 25 Dec 2025 15:29:57 +0800 Subject: [PATCH 5/9] Fix minus --- .../parser/extensions/RewritePaimonFunctionCommands.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala index 046d8b954b79..fd1ddd8f2eec 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala @@ -144,9 +144,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, FunctionIdentifier, Boolean)] = { nameParts match { // Spark's built-in or tmp functions is without database name or catalog name. - case Seq(funName) - if isSparkBuiltInFunction(FunctionIdentifier(funName)) || isPaimonBuiltInFunction( - FunctionIdentifier(funName)) => + case Seq(funName) if isSparkBuiltInFunction(FunctionIdentifier(funName)) => None case Seq(funName) if isSparkTmpFunc(FunctionIdentifier(funName)) => Some(null, FunctionIdentifier(funName), true) @@ -170,7 +168,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) case Some(db) if db == SYSTEM_DATABASE_NAME && PaimonFunctions.names.contains(funcIdent.funcName) => true - case _ => false + case _ => BuiltInFunctions.FUNCTIONS.containsKey(funcIdent.funcName) } } From 0f8b1c536852fb2ab39bbcfeefd9f27d0d90f36a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 25 Dec 2025 17:45:19 +0800 Subject: [PATCH 6/9] Fix minus --- .../spark/function/BuiltInFunctions.java | 1 + .../function/DescriptorToStringFunction.java | 56 +++++++++++++++++++ .../function/DescriptorToStringUnbound.java | 55 ++++++++++++++++++ .../function/PathToDescriptorUnbound.java | 4 +- 4 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java index 4fc28759cd20..ad19eb952aa6 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java @@ -30,6 +30,7 @@ public class BuiltInFunctions { new HashMap() { { put("path_to_descriptor", new PathToDescriptorUnbound()); + put("descriptor_to_string", new DescriptorToStringUnbound()); } }; } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java new file mode 100644 index 000000000000..482786f12eea --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.function; + +import org.apache.paimon.data.BlobDescriptor; + +import org.apache.spark.sql.connector.catalog.functions.ScalarFunction; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.unsafe.types.UTF8String; + +import java.io.Serializable; + +/** Function to convert blob descriptor to its string representation. */ +public class DescriptorToStringFunction implements ScalarFunction, Serializable { + + @Override + public DataType[] inputTypes() { + return new DataType[] {DataTypes.BinaryType}; + } + + @Override + public DataType resultType() { + return DataTypes.StringType; + } + + public UTF8String invoke(byte[] descriptorBytes) { + if (descriptorBytes == null) { + return null; + } + + BlobDescriptor descriptor = BlobDescriptor.deserialize(descriptorBytes); + return UTF8String.fromString(descriptor.toString()); + } + + @Override + public String name() { + return "descriptor_to_string"; + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java new file mode 100644 index 000000000000..3fd02fe8c874 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.function; + +import org.apache.spark.sql.connector.catalog.functions.BoundFunction; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.StructType; + +/** Function unbound to {@link PathToDescriptorFunction}. */ +public class DescriptorToStringUnbound implements UnboundFunction { + + @Override + public BoundFunction bind(StructType inputType) { + if (inputType.fields().length != 1) { + throw new UnsupportedOperationException( + "Function 'descriptor_to_string' requires 1 argument, but found " + + inputType.fields().length); + } + + if (!(inputType.fields()[0].dataType() instanceof BinaryType)) { + throw new UnsupportedOperationException( + "The first argument of 'descriptor_to_string' must be BINARY type, but found " + + inputType.fields()[0].dataType().simpleString()); + } + + return new PathToDescriptorFunction(); + } + + @Override + public String description() { + return "Convert file path to blob descriptor"; + } + + @Override + public String name() { + return "path_to_descriptor"; + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java index 43dfe11f04f4..23b7718847db 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java @@ -30,13 +30,13 @@ public class PathToDescriptorUnbound implements UnboundFunction { public BoundFunction bind(StructType inputType) { if (inputType.fields().length != 1) { throw new UnsupportedOperationException( - "Function 'byte_to_string' requires 1 argument, but found " + "Function 'path_to_descriptor' requires 1 argument, but found " + inputType.fields().length); } if (!(inputType.fields()[0].dataType() instanceof StringType)) { throw new UnsupportedOperationException( - "The first argument of 'byte_to_string' must be BINARY type, but found " + "The first argument of 'path_to_descriptor' must be STRING type, but found " + inputType.fields()[0].dataType().simpleString()); } From f8805d7972dfb90321aec9dcd746646cb0504e60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 26 Dec 2025 17:54:43 +0800 Subject: [PATCH 7/9] Fix minus --- .../org/apache/paimon/spark/SparkCatalog.java | 8 ++--- .../spark/function/BuiltInFunctions.java | 36 ------------------- .../catalog/functions/PaimonFunctions.scala | 9 ++++- .../RewritePaimonFunctionCommands.scala | 7 +--- .../paimon/spark/sql/BlobTestBase.scala | 2 +- 5 files changed, 12 insertions(+), 50 deletions(-) delete mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 8ea519875484..5b5a18c80e31 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -36,7 +36,6 @@ import org.apache.paimon.spark.catalog.SupportView; import org.apache.paimon.spark.catalog.functions.PaimonFunctions; import org.apache.paimon.spark.catalog.functions.V1FunctionConverter; -import org.apache.paimon.spark.function.BuiltInFunctions; import org.apache.paimon.spark.utils.CatalogUtils; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.iceberg.IcebergTable; @@ -537,12 +536,12 @@ public void renameTable(Identifier oldIdent, Identifier newIdent) @Override public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException { - List result = new ArrayList<>(); - BuiltInFunctions.FUNCTIONS.keySet().forEach(fn -> result.add(Identifier.of(namespace, fn))); if (isSystemFunctionNamespace(namespace)) { + List result = new ArrayList<>(); PaimonFunctions.names().forEach(name -> result.add(Identifier.of(namespace, name))); return result.toArray(new Identifier[0]); } else if (isDatabaseFunctionNamespace(namespace)) { + List result = new ArrayList<>(); String databaseName = getDatabaseNameFromNamespace(namespace); try { catalog.listFunctions(databaseName) @@ -557,9 +556,6 @@ public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExce @Override public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { - if (BuiltInFunctions.FUNCTIONS.containsKey(ident.name())) { - return BuiltInFunctions.FUNCTIONS.get(ident.name()); - } String[] namespace = ident.namespace(); if (isSystemFunctionNamespace(namespace)) { UnboundFunction func = PaimonFunctions.load(ident.name()); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java deleted file mode 100644 index ad19eb952aa6..000000000000 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/BuiltInFunctions.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.spark.function; - -import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; - -import java.util.HashMap; -import java.util.Map; - -/** Spark-paimon built in functions. */ -public class BuiltInFunctions { - - public static final Map FUNCTIONS = - new HashMap() { - { - put("path_to_descriptor", new PathToDescriptorUnbound()); - put("descriptor_to_string", new DescriptorToStringUnbound()); - } - }; -} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala index 2876f7e7f1fd..a0d97c68d48b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala @@ -25,6 +25,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap, import org.apache.paimon.spark.SparkInternalRowWrapper import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType import org.apache.paimon.spark.catalog.functions.PaimonFunctions._ +import org.apache.paimon.spark.function.{DescriptorToStringFunction, DescriptorToStringUnbound, PathToDescriptorFunction, PathToDescriptorUnbound} import org.apache.paimon.table.{BucketMode, FileStoreTable} import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, LocalZonedTimestampType, MapType, RowType, TimestampType} import org.apache.paimon.utils.ProjectedRow @@ -43,6 +44,8 @@ object PaimonFunctions { val PAIMON_BUCKET: String = "bucket" val MOD_BUCKET: String = "mod_bucket" val MAX_PT: String = "max_pt" + val PATH_TO_DESCRIPTOR: String = "path_to_descriptor" + val DESCRIPTOR_TO_STRING: String = "descriptor_to_string" private val FUNCTIONS = ImmutableMap.of( PAIMON_BUCKET, @@ -50,7 +53,11 @@ object PaimonFunctions { MOD_BUCKET, new BucketFunction(MOD_BUCKET, BucketFunctionType.MOD), MAX_PT, - new MaxPtFunction + new MaxPtFunction, + PATH_TO_DESCRIPTOR, + new PathToDescriptorUnbound, + DESCRIPTOR_TO_STRING, + new DescriptorToStringUnbound ) /** The bucket function type to the function name mapping */ diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala index fd1ddd8f2eec..ddbd9df5ac1b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala @@ -25,7 +25,6 @@ import org.apache.paimon.spark.SparkCatalog.FUNCTION_DEFINITION_NAME import org.apache.paimon.spark.catalog.SupportV1Function import org.apache.paimon.spark.catalog.functions.PaimonFunctions import org.apache.paimon.spark.execution.{CreatePaimonV1FunctionCommand, DescribePaimonV1FunctionCommand, DropPaimonV1FunctionCommand} -import org.apache.paimon.spark.function.BuiltInFunctions import org.apache.paimon.spark.util.OptionUtils import org.apache.spark.sql.SparkSession @@ -168,7 +167,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) case Some(db) if db == SYSTEM_DATABASE_NAME && PaimonFunctions.names.contains(funcIdent.funcName) => true - case _ => BuiltInFunctions.FUNCTIONS.containsKey(funcIdent.funcName) + case _ => false } } @@ -176,10 +175,6 @@ case class RewritePaimonFunctionCommands(spark: SparkSession) catalogManager.v1SessionCatalog.isBuiltinFunction(funcIdent) } - private def isPaimonBuiltInFunction(funcIdent: FunctionIdentifier): Boolean = { - BuiltInFunctions.FUNCTIONS.containsKey(funcIdent.funcName) - } - private def isSparkTmpFunc(funcIdent: FunctionIdentifier): Boolean = { catalogManager.v1SessionCatalog.isTemporaryFunction(funcIdent) } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala index 01351b708032..e05c9ce644c1 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala @@ -145,7 +145,7 @@ class BlobTestBase extends PaimonSparkTestBase { "PARTITIONED BY (ds STRING, batch STRING) \n" + "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time' = '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' = 'true','blob-field' = 'content','blob-as-descriptor' = 'true')") sql( - "INSERT OVERWRITE TABLE t\nPARTITION(ds= '1017',batch = 'test') VALUES \n('1','paimon','1024','12345678','20241017', path_to_descriptor('" + uri + "'))") + "INSERT OVERWRITE TABLE t\nPARTITION(ds= '1017',batch = 'test') VALUES \n('1','paimon','1024','12345678','20241017', sys.path_to_descriptor('" + uri + "'))") val newDescriptorBytes = sql("SELECT content FROM t WHERE id = '1'").collect()(0).get(0).asInstanceOf[Array[Byte]] val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes) From 20c682c5a029941758968f18460a59e3092b8e34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 26 Dec 2025 18:13:16 +0800 Subject: [PATCH 8/9] Fix minus --- .../org/apache/paimon/flink/FlinkCatalog.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 7738f1bac2be..f7fb5e7b61c9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -148,6 +148,7 @@ import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP; import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP; import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP; +import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP; import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB; import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER; @@ -1362,7 +1363,10 @@ public final void alterPartition( @Override public final List listFunctions(String dbName) throws CatalogException { - List functions = new ArrayList<>(BuiltInFunctions.FUNCTIONS.keySet()); + List functions = new ArrayList<>(); + if (isSystemNamespace(dbName)) { + functions.addAll(BuiltInFunctions.FUNCTIONS.keySet()); + } try { functions.addAll(catalog.listFunctions(dbName)); } catch (Catalog.DatabaseNotExistException e) { @@ -1374,9 +1378,12 @@ public final List listFunctions(String dbName) throws CatalogException { @Override public final CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException { - if (BuiltInFunctions.FUNCTIONS.containsKey(functionPath.getObjectName())) { - String builtInFunction = BuiltInFunctions.FUNCTIONS.get(functionPath.getObjectName()); - return new CatalogFunctionImpl(builtInFunction, FunctionLanguage.JAVA); + if (isSystemNamespace(functionPath.getDatabaseName())) { + if (BuiltInFunctions.FUNCTIONS.containsKey(functionPath.getObjectName())) { + String builtInFunction = + BuiltInFunctions.FUNCTIONS.get(functionPath.getObjectName()); + return new CatalogFunctionImpl(builtInFunction, FunctionLanguage.JAVA); + } } try { @@ -1601,6 +1608,10 @@ public Procedure getProcedure(ObjectPath procedurePath) .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath)); } + private static boolean isSystemNamespace(String namespace) { + return namespace.equalsIgnoreCase(SYSTEM_DATABASE_NAME); + } + private boolean isCalledFromFlinkRecomputeStatisticsProgram() { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); for (StackTraceElement stackTraceElement : stackTrace) { From 57db6b362b9f05bd96fe58b4e5cbf3ebe3a9e925 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 26 Dec 2025 18:19:49 +0800 Subject: [PATCH 9/9] Fix minus --- docs/content/spark/sql-functions.md | 42 +++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/docs/content/spark/sql-functions.md b/docs/content/spark/sql-functions.md index d17afc92ab13..bf04badf7515 100644 --- a/docs/content/spark/sql-functions.md +++ b/docs/content/spark/sql-functions.md @@ -53,6 +53,48 @@ SELECT * FROM t where pt = sys.max_pt('t'); -- a, 20250101 ``` +### path_to_descriptor + +`sys.path_to_descriptor($file_path)` + +Converts a file path (STRING) to a blob descriptor (BINARY). This function is useful when working with blob data stored in external files. It creates a blob descriptor that references the file at the specified path. + +**Arguments:** +- `file_path` (STRING): The path to the external file containing the blob data. + +**Returns:** +- A BINARY value representing the serialized blob descriptor. + +**Example** + +```sql +-- Insert blob data using path_to_descriptor function +INSERT INTO t VALUES ('1', 'paimon', sys.path_to_descriptor('file:///path/to/blob_file')); + +-- Insert with partition +INSERT OVERWRITE TABLE t PARTITION(ds='1017', batch='test') +VALUES ('1', 'paimon', '1024', '12345678', '20241017', sys.path_to_descriptor('file:///path/to/blob_file')); +``` + +### descriptor_to_string + +`sys.descriptor_to_string($descriptor)` + +Converts a blob descriptor (BINARY) to its string representation (STRING). This function is useful for debugging or displaying the contents of a blob descriptor in a human-readable format. + +**Arguments:** +- `descriptor` (BINARY): The blob descriptor bytes to convert. + +**Returns:** +- A STRING representation of the blob descriptor. + +**Example** + +```sql +-- Convert a blob descriptor to string for inspection +SELECT sys.descriptor_to_string(content) FROM t WHERE id = '1'; +``` + ## User-defined Function Paimon Spark supports two types of user-defined functions: lambda functions and file-based functions.