2727import lombok .extern .slf4j .Slf4j ;
2828
2929/**
30- * Implements the {@link QueueSource} contract and emit flags obtained from flagd sync gRPC contract.
30+ * Implements the {@link QueueSource} contract and emit flags obtained from
31+ * flagd sync gRPC contract.
3132 */
3233@ Slf4j
3334@ SuppressFBWarnings (
3435 value = {"EI_EXPOSE_REP" },
35- justification = "Random is used to generate a variation & flag configurations require exposing " )
36+ justification = "We need to expose the BlockingQueue to allow consumers to read from it " )
3637public class SyncStreamQueueSource implements QueueSource {
3738 private static final int QUEUE_SIZE = 5 ;
3839
@@ -45,14 +46,32 @@ public class SyncStreamQueueSource implements QueueSource {
4546 private final String selector ;
4647 private final String providerId ;
4748 private final boolean syncMetadataDisabled ;
48- private final ChannelConnector channelConnector ;
49+ private final boolean reinitializeOnError ;
50+ private final FlagdOptions options ;
4951 private final BlockingQueue <QueuePayload > outgoingQueue = new LinkedBlockingQueue <>(QUEUE_SIZE );
50- private final FlagSyncServiceStub flagSyncStub ;
51- private final FlagSyncServiceBlockingStub metadataStub ;
5252 private final List <String > fatalStatusCodes ;
53+ private volatile GrpcComponents grpcComponents ;
5354
5455 /**
55- * Creates a new SyncStreamQueueSource responsible for observing the event stream.
56+ * Container for gRPC components to ensure atomicity during reinitialization.
57+ * All three components are updated together to prevent consumers from seeing
58+ * an inconsistent state where components are from different channel instances.
59+ */
60+ private static class GrpcComponents {
61+ final ChannelConnector channelConnector ;
62+ final FlagSyncServiceStub flagSyncStub ;
63+ final FlagSyncServiceBlockingStub metadataStub ;
64+
65+ GrpcComponents (ChannelConnector connector , FlagSyncServiceStub stub , FlagSyncServiceBlockingStub blockingStub ) {
66+ this .channelConnector = connector ;
67+ this .flagSyncStub = stub ;
68+ this .metadataStub = blockingStub ;
69+ }
70+ }
71+
72+ /**
73+ * Creates a new SyncStreamQueueSource responsible for observing the event
74+ * stream.
5675 */
5776 public SyncStreamQueueSource (final FlagdOptions options ) {
5877 streamDeadline = options .getStreamDeadlineMs ();
@@ -61,12 +80,10 @@ public SyncStreamQueueSource(final FlagdOptions options) {
6180 providerId = options .getProviderId ();
6281 maxBackoffMs = options .getRetryBackoffMaxMs ();
6382 syncMetadataDisabled = options .isSyncMetadataDisabled ();
64- channelConnector = new ChannelConnector (options , ChannelBuilder .nettyChannel (options ));
65- flagSyncStub =
66- FlagSyncServiceGrpc .newStub (channelConnector .getChannel ()).withWaitForReady ();
67- metadataStub = FlagSyncServiceGrpc .newBlockingStub (channelConnector .getChannel ())
68- .withWaitForReady ();
6983 fatalStatusCodes = options .getFatalStatusCodes ();
84+ reinitializeOnError = options .isReinitializeOnError ();
85+ this .options = options ;
86+ initializeChannelComponents ();
7087 }
7188
7289 // internal use only
@@ -79,12 +96,51 @@ protected SyncStreamQueueSource(
7996 deadline = options .getDeadline ();
8097 selector = options .getSelector ();
8198 providerId = options .getProviderId ();
82- channelConnector = connectorMock ;
8399 maxBackoffMs = options .getRetryBackoffMaxMs ();
84- flagSyncStub = stubMock ;
85100 syncMetadataDisabled = options .isSyncMetadataDisabled ();
86- metadataStub = blockingStubMock ;
87101 fatalStatusCodes = options .getFatalStatusCodes ();
102+ reinitializeOnError = options .isReinitializeOnError ();
103+ this .options = options ;
104+ this .grpcComponents = new GrpcComponents (connectorMock , stubMock , blockingStubMock );
105+ }
106+
107+ /** Initialize channel connector and stubs. */
108+ private synchronized void initializeChannelComponents () {
109+ ChannelConnector newConnector = new ChannelConnector (options , ChannelBuilder .nettyChannel (options ));
110+ FlagSyncServiceStub newFlagSyncStub =
111+ FlagSyncServiceGrpc .newStub (newConnector .getChannel ()).withWaitForReady ();
112+ FlagSyncServiceBlockingStub newMetadataStub =
113+ FlagSyncServiceGrpc .newBlockingStub (newConnector .getChannel ()).withWaitForReady ();
114+
115+ // atomic assignment of all components as a single unit
116+ grpcComponents = new GrpcComponents (newConnector , newFlagSyncStub , newMetadataStub );
117+ }
118+
119+ /** Reinitialize channel connector and stubs on error. */
120+ public synchronized void reinitializeChannelComponents () {
121+ if (!reinitializeOnError || shutdown .get ()) {
122+ return ;
123+ }
124+
125+ log .info ("Reinitializing channel gRPC components in attempt to restore stream." );
126+ GrpcComponents oldComponents = grpcComponents ;
127+
128+ try {
129+ // create new channel components first
130+ initializeChannelComponents ();
131+ } catch (Exception e ) {
132+ log .error ("Failed to reinitialize channel components" , e );
133+ return ;
134+ }
135+
136+ // shutdown old connector after successful reinitialization
137+ if (oldComponents != null && oldComponents .channelConnector != null ) {
138+ try {
139+ oldComponents .channelConnector .shutdown ();
140+ } catch (Exception e ) {
141+ log .debug ("Error shutting down old channel connector during reinitialization" , e );
142+ }
143+ }
88144 }
89145
90146 /** Initialize sync stream connector. */
@@ -111,7 +167,7 @@ public void shutdown() throws InterruptedException {
111167 log .debug ("Shutdown already in progress or completed" );
112168 return ;
113169 }
114- this .channelConnector .shutdown ();
170+ grpcComponents .channelConnector .shutdown ();
115171 }
116172
117173 /** Contains blocking calls, to be used concurrently. */
@@ -175,13 +231,14 @@ private void observeSyncStream() {
175231 log .info ("Shutdown invoked, exiting event stream listener" );
176232 }
177233
178- // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584
234+ // TODO: remove the metadata call entirely after
235+ // https://github.com/open-feature/flagd/issues/1584
179236 private Struct getMetadata () {
180237 if (syncMetadataDisabled ) {
181238 return null ;
182239 }
183240
184- FlagSyncServiceBlockingStub localStub = metadataStub ;
241+ FlagSyncServiceBlockingStub localStub = grpcComponents . metadataStub ;
185242
186243 if (deadline > 0 ) {
187244 localStub = localStub .withDeadlineAfter (deadline , TimeUnit .MILLISECONDS );
@@ -196,7 +253,8 @@ private Struct getMetadata() {
196253
197254 return null ;
198255 } catch (StatusRuntimeException e ) {
199- // In newer versions of flagd, metadata is part of the sync stream. If the method is unimplemented, we
256+ // In newer versions of flagd, metadata is part of the sync stream. If the
257+ // method is unimplemented, we
200258 // can ignore the error
201259 if (e .getStatus () != null
202260 && Status .Code .UNIMPLEMENTED .equals (e .getStatus ().getCode ())) {
@@ -208,7 +266,7 @@ private Struct getMetadata() {
208266 }
209267
210268 private void syncFlags (SyncStreamObserver streamObserver ) {
211- FlagSyncServiceStub localStub = flagSyncStub ; // don't mutate the stub
269+ FlagSyncServiceStub localStub = grpcComponents . flagSyncStub ; // don't mutate the stub
212270 if (streamDeadline > 0 ) {
213271 localStub = localStub .withDeadlineAfter (streamDeadline , TimeUnit .MILLISECONDS );
214272 }
0 commit comments