Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
_ENV_CONFIG = Config()

TOKEN = "token"
AUTH_MANAGER = "auth.manager"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could we move this to pyiceberg/catalog/rest/auth.py, alongside the other auth manager code?

TYPE = "type"
PY_CATALOG_IMPL = "py-catalog-impl"
ICEBERG = "iceberg"
Expand Down
25 changes: 14 additions & 11 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,7 @@
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt

from pyiceberg import __version__
from pyiceberg.catalog import (
BOTOCORE_SESSION,
TOKEN,
URI,
WAREHOUSE_LOCATION,
Catalog,
PropertiesUpdateSummary,
)
from pyiceberg.catalog import AUTH_MANAGER, BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
from pyiceberg.catalog.rest.response import _handle_non_200_response
from pyiceberg.exceptions import (
Expand All @@ -49,7 +42,7 @@
TableAlreadyExistsError,
UnauthorizedError,
)
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, FileIO, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.table import (
Expand Down Expand Up @@ -218,6 +211,7 @@ class ListViewsResponse(IcebergBaseModel):
class RestCatalog(Catalog):
uri: str
_session: Session
_auth_manager: AuthManager | None

def __init__(self, name: str, **properties: str):
"""Rest Catalog.
Expand All @@ -229,6 +223,7 @@ def __init__(self, name: str, **properties: str):
properties: Properties that are passed along to the configuration.
"""
super().__init__(name, **properties)
self._auth_manager: AuthManager | None = None
self.uri = properties[URI]
self._fetch_config()
self._session = self._create_session()
Expand Down Expand Up @@ -263,16 +258,24 @@ def _create_session(self) -> Session:
if auth_type != CUSTOM and auth_impl:
raise ValueError("auth.impl can only be specified when using custom auth.type")

session.auth = AuthManagerAdapter(AuthManagerFactory.create(auth_impl or auth_type, auth_type_config))
self._auth_manager = AuthManagerFactory.create(auth_impl or auth_type, auth_type_config)
session.auth = AuthManagerAdapter(self._auth_manager)
else:
session.auth = AuthManagerAdapter(self._create_legacy_oauth2_auth_manager(session))
self._auth_manager = self._create_legacy_oauth2_auth_manager(session)
session.auth = AuthManagerAdapter(self._auth_manager)

# Configure SigV4 Request Signing
if property_as_bool(self.properties, SIGV4, False):
self._init_sigv4(session)

return session

def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
    """Load a FileIO from the catalog properties combined with per-call overrides.

    Args:
        properties: Extra properties; on a key collision these win over the catalog's own.
        location: Passed through unchanged to ``load_file_io``.

    Returns:
        The result of ``load_file_io`` for the combined properties.
    """
    io_properties = dict(self.properties)
    io_properties.update(properties)
    auth_manager = self._auth_manager
    if auth_manager:
        # Expose the catalog's auth manager to the FileIO under the AUTH_MANAGER key.
        io_properties[AUTH_MANAGER] = auth_manager
    return load_file_io(io_properties, location)

def is_rest_scan_planning_enabled(self) -> bool:
"""Check if rest server-side scan planning is enabled.

Expand Down
16 changes: 13 additions & 3 deletions pyiceberg/io/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from fsspec.implementations.local import LocalFileSystem
from requests import HTTPError

from pyiceberg.catalog import TOKEN, URI
from pyiceberg.catalog import AUTH_MANAGER, TOKEN, URI
from pyiceberg.exceptions import SignError
from pyiceberg.io import (
ADLS_ACCOUNT_HOST,
Expand Down Expand Up @@ -121,9 +121,19 @@ def __call__(self, request: "AWSRequest", **_: Any) -> None:
signer_url = self.properties.get(S3_SIGNER_URI, self.properties[URI]).rstrip("/") # type: ignore
signer_endpoint = self.properties.get(S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT)

signer_headers = {}
signer_headers: dict[str, str] = {}

auth_header: str | None = None
if token := self.properties.get(TOKEN):
signer_headers = {"Authorization": f"Bearer {token}"}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was partially mentioned already, but should the auth manager take precedence over the token when both are set?

auth_header = f"Bearer {token}"
Comment on lines 127 to +128
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe we can get rid of accessing TOKEN through properties here and just standardize on using the auth manager.

Copy link
Collaborator

@sungwy sungwy Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be out of scope for this bug-fix PR, but it looks like we’re tightly coupling AuthManager and authentication tokens, which are RestCatalog concepts, into FileIO, which should be agnostic of the catalog type.

It might be worth revisiting this design in more detail in the future to ensure we don’t introduce fallback logic that’s driven by configuration properties rather than by a clearer separation of concerns.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, great point. I think it would be better to split the "REST signer" out of FileIO. There's already a good example in the REST catalog:

def _init_sigv4(self, session: Session) -> None:
    """Mount an adapter on ``session`` that SigV4-signs requests sent to ``self.uri``.

    Args:
        session: The requests session on which the signing adapter is mounted.
    """
    from urllib import parse

    import boto3
    from botocore.auth import SigV4Auth
    from botocore.awsrequest import AWSRequest
    from requests import PreparedRequest
    from requests.adapters import HTTPAdapter

    class SigV4Adapter(HTTPAdapter):
        """HTTP adapter that adds AWS SigV4 signature headers to each request."""

        def __init__(self, **properties: str):
            super().__init__()
            self._properties = properties
            # The boto3 session is the source of the credentials and the default region.
            self._boto_session = boto3.Session(
                region_name=get_first_property_value(self._properties, AWS_REGION),
                botocore_session=self._properties.get(BOTOCORE_SESSION),
                aws_access_key_id=get_first_property_value(self._properties, AWS_ACCESS_KEY_ID),
                aws_secret_access_key=get_first_property_value(self._properties, AWS_SECRET_ACCESS_KEY),
                aws_session_token=get_first_property_value(self._properties, AWS_SESSION_TOKEN),
            )

        def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None:  # pylint: disable=W0613
            """Sign ``request`` in place and merge the signed headers into it."""
            credentials = self._boto_session.get_credentials().get_frozen_credentials()
            region = self._properties.get(SIGV4_REGION, self._boto_session.region_name)
            service = self._properties.get(SIGV4_SERVICE, "execute-api")

            # Split the URL into its base and its query parameters for AWSRequest.
            url = str(request.url).split("?")[0]
            query = str(parse.urlsplit(request.url).query)
            params = dict(parse.parse_qsl(query))

            # remove the connection header as it will be updated after signing;
            # pop() rather than del so a request without that header does not raise KeyError
            request.headers.pop("connection", None)

            aws_request = AWSRequest(
                method=request.method, url=url, params=params, data=request.body, headers=dict(request.headers)
            )

            SigV4Auth(credentials, service, region).add_auth(aws_request)
            original_header = request.headers
            signed_headers = aws_request.headers
            relocated_headers = {}

            # relocate headers if there is a conflict with signed headers
            for header, value in original_header.items():
                if header in signed_headers and signed_headers[header] != value:
                    relocated_headers[f"Original-{header}"] = value

            request.headers.update(relocated_headers)
            request.headers.update(signed_headers)

    session.mount(self.uri, SigV4Adapter(**self.properties))

It might also be easier to just pass the request Session from the REST catalog into the signer, so we don't need to recreate the auth header directly.

But again, we can refactor this after the bug fix :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed - just leaving a comment so we don't forget 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case we forget, #2862

elif auth_manager := self.properties.get(AUTH_MANAGER):
header = getattr(auth_manager, "auth_header", None)
if callable(header):
auth_header = header()
Comment on lines +130 to +132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
header = getattr(auth_manager, "auth_header", None)
if callable(header):
auth_header = header()
auth_header = auth_manager.auth_header()

Could we just call the function directly? A direct call will fail loudly if the auth_header function does not exist.
I think the current solution will fail silently, i.e. it will not add any auth header if the auth_header function does not exist.


if auth_header:
signer_headers["Authorization"] = auth_header

signer_headers.update(get_header_properties(self.properties))

signer_body = {
Expand Down
51 changes: 51 additions & 0 deletions tests/io/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from fsspec.spec import AbstractFileSystem
from requests_mock import Mocker

from pyiceberg.catalog import AUTH_MANAGER
from pyiceberg.exceptions import SignError
from pyiceberg.io import fsspec
from pyiceberg.io.fsspec import FsspecFileIO, S3V4RestSigner
Expand Down Expand Up @@ -948,3 +949,53 @@ def test_s3v4_rest_signer_forbidden(requests_mock: Mocker) -> None:
"""Failed to sign request 401: {'method': 'HEAD', 'region': 'us-west-2', 'uri': 'https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro', 'headers': {'User-Agent': ['Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0']}}"""
in str(exc_info.value)
)


def test_s3v4_rest_signer_uses_auth_manager(requests_mock: Mocker) -> None:
    """The signer sends the Authorization header produced by the configured auth manager."""

    class DummyAuthManager:
        """Stub auth manager that counts how often its header is requested."""

        def __init__(self) -> None:
            self.calls = 0

        def auth_header(self) -> str:
            self.calls += 1
            return "Bearer via-manager"

    # Mocked response of the remote signing endpoint.
    new_uri = "https://bucket/metadata/snap-signed.avro"
    signed_headers = {
        "Authorization": ["AWS4-HMAC-SHA256 Credential=ASIA.../s3/aws4_request, SignedHeaders=host, Signature=abc"],
        "Host": ["bucket.s3.us-west-2.amazonaws.com"],
    }
    requests_mock.post(
        f"{TEST_URI}/v1/aws/s3/sign",
        json={"uri": new_uri, "headers": signed_headers, "extensions": {}},
        status_code=200,
    )

    # The request that is handed to the signer.
    request = AWSRequest(
        method="HEAD",
        url="https://bucket/metadata/snap-foo.avro",
        headers={"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"},
        data=b"",
        params={},
        auth_path="/metadata/snap-foo.avro",
    )
    request.context = {
        "client_region": "us-west-2",
        "has_streaming_input": False,
        "auth_type": None,
        "signing": {"bucket": "bucket"},
        "retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
    }

    manager = DummyAuthManager()
    sign = S3V4RestSigner(properties={AUTH_MANAGER: manager, "uri": TEST_URI})
    sign(request)

    assert manager.calls == 1
    sign_call = requests_mock.last_request
    assert sign_call is not None
    assert sign_call.headers["Authorization"] == "Bearer via-manager"
    assert request.url == new_uri