Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 67 additions & 57 deletions java/com/engflow/notificationqueue/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.SslContextBuilder;
Expand Down Expand Up @@ -121,71 +123,79 @@ private static void pull(
asyncStub = asyncStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
final CountDownLatch finishLatch = new CountDownLatch(1);
System.out.println("Listening for build events...");
StreamObserver<PullNotificationRequest> requestObserver =
asyncStub.pull(
new StreamObserver<PullNotificationResponse>() {
@Override
public void onNext(PullNotificationResponse response) {
Notification streamedNotification = response.getNotification().getNotification();
System.out.println("Notification: " + streamedNotification.toString());
try {
/** Forward notification data to external server */
forwardToBESStub(
forwardChannel,
streamedNotification.getId().toString(),
streamedNotification.getPayload().toString());
} catch (Exception e) {
System.err.println("Could not forward notification to external sever...");
}
Any notificationContent = streamedNotification.getPayload();
var observer =
new ClientResponseObserver<PullNotificationRequest, PullNotificationResponse>() {
private ClientCallStreamObserver<PullNotificationRequest> requestStream;

@Override
public void beforeStart(ClientCallStreamObserver<PullNotificationRequest> requestStream) {
this.requestStream = requestStream;
}

@Override
public void onNext(PullNotificationResponse response) {
if (!response.hasNotification()) {
return;
}
Notification streamedNotification = response.getNotification().getNotification();
System.out.println("Notification: " + streamedNotification.toString());
try {
/** Forward notification data to external server */
forwardToBESStub(
forwardChannel,
streamedNotification.getId().toString(),
streamedNotification.getPayload().toString());
} catch (Exception e) {
System.err.println("Could not forward notification to external sever...");
}
Any notificationContent = streamedNotification.getPayload();
try {
BuildLifecycleEventNotification lifeCycleEvent =
notificationContent.unpack(BuildLifecycleEventNotification.class);
/**
* Check if this is an invocation started event. Options are INVOCATION_STARTED and
* INVOCATION_FINISHED
*/
if (lifeCycleEvent.getKindCase().name().equals("INVOCATION_STARTED")) {
String invocation = lifeCycleEvent.getInvocationStarted().getInvocationId();
try {
BuildLifecycleEventNotification lifeCycleEvent =
notificationContent.unpack(BuildLifecycleEventNotification.class);
/**
* Check if this is an invocation started event. Options are INVOCATION_STARTED
* and INVOCATION_FINISHED
* Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the
* acquired invocation id
*/
if (lifeCycleEvent.getKindCase().name().equals("INVOCATION_STARTED")) {
String invocation = lifeCycleEvent.getInvocationStarted().getInvocationId();
try {
/**
* Fetch the invocation using the grpc {@link EventStoreGrpc} stub using the
* acquired invocation id
*/
getInvocations(channel, invocation, header, forwardChannel);
} catch (InterruptedException e) {
System.err.println("Could not get invocation with uuid " + invocation);
}
}

} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
getInvocations(channel, invocation, header, forwardChannel);
} catch (InterruptedException e) {
System.err.println("Could not get invocation with uuid " + invocation);
}
}

@Override
public void onError(Throwable t) {
System.err.println("Error on request: " + t.getMessage());
finishLatch.countDown();
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
requestStream.onNext(
PullNotificationRequest.newBuilder()
.addAcknowledgementTokens(response.getNotification().getToken())
.build());
}

@Override
public void onCompleted() {
System.out.println("Finished pulling notifications");
finishLatch.countDown();
}
});
@Override
public void onError(Throwable t) {
System.err.println("Error on request: " + t.getMessage());
finishLatch.countDown();
}

try {
requestObserver.onNext(
PullNotificationRequest.newBuilder()
.setQueue(QueueId.newBuilder().setName(queueName).build())
.build());
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
@Override
public void onCompleted() {
System.out.println("Finished pulling notifications");
finishLatch.countDown();
}
};
asyncStub.pull(observer);

observer.requestStream.onNext(
PullNotificationRequest.newBuilder()
.setQueue(QueueId.newBuilder().setName(queueName).build())
.build());

finishLatch.await();
}
Expand Down