6 changes: 3 additions & 3 deletions go.mod
@@ -22,7 +22,7 @@ go 1.23.0
toolchain go1.23.6

require (
github.com/apache/arrow-go/v18 v18.1.1-0.20250218215100-460f5004b92d
github.com/apache/arrow-go/v18 v18.1.1-0.20250226170053-efecae3596e6
github.com/aws/aws-sdk-go-v2 v1.36.2
github.com/aws/aws-sdk-go-v2/config v1.29.7
github.com/aws/aws-sdk-go-v2/credentials v1.17.60
@@ -45,7 +45,6 @@ require (
github.com/uptrace/bun/driver/sqliteshim v1.2.10
github.com/uptrace/bun/extra/bundebug v1.2.10
gocloud.dev v0.40.0
golang.org/x/exp v0.0.0-20250215185904-eff6e970281f
golang.org/x/sync v0.11.0
google.golang.org/api v0.222.0
gopkg.in/yaml.v3 v3.0.1
@@ -103,7 +102,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/lithammer/fuzzysearch v1.1.8 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
@@ -136,6 +135,7 @@ require (
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
golang.org/x/crypto v0.33.0 // indirect
golang.org/x/exp v0.0.0-20250215185904-eff6e970281f // indirect
golang.org/x/mod v0.23.0 // indirect
golang.org/x/net v0.35.0 // indirect
golang.org/x/oauth2 v0.26.0 // indirect
8 changes: 4 additions & 4 deletions go.sum
@@ -37,8 +37,8 @@ github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7X
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
github.com/apache/arrow-go/v18 v18.1.1-0.20250218215100-460f5004b92d h1:OL//u0ke+2Dld3s5szg3KC7ZgwaATfdmF5LyxpRxtC8=
github.com/apache/arrow-go/v18 v18.1.1-0.20250218215100-460f5004b92d/go.mod h1:6vlLiEonoJLH6uX9X84ki6mZlOPjuVC4qoQkxsLzwd8=
github.com/apache/arrow-go/v18 v18.1.1-0.20250226170053-efecae3596e6 h1:RWgwATcu+8ZR65fmb/esaQkQqoCBhiRnB6WPMfAxJOs=
github.com/apache/arrow-go/v18 v18.1.1-0.20250226170053-efecae3596e6/go.mod h1:ev6qDk3+bf7eh/goQbjA4ZViRDhu+9pmad8YTLvupIg=
github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
@@ -188,8 +188,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.10/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
1 change: 1 addition & 0 deletions io/s3.go
@@ -138,6 +138,7 @@ func createS3Bucket(ctx context.Context, parsed *url.URL, props map[string]strin
o.BaseEndpoint = aws.String(endpoint)
}
o.UsePathStyle = usePathStyle
o.DisableLogOutputChecksumValidationSkipped = true
})

// Create a *blob.Bucket.
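For context, DisableLogOutputChecksumValidationSkipped silences the warning that aws-sdk-go-v2 (since its early-2025 data-integrity defaults) logs whenever response checksum validation is skipped, e.g. against S3-compatible stores that return no checksums. A minimal standalone sketch of the same client setup; the endpoint value is illustrative:

package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	client := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.BaseEndpoint = aws.String("http://localhost:9000") // illustrative, e.g. MinIO
		o.UsePathStyle = true
		// Suppress the per-response log message emitted when a response
		// carries no checksum for the SDK to validate.
		o.DisableLogOutputChecksumValidationSkipped = true
	})
	_ = client
}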
4 changes: 4 additions & 0 deletions schema.go
@@ -165,6 +165,10 @@ func (s *Schema) AsStruct() StructType { return StructType{FieldList: s.field
func (s *Schema) NumFields() int { return len(s.fields) }
func (s *Schema) Field(i int) NestedField { return s.fields[i] }
func (s *Schema) Fields() []NestedField { return slices.Clone(s.fields) }
func (s *Schema) FieldIDs() []int {
idx, _ := s.lazyNameToID()
return slices.Collect(maps.Values(idx))
}

func (s *Schema) UnmarshalJSON(b []byte) error {
type Alias Schema
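A short usage sketch for the new FieldIDs accessor (the schema here is illustrative; the IDs are collected from an internal map, so their order is unspecified):

package main

import (
	"fmt"

	"github.com/apache/iceberg-go"
)

func main() {
	sc := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
		iceberg.NestedField{ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String},
	)

	// Pair each ID with its type, as arrowScan.projectedFieldIDs does below.
	for _, id := range sc.FieldIDs() {
		if typ, ok := sc.FindTypeByID(id); ok {
			fmt.Printf("field %d: %s\n", id, typ)
		}
	}
}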
18 changes: 11 additions & 7 deletions table/arrow_scanner.go
@@ -217,11 +217,12 @@ type arrowScan struct {

func (as *arrowScan) projectedFieldIDs() (set[int], error) {
idset := set[int]{}
for _, field := range as.projectedSchema.Fields() {
switch field.Type.(type) {
for _, id := range as.projectedSchema.FieldIDs() {
typ, _ := as.projectedSchema.FindTypeByID(id)
switch typ.(type) {
case *iceberg.MapType, *iceberg.ListType:
default:
idset[field.ID] = struct{}{}
idset[id] = struct{}{}
}
}

@@ -435,7 +436,7 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerat
return
}

func createIterator(ctx context.Context, numWorkers uint, records <-chan enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelFunc, rowLimit int64) iter.Seq2[arrow.Record, error] {
func createIterator(ctx context.Context, numWorkers uint, records <-chan enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelCauseFunc, rowLimit int64) iter.Seq2[arrow.Record, error] {
isBeforeAny := func(batch enumeratedRecord) bool {
return batch.Task.Index < 0
}
@@ -484,11 +485,14 @@ func createIterator(ctx context.Context, numWorkers uint, records <-chan enumera
}
}()

defer cancel()
defer cancel(nil)

for {
select {
case <-ctx.Done():
if err := context.Cause(ctx); err != nil {
yield(nil, err)
}
return
case enum, ok := <-sequenced:
if !ok {
@@ -531,7 +535,7 @@ func createIterator(ctx context.Context, numWorkers uint, records <-chan enumera
func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, tasks []FileScanTask, deletesPerFile perFilePosDeletes) iter.Seq2[arrow.Record, error] {
extSet := substrait.NewExtensionSet()

ctx, cancel := context.WithCancel(exprs.WithExtensionIDSet(ctx, extSet))
ctx, cancel := context.WithCancelCause(exprs.WithExtensionIDSet(ctx, extSet))
taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks))

// numWorkers := 1
@@ -554,7 +558,7 @@ func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, tasks

if err := as.recordsFromTask(ctx, task, records,
deletesPerFile[task.Value.File.FilePath()]); err != nil {
cancel()
cancel(err)
return
}
}
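The move from context.WithCancel to context.WithCancelCause means a failing worker records why the scan stopped, and the iterator yields that error via context.Cause instead of a bare "context canceled". A self-contained sketch of the pattern (the error message is illustrative, not the scanner's):

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())

	go func() {
		// A worker hits a real error and records it as the cancellation
		// cause, mirroring cancel(err) in recordBatchesFromTasksAndDeletes.
		cancel(errors.New("decoding data file: corrupt page"))
	}()

	<-ctx.Done()
	fmt.Println(ctx.Err())          // context canceled
	fmt.Println(context.Cause(ctx)) // decoding data file: corrupt page
}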
34 changes: 21 additions & 13 deletions table/arrow_utils.go
@@ -497,7 +497,10 @@ func (c convertToArrow) Field(field iceberg.NestedField, result arrow.Field) arr

func (c convertToArrow) List(list iceberg.ListType, elemResult arrow.Field) arrow.Field {
elemField := c.Field(list.ElementField(), elemResult)
return arrow.Field{Type: arrow.LargeListOfField(elemField)}
if c.useLargeTypes {
Contributor:

I was pretty vocal about this one on the Python side. I strongly believe that we should not expose the option of large types to the user, and that's the direction we're heading with PyIceberg. In the end, it is up to Arrow to decide whether you need large types or whether small types are sufficient.

Member Author:

That's actually pretty difficult to do, and it would make schema conversion inconsistent.

Whether to use large types cannot be determined until you have the data, at which point, if you're streaming the data, it's too late to switch, since you can't safely change the schema during a stream. For example:

  1. Start reading a Parquet file with a string column; the total column data in this file is only 100 MB, so we use a regular string (small type) and start streaming record batches.
  2. One of the last files has 3 GB of raw data in the string column, so we use LargeString for that column from that file.
  3. We can't cast the LargeString to String, and we can't change the schema of the stream to switch the column to LargeString, so now we're stuck.

The same problem can occur for List/LargeList depending on the total number of elements in a given column among the lists.

We also can't determine ahead of time, from the stats in the Iceberg metadata alone, whether we should use large types. The only way to know ahead of time is to read the Parquet file metadata for every data file before we start producing record batches, and then reconcile whether to use large types before we start processing the files.

It looks like in the PyIceberg PR you linked, if I'm reading it correctly, everything is automatically pushed to large types for streaming, to avoid the problems I mentioned above; that kind of defeats the benefit if the goal was to avoid using large types when they aren't needed (in most cases they aren't).

return arrow.Field{Type: arrow.LargeListOfField(elemField)}
}
return arrow.Field{Type: arrow.ListOfField(elemField)}
}

func (c convertToArrow) Map(m iceberg.MapType, keyResult, valResult arrow.Field) arrow.Field {
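The streaming constraint described in the thread above is easy to reproduce with Arrow IPC: a stream is bound to a single schema, so a producer that starts with string cannot switch a column to large_string partway through. A minimal sketch, assuming arrow-go v18 (the writer should reject the second batch; the exact error text may differ):

package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/ipc"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func makeRec(mem memory.Allocator, sc *arrow.Schema) arrow.Record {
	b := array.NewRecordBuilder(mem, sc)
	defer b.Release()
	b.Field(0).(array.StringLikeBuilder).Append("hello")
	return b.NewRecord()
}

func main() {
	mem := memory.DefaultAllocator
	small := arrow.NewSchema([]arrow.Field{{Name: "s", Type: arrow.BinaryTypes.String}}, nil)
	large := arrow.NewSchema([]arrow.Field{{Name: "s", Type: arrow.BinaryTypes.LargeString}}, nil)

	r1, r2 := makeRec(mem, small), makeRec(mem, large)
	defer r1.Release()
	defer r2.Release()

	var buf bytes.Buffer
	w := ipc.NewWriter(&buf, ipc.WithSchema(small))
	defer w.Close()

	fmt.Println(w.Write(r1)) // <nil>: matches the stream schema
	fmt.Println(w.Write(r2)) // error: batch schema differs from the stream's
}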
@@ -589,9 +592,9 @@ func SchemaToArrowSchema(sc *iceberg.Schema, metadata map[string]string, include
// TypeToArrowType converts a given iceberg type, into the equivalent Arrow data type.
// For dealing with nested fields (List, Struct, Map) if includeFieldIDs is true, then
// the child fields will contain a metadata key PARQUET:field_id set to the field id.
func TypeToArrowType(t iceberg.Type, includeFieldIDs bool) (arrow.DataType, error) {
func TypeToArrowType(t iceberg.Type, includeFieldIDs bool, useLargeTypes bool) (arrow.DataType, error) {
Contributor:

nit: add useLargeTypes to the docstring above.

top, err := iceberg.Visit(iceberg.NewSchema(0, iceberg.NestedField{Type: t}),
convertToArrow{includeFieldIDs: includeFieldIDs})
convertToArrow{includeFieldIDs: includeFieldIDs, useLargeTypes: useLargeTypes})
if err != nil {
return nil, err
}
@@ -677,7 +680,7 @@ func (a *arrowProjectionVisitor) castIfNeeded(field iceberg.NestedField, vals ar

if !field.Type.Equals(typ) {
promoted := retOrPanic(iceberg.PromoteType(fileField.Type, field.Type))
targetType := retOrPanic(TypeToArrowType(promoted, a.includeFieldIDs))
targetType := retOrPanic(TypeToArrowType(promoted, a.includeFieldIDs, a.useLargeTypes))
if !a.useLargeTypes {
targetType = retOrPanic(ensureSmallArrowTypes(targetType))
}
Expand All @@ -686,7 +689,7 @@ func (a *arrowProjectionVisitor) castIfNeeded(field iceberg.NestedField, vals ar
compute.SafeCastOptions(targetType)))
}

targetType := retOrPanic(TypeToArrowType(field.Type, a.includeFieldIDs))
targetType := retOrPanic(TypeToArrowType(field.Type, a.includeFieldIDs, a.useLargeTypes))
if !arrow.TypeEqual(targetType, vals.DataType()) {
switch field.Type.(type) {
case iceberg.TimestampType:
@@ -756,12 +759,16 @@ func (a *arrowProjectionVisitor) Struct(st iceberg.StructType, structArr arrow.A
for i, field := range st.FieldList {
arr := fieldResults[i]
if arr != nil {
if _, ok := arr.DataType().(arrow.NestedType); ok {
defer arr.Release()
}

arr = a.castIfNeeded(field, arr)
defer arr.Release()
fieldArrs[i] = arr
fields[i] = a.constructField(field, arr.DataType())
} else if !field.Required {
dt := retOrPanic(TypeToArrowType(field.Type, false))
dt := retOrPanic(TypeToArrowType(field.Type, false, a.useLargeTypes))

arr = array.MakeArrayOfNull(compute.GetAllocator(a.ctx), dt, structArr.Len())
defer arr.Release()
Expand All @@ -786,11 +793,11 @@ func (a *arrowProjectionVisitor) List(listType iceberg.ListType, listArr arrow.A
return nil
}

valueArr := a.castIfNeeded(listType.ElementField(), valArr)
defer valueArr.Release()
valArr = a.castIfNeeded(listType.ElementField(), valArr)
defer valArr.Release()

var outType arrow.ListLikeType
elemField := a.constructField(listType.ElementField(), valueArr.DataType())
elemField := a.constructField(listType.ElementField(), valArr.DataType())
switch arr.DataType().ID() {
case arrow.LIST:
outType = arrow.ListOfField(elemField)
Expand All @@ -801,7 +808,7 @@ func (a *arrowProjectionVisitor) List(listType iceberg.ListType, listArr arrow.A
}

data := array.NewData(outType, arr.Len(), arr.Data().Buffers(),
[]arrow.ArrayData{valueArr.Data()}, arr.NullN(), arr.Data().Offset())
[]arrow.ArrayData{valArr.Data()}, arr.NullN(), arr.Data().Offset())
defer data.Release()
return array.MakeFromData(data)
}
@@ -855,7 +862,8 @@ func ToRequestedSchema(ctx context.Context, requested, fileSchema *iceberg.Schem
if err != nil {
return nil, err
}
defer result.Release()

return array.RecordFromStructArray(result.(*array.Struct), nil), nil
st.Release()
out := array.RecordFromStructArray(result.(*array.Struct), nil)
result.Release()
return out, nil
}
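The reworked epilogue above fixes the release ordering: the output record must be constructed before result is released, because arrow-go records retain their columns at construction time. A standalone sketch of that discipline under a checked allocator (illustrative values; the retain/release semantics are as I understand arrow-go v18):

package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)

	sc := arrow.NewSchema([]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}}, nil)
	b := array.NewRecordBuilder(mem, sc)
	b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3}, nil)
	rec := b.NewRecord()
	b.Release()

	st := array.RecordToStructArray(rec)
	rec.Release() // safe: st retained the underlying data

	out := array.RecordFromStructArray(st, nil)
	st.Release() // safe only *after* the record was built

	fmt.Println(out.NumRows()) // 3
	out.Release()

	// In tests this is asserted with mem.AssertSize(t, 0); a mistake in the
	// ordering above would show up as a leak or a use-after-free.
	fmt.Println(mem.CurrentAlloc()) // 0
}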
43 changes: 40 additions & 3 deletions table/arrow_utils_test.go
@@ -18,6 +18,7 @@
package table_test

import (
"bufio"
"context"
"strings"
"testing"
@@ -114,7 +115,7 @@ func TestArrowToIceberg(t *testing.T) {
ElementID: 1,
Element: iceberg.PrimitiveTypes.Int32,
ElementRequired: true,
}, false, ""},
}, true, ""},
{arrow.LargeListOfField(arrow.Field{
Name: "element",
Type: arrow.PrimitiveTypes.Int32,
@@ -124,7 +125,7 @@
ElementID: 1,
Element: iceberg.PrimitiveTypes.Int32,
ElementRequired: true,
}, true, ""},
}, false, ""},
{arrow.FixedSizeListOfField(1, arrow.Field{
Name: "element",
Type: arrow.PrimitiveTypes.Int32,
@@ -164,7 +165,7 @@
}

if tt.reciprocal {
result, err := table.TypeToArrowType(tt.ice, true)
result, err := table.TypeToArrowType(tt.ice, true, false)
require.NoError(t, err)
assert.True(t, arrow.TypeEqual(tt.dt, result), tt.dt.String(), result.String())
}
@@ -510,3 +511,39 @@ func TestToRequestedSchemaTimestamps(t *testing.T) {
}
}
}

func TestToRequestedSchema(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)

schema := arrow.NewSchema([]arrow.Field{
{
Name: "nested", Type: arrow.ListOfField(arrow.Field{
Name: "element", Type: arrow.PrimitiveTypes.Int32, Nullable: false,
Metadata: arrow.NewMetadata([]string{table.ArrowParquetFieldIDKey}, []string{"2"}),
}),
Metadata: arrow.NewMetadata([]string{table.ArrowParquetFieldIDKey}, []string{"1"})},
}, nil)

bldr := array.NewRecordBuilder(mem, schema)
defer bldr.Release()

const data = `{"nested": [1, 2, 3]}
{"nested": [4, 5, 6]}`

s := bufio.NewScanner(strings.NewReader(data))
require.True(t, s.Scan())
require.NoError(t, bldr.UnmarshalJSON(s.Bytes()))

rec := bldr.NewRecord()
defer rec.Release()

icesc, err := table.ArrowSchemaToIceberg(schema, false, nil)
require.NoError(t, err)

rec2, err := table.ToRequestedSchema(context.Background(), icesc, icesc, rec, true, true, false)
require.NoError(t, err)
defer rec2.Release()

assert.True(t, array.RecordEqual(rec, rec2))
}
11 changes: 7 additions & 4 deletions table/internal/parquet_files.go
@@ -149,7 +149,8 @@ func visitParquetManifestStruct[T any](field pqarrow.SchemaField, visitor manife
results := make([]T, len(field.Children))

for i, f := range field.Children {
results[i] = visitManifestField(f, visitor)
res := visitManifestField(f, visitor)
results[i] = visitor.Field(f, res)
}

return visitor.Struct(field, results)
@@ -189,7 +190,7 @@ func pruneParquetColumns(manifest *pqarrow.SchemaManifest, selected map[int]stru
indices: []int{},
}

result, err := visitParquetManifest[arrow.Field](manifest, visitor)
result, err := visitParquetManifest(manifest, visitor)
if err != nil {
return nil, nil, err
}
@@ -335,7 +336,7 @@ func (p *pruneParquetSchema) List(field pqarrow.SchemaField, elemResult arrow.Fi
panic(fmt.Errorf("cannot explicitly project list or map types"))
}

p.indices = append(p.indices, field.ColIndex)
p.indices = append(p.indices, field.Children[0].ColIndex)
return *field.Field
}

@@ -371,10 +372,12 @@ func (p *pruneParquetSchema) Map(field pqarrow.SchemaField, keyResult, valResult
panic("cannot explicitly project list or map types")
}

p.indices = append(p.indices, field.Children[0].Children[0].ColIndex)
p.indices = append(p.indices, field.Children[0].Children[1].ColIndex)
return *field.Field
}

func (p *pruneParquetSchema) Primitive(field pqarrow.SchemaField) arrow.Field {
func (p *pruneParquetSchema) Primitive(_ pqarrow.SchemaField) arrow.Field {
return arrow.Field{}
}

17 changes: 17 additions & 0 deletions table/scanner_test.go
@@ -458,6 +458,23 @@ func (s *ScannerSuite) TestPartitionedTables() {
}
}

func (s *ScannerSuite) TestNestedColumns() {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(s.T(), 0)

ident := catalog.ToIdentifier("default", "test_all_types")

tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
s.Require().NoError(err)

ctx := compute.WithAllocator(s.ctx, mem)
results, err := tbl.Scan().ToArrowTable(ctx)
s.Require().NoError(err)
defer results.Release()

s.EqualValues(5, results.NumRows())
}

func (s *ScannerSuite) TestUnpartitionedUUIDTable() {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(s.T(), 0)