diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java index b72fba530..789589517 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/BaseLanceNamespaceSparkCatalog.java @@ -26,6 +26,7 @@ import org.lance.spark.function.LanceFragmentIdWithDefaultFunction; import org.lance.spark.utils.Optional; import org.lance.spark.utils.SchemaConverter; +import org.lance.spark.utils.Utils; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; @@ -57,6 +58,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.lance.spark.utils.Utils.createReadOptions; +import static org.lance.spark.utils.Utils.getSchema; + public abstract class BaseLanceNamespaceSparkCatalog implements TableCatalog, SupportsNamespaces, FunctionCatalog { @@ -411,60 +415,17 @@ public boolean tableExists(Identifier ident) { @Override public Table loadTable(Identifier ident) throws NoSuchTableException { - // Transform identifier for API call - Identifier actualIdent = transformIdentifierForApi(ident); - - // Build the table ID for credential vending - List tableId = buildTableId(actualIdent); - - // Call describeTable to get location and initial storage options - DescribeTableRequest describeRequest = new DescribeTableRequest(); - tableId.forEach(describeRequest::addIdItem); - DescribeTableResponse describeResponse; - try { - describeResponse = namespace.describeTable(describeRequest); - } catch (TableNotFoundException e) { - throw new NoSuchTableException(ident); - } catch (RuntimeException e) { - throw new RuntimeException("Failed to describe table: " + ident, e); - } - - String location = describeResponse.getLocation(); - Map initialStorageOptions = describeResponse.getStorageOptions(); - - // Open dataset to get schema - StructType schema; - try (Dataset dataset = - Dataset.open() - .allocator(LanceRuntime.allocator()) - .namespace(namespace) - .tableId(tableId) - .build()) { - schema = LanceArrowUtils.fromArrowSchema(dataset.getSchema()); - } catch (TableNotFoundException e) { - throw new NoSuchTableException(ident); - } + return loadTableInternal(ident, Optional.empty(), Optional.empty()); + } - // Create read options with namespace support - LanceSparkReadOptions readOptions = createReadOptions(location, tableId); - return createDataset( - readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties); + @Override + public Table loadTable(Identifier ident, String version) throws NoSuchTableException { + return loadTableInternal(ident, Optional.empty(), Optional.of(version)); } - /** - * Creates LanceSparkReadOptions with namespace settings for this catalog. - * - * @param location the dataset location URI - * @param tableId the table identifier within the namespace - * @return a new LanceSparkReadOptions with all catalog settings - */ - private LanceSparkReadOptions createReadOptions(String location, List tableId) { - return LanceSparkReadOptions.builder() - .datasetUri(location) - .withCatalogDefaults(catalogConfig) - .namespace(namespace) - .tableId(tableId) - .build(); + @Override + public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + return loadTableInternal(ident, Optional.of(timestamp), Optional.empty()); } @Override @@ -500,7 +461,13 @@ public Table createTable( Map initialStorageOptions = describeResponse.getStorageOptions(); // Create read options with namespace settings - LanceSparkReadOptions readOptions = createReadOptions(location, tableIdList); + LanceSparkReadOptions readOptions = + createReadOptions( + location, + catalogConfig, + Optional.empty(), + Optional.of(namespace), + Optional.of(tableIdList)); return createDataset( readOptions, processedSchema, initialStorageOptions, namespaceImpl, namespaceProperties); } @@ -651,6 +618,63 @@ private List buildTableId(Identifier ident) { .collect(Collectors.toList()); } + private Table loadTableInternal( + Identifier ident, Optional timestamp, Optional version) + throws NoSuchTableException { + + // Transform identifier for API call + Identifier actualIdent = transformIdentifierForApi(ident); + + // Build the table ID for credential vending + List tableId = buildTableId(actualIdent); + + // Call describeTable to get location and initial storage options + DescribeTableRequest describeRequest = new DescribeTableRequest(); + tableId.forEach(describeRequest::addIdItem); + DescribeTableResponse describeResponse; + try { + describeResponse = namespace.describeTable(describeRequest); + } catch (TableNotFoundException e) { + throw new NoSuchTableException(ident); + } catch (RuntimeException e) { + throw new RuntimeException("Failed to describe table: " + ident, e); + } + String location = describeResponse.getLocation(); + Map initialStorageOptions = describeResponse.getStorageOptions(); + + Optional versionId = Optional.empty(); + if (timestamp.isPresent()) { + try (Dataset dataset = + Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(location) + .readOptions( + createReadOptions( + location, + catalogConfig, + Optional.empty(), + Optional.of(namespace), + Optional.of(tableId)) + .toReadOptions()) + .build()) { + versionId = Optional.of(Utils.findVersion(dataset.listVersions(), timestamp.get())); + } catch (TableNotFoundException e) { + throw new NoSuchTableException(ident); + } + } else if (version.isPresent()) { + versionId = Optional.of(Utils.parseVersion(version.get())); + } + + LanceSparkReadOptions readOptions = + createReadOptions( + location, catalogConfig, versionId, Optional.of(namespace), Optional.of(tableId)); + StructType schema = getSchema(ident, location, readOptions, namespace); + + // Create read options with namespace support + return createDataset( + readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties); + } + public abstract LanceDataset createDataset( LanceSparkReadOptions readOptions, StructType sparkSchema, diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java index afe9c102d..513e708da 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceCatalog.java @@ -15,6 +15,8 @@ import org.lance.Dataset; import org.lance.WriteParams; +import org.lance.spark.utils.Optional; +import org.lance.spark.utils.Utils; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -30,6 +32,9 @@ import java.util.Map; +import static org.lance.spark.utils.Utils.createReadOptions; +import static org.lance.spark.utils.Utils.getSchema; + /** * A simple Lance catalog that supports both path-based and catalog-based table access. * @@ -52,20 +57,17 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti @Override public Table loadTable(Identifier ident) throws NoSuchTableException { - String datasetUri = getDatasetUri(ident); - LanceSparkReadOptions readOptions = createReadOptions(datasetUri); - StructType schema; - try (Dataset dataset = - Dataset.open() - .allocator(LanceRuntime.allocator()) - .uri(datasetUri) - .readOptions(readOptions.toReadOptions()) - .build()) { - schema = LanceArrowUtils.fromArrowSchema(dataset.getSchema()); - } catch (IllegalArgumentException e) { - throw new NoSuchTableException(ident); - } - return new LanceDataset(readOptions, schema, null, null, null); + return loadTableInternal(ident, Optional.empty(), Optional.empty()); + } + + @Override + public Table loadTable(Identifier ident, String version) throws NoSuchTableException { + return loadTableInternal(ident, Optional.empty(), Optional.of(version)); + } + + @Override + public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException { + return loadTableInternal(ident, Optional.of(timestamp), Optional.empty()); } @Override @@ -73,7 +75,9 @@ public Table createTable( Identifier ident, StructType schema, Transform[] partitions, Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { String datasetUri = getDatasetUri(ident); - LanceSparkReadOptions readOptions = createReadOptions(datasetUri); + LanceSparkReadOptions readOptions = + createReadOptions( + datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty()); try { Dataset.write() .allocator(LanceRuntime.allocator()) @@ -115,19 +119,6 @@ public void initialize(String name, CaseInsensitiveStringMap options) { this.catalogConfig = LanceSparkCatalogConfig.from(options.asCaseSensitiveMap()); } - /** - * Creates LanceSparkReadOptions for this catalog. - * - * @param datasetUri the dataset URI - * @return a new LanceSparkReadOptions with catalog settings - */ - private LanceSparkReadOptions createReadOptions(String datasetUri) { - return LanceSparkReadOptions.builder() - .datasetUri(datasetUri) - .withCatalogDefaults(catalogConfig) - .build(); - } - @Override public String name() { return catalogName; @@ -169,4 +160,40 @@ private String getDatasetUri(Identifier ident) { sb.append(name); return sb.toString(); } + + private Table loadTableInternal( + Identifier ident, Optional timestamp, Optional version) + throws NoSuchTableException { + String datasetUri = getDatasetUri(ident); + + Optional versionId = Optional.empty(); + + if (version.isPresent()) { + versionId = Optional.of(Utils.parseVersion(version.get())); + } else if (timestamp.isPresent()) { + try (Dataset dataset = + Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(datasetUri) + .readOptions( + createReadOptions( + datasetUri, + catalogConfig, + Optional.empty(), + Optional.empty(), + Optional.empty()) + .toReadOptions()) + .build()) { + versionId = Optional.of(Utils.findVersion(dataset.listVersions(), timestamp.get())); + } catch (IllegalArgumentException e) { + throw new NoSuchTableException(ident); + } + } + + LanceSparkReadOptions readOptions = + createReadOptions(datasetUri, catalogConfig, versionId, Optional.empty(), Optional.empty()); + StructType schema = getSchema(ident, datasetUri, readOptions, null); + + return new LanceDataset(readOptions, schema, null, null, null); + } } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java index 17dbed211..f3cb1ad70 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java @@ -96,6 +96,7 @@ private Dataset openDataset(LanceSparkReadOptions readOptions) { .allocator(allocator) .namespace(readOptions.getNamespace()) .tableId(readOptions.getTableId()) + .readOptions(readOptions.toReadOptions()) .build(); } else { return Dataset.open() diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java index b35292ba8..72ad29804 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java @@ -103,6 +103,7 @@ private Dataset getOrOpenDataset() { .allocator(LanceRuntime.allocator()) .namespace(readOptions.getNamespace()) .tableId(readOptions.getTableId()) + .readOptions(readOptions.toReadOptions()) .build(); } else { lazyDataset = diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceSplit.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceSplit.java index 8ec4461ea..0bcd06796 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceSplit.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceSplit.java @@ -51,6 +51,7 @@ private static Dataset openDataset(LanceSparkReadOptions readOptions) { .allocator(LanceRuntime.allocator()) .namespace(readOptions.getNamespace()) .tableId(readOptions.getTableId()) + .readOptions(readOptions.toReadOptions()) .build(); } else { return Dataset.open() diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java new file mode 100644 index 000000000..27781016a --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/Utils.java @@ -0,0 +1,127 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.utils; + +import org.lance.Dataset; +import org.lance.OpenDatasetBuilder; +import org.lance.Version; +import org.lance.namespace.LanceNamespace; +import org.lance.spark.LanceRuntime; +import org.lance.spark.LanceSparkCatalogConfig; +import org.lance.spark.LanceSparkReadOptions; + +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.LanceArrowUtils; + +import java.time.Instant; +import java.time.ZonedDateTime; +import java.util.List; + +public class Utils { + + public static long parseVersion(String version) { + return Long.parseUnsignedLong(version); + } + + public static long findVersion(List versions, long timestamp) { + long versionID = -1; + Instant instant = instantFromTimestamp(timestamp); + for (Version version : versions) { + ZonedDateTime dataTime = version.getDataTime(); + if (dataTime.toInstant().compareTo(instant) < 0) { + versionID = version.getId(); + } else if (dataTime.toInstant().equals(instant)) { + return version.getId(); + } else { + break; + } + } + if (versionID == -1) { + throw new IllegalArgumentException("No version found with timestamp: " + timestamp); + } + return versionID; + } + + public static StructType getSchema( + Identifier ident, + String datasetUri, + LanceSparkReadOptions readOptions, + LanceNamespace namespace) + throws NoSuchTableException { + Dataset dataset = null; + try { + OpenDatasetBuilder builder = + Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(datasetUri) + .readOptions(readOptions.toReadOptions()); + if (namespace != null) { + builder.namespace(namespace); + } + dataset = builder.build(); + return LanceArrowUtils.fromArrowSchema(dataset.getSchema()); + } catch (IllegalArgumentException e) { + throw new NoSuchTableException(ident); + } finally { + if (dataset != null) { + dataset.close(); + } + } + } + + /** + * Creates LanceSparkReadOptions for this catalog. + * + * @param location the dataset URI + * @param versionId optional dataset version id + * @return a new LanceSparkReadOptions with catalog settings + */ + public static LanceSparkReadOptions createReadOptions( + String location, + LanceSparkCatalogConfig catalogConfig, + Optional versionId, + Optional namespace, + Optional> tableId) { + LanceSparkReadOptions.Builder builder = + LanceSparkReadOptions.builder().datasetUri(location).withCatalogDefaults(catalogConfig); + + if (versionId.isPresent()) { + builder.version(versionId.get().intValue()); + } + if (tableId.isPresent()) { + builder.tableId(tableId.get()); + } + if (namespace.isPresent()) { + builder.namespace(namespace.get()); + } + + return builder.build(); + } + + // Determine if the timestamp is in microseconds or nanoseconds and convert to Instant + private static Instant instantFromTimestamp(long timestamp) { + if (timestamp <= 0) { + throw new IllegalArgumentException("Timestamp must be greater than zero"); + } + return instantFromEpochMicros(timestamp); + } + + private static Instant instantFromEpochMicros(long epochMicros) { + long sec = Math.floorDiv(epochMicros, 1_000_000L); + long nanoAdj = Math.floorMod(epochMicros, 1_000_000L) * 1_000L; + return Instant.ofEpochSecond(sec, nanoAdj); + } +} diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala index 0a1e86c1b..1d3cf7bb7 100755 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala @@ -175,6 +175,7 @@ case class AddIndexExec( Dataset.open() .allocator(LanceRuntime.allocator()) .namespace(readOptions.getNamespace) + .readOptions(readOptions.toReadOptions) .tableId(readOptions.getTableId) .build() } else { diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/VacuumExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/VacuumExec.scala index c643a348a..a223d7bfa 100644 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/VacuumExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/VacuumExec.scala @@ -74,6 +74,7 @@ case class VacuumExec( Dataset.open() .allocator(LanceRuntime.allocator()) .namespace(readOptions.getNamespace) + .readOptions(readOptions.toReadOptions()) .tableId(readOptions.getTableId) .build() } else { diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java index 4f8575c48..3f858c119 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/SparkLanceNamespaceTestBase.java @@ -13,9 +13,12 @@ */ package org.lance.spark; +import org.lance.Version; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -24,6 +27,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,6 +55,7 @@ void setup() throws IOException { .config( "spark.sql.catalog." + catalogName, "org.lance.spark.LanceNamespaceSparkCatalog") .config("spark.sql.catalog." + catalogName + ".impl", getNsImpl()) + .config("spark.sql.session.timeZone", "UTC") .getOrCreate(); Map additionalConfigs = getAdditionalNsConfigs(); @@ -94,6 +99,54 @@ protected String generateTableName(String baseName) { return baseName + "_" + UUID.randomUUID().toString().replace("-", ""); } + @Test + public void testTimeTravelVersionAsOf() throws Exception { + String tableName = generateTableName("time_travel_version"); + String fullName = catalogName + ".default." + tableName; + + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + assertTrue(checkDataset(0, fullName)); + + spark.sql("INSERT INTO " + fullName + " VALUES (1, 'v1')"); + assertTrue(checkDataset(1, fullName)); + spark.sql("INSERT INTO " + fullName + " VALUES (2, 'v2')"); + assertTrue(checkDataset(2, fullName)); + + // time travel to version 2 (the second insert) + Dataset actual = spark.sql("SELECT * FROM " + fullName + " VERSION AS OF " + "2"); + List res = actual.collectAsList(); + assertEquals(1, res.size()); + } + + @Test + public void testTimeTravelTimestampAsOf() throws Exception { + String tableName = generateTableName("time_travel_version"); + String fullName = catalogName + ".default." + tableName; + + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + assertTrue(checkDataset(0, fullName)); + + spark.sql("INSERT INTO " + fullName + " VALUES (1, 'v1')"); + assertTrue(checkDataset(1, fullName)); + + Thread.sleep(1000); + spark.sql("INSERT INTO " + fullName + " VALUES (2, 'v2')"); + assertTrue(checkDataset(2, fullName)); + + Version version = getLatestVersion(tableName); + spark.sql("INSERT INTO " + fullName + " VALUES (3, 'v3')"); + assertTrue(checkDataset(3, fullName)); + + DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + String date = version.getDataTime().format(format); + + // time travel to timestamp before second insert + Dataset actual = + spark.sql("SELECT * FROM " + fullName + " TIMESTAMP AS OF '" + date + "'"); + List res = actual.collectAsList(); + assertEquals(1, res.size()); + } + @Test public void testCreateAndDescribeTable() throws Exception { String tableName = generateTableName("test_table"); @@ -570,4 +623,36 @@ public void testOnePartIdentifier() throws Exception { assertEquals(42L, row.getLong(0)); assertEquals(3.14, row.getDouble(1), 0.001); } + + private boolean checkDataset(int expectedSize, String tableName) { + Dataset actual = spark.sql("SELECT * FROM " + tableName); + List res = actual.collectAsList(); + + return expectedSize == res.size(); + } + + private Version getLatestVersion(String tableName) throws Exception { + Identifier ident = Identifier.of(new String[] {"default"}, tableName); + LanceDataset lanceTable = (LanceDataset) catalog.loadTable(ident); + LanceSparkReadOptions readOptions = lanceTable.readOptions(); + try (org.lance.Dataset dataset = openLatestDataset(readOptions)) { + return dataset.getVersion(); + } + } + + private org.lance.Dataset openLatestDataset(LanceSparkReadOptions readOptions) { + if (readOptions.hasNamespace()) { + return org.lance.Dataset.open() + .allocator(LanceRuntime.allocator()) + .namespace(readOptions.getNamespace()) + .tableId(readOptions.getTableId()) + .readOptions(readOptions.toReadOptions()) + .build(); + } + return org.lance.Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(readOptions.getDatasetUri()) + .readOptions(readOptions.toReadOptions()) + .build(); + } } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java index 699a82942..b7b81221b 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/BaseSparkConnectorWriteTest.java @@ -13,13 +13,16 @@ */ package org.lance.spark.write; +import org.lance.Version; import org.lance.spark.LanceDataSource; +import org.lance.spark.LanceRuntime; import org.lance.spark.LanceSparkReadOptions; import org.lance.spark.TestUtils; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; @@ -34,6 +37,7 @@ import java.io.File; import java.nio.file.Path; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.List; @@ -56,6 +60,7 @@ static void setup() { .master("local") .config("spark.sql.catalog.lance", "org.lance.spark.LanceCatalog") .config("spark.sql.catalog.lance.max_row_per_file", "1") + .config("spark.sql.session.timeZone", "UTC") .getOrCreate(); StructType schema = new StructType( @@ -315,4 +320,66 @@ public void writeWithInvalidBatchSizeFails(TestInfo testInfo) { "Expected batch_size validation error, got: " + e.getMessage()); } } + + @Test + public void testTimeTravel(TestInfo testInfo) throws InterruptedException { + String tableName = testInfo.getTestMethod().get().getName(); + String outputPath = TestUtils.getDatasetUri(dbPath.toString(), tableName); + + StructType schema = + new StructType().add("id", DataTypes.LongType).add("value", DataTypes.LongType); + + List data1 = List.of(RowFactory.create(1L, 100L)); + Dataset df1 = spark.createDataFrame(data1, schema); + df1.write().format("lance").option(LanceSparkReadOptions.CONFIG_DATASET_URI, outputPath).save(); + assertTrue(checkDataset(1, outputPath)); + Thread.sleep(1000); + + List data2 = List.of(RowFactory.create(2L, 200L)); + Dataset df2 = spark.createDataFrame(data2, schema); + df2.write().format("lance").mode(SaveMode.Append).save(outputPath); + assertTrue(checkDataset(2, outputPath)); + + Version version_3 = getLatestVersion(outputPath); + + List data3 = List.of(RowFactory.create(3L, 300L)); + Dataset df3 = spark.createDataFrame(data3, schema); + df3.write().format("lance").mode(SaveMode.Append).save(outputPath); + assertTrue(checkDataset(3, outputPath)); + + // check timestamp as of + DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + String date = version_3.getDataTime().format(format); + String sql = String.format("select * from lance.`%s` TIMESTAMP AS OF '%s'", outputPath, date); + + List res = spark.sql(sql).collectAsList(); + assertEquals(1, res.size()); + + // check version as of + List res2 = + spark.sql("select * from lance.`" + outputPath + "` VERSION AS OF " + 2).collectAsList(); + assertEquals(1, res2.size()); + } + + private boolean checkDataset(int expectedSize, String path) { + Dataset lanceTable = + spark + .read() + .format(LanceDataSource.name) + .option(LanceSparkReadOptions.CONFIG_DATASET_URI, path) + .load(); + + lanceTable.createOrReplaceTempView("table_a"); + Dataset actual = spark.sql("SELECT * FROM table_a"); + List res = actual.collectAsList(); + + return expectedSize == res.size(); + } + + private Version getLatestVersion(String datasetUri) { + try (org.lance.Dataset dataset = + org.lance.Dataset.open().allocator(LanceRuntime.allocator()).uri(datasetUri).build()) { + return dataset.getVersion(); + } + } }