diff --git a/base/hlc.go b/base/hlc.go new file mode 100644 index 0000000000..588ba215a7 --- /dev/null +++ b/base/hlc.go @@ -0,0 +1,83 @@ +// Copyright 2026-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package base + +import ( + "sync" + "time" +) + +// hlcLogicalBits is the number of low-order bits of a Couchbase Server CAS reserved for the logical +// (counter) component of the Hybrid Logical Clock. The remaining high-order bits hold the physical +// (wall-clock nanosecond) component. Clearing these bits when reading the wall clock keeps generated +// values in the same numeric space as a CAS and leaves room for same-instant logical increments. +const hlcLogicalBits = 16 + +// hlcLogicalMask masks off the logical component, isolating the physical component of a timestamp. +const hlcLogicalMask = (1 << hlcLogicalBits) - 1 + +// HybridLogicalClock generates monotonically increasing timestamps in the same numeric space as a +// Couchbase Server CAS (a 48-bit nanosecond physical component plus a 16-bit logical counter). Sync +// Gateway uses it to assign HLV current-version values without relying on server-side CAS macro +// expansion, so the value is known before the write and can be written as a literal into every xattr +// field that would otherwise require a per-field subdoc macro-expansion path. +// +// Generated values are CAS-comparable: under synchronised clocks a value generated just before a write +// is below the CAS the server stamps at commit. This is an invariant goxdcr enforces (cv.ver <= cas) until MB-72252. +// Monotonicity per HLV source is the caller's responsibility, supplied via the floor argument to Now. +type HybridLogicalClock struct { + clock func() uint64 // wall-clock source, nanoseconds since the Unix epoch; overridable in tests + highestTime uint64 + mutex sync.Mutex +} + +// NewHybridLogicalClock returns a HybridLogicalClock backed by the system wall clock. +func NewHybridLogicalClock() *HybridLogicalClock { + return &HybridLogicalClock{clock: func() uint64 { return uint64(time.Now().UnixNano()) }} +} + +// SetClockForTest overrides the wall-clock source and resets the clock's high-water mark, so the next +// value returned by Now is determined solely by getTime. Test-only: used to simulate clock skew between +// Sync Gateway and the server. +func (c *HybridLogicalClock) SetClockForTest(getTime func() uint64) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.clock = getTime + c.highestTime = 0 +} + +// Now returns the next timestamp, guaranteed to be strictly greater than both the previous value +// returned by this clock (monotonic across calls) and the supplied floor, while tracking the wall clock +// where possible. The result is max(physical, highestTime+1, floor+1) where physical is the wall-clock +// time with its logical bits cleared. +// +// floor is the highest existing HLV version value for the caller's source; passing 0 (a brand-new +// document, or a source not yet present in the HLV) imposes no lower bound beyond monotonicity, so Now(0) +// is an ordinary clock tick. +func (c *HybridLogicalClock) Now(floor uint64) uint64 { + c.mutex.Lock() + defer c.mutex.Unlock() + + next := c.clock() &^ hlcLogicalMask // physical component, tracks the wall clock + if c.highestTime+1 > next { // ensure strictly greater than the previous value + next = c.highestTime + 1 + } + if floor+1 > next { // ensure strictly greater than the caller's floor + next = floor + 1 + } + c.highestTime = next + return next +} + +// CASToPhysicalNanos returns the physical (wall-clock nanosecond) component of a CAS / HLC value, with the +// logical counter bits cleared. The difference between two such values approximates the elapsed wall-clock +// time between them, which is used to decide how long to wait for a lagging clock to catch up. +func CASToPhysicalNanos(cas uint64) uint64 { + return cas &^ hlcLogicalMask +} diff --git a/base/hlc_test.go b/base/hlc_test.go new file mode 100644 index 0000000000..07385cf183 --- /dev/null +++ b/base/hlc_test.go @@ -0,0 +1,130 @@ +// Copyright 2026-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package base + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// hourNanos is an offset used to place test floors a wall-clock hour either side of "now". +const hourNanos = uint64(time.Hour) + +// TestHybridLogicalClockMonotonic asserts successive values from the system-backed clock strictly increase. +func TestHybridLogicalClockMonotonic(t *testing.T) { + hlc := NewHybridLogicalClock() + prev := hlc.Now(0) + for i := 0; i < 1000; i++ { + next := hlc.Now(0) + require.Greater(t, next, prev, "value at iteration %d did not strictly increase", i) + prev = next + } +} + +// TestHybridLogicalClockTracksWallClock asserts Now(0) reflects the wall clock (physical component) with +// its logical bits cleared, rather than drifting off into logical-counter space from a cold start. +func TestHybridLogicalClockTracksWallClock(t *testing.T) { + hlc := NewHybridLogicalClock() + before := uint64(time.Now().UnixNano()) &^ hlcLogicalMask + got := hlc.Now(0) + after := uint64(time.Now().UnixNano()) + require.GreaterOrEqual(t, got, before) + require.LessOrEqual(t, got, after) +} + +// TestHybridLogicalClockSameInstant asserts that when the wall clock does not advance, successive values +// still strictly increase via the logical counter. +func TestHybridLogicalClockSameInstant(t *testing.T) { + now := uint64(time.Now().UnixNano()) + hlc := &HybridLogicalClock{clock: func() uint64 { return now }} + + first := hlc.Now(0) + require.Equal(t, now&^hlcLogicalMask, first) + require.Equal(t, first+1, hlc.Now(0)) + require.Equal(t, first+2, hlc.Now(0)) +} + +// TestHybridLogicalClockPhysicalMask asserts the logical bits of the wall clock are cleared so generated +// values stay in the CAS numeric space. +func TestHybridLogicalClockPhysicalMask(t *testing.T) { + // OR in low bits so masking is observable regardless of the current nanosecond. + now := uint64(time.Now().UnixNano()) | hlcLogicalMask + hlc := &HybridLogicalClock{clock: func() uint64 { return now }} + + got := hlc.Now(0) + // Isolates only bits 15–0 of the result. Requires them to be all zero — proving Now() stripped them via &^ hlcLogicalMask. + require.Zero(t, got&hlcLogicalMask, "low %d (logical) bits should be cleared", hlcLogicalBits) + // Both sides clear the low 16 bits of now + require.Equal(t, now&^hlcLogicalMask, got) +} + +// TestHybridLogicalClockFloor asserts Now never returns a value <= floor. +func TestHybridLogicalClockFloor(t *testing.T) { + t.Run("floor below physical is ignored", func(t *testing.T) { + now := uint64(time.Now().UnixNano()) + hlc := &HybridLogicalClock{clock: func() uint64 { return now }} + got := hlc.Now(now - hourNanos) // a floor an hour in the past + require.Equal(t, now&^hlcLogicalMask, got) + }) + + t.Run("floor above physical wins", func(t *testing.T) { + now := uint64(time.Now().UnixNano()) + hlc := &HybridLogicalClock{clock: func() uint64 { return now }} + floor := now + hourNanos // a floor an hour in the future + got := hlc.Now(floor) + require.Equal(t, floor+1, got) + require.Greater(t, got, floor) + }) +} + +// TestHybridLogicalClockNoFloor covers the brand-new-document / absent-source case where floor is 0: Now +// behaves as an ordinary tick rather than special-casing. +func TestHybridLogicalClockNoFloor(t *testing.T) { + now := uint64(time.Now().UnixNano()) + hlc := &HybridLogicalClock{clock: func() uint64 { return now }} + require.Equal(t, now&^hlcLogicalMask, hlc.Now(0)) +} + +// TestHybridLogicalClockConcurrent asserts thread-safety: concurrent callers never receive duplicate +// values (strict monotonicity implies uniqueness). +func TestHybridLogicalClockConcurrent(t *testing.T) { + hlc := NewHybridLogicalClock() + + const goroutines = 16 + const perGoroutine = 500 + + var wg sync.WaitGroup + results := make([][]uint64, goroutines) + for g := 0; g < goroutines; g++ { + wg.Add(1) + go func(g int) { + defer wg.Done() + values := make([]uint64, perGoroutine) + for i := range values { + values[i] = hlc.Now(0) + } + results[g] = values + }(g) + } + wg.Wait() + + seen := make(map[uint64]struct{}, goroutines*perGoroutine) + for _, values := range results { + for _, v := range values { + _, dup := seen[v] + assert.False(t, dup, "duplicate value %d returned to concurrent callers", v) + seen[v] = struct{}{} + } + } + require.Len(t, seen, goroutines*perGoroutine) +} diff --git a/base/stats.go b/base/stats.go index f9f186136d..8be56d7843 100644 --- a/base/stats.go +++ b/base/stats.go @@ -637,6 +637,8 @@ type DatabaseStats struct { DocWritesXattrBytes *SgwIntStat `json:"doc_writes_xattr_bytes"` // Highest sequence number seen on the caching DCP feed. HighSeqFeed *SgwUint64Stat `json:"high_seq_feed"` + // The total number of document writes where the Sync Gateway-generated HLV version exceeded the document CAS and required a corrective re-stamp. A non-zero value indicates clock skew between Sync Gateway and Couchbase Server. + HLVVersionCASRetryCount *SgwIntStat `json:"hlv_version_cas_retry_count"` // The number of attachments compacted NumAttachmentsCompacted *SgwIntStat `json:"num_attachments_compacted"` // The total number of documents read via Couchbase Lite 2.x replication since Sync Gateway node startup. @@ -1865,6 +1867,10 @@ func (d *DbStats) initDatabaseStats() error { if err != nil { return err } + resUtil.HLVVersionCASRetryCount, err = NewIntStat(SubsystemDatabaseKey, "hlv_version_cas_retry_count", StatUnitNoUnits, HLVVersionCASRetryCountDesc, StatAddedVersion4dot1dot0, StatDeprecatedVersionNotDeprecated, StatStabilityInternal, labelKeys, labelVals, prometheus.CounterValue, 0) + if err != nil { + return err + } resUtil.PublicRestBytesWritten, err = NewIntStat(SubsystemDatabaseKey, "http_bytes_written", StatUnitBytes, PublicRestBytesWrittenDesc, StatAddedVersion3dot2dot0, StatDeprecatedVersionNotDeprecated, StatStabilityVolatile, labelKeys, labelVals, prometheus.CounterValue, 0) if err != nil { return err @@ -2044,6 +2050,7 @@ func (d *DbStats) unregisterDatabaseStats() { prometheus.Unregister(d.DatabaseStats.DocWritesBytes) prometheus.Unregister(d.DatabaseStats.DocWritesXattrBytes) prometheus.Unregister(d.DatabaseStats.HighSeqFeed) + prometheus.Unregister(d.DatabaseStats.HLVVersionCASRetryCount) prometheus.Unregister(d.DatabaseStats.DocWritesBytesBlip) prometheus.Unregister(d.DatabaseStats.NumAttachmentsCompacted) prometheus.Unregister(d.DatabaseStats.NumDocReadsBlip) diff --git a/base/stats_descriptions.go b/base/stats_descriptions.go index 6d10b43891..189c12c305 100644 --- a/base/stats_descriptions.go +++ b/base/stats_descriptions.go @@ -265,6 +265,8 @@ const ( HighSeqFeedDesc = "Highest sequence number seen on the caching DCP feed." + HLVVersionCASRetryCountDesc = "The total number of document writes where the Sync Gateway-generated HLV version exceeded the document CAS and required a corrective re-stamp. A non-zero value indicates clock skew between Sync Gateway and Couchbase Server." + NumAttachmentsCompactedDesc = "The number of attachments compacted import_feed" ImportFeedDesc = "Contains low level dcp stats: (a). dcp_backfill_expected - the expected number of sequences in backfill (b). dcp_backfill_completed - the number of backfill items processed (c). dcp_rollback_count - the number of DCP rollbacks" diff --git a/db/changes_test.go b/db/changes_test.go index 4a1661982d..a7c82a4324 100644 --- a/db/changes_test.go +++ b/db/changes_test.go @@ -284,7 +284,7 @@ func TestCVPopulationOnChangeEntry(t *testing.T) { defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) collectionID := collection.GetCollectionID() - bucketUUID := db.EncodedSourceID + sourceID := db.EncodedSourceID collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout) @@ -311,8 +311,8 @@ func TestCVPopulationOnChangeEntry(t *testing.T) { docVersion := GetChangeEntryCV(t, changes[0]) assert.Equal(t, doc.ID, changes[0].ID) - assert.Equal(t, bucketUUID, docVersion.SourceID) - assert.Equal(t, doc.Cas, docVersion.Value) + assert.Equal(t, sourceID, docVersion.SourceID) + assert.Equal(t, doc.HLV.Version, docVersion.Value) } func TestDocDeletionFromChannelCoalesced(t *testing.T) { @@ -490,7 +490,7 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) { defer db.Close(ctx) collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) collectionID := collection.GetCollectionID() - bucketUUID := db.EncodedSourceID + sourceID := db.EncodedSourceID collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout) // Make channel active @@ -512,8 +512,8 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) { // assert that the source and version has been populated with the channel cache entry for the doc assert.Equal(t, "doc1", entries[0].DocID) - assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), entries[0].Version) - assert.Equal(t, bucketUUID, entries[0].SourceID) + require.NotZero(t, entries[0].Version) + assert.Equal(t, sourceID, entries[0].SourceID) assert.Equal(t, doc.HLV.SourceID, entries[0].SourceID) assert.Equal(t, doc.HLV.Version, entries[0].Version) } diff --git a/db/crud.go b/db/crud.go index a23e8f1bc8..5570604443 100644 --- a/db/crud.go +++ b/db/crud.go @@ -1164,7 +1164,7 @@ func (db *DatabaseCollectionWithUser) OnDemandImportForWrite(ctx context.Context } // updateHLV updates the HLV in the sync data appropriately based on what type of document update event we are encountering. mouMatch represents if the _mou.cas == doc.cas -func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document, docUpdateEvent DocUpdateType, mouMatch bool) (*Document, error) { +func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document, docUpdateEvent DocUpdateType, mouMatch bool, generatedVersion uint64) (*Document, error) { hasHLV := d.HLV != nil if d.HLV == nil { @@ -1207,15 +1207,20 @@ func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document base.DebugfCtx(ctx, base.KeyVV, "Not updating HLV due to _mou.cas == doc.cas for doc %s, extant HLV %#v", base.UD(d.ID), d.HLV) } case NewVersion, ExistingVersionWithUpdateToHLV: - // add a new entry to the version vector - newVVEntry := Version{} - newVVEntry.SourceID = db.dbCtx.EncodedSourceID - newVVEntry.Value = expandMacroCASValueUint64 + // Add a new entry to the version vector using the HLC value generated by the caller (before running + // the sync function, to maximise the gap to the eventual write CAS - see documentUpdateFunc). The + // value is written as a literal rather than relying on server-side CAS macro expansion. It was + // generated with a floor of the max existing value for our source, so it strictly exceeds any + // existing value for our source, preserving per-source monotonicity. + newVVEntry := Version{ + SourceID: db.dbCtx.EncodedSourceID, + Value: generatedVersion, + } err := d.HLV.AddVersion(newVVEntry) if err != nil { return nil, err } - // update the cvCAS on the SGWrite event too + // cvCAS stays macro-expanded to the committed CAS so the cvCAS==cas import-detection invariant holds. d.HLV.CurrentVersionCAS = expandMacroCASValueUint64 case ExistingVersionLegacyRev: revTreeEncodedCV, err := LegacyRevToRevTreeEncodedVersion(d.GetRevTreeID()) @@ -2259,9 +2264,13 @@ func (db *DatabaseCollectionWithUser) resolveDocMergeHLV(ctx context.Context, lo newHLV := localDoc.HLV.Copy() base.DebugfCtx(ctx, base.KeyVV, "resolving doc %s with merge, local hlv: %v, incoming hlv: %v", base.UD(localDoc.ID), localDoc.HLV, remoteDoc.HLV) + // Generate the merge version value from the database HLC. The floor is the highest existing value for our + // source across both HLVs being merged, so the generated value preserves per-source monotonicity. + sourceID := db.dbCtx.EncodedSourceID + floor := max(localDoc.HLV.maxValueForSource(sourceID), remoteDoc.HLV.maxValueForSource(sourceID)) newCV := Version{ - SourceID: db.dbCtx.EncodedSourceID, - Value: expandMacroCASValueUint64, + SourceID: sourceID, + Value: db.dbCtx.hlc.Now(floor), } err = newHLV.MergeWithIncomingHLV(newCV, remoteDoc.HLV) if err != nil { @@ -2701,7 +2710,6 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc( changedAccessPrincipals []string, changedRoleAccessUsers []string, createNewRevIDSkipped bool, - revokedChannelsRequiringExpansion []string, err error) { err = validateExistingDoc(doc, allowImport, docExists) @@ -2737,6 +2745,21 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc( newDocHasAttachments := len(newAttachments) > 0 col.storeOldBodyInRevTreeAndUpdateCurrent(ctx, doc, prevCurrentRev, newRevID, newDoc, newDocHasAttachments) + // For events where we need to generate a new version, generate the HLV current version now - before the + // (potentially slow) sync function runs - to maximise the time between generating the version and the + // write committing to the server, reducing the chance the version exceeds the server-assigned CAS (see + // correctVersionAheadOfCAS). The incoming HLV has already been applied to doc.HLV by the callback above, + // so the floor (max existing value for our source) is correct here. doc.HLV may be nil for a brand-new + // document, in which case there is no existing value and the floor is 0. + var generatedVersion uint64 + if docUpdateEvent == NewVersion || docUpdateEvent == ExistingVersionWithUpdateToHLV { + var versionFloor uint64 + if doc.HLV != nil { + versionFloor = doc.HLV.maxValueForSource(col.dbCtx.EncodedSourceID) + } + generatedVersion = col.dbCtx.hlc.Now(versionFloor) + } + syncExpiry, oldBodyJSON, channelSet, access, roles, err := col.runSyncFn(ctx, doc, mutableBody, metaMap, newRevID) if err != nil { if col.ForceAPIForbiddenErrors() { @@ -2781,10 +2804,9 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc( return } - // The callback has updated the HLV for mutations coming from CBL. Update the HLV so that the current version is set before - // we call updateChannels, which needs to set the current version for removals - // update the HLV values - doc, err = col.updateHLV(ctx, doc, docUpdateEvent, mouMatch) + // The callback has updated the HLV for mutations coming from CBL. Set the current version (to the + // pre-generated value for new-version events) before updateChannels, which needs it for removals. + doc, err = col.updateHLV(ctx, doc, docUpdateEvent, mouMatch, generatedVersion) if err != nil { return } @@ -2800,7 +2822,7 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc( return } } - _, revokedChannelsRequiringExpansion, err = doc.updateChannels(ctx, channelSet) + _, err = doc.updateChannels(ctx, channelSet) if err != nil { return } @@ -2825,7 +2847,7 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc( doc.ClusterUUID = col.serverUUID() doc.TimeSaved = time.Now() - return updatedExpiry, newRevID, newDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, revokedChannelsRequiringExpansion, err + return updatedExpiry, newRevID, newDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, err } // Function type for the callback passed into updateAndReturnDoc @@ -2897,8 +2919,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do } isNewDocCreation = currentValue == nil - var revokedChannelsRequiringExpansion []string - updatedDoc.Expiry, newRevID, storedDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, revokedChannelsRequiringExpansion, err = db.documentUpdateFunc(ctx, !isNewDocCreation, doc, allowImport, docSequence, unusedSequences, callback, expiry, docUpdateEvent) + updatedDoc.Expiry, newRevID, storedDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, err = db.documentUpdateFunc(ctx, !isNewDocCreation, doc, allowImport, docSequence, unusedSequences, callback, expiry, docUpdateEvent) if err != nil { return } @@ -2917,11 +2938,6 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do // update the mutate in options based on the above logic updatedDoc.Spec = doc.HLV.computeMacroExpansions() - updatedDoc.Spec, err = appendRevocationMacroExpansions(updatedDoc.Spec, revokedChannelsRequiringExpansion) - if err != nil { - return - } - updatedDoc.IsTombstone = currentRevFromHistory.Deleted if doc.MetadataOnlyUpdate != nil { if doc.MetadataOnlyUpdate.HexCAS != "" { @@ -2999,6 +3015,9 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do } // update the doc's HLV defined post macro expansion doc = db.postWriteUpdateHLV(ctx, doc, casOut) + // If the generated version came out ahead of the committed CAS (SG clock ran ahead of the + // server), wait for the clock to catch up and re-stamp the CAS so cv.ver <= cas holds for XDCR (MB-72252). + doc = db.correctVersionAheadOfCAS(ctx, key, doc, casOut) } } @@ -3174,9 +3193,8 @@ func (db *DatabaseCollectionWithUser) postWriteUpdateHLV(ctx context.Context, do if doc.HLV == nil { return doc } - if doc.HLV.Version == expandMacroCASValueUint64 { - doc.HLV.Version = casOut - } + // Version is generated by the HLC and written as a literal, so only cvCAS is macro-expanded and needs + // resolving to the committed CAS here. if doc.HLV.CurrentVersionCAS == expandMacroCASValueUint64 { doc.HLV.CurrentVersionCAS = casOut } @@ -3208,6 +3226,103 @@ func (db *DatabaseCollectionWithUser) postWriteUpdateHLV(ctx context.Context, do return doc } +// maxVersionCASCorrectionWait bounds how long correctVersionAheadOfCAS will sleep waiting for the server +// clock to catch up to a generated version. A gap larger than this indicates larger clock skew between +// Sync Gateway and the server, which we log rather than stall a write on. +const maxVersionCASCorrectionWait = time.Second + +// correctVersionAheadOfCAS handles the rare case where the HLC-generated current version is greater than +// the CAS the server assigned to the write (the Sync Gateway clock ran ahead of the server's). goxdcr +// rejects a document whose cv.ver exceeds its CAS, so leaving it would cause this mutation to be +// ignored by XDCR (MB-72252). Rather than move the version backwards, we wait for real time to advance until a fresh +// CAS would reach the generated version, then re-stamp the document's CAS (and cvCAS) without changing the +// version - so cv.ver <= cas holds and the version a peer may already have replicated is preserved. +// +// The wait is bounded by maxVersionCASCorrectionWait; a larger gap is logged and left uncorrected. When a +// re-stamp occurs the document's in-memory Cas and cvCAS are updated. +// +// This only applies when the current version was generated by *this* Sync Gateway (cv source == our +// EncodedSourceID) - covering new writes, HLV updates, and merge resolution, all of which stamp our source +// as cv. A current version originating from another peer (e.g. a client or remote SGW whose clock differs) +// is that peer's responsibility, not ours at this time. +func (db *DatabaseCollectionWithUser) correctVersionAheadOfCAS(ctx context.Context, key string, doc *Document, casOut uint64) *Document { + if doc.HLV == nil || doc.HLV.SourceID != db.dbCtx.EncodedSourceID || doc.HLV.Version <= casOut { + return doc + } + + // This is a post-commit corrective write; don't let request cancellation prevent the re-stamp. + ctx = context.WithoutCancel(ctx) + + gap := base.CASToPhysicalNanos(doc.HLV.Version) - base.CASToPhysicalNanos(casOut) + if gap > uint64(maxVersionCASCorrectionWait.Nanoseconds()) { + base.WarnfCtx(ctx, "Generated version %d for doc %q is ahead of its CAS %d by %s (> %s); leaving uncorrected - this indicates clock skew between Sync Gateway and the server and may cause the document to be skipped by XDCR until a later mutation", + doc.HLV.Version, base.UD(doc.ID), casOut, time.Duration(gap), maxVersionCASCorrectionWait) + return doc + } + + // sleep until the server's CAS would have caught up to the generated version, then re-stamp the CAS on the document so cv.ver <= cas holds for XDCR replication + time.Sleep(time.Duration(gap)) + + cas2, err := db.restampVersionCAS(ctx, key, doc, casOut) + if err != nil { + if base.IsCasMismatch(err) { + // A concurrent writer beat us to it; it's that writer's responsibility to satisfy the invariant. + base.DebugfCtx(ctx, base.KeyVV, "Skipping CAS re-stamp for doc %q due to our generated version %d ahead of CAS %d: concurrent update won ahead of re-stamp", base.UD(doc.ID), doc.HLV.Version, casOut) + return doc + } + base.WarnfCtx(ctx, "Unable to re-stamp CAS for doc %q whose generated version %d was ahead of CAS %d: %v", base.UD(doc.ID), doc.HLV.Version, casOut, err) + return doc + } + + doc.Cas = cas2 + doc.HLV.CurrentVersionCAS = cas2 + doc.SyncData.Cas = base.CasToString(cas2) + db.dbStats().Database().HLVVersionCASRetryCount.Add(1) + + // Verify the re-stamp actually resolved the invariant. It may not when the version was ahead only within + // the logical-counter bits (the physical gap, and so the sleep, was ~0 and the server advanced its CAS by + // only a logical tick). We deliberately do not retry; a later mutation will resolve it, but surface a + // warning so the (likely clock-skew/divergence) condition is visible. + if doc.HLV.Version > cas2 { + base.WarnfCtx(ctx, "Re-stamped CAS %d for doc %q is still below the generated version %d; not retrying - the document may be skipped by XDCR until a later mutation", cas2, base.UD(doc.ID), doc.HLV.Version) + return doc + } + base.InfofCtx(ctx, base.KeyVV, "Re-stamped CAS for doc %q from %d to %d so generated version %d <= CAS", base.UD(doc.ID), casOut, cas2, doc.HLV.Version) + return doc +} + +// restampVersionCAS re-persists the document's _sync and _vv xattrs so the server assigns a fresh CAS, +// leaving the current version (cv.ver) unchanged while macro-expanding _sync.cas and _vv.cvCas to the new +// CAS. It is a metadata-only update guarded on the supplied CAS: the revision, sequence and body are +// untouched, so no new revision is created (the resulting mutation is ignored by the changes feed, which +// is correct as the version is not changing). +func (db *DatabaseCollectionWithUser) restampVersionCAS(ctx context.Context, key string, doc *Document, cas uint64) (uint64, error) { + _, syncXattr, vvXattr, _, _, err := doc.MarshalWithXattrs() + if err != nil { + return 0, err + } + + mou := computeMetadataOnlyUpdate(cas, doc.RevSeqNo, doc.MetadataOnlyUpdate) + rawMouXattr, err := base.JSONMarshal(mou) + if err != nil { + return 0, base.RedactErrorf("failed to marshal _mou when attempting to re-persist %s to correct version and CAS drift. Error: %v", base.UD(doc.ID), err) + } + + opts := &sgbucket.MutateInOptions{ + MacroExpansion: []sgbucket.MacroExpansionSpec{ + sgbucket.NewMacroExpansionSpec(xattrCasPath(base.SyncXattrName), sgbucket.MacroCas), + sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.VvXattrName), sgbucket.MacroCas), + sgbucket.NewMacroExpansionSpec(XattrMouCasPath(), sgbucket.MacroCas), + }, + PreserveExpiry: true, + } + return db.dataStore.UpdateXattrs(ctx, key, 0, cas, map[string][]byte{ + base.SyncXattrName: syncXattr, + base.VvXattrName: vvXattr, + base.MouXattrName: rawMouXattr, + }, opts) +} + // getAttachmentIDsForLeafRevisions returns a map of attachment docids with values of attachment names. func getAttachmentIDsForLeafRevisions(ctx context.Context, db *DatabaseCollectionWithUser, doc *Document, newRevID string) (map[string][]string, error) { leafAttachments := make(map[string][]string) @@ -3998,17 +4113,13 @@ func (doc *Document) addNewerRevisionsToRevTreeHistory(newDoc *Document, current } const ( - xattrMacroCas = "cas" // SyncData.Cas - xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c - xattrMacroCurrentRevVersion = "rev.ver" // SyncData.RevAndVersion.CurrentVersion - versionVectorVrsMacro = "ver" // PersistedHybridLogicalVector.Version - versionVectorCVCASMacro = "cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS + xattrMacroCas = "cas" // SyncData.Cas + xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c + versionVectorVrsMacro = "ver" // PersistedHybridLogicalVector.Version + versionVectorCVCASMacro = "cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS expandMacroCASValueUint64 = math.MaxUint64 // static value that indicates that a CAS macro expansion should be applied to a property expandMacroCASValueString = "expand" - - // cbsSubdocPathMaxLength is the maximum number of bytes allowed in a Couchbase Server subdocument path. - cbsSubdocPathMaxLength = 1024 ) func macroExpandSpec(xattrName string) []sgbucket.MacroExpansionSpec { @@ -4033,10 +4144,6 @@ func XattrMouCasPath() string { return base.MouXattrName + "." + xattrMacroCas } -func xattrCurrentRevVersionPath(xattrKey string) string { - return xattrKey + "." + xattrMacroCurrentRevVersion -} - func xattrCurrentVersionPath(xattrKey string) string { return xattrKey + "." + versionVectorVrsMacro } @@ -4044,21 +4151,3 @@ func xattrCurrentVersionPath(xattrKey string) string { func xattrCurrentVersionCASPath(xattrKey string) string { return xattrKey + "." + versionVectorCVCASMacro } - -func xattrRevokedChannelVersionPath(xattrKey string, channelName string) (string, error) { - path := xattrKey + ".channels." + escapeSubdocPathComponent(channelName) + "." + xattrMacroCurrentRevVersion - if len(path) > cbsSubdocPathMaxLength { - return "", base.RedactErrorf("subdoc path for channel %s exceeds maximum length of %d bytes", base.UD(channelName), cbsSubdocPathMaxLength) - } - return path, nil -} - -// escapeSubdocPathComponent wraps a Couchbase subdocument path component in backticks when it -// contains characters that are special in subdoc paths: dots (path separator) or square brackets -// (array index syntax). Any backticks within the component are doubled to escape them. -func escapeSubdocPathComponent(component string) string { - if !strings.ContainsAny(component, ".[]`") { - return component - } - return "`" + strings.ReplaceAll(component, "`", "``") + "`" -} diff --git a/db/crud_test.go b/db/crud_test.go index 96a6855bfe..8a251ad4f6 100644 --- a/db/crud_test.go +++ b/db/crud_test.go @@ -15,7 +15,6 @@ import ( "encoding/json" "log" "reflect" - "strings" "testing" "time" @@ -1754,7 +1753,7 @@ func TestPutExistingCurrentVersion(t *testing.T) { doc, err := collection.GetDocument(ctx, "doc1", DocUnmarshalSync) assert.NoError(t, err) assert.Equal(t, bucketUUID, doc.HLV.SourceID) - assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.Version) + assert.NotZero(t, doc.HLV.Version) assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.CurrentVersionCAS) // store the cas version allocated to the above doc creation for creation of incoming HLV later in test @@ -1845,7 +1844,7 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) { db, ctx := setupTestDB(t) defer db.Close(ctx) - bucketUUID := db.EncodedSourceID + sourceID := db.EncodedSourceID collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) // create a new doc @@ -1858,9 +1857,10 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) { // assert on the HLV values after the above creation of the doc doc, err := collection.GetDocument(ctx, "doc1", DocUnmarshalSync) require.NoError(t, err) - assert.Equal(t, bucketUUID, doc.HLV.SourceID) - assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.Version) + assert.Equal(t, sourceID, doc.HLV.SourceID) + assert.NotZero(t, doc.HLV.Version) assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.CurrentVersionCAS) + rev1CV := doc.HLV.ExtractCurrentVersionFromHLV() // create a new doc update to simulate a doc update arriving over replicator from, client body = Body{"key1": "value2"} @@ -1883,8 +1883,8 @@ func TestPutExistingCurrentVersionWithConflict(t *testing.T) { // assert persisted doc hlv hasn't been updated doc, err = collection.GetDocument(ctx, "doc1", DocUnmarshalSync) assert.NoError(t, err) - assert.Equal(t, bucketUUID, doc.HLV.SourceID) - assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.Version) + assert.Equal(t, rev1CV.SourceID, doc.HLV.SourceID) + assert.Equal(t, rev1CV.Value, doc.HLV.Version) assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.CurrentVersionCAS) } @@ -2603,146 +2603,3 @@ func TestProposedRev(t *testing.T) { }) } } - -func TestXattrRevokedChannelVersionPath(t *testing.T) { - tests := []struct { - name string - channelName string - expected string - wantErr bool - }{ - { - name: "simple channel name", - channelName: "mychannel", - expected: "_sync.channels.mychannel.rev.ver", - }, - { - name: "channel name with dots", - channelName: "location.com.subscriber.default_location", - expected: "_sync.channels.`location.com.subscriber.default_location`.rev.ver", - }, - { - name: "channel name with dots and backticks", - channelName: "channel.with`backtick", - expected: "_sync.channels.`channel.with``backtick`.rev.ver", - }, - { - name: "channel name with uppercase letters", - channelName: "MyChannel", - expected: "_sync.channels.MyChannel.rev.ver", - }, - { - name: "channel name with uppercase letters and dots", - channelName: "My.Channel", - expected: "_sync.channels.`My.Channel`.rev.ver", - }, - { - name: "channel name with digits", - channelName: "channel123", - expected: "_sync.channels.channel123.rev.ver", - }, - { - name: "channel name with digits and dots", - channelName: "channel.123", - expected: "_sync.channels.`channel.123`.rev.ver", - }, - { - name: "channel name with underscore only", - channelName: "my_channel", - expected: "_sync.channels.my_channel.rev.ver", - }, - { - name: "channel name with equals sign", - channelName: "key=value", - expected: "_sync.channels.key=value.rev.ver", - }, - { - name: "channel name with equals sign and dots", - channelName: "config.key=value", - expected: "_sync.channels.`config.key=value`.rev.ver", - }, - { - name: "channel name with plus sign", - channelName: "a+b", - expected: "_sync.channels.a+b.rev.ver", - }, - { - name: "channel name with plus sign and dots", - channelName: "a.b+c", - expected: "_sync.channels.`a.b+c`.rev.ver", - }, - { - name: "channel name with forward slash", - channelName: "scope/channel", - expected: "_sync.channels.scope/channel.rev.ver", - }, - { - name: "channel name with forward slash and dots", - channelName: "scope/channel.sub", - expected: "_sync.channels.`scope/channel.sub`.rev.ver", - }, - { - name: "channel name with comma", - channelName: "a,b", - expected: "_sync.channels.a,b.rev.ver", - }, - { - name: "channel name with comma and dots", - channelName: "a.b,c", - expected: "_sync.channels.`a.b,c`.rev.ver", - }, - { - name: "channel name with at sign", - channelName: "user@example", - expected: "_sync.channels.user@example.rev.ver", - }, - { - name: "channel name with at sign and dots", - channelName: "user@example.com", - expected: "_sync.channels.`user@example.com`.rev.ver", - }, - { - name: "channel name with brackets and index", - channelName: "example[10]ChannelName", - expected: "_sync.channels.`example[10]ChannelName`.rev.ver", - }, - { - name: "channel name with brackets at end", - channelName: "exampleChannelName[10]", - expected: "_sync.channels.`exampleChannelName[10]`.rev.ver", - }, - { - name: "channel name with empty brackets", - channelName: "literal[]bracketchannel", - expected: "_sync.channels.`literal[]bracketchannel`.rev.ver", - }, - { - name: "channel name with brackets and dots", - channelName: "example[10].ChannelName", - expected: "_sync.channels.`example[10].ChannelName`.rev.ver", - }, - { - name: "channel name with backtick", - channelName: "foo`bar", - expected: "_sync.channels.`foo``bar`.rev.ver", - }, - { - // Scaffold "_sync.channels." (15) + ".rev.ver" (8) = 23 chars; a name of 1002 chars - // produces a path of 1025 bytes, exceeding the CBS subdoc path limit of 1024. - name: "channel name exceeding subdoc path limit", - channelName: strings.Repeat("a", 1002), - wantErr: true, - }, - } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - result, err := xattrRevokedChannelVersionPath("_sync", tc.channelName) - if tc.wantErr { - require.Error(t, err) - } else { - require.NoError(t, err) - assert.Equal(t, tc.expected, result) - } - }) - } -} diff --git a/db/database.go b/db/database.go index 6e4aa3b50c..52db381a66 100644 --- a/db/database.go +++ b/db/database.go @@ -106,21 +106,22 @@ const metadataMigrationArmRetryInterval = 10 * time.Second // Basic description of a database. Shared between all Database objects on the same database. // This object is thread-safe so it can be shared between HTTP handlers. type DatabaseContext struct { - Name string // Database name - UUID string // UUID for this database instance. Used by cbgt and sgr - MetadataStore base.DataStore // Storage for database metadata (anything that isn't an end-user's/customer's documents) - Bucket base.Bucket // Storage - bucketUsername string // name of the connecting user for audit logging - BucketUUID string // The bucket UUID for the bucket the database is created against - EncodedSourceID string // The md5 hash of bucket UUID + cluster UUID for the bucket/cluster the database is created against but encoded in base64 - BucketLock sync.RWMutex // Control Access to the underlying bucket object - mutationListener *changeListener // Caching feed listener - ImportListener *importListener // Import feed listener - sequences *sequenceAllocator // Source of new sequence numbers - StartTime time.Time // Timestamp when context was instantiated - RevsLimit uint32 // Max depth a document's revision tree can grow to - autoImport bool // Add sync data to new untracked couchbase server docs? (Xattr mode specific) - revisionCache RevisionCache // Cache of recently-accessed doc revisions + Name string // Database name + UUID string // UUID for this database instance. Used by cbgt and sgr + MetadataStore base.DataStore // Storage for database metadata (anything that isn't an end-user's/customer's documents) + Bucket base.Bucket // Storage + bucketUsername string // name of the connecting user for audit logging + BucketUUID string // The bucket UUID for the bucket the database is created against + EncodedSourceID string // The md5 hash of bucket UUID + cluster UUID for the bucket/cluster the database is created against but encoded in base64 + hlc *base.HybridLogicalClock // Generates HLV current-version values for writes under EncodedSourceID + BucketLock sync.RWMutex // Control Access to the underlying bucket object + mutationListener *changeListener // Caching feed listener + ImportListener *importListener // Import feed listener + sequences *sequenceAllocator // Source of new sequence numbers + StartTime time.Time // Timestamp when context was instantiated + RevsLimit uint32 // Max depth a document's revision tree can grow to + autoImport bool // Add sync data to new untracked couchbase server docs? (Xattr mode specific) + revisionCache RevisionCache // Cache of recently-accessed doc revisions channelCache ChannelCache changeCache changeCache // Cache of recently-access channels EventMgr *EventManager // Manages notification events @@ -453,6 +454,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket, bucketUsername: bucketUsername, BucketUUID: bucketUUID, EncodedSourceID: sourceID, + hlc: base.NewHybridLogicalClock(), StartTime: time.Now(), autoImport: autoImport, Options: options, @@ -1892,7 +1894,7 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d forceUpdate = true } - changedChannels, _, err := doc.updateChannels(ctx, channels) + changedChannels, err := doc.updateChannels(ctx, channels) changed = len(doc.Access.updateAccess(ctx, doc, access)) + len(doc.RoleAccess.updateAccess(ctx, doc, roles)) + len(changedChannels) diff --git a/db/database_test.go b/db/database_test.go index 10c6043a8e..4835dd8f1a 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -4165,7 +4165,7 @@ func Test_resyncDocument(t *testing.T) { require.NotNil(t, postResyncDoc.HLV) require.Equal(t, Version{ SourceID: db.EncodedSourceID, - Value: preResyncDoc.Cas, + Value: preResyncDoc.HLV.Version, }, Version{ SourceID: postResyncDoc.HLV.SourceID, Value: postResyncDoc.HLV.Version, diff --git a/db/document.go b/db/document.go index 5fc24cf671..608b9b0ccd 100644 --- a/db/document.go +++ b/db/document.go @@ -1075,7 +1075,7 @@ func (doc *Document) addToChannelSetHistory(channelName string, historyEntry Cha // Updates the Channels property of a document object with current & past channels. // Returns the set of channels that have changed (document joined or left in this revision) -func (doc *Document) updateChannels(ctx context.Context, newChannels base.Set) (changedChannels base.Set, revokedChannelsRequiringExpansion []string, err error) { +func (doc *Document) updateChannels(ctx context.Context, newChannels base.Set) (changedChannels base.Set, err error) { var changed []string oldChannels := doc.Channels if oldChannels == nil { @@ -1093,10 +1093,6 @@ func (doc *Document) updateChannels(ctx context.Context, newChannels base.Set) ( Deleted: doc.hasFlag(channels.Deleted)} doc.updateChannelHistory(channel, curSequence, false) changed = append(changed, channel) - // If the current version requires macro expansion, new removal in channel map will also require macro expansion - if doc.HLV != nil && doc.HLV.Version == expandMacroCASValueUint64 { - revokedChannelsRequiringExpansion = append(revokedChannelsRequiringExpansion, channel) - } } } } diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index e10f5a9a8f..1d6554e452 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -471,16 +471,31 @@ func (hlv *HybridLogicalVector) GetValue(sourceID string) (uint64, bool) { return 0, false } +// maxValueForSource returns the highest version value recorded for sourceID in the HLV, or 0 if the source +// is not present. It is used as the floor when generating a new version value for sourceID: the generated +// value must strictly exceed any existing value for that source so the HLV never moves backwards for a +// given source. +// +// The maps a source can appear in are constrained: +// - A source is never in both the current and previous versions, so the cv case need not consult PV. +// - A source is never in both the previous and merge versions, so the PV case is authoritative on its own. +// - The current and merge versions *can* share a source, so the cv case folds in the merge version. +func (hlv *HybridLogicalVector) maxValueForSource(sourceID string) uint64 { + if sourceID == "" { + return 0 + } + if sourceID == hlv.SourceID { + return max(hlv.Version, hlv.MergeVersions[sourceID]) + } + if pv, ok := hlv.PreviousVersions[sourceID]; ok { + return pv + } + return hlv.MergeVersions[sourceID] // 0 if absent +} + // computeMacroExpansions returns the mutate in spec needed for the document update based off the outcome in updateHLV func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansionSpec { var outputSpec []sgbucket.MacroExpansionSpec - if hlv.Version == expandMacroCASValueUint64 { - spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(base.VvXattrName), sgbucket.MacroCas) - outputSpec = append(outputSpec, spec) - // If version is being expanded, we need to also specify the macro expansion for the expanded rev property - currentRevSpec := sgbucket.NewMacroExpansionSpec(xattrCurrentRevVersionPath(base.SyncXattrName), sgbucket.MacroCas) - outputSpec = append(outputSpec, currentRevSpec) - } if hlv.CurrentVersionCAS == expandMacroCASValueUint64 { spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.VvXattrName), sgbucket.MacroCas) outputSpec = append(outputSpec, spec) @@ -575,19 +590,6 @@ func (hlv *HybridLogicalVector) toHistoryForHLV(sortFunc func(HLVVersions) iter. return s.String() } -// appendRevocationMacroExpansions adds macro expansions for the channel map. Not strictly an HLV operation -// but putting the function here as it's required when the HLV's current version is being macro expanded -func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec, channelNames []string) ([]sgbucket.MacroExpansionSpec, error) { - for _, channelName := range channelNames { - path, err := xattrRevokedChannelVersionPath(base.SyncXattrName, channelName) - if err != nil { - return nil, err - } - currentSpec = append(currentSpec, sgbucket.NewMacroExpansionSpec(path, sgbucket.MacroCas)) - } - return currentSpec, nil -} - // extractHLVFromBlipMessage extracts the full HLV a string in the format seen over Blip // blip string may be the following formats // 1. cv only: cv diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index 4b1b04479f..acf7cd89cb 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -515,6 +515,46 @@ func TestHLVImport(t *testing.T) { } +// TestHLVVersionAheadOfCASCorrection exercises correctVersionAheadOfCAS by simulating the Sync Gateway +// clock running ahead of the server, so the generated current version exceeds the committed CAS. +func TestHLVVersionAheadOfCASCorrection(t *testing.T) { + db, ctx := setupTestDB(t) + defer db.Close(ctx) + collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db) + + retryCount := db.DbStats.Database().HLVVersionCASRetryCount + + t.Run("within max wait is corrected", func(t *testing.T) { + // SG clock 100ms ahead of the server: the generated version exceeds the CAS, within the wait bound. + offset := uint64(100 * time.Millisecond) + db.hlc.SetClockForTest(func() uint64 { return uint64(time.Now().UnixNano()) + offset }) + + before := retryCount.Value() + _, doc, err := collection.Put(ctx, "skewedWithinBound", Body{"foo": "bar"}) + require.NoError(t, err) + require.NotNil(t, doc.HLV) + // The version is preserved (not moved backwards) and the re-stamped CAS now satisfies cv.ver <= cas. + assert.Equal(t, db.EncodedSourceID, doc.HLV.SourceID) + assert.NotZero(t, doc.HLV.Version) + assert.LessOrEqual(t, doc.HLV.Version, doc.Cas, "version should be corrected to <= CAS") + assert.Equal(t, doc.Cas, doc.HLV.CurrentVersionCAS) + assert.Equal(t, before+1, retryCount.Value(), "corrective re-stamp should increment the stat") + }) + + t.Run("beyond max wait is left uncorrected", func(t *testing.T) { + // SG clock 2s ahead (> maxVersionCASCorrectionWait): correction is skipped with a warning. + offset := uint64(2 * time.Second) + db.hlc.SetClockForTest(func() uint64 { return uint64(time.Now().UnixNano()) + offset }) + + before := retryCount.Value() + _, doc, err := collection.Put(ctx, "skewedBeyondBound", Body{"foo": "bar"}) + require.NoError(t, err) + require.NotNil(t, doc.HLV) + assert.Greater(t, doc.HLV.Version, doc.Cas, "version beyond the wait bound should be left ahead of CAS") + assert.Equal(t, before, retryCount.Value(), "skipped correction should not increment the stat") + }) +} + // TestHLVMapToCBLString: // - Purpose is to test the ability to extract from HLV maps in CBL replication format // - Three test cases, both MV and PV defined, only PV defined and only MV defined @@ -1792,3 +1832,47 @@ func TestFindGenerationFromLegacyRev(t *testing.T) { }) } } + +// TestHLVMaxValueForSource asserts the floor returned for a source across the current, previous and merge +// versions of an HLV, including the absent-source (0) cases that drive new-document version generation. +func TestHLVMaxValueForSource(t *testing.T) { + const cvSource = "cv" + const pvSource = "pv" + const mvSource = "mv" + const sharedSource = "shared" + + hlv := HybridLogicalVector{ + SourceID: cvSource, + Version: 100, + PreviousVersions: HLVVersions{pvSource: 50}, + MergeVersions: HLVVersions{mvSource: 75}, + } + + testCases := []struct { + name string + sourceID string + expected uint64 + }{ + {name: "current version source", sourceID: cvSource, expected: 100}, + {name: "previous version source", sourceID: pvSource, expected: 50}, + {name: "merge version source", sourceID: mvSource, expected: 75}, + {name: "absent source returns 0", sourceID: "missing", expected: 0}, + {name: "empty source returns 0", sourceID: "", expected: 0}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, hlv.maxValueForSource(tc.sourceID)) + }) + } + + // A source can be both the current version and a merge version (see MergeWithIncomingHLV), so the cv + // case must fold in the merge version. (PV+MV and CV+PV overlaps cannot occur, so are not tested here.) + t.Run("merge version folded in when source is also current", func(t *testing.T) { + hlv := HybridLogicalVector{ + SourceID: sharedSource, + Version: 100, + MergeVersions: HLVVersions{sharedSource: 150}, + } + assert.Equal(t, uint64(150), hlv.maxValueForSource(sharedSource)) + }) +} diff --git a/db/revision_cache_test.go b/db/revision_cache_test.go index aa7cbc57ff..ec17ac1d53 100644 --- a/db/revision_cache_test.go +++ b/db/revision_cache_test.go @@ -55,7 +55,7 @@ func (t *testBackingStore) GetDocument(ctx context.Context, docid string, unmars SourceID: "test", Version: 123, } - _, _, err = doc.updateChannels(ctx, base.SetOf("*")) + _, err = doc.updateChannels(ctx, base.SetOf("*")) if err != nil { return nil, err } @@ -2077,7 +2077,7 @@ func TestGetActive(t *testing.T) { expectedCV := Version{ SourceID: db.EncodedSourceID, - Value: doc.Cas, + Value: doc.HLV.Version, } // remove the entry form the rev cache to force the cache to not have the active version in it diff --git a/db/utilities_hlv_testing.go b/db/utilities_hlv_testing.go index 03afe6e9f1..80f421a126 100644 --- a/db/utilities_hlv_testing.go +++ b/db/utilities_hlv_testing.go @@ -35,6 +35,17 @@ type HLVAgent struct { var defaultHelperBody = map[string]any{"version": 1} +// externalPeerMacroExpansions returns the macro-expansion specs a non-SGW HLV-aware peer (e.g. Couchbase +// Server XDCR) applies when writing the given vv xattr: both the current version and cvCAS expand to the +// document CAS at write time. SGW's own writes generate the current version as a literal and so no longer +// macro-expand it (see computeMacroExpansions), so the agent specifies the external-peer behaviour explicitly. +func externalPeerMacroExpansions(vvXattrName string) []sgbucket.MacroExpansionSpec { + return []sgbucket.MacroExpansionSpec{ + sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(vvXattrName), sgbucket.MacroCas), + sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(vvXattrName), sgbucket.MacroCas), + } +} + func NewHLVAgent(t *testing.T, datastore base.DataStore, source string, xattrName string) *HLVAgent { return &HLVAgent{ t: t, @@ -54,7 +65,7 @@ func (h *HLVAgent) InsertWithHLV(ctx context.Context, key string, body Body) (ca vvDataBytes := base.MustJSONMarshal(h.t, hlv) mutateInOpts := &sgbucket.MutateInOptions{ - MacroExpansion: hlv.computeMacroExpansions(), + MacroExpansion: externalPeerMacroExpansions(h.xattrName), } var docBody []byte @@ -81,7 +92,7 @@ func (h *HLVAgent) UpdateWithHLV(ctx context.Context, key string, inputCas uint6 vvXattr, err := hlv.MarshalJSON() require.NoError(h.t, err) mutateInOpts := &sgbucket.MutateInOptions{ - MacroExpansion: hlv.computeMacroExpansions(), + MacroExpansion: externalPeerMacroExpansions(h.xattrName), } docBody := base.MustJSONMarshal(h.t, defaultHelperBody) @@ -386,7 +397,7 @@ func hlvAsBlipString(_ testing.TB, hlv *HybridLogicalVector) string { // AlterHLVForTest will alter the HLV of an existing document in the bucket, setting it to the provided HLV. Used for // testing purposes to set up specific HLV scenarios. -func AlterHLVForTest(t *testing.T, ctx context.Context, dataStore base.DataStore, key string, hlv *HybridLogicalVector, docBody map[string]any) uint64 { +func AlterHLVForTest(t *testing.T, ctx context.Context, dataStore base.DataStore, key string, hlv *HybridLogicalVector, docBody map[string]any) { cas, err := dataStore.Get(ctx, key, nil) require.NoError(t, err) @@ -403,7 +414,10 @@ func AlterHLVForTest(t *testing.T, ctx context.Context, dataStore base.DataStore MacroExpansion: hlv.computeMacroExpansions(), } - cas, err = dataStore.WriteWithXattrs(ctx, key, 0, cas, bodyBytes, xattrData, nil, mutateInOpts) + _, err = dataStore.WriteWithXattrs(ctx, key, 0, cas, bodyBytes, xattrData, nil, mutateInOpts) require.NoError(t, err) - return cas +} + +func (db *DatabaseContext) GetHLCValueForTest(floorValue uint64) uint64 { + return db.hlc.Now(floorValue) } diff --git a/rest/api_test.go b/rest/api_test.go index bd1064a481..8144ebf3ec 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -3024,7 +3024,7 @@ func TestPvDeltaReadAndWrite(t *testing.T) { version2 := rt.UpdateDoc(docID, version1, `{"new": "update!"}`) newDoc, _, err := collection.GetDocWithXattrs(ctx, existingHLVKey, db.DocUnmarshalAll) require.NoError(t, err) - casV2 := newDoc.Cas + hlvVersionV2 := newDoc.HLV.Version encodedSourceV2 := testSource // assert that we have a prev CV drop to pv and a new CV pair, assert pv values are as expected after delta conversions @@ -3049,7 +3049,7 @@ func TestPvDeltaReadAndWrite(t *testing.T) { assert.Equal(t, casV3, bucketDoc.HLV.Version) assert.Len(t, bucketDoc.HLV.PreviousVersions, 2) assert.Equal(t, casV1, bucketDoc.HLV.PreviousVersions[encodedSourceV1]) - assert.Equal(t, casV2, bucketDoc.HLV.PreviousVersions[encodedSourceV2]) + assert.Equal(t, hlvVersionV2, bucketDoc.HLV.PreviousVersions[encodedSourceV2]) } // TestPutDocUpdateVersionVector: @@ -3071,7 +3071,10 @@ func TestPutDocUpdateVersionVector(t *testing.T) { assert.NoError(t, err) assert.Equal(t, bucketUUID, doc.HLV.SourceID) - assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.Version) + // Version is generated by the SG HLC, so it is independent of (and must not exceed) the document CAS; + // cvCAS still macro-expands to the CAS. (MB-72252) + assert.NotZero(t, doc.HLV.Version) + assert.LessOrEqual(t, doc.HLV.Version, base.HexCasToUint64(doc.SyncData.Cas)) assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.CurrentVersionCAS) // Put a new revision of this doc and assert that the version vector SourceID and Version is updated @@ -3082,7 +3085,10 @@ func TestPutDocUpdateVersionVector(t *testing.T) { assert.NoError(t, err) assert.Equal(t, bucketUUID, doc.HLV.SourceID) - assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.Version) + // Version is generated by the SG HLC, so it is independent of (and must not exceed) the document CAS; + // cvCAS still macro-expands to the CAS. (MB-72252) + assert.NotZero(t, doc.HLV.Version) + assert.LessOrEqual(t, doc.HLV.Version, base.HexCasToUint64(doc.SyncData.Cas)) assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.CurrentVersionCAS) // Delete doc and assert that the version vector SourceID and Version is updated @@ -3093,7 +3099,10 @@ func TestPutDocUpdateVersionVector(t *testing.T) { assert.NoError(t, err) assert.Equal(t, bucketUUID, doc.HLV.SourceID) - assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.Version) + // Version is generated by the SG HLC, so it is independent of (and must not exceed) the document CAS; + // cvCAS still macro-expands to the CAS. (MB-72252) + assert.NotZero(t, doc.HLV.Version) + assert.LessOrEqual(t, doc.HLV.Version, base.HexCasToUint64(doc.SyncData.Cas)) assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.CurrentVersionCAS) } @@ -3123,7 +3132,10 @@ func TestHLVOnPutWithImportRejection(t *testing.T) { assert.NoError(t, err) assert.Equal(t, bucketUUID, doc.HLV.SourceID) - assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.Version) + // Version is generated by the SG HLC, so it is independent of (and must not exceed) the document CAS; + // cvCAS still macro-expands to the CAS. (MB-72252) + assert.NotZero(t, doc.HLV.Version) + assert.LessOrEqual(t, doc.HLV.Version, base.HexCasToUint64(doc.SyncData.Cas)) assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.CurrentVersionCAS) // Put a doc that will be rejected by the import filter on the attempt to perform on demand import for write @@ -3135,7 +3147,10 @@ func TestHLVOnPutWithImportRejection(t *testing.T) { require.NoError(t, err) assert.Equal(t, bucketUUID, doc.HLV.SourceID) - assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.Version) + // Version is generated by the SG HLC, so it is independent of (and must not exceed) the document CAS; + // cvCAS still macro-expands to the CAS. (MB-72252) + assert.NotZero(t, doc.HLV.Version) + assert.LessOrEqual(t, doc.HLV.Version, base.HexCasToUint64(doc.SyncData.Cas)) assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.CurrentVersionCAS) } @@ -3311,6 +3326,37 @@ func TestContinuousChangesDoesNotBlockOffline(t *testing.T) { }, 0) } +func TestDocWriteManyChannelRemovals(t *testing.T) { + rt := NewRestTester(t, &RestTesterConfig{ + SyncFn: channels.DocChannelsSyncFunction, + }) + defer rt.Close() + + const docID = "doc1" + doc := rt.CreateDocNoHLV(docID, db.Body{"channels": []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T"}}) + + // update doc to have only one channel + updateVersion := rt.UpdateDoc(docID, doc.ExtractDocVersion(), `{"channels": ["A"]}`) + assert.NotEmpty(t, updateVersion.CV.String()) + + collection, ctx := rt.GetSingleTestDatabaseCollection() + updatedDoc, _, err := collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalNoHistory) + require.NoError(t, err) + + // assert channel removals are present in sync data + channelRemovals := updatedDoc.SyncData.Channels + require.Len(t, channelRemovals, 20) // 19 removals but should also include non-removed channel "A" but just with no removal info + require.Nil(t, channelRemovals["A"]) // channel "A" should not have removal info since it's not removed + for _, channel := range []string{"B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T"} { + channelRemovalInfo, ok := channelRemovals[channel] + require.True(t, ok) + require.NotNil(t, channelRemovalInfo) + assert.Equal(t, updatedDoc.Sequence, channelRemovalInfo.Seq) + assert.Equal(t, updatedDoc.GetRevTreeID(), channelRemovalInfo.Rev.RevTreeID) + assert.Equal(t, updatedDoc.HLV.GetCurrentVersionString(), channelRemovalInfo.Rev.CV()) + } +} + func TestPublicAllDocsApiStats(t *testing.T) { rt := NewRestTester(t, &RestTesterConfig{ SyncFn: channels.DocChannelsSyncFunction, @@ -3419,7 +3465,10 @@ func TestHLVUpdateOnRevReplicatorPut(t *testing.T) { require.NoError(t, err) assert.Equal(t, rt.GetDatabase().EncodedSourceID, doc.HLV.SourceID) - assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.Version) + // Version is generated by the SG HLC, so it is independent of (and must not exceed) the document CAS; + // cvCAS still macro-expands to the CAS. (MB-72252) + assert.NotZero(t, doc.HLV.Version) + assert.LessOrEqual(t, doc.HLV.Version, base.HexCasToUint64(doc.SyncData.Cas)) assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), doc.HLV.CurrentVersionCAS) } diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 06b09a2f84..6cd27eb97c 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -3536,6 +3536,71 @@ func TestBlipPullConflict(t *testing.T) { }) } +func TestManyChannelsRemovedOnDocUpdate(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges, base.KeyCache, base.KeySGTest) + + btcRunner := NewBlipTesterClientRunner(t) + btcRunner.Run(func(t *testing.T) { + rt := NewRestTester(t, &RestTesterConfig{ + SyncFn: channels.DocChannelsSyncFunction, + }) + defer rt.Close() + + const alice = "alice" + docID := SafeDocumentName(t, t.Name()) + rt.CreateUser(alice, []string{"*"}) + sgVersion := rt.CreateDocNoHLV(docID, db.Body{"channels": []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T"}}).ExtractDocVersion() + rt.WaitForPendingChanges() + + opts := &BlipTesterClientOpts{ + Username: alice, + } + btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts) + defer btc.Close() + + clientVersion := btcRunner.AddRevTreeRev(btc.id, docID, sgVersion.RevTreeID, EmptyDocVersion(), []byte(`{"channels": ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T"]}`)) + clientVersion = btcRunner.AddRevTreeRev(btc.id, docID, "2-abc", &clientVersion, []byte(`{"channels": ["A"]}`)) + + btcRunner.StartPush(btc.id) + + rt.WaitForVersion(docID, clientVersion) + + collection, ctx := rt.GetSingleTestDatabaseCollection() + doc, _, err := collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalNoHistory) + require.NoError(t, err) + if btc.UseHLV() { + encodedHLV, err := db.LegacyRevToRevTreeEncodedVersion("2-abc") + require.NoError(t, err) + require.Equal(t, db.HybridLogicalVector{ + CurrentVersionCAS: doc.Cas, + Version: encodedHLV.Value, + SourceID: encodedHLV.SourceID, + }, *doc.HLV) + } else { + require.Equal(t, doc.Cas, doc.HLV.CurrentVersionCAS) + require.Equal(t, rt.GetDatabase().EncodedSourceID, doc.HLV.SourceID) + require.Empty(t, doc.HLV.PreviousVersions) + require.Empty(t, doc.HLV.MergeVersions) + // Version is generated by the SG HLC, not the CAS macro. + require.NotZero(t, doc.HLV.Version) + } + + // assert channel removals are present in sync data + channelRemovals := doc.SyncData.Channels + require.Len(t, channelRemovals, 20) // 19 removals but should also include non-removed channel "A" but just with no removal info + require.Nil(t, channelRemovals["A"]) // channel "A" should not have removal info since it's not removed) + for _, channel := range []string{"B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T"} { + channelRemovalInfo, ok := channelRemovals[channel] + require.True(t, ok) + require.NotNil(t, channelRemovalInfo) + assert.Equal(t, doc.Sequence, channelRemovalInfo.Seq) + assert.Equal(t, doc.GetRevTreeID(), channelRemovalInfo.Rev.RevTreeID) + assert.Equal(t, doc.HLV.GetCurrentVersionString(), channelRemovalInfo.Rev.CV()) + } + }) + +} + func TestPushHLVOntoLegacyRev(t *testing.T) { t.Skip("CBG-4909 skipping due to conflict resolution not yet implemented for CBL rev tree") @@ -3940,66 +4005,3 @@ func TestChannelRemovalWithSpecialCharsInName(t *testing.T) { ) }) } - -func TestChannelRemovalWithLongChannelName(t *testing.T) { - if base.UnitTestUrlIsWalrus() { - t.Skip("rosmar doesn't support escaping characters in sub doc keys") - } - base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll) - btcRunner := NewBlipTesterClientRunner(t) - - btcRunner.Run(func(t *testing.T) { - // Use the channel name directly (no prefix) so the test controls the exact length. - rtConfig := RestTesterConfig{ - SyncFn: `function (doc, oldDoc) { - access("alice", "test." + doc._id); - if (doc.chan && doc.chan.length > 0) { - channel(doc.chan); - }}`, - } - rt := NewRestTester(t, &rtConfig) - defer rt.Close() - - const username = "alice" - rt.CreateUser(username, nil) - - opts := &BlipTesterClientOpts{Username: username} - client := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts) - defer client.Close() - - btcRunner.StartPush(client.id) - - longChannelName := strings.Repeat("a", 1025) - const docID = "long-channel-name-doc" - - // Rev 1: assign the doc to the long channel. No channel is revoked on a new - // document, so no macro expansion path is built and the write succeeds. - v := btcRunner.AddRevTreeRev(client.id, docID, "1-abc", EmptyDocVersion(), []byte(fmt.Sprintf(`{"chan": %q}`, longChannelName))) - rt.WaitForVersion(docID, v) - - // Rev 2: move the doc to a short channel, revoking the long channel. Building - // the subdoc macro expansion path for the revoked channel exceeds the CBS - // 1024-byte limit, so the write must fail. We construct the rev message manually - // so we can inspect the error response directly rather than via WaitForVersion. - revRequest := blip.NewRequest() - revRequest.SetProfile(db.MessageRev) - revRequest.Properties[db.RevMessageID] = docID - revRequest.Properties[db.RevMessageRev] = "2-abc" - revRequest.Properties[db.RevMessageHistory] = v.RevTreeID - revRequest.SetBody([]byte(`{"chan": "shortchannel"}`)) - btcRunner.SingleCollection(client.id).sendPushMsg(revRequest) - - revResp := revRequest.Response() - if btcRunner.SingleCollection(client.id).UseHLV() { - // HLV replication does not use subdoc macro expansion for revoked channels, - // so the write succeeds. - require.NotEqual(t, blip.ErrorType, revResp.Type()) - require.NotContains(t, revResp.Properties, "Error-Code") - } else { - // RevTree replication builds a subdoc macro expansion path for revoked - // channels; a 1025-char channel name exceeds the CBS 1024-byte path limit. - require.Equal(t, blip.ErrorType, revResp.Type()) - require.Equal(t, strconv.Itoa(http.StatusInternalServerError), revResp.Properties["Error-Code"]) - } - }) -} diff --git a/rest/changes_test.go b/rest/changes_test.go index 6108d2658a..b34873415c 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -411,13 +411,13 @@ func TestCVPopulationOnChangesViaAPI(t *testing.T) { changes := rt.WaitForChanges(1, "/{{.keyspace}}/_changes?version_type=cv", "", true) - fetchedDoc, _, err := collection.GetDocWithXattrs(ctx, DocID, db.DocUnmarshalCAS) + fetchedDoc, _, err := collection.GetDocWithXattrs(ctx, DocID, db.DocUnmarshalSync) require.NoError(t, err) entryCV := db.GetChangeEntryCV(t, &changes.Results[0]) assert.Equal(t, "doc1", changes.Results[0].ID) assert.Equal(t, bucketUUID, entryCV.SourceID) - assert.Equal(t, fetchedDoc.Cas, entryCV.Value) + assert.Equal(t, fetchedDoc.HLV.Version, entryCV.Value) } func TestCVPopulationOnDocIDChanges(t *testing.T) { @@ -440,13 +440,13 @@ func TestCVPopulationOnDocIDChanges(t *testing.T) { changes := rt.WaitForChanges(1, fmt.Sprintf(`/{{.keyspace}}/_changes?version_type=cv&filter=_doc_ids&doc_ids=%s`, DocID), "", true) - fetchedDoc, _, err := collection.GetDocWithXattrs(ctx, DocID, db.DocUnmarshalCAS) + fetchedDoc, _, err := collection.GetDocWithXattrs(ctx, DocID, db.DocUnmarshalSync) require.NoError(t, err) entryCV := db.GetChangeEntryCV(t, &changes.Results[0]) assert.Equal(t, "doc1", changes.Results[0].ID) assert.Equal(t, bucketUUID, entryCV.SourceID) - assert.Equal(t, fetchedDoc.Cas, entryCV.Value) + assert.Equal(t, fetchedDoc.HLV.Version, entryCV.Value) } // TestChangesVersionType tests the /_changes REST endpoint with different version_type parameters for each possible underlying feed type and HTTP method. diff --git a/rest/replicatortest/replicator_conflict_test.go b/rest/replicatortest/replicator_conflict_test.go index 57128f0814..62b52a5c01 100644 --- a/rest/replicatortest/replicator_conflict_test.go +++ b/rest/replicatortest/replicator_conflict_test.go @@ -445,6 +445,8 @@ func TestActiveReplicatorLocalWinsCases(t *testing.T) { require.NoError(t, err) ctx1 := rt1.Context() ctx2 := rt2.Context() + rt1DbCtx := rt1.GetDatabase() + rt2DbCtx := rt2.GetDatabase() docID := "doc1_" version := rt2.PutDoc(docID, `{"source":"rt2","channels":["alice"]}`) @@ -486,7 +488,7 @@ func TestActiveReplicatorLocalWinsCases(t *testing.T) { // alter remote doc HLV to have MV newHLVForRemote := &db.HybridLogicalVector{ - Version: math.MaxUint64, // will macro expand + Version: rt2DbCtx.GetHLCValueForTest(0), SourceID: version.CV.SourceID, PreviousVersions: testCase.pvForRemote, MergeVersions: testCase.mvForRemote, @@ -495,7 +497,7 @@ func TestActiveReplicatorLocalWinsCases(t *testing.T) { // create local hlv for conflict (simulating an update locally) newHLVForLocal := &db.HybridLogicalVector{ SourceID: rt1.GetDatabase().EncodedSourceID, - Version: math.MaxUint64, // will macro expand + Version: rt1DbCtx.GetHLCValueForTest(0), PreviousVersions: map[string]uint64{ rt1Version.CV.SourceID: rt1Version.CV.Value, // move current cv to pv }, @@ -508,7 +510,7 @@ func TestActiveReplicatorLocalWinsCases(t *testing.T) { "channels": []string{"alice"}, "some": "data", } - remoteCas := db.AlterHLVForTest(t, ctx2, rt2.GetSingleDataStore(), docID, newHLVForRemote, docBody) + db.AlterHLVForTest(t, ctx2, rt2.GetSingleDataStore(), docID, newHLVForRemote, docBody) remoteDocPreConflict, _ := rt2.GetDoc(docID) // ensure doc is imported base.RequireWaitForStat(t, func() int64 { return rt2.GetDatabase().DbStats.SharedBucketImportStats.ImportCount.Value() @@ -518,14 +520,14 @@ func TestActiveReplicatorLocalWinsCases(t *testing.T) { docBody = map[string]any{ "channels": []string{"alice"}, } - localCas := db.AlterHLVForTest(t, ctx2, rt1.GetSingleDataStore(), docID, newHLVForLocal, docBody) + db.AlterHLVForTest(t, ctx2, rt1.GetSingleDataStore(), docID, newHLVForLocal, docBody) localDocPreConflict, localDocPreConflictBody := rt1.GetDoc(docID) // ensure doc is imported base.RequireWaitForStat(t, func() int64 { return rt1.GetDatabase().DbStats.SharedBucketImportStats.ImportCount.Value() }, 1) expectedHLV := &db.HybridLogicalVector{ - Version: localCas, + Version: newHLVForLocal.Version, SourceID: rt1.GetDatabase().EncodedSourceID, PreviousVersions: testCase.expectedPV, } @@ -533,7 +535,7 @@ func TestActiveReplicatorLocalWinsCases(t *testing.T) { expectedHLV.MergeVersions = testCase.mvForLocal } // add current CVs to expected MV - expectedHLV.PreviousVersions[rt2.GetDatabase().EncodedSourceID] = remoteCas + expectedHLV.PreviousVersions[rt2.GetDatabase().EncodedSourceID] = newHLVForRemote.Version rt1.WaitForPendingChanges() rt2.WaitForPendingChanges() @@ -807,7 +809,7 @@ func TestActiveReplicatorRemoteWinsCases(t *testing.T) { "channels": []string{"alice"}, "some": "data", } - remoteCas := db.AlterHLVForTest(t, ctx1, rt2.GetSingleDataStore(), docID, newHLVForRemote, docBody) + db.AlterHLVForTest(t, ctx1, rt2.GetSingleDataStore(), docID, newHLVForRemote, docBody) remoteDocVersionPreConflict, remoteDocBodyPreConflict := rt2.GetDoc(docID) // ensure doc is imported base.RequireWaitForStat(t, func() int64 { return rt2.GetDatabase().DbStats.SharedBucketImportStats.ImportCount.Value() @@ -817,7 +819,7 @@ func TestActiveReplicatorRemoteWinsCases(t *testing.T) { "source": "rt1", "channels": []string{"alice"}, } - localCas := db.AlterHLVForTest(t, ctx2, rt1.GetSingleDataStore(), docID, newHLVForLocal, docBody) + db.AlterHLVForTest(t, ctx2, rt1.GetSingleDataStore(), docID, newHLVForLocal, docBody) localDocVersionPreConflict, _ := rt1.GetDoc(docID) // ensure doc is imported base.RequireWaitForStat(t, func() int64 { return rt1.GetDatabase().DbStats.SharedBucketImportStats.ImportCount.Value() @@ -825,10 +827,10 @@ func TestActiveReplicatorRemoteWinsCases(t *testing.T) { expectedHLV := &db.HybridLogicalVector{ SourceID: rt2.GetDatabase().EncodedSourceID, - Version: remoteCas, + Version: newHLVForRemote.Version, MergeVersions: testCase.expectedMV, PreviousVersions: map[string]uint64{ - rt1.GetDatabase().EncodedSourceID: localCas, + rt1.GetDatabase().EncodedSourceID: newHLVForLocal.Version, }, } maps.Copy(expectedHLV.PreviousVersions, testCase.expectedPV) @@ -1038,7 +1040,7 @@ func TestActiveReplicatorHLVConflictNoCommonMVPV(t *testing.T) { "source": "rt2", "channels": []string{"alice"}, } - remoteCas := db.AlterHLVForTest(t, ctx1, rt2.GetSingleDataStore(), docID, newHLVForRemote, docBody) + db.AlterHLVForTest(t, ctx1, rt2.GetSingleDataStore(), docID, newHLVForRemote, docBody) remoteDocVersionPreConflict, remoteDocBodyPreConflict := rt2.GetDoc(docID) // ensure doc is imported base.RequireWaitForStat(t, func() int64 { return rt2.GetDatabase().DbStats.SharedBucketImportStats.ImportCount.Value() @@ -1048,7 +1050,7 @@ func TestActiveReplicatorHLVConflictNoCommonMVPV(t *testing.T) { docBody = map[string]any{ "channels": []string{"alice"}, } - localCas := db.AlterHLVForTest(t, ctx2, rt1.GetSingleDataStore(), docID, newHLVForLocal, docBody) + db.AlterHLVForTest(t, ctx2, rt1.GetSingleDataStore(), docID, newHLVForLocal, docBody) localDocVersionPreConflict, localDocBodyPreConflict := rt1.GetDoc(docID) // ensure doc is imported base.RequireWaitForStat(t, func() int64 { return rt1.GetDatabase().DbStats.SharedBucketImportStats.ImportCount.Value() @@ -1078,10 +1080,10 @@ func TestActiveReplicatorHLVConflictNoCommonMVPV(t *testing.T) { expectedHLV = db.HybridLogicalVector{ CurrentVersionCAS: rt1Doc.Cas, SourceID: rt2.GetDatabase().EncodedSourceID, - Version: remoteCas, + Version: newHLVForRemote.Version, MergeVersions: testCase.expectedMV, PreviousVersions: map[string]uint64{ - rt1.GetDatabase().EncodedSourceID: localCas, + rt1.GetDatabase().EncodedSourceID: newHLVForLocal.Version, }, } expectedWinnerPreConflict = remoteDocBodyPreConflict @@ -1089,10 +1091,10 @@ func TestActiveReplicatorHLVConflictNoCommonMVPV(t *testing.T) { expectedHLV = db.HybridLogicalVector{ CurrentVersionCAS: rt1Doc.Cas, SourceID: rt1.GetDatabase().EncodedSourceID, - Version: localCas, + Version: newHLVForLocal.Version, MergeVersions: testCase.expectedMV, PreviousVersions: db.HLVVersions{ - rt2.GetDatabase().EncodedSourceID: remoteCas, + rt2.GetDatabase().EncodedSourceID: newHLVForRemote.Version, }, } expectedWinnerPreConflict = localDocBodyPreConflict @@ -1572,7 +1574,7 @@ func TestActiveReplicatorHLVConflictCustom(t *testing.T) { expectedDocPushResolved: true, hlvResult: db.HybridLogicalVector{ SourceID: "NEED TO EXPAND IN TEST", - Version: math.MaxInt64, // # this needs to get expanded in test code to doc.Cas before comparison + Version: math.MaxInt64, // placeholder; overwritten in test code with the generated cv value before comparison MergeVersions: db.HLVVersions{ passiveStartingCV.SourceID: passiveStartingCV.Value, activeStartingCV.SourceID: activeStartingCV.Value, @@ -1667,7 +1669,9 @@ func TestActiveReplicatorHLVConflictCustom(t *testing.T) { expectedHLV.CurrentVersionCAS = resolvedDoc.Cas if testCase.newCVGenerated { expectedHLV.SourceID = rt1.GetDatabase().EncodedSourceID - expectedHLV.Version = resolvedDoc.Cas + // The merge current version is generated by the SG HLC rather than the CAS macro. + require.NotZero(t, resolvedDoc.HLV.Version) + expectedHLV.Version = resolvedDoc.HLV.Version } require.Equal(t, expectedHLV, *resolvedDoc.HLV) diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index d2f17977a2..f13b26ef8a 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -2787,7 +2787,8 @@ func TestActiveReplicatorPullMergeConflictingAttachments(t *testing.T) { require.True(t, ok) assert.Equal(t, rt2DocConflictVersion.CV.Value, val2) assert.Equal(t, rt1.GetDatabase().EncodedSourceID, doc.HLV.SourceID) - assert.Equal(t, doc.Cas, doc.HLV.Version) + assert.NotZero(t, doc.HLV.Version) + assert.NotEqual(t, doc.HLV.Version, doc.HLV.MergeVersions[rt1.GetDatabase().EncodedSourceID]) // we should have new CV generated from last conflicting version for this source } assert.Nil(t, doc.Body(ctx)[db.BodyAttachments], "_attachments property should not be in resolved doc body") @@ -4005,17 +4006,18 @@ func TestActiveReplicatorPullConflict(t *testing.T) { rt1collection, rt1ctx := rt1.GetSingleTestDatabaseCollection() doc, err := rt1collection.GetDocument(rt1ctx, docID, db.DocUnmarshalAll) require.NoError(t, err) - var expValue uint64 if sgrRunner.IsV4Protocol() { if test.localWinsHLV { - expValue = rt1CVVersion.CV.Value - } else { - expValue = doc.Cas - } - if test.newCVGenerated || test.localWinsHLV { test.expectedLocalVersion.CV = db.Version{ SourceID: rt1.GetDatabase().EncodedSourceID, - Value: expValue, + Value: rt1CVVersion.CV.Value, + } + } else if test.newCVGenerated { + require.NotZero(t, doc.HLV.Version) + require.Equal(t, rt1.GetDatabase().EncodedSourceID, doc.HLV.SourceID) + test.expectedLocalVersion.CV = db.Version{ + SourceID: rt1.GetDatabase().EncodedSourceID, + Value: doc.HLV.Version, } } else { test.expectedLocalVersion.CV = rt2Version.CV @@ -4233,16 +4235,14 @@ func TestActiveReplicatorPushAndPullConflict(t *testing.T) { require.NoError(t, err) switch test.winner { case merge: - test.expectedVersion.CV = db.Version{ - SourceID: rt1.GetDatabase().EncodedSourceID, - Value: doc.Cas, - } + require.NotZero(t, doc.HLV.Version) + require.Equal(t, rt1.GetDatabase().EncodedSourceID, doc.HLV.SourceID) + test.expectedVersion.CV = db.Version{SourceID: doc.HLV.SourceID, Value: doc.HLV.Version} case local: test.expectedVersion.CV = rt1Version.CV case remote: test.expectedVersion.CV = rt2Version.CV } - sgrRunner.WaitForVersion(docID, rt1, test.expectedVersion) requireBodyEqual(t, test.expectedBody, doc) t.Logf("Doc %s is %+v", docID, doc) t.Logf("Doc %s attachments are %+v", docID, doc.Attachments()) @@ -5774,11 +5774,8 @@ func TestActiveReplicatorPullConflictReadWriteIntlProps(t *testing.T) { rt1collection, rt1ctx := rt1.GetSingleTestDatabaseCollection() doc, err := rt1collection.GetDocument(rt1ctx, docID, db.DocUnmarshalAll) require.NoError(t, err) - test.expectedLocalVersion.CV = db.Version{ - SourceID: rt1.GetDatabase().EncodedSourceID, - Value: doc.Cas, - } - sgrRunner.WaitForVersion(docID, rt1, test.expectedLocalVersion) + assert.NotZero(t, doc.HLV.Version) + assert.Equal(t, rt1.GetDatabase().EncodedSourceID, doc.HLV.SourceID) t.Logf("doc.Body(): %v", doc.Body(ctx)) assert.Equal(t, test.expectedLocalBody, doc.Body(ctx)) t.Logf("Doc %s is %+v", docID, doc) @@ -6647,7 +6644,7 @@ func TestReplicatorConflictAttachment(t *testing.T) { require.NoError(t, err) expVersion.CV = db.Version{ SourceID: activeRT.GetDatabase().EncodedSourceID, - Value: doc.Cas, + Value: doc.HLV.Version, } } } diff --git a/topologytest/main_test.go b/topologytest/main_test.go index 1676a0f408..cf6373a258 100644 --- a/topologytest/main_test.go +++ b/topologytest/main_test.go @@ -13,6 +13,7 @@ package topologytest import ( "context" "os" + "runtime" "strconv" "testing" @@ -22,6 +23,13 @@ import ( func TestMain(m *testing.M) { ctx := context.Background() // start of test process + // TODO: CBG-5460 Temporarily disabled on Windows. Windows' low wall-clock precision causes peers' HLV versions to + // tie (Sync Gateway and Couchbase Server peers each have their own HLC), making conflict winners + // non-deterministic. + if runtime.GOOS == "windows" { + base.SkipTestMain(m, "Topology tests are temporarily disabled on Windows due to low wall-clock precision causing non-deterministic HLV version ordering") + return + } runTests, _ := strconv.ParseBool(os.Getenv(base.TbpEnvTopologyTests)) if !base.UnitTestUrlIsWalrus() && !runTests { base.SkipTestMain(m, "Tests are disabled for Couchbase Server by default, to enable set %s=true environment variable", base.TbpEnvTopologyTests)