Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,12 @@ func (mctx *MigrationContext) SetCutOverLockTimeoutSeconds(timeoutSeconds int64)
if timeoutSeconds < 1 {
return fmt.Errorf("minimal timeout is 1sec. Timeout remains at %d", mctx.CutOverLockTimeoutSeconds)
}
if timeoutSeconds > 10 {
return fmt.Errorf("maximal timeout is 10sec. Timeout remains at %d", mctx.CutOverLockTimeoutSeconds)
maxTimeout := int64(10)
if mctx.IsMoveTablesMode() {
maxTimeout = 60
}
if timeoutSeconds > maxTimeout {
return fmt.Errorf("maximal timeout is %dsec. Timeout remains at %d", maxTimeout, mctx.CutOverLockTimeoutSeconds)
}
mctx.CutOverLockTimeoutSeconds = timeoutSeconds
return nil
Expand Down
19 changes: 19 additions & 0 deletions go/base/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,22 @@ func TestSetAbortError_ThreadSafe(t *testing.T) {
t.Errorf("Stored error %v not in list of sent errors", got)
}
}

// TestSetCutOverLockTimeoutSecondsRangeByMode exercises the upper bound of
// SetCutOverLockTimeoutSeconds in two scenarios: a fresh migration context
// (10 accepted, 11 rejected) and a context whose MoveTables.TableNames is
// populated (60 accepted, 61 rejected). In both scenarios the rejection error
// must name the applicable maximum.
func TestSetCutOverLockTimeoutSecondsRangeByMode(t *testing.T) {
	scenarios := []struct {
		tableNames []string // assigned to MoveTables.TableNames when non-nil
		accepted   int64    // largest value expected to be accepted
		rejected   int64    // first value expected to be rejected
		wantMsg    string   // fragment expected in the rejection error
	}{
		{tableNames: nil, accepted: 10, rejected: 11, wantMsg: "maximal timeout is 10sec"},
		{tableNames: []string{"tbl"}, accepted: 60, rejected: 61, wantMsg: "maximal timeout is 60sec"},
	}

	for _, sc := range scenarios {
		// Each scenario gets its own context so state never leaks between them.
		mctx := NewMigrationContext()
		if sc.tableNames != nil {
			mctx.MoveTables.TableNames = sc.tableNames
		}

		require.NoError(t, mctx.SetCutOverLockTimeoutSeconds(sc.accepted))

		rejectErr := mctx.SetCutOverLockTimeoutSeconds(sc.rejected)
		require.Error(t, rejectErr)
		require.Contains(t, rejectErr.Error(), sc.wantMsg)
	}
}
12 changes: 12 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ func main() {

flag.CommandLine.SetOutput(os.Stdout)
flag.Parse()
cutOverLockTimeoutUserSpecified := false
flag.Visit(func(f *flag.Flag) {
if f.Name == "cut-over-lock-timeout-seconds" {
cutOverLockTimeoutUserSpecified = true
}
})

if *checkFlag {
return
Expand Down Expand Up @@ -364,6 +370,9 @@ func main() {
if migrationContext.MoveTables.TargetHost == "" {
log.Fatal("--target-host must be specified when using --move-tables")
}
if migrationContext.PostponeCutOverFlagFile == "" {
log.Fatal("--postpone-cut-over-flag-file must be specified when using --move-tables")
}
migrationContext.MoveTables.TableNames = strings.Split(*moveTables, ",")
for i := range migrationContext.MoveTables.TableNames {
migrationContext.MoveTables.TableNames[i] = strings.TrimSpace(migrationContext.MoveTables.TableNames[i])
Expand All @@ -384,6 +393,9 @@ func main() {
if migrationContext.MoveTables.TargetDatabase == "" {
migrationContext.MoveTables.TargetDatabase = migrationContext.DatabaseName
}
if !cutOverLockTimeoutUserSpecified {
*cutOverLockTimeoutSeconds = 60
}
migrationContext.MoveTables.ConnectionConfig = mysql.NewConnectionConfig()
}

Expand Down
75 changes: 44 additions & 31 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"

"github.com/openark/golib/sqlutils"
)

var (
Expand All @@ -30,13 +28,6 @@ var (
RetrySleepFn = time.Sleep
checkpointTimeout = 2 * time.Second

// moveTablesCutOverDrainTimeout caps how long T3 of the move-tables
// cooperative cutover (#8209) waits for the applier to catch up to the
// drain GTID captured at T2. 60s is a starting default chosen over
// CutOverLockTimeoutSeconds (which is capped at 1-10s by
// SetCutOverLockTimeoutSeconds and would be too short for a real drain
// under load). Up for review with Daniel/Eric in the PR.
moveTablesCutOverDrainTimeout = 60 * time.Second
// moveTablesCutOverDrainPollInterval is the per-iteration sleep in T3's
// drain poll. 100ms per move_table_mode.md §1.5.
moveTablesCutOverDrainPollInterval = 100 * time.Millisecond
Expand Down Expand Up @@ -851,6 +842,15 @@ func (mgtr *Migrator) MoveTables() (err error) {
if err := mgtr.initiateApplier(); err != nil {
return err
}
if err := mgtr.checkAbort(); err != nil {
return err
}
if err := mgtr.createFlagFiles(); err != nil {
return err
}
if err := mgtr.checkAbort(); err != nil {
return err
}
if err := mgtr.initiateStreaming(); err != nil {
return err
}
Expand Down Expand Up @@ -985,29 +985,38 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) {
return fmt.Errorf("on-before-cut-over hook failed: %w", err)
}

// ----- T1: RENAME source table -----
// Issued on mgtr.inspector.db (the privileged source connection per
// move_table_mode.md §1.5). No retry: RENAME is not idempotent — a partial
// success leaves the table already renamed and a retry would fail. The
// operator re-runs the whole hook chain on failure.
// ----- T1 + T2: RENAME then capture @@gtid_executed on the same connection -----
// Pin both operations to a single *sql.Conn so MySQL's within-session
// ordering guarantee makes it impossible for T2 to observe a state that
// pre-dates T1's commit. Using mgtr.inspector.db directly would let the
// pool schedule T1 and T2 on different underlying TCP connections (or, with
// a proxy, different servers), breaking the happens-before relationship.
//
// No retry on the RENAME: it is not idempotent — a partial success leaves
// the table already renamed and a retry would fail. The operator re-runs
// the whole hook chain on failure.
pinnedConn, err := mgtr.inspector.db.Conn(context.Background())
if err != nil {
return fmt.Errorf("failed to pin connection for T1/T2: %w", err)
}
defer pinnedConn.Close()
Comment on lines +998 to +1002

sourceDB := mgtr.migrationContext.DatabaseName
sourceTable := mgtr.migrationContext.OriginalTableName
delTable := mgtr.migrationContext.GetOldTableName()
renameQuery := fmt.Sprintf("RENAME TABLE %s.%s TO %s.%s",
sql.EscapeName(sourceDB), sql.EscapeName(sourceTable),
sql.EscapeName(sourceDB), sql.EscapeName(delTable))
mgtr.migrationContext.Log.Infof("T1: renaming source table: %s", renameQuery)
if _, err := sqlutils.ExecNoPrepare(mgtr.inspector.db, renameQuery); err != nil {
if _, err := pinnedConn.ExecContext(context.Background(), renameQuery); err != nil {
return fmt.Errorf("RENAME failed: %w", err)
}

// ----- T2: capture @@gtid_executed on the SAME *sql.DB handle as T1 -----
// The design doc specifies @@gtid_executed. We query @@GLOBAL.gtid_executed explicitly rather than the unqualified
// @@gtid_executed form to make the global server-wide scope unambiguous
// in the SQL itself.
// ----- T2: capture @@gtid_executed on the SAME connection as T1 -----
// NOTE(review): the query below reads the unqualified @@gtid_executed, so @@GLOBAL scope is not explicit in the SQL itself; confirm whether @@global.gtid_executed is intended.
// Design: https://github.com/github/gh-ost-tablemove-poc/blob/9dc6df75c4c88ff473906a497836c7518f5614ec/design/coop_cutover.md#32-correctness-verification-for-p4
var drainGTIDStr string
if err := mgtr.inspector.db.QueryRow("select @@global.gtid_executed").Scan(&drainGTIDStr); err != nil {
if err := pinnedConn.QueryRowContext(context.Background(), "select @@gtid_executed").Scan(&drainGTIDStr); err != nil {
return fmt.Errorf("drain GTID capture failed: %w", err)
}
drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr)
Expand All @@ -1022,15 +1031,10 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) {
// drain target (i.e. the applier contains every GTID in drainGTID). Reads
// of CurrentCoordinates hold the mutex per applier.go:75. Per-iteration
// logging is Debug only to avoid spamming Info on a hot loop.
//
// Timeout is a named constant (moveTablesCutOverDrainTimeout = 60s). NOTE:
// the internalization doc suggested reusing CutOverLockTimeoutSeconds, but
// that's capped at 1-10s (context.go:SetCutOverLockTimeoutSeconds) — too
// short for a real drain under load. 60s default is up for review with
// Daniel/Eric; flagged in the PR description.
drainTimeout := time.Duration(mgtr.migrationContext.CutOverLockTimeoutSeconds) * time.Second
mgtr.migrationContext.Log.Infof("T3: draining applier to drain GTID (timeout %s, poll %s)",
moveTablesCutOverDrainTimeout, moveTablesCutOverDrainPollInterval)
drainCtx, cancel := context.WithTimeout(context.Background(), moveTablesCutOverDrainTimeout)
drainTimeout, moveTablesCutOverDrainPollInterval)
drainCtx, cancel := context.WithTimeout(context.Background(), drainTimeout)
defer cancel()
ticker := time.NewTicker(moveTablesCutOverDrainPollInterval)
defer ticker.Stop()
Expand All @@ -1041,14 +1045,23 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) {
mgtr.applier.CurrentCoordinatesMutex.Lock()
applierCoords := mgtr.applier.CurrentCoordinates
mgtr.applier.CurrentCoordinatesMutex.Unlock()
if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) {
applyBacklog := len(mgtr.applyEventsQueue)
streamerBacklog := 0
if mgtr.eventsStreamer != nil {
streamerBacklog = len(mgtr.eventsStreamer.eventsChannel)
}
if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) && applyBacklog == 0 && streamerBacklog == 0 {
mgtr.migrationContext.Log.Infof("T3: drain complete; applier caught up to drain GTID")
break
}
mgtr.migrationContext.Log.Debugf("T3: applier still behind drain GTID, polling")
if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) {
mgtr.migrationContext.Log.Debugf("T3: drain GTID reached but backlog remains (apply=%d, streamer=%d)", applyBacklog, streamerBacklog)
} else {
mgtr.migrationContext.Log.Debugf("T3: applier still behind drain GTID, polling")
}
select {
case <-drainCtx.Done():
return fmt.Errorf("drain poll timed out after %s: applier did not catch up to drain GTID", moveTablesCutOverDrainTimeout)
return fmt.Errorf("drain poll timed out after %s: applier did not catch up to drain GTID", drainTimeout)
case <-ticker.C:
// next iteration
}
Expand Down
56 changes: 47 additions & 9 deletions go/logic/migrator_move_tables_cutover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ import (
"testing"
"time"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/testcontainers/testcontainers-go"
testmysql "github.com/testcontainers/testcontainers-go/modules/mysql"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
)

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -282,26 +281,25 @@ func (s *MoveTablesCutOverSuite) TestRenameFailurePropagates() {
}

// TestDrainTimeoutPropagates maps to T3 timeout handling: applier coordinates
// never reach the drain GTID -> drain poll bounded by moveTablesCutOverDrainTimeout
// never reach the drain GTID -> drain poll bounded by CutOverLockTimeoutSeconds
// returns a wrapped error and the flag is not set.
func (s *MoveTablesCutOverSuite) TestDrainTimeoutPropagates() {
ctx := context.Background()
_, err := s.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY)", getTestTableName()))
s.Require().NoError(err)

// Patch the package-level timeout/poll just for this test.
origTimeout, origPoll := moveTablesCutOverDrainTimeout, moveTablesCutOverDrainPollInterval
moveTablesCutOverDrainTimeout = 200 * time.Millisecond
// Patch poll interval and use a 1-second drain timeout for a bounded test.
origPoll := moveTablesCutOverDrainPollInterval
moveTablesCutOverDrainPollInterval = 50 * time.Millisecond
s.T().Cleanup(func() {
moveTablesCutOverDrainTimeout = origTimeout
moveTablesCutOverDrainPollInterval = origPoll
})

var calls []string
fakeHooks := &recordingHooks{name: "fake", calls: &calls}
// initialCoords nil - drain comparison never satisfies.
m, mc := s.buildMigrator(fakeHooks, nil)
mc.CutOverLockTimeoutSeconds = 1

s.Require().Equal(int64(0), atomic.LoadInt64(&mc.CutOverCompleteFlag), "pre-state: flag must be 0")

Expand All @@ -320,6 +318,46 @@ func (s *MoveTablesCutOverSuite) TestDrainTimeoutPropagates() {
"post-state: only T0 fires before the drain loop")
}

// TestDrainWaitsForQueuedDML verifies that the T3 drain loop does not report
// success while an event is still waiting in applyEventsQueue, even though the
// migrator was built with coordinates from containingDrainGTID(). With
// CutOverLockTimeoutSeconds set to 1, moveTablesCutOver is expected to fail
// with a "drain poll timed out" error, leave CutOverCompleteFlag at 0, and
// invoke only the OnBeforeCutOver hook.
func (s *MoveTablesCutOverSuite) TestDrainWaitsForQueuedDML() {
	createStmt := fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY)", getTestTableName())
	_, err := s.db.ExecContext(context.Background(), createStmt)
	s.Require().NoError(err)

	// Poll faster for the duration of this test; restore the package-level
	// interval when the test finishes.
	savedPoll := moveTablesCutOverDrainPollInterval
	moveTablesCutOverDrainPollInterval = 50 * time.Millisecond
	s.T().Cleanup(func() {
		moveTablesCutOverDrainPollInterval = savedPoll
	})

	var recorded []string
	hooks := &recordingHooks{name: "fake", calls: &recorded}
	migrator, migrationCtx := s.buildMigrator(hooks, s.containingDrainGTID())
	migrationCtx.CutOverLockTimeoutSeconds = 1

	// Leave one DML event queued so the apply backlog is non-empty when the
	// drain loop runs.
	queuedEntry := &binlog.BinlogEntry{
		DmlEvent: &binlog.BinlogDMLEvent{
			DatabaseName: testMysqlDatabase,
			TableName:    testMysqlTableName,
			DML:          binlog.InsertDML,
		},
		Coordinates: s.containingDrainGTID(),
	}
	migrator.applyEventsQueue <- newApplyEventStructByDML(queuedEntry)

	s.Require().Equal(int64(0), atomic.LoadInt64(&migrationCtx.CutOverCompleteFlag), "pre-state: flag must be 0")

	err = migrator.moveTablesCutOver()
	s.Require().Error(err)
	s.Require().Contains(err.Error(), "drain poll timed out")
	s.Require().Equal(int64(0), atomic.LoadInt64(&migrationCtx.CutOverCompleteFlag),
		"post-state: queued DML must keep T3 from reaching T4")

	// Targeted check first, so a stray OnSuccess yields a specific message
	// before the exact-sequence assertion below.
	for i := range recorded {
		s.Require().NotEqual("fake:OnSuccess", recorded[i], "OnSuccess must not fire while backlog remains")
	}
	s.Require().Equal([]string{"fake:OnBeforeCutOver"}, recorded,
		"post-state: only T0 fires before the drain loop times out")
}

func TestMoveTablesCutOver(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration suite in short mode")
Expand Down
Loading
Loading