-
Notifications
You must be signed in to change notification settings - Fork 17
Add and expose latest version as watermark #116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
d70c912
6c44357
d76d6c7
3bee995
1640542
5d87eec
8a98170
1518f68
6b7dab2
f372213
10e313a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,6 +59,8 @@ type Database struct { | |
| config config.StateStoreConfig | ||
| // Earliest version for db after pruning | ||
| earliestVersion int64 | ||
| // Latest version for db | ||
| latestVersion int64 | ||
|
|
||
| // Map of module to when each was last updated | ||
| // Used in pruning to skip over stores that have not been updated recently | ||
|
|
@@ -115,20 +117,31 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { | |
| opts.FlushSplitBytes = opts.Levels[0].TargetFileSize | ||
| opts = opts.EnsureDefaults() | ||
|
|
||
| //TODO: add a new config and check if readonly = true to support readonly mode | ||
|
|
||
| db, err := pebble.Open(dataDir, opts) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to open PebbleDB: %w", err) | ||
| } | ||
|
|
||
| // Initialize earliest version | ||
| earliestVersion, err := retrieveEarliestVersion(db) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to open PebbleDB: %w", err) | ||
| return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) | ||
| } | ||
|
|
||
| // Initialize latest version | ||
| latestVersion, err := retrieveLatestVersion(db) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to retrieve latest version: %w", err) | ||
| } | ||
|
|
||
| database := &Database{ | ||
| storage: db, | ||
| asyncWriteWG: sync.WaitGroup{}, | ||
| config: config, | ||
| earliestVersion: earliestVersion, | ||
| latestVersion: latestVersion, | ||
| pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), | ||
| } | ||
|
|
||
|
|
@@ -159,12 +172,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { | |
| return database, nil | ||
| } | ||
|
|
||
| func NewWithDB(storage *pebble.DB) *Database { | ||
| return &Database{ | ||
| storage: storage, | ||
| } | ||
| } | ||
|
|
||
| func (db *Database) Close() error { | ||
| if db.streamHandler != nil { | ||
| _ = db.streamHandler.Close() | ||
|
|
@@ -182,27 +189,32 @@ func (db *Database) SetLatestVersion(version int64) error { | |
| if version < 0 { | ||
| return fmt.Errorf("version must be non-negative") | ||
| } | ||
| db.latestVersion = version | ||
| var ts [VersionSize]byte | ||
| binary.LittleEndian.PutUint64(ts[:], uint64(version)) | ||
| err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) | ||
| return err | ||
| } | ||
|
|
||
| func (db *Database) GetLatestVersion() (int64, error) { | ||
| bz, closer, err := db.storage.Get([]byte(latestVersionKey)) | ||
| if err != nil { | ||
| func (db *Database) GetLatestVersion() int64 { | ||
| return db.latestVersion | ||
| } | ||
|
|
||
| // Retrieve latestVersion from db, if not found, return 0. | ||
| func retrieveLatestVersion(db *pebble.DB) (int64, error) { | ||
| bz, closer, err := db.Get([]byte(latestVersionKey)) | ||
| defer func() { | ||
| if closer != nil { | ||
| _ = closer.Close() | ||
| } | ||
| }() | ||
| if err != nil || len(bz) == 0 { | ||
| if errors.Is(err, pebble.ErrNotFound) { | ||
yzang2019 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // in case of a fresh database | ||
| return 0, nil | ||
| } | ||
|
|
||
| return 0, err | ||
| } | ||
|
|
||
| if len(bz) == 0 { | ||
| return 0, closer.Close() | ||
| } | ||
|
|
||
| uz := binary.LittleEndian.Uint64(bz) | ||
| if uz > math.MaxInt64 { | ||
| return 0, fmt.Errorf("latest version in database overflows int64: %d", uz) | ||
|
|
@@ -216,16 +228,15 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error | |
| } | ||
| if version > db.earliestVersion || ignoreVersion { | ||
| db.earliestVersion = version | ||
|
|
||
| var ts [VersionSize]byte | ||
| binary.LittleEndian.PutUint64(ts[:], uint64(version)) | ||
| return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (db *Database) GetEarliestVersion() (int64, error) { | ||
| return db.earliestVersion, nil | ||
| func (db *Database) GetEarliestVersion() int64 { | ||
| return db.earliestVersion | ||
| } | ||
|
|
||
| func (db *Database) SetLastRangeHashed(latestHashed int64) error { | ||
|
|
@@ -253,22 +264,21 @@ func (db *Database) GetLastRangeHashed() (int64, error) { | |
| return cachedValue, nil | ||
| } | ||
|
|
||
| // Retrieves earliest version from db | ||
| // Retrieves earliest version from db, if not found, return 0 | ||
| func retrieveEarliestVersion(db *pebble.DB) (int64, error) { | ||
| bz, closer, err := db.Get([]byte(earliestVersionKey)) | ||
| if err != nil { | ||
| defer func() { | ||
| if closer != nil { | ||
| _ = closer.Close() | ||
| } | ||
| }() | ||
| if err != nil || len(bz) == 0 { | ||
| if errors.Is(err, pebble.ErrNotFound) { | ||
| // in case of a fresh database | ||
| return 0, nil | ||
| } | ||
|
|
||
| return 0, err | ||
| } | ||
|
|
||
| if len(bz) == 0 { | ||
| return 0, closer.Close() | ||
| } | ||
|
|
||
| ubz := binary.LittleEndian.Uint64(bz) | ||
| if ubz > math.MaxInt64 { | ||
| return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz) | ||
|
|
@@ -373,18 +383,19 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro | |
| version = 1 | ||
| } | ||
|
|
||
| // Create batch and persist latest version in the batch | ||
| b, err := NewBatch(db.storage, version) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| for _, kvPair := range cs.Changeset.Pairs { | ||
| if kvPair.Value == nil { | ||
| if err := b.Delete(cs.Name, kvPair.Key); err != nil { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a general practice i recommend against overriding error like this, specially in cases like this since the scope of err is tighly limited to the if block. Ditto for next if statement.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed, thanks for pointing that out! |
||
| if err = b.Delete(cs.Name, kvPair.Key); err != nil { | ||
| return err | ||
| } | ||
| } else { | ||
| if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { | ||
| if err = b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { | ||
yzang2019 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return err | ||
| } | ||
| } | ||
|
|
@@ -393,7 +404,13 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro | |
| // Mark the store as updated | ||
| db.storeKeyDirty.Store(cs.Name, version) | ||
|
|
||
| return b.Write() | ||
| // Update latest version | ||
| err = b.Write() | ||
|
||
| if err != nil { | ||
| return err | ||
| } | ||
| db.latestVersion = version | ||
| return nil | ||
| } | ||
|
|
||
| func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { | ||
|
|
@@ -537,10 +554,6 @@ func (db *Database) writeAsyncInBackground() { | |
| panic(err) | ||
| } | ||
| } | ||
| err := db.SetLatestVersion(version) | ||
| if err != nil { | ||
| panic(err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.