Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.lance.spark.function.LanceFragmentIdWithDefaultFunction;
import org.lance.spark.utils.Optional;
import org.lance.spark.utils.SchemaConverter;
import org.lance.spark.utils.Utils;

import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
Expand Down Expand Up @@ -57,6 +58,9 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.lance.spark.utils.Utils.createReadOptions;
import static org.lance.spark.utils.Utils.getSchema;

public abstract class BaseLanceNamespaceSparkCatalog
implements TableCatalog, SupportsNamespaces, FunctionCatalog {

Expand Down Expand Up @@ -411,60 +415,17 @@ public boolean tableExists(Identifier ident) {

@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
// Transform identifier for API call
Identifier actualIdent = transformIdentifierForApi(ident);

// Build the table ID for credential vending
List<String> tableId = buildTableId(actualIdent);

// Call describeTable to get location and initial storage options
DescribeTableRequest describeRequest = new DescribeTableRequest();
tableId.forEach(describeRequest::addIdItem);
DescribeTableResponse describeResponse;
try {
describeResponse = namespace.describeTable(describeRequest);
} catch (TableNotFoundException e) {
throw new NoSuchTableException(ident);
} catch (RuntimeException e) {
throw new RuntimeException("Failed to describe table: " + ident, e);
}

String location = describeResponse.getLocation();
Map<String, String> initialStorageOptions = describeResponse.getStorageOptions();

// Open dataset to get schema
StructType schema;
try (Dataset dataset =
Dataset.open()
.allocator(LanceRuntime.allocator())
.namespace(namespace)
.tableId(tableId)
.build()) {
schema = LanceArrowUtils.fromArrowSchema(dataset.getSchema());
} catch (TableNotFoundException e) {
throw new NoSuchTableException(ident);
}
return loadTableInternal(ident, Optional.empty(), Optional.empty());
}

// Create read options with namespace support
LanceSparkReadOptions readOptions = createReadOptions(location, tableId);
return createDataset(
readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties);
@Override
public Table loadTable(Identifier ident, String version) throws NoSuchTableException {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like all of the loadTable instances are basically duplicated. Would it make sense to consolidate some of the code? maybe provide a "version" either from string parsing or timestamp lookup as an argument and if it doesn't exist than use latest?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return loadTableInternal(ident, Optional.empty(), Optional.of(version));
}

/**
* Creates LanceSparkReadOptions with namespace settings for this catalog.
*
* @param location the dataset location URI
* @param tableId the table identifier within the namespace
* @return a new LanceSparkReadOptions with all catalog settings
*/
private LanceSparkReadOptions createReadOptions(String location, List<String> tableId) {
return LanceSparkReadOptions.builder()
.datasetUri(location)
.withCatalogDefaults(catalogConfig)
.namespace(namespace)
.tableId(tableId)
.build();
@Override
public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
return loadTableInternal(ident, Optional.of(timestamp), Optional.empty());
}

@Override
Expand Down Expand Up @@ -500,7 +461,13 @@ public Table createTable(
Map<String, String> initialStorageOptions = describeResponse.getStorageOptions();

// Create read options with namespace settings
LanceSparkReadOptions readOptions = createReadOptions(location, tableIdList);
LanceSparkReadOptions readOptions =
createReadOptions(
location,
catalogConfig,
Optional.empty(),
Optional.of(namespace),
Optional.of(tableIdList));
return createDataset(
readOptions, processedSchema, initialStorageOptions, namespaceImpl, namespaceProperties);
}
Expand Down Expand Up @@ -651,6 +618,63 @@ private List<String> buildTableId(Identifier ident) {
.collect(Collectors.toList());
}

private Table loadTableInternal(
Identifier ident, Optional<Long> timestamp, Optional<String> version)
throws NoSuchTableException {

// Transform identifier for API call
Identifier actualIdent = transformIdentifierForApi(ident);

// Build the table ID for credential vending
List<String> tableId = buildTableId(actualIdent);

// Call describeTable to get location and initial storage options
DescribeTableRequest describeRequest = new DescribeTableRequest();
tableId.forEach(describeRequest::addIdItem);
DescribeTableResponse describeResponse;
try {
describeResponse = namespace.describeTable(describeRequest);
} catch (TableNotFoundException e) {
throw new NoSuchTableException(ident);
} catch (RuntimeException e) {
throw new RuntimeException("Failed to describe table: " + ident, e);
}
String location = describeResponse.getLocation();
Map<String, String> initialStorageOptions = describeResponse.getStorageOptions();

Optional<Long> versionId = Optional.empty();
if (timestamp.isPresent()) {
try (Dataset dataset =
Dataset.open()
.allocator(LanceRuntime.allocator())
.uri(location)
.readOptions(
createReadOptions(
location,
catalogConfig,
Optional.empty(),
Optional.of(namespace),
Optional.of(tableId))
.toReadOptions())
.build()) {
versionId = Optional.of(Utils.findVersion(dataset.listVersions(), timestamp.get()));
} catch (TableNotFoundException e) {
throw new NoSuchTableException(ident);
}
} else if (version.isPresent()) {
versionId = Optional.of(Utils.parseVersion(version.get()));
}

LanceSparkReadOptions readOptions =
createReadOptions(
location, catalogConfig, versionId, Optional.of(namespace), Optional.of(tableId));
StructType schema = getSchema(ident, location, readOptions, namespace);

// Create read options with namespace support
return createDataset(
readOptions, schema, initialStorageOptions, namespaceImpl, namespaceProperties);
}

public abstract LanceDataset createDataset(
LanceSparkReadOptions readOptions,
StructType sparkSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import org.lance.Dataset;
import org.lance.WriteParams;
import org.lance.spark.utils.Optional;
import org.lance.spark.utils.Utils;

import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
Expand All @@ -30,6 +32,9 @@

import java.util.Map;

import static org.lance.spark.utils.Utils.createReadOptions;
import static org.lance.spark.utils.Utils.getSchema;

/**
* A simple Lance catalog that supports both path-based and catalog-based table access.
*
Expand All @@ -52,28 +57,27 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti

@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
String datasetUri = getDatasetUri(ident);
LanceSparkReadOptions readOptions = createReadOptions(datasetUri);
StructType schema;
try (Dataset dataset =
Dataset.open()
.allocator(LanceRuntime.allocator())
.uri(datasetUri)
.readOptions(readOptions.toReadOptions())
.build()) {
schema = LanceArrowUtils.fromArrowSchema(dataset.getSchema());
} catch (IllegalArgumentException e) {
throw new NoSuchTableException(ident);
}
return new LanceDataset(readOptions, schema, null, null, null);
return loadTableInternal(ident, Optional.empty(), Optional.empty());
}

@Override
public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
return loadTableInternal(ident, Optional.empty(), Optional.of(version));
}

@Override
public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
return loadTableInternal(ident, Optional.of(timestamp), Optional.empty());
}

@Override
public Table createTable(
Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
String datasetUri = getDatasetUri(ident);
LanceSparkReadOptions readOptions = createReadOptions(datasetUri);
LanceSparkReadOptions readOptions =
createReadOptions(
datasetUri, catalogConfig, Optional.empty(), Optional.empty(), Optional.empty());
try {
Dataset.write()
.allocator(LanceRuntime.allocator())
Expand Down Expand Up @@ -115,19 +119,6 @@ public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogConfig = LanceSparkCatalogConfig.from(options.asCaseSensitiveMap());
}

/**
* Creates LanceSparkReadOptions for this catalog.
*
* @param datasetUri the dataset URI
* @return a new LanceSparkReadOptions with catalog settings
*/
private LanceSparkReadOptions createReadOptions(String datasetUri) {
return LanceSparkReadOptions.builder()
.datasetUri(datasetUri)
.withCatalogDefaults(catalogConfig)
.build();
}

@Override
public String name() {
return catalogName;
Expand Down Expand Up @@ -169,4 +160,40 @@ private String getDatasetUri(Identifier ident) {
sb.append(name);
return sb.toString();
}

private Table loadTableInternal(
Identifier ident, Optional<Long> timestamp, Optional<String> version)
throws NoSuchTableException {
String datasetUri = getDatasetUri(ident);

Optional<Long> versionId = Optional.empty();

if (version.isPresent()) {
versionId = Optional.of(Utils.parseVersion(version.get()));
} else if (timestamp.isPresent()) {
try (Dataset dataset =
Dataset.open()
.allocator(LanceRuntime.allocator())
.uri(datasetUri)
.readOptions(
createReadOptions(
datasetUri,
catalogConfig,
Optional.empty(),
Optional.empty(),
Optional.empty())
.toReadOptions())
.build()) {
versionId = Optional.of(Utils.findVersion(dataset.listVersions(), timestamp.get()));
} catch (IllegalArgumentException e) {
throw new NoSuchTableException(ident);
}
}

LanceSparkReadOptions readOptions =
createReadOptions(datasetUri, catalogConfig, versionId, Optional.empty(), Optional.empty());
StructType schema = getSchema(ident, datasetUri, readOptions, null);

return new LanceDataset(readOptions, schema, null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private Dataset openDataset(LanceSparkReadOptions readOptions) {
.allocator(allocator)
.namespace(readOptions.getNamespace())
.tableId(readOptions.getTableId())
.readOptions(readOptions.toReadOptions())
.build();
} else {
return Dataset.open()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private Dataset getOrOpenDataset() {
.allocator(LanceRuntime.allocator())
.namespace(readOptions.getNamespace())
.tableId(readOptions.getTableId())
.readOptions(readOptions.toReadOptions())
.build();
} else {
lazyDataset =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ private static Dataset openDataset(LanceSparkReadOptions readOptions) {
.allocator(LanceRuntime.allocator())
.namespace(readOptions.getNamespace())
.tableId(readOptions.getTableId())
.readOptions(readOptions.toReadOptions())
.build();
} else {
return Dataset.open()
Expand Down
Loading