Skip to content

Commit ad31059

Browse files
authored
replay: use start-time config to start replay later, and add dryrun argument for api. (#1003)
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
1 parent 085a334 commit ad31059

File tree

4 files changed

+79
-8
lines changed

4 files changed

+79
-8
lines changed

cmd/replayer/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func main() {
5959
outputPath := rootCmd.PersistentFlags().String("output-path", "", "the file path to store replayed sql. Empty indicates do not output replayed sql.")
6060
serviceMode := rootCmd.PersistentFlags().Bool("service-mode", false, "run replayer in service mode")
6161
logLevel := rootCmd.PersistentFlags().String("log-level", "info", "the log level: debug, info, warn, error, dpanic, panic, fatal")
62+
startTime := rootCmd.PersistentFlags().Time("start-time", time.Now(), []string{time.RFC3339, time.RFC3339Nano}, "the time to start the replay. Format is RFC3339. Default is the current time.")
6263

6364
rootCmd.RunE = func(cmd *cobra.Command, _ []string) error {
6465
// set up general managers
@@ -134,7 +135,7 @@ func main() {
134135
Password: *password,
135136
Format: *format,
136137
ReadOnly: *readonly,
137-
StartTime: time.Now(),
138+
StartTime: *startTime,
138139
CommandStartTime: *cmdStartTime,
139140
CommandEndTime: *cmdEndTime,
140141
IgnoreErrs: *ignoreErrs,

pkg/server/api/traffic.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func (h *Server) TrafficReplay(c *gin.Context) {
155155
}
156156
cfg.OutputPath = c.PostForm("outputpath")
157157
cfg.Addr = c.PostForm("addr")
158+
cfg.DryRun = strings.EqualFold(c.PostForm("dryrun"), "true")
158159
h.lg.Info("request: traffic replay", zap.Any("cfg", cfg))
159160

160161
if err := h.mgr.ReplayJobMgr.StartReplay(cfg); err != nil {

pkg/sqlreplay/replay/replay.go

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,6 @@ func (cfg *ReplayConfig) Validate() ([]storage.ExternalStorage, error) {
176176
now := time.Now()
177177
if cfg.StartTime.IsZero() {
178178
return storages, errors.New("start time is not specified")
179-
} else if now.Add(time.Minute).Before(cfg.StartTime) {
180-
return storages, errors.New("start time should not be in the future")
181179
} else if cfg.StartTime.Add(time.Minute).Before(now) {
182180
return storages, errors.New("start time should not be in the past")
183181
}
@@ -269,7 +267,7 @@ type replay struct {
269267
exceptionCh chan conn.Exception
270268
closeConnCh chan uint64
271269
execInfoCh chan conn.ExecInfo
272-
gracefulStop atomic.Bool
270+
gracefulStopCh chan struct{}
273271
wg waitgroup.WaitGroup
274272
cancel context.CancelFunc
275273
connCreator conn.ConnCreator
@@ -309,7 +307,7 @@ func (r *replay) Start(cfg ReplayConfig, backendTLSConfig *tls.Config, hsHandler
309307
r.cfg = cfg
310308
r.storages = storages
311309
r.meta = *r.readMeta()
312-
r.gracefulStop.Store(false)
310+
r.gracefulStopCh = make(chan struct{})
313311
r.startTime = cfg.StartTime
314312
r.endTime = time.Time{}
315313
r.progress = 0
@@ -395,7 +393,45 @@ func (r *replay) Start(cfg ReplayConfig, backendTLSConfig *tls.Config, hsHandler
395393
return nil
396394
}
397395

396+
// waitUntilStartTime waits until the configured start time and returns whether the replay has stopped
397+
func (r *replay) waitUntilStartTime(ctx context.Context) bool {
398+
waitDuration := time.Until(r.cfg.StartTime)
399+
if waitDuration > 0 {
400+
r.lg.Info("wait until the specified time to start replaying", zap.Time("until", r.cfg.StartTime),
401+
zap.Duration("duration", waitDuration))
402+
select {
403+
case <-ctx.Done():
404+
return true
405+
case <-time.After(waitDuration):
406+
return false
407+
case <-r.gracefulStopCh:
408+
return true
409+
}
410+
}
411+
412+
return false
413+
}
414+
415+
// isGracefulStopped returns whether the replay is gracefully stopped.
416+
func (r *replay) isGracefulStopped() bool {
417+
select {
418+
case _, ok := <-r.gracefulStopCh:
419+
if !ok {
420+
return true
421+
} else {
422+
return false
423+
}
424+
default:
425+
return false
426+
}
427+
}
428+
398429
func (r *replay) readCommands(ctx context.Context) {
430+
if r.waitUntilStartTime(ctx) {
431+
r.stop(nil)
432+
return
433+
}
434+
399435
var decoder decoder
400436
var err error
401437

@@ -429,7 +465,7 @@ func (r *replay) readCommands(ctx context.Context) {
429465
connCount := 0 // alive connection count
430466
maxPendingCmds := int64(0)
431467
extraWaitTime := time.Duration(0)
432-
for ctx.Err() == nil && !r.gracefulStop.Load() {
468+
for ctx.Err() == nil && !r.isGracefulStopped() {
433469
for hasCloseEvent := true; hasCloseEvent; {
434470
select {
435471
case id := <-r.closeConnCh:
@@ -501,6 +537,7 @@ func (r *replay) readCommands(ctx context.Context) {
501537
select {
502538
case <-ctx.Done():
503539
case <-time.After(expectedInterval):
540+
case <-r.gracefulStopCh:
504541
}
505542
}
506543
}
@@ -513,7 +550,7 @@ func (r *replay) readCommands(ctx context.Context) {
513550
zap.Int("alive_conns", connCount),
514551
zap.Time("last_cmd_start_ts", time.Unix(0, r.replayStats.CurCmdTs.Load())),
515552
zap.Time("last_cmd_end_ts", time.Unix(0, r.replayStats.CurCmdEndTs.Load())),
516-
zap.Bool("graceful_stop", r.gracefulStop.Load()),
553+
zap.Bool("graceful_stop", r.isGracefulStopped()),
517554
zap.Error(err),
518555
zap.NamedError("ctx_err", ctx.Err()))
519556

@@ -980,6 +1017,10 @@ func (r *replay) Wait() {
9801017
r.wg.Wait()
9811018
}
9821019

1020+
func (r *replay) gracefulStop() {
1021+
close(r.gracefulStopCh)
1022+
}
1023+
9831024
func (r *replay) Stop(err error, graceful bool) {
9841025
r.Lock()
9851026
// already stopped
@@ -989,7 +1030,7 @@ func (r *replay) Stop(err error, graceful bool) {
9891030
}
9901031
r.err = err
9911032
if graceful {
992-
r.gracefulStop.Store(true)
1033+
r.gracefulStop()
9931034
} else if r.cancel != nil {
9941035
r.cancel()
9951036
r.cancel = nil

pkg/sqlreplay/replay/replay_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,3 +1102,31 @@ func TestDynamicInputCoverAllDirectories(t *testing.T) {
11021102
}
11031103
require.Equal(t, expectCommandCount, actualCommandCount)
11041104
}
1105+
1106+
func TestWaitUntil(t *testing.T) {
1107+
replay := NewReplay(zap.NewNop(), id.NewIDManager())
1108+
defer replay.Close()
1109+
loader := newMockNormalLoader()
1110+
startReplayTime := time.Now()
1111+
waitUntil := startReplayTime.Add(500 * time.Millisecond)
1112+
cfg := ReplayConfig{
1113+
DryRun: true,
1114+
Input: t.TempDir(),
1115+
StartTime: waitUntil,
1116+
readers: []cmd.LineReader{loader},
1117+
PSCloseStrategy: cmd.PSCloseStrategyDirected,
1118+
}
1119+
1120+
now := time.Now()
1121+
command := newMockCommand(1)
1122+
command.StartTs = now
1123+
loader.writeCommand(command, cmd.FormatAuditLogPlugin)
1124+
require.NoError(t, replay.Start(cfg, nil, nil, &backend.BCConfig{}))
1125+
loader.Close()
1126+
1127+
require.Eventually(t, func() bool {
1128+
return replay.replayStats.CurCmdTs.Load() != 0
1129+
}, time.Second*5, time.Millisecond)
1130+
actualStartTime := time.Now()
1131+
require.GreaterOrEqual(t, actualStartTime.Sub(startReplayTime), 500*time.Millisecond)
1132+
}

0 commit comments

Comments
 (0)