-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Description
Contexto
Com a adoção do MCP Context Forge como gateway central para federação de agents, precisamos definir e implementar a arquitetura de autenticação que suporte:
- Autenticação do usuário na borda (Context Forge)
- Propagação de identidade para os agents especialistas
- Compatibilidade com agents que também funcionam standalone
Arquitetura Proposta
┌────────────────────────────────────────────────────────────────────────────┐
│ FLUXO DE AUTENTICAÇÃO │
├────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ OAuth 2.1 ┌──────────────────────────────────┐ │
│ │ Cliente │ (Cognito + Fluig) │ MCP Context Forge │ │
│ │ ├──────────────────────►│ │ │
│ └─────────┘ │ 1. Valida JWT Cognito │ │
│ │ 2. Busca permissões (TCloud API)│ │
│ │ 3. Cacheia no Redis │ │
│ │ 4. Propaga headers p/ agents │ │
│ └───────────────┬──────────────────┘ │
│ │ │
│ Headers propagados: │ │
│ X-User-Email │ │
│ X-User-Customers │ │
│ ▼ │
│ ┌──────────────────────────────────┐ │
│ │ Agent Orquestrador │ │
│ └───────────────┬──────────────────┘ │
│ │ │
│ ┌───────────────┬───────────────┴───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ CPU/RAM │ │ DB │ ... │ App │ │
│ │ Agent │ │ Agent │ │ Agent │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘
Solução: Plugin de Autenticação para Context Forge
O Context Forge suporta plugins de autenticação via hook http_auth_resolve_user. Vamos criar um plugin customizado.
Estrutura do Plugin
plugins/tcloud_cognito_auth/
├── __init__.py
├── plugin-manifest.yaml
├── tcloud_cognito_auth.py # Plugin principal
├── cognito.py # Validação JWT Cognito
├── tcloud_api.py # Client TCloud API
├── tests/
│ └── test_plugin.py
└── README.md
Fluxo do Plugin
┌─────────────────────────────────────────────────────────────────────────────┐
│ Plugin: TCloudCognitoAuthPlugin │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Hook: HTTP_AUTH_RESOLVE_USER │
│ │
│ 1. Extrai Bearer token do header Authorization │
│ 2. Valida JWT com Cognito JWKS │
│ - Verifica issuer (Cognito User Pool) │
│ - Verifica expiração │
│ - Verifica client_id │
│ 3. Extrai email do usuário (do username ou claim) │
│ 4. Busca permissões na TCloud API (com cache Redis) │
│ - GET /customer → lista de cloud_ids │
│ - Cache TTL: 5 minutos │
│ 5. Retorna user dict + metadata com permissões │
│ │
│ Resultado: │
│ - modified_payload: {email, full_name, is_admin, is_active} │
│ - metadata: {auth_method: "cognito", customers: [...]} │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Código do Plugin
# plugins/tcloud_cognito_auth/tcloud_cognito_auth.py
from mcpgateway.plugins.framework import (
Plugin, PluginConfig, PluginContext, PluginResult,
HttpAuthResolveUserPayload, HttpHeaderPayload,
PluginViolation, PluginViolationError,
)
import aiohttp
import jwt
from datetime import datetime, timezone
class TCloudCognitoAuthPlugin(Plugin):
"""Authenticate users via Cognito + TCloud API permissions."""
def __init__(self, config: PluginConfig):
super().__init__(config)
self.cognito_region = config.config.get("cognito_region")
self.cognito_user_pool_id = config.config.get("cognito_user_pool_id")
self.cognito_client_id = config.config.get("cognito_client_id")
self.tcloud_api_url = config.config.get("tcloud_api_url")
self.tcloud_api_key = config.config.get("tcloud_api_key")
self._jwks_cache = None
self._session = None
async def initialize(self):
"""Called when plugin is loaded."""
self._session = aiohttp.ClientSession()
await self._get_cognito_jwks() # Pre-fetch JWKS
async def shutdown(self):
"""Called when plugin is unloaded."""
if self._session:
await self._session.close()
async def http_auth_resolve_user(
self,
payload: HttpAuthResolveUserPayload,
context: PluginContext
) -> PluginResult[dict]:
"""Authenticate via Cognito JWT and fetch TCloud permissions."""
if not payload.credentials:
return PluginResult(continue_processing=True)
token = payload.credentials.get("credentials")
if not token:
return PluginResult(continue_processing=True)
try:
# 1. Validate JWT with Cognito
claims = await self._validate_cognito_jwt(token)
email = self._extract_email(claims)
# 2. Fetch TCloud permissions (with Redis cache)
customers = await self._get_user_customers(token, context)
self.logger.info(f"Authenticated user {email} with {len(customers)} customers")
# 3. Return authenticated user with permissions
return PluginResult(
modified_payload={
"email": email,
"full_name": claims.get("name", email),
"is_admin": False,
"is_active": True,
},
metadata={
"auth_method": "cognito",
"customers": customers,
"cognito_sub": claims.get("sub"),
},
continue_processing=True,
)
except jwt.ExpiredSignatureError:
raise PluginViolationError(
message="Token expired",
violation=PluginViolation(reason="Token expired", code="TOKEN_EXPIRED")
)
except jwt.InvalidTokenError as e:
raise PluginViolationError(
message=f"Invalid token: {e}",
violation=PluginViolation(reason="Invalid token", code="INVALID_TOKEN")
)
except Exception as e:
self.logger.error(f"Auth error: {e}")
return PluginResult(continue_processing=True)
async def _validate_cognito_jwt(self, token: str) -> dict:
"""Validate JWT against Cognito JWKS."""
jwks = await self._get_cognito_jwks()
header = jwt.get_unverified_header(token)
kid = header.get("kid")
key = None
for k in jwks["keys"]:
if k["kid"] == kid:
key = jwt.algorithms.RSAAlgorithm.from_jwk(k)
break
if not key:
raise jwt.InvalidTokenError("Key not found in JWKS")
issuer = f"https://cognito-idp.{self.cognito_region}.amazonaws.com/{self.cognito_user_pool_id}"
claims = jwt.decode(
token, key, algorithms=["RS256"],
issuer=issuer, options={"verify_aud": False}
)
# Verify client_id for access tokens
if claims.get("token_use") == "access":
if claims.get("client_id") != self.cognito_client_id:
raise jwt.InvalidTokenError("Invalid client_id")
return claims
async def _get_cognito_jwks(self) -> dict:
"""Fetch and cache Cognito JWKS."""
if self._jwks_cache:
return self._jwks_cache
url = f"https://cognito-idp.{self.cognito_region}.amazonaws.com/{self.cognito_user_pool_id}/.well-known/jwks.json"
async with self._session.get(url) as resp:
self._jwks_cache = await resp.json()
return self._jwks_cache
async def _get_user_customers(self, token: str, context: PluginContext) -> list:
"""Fetch user's customers from TCloud API with Redis cache."""
import hashlib
cache_key = f"tcloud:customers:{hashlib.sha256(token.encode()).hexdigest()[:16]}"
# Try cache (via Context Forge Redis)
try:
from mcpgateway.cache import redis_cache
cached = await redis_cache.get(cache_key)
if cached:
return cached
except Exception:
pass
# Fetch from TCloud API
headers = {
"Authorization": f"Bearer {token}",
"x-api-key": self.tcloud_api_key,
}
async with self._session.get(f"{self.tcloud_api_url}/customer", headers=headers) as resp:
if resp.status != 200:
self.logger.warning(f"TCloud API error: {resp.status}")
return []
data = await resp.json()
customers = [c.get("cloud_id") for c in data if c.get("cloud_id")]
# Cache for 5 minutes
try:
from mcpgateway.cache import redis_cache
await redis_cache.set(cache_key, customers, ttl=300)
except Exception:
pass
return customers
def _extract_email(self, claims: dict) -> str:
"""Extract email from Cognito claims."""
if claims.get("token_use") == "access":
username = claims.get("username", "")
return username.split("_", 1)[1] if "_" in username else username
return claims.get("email", claims.get("sub"))Configuração do Plugin
# infrastructure/context-forge/plugins/config.yaml
plugins:
- name: "TCloudCognitoAuthPlugin"
kind: "plugins.tcloud_cognito_auth.tcloud_cognito_auth.TCloudCognitoAuthPlugin"
description: "Authenticate via Cognito + TCloud API permissions"
version: "1.0.0"
author: "TCloud Team"
hooks: ["http_auth_resolve_user"]
mode: "enforce"
priority: 10
config:
cognito_region: "sa-east-1"
cognito_user_pool_id: "${COGNITO_USER_POOL_ID}"
cognito_client_id: "${COGNITO_APP_CLIENT_ID}"
tcloud_api_url: "${TCLOUD_API_URL}"
tcloud_api_key: "${TCLOUD_API_KEY}"Helm Values para Secrets
# infrastructure/context-forge/values-dev.yaml
mcpContextForge:
env:
# Cognito
COGNITO_REGION: "sa-east-1"
COGNITO_USER_POOL_ID: "sa-east-1_xxx"
COGNITO_APP_CLIENT_ID: "xxx"
# TCloud API
TCLOUD_API_URL: "https://api.tcloud.cloudtotvs.com.br/dev"
# Secrets via Kubernetes Secret
secretEnv:
- name: TCLOUD_API_KEY
secretName: tcloud-api-credentials
secretKey: api-key
# Plugin config
plugins:
enabled: true
configFile: /app/plugins/config.yamlHeader Propagation para Agents
O Context Forge pode propagar headers para os agents downstream. Usar o plugin header_injector:
# plugins/config.yaml (adicional)
plugins:
# ... TCloudCognitoAuthPlugin ...
- name: "TCloudHeaderInjector"
kind: "plugins.header_injector.header_injector.HeaderInjectorPlugin"
hooks: ["tool_pre_invoke", "agent_pre_invoke"]
priority: 20
config:
inject_from_auth_metadata: true
headers:
X-User-Email: "${auth.email}"
X-User-Customers: "${auth.metadata.customers}"Headers Padronizados
| Header | Descrição | Exemplo |
|---|---|---|
X-User-Email |
Email do usuário autenticado | user@totvs.com.br |
X-User-Customers |
Lista de cloud_ids permitidos (JSON) | ["cloud_123", "cloud_456"] |
X-Request-ID |
ID único para tracing | uuid |
Impacto nos Agents
Os agents precisam suportar dual-mode authentication:
- Via Context Forge: Lê permissões dos headers
X-User-* - Standalone: Valida OAuth e busca permissões na TCloud API
Ver: tcloud-dev/tcloud-watch-mcp-server#2
Tarefas
- Criar estrutura do plugin em
plugins/tcloud_cognito_auth/ - Implementar validação JWT Cognito
- Implementar integração com TCloud API
- Configurar cache Redis para permissões
- Criar testes unitários
- Atualizar Helm values com novas env vars
- Criar Kubernetes Secret para TCLOUD_API_KEY
- Testar fluxo end-to-end com tcloud-watch-mcp-server
- Documentar em
docs/authentication.md
Referências
- Context Forge HTTP Auth Hooks
- Context Forge Plugin Framework
- tcloud-dev/tcloud-watch-mcp-server#2 (Dual-mode auth no CPU/RAM agent)
Metadata
Metadata
Assignees
Labels
No labels