diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index c88894926..a2ead2761 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -373,6 +373,9 @@ func main() { if migrationContext.PostponeCutOverFlagFile == "" { log.Fatal("--postpone-cut-over-flag-file must be specified when using --move-tables") } + if !migrationContext.Checkpoint { + log.Fatal("--checkpoint must be specified when using --move-tables") + } migrationContext.MoveTables.TableNames = strings.Split(*moveTables, ",") for i := range migrationContext.MoveTables.TableNames { migrationContext.MoveTables.TableNames[i] = strings.TrimSpace(migrationContext.MoveTables.TableNames[i]) diff --git a/go/logic/applier.go b/go/logic/applier.go index c025000b5..cbca5cb12 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -110,6 +110,35 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { } } +func (apl *Applier) checkpointDB() *gosql.DB { + if apl.migrationContext.IsMoveTablesMode() && apl.moveTablesTargetDB != nil { + return apl.moveTablesTargetDB + } + return apl.db +} + +func (apl *Applier) checkpointDatabaseName() string { + if apl.migrationContext.IsMoveTablesMode() { + return apl.migrationContext.GetTargetDatabaseName() + } + return apl.migrationContext.DatabaseName +} + +func (apl *Applier) checkpointDrainGTIDString(chk *Checkpoint) string { + if chk == nil || chk.MoveTablesCutOverDrainGTID == nil || chk.MoveTablesCutOverDrainGTID.IsEmpty() { + return "" + } + return chk.MoveTablesCutOverDrainGTID.String() +} + +func (apl *Applier) checkpointRangeColumnNames() (minColumnNames []string, maxColumnNames []string) { + for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() { + minColumnNames = append(minColumnNames, sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4)+"_min") + maxColumnNames = append(maxColumnNames, sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4)+"_max") + } + return minColumnNames, maxColumnNames +} + // compileMigrationKeyWarningRegex compiles a regex pattern that matches duplicate key warnings // for the migration's unique key. Duplicate warnings are formatted differently across MySQL versions, // hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid @@ -358,9 +387,10 @@ func (apl *Applier) prepareQueries() (err error) { } if apl.migrationContext.Checkpoint { if apl.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder( - apl.migrationContext.DatabaseName, + apl.checkpointDatabaseName(), apl.migrationContext.GetCheckpointTableName(), &apl.migrationContext.UniqueKey.Columns, + apl.migrationContext.IsMoveTablesMode(), ); err != nil { return err } @@ -752,6 +782,12 @@ func (apl *Applier) CreateCheckpointTable() error { "`gh_ost_dml_applied` bigint", "`gh_ost_is_cutover` tinyint(1) DEFAULT '0'", } + if apl.migrationContext.IsMoveTablesMode() { + colDefs = append(colDefs, + "`gh_ost_move_tables_cutover_started` tinyint(1) DEFAULT '0'", + "`gh_ost_move_tables_drain_gtid` text charset ascii", + ) + } for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() { if col.MySQLType == "" { return fmt.Errorf("column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name)) @@ -768,12 +804,12 @@ func (apl *Applier) CreateCheckpointTable() error { } query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)", - sql.EscapeName(apl.migrationContext.DatabaseName), + sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(apl.migrationContext.GetCheckpointTableName()), strings.Join(colDefs, ",\n "), ) apl.migrationContext.Log.Infof("Created checkpoint table") - if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil { + if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil { return err } return nil @@ -782,14 +818,14 @@ func (apl *Applier) CreateCheckpointTable() error { // dropTable drops a given table on the applied host func (apl *Applier) dropTable(tableName string) error { query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`, - sql.EscapeName(apl.migrationContext.DatabaseName), + sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(tableName), ) apl.migrationContext.Log.Infof("Dropping table %s.%s", - sql.EscapeName(apl.migrationContext.DatabaseName), + sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(tableName), ) - if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil { + if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil { return err } apl.migrationContext.Log.Infof("Table dropped") @@ -954,8 +990,11 @@ func (apl *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { return insertId, err } args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied, chk.IsCutover) + if apl.migrationContext.IsMoveTablesMode() { + args = append(args, chk.MoveTablesCutOverStarted, apl.checkpointDrainGTIDString(chk)) + } args = append(args, uniqueKeyArgs...) - res, err := apl.db.Exec(query, args...) + res, err := apl.checkpointDB().Exec(query, args...) if err != nil { return insertId, err } @@ -963,15 +1002,39 @@ func (apl *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) { } func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) { - row := apl.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetCheckpointTableName()))) + minColumnNames, maxColumnNames := apl.checkpointRangeColumnNames() + selectColumns := []string{ + "gh_ost_chk_id", + "gh_ost_chk_timestamp", + "gh_ost_chk_coords", + "gh_ost_chk_iteration", + "gh_ost_rows_copied", + "gh_ost_dml_applied", + "gh_ost_is_cutover", + } + if apl.migrationContext.IsMoveTablesMode() { + selectColumns = append(selectColumns, "gh_ost_move_tables_cutover_started", "gh_ost_move_tables_drain_gtid") + } + selectColumns = append(selectColumns, minColumnNames...) + selectColumns = append(selectColumns, maxColumnNames...) + + row := apl.checkpointDB().QueryRow(fmt.Sprintf( + `select /* gh-ost */ %s from %s.%s order by gh_ost_chk_id desc limit 1`, + strings.Join(selectColumns, ", "), + sql.EscapeName(apl.checkpointDatabaseName()), + sql.EscapeName(apl.migrationContext.GetCheckpointTableName()), + )) chk := &Checkpoint{ IterationRangeMin: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()), IterationRangeMax: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()), } - var coordStr string + var coordStr, drainGTIDStr string var timestamp int64 ptrs := []interface{}{&chk.Id, ×tamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover} + if apl.migrationContext.IsMoveTablesMode() { + ptrs = append(ptrs, &chk.MoveTablesCutOverStarted, &drainGTIDStr) + } ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...) ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...) err := row.Scan(ptrs...) @@ -995,6 +1058,43 @@ func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) { } chk.LastTrxCoords = fileCoords } + if apl.migrationContext.IsMoveTablesMode() && drainGTIDStr != "" { + drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr) + if err != nil { + return nil, err + } + chk.MoveTablesCutOverDrainGTID = drainGTID + } + return chk, nil +} + +func (apl *Applier) ReadMoveTablesCutOverCheckpoint() (*Checkpoint, error) { + row := apl.checkpointDB().QueryRow(fmt.Sprintf(`select /* gh-ost */ gh_ost_chk_id, gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, gh_ost_move_tables_cutover_started, gh_ost_move_tables_drain_gtid from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(apl.migrationContext.GetCheckpointTableName()))) + chk := &Checkpoint{} + var coordStr, drainGTIDStr string + var timestamp int64 + err := row.Scan(&chk.Id, ×tamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover, &chk.MoveTablesCutOverStarted, &drainGTIDStr) + if err != nil { + if errors.Is(err, gosql.ErrNoRows) { + return nil, ErrNoCheckpointFound + } + return nil, err + } + chk.Timestamp = time.Unix(timestamp, 0) + if coordStr != "" { + coords, err := mysql.NewGTIDBinlogCoordinates(coordStr) + if err != nil { + return nil, err + } + chk.LastTrxCoords = coords + } + if drainGTIDStr != "" { + drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr) + if err != nil { + return nil, err + } + chk.MoveTablesCutOverDrainGTID = drainGTID + } return chk, nil } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index e0fd034b9..159b5de7e 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -1005,6 +1005,96 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() { suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied) suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied) suite.Require().Equal(chk.IsCutover, gotChk.IsCutover) + suite.Require().False(gotChk.MoveTablesCutOverStarted) + suite.Require().Nil(gotChk.MoveTablesCutOverDrainGTID) +} + +func (suite *ApplierTestSuite) TestWriteCheckpointMoveTables() { + ctx := context.Background() + + var err error + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id int not null, id2 char(4) CHARACTER SET utf8mb4, primary key(id, id2))", getTestTableName())) + suite.Require().NoError(err) + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, id2) VALUES (?,?), (?,?), (?,?)", getTestTableName()), 411, "君子懷德", 411, "小人懷土", 212, "君子不器") + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.UseGTIDs = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "id2"}) + migrationContext.Checkpoint = true + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabase + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id", "id2"}), + } + + inspector := NewInspector(migrationContext) + suite.Require().NoError(inspector.InitDBConnections()) + + err = inspector.applyColumnTypes(testMysqlDatabase, testMysqlTableName, &migrationContext.UniqueKey.Columns) + suite.Require().NoError(err) + + applier := NewApplier(migrationContext) + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + err = applier.CreateCheckpointTable() + suite.Require().NoError(err) + + err = applier.prepareQueries() + suite.Require().NoError(err) + + err = applier.ReadMigrationRangeValues(inspector.db) + suite.Require().NoError(err) + + coords, err := mysql.NewGTIDBinlogCoordinates("00000000-0000-0000-0000-000000000001:1-10") + suite.Require().NoError(err) + drainGTID, err := mysql.NewGTIDBinlogCoordinates("00000000-0000-0000-0000-000000000001:1-20") + suite.Require().NoError(err) + + chk := &Checkpoint{ + LastTrxCoords: coords, + IterationRangeMin: applier.migrationContext.MigrationRangeMinValues, + IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues, + Iteration: 3, + RowsCopied: 1000, + DMLApplied: 2000, + IsCutover: false, + MoveTablesCutOverStarted: true, + MoveTablesCutOverDrainGTID: drainGTID, + } + id, err := applier.WriteCheckpoint(chk) + suite.Require().NoError(err) + suite.Require().Equal(int64(1), id) + + gotChk, err := applier.ReadLastCheckpoint() + suite.Require().NoError(err) + + suite.Require().Equal(chk.Iteration, gotChk.Iteration) + suite.Require().Equal(chk.LastTrxCoords.String(), gotChk.LastTrxCoords.String()) + suite.Require().Equal(chk.IterationRangeMin.String(), gotChk.IterationRangeMin.String()) + suite.Require().Equal(chk.IterationRangeMax.String(), gotChk.IterationRangeMax.String()) + suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied) + suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied) + suite.Require().Equal(chk.IsCutover, gotChk.IsCutover) + suite.Require().True(gotChk.MoveTablesCutOverStarted) + suite.Require().NotNil(gotChk.MoveTablesCutOverDrainGTID) + suite.Require().Equal(drainGTID.String(), gotChk.MoveTablesCutOverDrainGTID.String()) } func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigrationIndex() { diff --git a/go/logic/checkpoint.go b/go/logic/checkpoint.go index cffe08c4b..f81a2bb16 100644 --- a/go/logic/checkpoint.go +++ b/go/logic/checkpoint.go @@ -24,9 +24,11 @@ type Checkpoint struct { IterationRangeMin *sql.ColumnValues // IterationRangeMax is the max shared key value // for the chunk copier range. - IterationRangeMax *sql.ColumnValues - Iteration int64 - RowsCopied int64 - DMLApplied int64 - IsCutover bool + IterationRangeMax *sql.ColumnValues + Iteration int64 + RowsCopied int64 + DMLApplied int64 + IsCutover bool + MoveTablesCutOverStarted bool + MoveTablesCutOverDrainGTID mysql.BinlogCoordinates } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b6432947b..015a5ea6a 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -804,6 +804,121 @@ func (mgtr *Migrator) prepareMoveTablesCopyState() { mgtr.migrationContext.MappedSharedColumns = mgtr.migrationContext.OriginalTableColumns } +func (mgtr *Migrator) hydrateMoveTablesStateFromTarget() error { + probeContext := base.NewMigrationContext() + probeContext.DatabaseName = mgtr.migrationContext.GetTargetDatabaseName() + targetInspector := &Inspector{db: mgtr.applier.moveTablesTargetDB, migrationContext: probeContext} + + columns, virtualColumns, uniqueKeys, err := targetInspector.InspectTableColumnsAndUniqueKeys(mgtr.migrationContext.GetTargetTableName()) + if err != nil { + return err + } + + mgtr.migrationContext.OriginalTableColumns = columns + mgtr.migrationContext.OriginalTableVirtualColumns = virtualColumns + mgtr.migrationContext.OriginalTableUniqueKeys = uniqueKeys + mgtr.migrationContext.UniqueKey = targetInspector.selectUniqueKey(uniqueKeys) + mgtr.migrationContext.SharedColumns = columns + mgtr.migrationContext.MappedSharedColumns = columns + return nil +} + +func (mgtr *Migrator) persistMoveTablesCutOverCheckpoint(drainGTID mysql.BinlogCoordinates) (*Checkpoint, error) { + mgtr.applier.CurrentCoordinatesMutex.Lock() + if mgtr.applier.CurrentCoordinates == nil || mgtr.applier.CurrentCoordinates.IsEmpty() { + mgtr.applier.CurrentCoordinatesMutex.Unlock() + return nil, errors.New("current coordinates are empty, cannot checkpoint move-tables cutover") + } + safeCoords := mgtr.applier.CurrentCoordinates.Clone() + mgtr.applier.CurrentCoordinatesMutex.Unlock() + + chk := &Checkpoint{ + LastTrxCoords: safeCoords, + IterationRangeMin: sql.NewColumnValues(mgtr.migrationContext.UniqueKey.Len()), + IterationRangeMax: sql.NewColumnValues(mgtr.migrationContext.UniqueKey.Len()), + Iteration: mgtr.migrationContext.GetIteration(), + RowsCopied: atomic.LoadInt64(&mgtr.migrationContext.TotalRowsCopied), + DMLApplied: atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied), + MoveTablesCutOverStarted: true, + MoveTablesCutOverDrainGTID: drainGTID, + } + mgtr.applier.LastIterationRangeMutex.Lock() + if mgtr.applier.LastIterationRangeMinValues != nil { + chk.IterationRangeMin = mgtr.applier.LastIterationRangeMinValues.Clone() + } + if mgtr.applier.LastIterationRangeMaxValues != nil { + chk.IterationRangeMax = mgtr.applier.LastIterationRangeMaxValues.Clone() + } + mgtr.applier.LastIterationRangeMutex.Unlock() + id, err := mgtr.applier.WriteCheckpoint(chk) + chk.Id = id + return chk, err +} + +func (mgtr *Migrator) drainMoveTablesCutOver(drainGTID mysql.BinlogCoordinates) error { + drainTimeout := time.Duration(mgtr.migrationContext.CutOverLockTimeoutSeconds) * time.Second + mgtr.migrationContext.Log.Infof("T3: draining applier to drain GTID (timeout %s, poll %s)", + drainTimeout, moveTablesCutOverDrainPollInterval) + drainCtx, cancel := context.WithTimeout(context.Background(), drainTimeout) + defer cancel() + ticker := time.NewTicker(moveTablesCutOverDrainPollInterval) + defer ticker.Stop() + for { + if err := mgtr.checkAbort(); err != nil { + return err + } + mgtr.applier.CurrentCoordinatesMutex.Lock() + applierCoords := mgtr.applier.CurrentCoordinates + mgtr.applier.CurrentCoordinatesMutex.Unlock() + applyBacklog := len(mgtr.applyEventsQueue) + streamerBacklog := 0 + if mgtr.eventsStreamer != nil { + streamerBacklog = len(mgtr.eventsStreamer.eventsChannel) + } + if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) && applyBacklog == 0 && streamerBacklog == 0 { + mgtr.migrationContext.Log.Infof("T3: drain complete; applier caught up to drain GTID") + return nil + } + if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) { + mgtr.migrationContext.Log.Debugf("T3: drain GTID reached but backlog remains (apply=%d, streamer=%d)", applyBacklog, streamerBacklog) + } else { + mgtr.migrationContext.Log.Debugf("T3: applier still behind drain GTID, polling") + } + select { + case <-drainCtx.Done(): + return fmt.Errorf("drain poll timed out after %s: applier did not catch up to drain GTID", drainTimeout) + case <-ticker.C: + } + } +} + +func (mgtr *Migrator) resumeMoveTablesCutOverFromCheckpoint(chk *Checkpoint) error { + if chk == nil || !chk.MoveTablesCutOverStarted || chk.MoveTablesCutOverDrainGTID == nil || chk.MoveTablesCutOverDrainGTID.IsEmpty() { + return errors.New("checkpoint does not contain move-tables cutover resume state") + } + if chk.LastTrxCoords != nil && !chk.LastTrxCoords.IsEmpty() { + mgtr.applier.CurrentCoordinatesMutex.Lock() + mgtr.applier.CurrentCoordinates = chk.LastTrxCoords.Clone() + mgtr.applier.CurrentCoordinatesMutex.Unlock() + } + mgtr.migrationContext.Log.Infof("Resuming move-tables cutover from checkpoint at coords=%+v drain_gtid=%s", + chk.LastTrxCoords, chk.MoveTablesCutOverDrainGTID.DisplayString()) + if err := mgtr.drainMoveTablesCutOver(chk.MoveTablesCutOverDrainGTID); err != nil { + return err + } + if mgtr.migrationContext.Checkpoint { + if _, err := mgtr.persistMoveTablesCutOverCheckpoint(chk.MoveTablesCutOverDrainGTID); err != nil { + mgtr.migrationContext.Log.Warningf("failed to checkpoint drained move-tables cutover: %+v", err) + } + } + atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1) + mgtr.migrationContext.Log.Debugf("T4: CutOverCompleteFlag set") + if err := mgtr.hooksExecutor.OnSuccess(false); err != nil { + return fmt.Errorf("on-success hook failed: %w", err) + } + return nil +} + func (mgtr *Migrator) MoveTables() (err error) { mgtr.migrationContext.Log.Infof("Moving tables %v from %s to %s (%s)", mgtr.migrationContext.MoveTables.TableNames, @@ -833,18 +948,95 @@ func (mgtr *Migrator) MoveTables() (err error) { // so we don't leave things hanging around defer mgtr.teardown() + if mgtr.migrationContext.Checkpoint && mgtr.migrationContext.Resume { + mgtr.migrationContext.ApplierConnectionConfig = mgtr.migrationContext.MoveTables.ConnectionConfig + mgtr.applier = NewApplier(mgtr.migrationContext) + if err := mgtr.applier.InitDBConnections(); err != nil { + return err + } + cutoverResumeCheckpoint, err := mgtr.applier.ReadMoveTablesCutOverCheckpoint() + if err != nil && !errors.Is(err, ErrNoCheckpointFound) { + return err + } + if cutoverResumeCheckpoint != nil && cutoverResumeCheckpoint.MoveTablesCutOverStarted && cutoverResumeCheckpoint.MoveTablesCutOverDrainGTID != nil && !cutoverResumeCheckpoint.MoveTablesCutOverDrainGTID.IsEmpty() { + mgtr.migrationContext.InitialStreamerCoords = cutoverResumeCheckpoint.LastTrxCoords + mgtr.migrationContext.Iteration = cutoverResumeCheckpoint.Iteration + atomic.StoreInt64(&mgtr.migrationContext.TotalRowsCopied, cutoverResumeCheckpoint.RowsCopied) + atomic.StoreInt64(&mgtr.migrationContext.TotalDMLEventsApplied, cutoverResumeCheckpoint.DMLApplied) + if err := mgtr.hydrateMoveTablesStateFromTarget(); err != nil { + return fmt.Errorf("failed to hydrate move-tables resume state from target: %w", err) + } + if err := mgtr.createFlagFiles(); err != nil { + return err + } + if err := mgtr.initiateStreaming(); err != nil { + return err + } + if err := mgtr.applier.prepareQueries(); err != nil { + return err + } + if err := mgtr.hooksExecutor.OnValidated(); err != nil { + return err + } + if err := mgtr.initiateServer(); err != nil { + return err + } + defer mgtr.server.RemoveSocketFile() + if err := mgtr.addDMLEventsListener(); err != nil { + return err + } + go func() { + if err := mgtr.executeWriteFuncs(); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } + }() + go mgtr.initiateStatus() + if err := mgtr.resumeMoveTablesCutOverFromCheckpoint(cutoverResumeCheckpoint); err != nil { + return err + } + if err := mgtr.finalCleanup(); err != nil { + return nil + } + mgtr.migrationContext.Log.Infof("Done moving tables %v from %s to %s (%s)", + mgtr.migrationContext.MoveTables.TableNames, sql.EscapeName(mgtr.migrationContext.DatabaseName), + sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), mgtr.migrationContext.MoveTables.TargetHost) + if err := mgtr.checkAbort(); err != nil { + return err + } + return nil + } + mgtr.applier.Teardown() + mgtr.applier = nil + } + if err := mgtr.initiateInspector(); err != nil { return err } if err := mgtr.checkAbort(); err != nil { return err } + mgtr.prepareMoveTablesCopyState() if err := mgtr.initiateApplier(); err != nil { return err } if err := mgtr.checkAbort(); err != nil { return err } + if mgtr.migrationContext.Checkpoint && mgtr.migrationContext.Resume { + lastCheckpoint, err := mgtr.applier.ReadLastCheckpoint() + if err != nil { + return mgtr.migrationContext.Log.Errorf("no checkpoint found, unable to resume: %+v", err) + } + mgtr.migrationContext.Log.Infof("Resuming move-tables from checkpoint coords=%+v range_min=%+v range_max=%+v iteration=%d", + lastCheckpoint.LastTrxCoords, lastCheckpoint.IterationRangeMin.String(), lastCheckpoint.IterationRangeMax.String(), lastCheckpoint.Iteration) + + mgtr.migrationContext.MigrationIterationRangeMinValues = lastCheckpoint.IterationRangeMin + mgtr.migrationContext.MigrationIterationRangeMaxValues = lastCheckpoint.IterationRangeMax + mgtr.migrationContext.Iteration = lastCheckpoint.Iteration + atomic.StoreInt64(&mgtr.migrationContext.TotalRowsCopied, lastCheckpoint.RowsCopied) + atomic.StoreInt64(&mgtr.migrationContext.TotalDMLEventsApplied, lastCheckpoint.DMLApplied) + mgtr.migrationContext.InitialStreamerCoords = lastCheckpoint.LastTrxCoords + } if err := mgtr.createFlagFiles(); err != nil { return err } @@ -858,12 +1050,15 @@ func (mgtr *Migrator) MoveTables() (err error) { return err } - mgtr.prepareMoveTablesCopyState() - // this function assumes that the unique key constraint has been set. if err := mgtr.applier.prepareQueries(); err != nil { return err } + if mgtr.migrationContext.Checkpoint && !mgtr.migrationContext.Resume { + if err := mgtr.applier.CreateCheckpointTable(); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create checkpoint table, see further error details") + } + } // Validation complete! Run on-validated hook. if err := mgtr.hooksExecutor.OnValidated(); err != nil { @@ -900,6 +1095,9 @@ func (mgtr *Migrator) MoveTables() (err error) { go mgtr.iterateChunks() mgtr.migrationContext.MarkRowCopyStartTime() go mgtr.initiateStatus() + if mgtr.migrationContext.Checkpoint { + go mgtr.checkpointLoop() + } mgtr.migrationContext.Log.Debugf("Operating until row copy is complete") mgtr.consumeRowCopyComplete() @@ -995,7 +1193,8 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) { // No retry on the RENAME: it is not idempotent — a partial success leaves // the table already renamed and a retry would fail. The operator re-runs // the whole hook chain on failure. - pinnedConn, err := mgtr.inspector.db.Conn(context.Background()) + cutOverCtx := mgtr.migrationContext.GetContext() + pinnedConn, err := mgtr.inspector.db.Conn(cutOverCtx) if err != nil { return fmt.Errorf("failed to pin connection for T1/T2: %w", err) } @@ -1008,7 +1207,7 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) { sql.EscapeName(sourceDB), sql.EscapeName(sourceTable), sql.EscapeName(sourceDB), sql.EscapeName(delTable)) mgtr.migrationContext.Log.Infof("T1: renaming source table: %s", renameQuery) - if _, err := pinnedConn.ExecContext(context.Background(), renameQuery); err != nil { + if _, err := pinnedConn.ExecContext(cutOverCtx, renameQuery); err != nil { return fmt.Errorf("RENAME failed: %w", err) } @@ -1016,7 +1215,7 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) { // @@GLOBAL scope is explicit so the intent is unambiguous in the SQL itself. // Design: https://github.com/github/gh-ost-tablemove-poc/blob/9dc6df75c4c88ff473906a497836c7518f5614ec/design/coop_cutover.md#32-correctness-verification-for-p4 var drainGTIDStr string - if err := pinnedConn.QueryRowContext(context.Background(), "select @@gtid_executed").Scan(&drainGTIDStr); err != nil { + if err := pinnedConn.QueryRowContext(cutOverCtx, "select @@global.gtid_executed").Scan(&drainGTIDStr); err != nil { return fmt.Errorf("drain GTID capture failed: %w", err) } drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr) @@ -1024,46 +1223,18 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) { return fmt.Errorf("drain GTID parse failed: %w", err) } mgtr.migrationContext.Log.Infof("T2: captured drain GTID: %s", drainGTID.DisplayString()) - - // ----- T3: drain poll ----- - // Wait until applier.CurrentCoordinates catches up to drainGTID. The drain - // is complete when the applier's coords are not strictly smaller than the - // drain target (i.e. the applier contains every GTID in drainGTID). Reads - // of CurrentCoordinates hold the mutex per applier.go:75. Per-iteration - // logging is Debug only to avoid spamming Info on a hot loop. - drainTimeout := time.Duration(mgtr.migrationContext.CutOverLockTimeoutSeconds) * time.Second - mgtr.migrationContext.Log.Infof("T3: draining applier to drain GTID (timeout %s, poll %s)", - drainTimeout, moveTablesCutOverDrainPollInterval) - drainCtx, cancel := context.WithTimeout(context.Background(), drainTimeout) - defer cancel() - ticker := time.NewTicker(moveTablesCutOverDrainPollInterval) - defer ticker.Stop() - for { - if err := mgtr.checkAbort(); err != nil { - return err - } - mgtr.applier.CurrentCoordinatesMutex.Lock() - applierCoords := mgtr.applier.CurrentCoordinates - mgtr.applier.CurrentCoordinatesMutex.Unlock() - applyBacklog := len(mgtr.applyEventsQueue) - streamerBacklog := 0 - if mgtr.eventsStreamer != nil { - streamerBacklog = len(mgtr.eventsStreamer.eventsChannel) - } - if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) && applyBacklog == 0 && streamerBacklog == 0 { - mgtr.migrationContext.Log.Infof("T3: drain complete; applier caught up to drain GTID") - break - } - if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) { - mgtr.migrationContext.Log.Debugf("T3: drain GTID reached but backlog remains (apply=%d, streamer=%d)", applyBacklog, streamerBacklog) - } else { - mgtr.migrationContext.Log.Debugf("T3: applier still behind drain GTID, polling") + if mgtr.migrationContext.Checkpoint { + if _, err := mgtr.persistMoveTablesCutOverCheckpoint(drainGTID); err != nil { + return fmt.Errorf("failed to persist move-tables cutover checkpoint: %w", err) } - select { - case <-drainCtx.Done(): - return fmt.Errorf("drain poll timed out after %s: applier did not catch up to drain GTID", drainTimeout) - case <-ticker.C: - // next iteration + } + + if err := mgtr.drainMoveTablesCutOver(drainGTID); err != nil { + return err + } + if mgtr.migrationContext.Checkpoint { + if _, err := mgtr.persistMoveTablesCutOverCheckpoint(drainGTID); err != nil { + mgtr.migrationContext.Log.Warningf("failed to checkpoint drained move-tables cutover: %+v", err) } } @@ -1836,13 +2007,20 @@ func (mgtr *Migrator) initiateApplier() error { } if mgtr.migrationContext.IsMoveTablesMode() { - createTableStatement, err := mgtr.inspector.showCreateTable(mgtr.migrationContext.MoveTables.TableNames[0]) - if err != nil { - return fmt.Errorf("failed to fetch create table statement: %w", err) - } - if err := mgtr.applier.CreateTargetTable(createTableStatement); err != nil { - mgtr.migrationContext.Log.Errorf("unable to create target table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") - return err + if !mgtr.migrationContext.Resume { + createTableStatement, err := mgtr.inspector.showCreateTable(mgtr.migrationContext.MoveTables.TableNames[0]) + if err != nil { + return fmt.Errorf("failed to fetch create table statement: %w", err) + } + if err := mgtr.applier.CreateTargetTable(createTableStatement); err != nil { + mgtr.migrationContext.Log.Errorf("unable to create target table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") + return err + } + } else { + mgtr.migrationContext.Log.Infof("Resuming move-tables; reusing existing target table %s.%s", + sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), + sql.EscapeName(mgtr.migrationContext.GetTargetTableName()), + ) } } else { if mgtr.migrationContext.Revert { @@ -2272,9 +2450,17 @@ func (mgtr *Migrator) finalCleanup() error { } if mgtr.migrationContext.IsMoveTablesMode() { + if mgtr.migrationContext.Checkpoint { + if mgtr.migrationContext.OkToDropTable { + if err := mgtr.retryOperation(mgtr.applier.DropCheckpointTable); err != nil { + return err + } + } else if !mgtr.migrationContext.Noop { + mgtr.migrationContext.Log.Infof("Am not dropping checkpoint table without `--ok-to-drop-table`. To drop the checkpoint table, issue:") + mgtr.migrationContext.Log.Infof("-- drop table %s.%s", sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), sql.EscapeName(mgtr.migrationContext.GetCheckpointTableName())) + } + } // for move-tables mode, we're done at this point - // TODO(zacharysierakowski): when we add the checkpoint table in for 1.6, make sure we cleanup - // the checkpoint table here first before returning (looks like that's a few lines below changelog table cleanup) return nil } diff --git a/go/logic/migrator_move_tables_cutover_test.go b/go/logic/migrator_move_tables_cutover_test.go index 22426be7d..47838922c 100644 --- a/go/logic/migrator_move_tables_cutover_test.go +++ b/go/logic/migrator_move_tables_cutover_test.go @@ -131,6 +131,43 @@ func TestMoveTablesCutOver_PostponeGateFiresOnBeginPostponedOnce(t *testing.T) { "post-state: hook failure must leave CutOverCompleteFlag unset") } +// TestResumeMoveTablesCutOverFromCheckpointAlreadyDrained verifies the crash- +// safe resume branch skips T1 entirely and proceeds directly to T5 when the +// persisted checkpoint already shows a drain-satisfied position. +func TestResumeMoveTablesCutOverFromCheckpointAlreadyDrained(t *testing.T) { + var calls []string + fakeHooks := &recordingHooks{name: "fake", calls: &calls} + + ctx := base.NewMigrationContext() + ctx.Hooks = fakeHooks + ctx.Checkpoint = false + + m := NewMigrator(ctx, "test") + m.applier = NewApplier(ctx) + + drainGTID, err := mysql.NewGTIDBinlogCoordinates("11111111-1111-1111-1111-111111111111:1-10") + require.NoError(t, err) + + chk := &Checkpoint{ + LastTrxCoords: drainGTID, + MoveTablesCutOverStarted: true, + MoveTablesCutOverDrainGTID: drainGTID, + } + + require.Equal(t, int64(0), atomic.LoadInt64(&ctx.CutOverCompleteFlag), "pre-state: flag must be 0") + require.Empty(t, calls, "pre-state: no hooks recorded") + + require.NoError(t, m.resumeMoveTablesCutOverFromCheckpoint(chk)) + + require.Equal(t, int64(1), atomic.LoadInt64(&ctx.CutOverCompleteFlag), + "post-state: resume path must set CutOverCompleteFlag before exiting") + require.Equal(t, []string{"fake:OnSuccess"}, calls, + "post-state: resume path should jump directly to T5 without rerunning T0/T1") + if m.applier.CurrentCoordinates != nil { + require.Equal(t, drainGTID.String(), m.applier.CurrentCoordinates.String()) + } +} + // ----------------------------------------------------------------------------- // Integration tests - real MySQL via testcontainers, exercise T1/T2/T3. // diff --git a/go/sql/builder.go b/go/sql/builder.go index 1c3c612fa..3f4c375ef 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -120,11 +120,12 @@ func BuildEqualsPreparedComparison(columns []string) (result string, err error) // It holds the prepared query statement so it doesn't need to be recreated every time. type CheckpointInsertQueryBuilder struct { - uniqueKeyColumns *ColumnList - preparedStatement string + uniqueKeyColumns *ColumnList + preparedStatement string + includeMoveTablesCutOverColumns bool } -func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns *ColumnList) (*CheckpointInsertQueryBuilder, error) { +func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns *ColumnList, includeMoveTablesCutOverColumns bool) (*CheckpointInsertQueryBuilder, error) { if uniqueKeyColumns.Len() == 0 { return nil, fmt.Errorf("got 0 columns in BuildSetCheckpointInsertQuery") } @@ -139,26 +140,48 @@ func NewCheckpointQueryBuilder(databaseName, tableName string, uniqueKeyColumns } databaseName = EscapeName(databaseName) tableName = EscapeName(tableName) - stmt := fmt.Sprintf(` - insert /* gh-ost */ - into %s.%s - (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, - gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, - %s, %s) - values - (unix_timestamp(now()), ?, ?, - ?, ?, ?, - %s, %s)`, - databaseName, tableName, - strings.Join(minUniqueColNames, ", "), - strings.Join(maxUniqueColNames, ", "), - strings.Join(values, ", "), - strings.Join(values, ", "), - ) - b := &CheckpointInsertQueryBuilder{ - uniqueKeyColumns: uniqueKeyColumns, - preparedStatement: stmt, + uniqueKeyColumns: uniqueKeyColumns, + preparedStatement: func() string { + if includeMoveTablesCutOverColumns { + return fmt.Sprintf(` + insert /* gh-ost */ + into %s.%s + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, + gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, + gh_ost_move_tables_cutover_started, gh_ost_move_tables_drain_gtid, + %s, %s) + values + (unix_timestamp(now()), ?, ?, + ?, ?, ?, + ?, ?, + %s, %s)`, + databaseName, tableName, + strings.Join(minUniqueColNames, ", "), + strings.Join(maxUniqueColNames, ", "), + strings.Join(values, ", "), + strings.Join(values, ", "), + ) + } + + return fmt.Sprintf(` + insert /* gh-ost */ + into %s.%s + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, + gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, + %s, %s) + values + (unix_timestamp(now()), ?, ?, + ?, ?, ?, + %s, %s)`, + databaseName, tableName, + strings.Join(minUniqueColNames, ", "), + strings.Join(maxUniqueColNames, ", "), + strings.Join(values, ", "), + strings.Join(values, ", "), + ) + }(), + includeMoveTablesCutOverColumns: includeMoveTablesCutOverColumns, } return b, nil } diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 0fcf31441..38b5043ab 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -1347,7 +1347,7 @@ func TestCheckpointQueryBuilder(t *testing.T) { tableName := "_tbl_ghk" valueArgs := []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)} uniqueKeyColumns := NewColumnList([]string{"name", "position", "my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长很长很长"}) - builder, err := NewCheckpointQueryBuilder(databaseName, tableName, uniqueKeyColumns) + builder, err := NewCheckpointQueryBuilder(databaseName, tableName, uniqueKeyColumns, false) require.NoError(t, err) query, uniqueKeyArgs, err := builder.BuildQuery(valueArgs) require.NoError(t, err) @@ -1366,3 +1366,30 @@ func TestCheckpointQueryBuilder(t *testing.T) { require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) require.Equal(t, []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)}, uniqueKeyArgs) } + +func TestMoveTablesCheckpointQueryBuilder(t *testing.T) { + databaseName := "mydb" + tableName := "_tbl_ghk" + valueArgs := []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)} + uniqueKeyColumns := NewColumnList([]string{"name", "position", "my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长很长很长"}) + builder, err := NewCheckpointQueryBuilder(databaseName, tableName, uniqueKeyColumns, true) + require.NoError(t, err) + query, uniqueKeyArgs, err := builder.BuildQuery(valueArgs) + require.NoError(t, err) + expected := ` + insert /* gh-ost */ into mydb._tbl_ghk + (gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, + gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, + gh_ost_move_tables_cutover_started, gh_ost_move_tables_drain_gtid, + name_min, position_min, my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长_min, + name_max, position_max, my_very_long_column_that_is_64_utf8_characters_long_很长很长很长很长_max) + values + (unix_timestamp(now()), ?, ?, + ?, ?, ?, + ?, ?, + ?, ?, ?, + ?, ?, ?) + ` + require.Equal(t, normalizeQuery(expected), normalizeQuery(query)) + require.Equal(t, []interface{}{"mona", "mascot", int8(-17), "anothername", "anotherposition", int8(-2)}, uniqueKeyArgs) +} diff --git a/script/move-tables/README.md b/script/move-tables/README.md index db57f750a..d223a8e1f 100644 --- a/script/move-tables/README.md +++ b/script/move-tables/README.md @@ -31,3 +31,30 @@ Note: replicas in this local topology are configured with `read_only=ON` and `super_read_only=ON`. If you point `--host` at `mysql-source-replica` (3308), the cutover `RENAME TABLE` step will fail by design. Use source primary (3307) as the inspected host when you want cutover to rename on source. + +Start continuous inserts against the source. +```bash +script/move-tables/insert-source-primary-loop +``` + +Check the target - it should have the initial data from the source and should be receiving the new data. +```bash +script/move-tables/mysql-target-primary -D gh_ost_test_db -e "SELECT * FROM gh_ost_test;" +``` + +Remove the cutover flag file. +```bash +rm /tmp/ghost-move-tables.postpone.flag +``` + +You'll see the continous inserts will stop because of the table rename. + +Check the source - table has been renamed. +```bash +script/move-tables/mysql-source-primary -D gh_ost_test_db -e "SELECT * FROM _gh_ost_test_del;" +``` + +Check the target has the same set of data. +```bash +script/move-tables/mysql-target-primary -D gh_ost_test_db -e "SELECT * FROM gh_ost_test;" +``` \ No newline at end of file