diff --git a/java/com/engflow/notificationqueue/Client.java b/java/com/engflow/notificationqueue/Client.java index a0e5b99e..fdae5f52 100644 --- a/java/com/engflow/notificationqueue/Client.java +++ b/java/com/engflow/notificationqueue/Client.java @@ -22,6 +22,8 @@ import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.MetadataUtils; import io.grpc.stub.StreamObserver; import io.netty.handler.ssl.SslContextBuilder; @@ -121,71 +123,79 @@ private static void pull( asyncStub = asyncStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header)); final CountDownLatch finishLatch = new CountDownLatch(1); System.out.println("Listening for build events..."); - StreamObserver requestObserver = - asyncStub.pull( - new StreamObserver() { - @Override - public void onNext(PullNotificationResponse response) { - Notification streamedNotification = response.getNotification().getNotification(); - System.out.println("Notification: " + streamedNotification.toString()); - try { - /** Forward notification data to external server */ - forwardToBESStub( - forwardChannel, - streamedNotification.getId().toString(), - streamedNotification.getPayload().toString()); - } catch (Exception e) { - System.err.println("Could not forward notification to external sever..."); - } - Any notificationContent = streamedNotification.getPayload(); + var observer = + new ClientResponseObserver() { + private ClientCallStreamObserver requestStream; + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + this.requestStream = requestStream; + } + + @Override + public void onNext(PullNotificationResponse response) { + if (!response.hasNotification()) { + return; + } + Notification streamedNotification = response.getNotification().getNotification(); + System.out.println("Notification: " + streamedNotification.toString()); + try { + /** Forward notification data to external server */ + forwardToBESStub( + forwardChannel, + streamedNotification.getId().toString(), + streamedNotification.getPayload().toString()); + } catch (Exception e) { + System.err.println("Could not forward notification to external sever..."); + } + Any notificationContent = streamedNotification.getPayload(); + try { + BuildLifecycleEventNotification lifeCycleEvent = + notificationContent.unpack(BuildLifecycleEventNotification.class); + /** + * Check if this is an invocation started event. Options are INVOCATION_STARTED and + * INVOCATION_FINISHED + */ + if (lifeCycleEvent.getKindCase().name().equals("INVOCATION_STARTED")) { + String invocation = lifeCycleEvent.getInvocationStarted().getInvocationId(); try { - BuildLifecycleEventNotification lifeCycleEvent = - notificationContent.unpack(BuildLifecycleEventNotification.class); /** - * Check if this is an invocation started event. Options are INVOCATION_STARTED - * and INVOCATION_FINISHED + * Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the + * acquired invocation id */ - if (lifeCycleEvent.getKindCase().name().equals("INVOCATION_STARTED")) { - String invocation = lifeCycleEvent.getInvocationStarted().getInvocationId(); - try { - /** - * Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the - * acquired invocation id - */ - getInvocations(channel, invocation, header, forwardChannel); - } catch (InterruptedException e) { - System.err.println("Could not get invocation with uuid " + invocation); - } - } - - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); + getInvocations(channel, invocation, header, forwardChannel); + } catch (InterruptedException e) { + System.err.println("Could not get invocation with uuid " + invocation); } } - @Override - public void onError(Throwable t) { - System.err.println("Error on request: " + t.getMessage()); - finishLatch.countDown(); - } + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + requestStream.onNext( + PullNotificationRequest.newBuilder() + .addAcknowledgementTokens(response.getNotification().getToken()) + .build()); + } - @Override - public void onCompleted() { - System.out.println("Finished pulling notifications"); - finishLatch.countDown(); - } - }); + @Override + public void onError(Throwable t) { + System.err.println("Error on request: " + t.getMessage()); + finishLatch.countDown(); + } - try { - requestObserver.onNext( - PullNotificationRequest.newBuilder() - .setQueue(QueueId.newBuilder().setName(queueName).build()) - .build()); - } catch (RuntimeException e) { - // Cancel RPC - requestObserver.onError(e); - throw e; - } + @Override + public void onCompleted() { + System.out.println("Finished pulling notifications"); + finishLatch.countDown(); + } + }; + asyncStub.pull(observer); + + observer.requestStream.onNext( + PullNotificationRequest.newBuilder() + .setQueue(QueueId.newBuilder().setName(queueName).build()) + .build()); finishLatch.await(); }