66from taskiq .cli .utils import import_object
77
88
9- def startup_event_generator (app : FastAPI ) -> Callable [[TaskiqState ], Awaitable [None ]]:
9+ def startup_event_generator (
10+ broker : AsyncBroker ,
11+ app_path : str ,
12+ ) -> Callable [[TaskiqState ], Awaitable [None ]]:
1013 """
1114 Generate shutdown event.
1215
13- This function takes FastAPI application
16+ This function takes FastAPI application path
1417 and runs startup event on broker's startup.
1518
16- :param app: fastapi application.
19+ :param broker: current broker.
20+ :param app_path: fastapi application path.
1721 :returns: startup handler.
1822 """
1923
2024 async def startup (state : TaskiqState ) -> None :
25+ if not broker .is_worker_process :
26+ return
27+ app = import_object (app_path )
28+ if not isinstance (app , FastAPI ):
29+ app = app ()
30+
31+ if not isinstance (app , FastAPI ):
32+ raise ValueError (f"'{ app_path } ' is not a FastAPI application." )
33+
2134 state .fastapi_app = app
2235 app .router .routes = []
2336 await app .router .startup ()
37+ populate_dependency_context (broker , app )
2438
2539 return startup
2640
2741
28- def shutdown_event_generator (app : FastAPI ) -> Callable [[TaskiqState ], Awaitable [None ]]:
42+ def shutdown_event_generator (
43+ broker : AsyncBroker ,
44+ ) -> Callable [[TaskiqState ], Awaitable [None ]]:
2945 """
3046 Generate shutdown event.
3147
3248 This function takes FastAPI application
3349 and runs shutdown event on broker's shutdown.
3450
35- :param app : current application .
36- :return: startup event handler.
51+ :param broker : current broker .
52+ :return: shutdown event handler.
3753 """
3854
39- async def startup (_ : TaskiqState ) -> None :
40- await app .router .shutdown ()
55+ async def shutdown (state : TaskiqState ) -> None :
56+ if not broker .is_worker_process :
57+ return
58+ await state .fastapi_app .router .shutdown ()
4159
42- return startup
60+ return shutdown
4361
4462
4563def init (broker : AsyncBroker , app_path : str ) -> None :
@@ -49,35 +67,21 @@ def init(broker: AsyncBroker, app_path: str) -> None:
4967 This is the main function to integrate FastAPI
5068 with taskiq.
5169
52- This function imports fastapi application by
53- python's path string and adds startup events
54- for broker .
70+ It creates startup events for broker. So
71+ in worker processes all fastapi
72+ startup events will run .
5573
5674 :param broker: current broker to use.
5775 :param app_path: path to fastapi application.
58- :raises ValueError: if fastapi cannot be resolved.
5976 """
60- if not broker .is_worker_process :
61- return
62-
63- app = import_object (app_path )
64-
65- if not isinstance (app , FastAPI ):
66- app = app ()
67-
68- if not isinstance (app , FastAPI ):
69- raise ValueError (f"'{ app_path } ' is not a FastAPI application." )
70-
71- populate_dependency_context (broker , app )
72-
7377 broker .add_event_handler (
7478 TaskiqEvents .WORKER_STARTUP ,
75- startup_event_generator (app ),
79+ startup_event_generator (broker , app_path ),
7680 )
7781
7882 broker .add_event_handler (
7983 TaskiqEvents .WORKER_SHUTDOWN ,
80- shutdown_event_generator (app ),
84+ shutdown_event_generator (broker ),
8185 )
8286
8387
0 commit comments