-
Notifications
You must be signed in to change notification settings - Fork 65
feat(flagd): introduce fatalStatusCodes option #1624
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8b7f574
f0a1db2
654c8da
07195a7
ccf5120
e6d4057
75392e6
95a880c
45a9822
e50aa7f
a636257
5794b1a
d27e4e9
94c7691
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; | ||
| import dev.openfeature.contrib.providers.flagd.resolver.rpc.RpcResolver; | ||
| import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache; | ||
| import dev.openfeature.sdk.ErrorCode; | ||
| import dev.openfeature.sdk.EvaluationContext; | ||
| import dev.openfeature.sdk.EventProvider; | ||
| import dev.openfeature.sdk.Hook; | ||
|
|
@@ -222,20 +223,25 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { | |
| onReady(); | ||
| syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY); | ||
| break; | ||
|
|
||
| case PROVIDER_ERROR: | ||
| if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) { | ||
| onError(); | ||
| syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR); | ||
| case PROVIDER_STALE: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please update the javadoc above the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do once we agree on a final implementation |
||
| if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_STALE) { | ||
| onStale(); | ||
| syncResources.setPreviousEvent(ProviderEvent.PROVIDER_STALE); | ||
| } | ||
| break; | ||
|
|
||
| case PROVIDER_ERROR: | ||
| onError(); | ||
| break; | ||
| default: | ||
| log.warn("Unknown event {}", flagdProviderEvent.getEvent()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void onError() { | ||
| this.emitProviderError(ProviderEventDetails.builder().errorCode(ErrorCode.PROVIDER_FATAL).build()); | ||
| } | ||
|
|
||
| private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { | ||
| this.emitProviderConfigurationChanged(ProviderEventDetails.builder() | ||
| .flagsChanged(flagdProviderEvent.getFlagsChanged()) | ||
|
|
@@ -255,7 +261,7 @@ private void onReady() { | |
| ProviderEventDetails.builder().message("connected to flagd").build()); | ||
| } | ||
|
|
||
| private void onError() { | ||
| private void onStale() { | ||
| log.debug( | ||
| "Stream error. Emitting STALE, scheduling ERROR, and waiting {}s for connection to become available.", | ||
| gracePeriod); | ||
|
|
@@ -270,7 +276,7 @@ private void onError() { | |
| if (!errorExecutor.isShutdown()) { | ||
| errorTask = errorExecutor.schedule( | ||
| () -> { | ||
| if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) { | ||
| if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_STALE) { | ||
| log.error( | ||
| "Provider did not reconnect successfully within {}s. Emitting ERROR event...", | ||
| gracePeriod); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -138,6 +138,11 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc | |
| } | ||
| break; | ||
| case ERROR: | ||
| if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.STALE))) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we simply add a
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we could, but this would only solve the misuse issue in the communication step from |
||
| log.warn("Failed to convey STALE status, queue is full"); | ||
| } | ||
| break; | ||
| case FATAL: | ||
| if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.ERROR))) { | ||
| log.warn("Failed to convey ERROR status, queue is full"); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,5 +3,6 @@ | |
| /** Payload type emitted by {@link QueueSource}. */ | ||
| public enum QueuePayloadType { | ||
| DATA, | ||
| ERROR | ||
| ERROR, | ||
| FATAL | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
| import io.grpc.Status; | ||
| import io.grpc.StatusRuntimeException; | ||
| import io.grpc.stub.StreamObserver; | ||
| import java.util.List; | ||
| import java.util.concurrent.BlockingQueue; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
@@ -38,6 +39,7 @@ public class SyncStreamQueueSource implements QueueSource { | |
|
|
||
| private final AtomicBoolean shutdown = new AtomicBoolean(false); | ||
| private final AtomicBoolean shouldThrottle = new AtomicBoolean(false); | ||
| private final AtomicBoolean successfulSync = new AtomicBoolean(false); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My biggest question with this whole concept (not your implementation) is whether or not we should care about whether this is the initial sync or not. I'm actually leaning towards "not"... here is my reasoning (anyone feel free to disagree):
WDYT?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it's easy to get the same behaviour through event handlers, I think that might be better, because it allows for more customization. I get both sides, that one might not want to completely shut down if a valid flag config was previously received, but also that one might not want to work with stale data given that a non-transient error was received |
||
| private final int streamDeadline; | ||
| private final int deadline; | ||
| private final int maxBackoffMs; | ||
|
|
@@ -47,6 +49,7 @@ public class SyncStreamQueueSource implements QueueSource { | |
| private final boolean reinitializeOnError; | ||
| private final FlagdOptions options; | ||
| private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); | ||
| private final List<String> fatalStatusCodes; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we do lots of
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's hard for me to estimate, what do the others think? The currently defined default is an empty list |
||
| private volatile GrpcComponents grpcComponents; | ||
|
|
||
| /** | ||
|
|
@@ -77,6 +80,7 @@ public SyncStreamQueueSource(final FlagdOptions options) { | |
| providerId = options.getProviderId(); | ||
| maxBackoffMs = options.getRetryBackoffMaxMs(); | ||
| syncMetadataDisabled = options.isSyncMetadataDisabled(); | ||
| fatalStatusCodes = options.getFatalStatusCodes(); | ||
| reinitializeOnError = options.isReinitializeOnError(); | ||
| this.options = options; | ||
| initializeChannelComponents(); | ||
|
|
@@ -94,6 +98,7 @@ protected SyncStreamQueueSource( | |
| providerId = options.getProviderId(); | ||
| maxBackoffMs = options.getRetryBackoffMaxMs(); | ||
| syncMetadataDisabled = options.isSyncMetadataDisabled(); | ||
| fatalStatusCodes = options.getFatalStatusCodes(); | ||
| reinitializeOnError = options.isReinitializeOnError(); | ||
| this.options = options; | ||
| this.grpcComponents = new GrpcComponents(connectorMock, stubMock, blockingStubMock); | ||
|
|
@@ -185,23 +190,42 @@ private void observeSyncStream() { | |
| } | ||
|
|
||
| log.debug("Initializing sync stream request"); | ||
| SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle); | ||
| SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle, fatalStatusCodes); | ||
| try { | ||
| observer.metadata = getMetadata(); | ||
| } catch (Exception metaEx) { | ||
| // retry if getMetadata fails | ||
| String message = metaEx.getMessage(); | ||
| log.debug("Metadata request error: {}, will restart", message, metaEx); | ||
| enqueueError(String.format("Error in getMetadata request: %s", message)); | ||
| } catch (StatusRuntimeException metaEx) { | ||
| if (fatalStatusCodes.contains(metaEx.getStatus().getCode().name()) && !successfulSync.get()) { | ||
| log.debug("Fatal status code for metadata request: {}, not retrying", | ||
| metaEx.getStatus().getCode()); | ||
| enqueueFatal(String.format( | ||
| "Fatal: Failed to connect for metadata request, not retrying for error %s", | ||
| metaEx.getStatus().getCode())); | ||
| return; | ||
| } else { | ||
| // retry for other status codes | ||
| String message = metaEx.getMessage(); | ||
| log.debug("Metadata request error: {}, will restart", message, metaEx); | ||
| enqueueError(String.format("Error in getMetadata request: %s", message)); | ||
| } | ||
| shouldThrottle.set(true); | ||
| continue; | ||
| } | ||
|
|
||
| try { | ||
| syncFlags(observer); | ||
| } catch (Exception ex) { | ||
| log.error("Unexpected sync stream exception, will restart.", ex); | ||
| enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); | ||
| successfulSync.set(true); | ||
| } catch (StatusRuntimeException ex) { | ||
| if (fatalStatusCodes.contains(ex.getStatus().getCode().toString()) && !successfulSync.get()) { | ||
| log.debug("Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); | ||
| enqueueFatal(String.format( | ||
| "Fatal: Failed to connect for metadata request, not retrying for error %s", | ||
| ex.getStatus().getCode())); | ||
| return; | ||
| } else { | ||
| // retry for other status codes | ||
| log.error("Unexpected sync stream exception, will restart.", ex); | ||
| enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); | ||
| } | ||
| shouldThrottle.set(true); | ||
| } | ||
| } catch (InterruptedException ie) { | ||
|
|
@@ -276,16 +300,29 @@ private static void enqueueError(BlockingQueue<QueuePayload> queue, String messa | |
| } | ||
| } | ||
|
|
||
| private void enqueueFatal(String message) { | ||
| enqueueFatal(outgoingQueue, message); | ||
| } | ||
|
|
||
| private static void enqueueFatal(BlockingQueue<QueuePayload> queue, String message) { | ||
| if (!queue.offer(new QueuePayload(QueuePayloadType.FATAL, message, null))) { | ||
| log.error("Failed to convey FATAL status, queue is full"); | ||
| } | ||
| } | ||
|
|
||
| private static class SyncStreamObserver implements StreamObserver<SyncFlagsResponse> { | ||
| private final BlockingQueue<QueuePayload> outgoingQueue; | ||
| private final AtomicBoolean shouldThrottle; | ||
| private final Awaitable done = new Awaitable(); | ||
| private final List<String> fatalStatusCodes; | ||
|
|
||
| private Struct metadata; | ||
|
|
||
| public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, AtomicBoolean shouldThrottle) { | ||
| public SyncStreamObserver(BlockingQueue<QueuePayload> outgoingQueue, AtomicBoolean shouldThrottle, | ||
| List<String> fatalStatusCodes) { | ||
| this.outgoingQueue = outgoingQueue; | ||
| this.shouldThrottle = shouldThrottle; | ||
| this.fatalStatusCodes = fatalStatusCodes; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -303,9 +340,14 @@ public void onNext(SyncFlagsResponse syncFlagsResponse) { | |
| @Override | ||
| public void onError(Throwable throwable) { | ||
| try { | ||
| Status status = Status.fromThrowable(throwable); | ||
| String message = throwable != null ? throwable.getMessage() : "unknown"; | ||
| log.debug("Stream error: {}, will restart", message, throwable); | ||
| enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); | ||
| if (fatalStatusCodes.contains(status.getCode().name())) { | ||
| enqueueFatal(outgoingQueue, String.format("Error from stream: %s", message)); | ||
| } else { | ||
| enqueueError(outgoingQueue, String.format("Error from stream: %s", message)); | ||
| } | ||
|
|
||
| // Set throttling flag to ensure backoff before retry | ||
| this.shouldThrottle.set(true); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should print an info/warn that the env vars are invalid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for this method? Or the other ones too? I'd either leave it or add it in all cases to be consistent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we should add it everywhere, but in a different PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, sounds good. Should we create a new issue for this or is that overkill?