diff --git a/go/base/context.go b/go/base/context.go index 4daa57b66..bd008c22c 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -508,8 +508,12 @@ func (mctx *MigrationContext) SetCutOverLockTimeoutSeconds(timeoutSeconds int64) if timeoutSeconds < 1 { return fmt.Errorf("minimal timeout is 1sec. Timeout remains at %d", mctx.CutOverLockTimeoutSeconds) } - if timeoutSeconds > 10 { - return fmt.Errorf("maximal timeout is 10sec. Timeout remains at %d", mctx.CutOverLockTimeoutSeconds) + maxTimeout := int64(10) + if mctx.IsMoveTablesMode() { + maxTimeout = 60 + } + if timeoutSeconds > maxTimeout { + return fmt.Errorf("maximal timeout is %dsec. Timeout remains at %d", maxTimeout, mctx.CutOverLockTimeoutSeconds) } mctx.CutOverLockTimeoutSeconds = timeoutSeconds return nil diff --git a/go/base/context_test.go b/go/base/context_test.go index ffbc174a4..35b1b4304 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -326,3 +326,22 @@ func TestSetAbortError_ThreadSafe(t *testing.T) { t.Errorf("Stored error %v not in list of sent errors", got) } } + +func TestSetCutOverLockTimeoutSecondsRangeByMode(t *testing.T) { + { + ctx := NewMigrationContext() + require.NoError(t, ctx.SetCutOverLockTimeoutSeconds(10)) + err := ctx.SetCutOverLockTimeoutSeconds(11) + require.Error(t, err) + require.Contains(t, err.Error(), "maximal timeout is 10sec") + } + + { + ctx := NewMigrationContext() + ctx.MoveTables.TableNames = []string{"tbl"} + require.NoError(t, ctx.SetCutOverLockTimeoutSeconds(60)) + err := ctx.SetCutOverLockTimeoutSeconds(61) + require.Error(t, err) + require.Contains(t, err.Error(), "maximal timeout is 60sec") + } +} diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index fc8ec2634..c88894926 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -198,6 +198,12 @@ func main() { flag.CommandLine.SetOutput(os.Stdout) flag.Parse() + cutOverLockTimeoutUserSpecified := false + flag.Visit(func(f *flag.Flag) { + if f.Name == 
"cut-over-lock-timeout-seconds" { + cutOverLockTimeoutUserSpecified = true + } + }) if *checkFlag { return @@ -364,6 +370,9 @@ func main() { if migrationContext.MoveTables.TargetHost == "" { log.Fatal("--target-host must be specified when using --move-tables") } + if migrationContext.PostponeCutOverFlagFile == "" { + log.Fatal("--postpone-cut-over-flag-file must be specified when using --move-tables") + } migrationContext.MoveTables.TableNames = strings.Split(*moveTables, ",") for i := range migrationContext.MoveTables.TableNames { migrationContext.MoveTables.TableNames[i] = strings.TrimSpace(migrationContext.MoveTables.TableNames[i]) @@ -384,6 +393,9 @@ func main() { if migrationContext.MoveTables.TargetDatabase == "" { migrationContext.MoveTables.TargetDatabase = migrationContext.DatabaseName } + if !cutOverLockTimeoutUserSpecified { + *cutOverLockTimeoutSeconds = 60 + } migrationContext.MoveTables.ConnectionConfig = mysql.NewConnectionConfig() } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 8a53b7302..b6432947b 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -27,6 +27,10 @@ var ( ErrMigrationNotAllowedOnMaster = errors.New("it seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (this reduces load from the master). To proceed please provide --allow-on-master") RetrySleepFn = time.Sleep checkpointTimeout = 2 * time.Second + + // moveTablesCutOverDrainPollInterval is the per-iteration sleep in T3's + // drain poll. 100ms per move_table_mode.md §1.5. 
+ moveTablesCutOverDrainPollInterval = 100 * time.Millisecond ) type ChangelogState string @@ -838,6 +842,15 @@ func (mgtr *Migrator) MoveTables() (err error) { if err := mgtr.initiateApplier(); err != nil { return err } + if err := mgtr.checkAbort(); err != nil { + return err + } + if err := mgtr.createFlagFiles(); err != nil { + return err + } + if err := mgtr.checkAbort(); err != nil { + return err + } if err := mgtr.initiateStreaming(); err != nil { return err } @@ -899,14 +912,13 @@ func (mgtr *Migrator) MoveTables() (err error) { return err } - //TODO: cutover here + if err := mgtr.moveTablesCutOver(); err != nil { + return err + } if err := mgtr.finalCleanup(); err != nil { return nil } - if err := mgtr.hooksExecutor.OnSuccess(false); err != nil { - return err - } mgtr.migrationContext.Log.Infof("Done moving tables %v from %s to %s (%s)", mgtr.migrationContext.MoveTables.TableNames, sql.EscapeName(mgtr.migrationContext.DatabaseName), sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), mgtr.migrationContext.MoveTables.TargetHost) @@ -917,6 +929,167 @@ func (mgtr *Migrator) MoveTables() (err error) { return nil } +// moveTablesCutOver orchestrates the cooperative cutover protocol for move-tables +// mode. It implements the T0-T6 transitions described in +// docs/learning/design-refs/coop_cutover.md §1.3. +// +// NOT the standard cutOver() path: every internal call from cutOver() (throttle, +// atomicCutOver, waitForEventsUpToLock, heartbeat-lag) was built on a +// single-server assumption that no longer holds when the applier writes target +// and the streamer reads source. Each is replaced or dropped here. +// +// Crash safety (persisting the drain GTID before T3) is #8210. Enriched hook +// env vars (GH_OST_DRAIN_GTID, GH_OST_TARGET_*) are #8211. Target-side +// throttling is #8212. None of those are wired here. 
+func (mgtr *Migrator) moveTablesCutOver() (err error) { + if mgtr.migrationContext.Noop { + mgtr.migrationContext.Log.Debugf("Noop operation; not really moving tables") + return nil + } + + // ----- Postpone gate (precedes T0) ----- + // Mirrors standard cutOver()'s sleepWhileTrue postpone structure but DROPS the + // heartbeat-lag branch: move-tables mode disables _ghc heartbeat writes (#8206), + // so TimeSinceLastHeartbeatOnChangelog() returns ~292 years (time.Since(zero) saturates at the maximum Duration) + // and would deadlock the gate forever. KEEPS the postpone-flag-file + + // unpostpone-socket gate because per coop_cutover.md §1.1 P4, operator-removes- + // postpone is the trigger for the entire cutover phase. + mgtr.migrationContext.Log.Debugf("checking for cut-over postpone") + if err := mgtr.sleepWhileTrue(func() (bool, error) { + if mgtr.migrationContext.PostponeCutOverFlagFile == "" { + return false, nil + } + if atomic.LoadInt64(&mgtr.migrationContext.UserCommandedUnpostponeFlag) > 0 { + atomic.StoreInt64(&mgtr.migrationContext.UserCommandedUnpostponeFlag, 0) + return false, nil + } + if base.FileExists(mgtr.migrationContext.PostponeCutOverFlagFile) { + if atomic.LoadInt64(&mgtr.migrationContext.IsPostponingCutOver) == 0 { + if err := mgtr.hooksExecutor.OnBeginPostponed(); err != nil { + return true, err + } + } + atomic.StoreInt64(&mgtr.migrationContext.IsPostponingCutOver, 1) + return true, nil + } + return false, nil + }); err != nil { + return err + } + atomic.StoreInt64(&mgtr.migrationContext.IsPostponingCutOver, 0) + mgtr.migrationContext.Log.Debugf("checking for cut-over postpone: complete") + + // ----- T0: on-before-cut-over hook ----- + // Non-zero hook exit aborts cutover BEFORE any source DDL fires. 
+ if err := mgtr.hooksExecutor.OnBeforeCutOver(); err != nil { + return fmt.Errorf("on-before-cut-over hook failed: %w", err) + } + + // ----- T1 + T2: RENAME then capture @@gtid_executed on the same connection ----- + // Pin both operations to a single *sql.Conn so MySQL's within-session + // ordering guarantee makes it impossible for T2 to observe a state that + // pre-dates T1's commit. Using mgtr.inspector.db directly would let the + // pool schedule T1 and T2 on different underlying TCP connections (or, with + // a proxy, different servers), breaking the happens-before relationship. + // + // No retry on the RENAME: it is not idempotent — a partial success leaves + // the table already renamed and a retry would fail. The operator re-runs + // the whole hook chain on failure. + pinnedConn, err := mgtr.inspector.db.Conn(context.Background()) + if err != nil { + return fmt.Errorf("failed to pin connection for T1/T2: %w", err) + } + defer pinnedConn.Close() + + sourceDB := mgtr.migrationContext.DatabaseName + sourceTable := mgtr.migrationContext.OriginalTableName + delTable := mgtr.migrationContext.GetOldTableName() + renameQuery := fmt.Sprintf("RENAME TABLE %s.%s TO %s.%s", + sql.EscapeName(sourceDB), sql.EscapeName(sourceTable), + sql.EscapeName(sourceDB), sql.EscapeName(delTable)) + mgtr.migrationContext.Log.Infof("T1: renaming source table: %s", renameQuery) + if _, err := pinnedConn.ExecContext(context.Background(), renameQuery); err != nil { + return fmt.Errorf("RENAME failed: %w", err) + } + + // ----- T2: capture @@gtid_executed on the SAME connection as T1 ----- + // NOTE(review): the query uses the unqualified @@gtid_executed (no explicit @@GLOBAL scope); TODO confirm the server resolves it to the global value. 
+ // Design: https://github.com/github/gh-ost-tablemove-poc/blob/9dc6df75c4c88ff473906a497836c7518f5614ec/design/coop_cutover.md#32-correctness-verification-for-p4 + var drainGTIDStr string + if err := pinnedConn.QueryRowContext(context.Background(), "select @@gtid_executed").Scan(&drainGTIDStr); err != nil { + return fmt.Errorf("drain GTID capture failed: %w", err) + } + drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr) + if err != nil { + return fmt.Errorf("drain GTID parse failed: %w", err) + } + mgtr.migrationContext.Log.Infof("T2: captured drain GTID: %s", drainGTID.DisplayString()) + + // ----- T3: drain poll ----- + // Wait until applier.CurrentCoordinates catches up to drainGTID. The drain is + // complete when the applier's coords are not strictly smaller than the drain + // target (i.e. the applier contains every GTID in drainGTID) and both the apply + // queue and the streamer's events channel are empty. Reads of CurrentCoordinates hold the + // mutex per applier.go:75. Per-iteration logging is Debug only to avoid spamming Info on a hot loop. 
+ drainTimeout := time.Duration(mgtr.migrationContext.CutOverLockTimeoutSeconds) * time.Second + mgtr.migrationContext.Log.Infof("T3: draining applier to drain GTID (timeout %s, poll %s)", + drainTimeout, moveTablesCutOverDrainPollInterval) + drainCtx, cancel := context.WithTimeout(context.Background(), drainTimeout) + defer cancel() + ticker := time.NewTicker(moveTablesCutOverDrainPollInterval) + defer ticker.Stop() + for { + if err := mgtr.checkAbort(); err != nil { + return err + } + mgtr.applier.CurrentCoordinatesMutex.Lock() + applierCoords := mgtr.applier.CurrentCoordinates + mgtr.applier.CurrentCoordinatesMutex.Unlock() + applyBacklog := len(mgtr.applyEventsQueue) + streamerBacklog := 0 + if mgtr.eventsStreamer != nil { + streamerBacklog = len(mgtr.eventsStreamer.eventsChannel) + } + if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) && applyBacklog == 0 && streamerBacklog == 0 { + mgtr.migrationContext.Log.Infof("T3: drain complete; applier caught up to drain GTID") + break + } + if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) { + mgtr.migrationContext.Log.Debugf("T3: drain GTID reached but backlog remains (apply=%d, streamer=%d)", applyBacklog, streamerBacklog) + } else { + mgtr.migrationContext.Log.Debugf("T3: applier still behind drain GTID, polling") + } + select { + case <-drainCtx.Done(): + return fmt.Errorf("drain poll timed out after %s: applier did not catch up to drain GTID", drainTimeout) + case <-ticker.C: + // next iteration + } + } + + // ----- T4: set CutOverCompleteFlag ----- + // MUST be set before T5 so the streamer's canStopStreaming loop (migrator.go:256) + // can wind down in parallel with the (potentially slow) on-success hook. + // Forgetting this has no visible failure at the call site — the run silently + // hangs after cutover because eventsStreamer.StreamEvents() never returns. 
+ atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1) + mgtr.migrationContext.Log.Debugf("T4: CutOverCompleteFlag set") + + // ----- T5: on-success hook ----- + // Hook unlocks user_rw@target via db-user-management and flips the + // write_cutover? feature flag. Standard env vars only — GH_OST_DRAIN_GTID + + // GH_OST_TARGET_* are #8211 (1.7), not this PR. The pre-protocol placeholder + // OnSuccess call that used to live in MoveTables() (after finalCleanup) has + // been removed so the hook fires in the order coop_cutover.md §3.2 step 6 + // requires (T5 between T4 and T6, BEFORE finalCleanup). + if err := mgtr.hooksExecutor.OnSuccess(false); err != nil { + return fmt.Errorf("on-success hook failed: %w", err) + } + + // ----- T6: return nil ----- + return nil +} + // ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external // hook access point func (mgtr *Migrator) ExecOnFailureHook() (err error) { diff --git a/go/logic/migrator_move_tables_cutover_test.go b/go/logic/migrator_move_tables_cutover_test.go new file mode 100644 index 000000000..22426be7d --- /dev/null +++ b/go/logic/migrator_move_tables_cutover_test.go @@ -0,0 +1,366 @@ +package logic + +import ( + "context" + gosql "database/sql" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/mysql" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/testcontainers/testcontainers-go" + testmysql "github.com/testcontainers/testcontainers-go/modules/mysql" +) + +// ----------------------------------------------------------------------------- +// Pure unit tests - no MySQL. These exercise the orchestration branches that +// run BEFORE T1's RENAME, so they do not require a real inspector.db. 
Per the +// Option A decision in commit 3's plan, the "RENAME was not attempted" check +// is a proxy assertion: m.inspector is nil, so if T1 were reached the test +// would panic instead of silently passing. +// ----------------------------------------------------------------------------- + +// TestMoveTablesCutOver_NoopShortCircuits maps to the Noop semantics decision +// in #8209-implement-protocol (Noop returns immediately, no postpone gate, no +// hooks, no RENAME). +func TestMoveTablesCutOver_NoopShortCircuits(t *testing.T) { + var calls []string + fakeHooks := &recordingHooks{name: "fake", calls: &calls} + + ctx := base.NewMigrationContext() + ctx.Noop = true + ctx.Hooks = fakeHooks + + m := NewMigrator(ctx, "test") + + require.Equal(t, int64(0), atomic.LoadInt64(&ctx.CutOverCompleteFlag), "pre-state: flag must be 0") + require.Empty(t, calls, "pre-state: no hooks recorded") + + require.NoError(t, m.moveTablesCutOver()) + + require.Equal(t, int64(0), atomic.LoadInt64(&ctx.CutOverCompleteFlag), + "post-state: Noop must not set CutOverCompleteFlag") + require.Empty(t, calls, "post-state: Noop must not fire any hook") +} + +// TestMoveTablesCutOver_OnBeforeCutOverHookAbortsBeforeRename maps to T0 in +// coop_cutover.md section 1.3 ("non-zero return code aborts cutover"). The "aborts +// BEFORE source DDL" assertion is enforced as a proxy: m.inspector is nil, so +// if T1 RENAME executed via mgtr.inspector.db, this test would panic. 
+func TestMoveTablesCutOver_OnBeforeCutOverHookAbortsBeforeRename(t *testing.T) { + var calls []string + boom := errors.New("hook says no") + fakeHooks := &recordingHooks{name: "fake", calls: &calls, errOn: "OnBeforeCutOver", errVal: boom} + + ctx := base.NewMigrationContext() + ctx.Hooks = fakeHooks + ctx.DatabaseName = "test" + ctx.OriginalTableName = "t" + + m := NewMigrator(ctx, "test") + + require.Equal(t, int64(0), atomic.LoadInt64(&ctx.CutOverCompleteFlag), "pre-state: flag must be 0") + require.Empty(t, calls, "pre-state: no hooks recorded") + + err := m.moveTablesCutOver() + require.Error(t, err) + require.ErrorIs(t, err, boom) + require.Contains(t, err.Error(), "on-before-cut-over hook failed") + + require.Equal(t, int64(0), atomic.LoadInt64(&ctx.CutOverCompleteFlag), + "post-state: T0 abort must leave CutOverCompleteFlag unset") + require.Equal(t, []string{"fake:OnBeforeCutOver"}, calls, + "post-state: only the failing T0 hook fires; no OnSuccess, no OnBeginPostponed") +} + +// TestMoveTablesCutOver_PostponeGateFiresOnBeginPostponedOnce maps to the +// postpone-gate decision in #8209-implement-protocol (keep OnBeginPostponed +// firing logic with the same once-per-cutover semantics as standard cutOver). +// Also exercises Edge Case Test Quality #5 by asserting both pre-state and +// post-state of IsPostponingCutOver. 
+func TestMoveTablesCutOver_PostponeGateFiresOnBeginPostponedOnce(t *testing.T) { + flagPath := filepath.Join(t.TempDir(), "postpone.flag") + require.NoError(t, os.WriteFile(flagPath, nil, 0o644)) + + var calls []string + boom := errors.New("we got past the gate") + fakeHooks := &recordingHooks{name: "fake", calls: &calls, errOn: "OnBeforeCutOver", errVal: boom} + + ctx := base.NewMigrationContext() + ctx.PostponeCutOverFlagFile = flagPath + ctx.Hooks = fakeHooks + + m := NewMigrator(ctx, "test") + + require.Equal(t, int64(0), atomic.LoadInt64(&ctx.IsPostponingCutOver), "pre-state: gate flag must be 0") + require.FileExists(t, flagPath, "pre-state: postpone flag file present") + require.Empty(t, calls, "pre-state: no hooks recorded") + + // Remove the flag file during the gate's first 1s sleep so the next + // poll exits cleanly. sleepWhileTrue uses time.Sleep(1 * time.Second). + go func() { + time.Sleep(500 * time.Millisecond) + _ = os.Remove(flagPath) + }() + + err := m.moveTablesCutOver() + require.ErrorIs(t, err, boom, "expect to bail at T0 hook after gate releases") + + onBegin := 0 + for _, c := range calls { + if c == "fake:OnBeginPostponed" { + onBegin++ + } + } + require.Equal(t, 1, onBegin, + "post-state: OnBeginPostponed must fire exactly once per cutover (idempotent via IsPostponingCutOver)") + require.Equal(t, int64(0), atomic.LoadInt64(&ctx.IsPostponingCutOver), + "post-state: gate must reset IsPostponingCutOver to 0 after exit") + require.Contains(t, calls, "fake:OnBeforeCutOver", + "post-state: T0 hook must fire after gate releases") + require.Equal(t, int64(0), atomic.LoadInt64(&ctx.CutOverCompleteFlag), + "post-state: hook failure must leave CutOverCompleteFlag unset") +} + +// ----------------------------------------------------------------------------- +// Integration tests - real MySQL via testcontainers, exercise T1/T2/T3. 
+// +// These live under a dedicated suite (NOT MigratorTestSuite) so the parent +// function name TestMoveTablesCutOver matches the `-run MoveTablesCutOver` +// verifier alongside the pure unit tests above. SetupSuite duplicates the +// testcontainer setup pattern used by MigratorTestSuite intentionally to +// keep this commit's diff strictly additive (no edits to existing tests or +// shared helpers, per the commit-3 prompt). +// +// Known-environmental flakes when Docker/testcontainers is unavailable mirror +// the same class as #8206; they are not regressions of #8209. +// ----------------------------------------------------------------------------- + +type MoveTablesCutOverSuite struct { + suite.Suite + mysqlContainer testcontainers.Container + db *gosql.DB +} + +func (s *MoveTablesCutOverSuite) SetupSuite() { + ctx := context.Background() + mysqlContainer, err := testmysql.Run(ctx, + testMysqlContainerImage, + testmysql.WithDatabase(testMysqlDatabase), + testmysql.WithUsername(testMysqlUser), + testmysql.WithPassword(testMysqlPass), + testmysql.WithConfigFile("my.cnf.test"), + ) + s.Require().NoError(err) + s.mysqlContainer = mysqlContainer + + dsn, err := mysqlContainer.ConnectionString(ctx) + s.Require().NoError(err) + db, err := gosql.Open("mysql", dsn) + s.Require().NoError(err) + s.db = db +} + +func (s *MoveTablesCutOverSuite) TearDownSuite() { + s.Assert().NoError(s.db.Close()) + s.Assert().NoError(testcontainers.TerminateContainer(s.mysqlContainer)) +} + +func (s *MoveTablesCutOverSuite) SetupTest() { + _, err := s.db.ExecContext(context.Background(), "CREATE DATABASE IF NOT EXISTS "+testMysqlDatabase) + s.Require().NoError(err) +} + +func (s *MoveTablesCutOverSuite) TearDownTest() { + ctx := context.Background() + _, _ = s.db.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestTableName()) + _, _ = s.db.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestOldTableName()) +} + +// containingDrainGTID returns a fabricated GTID set guaranteed to contain any +// 
GTID the test container will assign during the test. RENAME's GTID will be +// a strict subset of this range, so T3's containment poll passes on iteration 1. +func (s *MoveTablesCutOverSuite) containingDrainGTID() *mysql.GTIDBinlogCoordinates { + var serverUUID string + s.Require().NoError(s.db.QueryRow("SELECT @@server_uuid").Scan(&serverUUID)) + g, err := mysql.NewGTIDBinlogCoordinates(fmt.Sprintf("%s:1-99999999", serverUUID)) + s.Require().NoError(err) + return g +} + +// buildMigrator wires a Migrator with the test container's *sql.DB pinned to +// inspector.db and a fresh Applier. initialCoords may be nil for the drain- +// timeout case. +func (s *MoveTablesCutOverSuite) buildMigrator(fakeHooks *recordingHooks, initialCoords mysql.BinlogCoordinates) (*Migrator, *base.MigrationContext) { + ctx := context.Background() + connectionConfig, err := getTestConnectionConfig(ctx, s.mysqlContainer) + s.Require().NoError(err) + + mc := newTestMigrationContext() + mc.ApplierConnectionConfig = connectionConfig + mc.InspectorConnectionConfig = connectionConfig + mc.SetConnectionConfig("innodb") + mc.Hooks = fakeHooks + + m := NewMigrator(mc, "test") + m.inspector = &Inspector{db: s.db, migrationContext: mc} + m.applier = NewApplier(mc) + if initialCoords != nil { + m.applier.CurrentCoordinatesMutex.Lock() + m.applier.CurrentCoordinates = initialCoords + m.applier.CurrentCoordinatesMutex.Unlock() + } + return m, mc +} + +// TestHappyPath drives the full T0-T6 protocol against the test container. +// Asserts hook ordering (T0 then T5), T4 flag set, and the source-side rename. +// Maps to acceptance criterion #8209 "RENAME executes; drain completes; +// CutOverCompleteFlag set; on-success hook fires". 
+func (s *MoveTablesCutOverSuite) TestHappyPath() { + ctx := context.Background() + _, err := s.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY)", getTestTableName())) + s.Require().NoError(err) + + var calls []string + fakeHooks := &recordingHooks{name: "fake", calls: &calls} + m, mc := s.buildMigrator(fakeHooks, s.containingDrainGTID()) + + s.Require().Equal(int64(0), atomic.LoadInt64(&mc.CutOverCompleteFlag), "pre-state: flag must be 0") + s.Require().Empty(calls, "pre-state: no hooks recorded") + + s.Require().NoError(m.moveTablesCutOver()) + + s.Require().Equal(int64(1), atomic.LoadInt64(&mc.CutOverCompleteFlag), + "post-state: T4 must set CutOverCompleteFlag before T5/T6") + s.Require().Equal([]string{"fake:OnBeforeCutOver", "fake:OnSuccess"}, calls, + "post-state: T0 hook precedes T5 hook") + + // Source-side post-state (Edge Case Test Quality #5: assert both sides). + var renamed string + s.Require().NoError(s.db.QueryRow(fmt.Sprintf("SHOW TABLES IN %s LIKE '_%s_del'", + testMysqlDatabase, testMysqlTableName)).Scan(&renamed)) + s.Require().Equal("_"+testMysqlTableName+"_del", renamed) + + err = s.db.QueryRow(fmt.Sprintf("SHOW TABLES IN %s LIKE '%s'", + testMysqlDatabase, testMysqlTableName)).Scan(&renamed) + s.Require().ErrorIs(err, gosql.ErrNoRows, "original table must no longer exist under its old name") +} + +// TestRenameFailurePropagates maps to T1 failure handling: no source table -> +// RENAME errors. Verify the wrapped error, no CutOverCompleteFlag set, no +// OnSuccess fired. +func (s *MoveTablesCutOverSuite) TestRenameFailurePropagates() { + // Deliberately no CREATE TABLE. 
+ var calls []string + fakeHooks := &recordingHooks{name: "fake", calls: &calls} + m, mc := s.buildMigrator(fakeHooks, s.containingDrainGTID()) + + s.Require().Equal(int64(0), atomic.LoadInt64(&mc.CutOverCompleteFlag), "pre-state: flag must be 0") + + err := m.moveTablesCutOver() + s.Require().Error(err) + s.Require().Contains(err.Error(), "RENAME failed") + + s.Require().Equal(int64(0), atomic.LoadInt64(&mc.CutOverCompleteFlag), + "post-state: RENAME failure must leave CutOverCompleteFlag unset") + for _, c := range calls { + s.Require().NotEqual("fake:OnSuccess", c, "OnSuccess must not fire when RENAME fails") + } + s.Require().Equal([]string{"fake:OnBeforeCutOver"}, calls, + "post-state: only T0 fires before the failed RENAME") +} + +// TestDrainTimeoutPropagates maps to T3 timeout handling: applier coordinates +// never reach the drain GTID -> drain poll bounded by CutOverLockTimeoutSeconds +// returns a wrapped error and the flag is not set. +func (s *MoveTablesCutOverSuite) TestDrainTimeoutPropagates() { + ctx := context.Background() + _, err := s.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY)", getTestTableName())) + s.Require().NoError(err) + + // Patch poll interval and use a 1-second drain timeout for a bounded test. + origPoll := moveTablesCutOverDrainPollInterval + moveTablesCutOverDrainPollInterval = 50 * time.Millisecond + s.T().Cleanup(func() { + moveTablesCutOverDrainPollInterval = origPoll + }) + + var calls []string + fakeHooks := &recordingHooks{name: "fake", calls: &calls} + // initialCoords nil - drain comparison never satisfies. 
+ m, mc := s.buildMigrator(fakeHooks, nil) + mc.CutOverLockTimeoutSeconds = 1 + + s.Require().Equal(int64(0), atomic.LoadInt64(&mc.CutOverCompleteFlag), "pre-state: flag must be 0") + + err = m.moveTablesCutOver() + s.Require().Error(err) + s.Require().Contains(err.Error(), "drain poll timed out") + s.Require().True(strings.HasPrefix(err.Error(), "drain poll timed out"), + "drain timeout error must name the drain") + + s.Require().Equal(int64(0), atomic.LoadInt64(&mc.CutOverCompleteFlag), + "post-state: drain timeout must abort before T4 flag set") + for _, c := range calls { + s.Require().NotEqual("fake:OnSuccess", c, "OnSuccess must not fire on drain timeout") + } + s.Require().Equal([]string{"fake:OnBeforeCutOver"}, calls, + "post-state: only T0 fires before the drain loop") +} + +// TestDrainWaitsForQueuedDML ensures T3 does not declare success just because +// applier.CurrentCoordinates already contains the drain GTID while there is +// still source-table DML queued for application. 
+func (s *MoveTablesCutOverSuite) TestDrainWaitsForQueuedDML() { + ctx := context.Background() + _, err := s.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY)", getTestTableName())) + s.Require().NoError(err) + + origPoll := moveTablesCutOverDrainPollInterval + moveTablesCutOverDrainPollInterval = 50 * time.Millisecond + s.T().Cleanup(func() { + moveTablesCutOverDrainPollInterval = origPoll + }) + + var calls []string + fakeHooks := &recordingHooks{name: "fake", calls: &calls} + m, mc := s.buildMigrator(fakeHooks, s.containingDrainGTID()) + mc.CutOverLockTimeoutSeconds = 1 + m.applyEventsQueue <- newApplyEventStructByDML(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + }, + Coordinates: s.containingDrainGTID(), + }) + + s.Require().Equal(int64(0), atomic.LoadInt64(&mc.CutOverCompleteFlag), "pre-state: flag must be 0") + err = m.moveTablesCutOver() + s.Require().Error(err) + s.Require().Contains(err.Error(), "drain poll timed out") + s.Require().Equal(int64(0), atomic.LoadInt64(&mc.CutOverCompleteFlag), + "post-state: queued DML must keep T3 from reaching T4") + for _, c := range calls { + s.Require().NotEqual("fake:OnSuccess", c, "OnSuccess must not fire while backlog remains") + } + s.Require().Equal([]string{"fake:OnBeforeCutOver"}, calls, + "post-state: only T0 fires before the drain loop times out") +} + +func TestMoveTablesCutOver(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration suite in short mode") + } + suite.Run(t, new(MoveTablesCutOverSuite)) +} diff --git a/script/move-tables/README.md b/script/move-tables/README.md index 52d138d09..db57f750a 100644 --- a/script/move-tables/README.md +++ b/script/move-tables/README.md @@ -24,157 +24,10 @@ script/build --cli Run gh-ost to move tables: ```bash -./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3308 --user root --password opensesame 
--database=gh_ost_test_db --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=gh_ost_test_db --execute --verbose +./script/build --cli; ./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3307 --user root --password opensesame --database=gh_ost_test_db --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=gh_ost_test_db --postpone-cut-over-flag-file=/tmp/ghost-move-tables.postpone.flag --execute --verbose ``` -### WIP - -Current state based on the current outer dev loop: - - -```bash - - -\u2718 \e[2m\u2388 (\u2205) gh-ost:(move-tables/1.2-skip-ghost-tables) -> rm /tmp/gh-ost.gh_ost_test_db..sock; ./script/build --cli && ./bin/gh-ost --move-tables=gh_ost_test --host=localhost --port=3308 --user root --password opensesame --database=gh_ost_test_db --target-host=localhost --target-port=3309 --target-user root --target-password opensesame --target-database=gh_ost_test_db --execute --verbose -rm: /tmp/gh-ost.gh_ost_test_db..sock: No such file or directory -go version go1.25.9 darwin/arm64 found in : Go Binary: /opt/homebrew/bin/go -++ '[' '!' 
-L .gopath/src/github.com/github/gh-ost ']' -++ export GOPATH=/Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath:/Users/chriskirkland/git/src/github.com/github/gh-ost/.vendor -++ GOPATH=/Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath:/Users/chriskirkland/git/src/github.com/github/gh-ost/.vendor -+ mkdir -p bin -+ bindir=/Users/chriskirkland/git/src/github.com/github/gh-ost/bin -+ scriptdir=/Users/chriskirkland/git/src/github.com/github/gh-ost/script -++ git rev-parse HEAD -+ version=0508dd782e1871de9dcaa51d3f59e5ba4cd92117 -++ git describe --tags --always --dirty -+ describe=v1.1.9-19-g0508dd78-dirty -+ export GOPATH=/Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath -+ GOPATH=/Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath -+ cd .gopath/src/github.com/github/gh-ost -+ go build -o /Users/chriskirkland/git/src/github.com/github/gh-ost/bin/gh-ost -ldflags '-X main.AppVersion=0508dd782e1871de9dcaa51d3f59e5ba4cd92117 -X main.BuildDescribe=v1.1.9-19-g0508dd78-dirty' ./go/cmd/gh-ost/main.go -2026-06-01 16:41:13 INFO starting gh-ost 0508dd782e1871de9dcaa51d3f59e5ba4cd92117 (git commit: unknown) -2026-06-01 16:41:13 INFO Moving tables [gh_ost_test] from `gh_ost_test_db` to `gh_ost_test_db` (localhost) -2026-06-01 16:41:13 INFO inspector connection validated on localhost:3308 -2026-06-01 16:41:13 INFO User has SUPER, REPLICATION SLAVE privileges, and has ALL privileges on `gh_ost_test_db`.* -2026-06-01 16:41:13 INFO binary logs validated on localhost:3308 -2026-06-01 16:41:13 INFO Restarting replication on localhost:3308 to make sure binlog settings apply to replication thread -2026-06-01 16:41:13 INFO Inspector initiated on 3e162abb4a14:3308, version 8.0.41 -2026-06-01 16:41:13 INFO Inspector validating original table -2026-06-01 16:41:13 INFO Table found. 
Engine=InnoDB -2026-06-01 16:41:13 INFO Estimated number of rows via EXPLAIN: 20 -2026-06-01 16:41:13 INFO Inspector validated original table -2026-06-01 16:41:13 INFO Inspector inspected original table -2026-06-01 16:41:13 INFO log_slave_updates validated on localhost:3308 -2026-06-01 16:41:13 INFO Inspector validated and initialized -2026-06-01 16:41:13 INFO applier connection validated on localhost:3309 -2026-06-01 16:41:13 INFO applier connection validated on localhost:3309 -2026-06-01 16:41:13 INFO will use time_zone='SYSTEM' on applier -2026-06-01 16:41:13 INFO applier connection validated on localhost:3309 -2026-06-01 16:41:13 INFO Applier initiated on 381ee87dc2c6:3309, version 8.0.41 -2026-06-01 16:41:13 INFO Fetching create table statement for `gh_ost_test_db.gh_ost_test` -2026-06-01 16:41:13 INFO Create table statement: CREATE TABLE `gh_ost_test` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `column1` int NOT NULL, - `column2` smallint unsigned NOT NULL, - `column3` mediumint unsigned NOT NULL, - `column4` tinyint unsigned NOT NULL, - `column5` int NOT NULL, - `column6` int NOT NULL, - PRIMARY KEY (`id`), - KEY `c12_ix` (`column1`,`column2`) -) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci -2026-06-01 16:41:13 INFO Creating target table `gh_ost_test_db`.`gh_ost_test` -2026-06-01 16:41:13 INFO Target table created -2026-06-01 16:41:13 INFO streamer connection validated on localhost:3308 -[2026/06/01 16:41:13] [info] binlogsyncer.go:191 create BinlogSyncer with config {ServerID:99999 Flavor:mysql Host:localhost Port:3308 User:root Password: Localhost: Charset: SemiSyncEnabled:false RawModeEnabled:false TLSConfig: ParseTime:false TimestampStringLocation:UTC UseDecimal:true RecvBufferSize:0 HeartbeatPeriod:0s ReadTimeout:0s MaxReconnectAttempts:0 DisableRetrySync:false VerifyChecksum:false DumpCommandFlag:0 Option: Logger:0x14000494a20 Dialer:0x100c8c0c0 RowsEventDecodeFunc: TableMapOptionalMetaDecodeFunc: 
DiscardGTIDSet:false EventCacheCount:10240 SynchronousEventHandler:} -2026-06-01 16:41:13 INFO Connecting binlog streamer at mysql-bin.000003:2987908 -[2026/06/01 16:41:13] [info] binlogsyncer.go:443 begin to sync binlog from position (mysql-bin.000003, 2987908) -[2026/06/01 16:41:13] [info] binlogsyncer.go:409 Connected to mysql 8.0.41 server -2026-06-01 16:41:13 INFO Skipping stream of the changelog table [] -[2026/06/01 16:41:13] [info] binlogsyncer.go:868 rotate to (mysql-bin.000003, 2987908) -2026-06-01 16:41:13 INFO rotate to next log from mysql-bin.000003:0 to mysql-bin.000003 -2026-06-01 16:41:13 INFO Listening on unix socket file: /tmp/gh-ost.gh_ost_test_db..sock -2026-06-01 16:41:13 INFO Adding listener for gh_ost_test_db.gh_ost_test -2026-06-01 16:41:13 INFO Reading migration range according to key: PRIMARY ( - select /* gh-ost `gh_ost_test_db`.`gh_ost_test` */ `id` - from - `gh_ost_test_db`.`gh_ost_test` - force index (PRIMARY) - order by - `id` asc - limit 1) -2026-06-01 16:41:13 INFO Migration min values: [1] -2026-06-01 16:41:13 INFO Migration max values: [20] -2026-06-01 16:41:13 INFO Skipping throttling in move tables mode [] -# Migrating `gh_ost_test_db`.`gh_ost_test`; Target table is `gh_ost_test_db`.`gh_ost_test` -# Migrating 381ee87dc2c6:3309; inspecting 3e162abb4a14:3308; executing on Chriss-MBP-2 -# Migration started at Mon Jun 01 16:41:13 -0600 2026 -# chunk-size: 1000; max-lag-millis: 1500ms; dml-batch-size: 10; max-load: ; critical-load: ; nice-ratio: 0.000000 -# throttle-additional-flag-file: /tmp/gh-ost.throttle -# Serving on unix socket: /tmp/gh-ost.gh_ost_test_db..sock -Copy: 0/20 0.0%; Applied: 0; Backlog: 0/1000; Time: 0s(total), 0s(copy); streamer: mysql-bin.000003:0; Lag: 0.00s, HeartbeatLag: 9223372036.85s, State: migrating; ETA: N/A -2026-06-01 16:41:13 INFO Copy: 0/20 0.0%; Applied: 0; Backlog: 0/1000; Time: 0s(total), 0s(copy); streamer: mysql-bin.000003:0; Lag: 0.00s, HeartbeatLag: 9223372036.85s, State: migrating; ETA: N/A [] 
-Copy: 0/20 0.0%; Applied: 0; Backlog: 0/1000; Time: 1s(total), 1s(copy); streamer: mysql-bin.000003:0; Lag: 0.00s, HeartbeatLag: 9223372036.85s, State: migrating; ETA: N/A -2026-06-01 16:41:14 INFO [execWriteFuncs] Processing row copy function [] -2026-06-01 16:41:14 INFO Copy: 0/20 0.0%; Applied: 0; Backlog: 0/1000; Time: 1s(total), 1s(copy); streamer: mysql-bin.000003:0; Lag: 0.00s, HeartbeatLag: 9223372036.85s, State: migrating; ETA: N/A [] -2026-06-01 16:41:14 INFO ApplyIterationInsertQuery affected 20 rows -2026-06-01 16:41:14 INFO [execWriteFuncs] Processing row copy function [] -2026-06-01 16:41:14 INFO [execWriteFuncs] Processing row copy function [] -2026-06-01 16:41:14 INFO Row copy complete -2026-06-01 16:41:14 INFO Writing changelog state: Migrated -[2026/06/01 16:41:14] [info] binlogsyncer.go:225 syncer is closing... -2026-06-01 16:41:14 INFO StreamEvents encountered unexpected error: Sync was closed -github.com/go-mysql-org/go-mysql/replication.init - :1 -runtime.doInit1 - /Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath/pkg/mod/golang.org/toolchain@v0.0.1-go1.25.9.darwin-arm64/src/runtime/proc.go:7670 -runtime.doInit - /Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath/pkg/mod/golang.org/toolchain@v0.0.1-go1.25.9.darwin-arm64/src/runtime/proc.go:7637 -runtime.main - /Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath/pkg/mod/golang.org/toolchain@v0.0.1-go1.25.9.darwin-arm64/src/runtime/proc.go:256 -runtime.goexit - /Users/chriskirkland/git/src/github.com/github/gh-ost/.gopath/pkg/mod/golang.org/toolchain@v0.0.1-go1.25.9.darwin-arm64/src/runtime/asm_arm64.s:1268 -[2026/06/01 16:41:14] [info] binlogsyncer.go:988 kill last connection id 405 -[2026/06/01 16:41:14] [info] binlogsyncer.go:255 syncer is closed -2026-06-01 16:41:14 INFO Closed streamer connection. 
err= -2026-06-01 16:41:14 INFO Done moving tables [gh_ost_test] from `gh_ost_test_db` to `gh_ost_test_db` (localhost) -2026-06-01 16:41:14 INFO Removing socket file: /tmp/gh-ost.gh_ost_test_db..sock -2026-06-01 16:41:14 INFO Tearing down inspector -2026-06-01 16:41:14 INFO Tearing down applier -2026-06-01 16:41:14 INFO Tearing down streamer -# Done - -``` - -:tada: :tada: :tada: :tada: -```bash - -\u2714 \e[2m\u2388 (\u2205) gh-ost-tablemove-poc:(chriskirkland/move-tables) -> ./script/move-tables/mysql-target-primary -D "gh_ost_test_db" -e "SELECT * FROM gh_ost_test;" -+----+---------+---------+---------+---------+------------+------------+ -| id | column1 | column2 | column3 | column4 | column5 | column6 | -+----+---------+---------+---------+---------+------------+------------+ -| 1 | 1001 | 100 | 500000 | 10 | 1700000001 | 1700000002 | -| 2 | 1002 | 200 | 600000 | 20 | 1700000003 | 1700000004 | -| 3 | 1003 | 300 | 700000 | 30 | 1700000005 | 1700000006 | -| 4 | 1004 | 400 | 800000 | 40 | 1700000007 | 1700000008 | -| 5 | 1005 | 500 | 900000 | 50 | 1700000009 | 1700000010 | -| 6 | 1006 | 600 | 1000000 | 60 | 1700000011 | 1700000012 | -| 7 | 1007 | 700 | 1100000 | 70 | 1700000013 | 1700000014 | -| 8 | 1008 | 800 | 1200000 | 80 | 1700000015 | 1700000016 | -| 9 | 1009 | 900 | 1300000 | 90 | 1700000017 | 1700000018 | -| 10 | 1010 | 1000 | 1400000 | 100 | 1700000019 | 1700000020 | -| 11 | 1011 | 1100 | 1500000 | 110 | 1700000021 | 1700000022 | -| 12 | 1012 | 1200 | 1600000 | 120 | 1700000023 | 1700000024 | -| 13 | 1013 | 1300 | 1700000 | 130 | 1700000025 | 1700000026 | -| 14 | 1014 | 1400 | 1800000 | 140 | 1700000027 | 1700000028 | -| 15 | 1015 | 1500 | 1900000 | 150 | 1700000029 | 1700000030 | -| 16 | 1016 | 1600 | 2000000 | 160 | 1700000031 | 1700000032 | -| 17 | 1017 | 1700 | 2100000 | 170 | 1700000033 | 1700000034 | -| 18 | 1018 | 1800 | 2200000 | 180 | 1700000035 | 1700000036 | -| 19 | 1019 | 1900 | 2300000 | 190 | 1700000037 | 1700000038 | -| 20 | 1020 | 2000 | 
2400000 | 200 | 1700000039 | 1700000040 | -+----+---------+---------+---------+---------+------------+------------+ - -``` \ No newline at end of file +Note: replicas in this local topology are configured with `read_only=ON` and +`super_read_only=ON`. If you point `--host` at `mysql-source-replica` (3308), +the cutover `RENAME TABLE` step will fail by design. Use source primary (3307) +as the inspected host when you want cutover to rename on source. diff --git a/script/move-tables/insert-source-primary-loop b/script/move-tables/insert-source-primary-loop new file mode 100755 index 000000000..c5571ff1c --- /dev/null +++ b/script/move-tables/insert-source-primary-loop @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Continuously insert new rows into gh_ost_test on source primary. +# Usage: +# script/move-tables/insert-source-primary-loop [start_column1] [sleep_seconds] +# Example: +# script/move-tables/insert-source-primary-loop 100000 0.2 + +start_i="${1:-100000}" +delay="${2:-0.2}" +i="$start_i" + +echo "Starting continuous inserts on source primary. Press Ctrl+C to stop." 
+echo "start_column1=$start_i sleep_seconds=$delay" + +trap 'echo; echo "Stopped."; exit 0' INT TERM + +while true; do + ts="$(date +%s)" + + script/move-tables/mysql-source-primary -D gh_ost_test_db -e " + INSERT INTO gh_ost_test (column1, column2, column3, column4, column5, column6) + VALUES ($i, $((i % 65535)), $((i % 16777215)), $((i % 255)), $ts, $((ts + 1))); + " + + echo "inserted row: column1=$i ts=$ts" + i=$((i + 1)) + sleep "$delay" +done diff --git a/script/move-tables/setup b/script/move-tables/setup index 05fd1ece9..813e7203e 100755 --- a/script/move-tables/setup +++ b/script/move-tables/setup @@ -98,6 +98,9 @@ setup() { exec-mysql-source-replica -e "change master to master_host='mysql-source-primary', master_port=3307, master_user='repl', master_password='repl', master_auto_position=1;" exec-mysql-source-replica -e "start slave;" fi + # Keep replicas non-writable so local move-tables runs that point --host at + # a replica fail at cutover (RENAME) instead of mutating only the replica. + exec-mysql-source-replica -e "set global read_only=ON; set global super_read_only=ON;" echo "OK" # Setup replication for target cluster @@ -115,6 +118,7 @@ setup() { exec-mysql-target-replica -e "change master to master_host='mysql-target-primary', master_port=3309, master_user='repl', master_password='repl', master_auto_position=1;" exec-mysql-target-replica -e "start slave;" fi + exec-mysql-target-replica -e "set global read_only=ON; set global super_read_only=ON;" echo "OK" echo -n "Initializing '$DATABASE_NAME' database into each cluster..."