Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,23 @@ jobs:
with:
python-version: ${{ matrix.python-version }}

- name: Install Hatch
run: pip install --upgrade hatch

- if: matrix.python-version == '3.9' && runner.os == 'Linux'
name: Lint
run: hatch run lint:all
run: |
pip install "black==22.10.0" "mypy>=0.991" "ruff>=0.0.166"
ruff check .
black --check --diff .
mypy --install-types --non-interactive earningscall tests

- name: Run tests and track code coverage
run: hatch run cov
run: |
pip install -e . "coverage[toml]>=6.5" pytest responses coveralls
python -m coverage run -m pytest tests
python -m coverage combine
python -m coverage report --show-missing
- if: matrix.python-version == '3.9' && runner.os == 'Linux'
name: Publish Coverage to Coveralls
run: hatch run coveralls
run: python -m coveralls
env:
COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}

Expand All @@ -62,7 +67,7 @@ jobs:
python-version: '3.10'

- name: Install Hatch
run: pip install --upgrade hatch hatch-containers
run: pip install --upgrade "hatch<1.16" hatch-containers

- name: Run tests in container matrix
run: hatch run all:test
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,37 @@ earningscall.retry_strategy = {
}
```

### Latency Metrics Collection

The library can collect per-request latency metrics in memory so you can periodically send them to your own
observability endpoint.

Each metric includes:

- request target (API path or URL)
- HTTP method
- status code
- duration in milliseconds
- retry attempt count
- cache hit information
- error type (when a request raises)

```python
import earningscall
from earningscall import get_latency_metrics_summary, pop_latency_metrics

# Optional tuning
earningscall.enable_latency_metrics = True
earningscall.latency_metrics_max_entries = 1000 # Keep latest N request metrics in memory

# ...run your EarningsCall API requests...

# Snapshot aggregated latency stats without clearing the buffer
summary = get_latency_metrics_summary()
print(summary)

# Drain raw metrics when it's time to ship a batch to your observability service
raw_metrics = pop_latency_metrics()
# requests.post("https://your-observability-endpoint.example/v1/metrics", json=raw_metrics)
```

12 changes: 12 additions & 0 deletions earningscall/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
from typing import Dict, Optional, Union

from earningscall.api import (
clear_latency_metrics,
get_latency_metrics,
get_latency_metrics_summary,
pop_latency_metrics,
)
from earningscall.exports import get_company, get_all_companies, get_sp500_companies, get_calendar
import earningscall.exchanges as exchanges
from earningscall.symbols import Symbols, load_symbols

api_key: Optional[str] = None
enable_requests_cache: bool = True
retry_strategy: Optional[Dict[str, Union[str, int, float]]] = None
enable_latency_metrics: bool = True
latency_metrics_max_entries: int = 1000

__all__ = [
"get_company",
Expand All @@ -15,5 +23,9 @@
"Symbols",
"load_symbols",
"get_calendar",
"get_latency_metrics",
"get_latency_metrics_summary",
"clear_latency_metrics",
"pop_latency_metrics",
"exchanges",
]
248 changes: 212 additions & 36 deletions earningscall/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import importlib.metadata
import logging
import math
import os
import platform
import threading
import time
import urllib.parse
from typing import Dict, Optional, Union
from typing import Dict, List, Optional, Union

import requests
from requests_cache import CachedSession
Expand All @@ -21,6 +23,9 @@
"base_delay": 1,
"max_attempts": 10,
}
DEFAULT_LATENCY_METRICS_MAX_ENTRIES = 1000
_latency_metrics: List[dict] = []
_latency_metrics_lock = threading.Lock()


def get_api_key():
Expand Down Expand Up @@ -61,6 +66,134 @@ def purge_cache():
return cache_session().cache.clear()


def _get_latency_metrics_max_entries() -> int:
    """Return the configured latency-metrics buffer size, clamped to be non-negative.

    Falls back to ``DEFAULT_LATENCY_METRICS_MAX_ENTRIES`` when the user-set
    value is missing or cannot be coerced to an ``int``.
    """
    raw_value = getattr(earningscall, "latency_metrics_max_entries", DEFAULT_LATENCY_METRICS_MAX_ENTRIES)
    try:
        limit = int(raw_value)
    except (TypeError, ValueError):
        # A non-numeric setting silently reverts to the library default.
        limit = DEFAULT_LATENCY_METRICS_MAX_ENTRIES
    return limit if limit > 0 else 0


def _record_latency_metric(
    *,
    method: str,
    target: str,
    duration_ms: float,
    attempts: int,
    status_code: Optional[int] = None,
    from_cache: bool = False,
    error_type: Optional[str] = None,
) -> None:
    """Append one request-latency sample to the in-memory metrics buffer.

    No-op when collection is disabled (``earningscall.enable_latency_metrics``)
    or the configured buffer size is zero. The oldest samples are evicted once
    the buffer exceeds the configured maximum, so memory stays bounded.
    """
    if not getattr(earningscall, "enable_latency_metrics", True):
        return

    buffer_limit = _get_latency_metrics_max_entries()
    if buffer_limit == 0:
        return

    sample = {
        "timestamp": time.time(),
        "method": method,
        "target": target,
        "status_code": status_code,
        "duration_ms": round(duration_ms, 3),
        "attempts": attempts,
        "from_cache": from_cache,
        "error_type": error_type,
    }
    with _latency_metrics_lock:
        _latency_metrics.append(sample)
        # Drop the oldest entries so the buffer never exceeds buffer_limit.
        excess = len(_latency_metrics) - buffer_limit
        if excess > 0:
            del _latency_metrics[:excess]


def get_latency_metrics() -> List[dict]:
    """Return a copied snapshot of all buffered metrics; the buffer is kept intact."""
    with _latency_metrics_lock:
        return [dict(entry) for entry in _latency_metrics]


def pop_latency_metrics() -> List[dict]:
    """Atomically drain the buffer, returning copies of every buffered metric."""
    with _latency_metrics_lock:
        drained = [dict(entry) for entry in _latency_metrics]
        del _latency_metrics[:]
        return drained


def clear_latency_metrics() -> None:
    """Discard every buffered metric without returning anything."""
    with _latency_metrics_lock:
        del _latency_metrics[:]


def _calculate_percentile(values: List[float], percentile: float) -> float:
if not values:
return 0.0
sorted_values = sorted(values)
index = max(math.ceil(percentile * len(sorted_values)) - 1, 0)
return sorted_values[index]


def get_latency_metrics_summary(reset: bool = False) -> List[dict]:
    """Aggregate buffered latency metrics by (method, target, status_code, error_type).

    Each summary row carries the sample count, cache-hit count, total retry
    count, and min/max/avg/p95 durations in milliseconds. When *reset* is true
    the underlying buffer is drained; otherwise it is left intact.

    Rows are returned sorted by target, then method, then status code and
    error type (stringified so None sorts deterministically).
    """
    samples = pop_latency_metrics() if reset else get_latency_metrics()

    buckets: Dict[tuple, dict] = {}
    for sample in samples:
        bucket_key = (
            sample["method"],
            sample["target"],
            sample["status_code"],
            sample["error_type"],
        )
        if bucket_key not in buckets:
            buckets[bucket_key] = {
                "method": sample["method"],
                "target": sample["target"],
                "status_code": sample["status_code"],
                "error_type": sample["error_type"],
                "count": 0,
                "from_cache_count": 0,
                "retry_count": 0,
                "durations_ms": [],
            }
        bucket = buckets[bucket_key]
        bucket["count"] += 1
        if sample["from_cache"]:
            bucket["from_cache_count"] += 1
        # Retries are attempts beyond the first; never negative.
        extra_attempts = int(sample["attempts"]) - 1
        if extra_attempts > 0:
            bucket["retry_count"] += extra_attempts
        bucket["durations_ms"].append(float(sample["duration_ms"]))

    rows: List[dict] = []
    for bucket in buckets.values():
        timings = bucket["durations_ms"]
        sample_count = bucket["count"]
        rows.append(
            {
                "method": bucket["method"],
                "target": bucket["target"],
                "status_code": bucket["status_code"],
                "error_type": bucket["error_type"],
                "count": sample_count,
                "from_cache_count": bucket["from_cache_count"],
                "retry_count": bucket["retry_count"],
                "min_duration_ms": round(min(timings), 3),
                "max_duration_ms": round(max(timings), 3),
                "avg_duration_ms": round(sum(timings) / sample_count, 3),
                "p95_duration_ms": round(_calculate_percentile(timings, 0.95), 3),
            }
        )

    rows.sort(
        key=lambda row: (
            row["target"],
            row["method"],
            str(row["status_code"]),
            str(row["error_type"]),
        )
    )
    return rows


def get_earnings_call_version():
try:
return importlib.metadata.version("earningscall")
Expand Down Expand Up @@ -91,9 +224,28 @@ def _get_with_optional_cache(url: str, *, params: Optional[dict] = None, stream:
"""
Internal helper to GET an absolute URL, using the shared requests cache when enabled.
"""
if earningscall.enable_requests_cache:
return cache_session().get(url, params=params, headers=get_headers(), stream=stream)
return requests.get(url, params=params, headers=get_headers(), stream=stream)
request_start_time = time.perf_counter()
response: Optional[requests.Response] = None
error_type: Optional[str] = None
try:
if earningscall.enable_requests_cache:
response = cache_session().get(url, params=params, headers=get_headers(), stream=stream)
else:
response = requests.get(url, params=params, headers=get_headers(), stream=stream)
return response
except Exception as error:
error_type = type(error).__name__
raise
finally:
_record_latency_metric(
method="GET",
target=url,
status_code=response.status_code if response is not None else None,
duration_ms=(time.perf_counter() - request_start_time) * 1000,
attempts=1,
from_cache=bool(getattr(response, "from_cache", False)) if response is not None else False,
error_type=error_type,
)


def can_retry(response: requests.Response) -> bool:
Expand Down Expand Up @@ -139,39 +291,63 @@ def do_get(
delay = retry_strategy["base_delay"]
max_attempts = int(retry_strategy["max_attempts"])

for attempt in range(max_attempts):
if use_cache and earningscall.enable_requests_cache:
response = cache_session().get(url, params=params)
else:
response = requests.get(
url,
params=params,
headers=get_headers(),
stream=kwargs.get("stream"),
)

if is_success(response):
return response

if response.status_code == 401:
raise InvalidApiKeyError("Your API key is invalid. You can get your API key at: https://{DOMAIN}/api-key")

if not can_retry(response):
return response

if attempt < max_attempts - 1: # Don't sleep after the last attempt
if retry_strategy["strategy"] == "exponential":
wait_time = delay * (2**attempt) # Exponential backoff: 1s -> 2s -> 4s -> 8s -> 16s -> 32s -> 64s
elif retry_strategy["strategy"] == "linear":
wait_time = delay * (attempt + 1) # Linear backoff: 1s -> 2s -> 3s -> 4s -> 5s -> 6s -> 7s
else:
raise ValueError("Invalid retry strategy. Must be one of: 'exponential', 'linear'")
# TODO: Should we log a warning here? Does the customer want to see this log?
log.warning(
f"Rate limited (429). Retrying in {wait_time} seconds... (Attempt {attempt + 1}/{max_attempts})"
)
time.sleep(float(wait_time))
request_start_time = time.perf_counter()
response: Optional[requests.Response] = None
attempts = 0
error_type: Optional[str] = None

try:
for attempt in range(max_attempts):
attempts = attempt + 1
if use_cache and earningscall.enable_requests_cache:
response = cache_session().get(url, params=params)
else:
response = requests.get(
url,
params=params,
headers=get_headers(),
stream=kwargs.get("stream"),
)

if is_success(response):
return response

if response.status_code == 401:
raise InvalidApiKeyError(
"Your API key is invalid. You can get your API key at: https://{DOMAIN}/api-key"
)

if not can_retry(response):
return response

if attempt < max_attempts - 1: # Don't sleep after the last attempt
if retry_strategy["strategy"] == "exponential":
wait_time = delay * (2**attempt) # Exponential backoff: 1s -> 2s -> 4s -> 8s -> 16s -> 32s -> 64s
elif retry_strategy["strategy"] == "linear":
wait_time = delay * (attempt + 1) # Linear backoff: 1s -> 2s -> 3s -> 4s -> 5s -> 6s -> 7s
else:
raise ValueError("Invalid retry strategy. Must be one of: 'exponential', 'linear'")
# TODO: Should we log a warning here? Does the customer want to see this log?
log.warning(
f"Rate limited (429). Retrying in {wait_time} seconds... (Attempt {attempt + 1}/{max_attempts})"
)
time.sleep(float(wait_time))
except Exception as error:
error_type = type(error).__name__
raise
finally:
_record_latency_metric(
method="GET",
target=path,
status_code=response.status_code if response is not None else None,
duration_ms=(time.perf_counter() - request_start_time) * 1000,
attempts=attempts,
from_cache=bool(getattr(response, "from_cache", False)) if response is not None else False,
error_type=error_type,
)

if response is None:
raise RuntimeError("No request attempt was made. Ensure retry_strategy.max_attempts is >= 1.")
return response # Return the last response if all retries failed


Expand Down
Loading