Skip to content

feat(auth): Arquitetura de autenticação para agents federados #2

@vavasilva

Description

@vavasilva

Contexto

Com a adoção do MCP Context Forge como gateway central para federação de agents, precisamos definir e implementar a arquitetura de autenticação que suporte:

  1. Autenticação do usuário na borda (Context Forge)
  2. Propagação de identidade para os agents especialistas
  3. Compatibilidade com agents que também funcionam standalone

Arquitetura Proposta

┌────────────────────────────────────────────────────────────────────────────┐
│                           FLUXO DE AUTENTICAÇÃO                            │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  ┌─────────┐      OAuth 2.1        ┌──────────────────────────────────┐   │
│  │ Cliente │  (Cognito + Fluig)    │       MCP Context Forge          │   │
│  │         ├──────────────────────►│                                  │   │
│  └─────────┘                       │  1. Valida JWT Cognito           │   │
│                                    │  2. Busca permissões (TCloud API)│   │
│                                    │  3. Cacheia no Redis             │   │
│                                    │  4. Propaga headers p/ agents    │   │
│                                    └───────────────┬──────────────────┘   │
│                                                    │                      │
│                              Headers propagados:   │                      │
│                              X-User-Email          │                      │
│                              X-User-Customers      │                      │
│                                                    ▼                      │
│                                    ┌──────────────────────────────────┐   │
│                                    │      Agent Orquestrador          │   │
│                                    └───────────────┬──────────────────┘   │
│                                                    │                      │
│                    ┌───────────────┬───────────────┴───────────────┐      │
│                    ▼               ▼                               ▼      │
│              ┌──────────┐   ┌──────────┐                    ┌──────────┐  │
│              │ CPU/RAM  │   │    DB    │        ...         │   App    │  │
│              │  Agent   │   │  Agent   │                    │  Agent   │  │
│              └──────────┘   └──────────┘                    └──────────┘  │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘

Solução: Plugin de Autenticação para Context Forge

O Context Forge suporta plugins de autenticação via hook http_auth_resolve_user. Vamos criar um plugin customizado.

Estrutura do Plugin

plugins/tcloud_cognito_auth/
├── __init__.py
├── plugin-manifest.yaml
├── tcloud_cognito_auth.py    # Plugin principal
├── cognito.py                # Validação JWT Cognito
├── tcloud_api.py             # Client TCloud API
├── tests/
│   └── test_plugin.py
└── README.md

Fluxo do Plugin

┌─────────────────────────────────────────────────────────────────────────────┐
│                      Plugin: TCloudCognitoAuthPlugin                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  Hook: HTTP_AUTH_RESOLVE_USER                                               │
│                                                                             │
│  1. Extrai Bearer token do header Authorization                             │
│  2. Valida JWT com Cognito JWKS                                             │
│     - Verifica issuer (Cognito User Pool)                                   │
│     - Verifica expiração                                                    │
│     - Verifica client_id                                                    │
│  3. Extrai email do usuário (do username ou claim)                          │
│  4. Busca permissões na TCloud API (com cache Redis)                        │
│     - GET /customer → lista de cloud_ids                                    │
│     - Cache TTL: 5 minutos                                                  │
│  5. Retorna user dict + metadata com permissões                             │
│                                                                             │
│  Resultado:                                                                 │
│  - modified_payload: {email, full_name, is_admin, is_active}               │
│  - metadata: {auth_method: "cognito", customers: [...]}                    │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Código do Plugin

# plugins/tcloud_cognito_auth/tcloud_cognito_auth.py

from mcpgateway.plugins.framework import (
    Plugin, PluginConfig, PluginContext, PluginResult,
    HttpAuthResolveUserPayload, HttpHeaderPayload,
    PluginViolation, PluginViolationError,
)
import aiohttp
import jwt
from datetime import datetime, timezone

class TCloudCognitoAuthPlugin(Plugin):
    """Authenticate users via Cognito + TCloud API permissions."""
    
    def __init__(self, config: PluginConfig):
        super().__init__(config)
        self.cognito_region = config.config.get("cognito_region")
        self.cognito_user_pool_id = config.config.get("cognito_user_pool_id")
        self.cognito_client_id = config.config.get("cognito_client_id")
        self.tcloud_api_url = config.config.get("tcloud_api_url")
        self.tcloud_api_key = config.config.get("tcloud_api_key")
        self._jwks_cache = None
        self._session = None
    
    async def initialize(self):
        """Called when plugin is loaded."""
        self._session = aiohttp.ClientSession()
        await self._get_cognito_jwks()  # Pre-fetch JWKS
    
    async def shutdown(self):
        """Called when plugin is unloaded."""
        if self._session:
            await self._session.close()
    
    async def http_auth_resolve_user(
        self,
        payload: HttpAuthResolveUserPayload,
        context: PluginContext
    ) -> PluginResult[dict]:
        """Authenticate via Cognito JWT and fetch TCloud permissions."""
        
        if not payload.credentials:
            return PluginResult(continue_processing=True)
        
        token = payload.credentials.get("credentials")
        if not token:
            return PluginResult(continue_processing=True)
        
        try:
            # 1. Validate JWT with Cognito
            claims = await self._validate_cognito_jwt(token)
            email = self._extract_email(claims)
            
            # 2. Fetch TCloud permissions (with Redis cache)
            customers = await self._get_user_customers(token, context)
            
            self.logger.info(f"Authenticated user {email} with {len(customers)} customers")
            
            # 3. Return authenticated user with permissions
            return PluginResult(
                modified_payload={
                    "email": email,
                    "full_name": claims.get("name", email),
                    "is_admin": False,
                    "is_active": True,
                },
                metadata={
                    "auth_method": "cognito",
                    "customers": customers,
                    "cognito_sub": claims.get("sub"),
                },
                continue_processing=True,
            )
            
        except jwt.ExpiredSignatureError:
            raise PluginViolationError(
                message="Token expired",
                violation=PluginViolation(reason="Token expired", code="TOKEN_EXPIRED")
            )
        except jwt.InvalidTokenError as e:
            raise PluginViolationError(
                message=f"Invalid token: {e}",
                violation=PluginViolation(reason="Invalid token", code="INVALID_TOKEN")
            )
        except Exception as e:
            self.logger.error(f"Auth error: {e}")
            return PluginResult(continue_processing=True)
    
    async def _validate_cognito_jwt(self, token: str) -> dict:
        """Validate JWT against Cognito JWKS."""
        jwks = await self._get_cognito_jwks()
        header = jwt.get_unverified_header(token)
        kid = header.get("kid")
        
        key = None
        for k in jwks["keys"]:
            if k["kid"] == kid:
                key = jwt.algorithms.RSAAlgorithm.from_jwk(k)
                break
        
        if not key:
            raise jwt.InvalidTokenError("Key not found in JWKS")
        
        issuer = f"https://cognito-idp.{self.cognito_region}.amazonaws.com/{self.cognito_user_pool_id}"
        
        claims = jwt.decode(
            token, key, algorithms=["RS256"],
            issuer=issuer, options={"verify_aud": False}
        )
        
        # Verify client_id for access tokens
        if claims.get("token_use") == "access":
            if claims.get("client_id") != self.cognito_client_id:
                raise jwt.InvalidTokenError("Invalid client_id")
        
        return claims
    
    async def _get_cognito_jwks(self) -> dict:
        """Fetch and cache Cognito JWKS."""
        if self._jwks_cache:
            return self._jwks_cache
        
        url = f"https://cognito-idp.{self.cognito_region}.amazonaws.com/{self.cognito_user_pool_id}/.well-known/jwks.json"
        async with self._session.get(url) as resp:
            self._jwks_cache = await resp.json()
            return self._jwks_cache
    
    async def _get_user_customers(self, token: str, context: PluginContext) -> list:
        """Fetch user's customers from TCloud API with Redis cache."""
        import hashlib
        cache_key = f"tcloud:customers:{hashlib.sha256(token.encode()).hexdigest()[:16]}"
        
        # Try cache (via Context Forge Redis)
        try:
            from mcpgateway.cache import redis_cache
            cached = await redis_cache.get(cache_key)
            if cached:
                return cached
        except Exception:
            pass
        
        # Fetch from TCloud API
        headers = {
            "Authorization": f"Bearer {token}",
            "x-api-key": self.tcloud_api_key,
        }
        
        async with self._session.get(f"{self.tcloud_api_url}/customer", headers=headers) as resp:
            if resp.status != 200:
                self.logger.warning(f"TCloud API error: {resp.status}")
                return []
            
            data = await resp.json()
            customers = [c.get("cloud_id") for c in data if c.get("cloud_id")]
            
            # Cache for 5 minutes
            try:
                from mcpgateway.cache import redis_cache
                await redis_cache.set(cache_key, customers, ttl=300)
            except Exception:
                pass
            
            return customers
    
    def _extract_email(self, claims: dict) -> str:
        """Extract email from Cognito claims."""
        if claims.get("token_use") == "access":
            username = claims.get("username", "")
            return username.split("_", 1)[1] if "_" in username else username
        return claims.get("email", claims.get("sub"))

Configuração do Plugin

# infrastructure/context-forge/plugins/config.yaml
plugins:
  - name: "TCloudCognitoAuthPlugin"
    kind: "plugins.tcloud_cognito_auth.tcloud_cognito_auth.TCloudCognitoAuthPlugin"
    description: "Authenticate via Cognito + TCloud API permissions"
    version: "1.0.0"
    author: "TCloud Team"
    hooks: ["http_auth_resolve_user"]
    mode: "enforce"
    priority: 10
    config:
      cognito_region: "sa-east-1"
      cognito_user_pool_id: "${COGNITO_USER_POOL_ID}"
      cognito_client_id: "${COGNITO_APP_CLIENT_ID}"
      tcloud_api_url: "${TCLOUD_API_URL}"
      tcloud_api_key: "${TCLOUD_API_KEY}"

Helm Values para Secrets

# infrastructure/context-forge/values-dev.yaml
mcpContextForge:
  env:
    # Cognito
    COGNITO_REGION: "sa-east-1"
    COGNITO_USER_POOL_ID: "sa-east-1_xxx"
    COGNITO_APP_CLIENT_ID: "xxx"
    # TCloud API
    TCLOUD_API_URL: "https://api.tcloud.cloudtotvs.com.br/dev"
  
  # Secrets via Kubernetes Secret
  secretEnv:
    - name: TCLOUD_API_KEY
      secretName: tcloud-api-credentials
      secretKey: api-key
  
  # Plugin config
  plugins:
    enabled: true
    configFile: /app/plugins/config.yaml

Header Propagation para Agents

O Context Forge pode propagar headers para os agents downstream. Usar o plugin header_injector:

# plugins/config.yaml (adicional)
plugins:
  # ... TCloudCognitoAuthPlugin ...
  
  - name: "TCloudHeaderInjector"
    kind: "plugins.header_injector.header_injector.HeaderInjectorPlugin"
    hooks: ["tool_pre_invoke", "agent_pre_invoke"]
    priority: 20
    config:
      inject_from_auth_metadata: true
      headers:
        X-User-Email: "${auth.email}"
        X-User-Customers: "${auth.metadata.customers}"

Headers Padronizados

Header Descrição Exemplo
X-User-Email Email do usuário autenticado user@totvs.com.br
X-User-Customers Lista de cloud_ids permitidos (JSON) ["cloud_123", "cloud_456"]
X-Request-ID ID único para tracing uuid

Impacto nos Agents

Os agents precisam suportar dual-mode authentication:

  1. Via Context Forge: Lê permissões dos headers X-User-*
  2. Standalone: Valida OAuth e busca permissões na TCloud API

Ver: tcloud-dev/tcloud-watch-mcp-server#2

Tarefas

  • Criar estrutura do plugin em plugins/tcloud_cognito_auth/
  • Implementar validação JWT Cognito
  • Implementar integração com TCloud API
  • Configurar cache Redis para permissões
  • Criar testes unitários
  • Atualizar Helm values com novas env vars
  • Criar Kubernetes Secret para TCLOUD_API_KEY
  • Testar fluxo end-to-end com tcloud-watch-mcp-server
  • Documentar em docs/authentication.md

Referências

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions