Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
Expand Down Expand Up @@ -245,11 +246,18 @@ private boolean indexExists(String idx) {
}

private void createKnnIndex(String idx) {
// Use the FAISS engine with HNSW: it supports both pre-filtered and post-filtered KNN
// queries (the default NMSLIB engine on AOSS does NOT support filters and rejects
// queries with "Engine [NMSLIB] does not support filters"), and is the recommended
// engine for both AOSS VECTORSEARCH collections and OpenSearch Service domains 2.x+.
String body =
String.format(
"{\"settings\":{\"index\":{\"knn\":true}},"
+ "\"mappings\":{\"properties\":{\"%s\":{\"type\":\"knn_vector\","
+ "\"dimension\":%d},\"%s\":{\"type\":\"text\"},"
+ "\"dimension\":%d,"
+ "\"method\":{\"engine\":\"faiss\",\"name\":\"hnsw\","
+ "\"space_type\":\"l2\"}},"
+ "\"%s\":{\"type\":\"text\"},"
+ "\"metadata\":{\"type\":\"object\"}}}}",
vectorField, dims, contentField);
try {
Expand All @@ -259,6 +267,16 @@ private void createKnnIndex(String idx) {
throw e;
}
}
if (serverless) {
// AOSS index creation is eventually consistent; the index returns 200 on PUT but
// queries against it can fail with "no such index" for ~5-30s afterward. Give the
// service a short window to propagate before any read/write hits the index.
try {
Thread.sleep(15_000L);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}

/** Sanitize collection name to valid OpenSearch index name (lowercase, no special chars). */
Expand Down Expand Up @@ -326,7 +344,7 @@ public void delete(
}
}
executeRequest("POST", "/" + idx + "/_delete_by_query", body.toString());
executeRequest("POST", "/" + idx + "/_refresh", null);
refreshIfSupported(idx);
}

@Override
Expand Down Expand Up @@ -377,7 +395,19 @@ public List<Document> queryEmbedding(
public void updateEmbedding(
List<Document> documents, @Nullable String collection, Map<String, Object> extraArgs)
throws IOException {
// OpenSearch's bulk index operation is upsert-by-id, so addEmbedding doubles as update.
// OpenSearch's bulk index operation is upsert-by-id, so addEmbedding doubles as update on
// OpenSearch Service domains. On Amazon OpenSearch Serverless this pattern cannot work:
// AOSS rejects client-supplied _id in create/index operations, so the addEmbedding path
// below has to fall back to AOSS-generated ids. With no client-controllable id there is
// no way to target an existing document, and "updating" would silently insert a new copy
// instead, which would be worse than failing loudly.
if (serverless) {
throw new UnsupportedOperationException(
"updateEmbedding is not supported on Amazon OpenSearch Serverless: AOSS does"
+ " not allow clients to specify document ids, so update-by-id cannot be"
+ " implemented. Use a provisioned OpenSearch Service domain if you need"
+ " in-place embedding updates.");
}
// BaseVectorStore.update() already enforces that every document carries an id, so
// addEmbedding will not generate new ones here.
addEmbedding(documents, collection, extraArgs);
Expand All @@ -391,16 +421,26 @@ public List<String> addEmbedding(
if (!indexExists(idx)) {
createKnnIndex(idx);
}
List<String> allIds = new ArrayList<>();
List<String> clientIds = new ArrayList<>();
// For serverless we accumulate ids returned by AOSS across batches; for provisioned
// domains we keep returning the client-supplied/generated ids that we sent in _bulk.
List<String> aossIds = serverless ? new ArrayList<>() : null;
StringBuilder bulk = new StringBuilder();
int bulkBytes = 0;

for (Document doc : documents) {
String id = doc.getId() != null ? doc.getId() : UUID.randomUUID().toString();
allIds.add(id);
clientIds.add(id);

ObjectNode action = MAPPER.createObjectNode();
action.putObject("index").put("_index", idx).put("_id", id);
ObjectNode indexAction = action.putObject("index").put("_index", idx);
// Amazon OpenSearch Serverless rejects custom _id in create/index operations
// ("Document ID is not supported in create/index operation request"). Auto-generated
// ids are mandatory on AOSS, so for serverless we omit _id here and harvest the
// AOSS-generated ids out of the _bulk response below.
if (!serverless) {
indexAction.put("_id", id);
}
String actionLine = action.toString() + "\n";

ObjectNode source = MAPPER.createObjectNode();
Expand All @@ -419,7 +459,11 @@ public List<String> addEmbedding(
int entryBytes = actionLine.length() + sourceLine.length();

if (bulkBytes > 0 && bulkBytes + entryBytes > maxBulkBytes) {
executeRequest("POST", "/_bulk", bulk.toString());
JsonNode resp = executeRequest("POST", "/_bulk", bulk.toString());
checkBulkResponse(resp);
if (aossIds != null) {
collectBulkIds(resp, aossIds);
}
bulk.setLength(0);
bulkBytes = 0;
}
Expand All @@ -429,10 +473,14 @@ public List<String> addEmbedding(
}

if (bulkBytes > 0) {
executeRequest("POST", "/_bulk", bulk.toString());
JsonNode resp = executeRequest("POST", "/_bulk", bulk.toString());
checkBulkResponse(resp);
if (aossIds != null) {
collectBulkIds(resp, aossIds);
}
}
executeRequest("POST", "/" + idx + "/_refresh", null);
return allIds;
refreshIfSupported(idx);
return aossIds != null ? aossIds : clientIds;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -502,6 +550,76 @@ private JsonNode executeRequest(String method, String path, @Nullable String bod
() -> doExecuteRequest(method, path, body), "OpenSearchRequest");
}

/** SHA-256 hex of the given bytes. Required by AOSS as x-amz-content-sha256. */
private static String sha256Hex(byte[] data) {
try {
byte[] digest = MessageDigest.getInstance("SHA-256").digest(data);
StringBuilder sb = new StringBuilder(digest.length * 2);
for (byte b : digest) {
sb.append(String.format("%02x", b));
}
return sb.toString();
} catch (Exception e) {
throw new RuntimeException("SHA-256 not available", e);
}
}

/**
* The OpenSearch _bulk API returns HTTP 200 even when individual items fail (e.g. AOSS
* rejecting custom _id). The response has {@code errors:true} when any item failed; surface
* that as an exception so callers don't get silent partial-success behaviour.
*/
private static void checkBulkResponse(JsonNode resp) {
if (resp != null && resp.has("errors") && resp.get("errors").asBoolean()) {
String firstError = "unknown";
JsonNode items = resp.path("items");
if (items.isArray()) {
for (JsonNode it : items) {
JsonNode err = it.path("index").path("error");
if (!err.isMissingNode()) {
firstError = err.toString();
break;
}
}
}
throw new RuntimeException("OpenSearch _bulk had errors. First: " + firstError);
}
}

/**
* Extract the {@code _id} from each {@code items[].index} entry of a successful {@code _bulk}
* response and append to {@code out}. Used on AOSS where ids are server-generated and the
* caller of {@code add()} needs them in order to later {@code get}/{@code delete} the
* documents. Caller must invoke {@link #checkBulkResponse(JsonNode)} first.
*/
private static void collectBulkIds(JsonNode resp, List<String> out) {
if (resp == null) {
return;
}
JsonNode items = resp.path("items");
if (!items.isArray()) {
return;
}
for (JsonNode it : items) {
JsonNode idNode = it.path("index").path("_id");
if (!idNode.isMissingNode() && !idNode.isNull()) {
out.add(idNode.asText());
}
}
}

/**
* Refreshes the index if the underlying service supports it. Amazon OpenSearch Serverless does
* NOT expose the {@code _refresh} API and returns 404 — for AOSS we rely on the service's
* eventual-consistency window (~1-30s) instead.
*/
private void refreshIfSupported(String idx) {
if (serverless) {
return;
}
executeRequest("POST", "/" + idx + "/_refresh", null);
}

private static boolean isRetryableStatus(Exception e) {
String msg = e.getMessage();
return msg != null
Expand All @@ -518,8 +636,17 @@ private JsonNode doExecuteRequest(String method, String path, @Nullable String b
.putHeader("Content-Type", "application/json");

if (body != null) {
reqBuilder.contentStreamProvider(
() -> new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
reqBuilder.contentStreamProvider(() -> new ByteArrayInputStream(bodyBytes));
// Amazon OpenSearch Serverless requires both Content-Length and
// x-amz-content-sha256 on signed write requests. The legacy Aws4Signer
// does not populate them when the body is supplied via
// contentStreamProvider, so we set them explicitly. OpenSearch Service
// domains accept the request with or without these headers.
reqBuilder.putHeader("Content-Length", String.valueOf(bodyBytes.length));
if (useIamAuth) {
reqBuilder.putHeader("x-amz-content-sha256", sha256Hex(bodyBytes));
}
}

SdkHttpFullRequest request;
Expand Down
Loading