-
Notifications
You must be signed in to change notification settings - Fork 76
[Feature][DataLoader] Add catalog for loading tables #454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
0200aba
Add OpenHouseTableCatalog with TableCatalog protocol for DataLoader
robreeves 213a7d3
Add required catalog parameter to OpenHouseDataLoader
robreeves 46f52da
Inherit OpenHouseTableCatalog from PyIceberg Catalog
robreeves 5316373
Rename OpenHouseTableCatalog to OpenHouseCatalog
robreeves 9d25fba
Rename table_catalog.py to catalog.py
robreeves 2850c4d
Add OpenHouseCatalogError and improve error handling in catalog
robreeves 92d13c4
Add logging to OpenHouseCatalog matching Java catalog pattern
robreeves f39feaa
Verify all Table properties in load_table test
robreeves 19c7215
Improve test coverage and consistency in catalog tests
robreeves c3b69dd
Refactor catalog tests to use responses library and improve clarity
robreeves 675f414
Handle malformed JSON responses in OpenHouseCatalog.load_table
robreeves 8791ed9
Make identifier parsing private and return TableIdentifier
robreeves 91e5315
Add context manager support, narrow exception handling, and truncate …
robreeves 1584bbf
Include truncation details inline in error messages
robreeves 62ea9f6
Standardize error types: OSError for server errors, let JSON/file err…
robreeves 02da1cc
Remove response truncation logic from error messages
robreeves dfa652d
Use pluggable FileIO via _load_file_io instead of hardcoded PyArrowFi…
robreeves 0e31f4a
Use public load_file_io instead of private _load_file_io method
robreeves a56fcaa
Replace _parse_identifier with base class identifier_to_database_and_…
robreeves 5682b81
Replace **properties with explicit constructor params and add timeout
robreeves 286d211
Raise NoSuchTableError for 404 responses
robreeves 156a794
Remove manual integration test script
robreeves 6e9c4dd
Rename trust_store to ssl_ca_cert
robreeves 443e910
Add optional properties param for PyIceberg base class config
robreeves ca6637c
Add catalog integration test to CI
robreeves 9f21338
Add integration test instructions to dataloader README
robreeves 1822393
Add unit test verifying DEFAULT_SCHEME resolves schemeless paths to hdfs
robreeves File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
10 changes: 9 additions & 1 deletion
10
integrations/python/dataloader/src/openhouse/dataloader/__init__.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,15 @@ | ||
| from importlib.metadata import version | ||
|
|
||
| from openhouse.dataloader.catalog import OpenHouseCatalog, OpenHouseCatalogError | ||
| from openhouse.dataloader.data_loader import DataLoaderContext, OpenHouseDataLoader | ||
| from openhouse.dataloader.filters import always_true, col | ||
|
|
||
| __version__ = version("openhouse.dataloader") | ||
| __all__ = ["OpenHouseDataLoader", "DataLoaderContext", "always_true", "col"] | ||
| __all__ = [ | ||
| "OpenHouseDataLoader", | ||
| "DataLoaderContext", | ||
| "OpenHouseCatalog", | ||
| "OpenHouseCatalogError", | ||
| "always_true", | ||
| "col", | ||
| ] |
155 changes: 155 additions & 0 deletions
155
integrations/python/dataloader/src/openhouse/dataloader/catalog.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,155 @@ | ||
| import logging | ||
| from typing import Any | ||
|
|
||
| import requests | ||
| from pyiceberg.catalog import Catalog | ||
| from pyiceberg.exceptions import NoSuchTableError | ||
| from pyiceberg.io import load_file_io | ||
| from pyiceberg.serializers import FromInputFile | ||
| from pyiceberg.table import Table | ||
| from pyiceberg.typedef import Identifier | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| _TABLE_LOCATION = "tableLocation" | ||
|
|
||
|
|
||
| class OpenHouseCatalogError(Exception): | ||
| """Error raised when the OpenHouse catalog fails to load a table.""" | ||
|
|
||
|
|
||
| class OpenHouseCatalog(Catalog): | ||
| """Client-side catalog implementation for Iceberg tables in OpenHouse. | ||
|
|
||
| Leverages the OpenHouse Tables Service REST API to load table metadata. | ||
|
|
||
| Args: | ||
| name: Catalog name | ||
| uri: OpenHouse Tables Service base URL | ||
| auth_token: JWT Bearer token for authentication | ||
| ssl_ca_cert: Path to CA cert bundle for SSL verification | ||
| timeout_seconds: HTTP request timeout in seconds | ||
| properties: Additional properties forwarded to the PyIceberg Catalog base class | ||
| (e.g., ``py-io-impl`` for custom FileIO) | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| name: str, | ||
| uri: str, | ||
| auth_token: str | None = None, | ||
| ssl_ca_cert: str | None = None, | ||
| timeout_seconds: float = 30, | ||
| properties: dict[str, str] | None = None, | ||
| ): | ||
| super().__init__(name, uri=uri, **(properties or {})) | ||
| self._uri = uri.rstrip("/") | ||
| self._timeout = timeout_seconds | ||
| logger.info("Initializing OpenHouseCatalog for service at %s", self._uri) | ||
| self._session = requests.Session() | ||
| self._session.headers["Content-Type"] = "application/json" | ||
|
|
||
| if auth_token is not None: | ||
| self._session.headers["Authorization"] = f"Bearer {auth_token}" | ||
|
|
||
| if ssl_ca_cert is not None: | ||
| self._session.verify = ssl_ca_cert | ||
|
|
||
| def __enter__(self): | ||
| return self | ||
|
|
||
| def __exit__(self, *_: Any): | ||
| self.close() | ||
|
|
||
| def close(self): | ||
| self._session.close() | ||
|
|
||
| def load_table(self, identifier: str | Identifier) -> Table: | ||
| database, table = self.identifier_to_database_and_table(identifier) | ||
| url = f"{self._uri}/v1/databases/{database}/tables/{table}" | ||
| logger.info("Calling load_table for table: %s.%s", database, table) | ||
|
|
||
| response = self._session.get(url, timeout=self._timeout) | ||
| if not response.ok: | ||
| if response.status_code == 404: | ||
| raise NoSuchTableError(f"Table {database}.{table} does not exist") | ||
| raise OSError( | ||
| f"Failed to load table {database}.{table}: HTTP {response.status_code}. Response: {response.text}" | ||
| ) | ||
|
|
||
| table_response = response.json() | ||
| metadata_location = table_response.get(_TABLE_LOCATION) | ||
| if not metadata_location: | ||
| raise OpenHouseCatalogError( | ||
| f"Response for table {database}.{table} is missing '{_TABLE_LOCATION}'. Response: {table_response}" | ||
| ) | ||
|
|
||
| file_io = load_file_io(properties=self.properties, location=metadata_location) | ||
| metadata_file = file_io.new_input(metadata_location) | ||
| metadata = FromInputFile.table_metadata(metadata_file) | ||
|
|
||
| logger.debug("Calling load_table succeeded") | ||
| return Table( | ||
| identifier=(database, table), | ||
| metadata=metadata, | ||
| metadata_location=metadata_location, | ||
| io=file_io, | ||
| catalog=self, | ||
| ) | ||
|
|
||
| # -- Unsupported operations -- | ||
| # Required by the Catalog ABC but not needed for read-only table loading. | ||
|
|
||
| def drop_table(self, *_: Any, **__: Any) -> None: | ||
| raise NotImplementedError | ||
|
|
||
| def purge_table(self, *_: Any, **__: Any) -> None: | ||
| raise NotImplementedError | ||
|
|
||
| def rename_table(self, *_: Any, **__: Any) -> Table: | ||
| raise NotImplementedError | ||
|
|
||
| def create_table(self, *_: Any, **__: Any) -> Table: | ||
| raise NotImplementedError | ||
|
|
||
| def create_table_transaction(self, *_: Any, **__: Any) -> Any: | ||
| raise NotImplementedError | ||
|
|
||
| def register_table(self, *_: Any, **__: Any) -> Table: | ||
| raise NotImplementedError | ||
|
|
||
| def commit_table(self, *_: Any, **__: Any) -> Any: | ||
| raise NotImplementedError | ||
|
|
||
| def list_tables(self, *_: Any, **__: Any) -> list[Identifier]: | ||
| raise NotImplementedError | ||
|
|
||
| def list_namespaces(self, *_: Any, **__: Any) -> list[Identifier]: | ||
| raise NotImplementedError | ||
|
|
||
| def create_namespace(self, *_: Any, **__: Any) -> None: | ||
| raise NotImplementedError | ||
|
|
||
| def drop_namespace(self, *_: Any, **__: Any) -> None: | ||
| raise NotImplementedError | ||
|
|
||
| def load_namespace_properties(self, *_: Any, **__: Any) -> dict[str, str]: | ||
| raise NotImplementedError | ||
|
|
||
| def update_namespace_properties(self, *_: Any, **__: Any) -> Any: | ||
| raise NotImplementedError | ||
|
|
||
| def list_views(self, *_: Any, **__: Any) -> list[Identifier]: | ||
| raise NotImplementedError | ||
|
|
||
| def drop_view(self, *_: Any, **__: Any) -> None: | ||
| raise NotImplementedError | ||
|
|
||
| def table_exists(self, *_: Any, **__: Any) -> bool: | ||
|
robreeves marked this conversation as resolved.
cbb330 marked this conversation as resolved.
|
||
| raise NotImplementedError | ||
|
|
||
| def view_exists(self, *_: Any, **__: Any) -> bool: | ||
| raise NotImplementedError | ||
|
|
||
| def namespace_exists(self, *_: Any, **__: Any) -> bool: | ||
| raise NotImplementedError | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.