Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 116 additions & 79 deletions python/hsfs/client/online_store_rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,24 @@ def init_or_reset_online_store_rest_client(
transport=transport, optional_config=optional_config
)
else:
_logger.warning(
"Online Store Rest Client is already initialised. To reset connection or/and override configuration, "
+ "use reset_online_store_rest_client flag.",
stacklevel=2,
)
if _logger.isEnabledFor(logging.WARNING):
_logger.warning(
"Online Store Rest Client is already initialised. To reset connection or/and override configuration, "
+ "use reset_online_store_rest_client flag.",
stacklevel=2,
)


def get_instance() -> OnlineStoreRestClientSingleton:
global _online_store_rest_client
if _online_store_rest_client is None:
_logger.warning(
"Online Store Rest Client is not initialised. Initialising with default configuration."
)
if _logger.isEnabledFor(logging.WARNING):
_logger.warning(
"Online Store Rest Client is not initialised. Initialising with default configuration."
)
_online_store_rest_client = OnlineStoreRestClientSingleton()
_logger.debug("Accessing global Online Store Rest Client instance.")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Accessing global Online Store Rest Client instance.")
return _online_store_rest_client


Expand Down Expand Up @@ -91,11 +94,13 @@ def __init__(
] = None,
optional_config: Optional[Dict[str, Any]] = None,
):
_logger.debug(
f"Initialising Online Store Rest Client {'with optional configuration' if optional_config else ''}."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
f"Initialising Online Store Rest Client {'with optional configuration' if optional_config else ''}."
)
if optional_config:
_logger.debug(f"Optional Config: {optional_config!r}")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(f"Optional Config: {optional_config!r}")
self._check_hopsworks_connection()
self.variable_api = variable_api.VariableApi()
self._auth: client.auth.OnlineStoreKeyAuth
Expand All @@ -116,14 +121,17 @@ def reset_client(
] = None,
optional_config: Optional[Dict[str, Any]] = None,
):
_logger.debug(
f"Resetting Online Store Rest Client {'with optional configuration' if optional_config else ''}."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
f"Resetting Online Store Rest Client {'with optional configuration' if optional_config else ''}."
)
if optional_config:
_logger.debug(f"Optional Config: {optional_config}")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(f"Optional Config: {optional_config}")
self._check_hopsworks_connection()
if hasattr(self, "_session") and self._session:
_logger.debug("Closing existing session.")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Closing existing session.")
self._session.close()
delattr(self, "_session")
self._setup_rest_client(
Expand All @@ -140,46 +148,54 @@ def _setup_rest_client(
optional_config: Optional[Dict[str, Any]] = None,
use_current_config: bool = True,
):
_logger.debug("Setting up Online Store Rest Client.")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Setting up Online Store Rest Client.")
if optional_config and not isinstance(optional_config, dict):
raise ValueError(
"optional_config must be a dictionary. See documentation for allowed keys and values."
)
_logger.debug("Optional Config: %s", optional_config)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Optional Config: %s", optional_config)
if not use_current_config:
_logger.debug(
"Retrieving default configuration for Online Store REST Client."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Retrieving default configuration for Online Store REST Client."
)
self._current_config = self._get_default_client_config()
if optional_config:
_logger.debug(
"Updating default configuration with provided optional configuration."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Updating default configuration with provided optional configuration."
)
self._current_config.update(optional_config)

self._set_auth(optional_config)
if not hasattr(self, "_session") or not self._session:
_logger.debug("Initialising new requests session.")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Initialising new requests session.")
self._session = requests.Session()
else:
raise ValueError(
"Use the init_or_reset_online_store_connection method with reset_connection flag set "
+ "to True to reset the online_store_client_connection"
)
if transport is not None:
_logger.debug("Setting custom transport adapter.")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Setting custom transport adapter.")
self._session.mount("https://", transport)
self._session.mount("http://", transport)

if not self._current_config[self.VERIFY_CERTS]:
_logger.warning(
"Disabling SSL certificate verification. This is not recommended for production environments."
)
if _logger.isEnabledFor(logging.WARNING):
_logger.warning(
"Disabling SSL certificate verification. This is not recommended for production environments."
)
self._session.verify = False
else:
_logger.debug(
f"Setting SSL certificate verification using CA Certs path: {self._current_config[self.CA_CERTS]}"
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
f"Setting SSL certificate verification using CA Certs path: {self._current_config[self.CA_CERTS]}"
)
self._session.verify = self._current_config[self.CA_CERTS]

# Set base_url
Expand All @@ -202,15 +218,19 @@ def _setup_rest_client(
), "Online Store REST Client Configuration failed to initialise."

def _get_default_client_config(self) -> Dict[str, Any]:
_logger.debug("Retrieving default configuration for Online Store REST Client.")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Retrieving default configuration for Online Store REST Client."
)
default_config = self._get_default_static_parameters_config()
default_config.update(self._get_default_dynamic_parameters_config())
return default_config

def _get_default_static_parameters_config(self) -> Dict[str, Any]:
_logger.debug(
"Retrieving default static configuration for Online Store REST Client."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Retrieving default static configuration for Online Store REST Client."
)
return {
self.TIMEOUT: self._DEFAULT_ONLINE_STORE_REST_CLIENT_TIMEOUT_SECOND,
self.VERIFY_CERTS: self._DEFAULT_ONLINE_STORE_REST_CLIENT_VERIFY_CERTS,
Expand All @@ -222,14 +242,18 @@ def _get_default_static_parameters_config(self) -> Dict[str, Any]:
def _get_default_dynamic_parameters_config(
self,
) -> Dict[str, Any]:
_logger.debug(
"Retrieving default dynamic configuration for Online Store REST Client."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Retrieving default dynamic configuration for Online Store REST Client."
)
url = furl(self._get_rondb_rest_server_endpoint())
_logger.debug(f"Default RonDB Rest Server host and port: {url.host}:{url.port}")
_logger.debug(
f"Using CA Certs from Hopsworks Client: {client.get_instance()._get_ca_chain_path()}"
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
f"Default RonDB Rest Server host and port: {url.host}:{url.port}"
)
_logger.debug(
f"Using CA Certs from Hopsworks Client: {client.get_instance()._get_ca_chain_path()}"
)
return {
self.HOST: url.host,
self.PORT: url.port,
Expand All @@ -248,31 +272,36 @@ def _get_rondb_rest_server_endpoint(self) -> str:
str: RonDB Rest Server endpoint with default port.
"""
if client.get_instance()._is_external():
_logger.debug(
"External Online Store REST Client : Retrieving RonDB Rest Server endpoint via loadbalancer."
)
external_domain = self.variable_api.get_loadbalancer_external_domain()
if external_domain == "":
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"External Online Store REST Client : Loadbalancer external domain is not set. Using client host as endpoint."
)
external_domain = self.variable_api.get_loadbalancer_external_domain()
if external_domain == "":
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"External Online Store REST Client : Loadbalancer external domain is not set. Using client host as endpoint."
)
external_domain = client.get_instance().host
default_url = f"https://{external_domain}:{self._DEFAULT_ONLINE_STORE_REST_CLIENT_PORT}"
_logger.debug(
f"External Online Store REST Client : Default RonDB Rest Server endpoint: {default_url}"
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
f"External Online Store REST Client : Default RonDB Rest Server endpoint: {default_url}"
)
return default_url
else:
_logger.debug(
"Internal Online Store REST Client : Retrieving RonDB Rest Server endpoint via service discovery."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Internal Online Store REST Client : Retrieving RonDB Rest Server endpoint via service discovery."
)
service_discovery_domain = self.variable_api.get_service_discovery_domain()
if service_discovery_domain == "":
raise FeatureStoreException("Service discovery domain is not set.")
default_url = f"https://rdrs.service.{service_discovery_domain}:{self._DEFAULT_ONLINE_STORE_REST_CLIENT_PORT}"
_logger.debug(
f"Internal Online Store REST Client : Default RonDB Rest Server endpoint: {default_url}"
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
f"Internal Online Store REST Client : Default RonDB Rest Server endpoint: {default_url}"
)
return default_url

def send_request(
Expand All @@ -284,9 +313,10 @@ def send_request(
) -> requests.Response:
url = self._base_url.copy()
url.path.segments.extend(path_params)
_logger.debug(f"Sending {method} request to {url.url}.")
_logger.debug(f"Provided Data: {data}")
_logger.debug(f"Provided Headers: {headers}")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(f"Sending {method} request to {url.url}.")
_logger.debug(f"Provided Data: {data}")
_logger.debug(f"Provided Headers: {headers}")
prepped_request = self._session.prepare_request(
requests.Request(
method, url=url.url, headers=headers, data=data, auth=self.auth
Expand All @@ -300,42 +330,48 @@ def send_request(
)

def _check_hopsworks_connection(self) -> None:
_logger.debug("Checking Hopsworks connection.")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Checking Hopsworks connection.")
assert (
client.get_instance() is not None and client.get_instance()._connected
), """Hopsworks Client is not connected. Please connect to Hopsworks cluster
via hopsworks.login or hsfs.connection before initialising the Online Store REST Client.
"""
_logger.debug("Hopsworks connection is active.")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Hopsworks connection is active.")

def _set_auth(self, optional_config: Optional[Dict[str, Any]] = None) -> None:
"""Set authentication object for the Online Store REST Client.

RonDB Rest Server uses Hopsworks Api Key to authenticate requests via the X-API-KEY header by default.
The api key determines the permissions of the user making the request for access to a given Feature Store.
"""
_logger.debug("Setting authentication for Online Store REST Client.")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Setting authentication for Online Store REST Client.")
if client.get_instance()._is_external():
assert hasattr(
client.get_instance()._auth, "_token"
), "External client must use API Key authentication. Contact your system administrator."
_logger.debug(
"External Online Store REST Client : Setting authentication using Hopsworks Client API Key."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"External Online Store REST Client : Setting authentication using Hopsworks Client API Key."
)
self._auth = client.auth.OnlineStoreKeyAuth(
client.get_instance()._auth._token
)
elif isinstance(optional_config, dict) and optional_config.get(
self.API_KEY, False
):
_logger.debug(
"Setting authentication using provided API Key from optional configuration."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Setting authentication using provided API Key from optional configuration."
)
self._auth = client.auth.OnlineStoreKeyAuth(optional_config[self.API_KEY])
elif hasattr(self, "_auth") and self._auth is not None:
_logger.debug(
"Authentication for Online Store REST Client is already set. Using existing authentication api key."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Authentication for Online Store REST Client is already set. Using existing authentication api key."
)
else:
raise FeatureStoreException(
"RonDB Rest Server uses Hopsworks Api Key to authenticate request."
Expand All @@ -345,14 +381,15 @@ def _set_auth(self, optional_config: Optional[Dict[str, Any]] = None) -> None:
def is_connected(self):
"""If Online Store Rest Client is initialised, ping RonDB Rest Server to ensure connection is active."""
if self._session is None:
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Checking Online Store REST Client is connected. Pinging RonDB Rest Server."
)
raise FeatureStoreException("Online Store REST Client is not initialised.")
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Checking Online Store REST Client is connected. Session is not initialised."
)
raise FeatureStoreException("Online Store REST Client is not initialised.")

_logger.debug(
"Checking Online Store REST Client is connected. Pinging RonDB Rest Server."
)
if not self.send_request("GET", ["ping"]):
warn("Ping failed, RonDB Rest Server is not reachable.", stacklevel=2)
return False
Expand Down
Loading
Loading