Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ under the License.
<packaging>pom</packaging>

<properties>
<flink.1.20.version>1.20.3</flink.1.20.version>
<flink.2.0.version>2.0.1</flink.2.0.version>
<flink.2.1.version>2.1.1</flink.2.1.version>
<flink.2.2.version>2.2.0</flink.2.2.version>
<flink.1.20.version>1.20.4</flink.1.20.version>
<flink.2.0.version>2.0.2</flink.2.0.version>
<flink.2.1.version>2.1.2</flink.2.1.version>
<flink.2.2.version>2.2.1</flink.2.2.version>
</properties>

<modules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public final class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
private static final String DEFAULT_VALUE = "<unknown>";
public static final List<String> requiredVersions =
List.of("1.20.4", "2.0.1", "2.1.1", "2.2.0");
List.of("1.20.4", "2.0.1", "2.1.2", "2.2.0");

static final Versions INSTANCE = new Versions();

Expand All @@ -57,9 +57,9 @@ public static boolean supportAsync() {
int minor = Integer.parseInt(st.nextToken());
int micro = Integer.parseInt(st.nextToken());

if ((major == 1 && (minor < 20 || (minor == 20 && micro <= 3)))
if ((major == 1 && (minor < 20 || (minor == 20 && micro <= 4)))
|| (major == 2 && minor == 0 && micro <= 1)
|| (major == 2 && minor == 1 && micro <= 1)
|| (major == 2 && minor == 1 && micro <= 2)
|| (major == 2 && minor == 2 && micro <= 0)) {
LOG.debug(
"Flink {} doesn't support async execution for java resource, will fallback to sync execution.",
Expand Down
2 changes: 1 addition & 1 deletion python/flink_agents/plan/actions/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
UNSUPPORTED_RANGES: List[Tuple[str, str]] = [
("1.0.0", "1.20.4"),
("2.0.0", "2.0.1"),
("2.1.0", "2.1.1"),
("2.1.0", "2.1.2"),
("2.2.0", "2.2.0"),
]

Expand Down
48 changes: 9 additions & 39 deletions python/flink_agents/runtime/java/java_vector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,9 @@
)
from flink_agents.api.vector_stores.vector_store import (
Document,
VectorStoreQuery,
VectorStoreQueryResult,
_maybe_cast_to_list,
)
from flink_agents.runtime.python_java_utils import (
from_java_document,
from_java_vector_store_query_result,
)
from flink_agents.runtime.python_java_utils import from_java_document


class JavaVectorStoreImpl(JavaCollectionManageableVectorStore):
Expand Down Expand Up @@ -68,28 +63,16 @@ def store_kwargs(self) -> Dict[str, Any]:

@override
def open(self) -> None:
# Resolve ``embedding_model`` (string → BaseEmbeddingModelSetup instance) on
# the Python side via ``BaseVectorStore.open`` so that ``add``/``query``/
# ``update`` can embed in Python before crossing to Java. Doing the embed
# on the Java side would force a Java→Python re-entry through pemja for
# PYTHON-backed embedding models, which corrupts the CPython per-thread
# state and crashes the next ``interpreter.get(...)`` inside
# ``JcpPyDecimal_Check → PyImport_ImportModule``.
super().open()
self._j_resource.open()

@override
def add(
self,
documents: Document | List[Document],
collection_name: str | None = None,
**kwargs: Any,
) -> List[str]:
documents = _maybe_cast_to_list(documents)
j_documents = [
_to_j_document(self._j_resource_adapter, doc) for doc in documents
]

return self._j_resource.add(j_documents, collection_name, kwargs)

@override
def query(self, query: VectorStoreQuery) -> VectorStoreQueryResult:
j_query = self._j_resource_adapter.fromPythonVectorStoreQuery(query)
j_query_result = self._j_resource.query(j_query)
return from_java_vector_store_query_result(j_query_result)

@override
def get(
self,
Expand All @@ -114,19 +97,6 @@ def delete(
ids = _maybe_cast_to_list(ids)
return self._j_resource.delete(ids, collection_name, filters, kwargs)

@override
def update(
self,
documents: Document | List[Document],
collection_name: str | None = None,
**kwargs: Any,
) -> None:
documents = _maybe_cast_to_list(documents)
j_documents = [
_to_j_document(self._j_resource_adapter, doc) for doc in documents
]
self._j_resource.update(j_documents, collection_name, kwargs)

@override
def create_collection_if_not_exists(self, name: str, **kwargs: Any) -> None:
"""Forward to the Java side, passing all kwargs through as a map; the Java
Expand Down
15 changes: 0 additions & 15 deletions python/flink_agents/runtime/python_java_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
Document,
VectorStoreQuery,
VectorStoreQueryMode,
VectorStoreQueryResult,
)
from flink_agents.plan.resource_provider import JAVA_RESOURCE_MAPPING
from flink_agents.runtime.java.java_resource_wrapper import (
Expand Down Expand Up @@ -263,15 +262,6 @@ def from_java_vector_store_query(j_query: Any) -> VectorStoreQuery:
)


def from_java_vector_store_query_result(j_query: Any) -> VectorStoreQueryResult:
"""Convert a Java vector store query result to a Python query result."""
return VectorStoreQueryResult(
documents=[
from_java_document(j_document) for j_document in j_query.getDocuments()
],
)


def from_java_message_role(j_role: Any) -> MessageRole:
"""Convert a Java message role to a Python message role."""
return MessageRole(j_role.getValue())
Expand All @@ -288,11 +278,6 @@ def get_java_tool_metadata_from_tool(tool: Tool) -> typing.Dict[str, str]:
}


def get_mode_value(query: VectorStoreQuery) -> str:
"""Get the mode value of a VectorStoreQuery."""
return query.mode.value


def get_long_term_memory(ctx: Any) -> Any:
"""Return ``ctx.long_term_memory`` (or ``None``). Used by the Java side to
avoid relying on Pemja's ``PyObject.getAttr`` semantics for attributes that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import org.apache.flink.agents.api.resource.ResourceContext;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.vectorstores.Document;
import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
import org.apache.flink.agents.api.vectorstores.VectorStoreQueryMode;
import pemja.core.PythonInterpreter;
import pemja.core.object.PyObject;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -108,20 +105,4 @@ public Document fromPythonDocument(
Float score) {
return new Document(content, metadata, id, embedding, score);
}

@SuppressWarnings("unchecked")
public VectorStoreQuery fromPythonVectorStoreQuery(PyObject pythonVectorStoreQuery) {
// TODO: Delete this method after the pemja findClass method is fixed.
String modeValue =
(String)
interpreter.invoke(
"python_java_utils.get_mode_value", pythonVectorStoreQuery);
return new VectorStoreQuery(
VectorStoreQueryMode.fromValue(modeValue),
(String) pythonVectorStoreQuery.getAttr("query_text"),
pythonVectorStoreQuery.getAttr("limit", Integer.class),
(String) pythonVectorStoreQuery.getAttr("collection_name"),
(Map<String, Object>) pythonVectorStoreQuery.getAttr("filters", Map.class),
(Map<String, Object>) pythonVectorStoreQuery.getAttr("extra_args", Map.class));
}
}
Loading