diff --git a/docs/content/spark/sql-functions.md b/docs/content/spark/sql-functions.md
index d17afc92ab13..bf04badf7515 100644
--- a/docs/content/spark/sql-functions.md
+++ b/docs/content/spark/sql-functions.md
@@ -53,6 +53,48 @@ SELECT * FROM t where pt = sys.max_pt('t');
 -- a, 20250101
 ```
 
+### path_to_descriptor
+
+`sys.path_to_descriptor($file_path)`
+
+Converts a file path (STRING) to a blob descriptor (BINARY). This function is useful when working with blob data stored in external files: it creates a blob descriptor that references the file at the specified path.
+
+**Arguments:**
+- `file_path` (STRING): The path to the external file containing the blob data.
+
+**Returns:**
+- A BINARY value representing the serialized blob descriptor.
+
+**Examples**
+
+```sql
+-- Insert blob data using the path_to_descriptor function
+INSERT INTO t VALUES ('1', 'paimon', sys.path_to_descriptor('file:///path/to/blob_file'));
+
+-- Insert into a static partition
+INSERT OVERWRITE TABLE t PARTITION(ds='1017', batch='test')
+VALUES ('1', 'paimon', '1024', '12345678', '20241017', sys.path_to_descriptor('file:///path/to/blob_file'));
+```
+
+### descriptor_to_string
+
+`sys.descriptor_to_string($descriptor)`
+
+Converts a blob descriptor (BINARY) to its string representation (STRING). This function is useful for debugging or for displaying the contents of a blob descriptor in a human-readable format.
+
+**Arguments:**
+- `descriptor` (BINARY): The blob descriptor bytes to convert.
+
+**Returns:**
+- A STRING representation of the blob descriptor.
+
+**Example**
+
+```sql
+-- Convert a blob descriptor to a string for inspection
+SELECT sys.descriptor_to_string(content) FROM t WHERE id = '1';
+```
+
 ## User-defined Function
 
 Paimon Spark supports two types of user-defined functions: lambda functions and file-based functions.
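Since `path_to_descriptor` produces exactly the BINARY value that `descriptor_to_string` accepts, the two functions compose directly. A minimal round-trip sketch (the path is a placeholder; `path_to_descriptor` only wraps it in a descriptor and does not read the file):

```sql
-- Build a descriptor from a path, then render it back in human-readable form
SELECT sys.descriptor_to_string(sys.path_to_descriptor('file:///path/to/blob_file'));
```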
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index cd259903f21a..f7fb5e7b61c9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -26,6 +26,7 @@ import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.flink.function.BuiltInFunctions;
 import org.apache.paimon.flink.procedure.ProcedureUtil;
 import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
 import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
@@ -147,6 +148,7 @@ import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
 import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
 import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
 import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
 import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
@@ -1361,16 +1363,29 @@ public final void alterPartition(
 
     @Override
     public final List<String> listFunctions(String dbName) throws CatalogException {
+        List<String> functions = new ArrayList<>();
+        if (isSystemNamespace(dbName)) {
+            functions.addAll(BuiltInFunctions.FUNCTIONS.keySet());
+        }
         try {
-            return catalog.listFunctions(dbName);
+            functions.addAll(catalog.listFunctions(dbName));
         } catch (Catalog.DatabaseNotExistException e) {
             throw new CatalogException(e.getMessage(), e);
         }
+        return functions;
     }
 
     @Override
     public final CatalogFunction getFunction(ObjectPath functionPath)
             throws FunctionNotExistException, CatalogException {
+        if (isSystemNamespace(functionPath.getDatabaseName())) {
+            if (BuiltInFunctions.FUNCTIONS.containsKey(functionPath.getObjectName())) {
+                String builtInFunction =
+                        BuiltInFunctions.FUNCTIONS.get(functionPath.getObjectName());
+                return new CatalogFunctionImpl(builtInFunction, FunctionLanguage.JAVA);
+            }
+        }
+
         try {
             org.apache.paimon.function.Function function =
                     catalog.getFunction(toIdentifier(functionPath));
@@ -1593,6 +1608,10 @@ public Procedure getProcedure(ObjectPath procedurePath)
                 .orElseThrow(() -> new ProcedureNotExistException(name, procedurePath));
     }
 
+    private static boolean isSystemNamespace(String namespace) {
+        return namespace.equalsIgnoreCase(SYSTEM_DATABASE_NAME);
+    }
+
     private boolean isCalledFromFlinkRecomputeStatisticsProgram() {
         StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
         for (StackTraceElement stackTraceElement : stackTrace) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
new file mode 100644
index 000000000000..a6a94faf6141
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/BuiltInFunctions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.function;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Paimon Flink built-in functions. */
+public class BuiltInFunctions {
+
+    public static final Map<String, String> FUNCTIONS =
+            new HashMap<String, String>() {
+                {
+                    put("path_to_descriptor", PathToDescriptor.class.getName());
+                    put("descriptor_to_string", DescriptorToString.class.getName());
+                }
+            };
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java
new file mode 100644
index 000000000000..fd622bf9efed
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/DescriptorToString.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/** Blob descriptor to string. */
+public class DescriptorToString extends ScalarFunction {
+
+    public String eval(byte[] descriptorBytes) {
+        if (descriptorBytes == null) {
+            return null;
+        }
+
+        BlobDescriptor descriptor = BlobDescriptor.deserialize(descriptorBytes);
+
+        return descriptor.toString();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java
new file mode 100644
index 000000000000..87d136f71790
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+/** File path to blob descriptor. */
+public class PathToDescriptor extends ScalarFunction {
+
+    public byte[] eval(String path) {
+        if (path == null) {
+            return null;
+        }
+
+        BlobDescriptor descriptor = new BlobDescriptor(path, 0, Long.MAX_VALUE);
+        return descriptor.serialize();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 73c11c8f3b81..a96bb45eecd6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -98,6 +98,40 @@ public void testWriteBlobAsDescriptor() throws Exception {
                 .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
     }
 
+    @Test
+    public void testWriteBlobWithBuiltInFunction() throws Exception {
+        byte[] blobData = new byte[1024 * 1024];
+        RANDOM.nextBytes(blobData);
+        FileIO fileIO = new LocalFileIO();
+        String uri = "file://" + warehouse + "/external_blob";
+        try (OutputStream outputStream =
+                fileIO.newOutputStream(new org.apache.paimon.fs.Path(uri), true)) {
+            outputStream.write(blobData);
+        }
+
+        BlobDescriptor blobDescriptor = new BlobDescriptor(uri, 0, blobData.length);
+        batchSql(
+                "INSERT INTO blob_table_descriptor VALUES (1, 'paimon', path_to_descriptor('"
+                        + uri
+                        + "'))");
+        byte[] newDescriptorBytes =
+                (byte[]) batchSql("SELECT picture FROM blob_table_descriptor").get(0).getField(0);
+        BlobDescriptor newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes);
+        Options options = new Options();
+        options.set("warehouse", warehouse.toString());
+        CatalogContext catalogContext = CatalogContext.create(options);
+        UriReaderFactory uriReaderFactory = new UriReaderFactory(catalogContext);
+        Blob blob =
+                Blob.fromDescriptor(
+                        uriReaderFactory.create(newBlobDescriptor.uri()), blobDescriptor);
+        assertThat(blob.toData()).isEqualTo(blobData);
+        URI blobUri = URI.create(blob.toDescriptor().uri());
+        assertThat(blobUri.getScheme()).isNotNull();
+        batchSql("ALTER TABLE blob_table_descriptor SET ('blob-as-descriptor'='false')");
+        assertThat(batchSql("SELECT * FROM blob_table_descriptor"))
+                .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
+    }
+
     private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
 
     public static String bytesToHex(byte[] bytes) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java
new file mode 100644
index 000000000000..482786f12eea
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.Serializable;
+
+/** Function to convert a blob descriptor to its string representation. */
+public class DescriptorToStringFunction implements ScalarFunction<UTF8String>, Serializable {
+
+    @Override
+    public DataType[] inputTypes() {
+        return new DataType[] {DataTypes.BinaryType};
+    }
+
+    @Override
+    public DataType resultType() {
+        return DataTypes.StringType;
+    }
+
+    public UTF8String invoke(byte[] descriptorBytes) {
+        if (descriptorBytes == null) {
+            return null;
+        }
+
+        BlobDescriptor descriptor = BlobDescriptor.deserialize(descriptorBytes);
+        return UTF8String.fromString(descriptor.toString());
+    }
+
+    @Override
+    public String name() {
+        return "descriptor_to_string";
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java
new file mode 100644
index 000000000000..3fd02fe8c874
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/DescriptorToStringUnbound.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.StructType;
+
+/** Function unbound to {@link DescriptorToStringFunction}. */
+public class DescriptorToStringUnbound implements UnboundFunction {
+
+    @Override
+    public BoundFunction bind(StructType inputType) {
+        if (inputType.fields().length != 1) {
+            throw new UnsupportedOperationException(
+                    "Function 'descriptor_to_string' requires 1 argument, but found "
+                            + inputType.fields().length);
+        }
+
+        if (!(inputType.fields()[0].dataType() instanceof BinaryType)) {
+            throw new UnsupportedOperationException(
+                    "The first argument of 'descriptor_to_string' must be BINARY type, but found "
+                            + inputType.fields()[0].dataType().simpleString());
+        }
+
+        return new DescriptorToStringFunction();
+    }
+
+    @Override
+    public String description() {
+        return "Convert blob descriptor to its string representation";
+    }
+
+    @Override
+    public String name() {
+        return "descriptor_to_string";
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java
new file mode 100644
index 000000000000..ea6f306cc3a7
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.paimon.data.BlobDescriptor;
+
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.Serializable;
+
+/** Function to convert a file path to a blob descriptor. */
+public class PathToDescriptorFunction implements ScalarFunction<byte[]>, Serializable {
+
+    @Override
+    public DataType[] inputTypes() {
+        return new DataType[] {DataTypes.StringType};
+    }
+
+    @Override
+    public DataType resultType() {
+        return DataTypes.BinaryType;
+    }
+
+    public byte[] invoke(UTF8String path) {
+        if (path == null) {
+            return null;
+        }
+
+        BlobDescriptor descriptor = new BlobDescriptor(path.toString(), 0, Long.MAX_VALUE);
+        return descriptor.serialize();
+    }
+
+    @Override
+    public String name() {
+        return "path_to_descriptor";
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java
new file mode 100644
index 000000000000..23b7718847db
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorUnbound.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function;
+
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+
+/** Function unbound to {@link PathToDescriptorFunction}. */
+public class PathToDescriptorUnbound implements UnboundFunction {
+
+    @Override
+    public BoundFunction bind(StructType inputType) {
+        if (inputType.fields().length != 1) {
+            throw new UnsupportedOperationException(
+                    "Function 'path_to_descriptor' requires 1 argument, but found "
+                            + inputType.fields().length);
+        }
+
+        if (!(inputType.fields()[0].dataType() instanceof StringType)) {
+            throw new UnsupportedOperationException(
+                    "The first argument of 'path_to_descriptor' must be STRING type, but found "
+                            + inputType.fields()[0].dataType().simpleString());
+        }
+
+        return new PathToDescriptorFunction();
+    }
+
+    @Override
+    public String description() {
+        return "Convert file path to blob descriptor";
+    }
+
+    @Override
+    public String name() {
+        return "path_to_descriptor";
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index 2876f7e7f1fd..a0d97c68d48b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap,
 import org.apache.paimon.spark.SparkInternalRowWrapper
 import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
 import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
+import org.apache.paimon.spark.function.{DescriptorToStringFunction, DescriptorToStringUnbound, PathToDescriptorFunction, PathToDescriptorUnbound}
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, LocalZonedTimestampType, MapType, RowType, TimestampType}
 import org.apache.paimon.utils.ProjectedRow
@@ -43,6 +44,8 @@ object PaimonFunctions {
   val PAIMON_BUCKET: String = "bucket"
   val MOD_BUCKET: String = "mod_bucket"
   val MAX_PT: String = "max_pt"
+  val PATH_TO_DESCRIPTOR: String = "path_to_descriptor"
+  val DESCRIPTOR_TO_STRING: String = "descriptor_to_string"
 
   private val FUNCTIONS = ImmutableMap.of(
     PAIMON_BUCKET,
@@ -50,7 +53,11 @@ object PaimonFunctions {
     MOD_BUCKET,
     new BucketFunction(MOD_BUCKET, BucketFunctionType.MOD),
     MAX_PT,
-    new MaxPtFunction
+    new MaxPtFunction,
+    PATH_TO_DESCRIPTOR,
+    new PathToDescriptorUnbound,
+    DESCRIPTOR_TO_STRING,
+    new DescriptorToStringUnbound
   )
 
   /** The bucket function type to the function name mapping */
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 854f2eac34d7..e05c9ce644c1 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -127,6 +127,56 @@ class BlobTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Blob: test write blob descriptor with built-in function") {
+    withTable("t") {
+      val blobData = new Array[Byte](1024 * 1024)
+      RANDOM.nextBytes(blobData)
+      val fileIO = new LocalFileIO
+      val uri = "file://" + tempDBDir.toString + "/external_blob"
+      val outputStream = fileIO.newOutputStream(new Path(uri), true)
+      try outputStream.write(blobData)
+      finally outputStream.close()
+
+      val blobDescriptor = new BlobDescriptor(uri, 0, blobData.length)
+      sql("""CREATE TABLE IF NOT EXISTS t (
+            |  id STRING,
+            |  name STRING,
+            |  file_size STRING,
+            |  crc64 STRING,
+            |  modified_time STRING,
+            |  content BINARY
+            |)
+            |PARTITIONED BY (ds STRING, batch STRING)
+            |TBLPROPERTIES (
+            |  'comment' = 'blob table',
+            |  'partition.expiration-time' = '365 d',
+            |  'row-tracking.enabled' = 'true',
+            |  'data-evolution.enabled' = 'true',
+            |  'blob-field' = 'content',
+            |  'blob-as-descriptor' = 'true')
+            |""".stripMargin)
+      sql(
+        "INSERT OVERWRITE TABLE t PARTITION(ds = '1017', batch = 'test') VALUES " +
+          "('1', 'paimon', '1024', '12345678', '20241017', sys.path_to_descriptor('" + uri + "'))")
+      val newDescriptorBytes =
+        sql("SELECT content FROM t WHERE id = '1'").collect()(0).get(0).asInstanceOf[Array[Byte]]
+      val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
+      val options = new Options()
+      options.set("warehouse", tempDBDir.toString)
+      val catalogContext = CatalogContext.create(options)
+      val uriReaderFactory = new UriReaderFactory(catalogContext)
+      val blob =
+        Blob.fromDescriptor(uriReaderFactory.create(newBlobDescriptor.uri), blobDescriptor)
+      assert(util.Arrays.equals(blobData, blob.toData))
+
+      sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='false')")
+      checkAnswer(
+        sql("SELECT id, name, content, _ROW_ID, _SEQUENCE_NUMBER FROM t WHERE id = '1'"),
+        Seq(Row("1", "paimon", blobData, 0, 1))
+      )
+    }
+  }
+
   test("Blob: test compaction") {
     withTable("t") {
       sql(
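The Flink side exposes the same pair of functions through `BuiltInFunctions`, resolved via the catalog's system database in `FlinkCatalog`. A minimal Flink SQL sketch mirroring `testWriteBlobWithBuiltInFunction` (the `blob_table_descriptor` table and its `picture` column come from that ITCase; the bare function names are assumed to resolve the same way the test exercises them):

```sql
-- Write a blob descriptor derived from an external file path
INSERT INTO blob_table_descriptor VALUES (1, 'paimon', path_to_descriptor('file:///path/to/external_blob'));

-- Render the stored descriptor in human-readable form
SELECT descriptor_to_string(picture) FROM blob_table_descriptor;
```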