Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 116 additions & 40 deletions src/Elastic.Markdown/Exporters/Elasticsearch/ContentDateEnrichment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ namespace Elastic.Markdown.Exporters.Elasticsearch;

/// <summary>
/// Manages content-date tracking via an Elasticsearch enrich policy and ingest pipeline.
/// Instead of draining the lookup index into memory, the pipeline compares content hashes
/// at index time and preserves or updates <c>content_last_updated</c> accordingly.
/// Uses a stable alias over timestamped backing indices so that lookup data is atomically
/// swapped after a full reindex, avoiding any window where the lookup is empty.
/// </summary>
public class ContentDateEnrichment(
DistributedTransport transport,
Expand All @@ -21,11 +21,11 @@ public class ContentDateEnrichment(
string buildType,
string environment)
{
private readonly string _lookupIndex = $"docs-{buildType}-content-dates-{environment}";
private readonly string _lookupAlias = $"docs-{buildType}-content-dates-{environment}";

public string PipelineName => $"{_lookupIndex}-pipeline";
public string PipelineName => $"{_lookupAlias}-pipeline";

private string PolicyName => $"{_lookupIndex}-policy";
private string PolicyName => $"{_lookupAlias}-policy";

/// <summary>
/// Creates the lookup index (if needed), enrich policy, executes it, and creates the ingest pipeline.
Expand All @@ -40,35 +40,79 @@ public async Task InitializeAsync(Cancel ct)
}

/// <summary>
/// After indexing completes, syncs the lookup index from the lexical index and re-executes the enrich policy.
/// This replaces all lookup entries with current data (implicitly removing orphans) and ensures the next
/// run's pipeline sees up-to-date content hashes.
/// After indexing completes, reindexes into a fresh staging index and atomically swaps the
/// alias to point at it. The old backing index is deleted only after the swap succeeds.
/// This replaces all lookup entries with current data (implicitly removing orphans) and ensures
/// the next run's pipeline sees up-to-date content hashes.
/// </summary>
public async Task SyncLookupIndexAsync(string lexicalAlias, Cancel ct)
{
logger.LogInformation("Syncing content date lookup index {Index} from {Source}", _lookupIndex, lexicalAlias);
logger.LogInformation("Syncing content date lookup from {Source} via staging index", lexicalAlias);

var oldIndex = await ResolveBackingIndexAsync(ct);
var stagingIndex = GenerateStagingName();

await CreateLookupIndexAsync(stagingIndex, ct);
await ReindexToLookupAsync(lexicalAlias, stagingIndex, ct);
await RefreshIndexAsync(stagingIndex, ct);
await SwapAliasAsync(oldIndex, stagingIndex, ct);

if (oldIndex != null)
await DeleteIndexAsync(oldIndex, ct);

await DeleteLookupContentsAsync(ct);
await ReindexToLookupAsync(lexicalAlias, ct);
await RefreshLookupIndexAsync(ct);
await ExecutePolicyAsync(ct);

logger.LogInformation("Content date lookup sync complete");
}

private async Task EnsureLookupIndexAsync(Cancel ct)
private string GenerateStagingName() =>
$"{_lookupAlias}-{DateTime.UtcNow:yyyyMMddHHmmss}-{Guid.NewGuid().ToString("N")[..8]}";

private async Task<string?> ResolveBackingIndexAsync(Cancel ct)
{
var head = await operations.WithRetryAsync(
() => transport.HeadAsync(_lookupIndex, ct),
$"HEAD {_lookupIndex}",
var response = await operations.WithRetryAsync(
() => transport.GetAsync<StringResponse>($"/_alias/{_lookupAlias}", ct),
$"GET /_alias/{_lookupAlias}",
ct
);
if (head.ApiCallDetails.HttpStatusCode == 200)

if (response.ApiCallDetails.HttpStatusCode == 404)
return null;

if (!response.ApiCallDetails.HasSuccessfulStatusCode)
throw new InvalidOperationException(
$"Failed to resolve alias {_lookupAlias}: {response.ApiCallDetails.DebugInformation}");

var json = JsonNode.Parse(response.Body);
var indices = json?.AsObject().Select(kv => kv.Key).ToList() ?? [];

return indices.Count switch
{
0 => null,
1 => indices[0],
_ => throw new InvalidOperationException(
$"Alias {_lookupAlias} points to multiple indices ({string.Join(", ", indices)}); expected exactly one")
};
}

private async Task EnsureLookupIndexAsync(Cancel ct)
{
var existing = await ResolveBackingIndexAsync(ct);
if (existing != null)
{
logger.LogInformation("Content date lookup index {Index} already exists", _lookupIndex);
logger.LogInformation("Content date lookup alias {Alias} already exists, backed by {Index}", _lookupAlias, existing);
return;
}

var indexName = GenerateStagingName();
await CreateLookupIndexAsync(indexName, ct);
await SwapAliasAsync(null, indexName, ct);

logger.LogInformation("Created content date lookup index {Index} with alias {Alias}", indexName, _lookupAlias);
}

private async Task CreateLookupIndexAsync(string indexName, Cancel ct)
{
var mapping = new JsonObject
{
["settings"] = new JsonObject { ["number_of_shards"] = 1, ["number_of_replicas"] = 0 },
Expand All @@ -84,15 +128,55 @@ private async Task EnsureLookupIndexAsync(Cancel ct)
};

var response = await operations.WithRetryAsync(
() => transport.PutAsync<StringResponse>(_lookupIndex, PostData.String(mapping.ToJsonString()), ct),
$"PUT {_lookupIndex}",
() => transport.PutAsync<StringResponse>(indexName, PostData.String(mapping.ToJsonString()), ct),
$"PUT {indexName}",
ct
);
if (!response.ApiCallDetails.HasSuccessfulStatusCode)
throw new InvalidOperationException(
$"Failed to create content date lookup index {_lookupIndex}: {response.ApiCallDetails.DebugInformation}");
$"Failed to create content date lookup index {indexName}: {response.ApiCallDetails.DebugInformation}");

logger.LogInformation("Created content date lookup index {Index}", _lookupIndex);
logger.LogInformation("Created content date lookup index {Index}", indexName);
}

private async Task SwapAliasAsync(string? oldIndex, string newIndex, Cancel ct)
{
var addAction = new JsonObject { ["add"] = new JsonObject { ["index"] = newIndex, ["alias"] = _lookupAlias } };

var actions = oldIndex != null
? new JsonArray(
new JsonObject { ["remove"] = new JsonObject { ["index"] = oldIndex, ["alias"] = _lookupAlias } },
addAction
)
: new JsonArray(addAction);

var body = new JsonObject { ["actions"] = actions };

var response = await operations.WithRetryAsync(
() => transport.PostAsync<StringResponse>("/_aliases", PostData.String(body.ToJsonString()), ct),
"POST /_aliases",
ct
);

if (!response.ApiCallDetails.HasSuccessfulStatusCode)
throw new InvalidOperationException(
$"Failed to swap alias {_lookupAlias} to {newIndex}: {response.ApiCallDetails.DebugInformation}");

logger.LogInformation("Swapped alias {Alias} from {OldIndex} to {NewIndex}", _lookupAlias, oldIndex ?? "(none)", newIndex);
}

private async Task DeleteIndexAsync(string indexName, Cancel ct)
{
var response = await operations.WithRetryAsync(
() => transport.DeleteAsync<StringResponse>(indexName, new DefaultRequestParameters(), PostData.Empty, ct),
$"DELETE {indexName}",
ct
);

if (!response.ApiCallDetails.HasSuccessfulStatusCode)
logger.LogWarning("Failed to delete old lookup index {Index}: {Info}", indexName, response.ApiCallDetails.DebugInformation);
else
logger.LogInformation("Deleted old lookup index {Index}", indexName);
}

private async Task PutEnrichPolicyAsync(Cancel ct)
Expand All @@ -101,7 +185,7 @@ private async Task PutEnrichPolicyAsync(Cancel ct)
{
["match"] = new JsonObject
{
["indices"] = _lookupIndex,
["indices"] = _lookupAlias,
["match_field"] = "url",
["enrich_fields"] = new JsonArray("content_hash", "content_last_updated")
}
Expand Down Expand Up @@ -190,30 +274,22 @@ private async Task PutPipelineAsync(Cancel ct)
logger.LogInformation("Created ingest pipeline {Pipeline}", PipelineName);
}

private async Task RefreshLookupIndexAsync(Cancel ct)
private async Task RefreshIndexAsync(string indexName, Cancel ct)
{
var response = await operations.WithRetryAsync(
() => transport.PostAsync<StringResponse>($"/{_lookupIndex}/_refresh", PostData.Empty, ct),
$"POST {_lookupIndex}/_refresh",
() => transport.PostAsync<StringResponse>($"/{indexName}/_refresh", PostData.Empty, ct),
$"POST {indexName}/_refresh",
ct
);

if (!response.ApiCallDetails.HasSuccessfulStatusCode)
logger.LogWarning("Failed to refresh lookup index {Index}: {Info}", _lookupIndex, response.ApiCallDetails.DebugInformation);
else
logger.LogInformation("Refreshed lookup index {Index}", _lookupIndex);
}
throw new InvalidOperationException(
$"Failed to refresh index {indexName}: {response.ApiCallDetails.DebugInformation}");

private async Task DeleteLookupContentsAsync(Cancel ct)
{
var body = new JsonObject
{
["query"] = new JsonObject { ["match_all"] = new JsonObject() }
};
await operations.DeleteByQueryAsync(_lookupIndex, PostData.String(body.ToJsonString()), ct);
logger.LogInformation("Refreshed index {Index}", indexName);
}

private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct)
private async Task ReindexToLookupAsync(string sourceAlias, string destIndex, Cancel ct)
{
var reindexBody = new JsonObject
{
Expand All @@ -224,7 +300,7 @@ private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct)
},
["dest"] = new JsonObject
{
["index"] = _lookupIndex
["index"] = destIndex
},
["script"] = new JsonObject
{
Expand All @@ -233,6 +309,6 @@ private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct)
}
};

await operations.ReindexAsync(sourceAlias, PostData.String(reindexBody.ToJsonString()), _lookupIndex, ct);
await operations.ReindexAsync(sourceAlias, PostData.String(reindexBody.ToJsonString()), destIndex, ct);
}
}
Loading