From 308ddb858467b0f04fe613ae4a8e93689d7f1793 Mon Sep 17 00:00:00 2001 From: Nathanael Mortensen Date: Mon, 15 Dec 2025 09:01:28 -0800 Subject: [PATCH] Propagate Tracer when building TChannel While workflows still capture the tracing information in the headers, we should pass the Tracer to TChannel so that we correctly trace all RPC requests sent to the Cadence server. Ensure that Tracer can't be set to null and correct the testing library version to avoid classpath conflicts. --- build.gradle | 2 +- .../cadence/serviceclient/ClientOptions.java | 13 +++- .../WorkflowServiceTChannel.java | 3 +- .../sync/WorkflowClientInternalTest.java | 31 +++++++- .../internal/tracing/StartWorkflowTest.java | 75 ++++++++++++------- 5 files changed, 87 insertions(+), 37 deletions(-) diff --git a/build.gradle b/build.gradle index f57c0add2..3d8345448 100644 --- a/build.gradle +++ b/build.gradle @@ -100,7 +100,7 @@ dependencies { testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4' testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3' testCompile group: 'io.grpc', name: 'grpc-testing', version: '1.54.2' - testImplementation 'io.opentracing:opentracing-mock:0.33.0' + testImplementation 'io.opentracing:opentracing-mock:0.32.0' testImplementation group: 'org.mockito', name: 'mockito-core', version: '4.5.1' testImplementation "org.mockito:mockito-inline:4.5.1" // for mocking final classes } diff --git a/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java b/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java index d7c7072d7..be311dc1c 100644 --- a/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java +++ b/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java @@ -26,6 +26,7 @@ import io.grpc.ManagedChannel; import io.opentracing.Tracer; import io.opentracing.noop.NoopTracerFactory; +import io.opentracing.util.GlobalTracer; import java.util.Map; public class ClientOptions { @@ -130,7 +131,13 @@ private ClientOptions(Builder builder) { } this.authProvider = builder.authProvider; this.isolationGroup = builder.isolationGroup; - this.tracer = builder.tracer; + if (builder.tracer != null) { + this.tracer = builder.tracer; + } else { + // Default value is GlobalTracer. If the user overrides it with null, fall back to NoopTracer. + // We need some tracer instance + this.tracer = NoopTracerFactory.create(); + } } public static ClientOptions defaultInstance() { @@ -233,8 +240,8 @@ public static class Builder { private IAuthorizationProvider authProvider; private FeatureFlags featureFlags; private String isolationGroup; - // by default NoopTracer - private Tracer tracer = NoopTracerFactory.create(); + // by default GlobalTracer + private Tracer tracer = GlobalTracer.get(); private Builder() {} diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index d8d22695f..d2f08494c 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -86,7 +86,8 @@ public class WorkflowServiceTChannel implements IWorkflowService { public WorkflowServiceTChannel(ClientOptions options) { this.options = options; this.thriftHeaders = getThriftHeaders(options); - this.tChannel = new TChannel.Builder(options.getClientAppName()).build(); + this.tChannel = + new TChannel.Builder(options.getClientAppName()).setTracer(options.getTracer()).build(); this.tracingPropagator = new TracingPropagator(options.getTracer()); this.tracer = options.getTracer(); diff --git a/src/test/java/com/uber/cadence/internal/sync/WorkflowClientInternalTest.java b/src/test/java/com/uber/cadence/internal/sync/WorkflowClientInternalTest.java index 84facee0c..1956e5fbb 100644 --- a/src/test/java/com/uber/cadence/internal/sync/WorkflowClientInternalTest.java +++ b/src/test/java/com/uber/cadence/internal/sync/WorkflowClientInternalTest.java @@ -116,8 +116,18 @@ public void testEnqueueStart_includesTracing() { stub.enqueueStart("input"); StartWorkflowExecutionAsyncRequest request = requestFuture.getNow(null).getStartRequest(); - assertEquals(1, fakeService.getTracer().finishedSpans().size()); - MockSpan mockSpan = fakeService.getTracer().finishedSpans().get(0); + MockSpan mockSpan = + fakeService + .getTracer() + .finishedSpans() + .stream() + .filter(span -> "cadence-StartWorkflowExecutionAsync".equals(span.operationName())) + .findFirst() + .orElseThrow( + () -> + new AssertionError( + "No span found for StartWorkflowExecutionAsync:" + + fakeService.getTracer().finishedSpans())); assertEquals( mockSpan.context().toTraceId(), Charsets.UTF_8 @@ -269,8 +279,21 @@ public void testEnqueueSignalWithStart_includesTracing() { SignalWithStartWorkflowExecutionRequest request = requestFuture.getNow(null).getSignalWithStartRequest().getRequest(); - assertEquals(1, fakeService.getTracer().finishedSpans().size()); - MockSpan mockSpan = fakeService.getTracer().finishedSpans().get(0); + assertEquals(2, fakeService.getTracer().finishedSpans().size()); + MockSpan mockSpan = + fakeService + .getTracer() + .finishedSpans() + .stream() + .filter( + span -> + "cadence-SignalWithStartWorkflowExecutionAsync".equals(span.operationName())) + .findFirst() + .orElseThrow( + () -> + new AssertionError( + "No span found for SignalWithStartWorkflowExecutionAsync:" + + fakeService.getTracer().finishedSpans())); assertEquals( mockSpan.context().toTraceId(), Charsets.UTF_8.decode(request.getHeader().getFields().get("traceid")).toString()); diff --git a/src/test/java/com/uber/cadence/internal/tracing/StartWorkflowTest.java b/src/test/java/com/uber/cadence/internal/tracing/StartWorkflowTest.java index 23b39f000..357a35098 100644 --- a/src/test/java/com/uber/cadence/internal/tracing/StartWorkflowTest.java +++ b/src/test/java/com/uber/cadence/internal/tracing/StartWorkflowTest.java @@ -267,7 +267,8 @@ public void testSignalWithStartWorkflowGRPC() { public void testStartWorkflowTchannelNoPropagation() { Assume.assumeTrue(useDockerService); MockTracer mockTracer = new MockTracer(); - IWorkflowService service = new WorkflowServiceTChannel(ClientOptions.newBuilder().build()); + IWorkflowService service = + new WorkflowServiceTChannel(ClientOptions.newBuilder().setTracer(null).build()); testStartWorkflowHelper(service, mockTracer, false); } @@ -277,7 +278,8 @@ public void testStartWorkflowGRPCNoPropagation() { MockTracer mockTracer = new MockTracer(); IWorkflowService service = new Thrift2ProtoAdapter( - IGrpcServiceStubs.newInstance(ClientOptions.newBuilder().setPort(7833).build())); + IGrpcServiceStubs.newInstance( + ClientOptions.newBuilder().setPort(7833).setTracer(null).build())); testStartWorkflowHelper(service, mockTracer, false); } @@ -285,7 +287,8 @@ public void testStartWorkflowGRPCNoPropagation() { public void testSignalStartWorkflowTchannelNoPropagation() { Assume.assumeTrue(useDockerService); MockTracer mockTracer = new MockTracer(); - IWorkflowService service = new WorkflowServiceTChannel(ClientOptions.newBuilder().build()); + IWorkflowService service = + new WorkflowServiceTChannel(ClientOptions.newBuilder().setTracer(null).build()); testSignalWithStartWorkflowHelper(service, mockTracer, false); } @@ -295,7 +298,8 @@ public void testSignalStartWorkflowGRPCNoPropagation() { MockTracer mockTracer = new MockTracer(); IWorkflowService service = new Thrift2ProtoAdapter( - IGrpcServiceStubs.newInstance(ClientOptions.newBuilder().setPort(7833).build())); + IGrpcServiceStubs.newInstance( + ClientOptions.newBuilder().setPort(7833).setTracer(null).build())); testSignalWithStartWorkflowHelper(service, mockTracer, false); } @@ -330,7 +334,7 @@ private void testStartWorkflowHelper( int res = wf.AddOneThenDouble(3); assertEquals(8, res); } catch (Exception e) { - fail("workflow failure: " + e); + throw new AssertionError("workflow failure", e); } finally { rootSpan.finish(); List spans = mockTracer.finishedSpans(); @@ -358,19 +362,11 @@ private void testStartWorkflowHelper( } else { // assert start workflow MockSpan spanStartWorkflow = - spans - .stream() - .filter(span -> span.operationName().contains("StartWorkflow")) - .findFirst() - .orElse(null); - if (spanStartWorkflow == null) { - fail("StartWorkflow span not found"); - } + getFirstSpanByOperationName(spans, "cadence-StartWorkflowExecution"); // assert workflow spans - MockSpan spanExecuteWF = getLinkedSpans(spans, spanStartWorkflow.context()).get(0); - assertEquals(spanExecuteWF.operationName(), "cadence-ExecuteWorkflow"); - assertSpanReferences(spanExecuteWF, "follows_from", spanStartWorkflow); + MockSpan spanExecuteWF = getFirstSpanByOperationName(spans, "cadence-ExecuteWorkflow"); + assertChildOf(spans, spanExecuteWF, spanStartWorkflow); MockSpan spanExecuteActivity = getLinkedSpans(spans, spanExecuteWF.context()).get(0); assertEquals(spanExecuteActivity.operationName(), "cadence-ExecuteActivity"); @@ -432,7 +428,7 @@ private void testSignalWithStartWorkflowHelper( int res = wf.getResult(Integer.class); assertEquals(8, res); } catch (Exception e) { - fail("workflow failure: " + e); + throw new AssertionError("Workflow failure", e); } finally { rootSpan.finish(); List spans = mockTracer.finishedSpans(); @@ -460,21 +456,14 @@ private void testSignalWithStartWorkflowHelper( } else { // assert start workflow MockSpan spanStartWorkflow = - spans - .stream() - .filter(span -> span.operationName().contains("StartWorkflow")) - .findFirst() - .orElse(null); - if (spanStartWorkflow == null) { - fail("StartWorkflow span not found"); - } + getFirstSpanByOperationName(spans, "cadence-SignalWithStartWorkflowExecution"); // assert workflow spans List workflowSpans = getSpansByTraceID(spans, spanStartWorkflow.context().toTraceId()); - MockSpan spanExecuteWF = getLinkedSpans(spans, spanStartWorkflow.context()).get(0); - assertEquals(spanExecuteWF.operationName(), "cadence-ExecuteWorkflow"); - assertSpanReferences(spanExecuteWF, "follows_from", spanStartWorkflow); + MockSpan spanExecuteWF = + getFirstSpanByOperationName(workflowSpans, "cadence-ExecuteWorkflow"); + assertChildOf(spans, spanExecuteWF, spanStartWorkflow); MockSpan spanExecuteActivity = getLinkedSpans(spans, spanExecuteWF.context()).get(0); assertEquals(spanExecuteActivity.operationName(), "cadence-ExecuteActivity"); @@ -493,6 +482,15 @@ private void testSignalWithStartWorkflowHelper( } } + private MockSpan getFirstSpanByOperationName(List spans, String operation) { + return spans + .stream() + .filter(span -> span.operationName().equals(operation)) + .findFirst() + .orElseThrow( + () -> new IllegalStateException("Failed to find span with operation: " + operation)); + } + private List getSpansByTraceID(List spans, String traceID) { return spans .stream() @@ -517,6 +515,27 @@ private List getLinkedSpans(List spans, SpanContext spanCont .collect(Collectors.toList()); } + void assertChildOf(List spans, MockSpan span, MockSpan parentSpan) { + Map byId = new HashMap<>(); + for (MockSpan s : spans) { + byId.put(s.context().traceId() + "-" + s.context().toSpanId(), s); + } + Queue toVisit = new LinkedList<>(); + toVisit.add(span); + while (!toVisit.isEmpty()) { + MockSpan current = toVisit.poll(); + MockSpan parent = byId.get(current.context().traceId() + "-" + current.parentId()); + if (parent != null) { + if (parent.equals(parentSpan)) { + return; + } else { + toVisit.add(parent); + } + } + } + fail(String.format("span %s is not a child of parent span %s", span, parentSpan)); + } + void assertSpanReferences(MockSpan span, String referenceType, MockSpan parentSpan) { assertTrue( String.format(