diff --git a/generator/apache/apache.go b/generator/apache/apache.go index d7e2f2f..7059b8e 100644 --- a/generator/apache/apache.go +++ b/generator/apache/apache.go @@ -12,6 +12,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/observiq/blitz/internal/generator/security" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -96,6 +97,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*ApacheLogGenerat // Start starts the Apache log generator and writes data using the // provided generator writer. +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *ApacheLogGenerator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (g *ApacheLogGenerator) Start(writer output.Writer) error { g.logger.Info("Starting Apache log generator", zap.Int("workers", g.workers), diff --git a/generator/apache_combined/apache_combined.go b/generator/apache_combined/apache_combined.go index 91bb12a..4799a26 100644 --- a/generator/apache_combined/apache_combined.go +++ b/generator/apache_combined/apache_combined.go @@ -11,6 +11,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/observiq/blitz/internal/useragent" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -97,6 +98,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*ApacheCombinedLo // Start starts the Apache Combined log generator and writes data using the // provided generator writer. +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *ApacheCombinedLogGenerator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (g *ApacheCombinedLogGenerator) Start(writer output.Writer) error { g.logger.Info("Starting Apache Combined log generator", zap.Int("workers", g.workers), diff --git a/generator/apache_error/apache_error.go b/generator/apache_error/apache_error.go index 9b5f521..bc23692 100644 --- a/generator/apache_error/apache_error.go +++ b/generator/apache_error/apache_error.go @@ -11,6 +11,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -94,6 +95,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*ApacheErrorLogGe // Start starts the Apache Error log generator and writes data using the // provided generator writer. +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *ApacheErrorLogGenerator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (g *ApacheErrorLogGenerator) Start(writer output.Writer) error { g.logger.Info("Starting Apache Error log generator", zap.Int("workers", g.workers), diff --git a/generator/filegen/filegen.go b/generator/filegen/filegen.go index ac8d340..e1038e2 100644 --- a/generator/filegen/filegen.go +++ b/generator/filegen/filegen.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/golang-lru/v2/expirable" "github.com/observiq/blitz/internal/generator/ctime" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -151,6 +152,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration, source string, cac } // Start starts the File log generator +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *FileLogGenerator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (g *FileLogGenerator) Start(writer output.Writer) error { g.logger.Info("Starting File log generator", zap.String("source", g.source), diff --git a/generator/generator.go b/generator/generator.go index e8c164f..fc54386 100644 --- a/generator/generator.go +++ b/generator/generator.go @@ -4,10 +4,14 @@ import ( "context" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" ) // Generator is the interface for generating data. type Generator interface { + // SupportedTelemetry returns the telemetry types this generator can produce. + SupportedTelemetry() []telemetry.Type + // Start starts the generator and writes data using the // provided generator writer. Start(writer output.Writer) error diff --git a/generator/json/json.go b/generator/json/json.go index 7748516..316eca6 100644 --- a/generator/json/json.go +++ b/generator/json/json.go @@ -11,6 +11,7 @@ import ( jsonlib "github.com/goccy/go-json" "github.com/observiq/blitz/internal/generator/logtypes" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -40,6 +41,11 @@ type JSONLogGenerator struct { jsonWriteErrors metric.Int64Counter } +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *JSONLogGenerator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + // New creates a new JSON log generator func New(logger *zap.Logger, workers int, rate time.Duration, logType string) (*JSONLogGenerator, error) { if logger == nil { diff --git a/generator/kubernetes/kubernetes.go b/generator/kubernetes/kubernetes.go index db3d37f..833dd0f 100644 --- a/generator/kubernetes/kubernetes.go +++ b/generator/kubernetes/kubernetes.go @@ -11,6 +11,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -124,6 +125,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration, format string) (*G // Start starts the Kubernetes container log generator and writes data using the // provided generator writer. +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *Generator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (g *Generator) Start(writer output.Writer) error { g.logger.Info("Starting Kubernetes container log generator", zap.Int("workers", g.workers), diff --git a/generator/nginx/nginx.go b/generator/nginx/nginx.go index 97f786c..10a5eba 100644 --- a/generator/nginx/nginx.go +++ b/generator/nginx/nginx.go @@ -12,6 +12,7 @@ import ( "github.com/observiq/blitz/internal/generator/security" "github.com/observiq/blitz/internal/useragent" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -184,6 +185,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*Generator, error // Start starts the NGINX log generator and writes data using the // provided generator writer. +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *Generator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (g *Generator) Start(writer output.Writer) error { g.logger.Info("Starting NGINX log generator", zap.Int("workers", g.workers), diff --git a/generator/nop/nop.go b/generator/nop/nop.go index 7798701..d68a46c 100644 --- a/generator/nop/nop.go +++ b/generator/nop/nop.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.uber.org/zap" ) @@ -24,6 +25,11 @@ func New(logger *zap.Logger) (*NopGenerator, error) { }, nil } +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *NopGenerator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + // Start starts the nop generator (performs no work) func (g *NopGenerator) Start(writer output.Writer) error { g.logger.Info("Starting NOP generator (no work performed)") diff --git a/generator/okta/okta.go b/generator/okta/okta.go index d0e879a..1b65faf 100644 --- a/generator/okta/okta.go +++ b/generator/okta/okta.go @@ -10,6 +10,7 @@ import ( "github.com/cenkalti/backoff/v4" jsonlib "github.com/goccy/go-json" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -239,6 +240,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*Generator, error } // Start starts the Okta log generator +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *Generator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (g *Generator) Start(writer output.Writer) error { g.logger.Info("Starting Okta log generator", zap.Int("workers", g.workers), diff --git a/generator/paloalto/paloalto.go b/generator/paloalto/paloalto.go index a1728a2..9818a8d 100644 --- a/generator/paloalto/paloalto.go +++ b/generator/paloalto/paloalto.go @@ -13,6 +13,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -35,6 +36,11 @@ type Generator struct { writeErrors metric.Int64Counter } +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *Generator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + // New creates a new Palo Alto generator. func New(logger *zap.Logger, workers int, rate time.Duration) (*Generator, error) { if logger == nil { diff --git a/generator/postgres/postgres.go b/generator/postgres/postgres.go index 2f3c2b3..4ffdb72 100644 --- a/generator/postgres/postgres.go +++ b/generator/postgres/postgres.go @@ -10,6 +10,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -257,6 +258,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*Generator, error // Start starts the PostgreSQL log generator and writes data using the // provided generator writer. +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *Generator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (g *Generator) Start(writer output.Writer) error { g.logger.Info("Starting PostgreSQL log generator", zap.Int("workers", g.workers), diff --git a/generator/winevt/winevt.go b/generator/winevt/winevt.go index ae6cb3e..5120810 100644 --- a/generator/winevt/winevt.go +++ b/generator/winevt/winevt.go @@ -9,6 +9,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/observiq/blitz/internal/generators/winevt/templates" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -31,6 +32,11 @@ type WinevtGenerator struct { winevtWriteErrors metric.Int64Counter } +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *WinevtGenerator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + // New creates a new Windows Event generator. func New(logger *zap.Logger, workers int, rate time.Duration) (*WinevtGenerator, error) { if logger == nil { diff --git a/internal/service/service.go b/internal/service/service.go index 331c37a..1775be1 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -7,6 +7,7 @@ import ( "github.com/observiq/blitz/generator" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.uber.org/zap" ) @@ -27,6 +28,12 @@ func New(logger *zap.Logger, generator generator.Generator, output output.Output return nil, fmt.Errorf("output cannot be nil") } + telemetryType, err := telemetry.Compatible(generator.SupportedTelemetry(), output.SupportedTelemetry()) + if err != nil { + return nil, fmt.Errorf("incompatible generator and output: %w", err) + } + logger.Info("telemetry type", zap.String("type", string(telemetryType))) + return &Service{ Logger: logger, Generator: generator, diff --git a/output/file/file.go b/output/file/file.go index ed95dae..6994334 100644 --- a/output/file/file.go +++ b/output/file/file.go @@ -7,6 +7,7 @@ import ( "github.com/observiq/blitz/internal/workermanager" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -200,6 +201,11 @@ func (f *File) Write(ctx context.Context, data output.LogRecord) error { } // Stop gracefully stops workers and closes the writer +// SupportedTelemetry returns the telemetry types this output supports. +func (f *File) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (f *File) Stop(ctx context.Context) error { f.logger.Info("Stopping File output") diff --git a/output/nop/nop.go b/output/nop/nop.go index e740c23..d064a66 100644 --- a/output/nop/nop.go +++ b/output/nop/nop.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.uber.org/zap" ) @@ -24,6 +25,11 @@ func New(logger *zap.Logger) (*NopOutput, error) { }, nil } +// SupportedTelemetry returns the telemetry types this output supports. +func (o *NopOutput) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + // Write performs no work (data is discarded) func (o *NopOutput) Write(ctx context.Context, data output.LogRecord) error { // No-op: data is discarded diff --git a/output/otlp_grpc/otlp_grpc.go b/output/otlp_grpc/otlp_grpc.go index 52eda00..0181221 100644 --- a/output/otlp_grpc/otlp_grpc.go +++ b/output/otlp_grpc/otlp_grpc.go @@ -10,6 +10,7 @@ import ( "github.com/observiq/blitz/internal/workermanager" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -406,6 +407,11 @@ func (o *OTLPGrpc) Write(ctx context.Context, data output.LogRecord) error { // Stop shall not be called more than once. // If the provided context is done, Stop will return immediately // even if workers are still shutting down. +// SupportedTelemetry returns the telemetry types this output supports. +func (o *OTLPGrpc) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (o *OTLPGrpc) Stop(ctx context.Context) error { o.logger.Info("Stopping OTLP gRPC output") diff --git a/output/output.go b/output/output.go index 89a610b..f18ce9a 100644 --- a/output/output.go +++ b/output/output.go @@ -3,6 +3,8 @@ package output import ( "context" "time" + + "github.com/observiq/blitz/telemetry" ) type LogRecord struct { @@ -34,6 +36,9 @@ type Writer interface { type Output interface { Writer + // SupportedTelemetry returns the telemetry types this output can send. + SupportedTelemetry() []telemetry.Type + // Stop stops the output. Stop(ctx context.Context) error } diff --git a/output/stdout/stdout.go b/output/stdout/stdout.go index 468beb6..a7f934e 100644 --- a/output/stdout/stdout.go +++ b/output/stdout/stdout.go @@ -6,6 +6,7 @@ import ( "os" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.uber.org/zap" ) @@ -25,6 +26,11 @@ func New(logger *zap.Logger) (*StdoutOutput, error) { }, nil } +// SupportedTelemetry returns the telemetry types this output supports. +func (o *StdoutOutput) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + // Write writes the log record to stdout func (o *StdoutOutput) Write(ctx context.Context, data output.LogRecord) error { _, err := fmt.Fprintln(os.Stdout, data.Message) diff --git a/output/syslog/syslog.go b/output/syslog/syslog.go index b3d7276..c438abc 100644 --- a/output/syslog/syslog.go +++ b/output/syslog/syslog.go @@ -10,6 +10,7 @@ import ( "time" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "github.com/observiq/blitz/output/syslog/ident" "github.com/observiq/blitz/output/tcp" "github.com/observiq/blitz/output/udp" @@ -158,6 +159,11 @@ func (s *Syslog) Write(ctx context.Context, rec output.LogRecord) error { return s.transport.Write(ctx, output.LogRecord{Message: formatted}) } +// SupportedTelemetry returns the telemetry types this output supports. +func (s *Syslog) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + // Stop delegates to the underlying transport. func (s *Syslog) Stop(ctx context.Context) error { return s.transport.Stop(ctx) diff --git a/output/syslog/syslog_test.go b/output/syslog/syslog_test.go index 392a434..bc5faae 100644 --- a/output/syslog/syslog_test.go +++ b/output/syslog/syslog_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -21,6 +22,10 @@ func (s *stubOutput) Write(ctx context.Context, data output.LogRecord) error { return nil } +func (s *stubOutput) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (s *stubOutput) Stop(ctx context.Context) error { s.stopped = true return nil diff --git a/output/tcp/tcp.go b/output/tcp/tcp.go index de4d087..5d3642d 100644 --- a/output/tcp/tcp.go +++ b/output/tcp/tcp.go @@ -9,6 +9,7 @@ import ( "github.com/observiq/blitz/internal/workermanager" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -213,6 +214,11 @@ func (t *TCP) Write(ctx context.Context, data output.LogRecord) error { // Stop shall not be called more than once. // If the provided context is done, Stop will return immediately // even if workers are still shutting down. +// SupportedTelemetry returns the telemetry types this output supports. +func (t *TCP) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (t *TCP) Stop(ctx context.Context) error { t.logger.Info("Stopping TCP output") diff --git a/output/udp/udp.go b/output/udp/udp.go index 39e8509..1a75185 100644 --- a/output/udp/udp.go +++ b/output/udp/udp.go @@ -8,6 +8,7 @@ import ( "github.com/observiq/blitz/internal/workermanager" "github.com/observiq/blitz/output" + "github.com/observiq/blitz/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -196,6 +197,11 @@ func (u *UDP) Write(ctx context.Context, data output.LogRecord) error { // Stop shall not be called more than once. // If the provided context is done, Stop will return immediately // even if workers are still shutting down. +// SupportedTelemetry returns the telemetry types this output supports. +func (u *UDP) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Logs} +} + func (u *UDP) Stop(ctx context.Context) error { u.logger.Info("Stopping UDP output") diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go new file mode 100644 index 0000000..c1db923 --- /dev/null +++ b/telemetry/telemetry.go @@ -0,0 +1,60 @@ +// Package telemetry defines the telemetry types supported by Blitz +// generators and outputs. +package telemetry + +import ( + "fmt" + "slices" +) + +// Type represents a telemetry signal type. +type Type string + +const ( + // Logs represents log telemetry. + Logs Type = "logs" + // Metrics represents metric telemetry. + Metrics Type = "metrics" +) + +// Valid returns true if the telemetry type is a known type. +func (t Type) Valid() bool { + switch t { + case Logs, Metrics: + return true + default: + return false + } +} + +// Supports returns true if the given type is present in the slice of supported types. +func Supports(supported []Type, t Type) bool { + return slices.Contains(supported, t) +} + +// Compatible returns the single telemetry type that is supported by both +// the generator and the output. An error is returned if there are no common +// types or if more than one type overlaps, since each pipeline must operate +// on exactly one telemetry type. +func Compatible(generatorTypes, outputTypes []Type) (Type, error) { + var common []Type + for _, gt := range generatorTypes { + if Supports(outputTypes, gt) { + common = append(common, gt) + } + } + switch len(common) { + case 0: + return "", fmt.Errorf( + "generator and output have no compatible telemetry types: generator supports %v, output supports %v", + generatorTypes, outputTypes, + ) + case 1: + return common[0], nil + default: + return "", fmt.Errorf( + "generator and output have multiple compatible telemetry types %v: a single type must be configured", + common, + ) + } +} diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go new file mode 100644 index 0000000..2e48db1 --- /dev/null +++ b/telemetry/telemetry_test.go @@ -0,0 +1,52 @@ +package telemetry + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTypeValid(t *testing.T) { + assert.True(t, Logs.Valid()) + assert.True(t, Metrics.Valid()) + assert.False(t, Type("unknown").Valid()) + assert.False(t, Type("").Valid()) +} + +func TestSupports(t *testing.T) { + logsOnly := []Type{Logs} + both := []Type{Logs, Metrics} + + assert.True(t, Supports(logsOnly, Logs)) + assert.False(t, Supports(logsOnly, Metrics)) + assert.True(t, Supports(both, Logs)) + assert.True(t, Supports(both, Metrics)) + assert.False(t, Supports(nil, Logs)) +} + +func TestCompatible(t *testing.T) { + t.Run("both support logs", func(t *testing.T) { + result, err := Compatible([]Type{Logs}, []Type{Logs}) + require.NoError(t, err) + assert.Equal(t, Logs, result) + }) + + t.Run("generator logs+metrics, output logs only", func(t *testing.T) { + result, err := Compatible([]Type{Logs, Metrics}, []Type{Logs}) + require.NoError(t, err) + assert.Equal(t, Logs, result) + }) + + t.Run("both support logs+metrics errors with multiple", func(t *testing.T) { + _, err := Compatible([]Type{Logs, Metrics}, []Type{Logs, Metrics}) + require.Error(t, err) + assert.Contains(t, err.Error(), "multiple compatible telemetry types") + }) + + t.Run("no overlap", func(t *testing.T) { + _, err := Compatible([]Type{Metrics}, []Type{Logs}) + require.Error(t, err) + assert.Contains(t, err.Error(), "no compatible telemetry types") + }) +}