Merged (27 commits, changes from all commits)
- 0200aba: Add OpenHouseTableCatalog with TableCatalog protocol for DataLoader (robreeves, Feb 13, 2026)
- 213a7d3: Add required catalog parameter to OpenHouseDataLoader (robreeves, Feb 13, 2026)
- 46f52da: Inherit OpenHouseTableCatalog from PyIceberg Catalog (robreeves, Feb 13, 2026)
- 5316373: Rename OpenHouseTableCatalog to OpenHouseCatalog (robreeves, Feb 13, 2026)
- 9d25fba: Rename table_catalog.py to catalog.py (robreeves, Feb 13, 2026)
- 2850c4d: Add OpenHouseCatalogError and improve error handling in catalog (robreeves, Feb 13, 2026)
- 92d13c4: Add logging to OpenHouseCatalog matching Java catalog pattern (robreeves, Feb 13, 2026)
- f39feaa: Verify all Table properties in load_table test (robreeves, Feb 13, 2026)
- 19c7215: Improve test coverage and consistency in catalog tests (robreeves, Feb 14, 2026)
- c3b69dd: Refactor catalog tests to use responses library and improve clarity (robreeves, Feb 17, 2026)
- 675f414: Handle malformed JSON responses in OpenHouseCatalog.load_table (robreeves, Feb 17, 2026)
- 8791ed9: Make identifier parsing private and return TableIdentifier (robreeves, Feb 18, 2026)
- 91e5315: Add context manager support, narrow exception handling, and truncate … (robreeves, Feb 18, 2026)
- 1584bbf: Include truncation details inline in error messages (robreeves, Feb 18, 2026)
- 62ea9f6: Standardize error types: OSError for server errors, let JSON/file err… (robreeves, Feb 18, 2026)
- 02da1cc: Remove response truncation logic from error messages (robreeves, Feb 18, 2026)
- dfa652d: Use pluggable FileIO via _load_file_io instead of hardcoded PyArrowFi… (robreeves, Feb 19, 2026)
- 0e31f4a: Use public load_file_io instead of private _load_file_io method (robreeves, Feb 19, 2026)
- a56fcaa: Replace _parse_identifier with base class identifier_to_database_and_… (robreeves, Feb 19, 2026)
- 5682b81: Replace **properties with explicit constructor params and add timeout (robreeves, Feb 19, 2026)
- 286d211: Raise NoSuchTableError for 404 responses (robreeves, Feb 19, 2026)
- 156a794: Remove manual integration test script (robreeves, Feb 19, 2026)
- 6e9c4dd: Rename trust_store to ssl_ca_cert (robreeves, Feb 20, 2026)
- 443e910: Add optional properties param for PyIceberg base class config (robreeves, Feb 20, 2026)
- ca6637c: Add catalog integration test to CI (robreeves, Feb 20, 2026)
- 9f21338: Add integration test instructions to dataloader README (robreeves, Feb 20, 2026)
- 1822393: Add unit test verifying DEFAULT_SCHEME resolves schemeless paths to hdfs (robreeves, Feb 20, 2026)
4 changes: 4 additions & 0 deletions .github/workflows/build-run-tests.yml
@@ -43,6 +43,10 @@ jobs:
working-directory: integrations/python/dataloader
run: make sync verify

- name: Run Data Loader Integration Tests
working-directory: integrations/python/dataloader
run: make integration-tests TOKEN_FILE=../../../tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/resources/dummy.token

- name: Install dependencies
run: pip install -r scripts/python/requirements.txt

5 changes: 4 additions & 1 deletion integrations/python/dataloader/Makefile
@@ -1,4 +1,4 @@
.PHONY: help sync clean lint format format-check check test verify build package-check
.PHONY: help sync clean lint format format-check check test verify build package-check integration-tests

help:
@echo "Available commands:"
@@ -37,6 +37,9 @@ build:
package-check:
uv run twine check dist/*

integration-tests:
uv run python tests/integration_test_catalog.py $(TOKEN_FILE)

clean:
rm -rf build/
rm -rf dist/
23 changes: 21 additions & 2 deletions integrations/python/dataloader/README.md
@@ -54,9 +54,28 @@ filters = (col("age") >= 18) & (col("country").is_in(["US", "CA"])) & ~col("emai
# Set up environment
make sync

# Run tests
make test
# Run all checks and unit tests
make verify

# See all available commands
make help
```

### Integration Tests

Integration tests run the data loader end to end against an instance of OpenHouse running in Docker.

**Prerequisites:** Docker and a Gradle build of OpenHouse.

```bash
# Create docker OpenHouse instance from repo root
./gradlew clean build
docker compose -f infra/recipes/docker-compose/oh-only/docker-compose.yml up -d --build

# Run integration tests
make -C integrations/python/dataloader sync
make -C integrations/python/dataloader integration-tests TOKEN_FILE=../../../tables-test-fixtures/tables-test-fixtures-iceberg-1.2/src/main/resources/dummy.token

# Stop docker
docker compose -f infra/recipes/docker-compose/oh-only/docker-compose.yml down
```
4 changes: 2 additions & 2 deletions integrations/python/dataloader/pyproject.toml
@@ -10,10 +10,10 @@ readme = "README.md"
requires-python = ">=3.12"
license = {text = "BSD-2-Clause"}
keywords = ["openhouse", "data-loader", "lakehouse", "iceberg", "datafusion"]
dependencies = ["datafusion==51.0.0", "pyiceberg~=0.11.0"]
dependencies = ["datafusion==51.0.0", "pyiceberg~=0.11.0", "requests>=2.31.0"]

[project.optional-dependencies]
dev = ["ruff>=0.9.0", "pytest>=8.0.0", "twine>=6.0.0"]
dev = ["responses>=0.25.0", "ruff>=0.9.0", "pytest>=8.0.0", "twine>=6.0.0"]

[tool.hatch.version]
source = "vcs"
@@ -1,7 +1,15 @@
from importlib.metadata import version

from openhouse.dataloader.catalog import OpenHouseCatalog, OpenHouseCatalogError
from openhouse.dataloader.data_loader import DataLoaderContext, OpenHouseDataLoader
from openhouse.dataloader.filters import always_true, col

__version__ = version("openhouse.dataloader")
__all__ = ["OpenHouseDataLoader", "DataLoaderContext", "always_true", "col"]
__all__ = [
"OpenHouseDataLoader",
"DataLoaderContext",
"OpenHouseCatalog",
"OpenHouseCatalogError",
"always_true",
"col",
]
155 changes: 155 additions & 0 deletions integrations/python/dataloader/src/openhouse/dataloader/catalog.py
@@ -0,0 +1,155 @@
import logging
from typing import Any

import requests
from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.io import load_file_io
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import Table
from pyiceberg.typedef import Identifier

logger = logging.getLogger(__name__)

_TABLE_LOCATION = "tableLocation"


class OpenHouseCatalogError(Exception):
"""Error raised when the OpenHouse catalog fails to load a table."""


class OpenHouseCatalog(Catalog):
"""Client-side catalog implementation for Iceberg tables in OpenHouse.

Leverages the OpenHouse Tables Service REST API to load table metadata.

Args:
name: Catalog name
uri: OpenHouse Tables Service base URL
auth_token: JWT Bearer token for authentication
ssl_ca_cert: Path to CA cert bundle for SSL verification
timeout_seconds: HTTP request timeout in seconds
properties: Additional properties forwarded to the PyIceberg Catalog base class
(e.g., ``py-io-impl`` for custom FileIO)
"""

def __init__(
self,
name: str,
uri: str,
auth_token: str | None = None,
ssl_ca_cert: str | None = None,
timeout_seconds: float = 30,
properties: dict[str, str] | None = None,
):
super().__init__(name, uri=uri, **(properties or {}))
self._uri = uri.rstrip("/")
self._timeout = timeout_seconds
logger.info("Initializing OpenHouseCatalog for service at %s", self._uri)
self._session = requests.Session()
self._session.headers["Content-Type"] = "application/json"

if auth_token is not None:
self._session.headers["Authorization"] = f"Bearer {auth_token}"

if ssl_ca_cert is not None:
self._session.verify = ssl_ca_cert

def __enter__(self):
return self

def __exit__(self, *_: Any):
self.close()

def close(self):
self._session.close()

def load_table(self, identifier: str | Identifier) -> Table:
database, table = self.identifier_to_database_and_table(identifier)
url = f"{self._uri}/v1/databases/{database}/tables/{table}"
logger.info("Calling load_table for table: %s.%s", database, table)

response = self._session.get(url, timeout=self._timeout)
if not response.ok:
if response.status_code == 404:
raise NoSuchTableError(f"Table {database}.{table} does not exist")
raise OSError(
f"Failed to load table {database}.{table}: HTTP {response.status_code}. Response: {response.text}"
)

table_response = response.json()
metadata_location = table_response.get(_TABLE_LOCATION)
if not metadata_location:
raise OpenHouseCatalogError(
f"Response for table {database}.{table} is missing '{_TABLE_LOCATION}'. Response: {table_response}"
)

file_io = load_file_io(properties=self.properties, location=metadata_location)
metadata_file = file_io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(metadata_file)

logger.debug("Calling load_table succeeded")
return Table(
identifier=(database, table),
metadata=metadata,
metadata_location=metadata_location,
io=file_io,
catalog=self,
)

# -- Unsupported operations --
# Required by the Catalog ABC but not needed for read-only table loading.

def drop_table(self, *_: Any, **__: Any) -> None:
raise NotImplementedError

def purge_table(self, *_: Any, **__: Any) -> None:
raise NotImplementedError

def rename_table(self, *_: Any, **__: Any) -> Table:
raise NotImplementedError

def create_table(self, *_: Any, **__: Any) -> Table:
raise NotImplementedError

def create_table_transaction(self, *_: Any, **__: Any) -> Any:
raise NotImplementedError

def register_table(self, *_: Any, **__: Any) -> Table:
raise NotImplementedError

def commit_table(self, *_: Any, **__: Any) -> Any:
raise NotImplementedError

def list_tables(self, *_: Any, **__: Any) -> list[Identifier]:
raise NotImplementedError

def list_namespaces(self, *_: Any, **__: Any) -> list[Identifier]:
raise NotImplementedError

def create_namespace(self, *_: Any, **__: Any) -> None:
raise NotImplementedError

def drop_namespace(self, *_: Any, **__: Any) -> None:
raise NotImplementedError

def load_namespace_properties(self, *_: Any, **__: Any) -> dict[str, str]:
raise NotImplementedError

def update_namespace_properties(self, *_: Any, **__: Any) -> Any:
raise NotImplementedError

def list_views(self, *_: Any, **__: Any) -> list[Identifier]:
raise NotImplementedError

def drop_view(self, *_: Any, **__: Any) -> None:
raise NotImplementedError

def table_exists(self, *_: Any, **__: Any) -> bool:
raise NotImplementedError

def view_exists(self, *_: Any, **__: Any) -> bool:
raise NotImplementedError

def namespace_exists(self, *_: Any, **__: Any) -> bool:
raise NotImplementedError
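The error-handling contract of `load_table` above maps each failure mode to a distinct exception: 404 to `NoSuchTableError`, any other non-OK status to `OSError`, and a response body missing `tableLocation` to `OpenHouseCatalogError`. The sketch below exercises that branching with stub objects; `StubResponse` and the local exception classes are illustrative stand-ins (the real code uses `requests` responses and the PyIceberg exception), not part of the PR.

```python
# Illustrative sketch of load_table's error-handling branches using stubs.
from dataclasses import dataclass, field


class NoSuchTableError(Exception):
    """Local stand-in for pyiceberg.exceptions.NoSuchTableError."""


class OpenHouseCatalogError(Exception):
    """Raised when the catalog response is missing required fields."""


@dataclass
class StubResponse:
    """Minimal stand-in for a requests.Response."""

    status_code: int
    body: dict = field(default_factory=dict)

    @property
    def ok(self) -> bool:
        return self.status_code < 400

    @property
    def text(self) -> str:
        return str(self.body)

    def json(self) -> dict:
        return self.body


def resolve_metadata_location(response: StubResponse, database: str, table: str) -> str:
    """Mirrors the branching in OpenHouseCatalog.load_table."""
    if not response.ok:
        if response.status_code == 404:
            raise NoSuchTableError(f"Table {database}.{table} does not exist")
        raise OSError(
            f"Failed to load table {database}.{table}: "
            f"HTTP {response.status_code}. Response: {response.text}"
        )
    table_response = response.json()
    metadata_location = table_response.get("tableLocation")
    if not metadata_location:
        raise OpenHouseCatalogError(
            f"Response for table {database}.{table} is missing "
            f"'tableLocation'. Response: {table_response}"
        )
    return metadata_location
```

Note that a malformed JSON body is deliberately not caught here: per commit 62ea9f6, JSON decoding errors propagate as-is rather than being wrapped.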
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass

from pyiceberg.catalog import Catalog

from openhouse.dataloader.data_loader_split import DataLoaderSplit
from openhouse.dataloader.filters import Filter, always_true
from openhouse.dataloader.table_identifier import TableIdentifier
Expand Down Expand Up @@ -31,6 +33,7 @@ class OpenHouseDataLoader:

def __init__(
self,
catalog: Catalog,
database: str,
table: str,
branch: str | None = None,
Expand All @@ -40,13 +43,15 @@ def __init__(
):
"""
Args:
catalog: Catalog for loading table metadata
database: Database name
table: Table name
branch: Optional branch name
columns: Column names to load, or None to load all columns
filters: Row filter expression, defaults to always_true() (all rows)
context: Data loader context
"""
self._catalog = catalog
self._table = TableIdentifier(database, table, branch)
self._columns = columns
self._filters = filters if filters is not None else always_true()
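The data_loader.py diff makes the catalog a required constructor argument rather than something the loader builds internally, i.e. constructor injection. A minimal sketch of that pattern is below; `TableLoadingCatalog`, `FakeCatalog`, and `DataLoaderSketch` are simplified hypothetical stand-ins for the real `Catalog`/`OpenHouseCatalog`/`OpenHouseDataLoader` types, not the PR's implementations.

```python
# Sketch of the constructor-injection pattern the diff introduces:
# the loader accepts any object providing load_table, so tests can
# substitute a fake and avoid a live Tables Service.
from typing import Protocol


class TableLoadingCatalog(Protocol):
    """The slice of the catalog interface the loader depends on."""

    def load_table(self, identifier: tuple[str, str]) -> object: ...


class FakeCatalog:
    """Test double standing in for a real catalog."""

    def __init__(self, tables: dict[tuple[str, str], str]):
        self._tables = tables

    def load_table(self, identifier: tuple[str, str]) -> str:
        return self._tables[identifier]


class DataLoaderSketch:
    """Stores the injected catalog, as OpenHouseDataLoader.__init__ now does."""

    def __init__(self, catalog: TableLoadingCatalog, database: str, table: str):
        self._catalog = catalog
        self._identifier = (database, table)

    def load_metadata(self) -> object:
        # A real loader would plan splits from the Iceberg table;
        # here we only delegate to the injected catalog.
        return self._catalog.load_table(self._identifier)
```

Swapping in `FakeCatalog` lets unit tests run without any OpenHouse service, which is presumably why the parameter is required rather than constructed inside the loader.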