
Add in-memory cache for getLastEntryInLedger to reduce RocksDB getFloor CPU cost #4732

Open

hangc0276 wants to merge 1 commit into apache:master from hangc0276:chenhang/improve_getLastEntryIndex

Conversation

@hangc0276 (Contributor)

Motivation

EntryLocationIndex.getLastEntryInLedgerInternal() calls locationsDb.getFloor() on every invocation, which is an expensive RocksDB seek operation. When a large number of ledgers query their last entry concurrently — for example, during a Pulsar broker failover where thousands of topics are loaded by another broker and send get-LAC requests — this causes severe CPU saturation on the bookie.

In a production Pulsar cluster with 3000+ topics and a bookie configured with 1 CPU, a single broker OOM event triggered massive concurrent getFloor() calls that throttled the bookie CPU for over 1 hour.

The existing WriteCache already caches last entry IDs for recent unflushed entries, but once entries are flushed to RocksDB, every getLastEntryInLedger() call falls through to the database with no caching layer.

Changes

Add a bounded in-memory ConcurrentLongLongHashMap cache in EntryLocationIndex that maps ledgerId → lastEntryId:

  • Read path (getLastEntryInLedgerInternal): Check the cache before calling getFloor(). On cache hit, return immediately without touching RocksDB. On cache miss, populate the cache after a
    successful getFloor() lookup.
  • Write path (addLocation): Eagerly update the cache when a newer entry is added, so the cache stays current even with interleaved reads and writes.
  • Delete path (delete): Remove the ledger from the cache.
  • Bounded size: Configurable via dbStorage_lastEntryCacheMaxSize (default: 10,000). When the cache exceeds this limit and a new ledger needs to be inserted, the cache is cleared and repopulates
    naturally with hot ledgers.

Using ConcurrentLongLongHashMap (lock-striped open hash map with primitive long keys/values) ensures minimal overhead on the write-hot path — no boxing, no LRU bookkeeping, no timestamp computation
per update.
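
As a reference for reviewers, here is a minimal self-contained sketch of the wiring described above. It is not the exact patch; getFloorFromRocksDb stands in for the real locationsDb.getFloor() lookup, and the other names follow this description:

    import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;

    // Sketch of the cache wiring described in "Changes".
    class LastEntryCacheSketch {
        private static final long CACHE_MAX_SIZE = 10_000; // dbStorage_lastEntryCacheMaxSize default
        private final ConcurrentLongLongHashMap lastEntryCache =
                ConcurrentLongLongHashMap.newBuilder().expectedItems(10_000).build();

        long getLastEntryInLedger(long ledgerId) {
            long cached = lastEntryCache.get(ledgerId); // returns -1 on miss
            if (cached >= 0) {
                return cached; // hit: no RocksDB seek
            }
            long lastEntryId = getFloorFromRocksDb(ledgerId); // expensive seek, only on miss
            if (lastEntryCache.size() >= CACHE_MAX_SIZE) {
                lastEntryCache.clear(); // bounded-size policy: wipe, then repopulate naturally
            }
            lastEntryCache.put(ledgerId, lastEntryId);
            return lastEntryId;
        }

        void addLocation(long ledgerId, long entryId) {
            long cached = lastEntryCache.get(ledgerId);
            if (cached < entryId) {
                lastEntryCache.put(ledgerId, entryId); // keep the cache current on writes
            }
        }

        void delete(long ledgerId) {
            lastEntryCache.remove(ledgerId); // invalidate on ledger delete
        }

        private long getFloorFromRocksDb(long ledgerId) {
            return 0L; // stand-in for the real getFloor() lookup
        }
    }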

Test plan

  • Added testGetLastEntryInLedgerCache covering cache hit, update on new entry, and invalidation on delete
  • All existing EntryLocationIndexTest tests pass


Copilot AI left a comment


Pull request overview

This PR introduces a bounded in-memory cache in EntryLocationIndex to reduce expensive RocksDB getFloor() seeks when repeatedly querying the last entry in a ledger, targeting CPU saturation scenarios during high-concurrency ledger loads (e.g., broker failover).

Changes:

  • Added a ConcurrentLongLongHashMap-backed ledgerId -> lastEntryId cache checked on getLastEntryInLedgerInternal().
  • Updated addLocation() and delete() to keep the cache warm/consistent with writes and invalidations.
  • Added a unit test covering cache hit behavior, update-on-write, and delete behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.

File: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
Description: Adds/configures the last-entry cache and integrates it into read/write/delete paths.

File: bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
Description: Adds a new test exercising the cache behavior.


Comment on lines +165 to +169
    // Populate cache for future lookups
    if (lastEntryCache.size() >= lastEntryCacheMaxSize) {
        lastEntryCache.clear();
    }
    lastEntryCache.put(ledgerId, lastEntryId);

Copilot AI Mar 24, 2026


ConcurrentLongLongHashMap requires values to be >= 0 (it uses -1 as a sentinel). lastEntryId can be -1 when the last key for a ledger is the special entry (see existing tests that write entryId -1), so lastEntryCache.put(ledgerId, lastEntryId) can throw IllegalArgumentException and break getLastEntryInLedger(). Consider guarding against negative lastEntryId or storing an encoded value (e.g., entryId + 1) so -1 can be cached safely.
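
A minimal sketch of that encoding, using the same map builder the patch already uses:

    import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;

    // Store entryId + 1 so a legitimate lastEntryId of -1 encodes to 0,
    // which the map accepts; -1 from get() then unambiguously means "absent".
    class EncodedLastEntryCache {
        private final ConcurrentLongLongHashMap cache =
                ConcurrentLongLongHashMap.newBuilder().expectedItems(1024).build();

        void put(long ledgerId, long lastEntryId) {
            cache.put(ledgerId, lastEntryId + 1); // always >= 0 for lastEntryId >= -1
        }

        // Returns the decoded lastEntryId (possibly -1), or Long.MIN_VALUE on a miss.
        long get(long ledgerId) {
            long encoded = cache.get(ledgerId);
            return encoded < 0 ? Long.MIN_VALUE : encoded - 1;
        }
    }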

Comment on lines +136 to +138
    stats.getGetLastEntryInLedgerStats()
            .registerSuccessfulEvent(0, TimeUnit.NANOSECONDS);
    return cachedLastEntry;

Copilot AI Mar 24, 2026


The cache hit path records a latency of 0 via registerSuccessfulEvent(0, ...). This will skew getLastEntryInLedger latency metrics (and can make averages look artificially perfect under high cache hit rates). Please record the actual elapsed time for cache hits (e.g., measure from method entry) or use a separate counter/metric for cache hits if you want to distinguish them.
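
For illustration, the hit branch could time itself from method entry (context elided; lookupFromRocksDb is a hypothetical helper for the miss path, not from the patch):

    long getLastEntryInLedgerInternal(long ledgerId) throws IOException {
        long startNanos = System.nanoTime();
        long cached = lastEntryCache.get(ledgerId);
        if (cached >= 0) {
            // record the real (small but nonzero) elapsed time for the hit
            stats.getGetLastEntryInLedgerStats()
                    .registerSuccessfulEvent(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
            return cached;
        }
        // miss path: getFloor() lookup, timed from the same startNanos
        return lookupFromRocksDb(ledgerId, startNanos);
    }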

Comment on lines +203 to +213
    // Update the last entry cache if this entry is newer.
    // ConcurrentLongLongHashMap.get() returns -1 if not found.
    long cachedLastEntry = lastEntryCache.get(ledgerId);
    if (cachedLastEntry < entryId) {
        // Clear the cache if it exceeds the max size to bound memory usage.
        // The cache will quickly repopulate with hot ledgers on subsequent reads.
        if (cachedLastEntry < 0 && lastEntryCache.size() >= lastEntryCacheMaxSize) {
            lastEntryCache.clear();
        }
        lastEntryCache.put(ledgerId, entryId);
    }

Copilot AI Mar 24, 2026


addLocation() can repopulate lastEntryCache even after delete(ledgerId) has been called (the cache update happens unconditionally here). That creates a correctness risk with existing behavior: removeOffsetFromDeletedLedgers() later removes ledgerId from deletedLedgers and deletes the DB range, but it does not clear lastEntryCache again. If entries were added after delete() (see addLedgerAfterDeleteTest), getLastEntryInLedger() may return a stale cached entryId for a ledger whose indexes were deleted. Consider skipping cache updates when deletedLedgers.contains(ledgerId) is true, and/or clearing cache entries for ledgersToDelete inside removeOffsetFromDeletedLedgers() before removing them from deletedLedgers.
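
A sketch of both suggestions together, assuming the builder and items() API of the concurrent collections in this package:

    import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
    import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;

    // Don't let addLocation() resurrect cache entries for ledgers pending
    // deletion, and purge the cache when the deleted DB ranges are removed.
    class DeleteAwareCacheSketch {
        private final ConcurrentLongHashSet deletedLedgers =
                ConcurrentLongHashSet.newBuilder().build();
        private final ConcurrentLongLongHashMap lastEntryCache =
                ConcurrentLongLongHashMap.newBuilder().expectedItems(1024).build();

        void updateCacheOnAdd(long ledgerId, long entryId) {
            if (deletedLedgers.contains(ledgerId)) {
                return; // skip: this ledger's index is being deleted
            }
            long cached = lastEntryCache.get(ledgerId);
            if (cached < entryId) {
                lastEntryCache.put(ledgerId, entryId);
            }
        }

        // Called where removeOffsetFromDeletedLedgers() drops the DB ranges,
        // before the ledgers are removed from deletedLedgers.
        void purgeDeletedFromCache() {
            deletedLedgers.items().forEach(lastEntryCache::remove);
        }
    }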

Comment on lines +241 to +248
    // Delete should invalidate cache
    idx.delete(1);
    try {
        idx.getLastEntryInLedger(1);
        fail("Should have thrown NoEntryException");
    } catch (Bookie.NoEntryException e) {
        // expected
    }

Copilot AI Mar 24, 2026


This test’s “cache invalidation on delete” assertion currently relies on deletedLedgers.contains(ledgerId) throwing before the cache is consulted, so it doesn’t actually verify that the cache entry was cleared (or that stale cache entries can’t leak after removeOffsetFromDeletedLedgers() removes the ledger from deletedLedgers). To validate cache invalidation, consider calling idx.removeOffsetFromDeletedLedgers() after idx.delete(1) and then asserting getLastEntryInLedger(1) throws because the DB entries are gone (which would fail if the cache still returned a stale value).
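
Concretely, something along these lines, reusing the test's existing fixture (sketch only):

    idx.delete(1);
    idx.removeOffsetFromDeletedLedgers(); // drops the DB range and the deletedLedgers entry
    try {
        idx.getLastEntryInLedger(1);
        fail("Should have thrown NoEntryException after the DB range was removed");
    } catch (Bookie.NoEntryException e) {
        // expected: this would fail if a stale cache entry survived
    }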

Comment on lines +63 to +66
    this.lastEntryCacheMaxSize = conf.getLong(LAST_ENTRY_CACHE_MAX_SIZE, DEFAULT_LAST_ENTRY_CACHE_MAX_SIZE);
    this.lastEntryCache = ConcurrentLongLongHashMap.newBuilder()
            .expectedItems((int) Math.min(lastEntryCacheMaxSize, Integer.MAX_VALUE))
            .build();

Copilot AI Mar 24, 2026


lastEntryCacheMaxSize is used directly as expectedItems(...) with the default ConcurrentLongLongHashMap concurrency level (16). ConcurrentLongLongHashMap enforces expectedItems > 0 and expectedItems >= concurrencyLevel, so setting dbStorage_lastEntryCacheMaxSize to a small value (e.g., 1–15) or 0 will throw IllegalArgumentException during EntryLocationIndex construction. Please clamp/validate the config and/or explicitly set the cache map concurrency level (e.g., concurrencyLevel(min(16, expectedItems))) and treat <= 0 as “cache disabled”.

Suggested change

    - this.lastEntryCacheMaxSize = conf.getLong(LAST_ENTRY_CACHE_MAX_SIZE, DEFAULT_LAST_ENTRY_CACHE_MAX_SIZE);
    - this.lastEntryCache = ConcurrentLongLongHashMap.newBuilder()
    -         .expectedItems((int) Math.min(lastEntryCacheMaxSize, Integer.MAX_VALUE))
    -         .build();
    + long configuredLastEntryCacheMaxSize =
    +         conf.getLong(LAST_ENTRY_CACHE_MAX_SIZE, DEFAULT_LAST_ENTRY_CACHE_MAX_SIZE);
    + if (configuredLastEntryCacheMaxSize <= 0) {
    +     // Treat non-positive values as "cache disabled", but still create a minimal map
    +     this.lastEntryCacheMaxSize = 0;
    +     this.lastEntryCache = ConcurrentLongLongHashMap.newBuilder()
    +             .expectedItems(1)
    +             .concurrencyLevel(1)
    +             .build();
    + } else {
    +     this.lastEntryCacheMaxSize = configuredLastEntryCacheMaxSize;
    +     int expectedItems = (int) Math.min(this.lastEntryCacheMaxSize, Integer.MAX_VALUE);
    +     int concurrencyLevel = Math.min(16, expectedItems);
    +     this.lastEntryCache = ConcurrentLongLongHashMap.newBuilder()
    +             .expectedItems(expectedItems)
    +             .concurrencyLevel(concurrencyLevel)
    +             .build();
    + }

@merlimat (Contributor) left a comment


@hangc0276 thanks for the patch — the motivation is clear and the data-structure choice is right. A few things I'd like to discuss before merge:

1. Stale-cache race on the read path (correctness)

T_A (read L)                       T_B (addLocation L, N+1)
cache.get(L) → -1 (miss)
getFloor() → N (slow, contended)
                                   batch.put(N+1) + flush
                                   cache.get(L) → -1 (miss)
                                   cache.put(L, N+1)
cache.put(L, N)   ← regresses

The cached value can end up older than what's in RocksDB. It stays stale until the next addLocation on L (which fixes it via the cachedLastEntry < entryId check) or until the cache is cleared. For a closed/idle ledger that just stopped receiving writes, the staleness sticks. The whole point of this PR is that getFloor() is slow under load, so the window can be wide.

The same max-wins race exists in addLocation between two concurrent writers — both can pass the cachedLastEntry < entryId check and the lower entry can win.

Suggestion: mirror the CAS-loop pattern already used in WriteCache.put() (lines 168-178). ConcurrentLongLongHashMap already exposes compareAndSet, so the fix is local and matches existing convention in this package.
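
A sketch of that pattern applied here, assuming ConcurrentLongLongHashMap.compareAndSet behaves as in the WriteCache usage:

    // Max-wins update: only advance the cached value, never regress it.
    while (true) {
        long current = lastEntryCache.get(ledgerId); // -1 when absent
        if (current >= entryId) {
            break; // an equal or newer entry is already cached
        }
        if (lastEntryCache.compareAndSet(ledgerId, current, entryId)) {
            break; // won the race
        }
        // lost to a concurrent writer; re-read and retry
    }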

2. Cache eviction is "clear everything"

if (size >= max) cache.clear() in both addLocation and getLastEntryInLedgerInternal. With the 10K default and an active ledger set just over 10K, the cache gets wiped repeatedly and is effectively useless during the exact failover scenario this PR targets. The "repopulates naturally with hot ledgers" assumes warm-up time the storm doesn't provide.

Options:

  • Bump the default substantially (cost is ~16B/entry plus table overhead — 100K entries is ~2-3 MB)
  • Use a real bounded cache with LRU eviction (e.g., Caffeine; see the sketch after this list) — more bookkeeping but stable hit rate
  • Stop accepting new inserts when full rather than wiping the working set
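
For the Caffeine option, a rough sketch (boxed keys/values; the merge keeps the max-wins property atomically; sizes illustrative):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    class CaffeineLastEntryCacheSketch {
        private final Cache<Long, Long> lastEntryCache = Caffeine.newBuilder()
                .maximumSize(100_000) // evicts near-LRU entries instead of clearing everything
                .build();

        void onAdd(long ledgerId, long entryId) {
            lastEntryCache.asMap().merge(ledgerId, entryId, Math::max); // atomic max-wins
        }

        Long lastEntry(long ledgerId) {
            return lastEntryCache.getIfPresent(ledgerId); // null on miss
        }

        void onDelete(long ledgerId) {
            lastEntryCache.invalidate(ledgerId);
        }
    }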

3. Cache hits skew the latency histogram

The cache-hit branch calls getGetLastEntryInLedgerStats().registerSuccessfulEvent(0, NS). Every cache hit records a zero-latency sample into the same op-stats logger used for actual RocksDB lookups, which destroys the p50/p99 signal operators rely on to detect storage regressions. Add a separate hit/miss counter (or distinct histograms) in EntryLocationIndexStats instead.
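
Something like the following (metric names illustrative, not from the patch):

    import org.apache.bookkeeper.stats.Counter;
    import org.apache.bookkeeper.stats.StatsLogger;

    // Split hit/miss counters so the op-stats histogram only ever contains
    // real RocksDB lookup latencies.
    class CacheMetricsSketch {
        private final Counter cacheHits;
        private final Counter cacheMisses;

        CacheMetricsSketch(StatsLogger statsLogger) {
            this.cacheHits = statsLogger.getCounter("last-entry-cache-hits");
            this.cacheMisses = statsLogger.getCounter("last-entry-cache-misses");
        }

        void onHit() {
            cacheHits.inc(); // no zero-latency sample in the histogram
        }

        void onMiss() {
            cacheMisses.inc(); // the getFloor() path keeps its registerSuccessfulEvent timing
        }
    }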

4. Logging style

The two new log.debug(...) calls use {} placeholder formatting, but the rest of this file is on the structured slog style (log.debug().attr(...).log(...) — see lines 81-84, 138-141). Please match.

5. Minor

  • New config dbStorage_lastEntryCacheMaxSize should be documented in conf/bk_server.conf alongside the other dbStorage_* settings.
  • expectedItems(max) will resize the table as it fills toward the cap (default fill factor 0.66). (int)(max / 0.66) matches the intent more closely.
  • testGetLastEntryInLedgerCache is single-threaded — given the races above, a small concurrent test would be valuable (see the sketch after this list)
  • The cache invalidation in delete() runs before the actual DB row removal in removeOffsetFromDeletedLedgers(). The existing deletedLedgers.contains() guard in getLastEntryInLedger() keeps this correct, but a one-line comment noting the ordering invariant would help future readers.
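
For the concurrent test, a sketch along these lines (reusing the fixture idx and assuming the addLocation(ledgerId, entryId, location) signature; error handling elided):

    ExecutorService pool = Executors.newFixedThreadPool(8);
    CountDownLatch done = new CountDownLatch(8);
    for (int t = 0; t < 8; t++) {
        final long offset = t;
        pool.submit(() -> {
            try {
                for (long e = offset; e < 1000; e += 8) {
                    idx.addLocation(4, e, e * 128); // interleaved writers on ledger 4
                }
            } catch (IOException ignored) {
                // a real test should fail on IOException
            } finally {
                done.countDown();
            }
        });
    }
    done.await();
    pool.shutdown();
    // fails if a lower entry "won" one of the races above
    assertEquals(999, idx.getLastEntryInLedger(4));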

The main blocker for me is (1) — the regression scenario is exactly the kind of bug that's hard to attribute later. The CAS-loop fix is small and brings this in line with the sibling code in WriteCache.
