Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 47 additions & 34 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type Database struct {
config config.StateStoreConfig
// Earliest version for db after pruning
earliestVersion int64
// Latest version for db
latestVersion int64

// Map of module to when each was last updated
// Used in pruning to skip over stores that have not been updated recently
Expand Down Expand Up @@ -115,20 +117,31 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
opts.FlushSplitBytes = opts.Levels[0].TargetFileSize
opts = opts.EnsureDefaults()

//TODO: add a new config and check if readonly = true to support readonly mode

db, err := pebble.Open(dataDir, opts)
if err != nil {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
}

// Initialize earliest version
earliestVersion, err := retrieveEarliestVersion(db)
if err != nil {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
return nil, fmt.Errorf("failed to retrieve earliest version: %w", err)
}

// Initialize latest version
latestVersion, err := retrieveLatestVersion(db)
if err != nil {
return nil, fmt.Errorf("failed to retrieve latest version: %w", err)
}

database := &Database{
storage: db,
asyncWriteWG: sync.WaitGroup{},
config: config,
earliestVersion: earliestVersion,
latestVersion: latestVersion,
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
}

Expand Down Expand Up @@ -159,12 +172,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
return database, nil
}

func NewWithDB(storage *pebble.DB) *Database {
return &Database{
storage: storage,
}
}

func (db *Database) Close() error {
if db.streamHandler != nil {
_ = db.streamHandler.Close()
Expand All @@ -182,27 +189,32 @@ func (db *Database) SetLatestVersion(version int64) error {
if version < 0 {
return fmt.Errorf("version must be non-negative")
}
db.latestVersion = version
var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts)
return err
}

func (db *Database) GetLatestVersion() (int64, error) {
bz, closer, err := db.storage.Get([]byte(latestVersionKey))
if err != nil {
func (db *Database) GetLatestVersion() int64 {
return db.latestVersion
}

// Retrieve latestVersion from db, if not found, return 0.
func retrieveLatestVersion(db *pebble.DB) (int64, error) {
bz, closer, err := db.Get([]byte(latestVersionKey))
defer func() {
if closer != nil {
_ = closer.Close()
}
}()
if err != nil || len(bz) == 0 {
if errors.Is(err, pebble.ErrNotFound) {
// in case of a fresh database
return 0, nil
}

return 0, err
}

if len(bz) == 0 {
return 0, closer.Close()
}

uz := binary.LittleEndian.Uint64(bz)
if uz > math.MaxInt64 {
return 0, fmt.Errorf("latest version in database overflows int64: %d", uz)
Expand All @@ -216,16 +228,15 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error
}
if version > db.earliestVersion || ignoreVersion {
db.earliestVersion = version

var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts)
}
return nil
}

func (db *Database) GetEarliestVersion() (int64, error) {
return db.earliestVersion, nil
func (db *Database) GetEarliestVersion() int64 {
return db.earliestVersion
}

func (db *Database) SetLastRangeHashed(latestHashed int64) error {
Expand Down Expand Up @@ -253,22 +264,21 @@ func (db *Database) GetLastRangeHashed() (int64, error) {
return cachedValue, nil
}

// Retrieves earliest version from db
// Retrieves earliest version from db, if not found, return 0
func retrieveEarliestVersion(db *pebble.DB) (int64, error) {
bz, closer, err := db.Get([]byte(earliestVersionKey))
if err != nil {
defer func() {
if closer != nil {
_ = closer.Close()
}
}()
if err != nil || len(bz) == 0 {
if errors.Is(err, pebble.ErrNotFound) {
// in case of a fresh database
return 0, nil
}

return 0, err
}

if len(bz) == 0 {
return 0, closer.Close()
}

ubz := binary.LittleEndian.Uint64(bz)
if ubz > math.MaxInt64 {
return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz)
Expand Down Expand Up @@ -373,18 +383,19 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
version = 1
}

// Create batch and persist latest version in the batch
b, err := NewBatch(db.storage, version)
if err != nil {
return err
}

for _, kvPair := range cs.Changeset.Pairs {
if kvPair.Value == nil {
if err := b.Delete(cs.Name, kvPair.Key); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general practice i recommend against overriding error like this, specially in cases like this since the scope of err is tighly limited to the if block. Ditto for next if statement.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, thanks for pointing that out!

if err = b.Delete(cs.Name, kvPair.Key); err != nil {
return err
}
} else {
if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil {
if err = b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil {
return err
}
}
Expand All @@ -393,7 +404,13 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
// Mark the store as updated
db.storeKeyDirty.Store(cs.Name, version)

return b.Write()
// Update latest version
err = b.Write()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto re keeping the scope of err variable tight.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

if err != nil {
return err
}
db.latestVersion = version
return nil
}

func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
Expand Down Expand Up @@ -537,10 +554,6 @@ func (db *Database) writeAsyncInBackground() {
panic(err)
}
}
err := db.SetLatestVersion(version)
if err != nil {
panic(err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion ss/pruning/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (m *Manager) Start() {
go func() {
for {
pruneStartTime := time.Now()
latestVersion, _ := m.stateStore.GetLatestVersion()
latestVersion := m.stateStore.GetLatestVersion()
pruneVersion := latestVersion - m.keepRecent
if pruneVersion > 0 {
// prune all versions up to and including the pruneVersion
Expand Down
Loading
Loading