diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ContentDateEnrichment.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ContentDateEnrichment.cs index b4d16d699..3ee19d5b8 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ContentDateEnrichment.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ContentDateEnrichment.cs @@ -11,8 +11,8 @@ namespace Elastic.Markdown.Exporters.Elasticsearch; /// /// Manages content-date tracking via an Elasticsearch enrich policy and ingest pipeline. -/// Instead of draining the lookup index into memory, the pipeline compares content hashes -/// at index time and preserves or updates content_last_updated accordingly. +/// Uses a stable alias over timestamped backing indices so that lookup data is atomically +/// swapped after a full reindex, avoiding any window where the lookup is empty. /// public class ContentDateEnrichment( DistributedTransport transport, @@ -21,11 +21,11 @@ public class ContentDateEnrichment( string buildType, string environment) { - private readonly string _lookupIndex = $"docs-{buildType}-content-dates-{environment}"; + private readonly string _lookupAlias = $"docs-{buildType}-content-dates-{environment}"; - public string PipelineName => $"{_lookupIndex}-pipeline"; + public string PipelineName => $"{_lookupAlias}-pipeline"; - private string PolicyName => $"{_lookupIndex}-policy"; + private string PolicyName => $"{_lookupAlias}-policy"; /// /// Creates the lookup index (if needed), enrich policy, executes it, and creates the ingest pipeline. @@ -40,35 +40,79 @@ public async Task InitializeAsync(Cancel ct) } /// - /// After indexing completes, syncs the lookup index from the lexical index and re-executes the enrich policy. - /// This replaces all lookup entries with current data (implicitly removing orphans) and ensures the next - /// run's pipeline sees up-to-date content hashes. + /// After indexing completes, reindexes into a fresh staging index and atomically swaps the + /// alias to point at it. The old backing index is deleted only after the swap succeeds. + /// This replaces all lookup entries with current data (implicitly removing orphans) and ensures + /// the next run's pipeline sees up-to-date content hashes. /// public async Task SyncLookupIndexAsync(string lexicalAlias, Cancel ct) { - logger.LogInformation("Syncing content date lookup index {Index} from {Source}", _lookupIndex, lexicalAlias); + logger.LogInformation("Syncing content date lookup from {Source} via staging index", lexicalAlias); + + var oldIndex = await ResolveBackingIndexAsync(ct); + var stagingIndex = GenerateStagingName(); + + await CreateLookupIndexAsync(stagingIndex, ct); + await ReindexToLookupAsync(lexicalAlias, stagingIndex, ct); + await RefreshIndexAsync(stagingIndex, ct); + await SwapAliasAsync(oldIndex, stagingIndex, ct); + + if (oldIndex != null) + await DeleteIndexAsync(oldIndex, ct); - await DeleteLookupContentsAsync(ct); - await ReindexToLookupAsync(lexicalAlias, ct); - await RefreshLookupIndexAsync(ct); await ExecutePolicyAsync(ct); logger.LogInformation("Content date lookup sync complete"); } - private async Task EnsureLookupIndexAsync(Cancel ct) + private string GenerateStagingName() => + $"{_lookupAlias}-{DateTime.UtcNow:yyyyMMddHHmmss}-{Guid.NewGuid().ToString("N")[..8]}"; + + private async Task ResolveBackingIndexAsync(Cancel ct) { - var head = await operations.WithRetryAsync( - () => transport.HeadAsync(_lookupIndex, ct), - $"HEAD {_lookupIndex}", + var response = await operations.WithRetryAsync( + () => transport.GetAsync($"/_alias/{_lookupAlias}", ct), + $"GET /_alias/{_lookupAlias}", ct ); - if (head.ApiCallDetails.HttpStatusCode == 200) + + if (response.ApiCallDetails.HttpStatusCode == 404) + return null; + + if (!response.ApiCallDetails.HasSuccessfulStatusCode) + throw new InvalidOperationException( + $"Failed to resolve alias {_lookupAlias}: {response.ApiCallDetails.DebugInformation}"); + + var json = JsonNode.Parse(response.Body); + var indices = json?.AsObject().Select(kv => kv.Key).ToList() ?? []; + + return indices.Count switch + { + 0 => null, + 1 => indices[0], + _ => throw new InvalidOperationException( + $"Alias {_lookupAlias} points to multiple indices ({string.Join(", ", indices)}); expected exactly one") + }; + } + + private async Task EnsureLookupIndexAsync(Cancel ct) + { + var existing = await ResolveBackingIndexAsync(ct); + if (existing != null) { - logger.LogInformation("Content date lookup index {Index} already exists", _lookupIndex); + logger.LogInformation("Content date lookup alias {Alias} already exists, backed by {Index}", _lookupAlias, existing); return; } + var indexName = GenerateStagingName(); + await CreateLookupIndexAsync(indexName, ct); + await SwapAliasAsync(null, indexName, ct); + + logger.LogInformation("Created content date lookup index {Index} with alias {Alias}", indexName, _lookupAlias); + } + + private async Task CreateLookupIndexAsync(string indexName, Cancel ct) + { var mapping = new JsonObject { ["settings"] = new JsonObject { ["number_of_shards"] = 1, ["number_of_replicas"] = 0 }, @@ -84,15 +128,55 @@ private async Task EnsureLookupIndexAsync(Cancel ct) }; var response = await operations.WithRetryAsync( - () => transport.PutAsync(_lookupIndex, PostData.String(mapping.ToJsonString()), ct), - $"PUT {_lookupIndex}", + () => transport.PutAsync(indexName, PostData.String(mapping.ToJsonString()), ct), + $"PUT {indexName}", ct ); if (!response.ApiCallDetails.HasSuccessfulStatusCode) throw new InvalidOperationException( - $"Failed to create content date lookup index {_lookupIndex}: {response.ApiCallDetails.DebugInformation}"); + $"Failed to create content date lookup index {indexName}: {response.ApiCallDetails.DebugInformation}"); - logger.LogInformation("Created content date lookup index {Index}", _lookupIndex); + logger.LogInformation("Created content date lookup index {Index}", indexName); + } + + private async Task SwapAliasAsync(string? oldIndex, string newIndex, Cancel ct) + { + var addAction = new JsonObject { ["add"] = new JsonObject { ["index"] = newIndex, ["alias"] = _lookupAlias } }; + + var actions = oldIndex != null + ? new JsonArray( + new JsonObject { ["remove"] = new JsonObject { ["index"] = oldIndex, ["alias"] = _lookupAlias } }, + addAction + ) + : new JsonArray(addAction); + + var body = new JsonObject { ["actions"] = actions }; + + var response = await operations.WithRetryAsync( + () => transport.PostAsync("/_aliases", PostData.String(body.ToJsonString()), ct), + "POST /_aliases", + ct + ); + + if (!response.ApiCallDetails.HasSuccessfulStatusCode) + throw new InvalidOperationException( + $"Failed to swap alias {_lookupAlias} to {newIndex}: {response.ApiCallDetails.DebugInformation}"); + + logger.LogInformation("Swapped alias {Alias} from {OldIndex} to {NewIndex}", _lookupAlias, oldIndex ?? "(none)", newIndex); + } + + private async Task DeleteIndexAsync(string indexName, Cancel ct) + { + var response = await operations.WithRetryAsync( + () => transport.DeleteAsync(indexName, new DefaultRequestParameters(), PostData.Empty, ct), + $"DELETE {indexName}", + ct + ); + + if (!response.ApiCallDetails.HasSuccessfulStatusCode) + logger.LogWarning("Failed to delete old lookup index {Index}: {Info}", indexName, response.ApiCallDetails.DebugInformation); + else + logger.LogInformation("Deleted old lookup index {Index}", indexName); } private async Task PutEnrichPolicyAsync(Cancel ct) @@ -101,7 +185,7 @@ private async Task PutEnrichPolicyAsync(Cancel ct) { ["match"] = new JsonObject { - ["indices"] = _lookupIndex, + ["indices"] = _lookupAlias, ["match_field"] = "url", ["enrich_fields"] = new JsonArray("content_hash", "content_last_updated") } @@ -190,30 +274,22 @@ private async Task PutPipelineAsync(Cancel ct) logger.LogInformation("Created ingest pipeline {Pipeline}", PipelineName); } - private async Task RefreshLookupIndexAsync(Cancel ct) + private async Task RefreshIndexAsync(string indexName, Cancel ct) { var response = await operations.WithRetryAsync( - () => transport.PostAsync($"/{_lookupIndex}/_refresh", PostData.Empty, ct), - $"POST {_lookupIndex}/_refresh", + () => transport.PostAsync($"/{indexName}/_refresh", PostData.Empty, ct), + $"POST {indexName}/_refresh", ct ); if (!response.ApiCallDetails.HasSuccessfulStatusCode) - logger.LogWarning("Failed to refresh lookup index {Index}: {Info}", _lookupIndex, response.ApiCallDetails.DebugInformation); - else - logger.LogInformation("Refreshed lookup index {Index}", _lookupIndex); - } + throw new InvalidOperationException( + $"Failed to refresh index {indexName}: {response.ApiCallDetails.DebugInformation}"); - private async Task DeleteLookupContentsAsync(Cancel ct) - { - var body = new JsonObject - { - ["query"] = new JsonObject { ["match_all"] = new JsonObject() } - }; - await operations.DeleteByQueryAsync(_lookupIndex, PostData.String(body.ToJsonString()), ct); + logger.LogInformation("Refreshed index {Index}", indexName); } - private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct) + private async Task ReindexToLookupAsync(string sourceAlias, string destIndex, Cancel ct) { var reindexBody = new JsonObject { @@ -224,7 +300,7 @@ private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct) }, ["dest"] = new JsonObject { - ["index"] = _lookupIndex + ["index"] = destIndex }, ["script"] = new JsonObject { @@ -233,6 +309,6 @@ private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct) } }; - await operations.ReindexAsync(sourceAlias, PostData.String(reindexBody.ToJsonString()), _lookupIndex, ct); + await operations.ReindexAsync(sourceAlias, PostData.String(reindexBody.ToJsonString()), destIndex, ct); } }