Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions generator/apache/apache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/observiq/blitz/internal/generator/security"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -96,6 +97,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*ApacheLogGenerat

// Start starts the Apache log generator and writes data using the
// provided generator writer.
// SupportedTelemetry returns the telemetry types this generator supports.
func (g *ApacheLogGenerator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

func (g *ApacheLogGenerator) Start(writer output.Writer) error {
g.logger.Info("Starting Apache log generator",
zap.Int("workers", g.workers),
Expand Down
6 changes: 6 additions & 0 deletions generator/apache_combined/apache_combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/observiq/blitz/internal/useragent"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -97,6 +98,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*ApacheCombinedLo

// Start starts the Apache Combined log generator and writes data using the
// provided generator writer.
// SupportedTelemetry returns the telemetry types this generator supports.
func (g *ApacheCombinedLogGenerator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

func (g *ApacheCombinedLogGenerator) Start(writer output.Writer) error {
g.logger.Info("Starting Apache Combined log generator",
zap.Int("workers", g.workers),
Expand Down
6 changes: 6 additions & 0 deletions generator/apache_error/apache_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -94,6 +95,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*ApacheErrorLogGe

// Start starts the Apache Error log generator and writes data using the
// provided generator writer.
// SupportedTelemetry returns the telemetry types this generator supports.
func (g *ApacheErrorLogGenerator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

func (g *ApacheErrorLogGenerator) Start(writer output.Writer) error {
g.logger.Info("Starting Apache Error log generator",
zap.Int("workers", g.workers),
Expand Down
6 changes: 6 additions & 0 deletions generator/filegen/filegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/observiq/blitz/internal/generator/ctime"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -151,6 +152,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration, source string, cac
}

// Start starts the File log generator
// SupportedTelemetry returns the telemetry types this generator supports.
func (g *FileLogGenerator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

func (g *FileLogGenerator) Start(writer output.Writer) error {
g.logger.Info("Starting File log generator",
zap.String("source", g.source),
Expand Down
4 changes: 4 additions & 0 deletions generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"context"

"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
)

// Generator is the interface for generating data.
type Generator interface {
// SupportedTelemetry returns the telemetry types this generator can produce.
SupportedTelemetry() []telemetry.Type

// Start starts the generator and writes data using the
// provided generator writer.
Start(writer output.Writer) error
Expand Down
6 changes: 6 additions & 0 deletions generator/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
jsonlib "github.com/goccy/go-json"
"github.com/observiq/blitz/internal/generator/logtypes"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -40,6 +41,11 @@ type JSONLogGenerator struct {
jsonWriteErrors metric.Int64Counter
}

// SupportedTelemetry returns the telemetry types this generator supports.
func (g *JSONLogGenerator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

// New creates a new JSON log generator
func New(logger *zap.Logger, workers int, rate time.Duration, logType string) (*JSONLogGenerator, error) {
if logger == nil {
Expand Down
6 changes: 6 additions & 0 deletions generator/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -124,6 +125,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration, format string) (*G

// Start starts the Kubernetes container log generator and writes data using the
// provided generator writer.
// SupportedTelemetry returns the telemetry types this generator supports.
func (g *Generator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

func (g *Generator) Start(writer output.Writer) error {
g.logger.Info("Starting Kubernetes container log generator",
zap.Int("workers", g.workers),
Expand Down
6 changes: 6 additions & 0 deletions generator/nginx/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/observiq/blitz/internal/generator/security"
"github.com/observiq/blitz/internal/useragent"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -184,6 +185,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*Generator, error

// Start starts the NGINX log generator and writes data using the
// provided generator writer.
// SupportedTelemetry returns the telemetry types this generator supports.
func (g *Generator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

func (g *Generator) Start(writer output.Writer) error {
g.logger.Info("Starting NGINX log generator",
zap.Int("workers", g.workers),
Expand Down
6 changes: 6 additions & 0 deletions generator/nop/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.uber.org/zap"
)

Expand All @@ -24,6 +25,11 @@ func New(logger *zap.Logger) (*NopGenerator, error) {
}, nil
}

// SupportedTelemetry returns the telemetry types this generator supports.
func (g *NopGenerator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

// Start starts the nop generator (performs no work)
func (g *NopGenerator) Start(writer output.Writer) error {
g.logger.Info("Starting NOP generator (no work performed)")
Expand Down
6 changes: 6 additions & 0 deletions generator/okta/okta.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cenkalti/backoff/v4"
jsonlib "github.com/goccy/go-json"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -239,6 +240,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*Generator, error
}

// Start starts the Okta log generator
// SupportedTelemetry returns the telemetry types this generator supports.
func (g *Generator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

func (g *Generator) Start(writer output.Writer) error {
g.logger.Info("Starting Okta log generator",
zap.Int("workers", g.workers),
Expand Down
6 changes: 6 additions & 0 deletions generator/paloalto/paloalto.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand All @@ -35,6 +36,11 @@ type Generator struct {
writeErrors metric.Int64Counter
}

// SupportedTelemetry returns the telemetry types this generator supports.
func (g *Generator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

// New creates a new Palo Alto generator.
func New(logger *zap.Logger, workers int, rate time.Duration) (*Generator, error) {
if logger == nil {
Expand Down
6 changes: 6 additions & 0 deletions generator/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -257,6 +258,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration) (*Generator, error

// Start starts the PostgreSQL log generator and writes data using the
// provided generator writer.
// SupportedTelemetry returns the telemetry types this generator supports.
func (g *Generator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

func (g *Generator) Start(writer output.Writer) error {
g.logger.Info("Starting PostgreSQL log generator",
zap.Int("workers", g.workers),
Expand Down
6 changes: 6 additions & 0 deletions generator/winevt/winevt.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/observiq/blitz/internal/generators/winevt/templates"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand All @@ -31,6 +32,11 @@ type WinevtGenerator struct {
winevtWriteErrors metric.Int64Counter
}

// SupportedTelemetry returns the telemetry types this generator supports.
func (g *WinevtGenerator) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

// New creates a new Windows Event generator.
func New(logger *zap.Logger, workers int, rate time.Duration) (*WinevtGenerator, error) {
if logger == nil {
Expand Down
7 changes: 7 additions & 0 deletions internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/observiq/blitz/generator"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.uber.org/zap"
)

Expand All @@ -27,6 +28,12 @@ func New(logger *zap.Logger, generator generator.Generator, output output.Output
return nil, fmt.Errorf("output cannot be nil")
}

telemetryType, err := telemetry.Compatible(generator.SupportedTelemetry(), output.SupportedTelemetry())
if err != nil {
return nil, fmt.Errorf("incompatible generator and output: %w", err)
}
logger.Info("telemetry type", zap.String("type", string(telemetryType)))

return &Service{
Logger: logger,
Generator: generator,
Expand Down
6 changes: 6 additions & 0 deletions output/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/observiq/blitz/internal/workermanager"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -200,6 +201,11 @@ func (f *File) Write(ctx context.Context, data output.LogRecord) error {
}

// Stop gracefully stops workers and closes the writer
// SupportedTelemetry returns the telemetry types this output supports.
func (f *File) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

func (f *File) Stop(ctx context.Context) error {
f.logger.Info("Stopping File output")

Expand Down
6 changes: 6 additions & 0 deletions output/nop/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.uber.org/zap"
)

Expand All @@ -24,6 +25,11 @@ func New(logger *zap.Logger) (*NopOutput, error) {
}, nil
}

// SupportedTelemetry returns the telemetry types this output supports.
func (o *NopOutput) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

// Write performs no work (data is discarded)
func (o *NopOutput) Write(ctx context.Context, data output.LogRecord) error {
// No-op: data is discarded
Expand Down
6 changes: 6 additions & 0 deletions output/otlp_grpc/otlp_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/observiq/blitz/internal/workermanager"
"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -406,6 +407,11 @@ func (o *OTLPGrpc) Write(ctx context.Context, data output.LogRecord) error {
// Stop shall not be called more than once.
// If the provided context is done, Stop will return immediately
// even if workers are still shutting down.
// SupportedTelemetry returns the telemetry types this output supports.
func (o *OTLPGrpc) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

func (o *OTLPGrpc) Stop(ctx context.Context) error {
o.logger.Info("Stopping OTLP gRPC output")

Expand Down
5 changes: 5 additions & 0 deletions output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package output
import (
"context"
"time"

"github.com/observiq/blitz/telemetry"
)

type LogRecord struct {
Expand Down Expand Up @@ -34,6 +36,9 @@ type Writer interface {
type Output interface {
Writer

// SupportedTelemetry returns the telemetry types this output can send.
SupportedTelemetry() []telemetry.Type

// Stop stops the output.
Stop(ctx context.Context) error
}
6 changes: 6 additions & 0 deletions output/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"go.uber.org/zap"
)

Expand All @@ -25,6 +26,11 @@ func New(logger *zap.Logger) (*StdoutOutput, error) {
}, nil
}

// SupportedTelemetry returns the telemetry types this output supports.
func (o *StdoutOutput) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

// Write writes the log record to stdout
func (o *StdoutOutput) Write(ctx context.Context, data output.LogRecord) error {
_, err := fmt.Fprintln(os.Stdout, data.Message)
Expand Down
6 changes: 6 additions & 0 deletions output/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/observiq/blitz/output"
"github.com/observiq/blitz/telemetry"
"github.com/observiq/blitz/output/syslog/ident"
"github.com/observiq/blitz/output/tcp"
"github.com/observiq/blitz/output/udp"
Expand Down Expand Up @@ -158,6 +159,11 @@ func (s *Syslog) Write(ctx context.Context, rec output.LogRecord) error {
return s.transport.Write(ctx, output.LogRecord{Message: formatted})
}

// SupportedTelemetry returns the telemetry types this output supports.
func (s *Syslog) SupportedTelemetry() []telemetry.Type {
return []telemetry.Type{telemetry.Logs}
}

// Stop delegates to the underlying transport.
func (s *Syslog) Stop(ctx context.Context) error {
return s.transport.Stop(ctx)
Expand Down
Loading