66import org .apache .logging .log4j .Logger ;
77import in .erail .glue .annotation .StartService ;
88import in .erail .model .RequestEvent ;
9+ import in .erail .model .ResponseEvent ;
910import io .reactivex .Scheduler ;
1011import io .reactivex .Single ;
1112import io .vertx .reactivex .core .eventbus .Message ;
13+ import org .apache .commons .lang3 .exception .ExceptionUtils ;
1214
1315/**
1416 *
@@ -34,15 +36,19 @@ public void start() {
3436 .doOnSubscribe ((s ) -> getLog ().info (() -> String .format ("%s[%s] service started" , getServiceUniqueId (), Thread .currentThread ().getName ())))
3537 .doOnTerminate (() -> getLog ().info (() -> String .format ("%s[%s] service stopped" , getServiceUniqueId (), Thread .currentThread ().getName ())))
3638 .flatMapSingle (this ::handleRequest )
37- .subscribe ((resp ) -> getLog ().trace (() -> resp .toString ()), err -> getLog ().error (() -> String .format ("Process exception:[%s],Error:[%s]" , getServiceUniqueId (), err )));
39+ .subscribe (
40+ resp -> getLog ().trace (() -> resp .toString ()),
41+ err -> getLog ().error (() -> String .format ("Process exception:[%s],Error:[%s]" , getServiceUniqueId (), ExceptionUtils .getStackTrace (err )))
42+ );
3843 }
3944 }
4045
4146 public Single <JsonObject > handleRequest (Message <JsonObject > pMessage ) {
4247 return Single
4348 .just (pMessage )
4449 .map (m -> pMessage .body ().mapTo (RequestEvent .class ))
45- .map (req -> process (req ))
50+ .flatMapMaybe (req -> process (req ))
51+ .toSingle (new ResponseEvent ())
4652 .map (resp -> JsonObject .mapFrom (resp ))
4753 .doOnSuccess (resp -> pMessage .reply (resp ));
4854 }
0 commit comments