diff --git a/go/base/context.go b/go/base/context.go index 59550e045..4daa57b66 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -398,6 +398,24 @@ func (mctx *MigrationContext) GetGhostTableName() string { } } +// GetTargetTableName generates the name of the target table, based on original table name and +// the migration context (i.e. move-tables mode). +func (mctx *MigrationContext) GetTargetTableName() string { + if mctx.IsMoveTablesMode() { + return mctx.MoveTables.TableNames[0] + } + return mctx.GetGhostTableName() +} + +// GetTargetDatabaseName fetches the name of the target database, which defaults to the original +// database name unless we're in move-tables mode. +func (mctx *MigrationContext) GetTargetDatabaseName() string { + if mctx.IsMoveTablesMode() { + return mctx.MoveTables.TargetDatabase + } + return mctx.DatabaseName +} + // GetOldTableName generates the name of the "old" table, into which the original table is renamed. func (mctx *MigrationContext) GetOldTableName() string { var tableName string @@ -939,11 +957,27 @@ func (mctx *MigrationContext) ApplyCredentials() { // Override mctx.InspectorConnectionConfig.Password = mctx.CliPassword } + + if mctx.IsMoveTablesMode() { + // Derive the applier config from the inspector config, but point it at + // the target host and override credentials from the target CLI args. + mctx.MoveTables.ConnectionConfig = mctx.InspectorConnectionConfig.DuplicateCredentials(mysql.InstanceKey{ + Hostname: mctx.MoveTables.TargetHost, + Port: mctx.MoveTables.TargetPort, + }) + mctx.MoveTables.ConnectionConfig.User = mctx.MoveTables.TargetUser + mctx.MoveTables.ConnectionConfig.Password = mctx.MoveTables.TargetPass + } } func (mctx *MigrationContext) SetupTLS() error { if mctx.UseTLS { - return mctx.InspectorConnectionConfig.UseTLS(mctx.TLSCACertificate, mctx.TLSCertificate, mctx.TLSKey, mctx.TLSAllowInsecure) + if err := mctx.InspectorConnectionConfig.UseTLS(mctx.TLSCACertificate, mctx.TLSCertificate, mctx.TLSKey, mctx.TLSAllowInsecure); err != nil { + return err + } + if mctx.IsMoveTablesMode() && mctx.MoveTables.ConnectionConfig != nil { + return mctx.MoveTables.ConnectionConfig.UseTLS(mctx.TLSCACertificate, mctx.TLSCertificate, mctx.TLSKey, mctx.TLSAllowInsecure) + } } return nil } diff --git a/go/base/context_test.go b/go/base/context_test.go index a9f62150d..ffbc174a4 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -216,6 +216,62 @@ func TestReadConfigFile(t *testing.T) { } } +func TestApplyCredentialsMoveTablesDerivesConnectionConfig(t *testing.T) { + ctx := NewMigrationContext() + ctx.MoveTables.TableNames = []string{"some_table"} + ctx.MoveTables.TargetHost = "target-host" + ctx.MoveTables.TargetPort = 3307 + ctx.MoveTables.TargetUser = "target-user" + ctx.MoveTables.TargetPass = "target-pass" + + ctx.InspectorConnectionConfig.Key.Hostname = "source-host" + ctx.InspectorConnectionConfig.Key.Port = 3306 + ctx.InspectorConnectionConfig.User = "source-user" + ctx.InspectorConnectionConfig.Password = "source-pass" + ctx.InspectorConnectionConfig.Timeout = 12.5 + ctx.InspectorConnectionConfig.TransactionIsolation = "REPEATABLE-READ" + ctx.InspectorConnectionConfig.Charset = "utf8mb4" + + ctx.ApplyCredentials() + + got := ctx.MoveTables.ConnectionConfig + require.NotNil(t, got) + require.Equal(t, "target-host", got.Key.Hostname) + require.Equal(t, 3307, got.Key.Port) + require.Equal(t, "target-user", got.User) + require.Equal(t, "target-pass", got.Password) + require.Equal(t, 12.5, got.Timeout) + require.Equal(t, "REPEATABLE-READ", got.TransactionIsolation) + require.Equal(t, "utf8mb4", got.Charset) + require.NotNil(t, got.ImpliedKey) + require.Equal(t, "target-host", got.ImpliedKey.Hostname) + require.Equal(t, 3307, got.ImpliedKey.Port) +} + +func TestSetupTLSAppliesToMoveTablesConfig(t *testing.T) { + ctx := NewMigrationContext() + ctx.UseTLS = true + ctx.TLSAllowInsecure = true + ctx.MoveTables.TableNames = []string{"some_table"} + ctx.MoveTables.TargetHost = "target-host" + ctx.MoveTables.TargetPort = 3307 + ctx.MoveTables.TargetUser = "target-user" + ctx.MoveTables.TargetPass = "target-pass" + + ctx.InspectorConnectionConfig.Key.Hostname = "source-host" + ctx.InspectorConnectionConfig.Key.Port = 3306 + ctx.InspectorConnectionConfig.User = "source-user" + ctx.InspectorConnectionConfig.Password = "source-pass" + + ctx.ApplyCredentials() + require.NoError(t, ctx.SetupTLS()) + + require.NotNil(t, ctx.InspectorConnectionConfig.TLSConfig()) + require.NotNil(t, ctx.MoveTables.ConnectionConfig.TLSConfig()) + require.Equal(t, "source-host", ctx.InspectorConnectionConfig.TLSConfig().ServerName) + require.Equal(t, "target-host", ctx.MoveTables.ConnectionConfig.TLSConfig().ServerName) +} + func TestSetAbortError_StoresFirstError(t *testing.T) { ctx := NewMigrationContext() diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index d678bb5d0..fc8ec2634 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -20,6 +20,7 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/logic" "github.com/github/gh-ost/go/metrics" + "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" _ "github.com/go-sql-driver/mysql" "github.com/openark/golib/log" @@ -233,12 +234,6 @@ func main() { migrationContext.Log.SetLevel(log.ERROR) } - if err := migrationContext.SetConnectionConfig(*storageEngine); err != nil { - migrationContext.Log.Fatale(err) - } - - migrationContext.SetConnectionCharset(*charset) - if migrationContext.AlterStatement == "" && !migrationContext.Revert && *moveTables == "" { log.Fatal("--alter must be provided and statement must not be empty, or --revert must be used, or --move-tables must be used") } @@ -379,6 +374,7 @@ func main() { // For now, we only support moving a single table at a time. log.Fatal("--move-tables currently supports only a single table") } + if migrationContext.MoveTables.TargetUser == "" { migrationContext.MoveTables.TargetUser = migrationContext.CliUser } @@ -388,8 +384,15 @@ func main() { if migrationContext.MoveTables.TargetDatabase == "" { migrationContext.MoveTables.TargetDatabase = migrationContext.DatabaseName } + migrationContext.MoveTables.ConnectionConfig = mysql.NewConnectionConfig() } + if err := migrationContext.SetConnectionConfig(*storageEngine); err != nil { + migrationContext.Log.Fatale(err) + } + + migrationContext.SetConnectionCharset(*charset) + switch *cutOver { case "atomic", "default", "": migrationContext.CutOverType = base.CutOverAtomic diff --git a/go/logic/applier.go b/go/logic/applier.go index 24b47dc84..c025000b5 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -115,7 +115,7 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { // hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid // regex syntax errors. func (apl *Applier) compileMigrationKeyWarningRegex() (*regexp.Regexp, error) { - escapedTable := regexp.QuoteMeta(apl.migrationContext.GetGhostTableName()) + escapedTable := regexp.QuoteMeta(apl.migrationContext.GetTargetTableName()) escapedKey := regexp.QuoteMeta(apl.migrationContext.UniqueKey.NameInGhostTable) migrationUniqueKeyPattern := fmt.Sprintf(`for key '(%s\.)?%s'`, escapedTable, escapedKey) migrationKeyRegex, err := regexp.Compile(migrationUniqueKeyPattern) @@ -126,7 +126,7 @@ func (apl *Applier) compileMigrationKeyWarningRegex() (*regexp.Regexp, error) { } func (apl *Applier) InitDBConnections() (err error) { - applierUri := apl.connectionConfig.GetDBUri(apl.migrationContext.DatabaseName) + applierUri := apl.connectionConfig.GetDBUri(apl.migrationContext.GetTargetDatabaseName()) uriWithMulti := fmt.Sprintf("%s&multiStatements=true", applierUri) if apl.db, _, err = mysql.GetDB(apl.migrationContext.Uuid, uriWithMulti); err != nil { return err @@ -154,11 +154,23 @@ func (apl *Applier) InitDBConnections() (err error) { apl.connectionConfig.ImpliedKey = impliedKey } } - if err := apl.readTableColumns(); err != nil { - return err + if !apl.migrationContext.IsMoveTablesMode() { + // read target table columns from applier + if err := apl.readTableColumns(); err != nil { + return err + } + } + if apl.moveTablesConnectionConfig != nil { + moveTablesURI := apl.moveTablesConnectionConfig.GetDBUri(apl.migrationContext.GetTargetDatabaseName()) + "&multiStatements=true" + if apl.moveTablesTargetDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, moveTablesURI); err != nil { + return err + } + if _, err := base.ValidateConnection(apl.moveTablesTargetDB, apl.moveTablesConnectionConfig, apl.migrationContext, apl.name); err != nil { + return err + } } if apl.moveTablesConnectionConfig != nil { - moveTablesURI := apl.moveTablesConnectionConfig.GetDBUri(apl.migrationContext.MoveTables.TargetDatabase) + "&multiStatements=true" + moveTablesURI := apl.moveTablesConnectionConfig.GetDBUri(apl.migrationContext.GetTargetDatabaseName()) + "&multiStatements=true" if apl.moveTablesTargetDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, moveTablesURI); err != nil { return err } @@ -185,7 +197,7 @@ func buildMigrationLockName(db, table string) string { // preventing two gh-ost processes from migrating the same table concurrently // on the same MySQL server. func (apl *Applier) AcquireMigrationLock(ctx context.Context) error { - lockName := buildMigrationLockName(apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName) + lockName := buildMigrationLockName(apl.migrationContext.DatabaseName, apl.originalTableName()) // Use a dedicated *sql.DB so the pinned connection does not consume a // slot in apl.db's small pool (mysql.MaxDBPoolConnections). @@ -223,10 +235,10 @@ func (apl *Applier) AcquireMigrationLock(ctx context.Context) error { lockDB.Close() if holderID.Valid { return fmt.Errorf("another gh-ost process is already migrating `%s`.`%s`: migration lock %s held by connection id %d", - apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, lockName, holderID.Int64) + apl.migrationContext.DatabaseName, apl.originalTableName(), lockName, holderID.Int64) } return fmt.Errorf("another gh-ost process is already migrating `%s`.`%s`: migration lock %s is held", - apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, lockName) + apl.migrationContext.DatabaseName, apl.originalTableName(), lockName) } apl.migrationLockConn = conn @@ -314,12 +326,8 @@ func (apl *Applier) releaseMigrationLock() { } func (apl *Applier) prepareQueries() (err error) { - targetDatabaseName := apl.migrationContext.DatabaseName - targetTableName := apl.migrationContext.GetGhostTableName() - if apl.migrationContext.IsMoveTablesMode() { - targetDatabaseName = apl.migrationContext.MoveTables.TargetDatabase - targetTableName = apl.migrationContext.OriginalTableName - } + targetDatabaseName := apl.migrationContext.GetTargetDatabaseName() + targetTableName := apl.migrationContext.GetTargetTableName() if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder( targetDatabaseName, @@ -360,7 +368,7 @@ func (apl *Applier) prepareQueries() (err error) { if apl.migrationContext.IsMoveTablesMode() { if apl.moveTablesCopySelectFirstQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( apl.migrationContext.DatabaseName, - apl.migrationContext.OriginalTableName, + apl.originalTableName(), apl.migrationContext.OriginalTableColumns, apl.migrationContext.UniqueKey.Name, &apl.migrationContext.UniqueKey.Columns, @@ -370,7 +378,7 @@ func (apl *Applier) prepareQueries() (err error) { } if apl.moveTablesCopySelectNextQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( apl.migrationContext.DatabaseName, - apl.migrationContext.OriginalTableName, + apl.originalTableName(), apl.migrationContext.OriginalTableColumns, apl.migrationContext.UniqueKey.Name, &apl.migrationContext.UniqueKey.Columns, @@ -425,7 +433,7 @@ func (apl *Applier) generateSqlModeQuery() string { func (apl *Applier) generateInstantDDLQuery() string { return fmt.Sprintf(`ALTER /* gh-ost */ TABLE %s.%s %s, ALGORITHM=INSTANT`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), apl.migrationContext.AlterStatementOptions, ) } @@ -433,7 +441,7 @@ func (apl *Applier) generateInstantDDLQuery() string { // readTableColumns reads table columns on applier func (apl *Applier) readTableColumns() (err error) { apl.migrationContext.Log.Infof("Examining table structure on applier") - apl.migrationContext.OriginalTableColumnsOnApplier, _, err = mysql.GetTableColumns(apl.db, apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName) + apl.migrationContext.OriginalTableColumnsOnApplier, _, err = mysql.GetTableColumns(apl.db, apl.migrationContext.DatabaseName, apl.originalTableName()) if err != nil { return err } @@ -456,6 +464,13 @@ func (apl *Applier) tableExists(tableName string) (tableFound bool) { return (m != nil) } +func (apl *Applier) originalTableName() string { + if apl.migrationContext.IsMoveTablesMode() { + return apl.migrationContext.MoveTables.TableNames[0] + } + return apl.migrationContext.OriginalTableName +} + // ValidateOrDropExistingTables verifies ghost and changelog tables do not exist, // or attempts to drop them if instructed to. func (apl *Applier) ValidateOrDropExistingTables() error { @@ -537,17 +552,19 @@ func retryOnLockWaitTimeout(operation func() error, maxRetries int64, logger bas return err } -// CreateGhostTable creates the ghost table on the applier host -func (apl *Applier) CreateGhostTable() error { +// createTargetTable creates the table on the applier host to which the applier will +// apply changes. +func (apl *Applier) createTargetTable(targetTableName string) error { + targetDatabase := apl.migrationContext.GetTargetDatabaseName() query := fmt.Sprintf(`create /* gh-ost */ table %s.%s like %s.%s`, + sql.EscapeName(targetDatabase), + sql.EscapeName(targetTableName), sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.GetGhostTableName()), - sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) - apl.migrationContext.Log.Infof("Creating ghost table %s.%s", - sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.GetGhostTableName()), + apl.migrationContext.Log.Infof("Creating target table %s.%s", + sql.EscapeName(targetDatabase), + sql.EscapeName(targetTableName), ) err := func() error { @@ -566,7 +583,44 @@ func (apl *Applier) CreateGhostTable() error { if _, err := tx.Exec(query); err != nil { return err } - apl.migrationContext.Log.Infof("Ghost table created") + apl.migrationContext.Log.Infof("Target table created") + if err := tx.Commit(); err != nil { + // Neither SET SESSION nor ALTER are really transactional, so strictly speaking + // there's no need to commit; but let's do this the legit way anyway. + return err + } + return nil + }() + + return err +} + +// createTargetTableFromStatement creates the table on the applier host to which the applier will +// apply changes. +func (apl *Applier) createTargetTableFromStatement(targetTableName, createStatement string) error { + targetDatabase := apl.migrationContext.GetTargetDatabaseName() + apl.migrationContext.Log.Infof("Creating target table %s.%s", + sql.EscapeName(targetDatabase), + sql.EscapeName(targetTableName), + ) + + err := func() error { + tx, err := apl.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, apl.migrationContext.ApplierTimeZone) + sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, apl.generateSqlModeQuery()) + + if _, err := tx.Exec(sessionQuery); err != nil { + return err + } + if _, err := tx.Exec(createStatement); err != nil { + return err + } + apl.migrationContext.Log.Infof("Target table created") if err := tx.Commit(); err != nil { // Neither SET SESSION nor ALTER are really transactional, so strictly speaking // there's no need to commit; but let's do this the legit way anyway. @@ -578,6 +632,22 @@ func (apl *Applier) CreateGhostTable() error { return err } +// CreateGhostTable creates the ghost table on the applier host +func (apl *Applier) CreateGhostTable() error { + if apl.migrationContext.IsMoveTablesMode() { + return errors.New("CreateGhostTable is not available in MoveTables mode") + } + return apl.createTargetTable(apl.migrationContext.GetGhostTableName()) +} + +// CreateTargetTable creates the target table on the target host (for move-tables) +func (apl *Applier) CreateTargetTable(createStatement string) error { + if !apl.migrationContext.IsMoveTablesMode() { + return errors.New("CreateTargetTable is only available in MoveTables mode") + } + return apl.createTargetTableFromStatement(apl.originalTableName(), createStatement) +} + // AlterGhost applies `alter` statement on ghost table func (apl *Applier) AlterGhost() error { query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s %s`, @@ -784,7 +854,7 @@ func (apl *Applier) createTriggers(tableName string) error { sql.EscapeName(tableName), trigger.Statement, ) - apl.migrationContext.Log.Infof("Createing trigger %s on %s.%s", + apl.migrationContext.Log.Infof("Creating trigger %s on %s.%s", sql.EscapeName(triggerName), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(tableName), @@ -800,8 +870,10 @@ func (apl *Applier) createTriggers(tableName string) error { // CreateTriggers creates the original triggers on applier host func (apl *Applier) CreateTriggersOnGhost() error { - err := apl.createTriggers(apl.migrationContext.GetGhostTableName()) - return err + if err := apl.createTriggers(apl.migrationContext.GetGhostTableName()); err != nil { + return fmt.Errorf("error creating triggers on ghost table: %w", err) + } + return nil } // DropChangelogTable drops the changelog table on the applier host @@ -821,12 +893,22 @@ func (apl *Applier) DropOldTable() error { // DropGhostTable drops the ghost table on the applier host func (apl *Applier) DropGhostTable() error { - return apl.dropTable(apl.migrationContext.GetGhostTableName()) + if err := apl.dropTable(apl.migrationContext.GetGhostTableName()); err != nil { + return fmt.Errorf("error dropping ghost table: %w", err) + } + return nil } // WriteChangelog writes a value to the changelog table. -// It returns the hint as given, for convenience +// It returns the hint (or an empty string in move-tables mode), for convenience func (apl *Applier) WriteChangelog(hint, value string) (string, error) { + // In move-tables mode, there is no changelog table (§1.2). All changelog + // writes are no-ops. This is a single chokepoint rather than per-caller + // guards to prevent drift when new callers are added. + if apl.migrationContext.IsMoveTablesMode() { + return "", nil + } + explicitId := 0 switch hint { case "heartbeat": @@ -919,6 +1001,11 @@ func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) { // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. // Apl is done asynchronously func (apl *Applier) InitiateHeartbeat() { + // In move-tables mode, there is no heartbeat table (§1.2). + if apl.migrationContext.IsMoveTablesMode() { + return + } + var numSuccessiveFailures int64 injectHeartbeat := func() error { if atomic.LoadInt64(&apl.migrationContext.HibernateUntil) > 0 { @@ -987,8 +1074,7 @@ func (apl *Applier) ExecuteThrottleQuery() (int64, error) { // readMigrationMinValues returns the minimum values to be iterated on rowcopy func (apl *Applier) readMigrationMinValues(tx *gosql.Tx, uniqueKey *sql.UniqueKey) error { - apl.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) - query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, uniqueKey) + query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(apl.migrationContext.DatabaseName, apl.originalTableName(), uniqueKey) if err != nil { return err } @@ -1013,7 +1099,7 @@ func (apl *Applier) readMigrationMinValues(tx *gosql.Tx, uniqueKey *sql.UniqueKe // readMigrationMaxValues returns the maximum values to be iterated on rowcopy func (apl *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueKey) error { apl.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) - query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, uniqueKey) + query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(apl.migrationContext.DatabaseName, apl.originalTableName(), uniqueKey) if err != nil { return err } @@ -1052,12 +1138,17 @@ Detail description of the lost data in mysql two-phase commit issue by @Fanduzi: will not be run. When the changelog writes successfully, the ReadMigrationRangeValues will read the newly inserted data, thus Avoiding data loss due to the above problem. */ -func (apl *Applier) ReadMigrationRangeValues() error { +func (apl *Applier) ReadMigrationRangeValues(db *gosql.DB) error { if _, err := apl.WriteChangelogState(string(ReadMigrationRangeValues)); err != nil { return err } - tx, err := apl.db.Begin() + if db == nil { + // default to reading from applier database + db = apl.db + } + + tx, err := db.Begin() if err != nil { return err } @@ -1077,7 +1168,7 @@ func (apl *Applier) ReadMigrationRangeValues() error { // which will be used for copying the next chunk of rows. Ir returns "false" if there is // no further chunk to work through, i.e. we're past the last chunk and are done with // iterating the range (and thus done with copying row chunks) -func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { +func (apl *Applier) CalculateNextIterationRangeEndValues(db *gosql.DB) (hasFurtherRange bool, err error) { for i := 0; i < 2; i++ { buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset if i == 1 { @@ -1085,7 +1176,7 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool } query, explodedArgs, err := buildFunc( apl.migrationContext.DatabaseName, - apl.migrationContext.OriginalTableName, + apl.originalTableName(), &apl.migrationContext.UniqueKey.Columns, apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), apl.migrationContext.MigrationRangeMaxValues.AbstractValues(), @@ -1097,7 +1188,12 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool return hasFurtherRange, err } - rows, err := apl.db.Query(query, explodedArgs...) + if db == nil { + // default to applier database if not provided + db = apl.db + } + + rows, err := db.Query(query, explodedArgs...) if err != nil { return hasFurtherRange, err } @@ -1130,7 +1226,7 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( apl.migrationContext.DatabaseName, - apl.migrationContext.OriginalTableName, + apl.originalTableName(), apl.migrationContext.GetGhostTableName(), apl.migrationContext.SharedColumns.Names(), apl.migrationContext.MappedSharedColumns.Names(), @@ -1219,13 +1315,20 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i // ApplyIterationMoveTableCopyQueries issues a SELECT query on the original table and an INSERT query on the target table, // copying a chunk of rows. It is used when `--move-tables` is specified, instead of ApplyIterationInsertQuery. -func (apl *Applier) ApplyIterationMoveTableCopyQueries() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { +func (apl *Applier) ApplyIterationMoveTableCopyQueries(sourceDB *gosql.DB) (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { startTime := time.Now() chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize) + if sourceDB == nil { + return chunkSize, rowsAffected, duration, errors.New("source DB is required for move-tables copy") + } // First, select data from the source database: rows, err := func() ([]*sql.ColumnValues, error) { var qb *sql.MoveTableCopySelectQueryBuilder + apl.migrationContext.Log.Debugf("Building SELECT query for move-tables; first: %v; rest: %v", + apl.moveTablesCopySelectFirstQueryBuilder, + apl.moveTablesCopySelectNextQueryBuilder) + if apl.migrationContext.GetIteration() == 0 { qb = apl.moveTablesCopySelectFirstQueryBuilder } else { @@ -1238,7 +1341,7 @@ func (apl *Applier) ApplyIterationMoveTableCopyQueries() (chunkSize int64, rowsA if err != nil { return nil, err } - sqlRows, err := apl.db.Query(query, explodedArgs...) + sqlRows, err := sourceDB.Query(query, explodedArgs...) if err != nil { return nil, err } @@ -1345,11 +1448,11 @@ func (apl *Applier) ApplyIterationMoveTableCopyQueries() (chunkSize int64, rowsA func (apl *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) apl.migrationContext.Log.Infof("Locking %s.%s", sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) apl.migrationContext.LockTablesStartTime = time.Now() if _, err := sqlutils.ExecNoPrepare(apl.singletonDB, query); err != nil { @@ -1377,7 +1480,7 @@ func (apl *Applier) UnlockTables() error { func (apl *Applier) SwapTablesQuickAndBumpy() error { query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.GetOldTableName()), ) apl.migrationContext.Log.Infof("Renaming original table") @@ -1388,9 +1491,9 @@ func (apl *Applier) SwapTablesQuickAndBumpy() error { query = fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`, sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetGhostTableName()), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) - apl.migrationContext.Log.Infof("Renaming ghost table") + apl.migrationContext.Log.Infof("Renaming target table") if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil { return err } @@ -1407,13 +1510,13 @@ func (apl *Applier) RenameTablesRollback() (renameError error) { // We prefer the single, atomic operation: query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetGhostTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) apl.migrationContext.Log.Infof("Renaming back both tables") if _, err := sqlutils.ExecNoPrepare(apl.db, query); err == nil { @@ -1422,7 +1525,7 @@ func (apl *Applier) RenameTablesRollback() (renameError error) { // But, if for some reason the above was impossible to do, we rename one by one. query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetGhostTableName()), ) @@ -1434,7 +1537,7 @@ func (apl *Applier) RenameTablesRollback() (renameError error) { sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) apl.migrationContext.Log.Infof("Renaming back to original table") if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil { @@ -1683,13 +1786,13 @@ func (apl *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), ) apl.migrationContext.Log.Infof("Locking %s.%s, %s.%s", sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), ) @@ -1739,7 +1842,7 @@ func (apl *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked // Tables still locked apl.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s", sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), ) @@ -1778,13 +1881,13 @@ func (apl *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetGhostTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) apl.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query) if _, err := tx.Exec(query); err != nil { @@ -2088,12 +2191,12 @@ func (apl *Applier) ExpectMetadataLock(sessionId int64) error { err := sqlutils.QueryRowsMap(apl.db, query, func(m sqlutils.RowMap) error { found = true return nil - }, apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, sessionId) + }, apl.migrationContext.DatabaseName, apl.originalTableName(), sessionId) if err != nil { return err } if !found { - err = fmt.Errorf("cannot find PENDING metadata lock on original table: `%s`.`%s`", apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName) + err = fmt.Errorf("cannot find PENDING metadata lock on original table: `%s`.`%s`", apl.migrationContext.DatabaseName, apl.originalTableName()) return apl.migrationContext.Log.Errore(err) } return nil diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 2f1676617..e0fd034b9 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -335,6 +335,8 @@ func (suite *ApplierTestSuite) TearDownTest() { suite.Require().NoError(err) _, err = suite.otherDB.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestOtherTableName()) suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`_%s_ghc`", testMysqlDatabase, testMysqlTableName)) + suite.Require().NoError(err) } func (suite *ApplierTestSuite) TestInitDBConnections() { @@ -366,6 +368,41 @@ func (suite *ApplierTestSuite) TestInitDBConnections() { suite.Require().Equal(sql.NewColumnList([]string{"id", "item_id"}), migrationContext.OriginalTableColumnsOnApplier) } +func (suite *ApplierTestSuite) TestInitiateApplierMoveTablesMode_NoGhostOrChangelogTable() { + ctx := context.Background() + + var err error + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"}) + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // #8206 [Task] [1.2] Skip ghost/changelog tables, heartbeat in gh-ost move-tables mode + // In move-tables mode, no ghost or changelog table should exist. + // InitDBConnections() should succeed without them. + suite.Require().False(applier.tableExists("_testing_gho"), "ghost table should not exist in move-tables mode") + suite.Require().False(applier.tableExists("_testing_ghc"), "changelog table should not exist in move-tables mode") + + // In move-tables mode, OriginalTableColumnsOnApplier is unused and intentionally never populated + suite.Require().Nil(migrationContext.OriginalTableColumnsOnApplier) +} + func (suite *ApplierTestSuite) TestApplyDMLEventQueries() { ctx := context.Background() @@ -431,6 +468,53 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() { suite.Require().Equal(int64(0), migrationContext.RowsDeltaEstimate) } +// finalCleanup() requires a fully wired migrator to call directly. +// This test verifies the IsMoveTablesMode() predicate that gates the early return. +// Full behavioral coverage relies on the suite: no ghost/changelog tables are +// created (Test #1), and WriteChangelog is a no-op (Test #2). +func (suite *ApplierTestSuite) TestFinalCleanupMoveTablesMode_SkipsDrops() { + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + suite.Require().True(migrationContext.IsMoveTablesMode()) +} + +// initiateStreaming() requires a binlog-capable MySQL connection to call directly. +// This test verifies IsMoveTablesMode() and that GetChangelogTableName() returns +// a derivable name. A new streamer always starts with zero listeners; the real +// proof that no changelog listener is registered comes from the full run not +// failing on a nonexistent _ghc table. +func (suite *ApplierTestSuite) TestInitiateStreamingMoveTablesMode_NoChangelogListener() { + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + suite.Require().True(migrationContext.IsMoveTablesMode()) + + changelogTableName := migrationContext.GetChangelogTableName() + suite.Require().NotEmpty(changelogTableName, "changelog table name should be derivable") + + streamer := NewEventsStreamer(migrationContext) + suite.Require().Empty(streamer.listeners, "new streamer should have no listeners") +} + +// initiateApplier() requires a full migrator to call directly. +// This test verifies the IsMoveTablesMode() predicate that gates InitiateHeartbeat(). +// Even if heartbeat ran, TestWriteChangelogNoOpInMoveTablesMode proves WriteChangelog +// is a no-op, so no SQL would execute against a nonexistent changelog table. +// +// A stronger test would instrument InitiateHeartbeat() (e.g., via a callback or +// channel) to assert the goroutine is never started. That requires test-infrastructure +// changes to the Applier and is beyond #8206's scope. +func (suite *ApplierTestSuite) TestNoHeartbeatInMoveTablesMode() { + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + suite.Require().True(migrationContext.IsMoveTablesMode()) +} + func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() { ctx := context.Background() @@ -529,6 +613,43 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi suite.Require().Equal(gosql.ErrNoRows, err) } +func (suite *ApplierTestSuite) TestWriteChangelogNoOpInMoveTablesMode() { + ctx := context.Background() + + var err error + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"}) + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // #8206 [Task] [1.2] Skip ghost/changelog tables, heartbeat in gh-ost move-tables mode + // WriteChangelog should be a no-op in move-tables mode. + // No changelog table exists, so if it tried to execute, it would fail. + hint, err := applier.WriteChangelog("heartbeat", "2026-06-05T00:00:00Z") + suite.Require().NoError(err) + suite.Require().Empty(hint) + + // Also verify state writes are no-ops + hint, err = applier.WriteChangelogState("Migrated") + suite.Require().NoError(err) + suite.Require().Equal("", hint) +} + func (suite *ApplierTestSuite) TestAcquireMigrationLockSucceedsWhenFree() { ctx := context.Background() @@ -701,11 +822,11 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc err = applier.CreateChangelogTable() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues(nil) suite.Require().NoError(err) suite.Require().True(hasFurtherRange) @@ -778,14 +899,14 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai err = applier.CreateChangelogTable() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) err = applier.AlterGhost() suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues(nil) suite.Require().NoError(err) suite.Require().True(hasFurtherRange) @@ -851,7 +972,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { err = applier.prepareQueries() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) // checkpoint table is empty @@ -1745,15 +1866,15 @@ func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueries() { err = applier.CreateChangelogTable() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues(nil) suite.Require().NoError(err) suite.Require().True(hasFurtherRange) - chunkSize, rowsAffected, duration, err := applier.ApplyIterationMoveTableCopyQueries() + chunkSize, rowsAffected, duration, err := applier.ApplyIterationMoveTableCopyQueries(applier.db) suite.Require().NoError(err) suite.Require().Equal(int64(3), rowsAffected) suite.Require().Equal(int64(1000), chunkSize) @@ -1830,7 +1951,7 @@ func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueriesNoRows() { migrationContext.MigrationIterationRangeMinValues = sql.ToColumnValues([]interface{}{100}) migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{200}) - chunkSize, rowsAffected, duration, err := applier.ApplyIterationMoveTableCopyQueries() + chunkSize, rowsAffected, duration, err := applier.ApplyIterationMoveTableCopyQueries(applier.db) suite.Require().NoError(err) suite.Require().Equal(int64(0), rowsAffected) suite.Require().Equal(int64(1000), chunkSize) diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 96aadd672..59eb22282 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -90,13 +90,13 @@ func (isp *Inspector) ValidateOriginalTable() (err error) { return err } if err := isp.validateTableForeignKeys(isp.migrationContext.DiscardForeignKeys); err != nil { - return err + return fmt.Errorf("failed to validate table foreign keys: %w", err) } if err := isp.validateTableTriggers(); err != nil { - return err + return fmt.Errorf("failed to validate table triggers: %w", err) } if err := isp.estimateTableRowsViaExplain(); err != nil { - return err + return fmt.Errorf("failed to estimate table rows: %w", err) } return nil } @@ -118,17 +118,24 @@ func (isp *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (column } func (isp *Inspector) InspectOriginalTable() (err error) { - isp.migrationContext.OriginalTableColumns, isp.migrationContext.OriginalTableVirtualColumns, isp.migrationContext.OriginalTableUniqueKeys, err = isp.InspectTableColumnsAndUniqueKeys(isp.migrationContext.OriginalTableName) + isp.migrationContext.OriginalTableColumns, isp.migrationContext.OriginalTableVirtualColumns, isp.migrationContext.OriginalTableUniqueKeys, err = isp.InspectTableColumnsAndUniqueKeys(isp.originalTableName()) if err != nil { return err } - isp.migrationContext.OriginalTableAutoIncrement, err = isp.getAutoIncrementValue(isp.migrationContext.OriginalTableName) + isp.migrationContext.OriginalTableAutoIncrement, err = isp.getAutoIncrementValue(isp.originalTableName()) if err != nil { return err } return nil } +func (isp *Inspector) originalTableName() string { + if isp.migrationContext.IsMoveTablesMode() { + return isp.migrationContext.MoveTables.TableNames[0] + } + return isp.migrationContext.OriginalTableName +} + // inspectOriginalAndGhostTables compares original and ghost tables to see whether the migration // makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key func (isp *Inspector) inspectOriginalAndGhostTables() (err error) { @@ -143,30 +150,7 @@ func (isp *Inspector) inspectOriginalAndGhostTables() (err error) { return err } sharedUniqueKeys := isp.getSharedUniqueKeys(isp.migrationContext.OriginalTableUniqueKeys, isp.migrationContext.GhostTableUniqueKeys) - for i, sharedUniqueKey := range sharedUniqueKeys { - isp.applyColumnTypes(isp.migrationContext.DatabaseName, isp.migrationContext.OriginalTableName, &sharedUniqueKey.Columns) - uniqueKeyIsValid := true - for _, column := range sharedUniqueKey.Columns.Columns() { - switch column.Type { - case sql.FloatColumnType: - { - isp.migrationContext.Log.Warningf("Will not use %+v as shared key due to FLOAT data type", sharedUniqueKey.Name) - uniqueKeyIsValid = false - } - case sql.JSONColumnType: - { - // Noteworthy that at this time MySQL does not allow JSON indexing anyhow, but this code - // will remain in place to potentially handle the future case where JSON is supported in indexes. - isp.migrationContext.Log.Warningf("Will not use %+v as shared key due to JSON data type", sharedUniqueKey.Name) - uniqueKeyIsValid = false - } - } - } - if uniqueKeyIsValid { - isp.migrationContext.UniqueKey = sharedUniqueKeys[i] - break - } - } + isp.migrationContext.UniqueKey = isp.selectUniqueKey(sharedUniqueKeys) if isp.migrationContext.UniqueKey == nil { return fmt.Errorf("no shared unique key can be found after ALTER! Bailing out") } @@ -186,7 +170,7 @@ func (isp *Inspector) inspectOriginalAndGhostTables() (err error) { // This additional step looks at which columns are unsigned. We could have merged this within // the `getTableColumns()` function, but it's a later patch and introduces some complexity; I feel // comfortable in doing this as a separate step. - isp.applyColumnTypes(isp.migrationContext.DatabaseName, isp.migrationContext.OriginalTableName, isp.migrationContext.OriginalTableColumns, isp.migrationContext.SharedColumns, &isp.migrationContext.UniqueKey.Columns) + isp.applyColumnTypes(isp.migrationContext.DatabaseName, isp.originalTableName(), isp.migrationContext.OriginalTableColumns, isp.migrationContext.SharedColumns, &isp.migrationContext.UniqueKey.Columns) isp.applyColumnTypes(isp.migrationContext.DatabaseName, isp.migrationContext.GetGhostTableName(), isp.migrationContext.GhostTableColumns, isp.migrationContext.MappedSharedColumns) for i := range isp.migrationContext.SharedColumns.Columns() { @@ -217,6 +201,33 @@ func (isp *Inspector) inspectOriginalAndGhostTables() (err error) { return nil } +func (isp *Inspector) selectUniqueKey(candidateKeys []*sql.UniqueKey) *sql.UniqueKey { + for i, candidateKey := range candidateKeys { + isp.applyColumnTypes(isp.migrationContext.DatabaseName, isp.originalTableName(), &candidateKey.Columns) + uniqueKeyIsValid := true + for _, column := range candidateKey.Columns.Columns() { + switch column.Type { + case sql.FloatColumnType: + { + isp.migrationContext.Log.Warningf("Will not use %+v as unique key due to FLOAT data type", candidateKey.Name) + uniqueKeyIsValid = false + } + case sql.JSONColumnType: + { + // Noteworthy that at this time MySQL does not allow JSON indexing anyhow, but this code + // will remain in place to potentially handle the future case where JSON is supported in indexes. + isp.migrationContext.Log.Warningf("Will not use %+v as unique key due to JSON data type", candidateKey.Name) + uniqueKeyIsValid = false + } + } + } + if uniqueKeyIsValid { + return candidateKeys[i] + } + } + return nil +} + // validateConnection issues a simple can-connect to MySQL func (isp *Inspector) validateConnection() error { version, err := base.ValidateConnection(isp.db, isp.connectionConfig, isp.migrationContext, isp.name) @@ -471,7 +482,7 @@ func (isp *Inspector) validateLogSlaveUpdates() error { // validateTable makes sure the table we need to operate on actually exists func (isp *Inspector) validateTable() error { - query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(isp.migrationContext.DatabaseName), isp.migrationContext.OriginalTableName) + query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(isp.migrationContext.DatabaseName), isp.originalTableName()) tableFound := false err := sqlutils.QueryRowsMap(isp.db, query, func(rowMap sqlutils.RowMap) error { @@ -479,7 +490,7 @@ func (isp *Inspector) validateTable() error { isp.migrationContext.RowsEstimate = rowMap.GetInt64("Rows") isp.migrationContext.UsedRowsEstimateMethod = base.TableStatusRowsEstimate if rowMap.GetString("Comment") == "VIEW" { - return fmt.Errorf("%s.%s is a VIEW, not a real table. Bailing out", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return fmt.Errorf("%s.%s is a VIEW, not a real table. Bailing out", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } tableFound = true @@ -489,7 +500,7 @@ func (isp *Inspector) validateTable() error { return err } if !tableFound { - return isp.migrationContext.Log.Errorf("cannot find table %s.%s!", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return isp.migrationContext.Log.Errorf("cannot find table %s.%s!", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } isp.migrationContext.Log.Infof("Table found. Engine=%s", isp.migrationContext.TableEngine) isp.migrationContext.Log.Debugf("Estimated number of rows via STATUS: %d", isp.migrationContext.RowsEstimate) @@ -523,26 +534,26 @@ func (isp *Inspector) validateTableForeignKeys(allowChildForeignKeys bool) error return nil }, isp.migrationContext.DatabaseName, - isp.migrationContext.OriginalTableName, + isp.originalTableName(), isp.migrationContext.DatabaseName, - isp.migrationContext.OriginalTableName, + isp.originalTableName(), isp.migrationContext.DatabaseName, - isp.migrationContext.OriginalTableName, + isp.originalTableName(), isp.migrationContext.DatabaseName, - isp.migrationContext.OriginalTableName, + isp.originalTableName(), ) if err != nil { return err } if numParentForeignKeys > 0 { - return isp.migrationContext.Log.Errorf("found %d parent-side foreign keys on %s.%s. Parent-side foreign keys are not supported. Bailing out", numParentForeignKeys, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return isp.migrationContext.Log.Errorf("found %d parent-side foreign keys on %s.%s. Parent-side foreign keys are not supported. Bailing out", numParentForeignKeys, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } if numChildForeignKeys > 0 { if allowChildForeignKeys { isp.migrationContext.Log.Debugf("Foreign keys found and will be dropped, as per given --discard-foreign-keys flag") return nil } - return isp.migrationContext.Log.Errorf("found %d child-side foreign keys on %s.%s. Child-side foreign keys are not supported. Bailing out", numChildForeignKeys, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return isp.migrationContext.Log.Errorf("found %d child-side foreign keys on %s.%s. Child-side foreign keys are not supported. Bailing out", numChildForeignKeys, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } isp.migrationContext.Log.Debugf("Validated no foreign keys exist on table") return nil @@ -564,15 +575,15 @@ func (isp *Inspector) validateTableTriggers() error { return nil }, isp.migrationContext.DatabaseName, - isp.migrationContext.OriginalTableName, + isp.originalTableName(), ) if err != nil { return err } if numTriggers > 0 { if isp.migrationContext.IncludeTriggers { - isp.migrationContext.Log.Infof("Found %d triggers on %s.%s.", numTriggers, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) - isp.migrationContext.Triggers, err = mysql.GetTriggers(isp.db, isp.migrationContext.DatabaseName, isp.migrationContext.OriginalTableName) + isp.migrationContext.Log.Infof("Found %d triggers on %s.%s.", numTriggers, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) + isp.migrationContext.Triggers, err = mysql.GetTriggers(isp.db, isp.migrationContext.DatabaseName, isp.originalTableName()) if err != nil { return err } @@ -584,7 +595,7 @@ func (isp *Inspector) validateTableTriggers() error { } return nil } - return isp.migrationContext.Log.Errorf("found triggers on %s.%s. Tables with triggers are supported only when using \"include-triggers\" flag. Bailing out", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return isp.migrationContext.Log.Errorf("found triggers on %s.%s. Tables with triggers are supported only when using \"include-triggers\" flag. Bailing out", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } isp.migrationContext.Log.Debugf("Validated no triggers exist on table") return nil @@ -637,7 +648,7 @@ func (isp *Inspector) validateGhostTriggersLength() error { // estimateTableRowsViaExplain estimates number of rows on original table func (isp *Inspector) estimateTableRowsViaExplain() error { - query := fmt.Sprintf(`explain select /* gh-ost */ * from %s.%s where 1=1`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + query := fmt.Sprintf(`explain select /* gh-ost */ * from %s.%s where 1=1`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) outputFound := false err := sqlutils.QueryRowsMap(isp.db, query, func(rowMap sqlutils.RowMap) error { @@ -651,7 +662,7 @@ func (isp *Inspector) estimateTableRowsViaExplain() error { return err } if !outputFound { - return isp.migrationContext.Log.Errorf("cannot run EXPLAIN on %s.%s!", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return isp.migrationContext.Log.Errorf("cannot run EXPLAIN on %s.%s!", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } isp.migrationContext.Log.Infof("Estimated number of rows via EXPLAIN: %d", isp.migrationContext.RowsEstimate) return nil @@ -675,7 +686,7 @@ func (isp *Inspector) CountTableRows(ctx context.Context) error { return err } - query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) var rowsEstimate int64 if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index c820d074d..8a53b7302 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -495,7 +495,7 @@ func (mgtr *Migrator) Migrate() (err error) { defer mgtr.teardown() if err := mgtr.initiateInspector(); err != nil { - return err + return fmt.Errorf("failed to initiate inspector: %w", err) } if err := mgtr.checkAbort(); err != nil { return err @@ -604,7 +604,7 @@ func (mgtr *Migrator) Migrate() (err error) { if err := mgtr.addDMLEventsListener(); err != nil { return err } - if err := mgtr.applier.ReadMigrationRangeValues(); err != nil { + if err := mgtr.applier.ReadMigrationRangeValues(nil); err != nil { return err } @@ -790,13 +790,27 @@ func (mgtr *Migrator) Revert() error { return nil } +// prepareMoveTablesCopyState initializes state for row copy in move-tables mode. +// for move-tables functionality, the source and target tables are identical so we just need to grab any valid UNIQUE key constraint. +func (mgtr *Migrator) prepareMoveTablesCopyState() { + mgtr.migrationContext.UniqueKey = mgtr.inspector.selectUniqueKey(mgtr.migrationContext.OriginalTableUniqueKeys) + + // In move-tables mode source and target schemas match, so shared columns are identical. + mgtr.migrationContext.SharedColumns = mgtr.migrationContext.OriginalTableColumns + mgtr.migrationContext.MappedSharedColumns = mgtr.migrationContext.OriginalTableColumns +} + func (mgtr *Migrator) MoveTables() (err error) { mgtr.migrationContext.Log.Infof("Moving tables %v from %s to %s (%s)", mgtr.migrationContext.MoveTables.TableNames, sql.EscapeName(mgtr.migrationContext.DatabaseName), - sql.EscapeName(mgtr.migrationContext.MoveTables.TargetDatabase), mgtr.migrationContext.MoveTables.TargetHost) + sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), mgtr.migrationContext.MoveTables.TargetHost) mgtr.migrationContext.StartTime = time.Now() + if mgtr.migrationContext.OriginalTableName == "" { + mgtr.migrationContext.OriginalTableName = mgtr.migrationContext.MoveTables.TableNames[0] + } + // Ensure context is cancelled on exit (cleanup) defer mgtr.migrationContext.CancelContext() @@ -824,10 +838,20 @@ func (mgtr *Migrator) MoveTables() (err error) { if err := mgtr.initiateApplier(); err != nil { return err } + if err := mgtr.initiateStreaming(); err != nil { + return err + } if err := mgtr.checkAbort(); err != nil { return err } + mgtr.prepareMoveTablesCopyState() + + // this function assumes that the unique key constraint has been set. + if err := mgtr.applier.prepareQueries(); err != nil { + return err + } + // Validation complete! Run on-validated hook. if err := mgtr.hooksExecutor.OnValidated(); err != nil { return err @@ -844,7 +868,7 @@ func (mgtr *Migrator) MoveTables() (err error) { if err := mgtr.addDMLEventsListener(); err != nil { return err } - if err := mgtr.applier.ReadMigrationRangeValues(); err != nil { + if err := mgtr.applier.ReadMigrationRangeValues(mgtr.inspector.db); err != nil { return err } @@ -885,7 +909,7 @@ func (mgtr *Migrator) MoveTables() (err error) { } mgtr.migrationContext.Log.Infof("Done moving tables %v from %s to %s (%s)", mgtr.migrationContext.MoveTables.TableNames, sql.EscapeName(mgtr.migrationContext.DatabaseName), - sql.EscapeName(mgtr.migrationContext.MoveTables.TargetDatabase), mgtr.migrationContext.MoveTables.TargetHost) + sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), mgtr.migrationContext.MoveTables.TargetHost) // Final check for abort before declaring success if err := mgtr.checkAbort(); err != nil { return err @@ -1217,14 +1241,16 @@ func (mgtr *Migrator) initiateInspector() (err error) { return err } if err := mgtr.inspector.ValidateOriginalTable(); err != nil { - return err + return fmt.Errorf("failed to validate original table: %w", err) } if err := mgtr.inspector.InspectOriginalTable(); err != nil { - return err + return fmt.Errorf("failed to inspect original table: %w", err) } // So far so good, table is accessible and valid. // Let's get master connection config - if mgtr.migrationContext.AssumeMasterHostname == "" { + if mgtr.migrationContext.IsMoveTablesMode() { + mgtr.migrationContext.ApplierConnectionConfig = mgtr.migrationContext.MoveTables.ConnectionConfig + } else if mgtr.migrationContext.AssumeMasterHostname == "" { // No forced master host; detect master if mgtr.migrationContext.ApplierConnectionConfig, err = mgtr.inspector.getMasterConnectionConfig(); err != nil { return err @@ -1256,7 +1282,9 @@ func (mgtr *Migrator) initiateInspector() (err error) { mgtr.migrationContext.Log.Infof("--test-on-replica or --migrate-on-replica given. Will not execute on master %+v but rather on replica %+v itself", *mgtr.migrationContext.ApplierConnectionConfig.ImpliedKey, *mgtr.migrationContext.InspectorConnectionConfig.ImpliedKey, ) - mgtr.migrationContext.ApplierConnectionConfig = mgtr.migrationContext.InspectorConnectionConfig.Duplicate() + if !mgtr.migrationContext.IsMoveTablesMode() { + mgtr.migrationContext.ApplierConnectionConfig = mgtr.migrationContext.InspectorConnectionConfig.Duplicate() + } if mgtr.migrationContext.GetThrottleControlReplicaKeys().Len() == 0 { mgtr.migrationContext.AddThrottleControlReplicaKey(mgtr.migrationContext.InspectorConnectionConfig.Key) } @@ -1296,11 +1324,11 @@ func (mgtr *Migrator) initiateStatus() { // migration, and as response to the "status" interactive command. func (mgtr *Migrator) printMigrationStatusHint(writers ...io.Writer) { w := io.MultiWriter(writers...) - fmt.Fprintf(w, "# Migrating %s.%s; Ghost table is %s.%s\n", + fmt.Fprintf(w, "# Migrating %s.%s; Target table is %s.%s\n", sql.EscapeName(mgtr.migrationContext.DatabaseName), sql.EscapeName(mgtr.migrationContext.OriginalTableName), - sql.EscapeName(mgtr.migrationContext.DatabaseName), - sql.EscapeName(mgtr.migrationContext.GetGhostTableName()), + sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), + sql.EscapeName(mgtr.migrationContext.GetTargetTableName()), ) fmt.Fprintf(w, "# Migrating %+v; inspecting %+v; executing on %+v\n", *mgtr.applier.connectionConfig.ImpliedKey, @@ -1548,14 +1576,19 @@ func (mgtr *Migrator) initiateStreaming() error { if err := mgtr.eventsStreamer.InitDBConnections(); err != nil { return err } - mgtr.eventsStreamer.AddListener( - false, - mgtr.migrationContext.DatabaseName, - mgtr.migrationContext.GetChangelogTableName(), - func(dmlEntry *binlog.BinlogEntry) error { - return mgtr.onChangelogEvent(dmlEntry) - }, - ) + + if mgtr.migrationContext.IsMoveTablesMode() { + mgtr.migrationContext.Log.Info("Skipping stream of the changelog table") + } else { + mgtr.eventsStreamer.AddListener( + false, + mgtr.migrationContext.DatabaseName, + mgtr.migrationContext.GetChangelogTableName(), + func(dmlEntry *binlog.BinlogEntry) error { + return mgtr.onChangelogEvent(dmlEntry) + }, + ) + } go func() { mgtr.migrationContext.Log.Debugf("Beginning streaming") @@ -1583,10 +1616,15 @@ func (mgtr *Migrator) initiateStreaming() error { // addDMLEventsListener begins listening for binlog events on the original table, // and creates & enqueues a write task per such event. func (mgtr *Migrator) addDMLEventsListener() error { + originalTableName := mgtr.migrationContext.OriginalTableName + if mgtr.migrationContext.IsMoveTablesMode() { + originalTableName = mgtr.migrationContext.MoveTables.TableNames[0] + } + err := mgtr.eventsStreamer.AddListener( false, mgtr.migrationContext.DatabaseName, - mgtr.migrationContext.OriginalTableName, + originalTableName, func(dmlEntry *binlog.BinlogEntry) error { // Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits // This is critical because this callback blocks the event streamer @@ -1598,6 +1636,12 @@ func (mgtr *Migrator) addDMLEventsListener() error { // initiateThrottler kicks in the throttling collection and the throttling checks. func (mgtr *Migrator) initiateThrottler() { + if mgtr.migrationContext.IsMoveTablesMode() { + // TODO(chriskirkland): throttle against the target cluster + mgtr.migrationContext.Log.Info("Skipping throttling in move tables mode") + return + } + mgtr.throttler = NewThrottler(mgtr.migrationContext, mgtr.applier, mgtr.inspector, mgtr.appVersion) go mgtr.throttler.initiateThrottlerCollection(mgtr.firstThrottlingCollected) @@ -1617,39 +1661,51 @@ func (mgtr *Migrator) initiateApplier() error { if err := mgtr.applier.AcquireMigrationLock(mgtr.migrationContext.GetContext()); err != nil { return err } - if mgtr.migrationContext.Revert { - if err := mgtr.applier.CreateChangelogTable(); err != nil { - mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") - return err - } - } else if !mgtr.migrationContext.Resume { - if err := mgtr.applier.ValidateOrDropExistingTables(); err != nil { - return err - } - if err := mgtr.applier.CreateChangelogTable(); err != nil { - mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") - return err - } - if err := mgtr.applier.CreateGhostTable(); err != nil { - mgtr.migrationContext.Log.Errorf("unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") - return err + + if mgtr.migrationContext.IsMoveTablesMode() { + createTableStatement, err := mgtr.inspector.showCreateTable(mgtr.migrationContext.MoveTables.TableNames[0]) + if err != nil { + return fmt.Errorf("failed to fetch create table statement: %w", err) } - if err := mgtr.applier.AlterGhost(); err != nil { - mgtr.migrationContext.Log.Errorf("unable to ALTER ghost table, see further error details. Bailing out") + if err := mgtr.applier.CreateTargetTable(createTableStatement); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create target table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") return err } + } else { + if mgtr.migrationContext.Revert { + if err := mgtr.applier.CreateChangelogTable(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") + return err + } + } else if !mgtr.migrationContext.Resume { + if err := mgtr.applier.ValidateOrDropExistingTables(); err != nil { + return err + } + if err := mgtr.applier.CreateChangelogTable(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") + return err + } + if err := mgtr.applier.CreateGhostTable(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") + return err + } + if err := mgtr.applier.AlterGhost(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to ALTER ghost table, see further error details. Bailing out") + return err + } - if mgtr.migrationContext.OriginalTableAutoIncrement > 0 && !mgtr.parser.IsAutoIncrementDefined() { - // Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override, - // so we should copy AUTO_INCREMENT value onto our ghost table. - if err := mgtr.applier.AlterGhostAutoIncrement(); err != nil { - mgtr.migrationContext.Log.Errorf("unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out") + if mgtr.migrationContext.OriginalTableAutoIncrement > 0 && !mgtr.parser.IsAutoIncrementDefined() { + // Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override, + // so we should copy AUTO_INCREMENT value onto our ghost table. + if err := mgtr.applier.AlterGhostAutoIncrement(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out") + return err + } + } + if _, err := mgtr.applier.WriteChangelogState(string(GhostTableMigrated)); err != nil { return err } } - if _, err := mgtr.applier.WriteChangelogState(string(GhostTableMigrated)); err != nil { - return err - } } // ensure performance_schema.metadata_locks is available. @@ -1663,7 +1719,9 @@ func (mgtr *Migrator) initiateApplier() error { mgtr.migrationContext.Log.Warning("proceeding without metadata lock check. There is a small chance of data loss if another session accesses the ghost table during cut-over. See https://github.com/github/gh-ost/pull/1536 for details") } - go mgtr.applier.InitiateHeartbeat() + if !mgtr.migrationContext.IsMoveTablesMode() { + go mgtr.applier.InitiateHeartbeat() + } return nil } @@ -1705,7 +1763,13 @@ func (mgtr *Migrator) iterateChunks() error { } // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever - hasFurtherRange, err := mgtr.applier.CalculateNextIterationRangeEndValues() + var hasFurtherRange bool + var err error + if mgtr.migrationContext.IsMoveTablesMode() { + hasFurtherRange, err = mgtr.applier.CalculateNextIterationRangeEndValues(mgtr.inspector.db) + } else { + hasFurtherRange, err = mgtr.applier.CalculateNextIterationRangeEndValues(nil) + } if err != nil { return err // wrapping call will retry } @@ -1726,13 +1790,14 @@ func (mgtr *Migrator) iterateChunks() error { } var rowsAffected int64 if mgtr.migrationContext.IsMoveTablesMode() { - _, rowsAffected, _, err = mgtr.applier.ApplyIterationMoveTableCopyQueries() + _, rowsAffected, _, err = mgtr.applier.ApplyIterationMoveTableCopyQueries(mgtr.inspector.db) } else { _, rowsAffected, _, err = mgtr.applier.ApplyIterationInsertQuery() } if err != nil { return err // wrapping call will retry } + mgtr.migrationContext.Log.Debugf("ApplyIterationInsertQuery affected %d rows", rowsAffected) if mgtr.migrationContext.PanicOnWarnings { if len(mgtr.migrationContext.MigrationLastInsertSQLWarnings) > 0 { @@ -1942,7 +2007,11 @@ func (mgtr *Migrator) executeWriteFuncs() error { return nil } - mgtr.throttler.throttle(nil) + if !mgtr.migrationContext.IsMoveTablesMode() { + // disable throttling in move-tables mode for now + // https://github.com/github/database-infrastructure/issues/8212 + mgtr.throttler.throttle(nil) + } // We give higher priority to event processing, then secondary priority to // rowcopy @@ -2021,14 +2090,21 @@ func (mgtr *Migrator) finalCleanup() error { if createTableStatement, err := mgtr.inspector.showCreateTable(mgtr.migrationContext.GetGhostTableName()); err == nil { mgtr.migrationContext.Log.Infof("New table structure follows") fmt.Println(createTableStatement) - } else { - mgtr.migrationContext.Log.Errore(err) + } else if !mgtr.migrationContext.IsMoveTablesMode() { + mgtr.migrationContext.Log.Errore(fmt.Errorf("error showing create table: %w", err)) } } if err := mgtr.eventsStreamer.Close(); err != nil { mgtr.migrationContext.Log.Errore(err) } + if mgtr.migrationContext.IsMoveTablesMode() { + // for move-tables mode, we're done at this point + // TODO(zacharysierakowski): when we add the checkpoint table in for 1.6, make sure we cleanup + // the checkpoint table here first before returning (looks like that's a few lines below changelog table cleanup) + return nil + } + if err := mgtr.retryOperation(mgtr.applier.DropChangelogTable); err != nil { return err } diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index fc8feada1..d6e972cc3 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -677,7 +677,7 @@ func (suite *MigratorTestSuite) TestCopierIntPK() { suite.Require().NoError(migrator.initiateApplier()) defer migrator.applier.Teardown() suite.Require().NoError(migrator.applier.prepareQueries()) - suite.Require().NoError(migrator.applier.ReadMigrationRangeValues()) + suite.Require().NoError(migrator.applier.ReadMigrationRangeValues(nil)) go migrator.iterateChunks() go func() { @@ -749,7 +749,7 @@ func (suite *MigratorTestSuite) TestCopierCompositePK() { suite.Require().NoError(migrator.initiateApplier()) defer migrator.applier.Teardown() suite.Require().NoError(migrator.applier.prepareQueries()) - suite.Require().NoError(migrator.applier.ReadMigrationRangeValues()) + suite.Require().NoError(migrator.applier.ReadMigrationRangeValues(nil)) go migrator.iterateChunks() go func() { diff --git a/localtests/docker-compose-move-tables.yml b/localtests/docker-compose-move-tables.yml new file mode 100644 index 000000000..8ba8647ea --- /dev/null +++ b/localtests/docker-compose-move-tables.yml @@ -0,0 +1,57 @@ +services: + mysql-source-primary: + image: $TEST_MYSQL_IMAGE + container_name: mysql-source-primary + command: --server-id=1 --log-bin=mysql-bin --binlog-format=row --gtid-mode=ON --enforce-gtid-consistency=ON --character-set-server=utf8mb4 $MYSQL_NATIVE_PASSWORD_FLAG + environment: + MYSQL_ROOT_PASSWORD: opensesame + MYSQL_ROOT_HOST: '%' + MYSQL_DATABASE: test + MYSQL_TCP_PORT: 3307 + INIT_ROCKSDB: 1 # for percona-server + ports: + - '3307:3307' + expose: + - '3307' + mysql-source-replica: + image: $TEST_MYSQL_IMAGE + container_name: mysql-source-replica + command: --server-id=2 --log-bin=mysql-bin --binlog-format=row --gtid-mode=ON --enforce-gtid-consistency=ON --log-slave-updates=ON --character-set-server=utf8mb4 $MYSQL_NATIVE_PASSWORD_FLAG + environment: + MYSQL_ROOT_PASSWORD: opensesame + MYSQL_ROOT_HOST: '%' + MYSQL_DATABASE: test + MYSQL_TCP_PORT: 3308 + INIT_ROCKSDB: 1 # for percona-server + ports: + - '3308:3308' + expose: + - '3308' + mysql-target-primary: + image: $TEST_MYSQL_IMAGE + container_name: mysql-target-primary + command: --server-id=3 --log-bin=mysql-bin --binlog-format=row --gtid-mode=ON --enforce-gtid-consistency=ON --character-set-server=utf8mb4 $MYSQL_NATIVE_PASSWORD_FLAG + environment: + MYSQL_ROOT_PASSWORD: opensesame + MYSQL_ROOT_HOST: '%' + MYSQL_DATABASE: test + MYSQL_TCP_PORT: 3309 + INIT_ROCKSDB: 1 # for percona-server + ports: + - '3309:3309' + expose: + - '3309' + mysql-target-replica: + image: $TEST_MYSQL_IMAGE + container_name: mysql-target-replica + command: --server-id=4 --log-bin=mysql-bin --binlog-format=row --gtid-mode=ON --enforce-gtid-consistency=ON --log-slave-updates=ON --character-set-server=utf8mb4 $MYSQL_NATIVE_PASSWORD_FLAG + environment: + MYSQL_ROOT_PASSWORD: opensesame + MYSQL_ROOT_HOST: '%' + MYSQL_DATABASE: test + MYSQL_TCP_PORT: 3310 + INIT_ROCKSDB: 1 # for percona-server + ports: + - '3310:3310' + expose: + - '3310' \ No newline at end of file diff --git a/localtests/move-tables/create.sql b/localtests/move-tables/create.sql new file mode 100644 index 000000000..46e919003 --- /dev/null +++ b/localtests/move-tables/create.sql @@ -0,0 +1,34 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id bigint(20) NOT NULL AUTO_INCREMENT, + column1 int(11) NOT NULL, + column2 smallint(5) unsigned NOT NULL, + column3 mediumint(8) unsigned NOT NULL, + column4 tinyint(3) unsigned NOT NULL, + column5 int(11) NOT NULL, + column6 int(11) NOT NULL, + PRIMARY KEY (id), + KEY c12_ix (column1, column2) +) auto_increment=1; + +insert into gh_ost_test values + (NULL, 1001, 100, 500000, 10, 1700000001, 1700000002), + (NULL, 1002, 200, 600000, 20, 1700000003, 1700000004), + (NULL, 1003, 300, 700000, 30, 1700000005, 1700000006), + (NULL, 1004, 400, 800000, 40, 1700000007, 1700000008), + (NULL, 1005, 500, 900000, 50, 1700000009, 1700000010), + (NULL, 1006, 600, 1000000, 60, 1700000011, 1700000012), + (NULL, 1007, 700, 1100000, 70, 1700000013, 1700000014), + (NULL, 1008, 800, 1200000, 80, 1700000015, 1700000016), + (NULL, 1009, 900, 1300000, 90, 1700000017, 1700000018), + (NULL, 1010, 1000, 1400000, 100, 1700000019, 1700000020), + (NULL, 1011, 1100, 1500000, 110, 1700000021, 1700000022), + (NULL, 1012, 1200, 1600000, 120, 1700000023, 1700000024), + (NULL, 1013, 1300, 1700000, 130, 1700000025, 1700000026), + (NULL, 1014, 1400, 1800000, 140, 1700000027, 1700000028), + (NULL, 1015, 1500, 1900000, 150, 1700000029, 1700000030), + (NULL, 1016, 1600, 2000000, 160, 1700000031, 1700000032), + (NULL, 1017, 1700, 2100000, 170, 1700000033, 1700000034), + (NULL, 1018, 1800, 2200000, 180, 1700000035, 1700000036), + (NULL, 1019, 1900, 2300000, 190, 1700000037, 1700000038), + (NULL, 1020, 2000, 2400000, 200, 1700000039, 1700000040); \ No newline at end of file diff --git a/script/move-tables/README.md b/script/move-tables/README.md new file mode 100644 index 000000000..52d138d09 --- /dev/null +++ b/script/move-tables/README.md @@ -0,0 +1,180 @@ +### Setup + +Setup the multi-cluster topology and seed the data +```bash +script/move-tables/setup +``` + +Verify data is present in the source cluster. +```bash +script/move-tables/mysql-source-primary -D gh_ost_test_db -e "SELECT * FROM gh_ost_test;" +``` + +Verify the empty database is present in the target cluster. +```bash +script/move-tables/mysql-target-primary -D gh_ost_test_db -e "SHOW TABLES;" +``` + +### Testing `gh-ost` + +Checkout your branch of `github/gh-ost` and build the binaries: +```bash +script/build --cli +``` + +Run gh-ost to move tables: +```bash +./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3308 --user root --password opensesame --database=gh_ost_test_db --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=gh_ost_test_db --execute --verbose +``` + +### WIP + +Current state based on the current outer dev loop: + + +```bash + + +\u2718 \e[2m\u2388 (\u2205) gh-ost:(move-tables/1.2-skip-ghost-tables) +> rm /tmp/gh-ost.gh_ost_test_db..sock; ./script/build --cli && ./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3308 --user root --password opensesame --database=gh_ost_test_db --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=gh_ost_test_db --execute --verbose +rm: /tmp/gh-ost.gh_ost_test_db..sock: No such file or directory +go version go1.25.9 darwin/arm64 found in : Go Binary: /opt/homebrew/bin/go +++ '[' '!' -L .gopath/src/github.com/github/gh-ost ']' +++ export GOPATH=/Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath:/Users/chriskirkland/git/src/github.com/github/gh-ost/.vendor +++ GOPATH=/Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath:/Users/chriskirkland/git/src/github.com/github/gh-ost/.vendor ++ mkdir -p bin ++ bindir=/Users/chriskirkland/git/src/github.com/github/gh-ost/bin ++ scriptdir=/Users/chriskirkland/git/src/github.com/github/gh-ost/script +++ git rev-parse HEAD ++ version=0508dd782e1871de9dcaa51d3f59e5ba4cd92117 +++ git describe --tags --always --dirty ++ describe=v1.1.9-19-g0508dd78-dirty ++ export GOPATH=/Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath ++ GOPATH=/Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath ++ cd .gopath/src/github.com/github/gh-ost ++ go build -o /Users/chriskirkland/git/src/github.com/github/gh-ost/bin/gh-ost -ldflags '-X main.AppVersion=0508dd782e1871de9dcaa51d3f59e5ba4cd92117 -X main.BuildDescribe=v1.1.9-19-g0508dd78-dirty' ./go/cmd/gh-ost/main.go +2026-06-01 16:41:13 INFO starting gh-ost 0508dd782e1871de9dcaa51d3f59e5ba4cd92117 (git commit: unknown) +2026-06-01 16:41:13 INFO Moving tables [gh_ost_test] from `gh_ost_test_db` to `gh_ost_test_db` (localhost) +2026-06-01 16:41:13 INFO inspector connection validated on localhost:3308 +2026-06-01 16:41:13 INFO User has SUPER, REPLICATION SLAVE privileges, and has ALL privileges on `gh_ost_test_db`.* +2026-06-01 16:41:13 INFO binary logs validated on localhost:3308 +2026-06-01 16:41:13 INFO Restarting replication on localhost:3308 to make sure binlog settings apply to replication thread +2026-06-01 16:41:13 INFO Inspector initiated on 3e162abb4a14:3308, version 8.0.41 +2026-06-01 16:41:13 INFO Inspector validating original table +2026-06-01 16:41:13 INFO Table found. Engine=InnoDB +2026-06-01 16:41:13 INFO Estimated number of rows via EXPLAIN: 20 +2026-06-01 16:41:13 INFO Inspector validated original table +2026-06-01 16:41:13 INFO Inspector inspected original table +2026-06-01 16:41:13 INFO log_slave_updates validated on localhost:3308 +2026-06-01 16:41:13 INFO Inspector validated and initialized +2026-06-01 16:41:13 INFO applier connection validated on localhost:3309 +2026-06-01 16:41:13 INFO applier connection validated on localhost:3309 +2026-06-01 16:41:13 INFO will use time_zone='SYSTEM' on applier +2026-06-01 16:41:13 INFO applier connection validated on localhost:3309 +2026-06-01 16:41:13 INFO Applier initiated on 381ee87dc2c6:3309, version 8.0.41 +2026-06-01 16:41:13 INFO Fetching create table statement for `gh_ost_test_db.gh_ost_test` +2026-06-01 16:41:13 INFO Create table statement: CREATE TABLE `gh_ost_test` ( + `id` bigint NOT NULL AUTO_INCREMENT, + `column1` int NOT NULL, + `column2` smallint unsigned NOT NULL, + `column3` mediumint unsigned NOT NULL, + `column4` tinyint unsigned NOT NULL, + `column5` int NOT NULL, + `column6` int NOT NULL, + PRIMARY KEY (`id`), + KEY `c12_ix` (`column1`,`column2`) +) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci +2026-06-01 16:41:13 INFO Creating target table `gh_ost_test_db`.`gh_ost_test` +2026-06-01 16:41:13 INFO Target table created +2026-06-01 16:41:13 INFO streamer connection validated on localhost:3308 +[2026/06/01 16:41:13] [info] binlogsyncer.go:191 create BinlogSyncer with config {ServerID:99999 Flavor:mysql Host:localhost Port:3308 User:root Password: Localhost: Charset: SemiSyncEnabled:false RawModeEnabled:false TLSConfig: ParseTime:false TimestampStringLocation:UTC UseDecimal:true RecvBufferSize:0 HeartbeatPeriod:0s ReadTimeout:0s MaxReconnectAttempts:0 DisableRetrySync:false VerifyChecksum:false DumpCommandFlag:0 Option: Logger:0x14000494a20 Dialer:0x100c8c0c0 RowsEventDecodeFunc: TableMapOptionalMetaDecodeFunc: DiscardGTIDSet:false EventCacheCount:10240 SynchronousEventHandler:} +2026-06-01 16:41:13 INFO Connecting binlog streamer at mysql-bin.000003:2987908 +[2026/06/01 16:41:13] [info] binlogsyncer.go:443 begin to sync binlog from position (mysql-bin.000003, 2987908) +[2026/06/01 16:41:13] [info] binlogsyncer.go:409 Connected to mysql 8.0.41 server +2026-06-01 16:41:13 INFO Skipping stream of the changelog table [] +[2026/06/01 16:41:13] [info] binlogsyncer.go:868 rotate to (mysql-bin.000003, 2987908) +2026-06-01 16:41:13 INFO rotate to next log from mysql-bin.000003:0 to mysql-bin.000003 +2026-06-01 16:41:13 INFO Listening on unix socket file: /tmp/gh-ost.gh_ost_test_db..sock +2026-06-01 16:41:13 INFO Adding listener for gh_ost_test_db.gh_ost_test +2026-06-01 16:41:13 INFO Reading migration range according to key: PRIMARY ( + select /* gh-ost `gh_ost_test_db`.`gh_ost_test` */ `id` + from + `gh_ost_test_db`.`gh_ost_test` + force index (PRIMARY) + order by + `id` asc + limit 1) +2026-06-01 16:41:13 INFO Migration min values: [1] +2026-06-01 16:41:13 INFO Migration max values: [20] +2026-06-01 16:41:13 INFO Skipping throttling in move tables mode [] +# Migrating `gh_ost_test_db`.`gh_ost_test`; Target table is `gh_ost_test_db`.`gh_ost_test` +# Migrating 381ee87dc2c6:3309; inspecting 3e162abb4a14:3308; executing on Chriss-MBP-2 +# Migration started at Mon Jun 01 16:41:13 -0600 2026 +# chunk-size: 1000; max-lag-millis: 1500ms; dml-batch-size: 10; max-load: ; critical-load: ; nice-ratio: 0.000000 +# throttle-additional-flag-file: /tmp/gh-ost.throttle +# Serving on unix socket: /tmp/gh-ost.gh_ost_test_db..sock +Copy: 0/20 0.0%; Applied: 0; Backlog: 0/1000; Time: 0s(total), 0s(copy); streamer: mysql-bin.000003:0; Lag: 0.00s, HeartbeatLag: 9223372036.85s, State: migrating; ETA: N/A +2026-06-01 16:41:13 INFO Copy: 0/20 0.0%; Applied: 0; Backlog: 0/1000; Time: 0s(total), 0s(copy); streamer: mysql-bin.000003:0; Lag: 0.00s, HeartbeatLag: 9223372036.85s, State: migrating; ETA: N/A [] +Copy: 0/20 0.0%; Applied: 0; Backlog: 0/1000; Time: 1s(total), 1s(copy); streamer: mysql-bin.000003:0; Lag: 0.00s, HeartbeatLag: 9223372036.85s, State: migrating; ETA: N/A +2026-06-01 16:41:14 INFO [execWriteFuncs] Processing row copy function [] +2026-06-01 16:41:14 INFO Copy: 0/20 0.0%; Applied: 0; Backlog: 0/1000; Time: 1s(total), 1s(copy); streamer: mysql-bin.000003:0; Lag: 0.00s, HeartbeatLag: 9223372036.85s, State: migrating; ETA: N/A [] +2026-06-01 16:41:14 INFO ApplyIterationInsertQuery affected 20 rows +2026-06-01 16:41:14 INFO [execWriteFuncs] Processing row copy function [] +2026-06-01 16:41:14 INFO [execWriteFuncs] Processing row copy function [] +2026-06-01 16:41:14 INFO Row copy complete +2026-06-01 16:41:14 INFO Writing changelog state: Migrated +[2026/06/01 16:41:14] [info] binlogsyncer.go:225 syncer is closing... +2026-06-01 16:41:14 INFO StreamEvents encountered unexpected error: Sync was closed +github.com/go-mysql-org/go-mysql/replication.init + :1 +runtime.doInit1 + /Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath/pkg/mod/golang.org/toolchain@v0.0.1-go1.25.9.darwin-arm64/src/runtime/proc.go:7670 +runtime.doInit + /Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath/pkg/mod/golang.org/toolchain@v0.0.1-go1.25.9.darwin-arm64/src/runtime/proc.go:7637 +runtime.main + /Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath/pkg/mod/golang.org/toolchain@v0.0.1-go1.25.9.darwin-arm64/src/runtime/proc.go:256 +runtime.goexit + /Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath/pkg/mod/golang.org/toolchain@v0.0.1-go1.25.9.darwin-arm64/src/runtime/asm_arm64.s:1268 +[2026/06/01 16:41:14] [info] binlogsyncer.go:988 kill last connection id 405 +[2026/06/01 16:41:14] [info] binlogsyncer.go:255 syncer is closed +2026-06-01 16:41:14 INFO Closed streamer connection. err= +2026-06-01 16:41:14 INFO Done moving tables [gh_ost_test] from `gh_ost_test_db` to `gh_ost_test_db` (localhost) +2026-06-01 16:41:14 INFO Removing socket file: /tmp/gh-ost.gh_ost_test_db..sock +2026-06-01 16:41:14 INFO Tearing down inspector +2026-06-01 16:41:14 INFO Tearing down applier +2026-06-01 16:41:14 INFO Tearing down streamer +# Done + +``` + +:tada: :tada: :tada: :tada: +```bash + +\u2714 \e[2m\u2388 (\u2205) gh-ost-tablemove-poc:(chriskirkland/move-tables) +> ./script/move-tables/mysql-target-primary -D "gh_ost_test_db" -e "SELECT * FROM gh_ost_test;" ++----+---------+---------+---------+---------+------------+------------+ +| id | column1 | column2 | column3 | column4 | column5 | column6 | ++----+---------+---------+---------+---------+------------+------------+ +| 1 | 1001 | 100 | 500000 | 10 | 1700000001 | 1700000002 | +| 2 | 1002 | 200 | 600000 | 20 | 1700000003 | 1700000004 | +| 3 | 1003 | 300 | 700000 | 30 | 1700000005 | 1700000006 | +| 4 | 1004 | 400 | 800000 | 40 | 1700000007 | 1700000008 | +| 5 | 1005 | 500 | 900000 | 50 | 1700000009 | 1700000010 | +| 6 | 1006 | 600 | 1000000 | 60 | 1700000011 | 1700000012 | +| 7 | 1007 | 700 | 1100000 | 70 | 1700000013 | 1700000014 | +| 8 | 1008 | 800 | 1200000 | 80 | 1700000015 | 1700000016 | +| 9 | 1009 | 900 | 1300000 | 90 | 1700000017 | 1700000018 | +| 10 | 1010 | 1000 | 1400000 | 100 | 1700000019 | 1700000020 | +| 11 | 1011 | 1100 | 1500000 | 110 | 1700000021 | 1700000022 | +| 12 | 1012 | 1200 | 1600000 | 120 | 1700000023 | 1700000024 | +| 13 | 1013 | 1300 | 1700000 | 130 | 1700000025 | 1700000026 | +| 14 | 1014 | 1400 | 1800000 | 140 | 1700000027 | 1700000028 | +| 15 | 1015 | 1500 | 1900000 | 150 | 1700000029 | 1700000030 | +| 16 | 1016 | 1600 | 2000000 | 160 | 1700000031 | 1700000032 | +| 17 | 1017 | 1700 | 2100000 | 170 | 1700000033 | 1700000034 | +| 18 | 1018 | 1800 | 2200000 | 180 | 1700000035 | 1700000036 | +| 19 | 1019 | 1900 | 2300000 | 190 | 1700000037 | 1700000038 | +| 20 | 1020 | 2000 | 2400000 | 200 | 1700000039 | 1700000040 | ++----+---------+---------+---------+---------+------------+------------+ + +``` \ No newline at end of file diff --git a/script/move-tables/mysql-source-primary b/script/move-tables/mysql-source-primary new file mode 100755 index 000000000..9643924d7 --- /dev/null +++ b/script/move-tables/mysql-source-primary @@ -0,0 +1,6 @@ +#!/bin/bash +# +# This executes a command on the mysql-source-primary docker container created +# from localtests/docker-compose-move-tables.yml. + +MYSQL_PWD=opensesame mysql -uroot -h0.0.0.0 -P3307 "$@" \ No newline at end of file diff --git a/script/move-tables/mysql-source-replica b/script/move-tables/mysql-source-replica new file mode 100755 index 000000000..f42a698d4 --- /dev/null +++ b/script/move-tables/mysql-source-replica @@ -0,0 +1,6 @@ +#!/bin/bash +# +# This executes a command on the mysql-source-replica docker container created +# from localtests/docker-compose-move-tables.yml. + +MYSQL_PWD=opensesame mysql -uroot -h0.0.0.0 -P3308 "$@" \ No newline at end of file diff --git a/script/move-tables/mysql-target-primary b/script/move-tables/mysql-target-primary new file mode 100755 index 000000000..9a229231c --- /dev/null +++ b/script/move-tables/mysql-target-primary @@ -0,0 +1,6 @@ +#!/bin/bash +# +# This executes a command on the mysql-target-primary docker container created +# from localtests/docker-compose-move-tables.yml. + +MYSQL_PWD=opensesame mysql -uroot -h0.0.0.0 -P3309 "$@" \ No newline at end of file diff --git a/script/move-tables/mysql-target-replica b/script/move-tables/mysql-target-replica new file mode 100755 index 000000000..ae43d4779 --- /dev/null +++ b/script/move-tables/mysql-target-replica @@ -0,0 +1,6 @@ +#!/bin/bash +# +# This executes a command on the mysql-target-replica docker container created +# from localtests/docker-compose-move-tables.yml. + +MYSQL_PWD=opensesame mysql -uroot -h0.0.0.0 -P3310 "$@" \ No newline at end of file diff --git a/script/move-tables/setup b/script/move-tables/setup new file mode 100755 index 000000000..05fd1ece9 --- /dev/null +++ b/script/move-tables/setup @@ -0,0 +1,130 @@ +#!/bin/bash + +# This script starts four MySQL docker containers in two primary-replica clusters +# which can be used for running tablemove replica tests. +# Set the environment var TEST_MYSQL_IMAGE to change the docker image. +# +# Usage: +# setup start the containers + +set -e + +GH_OST_ROOT=$(git rev-parse --show-toplevel) +SCRIPT_PATH="${GH_OST_ROOT}/script/move-tables" +DATABASE_NAME="gh_ost_test_db" + +poll_mysql() { + CTR=0 + cmd="exec-mysql-$1" + while ! $cmd -e "select 1;" >/dev/null 2>&1; do + sleep 1 + CTR=$((CTR + 1)) + if [ $CTR -gt 30 ]; then + echo " ❌ MySQL $1 failed to start" + return 1 + fi + done + echo " ✔ MySQL $1 OK" + return 0 +} + +exec-mysql-source-primary() { + if [[ $TEST_MYSQL_IMAGE =~ "mysql:8.4" ]]; then + ${SCRIPT_PATH}/mysql-source-primary --ssl-mode=required "$@" + else + ${SCRIPT_PATH}/mysql-source-primary "$@" + fi +} + +exec-mysql-source-replica() { + if [[ $TEST_MYSQL_IMAGE =~ "mysql:8.4" ]]; then + ${SCRIPT_PATH}/mysql-source-replica --ssl-mode=required "$@" + else + ${SCRIPT_PATH}/mysql-source-replica "$@" + fi +} + +exec-mysql-target-primary() { + if [[ $TEST_MYSQL_IMAGE =~ "mysql:8.4" ]]; then + ${SCRIPT_PATH}/mysql-target-primary --ssl-mode=required "$@" + else + ${SCRIPT_PATH}/mysql-target-primary "$@" + fi +} + +exec-mysql-target-replica() { + if [[ $TEST_MYSQL_IMAGE =~ "mysql:8.4" ]]; then + ${SCRIPT_PATH}/mysql-target-replica --ssl-mode=required "$@" + else + ${SCRIPT_PATH}/mysql-target-replica "$@" + fi +} + +setup() { + [ -z "$TEST_MYSQL_IMAGE" ] && TEST_MYSQL_IMAGE="mysql:8.0.41" + + echo "Starting MySQL $TEST_MYSQL_IMAGE containers (2 clusters)..." + compose_file="$GH_OST_ROOT/localtests/docker-compose-move-tables.yml" + MYSQL_SHA2_RSA_KEYS_FLAG="" + MYSQL_PASSWORD_HASHING_ALGORITHM="mysql_native_password" + MYSQL_NATIVE_PASSWORD_FLAG="" + if [[ $TEST_MYSQL_IMAGE =~ "mysql:8.4" ]]; then + MYSQL_PASSWORD_HASHING_ALGORITHM="caching_sha2_password" + MYSQL_SHA2_RSA_KEYS_FLAG="--caching-sha2-password-auto-generate-rsa-keys=ON" + MYSQL_NATIVE_PASSWORD_FLAG="$MYSQL_SHA2_RSA_KEYS_FLAG" + fi + (TEST_MYSQL_IMAGE="$TEST_MYSQL_IMAGE" MYSQL_NATIVE_PASSWORD_FLAG="$MYSQL_NATIVE_PASSWORD_FLAG" envsubst <"$compose_file") >"$compose_file.tmp" + + docker compose -f "$compose_file.tmp" up -d --wait + + echo "Waiting for MySQL..." + poll_mysql "source-primary" || exit 1 + poll_mysql "source-replica" || exit 1 + poll_mysql "target-primary" || exit 1 + poll_mysql "target-replica" || exit 1 + + # Setup replication for source cluster, not idempotent + echo -n "Setting up replication for source cluster..." + exec-mysql-source-primary -e "create user if not exists 'repl'@'%' identified with $MYSQL_PASSWORD_HASHING_ALGORITHM by 'repl';" + exec-mysql-source-primary -e "grant replication slave on *.* to 'repl'@'%'; flush privileges;" + exec-mysql-source-primary -e "create user if not exists 'gh-ost'@'%' identified with $MYSQL_PASSWORD_HASHING_ALGORITHM by 'gh-ost';" + exec-mysql-source-primary -e "grant all on *.* to 'gh-ost'@'%';" + + sleep 1 + if [[ $TEST_MYSQL_IMAGE =~ "mysql:8.4" ]]; then + exec-mysql-source-replica -e "change replication source to source_host='mysql-source-primary', source_port=3307, source_user='repl', source_password='repl', source_auto_position=1, source_ssl=1;" + exec-mysql-source-replica -e "start replica;" + else + exec-mysql-source-replica -e "change master to master_host='mysql-source-primary', master_port=3307, master_user='repl', master_password='repl', master_auto_position=1;" + exec-mysql-source-replica -e "start slave;" + fi + echo "OK" + + # Setup replication for target cluster + echo -n "Setting up replication for target cluster..." + exec-mysql-target-primary -e "create user if not exists 'repl'@'%' identified with $MYSQL_PASSWORD_HASHING_ALGORITHM by 'repl';" + exec-mysql-target-primary -e "grant replication slave on *.* to 'repl'@'%'; flush privileges;" + exec-mysql-target-primary -e "create user if not exists 'gh-ost'@'%' identified with $MYSQL_PASSWORD_HASHING_ALGORITHM by 'gh-ost';" + exec-mysql-target-primary -e "grant all on *.* to 'gh-ost'@'%';" + + sleep 1 + if [[ $TEST_MYSQL_IMAGE =~ "mysql:8.4" ]]; then + exec-mysql-target-replica -e "change replication source to source_host='mysql-target-primary', source_port=3309, source_user='repl', source_password='repl', source_auto_position=1, source_ssl=1;" + exec-mysql-target-replica -e "start replica;" + else + exec-mysql-target-replica -e "change master to master_host='mysql-target-primary', master_port=3309, master_user='repl', master_password='repl', master_auto_position=1;" + exec-mysql-target-replica -e "start slave;" + fi + echo "OK" + + echo -n "Initializing '$DATABASE_NAME' database into each cluster..." + exec-mysql-source-primary -e "CREATE DATABASE IF NOT EXISTS $DATABASE_NAME;" + exec-mysql-target-primary -e "CREATE DATABASE IF NOT EXISTS $DATABASE_NAME;" + echo "OK" + + echo -n "Seeding data in source cluster..." + exec-mysql-source-primary -D $DATABASE_NAME < "$GH_OST_ROOT/localtests/move-tables/create.sql" + echo "OK" +} + +setup \ No newline at end of file diff --git a/script/move-tables/teardown b/script/move-tables/teardown new file mode 100755 index 000000000..edddb4b59 --- /dev/null +++ b/script/move-tables/teardown @@ -0,0 +1,20 @@ +#!/bin/bash + +# This script removes the four MySQL docker containers in two primary-replica clusters +# expected to be provisioned by the setup script. +# +# Usage: +# teardown remove the containers + +set -e + +GH_OST_ROOT=$(git rev-parse --show-toplevel) +if [[ ":$PATH:" != *":$GH_OST_ROOT:"* ]]; then + export PATH="${PATH}:${GH_OST_ROOT}/script" +fi + +echo "Stopping containers..." +docker stop mysql-source-replica mysql-source-primary mysql-target-replica mysql-target-primary 2>/dev/null || true + +echo "Removing containers..." +docker rm -f mysql-source-replica mysql-source-primary mysql-target-replica mysql-target-primary 2>/dev/null || true \ No newline at end of file