diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java index 1a3e1d352..3adc9fbdb 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java @@ -1,7 +1,10 @@ package dev.openfeature.contrib.providers.flagd; import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType; +import java.util.Arrays; +import java.util.List; import java.util.function.Function; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; /** Helper class to hold configuration default values. */ @@ -37,6 +40,7 @@ public final class Config { static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS"; static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS"; static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR"; + static final String FATAL_STATUS_CODES_ENV_VAR_NAME = "FLAGD_FATAL_STATUS_CODES"; /** * Environment variable to fetch Provider id. * @@ -93,6 +97,16 @@ static long fallBackToEnvOrDefault(String key, long defaultValue) { } } + static List fallBackToEnvOrDefaultList(String key, List defaultValue) { + try { + return System.getenv(key) != null ? Arrays.stream(System.getenv(key).split(",")) + .map(String::trim) + .collect(Collectors.toList()) : defaultValue; + } catch (Exception e) { + return defaultValue; + } + } + static Resolver fromValueProvider(Function provider) { final String resolverVar = provider.apply(RESOLVER_ENV_VAR); if (resolverVar == null) { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index 4cda34df4..79ac22a86 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -1,6 +1,7 @@ package dev.openfeature.contrib.providers.flagd; import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefault; +import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefaultList; import static dev.openfeature.contrib.providers.flagd.Config.fromValueProvider; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; @@ -122,6 +123,15 @@ public class FlagdOptions { @Builder.Default private int retryGracePeriod = fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD); + + /** + * List of grpc response status codes for which the provider transitions into fatal state upon first connection. + * Defaults to empty list + */ + @Builder.Default + private List fatalStatusCodes = + fallBackToEnvOrDefaultList(Config.FATAL_STATUS_CODES_ENV_VAR_NAME, List.of()); + /** * Selector to be used with flag sync gRPC contract. **/ diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index caf864175..4bd250377 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -5,6 +5,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache; +import dev.openfeature.sdk.ErrorCode; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.EventProvider; import dev.openfeature.sdk.Hook; @@ -222,20 +223,25 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { onReady(); syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY); break; - - case PROVIDER_ERROR: - if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) { - onError(); - syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR); + case PROVIDER_STALE: + if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_STALE) { + onStale(); + syncResources.setPreviousEvent(ProviderEvent.PROVIDER_STALE); } break; - + case PROVIDER_ERROR: + onError(); + break; default: log.warn("Unknown event {}", flagdProviderEvent.getEvent()); } } } + private void onError() { + this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build()); + } + private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { this.emitProviderConfigurationChanged(ProviderEventDetails.builder() .flagsChanged(flagdProviderEvent.getFlagsChanged()) @@ -255,7 +261,7 @@ private void onReady() { ProviderEventDetails.builder().message("connected to flagd").build()); } - private void onError() { + private void onStale() { log.debug( "Stream error. Emitting STALE, scheduling ERROR, and waiting {}s for connection to become available.", gracePeriod); @@ -270,7 +276,7 @@ private void onError() { if (!errorExecutor.isShutdown()) { errorTask = errorExecutor.schedule( () -> { - if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) { + if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_STALE) { log.error( "Provider did not reconnect successfully within {}s. Emitting ERROR event...", gracePeriod); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index c898aef3a..9eb84b0a4 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -79,6 +79,9 @@ public void init() throws Exception { storageStateChange.getSyncMetadata())); log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED"); break; + case STALE: + onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE)); + break; case ERROR: onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR)); break; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index eaa3dfa5f..a01f93c23 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -138,6 +138,11 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc } break; case ERROR: + if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) { + log.warn("Failed to convey STALE status, queue is full"); + } + break; + case FATAL: if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) { log.warn("Failed to convey ERROR status, queue is full"); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java index c47670b7d..d6b8b30c5 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageState.java @@ -1,6 +1,6 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage; -/** Satus of the storage. */ +/** Status of the storage. */ public enum StorageState { /** Storage is upto date and working as expected. */ OK, diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java index 93675fb60..74e02912e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java @@ -3,5 +3,6 @@ /** Payload type emitted by {@link QueueSource}. */ public enum QueuePayloadType { DATA, - ERROR + ERROR, + FATAL } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 3c1058566..3522a18b7 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -19,6 +19,7 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -38,6 +39,7 @@ public class SyncStreamQueueSource implements QueueSource { private final AtomicBoolean shutdown = new AtomicBoolean(false); private final AtomicBoolean shouldThrottle = new AtomicBoolean(false); + private final AtomicBoolean successfulSync = new AtomicBoolean(false); private final int streamDeadline; private final int deadline; private final int maxBackoffMs; @@ -47,6 +49,7 @@ public class SyncStreamQueueSource implements QueueSource { private final boolean reinitializeOnError; private final FlagdOptions options; private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); + private final List fatalStatusCodes; private volatile GrpcComponents grpcComponents; /** @@ -77,6 +80,7 @@ public SyncStreamQueueSource(final FlagdOptions options) { providerId = options.getProviderId(); maxBackoffMs = options.getRetryBackoffMaxMs(); syncMetadataDisabled = options.isSyncMetadataDisabled(); + fatalStatusCodes = options.getFatalStatusCodes(); reinitializeOnError = options.isReinitializeOnError(); this.options = options; initializeChannelComponents(); @@ -94,6 +98,7 @@ protected SyncStreamQueueSource( providerId = options.getProviderId(); maxBackoffMs = options.getRetryBackoffMaxMs(); syncMetadataDisabled = options.isSyncMetadataDisabled(); + fatalStatusCodes = options.getFatalStatusCodes(); reinitializeOnError = options.isReinitializeOnError(); this.options = options; this.grpcComponents = new GrpcComponents(connectorMock, stubMock, blockingStubMock); @@ -185,23 +190,42 @@ private void observeSyncStream() { } log.debug("Initializing sync stream request"); - SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle); + SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle, fatalStatusCodes); try { observer.metadata = getMetadata(); - } catch (Exception metaEx) { - // retry if getMetadata fails - String message = metaEx.getMessage(); - log.debug("Metadata request error: {}, will restart", message, metaEx); - enqueueError(String.format("Error in getMetadata request: %s", message)); + } catch (StatusRuntimeException metaEx) { + if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name()) && !successfulSync.get()) { + log.debug("Fatal status code for metadata request: {}, not retrying", + metaEx.getStatus().getCode()); + enqueueFatal(String.format( + "Fatal: Failed to connect for metadata request, not retrying for error %s", + metaEx.getStatus().getCode())); + return; + } else { + // retry for other status codes + String message = metaEx.getMessage(); + log.debug("Metadata request error: {}, will restart", message, metaEx); + enqueueError(String.format("Error in getMetadata request: %s", message)); + } shouldThrottle.set(true); continue; } try { syncFlags(observer); - } catch (Exception ex) { - log.error("Unexpected sync stream exception, will restart.", ex); - enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); + successfulSync.set(true); + } catch (StatusRuntimeException ex) { + if (fatalStatusCodes.contains(ex.getStatus().getCode().toString()) && !successfulSync.get()) { + log.debug("Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); + enqueueFatal(String.format( + "Fatal: Failed to connect for metadata request, not retrying for error %s", + ex.getStatus().getCode())); + return; + } else { + // retry for other status codes + log.error("Unexpected sync stream exception, will restart.", ex); + enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); + } shouldThrottle.set(true); } } catch (InterruptedException ie) { @@ -276,16 +300,29 @@ private static void enqueueError(BlockingQueue queue, String messa } } + private void enqueueFatal(String message) { + enqueueFatal(outgoingQueue, message); + } + + private static void enqueueFatal(BlockingQueue queue, String message) { + if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) { + log.error("Failed to convey FATAL status, queue is full"); + } + } + private static class SyncStreamObserver implements StreamObserver { private final BlockingQueue outgoingQueue; private final AtomicBoolean shouldThrottle; private final Awaitable done = new Awaitable(); + private final List fatalStatusCodes; private Struct metadata; - public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle) { + public SyncStreamObserver(BlockingQueue outgoingQueue, AtomicBoolean shouldThrottle, + List fatalStatusCodes) { this.outgoingQueue = outgoingQueue; this.shouldThrottle = shouldThrottle; + this.fatalStatusCodes = fatalStatusCodes; } @Override @@ -303,9 +340,14 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) { @Override public void onError(Throwable throwable) { try { + Status status = Status.fromThrowable(throwable); String message = throwable != null ? throwable.getMessage() : "unknown"; log.debug("Stream error: {}, will restart", message, throwable); - enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + if (fatalStatusCodes.contains(status.getCode().name())) { + enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message)); + } else { + enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); + } // Set throttling flag to ensure backoff before retry this.shouldThrottle.set(true); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java index afb06120b..15b5c8e3d 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java @@ -60,6 +60,7 @@ public final class RpcResolver implements Resolver { private static final int QUEUE_SIZE = 5; private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final AtomicBoolean successfulConnection = new AtomicBoolean(false); private final ChannelConnector connector; private final Cache cache; private final ResolveStrategy strategy; @@ -68,6 +69,7 @@ public final class RpcResolver implements Resolver { private final Consumer onProviderEvent; private final ServiceStub stub; private final ServiceBlockingStub blockingStub; + private final List fatalStatusCodes; /** * Resolves flag values using @@ -89,6 +91,7 @@ public RpcResolver( this.stub = ServiceGrpc.newStub(this.connector.getChannel()).withWaitForReady(); this.blockingStub = ServiceGrpc.newBlockingStub(this.connector.getChannel()).withWaitForReady(); + this.fatalStatusCodes = options.getFatalStatusCodes(); } // testing only @@ -107,6 +110,7 @@ protected RpcResolver( this.onProviderEvent = onProviderEvent; this.stub = mockStub; this.blockingStub = mockBlockingStub; + this.fatalStatusCodes = options.getFatalStatusCodes(); } /** @@ -348,13 +352,22 @@ private void observeEventStream() throws InterruptedException { Throwable streamException = taken.getError(); if (streamException != null) { - log.debug( - "Exception in event stream connection, streamException {}, will reconnect", - streamException); - this.handleErrorOrComplete(); + if (streamException instanceof StatusRuntimeException && fatalStatusCodes.contains( + ((StatusRuntimeException) streamException).getStatus().getCode().name()) + && !successfulConnection.get()) { + log.debug("Fatal error code received: {}", + ((StatusRuntimeException) streamException).getStatus().getCode()); + this.handleFatalError(); + } else { + log.debug( + "Exception in event stream connection, streamException {}, will reconnect", + streamException); + this.handleErrorOrComplete(); + } break; } + successfulConnection.set(true); final EventStreamResponse response = taken.getResponse(); log.debug("Got stream response: {}", response); @@ -410,9 +423,19 @@ private void handleProviderReadyEvent() { * Handles provider error events by clearing the cache (if enabled) and notifying listeners of the error. */ private void handleErrorOrComplete() { - log.debug("Emitting provider error event"); + log.debug("Emitting provider stale event"); // complete is an error, logically...even if the server went down gracefully we need to reconnect. + onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_STALE)); + } + + /** + * Handles fatal error events (i.e. error codes defined in fatalStatusCodes) by transitioning the provider into + * fatal state + */ + private void handleFatalError() { + log.debug("Emitting provider error event"); + onProviderEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR)); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java index 27806f955..90d082292 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java @@ -1,6 +1,7 @@ package dev.openfeature.contrib.providers.flagd.e2e.steps; import static io.restassured.RestAssured.when; +import static org.assertj.core.api.Assertions.assertThat; import dev.openfeature.contrib.providers.flagd.Config; import dev.openfeature.contrib.providers.flagd.FlagdOptions; @@ -9,10 +10,12 @@ import dev.openfeature.contrib.providers.flagd.e2e.State; import dev.openfeature.sdk.FeatureProvider; import dev.openfeature.sdk.OpenFeatureAPI; +import dev.openfeature.sdk.ProviderState; import io.cucumber.java.After; import io.cucumber.java.AfterAll; import io.cucumber.java.BeforeAll; import io.cucumber.java.en.Given; +import io.cucumber.java.en.Then; import io.cucumber.java.en.When; import java.io.File; import java.io.IOException; @@ -31,6 +34,7 @@ public class ProviderSteps extends AbstractSteps { public static final int UNAVAILABLE_PORT = 9999; + public static final int FORBIDDEN_PORT = 9212; static ComposeContainer container; static Path sharedTempDir; @@ -49,6 +53,7 @@ public static void beforeAll() throws IOException { .withExposedService("flagd", 8015, Wait.forListeningPort()) .withExposedService("flagd", 8080, Wait.forListeningPort()) .withExposedService("envoy", 9211, Wait.forListeningPort()) + .withExposedService("envoy", FORBIDDEN_PORT, Wait.forListeningPort()) .withStartupTimeout(Duration.ofSeconds(45)); container.start(); } @@ -85,6 +90,10 @@ public void setupProvider(String providerType) throws InterruptedException { } wait = false; break; + case "forbidden": + state.builder.port(container.getServicePort("envoy", FORBIDDEN_PORT)); + wait = false; + break; case "socket": this.state.providerType = ProviderType.SOCKET; String socketPath = @@ -188,4 +197,9 @@ public void the_flag_was_modded() { .then() .statusCode(200); } + + @Then("the client should be in {} state") + public void the_client_should_be_in_fatal_state(String clientState) { + assertThat(state.client.getProviderState()).isEqualTo(ProviderState.valueOf(clientState.toUpperCase())); + } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java index 7dca50533..a89f8560e 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/Utils.java @@ -4,7 +4,10 @@ import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.CacheType; import dev.openfeature.sdk.Value; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; public final class Utils { @@ -37,6 +40,9 @@ public static Object convert(String value, String type) throws ClassNotFoundExce } case "CacheType": return CacheType.valueOf(value.toUpperCase()).getValue(); + case "StringList": + return value.isEmpty() ? List.of() : Arrays.stream(value.split(",")).map(String::trim).collect( + Collectors.toList()); case "Object": return Value.objectToValue(new ObjectMapper().readValue(value, Object.class)); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java index 86ca298e3..e58b4eb3f 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java @@ -64,14 +64,14 @@ void connectorHandling() throws Exception { }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - assertEquals(StorageState.ERROR, states.take().getStorageState()); + assertEquals(StorageState.STALE, states.take().getStorageState()); }); // Shutdown handling store.shutdown(); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - assertEquals(StorageState.ERROR, states.take().getStorageState()); + assertEquals(StorageState.STALE, states.take().getStorageState()); }); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java index 119f9e2e6..0ed2b8e3c 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java @@ -111,7 +111,7 @@ void onCompletedRerunsStreamWithError() throws Exception { // should run consumer with error await().untilAsserted(() -> - verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_ERROR))); + verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_STALE))); // should have restarted the stream (2 calls) await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any())); } @@ -128,7 +128,7 @@ void onErrorRunsConsumerWithError() throws Exception { // should run consumer with error await().untilAsserted(() -> - verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_ERROR))); + verify(consumer).accept(argThat((arg) -> arg.getEvent() == ProviderEvent.PROVIDER_STALE))); // should have restarted the stream (2 calls) await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any())); } diff --git a/providers/flagd/test-harness b/providers/flagd/test-harness index b62f5dbe8..9b73b3a95 160000 --- a/providers/flagd/test-harness +++ b/providers/flagd/test-harness @@ -1 +1 @@ -Subproject commit b62f5dbe860ecf4f36ec757dfdc0b38f7b3dec6e +Subproject commit 9b73b3a95cd9e0885937d244b118713b26374b1d