Skip to content

Commit bdb984d

Browse files
[BugFix] fix iceberg table cache bug (backport #65917) (backport #66000) (#66186)
Signed-off-by: SevenJ <wenjun7j@gmail.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: SevenJ <166966490+Wenjun7J@users.noreply.github.com>
1 parent 5027420 commit bdb984d

File tree

2 files changed

+81
-51
lines changed

2 files changed

+81
-51
lines changed

fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java

Lines changed: 44 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@
1717
import com.github.benmanes.caffeine.cache.Caffeine;
1818
import com.github.benmanes.caffeine.cache.RemovalCause;
1919
import com.github.benmanes.caffeine.cache.Weigher;
20-
import com.google.common.cache.Cache;
21-
import com.google.common.cache.CacheBuilder;
22-
import com.google.common.cache.CacheLoader;
23-
import com.google.common.cache.LoadingCache;
24-
import com.google.common.cache.RemovalNotification;
2520
import com.google.common.collect.Lists;
2621
import com.starrocks.catalog.Database;
2722
import com.starrocks.catalog.IcebergTable;
@@ -65,7 +60,6 @@
6560
import java.util.concurrent.ExecutorService;
6661
import java.util.stream.Collectors;
6762

68-
import static com.google.common.cache.CacheLoader.asyncReloading;
6963
import static java.util.concurrent.TimeUnit.SECONDS;
7064

7165
public class CachingIcebergCatalog implements IcebergCatalog {
@@ -76,8 +70,8 @@ public class CachingIcebergCatalog implements IcebergCatalog {
7670
private static final int MEMORY_FILE_SAMPLES = 100;
7771
private final String catalogName;
7872
private final IcebergCatalog delegate;
79-
private final LoadingCache<IcebergTableCacheKey, Table> tables;
80-
private final Cache<String, Database> databases;
73+
private final com.github.benmanes.caffeine.cache.LoadingCache<IcebergTableCacheKey, Table> tables;
74+
private final com.github.benmanes.caffeine.cache.Cache<String, Database> databases;
8175
private final ExecutorService backgroundExecutor;
8276

8377
private final IcebergCatalogProperties icebergProperties;
@@ -87,7 +81,7 @@ public class CachingIcebergCatalog implements IcebergCatalog {
8781
private final Map<IcebergTableName, Long> tableLatestAccessTime = new ConcurrentHashMap<>();
8882
private final Map<IcebergTableName, Long> tableLatestRefreshTime = new ConcurrentHashMap<>();
8983

90-
private final LoadingCache<IcebergTableName, Map<String, Partition>> partitionCache;
84+
private final com.github.benmanes.caffeine.cache.LoadingCache<IcebergTableName, Map<String, Partition>> partitionCache;
9185

9286
public CachingIcebergCatalog(String catalogName, IcebergCatalog delegate, IcebergCatalogProperties icebergProperties,
9387
ExecutorService executorService) {
@@ -98,30 +92,39 @@ public CachingIcebergCatalog(String catalogName, IcebergCatalog delegate, Iceber
9892
boolean enableTableCache = icebergProperties.isEnableIcebergTableCache();
9993
this.databases = newCacheBuilder(icebergProperties.getIcebergMetaCacheTtlSec(), NEVER_CACHE,
10094
enableCache ? DEFAULT_CACHE_NUM : NEVER_CACHE).build();
101-
this.tables = newCacheBuilder(icebergProperties.getIcebergMetaCacheTtlSec(),
95+
this.tables = newCacheBuilder(
96+
icebergProperties.getIcebergMetaCacheTtlSec(),
10297
icebergProperties.getIcebergTableCacheRefreshIntervalSec(),
10398
enableTableCache ? DEFAULT_CACHE_NUM : NEVER_CACHE)
104-
.removalListener((RemovalNotification<IcebergTableCacheKey, Table> n) -> {
105-
LOG.debug("iceberg table cache removal: {}.{}, cause={}, evicted={}",
106-
n.getKey().icebergTableName.dbName, n.getKey().icebergTableName.tableName,
107-
n.getCause(), n.wasEvicted());
99+
.executor(executorService)
100+
.removalListener((IcebergTableCacheKey key, Table value, RemovalCause cause) -> {
101+
if (key != null) {
102+
LOG.debug("iceberg table cache removal: {}.{}, cause={}, evicted={}",
103+
key.icebergTableName.dbName, key.icebergTableName.tableName,
104+
cause, cause.wasEvicted());
105+
}
108106
})
109-
.build(asyncReloading(CacheLoader.from(key -> {
110-
LOG.debug("Loading iceberg table {}.{} from remote catalog",
111-
key.icebergTableName.dbName, key.icebergTableName.tableName);
112-
return delegate.getTable(key.connectContext,
113-
key.icebergTableName.dbName, key.icebergTableName.tableName);
114-
}), executorService));
107+
.build(new com.github.benmanes.caffeine.cache.CacheLoader<IcebergTableCacheKey, Table>() {
108+
@Override
109+
public Table load(IcebergTableCacheKey key) throws Exception {
110+
LOG.debug("Loading iceberg table {}.{} from remote catalog",
111+
key.icebergTableName.dbName, key.icebergTableName.tableName);
112+
return delegate.getTable(key.connectContext, key.icebergTableName.dbName,
113+
key.icebergTableName.tableName);
114+
}
115+
});
115116
this.partitionCache = newCacheBuilder(icebergProperties.getIcebergMetaCacheTtlSec(), NEVER_CACHE,
116117
enableCache ? DEFAULT_CACHE_NUM : NEVER_CACHE).build(
117-
CacheLoader.from(key -> {
118-
Table nativeTable = getTable(new ConnectContext(), key.dbName, key.tableName);
119-
IcebergTable icebergTable =
120-
IcebergTable.builder().setCatalogDBName(key.dbName).setCatalogTableName(key.tableName)
121-
.setNativeTable(nativeTable).build();
122-
return delegate.getPartitions(icebergTable, key.snapshotId, null);
123-
}));
124-
118+
new com.github.benmanes.caffeine.cache.CacheLoader<IcebergTableName, Map<String, Partition>>() {
119+
@Override
120+
public Map<String, Partition> load(IcebergTableName key) throws Exception {
121+
Table nativeTable = getTable(new ConnectContext(), key.dbName, key.tableName);
122+
IcebergTable icebergTable =
123+
IcebergTable.builder().setCatalogDBName(key.dbName).setCatalogTableName(key.tableName)
124+
.setNativeTable(nativeTable).build();
125+
return delegate.getPartitions(icebergTable, key.snapshotId, null);
126+
}
127+
});
125128
long dataFileCacheSize = Math.round(Runtime.getRuntime().maxMemory() *
126129
icebergProperties.getIcebergDataFileCacheMemoryUsageRatio());
127130
long deleteFileCacheSize = Math.round(Runtime.getRuntime().maxMemory() *
@@ -210,8 +213,8 @@ public Table getTable(ConnectContext connectContext, String dbName, String table
210213
}
211214

212215
// do not cache if jwt or oauth2 is not used OR if it is not a REST Catalog.
213-
boolean cacheAllowed = Strings.isNullOrEmpty(connectContext.getAuthToken())
214-
|| !(delegate instanceof IcebergRESTCatalog);
216+
boolean cacheAllowed = icebergProperties.isEnableIcebergTableCache() &&
217+
(Strings.isNullOrEmpty(connectContext.getAuthToken()) || !(delegate instanceof IcebergRESTCatalog));
215218
if (!cacheAllowed) {
216219
return delegate.getTable(connectContext, dbName, tableName);
217220
}
@@ -286,7 +289,7 @@ public Map<String, Partition> getPartitions(IcebergTable icebergTable, long snap
286289
ExecutorService executorService) {
287290
IcebergTableName key =
288291
new IcebergTableName(icebergTable.getCatalogDBName(), icebergTable.getCatalogTableName(), snapshotId);
289-
return partitionCache.getUnchecked(key);
292+
return partitionCache.get(key);
290293
}
291294

292295
@Override
@@ -363,19 +366,19 @@ private void refreshTable(BaseTable currentTable, BaseTable updatedTable,
363366
long updatedSnapshotId = updatedTable.currentSnapshot().snapshotId();
364367
IcebergTableName baseIcebergTableName = new IcebergTableName(dbName, tableName, baseSnapshotId);
365368
IcebergTableName updatedIcebergTableName = new IcebergTableName(dbName, tableName, updatedSnapshotId);
366-
IcebergTableCacheKey baseKey = new IcebergTableCacheKey(baseIcebergTableName, ctx);
369+
IcebergTableCacheKey keyWithoutSnap = new IcebergTableCacheKey(new IcebergTableName(dbName, tableName), ctx);
367370
IcebergTableCacheKey updateKey = new IcebergTableCacheKey(updatedIcebergTableName, ctx);
368371
long latestRefreshTime = tableLatestRefreshTime.computeIfAbsent(new IcebergTableName(dbName, tableName), ignore -> -1L);
369372

370373
// update tables before refresh partition cache
371374
// so when refreshing partition cache, `getTables` can return the latest one.
372375
// another way to fix is to call `delegate.getTables` when refreshing partition cache.
373376
synchronized (this) {
374-
tables.put(updateKey, updatedTable);
377+
tables.put(keyWithoutSnap, updatedTable);
375378
}
376379

377380
partitionCache.invalidate(baseIcebergTableName);
378-
partitionCache.getUnchecked(updatedIcebergTableName);
381+
partitionCache.get(updatedIcebergTableName);
379382

380383
TableMetadata updatedTableMetadata = updatedTable.operations().current();
381384
List<ManifestFile> manifestFiles = updatedTable.currentSnapshot().dataManifests(updatedTable.io()).stream()
@@ -438,16 +441,7 @@ public void invalidateCache(String dbName, String tableName) {
438441
}
439442

440443
private void invalidateCache(IcebergTableName key) {
441-
if (key.ignoreSnapshotId) {
442-
// invalidate all snapshots of this table if snapshotId is not specified
443-
tables.asMap().keySet().stream()
444-
.filter(k -> k.icebergTableName.dbName.equals(key.dbName) &&
445-
k.icebergTableName.tableName.equals(key.tableName))
446-
.forEach(tables::invalidate);
447-
} else {
448-
// only invalidate the specified snapshot
449-
tables.invalidate(new IcebergTableCacheKey(key, new ConnectContext()));
450-
}
444+
tables.invalidate(new IcebergTableCacheKey(key, new ConnectContext()));
451445
// will invalidate all snapshots of this table
452446
partitionCache.invalidate(key);
453447
Set<String> paths = metaFileCacheMap.get(key);
@@ -470,8 +464,9 @@ public StarRocksIcebergTableScan getTableScan(Table table, StarRocksIcebergTable
470464
return delegate.getTableScan(table, scanContext);
471465
}
472466

473-
private CacheBuilder<Object, Object> newCacheBuilder(long expiresAfterWriteSec, long refreshInterval, long maximumSize) {
474-
CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
467+
private Caffeine<Object, Object> newCacheBuilder(long expiresAfterWriteSec, long refreshInterval,
468+
long maximumSize) {
469+
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
475470
if (expiresAfterWriteSec >= 0) {
476471
cacheBuilder.expireAfterWrite(expiresAfterWriteSec, SECONDS);
477472
}
@@ -573,7 +568,7 @@ public List<Pair<List<Object>, Long>> getSamples() {
573568
.stream()
574569
.limit(MEMORY_META_SAMPLES)
575570
.collect(Collectors.toList()),
576-
databases.size());
571+
databases.estimatedSize());
577572

578573
List<List<String>> partitionNames = getAllCachedPartitionNames();
579574
List<Object> partitions = partitionNames
@@ -614,8 +609,8 @@ public List<Pair<List<Object>, Long>> getSamples() {
614609
public Map<String, Long> estimateCount() {
615610
Map<String, Long> counter = new HashMap<>();
616611
List<List<String>> partitionNames = getAllCachedPartitionNames();
617-
counter.put("Database", databases.size());
618-
counter.put("Table", tables.size());
612+
counter.put("Database", databases.estimatedSize());
613+
counter.put("Table", tables.estimatedSize());
619614
counter.put("PartitionNames", partitionNames
620615
.stream()
621616
.mapToLong(List::size)

fe/fe-core/src/test/java/com/starrocks/connector/iceberg/CachingIcebergCatalogTest.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.starrocks.catalog.IcebergTable;
1919
import com.starrocks.common.MetaNotFoundException;
2020
import com.starrocks.connector.ConnectorMetadatRequestContext;
21+
import com.starrocks.connector.iceberg.rest.IcebergRESTCatalog;
2122
import com.starrocks.qe.ConnectContext;
2223
import com.starrocks.qe.SessionVariable;
2324
import com.starrocks.utframe.UtFrameUtils;
@@ -253,12 +254,46 @@ public void testTableCacheDisabled_hitsDelegateTwice(@Mocked IcebergCatalog dele
253254
new Verifications() {
254255
{
255256
delegate.getTable(ctx, "db1", "t1");
256-
times = 2;
257+
times = 2; //caffeine has a diff with guava here
257258
}
258259
};
259260
} finally {
260261
es.shutdownNow();
261262
}
262263
}
263-
}
264264

265+
@Test
266+
public void testEstimateCountReflectsTableCache(@Mocked IcebergCatalog icebergCatalog, @Mocked Table nativeTable) {
267+
new Expectations() {
268+
{
269+
icebergCatalog.getTable(connectContext, "db2", "tbl2");
270+
result = nativeTable;
271+
times = 1;
272+
}
273+
};
274+
CachingIcebergCatalog cachingIcebergCatalog = new CachingIcebergCatalog(CATALOG_NAME, icebergCatalog,
275+
DEFAULT_CATALOG_PROPERTIES, Executors.newSingleThreadExecutor());
276+
cachingIcebergCatalog.getTable(connectContext, "db2", "tbl2");
277+
Map<String, Long> counts = cachingIcebergCatalog.estimateCount();
278+
Assertions.assertEquals(1L, counts.get("Table"));
279+
}
280+
281+
@Test
282+
public void testGetTableBypassCacheForRestCatalogWhenAuthToken(@Mocked IcebergRESTCatalog restCatalog,
283+
@Mocked Table nativeTable) {
284+
ConnectContext ctx = new ConnectContext();
285+
ctx.setAuthToken("token");
286+
new Expectations() {
287+
{
288+
restCatalog.getTable(ctx, "db3", "tbl3");
289+
result = nativeTable;
290+
times = 2;
291+
}
292+
};
293+
294+
CachingIcebergCatalog cachingIcebergCatalog = new CachingIcebergCatalog(CATALOG_NAME, restCatalog,
295+
DEFAULT_CATALOG_PROPERTIES, Executors.newSingleThreadExecutor());
296+
Assertions.assertEquals(nativeTable, cachingIcebergCatalog.getTable(ctx, "db3", "tbl3"));
297+
Assertions.assertEquals(nativeTable, cachingIcebergCatalog.getTable(ctx, "db3", "tbl3"));
298+
}
299+
}

0 commit comments

Comments (0)