Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions base/hlc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2026-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package base

import (
"sync"
"time"
)

// hlcLogicalBits is the number of low-order bits of a Couchbase Server CAS reserved for the logical
// (counter) component of the Hybrid Logical Clock. The remaining high-order bits hold the physical
// (wall-clock nanosecond) component. Clearing these bits when reading the wall clock keeps generated
// values in the same numeric space as a CAS and leaves room for same-instant logical increments.
const hlcLogicalBits = 16

// hlcLogicalMask masks off the logical component, isolating the physical component of a timestamp.
const hlcLogicalMask = (1 << hlcLogicalBits) - 1

// HybridLogicalClock generates monotonically increasing timestamps in the same numeric space as a
// Couchbase Server CAS (a 48-bit nanosecond physical component plus a 16-bit logical counter). Sync
// Gateway uses it to assign HLV current-version values without relying on server-side CAS macro
// expansion, so the value is known before the write and can be written as a literal into every xattr
// field that would otherwise require a per-field subdoc macro-expansion path.
//
// Generated values are CAS-comparable: under synchronised clocks a value generated just before a write
// is below the CAS the server stamps at commit. This is an invariant goxdcr enforces (cv.ver <= cas) until MB-72252.
// Monotonicity per HLV source is the caller's responsibility, supplied via the floor argument to Now.
//
// The zero value is usable and reads the system wall clock. A HybridLogicalClock contains a mutex and
// must not be copied after first use.
type HybridLogicalClock struct {
	clock       func() uint64 // wall-clock source, nanoseconds since the Unix epoch; overridable in tests; nil means the system clock
	highestTime uint64        // last value returned by Now; 0 before the first call and after SetClockForTest
	mutex       sync.Mutex    // guards clock and highestTime
}

// NewHybridLogicalClock returns a HybridLogicalClock backed by the system wall clock.
func NewHybridLogicalClock() *HybridLogicalClock {
	return &HybridLogicalClock{clock: func() uint64 { return uint64(time.Now().UnixNano()) }}
}

// SetClockForTest overrides the wall-clock source and resets the clock's high-water mark, so the next
// value returned by Now is determined solely by getTime. Test-only: used to simulate clock skew between
// Sync Gateway and the server. Passing a nil getTime restores the system wall clock.
func (c *HybridLogicalClock) SetClockForTest(getTime func() uint64) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.clock = getTime
	c.highestTime = 0
}

// hlcSuccessor returns v+1, saturating at the largest uint64 instead of wrapping around to 0.
func hlcSuccessor(v uint64) uint64 {
	if v == ^uint64(0) {
		return v
	}
	return v + 1
}

// Now returns the next timestamp, guaranteed to be strictly greater than both the previous value
// returned by this clock (monotonic across calls) and the supplied floor, while tracking the wall clock
// where possible. The result is max(physical, highestTime+1, floor+1) where physical is the wall-clock
// time with its logical bits cleared.
//
// floor is the highest existing HLV version value for the caller's source; passing 0 (a brand-new
// document, or a source not yet present in the HLV) imposes no lower bound beyond monotonicity, so Now(0)
// is an ordinary clock tick.
//
// The +1 steps saturate rather than wrap: the largest uint64 has no successor, so if floor or the
// previous value is already the largest uint64, Now returns that same value. It never wraps around
// to a small value that would fall below the floor or move the clock backwards.
//
// Now is safe for concurrent use.
func (c *HybridLogicalClock) Now(floor uint64) uint64 {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// A nil source (zero-value clock, or SetClockForTest(nil)) falls back to the system wall clock
	// rather than panicking on a nil function call.
	var wall uint64
	if c.clock != nil {
		wall = c.clock()
	} else {
		wall = uint64(time.Now().UnixNano())
	}

	next := wall &^ hlcLogicalMask // physical component, tracks the wall clock
	// ">=" is the overflow-free form of "x+1 > next"; the increment itself saturates.
	if c.highestTime >= next { // ensure strictly greater than the previous value
		next = hlcSuccessor(c.highestTime)
	}
	if floor >= next { // ensure strictly greater than the caller's floor
		next = hlcSuccessor(floor)
	}
	c.highestTime = next
	return next
}

// CASToPhysicalNanos returns the physical (wall-clock nanosecond) component of a CAS / HLC value, with the
// logical counter bits cleared. The difference between two such values approximates the elapsed wall-clock
// time between them, which is used to decide how long to wait for a lagging clock to catch up.
func CASToPhysicalNanos(cas uint64) uint64 {
	return cas &^ hlcLogicalMask
}
130 changes: 130 additions & 0 deletions base/hlc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2026-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.

package base

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// hourNanos is an offset used to place test floors a wall-clock hour either side of "now".
const hourNanos = uint64(time.Hour)

// TestHybridLogicalClockMonotonic takes an initial reading from a system-backed clock and then checks
// that each of the following 1000 readings is strictly greater than the one before it.
func TestHybridLogicalClockMonotonic(t *testing.T) {
	const iterations = 1000

	clock := NewHybridLogicalClock()
	for i, last := 0, clock.Now(0); i < iterations; i++ {
		current := clock.Now(0)
		require.Greater(t, current, last, "value at iteration %d did not strictly increase", i)
		last = current
	}
}

// TestHybridLogicalClockTracksWallClock brackets a cold-start Now(0) call between two wall-clock
// readings. The result must fall within [masked reading taken before, raw reading taken after], i.e. it
// reflects the physical component and has not drifted into logical-counter space.
func TestHybridLogicalClockTracksWallClock(t *testing.T) {
	clock := NewHybridLogicalClock()

	lowerBound := uint64(time.Now().UnixNano()) &^ hlcLogicalMask
	reading := clock.Now(0)
	upperBound := uint64(time.Now().UnixNano())

	require.GreaterOrEqual(t, reading, lowerBound)
	require.LessOrEqual(t, reading, upperBound)
}

// TestHybridLogicalClockSameInstant freezes the wall-clock source and checks that the first value is the
// masked wall-clock reading and that the next two values step up by exactly one each, i.e. the logical
// counter provides strict ordering while physical time stands still.
func TestHybridLogicalClockSameInstant(t *testing.T) {
	frozen := uint64(time.Now().UnixNano())
	clock := &HybridLogicalClock{clock: func() uint64 { return frozen }}

	initial := clock.Now(0)
	require.Equal(t, frozen&^hlcLogicalMask, initial)

	for step := uint64(1); step <= 2; step++ {
		require.Equal(t, initial+step, clock.Now(0))
	}
}

// TestHybridLogicalClockPhysicalMask feeds the clock a wall-clock reading whose logical bits are all set
// and checks that the first generated value has those bits cleared, keeping it in the CAS numeric space.
func TestHybridLogicalClockPhysicalMask(t *testing.T) {
	// Force every logical bit on so the effect of masking is visible whatever the current nanosecond is.
	wall := uint64(time.Now().UnixNano()) | hlcLogicalMask
	clock := &HybridLogicalClock{clock: func() uint64 { return wall }}

	reading := clock.Now(0)

	// The low (logical) bits of the result must all be zero.
	logicalPart := reading & hlcLogicalMask
	require.Zero(t, logicalPart, "low %d (logical) bits should be cleared", hlcLogicalBits)

	// The result must be exactly the wall-clock reading with its logical bits removed.
	physicalPart := wall &^ hlcLogicalMask
	require.Equal(t, physicalPart, reading)
}

// TestHybridLogicalClockFloor checks how the floor argument interacts with the physical time: a floor in
// the past leaves the masked wall-clock reading untouched, while a floor in the future forces floor+1.
func TestHybridLogicalClockFloor(t *testing.T) {
	// frozenClock captures the current instant and returns it alongside a clock pinned to that instant.
	frozenClock := func() (uint64, *HybridLogicalClock) {
		instant := uint64(time.Now().UnixNano())
		return instant, &HybridLogicalClock{clock: func() uint64 { return instant }}
	}

	t.Run("floor below physical is ignored", func(t *testing.T) {
		instant, clock := frozenClock()
		pastFloor := instant - hourNanos // one hour behind the wall clock
		require.Equal(t, instant&^hlcLogicalMask, clock.Now(pastFloor))
	})

	t.Run("floor above physical wins", func(t *testing.T) {
		instant, clock := frozenClock()
		futureFloor := instant + hourNanos // one hour ahead of the wall clock
		result := clock.Now(futureFloor)
		require.Equal(t, futureFloor+1, result)
		require.Greater(t, result, futureFloor)
	})
}

// TestHybridLogicalClockNoFloor exercises a floor of 0 (the brand-new-document / absent-source case) on
// a fresh clock: the result is just the masked wall-clock reading, with no special handling.
func TestHybridLogicalClockNoFloor(t *testing.T) {
	instant := uint64(time.Now().UnixNano())
	clock := &HybridLogicalClock{clock: func() uint64 { return instant }}

	want := instant &^ hlcLogicalMask
	got := clock.Now(0)
	require.Equal(t, want, got)
}

// TestHybridLogicalClockConcurrent hammers one system-backed clock from 16 goroutines, 500 readings each,
// and checks that no value was handed out twice. Uniqueness across callers is what strict monotonicity
// under the clock's lock implies.
func TestHybridLogicalClockConcurrent(t *testing.T) {
	const (
		goroutines   = 16
		perGoroutine = 500
	)

	clock := NewHybridLogicalClock()

	// Each worker writes only to its own slot, so the outer slice needs no extra synchronisation.
	collected := make([][]uint64, goroutines)
	var wg sync.WaitGroup
	wg.Add(goroutines)
	for worker := 0; worker < goroutines; worker++ {
		go func(slot int) {
			defer wg.Done()
			batch := make([]uint64, 0, perGoroutine)
			for len(batch) < perGoroutine {
				batch = append(batch, clock.Now(0))
			}
			collected[slot] = batch
		}(worker)
	}
	wg.Wait()

	unique := make(map[uint64]struct{}, goroutines*perGoroutine)
	for _, batch := range collected {
		for _, value := range batch {
			_, dup := unique[value]
			assert.False(t, dup, "duplicate value %d returned to concurrent callers", value)
			unique[value] = struct{}{}
		}
	}
	require.Len(t, unique, goroutines*perGoroutine)
}
7 changes: 7 additions & 0 deletions base/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ type DatabaseStats struct {
DocWritesXattrBytes *SgwIntStat `json:"doc_writes_xattr_bytes"`
// Highest sequence number seen on the caching DCP feed.
HighSeqFeed *SgwUint64Stat `json:"high_seq_feed"`
// The total number of document writes where the Sync Gateway-generated HLV version exceeded the document CAS and required a corrective re-stamp. A non-zero value indicates clock skew between Sync Gateway and Couchbase Server.
HLVVersionCASRetryCount *SgwIntStat `json:"hlv_version_cas_retry_count"`
// The number of attachments compacted
NumAttachmentsCompacted *SgwIntStat `json:"num_attachments_compacted"`
// The total number of documents read via Couchbase Lite 2.x replication since Sync Gateway node startup.
Expand Down Expand Up @@ -1865,6 +1867,10 @@ func (d *DbStats) initDatabaseStats() error {
if err != nil {
return err
}
resUtil.HLVVersionCASRetryCount, err = NewIntStat(SubsystemDatabaseKey, "hlv_version_cas_retry_count", StatUnitNoUnits, HLVVersionCASRetryCountDesc, StatAddedVersion4dot1dot0, StatDeprecatedVersionNotDeprecated, StatStabilityInternal, labelKeys, labelVals, prometheus.CounterValue, 0)
if err != nil {
return err
}
resUtil.PublicRestBytesWritten, err = NewIntStat(SubsystemDatabaseKey, "http_bytes_written", StatUnitBytes, PublicRestBytesWrittenDesc, StatAddedVersion3dot2dot0, StatDeprecatedVersionNotDeprecated, StatStabilityVolatile, labelKeys, labelVals, prometheus.CounterValue, 0)
if err != nil {
return err
Expand Down Expand Up @@ -2044,6 +2050,7 @@ func (d *DbStats) unregisterDatabaseStats() {
prometheus.Unregister(d.DatabaseStats.DocWritesBytes)
prometheus.Unregister(d.DatabaseStats.DocWritesXattrBytes)
prometheus.Unregister(d.DatabaseStats.HighSeqFeed)
prometheus.Unregister(d.DatabaseStats.HLVVersionCASRetryCount)
prometheus.Unregister(d.DatabaseStats.DocWritesBytesBlip)
prometheus.Unregister(d.DatabaseStats.NumAttachmentsCompacted)
prometheus.Unregister(d.DatabaseStats.NumDocReadsBlip)
Expand Down
2 changes: 2 additions & 0 deletions base/stats_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ const (

HighSeqFeedDesc = "Highest sequence number seen on the caching DCP feed."

HLVVersionCASRetryCountDesc = "The total number of document writes where the Sync Gateway-generated HLV version exceeded the document CAS and required a corrective re-stamp. A non-zero value indicates clock skew between Sync Gateway and Couchbase Server."

NumAttachmentsCompactedDesc = "The number of attachments compacted import_feed"

ImportFeedDesc = "Contains low level dcp stats: (a). dcp_backfill_expected - the expected number of sequences in backfill (b). dcp_backfill_completed - the number of backfill items processed (c). dcp_rollback_count - the number of DCP rollbacks"
Expand Down
12 changes: 6 additions & 6 deletions db/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func TestCVPopulationOnChangeEntry(t *testing.T) {
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
collectionID := collection.GetCollectionID()
bucketUUID := db.EncodedSourceID
sourceID := db.EncodedSourceID

collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

Expand All @@ -311,8 +311,8 @@ func TestCVPopulationOnChangeEntry(t *testing.T) {

docVersion := GetChangeEntryCV(t, changes[0])
assert.Equal(t, doc.ID, changes[0].ID)
assert.Equal(t, bucketUUID, docVersion.SourceID)
assert.Equal(t, doc.Cas, docVersion.Value)
assert.Equal(t, sourceID, docVersion.SourceID)
assert.Equal(t, doc.HLV.Version, docVersion.Value)
}

func TestDocDeletionFromChannelCoalesced(t *testing.T) {
Expand Down Expand Up @@ -490,7 +490,7 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
collectionID := collection.GetCollectionID()
bucketUUID := db.EncodedSourceID
sourceID := db.EncodedSourceID
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

// Make channel active
Expand All @@ -512,8 +512,8 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {

// assert that the source and version has been populated with the channel cache entry for the doc
assert.Equal(t, "doc1", entries[0].DocID)
assert.Equal(t, base.HexCasToUint64(doc.SyncData.Cas), entries[0].Version)
assert.Equal(t, bucketUUID, entries[0].SourceID)
require.NotZero(t, entries[0].Version)
assert.Equal(t, sourceID, entries[0].SourceID)
assert.Equal(t, doc.HLV.SourceID, entries[0].SourceID)
assert.Equal(t, doc.HLV.Version, entries[0].Version)
}
Loading
Loading