Skip to content

Commit 91fab67

Browse files
authored
sqlreplay: do not block replaying when logging or reporting hangs (#1000)
1 parent a6be63d commit 91fab67

File tree

4 files changed

+49
-8
lines changed

4 files changed

+49
-8
lines changed

pkg/server/api/traffic.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,14 @@ func (h *Server) TrafficReplay(c *gin.Context) {
155155
}
156156
cfg.OutputPath = c.PostForm("outputpath")
157157
cfg.Addr = c.PostForm("addr")
158+
h.lg.Info("request: traffic replay", zap.Any("cfg", cfg))
158159

159160
if err := h.mgr.ReplayJobMgr.StartReplay(cfg); err != nil {
160161
c.String(http.StatusInternalServerError, err.Error())
162+
h.lg.Info("response: traffic replay", zap.Error(err))
161163
return
162164
}
165+
h.lg.Info("response: traffic replay")
163166
c.String(http.StatusOK, "replay started")
164167
}
165168

@@ -181,7 +184,9 @@ func (h *Server) TrafficCancel(c *gin.Context) {
181184
}
182185
}
183186
cfg.Graceful = strings.EqualFold(c.PostForm("graceful"), "true")
187+
h.lg.Info("request: traffic cancel", zap.Any("cfg", cfg))
184188
result := h.mgr.ReplayJobMgr.Stop(cfg)
189+
h.lg.Info("response: traffic cancel", zap.String("result", result))
185190
c.String(http.StatusOK, result)
186191
}
187192

pkg/sqlreplay/conn/conn.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -191,17 +191,15 @@ func (c *conn) Run(ctx context.Context) {
191191
if curDB != command.Value.CurDB && command.Value.CurDB != "" {
192192
// Maybe there's a USE statement already, never mind.
193193
if resp := c.backendConn.ExecuteCmd(ctx, pnet.MakeInitDBRequest(command.Value.CurDB)); resp.Err != nil {
194-
c.replayStats.ExceptionCmds.Add(1)
195-
c.exceptionCh <- NewFailException(resp.Err, command.Value)
194+
c.onExecuteFailed(command.Value, resp.Err)
196195
c.lg.Info("failed to use database", zap.String("db", command.Value.CurDB), zap.Error(resp.Err))
197196
continue
198197
}
199198
curDB = command.Value.CurDB
200199
}
201200
// update psID, SQL, and params for the command.
202201
if err := c.updateExecuteStmt(command.Value); err != nil {
203-
c.replayStats.ExceptionCmds.Add(1)
204-
c.exceptionCh <- NewFailException(err, command.Value)
202+
c.onExecuteFailed(command.Value, err)
205203
continue
206204
}
207205
startTime := time.Now()
@@ -215,16 +213,20 @@ func (c *conn) Run(ctx context.Context) {
215213
curDB = ""
216214
continue
217215
}
218-
c.replayStats.ExceptionCmds.Add(1)
219-
c.exceptionCh <- NewFailException(resp.Err, command.Value)
216+
c.onExecuteFailed(command.Value, resp.Err)
220217
} else {
221218
c.updatePreparedStmts(command.Value.CapturedPsID, command.Value.Payload, resp)
222219
}
223-
c.execInfoCh <- ExecInfo{
220+
// Logging should not block replaying: if execInfoCh is not ready to receive, the exec info is dropped.
221+
select {
222+
case c.execInfoCh <- ExecInfo{
224223
Command: command.Value,
225224
StartTime: startTime,
226225
CostTime: latency,
226+
}:
227+
default:
227228
}
229+
228230
c.replayStats.ReplayedCmds.Add(1)
229231
}
230232
}
@@ -340,11 +342,24 @@ func (c *conn) updateExecuteStmt(command *cmd.Command) error {
340342

341343
func (c *conn) onDisconnected(err error) {
342344
c.replayStats.ExceptionCmds.Add(1)
343-
c.exceptionCh <- NewOtherException(err, c.upstreamConnID)
345+
// Reporting should not block replaying: if exceptionCh is not ready to receive, the exception is dropped (it is still counted above).
346+
select {
347+
case c.exceptionCh <- NewOtherException(err, c.upstreamConnID):
348+
default:
349+
}
344350
clear(c.psIDMapping)
345351
clear(c.preparedStmts)
346352
}
347353

354+
func (c *conn) onExecuteFailed(command *cmd.Command, err error) {
355+
c.replayStats.ExceptionCmds.Add(1)
356+
// Reporting should not block replaying: if exceptionCh is not ready to receive, the exception is dropped (it is still counted above).
357+
select {
358+
case c.exceptionCh <- NewFailException(err, command):
359+
default:
360+
}
361+
}
362+
348363
// ExecuteCmd executes a command asynchronously by adding it to the list.
349364
// Adding commands should never block because blocking may cause a circular wait, so we don't use channels.
350365
// Conn A: wait for the lock held by conn B, and then its list becomes full and blocks the replay

pkg/sqlreplay/conn/conn_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,3 +822,23 @@ func TestExecInfo(t *testing.T) {
822822
}
823823
}
824824
}
825+
826+
func TestNoBlockReplay(t *testing.T) {
827+
exceptionCh, closeCh, execInfoCh := make(chan Exception, 1), make(chan uint64, 1), make(chan ExecInfo, 1)
828+
stats := &ReplayStats{}
829+
conn := NewConn(zap.NewNop(), ConnOpts{
830+
Username: "u1",
831+
IdMgr: id.NewIDManager(),
832+
ConnID: 1,
833+
UpstreamConnID: 555,
834+
BcConfig: &backend.BCConfig{},
835+
ExceptionCh: exceptionCh,
836+
CloseCh: closeCh,
837+
ExecInfoCh: execInfoCh,
838+
ReplayStats: stats,
839+
})
840+
for range 100 {
841+
conn.onDisconnected(errors.New("conn err"))
842+
conn.onExecuteFailed(&cmd.Command{}, errors.New("exec err"))
843+
}
844+
}

pkg/sqlreplay/replay/replay.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -903,6 +903,7 @@ func (r *replay) recordExecInfoLoop() {
903903
}
904904
}
905905

906+
// stop is called when replaying commands fails or finishes, so no connection should be running.
906907
func (r *replay) stop(err error) {
907908
r.Lock()
908909
defer r.Unlock()

0 commit comments

Comments
 (0)