Skip to content
19 changes: 13 additions & 6 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,13 @@ type MigrationContext struct {

// move tables:
MoveTables struct {
TableNames []string // List of table names to be moved.
TargetHost string // Target hostname for the move. This must be a primary/writable host.
TargetPort int // Target MySQL port for the move.
TargetUser string // Target username for the move. If not specified, it will default to the source user.
TargetPass string // Target password for the move. If not specified, it will default to the source password.
TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name.
TableNames []string // List of table names to be moved.
TargetHost string // Target hostname for the move. This must be a primary/writable host.
TargetPort int // Target MySQL port for the move.
TargetUser string // Target username for the move. If not specified, it will default to the source user.
TargetPass string // Target password for the move. If not specified, it will default to the source password.
TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name.
ConnectionConfig *mysql.ConnectionConfig
}

Log Logger
Expand Down Expand Up @@ -356,6 +357,9 @@ func (mctx *MigrationContext) SetConnectionConfig(storageEngine string) error {
}
mctx.InspectorConnectionConfig.TransactionIsolation = transactionIsolation
mctx.ApplierConnectionConfig.TransactionIsolation = transactionIsolation
if mctx.MoveTables.ConnectionConfig != nil {
mctx.MoveTables.ConnectionConfig.TransactionIsolation = transactionIsolation
}
return nil
}

Expand All @@ -366,6 +370,9 @@ func (mctx *MigrationContext) SetConnectionCharset(charset string) {

mctx.InspectorConnectionConfig.Charset = charset
mctx.ApplierConnectionConfig.Charset = charset
if mctx.MoveTables.ConnectionConfig != nil {
mctx.MoveTables.ConnectionConfig.Charset = charset
}
}

func getSafeTableName(baseName string, suffix string) string {
Expand Down
198 changes: 191 additions & 7 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ type Applier struct {
migrationLockName string
migrationLockStop chan struct{}
migrationLockDone chan struct{}

moveTablesTargetDB *gosql.DB
moveTablesConnectionConfig *mysql.ConnectionConfig
moveTablesCopySelectFirstQueryBuilder *sql.MoveTableCopySelectQueryBuilder
moveTablesCopySelectNextQueryBuilder *sql.MoveTableCopySelectQueryBuilder
moveTablesCopyInsertQueryBuilder *sql.MoveTableCopyInsertQueryBuilder
}

func NewApplier(migrationContext *base.MigrationContext) *Applier {
Expand All @@ -99,6 +105,8 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
migrationContext: migrationContext,
finishedMigrating: 0,
name: "applier",

moveTablesConnectionConfig: migrationContext.MoveTables.ConnectionConfig,
}
}

Expand Down Expand Up @@ -149,6 +157,15 @@ func (apl *Applier) InitDBConnections() (err error) {
if err := apl.readTableColumns(); err != nil {
return err
}
if apl.moveTablesConnectionConfig != nil {
moveTablesURI := apl.moveTablesConnectionConfig.GetDBUri(apl.migrationContext.MoveTables.TargetDatabase) + "&multiStatements=true"
if apl.moveTablesTargetDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, moveTablesURI); err != nil {
return err
}
if _, err := base.ValidateConnection(apl.moveTablesTargetDB, apl.moveTablesConnectionConfig, apl.migrationContext, apl.name); err != nil {
return err
}
}
Comment thread
danieljoos marked this conversation as resolved.
Comment thread
danieljoos marked this conversation as resolved.
apl.migrationContext.Log.Infof("Applier initiated on %+v, version %+v", apl.connectionConfig.ImpliedKey, apl.migrationContext.ApplierMySQLVersion)
return nil
}
Expand Down Expand Up @@ -297,26 +314,33 @@ func (apl *Applier) releaseMigrationLock() {
}

func (apl *Applier) prepareQueries() (err error) {
targetDatabaseName := apl.migrationContext.DatabaseName
targetTableName := apl.migrationContext.GetGhostTableName()
if apl.migrationContext.IsMoveTablesMode() {
targetDatabaseName = apl.migrationContext.MoveTables.TargetDatabase
targetTableName = apl.migrationContext.OriginalTableName
}

if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.GetGhostTableName(),
targetDatabaseName,
targetTableName,
apl.migrationContext.OriginalTableColumns,
&apl.migrationContext.UniqueKey.Columns,
); err != nil {
return err
}
if apl.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.GetGhostTableName(),
targetDatabaseName,
targetTableName,
apl.migrationContext.OriginalTableColumns,
apl.migrationContext.SharedColumns,
apl.migrationContext.MappedSharedColumns,
); err != nil {
return err
}
if apl.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.GetGhostTableName(),
targetDatabaseName,
targetTableName,
apl.migrationContext.OriginalTableColumns,
apl.migrationContext.SharedColumns,
apl.migrationContext.MappedSharedColumns,
Expand All @@ -333,6 +357,35 @@ func (apl *Applier) prepareQueries() (err error) {
return err
}
}
if apl.migrationContext.IsMoveTablesMode() {
if apl.moveTablesCopySelectFirstQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.OriginalTableName,
apl.migrationContext.OriginalTableColumns,
apl.migrationContext.UniqueKey.Name,
&apl.migrationContext.UniqueKey.Columns,
true, // <-- include start range values for first select query
); err != nil {
return err
}
if apl.moveTablesCopySelectNextQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.OriginalTableName,
apl.migrationContext.OriginalTableColumns,
apl.migrationContext.UniqueKey.Name,
&apl.migrationContext.UniqueKey.Columns,
false,
); err != nil {
return err
}
if apl.moveTablesCopyInsertQueryBuilder, err = sql.NewMoveTableCopyInsertQueryBuilder(
targetDatabaseName,
targetTableName,
apl.migrationContext.OriginalTableColumns,
); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -1164,6 +1217,130 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i
return chunkSize, rowsAffected, duration, nil
}

// ApplyIterationMoveTableCopyQueries issues a SELECT query on the original table and an INSERT query on the target table,
// copying a chunk of rows. It is used when `--move-tables` is specified, instead of ApplyIterationInsertQuery.
func (apl *Applier) ApplyIterationMoveTableCopyQueries() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize)

// First, select data from the source database:
Comment thread
danieljoos marked this conversation as resolved.
rows, err := func() ([]*sql.ColumnValues, error) {
var qb *sql.MoveTableCopySelectQueryBuilder
if apl.migrationContext.GetIteration() == 0 {
qb = apl.moveTablesCopySelectFirstQueryBuilder
} else {
qb = apl.moveTablesCopySelectNextQueryBuilder
}
query, explodedArgs, err := qb.BuildQuery(
apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
)
if err != nil {
return nil, err
}
sqlRows, err := apl.db.Query(query, explodedArgs...)
if err != nil {
return nil, err
}
defer sqlRows.Close()
chunkRows := make([]*sql.ColumnValues, 0, chunkSize)
for sqlRows.Next() {
row := sql.NewColumnValues(apl.migrationContext.SharedColumns.Len())
Comment thread
danieljoos marked this conversation as resolved.
err := sqlRows.Scan(row.ValuesPointers...)
if err != nil {
return nil, err
}
chunkRows = append(chunkRows, row)
}
Comment thread
danieljoos marked this conversation as resolved.
if rowsErr := sqlRows.Err(); rowsErr != nil {
return nil, rowsErr
}
return chunkRows, nil
}()
if err != nil {
return chunkSize, rowsAffected, duration, err
}

// no need to INSERT if there are no rows to copy:
if len(rows) == 0 {
duration = time.Since(startTime)
return chunkSize, 0, duration, nil
}

// Then, insert data into the destination database:
sqlResult, err := func() (gosql.Result, error) {
query, explodedArgs, err := apl.moveTablesCopyInsertQueryBuilder.BuildQuery(rows)
if err != nil {
Comment thread
danieljoos marked this conversation as resolved.
return nil, err
}
tx, err := apl.moveTablesTargetDB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()

sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s', %s`,
apl.migrationContext.ApplierTimeZone,
apl.generateSqlModeQuery())
if _, err := tx.Exec(sessionQuery); err != nil {
return nil, err
}

sqlResult, err := tx.Exec(query, explodedArgs...)
if err != nil {
return nil, err
}

if apl.migrationContext.PanicOnWarnings {
rows, err := tx.Query("SHOW WARNINGS")
if err != nil {
return nil, err
}
defer rows.Close()
if err = rows.Err(); err != nil {
return nil, err
}
migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex()
if err != nil {
return nil, err
}
var sqlWarnings []string
for rows.Next() {
var level, message string
var code int
if err := rows.Scan(&level, &code, &message); err != nil {
apl.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
}
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
continue
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
}
apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings
}

if err := tx.Commit(); err != nil {
return nil, err
}
return sqlResult, nil
}()
if err != nil {
return chunkSize, rowsAffected, duration, err
}
rowsAffected, _ = sqlResult.RowsAffected()
duration = time.Since(startTime)
apl.migrationContext.Log.Debugf(
"Issued SELECT+INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
apl.migrationContext.MigrationIterationRangeMinValues,
apl.migrationContext.MigrationIterationRangeMaxValues,
apl.migrationContext.GetIteration(),
chunkSize,
)

return chunkSize, rowsAffected, duration, nil
}

// LockOriginalTable places a write lock on the original table
func (apl *Applier) LockOriginalTable() error {
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
Expand Down Expand Up @@ -1783,7 +1960,11 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e
ctx := context.Background()

err := func() error {
conn, err := apl.db.Conn(ctx)
db := apl.db
if apl.migrationContext.IsMoveTablesMode() {
db = apl.moveTablesTargetDB
}
conn, err := db.Conn(ctx)
Comment thread
danieljoos marked this conversation as resolved.
if err != nil {
return err
}
Expand Down Expand Up @@ -1888,6 +2069,9 @@ func (apl *Applier) Teardown() {
apl.releaseMigrationLock()
apl.db.Close()
apl.singletonDB.Close()
if apl.moveTablesTargetDB != nil {
apl.moveTablesTargetDB.Close()
}
atomic.StoreInt64(&apl.finishedMigrating, 1)
}

Expand Down
Loading
Loading