Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,25 @@ BoundCreateTableInfo Binder::bindCreateRelTableGroupInfo(const CreateTableInfo*
"Cannot mix icebug-disk tables with non-icebug-disk tables in CREATE REL TABLE.");
}

bool isSrcLance = srcEntry->getType() == CatalogEntryType::NODE_TABLE_ENTRY ?
srcEntry->ptrCast<NodeTableCatalogEntry>()->getStorageFormat() ==
StorageFormat::LANCE :
false;
bool isDstLance = dstEntry->getType() == CatalogEntryType::NODE_TABLE_ENTRY ?
dstEntry->ptrCast<NodeTableCatalogEntry>()->getStorageFormat() ==
StorageFormat::LANCE :
false;
bool isRelLance = (storageFormat == StorageFormat::LANCE);

// Lance rel tables must connect lance node tables, and non-lance rel tables
// cannot connect lance node tables.
if ((!isRelLance && (isSrcLance || isDstLance)) ||
(isRelLance && (!isSrcLance || !isDstLance))) {
throw BinderException(
"Cannot mix lance tables with non-lance tables in CREATE REL TABLE. "
"Lance rel tables must connect lance node tables.");
}

// Use the actual shadow table IDs, not FOREIGN_TABLE_ID
// The shadow tables allow the query planner to distinguish between different node tables
auto srcTableID = srcEntry->getTableID();
Expand Down
5 changes: 4 additions & 1 deletion src/common/enums/storage_format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ StorageFormat StorageFormatUtils::fromString(const std::string& str) {
if (str == "icebug-disk") {
return StorageFormat::ICEBUG_DISK;
}
if (str == "lance") {
return StorageFormat::LANCE;
}
throw BinderException(
std::format("Unsupported storage format '{}'. Valid options are: icebug-disk.", str));
std::format("Unsupported storage format '{}'. Valid options are: icebug-disk, lance.", str));
}

} // namespace common
Expand Down
3 changes: 3 additions & 0 deletions src/extension/extension_entries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ static constexpr std::array vectorExtensionFunctions = {"QUERY_VECTOR_INDEX", "C
"DROP_VECTOR_INDEX"};
static constexpr std::array llmExtensionFunctions = {"CREATE_EMBEDDING"};
static constexpr std::array neo4jExtensionFunctions = {"NEO4J_MIGRATE"};
static constexpr std::array lanceExtensionFunctions = {"LANCE_VECTOR_SEARCH", "LANCE_FTS",
"LANCE_HYBRID_SEARCH"};
static constexpr std::array algoExtensionFunctions = {"K_CORE_DECOMPOSITION", "PAGE_RANK",
"STRONGLY_CONNECTED_COMPONENTS_KOSARAJU", "STRONGLY_CONNECTED_COMPONENTS",
"WEAKLY_CONNECTED_COMPONENTS"};
Expand All @@ -40,6 +42,7 @@ static constexpr EntriesForExtension functionsForExtensionsRaw[] = {
{"LLM", llmExtensionFunctions, llmExtensionFunctions.size()},
{"NEO4J", neo4jExtensionFunctions, neo4jExtensionFunctions.size()},
{"ALGO", algoExtensionFunctions, algoExtensionFunctions.size()},
{"LANCE", lanceExtensionFunctions, lanceExtensionFunctions.size()},
};
static constexpr std::array functionsForExtensions = std::to_array(functionsForExtensionsRaw);

Expand Down
15 changes: 15 additions & 0 deletions src/include/common/arrow/arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ struct ArrowArray {
void* private_data;
};

// Arrow C Stream Interface
// https://arrow.apache.org/docs/format/CStreamInterface.html
#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE

// Streaming source of Arrow arrays, laid out as defined by the Arrow C Stream
// Interface (see the specification linked above). The members are a C ABI
// shared with other Arrow implementations: do not reorder, add, or retype them.
// The callback contracts summarized below are taken from that specification.
struct ArrowArrayStream {
// Writes the schema shared by all arrays of this stream into `out`.
// Per the spec: returns 0 on success, a non-zero errno-compatible code on error.
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
// Writes the next array of the stream into `out`.
// Per the spec: returns 0 on success (with `out` marked released once the
// stream is exhausted), a non-zero errno-compatible code on error.
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
// Returns a textual description of the last error reported by this stream.
// Per the spec: may be NULL, and the pointer is only valid until the next
// callback invocation on the same stream.
const char* (*get_last_error)(struct ArrowArrayStream*);
// Releases the resources held by the stream.
// Per the spec: the callback marks the stream released by nulling `release`.
void (*release)(struct ArrowArrayStream*);
// Opaque, producer-owned state; not interpreted by consumers.
void* private_data;
};

#endif // ARROW_C_STREAM_INTERFACE

#endif // ARROW_C_DATA_INTERFACE

#ifdef __cplusplus
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/enums/storage_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace lbug {
namespace common {

enum class StorageFormat : uint8_t { NONE, ICEBUG_DISK };
enum class StorageFormat : uint8_t { NONE, ICEBUG_DISK, LANCE };

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the base need to know about LANCE? I'm concerned that isLance kind of negates the purpose of extensions.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think it’s really possible to distinguish Lance from other tables without a catalog or namespace otherwise

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you concerned about the mixed-tables check or the enum? The enum serves a similar purpose to CatalogEntryType::FOREIGN_TABLE_ENTRY.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename it to something generic instead of Lance. There are at least 5 table formats that want to replace parquet.


struct StorageFormatUtils {
static StorageFormat fromString(const std::string& str);
Expand Down
2 changes: 1 addition & 1 deletion src/include/extension/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct LBUG_API ExtensionUtils {

static constexpr const char* OFFICIAL_EXTENSION[] = {"ADBC", "HTTPFS", "POSTGRES", "DUCKDB",
"JSON", "SQLITE", "FTS", "DELTA", "ICEBERG", "AZURE", "UNITY_CATALOG", "VECTOR", "NEO4J",
"ALGO", "LLM"};
"ALGO", "LLM", "LANCE"};

static constexpr const char* EXTENSION_LOADER_SUFFIX = "_loader";

Expand Down
27 changes: 27 additions & 0 deletions src/include/storage/storage_manager.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#pragma once

#include <functional>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

#include "common/enums/storage_format.h"
#include "shadow_file.h"
#include "storage/index/index.h"
#include "storage/stats/planner_stats.h"
Expand Down Expand Up @@ -33,6 +36,15 @@ class RelTable;
class DiskArrayCollection;
struct DatabaseHeader;

/// Factory signature for extension-provided node tables (e.g. LanceNodeTable).
///
/// StorageManager::createNodeTable() invokes the registered factory when a node
/// table's storage format is not one of the built-in formats, and stores the
/// returned table under the entry's table ID.
///
/// Arguments, in order:
///  - the StorageManager performing the creation,
///  - the catalog entry describing the node table,
///  - the StorageManager's MemoryManager,
///  - the ClientContext of the statement that triggered the creation.
/// Returns the owning pointer to the newly built table.
using NodeTableFactory = std::function<std::unique_ptr<Table>(const StorageManager*,
const catalog::NodeTableCatalogEntry*, MemoryManager*, main::ClientContext*)>;
/// Factory signature for extension-provided rel tables (e.g. LanceRelTable).
///
/// StorageManager::addRelTable() invokes the registered factory when a rel
/// group's storage format is not one of the built-in formats, and stores the
/// returned table under the rel table's OID.
///
/// Arguments, in order:
///  - the rel group catalog entry,
///  - the source node table ID of the node pair,
///  - the destination node table ID of the node pair,
///  - the StorageManager performing the creation,
///  - the StorageManager's MemoryManager,
///  - the ClientContext of the statement that triggered the creation.
/// Returns the owning pointer to the newly built table.
using RelTableFactory = std::function<std::unique_ptr<Table>(catalog::RelGroupCatalogEntry*,
common::table_id_t, common::table_id_t, const StorageManager*, MemoryManager*,
main::ClientContext*)>;

class LBUG_API StorageManager {
public:
StorageManager(const std::string& databasePath, bool readOnly, bool enableChecksums,
Expand Down Expand Up @@ -83,6 +95,16 @@ class LBUG_API StorageManager {
std::optional<std::reference_wrapper<const IndexType>> getIndexType(
const std::string& typeName) const;

/// Installs the node- and rel-table factories for an extension-defined storage
/// format (e.g. LANCE), replacing any factories previously registered for it.
///
/// Extensions are expected to call this from their load() function, before any
/// CREATE TABLE using that format is attempted.
///
/// @param format       storage format the factories are responsible for
/// @param nodeFactory  builds node tables of that format
/// @param relFactory   builds rel tables of that format
///
/// Both maps are updated while holding formatFactoryMtx.
void registerStorageFormatHandler(common::StorageFormat format, NodeTableFactory nodeFactory,
    RelTableFactory relFactory) {
    const std::lock_guard guard{formatFactoryMtx};
    nodeTableFactories.insert_or_assign(format, std::move(nodeFactory));
    relTableFactories.insert_or_assign(format, std::move(relFactory));
}

void serialize(const catalog::Catalog& catalog, common::Serializer& ser);
void serialize(const catalog::Catalog& catalog, const transaction::Transaction& snapshotTxn,
common::Serializer& ser);
Expand Down Expand Up @@ -132,6 +154,11 @@ class LBUG_API StorageManager {
std::unordered_map<common::table_id_t, PlannerTableStats> plannerStatsCache;
std::unordered_map<common::table_id_t, std::string> tableNameCache;
common::VirtualFileSystem* vfs_; // non-owning, owned by Database

// Extension-provided storage format factories (protected by formatFactoryMtx)
mutable std::mutex formatFactoryMtx;
std::unordered_map<common::StorageFormat, NodeTableFactory> nodeTableFactories;
std::unordered_map<common::StorageFormat, RelTableFactory> relTableFactories;
};

} // namespace storage
Expand Down
9 changes: 7 additions & 2 deletions src/include/storage/table/arrow_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,19 @@ class ArrowNodeTable final : public ColumnarNodeTableBase {
bool isVisibleNoLock(const transaction::Transaction* transaction,
common::offset_t offset) const override;

// Virtual dispatch methods for scan_node_table.cpp extensibility
bool requiresExplicitScanInit() const override { return true; }
bool usesMorselScan() const override { return true; }
size_t getNumScanMorsels(const transaction::Transaction* transaction) const override;
// Note: createScanState() left at base default; ArrowNodeTable scan state is
// created via the existing dynamic_cast path in createNodeTableScanState().

const ArrowSchemaWrapper& getArrowSchema() const { return schema; }
const std::vector<ArrowArrayWrapper>& getArrowArrays() const { return arrays; }

common::node_group_idx_t getNumBatches(
const transaction::Transaction* transaction) const override;

size_t getNumScanMorsels(const transaction::Transaction* transaction) const;

const catalog::NodeTableCatalogEntry* getCatalogEntry() const { return nodeTableCatalogEntry; }

protected:
Expand Down
5 changes: 5 additions & 0 deletions src/include/storage/table/ice_disk_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class IceDiskNodeTable final : public ColumnarNodeTableBase {
bool isVisibleNoLock(const transaction::Transaction* transaction,
common::offset_t offset) const override;

// Virtual dispatch for scan_node_table.cpp extensibility
bool requiresExplicitScanInit() const override { return true; }
// IceDisk uses nodeGroupIdx-based scanning, not morsel-based.
// usesMorselScan() and getNumScanMorsels() are left at NodeTable defaults (false/0).

const std::string& getParquetFilePath() const { return parquetFilePath; }

protected:
Expand Down
29 changes: 29 additions & 0 deletions src/include/storage/table/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,35 @@ class LBUG_API NodeTable : public Table {
virtual void initializeScanCoordination(
[[maybe_unused]] const transaction::Transaction* transaction) {}

// Virtual dispatch methods for scan_node_table.cpp extensibility.
// These allow extension-defined table types (e.g. LanceNodeTable) to plug
// into the core scan infrastructure without requiring dynamic_casts to
// concrete types that live in extension libraries.

/// Returns true if this table type requires an explicit initScanState() call
/// (e.g. Arrow, Lance, IceDisk columnar tables).
///
/// When this returns true, ScanNodeTable::initCurrentTable() calls
/// initScanState(transaction, scanState) on the table right after the scan
/// state has been created. The default is false.
virtual bool requiresExplicitScanInit() const { return false; }

/// Returns true if this table drives scanning via ColumnarNodeTableScanSharedState::getNextMorsel()
/// rather than via nodeGroupIdx assignment (e.g. Arrow, Lance).
///
/// ScanNodeTableSharedState::nextMorsel() only takes the morsel path when this
/// returns true AND the table can be dynamic_cast to ColumnarNodeTableBase; if
/// the cast fails it continues with the regular node-group path. The default
/// is false.
virtual bool usesMorselScan() const { return false; }

/// Returns the number of scan morsels for progress tracking.
///
/// The default implementation returns 0. ScanNodeTableSharedState::initialize()
/// uses a result greater than 0 as numCommittedNodeGroups and otherwise falls
/// back to getNumCommittedNodeGroups(), so 0 means "no morsel count available".
///
/// NOTE(review): a morsel-based table that legitimately has zero morsels takes
/// the same fallback path as a table that does not override this — confirm
/// that this is intended.
virtual size_t getNumScanMorsels(
[[maybe_unused]] const transaction::Transaction* transaction) const {
return 0;
}

/// Creates a format-specific TableScanState. Returns nullptr to fall back to
/// the built-in dispatch (ArrowNodeTable / IceDiskNodeTable / default).
///
/// createNodeTableScanState() in scan_node_table.cpp calls this first and
/// returns the result directly when it is non-null. The default implementation
/// ignores all arguments and returns nullptr.
///
/// @param nodeIDVector   node ID vector for the scan, as passed by the scan operator
/// @param outVectors     output vectors for the scan, as passed by the scan operator
/// @param memoryManager  memory manager available for building the scan state
/// @return an owning pointer to the scan state, or nullptr to use the built-in dispatch
virtual std::unique_ptr<TableScanState> createScanState(
[[maybe_unused]] common::ValueVector* nodeIDVector,
[[maybe_unused]] const std::vector<common::ValueVector*>& outVectors,
[[maybe_unused]] MemoryManager* memoryManager) const {
return nullptr;
}

bool scanInternal(transaction::Transaction* transaction, TableScanState& scanState) override;
template<bool lock = true>
bool lookup(const transaction::Transaction* transaction, const TableScanState& scanState) const;
Expand Down
52 changes: 33 additions & 19 deletions src/processor/operator/scan/scan_node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "storage/local_storage/local_node_table.h"
#include "storage/local_storage/local_storage.h"
#include "storage/table/arrow_node_table.h"
#include "storage/table/columnar_node_table_base.h"
#include "storage/table/ice_disk_node_table.h"

using namespace lbug::common;
Expand All @@ -17,6 +18,11 @@ namespace processor {
static std::unique_ptr<TableScanState> createNodeTableScanState(NodeTable* table,
ValueVector* nodeIDVector, const std::vector<ValueVector*>& outVectors,
MemoryManager* memoryManager) {
// Allow extension-defined table types to supply their own scan state via virtual factory.
auto extensionState = table->createScanState(nodeIDVector, outVectors, memoryManager);
if (extensionState) {
return extensionState;
}
if (dynamic_cast<IceDiskNodeTable*>(table) != nullptr) {
return std::make_unique<IceDiskNodeTableScanState>(*memoryManager, nodeIDVector, outVectors,
nodeIDVector->state);
Expand Down Expand Up @@ -69,12 +75,15 @@ void ScanNodeTableSharedState::initialize(const transaction::Transaction* transa
} catch (const std::exception& e) {
this->numCommittedNodeGroups = 1;
}
} else if (const auto arrowTable = dynamic_cast<ArrowNodeTable*>(table)) {
// For Arrow tables, set numCommittedNodeGroups to number of morsels
this->numCommittedNodeGroups =
static_cast<common::node_group_idx_t>(arrowTable->getNumScanMorsels(transaction));
} else {
this->numCommittedNodeGroups = table->getNumCommittedNodeGroups();
// For morsel-based columnar tables (Arrow, Lance, etc.) getNumScanMorsels() returns
// the morsel count; for regular NodeTable it returns 0 → fall back to node groups.
auto morselCount = table->getNumScanMorsels(transaction);
if (morselCount > 0) {
this->numCommittedNodeGroups = static_cast<common::node_group_idx_t>(morselCount);
} else {
this->numCommittedNodeGroups = table->getNumCommittedNodeGroups();
}
}
if (transaction->isWriteTransaction()) {
if (const auto localTable =
Expand All @@ -90,18 +99,22 @@ void ScanNodeTableSharedState::nextMorsel(TableScanState& scanState,
ScanNodeTableProgressSharedState& progressSharedState) {
std::unique_lock lck{mtx};

// ColumnarNodeTables handle morsel assignment internally
// Morsel-based columnar tables (Arrow, Lance, …) dispatch through
// ColumnarNodeTableBase::getTableScanSharedState() / usesMorselScan().
// TODO: icebug-disk tables https://github.com/LadybugDB/ladybug/issues/245
if (const auto arrowTable = dynamic_cast<ArrowNodeTable*>(this->table)) {
const auto tableSharedState = arrowTable->getTableScanSharedState();
if (tableSharedState->getNextMorsel(static_cast<ColumnarNodeTableScanState*>(&scanState))) {
scanState.source = TableScanSource::COMMITTED;
progressSharedState.numMorselsScanned++;
} else {
scanState.source = TableScanSource::NONE;
if (this->table->usesMorselScan()) {
const auto columnarTable = dynamic_cast<ColumnarNodeTableBase*>(this->table);
if (columnarTable) {
const auto tableSharedState = columnarTable->getTableScanSharedState();
if (tableSharedState->getNextMorsel(
static_cast<ColumnarNodeTableScanState*>(&scanState))) {
scanState.source = TableScanSource::COMMITTED;
progressSharedState.numMorselsScanned++;
} else {
scanState.source = TableScanSource::NONE;
}
return;
}

return;
}

auto& nodeScanState = scanState.cast<NodeTableScanState>();
Expand Down Expand Up @@ -149,11 +162,12 @@ void ScanNodeTable::initCurrentTable(ExecutionContext* context) {
outVectors, MemoryManager::Get(*context->clientContext));
currentInfo.initScanState(*scanState, outVectors, context->clientContext);
scanState->semiMask = sharedStates[currentTableIdx]->getSemiMask();
// Call table->initScanState for IceDiskNodeTable or ArrowNodeTable
if (dynamic_cast<IceDiskNodeTable*>(tableInfos[currentTableIdx].table) ||
dynamic_cast<ArrowNodeTable*>(tableInfos[currentTableIdx].table)) {
// Use virtual dispatch so extension-defined table types (Lance, etc.) work without
// hard-coding their concrete types here.
auto* nodeTable = tableInfos[currentTableIdx].table->ptrCast<NodeTable>();
if (nodeTable->requiresExplicitScanInit()) {
auto transaction = transaction::Transaction::Get(*context->clientContext);
tableInfos[currentTableIdx].table->initScanState(transaction, *scanState);
nodeTable->initScanState(transaction, *scanState);
}
}

Expand Down
35 changes: 29 additions & 6 deletions src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,20 @@ void StorageManager::createNodeTable(NodeTableCatalogEntry* entry, main::ClientC
tables[entry->getTableID()] =
std::make_unique<IceDiskNodeTable>(this, entry, &memoryManager, context);
} else {
throw common::RuntimeException(
"Unsupported storage format option for node table: " +
std::to_string(static_cast<int>(entry->getStorageFormat())));
// Try extension-registered factory (e.g. LANCE)
std::unique_lock factoryLck{formatFactoryMtx};
auto it = nodeTableFactories.find(entry->getStorageFormat());
if (it != nodeTableFactories.end()) {
auto factory = it->second;
factoryLck.unlock();
tables[entry->getTableID()] = factory(this, entry, &memoryManager, context);
} else {
throw common::RuntimeException(
std::format("Unsupported storage format for node table. "
"The extension providing this format may not be loaded. "
"Format id: {}",
static_cast<int>(entry->getStorageFormat())));
}
}
} else if (!entry->getStorage().empty()) {
// Check if storage is Arrow backed
Expand Down Expand Up @@ -186,9 +197,21 @@ void StorageManager::addRelTable(RelGroupCatalogEntry* entry, const RelTableCata
tables[info.oid] = std::make_unique<IceDiskRelTable>(entry, info.nodePair.srcTableID,
info.nodePair.dstTableID, this, &memoryManager, context);
} else {
throw common::RuntimeException(
"Unsupported storage format option for rel table: " +
std::to_string(static_cast<int>(entry->getStorageFormat())));
// Try extension-registered factory (e.g. LANCE)
std::unique_lock factoryLck{formatFactoryMtx};
auto it = relTableFactories.find(entry->getStorageFormat());
if (it != relTableFactories.end()) {
auto factory = it->second;
factoryLck.unlock();
tables[info.oid] = factory(entry, info.nodePair.srcTableID,
info.nodePair.dstTableID, this, &memoryManager, context);
} else {
throw common::RuntimeException(
std::format("Unsupported storage format for rel table. "
"The extension providing this format may not be loaded. "
"Format id: {}",
static_cast<int>(entry->getStorageFormat())));
}
}
} else if (!entry->getStorage().empty()) {
if (entry->getStorage().substr(0, 8) == "arrow://") {
Expand Down
3 changes: 2 additions & 1 deletion test/include/test_runner/test_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ struct TestGroup {
LBUG,
JSON,
CSV_TO_JSON,
ICEBUG_DISK
ICEBUG_DISK,
LANCE
};
DatasetType datasetType;

Expand Down
10 changes: 10 additions & 0 deletions test/runner/e2e_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ class EndToEndTest final : public DBTest {
lbug::main::Connection* connection =
conn ? conn.get() : (connMap.begin()->second).get();
TestHelper::executeScript(dataset + "/" + TestHelper::SCHEMA_FILE_NAME, *connection);
} else if (datasetType == TestGroup::DatasetType::LANCE) {
// For LANCE, load the extension first, then run schema.cypher
lbug::main::Connection* connection =
conn ? conn.get() : (connMap.begin()->second).get();
auto loadResult = connection->query("LOAD EXTENSION 'lance'");
if (!loadResult->isSuccess()) {
throw lbug::common::Exception(
std::format("Failed to load lance extension: {}", loadResult->getErrorMessage()));
}
TestHelper::executeScript(dataset + "/" + TestHelper::SCHEMA_FILE_NAME, *connection);
} else if (datasetType != TestGroup::DatasetType::LBUG && dataset != "empty") {
initGraph();
} else if (generateBinaryDemo && TestHelper::E2E_OVERRIDE_IMPORT_DIR.empty()) {
Expand Down
3 changes: 2 additions & 1 deletion test/test_helper/test_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ void TestHelper::executeScript(const std::string& cypherScript, Connection& conn
size_t fmtEnd = line.find("\"", fmtStart);
if (fmtEnd != std::string::npos) {
std::string storageFormat = line.substr(fmtStart, fmtEnd - fmtStart);
if (storageFormat.find("icebug-disk") != std::string::npos) {
if (storageFormat.find("icebug-disk") != std::string::npos ||
storageFormat.find("lance") != std::string::npos) {
std::vector<std::string> storagePaths;
size_t storageIndex = 0;
while (true) {
Expand Down
Loading
Loading