Draft
72 commits
17ba37d
flat index: Add support for RQ and include cache param
rlmanrique Oct 8, 2025
d81781c
Update tests for RQ
rlmanrique Oct 8, 2025
86594fa
Add function to retry if http error
rlmanrique Oct 8, 2025
a1e5aac
Add 1.34 CI and test configuration and move function to conftest
rlmanrique Oct 16, 2025
97a4c21
Update comments
rlmanrique Oct 16, 2025
8558889
Add 134 version
rlmanrique Oct 17, 2025
392aa97
Set 1.34 dev image for CI jobs
rlmanrique Oct 17, 2025
68affd8
Add ACORN as default filter strategy in 1.34
rlmanrique Oct 17, 2025
d80c83f
Comment backup test temporarily
rlmanrique Oct 20, 2025
191416e
Merge pull request #1844 from weaviate/rq_flat/cache
dirkkul Oct 20, 2025
d3a8e30
Merge branch 'main' into dev/1.34
dirkkul Oct 31, 2025
8186d11
Introduce `batch.experimental()` while server-side batching is in bet…
tsmith023 Nov 4, 2025
2199790
Ensure created backup in test finishes before starting new test
tsmith023 Nov 4, 2025
dfa3cd3
Use `grpc.aio` and `asyncio.Task` to provide experimental ssb to asyn…
tsmith023 Nov 7, 2025
b245290
Merge branch 'main' of https://github.com/weaviate/weaviate-python-cl…
tsmith023 Nov 18, 2025
a1b4400
Introduce `collection.data.ingest` for sync/async SSB usage
tsmith023 Nov 20, 2025
9e74bb3
Add logic to handle acks msg capping in-flight stream usage
tsmith023 Nov 25, 2025
49367c4
Align to latest changes with OOM error
tsmith023 Dec 11, 2025
f4e4b8b
Bump batch stream init timeout to 60s from 10s
tsmith023 Dec 12, 2025
ac00a93
Merge branch 'main' of https://github.com/weaviate/weaviate-python-cl…
tsmith023 Dec 15, 2025
a56c17f
Fix `Unknown` type in old batch objs/refs
tsmith023 Dec 18, 2025
359830c
Merge branch 'main' into ssb/update-to-1-35-api
tsmith023 Dec 18, 2025
d28b35c
cast AnyUrl to AnyHttpUrl for pyrite
DanielJanicek Dec 30, 2025
d34bb2c
Merge branch 'main' of https://github.com/weaviate/weaviate-python-cl…
tsmith023 Jan 5, 2026
74df2e3
Merge branch 'ssb/update-to-1-35-api' of https://github.com/weaviate/…
tsmith023 Jan 5, 2026
15cb3f2
Merge branch 'main' of https://github.com/weaviate/weaviate-python-cl…
tsmith023 Jan 14, 2026
5879cec
Update CI images
tsmith023 Jan 14, 2026
5e428c5
Ensure `uuid_lookup` is updated properly on `Acks` msg
tsmith023 Jan 14, 2026
2ae44be
Remove rate limit batch test (too slow)
tsmith023 Jan 14, 2026
4678e8c
Remove creds
tsmith023 Jan 14, 2026
fb9c1e7
Fix uuid lookup in old batching
tsmith023 Jan 14, 2026
bc28ab4
Merge branch 'ssb/update-to-1-35-api' of https://github.com/weaviate/…
tsmith023 Jan 20, 2026
dcf586e
Only test SSB for <1.36 vers
tsmith023 Jan 20, 2026
0671036
Merge pull request #1884 from weaviate/ssb/update-to-1-35-api
tsmith023 Jan 26, 2026
fd94fc4
Merge branch 'dev/1.36' of https://github.com/weaviate/weaviate-pytho…
tsmith023 Jan 26, 2026
69fb0f9
Add experimental batching to async collections
tsmith023 Jan 26, 2026
d77e31c
Rename test funcs to avoid naming collisions
tsmith023 Jan 26, 2026
51340b2
Merge branch 'introduce-experimental-batching-to-async-client' of htt…
tsmith023 Jan 27, 2026
3e018e7
Periodically renew streams prior to 180s timeout when running with GC…
tsmith023 Jan 28, 2026
22b5333
Review changes
tsmith023 Jan 28, 2026
59a9c12
Merge pull request #1931 from weaviate/ssb/renew-stream-during-import…
tsmith023 Jan 28, 2026
0d3dda6
Merge branch 'dev/1.36' of https://github.com/weaviate/weaviate-pytho…
tsmith023 Jan 28, 2026
18496b9
Align async/sync impls:
tsmith023 Jan 28, 2026
2598ba2
Add missing `stream.done_writing()` to `__send` task
tsmith023 Jan 28, 2026
2bb2659
Abstract stop logic
tsmith023 Jan 28, 2026
0f05810
Merge branch 'introduce-experimental-batching-to-async-client' of htt…
tsmith023 Jan 28, 2026
99f88ef
Refactor .ingest internals to use public classes/types from batch
tsmith023 Jan 28, 2026
1aa344f
Remove rate limit tests
tsmith023 Jan 29, 2026
e752211
Bump all ver checks for SSB from 1.34 to 1.36
tsmith023 Jan 29, 2026
70db90b
Add back rate limit tests
tsmith023 Jan 29, 2026
0253cd4
Move tests around
tsmith023 Jan 29, 2026
cdddbd4
Revert more changes to reduce diff
tsmith023 Jan 29, 2026
982a332
Change another test to only apply for <1.36
tsmith023 Jan 29, 2026
c54ae73
Tidy up exception handling in sync/async batching
tsmith023 Jan 30, 2026
6f31d1e
Add support for stream timeouts, defaults to `None`
tsmith023 Jan 30, 2026
335de4b
Handle async queue get timeouts properly
tsmith023 Jan 30, 2026
4bbaf35
Merge pull request #1872 from weaviate/introduce-experimental-batchin…
tsmith023 Jan 30, 2026
32631b1
Merge branch 'dev/1.36' of https://github.com/weaviate/weaviate-pytho…
tsmith023 Jan 30, 2026
ab08410
SSB Improvements:
tsmith023 Jan 30, 2026
33ff4b9
Merge branch 'dev/1.36' of https://github.com/weaviate/weaviate-pytho…
tsmith023 Jan 30, 2026
69ce7d5
Fix test typos
tsmith023 Jan 30, 2026
d697b7c
Call `._wait` in sync wrapper
tsmith023 Jan 30, 2026
687c7de
Update CI version of `1.36.0-dev`
tsmith023 Jan 30, 2026
4c07a77
Merge pull request #1876 from weaviate/introduce-data-ingest-for-ssb
tsmith023 Jan 30, 2026
005fb12
Merge branch 'main' into dev/1.36
tsmith023 Jan 30, 2026
28d6b9b
Update other images in CI
tsmith023 Jan 30, 2026
31d3ce9
Remove long running rate limit tests
tsmith023 Jan 30, 2026
067f962
Merge branch 'dev/1.36' of https://github.com/weaviate/weaviate-pytho…
tsmith023 Jan 30, 2026
f452e31
- Move shutdown waiting back into `__generate_stream_requests` loop
tsmith023 Feb 2, 2026
f385d44
Bump `reqs.put` timeout
tsmith023 Feb 2, 2026
7626f99
Add more async locks in public functions
tsmith023 Feb 3, 2026
d4498a4
Relinquish control back to event loop on obj/ref add
tsmith023 Feb 3, 2026
36 changes: 19 additions & 17 deletions .github/workflows/main.yaml
@@ -22,11 +22,12 @@ env:
   WEAVIATE_128: 1.28.16
   WEAVIATE_129: 1.29.11
   WEAVIATE_130: 1.30.22
-  WEAVIATE_131: 1.31.20
-  WEAVIATE_132: 1.32.23
-  WEAVIATE_133: 1.33.10
-  WEAVIATE_134: 1.34.5
-  WEAVIATE_135: 1.35.0
+  WEAVIATE_131: 1.31.22
+  WEAVIATE_132: 1.32.26
+  WEAVIATE_133: 1.33.11
+  WEAVIATE_134: 1.34.8
+  WEAVIATE_135: 1.35.2
+  WEAVIATE_136: 1.36.0-dev-0bbf31a

 jobs:
   lint-and-format:
@@ -154,11 +155,11 @@ jobs:
       fail-fast: false
       matrix:
         versions: [
-          { py: "3.10", weaviate: $WEAVIATE_132, grpc: "1.59.0"},
-          { py: "3.11", weaviate: $WEAVIATE_132, grpc: "1.66.0"},
-          { py: "3.12", weaviate: $WEAVIATE_132, grpc: "1.70.0"},
-          { py: "3.13", weaviate: $WEAVIATE_132, grpc: "1.72.1"},
-          { py: "3.14", weaviate: $WEAVIATE_132, grpc: "1.76.0"}
+          { py: "3.10", weaviate: $WEAVIATE_136, grpc: "1.59.0"},
+          { py: "3.11", weaviate: $WEAVIATE_136, grpc: "1.66.0"},
+          { py: "3.12", weaviate: $WEAVIATE_136, grpc: "1.70.0"},
+          { py: "3.13", weaviate: $WEAVIATE_136, grpc: "1.72.1"},
+          { py: "3.14", weaviate: $WEAVIATE_136, grpc: "1.76.0"}
         ]
         optional_dependencies: [false]
     steps:
@@ -209,11 +210,11 @@ jobs:
       fail-fast: false
       matrix:
         versions: [
-          { py: "3.10", weaviate: $WEAVIATE_132},
-          { py: "3.11", weaviate: $WEAVIATE_132},
-          { py: "3.12", weaviate: $WEAVIATE_132},
-          { py: "3.13", weaviate: $WEAVIATE_132},
-          { py: "3.14", weaviate: $WEAVIATE_132}
+          { py: "3.10", weaviate: $WEAVIATE_136},
+          { py: "3.11", weaviate: $WEAVIATE_136},
+          { py: "3.12", weaviate: $WEAVIATE_136},
+          { py: "3.13", weaviate: $WEAVIATE_136},
+          { py: "3.14", weaviate: $WEAVIATE_136}
         ]
         optional_dependencies: [false]
     steps:
@@ -305,8 +306,9 @@ jobs:
           $WEAVIATE_131,
           $WEAVIATE_132,
           $WEAVIATE_133,
-          $WEAVIATE_134
-          $WEAVIATE_135
+          $WEAVIATE_134,
+          $WEAVIATE_135,
+          $WEAVIATE_136
         ]
     steps:
       - name: Checkout
116 changes: 95 additions & 21 deletions integration/test_batch_v4.py
@@ -4,6 +4,7 @@
 from typing import Callable, Generator, List, Optional, Protocol, Tuple

 import pytest
+import pytest_asyncio
 from _pytest.fixtures import SubRequest

 import weaviate
@@ -119,6 +120,53 @@ def _factory(
             client_fixture.close()


+class AsyncClientFactory(Protocol):
+    """Typing for fixture."""
+
+    async def __call__(
+        self, name: str = "", ports: Tuple[int, int] = (8080, 50051), multi_tenant: bool = False
+    ) -> Tuple[weaviate.WeaviateAsyncClient, str]:
+        """Typing for fixture."""
+        ...
+
+
+@pytest_asyncio.fixture
+async def async_client_factory(request: SubRequest):
+    name_fixtures: List[str] = []
+    client_fixture: Optional[weaviate.WeaviateAsyncClient] = None
+
+    async def _factory(
+        name: str = "", ports: Tuple[int, int] = (8080, 50051), multi_tenant: bool = False
+    ):
+        nonlocal client_fixture, name_fixtures  # noqa: F824
+        name_fixture = _sanitize_collection_name(request.node.name) + name
+        name_fixtures.append(name_fixture)
+        if client_fixture is None:
+            client_fixture = weaviate.use_async_with_local(grpc_port=ports[1], port=ports[0])
+            await client_fixture.connect()
+
+        if await client_fixture.collections.exists(name_fixture):
+            await client_fixture.collections.delete(name_fixture)
+
+        await client_fixture.collections.create(
+            name=name_fixture,
+            properties=[
+                Property(name="name", data_type=DataType.TEXT),
+                Property(name="age", data_type=DataType.INT),
+            ],
+            references=[ReferenceProperty(name="test", target_collection=name_fixture)],
+            multi_tenancy_config=Configure.multi_tenancy(multi_tenant),
+            vectorizer_config=Configure.Vectorizer.none(),
+        )
+        return client_fixture, name_fixture
+
+    try:
+        yield _factory
+    finally:
+        if client_fixture is not None:
+            await client_fixture.close()
+
+
 def test_add_objects_in_multiple_batches(client_factory: ClientFactory) -> None:
     client, name = client_factory()
     with client.batch.rate_limit(50) as batch:
@@ -365,16 +413,14 @@ def test_add_ref_batch_with_tenant(client_factory: ClientFactory) -> None:
 @pytest.mark.parametrize(
     "batching_method",
     [
-        # lambda client: client.batch.dynamic(),
-        # lambda client: client.batch.fixed_size(),
-        # lambda client: client.batch.rate_limit(9999),
-        lambda client: client.batch.experimental(concurrency=1),
+        lambda client: client.batch.dynamic(),
+        lambda client: client.batch.fixed_size(),
+        lambda client: client.batch.stream(concurrency=1),
     ],
     ids=[
-        # "test_add_ten_thousand_data_objects_dynamic",
-        # "test_add_ten_thousand_data_objects_fixed_size",
-        # "test_add_ten_thousand_data_objects_rate_limit",
-        "test_add_ten_thousand_data_objects_experimental",
+        "test_add_ten_thousand_data_objects_dynamic",
+        "test_add_ten_thousand_data_objects_fixed_size",
+        "test_add_ten_thousand_data_objects_stream",
     ],
 )
 def test_add_ten_thousand_data_objects(
@@ -385,10 +431,10 @@ def test_add_ten_thousand_data_objects(
     """Test adding ten thousand data objects."""
     client, name = client_factory()
     if (
-        request.node.callspec.id == "test_add_ten_thousand_data_objects_experimental"
-        and client._connection._weaviate_version.is_lower_than(1, 34, 0)
+        request.node.callspec.id == "test_add_ten_thousand_data_objects_stream"
+        and client._connection._weaviate_version.is_lower_than(1, 36, 0)
     ):
-        pytest.skip("Server-side batching not supported in Weaviate < 1.34.0")
+        pytest.skip("Server-side batching not supported in Weaviate < 1.36.0")
     nr_objects = 100000
     import time

@@ -575,14 +621,12 @@ def test_add_1000_tenant_objects_with_async_indexing_and_wait_for_only_one(
     [
         lambda client: client.batch.dynamic(),
         lambda client: client.batch.fixed_size(),
-        lambda client: client.batch.rate_limit(1000),
-        lambda client: client.batch.experimental(),
+        lambda client: client.batch.stream(),
     ],
     ids=[
-        "test_add_one_hundred_objects_and_references_between_all_dynamic",
-        "test_add_one_hundred_objects_and_references_between_all_fixed_size",
-        "test_add_one_hundred_objects_and_references_between_all_rate_limit",
-        "test_add_one_hundred_objects_and_references_between_all_experimental",
+        "test_add_one_object_and_a_self_reference_dynamic",
+        "test_add_one_object_and_a_self_reference_fixed_size",
+        "test_add_one_object_and_a_self_reference_stream",
     ],
 )
 def test_add_one_object_and_a_self_reference(
@@ -593,11 +637,10 @@ def test_add_one_object_and_a_self_reference(
     """Test adding one object and a self reference."""
     client, name = client_factory()
     if (
-        request.node.callspec.id
-        == "test_add_one_hundred_objects_and_references_between_all_experimental"
-        and client._connection._weaviate_version.is_lower_than(1, 34, 0)
+        request.node.callspec.id == "test_add_one_object_and_a_self_reference_stream"
+        and client._connection._weaviate_version.is_lower_than(1, 36, 0)
     ):
-        pytest.skip("Server-side batching not supported in Weaviate < 1.34.0")
+        pytest.skip("Server-side batching not supported in Weaviate < 1.36.0")
     with batching_method(client) as batch:
         uuid = batch.add_object(collection=name, properties={})
         batch.add_reference(
@@ -768,3 +811,34 @@ def test_references_with_to_uuids(client_factory: ClientFactory) -> None:

     assert len(client.batch.failed_references) == 0, client.batch.failed_references
     client.collections.delete(["target", "source"])
+
+
+@pytest.mark.asyncio
+async def test_add_one_hundred_thousand_objects_async_client(
+    async_client_factory: AsyncClientFactory,
+) -> None:
+    """Test adding one hundred thousand data objects."""
+    client, name = await async_client_factory()
+    if client._connection._weaviate_version.is_lower_than(1, 36, 0):
+        pytest.skip("Server-side batching not supported in Weaviate < 1.36.0")
+    nr_objects = 100000
+    import time
+
+    start = time.time()
+    async with client.batch.stream(concurrency=1) as batch:
+        for i in range(nr_objects):
+            await batch.add_object(
+                collection=name,
+                properties={"name": "test" + str(i)},
+            )
+    end = time.time()
+    print(f"Time taken to add {nr_objects} objects: {end - start} seconds")
+    assert len(client.batch.results.objs.errors) == 0
+    assert len(client.batch.results.objs.all_responses) == nr_objects
+    assert len(client.batch.results.objs.uuids) == nr_objects
+    assert await client.collections.use(name).length() == nr_objects
+    assert client.batch.results.objs.has_errors is False
+    assert len(client.batch.failed_objects) == 0, [
+        obj.message for obj in client.batch.failed_objects
+    ]
+    await client.collections.delete(name)
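The `async_client_factory` fixture above follows a lazy-connect, close-on-teardown pattern: the client is created on the first factory call, reused on later calls, and closed in the `finally` branch once the test finishes. A minimal sketch of that lifecycle is shown here, assuming a hypothetical `FakeAsyncClient` stand-in (it is not the Weaviate client API) and driving the async generator by hand instead of through `pytest_asyncio`.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List, Optional, Tuple


class FakeAsyncClient:
    """Hypothetical stand-in for an async client; not the Weaviate API."""

    def __init__(self) -> None:
        self.connected = False
        self.collections: List[str] = []

    async def connect(self) -> None:
        self.connected = True

    async def close(self) -> None:
        self.connected = False


Factory = Callable[[str], Awaitable[Tuple[FakeAsyncClient, str]]]


async def client_factory() -> AsyncIterator[Factory]:
    """Async generator shaped like a yield-style fixture."""
    client: Optional[FakeAsyncClient] = None

    async def _factory(name: str = "") -> Tuple[FakeAsyncClient, str]:
        nonlocal client
        if client is None:
            # Connect lazily, only when a test actually asks for a client.
            client = FakeAsyncClient()
            await client.connect()
        # Recreate the collection so every call starts from a clean state.
        if name in client.collections:
            client.collections.remove(name)
        client.collections.append(name)
        return client, name

    try:
        yield _factory
    finally:
        # Teardown runs even if the test body raised.
        if client is not None:
            await client.close()


async def demo() -> Tuple[bool, bool, bool]:
    gen = client_factory()
    factory = await gen.__anext__()  # setup: runs up to the yield
    first, _ = await factory("a")
    second, _ = await factory("b")
    connected_during = first.connected
    await gen.aclose()  # teardown: runs the finally branch
    return first is second, connected_during, first.connected


print(asyncio.run(demo()))
```

Both factory calls return the same client object, and the client is only disconnected after the generator is closed, which is the behaviour the fixture relies on when a test requests several collections.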
111 changes: 108 additions & 3 deletions integration/test_collection_batch.py
@@ -1,10 +1,10 @@
 import uuid
 from dataclasses import dataclass
-from typing import Any, Generator, Optional, Protocol, Union
+from typing import Any, Awaitable, Generator, Optional, Protocol, Union

 import pytest

-from integration.conftest import CollectionFactory, CollectionFactoryGet
+from integration.conftest import AsyncCollectionFactory, CollectionFactory, CollectionFactoryGet
 from weaviate.collections import Collection
 from weaviate.collections.classes.config import (
     Configure,
@@ -17,6 +17,8 @@
 from weaviate.collections.classes.tenants import Tenant
 from weaviate.types import VECTORS

+from weaviate.collections.collection.async_ import CollectionAsync
+
 UUID = Union[str, uuid.UUID]


@@ -55,11 +57,21 @@ def __call__(self, name: str = "", multi_tenancy: bool = False) -> Collection[An
         ...


+class BatchCollectionAsync(Protocol):
+    """Typing for fixture."""
+
+    def __call__(
+        self, name: str = "", multi_tenancy: bool = False
+    ) -> Awaitable[CollectionAsync[Any, Any]]:
+        """Typing for fixture."""
+        ...
+
+
 @pytest.fixture
 def batch_collection(
     collection_factory: CollectionFactory,
 ) -> Generator[BatchCollection, None, None]:
-    def _factory(name: str = "", multi_tenancy: bool = False) -> Collection[Any, Any]:
+    def _factory(name: str = "", multi_tenancy: bool = False):
         collection = collection_factory(
             name=name,
             vectorizer_config=Configure.Vectorizer.none(),
@@ -78,6 +90,29 @@ def _factory(name: str = "", multi_tenancy: bool = False) -> Collection[Any, Any
     yield _factory


+@pytest.fixture
+def batch_collection_async(
+    async_collection_factory: AsyncCollectionFactory,
+) -> Generator[BatchCollectionAsync, None, None]:
+    async def _factory(name: str = "", multi_tenancy: bool = False):
+        collection = await async_collection_factory(
+            name=name,
+            vectorizer_config=Configure.Vectorizer.none(),
+            properties=[
+                Property(name="name", data_type=DataType.TEXT),
+                Property(name="age", data_type=DataType.INT),
+            ],
+            multi_tenancy_config=Configure.multi_tenancy(multi_tenancy),
+        )
+        await collection.config.add_reference(
+            ReferenceProperty(name="test", target_collection=collection.name)
+        )
+
+        return collection
+
+    yield _factory
+
+
 @pytest.mark.parametrize(
     "vector",
     [None, [1, 2, 3], MockNumpyTorch([1, 2, 3]), MockTensorFlow([1, 2, 3])],
@@ -233,3 +268,73 @@ def test_non_existant_collection(collection_factory_get: CollectionFactoryGet) -

     # above should not throw - depending on the autoschema config this might create an error or
     # not, so we do not check for errors here
+
+
+@pytest.mark.asyncio
+async def test_batch_one_hundred_thousand_objects_async_collection(
+    batch_collection_async: BatchCollectionAsync,
+) -> None:
+    """Test adding one hundred thousand data objects."""
+    col = await batch_collection_async()
+    if col._connection._weaviate_version.is_lower_than(1, 36, 0):
+        pytest.skip("Server-side batching not supported in Weaviate < 1.36.0")
+    nr_objects = 100000
+    import time
+
+    start = time.time()
+    async with col.batch.stream() as batch:
+        for i in range(nr_objects):
+            await batch.add_object(
+                properties={"name": "test" + str(i)},
+            )
+    end = time.time()
+    print(f"Time taken to add {nr_objects} objects: {end - start} seconds")
+    assert len(col.batch.results.objs.errors) == 0
+    assert len(col.batch.results.objs.all_responses) == nr_objects
+    assert len(col.batch.results.objs.uuids) == nr_objects
+    assert await col.length() == nr_objects
+    assert col.batch.results.objs.has_errors is False
+    assert len(col.batch.failed_objects) == 0, [obj.message for obj in col.batch.failed_objects]
+
+
+@pytest.mark.asyncio
+async def test_ingest_one_hundred_thousand_data_objects_async(
+    batch_collection_async: BatchCollectionAsync,
+) -> None:
+    col = await batch_collection_async()
+    if col._connection._weaviate_version.is_lower_than(1, 36, 0):
+        pytest.skip("Server-side batching not supported in Weaviate < 1.36.0")
+    nr_objects = 100000
+    import time
+
+    start = time.time()
+    results = await col.data.ingest({"name": "test" + str(i)} for i in range(nr_objects))
+    end = time.time()
+    print(f"Time taken to add {nr_objects} objects: {end - start} seconds")
+    assert len(results.errors) == 0
+    assert len(results.all_responses) == nr_objects
+    assert len(results.uuids) == nr_objects
+    assert await col.length() == nr_objects
+    assert results.has_errors is False
+    assert len(results.errors) == 0, [obj.message for obj in results.errors.values()]
+
+
+def test_ingest_one_hundred_thousand_data_objects(
+    batch_collection: BatchCollection,
+) -> None:
+    col = batch_collection()
+    if col._connection._weaviate_version.is_lower_than(1, 36, 0):
+        pytest.skip("Server-side batching not supported in Weaviate < 1.36.0")
+    nr_objects = 100000
+    import time
+
+    start = time.time()
+    results = col.data.ingest({"name": "test" + str(i)} for i in range(nr_objects))
+    end = time.time()
+    print(f"Time taken to add {nr_objects} objects: {end - start} seconds")
+    assert len(results.errors) == 0
+    assert len(results.all_responses) == nr_objects
+    assert len(results.uuids) == nr_objects
+    assert len(col) == nr_objects
+    assert results.has_errors is False
+    assert len(results.errors) == 0, [obj.message for obj in results.errors.values()]
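The two `ingest` tests above pass a generator expression to `col.data.ingest` and then assert on `errors`, `all_responses`, `uuids`, and `has_errors`. The sketch below only illustrates that calling shape: a function that consumes an iterable lazily in chunks and returns a result object with the same attribute names. Everything in it is an assumption made for illustration (`IngestResult`, `chunked`, `ingest`, the `chunk_size` parameter, and the toy validation rule); it does not reproduce how the client streams objects to the server.

```python
import uuid
from dataclasses import dataclass, field
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List, Union


@dataclass
class IngestResult:
    """Hypothetical container mirroring the attributes the tests assert on."""

    all_responses: List[Union[uuid.UUID, str]] = field(default_factory=list)
    uuids: Dict[int, uuid.UUID] = field(default_factory=dict)
    errors: Dict[int, str] = field(default_factory=dict)

    @property
    def has_errors(self) -> bool:
        return len(self.errors) > 0


def chunked(items: Iterable[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of at most `size` items without materialising the input."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


def ingest(objs: Iterable[Dict[str, Any]], chunk_size: int = 1000) -> IngestResult:
    """Consume `objs` chunk by chunk and record one response per object."""
    result = IngestResult()
    index = 0
    for chunk in chunked(objs, chunk_size):
        for props in chunk:
            # Toy validation standing in for whatever a server might reject.
            if not isinstance(props.get("name"), str):
                result.errors[index] = "name must be a string"
                result.all_responses.append(result.errors[index])
            else:
                new_id = uuid.uuid4()
                result.uuids[index] = new_id
                result.all_responses.append(new_id)
            index += 1
    return result


results = ingest({"name": "test" + str(i)} for i in range(2500))
print(len(results.all_responses), results.has_errors)
```

Because the input is pulled through `islice`, at most one chunk of objects is held in memory at a time, which is the main reason to accept a generator instead of a list for imports of this size.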
6 changes: 3 additions & 3 deletions integration/test_rbac.py
@@ -742,10 +742,10 @@ def test_server_side_batching_with_auth() -> None:
     with connect_to_local(
         port=RBAC_PORTS[0], grpc_port=RBAC_PORTS[1], auth_credentials=RBAC_AUTH_CREDS
     ) as client:
-        if client._connection._weaviate_version.is_lower_than(1, 34, 0):
-            pytest.skip("Server-side batching not supported in Weaviate < 1.34.0")
+        if client._connection._weaviate_version.is_lower_than(1, 36, 0):
+            pytest.skip("Server-side batching not supported in Weaviate < 1.36.0")
         collection = client.collections.create(collection_name)
-        with client.batch.experimental() as batch:
+        with client.batch.stream() as batch:
             batch.add_object(collection_name)
             batch.add_object(collection_name)
             batch.add_object(collection_name)
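Every server-side batching test in this diff is gated on `is_lower_than(1, 36, 0)`, raised from `1, 34, 0`. A rough sketch of how such a gate can be evaluated follows. `ServerVersion` and `supports_stream_batching` are hypothetical names, not the client's internal version class, and the sketch assumes that a pre-release suffix such as `-dev-0bbf31a` is ignored when comparing; the real client may treat suffixes differently.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class ServerVersion:
    """Hypothetical version holder; compares major.minor.patch only."""

    major: int
    minor: int
    patch: int

    @classmethod
    def parse(cls, raw: str) -> "ServerVersion":
        # Assumption: anything after the numeric triple (e.g. "-dev-...") is dropped.
        match = re.match(r"v?(\d+)\.(\d+)\.(\d+)", raw)
        if match is None:
            raise ValueError(f"unrecognised version string: {raw!r}")
        major, minor, patch = (int(part) for part in match.groups())
        return cls(major, minor, patch)

    def is_lower_than(self, major: int, minor: int, patch: int) -> bool:
        # Tuple comparison is lexicographic, which matches version ordering here.
        return (self.major, self.minor, self.patch) < (major, minor, patch)


def supports_stream_batching(raw_version: str) -> bool:
    """Return True when the 1.36.0 gate used by the tests would not skip."""
    return not ServerVersion.parse(raw_version).is_lower_than(1, 36, 0)


for candidate in ("1.34.8", "1.35.2", "1.36.0-dev-0bbf31a"):
    print(candidate, supports_stream_batching(candidate))
```

Under these assumptions the `1.36.0-dev-0bbf31a` image from the CI matrix passes the gate, while the `1.34.8` and `1.35.2` images cause the stream tests to be skipped.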