fix: async-safe embeddings and resilient drain_writes#5702
Open
MatthiasHowellYopp wants to merge 1 commit intocrewAIInc:mainfrom
Open
fix: async-safe embeddings and resilient drain_writes#5702MatthiasHowellYopp wants to merge 1 commit intocrewAIInc:mainfrom
MatthiasHowellYopp wants to merge 1 commit intocrewAIInc:mainfrom
Conversation
f81d201 to
48ce384
Compare
Add bytes→float validators on MemoryRecord and ItemState to handle Valkey returning embeddings as raw bytes. Make embed_texts() safe when called from an async context by using a thread pool. Improve drain_writes() with per-save timeouts and error logging instead of raising on failure. Part 3/4 of Valkey storage implementation.
48ce384 to
d425472
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description:
Part 3/4 of adding Valkey as a storage backend for CrewAI. This PR makes the embedding and memory persistence paths robust enough to work with async storage backends like Valkey.
What changed:
types.py — Added a field_validator on MemoryRecord.embedding that converts bytes to list[float] via numpy. Valkey stores vectors as raw bytes, so this ensures embeddings are always in the expected format regardless of storage backend. Also added a thread pool to embed_texts() so it doesn't block the event loop when called from an async context.
encoding_flow.py — Added a matching field_validator on ItemState.similar_records and result_record to handle the same bytes→float conversion during the consolidation flow.
unified_memory.py — drain_writes() now accepts a timeout_per_save parameter (default 60s) and logs warnings on timeout or failure instead of raising. This prevents a single slow or failed save from blocking crew completion. Added structured debug/warning/error logging throughout the drain cycle.
Testing:
test_embedding_safety.py (15 tests) — Covers bytes→float conversion, empty bytes, numpy arrays, int-to-float coercion, sync and async embed_texts behavior, empty/whitespace input handling.