diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 38ff67a..f1100a9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,18 +34,23 @@ jobs: with: python-version: ${{ matrix.python-version }} - - name: Install Hatch - run: pip install --upgrade hatch - - if: matrix.python-version == '3.9' && runner.os == 'Linux' name: Lint - run: hatch run lint:all + run: | + pip install "black==22.10.0" "mypy>=0.991" "ruff>=0.0.166" + ruff check . + black --check --diff . + mypy --install-types --non-interactive earningscall tests - name: Run tests and track code coverage - run: hatch run cov + run: | + pip install -e . "coverage[toml]>=6.5" pytest responses coveralls + python -m coverage run -m pytest tests + python -m coverage combine + python -m coverage report --show-missing - if: matrix.python-version == '3.9' && runner.os == 'Linux' name: Publish Coverage to Coveralls - run: hatch run coveralls + run: python -m coveralls env: COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} @@ -62,7 +67,7 @@ jobs: python-version: '3.10' - name: Install Hatch - run: pip install --upgrade hatch hatch-containers + run: pip install --upgrade "hatch<1.16" hatch-containers - name: Run tests in container matrix run: hatch run all:test diff --git a/README.md b/README.md index 8ace90c..23cf107 100644 --- a/README.md +++ b/README.md @@ -357,3 +357,37 @@ earningscall.retry_strategy = { } ``` +### Latency Metrics Collection + +The library can collect per-request latency metrics in memory so you can periodically send them to your own +observability endpoint. + +Each metric includes: + +- request target (API path or URL) +- HTTP method +- status code +- duration in milliseconds +- retry attempt count +- cache hit information +- error type (when a request raises) + +```python +import earningscall +from earningscall import get_latency_metrics_summary, pop_latency_metrics + +# Optional tuning +earningscall.enable_latency_metrics = True +earningscall.latency_metrics_max_entries = 1000 # Keep latest N request metrics in memory + +# ...run your EarningsCall API requests... + +# Snapshot aggregated latency stats without clearing the buffer +summary = get_latency_metrics_summary() +print(summary) + +# Drain raw metrics when it's time to ship a batch to your observability service +raw_metrics = pop_latency_metrics() +# requests.post("https://your-observability-endpoint.example/v1/metrics", json=raw_metrics) +``` + diff --git a/earningscall/__init__.py b/earningscall/__init__.py index 85c4303..f47fba4 100644 --- a/earningscall/__init__.py +++ b/earningscall/__init__.py @@ -1,5 +1,11 @@ from typing import Dict, Optional, Union +from earningscall.api import ( + clear_latency_metrics, + get_latency_metrics, + get_latency_metrics_summary, + pop_latency_metrics, +) from earningscall.exports import get_company, get_all_companies, get_sp500_companies, get_calendar import earningscall.exchanges as exchanges from earningscall.symbols import Symbols, load_symbols @@ -7,6 +13,8 @@ api_key: Optional[str] = None enable_requests_cache: bool = True retry_strategy: Optional[Dict[str, Union[str, int, float]]] = None +enable_latency_metrics: bool = True +latency_metrics_max_entries: int = 1000 __all__ = [ "get_company", @@ -15,5 +23,9 @@ "Symbols", "load_symbols", "get_calendar", + "get_latency_metrics", + "get_latency_metrics_summary", + "clear_latency_metrics", + "pop_latency_metrics", "exchanges", ] diff --git a/earningscall/api.py b/earningscall/api.py index 8e3ac89..7beaa23 100644 --- a/earningscall/api.py +++ b/earningscall/api.py @@ -1,10 +1,12 @@ import importlib.metadata import logging +import math import os import platform +import threading import time import urllib.parse -from typing import Dict, Optional, Union +from typing import Dict, List, Optional, Union import requests from requests_cache import CachedSession @@ -21,6 +23,9 @@ "base_delay": 1, "max_attempts": 10, } +DEFAULT_LATENCY_METRICS_MAX_ENTRIES = 1000 +_latency_metrics: List[dict] = [] +_latency_metrics_lock = threading.Lock() def get_api_key(): @@ -61,6 +66,134 @@ def purge_cache(): return cache_session().cache.clear() +def _get_latency_metrics_max_entries() -> int: + configured_max_entries = getattr(earningscall, "latency_metrics_max_entries", DEFAULT_LATENCY_METRICS_MAX_ENTRIES) + try: + max_entries = int(configured_max_entries) + except (TypeError, ValueError): + max_entries = DEFAULT_LATENCY_METRICS_MAX_ENTRIES + return max(max_entries, 0) + + +def _record_latency_metric( + *, + method: str, + target: str, + duration_ms: float, + attempts: int, + status_code: Optional[int] = None, + from_cache: bool = False, + error_type: Optional[str] = None, +) -> None: + if not getattr(earningscall, "enable_latency_metrics", True): + return + + max_entries = _get_latency_metrics_max_entries() + if max_entries == 0: + return + + metric = { + "timestamp": time.time(), + "method": method, + "target": target, + "status_code": status_code, + "duration_ms": round(duration_ms, 3), + "attempts": attempts, + "from_cache": from_cache, + "error_type": error_type, + } + with _latency_metrics_lock: + _latency_metrics.append(metric) + overflow = len(_latency_metrics) - max_entries + if overflow > 0: + del _latency_metrics[:overflow] + + +def get_latency_metrics() -> List[dict]: + with _latency_metrics_lock: + return [metric.copy() for metric in _latency_metrics] + + +def pop_latency_metrics() -> List[dict]: + with _latency_metrics_lock: + snapshot = [metric.copy() for metric in _latency_metrics] + _latency_metrics.clear() + return snapshot + + +def clear_latency_metrics() -> None: + with _latency_metrics_lock: + _latency_metrics.clear() + + +def _calculate_percentile(values: List[float], percentile: float) -> float: + if not values: + return 0.0 + sorted_values = sorted(values) + index = max(math.ceil(percentile * len(sorted_values)) - 1, 0) + return sorted_values[index] + + +def get_latency_metrics_summary(reset: bool = False) -> List[dict]: + metrics = pop_latency_metrics() if reset else get_latency_metrics() + groups: Dict[tuple, dict] = {} + for metric in metrics: + key = ( + metric["method"], + metric["target"], + metric["status_code"], + metric["error_type"], + ) + group = groups.setdefault( + key, + { + "method": metric["method"], + "target": metric["target"], + "status_code": metric["status_code"], + "error_type": metric["error_type"], + "count": 0, + "from_cache_count": 0, + "retry_count": 0, + "durations_ms": [], + }, + ) + group["count"] += 1 + group["from_cache_count"] += int(bool(metric["from_cache"])) + group["retry_count"] += max(int(metric["attempts"]) - 1, 0) + group["durations_ms"].append(float(metric["duration_ms"])) + + summaries: List[dict] = [] + for group in groups.values(): + durations = group["durations_ms"] + count = group["count"] + total_duration = sum(durations) + summaries.append( + { + "method": group["method"], + "target": group["target"], + "status_code": group["status_code"], + "error_type": group["error_type"], + "count": count, + "from_cache_count": group["from_cache_count"], + "retry_count": group["retry_count"], + "min_duration_ms": round(min(durations), 3), + "max_duration_ms": round(max(durations), 3), + "avg_duration_ms": round(total_duration / count, 3), + "p95_duration_ms": round(_calculate_percentile(durations, 0.95), 3), + } + ) + + return sorted( + summaries, + key=lambda item: ( + item["target"], + item["method"], + str(item["status_code"]), + str(item["error_type"]), + ), + ) + + def get_earnings_call_version(): try: return importlib.metadata.version("earningscall") @@ -91,9 +224,28 @@ def _get_with_optional_cache(url: str, *, params: Optional[dict] = None, stream: """ Internal helper to GET an absolute URL, using the shared requests cache when enabled. """ - if earningscall.enable_requests_cache: - return cache_session().get(url, params=params, headers=get_headers(), stream=stream) - return requests.get(url, params=params, headers=get_headers(), stream=stream) + request_start_time = time.perf_counter() + response: Optional[requests.Response] = None + error_type: Optional[str] = None + try: + if earningscall.enable_requests_cache: + response = cache_session().get(url, params=params, headers=get_headers(), stream=stream) + else: + response = requests.get(url, params=params, headers=get_headers(), stream=stream) + return response + except Exception as error: + error_type = type(error).__name__ + raise + finally: + _record_latency_metric( + method="GET", + target=url, + status_code=response.status_code if response is not None else None, + duration_ms=(time.perf_counter() - request_start_time) * 1000, + attempts=1, + from_cache=bool(getattr(response, "from_cache", False)) if response is not None else False, + error_type=error_type, + ) def can_retry(response: requests.Response) -> bool: @@ -139,39 +291,63 @@ def do_get( delay = retry_strategy["base_delay"] max_attempts = int(retry_strategy["max_attempts"]) - for attempt in range(max_attempts): - if use_cache and earningscall.enable_requests_cache: - response = cache_session().get(url, params=params) - else: - response = requests.get( - url, - params=params, - headers=get_headers(), - stream=kwargs.get("stream"), - ) - - if is_success(response): - return response - - if response.status_code == 401: - raise InvalidApiKeyError("Your API key is invalid. You can get your API key at: https://{DOMAIN}/api-key") - - if not can_retry(response): - return response - - if attempt < max_attempts - 1: # Don't sleep after the last attempt - if retry_strategy["strategy"] == "exponential": - wait_time = delay * (2**attempt) # Exponential backoff: 1s -> 2s -> 4s -> 8s -> 16s -> 32s -> 64s - elif retry_strategy["strategy"] == "linear": - wait_time = delay * (attempt + 1) # Linear backoff: 1s -> 2s -> 3s -> 4s -> 5s -> 6s -> 7s - else: - raise ValueError("Invalid retry strategy. Must be one of: 'exponential', 'linear'") - # TODO: Should we log a warning here? Does the customer want to see this log? - log.warning( - f"Rate limited (429). Retrying in {wait_time} seconds... (Attempt {attempt + 1}/{max_attempts})" - ) - time.sleep(float(wait_time)) + request_start_time = time.perf_counter() + response: Optional[requests.Response] = None + attempts = 0 + error_type: Optional[str] = None + try: + for attempt in range(max_attempts): + attempts = attempt + 1 + if use_cache and earningscall.enable_requests_cache: + response = cache_session().get(url, params=params) + else: + response = requests.get( + url, + params=params, + headers=get_headers(), + stream=kwargs.get("stream"), + ) + + if is_success(response): + return response + + if response.status_code == 401: + raise InvalidApiKeyError( + "Your API key is invalid. You can get your API key at: https://{DOMAIN}/api-key" + ) + + if not can_retry(response): + return response + + if attempt < max_attempts - 1: # Don't sleep after the last attempt + if retry_strategy["strategy"] == "exponential": + wait_time = delay * (2**attempt) # Exponential backoff: 1s -> 2s -> 4s -> 8s -> 16s -> 32s -> 64s + elif retry_strategy["strategy"] == "linear": + wait_time = delay * (attempt + 1) # Linear backoff: 1s -> 2s -> 3s -> 4s -> 5s -> 6s -> 7s + else: + raise ValueError("Invalid retry strategy. Must be one of: 'exponential', 'linear'") + # TODO: Should we log a warning here? Does the customer want to see this log? + log.warning( + f"Rate limited (429). Retrying in {wait_time} seconds... (Attempt {attempt + 1}/{max_attempts})" + ) + time.sleep(float(wait_time)) + except Exception as error: + error_type = type(error).__name__ + raise + finally: + _record_latency_metric( + method="GET", + target=path, + status_code=response.status_code if response is not None else None, + duration_ms=(time.perf_counter() - request_start_time) * 1000, + attempts=attempts, + from_cache=bool(getattr(response, "from_cache", False)) if response is not None else False, + error_type=error_type, + ) + + if response is None: + raise RuntimeError("No request attempt was made. Ensure retry_strategy.max_attempts is >= 1.") return response # Return the last response if all retries failed diff --git a/earningscall/symbols.py b/earningscall/symbols.py index 4094945..0c85f19 100644 --- a/earningscall/symbols.py +++ b/earningscall/symbols.py @@ -72,7 +72,6 @@ def exchange_symbol(self): class Symbols: - def __init__(self): self.exchanges = set() self.by_name = defaultdict(set) diff --git a/hatch.toml b/hatch.toml index b0095b6..e28d54c 100644 --- a/hatch.toml +++ b/hatch.toml @@ -26,7 +26,7 @@ python = ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] [envs.lint] detached = true dependencies = [ - "black>=22.10.0", + "black==22.10.0", "mypy>=0.991", "ruff>=0.0.166", ] diff --git a/tests/test_download_audio_files.py b/tests/test_download_audio_files.py index 611fcab..2ebefc1 100644 --- a/tests/test_download_audio_files.py +++ b/tests/test_download_audio_files.py @@ -61,6 +61,7 @@ def test_download_audio_file_event(): os.unlink(file_name) +@responses.activate def test_download_audio_file_missing_params_raises_value_error(): ## responses._add_from_file(file_path=data_path("symbols-v2.yaml")) diff --git a/tests/test_latency_metrics.py b/tests/test_latency_metrics.py new file mode 100644 index 0000000..c9e990a --- /dev/null +++ b/tests/test_latency_metrics.py @@ -0,0 +1,174 @@ +import pytest +import responses + +import earningscall +from earningscall.api import ( + API_BASE, + clear_latency_metrics, + do_get, + get_exchanges_json, + get_latency_metrics, + get_latency_metrics_summary, + pop_latency_metrics, +) + + +@pytest.fixture(autouse=True) +def run_before_and_after_tests(): + earningscall.api_key = None + earningscall.retry_strategy = { + "strategy": "exponential", + "base_delay": 0, + "max_attempts": 3, + } + earningscall.enable_requests_cache = False + earningscall.enable_latency_metrics = True + earningscall.latency_metrics_max_entries = 1000 + clear_latency_metrics() + yield + clear_latency_metrics() + earningscall.api_key = None + earningscall.retry_strategy = None + earningscall.enable_requests_cache = True + earningscall.enable_latency_metrics = True + earningscall.latency_metrics_max_entries = 1000 + + +@responses.activate +def test_do_get_collects_latency_metrics(): + responses.add( + responses.GET, + f"{API_BASE}/symbols-v2.txt?apikey=demo", + body="AAPL\nMSFT", + status=200, + ) + + response = do_get("symbols-v2.txt") + + assert response.status_code == 200 + metrics = get_latency_metrics() + assert len(metrics) == 1 + metric = metrics[0] + assert metric["method"] == "GET" + assert metric["target"] == "symbols-v2.txt" + assert metric["status_code"] == 200 + assert metric["attempts"] == 1 + assert metric["error_type"] is None + assert metric["duration_ms"] >= 0 + + +@responses.activate +def test_do_get_latency_metrics_include_retry_attempts(): + earningscall.retry_strategy = { + "strategy": "linear", + "base_delay": 0, + "max_attempts": 3, + } + url = f"{API_BASE}/transcript?apikey=demo&exchange=NASDAQ&symbol=AAPL&year=2023&quarter=1&level=1" + responses.add( + responses.GET, + url, + json={"error": "Rate limit exceeded"}, + status=429, + ) + responses.add( + responses.GET, + url, + json={"text": "hello"}, + status=200, + ) + + response = do_get( + "transcript", + params={ + "exchange": "NASDAQ", + "symbol": "AAPL", + "year": "2023", + "quarter": "1", + "level": "1", + }, + ) + + assert response.status_code == 200 + metrics = get_latency_metrics() + assert len(metrics) == 1 + metric = metrics[0] + assert metric["attempts"] == 2 + assert metric["status_code"] == 200 + + +@responses.activate +def test_latency_metrics_summary_and_drain(): + url = f"{API_BASE}/symbols-v2.txt?apikey=demo" + responses.add(responses.GET, url, body="AAPL", status=200) + responses.add(responses.GET, url, body="MSFT", status=200) + + do_get("symbols-v2.txt") + do_get("symbols-v2.txt") + + summary = get_latency_metrics_summary() + assert len(summary) == 1 + assert summary[0]["target"] == "symbols-v2.txt" + assert summary[0]["count"] == 2 + assert summary[0]["retry_count"] == 0 + assert summary[0]["p95_duration_ms"] >= summary[0]["min_duration_ms"] + + drained_metrics = pop_latency_metrics() + assert len(drained_metrics) == 2 + assert get_latency_metrics() == [] + + +@responses.activate +def test_latency_metrics_respect_max_entries(): + earningscall.latency_metrics_max_entries = 1 + responses.add( + responses.GET, + f"{API_BASE}/symbols-v2.txt?apikey=demo", + body="AAPL", + status=200, + ) + responses.add( + responses.GET, + f"{API_BASE}/symbols/sp500.txt?apikey=demo", + body="AAPL\nMSFT", + status=200, + ) + + do_get("symbols-v2.txt") + do_get("symbols/sp500.txt") + + metrics = get_latency_metrics() + assert len(metrics) == 1 + assert metrics[0]["target"] == "symbols/sp500.txt" + + +@responses.activate +def test_latency_metrics_can_be_disabled(): + earningscall.enable_latency_metrics = False + responses.add( + responses.GET, + f"{API_BASE}/symbols-v2.txt?apikey=demo", + body="AAPL", + status=200, + ) + + do_get("symbols-v2.txt") + + assert get_latency_metrics() == [] + + +@responses.activate +def test_get_exchanges_json_collects_absolute_url_metrics(): + responses.add( + responses.GET, + "https://earningscall.biz/exchanges.json", + json={"exchanges": []}, + status=200, + ) + + exchanges_payload = get_exchanges_json() + + assert exchanges_payload == {"exchanges": []} + metrics = get_latency_metrics() + assert len(metrics) == 1 + assert metrics[0]["target"] == "https://earningscall.biz/exchanges.json"