Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ add_extension_if_enabled_and_skip_32bit("delta")
add_extension_if_enabled_and_skip_32bit("iceberg")
add_extension_if_enabled_and_skip_32bit("azure")
add_extension_if_enabled_and_skip_32bit("unity_catalog")
add_extension_if_enabled_and_skip_32bit("lance")
add_extension_if_enabled("json")
add_extension_if_enabled("fts")
add_extension_if_enabled("vector")
Expand Down
2 changes: 1 addition & 1 deletion extension_config.cmake
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
set(EXTENSION_LIST adbc azure delta duckdb fts httpfs iceberg json llm postgres sqlite unity_catalog vector neo4j algo)
set(EXTENSION_LIST adbc azure delta duckdb fts httpfs iceberg json lance llm postgres sqlite unity_catalog vector neo4j algo)

#set(EXTENSION_STATIC_LINK_LIST fts)
foreach(extension IN LISTS EXTENSION_STATIC_LINK_LIST)
Expand Down
65 changes: 65 additions & 0 deletions lance/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
cmake_minimum_required(VERSION 3.22)

# ── lance-c dependency ────────────────────────────────────────────────────────
# Lance-c lives next to the ladybug repo at ../../lancedb/lance-c.
# We use add_subdirectory so its Corrosion / prebuilt CMake logic runs in-tree.
set(LANCE_C_ROOT "${PROJECT_SOURCE_DIR}/../../lancedb/lance-c"
CACHE PATH "Path to the lance-c source tree")

if(NOT TARGET LanceC::lance_c)
if(NOT EXISTS "${LANCE_C_ROOT}/CMakeLists.txt")
message(FATAL_ERROR
"lance-c not found at ${LANCE_C_ROOT}. "
"Set LANCE_C_ROOT to the lance-c repository path.")
endif()
# Use prebuilt if LANCE_C_PREBUILT_DIR is set, otherwise build from source
if(LANCE_C_PREBUILT_DIR)
set(LANCE_C_USE_PREBUILT ON CACHE BOOL "" FORCE)
endif()
add_subdirectory(${LANCE_C_ROOT} ${CMAKE_CURRENT_BINARY_DIR}/lance-c EXCLUDE_FROM_ALL)
endif()

# ── Extension source ──────────────────────────────────────────────────────────
include_directories(
${PROJECT_SOURCE_DIR}/src/include
${CMAKE_BINARY_DIR}/src/include
src/include
${LANCE_C_ROOT}/include
)

set(LANCE_EXTENSION_OBJECT_FILES
$<TARGET_OBJECTS:lance_extension_objects>
)

add_library(lance_extension_objects OBJECT
src/lance_extension.cpp
src/lance_node_table.cpp
src/lance_rel_table.cpp
src/lance_functions.cpp
)

target_include_directories(lance_extension_objects PRIVATE
${PROJECT_SOURCE_DIR}/src/include
${CMAKE_BINARY_DIR}/src/include
src/include
${LANCE_C_ROOT}/include
)

# ── Build the extension library ───────────────────────────────────────────────
build_extension_lib(${BUILD_STATIC_EXTENSION} "lance")

target_link_libraries(lbug_${EXTENSION_LIB_NAME}_extension
PRIVATE
LanceC::lance_c_static
)

# Platform-specific link dependencies for the Rust-backed lance-c library
if(UNIX AND NOT APPLE)
target_link_libraries(lbug_${EXTENSION_LIB_NAME}_extension
PRIVATE pthread dl m)
endif()

# ── Tests ─────────────────────────────────────────────────────────────────────
if(BUILD_EXTENSION_TESTS)
add_subdirectory(test)
endif()
16 changes: 16 additions & 0 deletions lance/src/include/lance_extension.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include "extension/extension.h"

namespace lbug {
namespace lance_extension {

class LanceExtension final : public extension::Extension {
public:
static constexpr char EXTENSION_NAME[] = "LANCE";

static void load(main::ClientContext* context);
};

} // namespace lance_extension
} // namespace lbug
32 changes: 32 additions & 0 deletions lance/src/include/lance_functions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include "function/table/table_function.h"

namespace lbug {
namespace lance_extension {

using function::function_set;

/// LANCE_VECTOR_SEARCH(dataset_path, column, query_vector, k [, metric [, nprobes]])
/// Returns nearest-neighbour rows together with a '_distance' column.
struct LanceVectorSearchFunction {
static constexpr char name[] = "LANCE_VECTOR_SEARCH";
static function_set getFunctionSet();
};

/// LANCE_FTS(dataset_path, query [, columns [, max_fuzzy_distance]])
/// Returns full-text search result rows together with a '_score' column.
struct LanceFTSFunction {
static constexpr char name[] = "LANCE_FTS";
static function_set getFunctionSet();
};

/// LANCE_HYBRID_SEARCH(dataset_path, column, query_vector, k, fts_query)
/// Returns rows matching both vector and full-text criteria with fusion scoring.
struct LanceHybridSearchFunction {
static constexpr char name[] = "LANCE_HYBRID_SEARCH";
static function_set getFunctionSet();
};

} // namespace lance_extension
} // namespace lbug
146 changes: 146 additions & 0 deletions lance/src/include/lance_node_table.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#pragma once

#include <memory>
#include <mutex>
#include <vector>

#include "catalog/catalog_entry/node_table_catalog_entry.h"
#include "common/arrow/arrow.h"
#include "common/exception/runtime.h"
#include "common/types/internal_id_util.h"
#include "storage/table/columnar_node_table_base.h"

namespace lbug {
namespace lance_extension {

/// Owns a single Arrow RecordBatch obtained from an ArrowArrayStream.
/// The ArrowArray is released in the destructor; ArrowSchema is NOT owned
/// (it comes from the shared stream schema and is managed by the shared state).
struct LanceBatchData {
ArrowSchema schema;
ArrowArray array;
uint64_t length = 0;

LanceBatchData() {
std::memset(&schema, 0, sizeof(schema));
std::memset(&array, 0, sizeof(array));
}
~LanceBatchData() {
if (array.release) array.release(&array);
// schema.release is intentionally left null (owned by shared state)
}

LanceBatchData(const LanceBatchData&) = delete;
LanceBatchData& operator=(const LanceBatchData&) = delete;
LanceBatchData(LanceBatchData&&) = delete;
LanceBatchData& operator=(LanceBatchData&&) = delete;
};

/// Per-thread scan state for LanceNodeTable.
struct LanceNodeTableScanState final : storage::ColumnarNodeTableScanState {
std::shared_ptr<LanceBatchData> currentBatch;
uint64_t batchStartGlobalOffset = 0;
uint64_t morselStart = 0;
uint64_t morselEnd = 0;

LanceNodeTableScanState(storage::MemoryManager& mm, common::ValueVector* nodeIDVector,
std::vector<common::ValueVector*> outputVectors,
std::shared_ptr<common::DataChunkState> outChunkState)
: storage::ColumnarNodeTableScanState{mm, nodeIDVector, std::move(outputVectors),
std::move(outChunkState)} {}
};

/// Shared scan coordination state for LanceNodeTable.
struct LanceNodeTableScanSharedState final : storage::ColumnarNodeTableScanSharedState {
explicit LanceNodeTableScanSharedState(size_t morselSize) : morselSize(morselSize) {
std::memset(&stream_, 0, sizeof(stream_));
}

~LanceNodeTableScanSharedState() override {
if (stream_.release) stream_.release(&stream_);
}

void reset(ArrowArrayStream newStream);

bool getNextMorsel(storage::ColumnarNodeTableScanState* scanState) override;

private:
bool readNextBatch();

std::mutex mtx;
ArrowArrayStream stream_;
bool streamExhausted = true;

std::shared_ptr<LanceBatchData> currentBatch;
uint64_t currentBatchGlobalOffset = 0;
uint64_t currentMorselStart = 0;
const size_t morselSize;

bool streamSchemaFetched = false;
ArrowSchema streamSchema_;

LanceNodeTableScanSharedState(const LanceNodeTableScanSharedState&) = delete;
LanceNodeTableScanSharedState& operator=(const LanceNodeTableScanSharedState&) = delete;
};

/// A node table backed by a Lance dataset.
class LanceNodeTable final : public storage::ColumnarNodeTableBase {
public:
LanceNodeTable(const storage::StorageManager* storageManager,
const catalog::NodeTableCatalogEntry* nodeTableEntry,
storage::MemoryManager* memoryManager, main::ClientContext* context);

~LanceNodeTable() override = default;

void initializeScanCoordination(const transaction::Transaction* transaction) override;

void initScanState(transaction::Transaction* transaction, storage::TableScanState& scanState,
bool resetCachedBoundNodeSelVec = true) const override;

bool scanInternal(transaction::Transaction* transaction,
storage::TableScanState& scanState) override;

bool requiresExplicitScanInit() const override { return true; }
bool usesMorselScan() const override { return true; }
size_t getNumScanMorsels(const transaction::Transaction* transaction) const override;

std::unique_ptr<storage::TableScanState> createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors,
storage::MemoryManager* memoryManager) const override;

bool isVisible(const transaction::Transaction* transaction,
common::offset_t offset) const override;
bool isVisibleNoLock(const transaction::Transaction* transaction,
common::offset_t offset) const override;

bool lookupPK(const transaction::Transaction* transaction, common::ValueVector* keyVector,
uint64_t vectorPos, common::offset_t& result) const override;

const std::string& getLanceDatasetPath() const { return datasetPath; }

protected:
std::string getColumnarFormatName() const override { return "lance"; }
common::node_group_idx_t getNumBatches(
const transaction::Transaction* transaction) const override;
common::row_idx_t getTotalRowCount(const transaction::Transaction* transaction) const override;

private:
std::vector<int64_t> getOutputToLanceColumnIdx(
const std::vector<common::column_id_t>& columnIDs) const;

void copyLanceMorselToOutputVectors(const LanceBatchData& batch, uint64_t morselStart,
uint64_t numRows, const std::vector<common::ValueVector*>& outputVectors,
const std::vector<int64_t>& outputToLanceColumnIdx) const;

private:
std::string datasetPath;
mutable uint64_t cachedTotalRows = common::INVALID_ROW_IDX;
uint32_t numLanceColumns = 0;
ArrowSchema cachedSchema_;
bool schemaCached = false;

static constexpr size_t kDefaultMorselSize = 2048;
};

} // namespace lance_extension
} // namespace lbug
90 changes: 90 additions & 0 deletions lance/src/include/lance_rel_table.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#pragma once

#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>

#include "catalog/catalog_entry/rel_group_catalog_entry.h"
#include "common/arrow/arrow.h"
#include "common/exception/runtime.h"
#include "common/types/internal_id_util.h"
#include "storage/table/columnar_rel_table_base.h"

namespace lbug {
namespace lance_extension {

struct LanceBatchData;

/// Per-thread scan state for LanceRelTable.
struct LanceRelTableScanState final : storage::RelTableScanState {
std::shared_ptr<LanceBatchData> cachedBatchData;
uint64_t currentBatchStartOffset = 0;
uint64_t currentLocalRowIdx = 0;

std::unordered_map<common::offset_t, common::sel_t> boundNodeOffsets;

ArrowArrayStream stream;
bool streamExhausted = false;
bool streamInitialized = false;

ArrowSchema streamSchema;
bool schemaFetched = false;

LanceRelTableScanState(storage::MemoryManager& mm, common::ValueVector* nodeIDVector,
std::vector<common::ValueVector*> outputVectors,
std::shared_ptr<common::DataChunkState> outChunkState);

~LanceRelTableScanState() override;

void setToTable(const transaction::Transaction* transaction, storage::Table* table_,
std::vector<common::column_id_t> columnIDs_,
std::vector<storage::ColumnPredicateSet> columnPredicateSets_,
common::RelDataDirection direction_) override;

void reset(std::unordered_map<common::offset_t, common::sel_t> boundNodeOffsets_);

LanceRelTableScanState(const LanceRelTableScanState&) = delete;
LanceRelTableScanState& operator=(const LanceRelTableScanState&) = delete;
};

/// A relationship table backed by a Lance dataset.
class LanceRelTable final : public storage::ColumnarRelTableBase {
public:
LanceRelTable(catalog::RelGroupCatalogEntry* relGroupEntry, common::table_id_t fromTableID,
common::table_id_t toTableID, const storage::StorageManager* storageManager,
storage::MemoryManager* memoryManager, main::ClientContext* context);

~LanceRelTable() override = default;

void initScanState(transaction::Transaction* transaction, storage::TableScanState& scanState,
bool resetCachedBoundNodeSelVec = true) const override;

bool scanInternal(transaction::Transaction* transaction,
storage::TableScanState& scanState) override;

const std::string& getLanceDatasetPath() const { return datasetPath; }

protected:
std::string getColumnarFormatName() const override { return "lance"; }
common::row_idx_t getTotalRowCount(const transaction::Transaction* transaction) const override;
common::row_idx_t getActiveBoundNodeCount(const transaction::Transaction* transaction,
common::RelDataDirection direction) const override;
std::vector<std::pair<common::offset_t, common::row_idx_t>> getAllDegreeEntries(
const transaction::Transaction* transaction,
common::RelDataDirection direction) const override;
std::vector<std::pair<common::offset_t, common::row_idx_t>> getTopKDegreeEntries(
const transaction::Transaction* transaction, common::RelDataDirection direction,
common::idx_t k) const override;

private:
bool scanFlat(transaction::Transaction* transaction, LanceRelTableScanState& scanState);

int32_t fromColumnIdx = -1;
int32_t toColumnIdx = -1;
std::string datasetPath;
mutable uint64_t cachedTotalRows = common::INVALID_ROW_IDX;
};

} // namespace lance_extension
} // namespace lbug
Loading