Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {

// sleepWhileTrue sleeps indefinitely until the given function returns 'false'
// (or fails with error)
func (mgtr *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
func (mgtr *Migrator) sleepWhileTrue(stage string, operation func() (bool, error)) error {
for {
// Check for abort before continuing
if err := mgtr.checkAbort(); err != nil {
Expand All @@ -143,6 +143,7 @@ func (mgtr *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
if !shouldSleep {
return nil
}
metrics.RecordSleep(mgtr.migrationContext.Metrics, stage, time.Second)
time.Sleep(time.Second)
}
}
Expand All @@ -166,7 +167,9 @@ func (mgtr *Migrator) retryOperation(operation func() error, notFatalHint ...boo
for i := 0; i < maxRetries; i++ {
if i != 0 {
// sleep after previous iteration
RetrySleepFn(1 * time.Second)
sleepDuration := 1 * time.Second
metrics.RecordSleep(mgtr.migrationContext.Metrics, "retry_backoff", sleepDuration)
RetrySleepFn(sleepDuration)
}
// Check for abort/context cancellation before each retry
if abortErr := mgtr.checkAbort(); abortErr != nil {
Expand Down Expand Up @@ -207,7 +210,9 @@ func (mgtr *Migrator) retryOperationWithExponentialBackoff(operation func() erro
)

if i != 0 {
RetrySleepFn(time.Duration(interval) * time.Second)
sleepDuration := time.Duration(interval) * time.Second
metrics.RecordSleep(mgtr.migrationContext.Metrics, "retry_backoff", sleepDuration)
RetrySleepFn(sleepDuration)
}
// Check for abort/context cancellation before each retry
if abortErr := mgtr.checkAbort(); abortErr != nil {
Expand Down Expand Up @@ -842,6 +847,7 @@ func (mgtr *Migrator) cutOver() (err error) {
mgtr.migrationContext.MarkPointOfInterest()
mgtr.migrationContext.Log.Debugf("checking for cut-over postpone")
if err := mgtr.sleepWhileTrue(
"cut_over_postpone",
func() (bool, error) {
heartbeatLag := mgtr.migrationContext.TimeSinceLastHeartbeatOnChangelog()
maxLagMillisecondsThrottle := time.Duration(atomic.LoadInt64(&mgtr.migrationContext.MaxLagMillisecondsThrottleThreshold)) * time.Millisecond
Expand Down Expand Up @@ -1761,7 +1767,9 @@ func (mgtr *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error) {
return chk, err
}
mgtr.applier.CurrentCoordinatesMutex.Unlock()
time.Sleep(500 * time.Millisecond)
sleepDuration := 500 * time.Millisecond
metrics.RecordSleep(mgtr.migrationContext.Metrics, "replica_wait", sleepDuration)
time.Sleep(sleepDuration)
}
}

Expand Down Expand Up @@ -1866,7 +1874,12 @@ func (mgtr *Migrator) executeWriteFuncs() error {
copyRowsDuration := time.Since(copyRowsStartTime)
sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds())
sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond
time.Sleep(sleepTime)
if sleepTime > 0 {
if sleepTime >= time.Millisecond {
metrics.RecordSleep(mgtr.migrationContext.Metrics, "chunk_throttle", sleepTime)
}
time.Sleep(sleepTime)
}
}
}
default:
Expand Down
12 changes: 12 additions & 0 deletions go/metrics/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,15 @@ func EmitThrottleInterval(emit Emitter, duration time.Duration, reason string) {
emit.Histogram("throttle.duration_seconds", duration.Seconds(), tags...)
emit.Count("throttle.events_total", 1, tags...)
}

// RecordSleep reports a single sleep/wait of length d, attributed to stage.
//
// Two metrics are emitted, each carrying one "stage:<stage>" tag (the client
// applies the namespace, giving gh_ost.sleep.duration_milliseconds and
// gh_ost.sleep.total_milliseconds):
//   - histogram sleep.duration_milliseconds: the length of this sleep
//   - count sleep.total_milliseconds: the same value, added to a running total
//
// The duration is reported in whole milliseconds as returned by
// time.Duration.Milliseconds. Nothing is emitted when emit is nil, stage is
// empty, or d is negative.
func RecordSleep(emit Emitter, stage string, d time.Duration) {
	switch {
	case emit == nil, stage == "", d < 0:
		// Nothing meaningful to report.
		return
	}

	ms := d.Milliseconds()
	// The same tag slice is handed to both emitter calls.
	stageTags := []string{"stage:" + stage}

	emit.Histogram("sleep.duration_milliseconds", float64(ms), stageTags...)
	emit.Count("sleep.total_milliseconds", ms, stageTags...)
}
68 changes: 68 additions & 0 deletions go/metrics/emit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package metrics
import (
"context"
"runtime"
"slices"
"testing"
"time"
)
Expand Down Expand Up @@ -256,3 +257,70 @@ func TestEmitThrottleIntervalNilSafe(t *testing.T) {
EmitThrottleInterval(nil, time.Second, "test")
EmitThrottleInterval(&gaugeSpy{}, time.Second, "test")
}

// sleepSpy is a test double that records every Histogram and Count call it
// receives. The slices are parallel: index i of the names, values and tags
// slices together describe the i-th call of that kind.
type sleepSpy struct {
	histogramNames  []string
	histogramValues []float64
	histogramTags   [][]string
	countNames      []string
	countValues     []int64
	countTags       [][]string
}

// Gauge is a no-op; gauge calls are not recorded.
func (s *sleepSpy) Gauge(_ string, _ float64, _ ...string) {}

// Histogram records the name, value and tags of a histogram observation.
func (s *sleepSpy) Histogram(name string, value float64, tags ...string) {
	s.histogramNames = append(s.histogramNames, name)
	s.histogramValues = append(s.histogramValues, value)
	// Store a detached copy: a caller that passes `tags...` shares its backing
	// array with us, and a later mutation would rewrite the recorded call.
	s.histogramTags = append(s.histogramTags, slices.Clone(tags))
}

// Count records the name, value and tags of a counter increment.
func (s *sleepSpy) Count(name string, value int64, tags ...string) {
	s.countNames = append(s.countNames, name)
	s.countValues = append(s.countValues, value)
	// Detached copy, for the same reason as in Histogram.
	s.countTags = append(s.countTags, slices.Clone(tags))
}

// TestRecordSleep checks that one RecordSleep call for a 2s "retry_backoff"
// sleep produces exactly one histogram observation and one counter increment,
// both valued 2000 (milliseconds) and tagged "stage:retry_backoff".
func TestRecordSleep(t *testing.T) {
	recorder := &sleepSpy{}
	RecordSleep(recorder, "retry_backoff", 2*time.Second)

	wantTags := []string{"stage:retry_backoff"}

	// Histogram side.
	if n := len(recorder.histogramNames); n != 1 {
		t.Fatalf("got %d histograms, want 1", n)
	}
	histName, histValue := recorder.histogramNames[0], recorder.histogramValues[0]
	if !(histName == "sleep.duration_milliseconds" && histValue == 2000) {
		t.Fatalf("got histogram %s=%v, want sleep.duration_milliseconds=2000", histName, histValue)
	}
	if histTags := recorder.histogramTags[0]; !slices.Equal(histTags, wantTags) {
		t.Fatalf("got histogram tags %#v", histTags)
	}

	// Counter side.
	if n := len(recorder.countNames); n != 1 {
		t.Fatalf("got %d counts, want 1", n)
	}
	countName, countValue := recorder.countNames[0], recorder.countValues[0]
	if !(countName == "sleep.total_milliseconds" && countValue == 2000) {
		t.Fatalf("got count %s=%v, want sleep.total_milliseconds=2000", countName, countValue)
	}
	if cntTags := recorder.countTags[0]; !slices.Equal(cntTags, wantTags) {
		t.Fatalf("got count tags %#v", cntTags)
	}
}

// TestRecordSleepSubSecond checks that a 500ms sleep is reported as 500 in
// both the histogram and the counter.
func TestRecordSleepSubSecond(t *testing.T) {
	spy := &sleepSpy{}

	RecordSleep(spy, "replica_wait", 500*time.Millisecond)

	// Check lengths before indexing so that a missing emission is reported as
	// a test failure instead of an index-out-of-range panic.
	if len(spy.histogramNames) != 1 {
		t.Fatalf("got %d histograms, want 1", len(spy.histogramNames))
	}
	if spy.histogramNames[0] != "sleep.duration_milliseconds" || spy.histogramValues[0] != 500 {
		t.Fatalf("got histogram %s=%v, want sleep.duration_milliseconds=500", spy.histogramNames[0], spy.histogramValues[0])
	}
	if len(spy.countNames) != 1 {
		t.Fatalf("got %d counts, want 1", len(spy.countNames))
	}
	if spy.countNames[0] != "sleep.total_milliseconds" || spy.countValues[0] != 500 {
		t.Fatalf("got count %s=%v, want sleep.total_milliseconds=500", spy.countNames[0], spy.countValues[0])
	}
}

// TestRecordSleepNilSafe checks RecordSleep's guard conditions: a nil emitter
// must not panic, and an empty stage or a negative duration must not emit
// any metric.
func TestRecordSleepNilSafe(t *testing.T) {
	// Nil emitter: the only observable requirement is that this does not panic.
	RecordSleep(nil, "retry_backoff", time.Second)

	rejected := []struct {
		name  string
		stage string
		d     time.Duration
	}{
		{name: "empty stage", stage: "", d: time.Second},
		{name: "negative duration", stage: "retry_backoff", d: -time.Second},
	}
	for _, tc := range rejected {
		spy := &sleepSpy{}
		RecordSleep(spy, tc.stage, tc.d)
		if len(spy.histogramNames) != 0 || len(spy.countNames) != 0 {
			t.Errorf("%s: got %d histograms and %d counts, want none",
				tc.name, len(spy.histogramNames), len(spy.countNames))
		}
	}
}
Loading