Skip to content

Commit 8efe820

Browse files
reakaleek and claude authored
Search: Use staging index + alias swap for content date lookup sync (#3098)
* Search: Use staging index + alias swap for content date lookup sync

  Replace the delete-then-reindex flow in ContentDateEnrichment with a staging index + atomic alias swap to eliminate the window where the lookup index is empty. If a reindex fails, the previous lookup data remains intact behind the alias.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Search: Harden staging index lifecycle error handling

  Make RefreshIndexAsync throw on failure so SwapAliasAsync never runs against an unrefreshed staging index. Distinguish 404 from transient errors in ResolveBackingIndexAsync and fail deterministically when the alias points at multiple indices. Add a GUID suffix to staging index names for collision resistance under concurrent runs.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9a64e85 commit 8efe820

1 file changed

Lines changed: 116 additions & 40 deletions

File tree

src/Elastic.Markdown/Exporters/Elasticsearch/ContentDateEnrichment.cs

Lines changed: 116 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ namespace Elastic.Markdown.Exporters.Elasticsearch;
1111

1212
/// <summary>
1313
/// Manages content-date tracking via an Elasticsearch enrich policy and ingest pipeline.
14-
/// Instead of draining the lookup index into memory, the pipeline compares content hashes
15-
/// at index time and preserves or updates <c>content_last_updated</c> accordingly.
14+
/// Uses a stable alias over timestamped backing indices so that lookup data is atomically
15+
/// swapped after a full reindex, avoiding any window where the lookup is empty.
1616
/// </summary>
1717
public class ContentDateEnrichment(
1818
DistributedTransport transport,
@@ -21,11 +21,11 @@ public class ContentDateEnrichment(
2121
string buildType,
2222
string environment)
2323
{
24-
private readonly string _lookupIndex = $"docs-{buildType}-content-dates-{environment}";
24+
private readonly string _lookupAlias = $"docs-{buildType}-content-dates-{environment}";
2525

26-
public string PipelineName => $"{_lookupIndex}-pipeline";
26+
public string PipelineName => $"{_lookupAlias}-pipeline";
2727

28-
private string PolicyName => $"{_lookupIndex}-policy";
28+
private string PolicyName => $"{_lookupAlias}-policy";
2929

3030
/// <summary>
3131
/// Creates the lookup index (if needed), enrich policy, executes it, and creates the ingest pipeline.
@@ -40,35 +40,79 @@ public async Task InitializeAsync(Cancel ct)
4040
}
4141

4242
/// <summary>
/// After indexing completes, reindexes into a fresh staging index and atomically swaps the
/// alias to point at it. The old backing index is deleted only after the swap succeeds.
/// This replaces all lookup entries with current data (implicitly removing orphans) and ensures
/// the next run's pipeline sees up-to-date content hashes.
/// </summary>
/// <remarks>
/// If creating, populating or refreshing the staging index fails, the staging index is removed
/// on a best-effort basis before the original exception is rethrown, so failed runs do not
/// accumulate orphaned indices. A failure of the alias swap itself does not trigger that cleanup:
/// the outcome of the swap is then unknown and the staging index may already be the live one.
/// </remarks>
/// <param name="lexicalAlias">Source passed to the reindex that populates the staging index.</param>
/// <param name="ct">Cancellation token handed to each helper call.</param>
public async Task SyncLookupIndexAsync(string lexicalAlias, Cancel ct)
{
    logger.LogInformation("Syncing content date lookup from {Source} via staging index", lexicalAlias);

    var oldIndex = await ResolveBackingIndexAsync(ct);
    var stagingIndex = GenerateStagingName();

    try
    {
        await CreateLookupIndexAsync(stagingIndex, ct);
        await ReindexToLookupAsync(lexicalAlias, stagingIndex, ct);
        await RefreshIndexAsync(stagingIndex, ct);
    }
    catch
    {
        // The staging index has never been aliased at this point, so dropping it cannot
        // affect the data currently served through the alias.
        try
        {
            await DeleteIndexAsync(stagingIndex, ct);
        }
        catch (Exception cleanupError)
        {
            // Cleanup is best effort; never let it mask the original failure.
            logger.LogWarning(cleanupError, "Failed to clean up staging index {Index}", stagingIndex);
        }

        throw;
    }

    await SwapAliasAsync(oldIndex, stagingIndex, ct);

    // Only drop the previous backing index once the alias no longer points at it.
    if (oldIndex is not null)
        await DeleteIndexAsync(oldIndex, ct);

    await ExecutePolicyAsync(ct);

    logger.LogInformation("Content date lookup sync complete");
}
5867

59-
private async Task EnsureLookupIndexAsync(Cancel ct)
68+
/// <summary>
/// Builds a new backing-index name: the alias name, a UTC timestamp formatted as
/// <c>yyyyMMddHHmmss</c>, and the first eight hex characters of a fresh GUID.
/// </summary>
/// <returns>The generated index name.</returns>
private string GenerateStagingName()
{
    // Format with the invariant culture so the timestamp always uses the Gregorian calendar,
    // regardless of the culture the process happens to run under.
    var timestamp = DateTime.UtcNow.ToString("yyyyMMddHHmmss", System.Globalization.CultureInfo.InvariantCulture);
    var suffix = Guid.NewGuid().ToString("N")[..8];
    return $"{_lookupAlias}-{timestamp}-{suffix}";
}
70+
71+
/// <summary>
/// Looks up which index currently sits behind the lookup alias via <c>GET /_alias/{alias}</c>.
/// </summary>
/// <param name="ct">Cancellation token for the request.</param>
/// <returns>
/// The single index name found in the response, or <c>null</c> when the request returns 404
/// or the response lists no indices.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The request failed with a status other than 404, or the response lists more than one index.
/// </exception>
private async Task<string?> ResolveBackingIndexAsync(Cancel ct)
{
    var response = await operations.WithRetryAsync(
        () => transport.GetAsync<StringResponse>($"/_alias/{_lookupAlias}", ct),
        $"GET /_alias/{_lookupAlias}",
        ct
    );

    var details = response.ApiCallDetails;

    // A 404 is treated as "nothing behind the alias yet" rather than as an error.
    if (details.HttpStatusCode == 404)
        return null;

    if (!details.HasSuccessfulStatusCode)
        throw new InvalidOperationException(
            $"Failed to resolve alias {_lookupAlias}: {details.DebugInformation}");

    // The top-level property names of the response body are the index names.
    var parsed = JsonNode.Parse(response.Body);
    var backing = parsed?.AsObject().Select(kv => kv.Key).ToList() ?? [];

    if (backing.Count == 0)
        return null;

    if (backing.Count == 1)
        return backing[0];

    throw new InvalidOperationException(
        $"Alias {_lookupAlias} points to multiple indices ({string.Join(", ", backing)}); expected exactly one");
}
97+
98+
/// <summary>
/// Makes sure the lookup alias has a backing index: if one is resolved nothing is changed,
/// otherwise a new index is created and the alias is added to it.
/// </summary>
/// <remarks>
/// NOTE(review): if a concrete index with the same name as the alias is still present from
/// before the alias-based layout, the alias lookup presumably reports "not found" and adding
/// the alias would then be rejected - confirm how existing clusters are migrated.
/// </remarks>
/// <param name="ct">Cancellation token handed to each helper call.</param>
private async Task EnsureLookupIndexAsync(Cancel ct)
{
    if (await ResolveBackingIndexAsync(ct) is { } backingIndex)
    {
        logger.LogInformation("Content date lookup alias {Alias} already exists, backed by {Index}", _lookupAlias, backingIndex);
        return;
    }

    var freshIndex = GenerateStagingName();
    await CreateLookupIndexAsync(freshIndex, ct);

    // No previous index to detach, so only the "add" action is sent.
    await SwapAliasAsync(null, freshIndex, ct);

    logger.LogInformation("Created content date lookup index {Index} with alias {Alias}", freshIndex, _lookupAlias);
}
113+
114+
private async Task CreateLookupIndexAsync(string indexName, Cancel ct)
115+
{
72116
var mapping = new JsonObject
73117
{
74118
["mappings"] = new JsonObject
@@ -83,15 +127,55 @@ private async Task EnsureLookupIndexAsync(Cancel ct)
83127
};
84128

85129
var response = await operations.WithRetryAsync(
86-
() => transport.PutAsync<StringResponse>(_lookupIndex, PostData.String(mapping.ToJsonString()), ct),
87-
$"PUT {_lookupIndex}",
130+
() => transport.PutAsync<StringResponse>(indexName, PostData.String(mapping.ToJsonString()), ct),
131+
$"PUT {indexName}",
88132
ct
89133
);
90134
if (!response.ApiCallDetails.HasSuccessfulStatusCode)
91135
throw new InvalidOperationException(
92-
$"Failed to create content date lookup index {_lookupIndex}: {response.ApiCallDetails.DebugInformation}");
136+
$"Failed to create content date lookup index {indexName}: {response.ApiCallDetails.DebugInformation}");
93137

94-
logger.LogInformation("Created content date lookup index {Index}", _lookupIndex);
138+
logger.LogInformation("Created content date lookup index {Index}", indexName);
139+
}
140+
141+
/// <summary>
/// Points the lookup alias at <paramref name="newIndex"/> with a single <c>POST /_aliases</c>
/// request; when <paramref name="oldIndex"/> is given, the same request also removes the alias
/// from that index.
/// </summary>
/// <param name="oldIndex">Index to remove the alias from, or <c>null</c> to only add.</param>
/// <param name="newIndex">Index to add the alias to.</param>
/// <param name="ct">Cancellation token for the request.</param>
/// <exception cref="InvalidOperationException">The request did not return a successful status.</exception>
private async Task SwapAliasAsync(string? oldIndex, string newIndex, Cancel ct)
{
    var actions = new JsonArray();

    // The remove action (if any) goes first, followed by the add action.
    if (oldIndex != null)
    {
        var removeTarget = new JsonObject { ["index"] = oldIndex, ["alias"] = _lookupAlias };
        actions.Add(new JsonObject { ["remove"] = removeTarget });
    }

    var addTarget = new JsonObject { ["index"] = newIndex, ["alias"] = _lookupAlias };
    actions.Add(new JsonObject { ["add"] = addTarget });

    var payload = new JsonObject { ["actions"] = actions };

    var response = await operations.WithRetryAsync(
        () => transport.PostAsync<StringResponse>("/_aliases", PostData.String(payload.ToJsonString()), ct),
        "POST /_aliases",
        ct
    );

    var details = response.ApiCallDetails;
    if (!details.HasSuccessfulStatusCode)
        throw new InvalidOperationException(
            $"Failed to swap alias {_lookupAlias} to {newIndex}: {details.DebugInformation}");

    logger.LogInformation("Swapped alias {Alias} from {OldIndex} to {NewIndex}", _lookupAlias, oldIndex ?? "(none)", newIndex);
}
166+
167+
/// <summary>
/// Issues a <c>DELETE</c> for <paramref name="indexName"/>. An unsuccessful response is only
/// logged as a warning; this method does not throw on it.
/// </summary>
/// <param name="indexName">Name of the index to delete.</param>
/// <param name="ct">Cancellation token for the request.</param>
private async Task DeleteIndexAsync(string indexName, Cancel ct)
{
    var response = await operations.WithRetryAsync(
        () => transport.DeleteAsync<StringResponse>(indexName, new DefaultRequestParameters(), PostData.Empty, ct),
        $"DELETE {indexName}",
        ct
    );

    var details = response.ApiCallDetails;
    if (details.HasSuccessfulStatusCode)
    {
        logger.LogInformation("Deleted old lookup index {Index}", indexName);
        return;
    }

    // Deliberately non-fatal: report the failure and carry on.
    logger.LogWarning("Failed to delete old lookup index {Index}: {Info}", indexName, details.DebugInformation);
}
96180

97181
private async Task PutEnrichPolicyAsync(Cancel ct)
@@ -100,7 +184,7 @@ private async Task PutEnrichPolicyAsync(Cancel ct)
100184
{
101185
["match"] = new JsonObject
102186
{
103-
["indices"] = _lookupIndex,
187+
["indices"] = _lookupAlias,
104188
["match_field"] = "url",
105189
["enrich_fields"] = new JsonArray("content_hash", "content_last_updated")
106190
}
@@ -189,30 +273,22 @@ private async Task PutPipelineAsync(Cancel ct)
189273
logger.LogInformation("Created ingest pipeline {Pipeline}", PipelineName);
190274
}
191275

192-
private async Task RefreshLookupIndexAsync(Cancel ct)
276+
/// <summary>
/// Sends <c>POST /{indexName}/_refresh</c> and fails hard if the refresh does not succeed.
/// </summary>
/// <param name="indexName">Name of the index to refresh.</param>
/// <param name="ct">Cancellation token for the request.</param>
/// <exception cref="InvalidOperationException">The request did not return a successful status.</exception>
private async Task RefreshIndexAsync(string indexName, Cancel ct)
{
    var response = await operations.WithRetryAsync(
        () => transport.PostAsync<StringResponse>($"/{indexName}/_refresh", PostData.Empty, ct),
        $"POST {indexName}/_refresh",
        ct
    );

    var details = response.ApiCallDetails;
    if (details.HasSuccessfulStatusCode)
    {
        logger.LogInformation("Refreshed index {Index}", indexName);
        return;
    }

    throw new InvalidOperationException(
        $"Failed to refresh index {indexName}: {details.DebugInformation}");
}
214290

215-
private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct)
291+
private async Task ReindexToLookupAsync(string sourceAlias, string destIndex, Cancel ct)
216292
{
217293
var reindexBody = new JsonObject
218294
{
@@ -223,7 +299,7 @@ private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct)
223299
},
224300
["dest"] = new JsonObject
225301
{
226-
["index"] = _lookupIndex
302+
["index"] = destIndex
227303
},
228304
["script"] = new JsonObject
229305
{
@@ -232,6 +308,6 @@ private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct)
232308
}
233309
};
234310

235-
await operations.ReindexAsync(sourceAlias, PostData.String(reindexBody.ToJsonString()), _lookupIndex, ct);
311+
await operations.ReindexAsync(sourceAlias, PostData.String(reindexBody.ToJsonString()), destIndex, ct);
236312
}
237313
}

0 commit comments

Comments
 (0)