diff --git a/src/pyseekdb/client/client_base.py b/src/pyseekdb/client/client_base.py index 88411e0..e458886 100644 --- a/src/pyseekdb/client/client_base.py +++ b/src/pyseekdb/client/client_base.py @@ -11,7 +11,7 @@ from abc import ABC, abstractmethod from collections.abc import Sequence from dataclasses import dataclass -from typing import Any +from typing import Any, ClassVar from pymysql.converters import escape_string @@ -1494,6 +1494,12 @@ def _fork_database_enabled(self) -> bool: logger.debug(f"db_type: {db_type}, version: {version}") return db_type.lower() == "seekdb" and version >= version_120 + def _diff_merge_enabled(self) -> bool: + db_type, version = self.detect_db_type_and_version() + version_120 = Version("1.2.0.0") + logger.debug(f"db_type: {db_type}, version: {version}") + return db_type.lower() == "seekdb" and version >= version_120 + def _get_collection_id(self, collection_name: str) -> str: collection_id_query_sql = f"SELECT COLLECTION_ID FROM `{CollectionNames.sdk_collections_table_name()}` WHERE COLLECTION_NAME = '{collection_name}'" collection_id_query_result = self._execute(collection_id_query_sql) @@ -1555,6 +1561,80 @@ def _collection_fork(self, collection: Collection, forked_name: str) -> None: raise ValueError(f"Failed to fork collection: {ex}") from ex logger.debug(f"✅ Successfully forked collection '{collection.name}' to '{forked_name}'") + _VALID_MERGE_STRATEGIES: ClassVar[set[str]] = {"FAIL", "THEIRS", "OURS"} + + def _validate_same_client(self, incoming: Collection, current: Collection) -> None: + """Ensure both collections belong to this client.""" + if incoming.client is not self: + raise ValueError( + f"Collection '{incoming.name}' belongs to a different client; " + "diff/merge requires both collections to be from the same client" + ) + if current.client is not self: + raise ValueError( + f"Collection '{current.name}' belongs to a different client; " + "diff/merge requires both collections to be from the same client" + ) + + def 
_collection_diff(self, incoming: Collection, current: Collection) -> list[dict[str, Any]]: + """ + Diff two collections, returning rows that differ between them. + + Args: + incoming: The incoming (comparison) collection + current: The current (baseline) collection + + Returns: + List of normalized diff result dicts. Empty list if no differences. + """ + if not self._diff_merge_enabled(): + raise ValueError("Diff is not enabled for this database (requires seekdb >= 1.2.0)") + + self._validate_same_client(incoming, current) + + incoming_table = self._get_collection_table_name(incoming.id, incoming.name) + current_table = self._get_collection_table_name(current.id, current.name) + + diff_sql = f"DIFF TABLE `{incoming_table}` AGAINST `{current_table}`" + conn = self._ensure_connection() + result = self._execute_query_with_cursor( + conn, diff_sql, params=[], use_context_manager=self._use_context_manager_for_cursor() + ) + logger.debug( + f"✅ Successfully diffed collection '{incoming.name}' against '{current.name}', " + f"found {len(result)} diff rows" + ) + return result + + def _collection_merge(self, incoming: Collection, current: Collection, strategy: str = "FAIL") -> None: + """ + Merge incoming collection into current collection. + + Args: + incoming: The incoming (source) collection + current: The current (target) collection + strategy: Conflict resolution strategy - FAIL, THEIRS, or OURS (default: FAIL) + """ + if not self._diff_merge_enabled(): + raise ValueError("Merge is not enabled for this database (requires seekdb >= 1.2.0)") + + strategy_upper = strategy.upper() + if strategy_upper not in self._VALID_MERGE_STRATEGIES: + raise ValueError( + f"Invalid merge strategy: '{strategy}'. 
Must be one of: {', '.join(sorted(self._VALID_MERGE_STRATEGIES))}" + ) + + self._validate_same_client(incoming, current) + + incoming_table = self._get_collection_table_name(incoming.id, incoming.name) + current_table = self._get_collection_table_name(current.id, current.name) + + merge_sql = f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY {strategy_upper}" + self._execute(merge_sql) + logger.debug( + f"✅ Successfully merged collection '{incoming.name}' into '{current.name}' with strategy {strategy_upper}" + ) + # ==================== Collection Internal Operations (Called by Collection) ==================== # These methods are called by Collection objects, different clients implement different logic diff --git a/src/pyseekdb/client/collection.py b/src/pyseekdb/client/collection.py index 5498283..8fbf4ed 100644 --- a/src/pyseekdb/client/collection.py +++ b/src/pyseekdb/client/collection.py @@ -155,6 +155,96 @@ def fork(self, forked_name: str) -> "Collection": collection = self._client.get_collection(forked_name, embedding_function=self._embedding_function) return collection + def diff(self, other: "Collection") -> list[dict[str, Any]]: + """ + Diff this collection against another collection (baseline). + + Compares rows by primary key. Returns rows that differ between the two collections: + conflicting rows (same key, different values), and rows unique to either collection. + Rows that are identical in both collections are not included. + + This is a read-only operation - neither collection is modified. + + Args: + other: The baseline collection to compare against. Must have the same + column definitions (column names, types, and order) and a primary key. + + Returns: + list[dict[str, Any]]: List of diff result rows. Each row is a dict containing + the table columns plus system columns indicating the source table and diff type. + Returns an empty list if the collections are identical. 
+ + Raises: + ValueError: If diff is not enabled (requires seekdb >= 1.2.0) or a collection belongs to a + different client. Database-side failures (e.g. incompatible schemas) are not wrapped here. + + Note: + - Diff is only available for seekdb database version 1.2.0.0 or higher. + - Collections created via the standard SDK path (using ``create_collection()``) + are not supported, as seekdb does not support DIFF/MERGE TABLE on LOB column + types (``document``, ``embedding``, ``metadata``). Use plain non-LOB schemas instead. + + Examples: + .. code-block:: python + original = client.get_collection("baseline") + modified = original.fork("modified_copy") + + # Make changes to modified copy + modified.add(ids="new_id", embeddings=[1.0, 2.0, 3.0], documents="New doc") + + # See what changed + diff_rows = modified.diff(original) + for row in diff_rows: + print(row) + + """ + return self._client._collection_diff(incoming=self, current=other) + + def merge_into(self, target: "Collection", strategy: str = "FAIL") -> None: + """ + Merge this collection's changes into the target collection. + + Non-conflicting rows (rows unique to this collection) are inserted into the target. + Rows unique to the target are preserved. Conflicting rows (same primary key, + different values) are handled according to the specified strategy. + Merge does not delete any rows from the target. + + Args: + target: The target collection to merge into. Must have the same + column definitions (column names, types, and order) and a primary key. + strategy: Conflict resolution strategy. One of: + - ``"FAIL"`` (default): Raise an error and roll back if any conflicts exist. + - ``"THEIRS"``: Use this collection's values to overwrite the target on conflict. + - ``"OURS"``: Keep the target's values unchanged on conflict. + + Raises: + ValueError: If merge is not enabled (requires seekdb >= 1.2.0), + if the strategy is invalid, or if a collection belongs to a different client. + Database-side failures (e.g. conflicts under ``"FAIL"``, incompatible schemas) are not wrapped here. + + Note: + - Merge is only available for seekdb database version 1.2.0.0 or higher. 
+ - The merge executes in a single transaction; on failure it rolls back entirely. + - In the SQL semantics, ``self`` is the "incoming" table and ``target`` is + the "current" table. + - Collections created via the standard SDK path (using ``create_collection()``) + are not supported, as seekdb does not support DIFF/MERGE TABLE on LOB column + types (``document``, ``embedding``, ``metadata``). Use plain non-LOB schemas instead. + + Examples: + .. code-block:: python + original = client.get_collection("main_data") + branch = original.fork("experiment") + + # Make changes in the branch + branch.add(ids="new_id", embeddings=[1.0, 2.0, 3.0], documents="New doc") + + # Merge branch changes back, using branch values on conflict + branch.merge_into(original, strategy="THEIRS") + + """ + self._client._collection_merge(incoming=self, current=target, strategy=strategy) + # ==================== DML Operations ==================== # All methods delegate to client's internal implementation diff --git a/tests/integration_tests/test_collection_diff.py b/tests/integration_tests/test_collection_diff.py new file mode 100644 index 0000000..bfd9b29 --- /dev/null +++ b/tests/integration_tests/test_collection_diff.py @@ -0,0 +1,240 @@ +""" +Integration tests for DIFF TABLE functionality. + +Tests the diff functionality against real databases using plain (non-LOB) tables, +since DIFF TABLE does not support LOB column types (e.g., vectors). 
+ +Tests include: +- Diff between identical tables (no differences) +- Diff detecting conflicting rows (same key, different values) +- Diff detecting rows unique to either table +- Diff with empty tables +- Diff through Collection API with proper table naming +""" + +import contextlib +import logging +import time + +import pytest + +logger = logging.getLogger(__name__) + + +class TestCollectionDiff: + """Tests for DIFF TABLE using real database connections.""" + + def _is_diff_merge_enabled(self, client) -> bool: + """Check if diff/merge is enabled for the given client.""" + try: + return client._server._diff_merge_enabled() + except Exception: + logger.exception("Failed to check if diff/merge is enabled") + return False + + def _execute(self, client, sql): + """Execute SQL via the underlying server client.""" + return client._server._execute(sql) + + def _cleanup_tables(self, client, *table_names): + """Drop tables, ignoring errors.""" + for name in table_names: + with contextlib.suppress(Exception): + self._execute(client, f"DROP TABLE IF EXISTS `{name}`") + + def test_diff_identical_tables(self, db_client): + """ + Test diff between two identical tables returns empty result. 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_base_{ts}" + copy_table = f"test_diff_copy_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute( + db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90), (3, 'Charlie', 70)" + ) + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{copy_table}`") + + result = self._execute(db_client, f"DIFF TABLE `{copy_table}` AGAINST `{base_table}`") + diff_rows = result if result else [] + print(f"\n✅ Diff between identical tables: {len(diff_rows)} rows") + assert len(diff_rows) == 0, f"Expected 0 diff rows for identical tables, got {len(diff_rows)}" + finally: + self._cleanup_tables(db_client, copy_table, base_table) + + def test_diff_with_added_rows(self, db_client): + """ + Test diff detects rows unique to the incoming table. 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_base_{ts}" + incoming_table = f"test_diff_added_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute(db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{incoming_table}`") + + # Add a new row to incoming + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + result = self._execute(db_client, f"DIFF TABLE `{incoming_table}` AGAINST `{base_table}`") + diff_rows = result if result else [] + print(f"\n✅ Diff with added rows: {len(diff_rows)} diff rows") + assert len(diff_rows) > 0, "Expected diff rows for table with added data" + finally: + self._cleanup_tables(db_client, incoming_table, base_table) + + def test_diff_with_modified_rows(self, db_client): + """ + Test diff detects conflicting rows (same key, different values). 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_base_{ts}" + incoming_table = f"test_diff_modified_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute( + db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90), (3, 'Charlie', 70)" + ) + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{incoming_table}`") + + # Modify a row in incoming + self._execute(db_client, f"UPDATE `{incoming_table}` SET score = 85 WHERE id = 1") + + result = self._execute(db_client, f"DIFF TABLE `{incoming_table}` AGAINST `{base_table}`") + diff_rows = result if result else [] + print(f"\n✅ Diff with modified rows: {len(diff_rows)} diff rows") + # Conflict produces 2 rows (one from each table) + assert len(diff_rows) == 2, f"Expected 2 diff rows for one modified row, got {len(diff_rows)}" + finally: + self._cleanup_tables(db_client, incoming_table, base_table) + + def test_diff_with_deleted_rows(self, db_client): + """ + Test diff detects rows unique to the current (baseline) table. 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_base_{ts}" + incoming_table = f"test_diff_deleted_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute( + db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90), (3, 'Charlie', 70)" + ) + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{incoming_table}`") + + # Delete a row from incoming + self._execute(db_client, f"DELETE FROM `{incoming_table}` WHERE id = 2") + + result = self._execute(db_client, f"DIFF TABLE `{incoming_table}` AGAINST `{base_table}`") + diff_rows = result if result else [] + print(f"\n✅ Diff with deleted rows: {len(diff_rows)} diff rows") + assert len(diff_rows) > 0, "Expected diff rows when incoming has deleted data" + finally: + self._cleanup_tables(db_client, incoming_table, base_table) + + def test_diff_is_readonly(self, db_client): + """ + Test that diff does not modify either table. 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_base_{ts}" + incoming_table = f"test_diff_ro_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute(db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{incoming_table}`") + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + count_base_before = self._execute(db_client, f"SELECT COUNT(*) AS cnt FROM `{base_table}`") + count_incoming_before = self._execute(db_client, f"SELECT COUNT(*) AS cnt FROM `{incoming_table}`") + + # Perform diff + self._execute(db_client, f"DIFF TABLE `{incoming_table}` AGAINST `{base_table}`") + + count_base_after = self._execute(db_client, f"SELECT COUNT(*) AS cnt FROM `{base_table}`") + count_incoming_after = self._execute(db_client, f"SELECT COUNT(*) AS cnt FROM `{incoming_table}`") + + assert count_base_before == count_base_after, "Base table should not be modified by diff" + assert count_incoming_before == count_incoming_after, "Incoming table should not be modified by diff" + print("\n✅ Diff is read-only verified") + finally: + self._cleanup_tables(db_client, incoming_table, base_table) + + def test_diff_empty_tables(self, db_client): + """ + Test diff between two empty tables returns empty result. 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_empty_base_{ts}" + copy_table = f"test_diff_empty_copy_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50))") + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{copy_table}`") + + result = self._execute(db_client, f"DIFF TABLE `{copy_table}` AGAINST `{base_table}`") + diff_rows = result if result else [] + print(f"\n✅ Diff between empty tables: {len(diff_rows)} rows") + assert len(diff_rows) == 0, f"Expected 0 diff rows for empty tables, got {len(diff_rows)}" + finally: + self._cleanup_tables(db_client, copy_table, base_table) + + def test_diff_via_collection_api(self, db_client): + """ + Test diff through the Collection.diff() public API. + Creates tables with the SDK naming convention (c$v1$ prefix). + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + from pyseekdb.client.collection import Collection + from pyseekdb.client.meta_info import CollectionNames + + ts = int(time.time() * 1000) + base_name = f"test_diff_api_base_{ts}" + incoming_name = f"test_diff_api_incoming_{ts}" + base_table = CollectionNames.table_name(base_name) + incoming_table = CollectionNames.table_name(incoming_name) + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute(db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{incoming_table}`") + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + # Create Collection objects pointing to these tables (collection_id=None -> v1 naming) + server = db_client._server + base_col = Collection(client=server, name=base_name, collection_id=None) + 
incoming_col = Collection(client=server, name=incoming_name, collection_id=None) + + # Call the public API + diff_rows = incoming_col.diff(base_col) + print(f"\n✅ Diff via Collection API: {len(diff_rows)} diff rows") + assert len(diff_rows) > 0, "Expected diff rows via Collection API" + finally: + self._cleanup_tables(db_client, incoming_table, base_table) diff --git a/tests/integration_tests/test_collection_fork.py b/tests/integration_tests/test_collection_fork.py index 98eabd3..45bf6e6 100644 --- a/tests/integration_tests/test_collection_fork.py +++ b/tests/integration_tests/test_collection_fork.py @@ -23,10 +23,10 @@ class TestCollectionFork: """Tests for collection.fork() method using real database connections.""" - def _is_fork_enabled(self, client) -> bool: + def _is_fork_table_enabled(self, client) -> bool: """Check if fork is enabled for the given client.""" try: - return client._server._fork_enabled() + return client._server._fork_table_enabled() except Exception: logger.exception("Failed to check if fork is enabled") return False @@ -39,7 +39,7 @@ def test_fork_success(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create test collection @@ -101,7 +101,7 @@ def test_fork_with_invalid_name(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create test collection @@ -130,7 +130,7 @@ def test_fork_with_empty_name(self, db_client): Skips if fork is not enabled for the database. 
""" # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create test collection @@ -159,7 +159,7 @@ def test_fork_preserves_original_collection(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create test collection @@ -214,7 +214,7 @@ def test_fork_independent_operations(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create test collection @@ -278,7 +278,7 @@ def test_fork_v1_collection_success(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create v1 test collection @@ -350,7 +350,7 @@ def test_fork_v1_collection_preserves_original(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create v1 test collection @@ -412,7 +412,7 @@ def test_fork_v1_collection_independent_operations(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create v1 test collection @@ -494,7 +494,7 @@ def test_fork_v1_and_v2_collections(self, db_client): Skips if fork is not enabled for the database. 
""" # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create both v1 and v2 collections diff --git a/tests/integration_tests/test_collection_merge.py b/tests/integration_tests/test_collection_merge.py new file mode 100644 index 0000000..695d2d9 --- /dev/null +++ b/tests/integration_tests/test_collection_merge.py @@ -0,0 +1,287 @@ +""" +Integration tests for MERGE TABLE functionality. + +Tests the merge functionality against real databases using plain (non-LOB) tables, +since MERGE TABLE does not support LOB column types (e.g., vectors). + +Tests include: +- Merge with no conflicts (new rows inserted) +- Merge with THEIRS strategy (incoming overwrites on conflict) +- Merge with OURS strategy (current preserved on conflict) +- Merge with FAIL strategy (error on conflict, rollback) +- Invalid strategy parameter handling +- Merge through Collection API with proper table naming +""" + +import contextlib +import logging +import time + +import pymysql.err +import pytest + +logger = logging.getLogger(__name__) + + +class TestCollectionMerge: + """Tests for MERGE TABLE using real database connections.""" + + def _is_diff_merge_enabled(self, client) -> bool: + """Check if diff/merge is enabled for the given client.""" + try: + return client._server._diff_merge_enabled() + except Exception: + logger.exception("Failed to check if diff/merge is enabled") + return False + + def _execute(self, client, sql): + """Execute SQL via the underlying server client.""" + return client._server._execute(sql) + + def _get_count(self, client, table_name): + """Get row count for a table.""" + result = self._execute(client, f"SELECT COUNT(*) AS cnt FROM `{table_name}`") + if isinstance(result[0], dict): + return result[0]["cnt"] + return result[0][0] + + def _get_row(self, client, table_name, row_id): + """Get a single row by id.""" + result = self._execute(client, 
f"SELECT * FROM `{table_name}` WHERE id = {row_id}") + return result[0] if result else None + + def _cleanup_tables(self, client, *table_names): + """Drop tables, ignoring errors.""" + for name in table_names: + with contextlib.suppress(Exception): + self._execute(client, f"DROP TABLE IF EXISTS `{name}`") + + def test_merge_no_conflict(self, db_client): + """ + Test merge with no conflicts - new rows are inserted into target. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + current_table = f"test_merge_current_{ts}" + incoming_table = f"test_merge_incoming_{ts}" + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute(db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + + # Add new row only to incoming (no conflict) + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + count_before = self._get_count(db_client, current_table) + self._execute(db_client, f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY FAIL") + + count_after = self._get_count(db_client, current_table) + assert count_after == count_before + 1, f"Expected {count_before + 1} rows, got {count_after}" + print(f"\n✅ Merge no conflict: {count_before} -> {count_after} rows") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) + + def test_merge_strategy_theirs(self, db_client): + """ + Test merge with THEIRS strategy - incoming values overwrite on conflict. 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + current_table = f"test_merge_current_{ts}" + incoming_table = f"test_merge_theirs_{ts}" + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute( + db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90), (3, 'Charlie', 70)" + ) + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + + # Modify a row in incoming (creates conflict) + self._execute(db_client, f"UPDATE `{incoming_table}` SET score = 85 WHERE id = 1") + # Add a new row in incoming (no conflict) + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (4, 'David', 95)") + + self._execute(db_client, f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY THEIRS") + + # Current should have 4 rows now + assert self._get_count(db_client, current_table) == 4 + + # Conflicting row should have incoming's value + row = self._get_row(db_client, current_table, 1) + if isinstance(row, dict): + assert row["score"] == 85, f"Expected score=85 after THEIRS merge, got {row['score']}" + else: + assert row[2] == 85, f"Expected score=85 after THEIRS merge, got {row[2]}" + print("\n✅ Merge THEIRS: conflict resolved with incoming value (score=85)") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) + + def test_merge_strategy_ours(self, db_client): + """ + Test merge with OURS strategy - current values preserved on conflict. 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + current_table = f"test_merge_current_{ts}" + incoming_table = f"test_merge_ours_{ts}" + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute( + db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90), (3, 'Charlie', 70)" + ) + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + + # Modify a row in incoming (creates conflict) + self._execute(db_client, f"UPDATE `{incoming_table}` SET score = 85 WHERE id = 1") + # Add a new row in incoming (no conflict) + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (4, 'David', 95)") + + self._execute(db_client, f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY OURS") + + # Current should have 4 rows (new row inserted) + assert self._get_count(db_client, current_table) == 4 + + # Conflicting row should keep current's value + row = self._get_row(db_client, current_table, 1) + if isinstance(row, dict): + assert row["score"] == 80, f"Expected score=80 after OURS merge, got {row['score']}" + else: + assert row[2] == 80, f"Expected score=80 after OURS merge, got {row[2]}" + print("\n✅ Merge OURS: conflict resolved by keeping current value (score=80)") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) + + def test_merge_strategy_fail_with_conflict(self, db_client): + """ + Test merge with FAIL strategy raises error when conflicts exist, and rolls back. 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + current_table = f"test_merge_current_{ts}" + incoming_table = f"test_merge_fail_{ts}" + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute(db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + + # Modify a row in incoming to create a conflict + self._execute(db_client, f"UPDATE `{incoming_table}` SET score = 85 WHERE id = 1") + + # FAIL strategy should raise error on conflict + with pytest.raises(pymysql.err.OperationalError): + self._execute(db_client, f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY FAIL") + + # Current should be unchanged (transaction rolled back) + assert self._get_count(db_client, current_table) == 2 + row = self._get_row(db_client, current_table, 1) + if isinstance(row, dict): + assert row["score"] == 80, "Current table should be unchanged after FAIL rollback" + else: + assert row[2] == 80, "Current table should be unchanged after FAIL rollback" + print("\n✅ Merge FAIL: correctly raised error on conflict and rolled back") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) + + def test_merge_invalid_strategy(self, db_client): + """ + Test merge with invalid strategy raises ValueError via Collection API. 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + from pyseekdb.client.collection import Collection + + server = db_client._server + dummy_incoming = Collection(client=server, name="dummy_incoming", collection_id=None) + dummy_target = Collection(client=server, name="dummy_target", collection_id=None) + + with pytest.raises(ValueError, match="Invalid merge strategy"): + dummy_incoming.merge_into(dummy_target, strategy="INVALID") + + print("\n✅ Invalid strategy correctly raises ValueError") + + def test_merge_default_strategy(self, db_client): + """ + Test merge with default strategy (FAIL) - no conflicts should succeed. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + current_table = f"test_merge_current_{ts}" + incoming_table = f"test_merge_default_{ts}" + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute(db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + + # Add new row only (no conflict) + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + # Default strategy (no STRATEGY clause = FAIL, but no conflicts so it works) + self._execute(db_client, f"MERGE TABLE `{incoming_table}` INTO `{current_table}`") + + assert self._get_count(db_client, current_table) == 3 + print("\n✅ Merge with default strategy: 3 rows") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) + + def test_merge_via_collection_api(self, db_client): + """ + Test merge through the Collection.merge_into() public API. 
+ """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + from pyseekdb.client.collection import Collection + from pyseekdb.client.meta_info import CollectionNames + + ts = int(time.time() * 1000) + current_name = f"test_merge_api_current_{ts}" + incoming_name = f"test_merge_api_incoming_{ts}" + current_table = CollectionNames.table_name(current_name) + incoming_table = CollectionNames.table_name(incoming_name) + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute(db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + server = db_client._server + current_col = Collection(client=server, name=current_name, collection_id=None) + incoming_col = Collection(client=server, name=incoming_name, collection_id=None) + + # Merge via Collection API + incoming_col.merge_into(current_col, strategy="FAIL") + + count = self._get_count(db_client, current_table) + assert count == 3, f"Expected 3 rows after merge via API, got {count}" + print(f"\n✅ Merge via Collection API: {count} rows") + finally: + self._cleanup_tables(db_client, incoming_table, current_table)