-
Notifications
You must be signed in to change notification settings - Fork 36
⚡ Bolt: Optimize ThreadSafeCache with O(1) LRU eviction #467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8d215a1
a2e449f
da14ed5
cc742f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,23 +2,22 @@ | |||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||
| import threading | ||||||||||||||||||||||||||||||
| from typing import Any, Optional | ||||||||||||||||||||||||||||||
| from datetime import datetime, timedelta | ||||||||||||||||||||||||||||||
| from collections import OrderedDict | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| class ThreadSafeCache: | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| Thread-safe cache implementation with TTL and memory management. | ||||||||||||||||||||||||||||||
| Fixes race conditions and implements proper cache expiration. | ||||||||||||||||||||||||||||||
| Uses OrderedDict for O(1) LRU eviction. | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def __init__(self, ttl: int = 300, max_size: int = 100): | ||||||||||||||||||||||||||||||
| self._data = {} | ||||||||||||||||||||||||||||||
| self._timestamps = {} | ||||||||||||||||||||||||||||||
| self._data = OrderedDict() # Key -> (value, timestamp) | ||||||||||||||||||||||||||||||
| self._ttl = ttl # Time to live in seconds | ||||||||||||||||||||||||||||||
| self._max_size = max_size # Maximum number of cache entries | ||||||||||||||||||||||||||||||
| self._lock = threading.RLock() # Reentrant lock for thread safety | ||||||||||||||||||||||||||||||
| self._access_count = {} # Track access frequency for LRU eviction | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def get(self, key: str = "default") -> Optional[Any]: | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
|
|
@@ -27,16 +26,22 @@ def get(self, key: str = "default") -> Optional[Any]: | |||||||||||||||||||||||||||||
| with self._lock: | ||||||||||||||||||||||||||||||
| current_time = time.time() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Check if key exists and is not expired | ||||||||||||||||||||||||||||||
| if key in self._data and key in self._timestamps: | ||||||||||||||||||||||||||||||
| if current_time - self._timestamps[key] < self._ttl: | ||||||||||||||||||||||||||||||
| # Update access count for LRU | ||||||||||||||||||||||||||||||
| self._access_count[key] = self._access_count.get(key, 0) + 1 | ||||||||||||||||||||||||||||||
| return self._data[key] | ||||||||||||||||||||||||||||||
| # Check if key exists | ||||||||||||||||||||||||||||||
| if key in self._data: | ||||||||||||||||||||||||||||||
| value, timestamp = self._data[key] | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Check expiration | ||||||||||||||||||||||||||||||
| if current_time - timestamp < self._ttl: | ||||||||||||||||||||||||||||||
| # Move to end (MRU) | ||||||||||||||||||||||||||||||
| self._data.move_to_end(key) | ||||||||||||||||||||||||||||||
| # print(f"DEBUG: get({key}) hit. Order: {list(self._data.keys())}") | ||||||||||||||||||||||||||||||
| return value | ||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||
| # Expired entry - remove it | ||||||||||||||||||||||||||||||
| self._remove_key(key) | ||||||||||||||||||||||||||||||
| del self._data[key] | ||||||||||||||||||||||||||||||
| # print(f"DEBUG: get({key}) expired. Order: {list(self._data.keys())}") | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # print(f"DEBUG: get({key}) miss. Order: {list(self._data.keys())}") | ||||||||||||||||||||||||||||||
| return None | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def set(self, data: Any, key: str = "default") -> None: | ||||||||||||||||||||||||||||||
|
|
@@ -46,36 +51,37 @@ def set(self, data: Any, key: str = "default") -> None: | |||||||||||||||||||||||||||||
| with self._lock: | ||||||||||||||||||||||||||||||
| current_time = time.time() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Clean up expired entries before adding new one | ||||||||||||||||||||||||||||||
| self._cleanup_expired() | ||||||||||||||||||||||||||||||
| # If key already exists, update and move to end | ||||||||||||||||||||||||||||||
| if key in self._data: | ||||||||||||||||||||||||||||||
| self._data.move_to_end(key) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # If cache is full, evict least recently used entry | ||||||||||||||||||||||||||||||
| if len(self._data) >= self._max_size and key not in self._data: | ||||||||||||||||||||||||||||||
| self._evict_lru() | ||||||||||||||||||||||||||||||
| # Set new data | ||||||||||||||||||||||||||||||
| self._data[key] = (data, current_time) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Set new data atomically | ||||||||||||||||||||||||||||||
| self._data[key] = data | ||||||||||||||||||||||||||||||
| self._timestamps[key] = current_time | ||||||||||||||||||||||||||||||
| self._access_count[key] = 1 | ||||||||||||||||||||||||||||||
| # Evict if over capacity | ||||||||||||||||||||||||||||||
| if len(self._data) > self._max_size: | ||||||||||||||||||||||||||||||
| # Remove first item (LRU) | ||||||||||||||||||||||||||||||
|
Comment on lines
+61
to
+63
|
||||||||||||||||||||||||||||||
| # Evict if over capacity | |
| if len(self._data) > self._max_size: | |
| # Remove first item (LRU) | |
| # Opportunistically remove expired entries from LRU side | |
| while self._data: | |
| oldest_key, (old_value, old_timestamp) = next(iter(self._data.items())) | |
| if current_time - old_timestamp >= self._ttl: | |
| # Remove expired LRU entry | |
| self._data.popitem(last=False) | |
| else: | |
| break | |
| # Evict non-expired entries if still over capacity (LRU) | |
| while len(self._data) > self._max_size: |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,35 +46,26 @@ | |
|
|
||
| # Cached Functions | ||
|
|
||
| # Simple Cache Implementation to avoid async-lru dependency issues on Render | ||
| _cache_store = {} | ||
| # Robust Cache Implementation using ThreadSafeCache (OrderedDict + LRU) | ||
| from backend.cache import ThreadSafeCache | ||
|
|
||
| CACHE_TTL = 3600 # 1 hour | ||
| MAX_CACHE_SIZE = 500 | ||
| _detection_cache = ThreadSafeCache(ttl=CACHE_TTL, max_size=MAX_CACHE_SIZE) | ||
|
Comment on lines
+49
to
+54
|
||
|
|
||
| async def _get_cached_result(key: str, func, *args, **kwargs): | ||
| current_time = time.time() | ||
|
|
||
| # Check cache | ||
| if key in _cache_store: | ||
| result, timestamp = _cache_store[key] | ||
| if current_time - timestamp < CACHE_TTL: | ||
| return result | ||
| else: | ||
| del _cache_store[key] | ||
|
|
||
| # Prune cache if too large | ||
| if len(_cache_store) > MAX_CACHE_SIZE: | ||
| keys_to_remove = list(_cache_store.keys())[:int(MAX_CACHE_SIZE * 0.2)] | ||
| for k in keys_to_remove: | ||
| del _cache_store[k] | ||
| cached_result = _detection_cache.get(key) | ||
| if cached_result is not None: | ||
| return cached_result | ||
|
|
||
| # Execute function | ||
| if 'client' not in kwargs: | ||
| import backend.dependencies | ||
| kwargs['client'] = backend.dependencies.SHARED_HTTP_CLIENT | ||
|
|
||
| result = await func(*args, **kwargs) | ||
| _cache_store[key] = (result, current_time) | ||
| _detection_cache.set(result, key) | ||
| return result | ||
|
|
||
| async def _cached_detect_severity(image_bytes: bytes): | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,73 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||
| import time |
Copilot
AI
Feb 24, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test_thread_safety_concurrent_writes won't fail if worker threads raise exceptions, because exceptions in threading.Thread don't propagate to the main thread. To make this test meaningful, capture exceptions in a shared list/queue (or use concurrent.futures.ThreadPoolExecutor) and assert that no exceptions occurred.
| def worker(): | |
| for i in range(100): | |
| # set(data, key) | |
| cache.set(i, f"key-{i}") | |
| threads = [threading.Thread(target=worker) for _ in range(10)] | |
| for t in threads: t.start() | |
| for t in threads: t.join() | |
| exceptions = [] | |
| def worker(): | |
| try: | |
| for i in range(100): | |
| # set(data, key) | |
| cache.set(i, f"key-{i}") | |
| except Exception as e: | |
| exceptions.append(e) | |
| threads = [threading.Thread(target=worker) for _ in range(10)] | |
| for t in threads: | |
| t.start() | |
| for t in threads: | |
| t.join() | |
| assert not exceptions, f"Exceptions in worker threads: {exceptions}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are several commented-out
print(...)debug statements inget()/set(). These add noise and can easily be re-enabled accidentally; prefer removing them or switching tologger.debug(...)behind the existing logger if you need trace-level introspection.