Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,24 @@ public class LanceUtils {
private static final Class<?> jindoFileIOKlass;
private static final Class<?> hadoopFileIOKlass;

// OSS configuration keys
public static final String FS_OSS_ENDPOINT = "fs.oss.endpoint";
public static final String FS_OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId";
public static final String FS_OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
public static final String FS_OSS_SECURITY_TOKEN = "fs.oss.securityToken";
private static final String FS_OSS_PREFIX = "fs.oss.";

// Storage options keys for Lance
public static final String STORAGE_OPTION_ENDPOINT = "endpoint";
public static final String STORAGE_OPTION_ACCESS_KEY_ID = "access_key_id";
public static final String STORAGE_OPTION_SECRET_ACCESS_KEY = "secret_access_key";
public static final String STORAGE_OPTION_SESSION_TOKEN = "session_token";
public static final String STORAGE_OPTION_VIRTUAL_HOSTED_STYLE = "virtual_hosted_style_request";
public static final String STORAGE_OPTION_OSS_ACCESS_KEY_ID = "oss_access_key_id";
public static final String STORAGE_OPTION_OSS_SECRET_ACCESS_KEY = "oss_secret_access_key";
public static final String STORAGE_OPTION_OSS_SESSION_TOKEN = "oss_session_token";
public static final String STORAGE_OPTION_OSS_ENDPOINT = "oss_endpoint";

static {
Class<?> klass;
try {
Expand Down Expand Up @@ -107,19 +125,38 @@ public static Pair<Path, Map<String, String>> toLanceSpecified(FileIO fileIO, Pa
converted = new Path(uriString.replace("traceable:/", "file:/"));
}
} else if ("oss".equals(schema)) {
assert originOptions.containsKey("fs.oss.endpoint");
assert originOptions.containsKey("fs.oss.accessKeyId");
assert originOptions.containsKey("fs.oss.accessKeySecret");
assert originOptions.containsKey(FS_OSS_ENDPOINT);
assert originOptions.containsKey(FS_OSS_ACCESS_KEY_ID);
assert originOptions.containsKey(FS_OSS_ACCESS_KEY_SECRET);

for (String key : originOptions.keySet()) {
if (key.startsWith(FS_OSS_PREFIX)) {
storageOptions.put(key, originOptions.get(key));
}
}

storageOptions.put(
STORAGE_OPTION_ENDPOINT,
"https://" + uri.getHost() + "." + originOptions.get(FS_OSS_ENDPOINT));
storageOptions.put(
"endpoint",
"https://" + uri.getHost() + "." + originOptions.get("fs.oss.endpoint"));
storageOptions.put("access_key_id", originOptions.get("fs.oss.accessKeyId"));
storageOptions.put("secret_access_key", originOptions.get("fs.oss.accessKeySecret"));
storageOptions.put("virtual_hosted_style_request", "true");
if (originOptions.containsKey("fs.oss.securityToken")) {
storageOptions.put("session_token", originOptions.get("fs.oss.securityToken"));
STORAGE_OPTION_ACCESS_KEY_ID, originOptions.get(FS_OSS_ACCESS_KEY_ID));
storageOptions.put(
STORAGE_OPTION_OSS_ACCESS_KEY_ID, originOptions.get(FS_OSS_ACCESS_KEY_ID));
storageOptions.put(
STORAGE_OPTION_SECRET_ACCESS_KEY, originOptions.get(FS_OSS_ACCESS_KEY_SECRET));
storageOptions.put(
STORAGE_OPTION_OSS_SECRET_ACCESS_KEY,
originOptions.get(FS_OSS_ACCESS_KEY_SECRET));
storageOptions.put(STORAGE_OPTION_VIRTUAL_HOSTED_STYLE, "true");
if (originOptions.containsKey(FS_OSS_SECURITY_TOKEN)) {
storageOptions.put(
STORAGE_OPTION_SESSION_TOKEN, originOptions.get(FS_OSS_SECURITY_TOKEN));
storageOptions.put(
STORAGE_OPTION_OSS_SESSION_TOKEN, originOptions.get(FS_OSS_SECURITY_TOKEN));
}
if (originOptions.containsKey(FS_OSS_ENDPOINT)) {
storageOptions.put(STORAGE_OPTION_OSS_ENDPOINT, originOptions.get(FS_OSS_ENDPOINT));
}
converted = new Path(uri.toString().replace("oss://", "s3://"));
}

return Pair.of(converted, storageOptions);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.lance;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PluginFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.Pair;

import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class LanceUtilsTest {

private static class TestFileIO extends PluginFileIO {
private static final long serialVersionUID = 1L;

@Override
public boolean isObjectStore() {
return true;
}

@Override
protected FileIO createFileIO(Path path) {
throw new UnsupportedOperationException("Not used in tests");
}

@Override
protected ClassLoader pluginClassLoader() {
return Thread.currentThread().getContextClassLoader();
}

void setOptions(Options opts) {
this.options = opts;
}
}

@Test
void testOssUrlConversion() {
Path path = new Path("oss://test-bucket/db-name.db/table-name/bucket-0/data.lance");
Options options = new Options();
options.set(LanceUtils.FS_OSS_ENDPOINT, "oss-example-region.example.com");
options.set(LanceUtils.FS_OSS_ACCESS_KEY_ID, "test-key");
options.set(LanceUtils.FS_OSS_ACCESS_KEY_SECRET, "test-secret");

TestFileIO fileIO = new TestFileIO();
fileIO.setOptions(options);

Pair<Path, Map<String, String>> result = LanceUtils.toLanceSpecified(fileIO, path);

assertTrue(result.getKey().toString().startsWith("oss://test-bucket/"));

Map<String, String> storageOptions = result.getValue();
assertEquals(
"https://test-bucket.oss-example-region.example.com",
storageOptions.get(LanceUtils.STORAGE_OPTION_ENDPOINT));
assertEquals("test-key", storageOptions.get(LanceUtils.STORAGE_OPTION_ACCESS_KEY_ID));
assertEquals(
"test-secret", storageOptions.get(LanceUtils.STORAGE_OPTION_SECRET_ACCESS_KEY));
assertEquals("true", storageOptions.get(LanceUtils.STORAGE_OPTION_VIRTUAL_HOSTED_STYLE));

assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_ENDPOINT));
assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_ACCESS_KEY_ID));
assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_ACCESS_KEY_SECRET));
}

@Test
void testOssUrlWithSecurityToken() {
Path path = new Path("oss://my-bucket/path/to/file.lance");
Options options = new Options();
options.set(LanceUtils.FS_OSS_ENDPOINT, "oss-example-region.example.com");
options.set(LanceUtils.FS_OSS_ACCESS_KEY_ID, "test-access-key");
options.set(LanceUtils.FS_OSS_ACCESS_KEY_SECRET, "test-secret-key");
options.set(LanceUtils.FS_OSS_SECURITY_TOKEN, "test-token");

TestFileIO fileIO = new TestFileIO();
fileIO.setOptions(options);

Pair<Path, Map<String, String>> result = LanceUtils.toLanceSpecified(fileIO, path);

Map<String, String> storageOptions = result.getValue();
assertEquals("test-token", storageOptions.get(LanceUtils.STORAGE_OPTION_SESSION_TOKEN));
assertEquals("test-token", storageOptions.get(LanceUtils.STORAGE_OPTION_OSS_SESSION_TOKEN));
assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_SECURITY_TOKEN));
}
}
25 changes: 20 additions & 5 deletions paimon-python/pypaimon/read/reader/lance_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import os
from typing import Dict, Optional, Tuple
from urllib.parse import urlparse

from pypaimon.common.config import OssOptions
from pypaimon.common.file_io import FileIO
Expand All @@ -38,21 +39,35 @@ def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[D
if scheme == 'oss':
storage_options = {}
if hasattr(file_io, 'properties'):
for key, value in file_io.properties.items():
if key.startswith('fs.oss.'):
storage_options[key] = value

parsed = urlparse(file_path)
bucket = parsed.netloc
path = parsed.path.lstrip('/')

endpoint = file_io.properties.get(OssOptions.OSS_ENDPOINT)
if endpoint:
if not endpoint.startswith('http://') and not endpoint.startswith('https://'):
storage_options['endpoint'] = f"https://{endpoint}"
else:
storage_options['endpoint'] = endpoint
endpoint_clean = endpoint.replace('http://', '').replace('https://', '')
storage_options['endpoint'] = f"https://{bucket}.{endpoint_clean}"

if OssOptions.OSS_ACCESS_KEY_ID in file_io.properties:
storage_options['access_key_id'] = file_io.properties[OssOptions.OSS_ACCESS_KEY_ID]
storage_options['oss_access_key_id'] = file_io.properties[OssOptions.OSS_ACCESS_KEY_ID]
if OssOptions.OSS_ACCESS_KEY_SECRET in file_io.properties:
storage_options['secret_access_key'] = file_io.properties[OssOptions.OSS_ACCESS_KEY_SECRET]
storage_options['oss_secret_access_key'] = file_io.properties[OssOptions.OSS_ACCESS_KEY_SECRET]
if OssOptions.OSS_SECURITY_TOKEN in file_io.properties:
storage_options['session_token'] = file_io.properties[OssOptions.OSS_SECURITY_TOKEN]
storage_options['oss_session_token'] = file_io.properties[OssOptions.OSS_SECURITY_TOKEN]
if OssOptions.OSS_ENDPOINT in file_io.properties:
storage_options['oss_endpoint'] = file_io.properties[OssOptions.OSS_ENDPOINT]
storage_options['virtual_hosted_style_request'] = 'true'

file_path_for_lance = file_path.replace('oss://', 's3://')
if bucket and path:
file_path_for_lance = f"oss://{bucket}/{path}"
elif bucket:
file_path_for_lance = f"oss://{bucket}"

return file_path_for_lance, storage_options
84 changes: 84 additions & 0 deletions paimon-python/pypaimon/tests/lance_utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import unittest

from pypaimon.common.config import OssOptions
from pypaimon.common.file_io import FileIO
from pypaimon.read.reader.lance_utils import to_lance_specified


class LanceUtilsTest(unittest.TestCase):

def test_oss_url_bucket_extraction_correctness(self):
file_path = "oss://test-bucket/db-name.db/table-name/bucket-0/data.lance"

properties = {
OssOptions.OSS_ENDPOINT: "oss-example-region.example.com",
OssOptions.OSS_ACCESS_KEY_ID: "test-key",
OssOptions.OSS_ACCESS_KEY_SECRET: "test-secret",
}

file_io = FileIO(file_path, properties)
file_path_for_lance, storage_options = to_lance_specified(file_io, file_path)

self.assertEqual(
storage_options['endpoint'],
"https://test-bucket.oss-example-region.example.com"
)

self.assertTrue(file_path_for_lance.startswith("oss://test-bucket/"))

self.assertEqual(storage_options.get('virtual_hosted_style_request'), 'true')

self.assertTrue('fs.oss.endpoint' in storage_options)
self.assertTrue('fs.oss.accessKeyId' in storage_options)
self.assertTrue('fs.oss.accessKeySecret' in storage_options)

def test_oss_url_with_security_token(self):
file_path = "oss://my-bucket/path/to/file.lance"

properties = {
OssOptions.OSS_ENDPOINT: "oss-example-region.example.com",
OssOptions.OSS_ACCESS_KEY_ID: "test-access-key",
OssOptions.OSS_ACCESS_KEY_SECRET: "test-secret-key",
OssOptions.OSS_SECURITY_TOKEN: "test-token",
}

file_io = FileIO(file_path, properties)
file_path_for_lance, storage_options = to_lance_specified(file_io, file_path)

self.assertEqual(file_path_for_lance, "oss://my-bucket/path/to/file.lance")

self.assertEqual(
storage_options['endpoint'],
"https://my-bucket.oss-example-region.example.com"
)

self.assertEqual(storage_options.get('virtual_hosted_style_request'), 'true')

self.assertEqual(storage_options.get('access_key_id'), "test-access-key")
self.assertEqual(storage_options.get('secret_access_key'), "test-secret-key")
self.assertEqual(storage_options.get('session_token'), "test-token")
self.assertEqual(storage_options.get('oss_session_token'), "test-token")

self.assertTrue('fs.oss.securityToken' in storage_options)


if __name__ == '__main__':
unittest.main()
Loading