diff --git a/scripts/test_auth_e2e.py b/scripts/test_auth_e2e.py new file mode 100644 index 0000000..76375b6 --- /dev/null +++ b/scripts/test_auth_e2e.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +"""Interactive end-to-end test for the CLI auth flow. + +Starts a tiny local HTTP server that serves the /auth/cli-config endpoint, +then runs `layerlens login` against it. + +Usage: + python scripts/test_auth_e2e.py + +This tests the full flow WITHOUT needing the real backend running. +The device-code step will still fail (no real Cognito) but it validates +that config discovery, credential storage, and the CLI plumbing all work. + +To test against the real staging/production backend, set: + LAYERLENS_STRATIX_BASE_URL=https://api.layerlens.ai/api/v1 layerlens login +""" + +import json +import threading +from http.server import HTTPServer, BaseHTTPRequestHandler + +# Fake Cognito config — replace with real values to test end-to-end +FAKE_AUTH_CONFIG = { + "region": "us-east-1", + "client_id": "REPLACE_WITH_REAL_CLIENT_ID", + "domain": "atlas-production", + "scopes": "openid profile email", +} + +# Fake login response +FAKE_LOGIN_RESPONSE = { + "access_token": "fake-access-token-xyz", + "id_token": "fake-id-token-xyz", + "refresh_token": "fake-refresh-token-xyz", + "expires_in": 3600, + "token_type": "Bearer", + "user": {"email": "test@example.com", "given_name": "Test", "name": "Test User"}, +} + +# Fake device-code response +FAKE_DEVICE_CODE_RESPONSE = { + "device_code": "test-device-code-000", + "user_code": "ABCD-1234", + "verification_uri": "https://atlas-production.auth.us-east-1.amazoncognito.com/activate", + "verification_uri_complete": "https://atlas-production.auth.us-east-1.amazoncognito.com/activate?user_code=ABCD-1234", + "expires_in": 60, + "interval": 5, +} + + +class MockHandler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path.endswith("/dgklmnr/auth/cli-config"): + self._json_response(200, FAKE_AUTH_CONFIG) + else: + self._json_response(404, {"error": "not found"}) + + def do_POST(self): + if self.path.endswith("/dgklmnr/auth/cli-login"): + length = int(self.headers.get("Content-Length", 0)) + body = json.loads(self.rfile.read(length)) if length else {} + if not body.get("email") or not body.get("password"): + self._json_response(400, {"error": "Email and password are required"}) + elif body["email"] == "test@example.com" and body["password"] == "password123": + self._json_response(200, FAKE_LOGIN_RESPONSE) + else: + self._json_response(401, {"error": "Invalid email or password"}) + elif "deviceAuthorization" in self.path: + self._json_response(200, FAKE_DEVICE_CODE_RESPONSE) + elif "oauth2/token" in self.path: + # Simulate authorization_pending + self._json_response(400, {"error": "authorization_pending"}) + else: + self._json_response(404, {"error": "not found"}) + + def _json_response(self, status, body): + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(body).encode()) + + def log_message(self, format, *args): + print(f" [mock-server] {format % args}") + + +def main(): + port = 18923 + server = HTTPServer(("127.0.0.1", port), MockHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + + base_url = f"http://127.0.0.1:{port}/api/v1" + print(f"\n Mock server running at {base_url}") + print(f" Testing config discovery...\n") + + # Test 1: Config discovery + import layerlens.cli._auth as auth_mod + from layerlens.cli._auth import fetch_auth_config + + auth_mod._cached_auth_config = None # clear cache + + config = fetch_auth_config(base_url) + print(f" Config fetched: {json.dumps(config, indent=2)}") + assert config["client_id"] == FAKE_AUTH_CONFIG["client_id"], "Config mismatch!" + print(" [PASS] Config discovery works\n") + + # Test 2: Credential storage round-trip + from layerlens.cli._auth import load_credentials, save_credentials, clear_credentials + + test_creds = {"access_token": "test-tok", "auth_config": config} + save_credentials(test_creds) + loaded = load_credentials() + assert loaded["access_token"] == "test-tok" + print(" [PASS] Credential storage works\n") + + clear_credentials() + assert load_credentials() is None + print(" [PASS] Credential clearing works\n") + + # Test 3: CLI login via email/password + print(" Testing CLI login command (email/password)...\n") + + import os + + os.environ["LAYERLENS_STRATIX_BASE_URL"] = base_url + auth_mod._cached_auth_config = None # clear cache + + from layerlens.cli._auth import cli_login + + creds = cli_login("test@example.com", "password123", base_url=base_url) + assert creds["access_token"] == FAKE_LOGIN_RESPONSE["access_token"] + assert creds["user"]["email"] == "test@example.com" + print(" [PASS] CLI login works\n") + + # Test 4: CLI login command via CliRunner + print(" Testing CLI login command via CliRunner...\n") + clear_credentials() + auth_mod._cached_auth_config = None + + from click.testing import CliRunner + + from layerlens.cli._app import cli + + runner = CliRunner() + result = runner.invoke(cli, ["login"], input="test@example.com\npassword123\n") + print(f" Exit code: {result.exit_code}") + print(f" Output: {result.output}") + assert result.exit_code == 0 + loaded = load_credentials() + assert loaded is not None and loaded["access_token"] == FAKE_LOGIN_RESPONSE["access_token"] + print(" [PASS] CLI login command works\n") + + # Test 5: whoami command + print(" Testing whoami command...\n") + result = runner.invoke(cli, ["whoami"]) + print(f" Exit code: {result.exit_code}") + print(f" Output: {result.output}") + assert result.exit_code == 0 + print(" [PASS] whoami works\n") + + # Test 6: logout command + result = runner.invoke(cli, ["logout"]) + assert result.exit_code == 0 + assert load_credentials() is None + print(" [PASS] logout works\n") + + server.shutdown() + print("\n Done! All checks passed.") + + +if __name__ == "__main__": + main() diff --git a/src/layerlens/_client.py b/src/layerlens/_client.py index 032e15b..873ab4c 100644 --- a/src/layerlens/_client.py +++ b/src/layerlens/_client.py @@ -10,8 +10,8 @@ from . import _exceptions from ._utils import is_mapping -from .models import Organization, OrganizationResponse -from ._constants import DEFAULT_TIMEOUT, DEFAULT_BASE_URL +from .models import Organization, OrganizationResponse, OrganizationsListResponse +from ._constants import DEFAULT_TIMEOUT, DEFAULT_BASE_URL, DIRTY_ROUTER_PREFIX from ._exceptions import StratixError, APIStatusError from ._base_client import BaseClient, BaseAsyncClient @@ -37,6 +37,7 @@ class Stratix(BaseClient): api_key: str organization_id: str | None project_id: str | None + _use_bearer_auth: bool def __init__( self, @@ -44,6 +45,7 @@ def __init__( api_key: str | None = None, base_url: str | httpx.URL | None = None, timeout: Union[float, httpx.Timeout, None] = DEFAULT_TIMEOUT, + use_bearer_auth: bool = False, ) -> None: """Construct a new synchronous Stratix client instance. @@ -57,12 +59,17 @@ def __init__( "The api_key client option must be set either by passing api_key to the client or by setting the LAYERLENS_STRATIX_API_KEY environment variable" ) self.api_key = api_key + self._use_bearer_auth = use_bearer_auth if base_url is None: base_url = os.environ.get("LAYERLENS_STRATIX_BASE_URL") or os.environ.get("LAYERLENS_ATLAS_BASE_URL") if base_url is None: base_url = DEFAULT_BASE_URL + # Bearer auth (OAuth tokens) routes through the dirty prefix + if use_bearer_auth: + base_url = str(base_url).rstrip("/") + DIRTY_ROUTER_PREFIX + super().__init__( base_url=base_url, timeout=timeout, @@ -154,7 +161,11 @@ def public(self) -> PublicClient: @property @override def auth_headers(self) -> dict[str, str]: - return {"x-api-key": self.api_key} if self.api_key else {} + if not self.api_key: + return {} + if self._use_bearer_auth: + return {"Authorization": f"Bearer {self.api_key}"} + return {"x-api-key": self.api_key} def copy( self, @@ -171,6 +182,7 @@ def copy( api_key=api_key or self.api_key, base_url=base_url or self.base_url, timeout=self.timeout or timeout, + use_bearer_auth=self._use_bearer_auth, **_extra_kwargs, ) @@ -214,8 +226,32 @@ def _make_status_error( return APIStatusError(err_msg, response=response, body=data) def _get_organization(self) -> Optional[Organization]: + if self._use_bearer_auth: + # JWT-authenticated route returns a list of organizations + resp = super().get_cast( + "/organizations", + timeout=30, + cast_to=OrganizationsListResponse, + ) + if isinstance(resp, OrganizationsListResponse) and resp.data: + # Try to find an org owned by the logged-in user that has projects + from .cli._auth import load_credentials + + creds = load_credentials() + email = (creds or {}).get("user", {}).get("email") if creds else None + if email: + for org in resp.data: + if org.owner_id == email and org.projects: + return org + # Fall back to first org with projects + for org in resp.data: + if org.projects: + return org + return resp.data[0] + return None + organization = super().get_cast( - f"/organizations", + "/organizations", timeout=30, cast_to=OrganizationResponse, ) @@ -226,6 +262,7 @@ class AsyncStratix(BaseAsyncClient): api_key: str organization_id: str | None project_id: str | None + _use_bearer_auth: bool def __init__( self, @@ -233,6 +270,7 @@ def __init__( api_key: str | None = None, base_url: str | httpx.URL | None = None, timeout: float | httpx.Timeout | None = DEFAULT_TIMEOUT, + use_bearer_auth: bool = False, ) -> None: """Construct a new asynchronous Stratix client instance. @@ -247,12 +285,16 @@ def __init__( "or by setting the LAYERLENS_STRATIX_API_KEY environment variable" ) self.api_key = api_key + self._use_bearer_auth = use_bearer_auth if base_url is None: base_url = os.environ.get("LAYERLENS_STRATIX_BASE_URL") or os.environ.get("LAYERLENS_ATLAS_BASE_URL") if base_url is None: base_url = DEFAULT_BASE_URL + if use_bearer_auth: + base_url = str(base_url).rstrip("/") + DIRTY_ROUTER_PREFIX + super().__init__(base_url=base_url, timeout=timeout) organization = self._get_organization() @@ -341,7 +383,11 @@ def public(self) -> AsyncPublicClient: @property @override def auth_headers(self) -> dict[str, str]: - return {"x-api-key": self.api_key} if self.api_key else {} + if not self.api_key: + return {} + if self._use_bearer_auth: + return {"Authorization": f"Bearer {self.api_key}"} + return {"x-api-key": self.api_key} def copy( self, @@ -355,6 +401,7 @@ def copy( api_key=api_key or self.api_key, base_url=base_url or self.base_url, timeout=self.timeout or timeout, + use_bearer_auth=self._use_bearer_auth, **_extra_kwargs, ) @@ -398,6 +445,23 @@ def _get_organization(self) -> Optional[Organization]: data = response.json() + if self._use_bearer_auth: + resp = OrganizationsListResponse(**data) + if resp.data: + from .cli._auth import load_credentials + + creds = load_credentials() + email = (creds or {}).get("user", {}).get("email") if creds else None + if email: + for org in resp.data: + if org.owner_id == email and org.projects: + return org + for org in resp.data: + if org.projects: + return org + return resp.data[0] + return None + organization = OrganizationResponse(**data) return organization.data if isinstance(organization, OrganizationResponse) else None diff --git a/src/layerlens/_constants.py b/src/layerlens/_constants.py index 2440945..5d659a8 100644 --- a/src/layerlens/_constants.py +++ b/src/layerlens/_constants.py @@ -4,3 +4,10 @@ DEFAULT_TIMEOUT = httpx.Timeout(timeout=600, connect=5.0) DEFAULT_BASE_URL = "https://api.layerlens.ai/api/v1" + +# The "dirty" router prefix used by the backend for browser/session routes +DIRTY_ROUTER_PREFIX = "/dgklmnr" + +# CLI auth endpoints (appended to base URL + dirty prefix) +AUTH_CLI_CONFIG_PATH = DIRTY_ROUTER_PREFIX + "/auth/cli-config" +AUTH_CLI_LOGIN_PATH = DIRTY_ROUTER_PREFIX + "/auth/cli-login" diff --git a/src/layerlens/cli/_app.py b/src/layerlens/cli/_app.py index 489d965..fd58961 100644 --- a/src/layerlens/cli/_app.py +++ b/src/layerlens/cli/_app.py @@ -4,6 +4,7 @@ from .._version import __version__ from .commands.ci import ci +from .commands.auth import login, logout, whoami from .commands.bulk import bulk from .commands.judge import judge from .commands.space import space @@ -78,6 +79,11 @@ def cli( cli.add_command(bulk) cli.add_command(ci) +# Auth commands +cli.add_command(login) +cli.add_command(logout) +cli.add_command(whoami) + @cli.command("completion") @click.argument("shell", type=click.Choice(["bash", "zsh", "fish", "powershell"])) diff --git a/src/layerlens/cli/_auth.py b/src/layerlens/cli/_auth.py new file mode 100644 index 0000000..ece8f36 --- /dev/null +++ b/src/layerlens/cli/_auth.py @@ -0,0 +1,272 @@ +"""Credential storage, token refresh, and CLI login via backend.""" + +from __future__ import annotations + +import os +import json +import time +from typing import Any, Dict, Optional +from pathlib import Path + +import httpx + +from .._constants import DEFAULT_BASE_URL, AUTH_CLI_LOGIN_PATH, AUTH_CLI_CONFIG_PATH + +CREDENTIALS_DIR = Path.home() / ".layerlens" +CREDENTIALS_FILE = CREDENTIALS_DIR / "credentials" + +# How many seconds before expiry to trigger a proactive refresh +TOKEN_REFRESH_MARGIN = 300 # 5 minutes + + +# --------------------------------------------------------------------------- +# Credential storage (JSON, atomic writes) +# --------------------------------------------------------------------------- + + +def _ensure_dir() -> None: + CREDENTIALS_DIR.mkdir(parents=True, exist_ok=True) + CREDENTIALS_DIR.chmod(0o700) + + +def load_credentials() -> Optional[Dict[str, Any]]: + """Load stored credentials from ~/.layerlens/credentials.""" + if not CREDENTIALS_FILE.exists(): + return None + try: + data = json.loads(CREDENTIALS_FILE.read_text()) + if not isinstance(data, dict): + return None + return data + except (json.JSONDecodeError, OSError): + return None + + +def save_credentials(creds: Dict[str, Any]) -> None: + """Persist credentials to ~/.layerlens/credentials with restrictive perms.""" + _ensure_dir() + tmp = CREDENTIALS_FILE.with_suffix(".tmp") + tmp.write_text(json.dumps(creds, indent=2)) + tmp.chmod(0o600) + tmp.rename(CREDENTIALS_FILE) + + +def clear_credentials() -> None: + """Remove stored credentials.""" + if CREDENTIALS_FILE.exists(): + CREDENTIALS_FILE.unlink() + + +# --------------------------------------------------------------------------- +# Auth config discovery +# --------------------------------------------------------------------------- + +_cached_auth_config: Optional[Dict[str, str]] = None + + +def _get_base_url() -> str: + return ( + os.environ.get("LAYERLENS_STRATIX_BASE_URL") or os.environ.get("LAYERLENS_ATLAS_BASE_URL") or DEFAULT_BASE_URL + ) + + +def fetch_auth_config(base_url: Optional[str] = None) -> Dict[str, str]: + """Fetch Cognito OAuth2 config from the backend discovery endpoint. + + Returns dict with keys: region, client_id, domain, scopes. + Caches the result in memory for the process lifetime. + """ + global _cached_auth_config + + if _cached_auth_config is not None: + return _cached_auth_config + + # Also check if stored credentials already have the config cached + creds = load_credentials() + if creds and "auth_config" in creds: + cached: Dict[str, str] = creds["auth_config"] + _cached_auth_config = cached + return cached + + url = (base_url or _get_base_url()).rstrip("/") + AUTH_CLI_CONFIG_PATH + resp = httpx.get(url, timeout=15) + resp.raise_for_status() + fetched: Dict[str, str] = resp.json() + + _cached_auth_config = fetched + return fetched + + +def _cognito_base_url(config: Dict[str, str]) -> str: + return f"https://{config['domain']}.auth.{config['region']}.amazoncognito.com" + + +def _token_url(config: Dict[str, str]) -> str: + return f"{_cognito_base_url(config)}/oauth2/token" + + +# --------------------------------------------------------------------------- +# Token helpers +# --------------------------------------------------------------------------- + + +def is_token_expired(creds: Dict[str, Any]) -> bool: + """Check whether the access token is expired or about to expire.""" + expires_at: float = creds.get("expires_at", 0) + return bool(time.time() >= (expires_at - TOKEN_REFRESH_MARGIN)) + + +def refresh_access_token(creds: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Use the refresh_token to obtain a new access token from Cognito. + + Returns updated credentials dict or ``None`` on failure. + """ + refresh_token = creds.get("refresh_token") + if not refresh_token: + return None + + auth_config = creds.get("auth_config") + if not auth_config: + try: + auth_config = fetch_auth_config() + except httpx.HTTPError: + return None + + try: + resp = httpx.post( + _token_url(auth_config), + data={ + "grant_type": "refresh_token", + "client_id": auth_config["client_id"], + "refresh_token": refresh_token, + }, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=30, + ) + resp.raise_for_status() + except httpx.HTTPError: + return None + + body = resp.json() + creds["access_token"] = body["access_token"] + creds["id_token"] = body.get("id_token", creds.get("id_token")) + creds["expires_at"] = time.time() + body.get("expires_in", 3600) + # Cognito does not rotate refresh tokens by default + if "refresh_token" in body: + creds["refresh_token"] = body["refresh_token"] + save_credentials(creds) + return creds + + +def get_valid_token() -> Optional[str]: + """Return a valid ID token for Bearer auth, refreshing transparently if needed. + + The backend JWT middleware expects claims (email, cognito:groups) that are + present in the Cognito **ID token**, not the access token. + + Falls back to ``LAYERLENS_API_KEY`` env-var for CI/CD. + """ + # 1. Env-var fallback (CI/CD) + env_key = os.environ.get("LAYERLENS_API_KEY") + if env_key: + return env_key + + creds = load_credentials() + if creds is None: + return None + + if is_token_expired(creds): + creds = refresh_access_token(creds) + if creds is None: + return None + + # Prefer id_token (has email + cognito:groups claims needed by backend JWT middleware) + return creds.get("id_token") or creds.get("access_token") + + +# --------------------------------------------------------------------------- +# Email/password login via backend /auth/cli-login +# --------------------------------------------------------------------------- + + +class LoginError(Exception): + """Raised when login fails.""" + + +def cli_login(email: str, password: str, base_url: Optional[str] = None) -> Dict[str, Any]: + """Authenticate with email/password via the backend CLI login endpoint. + + Returns the stored credentials dict. + """ + url = (base_url or _get_base_url()).rstrip("/") + AUTH_CLI_LOGIN_PATH + + resp = httpx.post( + url, + json={"email": email, "password": password}, + timeout=30, + ) + + if resp.status_code == 401: + raise LoginError("Invalid email or password.") + if resp.status_code == 400: + raise LoginError("Email and password are required.") + resp.raise_for_status() + + body = resp.json() + + # Also fetch auth config for future token refresh + try: + auth_config = fetch_auth_config(base_url) + except httpx.HTTPError: + auth_config = None + + creds: Dict[str, Any] = { + "access_token": body["access_token"], + "id_token": body.get("id_token"), + "refresh_token": body.get("refresh_token"), + "expires_at": time.time() + body.get("expires_in", 3600), + "token_type": body.get("token_type", "Bearer"), + "user": body.get("user"), + "base_url": base_url or _get_base_url(), + } + if auth_config: + creds["auth_config"] = auth_config + + save_credentials(creds) + return creds + + +# --------------------------------------------------------------------------- +# User-info helper +# --------------------------------------------------------------------------- + + +def get_user_info(access_token: str) -> Optional[Dict[str, Any]]: + """Return user info from stored credentials or Cognito userInfo endpoint.""" + creds = load_credentials() + + # First try stored user info from login response + if creds and creds.get("user"): + user_info: Dict[str, Any] = creds["user"] + return user_info + + # Fall back to Cognito userInfo endpoint + auth_config = (creds or {}).get("auth_config") + if not auth_config: + try: + auth_config = fetch_auth_config() + except httpx.HTTPError: + return None + + userinfo_url = f"{_cognito_base_url(auth_config)}/oauth2/userInfo" + try: + resp = httpx.get( + userinfo_url, + headers={"Authorization": f"Bearer {access_token}"}, + timeout=15, + ) + resp.raise_for_status() + result: Dict[str, Any] = resp.json() + return result + except httpx.HTTPError: + return None diff --git a/src/layerlens/cli/_client.py b/src/layerlens/cli/_client.py index a727363..d31eab8 100644 --- a/src/layerlens/cli/_client.py +++ b/src/layerlens/cli/_client.py @@ -13,11 +13,37 @@ def get_client(ctx: click.Context) -> Stratix: - """Create a Stratix client from CLI context options.""" + """Create a Stratix client from CLI context options. + + Resolution order for the API key: + 1. ``--api-key`` CLI option + 2. ``LAYERLENS_STRATIX_API_KEY`` / ``LAYERLENS_ATLAS_API_KEY`` env vars (handled by Stratix) + 3. ``LAYERLENS_API_KEY`` env var + 4. Stored OAuth token from ``layerlens login`` + """ + import os + + api_key = ctx.obj.get("api_key") + use_bearer_auth = False + + # Fall back to LAYERLENS_API_KEY env var, then stored credentials + if ( + not api_key + and not os.environ.get("LAYERLENS_STRATIX_API_KEY") + and not os.environ.get("LAYERLENS_ATLAS_API_KEY") + ): + from ._auth import get_valid_token + + token = get_valid_token() + if token: + api_key = token + use_bearer_auth = True + try: return Stratix( - api_key=ctx.obj.get("api_key"), + api_key=api_key, base_url=ctx.obj.get("base_url"), + use_bearer_auth=use_bearer_auth, ) except StratixError as e: click.echo(f"Error: {e}", err=True) diff --git a/src/layerlens/cli/commands/auth.py b/src/layerlens/cli/commands/auth.py new file mode 100644 index 0000000..c95779f --- /dev/null +++ b/src/layerlens/cli/commands/auth.py @@ -0,0 +1,96 @@ +"""Authentication commands: login, logout, whoami.""" + +from __future__ import annotations + +import sys + +import click + + +@click.command() +@click.pass_context +def login(ctx: click.Context) -> None: + """Authenticate with LayerLens using email and password.""" + from .._auth import ( + LoginError, + cli_login, + load_credentials, + ) + + existing = load_credentials() + if existing and existing.get("access_token"): + if not click.confirm("You are already logged in. Re-authenticate?", default=False): + return + + base_url = ctx.obj.get("base_url") if ctx.obj else None + + email = click.prompt(" Email") + password = click.prompt(" Password", hide_input=True) + + try: + creds = cli_login(email, password, base_url=base_url) + user = creds.get("user") or {} + name = user.get("given_name") or user.get("name") or email + click.echo(f"\n Logged in as {name}", err=True) + except LoginError as exc: + click.echo(f"Error: {exc}", err=True) + sys.exit(1) + except Exception as exc: + click.echo(f"Error: {exc}", err=True) + sys.exit(1) + + +@click.command() +def logout() -> None: + """Clear stored authentication credentials.""" + from .._auth import load_credentials, clear_credentials + + if load_credentials() is None: + click.echo("Not currently logged in.", err=True) + return + + clear_credentials() + click.echo("Logged out successfully.", err=True) + + +@click.command() +def whoami() -> None: + """Display the currently authenticated user.""" + import os + + from .._auth import ( + get_user_info, + get_valid_token, + load_credentials, + ) + + # Check env-var API key first + env_key = os.environ.get("LAYERLENS_API_KEY") + if env_key: + click.echo("Authenticated via LAYERLENS_API_KEY environment variable.", err=True) + return + + creds = load_credentials() + if creds is None: + click.echo("Not logged in. Run `layerlens login` to authenticate.", err=True) + sys.exit(1) + + token = get_valid_token() + if token is None: + click.echo("Session expired. Run `layerlens login` to re-authenticate.", err=True) + sys.exit(1) + + info = get_user_info(token) + if info is None: + click.echo("Authenticated (could not fetch user details).", err=True) + return + + email = info.get("email", "unknown") + name = info.get("name") or info.get("given_name", "") + sub = info.get("sub", "") + + click.echo(f" Email: {email}") + if name: + click.echo(f" Name: {name}") + if sub: + click.echo(f" ID: {sub}") diff --git a/src/layerlens/models/__init__.py b/src/layerlens/models/__init__.py index b4aa7a5..ff856e1 100644 --- a/src/layerlens/models/__init__.py +++ b/src/layerlens/models/__init__.py @@ -22,6 +22,7 @@ EvaluationSpacesResponse, TraceEvaluationsResponse, CreateEvaluationsResponse, + OrganizationsListResponse, JudgeOptimizationRunsResponse, TraceEvaluationResultsResponse, CreateJudgeOptimizationRunResponse, @@ -128,6 +129,7 @@ "OptimizationRunStatus", "Organization", "OrganizationResponse", + "OrganizationsListResponse", "Pagination", "PerformanceDetails", "Project", diff --git a/src/layerlens/models/api.py b/src/layerlens/models/api.py index 308f89f..fa34182 100644 --- a/src/layerlens/models/api.py +++ b/src/layerlens/models/api.py @@ -46,6 +46,10 @@ class OrganizationResponse(BaseModel): data: Organization +class OrganizationsListResponse(BaseModel): + data: List[Organization] + + class ResultMetrics(BaseModel): total_count: int diff --git a/src/layerlens/models/organization.py b/src/layerlens/models/organization.py index cacc6b3..0233b75 100644 --- a/src/layerlens/models/organization.py +++ b/src/layerlens/models/organization.py @@ -8,6 +8,7 @@ class Organization(BaseModel): id: str name: str + owner_id: Optional[str] = None projects: Optional[List[Project]] = None diff --git a/tests/cli/test_auth.py b/tests/cli/test_auth.py new file mode 100644 index 0000000..e673acc --- /dev/null +++ b/tests/cli/test_auth.py @@ -0,0 +1,416 @@ +"""Tests for CLI authentication: credential storage, token refresh, login flow.""" +# ruff: noqa: ARG002 # creds_dir fixture is used for its monkeypatch side effect + +from __future__ import annotations + +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from click.testing import CliRunner + +from layerlens.cli._app import cli +from layerlens.cli._auth import ( + is_token_expired, + load_credentials, + save_credentials, + clear_credentials, +) + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def runner(): + try: + return CliRunner(mix_stderr=False) + except TypeError: + return CliRunner() + + +@pytest.fixture +def creds_dir(tmp_path: Path, monkeypatch): + """Redirect credential storage to a temp directory.""" + creds_dir = tmp_path / ".layerlens" + creds_file = creds_dir / "credentials" + monkeypatch.setattr("layerlens.cli._auth.CREDENTIALS_DIR", creds_dir) + monkeypatch.setattr("layerlens.cli._auth.CREDENTIALS_FILE", creds_file) + return creds_dir + + +@pytest.fixture(autouse=True) +def clear_config_cache(monkeypatch): + """Reset the in-memory auth config cache between tests.""" + monkeypatch.setattr("layerlens.cli._auth._cached_auth_config", None) + + +SAMPLE_AUTH_CONFIG = { + "region": "us-east-1", + "client_id": "test-client-id", + "domain": "atlas-test", + "scopes": "openid profile email", +} + + +@pytest.fixture +def sample_creds(): + return { + "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test", + "id_token": "id-tok", + "refresh_token": "refresh-tok", + "expires_at": time.time() + 3600, + "token_type": "Bearer", + "auth_config": SAMPLE_AUTH_CONFIG, + "user": {"email": "test@example.com", "given_name": "Test"}, + } + + +# --------------------------------------------------------------------------- +# Credential storage +# --------------------------------------------------------------------------- + + +class TestCredentialStorage: + def test_save_and_load(self, creds_dir, sample_creds): + save_credentials(sample_creds) + loaded = load_credentials() + assert loaded is not None + assert loaded["access_token"] == sample_creds["access_token"] + + def test_load_returns_none_when_missing(self, creds_dir): + assert load_credentials() is None + + def test_clear_credentials(self, creds_dir, sample_creds): + save_credentials(sample_creds) + assert load_credentials() is not None + clear_credentials() + assert load_credentials() is None + + def test_clear_when_nothing_stored(self, creds_dir): + clear_credentials() + + def test_file_permissions(self, creds_dir, sample_creds): + save_credentials(sample_creds) + creds_file = creds_dir / "credentials" + mode = creds_file.stat().st_mode & 0o777 + assert mode == 0o600 + + def test_dir_permissions(self, creds_dir, sample_creds): + save_credentials(sample_creds) + mode = creds_dir.stat().st_mode & 0o777 + assert mode == 0o700 + + def test_load_corrupted_file(self, creds_dir): + creds_dir.mkdir(parents=True, exist_ok=True) + (creds_dir / "credentials").write_text("not json{{{") + assert load_credentials() is None + + +# --------------------------------------------------------------------------- +# Auth config discovery +# --------------------------------------------------------------------------- + + +class TestAuthConfigDiscovery: + def test_fetch_from_api(self, creds_dir): + from layerlens.cli._auth import fetch_auth_config + + mock_resp = MagicMock() + mock_resp.json.return_value = SAMPLE_AUTH_CONFIG + mock_resp.raise_for_status = MagicMock() + + with patch("layerlens.cli._auth.httpx.get", return_value=mock_resp) as mock_get: + result = fetch_auth_config("https://api.example.com/api/v1") + + assert result["client_id"] == "test-client-id" + assert result["domain"] == "atlas-test" + mock_get.assert_called_once() + + def test_uses_cached_from_credentials(self, creds_dir, sample_creds): + from layerlens.cli._auth import fetch_auth_config + + save_credentials(sample_creds) + + with patch("layerlens.cli._auth.httpx.get") as mock_get: + result = fetch_auth_config() + + mock_get.assert_not_called() + assert result["client_id"] == "test-client-id" + + def test_caches_in_memory(self, creds_dir): + from layerlens.cli._auth import fetch_auth_config + + mock_resp = MagicMock() + mock_resp.json.return_value = SAMPLE_AUTH_CONFIG + mock_resp.raise_for_status = MagicMock() + + with patch("layerlens.cli._auth.httpx.get", return_value=mock_resp) as mock_get: + fetch_auth_config("https://api.example.com/api/v1") + fetch_auth_config("https://api.example.com/api/v1") + + assert mock_get.call_count == 1 + + +# --------------------------------------------------------------------------- +# Token expiry +# --------------------------------------------------------------------------- + + +class TestTokenExpiry: + def test_not_expired(self): + creds = {"expires_at": time.time() + 3600} + assert not is_token_expired(creds) + + def test_expired(self): + creds = {"expires_at": time.time() - 10} + assert is_token_expired(creds) + + def test_within_margin(self): + creds = {"expires_at": time.time() + 240} + assert is_token_expired(creds) + + def test_missing_expires_at(self): + assert is_token_expired({}) + + +# --------------------------------------------------------------------------- +# Token refresh +# --------------------------------------------------------------------------- + + +class TestTokenRefresh: + def test_refresh_success(self, creds_dir, sample_creds): + from layerlens.cli._auth import refresh_access_token + + save_credentials(sample_creds) + sample_creds["expires_at"] = time.time() - 10 + + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "access_token": "new-access-token", + "id_token": "new-id-token", + "expires_in": 3600, + } + mock_resp.raise_for_status = MagicMock() + + with patch("layerlens.cli._auth.httpx.post", return_value=mock_resp): + result = refresh_access_token(sample_creds) + + assert result is not None + assert result["access_token"] == "new-access-token" + loaded = load_credentials() + assert loaded["access_token"] == "new-access-token" + + def test_refresh_no_refresh_token(self, creds_dir): + from layerlens.cli._auth import refresh_access_token + + result = refresh_access_token({"access_token": "tok"}) + assert result is None + + def test_refresh_http_error(self, creds_dir, sample_creds): + import httpx as _httpx + + from layerlens.cli._auth import refresh_access_token + + with patch("layerlens.cli._auth.httpx.post", side_effect=_httpx.HTTPError("fail")): + result = refresh_access_token(sample_creds) + + assert result is None + + +# --------------------------------------------------------------------------- +# get_valid_token +# --------------------------------------------------------------------------- + + +class TestGetValidToken: + def test_env_var_takes_precedence(self, creds_dir, monkeypatch): + from layerlens.cli._auth import get_valid_token + + monkeypatch.setenv("LAYERLENS_API_KEY", "env-key-123") + assert get_valid_token() == "env-key-123" + + def test_returns_stored_token(self, creds_dir, sample_creds, monkeypatch): + from layerlens.cli._auth import get_valid_token + + monkeypatch.delenv("LAYERLENS_API_KEY", raising=False) + save_credentials(sample_creds) + # get_valid_token prefers id_token (has email + cognito:groups for backend JWT middleware) + assert get_valid_token() == sample_creds["id_token"] + + def test_returns_none_when_no_creds(self, creds_dir, monkeypatch): + from layerlens.cli._auth import get_valid_token + + monkeypatch.delenv("LAYERLENS_API_KEY", raising=False) + assert get_valid_token() is None + + def test_refreshes_expired_token(self, creds_dir, sample_creds, monkeypatch): + from layerlens.cli._auth import get_valid_token + + monkeypatch.delenv("LAYERLENS_API_KEY", raising=False) + sample_creds["expires_at"] = time.time() - 10 + save_credentials(sample_creds) + + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "access_token": "refreshed-token", + "id_token": "refreshed-id-token", + "expires_in": 3600, + } + mock_resp.raise_for_status = MagicMock() + + with patch("layerlens.cli._auth.httpx.post", return_value=mock_resp): + token = get_valid_token() + + # get_valid_token prefers id_token + assert token == "refreshed-id-token" + + +# --------------------------------------------------------------------------- +# cli_login +# --------------------------------------------------------------------------- + + +class TestCLILogin: + def test_login_success(self, creds_dir): + from layerlens.cli._auth import cli_login + + login_resp = MagicMock() + login_resp.status_code = 200 + login_resp.json.return_value = { + "access_token": "access-tok", + "id_token": "id-tok", + "refresh_token": "refresh-tok", + "expires_in": 3600, + "token_type": "Bearer", + "user": {"email": "user@example.com", "given_name": "Test"}, + } + login_resp.raise_for_status = MagicMock() + + config_resp = MagicMock() + config_resp.json.return_value = SAMPLE_AUTH_CONFIG + config_resp.raise_for_status = MagicMock() + + with patch("layerlens.cli._auth.httpx.post", return_value=login_resp): + with patch("layerlens.cli._auth.httpx.get", return_value=config_resp): + result = cli_login("user@example.com", "pass123", base_url="https://api.test.com/api/v1") + + assert result["access_token"] == "access-tok" + assert result["user"]["email"] == "user@example.com" + + loaded = load_credentials() + assert loaded["access_token"] == "access-tok" + + def test_login_invalid_credentials(self, creds_dir): + from layerlens.cli._auth import LoginError, cli_login + + mock_resp = MagicMock() + mock_resp.status_code = 401 + + with patch("layerlens.cli._auth.httpx.post", return_value=mock_resp): + with pytest.raises(LoginError, match="Invalid email or password"): + cli_login("bad@example.com", "wrong") + + def test_login_missing_fields(self, creds_dir): + from layerlens.cli._auth import LoginError, cli_login + + mock_resp = MagicMock() + mock_resp.status_code = 400 + + with patch("layerlens.cli._auth.httpx.post", return_value=mock_resp): + with pytest.raises(LoginError, match="required"): + cli_login("", "") + + +# --------------------------------------------------------------------------- +# CLI command tests +# --------------------------------------------------------------------------- + + +class TestLoginCommand: + def test_login_success(self, runner, creds_dir): + mock_creds = { + "access_token": "tok", + "expires_at": time.time() + 3600, + "user": {"email": "user@test.com", "given_name": "Test"}, + } + + with patch("layerlens.cli._auth.cli_login", return_value=mock_creds): + with patch("layerlens.cli._auth.load_credentials", return_value=None): + result = runner.invoke(cli, ["login"], input="user@test.com\nsecret\n") + + assert result.exit_code == 0 + combined = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "logged in" in combined.lower() + + def test_login_already_logged_in_decline(self, runner, creds_dir): + existing = {"access_token": "old-tok"} + + with patch("layerlens.cli._auth.load_credentials", return_value=existing): + result = runner.invoke(cli, ["login"], input="n\n") + + assert result.exit_code == 0 + + def test_login_error(self, runner, creds_dir): + from layerlens.cli._auth import LoginError + + with patch("layerlens.cli._auth.load_credentials", return_value=None): + with patch("layerlens.cli._auth.cli_login", side_effect=LoginError("Invalid email or password.")): + result = runner.invoke(cli, ["login"], input="bad@test.com\nwrong\n") + + assert result.exit_code != 0 + combined = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "invalid" in combined.lower() + + +class TestLogoutCommand: + def test_logout_success(self, runner, creds_dir, sample_creds): + save_credentials(sample_creds) + result = runner.invoke(cli, ["logout"]) + assert result.exit_code == 0 + combined = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "logged out" in combined.lower() + assert load_credentials() is None + + def test_logout_not_logged_in(self, runner, creds_dir): + result = runner.invoke(cli, ["logout"]) + assert result.exit_code == 0 + combined = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "not currently" in combined.lower() + + +class TestWhoamiCommand: + def test_whoami_with_env_var(self, runner, monkeypatch): + monkeypatch.setenv("LAYERLENS_API_KEY", "env-key") + result = runner.invoke(cli, ["whoami"]) + assert result.exit_code == 0 + combined = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "LAYERLENS_API_KEY" in combined + + def test_whoami_not_logged_in(self, runner, creds_dir, monkeypatch): + monkeypatch.delenv("LAYERLENS_API_KEY", raising=False) + result = runner.invoke(cli, ["whoami"]) + assert result.exit_code != 0 + combined = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "not logged in" in combined.lower() + + def test_whoami_shows_user_info(self, runner, creds_dir, sample_creds, monkeypatch): + monkeypatch.delenv("LAYERLENS_API_KEY", raising=False) + save_credentials(sample_creds) + + with patch("layerlens.cli._auth.get_valid_token", return_value="tok"): + with patch( + "layerlens.cli._auth.get_user_info", + return_value={"email": "user@example.com", "name": "Test User", "sub": "abc-123"}, + ): + result = runner.invoke(cli, ["whoami"]) + + assert result.exit_code == 0 + combined = (result.output or "") + (getattr(result, "stderr", "") or "") + assert "user@example.com" in combined + assert "Test User" in combined