From 9f0e521c7305efe067f5b3a2a7656ce089a858ad Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Wed, 10 Dec 2025 10:07:52 +0800 Subject: [PATCH 01/11] feat: implement distributed lock by gorm --- pkg/console/context/context.go | 10 + pkg/console/service/condition_rule.go | 53 ++++- pkg/console/service/configurator_rule.go | 49 ++++- pkg/console/service/tag_rule.go | 51 ++++- pkg/core/bootstrap/bootstrap.go | 8 + pkg/core/lock/component.go | 148 +++++++++++++ pkg/core/lock/errors.go | 28 +++ pkg/core/lock/gorm_lock.go | 261 +++++++++++++++++++++++ pkg/core/lock/init.go | 26 +++ pkg/core/lock/lock.go | 50 +++++ pkg/core/lock/lock_model.go | 37 ++++ 11 files changed, 709 insertions(+), 12 deletions(-) create mode 100644 pkg/core/lock/component.go create mode 100644 pkg/core/lock/errors.go create mode 100644 pkg/core/lock/gorm_lock.go create mode 100644 pkg/core/lock/init.go create mode 100644 pkg/core/lock/lock.go create mode 100644 pkg/core/lock/lock_model.go diff --git a/pkg/console/context/context.go b/pkg/console/context/context.go index 9133c4f93..ec6221316 100644 --- a/pkg/console/context/context.go +++ b/pkg/console/context/context.go @@ -22,6 +22,7 @@ import ( "github.com/apache/dubbo-admin/pkg/config/app" "github.com/apache/dubbo-admin/pkg/console/counter" + "github.com/apache/dubbo-admin/pkg/core/lock" "github.com/apache/dubbo-admin/pkg/core/manager" "github.com/apache/dubbo-admin/pkg/core/runtime" ) @@ -29,6 +30,7 @@ import ( type Context interface { ResourceManager() manager.ResourceManager CounterManager() counter.CounterManager + LockManager() lock.Lock Config() app.AdminConfig @@ -71,3 +73,11 @@ func (c *context) CounterManager() counter.CounterManager { } return managerComp.CounterManager() } + +func (c *context) LockManager() lock.Lock { + distributedLock, err := lock.GetLockFromRuntime(c.coreRt) + if err != nil { + return nil + } + return distributedLock +} diff --git a/pkg/console/service/condition_rule.go b/pkg/console/service/condition_rule.go index d7890cd6d..ff295b1c2 100644 --- a/pkg/console/service/condition_rule.go +++ b/pkg/console/service/condition_rule.go @@ -18,6 +18,9 @@ package service import ( + "fmt" + "time" + "github.com/apache/dubbo-admin/pkg/console/context" "github.com/apache/dubbo-admin/pkg/console/model" "github.com/apache/dubbo-admin/pkg/core/logger" @@ -73,14 +76,46 @@ func GetConditionRule(ctx context.Context, name string, mesh string) (*meshresou } func UpdateConditionRule(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + // Lock not available, proceed without lock protection + return updateConditionRuleUnsafe(ctx, name, res) + } + + // Use distributed lock to prevent concurrent modifications + lockKey := fmt.Sprintf("condition_route:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return updateConditionRuleUnsafe(ctx, name, res) + }) +} + +func updateConditionRuleUnsafe(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { if err := ctx.ResourceManager().Update(res); err != nil { logger.Warnf("update %s condition failed with error: %s", name, err.Error()) return err } + logger.Infof("Condition route %s updated successfully", name) return nil } func CreateConditionRule(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + return createConditionRuleUnsafe(ctx, name, res) + } + + lockKey := fmt.Sprintf("condition_route:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return createConditionRuleUnsafe(ctx, name, res) + }) +} + +// createConditionRuleUnsafe performs the actual creation without lock protection +func createConditionRuleUnsafe(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { if err := ctx.ResourceManager().Add(res); err != nil { logger.Warnf("create %s condition failed with error: %s", name, err.Error()) return err @@ -89,8 +124,20 @@ func CreateConditionRule(ctx context.Context, name string, res *meshresource.Con } func DeleteConditionRule(ctx context.Context, name string, mesh string) error { - if err := ctx.ResourceManager().DeleteByKey(meshresource.ConditionRouteKind, coremodel.BuildResourceKey(mesh, name)); err != nil { - return err + lock := ctx.LockManager() + if lock == nil { + return ctx.ResourceManager().DeleteByKey(meshresource.ConditionRouteKind, coremodel.BuildResourceKey(mesh, name)) } - return nil + + lockKey := fmt.Sprintf("condition_route:%s:%s", mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + err := ctx.ResourceManager().DeleteByKey(meshresource.ConditionRouteKind, coremodel.BuildResourceKey(mesh, name)) + if err != nil { + logger.Warnf("delete %s condition failed with error: %s", name, err.Error()) + return err + } + return nil + }) } diff --git a/pkg/console/service/configurator_rule.go b/pkg/console/service/configurator_rule.go index 1e7b033a9..2ff8599b4 100644 --- a/pkg/console/service/configurator_rule.go +++ b/pkg/console/service/configurator_rule.go @@ -18,6 +18,9 @@ package service import ( + "fmt" + "time" + consolectx "github.com/apache/dubbo-admin/pkg/console/context" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/manager" @@ -38,6 +41,20 @@ func GetConfigurator(ctx consolectx.Context, name string, mesh string) (*meshres } func UpdateConfigurator(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error { + lock := ctx.LockManager() + if lock == nil { + return updateConfiguratorUnsafe(ctx, name, res) + } + + lockKey := fmt.Sprintf("dynamic_config:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return updateConfiguratorUnsafe(ctx, name, res) + }) +} + +func updateConfiguratorUnsafe(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error { if err := ctx.ResourceManager().Update(res); err != nil { logger.Warnf("update %s configurator failed with error: %s", name, err.Error()) return err @@ -46,6 +63,20 @@ func UpdateConfigurator(ctx consolectx.Context, name string, res *meshresource.D } func CreateConfigurator(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error { + lock := ctx.LockManager() + if lock == nil { + return createConfiguratorUnsafe(ctx, name, res) + } + + lockKey := fmt.Sprintf("dynamic_config:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return createConfiguratorUnsafe(ctx, name, res) + }) +} + +func createConfiguratorUnsafe(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error { if err := ctx.ResourceManager().Add(res); err != nil { logger.Warnf("create %s configurator failed with error: %s", name, err.Error()) return err @@ -54,9 +85,19 @@ func CreateConfigurator(ctx consolectx.Context, name string, res *meshresource.D } func DeleteConfigurator(ctx consolectx.Context, name string, mesh string) error { - if err := ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)); err != nil { - logger.Warnf("delete %s configurator failed with error: %s", name, err.Error()) - return err + lock := ctx.LockManager() + if lock == nil { + return ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)) } - return nil + + lockKey := fmt.Sprintf("dynamic_config:%s:%s", mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + if err := ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)); err != nil { + logger.Warnf("delete %s configurator failed with error: %s", name, err.Error()) + return err + } + return nil + }) } diff --git a/pkg/console/service/tag_rule.go b/pkg/console/service/tag_rule.go index 7c4048feb..4bc3acf0d 100644 --- a/pkg/console/service/tag_rule.go +++ b/pkg/console/service/tag_rule.go @@ -18,6 +18,9 @@ package service import ( + "fmt" + "time" + consolectx "github.com/apache/dubbo-admin/pkg/console/context" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/manager" @@ -38,6 +41,20 @@ func GetTagRule(ctx consolectx.Context, name string, mesh string) (*meshresource } func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + return updateTagRuleUnsafe(ctx, res) + } + + lockKey := fmt.Sprintf("tag_route:%s:%s", res.Mesh, res.Name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return updateTagRuleUnsafe(ctx, res) + }) +} + +func updateTagRuleUnsafe(ctx consolectx.Context, res *meshresource.TagRouteResource) error { err := ctx.ResourceManager().Update(res) if err != nil { logger.Warnf("update tag rule %s error: %v", res.Name, err) @@ -47,6 +64,20 @@ func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) e } func CreateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + return createTagRuleUnsafe(ctx, res) + } + + lockKey := fmt.Sprintf("tag_route:%s:%s", res.Mesh, res.Name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return createTagRuleUnsafe(ctx, res) + }) +} + +func createTagRuleUnsafe(ctx consolectx.Context, res *meshresource.TagRouteResource) error { err := ctx.ResourceManager().Add(res) if err != nil { logger.Warnf("create tag rule %s error: %v", res.Name, err) @@ -56,10 +87,20 @@ func CreateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) e } func DeleteTagRule(ctx consolectx.Context, name string, mesh string) error { - err := ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name)) - if err != nil { - logger.Warnf("delete tag rule %s error: %v", name, err) - return err + lock := ctx.LockManager() + if lock == nil { + return ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name)) } - return nil + + lockKey := fmt.Sprintf("tag_route:%s:%s", mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + err := ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name)) + if err != nil { + logger.Warnf("delete tag rule %s error: %v", name, err) + return err + } + return nil + }) } diff --git a/pkg/core/bootstrap/bootstrap.go b/pkg/core/bootstrap/bootstrap.go index f3ef6f0d4..0b3b07ede 100644 --- a/pkg/core/bootstrap/bootstrap.go +++ b/pkg/core/bootstrap/bootstrap.go @@ -20,6 +20,7 @@ package bootstrap import ( "context" "fmt" + "github.com/apache/dubbo-admin/pkg/core/lock" "github.com/apache/dubbo-admin/pkg/common/bizerror" "github.com/apache/dubbo-admin/pkg/config/app" @@ -177,3 +178,10 @@ func initAndActivateComponent(builder *runtime.Builder, comp runtime.Component) } return nil } +func initDistributedLock(builder *runtime.Builder) error { + comp, err := runtime.ComponentRegistry().Get(lock.DistributedLockComponent) + if err != nil { + return err + } + return initAndActivateComponent(builder, comp) +} diff --git a/pkg/core/lock/component.go b/pkg/core/lock/component.go new file mode 100644 index 000000000..43416975f --- /dev/null +++ b/pkg/core/lock/component.go @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +const ( + // DistributedLockComponent is the component type for distributed lock + DistributedLockComponent runtime.ComponentType = "distributed lock" +) + +// Component implements the runtime.Component interface for distributed lock +type Component struct { + lock Lock +} + +// NewComponent creates a new distributed lock component +func NewComponent() *Component { + return &Component{} +} + +// Type returns the component type +func (c *Component) Type() runtime.ComponentType { + return DistributedLockComponent +} + +// Order indicates the initialization order +// Lock should be initialized after Store (Order 100) but before other services +func (c *Component) Order() int { + return 90 // After Store, before Console +} + +// Init initializes the distributed lock component +func (c *Component) Init(ctx runtime.BuilderContext) error { + // Get the store component to access connection pool + storeComp, err := ctx.GetActivatedComponent(runtime.ResourceStore) + if err != nil { + return err + } + + // Try to extract connection pool from store component + // We need to use type assertion with the proper interface + type ConnectionPoolProvider interface { + GetConnectionPool() *dbcommon.ConnectionPool + } + + storeWithPool, ok := storeComp.(ConnectionPoolProvider) + if !ok { + // For memory store or other stores without connection pool + logger.Warnf("Store component does not provide connection pool, distributed lock will not be available") + return nil + } + + pool := storeWithPool.GetConnectionPool() + if pool == nil { + logger.Warnf("Connection pool is nil, distributed lock will not be available") + return nil + } + + // Create GORM-based lock implementation using NewGormLock + c.lock = NewGormLock(pool) + + // Initialize the lock table + db := pool.GetDB() + if err := db.AutoMigrate(&LockRecord{}); err != nil { + return errors.Wrap(err, "failed to migrate lock table") + } + + logger.Info("Distributed lock component initialized successfully") + return nil +} + +// Start starts the distributed lock component +func (c *Component) Start(rt runtime.Runtime, stop <-chan struct{}) error { + if c.lock == nil { + logger.Warn("Distributed lock not available, skipping") + return nil + } + + // Start background cleanup task + ticker := time.NewTicker(5 * time.Minute) // Cleanup every 5 minutes + defer ticker.Stop() + + logger.Info("Distributed lock cleanup task started") + + for { + select { + case <-stop: + logger.Info("Distributed lock cleanup task stopped") + return nil + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := c.lock.CleanupExpiredLocks(ctx); err != nil { + logger.Errorf("Failed to cleanup expired locks: %v", err) + } + cancel() + } + } +} + +// GetLock returns the lock instance +func (c *Component) GetLock() Lock { + return c.lock +} + +// GetLockFromRuntime extracts the lock instance from runtime +func GetLockFromRuntime(rt runtime.Runtime) (Lock, error) { + comp, err := rt.GetComponent(DistributedLockComponent) + if err != nil { + return nil, err + } + + lockComp, ok := comp.(*Component) + if !ok { + // 修正:使用标准错误处理 + return nil, errors.Errorf("component %s is not a valid lock component", DistributedLockComponent) + } + + if lockComp.lock == nil { + return nil, errors.New("distributed lock is not available (possibly using memory store)") + } + + return lockComp.GetLock(), nil +} diff --git a/pkg/core/lock/errors.go b/pkg/core/lock/errors.go new file mode 100644 index 000000000..dc7914276 --- /dev/null +++ b/pkg/core/lock/errors.go @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import "errors" + +var ( + // ErrLockNotHeld is returned when trying to release a lock that is not held + ErrLockNotHeld = errors.New("lock not held by this owner") + + // ErrLockExpired is returned when the lock has expired + ErrLockExpired = errors.New("lock has expired") +) diff --git a/pkg/core/lock/gorm_lock.go b/pkg/core/lock/gorm_lock.go new file mode 100644 index 000000000..14fe27581 --- /dev/null +++ b/pkg/core/lock/gorm_lock.go @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +// Ensure GormLock implements Lock interface +var _ Lock = (*GormLock)(nil) + +// GormLock provides distributed locking using database as backend +// It uses GORM for database operations and supports MySQL, PostgreSQL, etc. +type GormLock struct { + pool *dbcommon.ConnectionPool + owner string // Unique identifier for this lock instance +} + +// NewGormLock creates a new GORM-based distributed lock instance +func NewGormLock(pool *dbcommon.ConnectionPool) Lock { + return &GormLock{ + pool: pool, + owner: uuid.New().String(), + } +} + +// Lock acquires a lock with the specified key and TTL +// It blocks until the lock is acquired or context is cancelled +func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + acquired, err := g.TryLock(ctx, key, ttl) + if err != nil { + return fmt.Errorf("failed to try lock: %w", err) + } + if acquired { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +// TryLock attempts to acquire a lock without blocking +// Returns true if lock was acquired, false otherwise +func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + expireAt := time.Now().Add(ttl) + + var acquired bool + err := db.Transaction(func(tx *gorm.DB) error { + // Clean up only this key's expired lock to improve performance + now := time.Now() + if err := tx.Where("lock_key = ? AND expire_at < ?", key, now). + Delete(&LockRecord{}).Error; err != nil { + return fmt.Errorf("failed to clean expired lock for key %s: %w", key, err) + } + + // Try to acquire lock using INSERT ... ON CONFLICT + lock := &LockRecord{ + LockKey: key, + Owner: g.owner, + ExpireAt: expireAt, + } + + // Try to insert the lock record + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "lock_key"}}, + DoNothing: true, // If conflict, do nothing + }).Create(lock) + + if result.Error != nil { + return fmt.Errorf("failed to insert lock record: %w", result.Error) + } + + // Check if we got the lock by verifying the owner + var existingLock LockRecord + if err := tx.Where("lock_key = ? ", key).First(&existingLock).Error; err != nil { + return fmt.Errorf("failed to verify lock ownership: %w", err) + } + + // Determine if we acquired the lock + acquired = existingLock.Owner == g.owner + return nil + }) + + if err != nil { + return false, err + } + + if acquired { + logger.Debugf("Lock acquired: key=%s, owner=%s, ttl=%v", key, g.owner, ttl) + } + + return acquired, nil +} + +// Unlock releases a lock held by this instance +func (g *GormLock) Unlock(ctx context.Context, key string) error { + db := g.pool.GetDB().WithContext(ctx) + + result := db.Where("lock_key = ? AND owner = ?", key, g.owner). + Delete(&LockRecord{}) + + if result.Error != nil { + return fmt.Errorf("failed to release lock: %w", result.Error) + } + + if result.RowsAffected == 0 { + return ErrLockNotHeld + } + + logger.Debugf("Lock released: key=%s, owner=%s", key, g.owner) + return nil +} + +// Renew extends the TTL of a lock held by this instance +func (g *GormLock) Renew(ctx context.Context, key string, ttl time.Duration) error { + db := g.pool.GetDB().WithContext(ctx) + newExpireAt := time.Now().Add(ttl) + + result := db.Model(&LockRecord{}). + Where("lock_key = ? AND owner = ?", key, g.owner). + Update("expire_at", newExpireAt) + + if result.Error != nil { + return fmt.Errorf("failed to renew lock: %w", result.Error) + } + + if result.RowsAffected == 0 { + return ErrLockNotHeld + } + + logger.Debugf("Lock renewed: key=%s, owner=%s, new_expire_at=%v", key, g.owner, newExpireAt) + return nil +} + +// IsLocked checks if a lock is currently held (by anyone) +func (g *GormLock) IsLocked(ctx context.Context, key string) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + + var count int64 + err := db.Model(&LockRecord{}). + Where("lock_key = ? AND expire_at > ?", key, time.Now()). + Count(&count).Error + + if err != nil { + return false, fmt.Errorf("failed to check lock status: %w", err) + } + + return count > 0, nil +} + +// WithLock executes a function while holding a lock +// It automatically acquires the lock, executes the function, and releases the lock +// If TTL is longer than 10 seconds, it will automatically renew the lock until the function completes +func (g *GormLock) WithLock(ctx context.Context, key string, ttl time.Duration, fn func() error) error { + // Acquire lock + if err := g.Lock(ctx, key, ttl); err != nil { + return fmt.Errorf("failed to acquire lock: %w", err) + } + + // Ensure lock is released + defer func() { + // Use background context for unlock to ensure it completes even if ctx is cancelled + unlockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := g.Unlock(unlockCtx, key); err != nil { + logger.Errorf("Failed to release lock %s: %v", key, err) + } + }() + + // Start auto-renewal if TTL is long enough + var renewDone chan struct{} + if ttl > 10*time.Second { + renewDone = make(chan struct{}) + go g.autoRenew(ctx, key, ttl, renewDone) + defer close(renewDone) + } + + // Execute the function + return fn() +} + +// autoRenew periodically renews the lock until done channel is closed +func (g *GormLock) autoRenew(ctx context.Context, key string, ttl time.Duration, done <-chan struct{}) { + // Renew at 1/3 of TTL to ensure lock doesn't expire + renewInterval := ttl / 3 + ticker := time.NewTicker(renewInterval) + defer ticker.Stop() + + logger.Debugf("Auto-renewal started for lock %s (interval: %v)", key, renewInterval) + + for { + select { + case <-done: + logger.Debugf("Auto-renewal stopped for lock %s (done signal)", key) + return + case <-ctx.Done(): + logger.Debugf("Auto-renewal stopped for lock %s (context cancelled)", key) + return + case <-ticker.C: + renewCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := g.Renew(renewCtx, key, ttl); err != nil { + logger.Warnf("Failed to renew lock %s: %v", key, err) + cancel() + return + } + cancel() + logger.Debugf("Lock %s renewed successfully", key) + } + } +} + +// CleanupExpiredLocks removes all expired locks from the database +// This should be called periodically as a maintenance task +func (g *GormLock) CleanupExpiredLocks(ctx context.Context) error { + db := g.pool.GetDB().WithContext(ctx) + + result := db.Where("expire_at < ?", time.Now()).Delete(&LockRecord{}) + if result.Error != nil { + return fmt.Errorf("failed to cleanup expired locks: %w", result.Error) + } + + if result.RowsAffected > 0 { + logger.Infof("Cleaned up %d expired locks", result.RowsAffected) + } + + return nil +} diff --git a/pkg/core/lock/init.go b/pkg/core/lock/init.go new file mode 100644 index 000000000..5b05fa2fd --- /dev/null +++ b/pkg/core/lock/init.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "github.com/apache/dubbo-admin/pkg/core/runtime" +) + +func init() { + runtime.RegisterComponent(NewComponent()) +} diff --git a/pkg/core/lock/lock.go b/pkg/core/lock/lock.go new file mode 100644 index 000000000..4b3bd781c --- /dev/null +++ b/pkg/core/lock/lock.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "time" +) + +// Lock defines the distributed lock interface +// This abstraction allows for multiple implementations (GORM, Redis, etcd, etc.) +type Lock interface { + // Lock acquires a distributed lock, blocking until successful or context cancelled + Lock(ctx context.Context, key string, ttl time.Duration) error + + // TryLock attempts to acquire a lock without blocking + // Returns true if lock was acquired, false otherwise + TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) + + // Unlock releases a lock held by this instance + Unlock(ctx context.Context, key string) error + + // Renew extends the TTL of a lock held by this instance + Renew(ctx context.Context, key string, ttl time.Duration) error + + // IsLocked checks if a lock is currently held by anyone + IsLocked(ctx context.Context, key string) (bool, error) + + // WithLock executes a function while holding a lock + // Automatically acquires the lock, executes the function, and releases the lock + WithLock(ctx context.Context, key string, ttl time.Duration, fn func() error) error + + // CleanupExpiredLocks removes expired locks (maintenance task) + CleanupExpiredLocks(ctx context.Context) error +} diff --git a/pkg/core/lock/lock_model.go b/pkg/core/lock/lock_model.go new file mode 100644 index 000000000..797f5df5e --- /dev/null +++ b/pkg/core/lock/lock_model.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "time" +) + +// LockRecord represents a distributed lock record in the database +type LockRecord struct { + ID uint `gorm:"primarykey"` + LockKey string `gorm:"uniqueIndex;size:255;not null"` // Unique lock identifier + Owner string `gorm:"size:255;not null"` // UUID of the lock holder + ExpireAt time.Time `gorm:"index;not null"` // Lock expiration time + CreatedAt time.Time `gorm:"autoCreateTime"` // Lock creation time + UpdatedAt time.Time `gorm:"autoUpdateTime"` // Last renewal time +} + +// TableName returns the table name for LockRecord +func (LockRecord) TableName() string { + return "distributed_locks" +} From 64929e87bcfd485427986c21da461dc3c1d7fa59 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Wed, 10 Dec 2025 11:55:19 +0800 Subject: [PATCH 02/11] fix --- pkg/console/service/condition_rule.go | 2 - pkg/console/service/configurator_rule.go | 2 +- pkg/console/service/tag_rule.go | 31 +-- pkg/core/lock/component.go | 46 ++-- pkg/core/lock/const.go | 40 +++ pkg/core/lock/gorm_lock.go | 75 +++--- pkg/core/lock/gorm_lock_test.go | 329 +++++++++++++++++++++++ pkg/core/lock/key.go | 44 +++ pkg/core/store/component.go | 17 ++ 9 files changed, 509 insertions(+), 77 deletions(-) create mode 100644 pkg/core/lock/const.go create mode 100644 pkg/core/lock/gorm_lock_test.go create mode 100644 pkg/core/lock/key.go diff --git a/pkg/console/service/condition_rule.go b/pkg/console/service/condition_rule.go index ff295b1c2..43532e409 100644 --- a/pkg/console/service/condition_rule.go +++ b/pkg/console/service/condition_rule.go @@ -96,7 +96,6 @@ func updateConditionRuleUnsafe(ctx context.Context, name string, res *meshresour logger.Warnf("update %s condition failed with error: %s", name, err.Error()) return err } - logger.Infof("Condition route %s updated successfully", name) return nil } @@ -114,7 +113,6 @@ func CreateConditionRule(ctx context.Context, name string, res *meshresource.Con }) } -// createConditionRuleUnsafe performs the actual creation without lock protection func createConditionRuleUnsafe(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { if err := ctx.ResourceManager().Add(res); err != nil { logger.Warnf("create %s condition failed with error: %s", name, err.Error()) diff --git a/pkg/console/service/configurator_rule.go b/pkg/console/service/configurator_rule.go index 2ff8599b4..2548a1629 100644 --- a/pkg/console/service/configurator_rule.go +++ b/pkg/console/service/configurator_rule.go @@ -95,7 +95,7 @@ func DeleteConfigurator(ctx consolectx.Context, name string, mesh string) error return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { if err := ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)); err != nil { - logger.Warnf("delete %s configurator failed with error: %s", name, err.Error()) + logger.Warnf("delete %s configurator failed with error: %s", name, err.Error()) return err } return nil diff --git a/pkg/console/service/tag_rule.go b/pkg/console/service/tag_rule.go index 4bc3acf0d..0a3439df6 100644 --- a/pkg/console/service/tag_rule.go +++ b/pkg/console/service/tag_rule.go @@ -18,10 +18,8 @@ package service import ( - "fmt" - "time" - consolectx "github.com/apache/dubbo-admin/pkg/console/context" + "github.com/apache/dubbo-admin/pkg/core/lock" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/manager" meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" @@ -41,15 +39,14 @@ func GetTagRule(ctx consolectx.Context, name string, mesh string) (*meshresource } func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) error { - lock := ctx.LockManager() - if lock == nil { + lockMgr := ctx.LockManager() + if lockMgr == nil { return updateTagRuleUnsafe(ctx, res) } - lockKey := fmt.Sprintf("tag_route:%s:%s", res.Mesh, res.Name) - lockTimeout := 30 * time.Second + lockKey := lock.BuildTagRouteLockKey(res.Mesh, res.Name) - return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return lockMgr.WithLock(ctx.AppContext(), lockKey, lock.DefaultLockTimeout, func() error { return updateTagRuleUnsafe(ctx, res) }) } @@ -64,15 +61,14 @@ func updateTagRuleUnsafe(ctx consolectx.Context, res *meshresource.TagRouteResou } func CreateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) error { - lock := ctx.LockManager() - if lock == nil { + lockMgr := ctx.LockManager() + if lockMgr == nil { return createTagRuleUnsafe(ctx, res) } - lockKey := fmt.Sprintf("tag_route:%s:%s", res.Mesh, res.Name) - lockTimeout := 30 * time.Second + lockKey := lock.BuildTagRouteLockKey(res.Mesh, res.Name) - return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return lockMgr.WithLock(ctx.AppContext(), lockKey, lock.DefaultLockTimeout, func() error { return createTagRuleUnsafe(ctx, res) }) } @@ -87,15 +83,14 @@ func createTagRuleUnsafe(ctx consolectx.Context, res *meshresource.TagRouteResou } func DeleteTagRule(ctx consolectx.Context, name string, mesh string) error { - lock := ctx.LockManager() - if lock == nil { + lockMgr := ctx.LockManager() + if lockMgr == nil { return ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name)) } - lockKey := fmt.Sprintf("tag_route:%s:%s", mesh, name) - lockTimeout := 30 * time.Second + lockKey := lock.BuildTagRouteLockKey(mesh, name) - return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return lockMgr.WithLock(ctx.AppContext(), lockKey, lock.DefaultLockTimeout, func() error { err := ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name)) if err != nil { logger.Warnf("delete tag rule %s error: %v", name, err) diff --git a/pkg/core/lock/component.go b/pkg/core/lock/component.go index 43416975f..0523ec32b 100644 --- a/pkg/core/lock/component.go +++ b/pkg/core/lock/component.go @@ -19,13 +19,14 @@ package lock import ( "context" + "gorm.io/gorm" + "math" "time" "github.com/pkg/errors" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/runtime" - "github.com/apache/dubbo-admin/pkg/store/dbcommon" ) const ( @@ -49,48 +50,47 @@ func (c *Component) Type() runtime.ComponentType { } // Order indicates the initialization order -// Lock should be initialized after Store (Order 100) but before other services +// Lock should be initialized after Store (Order math.MaxInt - 1) +// Higher order values are initialized first, so we use math.MaxInt - 2 func (c *Component) Order() int { - return 90 // After Store, before Console + return math.MaxInt - 2 // After Store, before other services } // Init initializes the distributed lock component func (c *Component) Init(ctx runtime.BuilderContext) error { - // Get the store component to access connection pool + // Get the store component to access database connection storeComp, err := ctx.GetActivatedComponent(runtime.ResourceStore) if err != nil { return err } - // Try to extract connection pool from store component - // We need to use type assertion with the proper interface - type ConnectionPoolProvider interface { - GetConnectionPool() *dbcommon.ConnectionPool + // Try to extract database connection from store component + // We use GetDB() interface to avoid circular dependency with dbcommon package + type DBProvider interface { + GetDB() *gorm.DB } - storeWithPool, ok := storeComp.(ConnectionPoolProvider) + storeWithDB, ok := storeComp.(DBProvider) if !ok { - // For memory store or other stores without connection pool - logger.Warnf("Store component does not provide connection pool, distributed lock will not be available") + // For memory store or other stores without database + logger.Warnf("Store component does not provide database connection, distributed lock will not be available") return nil } - pool := storeWithPool.GetConnectionPool() - if pool == nil { - logger.Warnf("Connection pool is nil, distributed lock will not be available") + db := storeWithDB.GetDB() + if db == nil { + logger.Warnf("Database connection is nil, distributed lock will not be available") return nil } - // Create GORM-based lock implementation using NewGormLock - c.lock = NewGormLock(pool) + // Create GORM-based lock implementation using NewGormLockFromDB + c.lock = NewGormLockFromDB(db) // Initialize the lock table - db := pool.GetDB() if err := db.AutoMigrate(&LockRecord{}); err != nil { return errors.Wrap(err, "failed to migrate lock table") } - logger.Info("Distributed lock component initialized successfully") return nil } @@ -102,20 +102,17 @@ func (c *Component) Start(rt runtime.Runtime, stop <-chan struct{}) error { } // Start background cleanup task - ticker := time.NewTicker(5 * time.Minute) // Cleanup every 5 minutes + ticker := time.NewTicker(DefaultCleanupInterval) // Cleanup every 5 minutes defer ticker.Stop() - logger.Info("Distributed lock cleanup task started") - for { select { case <-stop: - logger.Info("Distributed lock cleanup task stopped") return nil case <-ticker.C: - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), DefaultCleanupTimeout) if err := c.lock.CleanupExpiredLocks(ctx); err != nil { - logger.Errorf("Failed to cleanup expired locks: %v", err) + logger.Errorf("Failed to cleanup expired locks: %v", err) } cancel() } @@ -136,7 +133,6 @@ func GetLockFromRuntime(rt runtime.Runtime) (Lock, error) { lockComp, ok := comp.(*Component) if !ok { - // 修正:使用标准错误处理 return nil, errors.Errorf("component %s is not a valid lock component", DistributedLockComponent) } diff --git a/pkg/core/lock/const.go b/pkg/core/lock/const.go new file mode 100644 index 000000000..ecf5d7308 --- /dev/null +++ b/pkg/core/lock/const.go @@ -0,0 +1,40 @@ +package lock + +import "time" + +const ( + // DefaultLockTimeout is the default timeout for distributed lock operations + // This timeout applies to lock acquisition, renewal, and release operations + DefaultLockTimeout = 30 * time.Second + + // DefaultAutoRenewThreshold is the TTL threshold above which auto-renewal is enabled + // Locks with TTL longer than this value will be automatically renewed + DefaultAutoRenewThreshold = 10 * time.Second + + // DefaultUnlockTimeout is the timeout for unlock operations in deferred cleanup + DefaultUnlockTimeout = 5 * time.Second + + // DefaultRenewTimeout is the timeout for lock renewal operations + DefaultRenewTimeout = 5 * time.Second + + // DefaultLockRetryInterval is the interval between lock acquisition retry attempts + DefaultLockRetryInterval = 100 * time.Millisecond + + // DefaultCleanupInterval is the interval for periodic expired lock cleanup + DefaultCleanupInterval = 5 * time.Minute + + // DefaultCleanupTimeout is the timeout for cleanup operations + DefaultCleanupTimeout = 30 * time.Second +) + +// Lock key prefixes for different resource types +const ( + // TagRouteKeyPrefix is the prefix for tag route lock keys + TagRouteKeyPrefix = "tag_route" + + // ConfiguratorRuleKeyPrefix is the prefix for configurator rule lock keys + ConfiguratorRuleKeyPrefix = "configurator_rule" + + // ConditionRuleKeyPrefix is the prefix for condition rule lock keys + ConditionRuleKeyPrefix = "condition_rule" +) diff --git a/pkg/core/lock/gorm_lock.go b/pkg/core/lock/gorm_lock.go index 14fe27581..c4e316884 100644 --- a/pkg/core/lock/gorm_lock.go +++ b/pkg/core/lock/gorm_lock.go @@ -37,21 +37,44 @@ var _ Lock = (*GormLock)(nil) // It uses GORM for database operations and supports MySQL, PostgreSQL, etc. type GormLock struct { pool *dbcommon.ConnectionPool - owner string // Unique identifier for this lock instance + db *gorm.DB // Direct DB reference to avoid circular dependency + owner string // Unique identifier for this lock instance } // NewGormLock creates a new GORM-based distributed lock instance +// Deprecated: Use NewGormLockFromDB to avoid circular dependencies func NewGormLock(pool *dbcommon.ConnectionPool) Lock { return &GormLock{ pool: pool, + db: pool.GetDB(), owner: uuid.New().String(), } } +// NewGormLockFromDB creates a new GORM-based distributed lock instance from a DB connection +// This is the preferred constructor to avoid circular dependencies +func NewGormLockFromDB(db *gorm.DB) Lock { + return &GormLock{ + db: db, + owner: uuid.New().String(), + } +} + +// getDB returns the database instance, preferring direct DB over pool +func (g *GormLock) getDB() *gorm.DB { + if g.db != nil { + return g.db + } + if g.pool != nil { + return g.pool.GetDB() + } + return nil +} + // Lock acquires a lock with the specified key and TTL // It blocks until the lock is acquired or context is cancelled func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { - ticker := time.NewTicker(100 * time.Millisecond) + ticker := time.NewTicker(DefaultLockRetryInterval) defer ticker.Stop() for { @@ -74,7 +97,7 @@ func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) erro // TryLock attempts to acquire a lock without blocking // Returns true if lock was acquired, false otherwise func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) { - db := g.pool.GetDB().WithContext(ctx) + db := g.getDB().WithContext(ctx) expireAt := time.Now().Add(ttl) var acquired bool @@ -86,7 +109,7 @@ func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) ( return fmt.Errorf("failed to clean expired lock for key %s: %w", key, err) } - // Try to acquire lock using INSERT ... ON CONFLICT + // Try to acquire lock using INSERT ... ON CONFLICT lock := &LockRecord{ LockKey: key, Owner: g.owner, @@ -100,13 +123,13 @@ func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) ( }).Create(lock) if result.Error != nil { - return fmt.Errorf("failed to insert lock record: %w", result.Error) + return fmt.Errorf("failed to insert lock record: %w", result.Error) } // Check if we got the lock by verifying the owner var existingLock LockRecord - if err := tx.Where("lock_key = ? ", key).First(&existingLock).Error; err != nil { - return fmt.Errorf("failed to verify lock ownership: %w", err) + if err := tx.Where("lock_key = ?", key).First(&existingLock).Error; err != nil { + return fmt.Errorf("failed to verify lock ownership: %w", err) } // Determine if we acquired the lock @@ -118,16 +141,12 @@ func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) ( return false, err } - if acquired { - logger.Debugf("Lock acquired: key=%s, owner=%s, ttl=%v", key, g.owner, ttl) - } - return acquired, nil } // Unlock releases a lock held by this instance func (g *GormLock) Unlock(ctx context.Context, key string) error { - db := g.pool.GetDB().WithContext(ctx) + db := g.getDB().WithContext(ctx) result := db.Where("lock_key = ? AND owner = ?", key, g.owner). Delete(&LockRecord{}) @@ -140,13 +159,12 @@ func (g *GormLock) Unlock(ctx context.Context, key string) error { return ErrLockNotHeld } - logger.Debugf("Lock released: key=%s, owner=%s", key, g.owner) return nil } // Renew extends the TTL of a lock held by this instance func (g *GormLock) Renew(ctx context.Context, key string, ttl time.Duration) error { - db := g.pool.GetDB().WithContext(ctx) + db := g.getDB().WithContext(ctx) newExpireAt := time.Now().Add(ttl) result := db.Model(&LockRecord{}). @@ -161,13 +179,12 @@ func (g *GormLock) Renew(ctx context.Context, key string, ttl time.Duration) err return ErrLockNotHeld } - logger.Debugf("Lock renewed: key=%s, owner=%s, new_expire_at=%v", key, g.owner, newExpireAt) return nil } // IsLocked checks if a lock is currently held (by anyone) func (g *GormLock) IsLocked(ctx context.Context, key string) (bool, error) { - db := g.pool.GetDB().WithContext(ctx) + db := g.getDB().WithContext(ctx) var count int64 err := db.Model(&LockRecord{}). @@ -182,8 +199,6 @@ func (g *GormLock) IsLocked(ctx context.Context, key string) (bool, error) { } // WithLock executes a function while holding a lock -// It automatically acquires the lock, executes the function, and releases the lock -// If TTL is longer than 10 seconds, it will automatically renew the lock until the function completes func (g *GormLock) WithLock(ctx context.Context, key string, ttl time.Duration, fn func() error) error { // Acquire lock if err := g.Lock(ctx, key, ttl); err != nil { @@ -193,7 +208,7 @@ func (g *GormLock) WithLock(ctx context.Context, key string, ttl time.Duration, // Ensure lock is released defer func() { // Use background context for unlock to ensure it completes even if ctx is cancelled - unlockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + unlockCtx, cancel := context.WithTimeout(context.Background(), DefaultUnlockTimeout) defer cancel() if err := g.Unlock(unlockCtx, key); err != nil { @@ -203,7 +218,7 @@ func (g *GormLock) WithLock(ctx context.Context, key string, ttl time.Duration, // Start auto-renewal if TTL is long enough var renewDone chan struct{} - if ttl > 10*time.Second { + if ttl > DefaultAutoRenewThreshold { renewDone = make(chan struct{}) go g.autoRenew(ctx, key, ttl, renewDone) defer close(renewDone) @@ -220,25 +235,27 @@ func (g *GormLock) autoRenew(ctx context.Context, key string, ttl time.Duration, ticker := time.NewTicker(renewInterval) defer ticker.Stop() - logger.Debugf("Auto-renewal started for lock %s (interval: %v)", key, renewInterval) - for { select { case <-done: - logger.Debugf("Auto-renewal stopped for lock %s (done signal)", key) return case <-ctx.Done(): - logger.Debugf("Auto-renewal stopped for lock %s (context cancelled)", key) return case <-ticker.C: - renewCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + // Double-check done channel before renewing to avoid unnecessary renewal + select { + case <-done: + return + default: + } + + renewCtx, cancel := context.WithTimeout(context.Background(), DefaultRenewTimeout) if err := g.Renew(renewCtx, key, ttl); err != nil { logger.Warnf("Failed to renew lock %s: %v", key, err) cancel() return } cancel() - logger.Debugf("Lock %s renewed successfully", key) } } } @@ -246,16 +263,12 @@ func (g *GormLock) autoRenew(ctx context.Context, key string, ttl time.Duration, // CleanupExpiredLocks removes all expired locks from the database // This should be called periodically as a maintenance task func (g *GormLock) CleanupExpiredLocks(ctx context.Context) error { - db := g.pool.GetDB().WithContext(ctx) + db := g.getDB().WithContext(ctx) result := db.Where("expire_at < ?", time.Now()).Delete(&LockRecord{}) if result.Error != nil { return fmt.Errorf("failed to cleanup expired locks: %w", result.Error) } - if result.RowsAffected > 0 { - logger.Infof("Cleaned up %d expired locks", result.RowsAffected) - } - return nil } diff --git a/pkg/core/lock/gorm_lock_test.go b/pkg/core/lock/gorm_lock_test.go new file mode 100644 index 000000000..f40403d81 --- /dev/null +++ b/pkg/core/lock/gorm_lock_test.go @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock_test + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + + "github.com/apache/dubbo-admin/pkg/core/lock" +) + +func setupTestDB(t *testing.T) *gorm.DB { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err, "failed to create test database") + + err = db.AutoMigrate(&lock.LockRecord{}) + require.NoError(t, err, "failed to migrate lock table") + + return db +} + +func TestBasicLockUnlock(t *testing.T) { + db := setupTestDB(t) + lockInstance := lock.NewGormLockFromDB(db) + ctx := context.Background() + + err := lockInstance.Lock(ctx, "test-key", 5*time.Second) + assert.NoError(t, err, "should acquire lock successfully") + + isLocked, err := lockInstance.IsLocked(ctx, "test-key") + assert.NoError(t, err) + assert.True(t, isLocked, "lock should be held") + + err = lockInstance.Unlock(ctx, "test-key") + assert.NoError(t, err, "should release lock successfully") + + isLocked, err = lockInstance.IsLocked(ctx, "test-key") + assert.NoError(t, err) + assert.False(t, isLocked, "lock should be released") +} + +func TestTryLock(t *testing.T) { + db := setupTestDB(t) + lock1 := lock.NewGormLockFromDB(db) + lock2 := lock.NewGormLockFromDB(db) + ctx := context.Background() + + acquired, err := lock1.TryLock(ctx, "test-key", 5*time.Second) + assert.NoError(t, err) + assert.True(t, acquired, "first lock should be acquired") + + acquired, err = lock2.TryLock(ctx, "test-key", 5*time.Second) + assert.NoError(t, err) + assert.False(t, acquired, "second lock should not be acquired") + + err = lock1.Unlock(ctx, "test-key") + assert.NoError(t, err) + + acquired, err = lock2.TryLock(ctx, "test-key", 5*time.Second) + assert.NoError(t, err) + assert.True(t, acquired, "second lock should be acquired after first is released") + + _ = lock2.Unlock(ctx, "test-key") +} + +func TestConcurrentLockAttempts(t *testing.T) { + db := setupTestDB(t) + ctx := context.Background() + + const numGoroutines = 10 + var successCount atomic.Int32 + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + lockInstance := lock.NewGormLockFromDB(db) + acquired, err := lockInstance.TryLock(ctx, "concurrent-key", 1*time.Second) + if err == nil && acquired { + successCount.Add(1) + time.Sleep(100 * time.Millisecond) // Hold lock briefly + _ = lockInstance.Unlock(ctx, "concurrent-key") + } + }() + } + + wg.Wait() + + assert.Equal(t, int32(1), successCount.Load(), "only one goroutine should acquire the lock") +} + +func TestLockExpiration(t *testing.T) { + db := setupTestDB(t) + lock1 := lock.NewGormLockFromDB(db) + lock2 := lock.NewGormLockFromDB(db) + ctx := context.Background() + + acquired, err := lock1.TryLock(ctx, "expire-key", 100*time.Millisecond) + assert.NoError(t, err) + assert.True(t, acquired) + + acquired, err = lock2.TryLock(ctx, "expire-key", 1*time.Second) + assert.NoError(t, err) + assert.False(t, acquired, "lock should still be held") + + time.Sleep(200 * time.Millisecond) + + acquired, err = lock2.TryLock(ctx, "expire-key", 1*time.Second) + assert.NoError(t, err) + assert.True(t, acquired, "lock should be acquired after expiration") + + _ = lock2.Unlock(ctx, "expire-key") +} + +func TestLockRenewal(t *testing.T) { + db := setupTestDB(t) + lockInstance := lock.NewGormLockFromDB(db) + ctx := context.Background() + + err := lockInstance.Lock(ctx, "renew-key", 1*time.Second) + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) + + err = lockInstance.Renew(ctx, "renew-key", 2*time.Second) + assert.NoError(t, err, "should renew lock successfully") + + isLocked, err := lockInstance.IsLocked(ctx, "renew-key") + assert.NoError(t, err) + assert.True(t, isLocked, "lock should still be held after renewal") + + _ = lockInstance.Unlock(ctx, "renew-key") +} + +func TestUnlockNotHeld(t *testing.T) { + db := setupTestDB(t) + lock1 := lock.NewGormLockFromDB(db) + lock2 := lock.NewGormLockFromDB(db) + ctx := context.Background() + + err := lock1.Lock(ctx, "test-key", 5*time.Second) + require.NoError(t, err) + + err = lock2.Unlock(ctx, "test-key") + assert.ErrorIs(t, err, lock.ErrLockNotHeld, "should return ErrLockNotHeld") + + _ = lock1.Unlock(ctx, "test-key") +} + +func TestRenewNotHeld(t *testing.T) { + db := setupTestDB(t) + lock1 := lock.NewGormLockFromDB(db) + lock2 := lock.NewGormLockFromDB(db) + ctx := context.Background() + + err := lock1.Lock(ctx, "test-key", 5*time.Second) + require.NoError(t, err) + + err = lock2.Renew(ctx, "test-key", 10*time.Second) + assert.ErrorIs(t, err, lock.ErrLockNotHeld, "should return ErrLockNotHeld") + + _ = lock1.Unlock(ctx, "test-key") +} + +func TestWithLock(t *testing.T) { + db := setupTestDB(t) + lockInstance := lock.NewGormLockFromDB(db) + ctx := context.Background() + + executed := false + err := lockInstance.WithLock(ctx, "with-lock-key", 2*time.Second, func() error { + executed = true + isLocked, err := lockInstance.IsLocked(ctx, "with-lock-key") + assert.NoError(t, err) + assert.True(t, isLocked) + return nil + }) + + assert.NoError(t, err) + assert.True(t, executed, "function should be executed") + + time.Sleep(100 * time.Millisecond) + isLocked, err := lockInstance.IsLocked(ctx, "with-lock-key") + assert.NoError(t, err) + assert.False(t, isLocked, "lock should be released after WithLock") +} + +func TestWithLockAutoRenewal(t *testing.T) { + db := setupTestDB(t) + lockInstance := lock.NewGormLockFromDB(db) + ctx := context.Background() + + executed := false + err := lockInstance.WithLock(ctx, "auto-renew-key", 15*time.Second, func() error { + time.Sleep(6 * time.Second) + executed = true + return nil + }) + + assert.NoError(t, err) + assert.True(t, executed, "function should be executed") + + time.Sleep(100 * time.Millisecond) + isLocked, err := lockInstance.IsLocked(ctx, "auto-renew-key") + assert.NoError(t, err) + assert.False(t, isLocked, "lock should be released after WithLock") +} + +func TestWithLockContextCancellation(t *testing.T) { + db := setupTestDB(t) + lockInstance := lock.NewGormLockFromDB(db) + + ctx, cancel := context.WithCancel(context.Background()) + + started := make(chan struct{}) + err := lockInstance.WithLock(ctx, "cancel-key", 5*time.Second, func() error { + close(started) + cancel() + time.Sleep(100 * time.Millisecond) + return nil + }) + + <-started + + assert.NoError(t, err, "function should complete even if context is cancelled during execution") + + time.Sleep(100 * time.Millisecond) + isLocked, err := lockInstance.IsLocked(context.Background(), "cancel-key") + assert.NoError(t, err) + assert.False(t, isLocked, "lock should be released even after context cancellation") +} + +func TestCleanupExpiredLocks(t *testing.T) { + db := setupTestDB(t) + lock1 := lock.NewGormLockFromDB(db) + lock2 := lock.NewGormLockFromDB(db) + ctx := context.Background() + + _, _ = lock1.TryLock(ctx, "cleanup-key-1", 100*time.Millisecond) + _, _ = lock2.TryLock(ctx, "cleanup-key-2", 100*time.Millisecond) + + time.Sleep(200 * time.Millisecond) + + err := lock1.CleanupExpiredLocks(ctx) + assert.NoError(t, err) + + var count int64 + db.Model(&lock.LockRecord{}).Count(&count) + assert.Equal(t, int64(0), count, "all expired locks should be cleaned up") +} + +func TestMultipleDifferentLocks(t *testing.T) { + db := setupTestDB(t) + lockInstance := lock.NewGormLockFromDB(db) + ctx := context.Background() + + err1 := lockInstance.Lock(ctx, "key-1", 5*time.Second) + err2 := lockInstance.Lock(ctx, "key-2", 5*time.Second) + err3 := lockInstance.Lock(ctx, "key-3", 5*time.Second) + + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.NoError(t, err3) + + isLocked1, _ := lockInstance.IsLocked(ctx, "key-1") + isLocked2, _ := lockInstance.IsLocked(ctx, "key-2") + isLocked3, _ := lockInstance.IsLocked(ctx, "key-3") + + assert.True(t, isLocked1) + assert.True(t, isLocked2) + assert.True(t, isLocked3) + + _ = lockInstance.Unlock(ctx, "key-1") + _ = lockInstance.Unlock(ctx, "key-2") + _ = lockInstance.Unlock(ctx, "key-3") +} + +func TestLockBlockingBehavior(t *testing.T) { + db := setupTestDB(t) + lock1 := lock.NewGormLockFromDB(db) + lock2 := lock.NewGormLockFromDB(db) + ctx := context.Background() + + err := lock1.Lock(ctx, "blocking-key", 2*time.Second) + require.NoError(t, err) + + acquiredTime := time.Now() + var lock2AcquiredTime time.Time + + go func() { + _ = lock2.Lock(ctx, "blocking-key", 5*time.Second) + lock2AcquiredTime = time.Now() + }() + + time.Sleep(1 * time.Second) + _ = lock1.Unlock(ctx, "blocking-key") + + time.Sleep(500 * time.Millisecond) + + duration := lock2AcquiredTime.Sub(acquiredTime) + assert.GreaterOrEqual(t, duration, 1*time.Second, "lock2 should acquire after lock1 releases") + assert.Less(t, duration, 2*time.Second, "lock2 should acquire shortly after lock1 releases") + + _ = lock2.Unlock(ctx, "blocking-key") +} diff --git a/pkg/core/lock/key.go b/pkg/core/lock/key.go new file mode 100644 index 000000000..3f8475247 --- /dev/null +++ b/pkg/core/lock/key.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import "fmt" + +// BuildLockKey constructs a lock key from a prefix and parts +func BuildLockKey(prefix string, parts ...string) string { + key := prefix + for _, part := range parts { + key += ":" + part + } + return key +} + +// BuildTagRouteLockKey constructs a lock key for tag route operations +func BuildTagRouteLockKey(mesh, name string) string { + return fmt.Sprintf("%s:%s:%s", TagRouteKeyPrefix, mesh, name) +} + +// BuildConfiguratorRuleLockKey constructs a lock key for configurator rule operations +func BuildConfiguratorRuleLockKey(mesh, name string) string { + return fmt.Sprintf("%s:%s:%s", ConfiguratorRuleKeyPrefix, mesh, name) +} + +// BuildConditionRuleLockKey constructs a lock key for condition rule operations +func BuildConditionRuleLockKey(mesh, name string) string { + return fmt.Sprintf("%s:%s:%s", ConditionRuleKeyPrefix, mesh, name) +} diff --git a/pkg/core/store/component.go b/pkg/core/store/component.go index 6eadc7f29..0320e426c 100644 --- a/pkg/core/store/component.go +++ b/pkg/core/store/component.go @@ -19,6 +19,7 @@ package store import ( "fmt" + "gorm.io/gorm" "math" coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" @@ -109,3 +110,19 @@ func (sc *storeComponent) ResourceKindRoute(k coremodel.ResourceKind) (ResourceS return nil, fmt.Errorf("%s is not supported by store yet", k) } + +func (sc *storeComponent) GetDB() *gorm.DB { + type dbGetter interface { + GetDB() *gorm.DB + } + + for _, store := range sc.stores { + if dg, ok := store.(dbGetter); ok { + if db := dg.GetDB(); db != nil { + return db + } + } + } + + return nil +} From c83ecf6bef679e506468adefc6f19646462efddc Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Wed, 10 Dec 2025 11:58:46 +0800 Subject: [PATCH 03/11] add license header --- pkg/core/lock/const.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pkg/core/lock/const.go b/pkg/core/lock/const.go index ecf5d7308..3439eee67 100644 --- a/pkg/core/lock/const.go +++ b/pkg/core/lock/const.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package lock import "time" From bac7de451a101eced7883d6e23201529061d88ab Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Sat, 20 Dec 2025 18:39:03 +0800 Subject: [PATCH 04/11] add err code --- pkg/common/bizerror/error.go | 2 ++ pkg/core/lock/errors.go | 28 ---------------------------- pkg/core/lock/factory.go | 1 + 3 files changed, 3 insertions(+), 28 deletions(-) delete mode 100644 pkg/core/lock/errors.go create mode 100644 pkg/core/lock/factory.go diff --git a/pkg/common/bizerror/error.go b/pkg/common/bizerror/error.go index 67764b3bb..b130607dd 100644 --- a/pkg/common/bizerror/error.go +++ b/pkg/common/bizerror/error.go @@ -40,6 +40,8 @@ const ( NacosError ErrorCode = "NacosError" ZKError ErrorCode = "ZKError" EventError ErrorCode = "EventError" + LockNotHeld ErrorCode = "LockNotHeld" + LockExpired ErrorCode = "LockExpired" ) type bizError struct { diff --git a/pkg/core/lock/errors.go b/pkg/core/lock/errors.go deleted file mode 100644 index dc7914276..000000000 --- a/pkg/core/lock/errors.go +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package lock - -import "errors" - -var ( - // ErrLockNotHeld is returned when trying to release a lock that is not held - ErrLockNotHeld = errors.New("lock not held by this owner") - - // ErrLockExpired is returned when the lock has expired - ErrLockExpired = errors.New("lock has expired") -) diff --git a/pkg/core/lock/factory.go b/pkg/core/lock/factory.go new file mode 100644 index 000000000..371688338 --- /dev/null +++ b/pkg/core/lock/factory.go @@ -0,0 +1 @@ +package lock From a7647f453e573edc7e8304a484ea85b19550fc94 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Sat, 20 Dec 2025 18:39:34 +0800 Subject: [PATCH 05/11] fix --- pkg/core/lock/component.go | 29 +++++++++++++---------------- pkg/core/lock/factory.go | 31 +++++++++++++++++++++++++++++++ pkg/core/lock/gorm_lock.go | 5 +++-- pkg/core/lock/gorm_lock_test.go | 5 +++-- 4 files changed, 50 insertions(+), 20 deletions(-) diff --git a/pkg/core/lock/component.go b/pkg/core/lock/component.go index 0523ec32b..2f82e83bf 100644 --- a/pkg/core/lock/component.go +++ b/pkg/core/lock/component.go @@ -19,7 +19,6 @@ package lock import ( "context" - "gorm.io/gorm" "math" "time" @@ -64,31 +63,29 @@ func (c *Component) Init(ctx runtime.BuilderContext) error { return err } - // Try to extract database connection from store component - // We use GetDB() interface to avoid circular dependency with dbcommon package - type DBProvider interface { - GetDB() *gorm.DB + // Try to extract data store interface from store component + type DataStore interface { + GetDataStore() any } - storeWithDB, ok := storeComp.(DBProvider) + store, ok := storeComp.(DataStore) if !ok { // For memory store or other stores without database - logger.Warnf("Store component does not provide database connection, distributed lock will not be available") + logger.Warnf("Store component does not provide data store interface, distributed lock will not be available") return nil } - db := storeWithDB.GetDB() - if db == nil { - logger.Warnf("Database connection is nil, distributed lock will not be available") + dataStore := store.GetDataStore() + if dataStore == nil { + logger.Warnf("Data store is nil, distributed lock will not be available") return nil } - // Create GORM-based lock implementation using NewGormLockFromDB - c.lock = NewGormLockFromDB(db) - - // Initialize the lock table - if err := db.AutoMigrate(&LockRecord{}); err != nil { - return errors.Wrap(err, "failed to migrate lock table") + // Create lock implementation based on the data store + c.lock = NewLockFromDataStore(dataStore) + if c.lock == nil { + logger.Warnf("Cannot create distributed lock from data store, distributed lock will not be available") + return nil } return nil diff --git a/pkg/core/lock/factory.go b/pkg/core/lock/factory.go index 371688338..4d974a5a5 100644 --- a/pkg/core/lock/factory.go +++ b/pkg/core/lock/factory.go @@ -1 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package lock + +import ( + "gorm.io/gorm" +) + +// NewLockFromDataStore creates a lock implementation based on the data store type +func NewLockFromDataStore(dataStore any) Lock { + // Try GORM database first + if db, ok := dataStore.(*gorm.DB); ok { + return NewGormLockFromDB(db) + } + + return nil +} diff --git a/pkg/core/lock/gorm_lock.go b/pkg/core/lock/gorm_lock.go index c4e316884..eb4a47462 100644 --- a/pkg/core/lock/gorm_lock.go +++ b/pkg/core/lock/gorm_lock.go @@ -26,6 +26,7 @@ import ( "gorm.io/gorm" "gorm.io/gorm/clause" + "github.com/apache/dubbo-admin/pkg/common/bizerror" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/store/dbcommon" ) @@ -156,7 +157,7 @@ func (g *GormLock) Unlock(ctx context.Context, key string) error { } if result.RowsAffected == 0 { - return ErrLockNotHeld + return bizerror.NewBizError(bizerror.LockNotHeld, "lock not held by this owner") } return nil @@ -176,7 +177,7 @@ func (g *GormLock) Renew(ctx context.Context, key string, ttl time.Duration) err } if result.RowsAffected == 0 { - return ErrLockNotHeld + return bizerror.NewBizError(bizerror.LockNotHeld, "lock not held by this owner") } return nil diff --git a/pkg/core/lock/gorm_lock_test.go b/pkg/core/lock/gorm_lock_test.go index f40403d81..fc0cc93c2 100644 --- a/pkg/core/lock/gorm_lock_test.go +++ b/pkg/core/lock/gorm_lock_test.go @@ -29,6 +29,7 @@ import ( "gorm.io/driver/sqlite" "gorm.io/gorm" + "github.com/apache/dubbo-admin/pkg/common/bizerror" "github.com/apache/dubbo-admin/pkg/core/lock" ) @@ -166,7 +167,7 @@ func TestUnlockNotHeld(t *testing.T) { require.NoError(t, err) err = lock2.Unlock(ctx, "test-key") - assert.ErrorIs(t, err, lock.ErrLockNotHeld, "should return ErrLockNotHeld") + assert.ErrorIs(t, err, bizerror.NewBizError(bizerror.LockNotHeld, "lock not held by this owner"), "should return ErrLockNotHeld") _ = lock1.Unlock(ctx, "test-key") } @@ -181,7 +182,7 @@ func TestRenewNotHeld(t *testing.T) { require.NoError(t, err) err = lock2.Renew(ctx, "test-key", 10*time.Second) - assert.ErrorIs(t, err, lock.ErrLockNotHeld, "should return ErrLockNotHeld") + assert.ErrorIs(t, err, bizerror.NewBizError(bizerror.LockNotHeld, "lock not held by this owner"), "should return ErrLockNotHeld") _ = lock1.Unlock(ctx, "test-key") } From 9badfeff1c8e9dd6f0c9d45236af5baabc444fc7 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Sat, 20 Dec 2025 21:14:09 +0800 Subject: [PATCH 06/11] fix --- .../const.go => common/constants/lock.go} | 2 +- pkg/console/service/tag_rule.go | 7 ++- pkg/core/lock/component.go | 5 +- pkg/core/lock/gorm_lock.go | 13 +++-- pkg/core/lock/gorm_lock_test.go | 58 +++++++++++++++---- pkg/core/lock/key.go | 12 ++-- 6 files changed, 69 insertions(+), 28 deletions(-) rename pkg/{core/lock/const.go => common/constants/lock.go} (99%) diff --git a/pkg/core/lock/const.go b/pkg/common/constants/lock.go similarity index 99% rename from pkg/core/lock/const.go rename to pkg/common/constants/lock.go index 3439eee67..30a9f4049 100644 --- a/pkg/core/lock/const.go +++ b/pkg/common/constants/lock.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package lock +package constants import "time" diff --git a/pkg/console/service/tag_rule.go b/pkg/console/service/tag_rule.go index 0a3439df6..7178d32be 100644 --- a/pkg/console/service/tag_rule.go +++ b/pkg/console/service/tag_rule.go @@ -18,6 +18,7 @@ package service import ( + "github.com/apache/dubbo-admin/pkg/common/constants" consolectx "github.com/apache/dubbo-admin/pkg/console/context" "github.com/apache/dubbo-admin/pkg/core/lock" "github.com/apache/dubbo-admin/pkg/core/logger" @@ -46,7 +47,7 @@ func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) e lockKey := lock.BuildTagRouteLockKey(res.Mesh, res.Name) - return lockMgr.WithLock(ctx.AppContext(), lockKey, lock.DefaultLockTimeout, func() error { + return lockMgr.WithLock(ctx.AppContext(), lockKey, constants.DefaultLockTimeout, func() error { return updateTagRuleUnsafe(ctx, res) }) } @@ -68,7 +69,7 @@ func CreateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) e lockKey := lock.BuildTagRouteLockKey(res.Mesh, res.Name) - return lockMgr.WithLock(ctx.AppContext(), lockKey, lock.DefaultLockTimeout, func() error { + return lockMgr.WithLock(ctx.AppContext(), lockKey, constants.DefaultLockTimeout, func() error { return createTagRuleUnsafe(ctx, res) }) } @@ -90,7 +91,7 @@ func DeleteTagRule(ctx consolectx.Context, name string, mesh string) error { lockKey := lock.BuildTagRouteLockKey(mesh, name) - return lockMgr.WithLock(ctx.AppContext(), lockKey, lock.DefaultLockTimeout, func() error { + return lockMgr.WithLock(ctx.AppContext(), lockKey, constants.DefaultLockTimeout, func() error { err := ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name)) if err != nil { logger.Warnf("delete tag rule %s error: %v", name, err) diff --git a/pkg/core/lock/component.go b/pkg/core/lock/component.go index 2f82e83bf..4dd3e9d51 100644 --- a/pkg/core/lock/component.go +++ b/pkg/core/lock/component.go @@ -24,6 +24,7 @@ import ( "github.com/pkg/errors" + "github.com/apache/dubbo-admin/pkg/common/constants" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/runtime" ) @@ -99,7 +100,7 @@ func (c *Component) Start(rt runtime.Runtime, stop <-chan struct{}) error { } // Start background cleanup task - ticker := time.NewTicker(DefaultCleanupInterval) // Cleanup every 5 minutes + ticker := time.NewTicker(constants.DefaultCleanupInterval) // Cleanup every 5 minutes defer ticker.Stop() for { @@ -107,7 +108,7 @@ func (c *Component) Start(rt runtime.Runtime, stop <-chan struct{}) error { case <-stop: return nil case <-ticker.C: - ctx, cancel := context.WithTimeout(context.Background(), DefaultCleanupTimeout) + ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultCleanupTimeout) if err := c.lock.CleanupExpiredLocks(ctx); err != nil { logger.Errorf("Failed to cleanup expired locks: %v", err) } diff --git a/pkg/core/lock/gorm_lock.go b/pkg/core/lock/gorm_lock.go index eb4a47462..93f5f383a 100644 --- a/pkg/core/lock/gorm_lock.go +++ b/pkg/core/lock/gorm_lock.go @@ -27,6 +27,7 @@ import ( "gorm.io/gorm/clause" "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/store/dbcommon" ) @@ -75,7 +76,7 @@ func (g *GormLock) getDB() *gorm.DB { // Lock acquires a lock with the specified key and TTL // It blocks until the lock is acquired or context is cancelled func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { - ticker := time.NewTicker(DefaultLockRetryInterval) + ticker := time.NewTicker(constants.DefaultLockRetryInterval) defer ticker.Stop() for { @@ -157,7 +158,7 @@ func (g *GormLock) Unlock(ctx context.Context, key string) error { } if result.RowsAffected == 0 { - return bizerror.NewBizError(bizerror.LockNotHeld, "lock not held by this owner") + return bizerror.New(bizerror.LockNotHeld, "lock not held by this owner") } return nil @@ -177,7 +178,7 @@ func (g *GormLock) Renew(ctx context.Context, key string, ttl time.Duration) err } if result.RowsAffected == 0 { - return bizerror.NewBizError(bizerror.LockNotHeld, "lock not held by this owner") + return bizerror.New(bizerror.LockNotHeld, "lock not held by this owner") } return nil @@ -209,7 +210,7 @@ func (g *GormLock) WithLock(ctx context.Context, key string, ttl time.Duration, // Ensure lock is released defer func() { // Use background context for unlock to ensure it completes even if ctx is cancelled - unlockCtx, cancel := context.WithTimeout(context.Background(), DefaultUnlockTimeout) + unlockCtx, cancel := context.WithTimeout(context.Background(), constants.DefaultUnlockTimeout) defer cancel() if err := g.Unlock(unlockCtx, key); err != nil { @@ -219,7 +220,7 @@ func (g *GormLock) WithLock(ctx context.Context, key string, ttl time.Duration, // Start auto-renewal if TTL is long enough var renewDone chan struct{} - if ttl > DefaultAutoRenewThreshold { + if ttl > constants.DefaultAutoRenewThreshold { renewDone = make(chan struct{}) go g.autoRenew(ctx, key, ttl, renewDone) defer close(renewDone) @@ -250,7 +251,7 @@ func (g *GormLock) autoRenew(ctx context.Context, key string, ttl time.Duration, default: } - renewCtx, cancel := context.WithTimeout(context.Background(), DefaultRenewTimeout) + renewCtx, cancel := context.WithTimeout(context.Background(), constants.DefaultRenewTimeout) if err := g.Renew(renewCtx, key, ttl); err != nil { logger.Warnf("Failed to renew lock %s: %v", key, err) cancel() diff --git a/pkg/core/lock/gorm_lock_test.go b/pkg/core/lock/gorm_lock_test.go index fc0cc93c2..9dce07ea0 100644 --- a/pkg/core/lock/gorm_lock_test.go +++ b/pkg/core/lock/gorm_lock_test.go @@ -34,9 +34,22 @@ import ( ) func setupTestDB(t *testing.T) *gorm.DB { - db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{ + PrepareStmt: false, + }) require.NoError(t, err, "failed to create test database") + sqlDB, err := db.DB() + require.NoError(t, err) + + sqlDB.SetMaxOpenConns(1) + + err = db.Exec("PRAGMA journal_mode=WAL;").Error + require.NoError(t, err, "failed to set WAL mode") + + err = db.Exec("PRAGMA busy_timeout=5000;").Error + require.NoError(t, err, "failed to set busy timeout") + err = db.AutoMigrate(&lock.LockRecord{}) require.NoError(t, err, "failed to migrate lock table") @@ -167,7 +180,13 @@ func TestUnlockNotHeld(t *testing.T) { require.NoError(t, err) err = lock2.Unlock(ctx, "test-key") - assert.ErrorIs(t, err, bizerror.NewBizError(bizerror.LockNotHeld, "lock not held by this owner"), "should return ErrLockNotHeld") + assert.Error(t, err, "should return error") + + // 检查错误类型和错误码 + var bizErr bizerror.Error + if assert.ErrorAs(t, err, &bizErr) { + assert.Equal(t, bizerror.LockNotHeld, bizErr.Code(), "should return LockNotHeld error code") + } _ = lock1.Unlock(ctx, "test-key") } @@ -182,7 +201,12 @@ func TestRenewNotHeld(t *testing.T) { require.NoError(t, err) err = lock2.Renew(ctx, "test-key", 10*time.Second) - assert.ErrorIs(t, err, bizerror.NewBizError(bizerror.LockNotHeld, "lock not held by this owner"), "should return ErrLockNotHeld") + assert.Error(t, err, "should return error") + + var bizErr bizerror.Error + if assert.ErrorAs(t, err, &bizErr) { + assert.Equal(t, bizerror.LockNotHeld, bizErr.Code(), "should return LockNotHeld error code") + } _ = lock1.Unlock(ctx, "test-key") } @@ -306,25 +330,35 @@ func TestLockBlockingBehavior(t *testing.T) { lock2 := lock.NewGormLockFromDB(db) ctx := context.Background() - err := lock1.Lock(ctx, "blocking-key", 2*time.Second) + err := lock1.Lock(ctx, "blocking-key", 10*time.Second) + require.NoError(t, err) + + isLocked, err := lock1.IsLocked(ctx, "blocking-key") require.NoError(t, err) + require.True(t, isLocked) acquiredTime := time.Now() - var lock2AcquiredTime time.Time + done := make(chan time.Time) go func() { - _ = lock2.Lock(ctx, "blocking-key", 5*time.Second) - lock2AcquiredTime = time.Now() + _ = lock2.Lock(ctx, "blocking-key", 10*time.Second) + done <- time.Now() }() - time.Sleep(1 * time.Second) - _ = lock1.Unlock(ctx, "blocking-key") - time.Sleep(500 * time.Millisecond) + unlockErr := lock1.Unlock(ctx, "blocking-key") + require.NoError(t, unlockErr, "unlock should succeed") + + isLocked, err = lock1.IsLocked(ctx, "blocking-key") + require.NoError(t, err) + + lock2AcquiredTime := <-done + duration := lock2AcquiredTime.Sub(acquiredTime) - assert.GreaterOrEqual(t, duration, 1*time.Second, "lock2 should acquire after lock1 releases") - assert.Less(t, duration, 2*time.Second, "lock2 should acquire shortly after lock1 releases") + + assert.GreaterOrEqual(t, duration, 500*time.Millisecond, "lock2 should acquire after lock1 releases") + assert.Less(t, duration, 1500*time.Millisecond, "lock2 should acquire shortly after lock1 releases") _ = lock2.Unlock(ctx, "blocking-key") } diff --git a/pkg/core/lock/key.go b/pkg/core/lock/key.go index 3f8475247..29994e0b8 100644 --- a/pkg/core/lock/key.go +++ b/pkg/core/lock/key.go @@ -17,7 +17,11 @@ package lock -import "fmt" +import ( + "fmt" + + "github.com/apache/dubbo-admin/pkg/common/constants" +) // BuildLockKey constructs a lock key from a prefix and parts func BuildLockKey(prefix string, parts ...string) string { @@ -30,15 +34,15 @@ func BuildLockKey(prefix string, parts ...string) string { // BuildTagRouteLockKey constructs a lock key for tag route operations func BuildTagRouteLockKey(mesh, name string) string { - return fmt.Sprintf("%s:%s:%s", TagRouteKeyPrefix, mesh, name) + return fmt.Sprintf("%s:%s:%s", constants.TagRouteKeyPrefix, mesh, name) } // BuildConfiguratorRuleLockKey constructs a lock key for configurator rule operations func BuildConfiguratorRuleLockKey(mesh, name string) string { - return fmt.Sprintf("%s:%s:%s", ConfiguratorRuleKeyPrefix, mesh, name) + return fmt.Sprintf("%s:%s:%s", constants.ConfiguratorRuleKeyPrefix, mesh, name) } // BuildConditionRuleLockKey constructs a lock key for condition rule operations func BuildConditionRuleLockKey(mesh, name string) string { - return fmt.Sprintf("%s:%s:%s", ConditionRuleKeyPrefix, mesh, name) + return fmt.Sprintf("%s:%s:%s", constants.ConditionRuleKeyPrefix, mesh, name) } From c3e506fd0e0a632b03f3e98571ada84c2938bb74 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Sat, 20 Dec 2025 23:05:13 +0800 Subject: [PATCH 07/11] refactor: improve lock factory design with registry pattern --- pkg/core/lock/component.go | 39 ++++++---------- pkg/core/lock/factory.go | 82 +++++++++++++++++++++++++++++--- pkg/core/lock/gorm_factory.go | 88 +++++++++++++++++++++++++++++++++++ pkg/core/lock/gorm_lock.go | 2 +- pkg/core/lock/init.go | 26 ----------- 5 files changed, 178 insertions(+), 59 deletions(-) create mode 100644 pkg/core/lock/gorm_factory.go delete mode 100644 pkg/core/lock/init.go diff --git a/pkg/core/lock/component.go b/pkg/core/lock/component.go index 4dd3e9d51..ae9d6f260 100644 --- a/pkg/core/lock/component.go +++ b/pkg/core/lock/component.go @@ -29,6 +29,10 @@ import ( "github.com/apache/dubbo-admin/pkg/core/runtime" ) +func init() { + runtime.RegisterComponent(NewComponent()) +} + const ( // DistributedLockComponent is the component type for distributed lock DistributedLockComponent runtime.ComponentType = "distributed lock" @@ -58,37 +62,22 @@ func (c *Component) Order() int { // Init initializes the distributed lock component func (c *Component) Init(ctx runtime.BuilderContext) error { - // Get the store component to access database connection - storeComp, err := ctx.GetActivatedComponent(runtime.ResourceStore) + factory, err := LockFactoryRegistry().GetSupportedFactory(ctx) if err != nil { - return err - } - - // Try to extract data store interface from store component - type DataStore interface { - GetDataStore() any - } - - store, ok := storeComp.(DataStore) - if !ok { - // For memory store or other stores without database - logger.Warnf("Store component does not provide data store interface, distributed lock will not be available") + // No supporting factory found + logger.Warnf("No supported lock factory found: %v", err) + logger.Warn("Distributed lock will not be available") return nil } - dataStore := store.GetDataStore() - if dataStore == nil { - logger.Warnf("Data store is nil, distributed lock will not be available") - return nil - } - - // Create lock implementation based on the data store - c.lock = NewLockFromDataStore(dataStore) - if c.lock == nil { - logger.Warnf("Cannot create distributed lock from data store, distributed lock will not be available") - return nil + // Lock created using a factory + lock, err := factory.NewLock(ctx) + if err != nil { + return errors.Wrap(err, "failed to create distributed lock") } + c.lock = lock + logger.Info("Distributed lock component initialized successfully") return nil } diff --git a/pkg/core/lock/factory.go b/pkg/core/lock/factory.go index 4d974a5a5..afa38d8d9 100644 --- a/pkg/core/lock/factory.go +++ b/pkg/core/lock/factory.go @@ -18,15 +18,83 @@ package lock import ( - "gorm.io/gorm" + "fmt" + "github.com/apache/dubbo-admin/pkg/core/runtime" ) -// NewLockFromDataStore creates a lock implementation based on the data store type -func NewLockFromDataStore(dataStore any) Lock { - // Try GORM database first - if db, ok := dataStore.(*gorm.DB); ok { - return NewGormLockFromDB(db) +var registry = newLockFactoryRegistry() + +// RegisterLockFactory registers a Lock factory +func RegisterLockFactory(f Factory) { + registry.Register(f) +} + +// LockFactoryRegistry returns the global factory registry +func LockFactoryRegistry() Registry { + return registry +} + +// Factory defines the factory interface for creating Locks +type Factory interface { + // Support determines whether the factory supports creating a Lock from a given context + // Determines this by inspecting the components in the context + Support(ctx runtime.BuilderContext) bool + + // NewLock creates a Lock instance from the BuilderContext + // The factory decides for itself how to extract dependencies from the context + NewLock(ctx runtime.BuilderContext) (Lock, error) +} + +// Registry defines the query interface for the factory registry +type Registry interface { + // GetSupportedFactory returns the first supported factory + GetSupportedFactory(ctx runtime.BuilderContext) (Factory, error) + // GetAllSupportedFactories returns all supported factories + GetAllSupportedFactories(ctx runtime.BuilderContext) []Factory +} + +// RegistryMutator defines the interface for modifying the factory registry +type RegistryMutator interface { + Register(Factory) +} + +// MutableRegistry combines query and modification interfaces +type MutableRegistry interface { + Registry + RegistryMutator +} + +var _ MutableRegistry = &lockFactoryRegistry{} + +type lockFactoryRegistry struct { + factories []Factory +} + +func newLockFactoryRegistry() MutableRegistry { + return &lockFactoryRegistry{ + factories: make([]Factory, 0), + } +} + +func (r *lockFactoryRegistry) GetSupportedFactory(ctx runtime.BuilderContext) (Factory, error) { + for _, factory := range r.factories { + if factory.Support(ctx) { + return factory, nil + } + } + return nil, fmt.Errorf("no supported lock factory found") +} + +func (r *lockFactoryRegistry) GetAllSupportedFactories(ctx runtime.BuilderContext) []Factory { + supported := make([]Factory, 0) + for _, factory := range r.factories { + if factory.Support(ctx) { + supported = append(supported, factory) + } } + return supported +} - return nil +func (r *lockFactoryRegistry) Register(factory Factory) { + r.factories = append(r.factories, factory) } diff --git a/pkg/core/lock/gorm_factory.go b/pkg/core/lock/gorm_factory.go new file mode 100644 index 000000000..439be6cb0 --- /dev/null +++ b/pkg/core/lock/gorm_factory.go @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "fmt" + + "gorm.io/gorm" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/runtime" +) + +func init() { + RegisterLockFactory(&gormLockFactory{}) +} + +type gormLockFactory struct{} + +// Support checks if a GORM Lock can be created from the context. +func (f *gormLockFactory) Support(ctx runtime.BuilderContext) bool { + storeComp, err := ctx.GetActivatedComponent(runtime.ResourceStore) + if err != nil { + return false + } + + type DataStoreProvider interface { + GetDataStore() any + } + + provider, ok := storeComp.(DataStoreProvider) + if !ok { + return false + } + + dataStore := provider.GetDataStore() + if dataStore == nil { + return false + } + + _, ok = dataStore.(*gorm.DB) + return ok +} + +// NewLock creates a GORM Lock instance +func (f *gormLockFactory) NewLock(ctx runtime.BuilderContext) (Lock, error) { + storeComp, err := ctx.GetActivatedComponent(runtime.ResourceStore) + if err != nil { + return nil, fmt.Errorf("store component not found: %w", err) + } + + type DataStoreProvider interface { + GetDataStore() any + } + + provider, ok := storeComp.(DataStoreProvider) + if !ok { + return nil, fmt.Errorf("store does not provide data store interface") + } + + dataStore := provider.GetDataStore() + if dataStore == nil { + return nil, fmt.Errorf("data store is nil") + } + + db, ok := dataStore.(*gorm.DB) + if !ok { + return nil, fmt.Errorf("data store is not *gorm.DB (got %T)", dataStore) + } + + logger.Info("Creating GORM-based distributed lock") + return NewGormLockFromDB(db), nil +} diff --git a/pkg/core/lock/gorm_lock.go b/pkg/core/lock/gorm_lock.go index 93f5f383a..498ae4842 100644 --- a/pkg/core/lock/gorm_lock.go +++ b/pkg/core/lock/gorm_lock.go @@ -62,7 +62,7 @@ func NewGormLockFromDB(db *gorm.DB) Lock { } } -// getDB returns the database instance, preferring direct DB over pool +// getDB returns the database instance, to prefer direct DB to pool func (g *GormLock) getDB() *gorm.DB { if g.db != nil { return g.db diff --git a/pkg/core/lock/init.go b/pkg/core/lock/init.go deleted file mode 100644 index 5b05fa2fd..000000000 --- a/pkg/core/lock/init.go +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package lock - -import ( - "github.com/apache/dubbo-admin/pkg/core/runtime" -) - -func init() { - runtime.RegisterComponent(NewComponent()) -} From e6599d1a19495091871571d4a18fd05e1ac808e4 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Mon, 22 Dec 2025 00:47:39 +0800 Subject: [PATCH 08/11] refactor: decouple lock factory from StoreComponent --- pkg/core/lock/gorm_factory.go | 64 ++++++++------------------- pkg/core/store/component.go | 17 ------- pkg/store/dbcommon/connection_pool.go | 18 ++++++++ 3 files changed, 36 insertions(+), 63 deletions(-) diff --git a/pkg/core/lock/gorm_factory.go b/pkg/core/lock/gorm_factory.go index 439be6cb0..eb6d6ee82 100644 --- a/pkg/core/lock/gorm_factory.go +++ b/pkg/core/lock/gorm_factory.go @@ -19,11 +19,10 @@ package lock import ( "fmt" - - "gorm.io/gorm" - + "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" ) func init() { @@ -32,57 +31,30 @@ func init() { type gormLockFactory struct{} -// Support checks if a GORM Lock can be created from the context. +// Support checks if GORM-based lock is supported based on store configuration func (f *gormLockFactory) Support(ctx runtime.BuilderContext) bool { - storeComp, err := ctx.GetActivatedComponent(runtime.ResourceStore) - if err != nil { - return false - } - - type DataStoreProvider interface { - GetDataStore() any - } - - provider, ok := storeComp.(DataStoreProvider) - if !ok { - return false - } - - dataStore := provider.GetDataStore() - if dataStore == nil { - return false - } - - _, ok = dataStore.(*gorm.DB) - return ok + cfg := ctx.Config().Store + // GORM lock is supported for database-backed stores (mysql, postgres) + return cfg.Type == "mysql" || cfg.Type == "postgres" } -// NewLock creates a GORM Lock instance +// NewLock creates a GORM Lock instance by obtaining DB from dbcommon package func (f *gormLockFactory) NewLock(ctx runtime.BuilderContext) (Lock, error) { - storeComp, err := ctx.GetActivatedComponent(runtime.ResourceStore) - if err != nil { - return nil, fmt.Errorf("store component not found: %w", err) - } - - type DataStoreProvider interface { - GetDataStore() any - } - - provider, ok := storeComp.(DataStoreProvider) - if !ok { - return nil, fmt.Errorf("store does not provide data store interface") - } + cfg := ctx.Config().Store - dataStore := provider.GetDataStore() - if dataStore == nil { - return nil, fmt.Errorf("data store is nil") + // Get the database connection from dbcommon's global connection pool + // This reuses the existing connection pool created by the store + // but accesses it through the dbcommon package instead of StoreComponent + db := dbcommon.GetGlobalDB(cfg.Type) + if db == nil { + return nil, fmt.Errorf("no database connection found for store type: %s", cfg.Type) } - db, ok := dataStore.(*gorm.DB) - if !ok { - return nil, fmt.Errorf("data store is not *gorm.DB (got %T)", dataStore) + // Auto-migrate lock table + if err := db.AutoMigrate(&LockRecord{}); err != nil { + return nil, fmt.Errorf("failed to migrate lock table: %w", err) } - logger.Info("Creating GORM-based distributed lock") + logger.Info("Creating GORM-based distributed lock using existing database connection") return NewGormLockFromDB(db), nil } diff --git a/pkg/core/store/component.go b/pkg/core/store/component.go index 0320e426c..6eadc7f29 100644 --- a/pkg/core/store/component.go +++ b/pkg/core/store/component.go @@ -19,7 +19,6 @@ package store import ( "fmt" - "gorm.io/gorm" "math" coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model" @@ -110,19 +109,3 @@ func (sc *storeComponent) ResourceKindRoute(k coremodel.ResourceKind) (ResourceS return nil, fmt.Errorf("%s is not supported by store yet", k) } - -func (sc *storeComponent) GetDB() *gorm.DB { - type dbGetter interface { - GetDB() *gorm.DB - } - - for _, store := range sc.stores { - if dg, ok := store.(dbGetter); ok { - if db := dg.GetDB(); db != nil { - return db - } - } - } - - return nil -} diff --git a/pkg/store/dbcommon/connection_pool.go b/pkg/store/dbcommon/connection_pool.go index c9487825d..5a866262f 100644 --- a/pkg/store/dbcommon/connection_pool.go +++ b/pkg/store/dbcommon/connection_pool.go @@ -224,3 +224,21 @@ func (p *ConnectionPool) Stats() sql.DBStats { } return sql.DBStats{} } + +// GetGlobalDB returns the first available gorm.DB instance from the global pool registry +// This is used by components that need database access without going through StoreComponent +// Returns nil if no database connection is available +func GetGlobalDB(storeType storecfg.Type) *gorm.DB { + poolsMutex.RLock() + defer poolsMutex.RUnlock() + + // Find the first pool matching the store type + prefix := string(storeType) + ":" + for key, pool := range pools { + if len(key) >= len(prefix) && key[:len(prefix)] == prefix { + return pool.GetDB() + } + } + + return nil +} From c6f0e0adbb01025168f173f9df0185e535ba7100 Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Mon, 22 Dec 2025 00:59:51 +0800 Subject: [PATCH 09/11] fix ci --- pkg/core/lock/gorm_factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/core/lock/gorm_factory.go b/pkg/core/lock/gorm_factory.go index eb6d6ee82..c0cbda8de 100644 --- a/pkg/core/lock/gorm_factory.go +++ b/pkg/core/lock/gorm_factory.go @@ -19,7 +19,7 @@ package lock import ( "fmt" - + "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/runtime" "github.com/apache/dubbo-admin/pkg/store/dbcommon" From 963cd6109b11311f22ef74e7da1ed33c07961ced Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Thu, 25 Dec 2025 22:26:57 +0800 Subject: [PATCH 10/11] add RequiredDependencies --- pkg/core/lock/component.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/core/lock/component.go b/pkg/core/lock/component.go index ae9d6f260..11814d28a 100644 --- a/pkg/core/lock/component.go +++ b/pkg/core/lock/component.go @@ -129,3 +129,9 @@ func GetLockFromRuntime(rt runtime.Runtime) (Lock, error) { return lockComp.GetLock(), nil } + +func (c *Component) RequiredDependencies() []runtime.ComponentType { + return []runtime.ComponentType{ + runtime.ResourceStore, + } +} From fb4a7a966ed40a30303a282bfe6e2fe10d3fbfaf Mon Sep 17 00:00:00 2001 From: everfid-ever <166227111+everfid-ever@users.noreply.github.com> Date: Tue, 30 Dec 2025 21:50:50 +0800 Subject: [PATCH 11/11] refactor --- pkg/core/bootstrap/bootstrap.go | 8 ---- .../gorm_factory.go => lock/gorm/factory.go} | 7 +-- .../lock/gorm_lock.go => lock/gorm/lock.go} | 22 +++++---- .../gorm/lock_test.go} | 46 +++++++++---------- .../lock/lock_model.go => lock/gorm/model.go} | 2 +- 5 files changed, 40 insertions(+), 45 deletions(-) rename pkg/{core/lock/gorm_factory.go => lock/gorm/factory.go} (91%) rename pkg/{core/lock/gorm_lock.go => lock/gorm/lock.go} (94%) rename pkg/{core/lock/gorm_lock_test.go => lock/gorm/lock_test.go} (90%) rename pkg/{core/lock/lock_model.go => lock/gorm/model.go} (99%) diff --git a/pkg/core/bootstrap/bootstrap.go b/pkg/core/bootstrap/bootstrap.go index 0b3b07ede..f3ef6f0d4 100644 --- a/pkg/core/bootstrap/bootstrap.go +++ b/pkg/core/bootstrap/bootstrap.go @@ -20,7 +20,6 @@ package bootstrap import ( "context" "fmt" - "github.com/apache/dubbo-admin/pkg/core/lock" "github.com/apache/dubbo-admin/pkg/common/bizerror" "github.com/apache/dubbo-admin/pkg/config/app" @@ -178,10 +177,3 @@ func initAndActivateComponent(builder *runtime.Builder, comp runtime.Component) } return nil } -func initDistributedLock(builder *runtime.Builder) error { - comp, err := runtime.ComponentRegistry().Get(lock.DistributedLockComponent) - if err != nil { - return err - } - return initAndActivateComponent(builder, comp) -} diff --git a/pkg/core/lock/gorm_factory.go b/pkg/lock/gorm/factory.go similarity index 91% rename from pkg/core/lock/gorm_factory.go rename to pkg/lock/gorm/factory.go index c0cbda8de..ff9f81bd1 100644 --- a/pkg/core/lock/gorm_factory.go +++ b/pkg/lock/gorm/factory.go @@ -15,18 +15,19 @@ * limitations under the License. */ -package lock +package gorm import ( "fmt" + "github.com/apache/dubbo-admin/pkg/core/lock" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/runtime" "github.com/apache/dubbo-admin/pkg/store/dbcommon" ) func init() { - RegisterLockFactory(&gormLockFactory{}) + lock.RegisterLockFactory(&gormLockFactory{}) } type gormLockFactory struct{} @@ -39,7 +40,7 @@ func (f *gormLockFactory) Support(ctx runtime.BuilderContext) bool { } // NewLock creates a GORM Lock instance by obtaining DB from dbcommon package -func (f *gormLockFactory) NewLock(ctx runtime.BuilderContext) (Lock, error) { +func (f *gormLockFactory) NewLock(ctx runtime.BuilderContext) (lock.Lock, error) { cfg := ctx.Config().Store // Get the database connection from dbcommon's global connection pool diff --git a/pkg/core/lock/gorm_lock.go b/pkg/lock/gorm/lock.go similarity index 94% rename from pkg/core/lock/gorm_lock.go rename to pkg/lock/gorm/lock.go index 498ae4842..83e1e658d 100644 --- a/pkg/core/lock/gorm_lock.go +++ b/pkg/lock/gorm/lock.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package lock +package gorm import ( "context" @@ -28,12 +28,13 @@ import ( "github.com/apache/dubbo-admin/pkg/common/bizerror" "github.com/apache/dubbo-admin/pkg/common/constants" + "github.com/apache/dubbo-admin/pkg/core/lock" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/store/dbcommon" ) // Ensure GormLock implements Lock interface -var _ Lock = (*GormLock)(nil) +var _ lock.Lock = (*GormLock)(nil) // GormLock provides distributed locking using database as backend // It uses GORM for database operations and supports MySQL, PostgreSQL, etc. @@ -45,7 +46,7 @@ type GormLock struct { // NewGormLock creates a new GORM-based distributed lock instance // Deprecated: Use NewGormLockFromDB to avoid circular dependencies -func NewGormLock(pool *dbcommon.ConnectionPool) Lock { +func NewGormLock(pool *dbcommon.ConnectionPool) lock.Lock { return &GormLock{ pool: pool, db: pool.GetDB(), @@ -55,7 +56,7 @@ func NewGormLock(pool *dbcommon.ConnectionPool) Lock { // NewGormLockFromDB creates a new GORM-based distributed lock instance from a DB connection // This is the preferred constructor to avoid circular dependencies -func NewGormLockFromDB(db *gorm.DB) Lock { +func NewGormLockFromDB(db *gorm.DB) lock.Lock { return &GormLock{ db: db, owner: uuid.New().String(), @@ -128,14 +129,15 @@ func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) ( return fmt.Errorf("failed to insert lock record: %w", result.Error) } - // Check if we got the lock by verifying the owner - var existingLock LockRecord - if err := tx.Where("lock_key = ?", key).First(&existingLock).Error; err != nil { - return fmt.Errorf("failed to verify lock ownership: %w", err) + // Check if the insertion was successful + if result.RowsAffected == 0 { + // The lock already exists + acquired = false + return nil } - // Determine if we acquired the lock - acquired = existingLock.Owner == g.owner + // New row inserted successfully, lock acquired successfully + acquired = true return nil }) diff --git a/pkg/core/lock/gorm_lock_test.go b/pkg/lock/gorm/lock_test.go similarity index 90% rename from pkg/core/lock/gorm_lock_test.go rename to pkg/lock/gorm/lock_test.go index 9dce07ea0..480579945 100644 --- a/pkg/core/lock/gorm_lock_test.go +++ b/pkg/lock/gorm/lock_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package lock_test +package gorm_test import ( "context" @@ -30,7 +30,7 @@ import ( "gorm.io/gorm" "github.com/apache/dubbo-admin/pkg/common/bizerror" - "github.com/apache/dubbo-admin/pkg/core/lock" + gormlock "github.com/apache/dubbo-admin/pkg/lock/gorm" ) func setupTestDB(t *testing.T) *gorm.DB { @@ -50,7 +50,7 @@ func setupTestDB(t *testing.T) *gorm.DB { err = db.Exec("PRAGMA busy_timeout=5000;").Error require.NoError(t, err, "failed to set busy timeout") - err = db.AutoMigrate(&lock.LockRecord{}) + err = db.AutoMigrate(&gormlock.LockRecord{}) require.NoError(t, err, "failed to migrate lock table") return db @@ -58,7 +58,7 @@ func setupTestDB(t *testing.T) *gorm.DB { func TestBasicLockUnlock(t *testing.T) { db := setupTestDB(t) - lockInstance := lock.NewGormLockFromDB(db) + lockInstance := gormlock.NewGormLockFromDB(db) ctx := context.Background() err := lockInstance.Lock(ctx, "test-key", 5*time.Second) @@ -78,8 +78,8 @@ func TestBasicLockUnlock(t *testing.T) { func TestTryLock(t *testing.T) { db := setupTestDB(t) - lock1 := lock.NewGormLockFromDB(db) - lock2 := lock.NewGormLockFromDB(db) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) ctx := context.Background() acquired, err := lock1.TryLock(ctx, "test-key", 5*time.Second) @@ -112,7 +112,7 @@ func TestConcurrentLockAttempts(t *testing.T) { for i := 0; i < numGoroutines; i++ { go func() { defer wg.Done() - lockInstance := lock.NewGormLockFromDB(db) + lockInstance := gormlock.NewGormLockFromDB(db) acquired, err := lockInstance.TryLock(ctx, "concurrent-key", 1*time.Second) if err == nil && acquired { successCount.Add(1) @@ -129,8 +129,8 @@ func TestConcurrentLockAttempts(t *testing.T) { func TestLockExpiration(t *testing.T) { db := setupTestDB(t) - lock1 := lock.NewGormLockFromDB(db) - lock2 := lock.NewGormLockFromDB(db) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) ctx := context.Background() acquired, err := lock1.TryLock(ctx, "expire-key", 100*time.Millisecond) @@ -152,7 +152,7 @@ func TestLockExpiration(t *testing.T) { func TestLockRenewal(t *testing.T) { db := setupTestDB(t) - lockInstance := lock.NewGormLockFromDB(db) + lockInstance := gormlock.NewGormLockFromDB(db) ctx := context.Background() err := lockInstance.Lock(ctx, "renew-key", 1*time.Second) @@ -172,8 +172,8 @@ func TestLockRenewal(t *testing.T) { func TestUnlockNotHeld(t *testing.T) { db := setupTestDB(t) - lock1 := lock.NewGormLockFromDB(db) - lock2 := lock.NewGormLockFromDB(db) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) ctx := context.Background() err := lock1.Lock(ctx, "test-key", 5*time.Second) @@ -193,8 +193,8 @@ func TestUnlockNotHeld(t *testing.T) { func TestRenewNotHeld(t *testing.T) { db := setupTestDB(t) - lock1 := lock.NewGormLockFromDB(db) - lock2 := lock.NewGormLockFromDB(db) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) ctx := context.Background() err := lock1.Lock(ctx, "test-key", 5*time.Second) @@ -213,7 +213,7 @@ func TestRenewNotHeld(t *testing.T) { func TestWithLock(t *testing.T) { db := setupTestDB(t) - lockInstance := lock.NewGormLockFromDB(db) + lockInstance := gormlock.NewGormLockFromDB(db) ctx := context.Background() executed := false @@ -236,7 +236,7 @@ func TestWithLock(t *testing.T) { func TestWithLockAutoRenewal(t *testing.T) { db := setupTestDB(t) - lockInstance := lock.NewGormLockFromDB(db) + lockInstance := gormlock.NewGormLockFromDB(db) ctx := context.Background() executed := false @@ -257,7 +257,7 @@ func TestWithLockAutoRenewal(t *testing.T) { func TestWithLockContextCancellation(t *testing.T) { db := setupTestDB(t) - lockInstance := lock.NewGormLockFromDB(db) + lockInstance := gormlock.NewGormLockFromDB(db) ctx, cancel := context.WithCancel(context.Background()) @@ -281,8 +281,8 @@ func TestWithLockContextCancellation(t *testing.T) { func TestCleanupExpiredLocks(t *testing.T) { db := setupTestDB(t) - lock1 := lock.NewGormLockFromDB(db) - lock2 := lock.NewGormLockFromDB(db) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) ctx := context.Background() _, _ = lock1.TryLock(ctx, "cleanup-key-1", 100*time.Millisecond) @@ -294,13 +294,13 @@ func TestCleanupExpiredLocks(t *testing.T) { assert.NoError(t, err) var count int64 - db.Model(&lock.LockRecord{}).Count(&count) + db.Model(&gormlock.LockRecord{}).Count(&count) assert.Equal(t, int64(0), count, "all expired locks should be cleaned up") } func TestMultipleDifferentLocks(t *testing.T) { db := setupTestDB(t) - lockInstance := lock.NewGormLockFromDB(db) + lockInstance := gormlock.NewGormLockFromDB(db) ctx := context.Background() err1 := lockInstance.Lock(ctx, "key-1", 5*time.Second) @@ -326,8 +326,8 @@ func TestMultipleDifferentLocks(t *testing.T) { func TestLockBlockingBehavior(t *testing.T) { db := setupTestDB(t) - lock1 := lock.NewGormLockFromDB(db) - lock2 := lock.NewGormLockFromDB(db) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) ctx := context.Background() err := lock1.Lock(ctx, "blocking-key", 10*time.Second) diff --git a/pkg/core/lock/lock_model.go b/pkg/lock/gorm/model.go similarity index 99% rename from pkg/core/lock/lock_model.go rename to pkg/lock/gorm/model.go index 797f5df5e..0becabb8f 100644 --- a/pkg/core/lock/lock_model.go +++ b/pkg/lock/gorm/model.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package lock +package gorm import ( "time"