-
Notifications
You must be signed in to change notification settings - Fork 170
fix(table/scanner): Fix nested field scan #311
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
c1971bf
fix(table/scanner): fix scanning of List/Map typed fields.
zeroshade 5ad625d
point at Arrow fork for now
zeroshade 6f83075
proper error propagation
zeroshade 99ab06f
update large types
zeroshade 120796f
fix pruning
zeroshade a9bc6dd
point at updated main branch for arrow-go
zeroshade f0d787f
Merge branch 'main' into fix-nested-field-scan
zeroshade 3c08737
run go mod tidy
zeroshade d302cae
use updated arrow version
zeroshade 45cdb9f
disable S3 checksum log warning
zeroshade a1fa78a
re generate go.sum
zeroshade File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -497,7 +497,10 @@ func (c convertToArrow) Field(field iceberg.NestedField, result arrow.Field) arr | |
|
|
||
| func (c convertToArrow) List(list iceberg.ListType, elemResult arrow.Field) arrow.Field { | ||
| elemField := c.Field(list.ElementField(), elemResult) | ||
| return arrow.Field{Type: arrow.LargeListOfField(elemField)} | ||
| if c.useLargeTypes { | ||
| return arrow.Field{Type: arrow.LargeListOfField(elemField)} | ||
| } | ||
| return arrow.Field{Type: arrow.ListOfField(elemField)} | ||
| } | ||
|
|
||
| func (c convertToArrow) Map(m iceberg.MapType, keyResult, valResult arrow.Field) arrow.Field { | ||
|
|
@@ -589,9 +592,9 @@ func SchemaToArrowSchema(sc *iceberg.Schema, metadata map[string]string, include | |
| // TypeToArrowType converts a given iceberg type, into the equivalent Arrow data type. | ||
| // For dealing with nested fields (List, Struct, Map) if includeFieldIDs is true, then | ||
| // the child fields will contain a metadata key PARQUET:field_id set to the field id. | ||
| func TypeToArrowType(t iceberg.Type, includeFieldIDs bool) (arrow.DataType, error) { | ||
| func TypeToArrowType(t iceberg.Type, includeFieldIDs bool, useLargeTypes bool) (arrow.DataType, error) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: add `|`
||
| top, err := iceberg.Visit(iceberg.NewSchema(0, iceberg.NestedField{Type: t}), | ||
| convertToArrow{includeFieldIDs: includeFieldIDs}) | ||
| convertToArrow{includeFieldIDs: includeFieldIDs, useLargeTypes: useLargeTypes}) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
@@ -677,7 +680,7 @@ func (a *arrowProjectionVisitor) castIfNeeded(field iceberg.NestedField, vals ar | |
|
|
||
| if !field.Type.Equals(typ) { | ||
| promoted := retOrPanic(iceberg.PromoteType(fileField.Type, field.Type)) | ||
| targetType := retOrPanic(TypeToArrowType(promoted, a.includeFieldIDs)) | ||
| targetType := retOrPanic(TypeToArrowType(promoted, a.includeFieldIDs, a.useLargeTypes)) | ||
| if !a.useLargeTypes { | ||
| targetType = retOrPanic(ensureSmallArrowTypes(targetType)) | ||
| } | ||
|
|
@@ -686,7 +689,7 @@ func (a *arrowProjectionVisitor) castIfNeeded(field iceberg.NestedField, vals ar | |
| compute.SafeCastOptions(targetType))) | ||
| } | ||
|
|
||
| targetType := retOrPanic(TypeToArrowType(field.Type, a.includeFieldIDs)) | ||
| targetType := retOrPanic(TypeToArrowType(field.Type, a.includeFieldIDs, a.useLargeTypes)) | ||
| if !arrow.TypeEqual(targetType, vals.DataType()) { | ||
| switch field.Type.(type) { | ||
| case iceberg.TimestampType: | ||
|
|
@@ -756,12 +759,16 @@ func (a *arrowProjectionVisitor) Struct(st iceberg.StructType, structArr arrow.A | |
| for i, field := range st.FieldList { | ||
| arr := fieldResults[i] | ||
| if arr != nil { | ||
| if _, ok := arr.DataType().(arrow.NestedType); ok { | ||
| defer arr.Release() | ||
| } | ||
|
|
||
| arr = a.castIfNeeded(field, arr) | ||
| defer arr.Release() | ||
| fieldArrs[i] = arr | ||
| fields[i] = a.constructField(field, arr.DataType()) | ||
| } else if !field.Required { | ||
| dt := retOrPanic(TypeToArrowType(field.Type, false)) | ||
| dt := retOrPanic(TypeToArrowType(field.Type, false, a.useLargeTypes)) | ||
|
|
||
| arr = array.MakeArrayOfNull(compute.GetAllocator(a.ctx), dt, structArr.Len()) | ||
| defer arr.Release() | ||
|
|
@@ -786,11 +793,11 @@ func (a *arrowProjectionVisitor) List(listType iceberg.ListType, listArr arrow.A | |
| return nil | ||
| } | ||
|
|
||
| valueArr := a.castIfNeeded(listType.ElementField(), valArr) | ||
| defer valueArr.Release() | ||
| valArr = a.castIfNeeded(listType.ElementField(), valArr) | ||
| defer valArr.Release() | ||
|
|
||
| var outType arrow.ListLikeType | ||
| elemField := a.constructField(listType.ElementField(), valueArr.DataType()) | ||
| elemField := a.constructField(listType.ElementField(), valArr.DataType()) | ||
| switch arr.DataType().ID() { | ||
| case arrow.LIST: | ||
| outType = arrow.ListOfField(elemField) | ||
|
|
@@ -801,7 +808,7 @@ func (a *arrowProjectionVisitor) List(listType iceberg.ListType, listArr arrow.A | |
| } | ||
|
|
||
| data := array.NewData(outType, arr.Len(), arr.Data().Buffers(), | ||
| []arrow.ArrayData{valueArr.Data()}, arr.NullN(), arr.Data().Offset()) | ||
| []arrow.ArrayData{valArr.Data()}, arr.NullN(), arr.Data().Offset()) | ||
| defer data.Release() | ||
| return array.MakeFromData(data) | ||
| } | ||
|
|
@@ -855,7 +862,8 @@ func ToRequestedSchema(ctx context.Context, requested, fileSchema *iceberg.Schem | |
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer result.Release() | ||
|
|
||
| return array.RecordFromStructArray(result.(*array.Struct), nil), nil | ||
| st.Release() | ||
| out := array.RecordFromStructArray(result.(*array.Struct), nil) | ||
| result.Release() | ||
| return out, nil | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was pretty vocal on this one on the Python side. I strongly believe that we should not expose the options of large types to the user, and that's the direction that we're heading with PyIceberg. In the end, it is up to Arrow to decide if you need large types, or if small types are sufficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's actually pretty difficult to do, and would make schema conversion inconsistent.
Determining whether to use the large types cannot be determined until you have the data at which point, if you're streaming the data, it's too late to switch as you can't safely change the schema during a stream. For example:
The same problem can occur for List/LargeList depending on the number of total elements in a given column among the lists.
We also can't determine ahead of time by the stats in the iceberg metadata alone whether or not we should use Large types or not. The only way to know ahead of time is to read the parquet file metadata for every data file before we start producing record batches and then reconcile whether or not we should use large types before we start processing the files.
It looks like in the pyiceberg PR you linked, if I'm reading it correctly, you just automatically push everything to large types for streaming to avoid the problems I mentioned above? That somewhat defeats the benefit if the goal was to avoid using large types when they aren't needed (in most cases they aren't needed).