diff --git a/python/hsfs/client/online_store_rest_client.py b/python/hsfs/client/online_store_rest_client.py index b733269a1a..4a256a8e04 100644 --- a/python/hsfs/client/online_store_rest_client.py +++ b/python/hsfs/client/online_store_rest_client.py @@ -49,21 +49,24 @@ def init_or_reset_online_store_rest_client( transport=transport, optional_config=optional_config ) else: - _logger.warning( - "Online Store Rest Client is already initialised. To reset connection or/and override configuration, " - + "use reset_online_store_rest_client flag.", - stacklevel=2, - ) + if _logger.isEnabledFor(logging.WARNING): + _logger.warning( + "Online Store Rest Client is already initialised. To reset connection or/and override configuration, " + + "use reset_online_store_rest_client flag.", + stacklevel=2, + ) def get_instance() -> OnlineStoreRestClientSingleton: global _online_store_rest_client if _online_store_rest_client is None: - _logger.warning( - "Online Store Rest Client is not initialised. Initialising with default configuration." - ) + if _logger.isEnabledFor(logging.WARNING): + _logger.warning( + "Online Store Rest Client is not initialised. Initialising with default configuration." + ) _online_store_rest_client = OnlineStoreRestClientSingleton() - _logger.debug("Accessing global Online Store Rest Client instance.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Accessing global Online Store Rest Client instance.") return _online_store_rest_client @@ -91,11 +94,13 @@ def __init__( ] = None, optional_config: Optional[Dict[str, Any]] = None, ): - _logger.debug( - f"Initialising Online Store Rest Client {'with optional configuration' if optional_config else ''}." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Initialising Online Store Rest Client {'with optional configuration' if optional_config else ''}." + ) if optional_config: - _logger.debug(f"Optional Config: {optional_config!r}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Optional Config: {optional_config!r}") self._check_hopsworks_connection() self.variable_api = variable_api.VariableApi() self._auth: client.auth.OnlineStoreKeyAuth @@ -116,14 +121,17 @@ def reset_client( ] = None, optional_config: Optional[Dict[str, Any]] = None, ): - _logger.debug( - f"Resetting Online Store Rest Client {'with optional configuration' if optional_config else ''}." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Resetting Online Store Rest Client {'with optional configuration' if optional_config else ''}." + ) if optional_config: - _logger.debug(f"Optional Config: {optional_config}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Optional Config: {optional_config}") self._check_hopsworks_connection() if hasattr(self, "_session") and self._session: - _logger.debug("Closing existing session.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Closing existing session.") self._session.close() delattr(self, "_session") self._setup_rest_client( @@ -140,26 +148,31 @@ def _setup_rest_client( optional_config: Optional[Dict[str, Any]] = None, use_current_config: bool = True, ): - _logger.debug("Setting up Online Store Rest Client.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Setting up Online Store Rest Client.") if optional_config and not isinstance(optional_config, dict): raise ValueError( "optional_config must be a dictionary. See documentation for allowed keys and values." ) - _logger.debug("Optional Config: %s", optional_config) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Optional Config: %s", optional_config) if not use_current_config: - _logger.debug( - "Retrieving default configuration for Online Store REST Client." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Retrieving default configuration for Online Store REST Client." + ) self._current_config = self._get_default_client_config() if optional_config: - _logger.debug( - "Updating default configuration with provided optional configuration." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Updating default configuration with provided optional configuration." + ) self._current_config.update(optional_config) self._set_auth(optional_config) if not hasattr(self, "_session") or not self._session: - _logger.debug("Initialising new requests session.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Initialising new requests session.") self._session = requests.Session() else: raise ValueError( @@ -167,19 +180,22 @@ def _setup_rest_client( + "to True to reset the online_store_client_connection" ) if transport is not None: - _logger.debug("Setting custom transport adapter.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Setting custom transport adapter.") self._session.mount("https://", transport) self._session.mount("http://", transport) if not self._current_config[self.VERIFY_CERTS]: - _logger.warning( - "Disabling SSL certificate verification. This is not recommended for production environments." - ) + if _logger.isEnabledFor(logging.WARNING): + _logger.warning( + "Disabling SSL certificate verification. This is not recommended for production environments." + ) self._session.verify = False else: - _logger.debug( - f"Setting SSL certificate verification using CA Certs path: {self._current_config[self.CA_CERTS]}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Setting SSL certificate verification using CA Certs path: {self._current_config[self.CA_CERTS]}" + ) self._session.verify = self._current_config[self.CA_CERTS] # Set base_url @@ -202,15 +218,19 @@ def _setup_rest_client( ), "Online Store REST Client Configuration failed to initialise." def _get_default_client_config(self) -> Dict[str, Any]: - _logger.debug("Retrieving default configuration for Online Store REST Client.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Retrieving default configuration for Online Store REST Client." + ) default_config = self._get_default_static_parameters_config() default_config.update(self._get_default_dynamic_parameters_config()) return default_config def _get_default_static_parameters_config(self) -> Dict[str, Any]: - _logger.debug( - "Retrieving default static configuration for Online Store REST Client." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Retrieving default static configuration for Online Store REST Client." + ) return { self.TIMEOUT: self._DEFAULT_ONLINE_STORE_REST_CLIENT_TIMEOUT_SECOND, self.VERIFY_CERTS: self._DEFAULT_ONLINE_STORE_REST_CLIENT_VERIFY_CERTS, @@ -222,14 +242,18 @@ def _get_default_static_parameters_config(self) -> Dict[str, Any]: def _get_default_dynamic_parameters_config( self, ) -> Dict[str, Any]: - _logger.debug( - "Retrieving default dynamic configuration for Online Store REST Client." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Retrieving default dynamic configuration for Online Store REST Client." + ) url = furl(self._get_rondb_rest_server_endpoint()) - _logger.debug(f"Default RonDB Rest Server host and port: {url.host}:{url.port}") - _logger.debug( - f"Using CA Certs from Hopsworks Client: {client.get_instance()._get_ca_chain_path()}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Default RonDB Rest Server host and port: {url.host}:{url.port}" + ) + _logger.debug( + f"Using CA Certs from Hopsworks Client: {client.get_instance()._get_ca_chain_path()}" + ) return { self.HOST: url.host, self.PORT: url.port, @@ -248,31 +272,36 @@ def _get_rondb_rest_server_endpoint(self) -> str: str: RonDB Rest Server endpoint with default port. """ if client.get_instance()._is_external(): - _logger.debug( - "External Online Store REST Client : Retrieving RonDB Rest Server endpoint via loadbalancer." - ) - external_domain = self.variable_api.get_loadbalancer_external_domain() - if external_domain == "": + if _logger.isEnabledFor(logging.DEBUG): _logger.debug( "External Online Store REST Client : Loadbalancer external domain is not set. Using client host as endpoint." ) + external_domain = self.variable_api.get_loadbalancer_external_domain() + if external_domain == "": + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "External Online Store REST Client : Loadbalancer external domain is not set. Using client host as endpoint." + ) external_domain = client.get_instance().host default_url = f"https://{external_domain}:{self._DEFAULT_ONLINE_STORE_REST_CLIENT_PORT}" - _logger.debug( - f"External Online Store REST Client : Default RonDB Rest Server endpoint: {default_url}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"External Online Store REST Client : Default RonDB Rest Server endpoint: {default_url}" + ) return default_url else: - _logger.debug( - "Internal Online Store REST Client : Retrieving RonDB Rest Server endpoint via service discovery." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Internal Online Store REST Client : Retrieving RonDB Rest Server endpoint via service discovery." + ) service_discovery_domain = self.variable_api.get_service_discovery_domain() if service_discovery_domain == "": raise FeatureStoreException("Service discovery domain is not set.") default_url = f"https://rdrs.service.{service_discovery_domain}:{self._DEFAULT_ONLINE_STORE_REST_CLIENT_PORT}" - _logger.debug( - f"Internal Online Store REST Client : Default RonDB Rest Server endpoint: {default_url}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Internal Online Store REST Client : Default RonDB Rest Server endpoint: {default_url}" + ) return default_url def send_request( @@ -284,9 +313,10 @@ def send_request( ) -> requests.Response: url = self._base_url.copy() url.path.segments.extend(path_params) - _logger.debug(f"Sending {method} request to {url.url}.") - _logger.debug(f"Provided Data: {data}") - _logger.debug(f"Provided Headers: {headers}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Sending {method} request to {url.url}.") + _logger.debug(f"Provided Data: {data}") + _logger.debug(f"Provided Headers: {headers}") prepped_request = self._session.prepare_request( requests.Request( method, url=url.url, headers=headers, data=data, auth=self.auth @@ -300,13 +330,15 @@ def send_request( ) def _check_hopsworks_connection(self) -> None: - _logger.debug("Checking Hopsworks connection.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Checking Hopsworks connection.") assert ( client.get_instance() is not None and client.get_instance()._connected ), """Hopsworks Client is not connected. Please connect to Hopsworks cluster via hopsworks.login or hsfs.connection before initialising the Online Store REST Client. """ - _logger.debug("Hopsworks connection is active.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Hopsworks connection is active.") def _set_auth(self, optional_config: Optional[Dict[str, Any]] = None) -> None: """Set authentication object for the Online Store REST Client. @@ -314,28 +346,32 @@ def _set_auth(self, optional_config: Optional[Dict[str, Any]] = None) -> None: RonDB Rest Server uses Hopsworks Api Key to authenticate requests via the X-API-KEY header by default. The api key determines the permissions of the user making the request for access to a given Feature Store. """ - _logger.debug("Setting authentication for Online Store REST Client.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Setting authentication for Online Store REST Client.") if client.get_instance()._is_external(): assert hasattr( client.get_instance()._auth, "_token" ), "External client must use API Key authentication. Contact your system administrator." - _logger.debug( - "External Online Store REST Client : Setting authentication using Hopsworks Client API Key." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "External Online Store REST Client : Setting authentication using Hopsworks Client API Key." + ) self._auth = client.auth.OnlineStoreKeyAuth( client.get_instance()._auth._token ) elif isinstance(optional_config, dict) and optional_config.get( self.API_KEY, False ): - _logger.debug( - "Setting authentication using provided API Key from optional configuration." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Setting authentication using provided API Key from optional configuration." + ) self._auth = client.auth.OnlineStoreKeyAuth(optional_config[self.API_KEY]) elif hasattr(self, "_auth") and self._auth is not None: - _logger.debug( - "Authentication for Online Store REST Client is already set. Using existing authentication api key." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Authentication for Online Store REST Client is already set. Using existing authentication api key." + ) else: raise FeatureStoreException( "RonDB Rest Server uses Hopsworks Api Key to authenticate request." @@ -345,14 +381,15 @@ def _set_auth(self, optional_config: Optional[Dict[str, Any]] = None) -> None: def is_connected(self): """If Online Store Rest Client is initialised, ping RonDB Rest Server to ensure connection is active.""" if self._session is None: + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Checking Online Store REST Client is connected. Pinging RonDB Rest Server." + ) + raise FeatureStoreException("Online Store REST Client is not initialised.") + if _logger.isEnabledFor(logging.DEBUG): _logger.debug( "Checking Online Store REST Client is connected. Session is not initialised." ) - raise FeatureStoreException("Online Store REST Client is not initialised.") - - _logger.debug( - "Checking Online Store REST Client is connected. Pinging RonDB Rest Server." - ) if not self.send_request("GET", ["ping"]): warn("Ping failed, RonDB Rest Server is not reachable.", stacklevel=2) return False diff --git a/python/hsfs/core/online_store_rest_client_api.py b/python/hsfs/core/online_store_rest_client_api.py index 392ff87903..57cb32d50c 100644 --- a/python/hsfs/core/online_store_rest_client_api.py +++ b/python/hsfs/core/online_store_rest_client_api.py @@ -66,9 +66,10 @@ def get_single_raw_feature_vector(self, payload: Dict[str, Any]) -> Dict[str, An or authorization header (x-api-key) is not properly set. - 500: Internal server error. """ - _logger.debug( - f"Sending request to RonDB Rest Server with payload: {json.dumps(payload, indent=2, cls=util.NpDatetimeEncoder)}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Sending request to RonDB Rest Server with payload: {json.dumps(payload, indent=2, cls=util.NpDatetimeEncoder)}" + ) return self.handle_rdrs_feature_store_response( online_store_rest_client.get_instance().send_request( method="POST", @@ -110,9 +111,10 @@ def get_batch_raw_feature_vectors(self, payload: Dict[str, Any]) -> Dict[str, An or authorization header (x-api-key) is not properly set. - 500: Internal server error. """ - _logger.debug( - f"Sending request to RonDB Rest Server with payload: {json.dumps(payload, indent=2, cls=util.NpDatetimeEncoder)}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Sending request to RonDB Rest Server with payload: {json.dumps(payload, indent=2, cls=util.NpDatetimeEncoder)}" + ) return self.handle_rdrs_feature_store_response( online_store_rest_client.get_instance().send_request( method="POST", @@ -124,11 +126,13 @@ def get_batch_raw_feature_vectors(self, payload: Dict[str, Any]) -> Dict[str, An def ping_rondb_rest_server(self) -> int: """Ping the RonDB Rest Server to check if it is alive.""" - _logger.debug("Pinging RonDB Rest Server") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Pinging RonDB Rest Server") ping_response = online_store_rest_client.get_instance().send_request( method="GET", path_params=[self.PING_ENDPOINT] ) - _logger.debug(f"Received response from RonDB Rest Server: {ping_response}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Received response from RonDB Rest Server: {ping_response}") return ping_response def handle_rdrs_feature_store_response(self, response: Response) -> Dict[str, Any]: @@ -148,14 +152,16 @@ def handle_rdrs_feature_store_response(self, response: Response) -> Dict[str, An - 500: Internal server error. """ if response.status_code == 200: - _logger.debug( - "Received response from RonDB Rest Server with status code 200" - ) - _logger.debug(f"Response: {json.dumps(response.json(), indent=2)}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Received response from RonDB Rest Server with status code 200" + ) + _logger.debug(f"Response: {json.dumps(response.json(), indent=2)}") return response.json() else: - _logger.error( - f"Received response from RonDB Rest Server with status code {response.status_code}" - ) - _logger.error(f"Response: {response.text}") + if _logger.isEnabledFor(logging.ERROR): + _logger.error( + f"Received response from RonDB Rest Server with status code {response.status_code}" + ) + _logger.error(f"Response: {response.text}") raise exceptions.RestAPIError(response.url, response) diff --git a/python/hsfs/core/online_store_sql_engine.py b/python/hsfs/core/online_store_sql_engine.py index c2dd72c4a3..730a7162a8 100644 --- a/python/hsfs/core/online_store_sql_engine.py +++ b/python/hsfs/core/online_store_sql_engine.py @@ -47,7 +47,8 @@ def __init__( serving_keys: Optional[Set[ServingKey]] = None, connection_options: Optional[Dict[str, Any]] = None, ): - _logger.debug("Initialising Online Store Sql Client") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Initialising Online Store Sql Client") self._feature_store_id = feature_store_id self._skip_fg_ids: Set[int] = skip_fg_ids or set() self._external = external @@ -77,11 +78,13 @@ def fetch_prepared_statements( inference_helper_columns: bool, ) -> None: if isinstance(entity, feature_view.FeatureView): - _logger.debug( - f"Initialising prepared statements for feature view {entity.name} version {entity.version}." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Initialising prepared statements for feature view {entity.name} version {entity.version}." + ) for key in self.get_prepared_statement_labels(inference_helper_columns): - _logger.debug(f"Fetching prepared statement for key {key}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Fetching prepared statement for key {key}") self.prepared_statements[key] = ( self.feature_view_api.get_serving_prepared_statement( entity.name, @@ -90,15 +93,18 @@ def fetch_prepared_statements( inference_helper_columns=key.endswith("helper_column"), ) ) - _logger.debug(f"{self.prepared_statements[key]}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"{self.prepared_statements[key]}") elif isinstance(entity, training_dataset.TrainingDataset): - _logger.debug( - f"Initialising prepared statements for training dataset {entity.name} version {entity.version}." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Initialising prepared statements for training dataset {entity.name} version {entity.version}." + ) for key in self.get_prepared_statement_labels( with_inference_helper_column=False ): - _logger.debug(f"Fetching prepared statement for key {key}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Fetching prepared statement for key {key}") self.prepared_statements[key] = ( self.training_dataset_api.get_serving_prepared_statement( entity, batch=key.startswith("batch") @@ -110,9 +116,10 @@ def fetch_prepared_statements( ) if len(self.skip_fg_ids) > 0: - _logger.debug( - f"Skip feature groups {self.skip_fg_ids} when initialising prepared statements." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Skip feature groups {self.skip_fg_ids} when initialising prepared statements." + ) self.prepared_statements[key] = { ps for ps in self.prepared_statements[key] @@ -124,9 +131,10 @@ def init_prepared_statements( entity: Union[feature_view.FeatureView, training_dataset.TrainingDataset], inference_helper_columns: bool, ) -> None: - _logger.debug( - "Fetch and reset prepared statements and external as user may be re-initialising with different parameters" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Fetch and reset prepared statements and external as user may be re-initialising with different parameters" + ) self.fetch_prepared_statements(entity, inference_helper_columns) self.init_parametrize_and_serving_utils( @@ -134,7 +142,8 @@ def init_prepared_statements( ) for key in self.get_prepared_statement_labels(inference_helper_columns): - _logger.debug(f"Parametrize prepared statements for key {key}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Parametrize prepared statements for key {key}") self._parametrised_prepared_statements[key] = ( self._parametrize_prepared_statements( self.prepared_statements[key], batch=key.startswith("batch") @@ -145,10 +154,11 @@ def init_parametrize_and_serving_utils( self, prepared_statements: List[ServingPreparedStatement], ) -> None: - _logger.debug( - "Initializing parametrize and serving utils property using %s", - json.dumps(prepared_statements, default=lambda x: x.__dict__, indent=2), - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Initializing parametrize and serving utils property using %s", + json.dumps(prepared_statements, default=lambda x: x.__dict__, indent=2), + ) self.prefix_by_serving_index = { statement.prepared_statement_index: statement.prefix for statement in prepared_statements @@ -162,13 +172,14 @@ def init_parametrize_and_serving_utils( ) for statement in prepared_statements } - - _logger.debug("Build serving keys by PreparedStatementParameter.index") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Build serving keys by PreparedStatementParameter.index") for sk in self._serving_keys: self.serving_key_by_serving_index[sk.join_index] = ( self.serving_key_by_serving_index.get(sk.join_index, []) + [sk] ) - _logger.debug("Sort serving keys by PreparedStatementParameter.index") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Sort serving keys by PreparedStatementParameter.index") for join_index in self.serving_key_by_serving_index: # feature_name_order_by_psp do not include the join index when the joint feature only contains label only # But _serving_key_by_serving_index include the index when the join_index is 0 (left side) @@ -214,9 +225,10 @@ def init_async_mysql_connection(self, options=None): "Prepared statements are not initialized. " "Please call `init_prepared_statement` method first." ) - _logger.debug( - "Fetching storage connector for sql connection to Online Feature Store." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Fetching storage connector for sql connection to Online Feature Store." + ) self._online_connector = self._storage_connector_api.get_online_connector( self._feature_store_id ) @@ -224,12 +236,14 @@ def init_async_mysql_connection(self, options=None): self._hostname = util.get_host_name() if self._external else None if util.is_runtime_notebook(): - _logger.debug("Running in Jupyter notebook, applying nest_asyncio") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Running in Jupyter notebook, applying nest_asyncio") import nest_asyncio nest_asyncio.apply() else: - _logger.debug("Running in python script. Not applying nest_asyncio") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Running in python script. Not applying nest_asyncio") def get_single_feature_vector(self, entry: Dict[str, Any]) -> Dict[str, Any]: """Retrieve single vector with parallel queries using aiomysql engine.""" @@ -300,18 +314,21 @@ def _single_vector_result( ) # run all the prepared statements in parallel using aiomysql engine - _logger.debug( - f"Executing prepared statements for serving vector with entries: {bind_entries}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Executing prepared statements for serving vector with entries: {bind_entries}" + ) loop = self._get_or_create_event_loop() results_dict = loop.run_until_complete( self._execute_prep_statements(prepared_statement_execution, bind_entries) ) - _logger.debug(f"Retrieved feature vectors: {results_dict}") - _logger.debug("Constructing serving vector from results") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Retrieved feature vectors: {results_dict}") + _logger.debug("Constructing serving vector from results") for key in results_dict: for row in results_dict[key]: - _logger.debug(f"Processing row: {row} for prepared statement {key}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Processing row: {row} for prepared statement {key}") result_dict = dict(row) serving_vector.update(result_dict) @@ -323,9 +340,10 @@ def _batch_vector_results( prepared_statement_objects: Dict[int, sql.text], ): """Execute prepared statements in parallel using aiomysql engine.""" - _logger.debug( - f"Starting batch vector retrieval for {len(entries)} entries via aiomysql engine." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Starting batch vector retrieval for {len(entries)} entries via aiomysql engine." + ) # create dict object that will have of order of the vector as key and values as # vector itself to stitch them correctly if there are multiple feature groups involved. At this point we # expect that backend will return correctly ordered vectors. @@ -334,7 +352,10 @@ def _batch_vector_results( serving_keys_all_fg = [] prepared_stmts_to_execute = {} # construct the list of entry values for binding to query - _logger.debug(f"Parametrize prepared statements with entry values: {entries}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Parametrize prepared statements with entry values: {entries}" + ) for prepared_statement_index in prepared_statement_objects: # prepared_statement_index include fg with label only # But _serving_key_by_serving_index include the index when the join_index is 0 (left side) @@ -362,21 +383,26 @@ def _batch_vector_results( entries, ) ) - _logger.debug( - f"Prepared statement {prepared_statement_index} with entries: {entry_values_tuples}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Prepared statement {prepared_statement_index} with entries: {entry_values_tuples}" + ) entry_values[prepared_statement_index] = {"batch_ids": entry_values_tuples} - _logger.debug( - f"Executing prepared statements for batch vector with entries: {entry_values}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Executing prepared statements for batch vector with entries: {entry_values}" + ) # run all the prepared statements in parallel using aiomysql engine loop = self._get_or_create_event_loop() parallel_results = loop.run_until_complete( self._execute_prep_statements(prepared_stmts_to_execute, entry_values) ) - _logger.debug(f"Retrieved feature vectors: {parallel_results}, stitching them.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Retrieved feature vectors: {parallel_results}, stitching them." + ) # construct the results for prepared_statement_index in prepared_stmts_to_execute: statement_results = {} @@ -387,31 +413,37 @@ def _batch_vector_results( + sk.feature_name for sk in self.serving_key_by_serving_index[prepared_statement_index] ] - _logger.debug( - f"Use prefix from prepare statement because prefix from serving key is collision adjusted {prefix_features}." - ) - _logger.debug("iterate over results by index of the prepared statement") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Use prefix from prepare statement because prefix from serving key is collision adjusted {prefix_features}." + ) + _logger.debug("iterate over results by index of the prepared statement") for row in parallel_results[prepared_statement_index]: - _logger.debug(f"Processing row: {row}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Processing row: {row}") row_dict = dict(row) # can primary key be complex feature? No, not supported. result_dict = row_dict - _logger.debug( - f"Add result to statement results: {self._get_result_key(prefix_features, row_dict)} : {result_dict}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Add result to statement results: {self._get_result_key(prefix_features, row_dict)} : {result_dict}" + ) statement_results[self._get_result_key(prefix_features, row_dict)] = ( result_dict ) - - _logger.debug(f"Add partial results to batch results: {statement_results}") - for i, entry in enumerate(entries): + if _logger.isEnabledFor(logging.DEBUG): _logger.debug( - "Processing entry %s : %s", - entry, - statement_results.get( - self._get_result_key_serving_key(serving_keys, entry), {} - ), + f"Add partial results to batch results: {statement_results}" ) + for i, entry in enumerate(entries): + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Processing entry %s : %s", + entry, + statement_results.get( + self._get_result_key_serving_key(serving_keys, entry), {} + ), + ) batch_results[i].update( statement_results.get( self._get_result_key_serving_key(serving_keys, entry), {} @@ -421,42 +453,49 @@ def _batch_vector_results( def _get_or_create_event_loop(self): try: - _logger.debug("Acquiring or starting event loop for async engine.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Acquiring or starting event loop for async engine.") loop = asyncio.get_event_loop() asyncio.set_event_loop(loop) except RuntimeError as ex: if "There is no current event loop in thread" in str(ex): - _logger.debug( - "No existing running event loop. Creating new event loop." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "No existing running event loop. Creating new event loop." + ) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop def refresh_mysql_connection(self): - _logger.debug("Refreshing MySQL connection.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Refreshing MySQL connection.") try: - _logger.debug("Checking if the connection is still alive.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Checking if the connection is still alive.") with self._prepared_statement_engine.connect(): # This will raise an exception if the connection is closed pass except exc.OperationalError: - _logger.debug("Connection is closed, re-establishing connection.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Connection is closed, re-establishing connection.") self._set_mysql_connection() def _make_preview_statement(self, statement, n): return text(statement.text[: statement.text.find(" WHERE ")] + f" LIMIT {n}") def _set_mysql_connection(self, options=None): - _logger.debug( - "Retrieve MySQL connection details from the online storage connector." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Retrieve MySQL connection details from the online storage connector." + ) online_conn = self._storage_connector_api.get_online_connector( self._feature_store_id ) - _logger.debug( - f"Creating MySQL {'external' if self.external is True else ''}engine with options: {options}." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Creating MySQL {'external' if self.external is True else ''}engine with options: {options}." + ) self._prepared_statement_engine = util.create_mysql_engine( online_conn, self._external, options=options ) @@ -472,7 +511,8 @@ def _parametrize_query(name: str, query_online: str) -> str: # `.*?` - matches any character (except for line terminators). `*?` Quantifier — # Matches between zero and unlimited times, expanding until needed, i.e 1st occurrence of `\?` # character. - _logger.debug(f"Parametrizing name {name} in query {query_online}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Parametrizing name {name} in query {query_online}") return re.sub( r"^(.*?)\?", r"\1:" + name, @@ -483,7 +523,10 @@ def _parametrize_query(name: str, query_online: str) -> str: def _get_result_key( primary_keys: List[str], result_dict: Dict[str, str] ) -> Tuple[str]: - _logger.debug(f"Get result key {primary_keys} from result dict {result_dict}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Get result key {primary_keys} from result dict {result_dict}" + ) result_key = [] for pk in primary_keys: result_key.append(result_dict.get(pk)) @@ -493,19 +536,22 @@ def _get_result_key( def _get_result_key_serving_key( serving_keys: List["ServingKey"], result_dict: Dict[str, Dict[str, Any]] ) -> Tuple[str]: - _logger.debug( - f"Get result key serving key {serving_keys} from result dict {result_dict}" - ) - result_key = [] - for sk in serving_keys: + if _logger.isEnabledFor(logging.DEBUG): _logger.debug( - f"Get result key for serving key {sk.required_serving_key} or {sk.feature_name}" + f"Get result key serving key {serving_keys} from result dict {result_dict}" ) + result_key = [] + for sk in serving_keys: + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Get result key for serving key {sk.required_serving_key} or {sk.feature_name}" + ) result_key.append( result_dict.get(sk.required_serving_key) or result_dict.get(sk.feature_name) ) - _logger.debug(f"Result key: {result_key}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Result key: {result_key}") return tuple(result_key) @staticmethod @@ -541,14 +587,17 @@ async def _query_async_sql(self, stmt, bind_params): ) async with self._connection_pool.acquire() as conn: # Execute the prepared statement - _logger.debug( - f"Executing prepared statement: {stmt} with bind params: {bind_params}" - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Executing prepared statement: {stmt} with bind params: {bind_params}" + ) cursor = await conn.execute(stmt, bind_params) # Fetch the result - _logger.debug("Waiting for resultset.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Waiting for resultset.") resultset = await cursor.fetchall() - _logger.debug(f"Retrieved resultset: {resultset}. Closing cursor.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Retrieved resultset: {resultset}. Closing cursor.") await cursor.close() return resultset @@ -580,7 +629,8 @@ async def _execute_prep_statements( # Run the queries in parallel using asyncio.gather results = await asyncio.gather(*tasks) except asyncio.CancelledError as e: - _logger.error(f"Failed executing prepared statements: {e}") + if _logger.isEnabledFor(logging.ERROR): + _logger.error(f"Failed executing prepared statements: {e}") raise e # Create a dict of results with the prepared statement index as key @@ -646,7 +696,8 @@ def prefix_by_serving_index(self) -> Dict[int, str]: @prefix_by_serving_index.setter def prefix_by_serving_index(self, prefix_by_serving_index: Dict[int, str]) -> None: - _logger.debug(f"Setting prefix by serving index {prefix_by_serving_index}.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Setting prefix by serving index {prefix_by_serving_index}.") self._prefix_by_serving_index = prefix_by_serving_index @property @@ -684,9 +735,10 @@ def serving_keys(self) -> Set[ServingKey]: "Prepared statements are not initialized. Please call `init_prepared_statement` method first." ) else: - _logger.debug( - "Build serving keys from prepared statements ignoring prefix to ensure compatibility with older version." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Build serving keys from prepared statements ignoring prefix to ensure compatibility with older version." + ) self._serving_keys = util.build_serving_keys_from_prepared_statements( self.prepared_statements[ self.BATCH_VECTOR_KEY diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 21106e9344..3670ead418 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -219,7 +219,8 @@ def setup_sql_client( inference_helper_columns: bool, options: Optional[Dict[str, Any]] = None, ) -> None: - _logger.debug("Initialising Online Store SQL client") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Initialising Online Store SQL client") self._sql_client = online_store_sql_engine.OnlineStoreSqlClient( feature_store_id=self._feature_store_id, skip_fg_ids=self._skip_fg_ids, @@ -239,7 +240,8 @@ def setup_rest_client_and_engine( reset_rest_client: bool = False, ): # naming is off here, but it avoids confusion with the argument init_rest_client - _logger.debug("Initialising Online Store REST client") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Initialising Online Store REST client") self._rest_client_engine = ( online_store_rest_client_engine.OnlineStoreRestClientEngine( feature_store_name=self._feature_store_name, @@ -275,17 +277,20 @@ def get_feature_vector( vector_db_features=vector_db_features, ) if len(rondb_entry) == 0: - _logger.debug("Empty entry for rondb, skipping fetching.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Empty entry for rondb, skipping fetching.") serving_vector = {} # updated below with vector_db_features and passed_features elif online_client_choice == self.DEFAULT_REST_CLIENT: - _logger.debug("get_feature_vector Online REST client") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("get_feature_vector Online REST client") serving_vector = self.rest_client_engine.get_single_feature_vector( rondb_entry, drop_missing=not allow_missing, return_type=self.rest_client_engine.RETURN_TYPE_FEATURE_VALUE_DICT, ) else: - _logger.debug("get_feature_vector Online SQL client") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("get_feature_vector Online SQL client") serving_vector = self.sql_client.get_single_feature_vector(rondb_entry) vector = self.assemble_feature_vector( @@ -350,7 +355,8 @@ def get_feature_vectors( skipped_empty_entries.append(idx) if online_client_choice == self.DEFAULT_REST_CLIENT and len(rondb_entries) > 0: - _logger.debug("get_batch_feature_vector Online REST client") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("get_batch_feature_vector Online REST client") batch_results = self.rest_client_engine.get_batch_feature_vectors( entries=rondb_entries, drop_missing=not allow_missing, @@ -358,13 +364,16 @@ def get_feature_vectors( ) elif len(rondb_entries) > 0: # get result row - _logger.debug("get_batch_feature_vectors through SQL client") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("get_batch_feature_vectors through SQL client") batch_results, _ = self.sql_client.get_batch_feature_vectors(rondb_entries) else: - _logger.debug("Empty entries for rondb, skipping fetching.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Empty entries for rondb, skipping fetching.") batch_results = [] - _logger.debug("Assembling feature vectors from batch results") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Assembling feature vectors from batch results") next_skipped = ( skipped_empty_entries.pop(0) if len(skipped_empty_entries) > 0 else None ) @@ -380,7 +389,8 @@ def get_feature_vectors( fillvalue=None, ): if next_skipped == idx: - _logger.debug("Entry %d was skipped, setting to empty dict.", idx) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Entry %d was skipped, setting to empty dict.", idx) next_skipped = ( skipped_empty_entries.pop(0) if len(skipped_empty_entries) > 0 @@ -415,15 +425,19 @@ def assemble_feature_vector( ) -> Optional[List[Any]]: """Assembles serving vector from online feature store.""" # Errors in batch requests are returned as None values - _logger.debug("Assembling serving vector: %s", result_dict) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Assembling serving vector: %s", result_dict) if result_dict is None: - _logger.debug("Found null result, setting to empty dict.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Found null result, setting to empty dict.") result_dict = {} if vector_db_result is not None and len(vector_db_result) > 0: - _logger.debug("Updating with vector_db features: %s", vector_db_result) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Updating with vector_db features: %s", vector_db_result) result_dict.update(vector_db_result) if passed_values is not None and len(passed_values) > 0: - _logger.debug("Updating with passed features: %s", passed_values) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Updating with passed features: %s", passed_values) result_dict.update(passed_values) missing_features = set(self.feature_vector_col_name).difference( @@ -451,7 +465,10 @@ def assemble_feature_vector( if len(self.transformation_functions) > 0: self.apply_transformation(result_dict) - _logger.debug("Assembled and transformed dict feature vector: %s", result_dict) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Assembled and transformed dict feature vector: %s", result_dict + ) return [result_dict.get(fname, None) for fname in self.feature_vector_col_name] @@ -474,18 +491,22 @@ def handle_feature_vector_return_type( ]: # Only get-feature-vector and get-feature-vectors can return list or numpy if return_type.lower() == "list" and not inference_helper: - _logger.debug("Returning feature vector as value list") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Returning feature vector as value list") return feature_vectorz elif return_type.lower() == "numpy" and not inference_helper: - _logger.debug("Returning feature vector as numpy array") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Returning feature vector as numpy array") return np.array(feature_vectorz) # Only inference helper can return dict elif return_type.lower() == "dict" and inference_helper: - _logger.debug("Returning feature vector as dictionary") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Returning feature vector as dictionary") return feature_vectorz # Both can return pandas and polars elif return_type.lower() == "pandas": - _logger.debug("Returning feature vector as pandas dataframe") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Returning feature vector as pandas dataframe") if batch and inference_helper: return pd.DataFrame(feature_vectorz) elif inference_helper: @@ -499,7 +520,8 @@ def handle_feature_vector_return_type( pandas_df.columns = self._feature_vector_col_name return pandas_df elif return_type.lower() == "polars": - _logger.debug("Returning feature vector as polars dataframe") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Returning feature vector as polars dataframe") if not HAS_POLARS: raise ModuleNotFoundError(polars_not_installed_message) @@ -524,10 +546,11 @@ def get_inference_helper( default_client = self.which_client_and_ensure_initialised( force_rest_client, force_sql_client ) - _logger.debug( - f"Retrieve inference helper values for single entry via {default_client.upper()} client." - ) - _logger.debug(f"entry: {entry} as return type: {return_type}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Retrieve inference helper values for single entry via {default_client.upper()} client." + ) + _logger.debug(f"entry: {entry} as return type: {return_type}") if default_client == self.DEFAULT_REST_CLIENT: return self.handle_feature_vector_return_type( self.rest_client_engine.get_single_feature_vector( @@ -558,10 +581,11 @@ def get_inference_helpers( default_client = self.which_client_and_ensure_initialised( force_rest_client, force_sql_client ) - _logger.debug( - f"Retrieve inference helper values for batch entries via {default_client.upper()} client." - ) - _logger.debug(f"entries: {entries} as return type: {return_type}") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Retrieve inference helper values for batch entries via {default_client.upper()} client." + ) + _logger.debug(f"entries: {entries} as return type: {return_type}") if default_client == self.DEFAULT_REST_CLIENT: batch_results = self.rest_client_engine.get_batch_feature_vectors( @@ -656,7 +680,8 @@ def apply_transformation(self, row_dict: Dict[str, Any]): matching_keys = set(self.transformation_functions.keys()).intersection( row_dict.keys() ) - _logger.debug("Applying transformation functions to : %s", matching_keys) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Applying transformation functions to : %s", matching_keys) for feature_name in matching_keys: row_dict[feature_name] = self.transformation_functions[ feature_name @@ -672,9 +697,11 @@ def apply_return_value_handlers( matching_keys = set(self.feature_to_handle_if_sql).intersection( row_dict.keys() ) - _logger.debug("Applying return value handlers to : %s", matching_keys) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Applying return value handlers to : %s", matching_keys) for fname in matching_keys: - _logger.debug("Applying return value handler to feature: %s", fname) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Applying return value handler to feature: %s", fname) row_dict[fname] = self.return_feature_value_handlers[fname](row_dict[fname]) return row_dict @@ -699,11 +726,13 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]: if len(complex_feature_schemas) == 0: return {} else: - _logger.debug( - f"Building complex feature decoders corresponding to {complex_feature_schemas}." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Building complex feature decoders corresponding to {complex_feature_schemas}." + ) if HAS_FASTAVRO: - _logger.debug("Using fastavro for deserialization.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Using fastavro for deserialization.") return { f_name: ( lambda feature_value, avro_schema=schema: ( @@ -726,7 +755,8 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]: for (f_name, schema) in complex_feature_schemas.items() } else: - _logger.debug("Fast Avro not found, using avro for deserialization.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Fast Avro not found, using avro for deserialization.") return { f_name: ( lambda feature_value, avro_schema=schema: avro_schema.read( @@ -763,10 +793,11 @@ def set_return_feature_value_handlers( return if len(features) == 0: return - _logger.debug( - f"Setting return feature value handlers for Feature View {self._feature_view_name}," - f" version: {self._feature_view_version} in Feature Store {self._feature_store_name}." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Setting return feature value handlers for Feature View {self._feature_view_name}," + f" version: {self._feature_view_version} in Feature Store {self._feature_store_name}." + ) self._return_feature_value_handlers.update( self.build_complex_feature_decoders() ) @@ -824,14 +855,17 @@ def validate_entry( Keys relevant to vector_db are filtered out. """ - _logger.debug("Checking keys in entry are valid serving keys.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug("Checking keys in entry are valid serving keys.") for key in entry.keys(): if key not in self.valid_serving_keys: raise exceptions.FeatureStoreException( f"Provided key {key} is not a serving key. Required serving keys: {self.required_serving_keys}." ) - - _logger.debug("Checking entry has either all or none of composite serving keys") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Checking entry has either all or none of composite serving keys" + ) for composite_group in self.groups_of_composite_serving_keys.values(): present_keys = [ True @@ -873,9 +907,10 @@ def identify_missing_features_pre_fetch( - The method does not check whether serving keys correspond to existing rows in the online feature store. - The method does not check whether the passed features names and data types correspond to the query schema. """ - _logger.debug( - "Checking missing serving keys in entry correspond to passed features." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "Checking missing serving keys in entry correspond to passed features." + ) missing_features_per_serving_keys = {} has_missing = False for sk_name, ( @@ -886,9 +921,11 @@ def identify_missing_features_pre_fetch( set(passed_features.keys()) if passed_features else set() ) if vector_db_features and len(vector_db_features) > 0: - _logger.debug( - "vector_db_features for pre-fetch missing : %s", vector_db_features - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + "vector_db_features for pre-fetch missing : %s", + vector_db_features, + ) passed_feature_names = passed_feature_names.union( vector_db_features.keys() ) @@ -901,9 +938,10 @@ def identify_missing_features_pre_fetch( if ( sk_name not in entry.keys() and sk_no_prefix not in entry.keys() ) and not fetched_features.issubset(passed_feature_names): - _logger.debug( - f"Missing serving key {sk_name} and corresponding features {neither_fetched_nor_passed}." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Missing serving key {sk_name} and corresponding features {neither_fetched_nor_passed}." + ) has_missing = True missing_features_per_serving_keys[sk_name] = neither_fetched_nor_passed @@ -980,9 +1018,10 @@ def groups_of_composite_serving_keys(self) -> Dict[int, List[str]]: ) for sk in self.serving_keys } - _logger.debug( - f"Groups of composite serving keys: {self._groups_of_composite_serving_keys}." - ) + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug( + f"Groups of composite serving keys: {self._groups_of_composite_serving_keys}." + ) return self._groups_of_composite_serving_keys @property @@ -993,7 +1032,8 @@ def rondb_serving_keys(self) -> List[str]: for sk in self.serving_keys if sk.feature_group.id not in self._skip_fg_ids ] - _logger.debug(f"RonDB serving keys: {self._rondb_serving_keys}.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"RonDB serving keys: {self._rondb_serving_keys}.") return self._rondb_serving_keys @property @@ -1115,6 +1155,6 @@ def default_client(self, default_client: Literal["rest", "sql"]): f"Default Online Store client is set to {self.DEFAULT_SQL_CLIENT} but Online Store SQL client" + " is not initialised. Call `init_serving` with init_sql_client set to True before using it." ) - - _logger.debug(f"Default Online Store Client is set to {default_client}.") + if _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Default Online Store Client is set to {default_client}.") self._default_client = default_client