diff --git a/pkg/console/counter/component.go b/pkg/console/counter/component.go index 0217ee2f7..8c39cb3ac 100644 --- a/pkg/console/counter/component.go +++ b/pkg/console/counter/component.go @@ -22,7 +22,11 @@ import ( "math" "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" + meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" + resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model" "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" ) const ComponentType runtime.ComponentType = "counter manager" @@ -41,7 +45,7 @@ var _ ManagerComponent = &managerComponent{} func (c *managerComponent) RequiredDependencies() []runtime.ComponentType { return []runtime.ComponentType{ runtime.ResourceStore, - runtime.EventBus, // Counter depends on EventBus to subscribe to events + runtime.EventBus, } } @@ -64,6 +68,19 @@ func (c *managerComponent) Init(runtime.BuilderContext) error { } func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error { + storeComponent, err := rt.GetComponent(runtime.ResourceStore) + if err != nil { + return err + } + storeRouter, ok := storeComponent.(store.Router) + if !ok { + return fmt.Errorf("component %s does not implement store.Router", runtime.ResourceStore) + } + + if err := c.initializeCountsFromStore(storeRouter); err != nil { + logger.Warnf("Failed to initialize counter manager from store: %v", err) + } + component, err := rt.GetComponent(runtime.EventBus) if err != nil { return err @@ -75,6 +92,73 @@ func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error { return c.manager.Bind(bus) } +func (c *managerComponent) initializeCountsFromStore(storeRouter store.Router) error { + if err := c.initializeResourceCount(storeRouter, meshresource.InstanceKind); err != nil { + return fmt.Errorf("failed to initialize instance count: %w", err) + } + + if err := c.initializeResourceCount(storeRouter, meshresource.ApplicationKind); err != nil { + return fmt.Errorf("failed to initialize application count: %w", err) + } + + if err := c.initializeResourceCount(storeRouter, meshresource.ServiceProviderMetadataKind); err != nil { + return fmt.Errorf("failed to initialize service provider metadata count: %w", err) + } + + return nil +} + +func (c *managerComponent) initializeResourceCount(storeRouter store.Router, kind resmodel.ResourceKind) error { + resourceStore, err := storeRouter.ResourceKindRoute(kind) + if err != nil { + return err + } + + allResources := resourceStore.List() + cm := c.manager.(*counterManager) + + for _, obj := range allResources { + resource, ok := obj.(resmodel.Resource) + if !ok { + continue + } + + mesh := resource.ResourceMesh() + if mesh == "" { + mesh = "default" + } + + if counter, exists := cm.simpleCounters[kind]; exists { + counter.Increment(mesh) + } + + if kind == meshresource.InstanceKind { + instance, ok := resource.(*meshresource.InstanceResource) + if ok && instance.Spec != nil { + protocol := instance.Spec.GetProtocol() + if protocol != "" { + if cfg := cm.getDistributionConfig(kind, ProtocolCounter); cfg != nil { + cfg.counter.Increment(mesh, protocol) + } + } + + releaseVersion := instance.Spec.GetReleaseVersion() + if releaseVersion != "" { + if cfg := cm.getDistributionConfig(kind, ReleaseCounter); cfg != nil { + cfg.counter.Increment(mesh, releaseVersion) + } + } + + if cfg := cm.getDistributionConfig(kind, DiscoveryCounter); cfg != nil { + cfg.counter.Increment(mesh, mesh) + } + } + } + } + + return nil +} + func (c *managerComponent) CounterManager() CounterManager { return c.manager } diff --git a/pkg/console/counter/counter.go b/pkg/console/counter/counter.go index c32d35163..6d5e2a1ad 100644 --- a/pkg/console/counter/counter.go +++ b/pkg/console/counter/counter.go @@ -19,70 +19,121 @@ package counter import ( "sync" - "sync/atomic" ) type Counter struct { - name string - value atomic.Int64 + name string + data map[string]int64 + mu sync.RWMutex } func NewCounter(name string) *Counter { - return &Counter{name: name} + return &Counter{ + name: name, + data: make(map[string]int64), + } } func (c *Counter) Get() int64 { - return c.value.Load() + c.mu.RLock() + defer c.mu.RUnlock() + var sum int64 + for _, v := range c.data { + sum += v + } + return sum } -func (c *Counter) Increment() { - c.value.Add(1) +func (c *Counter) GetByGroup(group string) int64 { + if group == "" { + group = "default" + } + c.mu.RLock() + defer c.mu.RUnlock() + return c.data[group] } -func (c *Counter) Decrement() { - for { - current := c.value.Load() - if current == 0 { - return - } - if c.value.CompareAndSwap(current, current-1) { - return +func (c *Counter) Increment(group string) { + if group == "" { + group = "default" + } + c.mu.Lock() + defer c.mu.Unlock() + c.data[group]++ +} + +func (c *Counter) Decrement(group string) { + if group == "" { + group = "default" + } + c.mu.Lock() + defer c.mu.Unlock() + if value, ok := c.data[group]; ok { + value-- + if value <= 0 { + delete(c.data, group) + } else { + c.data[group] = value } } } func (c *Counter) Reset() { - c.value.Store(0) + c.mu.Lock() + defer c.mu.Unlock() + c.data = make(map[string]int64) } type DistributionCounter struct { name string - data map[string]int64 + data map[string]map[string]int64 mu sync.RWMutex } func NewDistributionCounter(name string) *DistributionCounter { return &DistributionCounter{ name: name, - data: make(map[string]int64), + data: make(map[string]map[string]int64), } } -func (c *DistributionCounter) Increment(key string) { +func (c *DistributionCounter) Increment(group, key string) { + if group == "" { + group = "default" + } + if key == "" { + key = "unknown" + } c.mu.Lock() defer c.mu.Unlock() - c.data[key]++ + if c.data[group] == nil { + c.data[group] = make(map[string]int64) + } + c.data[group][key]++ } -func (c *DistributionCounter) Decrement(key string) { +func (c *DistributionCounter) Decrement(group, key string) { + if group == "" { + group = "default" + } + if key == "" { + key = "unknown" + } c.mu.Lock() defer c.mu.Unlock() - if value, ok := c.data[key]; ok { + groupData, exists := c.data[group] + if !exists { + return + } + if value, ok := groupData[key]; ok { value-- if value <= 0 { - delete(c.data, key) + delete(groupData, key) + if len(groupData) == 0 { + delete(c.data, group) + } } else { - c.data[key] = value + groupData[key] = value } } } @@ -90,8 +141,27 @@ func (c *DistributionCounter) Decrement(key string) { func (c *DistributionCounter) GetAll() map[string]int64 { c.mu.RLock() defer c.mu.RUnlock() - result := make(map[string]int64, len(c.data)) - for k, v := range c.data { + result := make(map[string]int64) + for _, groupData := range c.data { + for k, v := range groupData { + result[k] += v + } + } + return result +} + +func (c *DistributionCounter) GetByGroup(group string) map[string]int64 { + if group == "" { + group = "default" + } + c.mu.RLock() + defer c.mu.RUnlock() + groupData, exists := c.data[group] + if !exists { + return map[string]int64{} + } + result := make(map[string]int64, len(groupData)) + for k, v := range groupData { result[k] = v } return result @@ -100,5 +170,5 @@ func (c *DistributionCounter) GetAll() map[string]int64 { func (c *DistributionCounter) Reset() { c.mu.Lock() defer c.mu.Unlock() - c.data = make(map[string]int64) + c.data = make(map[string]map[string]int64) } diff --git a/pkg/console/counter/manager.go b/pkg/console/counter/manager.go index b8d0d549c..581139fa5 100644 --- a/pkg/console/counter/manager.go +++ b/pkg/console/counter/manager.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/apache/dubbo-admin/pkg/core/events" + "github.com/apache/dubbo-admin/pkg/core/logger" meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model" ) @@ -42,6 +43,8 @@ type CounterManager interface { RegisterDistributionCounter(kind resmodel.ResourceKind, metric CounterType, extractor FieldExtractor) Count(kind resmodel.ResourceKind) int64 Distribution(metric CounterType) map[string]int64 + CountByMesh(kind resmodel.ResourceKind, mesh string) int64 + DistributionByMesh(metric CounterType, mesh string) map[string]int64 Reset() Bind(bus events.EventBus) error } @@ -137,12 +140,28 @@ func (cm *counterManager) Distribution(metric CounterType) map[string]int64 { if !exists { return map[string]int64{} } - raw := counter.GetAll() - result := make(map[string]int64, len(raw)) - for k, v := range raw { - result[k] = v + return counter.GetAll() +} + +func (cm *counterManager) CountByMesh(kind resmodel.ResourceKind, mesh string) int64 { + if mesh == "" { + mesh = "default" + } + if counter, exists := cm.simpleCounters[kind]; exists { + return counter.GetByGroup(mesh) + } + return 0 +} + +func (cm *counterManager) DistributionByMesh(metric CounterType, mesh string) map[string]int64 { + if mesh == "" { + mesh = "default" } - return result + counter, exists := cm.distributionByType[metric] + if !exists { + return map[string]int64{} + } + return counter.GetByGroup(mesh) } func (cm *counterManager) Bind(bus events.EventBus) error { @@ -167,11 +186,14 @@ func (cm *counterManager) Bind(bus events.EventBus) error { if err := bus.Subscribe(subscriber); err != nil { return err } + logger.Infof("CounterManager subscribed to %s events", resourceKind) } + logger.Infof("CounterManager bound to EventBus successfully") return nil } func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event events.Event) error { + logger.Debugf("CounterManager handling %s event, type: %s", kind, event.Type()) if counter := cm.simpleCounters[kind]; counter != nil { processSimpleCounter(counter, event) } @@ -183,36 +205,72 @@ func (cm *counterManager) handleEvent(kind resmodel.ResourceKind, event events.E return nil } +func (cm *counterManager) getDistributionConfig(kind resmodel.ResourceKind, metric CounterType) *distributionCounterConfig { + configs := cm.distributionConfigs[kind] + for _, cfg := range configs { + if cfg.counterType == metric { + return cfg + } + } + return nil +} + +func extractMeshName(res resmodel.Resource) string { + if res == nil { + return "default" + } + mesh := res.ResourceMesh() + if mesh == "" { + return "default" + } + return mesh +} + func processSimpleCounter(counter *Counter, event events.Event) { + mesh := extractMeshName(event.NewObj()) + if event.NewObj() == nil { + mesh = extractMeshName(event.OldObj()) + } + switch event.Type() { case cache.Added: - counter.Increment() + counter.Increment(mesh) + logger.Debugf("CounterManager: Increment %s for mesh=%s, current count=%d", counter.name, mesh, counter.GetByGroup(mesh)) case cache.Sync, cache.Replaced: if isNewResourceEvent(event) { - counter.Increment() + counter.Increment(mesh) + logger.Debugf("CounterManager: Increment %s for mesh=%s (Sync/Replaced), current count=%d", counter.name, mesh, counter.GetByGroup(mesh)) } case cache.Deleted: - counter.Decrement() + counter.Decrement(mesh) + logger.Debugf("CounterManager: Decrement %s for mesh=%s, current count=%d", counter.name, mesh, counter.GetByGroup(mesh)) case cache.Updated: - // no-op for simple counters default: } } func processDistributionCounter(cfg *distributionCounterConfig, event events.Event) { + mesh := extractMeshName(event.NewObj()) + if event.NewObj() == nil { + mesh = extractMeshName(event.OldObj()) + } + switch event.Type() { case cache.Added: - cfg.increment(cfg.extractFrom(event.NewObj())) + key := cfg.extractFrom(event.NewObj()) + cfg.counter.Increment(mesh, normalizeDistributionKey(key)) case cache.Sync, cache.Replaced: if isNewResourceEvent(event) { - cfg.increment(cfg.extractFrom(event.NewObj())) + key := cfg.extractFrom(event.NewObj()) + cfg.counter.Increment(mesh, normalizeDistributionKey(key)) } else { cfg.update(event.OldObj(), event.NewObj()) } case cache.Updated: cfg.update(event.OldObj(), event.NewObj()) case cache.Deleted: - cfg.decrement(cfg.extractFrom(event.OldObj())) + key := cfg.extractFrom(event.OldObj()) + cfg.counter.Decrement(mesh, normalizeDistributionKey(key)) default: } } @@ -224,25 +282,19 @@ func (cfg *distributionCounterConfig) extractFrom(res resmodel.Resource) string return cfg.extractor(res) } -func (cfg *distributionCounterConfig) increment(key string) { - cfg.counter.Increment(normalizeDistributionKey(key)) -} - -func (cfg *distributionCounterConfig) decrement(key string) { - cfg.counter.Decrement(normalizeDistributionKey(key)) -} - func (cfg *distributionCounterConfig) update(oldObj, newObj resmodel.Resource) { oldKey := normalizeDistributionKey(cfg.extractFrom(oldObj)) newKey := normalizeDistributionKey(cfg.extractFrom(newObj)) - if oldKey == newKey { + oldMesh := extractMeshName(oldObj) + newMesh := extractMeshName(newObj) + if oldKey == newKey && oldMesh == newMesh { return } if oldObj != nil { - cfg.counter.Decrement(oldKey) + cfg.counter.Decrement(oldMesh, oldKey) } if newObj != nil { - cfg.counter.Increment(newKey) + cfg.counter.Increment(newMesh, newKey) } } diff --git a/pkg/console/handler/overview.go b/pkg/console/handler/overview.go index d0ad7c010..1d9dc9715 100644 --- a/pkg/console/handler/overview.go +++ b/pkg/console/handler/overview.go @@ -70,12 +70,23 @@ func ClusterOverview(ctx consolectx.Context) gin.HandlerFunc { return func(c *gin.Context) { resp := model.NewOverviewResp() if counterMgr := ctx.CounterManager(); counterMgr != nil { - resp.AppCount = counterMgr.Count(meshresource.ApplicationKind) - resp.ServiceCount = counterMgr.Count(meshresource.ServiceProviderMetadataKind) - resp.InsCount = counterMgr.Count(meshresource.InstanceKind) - resp.Protocols = counterMgr.Distribution(counter.ProtocolCounter) - resp.Releases = counterMgr.Distribution(counter.ReleaseCounter) - resp.Discoveries = counterMgr.Distribution(counter.DiscoveryCounter) + mesh := c.Query("mesh") + + if mesh != "" { + resp.AppCount = counterMgr.CountByMesh(meshresource.ApplicationKind, mesh) + resp.ServiceCount = counterMgr.CountByMesh(meshresource.ServiceProviderMetadataKind, mesh) + resp.InsCount = counterMgr.CountByMesh(meshresource.InstanceKind, mesh) + resp.Protocols = counterMgr.DistributionByMesh(counter.ProtocolCounter, mesh) + resp.Releases = counterMgr.DistributionByMesh(counter.ReleaseCounter, mesh) + resp.Discoveries = counterMgr.DistributionByMesh(counter.DiscoveryCounter, mesh) + } else { + resp.AppCount = counterMgr.Count(meshresource.ApplicationKind) + resp.ServiceCount = counterMgr.Count(meshresource.ServiceProviderMetadataKind) + resp.InsCount = counterMgr.Count(meshresource.InstanceKind) + resp.Protocols = counterMgr.Distribution(counter.ProtocolCounter) + resp.Releases = counterMgr.Distribution(counter.ReleaseCounter) + resp.Discoveries = counterMgr.Distribution(counter.DiscoveryCounter) + } } c.JSON(http.StatusOK, model.NewSuccessResp(resp)) } diff --git a/pkg/core/discovery/subscriber/instance.go b/pkg/core/discovery/subscriber/instance.go index 997144039..ded8a67d0 100644 --- a/pkg/core/discovery/subscriber/instance.go +++ b/pkg/core/discovery/subscriber/instance.go @@ -71,6 +71,7 @@ func (s *InstanceEventSubscriber) ProcessEvent(event events.Event) error { instanceRes = oldObj } instanceResList, err := s.instanceStore.ListByIndexes(map[string]string{ + index.ByMeshIndex: instanceRes.Mesh, index.ByInstanceAppNameIndex: instanceRes.Spec.AppName, }) diff --git a/pkg/core/discovery/subscriber/nacos_service.go b/pkg/core/discovery/subscriber/nacos_service.go index 6a83771d3..e5c9c6312 100644 --- a/pkg/core/discovery/subscriber/nacos_service.go +++ b/pkg/core/discovery/subscriber/nacos_service.go @@ -128,6 +128,7 @@ func (n *NacosServiceEventSubscriber) processConsumerMetadataUpsert(serviceRes * return err } resources, err := st.ListByIndexes(map[string]string{ + index.ByMeshIndex: serviceRes.Mesh, index.ByServiceConsumerServiceName: serviceName, }) if err != nil { @@ -212,6 +213,7 @@ func (n *NacosServiceEventSubscriber) processRPCInstanceUpsert(serviceRes *meshr return err } resources, err := st.ListByIndexes(map[string]string{ + index.ByMeshIndex: serviceRes.Mesh, index.ByRPCInstanceAppName: serviceRes.Name, }) if err != nil { @@ -286,6 +288,7 @@ func (n *NacosServiceEventSubscriber) processServiceConsumerDelete(serviceRes *m return err } resources, err := st.ListByIndexes(map[string]string{ + index.ByMeshIndex: serviceRes.Mesh, index.ByServiceConsumerServiceName: serviceRes.Name, }) if err != nil { @@ -309,6 +312,7 @@ func (n *NacosServiceEventSubscriber) processRPCInstanceDelete(serviceRes *meshr return err } resources, err := st.ListByIndexes(map[string]string{ + index.ByMeshIndex: serviceRes.Mesh, index.ByRPCInstanceAppName: serviceRes.Name, }) if err != nil {