Skip to content

Commit 3c7f877

Browse files
committed
added streaming token refresh config
1 parent 0b28f41 commit 3c7f877

File tree

11 files changed

+64
-21
lines changed

11 files changed

+64
-21
lines changed

client/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.18.1-rc2</version>
8+
<version>4.18.1-rc3</version>
99
</parent>
10-
<version>4.18.1-rc2</version>
10+
<version>4.18.1-rc3</version>
1111
<artifactId>java-client</artifactId>
1212
<packaging>jar</packaging>
1313
<name>Java Client</name>

client/src/main/java/io/split/client/SplitClientConfig.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ private HttpScheme() {
112112
private final CustomHeaderDecorator _customHeaderDecorator;
113113
private final CustomHttpModule _alternativeHTTPModule;
114114

115+
private final int _streamingTokenRefreshRate;
116+
115117
public static Builder builder() {
116118
return new Builder();
117119
}
@@ -170,7 +172,8 @@ private SplitClientConfig(String endpoint,
170172
int invalidSets,
171173
CustomHeaderDecorator customHeaderDecorator,
172174
CustomHttpModule alternativeHTTPModule,
173-
FallbackTreatmentsConfiguration fallbackTreatments) {
175+
FallbackTreatmentsConfiguration fallbackTreatments,
176+
int streamingTokenRefreshRate) {
174177
_endpoint = endpoint;
175178
_eventsEndpoint = eventsEndpoint;
176179
_featuresRefreshRate = pollForFeatureChangesEveryNSeconds;
@@ -226,6 +229,7 @@ private SplitClientConfig(String endpoint,
226229
_customHeaderDecorator = customHeaderDecorator;
227230
_alternativeHTTPModule = alternativeHTTPModule;
228231
_fallbackTreatments = fallbackTreatments;
232+
_streamingTokenRefreshRate = streamingTokenRefreshRate;
229233

230234
Properties props = new Properties();
231235
try {
@@ -446,6 +450,8 @@ public boolean isSdkEndpointOverridden() {
446450

447451
public FallbackTreatmentsConfiguration fallbackTreatments() { return _fallbackTreatments; }
448452

453+
public int streamingTokenRefreshRate() { return _streamingTokenRefreshRate; }
454+
449455
public static final class Builder {
450456
private String _endpoint = SDK_ENDPOINT;
451457
private boolean _endpointSet = false;
@@ -505,6 +511,7 @@ public static final class Builder {
505511
private CustomHeaderDecorator _customHeaderDecorator = null;
506512
private CustomHttpModule _alternativeHTTPModule = null;
507513
private FallbackTreatmentsConfiguration _fallbackTreatments;
514+
private int _streamingTokenRefreshRate = 180;
508515

509516
public Builder() {
510517
}
@@ -1055,6 +1062,11 @@ public Builder threadFactory(ThreadFactory threadFactory) {
10551062
return this;
10561063
}
10571064

1065+
public Builder streamingTokenRefreshRate(int streamingTokenRefreshRate) {
1066+
_streamingTokenRefreshRate = streamingTokenRefreshRate;
1067+
return this;
1068+
}
1069+
10581070
private void verifyRates() {
10591071
if (_featuresRefreshRate < 5 ) {
10601072
throw new IllegalArgumentException("featuresRefreshRate must be >= 5: " + _featuresRefreshRate);
@@ -1071,9 +1083,14 @@ private void verifyRates() {
10711083
if (_metricsRefreshRate < 30) {
10721084
throw new IllegalArgumentException("metricsRefreshRate must be >= 30: " + _metricsRefreshRate);
10731085
}
1086+
10741087
if(_telemetryRefreshRate < 60) {
10751088
throw new IllegalStateException("_telemetryRefreshRate must be >= 60");
10761089
}
1090+
1091+
if (_streamingTokenRefreshRate < 60) {
1092+
throw new IllegalStateException("_streamingTokenRefreshRate must be >= 60");
1093+
}
10771094
}
10781095

10791096
private void verifyEndPoints() {
@@ -1274,7 +1291,8 @@ public SplitClientConfig build() {
12741291
_invalidSetsCount,
12751292
_customHeaderDecorator,
12761293
_alternativeHTTPModule,
1277-
_fallbackTreatments);
1294+
_fallbackTreatments,
1295+
_streamingTokenRefreshRate);
12781296
}
12791297
}
12801298
}

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class PushManagerImp implements PushManager {
5252
private final ScheduledExecutorService _scheduledExecutorService;
5353
private AtomicLong _expirationTime;
5454
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
55+
private final int _streamingTokenRefreshRate;
5556

5657
@VisibleForTesting
5758
/* package private */ PushManagerImp(AuthApiClient authApiClient,
@@ -60,7 +61,8 @@ public class PushManagerImp implements PushManager {
6061
Worker<SegmentQueueDto> segmentWorker,
6162
PushStatusTracker pushStatusTracker,
6263
TelemetryRuntimeProducer telemetryRuntimeProducer,
63-
ThreadFactory threadFactory) {
64+
ThreadFactory threadFactory,
65+
int streamingTokenRefreshRate) {
6466

6567
_authApiClient = checkNotNull(authApiClient);
6668
_eventSourceClient = checkNotNull(eventSourceClient);
@@ -70,6 +72,7 @@ public class PushManagerImp implements PushManager {
7072
_expirationTime = new AtomicLong();
7173
_scheduledExecutorService = buildSingleThreadScheduledExecutor(threadFactory, "Split-SSERefreshToken-%d");
7274
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
75+
_streamingTokenRefreshRate = streamingTokenRefreshRate;
7376
}
7477

7578
public static PushManagerImp build(Synchronizer synchronizer,
@@ -83,7 +86,8 @@ public static PushManagerImp build(Synchronizer synchronizer,
8386
SplitCacheProducer splitCacheProducer,
8487
FlagSetsFilter flagSetsFilter,
8588
RuleBasedSegmentCache ruleBasedSegmentCache,
86-
RuleBasedSegmentParser ruleBasedSegmentParser) {
89+
RuleBasedSegmentParser ruleBasedSegmentParser,
90+
int streamingTokenRefreshRate) {
8791
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer,
8892
ruleBasedSegmentCache, telemetryRuntimeProducer, flagSetsFilter);
8993
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
@@ -96,7 +100,8 @@ public static PushManagerImp build(Synchronizer synchronizer,
96100
segmentWorker,
97101
pushStatusTracker,
98102
telemetryRuntimeProducer,
99-
threadFactory);
103+
threadFactory,
104+
streamingTokenRefreshRate);
100105
}
101106

102107
@Override
@@ -106,18 +111,22 @@ public void start() {
106111
AuthenticationResponse response = _authApiClient.Authenticate();
107112
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
108113
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
109-
_expirationTime.set(response.getExpiration());
114+
_expirationTime.set(_streamingTokenRefreshRate);
110115
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
111116
response.getExpiration(), System.currentTimeMillis()));
112117
return;
113118
}
114119

115120
cleanUpResources();
116121
if (response.isRetry()) {
122+
_log.debug(String.format("Handling retry error response"));
117123
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
118124
} else {
125+
_log.debug(String.format("Auth service response is disabled: %s", response.getToken()));
119126
_pushStatusTracker.forcePushDisable();
120127
}
128+
} catch (Exception e) {
129+
_log.debug("Exception in PushManager start: " + e.getMessage());
121130
} finally {
122131
lock.unlock();
123132
}
@@ -156,14 +165,22 @@ private boolean startSse(String token, String channels) {
156165

157166
@Override
158167
public void startWorkers() {
159-
_featureFlagsWorker.start();
160-
_segmentWorker.start();
168+
try {
169+
_featureFlagsWorker.start();
170+
_segmentWorker.start();
171+
} catch (Exception e) {
172+
_log.debug("Exception in starting workers: " + e.getMessage());
173+
}
161174
}
162175

163176
@Override
164177
public void stopWorkers() {
165-
_featureFlagsWorker.stop();
166-
_segmentWorker.stop();
178+
try {
179+
_featureFlagsWorker.stop();
180+
_segmentWorker.stop();
181+
} catch (Exception e) {
182+
_log.debug("Exception in stopping workers: " + e.getMessage());
183+
}
167184
}
168185

169186
private void cleanUpResources() {

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ public static SyncManagerImp build(SplitTasks splitTasks,
116116
splitCacheProducer,
117117
flagSetsFilter,
118118
ruleBasedSegmentCache,
119-
ruleBasedSegmentParser);
119+
ruleBasedSegmentParser,
120+
config.streamingTokenRefreshRate());
120121

121122
return new SyncManagerImp(splitTasks,
122123
config.streamingEnabled(),

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public boolean isOpen() {
115115
}
116116

117117
public void close() {
118+
_log.debug("closing SSE client");
118119
try {
119120
lock.lock();
120121
_forcedStop.set(true);
@@ -128,6 +129,8 @@ public void close() {
128129
}
129130
}
130131
}
132+
} catch (Exception e) {
133+
_log.debug("Exception in closing SSE client: " + e.getMessage());
131134
} finally {
132135
lock.unlock();
133136
}
@@ -184,16 +187,19 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
184187
}
185188
}
186189
} catch (Exception e) { // Any other error non related to the connection disables streaming altogether
190+
_log.debug(String.format("SSE connection exception: %s", e.getMessage()));
187191
_telemetryRuntimeProducer
188192
.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
189193
StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(),
190194
System.currentTimeMillis()));
191195
_log.warn(e.getMessage(), e);
192196
_statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
193197
} finally {
198+
_log.debug(String.format("Attempt to close SSE connection"));
194199
try {
195200
_ongoingResponse.get().close();
196201
} catch (IOException e) {
202+
_log.debug(String.format("SSE connection closing exception: %s", e.getMessage()));
197203
_log.debug(e.getMessage());
198204
}
199205

client/src/test/java/io/split/engine/common/PushManagerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public void setUp() {
4040
_segmentsWorkerImp,
4141
_pushStatusTracker,
4242
_telemetryStorage,
43-
null);
43+
null,
44+
180);
4445
}
4546

4647
@Test

okhttp-modules/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
<parent>
66
<artifactId>java-client-parent</artifactId>
77
<groupId>io.split.client</groupId>
8-
<version>4.18.1-rc2</version>
8+
<version>4.18.1-rc3</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
11-
<version>4.18.1-rc2</version>
11+
<version>4.18.1-rc3</version>
1212
<artifactId>okhttp-modules</artifactId>
1313
<packaging>jar</packaging>
1414
<name>http-modules</name>

pluggable-storage/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<artifactId>java-client-parent</artifactId>
88
<groupId>io.split.client</groupId>
9-
<version>4.18.1-rc2</version>
9+
<version>4.18.1-rc3</version>
1010
</parent>
1111

1212
<version>2.1.0</version>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>io.split.client</groupId>
66
<artifactId>java-client-parent</artifactId>
7-
<version>4.18.1-rc2</version>
7+
<version>4.18.1-rc3</version>
88
<dependencyManagement>
99
<dependencies>
1010
<dependency>

redis-wrapper/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<artifactId>java-client-parent</artifactId>
88
<groupId>io.split.client</groupId>
9-
<version>4.18.1-rc2</version>
9+
<version>4.18.1-rc3</version>
1010
</parent>
1111
<artifactId>redis-wrapper</artifactId>
1212
<version>3.1.1</version>

0 commit comments

Comments
 (0)