diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java index 75b2ecd1..767de5b3 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java @@ -807,6 +807,14 @@ public Map getProperties() { ? ImmutableMap.of(Constants.WRITETIME, msg.getProperty(Constants.WRITETIME)) : ImmutableMap.of(); } + + @Override + public Optional getEventTime() { + if(msg.hasProperty(Constants.WRITETIME) && msg.getProperty(Constants.WRITETIME) != null){ + return Optional.of(Long.parseLong(msg.getProperty(Constants.WRITETIME))); + } + return Optional.empty(); + } } @RequiredArgsConstructor @@ -862,5 +870,10 @@ public Schema getValueSchema() { public KeyValueEncodingType getKeyValueEncodingType() { throw new UnsupportedOperationException(); } + + @Override + public Optional getEventTime() { + return kvRecord.getEventTime(); + } } }