From 19e8dee55920505277ac38c2e2f30460ac2e8a27 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Mon, 9 Mar 2026 15:16:27 -0700 Subject: [PATCH 01/10] feat: Add metadata-only replace API to Table for REPLACE snapshot operations Fixes #3130 --- pyiceberg/table/__init__.py | 53 +++++++++++++++++ pyiceberg/table/snapshots.py | 2 +- pyiceberg/table/update/snapshot.py | 83 ++++++++++++++++++++++++++- tests/table/test_replace.py | 92 ++++++++++++++++++++++++++++++ 4 files changed, 228 insertions(+), 2 deletions(-) create mode 100644 tests/table/test_replace.py diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 68089beb54..08e3ada8bb 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -450,6 +450,32 @@ def update_statistics(self) -> UpdateStatistics: """ return UpdateStatistics(transaction=self) + def replace( + self, + files_to_delete: Iterable[DataFile], + files_to_add: Iterable[DataFile], + snapshot_properties: dict[str, str] = EMPTY_DICT, + branch: str | None = MAIN_BRANCH, + ) -> None: + """ + Shorthand for replacing existing files with new files. + + A replace will produce a REPLACE snapshot that will ignore existing + files and replace them with the new files. + + Args: + files_to_delete: The files to delete + files_to_add: The new files to add that replace the deleted files + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the replace operation + """ + with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).replace() as replace_snapshot: + for file_to_delete in files_to_delete: + replace_snapshot.delete_data_file(file_to_delete) + + for data_file in files_to_add: + replace_snapshot.append_data_file(data_file) + def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None: """ Shorthand API for appending a PyArrow table to a table transaction. @@ -1384,6 +1410,33 @@ def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, with self.transaction() as tx: tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch) + def replace( + self, + files_to_delete: Iterable[DataFile], + files_to_add: Iterable[DataFile], + snapshot_properties: dict[str, str] = EMPTY_DICT, + branch: str | None = MAIN_BRANCH, + ) -> None: + """ + Shorthand for replacing existing files with new files. + + A replace will produce a REPLACE snapshot that will ignore existing + files and replace them with the new files. + + Args: + files_to_delete: The files to delete + files_to_add: The new files to add that replace the deleted files + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch Reference to run the replace operation + """ + with self.transaction() as tx: + tx.replace( + files_to_delete=files_to_delete, + files_to_add=files_to_add, + snapshot_properties=snapshot_properties, + branch=branch, + ) + def dynamic_partition_overwrite( self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH ) -> None: diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 7e4c6eb1ec..7bd4597399 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -344,7 +344,7 @@ def _partition_summary(self, update_metrics: UpdateMetrics) -> str: def update_snapshot_summaries(summary: Summary, previous_summary: Mapping[str, str] | None = None) -> Summary: - if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}: + if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE, Operation.REPLACE}: raise ValueError(f"Operation not implemented: {summary.operation}") if not previous_summary: diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 37d120969a..cc6792398a 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -666,6 +666,81 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]: else: return [] +class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]): + """A snapshot producer that rewrites data files.""" + + def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]): + super().__init__(operation, transaction, io, snapshot_properties) + + def _commit(self) -> UpdatesAndRequirements: + # Only produce a commit when there is something to rewrite + if self._deleted_data_files or self._added_data_files: + return super()._commit() + else: + return (), () + + def _deleted_entries(self) -> list[ManifestEntry]: + """Check if we need to mark the files as deleted.""" + if self._parent_snapshot_id is not None: + previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if previous_snapshot is None: + raise ValueError(f"Could not find the previous snapshot: {self._parent_snapshot_id}") + + executor = ExecutorFactory.get_or_create() + + def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]: + return [ + ManifestEntry.from_args( + status=ManifestEntryStatus.DELETED, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + for entry in manifest.fetch_manifest_entry(self._io, discard_deleted=True) + if entry.data_file.content == DataFileContent.DATA and entry.data_file in self._deleted_data_files + ] + + list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io)) + return list(itertools.chain(*list_of_entries)) + else: + return [] + + def _existing_manifests(self) -> list[ManifestFile]: + """To determine if there are any existing manifests.""" + existing_files = [] + if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch): + for manifest_file in snapshot.manifests(io=self._io): + entries_to_write: set[ManifestEntry] = set() + found_deleted_entries: set[ManifestEntry] = set() + + for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + if entry.data_file in self._deleted_data_files: + found_deleted_entries.add(entry) + else: + entries_to_write.add(entry) + + if len(found_deleted_entries) == 0: + existing_files.append(manifest_file) + continue + + if len(entries_to_write) == 0: + continue + + with self.new_manifest_writer(self.spec(manifest_file.partition_spec_id)) as writer: + for entry in entries_to_write: + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.EXISTING, + snapshot_id=entry.snapshot_id, + sequence_number=entry.sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + ) + existing_files.append(writer.to_manifest_file()) + return existing_files + class UpdateSnapshot: _transaction: Transaction @@ -724,7 +799,13 @@ def delete(self) -> _DeleteFiles: snapshot_properties=self._snapshot_properties, ) - + def replace(self) -> _RewriteFiles: + return _RewriteFiles( + operation=Operation.REPLACE, + transaction=self._transaction, + io=self._io, + snapshot_properties=self._snapshot_properties, + ) class _ManifestMergeManager(Generic[U]): _target_size_bytes: int _min_count_to_merge: int diff --git a/tests/table/test_replace.py b/tests/table/test_replace.py new file mode 100644 index 0000000000..3f8835dca0 --- /dev/null +++ b/tests/table/test_replace.py @@ -0,0 +1,92 @@ +import uuid +import pytest +from pyiceberg.manifest import DataFile, DataFileContent, FileFormat +from pyiceberg.table.snapshots import Operation +from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.typedef import Record + +def test_replace_api(catalog): + # Setup a basic table using the catalog fixture + catalog.create_namespace("default") + table = catalog.create_table( + identifier="default.test_replace", + schema=Schema(), + ) + + # Create mock DataFiles for the test + file_to_delete = DataFile.from_args( + file_path="s3://bucket/test/data/deleted.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1024, + content=DataFileContent.DATA, + ) + file_to_delete.spec_id = 0 + + file_to_add = DataFile.from_args( + file_path="s3://bucket/test/data/added.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1024, + content=DataFileContent.DATA, + ) + file_to_add.spec_id = 0 + + # Initially append to have something to replace + with table.transaction() as tx: + with tx.update_snapshot().fast_append() as append_snapshot: + append_snapshot.append_data_file(file_to_delete) + + # Verify initial append snapshot + assert len(table.history()) == 1 + snapshot = table.current_snapshot() + assert snapshot.summary["operation"] == Operation.APPEND + + # Call the replace API + table.replace( + files_to_delete=[file_to_delete], + files_to_add=[file_to_add] + ) + + # Verify the replacement created a REPLACE snapshot + assert len(table.history()) == 2 + snapshot = table.current_snapshot() + assert snapshot.summary["operation"] == Operation.REPLACE + + # Verify the correct files are added and deleted + # The summary property tracks these counts + assert snapshot.summary["added-data-files"] == "1" + assert snapshot.summary["deleted-data-files"] == "1" + assert snapshot.summary["added-records"] == "100" + assert snapshot.summary["deleted-records"] == "100" + + # Verify the new file exists in the new manifest + manifest_files = snapshot.manifests(table.io) + assert len(manifest_files) == 2 # One for ADDED, one for DELETED + + # Check that sequence numbers were handled properly natively by verifying the manifest contents + entries = [] + for manifest in manifest_files: + for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): + entries.append(entry) + + # One entry for ADDED (new file), one for DELETED (old file) + assert len(entries) == 2 + +def test_replace_empty_files(catalog): + # Setup a basic table using the catalog fixture + catalog.create_namespace("default") + table = catalog.create_table( + identifier="default.test_replace_empty", + schema=Schema(), + ) + + # Replacing empty lists should not throw errors, but should produce no changes. + table.replace([], []) + + # History should be completely empty since no files were rewritten + assert len(table.history()) == 0 + assert table.current_snapshot() is None From 6800bb49c69ad9c848c7d8dd6a328b22284372d0 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Mon, 9 Mar 2026 15:40:53 -0700 Subject: [PATCH 02/10] chore: fix linter and mypy errors in replace API and tests - Fixed positional argument type mismatch for `snapshot_properties` in [_RewriteFiles](iceberg-python/pyiceberg/table/update/snapshot.py) - Added missing `Catalog` type annotations to pytest fixtures in [test_replace.py](iceberg-python/tests/table/test_replace.py) - Added strict `is not None` assertions for `table.current_snapshot()` to satisfy mypy Optional checking - Auto-formatted tests with ruff --- pyiceberg/table/update/snapshot.py | 5 +++- tests/table/test_replace.py | 47 +++++++++++++++--------------- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index cc6792398a..0157a40eb8 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -666,11 +666,12 @@ def _get_entries(manifest: ManifestFile) -> list[ManifestEntry]: else: return [] + class _RewriteFiles(_SnapshotProducer["_RewriteFiles"]): """A snapshot producer that rewrites data files.""" def __init__(self, operation: Operation, transaction: Transaction, io: FileIO, snapshot_properties: dict[str, str]): - super().__init__(operation, transaction, io, snapshot_properties) + super().__init__(operation, transaction, io, snapshot_properties=snapshot_properties) def _commit(self) -> UpdatesAndRequirements: # Only produce a commit when there is something to rewrite @@ -806,6 +807,8 @@ def replace(self) -> _RewriteFiles: io=self._io, snapshot_properties=self._snapshot_properties, ) + + class _ManifestMergeManager(Generic[U]): _target_size_bytes: int _min_count_to_merge: int diff --git a/tests/table/test_replace.py b/tests/table/test_replace.py index 3f8835dca0..a1cdc79bc4 100644 --- a/tests/table/test_replace.py +++ b/tests/table/test_replace.py @@ -1,19 +1,18 @@ -import uuid -import pytest +from pyiceberg.catalog import Catalog from pyiceberg.manifest import DataFile, DataFileContent, FileFormat -from pyiceberg.table.snapshots import Operation -from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.table.snapshots import Operation from pyiceberg.typedef import Record -def test_replace_api(catalog): + +def test_replace_api(catalog: Catalog) -> None: # Setup a basic table using the catalog fixture catalog.create_namespace("default") table = catalog.create_table( identifier="default.test_replace", schema=Schema(), ) - + # Create mock DataFiles for the test file_to_delete = DataFile.from_args( file_path="s3://bucket/test/data/deleted.parquet", @@ -24,7 +23,7 @@ def test_replace_api(catalog): content=DataFileContent.DATA, ) file_to_delete.spec_id = 0 - + file_to_add = DataFile.from_args( file_path="s3://bucket/test/data/added.parquet", file_format=FileFormat.PARQUET, @@ -34,28 +33,29 @@ def test_replace_api(catalog): content=DataFileContent.DATA, ) file_to_add.spec_id = 0 - + # Initially append to have something to replace with table.transaction() as tx: with tx.update_snapshot().fast_append() as append_snapshot: append_snapshot.append_data_file(file_to_delete) - + # Verify initial append snapshot assert len(table.history()) == 1 snapshot = table.current_snapshot() + assert snapshot is not None + assert snapshot.summary is not None assert snapshot.summary["operation"] == Operation.APPEND - + # Call the replace API - table.replace( - files_to_delete=[file_to_delete], - files_to_add=[file_to_add] - ) - + table.replace(files_to_delete=[file_to_delete], files_to_add=[file_to_add]) + # Verify the replacement created a REPLACE snapshot assert len(table.history()) == 2 snapshot = table.current_snapshot() + assert snapshot is not None + assert snapshot.summary is not None assert snapshot.summary["operation"] == Operation.REPLACE - + # Verify the correct files are added and deleted # The summary property tracks these counts assert snapshot.summary["added-data-files"] == "1" @@ -65,28 +65,29 @@ def test_replace_api(catalog): # Verify the new file exists in the new manifest manifest_files = snapshot.manifests(table.io) - assert len(manifest_files) == 2 # One for ADDED, one for DELETED - + assert len(manifest_files) == 2 # One for ADDED, one for DELETED + # Check that sequence numbers were handled properly natively by verifying the manifest contents entries = [] for manifest in manifest_files: for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): entries.append(entry) - + # One entry for ADDED (new file), one for DELETED (old file) assert len(entries) == 2 - -def test_replace_empty_files(catalog): + + +def test_replace_empty_files(catalog: Catalog) -> None: # Setup a basic table using the catalog fixture catalog.create_namespace("default") table = catalog.create_table( identifier="default.test_replace_empty", schema=Schema(), ) - + # Replacing empty lists should not throw errors, but should produce no changes. table.replace([], []) - + # History should be completely empty since no files were rewritten assert len(table.history()) == 0 assert table.current_snapshot() is None From e5e11b9b09997c0e90144ae1cf610b445f0cb2e7 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Mon, 9 Mar 2026 15:55:15 -0700 Subject: [PATCH 03/10] test: fix invalid operation assertion and add license headers --- tests/table/test_replace.py | 16 ++++++++++++++++ tests/table/test_snapshots.py | 4 ++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/tests/table/test_replace.py b/tests/table/test_replace.py index a1cdc79bc4..60f270c0a7 100644 --- a/tests/table/test_replace.py +++ b/tests/table/test_replace.py @@ -1,3 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. from pyiceberg.catalog import Catalog from pyiceberg.manifest import DataFile, DataFileContent, FileFormat from pyiceberg.schema import Schema diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index cfdc516227..c0f76ae316 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -398,8 +398,8 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: def test_invalid_operation() -> None: with pytest.raises(ValueError) as e: - update_snapshot_summaries(summary=Summary(Operation.REPLACE)) - assert "Operation not implemented: Operation.REPLACE" in str(e.value) + update_snapshot_summaries(summary=Summary(Operation("invalid"))) # type: ignore + assert "Operation not implemented: Operation" in str(e.value) def test_invalid_type() -> None: From 070c8859a0ef9a7ded7cdc9225aef3b6b3722bb8 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Mon, 9 Mar 2026 16:05:18 -0700 Subject: [PATCH 04/10] chore: Remove comment for linter --- tests/table/test_snapshots.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index c0f76ae316..2693d8517f 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -398,7 +398,7 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: def test_invalid_operation() -> None: with pytest.raises(ValueError) as e: - update_snapshot_summaries(summary=Summary(Operation("invalid"))) # type: ignore + update_snapshot_summaries(summary=Summary(Operation("invalid"))) assert "Operation not implemented: Operation" in str(e.value) From f12fa5df2b229d839a6acddcdf15086141f10b78 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Mon, 9 Mar 2026 16:15:14 -0700 Subject: [PATCH 05/10] test: fix invalid operation assertion by using model_construct to bypass enum validation (Operation.REPLACE is valid so we can no longer use it in test_invalid_operation) --- tests/table/test_snapshots.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 2693d8517f..7f78a7546d 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -398,8 +398,8 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: def test_invalid_operation() -> None: with pytest.raises(ValueError) as e: - update_snapshot_summaries(summary=Summary(Operation("invalid"))) - assert "Operation not implemented: Operation" in str(e.value) + update_snapshot_summaries(summary=Summary.model_construct(operation="unknown_operation")) + assert "Operation not implemented: unknown_operation" in str(e.value) def test_invalid_type() -> None: From 94bd87e1d64f8ee524dd46b789e90eacbe471076 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Fri, 27 Mar 2026 13:09:33 -0700 Subject: [PATCH 06/10] fix: Remove replace as a public function for the Transaction and Table classes --- pyiceberg/table/__init__.py | 53 ------------------------------------- 1 file changed, 53 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 08e3ada8bb..68089beb54 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -450,32 +450,6 @@ def update_statistics(self) -> UpdateStatistics: """ return UpdateStatistics(transaction=self) - def replace( - self, - files_to_delete: Iterable[DataFile], - files_to_add: Iterable[DataFile], - snapshot_properties: dict[str, str] = EMPTY_DICT, - branch: str | None = MAIN_BRANCH, - ) -> None: - """ - Shorthand for replacing existing files with new files. - - A replace will produce a REPLACE snapshot that will ignore existing - files and replace them with the new files. - - Args: - files_to_delete: The files to delete - files_to_add: The new files to add that replace the deleted files - snapshot_properties: Custom properties to be added to the snapshot summary - branch: Branch Reference to run the replace operation - """ - with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).replace() as replace_snapshot: - for file_to_delete in files_to_delete: - replace_snapshot.delete_data_file(file_to_delete) - - for data_file in files_to_add: - replace_snapshot.append_data_file(data_file) - def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None: """ Shorthand API for appending a PyArrow table to a table transaction. @@ -1410,33 +1384,6 @@ def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, with self.transaction() as tx: tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch) - def replace( - self, - files_to_delete: Iterable[DataFile], - files_to_add: Iterable[DataFile], - snapshot_properties: dict[str, str] = EMPTY_DICT, - branch: str | None = MAIN_BRANCH, - ) -> None: - """ - Shorthand for replacing existing files with new files. - - A replace will produce a REPLACE snapshot that will ignore existing - files and replace them with the new files. - - Args: - files_to_delete: The files to delete - files_to_add: The new files to add that replace the deleted files - snapshot_properties: Custom properties to be added to the snapshot summary - branch: Branch Reference to run the replace operation - """ - with self.transaction() as tx: - tx.replace( - files_to_delete=files_to_delete, - files_to_add=files_to_add, - snapshot_properties=snapshot_properties, - branch=branch, - ) - def dynamic_partition_overwrite( self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH ) -> None: From a2f2b18c2f63b19e2d0e017e81ad4f322ffafa15 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Fri, 27 Mar 2026 16:53:24 -0700 Subject: [PATCH 07/10] test: Update tests to reflect stricter requirements for REPLACE --- tests/table/test_replace.py | 387 +++++++++++++++++++++++++++++++++--- 1 file changed, 359 insertions(+), 28 deletions(-) diff --git a/tests/table/test_replace.py b/tests/table/test_replace.py index 60f270c0a7..eeec588547 100644 --- a/tests/table/test_replace.py +++ b/tests/table/test_replace.py @@ -15,13 +15,13 @@ # specific language governing permissions and limitations # under the License. from pyiceberg.catalog import Catalog -from pyiceberg.manifest import DataFile, DataFileContent, FileFormat +from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestEntryStatus from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Operation from pyiceberg.typedef import Record -def test_replace_api(catalog: Catalog) -> None: +def test_replace_internally(catalog: Catalog) -> None: # Setup a basic table using the catalog fixture catalog.create_namespace("default") table = catalog.create_table( @@ -29,7 +29,7 @@ def test_replace_api(catalog: Catalog) -> None: schema=Schema(), ) - # Create mock DataFiles for the test + # 1. File we will delete file_to_delete = DataFile.from_args( file_path="s3://bucket/test/data/deleted.parquet", file_format=FileFormat.PARQUET, @@ -40,6 +40,18 @@ def test_replace_api(catalog: Catalog) -> None: ) file_to_delete.spec_id = 0 + # 2. File we will leave completely untouched + file_to_keep = DataFile.from_args( + file_path="s3://bucket/test/data/kept.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=50, + file_size_in_bytes=512, + content=DataFileContent.DATA, + ) + file_to_keep.spec_id = 0 + + # 3. File we are adding as a replacement file_to_add = DataFile.from_args( file_path="s3://bucket/test/data/added.parquet", file_format=FileFormat.PARQUET, @@ -50,47 +62,163 @@ def test_replace_api(catalog: Catalog) -> None: ) file_to_add.spec_id = 0 - # Initially append to have something to replace + # Initially append BOTH the file to delete and the file to keep with table.transaction() as tx: with tx.update_snapshot().fast_append() as append_snapshot: append_snapshot.append_data_file(file_to_delete) + append_snapshot.append_data_file(file_to_keep) - # Verify initial append snapshot - assert len(table.history()) == 1 - snapshot = table.current_snapshot() - assert snapshot is not None - assert snapshot.summary is not None - assert snapshot.summary["operation"] == Operation.APPEND + old_snapshot = table.current_snapshot() + old_snapshot_id = old_snapshot.snapshot_id + old_sequence_number = old_snapshot.sequence_number - # Call the replace API - table.replace(files_to_delete=[file_to_delete], files_to_add=[file_to_add]) + # Call the internal replace API + with table.transaction() as tx: + with tx.update_snapshot().replace() as rewrite: + rewrite.delete_data_file(file_to_delete) + rewrite.append_data_file(file_to_add) - # Verify the replacement created a REPLACE snapshot - assert len(table.history()) == 2 snapshot = table.current_snapshot() - assert snapshot is not None - assert snapshot.summary is not None + + # 1. Has a unique snapshot ID + assert snapshot.snapshot_id is not None + assert snapshot.snapshot_id != old_snapshot_id + + # 2. Parent points to the previous snapshot + assert snapshot.parent_snapshot_id == old_snapshot_id + + # 3. Sequence number is exactly previous + 1 + assert snapshot.sequence_number == old_sequence_number + 1 + + # 4. Operation type is set to "replace" assert snapshot.summary["operation"] == Operation.REPLACE - - # Verify the correct files are added and deleted - # The summary property tracks these counts + + # 5. Manifest list path is correct (just verify it exists and is a string path) + assert snapshot.manifest_list is not None + assert isinstance(snapshot.manifest_list, str) + + # 6. Summary counts are accurate assert snapshot.summary["added-data-files"] == "1" assert snapshot.summary["deleted-data-files"] == "1" assert snapshot.summary["added-records"] == "100" assert snapshot.summary["deleted-records"] == "100" + assert snapshot.summary["total-records"] == "150" - # Verify the new file exists in the new manifest + # Fetch all entries from the new manifests manifest_files = snapshot.manifests(table.io) - assert len(manifest_files) == 2 # One for ADDED, one for DELETED - - # Check that sequence numbers were handled properly natively by verifying the manifest contents entries = [] for manifest in manifest_files: - for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): - entries.append(entry) + entries.extend(manifest.fetch_manifest_entry(table.io, discard_deleted=False)) + + # We expect 3 entries: ADDED, DELETED, and EXISTING + assert len(entries) == 3 + + # Check ADDED + added_entries = [e for e in entries if e.status == ManifestEntryStatus.ADDED] + assert len(added_entries) == 1 + assert added_entries[0].data_file.file_path == file_to_add.file_path + assert added_entries[0].snapshot_id == snapshot.snapshot_id + + # Check DELETED + deleted_entries = [e for e in entries if e.status == ManifestEntryStatus.DELETED] + assert len(deleted_entries) == 1 + assert deleted_entries[0].data_file.file_path == file_to_delete.file_path + assert deleted_entries[0].snapshot_id == snapshot.snapshot_id + + # Check EXISTING + existing_entries = [e for e in entries if e.status == ManifestEntryStatus.EXISTING] + assert len(existing_entries) == 1 + assert existing_entries[0].data_file.file_path == file_to_keep.file_path + assert existing_entries[0].snapshot_id == old_snapshot_id + + +def test_replace_reuses_unaffected_manifests(catalog: Catalog) -> None: + # Setup a basic table + catalog.create_namespace("default") + table = catalog.create_table( + identifier="default.test_replace_reuse_manifest", + schema=Schema(), + ) + + file_a = DataFile.from_args( + file_path="s3://bucket/test/data/a.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + content=DataFileContent.DATA, + ) + file_a.spec_id = 0 + + file_b = DataFile.from_args( + file_path="s3://bucket/test/data/b.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + content=DataFileContent.DATA, + ) + file_b.spec_id = 0 + + file_c = DataFile.from_args( + file_path="s3://bucket/test/data/c.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + content=DataFileContent.DATA, + ) + file_c.spec_id = 0 + + # Commit 1: Append file A (Creates Manifest 1) + with table.transaction() as tx: + with tx.update_snapshot().fast_append() as append_snapshot: + append_snapshot.append_data_file(file_a) + + # Commit 2: Append file B (Creates Manifest 2) + with table.transaction() as tx: + with tx.update_snapshot().fast_append() as append_snapshot: + append_snapshot.append_data_file(file_b) - # One entry for ADDED (new file), one for DELETED (old file) - assert len(entries) == 2 + snapshot_before = table.current_snapshot() + manifests_before = snapshot_before.manifests(table.io) + assert len(manifests_before) == 2 + + # Identify which manifest belongs to file_b and file_a + manifest_b_path = None + manifest_a_path = None + for m in manifests_before: + entries = m.fetch_manifest_entry(table.io, discard_deleted=False) + if any(e.data_file.file_path == file_b.file_path for e in entries): + manifest_b_path = m.manifest_path + if any(e.data_file.file_path == file_a.file_path for e in entries): + manifest_a_path = m.manifest_path + + assert manifest_b_path is not None + assert manifest_a_path is not None + + # Commit 3: Replace file A with file C + with table.transaction() as tx: + with tx.update_snapshot().replace() as rewrite: + rewrite.delete_data_file(file_a) + rewrite.append_data_file(file_c) + + snapshot_after = table.current_snapshot() + manifests_after = snapshot_after.manifests(table.io) + + # We expect 3 manifests: + # 1. The reused one for file B + # 2. The newly rewritten one marking file A as DELETED + # 3. The new one for file C (ADDED) + assert len(manifests_after) == 3 + + manifest_paths_after = [m.manifest_path for m in manifests_after] + + # ASSERTION 1: The untouched manifest is completely reused (the path matches exactly) + assert manifest_b_path in manifest_paths_after + + # ASSERTION 2: File A's old manifest is NOT reused (since it was rewritten to change status to DELETED) + assert manifest_a_path not in manifest_paths_after def test_replace_empty_files(catalog: Catalog) -> None: @@ -102,8 +230,211 @@ def test_replace_empty_files(catalog: Catalog) -> None: ) # Replacing empty lists should not throw errors, but should produce no changes. - table.replace([], []) + with table.transaction() as tx: + with tx.update_snapshot().replace(): + pass # Entering and exiting the context manager without adding/deleting # History should be completely empty since no files were rewritten assert len(table.history()) == 0 assert table.current_snapshot() is None + + +def test_replace_missing_file_abort(catalog: Catalog) -> None: + # Setup a basic table + catalog.create_namespace("default") + table = catalog.create_table( + identifier="default.test_replace_missing", + schema=Schema(), + ) + + fake_data_file = DataFile.from_args( + file_path="s3://bucket/test/data/does_not_exist.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1024, + content=DataFileContent.DATA, + ) + fake_data_file.spec_id = 0 + + new_data_file = DataFile.from_args( + file_path="s3://bucket/test/data/new.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1024, + content=DataFileContent.DATA, + ) + new_data_file.spec_id = 0 + + # Ensure it aborts when trying to replace a file that isn't in the table + with pytest.raises(ValueError, match="Cannot delete files that are not present in the table"): + with table.transaction() as tx: + with tx.update_snapshot().replace() as rewrite: + rewrite.delete_data_file(fake_data_file) + rewrite.append_data_file(new_data_file) + + +def test_replace_invariant_violation(catalog: Catalog) -> None: + # Setup a basic table + catalog.create_namespace("default") + table = catalog.create_table( + identifier="default.test_replace_invariant", + schema=Schema(), + ) + + file_to_delete = DataFile.from_args( + file_path="s3://bucket/test/data/deleted.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1024, + content=DataFileContent.DATA, + ) + file_to_delete.spec_id = 0 + + # Create a new file with MORE records than the one we are deleting + too_many_records_file = DataFile.from_args( + file_path="s3://bucket/test/data/too_many.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=101, + file_size_in_bytes=1024, + content=DataFileContent.DATA, + ) + too_many_records_file.spec_id = 0 + + # Initially append to have something to replace + with table.transaction() as tx: + with tx.update_snapshot().fast_append() as append_snapshot: + append_snapshot.append_data_file(file_to_delete) + + # Ensure it enforces the invariant: records added <= records removed + with pytest.raises(ValueError, match=r"Invalid replace: records added \(101\) exceeds records removed \(100\)"): + with table.transaction() as tx: + with tx.update_snapshot().replace() as rewrite: + rewrite.delete_data_file(file_to_delete) + rewrite.append_data_file(too_many_records_file) + + +def test_replace_allows_shrinking_for_soft_deletes(catalog: Catalog) -> None: + # Setup a basic table + catalog.create_namespace("default") + table = catalog.create_table( + identifier="default.test_replace_shrink", + schema=Schema(), + ) + + # Old data file has 100 records + file_to_delete = DataFile.from_args( + file_path="s3://bucket/test/data/deleted.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1024, + content=DataFileContent.DATA, + ) + file_to_delete.spec_id = 0 + + # New data file only has 90 records (simulating 10 records were soft-deleted) + shrunk_file_to_add = DataFile.from_args( + file_path="s3://bucket/test/data/shrunk.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=90, + file_size_in_bytes=900, + content=DataFileContent.DATA, + ) + shrunk_file_to_add.spec_id = 0 + + # Initially append + with table.transaction() as tx: + with tx.update_snapshot().fast_append() as append_snapshot: + append_snapshot.append_data_file(file_to_delete) + + # This should succeed without throwing an invariant violation + with table.transaction() as tx: + with tx.update_snapshot().replace() as rewrite: + rewrite.delete_data_file(file_to_delete) + rewrite.append_data_file(shrunk_file_to_add) + + snapshot = table.current_snapshot() + assert snapshot.summary["operation"] == Operation.REPLACE + assert snapshot.summary["added-records"] == "90" + assert snapshot.summary["deleted-records"] == "100" + + +def test_replace_passes_through_delete_manifests(catalog: Catalog) -> None: + # Setup a basic table + catalog.create_namespace("default") + table = catalog.create_table( + identifier="default.test_replace_delete_manifests", + schema=Schema(), + ) + + # 1. Data file we will replace + file_a = DataFile.from_args( + file_path="s3://bucket/test/data/a.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + content=DataFileContent.DATA, + ) + file_a.spec_id = 0 + + # 2. A Position Delete file (representing row-level deletes) + file_a_deletes = DataFile.from_args( + file_path="s3://bucket/test/data/a_deletes.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=2, + file_size_in_bytes=50, + content=DataFileContent.POSITION_DELETES, + ) + file_a_deletes.spec_id = 0 + + # 3. Data file we are adding as a replacement + file_b = DataFile.from_args( + file_path="s3://bucket/test/data/b.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=10, + file_size_in_bytes=100, + content=DataFileContent.DATA, + ) + file_b.spec_id = 0 + + # Commit 1: Append the data file + with table.transaction() as tx: + with tx.update_snapshot().fast_append() as append_snapshot: + append_snapshot.append_data_file(file_a) + + # Commit 2: Append the delete file + with table.transaction() as tx: + with tx.update_snapshot().fast_append() as append_snapshot: + append_snapshot.append_data_file(file_a_deletes) + + # Find the path of the delete manifest so we can verify it survives + snapshot_before = table.current_snapshot() + manifests_before = snapshot_before.manifests(table.io) + + delete_manifest_path = None + for m in manifests_before: + if m.content == ManifestContent.DELETES: + delete_manifest_path = m.manifest_path + + assert delete_manifest_path is not None + + # Commit 3: Replace data file A with data file B + with table.transaction() as tx: + with tx.update_snapshot().replace() as rewrite: + rewrite.delete_data_file(file_a) + rewrite.append_data_file(file_b) + + # Verify the delete manifest was passed through unchanged + snapshot_after = table.current_snapshot() + manifests_after = snapshot_after.manifests(table.io) + manifest_paths_after = [m.manifest_path for m in manifests_after] + + assert delete_manifest_path in manifest_paths_after \ No newline at end of file From 356d704e2d0223941dcec65f816f8f83ab764047 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Fri, 27 Mar 2026 17:27:24 -0700 Subject: [PATCH 08/10] test: Fix import errors and mypy exception --- tests/table/test_replace.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/table/test_replace.py b/tests/table/test_replace.py index eeec588547..29ac2744c5 100644 --- a/tests/table/test_replace.py +++ b/tests/table/test_replace.py @@ -14,9 +14,16 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import pytest from pyiceberg.catalog import Catalog -from pyiceberg.manifest import DataFile, DataFileContent, FileFormat, ManifestEntryStatus -from pyiceberg.schema import Schema +from pyiceberg.manifest import ( + DataFile, + DataFileContent, + FileFormat, + ManifestContent, + ManifestEntry, + ManifestEntryStatus, +)from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Operation from pyiceberg.typedef import Record @@ -204,6 +211,7 @@ def test_replace_reuses_unaffected_manifests(catalog: Catalog) -> None: rewrite.append_data_file(file_c) snapshot_after = table.current_snapshot() + assert snapshot_after is not None manifests_after = snapshot_after.manifests(table.io) # We expect 3 manifests: @@ -434,6 +442,7 @@ def test_replace_passes_through_delete_manifests(catalog: Catalog) -> None: # Verify the delete manifest was passed through unchanged snapshot_after = table.current_snapshot() + assert snapshot_after is not None manifests_after = snapshot_after.manifests(table.io) manifest_paths_after = [m.manifest_path for m in manifests_after] From 87f6848f85e25606ecbc7e614036ad86f4f11932 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Fri, 27 Mar 2026 18:37:59 -0700 Subject: [PATCH 09/10] test: Fix syntax error --- tests/table/test_replace.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/table/test_replace.py b/tests/table/test_replace.py index 29ac2744c5..0ce7675ed5 100644 --- a/tests/table/test_replace.py +++ b/tests/table/test_replace.py @@ -23,7 +23,8 @@ ManifestContent, ManifestEntry, ManifestEntryStatus, -)from pyiceberg.schema import Schema +) +from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Operation from pyiceberg.typedef import Record From 41eb549d77ee239e0f0d0012c6a0cfa70bc62003 Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Fri, 27 Mar 2026 18:55:42 -0700 Subject: [PATCH 10/10] test: Fix cast errors --- tests/table/test_replace.py | 46 ++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/tests/table/test_replace.py b/tests/table/test_replace.py index 0ce7675ed5..9a3a4b247e 100644 --- a/tests/table/test_replace.py +++ b/tests/table/test_replace.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from typing import cast import pytest from pyiceberg.catalog import Catalog from pyiceberg.manifest import ( @@ -25,7 +26,7 @@ ManifestEntryStatus, ) from pyiceberg.schema import Schema -from pyiceberg.table.snapshots import Operation +from pyiceberg.table.snapshots import Operation, Snapshot, Summary from pyiceberg.typedef import Record @@ -76,9 +77,9 @@ def test_replace_internally(catalog: Catalog) -> None: append_snapshot.append_data_file(file_to_delete) append_snapshot.append_data_file(file_to_keep) - old_snapshot = table.current_snapshot() + old_snapshot = cast(Snapshot, table.current_snapshot()) old_snapshot_id = old_snapshot.snapshot_id - old_sequence_number = old_snapshot.sequence_number + old_sequence_number = cast(int, old_snapshot.sequence_number) # Call the internal replace API with table.transaction() as tx: @@ -86,8 +87,9 @@ def test_replace_internally(catalog: Catalog) -> None: rewrite.delete_data_file(file_to_delete) rewrite.append_data_file(file_to_add) - snapshot = table.current_snapshot() - + snapshot = cast(Snapshot, table.current_snapshot()) + summary = cast(Summary, snapshot.summary) + # 1. Has a unique snapshot ID assert snapshot.snapshot_id is not None assert snapshot.snapshot_id != old_snapshot_id @@ -99,22 +101,22 @@ def test_replace_internally(catalog: Catalog) -> None: assert snapshot.sequence_number == old_sequence_number + 1 # 4. Operation type is set to "replace" - assert snapshot.summary["operation"] == Operation.REPLACE + assert summary["operation"] == Operation.REPLACE # 5. Manifest list path is correct (just verify it exists and is a string path) assert snapshot.manifest_list is not None assert isinstance(snapshot.manifest_list, str) # 6. Summary counts are accurate - assert snapshot.summary["added-data-files"] == "1" - assert snapshot.summary["deleted-data-files"] == "1" - assert snapshot.summary["added-records"] == "100" - assert snapshot.summary["deleted-records"] == "100" - assert snapshot.summary["total-records"] == "150" + assert summary["added-data-files"] == "1" + assert summary["deleted-data-files"] == "1" + assert summary["added-records"] == "100" + assert summary["deleted-records"] == "100" + assert summary["total-records"] == "150" # Fetch all entries from the new manifests manifest_files = snapshot.manifests(table.io) - entries = [] + entries: list[ManifestEntry] = [] for manifest in manifest_files: entries.extend(manifest.fetch_manifest_entry(table.io, discard_deleted=False)) @@ -188,7 +190,7 @@ def test_replace_reuses_unaffected_manifests(catalog: Catalog) -> None: with tx.update_snapshot().fast_append() as append_snapshot: append_snapshot.append_data_file(file_b) - snapshot_before = table.current_snapshot() + snapshot_before = cast(Snapshot, table.current_snapshot()) manifests_before = snapshot_before.manifests(table.io) assert len(manifests_before) == 2 @@ -211,7 +213,7 @@ def test_replace_reuses_unaffected_manifests(catalog: Catalog) -> None: rewrite.delete_data_file(file_a) rewrite.append_data_file(file_c) - snapshot_after = table.current_snapshot() + snapshot_after = cast(Snapshot, table.current_snapshot()) assert snapshot_after is not None manifests_after = snapshot_after.manifests(table.io) @@ -367,10 +369,12 @@ def test_replace_allows_shrinking_for_soft_deletes(catalog: Catalog) -> None: rewrite.delete_data_file(file_to_delete) rewrite.append_data_file(shrunk_file_to_add) - snapshot = table.current_snapshot() - assert snapshot.summary["operation"] == Operation.REPLACE - assert snapshot.summary["added-records"] == "90" - assert snapshot.summary["deleted-records"] == "100" + snapshot = cast(Snapshot, table.current_snapshot()) + summary = cast(Summary, snapshot.summary) + + assert summary["operation"] == Operation.REPLACE + assert summary["added-records"] == "90" + assert summary["deleted-records"] == "100" def test_replace_passes_through_delete_manifests(catalog: Catalog) -> None: @@ -425,9 +429,9 @@ def test_replace_passes_through_delete_manifests(catalog: Catalog) -> None: append_snapshot.append_data_file(file_a_deletes) # Find the path of the delete manifest so we can verify it survives - snapshot_before = table.current_snapshot() + snapshot_before = cast(Snapshot, table.current_snapshot()) manifests_before = snapshot_before.manifests(table.io) - + delete_manifest_path = None for m in manifests_before: if m.content == ManifestContent.DELETES: @@ -442,7 +446,7 @@ def test_replace_passes_through_delete_manifests(catalog: Catalog) -> None: rewrite.append_data_file(file_b) # Verify the delete manifest was passed through unchanged - snapshot_after = table.current_snapshot() + snapshot_after = cast(Snapshot, table.current_snapshot()) assert snapshot_after is not None manifests_after = snapshot_after.manifests(table.io) manifest_paths_after = [m.manifest_path for m in manifests_after]