Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/include/storage/table/arrow_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ class ArrowNodeTable final : public ColumnarNodeTableBase {
const ArrowSchemaWrapper& getArrowSchema() const { return schema; }
const std::vector<ArrowArrayWrapper>& getArrowArrays() const { return arrays; }

common::node_group_idx_t getNumBatches(
common::node_group_idx_t getNumScanMorsels(
const transaction::Transaction* transaction) const override;

size_t getNumScanMorsels(const transaction::Transaction* transaction) const;

const catalog::NodeTableCatalogEntry* getCatalogEntry() const { return nodeTableCatalogEntry; }

std::unique_ptr<TableScanState> createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors,
MemoryManager* memoryManager) const override;

protected:
std::string getColumnarFormatName() const override { return "Arrow"; }
common::row_idx_t getTotalRowCount(const transaction::Transaction* transaction) const override;
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/table/arrow_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class ArrowRelTable final : public ColumnarRelTableBase {
bool resetCachedBoundNodeSelVec = true) const override;

bool scanInternal(transaction::Transaction* transaction, TableScanState& scanState) override;
std::unique_ptr<TableScanState> createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors, MemoryManager* memoryManager,
std::shared_ptr<common::DataChunkState> outChunkState) const override;

protected:
std::string getColumnarFormatName() const override { return "Arrow"; }
Expand Down
7 changes: 5 additions & 2 deletions src/include/storage/table/columnar_node_table_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,18 @@ class ColumnarNodeTableBase : public NodeTable {

// Template method pattern: subclasses implement format-specific operations
virtual std::string getColumnarFormatName() const = 0;
virtual common::node_group_idx_t getNumBatches(
const transaction::Transaction* transaction) const = 0;
virtual common::row_idx_t getTotalRowCount(
const transaction::Transaction* transaction) const = 0;

public:
ColumnarNodeTableScanSharedState* getTableScanSharedState() const {
return tableScanSharedState.get();
}
virtual std::unique_ptr<TableScanState> createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors,
MemoryManager* memoryManager) const = 0;
virtual common::node_group_idx_t getNumScanMorsels(
const transaction::Transaction* transaction) const = 0;
};

} // namespace storage
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/table/columnar_rel_table_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class ColumnarRelTableBase : public RelTable {
std::vector<std::pair<common::offset_t, common::row_idx_t>> getTopKDegrees(
const transaction::Transaction* transaction, common::RelDataDirection direction,
common::idx_t k) override;
virtual std::unique_ptr<TableScanState> createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors, MemoryManager* memoryManager,
std::shared_ptr<common::DataChunkState> outChunkState) const = 0;

protected:
catalog::RelGroupCatalogEntry* relGroupEntry;
Expand Down
7 changes: 5 additions & 2 deletions src/include/storage/table/ice_disk_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,15 @@ class IceDiskNodeTable final : public ColumnarNodeTableBase {
common::offset_t offset) const override;

const std::string& getParquetFilePath() const { return parquetFilePath; }
std::unique_ptr<TableScanState> createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors,
MemoryManager* memoryManager) const override;
common::node_group_idx_t getNumScanMorsels(
const transaction::Transaction* transaction) const override;

protected:
// Implement ColumnarNodeTableBase interface
std::string getColumnarFormatName() const override { return "icebug-disk"; }
common::node_group_idx_t getNumBatches(
const transaction::Transaction* transaction) const override;
common::row_idx_t getTotalRowCount(const transaction::Transaction* transaction) const override;

private:
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/table/ice_disk_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class IceDiskRelTable final : public ColumnarRelTableBase {
bool resetCachedBoundNodeSelVec = true) const override;

bool scanInternal(transaction::Transaction* transaction, TableScanState& scanState) override;
std::unique_ptr<TableScanState> createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors, MemoryManager* memoryManager,
std::shared_ptr<common::DataChunkState> outChunkState) const override;

protected:
// Implement ColumnarRelTableBase interface
Expand Down
27 changes: 10 additions & 17 deletions src/processor/operator/scan/scan_multi_rel_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,34 +67,27 @@ void ScanMultiRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionCo
auto nbrNodeIDVector = outVectors[0];

// Check if any table in any scanner is an external rel table with a custom scan state.
bool hasArrowTable = false;
bool hasIceDiskTable = false;
storage::ColumnarRelTableBase* columnarTable = nullptr;
for (auto& [_, scanner] : scanners) {
for (auto& relInfo : scanner.relInfos) {
if (dynamic_cast<storage::ArrowRelTable*>(relInfo.table) != nullptr) {
hasArrowTable = true;
if (dynamic_cast<storage::ColumnarRelTableBase*>(relInfo.table) != nullptr) {
columnarTable = dynamic_cast<storage::ColumnarRelTableBase*>(relInfo.table);
break;
}
if (dynamic_cast<storage::IceDiskRelTable*>(relInfo.table) != nullptr) {
hasIceDiskTable = true;

if (columnarTable != nullptr) {
break;
}
}
if (hasArrowTable || hasIceDiskTable) {
break;
}
}

// IceDisk scan state extends the common rel scan state and Arrow stores its per-table state
// there, so one scan state can now cover IceDisk, Arrow, and native rel tables.
if (hasIceDiskTable) {
scanState =
std::make_unique<storage::IceDiskRelTableScanState>(*MemoryManager::Get(*clientContext),
boundNodeIDVector, outVectors, nbrNodeIDVector->state);
} else if (hasArrowTable) {
scanState =
std::make_unique<storage::ArrowRelTableScanState>(*MemoryManager::Get(*clientContext),
boundNodeIDVector, outVectors, nbrNodeIDVector->state);
if (columnarTable != nullptr) {
auto tableScanState = columnarTable->createScanState(boundNodeIDVector, outVectors,
MemoryManager::Get(*clientContext), nbrNodeIDVector->state);
scanState = std::unique_ptr<RelTableScanState>(
dynamic_cast<RelTableScanState*>(tableScanState.release()));
} else {
scanState = std::make_unique<RelTableScanState>(*MemoryManager::Get(*clientContext),
boundNodeIDVector, outVectors, nbrNodeIDVector->state);
Expand Down
41 changes: 10 additions & 31 deletions src/processor/operator/scan/scan_node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ namespace processor {
static std::unique_ptr<TableScanState> createNodeTableScanState(NodeTable* table,
ValueVector* nodeIDVector, const std::vector<ValueVector*>& outVectors,
MemoryManager* memoryManager) {
if (dynamic_cast<IceDiskNodeTable*>(table) != nullptr) {
return std::make_unique<IceDiskNodeTableScanState>(*memoryManager, nodeIDVector, outVectors,
nodeIDVector->state);
}
if (dynamic_cast<ArrowNodeTable*>(table) != nullptr) {
return std::make_unique<ArrowNodeTableScanState>(*memoryManager, nodeIDVector, outVectors,
nodeIDVector->state);
if (dynamic_cast<ColumnarNodeTableBase*>(table) != nullptr) {
return table->cast<ColumnarNodeTableBase>().createScanState(nodeIDVector, outVectors,
memoryManager);
}

return std::make_unique<NodeTableScanState>(nodeIDVector, outVectors, nodeIDVector->state);
}

Expand Down Expand Up @@ -56,23 +53,8 @@ void ScanNodeTableSharedState::initialize(const transaction::Transaction* transa
// Initialize table-specific scan coordination (e.g., for IceDiskNodeTable)
table->initializeScanCoordination(transaction);

if (const auto iceDiskTable = dynamic_cast<IceDiskNodeTable*>(table)) {
// For ice-disk tables, set numCommittedNodeGroups to number of row groups
std::vector<bool> columnSkips;
try {
auto context = transaction->getClientContext();
auto resolvedPath =
common::VirtualFileSystem::resolvePath(context, iceDiskTable->getParquetFilePath());
auto tempReader =
std::make_unique<processor::ParquetReader>(resolvedPath, columnSkips, context);
this->numCommittedNodeGroups = tempReader->getNumRowGroups();
} catch (const std::exception& e) {
this->numCommittedNodeGroups = 1;
}
} else if (const auto arrowTable = dynamic_cast<ArrowNodeTable*>(table)) {
// For Arrow tables, set numCommittedNodeGroups to number of morsels
this->numCommittedNodeGroups =
static_cast<common::node_group_idx_t>(arrowTable->getNumScanMorsels(transaction));
if (const auto columnarNodeTable = dynamic_cast<ColumnarNodeTableBase*>(table)) {
this->numCommittedNodeGroups = columnarNodeTable->getNumScanMorsels(transaction);
} else {
this->numCommittedNodeGroups = table->getNumCommittedNodeGroups();
}
Expand All @@ -90,10 +72,8 @@ void ScanNodeTableSharedState::nextMorsel(TableScanState& scanState,
ScanNodeTableProgressSharedState& progressSharedState) {
std::unique_lock lck{mtx};

// ColumnarNodeTables handle morsel assignment internally
// TODO: icebug-disk tables https://github.com/LadybugDB/ladybug/issues/245
if (const auto arrowTable = dynamic_cast<ArrowNodeTable*>(this->table)) {
const auto tableSharedState = arrowTable->getTableScanSharedState();
if (const auto columnarTable = dynamic_cast<ColumnarNodeTableBase*>(this->table)) {
const auto tableSharedState = columnarTable->getTableScanSharedState();
if (tableSharedState->getNextMorsel(static_cast<ColumnarNodeTableScanState*>(&scanState))) {
scanState.source = TableScanSource::COMMITTED;
progressSharedState.numMorselsScanned++;
Expand Down Expand Up @@ -149,9 +129,8 @@ void ScanNodeTable::initCurrentTable(ExecutionContext* context) {
outVectors, MemoryManager::Get(*context->clientContext));
currentInfo.initScanState(*scanState, outVectors, context->clientContext);
scanState->semiMask = sharedStates[currentTableIdx]->getSemiMask();
// Call table->initScanState for IceDiskNodeTable or ArrowNodeTable
if (dynamic_cast<IceDiskNodeTable*>(tableInfos[currentTableIdx].table) ||
dynamic_cast<ArrowNodeTable*>(tableInfos[currentTableIdx].table)) {
// Call table->initScanState for ColumnarNodeTables
if (dynamic_cast<ColumnarNodeTableBase*>(tableInfos[currentTableIdx].table)) {
auto transaction = transaction::Transaction::Get(*context->clientContext);
tableInfos[currentTableIdx].table->initScanState(transaction, *scanState);
}
Expand Down
29 changes: 10 additions & 19 deletions src/processor/operator/scan/scan_rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ namespace processor {
static std::unique_ptr<TableScanState> createSourceNodeTableScanState(NodeTable* table,
ValueVector* nodeIDVector, const std::vector<ValueVector*>& outVectors,
MemoryManager* memoryManager) {
if (dynamic_cast<IceDiskNodeTable*>(table) != nullptr) {
return std::make_unique<IceDiskNodeTableScanState>(*memoryManager, nodeIDVector, outVectors,
nodeIDVector->state);
}
if (dynamic_cast<ArrowNodeTable*>(table) != nullptr) {
return std::make_unique<ArrowNodeTableScanState>(*memoryManager, nodeIDVector, outVectors,
nodeIDVector->state);
if (dynamic_cast<ColumnarNodeTableBase*>(table) != nullptr) {
return table->cast<ColumnarNodeTableBase>().createScanState(nodeIDVector, outVectors,
memoryManager);
}
return std::make_unique<NodeTableScanState>(nodeIDVector, outVectors, nodeIDVector->state);
}
Expand Down Expand Up @@ -91,17 +87,13 @@ void ScanRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionContext
auto boundNodeIDVector = resultSet->getValueVector(opInfo.nodeIDPos).get();
auto nbrNodeIDVector = outVectors[0];
// Check if this is an external rel table and create the corresponding scan state.
auto* arrowTable = dynamic_cast<storage::ArrowRelTable*>(tableInfo.table);
auto* iceDiskTable = dynamic_cast<storage::IceDiskRelTable*>(tableInfo.table);
auto* columnarTable = dynamic_cast<storage::ColumnarRelTableBase*>(tableInfo.table);
auto* foreignTable = dynamic_cast<storage::ForeignRelTable*>(tableInfo.table);
if (arrowTable) {
scanState =
std::make_unique<storage::ArrowRelTableScanState>(*MemoryManager::Get(*clientContext),
boundNodeIDVector, outVectors, nbrNodeIDVector->state);
} else if (iceDiskTable) {
scanState =
std::make_unique<storage::IceDiskRelTableScanState>(*MemoryManager::Get(*clientContext),
boundNodeIDVector, outVectors, nbrNodeIDVector->state);
if (columnarTable) {
auto tableScanState = columnarTable->createScanState(boundNodeIDVector, outVectors,
MemoryManager::Get(*clientContext), nbrNodeIDVector->state);
scanState = std::unique_ptr<RelTableScanState>(
dynamic_cast<RelTableScanState*>(tableScanState.release()));
} else if (foreignTable) {
scanState =
std::make_unique<storage::ForeignRelTableScanState>(*MemoryManager::Get(*clientContext),
Expand Down Expand Up @@ -142,8 +134,7 @@ static void initSourceNodeScanState(ScanNodeTableInfo& sourceInfo,
sourceScanState = createSourceNodeTableScanState(sourceInfo.table->ptrCast<NodeTable>(),
boundNodeIDVector, sourceNodeOutVectors, MemoryManager::Get(*context));
sourceInfo.initScanState(*sourceScanState, sourceNodeOutVectors, context);
if (dynamic_cast<IceDiskNodeTable*>(sourceInfo.table) ||
dynamic_cast<ArrowNodeTable*>(sourceInfo.table)) {
if (dynamic_cast<ColumnarNodeTableBase*>(sourceInfo.table)) {
sourceInfo.table->initScanState(transaction::Transaction::Get(*context), *sourceScanState);
}
}
Expand Down
15 changes: 8 additions & 7 deletions src/storage/table/arrow_node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ ArrowNodeTable::~ArrowNodeTable() {
}
}

std::unique_ptr<TableScanState> ArrowNodeTable::createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors, MemoryManager* memoryManager) const {
return std::make_unique<ArrowNodeTableScanState>(*memoryManager, nodeIDVector, outVectors,
nodeIDVector->state);
}

void ArrowNodeTable::initializeScanCoordination(const transaction::Transaction* transaction) {
auto arrowScanSharedState =
static_cast<ArrowNodeTableScanSharedState*>(tableScanSharedState.get());
Expand Down Expand Up @@ -135,11 +141,6 @@ bool ArrowNodeTable::scanInternal([[maybe_unused]] transaction::Transaction* tra
return true;
}

common::node_group_idx_t ArrowNodeTable::getNumBatches(
[[maybe_unused]] const transaction::Transaction* transaction) const {
return arrays.size();
}

common::row_idx_t ArrowNodeTable::getTotalRowCount(
[[maybe_unused]] const transaction::Transaction* transaction) const {
return totalRows;
Expand All @@ -156,9 +157,9 @@ std::vector<size_t> ArrowNodeTable::getBatchSizes(
return batchSizes;
}

size_t ArrowNodeTable::getNumScanMorsels(
common::node_group_idx_t ArrowNodeTable::getNumScanMorsels(
[[maybe_unused]] const transaction::Transaction* transaction) const {
size_t numMorsels = 0;
common::node_group_idx_t numMorsels = 0;
for (const auto& array : arrays) {
auto batchLength = getArrowBatchLength(array);
numMorsels += (batchLength + scanMorselSize - 1) / scanMorselSize;
Expand Down
7 changes: 7 additions & 0 deletions src/storage/table/arrow_rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ static int64_t findColumnIdx(const ArrowSchemaWrapper& schema, const std::string
return -1;
}

std::unique_ptr<TableScanState> ArrowRelTable::createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors, MemoryManager* memoryManager,
std::shared_ptr<common::DataChunkState> outChunkState) const {
return std::make_unique<storage::ArrowRelTableScanState>(*memoryManager, nodeIDVector,
outVectors, outChunkState);
}

void ArrowRelTableScanState::setToTable(const transaction::Transaction* transaction, Table* table_,
std::vector<column_id_t> columnIDs_, std::vector<ColumnPredicateSet> columnPredicateSets_,
RelDataDirection direction_) {
Expand Down
15 changes: 11 additions & 4 deletions src/storage/table/ice_disk_node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ IceDiskNodeTable::IceDiskNodeTable(const StorageManager* storageManager,
parquetFilePath = resolvedPath;
}

std::unique_ptr<TableScanState> IceDiskNodeTable::createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors, MemoryManager* memoryManager) const {
return std::make_unique<IceDiskNodeTableScanState>(*memoryManager, nodeIDVector, outVectors,
nodeIDVector->state);
}

void IceDiskNodeTable::initializeScanCoordination(const transaction::Transaction* transaction) {
auto iceDiskScanSharedState =
static_cast<IceDiskNodeTableScanSharedState*>(tableScanSharedState.get());
auto numBatches = getNumBatches(transaction);
iceDiskScanSharedState->reset(numBatches);
auto numMorsels = getNumScanMorsels(transaction);
iceDiskScanSharedState->reset(numMorsels);
}

void IceDiskNodeTable::initScanState(Transaction* transaction, TableScanState& scanState,
Expand Down Expand Up @@ -85,7 +91,8 @@ void IceDiskNodeTable::initScanState(Transaction* transaction, TableScanState& s
initParquetScanForRowGroup(transaction, iceDiskScanState);
}

common::node_group_idx_t IceDiskNodeTable::getNumBatches(const Transaction* transaction) const {
common::node_group_idx_t IceDiskNodeTable::getNumScanMorsels(
const transaction::Transaction* transaction) const {
auto context = transaction->getClientContext();
if (!context) {
return 1;
Expand All @@ -94,7 +101,7 @@ common::node_group_idx_t IceDiskNodeTable::getNumBatches(const Transaction* tran
std::vector<bool> columnSkips;
try {
auto tempReader = std::make_unique<ParquetReader>(parquetFilePath, columnSkips, context);
return tempReader->getNumRowGroups();
return static_cast<common::node_group_idx_t>(tempReader->getNumRowGroups());
} catch (const std::exception& e) {
return 1; // Fallback
}
Expand Down
7 changes: 7 additions & 0 deletions src/storage/table/ice_disk_rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ using namespace lbug::transaction;
namespace lbug {
namespace storage {

std::unique_ptr<TableScanState> IceDiskRelTable::createScanState(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outVectors, MemoryManager* memoryManager,
std::shared_ptr<common::DataChunkState> outChunkState) const {
return std::make_unique<storage::IceDiskRelTableScanState>(*memoryManager, nodeIDVector,
outVectors, outChunkState);
}

void IceDiskRelTableScanState::setToTable(const Transaction* transaction, Table* table_,
std::vector<column_id_t> columnIDs_, std::vector<ColumnPredicateSet> columnPredicateSets_,
RelDataDirection direction_) {
Expand Down
Loading