Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
## 1.0.0b54 (Unreleased)

### Features Added
- Add `StatsbeatManager.add_metric_callback` to let SDKs/distros add their own metric
observations to built-in statsbeat metrics
([#47363](https://github.com/Azure/azure-sdk-for-python/pull/47363))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: blank line


### Breaking Changes
- Customer Facing SDKStats: Renamed metric dimension attributes from snake_case/dotted to camelCase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
# Licensed under the MIT License.
import logging
import threading
from typing import Optional, Any, Dict
from typing import Callable, Iterable, List, Optional, Any, Dict

from opentelemetry.metrics import CallbackOptions, Observation
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
Expand Down Expand Up @@ -161,6 +162,11 @@ def __init__(self) -> None:
# Set during first initialization, preserved in shutdown for potential re-initialization
self._config: Optional[StatsbeatConfig] = None # type: ignore

# Extra observation callbacks contributed by SDKs/distros. Keyed by built-in
# statsbeat metric name. Registered directly on the singleton instance, e.g.
# ``StatsbeatManager()._additional_callbacks.setdefault(name, []).append(cb)``.
self._additional_callbacks: Dict[str, List[Callable[[CallbackOptions], Iterable[Observation]]]] = {}

@staticmethod
def _validate_config(config: Optional[StatsbeatConfig]) -> bool:
"""Validate that a configuration has all required fields.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
get_statsbeat_customer_sdkstats_feature_set,
get_statsbeat_browser_sdk_loader_feature_set,
)
from azure.monitor.opentelemetry.exporter.statsbeat._utils import (
_get_additional_observations,
)
from azure.monitor.opentelemetry.exporter import _utils


Expand Down Expand Up @@ -379,6 +382,7 @@ def _get_success_count(self, options: CallbackOptions) -> Iterable[Observation]:
if count != 0:
observations.append(Observation(int(count), dict(attributes)))
_REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 0
observations.extend(_get_additional_observations(_REQ_SUCCESS_NAME[0], options))
return observations

# pylint: disable=unused-argument
Expand All @@ -393,6 +397,7 @@ def _get_failure_count(self, options: CallbackOptions) -> Iterable[Observation]:
attributes["statusCode"] = code
observations.append(Observation(int(count), dict(attributes)))
_REQUESTS_MAP[_REQ_FAILURE_NAME[1]][code] = 0 # type: ignore
observations.extend(_get_additional_observations(_REQ_FAILURE_NAME[0], options))
return observations

# pylint: disable=unused-argument
Expand All @@ -409,6 +414,7 @@ def _get_average_duration(self, options: CallbackOptions) -> Iterable[Observatio
observations.append(Observation(result * 1000, dict(attributes)))
_REQUESTS_MAP[_REQ_DURATION_NAME[1]] = 0
_REQUESTS_MAP["count"] = 0
observations.extend(_get_additional_observations(_REQ_DURATION_NAME[0], options))
return observations

# pylint: disable=unused-argument
Expand All @@ -423,6 +429,7 @@ def _get_retry_count(self, options: CallbackOptions) -> Iterable[Observation]:
attributes["statusCode"] = code
observations.append(Observation(int(count), dict(attributes)))
_REQUESTS_MAP[_REQ_RETRY_NAME[1]][code] = 0 # type: ignore
observations.extend(_get_additional_observations(_REQ_RETRY_NAME[0], options))
return observations

# pylint: disable=unused-argument
Expand All @@ -437,6 +444,7 @@ def _get_throttle_count(self, options: CallbackOptions) -> Iterable[Observation]
attributes["statusCode"] = code
observations.append(Observation(int(count), dict(attributes)))
_REQUESTS_MAP[_REQ_THROTTLE_NAME[1]][code] = 0 # type: ignore
observations.extend(_get_additional_observations(_REQ_THROTTLE_NAME[0], options))
return observations

# pylint: disable=unused-argument
Expand All @@ -451,6 +459,7 @@ def _get_exception_count(self, options: CallbackOptions) -> Iterable[Observation
attributes["exceptionType"] = code
observations.append(Observation(int(count), dict(attributes)))
_REQUESTS_MAP[_REQ_EXCEPTION_NAME[1]][code] = 0 # type: ignore
observations.extend(_get_additional_observations(_REQ_EXCEPTION_NAME[0], options))
return observations


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import logging
import json
from collections.abc import Iterable # pylint: disable=import-error
from typing import Optional, Dict
from typing import Optional, Dict, List
from opentelemetry.metrics import CallbackOptions, Observation

from azure.monitor.opentelemetry.exporter._constants import (
_APPLICATIONINSIGHTS_STATS_CONNECTION_STRING_ENV_NAME,
Expand Down Expand Up @@ -165,3 +166,40 @@ def _get_connection_string_for_region_from_config(target_region: str, settings:
"Unexpected error getting stats connection string for region '%s': %s", target_region, str(ex)
)
return None


def _get_additional_observations(metric_name: str, options: CallbackOptions) -> List[Observation]:
"""Return observations contributed by extra callbacks registered on :class:`StatsbeatManager`.

Invoked by the built-in ``_StatsbeatMetrics`` callbacks at collection time.
Reads ``StatsbeatManager()._additional_callbacks`` (a live mutable dict on the
singleton instance), which SDKs/distros populate directly. Exceptions raised by
individual callbacks are caught, logged, and skipped.

:param metric_name: Name of the built-in statsbeat metric being collected.
:type metric_name: str
:param options: OpenTelemetry callback options forwarded to each registered callback.
:type options: ~opentelemetry.metrics.CallbackOptions
:returns: List of observations contributed by registered callbacks.
:rtype: list[~opentelemetry.metrics.Observation]
"""
# Lazy import to avoid a circular import between _manager and _utils.
from azure.monitor.opentelemetry.exporter.statsbeat._manager import ( # pylint: disable=import-outside-toplevel
StatsbeatManager,
)

callbacks = StatsbeatManager()._additional_callbacks.get(metric_name, ()) # pylint: disable=protected-access

observations: List[Observation] = []
iter_logger = logging.getLogger(__name__)
for cb in callbacks:
try:
observations.extend(cb(options))
except Exception: # pylint: disable=broad-except
iter_logger.debug(
"Extra statsbeat callback %r for %r raised; skipping.",
cb,
metric_name,
exc_info=True,
)
return observations
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
_REQ_SUCCESS_NAME,
_REQ_THROTTLE_NAME,
)
from opentelemetry.metrics import Observation
from azure.monitor.opentelemetry.exporter.statsbeat import _utils as statsbeat_utils
from azure.monitor.opentelemetry.exporter.statsbeat._manager import StatsbeatManager
from azure.monitor.opentelemetry.exporter.statsbeat._state import (
_REQUESTS_MAP,
_STATSBEAT_STATE,
Expand All @@ -35,6 +38,9 @@
_AttachTypes,
_RP_Names,
)
from azure.monitor.opentelemetry.exporter.statsbeat._utils import (
_get_additional_observations,
)


class MockResponse(object):
Expand Down Expand Up @@ -967,4 +973,115 @@ def test_shorten_host(self):
self.assertEqual(_shorten_host(url), "fakehost-5")


# pylint: disable=protected-access
class TestAdditionalObservationCallbacks(unittest.TestCase):
"""Tests for StatsbeatManager._additional_callbacks and the _get_additional_observations helper."""

def setUp(self):
_REQUESTS_MAP.clear()
Comment thread
rads-1996 marked this conversation as resolved.
# Force a fresh StatsbeatManager so its __init__ runs again (which
# rebuilds an empty _additional_callbacks dict on the instance).
StatsbeatManager._instances.pop(StatsbeatManager, None)

def tearDown(self):
_REQUESTS_MAP.clear()
Comment thread
rads-1996 marked this conversation as resolved.
StatsbeatManager._instances.pop(StatsbeatManager, None)

@staticmethod
def _register(metric_name, callback):
StatsbeatManager()._additional_callbacks.setdefault(metric_name, []).append(callback)

def _make_metric(self):
return _StatsbeatMetrics(
MeterProvider(),
"1aa11111-bbbb-1ccc-8ddd-eeeeffff3334",
"https://westus-1.in.applicationinsights.azure.com/",
False,
0,
False,
)

# ---- _get_additional_observations ----

def test_get_unregistered_name_returns_empty(self):
self.assertEqual(_get_additional_observations(_REQ_SUCCESS_NAME[0], None), [])

def test_get_returns_observations_from_registered_callback(self):
obs = Observation(7, {"endpoint": "ep1"})

def cb(_options):
yield obs

self._register(_REQ_SUCCESS_NAME[0], cb)
self.assertEqual(_get_additional_observations(_REQ_SUCCESS_NAME[0], None), [obs])

def test_get_aggregates_across_multiple_callbacks(self):
obs1 = Observation(1, {"endpoint": "ep1"})
obs2 = Observation(2, {"endpoint": "ep2"})
self._register(_REQ_SUCCESS_NAME[0], lambda _options: [obs1])
self._register(_REQ_SUCCESS_NAME[0], lambda _options: [obs2])
self.assertEqual(
_get_additional_observations(_REQ_SUCCESS_NAME[0], None),
[obs1, obs2],
)

def test_get_swallows_callback_exception_and_continues(self):
good_obs = Observation(42, {"endpoint": "ok"})

def bad_cb(_options):
raise RuntimeError("boom")

self._register(_REQ_SUCCESS_NAME[0], bad_cb)
self._register(_REQ_SUCCESS_NAME[0], lambda _options: [good_obs])
# Should not raise; should still emit the good observation.
self.assertEqual(
_get_additional_observations(_REQ_SUCCESS_NAME[0], None),
[good_obs],
)

def test_get_callbacks_for_other_metrics_not_invoked(self):
called = []
self._register(_REQ_FAILURE_NAME[0], lambda _options: called.append("failure") or [])
_get_additional_observations(_REQ_SUCCESS_NAME[0], None)
self.assertEqual(called, [])

# ---- integration with built-in callbacks ----

def test_success_count_callback_emits_extras(self):
metric = self._make_metric()
_REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 5

extra = Observation(99, {"endpoint": "extra-ep", "statusCode": 200})
self._register(_REQ_SUCCESS_NAME[0], lambda _options: [extra])

observations = metric._get_success_count(options=None)

# Built-in observation followed by the extra one.
self.assertEqual(len(observations), 2)
self.assertEqual(observations[0].value, 5)
self.assertIs(observations[-1], extra)

def test_success_count_callback_unchanged_without_extras(self):
metric = self._make_metric()
_REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 3

observations = metric._get_success_count(options=None)

self.assertEqual(len(observations), 1)
self.assertEqual(observations[0].value, 3)

def test_extras_for_other_metric_do_not_leak_into_success(self):
metric = self._make_metric()
_REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 1

unrelated = Observation(123, {"endpoint": "other"})
self._register(_REQ_FAILURE_NAME[0], lambda _options: [unrelated])

observations = metric._get_success_count(options=None)

self.assertEqual(len(observations), 1)
self.assertEqual(observations[0].value, 1)
self.assertNotIn(unrelated, observations)


# cSpell:enable