Skip to content

Commit d6bf9d7

Browse files
committed
Fix flakiness in ManualActivityCompletionWorkflowTest
The operations here are flaky because we're starting an activity and then attempting to cancel/complete/fail it in parallel. We need the second activity to block until the first one has started, and there's no way to orchestrate that within the workflow. The best we can do is make the acitivites block until the ActivityTask is available.
1 parent 9d4bed7 commit d6bf9d7

File tree

1 file changed

+31
-15
lines changed

1 file changed

+31
-15
lines changed

src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.uber.cadence.workflow;
1919

20-
import com.google.common.base.Preconditions;
2120
import com.uber.cadence.WorkflowExecution;
2221
import com.uber.cadence.activity.Activity;
2322
import com.uber.cadence.activity.ActivityMethod;
@@ -27,6 +26,7 @@
2726
import com.uber.cadence.testUtils.CadenceTestRule;
2827
import java.time.Duration;
2928
import java.util.concurrent.CancellationException;
29+
import java.util.concurrent.CompletableFuture;
3030
import org.junit.Rule;
3131
import org.junit.Test;
3232

@@ -48,6 +48,9 @@ public interface ManualCompletionActivities {
4848
@ActivityMethod
4949
String asyncActivity();
5050

51+
@ActivityMethod
52+
void reset();
53+
5154
@ActivityMethod
5255
void completeAsyncActivity(String result);
5356

@@ -68,53 +71,56 @@ public interface ManualCompletionActivities {
6871
}
6972

7073
private class ManualCompletionActivitiesImpl implements ManualCompletionActivities {
71-
private ActivityTask openTask;
74+
private CompletableFuture<ActivityTask> openTask = new CompletableFuture<>();
7275

7376
@Override
7477
public synchronized String asyncActivity() {
75-
openTask = Activity.getTask();
78+
openTask.complete(Activity.getTask());
7679

7780
Activity.doNotCompleteOnReturn();
7881
return null;
7982
}
8083

84+
@Override
85+
public synchronized void reset() {
86+
openTask = new CompletableFuture<>();
87+
}
88+
8189
@Override
8290
public synchronized void completeAsyncActivity(String details) {
83-
Preconditions.checkState(openTask != null);
84-
getClient().complete(openTask.getTaskToken(), details);
91+
getClient().complete(openTask.join().getTaskToken(), details);
8592
}
8693

8794
@Override
8895
public synchronized void completeAsyncActivityById(String details) {
89-
Preconditions.checkState(openTask != null);
90-
getClient().complete(getCurrentWorkflow(), openTask.getActivityId(), details);
96+
getClient().complete(getCurrentWorkflow(), openTask.join().getActivityId(), details);
9197
}
9298

9399
@Override
94100
public synchronized void failAsyncActivity(String details) {
95-
Preconditions.checkState(openTask != null);
96101
getClient()
97-
.completeExceptionally(openTask.getTaskToken(), new ExceptionWithDetaills(details));
102+
.completeExceptionally(
103+
openTask.join().getTaskToken(), new ExceptionWithDetaills(details));
98104
}
99105

100106
@Override
101107
public synchronized void failAsyncActivityById(String details) {
102-
Preconditions.checkState(openTask != null);
103108
getClient()
104109
.completeExceptionally(
105-
getCurrentWorkflow(), openTask.getActivityId(), new ExceptionWithDetaills(details));
110+
getCurrentWorkflow(),
111+
openTask.join().getActivityId(),
112+
new ExceptionWithDetaills(details));
106113
}
107114

108115
@Override
109116
public synchronized void cancelAsyncActivity(String details) {
110-
Preconditions.checkState(openTask != null);
111-
getClient().reportCancellation(openTask.getTaskToken(), details);
117+
getClient().reportCancellation(openTask.join().getTaskToken(), details);
112118
}
113119

114120
@Override
115121
public synchronized void cancelAsyncActivityById(String details) {
116-
Preconditions.checkState(openTask != null);
117-
getClient().reportCancellation(getCurrentWorkflow(), openTask.getActivityId(), details);
122+
getClient()
123+
.reportCancellation(getCurrentWorkflow(), openTask.join().getActivityId(), details);
118124
}
119125

120126
private WorkflowExecution getCurrentWorkflow() {
@@ -146,21 +152,29 @@ public void run() {
146152
expectSuccess("1", result);
147153
expectFailure(() -> activities.completeAsyncActivity("again"));
148154

155+
activities.reset();
156+
149157
result = Async.function(activities::asyncActivity);
150158
activities.completeAsyncActivityById("2");
151159
expectSuccess("2", result);
152160
expectFailure(() -> activities.completeAsyncActivityById("again"));
153161

162+
activities.reset();
163+
154164
result = Async.function(activities::asyncActivity);
155165
activities.failAsyncActivity("3");
156166
expectFailureWithDetails(result, "3");
157167
expectFailure(() -> activities.failAsyncActivity("again"));
158168

169+
activities.reset();
170+
159171
result = Async.function(activities::asyncActivity);
160172
activities.failAsyncActivityById("4");
161173
expectFailureWithDetails(result, "4");
162174
expectFailure(() -> activities.failAsyncActivityById("again"));
163175

176+
activities.reset();
177+
164178
// Need to request cancellation, then the activity can respond with the cancel
165179
CompletablePromise<String> completablePromise = Workflow.newPromise();
166180
CancellationScope scope =
@@ -178,6 +192,8 @@ public void run() {
178192
activities.cancelAsyncActivity("5");
179193
expectCancelled(result);
180194

195+
activities.reset();
196+
181197
// Need to request cancellation, then the activity can respond with the cancel
182198
CompletablePromise<String> completablePromise2 = Workflow.newPromise();
183199
scope =

0 commit comments

Comments
 (0)