Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,12 @@ func (mctx *MigrationContext) SetCutOverLockTimeoutSeconds(timeoutSeconds int64)
if timeoutSeconds < 1 {
return fmt.Errorf("minimal timeout is 1sec. Timeout remains at %d", mctx.CutOverLockTimeoutSeconds)
}
if timeoutSeconds > 10 {
return fmt.Errorf("maximal timeout is 10sec. Timeout remains at %d", mctx.CutOverLockTimeoutSeconds)
maxTimeout := int64(10)
if mctx.IsMoveTablesMode() {
maxTimeout = 60
}
if timeoutSeconds > maxTimeout {
return fmt.Errorf("maximal timeout is %dsec. Timeout remains at %d", maxTimeout, mctx.CutOverLockTimeoutSeconds)
}
mctx.CutOverLockTimeoutSeconds = timeoutSeconds
return nil
Comment thread
zacharysierakowski marked this conversation as resolved.
Expand Down
19 changes: 19 additions & 0 deletions go/base/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,22 @@ func TestSetAbortError_ThreadSafe(t *testing.T) {
t.Errorf("Stored error %v not in list of sent errors", got)
}
}

func TestSetCutOverLockTimeoutSecondsRangeByMode(t *testing.T) {
{
ctx := NewMigrationContext()
require.NoError(t, ctx.SetCutOverLockTimeoutSeconds(10))
err := ctx.SetCutOverLockTimeoutSeconds(11)
require.Error(t, err)
require.Contains(t, err.Error(), "maximal timeout is 10sec")
}

{
ctx := NewMigrationContext()
ctx.MoveTables.TableNames = []string{"tbl"}
require.NoError(t, ctx.SetCutOverLockTimeoutSeconds(60))
err := ctx.SetCutOverLockTimeoutSeconds(61)
require.Error(t, err)
require.Contains(t, err.Error(), "maximal timeout is 60sec")
}
}
12 changes: 12 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ func main() {

flag.CommandLine.SetOutput(os.Stdout)
flag.Parse()
cutOverLockTimeoutUserSpecified := false
flag.Visit(func(f *flag.Flag) {
if f.Name == "cut-over-lock-timeout-seconds" {
cutOverLockTimeoutUserSpecified = true
}
})

if *checkFlag {
return
Expand Down Expand Up @@ -364,6 +370,9 @@ func main() {
if migrationContext.MoveTables.TargetHost == "" {
log.Fatal("--target-host must be specified when using --move-tables")
}
if migrationContext.PostponeCutOverFlagFile == "" {
log.Fatal("--postpone-cut-over-flag-file must be specified when using --move-tables")
}
migrationContext.MoveTables.TableNames = strings.Split(*moveTables, ",")
for i := range migrationContext.MoveTables.TableNames {
migrationContext.MoveTables.TableNames[i] = strings.TrimSpace(migrationContext.MoveTables.TableNames[i])
Expand All @@ -384,6 +393,9 @@ func main() {
if migrationContext.MoveTables.TargetDatabase == "" {
migrationContext.MoveTables.TargetDatabase = migrationContext.DatabaseName
}
if !cutOverLockTimeoutUserSpecified {
*cutOverLockTimeoutSeconds = 60
}
migrationContext.MoveTables.ConnectionConfig = mysql.NewConnectionConfig()
}

Expand Down
181 changes: 177 additions & 4 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ var (
ErrMigrationNotAllowedOnMaster = errors.New("it seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (this reduces load from the master). To proceed please provide --allow-on-master")
RetrySleepFn = time.Sleep
checkpointTimeout = 2 * time.Second

// moveTablesCutOverDrainPollInterval is the per-iteration sleep in T3's
// drain poll. 100ms per move_table_mode.md §1.5.
moveTablesCutOverDrainPollInterval = 100 * time.Millisecond
Comment thread
zacharysierakowski marked this conversation as resolved.
Comment thread
zacharysierakowski marked this conversation as resolved.
)

type ChangelogState string
Expand Down Expand Up @@ -838,6 +842,15 @@ func (mgtr *Migrator) MoveTables() (err error) {
if err := mgtr.initiateApplier(); err != nil {
return err
}
if err := mgtr.checkAbort(); err != nil {
return err
}
if err := mgtr.createFlagFiles(); err != nil {
return err
}
if err := mgtr.checkAbort(); err != nil {
return err
}
if err := mgtr.initiateStreaming(); err != nil {
return err
}
Expand Down Expand Up @@ -899,14 +912,13 @@ func (mgtr *Migrator) MoveTables() (err error) {
return err
}

//TODO: cutover here
if err := mgtr.moveTablesCutOver(); err != nil {
return err
}

if err := mgtr.finalCleanup(); err != nil {
return nil
}
Comment thread
zacharysierakowski marked this conversation as resolved.
if err := mgtr.hooksExecutor.OnSuccess(false); err != nil {
return err
}
mgtr.migrationContext.Log.Infof("Done moving tables %v from %s to %s (%s)",
mgtr.migrationContext.MoveTables.TableNames, sql.EscapeName(mgtr.migrationContext.DatabaseName),
sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), mgtr.migrationContext.MoveTables.TargetHost)
Expand All @@ -917,6 +929,167 @@ func (mgtr *Migrator) MoveTables() (err error) {
return nil
}

// moveTablesCutOver orchestrates the cooperative cutover protocol for move-tables
// mode. It implements the T0-T6 transitions described in
// docs/learning/design-refs/coop_cutover.md §1.3.
Comment thread
zacharysierakowski marked this conversation as resolved.
//
// NOT the standard cutOver() path: every internal call from cutOver() (throttle,
// atomicCutOver, waitForEventsUpToLock, heartbeat-lag) was built on a
// single-server assumption that no longer holds when the applier writes target
// and the streamer reads source. Each is replaced or dropped here.
//
// Crash safety (persisting the drain GTID before T3) is #8210. Enriched hook
// env vars (GH_OST_DRAIN_GTID, GH_OST_TARGET_*) are #8211. Target-side
// throttling is #8212. None of those are wired here.
func (mgtr *Migrator) moveTablesCutOver() (err error) {
if mgtr.migrationContext.Noop {
mgtr.migrationContext.Log.Debugf("Noop operation; not really moving tables")
return nil
}

// ----- Postpone gate (precedes T0) -----
// Mirrors standard cutOver()'s sleepWhileTrue postpone structure but DROPS the
// heartbeat-lag branch: move-tables mode disables _ghc heartbeat writes (#8206),
// so TimeSinceLastHeartbeatOnChangelog() returns ~58 years (time.Since(zero))
// and would deadlock the gate forever. KEEPS the postpone-flag-file +
// unpostpone-socket gate because per coop_cutover.md §1.1 P4, operator-removes-
// postpone is the trigger for the entire cutover phase.
mgtr.migrationContext.Log.Debugf("checking for cut-over postpone")
if err := mgtr.sleepWhileTrue(func() (bool, error) {
if mgtr.migrationContext.PostponeCutOverFlagFile == "" {
Comment thread
zacharysierakowski marked this conversation as resolved.
return false, nil
}
if atomic.LoadInt64(&mgtr.migrationContext.UserCommandedUnpostponeFlag) > 0 {
atomic.StoreInt64(&mgtr.migrationContext.UserCommandedUnpostponeFlag, 0)
return false, nil
}
if base.FileExists(mgtr.migrationContext.PostponeCutOverFlagFile) {
if atomic.LoadInt64(&mgtr.migrationContext.IsPostponingCutOver) == 0 {
if err := mgtr.hooksExecutor.OnBeginPostponed(); err != nil {
return true, err
}
}
atomic.StoreInt64(&mgtr.migrationContext.IsPostponingCutOver, 1)
return true, nil
}
return false, nil
}); err != nil {
return err
}
atomic.StoreInt64(&mgtr.migrationContext.IsPostponingCutOver, 0)
mgtr.migrationContext.Log.Debugf("checking for cut-over postpone: complete")

// ----- T0: on-before-cut-over hook -----
// Non-zero hook exit aborts cutover BEFORE any source DDL fires.
if err := mgtr.hooksExecutor.OnBeforeCutOver(); err != nil {
return fmt.Errorf("on-before-cut-over hook failed: %w", err)
}

// ----- T1 + T2: RENAME then capture @@gtid_executed on the same connection -----
// Pin both operations to a single *sql.Conn so MySQL's within-session
// ordering guarantee makes it impossible for T2 to observe a state that
// pre-dates T1's commit. Using mgtr.inspector.db directly would let the
// pool schedule T1 and T2 on different underlying TCP connections (or, with
// a proxy, different servers), breaking the happens-before relationship.
//
// No retry on the RENAME: it is not idempotent — a partial success leaves
// the table already renamed and a retry would fail. The operator re-runs
// the whole hook chain on failure.
pinnedConn, err := mgtr.inspector.db.Conn(context.Background())
if err != nil {
return fmt.Errorf("failed to pin connection for T1/T2: %w", err)
}
defer pinnedConn.Close()

sourceDB := mgtr.migrationContext.DatabaseName
sourceTable := mgtr.migrationContext.OriginalTableName
delTable := mgtr.migrationContext.GetOldTableName()
renameQuery := fmt.Sprintf("RENAME TABLE %s.%s TO %s.%s",
sql.EscapeName(sourceDB), sql.EscapeName(sourceTable),
sql.EscapeName(sourceDB), sql.EscapeName(delTable))
mgtr.migrationContext.Log.Infof("T1: renaming source table: %s", renameQuery)
if _, err := pinnedConn.ExecContext(context.Background(), renameQuery); err != nil {
return fmt.Errorf("RENAME failed: %w", err)
}

// ----- T2: capture @@gtid_executed on the SAME connection as T1 -----
// @@GLOBAL scope is explicit so the intent is unambiguous in the SQL itself.
// Design: https://github.com/github/gh-ost-tablemove-poc/blob/9dc6df75c4c88ff473906a497836c7518f5614ec/design/coop_cutover.md#32-correctness-verification-for-p4
var drainGTIDStr string
if err := pinnedConn.QueryRowContext(context.Background(), "select @@gtid_executed").Scan(&drainGTIDStr); err != nil {
Comment thread
zacharysierakowski marked this conversation as resolved.
return fmt.Errorf("drain GTID capture failed: %w", err)
}
drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr)
if err != nil {
return fmt.Errorf("drain GTID parse failed: %w", err)
}
mgtr.migrationContext.Log.Infof("T2: captured drain GTID: %s", drainGTID.DisplayString())

// ----- T3: drain poll -----
// Wait until applier.CurrentCoordinates catches up to drainGTID. The drain
// is complete when the applier's coords are not strictly smaller than the
// drain target (i.e. the applier contains every GTID in drainGTID). Reads
// of CurrentCoordinates hold the mutex per applier.go:75. Per-iteration
// logging is Debug only to avoid spamming Info on a hot loop.
drainTimeout := time.Duration(mgtr.migrationContext.CutOverLockTimeoutSeconds) * time.Second
mgtr.migrationContext.Log.Infof("T3: draining applier to drain GTID (timeout %s, poll %s)",
drainTimeout, moveTablesCutOverDrainPollInterval)
drainCtx, cancel := context.WithTimeout(context.Background(), drainTimeout)
defer cancel()
ticker := time.NewTicker(moveTablesCutOverDrainPollInterval)
defer ticker.Stop()
for {
if err := mgtr.checkAbort(); err != nil {
return err
}
mgtr.applier.CurrentCoordinatesMutex.Lock()
applierCoords := mgtr.applier.CurrentCoordinates
mgtr.applier.CurrentCoordinatesMutex.Unlock()
applyBacklog := len(mgtr.applyEventsQueue)
streamerBacklog := 0
if mgtr.eventsStreamer != nil {
streamerBacklog = len(mgtr.eventsStreamer.eventsChannel)
}
if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) && applyBacklog == 0 && streamerBacklog == 0 {
mgtr.migrationContext.Log.Infof("T3: drain complete; applier caught up to drain GTID")
break
}
if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) {
mgtr.migrationContext.Log.Debugf("T3: drain GTID reached but backlog remains (apply=%d, streamer=%d)", applyBacklog, streamerBacklog)
} else {
mgtr.migrationContext.Log.Debugf("T3: applier still behind drain GTID, polling")
}
select {
case <-drainCtx.Done():
return fmt.Errorf("drain poll timed out after %s: applier did not catch up to drain GTID", drainTimeout)
case <-ticker.C:
// next iteration
}
}

// ----- T4: set CutOverCompleteFlag -----
// MUST be set before T5 so the streamer's canStopStreaming loop (migrator.go:256)
// can wind down in parallel with the (potentially slow) on-success hook.
// Forgetting this has no visible failure at the call site — the run silently
// hangs after cutover because eventsStreamer.StreamEvents() never returns.
atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1)
mgtr.migrationContext.Log.Debugf("T4: CutOverCompleteFlag set")

// ----- T5: on-success hook -----
// Hook unlocks user_rw@target via db-user-management and flips the

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should revise these comments before merging this feature branch to the main branch.
They contain some info and URLs that are only relevant/accessible to GitHub internal users.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I mentioned the same thing!

// write_cutover? feature flag. Standard env vars only — GH_OST_DRAIN_GTID +
// GH_OST_TARGET_* are #8211 (1.7), not this PR. The pre-protocol placeholder
// OnSuccess call that used to live in MoveTables() (after finalCleanup) has
// been removed so the hook fires in the order coop_cutover.md §3.2 step 6
// requires (T5 between T4 and T6, BEFORE finalCleanup).
if err := mgtr.hooksExecutor.OnSuccess(false); err != nil {
return fmt.Errorf("on-success hook failed: %w", err)
}

// ----- T6: return nil -----
return nil
}

// ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external
// hook access point
func (mgtr *Migrator) ExecOnFailureHook() (err error) {
Expand Down
Loading
Loading