diff --git a/dist/pom.xml b/dist/pom.xml
index f7e064f66..40fcc7b1e 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -30,10 +30,10 @@ under the License.
pom
- 1.20.3
- 2.0.1
- 2.1.1
- 2.2.0
+ 1.20.4
+ 2.0.2
+ 2.1.2
+ 2.2.1
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java
index 87c63d69a..47f60fb2d 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java
@@ -30,7 +30,7 @@ public final class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
private static final String DEFAULT_VALUE = "";
public static final List requiredVersions =
- List.of("1.20.4", "2.0.1", "2.1.1", "2.2.0");
+ List.of("1.20.4", "2.0.1", "2.1.2", "2.2.0");
static final Versions INSTANCE = new Versions();
@@ -57,9 +57,9 @@ public static boolean supportAsync() {
int minor = Integer.parseInt(st.nextToken());
int micro = Integer.parseInt(st.nextToken());
- if ((major == 1 && (minor < 20 || (minor == 20 && micro <= 3)))
+ if ((major == 1 && (minor < 20 || (minor == 20 && micro <= 4)))
|| (major == 2 && minor == 0 && micro <= 1)
- || (major == 2 && minor == 1 && micro <= 1)
+ || (major == 2 && minor == 1 && micro <= 2)
|| (major == 2 && minor == 2 && micro <= 0)) {
LOG.debug(
"Flink {} doesn't support async execution for java resource, will fallback to sync execution.",
diff --git a/python/flink_agents/plan/actions/utils.py b/python/flink_agents/plan/actions/utils.py
index 30f24a031..5f1bbb2b1 100644
--- a/python/flink_agents/plan/actions/utils.py
+++ b/python/flink_agents/plan/actions/utils.py
@@ -29,7 +29,7 @@
UNSUPPORTED_RANGES: List[Tuple[str, str]] = [
("1.0.0", "1.20.4"),
("2.0.0", "2.0.1"),
- ("2.1.0", "2.1.1"),
+ ("2.1.0", "2.1.2"),
("2.2.0", "2.2.0"),
]
diff --git a/python/flink_agents/runtime/java/java_vector_store.py b/python/flink_agents/runtime/java/java_vector_store.py
index 923482df2..76e547547 100644
--- a/python/flink_agents/runtime/java/java_vector_store.py
+++ b/python/flink_agents/runtime/java/java_vector_store.py
@@ -25,14 +25,9 @@
)
from flink_agents.api.vector_stores.vector_store import (
Document,
- VectorStoreQuery,
- VectorStoreQueryResult,
_maybe_cast_to_list,
)
-from flink_agents.runtime.python_java_utils import (
- from_java_document,
- from_java_vector_store_query_result,
-)
+from flink_agents.runtime.python_java_utils import from_java_document
class JavaVectorStoreImpl(JavaCollectionManageableVectorStore):
@@ -68,28 +63,16 @@ def store_kwargs(self) -> Dict[str, Any]:
@override
def open(self) -> None:
+ # Resolve ``embedding_model`` (string → BaseEmbeddingModelSetup instance) on
+ # the Python side via ``BaseVectorStore.open`` so that ``add``/``query``/
+ # ``update`` can embed in Python before crossing to Java. Doing the embed
+ # on the Java side would force a Java→Python re-entry through pemja for
+ # PYTHON-backed embedding models, which corrupts the CPython per-thread
+ # state and crashes the next ``interpreter.get(...)`` inside
+ # ``JcpPyDecimal_Check → PyImport_ImportModule``.
+ super().open()
self._j_resource.open()
- @override
- def add(
- self,
- documents: Document | List[Document],
- collection_name: str | None = None,
- **kwargs: Any,
- ) -> List[str]:
- documents = _maybe_cast_to_list(documents)
- j_documents = [
- _to_j_document(self._j_resource_adapter, doc) for doc in documents
- ]
-
- return self._j_resource.add(j_documents, collection_name, kwargs)
-
- @override
- def query(self, query: VectorStoreQuery) -> VectorStoreQueryResult:
- j_query = self._j_resource_adapter.fromPythonVectorStoreQuery(query)
- j_query_result = self._j_resource.query(j_query)
- return from_java_vector_store_query_result(j_query_result)
-
@override
def get(
self,
@@ -114,19 +97,6 @@ def delete(
ids = _maybe_cast_to_list(ids)
return self._j_resource.delete(ids, collection_name, filters, kwargs)
- @override
- def update(
- self,
- documents: Document | List[Document],
- collection_name: str | None = None,
- **kwargs: Any,
- ) -> None:
- documents = _maybe_cast_to_list(documents)
- j_documents = [
- _to_j_document(self._j_resource_adapter, doc) for doc in documents
- ]
- self._j_resource.update(j_documents, collection_name, kwargs)
-
@override
def create_collection_if_not_exists(self, name: str, **kwargs: Any) -> None:
"""Forward to the Java side, passing all kwargs through as a map; the Java
diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py
index 7d494638c..58389c82d 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -34,7 +34,6 @@
Document,
VectorStoreQuery,
VectorStoreQueryMode,
- VectorStoreQueryResult,
)
from flink_agents.plan.resource_provider import JAVA_RESOURCE_MAPPING
from flink_agents.runtime.java.java_resource_wrapper import (
@@ -263,15 +262,6 @@ def from_java_vector_store_query(j_query: Any) -> VectorStoreQuery:
)
-def from_java_vector_store_query_result(j_query: Any) -> VectorStoreQueryResult:
- """Convert a Java vector store query result to a Python query result."""
- return VectorStoreQueryResult(
- documents=[
- from_java_document(j_document) for j_document in j_query.getDocuments()
- ],
- )
-
-
def from_java_message_role(j_role: Any) -> MessageRole:
"""Convert a Java message role to a Python message role."""
return MessageRole(j_role.getValue())
@@ -288,11 +278,6 @@ def get_java_tool_metadata_from_tool(tool: Tool) -> typing.Dict[str, str]:
}
-def get_mode_value(query: VectorStoreQuery) -> str:
- """Get the mode value of a VectorStoreQuery."""
- return query.mode.value
-
-
def get_long_term_memory(ctx: Any) -> Any:
"""Return ``ctx.long_term_memory`` (or ``None``). Used by the Java side to
avoid relying on Pemja's ``PyObject.getAttr`` semantics for attributes that
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java
index fbb0365e8..f17d7ce79 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java
@@ -23,10 +23,7 @@
import org.apache.flink.agents.api.resource.ResourceContext;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.vectorstores.Document;
-import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
-import org.apache.flink.agents.api.vectorstores.VectorStoreQueryMode;
import pemja.core.PythonInterpreter;
-import pemja.core.object.PyObject;
import java.util.List;
import java.util.Map;
@@ -108,20 +105,4 @@ public Document fromPythonDocument(
Float score) {
return new Document(content, metadata, id, embedding, score);
}
-
- @SuppressWarnings("unchecked")
- public VectorStoreQuery fromPythonVectorStoreQuery(PyObject pythonVectorStoreQuery) {
- // TODO: Delete this method after the pemja findClass method is fixed.
- String modeValue =
- (String)
- interpreter.invoke(
- "python_java_utils.get_mode_value", pythonVectorStoreQuery);
- return new VectorStoreQuery(
- VectorStoreQueryMode.fromValue(modeValue),
- (String) pythonVectorStoreQuery.getAttr("query_text"),
- pythonVectorStoreQuery.getAttr("limit", Integer.class),
- (String) pythonVectorStoreQuery.getAttr("collection_name"),
- (Map) pythonVectorStoreQuery.getAttr("filters", Map.class),
- (Map) pythonVectorStoreQuery.getAttr("extra_args", Map.class));
- }
}