Commits
8b7f574
fix(flagd): no retry for certain error codes, implement test steps
leakonvalinka Nov 10, 2025
f0a1db2
attempt to handle fatal error
leakonvalinka Nov 14, 2025
654c8da
fix(flagd): update testbed + step, fix event
leakonvalinka Nov 24, 2025
07195a7
adjust rpc resolver
leakonvalinka Dec 12, 2025
ccf5120
Merge branch 'main' into fix/flagd-infinite-connection-retries
leakonvalinka Dec 15, 2025
e6d4057
fix e2e tests
leakonvalinka Dec 17, 2025
75392e6
Merge branch 'main' into fix/flagd-infinite-connection-retries
leakonvalinka Dec 17, 2025
95a880c
clean up
leakonvalinka Dec 17, 2025
45a9822
fatal only on first connection
leakonvalinka Dec 17, 2025
e50aa7f
remove exclusion of sync e2e test tag
leakonvalinka Dec 17, 2025
a636257
add shutdown after fatal, fix tests
leakonvalinka Dec 19, 2025
5794b1a
Merge branch 'main' into fix/flagd-infinite-connection-retries
leakonvalinka Dec 19, 2025
d27e4e9
remove shutdown
leakonvalinka Dec 19, 2025
94c7691
fix lint issues
leakonvalinka Dec 19, 2025
ee42405
fix spotless
leakonvalinka Dec 22, 2025
02539d0
feat(flagd): Communicate Fatal and shutdown connectors
guidobrei Dec 22, 2025
4e32125
fixup: update tests
toddbaert Dec 22, 2025
701069a
fixup: revert rpc test expectations
toddbaert Dec 22, 2025
bdc3e68
fixup: revert enum change
toddbaert Dec 22, 2025
a969488
fixup: test timeout
toddbaert Dec 22, 2025
e05539d
fixup: flaky test and init changedflags
toddbaert Jan 2, 2026
39b59ea
Merge branch 'main' into fix/flagd-infinite-connection-retries
toddbaert Jan 2, 2026
9397ec2
fixup: race condition with FATAL
toddbaert Jan 6, 2026
586b4e1
Merge branch 'main' into fix/flagd-infinite-connection-retries
toddbaert Jan 6, 2026
Config.java
@@ -1,7 +1,10 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

/** Helper class to hold configuration default values. */
@@ -36,6 +39,7 @@ public final class Config {
static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS";
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
static final String FATAL_STATUS_CODES_ENV_VAR_NAME = "FLAGD_FATAL_STATUS_CODES";
/**
* Environment variable to fetch Provider id.
*
@@ -91,6 +95,16 @@ static long fallBackToEnvOrDefault(String key, long defaultValue) {
}
}

static List<String> fallBackToEnvOrDefaultList(String key, List<String> defaultValue) {
try {
return System.getenv(key) != null ? Arrays.stream(System.getenv(key).split(","))
.map(String::trim)
.collect(Collectors.toList()) : defaultValue;
} catch (Exception e) {
return defaultValue;
}
}

static Resolver fromValueProvider(Function<String, String> provider) {
final String resolverVar = provider.apply(RESOLVER_ENV_VAR);
if (resolverVar == null) {
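For reference, a minimal standalone sketch of the parsing behavior added above (the class and method names here are illustrative, not part of the PR): FLAGD_FATAL_STATUS_CODES is read as a comma-separated list, each entry is trimmed, and the caller's default is returned when the variable is unset.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FatalCodesParsingSketch {
    // Mirrors fallBackToEnvOrDefaultList: split on commas, trim entries,
    // fall back to the supplied default when the raw value is absent.
    static List<String> parse(String raw, List<String> defaultValue) {
        if (raw == null) {
            return defaultValue;
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints [UNAUTHENTICATED, INVALID_ARGUMENT]
        System.out.println(parse("UNAUTHENTICATED, INVALID_ARGUMENT", List.of()));
    }
}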
FlagdOptions.java
@@ -1,6 +1,7 @@
package dev.openfeature.contrib.providers.flagd;

import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefault;
import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefaultList;
import static dev.openfeature.contrib.providers.flagd.Config.fromValueProvider;

import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
@@ -122,6 +123,14 @@ public class FlagdOptions {
@Builder.Default
private int retryGracePeriod =
fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);

/**
* List of gRPC response status codes for which failed connections are not retried.
* Defaults to an empty list.
*/
@Builder.Default
private List<String> fatalStatusCodes = fallBackToEnvOrDefaultList(Config.FATAL_STATUS_CODES_ENV_VAR_NAME, List.of());

/**
* Selector to be used with flag sync gRPC contract.
**/
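A usage sketch for the new option (the status-code values are examples; FlagdOptions and fatalStatusCodes are the real names from this diff, and the getter is the Lombok-generated one):

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import java.util.List;

class FatalCodesOptionSketch {
    public static void main(String[] args) {
        // Programmatic configuration; omitting it falls back to the
        // FLAGD_FATAL_STATUS_CODES environment variable, then to an empty list.
        FlagdOptions options = FlagdOptions.builder()
                .fatalStatusCodes(List.of("UNAUTHENTICATED", "INVALID_ARGUMENT"))
                .build();
        System.out.println(options.getFatalStatusCodes());
    }
}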
FlagdProvider.java
@@ -5,6 +5,7 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache;
import dev.openfeature.sdk.ErrorCode;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.Hook;
@@ -222,20 +223,25 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
onReady();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY);
break;

case PROVIDER_ERROR:
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) {
onError();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR);
case PROVIDER_STALE:
if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_STALE) {
onStale();
syncResources.setPreviousEvent(ProviderEvent.PROVIDER_STALE);
}
break;

case PROVIDER_ERROR:
onError();
break;
default:
log.warn("Unknown event {}", flagdProviderEvent.getEvent());
}
}
}

private void onError() {
this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build());
}

private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
this.emitProviderConfigurationChanged(ProviderEventDetails.builder()
.flagsChanged(flagdProviderEvent.getFlagsChanged())
@@ -255,7 +261,7 @@ private void onReady() {
ProviderEventDetails.builder().message("connected to flagd").build());
}

private void onError() {
private void onStale() {
log.debug(
"Stream error. Emitting STALE, scheduling ERROR, and waiting {}s for connection to become available.",
gracePeriod);
@@ -270,7 +276,7 @@ private void onError() {
if (!errorExecutor.isShutdown()) {
errorTask = errorExecutor.schedule(
() -> {
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) {
if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_STALE) {
log.error(
"Provider did not reconnect successfully within {}s. Emitting ERROR event...",
gracePeriod);
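To make the STALE-to-ERROR escalation above easier to follow, here is a self-contained sketch of the pattern (class and field names are hypothetical; the real logic lives in the provider's error executor): a stream failure emits STALE immediately, and ERROR fires only if the provider has not returned to READY within the grace period.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class GracePeriodSketch {
    enum State { READY, STALE, ERROR }

    private final ScheduledExecutorService errorExecutor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<State> state = new AtomicReference<>(State.READY);
    private final int gracePeriodSeconds = 5;

    void onStreamError() {
        state.set(State.STALE); // emit STALE immediately
        errorExecutor.schedule(
                () -> {
                    // escalate only if still STALE, i.e. no reconnect happened in time
                    state.compareAndSet(State.STALE, State.ERROR);
                },
                gracePeriodSeconds,
                TimeUnit.SECONDS);
    }

    void onReconnected() {
        state.set(State.READY); // a READY within the grace period cancels escalation
    }
}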
InProcessResolver.java
@@ -77,6 +77,9 @@ public void init() throws Exception {
storageStateChange.getSyncMetadata()));
log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
break;
case STALE:
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE));
break;
case ERROR:
onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));
break;
FlagStore.java
@@ -138,6 +138,11 @@ private void streamerListener(final QueueSource connector) throws InterruptedException {
}
break;
case ERROR:
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) {
log.warn("Failed to convey STALE status, queue is full");
}
break;
case FATAL:
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) {
log.warn("Failed to convey ERROR status, queue is full");
}
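Taken together, the two diffs above define a two-stage mapping, condensed in the sketch below (enums re-declared locally for illustration): a retryable stream ERROR now only degrades storage to STALE, while a FATAL payload moves storage to ERROR, which the provider ultimately surfaces as PROVIDER_ERROR carrying ErrorCode.PROVIDER_FATAL.

class EventChainSketch {
    enum QueuePayloadType { DATA, ERROR, FATAL }
    enum StorageState { OK, STALE, ERROR }
    enum ProviderEvent { PROVIDER_READY, PROVIDER_STALE, PROVIDER_ERROR }

    static StorageState toStorageState(QueuePayloadType type) {
        switch (type) {
            case ERROR:
                return StorageState.STALE; // retryable stream error -> stale data
            case FATAL:
                return StorageState.ERROR; // non-retryable error -> terminal error
            default:
                return StorageState.OK;
        }
    }

    static ProviderEvent toProviderEvent(StorageState state) {
        switch (state) {
            case STALE:
                return ProviderEvent.PROVIDER_STALE;
            case ERROR:
                return ProviderEvent.PROVIDER_ERROR; // emitted with ErrorCode.PROVIDER_FATAL
            default:
                return ProviderEvent.PROVIDER_READY;
        }
    }
}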
StorageState.java
@@ -1,6 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;

/** Satus of the storage. */
/** Status of the storage. */
public enum StorageState {
/** Storage is up to date and working as expected. */
OK,
QueuePayloadType.java
@@ -3,5 +3,6 @@
/** Payload type emitted by {@link QueueSource}. */
public enum QueuePayloadType {
DATA,
ERROR
ERROR,
FATAL
}
SyncStreamQueueSource.java
@@ -19,6 +19,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@ public class SyncStreamQueueSource implements QueueSource {

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean shouldThrottle = new AtomicBoolean(false);
private final AtomicBoolean successfulSync = new AtomicBoolean(false);
private final int streamDeadline;
private final int deadline;
private final int maxBackoffMs;
@@ -47,6 +49,7 @@ public class SyncStreamQueueSource implements QueueSource {
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final FlagSyncServiceStub flagSyncStub;
private final FlagSyncServiceBlockingStub metadataStub;
private final List<String> fatalStatusCodes;

/**
* Creates a new SyncStreamQueueSource responsible for observing the event stream.
@@ -63,6 +66,7 @@ public SyncStreamQueueSource(final FlagdOptions options) {
FlagSyncServiceGrpc.newStub(channelConnector.getChannel()).withWaitForReady();
metadataStub = FlagSyncServiceGrpc.newBlockingStub(channelConnector.getChannel())
.withWaitForReady();
fatalStatusCodes = options.getFatalStatusCodes();
}

// internal use only
@@ -80,6 +84,7 @@ protected SyncStreamQueueSource(
flagSyncStub = stubMock;
syncMetadataDisabled = options.isSyncMetadataDisabled();
metadataStub = blockingStubMock;
fatalStatusCodes = options.getFatalStatusCodes();
}

/** Initialize sync stream connector. */
@@ -129,23 +134,35 @@ private void observeSyncStream() {
}

log.debug("Initializing sync stream request");
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle);
SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle, fatalStatusCodes);
try {
observer.metadata = getMetadata();
} catch (Exception metaEx) {
// retry if getMetadata fails
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
} catch (StatusRuntimeException metaEx) {
if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name()) && !successfulSync.get()) {
log.debug("Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode());
enqueueFatal(String.format("Fatal: Failed to connect for metadata request, not retrying for error %s", metaEx.getStatus().getCode()));
} else {
// retry for other status codes
String message = metaEx.getMessage();
log.debug("Metadata request error: {}, will restart", message, metaEx);
enqueueError(String.format("Error in getMetadata request: %s", message));
}
shouldThrottle.set(true);
continue;
}

try {
syncFlags(observer);
} catch (Exception ex) {
log.error("Unexpected sync stream exception, will restart.", ex);
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
successfulSync.set(true);
} catch (StatusRuntimeException ex) {
if (fatalStatusCodes.contains(ex.getStatus().getCode().name()) && !successfulSync.get()) {
log.debug("Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode());
enqueueFatal(String.format("Fatal: Failed to connect for sync stream, not retrying for error %s", ex.getStatus().getCode()));
} else {
// retry for other status codes
log.error("Unexpected sync stream exception, will restart.", ex);
enqueueError(String.format("Error in syncStream: %s", ex.getMessage()));
}
shouldThrottle.set(true);
}
} catch (InterruptedException ie) {
@@ -212,22 +229,34 @@ private void enqueueError(String message) {
enqueueError(outgoingQueue, message);
}

private void enqueueFatal(String message) {
enqueueFatal(outgoingQueue, message);
}

private static void enqueueError(BlockingQueue<QueuePayload> queue, String message) {
if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, message, null))) {
log.error("Failed to convey ERROR status, queue is full");
}
}

private static void enqueueFatal(BlockingQueue<QueuePayload> queue, String message) {
if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) {
log.error("Failed to convey FATAL status, queue is full");
}
}

private static class SyncStreamObserver implements StreamObserver<SyncFlagsResponse> {
private final BlockingQueue<QueuePayload> outgoingQueue;
private final AtomicBoolean shouldThrottle;
private final Awaitable done = new Awaitable();
private final List<String> fatalStatusCodes;

private Struct metadata;

public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, AtomicBoolean shouldThrottle) {
public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, AtomicBoolean shouldThrottle, List<String> fatalStatusCodes) {
this.outgoingQueue = outgoingQueue;
this.shouldThrottle = shouldThrottle;
this.fatalStatusCodes = fatalStatusCodes;
}

@Override
@@ -245,9 +274,14 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) {
@Override
public void onError(Throwable throwable) {
try {
Status status = Status.fromThrowable(throwable);
String message = throwable != null ? throwable.getMessage() : "unknown";
log.debug("Stream error: {}, will restart", message, throwable);
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));
if (fatalStatusCodes.contains(status.getCode().name())) {
enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message));
} else {
enqueueError(outgoingQueue, String.format("Error from stream: %s", message));
}

// Set throttling flag to ensure backoff before retry
this.shouldThrottle.set(true);
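The fatal-code decision made in several places above reduces to one predicate; a minimal sketch under the PR's apparent rules (the helper class is hypothetical): match the gRPC status name against the configured list, and, as the guarded paths do, treat it as fatal only while no sync has ever succeeded.

import io.grpc.Status;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class FatalCheckSketch {
    private final List<String> fatalStatusCodes = List.of("UNAUTHENTICATED", "INVALID_ARGUMENT");
    private final AtomicBoolean successfulSync = new AtomicBoolean(false);

    boolean isFatal(Throwable t) {
        Status.Code code = Status.fromThrowable(t).getCode();
        // Only the first connection can be fatal; once a sync has succeeded,
        // later failures are treated as retryable.
        return fatalStatusCodes.contains(code.name()) && !successfulSync.get();
    }
}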
RpcResolver.java
@@ -60,6 +60,7 @@
public final class RpcResolver implements Resolver {
private static final int QUEUE_SIZE = 5;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean successfulConnection = new AtomicBoolean(false);
private final ChannelConnector connector;
private final Cache cache;
private final ResolveStrategy strategy;
@@ -68,6 +69,7 @@ public final class RpcResolver implements Resolver {
private final Consumer<FlagdProviderEvent> onProviderEvent;
private final ServiceStub stub;
private final ServiceBlockingStub blockingStub;
private final List<String> fatalStatusCodes;

/**
* Resolves flag values using
@@ -89,6 +91,7 @@ public RpcResolver(
this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady();
this.blockingStub =
ServiceGrpc.newBlockingStub(this.connector.getChannel()).withWaitForReady();
this.fatalStatusCodes = options.getFatalStatusCodes();
}

// testing only
@@ -107,6 +110,7 @@ protected RpcResolver(
this.onProviderEvent = onProviderEvent;
this.stub = mockStub;
this.blockingStub = mockBlockingStub;
this.fatalStatusCodes = options.getFatalStatusCodes();
}

/**
@@ -348,13 +352,20 @@ private void observeEventStream() throws InterruptedException {

Throwable streamException = taken.getError();
if (streamException != null) {
log.debug(
"Exception in event stream connection, streamException {}, will reconnect",
streamException);
this.handleErrorOrComplete();
if (streamException instanceof StatusRuntimeException && fatalStatusCodes.contains(
((StatusRuntimeException) streamException).getStatus().getCode().name()) && !successfulConnection.get()) {
log.debug("Fatal error code received: {}", ((StatusRuntimeException) streamException).getStatus().getCode());
this.handleFatalError();
} else {
log.debug(
"Exception in event stream connection, streamException {}, will reconnect",
streamException);
this.handleErrorOrComplete();
}
break;
}

successfulConnection.set(true);
final EventStreamResponse response = taken.getResponse();
log.debug("Got stream response: {}", response);

@@ -410,9 +421,19 @@ private void handleProviderReadyEvent() {
* Handles provider error events by clearing the cache (if enabled) and notifying listeners of the error.
*/
private void handleErrorOrComplete() {
log.debug("Emitting provider error event");
log.debug("Emitting provider stale event");

// complete is an error, logically...even if the server went down gracefully we need to reconnect.
onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE));
}

/**
* Handles fatal error events (i.e., error codes defined in fatalStatusCodes) by transitioning the provider into a
* fatal state.
*/
private void handleFatalError() {
log.debug("Emitting provider error event");

onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR));
}
}
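End to end, an application opting into this behavior might look like the following sketch (the status-code value and handler body are illustrative; FlagdProvider, FlagdOptions, and the OpenFeature SDK calls are the real APIs):

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.FlagdProvider;
import dev.openfeature.sdk.OpenFeatureAPI;
import java.util.List;

public class FatalCodesUsageSketch {
    public static void main(String[] args) {
        // Treat authentication failures on the first connection as fatal
        // instead of retrying indefinitely.
        FlagdProvider provider = new FlagdProvider(FlagdOptions.builder()
                .fatalStatusCodes(List.of("UNAUTHENTICATED"))
                .build());

        OpenFeatureAPI api = OpenFeatureAPI.getInstance();
        api.onProviderError(details ->
                // With a fatal code before any successful sync, the provider emits
                // PROVIDER_ERROR carrying ErrorCode.PROVIDER_FATAL rather than reconnecting.
                System.err.println("flagd provider error: " + details.getMessage()));
        api.setProvider(provider);
    }
}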