Merged
Changes from all commits
27 commits
17ba37d
flat index: Add support for RQ and include cache param
rlmanrique Oct 8, 2025
d81781c
Update tests for RQ
rlmanrique Oct 8, 2025
86594fa
Add function to retry if http error
rlmanrique Oct 8, 2025
a1e5aac
Add 1.34 CI and test configuration and move function to conftest
rlmanrique Oct 16, 2025
97a4c21
Update comments
rlmanrique Oct 16, 2025
8558889
Add 1.34 version
rlmanrique Oct 17, 2025
392aa97
Set 1.34 dev image for CI jobs
rlmanrique Oct 17, 2025
68affd8
Add ACORN as default filter strategy in 1.34
rlmanrique Oct 17, 2025
d80c83f
Comment backup test temporarily
rlmanrique Oct 20, 2025
191416e
Merge pull request #1844 from weaviate/rq_flat/cache
dirkkul Oct 20, 2025
d3a8e30
Merge branch 'main' into dev/1.34
dirkkul Oct 31, 2025
8186d11
Introduce `batch.experimental()` while server-side batching is in bet…
tsmith023 Nov 4, 2025
2199790
Ensure created backup in test finishes before starting new test
tsmith023 Nov 4, 2025
dfa3cd3
Use `grpc.aio` and `asyncio.Task` to provide experimental ssb to asyn…
tsmith023 Nov 7, 2025
b245290
Merge branch 'main' of https://github.com/weaviate/weaviate-python-cl…
tsmith023 Nov 18, 2025
bc28ab4
Merge branch 'ssb/update-to-1-35-api' of https://github.com/weaviate/…
tsmith023 Jan 20, 2026
dcf586e
Only test SSB for <1.36 versions
tsmith023 Jan 20, 2026
fd94fc4
Merge branch 'dev/1.36' of https://github.com/weaviate/weaviate-pytho…
tsmith023 Jan 26, 2026
69fb0f9
Add experimental batching to async collections
tsmith023 Jan 26, 2026
d77e31c
Rename test funcs to avoid naming collisions
tsmith023 Jan 26, 2026
0d3dda6
Merge branch 'dev/1.36' of https://github.com/weaviate/weaviate-pytho…
tsmith023 Jan 28, 2026
18496b9
Align async/sync impls:
tsmith023 Jan 28, 2026
2598ba2
Add missing `stream.done_writing()` to `__send` task
tsmith023 Jan 28, 2026
2bb2659
Abstract stop logic
tsmith023 Jan 28, 2026
c54ae73
Tidy up exception handling in sync/async batching
tsmith023 Jan 30, 2026
6f31d1e
Add support for stream timeouts, defaults to `None`
tsmith023 Jan 30, 2026
335de4b
Handle async queue get timeouts properly
tsmith023 Jan 30, 2026
83 changes: 81 additions & 2 deletions integration/test_batch_v4.py
@@ -4,6 +4,7 @@
from typing import Callable, Generator, List, Optional, Protocol, Tuple

import pytest
import pytest_asyncio
from _pytest.fixtures import SubRequest

import weaviate
@@ -119,6 +120,53 @@ def _factory(
client_fixture.close()


class AsyncClientFactory(Protocol):
"""Typing for fixture."""

async def __call__(
self, name: str = "", ports: Tuple[int, int] = (8080, 50051), multi_tenant: bool = False
) -> Tuple[weaviate.WeaviateAsyncClient, str]:
"""Typing for fixture."""
...


@pytest_asyncio.fixture
async def async_client_factory(request: SubRequest):
name_fixtures: List[str] = []
client_fixture: Optional[weaviate.WeaviateAsyncClient] = None

async def _factory(
name: str = "", ports: Tuple[int, int] = (8080, 50051), multi_tenant: bool = False
):
nonlocal client_fixture, name_fixtures # noqa: F824
name_fixture = _sanitize_collection_name(request.node.name) + name
name_fixtures.append(name_fixture)
if client_fixture is None:
client_fixture = weaviate.use_async_with_local(grpc_port=ports[1], port=ports[0])
await client_fixture.connect()

if await client_fixture.collections.exists(name_fixture):
await client_fixture.collections.delete(name_fixture)

await client_fixture.collections.create(
name=name_fixture,
properties=[
Property(name="name", data_type=DataType.TEXT),
Property(name="age", data_type=DataType.INT),
],
references=[ReferenceProperty(name="test", target_collection=name_fixture)],
multi_tenancy_config=Configure.multi_tenancy(multi_tenant),
vectorizer_config=Configure.Vectorizer.none(),
)
return client_fixture, name_fixture

try:
yield _factory
finally:
if client_fixture is not None:
await client_fixture.close()


def test_add_objects_in_multiple_batches(client_factory: ClientFactory) -> None:
client, name = client_factory()
with client.batch.rate_limit(50) as batch:
@@ -367,13 +415,13 @@ def test_add_ref_batch_with_tenant(client_factory: ClientFactory) -> None:
[
lambda client: client.batch.dynamic(),
lambda client: client.batch.fixed_size(),
# lambda client: client.batch.rate_limit(9999),
lambda client: client.batch.rate_limit(9999),
lambda client: client.batch.experimental(concurrency=1),
],
ids=[
"test_add_ten_thousand_data_objects_dynamic",
"test_add_ten_thousand_data_objects_fixed_size",
# "test_add_ten_thousand_data_objects_rate_limit",
"test_add_ten_thousand_data_objects_rate_limit",
"test_add_ten_thousand_data_objects_experimental",
],
)
@@ -767,3 +815,34 @@ def test_references_with_to_uuids(client_factory: ClientFactory) -> None:

assert len(client.batch.failed_references) == 0, client.batch.failed_references
client.collections.delete(["target", "source"])


@pytest.mark.asyncio
async def test_add_one_hundred_thousand_objects_async_client(
async_client_factory: AsyncClientFactory,
) -> None:
"""Test adding one hundred thousand data objects."""
client, name = await async_client_factory()
if client._connection._weaviate_version.is_lower_than(1, 36, 0):
pytest.skip("Server-side batching not supported in Weaviate < 1.36.0")
nr_objects = 100000
import time

start = time.time()
async with client.batch.experimental(concurrency=1) as batch:
for i in range(nr_objects):
await batch.add_object(
collection=name,
properties={"name": "test" + str(i)},
)
end = time.time()
print(f"Time taken to add {nr_objects} objects: {end - start} seconds")
assert len(client.batch.results.objs.errors) == 0
assert len(client.batch.results.objs.all_responses) == nr_objects
assert len(client.batch.results.objs.uuids) == nr_objects
assert await client.collections.use(name).length() == nr_objects
assert client.batch.results.objs.has_errors is False
assert len(client.batch.failed_objects) == 0, [
obj.message for obj in client.batch.failed_objects
]
await client.collections.delete(name)
68 changes: 65 additions & 3 deletions integration/test_collection_batch.py
@@ -1,10 +1,10 @@
import uuid
from dataclasses import dataclass
from typing import Any, Generator, Optional, Protocol, Union
from typing import Any, Awaitable, Generator, Optional, Protocol, Union

import pytest

from integration.conftest import CollectionFactory, CollectionFactoryGet
from integration.conftest import AsyncCollectionFactory, CollectionFactory, CollectionFactoryGet
from weaviate.collections import Collection
from weaviate.collections.classes.config import (
Configure,
@@ -17,6 +17,8 @@
from weaviate.collections.classes.tenants import Tenant
from weaviate.types import VECTORS

from weaviate.collections.collection.async_ import CollectionAsync

UUID = Union[str, uuid.UUID]


@@ -55,11 +57,21 @@ def __call__(self, name: str = "", multi_tenancy: bool = False) -> Collection[An
...


class BatchCollectionAsync(Protocol):
"""Typing for fixture."""

def __call__(
self, name: str = "", multi_tenancy: bool = False
) -> Awaitable[CollectionAsync[Any, Any]]:
"""Typing for fixture."""
...


@pytest.fixture
def batch_collection(
collection_factory: CollectionFactory,
) -> Generator[BatchCollection, None, None]:
def _factory(name: str = "", multi_tenancy: bool = False) -> Collection[Any, Any]:
def _factory(name: str = "", multi_tenancy: bool = False):
collection = collection_factory(
name=name,
vectorizer_config=Configure.Vectorizer.none(),
@@ -78,6 +90,29 @@ def _factory(name: str = "", multi_tenancy: bool = False) -> Collection[Any, Any
yield _factory


@pytest.fixture
def batch_collection_async(
async_collection_factory: AsyncCollectionFactory,
) -> Generator[BatchCollectionAsync, None, None]:
async def _factory(name: str = "", multi_tenancy: bool = False):
collection = await async_collection_factory(
name=name,
vectorizer_config=Configure.Vectorizer.none(),
properties=[
Property(name="name", data_type=DataType.TEXT),
Property(name="age", data_type=DataType.INT),
],
multi_tenancy_config=Configure.multi_tenancy(multi_tenancy),
)
await collection.config.add_reference(
ReferenceProperty(name="test", target_collection=collection.name)
)

return collection

yield _factory


@pytest.mark.parametrize(
"vector",
[None, [1, 2, 3], MockNumpyTorch([1, 2, 3]), MockTensorFlow([1, 2, 3])],
@@ -233,3 +268,30 @@ def test_non_existant_collection(collection_factory_get: CollectionFactoryGet) -

# above should not throw - depending on the autoschema config this might create an error or
# not, so we do not check for errors here


@pytest.mark.asyncio
async def test_add_one_hundred_thousand_objects_async_collection(
batch_collection_async: BatchCollectionAsync,
) -> None:
"""Test adding one hundred thousand data objects."""
col = await batch_collection_async()
if col._connection._weaviate_version.is_lower_than(1, 36, 0):
pytest.skip("Server-side batching not supported in Weaviate < 1.36.0")
nr_objects = 100000
import time

start = time.time()
async with col.batch.experimental() as batch:
for i in range(nr_objects):
await batch.add_object(
properties={"name": "test" + str(i)},
)
end = time.time()
print(f"Time taken to add {nr_objects} objects: {end - start} seconds")
assert len(col.batch.results.objs.errors) == 0
assert len(col.batch.results.objs.all_responses) == nr_objects
assert len(col.batch.results.objs.uuids) == nr_objects
assert await col.length() == nr_objects
assert col.batch.results.objs.has_errors is False
assert len(col.batch.failed_objects) == 0, [obj.message for obj in col.batch.failed_objects]
3 changes: 2 additions & 1 deletion weaviate/client.py
@@ -10,7 +10,7 @@
from .auth import AuthCredentials
from .backup import _Backup, _BackupAsync
from .cluster import _Cluster, _ClusterAsync
from .collections.batch.client import _BatchClientWrapper
from .collections.batch.client import _BatchClientWrapper, _BatchClientWrapperAsync
from .collections.collections import _Collections, _CollectionsAsync
from .config import AdditionalConfig
from .connect import executor
@@ -76,6 +76,7 @@ def __init__(
)
self.alias = _AliasAsync(self._connection)
self.backup = _BackupAsync(self._connection)
self.batch = _BatchClientWrapperAsync(self._connection)
self.cluster = _ClusterAsync(self._connection)
self.collections = _CollectionsAsync(self._connection)
self.debug = _DebugAsync(self._connection)
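
With `_BatchClientWrapperAsync` wired onto `WeaviateAsyncClient`, the async client now exposes `client.batch` just as the sync client does. For comparison, a sketch of the synchronous entry point as parametrized in test_batch_v4.py above; the connection helper and collection name are illustrative:

import weaviate

client = weaviate.connect_to_local()
try:
    # Same experimental server-side batching entry point, synchronous flavour.
    with client.batch.experimental(concurrency=1) as batch:
        for i in range(1000):
            batch.add_object(
                collection="TestCollection",  # hypothetical collection name
                properties={"name": f"test{i}"},
            )
    assert len(client.batch.failed_objects) == 0
finally:
    client.close()
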
3 changes: 2 additions & 1 deletion weaviate/client.pyi
@@ -18,7 +18,7 @@ from weaviate.users.sync import _Users

from .backup import _Backup, _BackupAsync
from .cluster import _Cluster, _ClusterAsync
from .collections.batch.client import _BatchClientWrapper
from .collections.batch.client import _BatchClientWrapper, _BatchClientWrapperAsync
from .debug import _Debug, _DebugAsync
from .rbac import _Roles, _RolesAsync
from .types import NUMBER
@@ -29,6 +29,7 @@ class WeaviateAsyncClient(_WeaviateClientExecutor[ConnectionAsync]):
_connection: ConnectionAsync
alias: _AliasAsync
backup: _BackupAsync
batch: _BatchClientWrapperAsync
collections: _CollectionsAsync
cluster: _ClusterAsync
debug: _DebugAsync