diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ContentDateEnrichment.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ContentDateEnrichment.cs
index b4d16d699..3ee19d5b8 100644
--- a/src/Elastic.Markdown/Exporters/Elasticsearch/ContentDateEnrichment.cs
+++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ContentDateEnrichment.cs
@@ -11,8 +11,8 @@ namespace Elastic.Markdown.Exporters.Elasticsearch;
///
/// Manages content-date tracking via an Elasticsearch enrich policy and ingest pipeline.
-/// Instead of draining the lookup index into memory, the pipeline compares content hashes
-/// at index time and preserves or updates content_last_updated accordingly.
+/// Uses a stable alias over timestamped backing indices so that lookup data is atomically
+/// swapped after a full reindex, avoiding any window where the lookup is empty.
///
public class ContentDateEnrichment(
DistributedTransport transport,
@@ -21,11 +21,11 @@ public class ContentDateEnrichment(
string buildType,
string environment)
{
- private readonly string _lookupIndex = $"docs-{buildType}-content-dates-{environment}";
+ private readonly string _lookupAlias = $"docs-{buildType}-content-dates-{environment}";
- public string PipelineName => $"{_lookupIndex}-pipeline";
+ public string PipelineName => $"{_lookupAlias}-pipeline";
- private string PolicyName => $"{_lookupIndex}-policy";
+ private string PolicyName => $"{_lookupAlias}-policy";
///
/// Creates the lookup index (if needed), enrich policy, executes it, and creates the ingest pipeline.
@@ -40,35 +40,79 @@ public async Task InitializeAsync(Cancel ct)
}
///
- /// After indexing completes, syncs the lookup index from the lexical index and re-executes the enrich policy.
- /// This replaces all lookup entries with current data (implicitly removing orphans) and ensures the next
- /// run's pipeline sees up-to-date content hashes.
+ /// After indexing completes, reindexes into a fresh staging index and atomically swaps the
+ /// alias to point at it. The old backing index is deleted only after the swap succeeds.
+ /// This replaces all lookup entries with current data (implicitly removing orphans) and ensures
+ /// the next run's pipeline sees up-to-date content hashes.
///
public async Task SyncLookupIndexAsync(string lexicalAlias, Cancel ct)
{
- logger.LogInformation("Syncing content date lookup index {Index} from {Source}", _lookupIndex, lexicalAlias);
+ logger.LogInformation("Syncing content date lookup from {Source} via staging index", lexicalAlias);
+
+ var oldIndex = await ResolveBackingIndexAsync(ct);
+ var stagingIndex = GenerateStagingName();
+
+ await CreateLookupIndexAsync(stagingIndex, ct);
+ await ReindexToLookupAsync(lexicalAlias, stagingIndex, ct);
+ await RefreshIndexAsync(stagingIndex, ct);
+ await SwapAliasAsync(oldIndex, stagingIndex, ct);
+
+ if (oldIndex != null)
+ await DeleteIndexAsync(oldIndex, ct);
- await DeleteLookupContentsAsync(ct);
- await ReindexToLookupAsync(lexicalAlias, ct);
- await RefreshLookupIndexAsync(ct);
await ExecutePolicyAsync(ct);
logger.LogInformation("Content date lookup sync complete");
}
- private async Task EnsureLookupIndexAsync(Cancel ct)
+ private string GenerateStagingName() =>
+ $"{_lookupAlias}-{DateTime.UtcNow:yyyyMMddHHmmss}-{Guid.NewGuid().ToString("N")[..8]}";
+
+ private async Task ResolveBackingIndexAsync(Cancel ct)
{
- var head = await operations.WithRetryAsync(
- () => transport.HeadAsync(_lookupIndex, ct),
- $"HEAD {_lookupIndex}",
+ var response = await operations.WithRetryAsync(
+ () => transport.GetAsync($"/_alias/{_lookupAlias}", ct),
+ $"GET /_alias/{_lookupAlias}",
ct
);
- if (head.ApiCallDetails.HttpStatusCode == 200)
+
+ if (response.ApiCallDetails.HttpStatusCode == 404)
+ return null;
+
+ if (!response.ApiCallDetails.HasSuccessfulStatusCode)
+ throw new InvalidOperationException(
+ $"Failed to resolve alias {_lookupAlias}: {response.ApiCallDetails.DebugInformation}");
+
+ var json = JsonNode.Parse(response.Body);
+ var indices = json?.AsObject().Select(kv => kv.Key).ToList() ?? [];
+
+ return indices.Count switch
+ {
+ 0 => null,
+ 1 => indices[0],
+ _ => throw new InvalidOperationException(
+ $"Alias {_lookupAlias} points to multiple indices ({string.Join(", ", indices)}); expected exactly one")
+ };
+ }
+
+ private async Task EnsureLookupIndexAsync(Cancel ct)
+ {
+ var existing = await ResolveBackingIndexAsync(ct);
+ if (existing != null)
{
- logger.LogInformation("Content date lookup index {Index} already exists", _lookupIndex);
+ logger.LogInformation("Content date lookup alias {Alias} already exists, backed by {Index}", _lookupAlias, existing);
return;
}
+ var indexName = GenerateStagingName();
+ await CreateLookupIndexAsync(indexName, ct);
+ await SwapAliasAsync(null, indexName, ct);
+
+ logger.LogInformation("Created content date lookup index {Index} with alias {Alias}", indexName, _lookupAlias);
+ }
+
+ private async Task CreateLookupIndexAsync(string indexName, Cancel ct)
+ {
var mapping = new JsonObject
{
["settings"] = new JsonObject { ["number_of_shards"] = 1, ["number_of_replicas"] = 0 },
@@ -84,15 +128,55 @@ private async Task EnsureLookupIndexAsync(Cancel ct)
};
var response = await operations.WithRetryAsync(
- () => transport.PutAsync(_lookupIndex, PostData.String(mapping.ToJsonString()), ct),
- $"PUT {_lookupIndex}",
+ () => transport.PutAsync(indexName, PostData.String(mapping.ToJsonString()), ct),
+ $"PUT {indexName}",
ct
);
if (!response.ApiCallDetails.HasSuccessfulStatusCode)
throw new InvalidOperationException(
- $"Failed to create content date lookup index {_lookupIndex}: {response.ApiCallDetails.DebugInformation}");
+ $"Failed to create content date lookup index {indexName}: {response.ApiCallDetails.DebugInformation}");
- logger.LogInformation("Created content date lookup index {Index}", _lookupIndex);
+ logger.LogInformation("Created content date lookup index {Index}", indexName);
+ }
+
+ private async Task SwapAliasAsync(string? oldIndex, string newIndex, Cancel ct)
+ {
+ var addAction = new JsonObject { ["add"] = new JsonObject { ["index"] = newIndex, ["alias"] = _lookupAlias } };
+
+ var actions = oldIndex != null
+ ? new JsonArray(
+ new JsonObject { ["remove"] = new JsonObject { ["index"] = oldIndex, ["alias"] = _lookupAlias } },
+ addAction
+ )
+ : new JsonArray(addAction);
+
+ var body = new JsonObject { ["actions"] = actions };
+
+ var response = await operations.WithRetryAsync(
+ () => transport.PostAsync("/_aliases", PostData.String(body.ToJsonString()), ct),
+ "POST /_aliases",
+ ct
+ );
+
+ if (!response.ApiCallDetails.HasSuccessfulStatusCode)
+ throw new InvalidOperationException(
+ $"Failed to swap alias {_lookupAlias} to {newIndex}: {response.ApiCallDetails.DebugInformation}");
+
+ logger.LogInformation("Swapped alias {Alias} from {OldIndex} to {NewIndex}", _lookupAlias, oldIndex ?? "(none)", newIndex);
+ }
+
+ private async Task DeleteIndexAsync(string indexName, Cancel ct)
+ {
+ var response = await operations.WithRetryAsync(
+ () => transport.DeleteAsync(indexName, new DefaultRequestParameters(), PostData.Empty, ct),
+ $"DELETE {indexName}",
+ ct
+ );
+
+ if (!response.ApiCallDetails.HasSuccessfulStatusCode)
+ logger.LogWarning("Failed to delete old lookup index {Index}: {Info}", indexName, response.ApiCallDetails.DebugInformation);
+ else
+ logger.LogInformation("Deleted old lookup index {Index}", indexName);
}
private async Task PutEnrichPolicyAsync(Cancel ct)
@@ -101,7 +185,7 @@ private async Task PutEnrichPolicyAsync(Cancel ct)
{
["match"] = new JsonObject
{
- ["indices"] = _lookupIndex,
+ ["indices"] = _lookupAlias,
["match_field"] = "url",
["enrich_fields"] = new JsonArray("content_hash", "content_last_updated")
}
@@ -190,30 +274,22 @@ private async Task PutPipelineAsync(Cancel ct)
logger.LogInformation("Created ingest pipeline {Pipeline}", PipelineName);
}
- private async Task RefreshLookupIndexAsync(Cancel ct)
+ private async Task RefreshIndexAsync(string indexName, Cancel ct)
{
var response = await operations.WithRetryAsync(
- () => transport.PostAsync($"/{_lookupIndex}/_refresh", PostData.Empty, ct),
- $"POST {_lookupIndex}/_refresh",
+ () => transport.PostAsync($"/{indexName}/_refresh", PostData.Empty, ct),
+ $"POST {indexName}/_refresh",
ct
);
if (!response.ApiCallDetails.HasSuccessfulStatusCode)
- logger.LogWarning("Failed to refresh lookup index {Index}: {Info}", _lookupIndex, response.ApiCallDetails.DebugInformation);
- else
- logger.LogInformation("Refreshed lookup index {Index}", _lookupIndex);
- }
+ throw new InvalidOperationException(
+ $"Failed to refresh index {indexName}: {response.ApiCallDetails.DebugInformation}");
- private async Task DeleteLookupContentsAsync(Cancel ct)
- {
- var body = new JsonObject
- {
- ["query"] = new JsonObject { ["match_all"] = new JsonObject() }
- };
- await operations.DeleteByQueryAsync(_lookupIndex, PostData.String(body.ToJsonString()), ct);
+ logger.LogInformation("Refreshed index {Index}", indexName);
}
- private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct)
+ private async Task ReindexToLookupAsync(string sourceAlias, string destIndex, Cancel ct)
{
var reindexBody = new JsonObject
{
@@ -224,7 +300,7 @@ private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct)
},
["dest"] = new JsonObject
{
- ["index"] = _lookupIndex
+ ["index"] = destIndex
},
["script"] = new JsonObject
{
@@ -233,6 +309,6 @@ private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct)
}
};
- await operations.ReindexAsync(sourceAlias, PostData.String(reindexBody.ToJsonString()), _lookupIndex, ct);
+ await operations.ReindexAsync(sourceAlias, PostData.String(reindexBody.ToJsonString()), destIndex, ct);
}
}