-
Notifications
You must be signed in to change notification settings - Fork 1.4k
feat(move-tables): build cooperative cutover orchestration (#8209) #1704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2db5332
60acf2c
54652ab
4aecc93
279b5a3
3047cda
287e8a9
17e3e47
732581d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,10 @@ var ( | |
| ErrMigrationNotAllowedOnMaster = errors.New("it seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (this reduces load from the master). To proceed please provide --allow-on-master") | ||
| RetrySleepFn = time.Sleep | ||
| checkpointTimeout = 2 * time.Second | ||
|
|
||
| // moveTablesCutOverDrainPollInterval is the per-iteration sleep in T3's | ||
| // drain poll. 100ms per move_table_mode.md §1.5. | ||
| moveTablesCutOverDrainPollInterval = 100 * time.Millisecond | ||
|
zacharysierakowski marked this conversation as resolved.
zacharysierakowski marked this conversation as resolved.
|
||
| ) | ||
|
|
||
| type ChangelogState string | ||
|
|
@@ -838,6 +842,15 @@ func (mgtr *Migrator) MoveTables() (err error) { | |
| if err := mgtr.initiateApplier(); err != nil { | ||
| return err | ||
| } | ||
| if err := mgtr.checkAbort(); err != nil { | ||
| return err | ||
| } | ||
| if err := mgtr.createFlagFiles(); err != nil { | ||
| return err | ||
| } | ||
| if err := mgtr.checkAbort(); err != nil { | ||
| return err | ||
| } | ||
| if err := mgtr.initiateStreaming(); err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -899,14 +912,13 @@ func (mgtr *Migrator) MoveTables() (err error) { | |
| return err | ||
| } | ||
|
|
||
| //TODO: cutover here | ||
| if err := mgtr.moveTablesCutOver(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err := mgtr.finalCleanup(); err != nil { | ||
| return nil | ||
| } | ||
|
zacharysierakowski marked this conversation as resolved.
|
||
| if err := mgtr.hooksExecutor.OnSuccess(false); err != nil { | ||
| return err | ||
| } | ||
| mgtr.migrationContext.Log.Infof("Done moving tables %v from %s to %s (%s)", | ||
| mgtr.migrationContext.MoveTables.TableNames, sql.EscapeName(mgtr.migrationContext.DatabaseName), | ||
| sql.EscapeName(mgtr.migrationContext.GetTargetDatabaseName()), mgtr.migrationContext.MoveTables.TargetHost) | ||
|
|
@@ -917,6 +929,167 @@ func (mgtr *Migrator) MoveTables() (err error) { | |
| return nil | ||
| } | ||
|
|
||
| // moveTablesCutOver orchestrates the cooperative cutover protocol for move-tables | ||
| // mode. It implements the T0-T6 transitions described in | ||
| // docs/learning/design-refs/coop_cutover.md §1.3. | ||
|
zacharysierakowski marked this conversation as resolved.
|
||
| // | ||
| // NOT the standard cutOver() path: every internal call from cutOver() (throttle, | ||
| // atomicCutOver, waitForEventsUpToLock, heartbeat-lag) was built on a | ||
| // single-server assumption that no longer holds when the applier writes target | ||
| // and the streamer reads source. Each is replaced or dropped here. | ||
| // | ||
| // Crash safety (persisting the drain GTID before T3) is #8210. Enriched hook | ||
| // env vars (GH_OST_DRAIN_GTID, GH_OST_TARGET_*) are #8211. Target-side | ||
| // throttling is #8212. None of those are wired here. | ||
| func (mgtr *Migrator) moveTablesCutOver() (err error) { | ||
| if mgtr.migrationContext.Noop { | ||
| mgtr.migrationContext.Log.Debugf("Noop operation; not really moving tables") | ||
| return nil | ||
| } | ||
|
|
||
| // ----- Postpone gate (precedes T0) ----- | ||
| // Mirrors standard cutOver()'s sleepWhileTrue postpone structure but DROPS the | ||
| // heartbeat-lag branch: move-tables mode disables _ghc heartbeat writes (#8206), | ||
| // so TimeSinceLastHeartbeatOnChangelog() returns ~58 years (time.Since(zero)) | ||
| // and would deadlock the gate forever. KEEPS the postpone-flag-file + | ||
| // unpostpone-socket gate because per coop_cutover.md §1.1 P4, operator-removes- | ||
| // postpone is the trigger for the entire cutover phase. | ||
| mgtr.migrationContext.Log.Debugf("checking for cut-over postpone") | ||
| if err := mgtr.sleepWhileTrue(func() (bool, error) { | ||
| if mgtr.migrationContext.PostponeCutOverFlagFile == "" { | ||
|
zacharysierakowski marked this conversation as resolved.
|
||
| return false, nil | ||
| } | ||
| if atomic.LoadInt64(&mgtr.migrationContext.UserCommandedUnpostponeFlag) > 0 { | ||
| atomic.StoreInt64(&mgtr.migrationContext.UserCommandedUnpostponeFlag, 0) | ||
| return false, nil | ||
| } | ||
| if base.FileExists(mgtr.migrationContext.PostponeCutOverFlagFile) { | ||
| if atomic.LoadInt64(&mgtr.migrationContext.IsPostponingCutOver) == 0 { | ||
| if err := mgtr.hooksExecutor.OnBeginPostponed(); err != nil { | ||
| return true, err | ||
| } | ||
| } | ||
| atomic.StoreInt64(&mgtr.migrationContext.IsPostponingCutOver, 1) | ||
| return true, nil | ||
| } | ||
| return false, nil | ||
| }); err != nil { | ||
| return err | ||
| } | ||
| atomic.StoreInt64(&mgtr.migrationContext.IsPostponingCutOver, 0) | ||
| mgtr.migrationContext.Log.Debugf("checking for cut-over postpone: complete") | ||
|
|
||
| // ----- T0: on-before-cut-over hook ----- | ||
| // Non-zero hook exit aborts cutover BEFORE any source DDL fires. | ||
| if err := mgtr.hooksExecutor.OnBeforeCutOver(); err != nil { | ||
| return fmt.Errorf("on-before-cut-over hook failed: %w", err) | ||
| } | ||
|
|
||
| // ----- T1 + T2: RENAME then capture @@gtid_executed on the same connection ----- | ||
| // Pin both operations to a single *sql.Conn so MySQL's within-session | ||
| // ordering guarantee makes it impossible for T2 to observe a state that | ||
| // pre-dates T1's commit. Using mgtr.inspector.db directly would let the | ||
| // pool schedule T1 and T2 on different underlying TCP connections (or, with | ||
| // a proxy, different servers), breaking the happens-before relationship. | ||
| // | ||
| // No retry on the RENAME: it is not idempotent — a partial success leaves | ||
| // the table already renamed and a retry would fail. The operator re-runs | ||
| // the whole hook chain on failure. | ||
| pinnedConn, err := mgtr.inspector.db.Conn(context.Background()) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to pin connection for T1/T2: %w", err) | ||
| } | ||
| defer pinnedConn.Close() | ||
|
|
||
| sourceDB := mgtr.migrationContext.DatabaseName | ||
| sourceTable := mgtr.migrationContext.OriginalTableName | ||
| delTable := mgtr.migrationContext.GetOldTableName() | ||
| renameQuery := fmt.Sprintf("RENAME TABLE %s.%s TO %s.%s", | ||
| sql.EscapeName(sourceDB), sql.EscapeName(sourceTable), | ||
| sql.EscapeName(sourceDB), sql.EscapeName(delTable)) | ||
| mgtr.migrationContext.Log.Infof("T1: renaming source table: %s", renameQuery) | ||
| if _, err := pinnedConn.ExecContext(context.Background(), renameQuery); err != nil { | ||
| return fmt.Errorf("RENAME failed: %w", err) | ||
| } | ||
|
|
||
| // ----- T2: capture @@gtid_executed on the SAME connection as T1 ----- | ||
| // @@GLOBAL scope is explicit so the intent is unambiguous in the SQL itself. | ||
| // Design: https://github.com/github/gh-ost-tablemove-poc/blob/9dc6df75c4c88ff473906a497836c7518f5614ec/design/coop_cutover.md#32-correctness-verification-for-p4 | ||
| var drainGTIDStr string | ||
| if err := pinnedConn.QueryRowContext(context.Background(), "select @@gtid_executed").Scan(&drainGTIDStr); err != nil { | ||
|
zacharysierakowski marked this conversation as resolved.
|
||
| return fmt.Errorf("drain GTID capture failed: %w", err) | ||
| } | ||
| drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr) | ||
| if err != nil { | ||
| return fmt.Errorf("drain GTID parse failed: %w", err) | ||
| } | ||
| mgtr.migrationContext.Log.Infof("T2: captured drain GTID: %s", drainGTID.DisplayString()) | ||
|
|
||
| // ----- T3: drain poll ----- | ||
| // Wait until applier.CurrentCoordinates catches up to drainGTID. The drain | ||
| // is complete when the applier's coords are not strictly smaller than the | ||
| // drain target (i.e. the applier contains every GTID in drainGTID). Reads | ||
| // of CurrentCoordinates hold the mutex per applier.go:75. Per-iteration | ||
| // logging is Debug only to avoid spamming Info on a hot loop. | ||
| drainTimeout := time.Duration(mgtr.migrationContext.CutOverLockTimeoutSeconds) * time.Second | ||
| mgtr.migrationContext.Log.Infof("T3: draining applier to drain GTID (timeout %s, poll %s)", | ||
| drainTimeout, moveTablesCutOverDrainPollInterval) | ||
| drainCtx, cancel := context.WithTimeout(context.Background(), drainTimeout) | ||
| defer cancel() | ||
| ticker := time.NewTicker(moveTablesCutOverDrainPollInterval) | ||
| defer ticker.Stop() | ||
| for { | ||
| if err := mgtr.checkAbort(); err != nil { | ||
| return err | ||
| } | ||
| mgtr.applier.CurrentCoordinatesMutex.Lock() | ||
| applierCoords := mgtr.applier.CurrentCoordinates | ||
| mgtr.applier.CurrentCoordinatesMutex.Unlock() | ||
| applyBacklog := len(mgtr.applyEventsQueue) | ||
| streamerBacklog := 0 | ||
| if mgtr.eventsStreamer != nil { | ||
| streamerBacklog = len(mgtr.eventsStreamer.eventsChannel) | ||
| } | ||
| if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) && applyBacklog == 0 && streamerBacklog == 0 { | ||
| mgtr.migrationContext.Log.Infof("T3: drain complete; applier caught up to drain GTID") | ||
| break | ||
| } | ||
| if applierCoords != nil && !applierCoords.IsEmpty() && !applierCoords.SmallerThan(drainGTID) { | ||
| mgtr.migrationContext.Log.Debugf("T3: drain GTID reached but backlog remains (apply=%d, streamer=%d)", applyBacklog, streamerBacklog) | ||
| } else { | ||
| mgtr.migrationContext.Log.Debugf("T3: applier still behind drain GTID, polling") | ||
| } | ||
| select { | ||
| case <-drainCtx.Done(): | ||
| return fmt.Errorf("drain poll timed out after %s: applier did not catch up to drain GTID", drainTimeout) | ||
| case <-ticker.C: | ||
| // next iteration | ||
| } | ||
| } | ||
|
|
||
| // ----- T4: set CutOverCompleteFlag ----- | ||
| // MUST be set before T5 so the streamer's canStopStreaming loop (migrator.go:256) | ||
| // can wind down in parallel with the (potentially slow) on-success hook. | ||
| // Forgetting this has no visible failure at the call site — the run silently | ||
| // hangs after cutover because eventsStreamer.StreamEvents() never returns. | ||
| atomic.StoreInt64(&mgtr.migrationContext.CutOverCompleteFlag, 1) | ||
| mgtr.migrationContext.Log.Debugf("T4: CutOverCompleteFlag set") | ||
|
|
||
| // ----- T5: on-success hook ----- | ||
| // Hook unlocks user_rw@target via db-user-management and flips the | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should revise these comments before merging this feature branch to the main branch. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, I mentioned the same thing! |
||
| // write_cutover? feature flag. Standard env vars only — GH_OST_DRAIN_GTID + | ||
| // GH_OST_TARGET_* are #8211 (1.7), not this PR. The pre-protocol placeholder | ||
| // OnSuccess call that used to live in MoveTables() (after finalCleanup) has | ||
| // been removed so the hook fires in the order coop_cutover.md §3.2 step 6 | ||
| // requires (T5 between T4 and T6, BEFORE finalCleanup). | ||
| if err := mgtr.hooksExecutor.OnSuccess(false); err != nil { | ||
| return fmt.Errorf("on-success hook failed: %w", err) | ||
| } | ||
|
|
||
| // ----- T6: return nil ----- | ||
| return nil | ||
| } | ||
|
|
||
| // ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external | ||
| // hook access point | ||
| func (mgtr *Migrator) ExecOnFailureHook() (err error) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.