Skip to content

Commit 06cd803

Browse files
Backport to branch(3.16) : Add configuration option to enable/disable active transaction management (#3256)
Co-authored-by: Toshihiro Suzuki <brfrn169@gmail.com>
1 parent b19649d commit 06cd803

15 files changed

+400
-227
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.scalar.db.api;
2+
3+
import com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager;
4+
import com.scalar.db.common.ActiveTransactionManagedTwoPhaseCommitTransactionManager;
5+
import com.scalar.db.common.StateManagedDistributedTransactionManager;
6+
import com.scalar.db.common.StateManagedTwoPhaseCommitTransactionManager;
7+
import com.scalar.db.config.DatabaseConfig;
8+
import javax.annotation.Nullable;
9+
10+
public abstract class AbstractDistributedTransactionProvider
11+
implements DistributedTransactionProvider {
12+
13+
@Override
14+
public DistributedTransactionManager createDistributedTransactionManager(DatabaseConfig config) {
15+
DistributedTransactionManager transactionManager =
16+
createRawDistributedTransactionManager(config);
17+
18+
// Wrap the transaction manager for state management
19+
transactionManager = new StateManagedDistributedTransactionManager(transactionManager);
20+
21+
if (config.isActiveTransactionManagementEnabled()) {
22+
// Wrap the transaction manager for active transaction management
23+
transactionManager =
24+
new ActiveTransactionManagedDistributedTransactionManager(
25+
transactionManager, config.getActiveTransactionManagementExpirationTimeMillis());
26+
}
27+
28+
return transactionManager;
29+
}
30+
31+
protected abstract DistributedTransactionManager createRawDistributedTransactionManager(
32+
DatabaseConfig config);
33+
34+
@Nullable
35+
@Override
36+
public TwoPhaseCommitTransactionManager createTwoPhaseCommitTransactionManager(
37+
DatabaseConfig config) {
38+
TwoPhaseCommitTransactionManager transactionManager =
39+
createRawTwoPhaseCommitTransactionManager(config);
40+
41+
if (transactionManager == null) {
42+
return null;
43+
}
44+
45+
// Wrap the transaction manager for state management
46+
transactionManager = new StateManagedTwoPhaseCommitTransactionManager(transactionManager);
47+
48+
if (config.isActiveTransactionManagementEnabled()) {
49+
// Wrap the transaction manager for active transaction management
50+
transactionManager =
51+
new ActiveTransactionManagedTwoPhaseCommitTransactionManager(
52+
transactionManager, config.getActiveTransactionManagementExpirationTimeMillis());
53+
}
54+
55+
return transactionManager;
56+
}
57+
58+
protected abstract TwoPhaseCommitTransactionManager createRawTwoPhaseCommitTransactionManager(
59+
DatabaseConfig config);
60+
}

core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java

Lines changed: 27 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,9 @@
1919
import com.scalar.db.exception.transaction.TransactionException;
2020
import com.scalar.db.exception.transaction.TransactionNotFoundException;
2121
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
22-
import com.scalar.db.util.ActiveExpiringMap;
2322
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24-
import java.util.Iterator;
2523
import java.util.List;
2624
import java.util.Optional;
27-
import java.util.concurrent.atomic.AtomicReference;
28-
import java.util.function.BiConsumer;
29-
import javax.annotation.Nonnull;
3025
import javax.annotation.concurrent.ThreadSafe;
3126
import org.slf4j.Logger;
3227
import org.slf4j.LoggerFactory;
@@ -35,53 +30,24 @@
3530
public class ActiveTransactionManagedDistributedTransactionManager
3631
extends DecoratedDistributedTransactionManager {
3732

38-
private static final long TRANSACTION_EXPIRATION_INTERVAL_MILLIS = 1000;
39-
4033
private static final Logger logger =
4134
LoggerFactory.getLogger(ActiveTransactionManagedDistributedTransactionManager.class);
4235

43-
private final ActiveExpiringMap<String, ActiveTransaction> activeTransactions;
44-
45-
private final AtomicReference<BiConsumer<String, DistributedTransaction>>
46-
transactionExpirationHandler =
47-
new AtomicReference<>(
48-
(id, t) -> {
49-
try {
50-
t.rollback();
51-
} catch (Exception e) {
52-
logger.warn("Rollback failed. Transaction ID: {}", id, e);
53-
}
54-
});
36+
private final ActiveTransactionRegistry<DistributedTransaction> registry;
5537

5638
public ActiveTransactionManagedDistributedTransactionManager(
57-
DistributedTransactionManager transactionManager,
58-
long activeTransactionManagementExpirationTimeMillis) {
39+
DistributedTransactionManager transactionManager, long expirationTimeMillis) {
5940
super(transactionManager);
60-
activeTransactions =
61-
new ActiveExpiringMap<>(
62-
activeTransactionManagementExpirationTimeMillis,
63-
TRANSACTION_EXPIRATION_INTERVAL_MILLIS,
64-
(id, t) -> {
65-
logger.warn("The transaction is expired. Transaction ID: {}", id);
66-
transactionExpirationHandler.get().accept(id, t);
67-
});
68-
}
69-
70-
@Override
71-
public void setTransactionExpirationHandler(BiConsumer<String, DistributedTransaction> handler) {
72-
transactionExpirationHandler.set(handler);
41+
registry =
42+
new ActiveTransactionRegistry<>(expirationTimeMillis, DistributedTransaction::rollback);
7343
}
7444

75-
private void add(ActiveTransaction transaction) throws TransactionException {
76-
if (activeTransactions.putIfAbsent(transaction.getId(), transaction).isPresent()) {
77-
transaction.rollback();
78-
throw new TransactionException(
79-
CoreError.TRANSACTION_ALREADY_EXISTS.buildMessage(), transaction.getId());
80-
}
81-
}
82-
83-
private void remove(String transactionId) {
84-
activeTransactions.remove(transactionId);
45+
public ActiveTransactionManagedDistributedTransactionManager(
46+
DistributedTransactionManager transactionManager,
47+
long expirationTimeMillis,
48+
ActiveTransactionRegistry.TransactionRollback<DistributedTransaction> rollbackFunction) {
49+
super(transactionManager);
50+
registry = new ActiveTransactionRegistry<>(expirationTimeMillis, rollbackFunction);
8551
}
8652

8753
@Override
@@ -97,7 +63,7 @@ public DistributedTransaction join(String txId) throws TransactionNotFoundExcept
9763

9864
@Override
9965
public DistributedTransaction resume(String txId) throws TransactionNotFoundException {
100-
return activeTransactions
66+
return registry
10167
.get(txId)
10268
.orElseThrow(
10369
() ->
@@ -116,7 +82,18 @@ class ActiveTransaction extends DecoratedDistributedTransaction {
11682
@SuppressFBWarnings("EI_EXPOSE_REP2")
11783
private ActiveTransaction(DistributedTransaction transaction) throws TransactionException {
11884
super(transaction);
119-
add(this);
85+
if (!registry.add(getId(), this)) {
86+
try {
87+
transaction.rollback();
88+
} catch (RollbackException e) {
89+
logger.warn(
90+
"Rollback failed during duplicate transaction handling. Transaction ID: {}",
91+
getId(),
92+
e);
93+
}
94+
throw new TransactionException(
95+
CoreError.TRANSACTION_ALREADY_EXISTS.buildMessage(), getId());
96+
}
12097
}
12198

12299
@Override
@@ -131,37 +108,7 @@ public synchronized List<Result> scan(Scan scan) throws CrudException {
131108

132109
@Override
133110
public synchronized Scanner getScanner(Scan scan) throws CrudException {
134-
Scanner scanner = super.getScanner(scan);
135-
return new Scanner() {
136-
@Override
137-
public Optional<Result> one() throws CrudException {
138-
synchronized (ActiveTransaction.this) {
139-
return scanner.one();
140-
}
141-
}
142-
143-
@Override
144-
public List<Result> all() throws CrudException {
145-
synchronized (ActiveTransaction.this) {
146-
return scanner.all();
147-
}
148-
}
149-
150-
@Override
151-
public void close() throws CrudException {
152-
synchronized (ActiveTransaction.this) {
153-
scanner.close();
154-
}
155-
}
156-
157-
@Nonnull
158-
@Override
159-
public Iterator<Result> iterator() {
160-
synchronized (ActiveTransaction.this) {
161-
return scanner.iterator();
162-
}
163-
}
164-
};
111+
return new SynchronizedScanner(this, super.getScanner(scan));
165112
}
166113

167114
/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
@@ -213,15 +160,15 @@ public synchronized void mutate(List<? extends Mutation> mutations) throws CrudE
213160
@Override
214161
public synchronized void commit() throws CommitException, UnknownTransactionStatusException {
215162
super.commit();
216-
remove(getId());
163+
registry.remove(getId());
217164
}
218165

219166
@Override
220167
public synchronized void rollback() throws RollbackException {
221168
try {
222169
super.rollback();
223170
} finally {
224-
remove(getId());
171+
registry.remove(getId());
225172
}
226173
}
227174

@@ -230,7 +177,7 @@ public synchronized void abort() throws AbortException {
230177
try {
231178
super.abort();
232179
} finally {
233-
remove(getId());
180+
registry.remove(getId());
234181
}
235182
}
236183
}

0 commit comments

Comments
 (0)