Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func main() {

maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307. In move-tables mode, these replicas are expected to be in the target cluster. Specified target credentials will be used for the connection.")
Comment thread
danieljoos marked this conversation as resolved.
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response")
flag.Int64Var(&migrationContext.ThrottleHTTPIntervalMillis, "throttle-http-interval-millis", 100, "Number of milliseconds to wait before triggering another HTTP throttle check")
Expand Down
6 changes: 5 additions & 1 deletion go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1900,8 +1900,12 @@ func (apl *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
}

func (apl *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
targetDB := apl.db
if apl.migrationContext.IsMoveTablesMode() {
targetDB = apl.moveTablesTargetDB
}
Comment thread
danieljoos marked this conversation as resolved.
query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName)
if err := apl.db.QueryRow(query).Scan(&variableName, &result); err != nil {
if err := targetDB.QueryRow(query).Scan(&variableName, &result); err != nil {
return 0, err
}
return result, nil
Expand Down
56 changes: 56 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1964,6 +1964,62 @@ func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueriesNoRows() {
suite.Require().Equal(0, count)
}

func (suite *ApplierTestSuite) TestShowStatusVariable() {
ctx := context.Background()

_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName()))
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")

applier := NewApplier(migrationContext)
defer applier.Teardown()

suite.Require().NoError(applier.InitDBConnections())

// Uptime is always present in `SHOW GLOBAL STATUS` and is non-negative.
result, err := applier.ShowStatusVariable("Uptime")
suite.Require().NoError(err)
suite.Require().GreaterOrEqual(result, int64(0))
}

func (suite *ApplierTestSuite) TestShowStatusVariableMoveTablesMode() {
ctx := context.Background()

_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName()))
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.MoveTables.ConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MoveTables.TableNames = []string{testMysqlTableName}
migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther

applier := NewApplier(migrationContext)
defer applier.Teardown()

suite.Require().NoError(applier.InitDBConnections())

// In move-tables mode the status variable must be read from the
// move-tables target DB connection rather than the applier DB.
suite.Require().True(migrationContext.IsMoveTablesMode())
suite.Require().NotNil(applier.moveTablesTargetDB)

result, err := applier.ShowStatusVariable("Uptime")
suite.Require().NoError(err)
suite.Require().GreaterOrEqual(result, int64(0))
}

func TestApplier(t *testing.T) {
if testing.Short() {
t.Skip("skipping applier test suite in short mode")
Expand Down
10 changes: 3 additions & 7 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1636,17 +1636,13 @@ func (mgtr *Migrator) addDMLEventsListener() error {

// initiateThrottler kicks in the throttling collection and the throttling checks.
func (mgtr *Migrator) initiateThrottler() {
if mgtr.migrationContext.IsMoveTablesMode() {
// TODO(chriskirkland): throttle against the target cluster
mgtr.migrationContext.Log.Info("Skipping throttling in move tables mode")
return
}

mgtr.throttler = NewThrottler(mgtr.migrationContext, mgtr.applier, mgtr.inspector, mgtr.appVersion)

go mgtr.throttler.initiateThrottlerCollection(mgtr.firstThrottlingCollected)
mgtr.migrationContext.Log.Infof("Waiting for first throttle metrics to be collected")
<-mgtr.firstThrottlingCollected // replication lag
if !mgtr.migrationContext.IsMoveTablesMode() {
<-mgtr.firstThrottlingCollected // replication lag
}
<-mgtr.firstThrottlingCollected // HTTP status
<-mgtr.firstThrottlingCollected // other, general metrics
Comment on lines +1643 to 1647

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why only this first firstThrottlingCollected is gated behind move tables mode? Can't really tell just looking at this diff

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see down below we aren't emitting collectReplicationLag when in move tables. What's the reason for that?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh okay, reading into the code more is this because collectReplicationLag relies on the heartbeat table in some way? And we don't do the heartbeat in move tables

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, some of the collect...Lag goroutines in throttler.go get that firstThrottlingCollected channel handed over and write to it as soon as they're set up.
As we're skipping the collectReplicationLag goroutine for move-tables, we also must not await the channel, here. Otherwise it would block forever at the initializeThrottler func.

mgtr.migrationContext.Log.Infof("First throttle metrics collected")
Expand Down
18 changes: 16 additions & 2 deletions go/logic/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ func (thlr *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
}
}

// controlReplicaConnectionConfig returns the connection config used to read
// replication lag from a control replica. In move-tables mode the lag is read
// from the target cluster's replicas, otherwise from the source (inspector)
// cluster's replicas.
func (thlr *Throttler) controlReplicaConnectionConfig(replicaKey mysql.InstanceKey) *mysql.ConnectionConfig {
if thlr.migrationContext.IsMoveTablesMode() {
return thlr.migrationContext.MoveTables.ConnectionConfig.DuplicateCredentials(replicaKey)
}
return thlr.migrationContext.InspectorConnectionConfig.DuplicateCredentials(replicaKey)
}

// collectControlReplicasLag polls all the control replicas to get maximum lag value
func (thlr *Throttler) collectControlReplicasLag() {
if atomic.LoadInt64(&thlr.migrationContext.HibernateUntil) > 0 {
Expand Down Expand Up @@ -213,7 +224,8 @@ func (thlr *Throttler) collectControlReplicasLag() {
}
lagResults := make(chan *mysql.ReplicationLagResult, instanceKeyMap.Len())
for replicaKey := range *instanceKeyMap {
connectionConfig := thlr.migrationContext.InspectorConnectionConfig.DuplicateCredentials(replicaKey)
connectionConfig := thlr.controlReplicaConnectionConfig(replicaKey)

if err := connectionConfig.RegisterTLSConfig(); err != nil {
return &mysql.ReplicationLagResult{Err: err}
}
Comment thread
danieljoos marked this conversation as resolved.
Expand Down Expand Up @@ -443,7 +455,9 @@ func (thlr *Throttler) collectGeneralThrottleMetrics() error {
// that may affect throttling. There are several components, all running independently,
// that collect such metrics.
func (thlr *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
go thlr.collectReplicationLag(firstThrottlingCollected)
if !thlr.migrationContext.IsMoveTablesMode() {
go thlr.collectReplicationLag(firstThrottlingCollected)
}
go thlr.collectControlReplicasLag()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to gate this one behind move tables mode? Quick glance at the code of collectControlReplicasLag, it looks like it relies on the changelog table which we also don't have in move-tables mode right?

@danieljoos danieljoos Jun 12, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the collectReplicationLag is disabled for move-tables mode.

In that function, gh-ost typically collects the replication lag from the replica it is attached to (reading the binlog data).
So for migrations it then knows the "impact" of the copy-writes it did on the corresponding primary - and if these are causing too much stress on the replica it is attached to. It then begins to throttle automatically.

For move-tables we can't use that mechanism, as we're only connected to the target primary and not reading the binlog on the target side.

sorry misread your question. I'll look into that.

go thlr.collectThrottleHTTPStatus(firstThrottlingCollected)

Expand Down
35 changes: 35 additions & 0 deletions go/logic/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
)

func newTestThrottler() *Throttler {
Expand Down Expand Up @@ -86,6 +87,40 @@ func TestThrottleReturnsOnContextCancellation(t *testing.T) {
}
}

func TestControlReplicaConnectionConfig(t *testing.T) {
replicaKey := mysql.InstanceKey{Hostname: "replica-host", Port: 3307}

t.Run("uses inspector connection config when not in move-tables mode", func(t *testing.T) {
thlr := newTestThrottler()
thlr.migrationContext.InspectorConnectionConfig.Key = mysql.InstanceKey{Hostname: "source-host", Port: 3306}
thlr.migrationContext.InspectorConnectionConfig.User = "source-user"
thlr.migrationContext.InspectorConnectionConfig.Password = "source-pass"

connectionConfig := thlr.controlReplicaConnectionConfig(replicaKey)

assert.False(t, thlr.migrationContext.IsMoveTablesMode())
assert.Equal(t, replicaKey, connectionConfig.Key)
assert.Equal(t, "source-user", connectionConfig.User)
assert.Equal(t, "source-pass", connectionConfig.Password)
})

t.Run("uses move-tables connection config when in move-tables mode", func(t *testing.T) {
thlr := newTestThrottler()
thlr.migrationContext.MoveTables.TableNames = []string{"my_table"}
thlr.migrationContext.MoveTables.ConnectionConfig = mysql.NewConnectionConfig()
thlr.migrationContext.MoveTables.ConnectionConfig.Key = mysql.InstanceKey{Hostname: "target-host", Port: 3306}
thlr.migrationContext.MoveTables.ConnectionConfig.User = "target-user"
thlr.migrationContext.MoveTables.ConnectionConfig.Password = "target-pass"

connectionConfig := thlr.controlReplicaConnectionConfig(replicaKey)

assert.True(t, thlr.migrationContext.IsMoveTablesMode())
assert.Equal(t, replicaKey, connectionConfig.Key)
assert.Equal(t, "target-user", connectionConfig.User)
assert.Equal(t, "target-pass", connectionConfig.Password)
})
}

func TestThrottleCallsOnThrottledCallback(t *testing.T) {
thlr := newTestThrottler()
thlr.migrationContext.SetThrottled(true, "test", base.NoThrottleReasonHint)
Expand Down
Loading