Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions table/arrow_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package table

import (
"context"
"fmt"
"io"
"iter"
"strconv"
Expand Down Expand Up @@ -753,6 +754,14 @@ func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, tasks
}

func (as *arrowScan) GetRecords(ctx context.Context, tasks []FileScanTask) (*arrow.Schema, iter.Seq2[arrow.RecordBatch, error], error) {
for _, t := range tasks {
if len(t.DeletionVectorFiles) > 0 {
return nil, nil, fmt.Errorf(
"%w: deletion vector read is not yet implemented, data file: %s has %d deletion vector(s)",
iceberg.ErrNotImplemented, t.File.FilePath(), len(t.DeletionVectorFiles))
}
}

var err error
as.useLargeTypes, err = strconv.ParseBool(as.options.Get(ScanOptionArrowUseLargeTypes, "false"))
if err != nil {
Expand Down
263 changes: 263 additions & 0 deletions table/dv_scan_planning_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package table

import (
"testing"

"github.com/apache/iceberg-go"
"github.com/stretchr/testify/assert"
)

// dvMockDataFile extends mockDataFile with the deletion-vector (DV) fields
// introduced by Iceberg v3: the path of the data file the DV applies to,
// plus the offset and size of the DV blob inside its puffin file.
type dvMockDataFile struct {
	mockDataFile
	// referencedDataFile is the data file this DV deletes rows from.
	referencedDataFile *string
	// contentOffset is the DV blob's byte offset within the puffin file.
	contentOffset *int64
	// contentSizeInBytes is the DV blob's length in bytes.
	contentSizeInBytes *int64
}

// ReferencedDataFile returns the path of the data file this DV applies to.
func (d *dvMockDataFile) ReferencedDataFile() *string { return d.referencedDataFile }

// ContentOffset returns the DV blob offset within the puffin file.
func (d *dvMockDataFile) ContentOffset() *int64 { return d.contentOffset }

// ContentSizeInBytes returns the DV blob length in bytes.
func (d *dvMockDataFile) ContentSizeInBytes() *int64 { return d.contentSizeInBytes }

// strPtr returns a pointer to a copy of s, for populating optional fields.
func strPtr(s string) *string {
	v := s

	return &v
}

// int64Ptr returns a pointer to a copy of n, for populating optional fields.
func int64Ptr(n int64) *int64 {
	v := n

	return &v
}

// TestIsDeletionVector verifies that only files carrying DV metadata are
// classified as deletion vectors, while plain data files, regular position
// deletes, and equality deletes are not.
func TestIsDeletionVector(t *testing.T) {
	cases := []struct {
		name string
		file iceberg.DataFile
		want bool
	}{
		{
			name: "regular data file",
			file: &mockDataFile{
				path:        "s3://bucket/data/file.parquet",
				contentType: iceberg.EntryContentData,
			},
			want: false,
		},
		{
			name: "regular position delete file",
			file: &mockDataFile{
				path:        "s3://bucket/data/pos-del.parquet",
				contentType: iceberg.EntryContentPosDeletes,
			},
			want: false,
		},
		{
			name: "deletion vector",
			file: &dvMockDataFile{
				mockDataFile: mockDataFile{
					path:        "s3://bucket/data/dv.puffin",
					contentType: iceberg.EntryContentPosDeletes,
				},
				referencedDataFile: strPtr("s3://bucket/data/file.parquet"),
				contentOffset:      int64Ptr(100),
				contentSizeInBytes: int64Ptr(256),
			},
			want: true,
		},
		{
			name: "equality delete file (never a DV)",
			file: &mockDataFile{
				path:        "s3://bucket/data/eq-del.parquet",
				contentType: iceberg.EntryContentEqDeletes,
			},
			want: false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.want, isDeletionVector(tc.file))
		})
	}
}

// TestManifestEntries_DVClassification checks that manifestEntries keeps
// data files, positional deletes, deletion vectors, and equality deletes
// in their own buckets.
func TestManifestEntries_DVClassification(t *testing.T) {
	snap := int64(1)
	coll := newManifestEntries()

	// One entry of each content kind.
	coll.addDataEntry(iceberg.NewManifestEntry(
		iceberg.EntryStatusADDED, &snap, nil, nil, &mockDataFile{
			path:        "s3://bucket/data/data-001.parquet",
			contentType: iceberg.EntryContentData,
		}))

	coll.addPositionalDeleteEntry(iceberg.NewManifestEntry(
		iceberg.EntryStatusADDED, &snap, nil, nil, &mockDataFile{
			path:        "s3://bucket/data/pos-del-001.parquet",
			contentType: iceberg.EntryContentPosDeletes,
		}))

	coll.addDVEntry(iceberg.NewManifestEntry(
		iceberg.EntryStatusADDED, &snap, nil, nil, &dvMockDataFile{
			mockDataFile: mockDataFile{
				path:        "s3://bucket/data/dv-001.puffin",
				contentType: iceberg.EntryContentPosDeletes,
			},
			referencedDataFile: strPtr("s3://bucket/data/data-001.parquet"),
			contentOffset:      int64Ptr(0),
			contentSizeInBytes: int64Ptr(128),
		}))

	coll.addEqualityDeleteEntry(iceberg.NewManifestEntry(
		iceberg.EntryStatusADDED, &snap, nil, nil, &mockDataFile{
			path:        "s3://bucket/data/eq-del-001.parquet",
			contentType: iceberg.EntryContentEqDeletes,
		}))

	// Exactly one entry should land in each bucket.
	assert.Len(t, coll.dataEntries, 1)
	assert.Len(t, coll.positionalDeleteEntries, 1)
	assert.Len(t, coll.dvEntries, 1)
	assert.Len(t, coll.equalityDeleteEntries, 1)
}

// TestDVMatchingToDataFiles ensures deletion vectors are matched to data
// files strictly by their referenced-data-file path: each data file picks
// up only the DV(s) that reference it.
func TestDVMatchingToDataFiles(t *testing.T) {
	const (
		dataFilePath      = "s3://bucket/data/data-001.parquet"
		otherDataFilePath = "s3://bucket/data/data-002.parquet"
	)

	dvForData1 := &dvMockDataFile{
		mockDataFile: mockDataFile{
			path:        "s3://bucket/data/dv-001.puffin",
			contentType: iceberg.EntryContentPosDeletes,
		},
		referencedDataFile: strPtr(dataFilePath),
		contentOffset:      int64Ptr(0),
		contentSizeInBytes: int64Ptr(128),
	}

	dvForData2 := &dvMockDataFile{
		mockDataFile: mockDataFile{
			path:        "s3://bucket/data/dv-002.puffin",
			contentType: iceberg.EntryContentPosDeletes,
		},
		referencedDataFile: strPtr(otherDataFilePath),
		contentOffset:      int64Ptr(0),
		contentSizeInBytes: int64Ptr(64),
	}

	snapshotID := int64(1)
	dvEntries := []iceberg.ManifestEntry{
		iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, nil, nil, dvForData1),
		iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapshotID, nil, nil, dvForData2),
	}

	// matchDVs mirrors the planner's matching rule: a DV applies to a data
	// file iff its referenced data file path equals that file's path.
	matchDVs := func(path string) []iceberg.DataFile {
		var out []iceberg.DataFile
		for _, del := range dvEntries {
			if ref := del.DataFile().ReferencedDataFile(); ref != nil && *ref == path {
				out = append(out, del.DataFile())
			}
		}

		return out
	}

	// Match DVs against data-001 — should only get dv-001.
	matched := matchDVs(dataFilePath)
	assert.Len(t, matched, 1)
	assert.Equal(t, dvForData1.path, matched[0].FilePath())

	// Match DVs against data-002 — should only get dv-002.
	matched2 := matchDVs(otherDataFilePath)
	assert.Len(t, matched2, 1)
	assert.Equal(t, dvForData2.path, matched2[0].FilePath())
}

// TestDVMatchingNoMatch verifies that a DV referencing a different data
// file is never matched to the file being planned.
func TestDVMatchingNoMatch(t *testing.T) {
	dv := &dvMockDataFile{
		mockDataFile: mockDataFile{
			path:        "s3://bucket/data/dv-001.puffin",
			contentType: iceberg.EntryContentPosDeletes,
		},
		referencedDataFile: strPtr("s3://bucket/data/data-999.parquet"),
		contentOffset:      int64Ptr(0),
		contentSizeInBytes: int64Ptr(128),
	}

	snap := int64(1)
	entries := []iceberg.ManifestEntry{
		iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snap, nil, nil, dv),
	}

	// The DV references data-999, so matching against data-001 finds nothing.
	var matched []iceberg.DataFile
	for _, entry := range entries {
		ref := entry.DataFile().ReferencedDataFile()
		if ref != nil && *ref == "s3://bucket/data/data-001.parquet" {
			matched = append(matched, entry.DataFile())
		}
	}

	assert.Empty(t, matched)
}

// TestFileScanTask_DeletionVectorFilesField exercises the DeletionVectorFiles
// field on FileScanTask: DVs are carried separately from positional and
// equality delete files.
func TestFileScanTask_DeletionVectorFilesField(t *testing.T) {
	data := &mockDataFile{
		path:        "s3://bucket/data/data-001.parquet",
		contentType: iceberg.EntryContentData,
		filesize:    1024,
	}

	dv := &dvMockDataFile{
		mockDataFile: mockDataFile{
			path:        "s3://bucket/data/dv-001.puffin",
			contentType: iceberg.EntryContentPosDeletes,
		},
		referencedDataFile: strPtr("s3://bucket/data/data-001.parquet"),
		contentOffset:      int64Ptr(0),
		contentSizeInBytes: int64Ptr(128),
	}

	task := FileScanTask{
		File:                data,
		DeletionVectorFiles: []iceberg.DataFile{dv},
		Start:               0,
		Length:              1024,
	}

	assert.Equal(t, data, task.File)
	assert.Len(t, task.DeletionVectorFiles, 1)

	got := task.DeletionVectorFiles[0]
	assert.Equal(t, dv, got)
	assert.NotNil(t, got.ReferencedDataFile())
	assert.Equal(t, "s3://bucket/data/data-001.parquet", *got.ReferencedDataFile())

	// DVs must not leak into the other delete-file buckets.
	assert.Empty(t, task.DeleteFiles)
	assert.Empty(t, task.EqualityDeleteFiles)
}
28 changes: 27 additions & 1 deletion table/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type manifestEntries struct {
dataEntries []iceberg.ManifestEntry
positionalDeleteEntries []iceberg.ManifestEntry
equalityDeleteEntries []iceberg.ManifestEntry
dvEntries []iceberg.ManifestEntry
mu sync.Mutex
}

Expand All @@ -106,6 +107,7 @@ func newManifestEntries() *manifestEntries {
dataEntries: make([]iceberg.ManifestEntry, 0),
positionalDeleteEntries: make([]iceberg.ManifestEntry, 0),
equalityDeleteEntries: make([]iceberg.ManifestEntry, 0),
dvEntries: make([]iceberg.ManifestEntry, 0),
}
}

Expand All @@ -127,6 +129,12 @@ func (m *manifestEntries) addEqualityDeleteEntry(e iceberg.ManifestEntry) {
m.equalityDeleteEntries = append(m.equalityDeleteEntries, e)
}

// addDVEntry appends a deletion-vector manifest entry, taking the lock so
// entries can be added from concurrent manifest readers (matching the other
// add* methods on manifestEntries).
func (m *manifestEntries) addDVEntry(e iceberg.ManifestEntry) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.dvEntries = append(m.dvEntries, e)
}

func newPartitionRecord(partitionData map[int]any, partitionType *iceberg.StructType) partitionRecord {
out := make(partitionRecord, len(partitionType.FieldList))
for i, f := range partitionType.FieldList {
Expand Down Expand Up @@ -170,6 +178,10 @@ func openManifest(io io.IO, manifest iceberg.ManifestFile,
return out, nil
}

// isDeletionVector reports whether df is a deletion vector (DV) rather than a
// regular delete file. Per the Iceberg spec, a DV must set the referenced
// data file together with the content offset and size of its blob inside the
// puffin file, whereas regular position delete files must leave
// content_offset/content_size_in_bytes unset. Checking ReferencedDataFile
// alone is insufficient: the spec also allows referenced_data_file on an
// ordinary (v2) position delete file that targets a single data file, which
// must not be classified as a DV.
func isDeletionVector(df iceberg.DataFile) bool {
	return df.ReferencedDataFile() != nil &&
		df.ContentOffset() != nil &&
		df.ContentSizeInBytes() != nil
}

type Scan struct {
metadata Metadata
ioF FSysF
Expand Down Expand Up @@ -473,7 +485,11 @@ func (scan *Scan) collectManifestEntries(
case iceberg.EntryContentData:
entries.addDataEntry(e)
case iceberg.EntryContentPosDeletes:
entries.addPositionalDeleteEntry(e)
if isDeletionVector(e.DataFile()) {
entries.addDVEntry(e)
} else {
entries.addPositionalDeleteEntry(e)
}
case iceberg.EntryContentEqDeletes:
entries.addEqualityDeleteEntry(e)
default:
Expand Down Expand Up @@ -530,6 +546,14 @@ func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
return cmp.Compare(a.SequenceNum(), b.SequenceNum())
})

// Index DVs by referenced data file path for O(1) lookup.
dvIndex := make(map[string][]iceberg.DataFile, len(entries.dvEntries))
for _, del := range entries.dvEntries {
if ref := del.DataFile().ReferencedDataFile(); ref != nil {
dvIndex[*ref] = append(dvIndex[*ref], del.DataFile())
}
}

results := make([]FileScanTask, 0, len(entries.dataEntries))
for _, e := range entries.dataEntries {
deleteFiles, err := matchDeletesToData(e, entries.positionalDeleteEntries)
Expand All @@ -543,6 +567,7 @@ func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
File: e.DataFile(),
DeleteFiles: deleteFiles,
EqualityDeleteFiles: eqDeleteFiles,
DeletionVectorFiles: dvIndex[e.DataFile().FilePath()],
Start: 0,
Length: e.DataFile().FileSizeBytes(),
})
Expand All @@ -555,6 +580,7 @@ type FileScanTask struct {
File iceberg.DataFile
DeleteFiles []iceberg.DataFile // positional delete files
EqualityDeleteFiles []iceberg.DataFile // equality delete files
DeletionVectorFiles []iceberg.DataFile // deletion vectors (puffin files)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the actual scanner will ignore these, we should probably also update the scanner to error if this is non-empty just so people don't think it's working.

Start, Length int64
}

Expand Down
Loading