From 45b643ee817b2416dc9eb658e38135eb12561d59 Mon Sep 17 00:00:00 2001 From: Robert Leonard Date: Mon, 25 Aug 2025 16:00:12 -0400 Subject: [PATCH] Simplify docs --- docs/examples/retrieving-results.md | 181 --------- docs/security/api-key-management.md | 147 ------- docs/security/data-privacy.md | 81 ---- docs/security/environment-variables.md | 78 ---- docs/security/rate-limiting.md | 541 ------------------------- 5 files changed, 1028 deletions(-) delete mode 100644 docs/security/data-privacy.md diff --git a/docs/examples/retrieving-results.md b/docs/examples/retrieving-results.md index c25615c..07a7a2d 100644 --- a/docs/examples/retrieving-results.md +++ b/docs/examples/retrieving-results.md @@ -127,187 +127,6 @@ accuracy = correct / len(all_results) if all_results else 0 print(f"Overall accuracy: {accuracy:.1%} ({correct:,}/{len(all_results):,})") ``` -### With Progress Tracking - -```python -from atlas import Atlas - -def get_all_results_with_progress(evaluation_id): - client = Atlas() - all_results = [] - page = 1 - page_size = 50 - - while True: - print(f"Fetching page {page}...") - - results_data = client.results.get_by_id( - evaluation_id=evaluation_id, - page=page, - page_size=page_size - ) - - if not results_data or not results_data.results: - break - - all_results.extend(results_data.results) - - # Show progress - if page == 1: - total_count = results_data.pagination.total_count - total_pages = results_data.pagination.total_pages - print(f"Total results: {total_count:,} across {total_pages} pages") - - progress = len(all_results) / results_data.pagination.total_count * 100 - print(f"Progress: {progress:.1f}% ({len(all_results):,} collected)") - - if page >= results_data.pagination.total_pages: - break - - page += 1 - - return all_results - -# Usage -results = get_all_results_with_progress("your_evaluation_id") -print(f"Done! Collected {len(results):,} results") -``` - -## Async Version - -```python -import asyncio -from atlas import AsyncAtlas - -async def get_results_async(evaluation_id): - client = AsyncAtlas() - - all_results = [] - page = 1 - - while True: - results_data = await client.results.get_by_id( - evaluation_id=evaluation_id, - page=page, - page_size=100 - ) - - if not results_data or not results_data.results: - break - - all_results.extend(results_data.results) - print(f"Page {page}: {len(results_data.results)} results") - - if page >= results_data.pagination.total_pages: - break - - page += 1 - - return all_results - -# Run it -results = asyncio.run(get_results_async("your_evaluation_id")) -print(f"Total: {len(results)} results") -``` - -## Complete Workflow - -```python -from atlas import Atlas - - client = Atlas() - -# 1. Create evaluation -models = client.models.get() -benchmarks = client.benchmarks.get() - -evaluation = client.evaluations.create( - model=models[0], - benchmark=benchmarks[0] -) -print(f"Created evaluation: {evaluation.id}") - -# 2. Wait for completion -print("Waiting for evaluation to complete...") -completed_evaluation = client.evaluations.wait_for_completion( - evaluation, - interval_seconds=30, - timeout_seconds=1800 # 30 minutes -) - -# 3. Get all results -if completed_evaluation.is_success: - print("Getting results...") - - all_results = [] - page = 1 - - while True: - results_data = client.results.get_by_id( - evaluation_id=completed_evaluation.id, - page=page, - page_size=100 - ) - - if not results_data or not results_data.results: - break - - all_results.extend(results_data.results) - if page >= results_data.pagination.total_pages: - break - page += 1 - - # 4. Analyze results - correct = sum(1 for r in all_results if r.score > 0.5) - accuracy = correct / len(all_results) - avg_score = sum(r.score for r in all_results) / len(all_results) - - print(f"Results: {len(all_results):,} total") - print(f"Accuracy: {accuracy:.1%}") - print(f"Average score: {avg_score:.3f}") -else: - print(f"Evaluation failed: {completed_evaluation.status}") -``` - -## Analyze Results by Subset - -```python -from atlas import Atlas -from collections import defaultdict - - client = Atlas() -evaluation_id = "your_evaluation_id" - -# Get all results -all_results = [] -page = 1 - -while True: - results_data = client.results.get_by_id(evaluation_id=evaluation_id, page=page) - if not results_data or not results_data.results: - break - all_results.extend(results_data.results) - if page >= results_data.pagination.total_pages: - break - page += 1 - -# Group by subset -subset_results = defaultdict(list) -for result in all_results: - subset_results[result.subset].append(result) - -# Analyze each subset -print(f"Analysis by subset:") -for subset, results in subset_results.items(): - correct = sum(1 for r in results if r.score > 0.5) - accuracy = correct / len(results) - avg_score = sum(r.score for r in results) / len(results) - - print(f" {subset}:") - print(f" Cases: {len(results)}") - print(f" Accuracy: {accuracy:.1%}") - print(f" Avg Score: {avg_score:.3f}") -``` ## Key Points diff --git a/docs/security/api-key-management.md b/docs/security/api-key-management.md index bc55cb0..8a27d83 100644 --- a/docs/security/api-key-management.md +++ b/docs/security/api-key-management.md @@ -95,150 +95,3 @@ client = Atlas() .env.*.local *.env ``` - -### Advanced Credential Management - -#### Using External Secret Managers - -**AWS Secrets Manager**: - -```python -import boto3 -import json -from atlas import Atlas - -def get_atlas_credentials_from_aws(): - """Retrieve Atlas credentials from AWS Secrets Manager""" - session = boto3.session.Session() - client = session.client('secretsmanager', region_name='us-east-1') - - try: - response = client.get_secret_value(SecretId='layerlens/atlas/credentials') - secrets = json.loads(response['SecretString']) - - return { - 'api_key': secrets['api_key'], - } - except Exception as e: - print(f"Error retrieving secrets: {e}") - return None - -# Usage -credentials = get_atlas_credentials_from_aws() -if credentials: - client = Atlas(**credentials) -``` - -## Environment-Specific Key Management - -### Separating Development and Production Keys - -**Use different API keys for different environments**: - -```python -import os -from atlas import Atlas - -def get_atlas_client(): - """Get Atlas client based on environment""" - environment = os.getenv('ATLAS_ENV', 'development') - - if environment == 'development': - return Atlas( - api_key=os.getenv('DEV_ATLAS_API_KEY'), - base_url=os.getenv('DEV_ATLAS_BASE_URL') # Dev server if applicable - ) - elif environment == 'staging': - return Atlas( - api_key=os.getenv('STAGING_ATLAS_API_KEY'), - ) - elif environment == 'production': - return Atlas( - api_key=os.getenv('PROD_ATLAS_API_KEY'), - ) - else: - raise ValueError(f"Unknown environment: {environment}") - -# Usage -client = get_atlas_client() -``` - -**Environment-specific .env files**: - -```bash -# .env.development -DEV_ATLAS_API_KEY=sk-dev-key-here -DEV_ATLAS_BASE_URL=https://dev-api.layerlens.com - -# .env.production -PROD_ATLAS_API_KEY=sk-prod-key-here -``` - -### Container and Deployment Security - -**Docker Secrets**: - -```yaml -# docker-compose.yml -version: "3.8" - -services: - atlas-app: - image: your-app:latest - secrets: - - atlas_api_key - environment: - - LAYERLENS_ATLAS_API_KEY_FILE=/run/secrets/atlas_api_key - -secrets: - atlas_api_key: - file: ./secrets/atlas_api_key.txt -``` - -**Reading Docker secrets in Python**: - -```python -import os -from atlas import Atlas - -def read_docker_secret(secret_name): - """Read secret from Docker secrets file""" - secret_file = f"/run/secrets/{secret_name}" - try: - with open(secret_file, 'r') as f: - return f.read().strip() - except FileNotFoundError: - return None - -def get_atlas_client_from_docker_secrets(): - """Initialize Atlas client using Docker secrets""" - # Try Docker secrets first, fall back to environment variables - api_key = (read_docker_secret('atlas_api_key') or - os.getenv('LAYERLENS_ATLAS_API_KEY')) - - return Atlas(api_key=api_key) - -# Usage -client = get_atlas_client_from_docker_secrets() -``` - -## Security Checklist - -### Development Security Checklist - -- [ ] ✅ API keys stored in environment variables, not hardcoded -- [ ] ✅ `.env` files added to `.gitignore` -- [ ] ✅ Different API keys for development, staging, and production -- [ ] ✅ API key validation implemented before deployment -- [ ] ✅ Error handling doesn't expose API keys in logs -- [ ] ✅ Code review process includes credential security checks - -### Production Security Checklist - -- [ ] ✅ API keys stored in secure credential management system -- [ ] ✅ Key rotation schedule established and automated -- [ ] ✅ API usage monitoring and alerting configured -- [ ] ✅ Audit logging enabled for all API operations -- [ ] ✅ Network security controls (firewalls, VPNs) in place -- [ ] ✅ Least privilege access principles applied -- [ ] ✅ Incident response plan includes credential compromise scenarios diff --git a/docs/security/data-privacy.md b/docs/security/data-privacy.md deleted file mode 100644 index cf2953b..0000000 --- a/docs/security/data-privacy.md +++ /dev/null @@ -1,81 +0,0 @@ -# Data Privacy - -This guide covers data privacy considerations and best practices when using the Atlas Python SDK to ensure compliance with privacy regulations and protect sensitive information. - -## Overview - -When using the Atlas Python SDK, you may be handling sensitive data including: - -- **AI model outputs** and evaluation results -- **Prompt data** used in evaluations -- **API credentials** and authentication tokens -- **Organizational information** and project data -- **Usage patterns** and performance metrics - -Proper data privacy practices are essential for regulatory compliance and maintaining user trust. - -## Data Classification - -### Understanding Your Data Types - -**Public Data** ✅ (No privacy concerns): -- Model names and identifiers -- Benchmark names and types -- General evaluation statistics -- Documentation and configuration - -**Internal Data** ⚠️ (Moderate privacy): -- Evaluation results and scores -- Performance metrics -- Usage analytics -- System logs (without sensitive content) - -**Confidential Data** 🔒 (High privacy): -- API keys and credentials -- Custom prompts and datasets -- Proprietary model outputs -- Personal identifiable information (PII) - -**Restricted Data** 🚫 (Maximum privacy): -- Personal data under GDPR/CCPA -- Financial or healthcare information -- Trade secrets and intellectual property -- Customer data requiring special handling - -### Data Classification Example - -```python -from enum import Enum -from dataclasses import dataclass -from typing import Optional, List - -class DataClassification(Enum): - PUBLIC = "public" - INTERNAL = "internal" - CONFIDENTIAL = "confidential" - RESTRICTED = "restricted" - -@dataclass -class EvaluationDataMap: - """Map Atlas data types to privacy classifications""" - - model_name: DataClassification = DataClassification.PUBLIC - benchmark_name: DataClassification = DataClassification.PUBLIC - evaluation_scores: DataClassification = DataClassification.INTERNAL - model_outputs: DataClassification = DataClassification.CONFIDENTIAL - api_credentials: DataClassification = DataClassification.RESTRICTED - custom_prompts: DataClassification = DataClassification.CONFIDENTIAL - -def classify_atlas_data(): - """Example data classification for Atlas SDK usage""" - data_map = EvaluationDataMap() - - print("🔍 Atlas Data Classification:") - for field_name, field_value in data_map.__dict__.items(): - privacy_level = field_value.value - print(f" {field_name}: {privacy_level.upper()}") - - return data_map - -classify_atlas_data() -``` diff --git a/docs/security/environment-variables.md b/docs/security/environment-variables.md index b7b6f5e..1236903 100644 --- a/docs/security/environment-variables.md +++ b/docs/security/environment-variables.md @@ -129,81 +129,3 @@ try: except Exception as e: print(f"❌ Failed to initialize client: {e}") ``` - -### Environment-Specific .env Files - -**Create separate files for each environment**: - -**.env.development**: - -```bash -LAYERLENS_ATLAS_API_KEY=sk-dev-key-here -``` - -**.env.staging**: - -```bash -LAYERLENS_ATLAS_API_KEY=sk-staging-key-here -``` - -**.env.production**: - -```bash -LAYERLENS_ATLAS_API_KEY=sk-prod-key-here -``` - -**Load environment-specific configuration**: - -```python -import os -from dotenv import load_dotenv -from atlas import Atlas - -def load_environment_config(): - """Load environment-specific configuration""" - # Determine environment - env = os.getenv('ATLAS_ENV', 'development') - - # Load base .env file first - load_dotenv('.env') - - # Override with environment-specific file - env_file = f'.env.{env}' - if os.path.exists(env_file): - load_dotenv(env_file, override=True) - print(f"📄 Loaded configuration from {env_file}") - else: - print(f"⚠️ Environment file {env_file} not found, using base configuration") - - return env - -def get_atlas_client(): - """Get Atlas client with environment-specific configuration""" - env = load_environment_config() - - # Create client with loaded environment variables - client = Atlas() - - # Log configuration (without sensitive data) - print(f"🌍 Environment: {env}") - print(f"🔗 Base URL: {client.base_url}") - print(f"⏱️ Timeout: {client.timeout}s") - - return client - -# Usage -client = get_atlas_client() -``` - -## Security Best Practices - -### Environment Variable Security Checklist - -- [ ] ✅ No sensitive values hardcoded in source code -- [ ] ✅ .env files added to .gitignore -- [ ] ✅ Different credentials for each environment (dev/staging/prod) -- [ ] ✅ Environment variables validated before use -- [ ] ✅ Production secrets managed through secure systems (not .env files) -- [ ] ✅ Regular rotation of API keys -- [ ] ✅ Monitoring for credential exposure in logs -- [ ] ✅ Team members trained on secure credential handling diff --git a/docs/security/rate-limiting.md b/docs/security/rate-limiting.md index 1cfac05..bb4eab9 100644 --- a/docs/security/rate-limiting.md +++ b/docs/security/rate-limiting.md @@ -27,544 +27,3 @@ except atlas.RateLimitError as e: print(f"Status code: {e.status_code}") # 429 print(f"Response headers: {dict(e.response.headers)}") ``` - -### Rate Limit Headers - -The API response includes helpful headers: - -```python -import atlas -from atlas import Atlas - -def inspect_rate_limit_headers(error): - """Inspect rate limit headers from error response""" - headers = error.response.headers - - # Common rate limit headers - rate_limit_info = { - 'retry_after': headers.get('retry-after'), - 'x_ratelimit_limit': headers.get('x-ratelimit-limit'), - 'x_ratelimit_remaining': headers.get('x-ratelimit-remaining'), - 'x_ratelimit_reset': headers.get('x-ratelimit-reset'), - } - - print("Rate limit information:") - for key, value in rate_limit_info.items(): - if value: - print(f" {key}: {value}") - -try: - client = Atlas() - # ... make request that triggers rate limit - -except atlas.RateLimitError as e: - inspect_rate_limit_headers(e) -``` - -## Handling Rate Limits - -### Basic Retry with Backoff - -```python -import time -import random -import atlas -from atlas import Atlas - -def create_evaluation_with_retry(model: str, benchmark: str, max_retries: int = 3): - """Create evaluation with rate limit retry logic""" - client = Atlas() - - for attempt in range(max_retries): - try: - evaluation = client.evaluations.create(model=model, benchmark=benchmark) - - if evaluation: - print(f"✅ Success on attempt {attempt + 1}") - return evaluation - - except atlas.RateLimitError as e: - print(f"⏳ Rate limited on attempt {attempt + 1}") - - # Check if server provided retry-after header - retry_after = e.response.headers.get('retry-after') - - if retry_after: - wait_time = int(retry_after) - print(f" Server requests waiting {wait_time} seconds") - else: - # Exponential backoff with jitter - base_wait = 2 ** attempt - jitter = random.uniform(0, 1) - wait_time = base_wait + jitter - print(f" Using exponential backoff: {wait_time:.1f} seconds") - - if attempt < max_retries - 1: - time.sleep(wait_time) - else: - print(f"❌ Exhausted all {max_retries} retry attempts") - raise - - except atlas.APIError as e: - print(f"❌ Non-rate-limit error: {e}") - raise - - return None - -# Usage -evaluation = create_evaluation_with_retry("gpt-4", "mmlu") -``` - -### Advanced Retry Strategies - -#### Exponential Backoff with Jitter - -```python -import time -import random -import atlas -from atlas import Atlas - -class ExponentialBackoffRetry: - """Implement exponential backoff with jitter for rate limit handling""" - - def __init__(self, max_retries=5, base_delay=1.0, max_delay=60.0): - self.max_retries = max_retries - self.base_delay = base_delay - self.max_delay = max_delay - - def calculate_delay(self, attempt: int, retry_after: str = None) -> float: - """Calculate delay before next retry""" - - # If server provided retry-after, use that - if retry_after: - try: - return float(retry_after) - except (ValueError, TypeError): - pass - - # Exponential backoff: 2^attempt * base_delay - delay = self.base_delay * (2 ** attempt) - - # Add jitter to prevent thundering herd - jitter = delay * 0.1 * random.uniform(-1, 1) - delay += jitter - - # Cap at maximum delay - return min(delay, self.max_delay) - - def retry_operation(self, operation_func, *args, **kwargs): - """Retry operation with exponential backoff""" - - for attempt in range(self.max_retries): - try: - return operation_func(*args, **kwargs) - - except atlas.RateLimitError as e: - if attempt == self.max_retries - 1: - # Last attempt - re-raise the error - raise - - retry_after = e.response.headers.get('retry-after') - delay = self.calculate_delay(attempt, retry_after) - - print(f"⏳ Rate limited (attempt {attempt + 1}/{self.max_retries})") - print(f" Waiting {delay:.1f} seconds before retry...") - - time.sleep(delay) - continue - - except atlas.APIError as e: - # Don't retry other API errors - print(f"❌ Non-retryable error: {e}") - raise - -# Usage -backoff = ExponentialBackoffRetry(max_retries=5, base_delay=2.0, max_delay=120.0) - -def create_evaluation(): - client = Atlas() - return client.evaluations.create(model="gpt-4", benchmark="mmlu") - -evaluation = backoff.retry_operation(create_evaluation) -``` - - -## Proactive Rate Limit Management - -### Request Throttling - -```python -import time -from threading import Lock -from datetime import datetime, timedelta -import atlas -from atlas import Atlas - -class ThrottledAtlasClient: - """Atlas client with built-in request throttling""" - - def __init__(self, requests_per_minute=30, **client_kwargs): - self.client = Atlas(**client_kwargs) - self.requests_per_minute = requests_per_minute - self.min_interval = 60.0 / requests_per_minute # seconds between requests - self.last_request_time = None - self.lock = Lock() - - def _wait_for_next_request(self): - """Wait if necessary to maintain rate limit""" - with self.lock: - if self.last_request_time: - elapsed = time.time() - self.last_request_time - if elapsed < self.min_interval: - wait_time = self.min_interval - elapsed - print(f"⏳ Throttling: waiting {wait_time:.1f}s") - time.sleep(wait_time) - - self.last_request_time = time.time() - - def create_evaluation(self, *args, **kwargs): - """Create evaluation with throttling""" - self._wait_for_next_request() - return self.client.evaluations.create(*args, **kwargs) - - def get_results(self, *args, **kwargs): - """Get results with throttling""" - self._wait_for_next_request() - return self.client.results.get(*args, **kwargs) - -# Usage -throttled_client = ThrottledAtlasClient(requests_per_minute=20) - -# These requests will be automatically throttled -evaluations = [] -for i in range(10): - evaluation = throttled_client.create_evaluation( - model="gpt-4", - benchmark="mmlu" - ) - evaluations.append(evaluation) -``` - -### Batch Request Management - -```python -import time -from typing import List, Tuple, Callable, Any -from concurrent.futures import ThreadPoolExecutor, as_completed -import atlas -from atlas import Atlas - -class BatchRequestManager: - """Manage batch requests with rate limiting""" - - def __init__(self, requests_per_minute=30, max_concurrent=5): - self.requests_per_minute = requests_per_minute - self.max_concurrent = max_concurrent - self.request_interval = 60.0 / requests_per_minute - - def execute_batch(self, operations: List[Tuple[Callable, tuple, dict]], - handle_rate_limits=True) -> List[Any]: - """Execute a batch of operations with rate limiting""" - - results = [] - - if self.max_concurrent == 1 or not handle_rate_limits: - # Sequential execution - for i, (func, args, kwargs) in enumerate(operations): - if i > 0 and handle_rate_limits: - time.sleep(self.request_interval) - - try: - result = func(*args, **kwargs) - results.append({"success": True, "result": result, "index": i}) - except Exception as e: - results.append({"success": False, "error": e, "index": i}) - else: - # Concurrent execution with rate limiting - with ThreadPoolExecutor(max_workers=self.max_concurrent) as executor: - future_to_index = {} - - for i, (func, args, kwargs) in enumerate(operations): - if i > 0 and handle_rate_limits: - # Stagger request submissions - time.sleep(self.request_interval / self.max_concurrent) - - future = executor.submit(self._execute_with_retry, func, args, kwargs) - future_to_index[future] = i - - # Collect results - for future in as_completed(future_to_index): - index = future_to_index[future] - try: - result = future.result() - results.append({"success": True, "result": result, "index": index}) - except Exception as e: - results.append({"success": False, "error": e, "index": index}) - - # Sort results by original order - results.sort(key=lambda x: x["index"]) - return results - - def _execute_with_retry(self, func, args, kwargs, max_retries=3): - """Execute operation with retry on rate limit""" - for attempt in range(max_retries): - try: - return func(*args, **kwargs) - except atlas.RateLimitError as e: - if attempt == max_retries - 1: - raise - - retry_after = e.response.headers.get('retry-after', 60) - wait_time = int(retry_after) - time.sleep(wait_time) - -# Usage -client = Atlas() -batch_manager = BatchRequestManager(requests_per_minute=20, max_concurrent=3) - -# Prepare batch operations -operations = [] -models = ["gpt-4", "claude-3-opus", "gpt-3.5-turbo"] * 5 - -for model in models: - operation = ( - client.evaluations.create, # function - (), # args - {"model": model, "benchmark": "mmlu"} # kwargs - ) - operations.append(operation) - -# Execute batch -print(f"📦 Executing batch of {len(operations)} operations...") -results = batch_manager.execute_batch(operations) - -# Process results -successful = [r for r in results if r["success"]] -failed = [r for r in results if not r["success"]] - -print(f"✅ Successful: {len(successful)}") -print(f"❌ Failed: {len(failed)}") - -for result in failed: - print(f" Failed operation {result['index']}: {result['error']}") -``` - -## Monitoring Rate Limits - -### Rate Limit Usage Tracking - -```python -import time -from collections import defaultdict, deque -from datetime import datetime, timedelta -from typing import Dict, List -import atlas -from atlas import Atlas - -class RateLimitMonitor: - """Monitor and track rate limit usage""" - - def __init__(self, window_minutes=60): - self.window_minutes = window_minutes - self.request_times = deque() - self.rate_limit_events = [] - self.operation_counts = defaultdict(int) - self.error_counts = defaultdict(int) - - def record_request(self, operation: str): - """Record a successful request""" - now = datetime.now() - self.request_times.append(now) - self.operation_counts[operation] += 1 - self._cleanup_old_data(now) - - def record_rate_limit(self, operation: str, retry_after: int = None): - """Record a rate limit event""" - event = { - 'timestamp': datetime.now(), - 'operation': operation, - 'retry_after': retry_after - } - self.rate_limit_events.append(event) - self.error_counts['rate_limit'] += 1 - - def _cleanup_old_data(self, current_time: datetime): - """Remove data outside monitoring window""" - cutoff = current_time - timedelta(minutes=self.window_minutes) - - # Clean request times - while self.request_times and self.request_times[0] < cutoff: - self.request_times.popleft() - - # Clean rate limit events - self.rate_limit_events = [ - event for event in self.rate_limit_events - if event['timestamp'] > cutoff - ] - - def get_current_rate(self) -> float: - """Get current requests per minute""" - self._cleanup_old_data(datetime.now()) - - if not self.request_times: - return 0.0 - - # Calculate rate over actual time window - time_span = (datetime.now() - self.request_times[0]).total_seconds() / 60 - return len(self.request_times) / max(time_span, 1) - - def get_statistics(self) -> Dict: - """Get comprehensive rate limit statistics""" - self._cleanup_old_data(datetime.now()) - - recent_rate_limits = len(self.rate_limit_events) - total_requests = len(self.request_times) - - return { - 'current_rate_per_minute': self.get_current_rate(), - 'total_requests_in_window': total_requests, - 'rate_limit_events': recent_rate_limits, - 'rate_limit_percentage': (recent_rate_limits / max(total_requests, 1)) * 100, - 'operation_breakdown': dict(self.operation_counts), - 'last_rate_limit': max([e['timestamp'] for e in self.rate_limit_events], - default=None) - } - - def should_slow_down(self, threshold_percentage=5) -> bool: - """Check if we should slow down requests based on rate limits""" - stats = self.get_statistics() - return stats['rate_limit_percentage'] > threshold_percentage - -class MonitoredAtlasClient: - """Atlas client with rate limit monitoring""" - - def __init__(self, **client_kwargs): - self.client = Atlas(**client_kwargs) - self.monitor = RateLimitMonitor() - - def create_evaluation(self, *args, **kwargs): - """Create evaluation with monitoring""" - try: - result = self.client.evaluations.create(*args, **kwargs) - self.monitor.record_request('create_evaluation') - - # Adaptive slowdown - if self.monitor.should_slow_down(): - print("⚠️ High rate limit percentage detected, slowing down...") - time.sleep(2) - - return result - - except atlas.RateLimitError as e: - retry_after = e.response.headers.get('retry-after') - self.monitor.record_rate_limit('create_evaluation', retry_after) - raise - - def get_results(self, *args, **kwargs): - """Get results with monitoring""" - try: - result = self.client.results.get(*args, **kwargs) - self.monitor.record_request('get_results') - return result - - except atlas.RateLimitError as e: - retry_after = e.response.headers.get('retry-after') - self.monitor.record_rate_limit('get_results', retry_after) - raise - - def print_statistics(self): - """Print current rate limit statistics""" - stats = self.monitor.get_statistics() - - print("📊 Rate Limit Statistics (last hour):") - print(f" Current rate: {stats['current_rate_per_minute']:.1f} requests/min") - print(f" Total requests: {stats['total_requests_in_window']}") - print(f" Rate limit events: {stats['rate_limit_events']}") - print(f" Rate limit percentage: {stats['rate_limit_percentage']:.1f}%") - - if stats['operation_breakdown']: - print(" Operations:") - for op, count in stats['operation_breakdown'].items(): - print(f" {op}: {count}") - - if stats['last_rate_limit']: - print(f" Last rate limit: {stats['last_rate_limit']}") - -# Usage -monitored_client = MonitoredAtlasClient() - -# Make requests and monitor -for i in range(20): - try: - evaluation = monitored_client.create_evaluation( - model="gpt-4", - benchmark="mmlu" - ) - print(f"✅ Evaluation {i+1} created") - - if i % 5 == 0: # Print stats every 5 requests - monitored_client.print_statistics() - - except atlas.RateLimitError: - print(f"⏳ Rate limited on request {i+1}") - time.sleep(30) # Wait before continuing - -# Final statistics -monitored_client.print_statistics() -``` - -## Best Practices Summary - -### 1. Implement Proper Retry Logic -```python -# ✅ Good: Exponential backoff with jitter -def robust_request(operation_func, max_retries=3): - for attempt in range(max_retries): - try: - return operation_func() - except atlas.RateLimitError as e: - if attempt == max_retries - 1: - raise - - # Use server-suggested wait time if available - retry_after = e.response.headers.get('retry-after', 2 ** attempt) - wait_time = int(retry_after) + random.uniform(0, 1) - time.sleep(wait_time) -``` - -### 2. Respect Server Headers -```python -# ✅ Good: Check retry-after header -except atlas.RateLimitError as e: - retry_after = e.response.headers.get('retry-after') - if retry_after: - time.sleep(int(retry_after)) -``` - -### 3. Monitor Your Usage -```python -# ✅ Good: Track your rate limit usage -monitor = RateLimitMonitor() -# ... use monitor to adjust request patterns -``` - -### 4. Use Appropriate Request Rates -```python -# ✅ Good: Conservative request rate -throttled_client = ThrottledAtlasClient(requests_per_minute=20) - -# ❌ Bad: Aggressive request rate -# aggressive_client = ThrottledAtlasClient(requests_per_minute=1000) -``` - -### 5. Handle Rate Limits Gracefully -```python -# ✅ Good: Graceful handling -try: - result = client.evaluations.create(model="gpt-4", benchmark="mmlu") -except atlas.RateLimitError: - # Log the event, wait, and potentially retry - logger.warning("Rate limit hit, backing off") - time.sleep(60) -```