Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions examples/informer_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright 2026 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example: use SharedInformer to watch pods in the default namespace.

The informer runs a background daemon thread that keeps a local cache
synchronised with the Kubernetes API server. The main thread is free to
query the cache at any time without worrying about connectivity or retries.
"""

import time

import kubernetes
from kubernetes import config
from kubernetes.client import CoreV1Api
from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer


def on_pod_added(pod):
name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"]
print("[ADDED] ", name)


def on_pod_modified(pod):
name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"]
print("[MODIFIED]", name)


def on_pod_deleted(pod):
name = pod.metadata.name if hasattr(pod, "metadata") else pod["metadata"]["name"]
print("[DELETED] ", name)


def main():
config.load_kube_config()

v1 = CoreV1Api()
informer = SharedInformer(
list_func=v1.list_namespaced_pod,
namespace="default",
resync_period=60,
)

informer.add_event_handler(ADDED, on_pod_added)
informer.add_event_handler(MODIFIED, on_pod_modified)
informer.add_event_handler(DELETED, on_pod_deleted)

informer.start()
print('Informer started. Watching pods in "default" namespace ...')

try:
while True:
cached = informer.cache.list()
print("Cached pods: {}".format(len(cached)))
time.sleep(10)
except KeyboardInterrupt:
pass
finally:
informer.stop()
print("Informer stopped.")


if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
from . import stream
from . import utils
from . import leaderelection
from . import informer
225 changes: 225 additions & 0 deletions kubernetes/e2e_test/test_informer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
# Copyright 2026 The Kubernetes Authors.
# Licensed under the Apache License, Version 2.0 (the "License").
# End-to-end tests for kubernetes.informer.SharedInformer.

import threading
import time
import unittest
import uuid

from kubernetes.client import api_client
from kubernetes.client.api import core_v1_api
from kubernetes.e2e_test import base
from kubernetes.informer import ADDED, DELETED, MODIFIED, SharedInformer

_TIMEOUT = 30


def _uid():
return str(uuid.uuid4())[-12:]


def _cm(name, payload=None):
return {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": name, "labels": {"inf-e2e": "1"}},
"data": payload or {"k": "v"},
}


def _name_of(obj):
if hasattr(obj, "metadata"):
return obj.metadata.name
return (obj.get("metadata") or {}).get("name")


class TestSharedInformerE2E(unittest.TestCase):

@classmethod
def setUpClass(cls):
cls.cfg = base.get_e2e_configuration()
cls.apiclient = api_client.ApiClient(configuration=cls.cfg)
cls.api = core_v1_api.CoreV1Api(cls.apiclient)

def _drop(self, cm_name):
try:
self.api.delete_namespaced_config_map(name=cm_name, namespace="default")
except Exception:
pass

def _expect(self, ev, label):
if not ev.wait(timeout=_TIMEOUT):
self.fail("Timeout waiting for: " + label)

def _wait_in_cache(self, inf, key):
stop = time.monotonic() + _TIMEOUT
while time.monotonic() < stop:
if inf.cache.get_by_key(key) is not None:
return
time.sleep(0.25)
self.fail("key " + key + " never appeared in cache")

def _wait_listed(self, inf):
stop = time.monotonic() + _TIMEOUT
while inf._resource_version is None and time.monotonic() < stop:
time.sleep(0.1)
self.assertIsNotNone(inf._resource_version, "initial list never completed")

# -------------------------------------------------------

def test_cache_populated_after_start(self):
"""Pre-existing ConfigMaps appear in the cache once the informer starts."""
name = "inf-pre-" + _uid()
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self.addCleanup(self._drop, name)

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
)
inf.start()
self.addCleanup(inf.stop)

self._wait_in_cache(inf, "default/" + name)
cached = inf.cache.get_by_key("default/" + name)
self.assertEqual(_name_of(cached), name)
# Verify the cached object actually contains the expected data payload.
data = cached.data if hasattr(cached, "data") else (cached.get("data") or {})
self.assertEqual(data.get("k"), "v")

def test_added_event_and_cache_entry(self):
"""Creating a ConfigMap fires ADDED and the object appears in the cache."""
name = "inf-add-" + _uid()
seen = threading.Event()

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
)
inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None)
inf.start()
self.addCleanup(inf.stop)
self.addCleanup(self._drop, name)

self._wait_listed(inf)
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self._expect(seen, "ADDED/" + name)
self.assertIsNotNone(inf.cache.get_by_key("default/" + name))

def test_modified_event_and_cache_refresh(self):
"""Patching a ConfigMap fires MODIFIED and the cache holds the updated object."""
name = "inf-mod-" + _uid()
seen = threading.Event()

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
)
inf.add_event_handler(MODIFIED, lambda o: seen.set() if _name_of(o) == name else None)
inf.start()
self.addCleanup(inf.stop)
self.addCleanup(self._drop, name)

self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self._wait_in_cache(inf, "default/" + name)

self.api.patch_namespaced_config_map(
name=name, namespace="default", body={"data": {"k": "updated"}}
)
self._expect(seen, "MODIFIED/" + name)
# Verify that the cache now holds the updated data.
cached = inf.cache.get_by_key("default/" + name)
self.assertIsNotNone(cached)
data = cached.data if hasattr(cached, "data") else (cached.get("data") or {})
self.assertEqual(data.get("k"), "updated")

def test_deleted_event_removes_from_cache(self):
"""Deleting a ConfigMap fires DELETED and removes it from the cache."""
name = "inf-del-" + _uid()
seen = threading.Event()

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
)
inf.add_event_handler(DELETED, lambda o: seen.set() if _name_of(o) == name else None)
inf.start()
self.addCleanup(inf.stop)

self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self._wait_in_cache(inf, "default/" + name)

self.api.delete_namespaced_config_map(name=name, namespace="default")
self._expect(seen, "DELETED/" + name)
self.assertIsNone(inf.cache.get_by_key("default/" + name))

def test_resource_version_advances(self):
"""The stored resourceVersion advances after watch events are received."""
name = "inf-rv-" + _uid()
seen = threading.Event()

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
)
inf.add_event_handler(ADDED, lambda o: seen.set() if _name_of(o) == name else None)
inf.start()
self.addCleanup(inf.stop)
self.addCleanup(self._drop, name)

self._wait_listed(inf)
rv_before = int(inf._resource_version)

self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self._expect(seen, "ADDED/" + name)
self.assertGreater(int(inf._resource_version), rv_before)


def test_resync_fires_modified_for_existing_objects(self):
"""Periodic resync re-lists from the API server and fires MODIFIED for cached objects.

A short resync_period (5 s) is used so the test completes quickly.
After the informer has cached the ConfigMap via the initial list, we
wait for a MODIFIED event that is fired by the resync, verifying that
the resync actually contacts the API server and triggers callbacks.
"""
name = "inf-rsync-" + _uid()
self.api.create_namespaced_config_map(body=_cm(name), namespace="default")
self.addCleanup(self._drop, name)

added = threading.Event()
resynced = threading.Event()

inf = SharedInformer(
list_func=self.api.list_namespaced_config_map,
namespace="default",
label_selector="inf-e2e=1",
resync_period=5,
)
inf.add_event_handler(ADDED, lambda o: added.set() if _name_of(o) == name else None)
# The resync fires MODIFIED for existing cached objects; wait for it.
inf.add_event_handler(MODIFIED, lambda o: resynced.set() if _name_of(o) == name else None)
inf.start()
self.addCleanup(inf.stop)

# First, wait for the object to be added to the cache.
self._expect(added, "ADDED/" + name)
# Then wait for the resync to fire MODIFIED (allow up to 3× resync_period).
if not resynced.wait(timeout=15):
self.fail("Timeout waiting for resync MODIFIED/" + name)

# Verify the cached object still holds the expected data.
cached = inf.cache.get_by_key("default/" + name)
self.assertIsNotNone(cached)
data = cached.data if hasattr(cached, "data") else (cached.get("data") or {})
self.assertEqual(data.get("k"), "v")


if __name__ == "__main__":
unittest.main()
27 changes: 27 additions & 0 deletions kubernetes/informer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2026 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .cache import ObjectCache, _meta_namespace_key
from .informer import SharedInformer, ADDED, MODIFIED, DELETED, BOOKMARK, ERROR

__all__ = [
"ObjectCache",
"_meta_namespace_key",
"SharedInformer",
"ADDED",
"MODIFIED",
"DELETED",
"BOOKMARK",
"ERROR",
]
Loading