2626import lombok .extern .slf4j .Slf4j ;
2727
2828/**
29- * Implements the {@link QueueSource} contract and emit flags obtained from flagd sync gRPC contract.
29+ * Implements the {@link QueueSource} contract and emit flags obtained from
30+ * flagd sync gRPC contract.
3031 */
3132@ Slf4j
3233@ SuppressFBWarnings (
3334 value = {"EI_EXPOSE_REP" },
34- justification = "Random is used to generate a variation & flag configurations require exposing " )
35+ justification = "We need to expose the BlockingQueue to allow consumers to read from it " )
3536public class SyncStreamQueueSource implements QueueSource {
3637 private static final int QUEUE_SIZE = 5 ;
3738
@@ -43,13 +44,31 @@ public class SyncStreamQueueSource implements QueueSource {
4344 private final String selector ;
4445 private final String providerId ;
4546 private final boolean syncMetadataDisabled ;
46- private final ChannelConnector channelConnector ;
47+ private final boolean reinitializeOnError ;
48+ private final FlagdOptions options ;
4749 private final BlockingQueue <QueuePayload > outgoingQueue = new LinkedBlockingQueue <>(QUEUE_SIZE );
48- private final FlagSyncServiceStub flagSyncStub ;
49- private final FlagSyncServiceBlockingStub metadataStub ;
50+ private volatile GrpcComponents grpcComponents ;
5051
5152 /**
52- * Creates a new SyncStreamQueueSource responsible for observing the event stream.
53+ * Container for gRPC components to ensure atomicity during reinitialization.
54+ * All three components are updated together to prevent consumers from seeing
55+ * an inconsistent state where components are from different channel instances.
56+ */
57+ private static class GrpcComponents {
58+ final ChannelConnector channelConnector ;
59+ final FlagSyncServiceStub flagSyncStub ;
60+ final FlagSyncServiceBlockingStub metadataStub ;
61+
62+ GrpcComponents (ChannelConnector connector , FlagSyncServiceStub stub , FlagSyncServiceBlockingStub blockingStub ) {
63+ this .channelConnector = connector ;
64+ this .flagSyncStub = stub ;
65+ this .metadataStub = blockingStub ;
66+ }
67+ }
68+
69+ /**
70+ * Creates a new SyncStreamQueueSource responsible for observing the event
71+ * stream.
5372 */
5473 public SyncStreamQueueSource (final FlagdOptions options ) {
5574 streamDeadline = options .getStreamDeadlineMs ();
@@ -58,11 +77,9 @@ public SyncStreamQueueSource(final FlagdOptions options) {
5877 providerId = options .getProviderId ();
5978 maxBackoffMs = options .getRetryBackoffMaxMs ();
6079 syncMetadataDisabled = options .isSyncMetadataDisabled ();
61- channelConnector = new ChannelConnector (options , ChannelBuilder .nettyChannel (options ));
62- flagSyncStub =
63- FlagSyncServiceGrpc .newStub (channelConnector .getChannel ()).withWaitForReady ();
64- metadataStub = FlagSyncServiceGrpc .newBlockingStub (channelConnector .getChannel ())
65- .withWaitForReady ();
80+ reinitializeOnError = options .isReinitializeOnError ();
81+ this .options = options ;
82+ initializeChannelComponents ();
6683 }
6784
6885 // internal use only
@@ -75,11 +92,50 @@ protected SyncStreamQueueSource(
7592 deadline = options .getDeadline ();
7693 selector = options .getSelector ();
7794 providerId = options .getProviderId ();
78- channelConnector = connectorMock ;
7995 maxBackoffMs = options .getRetryBackoffMaxMs ();
80- flagSyncStub = stubMock ;
8196 syncMetadataDisabled = options .isSyncMetadataDisabled ();
82- metadataStub = blockingStubMock ;
97+ reinitializeOnError = options .isReinitializeOnError ();
98+ this .options = options ;
99+ this .grpcComponents = new GrpcComponents (connectorMock , stubMock , blockingStubMock );
100+ }
101+
102+ /** Initialize channel connector and stubs. */
103+ private synchronized void initializeChannelComponents () {
104+ ChannelConnector newConnector = new ChannelConnector (options , ChannelBuilder .nettyChannel (options ));
105+ FlagSyncServiceStub newFlagSyncStub =
106+ FlagSyncServiceGrpc .newStub (newConnector .getChannel ()).withWaitForReady ();
107+ FlagSyncServiceBlockingStub newMetadataStub =
108+ FlagSyncServiceGrpc .newBlockingStub (newConnector .getChannel ()).withWaitForReady ();
109+
110+ // atomic assignment of all components as a single unit
111+ grpcComponents = new GrpcComponents (newConnector , newFlagSyncStub , newMetadataStub );
112+ }
113+
114+ /** Reinitialize channel connector and stubs on error. */
115+ public synchronized void reinitializeChannelComponents () {
116+ if (!reinitializeOnError || shutdown .get ()) {
117+ return ;
118+ }
119+
120+ log .info ("Reinitializing channel gRPC components in attempt to restore stream." );
121+ GrpcComponents oldComponents = grpcComponents ;
122+
123+ try {
124+ // create new channel components first
125+ initializeChannelComponents ();
126+ } catch (Exception e ) {
127+ log .error ("Failed to reinitialize channel components" , e );
128+ return ;
129+ }
130+
131+ // shutdown old connector after successful reinitialization
132+ if (oldComponents != null && oldComponents .channelConnector != null ) {
133+ try {
134+ oldComponents .channelConnector .shutdown ();
135+ } catch (Exception e ) {
136+ log .debug ("Error shutting down old channel connector during reinitialization" , e );
137+ }
138+ }
83139 }
84140
85141 /** Initialize sync stream connector. */
@@ -106,7 +162,7 @@ public void shutdown() throws InterruptedException {
106162 log .debug ("Shutdown already in progress or completed" );
107163 return ;
108164 }
109- this .channelConnector .shutdown ();
165+ grpcComponents .channelConnector .shutdown ();
110166 }
111167
112168 /** Contains blocking calls, to be used concurrently. */
@@ -156,13 +212,14 @@ private void observeSyncStream() {
156212 log .info ("Shutdown invoked, exiting event stream listener" );
157213 }
158214
159- // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
215+ // TODO: remove the metadata call entirely after
216+ // https://github.com/open-feature/flagd/issues/1584
160217 private Struct getMetadata () {
161218 if (syncMetadataDisabled ) {
162219 return null ;
163220 }
164221
165- FlagSyncServiceBlockingStub localStub = metadataStub ;
222+ FlagSyncServiceBlockingStub localStub = grpcComponents . metadataStub ;
166223
167224 if (deadline > 0 ) {
168225 localStub = localStub .withDeadlineAfter (deadline , TimeUnit .MILLISECONDS );
@@ -177,7 +234,8 @@ private Struct getMetadata() {
177234
178235 return null ;
179236 } catch (StatusRuntimeException e ) {
180- // In newer versions of flagd, metadata is part of the sync stream. If the method is unimplemented, we
237+ // In newer versions of flagd, metadata is part of the sync stream. If the
238+ // method is unimplemented, we
181239 // can ignore the error
182240 if (e .getStatus () != null
183241 && Status .Code .UNIMPLEMENTED .equals (e .getStatus ().getCode ())) {
@@ -189,7 +247,7 @@ private Struct getMetadata() {
189247 }
190248
191249 private void syncFlags (SyncStreamObserver streamObserver ) {
192- FlagSyncServiceStub localStub = flagSyncStub ; // don't mutate the stub
250+ FlagSyncServiceStub localStub = grpcComponents . flagSyncStub ; // don't mutate the stub
193251 if (streamDeadline > 0 ) {
194252 localStub = localStub .withDeadlineAfter (streamDeadline , TimeUnit .MILLISECONDS );
195253 }
0 commit comments