Skip to content

Commit 07c389c

Browse files
authored
add dispatcher and replay decider cache (#197)
* add dispatcher and replay decider cache
1 parent eed6ca3 commit 07c389c

15 files changed

+1075
-77
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dependencies {
4949
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
5050
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
5151
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'
52-
compile group: 'com.uber.m3', name: 'tally-core', version: '0.2.1'
52+
compile group: 'com.uber.m3', name: 'tally-core', version: '0.2.3'
5353
compile group: 'com.google.guava', name: 'guava', version: '25.1-jre'
5454
testCompile group: 'junit', name: 'junit', version: '4.12'
5555
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.common;
19+
20+
public interface ThrowableFunc1<T, R, E extends Throwable> {
21+
R apply(T t) throws E;
22+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.replay;
19+
20+
import com.uber.cadence.WorkflowQuery;
21+
import com.uber.cadence.internal.worker.DecisionTaskWithHistoryIterator;
22+
23+
public interface Decider {
24+
25+
// TODO: refactor in future CR. Merge methods and decide should return a list of decisions.
26+
void decide(DecisionTaskWithHistoryIterator iterator) throws Throwable;
27+
28+
byte[] query(DecisionTaskWithHistoryIterator decisionTaskIterator, WorkflowQuery query)
29+
throws Throwable;
30+
31+
void close();
32+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.replay;
19+
20+
import com.google.common.base.Preconditions;
21+
import com.google.common.cache.CacheBuilder;
22+
import com.google.common.cache.CacheLoader;
23+
import com.google.common.cache.LoadingCache;
24+
import com.google.common.cache.Weigher;
25+
import com.uber.cadence.PollForDecisionTaskResponse;
26+
import com.uber.cadence.internal.common.ThrowableFunc1;
27+
import java.util.UUID;
28+
29+
public final class DeciderCache {
30+
private final String evictionEntryId = UUID.randomUUID().toString();
31+
private final int maxCacheSize;
32+
private LoadingCache<String, WeightedCacheEntry<Decider>> cache;
33+
34+
public DeciderCache(int maxCacheSize) {
35+
Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0");
36+
this.maxCacheSize = maxCacheSize;
37+
this.cache =
38+
CacheBuilder.newBuilder()
39+
.maximumWeight(maxCacheSize)
40+
.weigher(
41+
(Weigher<String, WeightedCacheEntry<Decider>>) (key, value) -> value.getWeight())
42+
.removalListener(
43+
e -> {
44+
Decider entry = e.getValue().entry;
45+
if (entry != null) {
46+
entry.close();
47+
}
48+
})
49+
.build(
50+
new CacheLoader<String, WeightedCacheEntry<Decider>>() {
51+
@Override
52+
public WeightedCacheEntry<Decider> load(String key) {
53+
return null;
54+
}
55+
});
56+
}
57+
58+
public Decider getOrCreate(
59+
PollForDecisionTaskResponse decisionTask,
60+
ThrowableFunc1<PollForDecisionTaskResponse, Decider, Exception> createReplayDecider)
61+
throws Exception {
62+
String runId = decisionTask.getWorkflowExecution().getRunId();
63+
if (isFullHistory(decisionTask)) {
64+
cache.invalidate(runId);
65+
return cache.get(
66+
runId, () -> new WeightedCacheEntry<>(createReplayDecider.apply(decisionTask), 1))
67+
.entry;
68+
}
69+
return getUnchecked(runId);
70+
}
71+
72+
public Decider getUnchecked(String runId) throws Exception {
73+
try {
74+
return cache.getUnchecked(runId).entry;
75+
} catch (CacheLoader.InvalidCacheLoadException e) {
76+
throw new EvictedException(runId);
77+
}
78+
}
79+
80+
public void evictNext() {
81+
int remainingSpace = (int) (maxCacheSize - cache.size());
82+
// Force eviction to happen
83+
cache.put(evictionEntryId, new WeightedCacheEntry<>(null, remainingSpace + 1));
84+
cache.invalidate(evictionEntryId);
85+
}
86+
87+
public void invalidate(PollForDecisionTaskResponse decisionTask) {
88+
String runId = decisionTask.getWorkflowExecution().getRunId();
89+
invalidate(runId);
90+
}
91+
92+
public void invalidate(String runId) {
93+
cache.invalidate(runId);
94+
}
95+
96+
public long size() {
97+
return cache.size();
98+
}
99+
100+
private boolean isFullHistory(PollForDecisionTaskResponse decisionTask) {
101+
return decisionTask.history.events.get(0).getEventId() == 1;
102+
}
103+
104+
public void invalidateAll() {
105+
cache.invalidateAll();
106+
}
107+
108+
// Used for eviction
109+
private static class WeightedCacheEntry<T> {
110+
private T entry;
111+
private int weight;
112+
113+
private WeightedCacheEntry(T entry, int weight) {
114+
this.entry = entry;
115+
this.weight = weight;
116+
}
117+
118+
public T getEntry() {
119+
return entry;
120+
}
121+
122+
public int getWeight() {
123+
return weight;
124+
}
125+
}
126+
127+
public static class EvictedException extends Exception {
128+
129+
public EvictedException(String runId) {
130+
super(String.format("cache was evicted for the decisionTask. RunId: %s", runId));
131+
}
132+
}
133+
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.uber.cadence.internal.metrics.MetricsType;
2929
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
3030
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEventsIterator;
31+
import com.uber.cadence.internal.worker.DecisionTaskWithHistoryIterator;
3132
import com.uber.cadence.internal.worker.WorkflowExecutionException;
3233
import com.uber.cadence.workflow.Functions;
3334
import com.uber.m3.tally.Scope;
@@ -43,7 +44,7 @@
4344
* Implements decider that relies on replay of a worklfow code. An instance of this class is created
4445
* per decision.
4546
*/
46-
class ReplayDecider {
47+
class ReplayDecider implements Decider {
4748

4849
private static final Logger log = LoggerFactory.getLogger(ReplayDecider.class);
4950

@@ -92,10 +93,6 @@ class ReplayDecider {
9293
context.setMetricsScope(metricsScope);
9394
}
9495

95-
public boolean isCancelRequested() {
96-
return cancelRequested;
97-
}
98-
9996
private void handleWorkflowExecutionStarted(HistoryEvent event) throws Exception {
10097
workflow.start(event, context);
10198
}
@@ -351,15 +348,33 @@ private void handleDecisionTaskCompleted(HistoryEvent event) {
351348
decisionsHelper.handleDecisionCompletion(event.getDecisionTaskCompletedEventAttributes());
352349
}
353350

354-
void decide(HistoryHelper historyHelper) throws Throwable {
355-
decideImpl(historyHelper, null);
351+
@Override
352+
public void decide(DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator)
353+
throws Throwable {
354+
decideImpl(decisionTaskWithHistoryIterator, null);
356355
}
357356

358-
private void decideImpl(HistoryHelper historyHelper, Functions.Proc query) throws Throwable {
357+
private void decideImpl(
358+
DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator, Functions.Proc query)
359+
throws Throwable {
359360
try {
361+
HistoryHelper historyHelper = new HistoryHelper(decisionTaskWithHistoryIterator);
360362
DecisionEventsIterator iterator = historyHelper.getIterator();
363+
if ((decisionsHelper.getNextDecisionEventId()
364+
!= historyHelper.getPreviousStartedEventId()
365+
+ 2) // getNextDecisionEventId() skips over completed.
366+
&& (decisionsHelper.getNextDecisionEventId() != 0
367+
&& historyHelper.getPreviousStartedEventId() != 0)) {
368+
throw new IllegalStateException(
369+
String.format(
370+
"ReplayDecider expects next event id at %d. History's previous started event id is %d",
371+
decisionsHelper.getNextDecisionEventId(),
372+
historyHelper.getPreviousStartedEventId()));
373+
}
374+
361375
while (iterator.hasNext()) {
362376
DecisionEvents decision = iterator.next();
377+
363378
context.setReplaying(decision.isReplay());
364379
context.setReplayCurrentTimeMilliseconds(decision.getReplayCurrentTimeMilliseconds());
365380

@@ -388,17 +403,26 @@ private void decideImpl(HistoryHelper historyHelper, Functions.Proc query) throw
388403
if (query != null) {
389404
query.apply();
390405
}
391-
workflow.close();
406+
if (completed) {
407+
close();
408+
}
392409
}
393410
}
394411

412+
@Override
413+
public void close() {
414+
workflow.close();
415+
}
416+
395417
DecisionsHelper getDecisionsHelper() {
396418
return decisionsHelper;
397419
}
398420

399-
public byte[] query(HistoryHelper historyHelper, WorkflowQuery query) throws Throwable {
421+
@Override
422+
public byte[] query(DecisionTaskWithHistoryIterator decisionTaskIterator, WorkflowQuery query)
423+
throws Throwable {
400424
AtomicReference<byte[]> result = new AtomicReference<>();
401-
decideImpl(historyHelper, () -> result.set(workflow.query(query)));
425+
decideImpl(decisionTaskIterator, () -> result.set(workflow.query(query)));
402426
return result.get();
403427
}
404428
}

0 commit comments

Comments
 (0)