Skip to content

Commit 163502b

Browse files
authored
Fix test environment (#196)
Support Sticky Workflow Scenario Start event ids from 1 instead of 0
1 parent fb6083c commit 163502b

File tree

12 files changed

+306
-17
lines changed

12 files changed

+306
-17
lines changed

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.uber.cadence.internal.common;
1919

20+
import com.uber.cadence.TaskList;
21+
import com.uber.cadence.TaskListKind;
2022
import com.uber.cadence.workflow.WorkflowMethod;
2123
import java.lang.reflect.Method;
2224

@@ -64,6 +66,20 @@ public static Method getWorkflowMethod(Class<?> workflowInterface) {
6466
return result;
6567
}
6668

69+
public static TaskList createStickyTaskList(String taskListName) {
70+
TaskList tl = new TaskList();
71+
tl.setName(taskListName);
72+
tl.setKind(TaskListKind.STICKY);
73+
return tl;
74+
}
75+
76+
public static TaskList createNormalTaskList(String taskListName) {
77+
TaskList tl = new TaskList();
78+
tl.setName(taskListName);
79+
tl.setKind(TaskListKind.NORMAL);
80+
return tl;
81+
}
82+
6783
/** Prohibit instantiation */
6884
private InternalUtils() {}
6985
}

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,8 @@ public void close() {
284284
threads.clear();
285285

286286
// We cannot use an iterator to unregister failed Promises since f.get()
287-
// will remove the promise directly from failedPromises. This causes an ConcurrentModificationException
287+
// will remove the promise directly from failedPromises. This causes an
288+
// ConcurrentModificationException
288289
// For this reason we will loop over a copy of failedPromises.
289290
Set<Promise> failedPromisesLoop = new HashSet<>(failedPromises);
290291
for (Promise f : failedPromisesLoop) {

src/main/java/com/uber/cadence/internal/testservice/StateMachines.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,11 @@ private static void startDecisionTask(
728728
.getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest)
729729
.getHistory()
730730
.getEvents();
731+
732+
if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
733+
events = events.subList((int) data.previousStartedEventId, events.size());
734+
}
735+
// get it from pervious started event id.
731736
} catch (EntityNotExistsError entityNotExistsError) {
732737
throw new InternalServiceError(entityNotExistsError.toString());
733738
}

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableState.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.uber.cadence.SignalWorkflowExecutionRequest;
4949
import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes;
5050
import com.uber.cadence.StartWorkflowExecutionRequest;
51+
import com.uber.cadence.StickyExecutionAttributes;
5152
import com.uber.cadence.WorkflowExecutionCloseStatus;
5253
import com.uber.cadence.internal.testservice.TestWorkflowMutableStateImpl.QueryId;
5354
import java.util.Optional;
@@ -136,4 +137,6 @@ void cancelActivityTaskById(String id, RespondActivityTaskCanceledByIDRequest ca
136137

137138
void completeQuery(QueryId queryId, RespondQueryTaskCompletedRequest completeRequest)
138139
throws EntityNotExistsError;
140+
141+
StickyExecutionAttributes getStickyExecutionAttributes();
139142
}

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes;
6969
import com.uber.cadence.StartTimerDecisionAttributes;
7070
import com.uber.cadence.StartWorkflowExecutionRequest;
71+
import com.uber.cadence.StickyExecutionAttributes;
7172
import com.uber.cadence.TimeoutType;
7273
import com.uber.cadence.WorkflowExecutionCloseStatus;
7374
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
@@ -137,6 +138,7 @@ void apply(RequestContext ctx)
137138
private long lastNonFailedDecisionStartEventId;
138139
private final Map<String, CompletableFuture<QueryWorkflowResponse>> queries =
139140
new ConcurrentHashMap<>();
141+
public StickyExecutionAttributes stickyExecutionAttributes;
140142

141143
/** @param parentChildInitiatedEventId id of the child initiated event in the parent history */
142144
TestWorkflowMutableStateImpl(
@@ -164,9 +166,10 @@ private void update(UpdateProcedure updater)
164166
update(false, updater, stackTraceElements[2].getMethodName());
165167
}
166168

167-
private void completeDecisionUpdate(UpdateProcedure updater)
169+
private void completeDecisionUpdate(UpdateProcedure updater, StickyExecutionAttributes attributes)
168170
throws InternalServiceError, EntityNotExistsError, BadRequestError {
169171
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
172+
stickyExecutionAttributes = attributes;
170173
update(true, updater, stackTraceElements[2].getMethodName());
171174
}
172175

@@ -175,11 +178,13 @@ private void update(boolean completeDecisionUpdate, UpdateProcedure updater, Str
175178
String callerInfo = "Decision Update from " + caller;
176179
lock.lock();
177180
LockHandle lockHandle = selfAdvancingTimer.lockTimeSkipping(callerInfo);
181+
178182
try {
179183
checkCompleted();
180184
boolean concurrentDecision =
181185
!completeDecisionUpdate
182186
&& (decision != null && decision.getState() == StateMachines.State.STARTED);
187+
183188
RequestContext ctx = new RequestContext(clock, this, nextEventId);
184189
updater.apply(ctx);
185190
if (concurrentDecision && workflow.getState() != State.TIMED_OUT) {
@@ -231,6 +236,11 @@ public StartWorkflowExecutionRequest getStartRequest() {
231236
return startRequest;
232237
}
233238

239+
@Override
240+
public StickyExecutionAttributes getStickyExecutionAttributes() {
241+
return stickyExecutionAttributes;
242+
}
243+
234244
@Override
235245
public void startDecisionTask(
236246
PollForDecisionTaskResponse task, PollForDecisionTaskRequest pollRequest)
@@ -241,7 +251,9 @@ public void startDecisionTask(
241251
long scheduledEventId = decision.getData().scheduledEventId;
242252
decision.action(StateMachines.Action.START, ctx, pollRequest, 0);
243253
ctx.addTimer(
244-
startRequest.getTaskStartToCloseTimeoutSeconds(),
254+
stickyExecutionAttributes != null
255+
? stickyExecutionAttributes.getScheduleToStartTimeoutSeconds()
256+
: startRequest.getTaskStartToCloseTimeoutSeconds(),
245257
() -> timeoutDecisionTask(scheduledEventId),
246258
"DecisionTask StartToCloseTimeout");
247259
});
@@ -254,7 +266,7 @@ public void completeDecisionTask(int historySize, RespondDecisionTaskCompletedRe
254266
List<Decision> decisions = request.getDecisions();
255267
completeDecisionUpdate(
256268
ctx -> {
257-
if (ctx.getInitialEventId() != historySize) {
269+
if (ctx.getInitialEventId() != historySize + 1) {
258270
throw new BadRequestError(
259271
"Expired decision: expectedHistorySize="
260272
+ historySize
@@ -275,6 +287,9 @@ public void completeDecisionTask(int historySize, RespondDecisionTaskCompletedRe
275287
ctx.add(deferredCtx);
276288
}
277289
this.concurrentToDecision.clear();
290+
291+
// Reset sticky execution attributes on failure
292+
stickyExecutionAttributes = null;
278293
scheduleDecision(ctx);
279294
return;
280295
}
@@ -299,7 +314,8 @@ public void completeDecisionTask(int historySize, RespondDecisionTaskCompletedRe
299314
}
300315
this.concurrentToDecision.clear();
301316
ctx.unlockTimer();
302-
});
317+
},
318+
request.getStickyAttributes());
303319
}
304320

305321
private boolean hasCompleteDecision(List<Decision> decisions) {
@@ -567,7 +583,8 @@ public void failDecisionTask(RespondDecisionTaskFailedRequest request)
567583
ctx -> {
568584
decision.action(Action.FAIL, ctx, request, 0);
569585
scheduleDecision(ctx);
570-
});
586+
},
587+
null); // reset sticky attributes to null
571588
}
572589

573590
// TODO: insert a single decision timeout into the history
@@ -581,7 +598,8 @@ private void timeoutDecisionTask(long scheduledEventId) {
581598
}
582599
decision.action(StateMachines.Action.TIME_OUT, ctx, TimeoutType.START_TO_CLOSE, 0);
583600
scheduleDecision(ctx);
584-
});
601+
},
602+
null); // reset sticky attributes to null
585603
} catch (EntityNotExistsError e) {
586604
// Expected as timers are not removed
587605
} catch (Exception e) {

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,13 +266,18 @@ public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskReques
266266
TestWorkflowMutableState mutableState = getMutableState(executionId);
267267
try {
268268
mutableState.startDecisionTask(task, pollRequest);
269+
// The task always has the original tasklist is was created on as part of the response. This
270+
// may different
271+
// then the task list it was scheduled on as in the case of sticky execution.
272+
task.setWorkflowExecutionTaskList(mutableState.getStartRequest().taskList);
269273
return task;
270274
} catch (EntityNotExistsError e) {
271275
if (log.isDebugEnabled()) {
272276
log.debug("Skipping outdated decision task for " + executionId, e);
273277
}
274278
// skip the task
275279
}
280+
task.setWorkflowExecutionTaskList(mutableState.getStartRequest().taskList);
276281
return task;
277282
}
278283

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.uber.cadence.PollForActivityTaskResponse;
3030
import com.uber.cadence.PollForDecisionTaskRequest;
3131
import com.uber.cadence.PollForDecisionTaskResponse;
32+
import com.uber.cadence.StickyExecutionAttributes;
3233
import com.uber.cadence.WorkflowExecution;
3334
import com.uber.cadence.WorkflowExecutionInfo;
3435
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
@@ -75,7 +76,7 @@ public List<HistoryEvent> getHistory() {
7576
}
7677

7778
private void checkNextEventId(long nextEventId) {
78-
if (nextEventId != history.size()) {
79+
if (nextEventId != history.size() + 1L && (nextEventId != 0 && history.size() != 0)) {
7980
throw new IllegalStateException(
8081
"NextEventId=" + nextEventId + ", historySize=" + history.size() + " for " + id);
8182
}
@@ -88,7 +89,7 @@ void addAllLocked(List<HistoryEvent> events, long timeInNanos) throws EntityNotE
8889
"Attempt to add an event after a completion event: "
8990
+ WorkflowExecutionUtils.prettyPrintHistoryEvent(event));
9091
}
91-
event.setEventId(history.size());
92+
event.setEventId(history.size() + 1L);
9293
event.setTimestamp(timeInNanos);
9394
history.add(event);
9495
completed = completed || WorkflowExecutionUtils.isWorkflowExecutionCompletedEvent(event);
@@ -97,7 +98,7 @@ void addAllLocked(List<HistoryEvent> events, long timeInNanos) throws EntityNotE
9798
}
9899

99100
long getNextEventIdLocked() {
100-
return history.size();
101+
return history.size() + 1L;
101102
}
102103

103104
List<HistoryEvent> getEventsLocked() {
@@ -197,9 +198,18 @@ public long save(RequestContext ctx) throws InternalServiceError, EntityNotExist
197198
}
198199
// Push tasks to the queues out of locks
199200
DecisionTask decisionTask = ctx.getDecisionTask();
201+
200202
if (decisionTask != null) {
201-
BlockingQueue<PollForDecisionTaskResponse> decisionsQueue =
202-
getDecisionTaskListQueue(decisionTask.getTaskListId());
203+
StickyExecutionAttributes attributes =
204+
ctx.getWorkflowMutableState().getStickyExecutionAttributes();
205+
TaskListId id =
206+
new TaskListId(
207+
decisionTask.getTaskListId().getDomain(),
208+
attributes == null
209+
? decisionTask.getTaskListId().getTaskListName()
210+
: attributes.getWorkerTaskList().getName());
211+
212+
BlockingQueue<PollForDecisionTaskResponse> decisionsQueue = getDecisionTaskListQueue(id);
203213
decisionsQueue.add(decisionTask.getTask());
204214
}
205215

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,11 @@ private final Duration retryServiceOperationExpirationInterval() {
270270
}
271271
}
272272

273-
if(decisionTaskStartToCloseTimeout == null){
274-
throw new IllegalArgumentException(String.format("PollForDecisionTaskResponse is missing DecisionTaskScheduled event. RunId: %s, WorkflowId: %s", task.getWorkflowExecution().runId, task.getWorkflowExecution().workflowId));
273+
if (decisionTaskStartToCloseTimeout == null) {
274+
throw new IllegalArgumentException(
275+
String.format(
276+
"PollForDecisionTaskResponse is missing DecisionTaskScheduled event. RunId: %s, WorkflowId: %s",
277+
task.getWorkflowExecution().runId, task.getWorkflowExecution().workflowId));
275278
}
276279
}
277280

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.testing;
19+
20+
import static com.uber.cadence.internal.common.InternalUtils.createNormalTaskList;
21+
import static com.uber.cadence.internal.common.InternalUtils.createStickyTaskList;
22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertNotNull;
24+
25+
import com.uber.cadence.EventType;
26+
import com.uber.cadence.HistoryEvent;
27+
import com.uber.cadence.PollForDecisionTaskResponse;
28+
import com.uber.cadence.internal.testservice.TestWorkflowService;
29+
import com.uber.cadence.testUtils.TestServiceUtils;
30+
import java.time.Duration;
31+
import java.util.List;
32+
import org.junit.Test;
33+
34+
public class WorkflowStickynessTest {
35+
36+
private final String DOMAIN = "domain";
37+
private final String TASK_LIST = "taskList";
38+
private final String HOST_TASKLIST = "stickyTaskList";
39+
private final String WORKFLOW_TYPE = "wfType";
40+
private final String CALLER = "WorkflowStickynessTest";
41+
42+
@Test
43+
public void taskCompletionWithStickyExecutionAttributesWillScheduleDecisionsOnStickyTaskList()
44+
throws Exception {
45+
46+
TestWorkflowService service = new TestWorkflowService();
47+
service.lockTimeSkipping(CALLER);
48+
49+
TestServiceUtils.startWorkflowExecution(DOMAIN, TASK_LIST, WORKFLOW_TYPE, service);
50+
PollForDecisionTaskResponse response =
51+
TestServiceUtils.pollForDecisionTask(DOMAIN, createNormalTaskList(TASK_LIST), service);
52+
53+
TestServiceUtils.respondDecisionTaskCompletedWithSticky(
54+
response.taskToken, HOST_TASKLIST, service);
55+
TestServiceUtils.signalWorkflow(response.workflowExecution, DOMAIN, service);
56+
response =
57+
TestServiceUtils.pollForDecisionTask(DOMAIN, createStickyTaskList(HOST_TASKLIST), service);
58+
59+
assertEquals(4, response.history.getEventsSize());
60+
assertEquals(TASK_LIST, response.getWorkflowExecutionTaskList().getName());
61+
List<HistoryEvent> events = response.history.getEvents();
62+
assertEquals(EventType.DecisionTaskCompleted, events.get(0).eventType);
63+
assertEquals(EventType.WorkflowExecutionSignaled, events.get(1).eventType);
64+
assertEquals(EventType.DecisionTaskScheduled, events.get(2).eventType);
65+
assertEquals(EventType.DecisionTaskStarted, events.get(3).eventType);
66+
}
67+
68+
@Test
69+
public void taskFailureWillRescheduleTheTaskOnTheGlobalList() throws Exception {
70+
71+
TestWorkflowService service = new TestWorkflowService();
72+
service.lockTimeSkipping(CALLER);
73+
74+
TestServiceUtils.startWorkflowExecution(DOMAIN, TASK_LIST, WORKFLOW_TYPE, service);
75+
PollForDecisionTaskResponse response =
76+
TestServiceUtils.pollForDecisionTask(DOMAIN, createNormalTaskList(TASK_LIST), service);
77+
78+
TestServiceUtils.respondDecisionTaskCompletedWithSticky(
79+
response.taskToken, HOST_TASKLIST, service);
80+
TestServiceUtils.signalWorkflow(response.workflowExecution, DOMAIN, service);
81+
response =
82+
TestServiceUtils.pollForDecisionTask(DOMAIN, createStickyTaskList(HOST_TASKLIST), service);
83+
TestServiceUtils.respondDecisionTaskFailedWithSticky(response.taskToken, service);
84+
response =
85+
TestServiceUtils.pollForDecisionTask(DOMAIN, createNormalTaskList(TASK_LIST), service);
86+
87+
// Assert Full history
88+
// Make sure first is workflow execution started
89+
assertNotNull(response.history.events.get(0).getWorkflowExecutionStartedEventAttributes());
90+
// 10 is the expected number of events for the full history.
91+
assertEquals(10, response.history.getEventsSize());
92+
}
93+
94+
@Test
95+
public void taskTimeoutWillRescheduleTheTaskOnTheGlobalList() throws Exception {
96+
97+
TestWorkflowService service = new TestWorkflowService();
98+
service.lockTimeSkipping(CALLER);
99+
TestServiceUtils.startWorkflowExecution(DOMAIN, TASK_LIST, WORKFLOW_TYPE, 10, 2, service);
100+
PollForDecisionTaskResponse response =
101+
TestServiceUtils.pollForDecisionTask(DOMAIN, createNormalTaskList(TASK_LIST), service);
102+
103+
TestServiceUtils.respondDecisionTaskCompletedWithSticky(
104+
response.taskToken, HOST_TASKLIST, 1, service);
105+
TestServiceUtils.signalWorkflow(response.workflowExecution, DOMAIN, service);
106+
TestServiceUtils.pollForDecisionTask(DOMAIN, createStickyTaskList(HOST_TASKLIST), service);
107+
service.unlockTimeSkipping(CALLER);
108+
service.sleep(Duration.ofMillis(1100));
109+
110+
response =
111+
TestServiceUtils.pollForDecisionTask(DOMAIN, createNormalTaskList(TASK_LIST), service);
112+
113+
// Assert Full history
114+
// Make sure first is workflow execution started
115+
assertNotNull(response.history.events.get(0).getWorkflowExecutionStartedEventAttributes());
116+
// 10 is the expected number of events for the full history.
117+
assertEquals(10, response.history.getEventsSize());
118+
}
119+
}

src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
public class WorkflowTestingTest {
7575
private static final Logger log = LoggerFactory.getLogger(WorkflowTestingTest.class);
7676

77-
@Rule public Timeout globalTimeout = Timeout.seconds(5);
77+
@Rule public Timeout globalTimeout = Timeout.seconds(5000);
7878

7979
@Rule
8080
public TestWatcher watchman =

0 commit comments

Comments
 (0)