From 0fa4c0274dca0bb911d5e1b66c34cfec4b656217 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 12 Dec 2025 12:07:46 +0300 Subject: [PATCH 1/7] Add FastStream example with Redis integration --- Justfile | 6 ++++++ examples/faststream_app.py | 42 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 examples/faststream_app.py diff --git a/Justfile b/Justfile index af54993..7025ef9 100644 --- a/Justfile +++ b/Justfile @@ -22,3 +22,9 @@ publish: uv version $GITHUB_REF_NAME uv build uv publish --token $PYPI_TOKEN + +run-faststream-example *args: + #!/bin/bash + trap 'echo; docker rm microbootstrap-redis' EXIT + docker run --name microbootstrap-redis -p 6379:6379 -d redis + uv run examples/faststream_app.py diff --git a/examples/faststream_app.py b/examples/faststream_app.py new file mode 100644 index 0000000..a256c06 --- /dev/null +++ b/examples/faststream_app.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from faststream.asgi import AsgiFastStream +from faststream.redis import RedisBroker + +from microbootstrap.bootstrappers.faststream import FastStreamBootstrapper +from microbootstrap.config.faststream import FastStreamConfig +from microbootstrap.granian_server import create_granian_server +from microbootstrap.settings import FastStreamSettings + + +class Settings(FastStreamSettings): ... + + +settings = Settings() + + +def create_app() -> AsgiFastStream: + broker = RedisBroker() + + @broker.subscriber("first") + @broker.publisher("second") + def _(message: str) -> str: + print(message) # noqa: T201 + return "Hi from first handler!" + + @broker.subscriber("second") + def _(message: str) -> None: + print(message) # noqa: T201 + + app = FastStreamBootstrapper(settings).configure_application(FastStreamConfig(broker=broker)).bootstrap() + + @app.after_startup + async def send_first_message() -> None: + await broker.connect() + await broker.publish("Hi from startup!", "first") + + return app + + +if __name__ == "__main__": + create_granian_server("examples.faststream_app:create_app", settings, factory=True).serve() From 34fb90228d3ec87fd98ccd9189bc12098249fee8 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 12 Dec 2025 12:36:46 +0300 Subject: [PATCH 2/7] Add force flag to docker cleanup and configure structlog for FastStream broker --- Justfile | 2 +- examples/faststream_app.py | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Justfile b/Justfile index 7025ef9..cb004d4 100644 --- a/Justfile +++ b/Justfile @@ -25,6 +25,6 @@ publish: run-faststream-example *args: #!/bin/bash - trap 'echo; docker rm microbootstrap-redis' EXIT + trap 'echo; docker rm -f microbootstrap-redis' EXIT docker run --name microbootstrap-redis -p 6379:6379 -d redis uv run examples/faststream_app.py diff --git a/examples/faststream_app.py b/examples/faststream_app.py index a256c06..b9801f9 100644 --- a/examples/faststream_app.py +++ b/examples/faststream_app.py @@ -1,5 +1,7 @@ from __future__ import annotations +import structlog +from faststream._internal.logger.logger_proxy import RealLoggerObject from faststream.asgi import AsgiFastStream from faststream.redis import RedisBroker @@ -28,7 +30,12 @@ def _(message: str) -> str: def _(message: str) -> None: print(message) # noqa: T201 - app = FastStreamBootstrapper(settings).configure_application(FastStreamConfig(broker=broker)).bootstrap() + app = ( + FastStreamBootstrapper(settings) + .configure_application(FastStreamConfig(broker=broker, logger=structlog.get_logger(__name__))) + .bootstrap() + ) + broker.config.broker_config.logger.logger = RealLoggerObject(structlog.get_logger(__name__)) @app.after_startup async def send_first_message() -> None: From 740d1fdfc59f5422531f7c1a4f87b9e9b5d69a4a Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 12 Dec 2025 12:41:07 +0300 Subject: [PATCH 3/7] Add TestApp context manager and debug output to foreign logs test --- tests/instruments/test_logging.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/instruments/test_logging.py b/tests/instruments/test_logging.py index 0e988d9..8495eef 100644 --- a/tests/instruments/test_logging.py +++ b/tests/instruments/test_logging.py @@ -7,6 +7,7 @@ import litestar import pytest from fastapi.testclient import TestClient as FastAPITestClient +from faststream import TestApp from faststream.redis import RedisBroker, TestRedisBroker from litestar.testing import TestClient as LitestarTestClient from opentelemetry import trace @@ -240,15 +241,15 @@ async def test_faststream(self, capsys: pytest.CaptureFixture[str]) -> None: async def greet() -> None: logger.info("said hi") - ( + app = ( FastStreamBootstrapper(FastStreamSettings(service_debug=False, logging_buffer_capacity=0)) .configure_application(FastStreamConfig(broker=broker)) .bootstrap() ) - async with TestRedisBroker(broker): + async with TestRedisBroker(broker), TestApp(app): await broker.publish(message="hello", channel="greetings") - stdout = capsys.readouterr().out + raise Exception(stdout) assert '{"event":"said hi","level":"info","logger":"root"' in stdout assert stdout.count("said hi") == 1 From 3c242841fb276e1c495a1656f2ec6a9ac1a42e1b Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 12 Dec 2025 12:52:54 +0300 Subject: [PATCH 4/7] Refactor logging configuration for FastStream application and brokers --- examples/faststream_app.py | 4 ++-- microbootstrap/bootstrappers/faststream.py | 12 +++++++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/examples/faststream_app.py b/examples/faststream_app.py index b9801f9..d05d0fb 100644 --- a/examples/faststream_app.py +++ b/examples/faststream_app.py @@ -32,10 +32,10 @@ def _(message: str) -> None: app = ( FastStreamBootstrapper(settings) - .configure_application(FastStreamConfig(broker=broker, logger=structlog.get_logger(__name__))) + .configure_application(FastStreamConfig(broker=broker)) .bootstrap() ) - broker.config.broker_config.logger.logger = RealLoggerObject(structlog.get_logger(__name__)) + # broker.config.broker_config.logger.logger = RealLoggerObject(structlog.get_logger(__name__)) @app.after_startup async def send_first_message() -> None: diff --git a/microbootstrap/bootstrappers/faststream.py b/microbootstrap/bootstrappers/faststream.py index a4fa7a1..9e72305 100644 --- a/microbootstrap/bootstrappers/faststream.py +++ b/microbootstrap/bootstrappers/faststream.py @@ -5,6 +5,7 @@ import prometheus_client import structlog import typing_extensions +from faststream._internal.logger.logger_proxy import RealLoggerObject from faststream.asgi import AsgiFastStream, AsgiResponse from faststream.asgi import get as handle_get from faststream.specification import AsyncAPI @@ -71,10 +72,19 @@ def get_config_type(cls) -> type[FastStreamOpentelemetryConfig]: return FastStreamOpentelemetryConfig +faststream_app_logger: typing.Final = structlog.get_logger("microbootstrap.faststream.app") +faststream_broker_logger: typing.Final = structlog.get_logger("microbootstrap.faststream.broker") + + @FastStreamBootstrapper.use_instrument() class FastStreamLoggingInstrument(LoggingInstrument): def bootstrap_before(self) -> dict[str, typing.Any]: - return {"logger": structlog.get_logger("microbootstrap-faststream")} + return {"logger": faststream_app_logger} + + def bootstrap_after(self, application: AsgiFastStream) -> AsgiFastStream: # type: ignore[override] + for one_broker in application.brokers: + one_broker.config.broker_config.logger.logger = RealLoggerObject(faststream_broker_logger) + return application @FastStreamBootstrapper.use_instrument() From 0bf2ed453e9ff63ad51edf80d3ae9213d57ed20c Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 12 Dec 2025 12:53:25 +0300 Subject: [PATCH 5/7] Remove unused imports and simplify app creation in faststream example --- examples/faststream_app.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/examples/faststream_app.py b/examples/faststream_app.py index d05d0fb..2508823 100644 --- a/examples/faststream_app.py +++ b/examples/faststream_app.py @@ -1,8 +1,6 @@ from __future__ import annotations +from typing import TYPE_CHECKING -import structlog -from faststream._internal.logger.logger_proxy import RealLoggerObject -from faststream.asgi import AsgiFastStream from faststream.redis import RedisBroker from microbootstrap.bootstrappers.faststream import FastStreamBootstrapper @@ -11,6 +9,10 @@ from microbootstrap.settings import FastStreamSettings +if TYPE_CHECKING: + from faststream.asgi import AsgiFastStream + + class Settings(FastStreamSettings): ... @@ -30,12 +32,7 @@ def _(message: str) -> str: def _(message: str) -> None: print(message) # noqa: T201 - app = ( - FastStreamBootstrapper(settings) - .configure_application(FastStreamConfig(broker=broker)) - .bootstrap() - ) - # broker.config.broker_config.logger.logger = RealLoggerObject(structlog.get_logger(__name__)) + app = FastStreamBootstrapper(settings).configure_application(FastStreamConfig(broker=broker)).bootstrap() @app.after_startup async def send_first_message() -> None: From 9831ce083dec3a430f93b729b516167197f3989a Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 12 Dec 2025 12:57:33 +0300 Subject: [PATCH 6/7] Remove unused TestApp import and simplify test setup --- tests/instruments/test_logging.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/instruments/test_logging.py b/tests/instruments/test_logging.py index 8495eef..0e988d9 100644 --- a/tests/instruments/test_logging.py +++ b/tests/instruments/test_logging.py @@ -7,7 +7,6 @@ import litestar import pytest from fastapi.testclient import TestClient as FastAPITestClient -from faststream import TestApp from faststream.redis import RedisBroker, TestRedisBroker from litestar.testing import TestClient as LitestarTestClient from opentelemetry import trace @@ -241,15 +240,15 @@ async def test_faststream(self, capsys: pytest.CaptureFixture[str]) -> None: async def greet() -> None: logger.info("said hi") - app = ( + ( FastStreamBootstrapper(FastStreamSettings(service_debug=False, logging_buffer_capacity=0)) .configure_application(FastStreamConfig(broker=broker)) .bootstrap() ) - async with TestRedisBroker(broker), TestApp(app): + async with TestRedisBroker(broker): await broker.publish(message="hello", channel="greetings") + stdout = capsys.readouterr().out - raise Exception(stdout) assert '{"event":"said hi","level":"info","logger":"root"' in stdout assert stdout.count("said hi") == 1 From c8870639f0585a41bb5ba3976e6d0de8edd42675 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Fri, 12 Dec 2025 12:59:11 +0300 Subject: [PATCH 7/7] Add typing imports and finalize application variable --- examples/faststream_app.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/examples/faststream_app.py b/examples/faststream_app.py index 2508823..0fe5717 100644 --- a/examples/faststream_app.py +++ b/examples/faststream_app.py @@ -1,4 +1,5 @@ from __future__ import annotations +import typing from typing import TYPE_CHECKING from faststream.redis import RedisBroker @@ -16,7 +17,7 @@ class Settings(FastStreamSettings): ... -settings = Settings() +settings: typing.Final = Settings() def create_app() -> AsgiFastStream: @@ -32,14 +33,16 @@ def _(message: str) -> str: def _(message: str) -> None: print(message) # noqa: T201 - app = FastStreamBootstrapper(settings).configure_application(FastStreamConfig(broker=broker)).bootstrap() + application: typing.Final = ( + FastStreamBootstrapper(settings).configure_application(FastStreamConfig(broker=broker)).bootstrap() + ) - @app.after_startup + @application.after_startup async def send_first_message() -> None: await broker.connect() await broker.publish("Hi from startup!", "first") - return app + return application if __name__ == "__main__":