From a5c8bacd588367016799308ac3fa0a0e1287f376 Mon Sep 17 00:00:00 2001 From: Aaron Jacobs Date: Tue, 23 Jun 2026 16:48:53 -0400 Subject: [PATCH] fix: Detect invalid API keys more carefully. When credentials come from the command line (or environment variables) rather than the store, we attempt to validate them by hitting the `/v1/user` endpoint, and error out if this fails. It turns out that this doesn't work for machine credentials, which aren't associated with a user. Thankfully the difference between "invalid API key" and "not a user API key" can be detected based on the error code from the response, so this commit draws this exact distinction. Unit tests are included. Signed-off-by: Aaron Jacobs --- rsconnect/api.py | 65 ++++++++++++++++++++------------ tests/test_api.py | 95 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 23 deletions(-) diff --git a/rsconnect/api.py b/rsconnect/api.py index ed55d39a..b2383e86 100644 --- a/rsconnect/api.py +++ b/rsconnect/api.py @@ -1554,23 +1554,13 @@ def server_settings(self): def verify_api_key(self, server: Optional[RSConnectServer] = None): """ Verify that an API Key may be used to authenticate with the given Posit Connect server. - If the API key verifies, we return the username of the associated user. """ if not server: server = self.remote_server if isinstance(server, ShinyappsServer): raise RSConnectException("Shinnyapps server does not use an API key.") with RSConnectClient(server) as client: - result = client.me() - if isinstance(result, HTTPResponse): - if ( - result.json_data - and isinstance(result.json_data, dict) - and "code" in result.json_data - and result.json_data["code"] == 30 - ): - raise RSConnectException("The specified API key is not valid.") - raise RSConnectException("Could not verify the API key: %s %s" % (result.status, result.reason)) + verify_api_key_response(client) return self @property @@ -2053,27 +2043,56 @@ def verify_server(connect_server: RSConnectServer): raise RSConnectException("There is an SSL/TLS configuration problem: %s" % ssl_error) +def verify_api_key_response(client: RSConnectClient) -> Optional[UserRecord]: + """ + Issue GET v1/user and interpret the response for the purpose of API key verification. + + :param client: a client configured with the credential to verify. + :return: the user record on success, or None for a valid credential that has no + associated user (a service principal or machine identity, for example one used + for trusted publishing). + :raises RSConnectException: if the credential is invalid or the request otherwise fails. + """ + # Use the raw response rather than client.me(), which would raise a generic error + # and discard the error code we need to distinguish the cases below. + result = client.get("v1/user") + if isinstance(result, HTTPResponse): + # A transport-layer failure (network/TLS/socket error) yields an HTTPResponse + # with no status or reason, so handle it before inspecting them, mirroring + # handle_bad_response. + if result.exception: + raise RSConnectException( + "Exception trying to connect to %s - %s" % (result.full_uri, result.exception), + cause=result.exception, + ) + json_data = result.json_data if isinstance(result.json_data, dict) else {} + code = json_data.get("code") + # A service principal or machine identity authenticates successfully but has no + # associated user, so the v1/user endpoint rejects it with a 403 and error code + # 22. That code is unambiguous on this endpoint -- a genuinely invalid credential + # is rejected at the auth layer with code 30 instead -- so the credential is valid + # and we treat it as verified. + if result.status == 403 and code == 22: + return None + if code == 30: + raise RSConnectException("The specified API key is not valid.") + raise RSConnectException("Could not verify the API key: %s %s" % (result.status, result.reason)) + return cast(UserRecord, result) + + def verify_api_key(connect_server: RSConnectServer) -> str: """ Verify that an API Key may be used to authenticate with the given Posit Connect server. If the API key verifies, we return the username of the associated user. :param connect_server: the Connect server information, including the API key to test. - :return: the username of the user to whom the API key belongs. + :return: the username of the user to whom the API key belongs, or an empty string for a + valid credential with no associated user (a service principal or machine identity). """ warn("This method has been moved and will be deprecated.", DeprecationWarning, stacklevel=2) with RSConnectClient(connect_server) as client: - result = client.me() - if isinstance(result, HTTPResponse): - if ( - result.json_data - and isinstance(result.json_data, dict) - and "code" in result.json_data - and result.json_data["code"] == 30 - ): - raise RSConnectException("The specified API key is not valid.") - raise RSConnectException("Could not verify the API key: %s %s" % (result.status, result.reason)) - return result["username"] + user = verify_api_key_response(client) + return user["username"] if user else "" def get_python_info(connect_server: Union[RSConnectServer, SPCSConnectServer]): diff --git a/tests/test_api.py b/tests/test_api.py index 0be3a763..dbd94656 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -15,6 +15,7 @@ ShinyappsServer, ShinyappsService, SPCSConnectServer, + verify_api_key, ) from rsconnect.exception import DeploymentFailedException, RSConnectException @@ -99,6 +100,100 @@ def test_client_system_caches_runtime_list(self): result = ce.runtime_caches self.assertDictEqual(result, mocked_response) + @httpretty.activate(verbose=True, allow_net_connect=False) + def test_verify_api_key_user(self): + ce = RSConnectExecutor(None, None, "http://test-server/", "api_key") + httpretty.register_uri( + httpretty.GET, + "http://test-server/__api__/v1/user", + body=json.dumps({"username": "alice"}), + status=200, + forcing_headers={"Content-Type": "application/json"}, + ) + # Returns the executor without raising for a regular user. + self.assertIs(ce.verify_api_key(), ce) + + @httpretty.activate(verbose=True, allow_net_connect=False) + def test_verify_api_key_service_principal(self): + # A service principal (e.g. for trusted publishing) authenticates but is not a + # user, so v1/user returns 403 / code 22. The credential is still valid, so + # verification should succeed instead of raising. + ce = RSConnectExecutor(None, None, "http://test-server/", "api_key") + httpretty.register_uri( + httpretty.GET, + "http://test-server/__api__/v1/user", + body=json.dumps({"code": 22, "error": "You don't have permission to perform this operation."}), + status=403, + forcing_headers={"Content-Type": "application/json"}, + ) + self.assertIs(ce.verify_api_key(), ce) + + @httpretty.activate(verbose=True, allow_net_connect=False) + def test_verify_api_key_invalid(self): + ce = RSConnectExecutor(None, None, "http://test-server/", "api_key") + httpretty.register_uri( + httpretty.GET, + "http://test-server/__api__/v1/user", + body=json.dumps({"code": 30, "error": "Invalid login."}), + status=401, + forcing_headers={"Content-Type": "application/json"}, + ) + with self.assertRaises(RSConnectException) as cm: + ce.verify_api_key() + self.assertIn("not valid", str(cm.exception)) + + def test_verify_api_key_connection_error(self): + # A transport-layer failure yields an HTTPResponse with no status/reason, only + # an exception. Verification should surface a clean RSConnectException rather + # than an AttributeError from reading the missing status. + from rsconnect.http_support import HTTPResponse + + ce = RSConnectExecutor(None, None, "http://test-server/", "api_key") + failed_response = HTTPResponse("http://test-server/__api__/v1/user", exception=OSError("connection refused")) + with patch.object(RSConnectClient, "get", return_value=failed_response): + with self.assertRaises(RSConnectException) as cm: + ce.verify_api_key() + self.assertIn("connection refused", str(cm.exception)) + + # The deprecated module-level verify_api_key() is reached via actions.test_api_key() + # during `rsconnect add`, so it must accept the same credentials as the executor path. + @httpretty.activate(verbose=True, allow_net_connect=False) + def test_module_verify_api_key_user(self): + httpretty.register_uri( + httpretty.GET, + "http://test-server/__api__/v1/user", + body=json.dumps({"username": "alice"}), + status=200, + forcing_headers={"Content-Type": "application/json"}, + ) + self.assertEqual(verify_api_key(RSConnectServer("http://test-server", "api_key")), "alice") + + @httpretty.activate(verbose=True, allow_net_connect=False) + def test_module_verify_api_key_service_principal(self): + # A service principal authenticates but is not a user (403 / code 22); the + # credential is valid, so this returns an empty username instead of raising. + httpretty.register_uri( + httpretty.GET, + "http://test-server/__api__/v1/user", + body=json.dumps({"code": 22, "error": "You don't have permission to perform this operation."}), + status=403, + forcing_headers={"Content-Type": "application/json"}, + ) + self.assertEqual(verify_api_key(RSConnectServer("http://test-server", "api_key")), "") + + @httpretty.activate(verbose=True, allow_net_connect=False) + def test_module_verify_api_key_invalid(self): + httpretty.register_uri( + httpretty.GET, + "http://test-server/__api__/v1/user", + body=json.dumps({"code": 30, "error": "Invalid login."}), + status=401, + forcing_headers={"Content-Type": "application/json"}, + ) + with self.assertRaises(RSConnectException) as cm: + verify_api_key(RSConnectServer("http://test-server", "api_key")) + self.assertIn("not valid", str(cm.exception)) + # RSConnectExecutor.delete_runtime_cache() dry run returns expected request # RSConnectExecutor.delete_runtime_cache() dry run prints expected messages @httpretty.activate(verbose=True, allow_net_connect=False)