Skip to content
Closed
4 changes: 2 additions & 2 deletions db-client-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ dependencies {
implementation "io.grpc:grpc-stub:${grpcVersion}"
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation "org.slf4j:slf4j-api:2.0.17"
implementation "org.bouncycastle:bcprov-jdk18on:1.80"
implementation "org.bouncycastle:bcpkix-jdk18on:1.80"

implementation platform("io.opentelemetry:opentelemetry-bom:${openTelemetryVersion}")
implementation "io.opentelemetry:opentelemetry-api"
implementation "io.opentelemetry.semconv:opentelemetry-semconv:${openTelemetrySemConvVersion}"
Expand All @@ -64,7 +65,6 @@ dependencies {
testImplementation "org.reactivestreams:reactive-streams-tck:${reactiveStreamsApiVersion}"
testImplementation "org.testcontainers:testcontainers:${testcontainersVersion}"
testImplementation platform("com.fasterxml.jackson:jackson-bom:${jacksonVersion}")
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation "com.github.javafaker:javafaker:1.0.2"
testImplementation 'org.slf4j:slf4j-simple:2.0.17'
testImplementation "io.opentelemetry:opentelemetry-sdk"
Expand Down
18 changes: 11 additions & 7 deletions db-client-java/src/main/java/io/kurrent/dbclient/AbstractRead.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ abstract class AbstractRead implements Publisher<ReadMessage> {
protected static final StreamsOuterClass.ReadReq.Options.Builder defaultReadOptions;

private final GrpcClient client;
private final OptionsWithBackPressure<?> options;
private final OptionsWithBackPressureAndSerialization<?> options;

protected AbstractRead(GrpcClient client, OptionsWithBackPressure<?> options) {
protected AbstractRead(GrpcClient client, OptionsWithBackPressureAndSerialization<?> options) {
this.client = client;
this.options = options;
}
Expand All @@ -27,13 +27,17 @@ protected AbstractRead(GrpcClient client, OptionsWithBackPressure<?> options) {

@Override
public void subscribe(Subscriber<? super ReadMessage> subscriber) {
ReadResponseObserver observer = new ReadResponseObserver(options, new ReadStreamConsumer(subscriber));
ReadResponseObserver observer = new ReadResponseObserver(
options,
new ReadStreamConsumer(subscriber),
this.client.getSerializer(options.serializationSettings().orElse(null))
);

this.client.getWorkItemArgs().whenComplete((args, error) -> {
if (error != null) {
observer.onError(error);
return;
}
if (error != null) {
observer.onError(error);
return;
}

StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder()
.setOptions(createOptions())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ abstract class AbstractRegularSubscription {
protected SubscriptionListener listener;
protected Checkpointer checkpointer = null;
private final GrpcClient client;
private final OptionsWithBackPressure<?> options;
private final OptionsWithBackPressureAndSerialization<?> options;

protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure<?> options) {
protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressureAndSerialization<?> options) {
this.client = client;
this.options = options;
}
Expand Down Expand Up @@ -72,6 +72,10 @@ private ReadResponseObserver createObserver(WorkItemArgs args, CompletableFuture
event);
});

return new ReadResponseObserver(this.options, consumer);
return new ReadResponseObserver(
this.options,
consumer,
this.client.getSerializer(options.serializationSettings().orElse(null))
);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package io.kurrent.dbclient;

import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent;
import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc;
import io.kurrent.dbclient.proto.shared.Shared;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.kurrent.dbclient.proto.persistentsubscriptions.Persistent;
import io.kurrent.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc;
import io.kurrent.dbclient.proto.shared.Shared;
import io.kurrent.dbclient.serialization.MessageSerializer;

import java.util.concurrent.CompletableFuture;

Expand All @@ -18,20 +19,26 @@ abstract class AbstractSubscribePersistentSubscription {
private final String group;
private final PersistentSubscriptionListener listener;
private final SubscribePersistentSubscriptionOptions options;
private final MessageSerializer messageSerializer;

static {
defaultReadOptions = Persistent.ReadReq.Options.newBuilder()
.setUuidOption(Persistent.ReadReq.Options.UUIDOption.newBuilder()
.setStructured(Shared.Empty.getDefaultInstance()));
}

public AbstractSubscribePersistentSubscription(GrpcClient client, String group,
SubscribePersistentSubscriptionOptions options,
PersistentSubscriptionListener listener) {
public AbstractSubscribePersistentSubscription(
GrpcClient client,
String group,
SubscribePersistentSubscriptionOptions options,
PersistentSubscriptionListener listener,
MessageSerializer messageSerializer
) {
this.client = client;
this.group = group;
this.options = options;
this.listener = listener;
this.messageSerializer = messageSerializer;
}

protected abstract Persistent.ReadReq.Options.Builder createOptions();
Expand Down Expand Up @@ -91,7 +98,7 @@ public void onNext(Persistent.ReadResp readResp) {
int retryCount = readResp.getEvent().hasNoRetryCount() ? 0 : readResp.getEvent().getRetryCount();

try {
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent());
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent(), messageSerializer);
ClientTelemetry.traceSubscribe(
() -> listener.onEvent(this._subscription, retryCount, resolvedEvent),
_subscription.getSubscriptionId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@
class AppendToStream {
private final GrpcClient client;
private final String streamName;
private final List<EventData> events;
private final StreamState streamState;
private final List<MessageData> events;
private final AppendToStreamOptions options;

public AppendToStream(GrpcClient client, String streamName, Iterator<EventData> events, AppendToStreamOptions options) {
public AppendToStream(
GrpcClient client,
String streamName,
StreamState streamState,
Iterator<MessageData> events,
AppendToStreamOptions options
) {
this.client = client;
this.streamName = streamName;
this.streamState = streamState;
this.events = new ArrayList<>();
while (events.hasNext()) {
this.events.add(events.next());
Expand All @@ -40,9 +48,9 @@ public CompletableFuture<WriteResult> execute() {
this.options.getCredentials()));
}

private CompletableFuture<WriteResult> append(ManagedChannel channel, List<EventData> events) {
private CompletableFuture<WriteResult> append(ManagedChannel channel, List<MessageData> events) {
CompletableFuture<WriteResult> result = new CompletableFuture<>();
StreamsOuterClass.AppendReq.Options.Builder options = this.options.getStreamState().applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
StreamsOuterClass.AppendReq.Options.Builder options = this.streamState.applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
.setStreamIdentifier(Shared.StreamIdentifier.newBuilder()
.setStreamName(ByteString.copyFromUtf8(streamName))
.build()));
Expand Down Expand Up @@ -93,18 +101,18 @@ private CompletableFuture<WriteResult> append(ManagedChannel channel, List<Event
try {
requestStream.onNext(StreamsOuterClass.AppendReq.newBuilder().setOptions(options).build());

for (EventData e : events) {
for (MessageData e : events) {
StreamsOuterClass.AppendReq.ProposedMessage.Builder msgBuilder = StreamsOuterClass.AppendReq.ProposedMessage.newBuilder()
.setId(Shared.UUID.newBuilder()
.setStructured(Shared.UUID.Structured.newBuilder()
.setMostSignificantBits(e.getEventId().getMostSignificantBits())
.setLeastSignificantBits(e.getEventId().getLeastSignificantBits())))
.setData(ByteString.copyFrom(e.getEventData()))
.setMostSignificantBits(e.getMessageId().getMostSignificantBits())
.setLeastSignificantBits(e.getMessageId().getLeastSignificantBits())))
.setData(ByteString.copyFrom(e.getMessageData()))
.putMetadata(SystemMetadataKeys.CONTENT_TYPE, e.getContentType())
.putMetadata(SystemMetadataKeys.TYPE, e.getEventType());
.putMetadata(SystemMetadataKeys.TYPE, e.getMessageType());

if (e.getUserMetadata() != null) {
msgBuilder.setCustomMetadata(ByteString.copyFrom(e.getUserMetadata()));
if (e.getMessageMetadata() != null) {
msgBuilder.setCustomMetadata(ByteString.copyFrom(e.getMessageMetadata()));
}

requestStream.onNext(StreamsOuterClass.AppendReq.newBuilder()
Expand All @@ -117,7 +125,7 @@ private CompletableFuture<WriteResult> append(ManagedChannel channel, List<Event
String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));

if (leaderHost != null && leaderPort != null) {
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.parseInt(leaderPort));
result.completeExceptionally(reason);
} else {
result.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,36 @@
package io.kurrent.dbclient;

import io.kurrent.dbclient.serialization.OperationSerializationSettings;

import java.util.Optional;

/**
* Options of the append stream request.
*/
public class AppendToStreamOptions extends OptionsWithStreamStateBase<AppendToStreamOptions> {
private OperationSerializationSettings serializationSettings = null;

private AppendToStreamOptions() {
}

/**
* Returns optional serialization settings
*/
public Optional<OperationSerializationSettings> serializationSettings() {
return Optional.ofNullable(this.serializationSettings);
}

/**
* Allows to customize or disable the automatic deserialization
*
* @param serializationSettings - expected revision.
* @return updated options.
*/
public AppendToStreamOptions serializationSettings(OperationSerializationSettings serializationSettings) {
this.serializationSettings = serializationSettings;
return this;
}

/**
* Returns options with default values.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ private static Tracer getTracer() {
ClientTelemetry.class.getPackage().getImplementationVersion());
}

private static List<EventData> tryInjectTracingContext(Span span, List<EventData> events) {
List<EventData> injectedEvents = new ArrayList<>();
for (EventData event : events) {
private static List<MessageData> tryInjectTracingContext(Span span, List<MessageData> events) {
List<MessageData> injectedEvents = new ArrayList<>();
for (MessageData event : events) {
boolean isJsonEvent = Objects.equals(event.getContentType(), ContentType.JSON);

injectedEvents.add(EventDataBuilder
.binary(event.getEventId(), event.getEventType(), event.getEventData(), isJsonEvent)
.metadataAsBytes(tryInjectTracingContext(span, event.getUserMetadata()))
.build());
injectedEvents.add(
MessageDataBuilder
.with(event.getMessageType(), event.getMessageData(), tryInjectTracingContext(span, event.getMessageMetadata()), event.getMessageId(), isJsonEvent)
.build());
}
return injectedEvents;
}
Expand Down Expand Up @@ -85,9 +85,9 @@ private static SpanContext tryExtractTracingContext(byte[] userMetadataBytes) {
}

static CompletableFuture<WriteResult> traceAppend(
BiFunction<ManagedChannel, List<EventData>, CompletableFuture<WriteResult>> appendOperation,
BiFunction<ManagedChannel, List<MessageData>, CompletableFuture<WriteResult>> appendOperation,
ManagedChannel channel,
List<EventData> events, String streamId, KurrentDBClientSettings settings,
List<MessageData> events, String streamId, KurrentDBClientSettings settings,
UserCredentials optionalCallCredentials) {
Span span = createSpan(
ClientTelemetryConstants.Operations.APPEND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import io.grpc.ClientInterceptor;
import io.kurrent.dbclient.serialization.KurrentDBClientSerializationSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -36,6 +37,7 @@ public class ConnectionSettingsBuilder {
private List<ClientInterceptor> _interceptors = new ArrayList<>();
private String _tlsCaFile = null;
private Set<String> _features = new HashSet<>();
private KurrentDBClientSerializationSettings _serializationSettings;

ConnectionSettingsBuilder() {}

Expand All @@ -60,7 +62,9 @@ public KurrentDBClientSettings buildConnectionSettings() {
_defaultDeadline,
_interceptors,
_tlsCaFile,
_features);
_features,
_serializationSettings
);
}

/**
Expand Down Expand Up @@ -241,6 +245,15 @@ public ConnectionSettingsBuilder feature(String feature) {
return this;
}

/**
* Provides configuration options for messages serialization and deserialization in the KurrentDB client.
* If null, default settings are used.
*/
public ConnectionSettingsBuilder serialization(KurrentDBClientSerializationSettings serializationSettings) {
this._serializationSettings = serializationSettings;
return this;
}

void parseGossipSeed(String host) {
String[] hostParts = host.split(":");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public static <A> EventDataBuilder builderAsJson(String eventType, A eventData)
* @return an event data builder.
* @param <A> a type that can be serialized in JSON.
*/
@Deprecated
public static <A> EventDataBuilder builderAsJson(UUID eventId, String eventType, A eventData) {
return EventDataBuilder.json(eventId, eventType, eventData);
}
Expand Down Expand Up @@ -120,5 +121,9 @@ public static EventDataBuilder builderAsBinary(String eventType, byte[] eventDat
public static EventDataBuilder builderAsBinary(UUID eventId, String eventType, byte[] eventData) {
return EventDataBuilder.binary(eventId, eventType, eventData);
}

public MessageData toMessageData() {
return new MessageData(eventType, eventData, userMetadata, eventId, contentType);
}
}

12 changes: 11 additions & 1 deletion db-client-java/src/main/java/io/kurrent/dbclient/GrpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.kurrent.dbclient.serialization.MessageSerializer;
import io.kurrent.dbclient.serialization.MessageSerializerBuilder;
import io.kurrent.dbclient.serialization.OperationSerializationSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -19,11 +22,14 @@ class GrpcClient {
private final AtomicBoolean closed;
private final LinkedBlockingQueue<Msg> queue;
private final KurrentDBClientSettings settings;
private final MessageSerializer serializer;

GrpcClient(KurrentDBClientSettings settings, AtomicBoolean closed, LinkedBlockingQueue<Msg> queue) {
this.settings = settings;
this.closed = closed;
this.queue = queue;

this.serializer = MessageSerializerBuilder.get(settings.getSerializationSettings());
}

public boolean isShutdown() {
Expand Down Expand Up @@ -101,7 +107,7 @@ public <A> CompletableFuture<A> runWithArgs(Function<WorkItemArgs, CompletableFu
logger.debug("RunWorkItem[{}] completed exceptionally: {}", args.getId(), e.toString());

if (e instanceof RuntimeException)
throw (RuntimeException)e;
throw (RuntimeException) e;
else
throw new RuntimeException(e);
}
Expand All @@ -120,4 +126,8 @@ public CompletableFuture<Void> shutdown() {
public KurrentDBClientSettings getSettings() {
return this.settings;
}

public MessageSerializer getSerializer(OperationSerializationSettings serializationSettings) {
return this.serializer.with(serializationSettings);
}
}
Loading