diff --git a/executor-plugins/resource-tagger/go.mod b/executor-plugins/resource-tagger/go.mod index 3d21382..c25f3b7 100644 --- a/executor-plugins/resource-tagger/go.mod +++ b/executor-plugins/resource-tagger/go.mod @@ -7,9 +7,9 @@ require ( github.com/kube-arbiter/arbiter v0.1.1-0.20221019145918-1199780f119f github.com/pseudomuto/protoc-gen-doc v1.5.1 google.golang.org/grpc v1.47.0 - k8s.io/apimachinery v0.24.2 + k8s.io/apimachinery v0.25.3 k8s.io/client-go v0.24.2 - k8s.io/klog/v2 v2.60.1 + k8s.io/klog/v2 v2.70.1 ) require ( @@ -17,8 +17,9 @@ require ( github.com/Masterminds/semver v1.5.0 // indirect github.com/Masterminds/sprig v2.22.0+incompatible // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/envoyproxy/protoc-gen-validate v0.6.7 // indirect - github.com/go-logr/logr v1.2.0 // indirect + github.com/go-logr/logr v1.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/google/uuid v1.3.0 // indirect @@ -44,8 +45,8 @@ require ( google.golang.org/protobuf v1.28.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect - sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect - sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect + k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect + sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) diff --git a/executor-plugins/resource-tagger/go.sum b/executor-plugins/resource-tagger/go.sum index 0d44565..e86cb29 100644 --- a/executor-plugins/resource-tagger/go.sum +++ b/executor-plugins/resource-tagger/go.sum @@ -79,8 +79,9 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= -github.com/emicklei/go-restful v2.9.5+incompatible h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= +github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= +github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -102,8 +103,9 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= -github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= @@ -669,28 +671,33 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.24.2 h1:g518dPU/L7VRLxWfcadQn2OnsiGWVOadTLpdnqgY2OI= k8s.io/api v0.24.2/go.mod h1:AHqbSkTm6YrQ0ObxjO3Pmp/ubFF/KuM7jU+3khoBsOg= -k8s.io/apimachinery v0.24.2 h1:5QlH9SL2C8KMcrNJPor+LbXVTaZRReml7svPEh4OKDM= k8s.io/apimachinery v0.24.2/go.mod h1:82Bi4sCzVBdpYjyI4jY6aHX+YCUchUIrZrXKedjd2UM= +k8s.io/apimachinery v0.25.3 h1:7o9ium4uyUOM76t6aunP0nZuex7gDf8VGwkR5RcJnQc= +k8s.io/apimachinery v0.25.3/go.mod h1:jaF9C/iPNM1FuLl7Zuy5b9v+n35HGSh6AQ4HYRkCqwo= k8s.io/client-go v0.24.2 h1:CoXFSf8if+bLEbinDqN9ePIDGzcLtqhfd6jpfnwGOFA= k8s.io/client-go v0.24.2/go.mod h1:zg4Xaoo+umDsfCWr4fCnmLEtQXyCNXCvJuSsglNcV30= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= -k8s.io/klog/v2 v2.60.1 h1:VW25q3bZx9uE3vvdL6M8ezOX79vA2Aq1nEWLqNQclHc= k8s.io/klog/v2 v2.60.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= -k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42 h1:Gii5eqf+GmIEwGNKQYQClCayuJCe2/4fZUvF7VG99sU= +k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ= +k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk= +k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 h1:MQ8BAZPZlWk3S9K4a9NCkIFQtZShWqoha7snGixVgEA= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= -k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 h1:HNSDgDCrr/6Ly3WEGKZftiE7IY19Vz2GdbOCyI4qqhc= k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= +k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/lQhTmy+Hr/Cpue4= +k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 h1:kDi4JBNAsJWfz1aEXhO8Jg87JJaPNLh5tIzYHgStQ9Y= sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz4CVE26eOSDAeYCpfDnC2kdKMY= +sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= +sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= -sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLzkkmAkf+A6Y= sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= +sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= +sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= diff --git a/observer-plugins/metric-server/main.go b/observer-plugins/metric-server/main.go index d69a3af..ff5b167 100644 --- a/observer-plugins/metric-server/main.go +++ b/observer-plugins/metric-server/main.go @@ -20,6 +20,9 @@ import ( "flag" "log" "net" + "os" + "os/signal" + "syscall" //"github.com/smoky8/pkg/lib/go/obi" "google.golang.org/grpc" @@ -37,6 +40,9 @@ var ( endpoint = flag.String("endpoint", "/var/run/observer.sock", "unix socket domain for current server") kubeconfig = flag.String("kubeconfig", "", "kubernetes auth config file") ) +var ( + shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} +) func main() { klog.InitFlags(flag.CommandLine) @@ -61,8 +67,11 @@ func main() { if err != nil { log.Fatalf("%s create metric client error: %s", server.PluginName, err) } - metricServer := grpc.NewServer() + // Setup signal watcher to handle cleanup + SetupSignalHandler(*endpoint) + + metricServer := grpc.NewServer() obi.RegisterServerServer(metricServer, server.NewServer(clientSet)) listen, err := net.Listen("unix", *endpoint) if err != nil { @@ -72,3 +81,25 @@ func main() { klog.Fatalln(metricServer.Serve(listen)) } + +// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned +// which is closed on one of these signals. If a second signal is caught, the program +// is terminated with exit code 1. +func SetupSignalHandler(socketFile string) { + c := make(chan os.Signal) + signal.Notify(c, shutdownSignals...) + go func() { + for s := range c { + switch s { + case os.Interrupt, syscall.SIGTERM: + klog.Infoln("Shutting down normally...") + if err := os.RemoveAll(socketFile); err != nil { + klog.Fatal(err) + } + os.Exit(1) + default: + klog.Infoln("Got signal", s) + } + } + }() +} diff --git a/observer-plugins/prometheus/main.go b/observer-plugins/prometheus/main.go index 7b0859f..28bea7f 100644 --- a/observer-plugins/prometheus/main.go +++ b/observer-plugins/prometheus/main.go @@ -20,6 +20,9 @@ import ( "flag" "log" "net" + "os" + "os/signal" + "syscall" "google.golang.org/grpc" "k8s.io/client-go/kubernetes" @@ -38,6 +41,9 @@ var ( stepSeconds = flag.Int64("step", 60, "query steps") rangeMinute = flag.Int64("range", 2, "prometheus, the maximum time between two slices within the boundaries.") ) +var ( + shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} +) func main() { klog.InitFlags(flag.CommandLine) @@ -64,6 +70,8 @@ func main() { if err != nil { klog.Fatal(err) } + // Setup signal watcher to handle cleanup + SetupSignalHandler(*endpoint) server := grpc.NewServer() obi.RegisterServerServer(server, prometheus.NewPrometheusServer(*address, conf, *stepSeconds, *rangeMinute)) @@ -72,6 +80,28 @@ func main() { log.Fatal(err) } - klog.Infof("%s starting work...", prometheus.PluginName) + klog.Infof("%s plugin started ...", prometheus.PluginName) klog.Fatalln(server.Serve(listen)) } + +// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned +// which is closed on one of these signals. If a second signal is caught, the program +// is terminated with exit code 1. +func SetupSignalHandler(socketFile string) { + c := make(chan os.Signal) + signal.Notify(c, shutdownSignals...) + go func() { + for s := range c { + switch s { + case os.Interrupt, syscall.SIGTERM: + klog.Infoln("Shutting down normally...") + if err := os.RemoveAll(socketFile); err != nil { + klog.Fatal(err) + } + os.Exit(1) + default: + klog.Infoln("Got signal", s) + } + } + }() +} diff --git a/observer-plugins/prometheus/prometheus/prometheus.go b/observer-plugins/prometheus/prometheus/prometheus.go index 7fc1cc7..4bbde36 100644 --- a/observer-plugins/prometheus/prometheus/prometheus.go +++ b/observer-plugins/prometheus/prometheus/prometheus.go @@ -17,6 +17,7 @@ limitations under the License. package prometheus import ( + "encoding/json" "fmt" "time" @@ -65,7 +66,7 @@ type CalculateAux struct { Value float64 } -func (p *prometheusServer) Query(startTime, endTime time.Time, query, op string) (DataSeries, error) { +func (p *prometheusServer) Query(startTime, endTime time.Time, kind, query, op string) (DataSeries, error) { method := "prometheusServer.Query" ans := DataSeries{Timestamp: endTime.UnixMilli()} prometheusAPI, err := p.NewPrometheusAPI() @@ -86,18 +87,29 @@ func (p *prometheusServer) Query(startTime, endTime time.Time, query, op string) klog.V(4).Infof("%s quer '%s' result with warnings %v\n", method, warnings) } - data, err := formatRawValues(result, op) - if err != nil { - return ans, err - } - - if f, ok := actionFuncs[op]; ok { - f(data, &ans) + // TODO: Use kind as the raw data query, may add a 'rawData: true' property for this? + if kind == "Pod" || kind == "Node" { + data, err := formatRawValues(result) + if err != nil { + return ans, err + } + if f, ok := actionFuncs[op]; ok { + f(data, &ans) + } + } else { + // Handle raw data if no aggregation defined, just return the json data + jsonValue, err := json.Marshal(result) + if err != nil { + klog.Errorf("failed to marshal result to json: %s", err) + ans.Value = fmt.Sprintf("failed to get json value: %s " + result.String()) + } else { + ans.Value = string(jsonValue) + } } return ans, nil } -func formatRawValues(rawValue model.Value, op string) ([]CalculateAux, error) { +func formatRawValues(rawValue model.Value) ([]CalculateAux, error) { ans := make([]CalculateAux, 0) switch rawValue.Type() { case model.ValScalar: diff --git a/observer-plugins/prometheus/prometheus/server.go b/observer-plugins/prometheus/prometheus/server.go index dad95b1..9d620f9 100644 --- a/observer-plugins/prometheus/prometheus/server.go +++ b/observer-plugins/prometheus/prometheus/server.go @@ -31,6 +31,7 @@ const ( MaxAction = "max" MinAction = "min" AvgAction = "avg" + NoneAction = "none" ) // impl obi interface @@ -77,24 +78,27 @@ func (p *prometheusServer) GetMetrics(ctx context.Context, req *obi.GetMetricsRe var err error klog.V(4).Infof("prometheus query: %s\n", req.Query) + var resourceName string + if len(req.ResourceNames) > 0 { + resourceName = req.ResourceNames[0] + } result := &obi.GetMetricsResponse{ - ResourceName: req.ResourceNames[0], + ResourceName: resourceName, Namespace: req.Namespace, Unit: req.Unit, Records: []*obi.GetMetricsResponseRecord{}, } + // use avgerage as the default aggregation action op := AvgAction if len(req.Aggregation) > 0 { op = req.Aggregation[0] } - klog.Infof("exec aggregation is: %s\n", op) - metricData, err := p.Query(startTime, endTime, req.Query, op) + metricData, err := p.Query(startTime, endTime, req.Kind, req.Query, op) if err != nil { klog.Errorf("%s query error: %s\n", method, err) return result, err } - // only return the latest record result.Records = append(result.Records, &obi.GetMetricsResponseRecord{Timestamp: metricData.Timestamp, Value: metricData.Value}) /* @@ -103,8 +107,8 @@ func (p *prometheusServer) GetMetrics(ctx context.Context, req *obi.GetMetricsRe } */ - klog.Infof("query by %s successfully", req.MetricName) - klog.V(5).Infof("%s query by %s result: %v\n", method, req.MetricName, metricData) + klog.Infof("query by metric '%s', query '%s' successfully", req.MetricName, req.Query) + klog.V(5).Infof("%s query by %s, %s result: %v\n", method, req.MetricName, req.Query, metricData) return result, nil }