From 4f148c548689e7fdfa160336f200a0c00d1dff21 Mon Sep 17 00:00:00 2001 From: Ally Heev Date: Fri, 19 Jun 2026 21:25:46 +0530 Subject: [PATCH] add lance tables support --- src/binder/bind/bind_ddl.cpp | 19 +++++++ src/common/enums/storage_format.cpp | 5 +- src/extension/extension_entries.cpp | 3 ++ src/include/common/arrow/arrow.h | 15 ++++++ src/include/common/enums/storage_format.h | 2 +- src/include/extension/extension.h | 2 +- src/include/storage/storage_manager.h | 27 ++++++++++ src/include/storage/table/arrow_node_table.h | 9 +++- .../storage/table/ice_disk_node_table.h | 5 ++ src/include/storage/table/node_table.h | 29 +++++++++++ .../operator/scan/scan_node_table.cpp | 52 ++++++++++++------- src/storage/storage_manager.cpp | 35 ++++++++++--- test/include/test_runner/test_group.h | 3 +- test/runner/e2e_test.cpp | 10 ++++ test/test_helper/test_helper.cpp | 3 +- test/test_runner/test_parser.cpp | 3 ++ 16 files changed, 190 insertions(+), 32 deletions(-) diff --git a/src/binder/bind/bind_ddl.cpp b/src/binder/bind/bind_ddl.cpp index 7e6de9d102..5fb4922b3e 100644 --- a/src/binder/bind/bind_ddl.cpp +++ b/src/binder/bind/bind_ddl.cpp @@ -365,6 +365,25 @@ BoundCreateTableInfo Binder::bindCreateRelTableGroupInfo(const CreateTableInfo* "Cannot mix icebug-disk tables with non-icebug-disk tables in CREATE REL TABLE."); } + bool isSrcLance = srcEntry->getType() == CatalogEntryType::NODE_TABLE_ENTRY ? + srcEntry->ptrCast()->getStorageFormat() == + StorageFormat::LANCE : + false; + bool isDstLance = dstEntry->getType() == CatalogEntryType::NODE_TABLE_ENTRY ? + dstEntry->ptrCast()->getStorageFormat() == + StorageFormat::LANCE : + false; + bool isRelLance = (storageFormat == StorageFormat::LANCE); + + // Lance rel tables must connect lance node tables, and non-lance rel tables + // cannot connect lance node tables. + if ((!isRelLance && (isSrcLance || isDstLance)) || + (isRelLance && (!isSrcLance || !isDstLance))) { + throw BinderException( + "Cannot mix lance tables with non-lance tables in CREATE REL TABLE. " + "Lance rel tables must connect lance node tables."); + } + // Use the actual shadow table IDs, not FOREIGN_TABLE_ID // The shadow tables allow the query planner to distinguish between different node tables auto srcTableID = srcEntry->getTableID(); diff --git a/src/common/enums/storage_format.cpp b/src/common/enums/storage_format.cpp index b1da19c8f1..5ce8f0387a 100644 --- a/src/common/enums/storage_format.cpp +++ b/src/common/enums/storage_format.cpp @@ -10,8 +10,11 @@ StorageFormat StorageFormatUtils::fromString(const std::string& str) { if (str == "icebug-disk") { return StorageFormat::ICEBUG_DISK; } + if (str == "lance") { + return StorageFormat::LANCE; + } throw BinderException( - std::format("Unsupported storage format '{}'. Valid options are: icebug-disk.", str)); + std::format("Unsupported storage format '{}'. Valid options are: icebug-disk, lance.", str)); } } // namespace common diff --git a/src/extension/extension_entries.cpp b/src/extension/extension_entries.cpp index ae3029cbb3..7237f113fc 100644 --- a/src/extension/extension_entries.cpp +++ b/src/extension/extension_entries.cpp @@ -25,6 +25,8 @@ static constexpr std::array vectorExtensionFunctions = {"QUERY_VECTOR_INDEX", "C "DROP_VECTOR_INDEX"}; static constexpr std::array llmExtensionFunctions = {"CREATE_EMBEDDING"}; static constexpr std::array neo4jExtensionFunctions = {"NEO4J_MIGRATE"}; +static constexpr std::array lanceExtensionFunctions = {"LANCE_VECTOR_SEARCH", "LANCE_FTS", + "LANCE_HYBRID_SEARCH"}; static constexpr std::array algoExtensionFunctions = {"K_CORE_DECOMPOSITION", "PAGE_RANK", "STRONGLY_CONNECTED_COMPONENTS_KOSARAJU", "STRONGLY_CONNECTED_COMPONENTS", "WEAKLY_CONNECTED_COMPONENTS"}; @@ -40,6 +42,7 @@ static constexpr EntriesForExtension functionsForExtensionsRaw[] = { {"LLM", llmExtensionFunctions, llmExtensionFunctions.size()}, {"NEO4J", neo4jExtensionFunctions, neo4jExtensionFunctions.size()}, {"ALGO", algoExtensionFunctions, algoExtensionFunctions.size()}, + {"LANCE", lanceExtensionFunctions, lanceExtensionFunctions.size()}, }; static constexpr std::array functionsForExtensions = std::to_array(functionsForExtensionsRaw); diff --git a/src/include/common/arrow/arrow.h b/src/include/common/arrow/arrow.h index 4953825a64..2ad85377ad 100644 --- a/src/include/common/arrow/arrow.h +++ b/src/include/common/arrow/arrow.h @@ -49,6 +49,21 @@ struct ArrowArray { void* private_data; }; +// Arrow C Stream Interface +// https://arrow.apache.org/docs/format/CStreamInterface.html +#ifndef ARROW_C_STREAM_INTERFACE +#define ARROW_C_STREAM_INTERFACE + +struct ArrowArrayStream { + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + const char* (*get_last_error)(struct ArrowArrayStream*); + void (*release)(struct ArrowArrayStream*); + void* private_data; +}; + +#endif // ARROW_C_STREAM_INTERFACE + #endif // ARROW_C_DATA_INTERFACE #ifdef __cplusplus diff --git a/src/include/common/enums/storage_format.h b/src/include/common/enums/storage_format.h index 58b39f14e1..4f7d4f453e 100644 --- a/src/include/common/enums/storage_format.h +++ b/src/include/common/enums/storage_format.h @@ -6,7 +6,7 @@ namespace lbug { namespace common { -enum class StorageFormat : uint8_t { NONE, ICEBUG_DISK }; +enum class StorageFormat : uint8_t { NONE, ICEBUG_DISK, LANCE }; struct StorageFormatUtils { static StorageFormat fromString(const std::string& str); diff --git a/src/include/extension/extension.h b/src/include/extension/extension.h index 1ef9f49009..50456d8a3f 100644 --- a/src/include/extension/extension.h +++ b/src/include/extension/extension.h @@ -78,7 +78,7 @@ struct LBUG_API ExtensionUtils { static constexpr const char* OFFICIAL_EXTENSION[] = {"ADBC", "HTTPFS", "POSTGRES", "DUCKDB", "JSON", "SQLITE", "FTS", "DELTA", "ICEBERG", "AZURE", "UNITY_CATALOG", "VECTOR", "NEO4J", - "ALGO", "LLM"}; + "ALGO", "LLM", "LANCE"}; static constexpr const char* EXTENSION_LOADER_SUFFIX = "_loader"; diff --git a/src/include/storage/storage_manager.h b/src/include/storage/storage_manager.h index 31ec304ff3..8f6f798e7e 100644 --- a/src/include/storage/storage_manager.h +++ b/src/include/storage/storage_manager.h @@ -1,8 +1,11 @@ #pragma once +#include #include #include +#include +#include "common/enums/storage_format.h" #include "shadow_file.h" #include "storage/index/index.h" #include "storage/stats/planner_stats.h" @@ -33,6 +36,15 @@ class RelTable; class DiskArrayCollection; struct DatabaseHeader; +/// Factory signature for extension-provided node tables (e.g. LanceNodeTable). +using NodeTableFactory = std::function(const StorageManager*, + const catalog::NodeTableCatalogEntry*, MemoryManager*, main::ClientContext*)>; + +/// Factory signature for extension-provided rel tables (e.g. LanceRelTable). +using RelTableFactory = std::function(catalog::RelGroupCatalogEntry*, + common::table_id_t, common::table_id_t, const StorageManager*, MemoryManager*, + main::ClientContext*)>; + class LBUG_API StorageManager { public: StorageManager(const std::string& databasePath, bool readOnly, bool enableChecksums, @@ -83,6 +95,16 @@ class LBUG_API StorageManager { std::optional> getIndexType( const std::string& typeName) const; + /// Register factories for an extension-defined storage format (e.g. LANCE). + /// The extension must call this during its load() function before any + /// CREATE TABLE with that format is attempted. + void registerStorageFormatHandler(common::StorageFormat format, NodeTableFactory nodeFactory, + RelTableFactory relFactory) { + std::unique_lock lck{formatFactoryMtx}; + nodeTableFactories[format] = std::move(nodeFactory); + relTableFactories[format] = std::move(relFactory); + } + void serialize(const catalog::Catalog& catalog, common::Serializer& ser); void serialize(const catalog::Catalog& catalog, const transaction::Transaction& snapshotTxn, common::Serializer& ser); @@ -132,6 +154,11 @@ class LBUG_API StorageManager { std::unordered_map plannerStatsCache; std::unordered_map tableNameCache; common::VirtualFileSystem* vfs_; // non-owning, owned by Database + + // Extension-provided storage format factories (protected by formatFactoryMtx) + mutable std::mutex formatFactoryMtx; + std::unordered_map nodeTableFactories; + std::unordered_map relTableFactories; }; } // namespace storage diff --git a/src/include/storage/table/arrow_node_table.h b/src/include/storage/table/arrow_node_table.h index febe709207..cb01c7d8b6 100644 --- a/src/include/storage/table/arrow_node_table.h +++ b/src/include/storage/table/arrow_node_table.h @@ -93,14 +93,19 @@ class ArrowNodeTable final : public ColumnarNodeTableBase { bool isVisibleNoLock(const transaction::Transaction* transaction, common::offset_t offset) const override; + // Virtual dispatch methods for scan_node_table.cpp extensibility + bool requiresExplicitScanInit() const override { return true; } + bool usesMorselScan() const override { return true; } + size_t getNumScanMorsels(const transaction::Transaction* transaction) const override; + // Note: createScanState() left at base default; ArrowNodeTable scan state is + // created via the existing dynamic_cast path in createNodeTableScanState(). + const ArrowSchemaWrapper& getArrowSchema() const { return schema; } const std::vector& getArrowArrays() const { return arrays; } common::node_group_idx_t getNumBatches( const transaction::Transaction* transaction) const override; - size_t getNumScanMorsels(const transaction::Transaction* transaction) const; - const catalog::NodeTableCatalogEntry* getCatalogEntry() const { return nodeTableCatalogEntry; } protected: diff --git a/src/include/storage/table/ice_disk_node_table.h b/src/include/storage/table/ice_disk_node_table.h index 2a6dc251ff..101433f70e 100644 --- a/src/include/storage/table/ice_disk_node_table.h +++ b/src/include/storage/table/ice_disk_node_table.h @@ -75,6 +75,11 @@ class IceDiskNodeTable final : public ColumnarNodeTableBase { bool isVisibleNoLock(const transaction::Transaction* transaction, common::offset_t offset) const override; + // Virtual dispatch for scan_node_table.cpp extensibility + bool requiresExplicitScanInit() const override { return true; } + // IceDisk uses nodeGroupIdx-based scanning, not morsel-based. + // usesMorselScan() and getNumScanMorsels() are left at NodeTable defaults (false/0). + const std::string& getParquetFilePath() const { return parquetFilePath; } protected: diff --git a/src/include/storage/table/node_table.h b/src/include/storage/table/node_table.h index a5259f8dc0..b71e0b35e5 100644 --- a/src/include/storage/table/node_table.h +++ b/src/include/storage/table/node_table.h @@ -129,6 +129,35 @@ class LBUG_API NodeTable : public Table { virtual void initializeScanCoordination( [[maybe_unused]] const transaction::Transaction* transaction) {} + // Virtual dispatch methods for scan_node_table.cpp extensibility. + // These allow extension-defined table types (e.g. LanceNodeTable) to plug + // into the core scan infrastructure without requiring dynamic_casts to + // concrete types that live in extension libraries. + + /// Returns true if this table type requires explicit initScanState() calls + /// in initCurrentTable() (e.g. Arrow, Lance, IceDisk columnar tables). + virtual bool requiresExplicitScanInit() const { return false; } + + /// Returns true if this table drives scanning via ColumnarNodeTableScanSharedState::getNextMorsel() + /// rather than via nodeGroupIdx assignment (e.g. Arrow, Lance). + virtual bool usesMorselScan() const { return false; } + + /// Returns the number of scan morsels for progress tracking. + /// Returns 0 for tables that use the IceDisk row-group path. + virtual size_t getNumScanMorsels( + [[maybe_unused]] const transaction::Transaction* transaction) const { + return 0; + } + + /// Creates a format-specific TableScanState. Returns nullptr to fall back to + /// the built-in dispatch (ArrowNodeTable / IceDiskNodeTable / default). + virtual std::unique_ptr createScanState( + [[maybe_unused]] common::ValueVector* nodeIDVector, + [[maybe_unused]] const std::vector& outVectors, + [[maybe_unused]] MemoryManager* memoryManager) const { + return nullptr; + } + bool scanInternal(transaction::Transaction* transaction, TableScanState& scanState) override; template bool lookup(const transaction::Transaction* transaction, const TableScanState& scanState) const; diff --git a/src/processor/operator/scan/scan_node_table.cpp b/src/processor/operator/scan/scan_node_table.cpp index e9c9524854..6ac673b2ea 100644 --- a/src/processor/operator/scan/scan_node_table.cpp +++ b/src/processor/operator/scan/scan_node_table.cpp @@ -7,6 +7,7 @@ #include "storage/local_storage/local_node_table.h" #include "storage/local_storage/local_storage.h" #include "storage/table/arrow_node_table.h" +#include "storage/table/columnar_node_table_base.h" #include "storage/table/ice_disk_node_table.h" using namespace lbug::common; @@ -17,6 +18,11 @@ namespace processor { static std::unique_ptr createNodeTableScanState(NodeTable* table, ValueVector* nodeIDVector, const std::vector& outVectors, MemoryManager* memoryManager) { + // Allow extension-defined table types to supply their own scan state via virtual factory. + auto extensionState = table->createScanState(nodeIDVector, outVectors, memoryManager); + if (extensionState) { + return extensionState; + } if (dynamic_cast(table) != nullptr) { return std::make_unique(*memoryManager, nodeIDVector, outVectors, nodeIDVector->state); @@ -69,12 +75,15 @@ void ScanNodeTableSharedState::initialize(const transaction::Transaction* transa } catch (const std::exception& e) { this->numCommittedNodeGroups = 1; } - } else if (const auto arrowTable = dynamic_cast(table)) { - // For Arrow tables, set numCommittedNodeGroups to number of morsels - this->numCommittedNodeGroups = - static_cast(arrowTable->getNumScanMorsels(transaction)); } else { - this->numCommittedNodeGroups = table->getNumCommittedNodeGroups(); + // For morsel-based columnar tables (Arrow, Lance, etc.) getNumScanMorsels() returns + // the morsel count; for regular NodeTable it returns 0 → fall back to node groups. + auto morselCount = table->getNumScanMorsels(transaction); + if (morselCount > 0) { + this->numCommittedNodeGroups = static_cast(morselCount); + } else { + this->numCommittedNodeGroups = table->getNumCommittedNodeGroups(); + } } if (transaction->isWriteTransaction()) { if (const auto localTable = @@ -90,18 +99,22 @@ void ScanNodeTableSharedState::nextMorsel(TableScanState& scanState, ScanNodeTableProgressSharedState& progressSharedState) { std::unique_lock lck{mtx}; - // ColumnarNodeTables handle morsel assignment internally + // Morsel-based columnar tables (Arrow, Lance, …) dispatch through + // ColumnarNodeTableBase::getTableScanSharedState() / usesMorselScan(). // TODO: icebug-disk tables https://github.com/LadybugDB/ladybug/issues/245 - if (const auto arrowTable = dynamic_cast(this->table)) { - const auto tableSharedState = arrowTable->getTableScanSharedState(); - if (tableSharedState->getNextMorsel(static_cast(&scanState))) { - scanState.source = TableScanSource::COMMITTED; - progressSharedState.numMorselsScanned++; - } else { - scanState.source = TableScanSource::NONE; + if (this->table->usesMorselScan()) { + const auto columnarTable = dynamic_cast(this->table); + if (columnarTable) { + const auto tableSharedState = columnarTable->getTableScanSharedState(); + if (tableSharedState->getNextMorsel( + static_cast(&scanState))) { + scanState.source = TableScanSource::COMMITTED; + progressSharedState.numMorselsScanned++; + } else { + scanState.source = TableScanSource::NONE; + } + return; } - - return; } auto& nodeScanState = scanState.cast(); @@ -149,11 +162,12 @@ void ScanNodeTable::initCurrentTable(ExecutionContext* context) { outVectors, MemoryManager::Get(*context->clientContext)); currentInfo.initScanState(*scanState, outVectors, context->clientContext); scanState->semiMask = sharedStates[currentTableIdx]->getSemiMask(); - // Call table->initScanState for IceDiskNodeTable or ArrowNodeTable - if (dynamic_cast(tableInfos[currentTableIdx].table) || - dynamic_cast(tableInfos[currentTableIdx].table)) { + // Use virtual dispatch so extension-defined table types (Lance, etc.) work without + // hard-coding their concrete types here. + auto* nodeTable = tableInfos[currentTableIdx].table->ptrCast(); + if (nodeTable->requiresExplicitScanInit()) { auto transaction = transaction::Transaction::Get(*context->clientContext); - tableInfos[currentTableIdx].table->initScanState(transaction, *scanState); + nodeTable->initScanState(transaction, *scanState); } } diff --git a/src/storage/storage_manager.cpp b/src/storage/storage_manager.cpp index bd4c9954c9..191ad70a72 100644 --- a/src/storage/storage_manager.cpp +++ b/src/storage/storage_manager.cpp @@ -132,9 +132,20 @@ void StorageManager::createNodeTable(NodeTableCatalogEntry* entry, main::ClientC tables[entry->getTableID()] = std::make_unique(this, entry, &memoryManager, context); } else { - throw common::RuntimeException( - "Unsupported storage format option for node table: " + - std::to_string(static_cast(entry->getStorageFormat()))); + // Try extension-registered factory (e.g. LANCE) + std::unique_lock factoryLck{formatFactoryMtx}; + auto it = nodeTableFactories.find(entry->getStorageFormat()); + if (it != nodeTableFactories.end()) { + auto factory = it->second; + factoryLck.unlock(); + tables[entry->getTableID()] = factory(this, entry, &memoryManager, context); + } else { + throw common::RuntimeException( + std::format("Unsupported storage format for node table. " + "The extension providing this format may not be loaded. " + "Format id: {}", + static_cast(entry->getStorageFormat()))); + } } } else if (!entry->getStorage().empty()) { // Check if storage is Arrow backed @@ -186,9 +197,21 @@ void StorageManager::addRelTable(RelGroupCatalogEntry* entry, const RelTableCata tables[info.oid] = std::make_unique(entry, info.nodePair.srcTableID, info.nodePair.dstTableID, this, &memoryManager, context); } else { - throw common::RuntimeException( - "Unsupported storage format option for rel table: " + - std::to_string(static_cast(entry->getStorageFormat()))); + // Try extension-registered factory (e.g. LANCE) + std::unique_lock factoryLck{formatFactoryMtx}; + auto it = relTableFactories.find(entry->getStorageFormat()); + if (it != relTableFactories.end()) { + auto factory = it->second; + factoryLck.unlock(); + tables[info.oid] = factory(entry, info.nodePair.srcTableID, + info.nodePair.dstTableID, this, &memoryManager, context); + } else { + throw common::RuntimeException( + std::format("Unsupported storage format for rel table. " + "The extension providing this format may not be loaded. " + "Format id: {}", + static_cast(entry->getStorageFormat()))); + } } } else if (!entry->getStorage().empty()) { if (entry->getStorage().substr(0, 8) == "arrow://") { diff --git a/test/include/test_runner/test_group.h b/test/include/test_runner/test_group.h index e05754edda..597dd08b60 100644 --- a/test/include/test_runner/test_group.h +++ b/test/include/test_runner/test_group.h @@ -118,7 +118,8 @@ struct TestGroup { LBUG, JSON, CSV_TO_JSON, - ICEBUG_DISK + ICEBUG_DISK, + LANCE }; DatasetType datasetType; diff --git a/test/runner/e2e_test.cpp b/test/runner/e2e_test.cpp index 2a26f9bf5d..f19320893b 100644 --- a/test/runner/e2e_test.cpp +++ b/test/runner/e2e_test.cpp @@ -45,6 +45,16 @@ class EndToEndTest final : public DBTest { lbug::main::Connection* connection = conn ? conn.get() : (connMap.begin()->second).get(); TestHelper::executeScript(dataset + "/" + TestHelper::SCHEMA_FILE_NAME, *connection); + } else if (datasetType == TestGroup::DatasetType::LANCE) { + // For LANCE, load the extension first, then run schema.cypher + lbug::main::Connection* connection = + conn ? conn.get() : (connMap.begin()->second).get(); + auto loadResult = connection->query("LOAD EXTENSION 'lance'"); + if (!loadResult->isSuccess()) { + throw lbug::common::Exception( + std::format("Failed to load lance extension: {}", loadResult->getErrorMessage())); + } + TestHelper::executeScript(dataset + "/" + TestHelper::SCHEMA_FILE_NAME, *connection); } else if (datasetType != TestGroup::DatasetType::LBUG && dataset != "empty") { initGraph(); } else if (generateBinaryDemo && TestHelper::E2E_OVERRIDE_IMPORT_DIR.empty()) { diff --git a/test/test_helper/test_helper.cpp b/test/test_helper/test_helper.cpp index 8d0e92c751..0e1931a029 100644 --- a/test/test_helper/test_helper.cpp +++ b/test/test_helper/test_helper.cpp @@ -97,7 +97,8 @@ void TestHelper::executeScript(const std::string& cypherScript, Connection& conn size_t fmtEnd = line.find("\"", fmtStart); if (fmtEnd != std::string::npos) { std::string storageFormat = line.substr(fmtStart, fmtEnd - fmtStart); - if (storageFormat.find("icebug-disk") != std::string::npos) { + if (storageFormat.find("icebug-disk") != std::string::npos || + storageFormat.find("lance") != std::string::npos) { std::vector storagePaths; size_t storageIndex = 0; while (true) { diff --git a/test/test_runner/test_parser.cpp b/test/test_runner/test_parser.cpp index 71d3f2f4d0..678e258355 100644 --- a/test/test_runner/test_parser.cpp +++ b/test/test_runner/test_parser.cpp @@ -91,6 +91,9 @@ void TestParser::extractDataset() { } else if (datasetType == "ICEBUG-DISK") { testGroup->datasetType = TestGroup::DatasetType::ICEBUG_DISK; testGroup->dataset = currentToken.params[2]; + } else if (datasetType == "LANCE") { + testGroup->datasetType = TestGroup::DatasetType::LANCE; + testGroup->dataset = currentToken.params[2]; } else { throw TestException( "Invalid dataset type `" + currentToken.params[1] + "` [" + path + ":" + line + "].");