diff --git a/cmd/blitz/main.go b/cmd/blitz/main.go index 8ae9299..47fba51 100644 --- a/cmd/blitz/main.go +++ b/cmd/blitz/main.go @@ -9,12 +9,14 @@ import ( "net" "net/http" "os" + "reflect" "os/signal" "strconv" "strings" "syscall" "time" + "github.com/go-viper/mapstructure/v2" "github.com/observiq/blitz/generator" apachegen "github.com/observiq/blitz/generator/apache" apachecombinedgen "github.com/observiq/blitz/generator/apache_combined" @@ -22,6 +24,7 @@ import ( "github.com/observiq/blitz/generator/filegen" jsongen "github.com/observiq/blitz/generator/json" "github.com/observiq/blitz/generator/kubernetes" + metricsgen "github.com/observiq/blitz/generator/metrics" "github.com/observiq/blitz/generator/nginx" gennop "github.com/observiq/blitz/generator/nop" "github.com/observiq/blitz/generator/okta" @@ -110,7 +113,13 @@ func run(cmd *cobra.Command, args []string) error { } cfg := config.NewConfig() - if err := viper.Unmarshal(cfg); err != nil { + if err := viper.Unmarshal(cfg, viper.DecodeHook( + mapstructure.ComposeDecodeHookFunc( + mapstructure.StringToTimeDurationHookFunc(), + mapstructure.StringToSliceHookFunc(","), + flattenNestedMapHook(), + ), + )); err != nil { return fmt.Errorf("failed to unmarshal config: %w", err) } @@ -399,6 +408,31 @@ func run(cmd *cobra.Command, args []string) error { logger.Error("Failed to create Okta generator", zap.Error(err)) return err } + case config.GeneratorTypeMetrics: + metricDefs := make([]metricsgen.MetricDefinition, 0, len(cfg.Generator.Metrics.Metrics)) + for _, m := range cfg.Generator.Metrics.Metrics { + metricDefs = append(metricDefs, metricsgen.MetricDefinition{ + Name: m.Name, + Type: output.MetricType(m.Type), + Description: m.Description, + Unit: m.Unit, + Attributes: m.Attributes, + ValueMin: m.ValueMin, + ValueMax: m.ValueMax, + }) + } + generatorInstance, err = metricsgen.New( + logger, + cfg.Generator.Metrics.Workers, + cfg.Generator.Metrics.Rate, + cfg.Generator.Metrics.ResourceAttributes, + metricDefs, + ) 
// flattenMapString recursively walks m and records every leaf value in out
// under its dotted path (e.g. {"service": {"name": "foo"}} becomes
// out["service.name"] = "foo"). Non-string leaves are rendered with
// fmt.Sprintf("%v").
func flattenMapString(prefix string, m map[string]interface{}, out map[string]string) {
	for name, raw := range m {
		path := name
		if prefix != "" {
			path = prefix + "." + name
		}
		if child, isMap := raw.(map[string]interface{}); isMap {
			flattenMapString(path, child, out)
			continue
		}
		out[path] = fmt.Sprintf("%v", raw)
	}
}

// flattenMapSlice recursively walks m and records every leaf under its dotted
// path in out. A []interface{} leaf keeps its element order; any scalar leaf
// becomes a one-element slice.
func flattenMapSlice(prefix string, m map[string]interface{}, out map[string][]string) {
	for name, raw := range m {
		path := name
		if prefix != "" {
			path = prefix + "." + name
		}
		switch leaf := raw.(type) {
		case map[string]interface{}:
			flattenMapSlice(path, leaf, out)
		case []interface{}:
			rendered := make([]string, 0, len(leaf))
			for _, item := range leaf {
				rendered = append(rendered, fmt.Sprintf("%v", item))
			}
			out[path] = rendered
		default:
			out[path] = []string{fmt.Sprintf("%v", leaf)}
		}
	}
}
+func New( + logger *zap.Logger, + workers int, + rate time.Duration, + resourceAttrs map[string][]string, + metricDefs []MetricDefinition, +) (*Generator, error) { + if logger == nil { + return nil, fmt.Errorf("logger cannot be nil") + } + if workers < 1 { + return nil, fmt.Errorf("workers must be 1 or greater, got %d", workers) + } + if len(metricDefs) == 0 { + return nil, fmt.Errorf("at least one metric definition is required") + } + + meter := otel.Meter("blitz-generator") + + metricsGenerated, err := meter.Int64Counter( + "blitz.generator.metrics.generated", + metric.WithDescription("Total number of metric data points generated"), + ) + if err != nil { + return nil, fmt.Errorf("create metrics generated counter: %w", err) + } + + activeWorkers, err := meter.Int64Gauge( + "blitz.generator.workers.active", + metric.WithDescription("Number of active worker goroutines"), + ) + if err != nil { + return nil, fmt.Errorf("create active workers gauge: %w", err) + } + + writeErrors, err := meter.Int64Counter( + "blitz.generator.write.errors", + metric.WithDescription("Total number of write errors"), + ) + if err != nil { + return nil, fmt.Errorf("create write errors counter: %w", err) + } + + resourceCombos := cartesianProduct(resourceAttrs) + + return &Generator{ + logger: logger.Named("generator-metrics"), + workers: workers, + rate: rate, + resourceCombos: resourceCombos, + metrics: metricDefs, + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + stopCh: make(chan struct{}), + meter: meter, + metricsGenerated: metricsGenerated, + activeWorkers: activeWorkers, + writeErrors: writeErrors, + }, nil +} + +// SupportedTelemetry returns the telemetry types this generator supports. +func (g *Generator) SupportedTelemetry() []telemetry.Type { + return []telemetry.Type{telemetry.Metrics} +} + +// Start starts the metrics generator. 
+func (g *Generator) Start(writer output.Writer) error { + g.logger.Info("Starting metrics generator", + zap.Int("workers", g.workers), + zap.Duration("rate", g.rate), + zap.Int("metric_definitions", len(g.metrics)), + zap.Int("resource_combos", len(g.resourceCombos)), + ) + + g.activeWorkers.Record(context.Background(), int64(g.workers), + metric.WithAttributeSet(attribute.NewSet(attribute.String("component", componentName))), + ) + + for i := 0; i < g.workers; i++ { + g.wg.Add(1) + go g.worker(i, writer) + } + return nil +} + +// Stop stops the generator. +func (g *Generator) Stop(ctx context.Context) error { + g.logger.Info("Stopping metrics generator") + + g.activeWorkers.Record(ctx, 0, + metric.WithAttributeSet(attribute.NewSet(attribute.String("component", componentName))), + ) + + close(g.stopCh) + + done := make(chan struct{}) + go func() { + g.wg.Wait() + close(done) + }() + + select { + case <-done: + g.logger.Info("All workers stopped gracefully") + return nil + case <-ctx.Done(): + return fmt.Errorf("stop cancelled due to context cancellation: %w", ctx.Err()) + } +} + +func (g *Generator) worker(workerID int, writer output.Writer) { + defer g.wg.Done() + g.logger.Debug("Starting worker", zap.Int("worker_id", workerID)) + + backoffConfig := backoff.NewExponentialBackOff() + backoffConfig.InitialInterval = g.rate + backoffConfig.MaxInterval = 5 * time.Second + backoffConfig.MaxElapsedTime = 0 + + backoffTicker := backoff.NewTicker(backoffConfig) + defer backoffTicker.Stop() + + for { + select { + case <-g.stopCh: + g.logger.Debug("Worker stopping", zap.Int("worker_id", workerID)) + return + case <-backoffTicker.C: + if err := g.generateAndWrite(writer); err != nil { + g.logger.Error("Failed to write metric", zap.Int("worker_id", workerID), zap.Error(err)) + continue + } + backoffConfig.Reset() + } + } +} + +// generateAndWrite emits one data point for every combination of +// resource attributes × metric definition. 
+func (g *Generator) generateAndWrite(writer output.Writer) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + now := time.Now() + + for _, resAttrs := range g.resourceCombos { + for i := range g.metrics { + def := &g.metrics[i] + value := def.ValueMin + g.rng.Float64()*(def.ValueMax-def.ValueMin) + + record := output.MetricRecord{ + Name: def.Name, + Description: def.Description, + Unit: def.Unit, + Type: def.Type, + DoubleValue: &value, + Attributes: def.Attributes, + ResourceAttributes: resAttrs, + Timestamp: now, + } + + if err := writer.WriteMetric(ctx, record); err != nil { + errorType := "unknown" + if ctx.Err() == context.DeadlineExceeded { + errorType = "timeout" + } + g.recordWriteError(errorType) + return err + } + + g.metricsGenerated.Add(context.Background(), 1, + metric.WithAttributeSet(attribute.NewSet(attribute.String("component", componentName))), + ) + } + } + + return nil +} + +func (g *Generator) recordWriteError(errorType string) { + g.writeErrors.Add(context.Background(), 1, + metric.WithAttributeSet(attribute.NewSet( + attribute.String("component", componentName), + attribute.String("error_type", errorType), + )), + ) +} + +// cartesianProduct computes the cartesian product of a map of keys to +// value lists. Each returned map has exactly one value per key. If the +// input is nil or empty, a single empty map is returned so callers +// always iterate at least once. 
// cartesianProduct expands a map of keys to candidate values into every
// possible combination containing exactly one value per key. A nil or
// empty input yields a single empty combination so callers always have
// at least one resource to iterate over.
func cartesianProduct(attrs map[string][]string) []map[string]string {
	combos := []map[string]string{{}}
	for key, vals := range attrs {
		// Cross the combinations accumulated so far with this key's values.
		var expanded []map[string]string
		for _, base := range combos {
			for _, v := range vals {
				combo := make(map[string]string, len(base)+1)
				for bk, bv := range base {
					combo[bk] = bv
				}
				combo[key] = v
				expanded = append(expanded, combo)
			}
		}
		combos = expanded
	}
	return combos
}
+} + +func sampleDefs() []MetricDefinition { + return []MetricDefinition{ + { + Name: "system.cpu.utilization", + Type: output.MetricTypeGauge, + Description: "CPU utilization", + Unit: "1", + Attributes: map[string]string{"host.name": "test-host"}, + ValueMin: 0, + ValueMax: 100, + }, + } +} + +func TestNew(t *testing.T) { + logger := zaptest.NewLogger(t) + + t.Run("success", func(t *testing.T) { + g, err := New(logger, 1, time.Second, nil, sampleDefs()) + require.NoError(t, err) + assert.NotNil(t, g) + }) + + t.Run("nil logger", func(t *testing.T) { + _, err := New(nil, 1, time.Second, nil, sampleDefs()) + require.Error(t, err) + }) + + t.Run("zero workers", func(t *testing.T) { + _, err := New(logger, 0, time.Second, nil, sampleDefs()) + require.Error(t, err) + }) + + t.Run("no metric definitions", func(t *testing.T) { + _, err := New(logger, 1, time.Second, nil, nil) + require.Error(t, err) + }) +} + +func TestSupportedTelemetry(t *testing.T) { + logger := zaptest.NewLogger(t) + g, err := New(logger, 1, time.Second, nil, sampleDefs()) + require.NoError(t, err) + assert.Equal(t, []telemetry.Type{telemetry.Metrics}, g.SupportedTelemetry()) +} + +func TestStartStop(t *testing.T) { + logger := zaptest.NewLogger(t) + writer := &mockWriter{} + + g, err := New(logger, 2, 50*time.Millisecond, nil, sampleDefs()) + require.NoError(t, err) + + require.NoError(t, g.Start(writer)) + + time.Sleep(300 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + require.NoError(t, g.Stop(ctx)) + + metrics := writer.getMetrics() + assert.Greater(t, len(metrics), 0, "Expected at least one metric") + + m := metrics[0] + assert.Equal(t, "system.cpu.utilization", m.Name) + assert.Equal(t, output.MetricTypeGauge, m.Type) + assert.Equal(t, "CPU utilization", m.Description) + assert.Equal(t, "1", m.Unit) + assert.NotNil(t, m.DoubleValue) + assert.GreaterOrEqual(t, *m.DoubleValue, 0.0) + assert.LessOrEqual(t, *m.DoubleValue, 100.0) 
+ assert.False(t, m.Timestamp.IsZero()) +} + +func TestResourceAttributeCombinations(t *testing.T) { + logger := zaptest.NewLogger(t) + writer := &mockWriter{} + + resAttrs := map[string][]string{ + "service.name": {"svc-a", "svc-b"}, + } + + defs := []MetricDefinition{ + { + Name: "cpu", + Type: output.MetricTypeGauge, + ValueMin: 0, + ValueMax: 1, + }, + } + + g, err := New(logger, 1, 50*time.Millisecond, resAttrs, defs) + require.NoError(t, err) + + require.NoError(t, g.Start(writer)) + time.Sleep(200 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + require.NoError(t, g.Stop(ctx)) + + metrics := writer.getMetrics() + // Each tick produces 2 data points (one per resource attr value). + resValues := make(map[string]bool) + for _, m := range metrics { + if v, ok := m.ResourceAttributes["service.name"]; ok { + resValues[v] = true + } + } + assert.True(t, resValues["svc-a"], "Expected svc-a") + assert.True(t, resValues["svc-b"], "Expected svc-b") +} + +func TestMultipleMetricDefinitions(t *testing.T) { + logger := zaptest.NewLogger(t) + writer := &mockWriter{} + + defs := []MetricDefinition{ + { + Name: "system.cpu.utilization", + Type: output.MetricTypeGauge, + ValueMin: 0, + ValueMax: 100, + }, + { + Name: "system.memory.usage", + Type: output.MetricTypeGauge, + Unit: "By", + ValueMin: 1000, + ValueMax: 8000, + }, + { + Name: "http.server.request.count", + Type: output.MetricTypeSum, + ValueMin: 1, + ValueMax: 50, + }, + } + + g, err := New(logger, 1, 50*time.Millisecond, nil, defs) + require.NoError(t, err) + + require.NoError(t, g.Start(writer)) + time.Sleep(200 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + require.NoError(t, g.Stop(ctx)) + + metrics := writer.getMetrics() + assert.GreaterOrEqual(t, len(metrics), 3) + + names := make(map[string]bool) + for _, m := range metrics { + names[m.Name] = true + } + assert.True(t, 
names["system.cpu.utilization"]) + assert.True(t, names["system.memory.usage"]) + assert.True(t, names["http.server.request.count"]) +} + +func TestCartesianProduct(t *testing.T) { + t.Run("nil input", func(t *testing.T) { + result := cartesianProduct(nil) + assert.Len(t, result, 1) + assert.Empty(t, result[0]) + }) + + t.Run("single key single value", func(t *testing.T) { + result := cartesianProduct(map[string][]string{"a": {"1"}}) + assert.Len(t, result, 1) + assert.Equal(t, "1", result[0]["a"]) + }) + + t.Run("single key multiple values", func(t *testing.T) { + result := cartesianProduct(map[string][]string{"a": {"1", "2", "3"}}) + assert.Len(t, result, 3) + }) + + t.Run("two keys", func(t *testing.T) { + result := cartesianProduct(map[string][]string{ + "a": {"1", "2"}, + "b": {"x", "y"}, + }) + assert.Len(t, result, 4) + + combos := make(map[string]bool) + for _, m := range result { + combos[m["a"]+"-"+m["b"]] = true + } + assert.True(t, combos["1-x"]) + assert.True(t, combos["1-y"]) + assert.True(t, combos["2-x"]) + assert.True(t, combos["2-y"]) + }) +} diff --git a/go.mod b/go.mod index fe596b7..11c0a64 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ tool ( require ( github.com/cenkalti/backoff/v4 v4.3.0 + github.com/go-viper/mapstructure/v2 v2.4.0 github.com/goccy/go-json v0.10.5 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/prometheus/client_golang v1.23.2 @@ -48,7 +49,6 @@ require ( github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/go-viper/mapstructure/v2 v2.4.0 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/s2a-go v0.1.9 // indirect github.com/google/uuid v1.6.0 // indirect diff --git a/internal/config/generator.go b/internal/config/generator.go index c37390c..d8d94d4 100644 --- a/internal/config/generator.go +++ b/internal/config/generator.go @@ -32,6 +32,8 @@ const ( GeneratorTypeFile GeneratorType = "filegen" // 
GeneratorTypeOkta represents Okta System Log generator GeneratorTypeOkta GeneratorType = "okta" + // GeneratorTypeMetrics represents the metrics generator + GeneratorTypeMetrics GeneratorType = "metrics" ) // Generator contains configuration for log generators @@ -60,6 +62,8 @@ type Generator struct { Filegen FileGeneratorConfig `yaml:"filegen,omitempty" mapstructure:"filegen,omitempty"` // Okta contains Okta System Log generator configuration Okta OktaGeneratorConfig `yaml:"okta,omitempty" mapstructure:"okta,omitempty"` + // Metrics contains metrics generator configuration + Metrics MetricsGeneratorConfig `yaml:"metrics,omitempty" mapstructure:"metrics,omitempty"` } // Validate validates the generator configuration @@ -116,8 +120,12 @@ func (g *Generator) Validate() error { if err := g.Okta.Validate(); err != nil { return fmt.Errorf("okta generator validation failed: %w", err) } + case GeneratorTypeMetrics: + if err := g.Metrics.Validate(); err != nil { + return fmt.Errorf("metrics generator validation failed: %w", err) + } default: - return fmt.Errorf("invalid generator type: %s, must be one of: nop, json, winevt, palo-alto, apache-common, apache-combined, apache-error, nginx, postgres, kubernetes, filegen, okta", g.Type) + return fmt.Errorf("invalid generator type: %s, must be one of: nop, json, winevt, palo-alto, apache-common, apache-combined, apache-error, nginx, postgres, kubernetes, filegen, okta, metrics", g.Type) } return nil diff --git a/internal/config/generator_metrics.go b/internal/config/generator_metrics.go new file mode 100644 index 0000000..22485c5 --- /dev/null +++ b/internal/config/generator_metrics.go @@ -0,0 +1,77 @@ +package config + +import ( + "fmt" + "time" +) + +// MetricDefinition describes a single metric to generate. +type MetricDefinition struct { + // Name is the metric name (e.g. "system.cpu.utilization"). + Name string `yaml:"name" mapstructure:"name"` + // Type is the metric type: "gauge" or "sum". 
+ Type string `yaml:"type" mapstructure:"type"` + // Description is an optional human-readable description. + Description string `yaml:"description,omitempty" mapstructure:"description,omitempty"` + // Unit is the metric unit (e.g. "s", "By", "1", "%"). + Unit string `yaml:"unit,omitempty" mapstructure:"unit,omitempty"` + // Attributes are key-value pairs attached to every data point. + Attributes map[string]string `yaml:"attributes,omitempty" mapstructure:"attributes,omitempty"` + // ValueMin is the minimum value for generated data points (inclusive). + ValueMin float64 `yaml:"valueMin,omitempty" mapstructure:"valueMin,omitempty"` + // ValueMax is the maximum value for generated data points (inclusive). + ValueMax float64 `yaml:"valueMax,omitempty" mapstructure:"valueMax,omitempty"` +} + +// Validate validates a single metric definition. +func (m *MetricDefinition) Validate() error { + if m.Name == "" { + return fmt.Errorf("metric name is required") + } + switch m.Type { + case "gauge", "sum": + default: + return fmt.Errorf("metric %q: type must be \"gauge\" or \"sum\", got %q", m.Name, m.Type) + } + if m.ValueMax < m.ValueMin { + return fmt.Errorf("metric %q: valueMax (%g) must be >= valueMin (%g)", m.Name, m.ValueMax, m.ValueMin) + } + return nil +} + +// MetricsGeneratorConfig contains configuration for the metrics generator. +type MetricsGeneratorConfig struct { + // Workers is the number of worker goroutines. + Workers int `yaml:"workers,omitempty" mapstructure:"workers,omitempty"` + // Rate is the generation interval per worker. + Rate time.Duration `yaml:"rate,omitempty" mapstructure:"rate,omitempty"` + // ResourceAttributes maps keys to one-or-more values. The generator + // emits data points for each combination of resource attribute values. + ResourceAttributes map[string][]string `yaml:"resourceAttributes,omitempty" mapstructure:"resourceAttributes,omitempty"` + // Metrics is the list of metric definitions to generate. 
+ Metrics []MetricDefinition `yaml:"metrics,omitempty" mapstructure:"metrics,omitempty"` +} + +// Validate validates the metrics generator configuration. +func (c *MetricsGeneratorConfig) Validate() error { + if c.Workers < 1 { + return fmt.Errorf("metrics generator workers must be 1 or greater, got %d", c.Workers) + } + if c.Rate <= 0 { + return fmt.Errorf("metrics generator rate must be positive, got %v", c.Rate) + } + if len(c.Metrics) == 0 { + return fmt.Errorf("metrics generator requires at least one metric definition") + } + for k, vals := range c.ResourceAttributes { + if len(vals) == 0 { + return fmt.Errorf("resourceAttribute %q must have at least one value", k) + } + } + for i := range c.Metrics { + if err := c.Metrics[i].Validate(); err != nil { + return fmt.Errorf("metrics[%d]: %w", i, err) + } + } + return nil +} diff --git a/internal/config/generator_metrics_test.go b/internal/config/generator_metrics_test.go new file mode 100644 index 0000000..1373e41 --- /dev/null +++ b/internal/config/generator_metrics_test.go @@ -0,0 +1,163 @@ +package config + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestMetricDefinition_Validate(t *testing.T) { + tests := []struct { + name string + def MetricDefinition + wantErr bool + }{ + { + name: "valid gauge", + def: MetricDefinition{ + Name: "system.cpu.utilization", + Type: "gauge", + ValueMin: 0, + ValueMax: 100, + }, + }, + { + name: "valid sum", + def: MetricDefinition{ + Name: "http.requests", + Type: "sum", + ValueMin: 1, + ValueMax: 50, + }, + }, + { + name: "empty name", + def: MetricDefinition{Type: "gauge"}, + wantErr: true, + }, + { + name: "invalid type", + def: MetricDefinition{ + Name: "m", + Type: "histogram", + }, + wantErr: true, + }, + { + name: "valueMax less than valueMin", + def: MetricDefinition{ + Name: "m", + Type: "gauge", + ValueMin: 100, + ValueMax: 0, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err 
:= tt.def.Validate() + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestMetricsGeneratorConfig_Validate(t *testing.T) { + validMetric := MetricDefinition{ + Name: "cpu", + Type: "gauge", + ValueMin: 0, + ValueMax: 1, + } + + tests := []struct { + name string + cfg MetricsGeneratorConfig + wantErr bool + }{ + { + name: "valid", + cfg: MetricsGeneratorConfig{ + Workers: 1, + Rate: time.Second, + Metrics: []MetricDefinition{validMetric}, + }, + }, + { + name: "valid with resource attributes", + cfg: MetricsGeneratorConfig{ + Workers: 1, + Rate: time.Second, + ResourceAttributes: map[string][]string{ + "service.name": {"svc-a", "svc-b"}, + }, + Metrics: []MetricDefinition{validMetric}, + }, + }, + { + name: "zero workers", + cfg: MetricsGeneratorConfig{ + Workers: 0, + Rate: time.Second, + Metrics: []MetricDefinition{validMetric}, + }, + wantErr: true, + }, + { + name: "zero rate", + cfg: MetricsGeneratorConfig{ + Workers: 1, + Rate: 0, + Metrics: []MetricDefinition{validMetric}, + }, + wantErr: true, + }, + { + name: "no metrics", + cfg: MetricsGeneratorConfig{ + Workers: 1, + Rate: time.Second, + Metrics: nil, + }, + wantErr: true, + }, + { + name: "invalid metric definition", + cfg: MetricsGeneratorConfig{ + Workers: 1, + Rate: time.Second, + Metrics: []MetricDefinition{{Name: "", Type: "gauge"}}, + }, + wantErr: true, + }, + { + name: "empty resource attribute values", + cfg: MetricsGeneratorConfig{ + Workers: 1, + Rate: time.Second, + ResourceAttributes: map[string][]string{ + "service.name": {}, + }, + Metrics: []MetricDefinition{validMetric}, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.cfg.Validate() + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/internal/config/override.go b/internal/config/override.go index a0ce529..d194134 100644 --- a/internal/config/override.go +++ 
b/internal/config/override.go @@ -247,7 +247,7 @@ func DefaultOverrides() []*Override { NewOverride("logging.file.rotation.compress", "logging file rotation: compress rotated files", true), NewOverride("logging.file.rotation.localTime", "logging file rotation: use local time for backup timestamps", false), NewOverride("metrics.port", "HTTP port for the metrics endpoint", DefaultMetricsPort), - NewOverride("generator.type", "generator type. One of: nop|json|winevt|palo-alto|apache-common|apache-combined|apache-error|nginx|postgres|kubernetes|filegen|okta", GeneratorTypeNop), + NewOverride("generator.type", "generator type. One of: nop|json|winevt|palo-alto|apache-common|apache-combined|apache-error|nginx|postgres|kubernetes|filegen|okta|metrics", GeneratorTypeNop), NewOverride("generator.json.workers", "number of JSON generator workers", 1), NewOverride("generator.json.rate", "rate at which logs are generated per worker", 1*time.Second), NewOverride("generator.json.type", "type of log to generate. One of: default|pii", logtypes.LogTypeDefault), @@ -275,6 +275,8 @@ func DefaultOverrides() []*Override { NewOverride("generator.filegen.cache-ttl", "file cache time-to-live (0 = never expire)", time.Duration(0)), NewOverride("generator.okta.workers", "number of Okta generator workers", 1), NewOverride("generator.okta.rate", "rate at which Okta logs are generated per worker", 1*time.Second), + NewOverride("generator.metrics.workers", "number of metrics generator workers", 1), + NewOverride("generator.metrics.rate", "rate at which metrics are generated per worker", 60*time.Second), NewOverride("output.type", "output type. 
One of: nop|stdout|tcp|udp|syslog|otlp-grpc|file", OutputTypeNop), NewOverride("output.udp.host", "UDP output target host", ""), NewOverride("output.udp.port", "UDP output target port", 0), diff --git a/internal/config/override_test.go b/internal/config/override_test.go index 1779551..4b78fff 100644 --- a/internal/config/override_test.go +++ b/internal/config/override_test.go @@ -51,6 +51,8 @@ func getTestOverrideFlagsArgs() []string { "--generator-filegen-cache-ttl", "0", "--generator-okta-workers", "22", "--generator-okta-rate", "40ms", + "--generator-metrics-workers", "1", + "--generator-metrics-rate", "60s", "--output-type", "otlp-grpc", "--output-udp-host", "udp.example.com", "--output-udp-port", "1514", @@ -145,6 +147,8 @@ func getTestOverrideEnvs() map[string]string { "BLITZ_GENERATOR_FILEGEN_CACHE_TTL": "0", "BLITZ_GENERATOR_OKTA_WORKERS": "23", "BLITZ_GENERATOR_OKTA_RATE": "35ms", + "BLITZ_GENERATOR_METRICS_WORKERS": "1", + "BLITZ_GENERATOR_METRICS_RATE": "60s", "BLITZ_OUTPUT_TYPE": "file", "BLITZ_OUTPUT_UDP_HOST": "udp.env.example", "BLITZ_OUTPUT_UDP_PORT": "5514", @@ -281,6 +285,10 @@ func TestOverrideDefaults(t *testing.T) { Workers: 1, Rate: 1 * time.Second, }, + Metrics: MetricsGeneratorConfig{ + Workers: 1, + Rate: 60 * time.Second, + }, }, Output: Output{ Type: OutputTypeNop, @@ -438,6 +446,10 @@ func TestOverrideFlags(t *testing.T) { Workers: 22, Rate: 40 * time.Millisecond, }, + Metrics: MetricsGeneratorConfig{ + Workers: 1, + Rate: 60 * time.Second, + }, }, Output: Output{ Type: OutputTypeOTLPGrpc, @@ -598,6 +610,10 @@ func TestOverrideEnvs(t *testing.T) { Workers: 23, Rate: 35 * time.Millisecond, }, + Metrics: MetricsGeneratorConfig{ + Workers: 1, + Rate: 60 * time.Second, + }, }, Output: Output{ Type: OutputTypeFile, diff --git a/output/otlp_grpc/otlp_grpc.go b/output/otlp_grpc/otlp_grpc.go index b714ee5..432aa35 100644 --- a/output/otlp_grpc/otlp_grpc.go +++ b/output/otlp_grpc/otlp_grpc.go @@ -163,6 +163,9 @@ type OTLPGrpc struct { 
otlpRequestLatency metric.Float64Histogram otlpSendErrors metric.Int64Counter + // Resource attributes from metric records, cached on first WriteMetric call. + cachedResourceAttrs []*commonpb.KeyValue + // Configuration batchTimeout time.Duration maxQueueSize int @@ -449,6 +452,19 @@ func (o *OTLPGrpc) Write(ctx context.Context, data output.LogRecord) error { // WriteMetric sends metric data to the OTLP gRPC output channel for processing by metric workers. // WriteMetric shall not be called after Stop is called. func (o *OTLPGrpc) WriteMetric(ctx context.Context, data output.MetricRecord) error { + // Cache resource attributes from the first record that carries them. + // All records in a pipeline share the same resource, so this is safe. + if o.cachedResourceAttrs == nil && len(data.ResourceAttributes) > 0 { + attrs := make([]*commonpb.KeyValue, 0, len(data.ResourceAttributes)) + for k, v := range data.ResourceAttributes { + attrs = append(attrs, &commonpb.KeyValue{ + Key: k, + Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: v}}, + }) + } + o.cachedResourceAttrs = attrs + } + m := o.buildOTLPMetric(data) select { @@ -951,7 +967,9 @@ func (o *OTLPGrpc) flushMetricBatch(client collectormetrics.MetricsServiceClient return o.sendMetricBatch(client, batch) } -// buildOTLPMetricsRequest builds an OTLP ExportMetricsServiceRequest +// buildOTLPMetricsRequest builds an OTLP ExportMetricsServiceRequest. +// Resource attributes are taken from the batch. If no metrics carry resource +// attributes, a default service.name=blitz is used. 
func (o *OTLPGrpc) buildOTLPMetricsRequest(metrics []*metricspb.Metric) *collectormetrics.ExportMetricsServiceRequest { scopeMetrics := &metricspb.ScopeMetrics{ Metrics: make([]*metricspb.Metric, 0, len(metrics)), @@ -963,20 +981,26 @@ func (o *OTLPGrpc) buildOTLPMetricsRequest(metrics []*metricspb.Metric) *collect scopeMetrics.Metrics = append(scopeMetrics.Metrics, m) } + // Use cached resource attributes if available, otherwise default. + resourceAttrs := o.cachedResourceAttrs + if len(resourceAttrs) == 0 { + resourceAttrs = []*commonpb.KeyValue{ + { + Key: "service.name", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_StringValue{ + StringValue: "blitz", + }, + }, + }, + } + } + return &collectormetrics.ExportMetricsServiceRequest{ ResourceMetrics: []*metricspb.ResourceMetrics{ { Resource: &resourcepb.Resource{ - Attributes: []*commonpb.KeyValue{ - { - Key: "service.name", - Value: &commonpb.AnyValue{ - Value: &commonpb.AnyValue_StringValue{ - StringValue: "blitz", - }, - }, - }, - }, + Attributes: resourceAttrs, }, ScopeMetrics: []*metricspb.ScopeMetrics{scopeMetrics}, }, diff --git a/output/output.go b/output/output.go index 411276e..2ca2e1c 100644 --- a/output/output.go +++ b/output/output.go @@ -64,6 +64,10 @@ type MetricRecord struct { // Attributes are key-value pairs associated with this data point. Attributes map[string]string + // ResourceAttributes are key-value pairs associated with the resource + // that produced this metric (e.g. "service.name", "host.name"). + ResourceAttributes map[string]string + // Timestamp is when the measurement was taken. Timestamp time.Time }