diff --git a/Justfile b/Justfile index af54993..cb004d4 100644 --- a/Justfile +++ b/Justfile @@ -22,3 +22,9 @@ publish: uv version $GITHUB_REF_NAME uv build uv publish --token $PYPI_TOKEN + +run-faststream-example *args: + #!/bin/bash + trap 'echo; docker rm -f microbootstrap-redis' EXIT + docker run --name microbootstrap-redis -p 6379:6379 -d redis + uv run examples/faststream_app.py diff --git a/examples/faststream_app.py b/examples/faststream_app.py new file mode 100644 index 0000000..0fe5717 --- /dev/null +++ b/examples/faststream_app.py @@ -0,0 +1,49 @@ +from __future__ import annotations +import typing +from typing import TYPE_CHECKING + +from faststream.redis import RedisBroker + +from microbootstrap.bootstrappers.faststream import FastStreamBootstrapper +from microbootstrap.config.faststream import FastStreamConfig +from microbootstrap.granian_server import create_granian_server +from microbootstrap.settings import FastStreamSettings + + +if TYPE_CHECKING: + from faststream.asgi import AsgiFastStream + + +class Settings(FastStreamSettings): ... + + +settings: typing.Final = Settings() + + +def create_app() -> AsgiFastStream: + broker = RedisBroker() + + @broker.subscriber("first") + @broker.publisher("second") + def _(message: str) -> str: + print(message) # noqa: T201 + return "Hi from first handler!" + + @broker.subscriber("second") + def _(message: str) -> None: + print(message) # noqa: T201 + + application: typing.Final = ( + FastStreamBootstrapper(settings).configure_application(FastStreamConfig(broker=broker)).bootstrap() + ) + + @application.after_startup + async def send_first_message() -> None: + await broker.connect() + await broker.publish("Hi from startup!", "first") + + return application + + +if __name__ == "__main__": + create_granian_server("examples.faststream_app:create_app", settings, factory=True).serve() diff --git a/microbootstrap/bootstrappers/faststream.py b/microbootstrap/bootstrappers/faststream.py index a4fa7a1..9e72305 100644 --- a/microbootstrap/bootstrappers/faststream.py +++ b/microbootstrap/bootstrappers/faststream.py @@ -5,6 +5,7 @@ import prometheus_client import structlog import typing_extensions +from faststream._internal.logger.logger_proxy import RealLoggerObject from faststream.asgi import AsgiFastStream, AsgiResponse from faststream.asgi import get as handle_get from faststream.specification import AsyncAPI @@ -71,10 +72,19 @@ def get_config_type(cls) -> type[FastStreamOpentelemetryConfig]: return FastStreamOpentelemetryConfig +faststream_app_logger: typing.Final = structlog.get_logger("microbootstrap.faststream.app") +faststream_broker_logger: typing.Final = structlog.get_logger("microbootstrap.faststream.broker") + + @FastStreamBootstrapper.use_instrument() class FastStreamLoggingInstrument(LoggingInstrument): def bootstrap_before(self) -> dict[str, typing.Any]: - return {"logger": structlog.get_logger("microbootstrap-faststream")} + return {"logger": faststream_app_logger} + + def bootstrap_after(self, application: AsgiFastStream) -> AsgiFastStream: # type: ignore[override] + for one_broker in application.brokers: + one_broker.config.broker_config.logger.logger = RealLoggerObject(faststream_broker_logger) + return application @FastStreamBootstrapper.use_instrument()