diff --git a/src/common/file_system/local_file_system.cpp b/src/common/file_system/local_file_system.cpp index 64e1cc55e..3c3aa0584 100644 --- a/src/common/file_system/local_file_system.cpp +++ b/src/common/file_system/local_file_system.cpp @@ -123,6 +123,7 @@ std::unique_ptr LocalFileSystem::openFile(const std::string& path, Fil BOOL rc = LockFileEx(handle, dwFlags, 0 /*reserved*/, 1 /*numBytesLow*/, 0 /*numBytesHigh*/, &overlapped); if (!rc) { + CloseHandle(handle); auto error = GetLastError(); throw IOException("Could not set lock on file : " + fullPath + " (Error: " + std::to_string(error) + ")\n" + @@ -146,6 +147,7 @@ std::unique_ptr LocalFileSystem::openFile(const std::string& path, Fil int rc = fcntl(fd, F_SETLK, &fl); if (rc == -1) { int original_errno = errno; + close(fd); if (original_errno == EAGAIN || original_errno == EACCES) { struct flock get_fl {}; memset(&get_fl, 0, sizeof get_fl); diff --git a/src/include/storage/checkpointer.h b/src/include/storage/checkpointer.h index 20ac6df3f..9e7648b44 100644 --- a/src/include/storage/checkpointer.h +++ b/src/include/storage/checkpointer.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -15,6 +16,7 @@ namespace catalog { class Catalog; } namespace common { +struct FileInfo; class VirtualFileSystem; } // namespace common namespace testing { @@ -72,6 +74,8 @@ class Checkpointer { static void readCheckpoint(main::ClientContext* context, catalog::Catalog* catalog, StorageManager* storageManager); + void acquireCheckpointLocks(); + void releaseCheckpointLocks(); PageRange serializeCatalog(const catalog::Catalog& catalog, StorageManager& storageManager); PageRange serializeCatalogSnapshot(const catalog::Catalog& catalog, @@ -105,6 +109,8 @@ class Checkpointer { tableEpochWatermarksByManager; std::unordered_map catalogVersionAtCheckpointByCatalog; std::unordered_map pageManagerVersionAtCheckpointByManager; + std::unique_ptr checkpointIntentLockFile; + std::unique_ptr checkpointApplyLockFile; }; } // namespace storage diff --git a/src/include/storage/storage_utils.h b/src/include/storage/storage_utils.h index 8993ab196..f9efe1936 100644 --- a/src/include/storage/storage_utils.h +++ b/src/include/storage/storage_utils.h @@ -77,6 +77,12 @@ class StorageUtils { static std::string getShadowFilePath(const std::string& path) { return std::format("{}.{}", path, common::StorageConstants::SHADOWING_SUFFIX); } + static std::string getCheckpointIntentLockFilePath(const std::string& path) { + return std::format("{}.checkpoint.intent.lock", path); + } + static std::string getCheckpointApplyLockFilePath(const std::string& path) { + return std::format("{}.checkpoint.apply.lock", path); + } static std::string getTmpFilePath(const std::string& path) { return std::format("{}.{}", path, common::StorageConstants::TEMP_FILE_SUFFIX); } diff --git a/src/storage/checkpointer.cpp b/src/storage/checkpointer.cpp index a982f3ed0..dc2db96ff 100644 --- a/src/storage/checkpointer.cpp +++ b/src/storage/checkpointer.cpp @@ -1,8 +1,15 @@ #include "storage/checkpointer.h" +#include +#include +#include #include #include "catalog/catalog.h" +#include "common/constants.h" +#include "common/exception/io.h" +#include "common/exception/runtime.h" +#include "common/file_system/file_info.h" #include "common/file_system/file_system.h" #include "common/file_system/virtual_file_system.h" #include "common/serializer/buffered_file.h" @@ -17,9 +24,11 @@ #include "storage/database_header.h" #include "storage/shadow_utils.h" #include "storage/storage_manager.h" +#include "storage/storage_utils.h" #include "storage/storage_version_info.h" #include "storage/wal/local_wal.h" #include "transaction/transaction.h" +#include namespace lbug { namespace storage { @@ -63,6 +72,51 @@ void logCheckpointAndApplyShadowPagesForStorage(main::ClientContext& clientConte shadowFile.clear(*bufferManager); } +bool isLockContention(const common::IOException& exception) { + return std::string(exception.what()).find("Could not set lock") != std::string::npos; +} + +std::unique_ptr acquireCheckpointWriteLock(main::ClientContext& clientContext, + const std::string& lockPath) { + auto vfs = common::VirtualFileSystem::GetUnsafe(clientContext); + while (true) { + try { + return vfs->openFile(lockPath, + common::FileOpenFlags(common::FileFlags::READ_ONLY | common::FileFlags::WRITE | + common::FileFlags::CREATE_IF_NOT_EXISTS, + common::FileLockType::WRITE_LOCK), + &clientContext); + } catch (const common::IOException& exception) { + if (!isLockContention(exception)) { + throw; + } + std::this_thread::sleep_for( + std::chrono::microseconds(common::THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS)); + } + } +} + +bool isValidCheckpointPageRange(PageRange range, common::page_idx_t numPages) { + if (range.startPageIdx == common::INVALID_PAGE_IDX) { + return range.numPages == 0; + } + if (range.numPages == 0 || range.startPageIdx >= numPages) { + return false; + } + return range.numPages <= numPages - range.startPageIdx; +} + +void validateCheckpointPageRange(PageRange range, common::page_idx_t numPages, + std::string_view name) { + if (isValidCheckpointPageRange(range, numPages)) { + return; + } + throw common::RuntimeException(std::format( + "Cannot read checkpoint: {} page range starts at {} and spans {} pages, outside the " + "database file with {} pages. The database may be checkpointing; please retry later.", + std::string{name}, range.startPageIdx, range.numPages, numPages)); +} + } // namespace Checkpointer::Checkpointer(main::ClientContext& clientContext) @@ -72,6 +126,22 @@ Checkpointer::Checkpointer(main::ClientContext& clientContext) Checkpointer::~Checkpointer() = default; +void Checkpointer::acquireCheckpointLocks() { + if (isInMemory || checkpointIntentLockFile || checkpointApplyLockFile) { + return; + } + const auto databasePath = clientContext.getDatabasePath(); + checkpointIntentLockFile = acquireCheckpointWriteLock(clientContext, + StorageUtils::getCheckpointIntentLockFilePath(databasePath)); + checkpointApplyLockFile = acquireCheckpointWriteLock(clientContext, + StorageUtils::getCheckpointApplyLockFilePath(databasePath)); +} + +void Checkpointer::releaseCheckpointLocks() { + checkpointApplyLockFile.reset(); + checkpointIntentLockFile.reset(); +} + std::vector Checkpointer::collectCheckpointTargets() const { std::vector result; result.push_back({clientContext.getDatabase()->getCatalog(), mainStorageManager}); @@ -157,6 +227,7 @@ void Checkpointer::writeCheckpoint() { return; } + acquireCheckpointLocks(); checkpointTargets = collectCheckpointTargets(); for (const auto& target : checkpointTargets) { @@ -200,6 +271,7 @@ void Checkpointer::beginCheckpoint(common::transaction_t snapshotTimestamp) { return; } + acquireCheckpointLocks(); snapshotTS = snapshotTimestamp; checkpointTargets = collectCheckpointTargets(); @@ -301,6 +373,7 @@ void Checkpointer::postCheckpointCleanup(bool canResetPageManagerToCurrent) { } target.storageManager->getShadowFile().reset(); } + releaseCheckpointLocks(); } bool Checkpointer::checkpointStorage() { @@ -389,6 +462,7 @@ void Checkpointer::rollback() { for (const auto& target : checkpointTargets) { target.storageManager->rollbackCheckpoint(*target.catalog); } + releaseCheckpointLocks(); } bool Checkpointer::canAutoCheckpoint(const main::ClientContext& clientContext, @@ -426,6 +500,15 @@ void Checkpointer::readCheckpoint(main::ClientContext* context, catalog::Catalog auto reader = std::make_unique(*fileInfo); common::Deserializer deSer(std::move(reader)); auto currentHeader = std::make_unique(DatabaseHeader::deserialize(deSer)); + const auto numPages = storageManager->getDataFH()->getNumPages(); + validateCheckpointPageRange(currentHeader->catalogPageRange, numPages, "catalog"); + validateCheckpointPageRange(currentHeader->metadataPageRange, numPages, "metadata"); + if (currentHeader->dataFileNumPages != 0 && currentHeader->dataFileNumPages > numPages) { + throw common::RuntimeException(std::format( + "Cannot read checkpoint: header expects {} database pages, but the file has {} pages. " + "The database may be checkpointing; please retry later.", + currentHeader->dataFileNumPages, numPages)); + } // If the catalog page range is invalid, it means there is no catalog to read; thus, the // database is empty. if (currentHeader->catalogPageRange.startPageIdx != common::INVALID_PAGE_IDX) { diff --git a/src/storage/storage_manager.cpp b/src/storage/storage_manager.cpp index bd4c9954c..aebf95554 100644 --- a/src/storage/storage_manager.cpp +++ b/src/storage/storage_manager.cpp @@ -18,6 +18,7 @@ #include "storage/buffer_manager/memory_manager.h" #include "storage/checkpointer.h" #include "storage/index/art_index.h" +#include "storage/storage_utils.h" #include "storage/table/arrow_node_table.h" #include "storage/table/arrow_rel_table.h" #include "storage/table/arrow_table_support.h" @@ -47,6 +48,14 @@ StorageManager::StorageManager(const std::string& databasePath, bool readOnly, b shadowFile = std::make_unique(*memoryManager.getBufferManager(), vfs, this->databasePath); inMemory = main::DBConfig::isDBPathInMemory(databasePath); + if (!readOnly && !inMemory) { + vfs->openFile(StorageUtils::getCheckpointIntentLockFilePath(databasePath), + FileOpenFlags( + FileFlags::READ_ONLY | FileFlags::WRITE | FileFlags::CREATE_IF_NOT_EXISTS)); + vfs->openFile(StorageUtils::getCheckpointApplyLockFilePath(databasePath), + FileOpenFlags( + FileFlags::READ_ONLY | FileFlags::WRITE | FileFlags::CREATE_IF_NOT_EXISTS)); + } registerIndexType(PrimaryKeyIndex::getIndexType()); registerIndexType(ArtPrimaryKeyIndex::getIndexType()); } diff --git a/src/storage/wal/wal_replayer.cpp b/src/storage/wal/wal_replayer.cpp index c1b118969..f4aa7f58c 100644 --- a/src/storage/wal/wal_replayer.cpp +++ b/src/storage/wal/wal_replayer.cpp @@ -1,5 +1,7 @@ #include "storage/wal/wal_replayer.h" +#include "common/exception/io.h" +#include "common/exception/runtime.h" #include "common/file_system/file_info.h" #include "common/file_system/file_system.h" #include "common/file_system/virtual_file_system.h" @@ -11,6 +13,7 @@ #include "storage/file_db_id_utils.h" #include "storage/local_storage/local_rel_table.h" #include "storage/storage_manager.h" +#include "storage/storage_utils.h" #include "storage/wal/checksum_reader.h" #include "storage/wal/wal_record.h" #include "transaction/transaction_context.h" @@ -25,6 +28,8 @@ namespace storage { static constexpr std::string_view checksumMismatchMessage = "Checksum verification failed, the WAL file is corrupted."; +static constexpr std::string_view readOnlyCheckpointInProgressMessage = + "Cannot open database in read-only mode while checkpoint is in progress. Please retry later."; WALReplayer::WALReplayer(main::ClientContext& clientContext) : clientContext{clientContext} { walPath = StorageUtils::getWALFilePath(clientContext.getDatabasePath()); @@ -74,11 +79,52 @@ static uint64_t getReadOffset(Deserializer& deSer, bool enableChecksums) { } } +static std::unique_ptr tryAcquireReadOnlyCheckpointLock( + main::ClientContext& clientContext, const std::string& lockPath) { + auto vfs = VirtualFileSystem::GetUnsafe(clientContext); + if (!vfs->fileOrPathExists(lockPath, &clientContext)) { + return nullptr; + } + try { + return vfs->openFile(lockPath, FileOpenFlags(FileFlags::READ_ONLY, FileLockType::READ_LOCK), + &clientContext); + } catch (const IOException&) { + throw RuntimeException(std::string(readOnlyCheckpointInProgressMessage)); + } +} + +static std::unique_ptr acquireReadOnlyCheckpointApplyLock( + main::ClientContext& clientContext) { + if (!StorageManager::Get(clientContext)->isReadOnly()) { + return nullptr; + } + const auto databasePath = clientContext.getDatabasePath(); + auto intentLock = tryAcquireReadOnlyCheckpointLock(clientContext, + StorageUtils::getCheckpointIntentLockFilePath(databasePath)); + auto applyLock = tryAcquireReadOnlyCheckpointLock(clientContext, + StorageUtils::getCheckpointApplyLockFilePath(databasePath)); + return applyLock; +} + +static void throwIfReadOnlyCheckpointState(main::ClientContext& clientContext, bool hasFrozenWAL, + const std::string& shadowFilePath) { + if (!StorageManager::Get(clientContext)->isReadOnly()) { + return; + } + auto vfs = VirtualFileSystem::GetUnsafe(clientContext); + if (hasFrozenWAL || vfs->fileOrPathExists(shadowFilePath, &clientContext)) { + throw RuntimeException(std::string(readOnlyCheckpointInProgressMessage)); + } +} + void WALReplayer::replay(bool throwOnWalReplayFailure, bool enableChecksums) const { auto vfs = VirtualFileSystem::GetUnsafe(clientContext); + [[maybe_unused]] auto readOnlyCheckpointApplyLock = + acquireReadOnlyCheckpointApplyLock(clientContext); Checkpointer checkpointer(clientContext); bool hasFrozenWAL = vfs->fileOrPathExists(checkpointWalPath, &clientContext); bool hasActiveWAL = vfs->fileOrPathExists(walPath, &clientContext); + throwIfReadOnlyCheckpointState(clientContext, hasFrozenWAL, shadowFilePath); if (!hasFrozenWAL && !hasActiveWAL) { removeFileIfExists(shadowFilePath); @@ -115,6 +161,7 @@ void WALReplayer::replayFrozenWAL(Checkpointer& checkpointer, bool throwOnWalRep auto [offsetDeserialized, isLastRecordCheckpoint] = dryReplay(*fileInfo, throwOnWalReplayFailure, enableChecksums); if (isLastRecordCheckpoint) { + throwIfReadOnlyCheckpointState(clientContext, true /* hasFrozenWAL */, shadowFilePath); ShadowFile::replayShadowPageRecords(clientContext); removeFileIfExists(checkpointWalPath); removeFileIfExists(walPath); @@ -161,6 +208,7 @@ void WALReplayer::replayActiveWAL(Checkpointer& checkpointer, bool throwOnWalRep auto [offsetDeserialized, isLastRecordCheckpoint] = dryReplay(*fileInfo, throwOnWalReplayFailure, enableChecksums); if (isLastRecordCheckpoint) { + throwIfReadOnlyCheckpointState(clientContext, true /* hasFrozenWAL */, shadowFilePath); ShadowFile::replayShadowPageRecords(clientContext); removeWALAndShadowFiles(); checkpointer.readCheckpoint(); diff --git a/test/transaction/wal_test.cpp b/test/transaction/wal_test.cpp index 9e4580285..a82cf2f9c 100644 --- a/test/transaction/wal_test.cpp +++ b/test/transaction/wal_test.cpp @@ -710,16 +710,29 @@ TEST_F(WalTest, ReadOnlyRecoveryWithShadowFile) { // Restart in read-only mode systemConfig->readOnly = true; - createDBAndConn(); - auto res = conn->query("MATCH (n:test) RETURN n.id ORDER BY n.id;"); - ASSERT_TRUE(res->isSuccess()); - // Should handle WAL recovery correctly - ASSERT_GE(res->getNumTuples(), 0); - // Files should remain unchanged in read-only mode + EXPECT_THROW(createDBAndConn(), RuntimeException); ASSERT_TRUE(std::filesystem::exists(walFilePath)); ASSERT_TRUE(std::filesystem::exists(shadowFilePath)); } +TEST_F(WalTest, ReadOnlyRecoveryWithCheckpointWAL) { + if (inMemMode || systemConfig->checkpointThreshold == 0) { + GTEST_SKIP(); + } + conn->query("CALL force_checkpoint_on_close=false"); + conn->query("CREATE NODE TABLE test(id INT64 PRIMARY KEY, name STRING);"); + auto checkpointWalFilePath = + lbug::storage::StorageUtils::getCheckpointWALFilePath(databasePath); + + std::ofstream file(checkpointWalFilePath); + file.close(); + ASSERT_TRUE(std::filesystem::exists(checkpointWalFilePath)); + + systemConfig->readOnly = true; + EXPECT_THROW(createDBAndConn(), RuntimeException); + ASSERT_TRUE(std::filesystem::exists(checkpointWalFilePath)); +} + TEST_F(WalTest, ReadOnlyRecoveryEmptyWALFile) { if (inMemMode || systemConfig->checkpointThreshold == 0) { GTEST_SKIP();