Skip to content
15 changes: 14 additions & 1 deletion kr8s/_portforward.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
from typing import TYPE_CHECKING, Literal, Union

import anyio
import httpx
import httpx_ws

from ._exceptions import ConnectionClosedError
from ._exceptions import ConnectionClosedError, ServerError
from ._types import APIObjectWithPods

LocalPortType = Union[Literal["match", "auto"], int, None]
Expand Down Expand Up @@ -238,7 +239,19 @@ async def _connect_websocket(self):
yield websocket
break
except httpx_ws.HTTPXWSException as e:
if connection_attempts > 5:
if (
isinstance(e, httpx_ws.WebSocketUpgradeError)
and hasattr(e, "response")
and e.response.status_code in (401, 403)
):
error_message = f"Permission denied: {e.response.status_code}"
with suppress(httpx.StreamClosed, httpx.ResponseNotRead):
await e.response.aread()
error_message = e.response.text
raise ServerError(error_message, response=e.response) from e
self.pod = None
connection_attempts += 1
if connection_attempts > 5:
raise ConnectionClosedError("Unable to connect to Pod") from e
await anyio.sleep(0.1 * connection_attempts)
Expand Down
160 changes: 160 additions & 0 deletions kr8s/tests/test_portforward_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, Kr8s Developers (See LICENSE for list)
# SPDX-License-Identifier: BSD 3-Clause License
import asyncio
import os
import tempfile
from pathlib import Path

import pytest
import yaml

import kr8s
from kr8s._exceptions import ServerError


# Override the k8s_cluster fixture to avoid creating a Kind cluster
@pytest.fixture
def k8s_cluster():
class MockCluster:
@property
def kubeconfig_path(self):
return Path(os.environ.get("KUBECONFIG", "~/.kube/config")).expanduser()

return MockCluster()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you did this for local development? In CI we need to this run against the kind cluster.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I was using this for my local dev test,
sure have incorporated that



@pytest.mark.asyncio
async def test_portforward_invalid_token(k8s_cluster):
# Connect with admin kubeconfig to create a real pod
try:
api_admin = await kr8s.asyncio.api(kubeconfig=k8s_cluster.kubeconfig_path)
except Exception as e:
pytest.skip(f"Skipping: cannot connect to cluster: {e}")

# Simple connectivity check
try:
async for _ in api_admin.get("pods", namespace="default"):
break
except Exception as e:
pytest.skip(f"Skipping test: Could not list pods (check kubeconfig): {e}")

# Create a dummy pod object for testing
pod = kr8s.asyncio.objects.Pod(
{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {"name": "test-portforward-auth", "namespace": "default"},
"spec": {
"containers": [
{
"name": "nginx",
"image": "nginx:latest",
"ports": [{"containerPort": 80}],
}
]
},
},
api=api_admin,
)

# Delete if exists
if await pod.exists():
await pod.delete()
await pod.wait("condition=Deleted")

await pod.create()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the Pod actually need to exist? It never gets used other than to create pod_invalid.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that makes sense

try:
await pod.wait("condition=Ready")

# 2. Create a temporary kubeconfig with an INVALID token
# We read the current kubeconfig
if k8s_cluster.kubeconfig_path.exists():
kubeconfig_data = yaml.safe_load(k8s_cluster.kubeconfig_path.read_text())
else:
pytest.skip("Kubeconfig file not found.")

# Find the user and change the token/certs to an invalid token
# We replace the user entry with a simple token user
kubeconfig_data["users"] = [
{"name": "invalid-user", "user": {"token": "invalid-token-12345"}}
]
# Update context to use this user
current_context = kubeconfig_data.get("current-context")
if not current_context and kubeconfig_data["contexts"]:
current_context = kubeconfig_data["contexts"][0]["name"]

if current_context:
for ctx in kubeconfig_data["contexts"]:
if ctx["name"] == current_context:
ctx["context"]["user"] = "invalid-user"
break
else:
# Fallback if no context structure
pass

# Write invalid kubeconfig to a temp file
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml") as tmp_kubeconfig:
yaml.dump(kubeconfig_data, tmp_kubeconfig)
tmp_kubeconfig.flush()

# 3. Initialize a new API with the invalid kubeconfig
from kr8s._api import Api

original_check_version = Api._check_version

async def mock_check_version(self):
return

Api._check_version = mock_check_version

try:
api_invalid = await kr8s.asyncio.api(kubeconfig=tmp_kubeconfig.name)
finally:
Api._check_version = original_check_version

# 4. Create a Pod object bound to this invalid API
# We use the same name/namespace, but bound to the invalid API
pod_invalid = kr8s.asyncio.objects.Pod(
pod.name, namespace=pod.namespace, api=api_invalid
)

# 5. Attempt portforward and expect ServerError (Unauthorized)
pf = pod_invalid.portforward(80, local_port=None)

print("Attempting portforward with invalid token...")
# We need to connect to the local port to trigger the websocket connection to the API server
# The ServerError will be raised in the background task or when we try to read/write?
# Actually, _sync_sockets runs in the server loop.
# If it fails, it might log an error or close the connection.
# But verify_real.py caught ServerError.

# In verify_real.py:
# async with pf as port:
# ...
# reader, writer = await asyncio.open_connection("127.0.0.1", port)

# The ServerError in verify_real.py was caught because it was raised from _connect_websocket?
# But _connect_websocket is called from _sync_sockets.

# Let's try to connect and see if it raises.

with pytest.raises(ServerError) as excinfo:
async with pf._connect_websocket() as _:
pass

# Verify it is a 401 or 403 (likely 401 for invalid token)
assert excinfo.value.response.status_code in (401, 403)
print(f"Success: Caught expected ServerError (401/403): {excinfo.value}")

finally:
await pod.delete()


if __name__ == "__main__":

class MockCluster:
@property
def kubeconfig_path(self):
return Path(os.environ.get("KUBECONFIG", "~/.kube/config")).expanduser()

asyncio.run(test_portforward_invalid_token(MockCluster()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In future instead of this you could just run pytest -k test_portforward_invalid_token and pytest will find the test and run it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noted