Skip to content

Commit 153df85

Browse files
authored
Avoid consuming ByteBuffers (#913)
A ByteBuffer is a pointer to a byte[] with a starting position, a current position, and a limit. Any function that reads from its contents updates the current position. Both TracingPropagator and WorkflowUtils copy the entirety of its contents, and in doing so they mutate the current position. WorkflowUtils resets it afterwards but this still isn't thread-safe as another thread may be trying to read it. By duplicating the ByteBuffer (copying only the metadata, not the actual contents) we avoid modifying it. It doesn't seem likely that there's real impact in either of these cases beyond unit tests, where these ByteBuffers stick around in the workflow history and are repeatedly serialized/deserialized. Modifying them during serialization can create test flakiness as that can trigger exceptions.
1 parent b1c7e38 commit 153df85

File tree

3 files changed

+52
-4
lines changed

3 files changed

+52
-4
lines changed

src/main/java/com/uber/cadence/internal/tracing/TracingPropagator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private SpanContext extract(Header header) {
161161
Map.Entry::getKey,
162162
e -> {
163163
byte[] bytes = new byte[e.getValue().remaining()];
164-
e.getValue().get(bytes);
164+
e.getValue().duplicate().get(bytes);
165165
return new String(bytes);
166166
}))));
167167
}

src/main/java/com/uber/cadence/workflow/WorkflowUtils.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,9 @@ public static <T> T getValueFromSearchAttributes(
3535
}
3636

3737
private static byte[] getValueBytes(SearchAttributes searchAttributes, String key) {
38-
ByteBuffer byteBuffer = searchAttributes.getIndexedFields().get(key);
38+
ByteBuffer byteBuffer = searchAttributes.getIndexedFields().get(key).duplicate();
3939
final byte[] valueBytes = new byte[byteBuffer.remaining()];
40-
byteBuffer.mark();
4140
byteBuffer.get(valueBytes);
42-
byteBuffer.reset();
4341
return valueBytes;
4442
}
4543
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.uber.cadence.internal.tracing;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertFalse;
5+
6+
import com.google.common.collect.ImmutableMap;
7+
import com.uber.cadence.Header;
8+
import com.uber.cadence.PollForActivityTaskResponse;
9+
import io.opentracing.Span;
10+
import io.opentracing.mock.MockSpan;
11+
import io.opentracing.mock.MockTracer;
12+
import java.nio.ByteBuffer;
13+
import java.util.List;
14+
import org.junit.Test;
15+
16+
public class TracingPropagatorTest {
17+
18+
private final MockTracer mockTracer = new MockTracer();
19+
private final TracingPropagator propagator = new TracingPropagator(mockTracer);
20+
21+
@Test
22+
public void testSpanForExecuteActivity_allowReusingHeaders() {
23+
Header header =
24+
new Header()
25+
.setFields(
26+
ImmutableMap.of(
27+
"traceid",
28+
ByteBuffer.wrap("100".getBytes()),
29+
"spanid",
30+
ByteBuffer.wrap("200".getBytes())));
31+
32+
Span span =
33+
propagator.spanForExecuteActivity(
34+
new PollForActivityTaskResponse().setHeader(header).setActivityId("id"));
35+
span.finish();
36+
Span span2 =
37+
propagator.spanForExecuteActivity(
38+
new PollForActivityTaskResponse().setHeader(header).setActivityId("id2"));
39+
span2.finish();
40+
41+
for (MockSpan mockSpan : mockTracer.finishedSpans()) {
42+
assertEquals("100", mockSpan.context().toTraceId());
43+
List<MockSpan.Reference> references = mockSpan.references();
44+
assertFalse(references.isEmpty());
45+
MockSpan.Reference from = references.get(0);
46+
assertEquals("200", from.getContext().toSpanId());
47+
assertEquals("follows_from", from.getReferenceType());
48+
}
49+
}
50+
}

0 commit comments

Comments
 (0)