From b5109a615a5450d5f4a61064d07ec35f918db753 Mon Sep 17 00:00:00 2001 From: Poseidon <18646154381@163.com> Date: Wed, 18 Mar 2026 16:31:52 +0800 Subject: [PATCH 1/6] feat: fork database --- src/pyseekdb/client/admin_client.py | 27 ++ src/pyseekdb/client/client_base.py | 34 ++- tests/integration_tests/test_database_fork.py | 232 ++++++++++++++++++ 3 files changed, 291 insertions(+), 2 deletions(-) create mode 100644 tests/integration_tests/test_database_fork.py diff --git a/src/pyseekdb/client/admin_client.py b/src/pyseekdb/client/admin_client.py index 40964ce..9c18ff9 100644 --- a/src/pyseekdb/client/admin_client.py +++ b/src/pyseekdb/client/admin_client.py @@ -88,6 +88,29 @@ def list_databases( """ pass + @abstractmethod + def fork_database(self, source_name: str, destination_name: str, tenant: str = DEFAULT_TENANT) -> Database: + """ + Fork (duplicate) a database to create a new independent copy. + + The destination database is logically equivalent to the source database at the + fork snapshot moment, containing all user tables and their data. It can be used + as an independent database for subsequent read/write operations. + + Args: + source_name: source database name + destination_name: destination database name (must not already exist) + tenant: tenant name (for OceanBase) + + Returns: + Database object for the newly created destination database + + Raises: + ValueError: If fork is not supported (requires seekdb >= 1.2.0), + or if the destination database already exists. + """ + pass + class _AdminClientProxy(AdminAPI): """ @@ -129,6 +152,10 @@ def list_databases( """Proxy to server implementation""" return self._server.list_databases(limit=limit, offset=offset, tenant=tenant) + def fork_database(self, source_name: str, destination_name: str, tenant: str = DEFAULT_TENANT) -> Database: + """Proxy to server implementation""" + return self._server.fork_database(source_name=source_name, destination_name=destination_name, tenant=tenant) + def __repr__(self): return f"" diff --git a/src/pyseekdb/client/client_base.py b/src/pyseekdb/client/client_base.py index 05a5847..4a6831d 100644 --- a/src/pyseekdb/client/client_base.py +++ b/src/pyseekdb/client/client_base.py @@ -577,6 +577,30 @@ def list_databases( logger.debug(f"✅ Found {len(databases)} databases{self._database_context(effective_tenant)}") return databases + def fork_database(self, source_name: str, destination_name: str, tenant: str = DEFAULT_TENANT) -> Database: + """ + Fork (duplicate) a database to create a new independent copy. + + Args: + source_name: source database name + destination_name: destination database name (must not already exist) + tenant: tenant name (for OceanBase) + + Returns: + Database object for the newly created destination database + """ + if not self._fork_database_enabled(): + raise ValueError("Fork database is not enabled (requires seekdb >= 1.2.0)") + + effective_tenant = self._database_tenant(tenant) + logger.debug( + f"Forking database: {source_name} -> {destination_name}{self._database_context(effective_tenant)}" + ) + sql = f"FORK DATABASE `{source_name}` TO `{destination_name}`" + self._execute(sql) + logger.debug(f"✅ Successfully forked database '{source_name}' to '{destination_name}'") + return self.get_database(destination_name, tenant=tenant) + # ==================== Collection Management (User-facing) ==================== def _prepare_schema_parameters( # noqa: C901 @@ -1460,12 +1484,18 @@ def _get_collection_table_name(self, collection_id: str | None, collection_name: return CollectionNames.table_name_v2(collection_id) return CollectionNames.table_name(collection_name) - def _fork_enabled(self) -> bool: + def _fork_table_enabled(self) -> bool: db_type, version = self.detect_db_type_and_version() version_110 = Version("1.1.0.0") logger.debug(f"db_type: {db_type}, version: {version}") return db_type.lower() == "seekdb" and version >= version_110 + def _fork_database_enabled(self) -> bool: + db_type, version = self.detect_db_type_and_version() + version_120 = Version("1.2.0.0") + logger.debug(f"db_type: {db_type}, version: {version}") + return db_type.lower() == "seekdb" and version >= version_120 + def _get_collection_id(self, collection_name: str) -> str: collection_id_query_sql = f"SELECT COLLECTION_ID FROM `{CollectionNames.sdk_collections_table_name()}` WHERE COLLECTION_NAME = '{collection_name}'" collection_id_query_result = self._execute(collection_id_query_sql) @@ -1487,7 +1517,7 @@ def _collection_fork(self, collection: Collection, forked_name: str) -> None: collection: Collection to fork forked_name: Forked collection name """ - if not self._fork_enabled(): + if not self._fork_table_enabled(): raise ValueError("Fork is not enabled for this database") _validate_collection_name(forked_name) diff --git a/tests/integration_tests/test_database_fork.py b/tests/integration_tests/test_database_fork.py new file mode 100644 index 0000000..efabdc3 --- /dev/null +++ b/tests/integration_tests/test_database_fork.py @@ -0,0 +1,232 @@ +""" +Integration tests for AdminClient.fork_database method. + +Tests the fork_database functionality against real databases, including: +- Successful fork operations +- Empty database fork +- Fork preserves database attributes (charset, collation) +- Fork creates independent databases +- Error handling when destination already exists +- Error handling when fork is not enabled +""" + +import contextlib +import logging +import time + +import pytest + +import pyseekdb + +logger = logging.getLogger(__name__) + + +class TestDatabaseFork: + """Tests for AdminClient.fork_database() using real database connections.""" + + def _is_fork_database_enabled(self, admin_client) -> bool: + """Check if fork_database is enabled for the given admin client.""" + try: + return admin_client._server._fork_database_enabled() + except Exception: + logger.exception("Failed to check if fork_database is enabled") + return False + + def _unique_name(self, prefix: str) -> str: + return f"{prefix}_{int(time.time() * 1000)}" + + def test_fork_database_success(self, admin_client): + """ + Test successful fork_database operation. + + Automatically runs for: embedded, server, oceanbase + Skips if fork_database is not enabled. + """ + if not self._is_fork_database_enabled(admin_client): + pytest.skip("Fork database is not enabled for this database") + + source_name = self._unique_name("test_forkdb_src") + dest_name = self._unique_name("test_forkdb_dst") + + try: + admin_client.create_database(source_name) + + forked_db = admin_client.fork_database(source_name, dest_name) + + assert forked_db is not None + assert forked_db.name == dest_name + + retrieved_db = admin_client.get_database(dest_name) + assert retrieved_db.name == dest_name + + finally: + with contextlib.suppress(Exception): + admin_client.delete_database(dest_name) + with contextlib.suppress(Exception): + admin_client.delete_database(source_name) + + def test_fork_empty_database(self, admin_client): + """ + Test fork of an empty database (no tables). + + Automatically runs for: embedded, server, oceanbase + Skips if fork_database is not enabled. + """ + if not self._is_fork_database_enabled(admin_client): + pytest.skip("Fork database is not enabled for this database") + + source_name = self._unique_name("test_forkdb_empty_src") + dest_name = self._unique_name("test_forkdb_empty_dst") + + try: + admin_client.create_database(source_name) + + forked_db = admin_client.fork_database(source_name, dest_name) + assert forked_db is not None + assert forked_db.name == dest_name + + finally: + with contextlib.suppress(Exception): + admin_client.delete_database(dest_name) + with contextlib.suppress(Exception): + admin_client.delete_database(source_name) + + def test_fork_database_preserves_attributes(self, admin_client): + """ + Test that fork preserves database attributes (charset, collation). + + Automatically runs for: embedded, server, oceanbase + Skips if fork_database is not enabled. + """ + if not self._is_fork_database_enabled(admin_client): + pytest.skip("Fork database is not enabled for this database") + + source_name = self._unique_name("test_forkdb_attr_src") + dest_name = self._unique_name("test_forkdb_attr_dst") + + try: + admin_client.create_database(source_name) + source_db = admin_client.get_database(source_name) + + forked_db = admin_client.fork_database(source_name, dest_name) + + assert forked_db.charset == source_db.charset + assert forked_db.collation == source_db.collation + + finally: + with contextlib.suppress(Exception): + admin_client.delete_database(dest_name) + with contextlib.suppress(Exception): + admin_client.delete_database(source_name) + + def test_fork_database_destination_already_exists(self, admin_client): + """ + Test that forking to an existing database raises an error. + + Automatically runs for: embedded, server, oceanbase + Skips if fork_database is not enabled. + """ + if not self._is_fork_database_enabled(admin_client): + pytest.skip("Fork database is not enabled for this database") + + source_name = self._unique_name("test_forkdb_dup_src") + dest_name = self._unique_name("test_forkdb_dup_dst") + + try: + admin_client.create_database(source_name) + admin_client.create_database(dest_name) + + with pytest.raises(Exception): + admin_client.fork_database(source_name, dest_name) + + finally: + with contextlib.suppress(Exception): + admin_client.delete_database(dest_name) + with contextlib.suppress(Exception): + admin_client.delete_database(source_name) + + def test_fork_database_independent_operations(self, admin_client): + """ + Test that forked database is independent from the source. + + Creates a table in source before fork, then verifies that modifications + to the forked database do not affect the source and vice versa. + + Automatically runs for: embedded, server, oceanbase + Skips if fork_database is not enabled. + """ + if not self._is_fork_database_enabled(admin_client): + pytest.skip("Fork database is not enabled for this database") + + source_name = self._unique_name("test_forkdb_indep_src") + dest_name = self._unique_name("test_forkdb_indep_dst") + + try: + admin_client.create_database(source_name) + + admin_client._server._execute( + f"CREATE TABLE `{source_name}`.`t1` (id INT PRIMARY KEY, val VARCHAR(100))" + ) + admin_client._server._execute( + f"INSERT INTO `{source_name}`.`t1` VALUES (1, 'original')" + ) + + admin_client.fork_database(source_name, dest_name) + + admin_client._server._execute( + f"INSERT INTO `{dest_name}`.`t1` VALUES (2, 'forked_only')" + ) + + source_rows = admin_client._server._execute( + f"SELECT COUNT(*) as cnt FROM `{source_name}`.`t1`" + ) + dest_rows = admin_client._server._execute( + f"SELECT COUNT(*) as cnt FROM `{dest_name}`.`t1`" + ) + + source_count = source_rows[0]["cnt"] if isinstance(source_rows[0], dict) else source_rows[0][0] + dest_count = dest_rows[0]["cnt"] if isinstance(dest_rows[0], dict) else dest_rows[0][0] + + assert source_count == 1, f"Source should have 1 row, got {source_count}" + assert dest_count == 2, f"Destination should have 2 rows, got {dest_count}" + + finally: + with contextlib.suppress(Exception): + admin_client.delete_database(dest_name) + with contextlib.suppress(Exception): + admin_client.delete_database(source_name) + + def test_fork_database_multiple_times(self, admin_client): + """ + Test that the same source database can be forked multiple times. + + Automatically runs for: embedded, server, oceanbase + Skips if fork_database is not enabled. + """ + if not self._is_fork_database_enabled(admin_client): + pytest.skip("Fork database is not enabled for this database") + + source_name = self._unique_name("test_forkdb_multi_src") + dest_name_1 = self._unique_name("test_forkdb_multi_d1") + dest_name_2 = self._unique_name("test_forkdb_multi_d2") + + try: + admin_client.create_database(source_name) + + forked_1 = admin_client.fork_database(source_name, dest_name_1) + forked_2 = admin_client.fork_database(source_name, dest_name_2) + + assert forked_1.name == dest_name_1 + assert forked_2.name == dest_name_2 + + finally: + with contextlib.suppress(Exception): + admin_client.delete_database(dest_name_2) + with contextlib.suppress(Exception): + admin_client.delete_database(dest_name_1) + with contextlib.suppress(Exception): + admin_client.delete_database(source_name) + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"]) From c731e77358500092fbac66fda1b74c36b4a2e775 Mon Sep 17 00:00:00 2001 From: Poseidon <18646154381@163.com> Date: Wed, 18 Mar 2026 16:49:39 +0800 Subject: [PATCH 2/6] refactor: fix lint err --- .pre-commit-config.yaml | 3 +++ pyproject.toml | 2 +- src/pyseekdb/client/client_base.py | 4 +--- ...llection_hybrid_search_source_inference.py | 2 +- tests/integration_tests/test_database_fork.py | 24 +++++-------------- 5 files changed, 12 insertions(+), 23 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index bcc2d25..a2c71b2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,3 +1,6 @@ +default_language_version: + python: python3.11 + repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: "v6.0.0" diff --git a/pyproject.toml b/pyproject.toml index 8a11c9d..3bf28af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,7 +107,7 @@ ignore = [ ] [tool.ruff.lint.per-file-ignores] -"tests/*" = ["S101"] +"tests/*" = ["S101", "S608"] "src/pyseekdb/client/client_base.py" = ["S608"] [tool.ruff.format] diff --git a/src/pyseekdb/client/client_base.py b/src/pyseekdb/client/client_base.py index 4a6831d..88411e0 100644 --- a/src/pyseekdb/client/client_base.py +++ b/src/pyseekdb/client/client_base.py @@ -593,9 +593,7 @@ def fork_database(self, source_name: str, destination_name: str, tenant: str = D raise ValueError("Fork database is not enabled (requires seekdb >= 1.2.0)") effective_tenant = self._database_tenant(tenant) - logger.debug( - f"Forking database: {source_name} -> {destination_name}{self._database_context(effective_tenant)}" - ) + logger.debug(f"Forking database: {source_name} -> {destination_name}{self._database_context(effective_tenant)}") sql = f"FORK DATABASE `{source_name}` TO `{destination_name}`" self._execute(sql) logger.debug(f"✅ Successfully forked database '{source_name}' to '{destination_name}'") diff --git a/tests/integration_tests/test_collection_hybrid_search_source_inference.py b/tests/integration_tests/test_collection_hybrid_search_source_inference.py index 8ee21af..053f364 100644 --- a/tests/integration_tests/test_collection_hybrid_search_source_inference.py +++ b/tests/integration_tests/test_collection_hybrid_search_source_inference.py @@ -70,7 +70,7 @@ def _insert_test_data(self, collection, dimension: int): def _get_sql_query(self, client, table_name: str, search_parm: dict) -> str: search_parm_json = json.dumps(search_parm, ensure_ascii=False) client._server._execute(f"SET @search_parm = '{escape_string(search_parm_json)}'") - get_sql_query = f"SELECT DBMS_HYBRID_SEARCH.GET_SQL('{table_name}', @search_parm) as query_sql FROM dual" # noqa: S608 + get_sql_query = f"SELECT DBMS_HYBRID_SEARCH.GET_SQL('{table_name}', @search_parm) as query_sql FROM dual" rows = client._server._execute(get_sql_query) assert rows and rows[0].get("query_sql") query_sql = rows[0]["query_sql"] diff --git a/tests/integration_tests/test_database_fork.py b/tests/integration_tests/test_database_fork.py index efabdc3..2453383 100644 --- a/tests/integration_tests/test_database_fork.py +++ b/tests/integration_tests/test_database_fork.py @@ -16,8 +16,6 @@ import pytest -import pyseekdb - logger = logging.getLogger(__name__) @@ -136,7 +134,7 @@ def test_fork_database_destination_already_exists(self, admin_client): admin_client.create_database(source_name) admin_client.create_database(dest_name) - with pytest.raises(Exception): + with pytest.raises(ValueError): admin_client.fork_database(source_name, dest_name) finally: @@ -164,25 +162,15 @@ def test_fork_database_independent_operations(self, admin_client): try: admin_client.create_database(source_name) - admin_client._server._execute( - f"CREATE TABLE `{source_name}`.`t1` (id INT PRIMARY KEY, val VARCHAR(100))" - ) - admin_client._server._execute( - f"INSERT INTO `{source_name}`.`t1` VALUES (1, 'original')" - ) + admin_client._server._execute(f"CREATE TABLE `{source_name}`.`t1` (id INT PRIMARY KEY, val VARCHAR(100))") + admin_client._server._execute(f"INSERT INTO `{source_name}`.`t1` VALUES (1, 'original')") admin_client.fork_database(source_name, dest_name) - admin_client._server._execute( - f"INSERT INTO `{dest_name}`.`t1` VALUES (2, 'forked_only')" - ) + admin_client._server._execute(f"INSERT INTO `{dest_name}`.`t1` VALUES (2, 'forked_only')") - source_rows = admin_client._server._execute( - f"SELECT COUNT(*) as cnt FROM `{source_name}`.`t1`" - ) - dest_rows = admin_client._server._execute( - f"SELECT COUNT(*) as cnt FROM `{dest_name}`.`t1`" - ) + source_rows = admin_client._server._execute(f"SELECT COUNT(*) as cnt FROM `{source_name}`.`t1`") + dest_rows = admin_client._server._execute(f"SELECT COUNT(*) as cnt FROM `{dest_name}`.`t1`") source_count = source_rows[0]["cnt"] if isinstance(source_rows[0], dict) else source_rows[0][0] dest_count = dest_rows[0]["cnt"] if isinstance(dest_rows[0], dict) else dest_rows[0][0] From 60b7b52bff114461ec5cbee96328d9fa9fd9e7d9 Mon Sep 17 00:00:00 2001 From: Poseidon <18646154381@163.com> Date: Thu, 19 Mar 2026 11:23:38 +0800 Subject: [PATCH 3/6] test: rename _fork_enabled to _fork_table_enabled --- .../integration_tests/test_collection_fork.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/integration_tests/test_collection_fork.py b/tests/integration_tests/test_collection_fork.py index 98eabd3..45bf6e6 100644 --- a/tests/integration_tests/test_collection_fork.py +++ b/tests/integration_tests/test_collection_fork.py @@ -23,10 +23,10 @@ class TestCollectionFork: """Tests for collection.fork() method using real database connections.""" - def _is_fork_enabled(self, client) -> bool: + def _is_fork_table_enabled(self, client) -> bool: """Check if fork is enabled for the given client.""" try: - return client._server._fork_enabled() + return client._server._fork_table_enabled() except Exception: logger.exception("Failed to check if fork is enabled") return False @@ -39,7 +39,7 @@ def test_fork_success(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create test collection @@ -101,7 +101,7 @@ def test_fork_with_invalid_name(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create test collection @@ -130,7 +130,7 @@ def test_fork_with_empty_name(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create test collection @@ -159,7 +159,7 @@ def test_fork_preserves_original_collection(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create test collection @@ -214,7 +214,7 @@ def test_fork_independent_operations(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create test collection @@ -278,7 +278,7 @@ def test_fork_v1_collection_success(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create v1 test collection @@ -350,7 +350,7 @@ def test_fork_v1_collection_preserves_original(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create v1 test collection @@ -412,7 +412,7 @@ def test_fork_v1_collection_independent_operations(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create v1 test collection @@ -494,7 +494,7 @@ def test_fork_v1_and_v2_collections(self, db_client): Skips if fork is not enabled for the database. """ # Check if fork is enabled - if not self._is_fork_enabled(db_client): + if not self._is_fork_table_enabled(db_client): pytest.skip("Fork is not enabled for this database") # Create both v1 and v2 collections From c521d0de1a896a61758dc24d20dec46566808a3e Mon Sep 17 00:00:00 2001 From: Poseidon <18646154381@163.com> Date: Fri, 20 Mar 2026 11:47:39 +0800 Subject: [PATCH 4/6] feat: support diff & merge table for seekdb --- src/pyseekdb/client/client_base.py | 62 +++- src/pyseekdb/client/collection.py | 86 ++++++ .../integration_tests/test_collection_diff.py | 240 +++++++++++++++ .../test_collection_merge.py | 287 ++++++++++++++++++ 4 files changed, 674 insertions(+), 1 deletion(-) create mode 100644 tests/integration_tests/test_collection_diff.py create mode 100644 tests/integration_tests/test_collection_merge.py diff --git a/src/pyseekdb/client/client_base.py b/src/pyseekdb/client/client_base.py index 88411e0..8fbe255 100644 --- a/src/pyseekdb/client/client_base.py +++ b/src/pyseekdb/client/client_base.py @@ -11,7 +11,7 @@ from abc import ABC, abstractmethod from collections.abc import Sequence from dataclasses import dataclass -from typing import Any +from typing import Any, ClassVar from pymysql.converters import escape_string @@ -1494,6 +1494,12 @@ def _fork_database_enabled(self) -> bool: logger.debug(f"db_type: {db_type}, version: {version}") return db_type.lower() == "seekdb" and version >= version_120 + def _diff_merge_enabled(self) -> bool: + db_type, version = self.detect_db_type_and_version() + version_120 = Version("1.2.0.0") + logger.debug(f"db_type: {db_type}, version: {version}") + return db_type.lower() == "seekdb" and version >= version_120 + def _get_collection_id(self, collection_name: str) -> str: collection_id_query_sql = f"SELECT COLLECTION_ID FROM `{CollectionNames.sdk_collections_table_name()}` WHERE COLLECTION_NAME = '{collection_name}'" collection_id_query_result = self._execute(collection_id_query_sql) @@ -1555,6 +1561,60 @@ def _collection_fork(self, collection: Collection, forked_name: str) -> None: raise ValueError(f"Failed to fork collection: {ex}") from ex logger.debug(f"✅ Successfully forked collection '{collection.name}' to '{forked_name}'") + _VALID_MERGE_STRATEGIES: ClassVar[set[str]] = {"FAIL", "THEIRS", "OURS"} + + def _collection_diff(self, incoming: Collection, current: Collection) -> list: + """ + Diff two collections, returning rows that differ between them. + + Args: + incoming: The incoming (comparison) collection + current: The current (baseline) collection + + Returns: + List of diff result rows. Empty list if no differences. + """ + if not self._diff_merge_enabled(): + raise ValueError("Diff is not enabled for this database (requires seekdb >= 1.2.0)") + + incoming_table = self._get_collection_table_name(incoming.id, incoming.name) + current_table = self._get_collection_table_name(current.id, current.name) + + diff_sql = f"DIFF TABLE `{incoming_table}` AGAINST `{current_table}`" + result = self._execute(diff_sql) + logger.debug( + f"✅ Successfully diffed collection '{incoming.name}' against '{current.name}', " + f"found {len(result) if result else 0} diff rows" + ) + return result if result else [] + + def _collection_merge(self, incoming: Collection, current: Collection, strategy: str = "FAIL") -> None: + """ + Merge incoming collection into current collection. + + Args: + incoming: The incoming (source) collection + current: The current (target) collection + strategy: Conflict resolution strategy - FAIL, THEIRS, or OURS (default: FAIL) + """ + if not self._diff_merge_enabled(): + raise ValueError("Merge is not enabled for this database (requires seekdb >= 1.2.0)") + + strategy_upper = strategy.upper() + if strategy_upper not in self._VALID_MERGE_STRATEGIES: + raise ValueError( + f"Invalid merge strategy: '{strategy}'. Must be one of: {', '.join(sorted(self._VALID_MERGE_STRATEGIES))}" + ) + + incoming_table = self._get_collection_table_name(incoming.id, incoming.name) + current_table = self._get_collection_table_name(current.id, current.name) + + merge_sql = f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY {strategy_upper}" + self._execute(merge_sql) + logger.debug( + f"✅ Successfully merged collection '{incoming.name}' into '{current.name}' with strategy {strategy_upper}" + ) + # ==================== Collection Internal Operations (Called by Collection) ==================== # These methods are called by Collection objects, different clients implement different logic diff --git a/src/pyseekdb/client/collection.py b/src/pyseekdb/client/collection.py index 5498283..13b5578 100644 --- a/src/pyseekdb/client/collection.py +++ b/src/pyseekdb/client/collection.py @@ -155,6 +155,92 @@ def fork(self, forked_name: str) -> "Collection": collection = self._client.get_collection(forked_name, embedding_function=self._embedding_function) return collection + def diff(self, other: "Collection") -> list[dict[str, Any]]: + """ + Diff this collection against another collection (baseline). + + Compares rows by primary key. Returns rows that differ between the two collections: + conflicting rows (same key, different values), and rows unique to either collection. + Rows that are identical in both collections are not included. + + This is a read-only operation - neither collection is modified. + + Args: + other: The baseline collection to compare against. Must have the same + column definitions (column names, types, and order) and a primary key. + + Returns: + list[dict[str, Any]]: List of diff result rows. Each row is a dict containing + the table columns plus system columns indicating the source table and diff type. + Returns an empty list if the collections are identical. + + Raises: + ValueError: If diff is not enabled (requires seekdb >= 1.2.0), + or if the two collections have incompatible schemas. + + Note: + - Diff is only available for seekdb database version 1.2.0.0 or higher. + - In the SQL semantics, ``self`` is the "incoming" table and ``other`` is + the "current" (baseline) table. + + Examples: + .. code-block:: python + original = client.get_collection("baseline") + modified = original.fork("modified_copy") + + # Make changes to modified copy + modified.add(ids="new_id", embeddings=[1.0, 2.0, 3.0], documents="New doc") + + # See what changed + diff_rows = modified.diff(original) + for row in diff_rows: + print(row) + + """ + return self._client._collection_diff(incoming=self, current=other) + + def merge_into(self, target: "Collection", strategy: str = "FAIL") -> None: + """ + Merge this collection's changes into the target collection. + + Non-conflicting rows (rows unique to this collection) are inserted into the target. + Rows unique to the target are preserved. Conflicting rows (same primary key, + different values) are handled according to the specified strategy. + Merge does not delete any rows from the target. + + Args: + target: The target collection to merge into. Must have the same + column definitions (column names, types, and order) and a primary key. + strategy: Conflict resolution strategy. One of: + - ``"FAIL"`` (default): Raise an error and roll back if any conflicts exist. + - ``"THEIRS"``: Use this collection's values to overwrite the target on conflict. + - ``"OURS"``: Keep the target's values unchanged on conflict. + + Raises: + ValueError: If merge is not enabled (requires seekdb >= 1.2.0), + if the strategy is invalid, or if the two collections have + incompatible schemas. + + Note: + - Merge is only available for seekdb database version 1.2.0.0 or higher. + - The merge executes in a single transaction; on failure it rolls back entirely. + - In the SQL semantics, ``self`` is the "incoming" table and ``target`` is + the "current" table. + + Examples: + .. code-block:: python + original = client.get_collection("main_data") + branch = original.fork("experiment") + + # Make changes in the branch + branch.add(ids="new_id", embeddings=[1.0, 2.0, 3.0], documents="New doc") + + # Merge branch changes back, using branch values on conflict + branch.merge_into(original, strategy="THEIRS") + + """ + self._client._collection_merge(incoming=self, current=target, strategy=strategy) + # ==================== DML Operations ==================== # All methods delegate to client's internal implementation diff --git a/tests/integration_tests/test_collection_diff.py b/tests/integration_tests/test_collection_diff.py new file mode 100644 index 0000000..bfd9b29 --- /dev/null +++ b/tests/integration_tests/test_collection_diff.py @@ -0,0 +1,240 @@ +""" +Integration tests for DIFF TABLE functionality. + +Tests the diff functionality against real databases using plain (non-LOB) tables, +since DIFF TABLE does not support LOB column types (e.g., vectors). + +Tests include: +- Diff between identical tables (no differences) +- Diff detecting conflicting rows (same key, different values) +- Diff detecting rows unique to either table +- Diff with empty tables +- Diff through Collection API with proper table naming +""" + +import contextlib +import logging +import time + +import pytest + +logger = logging.getLogger(__name__) + + +class TestCollectionDiff: + """Tests for DIFF TABLE using real database connections.""" + + def _is_diff_merge_enabled(self, client) -> bool: + """Check if diff/merge is enabled for the given client.""" + try: + return client._server._diff_merge_enabled() + except Exception: + logger.exception("Failed to check if diff/merge is enabled") + return False + + def _execute(self, client, sql): + """Execute SQL via the underlying server client.""" + return client._server._execute(sql) + + def _cleanup_tables(self, client, *table_names): + """Drop tables, ignoring errors.""" + for name in table_names: + with contextlib.suppress(Exception): + self._execute(client, f"DROP TABLE IF EXISTS `{name}`") + + def test_diff_identical_tables(self, db_client): + """ + Test diff between two identical tables returns empty result. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_base_{ts}" + copy_table = f"test_diff_copy_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute( + db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90), (3, 'Charlie', 70)" + ) + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{copy_table}`") + + result = self._execute(db_client, f"DIFF TABLE `{copy_table}` AGAINST `{base_table}`") + diff_rows = result if result else [] + print(f"\n✅ Diff between identical tables: {len(diff_rows)} rows") + assert len(diff_rows) == 0, f"Expected 0 diff rows for identical tables, got {len(diff_rows)}" + finally: + self._cleanup_tables(db_client, copy_table, base_table) + + def test_diff_with_added_rows(self, db_client): + """ + Test diff detects rows unique to the incoming table. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_base_{ts}" + incoming_table = f"test_diff_added_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute(db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{incoming_table}`") + + # Add a new row to incoming + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + result = self._execute(db_client, f"DIFF TABLE `{incoming_table}` AGAINST `{base_table}`") + diff_rows = result if result else [] + print(f"\n✅ Diff with added rows: {len(diff_rows)} diff rows") + assert len(diff_rows) > 0, "Expected diff rows for table with added data" + finally: + self._cleanup_tables(db_client, incoming_table, base_table) + + def test_diff_with_modified_rows(self, db_client): + """ + Test diff detects conflicting rows (same key, different values). + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_base_{ts}" + incoming_table = f"test_diff_modified_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute( + db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90), (3, 'Charlie', 70)" + ) + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{incoming_table}`") + + # Modify a row in incoming + self._execute(db_client, f"UPDATE `{incoming_table}` SET score = 85 WHERE id = 1") + + result = self._execute(db_client, f"DIFF TABLE `{incoming_table}` AGAINST `{base_table}`") + diff_rows = result if result else [] + print(f"\n✅ Diff with modified rows: {len(diff_rows)} diff rows") + # Conflict produces 2 rows (one from each table) + assert len(diff_rows) == 2, f"Expected 2 diff rows for one modified row, got {len(diff_rows)}" + finally: + self._cleanup_tables(db_client, incoming_table, base_table) + + def test_diff_with_deleted_rows(self, db_client): + """ + Test diff detects rows unique to the current (baseline) table. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_base_{ts}" + incoming_table = f"test_diff_deleted_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute( + db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90), (3, 'Charlie', 70)" + ) + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{incoming_table}`") + + # Delete a row from incoming + self._execute(db_client, f"DELETE FROM `{incoming_table}` WHERE id = 2") + + result = self._execute(db_client, f"DIFF TABLE `{incoming_table}` AGAINST `{base_table}`") + diff_rows = result if result else [] + print(f"\n✅ Diff with deleted rows: {len(diff_rows)} diff rows") + assert len(diff_rows) > 0, "Expected diff rows when incoming has deleted data" + finally: + self._cleanup_tables(db_client, incoming_table, base_table) + + def test_diff_is_readonly(self, db_client): + """ + Test that diff does not modify either table. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_base_{ts}" + incoming_table = f"test_diff_ro_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute(db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{incoming_table}`") + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + count_base_before = self._execute(db_client, f"SELECT COUNT(*) AS cnt FROM `{base_table}`") + count_incoming_before = self._execute(db_client, f"SELECT COUNT(*) AS cnt FROM `{incoming_table}`") + + # Perform diff + self._execute(db_client, f"DIFF TABLE `{incoming_table}` AGAINST `{base_table}`") + + count_base_after = self._execute(db_client, f"SELECT COUNT(*) AS cnt FROM `{base_table}`") + count_incoming_after = self._execute(db_client, f"SELECT COUNT(*) AS cnt FROM `{incoming_table}`") + + assert count_base_before == count_base_after, "Base table should not be modified by diff" + assert count_incoming_before == count_incoming_after, "Incoming table should not be modified by diff" + print("\n✅ Diff is read-only verified") + finally: + self._cleanup_tables(db_client, incoming_table, base_table) + + def test_diff_empty_tables(self, db_client): + """ + Test diff between two empty tables returns empty result. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + base_table = f"test_diff_empty_base_{ts}" + copy_table = f"test_diff_empty_copy_{ts}" + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50))") + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{copy_table}`") + + result = self._execute(db_client, f"DIFF TABLE `{copy_table}` AGAINST `{base_table}`") + diff_rows = result if result else [] + print(f"\n✅ Diff between empty tables: {len(diff_rows)} rows") + assert len(diff_rows) == 0, f"Expected 0 diff rows for empty tables, got {len(diff_rows)}" + finally: + self._cleanup_tables(db_client, copy_table, base_table) + + def test_diff_via_collection_api(self, db_client): + """ + Test diff through the Collection.diff() public API. + Creates tables with the SDK naming convention (c$v1$ prefix). + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + from pyseekdb.client.collection import Collection + from pyseekdb.client.meta_info import CollectionNames + + ts = int(time.time() * 1000) + base_name = f"test_diff_api_base_{ts}" + incoming_name = f"test_diff_api_incoming_{ts}" + base_table = CollectionNames.table_name(base_name) + incoming_table = CollectionNames.table_name(incoming_name) + + try: + self._execute(db_client, f"CREATE TABLE `{base_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)") + self._execute(db_client, f"INSERT INTO `{base_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{base_table}` TO `{incoming_table}`") + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + # Create Collection objects pointing to these tables (collection_id=None -> v1 naming) + server = db_client._server + base_col = Collection(client=server, name=base_name, collection_id=None) + incoming_col = Collection(client=server, name=incoming_name, collection_id=None) + + # Call the public API + diff_rows = incoming_col.diff(base_col) + print(f"\n✅ Diff via Collection API: {len(diff_rows)} diff rows") + assert len(diff_rows) > 0, "Expected diff rows via Collection API" + finally: + self._cleanup_tables(db_client, incoming_table, base_table) diff --git a/tests/integration_tests/test_collection_merge.py b/tests/integration_tests/test_collection_merge.py new file mode 100644 index 0000000..695d2d9 --- /dev/null +++ b/tests/integration_tests/test_collection_merge.py @@ -0,0 +1,287 @@ +""" +Integration tests for MERGE TABLE functionality. + +Tests the merge functionality against real databases using plain (non-LOB) tables, +since MERGE TABLE does not support LOB column types (e.g., vectors). + +Tests include: +- Merge with no conflicts (new rows inserted) +- Merge with THEIRS strategy (incoming overwrites on conflict) +- Merge with OURS strategy (current preserved on conflict) +- Merge with FAIL strategy (error on conflict, rollback) +- Invalid strategy parameter handling +- Merge through Collection API with proper table naming +""" + +import contextlib +import logging +import time + +import pymysql.err +import pytest + +logger = logging.getLogger(__name__) + + +class TestCollectionMerge: + """Tests for MERGE TABLE using real database connections.""" + + def _is_diff_merge_enabled(self, client) -> bool: + """Check if diff/merge is enabled for the given client.""" + try: + return client._server._diff_merge_enabled() + except Exception: + logger.exception("Failed to check if diff/merge is enabled") + return False + + def _execute(self, client, sql): + """Execute SQL via the underlying server client.""" + return client._server._execute(sql) + + def _get_count(self, client, table_name): + """Get row count for a table.""" + result = self._execute(client, f"SELECT COUNT(*) AS cnt FROM `{table_name}`") + if isinstance(result[0], dict): + return result[0]["cnt"] + return result[0][0] + + def _get_row(self, client, table_name, row_id): + """Get a single row by id.""" + result = self._execute(client, f"SELECT * FROM `{table_name}` WHERE id = {row_id}") + return result[0] if result else None + + def _cleanup_tables(self, client, *table_names): + """Drop tables, ignoring errors.""" + for name in table_names: + with contextlib.suppress(Exception): + self._execute(client, f"DROP TABLE IF EXISTS `{name}`") + + def test_merge_no_conflict(self, db_client): + """ + Test merge with no conflicts - new rows are inserted into target. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + current_table = f"test_merge_current_{ts}" + incoming_table = f"test_merge_incoming_{ts}" + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute(db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + + # Add new row only to incoming (no conflict) + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + count_before = self._get_count(db_client, current_table) + self._execute(db_client, f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY FAIL") + + count_after = self._get_count(db_client, current_table) + assert count_after == count_before + 1, f"Expected {count_before + 1} rows, got {count_after}" + print(f"\n✅ Merge no conflict: {count_before} -> {count_after} rows") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) + + def test_merge_strategy_theirs(self, db_client): + """ + Test merge with THEIRS strategy - incoming values overwrite on conflict. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + current_table = f"test_merge_current_{ts}" + incoming_table = f"test_merge_theirs_{ts}" + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute( + db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90), (3, 'Charlie', 70)" + ) + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + + # Modify a row in incoming (creates conflict) + self._execute(db_client, f"UPDATE `{incoming_table}` SET score = 85 WHERE id = 1") + # Add a new row in incoming (no conflict) + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (4, 'David', 95)") + + self._execute(db_client, f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY THEIRS") + + # Current should have 4 rows now + assert self._get_count(db_client, current_table) == 4 + + # Conflicting row should have incoming's value + row = self._get_row(db_client, current_table, 1) + if isinstance(row, dict): + assert row["score"] == 85, f"Expected score=85 after THEIRS merge, got {row['score']}" + else: + assert row[2] == 85, f"Expected score=85 after THEIRS merge, got {row[2]}" + print("\n✅ Merge THEIRS: conflict resolved with incoming value (score=85)") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) + + def test_merge_strategy_ours(self, db_client): + """ + Test merge with OURS strategy - current values preserved on conflict. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + current_table = f"test_merge_current_{ts}" + incoming_table = f"test_merge_ours_{ts}" + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute( + db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90), (3, 'Charlie', 70)" + ) + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + + # Modify a row in incoming (creates conflict) + self._execute(db_client, f"UPDATE `{incoming_table}` SET score = 85 WHERE id = 1") + # Add a new row in incoming (no conflict) + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (4, 'David', 95)") + + self._execute(db_client, f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY OURS") + + # Current should have 4 rows (new row inserted) + assert self._get_count(db_client, current_table) == 4 + + # Conflicting row should keep current's value + row = self._get_row(db_client, current_table, 1) + if isinstance(row, dict): + assert row["score"] == 80, f"Expected score=80 after OURS merge, got {row['score']}" + else: + assert row[2] == 80, f"Expected score=80 after OURS merge, got {row[2]}" + print("\n✅ Merge OURS: conflict resolved by keeping current value (score=80)") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) + + def test_merge_strategy_fail_with_conflict(self, db_client): + """ + Test merge with FAIL strategy raises error when conflicts exist, and rolls back. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + current_table = f"test_merge_current_{ts}" + incoming_table = f"test_merge_fail_{ts}" + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute(db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + + # Modify a row in incoming to create a conflict + self._execute(db_client, f"UPDATE `{incoming_table}` SET score = 85 WHERE id = 1") + + # FAIL strategy should raise error on conflict + with pytest.raises(pymysql.err.OperationalError): + self._execute(db_client, f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY FAIL") + + # Current should be unchanged (transaction rolled back) + assert self._get_count(db_client, current_table) == 2 + row = self._get_row(db_client, current_table, 1) + if isinstance(row, dict): + assert row["score"] == 80, "Current table should be unchanged after FAIL rollback" + else: + assert row[2] == 80, "Current table should be unchanged after FAIL rollback" + print("\n✅ Merge FAIL: correctly raised error on conflict and rolled back") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) + + def test_merge_invalid_strategy(self, db_client): + """ + Test merge with invalid strategy raises ValueError via Collection API. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + from pyseekdb.client.collection import Collection + + server = db_client._server + dummy_incoming = Collection(client=server, name="dummy_incoming", collection_id=None) + dummy_target = Collection(client=server, name="dummy_target", collection_id=None) + + with pytest.raises(ValueError, match="Invalid merge strategy"): + dummy_incoming.merge_into(dummy_target, strategy="INVALID") + + print("\n✅ Invalid strategy correctly raises ValueError") + + def test_merge_default_strategy(self, db_client): + """ + Test merge with default strategy (FAIL) - no conflicts should succeed. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + ts = int(time.time() * 1000) + current_table = f"test_merge_current_{ts}" + incoming_table = f"test_merge_default_{ts}" + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute(db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + + # Add new row only (no conflict) + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + # Default strategy (no STRATEGY clause = FAIL, but no conflicts so it works) + self._execute(db_client, f"MERGE TABLE `{incoming_table}` INTO `{current_table}`") + + assert self._get_count(db_client, current_table) == 3 + print("\n✅ Merge with default strategy: 3 rows") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) + + def test_merge_via_collection_api(self, db_client): + """ + Test merge through the Collection.merge_into() public API. + """ + if not self._is_diff_merge_enabled(db_client): + pytest.skip("Diff/Merge is not enabled for this database") + + from pyseekdb.client.collection import Collection + from pyseekdb.client.meta_info import CollectionNames + + ts = int(time.time() * 1000) + current_name = f"test_merge_api_current_{ts}" + incoming_name = f"test_merge_api_incoming_{ts}" + current_table = CollectionNames.table_name(current_name) + incoming_table = CollectionNames.table_name(incoming_name) + + try: + self._execute( + db_client, f"CREATE TABLE `{current_table}` (id INT PRIMARY KEY, name VARCHAR(50), score INT)" + ) + self._execute(db_client, f"INSERT INTO `{current_table}` VALUES (1, 'Alice', 80), (2, 'Bob', 90)") + self._execute(db_client, f"FORK TABLE `{current_table}` TO `{incoming_table}`") + self._execute(db_client, f"INSERT INTO `{incoming_table}` VALUES (3, 'Charlie', 70)") + + server = db_client._server + current_col = Collection(client=server, name=current_name, collection_id=None) + incoming_col = Collection(client=server, name=incoming_name, collection_id=None) + + # Merge via Collection API + incoming_col.merge_into(current_col, strategy="FAIL") + + count = self._get_count(db_client, current_table) + assert count == 3, f"Expected 3 rows after merge via API, got {count}" + print(f"\n✅ Merge via Collection API: {count} rows") + finally: + self._cleanup_tables(db_client, incoming_table, current_table) From a07caed816311ed298c7bcb4c0d16a3eb4cf9afb Mon Sep 17 00:00:00 2001 From: Poseidon <18646154381@163.com> Date: Fri, 20 Mar 2026 14:00:39 +0800 Subject: [PATCH 5/6] fix: normalize diff line exec & validate cross collection --- src/pyseekdb/client/client_base.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/src/pyseekdb/client/client_base.py b/src/pyseekdb/client/client_base.py index 8fbe255..e458886 100644 --- a/src/pyseekdb/client/client_base.py +++ b/src/pyseekdb/client/client_base.py @@ -1563,7 +1563,20 @@ def _collection_fork(self, collection: Collection, forked_name: str) -> None: _VALID_MERGE_STRATEGIES: ClassVar[set[str]] = {"FAIL", "THEIRS", "OURS"} - def _collection_diff(self, incoming: Collection, current: Collection) -> list: + def _validate_same_client(self, incoming: Collection, current: Collection) -> None: + """Ensure both collections belong to this client.""" + if incoming.client is not self: + raise ValueError( + f"Collection '{incoming.name}' belongs to a different client; " + "diff/merge requires both collections to be from the same client" + ) + if current.client is not self: + raise ValueError( + f"Collection '{current.name}' belongs to a different client; " + "diff/merge requires both collections to be from the same client" + ) + + def _collection_diff(self, incoming: Collection, current: Collection) -> list[dict[str, Any]]: """ Diff two collections, returning rows that differ between them. @@ -1572,21 +1585,26 @@ def _collection_diff(self, incoming: Collection, current: Collection) -> list: current: The current (baseline) collection Returns: - List of diff result rows. Empty list if no differences. + List of normalized diff result dicts. Empty list if no differences. """ if not self._diff_merge_enabled(): raise ValueError("Diff is not enabled for this database (requires seekdb >= 1.2.0)") + self._validate_same_client(incoming, current) + incoming_table = self._get_collection_table_name(incoming.id, incoming.name) current_table = self._get_collection_table_name(current.id, current.name) diff_sql = f"DIFF TABLE `{incoming_table}` AGAINST `{current_table}`" - result = self._execute(diff_sql) + conn = self._ensure_connection() + result = self._execute_query_with_cursor( + conn, diff_sql, params=[], use_context_manager=self._use_context_manager_for_cursor() + ) logger.debug( f"✅ Successfully diffed collection '{incoming.name}' against '{current.name}', " - f"found {len(result) if result else 0} diff rows" + f"found {len(result)} diff rows" ) - return result if result else [] + return result def _collection_merge(self, incoming: Collection, current: Collection, strategy: str = "FAIL") -> None: """ @@ -1606,6 +1624,8 @@ def _collection_merge(self, incoming: Collection, current: Collection, strategy: f"Invalid merge strategy: '{strategy}'. Must be one of: {', '.join(sorted(self._VALID_MERGE_STRATEGIES))}" ) + self._validate_same_client(incoming, current) + incoming_table = self._get_collection_table_name(incoming.id, incoming.name) current_table = self._get_collection_table_name(current.id, current.name) From 2e8402a3fd906e2534debcaeacf81aa36b56df57 Mon Sep 17 00:00:00 2001 From: Poseidon <18646154381@163.com> Date: Fri, 20 Mar 2026 14:05:26 +0800 Subject: [PATCH 6/6] doc: limitation on diff merge --- src/pyseekdb/client/collection.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/pyseekdb/client/collection.py b/src/pyseekdb/client/collection.py index 13b5578..8fbf4ed 100644 --- a/src/pyseekdb/client/collection.py +++ b/src/pyseekdb/client/collection.py @@ -180,8 +180,9 @@ def diff(self, other: "Collection") -> list[dict[str, Any]]: Note: - Diff is only available for seekdb database version 1.2.0.0 or higher. - - In the SQL semantics, ``self`` is the "incoming" table and ``other`` is - the "current" (baseline) table. + - Collections created via the standard SDK path (using ``create_collection()``) + are not supported, as seekdb does not support DIFF/MERGE TABLE on LOB column + types (``document``, ``embedding``, ``metadata``). Use plain non-LOB schemas instead. Examples: .. code-block:: python @@ -226,6 +227,9 @@ def merge_into(self, target: "Collection", strategy: str = "FAIL") -> None: - The merge executes in a single transaction; on failure it rolls back entirely. - In the SQL semantics, ``self`` is the "incoming" table and ``target`` is the "current" table. + - Collections created via the standard SDK path (using ``create_collection()``) + are not supported, as seekdb does not support DIFF/MERGE TABLE on LOB column + types (``document``, ``embedding``, ``metadata``). Use plain non-LOB schemas instead. Examples: .. code-block:: python