From 4a00af644cd81a5524f932aae67dd3dd25812e97 Mon Sep 17 00:00:00 2001 From: wy770 Date: Mon, 12 Jan 2026 00:46:50 +0800 Subject: [PATCH 1/3] implement counter by key --- app/dubbo-admin/dubbo-admin.yaml | 264 +++++++++--------- pkg/console/counter/component.go | 122 +++++++- pkg/console/counter/counter.go | 124 ++++++-- pkg/console/counter/manager.go | 96 +++++-- pkg/console/handler/overview.go | 23 +- pkg/core/discovery/subscriber/instance.go | 1 + .../discovery/subscriber/nacos_service.go | 4 + 7 files changed, 449 insertions(+), 185 deletions(-) diff --git a/app/dubbo-admin/dubbo-admin.yaml b/app/dubbo-admin/dubbo-admin.yaml index b33dfee92..af49ea9ce 100644 --- a/app/dubbo-admin/dubbo-admin.yaml +++ b/app/dubbo-admin/dubbo-admin.yaml @@ -1,129 +1,135 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -mode: zone -console: - grafana: - prometheus: - jaeger: - dashboards: - metric: - application: - instance: - service: - trace: - application: - instance: - service: - log: - application: - instance: - service: - auth: - user: admin - password: dubbo@2025 - expirationTime: 3600 -store: -# type: memory - type: mysql - address: root:123456@tcp(127.0.0.1:23306)/dubbo-admin?charset=utf8mb4&parseTime=True&loc=Asia%2FShanghai -discovery: - - type: nacos2 - name: localhost-nacos - id: localhost-nacos - address: - registry: nacos://101.34.253.152:30848?username=nacos&password=nacos - configCenter: nacos://101.34.253.152:30848?username=nacos&password=nacos - metadataReport: nacos://101.34.253.152:30848?username=nacos&password=nacos - properties: - # [Nacos2] Adjust the interval(time unit is `s`) to adjust the frequency of list-all configs in nacos. - # Default period is 30s - configWatchPeriod: 20 - # [Nacos2] Adjust the interval(time unit is `s`) to adjust the frequency of list-all services in nacos. - # Default period is 30s - serviceWatchPeriod: 20 -# - type: zookeeper -# name: 本地zookeeper -# id: zk3.6 -# address: -# registry: zookeeper://127.0.0.1:2181 -# configCenter: zookeeper://127.0.0.1:2181 -# metadataReport: zookeeper://127.0.0.1:2181 - - # mock discovery is only for development -# - type: mock -# id: mock -# name: mockRegistry -engine: -# id: mock -# name: mock -# type: mock - id: default - name: k8s1.28.6 - type: kubernetes - properties: -# [Kubernetes] Path to kubernetes config file, if not set, will use in cluster config - kubeConfigPath: /root/.kube/config - # [Kubernetes] Watch pods with specified labels, if not set, will watch all pods - # podWatchSelector: org.apache.dubbo/dubbo-apps=true - # [Kubernetes] Identify which Dubbo app the pod belongs to, if not set, pod will not be seen as a dubbo instance at first - # 1. ByLabels: Use the label value corresponding to the labelKey as the dubbo app name - # e.g. - # type: ByLabel - # labelKey: org.apache.dubbo/dubbo-app-name - # 2. ByAnnotation: Use the annotation value corresponding to the annotationKey as the dubbo app name - # e.g. - # type: ByAnnotation - # annotationKey: org.apache.dubbo/dubbo-app-name -# dubboAppIdentifier: -# type: ByLabel -# labelKey: org.apache.dubbo/dubbo-app-name - # [Kubernetes] Identify the rpc port of a dubbo application, if not set, pod will not be seen as a dubbo instance at first - # 1. ByLabels: Use the label value corresponding to the labelKey as the dubbo rpc port - # e.g. - # type: ByLabel - # labelKey: org.apache.dubbo/dubbo-rpc-port - # 2. ByAnnotation: Use the annotation value corresponding to the annotationKey as the dubbo rpc port - # e.g. - # type: ByAnnotation - # annotationKey: org.apache.dubbo/dubbo-rpc-port - # [Kubernetes] Identify the registry(discovery) which the dubbo instance belongs to, if not set, pod will not be seen as a dubbo instance at first - # 1. ByLabels: Use the label value corresponding to the labelKey as the dubbo registry name - # e.g. - # type: ByLabel - # labelKey: org.apache.dubbo/registry - # 2. ByAnnotation: Use the annotation value corresponding to the annotationKey as the dubbo registry name - # e.g. - # type: ByAnnotation - # annotationKey: org.apache.dubbo/registry - # [Kubernetes] Strategy of choosing the main container, if not set, [type = ByIndex] and [index = 0] will be used - # 1. ByLast: choose the last container as the main container - # e.g. - # type: ByLast - # 2. ByIndex(default): choose the container at the specified index location as the main container - # e.g. - # type: ByIndex - # index: 0 - # 3. ByName: choose the container with the specified name - # e.g. - # type: ByName - # name: main - # 4. ByAnnotation: choose the container with the annotation key, specified annotation value will be used as the container name - # e.g. - # type: ByAnnotation - # annotationKey: org.apache.dubbo/main-container-name=${app-name} -# mainContainerChooseStrategy: -# type: ByIndex -# index: 0 +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mode: zone +console: + grafana: + prometheus: + jaeger: + dashboards: + metric: + application: + instance: + service: + trace: + application: + instance: + service: + log: + application: + instance: + service: + auth: + user: admin + password: dubbo@2025 + expirationTime: 3600 +store: + type: memory +# type: mysql +# address: root:12345678@tcp(127.0.0.1:3306)/dubbo-admin?charset=utf8mb4&parseTime=True&loc=Asia%2FShanghai +discovery: + - type: nacos2 + name: localhost-nacos-1 + id: localhost-nacos-1 + address: + registry: nacos://127.0.0.1:8848?username=nacos&password=nacos + configCenter: nacos://127.0.0.1:8848?username=nacos&password=nacos + metadataReport: nacos://127.0.0.1:8848?username=nacos&password=nacos + properties: + configWatchPeriod: 20 + serviceWatchPeriod: 20 + - type: nacos2 + name: localhost-nacos-2 + id: localhost-nacos-2 + address: + registry: nacos://127.0.0.1:8849?username=nacos&password=nacos + configCenter: nacos://127.0.0.1:8849?username=nacos&password=nacos + metadataReport: nacos://127.0.0.1:8849?username=nacos&password=nacos + properties: + configWatchPeriod: 20 + serviceWatchPeriod: 20 +# - type: zookeeper +# name: 本地zookeeper +# id: zk3.6 +# address: +# registry: zookeeper://127.0.0.1:2181 +# configCenter: zookeeper://127.0.0.1:2181 +# metadataReport: zookeeper://127.0.0.1:2181 + + # mock discovery is only for development +# - type: mock +# id: mock +# name: mockRegistry +engine: + id: mock + name: mock + type: mock +# id: default +# name: k8s1.28.6 +# type: kubernetes + properties: +# [Kubernetes] Path to kubernetes config file, if not set, will use in cluster config + kubeConfigPath: /root/.kube/config + # [Kubernetes] Watch pods with specified labels, if not set, will watch all pods + # podWatchSelector: org.apache.dubbo/dubbo-apps=true + # [Kubernetes] Identify which Dubbo app the pod belongs to, if not set, pod will not be seen as a dubbo instance at first + # 1. ByLabels: Use the label value corresponding to the labelKey as the dubbo app name + # e.g. + # type: ByLabel + # labelKey: org.apache.dubbo/dubbo-app-name + # 2. ByAnnotation: Use the annotation value corresponding to the annotationKey as the dubbo app name + # e.g. + # type: ByAnnotation + # annotationKey: org.apache.dubbo/dubbo-app-name +# dubboAppIdentifier: +# type: ByLabel +# labelKey: org.apache.dubbo/dubbo-app-name + # [Kubernetes] Identify the rpc port of a dubbo application, if not set, pod will not be seen as a dubbo instance at first + # 1. ByLabels: Use the label value corresponding to the labelKey as the dubbo rpc port + # e.g. + # type: ByLabel + # labelKey: org.apache.dubbo/dubbo-rpc-port + # 2. ByAnnotation: Use the annotation value corresponding to the annotationKey as the dubbo rpc port + # e.g. + # type: ByAnnotation + # annotationKey: org.apache.dubbo/dubbo-rpc-port + # [Kubernetes] Identify the registry(discovery) which the dubbo instance belongs to, if not set, pod will not be seen as a dubbo instance at first + # 1. ByLabels: Use the label value corresponding to the labelKey as the dubbo registry name + # e.g. + # type: ByLabel + # labelKey: org.apache.dubbo/registry + # 2. ByAnnotation: Use the annotation value corresponding to the annotationKey as the dubbo registry name + # e.g. + # type: ByAnnotation + # annotationKey: org.apache.dubbo/registry + # [Kubernetes] Strategy of choosing the main container, if not set, [type = ByIndex] and [index = 0] will be used + # 1. ByLast: choose the last container as the main container + # e.g. + # type: ByLast + # 2. ByIndex(default): choose the container at the specified index location as the main container + # e.g. + # type: ByIndex + # index: 0 + # 3. ByName: choose the container with the specified name + # e.g. + # type: ByName + # name: main + # 4. ByAnnotation: choose the container with the annotation key, specified annotation value will be used as the container name + # e.g. + # type: ByAnnotation + # annotationKey: org.apache.dubbo/main-container-name=${app-name} +# mainContainerChooseStrategy: +# type: ByIndex +# index: 0 diff --git a/pkg/console/counter/component.go b/pkg/console/counter/component.go index 0217ee2f7..0d2344eb9 100644 --- a/pkg/console/counter/component.go +++ b/pkg/console/counter/component.go @@ -22,7 +22,10 @@ import ( "math" "github.com/apache/dubbo-admin/pkg/core/events" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model" "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" ) const ComponentType runtime.ComponentType = "counter manager" @@ -41,7 +44,7 @@ var _ ManagerComponent = &managerComponent{} func (c *managerComponent) RequiredDependencies() []runtime.ComponentType { return []runtime.ComponentType{ runtime.ResourceStore, - runtime.EventBus, // Counter depends on EventBus to subscribe to events + runtime.EventBus, } } @@ -64,6 +67,17 @@ func (c *managerComponent) Init(runtime.BuilderContext) error { } func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error { + storeComponent, err := rt.GetComponent(runtime.ResourceStore) + if err != nil { + return err + } + storeRouter, ok := storeComponent.(store.Router) + if !ok { + return fmt.Errorf("component %s does not implement store.Router", runtime.ResourceStore) + } + + c.initializeCountsFromStore(storeRouter) + component, err := rt.GetComponent(runtime.EventBus) if err != nil { return err @@ -75,6 +89,112 @@ func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error { return c.manager.Bind(bus) } +func (c *managerComponent) initializeCountsFromStore(storeRouter store.Router) error { + if err := c.initializeResourceCount(storeRouter, meshresource.InstanceKind); err != nil { + return fmt.Errorf("failed to initialize instance count: %w", err) + } + + if err := c.initializeResourceCount(storeRouter, meshresource.ApplicationKind); err != nil { + return fmt.Errorf("failed to initialize application count: %w", err) + } + + if err := c.initializeResourceCount(storeRouter, meshresource.ServiceProviderMetadataKind); err != nil { + return fmt.Errorf("failed to initialize service provider metadata count: %w", err) + } + + return nil +} + +func (c *managerComponent) initializeResourceCount(storeRouter store.Router, kind resmodel.ResourceKind) error { + resourceStore, err := storeRouter.ResourceKindRoute(kind) + if err != nil { + return err + } + + allResources := resourceStore.List() + + meshCounts := make(map[string]int64) + meshDistributions := make(map[string]map[string]int64) + + for _, obj := range allResources { + resource, ok := obj.(resmodel.Resource) + if !ok { + continue + } + + mesh := resource.ResourceMesh() + if mesh == "" { + mesh = "default" + } + + meshCounts[mesh]++ + + if kind == meshresource.InstanceKind { + instance, ok := resource.(*meshresource.InstanceResource) + if ok && instance.Spec != nil { + protocol := instance.Spec.GetProtocol() + if protocol != "" { + if meshDistributions[mesh] == nil { + meshDistributions[mesh] = make(map[string]int64) + } + meshDistributions[mesh]["protocol:"+protocol]++ + } + + releaseVersion := instance.Spec.GetReleaseVersion() + if releaseVersion != "" { + if meshDistributions[mesh] == nil { + meshDistributions[mesh] = make(map[string]int64) + } + meshDistributions[mesh]["release:"+releaseVersion]++ + } + + if meshDistributions[mesh] == nil { + meshDistributions[mesh] = make(map[string]int64) + } + meshDistributions[mesh]["discovery:"+mesh]++ + } + } + } + + cm := c.manager.(*counterManager) + + if counter, exists := cm.simpleCounters[kind]; exists { + for mesh, count := range meshCounts { + for i := int64(0); i < count; i++ { + counter.Increment(mesh) + } + } + } + + if kind == meshresource.InstanceKind { + for mesh, distributions := range meshDistributions { + for key, count := range distributions { + if len(key) > 9 && key[:9] == "protocol:" { + if cfg := cm.getDistributionConfig(kind, ProtocolCounter); cfg != nil { + for i := int64(0); i < count; i++ { + cfg.counter.Increment(mesh, key[9:]) + } + } + } else if len(key) > 8 && key[:8] == "release:" { + if cfg := cm.getDistributionConfig(kind, ReleaseCounter); cfg != nil { + for i := int64(0); i < count; i++ { + cfg.counter.Increment(mesh, key[8:]) + } + } + } else if len(key) > 11 && key[:11] == "discovery:" { + if cfg := cm.getDistributionConfig(kind, DiscoveryCounter); cfg != nil { + for i := int64(0); i < count; i++ { + cfg.counter.Increment(mesh, key[11:]) + } + } + } + } + } + } + + return nil +} + func (c *managerComponent) CounterManager() CounterManager { return c.manager } diff --git a/pkg/console/counter/counter.go b/pkg/console/counter/counter.go index c32d35163..6d5e2a1ad 100644 --- a/pkg/console/counter/counter.go +++ b/pkg/console/counter/counter.go @@ -19,70 +19,121 @@ package counter import ( "sync" - "sync/atomic" ) type Counter struct { - name string - value atomic.Int64 + name string + data map[string]int64 + mu sync.RWMutex } func NewCounter(name string) *Counter { - return &Counter{name: name} + return &Counter{ + name: name, + data: make(map[string]int64), + } } func (c *Counter) Get() int64 { - return c.value.Load() + c.mu.RLock() + defer c.mu.RUnlock() + var sum int64 + for _, v := range c.data { + sum += v + } + return sum } -func (c *Counter) Increment() { - c.value.Add(1) +func (c *Counter) GetByGroup(group string) int64 { + if group == "" { + group = "default" + } + c.mu.RLock() + defer c.mu.RUnlock() + return c.data[group] } -func (c *Counter) Decrement() { - for { - current := c.value.Load() - if current == 0 { - return - } - if c.value.CompareAndSwap(current, current-1) { - return +func (c *Counter) Increment(group string) { + if group == "" { + group = "default" + } + c.mu.Lock() + defer c.mu.Unlock() + c.data[group]++ +} + +func (c *Counter) Decrement(group string) { + if group == "" { + group = "default" + } + c.mu.Lock() + defer c.mu.Unlock() + if value, ok := c.data[group]; ok { + value-- + if value <= 0 { + delete(c.data, group) + } else { + c.data[group] = value } } } func (c *Counter) Reset() { - c.value.Store(0) + c.mu.Lock() + defer c.mu.Unlock() + c.data = make(map[string]int64) } type DistributionCounter struct { name string - data map[string]int64 + data map[string]map[string]int64 mu sync.RWMutex } func NewDistributionCounter(name string) *DistributionCounter { return &DistributionCounter{ name: name, - data: make(map[string]int64), + data: make(map[string]map[string]int64), } } -func (c *DistributionCounter) Increment(key string) { +func (c *DistributionCounter) Increment(group, key string) { + if group == "" { + group = "default" + } + if key == "" { + key = "unknown" + } c.mu.Lock() defer c.mu.Unlock() - c.data[key]++ + if c.data[group] == nil { + c.data[group] = make(map[string]int64) + } + c.data[group][key]++ } -func (c *DistributionCounter) Decrement(key string) { +func (c *DistributionCounter) Decrement(group, key string) { + if group == "" { + group = "default" + } + if key == "" { + key = "unknown" + } c.mu.Lock() defer c.mu.Unlock() - if value, ok := c.data[key]; ok { + groupData, exists := c.data[group] + if !exists { + return + } + if value, ok := groupData[key]; ok { value-- if value <= 0 { - delete(c.data, key) + delete(groupData, key) + if len(groupData) == 0 { + delete(c.data, group) + } } else { - c.data[key] = value + groupData[key] = value } } } @@ -90,8 +141,27 @@ func (c *DistributionCounter) Decrement(key string) { func (c *DistributionCounter) GetAll() map[string]int64 { c.mu.RLock() defer c.mu.RUnlock() - result := make(map[string]int64, len(c.data)) - for k, v := range c.data { + result := make(map[string]int64) + for _, groupData := range c.data { + for k, v := range groupData { + result[k] += v + } + } + return result +} + +func (c *DistributionCounter) GetByGroup(group string) map[string]int64 { + if group == "" { + group = "default" + } + c.mu.RLock() + defer c.mu.RUnlock() + groupData, exists := c.data[group] + if !exists { + return map[string]int64{} + } + result := make(map[string]int64, len(groupData)) + for k, v := range groupData { result[k] = v } return result @@ -100,5 +170,5 @@ func (c *DistributionCounter) GetAll() map[string]int64 { func (c *DistributionCounter) Reset() { c.mu.Lock() defer c.mu.Unlock() - c.data = make(map[string]int64) + c.data = make(map[string]map[string]int64) } diff --git a/pkg/console/counter/manager.go b/pkg/console/counter/manager.go index b8d0d549c..304b12904 100644 --- a/pkg/console/counter/manager.go +++ b/pkg/console/counter/manager.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model" ) @@ -42,6 +43,8 @@ type CounterManager interface { RegisterDistributionCounter(kind resmodel.ResourceKind, metric CounterType, extractor FieldExtractor) Count(kind resmodel.ResourceKind) int64 Distribution(metric CounterType) map[string]int64 + CountByMesh(kind resmodel.ResourceKind, mesh string) int64 + DistributionByMesh(metric CounterType, mesh string) map[string]int64 Reset() Bind(bus events.EventBus) error } @@ -137,12 +140,28 @@ func (cm *counterManager) Distribution(metric CounterType) map[string]int64 { if !exists { return map[string]int64{} } - raw := counter.GetAll() - result := make(map[string]int64, len(raw)) - for k, v := range raw { - result[k] = v + return counter.GetAll() +} + +func (cm *counterManager) CountByMesh(kind resmodel.ResourceKind, mesh string) int64 { + if mesh == "" { + mesh = "default" + } + if counter, exists := cm.simpleCounters[kind]; exists { + return counter.GetByGroup(mesh) + } + return 0 +} + +func (cm *counterManager) DistributionByMesh(metric CounterType, mesh string) map[string]int64 { + if mesh == "" { + mesh = "default" } - return result + counter, exists := cm.distributionByType[metric] + if !exists { + return map[string]int64{} + } + return counter.GetByGroup(mesh) } func (cm *counterManager) Bind(bus events.EventBus) error { @@ -167,11 +186,14 @@ func (cm *counterManager) Bind(bus events.EventBus) error { if err := bus.Subscribe(subscriber); err != nil { return err } + logger.Infof("CounterManager subscribed to %s events", resourceKind) } + logger.Infof("CounterManager bound to EventBus successfully") return nil } func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event events.Event) error { + logger.Debugf("CounterManager handling %s event, type: %s", kind, event.Type()) if counter := cm.simpleCounters[kind]; counter != nil { processSimpleCounter(counter, event) } @@ -183,36 +205,72 @@ func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event events.E return nil } +func (cm *counterManager) getDistributionConfig(kind resmodel.ResourceKind, metric CounterType) *distributionCounterConfig { + configs := cm.distributionConfigs[kind] + for _, cfg := range configs { + if cfg.counterType == metric { + return cfg + } + } + return nil +} + +func extractMeshName(res resmodel.Resource) string { + if res == nil { + return "default" + } + mesh := res.ResourceMesh() + if mesh == "" { + return "default" + } + return mesh +} + func processSimpleCounter(counter *Counter, event events.Event) { + mesh := extractMeshName(event.NewObj()) + if event.NewObj() == nil { + mesh = extractMeshName(event.OldObj()) + } + switch event.Type() { case cache.Added: - counter.Increment() + counter.Increment(mesh) + logger.Debugf("CounterManager: Increment %s for mesh=%s, current count=%d", counter.name, mesh, counter.GetByGroup(mesh)) case cache.Sync, cache.Replaced: if isNewResourceEvent(event) { - counter.Increment() + counter.Increment(mesh) + logger.Debugf("CounterManager: Increment %s for mesh=%s (Sync/Replaced), current count=%d", counter.name, mesh, counter.GetByGroup(mesh)) } case cache.Deleted: - counter.Decrement() + counter.Decrement(mesh) + logger.Debugf("CounterManager: Decrement %s for mesh=%s, current count=%d", counter.name, mesh, counter.GetByGroup(mesh)) case cache.Updated: - // no-op for simple counters default: } } func processDistributionCounter(cfg *distributionCounterConfig, event events.Event) { + mesh := extractMeshName(event.NewObj()) + if event.NewObj() == nil { + mesh = extractMeshName(event.OldObj()) + } + switch event.Type() { case cache.Added: - cfg.increment(cfg.extractFrom(event.NewObj())) + key := cfg.extractFrom(event.NewObj()) + cfg.counter.Increment(mesh, normalizeDistributionKey(key)) case cache.Sync, cache.Replaced: if isNewResourceEvent(event) { - cfg.increment(cfg.extractFrom(event.NewObj())) + key := cfg.extractFrom(event.NewObj()) + cfg.counter.Increment(mesh, normalizeDistributionKey(key)) } else { cfg.update(event.OldObj(), event.NewObj()) } case cache.Updated: cfg.update(event.OldObj(), event.NewObj()) case cache.Deleted: - cfg.decrement(cfg.extractFrom(event.OldObj())) + key := cfg.extractFrom(event.OldObj()) + cfg.counter.Decrement(mesh, normalizeDistributionKey(key)) default: } } @@ -224,25 +282,19 @@ func (cfg *distributionCounterConfig) extractFrom(res resmodel.Resource) string return cfg.extractor(res) } -func (cfg *distributionCounterConfig) increment(key string) { - cfg.counter.Increment(normalizeDistributionKey(key)) -} - -func (cfg *distributionCounterConfig) decrement(key string) { - cfg.counter.Decrement(normalizeDistributionKey(key)) -} - func (cfg *distributionCounterConfig) update(oldObj, newObj resmodel.Resource) { oldKey := normalizeDistributionKey(cfg.extractFrom(oldObj)) newKey := normalizeDistributionKey(cfg.extractFrom(newObj)) if oldKey == newKey { return } + oldMesh := extractMeshName(oldObj) + newMesh := extractMeshName(newObj) if oldObj != nil { - cfg.counter.Decrement(oldKey) + cfg.counter.Decrement(oldMesh, oldKey) } if newObj != nil { - cfg.counter.Increment(newKey) + cfg.counter.Increment(newMesh, newKey) } } diff --git a/pkg/console/handler/overview.go b/pkg/console/handler/overview.go index 85cd6c633..68bb3fd48 100644 --- a/pkg/console/handler/overview.go +++ b/pkg/console/handler/overview.go @@ -62,12 +62,23 @@ func ClusterOverview(ctx consolectx.Context) gin.HandlerFunc { return func(c *gin.Context) { resp := model.NewOverviewResp() if counterMgr := ctx.CounterManager(); counterMgr != nil { - resp.AppCount = counterMgr.Count(meshresource.ApplicationKind) - resp.ServiceCount = counterMgr.Count(meshresource.ServiceProviderMetadataKind) - resp.InsCount = counterMgr.Count(meshresource.InstanceKind) - resp.Protocols = counterMgr.Distribution(counter.ProtocolCounter) - resp.Releases = counterMgr.Distribution(counter.ReleaseCounter) - resp.Discoveries = counterMgr.Distribution(counter.DiscoveryCounter) + mesh := c.Query("mesh") + + if mesh != "" { + resp.AppCount = counterMgr.CountByMesh(meshresource.ApplicationKind, mesh) + resp.ServiceCount = counterMgr.CountByMesh(meshresource.ServiceProviderMetadataKind, mesh) + resp.InsCount = counterMgr.CountByMesh(meshresource.InstanceKind, mesh) + resp.Protocols = counterMgr.DistributionByMesh(counter.ProtocolCounter, mesh) + resp.Releases = counterMgr.DistributionByMesh(counter.ReleaseCounter, mesh) + resp.Discoveries = counterMgr.DistributionByMesh(counter.DiscoveryCounter, mesh) + } else { + resp.AppCount = counterMgr.Count(meshresource.ApplicationKind) + resp.ServiceCount = counterMgr.Count(meshresource.ServiceProviderMetadataKind) + resp.InsCount = counterMgr.Count(meshresource.InstanceKind) + resp.Protocols = counterMgr.Distribution(counter.ProtocolCounter) + resp.Releases = counterMgr.Distribution(counter.ReleaseCounter) + resp.Discoveries = counterMgr.Distribution(counter.DiscoveryCounter) + } } c.JSON(http.StatusOK, model.NewSuccessResp(resp)) } diff --git a/pkg/core/discovery/subscriber/instance.go b/pkg/core/discovery/subscriber/instance.go index 997144039..ded8a67d0 100644 --- a/pkg/core/discovery/subscriber/instance.go +++ b/pkg/core/discovery/subscriber/instance.go @@ -71,6 +71,7 @@ func (s *InstanceEventSubscriber) ProcessEvent(event events.Event) error { instanceRes = oldObj } instanceResList, err := s.instanceStore.ListByIndexes(map[string]string{ + index.ByMeshIndex: instanceRes.Mesh, index.ByInstanceAppNameIndex: instanceRes.Spec.AppName, }) diff --git a/pkg/core/discovery/subscriber/nacos_service.go b/pkg/core/discovery/subscriber/nacos_service.go index 6a83771d3..e5c9c6312 100644 --- a/pkg/core/discovery/subscriber/nacos_service.go +++ b/pkg/core/discovery/subscriber/nacos_service.go @@ -128,6 +128,7 @@ func (n *NacosServiceEventSubscriber) processConsumerMetadataUpsert(serviceRes * return err } resources, err := st.ListByIndexes(map[string]string{ + index.ByMeshIndex: serviceRes.Mesh, index.ByServiceConsumerServiceName: serviceName, }) if err != nil { @@ -212,6 +213,7 @@ func (n *NacosServiceEventSubscriber) processRPCInstanceUpsert(serviceRes *meshr return err } resources, err := st.ListByIndexes(map[string]string{ + index.ByMeshIndex: serviceRes.Mesh, index.ByRPCInstanceAppName: serviceRes.Name, }) if err != nil { @@ -286,6 +288,7 @@ func (n *NacosServiceEventSubscriber) processServiceConsumerDelete(serviceRes *m return err } resources, err := st.ListByIndexes(map[string]string{ + index.ByMeshIndex: serviceRes.Mesh, index.ByServiceConsumerServiceName: serviceRes.Name, }) if err != nil { @@ -309,6 +312,7 @@ func (n *NacosServiceEventSubscriber) processRPCInstanceDelete(serviceRes *meshr return err } resources, err := st.ListByIndexes(map[string]string{ + index.ByMeshIndex: serviceRes.Mesh, index.ByRPCInstanceAppName: serviceRes.Name, }) if err != nil { From 5e17b0ac52d4806db3a20701da38d93be05726e3 Mon Sep 17 00:00:00 2001 From: wy770 Date: Sun, 18 Jan 2026 00:04:19 +0800 Subject: [PATCH 2/3] chore: trigger CI From 17329aab02120563e130af1eedd00e7e9046fe42 Mon Sep 17 00:00:00 2001 From: WyRainBow Date: Tue, 20 Jan 2026 17:37:35 +0800 Subject: [PATCH 3/3] Fix counter initialization errors and mesh change detection logic --- pkg/console/counter/component.go | 64 +++++++------------------------- pkg/console/counter/manager.go | 6 +-- 2 files changed, 17 insertions(+), 53 deletions(-) diff --git a/pkg/console/counter/component.go b/pkg/console/counter/component.go index 0d2344eb9..8c39cb3ac 100644 --- a/pkg/console/counter/component.go +++ b/pkg/console/counter/component.go @@ -22,6 +22,7 @@ import ( "math" "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model" "github.com/apache/dubbo-admin/pkg/core/runtime" @@ -76,7 +77,9 @@ func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error { return fmt.Errorf("component %s does not implement store.Router", runtime.ResourceStore) } - c.initializeCountsFromStore(storeRouter) + if err := c.initializeCountsFromStore(storeRouter); err != nil { + logger.Warnf("Failed to initialize counter manager from store: %v", err) + } component, err := rt.GetComponent(runtime.EventBus) if err != nil { @@ -112,9 +115,7 @@ func (c *managerComponent) initializeResourceCount(storeRouter store.Router, kin } allResources := resourceStore.List() - - meshCounts := make(map[string]int64) - meshDistributions := make(map[string]map[string]int64) + cm := c.manager.(*counterManager) for _, obj := range allResources { resource, ok := obj.(resmodel.Resource) @@ -127,66 +128,29 @@ func (c *managerComponent) initializeResourceCount(storeRouter store.Router, kin mesh = "default" } - meshCounts[mesh]++ + if counter, exists := cm.simpleCounters[kind]; exists { + counter.Increment(mesh) + } if kind == meshresource.InstanceKind { instance, ok := resource.(*meshresource.InstanceResource) if ok && instance.Spec != nil { protocol := instance.Spec.GetProtocol() if protocol != "" { - if meshDistributions[mesh] == nil { - meshDistributions[mesh] = make(map[string]int64) + if cfg := cm.getDistributionConfig(kind, ProtocolCounter); cfg != nil { + cfg.counter.Increment(mesh, protocol) } - meshDistributions[mesh]["protocol:"+protocol]++ } releaseVersion := instance.Spec.GetReleaseVersion() if releaseVersion != "" { - if meshDistributions[mesh] == nil { - meshDistributions[mesh] = make(map[string]int64) + if cfg := cm.getDistributionConfig(kind, ReleaseCounter); cfg != nil { + cfg.counter.Increment(mesh, releaseVersion) } - meshDistributions[mesh]["release:"+releaseVersion]++ } - if meshDistributions[mesh] == nil { - meshDistributions[mesh] = make(map[string]int64) - } - meshDistributions[mesh]["discovery:"+mesh]++ - } - } - } - - cm := c.manager.(*counterManager) - - if counter, exists := cm.simpleCounters[kind]; exists { - for mesh, count := range meshCounts { - for i := int64(0); i < count; i++ { - counter.Increment(mesh) - } - } - } - - if kind == meshresource.InstanceKind { - for mesh, distributions := range meshDistributions { - for key, count := range distributions { - if len(key) > 9 && key[:9] == "protocol:" { - if cfg := cm.getDistributionConfig(kind, ProtocolCounter); cfg != nil { - for i := int64(0); i < count; i++ { - cfg.counter.Increment(mesh, key[9:]) - } - } - } else if len(key) > 8 && key[:8] == "release:" { - if cfg := cm.getDistributionConfig(kind, ReleaseCounter); cfg != nil { - for i := int64(0); i < count; i++ { - cfg.counter.Increment(mesh, key[8:]) - } - } - } else if len(key) > 11 && key[:11] == "discovery:" { - if cfg := cm.getDistributionConfig(kind, DiscoveryCounter); cfg != nil { - for i := int64(0); i < count; i++ { - cfg.counter.Increment(mesh, key[11:]) - } - } + if cfg := cm.getDistributionConfig(kind, DiscoveryCounter); cfg != nil { + cfg.counter.Increment(mesh, mesh) } } } diff --git a/pkg/console/counter/manager.go b/pkg/console/counter/manager.go index 304b12904..581139fa5 100644 --- a/pkg/console/counter/manager.go +++ b/pkg/console/counter/manager.go @@ -285,11 +285,11 @@ func (cfg *distributionCounterConfig) extractFrom(res resmodel.Resource) string func (cfg *distributionCounterConfig) update(oldObj, newObj resmodel.Resource) { oldKey := normalizeDistributionKey(cfg.extractFrom(oldObj)) newKey := normalizeDistributionKey(cfg.extractFrom(newObj)) - if oldKey == newKey { - return - } oldMesh := extractMeshName(oldObj) newMesh := extractMeshName(newObj) + if oldKey == newKey && oldMesh == newMesh { + return + } if oldObj != nil { cfg.counter.Decrement(oldMesh, oldKey) }