Skip to content

Commit 8d0c56e

Browse files
committed
Upgraded version
1 parent b248070 commit 8d0c56e

File tree

7 files changed

+68
-89
lines changed

7 files changed

+68
-89
lines changed

pom.xml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@
9696
</profile>
9797
</profiles>
9898
<properties>
99-
<vertx.version>3.5.4</vertx.version>
99+
<vertx.version>3.6.2</vertx.version>
100100
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
101101
<maven.compiler.source>1.8</maven.compiler.source>
102102
<maven.compiler.target>1.8</maven.compiler.target>
@@ -166,7 +166,7 @@
166166
<dependency>
167167
<groupId>io.netty</groupId>
168168
<artifactId>netty-transport-native-epoll</artifactId>
169-
<version>4.1.19.Final</version>
169+
<version>4.1.30.Final</version>
170170
<classifier>linux-x86_64</classifier>
171171
</dependency>
172172
<dependency>
@@ -182,7 +182,7 @@
182182
<dependency>
183183
<groupId>org.mockito</groupId>
184184
<artifactId>mockito-core</artifactId>
185-
<version>2.22.0</version>
185+
<version>2.23.4</version>
186186
<scope>test</scope>
187187
</dependency>
188188
<dependency>
@@ -194,7 +194,7 @@
194194
<dependency>
195195
<groupId>org.hamcrest</groupId>
196196
<artifactId>hamcrest-core</artifactId>
197-
<version>1.3</version>
197+
<version>2.1</version>
198198
<scope>test</scope>
199199
</dependency>
200200
</dependencies>
@@ -203,7 +203,7 @@
203203
<plugin>
204204
<groupId>org.apache.maven.plugins</groupId>
205205
<artifactId>maven-surefire-plugin</artifactId>
206-
<version>2.22.0</version>
206+
<version>2.22.1</version>
207207
<configuration>
208208
<argLine>-Dhazelcast.jmx=true -Dvertx.logger-delegate-factory-class-name=io.vertx.core.logging.Log4j2LogDelegateFactory -Dglue.layers=${project.basedir}/config-layers/common,${project.basedir}/config-layers/test</argLine>
209209
</configuration>
@@ -229,7 +229,7 @@
229229
<plugin>
230230
<groupId>org.jacoco</groupId>
231231
<artifactId>jacoco-maven-plugin</artifactId>
232-
<version>0.8.1</version>
232+
<version>0.8.2</version>
233233
<executions>
234234
<execution>
235235
<id>prepare-agent</id>

src/main/java/in/erail/model/ResponseEvent.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ public class ResponseEvent {
2424

2525
public ResponseEvent() {
2626
mMultiValueHeaders = MultiMap.caseInsensitiveMultiMap();
27-
mMultiValueHeaders.add(HttpHeaders.CONTENT_TYPE, MediaType.JAVASCRIPT_UTF_8.toString());
2827
}
2928

3029
public boolean isIsBase64Encoded() {
@@ -83,6 +82,10 @@ public void setMultiValueHeaders(Map<String, String[]> pValue) {
8382
});
8483
}
8584

85+
/**
86+
* Return copy of headers map
87+
* @return
88+
*/
8689
public Map<String, String[]> getMultiValueHeaders() {
8790

8891
Map<String, String[]> result
@@ -122,6 +125,10 @@ public void setHeaders(Map<String, String> pValue) {
122125
});
123126
}
124127

128+
/**
129+
* Return copy of Header Map
130+
* @return
131+
*/
125132
public Map<String, String> getHeaders() {
126133

127134
Map<String, String> result
@@ -157,4 +164,8 @@ public void addHeader(String pHeaderName, String pMediaType) {
157164
public void addHeader(String pHeaderName, MediaType pMediaType) {
158165
addHeader(HttpHeaders.CONTENT_TYPE, pMediaType.toString());
159166
}
167+
168+
public String headerValue(String pHeaderName) {
169+
return mMultiValueHeaders.get(pHeaderName);
170+
}
160171
}

src/main/java/in/erail/route/OpenAPI3RouteBuilder.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ public void process(RoutingContext pRequestContext, String pServiceUniqueId) {
9090
(reply) -> {
9191
if (reply.succeeded()) {
9292
JsonObject response = (JsonObject) reply.result().body();
93-
buildResponseFromReply(response, pRequestContext).end();
93+
HttpServerResponse resp = buildResponseFromReply(response, pRequestContext);
94+
resp.end();
9495
} else {
9596
((Meter) getMetrics().get(pServiceUniqueId + FAIL_SUFFIX)).mark();
9697
getLog().error(() -> "Error in reply:" + reply.cause().toString());
@@ -144,8 +145,10 @@ public HttpServerResponse buildResponseFromReply(JsonObject pReplyResponse, Rout
144145

145146
ResponseEvent response = pReplyResponse.mapTo(ResponseEvent.class);
146147

147-
if (!response.getHeaders().containsKey(HttpHeaders.CONTENT_TYPE)) {
148-
response.getHeaders().put(HttpHeaders.CONTENT_TYPE, MediaType.OCTET_STREAM.toString());
148+
Optional<String> contentType = Optional.ofNullable(response.headerValue(HttpHeaders.CONTENT_TYPE));
149+
150+
if (contentType.isPresent()) {
151+
response.addHeader(HttpHeaders.CONTENT_TYPE, MediaType.OCTET_STREAM);
149152
}
150153

151154
response
@@ -159,7 +162,7 @@ public HttpServerResponse buildResponseFromReply(JsonObject pReplyResponse, Rout
159162
pContext.response().setStatusCode(response.getStatusCode());
160163

161164
@SuppressWarnings("unchecked")
162-
Map<String,String>[] cookies = Optional.ofNullable(response.getCookies()).orElse(new Map[0]);
165+
Map<String, String>[] cookies = Optional.ofNullable(response.getCookies()).orElse(new Map[0]);
163166

164167
Arrays
165168
.stream(cookies)
@@ -184,9 +187,8 @@ public HttpServerResponse buildResponseFromReply(JsonObject pReplyResponse, Rout
184187

185188
body.ifPresent((t) -> {
186189
pContext.response().putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), Integer.toString(t.length));
187-
pContext.response().write(Buffer.newInstance(io.vertx.core.buffer.Buffer.buffer(t)));
190+
pContext.response().write(Buffer.buffer(t));
188191
});
189-
190192
return pContext.response();
191193
}
192194

@@ -210,7 +212,7 @@ public void setDeliveryOptions(DeliveryOptions pDeliveryOptions) {
210212
public Router getRouter(Router pRouter) {
211213

212214
OpenAPI3RouterFactory apiFactory = OpenAPI3RouterFactory
213-
.rxCreateRouterFactoryFromFile(getVertx(), getOpenAPI3File().getAbsolutePath())
215+
.rxCreate(getVertx(), getOpenAPI3File().getAbsolutePath())
214216
.blockingGet();
215217

216218
Optional

src/main/java/in/erail/security/SecurityTools.java

Lines changed: 21 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,12 @@
77
import in.erail.glue.annotation.StartService;
88
import io.reactivex.Single;
99
import io.vertx.reactivex.core.Vertx;
10-
import io.vertx.reactivex.core.shareddata.AsyncMap;
11-
import io.vertx.reactivex.core.shareddata.Lock;
1210
import java.security.InvalidAlgorithmParameterException;
1311
import java.security.InvalidKeyException;
1412
import java.security.NoSuchAlgorithmException;
1513
import java.security.SecureRandom;
1614
import java.util.Arrays;
1715
import java.util.Base64;
18-
import java.util.HashMap;
19-
import java.util.Map;
2016
import java.util.concurrent.CompletableFuture;
2117
import java.util.concurrent.ExecutionException;
2218
import javax.crypto.BadPaddingException;
@@ -47,63 +43,8 @@ public void startup() {
4743
return;
4844
}
4945

50-
Map<String, Object> cryptCtx = new HashMap<>();
51-
52-
Single<Lock> lock = getVertx()
53-
.sharedData()
54-
.rxGetLockWithTimeout("_in.erail.security", 5000);
55-
56-
getVertx()
57-
.sharedData()
58-
.<String, byte[]>rxGetClusterWideMap("_in.erail.security")
59-
.flatMap((m) -> {
60-
cryptCtx.put("map", m);
61-
return m.rxGet("key");
62-
})
63-
.map((k) -> {
64-
cryptCtx.put("key", k);
65-
return cryptCtx;
66-
})
67-
.flatMap((ctx) -> {
68-
if (ctx.get("key") == null) {
69-
return lock
70-
.map((l) -> {
71-
ctx.put("lock", l);
72-
return ctx;
73-
});
74-
}
75-
return Single.just(ctx);
76-
})
77-
.flatMap(ctx -> {
78-
if (ctx.get("lock") != null) {
79-
return ((AsyncMap<String, Object>) (ctx.get("map")))
80-
.rxGet("key")
81-
.map((k) -> {
82-
ctx.put("key", k);
83-
return ctx;
84-
});
85-
}
86-
return Single.just(ctx);
87-
})
88-
.flatMap((ctx) -> {
89-
if (ctx.get("key") == null) {
90-
KeyGenerator keygen = KeyGenerator.getInstance("AES");
91-
keygen.init(128);
92-
byte[] key = keygen.generateKey().getEncoded();
93-
return ((AsyncMap<String, Object>) (ctx.get("map")))
94-
.rxPut("key", key)
95-
.doOnComplete(() -> ctx.put("key", key))
96-
.toSingleDefault(ctx);
97-
}
98-
return Single.just(ctx);
99-
})
100-
.map(ctx -> (byte[]) ctx.get("key"))
101-
.doFinally(() -> {
102-
if (cryptCtx.containsKey("lock")) {
103-
Lock l = (Lock) cryptCtx.get("lock");
104-
l.release();
105-
}
106-
})
46+
generateKey()
47+
.flatMap(v -> addValueToClusterMap("key", v))
10748
.subscribe((key) -> {
10849
mKeySpec.complete(new SecretKeySpec(key, "AES"));
10950
String unique = Base64.getEncoder().encodeToString(Arrays.copyOfRange(key, 0, 5));
@@ -112,6 +53,25 @@ public void startup() {
11253
});
11354
}
11455

56+
protected Single<byte[]> addValueToClusterMap(String pKey, byte[] pValue) {
57+
return getVertx()
58+
.sharedData()
59+
.<String, byte[]>rxGetClusterWideMap("_in.erail.security")
60+
.flatMapMaybe(m -> m.rxPutIfAbsent(pKey, pValue))
61+
.toSingle(pValue);
62+
}
63+
64+
protected Single<byte[]> generateKey() {
65+
KeyGenerator keygen;
66+
try {
67+
keygen = KeyGenerator.getInstance("AES");
68+
} catch (NoSuchAlgorithmException ex) {
69+
return Single.error(ex);
70+
}
71+
keygen.init(128);
72+
return Single.just(keygen.generateKey().getEncoded());
73+
}
74+
11575
/**
11676
* Unique string across cluster. Changes on each restart of cluster.
11777
*

src/main/java/in/erail/service/SingletonServiceImpl.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import in.erail.glue.annotation.StartService;
44
import io.reactivex.Completable;
5+
import io.reactivex.Single;
56
import io.reactivex.schedulers.Schedulers;
67
import io.vertx.core.spi.cluster.ClusterManager;
78
import io.vertx.core.spi.cluster.NodeListener;
@@ -28,17 +29,8 @@ public void start() {
2829
return;
2930
}
3031

31-
getVertx()
32-
.sharedData()
33-
.<String, String>rxGetClusterWideMap(getServiceMapName())
32+
allowServiceToStart()
3433
.subscribeOn(Schedulers.io())
35-
.flatMap((m) -> m.rxPutIfAbsent(getServiceName(), getClusterManager().getNodeID()))
36-
.map((ownerNodeId) -> {
37-
if (ownerNodeId == null) {
38-
return true;
39-
}
40-
return getClusterManager().getNodeID().equals(ownerNodeId);
41-
})
4234
.flatMapCompletable((success) -> {
4335
if (success) {
4436
getLog().info(String.format("Starting Service:[%s]", getServiceName()));
@@ -50,6 +42,20 @@ public void start() {
5042
.blockingAwait();
5143
}
5244

45+
protected Single<Boolean> allowServiceToStart() {
46+
final String serviceName = getServiceName();
47+
final String value = getClusterManager().getNodeID();
48+
49+
return getVertx()
50+
.sharedData()
51+
.<String, String>rxGetClusterWideMap(getServiceMapName())
52+
.flatMapMaybe(m -> m.rxPutIfAbsent(serviceName, value))
53+
.toSingle(value)
54+
.doOnSuccess(serviceOwnerId -> getLog().debug(() -> "Service Owner ID:" + serviceOwnerId + ", This Node ID:" + value))
55+
.map(serviceOwnerId -> getClusterManager().getNodeID().equals(serviceOwnerId))
56+
.doOnSuccess(t -> getLog().debug(() -> "Service Start Decision:" + getServiceName() + ":" + t));
57+
}
58+
5359
@Override
5460
public void nodeAdded(String pNodeID) {
5561
}

src/main/java/io/vertx/reactivex/ext/web/handler/sockjs/processor/SetSubscriberCountHeaderProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ public Single<BridgeEventContext> process(Single<BridgeEventContext> pContext) {
5454
.map((count) -> setHeader(count, ctx))
5555
.doOnError((err) -> {
5656
getLog().error(String.format("[%s] Error getting value for Key[%s] from redis", ctx.getId(), ctx.getAddressKey()), err);
57-
});
58-
57+
})
58+
.toSingle(ctx);
5959
});
6060
}
6161

src/test/java/in/erail/service/SingletonServiceImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void testStart(TestContext context) throws InterruptedException, Executio
4444
vertx
4545
.sharedData()
4646
.<String, String>rxGetClusterWideMap("__in.erail.services")
47-
.flatMap((m) -> {
47+
.flatMapMaybe((m) -> {
4848
return m.rxGet("DummySingletonService");
4949
})
5050
.doOnSuccess((nodeId) -> {
@@ -59,7 +59,7 @@ public void testStart(TestContext context) throws InterruptedException, Executio
5959
vertx
6060
.sharedData()
6161
.<String, String>rxGetClusterWideMap("__in.erail.services")
62-
.flatMap(m2 -> {
62+
.flatMapMaybe(m2 -> {
6363
return m2.rxGet("DummySingletonService");
6464
})
6565
.subscribe((updatedNodeId) -> {

0 commit comments

Comments
 (0)