Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/avro_schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,17 @@ func init() {
Must(avro.NewField("added_files_count",
IntSchema,
avro.WithDoc("Added entry count"),
avro.WithDefault(int32(0)),
WithFieldID(504))),
Must(avro.NewField("existing_files_count",
IntSchema,
avro.WithDoc("Existing entry count"),
avro.WithDefault(int32(0)),
WithFieldID(505))),
Must(avro.NewField("deleted_files_count",
IntSchema,
avro.WithDoc("Deleted entry count"),
avro.WithDefault(int32(0)),
WithFieldID(506))),
Must(avro.NewField("partitions",
NullableSchema(
Expand Down Expand Up @@ -397,14 +400,17 @@ func init() {
Must(avro.NewField("added_files_count",
IntSchema,
avro.WithDoc("Added entry count"),
avro.WithDefault(int32(0)),
WithFieldID(504))),
Must(avro.NewField("existing_files_count",
IntSchema,
avro.WithDoc("Existing entry count"),
avro.WithDefault(int32(0)),
WithFieldID(505))),
Must(avro.NewField("deleted_files_count",
IntSchema,
avro.WithDoc("Deleted entry count"),
avro.WithDefault(int32(0)),
WithFieldID(506))),
Must(avro.NewField("partitions",
NullableSchema(
Expand Down
100 changes: 65 additions & 35 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,26 +309,47 @@ func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]Manif
}

type manifestFile struct {
Path string `avro:"manifest_path"`
Len int64 `avro:"manifest_length"`
SpecID int32 `avro:"partition_spec_id"`
Content ManifestContent `avro:"content"`
SeqNumber int64 `avro:"sequence_number"`
MinSeqNumber int64 `avro:"min_sequence_number"`
AddedSnapshotID int64 `avro:"added_snapshot_id"`
AddedFilesCount int32 `avro:"added_files_count"`
ExistingFilesCount int32 `avro:"existing_files_count"`
DeletedFilesCount int32 `avro:"deleted_files_count"`
AddedRowsCount int64 `avro:"added_rows_count"`
ExistingRowsCount int64 `avro:"existing_rows_count"`
DeletedRowsCount int64 `avro:"deleted_rows_count"`
PartitionList *[]FieldSummary `avro:"partitions"`
Key []byte `avro:"key_metadata"`
FirstRowId *int64 `avro:"first_row_id"`
Path string `avro:"manifest_path"`
Len int64 `avro:"manifest_length"`
SpecID int32 `avro:"partition_spec_id"`
Content ManifestContent `avro:"content"`
SeqNumber int64 `avro:"sequence_number"`
MinSeqNumber int64 `avro:"min_sequence_number"`
AddedSnapshotID int64 `avro:"added_snapshot_id"`
// Canonical count fields (spec / Spark / Trino names).
AddedFilesCount int32 `avro:"added_files_count"`
ExistingFilesCount int32 `avro:"existing_files_count"`
DeletedFilesCount int32 `avro:"deleted_files_count"`
// Legacy count fields used by pre-1.4 Java Iceberg and Athena.
// Populated by the OCF decoder when the writer schema uses these names;
// normalizeLegacyCounts() promotes them into the canonical fields above.
LegacyAddedFilesCount int32 `avro:"added_data_files_count"`
LegacyExistingFilesCount int32 `avro:"existing_data_files_count"`
LegacyDeletedFilesCount int32 `avro:"deleted_data_files_count"`
AddedRowsCount int64 `avro:"added_rows_count"`
ExistingRowsCount int64 `avro:"existing_rows_count"`
DeletedRowsCount int64 `avro:"deleted_rows_count"`
PartitionList *[]FieldSummary `avro:"partitions"`
Key []byte `avro:"key_metadata"`
FirstRowId *int64 `avro:"first_row_id"`

version int `avro:"-"`
}

// normalizeLegacyCounts copies counts decoded from the legacy pre-1.4 Java
// Iceberg / Athena field names (added_data_files_count, etc.) into the
// canonical spec-named fields. It must run exactly once, right after the Avro
// decode, before any accessor is used. A legacy value is promoted only when it
// is positive and the canonical field is still unset (zero or negative).
func (m *manifestFile) normalizeLegacyCounts() {
	promotions := []struct {
		canonical *int32
		legacy    int32
	}{
		{&m.AddedFilesCount, m.LegacyAddedFilesCount},
		{&m.ExistingFilesCount, m.LegacyExistingFilesCount},
		{&m.DeletedFilesCount, m.LegacyDeletedFilesCount},
	}
	for _, p := range promotions {
		if *p.canonical <= 0 && p.legacy > 0 {
			*p.canonical = p.legacy
		}
	}
}

// setVersion records the manifest-list format version this file was decoded
// with (stored in the avro-ignored `version` field; set by decodeManifests).
func (m *manifestFile) setVersion(v int) {
	m.version = v
}
Expand All @@ -341,20 +362,23 @@ func (m *manifestFile) toV1(v1file *manifestFileV1) {
v1file.PartitionList = m.PartitionList
v1file.Key = m.Key

if m.AddedFilesCount >= 0 {
v1file.AddedFilesCount = &m.AddedFilesCount
addedCount := m.AddedDataFiles()
if addedCount >= 0 {
v1file.AddedFilesCount = &addedCount
} else {
v1file.AddedFilesCount = nil
}

if m.ExistingFilesCount >= 0 {
v1file.ExistingFilesCount = &m.ExistingFilesCount
existingCount := m.ExistingDataFiles()
if existingCount >= 0 {
v1file.ExistingFilesCount = &existingCount
} else {
v1file.ExistingFilesCount = nil
}

if m.DeletedFilesCount >= 0 {
v1file.DeletedFilesCount = &m.DeletedFilesCount
deletedCount := m.DeletedDataFiles()
if deletedCount >= 0 {
v1file.DeletedFilesCount = &deletedCount
} else {
v1file.DeletedFilesCount = nil
}
Expand Down Expand Up @@ -384,25 +408,25 @@ func (m *manifestFile) Length() int64 { return m.Len }
func (m *manifestFile) PartitionSpecID() int32 { return m.SpecID }
func (m *manifestFile) ManifestContent() ManifestContent { return m.Content }
func (m *manifestFile) SnapshotID() int64 { return m.AddedSnapshotID }
func (m *manifestFile) AddedDataFiles() int32 { return m.AddedFilesCount }
func (m *manifestFile) ExistingDataFiles() int32 { return m.ExistingFilesCount }
func (m *manifestFile) DeletedDataFiles() int32 { return m.DeletedFilesCount }
func (m *manifestFile) AddedRows() int64 { return m.AddedRowsCount }
func (m *manifestFile) ExistingRows() int64 { return m.ExistingRowsCount }
func (m *manifestFile) DeletedRows() int64 { return m.DeletedRowsCount }
func (m *manifestFile) SequenceNum() int64 { return m.SeqNumber }
func (m *manifestFile) MinSequenceNum() int64 { return m.MinSeqNumber }
func (m *manifestFile) KeyMetadata() []byte { return m.Key }

func (m *manifestFile) AddedDataFiles() int32 { return m.AddedFilesCount }
func (m *manifestFile) ExistingDataFiles() int32 { return m.ExistingFilesCount }
func (m *manifestFile) DeletedDataFiles() int32 { return m.DeletedFilesCount }
func (m *manifestFile) AddedRows() int64 { return m.AddedRowsCount }
func (m *manifestFile) ExistingRows() int64 { return m.ExistingRowsCount }
func (m *manifestFile) DeletedRows() int64 { return m.DeletedRowsCount }
func (m *manifestFile) SequenceNum() int64 { return m.SeqNumber }
func (m *manifestFile) MinSequenceNum() int64 { return m.MinSeqNumber }
func (m *manifestFile) KeyMetadata() []byte { return m.Key }
func (m *manifestFile) HasAddedFiles() bool { return m.AddedFilesCount != 0 }
func (m *manifestFile) HasExistingFiles() bool { return m.ExistingFilesCount != 0 }
// Partitions returns the per-field partition summaries recorded for this
// manifest, or nil when the manifest list carried no partition summary.
func (m *manifestFile) Partitions() []FieldSummary {
	if summaries := m.PartitionList; summaries != nil {
		return *summaries
	}
	return nil
}

func (m *manifestFile) HasAddedFiles() bool { return m.AddedFilesCount != 0 }
func (m *manifestFile) HasExistingFiles() bool { return m.ExistingFilesCount != 0 }
// FetchEntries returns the manifest entries for this manifest file by
// delegating to fetchManifestEntries; when discardDeleted is true, entries
// marked deleted are presumably filtered out by that helper — its body is not
// visible here.
func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
	return fetchManifestEntries(m, fs, discardDeleted)
}
Expand Down Expand Up @@ -537,6 +561,7 @@ type ManifestFile interface {
// WriteEntries(out io.Writer, entries []ManifestEntry) error

setVersion(int)
normalizeLegacyCounts()
}

type fallbackManifest[T any] interface {
Expand All @@ -553,7 +578,9 @@ func decodeManifestsWithFallback[P fallbackManifest[T], T any](dec *ocf.Decoder)
return nil, err
}

results = append(results, tmp.toFile())
result := tmp.toFile()
result.normalizeLegacyCounts()
results = append(results, result)
}

return results, dec.Error()
Expand All @@ -571,6 +598,7 @@ func decodeManifests[I interface {
}

tmp.setVersion(version)
tmp.normalizeLegacyCounts()
results = append(results, tmp)
}

Expand Down Expand Up @@ -1440,6 +1468,8 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
}

wrapped := *(file.(*manifestFile))


if m.version == 3 {
// Ref: https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
if wrapped.Content == ManifestContentData && wrapped.FirstRowId == nil {
Expand Down
86 changes: 86 additions & 0 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1641,3 +1641,89 @@ func (m *ManifestTestSuite) TestWriteManifestClosesWriterOnEntryError() {
m.Require().ErrorContains(err, "only entries with status ADDED")
m.Require().ErrorIs(err, errLimitedWrite)
}

// TestReadManifestListAthenaFieldNames verifies that manifest list entries written
// with pre-1.4 Java Iceberg / Athena field names (added_data_files_count, etc.) are
// decoded correctly via post-decode normalization into the canonical count fields.
func (m *ManifestTestSuite) TestReadManifestListAthenaFieldNames() {
	// Athena writes added_data_files_count / existing_data_files_count /
	// deleted_data_files_count instead of the spec names. Simulate that by
	// encoding an OCF file whose embedded schema uses the Athena names.
	athenaSchema := `{
"type": "record",
"name": "manifest_file",
"fields": [
{"name": "manifest_path", "type": "string", "field-id": 500},
{"name": "manifest_length", "type": "long", "field-id": 501},
{"name": "partition_spec_id", "type": "int", "field-id": 502},
{"name": "content", "type": "int", "default": 0, "field-id": 517},
{"name": "sequence_number", "type": "long", "default": 0, "field-id": 515},
{"name": "min_sequence_number", "type": "long", "default": 0, "field-id": 516},
{"name": "added_snapshot_id", "type": "long", "field-id": 503},
{"name": "added_data_files_count", "type": "int", "field-id": 504},
{"name": "existing_data_files_count", "type": "int", "field-id": 505},
{"name": "deleted_data_files_count", "type": "int", "field-id": 506},
{"name": "added_rows_count", "type": "long", "field-id": 512},
{"name": "existing_rows_count", "type": "long", "field-id": 513},
{"name": "deleted_rows_count", "type": "long", "field-id": 514},
{"name": "key_metadata", "type": ["null", "bytes"], "default": null, "field-id": 519}
]
}`

	writerSchema, err := avro.Parse(athenaSchema)
	m.Require().NoError(err)

	// Local record type whose avro tags match the Athena-style writer schema,
	// so the encoder produces exactly the legacy field layout.
	type athenaManifestRecord struct {
		Path              string `avro:"manifest_path"`
		Len               int64  `avro:"manifest_length"`
		SpecID            int32  `avro:"partition_spec_id"`
		Content           int32  `avro:"content"`
		SeqNumber         int64  `avro:"sequence_number"`
		MinSeqNumber      int64  `avro:"min_sequence_number"`
		AddedSnapshotID   int64  `avro:"added_snapshot_id"`
		AddedDataFiles    int32  `avro:"added_data_files_count"`
		ExistingDataFiles int32  `avro:"existing_data_files_count"`
		DeletedDataFiles  int32  `avro:"deleted_data_files_count"`
		AddedRows         int64  `avro:"added_rows_count"`
		ExistingRows      int64  `avro:"existing_rows_count"`
		DeletedRows       int64  `avro:"deleted_rows_count"`
		Key               []byte `avro:"key_metadata"`
	}

	// DeletedDataFiles is intentionally left at zero so normalization leaves
	// the canonical deleted count at its zero value.
	record := athenaManifestRecord{
		Path:            "s3://bucket/metadata/athena-m0.avro",
		Len:             1024,
		SpecID:          0,
		AddedSnapshotID: 42,
		AddedDataFiles:  7,
		ExistingDataFiles: 3,
		AddedRows:       100,
	}

	var buf bytes.Buffer
	// The schema marshaler pins the embedded writer schema to the Athena-style
	// JSON above; the metadata mirrors what a v2 manifest list carries.
	enc, err := ocf.NewEncoderWithSchema(writerSchema, &buf,
		ocf.WithSchemaMarshaler(func(schema avro.Schema) ([]byte, error) {
			return []byte(athenaSchema), nil
		}),
		ocf.WithMetadata(map[string][]byte{
			"format-version":     {'2'},
			"snapshot-id":        []byte("42"),
			"sequence-number":    []byte("1"),
			"parent-snapshot-id": []byte("null"),
		}),
	)
	m.Require().NoError(err)
	m.Require().NoError(enc.Encode(record))
	m.Require().NoError(enc.Close())

	files, err := ReadManifestList(&buf)
	m.Require().NoError(err)
	m.Require().Len(files, 1)

	f := files[0]
	m.Equal("s3://bucket/metadata/athena-m0.avro", f.FilePath())
	// Athena's added_data_files_count must be normalized into AddedFilesCount.
	m.EqualValues(7, f.AddedDataFiles(), "alias-decoded added count")
	m.EqualValues(3, f.ExistingDataFiles(), "alias-decoded existing count")
	m.True(f.HasAddedFiles(), "HasAddedFiles must be true")
}
2 changes: 1 addition & 1 deletion table/arrow_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1601,8 +1601,8 @@ func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string,
cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
return newPositionDeleteWriter(rootLocation, fs, meta, props, opts...)
}, withSchemaSanitization(false))
nextCount, stopCount := iter.Pull(args.counter)
if latestMetadata.PartitionSpec().IsUnpartitioned() {
nextCount, stopCount := iter.Pull(args.counter)
tasks := func(yield func(WriteTask) bool) {
defer stopCount()

Expand Down
19 changes: 18 additions & 1 deletion table/internal/parquet_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ const (
ParquetBloomFilterMaxBytesKey = "write.parquet.bloom-filter-max-bytes"
ParquetBloomFilterMaxBytesDefault = 1024 * 1024
ParquetBloomFilterColumnEnabledKeyPrefix = "write.parquet.bloom-filter-enabled.column"
ParquetRootRepetitionKey = "write.parquet.root-repetition"
ParquetRootRepetitionDefault = "required"
)

type parquetFormat struct{}
Expand Down Expand Up @@ -256,8 +258,23 @@ func (parquetFormat) GetWriteProperties(props iceberg.Properties) any {
slog.Warn("unrecognized compression codec, falling back to uncompressed", "codec", compression)
}

var rootRepetition parquet.Repetition
switch props.Get(ParquetRootRepetitionKey, ParquetRootRepetitionDefault) {
case "required":
rootRepetition = parquet.Repetitions.Required
case "optional":
rootRepetition = parquet.Repetitions.Optional
case "repeated":
rootRepetition = parquet.Repetitions.Repeated
default:
slog.Warn("unrecognized root repetition, falling back to required",
"repetition", props.Get(ParquetRootRepetitionKey, ParquetRootRepetitionDefault))
rootRepetition = parquet.Repetitions.Required
}

return append(writerProps, parquet.WithCompression(codec),
parquet.WithCompressionLevel(compressionLevel))
parquet.WithCompressionLevel(compressionLevel),
parquet.WithRootRepetition(rootRepetition))
}

func (p parquetFormat) WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches []arrow.RecordBatch) (iceberg.DataFile, error) {
Expand Down
Loading