Skip to content

Commit 78908b9

Browse files
author
Michal Tichák
committed
[core] Adding http metrics endpoint
1 parent de4d866 commit 78908b9

File tree

6 files changed

+421
-2
lines changed

6 files changed

+421
-2
lines changed

common/ecsmetrics/metrics.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package ecsmetrics
2+
3+
import (
4+
"fmt"
5+
internalmetrics "runtime/metrics"
6+
"time"
7+
8+
"github.com/AliceO2Group/Control/common/monitoring"
9+
)
10+
11+
var endRequestChannel chan struct{}
12+
13+
func gather() monitoring.Metric {
14+
samples := []internalmetrics.Sample{
15+
{Name: "/gc/heap/allocs:bytes"},
16+
{Name: "/gc/heap/frees:bytes"},
17+
{Name: "/memory/classes/heap/free:bytes"},
18+
{Name: "/memory/classes/heap/objects:bytes"},
19+
{Name: "/sched/goroutines:goroutines"},
20+
}
21+
22+
// Collect metrics data
23+
internalmetrics.Read(samples)
24+
25+
timestamp := time.Now()
26+
metric := monitoring.Metric{Name: "golangmetrics", Timestamp: timestamp.UnixMilli()}
27+
metric.AddTag("subsystem", "ecs")
28+
for _, sample := range samples {
29+
switch sample.Value.Kind() {
30+
case internalmetrics.KindUint64:
31+
metric.AddValue(sample.Name, sample.Value.Uint64())
32+
case internalmetrics.KindFloat64:
33+
metric.AddValue(sample.Name, sample.Value.Float64())
34+
case internalmetrics.KindFloat64Histogram:
35+
fmt.Printf("Error: Histogram is not supported yet for metric [%s]", sample.Name)
36+
continue
37+
38+
default:
39+
fmt.Printf("Unsupported kind %v for metric %s\n", sample.Value.Kind(), sample.Name)
40+
continue
41+
}
42+
}
43+
return metric
44+
}
45+
46+
func StartGolangMetrics(period time.Duration) {
47+
go func() {
48+
for {
49+
select {
50+
case <-endRequestChannel:
51+
endRequestChannel <- struct{}{}
52+
return
53+
default:
54+
monitoring.Send(gather())
55+
time.Sleep(period)
56+
}
57+
}
58+
}()
59+
}
60+
61+
func StopGolangMetrics() {
62+
endRequestChannel <- struct{}{}
63+
<-endRequestChannel
64+
}

common/monitoring/metric.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package monitoring
2+
3+
// TagsType maps a tag name to its value.
type TagsType map[string]any

// ValuesType maps a value name to its value.
type ValuesType map[string]any

// Metric is a single named measurement: a set of values, an optional set of
// tags and a timestamp. The struct tags define its JSON representation; Tags
// is omitted from the JSON output when empty.
type Metric struct {
	Name      string     `json:"name"`
	Values    ValuesType `json:"values"`
	Tags      TagsType   `json:"tags,omitempty"`
	Timestamp int64      `json:"timestamp"`
}

// AddTag stores value under tagName, replacing any previous value for that
// name. The tag map is allocated on first use, so the zero Metric is usable.
func (m *Metric) AddTag(tagName string, value any) {
	if m.Tags == nil {
		m.Tags = TagsType{}
	}
	m.Tags[tagName] = value
}

// AddValue stores value under valueName, replacing any previous value for that
// name. The value map is allocated on first use, so the zero Metric is usable.
func (m *Metric) AddValue(valueName string, value any) {
	if m.Values == nil {
		m.Values = ValuesType{}
	}
	m.Values[valueName] = value
}

common/monitoring/monitoring.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package monitoring
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
"time"
9+
)
10+
11+
var (
12+
server *http.Server
13+
metrics []Metric
14+
// channel that is used to request end of metrics server, it sends notification when server ended.
15+
// It needs to be read!!!
16+
endChannel chan struct{}
17+
18+
// channel used to send metrics into the event loop
19+
metricsChannel chan Metric
20+
21+
// channel for sending notifications to event loop that new http Request to report metrics arrived
22+
metricsRequestChannel chan struct{}
23+
24+
// channel used to send metrics to be reported by http request from event loop
25+
metricsToRequest chan []Metric
26+
)
27+
28+
func initChannels(messageBufferSize int) {
29+
endChannel = make(chan struct{})
30+
metricsRequestChannel = make(chan struct{})
31+
metricsChannel = make(chan Metric, messageBufferSize)
32+
metricsToRequest = make(chan []Metric)
33+
}
34+
35+
func closeChannels() {
36+
close(endChannel)
37+
close(metricsRequestChannel)
38+
close(metricsChannel)
39+
close(metricsToRequest)
40+
}
41+
42+
func eventLoop() {
43+
for {
44+
select {
45+
case <-metricsRequestChannel:
46+
shallowCopyMetrics := metrics
47+
metrics = make([]Metric, 0)
48+
metricsToRequest <- shallowCopyMetrics
49+
50+
case metric := <-metricsChannel:
51+
metrics = append(metrics, metric)
52+
53+
case <-endChannel:
54+
endChannel <- struct{}{}
55+
return
56+
}
57+
}
58+
}
59+
60+
func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) {
61+
w.Header().Set("Content-Type", "application/json")
62+
metricsRequestChannel <- struct{}{}
63+
metricsToConvert := <-metricsToRequest
64+
if metricsToConvert == nil {
65+
metricsToConvert = make([]Metric, 0)
66+
}
67+
json.NewEncoder(w).Encode(metricsToConvert)
68+
}
69+
70+
func Send(metric Metric) {
71+
metricsChannel <- metric
72+
}
73+
74+
func handleFunc(endpointName string) {
75+
// recover is here to correctly allow multiple Starts and Stops of server
76+
defer func() {
77+
recover()
78+
}()
79+
80+
http.HandleFunc(endpointName, exportMetricsAndReset)
81+
}
82+
83+
// \param url In format if url:port to be used together with
84+
// \param endpoint
85+
func Start(port uint16, endpointName string, messageBufferSize int) error {
86+
if server != nil {
87+
return nil
88+
}
89+
90+
initChannels(messageBufferSize)
91+
92+
go eventLoop()
93+
94+
server := &http.Server{Addr: fmt.Sprintf(":%d", port)}
95+
handleFunc(endpointName)
96+
return server.ListenAndServe()
97+
}
98+
99+
func Stop() {
100+
if server == nil {
101+
return
102+
}
103+
104+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
105+
defer cancel()
106+
server.Shutdown(ctx)
107+
108+
endChannel <- struct{}{}
109+
<-endChannel
110+
server = nil
111+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package monitoring
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"net/http"
7+
"testing"
8+
"time"
9+
)
10+
11+
func TestSimpleStartStop(t *testing.T) {
12+
go Start(1234, "/random", 100)
13+
time.Sleep(time.Millisecond * 100)
14+
Stop()
15+
}
16+
17+
func TestStartMultipleStop(t *testing.T) {
18+
go Start(1234, "/random", 100)
19+
time.Sleep(time.Millisecond * 100)
20+
Stop()
21+
Stop()
22+
}
23+
24+
func cleaningUpAfterTest() {
25+
endChannel <- struct{}{}
26+
<-endChannel
27+
closeChannels()
28+
metrics = make([]Metric, 0)
29+
}
30+
31+
func initTest() {
32+
// we need message channel to block so we don't end to quickly
33+
initChannels(0)
34+
go eventLoop()
35+
}
36+
37+
// decorator function that properly inits and cleans after higher level test of Monitoring package
38+
func testFunction(t *testing.T, testToRun func(*testing.T)) {
39+
initTest()
40+
testToRun(t)
41+
cleaningUpAfterTest()
42+
}
43+
44+
func TestSendingSingleMetric(t *testing.T) {
45+
testFunction(t, func(t *testing.T) {
46+
metric := Metric{Name: "test"}
47+
Send(metric)
48+
if len(metrics) != 1 {
49+
t.Error("wrong number of metrics, should be 1")
50+
}
51+
52+
if metrics[0].Name != "test" {
53+
t.Errorf("Got wrong name %s in stored metric", metrics[0].Name)
54+
}
55+
})
56+
}
57+
58+
func TestExportingMetrics(t *testing.T) {
59+
testFunction(t, func(t *testing.T) {
60+
metric := Metric{Name: "test"}
61+
Send(metric)
62+
63+
metricsRequestChannel <- struct{}{}
64+
metrics := <-metricsToRequest
65+
66+
if len(metrics) != 1 {
67+
t.Errorf("Got wrong amount of metrics %d, expected 1", len(metrics))
68+
}
69+
70+
if metrics[0].Name != "test" {
71+
t.Errorf("Got wrong name of metric %s, expected test", metrics[0].Name)
72+
}
73+
})
74+
}
75+
76+
func TestHttpRun(t *testing.T) {
77+
go Start(12345, "/metrics", 10)
78+
defer Stop()
79+
80+
time.Sleep(time.Second)
81+
82+
metric := Metric{Name: "test"}
83+
metric.Timestamp = 10
84+
metric.AddTag("tag1", 42)
85+
metric.AddValue("value1", 11)
86+
Send(metric)
87+
88+
response, err := http.Get("http://localhost:12345/metrics")
89+
if err != nil {
90+
t.Fatalf("Failed to GET metrics at port 12345: %v", err)
91+
}
92+
decoder := json.NewDecoder(response.Body)
93+
var receivedMetrics []Metric
94+
if err = decoder.Decode(&receivedMetrics); err != nil {
95+
t.Fatalf("Failed to decoded Metric: %v", err)
96+
}
97+
98+
receivedMetric := receivedMetrics[0]
99+
100+
if receivedMetric.Name != "test" {
101+
t.Errorf("Got wrong name of metric %s, expected test", receivedMetric.Name)
102+
}
103+
104+
if receivedMetric.Timestamp != 10 {
105+
t.Errorf("Got wrong timestamp of metric %d, expected 10", receivedMetric.Timestamp)
106+
}
107+
108+
if len(receivedMetric.Tags) != 1 {
109+
t.Errorf("Got wrong number of tags %d, expected 1", len(receivedMetric.Tags))
110+
}
111+
112+
if receivedMetric.Tags["tag1"].(float64) != 42 {
113+
t.Error("Failed to retreive tags: tag1 with value 42")
114+
}
115+
116+
if len(receivedMetric.Values) != 1 {
117+
t.Errorf("Got wrong number of values %d, expected 1", len(receivedMetric.Values))
118+
}
119+
120+
if receivedMetric.Values["value1"].(float64) != 11 {
121+
t.Error("Failed to retreive tags: value1 with value 11")
122+
}
123+
}
124+
125+
// This benchmark cannot be run for too long as it will fill whole RAM even with
126+
// results:
127+
// goos: linux
128+
// goarch: amd64
129+
// pkg: github.com/AliceO2Group/Control/common/monitoring
130+
// cpu: 11th Gen Intel(R) Core(TM) i9-11900H @ 2.50GHz
131+
// BenchmarkSendingMetrics-16
132+
//
133+
// 123365481 192.6 ns/op
134+
// PASS
135+
// ok github.com/AliceO2Group/Control/common/monitoring 44.686s
136+
func BenchmarkSendingMetrics(b *testing.B) {
137+
Start(12345, "/metrics", 100)
138+
139+
// this goroutine keeps clearing results so RAM does not exhausted
140+
go func() {
141+
for {
142+
select {
143+
case <-endChannel:
144+
endChannel <- struct{}{}
145+
break
146+
default:
147+
if len(metrics) >= 10000000 {
148+
metricsRequestChannel <- struct{}{}
149+
<-metricsToRequest
150+
}
151+
}
152+
time.Sleep(100 * time.Millisecond)
153+
}
154+
}()
155+
156+
defer Stop()
157+
158+
metric := Metric{Name: "testname", Timestamp: 12345}
159+
metric.AddValue("value", 42)
160+
metric.AddTag("tag", 40)
161+
162+
for i := 0; i < b.N; i++ {
163+
Send(metric)
164+
}
165+
166+
fmt.Println("")
167+
}

core/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func setDefaults() error {
128128
viper.SetDefault("kafkaEndpoints", []string{"localhost:9092"})
129129
viper.SetDefault("enableKafka", true)
130130
viper.SetDefault("logAllIL", false)
131+
viper.SetDefault("metricsEndpoint", "8086/metrics")
131132
return nil
132133
}
133134

@@ -198,6 +199,7 @@ func setFlags() error {
198199
pflag.StringSlice("kafkaEndpoints", viper.GetStringSlice("kafkaEndpoints"), "List of Kafka endpoints to connect to (default: localhost:9092)")
199200
pflag.Bool("enableKafka", viper.GetBool("enableKafka"), "Turn on the kafka messaging")
200201
pflag.Bool("logAllIL", viper.GetBool("logAllIL"), "Send all the logs into IL, including Debug and Trace messages")
202+
pflag.String("metricsEndpoint", viper.GetString("metricsEndpoint"), "Http endpoint from which metrics can be scraped: [port/endpoint]")
201203

202204
pflag.Parse()
203205
return viper.BindPFlags(pflag.CommandLine)

0 commit comments

Comments
 (0)