2 changes: 2 additions & 0 deletions src/common/file_system/local_file_system.cpp
@@ -123,6 +123,7 @@ std::unique_ptr<FileInfo> LocalFileSystem::openFile(const std::string& path, Fil
BOOL rc = LockFileEx(handle, dwFlags, 0 /*reserved*/, 1 /*numBytesLow*/, 0 /*numBytesHigh*/,
&overlapped);
if (!rc) {
// Capture the lock error first; CloseHandle may overwrite the thread's last-error value.
auto error = GetLastError();
CloseHandle(handle);
throw IOException("Could not set lock on file : " + fullPath +
" (Error: " + std::to_string(error) + ")\n" +
@@ -146,6 +147,7 @@ std::unique_ptr<FileInfo> LocalFileSystem::openFile(const std::string& path, Fil
int rc = fcntl(fd, F_SETLK, &fl);
if (rc == -1) {
int original_errno = errno;
close(fd);
if (original_errno == EAGAIN || original_errno == EACCES) {
struct flock get_fl {};
memset(&get_fl, 0, sizeof get_fl);
6 changes: 6 additions & 0 deletions src/include/storage/checkpointer.h
@@ -1,5 +1,6 @@
#pragma once

#include <memory>
#include <unordered_map>
#include <vector>

@@ -15,6 +16,7 @@ namespace catalog {
class Catalog;
}
namespace common {
struct FileInfo;
class VirtualFileSystem;
} // namespace common
namespace testing {
@@ -72,6 +74,8 @@ class Checkpointer {

static void readCheckpoint(main::ClientContext* context, catalog::Catalog* catalog,
StorageManager* storageManager);
void acquireCheckpointLocks();
void releaseCheckpointLocks();

PageRange serializeCatalog(const catalog::Catalog& catalog, StorageManager& storageManager);
PageRange serializeCatalogSnapshot(const catalog::Catalog& catalog,
@@ -105,6 +109,8 @@ class Checkpointer {
tableEpochWatermarksByManager;
std::unordered_map<catalog::Catalog*, uint64_t> catalogVersionAtCheckpointByCatalog;
std::unordered_map<StorageManager*, uint64_t> pageManagerVersionAtCheckpointByManager;
std::unique_ptr<common::FileInfo> checkpointIntentLockFile;

Contributor


Could you describe the thought process here? Why not lock the main database file instead of creating new intent/apply files?

Contributor Author


The core reason we cannot rely solely on the primary database file lock is that this fix targets the consistency window during checkpoints, not which processes may open the primary data file.
Point by point:

  1. A primary database lock cannot represent the phases of a checkpoint
    A primary file lock only indicates whether the file is occupied. This bug requires distinguishing three states:
    - Normal readable state
    - Dangerous intermediate state while a checkpoint is in progress
    - Stable state after the checkpoint completes
    This change introduces a two-phase locking mechanism built from an intent lock and an apply lock. It marks the "checkpoint-in-progress" state explicitly instead of conflating it with generic file occupancy. The lock file paths are defined in storage_utils.h, and the locking logic is implemented in checkpointer.cpp.
  2. Relying solely on the primary lock is too coarse-grained
    If read-only opens contended for the primary file lock to avoid the race, we would be left with one of two undesirable outcomes:
    - Every read-only open fails whenever a writer process exists, which is overly conservative.
    - Read-only access proceeds, but nothing blocks it precisely during the risky checkpoint window.
    The goal of this patch is to block only the unsafe window and leave other scenarios uninterrupted. That is why it introduces dedicated checkpoint coordination locks and state checks rather than folding all logic into the primary file lock.
  3. The issue involves multi-file consistency, which a single-file lock cannot enforce
    As highlighted in your summary, read-only recovery may encounter intermediate artifacts such as shadow files and checkpoint WAL files.
    The inconsistency window spans the main data file, the shadow file, and the checkpoint WAL. Locking only the primary file cannot establish whether the whole set of files is in an atomically recoverable state.
    The corresponding safeguards are implemented in WALReplayer (wal_replayer.cpp):
    - In read-only mode, first try to acquire the checkpoint locks.
    - Fail fast on intermediate states involving a shadow file or a frozen WAL.
  4. Primary file locks stop protecting anything after a process crash
    OS file locks are bound to the process lifecycle. They are released automatically when a process crashes, yet partial intermediate files may remain on disk.
    To address this, the patch checks for the existence of state files and validates the catalog and metadata page ranges in the database header. Leftover partial state after a crash results in a controlled failure instead of blind reads that can trigger segmentation faults.
    The page range validation is added to the Checkpointer read path in checkpointer.cpp, converting invalid memory accesses into managed exceptions.
  5. The commit converts race conditions into protocol-defined failure modes
    The final paragraph of your summary already captures the essence: the fix does not aim to eliminate failures entirely. Instead, it ensures the system fails explicitly whenever it hits the unsafe window, eliminating undefined behavior.
    This objective requires four components:
    - Explicit checkpoint coordination with intent and apply locks
    - Fail-fast rejection of read-only opens during recovery
    - Defensive validation of checkpoint metadata
    - Resource cleanup on lock acquisition failure to prevent file descriptor and handle leaks (implemented in local_file_system.cpp)

Conclusion:
The primary database file lock is necessary but not sufficient. It enforces mutual exclusion, but it cannot express the phase protocol, the multi-file consistency boundary, or the detection of leftover intermediate state after a crash that this change requires.
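
As a rough illustration of the reader-side safeguards in points 3 and 4, here is a minimal standalone sketch. It is not the patch itself: the 32-bit `page_idx_t`, the `INVALID_PAGE_IDX` sentinel value, `ReadOnlyOpenState`, and `canOpenReadOnly` are assumptions made for the example. The range check follows the shape of `isValidCheckpointPageRange` in checkpointer.cpp.

```cpp
#include <cstdint>
#include <limits>

// Hypothetical stand-ins for the project's types (assumed for this sketch).
using page_idx_t = std::uint32_t;
constexpr page_idx_t INVALID_PAGE_IDX = std::numeric_limits<page_idx_t>::max();

struct PageRange {
    page_idx_t startPageIdx = INVALID_PAGE_IDX;
    page_idx_t numPages = 0;
};

// A range is valid if it is the empty sentinel, or if it lies fully inside
// [0, filePages). The last comparison subtracts rather than adds, so
// startPageIdx + numPages can never wrap around.
bool isValidPageRange(PageRange range, page_idx_t filePages) {
    if (range.startPageIdx == INVALID_PAGE_IDX) {
        return range.numPages == 0;
    }
    if (range.numPages == 0 || range.startPageIdx >= filePages) {
        return false;
    }
    return range.numPages <= filePages - range.startPageIdx;
}

// Hypothetical snapshot of what a read-only opener observes.
struct ReadOnlyOpenState {
    bool checkpointLockHeld = false; // a writer holds the intent or apply lock
    bool hasFrozenWAL = false;       // the checkpoint WAL file exists
    bool hasShadowFile = false;      // the shadow file exists
};

// Fail fast: any sign of an in-progress or interrupted checkpoint rejects the open.
bool canOpenReadOnly(const ReadOnlyOpenState& state) {
    return !state.checkpointLockHeld && !state.hasFrozenWAL && !state.hasShadowFile;
}
```

With a 10-page file, the range `{2, 8}` is accepted and `{2, 9}` is rejected. A range whose length is close to the maximum index is also rejected, where a naive `start + length <= filePages` check could wrap around and accept it.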

std::unique_ptr<common::FileInfo> checkpointApplyLockFile;
};

} // namespace storage
6 changes: 6 additions & 0 deletions src/include/storage/storage_utils.h
@@ -77,6 +77,12 @@ class StorageUtils {
static std::string getShadowFilePath(const std::string& path) {
return std::format("{}.{}", path, common::StorageConstants::SHADOWING_SUFFIX);
}
static std::string getCheckpointIntentLockFilePath(const std::string& path) {
return std::format("{}.checkpoint.intent.lock", path);
}
static std::string getCheckpointApplyLockFilePath(const std::string& path) {
return std::format("{}.checkpoint.apply.lock", path);
}
static std::string getTmpFilePath(const std::string& path) {
return std::format("{}.{}", path, common::StorageConstants::TEMP_FILE_SUFFIX);
}
83 changes: 83 additions & 0 deletions src/storage/checkpointer.cpp
@@ -1,8 +1,15 @@
#include "storage/checkpointer.h"

#include <chrono>
#include <string_view>
#include <thread>
#include <vector>

#include "catalog/catalog.h"
#include "common/constants.h"
#include "common/exception/io.h"
#include "common/exception/runtime.h"
#include "common/file_system/file_info.h"
#include "common/file_system/file_system.h"
#include "common/file_system/virtual_file_system.h"
#include "common/serializer/buffered_file.h"
@@ -17,9 +24,11 @@
#include "storage/database_header.h"
#include "storage/shadow_utils.h"
#include "storage/storage_manager.h"
#include "storage/storage_utils.h"
#include "storage/storage_version_info.h"
#include "storage/wal/local_wal.h"
#include "transaction/transaction.h"
#include <format>

namespace lbug {
namespace storage {
@@ -63,6 +72,51 @@ void logCheckpointAndApplyShadowPagesForStorage(main::ClientContext& clientConte
shadowFile.clear(*bufferManager);
}

bool isLockContention(const common::IOException& exception) {
return std::string(exception.what()).find("Could not set lock") != std::string::npos;
}

std::unique_ptr<common::FileInfo> acquireCheckpointWriteLock(main::ClientContext& clientContext,
const std::string& lockPath) {
auto vfs = common::VirtualFileSystem::GetUnsafe(clientContext);
while (true) {
try {
return vfs->openFile(lockPath,
common::FileOpenFlags(common::FileFlags::READ_ONLY | common::FileFlags::WRITE |
common::FileFlags::CREATE_IF_NOT_EXISTS,
common::FileLockType::WRITE_LOCK),
&clientContext);
} catch (const common::IOException& exception) {
if (!isLockContention(exception)) {
throw;
}
std::this_thread::sleep_for(
std::chrono::microseconds(common::THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS));
}
}
}

bool isValidCheckpointPageRange(PageRange range, common::page_idx_t numPages) {
if (range.startPageIdx == common::INVALID_PAGE_IDX) {
return range.numPages == 0;
}
if (range.numPages == 0 || range.startPageIdx >= numPages) {
return false;
}
return range.numPages <= numPages - range.startPageIdx;
}

void validateCheckpointPageRange(PageRange range, common::page_idx_t numPages,
std::string_view name) {
if (isValidCheckpointPageRange(range, numPages)) {
return;
}
throw common::RuntimeException(std::format(
"Cannot read checkpoint: {} page range starts at {} and spans {} pages, outside the "
"database file with {} pages. The database may be checkpointing; please retry later.",
std::string{name}, range.startPageIdx, range.numPages, numPages));
}

} // namespace

Checkpointer::Checkpointer(main::ClientContext& clientContext)
@@ -72,6 +126,22 @@ Checkpointer::Checkpointer(main::ClientContext& clientContext)

Checkpointer::~Checkpointer() = default;

void Checkpointer::acquireCheckpointLocks() {
if (isInMemory || checkpointIntentLockFile || checkpointApplyLockFile) {
return;
}
const auto databasePath = clientContext.getDatabasePath();
checkpointIntentLockFile = acquireCheckpointWriteLock(clientContext,
StorageUtils::getCheckpointIntentLockFilePath(databasePath));
checkpointApplyLockFile = acquireCheckpointWriteLock(clientContext,
StorageUtils::getCheckpointApplyLockFilePath(databasePath));
}

void Checkpointer::releaseCheckpointLocks() {
checkpointApplyLockFile.reset();
checkpointIntentLockFile.reset();
}

std::vector<Checkpointer::CheckpointTarget> Checkpointer::collectCheckpointTargets() const {
std::vector<CheckpointTarget> result;
result.push_back({clientContext.getDatabase()->getCatalog(), mainStorageManager});
@@ -157,6 +227,7 @@ void Checkpointer::writeCheckpoint() {
return;
}

acquireCheckpointLocks();
checkpointTargets = collectCheckpointTargets();

for (const auto& target : checkpointTargets) {
@@ -200,6 +271,7 @@ void Checkpointer::beginCheckpoint(common::transaction_t snapshotTimestamp) {
return;
}

acquireCheckpointLocks();
snapshotTS = snapshotTimestamp;
checkpointTargets = collectCheckpointTargets();

@@ -301,6 +373,7 @@ void Checkpointer::postCheckpointCleanup(bool canResetPageManagerToCurrent) {
}
target.storageManager->getShadowFile().reset();
}
releaseCheckpointLocks();
}

bool Checkpointer::checkpointStorage() {
@@ -389,6 +462,7 @@ void Checkpointer::rollback() {
for (const auto& target : checkpointTargets) {
target.storageManager->rollbackCheckpoint(*target.catalog);
}
releaseCheckpointLocks();
}

bool Checkpointer::canAutoCheckpoint(const main::ClientContext& clientContext,
@@ -426,6 +500,15 @@ void Checkpointer::readCheckpoint(main::ClientContext* context, catalog::Catalog
auto reader = std::make_unique<common::BufferedFileReader>(*fileInfo);
common::Deserializer deSer(std::move(reader));
auto currentHeader = std::make_unique<DatabaseHeader>(DatabaseHeader::deserialize(deSer));
const auto numPages = storageManager->getDataFH()->getNumPages();
validateCheckpointPageRange(currentHeader->catalogPageRange, numPages, "catalog");
validateCheckpointPageRange(currentHeader->metadataPageRange, numPages, "metadata");
if (currentHeader->dataFileNumPages != 0 && currentHeader->dataFileNumPages > numPages) {
throw common::RuntimeException(std::format(
"Cannot read checkpoint: header expects {} database pages, but the file has {} pages. "
"The database may be checkpointing; please retry later.",
currentHeader->dataFileNumPages, numPages));
}
// If the catalog page range is invalid, it means there is no catalog to read; thus, the
// database is empty.
if (currentHeader->catalogPageRange.startPageIdx != common::INVALID_PAGE_IDX) {
9 changes: 9 additions & 0 deletions src/storage/storage_manager.cpp
@@ -18,6 +18,7 @@
#include "storage/buffer_manager/memory_manager.h"
#include "storage/checkpointer.h"
#include "storage/index/art_index.h"
#include "storage/storage_utils.h"
#include "storage/table/arrow_node_table.h"
#include "storage/table/arrow_rel_table.h"
#include "storage/table/arrow_table_support.h"
@@ -47,6 +48,14 @@ StorageManager::StorageManager(const std::string& databasePath, bool readOnly, b
shadowFile =
std::make_unique<ShadowFile>(*memoryManager.getBufferManager(), vfs, this->databasePath);
inMemory = main::DBConfig::isDBPathInMemory(databasePath);
if (!readOnly && !inMemory) {
vfs->openFile(StorageUtils::getCheckpointIntentLockFilePath(databasePath),
FileOpenFlags(
FileFlags::READ_ONLY | FileFlags::WRITE | FileFlags::CREATE_IF_NOT_EXISTS));
vfs->openFile(StorageUtils::getCheckpointApplyLockFilePath(databasePath),
FileOpenFlags(
FileFlags::READ_ONLY | FileFlags::WRITE | FileFlags::CREATE_IF_NOT_EXISTS));
}
registerIndexType(PrimaryKeyIndex::getIndexType());
registerIndexType(ArtPrimaryKeyIndex::getIndexType());
}
48 changes: 48 additions & 0 deletions src/storage/wal/wal_replayer.cpp
@@ -1,5 +1,7 @@
#include "storage/wal/wal_replayer.h"

#include "common/exception/io.h"
#include "common/exception/runtime.h"
#include "common/file_system/file_info.h"
#include "common/file_system/file_system.h"
#include "common/file_system/virtual_file_system.h"
@@ -11,6 +13,7 @@
#include "storage/file_db_id_utils.h"
#include "storage/local_storage/local_rel_table.h"
#include "storage/storage_manager.h"
#include "storage/storage_utils.h"
#include "storage/wal/checksum_reader.h"
#include "storage/wal/wal_record.h"
#include "transaction/transaction_context.h"
@@ -25,6 +28,8 @@ namespace storage {

static constexpr std::string_view checksumMismatchMessage =
"Checksum verification failed, the WAL file is corrupted.";
static constexpr std::string_view readOnlyCheckpointInProgressMessage =
"Cannot open database in read-only mode while checkpoint is in progress. Please retry later.";

WALReplayer::WALReplayer(main::ClientContext& clientContext) : clientContext{clientContext} {
walPath = StorageUtils::getWALFilePath(clientContext.getDatabasePath());
@@ -74,11 +79,52 @@ static uint64_t getReadOffset(Deserializer& deSer, bool enableChecksums) {
}
}

static std::unique_ptr<FileInfo> tryAcquireReadOnlyCheckpointLock(
main::ClientContext& clientContext, const std::string& lockPath) {
auto vfs = VirtualFileSystem::GetUnsafe(clientContext);
if (!vfs->fileOrPathExists(lockPath, &clientContext)) {
return nullptr;
}
try {
return vfs->openFile(lockPath, FileOpenFlags(FileFlags::READ_ONLY, FileLockType::READ_LOCK),
&clientContext);
} catch (const IOException&) {
throw RuntimeException(std::string(readOnlyCheckpointInProgressMessage));
}
}

static std::unique_ptr<FileInfo> acquireReadOnlyCheckpointApplyLock(
main::ClientContext& clientContext) {
if (!StorageManager::Get(clientContext)->isReadOnly()) {
return nullptr;
}
const auto databasePath = clientContext.getDatabasePath();
auto intentLock = tryAcquireReadOnlyCheckpointLock(clientContext,
StorageUtils::getCheckpointIntentLockFilePath(databasePath));
auto applyLock = tryAcquireReadOnlyCheckpointLock(clientContext,
StorageUtils::getCheckpointApplyLockFilePath(databasePath));
return applyLock;
}

static void throwIfReadOnlyCheckpointState(main::ClientContext& clientContext, bool hasFrozenWAL,
const std::string& shadowFilePath) {
if (!StorageManager::Get(clientContext)->isReadOnly()) {
return;
}
auto vfs = VirtualFileSystem::GetUnsafe(clientContext);
if (hasFrozenWAL || vfs->fileOrPathExists(shadowFilePath, &clientContext)) {
throw RuntimeException(std::string(readOnlyCheckpointInProgressMessage));
}
}

void WALReplayer::replay(bool throwOnWalReplayFailure, bool enableChecksums) const {
auto vfs = VirtualFileSystem::GetUnsafe(clientContext);
[[maybe_unused]] auto readOnlyCheckpointApplyLock =
acquireReadOnlyCheckpointApplyLock(clientContext);
Checkpointer checkpointer(clientContext);
bool hasFrozenWAL = vfs->fileOrPathExists(checkpointWalPath, &clientContext);
bool hasActiveWAL = vfs->fileOrPathExists(walPath, &clientContext);
throwIfReadOnlyCheckpointState(clientContext, hasFrozenWAL, shadowFilePath);

if (!hasFrozenWAL && !hasActiveWAL) {
removeFileIfExists(shadowFilePath);
@@ -115,6 +161,7 @@ void WALReplayer::replayFrozenWAL(Checkpointer& checkpointer, bool throwOnWalRep
auto [offsetDeserialized, isLastRecordCheckpoint] =
dryReplay(*fileInfo, throwOnWalReplayFailure, enableChecksums);
if (isLastRecordCheckpoint) {
throwIfReadOnlyCheckpointState(clientContext, true /* hasFrozenWAL */, shadowFilePath);
ShadowFile::replayShadowPageRecords(clientContext);
removeFileIfExists(checkpointWalPath);
removeFileIfExists(walPath);
@@ -161,6 +208,7 @@ void WALReplayer::replayActiveWAL(Checkpointer& checkpointer, bool throwOnWalRep
auto [offsetDeserialized, isLastRecordCheckpoint] =
dryReplay(*fileInfo, throwOnWalReplayFailure, enableChecksums);
if (isLastRecordCheckpoint) {
throwIfReadOnlyCheckpointState(clientContext, true /* hasFrozenWAL */, shadowFilePath);
ShadowFile::replayShadowPageRecords(clientContext);
removeWALAndShadowFiles();
checkpointer.readCheckpoint();
25 changes: 19 additions & 6 deletions test/transaction/wal_test.cpp
@@ -710,16 +710,29 @@ TEST_F(WalTest, ReadOnlyRecoveryWithShadowFile) {

// Restart in read-only mode
systemConfig->readOnly = true;
createDBAndConn();
auto res = conn->query("MATCH (n:test) RETURN n.id ORDER BY n.id;");
ASSERT_TRUE(res->isSuccess());
// Should handle WAL recovery correctly
ASSERT_GE(res->getNumTuples(), 0);
// Files should remain unchanged in read-only mode
EXPECT_THROW(createDBAndConn(), RuntimeException);
ASSERT_TRUE(std::filesystem::exists(walFilePath));
ASSERT_TRUE(std::filesystem::exists(shadowFilePath));
}

TEST_F(WalTest, ReadOnlyRecoveryWithCheckpointWAL) {
if (inMemMode || systemConfig->checkpointThreshold == 0) {
GTEST_SKIP();
}
conn->query("CALL force_checkpoint_on_close=false");
conn->query("CREATE NODE TABLE test(id INT64 PRIMARY KEY, name STRING);");
auto checkpointWalFilePath =
lbug::storage::StorageUtils::getCheckpointWALFilePath(databasePath);

std::ofstream file(checkpointWalFilePath);
file.close();
ASSERT_TRUE(std::filesystem::exists(checkpointWalFilePath));

systemConfig->readOnly = true;
EXPECT_THROW(createDBAndConn(), RuntimeException);
ASSERT_TRUE(std::filesystem::exists(checkpointWalFilePath));
}

TEST_F(WalTest, ReadOnlyRecoveryEmptyWALFile) {
if (inMemMode || systemConfig->checkpointThreshold == 0) {
GTEST_SKIP();