Skip to content

Commit 50dc950

Browse files
authored
Merge pull request #508 from splitio/development
Release 4.12.1: fixed deadlock for VirtualThread
2 parents e84af2d + 23074ab commit 50dc950

File tree

10 files changed

+152
-62
lines changed

10 files changed

+152
-62
lines changed

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
4.12.1 (Jun 10, 2024)
2+
- Fixed deadlock for virtual thread in Push Manager and SSE Client.
3+
14
4.12.0 (May 15, 2024)
25
- Added support for targeting rules based on semantic versions (https://semver.org/).
36
- Added the logic to handle correctly when the SDK receives an unsupported Matcher type.

client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.12.0</version>
8+
<version>4.12.1</version>
99
</parent>
1010
<artifactId>java-client</artifactId>
1111
<packaging>jar</packaging>

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.util.concurrent.ThreadFactory;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.atomic.AtomicLong;
33+
import java.util.concurrent.locks.Lock;
34+
import java.util.concurrent.locks.ReentrantLock;
3335

3436
import static com.google.common.base.Preconditions.checkNotNull;
3537
import static io.split.client.utils.SplitExecutorFactory.buildSingleThreadScheduledExecutor;
@@ -42,6 +44,7 @@ public class PushManagerImp implements PushManager {
4244
private final FeatureFlagsWorker _featureFlagsWorker;
4345
private final Worker<SegmentQueueDto> _segmentWorker;
4446
private final PushStatusTracker _pushStatusTracker;
47+
private static final Lock lock = new ReentrantLock();
4548

4649
private Future<?> _nextTokenRefreshTask;
4750
private final ScheduledExecutorService _scheduledExecutorService;
@@ -92,37 +95,42 @@ public static PushManagerImp build(Synchronizer synchronizer,
9295
}
9396

9497
@Override
95-
public synchronized void start() {
96-
AuthenticationResponse response = _authApiClient.Authenticate();
97-
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
98-
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
99-
_expirationTime.set(response.getExpiration());
100-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
101-
response.getExpiration(), System.currentTimeMillis()));
102-
return;
103-
}
104-
105-
stop();
106-
if (response.isRetry()) {
107-
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
108-
} else {
109-
_pushStatusTracker.forcePushDisable();
98+
public void start() {
99+
try {
100+
lock.lock();
101+
AuthenticationResponse response = _authApiClient.Authenticate();
102+
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
103+
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
104+
_expirationTime.set(response.getExpiration());
105+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
106+
response.getExpiration(), System.currentTimeMillis()));
107+
return;
108+
}
109+
110+
cleanUpResources();
111+
if (response.isRetry()) {
112+
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
113+
} else {
114+
_pushStatusTracker.forcePushDisable();
115+
}
116+
} finally {
117+
lock.unlock();
110118
}
111119
}
112120

113121
@Override
114-
public synchronized void stop() {
115-
_log.debug("Stopping PushManagerImp");
116-
_eventSourceClient.stop();
117-
stopWorkers();
118-
if (_nextTokenRefreshTask != null) {
119-
_log.debug("Cancel nextTokenRefreshTask");
120-
_nextTokenRefreshTask.cancel(false);
122+
public void stop() {
123+
try {
124+
lock.lock();
125+
_log.debug("Stopping PushManagerImp");
126+
cleanUpResources();
127+
} finally {
128+
lock.unlock();
121129
}
122130
}
123131

124132
@Override
125-
public synchronized void scheduleConnectionReset() {
133+
public void scheduleConnectionReset() {
126134
_log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", _expirationTime));
127135
_nextTokenRefreshTask = _scheduledExecutorService.schedule(() -> {
128136
_log.debug("Starting scheduleNextTokenRefresh ...");
@@ -142,14 +150,23 @@ private boolean startSse(String token, String channels) {
142150
}
143151

144152
@Override
145-
public synchronized void startWorkers() {
153+
public void startWorkers() {
146154
_featureFlagsWorker.start();
147155
_segmentWorker.start();
148156
}
149157

150158
@Override
151-
public synchronized void stopWorkers() {
159+
public void stopWorkers() {
152160
_featureFlagsWorker.stop();
153161
_segmentWorker.stop();
154162
}
163+
164+
private void cleanUpResources() {
165+
_eventSourceClient.stop();
166+
stopWorkers();
167+
if (_nextTokenRefreshTask != null) {
168+
_log.debug("Cancel nextTokenRefreshTask");
169+
_nextTokenRefreshTask.cancel(false);
170+
}
171+
}
155172
}

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ private void startPollingMode() {
218218
long howLong = _backoff.interval();
219219
_log.info(String.format("Retryable error in streaming subsystem. Switching to polling and retrying in %d seconds", howLong));
220220
_synchronizer.startPeriodicFetching();
221-
_pushManager.stopWorkers();
222221
_pushManager.stop();
223222
Thread.sleep(howLong * 1000);
224223
_incomingPushStatus.clear();

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import io.split.telemetry.domain.enums.StreamEventsEnum;
77
import io.split.telemetry.storage.TelemetryRuntimeProducer;
88
import org.apache.hc.client5.http.classic.methods.HttpGet;
9-
import org.apache.hc.client5.http.classic.methods.HttpPost;
109
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1110
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
1211
import org.slf4j.Logger;
@@ -25,6 +24,8 @@
2524
import java.util.concurrent.TimeUnit;
2625
import java.util.concurrent.atomic.AtomicBoolean;
2726
import java.util.concurrent.atomic.AtomicReference;
27+
import java.util.concurrent.locks.Lock;
28+
import java.util.concurrent.locks.ReentrantLock;
2829
import java.util.function.Function;
2930

3031
import static com.google.common.base.Preconditions.checkNotNull;
@@ -49,6 +50,7 @@ private enum ConnectionState {
4950
private final static String SOCKET_CLOSED_MESSAGE = "Socket closed";
5051
private final static String KEEP_ALIVE_PAYLOAD = ":keepalive\n";
5152
private final static long CONNECT_TIMEOUT = 30000;
53+
private static final Lock lock = new ReentrantLock();
5254
private static final Logger _log = LoggerFactory.getLogger(SSEClient.class);
5355
private final ExecutorService _connectionExecutor;
5456
private final CloseableHttpClient _client;
@@ -59,7 +61,6 @@ private enum ConnectionState {
5961
private final AtomicReference<HttpGet> _ongoingRequest = new AtomicReference<>();
6062
private AtomicBoolean _forcedStop;
6163
private final RequestDecorator _requestDecorator;
62-
6364
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
6465

6566
public SSEClient(Function<RawEvent, Void> eventCallback,
@@ -77,47 +78,57 @@ public SSEClient(Function<RawEvent, Void> eventCallback,
7778
_requestDecorator = requestDecorator;
7879
}
7980

80-
public synchronized boolean open(URI uri) {
81-
if (isOpen()) {
82-
_log.info("SSEClient already open.");
83-
return false;
84-
}
85-
86-
_statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);
87-
88-
CountDownLatch signal = new CountDownLatch(1);
89-
_connectionExecutor.submit(() -> connectAndLoop(uri, signal));
81+
public boolean open(URI uri) {
9082
try {
91-
if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
83+
lock.lock();
84+
if (isOpen()) {
85+
_log.info("SSEClient already open.");
9286
return false;
9387
}
94-
} catch (InterruptedException e) {
95-
Thread.currentThread().interrupt();
96-
if(e.getMessage() == null){
97-
_log.info("The thread was interrupted while opening SSEClient");
88+
89+
_statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);
90+
91+
CountDownLatch signal = new CountDownLatch(1);
92+
_connectionExecutor.submit(() -> connectAndLoop(uri, signal));
93+
try {
94+
if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
95+
return false;
96+
}
97+
} catch (InterruptedException e) {
98+
Thread.currentThread().interrupt();
99+
if(e.getMessage() == null){
100+
_log.info("The thread was interrupted while opening SSEClient");
101+
return false;
102+
}
103+
_log.info(e.getMessage());
98104
return false;
99105
}
100-
_log.info(e.getMessage());
101-
return false;
106+
return isOpen();
107+
} finally {
108+
lock.unlock();
102109
}
103-
return isOpen();
104110
}
105111

106112
public boolean isOpen() {
107113
return (ConnectionState.OPEN.equals(_state.get()));
108114
}
109115

110-
public synchronized void close() {
111-
_forcedStop.set(true);
112-
if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) {
113-
if (_ongoingResponse.get() != null) {
114-
try {
115-
_ongoingRequest.get().abort();
116-
_ongoingResponse.get().close();
117-
} catch (IOException e) {
118-
_log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
116+
public void close() {
117+
try {
118+
lock.lock();
119+
_forcedStop.set(true);
120+
if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) {
121+
if (_ongoingResponse.get() != null) {
122+
try {
123+
_ongoingRequest.get().abort();
124+
_ongoingResponse.get().close();
125+
} catch (IOException e) {
126+
_log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
127+
}
119128
}
120129
}
130+
} finally {
131+
lock.unlock();
121132
}
122133
}
123134

client/src/test/java/io/split/engine/common/PushManagerTest.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,22 @@ public class PushManagerTest {
2222
private PushManager _pushManager;
2323
private PushStatusTracker _pushStatusTracker;
2424
private TelemetryStorage _telemetryStorage;
25+
private FeatureFlagsWorker _featureFlagsWorker;
26+
private SegmentsWorkerImp _segmentsWorkerImp;
2527

2628
@Before
2729
public void setUp() {
30+
_featureFlagsWorker = Mockito.mock(FeatureFlagsWorker.class);
31+
_segmentsWorkerImp = Mockito.mock(SegmentsWorkerImp.class);
2832
_authApiClient = Mockito.mock(AuthApiClient.class);
2933
_eventSourceClient = Mockito.mock(EventSourceClient.class);
3034
_backoff = Mockito.mock(Backoff.class);
3135
_pushStatusTracker = Mockito.mock(PushStatusTrackerImp.class);
3236
_telemetryStorage = new InMemoryTelemetryStorage();
3337
_pushManager = new PushManagerImp(_authApiClient,
3438
_eventSourceClient,
35-
Mockito.mock(FeatureFlagsWorker.class),
36-
Mockito.mock(SegmentsWorkerImp.class),
39+
_featureFlagsWorker,
40+
_segmentsWorkerImp,
3741
_pushStatusTracker,
3842
_telemetryStorage,
3943
null);
@@ -107,4 +111,60 @@ public void startWithPushDisabledAndRetryTrueShouldConnect() throws InterruptedE
107111
Thread.sleep(1500);
108112
Mockito.verify(_pushStatusTracker, Mockito.times(1)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
109113
}
114+
115+
116+
@Test
117+
public void startAndStop() throws InterruptedException {
118+
AuthenticationResponse response = new AuthenticationResponse(true, "token-test", "channels-test", 1, false);
119+
120+
Mockito.when(_authApiClient.Authenticate())
121+
.thenReturn(response);
122+
123+
Mockito.when(_eventSourceClient.start(response.getChannels(), response.getToken()))
124+
.thenReturn(true);
125+
126+
_pushManager.start();
127+
128+
Mockito.verify(_authApiClient, Mockito.times(1)).Authenticate();
129+
Mockito.verify(_eventSourceClient, Mockito.times(1)).start(response.getChannels(), response.getToken());
130+
131+
Thread.sleep(1500);
132+
133+
Mockito.verify(_pushStatusTracker, Mockito.times(0)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
134+
Mockito.verify(_pushStatusTracker, Mockito.times(0)).forcePushDisable();
135+
Assert.assertEquals(1, _telemetryStorage.popStreamingEvents().size());
136+
137+
_pushManager.stop();
138+
139+
Mockito.verify(_eventSourceClient, Mockito.times(1)).stop();
140+
Mockito.verify(_featureFlagsWorker, Mockito.times(1)).stop();
141+
Mockito.verify(_segmentsWorkerImp, Mockito.times(1)).stop();
142+
}
143+
144+
@Test
145+
public void validateStartWorkers() {
146+
_pushManager.startWorkers();
147+
Mockito.verify(_featureFlagsWorker, Mockito.times(1)).start();
148+
Mockito.verify(_segmentsWorkerImp, Mockito.times(1)).start();
149+
}
150+
151+
@Test
152+
public void validateScheduleConnectionReset() throws InterruptedException {
153+
AuthenticationResponse response = new AuthenticationResponse(false, "token-test", "channels-test", 3, false);
154+
155+
Mockito.when(_authApiClient.Authenticate())
156+
.thenReturn(response);
157+
158+
Mockito.when(_eventSourceClient.start(response.getChannels(), response.getToken()))
159+
.thenReturn(true);
160+
161+
_pushManager.start();
162+
163+
_pushManager.scheduleConnectionReset();
164+
Thread.sleep(1000);
165+
166+
Mockito.verify(_eventSourceClient, Mockito.times(3)).stop();
167+
Mockito.verify(_featureFlagsWorker, Mockito.times(3)).stop();
168+
Mockito.verify(_segmentsWorkerImp, Mockito.times(3)).stop();
169+
}
110170
}

pluggable-storage/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<artifactId>java-client-parent</artifactId>
88
<groupId>io.split.client</groupId>
9-
<version>4.12.0</version>
9+
<version>4.12.1</version>
1010
</parent>
1111

1212
<version>2.1.0</version>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>io.split.client</groupId>
66
<artifactId>java-client-parent</artifactId>
7-
<version>4.12.0</version>
7+
<version>4.12.1</version>
88
<dependencyManagement>
99
<dependencies>
1010
<dependency>

redis-wrapper/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<artifactId>java-client-parent</artifactId>
88
<groupId>io.split.client</groupId>
9-
<version>4.12.0</version>
9+
<version>4.12.1</version>
1010
</parent>
1111
<artifactId>redis-wrapper</artifactId>
1212
<version>3.1.0</version>

testing/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.12.0</version>
8+
<version>4.12.1</version>
99
</parent>
1010
<artifactId>java-client-testing</artifactId>
1111
<packaging>jar</packaging>

0 commit comments

Comments
 (0)