10 changes: 10 additions & 0 deletions pom.xml
@@ -191,6 +191,16 @@
</developer>
</developers>
<dependencies>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.2.30</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>4.2.30</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-data</artifactId>
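These two new dependencies split the Dropwizard pieces the connector now uses: metrics-core supplies MetricRegistry, Counter and Timer, while metrics-jmx supplies the JMX reporter. The connector wraps that reporter in its own KustoKafkaMetricsJmxReporter, added elsewhere in this PR and not visible in this diff. A minimal sketch of the plain Dropwizard wiring, included only to show why both artifacts are needed:

```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;  // from metrics-core
import com.codahale.metrics.jmx.JmxReporter; // from metrics-jmx

public class MetricsWiringSketch {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        JmxReporter reporter = JmxReporter.forRegistry(registry)
                .inDomain("KustoSinkConnector") // JMX domain the MBeans are published under
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build();
        reporter.start(); // publishes one MBean per registered metric
        registry.counter("demo.offsets.processed-offset").inc();
        reporter.stop();
    }
}
```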
FileWriter.java
@@ -19,6 +19,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
@@ -27,6 +29,7 @@
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.ByteRecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.JsonRecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.StringRecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.metrics.KustoKafkaMetricsUtil;

/**
* This class is used to write gzipped rolling files.
@@ -58,6 +61,13 @@ public class FileWriter implements Closeable {
private boolean stopped = false;
private boolean isDlqEnabled = false;

private final Counter fileCountOnStage;
private final Counter fileCountPurged;
private final Counter bufferSizeBytes;
private final Counter bufferRecordCount;
private final Counter flushedOffset;
private final Counter purgedOffset;

/**
* @param basePath - The path to which the files are written.
* @param fileThreshold - Max size, uncompressed bytes.
@@ -73,7 +83,9 @@ public FileWriter(String basePath,
ReentrantReadWriteLock reentrantLock,
IngestionProperties.DataFormat format,
BehaviorOnError behaviorOnError,
boolean isDlqEnabled) {
boolean isDlqEnabled,
String tpname,
MetricRegistry metricRegistry) {
this.getFilePath = getFilePath;
this.basePath = basePath;
this.fileThreshold = fileThreshold;
@@ -86,6 +98,12 @@ public FileWriter(String basePath,
// If we failed on flush we want to throw the error from the put() flow.
flushError = null;
this.format = format;
this.fileCountOnStage = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.FILE_COUNT_SUB_DOMAIN, KustoKafkaMetricsUtil.FILE_COUNT_ON_STAGE));
this.fileCountPurged = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.FILE_COUNT_SUB_DOMAIN, KustoKafkaMetricsUtil.FILE_COUNT_PURGED));
this.bufferSizeBytes = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.BUFFER_SUB_DOMAIN, KustoKafkaMetricsUtil.BUFFER_SIZE_BYTES));
this.bufferRecordCount = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.BUFFER_SUB_DOMAIN, KustoKafkaMetricsUtil.BUFFER_RECORD_COUNT));
this.flushedOffset = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.FLUSHED_OFFSET));
this.purgedOffset = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tpname, KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.PURGED_OFFSET));
}

boolean isDirty() {
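The counters registered above go through KustoKafkaMetricsUtil, which is introduced elsewhere in this PR and does not appear in this diff. For illustration only, a hypothetical sketch of the naming helper these calls assume, i.e. a dot-separated "<topic>.<sub-domain>.<metric>" convention; the real constants and separator may differ:

```java
// Hypothetical sketch only: the real KustoKafkaMetricsUtil is added elsewhere in this PR
// and its exact constants and format are not shown in this diff.
public final class KustoKafkaMetricsUtilSketch {
    public static final String FILE_COUNT_SUB_DOMAIN = "file-counts";
    public static final String BUFFER_SUB_DOMAIN = "buffer";
    public static final String OFFSET_SUB_DOMAIN = "offsets";

    private KustoKafkaMetricsUtilSketch() {
    }

    public static String constructMetricName(String topic, String subDomain, String metricName) {
        // e.g. "my-topic.buffer.buffer-size-bytes"
        return String.join(".", topic, subDomain, metricName);
    }
}
```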
@@ -153,6 +171,7 @@ public void openFile(@Nullable Long offset) throws IOException {
countingStream = new CountingOutputStream(new GZIPOutputStream(fos));
outputStream = countingStream.getOutputStream();
recordWriter = recordWriterProvider.getRecordWriter(currentFile.path, countingStream);
fileCountOnStage.inc();
}

void rotate(@Nullable Long offset) throws IOException, DataException {
@@ -211,6 +230,10 @@ private void dumpFile() throws IOException {
boolean deleted = temp.file.delete();
if (!deleted) {
log.warn("Couldn't delete temporary file. File exists: {}", temp.file.exists());
} else {
fileCountPurged.inc();
purgedOffset.inc(flushedOffset.getCount());
fileCountOnStage.dec();
}
}
}
@@ -297,6 +320,12 @@ public void writeData(SinkRecord sinkRecord) throws IOException, DataException {
}
currentFile.rawBytes = countingStream.numBytes;
currentFile.numRecords++;
bufferSizeBytes.dec(bufferSizeBytes.getCount()); // Reset the counter to zero
bufferSizeBytes.inc(currentFile.rawBytes); // Set the counter to the current size
bufferRecordCount.dec(bufferRecordCount.getCount()); // Reset the counter to zero
bufferRecordCount.inc(currentFile.numRecords); // Set the counter to the current number of records
flushedOffset.inc();

if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold || shouldWriteAvroAsBytes) {
rotate(sinkRecord.kafkaOffset());
resetFlushTimer(true);
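writeData() above uses a reset-then-increment idiom so that a Counter reports the latest absolute value (bytes and records in the current file) rather than a running total. The helper below, a sketch that is not part of the PR, makes the pattern explicit; note that a Dropwizard Gauge is the more conventional way to expose point-in-time values and that the two-step update is not atomic:

```java
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

public final class CounterAsGauge {
    private CounterAsGauge() {
    }

    // Make a Counter report an absolute value: clear it, then add the latest reading.
    public static void set(Counter counter, long value) {
        counter.dec(counter.getCount()); // reset to zero
        counter.inc(value);              // set to the current value
    }

    public static void main(String[] args) {
        Counter bufferSizeBytes = new MetricRegistry().counter("demo.buffer.buffer-size-bytes");
        set(bufferSizeBytes, 4096);
        set(bufferSizeBytes, 8192);
        System.out.println(bufferSizeBytes.getCount()); // prints 8192, not 12288
    }
}
```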
KustoSinkTask.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -25,6 +26,7 @@
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.WorkloadIdentityCredential;
import com.azure.identity.WorkloadIdentityCredentialBuilder;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
@@ -35,6 +37,8 @@
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.kafka.connect.sink.metrics.KustoKafkaMetricsJmxReporter;
import com.microsoft.azure.kusto.kafka.connect.sink.metrics.KustoKafkaMetricsUtil;

/**
* Kusto sink uses file system to buffer records.
@@ -63,6 +67,8 @@ public class KustoSinkTask extends SinkTask {
private boolean isDlqEnabled;
private String dlqTopicName;
private Producer<byte[], byte[]> dlqProducer;
private MetricRegistry metricRegistry;
private KustoKafkaMetricsJmxReporter jmxReporter;

public KustoSinkTask() {
assignment = new HashSet<>();
@@ -371,7 +377,7 @@ public void open(Collection<TopicPartition> partitions) {
} else {
IngestClient client = ingestionProps.streaming ? streamingIngestClient : kustoIngestClient;
TopicPartitionWriter writer = new TopicPartitionWriter(tp, client, ingestionProps, config, isDlqEnabled,
dlqTopicName, dlqProducer);
dlqTopicName, dlqProducer, metricRegistry);
writer.open();
writers.put(tp, writer);
}
@@ -435,6 +441,22 @@ public void start(Map<String, String> props) {
if (context != null) {
open(context.assignment());
}

// Initialize metricRegistry and JmxReporter
metricRegistry = new MetricRegistry();
jmxReporter = new KustoKafkaMetricsJmxReporter(metricRegistry, "KustoSinkConnector");
jmxReporter.start();
log.info("JmxReporter started for KustoSinkConnector");

// Register metrics
if (context != null) {
for (TopicPartition tp : context.assignment()) {
metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(
tp.topic(), KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.COMMITTED_OFFSET));
metricRegistry.timer(KustoKafkaMetricsUtil.constructMetricName(
tp.topic(), KustoKafkaMetricsUtil.LATENCY_SUB_DOMAIN, KustoKafkaMetricsUtil.EventType.KAFKA_LAG.getMetricName()));
}
}
}

@Override
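Once jmxReporter.start() has run, every metric registered above should be published as a JMX MBean. The domain is presumably "KustoSinkConnector", the name passed to the wrapper, but the exact ObjectName layout depends on KustoKafkaMetricsJmxReporter, which is not shown in this diff. A small probe, useful in a test, to confirm the metrics are visible under that assumed domain:

```java
import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class JmxMetricsProbe {
    public static void main(String[] args) throws MalformedObjectNameException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Lists every MBean in the assumed "KustoSinkConnector" domain.
        for (ObjectName name : server.queryNames(new ObjectName("KustoSinkConnector:*"), null)) {
            System.out.println(name);
        }
    }
}
```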
@@ -455,6 +477,12 @@ public void stop() {
} catch (IOException e) {
log.error("Error closing kusto client", e);
}
// Unregister metrics
if (jmxReporter != null) {
jmxReporter.removeMetricsFromRegistry("KustoSinkConnector");
jmxReporter.stop();
}

}

@Override
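removeMetricsFromRegistry(...) is defined in KustoKafkaMetricsJmxReporter elsewhere in this PR, so its behavior is an assumption here. A plausible sketch is to drop the connector's metrics from the registry with a MetricFilter before stopping the reporter:

```java
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;

public final class MetricsCleanupSketch {
    private MetricsCleanupSketch() {
    }

    // Hypothetical: drop every metric whose name starts with the given prefix.
    public static void removeMetricsFromRegistry(MetricRegistry registry, String prefix) {
        registry.removeMatching(MetricFilter.startsWith(prefix));
    }
}
```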
@@ -477,6 +505,13 @@ public void put(Collection<SinkRecord> records) {
} else {
writer.writeRecord(sinkRecord);
}
Long timestamp = sinkRecord.timestamp();
if (timestamp != null) {
long kafkaLagValue = System.currentTimeMillis() - timestamp;
metricRegistry.timer(KustoKafkaMetricsUtil.constructMetricName(
sinkRecord.topic(), KustoKafkaMetricsUtil.LATENCY_SUB_DOMAIN, KustoKafkaMetricsUtil.EventType.KAFKA_LAG.getMetricName()))
.update(kafkaLagValue, TimeUnit.MILLISECONDS);
}
}
if (lastRecord != null) {
log.debug("Last record offset: {}", lastRecord.kafkaOffset());
@@ -503,6 +538,8 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
log.debug("Forwarding to framework request to commit offset: {} for {} while the offset is {}", offset,
tp, offsets.get(tp));
offsetsToCommit.put(tp, new OffsetAndMetadata(offset));
metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(
tp.topic(), KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.COMMITTED_OFFSET)).inc();
}
}

TopicPartitionWriter.java
@@ -23,6 +23,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.microsoft.azure.kusto.data.exceptions.KustoDataExceptionBase;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.ManagedStreamingIngestClient;
@@ -33,6 +36,7 @@
import com.microsoft.azure.kusto.ingest.result.IngestionStatusResult;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
import com.microsoft.azure.kusto.kafka.connect.sink.metrics.KustoKafkaMetricsUtil;

public class TopicPartitionWriter {

@@ -56,9 +60,22 @@ public class TopicPartitionWriter {
long currentOffset;
Long lastCommittedOffset;
private final ReentrantReadWriteLock reentrantReadWriteLock;
private final MetricRegistry metricRegistry;

private final Counter fileCountOnIngestion;
private final Counter fileCountTableStageIngestionFail;
private final Counter dlqRecordCount;
private final Timer commitLag;
private final Timer ingestionLag;
private final Counter ingestionErrorCount;
private final Counter ingestionSuccessCount;
private final Counter processedOffset;
private final Counter committedOffset;
private long writeTime;

TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps,
KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> dlqProducer) {
KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> dlqProducer, MetricRegistry metricRegistry) {
this.tp = tp;
this.client = client;
this.ingestionProps = ingestionProps;
@@ -73,6 +90,16 @@ public class TopicPartitionWriter {
this.isDlqEnabled = isDlqEnabled;
this.dlqTopicName = dlqTopicName;
this.dlqProducer = dlqProducer;
this.metricRegistry = metricRegistry;
this.fileCountOnIngestion = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tp.topic(), KustoKafkaMetricsUtil.FILE_COUNT_SUB_DOMAIN, KustoKafkaMetricsUtil.FILE_COUNT_ON_INGESTION));
this.fileCountTableStageIngestionFail = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tp.topic(), KustoKafkaMetricsUtil.FILE_COUNT_SUB_DOMAIN, KustoKafkaMetricsUtil.FILE_COUNT_TABLE_STAGE_INGESTION_FAIL));
this.dlqRecordCount = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tp.topic(), KustoKafkaMetricsUtil.DLQ_SUB_DOMAIN, KustoKafkaMetricsUtil.DLQ_RECORD_COUNT));
this.commitLag = metricRegistry.timer(KustoKafkaMetricsUtil.constructMetricName(tp.topic(), KustoKafkaMetricsUtil.LATENCY_SUB_DOMAIN, KustoKafkaMetricsUtil.EventType.COMMIT_LAG.getMetricName()));
this.ingestionLag = metricRegistry.timer(KustoKafkaMetricsUtil.constructMetricName(tp.topic(), KustoKafkaMetricsUtil.LATENCY_SUB_DOMAIN, KustoKafkaMetricsUtil.EventType.INGESTION_LAG.getMetricName()));
this.ingestionErrorCount = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tp.topic(), KustoKafkaMetricsUtil.DLQ_SUB_DOMAIN, KustoKafkaMetricsUtil.INGESTION_ERROR_COUNT));
this.ingestionSuccessCount = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tp.topic(), KustoKafkaMetricsUtil.DLQ_SUB_DOMAIN, KustoKafkaMetricsUtil.INGESTION_SUCCESS_COUNT));
this.processedOffset = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tp.topic(), KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.PROCESSED_OFFSET));
this.committedOffset = metricRegistry.counter(KustoKafkaMetricsUtil.constructMetricName(tp.topic(), KustoKafkaMetricsUtil.OFFSET_SUB_DOMAIN, KustoKafkaMetricsUtil.COMMITTED_OFFSET));
}

static String getTempDirectoryName(String tempDirPath) {
@@ -90,6 +117,10 @@ public void handleRollFile(SourceFile fileDescriptor) {
* into DLQ topic. Recommendation is to set the following worker configuration as `connector.client.config.override.policy=All` and set the
* `consumer.override.max.poll.interval.ms` config to a high enough value to avoid consumer leaving the group while the Connector is retrying.
*/
fileCountOnIngestion.inc();
long uploadStartTime = System.currentTimeMillis(); // Record the start time of file upload
commitLag.update(uploadStartTime - writeTime, TimeUnit.MILLISECONDS);

for (int retryAttempts = 0; true; retryAttempts++) {
try {
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProps.ingestionProperties);
@@ -112,8 +143,16 @@ public void handleRollFile(SourceFile fileDescriptor) {
log.info("Kusto ingestion: file ({}) of size ({}) at current offset ({}) with status ({})",
fileDescriptor.path, fileDescriptor.rawBytes, currentOffset, ingestionStatus);
this.lastCommittedOffset = currentOffset;
committedOffset.inc(); // Increment the committed offset counter
fileCountOnIngestion.dec();
long ingestionEndTime = System.currentTimeMillis(); // Record the end time of ingestion
ingestionLag.update(ingestionEndTime - uploadStartTime, TimeUnit.MILLISECONDS); // Update ingestion-lag
ingestionSuccessCount.inc();
return;
} catch (IngestionServiceException exception) {
fileCountTableStageIngestionFail.inc();
ingestionErrorCount.inc();
fileCountOnIngestion.dec();
if (ingestionProps.streaming) {
Throwable innerException = exception.getCause();
if (innerException instanceof KustoDataExceptionBase &&
@@ -125,6 +164,9 @@ public void handleRollFile(SourceFile fileDescriptor) {
// retrying transient exceptions
backOffForRemainingAttempts(retryAttempts, exception, fileDescriptor);
} catch (IngestionClientException | URISyntaxException exception) {
fileCountTableStageIngestionFail.inc();
ingestionErrorCount.inc();
fileCountOnIngestion.dec();
throw new ConnectException(exception);
}
}
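handleRollFile() above measures two intervals with explicit System.currentTimeMillis() deltas: commit-lag, from the last write into the rolled file to the start of the upload, and ingestion-lag, from the start of the upload to the ingestion result. As a design note only, and not part of the PR, the same kind of interval can also be captured with Dropwizard's Timer.Context:

```java
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class IngestionLagSketch {
    public static void main(String[] args) throws InterruptedException {
        MetricRegistry registry = new MetricRegistry();
        Timer ingestionLag = registry.timer("my-topic.latency.ingestion-lag");

        // Timer.Context records the elapsed time between time() and close()/stop().
        try (Timer.Context ignored = ingestionLag.time()) {
            Thread.sleep(50); // stand-in for client.ingestFromFile(...) and the status poll
        }
        System.out.println("ingestion-lag count = " + ingestionLag.getCount());
    }
}
```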
@@ -198,6 +240,7 @@ public void sendFailedRecordToDlq(SinkRecord sinkRecord) {
exception);
}
});
dlqRecordCount.inc();
} catch (IllegalStateException e) {
log.error("Failed to write records to miscellaneous dead-letter queue topic, "
+ "kafka producer has already been closed. Exception={0}", e);
@@ -218,6 +261,9 @@ void writeRecord(SinkRecord sinkRecord) throws ConnectException {
try (AutoCloseableLock ignored = new AutoCloseableLock(reentrantReadWriteLock.readLock())) {
this.currentOffset = sinkRecord.kafkaOffset();
fileWriter.writeData(sinkRecord);
processedOffset.inc();
// Record the time when data is written to the file
writeTime = System.currentTimeMillis();
} catch (IOException | DataException ex) {
handleErrors(sinkRecord, ex);
}
@@ -247,7 +293,9 @@ void open() {
reentrantReadWriteLock,
ingestionProps.ingestionProperties.getDataFormat(),
behaviorOnError,
isDlqEnabled);
isDlqEnabled,
tp.topic(),
metricRegistry);
}

void close() {