From 857b9bc8b6f798e773a731f3bd4e42a76ddf4920 Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Fri, 5 Dec 2025 12:48:03 +0000 Subject: [PATCH] feat: add support for interceptors in Elasticsearch client --- _examples/interceptor/Makefile | 12 + _examples/interceptor/README.md | 68 +++++ .../interceptor/cmd/auth_provider/main.go | 120 ++++++++ .../interceptor/cmd/context_auth/main.go | 116 ++++++++ _examples/interceptor/cmd/custom_auth/main.go | 155 +++++++++++ .../cmd/custom_observability/main.go | 261 ++++++++++++++++++ _examples/interceptor/go.mod | 25 ++ _examples/interceptor/go.sum | 39 +++ _examples/interceptor/internal/fake/server.go | 145 ++++++++++ .../interceptor/internal/redact/basic.go | 45 +++ elasticsearch.go | 7 + elasticsearch_internal_test.go | 120 ++++++++ 12 files changed, 1113 insertions(+) create mode 100644 _examples/interceptor/Makefile create mode 100644 _examples/interceptor/README.md create mode 100644 _examples/interceptor/cmd/auth_provider/main.go create mode 100644 _examples/interceptor/cmd/context_auth/main.go create mode 100644 _examples/interceptor/cmd/custom_auth/main.go create mode 100644 _examples/interceptor/cmd/custom_observability/main.go create mode 100644 _examples/interceptor/go.mod create mode 100644 _examples/interceptor/go.sum create mode 100644 _examples/interceptor/internal/fake/server.go create mode 100644 _examples/interceptor/internal/redact/basic.go diff --git a/_examples/interceptor/Makefile b/_examples/interceptor/Makefile new file mode 100644 index 0000000000..838b62177f --- /dev/null +++ b/_examples/interceptor/Makefile @@ -0,0 +1,12 @@ +GO_TEST_CMD = $(if $(shell which richgo),richgo test,go test) +export ELASTICSEARCH_URL=http://elastic:elastic@localhost:9200 + +test: ## Run tests + go run ./cmd/auth_provider/main.go + go run ./cmd/context_auth/main.go + go run ./cmd/custom_auth/main.go + go run ./cmd/custom_observability/main.go + +setup: + +.PHONY: test setup diff --git a/_examples/interceptor/README.md b/_examples/interceptor/README.md new file mode 100644 index 0000000000..6657ccd41d --- /dev/null +++ b/_examples/interceptor/README.md @@ -0,0 +1,68 @@ +# Example: Interceptors + +This example demonstrates how to use Interceptors to modify HTTP requests before they are sent to Elasticsearch. + +Interceptors wrap the HTTP round-trip and can inspect or modify requests and responses. +They are configured via the `elasticsearch.Config.Interceptors` field: + +```go +es, _ := elasticsearch.NewClient(elasticsearch.Config{ + Interceptors: []elastictransport.InterceptorFunc{ + func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc { + return func(req *http.Request) (*http.Response, error) { + // Modify request before sending + return next(req) + } + }, + }, +}) +``` + +## Dynamic Auth Provider + +The [`cmd/auth_provider/main.go`](cmd/auth_provider/main.go) example demonstrates how to dynamically inject authentication credentials into requests. + +This pattern is useful for scenarios where credentials may change at runtime, such as token refresh or credential rotation. + +```bash +go run cmd/auth_provider/main.go +``` + +## Context-Based Auth Override + +The [`cmd/context_auth/main.go`](cmd/context_auth/main.go) example demonstrates how to override authentication credentials on a per-request basis using `context.Context`. + +This pattern is useful for multi-tenant applications or impersonation scenarios where different requests need different credentials. + +```bash +go run cmd/context_auth/main.go +``` + +## Custom Auth (Kerberos/SPNEGO) + +The [`cmd/custom_auth/main.go`](cmd/custom_auth/main.go) example demonstrates how to implement Kerberos/SPNEGO authentication with challenge-response handling. + +The interceptor handles 401 responses with `WWW-Authenticate: Negotiate` by obtaining a token and retrying the request. + +> **Note:** This example uses a mock implementation. In production, you would use a Kerberos library like [gokrb5](https://github.com/jcmturner/gokrb5) to obtain service tickets. + +```bash +go run cmd/custom_auth/main.go +``` + +## Custom Observability + +The [`cmd/custom_observability/main.go`](cmd/custom_observability/main.go) example demonstrates how to add custom observability to Elasticsearch requests using OpenTelemetry. + +It shows three interceptors for: + +* **Logging**: Request/response details using `slog` +* **Metrics**: Request counter and duration histogram +* **Tracing**: Distributed tracing with spans + +> **Note:** The client has built-in observability functionality. Prefer using the built-in options where possible. + +```bash +go run cmd/custom_observability/main.go +``` + diff --git a/_examples/interceptor/cmd/auth_provider/main.go b/_examples/interceptor/cmd/auth_provider/main.go new file mode 100644 index 0000000000..fd0a5acc74 --- /dev/null +++ b/_examples/interceptor/cmd/auth_provider/main.go @@ -0,0 +1,120 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This example demonstrates how to use Interceptors to dynamically +// inject authentication credentials into requests. +// +// Interceptors allow you to modify requests before they are sent, +// making them ideal for scenarios where credentials may change at +// runtime (e.g., token refresh, credential rotation). +package main + +import ( + "fmt" + "log/slog" + "net/http" + "sync" + + "github.com/elastic/elastic-transport-go/v8/elastictransport" + "github.com/elastic/go-elasticsearch/v9" + "github.com/elastic/go-elasticsearch/v9/_examples/interceptor/internal/fake" + "github.com/elastic/go-elasticsearch/v9/_examples/interceptor/internal/redact" +) + +func main() { + // Start a fake Elasticsearch server that logs incoming auth credentials + srv := fake.NewServer( + fake.WithLogFn(func(r *http.Request) { + username, password, _ := redact.BasicAuth(r) + slog.Info("server received request", + slog.String("method", r.Method), + slog.String("path", r.URL.Path), + slog.String("username", username), + slog.String("password", password), + ) + }), + fake.WithStatusCode(http.StatusOK), + fake.WithResponseBody([]byte(`{"cluster_name":"example"}`)), + fake.WithHeaders(func(h http.Header) { + h.Set("X-Elastic-Product", "Elasticsearch") + h.Set("Content-Type", "application/json") + }), + ) + defer srv.Close() + + // Create a credential provider that can be updated at runtime + authProvider := NewCredentialProvider("user1", "password1") + + // Create an Elasticsearch client with a custom auth interceptor + es, err := elasticsearch.NewClient(elasticsearch.Config{ + Addresses: []string{srv.URL()}, + Interceptors: []elastictransport.InterceptorFunc{ + DynamicAuthInterceptor(authProvider), + }, + }) + if err != nil { + panic(err) + } + + // First request uses initial credentials + fmt.Println(">>> Sending request with initial credentials (user1)") + _, _ = es.Info() + + // Update credentials (simulating credential rotation) + fmt.Println("\n>>> Rotating credentials to (user2)") + authProvider.Update("user2", "password2") + + // Second request automatically uses the new credentials + fmt.Println("\n>>> Sending request with rotated credentials (user2)") + _, _ = es.Info() +} + +// DynamicAuthInterceptor creates an interceptor that injects BasicAuth +// credentials from a CredentialProvider into each request. +func DynamicAuthInterceptor(provider *CredentialProvider) elastictransport.InterceptorFunc { + return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc { + return func(req *http.Request) (*http.Response, error) { + username, password := provider.Get() + req.SetBasicAuth(username, password) + return next(req) + } + } +} + +// CredentialProvider holds credentials that can be safely updated at runtime. +type CredentialProvider struct { + mu sync.RWMutex + username string + password string +} + +func NewCredentialProvider(username, password string) *CredentialProvider { + return &CredentialProvider{username: username, password: password} +} + +func (p *CredentialProvider) Update(username, password string) { + p.mu.Lock() + defer p.mu.Unlock() + p.username = username + p.password = password +} + +func (p *CredentialProvider) Get() (username, password string) { + p.mu.RLock() + defer p.mu.RUnlock() + return p.username, p.password +} diff --git a/_examples/interceptor/cmd/context_auth/main.go b/_examples/interceptor/cmd/context_auth/main.go new file mode 100644 index 0000000000..5532254574 --- /dev/null +++ b/_examples/interceptor/cmd/context_auth/main.go @@ -0,0 +1,116 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This example demonstrates how to use Interceptors to override +// authentication credentials on a per-request basis using context.Context. +// +// This pattern is useful when different requests need different credentials, +// such as multi-tenant applications or impersonation scenarios. +package main + +import ( + "context" + "fmt" + "log/slog" + "net/http" + + "github.com/elastic/elastic-transport-go/v8/elastictransport" + "github.com/elastic/go-elasticsearch/v9" + "github.com/elastic/go-elasticsearch/v9/_examples/interceptor/internal/fake" + "github.com/elastic/go-elasticsearch/v9/_examples/interceptor/internal/redact" +) + +func main() { + // Start a fake Elasticsearch server that logs incoming auth credentials + srv := fake.NewServer( + fake.WithLogFn(func(r *http.Request) { + username, password, _ := redact.BasicAuth(r) + slog.Info("server received request", + slog.String("method", r.Method), + slog.String("path", r.URL.Path), + slog.String("username", username), + slog.String("password", password), + ) + }), + fake.WithStatusCode(http.StatusOK), + fake.WithResponseBody([]byte(`{"cluster_name":"example"}`)), + fake.WithHeaders(func(h http.Header) { + h.Set("X-Elastic-Product", "Elasticsearch") + h.Set("Content-Type", "application/json") + }), + ) + defer srv.Close() + + // Create an Elasticsearch client with default credentials and context auth interceptor + es, err := elasticsearch.NewClient(elasticsearch.Config{ + Addresses: []string{srv.URL()}, + Username: "default_user", + Password: "default_password", + Interceptors: []elastictransport.InterceptorFunc{ + ContextAuthInterceptor(), + }, + }) + if err != nil { + panic(err) + } + + // Request without context override uses default credentials + fmt.Println(">>> Sending request with default credentials") + _, _ = es.Info() + + // Request with context override uses the specified credentials + fmt.Println("\n>>> Sending request with context override (tenant_a)") + ctx := WithBasicAuth(context.Background(), "tenant_a", "tenant_a_secret") + _, _ = es.Info(es.Info.WithContext(ctx)) + + // Another request with different context credentials + fmt.Println("\n>>> Sending request with context override (tenant_b)") + ctx = WithBasicAuth(context.Background(), "tenant_b", "tenant_b_secret") + _, _ = es.Info(es.Info.WithContext(ctx)) + + // Request without context override still uses default credentials + fmt.Println("\n>>> Sending request with default credentials again") + _, _ = es.Info() +} + +// basicAuthKey is the context key for storing basic auth credentials. +type basicAuthKey struct{} + +type basicAuthValue struct { + username string + password string +} + +// WithBasicAuth returns a context with basic auth credentials attached. +// Use this to override the default client credentials for a specific request. +func WithBasicAuth(ctx context.Context, username, password string) context.Context { + return context.WithValue(ctx, basicAuthKey{}, basicAuthValue{username, password}) +} + +// ContextAuthInterceptor creates an interceptor that overrides BasicAuth +// credentials if they are present in the request's context. +// If no credentials are in the context, the request proceeds unchanged. +func ContextAuthInterceptor() elastictransport.InterceptorFunc { + return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc { + return func(req *http.Request) (*http.Response, error) { + if auth, ok := req.Context().Value(basicAuthKey{}).(basicAuthValue); ok { + req.SetBasicAuth(auth.username, auth.password) + } + return next(req) + } + } +} diff --git a/_examples/interceptor/cmd/custom_auth/main.go b/_examples/interceptor/cmd/custom_auth/main.go new file mode 100644 index 0000000000..9ba8c85d90 --- /dev/null +++ b/_examples/interceptor/cmd/custom_auth/main.go @@ -0,0 +1,155 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This example demonstrates how to use Interceptors to implement +// Kerberos/SPNEGO authentication with challenge-response handling. +// +// Note: This example is a mock implementation and does not use true Kerberos/SPNEGO authentication. +// In production, you would use a Kerberos library like gokrb5 to obtain a service ticket. +// +// The interceptor handles 401 responses with WWW-Authenticate: Negotiate +// by obtaining a token and retrying the request with the Authorization header. +package main + +import ( + "fmt" + "log/slog" + "net/http" + "strings" + + "github.com/elastic/elastic-transport-go/v8/elastictransport" + "github.com/elastic/go-elasticsearch/v9" + "github.com/elastic/go-elasticsearch/v9/_examples/interceptor/internal/fake" +) + +func main() { + // Start a fake Elasticsearch server with SPNEGO auth middleware + srv := fake.NewServer( + fake.WithLogFn(func(r *http.Request) { + auth := r.Header.Get("Authorization") + slog.Info("server received request", + slog.String("method", r.Method), + slog.String("path", r.URL.Path), + slog.String("authorization", auth), + ) + }), + fake.WithResponseBody([]byte(`{"cluster_name":"example"}`)), + fake.WithHeaders(func(h http.Header) { + h.Set("X-Elastic-Product", "Elasticsearch") + h.Set("Content-Type", "application/json") + }), + fake.WithMiddleware(SPNEGOAuthMiddleware), + ) + defer srv.Close() + + // Create an Elasticsearch client with the Kerberos interceptor + es, err := elasticsearch.NewClient(elasticsearch.Config{ + Addresses: []string{srv.URL()}, + Interceptors: []elastictransport.InterceptorFunc{ + KerberosInterceptor(MockTokenProvider), + }, + }) + if err != nil { + panic(err) + } + + // Send a request - the interceptor handles the 401 challenge automatically + fmt.Println(">>> Sending request (interceptor will handle SPNEGO challenge)") + resp, err := es.Info() + if err != nil { + panic(err) + } + fmt.Printf(">>> Response status: %s\n", resp.Status()) +} + +// KerberosInterceptor creates an interceptor that handles Kerberos/SPNEGO authentication. +// When a 401 response with WWW-Authenticate: Negotiate is received, it obtains a token +// using the provided tokenProvider and retries the request. +func KerberosInterceptor(tokenProvider func() (string, error)) elastictransport.InterceptorFunc { + return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc { + return func(req *http.Request) (*http.Response, error) { + // Send the initial request + resp, err := next(req) + if err != nil { + return nil, err + } + + // Check if server requires SPNEGO authentication + if resp.StatusCode == http.StatusUnauthorized { + authHeader := resp.Header.Get("WWW-Authenticate") + if strings.HasPrefix(authHeader, "Negotiate") { + slog.Info("received SPNEGO challenge, obtaining token and retrying") + + // Close the original response body + resp.Body.Close() + + // Obtain Kerberos token + token, err := tokenProvider() + if err != nil { + return nil, fmt.Errorf("failed to obtain Kerberos token: %w", err) + } + + // Clone the request and add the Authorization header + retryReq := req.Clone(req.Context()) + retryReq.Header.Set("Authorization", "Negotiate "+token) + + // Retry with the token + return next(retryReq) + } + } + + return resp, nil + } + } +} + +// SPNEGOAuthMiddleware simulates a server that requires SPNEGO authentication. +// It returns 401 with WWW-Authenticate: Negotiate if no valid token is provided, +// or passes through to the next handler if authentication succeeds. +func SPNEGOAuthMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + auth := r.Header.Get("Authorization") + + // Check for valid Negotiate token + if strings.HasPrefix(auth, "Negotiate ") { + token := strings.TrimPrefix(auth, "Negotiate ") + if token != "" { + // Token provided - authentication successful + slog.Info("SPNEGO authentication successful") + next.ServeHTTP(w, r) + return + } + } + + // No valid token - send 401 challenge + slog.Info("no valid token, sending SPNEGO challenge") + w.Header().Set("WWW-Authenticate", "Negotiate") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte(`{"error":"authentication required"}`)) + }) +} + +// MockTokenProvider simulates obtaining a Kerberos token. +// In a real implementation, this would use a Kerberos library like gokrb5. +func MockTokenProvider() (string, error) { + // In production, this would: + // 1. Load Kerberos credentials (keytab or credential cache) + // 2. Obtain a service ticket for the Elasticsearch SPN + // 3. Return the base64-encoded SPNEGO token + return "MOCK_KERBEROS_TOKEN_BASE64", nil +} diff --git a/_examples/interceptor/cmd/custom_observability/main.go b/_examples/interceptor/cmd/custom_observability/main.go new file mode 100644 index 0000000000..6688a53f29 --- /dev/null +++ b/_examples/interceptor/cmd/custom_observability/main.go @@ -0,0 +1,261 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This example demonstrates how to use Interceptors to add custom +// observability to Elasticsearch requests using OpenTelemetry. +// +// Note: There is existing, built-in functionality to add observability to Elasticsearch requests. +// Prefer using the built-in functionality over this example where possible. +// +// It shows three interceptors for: +// - Logging: Request/response details using slog +// - Metrics: Request counter and duration histogram +// - Tracing: Distributed tracing with spans +// +// All telemetry is exported to stdout for demonstration purposes. +package main + +import ( + "context" + "fmt" + "log/slog" + "net/http" + "os" + "time" + + "github.com/elastic/elastic-transport-go/v8/elastictransport" + "github.com/elastic/go-elasticsearch/v9" + "github.com/elastic/go-elasticsearch/v9/_examples/interceptor/internal/fake" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" +) + +func main() { + // Initialize OpenTelemetry providers + shutdown := initOTel() + defer shutdown() + + // Start a fake Elasticsearch server + srv := fake.NewServer( + fake.WithStatusCode(http.StatusOK), + fake.WithResponseBody([]byte(`{"cluster_name":"example","version":{"number":"9.2.0"}}`)), + fake.WithHeaders(func(h http.Header) { + h.Set("X-Elastic-Product", "Elasticsearch") + h.Set("Content-Type", "application/json") + }), + ) + defer srv.Close() + + // Create metrics instruments + meter := otel.Meter("elasticsearch-client") + requestCounter, _ := meter.Int64Counter("elasticsearch.client.requests", + metric.WithDescription("Number of requests to Elasticsearch"), + metric.WithUnit("{request}"), + ) + requestDuration, _ := meter.Float64Histogram("elasticsearch.client.duration", + metric.WithDescription("Duration of Elasticsearch requests"), + metric.WithUnit("ms"), + ) + + // Create tracer + tracer := otel.Tracer("elasticsearch-client") + + // Create an Elasticsearch client with observability interceptors + es, err := elasticsearch.NewClient(elasticsearch.Config{ + Addresses: []string{srv.URL()}, + Interceptors: []elastictransport.InterceptorFunc{ + LoggingInterceptor(), + MetricsInterceptor(requestCounter, requestDuration), + TracingInterceptor(tracer), + }, + }) + if err != nil { + panic(err) + } + + // Send some requests to demonstrate the observability + fmt.Println(">>> Sending requests to demonstrate observability interceptors") + fmt.Println() + + for i := 1; i <= 3; i++ { + fmt.Printf("--- Request %d ---\n", i) + _, _ = es.Info() + fmt.Println() + time.Sleep(100 * time.Millisecond) // Small delay between requests + } + + // Give time for metrics to flush + fmt.Println(">>> Waiting for metrics to flush...") + time.Sleep(2 * time.Second) +} + +// initOTel initializes OpenTelemetry with stdout exporters for both tracing and metrics. +func initOTel() func() { + ctx := context.Background() + + // Create resource with service info + res, _ := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("elasticsearch-example"), + ), + ) + + // Initialize trace exporter (stdout with pretty print) + traceExporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint()) + if err != nil { + panic(err) + } + + // Initialize tracer provider + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithResource(res), + sdktrace.WithSyncer(traceExporter), // Use Syncer for immediate output + ) + otel.SetTracerProvider(tracerProvider) + + // Initialize metric exporter (stdout) + metricExporter, err := stdoutmetric.New(stdoutmetric.WithPrettyPrint()) + if err != nil { + panic(err) + } + + // Initialize meter provider with periodic reader + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithResource(res), + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, sdkmetric.WithInterval(time.Second))), + ) + otel.SetMeterProvider(meterProvider) + + // Return shutdown function + return func() { + _ = tracerProvider.Shutdown(ctx) + _ = meterProvider.Shutdown(ctx) + } +} + +// LoggingInterceptor creates an interceptor that logs request and response details. +func LoggingInterceptor() elastictransport.InterceptorFunc { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + + return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc { + return func(req *http.Request) (*http.Response, error) { + start := time.Now() + + logger.Info("elasticsearch request started", + slog.String("method", req.Method), + slog.String("url", req.URL.String()), + ) + + resp, err := next(req) + + duration := time.Since(start) + + if err != nil { + logger.Error("elasticsearch request failed", + slog.String("method", req.Method), + slog.String("url", req.URL.String()), + slog.Duration("duration", duration), + slog.String("error", err.Error()), + ) + return nil, err + } + + logger.Info("elasticsearch request completed", + slog.String("method", req.Method), + slog.String("url", req.URL.String()), + slog.Int("status_code", resp.StatusCode), + slog.Duration("duration", duration), + ) + + return resp, nil + } + } +} + +// MetricsInterceptor creates an interceptor that records request metrics. +func MetricsInterceptor(counter metric.Int64Counter, histogram metric.Float64Histogram) elastictransport.InterceptorFunc { + return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc { + return func(req *http.Request) (*http.Response, error) { + start := time.Now() + + resp, err := next(req) + + duration := float64(time.Since(start).Milliseconds()) + + // Record metrics with attributes + attrs := []attribute.KeyValue{ + attribute.String("http.method", req.Method), + attribute.String("url.path", req.URL.Path), + } + + if err != nil { + attrs = append(attrs, attribute.String("error.type", "request_error")) + } else { + attrs = append(attrs, attribute.Int("http.status_code", resp.StatusCode)) + } + + counter.Add(req.Context(), 1, metric.WithAttributes(attrs...)) + histogram.Record(req.Context(), duration, metric.WithAttributes(attrs...)) + + return resp, err + } + } +} + +// TracingInterceptor creates an interceptor that adds distributed tracing. +func TracingInterceptor(tracer trace.Tracer) elastictransport.InterceptorFunc { + return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc { + return func(req *http.Request) (*http.Response, error) { + ctx, span := tracer.Start(req.Context(), + fmt.Sprintf("elasticsearch %s", req.Method), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.HTTPRequestMethodKey.String(req.Method), + semconv.URLFull(req.URL.String()), + semconv.URLPath(req.URL.Path), + semconv.ServerAddress(req.URL.Host), + ), + ) + defer span.End() + + // Update request with traced context + req = req.WithContext(ctx) + + resp, err := next(req) + + if err != nil { + span.RecordError(err) + span.SetAttributes(attribute.String("error.message", err.Error())) + return nil, err + } + + span.SetAttributes(semconv.HTTPResponseStatusCode(resp.StatusCode)) + + return resp, nil + } + } +} diff --git a/_examples/interceptor/go.mod b/_examples/interceptor/go.mod new file mode 100644 index 0000000000..97affbb716 --- /dev/null +++ b/_examples/interceptor/go.mod @@ -0,0 +1,25 @@ +module github.com/elastic/go-elasticsearch/v9/_examples/interceptor + +go 1.23.0 + +replace github.com/elastic/go-elasticsearch/v9 => ../.. + +require ( + github.com/elastic/elastic-transport-go/v8 v8.8.0 + github.com/elastic/go-elasticsearch/v9 v9.0.0-00010101000000-000000000000 + go.opentelemetry.io/otel v1.38.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.38.0 + go.opentelemetry.io/otel/metric v1.38.0 + go.opentelemetry.io/otel/sdk v1.38.0 + go.opentelemetry.io/otel/sdk/metric v1.38.0 + go.opentelemetry.io/otel/trace v1.38.0 +) + +require ( + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + golang.org/x/sys v0.35.0 // indirect +) diff --git a/_examples/interceptor/go.sum b/_examples/interceptor/go.sum new file mode 100644 index 0000000000..400caf9ea7 --- /dev/null +++ b/_examples/interceptor/go.sum @@ -0,0 +1,39 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/elastic-transport-go/v8 v8.8.0 h1:7k1Ua+qluFr6p1jfJjGDl97ssJS/P7cHNInzfxgBQAo= +github.com/elastic/elastic-transport-go/v8 v8.8.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0 h1:wm/Q0GAAykXv83wzcKzGGqAnnfLFyFe7RslekZuv+VI= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.38.0/go.mod h1:ra3Pa40+oKjvYh+ZD3EdxFZZB0xdMfuileHAm4nNN7w= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.38.0 h1:kJxSDN4SgWWTjG/hPp3O7LCGLcHXFlvS2/FFOrwL+SE= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.38.0/go.mod h1:mgIOzS7iZeKJdeB8/NYHrJ48fdGc71Llo5bJ1J4DWUE= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/_examples/interceptor/internal/fake/server.go b/_examples/interceptor/internal/fake/server.go new file mode 100644 index 0000000000..6c7bb5f598 --- /dev/null +++ b/_examples/interceptor/internal/fake/server.go @@ -0,0 +1,145 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fake + +import ( + "fmt" + "net" + "net/http" + "net/http/httptest" +) + +// LogFn is a function called on each request for custom logging. +type LogFn func(*http.Request) + +// Config holds the configuration for the fake server. +type Config struct { + LogFn LogFn + ResponseBody []byte + StatusCode int + Port int // 0 = random (default) + SetHeaders func(http.Header) + Middleware func(http.Handler) http.Handler +} + +// Option is a functional option for configuring the server. +type Option func(*Config) + +// WithLogFn sets a function to be called on each request for custom logging. +func WithLogFn(fn LogFn) Option { + return func(c *Config) { + c.LogFn = fn + } +} + +// WithResponseBody sets the response body returned by the server. +func WithResponseBody(body []byte) Option { + return func(c *Config) { + c.ResponseBody = body + } +} + +// WithStatusCode sets the HTTP status code returned by the server. +func WithStatusCode(code int) Option { + return func(c *Config) { + c.StatusCode = code + } +} + +// WithPort sets a specific port for the server to listen on. +// Use 0 (default) for a random available port. +func WithPort(port int) Option { + return func(c *Config) { + c.Port = port + } +} + +// WithHeaders sets a function to customize response headers. +func WithHeaders(fn func(http.Header)) Option { + return func(c *Config) { + c.SetHeaders = fn + } +} + +// WithMiddleware wraps the default handler with custom middleware. +// This allows full control over request/response flow. +func WithMiddleware(mw func(http.Handler) http.Handler) Option { + return func(c *Config) { + c.Middleware = mw + } +} + +// Server wraps httptest.Server to emulate an Elasticsearch server. +type Server struct { + *httptest.Server + config *Config +} + +// NewServer creates a new fake server with the given options. +// The server is started automatically and accepts requests to any endpoint. +func NewServer(opts ...Option) *Server { + config := &Config{ + StatusCode: http.StatusOK, + LogFn: func(*http.Request) {}, // no-op by default + } + + for _, opt := range opts { + opt(config) + } + + var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + config.LogFn(r) + + if config.SetHeaders != nil { + config.SetHeaders(w.Header()) + } + + w.WriteHeader(config.StatusCode) + + if config.ResponseBody != nil { + _, _ = w.Write(config.ResponseBody) + } + }) + + if config.Middleware != nil { + handler = config.Middleware(handler) + } + + srv := &Server{config: config} + + if config.Port != 0 { + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", config.Port)) + if err != nil { + panic(fmt.Sprintf("failed to listen on port %d: %v", config.Port, err)) + } + srv.Server = &httptest.Server{ + Listener: listener, + Config: &http.Server{Handler: handler}, + } + srv.Server.Start() + } else { + srv.Server = httptest.NewServer(handler) + } + + return srv +} + +// URL returns the server's URL, suitable for use as an Elasticsearch endpoint. +func (s *Server) URL() string { + return s.Server.URL +} diff --git a/_examples/interceptor/internal/redact/basic.go b/_examples/interceptor/internal/redact/basic.go new file mode 100644 index 0000000000..5eb16910eb --- /dev/null +++ b/_examples/interceptor/internal/redact/basic.go @@ -0,0 +1,45 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package redact + +import ( + "net/http" + "strings" +) + +// BasicAuth extracts basic auth credentials from the request and returns +// the username and a masked password. The password is masked to show only +// the first and last characters if it's 5+ characters long, otherwise +// it's fully masked. Returns empty strings and false if no auth is present. +func BasicAuth(r *http.Request) (username, maskedPassword string, ok bool) { + username, password, ok := r.BasicAuth() + if !ok { + return "", "", false + } + return username, MaskPassword(password), true +} + +// MaskPassword masks a password string, showing only the first and last +// characters if the password is 5+ characters long. Passwords shorter +// than 5 characters are fully masked. +func MaskPassword(password string) string { + if len(password) < 5 { + return strings.Repeat("*", len(password)) + } + return string(password[0]) + strings.Repeat("*", len(password)-2) + string(password[len(password)-1]) +} diff --git a/elasticsearch.go b/elasticsearch.go index 7a6cf456e7..2e717608bc 100644 --- a/elasticsearch.go +++ b/elasticsearch.go @@ -123,6 +123,12 @@ type Config struct { ConnectionPoolFunc func([]*elastictransport.Connection, elastictransport.Selector) elastictransport.ConnectionPool Instrumentation elastictransport.Instrumentation // Enable instrumentation throughout the client. + + // Interceptors is an array of functions that can mutate the *http.Request / *http.Response on each call to the http.RoundTripper. + // These interceptors are applied left to right, meaning the leftmost interceptor will modify the *http.Request first in the chain + // and the *http.Response last. + // This array is used on instantiation of the transport only and cannot be mutated after transport creation. + Interceptors []elastictransport.InterceptorFunc } // NewOpenTelemetryInstrumentation provides the OpenTelemetry integration for both low-level and TypedAPI. @@ -345,6 +351,7 @@ func newTransport(cfg Config) (*elastictransport.Client, error) { ConnectionPoolFunc: cfg.ConnectionPoolFunc, Instrumentation: cfg.Instrumentation, + Interceptors: cfg.Interceptors, } tp, err := elastictransport.New(tpConfig) diff --git a/elasticsearch_internal_test.go b/elasticsearch_internal_test.go index ca9accb31a..0180b433c5 100644 --- a/elasticsearch_internal_test.go +++ b/elasticsearch_internal_test.go @@ -1480,6 +1480,126 @@ func TestClose(t *testing.T) { } } +func TestIntercepts(t *testing.T) { + successTp := func(request *http.Request) (*http.Response, error) { + h := request.Header.Clone() + h.Add("X-Elastic-Product", "Elasticsearch") + h.Add("X-Found-Handling-Cluster", "foo-bar-cluster-id") + h.Add("X-Found-Handling-Instance", "0123456789") + return &http.Response{ + StatusCode: http.StatusOK, + Header: h, + Body: io.NopCloser(strings.NewReader(`{}`)), + }, nil + } + + const ( + testInterceptReqHeader = "X-TEST-INTERCEPT-REQ" + testInterceptRespHeader = "X-TEST-INTERCEPT-RESP" + ) + + dummyInterceptorFactory := func(num int) elastictransport.InterceptorFunc { + return func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc { + return func(r *http.Request) (*http.Response, error) { + r.Header.Add(testInterceptReqHeader, strconv.Itoa(num)) + resp, err := next(r) + resp.Header.Add(testInterceptRespHeader, strconv.Itoa(num)) + return resp, err + } + } + } + + validateReqRespH := func(expectReq, expectResp string) func(t *testing.T, resp *esapi.Response) { + return func(t *testing.T, resp *esapi.Response) { + if strings.Join(resp.Header.Values(testInterceptReqHeader), ",") != expectReq { + t.Error("expected header value", resp.Header.Values(testInterceptReqHeader)) + } + if strings.Join(resp.Header.Values(testInterceptRespHeader), ",") != expectResp { + t.Error("expected header value", resp.Header.Values(testInterceptRespHeader)) + } + } + } + + type args struct { + interceptors []elastictransport.InterceptorFunc + } + tests := []struct { + name string + args args + wantErr bool + validateResponse func(t *testing.T, resp *esapi.Response) + }{ + { + name: "interceptor array nil", + args: args{interceptors: nil}, + wantErr: false, + }, + { + name: "interceptor array empty", + args: args{interceptors: []elastictransport.InterceptorFunc{}}, + wantErr: false, + }, + { + name: "interceptor array nil interceptor", + args: args{interceptors: []elastictransport.InterceptorFunc{nil, dummyInterceptorFactory(1)}}, + wantErr: false, + validateResponse: validateReqRespH("1", "1"), + }, + { + name: "interceptor array 1 interceptor", + args: args{interceptors: []elastictransport.InterceptorFunc{dummyInterceptorFactory(1)}}, + wantErr: false, + validateResponse: validateReqRespH("1", "1"), + }, + { + name: "interceptor array many interceptors", + args: args{interceptors: []elastictransport.InterceptorFunc{ + dummyInterceptorFactory(1), dummyInterceptorFactory(2), dummyInterceptorFactory(3)}}, + wantErr: false, + validateResponse: validateReqRespH("1,2,3", "3,2,1"), + }, + { + name: "interceptor returns error", + args: args{interceptors: []elastictransport.InterceptorFunc{ + func(next elastictransport.RoundTripFunc) elastictransport.RoundTripFunc { + return func(r *http.Request) (*http.Response, error) { + return nil, errors.New("test error") + } + }, + }}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Run("typed client", func(t *testing.T) { + es, _ := NewTypedClient(Config{ + Transport: &mockTransp{RoundTripFunc: successTp}, + Interceptors: tt.args.interceptors, + }) + _, err := es.Info().Do(context.Background()) + if (err != nil) != tt.wantErr { + t.Errorf("Info() error = %v, wantErr %v", err, tt.wantErr) + } + }) + t.Run("low-level client", func(t *testing.T) { + es, _ := NewClient(Config{ + Transport: &mockTransp{RoundTripFunc: successTp}, + Interceptors: tt.args.interceptors, + }) + resp, err := es.Info() + if (err != nil) != tt.wantErr { + t.Errorf("Info() error = %v, wantErr %v", err, tt.wantErr) + } + if tt.validateResponse != nil { + tt.validateResponse(t, resp) + } + }) + }) + } +} + type mockESTransport struct { PerformFunc func(*http.Request) (*http.Response, error) CloseFunc func(context.Context) error