From 22a3c4d7aee1a44c27daab24f242772fe120b395 Mon Sep 17 00:00:00 2001 From: degerahmet Date: Tue, 18 Mar 2025 10:01:56 -0700 Subject: [PATCH 1/2] ##556 Add value_serializer parameter to table and collection definitions --- faust/app/base.py | 4 +++- faust/tables/base.py | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/faust/app/base.py b/faust/app/base.py index 9bb15a18d..04a591c99 100644 --- a/faust/app/base.py +++ b/faust/app/base.py @@ -477,7 +477,7 @@ def __init__( # for the web server. self._monitor = monitor - # Any additional asyncio.Task's specified using @app.task decorator. + # Any additional asyncio.Tasks specified using @app.task decorator. self._app_tasks = [] # Called as soon as the a worker is fully operational. @@ -1138,6 +1138,7 @@ def Table( window: Optional[WindowT] = None, partitions: Optional[int] = None, help: Optional[str] = None, + value_serializer: str = None, # Add this line **kwargs: Any, ) -> TableT: """Define new table. @@ -1169,6 +1170,7 @@ def Table( beacon=self.tables.beacon, partitions=partitions, help=help, + value_serializer=value_serializer, # Add this line **kwargs, ), ) diff --git a/faust/tables/base.py b/faust/tables/base.py index ca26c0f11..daf5f2fa2 100644 --- a/faust/tables/base.py +++ b/faust/tables/base.py @@ -124,6 +124,7 @@ def __init__( on_window_close: Optional[WindowCloseCallback] = None, is_global: bool = False, synchronize_all_active_partitions: bool = False, + value_serializer: Optional[CodecArg] = None, # Add this line **kwargs: Any, ) -> None: Service.__init__(self, loop=app.loop, **kwargs) @@ -157,7 +158,7 @@ def __init__( # Possible values json and raw # Fallback to json self.key_serializer = self._serializer_from_type(self.key_type) - self.value_serializer = self._serializer_from_type(self.value_type) + self.value_serializer = value_serializer # Add this line # Table key expiration self._partition_timestamp_keys = defaultdict(set) @@ -199,7 +200,7 @@ def _new_store_by_url(self, url: Union[str, URL]) -> StoreT: table_name=self.name, key_type=self.key_type, key_serializer=self.key_serializer, - value_serializer=self.value_serializer, + value_serializer=self.value_serializer, # Add this line value_type=self.value_type, loop=self.loop, options=self.options, From ce6fad489315f2dd37293c81dead537aee6b85e2 Mon Sep 17 00:00:00 2001 From: degerahmet Date: Wed, 19 Mar 2025 23:04:55 -0700 Subject: [PATCH 2/2] Refactor value_serializer parameter handling in App and Collection classes --- faust/app/base.py | 4 ++-- faust/tables/base.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/faust/app/base.py b/faust/app/base.py index 04a591c99..881ca52b5 100644 --- a/faust/app/base.py +++ b/faust/app/base.py @@ -1138,7 +1138,7 @@ def Table( window: Optional[WindowT] = None, partitions: Optional[int] = None, help: Optional[str] = None, - value_serializer: str = None, # Add this line + value_serializer: str = None, **kwargs: Any, ) -> TableT: """Define new table. @@ -1170,7 +1170,7 @@ def Table( beacon=self.tables.beacon, partitions=partitions, help=help, - value_serializer=value_serializer, # Add this line + value_serializer=value_serializer, **kwargs, ), ) diff --git a/faust/tables/base.py b/faust/tables/base.py index daf5f2fa2..d482b78bf 100644 --- a/faust/tables/base.py +++ b/faust/tables/base.py @@ -124,7 +124,7 @@ def __init__( on_window_close: Optional[WindowCloseCallback] = None, is_global: bool = False, synchronize_all_active_partitions: bool = False, - value_serializer: Optional[CodecArg] = None, # Add this line + value_serializer: Optional[CodecArg] = None, **kwargs: Any, ) -> None: Service.__init__(self, loop=app.loop, **kwargs) @@ -158,7 +158,7 @@ def __init__( # Possible values json and raw # Fallback to json self.key_serializer = self._serializer_from_type(self.key_type) - self.value_serializer = value_serializer # Add this line + self.value_serializer = value_serializer # Table key expiration self._partition_timestamp_keys = defaultdict(set) @@ -200,7 +200,7 @@ def _new_store_by_url(self, url: Union[str, URL]) -> StoreT: table_name=self.name, key_type=self.key_type, key_serializer=self.key_serializer, - value_serializer=self.value_serializer, # Add this line + value_serializer=self.value_serializer, value_type=self.value_type, loop=self.loop, options=self.options,