-
Notifications
You must be signed in to change notification settings - Fork 2.3k
kvdb/sqlbase: make NextSequence atomic, using a single SQL statement #9676
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Roasbeef
wants to merge
6
commits into
lightningnetwork:master
Choose a base branch
from
Roasbeef:fix-next-seq-kv-sql
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
84e1002
kvdb/sqlbase: make NextSequence atomic, using a single SQL statement
Roasbeef 3050327
kvdb/sqlbase: optimize Delete operation for case where key exists
Roasbeef 23ab2d0
kvdb/sqlbase: optimize Delete for the rw cursor
Roasbeef d355e88
kvdb/sqlbase: add backend aware test for Delete before cursor positio…
Roasbeef 523d47a
kvdb: add benchmarks for Delete and NextSequence
Roasbeef e2f2347
build: temp replace for kvdb
Roasbeef File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,344 @@ | ||
| //go:build kvdb_postgres | ||
|
|
||
| package kvdb | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "testing" | ||
|
|
||
| "github.com/btcsuite/btcwallet/walletdb" | ||
| "github.com/lightningnetwork/lnd/kvdb/postgres" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| // BenchmarkPostgresCursorDelete benchmarks the cursor Delete operation with | ||
| // various dataset sizes. | ||
| func BenchmarkPostgresCursorDelete(b *testing.B) { | ||
| // Start embedded postgres instance for benchmarks. | ||
| stop, err := postgres.StartEmbeddedPostgres() | ||
| require.NoError(b, err) | ||
| b.Cleanup(func() { | ||
| if err := stop(); err != nil { | ||
| b.Logf("Failed to stop postgres: %v", err) | ||
| } | ||
| }) | ||
|
|
||
| sizes := []int{10, 100, 1000, 10000, 100000, 1000000} | ||
|
|
||
| for _, size := range sizes { | ||
| // Skip larger sizes on short benchmarks. | ||
| if testing.Short() && size > 10000 { | ||
| b.Skip("Skipping large dataset in short mode") | ||
| } | ||
|
|
||
| b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { | ||
| // Create a test database. | ||
| f, err := postgres.NewFixture("") | ||
| require.NoError(b, err) | ||
|
|
||
| // Pre-populate the database with test data. | ||
| b.Logf("Populating database with %d keys...", size) | ||
| err = Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| bucket, err := tx.CreateTopLevelBucket([]byte("bench")) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Insert test data in batches for efficiency. | ||
| batchSize := 1000 | ||
| for i := 0; i < size; i++ { | ||
| key := fmt.Sprintf("key%08d", i) | ||
| val := fmt.Sprintf("value%08d", i) | ||
| if err := bucket.Put([]byte(key), []byte(val)); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Commit batch periodically for large datasets. | ||
| if i > 0 && i%batchSize == 0 && size > 10000 { | ||
| return nil | ||
| } | ||
| } | ||
| return nil | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
|
|
||
| // For large datasets, commit remaining data. | ||
| if size > 10000 { | ||
| for i := (size / 1000) * 1000; i < size; i++ { | ||
| err = Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| bucket := tx.ReadWriteBucket([]byte("bench")) | ||
| key := fmt.Sprintf("key%08d", i) | ||
| val := fmt.Sprintf("value%08d", i) | ||
| return bucket.Put([]byte(key), []byte(val)) | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
| } | ||
| } | ||
|
|
||
| b.ResetTimer() | ||
|
|
||
| // Benchmark the delete operations. | ||
| for i := 0; i < b.N; i++ { | ||
| // Delete using cursor. | ||
| err = Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| bucket := tx.ReadWriteBucket([]byte("bench")) | ||
| cursor := bucket.ReadWriteCursor() | ||
|
|
||
| // Position cursor and delete. | ||
| idx := i % size | ||
| key := fmt.Sprintf("key%08d", idx) | ||
| k, _ := cursor.Seek([]byte(key)) | ||
| if k != nil { | ||
| return cursor.Delete() | ||
| } | ||
| return nil | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
|
|
||
| // Re-add the deleted key for next iteration. | ||
| b.StopTimer() | ||
| err = Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| bucket := tx.ReadWriteBucket([]byte("bench")) | ||
| idx := i % size | ||
| key := fmt.Sprintf("key%08d", idx) | ||
| val := fmt.Sprintf("value%08d", idx) | ||
| return bucket.Put([]byte(key), []byte(val)) | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
| b.StartTimer() | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // BenchmarkPostgresBucketDelete benchmarks the bucket Delete operation with | ||
| // various dataset sizes. | ||
| func BenchmarkPostgresBucketDelete(b *testing.B) { | ||
| // Start embedded postgres instance for benchmarks. | ||
| stop, err := postgres.StartEmbeddedPostgres() | ||
| require.NoError(b, err) | ||
| b.Cleanup(func() { | ||
| if err := stop(); err != nil { | ||
| b.Logf("Failed to stop postgres: %v", err) | ||
| } | ||
| }) | ||
|
|
||
| sizes := []int{10, 100, 1000, 10000, 100000, 1000000} | ||
|
|
||
| for _, size := range sizes { | ||
| // Skip larger sizes on short benchmarks. | ||
| if testing.Short() && size > 10000 { | ||
| b.Skip("Skipping large dataset in short mode") | ||
| } | ||
|
|
||
| b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { | ||
| // Create a test database. | ||
| f, err := postgres.NewFixture("") | ||
| require.NoError(b, err) | ||
|
|
||
| // Pre-populate the database with test data. | ||
| b.Logf("Populating database with %d keys...", size) | ||
| err = Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| bucket, err := tx.CreateTopLevelBucket([]byte("bench")) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Insert test data. | ||
| for i := 0; i < size; i++ { | ||
| key := fmt.Sprintf("key%08d", i) | ||
| val := fmt.Sprintf("value%08d", i) | ||
| if err := bucket.Put([]byte(key), []byte(val)); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
|
|
||
| b.ResetTimer() | ||
|
|
||
| // Benchmark the delete operations. | ||
| for i := 0; i < b.N; i++ { | ||
| // Delete directly on bucket. | ||
| err = Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| bucket := tx.ReadWriteBucket([]byte("bench")) | ||
|
|
||
| idx := i % size | ||
| key := fmt.Sprintf("key%08d", idx) | ||
| return bucket.Delete([]byte(key)) | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
|
|
||
| // Re-add the deleted key for next iteration. | ||
| b.StopTimer() | ||
| err = Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| bucket := tx.ReadWriteBucket([]byte("bench")) | ||
| idx := i % size | ||
| key := fmt.Sprintf("key%08d", idx) | ||
| val := fmt.Sprintf("value%08d", idx) | ||
| return bucket.Put([]byte(key), []byte(val)) | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
| b.StartTimer() | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // BenchmarkPostgresBucketDeleteNonExistent benchmarks deleting non-existent | ||
| // keys with various dataset sizes. | ||
| func BenchmarkPostgresBucketDeleteNonExistent(b *testing.B) { | ||
| // Start embedded postgres instance for benchmarks. | ||
| stop, err := postgres.StartEmbeddedPostgres() | ||
| require.NoError(b, err) | ||
| b.Cleanup(func() { | ||
| if err := stop(); err != nil { | ||
| b.Logf("Failed to stop postgres: %v", err) | ||
| } | ||
| }) | ||
|
|
||
| // Test with different dataset sizes to see if having more data affects | ||
| // non-existent key deletion performance. | ||
| sizes := []int{0, 1000, 100000, 1000000} | ||
|
|
||
| for _, size := range sizes { | ||
| // Skip larger sizes on short benchmarks. | ||
| if testing.Short() && size > 10000 { | ||
| b.Skip("Skipping large dataset in short mode") | ||
| } | ||
|
|
||
| b.Run(fmt.Sprintf("dbsize=%d", size), func(b *testing.B) { | ||
| // Create a test database. | ||
| f, err := postgres.NewFixture("") | ||
| require.NoError(b, err) | ||
|
|
||
| // Create bucket and optionally populate it. | ||
| err = Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| bucket, err := tx.CreateTopLevelBucket([]byte("bench")) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Populate with existing data if requested. | ||
| for i := 0; i < size; i++ { | ||
| key := fmt.Sprintf("existing%08d", i) | ||
| val := fmt.Sprintf("value%08d", i) | ||
| if err := bucket.Put([]byte(key), []byte(val)); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
|
|
||
| b.ResetTimer() | ||
|
|
||
| // Benchmark deleting non-existent keys. | ||
| for i := 0; i < b.N; i++ { | ||
| err := Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| bucket := tx.ReadWriteBucket([]byte("bench")) | ||
|
|
||
| // Delete a non-existent key. | ||
| key := fmt.Sprintf("nonexistent%08d", i) | ||
| return bucket.Delete([]byte(key)) | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // BenchmarkPostgresNextSequence benchmarks the NextSequence operation. | ||
| func BenchmarkPostgresNextSequence(b *testing.B) { | ||
| // Start embedded postgres instance for benchmarks. | ||
| stop, err := postgres.StartEmbeddedPostgres() | ||
| require.NoError(b, err) | ||
| b.Cleanup(func() { | ||
| if err := stop(); err != nil { | ||
| b.Logf("Failed to stop postgres: %v", err) | ||
| } | ||
| }) | ||
|
|
||
| b.Run("single_bucket", func(b *testing.B) { | ||
| // Create a test database. | ||
| f, err := postgres.NewFixture("") | ||
| require.NoError(b, err) | ||
|
|
||
| // Create a single bucket for benchmarking. | ||
| err = Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| top, err := tx.CreateTopLevelBucket([]byte("bench")) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| _, err = top.CreateBucket([]byte("subbucket")) | ||
| return err | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
|
|
||
| b.ResetTimer() | ||
|
|
||
| // Benchmark NextSequence calls on a single bucket. | ||
| for i := 0; i < b.N; i++ { | ||
| err := Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| top := tx.ReadWriteBucket([]byte("bench")) | ||
| bucket := top.NestedReadWriteBucket([]byte("subbucket")) | ||
|
|
||
| seq, err := bucket.NextSequence() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Verify sequence is correct. | ||
| expectedSeq := uint64(i + 1) | ||
| if seq != expectedSeq { | ||
| b.Errorf("Expected sequence %d, got %d", expectedSeq, seq) | ||
| } | ||
|
|
||
| return nil | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
| } | ||
| }) | ||
|
|
||
| b.Run("concurrent_buckets", func(b *testing.B) { | ||
| // Create a test database. | ||
| f, err := postgres.NewFixture("") | ||
| require.NoError(b, err) | ||
|
|
||
| // Create multiple buckets to simulate concurrent access patterns. | ||
| numBuckets := 10 | ||
| err = Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| top, err := tx.CreateTopLevelBucket([]byte("bench")) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| for i := 0; i < numBuckets; i++ { | ||
| bucketName := fmt.Sprintf("bucket%d", i) | ||
| _, err = top.CreateBucket([]byte(bucketName)) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
|
|
||
| b.ResetTimer() | ||
|
|
||
| // Benchmark NextSequence calls rotating through buckets. | ||
| for i := 0; i < b.N; i++ { | ||
| bucketIdx := i % numBuckets | ||
| err := Update(f.Db, func(tx walletdb.ReadWriteTx) error { | ||
| top := tx.ReadWriteBucket([]byte("bench")) | ||
| bucketName := fmt.Sprintf("bucket%d", bucketIdx) | ||
| bucket := top.NestedReadWriteBucket([]byte(bucketName)) | ||
|
|
||
| _, err := bucket.NextSequence() | ||
| return err | ||
| }, func() {}) | ||
| require.NoError(b, err) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For large datasets, this condition will cause the first
Updatecall to return early, leaving the database only partially populated. Subsequent iterations will then operate on an incomplete dataset, leading to inaccurate benchmark results. Consider refactoring to ensure all data is populated before benchmarking.