diff --git a/go.mod b/go.mod index 4a5beee6234..dafdc9105c8 100644 --- a/go.mod +++ b/go.mod @@ -202,6 +202,9 @@ require ( // store have been included in a tagged sqldb version. replace github.com/lightningnetwork/lnd/sqldb => ./sqldb +// Replace for local kvdb package modifications. +replace github.com/lightningnetwork/lnd/kvdb => ./kvdb + // This replace is for https://github.com/advisories/GHSA-25xm-hr59-7c27 replace github.com/ulikunitz/xz => github.com/ulikunitz/xz v0.5.11 diff --git a/go.sum b/go.sum index 0f8782d9904..aa6d8bdd023 100644 --- a/go.sum +++ b/go.sum @@ -369,8 +369,6 @@ github.com/lightningnetwork/lnd/fn/v2 v2.0.8 h1:r2SLz7gZYQPVc3IZhU82M66guz3Zk2oY github.com/lightningnetwork/lnd/fn/v2 v2.0.8/go.mod h1:TOzwrhjB/Azw1V7aa8t21ufcQmdsQOQMDtxVOQWNl8s= github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI= github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ= -github.com/lightningnetwork/lnd/kvdb v1.4.16 h1:9BZgWdDfjmHRHLS97cz39bVuBAqMc4/p3HX1xtUdbDI= -github.com/lightningnetwork/lnd/kvdb v1.4.16/go.mod h1:HW+bvwkxNaopkz3oIgBV6NEnV4jCEZCACFUcNg4xSjM= github.com/lightningnetwork/lnd/queue v1.1.1 h1:99ovBlpM9B0FRCGYJo6RSFDlt8/vOkQQZznVb18iNMI= github.com/lightningnetwork/lnd/queue v1.1.1/go.mod h1:7A6nC1Qrm32FHuhx/mi1cieAiBZo5O6l8IBIoQxvkz4= github.com/lightningnetwork/lnd/ticker v1.1.1 h1:J/b6N2hibFtC7JLV77ULQp++QLtCwT6ijJlbdiZFbSM= diff --git a/kvdb/bolt_test.go b/kvdb/bolt_test.go index 69366c5b5e2..f86aed675cc 100644 --- a/kvdb/bolt_test.go +++ b/kvdb/bolt_test.go @@ -23,6 +23,10 @@ func TestBolt(t *testing.T) { name: "read write cursor", test: testReadWriteCursor, }, + { + name: "read write cursor delete before positioning", + test: testReadWriteCursorDeleteBeforePositioning, + }, { name: "read write cursor with bucket and value", test: testReadWriteCursorWithBucketAndValue, diff --git a/kvdb/postgres_bench_test.go b/kvdb/postgres_bench_test.go new file 
mode 100644
index 00000000000..47aa7d90136
--- /dev/null
+++ b/kvdb/postgres_bench_test.go
@@ -0,0 +1,344 @@
+//go:build kvdb_postgres
+
+package kvdb
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/btcsuite/btcwallet/walletdb"
+	"github.com/lightningnetwork/lnd/kvdb/postgres"
+	"github.com/stretchr/testify/require"
+)
+
+// BenchmarkPostgresCursorDelete benchmarks the cursor Delete operation with
+// various dataset sizes.
+func BenchmarkPostgresCursorDelete(b *testing.B) {
+	// Start embedded postgres instance for benchmarks.
+	stop, err := postgres.StartEmbeddedPostgres()
+	require.NoError(b, err)
+	b.Cleanup(func() {
+		if err := stop(); err != nil {
+			b.Logf("Failed to stop postgres: %v", err)
+		}
+	})
+
+	sizes := []int{10, 100, 1000, 10000, 100000, 1000000}
+
+	for _, size := range sizes {
+		// Skip larger sizes in short mode. Use continue rather than
+		// b.Skip: b.Skip aborts the whole parent benchmark and marks
+		// it skipped, while we only want to omit this one size.
+		if testing.Short() && size > 10000 {
+			continue
+		}
+
+		b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
+			// Create a test database.
+			f, err := postgres.NewFixture("")
+			require.NoError(b, err)
+
+			// Pre-populate the database with test data in a
+			// single transaction. (An earlier revision returned
+			// early from the Update closure to "batch" large
+			// datasets, which committed only the first 1000 keys
+			// and silently dropped the rest, so the large-size
+			// benchmarks ran against a mostly-empty table.)
+			b.Logf("Populating database with %d keys...", size)
+			err = Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+				bucket, err := tx.CreateTopLevelBucket([]byte("bench"))
+				if err != nil {
+					return err
+				}
+
+				for i := 0; i < size; i++ {
+					key := fmt.Sprintf("key%08d", i)
+					val := fmt.Sprintf("value%08d", i)
+					if err := bucket.Put([]byte(key), []byte(val)); err != nil {
+						return err
+					}
+				}
+				return nil
+			}, func() {})
+			require.NoError(b, err)
+
+			b.ResetTimer()
+
+			// Benchmark the delete operations.
+			for i := 0; i < b.N; i++ {
+				// Delete using cursor.
+				err = Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+					bucket := tx.ReadWriteBucket([]byte("bench"))
+					cursor := bucket.ReadWriteCursor()
+
+					// Position cursor and delete.
+					idx := i % size
+					key := fmt.Sprintf("key%08d", idx)
+					k, _ := cursor.Seek([]byte(key))
+					if k != nil {
+						return cursor.Delete()
+					}
+					return nil
+				}, func() {})
+				require.NoError(b, err)
+
+				// Re-add the deleted key for next iteration,
+				// outside the timed region.
+				b.StopTimer()
+				err = Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+					bucket := tx.ReadWriteBucket([]byte("bench"))
+					idx := i % size
+					key := fmt.Sprintf("key%08d", idx)
+					val := fmt.Sprintf("value%08d", idx)
+					return bucket.Put([]byte(key), []byte(val))
+				}, func() {})
+				require.NoError(b, err)
+				b.StartTimer()
+			}
+		})
+	}
+}
+
+// BenchmarkPostgresBucketDelete benchmarks the bucket Delete operation with
+// various dataset sizes.
+func BenchmarkPostgresBucketDelete(b *testing.B) {
+	// Start embedded postgres instance for benchmarks.
+	stop, err := postgres.StartEmbeddedPostgres()
+	require.NoError(b, err)
+	b.Cleanup(func() {
+		if err := stop(); err != nil {
+			b.Logf("Failed to stop postgres: %v", err)
+		}
+	})
+
+	sizes := []int{10, 100, 1000, 10000, 100000, 1000000}
+
+	for _, size := range sizes {
+		// Skip larger sizes in short mode (continue keeps the
+		// smaller sub-benchmarks running).
+		if testing.Short() && size > 10000 {
+			continue
+		}
+
+		b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
+			// Create a test database.
+			f, err := postgres.NewFixture("")
+			require.NoError(b, err)
+
+			// Pre-populate the database with test data.
+			b.Logf("Populating database with %d keys...", size)
+			err = Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+				bucket, err := tx.CreateTopLevelBucket([]byte("bench"))
+				if err != nil {
+					return err
+				}
+
+				// Insert test data.
+				for i := 0; i < size; i++ {
+					key := fmt.Sprintf("key%08d", i)
+					val := fmt.Sprintf("value%08d", i)
+					if err := bucket.Put([]byte(key), []byte(val)); err != nil {
+						return err
+					}
+				}
+				return nil
+			}, func() {})
+			require.NoError(b, err)
+
+			b.ResetTimer()
+
+			// Benchmark the delete operations.
+			for i := 0; i < b.N; i++ {
+				// Delete directly on bucket.
+				err = Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+					bucket := tx.ReadWriteBucket([]byte("bench"))
+
+					idx := i % size
+					key := fmt.Sprintf("key%08d", idx)
+					return bucket.Delete([]byte(key))
+				}, func() {})
+				require.NoError(b, err)
+
+				// Re-add the deleted key for next iteration,
+				// outside the timed region.
+				b.StopTimer()
+				err = Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+					bucket := tx.ReadWriteBucket([]byte("bench"))
+					idx := i % size
+					key := fmt.Sprintf("key%08d", idx)
+					val := fmt.Sprintf("value%08d", idx)
+					return bucket.Put([]byte(key), []byte(val))
+				}, func() {})
+				require.NoError(b, err)
+				b.StartTimer()
+			}
+		})
+	}
+}
+
+// BenchmarkPostgresBucketDeleteNonExistent benchmarks deleting non-existent
+// keys with various dataset sizes.
+func BenchmarkPostgresBucketDeleteNonExistent(b *testing.B) {
+	// Start embedded postgres instance for benchmarks.
+	stop, err := postgres.StartEmbeddedPostgres()
+	require.NoError(b, err)
+	b.Cleanup(func() {
+		if err := stop(); err != nil {
+			b.Logf("Failed to stop postgres: %v", err)
+		}
+	})
+
+	// Test with different dataset sizes to see if having more data affects
+	// non-existent key deletion performance.
+	sizes := []int{0, 1000, 100000, 1000000}
+
+	for _, size := range sizes {
+		// Skip larger sizes in short mode. Use continue rather than
+		// b.Skip so the smaller sub-benchmarks still run and the
+		// parent benchmark is not marked as skipped.
+		if testing.Short() && size > 10000 {
+			continue
+		}
+
+		b.Run(fmt.Sprintf("dbsize=%d", size), func(b *testing.B) {
+			// Create a test database.
+			f, err := postgres.NewFixture("")
+			require.NoError(b, err)
+
+			// Create bucket and optionally populate it.
+			err = Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+				bucket, err := tx.CreateTopLevelBucket([]byte("bench"))
+				if err != nil {
+					return err
+				}
+
+				// Populate with existing data if requested.
+				for i := 0; i < size; i++ {
+					key := fmt.Sprintf("existing%08d", i)
+					val := fmt.Sprintf("value%08d", i)
+					if err := bucket.Put([]byte(key), []byte(val)); err != nil {
+						return err
+					}
+				}
+				return nil
+			}, func() {})
+			require.NoError(b, err)
+
+			b.ResetTimer()
+
+			// Benchmark deleting non-existent keys.
+			for i := 0; i < b.N; i++ {
+				err := Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+					bucket := tx.ReadWriteBucket([]byte("bench"))
+
+					// Delete a non-existent key.
+					key := fmt.Sprintf("nonexistent%08d", i)
+					return bucket.Delete([]byte(key))
+				}, func() {})
+				require.NoError(b, err)
+			}
+		})
+	}
+}
+
+// BenchmarkPostgresNextSequence benchmarks the NextSequence operation.
+func BenchmarkPostgresNextSequence(b *testing.B) {
+	// Start embedded postgres instance for benchmarks.
+	stop, err := postgres.StartEmbeddedPostgres()
+	require.NoError(b, err)
+	b.Cleanup(func() {
+		if err := stop(); err != nil {
+			b.Logf("Failed to stop postgres: %v", err)
+		}
+	})
+
+	b.Run("single_bucket", func(b *testing.B) {
+		// Create a test database.
+		f, err := postgres.NewFixture("")
+		require.NoError(b, err)
+
+		// Create a single bucket for benchmarking.
+		err = Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+			top, err := tx.CreateTopLevelBucket([]byte("bench"))
+			if err != nil {
+				return err
+			}
+			_, err = top.CreateBucket([]byte("subbucket"))
+			return err
+		}, func() {})
+		require.NoError(b, err)
+
+		b.ResetTimer()
+
+		// Benchmark NextSequence calls on a single bucket.
+		for i := 0; i < b.N; i++ {
+			err := Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+				top := tx.ReadWriteBucket([]byte("bench"))
+				bucket := top.NestedReadWriteBucket([]byte("subbucket"))
+
+				seq, err := bucket.NextSequence()
+				if err != nil {
+					return err
+				}
+
+				// Verify sequence is correct. The fixture is
+				// created inside this b.Run closure, so each
+				// benchmark invocation starts from a fresh
+				// sequence and the i'th call should yield
+				// i+1 (presumably — confirm NewFixture gives
+				// an empty database per call).
+				expectedSeq := uint64(i + 1)
+				if seq != expectedSeq {
+					b.Errorf("Expected sequence %d, got %d", expectedSeq, seq)
+				}
+
+				return nil
+			}, func() {})
+			require.NoError(b, err)
+		}
+	})
+
+	b.Run("concurrent_buckets", func(b *testing.B) {
+		// Create a test database.
+		f, err := postgres.NewFixture("")
+		require.NoError(b, err)
+
+		// Create multiple buckets to simulate concurrent access patterns.
+		numBuckets := 10
+		err = Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+			top, err := tx.CreateTopLevelBucket([]byte("bench"))
+			if err != nil {
+				return err
+			}
+			for i := 0; i < numBuckets; i++ {
+				bucketName := fmt.Sprintf("bucket%d", i)
+				_, err = top.CreateBucket([]byte(bucketName))
+				if err != nil {
+					return err
+				}
+			}
+			return nil
+		}, func() {})
+		require.NoError(b, err)
+
+		b.ResetTimer()
+
+		// Benchmark NextSequence calls rotating through buckets.
+		for i := 0; i < b.N; i++ {
+			bucketIdx := i % numBuckets
+			err := Update(f.Db, func(tx walletdb.ReadWriteTx) error {
+				top := tx.ReadWriteBucket([]byte("bench"))
+				bucketName := fmt.Sprintf("bucket%d", bucketIdx)
+				bucket := top.NestedReadWriteBucket([]byte(bucketName))
+
+				_, err := bucket.NextSequence()
+				return err
+			}, func() {})
+			require.NoError(b, err)
+		}
+	})
+}
diff --git a/kvdb/postgres_test.go b/kvdb/postgres_test.go
index 4645e6a3084..0298c2fa3e2 100644
--- a/kvdb/postgres_test.go
+++ b/kvdb/postgres_test.go
@@ -46,6 +46,16 @@ func TestPostgres(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "read write cursor delete before positioning",
+			test: testReadWriteCursorDeleteBeforePositioning,
+			expectedDb: m{
+				"test_kv": []m{
+					{"id": int64(1), "key": "test", "parent_id": nil, "sequence": nil, "value": nil},
+					{"id": int64(3), "key": "key2", "parent_id": int64(1), "sequence": nil, "value": "val2"},
+				},
+			},
+		},
 		{
 			name: "read write cursor with bucket and value",
 			test: testReadWriteCursorWithBucketAndValue,
diff --git a/kvdb/readwrite_cursor_test.go b/kvdb/readwrite_cursor_test.go
index de70196b336..6992ef0d679 100644
--- a/kvdb/readwrite_cursor_test.go
+++ b/kvdb/readwrite_cursor_test.go
@@ -255,6 +255,66 @@ func testReadWriteCursor(t *testing.T, db walletdb.DB) {
 	require.NoError(t, err)
 }
 
+// testReadWriteCursorDeleteBeforePositioning exercises calling Delete on a
+// cursor before positioning it. Note: the test body does NOT require an
+// error — it accepts either a panic (bolt) or a nil-returning no-op (SQL
+// backends) and then verifies no data was removed.
+func testReadWriteCursorDeleteBeforePositioning(t *testing.T, db walletdb.DB) {
+	require.NoError(t, Update(db, func(tx walletdb.ReadWriteTx) error {
+		b, err := tx.CreateTopLevelBucket([]byte("test"))
+		require.NoError(t, err)
+		require.NotNil(t, b)
+
+		require.NoError(t, b.Put([]byte("key1"), []byte("val1")))
+		require.NoError(t, b.Put([]byte("key2"), []byte("val2")))
+
+		// Create a cursor but don't position it.
+		cursor := b.ReadWriteCursor()
+
+		// Test Delete on unpositioned cursor.
The behavior differs by
+		// backend:
+		//
+		// - BoltDB: panics
+		// - SQL backends: returns nil (no-op)
+		//
+		// We'll use panic recovery to handle both cases.
+		didPanic := false
+		func() {
+			defer func() {
+				if r := recover(); r != nil {
+					didPanic = true
+				}
+			}()
+
+			err = cursor.Delete()
+		}()
+
+		if didPanic {
+			// This is expected for BoltDB
+			t.Log("Delete on unpositioned cursor " +
+				"panicked (expected for BoltDB)")
+		} else {
+			// For SQL backends, it should not error.
+			require.NoError(t, err)
+			t.Log("Delete on unpositioned cursor " +
+				"returned nil (expected for SQL backends)")
+		}
+
+		// Verify that no data was deleted.
+		require.Equal(t, []byte("val1"), b.Get([]byte("key1")))
+		require.Equal(t, []byte("val2"), b.Get([]byte("key2")))
+
+		// Now position the cursor and delete should work.
+		k, v := cursor.First()
+		require.Equal(t, []byte("key1"), k)
+		require.Equal(t, []byte("val1"), v)
+
+		require.NoError(t, cursor.Delete())
+		require.Nil(t, b.Get([]byte("key1")))
+		require.Equal(t, []byte("val2"), b.Get([]byte("key2")))
+
+		return nil
+	}, func() {}))
+}
+
 // testReadWriteCursorWithBucketAndValue tests that cursors are able to iterate
 // over both bucket and value keys if both are present in the iterated bucket.
 func testReadWriteCursorWithBucketAndValue(t *testing.T, db walletdb.DB) {
diff --git a/kvdb/sqlbase/readwrite_bucket.go b/kvdb/sqlbase/readwrite_bucket.go
index f8913723f86..6c3317b9196 100644
--- a/kvdb/sqlbase/readwrite_bucket.go
+++ b/kvdb/sqlbase/readwrite_bucket.go
@@ -323,46 +323,71 @@ func (b *readWriteBucket) Put(key, value []byte) error {
 	return nil
 }
 
-// Delete deletes the key/value pointed to by the passed key.
-// Returns ErrKeyRequired if the passed key is empty.
+// Delete deletes the key/value pointed to by the passed key. Returns
+// ErrKeyRequired if the passed key is empty.
 func (b *readWriteBucket) Delete(key []byte) error {
 	if key == nil {
+		// Deleting a nil key has always been a no-op here; keep that
+		// behavior.
 		return nil
 	}
+
+	if len(key) == 0 {
+		return walletdb.ErrKeyRequired
+	}
 
-	// Check to see if a bucket with this key exists.
-	var dummy int
+	// First, try to delete the key directly, but only if it's a value
+	// (value IS NOT NULL).
+	//
+	// NOTE(review): fmt.Errorf is used below — confirm this file already
+	// imports "fmt", since this patch adds no import hunk for it.
+	result, err := b.tx.Exec(
+		"DELETE FROM "+b.table+" WHERE key=$1 AND "+
+			parentSelector(b.id)+" AND value IS NOT NULL",
+		key,
+	)
+	if err != nil {
+		return fmt.Errorf("error attempting to delete "+
+			"key %x: %w", key, err)
+	}
+
+	rowsAffected, err := result.RowsAffected()
+	if err != nil {
+		return fmt.Errorf("error getting rows affected for "+
+			"key %x: %w", key, err)
+	}
+
+	// If we deleted exactly one row, we're done. It was a key-value pair.
+	if rowsAffected == 1 {
+		return nil
+	}
+
+	// If rowsAffected is 0, it means either:
+	// 1. The key doesn't exist at all.
+	// 2. The key exists, but `value IS NULL` (it's a bucket).
+	//
+	// We need to check for case 2 to return ErrIncompatibleValue.
+	var existsAsBucket int
 	row, cancel := b.tx.QueryRow(
 		"SELECT 1 FROM "+b.table+" WHERE "+parentSelector(b.id)+
 			" AND key=$1 AND value IS NULL",
 		key,
 	)
 	defer cancel()
 
-	err := row.Scan(&dummy)
-	switch {
-	// No bucket exists, proceed to deletion of the key.
-	case err == sql.ErrNoRows:
+	err = row.Scan(&existsAsBucket)
 
-	case err != nil:
-		return err
-
-	// Bucket exists.
-	default:
+	if err == nil {
+		// Scan succeeded without error, meaning we found a row where
+		// value IS NULL. It's a bucket.
 		return walletdb.ErrIncompatibleValue
 	}
 
-	_, err = b.tx.Exec(
-		"DELETE FROM "+b.table+" WHERE key=$1 AND "+
-			parentSelector(b.id)+" AND value IS NOT NULL",
-		key,
-	)
-	if err != nil {
-		return err
+	if err == sql.ErrNoRows {
+		// Key didn't exist as a value (rowsAffected==0) AND it doesn't
+		// exist as a bucket. So the key just wasn't found. Deleting a
+		// non-existent key is a no-op.
+		return nil
 	}
 
-	return nil
+	// Some other database error occurred during the check.
+	return fmt.Errorf("error checking if key %x exists as bucket: %w",
+		key, err)
 }
 
 // ReadWriteCursor returns a new read-write cursor for this bucket.
@@ -379,9 +404,40 @@ func (b *readWriteBucket) Tx() walletdb.ReadWriteTx {
 // Note that this is not a thread safe function and as such it must not be used
 // for synchronization.
 func (b *readWriteBucket) NextSequence() (uint64, error) {
-	seq := b.Sequence() + 1
+	if b.id == nil {
+		// Sequence numbers are only supported for nested buckets, as
+		// top-level buckets don't have a unique row ID in the same way.
+		panic("sequence not supported on top level bucket")
+	}
+
+	var nextSeq uint64
+	row, cancel := b.tx.QueryRow(
+		// Atomically increment the sequence number for the bucket ID.
+		// We use COALESCE to handle the case where the sequence is NULL
+		// (never set before), treating it as 0 before incrementing. The
+		// RETURNING clause gives us the new value.
+		"UPDATE "+b.table+" SET sequence = COALESCE(sequence, 0) + 1 "+
+			"WHERE id=$1 RETURNING sequence",
+		b.id,
+	)
+	defer cancel()
+
+	err := row.Scan(&nextSeq)
+	if err != nil {
+		// sql.ErrNoRows means the bucket ID didn't exist, which
+		// shouldn't happen if the bucket was created correctly. Use a
+		// direct comparison rather than errors.Is, matching the other
+		// sql.ErrNoRows checks in this file and avoiding a new
+		// "errors" import this patch never adds.
+		if err == sql.ErrNoRows {
+			return 0, fmt.Errorf("bucket with id %d not found "+
+				"for sequence update", *b.id)
+		}
+
+		// Return other potential scan or query errors directly.
+		return 0, fmt.Errorf("failed to update and retrieve "+
+			"sequence for bucket id %d: %w", *b.id, err)
+	}
 
-	return seq, b.SetSequence(seq)
+	return nextSeq, nil
 }
 
 // SetSequence updates the sequence number for the bucket.
diff --git a/kvdb/sqlbase/readwrite_cursor.go b/kvdb/sqlbase/readwrite_cursor.go
index 3124f915c0e..d8e32621a53 100644
--- a/kvdb/sqlbase/readwrite_cursor.go
+++ b/kvdb/sqlbase/readwrite_cursor.go
@@ -184,49 +184,57 @@ func (c *readWriteCursor) Seek(seek []byte) ([]byte, []byte) {
 }
 
 // Delete removes the current key/value pair the cursor is at without
-// invalidating the cursor. Returns ErrIncompatibleValue if attempted
-// when the cursor points to a nested bucket.
+// invalidating the cursor. Returns ErrIncompatibleValue if attempted when the
+// cursor points to a nested bucket.
 func (c *readWriteCursor) Delete() error {
-	// Get first record at or after cursor.
-	var key []byte
+	// Get the first record at or after the cursor, and learn in the same
+	// round trip whether that row is a nested bucket (value IS NULL).
+	//
+	// NOTE: a data-modifying CTE (WITH ... DELETE ...) would fold the
+	// delete into this statement as well, but that is Postgres-only
+	// syntax — sqlbase is shared with the SQLite backend, where CTEs may
+	// only contain SELECT. Both statements below run inside the same
+	// transaction, so the two-step form remains consistent.
+	var (
+		key      []byte
+		isBucket int
+	)
 	row, cancel := c.bucket.tx.QueryRow(
-		"SELECT key FROM "+c.bucket.table+" WHERE "+
-			parentSelector(c.bucket.id)+
-			" AND key>=$1 ORDER BY key LIMIT 1",
+		"SELECT key, CASE WHEN value IS NULL THEN 1 ELSE 0 END "+
+			"FROM "+c.bucket.table+" WHERE "+
+			parentSelector(c.bucket.id)+
+			" AND key>=$1 ORDER BY key LIMIT 1",
 		c.currKey,
 	)
 	defer cancel()
 
-	err := row.Scan(&key)
+	err := row.Scan(&key, &isBucket)
 	switch {
+	// No key at or after the cursor position: nothing to delete.
 	case err == sql.ErrNoRows:
 		return nil
 
 	case err != nil:
 		panic(err)
 	}
 
+	// The row the cursor points at is a nested bucket; deleting it
+	// through a cursor is not allowed.
+	if isBucket == 1 {
+		return walletdb.ErrIncompatibleValue
+	}
+
 	// Delete record.
-	result, err := c.bucket.tx.Exec(
+	_, err = c.bucket.tx.Exec(
 		"DELETE FROM "+c.bucket.table+" WHERE "+
 			parentSelector(c.bucket.id)+
 			" AND key=$1 AND value IS NOT NULL",
 		key,
 	)
-	if err != nil {
-		panic(err)
-	}
-
-	rows, err := result.RowsAffected()
-	if err != nil {
-		return err
-	}
-
-	// The key exists but nothing has been deleted. This means that the key
-	// must have been a bucket key.
-	if rows != 1 {
-		return walletdb.ErrIncompatibleValue
-	}
-
 	return err
 }