Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Security.Cryptography;
using System.Text;
using System.Text.Json.Nodes;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
Expand All @@ -25,7 +27,8 @@ public class ContentDateEnrichment(

/// <summary>
/// Name of the ingest pipeline for this lookup alias: the alias followed by <c>-pipeline</c>.
/// </summary>
public string PipelineName
{
    get { return $"{_lookupAlias}-pipeline"; }
}

/// <summary>
/// Prefix shared by every enrich policy derived from this lookup alias: the alias followed by <c>-policy</c>.
/// </summary>
private string PolicyBaseName => $"{_lookupAlias}-policy";

/// <summary>
/// Enrich policy name: <see cref="PolicyBaseName"/> suffixed with the hash from
/// <see cref="ComputePolicyHash"/>, so a changed policy definition yields a different name.
/// The hash is recomputed on every access.
/// </summary>
private string PolicyName => $"{PolicyBaseName}-{ComputePolicyHash()}";

/// <summary>
/// Creates the lookup index (if needed), enrich policy, executes it, and creates the ingest pipeline.
Expand All @@ -37,6 +40,7 @@ public async Task InitializeAsync(Cancel ct)
await PutEnrichPolicyAsync(ct);
await ExecutePolicyAsync(ct);
await PutPipelineAsync(ct);
await CleanupOldPoliciesAsync(ct);
}

/// <summary>
Expand Down Expand Up @@ -180,27 +184,87 @@ private async Task DeleteIndexAsync(string indexName, Cancel ct)

/// <summary>
/// Creates the enrich policy named <see cref="PolicyName"/> with <c>PUT /_enrich/policy/{name}</c>,
/// sending the body produced by <see cref="BuildPolicyBody"/>.
/// </summary>
/// <remarks>
/// An error response whose <c>error.type</c> is <c>resource_already_exists_exception</c> is treated
/// as success, because the policy name embeds a hash of the definition.
/// </remarks>
/// <param name="ct">Cancellation token forwarded to the transport call and the retry wrapper.</param>
/// <exception cref="InvalidOperationException">
/// The request failed for any reason other than the policy already existing.
/// </exception>
private async Task PutEnrichPolicyAsync(Cancel ct)
{
    // PolicyName re-hashes the policy body on every access; resolve it once for this call.
    var policyName = PolicyName;

    var response = await operations.WithRetryAsync(
        () => transport.PutAsync<StringResponse>(
            $"/_enrich/policy/{policyName}",
            PostData.String(BuildPolicyBody().ToJsonString()),
            ct
        ),
        $"PUT _enrich/policy/{policyName}",
        ct
    );

    if (response.ApiCallDetails.HasSuccessfulStatusCode)
    {
        logger.LogInformation("Created enrich policy {Policy}", policyName);
        return;
    }

    // Inspect the error body defensively: an empty body, a non-JSON body (e.g. from a proxy) or an
    // unexpected error shape must surface as the InvalidOperationException below, not a parsing error.
    var alreadyExists = false;
    if (!string.IsNullOrEmpty(response.Body))
    {
        try
        {
            alreadyExists = JsonNode.Parse(response.Body) is JsonObject root
                && root["error"] is JsonObject error
                && error["type"] is JsonValue typeValue
                && typeValue.TryGetValue<string>(out var errorType)
                && errorType == "resource_already_exists_exception";
        }
        catch (System.Text.Json.JsonException)
        {
            // Not JSON: fall through and report the original request failure.
        }
    }

    // Same-hash policy already exists — the definition is identical, safe to reuse
    if (alreadyExists)
    {
        logger.LogInformation("Enrich policy {Policy} already exists, continuing", policyName);
        return;
    }

    throw new InvalidOperationException(
        $"Failed to create enrich policy {policyName}: {response.ApiCallDetails.DebugInformation}");
}

/// <summary>
/// Derives a short fingerprint of the enrich policy definition.
/// </summary>
/// <returns>
/// The first eight lowercase hexadecimal characters of the SHA-256 digest of the UTF-8 encoded
/// JSON text returned by <see cref="BuildPolicyBody"/>.
/// </returns>
private string ComputePolicyHash()
{
    var payload = Encoding.UTF8.GetBytes(BuildPolicyBody().ToJsonString());
    var digest = SHA256.HashData(payload);

    // Four bytes encode to exactly eight hex characters.
    return Convert.ToHexString(digest, 0, 4).ToLowerInvariant();
}

/// <summary>
/// Builds the JSON body of the enrich policy: a <c>match</c> policy over the lookup alias that
/// matches on <c>url</c> and enriches with <c>content_hash</c> and <c>content_last_updated</c>.
/// </summary>
/// <returns>A new <see cref="JsonObject"/> on every call.</returns>
private JsonObject BuildPolicyBody()
{
    // Property order is kept as-is: ComputePolicyHash hashes the serialized text.
    var matchSection = new JsonObject
    {
        ["indices"] = _lookupAlias,
        ["match_field"] = "url",
        ["enrich_fields"] = new JsonArray("content_hash", "content_last_updated")
    };

    return new JsonObject { ["match"] = matchSection };
}

/// <summary>
/// Deletes enrich policies left over from earlier definitions of this lookup alias.
/// </summary>
/// <remarks>
/// Lists all policies with <c>GET /_enrich/policy</c> and deletes every <c>match</c> policy whose name
/// is <see cref="PolicyBaseName"/> itself or starts with <c>PolicyBaseName + "-"</c>, except the
/// current <see cref="PolicyName"/>. Listing, parsing and delete failures are logged as warnings
/// and are not thrown from this method.
/// </remarks>
/// <param name="ct">Cancellation token forwarded to the transport calls and the retry wrapper.</param>
private async Task CleanupOldPoliciesAsync(Cancel ct)
{
    var response = await operations.WithRetryAsync(
        () => transport.GetAsync<StringResponse>("/_enrich/policy", ct),
        "GET /_enrich/policy",
        ct
    );

    if (!response.ApiCallDetails.HasSuccessfulStatusCode)
    {
        logger.LogWarning("Failed to list enrich policies for cleanup: {Info}", response.ApiCallDetails.DebugInformation);
        return;
    }

    // An empty, malformed or unexpectedly shaped body must not turn cleanup into a hard failure.
    var policies = new JsonArray();
    try
    {
        if (!string.IsNullOrEmpty(response.Body)
            && JsonNode.Parse(response.Body) is JsonObject root
            && root["policies"] is JsonArray listed)
        {
            policies = listed;
        }
    }
    catch (System.Text.Json.JsonException ex)
    {
        logger.LogWarning(ex, "Failed to parse enrich policy list for cleanup");
        return;
    }

    // Both properties are recomputed on access (PolicyName re-hashes the body); resolve them once.
    var basePolicy = PolicyBaseName;
    var currentPolicy = PolicyName;
    // Requiring the separator keeps unrelated policies that merely share the text prefix safe.
    var versionedPrefix = basePolicy + "-";

    foreach (var policy in policies)
    {
        if (policy is not JsonObject entry
            || entry["config"] is not JsonObject config
            || config["match"] is not JsonObject match
            || match["name"] is not JsonValue nameValue
            || !nameValue.TryGetValue<string>(out var name))
        {
            continue;
        }

        var belongsToAlias = name == basePolicy || name.StartsWith(versionedPrefix, StringComparison.Ordinal);
        if (name == currentPolicy || !belongsToAlias)
            continue;

        var deleteResponse = await operations.WithRetryAsync(
            () => transport.DeleteAsync<StringResponse>($"/_enrich/policy/{name}", new DefaultRequestParameters(), PostData.Empty, ct),
            $"DELETE _enrich/policy/{name}",
            ct
        );

        if (deleteResponse.ApiCallDetails.HasSuccessfulStatusCode)
            logger.LogInformation("Deleted old enrich policy {Policy}", name);
        else
            logger.LogWarning("Failed to delete old enrich policy {Policy}: {Info}", name, deleteResponse.ApiCallDetails.DebugInformation);
    }
}

private async Task ExecutePolicyAsync(Cancel ct)
Expand Down
Loading