diff --git a/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java b/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
index 0f7cbf1a5..f4b747955 100644
--- a/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
+++ b/integrations/vector-stores/opensearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/opensearch/OpenSearchVectorStore.java
@@ -46,6 +46,8 @@
 import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.HashMap;
@@ -245,11 +247,18 @@ private boolean indexExists(String idx) {
     }
 
     private void createKnnIndex(String idx) {
+        // Use the FAISS engine with HNSW: it supports both pre-filtered and post-filtered KNN
+        // queries (the default NMSLIB engine on AOSS does NOT support filters and rejects
+        // queries with "Engine [NMSLIB] does not support filters"), and is the recommended
+        // engine for both AOSS VECTORSEARCH collections and OpenSearch Service domains 2.x+.
         String body =
                 String.format(
                         "{\"settings\":{\"index\":{\"knn\":true}},"
                                 + "\"mappings\":{\"properties\":{\"%s\":{\"type\":\"knn_vector\","
-                                + "\"dimension\":%d},\"%s\":{\"type\":\"text\"},"
+                                + "\"dimension\":%d,"
+                                + "\"method\":{\"engine\":\"faiss\",\"name\":\"hnsw\","
+                                + "\"space_type\":\"l2\"}},"
+                                + "\"%s\":{\"type\":\"text\"},"
                                 + "\"metadata\":{\"type\":\"object\"}}}}",
                         vectorField, dims, contentField);
         try {
@@ -259,6 +268,16 @@ private void createKnnIndex(String idx) {
                 throw e;
             }
         }
+        if (serverless) {
+            // AOSS index creation is eventually consistent; the index returns 200 on PUT but
+            // queries against it can fail with "no such index" for ~5-30s afterward. Give the
+            // service a short window to propagate before any read/write hits the index.
+            try {
+                Thread.sleep(15_000L);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
     }
 
     /** Sanitize collection name to valid OpenSearch index name (lowercase, no special chars). */
@@ -326,7 +345,7 @@ public void delete(
             }
         }
         executeRequest("POST", "/" + idx + "/_delete_by_query", body.toString());
-        executeRequest("POST", "/" + idx + "/_refresh", null);
+        refreshIfSupported(idx);
     }
 
     @Override
@@ -377,7 +396,19 @@ public List<Document> queryEmbedding(
     public void updateEmbedding(
             List<Document> documents, @Nullable String collection, Map<String, Object> extraArgs)
             throws IOException {
-        // OpenSearch's bulk index operation is upsert-by-id, so addEmbedding doubles as update.
+        // OpenSearch's bulk index operation is upsert-by-id, so addEmbedding doubles as update on
+        // OpenSearch Service domains. On Amazon OpenSearch Serverless this pattern cannot work:
+        // AOSS rejects client-supplied _id in create/index operations, so the addEmbedding path
+        // below has to fall back to AOSS-generated ids. With no client-controllable id there is
+        // no way to target an existing document, and "updating" would silently insert a new copy
+        // instead, which would be worse than failing loudly.
+        if (serverless) {
+            throw new UnsupportedOperationException(
+                    "updateEmbedding is not supported on Amazon OpenSearch Serverless: AOSS does"
+                            + " not allow clients to specify document ids, so update-by-id cannot be"
+                            + " implemented. Use a provisioned OpenSearch Service domain if you need"
+                            + " in-place embedding updates.");
+        }
         // BaseVectorStore.update() already enforces that every document carries an id, so
         // addEmbedding will not generate new ones here.
         addEmbedding(documents, collection, extraArgs);
@@ -391,16 +422,26 @@ public List<String> addEmbedding(
         if (!indexExists(idx)) {
             createKnnIndex(idx);
         }
-        List<String> allIds = new ArrayList<>();
+        List<String> clientIds = new ArrayList<>();
+        // For serverless we accumulate ids returned by AOSS across batches; for provisioned
+        // domains we keep returning the client-supplied/generated ids that we sent in _bulk.
+        List<String> aossIds = serverless ? new ArrayList<>() : null;
         StringBuilder bulk = new StringBuilder();
         int bulkBytes = 0;
 
         for (Document doc : documents) {
             String id = doc.getId() != null ? doc.getId() : UUID.randomUUID().toString();
-            allIds.add(id);
+            clientIds.add(id);
 
             ObjectNode action = MAPPER.createObjectNode();
-            action.putObject("index").put("_index", idx).put("_id", id);
+            ObjectNode indexAction = action.putObject("index").put("_index", idx);
+            // Amazon OpenSearch Serverless rejects custom _id in create/index operations
+            // ("Document ID is not supported in create/index operation request"). Auto-generated
+            // ids are mandatory on AOSS, so for serverless we omit _id here and harvest the
+            // AOSS-generated ids out of the _bulk response below.
+            if (!serverless) {
+                indexAction.put("_id", id);
+            }
             String actionLine = action.toString() + "\n";
 
             ObjectNode source = MAPPER.createObjectNode();
@@ -419,7 +460,11 @@ public List<String> addEmbedding(
 
             int entryBytes = actionLine.length() + sourceLine.length();
             if (bulkBytes > 0 && bulkBytes + entryBytes > maxBulkBytes) {
-                executeRequest("POST", "/_bulk", bulk.toString());
+                JsonNode resp = executeRequest("POST", "/_bulk", bulk.toString());
+                checkBulkResponse(resp);
+                if (aossIds != null) {
+                    collectBulkIds(resp, aossIds);
+                }
                 bulk.setLength(0);
                 bulkBytes = 0;
             }
@@ -429,10 +474,25 @@ public List<String> addEmbedding(
         }
 
         if (bulkBytes > 0) {
-            executeRequest("POST", "/_bulk", bulk.toString());
+            JsonNode resp = executeRequest("POST", "/_bulk", bulk.toString());
+            checkBulkResponse(resp);
+            if (aossIds != null) {
+                collectBulkIds(resp, aossIds);
+            }
         }
-        executeRequest("POST", "/" + idx + "/_refresh", null);
-        return allIds;
+        refreshIfSupported(idx);
+        if (aossIds == null) {
+            return clientIds;
+        }
+        // Callers presumably pair the returned ids with the input documents by position, so a
+        // short list would silently misalign them; fail loudly instead.
+        if (aossIds.size() != documents.size()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "OpenSearch _bulk returned %d ids for %d documents",
+                            aossIds.size(), documents.size()));
+        }
+        return aossIds;
     }
 
     @SuppressWarnings("unchecked")
@@ -502,6 +562,84 @@ private JsonNode executeRequest(String method, String path, @Nullable String bod
                 () -> doExecuteRequest(method, path, body), "OpenSearchRequest");
     }
 
+    /**
+     * Returns the lowercase hex SHA-256 digest of {@code data}. Required by AOSS as the
+     * {@code x-amz-content-sha256} header value.
+     *
+     * @throws IllegalStateException if the runtime does not provide SHA-256
+     */
+    private static String sha256Hex(byte[] data) {
+        final MessageDigest sha256;
+        try {
+            sha256 = MessageDigest.getInstance("SHA-256");
+        } catch (NoSuchAlgorithmException e) {
+            throw new IllegalStateException("SHA-256 not available", e);
+        }
+        byte[] digest = sha256.digest(data);
+        StringBuilder sb = new StringBuilder(digest.length * 2);
+        for (byte b : digest) {
+            sb.append(Character.forDigit((b >> 4) & 0xF, 16));
+            sb.append(Character.forDigit(b & 0xF, 16));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * The OpenSearch _bulk API returns HTTP 200 even when individual items fail (e.g. AOSS
+     * rejecting custom _id). The response has {@code errors:true} when any item failed; surface
+     * that as an exception so callers don't get silent partial-success behaviour.
+     */
+    private static void checkBulkResponse(JsonNode resp) {
+        if (resp != null && resp.has("errors") && resp.get("errors").asBoolean()) {
+            String firstError = "unknown";
+            JsonNode items = resp.path("items");
+            if (items.isArray()) {
+                for (JsonNode it : items) {
+                    JsonNode err = it.path("index").path("error");
+                    if (!err.isMissingNode()) {
+                        firstError = err.toString();
+                        break;
+                    }
+                }
+            }
+            throw new RuntimeException("OpenSearch _bulk had errors. First: " + firstError);
+        }
+    }
+
+    /**
+     * Extract the {@code _id} from each {@code items[].index} entry of a successful {@code _bulk}
+     * response and append to {@code out}. Used on AOSS where ids are server-generated and the
+     * caller of {@code add()} needs them in order to later {@code get}/{@code delete} the
+     * documents. Caller must invoke {@link #checkBulkResponse(JsonNode)} first.
+     */
+    private static void collectBulkIds(JsonNode resp, List<String> out) {
+        if (resp == null) {
+            return;
+        }
+        JsonNode items = resp.path("items");
+        if (!items.isArray()) {
+            return;
+        }
+        for (JsonNode it : items) {
+            JsonNode idNode = it.path("index").path("_id");
+            if (!idNode.isMissingNode() && !idNode.isNull()) {
+                out.add(idNode.asText());
+            }
+        }
+    }
+
+    /**
+     * Refreshes the index if the underlying service supports it. Amazon OpenSearch Serverless does
+     * NOT expose the {@code _refresh} API and returns 404 — for AOSS we rely on the service's
+     * eventual-consistency window (~1-30s) instead.
+     */
+    private void refreshIfSupported(String idx) {
+        if (serverless) {
+            return;
+        }
+        executeRequest("POST", "/" + idx + "/_refresh", null);
+    }
+
     private static boolean isRetryableStatus(Exception e) {
         String msg = e.getMessage();
         return msg != null
@@ -518,8 +656,17 @@ private JsonNode doExecuteRequest(String method, String path, @Nullable String b
                         .putHeader("Content-Type", "application/json");
 
         if (body != null) {
-            reqBuilder.contentStreamProvider(
-                    () -> new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
+            byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
+            reqBuilder.contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes));
+            // Amazon OpenSearch Serverless requires both Content-Length and
+            // x-amz-content-sha256 on signed write requests. The legacy Aws4Signer
+            // does not populate them when the body is supplied via
+            // contentStreamProvider, so we set them explicitly. OpenSearch Service
+            // domains accept the request with or without these headers.
+            reqBuilder.putHeader("Content-Length", String.valueOf(bodyBytes.length));
+            if (useIamAuth) {
+                reqBuilder.putHeader("x-amz-content-sha256", sha256Hex(bodyBytes));
+            }
         }
 
         SdkHttpFullRequest request;