Skip to content

Commit fb6083c

Browse files
authored
Refactor replay decider - Remove dependency on requiring full history for decider.Decide() (#191)
1 parent 1f8cdcf commit fb6083c

File tree

6 files changed

+52
-77
lines changed

6 files changed

+52
-77
lines changed

src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.uber.cadence.EventType;
2222
import com.uber.cadence.HistoryEvent;
2323
import com.uber.cadence.PollForDecisionTaskResponse;
24-
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
2524
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
2625
import com.uber.cadence.internal.worker.DecisionTaskWithHistoryIterator;
2726
import java.util.ArrayList;
@@ -259,10 +258,6 @@ public PollForDecisionTaskResponse getDecisionTask() {
259258
return decisionTaskWithHistoryIterator.getDecisionTask();
260259
}
261260

262-
public WorkflowExecutionStartedEventAttributes getWorkflowExecutionStartedEventAttributes() {
263-
return decisionTaskWithHistoryIterator.getStartedEvent();
264-
}
265-
266261
@Override
267262
public String toString() {
268263
return WorkflowExecutionUtils.prettyPrintHistory(

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.PollForDecisionTaskResponse;
2323
import com.uber.cadence.TimerFiredEventAttributes;
2424
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
25+
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
2526
import com.uber.cadence.WorkflowQuery;
2627
import com.uber.cadence.internal.common.OptionsUtils;
2728
import com.uber.cadence.internal.metrics.MetricsType;
@@ -48,8 +49,6 @@ class ReplayDecider {
4849

4950
private static final int MILLION = 1000000;
5051

51-
private final HistoryHelper historyHelper;
52-
5352
private final DecisionsHelper decisionsHelper;
5453

5554
private final DecisionContextImpl context;
@@ -73,22 +72,23 @@ class ReplayDecider {
7372
ReplayDecider(
7473
String domain,
7574
ReplayWorkflow workflow,
76-
HistoryHelper historyHelper,
7775
DecisionsHelper decisionsHelper,
7876
Scope metricsScope,
7977
boolean enableLoggingInReplay) {
8078
this.workflow = workflow;
81-
this.historyHelper = historyHelper;
8279
this.decisionsHelper = decisionsHelper;
8380
this.metricsScope = metricsScope;
84-
PollForDecisionTaskResponse decisionTask = historyHelper.getDecisionTask();
81+
PollForDecisionTaskResponse decisionTask = decisionsHelper.getTask();
82+
WorkflowExecutionStartedEventAttributes startedEvent =
83+
decisionTask.getHistory().events.get(0).getWorkflowExecutionStartedEventAttributes();
84+
if (startedEvent == null) {
85+
throw new IllegalArgumentException(
86+
"First event in the history is not WorkflowExecutionStarted");
87+
}
88+
8589
context =
8690
new DecisionContextImpl(
87-
decisionsHelper,
88-
domain,
89-
decisionTask,
90-
historyHelper.getWorkflowExecutionStartedEventAttributes(),
91-
enableLoggingInReplay);
91+
decisionsHelper, domain, decisionTask, startedEvent, enableLoggingInReplay);
9292
context.setMetricsScope(metricsScope);
9393
}
9494

@@ -351,11 +351,11 @@ private void handleDecisionTaskCompleted(HistoryEvent event) {
351351
decisionsHelper.handleDecisionCompletion(event.getDecisionTaskCompletedEventAttributes());
352352
}
353353

354-
void decide() throws Throwable {
355-
decideImpl(null);
354+
void decide(HistoryHelper historyHelper) throws Throwable {
355+
decideImpl(historyHelper, null);
356356
}
357357

358-
private void decideImpl(Functions.Proc query) throws Throwable {
358+
private void decideImpl(HistoryHelper historyHelper, Functions.Proc query) throws Throwable {
359359
try {
360360
DecisionEventsIterator iterator = historyHelper.getIterator();
361361
while (iterator.hasNext()) {
@@ -396,9 +396,9 @@ DecisionsHelper getDecisionsHelper() {
396396
return decisionsHelper;
397397
}
398398

399-
public byte[] query(WorkflowQuery query) throws Throwable {
399+
public byte[] query(HistoryHelper historyHelper, WorkflowQuery query) throws Throwable {
400400
AtomicReference<byte[]> result = new AtomicReference<>();
401-
decideImpl(() -> result.set(workflow.query(query)));
401+
decideImpl(historyHelper, () -> result.set(workflow.query(query)));
402402
return result.get();
403403
}
404404
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,15 @@ public DecisionTaskHandler.Result handleDecisionTask(
9797
private Result handleDecisionTaskImpl(DecisionTaskWithHistoryIterator decisionTaskIterator)
9898
throws Throwable {
9999
HistoryHelper historyHelper = new HistoryHelper(decisionTaskIterator);
100-
ReplayDecider decider = createDecider(historyHelper);
101100
PollForDecisionTaskResponse decisionTask = historyHelper.getDecisionTask();
101+
ReplayDecider decider = createDecider(decisionTask);
102+
102103
if (decisionTask.isSetQuery()) {
103104
RespondQueryTaskCompletedRequest queryCompletedRequest =
104105
new RespondQueryTaskCompletedRequest();
105106
queryCompletedRequest.setTaskToken(decisionTask.getTaskToken());
106107
try {
107-
byte[] queryResult = decider.query(decisionTask.getQuery());
108+
byte[] queryResult = decider.query(historyHelper, decisionTask.getQuery());
108109
queryCompletedRequest.setQueryResult(queryResult);
109110
queryCompletedRequest.setCompletedType(QueryTaskCompletedType.COMPLETED);
110111
} catch (Throwable e) {
@@ -117,7 +118,7 @@ private Result handleDecisionTaskImpl(DecisionTaskWithHistoryIterator decisionTa
117118
}
118119
return new DecisionTaskHandler.Result(null, null, queryCompletedRequest, null);
119120
} else {
120-
decider.decide();
121+
decider.decide(historyHelper);
121122
DecisionsHelper decisionsHelper = decider.getDecisionsHelper();
122123
List<Decision> decisions = decisionsHelper.getDecisions();
123124
byte[] context = decisionsHelper.getWorkflowContextDataToReturn();
@@ -159,12 +160,11 @@ public boolean isAnyTypeSupported() {
159160
return workflowFactory.isAnyTypeSupported();
160161
}
161162

162-
private ReplayDecider createDecider(HistoryHelper historyHelper) throws Exception {
163-
PollForDecisionTaskResponse decisionTask = historyHelper.getDecisionTask();
163+
private ReplayDecider createDecider(PollForDecisionTaskResponse decisionTask) throws Exception {
164164
WorkflowType workflowType = decisionTask.getWorkflowType();
165165
DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask);
166166
ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType);
167167
return new ReplayDecider(
168-
domain, workflow, historyHelper, decisionsHelper, metricsScope, enableLoggingInReplay);
168+
domain, workflow, decisionsHelper, metricsScope, enableLoggingInReplay);
169169
}
170170
}

src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20-
import com.uber.cadence.HistoryEvent;
2120
import com.uber.cadence.PollForDecisionTaskResponse;
2221
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
2322

@@ -115,7 +114,6 @@ String getDomain() {
115114
}
116115

117116
private WorkflowExecutionStartedEventAttributes getWorkflowStartedEventAttributes() {
118-
HistoryEvent firstHistoryEvent = decisionTask.getHistory().getEvents().get(0);
119-
return firstHistoryEvent.getWorkflowExecutionStartedEventAttributes();
117+
return startedAttributes;
120118
}
121119
}

src/main/java/com/uber/cadence/internal/worker/DecisionTaskWithHistoryIterator.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import com.uber.cadence.HistoryEvent;
2121
import com.uber.cadence.PollForDecisionTaskResponse;
22-
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
2322
import java.util.Iterator;
2423

2524
/** Contains DecisionTask and history iterator that paginates history behind the scene. */
@@ -28,6 +27,4 @@ public interface DecisionTaskWithHistoryIterator {
2827
PollForDecisionTaskResponse getDecisionTask();
2928

3029
Iterator<HistoryEvent> getHistory();
31-
32-
WorkflowExecutionStartedEventAttributes getStartedEvent();
3330
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 30 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,7 @@
1717

1818
package com.uber.cadence.internal.worker;
1919

20-
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
21-
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
22-
import com.uber.cadence.History;
23-
import com.uber.cadence.HistoryEvent;
24-
import com.uber.cadence.PollForDecisionTaskResponse;
25-
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
26-
import com.uber.cadence.RespondDecisionTaskFailedRequest;
27-
import com.uber.cadence.RespondQueryTaskCompletedRequest;
28-
import com.uber.cadence.WorkflowExecution;
29-
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
30-
import com.uber.cadence.WorkflowQuery;
20+
import com.uber.cadence.*;
3121
import com.uber.cadence.common.RetryOptions;
3222
import com.uber.cadence.internal.common.Retryer;
3323
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
@@ -250,23 +240,39 @@ private void sendReply(
250240

251241
private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHistoryIterator {
252242

253-
private long start = System.currentTimeMillis();
243+
private final Duration retryServiceOperationInitialInterval = Duration.ofMillis(200);
244+
private final Duration retryServiceOperationMaxInterval = Duration.ofSeconds(4);
245+
private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis());
246+
private Duration decisionTaskStartToCloseTimeout;
247+
248+
private final Duration retryServiceOperationExpirationInterval() {
249+
Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart);
250+
return decisionTaskStartToCloseTimeout.minus(passed);
251+
}
252+
254253
private final PollForDecisionTaskResponse task;
255254
private Iterator<HistoryEvent> current;
256255
private byte[] nextPageToken;
257-
private WorkflowExecutionStartedEventAttributes startedEvent;
258256

259257
DecisionTaskWithHistoryIteratorImpl(PollForDecisionTaskResponse task) {
260258
this.task = task;
261259
History history = task.getHistory();
262-
HistoryEvent firstEvent = history.getEvents().get(0);
263-
this.startedEvent = firstEvent.getWorkflowExecutionStartedEventAttributes();
264-
if (this.startedEvent == null) {
265-
throw new IllegalArgumentException(
266-
"First event in the history is not WorkflowExecutionStarted");
267-
}
268260
current = history.getEventsIterator();
269261
nextPageToken = task.getNextPageToken();
262+
263+
for (int i = history.events.size() - 1; i >= 0; i--) {
264+
DecisionTaskScheduledEventAttributes attributes =
265+
history.events.get(i).getDecisionTaskScheduledEventAttributes();
266+
if (attributes != null) {
267+
decisionTaskStartToCloseTimeout =
268+
Duration.ofSeconds(attributes.getStartToCloseTimeoutSeconds());
269+
break;
270+
}
271+
}
272+
273+
if(decisionTaskStartToCloseTimeout == null){
274+
throw new IllegalArgumentException(String.format("PollForDecisionTaskResponse is missing DecisionTaskScheduled event. RunId: %s, WorkflowId: %s", task.getWorkflowExecution().runId, task.getWorkflowExecution().workflowId));
275+
}
270276
}
271277

272278
@Override
@@ -287,21 +293,15 @@ public HistoryEvent next() {
287293
if (current.hasNext()) {
288294
return current.next();
289295
}
290-
Duration passed = Duration.ofMillis(System.currentTimeMillis() - start);
291-
Duration timeout = Duration.ofSeconds(startedEvent.getTaskStartToCloseTimeoutSeconds());
292-
Duration expiration = timeout.minus(passed);
293-
if (expiration.isZero() || expiration.isNegative()) {
294-
throw new Error("History pagination time exceeded TaskStartToCloseTimeout");
295-
}
296296

297297
options.getMetricsScope().counter(MetricsType.WORKFLOW_GET_HISTORY_COUNTER).inc(1);
298298
Stopwatch sw =
299299
options.getMetricsScope().timer(MetricsType.WORKFLOW_GET_HISTORY_LATENCY).start();
300300
RetryOptions retryOptions =
301301
new RetryOptions.Builder()
302-
.setExpiration(expiration)
303-
.setInitialInterval(Duration.ofMillis(50))
304-
.setMaximumInterval(Duration.ofSeconds(1))
302+
.setExpiration(retryServiceOperationExpirationInterval())
303+
.setInitialInterval(retryServiceOperationInitialInterval)
304+
.setMaximumInterval(retryServiceOperationMaxInterval)
305305
.build();
306306

307307
GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest();
@@ -330,35 +330,25 @@ public HistoryEvent next() {
330330
}
331331
};
332332
}
333-
334-
@Override
335-
public WorkflowExecutionStartedEventAttributes getStartedEvent() {
336-
return startedEvent;
337-
}
338333
}
339334

340335
private static class ReplayDecisionTaskWithHistoryIterator
341336
implements DecisionTaskWithHistoryIterator {
342337

343338
private final Iterator<HistoryEvent> history;
344339
private final PollForDecisionTaskResponse task;
345-
private final WorkflowExecutionStartedEventAttributes startedEvent;
346340
private HistoryEvent first;
347341

348342
private ReplayDecisionTaskWithHistoryIterator(
349343
WorkflowExecution execution, Iterator<HistoryEvent> history) {
350344
this.history = history;
351345
first = history.next();
352-
this.startedEvent = first.getWorkflowExecutionStartedEventAttributes();
353-
if (startedEvent == null) {
354-
throw new IllegalArgumentException(
355-
"First history event is not WorkflowExecutionStarted, but: " + first.getEventType());
356-
}
346+
357347
task = new PollForDecisionTaskResponse();
358348
task.setWorkflowExecution(execution);
359349
task.setStartedEventId(Long.MAX_VALUE);
360350
task.setPreviousStartedEventId(Long.MAX_VALUE);
361-
task.setWorkflowType(startedEvent.getWorkflowType());
351+
task.setWorkflowType(task.getWorkflowType());
362352
}
363353

364354
@Override
@@ -385,10 +375,5 @@ public HistoryEvent next() {
385375
}
386376
};
387377
}
388-
389-
@Override
390-
public WorkflowExecutionStartedEventAttributes getStartedEvent() {
391-
return startedEvent;
392-
}
393378
}
394379
}

0 commit comments

Comments
 (0)