diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java index 28b3c6e3eaaf..11aa42cf5e42 100644 --- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java +++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java @@ -41,6 +41,24 @@ public class LanceUtils { private static final Class jindoFileIOKlass; private static final Class hadoopFileIOKlass; + // OSS configuration keys + public static final String FS_OSS_ENDPOINT = "fs.oss.endpoint"; + public static final String FS_OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId"; + public static final String FS_OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret"; + public static final String FS_OSS_SECURITY_TOKEN = "fs.oss.securityToken"; + private static final String FS_OSS_PREFIX = "fs.oss."; + + // Storage options keys for Lance + public static final String STORAGE_OPTION_ENDPOINT = "endpoint"; + public static final String STORAGE_OPTION_ACCESS_KEY_ID = "access_key_id"; + public static final String STORAGE_OPTION_SECRET_ACCESS_KEY = "secret_access_key"; + public static final String STORAGE_OPTION_SESSION_TOKEN = "session_token"; + public static final String STORAGE_OPTION_VIRTUAL_HOSTED_STYLE = "virtual_hosted_style_request"; + public static final String STORAGE_OPTION_OSS_ACCESS_KEY_ID = "oss_access_key_id"; + public static final String STORAGE_OPTION_OSS_SECRET_ACCESS_KEY = "oss_secret_access_key"; + public static final String STORAGE_OPTION_OSS_SESSION_TOKEN = "oss_session_token"; + public static final String STORAGE_OPTION_OSS_ENDPOINT = "oss_endpoint"; + static { Class klass; try { @@ -107,19 +125,38 @@ public static Pair> toLanceSpecified(FileIO fileIO, Pa converted = new Path(uriString.replace("traceable:/", "file:/")); } } else if ("oss".equals(schema)) { - assert originOptions.containsKey("fs.oss.endpoint"); - assert originOptions.containsKey("fs.oss.accessKeyId"); - assert originOptions.containsKey("fs.oss.accessKeySecret"); + assert originOptions.containsKey(FS_OSS_ENDPOINT); + assert originOptions.containsKey(FS_OSS_ACCESS_KEY_ID); + assert originOptions.containsKey(FS_OSS_ACCESS_KEY_SECRET); + + for (String key : originOptions.keySet()) { + if (key.startsWith(FS_OSS_PREFIX)) { + storageOptions.put(key, originOptions.get(key)); + } + } + + storageOptions.put( + STORAGE_OPTION_ENDPOINT, + "https://" + uri.getHost() + "." + originOptions.get(FS_OSS_ENDPOINT)); storageOptions.put( - "endpoint", - "https://" + uri.getHost() + "." + originOptions.get("fs.oss.endpoint")); - storageOptions.put("access_key_id", originOptions.get("fs.oss.accessKeyId")); - storageOptions.put("secret_access_key", originOptions.get("fs.oss.accessKeySecret")); - storageOptions.put("virtual_hosted_style_request", "true"); - if (originOptions.containsKey("fs.oss.securityToken")) { - storageOptions.put("session_token", originOptions.get("fs.oss.securityToken")); + STORAGE_OPTION_ACCESS_KEY_ID, originOptions.get(FS_OSS_ACCESS_KEY_ID)); + storageOptions.put( + STORAGE_OPTION_OSS_ACCESS_KEY_ID, originOptions.get(FS_OSS_ACCESS_KEY_ID)); + storageOptions.put( + STORAGE_OPTION_SECRET_ACCESS_KEY, originOptions.get(FS_OSS_ACCESS_KEY_SECRET)); + storageOptions.put( + STORAGE_OPTION_OSS_SECRET_ACCESS_KEY, + originOptions.get(FS_OSS_ACCESS_KEY_SECRET)); + storageOptions.put(STORAGE_OPTION_VIRTUAL_HOSTED_STYLE, "true"); + if (originOptions.containsKey(FS_OSS_SECURITY_TOKEN)) { + storageOptions.put( + STORAGE_OPTION_SESSION_TOKEN, originOptions.get(FS_OSS_SECURITY_TOKEN)); + storageOptions.put( + STORAGE_OPTION_OSS_SESSION_TOKEN, originOptions.get(FS_OSS_SECURITY_TOKEN)); + } + if (originOptions.containsKey(FS_OSS_ENDPOINT)) { + storageOptions.put(STORAGE_OPTION_OSS_ENDPOINT, originOptions.get(FS_OSS_ENDPOINT)); } - converted = new Path(uri.toString().replace("oss://", "s3://")); } return Pair.of(converted, storageOptions); diff --git a/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java b/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java new file mode 100644 index 000000000000..d00f9a3dd635 --- /dev/null +++ b/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.lance; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PluginFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.utils.Pair; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class LanceUtilsTest { + + private static class TestFileIO extends PluginFileIO { + private static final long serialVersionUID = 1L; + + @Override + public boolean isObjectStore() { + return true; + } + + @Override + protected FileIO createFileIO(Path path) { + throw new UnsupportedOperationException("Not used in tests"); + } + + @Override + protected ClassLoader pluginClassLoader() { + return Thread.currentThread().getContextClassLoader(); + } + + void setOptions(Options opts) { + this.options = opts; + } + } + + @Test + void testOssUrlConversion() { + Path path = new Path("oss://test-bucket/db-name.db/table-name/bucket-0/data.lance"); + Options options = new Options(); + options.set(LanceUtils.FS_OSS_ENDPOINT, "oss-example-region.example.com"); + options.set(LanceUtils.FS_OSS_ACCESS_KEY_ID, "test-key"); + options.set(LanceUtils.FS_OSS_ACCESS_KEY_SECRET, "test-secret"); + + TestFileIO fileIO = new TestFileIO(); + fileIO.setOptions(options); + + Pair> result = LanceUtils.toLanceSpecified(fileIO, path); + + assertTrue(result.getKey().toString().startsWith("oss://test-bucket/")); + + Map storageOptions = result.getValue(); + assertEquals( + "https://test-bucket.oss-example-region.example.com", + storageOptions.get(LanceUtils.STORAGE_OPTION_ENDPOINT)); + assertEquals("test-key", storageOptions.get(LanceUtils.STORAGE_OPTION_ACCESS_KEY_ID)); + assertEquals( + "test-secret", storageOptions.get(LanceUtils.STORAGE_OPTION_SECRET_ACCESS_KEY)); + assertEquals("true", storageOptions.get(LanceUtils.STORAGE_OPTION_VIRTUAL_HOSTED_STYLE)); + + assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_ENDPOINT)); + assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_ACCESS_KEY_ID)); + assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_ACCESS_KEY_SECRET)); + } + + @Test + void testOssUrlWithSecurityToken() { + Path path = new Path("oss://my-bucket/path/to/file.lance"); + Options options = new Options(); + options.set(LanceUtils.FS_OSS_ENDPOINT, "oss-example-region.example.com"); + options.set(LanceUtils.FS_OSS_ACCESS_KEY_ID, "test-access-key"); + options.set(LanceUtils.FS_OSS_ACCESS_KEY_SECRET, "test-secret-key"); + options.set(LanceUtils.FS_OSS_SECURITY_TOKEN, "test-token"); + + TestFileIO fileIO = new TestFileIO(); + fileIO.setOptions(options); + + Pair> result = LanceUtils.toLanceSpecified(fileIO, path); + + Map storageOptions = result.getValue(); + assertEquals("test-token", storageOptions.get(LanceUtils.STORAGE_OPTION_SESSION_TOKEN)); + assertEquals("test-token", storageOptions.get(LanceUtils.STORAGE_OPTION_OSS_SESSION_TOKEN)); + assertTrue(storageOptions.containsKey(LanceUtils.FS_OSS_SECURITY_TOKEN)); + } +} diff --git a/paimon-python/pypaimon/read/reader/lance_utils.py b/paimon-python/pypaimon/read/reader/lance_utils.py index c7d768e631b1..33c2a8423524 100644 --- a/paimon-python/pypaimon/read/reader/lance_utils.py +++ b/paimon-python/pypaimon/read/reader/lance_utils.py @@ -18,6 +18,7 @@ import os from typing import Dict, Optional, Tuple +from urllib.parse import urlparse from pypaimon.common.config import OssOptions from pypaimon.common.file_io import FileIO @@ -38,21 +39,35 @@ def to_lance_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[D if scheme == 'oss': storage_options = {} if hasattr(file_io, 'properties'): + for key, value in file_io.properties.items(): + if key.startswith('fs.oss.'): + storage_options[key] = value + + parsed = urlparse(file_path) + bucket = parsed.netloc + path = parsed.path.lstrip('/') + endpoint = file_io.properties.get(OssOptions.OSS_ENDPOINT) if endpoint: - if not endpoint.startswith('http://') and not endpoint.startswith('https://'): - storage_options['endpoint'] = f"https://{endpoint}" - else: - storage_options['endpoint'] = endpoint + endpoint_clean = endpoint.replace('http://', '').replace('https://', '') + storage_options['endpoint'] = f"https://{bucket}.{endpoint_clean}" if OssOptions.OSS_ACCESS_KEY_ID in file_io.properties: storage_options['access_key_id'] = file_io.properties[OssOptions.OSS_ACCESS_KEY_ID] + storage_options['oss_access_key_id'] = file_io.properties[OssOptions.OSS_ACCESS_KEY_ID] if OssOptions.OSS_ACCESS_KEY_SECRET in file_io.properties: storage_options['secret_access_key'] = file_io.properties[OssOptions.OSS_ACCESS_KEY_SECRET] + storage_options['oss_secret_access_key'] = file_io.properties[OssOptions.OSS_ACCESS_KEY_SECRET] if OssOptions.OSS_SECURITY_TOKEN in file_io.properties: storage_options['session_token'] = file_io.properties[OssOptions.OSS_SECURITY_TOKEN] + storage_options['oss_session_token'] = file_io.properties[OssOptions.OSS_SECURITY_TOKEN] + if OssOptions.OSS_ENDPOINT in file_io.properties: + storage_options['oss_endpoint'] = file_io.properties[OssOptions.OSS_ENDPOINT] storage_options['virtual_hosted_style_request'] = 'true' - file_path_for_lance = file_path.replace('oss://', 's3://') + if bucket and path: + file_path_for_lance = f"oss://{bucket}/{path}" + elif bucket: + file_path_for_lance = f"oss://{bucket}" return file_path_for_lance, storage_options diff --git a/paimon-python/pypaimon/tests/lance_utils_test.py b/paimon-python/pypaimon/tests/lance_utils_test.py new file mode 100644 index 000000000000..fd35626c22f5 --- /dev/null +++ b/paimon-python/pypaimon/tests/lance_utils_test.py @@ -0,0 +1,84 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import unittest + +from pypaimon.common.config import OssOptions +from pypaimon.common.file_io import FileIO +from pypaimon.read.reader.lance_utils import to_lance_specified + + +class LanceUtilsTest(unittest.TestCase): + + def test_oss_url_bucket_extraction_correctness(self): + file_path = "oss://test-bucket/db-name.db/table-name/bucket-0/data.lance" + + properties = { + OssOptions.OSS_ENDPOINT: "oss-example-region.example.com", + OssOptions.OSS_ACCESS_KEY_ID: "test-key", + OssOptions.OSS_ACCESS_KEY_SECRET: "test-secret", + } + + file_io = FileIO(file_path, properties) + file_path_for_lance, storage_options = to_lance_specified(file_io, file_path) + + self.assertEqual( + storage_options['endpoint'], + "https://test-bucket.oss-example-region.example.com" + ) + + self.assertTrue(file_path_for_lance.startswith("oss://test-bucket/")) + + self.assertEqual(storage_options.get('virtual_hosted_style_request'), 'true') + + self.assertTrue('fs.oss.endpoint' in storage_options) + self.assertTrue('fs.oss.accessKeyId' in storage_options) + self.assertTrue('fs.oss.accessKeySecret' in storage_options) + + def test_oss_url_with_security_token(self): + file_path = "oss://my-bucket/path/to/file.lance" + + properties = { + OssOptions.OSS_ENDPOINT: "oss-example-region.example.com", + OssOptions.OSS_ACCESS_KEY_ID: "test-access-key", + OssOptions.OSS_ACCESS_KEY_SECRET: "test-secret-key", + OssOptions.OSS_SECURITY_TOKEN: "test-token", + } + + file_io = FileIO(file_path, properties) + file_path_for_lance, storage_options = to_lance_specified(file_io, file_path) + + self.assertEqual(file_path_for_lance, "oss://my-bucket/path/to/file.lance") + + self.assertEqual( + storage_options['endpoint'], + "https://my-bucket.oss-example-region.example.com" + ) + + self.assertEqual(storage_options.get('virtual_hosted_style_request'), 'true') + + self.assertEqual(storage_options.get('access_key_id'), "test-access-key") + self.assertEqual(storage_options.get('secret_access_key'), "test-secret-key") + self.assertEqual(storage_options.get('session_token'), "test-token") + self.assertEqual(storage_options.get('oss_session_token'), "test-token") + + self.assertTrue('fs.oss.securityToken' in storage_options) + + +if __name__ == '__main__': + unittest.main()