Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ func main() {
if migrationContext.PostponeCutOverFlagFile == "" {
log.Fatal("--postpone-cut-over-flag-file must be specified when using --move-tables")
}
if !migrationContext.Checkpoint {
log.Fatal("--checkpoint must be specified when using --move-tables")
}
migrationContext.MoveTables.TableNames = strings.Split(*moveTables, ",")
for i := range migrationContext.MoveTables.TableNames {
migrationContext.MoveTables.TableNames[i] = strings.TrimSpace(migrationContext.MoveTables.TableNames[i])
Expand Down
77 changes: 69 additions & 8 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,27 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
}
}

func (apl *Applier) checkpointDB() *gosql.DB {
if apl.migrationContext.IsMoveTablesMode() && apl.moveTablesTargetDB != nil {
return apl.moveTablesTargetDB
}
return apl.db
}

func (apl *Applier) checkpointDatabaseName() string {
if apl.migrationContext.IsMoveTablesMode() {
return apl.migrationContext.GetTargetDatabaseName()
}
return apl.migrationContext.DatabaseName
}

func (apl *Applier) checkpointDrainGTIDString(chk *Checkpoint) string {
if chk == nil || chk.MoveTablesCutoverDrainGTID == nil || chk.MoveTablesCutoverDrainGTID.IsEmpty() {
return ""
}
return chk.MoveTablesCutoverDrainGTID.String()
}

// compileMigrationKeyWarningRegex compiles a regex pattern that matches duplicate key warnings
// for the migration's unique key. Duplicate warnings are formatted differently across MySQL versions,
// hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid
Expand Down Expand Up @@ -358,9 +379,10 @@ func (apl *Applier) prepareQueries() (err error) {
}
if apl.migrationContext.Checkpoint {
if apl.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder(
apl.migrationContext.DatabaseName,
apl.checkpointDatabaseName(),
apl.migrationContext.GetCheckpointTableName(),
&apl.migrationContext.UniqueKey.Columns,
apl.migrationContext.IsMoveTablesMode(),
); err != nil {
return err
}
Expand Down Expand Up @@ -752,6 +774,12 @@ func (apl *Applier) CreateCheckpointTable() error {
"`gh_ost_dml_applied` bigint",
"`gh_ost_is_cutover` tinyint(1) DEFAULT '0'",
}
if apl.migrationContext.IsMoveTablesMode() {
colDefs = append(colDefs,
"`gh_ost_move_tables_cutover_started` tinyint(1) DEFAULT '0'",
"`gh_ost_move_tables_drain_gtid` text charset ascii",
)
}
for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() {
if col.MySQLType == "" {
return fmt.Errorf("column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name))
Expand All @@ -768,12 +796,12 @@ func (apl *Applier) CreateCheckpointTable() error {
}

query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)",
sql.EscapeName(apl.migrationContext.DatabaseName),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(apl.migrationContext.GetCheckpointTableName()),
strings.Join(colDefs, ",\n "),
)
apl.migrationContext.Log.Infof("Created checkpoint table")
if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil {
if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil {
return err
}
return nil
Expand All @@ -782,14 +810,14 @@ func (apl *Applier) CreateCheckpointTable() error {
// dropTable drops a given table on the applied host
func (apl *Applier) dropTable(tableName string) error {
query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(apl.migrationContext.DatabaseName),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(tableName),
)
apl.migrationContext.Log.Infof("Dropping table %s.%s",
sql.EscapeName(apl.migrationContext.DatabaseName),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(tableName),
)
if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil {
if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil {
return err
}
apl.migrationContext.Log.Infof("Table dropped")
Expand Down Expand Up @@ -954,16 +982,19 @@ func (apl *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) {
return insertId, err
}
args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied, chk.IsCutover)
if apl.migrationContext.IsMoveTablesMode() {
args = append(args, chk.MoveTablesCutOverStarted, apl.checkpointDrainGTIDString(chk))
}
args = append(args, uniqueKeyArgs...)
res, err := apl.db.Exec(query, args...)
res, err := apl.checkpointDB().Exec(query, args...)
if err != nil {
return insertId, err
}
return res.LastInsertId()
}

func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
row := apl.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetCheckpointTableName())))
row := apl.checkpointDB().QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(apl.migrationContext.GetCheckpointTableName())))
chk := &Checkpoint{
IterationRangeMin: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()),
IterationRangeMax: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()),
Expand Down Expand Up @@ -998,6 +1029,36 @@ func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
return chk, nil
}

func (apl *Applier) ReadMoveTablesCutOverCheckpoint() (*Checkpoint, error) {
row := apl.checkpointDB().QueryRow(fmt.Sprintf(`select /* gh-ost */ gh_ost_chk_id, gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, gh_ost_move_tables_cutover_started, gh_ost_move_tables_drain_gtid from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(apl.migrationContext.GetCheckpointTableName())))
chk := &Checkpoint{}
var coordStr, drainGTIDStr string
var timestamp int64
err := row.Scan(&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover, &chk.MoveTablesCutOverStarted, &drainGTIDStr)
if err != nil {
if errors.Is(err, gosql.ErrNoRows) {
return nil, ErrNoCheckpointFound
}
return nil, err
}
chk.Timestamp = time.Unix(timestamp, 0)
if coordStr != "" {
coords, err := mysql.NewGTIDBinlogCoordinates(coordStr)
if err != nil {
return nil, err
}
chk.LastTrxCoords = coords
}
if drainGTIDStr != "" {
drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr)
if err != nil {
return nil, err
}
chk.MoveTablesCutoverDrainGTID = drainGTID
}
return chk, nil
}

// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
// Apl is done asynchronously
func (apl *Applier) InitiateHeartbeat() {
Expand Down
2 changes: 2 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,8 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied)
suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied)
suite.Require().Equal(chk.IsCutover, gotChk.IsCutover)
suite.Require().False(gotChk.MoveTablesCutOverStarted)
suite.Require().Nil(gotChk.MoveTablesCutoverDrainGTID)
}

func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigrationIndex() {
Expand Down
12 changes: 7 additions & 5 deletions go/logic/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ type Checkpoint struct {
IterationRangeMin *sql.ColumnValues
// IterationRangeMax is the max shared key value
// for the chunk copier range.
IterationRangeMax *sql.ColumnValues
Iteration int64
RowsCopied int64
DMLApplied int64
IsCutover bool
IterationRangeMax *sql.ColumnValues
Iteration int64
RowsCopied int64
DMLApplied int64
IsCutover bool
MoveTablesCutOverStarted bool
MoveTablesCutoverDrainGTID mysql.BinlogCoordinates
}
Loading
Loading