diff --git a/go/base/context.go b/go/base/context.go index 617e5bb13..83d08711a 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -271,6 +271,17 @@ type MigrationContext struct { SkipMetadataLockCheck bool IsOpenMetadataLockInstruments bool + // move tables: + MoveTables struct { + TableNames []string // List of table names to be moved. + TargetHost string // Target hostname for the move. This must be a primary/writable host. + TargetPort int // Target MySQL port for the move. + TargetUser string // Target username for the move. If not specified, it will default to the source user. + TargetPass string // Target password for the move. If not specified, it will default to the source password. + TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name. + ConnectionConfig *mysql.ConnectionConfig + } + Log Logger } @@ -343,6 +354,9 @@ func (mctx *MigrationContext) SetConnectionConfig(storageEngine string) error { } mctx.InspectorConnectionConfig.TransactionIsolation = transactionIsolation mctx.ApplierConnectionConfig.TransactionIsolation = transactionIsolation + if mctx.MoveTables.ConnectionConfig != nil { + mctx.MoveTables.ConnectionConfig.TransactionIsolation = transactionIsolation + } return nil } @@ -353,6 +367,9 @@ func (mctx *MigrationContext) SetConnectionCharset(charset string) { mctx.InspectorConnectionConfig.Charset = charset mctx.ApplierConnectionConfig.Charset = charset + if mctx.MoveTables.ConnectionConfig != nil { + mctx.MoveTables.ConnectionConfig.Charset = charset + } } func getSafeTableName(baseName string, suffix string) string { @@ -378,6 +395,24 @@ func (mctx *MigrationContext) GetGhostTableName() string { } } +// GetTargetTableName generates the name of the target table, based on original table name and +// the migration context (i.e. move-tables mode). 
+func (mctx *MigrationContext) GetTargetTableName() string { + if mctx.IsMoveTablesMode() { + return mctx.MoveTables.TableNames[0] + } + return mctx.GetGhostTableName() +} + +// GetTargetDatabaseName returns the database the applier writes to: the --target-database value in +// move-tables mode (when one was given), otherwise the original database name. +func (mctx *MigrationContext) GetTargetDatabaseName() string { + if mctx.IsMoveTablesMode() && mctx.MoveTables.TargetDatabase != "" { + return mctx.MoveTables.TargetDatabase + } + return mctx.DatabaseName +} + // GetOldTableName generates the name of the "old" table, into which the original table is renamed. func (mctx *MigrationContext) GetOldTableName() string { var tableName string @@ -900,8 +935,15 @@ func (mctx *MigrationContext) AddThrottleControlReplicaKey(key mysql.InstanceKey return nil } +// TODO(chriskirkland): pick up from here: +// +// need to figure out how to populate the applier connection config from +// the inspector connection config but using the "target" values from the CLI +// args. + // ApplyCredentials sorts out the credentials between the config file and the CLI flags func (mctx *MigrationContext) ApplyCredentials() { + //TODO(chriskirkland): make this work for move-tables mctx.configMutex.Lock() defer mctx.configMutex.Unlock() @@ -919,9 +961,23 @@ func (mctx *MigrationContext) ApplyCredentials() { // Override mctx.InspectorConnectionConfig.Password = mctx.CliPassword } + + if mctx.IsMoveTablesMode() { + // apply credentials for the applier from target CLI args + if mctx.MoveTables.ConnectionConfig == nil { + mctx.MoveTables.ConnectionConfig = &mysql.ConnectionConfig{} + } + mctx.MoveTables.ConnectionConfig.User = mctx.MoveTables.TargetUser + mctx.MoveTables.ConnectionConfig.Password = mctx.MoveTables.TargetPass + mctx.MoveTables.ConnectionConfig.Key = mysql.InstanceKey{ + Hostname: mctx.MoveTables.TargetHost, + Port: mctx.MoveTables.TargetPort, + } + } } func (mctx *MigrationContext) SetupTLS() error { + //TODO(chriskirkland): make this work for move-tables? 
if mctx.UseTLS { return mctx.InspectorConnectionConfig.UseTLS(mctx.TLSCACertificate, mctx.TLSCertificate, mctx.TLSKey, mctx.TLSAllowInsecure) } @@ -1029,6 +1085,11 @@ func (mctx *MigrationContext) CancelContext() { } } +// IsMoveTablesMode returns true if gh-ost should be used for moving tables instead of running a schema migration. +func (mctx *MigrationContext) IsMoveTablesMode() bool { + return len(mctx.MoveTables.TableNames) > 0 +} + // SendWithContext attempts to send a value to a channel, but returns early // if the context is cancelled. This prevents goroutine deadlocks when the // channel receiver has exited due to an error. diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 567137fd5..9b476cb93 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -12,10 +12,12 @@ import ( "os" "os/signal" "regexp" + "strings" "syscall" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/logic" + "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" _ "github.com/go-sql-driver/mysql" "github.com/openark/golib/log" @@ -164,8 +166,16 @@ func main() { version := flag.Bool("version", false, "Print version & exit") checkFlag := flag.Bool("check-flag", false, "Check if another flag exists/supported. This allows for cross-version scripting. Exits with 0 when all additional provided flags exist, nonzero otherwise. You must provide (dummy) values for flags that require a value. Example: gh-ost --check-flag --cut-over-lock-timeout-seconds --nice-ratio 0") flag.StringVar(&migrationContext.ForceTmpTableName, "force-table-names", "", "table name prefix to be used on the temporary tables") - flag.CommandLine.SetOutput(os.Stdout) + // move tables flags + moveTables := flag.String("move-tables", "", "Comma delimited list of tables to move. e.g. 'table1,table2,table3'. This is a special mode that allows you to move tables between database clusters. 
This mode is mutually exclusive with --alter, --table, --test-on-replica, --migrate-on-replica, --execute-on-replica, and --revert.") + flag.StringVar(&migrationContext.MoveTables.TargetHost, "target-host", "", "Target MySQL hostname for --move-tables mode. Must be specified if --move-tables is specified.") + flag.IntVar(&migrationContext.MoveTables.TargetPort, "target-port", 3306, "Target MySQL port for --move-tables mode. Defaults to 3306.") + flag.StringVar(&migrationContext.MoveTables.TargetUser, "target-user", "", "Target MySQL username for --move-tables mode. If not provided, uses the same user as the source connection") + flag.StringVar(&migrationContext.MoveTables.TargetPass, "target-password", "", "Target MySQL password for --move-tables mode. If not provided, uses the same password as the source connection") + flag.StringVar(&migrationContext.MoveTables.TargetDatabase, "target-database", "", "Target MySQL database name for --move-tables mode. If not provided, uses the same database name as the source connection") + + flag.CommandLine.SetOutput(os.Stdout) flag.Parse() if *checkFlag { @@ -203,13 +213,7 @@ func main() { migrationContext.Log.SetLevel(log.ERROR) } - if err := migrationContext.SetConnectionConfig(*storageEngine); err != nil { - migrationContext.Log.Fatale(err) - } - - migrationContext.SetConnectionCharset(*charset) - - if migrationContext.AlterStatement == "" && !migrationContext.Revert { + if migrationContext.AlterStatement == "" && !migrationContext.Revert && *moveTables == "" { log.Fatal("--alter must be provided and statement must not be empty") } parser := sql.NewParserFromAlterStatement(migrationContext.AlterStatement) @@ -250,7 +254,7 @@ func main() { migrationContext.Log.Fatale(err) } - if migrationContext.OriginalTableName == "" { + if migrationContext.OriginalTableName == "" && *moveTables == "" { if parser.HasExplicitTable() { migrationContext.OriginalTableName = parser.GetExplicitTable() } else { @@ -320,6 +324,40 @@ func main() { 
migrationContext.Log.Warning("--exact-rowcount with --panic-on-warnings: row counts cannot be exact due to warning detection") } + if *moveTables != "" { + if migrationContext.AlterStatement != "" { + log.Fatal("--move-tables is mutually exclusive with --alter") + } + if migrationContext.OriginalTableName != "" { + log.Fatal("--move-tables is mutually exclusive with --table") + } + if migrationContext.TestOnReplica { + log.Fatal("--move-tables is mutually exclusive with --test-on-replica") + } + if migrationContext.MigrateOnReplica { + log.Fatal("--move-tables is mutually exclusive with --migrate-on-replica") + } + if migrationContext.Revert { + log.Fatal("--move-tables is mutually exclusive with --revert") + } + if migrationContext.MoveTables.TargetHost == "" { + log.Fatal("--target-host must be specified when using --move-tables") + } + migrationContext.MoveTables.TableNames = strings.Split(*moveTables, ",") + if len(migrationContext.MoveTables.TableNames) > 1 { + // Future version will support moving multiple tables at the same time. + // For now, we only support moving a single table at a time. 
+ log.Fatal("--move-tables currently supports only a single table") + } + migrationContext.MoveTables.ConnectionConfig = mysql.NewConnectionConfig() + } + + if err := migrationContext.SetConnectionConfig(*storageEngine); err != nil { + migrationContext.Log.Fatale(err) + } + + migrationContext.SetConnectionCharset(*charset) + switch *cutOver { case "atomic", "default", "": migrationContext.CutOverType = base.CutOverAtomic @@ -379,6 +417,8 @@ func main() { var err error if migrationContext.Revert { err = migrator.Revert() + } else if migrationContext.IsMoveTablesMode() { + err = migrator.MoveTables() } else { err = migrator.Migrate() } diff --git a/go/logic/applier.go b/go/logic/applier.go index b49e131b8..db5b34e09 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -83,6 +83,12 @@ type Applier struct { dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder + + moveTablesTargetDB *gosql.DB + moveTablesConnectionConfig *mysql.ConnectionConfig + moveTablesCopySelectFirstQueryBuilder *sql.MoveTableCopySelectQueryBuilder + moveTablesCopySelectNextQueryBuilder *sql.MoveTableCopySelectQueryBuilder + moveTablesCopyInsertQueryBuilder *sql.MoveTableCopyInsertQueryBuilder } func NewApplier(migrationContext *base.MigrationContext) *Applier { @@ -91,6 +97,8 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { migrationContext: migrationContext, finishedMigrating: 0, name: "applier", + + moveTablesConnectionConfig: migrationContext.MoveTables.ConnectionConfig, } } @@ -99,7 +107,7 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { // hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid // regex syntax errors. 
func (apl *Applier) compileMigrationKeyWarningRegex() (*regexp.Regexp, error) { - escapedTable := regexp.QuoteMeta(apl.migrationContext.GetGhostTableName()) + escapedTable := regexp.QuoteMeta(apl.migrationContext.GetTargetTableName()) escapedKey := regexp.QuoteMeta(apl.migrationContext.UniqueKey.NameInGhostTable) migrationUniqueKeyPattern := fmt.Sprintf(`for key '(%s\.)?%s'`, escapedTable, escapedKey) migrationKeyRegex, err := regexp.Compile(migrationUniqueKeyPattern) @@ -138,25 +146,46 @@ func (apl *Applier) InitDBConnections() (err error) { apl.connectionConfig.ImpliedKey = impliedKey } } - if err := apl.readTableColumns(); err != nil { - return err + if apl.migrationContext.IsMoveTablesMode() { + //TODO(chriskirkland): do we need this? + + // seed target columns based on the original table on source + apl.migrationContext.OriginalTableColumnsOnApplier = apl.migrationContext.OriginalTableColumns + } else { + // read target table columns from applier + if err := apl.readTableColumns(); err != nil { + return err + } + } + if apl.moveTablesConnectionConfig != nil { + moveTablesURI := apl.moveTablesConnectionConfig.GetDBUri(apl.migrationContext.MoveTables.TargetDatabase) + "&multiStatements=true" + if apl.moveTablesTargetDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, moveTablesURI); err != nil { + return err + } + if _, err := base.ValidateConnection(apl.moveTablesTargetDB, apl.moveTablesConnectionConfig, apl.migrationContext, apl.name); err != nil { + return err + } } apl.migrationContext.Log.Infof("Applier initiated on %+v, version %+v", apl.connectionConfig.ImpliedKey, apl.migrationContext.ApplierMySQLVersion) return nil } +// NOTE(chriskirkland): this is totally done, thanks @danieljoos func (apl *Applier) prepareQueries() (err error) { + targetDatabaseName := apl.migrationContext.GetTargetDatabaseName() + targetTableName := apl.migrationContext.GetTargetTableName() + if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder( - 
apl.migrationContext.DatabaseName, - apl.migrationContext.GetGhostTableName(), + targetDatabaseName, + targetTableName, apl.migrationContext.OriginalTableColumns, &apl.migrationContext.UniqueKey.Columns, ); err != nil { return err } if apl.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder( - apl.migrationContext.DatabaseName, - apl.migrationContext.GetGhostTableName(), + targetDatabaseName, + targetTableName, apl.migrationContext.OriginalTableColumns, apl.migrationContext.SharedColumns, apl.migrationContext.MappedSharedColumns, @@ -164,8 +193,8 @@ func (apl *Applier) prepareQueries() (err error) { return err } if apl.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder( - apl.migrationContext.DatabaseName, - apl.migrationContext.GetGhostTableName(), + targetDatabaseName, + targetTableName, apl.migrationContext.OriginalTableColumns, apl.migrationContext.SharedColumns, apl.migrationContext.MappedSharedColumns, @@ -182,6 +211,36 @@ func (apl *Applier) prepareQueries() (err error) { return err } } + if apl.migrationContext.IsMoveTablesMode() { + apl.migrationContext.Log.Debugf("Building CopySelect queries for move-tables") + if apl.moveTablesCopySelectFirstQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( + apl.migrationContext.DatabaseName, + apl.originalTableName(), + apl.migrationContext.OriginalTableColumns, + apl.migrationContext.UniqueKey.Name, + &apl.migrationContext.UniqueKey.Columns, + true, // <-- include start range values for first select query + ); err != nil { + return err + } + if apl.moveTablesCopySelectNextQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( + apl.migrationContext.DatabaseName, + apl.originalTableName(), + apl.migrationContext.OriginalTableColumns, + apl.migrationContext.UniqueKey.Name, + &apl.migrationContext.UniqueKey.Columns, + false, + ); err != nil { + return err + } + if apl.moveTablesCopyInsertQueryBuilder, err = sql.NewMoveTableCopyInsertQueryBuilder( + targetDatabaseName, + targetTableName, + 
apl.migrationContext.OriginalTableColumns, + ); err != nil { + return err + } + } return nil } @@ -221,7 +280,7 @@ func (apl *Applier) generateSqlModeQuery() string { func (apl *Applier) generateInstantDDLQuery() string { return fmt.Sprintf(`ALTER /* gh-ost */ TABLE %s.%s %s, ALGORITHM=INSTANT`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), apl.migrationContext.AlterStatementOptions, ) } @@ -229,7 +288,7 @@ func (apl *Applier) generateInstantDDLQuery() string { // readTableColumns reads table columns on applier func (apl *Applier) readTableColumns() (err error) { apl.migrationContext.Log.Infof("Examining table structure on applier") - apl.migrationContext.OriginalTableColumnsOnApplier, _, err = mysql.GetTableColumns(apl.db, apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName) + apl.migrationContext.OriginalTableColumnsOnApplier, _, err = mysql.GetTableColumns(apl.db, apl.migrationContext.DatabaseName, apl.originalTableName()) if err != nil { return err } @@ -252,6 +311,13 @@ func (apl *Applier) tableExists(tableName string) (tableFound bool) { return (m != nil) } +func (apl *Applier) originalTableName() string { + if apl.migrationContext.IsMoveTablesMode() { + return apl.migrationContext.MoveTables.TableNames[0] + } + return apl.migrationContext.OriginalTableName +} + // ValidateOrDropExistingTables verifies ghost and changelog tables do not exist, // or attempts to drop them if instructed to. func (apl *Applier) ValidateOrDropExistingTables() error { @@ -333,17 +399,22 @@ func retryOnLockWaitTimeout(operation func() error, maxRetries int64, logger bas return err } -// CreateGhostTable creates the ghost table on the applier host -func (apl *Applier) CreateGhostTable() error { +// createTargetTable creates the table on the applier host to which the applier will +// apply changes. 
+func (apl *Applier) createTargetTable(targetTableName string) error { + targetDatabase := apl.migrationContext.DatabaseName + if apl.migrationContext.IsMoveTablesMode() { + targetDatabase = apl.migrationContext.MoveTables.TargetDatabase + } query := fmt.Sprintf(`create /* gh-ost */ table %s.%s like %s.%s`, - sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.GetGhostTableName()), + sql.EscapeName(targetDatabase), + sql.EscapeName(targetTableName), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.OriginalTableName), ) - apl.migrationContext.Log.Infof("Creating ghost table %s.%s", - sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.GetGhostTableName()), + apl.migrationContext.Log.Infof("Creating target table %s.%s", + sql.EscapeName(targetDatabase), + sql.EscapeName(targetTableName), ) err := func() error { @@ -362,7 +433,49 @@ func (apl *Applier) CreateGhostTable() error { if _, err := tx.Exec(query); err != nil { return err } - apl.migrationContext.Log.Infof("Ghost table created") + apl.migrationContext.Log.Infof("Target table created") + if err := tx.Commit(); err != nil { + // Neither SET SESSION nor ALTER are really transactional, so strictly speaking + // there's no need to commit; but let's do this the legit way anyway. + return err + } + return nil + }() + + return err +} + +// createTargetTableFromStatement creates the table on the applier host to which the applier will +// apply changes. +func (apl *Applier) createTargetTableFromStatement(targetTableName, createStatement string) error { + //TODO(chriskirkland): how to inject the targetTableName in place of the original table name? 
+ + targetDatabase := apl.migrationContext.DatabaseName + if apl.migrationContext.IsMoveTablesMode() { + targetDatabase = apl.migrationContext.MoveTables.TargetDatabase + } + apl.migrationContext.Log.Infof("Creating target table %s.%s", + sql.EscapeName(targetDatabase), + sql.EscapeName(targetTableName), + ) + + err := func() error { + tx, err := apl.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, apl.migrationContext.ApplierTimeZone) + sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, apl.generateSqlModeQuery()) + + if _, err := tx.Exec(sessionQuery); err != nil { + return err + } + if _, err := tx.Exec(createStatement); err != nil { + return err + } + apl.migrationContext.Log.Infof("Target table created") if err := tx.Commit(); err != nil { // Neither SET SESSION nor ALTER are really transactional, so strictly speaking // there's no need to commit; but let's do this the legit way anyway. @@ -374,6 +487,19 @@ func (apl *Applier) CreateGhostTable() error { return err } +// CreateGhostTable creates the ghost table on the applier host +func (apl *Applier) CreateGhostTable() error { + return apl.createTargetTable(apl.migrationContext.GetGhostTableName()) +} + +// CreateTargetTable creates the ghost table on the applier host +func (apl *Applier) CreateTargetTable(createStatement string) error { + if !apl.migrationContext.IsMoveTablesMode() { + return errors.New("CreateTargetTable is only available in MoveTables mode") + } + return apl.createTargetTableFromStatement(apl.originalTableName(), createStatement) +} + // AlterGhost applies `alter` statement on ghost table func (apl *Applier) AlterGhost() error { query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s %s`, @@ -597,7 +723,7 @@ func (apl *Applier) createTriggers(tableName string) error { // CreateTriggers creates the original triggers on applier host func (apl *Applier) CreateTriggersOnGhost() error { err := 
apl.createTriggers(apl.migrationContext.GetGhostTableName()) - return err + if err != nil { + // wrap only real failures: fmt.Errorf returns a non-nil error even when the %w operand is nil + return fmt.Errorf("error creating triggers on ghost table: %w", err) + } + return nil } // DropChangelogTable drops the changelog table on the applier host @@ -617,12 +743,22 @@ func (apl *Applier) DropOldTable() error { // DropGhostTable drops the ghost table on the applier host func (apl *Applier) DropGhostTable() error { - return apl.dropTable(apl.migrationContext.GetGhostTableName()) + if err := apl.dropTable(apl.migrationContext.GetGhostTableName()); err != nil { + return fmt.Errorf("error dropping ghost table: %w", err) + } + return nil } // WriteChangelog writes a value to the changelog table. -// It returns the hint as given, for convenience +// It returns the hint (or an empty string in move-tables mode), for convenience func (apl *Applier) WriteChangelog(hint, value string) (string, error) { + // In move-tables mode, there is no changelog table (§1.2). All changelog + // writes are no-ops. This is a single chokepoint rather than per-caller + // guards to prevent drift when new callers are added. 
+ if apl.migrationContext.IsMoveTablesMode() { + return "", nil + } + explicitId := 0 switch hint { case "heartbeat": @@ -783,11 +919,11 @@ func (apl *Applier) ExecuteThrottleQuery() (int64, error) { // readMigrationMinValues returns the minimum values to be iterated on rowcopy func (apl *Applier) readMigrationMinValues(tx *gosql.Tx, uniqueKey *sql.UniqueKey) error { - apl.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) - query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, uniqueKey) + query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(apl.migrationContext.DatabaseName, apl.originalTableName(), uniqueKey) if err != nil { return err } + apl.migrationContext.Log.Infof("Reading migration range according to key: %s (%s)", uniqueKey.Name, query) rows, err := tx.Query(query) if err != nil { @@ -809,7 +945,7 @@ func (apl *Applier) readMigrationMinValues(tx *gosql.Tx, uniqueKey *sql.UniqueKe // readMigrationMaxValues returns the maximum values to be iterated on rowcopy func (apl *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueKey) error { apl.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) - query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, uniqueKey) + query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(apl.migrationContext.DatabaseName, apl.originalTableName(), uniqueKey) if err != nil { return err } @@ -848,12 +984,17 @@ Detail description of the lost data in mysql two-phase commit issue by @Fanduzi: will not be run. When the changelog writes successfully, the ReadMigrationRangeValues will read the newly inserted data, thus Avoiding data loss due to the above problem. 
*/ -func (apl *Applier) ReadMigrationRangeValues() error { +func (apl *Applier) ReadMigrationRangeValues(db *gosql.DB) error { if _, err := apl.WriteChangelogState(string(ReadMigrationRangeValues)); err != nil { return err } - tx, err := apl.db.Begin() + if db == nil { + // default to reading from applier database + db = apl.db + } + + tx, err := db.Begin() if err != nil { return err } @@ -873,7 +1014,7 @@ func (apl *Applier) ReadMigrationRangeValues() error { // which will be used for copying the next chunk of rows. Ir returns "false" if there is // no further chunk to work through, i.e. we're past the last chunk and are done with // iterating the range (and thus done with copying row chunks) -func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { +func (apl *Applier) CalculateNextIterationRangeEndValues(db *gosql.DB) (hasFurtherRange bool, err error) { for i := 0; i < 2; i++ { buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset if i == 1 { @@ -881,7 +1022,7 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool } query, explodedArgs, err := buildFunc( apl.migrationContext.DatabaseName, - apl.migrationContext.OriginalTableName, + apl.originalTableName(), &apl.migrationContext.UniqueKey.Columns, apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), apl.migrationContext.MigrationRangeMaxValues.AbstractValues(), @@ -893,7 +1034,12 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool return hasFurtherRange, err } - rows, err := apl.db.Query(query, explodedArgs...) + if db == nil { + // default to applier database if not provided + db = apl.db + } + + rows, err := db.Query(query, explodedArgs...) 
if err != nil { return hasFurtherRange, err } @@ -926,7 +1072,7 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( apl.migrationContext.DatabaseName, - apl.migrationContext.OriginalTableName, + apl.originalTableName(), apl.migrationContext.GetGhostTableName(), apl.migrationContext.SharedColumns.Names(), apl.migrationContext.MappedSharedColumns.Names(), @@ -1013,15 +1159,137 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i return chunkSize, rowsAffected, duration, nil } +// ApplyIterationMoveTableCopyQueries issues a SELECT query on the original table and an INSERT query on the target table, +// copying a chunk of rows. It is used when `--move-table` is specified, instead of ApplyIterationInsertQuery. +func (apl *Applier) ApplyIterationMoveTableCopyQueries(sourceDB *gosql.DB) (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { + startTime := time.Now() + chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize) + + // First, select data from the source database: + rows, err := func() ([]*sql.ColumnValues, error) { + var qb *sql.MoveTableCopySelectQueryBuilder + apl.migrationContext.Log.Debugf("Building SELECT query for move-tables; first: %v; rest: %v", + apl.moveTablesCopySelectFirstQueryBuilder, + apl.moveTablesCopySelectNextQueryBuilder) + + if apl.migrationContext.GetIteration() == 0 { + qb = apl.moveTablesCopySelectFirstQueryBuilder + } else { + qb = apl.moveTablesCopySelectNextQueryBuilder + } + query, explodedArgs, err := qb.BuildQuery( + apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), + apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), + ) + if err != nil { + return nil, err + } + sqlRows, err := sourceDB.Query(query, explodedArgs...) 
+ if err != nil { + return nil, err + } + defer sqlRows.Close() + chunkRows := make([]*sql.ColumnValues, 0, chunkSize) + for sqlRows.Next() { + row := sql.NewColumnValues(apl.migrationContext.SharedColumns.Len()) + err := sqlRows.Scan(row.ValuesPointers...) + if err != nil { + return nil, err + } + chunkRows = append(chunkRows, row) + } + if err := sqlRows.Err(); err != nil { + return nil, err + } + return chunkRows, nil + }() + if err != nil { + return chunkSize, rowsAffected, duration, err + } + + // Then, insert data into the destination database: + sqlResult, err := func() (gosql.Result, error) { + query, explodedArgs, err := apl.moveTablesCopyInsertQueryBuilder.BuildQuery(rows) + if err != nil { + return nil, err + } + tx, err := apl.moveTablesTargetDB.Begin() + if err != nil { + return nil, err + } + defer tx.Rollback() + + sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s', %s`, + apl.migrationContext.ApplierTimeZone, + apl.generateSqlModeQuery()) + if _, err := tx.Exec(sessionQuery); err != nil { + return nil, err + } + + sqlResult, err := tx.Exec(query, explodedArgs...) 
+ if err != nil { + return nil, err + } + + if apl.migrationContext.PanicOnWarnings { + rows, err := tx.Query("SHOW WARNINGS") + if err != nil { + return nil, err + } + defer rows.Close() + if err = rows.Err(); err != nil { + return nil, err + } + migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex() + if err != nil { + return nil, err + } + var sqlWarnings []string + for rows.Next() { + var level, message string + var code int + if err := rows.Scan(&level, &code, &message); err != nil { + apl.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") + continue + } + if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { + continue + } + sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + } + apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings + } + + if err := tx.Commit(); err != nil { + return nil, err + } + return sqlResult, nil + }() + if err != nil { + return chunkSize, rowsAffected, duration, err + } + rowsAffected, _ = sqlResult.RowsAffected() + duration = time.Since(startTime) + apl.migrationContext.Log.Debugf( + "Issued SELECT+INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d", + apl.migrationContext.MigrationIterationRangeMinValues, + apl.migrationContext.MigrationIterationRangeMaxValues, + apl.migrationContext.GetIteration(), + chunkSize, + ) + + return chunkSize, rowsAffected, duration, nil +} + // LockOriginalTable places a write lock on the original table func (apl *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) apl.migrationContext.Log.Infof("Locking %s.%s", sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) 
apl.migrationContext.LockTablesStartTime = time.Now() if _, err := sqlutils.ExecNoPrepare(apl.singletonDB, query); err != nil { @@ -1047,9 +1315,10 @@ func (apl *Applier) UnlockTables() error { // - rename ghost table to original // There is a point in time in between where the table does not exist. func (apl *Applier) SwapTablesQuickAndBumpy() error { + // TODO(chriskirkland): audit usage in move-tables query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.GetOldTableName()), ) apl.migrationContext.Log.Infof("Renaming original table") @@ -1060,9 +1329,9 @@ func (apl *Applier) SwapTablesQuickAndBumpy() error { query = fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`, sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetGhostTableName()), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) - apl.migrationContext.Log.Infof("Renaming ghost table") + apl.migrationContext.Log.Infof("Renaming target table") if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil { return err } @@ -1075,17 +1344,18 @@ func (apl *Applier) SwapTablesQuickAndBumpy() error { // RenameTablesRollback renames back both table: original back to ghost, // _old back to original. This is used by `--test-on-replica` func (apl *Applier) RenameTablesRollback() (renameError error) { + // TODO(chriskirkland): audit usage in move-tables // Restoring tables to original names. 
// We prefer the single, atomic operation: query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetGhostTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) apl.migrationContext.Log.Infof("Renaming back both tables") if _, err := sqlutils.ExecNoPrepare(apl.db, query); err == nil { @@ -1094,7 +1364,7 @@ func (apl *Applier) RenameTablesRollback() (renameError error) { // But, if for some reason the above was impossible to do, we rename one by one. query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetGhostTableName()), ) @@ -1106,7 +1376,7 @@ func (apl *Applier) RenameTablesRollback() (renameError error) { sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), ) apl.migrationContext.Log.Infof("Renaming back to original table") if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil { @@ -1355,13 +1625,13 @@ func (apl *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write`, sql.EscapeName(apl.migrationContext.DatabaseName), - 
sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), ) apl.migrationContext.Log.Infof("Locking %s.%s, %s.%s", sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), ) @@ -1411,7 +1681,7 @@ func (apl *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked // Tables still locked apl.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s", sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), ) @@ -1427,6 +1697,7 @@ func (apl *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked // AtomicCutoverRename func (apl *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error { + // TODO(chriskirkland): audit usage in move-tables tx, err := apl.db.Begin() if err != nil { return err @@ -1450,13 +1721,13 @@ func (apl *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`, sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + sql.EscapeName(apl.originalTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetOldTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetGhostTableName()), sql.EscapeName(apl.migrationContext.DatabaseName), - sql.EscapeName(apl.migrationContext.OriginalTableName), + 
sql.EscapeName(apl.originalTableName()), ) apl.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query) if _, err := tx.Exec(query); err != nil { @@ -1632,7 +1903,11 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e ctx := context.Background() err := func() error { - conn, err := apl.db.Conn(ctx) + db := apl.db + if apl.migrationContext.IsMoveTablesMode() { + db = apl.moveTablesTargetDB + } + conn, err := db.Conn(ctx) if err != nil { return err } @@ -1736,6 +2011,9 @@ func (apl *Applier) Teardown() { apl.migrationContext.Log.Debugf("Tearing down...") apl.db.Close() apl.singletonDB.Close() + if apl.moveTablesTargetDB != nil { + apl.moveTablesTargetDB.Close() + } atomic.StoreInt64(&apl.finishedMigrating, 1) } @@ -1752,12 +2030,12 @@ func (apl *Applier) ExpectMetadataLock(sessionId int64) error { err := sqlutils.QueryRowsMap(apl.db, query, func(m sqlutils.RowMap) error { found = true return nil - }, apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, sessionId) + }, apl.migrationContext.DatabaseName, apl.originalTableName(), sessionId) if err != nil { return err } if !found { - err = fmt.Errorf("cannot find PENDING metadata lock on original table: `%s`.`%s`", apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName) + err = fmt.Errorf("cannot find PENDING metadata lock on original table: `%s`.`%s`", apl.migrationContext.DatabaseName, apl.originalTableName()) return apl.migrationContext.Log.Errore(err) } return nil diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 6d7ba42f4..6d1474ceb 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -9,6 +9,7 @@ import ( "context" gosql "database/sql" "errors" + "net" "strings" "testing" "time" @@ -271,6 +272,7 @@ type ApplierTestSuite struct { mysqlContainer testcontainers.Container db *gosql.DB + otherDB *gosql.DB } func (suite *ApplierTestSuite) SetupSuite() { @@ -291,12 +293,29 @@ func (suite 
*ApplierTestSuite) SetupSuite() { db, err := gosql.Open("mysql", dsn) suite.Require().NoError(err) - suite.db = db + + containerHost, err := mysqlContainer.Host(ctx) + suite.Require().NoError(err) + containerPort, err := mysqlContainer.MappedPort(ctx, "3306/tcp") + suite.Require().NoError(err) + + // Second database & connection for move-tables tests: + _, _ = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", testMysqlDatabaseOther)) + otherConf := drivermysql.NewConfig() + otherConf.DBName = testMysqlDatabaseOther + otherConf.User = testMysqlUser + otherConf.Passwd = testMysqlPass + otherConf.Net = "tcp" + otherConf.Addr = net.JoinHostPort(containerHost, containerPort.Port()) + otherDB, err := gosql.Open("mysql", otherConf.FormatDSN()) + suite.Require().NoError(err) + suite.otherDB = otherDB } func (suite *ApplierTestSuite) TeardownSuite() { suite.Assert().NoError(suite.db.Close()) + suite.Assert().NoError(suite.otherDB.Close()) suite.Assert().NoError(testcontainers.TerminateContainer(suite.mysqlContainer)) } @@ -313,6 +332,10 @@ func (suite *ApplierTestSuite) TearDownTest() { suite.Require().NoError(err) _, err = suite.db.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestGhostTableName()) suite.Require().NoError(err) + _, err = suite.otherDB.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestOtherTableName()) + suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`_%s_ghc`", testMysqlDatabase, testMysqlTableName)) + suite.Require().NoError(err) } func (suite *ApplierTestSuite) TestInitDBConnections() { @@ -344,6 +367,41 @@ func (suite *ApplierTestSuite) TestInitDBConnections() { suite.Require().Equal(sql.NewColumnList([]string{"id", "item_id"}), migrationContext.OriginalTableColumnsOnApplier) } +func (suite *ApplierTestSuite) TestInitiateApplierMoveTablesMode_NoGhostOrChangelogTable() { + ctx := context.Background() + + var err error + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE 
TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"}) + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // #8206 [Task] [1.2] Skip ghost/changelog tables, heartbeat in gh-ost move-tables mode + // In move-tables mode, no ghost or changelog table should exist. + // InitDBConnections() should succeed without them. + suite.Require().False(applier.tableExists("_testing_gho"), "ghost table should not exist in move-tables mode") + suite.Require().False(applier.tableExists("_testing_ghc"), "changelog table should not exist in move-tables mode") + + //Verify move-tables mode seeds columns from the source table + suite.Require().Equal(sql.NewColumnList([]string{"id", "item_id"}), migrationContext.OriginalTableColumnsOnApplier) +} + func (suite *ApplierTestSuite) TestApplyDMLEventQueries() { ctx := context.Background() @@ -409,6 +467,53 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() { suite.Require().Equal(int64(0), migrationContext.RowsDeltaEstimate) } +// finalCleanup() requires a fully wired migrator to call directly. +// This test verifies the IsMoveTablesMode() predicate that gates the early return. +// Full behavioral coverage relies on the suite: no ghost/changelog tables are +// created (Test #1), and WriteChangelog is a no-op (Test #2). 
+func (suite *ApplierTestSuite) TestFinalCleanupMoveTablesMode_SkipsDrops() {
+	mctx := newTestMigrationContext()
+	mctx.MoveTables.TargetDatabase = testMysqlDatabaseOther
+	mctx.MoveTables.TableNames = []string{testMysqlTableName}
+
+	suite.Require().True(mctx.IsMoveTablesMode())
+}
+
+// TestInitiateStreamingMoveTablesMode_NoChangelogListener checks what is reachable
+// without initiateStreaming(), which requires a binlog-capable MySQL connection:
+// IsMoveTablesMode() is true, GetChangelogTableName() is non-empty, and a new streamer
+// has zero listeners (it always starts that way). The real proof that no changelog
+// listener is registered comes from the full run not failing on a nonexistent _ghc table.
+func (suite *ApplierTestSuite) TestInitiateStreamingMoveTablesMode_NoChangelogListener() {
+	req := suite.Require()
+
+	mctx := newTestMigrationContext()
+	mctx.MoveTables.TargetDatabase = testMysqlDatabaseOther
+	mctx.MoveTables.TableNames = []string{testMysqlTableName}
+	req.True(mctx.IsMoveTablesMode())
+
+	// Only the derived name is checked here; no changelog table is queried.
+	req.NotEmpty(mctx.GetChangelogTableName(), "changelog table name should be derivable")
+
+	req.Empty(NewEventsStreamer(mctx).listeners, "new streamer should have no listeners")
+}
+
+// initiateApplier() requires a full migrator to call directly.
+// This test verifies the IsMoveTablesMode() predicate that gates InitiateHeartbeat().
+// Even if heartbeat ran, TestWriteChangelogNoOpInMoveTablesMode proves WriteChangelog
+// is a no-op, so no SQL would execute against a nonexistent changelog table.
+//
+// A stronger test would instrument InitiateHeartbeat() (e.g., via a callback or
+// channel) to assert the goroutine is never started. That requires test-infrastructure
+// changes to the Applier and is beyond #8206's scope.
+func (suite *ApplierTestSuite) TestNoHeartbeatInMoveTablesMode() { + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + suite.Require().True(migrationContext.IsMoveTablesMode()) +} + func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() { ctx := context.Background() @@ -507,6 +612,43 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi suite.Require().Equal(gosql.ErrNoRows, err) } +func (suite *ApplierTestSuite) TestWriteChangelogNoOpInMoveTablesMode() { + ctx := context.Background() + + var err error + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"}) + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // #8206 [Task] [1.2] Skip ghost/changelog tables, heartbeat in gh-ost move-tables mode + // WriteChangelog should be a no-op in move-tables mode. + // No changelog table exists, so if it tried to execute, it would fail. 
+ hint, err := applier.WriteChangelog("heartbeat", "2026-06-05T00:00:00Z") + suite.Require().NoError(err) + suite.Require().Empty(hint) + + // Also verify state writes are no-ops + hint, err = applier.WriteChangelogState("Migrated") + suite.Require().NoError(err) + suite.Require().Equal("", hint) +} + func (suite *ApplierTestSuite) TestCreateGhostTable() { ctx := context.Background() @@ -602,11 +744,11 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc err = applier.CreateChangelogTable() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues(nil) suite.Require().NoError(err) suite.Require().True(hasFurtherRange) @@ -679,14 +821,14 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai err = applier.CreateChangelogTable() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) err = applier.AlterGhost() suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues(nil) suite.Require().NoError(err) suite.Require().True(hasFurtherRange) @@ -752,7 +894,7 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { err = applier.prepareQueries() suite.Require().NoError(err) - err = applier.ReadMigrationRangeValues() + err = applier.ReadMigrationRangeValues(nil) suite.Require().NoError(err) // checkpoint table is empty @@ -1542,6 +1684,155 @@ func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() { // Critically: id=2 (bob@example.com) is NOT present, proving event #3 was 
rolled back } +func (suite *ApplierTestSuite) TestApplyDMLEventQueriesMoveTablesMode() { + ctx := context.Background() + var err error + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + _, err = suite.otherDB.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestOtherTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "primary_key", + Columns: *sql.NewColumnList([]string{"id"}), + } + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}), + }, + } + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().NoError(err) + + // Check that the row was inserted into the ghost table via moveTablesTargetDB + rows, err := suite.otherDB.Query("SELECT * FROM " + getTestOtherTableName()) + suite.Require().NoError(err) + defer rows.Close() + + var 
count, id, item_id int + for rows.Next() { + err = rows.Scan(&id, &item_id) + suite.Require().NoError(err) + count += 1 + } + suite.Require().NoError(rows.Err()) + + suite.Require().Equal(1, count) + suite.Require().Equal(123456, id) + suite.Require().Equal(42, item_id) + + suite.Require().Equal(int64(1), migrationContext.TotalDMLEventsApplied) + suite.Require().Equal(int64(0), migrationContext.RowsDeltaEstimate) +} + +func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueries() { + ctx := context.Background() + var err error + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT NOT NULL, name VARCHAR(50), created_at DATETIME NOT NULL, PRIMARY KEY(id));", getTestTableName())) + suite.Require().NoError(err) + _, err = suite.otherDB.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT NOT NULL, name VARCHAR(50), created_at DATETIME NOT NULL, PRIMARY KEY(id));", getTestOtherTableName())) + suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, name, created_at) VALUES (1, 'alice', '2024-01-15 10:30:00'), (2, 'bob', '2024-06-20 14:45:00'), (3, 'carol', '2025-12-31 23:59:59');", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name", "created_at"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "name", "created_at"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "name", "created_at"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + 
migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + applier := NewApplier(migrationContext) + applier.prepareQueries() + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + err = applier.CreateChangelogTable() + suite.Require().NoError(err) + + err = applier.ReadMigrationRangeValues(nil) + suite.Require().NoError(err) + + migrationContext.SetNextIterationRangeMinValues() + hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues(nil) + suite.Require().NoError(err) + suite.Require().True(hasFurtherRange) + + chunkSize, rowsAffected, duration, err := applier.ApplyIterationMoveTableCopyQueries(suite.db) + suite.Require().NoError(err) + suite.Require().Equal(int64(3), rowsAffected) + suite.Require().Equal(int64(1000), chunkSize) + suite.Require().Greater(duration, time.Duration(0)) + + // Verify rows were copied to the other table + rows, err := suite.otherDB.QueryContext(ctx, "SELECT id, name, created_at FROM "+getTestOtherTableName()+" ORDER BY id") + suite.Require().NoError(err) + defer rows.Close() + + type row struct { + id int + name string + createdAt string + } + var results []row + for rows.Next() { + var r row + err = rows.Scan(&r.id, &r.name, &r.createdAt) + suite.Require().NoError(err) + results = append(results, r) + } + suite.Require().NoError(rows.Err()) + + suite.Require().Len(results, 3) + suite.Require().Equal(1, results[0].id) + suite.Require().Equal("alice", results[0].name) + suite.Require().Equal("2024-01-15 10:30:00", results[0].createdAt) + suite.Require().Equal(2, results[1].id) + suite.Require().Equal("bob", results[1].name) + suite.Require().Equal("2024-06-20 14:45:00", results[1].createdAt) + suite.Require().Equal(3, results[2].id) + suite.Require().Equal("carol", results[2].name) + suite.Require().Equal("2025-12-31 23:59:59", results[2].createdAt) +} + func TestApplier(t *testing.T) { if 
testing.Short() { t.Skip("skipping applier test suite in short mode") diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 96aadd672..08846656b 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -86,18 +86,20 @@ func (isp *Inspector) InitDBConnections() (err error) { } func (isp *Inspector) ValidateOriginalTable() (err error) { + isp.migrationContext.Log.Infof("Inspector validating original table") if err := isp.validateTable(); err != nil { return err } if err := isp.validateTableForeignKeys(isp.migrationContext.DiscardForeignKeys); err != nil { - return err + return fmt.Errorf("failed to validate table foreign keys: %w", err) } if err := isp.validateTableTriggers(); err != nil { - return err + return fmt.Errorf("failed to validate table triggers: %w", err) } if err := isp.estimateTableRowsViaExplain(); err != nil { - return err + return fmt.Errorf("failed to estimate table rows: %w", err) } + isp.migrationContext.Log.Infof("Inspector validated original table") return nil } @@ -118,17 +120,25 @@ func (isp *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (column } func (isp *Inspector) InspectOriginalTable() (err error) { - isp.migrationContext.OriginalTableColumns, isp.migrationContext.OriginalTableVirtualColumns, isp.migrationContext.OriginalTableUniqueKeys, err = isp.InspectTableColumnsAndUniqueKeys(isp.migrationContext.OriginalTableName) + isp.migrationContext.OriginalTableColumns, isp.migrationContext.OriginalTableVirtualColumns, isp.migrationContext.OriginalTableUniqueKeys, err = isp.InspectTableColumnsAndUniqueKeys(isp.originalTableName()) if err != nil { return err } - isp.migrationContext.OriginalTableAutoIncrement, err = isp.getAutoIncrementValue(isp.migrationContext.OriginalTableName) + isp.migrationContext.OriginalTableAutoIncrement, err = isp.getAutoIncrementValue(isp.originalTableName()) if err != nil { return err } + isp.migrationContext.Log.Infof("Inspector inspected original table") return nil } +func (isp 
*Inspector) originalTableName() string { + if isp.migrationContext.IsMoveTablesMode() { + return isp.migrationContext.MoveTables.TableNames[0] + } + return isp.migrationContext.OriginalTableName +} + // inspectOriginalAndGhostTables compares original and ghost tables to see whether the migration // makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key func (isp *Inspector) inspectOriginalAndGhostTables() (err error) { @@ -143,30 +153,7 @@ func (isp *Inspector) inspectOriginalAndGhostTables() (err error) { return err } sharedUniqueKeys := isp.getSharedUniqueKeys(isp.migrationContext.OriginalTableUniqueKeys, isp.migrationContext.GhostTableUniqueKeys) - for i, sharedUniqueKey := range sharedUniqueKeys { - isp.applyColumnTypes(isp.migrationContext.DatabaseName, isp.migrationContext.OriginalTableName, &sharedUniqueKey.Columns) - uniqueKeyIsValid := true - for _, column := range sharedUniqueKey.Columns.Columns() { - switch column.Type { - case sql.FloatColumnType: - { - isp.migrationContext.Log.Warningf("Will not use %+v as shared key due to FLOAT data type", sharedUniqueKey.Name) - uniqueKeyIsValid = false - } - case sql.JSONColumnType: - { - // Noteworthy that at this time MySQL does not allow JSON indexing anyhow, but this code - // will remain in place to potentially handle the future case where JSON is supported in indexes. - isp.migrationContext.Log.Warningf("Will not use %+v as shared key due to JSON data type", sharedUniqueKey.Name) - uniqueKeyIsValid = false - } - } - } - if uniqueKeyIsValid { - isp.migrationContext.UniqueKey = sharedUniqueKeys[i] - break - } - } + isp.migrationContext.UniqueKey = isp.selectUniqueKey(sharedUniqueKeys) if isp.migrationContext.UniqueKey == nil { return fmt.Errorf("no shared unique key can be found after ALTER! Bailing out") } @@ -186,7 +173,7 @@ func (isp *Inspector) inspectOriginalAndGhostTables() (err error) { // This additional step looks at which columns are unsigned. 
We could have merged this within // the `getTableColumns()` function, but it's a later patch and introduces some complexity; I feel // comfortable in doing this as a separate step. - isp.applyColumnTypes(isp.migrationContext.DatabaseName, isp.migrationContext.OriginalTableName, isp.migrationContext.OriginalTableColumns, isp.migrationContext.SharedColumns, &isp.migrationContext.UniqueKey.Columns) + isp.applyColumnTypes(isp.migrationContext.DatabaseName, isp.originalTableName(), isp.migrationContext.OriginalTableColumns, isp.migrationContext.SharedColumns, &isp.migrationContext.UniqueKey.Columns) isp.applyColumnTypes(isp.migrationContext.DatabaseName, isp.migrationContext.GetGhostTableName(), isp.migrationContext.GhostTableColumns, isp.migrationContext.MappedSharedColumns) for i := range isp.migrationContext.SharedColumns.Columns() { @@ -217,6 +204,33 @@ func (isp *Inspector) inspectOriginalAndGhostTables() (err error) { return nil } +func (isp *Inspector) selectUniqueKey(candidateKeys []*sql.UniqueKey) *sql.UniqueKey { + for i, candidateKey := range candidateKeys { + isp.applyColumnTypes(isp.migrationContext.DatabaseName, isp.originalTableName(), &candidateKey.Columns) + uniqueKeyIsValid := true + for _, column := range candidateKey.Columns.Columns() { + switch column.Type { + case sql.FloatColumnType: + { + isp.migrationContext.Log.Warningf("Will not use %+v as unique key due to FLOAT data type", candidateKey.Name) + uniqueKeyIsValid = false + } + case sql.JSONColumnType: + { + // Noteworthy that at this time MySQL does not allow JSON indexing anyhow, but this code + // will remain in place to potentially handle the future case where JSON is supported in indexes. 
+ isp.migrationContext.Log.Warningf("Will not use %+v as unique key due to JSON data type", candidateKey.Name) + uniqueKeyIsValid = false + } + } + } + if uniqueKeyIsValid { + return candidateKeys[i] + } + } + return nil +} + // validateConnection issues a simple can-connect to MySQL func (isp *Inspector) validateConnection() error { version, err := base.ValidateConnection(isp.db, isp.connectionConfig, isp.migrationContext, isp.name) @@ -471,7 +485,7 @@ func (isp *Inspector) validateLogSlaveUpdates() error { // validateTable makes sure the table we need to operate on actually exists func (isp *Inspector) validateTable() error { - query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(isp.migrationContext.DatabaseName), isp.migrationContext.OriginalTableName) + query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(isp.migrationContext.DatabaseName), isp.originalTableName()) tableFound := false err := sqlutils.QueryRowsMap(isp.db, query, func(rowMap sqlutils.RowMap) error { @@ -479,7 +493,7 @@ func (isp *Inspector) validateTable() error { isp.migrationContext.RowsEstimate = rowMap.GetInt64("Rows") isp.migrationContext.UsedRowsEstimateMethod = base.TableStatusRowsEstimate if rowMap.GetString("Comment") == "VIEW" { - return fmt.Errorf("%s.%s is a VIEW, not a real table. Bailing out", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return fmt.Errorf("%s.%s is a VIEW, not a real table. 
Bailing out", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } tableFound = true @@ -489,7 +503,7 @@ func (isp *Inspector) validateTable() error { return err } if !tableFound { - return isp.migrationContext.Log.Errorf("cannot find table %s.%s!", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return isp.migrationContext.Log.Errorf("cannot find table %s.%s!", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } isp.migrationContext.Log.Infof("Table found. Engine=%s", isp.migrationContext.TableEngine) isp.migrationContext.Log.Debugf("Estimated number of rows via STATUS: %d", isp.migrationContext.RowsEstimate) @@ -523,26 +537,26 @@ func (isp *Inspector) validateTableForeignKeys(allowChildForeignKeys bool) error return nil }, isp.migrationContext.DatabaseName, - isp.migrationContext.OriginalTableName, + isp.originalTableName(), isp.migrationContext.DatabaseName, - isp.migrationContext.OriginalTableName, + isp.originalTableName(), isp.migrationContext.DatabaseName, - isp.migrationContext.OriginalTableName, + isp.originalTableName(), isp.migrationContext.DatabaseName, - isp.migrationContext.OriginalTableName, + isp.originalTableName(), ) if err != nil { return err } if numParentForeignKeys > 0 { - return isp.migrationContext.Log.Errorf("found %d parent-side foreign keys on %s.%s. Parent-side foreign keys are not supported. Bailing out", numParentForeignKeys, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return isp.migrationContext.Log.Errorf("found %d parent-side foreign keys on %s.%s. Parent-side foreign keys are not supported. 
Bailing out", numParentForeignKeys, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } if numChildForeignKeys > 0 { if allowChildForeignKeys { isp.migrationContext.Log.Debugf("Foreign keys found and will be dropped, as per given --discard-foreign-keys flag") return nil } - return isp.migrationContext.Log.Errorf("found %d child-side foreign keys on %s.%s. Child-side foreign keys are not supported. Bailing out", numChildForeignKeys, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return isp.migrationContext.Log.Errorf("found %d child-side foreign keys on %s.%s. Child-side foreign keys are not supported. Bailing out", numChildForeignKeys, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } isp.migrationContext.Log.Debugf("Validated no foreign keys exist on table") return nil @@ -564,15 +578,15 @@ func (isp *Inspector) validateTableTriggers() error { return nil }, isp.migrationContext.DatabaseName, - isp.migrationContext.OriginalTableName, + isp.originalTableName(), ) if err != nil { return err } if numTriggers > 0 { if isp.migrationContext.IncludeTriggers { - isp.migrationContext.Log.Infof("Found %d triggers on %s.%s.", numTriggers, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) - isp.migrationContext.Triggers, err = mysql.GetTriggers(isp.db, isp.migrationContext.DatabaseName, isp.migrationContext.OriginalTableName) + isp.migrationContext.Log.Infof("Found %d triggers on %s.%s.", numTriggers, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) + isp.migrationContext.Triggers, err = mysql.GetTriggers(isp.db, isp.migrationContext.DatabaseName, isp.originalTableName()) if err != nil { return err } @@ -584,7 +598,7 @@ func (isp *Inspector) validateTableTriggers() error { } return nil } - return 
isp.migrationContext.Log.Errorf("found triggers on %s.%s. Tables with triggers are supported only when using \"include-triggers\" flag. Bailing out", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return isp.migrationContext.Log.Errorf("found triggers on %s.%s. Tables with triggers are supported only when using \"include-triggers\" flag. Bailing out", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } isp.migrationContext.Log.Debugf("Validated no triggers exist on table") return nil @@ -637,7 +651,7 @@ func (isp *Inspector) validateGhostTriggersLength() error { // estimateTableRowsViaExplain estimates number of rows on original table func (isp *Inspector) estimateTableRowsViaExplain() error { - query := fmt.Sprintf(`explain select /* gh-ost */ * from %s.%s where 1=1`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + query := fmt.Sprintf(`explain select /* gh-ost */ * from %s.%s where 1=1`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) outputFound := false err := sqlutils.QueryRowsMap(isp.db, query, func(rowMap sqlutils.RowMap) error { @@ -651,7 +665,7 @@ func (isp *Inspector) estimateTableRowsViaExplain() error { return err } if !outputFound { - return isp.migrationContext.Log.Errorf("cannot run EXPLAIN on %s.%s!", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + return isp.migrationContext.Log.Errorf("cannot run EXPLAIN on %s.%s!", sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) } isp.migrationContext.Log.Infof("Estimated number of rows via EXPLAIN: %d", isp.migrationContext.RowsEstimate) return nil @@ -675,7 +689,7 @@ func (isp *Inspector) CountTableRows(ctx context.Context) error { return err } - query := fmt.Sprintf(`select /* gh-ost */ count(*) as 
count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.migrationContext.OriginalTableName)) + query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(isp.migrationContext.DatabaseName), sql.EscapeName(isp.originalTableName())) var rowsEstimate int64 if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index aaacbdd2e..a59d8499a 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -475,7 +475,7 @@ func (mgtr *Migrator) Migrate() (err error) { defer mgtr.teardown() if err := mgtr.initiateInspector(); err != nil { - return err + return fmt.Errorf("failed to initiate inspector: %w", err) } if err := mgtr.checkAbort(); err != nil { return err @@ -584,7 +584,7 @@ func (mgtr *Migrator) Migrate() (err error) { if err := mgtr.addDMLEventsListener(); err != nil { return err } - if err := mgtr.applier.ReadMigrationRangeValues(); err != nil { + if err := mgtr.applier.ReadMigrationRangeValues(nil); err != nil { return err } @@ -769,6 +769,141 @@ func (mgtr *Migrator) Revert() error { return nil } +/* TODO(chriskirkland): work left to do: +* - validate that migrating binlog events after migration is in-flight work +* - validate migration waits for cutover flag (^ related) +* - DROP target table if it exists on the cluster pre-migration +* +* - Cleanup lots of debug/info logging +* - lots and lots of refactoring, state isolation, dependency injection, etc. 
:) + */ + +func (mgtr *Migrator) MoveTables() (err error) { + mgtr.migrationContext.Log.Infof("Moving tables %v from %s to %s (%s)", + mgtr.migrationContext.MoveTables.TableNames, + sql.EscapeName(mgtr.migrationContext.DatabaseName), + sql.EscapeName(mgtr.migrationContext.MoveTables.TargetDatabase), mgtr.migrationContext.MoveTables.TargetHost) + mgtr.migrationContext.StartTime = time.Now() + + if mgtr.migrationContext.OriginalTableName == "" { + mgtr.migrationContext.OriginalTableName = mgtr.migrationContext.MoveTables.TableNames[0] + } + + // Ensure context is cancelled on exit (cleanup) + defer mgtr.migrationContext.CancelContext() + + if mgtr.migrationContext.Hostname, err = os.Hostname(); err != nil { + return err + } + + go mgtr.listenOnPanicAbort() + + // Run on-startup hook: + if err := mgtr.hooksExecutor.OnStartup(); err != nil { + return err + } + + // After this point, we'll need to teardown anything that's been started + // so we don't leave things hanging around + defer mgtr.teardown() + + if err := mgtr.initiateInspector(); err != nil { + return err + } + if err := mgtr.checkAbort(); err != nil { + return err + } + if err := mgtr.initiateApplier(); err != nil { + return err + } + if err := mgtr.initiateStreaming(); err != nil { + return err + } + if err := mgtr.checkAbort(); err != nil { + return err + } + + // TODO(chriskirkland): move this into a well-named function + // populate the unique key for the migration. in single-cluster modes where we may be ALTERing the table, we need + // to validate the migration has a valid unique key to use here... but in move-tables, the source and target tables + // are identical, so we just need to grab any valid UNIQUE key constraint. 
+ mgtr.migrationContext.UniqueKey = mgtr.inspector.selectUniqueKey(mgtr.migrationContext.OriginalTableUniqueKeys) + + // columns are identical on the source and target tables + mgtr.migrationContext.SharedColumns = mgtr.migrationContext.OriginalTableColumns + mgtr.migrationContext.MappedSharedColumns = mgtr.migrationContext.OriginalTableColumns + + // this function assumes that the unique key constraint has been set. + if err := mgtr.applier.prepareQueries(); err != nil { + return err + } + + // Validation complete! Run on-validated hook. + if err := mgtr.hooksExecutor.OnValidated(); err != nil { + return err + } + + if err := mgtr.initiateServer(); err != nil { + return err + } + defer mgtr.server.RemoveSocketFile() + + if err := mgtr.countTableRows(); err != nil { + return err + } + if err := mgtr.addDMLEventsListener(); err != nil { + return err + } + + if err := mgtr.applier.ReadMigrationRangeValues(mgtr.inspector.db); err != nil { + return err + } + + mgtr.initiateThrottler() + + // Run on-before-row-copy hook + if err := mgtr.hooksExecutor.OnBeforeRowCopy(); err != nil { + return err + } + go func() { + if err := mgtr.executeWriteFuncs(); err != nil { + // Send error to PanicAbort to trigger abort + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } + }() + go mgtr.iterateChunks() + mgtr.migrationContext.MarkRowCopyStartTime() + go mgtr.initiateStatus() + + mgtr.migrationContext.Log.Debugf("Operating until row copy is complete") + mgtr.consumeRowCopyComplete() + mgtr.migrationContext.Log.Infof("Row copy complete") + // Check if row copy was aborted due to error + if err := mgtr.checkAbort(); err != nil { + return err + } + if err := mgtr.hooksExecutor.OnRowCopyComplete(); err != nil { + return err + } + + //TODO: cutover here + + if err := mgtr.finalCleanup(); err != nil { + return err + } + if err := mgtr.hooksExecutor.OnSuccess(false); err != nil { + return err + } + mgtr.migrationContext.Log.Infof("Done 
moving tables %v from %s to %s (%s)", + mgtr.migrationContext.MoveTables.TableNames, sql.EscapeName(mgtr.migrationContext.DatabaseName), + sql.EscapeName(mgtr.migrationContext.MoveTables.TargetDatabase), mgtr.migrationContext.MoveTables.TargetHost) + // Final check for abort before declaring success + if err := mgtr.checkAbort(); err != nil { + return err + } + return nil +} + // ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external // hook access point func (mgtr *Migrator) ExecOnFailureHook() (err error) { @@ -1093,14 +1228,16 @@ func (mgtr *Migrator) initiateInspector() (err error) { return err } if err := mgtr.inspector.ValidateOriginalTable(); err != nil { - return err + return fmt.Errorf("failed to validate original table: %w", err) } if err := mgtr.inspector.InspectOriginalTable(); err != nil { - return err + return fmt.Errorf("failed to inspect original table: %w", err) } // So far so good, table is accessible and valid. // Let's get master connection config - if mgtr.migrationContext.AssumeMasterHostname == "" { + if mgtr.migrationContext.IsMoveTablesMode() { + mgtr.migrationContext.ApplierConnectionConfig = mgtr.migrationContext.MoveTables.ConnectionConfig + } else if mgtr.migrationContext.AssumeMasterHostname == "" { // No forced master host; detect master if mgtr.migrationContext.ApplierConnectionConfig, err = mgtr.inspector.getMasterConnectionConfig(); err != nil { return err @@ -1132,7 +1269,9 @@ func (mgtr *Migrator) initiateInspector() (err error) { mgtr.migrationContext.Log.Infof("--test-on-replica or --migrate-on-replica given. 
Will not execute on master %+v but rather on replica %+v itself", *mgtr.migrationContext.ApplierConnectionConfig.ImpliedKey, *mgtr.migrationContext.InspectorConnectionConfig.ImpliedKey, ) - mgtr.migrationContext.ApplierConnectionConfig = mgtr.migrationContext.InspectorConnectionConfig.Duplicate() + if !mgtr.migrationContext.IsMoveTablesMode() { + mgtr.migrationContext.ApplierConnectionConfig = mgtr.migrationContext.InspectorConnectionConfig.Duplicate() + } if mgtr.migrationContext.GetThrottleControlReplicaKeys().Len() == 0 { mgtr.migrationContext.AddThrottleControlReplicaKey(mgtr.migrationContext.InspectorConnectionConfig.Key) } @@ -1142,6 +1281,7 @@ func (mgtr *Migrator) initiateInspector() (err error) { if err := mgtr.inspector.validateLogSlaveUpdates(); err != nil { return err } + mgtr.migrationContext.Log.Infof("Inspector validated and initialized") return nil } @@ -1172,11 +1312,11 @@ func (mgtr *Migrator) initiateStatus() { // migration, and as response to the "status" interactive command. func (mgtr *Migrator) printMigrationStatusHint(writers ...io.Writer) { w := io.MultiWriter(writers...) 
- fmt.Fprintf(w, "# Migrating %s.%s; Ghost table is %s.%s\n", + fmt.Fprintf(w, "# Migrating %s.%s; Target table is %s.%s\n", sql.EscapeName(mgtr.migrationContext.DatabaseName), sql.EscapeName(mgtr.migrationContext.OriginalTableName), - sql.EscapeName(mgtr.migrationContext.DatabaseName), - sql.EscapeName(mgtr.migrationContext.GetGhostTableName()), + sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), + sql.EscapeName(mgtr.migrationContext.GetTargetTableName()), ) fmt.Fprintf(w, "# Migrating %+v; inspecting %+v; executing on %+v\n", *mgtr.applier.connectionConfig.ImpliedKey, @@ -1424,14 +1564,19 @@ func (mgtr *Migrator) initiateStreaming() error { if err := mgtr.eventsStreamer.InitDBConnections(); err != nil { return err } - mgtr.eventsStreamer.AddListener( - false, - mgtr.migrationContext.DatabaseName, - mgtr.migrationContext.GetChangelogTableName(), - func(dmlEntry *binlog.BinlogEntry) error { - return mgtr.onChangelogEvent(dmlEntry) - }, - ) + + if mgtr.migrationContext.IsMoveTablesMode() { + mgtr.migrationContext.Log.Info("Skipping stream of the changelog table") + } else { + mgtr.eventsStreamer.AddListener( + false, + mgtr.migrationContext.DatabaseName, + mgtr.migrationContext.GetChangelogTableName(), + func(dmlEntry *binlog.BinlogEntry) error { + return mgtr.onChangelogEvent(dmlEntry) + }, + ) + } go func() { mgtr.migrationContext.Log.Debugf("Beginning streaming") @@ -1459,10 +1604,15 @@ func (mgtr *Migrator) initiateStreaming() error { // addDMLEventsListener begins listening for binlog events on the original table, // and creates & enqueues a write task per such event. 
func (mgtr *Migrator) addDMLEventsListener() error { + originalTableName := mgtr.migrationContext.OriginalTableName + if mgtr.migrationContext.IsMoveTablesMode() { + originalTableName = mgtr.migrationContext.MoveTables.TableNames[0] + } + err := mgtr.eventsStreamer.AddListener( false, mgtr.migrationContext.DatabaseName, - mgtr.migrationContext.OriginalTableName, + originalTableName, func(dmlEntry *binlog.BinlogEntry) error { // Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits // This is critical because this callback blocks the event streamer @@ -1474,6 +1624,12 @@ func (mgtr *Migrator) addDMLEventsListener() error { // initiateThrottler kicks in the throttling collection and the throttling checks. func (mgtr *Migrator) initiateThrottler() { + if mgtr.migrationContext.IsMoveTablesMode() { + // TODO(chriskirkland): throttle against the target cluster + mgtr.migrationContext.Log.Info("Skipping throttling in move tables mode") + return + } + mgtr.throttler = NewThrottler(mgtr.migrationContext, mgtr.applier, mgtr.inspector, mgtr.appVersion) go mgtr.throttler.initiateThrottlerCollection(mgtr.firstThrottlingCollected) @@ -1490,39 +1646,59 @@ func (mgtr *Migrator) initiateApplier() error { if err := mgtr.applier.InitDBConnections(); err != nil { return err } - if mgtr.migrationContext.Revert { - if err := mgtr.applier.CreateChangelogTable(); err != nil { - mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") - return err - } - } else if !mgtr.migrationContext.Resume { - if err := mgtr.applier.ValidateOrDropExistingTables(); err != nil { - return err - } - if err := mgtr.applier.CreateChangelogTable(); err != nil { - mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? 
OR is there a running migration? Bailing out") - return err - } - if err := mgtr.applier.CreateGhostTable(); err != nil { - mgtr.migrationContext.Log.Errorf("unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") - return err + + if mgtr.migrationContext.IsMoveTablesMode() { + //TODO(chriskirkland): drop the target table if it exists? + + mgtr.migrationContext.Log.Infof("Fetching create table statement for `%s.%s`", + mgtr.migrationContext.DatabaseName, + mgtr.migrationContext.MoveTables.TableNames[0], + ) + createTableStatement, err := mgtr.inspector.showCreateTable(mgtr.migrationContext.MoveTables.TableNames[0]) + if err != nil { + return fmt.Errorf("failed to fetch create table statement: %w", err) } - if err := mgtr.applier.AlterGhost(); err != nil { - mgtr.migrationContext.Log.Errorf("unable to ALTER ghost table, see further error details. Bailing out") + mgtr.migrationContext.Log.Infof("Create table statement: %s", createTableStatement) + + if err := mgtr.applier.CreateTargetTable(createTableStatement); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create target table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") return err } + } else { + if mgtr.migrationContext.Revert { + if err := mgtr.applier.CreateChangelogTable(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") + return err + } + } else if !mgtr.migrationContext.Resume { + if err := mgtr.applier.ValidateOrDropExistingTables(); err != nil { + return err + } + if err := mgtr.applier.CreateChangelogTable(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? 
OR is there a running migration? Bailing out") + return err + } + if err := mgtr.applier.CreateGhostTable(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") + return err + } + if err := mgtr.applier.AlterGhost(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to ALTER ghost table, see further error details. Bailing out") + return err + } - if mgtr.migrationContext.OriginalTableAutoIncrement > 0 && !mgtr.parser.IsAutoIncrementDefined() { - // Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override, - // so we should copy AUTO_INCREMENT value onto our ghost table. - if err := mgtr.applier.AlterGhostAutoIncrement(); err != nil { - mgtr.migrationContext.Log.Errorf("unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out") + if mgtr.migrationContext.OriginalTableAutoIncrement > 0 && !mgtr.parser.IsAutoIncrementDefined() { + // Original table has AUTO_INCREMENT value and the -alter statement does not indicate any override, + // so we should copy AUTO_INCREMENT value onto our ghost table. + if err := mgtr.applier.AlterGhostAutoIncrement(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to ALTER ghost table AUTO_INCREMENT value, see further error details. Bailing out") + return err + } + } + if _, err := mgtr.applier.WriteChangelogState(string(GhostTableMigrated)); err != nil { return err } } - if _, err := mgtr.applier.WriteChangelogState(string(GhostTableMigrated)); err != nil { - return err - } } // ensure performance_schema.metadata_locks is available. @@ -1536,7 +1712,9 @@ func (mgtr *Migrator) initiateApplier() error { mgtr.migrationContext.Log.Warning("proceeding without metadata lock check. There is a small chance of data loss if another session accesses the ghost table during cut-over. 
See https://github.com/github/gh-ost/pull/1536 for details") } - go mgtr.applier.InitiateHeartbeat() + if !mgtr.migrationContext.IsMoveTablesMode() { + go mgtr.applier.InitiateHeartbeat() + } return nil } @@ -1578,7 +1756,7 @@ func (mgtr *Migrator) iterateChunks() error { } // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever - hasFurtherRange, err := mgtr.applier.CalculateNextIterationRangeEndValues() + hasFurtherRange, err := mgtr.applier.CalculateNextIterationRangeEndValues(mgtr.inspector.db) if err != nil { return err // wrapping call will retry } @@ -1597,10 +1775,16 @@ func (mgtr *Migrator) iterateChunks() error { // _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage. return nil } - _, rowsAffected, _, err := mgtr.applier.ApplyIterationInsertQuery() + var rowsAffected int64 + if mgtr.migrationContext.IsMoveTablesMode() { + _, rowsAffected, _, err = mgtr.applier.ApplyIterationMoveTableCopyQueries(mgtr.inspector.db) + } else { + _, rowsAffected, _, err = mgtr.applier.ApplyIterationInsertQuery() + } if err != nil { - return err // wrapping call will retry + return fmt.Errorf("ApplyIterationInsertQuery failed: %w", err) // wrapping call will retry } + mgtr.migrationContext.Log.Debugf("ApplyIterationInsertQuery affected %d rows", rowsAffected) if mgtr.migrationContext.PanicOnWarnings { if len(mgtr.migrationContext.MigrationLastInsertSQLWarnings) > 0 { @@ -1810,13 +1994,18 @@ func (mgtr *Migrator) executeWriteFuncs() error { return nil } - mgtr.throttler.throttle(nil) + if !mgtr.migrationContext.IsMoveTablesMode() { + // disable throttling in move-tables mode for now + // https://github.com/github/database-infrastructure/issues/8212 + mgtr.throttler.throttle(nil) + } // We give higher priority to event processing, then secondary priority to // rowcopy select { case eventStruct := <-mgtr.applyEventsQueue: { + 
mgtr.migrationContext.Log.Debugf("[execWriteFuncs] Processing apply event struct") if err := mgtr.onApplyEventStruct(eventStruct); err != nil { return err } @@ -1826,6 +2015,7 @@ func (mgtr *Migrator) executeWriteFuncs() error { select { case copyRowsFunc := <-mgtr.copyRowsQueue: { + mgtr.migrationContext.Log.Debugf("[execWriteFuncs] Processing row copy function") copyRowsStartTime := time.Now() // Retries are handled within the copyRowsFunc if err := copyRowsFunc(); err != nil { @@ -1889,14 +2079,19 @@ func (mgtr *Migrator) finalCleanup() error { if createTableStatement, err := mgtr.inspector.showCreateTable(mgtr.migrationContext.GetGhostTableName()); err == nil { mgtr.migrationContext.Log.Infof("New table structure follows") fmt.Println(createTableStatement) - } else { - mgtr.migrationContext.Log.Errore(err) + } else if !mgtr.migrationContext.IsMoveTablesMode() { + mgtr.migrationContext.Log.Errore(fmt.Errorf("error showing create table: %w", err)) } } if err := mgtr.eventsStreamer.Close(); err != nil { mgtr.migrationContext.Log.Errore(err) } + if mgtr.migrationContext.IsMoveTablesMode() { + // all done + return nil + } + if err := mgtr.retryOperation(mgtr.applier.DropChangelogTable); err != nil { return err } diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 7a1bcb48f..80759bce7 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -601,7 +601,7 @@ func (suite *MigratorTestSuite) TestCopierIntPK() { migrator := NewMigrator(migrationContext, "0.0.0") suite.Require().NoError(migrator.initiateApplier()) suite.Require().NoError(migrator.applier.prepareQueries()) - suite.Require().NoError(migrator.applier.ReadMigrationRangeValues()) + suite.Require().NoError(migrator.applier.ReadMigrationRangeValues(nil)) go migrator.iterateChunks() go func() { @@ -672,7 +672,7 @@ func (suite *MigratorTestSuite) TestCopierCompositePK() { migrator := NewMigrator(migrationContext, "0.0.0") suite.Require().NoError(migrator.initiateApplier()) 
suite.Require().NoError(migrator.applier.prepareQueries()) - suite.Require().NoError(migrator.applier.ReadMigrationRangeValues()) + suite.Require().NoError(migrator.applier.ReadMigrationRangeValues(nil)) go migrator.iterateChunks() go func() { diff --git a/go/logic/streamer.go b/go/logic/streamer.go index ecb936069..f0a30541b 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -65,6 +65,8 @@ func (es *EventsStreamer) AddListener( es.listenersMutex.Lock() defer es.listenersMutex.Unlock() + es.migrationContext.Log.Infof("Adding listener for %s.%s", databaseName, tableName) + if databaseName == "" { return fmt.Errorf("empty database name in AddListener") } diff --git a/go/logic/test_utils.go b/go/logic/test_utils.go index f552cfc76..966eed1b6 100644 --- a/go/logic/test_utils.go +++ b/go/logic/test_utils.go @@ -17,6 +17,7 @@ var ( testMysqlUser = "root" testMysqlPass = "root-password" testMysqlDatabase = "test" + testMysqlDatabaseOther = "test_other" testMysqlTableName = "testing" ) @@ -36,6 +37,10 @@ func getTestOldTableName() string { return fmt.Sprintf("`%s`.`_%s_del`", testMysqlDatabase, testMysqlTableName) } +func getTestOtherTableName() string { + return fmt.Sprintf("`%s`.`%s`", testMysqlDatabaseOther, testMysqlTableName) +} + func getTestConnectionConfig(ctx context.Context, container testcontainers.Container) (*mysql.ConnectionConfig, error) { host, err := container.Host(ctx) if err != nil { diff --git a/go/sql/builder.go b/go/sql/builder.go index 6e41eb4e1..de60b25e3 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -7,6 +7,7 @@ package sql import ( "fmt" + "slices" "strconv" "strings" ) @@ -329,6 +330,143 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, 
noWait) } +type MoveTableCopySelectQueryBuilder struct { + preparedStatement string + argsMapping []int + argsCount int +} + +func NewMoveTableCopySelectQueryBuilder(sourceDatabaseName, sourceTableName string, columns *ColumnList, uniqueKey string, uniqueKeyColumns *ColumnList, includeRangeStartValues bool) (*MoveTableCopySelectQueryBuilder, error) { + sourceDatabaseName = EscapeName(sourceDatabaseName) + sourceTableName = EscapeName(sourceTableName) + columnNames := columns.Names() + for i := range columnNames { + columnNames[i] = EscapeName(columnNames[i]) + } + sharedColumnsListing := strings.Join(columnNames, ", ") + uniqueKey = EscapeName(uniqueKey) + var minRangeComparisonSign = GreaterThanComparisonSign + if includeRangeStartValues { + minRangeComparisonSign = GreaterThanOrEqualsComparisonSign + } + rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) + rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) + dummyArgs := make([]any, len(uniqueKeyColumns.Columns())) + for i := range dummyArgs { + dummyArgs[i] = i + } + var argsMapping []int + + rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, dummyArgs, minRangeComparisonSign) + if err != nil { + return nil, err + } + for _, a := range rangeExplodedArgs { + idx := slices.Index(dummyArgs, a) + if idx == -1 { + return nil, fmt.Errorf("failed to build args mapping, missing argument pointer %v", a) + } + argsMapping = append(argsMapping, idx) + } + + rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, dummyArgs, LessThanOrEqualsComparisonSign) + if err != nil { + return nil, err + } + for _, a := range rangeExplodedArgs { + idx := slices.Index(dummyArgs, a) + if idx == -1 { + return nil, fmt.Errorf("failed to build args mapping, missing argument pointer %v", a) + } + argsMapping = append(argsMapping, idx+len(dummyArgs)) + } + + stmt := fmt.Sprintf(` + select /* gh-ost %s.%s 
*/ %s + from + %s.%s + force index (%s) + where + (%s and %s) + `, + sourceDatabaseName, sourceTableName, sharedColumnsListing, + sourceDatabaseName, sourceTableName, + uniqueKey, + rangeStartComparison, rangeEndComparison, + ) + return &MoveTableCopySelectQueryBuilder{ + preparedStatement: stmt, + argsMapping: argsMapping, + argsCount: len(dummyArgs) * 2, + }, nil +} + +func (b *MoveTableCopySelectQueryBuilder) BuildQuery(rangeStartArgs, rangeEndArgs []any) (string, []any, error) { + if len(rangeStartArgs)+len(rangeEndArgs) != b.argsCount { + return "", nil, fmt.Errorf("got %d args but expected %d", len(rangeStartArgs)+len(rangeEndArgs), b.argsCount) + } + explodedArgs := make([]any, 0, len(b.argsMapping)) + for _, idx := range b.argsMapping { + if idx < len(rangeStartArgs) { + explodedArgs = append(explodedArgs, rangeStartArgs[idx]) + } else { + explodedArgs = append(explodedArgs, rangeEndArgs[idx-len(rangeStartArgs)]) + } + } + return b.preparedStatement, explodedArgs, nil +} + +type MoveTableCopyInsertQueryBuilder struct { + preparedStatement string + valueListPlaceholder string + valueListSize int +} + +func NewMoveTableCopyInsertQueryBuilder(targetDatabaseName, targetTableName string, columns *ColumnList) (*MoveTableCopyInsertQueryBuilder, error) { + targetDatabaseName = EscapeName(targetDatabaseName) + targetTableName = EscapeName(targetTableName) + columnsNames := columns.Names() + for i := range columnsNames { + columnsNames[i] = EscapeName(columnsNames[i]) + } + sharedColumnsListing := strings.Join(columnsNames, ", ") + valueListPlaceholder := "(" + strings.Join(buildColumnsPreparedValues(columns), ", ") + ")" + valueListSize := len(columnsNames) + stmt := fmt.Sprintf(` + insert /* gh-ost %s.%s */ ignore + into + %s.%s + (%s) + values + `, + targetDatabaseName, targetTableName, + targetDatabaseName, targetTableName, + sharedColumnsListing, + ) + return &MoveTableCopyInsertQueryBuilder{ + preparedStatement: stmt, + valueListPlaceholder: 
valueListPlaceholder, + valueListSize: valueListSize, + }, nil +} + +func (b *MoveTableCopyInsertQueryBuilder) BuildQuery(values []*ColumnValues) (string, []any, error) { + var explodedArgs []any + var builder strings.Builder + builder.WriteString(b.preparedStatement) + for i, value := range values { + if len(value.AbstractValues()) != b.valueListSize { + return "", nil, fmt.Errorf("got %d column values but expected %d", len(value.AbstractValues()), b.valueListSize) + } + if i > 0 { + builder.WriteString(",\n") + } + builder.WriteString(b.valueListPlaceholder) + explodedArgs = append(explodedArgs, value.AbstractValues()...) + } + return builder.String(), explodedArgs, nil +} + func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { if uniqueKeyColumns.Len() == 0 { return "", explodedArgs, fmt.Errorf("got 0 columns in BuildUniqueKeyRangeEndPreparedQuery") diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 0d10b75e7..b0e756f13 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -784,6 +784,233 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) { } } +func TestMoveTableCopySelectQueryBuilder(t *testing.T) { + t.Run("single column unique key", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + uniqueKeyColumns := NewColumnList([]string{"id"}) + + builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true) + require.NoError(t, err) + + query, args, err := builder.BuildQuery([]any{3}, []any{103}) + require.NoError(t, err) + + expected := ` + select /* gh-ost mydb.tbl */ id, name, position + from + mydb.tbl + force index (PRIMARY) + where + (((id > ?) or ((id = ?))) and ((id < ?) 
or ((id = ?)))) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []any{3, 3, 103, 103}, args) + }) + + t.Run("single column unique key without range start", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + uniqueKeyColumns := NewColumnList([]string{"id"}) + + builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, false) + require.NoError(t, err) + + query, args, err := builder.BuildQuery([]any{3}, []any{103}) + require.NoError(t, err) + + expected := ` + select /* gh-ost mydb.tbl */ id, name, position + from + mydb.tbl + force index (PRIMARY) + where + (((id > ?)) and ((id < ?) or ((id = ?)))) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []any{3, 103, 103}, args) + }) + + t.Run("compound unique key", func(t *testing.T) { + sharedColumns := NewColumnList([]string{"id", "name", "position"}) + uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + + builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "name_position_uidx", uniqueKeyColumns, true) + require.NoError(t, err) + + query, args, err := builder.BuildQuery([]any{3, 17}, []any{103, 117}) + require.NoError(t, err) + + expected := ` + select /* gh-ost mydb.tbl */ id, name, position + from + mydb.tbl + force index (name_position_uidx) + where + (((name > ?) or (((name = ?)) AND (position > ?)) or ((name = ?) and (position = ?))) + and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) 
and (position = ?))))
+		`
+		require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
+		require.Equal(t, []any{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}, args)
+	})
+
+	t.Run("reuses prepared statement across calls", func(t *testing.T) {
+		sharedColumns := NewColumnList([]string{"id", "name"})
+		uniqueKeyColumns := NewColumnList([]string{"id"})
+
+		builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true)
+		require.NoError(t, err)
+
+		query1, args1, err := builder.BuildQuery([]any{1}, []any{10})
+		require.NoError(t, err)
+		query2, args2, err := builder.BuildQuery([]any{11}, []any{20})
+		require.NoError(t, err)
+
+		require.Equal(t, query1, query2)
+		require.Equal(t, []any{1, 1, 10, 10}, args1)
+		require.Equal(t, []any{11, 11, 20, 20}, args2)
+	})
+
+	t.Run("wrong args count", func(t *testing.T) {
+		sharedColumns := NewColumnList([]string{"id", "name"})
+		uniqueKeyColumns := NewColumnList([]string{"id"})
+
+		builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "PRIMARY", uniqueKeyColumns, true)
+		require.NoError(t, err)
+
+		_, _, err = builder.BuildQuery([]any{1, 2}, []any{10})
+		require.Error(t, err)
+	})
+}
+
+// BenchmarkMoveTableCopySelectQueryBuilderBuildQuery times repeated BuildQuery
+// calls on a single select builder for a compound (name, position) unique key.
+// The builder is constructed once, before the timer is reset, so only
+// BuildQuery itself is measured.
+func BenchmarkMoveTableCopySelectQueryBuilderBuildQuery(b *testing.B) {
+	sharedColumns := NewColumnList([]string{"id", "name", "position"})
+	uniqueKeyColumns := NewColumnList([]string{"name", "position"})
+
+	builder, err := NewMoveTableCopySelectQueryBuilder("mydb", "tbl", sharedColumns, "name_position_uidx", uniqueKeyColumns, true)
+	if err != nil {
+		b.Fatal(err)
+	}
+
+	rangeStartArgs := []any{3, 17}
+	rangeEndArgs := []any{103, 117}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_, _, err := builder.BuildQuery(rangeStartArgs, rangeEndArgs)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+// TestMoveTableCopyInsertQueryBuilder checks the statement text and the
+// flattened argument list that the insert builder's BuildQuery returns for one
+// row and for several rows, that a row with the wrong number of values is
+// rejected with an error, and that two calls with the same row count return
+// identical query text.
+func TestMoveTableCopyInsertQueryBuilder(t *testing.T) {
+	t.Run("single row", func(t *testing.T) {
+		sharedColumns := NewColumnList([]string{"id", "name", "position"})
+
+		builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
+		require.NoError(t, err)
+
+		values := []*ColumnValues{
+			ToColumnValues([]any{1, "alice", 10}),
+		}
+		query, args, err := builder.BuildQuery(values)
+		require.NoError(t, err)
+
+		expected := `
+			insert /* gh-ost mydb.ghost */ ignore
+			into
+				mydb.ghost
+				(id, name, position)
+			values
+				(?, ?, ?)
+		`
+		require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
+		require.Equal(t, []any{1, "alice", 10}, args)
+	})
+
+	t.Run("multiple rows", func(t *testing.T) {
+		sharedColumns := NewColumnList([]string{"id", "name", "position"})
+
+		builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
+		require.NoError(t, err)
+
+		values := []*ColumnValues{
+			ToColumnValues([]any{1, "alice", 10}),
+			ToColumnValues([]any{2, "bob", 20}),
+			ToColumnValues([]any{3, "carol", 30}),
+		}
+		query, args, err := builder.BuildQuery(values)
+		require.NoError(t, err)
+
+		// One placeholder tuple per row; args are expected row by row, in order.
+		expected := `
+			insert /* gh-ost mydb.ghost */ ignore
+			into
+				mydb.ghost
+				(id, name, position)
+			values
+				(?, ?, ?),
+				(?, ?, ?),
+				(?, ?, ?)
+		`
+		require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
+		require.Equal(t, []any{1, "alice", 10, 2, "bob", 20, 3, "carol", 30}, args)
+	})
+
+	t.Run("wrong column count", func(t *testing.T) {
+		sharedColumns := NewColumnList([]string{"id", "name", "position"})
+
+		builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
+		require.NoError(t, err)
+
+		// Two values for three shared columns: BuildQuery must return an error.
+		values := []*ColumnValues{
+			ToColumnValues([]any{1, "alice"}),
+		}
+		_, _, err = builder.BuildQuery(values)
+		require.Error(t, err)
+	})
+
+	t.Run("reuses prepared statement", func(t *testing.T) {
+		sharedColumns := NewColumnList([]string{"id", "name"})
+
+		builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
+		require.NoError(t, err)
+
+		values1 := []*ColumnValues{ToColumnValues([]any{1, "a"})}
+		values2 := []*ColumnValues{ToColumnValues([]any{2, "b"})}
+
+		query1, args1, err := builder.BuildQuery(values1)
+		require.NoError(t, err)
+		query2, args2, err := builder.BuildQuery(values2)
+		require.NoError(t, err)
+
+		require.Equal(t, query1, query2)
+		require.Equal(t, []any{1, "a"}, args1)
+		require.Equal(t, []any{2, "b"}, args2)
+	})
+}
+
+// BenchmarkMoveTableCopyInsertQueryBuilderBuildQuery times repeated BuildQuery
+// calls on a single insert builder with a fixed three-row batch. The builder
+// and the batch are created once, before the timer is reset.
+func BenchmarkMoveTableCopyInsertQueryBuilderBuildQuery(b *testing.B) {
+	sharedColumns := NewColumnList([]string{"id", "name", "position"})
+
+	builder, err := NewMoveTableCopyInsertQueryBuilder("mydb", "ghost", sharedColumns)
+	if err != nil {
+		b.Fatal(err)
+	}
+
+	values := []*ColumnValues{
+		ToColumnValues([]any{1, "alice", 10}),
+		ToColumnValues([]any{2, "bob", 20}),
+		ToColumnValues([]any{3, "carol", 30}),
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_, _, err := builder.BuildQuery(values)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
 func TestCheckpointQueryBuilder(t *testing.T) {
 	databaseName := "mydb"
 	tableName := "_tbl_ghk"