Skip to content

Commit 39f5baa

Browse files
authored
fix ConcurrentModificationException in DeterministicRunnerImpl (#194)
* fix ConcurrentModificationException in deterministic runner implementation
1 parent 95896f2 commit 39f5baa

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,12 @@ public void close() {
282282
c.stop();
283283
}
284284
threads.clear();
285-
for (Promise<?> f : failedPromises) {
285+
286+
// We cannot use an iterator to unregister failed Promises since f.get()
287+
// will remove the promise directly from failedPromises. This causes an ConcurrentModificationException
288+
// For this reason we will loop over a copy of failedPromises.
289+
Set<Promise> failedPromisesLoop = new HashSet<>(failedPromises);
290+
for (Promise f : failedPromisesLoop) {
286291
if (!f.isCompleted()) {
287292
throw new Error("expected failed");
288293
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.junit.Assert.assertTrue;
2626
import static org.junit.Assert.fail;
2727

28+
import com.google.common.util.concurrent.UncheckedExecutionException;
2829
import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause;
2930
import com.uber.cadence.TimeoutType;
3031
import com.uber.cadence.WorkflowExecution;
@@ -634,6 +635,44 @@ public void testSyncUntypedAndStackTrace() throws InterruptedException {
634635
assertEquals("activity10", result);
635636
}
636637

638+
public static class TestCancellationForWorkflowsWithFailedPromises implements TestWorkflow1 {
639+
640+
@Override
641+
public String execute(String taskList) {
642+
Promise<String> failedPromise =
643+
Async.function(
644+
() -> {
645+
throw new UncheckedExecutionException(new Exception("Oh noo!"));
646+
});
647+
Promise<String> failedPromise2 =
648+
Async.function(
649+
() -> {
650+
throw new UncheckedExecutionException(new Exception("Oh noo again!"));
651+
});
652+
Workflow.await(() -> false);
653+
fail("unreachable");
654+
return "done";
655+
}
656+
}
657+
658+
@Test
659+
public void WorkflowsWithFailedPromisesCanBeCancelled() {
660+
worker.registerWorkflowImplementationTypes(
661+
TestCancellationForWorkflowsWithFailedPromises.class);
662+
testEnvironment.start();
663+
WorkflowStub client =
664+
workflowClient.newUntypedWorkflowStub(
665+
"TestWorkflow1::execute", newWorkflowOptionsBuilder(taskList).build());
666+
client.start(taskList);
667+
client.cancel();
668+
669+
try {
670+
client.getResult(String.class);
671+
fail("unreachable");
672+
} catch (CancellationException ignored) {
673+
}
674+
}
675+
637676
@Test
638677
public void testWorkflowCancellation() {
639678
startWorkerFor(TestSyncWorkflowImpl.class);

0 commit comments

Comments
 (0)