Skip to content

Commit 06f84cc

Browse files
authored
Merge pull request #493 from splitio/task/streamingDecorator
Task/streaming decorator
2 parents 537c8cf + 1df6d66 commit 06f84cc

File tree

14 files changed

+61
-34
lines changed

14 files changed

+61
-34
lines changed

client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.12.0-rc1</version>
8+
<version>4.12.0-rc2</version>
99
</parent>
1010
<artifactId>java-client</artifactId>
1111
<packaging>jar</packaging>
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.split.client;
2+
3+
import io.split.client.CustomHeaderDecorator;
4+
import io.split.client.dtos.RequestContext;
5+
6+
import java.util.HashMap;
7+
import java.util.List;
8+
import java.util.Map;
9+
10+
class NoOpHeaderDecorator implements CustomHeaderDecorator {
11+
public NoOpHeaderDecorator() {
12+
}
13+
14+
@Override
15+
public Map<String, List<String>> getHeaderOverrides(RequestContext context) {
16+
return new HashMap<>();
17+
}
18+
}

client/src/main/java/io/split/client/RequestDecorator.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import io.split.client.dtos.RequestContext;
44

5-
//`import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
65
import org.apache.hc.core5.http.HttpRequest;
76
import org.apache.hc.core5.http.Header;
87

@@ -14,16 +13,6 @@
1413
import java.util.Set;
1514
import java.util.List;
1615

17-
class NoOpHeaderDecorator implements CustomHeaderDecorator {
18-
public NoOpHeaderDecorator() {
19-
}
20-
21-
@Override
22-
public Map<String, List<String>> getHeaderOverrides(RequestContext context) {
23-
return new HashMap<>();
24-
}
25-
}
26-
2716
public final class RequestDecorator {
2817
CustomHeaderDecorator _headerDecorator;
2918

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
187187
_gates = new SDKReadinessGates();
188188

189189
// HttpClient
190-
_splitHttpClient = buildSplitHttpClient(apiToken, config, _sdkMetadata);
190+
RequestDecorator requestDecorator = new RequestDecorator(config.customHeaderDecorator());
191+
_splitHttpClient = buildSplitHttpClient(apiToken, config, _sdkMetadata, requestDecorator);
191192

192193
// Roots
193194
_rootTarget = URI.create(config.endpoint());
@@ -258,7 +259,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
258259
// SyncManager
259260
SplitTasks splitTasks = SplitTasks.build(_splitSynchronizationTask, _segmentSynchronizationTaskImp,
260261
_impressionsManager, _eventsTask, _telemetrySyncTask, _uniqueKeysTracker);
261-
SplitAPI splitAPI = SplitAPI.build(_splitHttpClient, buildSSEdHttpClient(apiToken, config, _sdkMetadata));
262+
SplitAPI splitAPI = SplitAPI.build(_splitHttpClient, buildSSEdHttpClient(apiToken, config, _sdkMetadata), requestDecorator);
262263

263264
_syncManager = SyncManagerImp.build(splitTasks, _splitFetcher, splitCache, splitAPI,
264265
segmentCache, _gates, _telemetryStorageProducer, _telemetrySynchronizer, config, splitParser,
@@ -495,7 +496,7 @@ public boolean isDestroyed() {
495496
}
496497

497498
private static SplitHttpClient buildSplitHttpClient(String apiToken, SplitClientConfig config,
498-
SDKMetadata sdkMetadata)
499+
SDKMetadata sdkMetadata, RequestDecorator requestDecorator)
499500
throws URISyntaxException {
500501
SSLConnectionSocketFactory sslSocketFactory = SSLConnectionSocketFactoryBuilder.create()
501502
.setSslContext(SSLContexts.createSystemDefault())
@@ -529,7 +530,7 @@ private static SplitHttpClient buildSplitHttpClient(String apiToken, SplitClient
529530
}
530531

531532
return SplitHttpClientImpl.create(httpClientbuilder.build(),
532-
new RequestDecorator(config.customHeaderDecorator()),
533+
requestDecorator,
533534
apiToken,
534535
sdkMetadata);
535536
}

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public static PushManagerImp build(Synchronizer synchronizer,
8383
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);
8484
return new PushManagerImp(new AuthApiClientImp(authUrl, splitAPI.getHttpClient(), telemetryRuntimeProducer),
8585
EventSourceClientImp.build(streamingUrl, featureFlagsWorker, segmentWorker, pushStatusTracker, splitAPI.getSseHttpClient(),
86-
telemetryRuntimeProducer, threadFactory),
86+
telemetryRuntimeProducer, threadFactory, splitAPI.getRequestDecorator()),
8787
featureFlagsWorker,
8888
segmentWorker,
8989
pushStatusTracker,

client/src/main/java/io/split/engine/common/SplitAPI.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.engine.common;
22

3+
import io.split.client.RequestDecorator;
34
import io.split.service.SplitHttpClient;
45
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
56
import org.slf4j.Logger;
@@ -9,15 +10,17 @@ public class SplitAPI {
910

1011
private final SplitHttpClient _httpClient;
1112
private final CloseableHttpClient _sseHttpClient;
13+
private final RequestDecorator _requestDecorator;
1214
private static final Logger _log = LoggerFactory.getLogger(SplitAPI.class);
1315

14-
private SplitAPI(SplitHttpClient httpClient, CloseableHttpClient sseHttpClient) {
16+
private SplitAPI(SplitHttpClient httpClient, CloseableHttpClient sseHttpClient, RequestDecorator requestDecorator) {
1517
_httpClient = httpClient;
1618
_sseHttpClient = sseHttpClient;
19+
_requestDecorator = requestDecorator;
1720
}
1821

19-
public static SplitAPI build(SplitHttpClient httpClient, CloseableHttpClient sseHttpClient){
20-
return new SplitAPI(httpClient,sseHttpClient);
22+
public static SplitAPI build(SplitHttpClient httpClient, CloseableHttpClient sseHttpClient, RequestDecorator requestDecorator){
23+
return new SplitAPI(httpClient, sseHttpClient, requestDecorator);
2124
}
2225

2326
public SplitHttpClient getHttpClient() {
@@ -28,6 +31,8 @@ public CloseableHttpClient getSseHttpClient() {
2831
return _sseHttpClient;
2932
}
3033

34+
public RequestDecorator getRequestDecorator() { return _requestDecorator; }
35+
3136
public void close(){
3237
try {
3338
_sseHttpClient.close();

client/src/main/java/io/split/engine/sse/EventSourceClientImp.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.annotations.VisibleForTesting;
44
import com.google.common.base.Strings;
5+
import io.split.client.RequestDecorator;
56
import io.split.engine.sse.client.RawEvent;
67
import io.split.engine.sse.client.SSEClient;
78
import io.split.engine.sse.dtos.SegmentQueueDto;
@@ -40,7 +41,8 @@ public class EventSourceClientImp implements EventSourceClient {
4041
PushStatusTracker pushStatusTracker,
4142
CloseableHttpClient sseHttpClient,
4243
TelemetryRuntimeProducer telemetryRuntimeProducer,
43-
ThreadFactory threadFactory) {
44+
ThreadFactory threadFactory,
45+
RequestDecorator requestDecorator) {
4446
_baseStreamingUrl = checkNotNull(baseStreamingUrl);
4547
_notificationParser = checkNotNull(notificationParser);
4648
_notificationProcessor = checkNotNull(notificationProcessor);
@@ -51,7 +53,8 @@ public class EventSourceClientImp implements EventSourceClient {
5153
status -> { _pushStatusTracker.handleSseStatus(status); return null; },
5254
sseHttpClient,
5355
telemetryRuntimeProducer,
54-
threadFactory);
56+
threadFactory,
57+
requestDecorator);
5558
_firstEvent = new AtomicBoolean();
5659
}
5760

@@ -61,14 +64,16 @@ public static EventSourceClientImp build(String baseStreamingUrl,
6164
PushStatusTracker pushStatusTracker,
6265
CloseableHttpClient sseHttpClient,
6366
TelemetryRuntimeProducer telemetryRuntimeProducer,
64-
ThreadFactory threadFactory) {
67+
ThreadFactory threadFactory,
68+
RequestDecorator requestDecorator) {
6569
return new EventSourceClientImp(baseStreamingUrl,
6670
new NotificationParserImp(),
6771
NotificationProcessorImp.build(featureFlagsWorker, segmentWorker, pushStatusTracker),
6872
pushStatusTracker,
6973
sseHttpClient,
7074
telemetryRuntimeProducer,
71-
threadFactory);
75+
threadFactory,
76+
requestDecorator);
7277
}
7378

7479
@Override

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package io.split.engine.sse.client;
22

33
import com.google.common.base.Strings;
4+
import io.split.client.RequestDecorator;
45
import io.split.telemetry.domain.StreamingEvent;
56
import io.split.telemetry.domain.enums.StreamEventsEnum;
67
import io.split.telemetry.storage.TelemetryRuntimeProducer;
78
import org.apache.hc.client5.http.classic.methods.HttpGet;
9+
import org.apache.hc.client5.http.classic.methods.HttpPost;
810
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
911
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
1012
import org.slf4j.Logger;
@@ -56,20 +58,23 @@ private enum ConnectionState {
5658
private final AtomicReference<CloseableHttpResponse> _ongoingResponse = new AtomicReference<>();
5759
private final AtomicReference<HttpGet> _ongoingRequest = new AtomicReference<>();
5860
private AtomicBoolean _forcedStop;
61+
private final RequestDecorator _requestDecorator;
5962

6063
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
6164

6265
public SSEClient(Function<RawEvent, Void> eventCallback,
6366
Function<StatusMessage, Void> statusCallback,
6467
CloseableHttpClient client,
6568
TelemetryRuntimeProducer telemetryRuntimeProducer,
66-
ThreadFactory threadFactory) {
69+
ThreadFactory threadFactory,
70+
RequestDecorator requestDecorator) {
6771
_eventCallback = eventCallback;
6872
_statusCallback = statusCallback;
6973
_client = client;
7074
_forcedStop = new AtomicBoolean();
7175
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
7276
_connectionExecutor = buildExecutorService(threadFactory, "SPLIT-SSEConnection-%d");
77+
_requestDecorator = requestDecorator;
7378
}
7479

7580
public synchronized boolean open(URI uri) {
@@ -177,7 +182,9 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
177182
}
178183

179184
private boolean establishConnection(URI uri, CountDownLatch signal) {
180-
_ongoingRequest.set(new HttpGet(uri));
185+
HttpGet request = new HttpGet(uri);
186+
request = (HttpGet) _requestDecorator.decorateHeaders(request);
187+
_ongoingRequest.set(request);
181188
try {
182189
_ongoingResponse.set(_client.execute(_ongoingRequest.get()));
183190
if (_ongoingResponse.get().getCode() != 200) {

client/src/test/java/io/split/engine/sse/EventSourceClientTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.split.engine.sse;
22

33
import io.split.SSEMockServer;
4+
import io.split.client.RequestDecorator;
45
import io.split.engine.sse.client.SSEClient;
56
import io.split.engine.sse.dtos.ErrorNotification;
67
import io.split.engine.sse.dtos.FeatureFlagChangeNotification;
@@ -42,7 +43,7 @@ public void startShouldConnect() throws IOException {
4243
TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(InMemoryTelemetryStorage.class);
4344
sseServer.start();
4445

45-
EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null);
46+
EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null, new RequestDecorator(null));
4647

4748
boolean result = eventSourceClient.start("channel-test", "token-test");
4849

@@ -57,7 +58,7 @@ public void startShouldReconnect() throws IOException {
5758
SSEMockServer sseServer = buildSSEMockServer(eventQueue);
5859
TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(InMemoryTelemetryStorage.class);
5960
sseServer.start();
60-
EventSourceClient eventSourceClient = new EventSourceClientImp("http://fake:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null);
61+
EventSourceClient eventSourceClient = new EventSourceClientImp("http://fake:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null, new RequestDecorator(null));
6162

6263
boolean result = eventSourceClient.start("channel-test", "token-test");
6364

@@ -74,7 +75,7 @@ public void startAndReceiveNotification() throws IOException {
7475
SSEMockServer sseServer = buildSSEMockServer(eventQueue);
7576
TelemetryRuntimeProducer telemetryRuntimeProducer = Mockito.mock(InMemoryTelemetryStorage.class);
7677
sseServer.start();
77-
EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null);
78+
EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient(), telemetryRuntimeProducer, null, new RequestDecorator(null));
7879

7980
boolean result = eventSourceClient.start("channel-test", "token-test");
8081

client/src/test/java/io/split/engine/sse/SSEClientTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.engine.sse;
22

3+
import io.split.client.RequestDecorator;
34
import io.split.engine.sse.client.SSEClient;
45
import io.split.telemetry.storage.InMemoryTelemetryStorage;
56
import io.split.telemetry.storage.TelemetryRuntimeProducer;
@@ -38,7 +39,7 @@ public void basicUsageTest() throws URISyntaxException, InterruptedException {
3839
CloseableHttpClient httpClient = httpClientbuilder.build();
3940

4041
SSEClient sse = new SSEClient(e -> null,
41-
s -> null, httpClient, telemetryRuntimeProducer, null);
42+
s -> null, httpClient, telemetryRuntimeProducer, null, new RequestDecorator(null));
4243
sse.open(uri);
4344
Thread.sleep(5000);
4445
sse.close();

0 commit comments

Comments
 (0)