From d6b04d2fcc0ddb194df9cfb45fae1f69a93abefb Mon Sep 17 00:00:00 2001 From: panjinjun <1619-panjinjun@users.noreply.git.sysop.bigo.sg> Date: Sun, 27 Jun 2021 12:47:57 +0800 Subject: [PATCH 1/3] add improvement --- kafka_exporter.go | 247 ++++++++++++++++++++++++++++------------------ 1 file changed, 153 insertions(+), 94 deletions(-) diff --git a/kafka_exporter.go b/kafka_exporter.go index 6dc35a2c..6b6ed101 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -333,7 +333,8 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) { func (e *Exporter) collectChans(quit chan struct{}) { original := make(chan prometheus.Metric) - container := make([]prometheus.Metric, 0, 100) + // Do not indicate the limit of capacity + container := make([]prometheus.Metric, 0) go func() { for metric := range original { container = append(container, metric) } }() @@ -362,17 +363,18 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { clusterBrokers, prometheus.GaugeValue, float64(len(e.client.Brokers())), ) + topicPartitionsMap := make(map[string][]int32) offset := make(map[string]map[int32]int64) + groupOffset := make(map[int32]map[string]map[string]map[int32]int64) + // metadata refresh control now := time.Now() - if now.After(e.nextMetadataRefresh) { glog.V(DEBUG).Info("Refreshing client metadata") if err := e.client.RefreshMetadata(); err != nil { glog.Errorf("Cannot refresh topics, using cached data: %v", err) } - e.nextMetadataRefresh = now.Add(e.metadataRefreshInterval) } @@ -382,7 +384,17 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { return } - topicChannel := make(chan string) + getTopicPartitions := func(topic string) { + defer wg.Done() + partitions, err := e.client.Partitions(topic) + if err != nil { + plog.Errorf("Cannot get partitions of topic %s: %v", topic, err) + } else { + e.mu.Lock() + topicPartitionsMap[topic] = partitions + e.mu.Unlock() + } + } getTopicMetrics := func(topic string) { defer wg.Done() @@ -478,6 +490,7 @@ func (e *Exporter) 
collect(ch chan<- prometheus.Metric) { glog.Errorf("Cannot get consumer group %v", err) } + for _, group := range ConsumerGroups { offset, _ := group.FetchOffset(topic, partition) if offset > 0 { @@ -488,56 +501,25 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { ) } } - } - } - } - - loopTopics := func(id int) { - ok := true - for ok { - topic, open := <-topicChannel - ok = open - if open { - getTopicMetrics(topic) - } - } - } - - minx := func(x int, y int) int { - if x < y { - return x - } else { - return y + } } } - N := len(topics) - if N > 1 { - N = minx(N/2, e.topicWorkers) - } - - for w := 1; w <= N; w++ { - go loopTopics(w) - } - - for _, topic := range topics { - if e.topicFilter.MatchString(topic) { - wg.Add(1) - topicChannel <- topic - } - } - close(topicChannel) - - wg.Wait() - getConsumerGroupMetrics := func(broker *sarama.Broker) { defer wg.Done() + plog.Debugf("[%d] Fetching consumer group metrics", broker.ID()) + e.mu.Lock() + if _, ok := groupOffset[broker.ID()]; !ok { + groupOffset[broker.ID()] = make(map[string]map[string]map[int32]int64) + } + e.mu.Unlock() if err := broker.Open(e.client.Config()); err != nil && err != sarama.ErrAlreadyConnected { glog.Errorf("Cannot connect to broker %d: %v", broker.ID(), err) return } defer broker.Close() + plog.Debugf("[%d]> listing groups", broker.ID()) groups, err := broker.ListGroups(&sarama.ListGroupsRequest{}) if err != nil { glog.Errorf("Cannot get consumer group: %v", err) @@ -550,12 +532,18 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { } } + plog.Debugf("[%d]> describing groups", broker.ID()) describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds}) if err != nil { glog.Errorf("Cannot get describe groups: %v", err) return } for _, group := range describeGroups.Groups { + e.mu.Lock() + if _, ok := groupOffset[broker.ID()][group.GroupId]; !ok { + groupOffset[broker.ID()][group.GroupId] = make(map[string]map[int32]int64) + } + e.mu.Unlock() 
offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1} if e.offsetShowAll { for topic, partitions := range offset { @@ -580,78 +568,149 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId, ) - offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest) - if err != nil { - glog.Errorf("Cannot get offset of group %s: %v", group.GroupId, err) - continue - } - for topic, partitions := range offsetFetchResponse.Blocks { - // If the topic is not consumed by that consumer group, skip it - topicConsumed := false - for _, offsetFetchResponseBlock := range partitions { - // Kafka will return -1 if there is no offset associated with a topic-partition under that consumer group - if offsetFetchResponseBlock.Offset != -1 { - topicConsumed = true - break + start := time.Now() + plog.Debugf("[%d][%s]> fetching group offsets", broker.ID(), group.GroupId) + if offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest); err != nil { + plog.Errorf("Cannot get offset of group %s: %v", group.GroupId, err) + } else { + plog.Debugf("[%d][%s] done fetching group offset in %s", broker.ID(), group.GroupId, time.Since(start).String()) + for topic, partitions := range offsetFetchResponse.Blocks { + // Topic filter + if !e.topicFilter.MatchString(topic) { + continue + } + // If the topic is not consumed by that consumer group, skip it + topicConsumed := false + for _, offsetFetchResponseBlock := range partitions { + // Kafka will return -1 if there is no offset associated with a topic-partition under that consumer group + if offsetFetchResponseBlock.Offset != -1 { + topicConsumed = true + break + } + } + if topicConsumed { + e.mu.Lock() + if _, ok := groupOffset[broker.ID()][group.GroupId][topic]; !ok { + groupOffset[broker.ID()][group.GroupId][topic] = make(map[int32]int64) + } + e.mu.Unlock() + for partition, 
offsetFetchResponseBlock := range partitions { + err := offsetFetchResponseBlock.Err + if err != sarama.ErrNoError { + plog.Errorf("Error for partition %d :%v\n", partition, err.Error()) + continue + } + e.mu.Lock() + groupOffset[broker.ID()][group.GroupId][topic][partition] = offsetFetchResponseBlock.Offset + e.mu.Unlock() + } } } - if !topicConsumed { - continue - } + } + } + } - var currentOffsetSum int64 + minx := func(x int, y int) int { + if x < y { + return x + } else { + return y + } + } + + // Firstly get topic-partitions information + plog.Info("Fetching topic-partitions information") + for _, topic := range topics { + wg.Add(1) + go getTopicPartitions(topic) + } + wg.Wait() + + // Secondly getConsumerGroupMetrics + glog.V(DEBUG).Info("Fetching consumer group metrics") + if len(e.client.Brokers()) > 0 { + for _, broker := range e.client.Brokers() { + wg.Add(1) + go getConsumerGroupMetrics(broker) + } + wg.Wait() + } else { + glog.Errorln("No valid broker, cannot get consumer group metrics") + } + + // And then getTopicMetrics + topicChannel := make(chan string) + + loopTopics := func(id int) { + ok := true + for ok { + topic, open := <-topicChannel + ok = open + if open { + plog.Infof("Collecting metrics [%d] for topic %s", id, topic) + getTopicMetrics(topic) + } + } + } + + // concurrency control + N := minx(len(topics)/2, e.topicWorkers) + for w := 1; w <= N; w++ { + go loopTopics(w) + } + + plog.Info("Fetching topic metrics") + for _, topic := range topics { + if e.topicFilter.MatchString(topic) { + wg.Add(1) + topicChannel <- topic + } + } + close(topicChannel) + wg.Wait() + + // calculating consume group lag + calculateConsumeGroupMetrics := func(groupOffsetMap map[string]map[string]map[int32]int64) { + defer wg.Done() + for group, topicPartitionOffset := range groupOffsetMap { + for topic, partitionOffsetMap := range topicPartitionOffset { + var groupCurrentOffsetSum int64 var lagSum int64 - for partition, offsetFetchResponseBlock := range 
partitions { - err := offsetFetchResponseBlock.Err - if err != sarama.ErrNoError { - glog.Errorf("Error for partition %d :%v", partition, err.Error()) - continue - } - currentOffset := offsetFetchResponseBlock.Offset - currentOffsetSum += currentOffset - ch <- prometheus.MustNewConstMetric( - consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), - ) - e.mu.Lock() - if offset, ok := offset[topic][partition]; ok { - // If the topic is consumed by that consumer group, but no offset associated with the partition - // forcing lag to -1 to be able to alert on that - var lag int64 - if offsetFetchResponseBlock.Offset == -1 { - lag = -1 - } else { - lag = offset - offsetFetchResponseBlock.Offset - lagSum += lag - } + for partition, gOffset := range partitionOffsetMap { + cOffset, ok := offset[topic][partition] + if ok { + groupCurrentOffsetSum += gOffset + lag := cOffset - gOffset + lagSum += lag + ch <- prometheus.MustNewConstMetric( + consumergroupCurrentOffset, prometheus.GaugeValue, float64(gOffset), group, topic, strconv.FormatInt(int64(partition), 10), + ) ch <- prometheus.MustNewConstMetric( - consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), + consumergroupLag, prometheus.GaugeValue, float64(lag), group, topic, strconv.FormatInt(int64(partition), 10), ) - } else { - glog.Errorf("No offset of topic %s partition %d, cannot get consumer group lag", topic, partition) } - e.mu.Unlock() } + ch <- prometheus.MustNewConstMetric( - consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic, + consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(groupCurrentOffsetSum), group, topic, ) ch <- prometheus.MustNewConstMetric( - consumergroupLagSum, prometheus.GaugeValue, float64(lagSum), group.GroupId, topic, + consumergroupLagSum, prometheus.GaugeValue, float64(lagSum), group, 
topic, ) } } } - glog.V(DEBUG).Info("Fetching consumer group metrics") - if len(e.client.Brokers()) > 0 { - for _, broker := range e.client.Brokers() { + if len(groupOffset) > 0 { + plog.Info("Calculating consume group lag") + for _, v := range groupOffset { wg.Add(1) - go getConsumerGroupMetrics(broker) + go calculateConsumeGroupMetrics(v) } wg.Wait() - } else { - glog.Errorln("No valid broker, cannot get consumer group metrics") } + } func init() { From a0706e9fee33e5ba3e3aa092a9a5c0d9bed74cbe Mon Sep 17 00:00:00 2001 From: panjinjun <1619-panjinjun@users.noreply.git.sysop.bigo.sg> Date: Tue, 29 Jun 2021 10:52:33 +0800 Subject: [PATCH 2/3] add groupOffset explanation --- kafka_exporter.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka_exporter.go b/kafka_exporter.go index 6b6ed101..d73f2e2f 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -365,6 +365,9 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { topicPartitionsMap := make(map[string][]int32) offset := make(map[string]map[int32]int64) + // groupOffset is used for recording consumergroup offset by groupid, topic, partition on different brokers + // structure: groupOffset[broker.ID()][group.GroupId][topic][partition] = offsetFetchResponseBlock.Offset + // eg: groupOffset[1]["test_group"]["test_topic"][0] = 1 groupOffset := make(map[int32]map[string]map[string]map[int32]int64) // metadata refresh control From a444ec0916f6d703b67d0861d11949b8117727ff Mon Sep 17 00:00:00 2001 From: Josselin Mariette Date: Mon, 31 Jan 2022 18:04:43 +0100 Subject: [PATCH 3/3] Fix some modifications due to rebase --- kafka_exporter.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/kafka_exporter.go b/kafka_exporter.go index d73f2e2f..888ed7ff 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -333,8 +333,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) { func (e *Exporter) collectChans(quit chan struct{}) { original := make(chan 
prometheus.Metric) - // Do not indicate the limit of capacity - container := make([]prometheus.Metric, 0) + container := make([]prometheus.Metric, 0, 100) go func() { for metric := range original { container = append(container, metric) @@ -391,7 +390,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { defer wg.Done() partitions, err := e.client.Partitions(topic) if err != nil { - plog.Errorf("Cannot get partitions of topic %s: %v", topic, err) + glog.Errorf("Cannot get partitions of topic %s: %v", topic, err) } else { e.mu.Lock() topicPartitionsMap[topic] = partitions @@ -504,13 +503,13 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { ) } } - } + } } } getConsumerGroupMetrics := func(broker *sarama.Broker) { defer wg.Done() - plog.Debugf("[%d] Fetching consumer group metrics", broker.ID()) + glog.V(DEBUG).Infof("[%d] Fetching consumer group metrics", broker.ID()) e.mu.Lock() if _, ok := groupOffset[broker.ID()]; !ok { groupOffset[broker.ID()] = make(map[string]map[string]map[int32]int64) @@ -522,7 +521,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { } defer broker.Close() - plog.Debugf("[%d]> listing groups", broker.ID()) + glog.V(DEBUG).Infof("[%d]> listing groups", broker.ID()) groups, err := broker.ListGroups(&sarama.ListGroupsRequest{}) if err != nil { glog.Errorf("Cannot get consumer group: %v", err) @@ -535,7 +534,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { } } - plog.Debugf("[%d]> describing groups", broker.ID()) + glog.V(DEBUG).Infof("[%d]> describing groups", broker.ID()) describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds}) if err != nil { glog.Errorf("Cannot get describe groups: %v", err) @@ -573,11 +572,11 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { ) start := time.Now() - plog.Debugf("[%d][%s]> fetching group offsets", broker.ID(), group.GroupId) + glog.V(DEBUG).Infof("[%d][%s]> fetching group offsets", broker.ID(), group.GroupId) if 
offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest); err != nil { - plog.Errorf("Cannot get offset of group %s: %v", group.GroupId, err) + glog.Errorf("Cannot get offset of group %s: %v", group.GroupId, err) } else { - plog.Debugf("[%d][%s] done fetching group offset in %s", broker.ID(), group.GroupId, time.Since(start).String()) + glog.V(DEBUG).Infof("[%d][%s] done fetching group offset in %s", broker.ID(), group.GroupId, time.Since(start).String()) for topic, partitions := range offsetFetchResponse.Blocks { // Topic filter if !e.topicFilter.MatchString(topic) { @@ -601,7 +600,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { for partition, offsetFetchResponseBlock := range partitions { err := offsetFetchResponseBlock.Err if err != sarama.ErrNoError { - plog.Errorf("Error for partition %d :%v\n", partition, err.Error()) + glog.Errorf("Error for partition %d :%v\n", partition, err.Error()) continue } e.mu.Lock() @@ -623,7 +622,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { } // Firstly get topic-partitions information - plog.Info("Fetching topic-partitions information") + glog.V(DEBUG).Infof("Fetching topic-partitions information") for _, topic := range topics { wg.Add(1) go getTopicPartitions(topic) @@ -651,7 +650,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { topic, open := <-topicChannel ok = open if open { - plog.Infof("Collecting metrics [%d] for topic %s", id, topic) + glog.V(DEBUG).Infof("Collecting metrics [%d] for topic %s", id, topic) getTopicMetrics(topic) } } @@ -663,7 +662,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { go loopTopics(w) } - plog.Info("Fetching topic metrics") + glog.V(DEBUG).Infoln("Fetching topic metrics") for _, topic := range topics { if e.topicFilter.MatchString(topic) { wg.Add(1) @@ -706,7 +705,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { } if len(groupOffset) > 0 { - plog.Info("Calculating consume group lag") + 
glog.V(DEBUG).Infoln("Calculating consume group lag") for _, v := range groupOffset { wg.Add(1) go calculateConsumeGroupMetrics(v)