Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 81 additions & 1 deletion src/pyseekdb/client/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from abc import ABC, abstractmethod
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any
from typing import Any, ClassVar

from pymysql.converters import escape_string

Expand Down Expand Up @@ -1494,6 +1494,12 @@ def _fork_database_enabled(self) -> bool:
logger.debug(f"db_type: {db_type}, version: {version}")
return db_type.lower() == "seekdb" and version >= version_120

def _diff_merge_enabled(self) -> bool:
db_type, version = self.detect_db_type_and_version()
version_120 = Version("1.2.0.0")
logger.debug(f"db_type: {db_type}, version: {version}")
return db_type.lower() == "seekdb" and version >= version_120

def _get_collection_id(self, collection_name: str) -> str:
collection_id_query_sql = f"SELECT COLLECTION_ID FROM `{CollectionNames.sdk_collections_table_name()}` WHERE COLLECTION_NAME = '{collection_name}'"
collection_id_query_result = self._execute(collection_id_query_sql)
Expand Down Expand Up @@ -1555,6 +1561,80 @@ def _collection_fork(self, collection: Collection, forked_name: str) -> None:
raise ValueError(f"Failed to fork collection: {ex}") from ex
logger.debug(f"✅ Successfully forked collection '{collection.name}' to '{forked_name}'")

_VALID_MERGE_STRATEGIES: ClassVar[set[str]] = {"FAIL", "THEIRS", "OURS"}

def _validate_same_client(self, incoming: Collection, current: Collection) -> None:
"""Ensure both collections belong to this client."""
if incoming.client is not self:
raise ValueError(
f"Collection '{incoming.name}' belongs to a different client; "
"diff/merge requires both collections to be from the same client"
)
if current.client is not self:
raise ValueError(
f"Collection '{current.name}' belongs to a different client; "
"diff/merge requires both collections to be from the same client"
)

def _collection_diff(self, incoming: Collection, current: Collection) -> list[dict[str, Any]]:
"""
Diff two collections, returning rows that differ between them.

Args:
incoming: The incoming (comparison) collection
current: The current (baseline) collection

Returns:
List of normalized diff result dicts. Empty list if no differences.
"""
if not self._diff_merge_enabled():
raise ValueError("Diff is not enabled for this database (requires seekdb >= 1.2.0)")

self._validate_same_client(incoming, current)

incoming_table = self._get_collection_table_name(incoming.id, incoming.name)
current_table = self._get_collection_table_name(current.id, current.name)

diff_sql = f"DIFF TABLE `{incoming_table}` AGAINST `{current_table}`"
conn = self._ensure_connection()
result = self._execute_query_with_cursor(
conn, diff_sql, params=[], use_context_manager=self._use_context_manager_for_cursor()
)
logger.debug(
f"✅ Successfully diffed collection '{incoming.name}' against '{current.name}', "
f"found {len(result)} diff rows"
)
return result

def _collection_merge(self, incoming: Collection, current: Collection, strategy: str = "FAIL") -> None:
"""
Merge incoming collection into current collection.

Args:
incoming: The incoming (source) collection
current: The current (target) collection
strategy: Conflict resolution strategy - FAIL, THEIRS, or OURS (default: FAIL)
"""
if not self._diff_merge_enabled():
raise ValueError("Merge is not enabled for this database (requires seekdb >= 1.2.0)")

strategy_upper = strategy.upper()
if strategy_upper not in self._VALID_MERGE_STRATEGIES:
raise ValueError(
f"Invalid merge strategy: '{strategy}'. Must be one of: {', '.join(sorted(self._VALID_MERGE_STRATEGIES))}"
)

self._validate_same_client(incoming, current)

incoming_table = self._get_collection_table_name(incoming.id, incoming.name)
current_table = self._get_collection_table_name(current.id, current.name)

merge_sql = f"MERGE TABLE `{incoming_table}` INTO `{current_table}` STRATEGY {strategy_upper}"
self._execute(merge_sql)
logger.debug(
f"✅ Successfully merged collection '{incoming.name}' into '{current.name}' with strategy {strategy_upper}"
)

# ==================== Collection Internal Operations (Called by Collection) ====================
# These methods are called by Collection objects, different clients implement different logic

Expand Down
90 changes: 90 additions & 0 deletions src/pyseekdb/client/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,96 @@ def fork(self, forked_name: str) -> "Collection":
collection = self._client.get_collection(forked_name, embedding_function=self._embedding_function)
return collection

def diff(self, other: "Collection") -> list[dict[str, Any]]:
"""
Diff this collection against another collection (baseline).

Compares rows by primary key. Returns rows that differ between the two collections:
conflicting rows (same key, different values), and rows unique to either collection.
Rows that are identical in both collections are not included.

This is a read-only operation - neither collection is modified.

Args:
other: The baseline collection to compare against. Must have the same
column definitions (column names, types, and order) and a primary key.

Returns:
list[dict[str, Any]]: List of diff result rows. Each row is a dict containing
the table columns plus system columns indicating the source table and diff type.
Returns an empty list if the collections are identical.

Raises:
ValueError: If diff is not enabled (requires seekdb >= 1.2.0),
or if the two collections have incompatible schemas.

Note:
- Diff is only available for seekdb database version 1.2.0.0 or higher.
- Collections created via the standard SDK path (using ``create_collection()``)
are not supported, as seekdb does not support DIFF/MERGE TABLE on LOB column
types (``document``, ``embedding``, ``metadata``). Use plain non-LOB schemas instead.

Examples:
.. code-block:: python
original = client.get_collection("baseline")
modified = original.fork("modified_copy")

# Make changes to modified copy
modified.add(ids="new_id", embeddings=[1.0, 2.0, 3.0], documents="New doc")

# See what changed
diff_rows = modified.diff(original)
for row in diff_rows:
print(row)

"""
return self._client._collection_diff(incoming=self, current=other)

def merge_into(self, target: "Collection", strategy: str = "FAIL") -> None:
"""
Merge this collection's changes into the target collection.

Non-conflicting rows (rows unique to this collection) are inserted into the target.
Rows unique to the target are preserved. Conflicting rows (same primary key,
different values) are handled according to the specified strategy.
Merge does not delete any rows from the target.

Args:
target: The target collection to merge into. Must have the same
column definitions (column names, types, and order) and a primary key.
strategy: Conflict resolution strategy. One of:
- ``"FAIL"`` (default): Raise an error and roll back if any conflicts exist.
- ``"THEIRS"``: Use this collection's values to overwrite the target on conflict.
- ``"OURS"``: Keep the target's values unchanged on conflict.

Raises:
ValueError: If merge is not enabled (requires seekdb >= 1.2.0),
if the strategy is invalid, or if the two collections have
incompatible schemas.

Note:
- Merge is only available for seekdb database version 1.2.0.0 or higher.
- The merge executes in a single transaction; on failure it rolls back entirely.
- In the SQL semantics, ``self`` is the "incoming" table and ``target`` is
the "current" table.
- Collections created via the standard SDK path (using ``create_collection()``)
are not supported, as seekdb does not support DIFF/MERGE TABLE on LOB column
types (``document``, ``embedding``, ``metadata``). Use plain non-LOB schemas instead.

Examples:
.. code-block:: python
original = client.get_collection("main_data")
branch = original.fork("experiment")

# Make changes in the branch
branch.add(ids="new_id", embeddings=[1.0, 2.0, 3.0], documents="New doc")

# Merge branch changes back, using branch values on conflict
branch.merge_into(original, strategy="THEIRS")

"""
self._client._collection_merge(incoming=self, current=target, strategy=strategy)

# ==================== DML Operations ====================
# All methods delegate to client's internal implementation

Expand Down
Loading
Loading