Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 66 additions & 18 deletions backend/services/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from datetime import datetime
from supabase import create_client, Client

from services.observability import logger


class SupabaseAuthService:
"""Supabase authentication and user management"""
Expand All @@ -21,46 +23,92 @@ def __init__(self):
if not all([self.supabase_url, self.supabase_key]):
raise ValueError("Supabase credentials not configured")

if not self.jwt_secret:
logger.warning("SUPABASE_JWT_SECRET not set -- falling back to API-based verification")

self.client: Client = create_client(self.supabase_url, self.supabase_key)

def verify_jwt(self, token: str) -> Dict[str, Any]:
"""
Verify Supabase JWT token and return user data
Verify Supabase JWT token locally using the signing secret.

Args:
token: JWT token from Authorization header (format: "Bearer <token>")

Returns:
Dict with user_id, email, and other user metadata

Raises:
HTTPException: If token is invalid or expired
No network call required -- instant verification using HS256.
Falls back to Supabase API call if JWT_SECRET is not configured.
"""
if token.lower().startswith("bearer "):
token = token[7:]

# local decode when secret is available (fast path, no network)
if self.jwt_secret:
return self._verify_local(token)

# fallback: API call to Supabase (slow path, requires network)
return self._verify_via_api(token)

def _verify_local(self, token: str) -> Dict[str, Any]:
"""Decode and verify JWT locally with HS256 secret."""
try:
# Remove "Bearer " prefix if present
if token.startswith("Bearer "):
token = token[7:]
payload = jwt.decode(
token,
self.jwt_secret,
algorithms=["HS256"],
audience="authenticated",
leeway=30,
)
Comment thread
DevanshuNEU marked this conversation as resolved.

# Use Supabase client to verify token and get user
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token missing subject claim",
)

return {
"user_id": user_id,
"email": payload.get("email"),
"metadata": payload.get("user_metadata") or {},
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token expired",
)
except jwt.InvalidAudienceError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token audience",
)
except jwt.InvalidTokenError as e:
logger.debug("JWT decode failed", error=str(e))
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
)

def _verify_via_api(self, token: str) -> Dict[str, Any]:
"""Fallback: verify via Supabase API call (requires network)."""
try:
response = self.client.auth.get_user(token)

if not response.user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token"
detail="Invalid or expired token",
)

return {
"user_id": response.user.id,
"email": response.user.email,
"created_at": response.user.created_at,
"metadata": response.user.user_metadata
"metadata": response.user.user_metadata or {},
}

except HTTPException:
raise
except Exception as e:
logger.debug("API-based JWT verification failed", error=str(e))
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token verification failed: {str(e)}"
detail="Token verification failed",
)

async def signup(self, email: str, password: str, github_username: Optional[str] = None) -> Dict[str, Any]:
Expand Down
142 changes: 142 additions & 0 deletions backend/tests/test_jwt_local_decode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""Tests for JWT local decode (OPE-75)."""
import pytest
import jwt as pyjwt
import time
from unittest.mock import patch, MagicMock


JWT_SECRET = "test-jwt-secret-for-unit-tests"


def _make_token(payload: dict, secret: str = JWT_SECRET) -> str:
"""Create a signed JWT for testing."""
defaults = {
"aud": "authenticated",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
"role": "authenticated",
}
return pyjwt.encode({**defaults, **payload}, secret, algorithm="HS256")


@pytest.fixture
def auth_service():
"""Create auth service with local JWT secret configured."""
with patch("services.auth.create_client") as mock_client:
mock_client.return_value = MagicMock()
with patch.dict("os.environ", {
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_ANON_KEY": "test-key",
"SUPABASE_JWT_SECRET": JWT_SECRET,
}):
from services.auth import SupabaseAuthService
yield SupabaseAuthService()


class TestLocalJWTDecode:
"""Verify local JWT decode works without network calls."""

def test_valid_token_returns_user_data(self, auth_service):
token = _make_token({"sub": "user-123", "email": "dev@test.com", "user_metadata": {"tier": "pro"}})
result = auth_service.verify_jwt(token)

assert result["user_id"] == "user-123"
assert result["email"] == "dev@test.com"
assert result["metadata"]["tier"] == "pro"

def test_bearer_prefix_stripped(self, auth_service):
token = _make_token({"sub": "user-456", "email": "a@b.com"})
result = auth_service.verify_jwt(f"Bearer {token}")

assert result["user_id"] == "user-456"

def test_expired_token_raises_401(self, auth_service):
token = _make_token({"sub": "user-789", "exp": int(time.time()) - 60})

from fastapi import HTTPException
with pytest.raises(HTTPException) as exc:
auth_service.verify_jwt(token)
assert exc.value.status_code == 401
assert "expired" in exc.value.detail.lower()

def test_wrong_secret_raises_401(self, auth_service):
token = _make_token({"sub": "user-000"}, secret="wrong-secret")

from fastapi import HTTPException
with pytest.raises(HTTPException) as exc:
auth_service.verify_jwt(token)
assert exc.value.status_code == 401

def test_missing_sub_claim_raises_401(self, auth_service):
token = _make_token({"email": "no-sub@test.com"})

from fastapi import HTTPException
with pytest.raises(HTTPException) as exc:
auth_service.verify_jwt(token)
assert exc.value.status_code == 401
assert "subject" in exc.value.detail.lower()

def test_wrong_audience_raises_401(self, auth_service):
payload = {"sub": "user-aud", "aud": "wrong-audience", "exp": int(time.time()) + 3600}
token = pyjwt.encode(payload, JWT_SECRET, algorithm="HS256")

from fastapi import HTTPException
with pytest.raises(HTTPException) as exc:
auth_service.verify_jwt(token)
assert exc.value.status_code == 401

def test_no_network_call_made(self, auth_service):
"""The whole point of OPE-75: verify_jwt should NOT hit the network."""
token = _make_token({"sub": "user-net", "email": "net@test.com"})

auth_service.verify_jwt(token)

# get_user should never be called when jwt_secret is available
auth_service.client.auth.get_user.assert_not_called()

def test_metadata_defaults_to_empty_dict(self, auth_service):
token = _make_token({"sub": "user-no-meta", "email": "x@y.com"})
result = auth_service.verify_jwt(token)

assert result["metadata"] == {}

def test_metadata_null_coalesced_to_empty_dict(self, auth_service):
"""user_metadata can be explicitly null in Supabase JWTs."""
token = _make_token({"sub": "user-null-meta", "user_metadata": None})
result = auth_service.verify_jwt(token)

assert result["metadata"] == {}

def test_bearer_prefix_case_insensitive(self, auth_service):
token = _make_token({"sub": "user-case"})
for prefix in ["Bearer ", "bearer ", "BEARER "]:
result = auth_service.verify_jwt(f"{prefix}{token}")
assert result["user_id"] == "user-case"


class TestAPIFallback:
"""When JWT secret is not configured, fall back to Supabase API."""

def test_falls_back_to_api_when_no_secret(self):
with patch("services.auth.create_client") as mock_client:
client = MagicMock()
user = MagicMock()
user.id = "api-user-123"
user.email = "api@test.com"
user.user_metadata = {"tier": "free"}
response = MagicMock()
response.user = user
client.auth.get_user.return_value = response
mock_client.return_value = client

with patch.dict("os.environ", {
"SUPABASE_URL": "https://test.supabase.co",
"SUPABASE_ANON_KEY": "test-key",
"SUPABASE_JWT_SECRET": "",
}):
from services.auth import SupabaseAuthService
service = SupabaseAuthService()
result = service.verify_jwt("some-token")

assert result["user_id"] == "api-user-123"
client.auth.get_user.assert_called_once_with("some-token")