diff --git a/pkg/common/bizerror/error.go b/pkg/common/bizerror/error.go index 67764b3bb..b130607dd 100644 --- a/pkg/common/bizerror/error.go +++ b/pkg/common/bizerror/error.go @@ -40,6 +40,8 @@ const ( NacosError ErrorCode = "NacosError" ZKError ErrorCode = "ZKError" EventError ErrorCode = "EventError" + LockNotHeld ErrorCode = "LockNotHeld" + LockExpired ErrorCode = "LockExpired" ) type bizError struct { diff --git a/pkg/common/constants/lock.go b/pkg/common/constants/lock.go new file mode 100644 index 000000000..30a9f4049 --- /dev/null +++ b/pkg/common/constants/lock.go @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package constants + +import "time" + +const ( + // DefaultLockTimeout is the default timeout for distributed lock operations + // This timeout applies to lock acquisition, renewal, and release operations + DefaultLockTimeout = 30 * time.Second + + // DefaultAutoRenewThreshold is the TTL threshold above which auto-renewal is enabled + // Locks with TTL longer than this value will be automatically renewed + DefaultAutoRenewThreshold = 10 * time.Second + + // DefaultUnlockTimeout is the timeout for unlock operations in deferred cleanup + DefaultUnlockTimeout = 5 * time.Second + + // DefaultRenewTimeout is the timeout for lock renewal operations + DefaultRenewTimeout = 5 * time.Second + + // DefaultLockRetryInterval is the interval between lock acquisition retry attempts + DefaultLockRetryInterval = 100 * time.Millisecond + + // DefaultCleanupInterval is the interval for periodic expired lock cleanup + DefaultCleanupInterval = 5 * time.Minute + + // DefaultCleanupTimeout is the timeout for cleanup operations + DefaultCleanupTimeout = 30 * time.Second +) + +// Lock key prefixes for different resource types +const ( + // TagRouteKeyPrefix is the prefix for tag route lock keys + TagRouteKeyPrefix = "tag_route" + + // ConfiguratorRuleKeyPrefix is the prefix for configurator rule lock keys + ConfiguratorRuleKeyPrefix = "configurator_rule" + + // ConditionRuleKeyPrefix is the prefix for condition rule lock keys + ConditionRuleKeyPrefix = "condition_rule" +) diff --git a/pkg/console/context/context.go b/pkg/console/context/context.go index 9133c4f93..ec6221316 100644 --- a/pkg/console/context/context.go +++ b/pkg/console/context/context.go @@ -22,6 +22,7 @@ import ( "github.com/apache/dubbo-admin/pkg/config/app" "github.com/apache/dubbo-admin/pkg/console/counter" + "github.com/apache/dubbo-admin/pkg/core/lock" "github.com/apache/dubbo-admin/pkg/core/manager" "github.com/apache/dubbo-admin/pkg/core/runtime" ) @@ -29,6 +30,7 @@ import ( type Context interface { ResourceManager() manager.ResourceManager CounterManager() counter.CounterManager + LockManager() lock.Lock Config() app.AdminConfig @@ -71,3 +73,11 @@ func (c *context) CounterManager() counter.CounterManager { } return managerComp.CounterManager() } + +func (c *context) LockManager() lock.Lock { + distributedLock, err := lock.GetLockFromRuntime(c.coreRt) + if err != nil { + return nil + } + return distributedLock +} diff --git a/pkg/console/service/condition_rule.go b/pkg/console/service/condition_rule.go index d7890cd6d..43532e409 100644 --- a/pkg/console/service/condition_rule.go +++ b/pkg/console/service/condition_rule.go @@ -18,6 +18,9 @@ package service import ( + "fmt" + "time" + "github.com/apache/dubbo-admin/pkg/console/context" "github.com/apache/dubbo-admin/pkg/console/model" "github.com/apache/dubbo-admin/pkg/core/logger" @@ -73,6 +76,22 @@ func GetConditionRule(ctx context.Context, name string, mesh string) (*meshresou } func UpdateConditionRule(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + // Lock not available, proceed without lock protection + return updateConditionRuleUnsafe(ctx, name, res) + } + + // Use distributed lock to prevent concurrent modifications + lockKey := fmt.Sprintf("condition_route:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return updateConditionRuleUnsafe(ctx, name, res) + }) +} + +func updateConditionRuleUnsafe(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { if err := ctx.ResourceManager().Update(res); err != nil { logger.Warnf("update %s condition failed with error: %s", name, err.Error()) return err @@ -81,6 +100,20 @@ func UpdateConditionRule(ctx context.Context, name string, res *meshresource.Con } func CreateConditionRule(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + return createConditionRuleUnsafe(ctx, name, res) + } + + lockKey := fmt.Sprintf("condition_route:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return createConditionRuleUnsafe(ctx, name, res) + }) +} + +func createConditionRuleUnsafe(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { if err := ctx.ResourceManager().Add(res); err != nil { logger.Warnf("create %s condition failed with error: %s", name, err.Error()) return err @@ -89,8 +122,20 @@ func CreateConditionRule(ctx context.Context, name string, res *meshresource.Con } func DeleteConditionRule(ctx context.Context, name string, mesh string) error { - if err := ctx.ResourceManager().DeleteByKey(meshresource.ConditionRouteKind, coremodel.BuildResourceKey(mesh, name)); err != nil { - return err + lock := ctx.LockManager() + if lock == nil { + return ctx.ResourceManager().DeleteByKey(meshresource.ConditionRouteKind, coremodel.BuildResourceKey(mesh, name)) } - return nil + + lockKey := fmt.Sprintf("condition_route:%s:%s", mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + err := ctx.ResourceManager().DeleteByKey(meshresource.ConditionRouteKind, coremodel.BuildResourceKey(mesh, name)) + if err != nil { + logger.Warnf("delete %s condition failed with error: %s", name, err.Error()) + return err + } + return nil + }) } diff --git a/pkg/console/service/configurator_rule.go b/pkg/console/service/configurator_rule.go index 1e7b033a9..2548a1629 100644 --- a/pkg/console/service/configurator_rule.go +++ b/pkg/console/service/configurator_rule.go @@ -18,6 +18,9 @@ package service import ( + "fmt" + "time" + consolectx "github.com/apache/dubbo-admin/pkg/console/context" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/manager" @@ -38,6 +41,20 @@ func GetConfigurator(ctx consolectx.Context, name string, mesh string) (*meshres } func UpdateConfigurator(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error { + lock := ctx.LockManager() + if lock == nil { + return updateConfiguratorUnsafe(ctx, name, res) + } + + lockKey := fmt.Sprintf("dynamic_config:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return updateConfiguratorUnsafe(ctx, name, res) + }) +} + +func updateConfiguratorUnsafe(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error { if err := ctx.ResourceManager().Update(res); err != nil { logger.Warnf("update %s configurator failed with error: %s", name, err.Error()) return err @@ -46,6 +63,20 @@ func UpdateConfigurator(ctx consolectx.Context, name string, res *meshresource.D } func CreateConfigurator(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error { + lock := ctx.LockManager() + if lock == nil { + return createConfiguratorUnsafe(ctx, name, res) + } + + lockKey := fmt.Sprintf("dynamic_config:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return createConfiguratorUnsafe(ctx, name, res) + }) +} + +func createConfiguratorUnsafe(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error { if err := ctx.ResourceManager().Add(res); err != nil { logger.Warnf("create %s configurator failed with error: %s", name, err.Error()) return err @@ -54,9 +85,19 @@ func CreateConfigurator(ctx consolectx.Context, name string, res *meshresource.D } func DeleteConfigurator(ctx consolectx.Context, name string, mesh string) error { - if err := ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)); err != nil { - logger.Warnf("delete %s configurator failed with error: %s", name, err.Error()) - return err + lock := ctx.LockManager() + if lock == nil { + return ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)) } - return nil + + lockKey := fmt.Sprintf("dynamic_config:%s:%s", mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + if err := ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)); err != nil { + logger.Warnf("delete %s configurator failed with error: %s", name, err.Error()) + return err + } + return nil + }) } diff --git a/pkg/console/service/tag_rule.go b/pkg/console/service/tag_rule.go index 7c4048feb..7178d32be 100644 --- a/pkg/console/service/tag_rule.go +++ b/pkg/console/service/tag_rule.go @@ -18,7 +18,9 @@ package service import ( + "github.com/apache/dubbo-admin/pkg/common/constants" consolectx "github.com/apache/dubbo-admin/pkg/console/context" + "github.com/apache/dubbo-admin/pkg/core/lock" "github.com/apache/dubbo-admin/pkg/core/logger" "github.com/apache/dubbo-admin/pkg/core/manager" meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1" @@ -38,6 +40,19 @@ func GetTagRule(ctx consolectx.Context, name string, mesh string) (*meshresource } func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) error { + lockMgr := ctx.LockManager() + if lockMgr == nil { + return updateTagRuleUnsafe(ctx, res) + } + + lockKey := lock.BuildTagRouteLockKey(res.Mesh, res.Name) + + return lockMgr.WithLock(ctx.AppContext(), lockKey, constants.DefaultLockTimeout, func() error { + return updateTagRuleUnsafe(ctx, res) + }) +} + +func updateTagRuleUnsafe(ctx consolectx.Context, res *meshresource.TagRouteResource) error { err := ctx.ResourceManager().Update(res) if err != nil { logger.Warnf("update tag rule %s error: %v", res.Name, err) @@ -47,6 +62,19 @@ func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) e } func CreateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) error { + lockMgr := ctx.LockManager() + if lockMgr == nil { + return createTagRuleUnsafe(ctx, res) + } + + lockKey := lock.BuildTagRouteLockKey(res.Mesh, res.Name) + + return lockMgr.WithLock(ctx.AppContext(), lockKey, constants.DefaultLockTimeout, func() error { + return createTagRuleUnsafe(ctx, res) + }) +} + +func createTagRuleUnsafe(ctx consolectx.Context, res *meshresource.TagRouteResource) error { err := ctx.ResourceManager().Add(res) if err != nil { logger.Warnf("create tag rule %s error: %v", res.Name, err) @@ -56,10 +84,19 @@ func CreateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) e } func DeleteTagRule(ctx consolectx.Context, name string, mesh string) error { - err := ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name)) - if err != nil { - logger.Warnf("delete tag rule %s error: %v", name, err) - return err + lockMgr := ctx.LockManager() + if lockMgr == nil { + return ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name)) } - return nil + + lockKey := lock.BuildTagRouteLockKey(mesh, name) + + return lockMgr.WithLock(ctx.AppContext(), lockKey, constants.DefaultLockTimeout, func() error { + err := ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name)) + if err != nil { + logger.Warnf("delete tag rule %s error: %v", name, err) + return err + } + return nil + }) } diff --git a/pkg/core/lock/component.go b/pkg/core/lock/component.go new file mode 100644 index 000000000..11814d28a --- /dev/null +++ b/pkg/core/lock/component.go @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "math" + "time" + + "github.com/pkg/errors" + + "github.com/apache/dubbo-admin/pkg/common/constants" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/runtime" +) + +func init() { + runtime.RegisterComponent(NewComponent()) +} + +const ( + // DistributedLockComponent is the component type for distributed lock + DistributedLockComponent runtime.ComponentType = "distributed lock" +) + +// Component implements the runtime.Component interface for distributed lock +type Component struct { + lock Lock +} + +// NewComponent creates a new distributed lock component +func NewComponent() *Component { + return &Component{} +} + +// Type returns the component type +func (c *Component) Type() runtime.ComponentType { + return DistributedLockComponent +} + +// Order indicates the initialization order +// Lock should be initialized after Store (Order math.MaxInt - 1) +// Higher order values are initialized first, so we use math.MaxInt - 2 +func (c *Component) Order() int { + return math.MaxInt - 2 // After Store, before other services +} + +// Init initializes the distributed lock component +func (c *Component) Init(ctx runtime.BuilderContext) error { + factory, err := LockFactoryRegistry().GetSupportedFactory(ctx) + if err != nil { + // No supporting factory found + logger.Warnf("No supported lock factory found: %v", err) + logger.Warn("Distributed lock will not be available") + return nil + } + + // Lock created using a factory + lock, err := factory.NewLock(ctx) + if err != nil { + return errors.Wrap(err, "failed to create distributed lock") + } + + c.lock = lock + logger.Info("Distributed lock component initialized successfully") + return nil +} + +// Start starts the distributed lock component +func (c *Component) Start(rt runtime.Runtime, stop <-chan struct{}) error { + if c.lock == nil { + logger.Warn("Distributed lock not available, skipping") + return nil + } + + // Start background cleanup task + ticker := time.NewTicker(constants.DefaultCleanupInterval) // Cleanup every 5 minutes + defer ticker.Stop() + + for { + select { + case <-stop: + return nil + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultCleanupTimeout) + if err := c.lock.CleanupExpiredLocks(ctx); err != nil { + logger.Errorf("Failed to cleanup expired locks: %v", err) + } + cancel() + } + } +} + +// GetLock returns the lock instance +func (c *Component) GetLock() Lock { + return c.lock +} + +// GetLockFromRuntime extracts the lock instance from runtime +func GetLockFromRuntime(rt runtime.Runtime) (Lock, error) { + comp, err := rt.GetComponent(DistributedLockComponent) + if err != nil { + return nil, err + } + + lockComp, ok := comp.(*Component) + if !ok { + return nil, errors.Errorf("component %s is not a valid lock component", DistributedLockComponent) + } + + if lockComp.lock == nil { + return nil, errors.New("distributed lock is not available (possibly using memory store)") + } + + return lockComp.GetLock(), nil +} + +func (c *Component) RequiredDependencies() []runtime.ComponentType { + return []runtime.ComponentType{ + runtime.ResourceStore, + } +} diff --git a/pkg/core/lock/factory.go b/pkg/core/lock/factory.go new file mode 100644 index 000000000..afa38d8d9 --- /dev/null +++ b/pkg/core/lock/factory.go @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "fmt" + "github.com/apache/dubbo-admin/pkg/core/runtime" +) + +var registry = newLockFactoryRegistry() + +// RegisterLockFactory registers a Lock factory +func RegisterLockFactory(f Factory) { + registry.Register(f) +} + +// LockFactoryRegistry returns the global factory registry +func LockFactoryRegistry() Registry { + return registry +} + +// Factory defines the factory interface for creating Locks +type Factory interface { + // Support determines whether the factory supports creating a Lock from a given context + // Determines this by inspecting the components in the context + Support(ctx runtime.BuilderContext) bool + + // NewLock creates a Lock instance from the BuilderContext + // The factory decides for itself how to extract dependencies from the context + NewLock(ctx runtime.BuilderContext) (Lock, error) +} + +// Registry defines the query interface for the factory registry +type Registry interface { + // GetSupportedFactory returns the first supported factory + GetSupportedFactory(ctx runtime.BuilderContext) (Factory, error) + // GetAllSupportedFactories returns all supported factories + GetAllSupportedFactories(ctx runtime.BuilderContext) []Factory +} + +// RegistryMutator defines the interface for modifying the factory registry +type RegistryMutator interface { + Register(Factory) +} + +// MutableRegistry combines query and modification interfaces +type MutableRegistry interface { + Registry + RegistryMutator +} + +var _ MutableRegistry = &lockFactoryRegistry{} + +type lockFactoryRegistry struct { + factories []Factory +} + +func newLockFactoryRegistry() MutableRegistry { + return &lockFactoryRegistry{ + factories: make([]Factory, 0), + } +} + +func (r *lockFactoryRegistry) GetSupportedFactory(ctx runtime.BuilderContext) (Factory, error) { + for _, factory := range r.factories { + if factory.Support(ctx) { + return factory, nil + } + } + return nil, fmt.Errorf("no supported lock factory found") +} + +func (r *lockFactoryRegistry) GetAllSupportedFactories(ctx runtime.BuilderContext) []Factory { + supported := make([]Factory, 0) + for _, factory := range r.factories { + if factory.Support(ctx) { + supported = append(supported, factory) + } + } + return supported +} + +func (r *lockFactoryRegistry) Register(factory Factory) { + r.factories = append(r.factories, factory) +} diff --git a/pkg/core/lock/key.go b/pkg/core/lock/key.go new file mode 100644 index 000000000..29994e0b8 --- /dev/null +++ b/pkg/core/lock/key.go @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "fmt" + + "github.com/apache/dubbo-admin/pkg/common/constants" +) + +// BuildLockKey constructs a lock key from a prefix and parts +func BuildLockKey(prefix string, parts ...string) string { + key := prefix + for _, part := range parts { + key += ":" + part + } + return key +} + +// BuildTagRouteLockKey constructs a lock key for tag route operations +func BuildTagRouteLockKey(mesh, name string) string { + return fmt.Sprintf("%s:%s:%s", constants.TagRouteKeyPrefix, mesh, name) +} + +// BuildConfiguratorRuleLockKey constructs a lock key for configurator rule operations +func BuildConfiguratorRuleLockKey(mesh, name string) string { + return fmt.Sprintf("%s:%s:%s", constants.ConfiguratorRuleKeyPrefix, mesh, name) +} + +// BuildConditionRuleLockKey constructs a lock key for condition rule operations +func BuildConditionRuleLockKey(mesh, name string) string { + return fmt.Sprintf("%s:%s:%s", constants.ConditionRuleKeyPrefix, mesh, name) +} diff --git a/pkg/core/lock/lock.go b/pkg/core/lock/lock.go new file mode 100644 index 000000000..4b3bd781c --- /dev/null +++ b/pkg/core/lock/lock.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "time" +) + +// Lock defines the distributed lock interface +// This abstraction allows for multiple implementations (GORM, Redis, etcd, etc.) +type Lock interface { + // Lock acquires a distributed lock, blocking until successful or context cancelled + Lock(ctx context.Context, key string, ttl time.Duration) error + + // TryLock attempts to acquire a lock without blocking + // Returns true if lock was acquired, false otherwise + TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) + + // Unlock releases a lock held by this instance + Unlock(ctx context.Context, key string) error + + // Renew extends the TTL of a lock held by this instance + Renew(ctx context.Context, key string, ttl time.Duration) error + + // IsLocked checks if a lock is currently held by anyone + IsLocked(ctx context.Context, key string) (bool, error) + + // WithLock executes a function while holding a lock + // Automatically acquires the lock, executes the function, and releases the lock + WithLock(ctx context.Context, key string, ttl time.Duration, fn func() error) error + + // CleanupExpiredLocks removes expired locks (maintenance task) + CleanupExpiredLocks(ctx context.Context) error +} diff --git a/pkg/lock/gorm/factory.go b/pkg/lock/gorm/factory.go new file mode 100644 index 000000000..ff9f81bd1 --- /dev/null +++ b/pkg/lock/gorm/factory.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gorm + +import ( + "fmt" + + "github.com/apache/dubbo-admin/pkg/core/lock" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +func init() { + lock.RegisterLockFactory(&gormLockFactory{}) +} + +type gormLockFactory struct{} + +// Support checks if GORM-based lock is supported based on store configuration +func (f *gormLockFactory) Support(ctx runtime.BuilderContext) bool { + cfg := ctx.Config().Store + // GORM lock is supported for database-backed stores (mysql, postgres) + return cfg.Type == "mysql" || cfg.Type == "postgres" +} + +// NewLock creates a GORM Lock instance by obtaining DB from dbcommon package +func (f *gormLockFactory) NewLock(ctx runtime.BuilderContext) (lock.Lock, error) { + cfg := ctx.Config().Store + + // Get the database connection from dbcommon's global connection pool + // This reuses the existing connection pool created by the store + // but accesses it through the dbcommon package instead of StoreComponent + db := dbcommon.GetGlobalDB(cfg.Type) + if db == nil { + return nil, fmt.Errorf("no database connection found for store type: %s", cfg.Type) + } + + // Auto-migrate lock table + if err := db.AutoMigrate(&LockRecord{}); err != nil { + return nil, fmt.Errorf("failed to migrate lock table: %w", err) + } + + logger.Info("Creating GORM-based distributed lock using existing database connection") + return NewGormLockFromDB(db), nil +} diff --git a/pkg/lock/gorm/lock.go b/pkg/lock/gorm/lock.go new file mode 100644 index 000000000..83e1e658d --- /dev/null +++ b/pkg/lock/gorm/lock.go @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gorm + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/common/constants" + "github.com/apache/dubbo-admin/pkg/core/lock" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +// Ensure GormLock implements Lock interface +var _ lock.Lock = (*GormLock)(nil) + +// GormLock provides distributed locking using database as backend +// It uses GORM for database operations and supports MySQL, PostgreSQL, etc. +type GormLock struct { + pool *dbcommon.ConnectionPool + db *gorm.DB // Direct DB reference to avoid circular dependency + owner string // Unique identifier for this lock instance +} + +// NewGormLock creates a new GORM-based distributed lock instance +// Deprecated: Use NewGormLockFromDB to avoid circular dependencies +func NewGormLock(pool *dbcommon.ConnectionPool) lock.Lock { + return &GormLock{ + pool: pool, + db: pool.GetDB(), + owner: uuid.New().String(), + } +} + +// NewGormLockFromDB creates a new GORM-based distributed lock instance from a DB connection +// This is the preferred constructor to avoid circular dependencies +func NewGormLockFromDB(db *gorm.DB) lock.Lock { + return &GormLock{ + db: db, + owner: uuid.New().String(), + } +} + +// getDB returns the database instance, to prefer direct DB to pool +func (g *GormLock) getDB() *gorm.DB { + if g.db != nil { + return g.db + } + if g.pool != nil { + return g.pool.GetDB() + } + return nil +} + +// Lock acquires a lock with the specified key and TTL +// It blocks until the lock is acquired or context is cancelled +func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { + ticker := time.NewTicker(constants.DefaultLockRetryInterval) + defer ticker.Stop() + + for { + acquired, err := g.TryLock(ctx, key, ttl) + if err != nil { + return fmt.Errorf("failed to try lock: %w", err) + } + if acquired { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +// TryLock attempts to acquire a lock without blocking +// Returns true if lock was acquired, false otherwise +func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) { + db := g.getDB().WithContext(ctx) + expireAt := time.Now().Add(ttl) + + var acquired bool + err := db.Transaction(func(tx *gorm.DB) error { + // Clean up only this key's expired lock to improve performance + now := time.Now() + if err := tx.Where("lock_key = ? AND expire_at < ?", key, now). + Delete(&LockRecord{}).Error; err != nil { + return fmt.Errorf("failed to clean expired lock for key %s: %w", key, err) + } + + // Try to acquire lock using INSERT ... ON CONFLICT + lock := &LockRecord{ + LockKey: key, + Owner: g.owner, + ExpireAt: expireAt, + } + + // Try to insert the lock record + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "lock_key"}}, + DoNothing: true, // If conflict, do nothing + }).Create(lock) + + if result.Error != nil { + return fmt.Errorf("failed to insert lock record: %w", result.Error) + } + + // Check if the insertion was successful + if result.RowsAffected == 0 { + // The lock already exists + acquired = false + return nil + } + + // New row inserted successfully, lock acquired successfully + acquired = true + return nil + }) + + if err != nil { + return false, err + } + + return acquired, nil +} + +// Unlock releases a lock held by this instance +func (g *GormLock) Unlock(ctx context.Context, key string) error { + db := g.getDB().WithContext(ctx) + + result := db.Where("lock_key = ? AND owner = ?", key, g.owner). + Delete(&LockRecord{}) + + if result.Error != nil { + return fmt.Errorf("failed to release lock: %w", result.Error) + } + + if result.RowsAffected == 0 { + return bizerror.New(bizerror.LockNotHeld, "lock not held by this owner") + } + + return nil +} + +// Renew extends the TTL of a lock held by this instance +func (g *GormLock) Renew(ctx context.Context, key string, ttl time.Duration) error { + db := g.getDB().WithContext(ctx) + newExpireAt := time.Now().Add(ttl) + + result := db.Model(&LockRecord{}). + Where("lock_key = ? AND owner = ?", key, g.owner). + Update("expire_at", newExpireAt) + + if result.Error != nil { + return fmt.Errorf("failed to renew lock: %w", result.Error) + } + + if result.RowsAffected == 0 { + return bizerror.New(bizerror.LockNotHeld, "lock not held by this owner") + } + + return nil +} + +// IsLocked checks if a lock is currently held (by anyone) +func (g *GormLock) IsLocked(ctx context.Context, key string) (bool, error) { + db := g.getDB().WithContext(ctx) + + var count int64 + err := db.Model(&LockRecord{}). + Where("lock_key = ? AND expire_at > ?", key, time.Now()). + Count(&count).Error + + if err != nil { + return false, fmt.Errorf("failed to check lock status: %w", err) + } + + return count > 0, nil +} + +// WithLock executes a function while holding a lock +func (g *GormLock) WithLock(ctx context.Context, key string, ttl time.Duration, fn func() error) error { + // Acquire lock + if err := g.Lock(ctx, key, ttl); err != nil { + return fmt.Errorf("failed to acquire lock: %w", err) + } + + // Ensure lock is released + defer func() { + // Use background context for unlock to ensure it completes even if ctx is cancelled + unlockCtx, cancel := context.WithTimeout(context.Background(), constants.DefaultUnlockTimeout) + defer cancel() + + if err := g.Unlock(unlockCtx, key); err != nil { + logger.Errorf("Failed to release lock %s: %v", key, err) + } + }() + + // Start auto-renewal if TTL is long enough + var renewDone chan struct{} + if ttl > constants.DefaultAutoRenewThreshold { + renewDone = make(chan struct{}) + go g.autoRenew(ctx, key, ttl, renewDone) + defer close(renewDone) + } + + // Execute the function + return fn() +} + +// autoRenew periodically renews the lock until done channel is closed +func (g *GormLock) autoRenew(ctx context.Context, key string, ttl time.Duration, done <-chan struct{}) { + // Renew at 1/3 of TTL to ensure lock doesn't expire + renewInterval := ttl / 3 + ticker := time.NewTicker(renewInterval) + defer ticker.Stop() + + for { + select { + case <-done: + return + case <-ctx.Done(): + return + case <-ticker.C: + // Double-check done channel before renewing to avoid unnecessary renewal + select { + case <-done: + return + default: + } + + renewCtx, cancel := context.WithTimeout(context.Background(), constants.DefaultRenewTimeout) + if err := g.Renew(renewCtx, key, ttl); err != nil { + logger.Warnf("Failed to renew lock %s: %v", key, err) + cancel() + return + } + cancel() + } + } +} + +// CleanupExpiredLocks removes all expired locks from the database +// This should be called periodically as a maintenance task +func (g *GormLock) CleanupExpiredLocks(ctx context.Context) error { + db := g.getDB().WithContext(ctx) + + result := db.Where("expire_at < ?", time.Now()).Delete(&LockRecord{}) + if result.Error != nil { + return fmt.Errorf("failed to cleanup expired locks: %w", result.Error) + } + + return nil +} diff --git a/pkg/lock/gorm/lock_test.go b/pkg/lock/gorm/lock_test.go new file mode 100644 index 000000000..480579945 --- /dev/null +++ b/pkg/lock/gorm/lock_test.go @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gorm_test + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + gormlock "github.com/apache/dubbo-admin/pkg/lock/gorm" +) + +func setupTestDB(t *testing.T) *gorm.DB { + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{ + PrepareStmt: false, + }) + require.NoError(t, err, "failed to create test database") + + sqlDB, err := db.DB() + require.NoError(t, err) + + sqlDB.SetMaxOpenConns(1) + + err = db.Exec("PRAGMA journal_mode=WAL;").Error + require.NoError(t, err, "failed to set WAL mode") + + err = db.Exec("PRAGMA busy_timeout=5000;").Error + require.NoError(t, err, "failed to set busy timeout") + + err = db.AutoMigrate(&gormlock.LockRecord{}) + require.NoError(t, err, "failed to migrate lock table") + + return db +} + +func TestBasicLockUnlock(t *testing.T) { + db := setupTestDB(t) + lockInstance := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + err := lockInstance.Lock(ctx, "test-key", 5*time.Second) + assert.NoError(t, err, "should acquire lock successfully") + + isLocked, err := lockInstance.IsLocked(ctx, "test-key") + assert.NoError(t, err) + assert.True(t, isLocked, "lock should be held") + + err = lockInstance.Unlock(ctx, "test-key") + assert.NoError(t, err, "should release lock successfully") + + isLocked, err = lockInstance.IsLocked(ctx, "test-key") + assert.NoError(t, err) + assert.False(t, isLocked, "lock should be released") +} + +func TestTryLock(t *testing.T) { + db := setupTestDB(t) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + acquired, err := lock1.TryLock(ctx, "test-key", 5*time.Second) + assert.NoError(t, err) + assert.True(t, acquired, "first lock should be acquired") + + acquired, err = lock2.TryLock(ctx, "test-key", 5*time.Second) + assert.NoError(t, err) + assert.False(t, acquired, "second lock should not be acquired") + + err = lock1.Unlock(ctx, "test-key") + assert.NoError(t, err) + + acquired, err = lock2.TryLock(ctx, "test-key", 5*time.Second) + assert.NoError(t, err) + assert.True(t, acquired, "second lock should be acquired after first is released") + + _ = lock2.Unlock(ctx, "test-key") +} + +func TestConcurrentLockAttempts(t *testing.T) { + db := setupTestDB(t) + ctx := context.Background() + + const numGoroutines = 10 + var successCount atomic.Int32 + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + lockInstance := gormlock.NewGormLockFromDB(db) + acquired, err := lockInstance.TryLock(ctx, "concurrent-key", 1*time.Second) + if err == nil && acquired { + successCount.Add(1) + time.Sleep(100 * time.Millisecond) // Hold lock briefly + _ = lockInstance.Unlock(ctx, "concurrent-key") + } + }() + } + + wg.Wait() + + assert.Equal(t, int32(1), successCount.Load(), "only one goroutine should acquire the lock") +} + +func TestLockExpiration(t *testing.T) { + db := setupTestDB(t) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + acquired, err := lock1.TryLock(ctx, "expire-key", 100*time.Millisecond) + assert.NoError(t, err) + assert.True(t, acquired) + + acquired, err = lock2.TryLock(ctx, "expire-key", 1*time.Second) + assert.NoError(t, err) + assert.False(t, acquired, "lock should still be held") + + time.Sleep(200 * time.Millisecond) + + acquired, err = lock2.TryLock(ctx, "expire-key", 1*time.Second) + assert.NoError(t, err) + assert.True(t, acquired, "lock should be acquired after expiration") + + _ = lock2.Unlock(ctx, "expire-key") +} + +func TestLockRenewal(t *testing.T) { + db := setupTestDB(t) + lockInstance := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + err := lockInstance.Lock(ctx, "renew-key", 1*time.Second) + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) + + err = lockInstance.Renew(ctx, "renew-key", 2*time.Second) + assert.NoError(t, err, "should renew lock successfully") + + isLocked, err := lockInstance.IsLocked(ctx, "renew-key") + assert.NoError(t, err) + assert.True(t, isLocked, "lock should still be held after renewal") + + _ = lockInstance.Unlock(ctx, "renew-key") +} + +func TestUnlockNotHeld(t *testing.T) { + db := setupTestDB(t) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + err := lock1.Lock(ctx, "test-key", 5*time.Second) + require.NoError(t, err) + + err = lock2.Unlock(ctx, "test-key") + assert.Error(t, err, "should return error") + + // 检查错误类型和错误码 + var bizErr bizerror.Error + if assert.ErrorAs(t, err, &bizErr) { + assert.Equal(t, bizerror.LockNotHeld, bizErr.Code(), "should return LockNotHeld error code") + } + + _ = lock1.Unlock(ctx, "test-key") +} + +func TestRenewNotHeld(t *testing.T) { + db := setupTestDB(t) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + err := lock1.Lock(ctx, "test-key", 5*time.Second) + require.NoError(t, err) + + err = lock2.Renew(ctx, "test-key", 10*time.Second) + assert.Error(t, err, "should return error") + + var bizErr bizerror.Error + if assert.ErrorAs(t, err, &bizErr) { + assert.Equal(t, bizerror.LockNotHeld, bizErr.Code(), "should return LockNotHeld error code") + } + + _ = lock1.Unlock(ctx, "test-key") +} + +func TestWithLock(t *testing.T) { + db := setupTestDB(t) + lockInstance := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + executed := false + err := lockInstance.WithLock(ctx, "with-lock-key", 2*time.Second, func() error { + executed = true + isLocked, err := lockInstance.IsLocked(ctx, "with-lock-key") + assert.NoError(t, err) + assert.True(t, isLocked) + return nil + }) + + assert.NoError(t, err) + assert.True(t, executed, "function should be executed") + + time.Sleep(100 * time.Millisecond) + isLocked, err := lockInstance.IsLocked(ctx, "with-lock-key") + assert.NoError(t, err) + assert.False(t, isLocked, "lock should be released after WithLock") +} + +func TestWithLockAutoRenewal(t *testing.T) { + db := setupTestDB(t) + lockInstance := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + executed := false + err := lockInstance.WithLock(ctx, "auto-renew-key", 15*time.Second, func() error { + time.Sleep(6 * time.Second) + executed = true + return nil + }) + + assert.NoError(t, err) + assert.True(t, executed, "function should be executed") + + time.Sleep(100 * time.Millisecond) + isLocked, err := lockInstance.IsLocked(ctx, "auto-renew-key") + assert.NoError(t, err) + assert.False(t, isLocked, "lock should be released after WithLock") +} + +func TestWithLockContextCancellation(t *testing.T) { + db := setupTestDB(t) + lockInstance := gormlock.NewGormLockFromDB(db) + + ctx, cancel := context.WithCancel(context.Background()) + + started := make(chan struct{}) + err := lockInstance.WithLock(ctx, "cancel-key", 5*time.Second, func() error { + close(started) + cancel() + time.Sleep(100 * time.Millisecond) + return nil + }) + + <-started + + assert.NoError(t, err, "function should complete even if context is cancelled during execution") + + time.Sleep(100 * time.Millisecond) + isLocked, err := lockInstance.IsLocked(context.Background(), "cancel-key") + assert.NoError(t, err) + assert.False(t, isLocked, "lock should be released even after context cancellation") +} + +func TestCleanupExpiredLocks(t *testing.T) { + db := setupTestDB(t) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + _, _ = lock1.TryLock(ctx, "cleanup-key-1", 100*time.Millisecond) + _, _ = lock2.TryLock(ctx, "cleanup-key-2", 100*time.Millisecond) + + time.Sleep(200 * time.Millisecond) + + err := lock1.CleanupExpiredLocks(ctx) + assert.NoError(t, err) + + var count int64 + db.Model(&gormlock.LockRecord{}).Count(&count) + assert.Equal(t, int64(0), count, "all expired locks should be cleaned up") +} + +func TestMultipleDifferentLocks(t *testing.T) { + db := setupTestDB(t) + lockInstance := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + err1 := lockInstance.Lock(ctx, "key-1", 5*time.Second) + err2 := lockInstance.Lock(ctx, "key-2", 5*time.Second) + err3 := lockInstance.Lock(ctx, "key-3", 5*time.Second) + + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.NoError(t, err3) + + isLocked1, _ := lockInstance.IsLocked(ctx, "key-1") + isLocked2, _ := lockInstance.IsLocked(ctx, "key-2") + isLocked3, _ := lockInstance.IsLocked(ctx, "key-3") + + assert.True(t, isLocked1) + assert.True(t, isLocked2) + assert.True(t, isLocked3) + + _ = lockInstance.Unlock(ctx, "key-1") + _ = lockInstance.Unlock(ctx, "key-2") + _ = lockInstance.Unlock(ctx, "key-3") +} + +func TestLockBlockingBehavior(t *testing.T) { + db := setupTestDB(t) + lock1 := gormlock.NewGormLockFromDB(db) + lock2 := gormlock.NewGormLockFromDB(db) + ctx := context.Background() + + err := lock1.Lock(ctx, "blocking-key", 10*time.Second) + require.NoError(t, err) + + isLocked, err := lock1.IsLocked(ctx, "blocking-key") + require.NoError(t, err) + require.True(t, isLocked) + + acquiredTime := time.Now() + done := make(chan time.Time) + + go func() { + _ = lock2.Lock(ctx, "blocking-key", 10*time.Second) + done <- time.Now() + }() + + time.Sleep(500 * time.Millisecond) + + unlockErr := lock1.Unlock(ctx, "blocking-key") + require.NoError(t, unlockErr, "unlock should succeed") + + isLocked, err = lock1.IsLocked(ctx, "blocking-key") + require.NoError(t, err) + + lock2AcquiredTime := <-done + + duration := lock2AcquiredTime.Sub(acquiredTime) + + assert.GreaterOrEqual(t, duration, 500*time.Millisecond, "lock2 should acquire after lock1 releases") + assert.Less(t, duration, 1500*time.Millisecond, "lock2 should acquire shortly after lock1 releases") + + _ = lock2.Unlock(ctx, "blocking-key") +} diff --git a/pkg/lock/gorm/model.go b/pkg/lock/gorm/model.go new file mode 100644 index 000000000..0becabb8f --- /dev/null +++ b/pkg/lock/gorm/model.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gorm + +import ( + "time" +) + +// LockRecord represents a distributed lock record in the database +type LockRecord struct { + ID uint `gorm:"primarykey"` + LockKey string `gorm:"uniqueIndex;size:255;not null"` // Unique lock identifier + Owner string `gorm:"size:255;not null"` // UUID of the lock holder + ExpireAt time.Time `gorm:"index;not null"` // Lock expiration time + CreatedAt time.Time `gorm:"autoCreateTime"` // Lock creation time + UpdatedAt time.Time `gorm:"autoUpdateTime"` // Last renewal time +} + +// TableName returns the table name for LockRecord +func (LockRecord) TableName() string { + return "distributed_locks" +} diff --git a/pkg/store/dbcommon/connection_pool.go b/pkg/store/dbcommon/connection_pool.go index c9487825d..5a866262f 100644 --- a/pkg/store/dbcommon/connection_pool.go +++ b/pkg/store/dbcommon/connection_pool.go @@ -224,3 +224,21 @@ func (p *ConnectionPool) Stats() sql.DBStats { } return sql.DBStats{} } + +// GetGlobalDB returns the first available gorm.DB instance from the global pool registry +// This is used by components that need database access without going through StoreComponent +// Returns nil if no database connection is available +func GetGlobalDB(storeType storecfg.Type) *gorm.DB { + poolsMutex.RLock() + defer poolsMutex.RUnlock() + + // Find the first pool matching the store type + prefix := string(storeType) + ":" + for key, pool := range pools { + if len(key) >= len(prefix) && key[:len(prefix)] == prefix { + return pool.GetDB() + } + } + + return nil +}