Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class EventProducer implements DatastreamEventProducer {

public static final String CFG_SKIP_MSG_SERIALIZATION_ERRORS = "skipMessageOnSerializationErrors";
public static final String DEFAULT_SKIP_MSG_SERIALIZATION_ERRORS = "false";
public static final String CFG_DISABLE_SLA_METRIC = "system.disableSlaMetric";
public static final String CONFIG_FLUSH_INTERVAL_MS = "flushIntervalMs";
public static final String CONFIG_ENABLE_PER_TOPIC_METRICS = "enablePerTopicMetrics";
public static final String CONFIG_ENABLE_PER_TOPIC_EVENT_LATENCY_METRICS = "enablePerTopicEventLatencyMetrics";
Expand Down Expand Up @@ -134,6 +135,7 @@ public class EventProducer implements DatastreamEventProducer {
private final boolean _skipMessageOnSerializationErrors;
private final boolean _enablePerTopicMetrics;
private final boolean _enablePerTopicEventLatencyMetrics;
private final boolean _disableSlaMetric;
private final boolean _enableThroughputMetrics;
// Cached source database name parsed from the connection string at construction time (null for non-CDC sources)
private final String _sourceDatabase;
Expand Down Expand Up @@ -220,6 +222,8 @@ public EventProducer(DatastreamTask task, TransportProvider transportProvider, C
Boolean.parseBoolean(config.getProperty(CONFIG_ENABLE_PER_TOPIC_EVENT_LATENCY_METRICS,
Boolean.FALSE.toString()));

_disableSlaMetric = getDisableSlaMetric(task);
Comment thread
manishGoyalCode marked this conversation as resolved.

_enableThroughputMetrics =
Boolean.parseBoolean(config.getProperty(CONFIG_ENABLE_THROUGHPUT_METRICS, Boolean.FALSE.toString()));

Expand Down Expand Up @@ -379,6 +383,9 @@ private boolean isWithinGracePeriod() {
* sites do not need to know about every gating condition individually.
*/
private boolean shouldEmitMetric() {
if (_disableSlaMetric) {
return false;
}
if (isWithinGracePeriod()) {
return false;
}
Expand Down Expand Up @@ -470,7 +477,8 @@ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceT
String topicOrDatastreamName = _enablePerTopicMetrics ? metadata.getTopic() : datastreamName;
// Treat all events within this record equally (assume same timestamp)
if (eventsSourceTimestamp > 0) {
// Report availability metrics
// Report availability metrics. Streams that opt out via system.disableSlaMetric still emit
// a latency histogram, but under eventsLatencyMsSlaIneligible so they don't pollute the SLA metric.
long sourceToDestinationLatencyMs = System.currentTimeMillis() - eventsSourceTimestamp;
// Redirect the latency histogram to eventsLatencyMsSlaIneligible while SLA emission is suppressed
// so lag alerts wired to eventsLatencyMs do not fire on the initial CDC catch-up. The
Expand Down Expand Up @@ -655,6 +663,26 @@ private Boolean getSkipMessageOnSerializationErrors(DatastreamTask task, Propert
.orElse(skipMessageOnSerializationErrors));
}

/**
* Looks for config {@value CFG_DISABLE_SLA_METRIC} in the datastream metadata and returns its value.
* Default value is false.
*
* <p>Only honored for tasks owning a single datastream. Deduped tasks (multiple datastreams
* sharing one EventProducer) always return false, since one stream's opt-out would otherwise
* suppress {@code eventsLatencyMs} for every stream in the group.
*/
private boolean getDisableSlaMetric(DatastreamTask task) {
Comment thread
manishGoyalCode marked this conversation as resolved.
if (task.getDatastreams().size() > 1) {
return false;
}
return Boolean.parseBoolean(task.getDatastreams()
.stream()
.findFirst()
.map(Datastream::getMetadata)
.map(metadata -> metadata.getOrDefault(CFG_DISABLE_SLA_METRIC, Boolean.FALSE.toString()))
.orElse(Boolean.FALSE.toString()));
}

@Override
public void flush() {
Instant beforeFlush = Instant.now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,93 @@ public void testSlaGraceDedupedTaskAllStreamsNew() {
"Alternate-SLA counter must remain suppressed during grace across the deduped task");
}

// ---------------------------------------------------------------------------
// system.disableSlaMetric opt-out tests
//
// The opt-out is a per-datastream metadata flag that, when set, suppresses
// primary + alternate SLA counters and redirects the latency histogram from
// eventsLatencyMs to eventsLatencyMsSlaIneligible. Honored only for tasks
// that own a single datastream — deduped tasks ignore the flag so one
// stream's opt-out cannot suppress eventsLatencyMs for the whole group.
//
// Tests use a creation timestamp 3h in the past to bypass the CDC grace
// gate, so the only thing being exercised is the opt-out path itself.
// ---------------------------------------------------------------------------

@Test
public void testDisableSlaMetricSuppressesSlaCountersAndRedirectsLatency() {
Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-opt-out")[0];
datastream.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable");
long threeHoursAgo = System.currentTimeMillis() - (3 * 60 * 60 * 1000L);
datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(threeHoursAgo));
datastream.getMetadata().put(EventProducer.CFG_DISABLE_SLA_METRIC, Boolean.TRUE.toString());

String topicName = "optOutLatencyTopic";
sendOneEventThroughProducer(datastream, new Properties(), topicName);

DynamicMetricsManager metrics = DynamicMetricsManager.getInstance();
Assert.assertNull(metrics.getMetric(SLA_WITHIN_AGG),
"Primary withinSla counter must not be created when system.disableSlaMetric=true");
Assert.assertNull(metrics.getMetric(SLA_WITHIN_ALT_AGG),
"Alternate-SLA counter must not be created when system.disableSlaMetric=true");
Assert.assertNull(
metrics.getMetric("EventProducer." + topicName + "." + EventProducer.EVENTS_LATENCY_MS_STRING),
"eventsLatencyMs must not fire for opted-out streams");
Assert.assertNotNull(
metrics.getMetric("EventProducer." + topicName + "." + EventProducer.EVENTS_LATENCY_MS_SLA_INELIGIBLE_STRING),
"Latency observation should be redirected to eventsLatencyMsSlaIneligible for opted-out streams");
}

@Test
public void testDisableSlaMetricFalseLeavesSlaReportingActive() {
// Explicit false should behave identically to the flag being absent.
Datastream datastream = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-opt-out-false")[0];
datastream.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable");
long threeHoursAgo = System.currentTimeMillis() - (3 * 60 * 60 * 1000L);
datastream.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(threeHoursAgo));
datastream.getMetadata().put(EventProducer.CFG_DISABLE_SLA_METRIC, Boolean.FALSE.toString());

String topicName = "optOutFalseTopic";
sendOneEventThroughProducer(datastream, new Properties(), topicName);

DynamicMetricsManager metrics = DynamicMetricsManager.getInstance();
Counter withinAgg = (Counter) metrics.getMetric(SLA_WITHIN_AGG);
Assert.assertNotNull(withinAgg, "withinSla counter should be created when opt-out flag is false");
Assert.assertEquals(withinAgg.getCount(), 1L);
Assert.assertNotNull(
metrics.getMetric("EventProducer." + topicName + "." + EventProducer.EVENTS_LATENCY_MS_STRING),
"eventsLatencyMs must fire normally when opt-out flag is false");
}

@Test
public void testDisableSlaMetricIgnoredForDedupedTask() {
// Two datastreams sharing one EventProducer: one opted out, one not. The opt-out must be
// ignored so the non-opted-out stream's SLA reporting is preserved.
long threeHoursAgo = System.currentTimeMillis() - (3 * 60 * 60 * 1000L);

Datastream optedOut = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-dedup-optout")[0];
optedOut.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable");
optedOut.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(threeHoursAgo));
optedOut.getMetadata().put(EventProducer.CFG_DISABLE_SLA_METRIC, Boolean.TRUE.toString());

Datastream regular = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, "ds-dedup-regular")[0];
regular.getSource().setConnectionString("mysql:/myhost/testDatabase/myTable");
regular.getMetadata().put(DatastreamMetadataConstants.CREATION_MS, String.valueOf(threeHoursAgo));

DatastreamTaskImpl task = new DatastreamTaskImpl(Arrays.asList(optedOut, regular));
String topicName = "dedupOptOutTopic";
sendOneEventThroughTask(task, new Properties(), topicName);

DynamicMetricsManager metrics = DynamicMetricsManager.getInstance();
Counter withinAgg = (Counter) metrics.getMetric(SLA_WITHIN_AGG);
Assert.assertNotNull(withinAgg,
"Deduped task must ignore opt-out — one stream's flag cannot suppress SLA for the whole group");
Assert.assertEquals(withinAgg.getCount(), 1L);
Assert.assertNotNull(
metrics.getMetric("EventProducer." + topicName + "." + EventProducer.EVENTS_LATENCY_MS_STRING),
"eventsLatencyMs must continue to fire for deduped tasks regardless of any member's opt-out flag");
}

private void sendOneEventThroughProducer(Datastream datastream, Properties props) {
sendOneEventThroughProducer(datastream, props, "someTopicName");
}
Expand Down
Loading