Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.linkedin.datastream.common.BrooklinEnvelope;
import com.linkedin.datastream.common.Datastream;
import com.linkedin.datastream.common.DatastreamMetadataConstants;
import com.linkedin.datastream.common.DatastreamRuntimeException;
import com.linkedin.datastream.common.ErrorLogger;
import com.linkedin.datastream.metrics.BrooklinCounterInfo;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class EventProducer implements DatastreamEventProducer {
public static final String DEFAULT_FLUSH_INTERVAL_MS = String.valueOf(Duration.ofMinutes(5).toMillis());

static final String EVENTS_LATENCY_MS_STRING = "eventsLatencyMs";
static final String EVENTS_LATENCY_MS_SLA_INELIGIBLE_STRING = "eventsLatencyMsSlaIneligible";
static final String EVENTS_SEND_LATENCY_MS_STRING = "eventsSendLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING = "throughputViolatingEventsLatencyMs";
static final String THROUGHPUT_VIOLATING_EVENTS_SEND_LATENCY_MS_STRING = "throughputViolatingEventsSendLatencyMs";
Expand All @@ -90,6 +92,8 @@ public class EventProducer implements DatastreamEventProducer {
private static final String WARN_LOG_LATENCY_THRESHOLD_MS = "warnLogLatencyThresholdMs";
private static final String NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_ENABLED = "numEventsOutsideAltSlaLogEnabled";
private static final String NUM_EVENTS_OUTSIDE_ALT_SLA_LOG_FREQUENCY_MS = "numEventsOutsideAltSlaFrequencyMs";
private static final String NEW_STREAM_GRACE_PERIOD_MS = "newStreamGracePeriodMs";
private static final String DEFAULT_NEW_STREAM_GRACE_PERIOD_MS = "7200000"; // 2 hours
private static final String EVENTS_PRODUCED_OUTSIDE_SLA = "eventsProducedOutsideSla";
private static final String EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA = "eventsProducedOutsideAlternateSla";
private static final String DROPPED_SENT_FROM_SERIALIZATION_ERROR = "droppedSentFromSerializationError";
Expand All @@ -114,6 +118,11 @@ public class EventProducer implements DatastreamEventProducer {
private final int _availabilityThresholdSlaMs;
// Alternate SLA for comparison with the main SLA
private final int _availabilityThresholdAlternateSlaMs;
// Grace period for newly created streams. While a stream is inside this window, primary/alternate
// SLA counters are suppressed and the latency histogram is redirected to eventsLatencyMsSlaIneligible.
private final long _newStreamGracePeriodMs;
// Timestamp when the stream was created (from datastream metadata)
private final long _streamCreationTimeMs;
// Whether to enable warning logs if the latency threshold is met
private final boolean _warnLogLatencyEnabled;
// Latency threshold at which to log a warning message
Expand Down Expand Up @@ -185,6 +194,10 @@ public EventProducer(DatastreamTask task, TransportProvider transportProvider, C
_availabilityThresholdAlternateSlaMs = Integer.parseInt(
config.getProperty(AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS, DEFAULT_AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS));

_newStreamGracePeriodMs = Long.parseLong(
config.getProperty(NEW_STREAM_GRACE_PERIOD_MS, DEFAULT_NEW_STREAM_GRACE_PERIOD_MS));
_streamCreationTimeMs = parseStreamCreationTimeMs(task);

_warnLogLatencyEnabled =
Boolean.parseBoolean(config.getProperty(WARN_LOG_LATENCY_ENABLED, DEFAULT_WARN_LOG_LATENCY_ENABLED));

Expand Down Expand Up @@ -342,6 +355,58 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord
return broadcastMetadata;
}

private boolean isCdcSource() {
return _sourceDatabase != null;
}

/**
* Returns true while a CDC stream is still within its cdc-catch-up grace window. Non-CDC
* sources (BMM kafka://, Inlogs, etc.) always return false here so neither SLA suppression
* nor the latency-histogram redirect applies to them.
*/
private boolean isWithinGracePeriod() {
if (!isCdcSource()) {
return false;
}
return (System.currentTimeMillis() - _streamCreationTimeMs) < _newStreamGracePeriodMs;
}

/**
* Single gate that decides whether to emit metrics to their regular destinations. When this
* returns false, primary and alternate SLA counters are suppressed and the latency histogram
* is redirected to eventsLatencyMsSlaIneligible. Combines all suppression conditions in one place
* — additional conditions (e.g. per-datastream opt-out flags) should be ORed in here so call
* sites do not need to know about every gating condition individually.
*/
private boolean shouldEmitMetric() {
if (isWithinGracePeriod()) {
return false;
}
return true;
}

/**
* Parse the stream creation time used for SLA grace-period gating. When multiple datastreams
* share a task via dedup, returns the oldest (min) CREATION_MS so a freshly deduped stream
* cannot suppress SLA on long-running siblings. Zero / missing / malformed values are filtered
* out; the method returns 0 in those cases (grace disabled, fail-open to SLA reporting).
*/
private long parseStreamCreationTimeMs(DatastreamTask task) {
try {
if (task.getDatastreams() != null && !task.getDatastreams().isEmpty()) {
return task.getDatastreams().stream()
.map(ds -> ds.getMetadata().getOrDefault(DatastreamMetadataConstants.CREATION_MS, "0"))
.mapToLong(Long::parseLong)
.filter(ms -> ms > 0)
.min()
.orElse(0L);
}
} catch (Exception e) {
_logger.warn("Failed to parse stream creation time, SLA grace period will not be applied", e);
}
return 0L;
}

// Report SLA metrics for aggregate, connector and task
private void reportSLAMetrics(String topicOrDatastreamName, boolean isWithinSLA, String metricNameForWithinSLA,
String metricNameForOutsideSLA) {
Expand Down Expand Up @@ -407,13 +472,21 @@ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceT
if (eventsSourceTimestamp > 0) {
// Report availability metrics
long sourceToDestinationLatencyMs = System.currentTimeMillis() - eventsSourceTimestamp;
reportEventLatencyMetrics(topicOrDatastreamName, metadata, sourceToDestinationLatencyMs, EVENTS_LATENCY_MS_STRING);

reportSLAMetrics(topicOrDatastreamName, sourceToDestinationLatencyMs <= _availabilityThresholdSlaMs,
EVENTS_PRODUCED_WITHIN_SLA, EVENTS_PRODUCED_OUTSIDE_SLA);

reportSLAMetrics(topicOrDatastreamName, sourceToDestinationLatencyMs <= _availabilityThresholdAlternateSlaMs,
EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA, EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA);
// Redirect the latency histogram to eventsLatencyMsSlaIneligible while SLA emission is suppressed
// so lag alerts wired to eventsLatencyMs do not fire on the initial CDC catch-up. The
// catch-up curve is still observable on eventsLatencyMsSlaIneligible.
String latencyMetricName = shouldEmitMetric() ? EVENTS_LATENCY_MS_STRING : EVENTS_LATENCY_MS_SLA_INELIGIBLE_STRING;
reportEventLatencyMetrics(topicOrDatastreamName, metadata, sourceToDestinationLatencyMs, latencyMetricName);

// While shouldEmitMetric() returns false (currently only during the CDC catch-up grace
// window) both primary and alternate SLA counter pairs are suppressed entirely.
if (shouldEmitMetric()) {
reportSLAMetrics(topicOrDatastreamName, sourceToDestinationLatencyMs <= _availabilityThresholdSlaMs,
EVENTS_PRODUCED_WITHIN_SLA, EVENTS_PRODUCED_OUTSIDE_SLA);

reportSLAMetrics(topicOrDatastreamName, sourceToDestinationLatencyMs <= _availabilityThresholdAlternateSlaMs,
EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA, EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA);
}

if (_logger.isDebugEnabled()) {
if (sourceToDestinationLatencyMs > _availabilityThresholdSlaMs) {
Expand Down Expand Up @@ -683,6 +756,9 @@ public static List<BrooklinMetricInfo> getMetricInfos() {
metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + EVENTS_LATENCY_MS_STRING, Optional.of(
Arrays.asList(BrooklinHistogramInfo.PERCENTILE_50, BrooklinHistogramInfo.PERCENTILE_99,
BrooklinHistogramInfo.PERCENTILE_999))));
metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + EVENTS_LATENCY_MS_SLA_INELIGIBLE_STRING, Optional.of(
Arrays.asList(BrooklinHistogramInfo.PERCENTILE_50, BrooklinHistogramInfo.PERCENTILE_99,
BrooklinHistogramInfo.PERCENTILE_999))));
metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + EVENTS_SEND_LATENCY_MS_STRING));
metrics.add(new BrooklinHistogramInfo(METRICS_PREFIX + THROUGHPUT_VIOLATING_EVENTS_LATENCY_MS_STRING, Optional.of(
Arrays.asList(BrooklinHistogramInfo.PERCENTILE_50, BrooklinHistogramInfo.PERCENTILE_99,
Expand Down
Loading
Loading