Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions kubernetes/base/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,45 @@ def stream(self, func, *args, **kwargs):
disable_retries = ('timeout_seconds' in kwargs)
retry_after_410 = False
deserialize = kwargs.pop('deserialize', True)

# If no resource_version is specified for a watch (not a log follow),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When Watch.stream() is called without a resource_version, the stored self.resource_version reflects the last individual object event seen (e.g., a namespace last modified weeks ago). After 30–60 min of inactivity, this stale version falls outside the API server's watch cache, causing a 410 on retry and an unrecoverable ApiException.

I agree with the problem statement. Without this PR, when the problem happens, a user needs to carefully decide how to resume watching.

  • Watching with resourceVersion: None is obviously not reliable
  • First doing a LIST, then watching with the RV from the LIST is preferable. This is also how watch should be used at the very first place.

I'm concerned about the behavior change in this PR.

  • For users who know how to list and watch, their applications probably already perform list-and-watch upon start, and also when 410 happens. Basically, this PR won't affect these users.
  • For users who don't do list and watch in their application, I'm concerned if their application would tolerate this new behavior. I'm not sure what use cases there are when calling watch with resourceVersion: None, and I'm not sure if this PR would break their use cases... I almost feel if we should have validated that resourceVersion cannot be None. @brendandburns WDYT?

# perform an initial list call to get the current resourceVersion from
# the list metadata. This ensures that any subsequent watch restart
# after a 410 uses a valid, recent resourceVersion rather than a
# potentially stale one from an individual resource event.
if watch_arg == 'watch' and self.resource_version is None:
_list_excluded = {watch_arg, '_preload_content',
'allow_watch_bookmarks', 'timeout_seconds'}
list_kwargs = {k: v for k, v in kwargs.items()
if k not in _list_excluded}
initial_list = func(*args, **list_kwargs)
if (hasattr(initial_list, 'metadata')
and hasattr(initial_list.metadata, 'resource_version')
and isinstance(
initial_list.metadata.resource_version, str)
and initial_list.metadata.resource_version):
self.resource_version = initial_list.metadata.resource_version
kwargs['resource_version'] = self.resource_version
if (hasattr(initial_list, 'items')
and isinstance(initial_list.items, list)):
for item in initial_list.items:
raw_obj = \
self._api_client.sanitize_for_serialization(item)
if deserialize:
yield {
'type': 'ADDED',
'object': item,
'raw_object': raw_obj,
}
else:
yield {
'type': 'ADDED',
'object': raw_obj,
'raw_object': raw_obj,
}
if self._stop:
return

while True:
resp = func(*args, **kwargs)
try:
Expand Down
84 changes: 78 additions & 6 deletions kubernetes/base/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ def test_watch_with_decode(self):
# make sure that all three records were consumed by the stream
self.assertEqual(4, count)

fake_api.get_namespaces.assert_called_once_with(
# The function is called twice: first for the initial list (no watch
# kwargs), then for the actual watch (with resource_version from list).
self.assertEqual(fake_api.get_namespaces.call_count, 2)
fake_api.get_namespaces.assert_called_with(
_preload_content=False, watch=True)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
Expand Down Expand Up @@ -292,6 +295,67 @@ def get_values(*args, **kwargs):
# more strict test with worse error message
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)

def test_watch_with_initial_list_resource_version(self):
"""Verify the list-then-watch pattern.

When stream() is called without a resource_version, it should:
1. Perform an initial list call to get the current resourceVersion.
2. Yield items from that list as ADDED events.
3. Start the watch from the list's resourceVersion so that
subsequent restarts after a 410 use a valid, recent version.
"""
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
# The watch stream returns one new event after the initial list.
fake_resp.stream = Mock(
return_value=[
'{"type": "MODIFIED", "object": {"metadata": '
'{"name": "ns-new", "resourceVersion": "200"}, '
'"spec": {}, "status": {}}}\n',
])

# Build a real-ish list response with two existing namespaces.
ns1 = client.V1Namespace(
metadata=client.V1ObjectMeta(
name="ns-one", resource_version="100"))
ns2 = client.V1Namespace(
metadata=client.V1ObjectMeta(
name="ns-two", resource_version="150"))
fake_list = client.V1NamespaceList(
metadata=client.V1ListMeta(resource_version="180"),
items=[ns1, ns2])

def _list_or_watch(*args, **kwargs):
return fake_resp if kwargs.get('watch') else fake_list

fake_api = Mock()
fake_api.list_namespaces = Mock(side_effect=_list_or_watch)
fake_api.list_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
events = []
for e in w.stream(fake_api.list_namespaces, timeout_seconds=1):
events.append(e)

# The two existing namespaces must appear first as ADDED events.
self.assertEqual(len(events), 3)
self.assertEqual(events[0]['type'], 'ADDED')
self.assertEqual(events[0]['object'].metadata.name, 'ns-one')
self.assertEqual(events[1]['type'], 'ADDED')
self.assertEqual(events[1]['object'].metadata.name, 'ns-two')
# The new event from the watch stream follows.
self.assertEqual(events[2]['type'], 'MODIFIED')
self.assertEqual(events[2]['object'].metadata.name, 'ns-new')

# The watch must have started from the list's resourceVersion.
fake_api.list_namespaces.assert_has_calls([
call(),
call(_preload_content=False, watch=True,
timeout_seconds=1, resource_version="180"),
])
self.assertEqual(w.resource_version, "200")

def test_watch_stream_twice(self):
w = Watch(float)
for step in ['first', 'second']:
Expand All @@ -312,7 +376,10 @@ def test_watch_stream_twice(self):
w.stop()

self.assertEqual(count, 3)
fake_api.get_namespaces.assert_called_once_with(
# The function is called twice per stream() invocation: once for
# the initial list call and once for the actual watch call.
self.assertEqual(fake_api.get_namespaces.call_count, 2)
fake_api.get_namespaces.assert_called_with(
_preload_content=False, watch=True)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
Expand Down Expand Up @@ -346,7 +413,9 @@ def test_watch_stream_loop(self):
w.stop()

self.assertEqual(count, 2)
self.assertEqual(fake_api.get_namespaces.call_count, 2)
# Each stream() call makes 2 API calls: initial list + watch.
# Two stream() calls = 4 total API calls.
self.assertEqual(fake_api.get_namespaces.call_count, 4)
self.assertEqual(fake_resp.stream.call_count, 2)
self.assertEqual(fake_resp.close.call_count, 2)
self.assertEqual(fake_resp.release_conn.call_count, 2)
Expand Down Expand Up @@ -423,8 +492,9 @@ def test_watch_with_exception(self):
pass
# expected

fake_api.get_thing.assert_called_once_with(
fake_api.get_thing.assert_called_with(
_preload_content=False, watch=True)
self.assertEqual(fake_api.get_thing.call_count, 2)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
Expand All @@ -447,8 +517,9 @@ def test_watch_with_error_event(self):
# No retry is attempted either, preventing an ApiException
assert not list(w.stream(fake_api.get_thing))

fake_api.get_thing.assert_called_once_with(
fake_api.get_thing.assert_called_with(
_preload_content=False, watch=True)
self.assertEqual(fake_api.get_thing.call_count, 2)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
Expand Down Expand Up @@ -500,8 +571,9 @@ def test_watch_with_error_event_and_timeout_param(self):
except client.rest.ApiException:
pass

fake_api.get_thing.assert_called_once_with(
fake_api.get_thing.assert_called_with(
_preload_content=False, watch=True, timeout_seconds=10)
self.assertEqual(fake_api.get_thing.call_count, 2)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
Expand Down