Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 47 additions & 30 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type Database struct {
config config.StateStoreConfig
// Earliest version for db after pruning
earliestVersion int64
// Latest version for db
latestVersion int64

// Map of module to when each was last updated
// Used in pruning to skip over stores that have not been updated recently
Expand Down Expand Up @@ -120,15 +122,24 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
}

// Initialize earliest version
earliestVersion, err := retrieveEarliestVersion(db)
if err != nil {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
return nil, fmt.Errorf("failed to retrieve earliest version: %w", err)
}

// Initialize latest version
latestVersion, err := retrieveLatestVersion(db)
if err != nil {
return nil, fmt.Errorf("failed to retrieve latest version: %w", err)
}

database := &Database{
storage: db,
asyncWriteWG: sync.WaitGroup{},
config: config,
earliestVersion: earliestVersion,
latestVersion: latestVersion,
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
}

Expand Down Expand Up @@ -176,43 +187,47 @@ func (db *Database) Close() error {
}

func (db *Database) SetLatestVersion(version int64) error {
db.latestVersion = version
var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts)
return err
}

func (db *Database) GetLatestVersion() (int64, error) {
bz, closer, err := db.storage.Get([]byte(latestVersionKey))
if err != nil {
func (db *Database) GetLatestVersion() int64 {
return db.latestVersion
}

// Retrieve latestVersion from db, if not found, return 0.
func retrieveLatestVersion(db *pebble.DB) (int64, error) {
bz, closer, err := db.Get([]byte(latestVersionKey))
defer func() {
if closer != nil {
_ = closer.Close()
}
}()
if err != nil || len(bz) == 0 {
if errors.Is(err, pebble.ErrNotFound) {
// in case of a fresh database
return 0, nil
}

return 0, err
}

if len(bz) == 0 {
return 0, closer.Close()
}

return int64(binary.LittleEndian.Uint64(bz)), closer.Close()
return int64(binary.LittleEndian.Uint64(bz)), nil
}

func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error {
if version > db.earliestVersion || ignoreVersion {
db.earliestVersion = version

var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts)
}
return nil
}

func (db *Database) GetEarliestVersion() (int64, error) {
return db.earliestVersion, nil
func (db *Database) GetEarliestVersion() int64 {
return db.earliestVersion
}

func (db *Database) SetLastRangeHashed(latestHashed int64) error {
Expand All @@ -237,23 +252,22 @@ func (db *Database) GetLastRangeHashed() (int64, error) {
return cachedValue, nil
}

// Retrieves earliest version from db
// Retrieves earliest version from db, if not found, return 0
func retrieveEarliestVersion(db *pebble.DB) (int64, error) {
bz, closer, err := db.Get([]byte(earliestVersionKey))
if err != nil {
defer func() {
if closer != nil {
_ = closer.Close()
}
}()
if err != nil || len(bz) == 0 {
if errors.Is(err, pebble.ErrNotFound) {
// in case of a fresh database
return 0, nil
}

return 0, err
}

if len(bz) == 0 {
return 0, closer.Close()
}

return int64(binary.LittleEndian.Uint64(bz)), closer.Close()
return int64(binary.LittleEndian.Uint64(bz)), nil
}

// SetLatestKey sets the latest key processed during migration.
Expand Down Expand Up @@ -349,18 +363,19 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
version = 1
}

// Create batch and persist latest version in the batch
b, err := NewBatch(db.storage, version)
if err != nil {
return err
}

for _, kvPair := range cs.Changeset.Pairs {
if kvPair.Value == nil {
if err := b.Delete(cs.Name, kvPair.Key); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general practice i recommend against overriding error like this, specially in cases like this since the scope of err is tighly limited to the if block. Ditto for next if statement.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, thanks for pointing that out!

if err = b.Delete(cs.Name, kvPair.Key); err != nil {
return err
}
} else {
if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil {
if err = b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil {
return err
}
}
Expand All @@ -369,7 +384,13 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
// Mark the store as updated
db.storeKeyDirty.Store(cs.Name, version)

return b.Write()
// Update latest version
err = b.Write()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto re keeping the scope of err variable tight.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

if err != nil {
return err
}
db.latestVersion = version
return nil
}

func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
Expand Down Expand Up @@ -513,10 +534,6 @@ func (db *Database) writeAsyncInBackground() {
panic(err)
}
}
err := db.SetLatestVersion(version)
if err != nil {
panic(err)
}
}
}

Expand Down
87 changes: 32 additions & 55 deletions ss/rocksdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type Database struct {

// Earliest version for db after pruning
earliestVersion int64
// Latest version for db
latestVersion int64

asyncWriteWG sync.WaitGroup

Expand Down Expand Up @@ -85,17 +87,24 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
tsLow = int64(binary.LittleEndian.Uint64(tsLowBz))
}

// Initialize earliest version
earliestVersion, err := retrieveEarliestVersion(storage)
if err != nil {
return nil, fmt.Errorf("failed to retrieve earliest version: %w", err)
}

latestVersion, err := retrieveLatestVersion(storage)
if err != nil {
return nil, fmt.Errorf("failed to retrieve earliest version: %w", err)
}

database := &Database{
storage: storage,
config: config,
cfHandle: cfHandle,
tsLow: tsLow,
earliestVersion: earliestVersion,
latestVersion: latestVersion,
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
}

Expand All @@ -117,31 +126,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
return database, nil
}

func NewWithDB(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) (*Database, error) {
slice, err := storage.GetFullHistoryTsLow(cfHandle)
if err != nil {
return nil, fmt.Errorf("failed to get full_history_ts_low: %w", err)
}

var tsLow int64
tsLowBz := copyAndFreeSlice(slice)
if len(tsLowBz) > 0 {
tsLow = int64(binary.LittleEndian.Uint64(tsLowBz))
}

earliestVersion, err := retrieveEarliestVersion(storage)
if err != nil {
return nil, fmt.Errorf("failed to retrieve earliest version: %w", err)
}

return &Database{
storage: storage,
cfHandle: cfHandle,
tsLow: tsLow,
earliestVersion: earliestVersion,
}, nil
}

func (db *Database) Close() error {
if db.streamHandler != nil {
// Close the changelog stream first
Expand All @@ -155,7 +139,6 @@ func (db *Database) Close() error {
}

db.storage.Close()

db.storage = nil
db.cfHandle = nil

Expand All @@ -171,39 +154,28 @@ func (db *Database) getSlice(storeKey string, version int64, key []byte) (*grock
}

func (db *Database) SetLatestVersion(version int64) error {
db.latestVersion = version
var ts [TimestampSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))

return db.storage.Put(defaultWriteOpts, []byte(latestVersionKey), ts[:])
}

func (db *Database) GetLatestVersion() (int64, error) {
bz, err := db.storage.GetBytes(defaultReadOpts, []byte(latestVersionKey))
if err != nil {
return 0, err
}

if len(bz) == 0 {
// in case of a fresh database
return 0, nil
}

return int64(binary.LittleEndian.Uint64(bz)), nil
func (db *Database) GetLatestVersion() int64 {
return db.latestVersion
}

func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error {
if version > db.earliestVersion || ignoreVersion {
db.earliestVersion = version

var ts [TimestampSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
return db.storage.Put(defaultWriteOpts, []byte(earliestVersionKey), ts[:])
}
return nil
}

func (db *Database) GetEarliestVersion() (int64, error) {
return db.earliestVersion, nil
func (db *Database) GetEarliestVersion() int64 {
return db.earliestVersion
}

func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) {
Expand Down Expand Up @@ -246,6 +218,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
version = 1
}

// Update latest version in batch
b := NewBatch(db, version)

for _, kvPair := range cs.Changeset.Pairs {
Expand All @@ -260,7 +233,12 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
}
}

return b.Write()
err := b.Write()
if err != nil {
return err
}
db.latestVersion = version
return nil
}

func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
Expand Down Expand Up @@ -297,10 +275,6 @@ func (db *Database) writeAsyncInBackground() {
panic(err)
}
}
err := db.SetLatestVersion(version)
if err != nil {
panic(err)
}
}
}
}
Expand Down Expand Up @@ -413,7 +387,7 @@ func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte
}
start, end := util.IterateWithPrefix(prefix, nil, nil)

latestVersion, err := db.GetLatestVersion()
latestVersion, err := retrieveLatestVersion(db.storage)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -514,17 +488,20 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo
panic("implement me")
}

// retrieveEarliestVersion retrieves the earliest version from the database
// retrieveEarliestVersion retrieves the earliest version from the database, if not found, return 0.
func retrieveEarliestVersion(storage *grocksdb.DB) (int64, error) {
bz, err := storage.GetBytes(defaultReadOpts, []byte(earliestVersionKey))
if err != nil {
fmt.Printf("warning: rocksdb get for earliestVersionKey failed: %v", err)
return 0, nil
if err != nil || len(bz) == 0 {
return 0, err
}
return int64(binary.LittleEndian.Uint64(bz)), nil
}

if len(bz) == 0 {
// in case of a fresh database
return 0, nil
// retrieveLatestVersion retrieves the latest version from the database, if not found, return 0.
func retrieveLatestVersion(storage *grocksdb.DB) (int64, error) {
bz, err := storage.GetBytes(defaultReadOpts, []byte(latestVersionKey))
if err != nil || len(bz) == 0 {
return 0, err
}

return int64(binary.LittleEndian.Uint64(bz)), nil
Expand Down
Loading
Loading