From 2e3ed9473cae3106020a8f46d9d208ff919c718c Mon Sep 17 00:00:00 2001 From: Joe Sirianni Date: Fri, 6 Mar 2026 14:24:29 -0500 Subject: [PATCH] feat(output): Support OTLP metrics --- generator/apache/apache_test.go | 8 + .../apache_combined/apache_combined_test.go | 8 + generator/apache_error/apache_error_test.go | 8 + generator/filegen/filegen_test.go | 4 + generator/json/json_test.go | 8 + generator/nginx/nginx_test.go | 8 + generator/okta/okta_test.go | 4 + generator/paloalto/paloalto_test.go | 4 + generator/postgres/postgres_test.go | 8 + generator/winevt/winevt_test.go | 4 + output/file/file.go | 6 +- output/nop/nop.go | 5 + output/otlp_grpc/otlp_grpc.go | 443 +++++++++++++++--- output/output.go | 47 +- output/stdout/stdout.go | 5 + output/syslog/syslog.go | 5 + output/syslog/syslog_test.go | 4 + output/tcp/tcp.go | 5 + output/udp/udp.go | 5 + 19 files changed, 531 insertions(+), 58 deletions(-) diff --git a/generator/apache/apache_test.go b/generator/apache/apache_test.go index ed76468..ca53c32 100644 --- a/generator/apache/apache_test.go +++ b/generator/apache/apache_test.go @@ -53,6 +53,10 @@ func (m *mockWriter) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (m *mockWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (m *mockWriter) getWrites() [][]byte { m.mu.Lock() defer m.mu.Unlock() @@ -289,6 +293,10 @@ func (d *discardWriter) Write(ctx context.Context, data output.LogRecord) error return nil } +func (d *discardWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func BenchmarkApacheGenerator(b *testing.B) { logger := zaptest.NewLogger(b) writer := &discardWriter{} diff --git a/generator/apache_combined/apache_combined_test.go b/generator/apache_combined/apache_combined_test.go index fdce79c..59af176 100644 --- a/generator/apache_combined/apache_combined_test.go +++ b/generator/apache_combined/apache_combined_test.go @@ -53,6 +53,10 @@ func (m *mockWriter) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (m *mockWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (m *mockWriter) getWrites() [][]byte { m.mu.Lock() defer m.mu.Unlock() @@ -290,6 +294,10 @@ func (d *discardWriter) Write(ctx context.Context, data output.LogRecord) error return nil } +func (d *discardWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func BenchmarkApacheCombinedGenerator(b *testing.B) { logger := zaptest.NewLogger(b) writer := &discardWriter{} diff --git a/generator/apache_error/apache_error_test.go b/generator/apache_error/apache_error_test.go index 9d0fb7a..2752bfe 100644 --- a/generator/apache_error/apache_error_test.go +++ b/generator/apache_error/apache_error_test.go @@ -53,6 +53,10 @@ func (m *mockWriter) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (m *mockWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (m *mockWriter) getWrites() [][]byte { m.mu.Lock() defer m.mu.Unlock() @@ -296,6 +300,10 @@ func (d *discardWriter) Write(ctx context.Context, data output.LogRecord) error return nil } +func (d *discardWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func BenchmarkApacheErrorGenerator(b *testing.B) { logger := zaptest.NewLogger(b) writer := &discardWriter{} diff --git a/generator/filegen/filegen_test.go b/generator/filegen/filegen_test.go index 9e5b5e3..9083f3a 100644 --- a/generator/filegen/filegen_test.go +++ b/generator/filegen/filegen_test.go @@ -49,6 +49,10 @@ func (m *mockWriter) Write(ctx context.Context, record output.LogRecord) error { return nil } +func (m *mockWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (m *mockWriter) Close(ctx context.Context) error { return nil } diff --git a/generator/json/json_test.go b/generator/json/json_test.go index e18e59b..f5a8a84 100644 --- a/generator/json/json_test.go +++ b/generator/json/json_test.go @@ -62,6 +62,10 @@ func (m *mockWriter) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (m *mockWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (m *mockWriter) getWrites() [][]byte { m.mu.Lock() defer m.mu.Unlock() @@ -389,6 +393,10 @@ func (d *discardWriter) Write(ctx context.Context, data output.LogRecord) error return nil } +func (d *discardWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func BenchmarkGenerateDefaultLog(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/generator/nginx/nginx_test.go b/generator/nginx/nginx_test.go index fd68ade..3922d22 100644 --- a/generator/nginx/nginx_test.go +++ b/generator/nginx/nginx_test.go @@ -53,6 +53,10 @@ func (m *mockWriter) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (m *mockWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (m *mockWriter) getWrites() [][]byte { m.mu.Lock() defer m.mu.Unlock() @@ -269,6 +273,10 @@ func (d *discardWriter) Write(ctx context.Context, data output.LogRecord) error return nil } +func (d *discardWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func BenchmarkNginxGenerator(b *testing.B) { logger := zaptest.NewLogger(b) writer := &discardWriter{} diff --git a/generator/okta/okta_test.go b/generator/okta/okta_test.go index 0aa379c..b7fdda5 100644 --- a/generator/okta/okta_test.go +++ b/generator/okta/okta_test.go @@ -44,6 +44,10 @@ func (m *mockWriter) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (m *mockWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (m *mockWriter) getWrites() [][]byte { m.mu.Lock() defer m.mu.Unlock() diff --git a/generator/paloalto/paloalto_test.go b/generator/paloalto/paloalto_test.go index af64554..61be887 100644 --- a/generator/paloalto/paloalto_test.go +++ b/generator/paloalto/paloalto_test.go @@ -52,6 +52,10 @@ func (m *mockWriter) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (m *mockWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (m *mockWriter) getWrites() [][]byte { m.mu.Lock() defer m.mu.Unlock() diff --git a/generator/postgres/postgres_test.go b/generator/postgres/postgres_test.go index 6568dd1..3324a52 100644 --- a/generator/postgres/postgres_test.go +++ b/generator/postgres/postgres_test.go @@ -53,6 +53,10 @@ func (m *mockWriter) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (m *mockWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (m *mockWriter) getWrites() [][]byte { m.mu.Lock() defer m.mu.Unlock() @@ -270,6 +274,10 @@ func (d *discardWriter) Write(ctx context.Context, data output.LogRecord) error return nil } +func (d *discardWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func BenchmarkPostgresGenerator(b *testing.B) { logger := zaptest.NewLogger(b) writer := &discardWriter{} diff --git a/generator/winevt/winevt_test.go b/generator/winevt/winevt_test.go index f6a7818..16c12df 100644 --- a/generator/winevt/winevt_test.go +++ b/generator/winevt/winevt_test.go @@ -33,6 +33,10 @@ func (m *mockWriter) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (m *mockWriter) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (m *mockWriter) getWrites() [][]byte { m.mu.Lock() defer m.mu.Unlock() diff --git a/output/file/file.go b/output/file/file.go index 6994334..644ed6c 100644 --- a/output/file/file.go +++ b/output/file/file.go @@ -200,7 +200,11 @@ func (f *File) Write(ctx context.Context, data output.LogRecord) error { } } -// Stop gracefully stops workers and closes the writer +// WriteMetric is not supported by the file output. +func (f *File) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + // SupportedTelemetry returns the telemetry types this output supports. func (f *File) SupportedTelemetry() []telemetry.Type { return []telemetry.Type{telemetry.Logs} diff --git a/output/nop/nop.go b/output/nop/nop.go index d064a66..077beca 100644 --- a/output/nop/nop.go +++ b/output/nop/nop.go @@ -36,6 +36,11 @@ func (o *NopOutput) Write(ctx context.Context, data output.LogRecord) error { return nil } +// WriteMetric is not supported by the nop output. +func (o *NopOutput) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + // Stop performs no work func (o *NopOutput) Stop(ctx context.Context) error { o.logger.Info("Stopping NOP output") diff --git a/output/otlp_grpc/otlp_grpc.go b/output/otlp_grpc/otlp_grpc.go index 0181221..b714ee5 100644 --- a/output/otlp_grpc/otlp_grpc.go +++ b/output/otlp_grpc/otlp_grpc.go @@ -15,8 +15,10 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" collectorlogs "go.opentelemetry.io/proto/otlp/collector/logs/v1" + collectormetrics "go.opentelemetry.io/proto/otlp/collector/metrics/v1" commonpb "go.opentelemetry.io/proto/otlp/common/v1" logspb "go.opentelemetry.io/proto/otlp/logs/v1" + metricspb "go.opentelemetry.io/proto/otlp/metrics/v1" resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" "go.uber.org/zap" "google.golang.org/grpc" @@ -133,25 +135,33 @@ func WithTLSConfig(tlsConfig *tls.Config) OTLPGrpcOption { // OTLPGrpc implements the Output interface for OTLP gRPC connections type OTLPGrpc struct { - logger *zap.Logger - host string - port string - workers int - insecure bool - tlsConfig *tls.Config - dataChan chan *logspb.LogRecord - ctx context.Context - cancel context.CancelFunc - workerManager *workermanager.WorkerManager - meter metric.Meter - - // Metrics - otlpLogsReceived metric.Int64Counter - otlpActiveWorkers metric.Int64Gauge - otlpLogRate metric.Float64Counter - otlpRequestSizeBytes metric.Int64Histogram - otlpRequestLatency metric.Float64Histogram - otlpSendErrors metric.Int64Counter + logger *zap.Logger + host string + port string + workers int + insecure bool + tlsConfig *tls.Config + ctx context.Context + cancel context.CancelFunc + meter metric.Meter + + // Log pipeline + logDataChan chan *logspb.LogRecord + logWorkerManager *workermanager.WorkerManager + + // Metric pipeline + metricDataChan chan *metricspb.Metric + metricWorkerManager *workermanager.WorkerManager + + // Observability metrics + otlpLogsReceived metric.Int64Counter + otlpMetricsReceived metric.Int64Counter + otlpActiveWorkers metric.Int64Gauge + otlpLogRate metric.Float64Counter + otlpMetricRate metric.Float64Counter + otlpRequestSizeBytes metric.Int64Histogram + otlpRequestLatency metric.Float64Histogram + otlpSendErrors metric.Int64Counter // Configuration batchTimeout time.Duration @@ -211,7 +221,7 @@ func New(logger *zap.Logger, opts ...OTLPGrpcOption) (*OTLPGrpc, error) { meter := otel.Meter("blitz-otlp-grpc-output") - // Initialize metrics + // Initialize observability metrics otlpLogsReceived, err := meter.Int64Counter( "blitz.otlp_grpc.logs.received", metric.WithDescription("Number of logs received from the write channel"), @@ -220,6 +230,14 @@ func New(logger *zap.Logger, opts ...OTLPGrpcOption) (*OTLPGrpc, error) { return nil, fmt.Errorf("create logs received counter: %w", err) } + otlpMetricsReceived, err := meter.Int64Counter( + "blitz.otlp_grpc.metrics.received", + metric.WithDescription("Number of metrics received from the write channel"), + ) + if err != nil { + return nil, fmt.Errorf("create metrics received counter: %w", err) + } + otlpActiveWorkers, err := meter.Int64Gauge( "blitz.otlp_grpc.workers.active", metric.WithDescription("Number of active worker goroutines"), @@ -236,6 +254,14 @@ func New(logger *zap.Logger, opts ...OTLPGrpcOption) (*OTLPGrpc, error) { return nil, fmt.Errorf("create log rate counter: %w", err) } + otlpMetricRate, err := meter.Float64Counter( + "blitz.otlp_grpc.metric.rate", + metric.WithDescription("Rate at which metrics are successfully sent to the configured host"), + ) + if err != nil { + return nil, fmt.Errorf("create metric rate counter: %w", err) + } + otlpRequestSizeBytes, err := meter.Int64Histogram( "blitz.otlp_grpc.request.size.bytes", metric.WithDescription("Size of requests in bytes"), @@ -261,19 +287,22 @@ func New(logger *zap.Logger, opts ...OTLPGrpcOption) (*OTLPGrpc, error) { } otlp := &OTLPGrpc{ - logger: logger.Named("output-otlp-grpc"), - host: cfg.host, - port: cfg.port, - workers: cfg.workers, - insecure: cfg.insecure, - tlsConfig: cfg.tlsConfig, - dataChan: make(chan *logspb.LogRecord, DefaultOTLPGrpcChannelSize), - ctx: ctx, - cancel: cancel, - meter: meter, - otlpLogsReceived: otlpLogsReceived, - otlpActiveWorkers: otlpActiveWorkers, - otlpLogRate: otlpLogRate, + logger: logger.Named("output-otlp-grpc"), + host: cfg.host, + port: cfg.port, + workers: cfg.workers, + insecure: cfg.insecure, + tlsConfig: cfg.tlsConfig, + logDataChan: make(chan *logspb.LogRecord, DefaultOTLPGrpcChannelSize), + metricDataChan: make(chan *metricspb.Metric, DefaultOTLPGrpcChannelSize), + ctx: ctx, + cancel: cancel, + meter: meter, + otlpLogsReceived: otlpLogsReceived, + otlpMetricsReceived: otlpMetricsReceived, + otlpActiveWorkers: otlpActiveWorkers, + otlpLogRate: otlpLogRate, + otlpMetricRate: otlpMetricRate, otlpRequestSizeBytes: otlpRequestSizeBytes, otlpRequestLatency: otlpRequestLatency, otlpSendErrors: otlpSendErrors, @@ -294,24 +323,37 @@ func New(logger *zap.Logger, opts ...OTLPGrpcOption) (*OTLPGrpc, error) { zap.Bool("tls_enabled", cfg.tlsConfig != nil), ) - // Create channel size gauge + // Create channel size gauges _, err = meter.Int64ObservableGauge( - "blitz.otlp_grpc.channel.size", - metric.WithDescription("Current size of the data channel"), + "blitz.otlp_grpc.log_channel.size", + metric.WithDescription("Current size of the log data channel"), metric.WithInt64Callback(func(_ context.Context, io metric.Int64Observer) error { - io.Observe(int64(len(otlp.dataChan))) + io.Observe(int64(len(otlp.logDataChan))) return nil }), ) if err != nil { - return nil, fmt.Errorf("create channel size gauge: %w", err) + return nil, fmt.Errorf("create log channel size gauge: %w", err) } - // Create worker manager - otlp.workerManager = workermanager.NewWorkerManager(otlp.logger, cfg.workers, otlp.otlpWorker) + _, err = meter.Int64ObservableGauge( + "blitz.otlp_grpc.metric_channel.size", + metric.WithDescription("Current size of the metric data channel"), + metric.WithInt64Callback(func(_ context.Context, io metric.Int64Observer) error { + io.Observe(int64(len(otlp.metricDataChan))) + return nil + }), + ) + if err != nil { + return nil, fmt.Errorf("create metric channel size gauge: %w", err) + } + + // Create worker managers for logs and metrics + otlp.logWorkerManager = workermanager.NewWorkerManager(otlp.logger, cfg.workers, otlp.logWorker) + otlp.metricWorkerManager = workermanager.NewWorkerManager(otlp.logger, cfg.workers, otlp.metricWorker) - // Record initial active workers count - otlp.otlpActiveWorkers.Record(context.Background(), int64(cfg.workers), + // Record initial active workers count (logs + metrics workers) + otlp.otlpActiveWorkers.Record(context.Background(), int64(cfg.workers*2), metric.WithAttributeSet( attribute.NewSet( attribute.String("component", "output_otlp_grpc"), @@ -320,7 +362,8 @@ func New(logger *zap.Logger, opts ...OTLPGrpcOption) (*OTLPGrpc, error) { ) // Start the workers - otlp.workerManager.Start() + otlp.logWorkerManager.Start() + otlp.metricWorkerManager.Start() return otlp, nil } @@ -386,7 +429,7 @@ func (o *OTLPGrpc) Write(ctx context.Context, data output.LogRecord) error { } select { - case o.dataChan <- record: + case o.logDataChan <- record: // Record logs received o.otlpLogsReceived.Add(ctx, 1, metric.WithAttributeSet( @@ -403,14 +446,92 @@ func (o *OTLPGrpc) Write(ctx context.Context, data output.LogRecord) error { } } +// WriteMetric sends metric data to the OTLP gRPC output channel for processing by metric workers. +// WriteMetric shall not be called after Stop is called. +func (o *OTLPGrpc) WriteMetric(ctx context.Context, data output.MetricRecord) error { + m := o.buildOTLPMetric(data) + + select { + case o.metricDataChan <- m: + o.otlpMetricsReceived.Add(ctx, 1, + metric.WithAttributeSet( + attribute.NewSet( + attribute.String("component", "output_otlp_grpc"), + ), + ), + ) + return nil + case <-ctx.Done(): + return fmt.Errorf("context cancelled while waiting to write metric: %w", ctx.Err()) + case <-o.ctx.Done(): + return fmt.Errorf("OTLP gRPC output is shutting down") + } +} + +// buildOTLPMetric converts an output.MetricRecord to an OTLP Metric protobuf. +func (o *OTLPGrpc) buildOTLPMetric(data output.MetricRecord) *metricspb.Metric { + timestamp := data.Timestamp + if timestamp.IsZero() { + timestamp = time.Now() + } + timeNano := output.TimeToUnixNanoUint64(timestamp) + + // Build attributes + attrs := make([]*commonpb.KeyValue, 0, len(data.Attributes)) + for k, v := range data.Attributes { + attrs = append(attrs, &commonpb.KeyValue{ + Key: k, + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: v}}, + }) + } + + // Build the data point value + dp := &metricspb.NumberDataPoint{ + TimeUnixNano: timeNano, + Attributes: attrs, + } + if data.DoubleValue != nil { + dp.Value = &metricspb.NumberDataPoint_AsDouble{AsDouble: *data.DoubleValue} + } else if data.IntValue != nil { + dp.Value = &metricspb.NumberDataPoint_AsInt{AsInt: *data.IntValue} + } + + m := &metricspb.Metric{ + Name: data.Name, + Description: data.Description, + Unit: data.Unit, + } + + switch data.Type { + case output.MetricTypeSum: + m.Data = &metricspb.Metric_Sum{ + Sum: &metricspb.Sum{ + AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + IsMonotonic: true, + DataPoints: []*metricspb.NumberDataPoint{dp}, + }, + } + default: + // Default to gauge + m.Data = &metricspb.Metric_Gauge{ + Gauge: &metricspb.Gauge{ + DataPoints: []*metricspb.NumberDataPoint{dp}, + }, + } + } + + return m +} + +// SupportedTelemetry returns the telemetry types this output supports. +func (o *OTLPGrpc) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs, telemetry.Metrics} +} + // Stop gracefully shuts down all workers and closes OTLP gRPC connections // Stop shall not be called more than once. // If the provided context is done, Stop will return immediately // even if workers are still shutting down. -// SupportedTelemetry returns the telemetry types this output supports. -func (o *OTLPGrpc) SupportedTelemetry() []telemetry.Type { - return []telemetry.Type{telemetry.Logs} -} func (o *OTLPGrpc) Stop(ctx context.Context) error { o.logger.Info("Stopping OTLP gRPC output") @@ -424,27 +545,29 @@ func (o *OTLPGrpc) Stop(ctx context.Context) error { ), ) - // Close the channel to ensure workers do not + // Close the channels to ensure workers do not // process new data. - close(o.dataChan) + close(o.logDataChan) + close(o.metricDataChan) // Signal the workers to stop. o.cancel() - // Stop the worker manager - o.workerManager.Stop() + // Stop the worker managers + o.logWorkerManager.Stop() + o.metricWorkerManager.Stop() o.logger.Info("OTLP gRPC output stopped successfully") return nil } -// otlpWorker processes OTLP gRPC data from the channel and sends it to the configured host and port. +// logWorker processes OTLP gRPC log data from the channel and sends it to the configured host and port. // This function is designed to work with the worker manager, which handles automatic restart // with exponential backoff when the worker exits due to connection failures or errors. // The worker should return immediately on any failure - the worker manager will handle // reconnection attempts with appropriate backoff delays. -func (o *OTLPGrpc) otlpWorker(id int) { - o.logger.Info("Starting OTLP gRPC worker", zap.Int("worker_id", id)) +func (o *OTLPGrpc) logWorker(id int) { + o.logger.Info("Starting OTLP gRPC log worker", zap.Int("worker_id", id)) conn, err := o.connect() if err != nil { @@ -461,7 +584,7 @@ func (o *OTLPGrpc) otlpWorker(id int) { for { select { - case rec, ok := <-o.dataChan: + case rec, ok := <-o.logDataChan: if !ok { o.logger.Info("OTLP gRPC worker exiting - channel closed", zap.Int("worker_id", id)) // Flush remaining logs @@ -515,6 +638,73 @@ func (o *OTLPGrpc) otlpWorker(id int) { } } +// metricWorker processes OTLP gRPC metric data from the channel and sends it to the configured host and port. +func (o *OTLPGrpc) metricWorker(id int) { + o.logger.Info("Starting OTLP gRPC metric worker", zap.Int("worker_id", id)) + + conn, err := o.connect() + if err != nil { + o.logger.Error("Failed to establish initial OTLP gRPC connection for metrics", + zap.Int("worker_id", id), + zap.Error(err)) + return + } + defer conn.Close() + + client := collectormetrics.NewMetricsServiceClient(conn) + + batch := newMetricBatch(o.maxExportBatchSize, o.batchTimeout) + + for { + select { + case rec, ok := <-o.metricDataChan: + if !ok { + o.logger.Info("OTLP gRPC metric worker exiting - channel closed", zap.Int("worker_id", id)) + if err := o.flushMetricBatch(client, batch); err != nil { + o.logger.Error("Failed to flush final metric batch", zap.Int("worker_id", id), zap.Error(err)) + } + return + } + + batch.add(rec) + + if batch.isFull() { + if !batch.timer.Stop() { + select { + case <-batch.timer.C: + default: + } + } + if err := o.sendMetricBatch(client, batch); err != nil { + o.logger.Error("Failed to send OTLP gRPC metric batch", + zap.Int("worker_id", id), + zap.Error(err)) + return + } + batch = newMetricBatch(o.maxExportBatchSize, o.batchTimeout) + } + + case <-batch.timer.C: + if !batch.isEmpty() { + if err := o.sendMetricBatch(client, batch); err != nil { + o.logger.Error("Failed to send OTLP gRPC metric batch", + zap.Int("worker_id", id), + zap.Error(err)) + return + } + } + batch = newMetricBatch(o.maxExportBatchSize, o.batchTimeout) + + case <-o.ctx.Done(): + o.logger.Info("OTLP gRPC metric worker exiting - context cancelled", zap.Int("worker_id", id)) + if err := o.flushMetricBatch(client, batch); err != nil { + o.logger.Error("Failed to flush final metric batch", zap.Int("worker_id", id), zap.Error(err)) + } + return + } + } +} + // connect establishes a gRPC connection to the configured host and port func (o *OTLPGrpc) connect() (*grpc.ClientConn, error) { endpoint := fmt.Sprintf("%s:%s", o.host, o.port) @@ -653,6 +843,147 @@ func (o *OTLPGrpc) flushBatch(client collectorlogs.LogsServiceClient, batch *log return o.sendBatch(client, batch) } +// metricBatch holds a batch of metrics to be sent +type metricBatch struct { + metrics []*metricspb.Metric + maxSize int + timer *time.Timer + mu sync.Mutex +} + +// newMetricBatch creates a new metric batch +func newMetricBatch(maxSize int, timeout time.Duration) *metricBatch { + return &metricBatch{ + metrics: make([]*metricspb.Metric, 0, maxSize), + maxSize: maxSize, + timer: time.NewTimer(timeout), + } +} + +func (b *metricBatch) add(data *metricspb.Metric) { + b.mu.Lock() + defer b.mu.Unlock() + b.metrics = append(b.metrics, data) +} + +func (b *metricBatch) isFull() bool { + b.mu.Lock() + defer b.mu.Unlock() + return len(b.metrics) >= b.maxSize +} + +func (b *metricBatch) isEmpty() bool { + b.mu.Lock() + defer b.mu.Unlock() + return len(b.metrics) == 0 +} + +func (b *metricBatch) getAndClear() []*metricspb.Metric { + b.mu.Lock() + defer b.mu.Unlock() + metrics := b.metrics + b.metrics = make([]*metricspb.Metric, 0, b.maxSize) + return metrics +} + +// sendMetricBatch sends a batch of metrics via OTLP gRPC +func (o *OTLPGrpc) sendMetricBatch(client collectormetrics.MetricsServiceClient, batch *metricBatch) error { + startTime := time.Now() + + metrics := batch.getAndClear() + if len(metrics) == 0 { + return nil + } + + request := o.buildOTLPMetricsRequest(metrics) + + ctx, cancel := context.WithTimeout(context.Background(), o.batchTimeout) + defer cancel() + + ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{})) + + _, err := client.Export(ctx, request) + if err != nil { + o.recordSendError("metrics_export_error", err) + return fmt.Errorf("failed to export metrics: %w", err) + } + + latency := time.Since(startTime).Seconds() + requestSize := int64(proto.Size(request)) + o.otlpMetricRate.Add(context.Background(), float64(len(metrics)), + metric.WithAttributeSet( + attribute.NewSet( + attribute.String("component", "output_otlp_grpc"), + ), + ), + ) + o.otlpRequestSizeBytes.Record(context.Background(), requestSize, + metric.WithAttributeSet( + attribute.NewSet( + attribute.String("component", "output_otlp_grpc"), + attribute.String("signal", "metrics"), + ), + ), + ) + o.otlpRequestLatency.Record(context.Background(), latency, + metric.WithAttributeSet( + attribute.NewSet( + attribute.String("component", "output_otlp_grpc"), + attribute.String("signal", "metrics"), + ), + ), + ) + + return nil +} + +// flushMetricBatch flushes any remaining metrics in the batch +func (o *OTLPGrpc) flushMetricBatch(client collectormetrics.MetricsServiceClient, batch *metricBatch) error { + if !batch.timer.Stop() { + select { + case <-batch.timer.C: + default: + } + } + if batch.isEmpty() { + return nil + } + return o.sendMetricBatch(client, batch) +} + +// buildOTLPMetricsRequest builds an OTLP ExportMetricsServiceRequest +func (o *OTLPGrpc) buildOTLPMetricsRequest(metrics []*metricspb.Metric) *collectormetrics.ExportMetricsServiceRequest { + scopeMetrics := &metricspb.ScopeMetrics{ + Metrics: make([]*metricspb.Metric, 0, len(metrics)), + } + for _, m := range metrics { + if m == nil { + continue + } + scopeMetrics.Metrics = append(scopeMetrics.Metrics, m) + } + + return &collectormetrics.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricspb.ResourceMetrics{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + { + Key: "service.name", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_StringValue{ + StringValue: "blitz", + }, + }, + }, + }, + }, + ScopeMetrics: []*metricspb.ScopeMetrics{scopeMetrics}, + }, + }, + } +} + // buildOTLPRequest builds an OTLP ExportLogsServiceRequest from prepared LogRecord entries func (o *OTLPGrpc) buildOTLPRequest(logs []*logspb.LogRecord) *collectorlogs.ExportLogsServiceRequest { resourceLogs := &logspb.ResourceLogs{ diff --git a/output/output.go b/output/output.go index f18ce9a..411276e 100644 --- a/output/output.go +++ b/output/output.go @@ -2,11 +2,15 @@ package output import ( "context" + "errors" "time" "github.com/observiq/blitz/telemetry" ) +// ErrUnsupportedTelemetryType is returned when a writer does not support the requested telemetry type. +var ErrUnsupportedTelemetryType = errors.New("unsupported telemetry type") + type LogRecord struct { // Message is the raw log message Message string @@ -26,10 +30,51 @@ type LogRecordMetadata struct { Severity string } -// Writer can consume log records. +// MetricType represents the type of metric data point. +type MetricType string + +const ( + // MetricTypeGauge represents a gauge metric. + MetricTypeGauge MetricType = "gauge" + // MetricTypeSum represents a sum (counter) metric. + MetricTypeSum MetricType = "sum" +) + +// MetricRecord represents a single metric data point. +type MetricRecord struct { + // Name is the metric name (e.g. "system.cpu.utilization"). + Name string + + // Description is a human-readable description of the metric. + Description string + + // Unit is the metric unit (e.g. "s", "By", "1"). + Unit string + + // Type is the metric data point type (gauge or sum). + Type MetricType + + // IntValue is the integer value for the data point. Exactly one of + // IntValue or DoubleValue should be set. + IntValue *int64 + + // DoubleValue is the floating-point value for the data point. + DoubleValue *float64 + + // Attributes are key-value pairs associated with this data point. + Attributes map[string]string + + // Timestamp is when the measurement was taken. + Timestamp time.Time +} + +// Writer can consume log and metric records. type Writer interface { // Write writes the data to the output. Write(ctx context.Context, data LogRecord) error + + // WriteMetric writes a metric data point to the output. + WriteMetric(ctx context.Context, data MetricRecord) error } // Output is the interface for outputting data. diff --git a/output/stdout/stdout.go b/output/stdout/stdout.go index a7f934e..0e030b1 100644 --- a/output/stdout/stdout.go +++ b/output/stdout/stdout.go @@ -37,6 +37,11 @@ func (o *StdoutOutput) Write(ctx context.Context, data output.LogRecord) error { return err } +// WriteMetric is not supported by the stdout output. +func (o *StdoutOutput) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + // Stop performs cleanup func (o *StdoutOutput) Stop(ctx context.Context) error { o.logger.Info("Stopping stdout output") diff --git a/output/syslog/syslog.go b/output/syslog/syslog.go index c438abc..3b94d91 100644 --- a/output/syslog/syslog.go +++ b/output/syslog/syslog.go @@ -159,6 +159,11 @@ func (s *Syslog) Write(ctx context.Context, rec output.LogRecord) error { return s.transport.Write(ctx, output.LogRecord{Message: formatted}) } +// WriteMetric is not supported by the syslog output. +func (s *Syslog) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + // SupportedTelemetry returns the telemetry types this output supports. func (s *Syslog) SupportedTelemetry() []telemetry.Type { return []telemetry.Type{telemetry.Logs} diff --git a/output/syslog/syslog_test.go b/output/syslog/syslog_test.go index bc5faae..379ed36 100644 --- a/output/syslog/syslog_test.go +++ b/output/syslog/syslog_test.go @@ -22,6 +22,10 @@ func (s *stubOutput) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (s *stubOutput) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + func (s *stubOutput) SupportedTelemetry() []telemetry.Type { return []telemetry.Type{telemetry.Logs} } diff --git a/output/tcp/tcp.go b/output/tcp/tcp.go index 5d3642d..454222d 100644 --- a/output/tcp/tcp.go +++ b/output/tcp/tcp.go @@ -214,6 +214,11 @@ func (t *TCP) Write(ctx context.Context, data output.LogRecord) error { // Stop shall not be called more than once. // If the provided context is done, Stop will return immediately // even if workers are still shutting down. +// WriteMetric is not supported by the TCP output. +func (t *TCP) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + // SupportedTelemetry returns the telemetry types this output supports. func (t *TCP) SupportedTelemetry() []telemetry.Type { return []telemetry.Type{telemetry.Logs} diff --git a/output/udp/udp.go b/output/udp/udp.go index 1a75185..cacbddf 100644 --- a/output/udp/udp.go +++ b/output/udp/udp.go @@ -197,6 +197,11 @@ func (u *UDP) Write(ctx context.Context, data output.LogRecord) error { // Stop shall not be called more than once. // If the provided context is done, Stop will return immediately // even if workers are still shutting down. +// WriteMetric is not supported by the UDP output. +func (u *UDP) WriteMetric(_ context.Context, _ output.MetricRecord) error { + return output.ErrUnsupportedTelemetryType +} + // SupportedTelemetry returns the telemetry types this output supports. func (u *UDP) SupportedTelemetry() []telemetry.Type { return []telemetry.Type{telemetry.Logs}