From 62479d4c269729273f3e96a702ac2639208e04b5 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Wed, 4 Feb 2026 11:09:41 +0800 Subject: [PATCH 01/13] need review --- .../spark/BaseLanceNamespaceSparkCatalog.java | 115 +++++--- .../java/org/lance/spark/LanceCatalog.java | 56 ++-- .../read/LanceCountStarPartitionReader.java | 1 + .../lance/spark/read/LanceScanBuilder.java | 1 + .../java/org/lance/spark/read/LanceSplit.java | 1 + .../java/org/lance/spark/utils/Utils.java | 135 +++++++++ .../datasources/v2/AddIndexExec.scala | 3 +- .../execution/datasources/v2/VacuumExec.scala | 1 + .../spark/SparkLanceNamespaceTestBase.java | 46 +++ .../write/BaseSparkConnectorWriteTest.java | 58 +++- .../sql/connector/catalog/TableCatalog.java | 273 ++++++++++++++++++ 11 files changed, 627 insertions(+), 63 deletions(-) create mode 100644 lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java create mode 100644 org/apache/spark/sql/connector/catalog/TableCatalog.java diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java index c882425ee..76b276284 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java @@ -26,7 +26,9 @@ import org.lance.spark.function.LanceFragmentIdWithDefaultFunction; import org.lance.spark.utils.Optional; import org.lance.spark.utils.SchemaConverter; +import org.lance.spark.utils.Utils; +import org.apache.commons.lang3.tuple.Triple; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; @@ -57,6 +59,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.lance.spark.utils.Utils.createReadOptions; +import static org.lance.spark.utils.Utils.getSchema; + public abstract class BaseLanceNamespaceSparkCatalog implements TableCatalog, SupportsNamespaces, FunctionCatalog { @@ -411,62 +416,68 @@ public boolean tableExists(Identifier ident) { @Override public Table loadTable(Identifier ident) throws NoSuchTableException { - // Transform identifier for API call - Identifier actualIdent = transformIdentifierForApi(ident); + Triple, String, Map> res = getTableMessage(ident); + List tableId = res.getLeft(); + String location = res.getMiddle(); + Map initialStorageOptions = res.getRight(); - // Build the table ID for credential vending - List tableId = buildTableId(actualIdent); + // Create read options with namespace support + LanceSparkReadOptions readOptions = + createReadOptions(location, catalogConfig, tableId, namespace); - // Call describeTable to get location and initial storage options - DescribeTableRequest describeRequest = new DescribeTableRequest(); - tableId.forEach(describeRequest::addIdItem); - DescribeTableResponse describeResponse; - try { - describeResponse = namespace.describeTable(describeRequest); - } catch (TableNotFoundException e) { - throw new NoSuchTableException(ident); - } catch (RuntimeException e) { - throw new RuntimeException("Failed to describe table: " + ident, e); - } + // Open dataset to get schema + StructType schema = getSchema(ident, location, readOptions, namespace); + return createDataset( + readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties); + } - String location = describeResponse.getLocation(); - Map initialStorageOptions = describeResponse.getStorageOptions(); + @Override + public Table loadTable(Identifier ident, String version) throws NoSuchTableException { + Triple, String, Map> res = getTableMessage(ident); + List tableId = res.getLeft(); + String location = res.getMiddle(); + Map initialStorageOptions = res.getRight(); + + // Open dataset to get schema + long versionId = Utils.parseVersion(version); + LanceSparkReadOptions readOptions = + createReadOptions(location, versionId, catalogConfig, namespace, tableId); + StructType schema = getSchema(ident, location, readOptions, namespace); + + // Create read options with namespace support + return createDataset( + readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties); + } + + @Override + public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + Triple, String, Map> res = getTableMessage(ident); + List tableId = res.getLeft(); + String location = res.getMiddle(); + Map initialStorageOptions = res.getRight(); // Open dataset to get schema - StructType schema; + long versionId; try (Dataset dataset = Dataset.open() .allocator(LanceRuntime.allocator()) - .namespace(namespace) - .tableId(tableId) + .uri(location) + .readOptions(createReadOptions(location, catalogConfig).toReadOptions()) .build()) { - schema = LanceArrowUtils.fromArrowSchema(dataset.getSchema()); + versionId = Utils.findVersion(dataset.listVersions(), timestamp); } catch (IllegalArgumentException e) { throw new NoSuchTableException(ident); } + LanceSparkReadOptions readOptions = + createReadOptions(location, versionId, catalogConfig, namespace, tableId); + StructType schema = getSchema(ident, location, readOptions, namespace); + // Create read options with namespace support - LanceSparkReadOptions readOptions = createReadOptions(location, tableId); return createDataset( readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties); } - /** - * Creates LanceSparkReadOptions with namespace settings for this catalog. - * - * @param location the dataset location URI - * @param tableId the table identifier within the namespace - * @return a new LanceSparkReadOptions with all catalog settings - */ - private LanceSparkReadOptions createReadOptions(String location, List tableId) { - return LanceSparkReadOptions.builder() - .datasetUri(location) - .withCatalogDefaults(catalogConfig) - .namespace(namespace) - .tableId(tableId) - .build(); - } - @Override public Table createTable( Identifier ident, StructType schema, Transform[] partitions, Map properties) @@ -500,7 +511,8 @@ public Table createTable( Map initialStorageOptions = describeResponse.getStorageOptions(); // Create read options with namespace settings - LanceSparkReadOptions readOptions = createReadOptions(location, tableIdList); + LanceSparkReadOptions readOptions = + createReadOptions(location, catalogConfig, tableIdList, namespace); return createDataset( readOptions, processedSchema, initialStorageOptions, namespaceImpl, namespaceProperties); } @@ -651,6 +663,31 @@ private List buildTableId(Identifier ident) { .collect(Collectors.toList()); } + private Triple, String, Map> getTableMessage(Identifier ident) + throws NoSuchTableException { + // Transform identifier for API call + Identifier actualIdent = transformIdentifierForApi(ident); + + // Build the table ID for credential vending + List tableId = buildTableId(actualIdent); + + // Call describeTable to get location and initial storage options + DescribeTableRequest describeRequest = new DescribeTableRequest(); + tableId.forEach(describeRequest::addIdItem); + DescribeTableResponse describeResponse; + try { + describeResponse = namespace.describeTable(describeRequest); + } catch (TableNotFoundException e) { + throw new NoSuchTableException(ident); + } catch (RuntimeException e) { + throw new RuntimeException("Failed to describe table: " + ident, e); + } + String location = describeResponse.getLocation(); + Map initialStorageOptions = describeResponse.getStorageOptions(); + + return Triple.of(tableId, location, initialStorageOptions); + } + public abstract LanceDataset createDataset( LanceSparkReadOptions readOptions, StructType sparkSchema, diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java index afe9c102d..f1bec6f72 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java @@ -15,6 +15,7 @@ import org.lance.Dataset; import org.lance.WriteParams; +import org.lance.spark.utils.Utils; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -30,6 +31,8 @@ import java.util.Map; +import static org.lance.spark.utils.Utils.*; + /** * A simple Lance catalog that supports both path-based and catalog-based table access. * @@ -53,19 +56,37 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti @Override public Table loadTable(Identifier ident) throws NoSuchTableException { String datasetUri = getDatasetUri(ident); - LanceSparkReadOptions readOptions = createReadOptions(datasetUri); - StructType schema; + LanceSparkReadOptions readOptions = createReadOptions(datasetUri, catalogConfig); + StructType schema = getSchema(ident, datasetUri, readOptions, null); + + return new LanceDataset(readOptions, schema, null, null, null); + } + + @Override + public Table loadTable(Identifier ident, String version) throws NoSuchTableException { + String datasetUri = getDatasetUri(ident); + long versionId = Utils.parseVersion(version); + + return getLanceDataset(ident, datasetUri, versionId, catalogConfig); + } + + @Override + public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + String datasetUri = getDatasetUri(ident); + + long versionId; try (Dataset dataset = Dataset.open() .allocator(LanceRuntime.allocator()) .uri(datasetUri) - .readOptions(readOptions.toReadOptions()) + .readOptions(createReadOptions(datasetUri, catalogConfig).toReadOptions()) .build()) { - schema = LanceArrowUtils.fromArrowSchema(dataset.getSchema()); + versionId = Utils.findVersion(dataset.listVersions(), timestamp); } catch (IllegalArgumentException e) { throw new NoSuchTableException(ident); } - return new LanceDataset(readOptions, schema, null, null, null); + + return getLanceDataset(ident, datasetUri, versionId, catalogConfig); } @Override @@ -73,7 +94,7 @@ public Table createTable( Identifier ident, StructType schema, Transform[] partitions, Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { String datasetUri = getDatasetUri(ident); - LanceSparkReadOptions readOptions = createReadOptions(datasetUri); + LanceSparkReadOptions readOptions = createReadOptions(datasetUri, catalogConfig); try { Dataset.write() .allocator(LanceRuntime.allocator()) @@ -115,19 +136,6 @@ public void initialize(String name, CaseInsensitiveStringMap options) { this.catalogConfig = LanceSparkCatalogConfig.from(options.asCaseSensitiveMap()); } - /** - * Creates LanceSparkReadOptions for this catalog. - * - * @param datasetUri the dataset URI - * @return a new LanceSparkReadOptions with catalog settings - */ - private LanceSparkReadOptions createReadOptions(String datasetUri) { - return LanceSparkReadOptions.builder() - .datasetUri(datasetUri) - .withCatalogDefaults(catalogConfig) - .build(); - } - @Override public String name() { return catalogName; @@ -169,4 +177,14 @@ private String getDatasetUri(Identifier ident) { sb.append(name); return sb.toString(); } + + private Table getLanceDataset( + Identifier ident, String datasetUri, long versionId, LanceSparkCatalogConfig catalogConfig) + throws NoSuchTableException { + LanceSparkReadOptions readOptions = + createReadOptions(datasetUri, versionId, catalogConfig, null, null); + StructType schema = getSchema(ident, datasetUri, readOptions, null); + + return new LanceDataset(readOptions, schema, null, null, null); + } } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java index 17dbed211..f3cb1ad70 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java @@ -96,6 +96,7 @@ private Dataset openDataset(LanceSparkReadOptions readOptions) { .allocator(allocator) .namespace(readOptions.getNamespace()) .tableId(readOptions.getTableId()) + .readOptions(readOptions.toReadOptions()) .build(); } else { return Dataset.open() diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java index b35292ba8..72ad29804 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java @@ -103,6 +103,7 @@ private Dataset getOrOpenDataset() { .allocator(LanceRuntime.allocator()) .namespace(readOptions.getNamespace()) .tableId(readOptions.getTableId()) + .readOptions(readOptions.toReadOptions()) .build(); } else { lazyDataset = diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceSplit.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceSplit.java index 8ec4461ea..0bcd06796 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceSplit.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceSplit.java @@ -51,6 +51,7 @@ private static Dataset openDataset(LanceSparkReadOptions readOptions) { .allocator(LanceRuntime.allocator()) .namespace(readOptions.getNamespace()) .tableId(readOptions.getTableId()) + .readOptions(readOptions.toReadOptions()) .build(); } else { return Dataset.open() diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java new file mode 100644 index 000000000..9b8511685 --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java @@ -0,0 +1,135 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.utils; + +import org.lance.Dataset; +import org.lance.OpenDatasetBuilder; +import org.lance.Version; +import org.lance.namespace.LanceNamespace; +import org.lance.spark.LanceRuntime; +import org.lance.spark.LanceSparkCatalogConfig; +import org.lance.spark.LanceSparkReadOptions; + +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.LanceArrowUtils; + +import java.time.Instant; +import java.time.ZonedDateTime; +import java.util.List; + +public class Utils { + + public static long parseVersion(String version) { + return Long.parseUnsignedLong(version); + } + + public static long findVersion(List versions, long timestampMicros) { + long versionID = -1; + Instant timestamp = instantFromEpochNanos(timestampMicros); + for (Version version : versions) { + ZonedDateTime dataTime = version.getDataTime(); + if (dataTime.toInstant().compareTo(timestamp) < 0) { + versionID = version.getId(); + } else if (dataTime.toInstant().equals(timestamp)) { + return version.getId(); + } else { + break; + } + } + if (versionID == -1) { + throw new IllegalArgumentException("No version found with timestamp: " + timestampMicros); + } + return versionID; + } + + public static StructType getSchema( + Identifier ident, + String datasetUri, + LanceSparkReadOptions readOptions, + LanceNamespace namespace) + throws NoSuchTableException { + Dataset dataset = null; + try { + OpenDatasetBuilder builder = + Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(datasetUri) + .readOptions(readOptions.toReadOptions()); + if (namespace != null) { + builder.namespace(namespace); + } + dataset = builder.build(); + return LanceArrowUtils.fromArrowSchema(dataset.getSchema()); + } catch (IllegalArgumentException e) { + throw new NoSuchTableException(ident); + } finally { + if (dataset != null) { + dataset.close(); + } + } + } + + public static LanceSparkReadOptions createReadOptions( + String datasetUri, LanceSparkCatalogConfig catalogConfig) { + return createReadOptions(datasetUri, null, catalogConfig, null, null); + } + + public static LanceSparkReadOptions createReadOptions( + String datasetUri, + LanceSparkCatalogConfig catalogConfig, + List tableId, + LanceNamespace namespace) { + return createReadOptions(datasetUri, null, catalogConfig, namespace, tableId); + } + + /** + * Creates LanceSparkReadOptions for this catalog. + * + * @param location the dataset URI + * @param versionId optional dataset version id + * @return a new LanceSparkReadOptions with catalog settings + */ + public static LanceSparkReadOptions createReadOptions( + String location, + Long versionId, + LanceSparkCatalogConfig catalogConfig, + LanceNamespace namespace, + List tableId) { + LanceSparkReadOptions.Builder builder = + LanceSparkReadOptions.builder().datasetUri(location).withCatalogDefaults(catalogConfig); + + if (tableId != null) { + builder.tableId(tableId); + } + + if (namespace != null) { + builder.namespace(namespace); + } + + if (versionId != null) { + // TODO 修改version为long类型 + builder.version(versionId.intValue()); + } + return builder.build(); + } + + // Convert microseconds since epoch to ZonedDateTime in UTC + private static Instant instantFromEpochNanos(long epochNanos) { + long sec = Math.floorDiv(epochNanos, 1_000_000_000L); + long nanoAdj = Math.floorMod(epochNanos, 1_000_000_000L); + return Instant.ofEpochSecond(sec, nanoAdj); + } +} diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index 0a1e86c1b..db164e0c7 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -175,13 +175,14 @@ case class AddIndexExec( Dataset.open() .allocator(LanceRuntime.allocator()) .namespace(readOptions.getNamespace) + .readOptions(readOptions.toReadOptions()) .tableId(readOptions.getTableId) .build() } else { Dataset.open() .allocator(LanceRuntime.allocator()) .uri(readOptions.getDatasetUri) - .readOptions(readOptions.toReadOptions) + .readOptions(readOptions.toReadOptions()) .build() } } diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/VacuumExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/VacuumExec.scala index c643a348a..a223d7bfa 100644 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/VacuumExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/VacuumExec.scala @@ -74,6 +74,7 @@ case class VacuumExec( Dataset.open() .allocator(LanceRuntime.allocator()) .namespace(readOptions.getNamespace) + .readOptions(readOptions.toReadOptions()) .tableId(readOptions.getTableId) .build() } else { diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java index 4f8575c48..bbe128099 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java @@ -94,6 +94,52 @@ protected String generateTableName(String baseName) { return baseName + "_" + UUID.randomUUID().toString().replace("-", ""); } + @Test + public void testTimeTravelVersionAsOf() throws Exception { + String tableName = generateTableName("time_travel_version"); + String fullName = catalogName + ".default." + tableName; + + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + assertTrue(checkDataset(0, fullName)); + + spark.sql("INSERT INTO " + fullName + " VALUES (1, 'v1')"); + assertTrue(checkDataset(1, fullName)); + spark.sql("INSERT INTO " + fullName + " VALUES (2, 'v2')"); + assertTrue(checkDataset(2, fullName)); + + // time travel to version 2 (the second insert) + Dataset actual = spark.sql("SELECT * FROM " + fullName + " VERSION AS OF " + "2"); + List res = actual.collectAsList(); + assertEquals(1, res.size()); + } + + @Test + public void testTimeTravelTimestampAsOf() throws Exception { + String tableName = generateTableName("time_travel_version"); + String fullName = catalogName + ".default." + tableName; + + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + assertTrue(checkDataset(0, fullName)); + + spark.sql("INSERT INTO " + fullName + " VALUES (1, 'v1')"); + long ts1 = System.currentTimeMillis(); + assertTrue(checkDataset(1, fullName)); + spark.sql("INSERT INTO " + fullName + " VALUES (2, 'v2')"); + assertTrue(checkDataset(2, fullName)); + + // time travel to timestamp before second insert + Dataset actual = spark.sql("SELECT * FROM " + fullName + " TIMESTAMP AS OF " + ts1); + List res = actual.collectAsList(); + assertEquals(1, res.size()); + } + + private boolean checkDataset(int expectedSize, String tableName) { + Dataset actual = spark.sql("SELECT * FROM " + tableName); + List res = actual.collectAsList(); + + return expectedSize == res.size(); + } + @Test public void testCreateAndDescribeTable() throws Exception { String tableName = generateTableName("test_table"); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java index 699a82942..5bc0ce700 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java @@ -17,10 +17,7 @@ import org.lance.spark.LanceSparkReadOptions; import org.lance.spark.TestUtils; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; import org.apache.spark.sql.types.DataTypes; @@ -315,4 +312,57 @@ public void writeWithInvalidBatchSizeFails(TestInfo testInfo) { "Expected batch_size validation error, got: " + e.getMessage()); } } + + @Test + public void testTimeTravel(TestInfo testInfo) { + String tableName = testInfo.getTestMethod().get().getName(); + String outputPath = TestUtils.getDatasetUri(dbPath.toString(), tableName); + + StructType schema = + new StructType().add("id", DataTypes.LongType).add("value", DataTypes.LongType); + + List data1 = List.of(RowFactory.create(1L, 100L)); + Dataset df1 = spark.createDataFrame(data1, schema); + df1.write() + .format("lance") + .option( + LanceSparkReadOptions.CONFIG_DATASET_URI, + TestUtils.getDatasetUri(dbPath.toString(), tableName)) + .save(); + assertTrue(checkDataset(1, outputPath)); + + long ts1 = System.currentTimeMillis(); + + List data2 = List.of(RowFactory.create(2L, 200L)); + Dataset df2 = spark.createDataFrame(data2, schema); + df2.write().format("lance").mode(SaveMode.Append).save(outputPath); + assertTrue(checkDataset(2, outputPath)); + + // check timestamp as of + List res = + spark + .sql("select * from lance.`" + outputPath + "` TIMESTAMP AS OF " + ts1) + .collectAsList(); + assertEquals(1, res.size()); + + // check version as of + List res2 = + spark.sql("select * from lance.`" + outputPath + "` VERSION AS OF " + 2).collectAsList(); + assertEquals(1, res2.size()); + } + + private boolean checkDataset(int expectedSize, String path) { + Dataset lanceTable = + spark + .read() + .format(LanceDataSource.name) + .option(LanceSparkReadOptions.CONFIG_DATASET_URI, path) + .load(); + + lanceTable.createOrReplaceTempView("table_a"); + Dataset actual = spark.sql("SELECT * FROM table_a"); + List res = actual.collectAsList(); + + return expectedSize == res.size(); + } } diff --git a/org/apache/spark/sql/connector/catalog/TableCatalog.java b/org/apache/spark/sql/connector/catalog/TableCatalog.java new file mode 100644 index 000000000..eb442ad38 --- /dev/null +++ b/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.errors.QueryCompilationErrors; +import org.apache.spark.sql.types.StructType; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * Catalog methods for working with Tables. + *

+ * TableCatalog implementations may be case sensitive or case insensitive. Spark will pass + * {@link Identifier table identifiers} without modification. Field names passed to + * {@link #alterTable(Identifier, TableChange...)} will be normalized to match the case used in the + * table schema when updating, renaming, or dropping existing columns when catalyst analysis is case + * insensitive. + * + * @since 3.0.0 + */ +@Evolving +public interface TableCatalog extends CatalogPlugin { + + /** + * A reserved property to specify the location of the table. The files of the table + * should be under this location. + */ + String PROP_LOCATION = "location"; + + /** + * A reserved property to indicate that the table location is managed, not user-specified. + * If this property is "true", SHOW CREATE TABLE will not generate the LOCATION clause. + */ + String PROP_IS_MANAGED_LOCATION = "is_managed_location"; + + /** + * A reserved property to specify a table was created with EXTERNAL. + */ + String PROP_EXTERNAL = "external"; + + /** + * A reserved property to specify the description of the table. + */ + String PROP_COMMENT = "comment"; + + /** + * A reserved property to specify the provider of the table. + */ + String PROP_PROVIDER = "provider"; + + /** + * A reserved property to specify the owner of the table. + */ + String PROP_OWNER = "owner"; + + /** + * A prefix used to pass OPTIONS in table properties + */ + String OPTION_PREFIX = "option."; + + /** + * @return the set of capabilities for this TableCatalog + */ + default Set capabilities() { return Collections.emptySet(); } + + /** + * List the tables in a namespace from the catalog. + *

+ * If the catalog supports views, this must return identifiers for only tables and not views. + * + * @param namespace a multi-part namespace + * @return an array of Identifiers for tables + * @throws NoSuchNamespaceException If the namespace does not exist (optional). + */ + Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException; + + /** + * Load table metadata by {@link Identifier identifier} from the catalog. + *

+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must throw {@link NoSuchTableException}. + * + * @param ident a table identifier + * @return the table's metadata + * @throws NoSuchTableException If the table doesn't exist or is a view + */ + Table loadTable(Identifier ident) throws NoSuchTableException; + + /** + * Load table metadata of a specific version by {@link Identifier identifier} from the catalog. + *

+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must throw {@link NoSuchTableException}. + * + * @param ident a table identifier + * @param version version of the table + * @return the table's metadata + * @throws NoSuchTableException If the table doesn't exist or is a view + */ + default Table loadTable(Identifier ident, String version) throws NoSuchTableException { + throw QueryCompilationErrors.noSuchTableError(ident); + } + + /** + * Load table metadata at a specific time by {@link Identifier identifier} from the catalog. + *

+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must throw {@link NoSuchTableException}. + * + * @param ident a table identifier + * @param timestamp timestamp of the table, which is microseconds since 1970-01-01 00:00:00 UTC + * @return the table's metadata + * @throws NoSuchTableException If the table doesn't exist or is a view + */ + default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + throw QueryCompilationErrors.noSuchTableError(ident); + } + + /** + * Invalidate cached table metadata for an {@link Identifier identifier}. + *

+ * If the table is already loaded or cached, drop cached data. If the table does not exist or is + * not cached, do nothing. Calling this method should not query remote services. + * + * @param ident a table identifier + */ + default void invalidateTable(Identifier ident) { + } + + /** + * Test whether a table exists using an {@link Identifier identifier} from the catalog. + *

+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must return false. + * + * @param ident a table identifier + * @return true if the table exists, false otherwise + */ + default boolean tableExists(Identifier ident) { + try { + return loadTable(ident) != null; + } catch (NoSuchTableException e) { + return false; + } + } + + /** + * Create a table in the catalog. + *

+ * This is deprecated. Please override + * {@link #createTable(Identifier, Column[], Transform[], Map)} instead. + */ + @Deprecated + Table createTable( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException; + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param columns the columns of the new table. + * @param partitions transforms to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table or view already exists for the identifier + * @throws UnsupportedOperationException If a requested partition transform is not supported + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + default Table createTable( + Identifier ident, + Column[] columns, + Transform[] partitions, + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { + return createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties); + } + + /** + * Apply a set of {@link TableChange changes} to a table in the catalog. + *

+ * Implementations may reject the requested changes. If any change is rejected, none of the + * changes should be applied to the table. + *

+ * The requested changes must be applied in the order given. + *

+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must throw {@link NoSuchTableException}. + * + * @param ident a table identifier + * @param changes changes to apply to the table + * @return updated metadata for the table + * @throws NoSuchTableException If the table doesn't exist or is a view + * @throws IllegalArgumentException If any change is rejected by the implementation. + */ + Table alterTable( + Identifier ident, + TableChange... changes) throws NoSuchTableException; + + /** + * Drop a table in the catalog. + *

+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must not drop the view and must return false. + * + * @param ident a table identifier + * @return true if a table was deleted, false if no table exists for the identifier + */ + boolean dropTable(Identifier ident); + + /** + * Drop a table in the catalog and completely remove its data by skipping a trash even if it is + * supported. + *

+ * If the catalog supports views and contains a view for the identifier and not a table, this + * must not drop the view and must return false. + *

+ * If the catalog supports to purge a table, this method should be overridden. + * The default implementation throws {@link UnsupportedOperationException}. + * + * @param ident a table identifier + * @return true if a table was deleted, false if no table exists for the identifier + * @throws UnsupportedOperationException If table purging is not supported + * + * @since 3.1.0 + */ + default boolean purgeTable(Identifier ident) throws UnsupportedOperationException { + throw new UnsupportedOperationException("Purge table is not supported."); + } + + /** + * Renames a table in the catalog. + *

+ * If the catalog supports views and contains a view for the old identifier and not a table, this + * throws {@link NoSuchTableException}. Additionally, if the new identifier is a table or a view, + * this throws {@link TableAlreadyExistsException}. + *

+ * If the catalog does not support table renames between namespaces, it throws + * {@link UnsupportedOperationException}. + * + * @param oldIdent the table identifier of the existing table to rename + * @param newIdent the new table identifier of the table + * @throws NoSuchTableException If the table to rename doesn't exist or is a view + * @throws TableAlreadyExistsException If the new table name already exists or is a view + * @throws UnsupportedOperationException If the namespaces of old and new identifiers do not + * match (optional) + */ + void renameTable(Identifier oldIdent, Identifier newIdent) + throws NoSuchTableException, TableAlreadyExistsException; +} From 37eec7b9b3372fa9e351910254d45c02397cbf06 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Wed, 4 Feb 2026 12:02:50 +0800 Subject: [PATCH 02/13] lance spark support time travel --- .../spark/BaseLanceNamespaceSparkCatalog.java | 3 +- .../java/org/lance/spark/utils/Utils.java | 27 +- .../datasources/v2/AddIndexExec.scala | 4 +- .../sql/connector/catalog/TableCatalog.java | 273 ------------------ 4 files changed, 24 insertions(+), 283 deletions(-) delete mode 100644 org/apache/spark/sql/connector/catalog/TableCatalog.java diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java index 76b276284..1e16eb55c 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java @@ -462,7 +462,8 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep Dataset.open() .allocator(LanceRuntime.allocator()) .uri(location) - .readOptions(createReadOptions(location, catalogConfig).toReadOptions()) + .readOptions( + createReadOptions(location, catalogConfig, tableId, namespace).toReadOptions()) .build()) { versionId = Utils.findVersion(dataset.listVersions(), timestamp); } catch (IllegalArgumentException e) { diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java index 9b8511685..37d259a7d 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java @@ -36,21 +36,21 @@ public static long parseVersion(String version) { return Long.parseUnsignedLong(version); } - public static long findVersion(List versions, long timestampMicros) { + public static long findVersion(List versions, long timestamp) { long versionID = -1; - Instant timestamp = instantFromEpochNanos(timestampMicros); + Instant instant = instantFromTimestamp(timestamp); for (Version version : versions) { ZonedDateTime dataTime = version.getDataTime(); - if (dataTime.toInstant().compareTo(timestamp) < 0) { + if (dataTime.toInstant().compareTo(instant) < 0) { versionID = version.getId(); - } else if (dataTime.toInstant().equals(timestamp)) { + } else if (dataTime.toInstant().equals(instant)) { return version.getId(); } else { break; } } if (versionID == -1) { - throw new IllegalArgumentException("No version found with timestamp: " + timestampMicros); + throw new IllegalArgumentException("No version found with timestamp: " + timestamp); } return versionID; } @@ -120,13 +120,26 @@ public static LanceSparkReadOptions createReadOptions( } if (versionId != null) { - // TODO 修改version为long类型 builder.version(versionId.intValue()); } return builder.build(); } - // Convert microseconds since epoch to ZonedDateTime in UTC + // Determine if the timestamp is in microseconds or nanoseconds and convert to Instant + private static Instant instantFromTimestamp(long timestamp) { + long abs = timestamp == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(timestamp); + if (abs >= 1_000_000_000_000_000_000L) { + return instantFromEpochNanos(timestamp); + } + return instantFromEpochMicros(timestamp); + } + + private static Instant instantFromEpochMicros(long epochMicros) { + long sec = Math.floorDiv(epochMicros, 1_000_000L); + long nanoAdj = Math.floorMod(epochMicros, 1_000_000L) * 1_000L; + return Instant.ofEpochSecond(sec, nanoAdj); + } + private static Instant instantFromEpochNanos(long epochNanos) { long sec = Math.floorDiv(epochNanos, 1_000_000_000L); long nanoAdj = Math.floorMod(epochNanos, 1_000_000_000L); diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index db164e0c7..1d3cf7bb7 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -175,14 +175,14 @@ case class AddIndexExec( Dataset.open() .allocator(LanceRuntime.allocator()) .namespace(readOptions.getNamespace) - .readOptions(readOptions.toReadOptions()) + .readOptions(readOptions.toReadOptions) .tableId(readOptions.getTableId) .build() } else { Dataset.open() .allocator(LanceRuntime.allocator()) .uri(readOptions.getDatasetUri) - .readOptions(readOptions.toReadOptions()) + .readOptions(readOptions.toReadOptions) .build() } } diff --git a/org/apache/spark/sql/connector/catalog/TableCatalog.java b/org/apache/spark/sql/connector/catalog/TableCatalog.java deleted file mode 100644 index eb442ad38..000000000 --- a/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.catalog; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.connector.expressions.Transform; -import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; -import org.apache.spark.sql.errors.QueryCompilationErrors; -import org.apache.spark.sql.types.StructType; - -import java.util.Collections; -import java.util.Map; -import java.util.Set; - -/** - * Catalog methods for working with Tables. - *

- * TableCatalog implementations may be case sensitive or case insensitive. Spark will pass - * {@link Identifier table identifiers} without modification. Field names passed to - * {@link #alterTable(Identifier, TableChange...)} will be normalized to match the case used in the - * table schema when updating, renaming, or dropping existing columns when catalyst analysis is case - * insensitive. - * - * @since 3.0.0 - */ -@Evolving -public interface TableCatalog extends CatalogPlugin { - - /** - * A reserved property to specify the location of the table. The files of the table - * should be under this location. - */ - String PROP_LOCATION = "location"; - - /** - * A reserved property to indicate that the table location is managed, not user-specified. - * If this property is "true", SHOW CREATE TABLE will not generate the LOCATION clause. - */ - String PROP_IS_MANAGED_LOCATION = "is_managed_location"; - - /** - * A reserved property to specify a table was created with EXTERNAL. - */ - String PROP_EXTERNAL = "external"; - - /** - * A reserved property to specify the description of the table. - */ - String PROP_COMMENT = "comment"; - - /** - * A reserved property to specify the provider of the table. - */ - String PROP_PROVIDER = "provider"; - - /** - * A reserved property to specify the owner of the table. - */ - String PROP_OWNER = "owner"; - - /** - * A prefix used to pass OPTIONS in table properties - */ - String OPTION_PREFIX = "option."; - - /** - * @return the set of capabilities for this TableCatalog - */ - default Set capabilities() { return Collections.emptySet(); } - - /** - * List the tables in a namespace from the catalog. - *

- * If the catalog supports views, this must return identifiers for only tables and not views. - * - * @param namespace a multi-part namespace - * @return an array of Identifiers for tables - * @throws NoSuchNamespaceException If the namespace does not exist (optional). - */ - Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException; - - /** - * Load table metadata by {@link Identifier identifier} from the catalog. - *

- * If the catalog supports views and contains a view for the identifier and not a table, this - * must throw {@link NoSuchTableException}. - * - * @param ident a table identifier - * @return the table's metadata - * @throws NoSuchTableException If the table doesn't exist or is a view - */ - Table loadTable(Identifier ident) throws NoSuchTableException; - - /** - * Load table metadata of a specific version by {@link Identifier identifier} from the catalog. - *

- * If the catalog supports views and contains a view for the identifier and not a table, this - * must throw {@link NoSuchTableException}. - * - * @param ident a table identifier - * @param version version of the table - * @return the table's metadata - * @throws NoSuchTableException If the table doesn't exist or is a view - */ - default Table loadTable(Identifier ident, String version) throws NoSuchTableException { - throw QueryCompilationErrors.noSuchTableError(ident); - } - - /** - * Load table metadata at a specific time by {@link Identifier identifier} from the catalog. - *

- * If the catalog supports views and contains a view for the identifier and not a table, this - * must throw {@link NoSuchTableException}. - * - * @param ident a table identifier - * @param timestamp timestamp of the table, which is microseconds since 1970-01-01 00:00:00 UTC - * @return the table's metadata - * @throws NoSuchTableException If the table doesn't exist or is a view - */ - default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { - throw QueryCompilationErrors.noSuchTableError(ident); - } - - /** - * Invalidate cached table metadata for an {@link Identifier identifier}. - *

- * If the table is already loaded or cached, drop cached data. If the table does not exist or is - * not cached, do nothing. Calling this method should not query remote services. - * - * @param ident a table identifier - */ - default void invalidateTable(Identifier ident) { - } - - /** - * Test whether a table exists using an {@link Identifier identifier} from the catalog. - *

- * If the catalog supports views and contains a view for the identifier and not a table, this - * must return false. - * - * @param ident a table identifier - * @return true if the table exists, false otherwise - */ - default boolean tableExists(Identifier ident) { - try { - return loadTable(ident) != null; - } catch (NoSuchTableException e) { - return false; - } - } - - /** - * Create a table in the catalog. - *

- * This is deprecated. Please override - * {@link #createTable(Identifier, Column[], Transform[], Map)} instead. - */ - @Deprecated - Table createTable( - Identifier ident, - StructType schema, - Transform[] partitions, - Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException; - - /** - * Create a table in the catalog. - * - * @param ident a table identifier - * @param columns the columns of the new table. - * @param partitions transforms to use for partitioning data in the table - * @param properties a string map of table properties - * @return metadata for the new table - * @throws TableAlreadyExistsException If a table or view already exists for the identifier - * @throws UnsupportedOperationException If a requested partition transform is not supported - * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) - */ - default Table createTable( - Identifier ident, - Column[] columns, - Transform[] partitions, - Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { - return createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties); - } - - /** - * Apply a set of {@link TableChange changes} to a table in the catalog. - *

- * Implementations may reject the requested changes. If any change is rejected, none of the - * changes should be applied to the table. - *

- * The requested changes must be applied in the order given. - *

- * If the catalog supports views and contains a view for the identifier and not a table, this - * must throw {@link NoSuchTableException}. - * - * @param ident a table identifier - * @param changes changes to apply to the table - * @return updated metadata for the table - * @throws NoSuchTableException If the table doesn't exist or is a view - * @throws IllegalArgumentException If any change is rejected by the implementation. - */ - Table alterTable( - Identifier ident, - TableChange... changes) throws NoSuchTableException; - - /** - * Drop a table in the catalog. - *

- * If the catalog supports views and contains a view for the identifier and not a table, this - * must not drop the view and must return false. - * - * @param ident a table identifier - * @return true if a table was deleted, false if no table exists for the identifier - */ - boolean dropTable(Identifier ident); - - /** - * Drop a table in the catalog and completely remove its data by skipping a trash even if it is - * supported. - *

- * If the catalog supports views and contains a view for the identifier and not a table, this - * must not drop the view and must return false. - *

- * If the catalog supports to purge a table, this method should be overridden. - * The default implementation throws {@link UnsupportedOperationException}. - * - * @param ident a table identifier - * @return true if a table was deleted, false if no table exists for the identifier - * @throws UnsupportedOperationException If table purging is not supported - * - * @since 3.1.0 - */ - default boolean purgeTable(Identifier ident) throws UnsupportedOperationException { - throw new UnsupportedOperationException("Purge table is not supported."); - } - - /** - * Renames a table in the catalog. - *

- * If the catalog supports views and contains a view for the old identifier and not a table, this - * throws {@link NoSuchTableException}. Additionally, if the new identifier is a table or a view, - * this throws {@link TableAlreadyExistsException}. - *

- * If the catalog does not support table renames between namespaces, it throws - * {@link UnsupportedOperationException}. - * - * @param oldIdent the table identifier of the existing table to rename - * @param newIdent the new table identifier of the table - * @throws NoSuchTableException If the table to rename doesn't exist or is a view - * @throws TableAlreadyExistsException If the new table name already exists or is a view - * @throws UnsupportedOperationException If the namespaces of old and new identifiers do not - * match (optional) - */ - void renameTable(Identifier oldIdent, Identifier newIdent) - throws NoSuchTableException, TableAlreadyExistsException; -} From 5bd525bd0546a38ad778ea6583c6b88531c18f58 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Wed, 4 Feb 2026 12:09:55 +0800 Subject: [PATCH 03/13] fix ut --- .../java/org/lance/spark/SparkLanceNamespaceTestBase.java | 4 +++- .../org/lance/spark/write/BaseSparkConnectorWriteTest.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java index bbe128099..64ebe4df1 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.time.Instant; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -122,7 +123,8 @@ public void testTimeTravelTimestampAsOf() throws Exception { assertTrue(checkDataset(0, fullName)); spark.sql("INSERT INTO " + fullName + " VALUES (1, 'v1')"); - long ts1 = System.currentTimeMillis(); + Instant instant = Instant.now(); + long ts1 = instant.toEpochMilli(); assertTrue(checkDataset(1, fullName)); spark.sql("INSERT INTO " + fullName + " VALUES (2, 'v2')"); assertTrue(checkDataset(2, fullName)); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java index 5bc0ce700..ab03d5e21 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java @@ -31,6 +31,7 @@ import java.io.File; import java.nio.file.Path; +import java.time.Instant; import java.util.Arrays; import java.util.List; @@ -331,7 +332,8 @@ public void testTimeTravel(TestInfo testInfo) { .save(); assertTrue(checkDataset(1, outputPath)); - long ts1 = System.currentTimeMillis(); + Instant instant = Instant.now(); + long ts1 = instant.toEpochMilli(); List data2 = List.of(RowFactory.create(2L, 200L)); Dataset df2 = spark.createDataFrame(data2, schema); From 825c8866cf63fb7ade954dc8811746c06b1f0e7c Mon Sep 17 00:00:00 2001 From: YueZhang Date: Wed, 4 Feb 2026 14:05:54 +0800 Subject: [PATCH 04/13] fix ut --- .../spark/TestSparkDirectoryNamespace.java | 17 ++++++- .../spark/SparkLanceNamespaceTestBase.java | 50 +++++++++++++++---- .../write/BaseSparkConnectorWriteTest.java | 23 +++++---- 3 files changed, 68 insertions(+), 22 deletions(-) diff --git a/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java b/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java index 7f43cf3e8..dffceed7b 100644 --- a/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java +++ b/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java @@ -13,4 +13,19 @@ */ package org.lance.spark; -public class TestSparkDirectoryNamespace extends BaseTestSparkDirectoryNamespace {} +import org.junit.jupiter.api.Test; + +public class TestSparkDirectoryNamespace extends BaseTestSparkDirectoryNamespace { + + @Test + @Override + public void testTimeTravelVersionAsOf() throws Exception { + super.testTimeTravelVersionAsOf(); + } + + @Test + @Override + public void testTimeTravelTimestampAsOf() throws Exception { + super.testTimeTravelTimestampAsOf(); + } +} diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java index 64ebe4df1..1b2c9312b 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java @@ -13,9 +13,12 @@ */ package org.lance.spark; +import org.lance.Version; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -24,7 +27,6 @@ import java.io.IOException; import java.nio.file.Path; -import java.time.Instant; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -123,25 +125,19 @@ public void testTimeTravelTimestampAsOf() throws Exception { assertTrue(checkDataset(0, fullName)); spark.sql("INSERT INTO " + fullName + " VALUES (1, 'v1')"); - Instant instant = Instant.now(); - long ts1 = instant.toEpochMilli(); assertTrue(checkDataset(1, fullName)); spark.sql("INSERT INTO " + fullName + " VALUES (2, 'v2')"); assertTrue(checkDataset(2, fullName)); + Version version = getLatestVersion(tableName); + long ts = version.getDataTime().toInstant().toEpochMilli() - 1; + // time travel to timestamp before second insert - Dataset actual = spark.sql("SELECT * FROM " + fullName + " TIMESTAMP AS OF " + ts1); + Dataset actual = spark.sql("SELECT * FROM " + fullName + " TIMESTAMP AS OF " + ts); List res = actual.collectAsList(); assertEquals(1, res.size()); } - private boolean checkDataset(int expectedSize, String tableName) { - Dataset actual = spark.sql("SELECT * FROM " + tableName); - List res = actual.collectAsList(); - - return expectedSize == res.size(); - } - @Test public void testCreateAndDescribeTable() throws Exception { String tableName = generateTableName("test_table"); @@ -618,4 +614,36 @@ public void testOnePartIdentifier() throws Exception { assertEquals(42L, row.getLong(0)); assertEquals(3.14, row.getDouble(1), 0.001); } + + private boolean checkDataset(int expectedSize, String tableName) { + Dataset actual = spark.sql("SELECT * FROM " + tableName); + List res = actual.collectAsList(); + + return expectedSize == res.size(); + } + + private Version getLatestVersion(String tableName) throws Exception { + Identifier ident = Identifier.of(new String[] {"default"}, tableName); + LanceDataset lanceTable = (LanceDataset) catalog.loadTable(ident); + LanceSparkReadOptions readOptions = lanceTable.readOptions(); + try (org.lance.Dataset dataset = openLatestDataset(readOptions)) { + return dataset.getVersion(); + } + } + + private org.lance.Dataset openLatestDataset(LanceSparkReadOptions readOptions) { + if (readOptions.hasNamespace()) { + return org.lance.Dataset.open() + .allocator(LanceRuntime.allocator()) + .namespace(readOptions.getNamespace()) + .tableId(readOptions.getTableId()) + .readOptions(readOptions.toReadOptions()) + .build(); + } + return org.lance.Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(readOptions.getDatasetUri()) + .readOptions(readOptions.toReadOptions()) + .build(); + } } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java index ab03d5e21..59f93f16a 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java @@ -13,7 +13,9 @@ */ package org.lance.spark.write; +import org.lance.Version; import org.lance.spark.LanceDataSource; +import org.lance.spark.LanceRuntime; import org.lance.spark.LanceSparkReadOptions; import org.lance.spark.TestUtils; @@ -31,7 +33,6 @@ import java.io.File; import java.nio.file.Path; -import java.time.Instant; import java.util.Arrays; import java.util.List; @@ -324,23 +325,18 @@ public void testTimeTravel(TestInfo testInfo) { List data1 = List.of(RowFactory.create(1L, 100L)); Dataset df1 = spark.createDataFrame(data1, schema); - df1.write() - .format("lance") - .option( - LanceSparkReadOptions.CONFIG_DATASET_URI, - TestUtils.getDatasetUri(dbPath.toString(), tableName)) - .save(); + df1.write().format("lance").option(LanceSparkReadOptions.CONFIG_DATASET_URI, outputPath).save(); assertTrue(checkDataset(1, outputPath)); - Instant instant = Instant.now(); - long ts1 = instant.toEpochMilli(); - List data2 = List.of(RowFactory.create(2L, 200L)); Dataset df2 = spark.createDataFrame(data2, schema); df2.write().format("lance").mode(SaveMode.Append).save(outputPath); assertTrue(checkDataset(2, outputPath)); // check timestamp as of + Version version = getLatestVersion(outputPath); + long ts1 = version.getDataTime().toInstant().toEpochMilli() - 1; + List res = spark .sql("select * from lance.`" + outputPath + "` TIMESTAMP AS OF " + ts1) @@ -367,4 +363,11 @@ private boolean checkDataset(int expectedSize, String path) { return expectedSize == res.size(); } + + private Version getLatestVersion(String datasetUri) { + try (org.lance.Dataset dataset = + org.lance.Dataset.open().allocator(LanceRuntime.allocator()).uri(datasetUri).build()) { + return dataset.getVersion(); + } + } } From 3a2d50c0e54eb5bec4c95dee6391ac62cfc12a32 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Wed, 4 Feb 2026 14:13:22 +0800 Subject: [PATCH 05/13] fix ut --- .../lance/spark/TestSparkDirectoryNamespace.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java b/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java index dffceed7b..a69afdf60 100644 --- a/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java +++ b/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java @@ -15,17 +15,4 @@ import org.junit.jupiter.api.Test; -public class TestSparkDirectoryNamespace extends BaseTestSparkDirectoryNamespace { - - @Test - @Override - public void testTimeTravelVersionAsOf() throws Exception { - super.testTimeTravelVersionAsOf(); - } - - @Test - @Override - public void testTimeTravelTimestampAsOf() throws Exception { - super.testTimeTravelTimestampAsOf(); - } -} +public class TestSparkDirectoryNamespace extends BaseTestSparkDirectoryNamespace {} From d8ec42f005c47b833df42e2cf02d9ab9ca063c7a Mon Sep 17 00:00:00 2001 From: YueZhang Date: Wed, 4 Feb 2026 14:13:31 +0800 Subject: [PATCH 06/13] fix ut --- .../test/java/org/lance/spark/TestSparkDirectoryNamespace.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java b/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java index a69afdf60..7f43cf3e8 100644 --- a/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java +++ b/lance-spark-3.4_2.12/src/test/java/org/lance/spark/TestSparkDirectoryNamespace.java @@ -13,6 +13,4 @@ */ package org.lance.spark; -import org.junit.jupiter.api.Test; - public class TestSparkDirectoryNamespace extends BaseTestSparkDirectoryNamespace {} From 64325cdf785d3ea6d66171de46d9fa212e6f12ce Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Feb 2026 15:32:38 +0800 Subject: [PATCH 07/13] merge main --- .../java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java index 1e16eb55c..4f9d5c114 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java @@ -466,7 +466,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep createReadOptions(location, catalogConfig, tableId, namespace).toReadOptions()) .build()) { versionId = Utils.findVersion(dataset.listVersions(), timestamp); - } catch (IllegalArgumentException e) { + } catch (TableNotFoundException e) { throw new NoSuchTableException(ident); } From 40c77715c24166ca4a4103699dc3a3069dfb9735 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Feb 2026 20:26:31 +0800 Subject: [PATCH 08/13] code review --- .../spark/BaseLanceNamespaceSparkCatalog.java | 11 +-- .../java/org/lance/spark/LanceCatalog.java | 11 +-- .../java/org/lance/spark/utils/Utils.java | 26 +------ .../spark/SparkLanceNamespaceTestBase.java | 8 ++- .../write/BaseSparkConnectorWriteTest.java | 22 ++++-- .../catalyst/analysis/TimeTravelSpec.scala | 68 +++++++++++++++++++ 6 files changed, 105 insertions(+), 41 deletions(-) create mode 100644 org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java index 4f9d5c114..c9a21c2e5 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java @@ -423,7 +423,7 @@ public Table loadTable(Identifier ident) throws NoSuchTableException { // Create read options with namespace support LanceSparkReadOptions readOptions = - createReadOptions(location, catalogConfig, tableId, namespace); + createReadOptions(location, catalogConfig, null, namespace, tableId); // Open dataset to get schema StructType schema = getSchema(ident, location, readOptions, namespace); @@ -441,7 +441,7 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep // Open dataset to get schema long versionId = Utils.parseVersion(version); LanceSparkReadOptions readOptions = - createReadOptions(location, versionId, catalogConfig, namespace, tableId); + createReadOptions(location, catalogConfig, versionId, namespace, tableId); StructType schema = getSchema(ident, location, readOptions, namespace); // Create read options with namespace support @@ -463,7 +463,8 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep .allocator(LanceRuntime.allocator()) .uri(location) .readOptions( - createReadOptions(location, catalogConfig, tableId, namespace).toReadOptions()) + createReadOptions(location, catalogConfig, null, namespace, tableId) + .toReadOptions()) .build()) { versionId = Utils.findVersion(dataset.listVersions(), timestamp); } catch (TableNotFoundException e) { @@ -471,7 +472,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep } LanceSparkReadOptions readOptions = - createReadOptions(location, versionId, catalogConfig, namespace, tableId); + createReadOptions(location, catalogConfig, versionId, namespace, tableId); StructType schema = getSchema(ident, location, readOptions, namespace); // Create read options with namespace support @@ -513,7 +514,7 @@ public Table createTable( // Create read options with namespace settings LanceSparkReadOptions readOptions = - createReadOptions(location, catalogConfig, tableIdList, namespace); + createReadOptions(location, catalogConfig, null, namespace, tableIdList); return createDataset( readOptions, processedSchema, initialStorageOptions, namespaceImpl, namespaceProperties); } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java index f1bec6f72..ff2e1e717 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java @@ -56,7 +56,8 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti @Override public Table loadTable(Identifier ident) throws NoSuchTableException { String datasetUri = getDatasetUri(ident); - LanceSparkReadOptions readOptions = createReadOptions(datasetUri, catalogConfig); + LanceSparkReadOptions readOptions = + createReadOptions(datasetUri, catalogConfig, null, null, null); StructType schema = getSchema(ident, datasetUri, readOptions, null); return new LanceDataset(readOptions, schema, null, null, null); @@ -79,7 +80,8 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep Dataset.open() .allocator(LanceRuntime.allocator()) .uri(datasetUri) - .readOptions(createReadOptions(datasetUri, catalogConfig).toReadOptions()) + .readOptions( + createReadOptions(datasetUri, catalogConfig, null, null, null).toReadOptions()) .build()) { versionId = Utils.findVersion(dataset.listVersions(), timestamp); } catch (IllegalArgumentException e) { @@ -94,7 +96,8 @@ public Table createTable( Identifier ident, StructType schema, Transform[] partitions, Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { String datasetUri = getDatasetUri(ident); - LanceSparkReadOptions readOptions = createReadOptions(datasetUri, catalogConfig); + LanceSparkReadOptions readOptions = + createReadOptions(datasetUri, catalogConfig, null, null, null); try { Dataset.write() .allocator(LanceRuntime.allocator()) @@ -182,7 +185,7 @@ private Table getLanceDataset( Identifier ident, String datasetUri, long versionId, LanceSparkCatalogConfig catalogConfig) throws NoSuchTableException { LanceSparkReadOptions readOptions = - createReadOptions(datasetUri, versionId, catalogConfig, null, null); + createReadOptions(datasetUri, catalogConfig, versionId, null, null); StructType schema = getSchema(ident, datasetUri, readOptions, null); return new LanceDataset(readOptions, schema, null, null, null); diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java index 37d259a7d..14f7c765f 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java @@ -82,19 +82,6 @@ public static StructType getSchema( } } - public static LanceSparkReadOptions createReadOptions( - String datasetUri, LanceSparkCatalogConfig catalogConfig) { - return createReadOptions(datasetUri, null, catalogConfig, null, null); - } - - public static LanceSparkReadOptions createReadOptions( - String datasetUri, - LanceSparkCatalogConfig catalogConfig, - List tableId, - LanceNamespace namespace) { - return createReadOptions(datasetUri, null, catalogConfig, namespace, tableId); - } - /** * Creates LanceSparkReadOptions for this catalog. * @@ -104,8 +91,8 @@ public static LanceSparkReadOptions createReadOptions( */ public static LanceSparkReadOptions createReadOptions( String location, - Long versionId, LanceSparkCatalogConfig catalogConfig, + Long versionId, LanceNamespace namespace, List tableId) { LanceSparkReadOptions.Builder builder = @@ -127,9 +114,8 @@ public static LanceSparkReadOptions createReadOptions( // Determine if the timestamp is in microseconds or nanoseconds and convert to Instant private static Instant instantFromTimestamp(long timestamp) { - long abs = timestamp == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(timestamp); - if (abs >= 1_000_000_000_000_000_000L) { - return instantFromEpochNanos(timestamp); + if (timestamp <= 0) { + throw new IllegalArgumentException("Timestamp must be greater than zero"); } return instantFromEpochMicros(timestamp); } @@ -139,10 +125,4 @@ private static Instant instantFromEpochMicros(long epochMicros) { long nanoAdj = Math.floorMod(epochMicros, 1_000_000L) * 1_000L; return Instant.ofEpochSecond(sec, nanoAdj); } - - private static Instant instantFromEpochNanos(long epochNanos) { - long sec = Math.floorDiv(epochNanos, 1_000_000_000L); - long nanoAdj = Math.floorMod(epochNanos, 1_000_000_000L); - return Instant.ofEpochSecond(sec, nanoAdj); - } } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java index 1b2c9312b..97f121e1e 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,6 +55,7 @@ void setup() throws IOException { .config( "spark.sql.catalog." + catalogName, "org.lance.spark.LanceNamespaceSparkCatalog") .config("spark.sql.catalog." + catalogName + ".impl", getNsImpl()) + .config("spark.sql.session.timeZone", "UTC") .getOrCreate(); Map additionalConfigs = getAdditionalNsConfigs(); @@ -130,10 +132,12 @@ public void testTimeTravelTimestampAsOf() throws Exception { assertTrue(checkDataset(2, fullName)); Version version = getLatestVersion(tableName); - long ts = version.getDataTime().toInstant().toEpochMilli() - 1; + DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + String date = version.getDataTime().format(format); // time travel to timestamp before second insert - Dataset actual = spark.sql("SELECT * FROM " + fullName + " TIMESTAMP AS OF " + ts); + Dataset actual = + spark.sql("SELECT * FROM " + fullName + " TIMESTAMP AS OF '" + date + "'"); List res = actual.collectAsList(); assertEquals(1, res.size()); } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java index 59f93f16a..a396d38da 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java @@ -33,6 +33,7 @@ import java.io.File; import java.nio.file.Path; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.List; @@ -55,6 +56,7 @@ static void setup() { .master("local") .config("spark.sql.catalog.lance", "org.lance.spark.LanceCatalog") .config("spark.sql.catalog.lance.max_row_per_file", "1") + .config("spark.sql.session.timeZone", "UTC") .getOrCreate(); StructType schema = new StructType( @@ -316,7 +318,7 @@ public void writeWithInvalidBatchSizeFails(TestInfo testInfo) { } @Test - public void testTimeTravel(TestInfo testInfo) { + public void testTimeTravel(TestInfo testInfo) throws InterruptedException { String tableName = testInfo.getTestMethod().get().getName(); String outputPath = TestUtils.getDatasetUri(dbPath.toString(), tableName); @@ -327,20 +329,26 @@ public void testTimeTravel(TestInfo testInfo) { Dataset df1 = spark.createDataFrame(data1, schema); df1.write().format("lance").option(LanceSparkReadOptions.CONFIG_DATASET_URI, outputPath).save(); assertTrue(checkDataset(1, outputPath)); + Thread.sleep(1000); List data2 = List.of(RowFactory.create(2L, 200L)); Dataset df2 = spark.createDataFrame(data2, schema); df2.write().format("lance").mode(SaveMode.Append).save(outputPath); assertTrue(checkDataset(2, outputPath)); + Version version_3 = getLatestVersion(outputPath); + + List data3 = List.of(RowFactory.create(3L, 300L)); + Dataset df3 = spark.createDataFrame(data3, schema); + df3.write().format("lance").mode(SaveMode.Append).save(outputPath); + assertTrue(checkDataset(3, outputPath)); + // check timestamp as of - Version version = getLatestVersion(outputPath); - long ts1 = version.getDataTime().toInstant().toEpochMilli() - 1; + DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + String date = version_3.getDataTime().format(format); + String sql = String.format("select * from lance.`%s` TIMESTAMP AS OF '%s'", outputPath, date); - List res = - spark - .sql("select * from lance.`" + outputPath + "` TIMESTAMP AS OF " + ts1) - .collectAsList(); + List res = spark.sql(sql).collectAsList(); assertEquals(1, res.size()); // check version as of diff --git a/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala b/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala new file mode 100644 index 000000000..26856d9a5 --- /dev/null +++ b/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, RuntimeReplaceable, SubqueryExpression, Unevaluable} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.TimestampType + +sealed trait TimeTravelSpec + +case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec +case class AsOfVersion(version: String) extends TimeTravelSpec + +object TimeTravelSpec { + def create( + timestamp: Option[Expression], + version: Option[String], + conf: SQLConf) : Option[TimeTravelSpec] = { + if (timestamp.nonEmpty && version.nonEmpty) { + throw QueryCompilationErrors.invalidTimeTravelSpecError() + } else if (timestamp.nonEmpty) { + val ts = timestamp.get + assert(ts.resolved && ts.references.isEmpty && !SubqueryExpression.hasSubquery(ts)) + if (!Cast.canAnsiCast(ts.dataType, TimestampType)) { + throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( + "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT", ts) + } + val tsToEval = ts.transform { + case r: RuntimeReplaceable => r.replacement + case _: Unevaluable => + throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( + "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.UNEVALUABLE", ts) + case e if !e.deterministic => + throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( + "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.NON_DETERMINISTIC", ts) + } + val tz = Some(conf.sessionLocalTimeZone) + // Set `ansiEnabled` to false, so that it can return null for invalid input and we can provide + // better error message. + val value = Cast(tsToEval, TimestampType, tz, ansiEnabled = false).eval() + if (value == null) { + throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( + "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT", ts) + } + Some(AsOfTimestamp(value.asInstanceOf[Long])) + } else if (version.nonEmpty) { + Some(AsOfVersion(version.get)) + } else { + None + } + } +} From cd77d74377fc03595301636941b6e94fa135a540 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Feb 2026 20:57:55 +0800 Subject: [PATCH 09/13] code review --- .../spark/BaseLanceNamespaceSparkCatalog.java | 101 ++++++++---------- .../java/org/lance/spark/LanceCatalog.java | 67 ++++++------ .../java/org/lance/spark/utils/Utils.java | 21 ++-- 3 files changed, 89 insertions(+), 100 deletions(-) diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java index c9a21c2e5..629f2e66e 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java @@ -28,7 +28,6 @@ import org.lance.spark.utils.SchemaConverter; import org.lance.spark.utils.Utils; -import org.apache.commons.lang3.tuple.Triple; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; @@ -416,68 +415,17 @@ public boolean tableExists(Identifier ident) { @Override public Table loadTable(Identifier ident) throws NoSuchTableException { - Triple, String, Map> res = getTableMessage(ident); - List tableId = res.getLeft(); - String location = res.getMiddle(); - Map initialStorageOptions = res.getRight(); - - // Create read options with namespace support - LanceSparkReadOptions readOptions = - createReadOptions(location, catalogConfig, null, namespace, tableId); - - // Open dataset to get schema - StructType schema = getSchema(ident, location, readOptions, namespace); - return createDataset( - readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties); + return loadTableInternal(ident, Optional.empty(), Optional.empty()); } @Override public Table loadTable(Identifier ident, String version) throws NoSuchTableException { - Triple, String, Map> res = getTableMessage(ident); - List tableId = res.getLeft(); - String location = res.getMiddle(); - Map initialStorageOptions = res.getRight(); - - // Open dataset to get schema - long versionId = Utils.parseVersion(version); - LanceSparkReadOptions readOptions = - createReadOptions(location, catalogConfig, versionId, namespace, tableId); - StructType schema = getSchema(ident, location, readOptions, namespace); - - // Create read options with namespace support - return createDataset( - readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties); + return loadTableInternal(ident, Optional.empty(), Optional.of(version)); } @Override public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { - Triple, String, Map> res = getTableMessage(ident); - List tableId = res.getLeft(); - String location = res.getMiddle(); - Map initialStorageOptions = res.getRight(); - - // Open dataset to get schema - long versionId; - try (Dataset dataset = - Dataset.open() - .allocator(LanceRuntime.allocator()) - .uri(location) - .readOptions( - createReadOptions(location, catalogConfig, null, namespace, tableId) - .toReadOptions()) - .build()) { - versionId = Utils.findVersion(dataset.listVersions(), timestamp); - } catch (TableNotFoundException e) { - throw new NoSuchTableException(ident); - } - - LanceSparkReadOptions readOptions = - createReadOptions(location, catalogConfig, versionId, namespace, tableId); - StructType schema = getSchema(ident, location, readOptions, namespace); - - // Create read options with namespace support - return createDataset( - readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties); + return loadTableInternal(ident, Optional.of(timestamp), null); } @Override @@ -514,7 +462,12 @@ public Table createTable( // Create read options with namespace settings LanceSparkReadOptions readOptions = - createReadOptions(location, catalogConfig, null, namespace, tableIdList); + createReadOptions( + location, + catalogConfig, + Optional.empty(), + Optional.of(namespace), + Optional.of(tableIdList)); return createDataset( readOptions, processedSchema, initialStorageOptions, namespaceImpl, namespaceProperties); } @@ -665,8 +618,10 @@ private List buildTableId(Identifier ident) { .collect(Collectors.toList()); } - private Triple, String, Map> getTableMessage(Identifier ident) + private Table loadTableInternal( + Identifier ident, Optional timestamp, Optional version) throws NoSuchTableException { + // Transform identifier for API call Identifier actualIdent = transformIdentifierForApi(ident); @@ -687,7 +642,37 @@ private Triple, String, Map> getTableMessage(Identi String location = describeResponse.getLocation(); Map initialStorageOptions = describeResponse.getStorageOptions(); - return Triple.of(tableId, location, initialStorageOptions); + Optional versionId = Optional.empty(); + if (timestamp.isPresent()) { + try (Dataset dataset = + Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(location) + .readOptions( + createReadOptions( + location, + catalogConfig, + Optional.empty(), + Optional.of(namespace), + Optional.of(tableId)) + .toReadOptions()) + .build()) { + versionId = Optional.of(Utils.findVersion(dataset.listVersions(), timestamp.get())); + } catch (TableNotFoundException e) { + throw new NoSuchTableException(ident); + } + } else if (version.isPresent()) { + versionId = Optional.of(Utils.parseVersion(version.get())); + } + + LanceSparkReadOptions readOptions = + createReadOptions( + location, catalogConfig, versionId, Optional.of(namespace), Optional.of(tableId)); + StructType schema = getSchema(ident, location, readOptions, namespace); + + // Create read options with namespace support + return createDataset( + readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties); } public abstract LanceDataset createDataset( diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java index ff2e1e717..9c17fa110 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java @@ -15,6 +15,7 @@ import org.lance.Dataset; import org.lance.WriteParams; +import org.lance.spark.utils.Optional; import org.lance.spark.utils.Utils; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; @@ -55,40 +56,17 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti @Override public Table loadTable(Identifier ident) throws NoSuchTableException { - String datasetUri = getDatasetUri(ident); - LanceSparkReadOptions readOptions = - createReadOptions(datasetUri, catalogConfig, null, null, null); - StructType schema = getSchema(ident, datasetUri, readOptions, null); - - return new LanceDataset(readOptions, schema, null, null, null); + return loadTableInternal(ident, Optional.empty(), Optional.empty()); } @Override public Table loadTable(Identifier ident, String version) throws NoSuchTableException { - String datasetUri = getDatasetUri(ident); - long versionId = Utils.parseVersion(version); - - return getLanceDataset(ident, datasetUri, versionId, catalogConfig); + return loadTableInternal(ident, Optional.empty(), Optional.of(version)); } @Override public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { - String datasetUri = getDatasetUri(ident); - - long versionId; - try (Dataset dataset = - Dataset.open() - .allocator(LanceRuntime.allocator()) - .uri(datasetUri) - .readOptions( - createReadOptions(datasetUri, catalogConfig, null, null, null).toReadOptions()) - .build()) { - versionId = Utils.findVersion(dataset.listVersions(), timestamp); - } catch (IllegalArgumentException e) { - throw new NoSuchTableException(ident); - } - - return getLanceDataset(ident, datasetUri, versionId, catalogConfig); + return loadTableInternal(ident, Optional.of(timestamp), Optional.empty()); } @Override @@ -97,7 +75,8 @@ public Table createTable( throws TableAlreadyExistsException, NoSuchNamespaceException { String datasetUri = getDatasetUri(ident); LanceSparkReadOptions readOptions = - createReadOptions(datasetUri, catalogConfig, null, null, null); + createReadOptions( + datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty()); try { Dataset.write() .allocator(LanceRuntime.allocator()) @@ -181,11 +160,37 @@ private String getDatasetUri(Identifier ident) { return sb.toString(); } - private Table getLanceDataset( - Identifier ident, String datasetUri, long versionId, LanceSparkCatalogConfig catalogConfig) - throws NoSuchTableException { + private Table loadTableInternal( + Identifier ident, Optional timestamp, Optional version) + throws NoSuchTableException { + String datasetUri = getDatasetUri(ident); + + Optional versionId = Optional.empty(); + + if (version.isPresent()) { + versionId = Optional.of(Utils.parseVersion(version.get())); + } else if (timestamp.isPresent()) { + try (Dataset dataset = + Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(datasetUri) + .readOptions( + createReadOptions( + datasetUri, + catalogConfig, + Optional.empty(), + Optional.empty(), + Optional.empty()) + .toReadOptions()) + .build()) { + versionId = Optional.of(Utils.findVersion(dataset.listVersions(), timestamp.get())); + } catch (IllegalArgumentException e) { + throw new NoSuchTableException(ident); + } + } + LanceSparkReadOptions readOptions = - createReadOptions(datasetUri, catalogConfig, versionId, null, null); + createReadOptions(datasetUri, catalogConfig, versionId, Optional.empty(), Optional.empty()); StructType schema = getSchema(ident, datasetUri, readOptions, null); return new LanceDataset(readOptions, schema, null, null, null); diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java index 14f7c765f..27781016a 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java @@ -92,23 +92,22 @@ public static StructType getSchema( public static LanceSparkReadOptions createReadOptions( String location, LanceSparkCatalogConfig catalogConfig, - Long versionId, - LanceNamespace namespace, - List tableId) { + Optional versionId, + Optional namespace, + Optional> tableId) { LanceSparkReadOptions.Builder builder = LanceSparkReadOptions.builder().datasetUri(location).withCatalogDefaults(catalogConfig); - if (tableId != null) { - builder.tableId(tableId); + if (versionId.isPresent()) { + builder.version(versionId.get().intValue()); } - - if (namespace != null) { - builder.namespace(namespace); + if (tableId.isPresent()) { + builder.tableId(tableId.get()); } - - if (versionId != null) { - builder.version(versionId.intValue()); + if (namespace.isPresent()) { + builder.namespace(namespace.get()); } + return builder.build(); } From 6dfb204d77151bbef1be4481759aa97d0bc5c5a0 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Feb 2026 21:00:05 +0800 Subject: [PATCH 10/13] code review --- .../catalyst/analysis/TimeTravelSpec.scala | 68 ------------------- 1 file changed, 68 deletions(-) delete mode 100644 org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala diff --git a/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala b/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala deleted file mode 100644 index 26856d9a5..000000000 --- a/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.analysis - -import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, RuntimeReplaceable, SubqueryExpression, Unevaluable} -import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.TimestampType - -sealed trait TimeTravelSpec - -case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec -case class AsOfVersion(version: String) extends TimeTravelSpec - -object TimeTravelSpec { - def create( - timestamp: Option[Expression], - version: Option[String], - conf: SQLConf) : Option[TimeTravelSpec] = { - if (timestamp.nonEmpty && version.nonEmpty) { - throw QueryCompilationErrors.invalidTimeTravelSpecError() - } else if (timestamp.nonEmpty) { - val ts = timestamp.get - assert(ts.resolved && ts.references.isEmpty && !SubqueryExpression.hasSubquery(ts)) - if (!Cast.canAnsiCast(ts.dataType, TimestampType)) { - throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( - "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT", ts) - } - val tsToEval = ts.transform { - case r: RuntimeReplaceable => r.replacement - case _: Unevaluable => - throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( - "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.UNEVALUABLE", ts) - case e if !e.deterministic => - throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( - "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.NON_DETERMINISTIC", ts) - } - val tz = Some(conf.sessionLocalTimeZone) - // Set `ansiEnabled` to false, so that it can return null for invalid input and we can provide - // better error message. - val value = Cast(tsToEval, TimestampType, tz, ansiEnabled = false).eval() - if (value == null) { - throw QueryCompilationErrors.invalidTimestampExprForTimeTravel( - "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.INPUT", ts) - } - Some(AsOfTimestamp(value.asInstanceOf[Long])) - } else if (version.nonEmpty) { - Some(AsOfVersion(version.get)) - } else { - None - } - } -} From d51df7475274a874310df92df430af42cd57a11b Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Feb 2026 21:05:40 +0800 Subject: [PATCH 11/13] fmt --- .../java/org/lance/spark/LanceCatalog.java | 33 ++++++++++--------- .../write/BaseSparkConnectorWriteTest.java | 6 +++- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java index 9c17fa110..513e708da 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java @@ -32,7 +32,8 @@ import java.util.Map; -import static org.lance.spark.utils.Utils.*; +import static org.lance.spark.utils.Utils.createReadOptions; +import static org.lance.spark.utils.Utils.getSchema; /** * A simple Lance catalog that supports both path-based and catalog-based table access. @@ -161,8 +162,8 @@ private String getDatasetUri(Identifier ident) { } private Table loadTableInternal( - Identifier ident, Optional timestamp, Optional version) - throws NoSuchTableException { + Identifier ident, Optional timestamp, Optional version) + throws NoSuchTableException { String datasetUri = getDatasetUri(ident); Optional versionId = Optional.empty(); @@ -171,18 +172,18 @@ private Table loadTableInternal( versionId = Optional.of(Utils.parseVersion(version.get())); } else if (timestamp.isPresent()) { try (Dataset dataset = - Dataset.open() - .allocator(LanceRuntime.allocator()) - .uri(datasetUri) - .readOptions( - createReadOptions( - datasetUri, - catalogConfig, - Optional.empty(), - Optional.empty(), - Optional.empty()) - .toReadOptions()) - .build()) { + Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(datasetUri) + .readOptions( + createReadOptions( + datasetUri, + catalogConfig, + Optional.empty(), + Optional.empty(), + Optional.empty()) + .toReadOptions()) + .build()) { versionId = Optional.of(Utils.findVersion(dataset.listVersions(), timestamp.get())); } catch (IllegalArgumentException e) { throw new NoSuchTableException(ident); @@ -190,7 +191,7 @@ private Table loadTableInternal( } LanceSparkReadOptions readOptions = - createReadOptions(datasetUri, catalogConfig, versionId, Optional.empty(), Optional.empty()); + createReadOptions(datasetUri, catalogConfig, versionId, Optional.empty(), Optional.empty()); StructType schema = getSchema(ident, datasetUri, readOptions, null); return new LanceDataset(readOptions, schema, null, null, null); diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java index a396d38da..b7b81221b 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java @@ -19,7 +19,11 @@ import org.lance.spark.LanceSparkReadOptions; import org.lance.spark.TestUtils; -import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; import org.apache.spark.sql.types.DataTypes; From 11a979bb6f1bc1d2367b3367601de3a3c9d0e9a0 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Feb 2026 21:45:57 +0800 Subject: [PATCH 12/13] fix ut --- .../java/org/lance/spark/SparkLanceNamespaceTestBase.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java index 97f121e1e..3f858c119 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java @@ -128,10 +128,15 @@ public void testTimeTravelTimestampAsOf() throws Exception { spark.sql("INSERT INTO " + fullName + " VALUES (1, 'v1')"); assertTrue(checkDataset(1, fullName)); + + Thread.sleep(1000); spark.sql("INSERT INTO " + fullName + " VALUES (2, 'v2')"); assertTrue(checkDataset(2, fullName)); Version version = getLatestVersion(tableName); + spark.sql("INSERT INTO " + fullName + " VALUES (3, 'v3')"); + assertTrue(checkDataset(3, fullName)); + DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); String date = version.getDataTime().format(format); From be2737d240fb5a3d70aff2f8e886a510ade96f74 Mon Sep 17 00:00:00 2001 From: YueZhang Date: Thu, 5 Feb 2026 22:00:02 +0800 Subject: [PATCH 13/13] fix ut --- .../java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java index 629f2e66e..789589517 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java @@ -425,7 +425,7 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep @Override public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { - return loadTableInternal(ident, Optional.of(timestamp), null); + return loadTableInternal(ident, Optional.of(timestamp), Optional.empty()); } @Override