Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/content/spark/sql-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,48 @@ SELECT * FROM t where pt = sys.max_pt('t');
-- a, 20250101
```

### path_to_descriptor

`sys.path_to_descriptor($file_path)`

Converts a file path (STRING) to a blob descriptor (BINARY). This function is useful when working with blob data stored in external files: it creates a blob descriptor that references the entire file at the specified path.

**Arguments:**
- `file_path` (STRING): The path to the external file containing the blob data.

**Returns:**
- A BINARY value representing the serialized blob descriptor.

**Example**

```sql
-- Insert blob data using path_to_descriptor function
INSERT INTO t VALUES ('1', 'paimon', sys.path_to_descriptor('file:///path/to/blob_file'));

-- Insert with partition
INSERT OVERWRITE TABLE t PARTITION(ds='1017', batch='test')
VALUES ('1', 'paimon', '1024', '12345678', '20241017', sys.path_to_descriptor('file:///path/to/blob_file'));
```

### descriptor_to_string

`sys.descriptor_to_string($descriptor)`

Converts a blob descriptor (BINARY) to its string representation (STRING). This function is useful for debugging or displaying the contents of a blob descriptor in a human-readable format.

**Arguments:**
- `descriptor` (BINARY): The blob descriptor bytes to convert.

**Returns:**
- A STRING representation of the blob descriptor.

**Example**

```sql
-- Convert a blob descriptor to string for inspection
SELECT sys.descriptor_to_string(content) FROM t WHERE id = '1';
```

## User-defined Function

Paimon Spark supports two types of user-defined functions: lambda functions and file-based functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.flink.function.BuiltInFunctions;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
Expand Down Expand Up @@ -147,6 +148,7 @@
import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
Expand Down Expand Up @@ -1361,16 +1363,29 @@ public final void alterPartition(

@Override
public final List<String> listFunctions(String dbName) throws CatalogException {
    // Built-in function names are only exposed under the system namespace,
    // and are listed ahead of any catalog-registered functions.
    List<String> result = new ArrayList<>();
    if (isSystemNamespace(dbName)) {
        result.addAll(BuiltInFunctions.FUNCTIONS.keySet());
    }
    try {
        result.addAll(catalog.listFunctions(dbName));
    } catch (Catalog.DatabaseNotExistException e) {
        // Flink's catalog contract surfaces missing databases as CatalogException here.
        throw new CatalogException(e.getMessage(), e);
    }
    return result;
}

@Override
public final CatalogFunction getFunction(ObjectPath functionPath)
throws FunctionNotExistException, CatalogException {
if (isSystemNamespace(functionPath.getDatabaseName())) {
if (BuiltInFunctions.FUNCTIONS.containsKey(functionPath.getObjectName())) {
String builtInFunction =
BuiltInFunctions.FUNCTIONS.get(functionPath.getObjectName());
return new CatalogFunctionImpl(builtInFunction, FunctionLanguage.JAVA);
}
}

try {
org.apache.paimon.function.Function function =
catalog.getFunction(toIdentifier(functionPath));
Expand Down Expand Up @@ -1593,6 +1608,10 @@ public Procedure getProcedure(ObjectPath procedurePath)
.orElseThrow(() -> new ProcedureNotExistException(name, procedurePath));
}

/**
 * Returns true if the given database name refers to Paimon's system namespace.
 *
 * <p>The constant is the receiver of the comparison so a {@code null} namespace yields
 * {@code false} instead of a {@link NullPointerException}.
 */
private static boolean isSystemNamespace(String namespace) {
    return SYSTEM_DATABASE_NAME.equalsIgnoreCase(namespace);
}

private boolean isCalledFromFlinkRecomputeStatisticsProgram() {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
for (StackTraceElement stackTraceElement : stackTrace) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.function;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Registry of Paimon Flink built-in scalar functions, keyed by SQL function name. */
public class BuiltInFunctions {

    /**
     * Immutable mapping from function name to the fully qualified name of the implementing
     * {@code ScalarFunction} class. Exposed read-only so callers cannot mutate the registry.
     */
    public static final Map<String, String> FUNCTIONS = createFunctions();

    private BuiltInFunctions() {
        // Static registry; no instances.
    }

    // Builds the registry once; avoids the double-brace (anonymous subclass) idiom,
    // which creates an extra class and can pin the enclosing context in memory.
    private static Map<String, String> createFunctions() {
        Map<String, String> functions = new HashMap<>();
        functions.put("path_to_descriptor", PathToDescriptor.class.getName());
        functions.put("descriptor_to_string", DescriptorToString.class.getName());
        return Collections.unmodifiableMap(functions);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.function;

import org.apache.paimon.data.BlobDescriptor;

import org.apache.flink.table.functions.ScalarFunction;

/** Scalar function rendering serialized blob descriptor bytes as a human-readable string. */
public class DescriptorToString extends ScalarFunction {

    /**
     * Deserializes the descriptor bytes and returns their string form.
     *
     * @param descriptorBytes serialized {@link BlobDescriptor}; may be null
     * @return string representation of the descriptor, or null when the input is SQL NULL
     */
    public String eval(byte[] descriptorBytes) {
        if (descriptorBytes == null) {
            // Propagate SQL NULL unchanged.
            return null;
        }
        return BlobDescriptor.deserialize(descriptorBytes).toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.function;

import org.apache.paimon.data.BlobDescriptor;

import org.apache.flink.table.functions.ScalarFunction;

/** Scalar function turning an external file path into serialized blob descriptor bytes. */
public class PathToDescriptor extends ScalarFunction {

    /**
     * Builds a descriptor referencing the whole file at {@code path} and serializes it.
     *
     * @param path path of the external blob file; may be null
     * @return serialized descriptor bytes, or null when the input is SQL NULL
     */
    public byte[] eval(String path) {
        if (path == null) {
            // Propagate SQL NULL unchanged.
            return null;
        }
        // Offset 0 with an unbounded length covers the entire file.
        return new BlobDescriptor(path, 0, Long.MAX_VALUE).serialize();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,40 @@ public void testWriteBlobAsDescriptor() throws Exception {
.containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
}

@Test
public void testWriteBlobWithBuiltInFunction() throws Exception {
    // Write random bytes to an external file for the descriptor to point at.
    byte[] blobData = new byte[1024 * 1024];
    RANDOM.nextBytes(blobData);
    FileIO fileIO = new LocalFileIO();
    String uri = "file://" + warehouse + "/external_blob";
    try (OutputStream outputStream =
            fileIO.newOutputStream(new org.apache.paimon.fs.Path(uri), true)) {
        outputStream.write(blobData);
    }

    // Insert using the built-in path_to_descriptor function rather than handcrafted bytes.
    batchSql(
            "INSERT INTO blob_table_descriptor VALUES (1, 'paimon', path_to_descriptor('"
                    + uri
                    + "'))");

    // Read back the descriptor as Paimon stored it.
    byte[] newDescriptorBytes =
            (byte[]) batchSql("SELECT picture FROM blob_table_descriptor").get(0).getField(0);
    BlobDescriptor newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes);
    Options options = new Options();
    options.set("warehouse", warehouse.toString());
    CatalogContext catalogContext = CatalogContext.create(options);
    UriReaderFactory uriReaderFactory = new UriReaderFactory(catalogContext);
    // Fix: resolve the blob from the descriptor read back from the table. The original
    // passed the locally handcrafted descriptor while the reader was built from the
    // stored descriptor's URI, mixing two potentially different descriptors.
    Blob blob =
            Blob.fromDescriptor(
                    uriReaderFactory.create(newBlobDescriptor.uri()), newBlobDescriptor);
    assertThat(blob.toData()).isEqualTo(blobData);
    URI blobUri = URI.create(blob.toDescriptor().uri());
    assertThat(blobUri.getScheme()).isNotNull();

    // After disabling blob-as-descriptor, the table must return the raw bytes.
    batchSql("ALTER TABLE blob_table_descriptor SET ('blob-as-descriptor'='false')");
    assertThat(batchSql("SELECT * FROM blob_table_descriptor"))
            .containsExactlyInAnyOrder(Row.of(1, "paimon", blobData));
}

private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();

public static String bytesToHex(byte[] bytes) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.function;

import org.apache.paimon.data.BlobDescriptor;

import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.UTF8String;

import java.io.Serializable;

/** Spark scalar function converting serialized blob descriptor bytes into a readable string. */
public class DescriptorToStringFunction implements ScalarFunction<UTF8String>, Serializable {

    @Override
    public DataType[] inputTypes() {
        // Single BINARY argument: the serialized blob descriptor.
        return new DataType[] {DataTypes.BinaryType};
    }

    @Override
    public DataType resultType() {
        return DataTypes.StringType;
    }

    /**
     * Magic method picked up by Spark's function codegen.
     *
     * @param descriptorBytes serialized {@link BlobDescriptor}; may be null
     * @return string form of the descriptor, or null for SQL NULL input
     */
    public UTF8String invoke(byte[] descriptorBytes) {
        if (descriptorBytes == null) {
            return null;
        }
        String rendered = BlobDescriptor.deserialize(descriptorBytes).toString();
        return UTF8String.fromString(rendered);
    }

    @Override
    public String name() {
        return "descriptor_to_string";
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.function;

import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.StructType;

/** Unbound function resolving to {@link DescriptorToStringFunction}. */
public class DescriptorToStringUnbound implements UnboundFunction {

    /**
     * Validates the argument list and binds to the concrete scalar function.
     *
     * @throws UnsupportedOperationException if the arity or argument type is wrong
     */
    @Override
    public BoundFunction bind(StructType inputType) {
        if (inputType.fields().length != 1) {
            throw new UnsupportedOperationException(
                    "Function 'descriptor_to_string' requires 1 argument, but found "
                            + inputType.fields().length);
        }

        if (!(inputType.fields()[0].dataType() instanceof BinaryType)) {
            throw new UnsupportedOperationException(
                    "The first argument of 'descriptor_to_string' must be BINARY type, but found "
                            + inputType.fields()[0].dataType().simpleString());
        }

        // Fix: previously bound to PathToDescriptorFunction via copy-paste, so
        // descriptor_to_string would execute the wrong function.
        return new DescriptorToStringFunction();
    }

    @Override
    public String description() {
        // Fixed copy-pasted text that described path_to_descriptor.
        return "Convert blob descriptor to its string representation";
    }

    @Override
    public String name() {
        return "descriptor_to_string";
    }
}
Loading
Loading