Skip to content

Commit 60a7fdb

Browse files
authored
2. Wire sticky poller and cache (#202)
1 parent 07c389c commit 60a7fdb

21 files changed

+859
-296
lines changed

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -553,12 +553,34 @@ public static WorkflowExecutionInfo describeWorkflowInstance(
553553
return instanceMetadata;
554554
}
555555

556+
public static GetWorkflowExecutionHistoryResponse getHistoryPage(
557+
byte[] nextPageToken,
558+
IWorkflowService service,
559+
String domain,
560+
WorkflowExecution workflowExecution) {
561+
562+
GetWorkflowExecutionHistoryRequest getHistoryRequest = new GetWorkflowExecutionHistoryRequest();
563+
getHistoryRequest.setDomain(domain);
564+
getHistoryRequest.setExecution(workflowExecution);
565+
getHistoryRequest.setNextPageToken(nextPageToken);
566+
567+
GetWorkflowExecutionHistoryResponse history;
568+
try {
569+
history = service.GetWorkflowExecutionHistory(getHistoryRequest);
570+
} catch (TException e) {
571+
throw new Error(e);
572+
}
573+
if (history == null) {
574+
throw new IllegalArgumentException("unknown workflow execution: " + workflowExecution);
575+
}
576+
return history;
577+
}
578+
556579
/** Returns workflow instance history in a human readable format. */
557580
public static String prettyPrintHistory(
558581
IWorkflowService service, String domain, WorkflowExecution workflowExecution) {
559582
return prettyPrintHistory(service, domain, workflowExecution, true);
560583
}
561-
562584
/**
563585
* Returns workflow instance history in a human readable format.
564586
*
@@ -607,29 +629,6 @@ private void getNextPage() {
607629
};
608630
}
609631

610-
public static GetWorkflowExecutionHistoryResponse getHistoryPage(
611-
byte[] nextPageToken,
612-
IWorkflowService service,
613-
String domain,
614-
WorkflowExecution workflowExecution) {
615-
616-
GetWorkflowExecutionHistoryRequest getHistoryRequest = new GetWorkflowExecutionHistoryRequest();
617-
getHistoryRequest.setDomain(domain);
618-
getHistoryRequest.setExecution(workflowExecution);
619-
getHistoryRequest.setNextPageToken(nextPageToken);
620-
621-
GetWorkflowExecutionHistoryResponse history;
622-
try {
623-
history = service.GetWorkflowExecutionHistory(getHistoryRequest);
624-
} catch (TException e) {
625-
throw new Error(e);
626-
}
627-
if (history == null) {
628-
throw new IllegalArgumentException("unknown workflow execution: " + workflowExecution);
629-
}
630-
return history;
631-
}
632-
633632
/**
634633
* Returns workflow instance history in a human readable format.
635634
*

src/main/java/com/uber/cadence/internal/replay/Decider.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,15 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import com.uber.cadence.PollForDecisionTaskResponse;
2021
import com.uber.cadence.WorkflowQuery;
21-
import com.uber.cadence.internal.worker.DecisionTaskWithHistoryIterator;
2222

2323
public interface Decider {
2424

2525
// TODO: refactor in future CR. Merge methods and decide should return a list of decisions.
26-
void decide(DecisionTaskWithHistoryIterator iterator) throws Throwable;
26+
void decide(PollForDecisionTaskResponse decisionTask) throws Throwable;
2727

28-
byte[] query(DecisionTaskWithHistoryIterator decisionTaskIterator, WorkflowQuery query)
29-
throws Throwable;
28+
byte[] query(PollForDecisionTaskResponse decisionTask, WorkflowQuery query) throws Throwable;
3029

3130
void close();
3231
}

src/main/java/com/uber/cadence/internal/replay/DeciderCache.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ public long size() {
9898
}
9999

100100
private boolean isFullHistory(PollForDecisionTaskResponse decisionTask) {
101-
return decisionTask.history.events.get(0).getEventId() == 1;
101+
return decisionTask.getHistory() != null
102+
&& decisionTask.getHistory().getEventsSize() > 0
103+
&& decisionTask.history.events.get(0).getEventId() == 1;
102104
}
103105

104106
public void invalidateAll() {

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 104 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,35 @@
1818
package com.uber.cadence.internal.replay;
1919

2020
import com.uber.cadence.EventType;
21+
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
22+
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
23+
import com.uber.cadence.History;
2124
import com.uber.cadence.HistoryEvent;
2225
import com.uber.cadence.PollForDecisionTaskResponse;
2326
import com.uber.cadence.TimerFiredEventAttributes;
2427
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
2528
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
2629
import com.uber.cadence.WorkflowQuery;
30+
import com.uber.cadence.common.RetryOptions;
2731
import com.uber.cadence.internal.common.OptionsUtils;
32+
import com.uber.cadence.internal.common.Retryer;
2833
import com.uber.cadence.internal.metrics.MetricsType;
2934
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
3035
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEventsIterator;
3136
import com.uber.cadence.internal.worker.DecisionTaskWithHistoryIterator;
3237
import com.uber.cadence.internal.worker.WorkflowExecutionException;
38+
import com.uber.cadence.serviceclient.IWorkflowService;
3339
import com.uber.cadence.workflow.Functions;
3440
import com.uber.m3.tally.Scope;
41+
import com.uber.m3.tally.Stopwatch;
3542
import java.time.Duration;
43+
import java.util.Iterator;
44+
import java.util.Objects;
3645
import java.util.concurrent.CancellationException;
3746
import java.util.concurrent.TimeUnit;
3847
import java.util.concurrent.atomic.AtomicReference;
3948
import java.util.function.Consumer;
49+
import org.apache.thrift.TException;
4050
import org.slf4j.Logger;
4151
import org.slf4j.LoggerFactory;
4252

@@ -48,12 +58,13 @@ class ReplayDecider implements Decider {
4858

4959
private static final Logger log = LoggerFactory.getLogger(ReplayDecider.class);
5060

51-
private static final int MILLION = 1000000;
61+
private static final int MAXIMUM_PAGE_SIZE = 10000;
5262

5363
private final DecisionsHelper decisionsHelper;
5464

5565
private final DecisionContextImpl context;
5666

67+
private IWorkflowService service;
5768
private ReplayWorkflow workflow;
5869

5970
private boolean cancelRequested;
@@ -70,18 +81,23 @@ class ReplayDecider implements Decider {
7081

7182
private long wfStartTime = -1;
7283

84+
private final WorkflowExecutionStartedEventAttributes startedEvent;
85+
7386
ReplayDecider(
87+
IWorkflowService service,
7488
String domain,
7589
ReplayWorkflow workflow,
7690
DecisionsHelper decisionsHelper,
7791
Scope metricsScope,
7892
boolean enableLoggingInReplay) {
93+
this.service = service;
7994
this.workflow = workflow;
8095
this.decisionsHelper = decisionsHelper;
8196
this.metricsScope = metricsScope;
8297
PollForDecisionTaskResponse decisionTask = decisionsHelper.getTask();
83-
WorkflowExecutionStartedEventAttributes startedEvent =
84-
decisionTask.getHistory().events.get(0).getWorkflowExecutionStartedEventAttributes();
98+
99+
startedEvent =
100+
decisionTask.getHistory().getEvents().get(0).getWorkflowExecutionStartedEventAttributes();
85101
if (startedEvent == null) {
86102
throw new IllegalArgumentException(
87103
"First event in the history is not WorkflowExecutionStarted");
@@ -349,15 +365,16 @@ private void handleDecisionTaskCompleted(HistoryEvent event) {
349365
}
350366

351367
@Override
352-
public void decide(DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator)
353-
throws Throwable {
354-
decideImpl(decisionTaskWithHistoryIterator, null);
368+
public void decide(PollForDecisionTaskResponse decisionTask) throws Throwable {
369+
decideImpl(decisionTask, null);
355370
}
356371

357-
private void decideImpl(
358-
DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator, Functions.Proc query)
372+
private void decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc query)
359373
throws Throwable {
360374
try {
375+
DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator =
376+
new DecisionTaskWithHistoryIteratorImpl(
377+
decisionTask, Duration.ofSeconds(startedEvent.getTaskStartToCloseTimeoutSeconds()));
361378
HistoryHelper historyHelper = new HistoryHelper(decisionTaskWithHistoryIterator);
362379
DecisionEventsIterator iterator = historyHelper.getIterator();
363380
if ((decisionsHelper.getNextDecisionEventId()
@@ -419,10 +436,86 @@ DecisionsHelper getDecisionsHelper() {
419436
}
420437

421438
@Override
422-
public byte[] query(DecisionTaskWithHistoryIterator decisionTaskIterator, WorkflowQuery query)
423-
throws Throwable {
439+
public byte[] query(PollForDecisionTaskResponse response, WorkflowQuery query) throws Throwable {
424440
AtomicReference<byte[]> result = new AtomicReference<>();
425-
decideImpl(decisionTaskIterator, () -> result.set(workflow.query(query)));
441+
decideImpl(response, () -> result.set(workflow.query(query)));
426442
return result.get();
427443
}
444+
445+
private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHistoryIterator {
446+
447+
private final Duration retryServiceOperationInitialInterval = Duration.ofMillis(200);
448+
private final Duration retryServiceOperationMaxInterval = Duration.ofSeconds(4);
449+
private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis());
450+
private Duration decisionTaskStartToCloseTimeout;
451+
452+
private final Duration retryServiceOperationExpirationInterval() {
453+
Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart);
454+
return decisionTaskStartToCloseTimeout.minus(passed);
455+
}
456+
457+
private final PollForDecisionTaskResponse task;
458+
private Iterator<HistoryEvent> current;
459+
private byte[] nextPageToken;
460+
461+
DecisionTaskWithHistoryIteratorImpl(
462+
PollForDecisionTaskResponse task, Duration decisionTaskStartToCloseTimeout) {
463+
this.task = Objects.requireNonNull(task);
464+
this.decisionTaskStartToCloseTimeout =
465+
Objects.requireNonNull(decisionTaskStartToCloseTimeout);
466+
467+
History history = task.getHistory();
468+
current = history.getEventsIterator();
469+
nextPageToken = task.getNextPageToken();
470+
}
471+
472+
@Override
473+
public PollForDecisionTaskResponse getDecisionTask() {
474+
return task;
475+
}
476+
477+
@Override
478+
public Iterator<HistoryEvent> getHistory() {
479+
return new Iterator<HistoryEvent>() {
480+
@Override
481+
public boolean hasNext() {
482+
return current.hasNext() || nextPageToken != null;
483+
}
484+
485+
@Override
486+
public HistoryEvent next() {
487+
if (current.hasNext()) {
488+
return current.next();
489+
}
490+
491+
metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_COUNTER).inc(1);
492+
Stopwatch sw = metricsScope.timer(MetricsType.WORKFLOW_GET_HISTORY_LATENCY).start();
493+
RetryOptions retryOptions =
494+
new RetryOptions.Builder()
495+
.setExpiration(retryServiceOperationExpirationInterval())
496+
.setInitialInterval(retryServiceOperationInitialInterval)
497+
.setMaximumInterval(retryServiceOperationMaxInterval)
498+
.build();
499+
500+
GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest();
501+
request.setDomain(context.getDomain());
502+
request.setExecution(task.getWorkflowExecution());
503+
request.setMaximumPageSize(MAXIMUM_PAGE_SIZE);
504+
try {
505+
GetWorkflowExecutionHistoryResponse r =
506+
Retryer.retryWithResult(
507+
retryOptions, () -> service.GetWorkflowExecutionHistory(request));
508+
current = r.getHistory().getEventsIterator();
509+
nextPageToken = r.getNextPageToken();
510+
metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_SUCCEED_COUNTER).inc(1);
511+
sw.stop();
512+
} catch (TException e) {
513+
metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_FAILED_COUNTER).inc(1);
514+
throw new Error(e);
515+
}
516+
return current.next();
517+
}
518+
};
519+
}
520+
}
428521
}

0 commit comments

Comments
 (0)