Skip to content

Commit d4b5997

Browse files
authored
Use the same Request ID for retries (#915)
Currently the RequestId is initialized in the implementation of IWorkflowService (either Thrift2ProtoAdapter or WorkflowServiceTChannel). Since retries are handled within GenericWorkflowClientExternalImpl, each retry uses a different request ID. This causes Cadence's idempotency feature not to work correctly for retries from the Java client. Move Request ID initialization to GenericWorkflowClientExternalImpl, and for backwards compatibility (since users may be interacting with the IWorkflowService directly) continue setting the request ID if not specified.
1 parent 153df85 commit d4b5997

File tree

5 files changed

+292
-35
lines changed

5 files changed

+292
-35
lines changed

src/main/java/com/uber/cadence/internal/compatibility/Thrift2ProtoAdapter.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,9 @@ private StartWorkflowExecutionResponse startWorkflowExecution(
252252
}
253253

254254
private void initializeStartWorkflowExecutionRequest(StartWorkflowExecutionRequest request) {
255-
request.setRequestId(UUID.randomUUID().toString());
255+
if (!request.isSetRequestId()) {
256+
request.setRequestId(UUID.randomUUID().toString());
257+
}
256258
}
257259

258260
@Override
@@ -466,7 +468,9 @@ public void RequestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest
466468
throws BadRequestError, EntityNotExistsError, CancellationAlreadyRequestedError,
467469
ServiceBusyError, DomainNotActiveError, LimitExceededError,
468470
ClientVersionNotSupportedError, WorkflowExecutionAlreadyCompletedError, TException {
469-
cancelRequest.setRequestId(UUID.randomUUID().toString());
471+
if (!cancelRequest.isSetRequestId()) {
472+
cancelRequest.setRequestId(UUID.randomUUID().toString());
473+
}
470474
try {
471475
grpcServiceStubs
472476
.workflowBlockingStub()
@@ -482,7 +486,9 @@ public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest
482486
throws BadRequestError, EntityNotExistsError, ServiceBusyError, DomainNotActiveError,
483487
LimitExceededError, ClientVersionNotSupportedError,
484488
WorkflowExecutionAlreadyCompletedError, TException {
485-
signalRequest.setRequestId(UUID.randomUUID().toString());
489+
if (!signalRequest.isSetRequestId()) {
490+
signalRequest.setRequestId(UUID.randomUUID().toString());
491+
}
486492
try {
487493
grpcServiceStubs
488494
.workflowBlockingStub()
@@ -533,7 +539,9 @@ public SignalWithStartWorkflowExecutionAsyncResponse SignalWithStartWorkflowExec
533539

534540
private void initializeSignalWithStartWorkflowExecution(
535541
SignalWithStartWorkflowExecutionRequest request) {
536-
request.setRequestId(UUID.randomUUID().toString());
542+
if (!request.isSetRequestId()) {
543+
request.setRequestId(UUID.randomUUID().toString());
544+
}
537545
}
538546

539547
@Override
@@ -542,7 +550,9 @@ public ResetWorkflowExecutionResponse ResetWorkflowExecution(
542550
throws BadRequestError, EntityNotExistsError, ServiceBusyError, DomainNotActiveError,
543551
LimitExceededError, ClientVersionNotSupportedError, TException {
544552
try {
545-
resetRequest.setRequestId(UUID.randomUUID().toString());
553+
if (!resetRequest.isSetRequestId()) {
554+
resetRequest.setRequestId(UUID.randomUUID().toString());
555+
}
546556
com.uber.cadence.api.v1.ResetWorkflowExecutionResponse response =
547557
grpcServiceStubs
548558
.workflowBlockingStub()
@@ -980,7 +990,9 @@ public void SignalWorkflowExecution(
980990
SignalWorkflowExecutionRequest signalRequest, AsyncMethodCallback resultHandler)
981991
throws TException {
982992
try {
983-
signalRequest.setRequestId(UUID.randomUUID().toString());
993+
if (!signalRequest.isSetRequestId()) {
994+
signalRequest.setRequestId(UUID.randomUUID().toString());
995+
}
984996
ListenableFuture<com.uber.cadence.api.v1.SignalWorkflowExecutionResponse> resultFuture =
985997
grpcServiceStubs
986998
.workflowFutureStub()

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,8 @@ public void onError(Exception exception) {
244244

245245
private StartWorkflowExecutionRequest getStartRequest(
246246
StartWorkflowExecutionParameters startParameters) {
247-
StartWorkflowExecutionRequest request = new StartWorkflowExecutionRequest();
247+
StartWorkflowExecutionRequest request =
248+
new StartWorkflowExecutionRequest().setRequestId(generateUniqueId());
248249
request.setDomain(domain);
249250
if (startParameters.getInput() != null) {
250251
request.setInput(startParameters.getInput());
@@ -383,7 +384,8 @@ public void onError(Exception exception) {
383384

384385
private SignalWorkflowExecutionRequest getSignalRequest(
385386
SignalExternalWorkflowParameters signalParameters) {
386-
SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest();
387+
SignalWorkflowExecutionRequest request =
388+
new SignalWorkflowExecutionRequest().setRequestId(generateUniqueId());
387389
request.setDomain(domain);
388390
request.setInput(signalParameters.getInput());
389391
request.setSignalName(signalParameters.getSignalName());
@@ -466,7 +468,8 @@ private WorkflowExecution signalWithStartWorkflowInternal(
466468

467469
private SignalWithStartWorkflowExecutionRequest createSignalWithStartRequest(
468470
SignalWithStartWorkflowExecutionParameters parameters) {
469-
SignalWithStartWorkflowExecutionRequest request = new SignalWithStartWorkflowExecutionRequest();
471+
SignalWithStartWorkflowExecutionRequest request =
472+
new SignalWithStartWorkflowExecutionRequest().setRequestId(generateUniqueId());
470473
request.setDomain(domain);
471474
StartWorkflowExecutionParameters startParameters = parameters.getStartParameters();
472475
request.setSignalName(parameters.getSignalName());
@@ -509,7 +512,8 @@ private SignalWithStartWorkflowExecutionRequest createSignalWithStartRequest(
509512

510513
@Override
511514
public void requestCancelWorkflowExecution(WorkflowExecution execution) {
512-
RequestCancelWorkflowExecutionRequest request = new RequestCancelWorkflowExecutionRequest();
515+
RequestCancelWorkflowExecutionRequest request =
516+
new RequestCancelWorkflowExecutionRequest().setRequestId(generateUniqueId());
513517
request.setDomain(domain);
514518
request.setWorkflowExecution(execution);
515519
try {
@@ -544,8 +548,7 @@ public QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParamete
544548

545549
@Override
546550
public String generateUniqueId() {
547-
String workflowId = UUID.randomUUID().toString();
548-
return workflowId;
551+
return UUID.randomUUID().toString();
549552
}
550553

551554
@Override

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,9 @@ private StartWorkflowExecutionAsyncResponse startWorkflowExecutionAsync(
712712
}
713713

714714
private void initializeStartWorkflowRequest(StartWorkflowExecutionRequest startRequest) {
715-
startRequest.setRequestId(UUID.randomUUID().toString());
715+
if (!startRequest.isSetRequestId()) {
716+
startRequest.setRequestId(UUID.randomUUID().toString());
717+
}
716718
// Write span context to header
717719
if (!startRequest.isSetHeader()) {
718720
startRequest.setHeader(new Header());
@@ -1412,7 +1414,9 @@ public void RequestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest
14121414

14131415
private void requestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest cancelRequest)
14141416
throws TException {
1415-
cancelRequest.setRequestId(UUID.randomUUID().toString());
1417+
if (!cancelRequest.isSetRequestId()) {
1418+
cancelRequest.setRequestId(UUID.randomUUID().toString());
1419+
}
14161420
ThriftResponse<WorkflowService.RequestCancelWorkflowExecution_result> response = null;
14171421
try {
14181422
ThriftRequest<WorkflowService.RequestCancelWorkflowExecution_args> request =
@@ -1667,7 +1671,9 @@ private StartWorkflowExecutionResponse signalWithStartWorkflowExecution(
16671671

16681672
private void initializeSignalWithStartWorkflowRequest(
16691673
SignalWithStartWorkflowExecutionRequest request) {
1670-
request.setRequestId(UUID.randomUUID().toString());
1674+
if (!request.isSetRequestId()) {
1675+
request.setRequestId(UUID.randomUUID().toString());
1676+
}
16711677
// Write span context to header
16721678
if (!request.isSetHeader()) {
16731679
request.setHeader(new Header());

src/test/java/com/uber/cadence/FakeWorkflowServiceRule.java

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
import com.uber.tchannel.messages.ThriftResponse;
1111
import io.opentracing.mock.MockTracer;
1212
import java.util.Map;
13+
import java.util.Queue;
1314
import java.util.concurrent.CompletableFuture;
1415
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.ConcurrentLinkedQueue;
1517
import org.junit.rules.ExternalResource;
1618

1719
/**
@@ -21,7 +23,7 @@
2123
*/
2224
public class FakeWorkflowServiceRule extends ExternalResource {
2325

24-
private final Map<String, StubbedResponse<?>> stubbedResponses = new ConcurrentHashMap<>();
26+
private final Map<String, StubbedEndpoint> stubbedEndpoints = new ConcurrentHashMap<>();
2527
private final MockTracer tracer = new MockTracer();
2628
private TChannel tChannel;
2729
private IWorkflowService clientConn;
@@ -33,11 +35,16 @@ protected void before() throws Throwable {
3335
new ThriftRequestHandler<Object, Object>() {
3436
@Override
3537
public ThriftResponse<Object> handleImpl(ThriftRequest<Object> request) {
38+
StubbedEndpoint endpoint = stubbedEndpoints.get(request.getEndpoint());
39+
if (endpoint == null) {
40+
throw new IllegalStateException(
41+
"Endpoint " + request.getEndpoint() + " was not stubbed");
42+
}
3643
@SuppressWarnings("rawtypes")
37-
StubbedResponse stub = stubbedResponses.get(request.getEndpoint());
44+
StubbedResponse stub = endpoint.getNext();
3845
if (stub == null) {
3946
throw new IllegalStateException(
40-
"Endpoint " + request.getEndpoint() + " was not stubbed");
47+
"Exhausted all invocations of " + request.getEndpoint());
4148
}
4249
//noinspection unchecked
4350
stub.future.complete(request.getBody(stub.requestType));
@@ -59,7 +66,7 @@ public ThriftResponse<Object> handleImpl(ThriftRequest<Object> request) {
5966

6067
@Override
6168
protected void after() {
62-
stubbedResponses.clear();
69+
stubbedEndpoints.clear();
6370
if (clientConn != null) {
6471
clientConn.close();
6572
}
@@ -71,7 +78,7 @@ protected void after() {
7178

7279
public void resetStubs() {
7380
tracer.reset();
74-
stubbedResponses.clear();
81+
stubbedEndpoints.clear();
7582
}
7683

7784
public IWorkflowService getClient() {
@@ -82,19 +89,37 @@ public MockTracer getTracer() {
8289
return tracer;
8390
}
8491

85-
public <V> CompletableFuture<V> stubEndpoint(
92+
public <V> CompletableFuture<V> stubSuccess(
8693
String endpoint, Class<V> requestType, Object response) {
94+
return stubEndpoint(endpoint, requestType, ResponseCode.OK, response);
95+
}
96+
97+
public <V> CompletableFuture<V> stubError(
98+
String endpoint, Class<V> requestType, Object response) {
99+
return stubEndpoint(endpoint, requestType, ResponseCode.Error, response);
100+
}
101+
102+
public <V> CompletableFuture<V> stubEndpoint(
103+
String endpoint, Class<V> requestType, ResponseCode code, Object response) {
87104
CompletableFuture<V> future = new CompletableFuture<>();
88-
StubbedResponse<?> existingStub =
89-
stubbedResponses.putIfAbsent(
90-
endpoint, new StubbedResponse<>(response, ResponseCode.OK, future, requestType));
91-
if (existingStub != null) {
92-
throw new IllegalStateException(
93-
"Endpoint " + endpoint + " was already stubbed to return " + existingStub.body);
94-
}
105+
StubbedEndpoint endpointStub =
106+
stubbedEndpoints.computeIfAbsent(endpoint, id -> new StubbedEndpoint());
107+
endpointStub.addStub(new StubbedResponse<>(response, code, future, requestType));
95108
return future;
96109
}
97110

111+
private static class StubbedEndpoint {
112+
private final Queue<StubbedResponse<?>> responses = new ConcurrentLinkedQueue<>();
113+
114+
public void addStub(StubbedResponse<?> response) {
115+
responses.add(response);
116+
}
117+
118+
public StubbedResponse<?> getNext() {
119+
return responses.poll();
120+
}
121+
}
122+
98123
private static class StubbedResponse<V> {
99124
private final Object body;
100125
private final ResponseCode code;

0 commit comments

Comments
 (0)