Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 72 additions & 11 deletions google/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ type ExporterOpts struct {
// how it's applied dynamically in conjunction with Export.Match runtime option.
Matchers Matchers

// A list of metric matchers for debugging gRPC requests. Logs exact gRPC requests
// containing any time series matching these matchers.
DebugLogMatchers Matchers

// Prefix under which metrics are written to GCM.
MetricTypePrefix string

Expand Down Expand Up @@ -710,7 +714,7 @@ func (e *Exporter) Run() error {
opts := e.opts
e.mtx.RUnlock()

curBatch := newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
curBatch := newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize, opts.DebugLogMatchers, opts.MetricTypePrefix)

// Send the currently accumulated batch to GCM asynchronously.
send := func() {
Expand All @@ -736,7 +740,7 @@ func (e *Exporter) Run() error {
stopTimer()
timer.Reset(batchDelayMax)

curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize, opts.DebugLogMatchers, opts.MetricTypePrefix)
}

for {
Expand Down Expand Up @@ -943,24 +947,28 @@ func (e *Exporter) withUntypedDefaultMetadata(f MetadataFunc) MetadataFunc {
// batch accumulates a batch of samples to be sent to GCM. Once the batch is full
// it must be sent and cannot be used anymore after that.
type batch struct {
logger log.Logger
maxSize uint
logger log.Logger
maxSize uint
debugLogMatchers Matchers
metricTypePrefix string

m map[string][]*monitoring_pb.TimeSeries
shards []*shard
oneFull bool
total int
}

func newBatch(logger log.Logger, shardsCount uint, maxSize uint) *batch {
func newBatch(logger log.Logger, shardsCount uint, maxSize uint, debugLogMatchers Matchers, metricTypePrefix string) *batch {
if logger == nil {
logger = log.NewNopLogger()
}
return &batch{
logger: logger,
maxSize: maxSize,
m: make(map[string][]*monitoring_pb.TimeSeries, 1),
shards: make([]*shard, 0, shardsCount/2),
logger: logger,
maxSize: maxSize,
debugLogMatchers: debugLogMatchers,
metricTypePrefix: metricTypePrefix,
m: make(map[string][]*monitoring_pb.TimeSeries, 1),
shards: make([]*shard, 0, shardsCount/2),
}
}

Expand Down Expand Up @@ -1002,6 +1010,55 @@ func (b *batch) empty() bool {
return b.total == 0
}

func extractMetricName(metricType, prefix string) string {
if prefix != "" && strings.HasPrefix(metricType, prefix+"/") {
metricType = metricType[len(prefix)+1:]
}
idx := strings.LastIndexByte(metricType, '/')
if idx < 0 {
return metricType
}
return metricType[:idx]
}

func timeSeriesToLabels(ts *monitoring_pb.TimeSeries, prefix string) labels.Labels {
if ts == nil {
return labels.EmptyLabels()
}
builder := labels.NewBuilder(labels.EmptyLabels())
if name := extractMetricName(ts.GetMetric().GetType(), prefix); name != "" {
builder.Set(labels.MetricName, name)
}
for k, v := range ts.GetResource().GetLabels() {
builder.Set(k, v)
}
for k, v := range ts.GetMetric().GetLabels() {
builder.Set(k, v)
}
return builder.Labels()
}

func logDebugGRPCRequest(logger log.Logger, matchers Matchers, prefix string, req *monitoring_pb.CreateTimeSeriesRequest) {
if len(matchers) == 0 || req == nil {
return
}
var matchedSeries []*monitoring_pb.TimeSeries
for _, ts := range req.TimeSeries {
lset := timeSeriesToLabels(ts, prefix)
if matchers.Matches(lset) {
matchedSeries = append(matchedSeries, ts)
}
}
if len(matchedSeries) > 0 {
debugReq := &monitoring_pb.CreateTimeSeriesRequest{
Name: req.Name,
TimeSeries: matchedSeries,
}
//nolint:errcheck
level.Debug(logger).Log("msg", "gRPC CreateTimeSeries request matching debug matchers", "matchers", matchers.String(), "series_count", len(matchedSeries), "req", debugReq)
}
}

// send the accumulated samples to their respective projects. It returns once all
// requests have completed and notifies the pending shards.
func (b *batch) send(
Expand All @@ -1028,10 +1085,14 @@ func (b *batch) send(

// We do not retry any requests due to the risk of producing a backlog
// that cannot be worked down, especially if large amounts of clients try to do so.
err := sendOne(sendCtx, &monitoring_pb.CreateTimeSeriesRequest{
req := &monitoring_pb.CreateTimeSeriesRequest{
Name: fmt.Sprintf("projects/%s", pid),
TimeSeries: l,
})
}
if len(b.debugLogMatchers) > 0 {
logDebugGRPCRequest(b.logger, b.debugLogMatchers, b.metricTypePrefix, req)
}
err := sendOne(sendCtx, req)
if err != nil {
//nolint:errcheck
level.Error(b.logger).Log("msg", "send batch", "size", len(l), "err", err)
Expand Down
60 changes: 58 additions & 2 deletions google/export/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package export

import (
"bytes"
"context"
"fmt"
"os"
Expand All @@ -39,11 +40,12 @@ import (
"github.com/stretchr/testify/require"
monitoredres_pb "google.golang.org/genproto/googleapis/api/monitoredres"
timestamp_pb "google.golang.org/protobuf/types/known/timestamppb"
metric_pb "google.golang.org/genproto/googleapis/api/metric"
"k8s.io/apimachinery/pkg/util/wait"
)

func TestBatchAdd(t *testing.T) {
b := newBatch(nil, DefaultShardCount, 100)
b := newBatch(nil, DefaultShardCount, 100, nil, "")

if !b.empty() {
t.Fatalf("batch unexpectedly not empty")
Expand Down Expand Up @@ -100,7 +102,7 @@ func TestBatchFillFromShardsAndSend(t *testing.T) {
})
}

b := newBatch(nil, DefaultShardCount, 101)
b := newBatch(nil, DefaultShardCount, 101, nil, "")

for _, s := range shards {
s.fill(b)
Expand Down Expand Up @@ -876,3 +878,57 @@ func TestMatchers_Equals(t *testing.T) {
require.False(t, m.Equals(diff6))
require.False(t, diff6.Equals(m))
}

func TestDebugLogGRPCRequest(t *testing.T) {
var buf bytes.Buffer
logger := log.NewLogfmtLogger(&buf)

var matchers Matchers
require.NoError(t, matchers.Set(`{__name__="up"}`))

req := &monitoring_pb.CreateTimeSeriesRequest{
Name: "projects/test-proj",
TimeSeries: []*monitoring_pb.TimeSeries{
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "prometheus_target",
Labels: map[string]string{
"project_id": "test-proj",
},
},
Metric: &metric_pb.Metric{
Type: "prometheus.googleapis.com/up/gauge",
Labels: map[string]string{
"instance": "localhost:9090",
},
},
},
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "prometheus_target",
Labels: map[string]string{
"project_id": "test-proj",
},
},
Metric: &metric_pb.Metric{
Type: "prometheus.googleapis.com/other_metric/gauge",
Labels: map[string]string{
"instance": "localhost:9090",
},
},
},
},
}

// First test: no matchers set should not log.
logDebugGRPCRequest(logger, nil, "prometheus.googleapis.com", req)
require.Empty(t, buf.String())

// Second test: matchers set should log only the matching series.
logDebugGRPCRequest(logger, matchers, "prometheus.googleapis.com", req)
out := buf.String()
require.Contains(t, out, "gRPC CreateTimeSeries request matching debug matchers")
require.Contains(t, out, "series_count=1")
require.Contains(t, out, "prometheus.googleapis.com/up/gauge")
require.NotContains(t, out, "other_metric")
}
3 changes: 3 additions & 0 deletions google/export/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func ExporterOptsFlags(a *kingpin.Application, opts *export.ExporterOpts) {
a.Flag("export.match", `A Prometheus time series matcher. Matches all series if empty. Can be repeated. Every time series must match at least one of the matchers to be exported. This flag can be used equivalently to the match[] parameter of the Prometheus federation endpoint to selectively export data. External labels are excluded from matching. (Example: --export.match='{job="prometheus"}' --export.match='{__name__=~"job:.*"}). This flag can be overridden by Prometheus google_cloud.export runtime configuration.`).
Default("").Hidden().SetValue(&opts.Matchers)

a.Flag("export.debug.log-match", "A Prometheus time series matcher for debugging gRPC requests. Logs exact gRPC requests containing any time series matching these matchers. Can be repeated.").
Default("").SetValue(&opts.DebugLogMatchers)

a.Flag("export.debug.metric-prefix", "Google Cloud Monitoring metric prefix to use.").
Default(opts.MetricTypePrefix).
StringVar(&opts.MetricTypePrefix)
Expand Down
14 changes: 14 additions & 0 deletions google/export/setup/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,17 @@ func TestTryPopulateUnspecifiedFromMetadata(t *testing.T) {
t.Fatal("tryPopulateUnspecifiedFromMetadata took 30s to complete, it should timeout after 1s")
}
}

func TestExporterOptsFlags_DebugLogMatch(t *testing.T) {
fake := kingpin.New("test", "test")
opts := export.ExporterOpts{}
ExporterOptsFlags(fake, &opts)

if _, err := fake.Parse([]string{"--export.debug.log-match={__name__=\"up\",job=\"prometheus\"}", "--export.debug.log-match={env=\"prod\"}"}); err != nil {
t.Fatal(err)
}

if len(opts.DebugLogMatchers) != 2 {
t.Fatalf("expected 2 debug log matchers, got %d", len(opts.DebugLogMatchers))
}
}
Loading