Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class KinesisDispatcher implements Dispatcher {

Timer dispatchTimer;
Meter dispatchFailureMeter;
Meter dispatchSuccessMeter;
Meter outstandingRecordsError;

KinesisProducer producer;
Expand Down Expand Up @@ -102,6 +103,8 @@ public void initialize(final Config config) {

this.producer = new KinesisProducer(buildKinesisProducerConfiguration(props));
this.dispatchTimer = newTimer(buildMetricName(agentName, "kinesis.dispatch.timer"));

this.dispatchSuccessMeter = newMeter(buildMetricName(agentName, "kinesis.dispatch.success"));
this.dispatchFailureMeter = newMeter(buildMetricName(agentName, "kinesis.dispatch.failure"));
this.outstandingRecordsError = newMeter(buildMetricName(agentName, "kinesis.dispatch.outstanding.records.error"));
newGauge(buildMetricName(agentName, "kinesis.outstanding.requests"),
Expand All @@ -112,7 +115,7 @@ public void initialize(final Config config) {

@Override
public void close() {
LOGGER.info("Closing the kinesis span dispatcher now...");
LOGGER.info("Closing the kinesis dispatcher now...");
if (producer != null) {
producer.flushSync();
producer.destroy();
Expand Down Expand Up @@ -177,8 +180,10 @@ public void onSuccess(final UserRecordResult result) {
timer.close();
if(!result.isSuccessful()) {
dispatchFailureMeter.mark();
LOGGER.error("Fail to put the span record to kinesis after attempts={}",
LOGGER.error("Fail to put the record to kinesis after attempts={}",
formatAttempts(result.getAttempts()));
} else {
dispatchSuccessMeter.mark();
}
}

Expand All @@ -190,7 +195,7 @@ public void onFailure(final Throwable throwable) {
if (throwable instanceof UserRecordFailedException) {
final UserRecordFailedException e = (UserRecordFailedException) throwable;
final UserRecordResult result = e.getResult();
LOGGER.error("Record failed to put span record to kinesis with attempts={}",
LOGGER.error("Record failed to put record to kinesis with attempts={}",
formatAttempts(result.getAttempts()), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.amazonaws.services.kinesis.producer.{KinesisProducer, UserRecordResul
import com.codahale.metrics.{Meter, Timer}
import com.expedia.open.tracing.Span
import com.expedia.www.haystack.agent.dispatcher.KinesisDispatcher._
import com.expedia.www.haystack.agent.core.RateLimitException
import com.google.common.util.concurrent.ListenableFuture
import org.easymock.EasyMock
import org.scalatest.easymock.EasyMockSugar
Expand Down Expand Up @@ -161,11 +162,9 @@ class KinesisSpanDispatcherSpec extends FunSpec with Matchers with EasyMockSugar
}

whenExecuting(kinesisProducer, outstandRecErrorMeter) {
val caught = intercept[Exception] {
val caught = intercept[RateLimitException] {
dispatcher.dispatch(span.getTraceId.getBytes("utf-8"), span.toByteArray)
}

caught.getMessage shouldEqual "fail to dispatch to kinesis due to rate limit, outstanding records: 1001"
}
}
}
Expand Down