diff --git a/cloudsmith_cli/cli/commands/whoami.py b/cloudsmith_cli/cli/commands/whoami.py index ffbf9842..59fa31c2 100644 --- a/cloudsmith_cli/cli/commands/whoami.py +++ b/cloudsmith_cli/cli/commands/whoami.py @@ -1,14 +1,11 @@ """CLI/Commands - Retrieve authentication status.""" -import os - import click from ...core import keyring from ...core.api.exceptions import ApiException from ...core.api.user import get_token_metadata, get_user_brief from .. import decorators, utils -from ..config import CredentialsReader from ..exceptions import handle_api_exceptions from .main import main @@ -26,26 +23,17 @@ def _get_active_method(api_config): def _get_api_key_source(opts): """Determine where the API key was loaded from. - Checks in priority order matching actual resolution: - CLI --api-key flag > CLOUDSMITH_API_KEY env var > credentials.ini. + Uses the credential provider chain result attached by initialise_api. """ - if not opts.api_key: - return {"configured": False, "source": None, "source_key": None} - - env_key = os.environ.get("CLOUDSMITH_API_KEY") - - # If env var is set but differs from the resolved key, CLI flag won - if env_key and opts.api_key != env_key: - source, key = "CLI --api-key flag", "cli_flag" - elif env_key: - suffix = env_key[-4:] - source, key = f"CLOUDSMITH_API_KEY env var (ends with ...{suffix})", "env_var" - elif creds := CredentialsReader.find_existing_files(): - source, key = f"credentials.ini ({creds[0]})", "credentials_file" - else: - source, key = "CLI --api-key flag", "cli_flag" - - return {"configured": True, "source": source, "source_key": key} + credential = getattr(opts, "credential", None) + if credential: + return { + "configured": True, + "source": credential.source_detail or credential.source_name, + "source_key": credential.source_name, + } + + return {"configured": False, "source": None, "source_key": None} def _get_sso_status(api_host): diff --git a/cloudsmith_cli/cli/decorators.py b/cloudsmith_cli/cli/decorators.py index ba75e236..06ce7836 100644 --- a/cloudsmith_cli/cli/decorators.py +++ b/cloudsmith_cli/cli/decorators.py @@ -7,6 +7,8 @@ from cloudsmith_cli.cli import validators from ..core.api.init import initialise_api as _initialise_api +from ..core.credentials import CredentialContext, CredentialProviderChain +from ..core.credentials.session import create_session as _create_session from ..core.mcp import server from . import config, utils @@ -20,6 +22,14 @@ def report_retry(seconds, context=None): ) +def _pop_boolean_flag(kwargs, name, invert=False): + """Pop a boolean flag from kwargs, optionally inverting it.""" + value = kwargs.pop(name) + if value is not None and invert: + value = not value + return value + + def common_package_action_options(f): """Add common options for package actions.""" @@ -214,15 +224,17 @@ def common_api_auth_options(f): def wrapper(ctx, *args, **kwargs): # pylint: disable=missing-docstring opts = config.get_or_create_options(ctx) - opts.api_key = kwargs.pop("api_key") + api_key = kwargs.pop("api_key") + if api_key: + opts.api_key = api_key kwargs["opts"] = opts return ctx.invoke(f, *args, **kwargs) return wrapper -def initialise_api(f): - """Initialise the Cloudsmith API for use.""" +def initialise_session(f): + """Create a shared HTTP session with proxy/SSL/user-agent settings.""" @click.option( "--api-host", envvar="CLOUDSMITH_API_HOST", help="The API host to connect to." @@ -252,6 +264,78 @@ def initialise_api(f): envvar="CLOUDSMITH_API_HEADERS", help="A CSV list of extra headers (key=value) to send to the API.", ) + @click.pass_context + @functools.wraps(f) + def wrapper(ctx, *args, **kwargs): + # pylint: disable=missing-docstring + opts = config.get_or_create_options(ctx) + opts.api_host = kwargs.pop("api_host") + opts.api_proxy = kwargs.pop("api_proxy") + opts.api_ssl_verify = _pop_boolean_flag( + kwargs, "without_api_ssl_verify", invert=True + ) + opts.api_user_agent = kwargs.pop("api_user_agent") + opts.api_headers = kwargs.pop("api_headers") + + opts.session = _create_session( + proxy=opts.api_proxy, + ssl_verify=opts.api_ssl_verify, + user_agent=opts.api_user_agent, + headers=opts.api_headers, + ) + + kwargs["opts"] = opts + return ctx.invoke(f, *args, **kwargs) + + return wrapper + + +def resolve_credentials(f): + """Resolve credentials via the provider chain. Depends on initialise_session.""" + + @click.pass_context + @functools.wraps(f) + def wrapper(ctx, *args, **kwargs): + # pylint: disable=missing-docstring + opts = config.get_or_create_options(ctx) + + context = CredentialContext( + session=opts.session, + api_key=opts.api_key, + api_host=opts.api_host or "https://api.cloudsmith.io", + creds_file_path=ctx.meta.get("creds_file"), + profile=ctx.meta.get("profile"), + debug=opts.debug, + ) + + chain = CredentialProviderChain() + credential = chain.resolve(context) + + if context.keyring_refresh_failed: + click.secho( + "An error occurred when attempting to refresh your SSO access token. " + "To refresh this session, run 'cloudsmith auth'", + fg="yellow", + err=True, + ) + if credential: + click.secho( + "Falling back to API key authentication.", + fg="yellow", + err=True, + ) + + opts.credential = credential + + kwargs["opts"] = opts + return ctx.invoke(f, *args, **kwargs) + + return initialise_session(wrapper) + + +def initialise_api(f): + """Initialise the Cloudsmith API for use. Depends on resolve_credentials.""" + @click.option( "-R", "--without-rate-limit", @@ -294,20 +378,8 @@ def initialise_api(f): @functools.wraps(f) def wrapper(ctx, *args, **kwargs): # pylint: disable=missing-docstring - def _set_boolean(name, invert=False): - value = kwargs.pop(name) - value = value if value is not None else None - if value is not None and invert: - value = not value - return value - opts = config.get_or_create_options(ctx) - opts.api_host = kwargs.pop("api_host") - opts.api_proxy = kwargs.pop("api_proxy") - opts.api_ssl_verify = _set_boolean("without_api_ssl_verify", invert=True) - opts.api_user_agent = kwargs.pop("api_user_agent") - opts.api_headers = kwargs.pop("api_headers") - opts.rate_limit = _set_boolean("without_rate_limit", invert=True) + opts.rate_limit = _pop_boolean_flag(kwargs, "without_rate_limit", invert=True) opts.rate_limit_warning = kwargs.pop("rate_limit_warning") opts.error_retry_max = kwargs.pop("error_retry_max") opts.error_retry_backoff = kwargs.pop("error_retry_backoff") @@ -320,7 +392,7 @@ def call_print_rate_limit_info_with_opts(rate_info): opts.api_config = _initialise_api( debug=opts.debug, host=opts.api_host, - key=opts.api_key, + credential=opts.credential, proxy=opts.api_proxy, ssl_verify=opts.api_ssl_verify, user_agent=opts.api_user_agent, @@ -336,7 +408,7 @@ def call_print_rate_limit_info_with_opts(rate_info): kwargs["opts"] = opts return ctx.invoke(f, *args, **kwargs) - return wrapper + return resolve_credentials(wrapper) def initialise_mcp(f): diff --git a/cloudsmith_cli/cli/tests/conftest.py b/cloudsmith_cli/cli/tests/conftest.py index c42f23f4..ad6262dd 100644 --- a/cloudsmith_cli/cli/tests/conftest.py +++ b/cloudsmith_cli/cli/tests/conftest.py @@ -5,6 +5,7 @@ from ...core.api.init import initialise_api from ...core.api.repos import create_repo, delete_repo +from ...core.credentials import CredentialResult from .utils import random_str @@ -51,7 +52,9 @@ def organization(): @pytest.fixture() def tmp_repository(organization, api_host, api_key): """Yield a temporary repository.""" - initialise_api(host=api_host, key=api_key) + initialise_api( + host=api_host, credential=CredentialResult(api_key=api_key, source_name="test") + ) repo_data = create_repo(organization, {"name": random_str()}) yield repo_data delete_repo(organization, repo_data["slug"]) diff --git a/cloudsmith_cli/cli/tests/test_webserver.py b/cloudsmith_cli/cli/tests/test_webserver.py index 538ad147..12ecdf77 100644 --- a/cloudsmith_cli/cli/tests/test_webserver.py +++ b/cloudsmith_cli/cli/tests/test_webserver.py @@ -40,7 +40,11 @@ def test_refresh_api_config_passes_sso_token(self): mock_init_api.assert_called_once() call_kwargs = mock_init_api.call_args.kwargs - assert call_kwargs.get("access_token") == "test_sso_token_123" + credential = call_kwargs.get("credential") + assert credential is not None + assert credential.api_key == "test_sso_token_123" + assert credential.auth_type == "bearer" + assert credential.source_name == "sso" class TestAuthenticationWebRequestHandlerKeyring: diff --git a/cloudsmith_cli/cli/webserver.py b/cloudsmith_cli/cli/webserver.py index eff5d523..3d2ffca9 100644 --- a/cloudsmith_cli/cli/webserver.py +++ b/cloudsmith_cli/cli/webserver.py @@ -9,6 +9,7 @@ from ..core.api.exceptions import ApiException from ..core.api.init import initialise_api +from ..core.credentials import CredentialResult from ..core.keyring import store_sso_tokens from .saml import exchange_2fa_token @@ -79,7 +80,15 @@ def refresh_api_config_after_auth(self): user_agent=getattr(self.api_opts, "user_agent", None), headers=getattr(self.api_opts, "headers", None), rate_limit=getattr(self.api_opts, "rate_limit", True), - access_token=self.sso_access_token, + credential=( + CredentialResult( + api_key=self.sso_access_token, + source_name="sso", + auth_type="bearer", + ) + if self.sso_access_token + else None + ), ) def finish_request(self, request, client_address): diff --git a/cloudsmith_cli/core/api/init.py b/cloudsmith_cli/core/api/init.py index b6a856bc..ecbc8f6f 100644 --- a/cloudsmith_cli/core/api/init.py +++ b/cloudsmith_cli/core/api/init.py @@ -6,16 +6,13 @@ import click import cloudsmith_api -from ...cli import saml -from .. import keyring from ..rest import RestClient -from .exceptions import ApiException def initialise_api( debug=False, host=None, - key=None, + credential=None, proxy=None, ssl_verify=True, user_agent=None, @@ -26,7 +23,6 @@ def initialise_api( error_retry_backoff=None, error_retry_codes=None, error_retry_cb=None, - access_token=None, ): """Initialise the cloudsmith_api.Configuration.""" # FIXME: pylint: disable=too-many-arguments @@ -45,65 +41,15 @@ def initialise_api( config.verify_ssl = ssl_verify config.client_side_validation = False - # Use directly provided access token (e.g. from SSO callback), - # or fall back to keyring lookup if enabled. - if not access_token: - access_token = keyring.get_access_token(config.host) - - if access_token: - auth_header = config.headers.get("Authorization") - - # overwrite auth header if empty or is basic auth without username or password - if not auth_header or auth_header == config.get_basic_auth_token(): - refresh_token = keyring.get_refresh_token(config.host) - - try: - if keyring.should_refresh_access_token(config.host): - new_access_token, new_refresh_token = saml.refresh_access_token( - config.host, - access_token, - refresh_token, - session=saml.create_configured_session(config), - ) - keyring.store_sso_tokens( - config.host, new_access_token, new_refresh_token - ) - # Use the new tokens - access_token = new_access_token - except ApiException: - keyring.update_refresh_attempted_at(config.host) - - click.secho( - "An error occurred when attempting to refresh your SSO access token. To refresh this session, run 'cloudsmith auth'", - fg="yellow", - err=True, - ) - - # Clear access_token to prevent using expired token - access_token = None - - # Fall back to API key auth if available - if key: - click.secho( - "Falling back to API key authentication.", - fg="yellow", - err=True, - ) - config.api_key["X-Api-Key"] = key - - # Only use SSO token if refresh didn't fail - if access_token: - config.headers["Authorization"] = "Bearer {access_token}".format( - access_token=access_token - ) - - if config.debug: - click.echo("SSO access token config value set") - elif key: - config.api_key["X-Api-Key"] = key - - if config.debug: - click.echo("User API key config value set") + if credential: + if credential.auth_type == "bearer": + config.headers["Authorization"] = f"Bearer {credential.api_key}" + if config.debug: + click.echo("SSO access token config value set") + else: + config.api_key["X-Api-Key"] = credential.api_key + if config.debug: + click.echo("User API key config value set") auth_header = headers and config.headers.get("Authorization") if auth_header and " " in auth_header: diff --git a/cloudsmith_cli/core/credentials/__init__.py b/cloudsmith_cli/core/credentials/__init__.py new file mode 100644 index 00000000..d55413ce --- /dev/null +++ b/cloudsmith_cli/core/credentials/__init__.py @@ -0,0 +1,100 @@ +"""Credential Provider Chain for Cloudsmith CLI. + +Implements an AWS SDK-style credential resolution chain that evaluates +credential sources sequentially and returns the first valid result. +""" + +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import requests + +logger = logging.getLogger(__name__) + + +@dataclass +class CredentialContext: + """Context passed to credential providers during resolution. + + All values are populated directly from Click options / ``opts``. + """ + + session: requests.Session | None = None + api_key: str | None = None + api_host: str = "https://api.cloudsmith.io" + creds_file_path: str | None = None + profile: str | None = None + debug: bool = False + keyring_refresh_failed: bool = False + + +@dataclass +class CredentialResult: + """Result from a successful credential resolution.""" + + api_key: str + source_name: str + source_detail: str | None = None + auth_type: str = "api_key" + + +class CredentialProvider(ABC): + """Base class for credential providers.""" + + name: str = "base" + + @abstractmethod + def resolve(self, context: CredentialContext) -> CredentialResult | None: + """Attempt to resolve credentials. Return CredentialResult or None.""" + + +class CredentialProviderChain: + """Evaluates credential providers in order, returning the first valid result. + + If no providers are given, uses the default chain: + Keyring → CLIFlag. + """ + + def __init__(self, providers: list[CredentialProvider] | None = None): + if providers is not None: + self.providers = providers + else: + from .providers import CLIFlagProvider, KeyringProvider + + self.providers = [ + KeyringProvider(), + CLIFlagProvider(), + ] + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + """Evaluate each provider in order. Return the first successful result.""" + for provider in self.providers: + try: + result = provider.resolve(context) + if result is not None: + if context.debug: + logger.debug( + "Credentials resolved by %s: %s", + provider.name, + result.source_detail or result.source_name, + ) + return result + if context.debug: + logger.debug( + "Provider %s did not resolve credentials, trying next", + provider.name, + ) + except Exception: # pylint: disable=broad-exception-caught + # Intentionally broad - one provider failing shouldn't stop others + logger.debug( + "Provider %s raised an exception, skipping", + provider.name, + exc_info=True, + ) + continue + return None diff --git a/cloudsmith_cli/core/credentials/providers/__init__.py b/cloudsmith_cli/core/credentials/providers/__init__.py new file mode 100644 index 00000000..5482e397 --- /dev/null +++ b/cloudsmith_cli/core/credentials/providers/__init__.py @@ -0,0 +1,9 @@ +"""Credential providers for the Cloudsmith CLI.""" + +from .cli_flag import CLIFlagProvider +from .keyring_provider import KeyringProvider + +__all__ = [ + "CLIFlagProvider", + "KeyringProvider", +] diff --git a/cloudsmith_cli/core/credentials/providers/cli_flag.py b/cloudsmith_cli/core/credentials/providers/cli_flag.py new file mode 100644 index 00000000..99b55dcf --- /dev/null +++ b/cloudsmith_cli/core/credentials/providers/cli_flag.py @@ -0,0 +1,22 @@ +"""CLI flag credential provider.""" + +from __future__ import annotations + +from .. import CredentialContext, CredentialProvider, CredentialResult + + +class CLIFlagProvider(CredentialProvider): + """Resolves credentials from a CLI flag value passed via CredentialContext.""" + + name = "cli_flag" + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + api_key = context.api_key + if api_key and api_key.strip(): + suffix = api_key.strip()[-4:] + return CredentialResult( + api_key=api_key.strip(), + source_name="cli_flag", + source_detail=f"--api-key flag, CLOUDSMITH_API_KEY, or credentials.ini (ends with ...{suffix})", + ) + return None diff --git a/cloudsmith_cli/core/credentials/providers/keyring_provider.py b/cloudsmith_cli/core/credentials/providers/keyring_provider.py new file mode 100644 index 00000000..c4f44e95 --- /dev/null +++ b/cloudsmith_cli/core/credentials/providers/keyring_provider.py @@ -0,0 +1,54 @@ +"""Keyring credential provider.""" + +from __future__ import annotations + +import logging + +from .. import CredentialContext, CredentialProvider, CredentialResult + +logger = logging.getLogger(__name__) + + +class KeyringProvider(CredentialProvider): + """Resolves credentials from SAML tokens stored in the system keyring.""" + + name = "keyring" + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + from ....cli.saml import refresh_access_token + from ....core import keyring + + if not keyring.should_use_keyring(): + return None + + api_host = context.api_host + access_token = keyring.get_access_token(api_host) + + if not access_token: + return None + + try: + if keyring.should_refresh_access_token(api_host): + if not context.session: + return None + refresh_token = keyring.get_refresh_token(api_host) + new_access_token, new_refresh_token = refresh_access_token( + api_host, + access_token, + refresh_token, + session=context.session, + ) + keyring.store_sso_tokens(api_host, new_access_token, new_refresh_token) + access_token = new_access_token + except Exception: # pylint: disable=broad-exception-caught + keyring.update_refresh_attempted_at(api_host) + context.keyring_refresh_failed = True + logger.debug("Failed to refresh SAML token", exc_info=True) + return None + + return CredentialResult( + api_key=access_token, + source_name="keyring", + source_detail="SAML token from system keyring", + auth_type="bearer", + ) diff --git a/cloudsmith_cli/core/credentials/session.py b/cloudsmith_cli/core/credentials/session.py new file mode 100644 index 00000000..9e314efb --- /dev/null +++ b/cloudsmith_cli/core/credentials/session.py @@ -0,0 +1,63 @@ +"""HTTP session factory with networking configuration and retry support.""" + +from __future__ import annotations + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +#: Default retry policy: retry on connection errors, 429, and 5xx responses +#: with exponential back-off (1s, 2s, 4s). +DEFAULT_RETRY = Retry( + total=3, + backoff_factor=1, + status_forcelist=(429, 500, 502, 503, 504), + allowed_methods=("GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"), + raise_on_status=False, +) + + +def create_session( + proxy: str | None = None, + ssl_verify: bool = True, + user_agent: str | None = None, + headers: dict | None = None, + api_key: str | None = None, + retry: Retry | None = DEFAULT_RETRY, +) -> requests.Session: + """Create a requests session with networking, auth, and retry configuration. + + Args: + proxy: HTTP/HTTPS proxy URL. + ssl_verify: Whether to verify SSL certificates. + user_agent: Custom User-Agent header value. + headers: Additional headers to include in every request. + api_key: If provided, set as a Bearer token in the Authorization header. + retry: urllib3 Retry configuration. Defaults to :data:`DEFAULT_RETRY`. + Pass ``None`` to disable automatic retries. + + Returns: + A configured :class:`requests.Session`. + """ + session = requests.Session() + + if retry is not None: + adapter = HTTPAdapter(max_retries=retry) + session.mount("https://", adapter) + session.mount("http://", adapter) + + if proxy: + session.proxies = {"http": proxy, "https": proxy} + + session.verify = ssl_verify + + if user_agent: + session.headers["User-Agent"] = user_agent + + if headers: + session.headers.update(headers) + + if api_key: + session.headers["Authorization"] = f"Bearer {api_key}" + + return session diff --git a/cloudsmith_cli/core/tests/test_cli_flag_provider.py b/cloudsmith_cli/core/tests/test_cli_flag_provider.py new file mode 100644 index 00000000..1f10f25e --- /dev/null +++ b/cloudsmith_cli/core/tests/test_cli_flag_provider.py @@ -0,0 +1,34 @@ +"""Tests for the CLI flag credential provider.""" + +from cloudsmith_cli.core.credentials import CredentialContext +from cloudsmith_cli.core.credentials.providers import CLIFlagProvider + + +class TestCLIFlagProvider: + def test_resolves_from_context(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key="my-api-key-1234") + result = provider.resolve(context) + assert result is not None + assert result.api_key == "my-api-key-1234" + assert result.source_name == "cli_flag" + assert result.auth_type == "api_key" + assert "1234" in result.source_detail + + def test_returns_none_when_not_set(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key=None) + result = provider.resolve(context) + assert result is None + + def test_returns_none_for_empty_value(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key=" ") + result = provider.resolve(context) + assert result is None + + def test_strips_whitespace(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key=" my-key ") + result = provider.resolve(context) + assert result.api_key == "my-key" diff --git a/cloudsmith_cli/core/tests/test_credential_context.py b/cloudsmith_cli/core/tests/test_credential_context.py new file mode 100644 index 00000000..15c4b846 --- /dev/null +++ b/cloudsmith_cli/core/tests/test_credential_context.py @@ -0,0 +1,14 @@ +"""Tests for the CredentialContext class.""" + +from cloudsmith_cli.core.credentials import CredentialContext + + +class TestCredentialContext: + def test_keyring_refresh_failed_defaults_false(self): + context = CredentialContext() + assert context.keyring_refresh_failed is False + + def test_keyring_refresh_failed_can_be_set(self): + context = CredentialContext() + context.keyring_refresh_failed = True + assert context.keyring_refresh_failed is True diff --git a/cloudsmith_cli/core/tests/test_credential_provider_chain.py b/cloudsmith_cli/core/tests/test_credential_provider_chain.py new file mode 100644 index 00000000..94eb1e46 --- /dev/null +++ b/cloudsmith_cli/core/tests/test_credential_provider_chain.py @@ -0,0 +1,80 @@ +"""Tests for the credential provider chain.""" + +from cloudsmith_cli.core.credentials import ( + CredentialContext, + CredentialProvider, + CredentialProviderChain, + CredentialResult, +) + + +class DummyProvider(CredentialProvider): + """Test provider that returns a configurable result.""" + + def __init__(self, name, result=None, should_raise=False): + self.name = name + self._result = result + self._should_raise = should_raise + + def resolve(self, context): + if self._should_raise: + raise RuntimeError("Provider error") + return self._result + + +class TestCredentialProviderChain: + def test_first_provider_wins(self): + result1 = CredentialResult(api_key="key1", source_name="first") + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=result1), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key1" + assert result.source_name == "first" + + def test_falls_through_to_second(self): + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=None), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key2" + + def test_returns_none_when_all_fail(self): + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=None), + DummyProvider("p2", result=None), + ] + ) + result = chain.resolve(CredentialContext()) + assert result is None + + def test_skips_erroring_provider(self): + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", should_raise=True), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key2" + + def test_empty_chain(self): + chain = CredentialProviderChain([]) + result = chain.resolve(CredentialContext()) + assert result is None + + def test_default_chain_order(self): + chain = CredentialProviderChain() + assert len(chain.providers) == 2 + assert chain.providers[0].name == "keyring" + assert chain.providers[1].name == "cli_flag" diff --git a/cloudsmith_cli/core/tests/test_init.py b/cloudsmith_cli/core/tests/test_init.py index 5906796c..33881b16 100644 --- a/cloudsmith_cli/core/tests/test_init.py +++ b/cloudsmith_cli/core/tests/test_init.py @@ -1,62 +1,8 @@ -import os -from unittest.mock import patch - -import pytest from cloudsmith_api import Configuration -from ...cli import saml -from .. import keyring from ..api.init import initialise_api -@pytest.fixture -def mocked_get_access_token(): - with patch.object( - keyring, "get_access_token", return_value="dummy_access_token" - ) as get_access_token_mock: - yield get_access_token_mock - - -@pytest.fixture -def mocked_get_refresh_token(): - with patch.object( - keyring, "get_refresh_token", return_value="dummy_refresh_token" - ) as get_refresh_token_mock: - yield get_refresh_token_mock - - -@pytest.fixture -def mocked_should_refresh_access_token(): - with patch.object( - keyring, "should_refresh_access_token", return_value=False - ) as should_refresh_access_token_mock: - yield should_refresh_access_token_mock - - -@pytest.fixture -def mocked_refresh_access_token(): - with patch.object( - saml, - "refresh_access_token", - return_value=("new_access_token", "new_refresh_token"), - ) as refresh_access_token_mock: - yield refresh_access_token_mock - - -@pytest.fixture -def mocked_store_sso_tokens(): - with patch.object(keyring, "store_sso_tokens") as store_sso_tokens_mock: - yield store_sso_tokens_mock - - -@pytest.fixture -def mocked_update_refresh_attempted_at(): - with patch.object( - keyring, "update_refresh_attempted_at" - ) as update_refresh_attempted_at_mock: - yield update_refresh_attempted_at_mock - - class TestInitialiseApi: def setup_class(cls): # pylint: disable=no-self-argument # For the purposes of these tests, we need to explicitly call set_default(None) at the @@ -65,14 +11,10 @@ def setup_class(cls): # pylint: disable=no-self-argument # Configuration class to its vanilla, unmodified behaviour/state. Configuration.set_default(None) - def test_initialise_api_sets_cloudsmith_api_config_default( - self, mocked_get_access_token - ): + def test_initialise_api_sets_cloudsmith_api_config_default(self): """Assert that the extra attributes we add to the cloudsmith_cli.Configuration class are present on newly-created instances of that class. """ - mocked_get_access_token.return_value = None - # Read and understand the Configuration class's initialiser. # Notice how the _default class attribute is used if not None. # https://github.com/cloudsmith-io/cloudsmith-api/blob/57963fff5b7818783b3d87246495275545d505df/bindings/python/src/cloudsmith_api/configuration.py#L32-L40 @@ -122,64 +64,49 @@ def test_initialise_api_sets_cloudsmith_api_config_default( is not new_config_after_initialise ) - def test_initialise_api_with_refreshable_access_token_set( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - mocked_should_refresh_access_token.return_value = True - - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com") + def test_initialise_api_sets_bearer_auth_with_access_token(self): + """Verify access_token is set as Bearer auth header.""" + from cloudsmith_cli.core.credentials import CredentialResult - assert config.headers == {"Authorization": "Bearer new_access_token"} - mocked_refresh_access_token.assert_called_once() - mocked_store_sso_tokens.assert_called_once_with( - "https://example.com", "new_access_token", "new_refresh_token" + credential = CredentialResult( + api_key="test_access_token", source_name="test", auth_type="bearer" ) + config = initialise_api( + host="https://example.com", + credential=credential, + ) + assert config.headers == {"Authorization": "Bearer test_access_token"} - def test_initialise_api_with_recently_refreshed_access_token_and_empty_basic_auth_set( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - auth_header = Configuration().get_basic_auth_token() + def test_initialise_api_sets_api_key(self): + """Verify key is set as X-Api-Key header.""" + from cloudsmith_cli.core.credentials import CredentialResult - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api( - host="https://example.com", headers={"Authorization": auth_header} - ) + credential = CredentialResult( + api_key="test_api_key", source_name="test", auth_type="api_key" + ) + config = initialise_api( + host="https://example.com", + credential=credential, + ) + assert config.api_key["X-Api-Key"] == "test_api_key" - assert config.headers == {"Authorization": "Bearer dummy_access_token"} - assert config.username == "" - assert config.password == "" - mocked_refresh_access_token.assert_not_called() - mocked_store_sso_tokens.assert_not_called() - mocked_update_refresh_attempted_at.assert_not_called() + def test_initialise_api_bearer_credential(self): + """Verify bearer credential sets Authorization header, not X-Api-Key.""" + from cloudsmith_cli.core.credentials import CredentialResult - def test_initialise_api_with_recently_refreshed_access_token_and_present_basic_auth( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): + Configuration.set_default(None) + credential = CredentialResult( + api_key="test_access_token", source_name="test", auth_type="bearer" + ) + config = initialise_api( + host="https://example.com", + credential=credential, + ) + assert config.headers == {"Authorization": "Bearer test_access_token"} + assert "X-Api-Key" not in config.api_key + + def test_initialise_api_with_basic_auth_header(self): + """Verify basic auth header is parsed into username and password.""" temp_config = Configuration() temp_config.username = "username" temp_config.password = "password" @@ -191,181 +118,3 @@ def test_initialise_api_with_recently_refreshed_access_token_and_present_basic_a assert config.headers == {"Authorization": auth_header} assert config.username == "username" assert config.password == "password" - mocked_refresh_access_token.assert_not_called() - mocked_store_sso_tokens.assert_not_called() - mocked_update_refresh_attempted_at.assert_not_called() - - def test_initialise_api_skips_keyring_when_env_var_set( - self, - mocked_get_access_token, - ): - """Verify keyring returns None when CLOUDSMITH_NO_KEYRING=1.""" - mocked_get_access_token.return_value = None - with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): - config = initialise_api(host="https://example.com", key="test_api_key") - - # get_access_token is called but returns None due to internal guard - mocked_get_access_token.assert_called_once() - # API key should be used instead - assert config.api_key["X-Api-Key"] == "test_api_key" - - def test_initialise_api_uses_keyring_when_env_var_not_set( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - ): - """Verify keyring is accessed when CLOUDSMITH_NO_KEYRING is not set.""" - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com") - - # Keyring should be accessed - mocked_get_access_token.assert_called_once() - assert config.headers == {"Authorization": "Bearer dummy_access_token"} - - def test_initialise_api_falls_back_to_api_key_when_sso_refresh_fails( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - """Verify API key is used as fallback when SSO token refresh fails.""" - from ..api.exceptions import ApiException - - # Simulate SSO token refresh failure - mocked_should_refresh_access_token.return_value = True - mocked_refresh_access_token.side_effect = ApiException( - status=401, detail="Unauthorized" - ) - - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com", key="fallback_api_key") - - # Should not use expired SSO token - assert ( - "Authorization" not in config.headers - or config.headers.get("Authorization") != "Bearer dummy_access_token" - ) - # Should fall back to API key - assert config.api_key["X-Api-Key"] == "fallback_api_key" - mocked_update_refresh_attempted_at.assert_called_once() - mocked_store_sso_tokens.assert_not_called() - - def test_initialise_api_no_auth_when_sso_refresh_fails_without_api_key( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - """Verify expired SSO token is not used when refresh fails and no API key available.""" - from ..api.exceptions import ApiException - - # Reset Configuration to clear any state from previous tests - Configuration.set_default(None) - - # Simulate SSO token refresh failure - mocked_should_refresh_access_token.return_value = True - mocked_refresh_access_token.side_effect = ApiException( - status=401, detail="Unauthorized" - ) - - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com", key=None) - - # Should not use expired SSO token - assert ( - "Authorization" not in config.headers - or config.headers.get("Authorization") != "Bearer dummy_access_token" - ) - # Should not have API key either - assert "X-Api-Key" not in config.api_key - mocked_update_refresh_attempted_at.assert_called_once() - mocked_store_sso_tokens.assert_not_called() - - def test_initialise_api_uses_direct_access_token_when_keyring_disabled( - self, - mocked_get_access_token, - ): - """Verify a directly provided access_token is used even when keyring is disabled. - - This is the critical path for --request-api-key with CLOUDSMITH_NO_KEYRING=1. - The SSO callback provides the access token directly, bypassing keyring storage. - """ - with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): - config = initialise_api( - host="https://example.com", - access_token="sso_direct_token_abc123", - ) - - # Keyring should NOT be accessed - mocked_get_access_token.assert_not_called() - # The directly provided access token should be used as Bearer auth - assert config.headers == {"Authorization": "Bearer sso_direct_token_abc123"} - - def test_initialise_api_direct_access_token_takes_precedence_over_keyring( - self, - mocked_get_access_token, - mocked_should_refresh_access_token, - ): - """Verify a directly provided access_token takes precedence over keyring.""" - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api( - host="https://example.com", - access_token="direct_token_xyz", - ) - - # Keyring should NOT be accessed because we have a direct token - mocked_get_access_token.assert_not_called() - # The direct access token should be used - assert config.headers == {"Authorization": "Bearer direct_token_xyz"} - - def test_initialise_api_direct_access_token_skips_refresh( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - """Verify a directly provided access_token skips the refresh cycle entirely. - - When the SSO callback provides a fresh token - directly (e.g. for --request-api-key with CLOUDSMITH_NO_KEYRING=1), - we must NOT attempt to refresh it. The refresh path would fail because - there is no refresh_token in keyring, clearing the access_token and - leaving zero authentication. - """ - with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): - config = initialise_api( - host="https://example.com", - access_token="fresh_sso_token", - ) - - # Keyring lookup should be skipped (direct token provided) - mocked_get_access_token.assert_not_called() - # should_refresh_access_token is called but returns False - # due to internal should_use_keyring() guard - mocked_should_refresh_access_token.assert_called_once() - # Refresh logic should NOT be triggered - mocked_refresh_access_token.assert_not_called() - mocked_store_sso_tokens.assert_not_called() - mocked_update_refresh_attempted_at.assert_not_called() - # The fresh SSO token should be used as-is - assert config.headers == {"Authorization": "Bearer fresh_sso_token"} diff --git a/cloudsmith_cli/core/tests/test_keyring_provider.py b/cloudsmith_cli/core/tests/test_keyring_provider.py new file mode 100644 index 00000000..3a6732af --- /dev/null +++ b/cloudsmith_cli/core/tests/test_keyring_provider.py @@ -0,0 +1,81 @@ +"""Tests for the keyring credential provider.""" + +import os +from unittest.mock import MagicMock, patch + +from cloudsmith_cli.core.credentials import CredentialContext +from cloudsmith_cli.core.credentials.providers import KeyringProvider + + +class TestKeyringProvider: + def test_returns_none_when_keyring_disabled(self): + provider = KeyringProvider() + with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): + result = provider.resolve(CredentialContext()) + assert result is None + + def test_returns_none_when_no_token(self): + from cloudsmith_cli.core import keyring + + provider = KeyringProvider() + env = os.environ.copy() + env.pop("CLOUDSMITH_NO_KEYRING", None) + with patch.dict(os.environ, env, clear=True): + with patch.object(keyring, "should_use_keyring", return_value=True): + with patch.object(keyring, "get_access_token", return_value=None): + result = provider.resolve(CredentialContext()) + assert result is None + + def test_returns_bearer_token(self): + from cloudsmith_cli.core import keyring + + provider = KeyringProvider() + env = os.environ.copy() + env.pop("CLOUDSMITH_NO_KEYRING", None) + with patch.dict(os.environ, env, clear=True): + with patch.object(keyring, "should_use_keyring", return_value=True): + with patch.object( + keyring, "get_access_token", return_value="sso_token" + ): + with patch.object( + keyring, "should_refresh_access_token", return_value=False + ): + result = provider.resolve(CredentialContext()) + assert result is not None + assert result.api_key == "sso_token" + assert result.auth_type == "bearer" + assert result.source_name == "keyring" + + def test_returns_none_on_refresh_failure(self): + from cloudsmith_cli.cli import saml + from cloudsmith_cli.core import keyring + from cloudsmith_cli.core.api.exceptions import ApiException + + provider = KeyringProvider() + context = CredentialContext(session=MagicMock()) + env = os.environ.copy() + env.pop("CLOUDSMITH_NO_KEYRING", None) + with patch.dict(os.environ, env, clear=True): + with patch.object(keyring, "should_use_keyring", return_value=True): + with patch.object( + keyring, "get_access_token", return_value="old_token" + ): + with patch.object( + keyring, "should_refresh_access_token", return_value=True + ): + with patch.object( + keyring, "get_refresh_token", return_value="refresh_tok" + ): + with patch.object( + saml, + "refresh_access_token", + side_effect=ApiException( + status=401, detail="Unauthorized" + ), + ): + with patch.object( + keyring, "update_refresh_attempted_at" + ): + result = provider.resolve(context) + assert result is None + assert context.keyring_refresh_failed is True diff --git a/cloudsmith_cli/core/tests/test_rest.py b/cloudsmith_cli/core/tests/test_rest.py index 364af4e7..08335720 100644 --- a/cloudsmith_cli/core/tests/test_rest.py +++ b/cloudsmith_cli/core/tests/test_rest.py @@ -5,9 +5,21 @@ from ..rest import RestClient +@pytest.fixture(autouse=True) +def patch_httpretty_socket(monkeypatch): + """Patch httpretty's fake socket to handle shutdown() which urllib3 2.0+ calls.""" + import httpretty.core + + monkeypatch.setattr( + httpretty.core.fakesock.socket, + "shutdown", + lambda self, how: None, + raising=False, + ) + + class TestRestClient: @httpretty.activate(allow_net_connect=False, verbose=True) - @pytest.mark.usefixtures("mock_keyring") def test_implicit_retry_for_status_codes(self): """Assert that the rest client retries certain status codes automatically.""" # initialise_api() needs to be called before RestClient can be instantiated, @@ -39,32 +51,3 @@ def test_implicit_retry_for_status_codes(self): assert len(httpretty.latest_requests()) == 6 assert r.status == 200 - - -@pytest.fixture -def mock_keyring(monkeypatch): - """Mock keyring functions to prevent reading real SSO tokens from the system keyring. - - This is necessary because initialise_api() checks the keyring for SSO tokens, - and if found, it attempts to refresh them via a network request. When running - this test in isolation with httpretty mocking enabled, that network request - will fail because it's not mocked. - """ - # Import here to avoid circular imports - import httpretty.core - - from .. import keyring - - # Mock all keyring getter functions to return None/False - monkeypatch.setattr(keyring, "get_access_token", lambda api_host: None) - monkeypatch.setattr(keyring, "get_refresh_token", lambda api_host: None) - monkeypatch.setattr(keyring, "should_refresh_access_token", lambda api_host: False) - - # Patch httpretty's fake socket to handle shutdown() which urllib3 2.0+ calls - # This fixes: "Failed to socket.shutdown because a real socket does not exist" - monkeypatch.setattr( - httpretty.core.fakesock.socket, - "shutdown", - lambda self, how: None, - raising=False, - )