diff --git a/dist/pom.xml b/dist/pom.xml index f7e064f66..40fcc7b1e 100644 --- a/dist/pom.xml +++ b/dist/pom.xml @@ -30,10 +30,10 @@ under the License. pom - 1.20.3 - 2.0.1 - 2.1.1 - 2.2.0 + 1.20.4 + 2.0.2 + 2.1.2 + 2.2.1 diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java index 87c63d69a..47f60fb2d 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java @@ -30,7 +30,7 @@ public final class Utils { private static final Logger LOG = LoggerFactory.getLogger(Utils.class); private static final String DEFAULT_VALUE = ""; public static final List requiredVersions = - List.of("1.20.4", "2.0.1", "2.1.1", "2.2.0"); + List.of("1.20.4", "2.0.1", "2.1.2", "2.2.0"); static final Versions INSTANCE = new Versions(); @@ -57,9 +57,9 @@ public static boolean supportAsync() { int minor = Integer.parseInt(st.nextToken()); int micro = Integer.parseInt(st.nextToken()); - if ((major == 1 && (minor < 20 || (minor == 20 && micro <= 3))) + if ((major == 1 && (minor < 20 || (minor == 20 && micro <= 4))) || (major == 2 && minor == 0 && micro <= 1) - || (major == 2 && minor == 1 && micro <= 1) + || (major == 2 && minor == 1 && micro <= 2) || (major == 2 && minor == 2 && micro <= 0)) { LOG.debug( "Flink {} doesn't support async execution for java resource, will fallback to sync execution.", diff --git a/python/flink_agents/plan/actions/utils.py b/python/flink_agents/plan/actions/utils.py index 30f24a031..5f1bbb2b1 100644 --- a/python/flink_agents/plan/actions/utils.py +++ b/python/flink_agents/plan/actions/utils.py @@ -29,7 +29,7 @@ UNSUPPORTED_RANGES: List[Tuple[str, str]] = [ ("1.0.0", "1.20.4"), ("2.0.0", "2.0.1"), - ("2.1.0", "2.1.1"), + ("2.1.0", "2.1.2"), ("2.2.0", "2.2.0"), ] diff --git a/python/flink_agents/runtime/java/java_vector_store.py b/python/flink_agents/runtime/java/java_vector_store.py index 923482df2..76e547547 100644 --- a/python/flink_agents/runtime/java/java_vector_store.py +++ b/python/flink_agents/runtime/java/java_vector_store.py @@ -25,14 +25,9 @@ ) from flink_agents.api.vector_stores.vector_store import ( Document, - VectorStoreQuery, - VectorStoreQueryResult, _maybe_cast_to_list, ) -from flink_agents.runtime.python_java_utils import ( - from_java_document, - from_java_vector_store_query_result, -) +from flink_agents.runtime.python_java_utils import from_java_document class JavaVectorStoreImpl(JavaCollectionManageableVectorStore): @@ -68,28 +63,16 @@ def store_kwargs(self) -> Dict[str, Any]: @override def open(self) -> None: + # Resolve ``embedding_model`` (string → BaseEmbeddingModelSetup instance) on + # the Python side via ``BaseVectorStore.open`` so that ``add``/``query``/ + # ``update`` can embed in Python before crossing to Java. Doing the embed + # on the Java side would force a Java→Python re-entry through pemja for + # PYTHON-backed embedding models, which corrupts the CPython per-thread + # state and crashes the next ``interpreter.get(...)`` inside + # ``JcpPyDecimal_Check → PyImport_ImportModule``. + super().open() self._j_resource.open() - @override - def add( - self, - documents: Document | List[Document], - collection_name: str | None = None, - **kwargs: Any, - ) -> List[str]: - documents = _maybe_cast_to_list(documents) - j_documents = [ - _to_j_document(self._j_resource_adapter, doc) for doc in documents - ] - - return self._j_resource.add(j_documents, collection_name, kwargs) - - @override - def query(self, query: VectorStoreQuery) -> VectorStoreQueryResult: - j_query = self._j_resource_adapter.fromPythonVectorStoreQuery(query) - j_query_result = self._j_resource.query(j_query) - return from_java_vector_store_query_result(j_query_result) - @override def get( self, @@ -114,19 +97,6 @@ def delete( ids = _maybe_cast_to_list(ids) return self._j_resource.delete(ids, collection_name, filters, kwargs) - @override - def update( - self, - documents: Document | List[Document], - collection_name: str | None = None, - **kwargs: Any, - ) -> None: - documents = _maybe_cast_to_list(documents) - j_documents = [ - _to_j_document(self._j_resource_adapter, doc) for doc in documents - ] - self._j_resource.update(j_documents, collection_name, kwargs) - @override def create_collection_if_not_exists(self, name: str, **kwargs: Any) -> None: """Forward to the Java side, passing all kwargs through as a map; the Java diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py index 7d494638c..58389c82d 100644 --- a/python/flink_agents/runtime/python_java_utils.py +++ b/python/flink_agents/runtime/python_java_utils.py @@ -34,7 +34,6 @@ Document, VectorStoreQuery, VectorStoreQueryMode, - VectorStoreQueryResult, ) from flink_agents.plan.resource_provider import JAVA_RESOURCE_MAPPING from flink_agents.runtime.java.java_resource_wrapper import ( @@ -263,15 +262,6 @@ def from_java_vector_store_query(j_query: Any) -> VectorStoreQuery: ) -def from_java_vector_store_query_result(j_query: Any) -> VectorStoreQueryResult: - """Convert a Java vector store query result to a Python query result.""" - return VectorStoreQueryResult( - documents=[ - from_java_document(j_document) for j_document in j_query.getDocuments() - ], - ) - - def from_java_message_role(j_role: Any) -> MessageRole: """Convert a Java message role to a Python message role.""" return MessageRole(j_role.getValue()) @@ -288,11 +278,6 @@ def get_java_tool_metadata_from_tool(tool: Tool) -> typing.Dict[str, str]: } -def get_mode_value(query: VectorStoreQuery) -> str: - """Get the mode value of a VectorStoreQuery.""" - return query.mode.value - - def get_long_term_memory(ctx: Any) -> Any: """Return ``ctx.long_term_memory`` (or ``None``). Used by the Java side to avoid relying on Pemja's ``PyObject.getAttr`` semantics for attributes that diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java index fbb0365e8..f17d7ce79 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java @@ -23,10 +23,7 @@ import org.apache.flink.agents.api.resource.ResourceContext; import org.apache.flink.agents.api.resource.ResourceType; import org.apache.flink.agents.api.vectorstores.Document; -import org.apache.flink.agents.api.vectorstores.VectorStoreQuery; -import org.apache.flink.agents.api.vectorstores.VectorStoreQueryMode; import pemja.core.PythonInterpreter; -import pemja.core.object.PyObject; import java.util.List; import java.util.Map; @@ -108,20 +105,4 @@ public Document fromPythonDocument( Float score) { return new Document(content, metadata, id, embedding, score); } - - @SuppressWarnings("unchecked") - public VectorStoreQuery fromPythonVectorStoreQuery(PyObject pythonVectorStoreQuery) { - // TODO: Delete this method after the pemja findClass method is fixed. - String modeValue = - (String) - interpreter.invoke( - "python_java_utils.get_mode_value", pythonVectorStoreQuery); - return new VectorStoreQuery( - VectorStoreQueryMode.fromValue(modeValue), - (String) pythonVectorStoreQuery.getAttr("query_text"), - pythonVectorStoreQuery.getAttr("limit", Integer.class), - (String) pythonVectorStoreQuery.getAttr("collection_name"), - (Map) pythonVectorStoreQuery.getAttr("filters", Map.class), - (Map) pythonVectorStoreQuery.getAttr("extra_args", Map.class)); - } }