Skip to content

Commit 5fc3a31

Browse files
committed
SDKS-8457
1 parent 68ff698 commit 5fc3a31

File tree

2 files changed

+71
-52
lines changed

2 files changed

+71
-52
lines changed

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.util.concurrent.ThreadFactory;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.atomic.AtomicLong;
33+
import java.util.concurrent.locks.Lock;
34+
import java.util.concurrent.locks.ReentrantLock;
3335

3436
import static com.google.common.base.Preconditions.checkNotNull;
3537
import static io.split.client.utils.SplitExecutorFactory.buildSingleThreadScheduledExecutor;
@@ -42,6 +44,8 @@ public class PushManagerImp implements PushManager {
4244
private final FeatureFlagsWorker _featureFlagsWorker;
4345
private final Worker<SegmentQueueDto> _segmentWorker;
4446
private final PushStatusTracker _pushStatusTracker;
47+
private static final Lock startLock = new ReentrantLock();
48+
private static final Lock stopLock = new ReentrantLock();
4549

4650
private Future<?> _nextTokenRefreshTask;
4751
private final ScheduledExecutorService _scheduledExecutorService;
@@ -92,28 +96,38 @@ public static PushManagerImp build(Synchronizer synchronizer,
9296
}
9397

9498
@Override
95-
public synchronized void start() {
96-
AuthenticationResponse response = _authApiClient.Authenticate();
97-
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
98-
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
99-
_expirationTime.set(response.getExpiration());
100-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
101-
response.getExpiration(), System.currentTimeMillis()));
102-
return;
103-
}
104-
105-
cleanUpResources();
106-
if (response.isRetry()) {
107-
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
108-
} else {
109-
_pushStatusTracker.forcePushDisable();
99+
public void start() {
100+
try {
101+
startLock.lock();
102+
AuthenticationResponse response = _authApiClient.Authenticate();
103+
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
104+
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
105+
_expirationTime.set(response.getExpiration());
106+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
107+
response.getExpiration(), System.currentTimeMillis()));
108+
return;
109+
}
110+
111+
cleanUpResources();
112+
if (response.isRetry()) {
113+
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
114+
} else {
115+
_pushStatusTracker.forcePushDisable();
116+
}
117+
} finally {
118+
startLock.unlock();
110119
}
111120
}
112121

113122
@Override
114-
public synchronized void stop() {
115-
_log.debug("Stopping PushManagerImp");
116-
cleanUpResources();
123+
public void stop() {
124+
try {
125+
stopLock.lock();
126+
_log.debug("Stopping PushManagerImp");
127+
cleanUpResources();
128+
} finally {
129+
stopLock.unlock();
130+
}
117131
}
118132

119133
@Override

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import io.split.telemetry.domain.enums.StreamEventsEnum;
77
import io.split.telemetry.storage.TelemetryRuntimeProducer;
88
import org.apache.hc.client5.http.classic.methods.HttpGet;
9-
import org.apache.hc.client5.http.classic.methods.HttpPost;
109
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1110
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
1211
import org.slf4j.Logger;
@@ -25,6 +24,8 @@
2524
import java.util.concurrent.TimeUnit;
2625
import java.util.concurrent.atomic.AtomicBoolean;
2726
import java.util.concurrent.atomic.AtomicReference;
27+
import java.util.concurrent.locks.Lock;
28+
import java.util.concurrent.locks.ReentrantLock;
2829
import java.util.function.Function;
2930

3031
import static com.google.common.base.Preconditions.checkNotNull;
@@ -49,6 +50,8 @@ private enum ConnectionState {
4950
private final static String SOCKET_CLOSED_MESSAGE = "Socket closed";
5051
private final static String KEEP_ALIVE_PAYLOAD = ":keepalive\n";
5152
private final static long CONNECT_TIMEOUT = 30000;
53+
private static final Lock openLock = new ReentrantLock();
54+
private static final Lock closeLock = new ReentrantLock();
5255
private static final Logger _log = LoggerFactory.getLogger(SSEClient.class);
5356
private final ExecutorService _connectionExecutor;
5457
private final CloseableHttpClient _client;
@@ -60,7 +63,6 @@ private enum ConnectionState {
6063
private AtomicBoolean _forcedStop;
6164
private final RequestDecorator _requestDecorator;
6265
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
63-
private final AtomicBoolean openGuard = new AtomicBoolean(false);
6466

6567
public SSEClient(Function<RawEvent, Void> eventCallback,
6668
Function<StatusMessage, Void> statusCallback,
@@ -78,53 +80,56 @@ public SSEClient(Function<RawEvent, Void> eventCallback,
7880
}
7981

8082
public boolean open(URI uri) {
81-
if (isOpen()) {
82-
_log.info("SSEClient already open.");
83-
return false;
84-
}
85-
86-
if (!openGuard.compareAndSet(false, true)) {
87-
_log.debug("Open SSEClient already running");
88-
return false;
89-
}
90-
91-
_statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);
92-
93-
CountDownLatch signal = new CountDownLatch(1);
94-
_connectionExecutor.submit(() -> connectAndLoop(uri, signal));
9583
try {
96-
if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
84+
openLock.lock();
85+
if (isOpen()) {
86+
_log.info("SSEClient already open.");
9787
return false;
9888
}
99-
} catch (InterruptedException e) {
100-
Thread.currentThread().interrupt();
101-
if(e.getMessage() == null){
102-
_log.info("The thread was interrupted while opening SSEClient");
89+
90+
_statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);
91+
92+
CountDownLatch signal = new CountDownLatch(1);
93+
_connectionExecutor.submit(() -> connectAndLoop(uri, signal));
94+
try {
95+
if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
96+
return false;
97+
}
98+
} catch (InterruptedException e) {
99+
Thread.currentThread().interrupt();
100+
if(e.getMessage() == null){
101+
_log.info("The thread was interrupted while opening SSEClient");
102+
return false;
103+
}
104+
_log.info(e.getMessage());
103105
return false;
104106
}
105-
_log.info(e.getMessage());
106-
return false;
107+
return isOpen();
107108
} finally {
108-
openGuard.set(false);
109+
openLock.unlock();
109110
}
110-
return isOpen();
111111
}
112112

113113
public boolean isOpen() {
114114
return (ConnectionState.OPEN.equals(_state.get()));
115115
}
116116

117-
public synchronized void close() {
118-
_forcedStop.set(true);
119-
if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) {
120-
if (_ongoingResponse.get() != null) {
121-
try {
122-
_ongoingRequest.get().abort();
123-
_ongoingResponse.get().close();
124-
} catch (IOException e) {
125-
_log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
117+
public void close() {
118+
try {
119+
closeLock.lock();
120+
_forcedStop.set(true);
121+
if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) {
122+
if (_ongoingResponse.get() != null) {
123+
try {
124+
_ongoingRequest.get().abort();
125+
_ongoingResponse.get().close();
126+
} catch (IOException e) {
127+
_log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
128+
}
126129
}
127130
}
131+
} finally {
132+
closeLock.unlock();
128133
}
129134
}
130135

0 commit comments

Comments
 (0)