Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,252 changes: 2,118 additions & 134 deletions README.md

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ services:
- db
- sentinel
- datadog-agent
- redis
volumes:
- ./services/gateway/app:/app/app
- dogstatsd-socket:/var/run/datadog
Expand Down Expand Up @@ -196,6 +197,26 @@ services:
timeout: 5s
retries: 5

# Redis - Rate Limiting & Caching
redis:
image: redis:7-alpine
restart: always
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- clestiq-network
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
labels:
com.datadoghq.tags.service: "clestiq-shield-redis"
com.datadoghq.tags.env: "development"
com.datadoghq.tags.version: "7.0.0"

# Datadog Agent
datadog-agent:
image: gcr.io/datadoghq/agent:latest
Expand Down Expand Up @@ -257,3 +278,4 @@ networks:
volumes:
postgres_data:
dogstatsd-socket:
redis_data:
23 changes: 20 additions & 3 deletions services/gateway/app/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,32 @@ async def get_api_key(
result = await db.execute(
select(ApiKey)
.options(selectinload(ApiKey.application))
.filter(ApiKey.key_hash == hashed_key, ApiKey.is_active)
.filter(ApiKey.key_hash == hashed_key)
)
api_key_obj = result.scalars().first()

if not api_key_obj or not api_key_obj.application:
logger.warning("Authentication failed", api_key_prefix=api_key[:4] + "...")
if not api_key_obj:
logger.warning(
"Authentication failed: Key not found", api_key_prefix=api_key[:4] + "..."
)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API Key",
)

if not api_key_obj.is_active:
logger.warning(
"Authentication failed: Key disabled", api_key_prefix=api_key[:4] + "..."
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="API Key blocked by application",
)

if not api_key_obj.application:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API Key (No App)",
)

return api_key_obj
58 changes: 58 additions & 0 deletions services/gateway/app/api/v1/endpoints/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
TokenUsage,
)
from app.core.telemetry import telemetry
from app.main import rate_limiter

router = APIRouter()
logger = structlog.get_logger()
Expand Down Expand Up @@ -64,6 +65,51 @@ async def chat_request(
moderation=body.moderation,
)

# --- RATE LIMIT CHECK (Token Usage) ---
# Convert UUID to str for Redis key
key_id = str(api_key.id)
# 1. Check if disabled (handled by deps.get_api_key/DB, but we double check or just trust DB)

# 2. Check Token Limit: 10k per 5 mins (300s)
token_limit_key = f"rate:tokens:{key_id}"
TOKEN_LIMIT = 5000
TOKEN_WINDOW = 300

is_allowed = await rate_limiter.check_current_usage(token_limit_key, TOKEN_LIMIT)
if not is_allowed:
# Check penalties
violation_key = f"rate:violations:{key_id}"
VIOLATION_WINDOW = 1200 # 20 mins

violations = await rate_limiter.record_violation(
violation_key, VIOLATION_WINDOW
)
logger.warning("Rate limit exceeded", key_id=key_id, violations=violations)

if violations >= 2:
# DISABLE KEY
logger.critical(
"Disabling API Key due to repeated violations",
key_id=key_id,
app_id=str(current_app.id),
)
api_key.is_active = False
await db.commit()

telemetry.increment(
"clestiq.gateway.keys_disabled", tags=[f"app:{current_app.name}"]
)

raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="API Key disabled due to repeated rate limit violations",
)

raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Token rate limit exceeded (10k tokens / 5 mins)",
)

# Get client info
client_ip = request.client.host if request.client else None
user_agent = request.headers.get("user-agent")
Expand Down Expand Up @@ -213,6 +259,9 @@ async def chat_request(
from sqlalchemy import func

api_key.last_used_at = func.now()
api_key.last_used_at = func.now()
if api_key.request_count is None:
api_key.request_count = 0
api_key.request_count += 1

# Update usage_data JSON
Expand Down Expand Up @@ -272,6 +321,15 @@ async def chat_request(
tags=[f"app:{current_app.name}", f"model:{model_used}", "type:total"],
)

# --- UPDATE RATE LIMITER ---
# Increment token usage
# We count total tokens (input + output)
total_tokens_used = response_metrics.token_usage.total_tokens
if total_tokens_used > 0:
await rate_limiter.increment_and_check(
token_limit_key, total_tokens_used, TOKEN_LIMIT, TOKEN_WINDOW
)

# 4. Tokens Saved (Efficiency)
if response_metrics.tokens_saved > 0:
telemetry.increment(
Expand Down
65 changes: 64 additions & 1 deletion services/gateway/app/api/v1/endpoints/router_eagleeye.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,31 @@
import httpx
import structlog
from app.core.config import get_settings
from typing import Any
from jose import jwt, JWTError
from app.main import rate_limiter

router = APIRouter()
logger = structlog.get_logger()
settings = get_settings()

EAGLE_EYE_URL = "http://eagle-eye:8003"
APP_CREATION_LIMIT = 2
APP_CREATION_WINDOW = 600 # 10 mins
KEY_CREATION_LIMIT = 4
KEY_CREATION_WINDOW = 600 # 10 mins


def get_user_id_from_token(auth_header: str) -> str | None:
if not auth_header or not auth_header.startswith("Bearer "):
return None
token = auth_header.split(" ")[1]
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
return payload.get("sub")
except JWTError:
return None


async def _proxy_request(request: Request, path: str):
Expand Down Expand Up @@ -53,4 +71,49 @@ async def _proxy_request(request: Request, path: str):
"/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]
)
async def proxy_eagle_eye(request: Request, path: str):
# --- RATE LIMIT CHECK (Resource Creation) ---
if request.method == "POST":
user_id = get_user_id_from_token(request.headers.get("Authorization"))

if user_id:
# Check App Creation: POST /api/v1/apps (or just /apps if mounted there)
# The path arg here comes from mount, so if mounted at /api/v1/apps, path might be empty or "/"
# If mounted at /api/v1, path might be "apps"
# Let's inspect the full URL path to be safe, or relying on the fact that this router handles specific mounts.

full_path = request.url.path

# 1. App Creation
# Path ends with /apps or /apps/
if full_path.endswith("/apps") or full_path.endswith("/apps/"):
limit_key = f"rate:apps_created:{user_id}"
allowed = await rate_limiter.check_limit(
limit_key, APP_CREATION_LIMIT, APP_CREATION_WINDOW
)
if not allowed:
logger.warning("App creation limit exceeded", user_id=user_id)
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="App creation limit exceeded (2 apps / 10 mins)",
)

# 2. Key Creation
# Path matches .../apps/{app_id}/keys
if (
"/keys" in full_path and not "/keys/" in full_path
): # simplistic check for collection POST
# Better check: split by /
parts = full_path.split("/")
if parts[-1] == "keys" and parts[-3] == "apps":
limit_key = f"rate:keys_created:{user_id}"
allowed = await rate_limiter.check_limit(
limit_key, KEY_CREATION_LIMIT, KEY_CREATION_WINDOW
)
if not allowed:
logger.warning("Key creation limit exceeded", user_id=user_id)
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="API Key creation limit exceeded (4 keys / 10 mins)",
)

return await _proxy_request(request, request.url.path.replace("/api/v1", ""))
5 changes: 5 additions & 0 deletions services/gateway/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ class Settings(BaseSettings):

# Database
DATABASE_URL: str
REDIS_URL: str = "redis://redis:6379/0"

# Security
SECRET_KEY: str = "change_this_to_a_strong_secret_key"
ALGORITHM: str = "HS256"

# Datadog APM
TELEMETRY_ENABLED: bool = True
Expand Down
89 changes: 89 additions & 0 deletions services/gateway/app/core/rate_limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import redis.asyncio as redis
from app.core.config import get_settings
import structlog

logger = structlog.get_logger()
settings = get_settings()


class RateLimiter:
def __init__(self):
self.redis = redis.from_url(
settings.REDIS_URL, encoding="utf-8", decode_responses=True
)

async def close(self):
await self.redis.close()

async def check_limit(self, key: str, limit: int, window_seconds: int) -> bool:
"""
Check if a limit has been exceeded.
Returns True if request is allowed, False if limit exceeded.
"""
try:
current = await self.redis.get(key)
if current and int(current) >= limit:
return False

# Use a transaction (pipeline) to ensure atomicity
pipe = self.redis.pipeline()
pipe.incr(key)
if not current:
pipe.expire(key, window_seconds)
await pipe.execute()

return True
except Exception as e:
logger.error("Rate limiter error", error=str(e))
# In case of Redis failure, we default to allowing traffic to avoid outage
return True

async def increment_and_check(
self, key: str, amount: int, limit: int, window_seconds: int
) -> bool:
"""
Increment a counter by 'amount' and check if it exceeds 'limit'.
Used for token usage.
Returns True if request is allowed (after increment), False if limit exceeded.
"""
try:
# Simple INCRBY first
current = await self.redis.incrby(key, amount)

# If it was a new key (or expired), set expiration
if current == amount:
await self.redis.expire(key, window_seconds)

if current > limit:
return False
return True
except Exception as e:
logger.error("Rate limiter error", error=str(e))
return True

async def check_current_usage(self, key: str, limit: int) -> bool:
"""
Check usage without incrementing.
Returns True if usage < limit.
"""
try:
current = await self.redis.get(key)
if current and int(current) >= limit:
return False
return True
except Exception as e:
logger.error("Rate limiter error", error=str(e))
return True

async def record_violation(self, key: str, window_seconds: int) -> int:
"""
Record a violation. Returns the new violation count.
"""
try:
count = await self.redis.incr(key)
if count == 1:
await self.redis.expire(key, window_seconds)
return count
except Exception as e:
logger.error("Rate limiter error", error=str(e))
return 0
Loading