Skip to content
2 changes: 2 additions & 0 deletions pkg/common/bizerror/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
NacosError ErrorCode = "NacosError"
ZKError ErrorCode = "ZKError"
EventError ErrorCode = "EventError"
LockNotHeld ErrorCode = "LockNotHeld"
LockExpired ErrorCode = "LockExpired"
)

type bizError struct {
Expand Down
57 changes: 57 additions & 0 deletions pkg/common/constants/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package constants

import "time"

const (
// DefaultLockTimeout is the default timeout for distributed lock operations
// This timeout applies to lock acquisition, renewal, and release operations
DefaultLockTimeout = 30 * time.Second

// DefaultAutoRenewThreshold is the TTL threshold above which auto-renewal is enabled
// Locks with TTL longer than this value will be automatically renewed
DefaultAutoRenewThreshold = 10 * time.Second

// DefaultUnlockTimeout is the timeout for unlock operations in deferred cleanup
DefaultUnlockTimeout = 5 * time.Second

// DefaultRenewTimeout is the timeout for lock renewal operations
DefaultRenewTimeout = 5 * time.Second

// DefaultLockRetryInterval is the interval between lock acquisition retry attempts
DefaultLockRetryInterval = 100 * time.Millisecond

// DefaultCleanupInterval is the interval for periodic expired lock cleanup
DefaultCleanupInterval = 5 * time.Minute

// DefaultCleanupTimeout is the timeout for cleanup operations
DefaultCleanupTimeout = 30 * time.Second
)

// Lock key prefixes for different resource types
const (
// TagRouteKeyPrefix is the prefix for tag route lock keys
TagRouteKeyPrefix = "tag_route"

// ConfiguratorRuleKeyPrefix is the prefix for configurator rule lock keys
ConfiguratorRuleKeyPrefix = "configurator_rule"

// ConditionRuleKeyPrefix is the prefix for condition rule lock keys
ConditionRuleKeyPrefix = "condition_rule"
)
10 changes: 10 additions & 0 deletions pkg/console/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (

"github.com/apache/dubbo-admin/pkg/config/app"
"github.com/apache/dubbo-admin/pkg/console/counter"
"github.com/apache/dubbo-admin/pkg/core/lock"
"github.com/apache/dubbo-admin/pkg/core/manager"
"github.com/apache/dubbo-admin/pkg/core/runtime"
)

type Context interface {
ResourceManager() manager.ResourceManager
CounterManager() counter.CounterManager
LockManager() lock.Lock

Config() app.AdminConfig

Expand Down Expand Up @@ -71,3 +73,11 @@ func (c *context) CounterManager() counter.CounterManager {
}
return managerComp.CounterManager()
}

func (c *context) LockManager() lock.Lock {
distributedLock, err := lock.GetLockFromRuntime(c.coreRt)
if err != nil {
return nil
}
return distributedLock
}
51 changes: 48 additions & 3 deletions pkg/console/service/condition_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package service

import (
"fmt"
"time"

"github.com/apache/dubbo-admin/pkg/console/context"
"github.com/apache/dubbo-admin/pkg/console/model"
"github.com/apache/dubbo-admin/pkg/core/logger"
Expand Down Expand Up @@ -73,6 +76,22 @@ func GetConditionRule(ctx context.Context, name string, mesh string) (*meshresou
}

func UpdateConditionRule(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error {
lock := ctx.LockManager()
if lock == nil {
// Lock not available, proceed without lock protection
return updateConditionRuleUnsafe(ctx, name, res)
}

// Use distributed lock to prevent concurrent modifications
lockKey := fmt.Sprintf("condition_route:%s:%s", res.Mesh, name)
lockTimeout := 30 * time.Second

return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error {
return updateConditionRuleUnsafe(ctx, name, res)
})
}

func updateConditionRuleUnsafe(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error {
if err := ctx.ResourceManager().Update(res); err != nil {
logger.Warnf("update %s condition failed with error: %s", name, err.Error())
return err
Expand All @@ -81,6 +100,20 @@ func UpdateConditionRule(ctx context.Context, name string, res *meshresource.Con
}

func CreateConditionRule(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error {
lock := ctx.LockManager()
if lock == nil {
return createConditionRuleUnsafe(ctx, name, res)
}

lockKey := fmt.Sprintf("condition_route:%s:%s", res.Mesh, name)
lockTimeout := 30 * time.Second

return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error {
return createConditionRuleUnsafe(ctx, name, res)
})
}

func createConditionRuleUnsafe(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error {
if err := ctx.ResourceManager().Add(res); err != nil {
logger.Warnf("create %s condition failed with error: %s", name, err.Error())
return err
Expand All @@ -89,8 +122,20 @@ func CreateConditionRule(ctx context.Context, name string, res *meshresource.Con
}

func DeleteConditionRule(ctx context.Context, name string, mesh string) error {
if err := ctx.ResourceManager().DeleteByKey(meshresource.ConditionRouteKind, coremodel.BuildResourceKey(mesh, name)); err != nil {
return err
lock := ctx.LockManager()
if lock == nil {
return ctx.ResourceManager().DeleteByKey(meshresource.ConditionRouteKind, coremodel.BuildResourceKey(mesh, name))
}
return nil

lockKey := fmt.Sprintf("condition_route:%s:%s", mesh, name)
lockTimeout := 30 * time.Second

return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error {
err := ctx.ResourceManager().DeleteByKey(meshresource.ConditionRouteKind, coremodel.BuildResourceKey(mesh, name))
if err != nil {
logger.Warnf("delete %s condition failed with error: %s", name, err.Error())
return err
}
return nil
})
}
49 changes: 45 additions & 4 deletions pkg/console/service/configurator_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package service

import (
"fmt"
"time"

consolectx "github.com/apache/dubbo-admin/pkg/console/context"
"github.com/apache/dubbo-admin/pkg/core/logger"
"github.com/apache/dubbo-admin/pkg/core/manager"
Expand All @@ -38,6 +41,20 @@ func GetConfigurator(ctx consolectx.Context, name string, mesh string) (*meshres
}

func UpdateConfigurator(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error {
lock := ctx.LockManager()
if lock == nil {
return updateConfiguratorUnsafe(ctx, name, res)
}

lockKey := fmt.Sprintf("dynamic_config:%s:%s", res.Mesh, name)
lockTimeout := 30 * time.Second

return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error {
return updateConfiguratorUnsafe(ctx, name, res)
})
}

func updateConfiguratorUnsafe(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error {
if err := ctx.ResourceManager().Update(res); err != nil {
logger.Warnf("update %s configurator failed with error: %s", name, err.Error())
return err
Expand All @@ -46,6 +63,20 @@ func UpdateConfigurator(ctx consolectx.Context, name string, res *meshresource.D
}

func CreateConfigurator(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error {
lock := ctx.LockManager()
if lock == nil {
return createConfiguratorUnsafe(ctx, name, res)
}

lockKey := fmt.Sprintf("dynamic_config:%s:%s", res.Mesh, name)
lockTimeout := 30 * time.Second

return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error {
return createConfiguratorUnsafe(ctx, name, res)
})
}

func createConfiguratorUnsafe(ctx consolectx.Context, name string, res *meshresource.DynamicConfigResource) error {
if err := ctx.ResourceManager().Add(res); err != nil {
logger.Warnf("create %s configurator failed with error: %s", name, err.Error())
return err
Expand All @@ -54,9 +85,19 @@ func CreateConfigurator(ctx consolectx.Context, name string, res *meshresource.D
}

func DeleteConfigurator(ctx consolectx.Context, name string, mesh string) error {
if err := ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)); err != nil {
logger.Warnf("delete %s configurator failed with error: %s", name, err.Error())
return err
lock := ctx.LockManager()
if lock == nil {
return ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name))
}
return nil

lockKey := fmt.Sprintf("dynamic_config:%s:%s", mesh, name)
lockTimeout := 30 * time.Second

return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error {
if err := ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)); err != nil {
logger.Warnf("delete %s configurator failed with error: %s", name, err.Error())
return err
}
return nil
})
}
47 changes: 42 additions & 5 deletions pkg/console/service/tag_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package service

import (
"github.com/apache/dubbo-admin/pkg/common/constants"
consolectx "github.com/apache/dubbo-admin/pkg/console/context"
"github.com/apache/dubbo-admin/pkg/core/lock"
"github.com/apache/dubbo-admin/pkg/core/logger"
"github.com/apache/dubbo-admin/pkg/core/manager"
meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
Expand All @@ -38,6 +40,19 @@ func GetTagRule(ctx consolectx.Context, name string, mesh string) (*meshresource
}

func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) error {
lockMgr := ctx.LockManager()
if lockMgr == nil {
return updateTagRuleUnsafe(ctx, res)
}

lockKey := lock.BuildTagRouteLockKey(res.Mesh, res.Name)

return lockMgr.WithLock(ctx.AppContext(), lockKey, constants.DefaultLockTimeout, func() error {
return updateTagRuleUnsafe(ctx, res)
})
}

func updateTagRuleUnsafe(ctx consolectx.Context, res *meshresource.TagRouteResource) error {
err := ctx.ResourceManager().Update(res)
if err != nil {
logger.Warnf("update tag rule %s error: %v", res.Name, err)
Expand All @@ -47,6 +62,19 @@ func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) e
}

func CreateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) error {
lockMgr := ctx.LockManager()
if lockMgr == nil {
return createTagRuleUnsafe(ctx, res)
}

lockKey := lock.BuildTagRouteLockKey(res.Mesh, res.Name)

return lockMgr.WithLock(ctx.AppContext(), lockKey, constants.DefaultLockTimeout, func() error {
return createTagRuleUnsafe(ctx, res)
})
}

func createTagRuleUnsafe(ctx consolectx.Context, res *meshresource.TagRouteResource) error {
err := ctx.ResourceManager().Add(res)
if err != nil {
logger.Warnf("create tag rule %s error: %v", res.Name, err)
Expand All @@ -56,10 +84,19 @@ func CreateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) e
}

func DeleteTagRule(ctx consolectx.Context, name string, mesh string) error {
err := ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name))
if err != nil {
logger.Warnf("delete tag rule %s error: %v", name, err)
return err
lockMgr := ctx.LockManager()
if lockMgr == nil {
return ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name))
}
return nil

lockKey := lock.BuildTagRouteLockKey(mesh, name)

return lockMgr.WithLock(ctx.AppContext(), lockKey, constants.DefaultLockTimeout, func() error {
err := ctx.ResourceManager().DeleteByKey(meshresource.TagRouteKind, coremodel.BuildResourceKey(mesh, name))
if err != nil {
logger.Warnf("delete tag rule %s error: %v", name, err)
return err
}
return nil
})
}
Loading
Loading