diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index fc8ec2634..e7a2a0f03 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -134,7 +134,7 @@ func main() { maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation") replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query") - throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307") + throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307. In move-tables mode, these replicas are expected to be in the target cluster. Specified target credentials will be used for the connection.") throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight") throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response") flag.Int64Var(&migrationContext.ThrottleHTTPIntervalMillis, "throttle-http-interval-millis", 100, "Number of milliseconds to wait before triggering another HTTP throttle check") diff --git a/go/logic/applier.go b/go/logic/applier.go index c025000b5..e18950a84 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1900,8 +1900,12 @@ func (apl *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed } func (apl *Applier) ShowStatusVariable(variableName string) (result int64, err error) { + targetDB := apl.db + if apl.migrationContext.IsMoveTablesMode() { + targetDB = apl.moveTablesTargetDB + } query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName) - if err := apl.db.QueryRow(query).Scan(&variableName, &result); err != nil { + if err := targetDB.QueryRow(query).Scan(&variableName, &result); err != nil { return 0, err } return result, nil diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index e0fd034b9..cec2f3d86 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -1964,6 +1964,62 @@ func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueriesNoRows() { suite.Require().Equal(0, count) } +func (suite *ApplierTestSuite) TestShowStatusVariable() { + ctx := context.Background() + + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + suite.Require().NoError(applier.InitDBConnections()) + + // Uptime is always present in `SHOW GLOBAL STATUS` and is non-negative. + result, err := applier.ShowStatusVariable("Uptime") + suite.Require().NoError(err) + suite.Require().GreaterOrEqual(result, int64(0)) +} + +func (suite *ApplierTestSuite) TestShowStatusVariableMoveTablesMode() { + ctx := context.Background() + + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + suite.Require().NoError(applier.InitDBConnections()) + + // In move-tables mode the status variable must be read from the + // move-tables target DB connection rather than the applier DB. + suite.Require().True(migrationContext.IsMoveTablesMode()) + suite.Require().NotNil(applier.moveTablesTargetDB) + + result, err := applier.ShowStatusVariable("Uptime") + suite.Require().NoError(err) + suite.Require().GreaterOrEqual(result, int64(0)) +} + func TestApplier(t *testing.T) { if testing.Short() { t.Skip("skipping applier test suite in short mode") diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 8a53b7302..e62855324 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1636,17 +1636,13 @@ func (mgtr *Migrator) addDMLEventsListener() error { // initiateThrottler kicks in the throttling collection and the throttling checks. func (mgtr *Migrator) initiateThrottler() { - if mgtr.migrationContext.IsMoveTablesMode() { - // TODO(chriskirkland): throttle against the target cluster - mgtr.migrationContext.Log.Info("Skipping throttling in move tables mode") - return - } - mgtr.throttler = NewThrottler(mgtr.migrationContext, mgtr.applier, mgtr.inspector, mgtr.appVersion) go mgtr.throttler.initiateThrottlerCollection(mgtr.firstThrottlingCollected) mgtr.migrationContext.Log.Infof("Waiting for first throttle metrics to be collected") - <-mgtr.firstThrottlingCollected // replication lag + if !mgtr.migrationContext.IsMoveTablesMode() { + <-mgtr.firstThrottlingCollected // replication lag + } <-mgtr.firstThrottlingCollected // HTTP status <-mgtr.firstThrottlingCollected // other, general metrics mgtr.migrationContext.Log.Infof("First throttle metrics collected") diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 2535579ab..19b5b4084 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -176,6 +176,17 @@ func (thlr *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo } } +// controlReplicaConnectionConfig returns the connection config used to read +// replication lag from a control replica. In move-tables mode the lag is read +// from the target cluster's replicas, otherwise from the source (inspector) +// cluster's replicas. +func (thlr *Throttler) controlReplicaConnectionConfig(replicaKey mysql.InstanceKey) *mysql.ConnectionConfig { + if thlr.migrationContext.IsMoveTablesMode() { + return thlr.migrationContext.MoveTables.ConnectionConfig.DuplicateCredentials(replicaKey) + } + return thlr.migrationContext.InspectorConnectionConfig.DuplicateCredentials(replicaKey) +} + // collectControlReplicasLag polls all the control replicas to get maximum lag value func (thlr *Throttler) collectControlReplicasLag() { if atomic.LoadInt64(&thlr.migrationContext.HibernateUntil) > 0 { @@ -213,7 +224,8 @@ func (thlr *Throttler) collectControlReplicasLag() { } lagResults := make(chan *mysql.ReplicationLagResult, instanceKeyMap.Len()) for replicaKey := range *instanceKeyMap { - connectionConfig := thlr.migrationContext.InspectorConnectionConfig.DuplicateCredentials(replicaKey) + connectionConfig := thlr.controlReplicaConnectionConfig(replicaKey) + if err := connectionConfig.RegisterTLSConfig(); err != nil { return &mysql.ReplicationLagResult{Err: err} } @@ -443,7 +455,9 @@ func (thlr *Throttler) collectGeneralThrottleMetrics() error { // that may affect throttling. There are several components, all running independently, // that collect such metrics. func (thlr *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) { - go thlr.collectReplicationLag(firstThrottlingCollected) + if !thlr.migrationContext.IsMoveTablesMode() { + go thlr.collectReplicationLag(firstThrottlingCollected) + } go thlr.collectControlReplicasLag() go thlr.collectThrottleHTTPStatus(firstThrottlingCollected) diff --git a/go/logic/throttler_test.go b/go/logic/throttler_test.go index f2d3c193a..4a0b03826 100644 --- a/go/logic/throttler_test.go +++ b/go/logic/throttler_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/mysql" ) func newTestThrottler() *Throttler { @@ -86,6 +87,40 @@ func TestThrottleReturnsOnContextCancellation(t *testing.T) { } } +func TestControlReplicaConnectionConfig(t *testing.T) { + replicaKey := mysql.InstanceKey{Hostname: "replica-host", Port: 3307} + + t.Run("uses inspector connection config when not in move-tables mode", func(t *testing.T) { + thlr := newTestThrottler() + thlr.migrationContext.InspectorConnectionConfig.Key = mysql.InstanceKey{Hostname: "source-host", Port: 3306} + thlr.migrationContext.InspectorConnectionConfig.User = "source-user" + thlr.migrationContext.InspectorConnectionConfig.Password = "source-pass" + + connectionConfig := thlr.controlReplicaConnectionConfig(replicaKey) + + assert.False(t, thlr.migrationContext.IsMoveTablesMode()) + assert.Equal(t, replicaKey, connectionConfig.Key) + assert.Equal(t, "source-user", connectionConfig.User) + assert.Equal(t, "source-pass", connectionConfig.Password) + }) + + t.Run("uses move-tables connection config when in move-tables mode", func(t *testing.T) { + thlr := newTestThrottler() + thlr.migrationContext.MoveTables.TableNames = []string{"my_table"} + thlr.migrationContext.MoveTables.ConnectionConfig = mysql.NewConnectionConfig() + thlr.migrationContext.MoveTables.ConnectionConfig.Key = mysql.InstanceKey{Hostname: "target-host", Port: 3306} + thlr.migrationContext.MoveTables.ConnectionConfig.User = "target-user" + thlr.migrationContext.MoveTables.ConnectionConfig.Password = "target-pass" + + connectionConfig := thlr.controlReplicaConnectionConfig(replicaKey) + + assert.True(t, thlr.migrationContext.IsMoveTablesMode()) + assert.Equal(t, replicaKey, connectionConfig.Key) + assert.Equal(t, "target-user", connectionConfig.User) + assert.Equal(t, "target-pass", connectionConfig.Password) + }) +} + func TestThrottleCallsOnThrottledCallback(t *testing.T) { thlr := newTestThrottler() thlr.migrationContext.SetThrottled(true, "test", base.NoThrottleReasonHint)