Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/quiet-flags-retry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'PostHog': patch
'PostHog.AspNetCore': patch
---

Add a per-client circuit breaker for feature flag requests after consecutive transient network failures, temporarily failing fast before probing for recovery.
2 changes: 2 additions & 0 deletions src/PostHog/Api/PostHogApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal sealed class PostHogApiClient : IDisposable
readonly HttpClient _httpClient;
readonly IOptions<PostHogOptions> _options;
readonly ILogger<PostHogApiClient> _logger;
readonly FeatureFlagRequestCircuitBreaker _featureFlagRequestCircuitBreaker = new();

/// <summary>
/// Initialize a new PostHog client
Expand Down Expand Up @@ -159,6 +160,7 @@ public async Task<ApiResult> SendEventAsync(
payload,
_timeProvider,
_options.Value,
_featureFlagRequestCircuitBreaker,
cancellationToken);
}

Expand Down
138 changes: 131 additions & 7 deletions src/PostHog/Library/HttpClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,37 @@ internal static class HttpClientExtensions
object content,
TimeProvider timeProvider,
PostHogOptions options,
FeatureFlagRequestCircuitBreaker circuitBreaker,
CancellationToken cancellationToken)
{
var maxRetries = options.FeatureFlagRequestMaxRetries;
var currentDelay = options.InitialRetryDelay;
var maxDelay = options.MaxRetryDelay;
var attempt = 0;

if (!circuitBreaker.TryEnter(timeProvider, out var isHalfOpenProbe))
Comment thread
marandaneto marked this conversation as resolved.
{
throw new HttpRequestException("Feature flag request circuit breaker is open.");
}

async Task DelayBeforeRetry()
{
await Delay(timeProvider, currentDelay > maxDelay ? maxDelay : currentDelay, cancellationToken);
currentDelay = DoubleWithCap(currentDelay, maxDelay);
}

async Task<bool> ShouldRetryAfterTransientFailure()
{
var circuitClosed = circuitBreaker.RecordTransientFailure(timeProvider, isHalfOpenProbe);
if (attempt > maxRetries || !circuitClosed)
{
return false;
}

await DelayBeforeRetry();
return true;
}

while (true)
{
attempt++;
Expand All @@ -75,24 +99,53 @@ internal static class HttpClientExtensions
JsonSerializerHelper.Options,
cancellationToken);
}
catch (HttpRequestException e) when (attempt <= maxRetries && IsRetryableFlagsHttpRequestException(e))
catch (HttpRequestException e) when (IsRetryableFlagsHttpRequestException(e))
Comment thread
marandaneto marked this conversation as resolved.
{
await Delay(timeProvider, currentDelay > maxDelay ? maxDelay : currentDelay, cancellationToken);
currentDelay = DoubleWithCap(currentDelay, maxDelay);
if (!await ShouldRetryAfterTransientFailure())
{
throw;
}

continue;
Comment thread
marandaneto marked this conversation as resolved.
}
catch (TaskCanceledException) when (!cancellationToken.IsCancellationRequested && attempt <= maxRetries)
catch (HttpRequestException)
Comment thread
marandaneto marked this conversation as resolved.
{
await Delay(timeProvider, currentDelay > maxDelay ? maxDelay : currentDelay, cancellationToken);
currentDelay = DoubleWithCap(currentDelay, maxDelay);
if (isHalfOpenProbe)
{
circuitBreaker.RecordTransientFailure(timeProvider, isHalfOpenProbe: true);
}
throw;
}
catch (TaskCanceledException) when (!cancellationToken.IsCancellationRequested)
{
if (!await ShouldRetryAfterTransientFailure())
{
throw;
}

continue;
}
catch (OperationCanceledException) when (isHalfOpenProbe && cancellationToken.IsCancellationRequested)
{
circuitBreaker.RecordTransientFailure(timeProvider, isHalfOpenProbe: true);
throw;
}
catch (Exception) when (isHalfOpenProbe)
Comment thread
marandaneto marked this conversation as resolved.
{
circuitBreaker.RecordTransientFailure(timeProvider, isHalfOpenProbe: true);
throw;
}
Comment thread
marandaneto marked this conversation as resolved.

// Response processing is outside the try-catch so that exceptions from
// EnsureSuccessfulApiCall (which may return HttpRequestException for 404s) won't
// be caught by the retry logic above.
using (response)
{
if (isHalfOpenProbe || response.IsSuccessStatusCode)
{
circuitBreaker.Close();
}

await response.EnsureSuccessfulApiCall(cancellationToken);

var result = await response.Content.ReadAsStreamAsync(cancellationToken);
Expand Down Expand Up @@ -418,4 +471,75 @@ public static async Task EnsureSuccessfulApiCall(

throw await CreateApiException(response, cancellationToken);
}
}
}

sealed class FeatureFlagRequestCircuitBreaker
{
const int FailureThreshold = 5;
static readonly TimeSpan OpenDuration = TimeSpan.FromSeconds(30);
Comment thread
marandaneto marked this conversation as resolved.

readonly object _lock = new();
State _state;
int _consecutiveFailures;
DateTimeOffset _openUntil;

public bool TryEnter(TimeProvider timeProvider, out bool isHalfOpenProbe)
{
lock (_lock)
{
isHalfOpenProbe = false;

if (_state == State.Open && timeProvider.GetUtcNow() < _openUntil)
{
return false;
}

if (_state == State.Open)
{
_state = State.HalfOpen;
isHalfOpenProbe = true;
return true;
}

return _state != State.HalfOpen;
}
}

public bool RecordTransientFailure(TimeProvider timeProvider, bool isHalfOpenProbe)
{
lock (_lock)
{
if (isHalfOpenProbe || ++_consecutiveFailures >= FailureThreshold)
{
Open(timeProvider);
return false;
}

return true;
}
}

public void Close()
{
lock (_lock)
{
_state = State.Closed;
_consecutiveFailures = 0;
_openUntil = default;
}
}

void Open(TimeProvider timeProvider)
{
_state = State.Open;
_consecutiveFailures = 0;
_openUntil = timeProvider.GetUtcNow() + OpenDuration;
}

enum State
{
Closed,
Open,
HalfOpen
}
}
Loading
Loading