From 08e43c74aba8d8da521e6cbcd7c10b16e7f72401 Mon Sep 17 00:00:00 2001 From: Eli Reisman Date: Sat, 27 Jun 2026 13:10:19 -0700 Subject: [PATCH 1/2] feat(capture): route analytics through v1 submitter when enabled Wires send_v1_batch into both send paths so capture_mode actually takes effect end to end. The consumer's async path (Consumer.request) and the client's sync path (_enqueue) now pick the analytics submitter by capture_mode: v1 -> the partial-retry send loop, v0 -> the legacy batch_post. The dedicated AI endpoint has no v1 form, so $ai_* events on it always use the legacy submitter regardless of capture_mode. Refactors Consumer.request to route via _send_analytics/_send_ai helpers, stores Client.max_retries so the sync path can pass it through, and forwards gzip/timeout/retries/historical_migration to the v1 submitter. Default is still v0, so existing callers are unaffected. Adds consumer routing-matrix tests (v0/v1, dedicated-AI split, config forwarding) and client sync-mode tests (v0 vs v1, dedicated-AI event stays legacy, analytics event uses v1). ruff/mypy clean. 
--- posthog/client.py | 23 ++++++-- posthog/consumer.py | 36 ++++++++++-- posthog/test/test_client.py | 67 ++++++++++++++++++++++ posthog/test/test_consumer.py | 92 +++++++++++++++++++++++++++++- references/public_api_snapshot.txt | 3 + 5 files changed, 210 insertions(+), 11 deletions(-) diff --git a/posthog/client.py b/posthog/client.py index 00d9c284..c9b3dcfa 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -17,6 +17,7 @@ from posthog._async_utils import _BackgroundEventLoopRunner from posthog.args import ID_TYPES, ExceptionArg, OptionalCaptureArgs, OptionalSetArgs from posthog.capture_mode import CaptureMode, resolve_capture_mode +from posthog.capture_v1 import send_v1_batch from posthog.consumer import Consumer from posthog.contexts import ( _get_current_context, @@ -373,6 +374,7 @@ def __init__( self._duplicate_client_registry_key: Optional[tuple[str, str]] = None self.gzip = gzip self.timeout = timeout + self.max_retries = max_retries self._feature_flags: Optional[list[Any]] = ( None # private variable to store flags ) @@ -1629,11 +1631,24 @@ def _enqueue(self, msg, disable_geoip): if self.sync_mode: self.log.debug("enqueued with blocking %s.", msg["event"]) - path = ( - AI_EVENTS_ENDPOINT - if self._dedicated_ai_endpoint and is_ai_event(msg.get("event")) - else EVENTS_ENDPOINT + is_dedicated_ai = self._dedicated_ai_endpoint and is_ai_event( + msg.get("event") ) + # Analytics events follow `capture_mode`; the dedicated AI endpoint + # has no v1 form and always uses the legacy submitter. 
+ if not is_dedicated_ai and self.capture_mode == CaptureMode.V1: + send_v1_batch( + self.api_key, + self.host, + [msg], + gzip=self.gzip, + timeout=self.timeout, + max_retries=self.max_retries, + historical_migration=self.historical_migration, + ) + return sent_uuid + + path = AI_EVENTS_ENDPOINT if is_dedicated_ai else EVENTS_ENDPOINT batch_post( self.api_key, self.host, diff --git a/posthog/consumer.py b/posthog/consumer.py index cf5256db..74b581fb 100644 --- a/posthog/consumer.py +++ b/posthog/consumer.py @@ -6,6 +6,7 @@ from posthog._logging import _configure_posthog_logging from posthog.capture_mode import CaptureMode +from posthog.capture_v1 import send_v1_batch from posthog.request import ( AI_EVENTS_ENDPOINT, EVENTS_ENDPOINT, @@ -154,9 +155,13 @@ def request(self, batch): invokes `on_error`); a second is logged here so it isn't silently lost. The batch was already dequeued in `upload()`, so unsent events are dropped after retries, same as the single-endpoint path. + + The analytics destination follows `capture_mode` (v1 -> partial-retry + submitter); the dedicated AI endpoint has no v1 form and always uses the + legacy submitter. 
""" if not self.dedicated_ai_endpoint: - self._send(batch, EVENTS_ENDPOINT) + self._send_analytics(batch) return ai_events: list[Any] = [] @@ -166,23 +171,42 @@ def request(self, batch): target.append(item) first_exc = None - for events, path in ( - (analytics_events, EVENTS_ENDPOINT), - (ai_events, AI_EVENTS_ENDPOINT), + for events, label, sender in ( + (analytics_events, "analytics", self._send_analytics), + (ai_events, AI_EVENTS_ENDPOINT, self._send_ai), ): if not events: continue try: - self._send(events, path) + sender(events) except Exception as e: if first_exc is None: first_exc = e else: - self.log.error("error uploading to %s: %s", path, e) + self.log.error("error uploading to %s: %s", label, e) if first_exc is not None: raise first_exc + def _send_analytics(self, batch): + """Submit analytics events via the wire protocol selected by `capture_mode`.""" + if self.capture_mode == CaptureMode.V1: + send_v1_batch( + self.api_key, + self.host, + batch, + gzip=self.gzip, + timeout=self.timeout, + max_retries=self.retries, + historical_migration=self.historical_migration, + ) + return + self._send(batch, EVENTS_ENDPOINT) + + def _send_ai(self, batch): + """Submit `$ai_*` events to the dedicated legacy AI endpoint (no v1 form).""" + self._send(batch, AI_EVENTS_ENDPOINT) + def _send(self, batch, path): """Attempt to upload a single batch to `path`, retrying before raising an error""" diff --git a/posthog/test/test_client.py b/posthog/test/test_client.py index 10a173fa..1e8f2813 100644 --- a/posthog/test/test_client.py +++ b/posthog/test/test_client.py @@ -3221,3 +3221,70 @@ def test_debug_flag_re_raises_exceptions(self, mock_enqueue): with self.assertRaises(Exception) as cm: method(*args, **kwargs) self.assertEqual(str(cm.exception), "Expected error") + + +class TestClientSyncCaptureMode(unittest.TestCase): + """Sync-mode `_enqueue` selects the analytics submitter by `capture_mode`; + the dedicated AI endpoint always uses the legacy submitter.""" + + def 
_client(self, **kwargs): + return Client(FAKE_TEST_API_KEY, sync_mode=True, **kwargs) + + def test_v0_sync_uses_legacy_batch_post(self): + with ( + mock.patch("posthog.client.batch_post") as mock_post, + mock.patch("posthog.client.send_v1_batch") as mock_v1, + ): + self._client().capture("evt", distinct_id="d") + mock_v1.assert_not_called() + mock_post.assert_called_once() + + def test_v1_sync_uses_v1_submitter(self): + with ( + mock.patch("posthog.client.batch_post") as mock_post, + mock.patch("posthog.client.send_v1_batch") as mock_v1, + ): + self._client(capture_mode="v1").capture("evt", distinct_id="d") + mock_post.assert_not_called() + mock_v1.assert_called_once() + sent_batch = mock_v1.call_args.args[2] + self.assertEqual(len(sent_batch), 1) + self.assertEqual(sent_batch[0]["event"], "evt") + + def test_v1_sync_forwards_config_to_submitter(self): + with ( + mock.patch("posthog.client.batch_post"), + mock.patch("posthog.client.send_v1_batch") as mock_v1, + ): + self._client( + capture_mode="v1", + gzip=True, + max_retries=4, + historical_migration=True, + ).capture("evt", distinct_id="d") + kwargs = mock_v1.call_args.kwargs + self.assertEqual(kwargs["gzip"], True) + self.assertEqual(kwargs["max_retries"], 4) + self.assertEqual(kwargs["historical_migration"], True) + + def test_v1_sync_dedicated_ai_event_stays_legacy(self): + # $ai_* on the dedicated AI endpoint has no v1 form. 
+ with ( + mock.patch("posthog.client.batch_post") as mock_post, + mock.patch("posthog.client.send_v1_batch") as mock_v1, + ): + client = self._client(capture_mode="v1", _dedicated_ai_endpoint=True) + client.capture("$ai_generation", distinct_id="d") + mock_v1.assert_not_called() + mock_post.assert_called_once() + self.assertEqual(mock_post.call_args.kwargs["path"], "/i/v0/ai/batch/") + + def test_v1_sync_dedicated_ai_analytics_event_uses_v1(self): + with ( + mock.patch("posthog.client.batch_post") as mock_post, + mock.patch("posthog.client.send_v1_batch") as mock_v1, + ): + client = self._client(capture_mode="v1", _dedicated_ai_endpoint=True) + client.capture("regular_event", distinct_id="d") + mock_post.assert_not_called() + mock_v1.assert_called_once() diff --git a/posthog/test/test_consumer.py b/posthog/test/test_consumer.py index c136e2b3..8fff242a 100644 --- a/posthog/test/test_consumer.py +++ b/posthog/test/test_consumer.py @@ -11,8 +11,9 @@ except ImportError: from Queue import Queue +from posthog.capture_mode import CaptureMode from posthog.consumer import MAX_MSG_SIZE, Consumer -from posthog.request import APIError +from posthog.request import AI_EVENTS_ENDPOINT, EVENTS_ENDPOINT, APIError from posthog.test.logging_helpers import capture_message_only_logs from posthog.test.test_utils import TEST_API_KEY @@ -268,3 +269,92 @@ def on_error(e: Exception, batch: list[dict[str, str]]) -> None: self.assertEqual(len(on_error_called), 1) self.assertEqual(str(on_error_called[0][0]), "request failed") self.assertEqual(on_error_called[0][1], [track]) + + +def _ai_event(event_name: str = "$ai_generation") -> dict[str, str]: + return {"type": "track", "event": event_name, "distinct_id": "distinct_id"} + + +class TestConsumerCaptureModeRouting(unittest.TestCase): + """`capture_mode` selects the analytics submitter; the dedicated AI endpoint + has no v1 form and always uses the legacy submitter.""" + + def test_v0_uses_legacy_batch_post(self) -> None: + consumer = 
Consumer(None, TEST_API_KEY, capture_mode=CaptureMode.V0) + with ( + mock.patch("posthog.consumer.batch_post") as mock_post, + mock.patch("posthog.consumer.send_v1_batch") as mock_v1, + ): + consumer.request([_track_event()]) + mock_v1.assert_not_called() + mock_post.assert_called_once() + self.assertEqual(mock_post.call_args.kwargs["path"], EVENTS_ENDPOINT) + + def test_v1_routes_analytics_to_v1_submitter(self) -> None: + consumer = Consumer(None, TEST_API_KEY, capture_mode=CaptureMode.V1) + batch = [_track_event()] + with ( + mock.patch("posthog.consumer.batch_post") as mock_post, + mock.patch("posthog.consumer.send_v1_batch") as mock_v1, + ): + consumer.request(batch) + mock_post.assert_not_called() + mock_v1.assert_called_once() + self.assertEqual(mock_v1.call_args.args[2], batch) + + def test_v1_forwards_consumer_config_to_submitter(self) -> None: + consumer = Consumer( + None, + TEST_API_KEY, + capture_mode=CaptureMode.V1, + gzip=True, + timeout=7, + retries=4, + historical_migration=True, + ) + with ( + mock.patch("posthog.consumer.batch_post"), + mock.patch("posthog.consumer.send_v1_batch") as mock_v1, + ): + consumer.request([_track_event()]) + kwargs = mock_v1.call_args.kwargs + self.assertEqual(kwargs["gzip"], True) + self.assertEqual(kwargs["timeout"], 7) + self.assertEqual(kwargs["max_retries"], 4) + self.assertEqual(kwargs["historical_migration"], True) + + def test_v1_dedicated_ai_splits_submitters(self) -> None: + # Analytics -> v1 submitter; $ai_* -> legacy AI endpoint. 
+ consumer = Consumer( + None, + TEST_API_KEY, + capture_mode=CaptureMode.V1, + dedicated_ai_endpoint=True, + ) + analytics, ai = _track_event(), _ai_event() + with ( + mock.patch("posthog.consumer.batch_post") as mock_post, + mock.patch("posthog.consumer.send_v1_batch") as mock_v1, + ): + consumer.request([analytics, ai]) + mock_v1.assert_called_once() + self.assertEqual(mock_v1.call_args.args[2], [analytics]) + mock_post.assert_called_once() + self.assertEqual(mock_post.call_args.kwargs["path"], AI_EVENTS_ENDPOINT) + self.assertEqual(mock_post.call_args.kwargs["batch"], [ai]) + + def test_v1_dedicated_ai_only_ai_events_skips_v1_submitter(self) -> None: + consumer = Consumer( + None, + TEST_API_KEY, + capture_mode=CaptureMode.V1, + dedicated_ai_endpoint=True, + ) + with ( + mock.patch("posthog.consumer.batch_post") as mock_post, + mock.patch("posthog.consumer.send_v1_batch") as mock_v1, + ): + consumer.request([_ai_event()]) + mock_v1.assert_not_called() + mock_post.assert_called_once() + self.assertEqual(mock_post.call_args.kwargs["path"], AI_EVENTS_ENDPOINT) diff --git a/references/public_api_snapshot.txt b/references/public_api_snapshot.txt index 613169f7..ef85f2cf 100644 --- a/references/public_api_snapshot.txt +++ b/references/public_api_snapshot.txt @@ -284,6 +284,7 @@ alias posthog.client.remote_config -> posthog.request.remote_config alias posthog.client.reset_sessions -> posthog.request.reset_sessions alias posthog.client.resolve_bucketing_value -> posthog.feature_flags.resolve_bucketing_value alias posthog.client.resolve_capture_mode -> posthog.capture_mode.resolve_capture_mode +alias posthog.client.send_v1_batch -> posthog.capture_v1.send_v1_batch alias posthog.client.system_context -> posthog.utils.system_context alias posthog.client.to_flags_and_payloads -> posthog.types.to_flags_and_payloads alias posthog.client.to_payloads -> posthog.types.to_payloads @@ -296,6 +297,7 @@ alias posthog.consumer.DatetimeSerializer -> posthog.request.DatetimeSerializer 
alias posthog.consumer.EVENTS_ENDPOINT -> posthog.request.EVENTS_ENDPOINT alias posthog.consumer.batch_post -> posthog.request.batch_post alias posthog.consumer.is_ai_event -> posthog.request.is_ai_event +alias posthog.consumer.send_v1_batch -> posthog.capture_v1.send_v1_batch alias posthog.contexts.Client -> posthog.client.Client alias posthog.disable_connection_reuse -> posthog.request.disable_connection_reuse alias posthog.enable_keep_alive -> posthog.request.enable_keep_alive @@ -548,6 +550,7 @@ attribute posthog.client.Client.in_app_modules = in_app_modules attribute posthog.client.Client.is_server = is_server attribute posthog.client.Client.log = logging.getLogger('posthog') attribute posthog.client.Client.log_captured_exceptions = log_captured_exceptions +attribute posthog.client.Client.max_retries = max_retries attribute posthog.client.Client.on_error = on_error attribute posthog.client.Client.personal_api_key = (personal_api_key.strip() if isinstance(personal_api_key, str) else personal_api_key) or None attribute posthog.client.Client.poll_interval = poll_interval From 73e034fe5eda638399acdd6d883e70d7c1679788 Mon Sep 17 00:00:00 2001 From: Eli Reisman Date: Sat, 27 Jun 2026 15:32:24 -0700 Subject: [PATCH 2/2] feat(capture): wire capture_compression through client and consumer Resolve capture_compression once on the client (kwarg > env > legacy gzip flag > none) and thread it to the v1 submitter via the consumer and the sync path. Parameterize the capture_mode routing tests and use consistent submitter labels. 
--- posthog/client.py | 19 +++++++++- posthog/consumer.py | 7 +++- posthog/test/test_capture_compression.py | 48 ++++++++++++++++++++++++ posthog/test/test_client.py | 40 +++++++++++++------- posthog/test/test_consumer.py | 33 ++++++++-------- references/public_api_snapshot.txt | 9 ++++- 6 files changed, 123 insertions(+), 33 deletions(-) diff --git a/posthog/client.py b/posthog/client.py index c9b3dcfa..4077e51e 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -16,6 +16,10 @@ from posthog._async_utils import _BackgroundEventLoopRunner from posthog.args import ID_TYPES, ExceptionArg, OptionalCaptureArgs, OptionalSetArgs +from posthog.capture_compression import ( + CaptureCompression, + resolve_capture_compression, +) from posthog.capture_mode import CaptureMode, resolve_capture_mode from posthog.capture_v1 import send_v1_batch from posthog.consumer import Consumer @@ -262,6 +266,7 @@ def __init__( exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE, exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS, capture_mode: Optional[Union[CaptureMode, str]] = None, + capture_compression: Optional[Union[CaptureCompression, str]] = None, _dedicated_ai_endpoint=False, ): """ @@ -347,6 +352,11 @@ def __init__( (or pass the string ``"v1"``) to opt into ``/i/v1/analytics/events``. When omitted, the ``POSTHOG_CAPTURE_MODE`` env var is consulted, then ``V0``. + capture_compression: Request-body compression for capture-v1 uploads + (ignored in V0, which uses ``gzip``). ``CaptureCompression.GZIP``, + ``DEFLATE`` or ``NONE`` (or ``"gzip"``/``"deflate"``/``"none"``). When + omitted, the ``POSTHOG_CAPTURE_COMPRESSION`` env var is consulted, + then the legacy ``gzip`` flag, then no compression. Examples: ```python @@ -404,6 +414,11 @@ def __init__( # `/i/v1/analytics/events`). Resolved here so the env-var fallback is # applied once; V0 is the default and keeps upgrades transparent. 
self.capture_mode = resolve_capture_mode(capture_mode) + # v1-only request compression; falls back to the legacy `gzip` flag when + # neither the kwarg nor POSTHOG_CAPTURE_COMPRESSION is set. + self.capture_compression = resolve_capture_compression( + capture_compression, gzip_fallback=gzip + ) # Internal, not ready for use: routes `$ai_*` events to a dedicated # capture-ai endpoint while the backend route + ingress roll out. self._dedicated_ai_endpoint = _dedicated_ai_endpoint @@ -510,6 +525,7 @@ def __init__( historical_migration=historical_migration, dedicated_ai_endpoint=self._dedicated_ai_endpoint, capture_mode=self.capture_mode, + capture_compression=self.capture_compression, ) self.consumers.append(consumer) @@ -1532,6 +1548,7 @@ def _reinit_after_fork(self): historical_migration=old.historical_migration, dedicated_ai_endpoint=old.dedicated_ai_endpoint, capture_mode=old.capture_mode, + capture_compression=old.capture_compression, ) new_consumers.append(consumer) @@ -1641,7 +1658,7 @@ def _enqueue(self, msg, disable_geoip): self.api_key, self.host, [msg], - gzip=self.gzip, + compression=self.capture_compression, timeout=self.timeout, max_retries=self.max_retries, historical_migration=self.historical_migration, diff --git a/posthog/consumer.py b/posthog/consumer.py index 74b581fb..f6477dc3 100644 --- a/posthog/consumer.py +++ b/posthog/consumer.py @@ -5,6 +5,7 @@ from threading import Thread from posthog._logging import _configure_posthog_logging +from posthog.capture_compression import CaptureCompression from posthog.capture_mode import CaptureMode from posthog.capture_v1 import send_v1_batch from posthog.request import ( @@ -53,6 +54,7 @@ def __init__( historical_migration=False, dedicated_ai_endpoint=False, capture_mode=CaptureMode.V0, + capture_compression=CaptureCompression.NONE, ): """Create a consumer thread.""" Thread.__init__(self) @@ -67,6 +69,7 @@ def __init__( self.gzip = gzip self.dedicated_ai_endpoint = dedicated_ai_endpoint self.capture_mode = 
capture_mode + self.capture_compression = capture_compression # It's important to set running in the constructor: if we are asked to # pause immediately after construction, we might set running to True in # run() *after* we set it to False in pause... and keep running @@ -173,7 +176,7 @@ def request(self, batch): first_exc = None for events, label, sender in ( (analytics_events, "analytics", self._send_analytics), - (ai_events, AI_EVENTS_ENDPOINT, self._send_ai), + (ai_events, "ai", self._send_ai), ): if not events: continue @@ -195,7 +198,7 @@ def _send_analytics(self, batch): self.api_key, self.host, batch, - gzip=self.gzip, + compression=self.capture_compression, timeout=self.timeout, max_retries=self.retries, historical_migration=self.historical_migration, diff --git a/posthog/test/test_capture_compression.py b/posthog/test/test_capture_compression.py index 70c5f0b4..b3bbcd65 100644 --- a/posthog/test/test_capture_compression.py +++ b/posthog/test/test_capture_compression.py @@ -9,7 +9,10 @@ CaptureCompression, resolve_capture_compression, ) +from posthog.client import Client +from posthog.consumer import Consumer from posthog.test.logging_helpers import capture_message_only_logs +from posthog.test.test_utils import TEST_API_KEY class TestResolveCaptureCompression(unittest.TestCase): @@ -98,3 +101,48 @@ def test_unrecognized_env_var_warns_and_uses_fallback(self) -> None: CaptureCompression.GZIP, ) self.assertIn("bogus", stream.getvalue()) + + +class TestCaptureCompressionPlumbing(unittest.TestCase): + def setUp(self) -> None: + patcher = mock.patch.dict(os.environ, {}, clear=False) + patcher.start() + self.addCleanup(patcher.stop) + os.environ.pop(CAPTURE_COMPRESSION_ENV_VAR, None) + + def test_client_defaults_to_none(self) -> None: + client = Client(TEST_API_KEY, sync_mode=True) + self.assertIs(client.capture_compression, CaptureCompression.NONE) + + def test_client_gzip_flag_falls_back_to_gzip(self) -> None: + client = Client(TEST_API_KEY, sync_mode=True, 
gzip=True) + self.assertIs(client.capture_compression, CaptureCompression.GZIP) + + @parameterized.expand( + [ + ("enum_deflate", CaptureCompression.DEFLATE, CaptureCompression.DEFLATE), + ("str_gzip", "gzip", CaptureCompression.GZIP), + ("str_none", "none", CaptureCompression.NONE), + ] + ) + def test_client_kwarg_overrides_gzip_flag(self, _name, kwarg, expected) -> None: + # Even with the legacy gzip flag on, the explicit kwarg wins. + client = Client( + TEST_API_KEY, sync_mode=True, gzip=True, capture_compression=kwarg + ) + self.assertIs(client.capture_compression, expected) + + def test_client_propagates_to_consumers(self) -> None: + client = Client( + TEST_API_KEY, + capture_compression=CaptureCompression.DEFLATE, + send=False, + thread=2, + ) + self.assertEqual(len(client.consumers), 2) + for consumer in client.consumers: + self.assertIs(consumer.capture_compression, CaptureCompression.DEFLATE) + + def test_consumer_defaults_to_none(self) -> None: + consumer = Consumer(None, TEST_API_KEY) + self.assertIs(consumer.capture_compression, CaptureCompression.NONE) diff --git a/posthog/test/test_client.py b/posthog/test/test_client.py index 1e8f2813..dc640c31 100644 --- a/posthog/test/test_client.py +++ b/posthog/test/test_client.py @@ -8,6 +8,7 @@ from unittest import mock from parameterized import parameterized +from posthog.capture_compression import CaptureCompression from posthog.client import Client from posthog.contexts import get_context_session_id, new_context, set_context_session from posthog.request import APIError, GetResponse @@ -3230,26 +3231,28 @@ class TestClientSyncCaptureMode(unittest.TestCase): def _client(self, **kwargs): return Client(FAKE_TEST_API_KEY, sync_mode=True, **kwargs) - def test_v0_sync_uses_legacy_batch_post(self): - with ( - mock.patch("posthog.client.batch_post") as mock_post, - mock.patch("posthog.client.send_v1_batch") as mock_v1, - ): - self._client().capture("evt", distinct_id="d") - mock_v1.assert_not_called() - 
mock_post.assert_called_once() - - def test_v1_sync_uses_v1_submitter(self): + @parameterized.expand( + [ + ("v0", None, False), + ("v1", "v1", True), + ] + ) + def test_capture_mode_selects_sync_submitter(self, _name, capture_mode, expects_v1): + kwargs = {"capture_mode": capture_mode} if capture_mode else {} with ( mock.patch("posthog.client.batch_post") as mock_post, mock.patch("posthog.client.send_v1_batch") as mock_v1, ): - self._client(capture_mode="v1").capture("evt", distinct_id="d") + self._client(**kwargs).capture("evt", distinct_id="d") + if expects_v1: mock_post.assert_not_called() mock_v1.assert_called_once() sent_batch = mock_v1.call_args.args[2] self.assertEqual(len(sent_batch), 1) self.assertEqual(sent_batch[0]["event"], "evt") + else: + mock_v1.assert_not_called() + mock_post.assert_called_once() def test_v1_sync_forwards_config_to_submitter(self): with ( @@ -3258,15 +3261,26 @@ def test_v1_sync_forwards_config_to_submitter(self): ): self._client( capture_mode="v1", - gzip=True, + capture_compression=CaptureCompression.GZIP, max_retries=4, historical_migration=True, ).capture("evt", distinct_id="d") kwargs = mock_v1.call_args.kwargs - self.assertEqual(kwargs["gzip"], True) + self.assertEqual(kwargs["compression"], CaptureCompression.GZIP) self.assertEqual(kwargs["max_retries"], 4) self.assertEqual(kwargs["historical_migration"], True) + def test_v1_sync_gzip_flag_falls_back_to_gzip_compression(self): + # Legacy `gzip=True` with no explicit capture_compression -> GZIP on v1. + with ( + mock.patch("posthog.client.batch_post"), + mock.patch("posthog.client.send_v1_batch") as mock_v1, + ): + self._client(capture_mode="v1", gzip=True).capture("evt", distinct_id="d") + self.assertEqual( + mock_v1.call_args.kwargs["compression"], CaptureCompression.GZIP + ) + def test_v1_sync_dedicated_ai_event_stays_legacy(self): # $ai_* on the dedicated AI endpoint has no v1 form. 
with ( diff --git a/posthog/test/test_consumer.py b/posthog/test/test_consumer.py index 8fff242a..0060f257 100644 --- a/posthog/test/test_consumer.py +++ b/posthog/test/test_consumer.py @@ -11,6 +11,7 @@ except ImportError: from Queue import Queue +from posthog.capture_compression import CaptureCompression from posthog.capture_mode import CaptureMode from posthog.consumer import MAX_MSG_SIZE, Consumer from posthog.request import AI_EVENTS_ENDPOINT, EVENTS_ENDPOINT, APIError @@ -279,35 +280,37 @@ class TestConsumerCaptureModeRouting(unittest.TestCase): """`capture_mode` selects the analytics submitter; the dedicated AI endpoint has no v1 form and always uses the legacy submitter.""" - def test_v0_uses_legacy_batch_post(self) -> None: - consumer = Consumer(None, TEST_API_KEY, capture_mode=CaptureMode.V0) - with ( - mock.patch("posthog.consumer.batch_post") as mock_post, - mock.patch("posthog.consumer.send_v1_batch") as mock_v1, - ): - consumer.request([_track_event()]) - mock_v1.assert_not_called() - mock_post.assert_called_once() - self.assertEqual(mock_post.call_args.kwargs["path"], EVENTS_ENDPOINT) - - def test_v1_routes_analytics_to_v1_submitter(self) -> None: - consumer = Consumer(None, TEST_API_KEY, capture_mode=CaptureMode.V1) + @parameterized.expand( + [ + ("v0", CaptureMode.V0, False), + ("v1", CaptureMode.V1, True), + ] + ) + def test_capture_mode_selects_analytics_submitter( + self, _name, mode, expects_v1 + ) -> None: + consumer = Consumer(None, TEST_API_KEY, capture_mode=mode) batch = [_track_event()] with ( mock.patch("posthog.consumer.batch_post") as mock_post, mock.patch("posthog.consumer.send_v1_batch") as mock_v1, ): consumer.request(batch) + if expects_v1: mock_post.assert_not_called() mock_v1.assert_called_once() self.assertEqual(mock_v1.call_args.args[2], batch) + else: + mock_v1.assert_not_called() + mock_post.assert_called_once() + self.assertEqual(mock_post.call_args.kwargs["path"], EVENTS_ENDPOINT) def 
test_v1_forwards_consumer_config_to_submitter(self) -> None: consumer = Consumer( None, TEST_API_KEY, capture_mode=CaptureMode.V1, - gzip=True, + capture_compression=CaptureCompression.DEFLATE, timeout=7, retries=4, historical_migration=True, @@ -318,7 +321,7 @@ def test_v1_forwards_consumer_config_to_submitter(self) -> None: ): consumer.request([_track_event()]) kwargs = mock_v1.call_args.kwargs - self.assertEqual(kwargs["gzip"], True) + self.assertEqual(kwargs["compression"], CaptureCompression.DEFLATE) self.assertEqual(kwargs["timeout"], 7) self.assertEqual(kwargs["max_retries"], 4) self.assertEqual(kwargs["historical_migration"], True) diff --git a/references/public_api_snapshot.txt b/references/public_api_snapshot.txt index ef85f2cf..5b9476f9 100644 --- a/references/public_api_snapshot.txt +++ b/references/public_api_snapshot.txt @@ -223,6 +223,7 @@ alias posthog.capture_v1.normalize_host -> posthog.request.normalize_host alias posthog.capture_v1.remove_trailing_slash -> posthog.utils.remove_trailing_slash alias posthog.client.AI_EVENTS_ENDPOINT -> posthog.request.AI_EVENTS_ENDPOINT alias posthog.client.APIError -> posthog.request.APIError +alias posthog.client.CaptureCompression -> posthog.capture_compression.CaptureCompression alias posthog.client.CaptureMode -> posthog.capture_mode.CaptureMode alias posthog.client.Consumer -> posthog.consumer.Consumer alias posthog.client.DEFAULT_CODE_VARIABLES_DETECT_SECRETS -> posthog.exception_utils.DEFAULT_CODE_VARIABLES_DETECT_SECRETS @@ -283,6 +284,7 @@ alias posthog.client.normalize_host -> posthog.request.normalize_host alias posthog.client.remote_config -> posthog.request.remote_config alias posthog.client.reset_sessions -> posthog.request.reset_sessions alias posthog.client.resolve_bucketing_value -> posthog.feature_flags.resolve_bucketing_value +alias posthog.client.resolve_capture_compression -> posthog.capture_compression.resolve_capture_compression alias posthog.client.resolve_capture_mode -> 
posthog.capture_mode.resolve_capture_mode alias posthog.client.send_v1_batch -> posthog.capture_v1.send_v1_batch alias posthog.client.system_context -> posthog.utils.system_context @@ -292,6 +294,7 @@ alias posthog.client.to_values -> posthog.types.to_values alias posthog.client.try_attach_code_variables_to_frames -> posthog.exception_utils.try_attach_code_variables_to_frames alias posthog.consumer.AI_EVENTS_ENDPOINT -> posthog.request.AI_EVENTS_ENDPOINT alias posthog.consumer.APIError -> posthog.request.APIError +alias posthog.consumer.CaptureCompression -> posthog.capture_compression.CaptureCompression alias posthog.consumer.CaptureMode -> posthog.capture_mode.CaptureMode alias posthog.consumer.DatetimeSerializer -> posthog.request.DatetimeSerializer alias posthog.consumer.EVENTS_ENDPOINT -> posthog.request.EVENTS_ENDPOINT @@ -517,6 +520,7 @@ attribute posthog.capture_v1.V1ParsedResponse.retry_after: Optional[float] = Non attribute posthog.capture_v1.V1ParsedResponse.status_code: int attribute posthog.capture_v1.log = logging.getLogger('posthog') attribute posthog.client.Client.api_key = (project_api_key or '').strip() +attribute posthog.client.Client.capture_compression = resolve_capture_compression(capture_compression, gzip_fallback=gzip) attribute posthog.client.Client.capture_exception_code_variables = capture_exception_code_variables attribute posthog.client.Client.capture_mode = resolve_capture_mode(capture_mode) attribute posthog.client.Client.code_variables_detect_secrets = code_variables_detect_secrets if code_variables_detect_secrets is not None else DEFAULT_CODE_VARIABLES_DETECT_SECRETS @@ -571,6 +575,7 @@ attribute posthog.code_variables_mask_url_credentials = DEFAULT_CODE_VARIABLES_M attribute posthog.consumer.AI_MAX_MSG_SIZE = 8 * 1024 * 1024 attribute posthog.consumer.BATCH_SIZE_LIMIT = 5 * 1024 * 1024 attribute posthog.consumer.Consumer.api_key = api_key +attribute posthog.consumer.Consumer.capture_compression = capture_compression attribute 
posthog.consumer.Consumer.capture_mode = capture_mode attribute posthog.consumer.Consumer.daemon = True attribute posthog.consumer.Consumer.dedicated_ai_endpoint = dedicated_ai_endpoint @@ -879,8 +884,8 @@ class posthog.capture_mode.CaptureMode class posthog.capture_v1.CaptureV1Error(status: int | str, message: str, *, retry_after: Optional[float] = None, request_id: Optional[str] = None, attempts: Optional[int] = None, retry_exhausted: Optional[list[str]] = None, drops: Optional[list[tuple[str, Optional[str]]]] = None) class posthog.capture_v1.V1EventResult(result: Optional[str], details: Optional[str] = None) class posthog.capture_v1.V1ParsedResponse(status_code: int, is_success: bool, retry_after: Optional[float] = None, results: Optional[dict[str, V1EventResult]] = None, malformed: bool = False, error_message: str = '') -class posthog.client.Client(project_api_key: str, host=None, debug=False, max_queue_size=10000, send=True, on_error=None, flush_at=100, flush_interval=5.0, gzip=False, max_retries=3, sync_mode=False, timeout=15, thread=1, poll_interval=30, personal_api_key=None, disabled=False, disable_geoip=True, is_server=True, historical_migration=False, feature_flags_request_timeout_seconds=3, super_properties=None, enable_exception_autocapture=False, log_captured_exceptions=False, project_root=None, privacy_mode=False, before_send=None, flag_fallback_cache_url=None, enable_local_evaluation=True, flag_definition_cache_provider: Optional[FlagDefinitionCacheProvider] = None, capture_exception_code_variables=False, code_variables_mask_patterns=None, code_variables_ignore_patterns=None, code_variables_mask_url_credentials=None, code_variables_detect_secrets=None, in_app_modules: list[str] | None = None, enable_exception_autocapture_rate_limiting=False, exception_autocapture_bucket_size=ExceptionCapture.DEFAULT_BUCKET_SIZE, exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE, 
exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS, capture_mode: Optional[Union[CaptureMode, str]] = None, _dedicated_ai_endpoint=False) -class posthog.consumer.Consumer(queue, api_key, flush_at=100, host=None, on_error=None, flush_interval=5.0, gzip=False, retries=10, timeout=15, historical_migration=False, dedicated_ai_endpoint=False, capture_mode=CaptureMode.V0) +class posthog.client.Client(project_api_key: str, host=None, debug=False, max_queue_size=10000, send=True, on_error=None, flush_at=100, flush_interval=5.0, gzip=False, max_retries=3, sync_mode=False, timeout=15, thread=1, poll_interval=30, personal_api_key=None, disabled=False, disable_geoip=True, is_server=True, historical_migration=False, feature_flags_request_timeout_seconds=3, super_properties=None, enable_exception_autocapture=False, log_captured_exceptions=False, project_root=None, privacy_mode=False, before_send=None, flag_fallback_cache_url=None, enable_local_evaluation=True, flag_definition_cache_provider: Optional[FlagDefinitionCacheProvider] = None, capture_exception_code_variables=False, code_variables_mask_patterns=None, code_variables_ignore_patterns=None, code_variables_mask_url_credentials=None, code_variables_detect_secrets=None, in_app_modules: list[str] | None = None, enable_exception_autocapture_rate_limiting=False, exception_autocapture_bucket_size=ExceptionCapture.DEFAULT_BUCKET_SIZE, exception_autocapture_refill_rate=ExceptionCapture.DEFAULT_REFILL_RATE, exception_autocapture_refill_interval_seconds=ExceptionCapture.DEFAULT_REFILL_INTERVAL_SECONDS, capture_mode: Optional[Union[CaptureMode, str]] = None, capture_compression: Optional[Union[CaptureCompression, str]] = None, _dedicated_ai_endpoint=False) +class posthog.consumer.Consumer(queue, api_key, flush_at=100, host=None, on_error=None, flush_interval=5.0, gzip=False, retries=10, timeout=15, historical_migration=False, dedicated_ai_endpoint=False, capture_mode=CaptureMode.V0, 
capture_compression=CaptureCompression.NONE) class posthog.contexts.ContextScope(parent=None, fresh: bool = False, capture_exceptions: bool = True, client: Optional[Client] = None) class posthog.exception_capture.ExceptionCapture(client: Client, rate_limiting_enabled=False, bucket_size=DEFAULT_BUCKET_SIZE, refill_rate=DEFAULT_REFILL_RATE, refill_interval_seconds=DEFAULT_REFILL_INTERVAL_SECONDS) class posthog.exception_utils.AnnotatedValue(value, metadata)