diff --git a/lib/Consumer/LibCurl.php b/lib/Consumer/LibCurl.php index 973e960..4bb1838 100644 --- a/lib/Consumer/LibCurl.php +++ b/lib/Consumer/LibCurl.php @@ -52,34 +52,91 @@ public function getConsumer() * @return boolean whether the request succeeded */ public function flushBatch($messages) + { + if (empty($messages)) { + return true; + } + + return $this->sendBatch($messages); + } + + /** + * Sends a batch of messages, splitting if necessary to fit 32KB limit + * @param array $messages + * @return boolean success + */ + private function sendBatch($messages) { $body = $this->payload($messages); $payload = json_encode($body); - // Verify message size is below than 32KB + // Check 32KB limit if (strlen($payload) >= 32 * 1024) { - if ($this->debug()) { - $msg = "Message size is larger than 32KB"; - error_log("[PostHog][" . $this->type . "] " . $msg); - } + return $this->handleOversizedBatch($messages, strlen($payload)); + } + + // Send the batch + return $this->performHttpRequest($payload, $messages[0]); + } + /** + * Handles batches that exceed 32KB limit + * @param array $messages + * @param int $payloadSize + * @return boolean success + */ + private function handleOversizedBatch($messages, $payloadSize) + { + $messageCount = count($messages); + + // Single message too large - drop it + if ($messageCount === 1) { + $this->handleError( + 'payload_too_large', + sprintf( + 'Single message payload size (%d bytes) exceeds 32KB limit. Message will be dropped.', + $payloadSize + ) + ); return false; } + // Split and try both halves + $midpoint = intval($messageCount / 2); + $firstHalf = array_slice($messages, 0, $midpoint); + $secondHalf = array_slice($messages, $midpoint); + + $firstResult = $this->sendBatch($firstHalf); + $secondResult = $this->sendBatch($secondHalf); + + return $firstResult && $secondResult; + } + + /** + * Performs the actual HTTP request + * @param string $payload + * @param array $sampleMessage + * @return boolean success + */ + private function performHttpRequest($payload, $sampleMessage) + { if ($this->compress_request) { $payload = gzencode($payload); } - return $this->httpClient->sendRequest( + $response = $this->httpClient->sendRequest( '/batch/', $payload, [ - // Send user agent in the form of {library_name}/{library_version} as per RFC 7231. - "User-Agent: {$messages[0]['library']}/{$messages[0]['library_version']}", + "User-Agent: {$sampleMessage['library']}/{$sampleMessage['library_version']}", ], [ 'shouldVerify' => $this->options['verify_batch_events_request'] ?? true, ] - )->getResponse(); + ); + + // Return boolean based on whether we got a response + return !empty($response->getResponse()); } + } diff --git a/lib/QueueConsumer.php b/lib/QueueConsumer.php index 354e12c..efa37c0 100644 --- a/lib/QueueConsumer.php +++ b/lib/QueueConsumer.php @@ -95,16 +95,22 @@ public function alias(array $message) public function flush() { $count = count($this->queue); - $success = true; + $overallSuccess = true; - while ($count > 0 && $success) { + while ($count > 0) { $batch = array_splice($this->queue, 0, min($this->batch_size, $count)); - $success = $this->flushBatch($batch); + $batchSuccess = $this->flushBatch($batch); + + // Track overall success but continue processing remaining batches + // This ensures we attempt to send all queued events even if some batches fail + if (!$batchSuccess) { + $overallSuccess = false; + } $count = count($this->queue); } - return $success; + return $overallSuccess; } /** diff --git a/test/MockErrorHandler.php b/test/MockErrorHandler.php new file mode 100644 index 0000000..22e5878 --- /dev/null +++ b/test/MockErrorHandler.php @@ -0,0 +1,31 @@ +errors[] = ['code' => $code, 'message' => $message]; + } + + public function hasError($code) + { + foreach ($this->errors as $error) { + if ($error['code'] === $code) { + return true; + } + } + return false; + } + + public function getErrors() + { + return $this->errors; + } +} diff --git a/test/MockedHttpClient.php b/test/MockedHttpClient.php index b00b297..6fda511 100644 --- a/test/MockedHttpClient.php +++ b/test/MockedHttpClient.php @@ -52,6 +52,10 @@ public function sendRequest(string $path, ?string $payload, array $extraHeaders return new HttpResponse(json_encode($this->flagEndpointResponse), 200); } + if (str_starts_with($path, "/batch/")) { + return new HttpResponse('{"status":"Ok"}', 200); + } + return parent::sendRequest($path, $payload, $extraHeaders, $requestOptions); } } diff --git a/test/PayloadSizeLimitFixTest.php b/test/PayloadSizeLimitFixTest.php new file mode 100644 index 0000000..0df93f3 --- /dev/null +++ b/test/PayloadSizeLimitFixTest.php @@ -0,0 +1,212 @@ +mockHttpClient = new MockedHttpClient( + "app.posthog.com", + true, + 10000, + false, + true + ); + + $this->client = new Client( + "test_api_key", + [ + "consumer" => "lib_curl", + "debug" => true, + "batch_size" => 10, // Small batch size to control test + ], + $this->mockHttpClient + ); + } + + /** + * Helper method to reset and count HTTP requests + */ + private function resetRequestCount(): void + { + $this->mockHttpClient->calls = []; + } + + /** + * Helper method to get number of batch requests made + */ + private function getBatchRequestCount(): int + { + if (!isset($this->mockHttpClient->calls)) { + return 0; + } + + $batchRequests = 0; + foreach ($this->mockHttpClient->calls as $call) { + if ($call['path'] === '/batch/') { + $batchRequests++; + } + } + return $batchRequests; + } + + /** + * Test that the fix properly handles oversized batches by splitting them + */ + public function testOversizedBatchSplitting(): void + { + // Reset request counter + $this->resetRequestCount(); + + // Create events with large properties to exceed 32KB when batched + $largeProperty = str_repeat('A', 4000); // 4KB string + + // Create 8 events, each ~4KB, totaling ~32KB+ (exceeds 32KB limit) + for ($i = 0; $i < 8; $i++) { + $result = $this->client->capture([ + "distinctId" => "user-{$i}", + "event" => "large_event_{$i}", + "properties" => [ + "large_data" => $largeProperty, + "event_index" => $i + ] + ]); + $this->assertTrue($result, "Event {$i} should be captured successfully"); + } + + // Flush remaining events + $flushResult = $this->client->flush(); + $this->assertTrue($flushResult, "Flush should succeed with splitting"); + + // Verify that multiple HTTP requests were made due to splitting + $requestCount = $this->getBatchRequestCount(); + $this->assertGreaterThan(1, $requestCount, "Multiple requests should be made when batch is split"); + } + + /** + * Test that single oversized messages are properly handled and reported + */ + public function testSingleOversizedMessage(): void + { + // Create a single event that exceeds 32KB + $veryLargeProperty = str_repeat('X', 33 * 1024); // 33KB string + + // Capture error logs + $errorHandler = new MockErrorHandler(); + $client = new Client( + "test_api_key", + [ + "consumer" => "lib_curl", + "debug" => true, + "error_handler" => [$errorHandler, 'handleError'] + ], + $this->mockHttpClient + ); + + $result = $client->capture([ + "distinctId" => "oversized_user", + "event" => "oversized_event", + "properties" => [ + "very_large_data" => $veryLargeProperty + ] + ]); + + // The event should still be accepted initially + $this->assertTrue($result, "Oversized event should be accepted initially"); + + // But flush should fail and error should be logged + $flushResult = $client->flush(); + $this->assertFalse($flushResult, "Flush should fail for oversized single message"); + + // Verify error was reported + $this->assertTrue( + $errorHandler->hasError('payload_too_large'), + "Error should be reported for oversized message" + ); + } + + /** + * Test that multiple small events that accumulate to exceed 32KB are handled properly + */ + public function testAccumulativePayloadSizeHandling(): void + { + $this->resetRequestCount(); + + // Each event is small (2KB) but 20 events = 40KB total + $smallProperty = str_repeat('Z', 2000); + + $allSuccessful = true; + for ($i = 0; $i < 20; $i++) { + $result = $this->client->capture([ + "distinctId" => "accumulative_user_{$i}", + "event" => "small_event", + "properties" => [ + "data" => $smallProperty, + "index" => $i + ] + ]); + if (!$result) { + $allSuccessful = false; + } + } + + $this->assertTrue($allSuccessful, "All small events should be accepted"); + + // Final flush should succeed because batches were split appropriately + $flushResult = $this->client->flush(); + $this->assertTrue($flushResult, "Final flush should succeed with proper batch splitting"); + + // Verify multiple requests were made + $requestCount = $this->getBatchRequestCount(); + $this->assertGreaterThan( + 1, + $requestCount, + "Multiple requests should be made for accumulative payload" + ); + } + + /** + * Test that normal-sized batches still work correctly + */ + public function testNormalSizedBatches(): void + { + $this->resetRequestCount(); + + // Create normal-sized events + for ($i = 0; $i < 5; $i++) { + $result = $this->client->capture([ + "distinctId" => "normal_user_{$i}", + "event" => "normal_event", + "properties" => [ + "small_data" => "normal data", + "index" => $i + ] + ]); + $this->assertTrue($result, "Normal event {$i} should be captured"); + } + + $flushResult = $this->client->flush(); + $this->assertTrue($flushResult, "Normal batch flush should succeed"); + + // Should only need one request for normal sized batch + $requestCount = $this->getBatchRequestCount(); + $this->assertEquals(1, $requestCount, "Only one request should be made for normal batch"); + } +}