Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 42 additions & 23 deletions rsconnect/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1554,23 +1554,13 @@ def server_settings(self):
def verify_api_key(self, server: Optional[RSConnectServer] = None):
"""
Verify that an API Key may be used to authenticate with the given Posit Connect server.
If the API key verifies, we return the username of the associated user.
"""
if not server:
server = self.remote_server
if isinstance(server, ShinyappsServer):
raise RSConnectException("Shinnyapps server does not use an API key.")
with RSConnectClient(server) as client:
result = client.me()
if isinstance(result, HTTPResponse):
if (
result.json_data
and isinstance(result.json_data, dict)
and "code" in result.json_data
and result.json_data["code"] == 30
):
raise RSConnectException("The specified API key is not valid.")
raise RSConnectException("Could not verify the API key: %s %s" % (result.status, result.reason))
verify_api_key_response(client)
return self

@property
Expand Down Expand Up @@ -2053,27 +2043,56 @@ def verify_server(connect_server: RSConnectServer):
raise RSConnectException("There is an SSL/TLS configuration problem: %s" % ssl_error)


def verify_api_key_response(client: RSConnectClient) -> Optional[UserRecord]:
"""
Issue GET v1/user and interpret the response for the purpose of API key verification.

:param client: a client configured with the credential to verify.
:return: the user record on success, or None for a valid credential that has no
associated user (a service principal or machine identity, for example one used
for trusted publishing).
:raises RSConnectException: if the credential is invalid or the request otherwise fails.
"""
# Use the raw response rather than client.me(), which would raise a generic error
# and discard the error code we need to distinguish the cases below.
result = client.get("v1/user")
if isinstance(result, HTTPResponse):
# A transport-layer failure (network/TLS/socket error) yields an HTTPResponse
# with no status or reason, so handle it before inspecting them, mirroring
# handle_bad_response.
if result.exception:
raise RSConnectException(
"Exception trying to connect to %s - %s" % (result.full_uri, result.exception),
cause=result.exception,
)
json_data = result.json_data if isinstance(result.json_data, dict) else {}
code = json_data.get("code")
# A service principal or machine identity authenticates successfully but has no
# associated user, so the v1/user endpoint rejects it with a 403 and error code
# 22. That code is unambiguous on this endpoint -- a genuinely invalid credential
# is rejected at the auth layer with code 30 instead -- so the credential is valid
# and we treat it as verified.
if result.status == 403 and code == 22:
return None
if code == 30:
raise RSConnectException("The specified API key is not valid.")
raise RSConnectException("Could not verify the API key: %s %s" % (result.status, result.reason))
return cast(UserRecord, result)


def verify_api_key(connect_server: RSConnectServer) -> str:
"""
Verify that an API Key may be used to authenticate with the given Posit Connect server.
If the API key verifies, we return the username of the associated user.

:param connect_server: the Connect server information, including the API key to test.
:return: the username of the user to whom the API key belongs.
:return: the username of the user to whom the API key belongs, or an empty string for a
valid credential with no associated user (a service principal or machine identity).
"""
warn("This method has been moved and will be deprecated.", DeprecationWarning, stacklevel=2)
with RSConnectClient(connect_server) as client:
result = client.me()
if isinstance(result, HTTPResponse):
if (
result.json_data
and isinstance(result.json_data, dict)
and "code" in result.json_data
and result.json_data["code"] == 30
):
raise RSConnectException("The specified API key is not valid.")
raise RSConnectException("Could not verify the API key: %s %s" % (result.status, result.reason))
return result["username"]
user = verify_api_key_response(client)
return user["username"] if user else ""


def get_python_info(connect_server: Union[RSConnectServer, SPCSConnectServer]):
Expand Down
95 changes: 95 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ShinyappsServer,
ShinyappsService,
SPCSConnectServer,
verify_api_key,
)
from rsconnect.exception import DeploymentFailedException, RSConnectException

Expand Down Expand Up @@ -99,6 +100,100 @@ def test_client_system_caches_runtime_list(self):
result = ce.runtime_caches
self.assertDictEqual(result, mocked_response)

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_verify_api_key_user(self):
ce = RSConnectExecutor(None, None, "http://test-server/", "api_key")
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"username": "alice"}),
status=200,
forcing_headers={"Content-Type": "application/json"},
)
# Returns the executor without raising for a regular user.
self.assertIs(ce.verify_api_key(), ce)

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_verify_api_key_service_principal(self):
# A service principal (e.g. for trusted publishing) authenticates but is not a
# user, so v1/user returns 403 / code 22. The credential is still valid, so
# verification should succeed instead of raising.
ce = RSConnectExecutor(None, None, "http://test-server/", "api_key")
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"code": 22, "error": "You don't have permission to perform this operation."}),
status=403,
forcing_headers={"Content-Type": "application/json"},
)
self.assertIs(ce.verify_api_key(), ce)

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_verify_api_key_invalid(self):
ce = RSConnectExecutor(None, None, "http://test-server/", "api_key")
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"code": 30, "error": "Invalid login."}),
status=401,
forcing_headers={"Content-Type": "application/json"},
)
with self.assertRaises(RSConnectException) as cm:
ce.verify_api_key()
self.assertIn("not valid", str(cm.exception))

def test_verify_api_key_connection_error(self):
# A transport-layer failure yields an HTTPResponse with no status/reason, only
# an exception. Verification should surface a clean RSConnectException rather
# than an AttributeError from reading the missing status.
from rsconnect.http_support import HTTPResponse

ce = RSConnectExecutor(None, None, "http://test-server/", "api_key")
failed_response = HTTPResponse("http://test-server/__api__/v1/user", exception=OSError("connection refused"))
with patch.object(RSConnectClient, "get", return_value=failed_response):
with self.assertRaises(RSConnectException) as cm:
ce.verify_api_key()
self.assertIn("connection refused", str(cm.exception))

# The deprecated module-level verify_api_key() is reached via actions.test_api_key()
# during `rsconnect add`, so it must accept the same credentials as the executor path.
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_module_verify_api_key_user(self):
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"username": "alice"}),
status=200,
forcing_headers={"Content-Type": "application/json"},
)
self.assertEqual(verify_api_key(RSConnectServer("http://test-server", "api_key")), "alice")

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_module_verify_api_key_service_principal(self):
# A service principal authenticates but is not a user (403 / code 22); the
# credential is valid, so this returns an empty username instead of raising.
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"code": 22, "error": "You don't have permission to perform this operation."}),
status=403,
forcing_headers={"Content-Type": "application/json"},
)
self.assertEqual(verify_api_key(RSConnectServer("http://test-server", "api_key")), "")

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_module_verify_api_key_invalid(self):
httpretty.register_uri(
httpretty.GET,
"http://test-server/__api__/v1/user",
body=json.dumps({"code": 30, "error": "Invalid login."}),
status=401,
forcing_headers={"Content-Type": "application/json"},
)
with self.assertRaises(RSConnectException) as cm:
verify_api_key(RSConnectServer("http://test-server", "api_key"))
self.assertIn("not valid", str(cm.exception))

# RSConnectExecutor.delete_runtime_cache() dry run returns expected request
# RSConnectExecutor.delete_runtime_cache() dry run prints expected messages
@httpretty.activate(verbose=True, allow_net_connect=False)
Expand Down
Loading