diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go index a42e92bbf..2653f1140 100644 --- a/cmd/iceberg/main.go +++ b/cmd/iceberg/main.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "os" + "strconv" "strings" "github.com/apache/iceberg-go" @@ -47,6 +48,7 @@ Usage: iceberg create [options] (namespace | table) IDENTIFIER iceberg drop [options] (namespace | table) IDENTIFIER iceberg files [options] TABLE_ID [--history] + iceberg add-files [options] TABLE_ID FILES... [--branch TEXT] [--ignore-duplicates] iceberg rename [options] iceberg properties [options] get (namespace | table) IDENTIFIER [PROPNAME] iceberg properties [options] set (namespace | table) IDENTIFIER PROPNAME VALUE @@ -63,6 +65,7 @@ Commands: location Return the location of the table. drop Operations to drop a namespace or table. files List all the files of the table. + add-files Add existing data files to a table. rename Rename a table. properties Properties on tables/namespaces. @@ -70,6 +73,7 @@ Arguments: PARENT Catalog parent namespace IDENTIFIER fully qualified namespace or table TABLE_ID full path to a table + FILES one or more data file paths to add (for add-files) PROPNAME name of a property VALUE value to set @@ -92,7 +96,9 @@ Options: --partition-spec TEXT specify partition spec as comma-separated field names(for create table use only) Ex:"field1,field2" --sort-order TEXT specify sort order as field:direction[:null-order] format(for create table use only) - Ex:"field1:asc,field2:desc:nulls-first,field3:asc:nulls-last"` + Ex:"field1:asc,field2:desc:nulls-first,field3:asc:nulls-last" + --branch TEXT target branch for add-files [default: main] + --ignore-duplicates allow adding files already referenced by the table` type Config struct { List bool `docopt:"list"` @@ -105,6 +111,7 @@ type Config struct { Create bool `docopt:"create"` Drop bool `docopt:"drop"` Files bool `docopt:"files"` + AddFiles bool `docopt:"add-files"` Rename bool `docopt:"rename"` Get bool `docopt:"get"` @@ -117,27 +124,30 @@ type Config struct { RenameFrom string `docopt:""` RenameTo string `docopt:""` - Parent string `docopt:"PARENT"` - Ident string `docopt:"IDENTIFIER"` - TableID string `docopt:"TABLE_ID"` - PropName string `docopt:"PROPNAME"` - Value string `docopt:"VALUE"` - - Catalog string `docopt:"--catalog"` - URI string `docopt:"--uri"` - Output string `docopt:"--output"` - History bool `docopt:"--history"` - Cred string `docopt:"--credential"` - Token string `docopt:"--token"` - Warehouse string `docopt:"--warehouse"` - Config string `docopt:"--config"` - Scope string `docopt:"--scope"` - Description string `docopt:"--description"` - LocationURI string `docopt:"--location-uri"` - SchemaStr string `docopt:"--schema"` - TableProps string `docopt:"--properties"` - PartitionSpec string `docopt:"--partition-spec"` - SortOrder string `docopt:"--sort-order"` + Parent string `docopt:"PARENT"` + Ident string `docopt:"IDENTIFIER"` + TableID string `docopt:"TABLE_ID"` + FilesToAdd []string `docopt:"FILES"` + PropName string `docopt:"PROPNAME"` + Value string `docopt:"VALUE"` + + Catalog string `docopt:"--catalog"` + URI string `docopt:"--uri"` + Output string `docopt:"--output"` + History bool `docopt:"--history"` + Cred string `docopt:"--credential"` + Token string `docopt:"--token"` + Warehouse string `docopt:"--warehouse"` + Config string `docopt:"--config"` + Scope string `docopt:"--scope"` + Description string `docopt:"--description"` + LocationURI string `docopt:"--location-uri"` + SchemaStr string `docopt:"--schema"` + TableProps string `docopt:"--properties"` + PartitionSpec string `docopt:"--partition-spec"` + SortOrder string `docopt:"--sort-order"` + Branch string `docopt:"--branch"` + IgnoreDuplicates bool `docopt:"--ignore-duplicates"` } func main() { @@ -343,6 +353,24 @@ func main() { case cfg.Files: tbl := loadTable(ctx, output, cat, cfg.TableID) output.Files(tbl, cfg.History) + case cfg.AddFiles: + tbl := loadTable(ctx, output, cat, cfg.TableID) + txn := tbl.NewTransaction() + var opts []table.WriteOpt + if cfg.Branch != "" { + opts = append(opts, table.WithBranch(cfg.Branch)) + } + err := txn.AddFiles(ctx, cfg.FilesToAdd, nil, cfg.IgnoreDuplicates, opts...) + if err != nil { + output.Error(err) + os.Exit(1) + } + _, err = txn.Commit(ctx) + if err != nil { + output.Error(err) + os.Exit(1) + } + output.Text("Added " + strconv.Itoa(len(cfg.FilesToAdd)) + " file(s) to " + cfg.TableID) } } diff --git a/cmd/iceberg/output.go b/cmd/iceberg/output.go index 695ba28d9..b767ae97f 100644 --- a/cmd/iceberg/output.go +++ b/cmd/iceberg/output.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "os" + "sort" "strconv" "strings" @@ -106,6 +107,32 @@ func (t textOutput) DescribeTable(tbl *table.Table) { WithData(pterm.TableData{ {"Current Snapshot", snap}, }).Render() + + refsData := pterm.TableData{{"Name", "Type", "Snapshot ID"}} + type refRow struct { + name string + ref table.SnapshotRef + } + var refRows []refRow + for name, ref := range tbl.Metadata().Refs() { + refRows = append(refRows, refRow{name, ref}) + } + sort.Slice(refRows, func(i, j int) bool { return refRows[i].name < refRows[j].name }) + for _, r := range refRows { + refsData = append(refsData, []string{ + r.name, + string(r.ref.SnapshotRefType), + strconv.FormatInt(r.ref.SnapshotID, 10), + }) + } + if len(refsData) > 1 { + pterm.Println("Refs") + pterm.DefaultTable. + WithHasHeader(true). + WithHeaderRowSeparator("-"). + WithData(refsData).Render() + } + pterm.DefaultTree.WithRoot(snapshotTreeNode).Render() pterm.Println("Properties") propTable.Render() @@ -241,12 +268,18 @@ func (j jsonOutput) Identifiers(idList []table.Identifier) { func (j jsonOutput) DescribeTable(tbl *table.Table) { type dataType struct { - Metadata table.Metadata `json:"metadata,omitempty"` - MetadataLocation string `json:"metadata-location,omitempty"` - SortOrder table.SortOrder `json:"sort-order,omitempty"` - CurrentSnapshot *table.Snapshot `json:"current-snapshot,omitempty"` - Spec iceberg.PartitionSpec `json:"spec,omitempty"` - Schema *iceberg.Schema `json:"schema,omitempty"` + Metadata table.Metadata `json:"metadata,omitempty"` + MetadataLocation string `json:"metadata-location,omitempty"` + SortOrder table.SortOrder `json:"sort-order,omitempty"` + CurrentSnapshot *table.Snapshot `json:"current-snapshot,omitempty"` + Spec iceberg.PartitionSpec `json:"spec,omitempty"` + Schema *iceberg.Schema `json:"schema,omitempty"` + Refs map[string]table.SnapshotRef `json:"refs,omitempty"` + } + + refs := make(map[string]table.SnapshotRef) + for name, ref := range tbl.Metadata().Refs() { + refs[name] = ref } data := dataType{ @@ -256,6 +289,7 @@ func (j jsonOutput) DescribeTable(tbl *table.Table) { CurrentSnapshot: tbl.CurrentSnapshot(), Spec: tbl.Spec(), Schema: tbl.Schema(), + Refs: refs, } if err := json.NewEncoder(os.Stdout).Encode(data); err != nil { j.Error(err) diff --git a/cmd/iceberg/output_test.go b/cmd/iceberg/output_test.go index 3cb20196e..45c00eb0a 100644 --- a/cmd/iceberg/output_test.go +++ b/cmd/iceberg/output_test.go @@ -121,6 +121,12 @@ Current Schema, id=1 Current Snapshot | append, {}: id=3055729675574597004, parent_id=3051729675574597004, schema_id=1, sequence_number=1, timestamp_ms=1555100955770, manifest_list=s3://a/b/2.avro +Refs +Name | Type | Snapshot ID +----------------------------------- +main | branch | 3055729675574597004 +test | tag | 3051729675574597004 + Snapshots ├──Snapshot 3051729675574597004, schema 1: s3://a/b/1.avro └──Snapshot 3055729675574597004, schema 1: s3://a/b/2.avro @@ -341,7 +347,7 @@ func Test_jsonOutput_DescribeTable(t *testing.T) { } }`, }, - expected: `{"metadata":{"last-sequence-number":34,"format-version":2,"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","location":"s3://bucket/test/location","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"},"schema-id":1},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"current-snapshot-id":3055729675574597004,"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"main":{"snapshot-id":3055729675574597004,"type":"branch"},"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000}}},"sort-order":{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]},"current-snapshot":{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1},"spec":{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}}`, + expected: `{"metadata":{"last-sequence-number":34,"format-version":2,"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","location":"s3://bucket/test/location","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"},"schema-id":1},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"current-snapshot-id":3055729675574597004,"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"main":{"snapshot-id":3055729675574597004,"type":"branch"},"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000}}},"sort-order":{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]},"current-snapshot":{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1},"spec":{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"x","transform":"identity"}]},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true},{"type":"long","id":2,"name":"y","required":true,"doc":"comment"},{"type":"long","id":3,"name":"z","required":true}],"schema-id":1,"identifier-field-ids":[1,2]},"refs":{"main":{"snapshot-id":3055729675574597004,"type":"branch"},"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000}}}`, }, { name: "Describe a table with empty objects", @@ -381,7 +387,7 @@ func Test_jsonOutput_DescribeTable(t *testing.T) { "refs": { } }`, }, - expected: `{"metadata":{"last-sequence-number":0,"format-version":2,"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","location":"s3://bucket/test/location","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0},"sort-order":{"order-id":0,"fields":[]},"spec":{"spec-id":0,"fields":[]},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]}}`, + expected: `{"metadata":{"current-schema-id":0,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":3,"last-partition-id":1000,"last-sequence-number":0,"last-updated-ms":1602638573590,"location":"s3://bucket/test/location","partition-specs":[{"spec-id":0,"fields":[]}],"properties":{"read.split.target.size":"134217728"},"schemas":[{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]}],"sort-orders":[{"order-id":0,"fields":[]}],"table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1"},"schema":{"type":"struct","fields":[{"type":"long","id":1,"name":"x","required":true}],"schema-id":0,"identifier-field-ids":[]},"sort-order":{"order-id":0,"fields":[]},"spec":{"spec-id":0,"fields":[]}}`, }, } for _, tt := range tests { diff --git a/table/metadata.go b/table/metadata.go index 5d303c454..4b2dcd18a 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -317,6 +317,22 @@ func (b *MetadataBuilder) currentSnapshot() *Snapshot { return s } +// SnapshotIDForRef returns the snapshot ID for the given ref (branch or tag name). +// For MainBranch it returns currentSnapshotID; for other refs it looks up b.refs. +// Returns nil if the ref does not exist (e.g. a new branch not yet created). +func (b *MetadataBuilder) SnapshotIDForRef(refName string) *int64 { + if refName == MainBranch { + return b.currentSnapshotID + } + if ref, ok := b.refs[refName]; ok { + id := ref.SnapshotID + + return &id + } + + return nil +} + func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema) error { if err := checkSchemaCompatibility(schema, b.formatVersion); err != nil { return err diff --git a/table/metadata_builder_internal_test.go b/table/metadata_builder_internal_test.go index 6c9fed32e..a845eb949 100644 --- a/table/metadata_builder_internal_test.go +++ b/table/metadata_builder_internal_test.go @@ -570,6 +570,36 @@ func TestSetBranchSnapshotCreatesBranchIfNotExists(t *testing.T) { require.Equal(t, int64(2), builder.updates[1].(*setSnapshotRefUpdate).SnapshotID) } +func TestSnapshotIDForRef(t *testing.T) { + builder := builderWithoutChanges(2) + schemaID := 0 + snapshot := Snapshot{ + SnapshotID: 2, + ParentSnapshotID: nil, + SequenceNumber: 0, + TimestampMs: builder.base.LastUpdatedMillis(), + ManifestList: "/snap-1.avro", + Summary: &Summary{Operation: OpAppend}, + SchemaID: &schemaID, + } + require.NoError(t, builder.AddSnapshot(&snapshot)) + require.NoError(t, builder.SetSnapshotRef(MainBranch, 2, BranchRef)) + require.NoError(t, builder.SetSnapshotRef("feature", 2, BranchRef)) + + // MainBranch returns currentSnapshotID + mainID := builder.SnapshotIDForRef(MainBranch) + require.NotNil(t, mainID) + require.Equal(t, int64(2), *mainID) + + // Other ref returns ref's snapshot ID + featureID := builder.SnapshotIDForRef("feature") + require.NotNil(t, featureID) + require.Equal(t, int64(2), *featureID) + + // Unknown ref returns nil + require.Nil(t, builder.SnapshotIDForRef("nonexistent")) +} + func TestRemoveSnapshotRemovesBranch(t *testing.T) { builder := builderWithoutChanges(2) schemaID := 0 diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index 9bbc0e85d..5b83236f2 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -58,8 +58,8 @@ func newManifestListFileName(snapshotID int64, attempt int, commit uuid.UUID) st return fmt.Sprintf("snap-%d-%d-%s.avro", snapshotID, attempt, commit) } -func newFastAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { - prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) +func newFastAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties, branch string) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps, branch) prod.producerImpl = &fastAppendFiles{base: prod} return prod @@ -105,8 +105,8 @@ type overwriteFiles struct { base *snapshotProducer } -func newOverwriteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { - prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) +func newOverwriteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties, branch string) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps, branch) prod.producerImpl = &overwriteFiles{base: prod} return prod @@ -121,8 +121,11 @@ func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) { // determine if there are any existing manifest files existingFiles := make([]iceberg.ManifestFile, 0) - snap := of.base.txn.meta.currentSnapshot() - if snap == nil { + if of.base.parentSnapshotID <= 0 { + return existingFiles, nil + } + snap, err := of.base.txn.meta.SnapshotByID(of.base.parentSnapshotID) + if err != nil || snap == nil { return existingFiles, nil } @@ -387,8 +390,8 @@ type mergeAppendFiles struct { mergeEnabled bool } -func newMergeAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { - prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) +func newMergeAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties, branch string) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps, branch) prod.producerImpl = &mergeAppendFiles{ fastAppendFiles: fastAppendFiles{base: prod}, targetSizeBytes: txn.meta.props.GetInt(ManifestTargetSizeBytesKey, ManifestTargetSizeBytesDefault), @@ -438,9 +441,10 @@ type snapshotProducer struct { manifestCount atomic.Int32 deletedFiles map[string]iceberg.DataFile snapshotProps iceberg.Properties + branch string } -func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { +func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties, branch string) *snapshotProducer { var ( commit uuid.UUID parentSnapshot int64 = -1 @@ -452,8 +456,11 @@ func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO commit = *commitUUID } - if snap := txn.meta.currentSnapshot(); snap != nil { - parentSnapshot = snap.SnapshotID + if branch == "" { + branch = MainBranch + } + if sid := txn.meta.SnapshotIDForRef(branch); sid != nil && *sid > 0 { + parentSnapshot = *sid } return &snapshotProducer{ @@ -466,6 +473,7 @@ func createSnapshotProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO addedFiles: []iceberg.DataFile{}, deletedFiles: make(map[string]iceberg.DataFile), snapshotProps: snapshotProps, + branch: branch, } } @@ -771,10 +779,18 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) { snapshot.AddedRows = &addedRows } + var assertReq Requirement + if sp.parentSnapshotID > 0 { + parentID := sp.parentSnapshotID + assertReq = AssertRefSnapshotID(sp.branch, &parentID) + } else { + assertReq = AssertRefSnapshotID(sp.branch, nil) + } + return []Update{ NewAddSnapshotUpdate(&snapshot), - NewSetSnapshotRefUpdate("main", sp.snapshotID, BranchRef, -1, -1, -1), + NewSetSnapshotRefUpdate(sp.branch, sp.snapshotID, BranchRef, -1, -1, -1), }, []Requirement{ - AssertRefSnapshotID("main", sp.txn.meta.currentSnapshotID), + assertReq, }, nil } diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go index 88e981ed4..1d06a6c86 100644 --- a/table/snapshot_producers_test.go +++ b/table/snapshot_producers_test.go @@ -193,7 +193,7 @@ func TestCommitV3RowLineage(t *testing.T) { // Single data file with record count 1 (newTestDataFile uses 1, 1 for record count and file size). const expectedAddedRows = 1 - sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil) + sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil, MainBranch) df := newTestDataFile(t, spec, "file://data.parquet", nil) sp.appendDataFile(df) @@ -226,7 +226,7 @@ func TestCommitV3RowLineageTwoSequentialCommits(t *testing.T) { txn.meta.formatVersion = 3 // First commit: new table, append one file (1 row). - sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil) + sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil, MainBranch) sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil)) updates1, reqs1, err := sp1.commit() require.NoError(t, err, "first commit should succeed") @@ -244,7 +244,7 @@ func TestCommitV3RowLineageTwoSequentialCommits(t *testing.T) { tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil) txn2 := tbl2.NewTransaction() txn2.meta.formatVersion = 3 - sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil) + sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil, MainBranch) sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil)) updates2, reqs2, err := sp2.commit() require.NoError(t, err, "second commit should succeed") @@ -270,7 +270,7 @@ func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) { txn.meta.formatVersion = 3 // First commit: one file (1 row). - sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil) + sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil, MainBranch) sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil)) updates1, reqs1, err := sp1.commit() require.NoError(t, err, "first commit should succeed") @@ -289,7 +289,7 @@ func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) { } txn2.meta.props[ManifestMergeEnabledKey] = "true" txn2.meta.props[ManifestMinMergeCountKey] = "2" - sp2 := newMergeAppendFilesProducer(OpAppend, txn2, memIO, nil, nil) + sp2 := newMergeAppendFilesProducer(OpAppend, txn2, memIO, nil, nil, MainBranch) sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil)) updates2, reqs2, err := sp2.commit() require.NoError(t, err, "second commit (merge) should succeed") @@ -354,7 +354,7 @@ func TestCommitV3RowLineagePersistsManifestFirstRowID(t *testing.T) { txn.meta.formatVersion = 3 // Use multi-row files to make row-range starts obvious. - sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil) + sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil, MainBranch) sp1.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-1.parquet", nil, 3)) updates1, reqs1, err := sp1.commit() require.NoError(t, err, "first commit should succeed") @@ -376,7 +376,7 @@ func TestCommitV3RowLineagePersistsManifestFirstRowID(t *testing.T) { tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil) txn2 := tbl2.NewTransaction() txn2.meta.formatVersion = 3 - sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil) + sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil, MainBranch) sp2.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-2.parquet", nil, 5)) updates2, _, err := sp2.commit() require.NoError(t, err, "second commit should succeed") @@ -396,7 +396,7 @@ func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) { mem := newMemIO(manifestHeaderSize(t, 2, spec, schema), errLimitedWrite) txn := createTestTransaction(t, mem, spec) - sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil) + sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil, MainBranch) validPartition := map[int]any{1000: int32(1)} sp.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", validPartition)) sp.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil)) @@ -411,7 +411,7 @@ func TestManifestMergeManagerClosesWriterOnError(t *testing.T) { mem := newMemIO(manifestHeaderSize(t, 2, spec, schema), errLimitedWrite) txn := createTestTransaction(t, mem, spec) - sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil) + sp := newFastAppendFilesProducer(OpAppend, txn, mem, nil, nil, MainBranch) df := newTestDataFile(t, spec, "file://data-1.parquet", nil) entries := []iceberg.ManifestEntry{ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID, nil, nil, df), @@ -483,7 +483,7 @@ func TestOverwriteFilesExistingManifestsClosesWriterOnError(t *testing.T) { txn.meta.snapshotList = []Snapshot{snap} txn.meta.currentSnapshotID = &snapshotID - sp := newOverwriteFilesProducer(OpOverwrite, txn, mem, nil, nil) + sp := newOverwriteFilesProducer(OpOverwrite, txn, mem, nil, nil, MainBranch) sp.deleteDataFile(deletedFile) _, err = sp.existingManifests() @@ -598,7 +598,7 @@ func TestManifestWriterClosesUnderlyingFile(t *testing.T) { spec := iceberg.NewPartitionSpec() txn := createTestTransaction(t, trackIO, spec) - sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil) + sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil, MainBranch) df := newTestDataFile(t, spec, "file://data-1.parquet", nil) sp.appendDataFile(df) @@ -618,7 +618,7 @@ func TestCreateManifestClosesUnderlyingFile(t *testing.T) { txn := createTestTransaction(t, trackIO, spec) schema := simpleSchema() - sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil) + sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil, MainBranch) df := newTestDataFile(t, spec, "file://data-1.parquet", nil) entries := []iceberg.ManifestEntry{ iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID, nil, nil, df), @@ -680,7 +680,7 @@ func TestOverwriteExistingManifestsClosesUnderlyingFile(t *testing.T) { txn.meta.snapshotList = []Snapshot{snap} txn.meta.currentSnapshotID = &snapshotID - sp := newOverwriteFilesProducer(OpOverwrite, txn, trackIO, nil, nil) + sp := newOverwriteFilesProducer(OpOverwrite, txn, trackIO, nil, nil, MainBranch) sp.deleteDataFile(deletedFile) trackIO.writers = make(map[string]*trackingWriteCloser) @@ -754,7 +754,7 @@ func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) { spec := iceberg.NewPartitionSpec() txn := createTestTransaction(t, blockingIO, spec) - sp := createSnapshotProducer(OpAppend, txn, blockingIO, nil, nil) + sp := createSnapshotProducer(OpAppend, txn, blockingIO, nil, nil, MainBranch) errDeletedEntries := errors.New("simulated deletedEntries error") sp.producerImpl = &errorOnDeletedEntries{ base: sp, @@ -785,3 +785,36 @@ func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) { require.Zero(t, writerCount, "expected no writerFactory to be created when deletedEntries is called first") } } + +// TestCreateSnapshotProducerUsesBranch verifies that createSnapshotProducer sets +// branch and resolves parentSnapshotID from SnapshotIDForRef(branch). +func TestCreateSnapshotProducerUsesBranch(t *testing.T) { + builder := builderWithoutChanges(2) + schemaID := 0 + const snapID int64 = 42 + snapshot := Snapshot{ + SnapshotID: snapID, + ParentSnapshotID: nil, + SequenceNumber: 0, + TimestampMs: builder.base.LastUpdatedMillis(), + ManifestList: "table-location/metadata/snap-1.avro", + Summary: &Summary{Operation: OpAppend}, + SchemaID: &schemaID, + } + require.NoError(t, builder.AddSnapshot(&snapshot)) + require.NoError(t, builder.SetSnapshotRef(MainBranch, snapID, BranchRef)) + require.NoError(t, builder.SetSnapshotRef("feature", snapID, BranchRef)) + + meta, err := builder.Build() + require.NoError(t, err) + + mem := newMemIO(0, nil) + tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json", func(context.Context) (iceio.IO, error) { + return mem, nil + }, nil) + txn := tbl.NewTransaction() + + sp := createSnapshotProducer(OpAppend, txn, mem, nil, nil, "feature") + require.Equal(t, "feature", sp.branch) + require.Equal(t, snapID, sp.parentSnapshotID) +} diff --git a/table/transaction.go b/table/transaction.go index c946c17b3..c8156eea2 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -40,28 +40,61 @@ import ( "golang.org/x/sync/errgroup" ) +// WriteOpt is an option for write operations (Append, AppendTable, AddFiles, etc.). +type WriteOpt interface { + applyWriteOpt(*writeOpts) +} + +type writeOpts struct { + branch string +} + +func (o *writeOpts) applyWriteOpt(target *writeOpts) { + if o.branch != "" { + target.branch = o.branch + } +} + +// WithBranch sets the target branch for the write. Default is main. +func WithBranch(branch string) WriteOpt { + return &writeOpts{branch: branch} +} + +func resolveBranch(opts []WriteOpt) string { + var o writeOpts + for _, opt := range opts { + opt.applyWriteOpt(&o) + } + if o.branch == "" { + return MainBranch + } + + return o.branch +} + type snapshotUpdate struct { txn *Transaction io io.WriteFileIO snapshotProps iceberg.Properties operation Operation + branch string } func (s snapshotUpdate) fastAppend() *snapshotProducer { - return newFastAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps) + return newFastAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps, s.branch) } func (s snapshotUpdate) mergeOverwrite(commitUUID *uuid.UUID) *snapshotProducer { op := s.operation - if s.operation == OpOverwrite && s.txn.meta.currentSnapshot() == nil { + if s.operation == OpOverwrite && s.txn.meta.SnapshotIDForRef(s.branch) == nil { op = OpAppend } - return newOverwriteFilesProducer(op, s.txn, s.io, commitUUID, s.snapshotProps) + return newOverwriteFilesProducer(op, s.txn, s.io, commitUUID, s.snapshotProps, s.branch) } func (s snapshotUpdate) mergeAppend() *snapshotProducer { - return newMergeAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps) + return newMergeAppendFilesProducer(OpAppend, s.txn, s.io, nil, s.snapshotProps, s.branch) } type Transaction struct { @@ -123,9 +156,9 @@ func (t *Transaction) apply(updates []Update, reqs []Requirement) error { return nil } -func (t *Transaction) appendSnapshotProducer(afs io.IO, props iceberg.Properties) *snapshotProducer { +func (t *Transaction) appendSnapshotProducer(afs io.IO, props iceberg.Properties, branch string) *snapshotProducer { manifestMerge := t.meta.props.GetBool(ManifestMergeEnabledKey, ManifestMergeEnabledDefault) - updateSnapshot := t.updateSnapshot(afs, props, OpAppend) + updateSnapshot := t.updateSnapshot(afs, props, OpAppend, branch) if manifestMerge { return updateSnapshot.mergeAppend() } @@ -133,12 +166,17 @@ func (t *Transaction) appendSnapshotProducer(afs io.IO, props iceberg.Properties return updateSnapshot.fastAppend() } -func (t *Transaction) updateSnapshot(fs io.IO, props iceberg.Properties, operation Operation) snapshotUpdate { +func (t *Transaction) updateSnapshot(fs io.IO, props iceberg.Properties, operation Operation, branch string) snapshotUpdate { + if branch == "" { + branch = MainBranch + } + return snapshotUpdate{ txn: t, io: fs.(io.WriteFileIO), snapshotProps: props, operation: operation, + branch: branch, } } @@ -305,19 +343,20 @@ func (t *Transaction) ExpireSnapshots(opts ...ExpireSnapshotsOpt) error { return t.apply(updates, reqs) } -func (t *Transaction) AppendTable(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error { +func (t *Transaction) AppendTable(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties, opts ...WriteOpt) error { rdr := array.NewTableReader(tbl, batchSize) defer rdr.Release() - return t.Append(ctx, rdr, snapshotProps) + return t.Append(ctx, rdr, snapshotProps, opts...) } -func (t *Transaction) Append(ctx context.Context, rdr array.RecordReader, snapshotProps iceberg.Properties) error { +func (t *Transaction) Append(ctx context.Context, rdr array.RecordReader, snapshotProps iceberg.Properties, opts ...WriteOpt) error { fs, err := t.tbl.fsF(ctx) if err != nil { return err } - appendFiles := t.appendSnapshotProducer(fs, snapshotProps) + branch := resolveBranch(opts) + appendFiles := t.appendSnapshotProducer(fs, snapshotProps, branch) itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{ sc: rdr.Schema(), itr: array.IterFromReader(rdr), @@ -348,10 +387,10 @@ func (t *Transaction) Append(ctx context.Context, rdr array.RecordReader, snapsh // operation is only valid if the data is exactly the same as the previous snapshot. // // For now, we'll keep using an overwrite operation. -func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, filesToAdd []string, snapshotProps iceberg.Properties) error { +func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, filesToAdd []string, snapshotProps iceberg.Properties, opts ...WriteOpt) error { if len(filesToDelete) == 0 { if len(filesToAdd) > 0 { - return t.AddFiles(ctx, filesToAdd, snapshotProps, false) + return t.AddFiles(ctx, filesToAdd, snapshotProps, false, opts...) } } @@ -376,7 +415,11 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files return errors.New("add file paths must be unique for ReplaceDataFiles") } - s := t.meta.currentSnapshot() + branch := resolveBranch(opts) + var s *Snapshot + if sid := t.meta.SnapshotIDForRef(branch); sid != nil { + s, _ = t.meta.SnapshotByID(*sid) + } if s == nil { return fmt.Errorf("%w: cannot replace files in a table without an existing snapshot", ErrInvalidOperation) } @@ -417,7 +460,7 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files } commitUUID := uuid.New() - updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite).mergeOverwrite(&commitUUID) + updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite, branch).mergeOverwrite(&commitUUID) for _, df := range markedForDeletion { updater.deleteDataFile(df) @@ -566,7 +609,7 @@ func (t *Transaction) AddDataFiles(ctx context.Context, dataFiles []iceberg.Data } } - appendFiles := t.appendSnapshotProducer(fs, snapshotProps) + appendFiles := t.appendSnapshotProducer(fs, snapshotProps, MainBranch) for _, df := range dataFiles { appendFiles.appendDataFile(df) } @@ -669,7 +712,7 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx context.Context, filesTo } commitUUID := uuid.New() - updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite).mergeOverwrite(&commitUUID) + updater := t.updateSnapshot(fs, snapshotProps, OpOverwrite, MainBranch).mergeOverwrite(&commitUUID) for _, df := range markedForDeletion { updater.deleteDataFile(df) @@ -687,7 +730,7 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx context.Context, filesTo return t.apply(updates, reqs) } -func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool) error { +func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool, opts ...WriteOpt) error { set := make(map[string]string) for _, f := range files { set[f] = f @@ -714,7 +757,7 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp } } if len(referenced) > 0 { - return fmt.Errorf("cannot add files that are already referenced by table, files: %v", referenced) + return fmt.Errorf("cannot add files that are already referenced by table, files: %s", referenced) } } } @@ -736,7 +779,8 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp return err } - updater := t.updateSnapshot(fs, snapshotProps, OpAppend).fastAppend() + branch := resolveBranch(opts) + updater := t.updateSnapshot(fs, snapshotProps, OpAppend, branch).fastAppend() dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(files)) for df, err := range dataFiles { @@ -785,6 +829,7 @@ type overwriteOperation struct { concurrency int filter iceberg.BooleanExpression caseSensitive bool + branch string } // OverwriteOption applies options to overwrite operations @@ -820,6 +865,13 @@ func WithOverwriteCaseInsensitive() OverwriteOption { } } +// WithOverwriteBranch sets the target branch for the overwrite. Default is main. +func WithOverwriteBranch(branch string) OverwriteOption { + return func(op *overwriteOperation) { + op.branch = branch + } +} + // Overwrite overwrites the table data using a RecordReader. // // An optional filter (see WithOverwriteFilter) determines which existing data to delete or rewrite: @@ -844,12 +896,16 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, sna concurrency: runtime.GOMAXPROCS(0), filter: iceberg.AlwaysTrue{}, caseSensitive: true, + branch: MainBranch, } for _, apply := range opts { apply(&overwrite) } + if overwrite.branch == "" { + overwrite.branch = MainBranch + } - updater, err := t.performCopyOnWriteDeletion(ctx, OpOverwrite, snapshotProps, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency) + updater, err := t.performCopyOnWriteDeletion(ctx, OpOverwrite, snapshotProps, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency, overwrite.branch) if err != nil { return err } @@ -880,7 +936,7 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader, sna return t.apply(updates, reqs) } -func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, operation Operation, snapshotProps iceberg.Properties, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) (*snapshotProducer, error) { +func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, operation Operation, snapshotProps iceberg.Properties, filter iceberg.BooleanExpression, caseSensitive bool, concurrency int, branch string) (*snapshotProducer, error) { fs, err := t.tbl.fsF(ctx) if err != nil { return nil, err @@ -898,8 +954,11 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context, operation } } + if branch == "" { + branch = MainBranch + } commitUUID := uuid.New() - updater := t.updateSnapshot(fs, snapshotProps, operation).mergeOverwrite(&commitUUID) + updater := t.updateSnapshot(fs, snapshotProps, operation, branch).mergeOverwrite(&commitUUID) filesToDelete, filesToRewrite, err := t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency) if err != nil { @@ -938,7 +997,7 @@ func (t *Transaction) performMergeOnReadDeletion(ctx context.Context, snapshotPr } commitUUID := uuid.New() - updater := t.updateSnapshot(fs, snapshotProps, OpDelete).mergeOverwrite(&commitUUID) + updater := t.updateSnapshot(fs, snapshotProps, OpDelete, MainBranch).mergeOverwrite(&commitUUID) filesToDelete, withPartialDeletions, err := t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency) if err != nil { @@ -1019,7 +1078,7 @@ func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpressi } switch writeDeleteMode { case WriteModeCopyOnWrite: - updater, err = t.performCopyOnWriteDeletion(ctx, OpDelete, snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency) + updater, err = t.performCopyOnWriteDeletion(ctx, OpDelete, snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency, MainBranch) if err != nil { return err } diff --git a/table/transaction_test.go b/table/transaction_test.go index d356b4682..9bd3880a3 100644 --- a/table/transaction_test.go +++ b/table/transaction_test.go @@ -650,6 +650,94 @@ func (s *SparkIntegrationTestSuite) TestDeleteMergeOnReadPartitioned() { +----------+---------+---+`) } +// TestBranchWrites verifies that WithBranch writes to the given branch: main stays +// unchanged when appending to a branch, and scanning by ref returns the correct data. +func (s *SparkIntegrationTestSuite) TestBranchWrites() { + icebergSchema := iceberg.NewSchema(0, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32}, + iceberg.NestedField{ID: 2, Name: "value", Type: iceberg.PrimitiveTypes.String}, + ) + + tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default", "go_test_branch_writes"), icebergSchema) + s.Require().NoError(err) + + arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true, false) + s.Require().NoError(err) + + // 1) Append to main (default), commit + mainTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{ + `[{"id": 1, "value": "main-a"}, {"id": 2, "value": "main-b"}]`, + }) + s.Require().NoError(err) + defer mainTable.Release() + + tx := tbl.NewTransaction() + err = tx.AppendTable(s.ctx, mainTable, 2, nil) + s.Require().NoError(err) + tbl, err = tx.Commit(s.ctx) + s.Require().NoError(err) + + mainSnapID := tbl.CurrentSnapshot().SnapshotID + mainRows, err := tbl.Scan().ToArrowTable(s.ctx) + s.Require().NoError(err) + defer mainRows.Release() + s.Require().Equal(int64(2), mainRows.NumRows(), "main should have 2 rows") + + // 2) Append to branch "test-branch", commit + branchTable, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{ + `[{"id": 10, "value": "branch-x"}, {"id": 11, "value": "branch-y"}]`, + }) + s.Require().NoError(err) + defer branchTable.Release() + + tx = tbl.NewTransaction() + err = tx.AppendTable(s.ctx, branchTable, 2, nil, table.WithBranch("test-branch")) + s.Require().NoError(err) + tbl, err = tx.Commit(s.ctx) + s.Require().NoError(err) + + // 3) Main unchanged: same snapshot and row count + s.Require().Equal(mainSnapID, tbl.CurrentSnapshot().SnapshotID, "main ref should still point to first snapshot") + mainAfter, err := tbl.Scan().ToArrowTable(s.ctx) + s.Require().NoError(err) + defer mainAfter.Release() + s.Require().Equal(int64(2), mainAfter.NumRows(), "main should still have 2 rows after branch write") + + // 4) Branch has its own snapshot and the branch-only rows + branchScan, err := tbl.Scan().UseRef("test-branch") + s.Require().NoError(err) + branchRows, err := branchScan.ToArrowTable(s.ctx) + s.Require().NoError(err) + defer branchRows.Release() + s.Require().Equal(int64(2), branchRows.NumRows(), "test-branch should have 2 rows") + + // 5) Optional: append again to main; branch ref unchanged + moreMain, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, []string{ + `[{"id": 3, "value": "main-c"}]`, + }) + s.Require().NoError(err) + defer moreMain.Release() + + tx = tbl.NewTransaction() + err = tx.AppendTable(s.ctx, moreMain, 1, nil) + s.Require().NoError(err) + tbl, err = tx.Commit(s.ctx) + s.Require().NoError(err) + + s.Require().NotEqual(mainSnapID, tbl.CurrentSnapshot().SnapshotID, "main should advance to new snapshot") + mainFinal, err := tbl.Scan().ToArrowTable(s.ctx) + s.Require().NoError(err) + defer mainFinal.Release() + s.Require().Equal(int64(3), mainFinal.NumRows(), "main should have 3 rows") + + branchAgain, err := tbl.Scan().UseRef("test-branch") + s.Require().NoError(err) + branchFinal, err := branchAgain.ToArrowTable(s.ctx) + s.Require().NoError(err) + defer branchFinal.Release() + s.Require().Equal(int64(2), branchFinal.NumRows(), "test-branch should still have 2 rows (isolation)") +} + func TestSparkIntegration(t *testing.T) { suite.Run(t, new(SparkIntegrationTestSuite)) }