Skip to content
This repository was archived by the owner on Dec 19, 2025. It is now read-only.

Commit 37fdd92

Browse files
RxNettySSE Client Factory
Temporary fix until ReactiveX/RxNetty#89 is resolved.
1 parent 8f33dd1 commit 37fdd92

12 files changed

Lines changed: 148 additions & 39 deletions

File tree

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.reactivex.lab.edge.common;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.channel.EventLoopGroup;
5+
import io.netty.channel.nio.NioEventLoopGroup;
6+
import io.reactivex.netty.channel.RxDefaultThreadFactory;
7+
import io.reactivex.netty.pipeline.PipelineConfigurators;
8+
import io.reactivex.netty.protocol.http.client.HttpClient;
9+
import io.reactivex.netty.protocol.http.client.HttpClientBuilder;
10+
import io.reactivex.netty.protocol.http.server.HttpServer;
11+
import io.reactivex.netty.protocol.http.server.HttpServerBuilder;
12+
import io.reactivex.netty.protocol.http.server.RequestHandler;
13+
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
14+
15+
public class RxNettySSE {
16+
17+
private static final EventLoopGroup group = new NioEventLoopGroup(0 /* means default in netty */, new RxNettyThreadFactory());
18+
19+
public static HttpServer<ByteBuf, ServerSentEvent> createHttpServer(int port,
20+
RequestHandler<ByteBuf, ServerSentEvent> requestHandler) {
21+
return new HttpServerBuilder<ByteBuf, ServerSentEvent>(port, requestHandler)
22+
.pipelineConfigurator(PipelineConfigurators.<ByteBuf> sseServerConfigurator())
23+
.eventLoop(group)
24+
.build();
25+
}
26+
27+
public static HttpClient<ByteBuf, ServerSentEvent> createHttpClient(String host, int port) {
28+
return new HttpClientBuilder<ByteBuf, ServerSentEvent>(host, port)
29+
.pipelineConfigurator(PipelineConfigurators.<ByteBuf> sseClientConfigurator())
30+
.eventloop(group)
31+
.build();
32+
33+
}
34+
35+
public static class RxNettyThreadFactory extends RxDefaultThreadFactory {
36+
37+
public RxNettyThreadFactory() {
38+
super("rx-netty-selector");
39+
}
40+
}
41+
42+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.reactivex.lab.edge.common;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.channel.EventLoopGroup;
5+
import io.netty.channel.nio.NioEventLoopGroup;
6+
import io.reactivex.netty.channel.RxDefaultThreadFactory;
7+
import io.reactivex.netty.pipeline.PipelineConfigurators;
8+
import io.reactivex.netty.protocol.http.client.HttpClient;
9+
import io.reactivex.netty.protocol.http.client.HttpClientBuilder;
10+
import io.reactivex.netty.protocol.http.server.HttpServer;
11+
import io.reactivex.netty.protocol.http.server.HttpServerBuilder;
12+
import io.reactivex.netty.protocol.http.server.RequestHandler;
13+
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
14+
15+
public class RxNettySSE {
16+
17+
private static final EventLoopGroup group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new RxNettyThreadFactory());
18+
19+
public static HttpServer<ByteBuf, ServerSentEvent> createHttpServer(int port,
20+
RequestHandler<ByteBuf, ServerSentEvent> requestHandler) {
21+
return new HttpServerBuilder<ByteBuf, ServerSentEvent>(port, requestHandler)
22+
.pipelineConfigurator(PipelineConfigurators.<ByteBuf> sseServerConfigurator())
23+
.eventLoop(group)
24+
.build();
25+
}
26+
27+
public static HttpClient<ByteBuf, ServerSentEvent> createHttpClient(String host, int port) {
28+
return new HttpClientBuilder<ByteBuf, ServerSentEvent>(host, port)
29+
.pipelineConfigurator(PipelineConfigurators.<ByteBuf> sseClientConfigurator())
30+
.eventloop(group)
31+
.build();
32+
33+
}
34+
35+
public static class RxNettyThreadFactory extends RxDefaultThreadFactory {
36+
37+
public RxNettyThreadFactory() {
38+
super("rx-netty-selector");
39+
}
40+
}
41+
42+
}

rl-nf-edge-tier/src/main/java/io/reactivex/lab/edge/nf/EdgeServer.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
package io.reactivex.lab.edge.nf;
22

3-
import io.netty.buffer.ByteBuf;
43
import io.netty.handler.codec.http.HttpResponseStatus;
5-
import io.reactivex.netty.RxNetty;
6-
import io.reactivex.netty.pipeline.PipelineConfigurators;
4+
import io.reactivex.lab.edge.common.RxNettySSE;
75
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
86
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
97
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
@@ -23,7 +21,7 @@ public static void main(String... args) {
2321
startHystrixMetricsStream();
2422

2523
// start web services => http://localhost:8080
26-
RxNetty.createHttpServer(8080, (request, response) -> {
24+
RxNettySSE.createHttpServer(8080, (request, response) -> {
2725
System.out.println("Server => Request: " + request.getPath());
2826
try {
2927
if (request.getPath().equals("/device/home")) {
@@ -42,11 +40,11 @@ public static void main(String... args) {
4240
response.setStatus(HttpResponseStatus.BAD_REQUEST);
4341
return response.writeStringAndFlush("Error 500: Bad Request\n" + e.getMessage() + "\n");
4442
}
45-
}, PipelineConfigurators.<ByteBuf> sseServerConfigurator()).startAndWait();
43+
}).startAndWait();
4644
}
4745

4846
private static void startHystrixMetricsStream() {
49-
RxNetty.createHttpServer(9999, (request, response) -> {
47+
RxNettySSE.createHttpServer(9999, (request, response) -> {
5048
System.out.println("Start Hystrix Stream");
5149
response.getHeaders().add("content-type", "text/event-stream");
5250
return Observable.create((Subscriber<? super Void> s) -> {
@@ -65,7 +63,7 @@ private static void startHystrixMetricsStream() {
6563
});
6664
}).subscribe());
6765
});
68-
}, PipelineConfigurators.<ByteBuf> sseServerConfigurator()).start();
66+
}).start();
6967
}
7068

7169
final static Observable<String> streamPoller = Observable.create((Subscriber<? super String> s) -> {

rl-nf-edge-tier/src/main/java/io/reactivex/lab/edge/nf/clients/BookmarksCommand.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package io.reactivex.lab.edge.nf.clients;
22

33
import io.netty.buffer.ByteBuf;
4+
import io.reactivex.lab.edge.common.RxNettySSE;
45
import io.reactivex.lab.edge.common.SimpleJson;
56
import io.reactivex.lab.edge.nf.clients.BookmarksCommand.Bookmark;
67
import io.reactivex.lab.edge.nf.clients.PersonalizedCatalogCommand.Video;
7-
import io.reactivex.netty.RxNetty;
88
import io.reactivex.netty.pipeline.PipelineConfigurators;
99
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
1010

@@ -33,7 +33,7 @@ public BookmarksCommand(List<Video> videos) {
3333

3434
@Override
3535
protected Observable<Bookmark> run() {
36-
return RxNetty.createHttpClient("localhost", 9190, PipelineConfigurators.<ByteBuf> sseClientConfigurator())
36+
return RxNettySSE.createHttpClient("localhost", 9190)
3737
.submit(HttpClientRequest.createGet("/bookmarks?" + UrlGenerator.generate("videoId", videos)))
3838
.flatMap(r -> {
3939
Observable<Bookmark> bytesToJson = r.getContent().map(sse -> {

rl-nf-edge-tier/src/main/java/io/reactivex/lab/edge/nf/clients/GeoCommand.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package io.reactivex.lab.edge.nf.clients;
22

3-
import io.netty.buffer.ByteBuf;
3+
import io.reactivex.lab.edge.common.RxNettySSE;
44
import io.reactivex.lab.edge.common.SimpleJson;
55
import io.reactivex.lab.edge.nf.clients.GeoCommand.GeoIP;
6-
import io.reactivex.netty.RxNetty;
7-
import io.reactivex.netty.pipeline.PipelineConfigurators;
86
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
9-
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
107

118
import java.util.List;
129
import java.util.Map;
@@ -27,7 +24,7 @@ public GeoCommand(List<String> ips) {
2724

2825
@Override
2926
protected Observable<GeoIP> run() {
30-
return RxNetty.createHttpClient("localhost", 9191, PipelineConfigurators.<ByteBuf> sseClientConfigurator())
27+
return RxNettySSE.createHttpClient("localhost", 9191)
3128
.submit(HttpClientRequest.createGet("/geo?" + UrlGenerator.generate("ip", ips)))
3229
.flatMap(r -> {
3330
Observable<GeoIP> bytesToJson = r.getContent().map(sse -> {

rl-nf-edge-tier/src/main/java/io/reactivex/lab/edge/nf/clients/PersonalizedCatalogCommand.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package io.reactivex.lab.edge.nf.clients;
22

3-
import io.netty.buffer.ByteBuf;
3+
import io.reactivex.lab.edge.common.RxNettySSE;
44
import io.reactivex.lab.edge.common.SimpleJson;
55
import io.reactivex.lab.edge.nf.clients.PersonalizedCatalogCommand.Catalog;
66
import io.reactivex.lab.edge.nf.clients.UserCommand.User;
7-
import io.reactivex.netty.RxNetty;
8-
import io.reactivex.netty.pipeline.PipelineConfigurators;
97
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
108

119
import java.util.Arrays;
@@ -33,7 +31,7 @@ public PersonalizedCatalogCommand(List<User> users) {
3331

3432
@Override
3533
protected Observable<Catalog> run() {
36-
return RxNetty.createHttpClient("localhost", 9192, PipelineConfigurators.<ByteBuf> sseClientConfigurator())
34+
return RxNettySSE.createHttpClient("localhost", 9192)
3735
.submit(HttpClientRequest.createGet("/catalog?" + UrlGenerator.generate("userId", users)))
3836
.flatMap(r -> {
3937
Observable<Catalog> bytesToJson = r.getContent().map(sse -> {

rl-nf-edge-tier/src/main/java/io/reactivex/lab/edge/nf/clients/RatingsCommand.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package io.reactivex.lab.edge.nf.clients;
22

3-
import io.netty.buffer.ByteBuf;
3+
import io.reactivex.lab.edge.common.RxNettySSE;
44
import io.reactivex.lab.edge.common.SimpleJson;
55
import io.reactivex.lab.edge.nf.clients.PersonalizedCatalogCommand.Video;
66
import io.reactivex.lab.edge.nf.clients.RatingsCommand.Rating;
7-
import io.reactivex.netty.RxNetty;
8-
import io.reactivex.netty.pipeline.PipelineConfigurators;
97
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
108

119
import java.util.Arrays;
@@ -32,7 +30,7 @@ public RatingsCommand(List<Video> videos) {
3230

3331
@Override
3432
protected Observable<Rating> run() {
35-
return RxNetty.createHttpClient("localhost", 9193, PipelineConfigurators.<ByteBuf> sseClientConfigurator())
33+
return RxNettySSE.createHttpClient("localhost", 9193)
3634
.submit(HttpClientRequest.createGet("/ratings?" + UrlGenerator.generate("videoId", videos)))
3735
.flatMap(r -> {
3836
Observable<Rating> bytesToJson = r.getContent().map(sse -> {

rl-nf-edge-tier/src/main/java/io/reactivex/lab/edge/nf/clients/SocialCommand.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
package io.reactivex.lab.edge.nf.clients;
22

3-
import io.netty.buffer.ByteBuf;
3+
import io.reactivex.lab.edge.common.RxNettySSE;
44
import io.reactivex.lab.edge.common.SimpleJson;
55
import io.reactivex.lab.edge.nf.clients.SocialCommand.Social;
66
import io.reactivex.lab.edge.nf.clients.UserCommand.User;
7-
import io.reactivex.netty.RxNetty;
8-
import io.reactivex.netty.pipeline.PipelineConfigurators;
97
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
10-
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
118

129
import java.util.Arrays;
1310
import java.util.List;
@@ -34,7 +31,7 @@ public SocialCommand(List<User> users) {
3431

3532
@Override
3633
protected Observable<Social> run() {
37-
return RxNetty.createHttpClient("localhost", 9194, PipelineConfigurators.<ByteBuf> sseClientConfigurator())
34+
return RxNettySSE.createHttpClient("localhost", 9194)
3835
.submit(HttpClientRequest.createGet("/social?" + UrlGenerator.generate("userId", users)))
3936
.flatMap(r -> {
4037
Observable<Social> bytesToJson = r.getContent().map(sse -> {

rl-nf-edge-tier/src/main/java/io/reactivex/lab/edge/nf/clients/UserCommand.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package io.reactivex.lab.edge.nf.clients;
22

3-
import io.netty.buffer.ByteBuf;
3+
import io.reactivex.lab.edge.common.RxNettySSE;
44
import io.reactivex.lab.edge.common.SimpleJson;
55
import io.reactivex.lab.edge.nf.clients.UserCommand.User;
6-
import io.reactivex.netty.RxNetty;
7-
import io.reactivex.netty.pipeline.PipelineConfigurators;
86
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
97

108
import java.util.List;
@@ -26,7 +24,7 @@ public UserCommand(List<String> userIds) {
2624

2725
@Override
2826
protected Observable<User> run() {
29-
return RxNetty.createHttpClient("localhost", 9195, PipelineConfigurators.<ByteBuf> sseClientConfigurator())
27+
return RxNettySSE.createHttpClient("localhost", 9195)
3028
.submit(HttpClientRequest.createGet("/user?" + UrlGenerator.generate("userId", userIds)))
3129
.flatMap(r -> {
3230
Observable<User> user = r.getContent().map(sse -> {

rl-nf-edge-tier/src/main/java/io/reactivex/lab/edge/nf/clients/VideoMetadataCommand.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package io.reactivex.lab.edge.nf.clients;
22

3-
import io.netty.buffer.ByteBuf;
3+
import io.reactivex.lab.edge.common.RxNettySSE;
44
import io.reactivex.lab.edge.common.SimpleJson;
55
import io.reactivex.lab.edge.nf.clients.PersonalizedCatalogCommand.Video;
66
import io.reactivex.lab.edge.nf.clients.VideoMetadataCommand.VideoMetadata;
7-
import io.reactivex.netty.RxNetty;
8-
import io.reactivex.netty.pipeline.PipelineConfigurators;
97
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
108

119
import java.util.Arrays;
@@ -33,7 +31,7 @@ public VideoMetadataCommand(List<Video> videos) {
3331

3432
@Override
3533
protected Observable<VideoMetadata> run() {
36-
return RxNetty.createHttpClient("localhost", 9196, PipelineConfigurators.<ByteBuf> sseClientConfigurator())
34+
return RxNettySSE.createHttpClient("localhost", 9196)
3735
.submit(HttpClientRequest.createGet("/metadata?" + UrlGenerator.generate("videoId", videos)))
3836
.flatMap(r -> {
3937
Observable<VideoMetadata> bytesToJson = r.getContent().map(sse -> {

0 commit comments

Comments
 (0)