Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
bf5456e
bind sdk metrics
xiazhvera Nov 21, 2025
d3d6352
set aws-c-mqtt to test branch
xiazhvera Nov 21, 2025
21fe38d
bind sdk metrics
xiazhvera Nov 21, 2025
13fe021
update aws-c-common
xiazhvera Nov 21, 2025
da7ed31
add SdkMetrics in mqtt5 lib
xiazhvera Nov 21, 2025
c167b43
move metrics to client option
xiazhvera Dec 23, 2025
2368721
clean up & lint
xiazhvera Dec 23, 2025
c456258
Merge branch 'main' into iot_metrics_2
xiazhvera Dec 23, 2025
03e5d98
update test to use client option
xiazhvera Dec 23, 2025
9a9c89d
Merge branch 'iot_metrics_2' of https://github.com/awslabs/aws-crt-py…
xiazhvera Dec 23, 2025
5f3868c
add metrics for adapter
xiazhvera Dec 23, 2025
ec40b81
fix param order
xiazhvera Dec 23, 2025
fb81ecc
remove custom metrics test
xiazhvera Dec 23, 2025
0435c7f
update test test_metrics_disabled
xiazhvera Dec 23, 2025
3a7b509
Merge branch 'main' into iot_metrics_2
xiazhvera Jan 12, 2026
a21e542
Merge branch 'main' into iot_metrics_2
xiazhvera Jan 15, 2026
c51a273
kick ci
xiazhvera Jan 15, 2026
2f31d9d
Merge branch 'iot_metrics_2' of https://github.com/awslabs/aws-crt-py…
xiazhvera Jan 15, 2026
a6a3c30
fix set metrics error handling
xiazhvera Jan 16, 2026
0f05a84
address comments
xiazhvera Feb 12, 2026
0b84d10
Merge branch 'main' of https://github.com/awslabs/aws-crt-python into…
xiazhvera Feb 12, 2026
ad950b5
allow none metrics
xiazhvera Feb 12, 2026
a6982bf
update metrics structure
xiazhvera Feb 13, 2026
d7ea6c7
update mqtt lib
xiazhvera Feb 17, 2026
8424f97
add metrics enabled test and metrics should be private
xiazhvera Mar 2, 2026
e050814
lint & fix validation
xiazhvera Mar 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions awscrt/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from awscrt.http import HttpProxyOptions, HttpRequest
from awscrt.io import ClientBootstrap, ClientTlsContext, SocketOptions
from dataclasses import dataclass
from awscrt.mqtt5 import Client as Mqtt5Client
from awscrt.mqtt5 import Client as Mqtt5Client, SdkMetrics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of pulling SdkMetrics in from mqtt5, should we instead implement an mqtt3 version of SdkMetrics? It appears there's precedence for this with the double implementation of OperationStatisticsData. Unsure if this is a good or bad practice but it's one we've previously used. This could also potentially allow us to set different things by default for mqtt3 vs. mqtt5 or it could add confusion and a second place to update things when we make changes...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer not to duplicate the structure across MQTT3 and MQTT5. Maintaining it in two places increases the risk of them going out of sync over time. I realize that's the pattern we used in Python, but I wouldn't consider it a good practice to follow.

If we ever need different defaults for MQTT3 vs. MQTT5, we can always inherit from the base structure at that point.



class QoS(IntEnum):
Expand Down Expand Up @@ -330,6 +330,8 @@ class Connection(NativeResource):

proxy_options (Optional[awscrt.http.HttpProxyOptions]):
Optional proxy options for all connections.

enable_metrics (bool): Enable IoT SDK metrics in MQTT CONNECT packet username field. Default to True.
"""

def __init__(self,
Expand All @@ -355,7 +357,8 @@ def __init__(self,
proxy_options=None,
on_connection_success=None,
on_connection_failure=None,
on_connection_closed=None
on_connection_closed=None,
enable_metrics=True,
):

assert isinstance(client, Client) or isinstance(client, Mqtt5Client)
Expand Down Expand Up @@ -408,6 +411,10 @@ def __init__(self,
self.password = password
self.socket_options = socket_options if socket_options else SocketOptions()
self.proxy_options = proxy_options if proxy_options else websocket_proxy_options
if enable_metrics:
self._metrics = SdkMetrics()
else:
self._metrics = None

self._binding = _awscrt.mqtt_client_connection_new(
self,
Expand Down Expand Up @@ -524,7 +531,8 @@ def on_connect(error_code, return_code, session_present):
self.password,
self.clean_session,
on_connect,
self.proxy_options
self.proxy_options,
self._metrics
)

except Exception as e:
Expand Down
35 changes: 31 additions & 4 deletions awscrt/mqtt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@
from inspect import signature


@dataclass
class SdkMetrics:
"""
Configuration for IoT SDK metrics that are embedded in MQTT Connect Packet username field.

Args:
library_name (str): The SDK library name (e.g., "IoTDeviceSDK/Python")

"""
library_name: str = "IoTDeviceSDK/Python"


class QoS(IntEnum):
"""MQTT message delivery quality of service.

Expand Down Expand Up @@ -1158,6 +1170,7 @@ class ConnectPacket:
will_delay_interval_sec (int): A time interval, in seconds, that the server should wait (for a session reconnection) before sending the will message associated with the connection's session. If omitted or None, the server will send the will when the associated session is destroyed. If the session is destroyed before a will delay interval has elapsed, then the will must be sent at the time of session destruction.
will (PublishPacket): The definition of a message to be published when the connection's session is destroyed by the server or when the will delay interval has elapsed, whichever comes first. If None, then nothing will be sent.
user_properties (Sequence[UserProperty]): List of MQTT5 user properties included with the packet.

"""
keep_alive_interval_sec: int = None
client_id: str = None
Expand Down Expand Up @@ -1338,6 +1351,7 @@ class ClientOptions:
on_lifecycle_event_connection_success_fn (Callable[[LifecycleConnectSuccessData],]): Callback for Lifecycle Event Connection Success.
on_lifecycle_event_connection_failure_fn (Callable[[LifecycleConnectFailureData],]): Callback for Lifecycle Event Connection Failure.
on_lifecycle_event_disconnection_fn (Callable[[LifecycleDisconnectData],]): Callback for Lifecycle Event Disconnection.
enable_metrics (bool): Enable IoT SDK metrics in MQTT CONNECT packet username field. Default to True.
"""
host_name: str
port: int = None
Expand All @@ -1364,6 +1378,7 @@ class ClientOptions:
on_lifecycle_event_connection_success_fn: Callable[[LifecycleConnectSuccessData], None] = None
on_lifecycle_event_connection_failure_fn: Callable[[LifecycleConnectFailureData], None] = None
on_lifecycle_event_disconnection_fn: Callable[[LifecycleDisconnectData], None] = None
enable_metrics: bool = True


def _check_callback(callback):
Expand Down Expand Up @@ -1392,6 +1407,7 @@ def __init__(self, client_options: ClientOptions):
self._on_lifecycle_connection_failure_cb = _check_callback(
client_options.on_lifecycle_event_connection_failure_fn)
self._on_lifecycle_disconnection_cb = _check_callback(client_options.on_lifecycle_event_disconnection_fn)
self._enable_metrics = client_options.enable_metrics

def _ws_handshake_transform(self, http_request_binding, http_headers_binding, native_userdata):
if self._ws_handshake_transform_cb is None:
Expand Down Expand Up @@ -1704,7 +1720,8 @@ def __init__(
ping_timeout_ms: int,
keep_alive_secs: int,
ack_timeout_secs: int,
clean_session: int):
clean_session: int,
enable_metrics: bool):
self.host_name = host_name
self.port = port
self.client_id = "" if client_id is None else client_id
Expand All @@ -1715,6 +1732,7 @@ def __init__(
self.keep_alive_secs: int = 1200 if keep_alive_secs is None else keep_alive_secs
self.ack_timeout_secs: int = 0 if ack_timeout_secs is None else ack_timeout_secs
self.clean_session: bool = True if clean_session is None else clean_session
self.enable_metrics: bool = True if enable_metrics is None else enable_metrics


class Client(NativeResource):
Expand All @@ -1728,7 +1746,6 @@ class Client(NativeResource):
"""

def __init__(self, client_options: ClientOptions):

super().__init__()

core = _ClientCore(client_options)
Expand All @@ -1746,6 +1763,12 @@ def __init__(self, client_options: ClientOptions):
if not socket_options:
socket_options = SocketOptions()

# Handle metrics configuration
if client_options.enable_metrics:
self._metrics = SdkMetrics()
else:
self._metrics = None

if not connect_options.will:
is_will_none = True
will = PublishPacket()
Expand Down Expand Up @@ -1797,6 +1820,8 @@ def __init__(self, client_options: ClientOptions):
client_options.ack_timeout_sec,
client_options.topic_aliasing_options,
websocket_is_none,
client_options.enable_metrics,
self._metrics.library_name if self._metrics else None,
core)

# Store the options for adapter
Expand All @@ -1811,7 +1836,8 @@ def __init__(self, client_options: ClientOptions):
keep_alive_secs=connect_options.keep_alive_interval_sec,
ack_timeout_secs=client_options.ack_timeout_sec,
clean_session=(
client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS if client_options.session_behavior else True))
client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS if client_options.session_behavior else True),
enable_metrics=client_options.enable_metrics)

def start(self):
"""Notifies the MQTT5 client that you want it maintain connectivity to the configured endpoint.
Expand Down Expand Up @@ -2043,5 +2069,6 @@ def new_connection(self, on_connection_interrupted=None, on_connection_resumed=N
use_websockets=False,
websocket_proxy_options=None,
websocket_handshake_transform=None,
proxy_options=None
proxy_options=None,
enable_metrics=self.adapter_options.enable_metrics
)
19 changes: 18 additions & 1 deletion source/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -859,10 +859,13 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
/* Callbacks */
PyObject *is_websocket_none_py;
PyObject *client_core_py;
/* Metrics */
PyObject *is_metrics_enabled_py; /* optional enable metrics */
struct aws_byte_cursor metrics_library_name; /* optional IoT SDK metrics username */

if (!PyArg_ParseTuple(
args,
"Os#IOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOOOOOOOOOOOOO",
"Os#IOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOOOOOOOOOOOOOz#O",
/* O */ &self_py,
/* s */ &host_name.ptr,
/* # */ &host_name.len,
Expand Down Expand Up @@ -917,6 +920,12 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
/* O */ &topic_aliasing_options_py,

/* O */ &is_websocket_none_py,

/* Metrics */
/* O */ &is_metrics_enabled_py,
/* z */ &metrics_library_name.ptr,
/* # */ &metrics_library_name.len,

/* O */ &client_core_py)) {
return NULL;
}
Expand Down Expand Up @@ -1279,6 +1288,14 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
connect_options.will = &will;
}

/* METRICS */
struct aws_mqtt_iot_metrics metrics_tmp;
AWS_ZERO_STRUCT(metrics_tmp);
if (PyObject_IsTrue(is_metrics_enabled_py)) {
metrics_tmp.library_name = metrics_library_name;
client_options.metrics = &metrics_tmp;
}

/* CALLBACKS */

Py_INCREF(client_core_py);
Expand Down
46 changes: 44 additions & 2 deletions source/mqtt_client_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,39 @@ static void s_on_connect(
PyGILState_Release(state);
}

/* If unsuccessful, false is returned and a Python error has been set */
bool s_set_metrics(struct aws_mqtt_client_connection *connection, PyObject *metrics) {
assert(metrics && (metrics != Py_None));

if (connection == NULL) {
return false;
}

bool success = false;

PyObject *library_name_py = PyObject_GetAttrString(metrics, "library_name");
struct aws_byte_cursor library_name = aws_byte_cursor_from_pyunicode(library_name_py);
if (!library_name.ptr) {
PyErr_SetString(PyExc_TypeError, "metrics.library_name must be str type");
goto done;
}

struct aws_mqtt_iot_metrics metrics_struct = {
.library_name = library_name,
};

if (aws_mqtt_client_connection_set_metrics(connection, &metrics_struct)) {
PyErr_SetAwsLastError();
goto done;
}

success = true;

done:
Py_DECREF(library_name_py);
return success;
}

/* If unsuccessful, false is returned and a Python error has been set */
bool s_set_will(struct aws_mqtt_client_connection *connection, PyObject *will) {
assert(will && (will != Py_None));
Expand Down Expand Up @@ -668,9 +701,10 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args)
PyObject *is_clean_session;
PyObject *on_connect;
PyObject *proxy_options_py;
PyObject *metrics_py;
if (!PyArg_ParseTuple(
args,
"Os#s#IOOKKHIIOz#z#OOO",
"Os#s#IOOKKHIIOz#z#OOOO",
&impl_capsule,
&client_id,
&client_id_len,
Expand All @@ -691,7 +725,8 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args)
&password_len,
&is_clean_session,
&on_connect,
&proxy_options_py)) {
&proxy_options_py,
&metrics_py)) {
return NULL;
}

Expand Down Expand Up @@ -773,6 +808,13 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args)
}
}

/* Set metrics if provided */
if (metrics_py != Py_None) {
if (!s_set_metrics(py_connection->native, metrics_py)) {
AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "MQTT connection failed to set AWS IoT metrics.");
}
}

if (on_connect != Py_None) {
Py_INCREF(on_connect);
py_connection->on_connect = on_connect;
Expand Down
48 changes: 46 additions & 2 deletions test/test_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,8 @@ def _test_mqtt311_direct_connect_basic_auth(self):
host_name=input_host_name,
port=input_port,
username=input_username,
password=input_password)
password=input_password,
enable_metrics=False)
connection.connect().result(TIMEOUT)
connection.disconnect().result(TIMEOUT)

Expand Down Expand Up @@ -760,7 +761,8 @@ def sign_function(transform_args, **kwargs):
username=input_username,
password=input_password,
use_websockets=True,
websocket_handshake_transform=sign_function)
websocket_handshake_transform=sign_function,
enable_metrics=False)
connection.connect().result(TIMEOUT)
connection.disconnect().result(TIMEOUT)

Expand Down Expand Up @@ -831,6 +833,48 @@ def sign_function(transform_args, **kwargs):
def test_mqtt311_websocket_connect_http_proxy_tls(self):
test_retry_wrapper(self._test_mqtt311_websocket_connect_http_proxy_tls)

# ==============================================================
# METRICS TEST CASES
# ==============================================================

def _test_mqtt311_direct_connect_basic_auth_metrics_enabled(self):
"""Test that connection fails with basic auth when metrics are enabled.

When metrics are enabled, the SDK appends metrics information to the username field,
which corrupts the basic authentication and causes the connection to fail.
"""
input_host_name = _get_env_variable("AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_HOST")
input_port = int(_get_env_variable("AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_PORT"))
input_username = _get_env_variable("AWS_TEST_MQTT311_BASIC_AUTH_USERNAME")
input_password = _get_env_variable("AWS_TEST_MQTT311_BASIC_AUTH_PASSWORD")

elg = EventLoopGroup()
resolver = DefaultHostResolver(elg)
bootstrap = ClientBootstrap(elg, resolver)
client = Client(bootstrap, None)

# Create connection with enable_metrics=True explicitly
# This should fail because metrics appends to username, corrupting basic auth
connection = Connection(
client=client,
client_id=create_client_id(),
host_name=input_host_name,
port=input_port,
username=input_username,
password=input_password,
enable_metrics=True)

# Connection should fail because metrics corrupts the username for basic auth
exception_occurred = False
try:
connection.connect().result(TIMEOUT)
except Exception:
exception_occurred = True
self.assertTrue(exception_occurred, "Connection should fail when metrics are enabled with basic auth!")

def test_mqtt311_direct_connect_basic_auth_metrics_enabled(self):
test_retry_wrapper(self._test_mqtt311_direct_connect_basic_auth_metrics_enabled)


if __name__ == 'main':
unittest.main()
Loading