Skip to content

Commit 9a58002

Browse files
committed
Update PushManager and SyncManager
1 parent 426ea5a commit 9a58002

File tree

3 files changed

+76
-13
lines changed

3 files changed

+76
-13
lines changed

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public synchronized void start() {
102102
return;
103103
}
104104

105-
stop();
105+
cleanUpResources();
106106
if (response.isRetry()) {
107107
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
108108
} else {
@@ -113,16 +113,11 @@ public synchronized void start() {
113113
@Override
114114
public synchronized void stop() {
115115
_log.debug("Stopping PushManagerImp");
116-
_eventSourceClient.stop();
117-
stopWorkers();
118-
if (_nextTokenRefreshTask != null) {
119-
_log.debug("Cancel nextTokenRefreshTask");
120-
_nextTokenRefreshTask.cancel(false);
121-
}
116+
cleanUpResources();
122117
}
123118

124119
@Override
125-
public synchronized void scheduleConnectionReset() {
120+
public void scheduleConnectionReset() {
126121
_log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", _expirationTime));
127122
_nextTokenRefreshTask = _scheduledExecutorService.schedule(() -> {
128123
_log.debug("Starting scheduleNextTokenRefresh ...");
@@ -142,14 +137,23 @@ private boolean startSse(String token, String channels) {
142137
}
143138

144139
@Override
145-
public synchronized void startWorkers() {
140+
public void startWorkers() {
146141
_featureFlagsWorker.start();
147142
_segmentWorker.start();
148143
}
149144

150145
@Override
151-
public synchronized void stopWorkers() {
146+
public void stopWorkers() {
152147
_featureFlagsWorker.stop();
153148
_segmentWorker.stop();
154149
}
150+
151+
private void cleanUpResources() {
152+
_eventSourceClient.stop();
153+
stopWorkers();
154+
if (_nextTokenRefreshTask != null) {
155+
_log.debug("Cancel nextTokenRefreshTask");
156+
_nextTokenRefreshTask.cancel(false);
157+
}
158+
}
155159
}

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ private void startPollingMode() {
218218
long howLong = _backoff.interval();
219219
_log.info(String.format("Retryable error in streaming subsystem. Switching to polling and retrying in %d seconds", howLong));
220220
_synchronizer.startPeriodicFetching();
221-
_pushManager.stopWorkers();
222221
_pushManager.stop();
223222
Thread.sleep(howLong * 1000);
224223
_incomingPushStatus.clear();

client/src/test/java/io/split/engine/common/PushManagerTest.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,22 @@ public class PushManagerTest {
2222
private PushManager _pushManager;
2323
private PushStatusTracker _pushStatusTracker;
2424
private TelemetryStorage _telemetryStorage;
25+
private FeatureFlagsWorker _featureFlagsWorker;
26+
private SegmentsWorkerImp _segmentsWorkerImp;
2527

2628
@Before
2729
public void setUp() {
30+
_featureFlagsWorker = Mockito.mock(FeatureFlagsWorker.class);
31+
_segmentsWorkerImp = Mockito.mock(SegmentsWorkerImp.class);
2832
_authApiClient = Mockito.mock(AuthApiClient.class);
2933
_eventSourceClient = Mockito.mock(EventSourceClient.class);
3034
_backoff = Mockito.mock(Backoff.class);
3135
_pushStatusTracker = Mockito.mock(PushStatusTrackerImp.class);
3236
_telemetryStorage = new InMemoryTelemetryStorage();
3337
_pushManager = new PushManagerImp(_authApiClient,
3438
_eventSourceClient,
35-
Mockito.mock(FeatureFlagsWorker.class),
36-
Mockito.mock(SegmentsWorkerImp.class),
39+
_featureFlagsWorker,
40+
_segmentsWorkerImp,
3741
_pushStatusTracker,
3842
_telemetryStorage,
3943
null);
@@ -107,4 +111,60 @@ public void startWithPushDisabledAndRetryTrueShouldConnect() throws InterruptedE
107111
Thread.sleep(1500);
108112
Mockito.verify(_pushStatusTracker, Mockito.times(1)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
109113
}
114+
115+
116+
@Test
117+
public void startAndStop() throws InterruptedException {
118+
AuthenticationResponse response = new AuthenticationResponse(true, "token-test", "channels-test", 1, false);
119+
120+
Mockito.when(_authApiClient.Authenticate())
121+
.thenReturn(response);
122+
123+
Mockito.when(_eventSourceClient.start(response.getChannels(), response.getToken()))
124+
.thenReturn(true);
125+
126+
_pushManager.start();
127+
128+
Mockito.verify(_authApiClient, Mockito.times(1)).Authenticate();
129+
Mockito.verify(_eventSourceClient, Mockito.times(1)).start(response.getChannels(), response.getToken());
130+
131+
Thread.sleep(1500);
132+
133+
Mockito.verify(_pushStatusTracker, Mockito.times(0)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
134+
Mockito.verify(_pushStatusTracker, Mockito.times(0)).forcePushDisable();
135+
Assert.assertEquals(1, _telemetryStorage.popStreamingEvents().size());
136+
137+
_pushManager.stop();
138+
139+
Mockito.verify(_eventSourceClient, Mockito.times(1)).stop();
140+
Mockito.verify(_featureFlagsWorker, Mockito.times(1)).stop();
141+
Mockito.verify(_segmentsWorkerImp, Mockito.times(1)).stop();
142+
}
143+
144+
@Test
145+
public void validateStartWorkers() {
146+
_pushManager.startWorkers();
147+
Mockito.verify(_featureFlagsWorker, Mockito.times(1)).start();
148+
Mockito.verify(_segmentsWorkerImp, Mockito.times(1)).start();
149+
}
150+
151+
@Test
152+
public void validateScheduleConnectionReset() throws InterruptedException {
153+
AuthenticationResponse response = new AuthenticationResponse(false, "token-test", "channels-test", 3, false);
154+
155+
Mockito.when(_authApiClient.Authenticate())
156+
.thenReturn(response);
157+
158+
Mockito.when(_eventSourceClient.start(response.getChannels(), response.getToken()))
159+
.thenReturn(true);
160+
161+
_pushManager.start();
162+
163+
_pushManager.scheduleConnectionReset();
164+
Thread.sleep(1000);
165+
166+
Mockito.verify(_eventSourceClient, Mockito.times(3)).stop();
167+
Mockito.verify(_featureFlagsWorker, Mockito.times(3)).stop();
168+
Mockito.verify(_segmentsWorkerImp, Mockito.times(3)).stop();
169+
}
110170
}

0 commit comments

Comments
 (0)