
Commit a90fabd

Expose SLI metrics (#209)
1 parent b47a83e commit a90fabd

5 files changed: 267 additions & 8 deletions

cmd/stackdriver-prometheus-sidecar/main.go

Lines changed: 52 additions & 6 deletions
@@ -55,6 +55,8 @@ import (
     "github.com/prometheus/prometheus/promql"
     "go.opencensus.io/plugin/ocgrpc"
     "go.opencensus.io/plugin/ochttp"
+    "go.opencensus.io/resource"
+    "go.opencensus.io/stats"
     "go.opencensus.io/stats/view"
     "go.opencensus.io/tag"
     metric_pb "google.golang.org/genproto/googleapis/api/metric"
@@ -66,6 +68,14 @@ import (
 var (
     sizeDistribution    = view.Distribution(0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 33554432)
     latencyDistribution = view.Distribution(0, 1, 2, 5, 10, 15, 25, 50, 100, 200, 400, 800, 1500, 3000, 6000)
+
+    // VersionTag identifies the version of this binary.
+    VersionTag = tag.MustNewKey("version")
+    // UptimeMeasure is a cumulative metric.
+    UptimeMeasure = stats.Int64(
+        "agent.googleapis.com/agent/uptime",
+        "uptime of the Stackdriver Prometheus collector",
+        stats.UnitSeconds)
 )
 
 func init() {
@@ -108,6 +118,15 @@ func init() {
     ); err != nil {
         panic(err)
     }
+    if err := view.Register(
+        &view.View{
+            Measure:     UptimeMeasure,
+            TagKeys:     []tag.Key{VersionTag},
+            Aggregation: view.Sum(),
+        },
+    ); err != nil {
+        panic(err)
+    }
 }
 
 type kubernetesConfig struct {
@@ -276,6 +295,21 @@ func main() {
     level.Info(logger).Log("host_details", Uname())
     level.Info(logger).Log("fd_limits", FdLimits())
 
+    // We instantiate a context here since the tailer is used by two other components.
+    // The context will be used in the lifecycle of prometheusReader further down.
+    ctx, cancel := context.WithCancel(context.Background())
+
+    go func() {
+        uptimeUpdateTime := time.Now()
+        c := time.Tick(60 * time.Second)
+        for now := range c {
+            stats.RecordWithTags(ctx,
+                []tag.Mutator{tag.Upsert(VersionTag, fmt.Sprintf("stackdriver-prometheus-sidecar/%s", version.Version))},
+                UptimeMeasure.M(int64(now.Sub(uptimeUpdateTime).Seconds())))
+            uptimeUpdateTime = now
+        }
+    }()
+
     httpClient := &http.Client{Transport: &ochttp.Transport{}}
 
     if *projectId == "" {
@@ -294,14 +328,30 @@ func main() {
         }
         view.RegisterExporter(promExporter)
     case "stackdriver":
-        sd, err := oc_stackdriver.NewExporter(oc_stackdriver.Options{ProjectID: *projectId})
+        const reportingInterval = 60 * time.Second
+        sd, err := oc_stackdriver.NewExporter(oc_stackdriver.Options{
+            ProjectID: *projectId,
+            // If the OpenCensus resource environment variables aren't set, the monitored resource will likely fall back to `generic_task`.
+            ResourceDetector:  resource.FromEnv,
+            ReportingInterval: reportingInterval,
+            // Disable default `opencensus_task` label.
+            DefaultMonitoringLabels: &oc_stackdriver.Labels{},
+            GetMetricType: func(v *view.View) string {
+                // Curated metrics produced by this process.
+                if strings.Contains(v.Name, "agent.googleapis.com") {
+                    return v.Name
+                }
+                // Default OpenCensus behavior.
+                return path.Join("custom.googleapis.com", "opencensus", v.Name)
+            },
+        })
         if err != nil {
             level.Error(logger).Log("msg", "Creating Stackdriver exporter failed", "err", err)
             os.Exit(1)
         }
         defer sd.Flush()
         view.RegisterExporter(sd)
-        view.SetReportingPeriod(60 * time.Second)
+        view.SetReportingPeriod(reportingInterval)
     default:
         level.Error(logger).Log("msg", "Unknown monitoring backend", "backend", backend)
         os.Exit(1)
@@ -356,10 +406,6 @@ func main() {
     }
     metadataCache := metadata.NewCache(httpClient, metadataURL, cfg.StaticMetadata)
 
-    // We instantiate a context here since the tailer is used by two other components.
-    // The context will be used in the lifecycle of prometheusReader further down.
-    ctx, cancel := context.WithCancel(context.Background())
-
     tailer, err := tail.Tail(ctx, cfg.WALDirectory)
     if err != nil {
         level.Error(logger).Log("msg", "Tailing WAL failed", "err", err)
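
For context, the GetMetricType hook added above decides where each OpenCensus view lands in Stackdriver: curated metrics whose names contain agent.googleapis.com are written under their own metric type, while everything else keeps the default custom.googleapis.com/opencensus prefix. The standalone sketch below (not part of this commit; the grpc.io view name is only an illustrative input) shows that mapping in isolation:

package main

import (
    "fmt"
    "path"
    "strings"

    "go.opencensus.io/stats/view"
)

// metricType mirrors the routing done by the GetMetricType hook in main.go.
func metricType(v *view.View) string {
    if strings.Contains(v.Name, "agent.googleapis.com") {
        return v.Name // curated metric produced by this process
    }
    return path.Join("custom.googleapis.com", "opencensus", v.Name)
}

func main() {
    fmt.Println(metricType(&view.View{Name: "agent.googleapis.com/agent/uptime"}))
    // -> agent.googleapis.com/agent/uptime
    fmt.Println(metricType(&view.View{Name: "grpc.io/client/sent_bytes_per_rpc"}))
    // -> custom.googleapis.com/opencensus/grpc.io/client/sent_bytes_per_rpc
}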

opencensus/doc.go

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+// Package opencensus for the Stackdriver Prometheus collector.
+package opencensus

opencensus/test_exporter.go

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
+package opencensus
+
+import (
+    "context"
+    "fmt"
+    "sort"
+    "strings"
+    "time"
+
+    "go.opencensus.io/metric/metricdata"
+    "go.opencensus.io/metric/metricexport"
+    "go.opencensus.io/stats/view"
+)
+
+// TestExporter keeps exported metric data in memory to aid in testing the instrumentation.
+//
+// Metrics can be retrieved with `GetPoint()`. In order to deterministically retrieve the most recent values, you must first invoke `ReadAndExport()`.
+type TestExporter struct {
+    // points is a map from a label signature to the latest value for the time series represented by the signature.
+    // Use function `labelSignature` to get a signature from a `metricdata.Metric`.
+    points       map[string]metricdata.Point
+    metricReader *metricexport.Reader
+}
+
+// NewTestExporter returns a new exporter.
+func NewTestExporter(metricReader *metricexport.Reader) *TestExporter {
+    return &TestExporter{points: make(map[string]metricdata.Point), metricReader: metricReader}
+}
+
+// ExportMetrics records the view data.
+func (e *TestExporter) ExportMetrics(ctx context.Context, data []*metricdata.Metric) error {
+    for _, metric := range data {
+        for _, ts := range metric.TimeSeries {
+            signature := labelSignature(metric.Descriptor.Name, labelObjectsToKeyValue(metric.Descriptor.LabelKeys, ts.LabelValues))
+            e.points[signature] = ts.Points[len(ts.Points)-1]
+        }
+    }
+    return nil
+}
+
+// GetPoint returns the latest point for the time series identified by the given labels.
+func (e *TestExporter) GetPoint(metricName string, labels map[string]string) (metricdata.Point, bool) {
+    v, ok := e.points[labelSignature(metricName, labelMapToKeyValue(labels))]
+    return v, ok
+}
+
+// ReadAndExport reads the current values for all metrics and makes them available to this exporter.
+func (e *TestExporter) ReadAndExport() {
+    // The next line forces the view worker to process all stats.Record* calls that
+    // happened within Store() before the call to ReadAndExport below. This abuses the
+    // worker implementation to work around lack of synchronization.
+    // TODO(jkohen,rghetia): figure out a clean way to make this deterministic.
+    view.SetReportingPeriod(time.Minute)
+    e.metricReader.ReadAndExport(e)
+}
+
+func (e *TestExporter) String() string {
+    return fmt.Sprintf("points{%v}", e.points)
+}
+
+type keyValue struct {
+    Key   string
+    Value string
+}
+
+func sortKeyValue(kv []keyValue) {
+    sort.Slice(kv, func(i, j int) bool { return kv[i].Key < kv[j].Key })
+}
+
+func labelMapToKeyValue(labels map[string]string) []keyValue {
+    kv := make([]keyValue, 0, len(labels))
+    for k, v := range labels {
+        kv = append(kv, keyValue{Key: k, Value: v})
+    }
+    sortKeyValue(kv)
+    return kv
+}
+
+func labelObjectsToKeyValue(keys []metricdata.LabelKey, values []metricdata.LabelValue) []keyValue {
+    kv := make([]keyValue, 0, len(values))
+    for i := range keys {
+        if values[i].Present {
+            kv = append(kv, keyValue{Key: keys[i].Key, Value: values[i].Value})
+        }
+    }
+    sortKeyValue(kv)
+    return kv
+}
+
+// labelSignature returns a string that uniquely identifies the list of labels given in the input.
+func labelSignature(metricName string, kv []keyValue) string {
+    var builder strings.Builder
+    for _, x := range kv {
+        builder.WriteString(x.Key)
+        builder.WriteString(x.Value)
+    }
+    return fmt.Sprintf("%s{%s}", metricName, builder.String())
+}
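
A minimal usage sketch for TestExporter (not part of this commit): register a view, record a tagged value, flush with ReadAndExport, and assert on the latest point via GetPoint. The metric name example.com/requests is a placeholder, and the import path assumes the sidecar's repository layout.

package opencensus_test

import (
    "context"
    "testing"

    "github.com/Stackdriver/stackdriver-prometheus-sidecar/opencensus"
    "go.opencensus.io/metric/metricexport"
    "go.opencensus.io/stats"
    "go.opencensus.io/stats/view"
    "go.opencensus.io/tag"
)

var (
    statusKey = tag.MustNewKey("status")
    requests  = stats.Int64("example.com/requests", "request count", stats.UnitDimensionless)
)

func TestRequestsRecorded(t *testing.T) {
    // The view name defaults to the measure name, so GetPoint looks it up by that name.
    if err := view.Register(&view.View{
        Measure:     requests,
        TagKeys:     []tag.Key{statusKey},
        Aggregation: view.Sum(),
    }); err != nil {
        t.Fatal(err)
    }
    exporter := opencensus.NewTestExporter(metricexport.NewReader())

    stats.RecordWithTags(context.Background(),
        []tag.Mutator{tag.Upsert(statusKey, "0")}, requests.M(3))

    // Flush the view worker and capture the latest points in memory.
    exporter.ReadAndExport()

    point, ok := exporter.GetPoint("example.com/requests", map[string]string{"status": "0"})
    if !ok || point.Value.(int64) != 3 {
        t.Errorf("got %v (found=%v), want 3", point, ok)
    }
}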

stackdriver/client.go

Lines changed: 43 additions & 1 deletion
@@ -24,6 +24,9 @@ import (
     "time"
 
     "go.opencensus.io/plugin/ocgrpc"
+    "go.opencensus.io/stats"
+    "go.opencensus.io/stats/view"
+    "go.opencensus.io/tag"
     monitoring "google.golang.org/genproto/googleapis/monitoring/v3"
     "google.golang.org/grpc"
     "google.golang.org/grpc/balancer/roundrobin"
@@ -43,6 +46,27 @@ const (
     MonitoringWriteScope = "https://www.googleapis.com/auth/monitoring.write"
 )
 
+var (
+    // StatusTag is the google3 canonical status code: google3/google/rpc/code.proto
+    StatusTag = tag.MustNewKey("status")
+
+    // PointCount is a metric.
+    PointCount = stats.Int64("agent.googleapis.com/agent/monitoring/point_count",
+        "count of metric points written to Stackdriver", stats.UnitDimensionless)
+)
+
+func init() {
+    if err := view.Register(
+        &view.View{
+            Measure:     PointCount,
+            TagKeys:     []tag.Key{StatusTag},
+            Aggregation: view.Sum(),
+        },
+    ); err != nil {
+        panic(err)
+    }
+}
+
 // Client allows reading and writing from/to a remote gRPC endpoint. The
 // implementation may hit a single backend, so the application should create a
 // number of these clients.
@@ -169,7 +193,12 @@ func (c *Client) Store(req *monitoring.CreateTimeSeriesRequest) error {
             TimeSeries: req.TimeSeries[begin:end],
         }
         _, err := service.CreateTimeSeries(ctx, req_copy)
-        if err != nil {
+        if err == nil {
+            // The response is empty if all points were successfully written.
+            stats.RecordWithTags(ctx,
+                []tag.Mutator{tag.Upsert(StatusTag, "0")},
+                PointCount.M(int64(end-begin)))
+        } else {
             level.Debug(c.logger).Log(
                 "msg", "Partial failure calling CreateTimeSeries",
                 "err", err)
@@ -179,6 +208,19 @@ func (c *Client) Store(req *monitoring.CreateTimeSeriesRequest) error {
                 errors <- err
                 return
             }
+            for _, details := range status.Details() {
+                if summary, ok := details.(*monitoring.CreateTimeSeriesSummary); ok {
+                    level.Debug(c.logger).Log("summary", summary)
+                    stats.RecordWithTags(ctx,
+                        []tag.Mutator{tag.Upsert(StatusTag, "0")},
+                        PointCount.M(int64(summary.SuccessPointCount)))
+                    for _, e := range summary.Errors {
+                        stats.RecordWithTags(ctx,
+                            []tag.Mutator{tag.Upsert(StatusTag, fmt.Sprint(uint32(e.Status.Code)))},
+                            PointCount.M(int64(e.PointCount)))
+                    }
+                }
+            }
             switch status.Code() {
             // codes.DeadlineExceeded:
             //   It is safe to retry
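
A standalone sketch (not part of this commit) of the accounting pattern used in Store() above: points accepted by CreateTimeSeries count toward status "0", while rejected points are bucketed by their canonical RPC status code. The metric name example.com/point_count and the summary literal are fabricated for illustration.

package main

import (
    "context"
    "fmt"
    "log"

    "go.opencensus.io/stats"
    "go.opencensus.io/stats/view"
    "go.opencensus.io/tag"
    monitoring "google.golang.org/genproto/googleapis/monitoring/v3"
    status_pb "google.golang.org/genproto/googleapis/rpc/status"
)

var (
    statusKey  = tag.MustNewKey("status")
    pointCount = stats.Int64("example.com/point_count", "points written", stats.UnitDimensionless)
)

// recordSummary splits a CreateTimeSeriesSummary into per-status point counts.
func recordSummary(ctx context.Context, summary *monitoring.CreateTimeSeriesSummary) {
    // Successfully written points count toward status "0" (OK).
    stats.RecordWithTags(ctx,
        []tag.Mutator{tag.Upsert(statusKey, "0")},
        pointCount.M(int64(summary.SuccessPointCount)))
    // Each error bucket counts toward its own canonical status code.
    for _, e := range summary.Errors {
        stats.RecordWithTags(ctx,
            []tag.Mutator{tag.Upsert(statusKey, fmt.Sprint(uint32(e.Status.Code)))},
            pointCount.M(int64(e.PointCount)))
    }
}

func main() {
    if err := view.Register(&view.View{
        Measure:     pointCount,
        TagKeys:     []tag.Key{statusKey},
        Aggregation: view.Sum(),
    }); err != nil {
        log.Fatal(err)
    }
    // A fabricated partial-success summary: 190 points accepted, 10 rejected
    // with INVALID_ARGUMENT (code 3).
    recordSummary(context.Background(), &monitoring.CreateTimeSeriesSummary{
        SuccessPointCount: 190,
        Errors: []*monitoring.CreateTimeSeriesSummary_Error{
            {Status: &status_pb.Status{Code: 3}, PointCount: 10},
        },
    })
}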
