diff --git a/agent-dispatchers/kinesis/src/main/java/com/expedia/www/haystack/agent/dispatcher/KinesisDispatcher.java b/agent-dispatchers/kinesis/src/main/java/com/expedia/www/haystack/agent/dispatcher/KinesisDispatcher.java index 40f084f..cd8ed8c 100644 --- a/agent-dispatchers/kinesis/src/main/java/com/expedia/www/haystack/agent/dispatcher/KinesisDispatcher.java +++ b/agent-dispatchers/kinesis/src/main/java/com/expedia/www/haystack/agent/dispatcher/KinesisDispatcher.java @@ -61,6 +61,7 @@ public class KinesisDispatcher implements Dispatcher { Timer dispatchTimer; Meter dispatchFailureMeter; + Meter dispatchSuccessMeter; Meter outstandingRecordsError; KinesisProducer producer; @@ -102,6 +103,8 @@ public void initialize(final Config config) { this.producer = new KinesisProducer(buildKinesisProducerConfiguration(props)); this.dispatchTimer = newTimer(buildMetricName(agentName, "kinesis.dispatch.timer")); + + this.dispatchSuccessMeter = newMeter(buildMetricName(agentName, "kinesis.dispatch.success")); this.dispatchFailureMeter = newMeter(buildMetricName(agentName, "kinesis.dispatch.failure")); this.outstandingRecordsError = newMeter(buildMetricName(agentName, "kinesis.dispatch.outstanding.records.error")); newGauge(buildMetricName(agentName, "kinesis.outstanding.requests"), @@ -112,7 +115,7 @@ public void initialize(final Config config) { @Override public void close() { - LOGGER.info("Closing the kinesis span dispatcher now..."); + LOGGER.info("Closing the kinesis dispatcher now..."); if (producer != null) { producer.flushSync(); producer.destroy(); @@ -177,8 +180,10 @@ public void onSuccess(final UserRecordResult result) { timer.close(); if(!result.isSuccessful()) { dispatchFailureMeter.mark(); - LOGGER.error("Fail to put the span record to kinesis after attempts={}", + LOGGER.error("Fail to put the record to kinesis after attempts={}", formatAttempts(result.getAttempts())); + } else { + dispatchSuccessMeter.mark(); } } @@ -190,7 +195,7 @@ public void onFailure(final Throwable throwable) { if (throwable instanceof UserRecordFailedException) { final UserRecordFailedException e = (UserRecordFailedException) throwable; final UserRecordResult result = e.getResult(); - LOGGER.error("Record failed to put span record to kinesis with attempts={}", + LOGGER.error("Record failed to put record to kinesis with attempts={}", formatAttempts(result.getAttempts()), e); } } diff --git a/agent-dispatchers/kinesis/src/test/scala/com/expedia/www/haystack/agent/dispatcher/KinesisSpanDispatcherSpec.scala b/agent-dispatchers/kinesis/src/test/scala/com/expedia/www/haystack/agent/dispatcher/KinesisSpanDispatcherSpec.scala index 1da2cee..a01de36 100644 --- a/agent-dispatchers/kinesis/src/test/scala/com/expedia/www/haystack/agent/dispatcher/KinesisSpanDispatcherSpec.scala +++ b/agent-dispatchers/kinesis/src/test/scala/com/expedia/www/haystack/agent/dispatcher/KinesisSpanDispatcherSpec.scala @@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.producer.{KinesisProducer, UserRecordResul import com.codahale.metrics.{Meter, Timer} import com.expedia.open.tracing.Span import com.expedia.www.haystack.agent.dispatcher.KinesisDispatcher._ +import com.expedia.www.haystack.agent.core.RateLimitException import com.google.common.util.concurrent.ListenableFuture import org.easymock.EasyMock import org.scalatest.easymock.EasyMockSugar @@ -161,11 +162,9 @@ class KinesisSpanDispatcherSpec extends FunSpec with Matchers with EasyMockSugar } whenExecuting(kinesisProducer, outstandRecErrorMeter) { - val caught = intercept[Exception] { + val caught = intercept[RateLimitException] { dispatcher.dispatch(span.getTraceId.getBytes("utf-8"), span.toByteArray) } - - caught.getMessage shouldEqual "fail to dispatch to kinesis due to rate limit, outstanding records: 1001" } } }