Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/Elastic.Documentation/Search/ContentHash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,27 @@

using System.Security.Cryptography;
using System.Text;
using System.Text.RegularExpressions;

namespace Elastic.Documentation.Search;

/// <summary>Creates a short hex hash from one or more string components.</summary>
public static partial class ContentHash
{
    /// <summary>
    /// Joins every component, computes SHA-256 over the UTF-8 bytes, and returns the
    /// first 16 hex characters (lowercased).
    /// Compatible with <c>HashedBulkUpdate.CreateHash</c>.
    /// </summary>
    public static string Create(params string[] components)
    {
        var utf8 = Encoding.UTF8.GetBytes(string.Concat(components));
        var digest = SHA256.HashData(utf8);
        return Convert.ToHexString(digest)[..16].ToLowerInvariant();
    }

    /// <summary>
    /// Trims the input, collapses every whitespace run to a single space, then hashes.
    /// Ensures that whitespace-only changes do not produce a different hash.
    /// </summary>
    public static string CreateNormalized(string content)
    {
        var collapsed = CollapseWhitespace().Replace(content.Trim(), " ");
        return Create(collapsed);
    }

    // Source-generated, compiled regex matching one-or-more whitespace characters.
    [GeneratedRegex(@"\s+")]
    private static partial Regex CollapseWhitespace();
}
10 changes: 10 additions & 0 deletions src/Elastic.Documentation/Search/DocumentationDocument.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ public record DocumentationDocument
[JsonPropertyName("last_updated")]
public DateTimeOffset LastUpdated { get; set; }

/// The date this document's content (stripped_body) was last updated.
/// Only advances when the whitespace-normalized content hash changes.
[JsonPropertyName("content_last_updated")]
public DateTimeOffset ContentLastUpdated { get; set; }

/// A hash of the whitespace-normalized stripped_body, used to detect content-only changes.
[Keyword]
[JsonPropertyName("content_hash")]
public string ContentBodyHash { get; set; } = string.Empty;

[JsonPropertyName("description")]
public string? Description { get; set; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Text.Json.Nodes;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
using Microsoft.Extensions.Logging;

namespace Elastic.Markdown.Exporters.Elasticsearch;

/// <summary>
/// Manages content-date tracking via an Elasticsearch enrich policy and ingest pipeline.
/// Instead of draining the lookup index into memory, the pipeline compares content hashes
/// at index time and preserves or updates <c>content_last_updated</c> accordingly.
/// </summary>
public class ContentDateEnrichment(
DistributedTransport transport,
ElasticsearchOperations operations,
ILogger logger,
string buildType,
string environment)
{
private readonly string _lookupIndex = $"docs-{buildType}-content-dates-{environment}";

public string PipelineName => $"{_lookupIndex}-pipeline";

private string PolicyName => $"{_lookupIndex}-policy";

/// <summary>
/// Creates the lookup index (if needed), enrich policy, executes it, and creates the ingest pipeline.
/// Must be called before indexing begins.
/// </summary>
public async Task InitializeAsync(Cancel ct)
{
    // Order matters: the policy definition references the lookup index, and the pipeline's
    // enrich processor references the policy, which must be executed before it can be used.
    await EnsureLookupIndexAsync(ct);
    await PutEnrichPolicyAsync(ct);
    await ExecutePolicyAsync(ct);
    await PutPipelineAsync(ct);
}

/// <summary>
/// After indexing completes, syncs the lookup index from the lexical index and re-executes the enrich policy.
/// This replaces all lookup entries with current data (implicitly removing orphans) and ensures the next
/// run's pipeline sees up-to-date content hashes.
/// NOTE(review): the lookup contents are deleted before the reindex writes replacements; if this is
/// cancelled or the reindex fails midway, the lookup stays empty/partial and unchanged pages will get a
/// fresh content_last_updated on the next run — consider rebuilding into a staging index and swapping.
/// </summary>
public async Task SyncLookupIndexAsync(string lexicalAlias, Cancel ct)
{
    logger.LogInformation("Syncing content date lookup index {Index} from {Source}", _lookupIndex, lexicalAlias);

    // Wipe the previous run's entries, copy current url/hash/date tuples from the lexical
    // index, make them searchable, then re-materialize the enrich policy from them.
    await DeleteLookupContentsAsync(ct);
    await ReindexToLookupAsync(lexicalAlias, ct);
    await RefreshLookupIndexAsync(ct);
    await ExecutePolicyAsync(ct);

    logger.LogInformation("Content date lookup sync complete");
}

/// <summary>
/// Creates the single-shard lookup index with keyword url/content_hash and date
/// content_last_updated mappings, unless it already exists.
/// </summary>
private async Task EnsureLookupIndexAsync(Cancel ct)
{
    var existsResponse = await operations.WithRetryAsync(
        () => transport.HeadAsync(_lookupIndex, ct),
        $"HEAD {_lookupIndex}",
        ct
    );
    if (existsResponse.ApiCallDetails.HttpStatusCode == 200)
    {
        logger.LogInformation("Content date lookup index {Index} already exists", _lookupIndex);
        return;
    }

    var indexDefinition = new JsonObject
    {
        ["settings"] = new JsonObject { ["number_of_shards"] = 1, ["number_of_replicas"] = 0 },
        ["mappings"] = new JsonObject
        {
            ["properties"] = new JsonObject
            {
                ["url"] = new JsonObject { ["type"] = "keyword" },
                ["content_hash"] = new JsonObject { ["type"] = "keyword" },
                ["content_last_updated"] = new JsonObject { ["type"] = "date" }
            }
        }
    };

    var createResponse = await operations.WithRetryAsync(
        () => transport.PutAsync<StringResponse>(_lookupIndex, PostData.String(indexDefinition.ToJsonString()), ct),
        $"PUT {_lookupIndex}",
        ct
    );
    if (!createResponse.ApiCallDetails.HasSuccessfulStatusCode)
        throw new InvalidOperationException(
            $"Failed to create content date lookup index {_lookupIndex}: {createResponse.ApiCallDetails.DebugInformation}");

    logger.LogInformation("Created content date lookup index {Index}", _lookupIndex);
}

/// <summary>
/// Registers a match-type enrich policy over the lookup index, keyed on url and carrying
/// content_hash and content_last_updated as enrich fields.
/// </summary>
private async Task PutEnrichPolicyAsync(Cancel ct)
{
    var policyDefinition = new JsonObject
    {
        ["match"] = new JsonObject
        {
            ["indices"] = _lookupIndex,
            ["match_field"] = "url",
            ["enrich_fields"] = new JsonArray("content_hash", "content_last_updated")
        }
    };

    var response = await operations.WithRetryAsync(
        () => transport.PutAsync<StringResponse>($"/_enrich/policy/{PolicyName}", PostData.String(policyDefinition.ToJsonString()), ct),
        $"PUT _enrich/policy/{PolicyName}",
        ct
    );

    if (response.ApiCallDetails.HasSuccessfulStatusCode)
    {
        logger.LogInformation("Created enrich policy {Policy}", PolicyName);
        return;
    }

    throw new InvalidOperationException(
        $"Failed to create enrich policy {PolicyName}: {response.ApiCallDetails.DebugInformation}");
}

/// <summary>Executes the enrich policy so the pipeline's enrich processor can use it.</summary>
private async Task ExecutePolicyAsync(Cancel ct)
{
    var response = await operations.WithRetryAsync(
        () => transport.PostAsync<StringResponse>($"/_enrich/policy/{PolicyName}/_execute", PostData.Empty, ct),
        $"POST _enrich/policy/{PolicyName}/_execute",
        ct
    );

    if (response.ApiCallDetails.HasSuccessfulStatusCode)
    {
        logger.LogInformation("Executed enrich policy {Policy}", PolicyName);
        return;
    }

    throw new InvalidOperationException(
        $"Failed to execute enrich policy {PolicyName}: {response.ApiCallDetails.DebugInformation}");
}

/// <summary>
/// Installs the ingest pipeline that decides <c>content_last_updated</c> at index time:
/// default it to the ingest timestamp, then restore the previous date when the content hash
/// is unchanged according to the enrich lookup.
/// </summary>
private async Task PutPipelineAsync(Cancel ct)
{
    var pipeline = new JsonObject
    {
        ["description"] = "Resolves content_last_updated via enrich policy lookup on content_hash",
        ["processors"] = new JsonArray(
            // 1) Default: stamp every incoming document with the ingest timestamp,
            //    i.e. assume the content changed until the lookup proves otherwise.
            new JsonObject
            {
                ["set"] = new JsonObject
                {
                    ["field"] = "content_last_updated",
                    ["value"] = "{{{_ingest.timestamp}}}"
                }
            },
            // 2) Fetch the previous run's entry for this url into a temporary field;
            //    ignore_missing lets documents without a url (or no match) flow through.
            new JsonObject
            {
                ["enrich"] = new JsonObject
                {
                    ["policy_name"] = PolicyName,
                    ["field"] = "url",
                    ["target_field"] = "_content_date_lookup",
                    ["max_matches"] = 1,
                    ["ignore_missing"] = true
                }
            },
            // 3) If the stored hash equals the incoming content_hash, keep the previous
            //    date; in all cases drop the temporary lookup field from the document.
            new JsonObject
            {
                ["script"] = new JsonObject
                {
                    ["lang"] = "painless",
                    ["source"] = """
                        def lookup = ctx._content_date_lookup;
                        if (lookup != null && lookup.content_hash != null && lookup.content_hash == ctx.content_hash) {
                        ctx.content_last_updated = lookup.content_last_updated;
                        }
                        ctx.remove('_content_date_lookup');
                        """
                }
            }
        )
    };

    var response = await operations.WithRetryAsync(
        () => transport.PutAsync<StringResponse>($"/_ingest/pipeline/{PipelineName}", PostData.String(pipeline.ToJsonString()), ct),
        $"PUT _ingest/pipeline/{PipelineName}",
        ct
    );

    if (!response.ApiCallDetails.HasSuccessfulStatusCode)
        throw new InvalidOperationException(
            $"Failed to create ingest pipeline {PipelineName}: {response.ApiCallDetails.DebugInformation}");

    logger.LogInformation("Created ingest pipeline {Pipeline}", PipelineName);
}

/// <summary>
/// Refreshes the lookup index so freshly reindexed entries are visible to the enrich policy
/// execution. A failed refresh is logged as a warning, not treated as fatal.
/// </summary>
private async Task RefreshLookupIndexAsync(Cancel ct)
{
    var response = await operations.WithRetryAsync(
        () => transport.PostAsync<StringResponse>($"/{_lookupIndex}/_refresh", PostData.Empty, ct),
        $"POST {_lookupIndex}/_refresh",
        ct
    );

    if (response.ApiCallDetails.HasSuccessfulStatusCode)
        logger.LogInformation("Refreshed lookup index {Index}", _lookupIndex);
    else
        logger.LogWarning("Failed to refresh lookup index {Index}: {Info}", _lookupIndex, response.ApiCallDetails.DebugInformation);
}

/// <summary>Removes every document from the lookup index via a match_all delete-by-query.</summary>
private async Task DeleteLookupContentsAsync(Cancel ct)
{
    var matchAllQuery = new JsonObject
    {
        ["query"] = new JsonObject { ["match_all"] = new JsonObject() }
    };
    await operations.DeleteByQueryAsync(_lookupIndex, PostData.String(matchAllQuery.ToJsonString()), ct);
}

/// <summary>
/// Copies url, content_hash and content_last_updated from the given source alias into the
/// lookup index.
/// </summary>
private async Task ReindexToLookupAsync(string sourceAlias, Cancel ct)
{
    var source = new JsonObject
    {
        ["index"] = sourceAlias,
        ["_source"] = new JsonArray("url", "content_hash", "content_last_updated")
    };
    var dest = new JsonObject
    {
        ["index"] = _lookupIndex
    };
    // Derive a deterministic _id from the url hash — presumably so repeated syncs upsert one
    // lookup entry per url rather than accumulating duplicates (verify against sync flow).
    var idScript = new JsonObject
    {
        ["lang"] = "painless",
        ["source"] = "ctx._id = ctx._source.url.sha256().substring(0, 16)"
    };

    var reindexBody = new JsonObject
    {
        ["source"] = source,
        ["dest"] = dest,
        ["script"] = idScript
    };

    await operations.ReindexAsync(sourceAlias, PostData.String(reindexBody.ToJsonString()), _lookupIndex, ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ private void AssignDocumentMetadata(DocumentationDocument doc)
doc.Hash = hash;
}

/// <summary>Computes and assigns the whitespace-normalized content hash used to detect content-only changes.</summary>
private static void AssignContentHash(DocumentationDocument doc)
{
    // A missing stripped body hashes as the empty string rather than throwing.
    var strippedBody = doc.StrippedBody ?? string.Empty;
    doc.ContentBodyHash = ContentHash.CreateNormalized(strippedBody);
}

private static void CommonEnrichments(DocumentationDocument doc, INavigationItem? navigationItem)
{
doc.SearchTitle = CreateSearchTitle();
Expand Down Expand Up @@ -154,6 +158,7 @@ public async ValueTask<bool> ExportAsync(MarkdownExportFileContext fileContext,
: null;

CommonEnrichments(doc, currentNavigation);
AssignContentHash(doc);
AssignDocumentMetadata(doc);

return await WriteDocumentAsync(doc, ctx);
Expand Down Expand Up @@ -191,6 +196,7 @@ public async ValueTask<bool> FinishExportAsync(IDirectoryInfo outputFolder, Canc
doc.Abstract = @abstract;
doc.Headings = headings;
CommonEnrichments(doc, null);
AssignContentHash(doc);
AssignDocumentMetadata(doc);

if (!await WriteDocumentAsync(doc, ctx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public partial class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposa
// AI Enrichment - post-indexing via AiEnrichmentOrchestrator
private readonly AiEnrichmentOrchestrator? _aiEnrichment;

// Content date tracking - enrich policy + pipeline for content_last_updated
private readonly ContentDateEnrichment _contentDateEnrichment;

// Per-channel running totals for progress logging
private int _primaryIndexed;
private int _secondaryIndexed;
Expand Down Expand Up @@ -72,6 +75,7 @@ IDocumentationConfigurationContext context

_transport = ElasticsearchTransportFactory.Create(es);
_operations = new ElasticsearchOperations(_transport, _logger, collector);
_contentDateEnrichment = new ContentDateEnrichment(_transport, _operations, _logger, endpoints.BuildType, endpoints.Environment);

string[] fixedSynonyms = ["esql", "data-stream", "data-streams", "machine-learning"];
var indexTimeSynonyms = _synonyms.Aggregate(new List<SynonymRule>(), (acc, synonym) =>
Expand All @@ -87,13 +91,21 @@ IDocumentationConfigurationContext context
_lexicalTypeContext = DocumentationMappingContext.DocumentationDocument
.CreateContext(type: _buildType, env: endpoints.Environment) with
{
ConfigureAnalysis = a => DocumentationAnalysisFactory.BuildAnalysis(a, synonymSetName, indexTimeSynonyms)
ConfigureAnalysis = a => DocumentationAnalysisFactory.BuildAnalysis(a, synonymSetName, indexTimeSynonyms),
IndexSettings = new Dictionary<string, string>
{
["index.default_pipeline"] = _contentDateEnrichment.PipelineName
}
};

_semanticTypeContext = DocumentationMappingContext.DocumentationDocumentSemantic
.CreateContext(type: _buildType, env: endpoints.Environment) with
{
ConfigureAnalysis = a => DocumentationAnalysisFactory.BuildAnalysis(a, synonymSetName, indexTimeSynonyms)
ConfigureAnalysis = a => DocumentationAnalysisFactory.BuildAnalysis(a, synonymSetName, indexTimeSynonyms),
IndexSettings = new Dictionary<string, string>
{
["index.final_pipeline"] = _contentDateEnrichment.PipelineName
}
};

if (es.EnableAiEnrichment)
Expand All @@ -105,10 +117,11 @@ IDocumentationConfigurationContext context
"AI enrichment enabled — pipeline: {Pipeline}, policy: {Policy}, lookup: {Lookup}",
infra.PipelineName, infra.EnrichPolicyName, infra.LookupIndexName);

_semanticTypeContext = _semanticTypeContext with
var semanticSettings = new Dictionary<string, string>(_semanticTypeContext.IndexSettings ?? new Dictionary<string, string>())
{
IndexSettings = new Dictionary<string, string> { ["index.default_pipeline"] = infra.PipelineName }
["index.default_pipeline"] = infra.PipelineName
};
_semanticTypeContext = _semanticTypeContext with { IndexSettings = semanticSettings };
}
else
{
Expand Down Expand Up @@ -137,6 +150,10 @@ IDocumentationConfigurationContext context
};
_ = _orchestrator.AddPreBootstrapTask(async (_, ct) =>
{
_logger.LogInformation("Initializing content date enrichment infrastructure...");
await _contentDateEnrichment.InitializeAsync(ct);
_logger.LogInformation("Content date enrichment infrastructure ready");

if (_aiEnrichment is not null)
{
_logger.LogInformation("Initializing AI enrichment infrastructure...");
Expand Down Expand Up @@ -193,14 +210,18 @@ private void ConfigureChannelOptions(string label, IngestChannelOptions<Document
/// <inheritdoc />
public async ValueTask StartAsync(Cancel ctx = default)
{
    var context = await _orchestrator.StartAsync(BootstrapMethod.Failure, ctx);

    _logger.LogInformation(
        "Orchestrator started — strategy: {Strategy}, primary: {PrimaryAlias}, secondary: {SecondaryAlias}",
        context.Strategy, context.PrimaryWriteAlias, context.SecondaryWriteAlias);
}

/// <inheritdoc />
public async ValueTask StopAsync(Cancel ctx = default) =>
public async ValueTask StopAsync(Cancel ctx = default)
{
    // Complete the orchestrator (result intentionally discarded) so indexing has finished
    // before the lookup is rebuilt from the lexical write target.
    _ = await _orchestrator.CompleteAsync(null, ctx);
    // NOTE(review): the null-forgiving operators assume IndexStrategy and WriteTarget are
    // always set by the time StopAsync runs — confirm; otherwise this line throws
    // NullReferenceException instead of a descriptive error.
    await _contentDateEnrichment.SyncLookupIndexAsync(_lexicalTypeContext.IndexStrategy!.WriteTarget!, ctx);
}

private async Task PostCompleteAsync(OrchestratorContext<DocumentationDocument> context, Cancel ctx)
{
Expand Down
Loading
Loading