Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ type EtcdServer struct {
kv mvcc.WatchableKV
lessor lease.Lessor
bemu sync.RWMutex
defragMu sync.Mutex
be backend.Backend
beHooks *serverstorage.BackendHooks
authStore auth.AuthStore
Expand Down Expand Up @@ -975,8 +976,10 @@ func (s *EtcdServer) Cleanup() {
}

func (s *EtcdServer) Defragment() error {
s.bemu.Lock()
defer s.bemu.Unlock()
s.defragMu.Lock()
defer s.defragMu.Unlock()
s.bemu.RLock()
defer s.bemu.RUnlock()
return s.be.Defrag()
}

Expand Down
203 changes: 147 additions & 56 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package backend

import (
"errors"
"fmt"
"hash/crc32"
"io"
Expand All @@ -28,6 +29,7 @@ import (
"go.uber.org/zap"

bolt "go.etcd.io/bbolt"
bolterrors "go.etcd.io/bbolt/errors"
"go.etcd.io/etcd/client/pkg/v3/verify"
)

Expand Down Expand Up @@ -469,20 +471,6 @@ func (b *backend) defrag() error {
isDefragActive.Set(1)
defer isDefragActive.Set(0)

// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()

// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()

// block concurrent read requests while resetting tx
b.readTx.Lock()
defer b.readTx.Unlock()

// Create a temporary file to ensure we start with a clean slate.
// Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
dir := filepath.Dir(b.db.Path())
Expand All @@ -500,63 +488,112 @@ func (b *backend) defrag() error {
// return nil, fmt.Errorf(defragOpenFileError)
return temp, nil
}
// Don't load tmp db into memory regardless of opening options
options.Mlock = false
tdbp := temp.Name()
tmpdb, err := bolt.Open(tdbp, 0o600, &options)
if err != nil {
temp.Close()
if rmErr := os.Remove(temp.Name()); rmErr != nil {
b.lg.Error(
"failed to remove temporary file",
b.lg.Error("failed to remove temporary file",
zap.String("path", temp.Name()),
zap.Error(rmErr),
)
}

return err
}

dbp := b.db.Path()
size1, sizeInUse1 := b.Size(), b.SizeInUse()
b.lg.Info(
"defragmenting",
b.lg.Info("defragmenting",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes", size1),
zap.String("current-db-size", humanize.Bytes(uint64(size1))),
zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
)

defer func() {
// NOTE: We should exit as soon as possible because that tx
// might be closed. The inflight request might use invalid
// tx and then panic as well. The real panic reason might be
// shadowed by new panic. So, we should fatal here with lock.
if rerr := recover(); rerr != nil {
b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr))
}
}()
// ============================================================
// PHASE 1: Commit pending writes, take a snapshot, install the
// journal, then unlock so writes can continue during the copy.
// ============================================================
b.batchTx.LockOutsideApply()

// Commit/stop and then reset current transactions (including the readTx)
b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil
b.batchTx.commit(false)

b.mu.RLock()
snapTx, snapErr := b.db.Begin(false)
b.mu.RUnlock()
if snapErr != nil {
b.batchTx.Unlock()
tmpdb.Close()
os.Remove(tdbp)
return fmt.Errorf("failed to begin snapshot tx for defrag: %w", snapErr)
}

journal := newDefragJournal()
b.batchTx.defragJournal = journal
b.batchTx.Unlock()

b.lg.Info("defrag: copying data (writes unlocked)")

// gofail: var defragBeforeCopy struct{}
err = defragdb(b.db, tmpdb, defragLimit)
err = defragFromTx(snapTx, tmpdb, defragLimit)
snapTx.Rollback()

if err != nil {
b.batchTx.LockOutsideApply()
b.batchTx.defragJournal = nil
b.batchTx.Unlock()
journal.close()
tmpdb.Close()
if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
}
os.RemoveAll(tdbp)
return err
}

// ============================================================
// PHASE 2: Lock writes, drain and replay the journal into the
// temp DB. This should be fast since it's only the delta.
// ============================================================
b.lg.Info("defrag: replaying journal")

// restore the bbolt transactions if defragmentation fails
b.batchTx.tx = b.unsafeBegin(true)
b.readTx.tx = b.unsafeBegin(false)
b.batchTx.LockOutsideApply()
b.batchTx.defragJournal = nil
journal.close()
ops := journal.drain()

if len(ops) > 0 {
b.lg.Info("defrag: replaying journal ops", zap.Int("count", len(ops)))
}

err = replayJournal(tmpdb, ops, defragLimit)
if err != nil {
b.batchTx.Unlock()
tmpdb.Close()
os.RemoveAll(tdbp)
return err
}

// ============================================================
// PHASE 3: Atomic switchover — close old db, rename, reopen.
// ============================================================
b.lg.Info("defrag: switching database")

b.mu.Lock()
b.readTx.Lock()

// NOTE: We should exit as soon as possible because that tx
// might be closed. The inflight request might use invalid
// tx and then panic as well. The real panic reason might be
// shadowed by new panic. So, we should fatal here with lock.
defer func() {
if rerr := recover(); rerr != nil {
b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr))
}
}()

b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil

err = b.db.Close()
if err != nil {
b.lg.Fatal("failed to close database", zap.Error(err))
Expand Down Expand Up @@ -585,12 +622,15 @@ func (b *backend) defrag() error {
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

b.readTx.Unlock()
b.mu.Unlock()
b.batchTx.Unlock()

took := time.Since(now)
defragSec.Observe(took.Seconds())

size2, sizeInUse2 := b.Size(), b.SizeInUse()
b.lg.Info(
"finished defragmenting directory",
b.lg.Info("finished defragmenting directory",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes-diff", size2-size1),
zap.Int64("current-db-size-bytes", size2),
Expand All @@ -603,11 +643,7 @@ func (b *backend) defrag() error {
return nil
}

func defragdb(odb, tmpdb *bolt.DB, limit int) error {
// gofail: var defragdbFail string
// return fmt.Errorf(defragdbFail)

// open a tx on tmpdb for writes
func defragFromTx(srcTx *bolt.Tx, tmpdb *bolt.DB, limit int) error {
tmptx, err := tmpdb.Begin(true)
if err != nil {
return err
Expand All @@ -618,18 +654,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
}()

// open a tx on old db for read
tx, err := odb.Begin(false)
if err != nil {
return err
}
defer tx.Rollback()

c := tx.Cursor()

c := srcTx.Cursor()
count := 0
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
b := srcTx.Bucket(next)
if b == nil {
return fmt.Errorf("backend: cannot defrag bucket %s", next)
}
Expand Down Expand Up @@ -665,6 +693,69 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return tmptx.Commit()
}

func replayJournal(tmpdb *bolt.DB, ops []defragJournalOp, limit int) error {
if len(ops) == 0 {
return nil
}

tx, err := tmpdb.Begin(true)
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
}
}()

count := 0
for _, op := range ops {
count++
if count > limit {
if err = tx.Commit(); err != nil {
return err
}
tx, err = tmpdb.Begin(true)
if err != nil {
return err
}
count = 0
}

switch op.opType {
case opCreateBucket:
if _, err = tx.CreateBucketIfNotExists(op.bucketName); err != nil {
return fmt.Errorf("replay: create bucket %s: %w", op.bucketName, err)
}
case opDeleteBucket:
if delErr := tx.DeleteBucket(op.bucketName); delErr != nil && !errors.Is(delErr, bolterrors.ErrBucketNotFound) {
return fmt.Errorf("replay: delete bucket %s: %w", op.bucketName, delErr)
}
case opPut:
b := tx.Bucket(op.bucketName)
if b == nil {
return fmt.Errorf("replay: bucket %s not found for put", op.bucketName)
}
if op.seq {
b.FillPercent = 0.9
}
if err = b.Put(op.key, op.value); err != nil {
return fmt.Errorf("replay: put in bucket %s: %w", op.bucketName, err)
}
case opDelete:
b := tx.Bucket(op.bucketName)
if b == nil {
continue
}
if err = b.Delete(op.key); err != nil {
return fmt.Errorf("replay: delete from bucket %s: %w", op.bucketName, err)
}
}
}

return tx.Commit()
}

func (b *backend) begin(write bool) *bolt.Tx {
b.mu.RLock()
tx := b.unsafeBegin(write)
Expand Down
Loading