Skip to content

Commit 6e27865

Browse files
committed
implement a basic audit log analyzer
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
1 parent d9ef65a commit 6e27865

File tree

4 files changed

+450
-13
lines changed

4 files changed

+450
-13
lines changed

cmd/auditloganalyzer/main.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package main
5+
6+
import (
7+
"database/sql"
8+
"encoding/csv"
9+
"fmt"
10+
"os"
11+
"time"
12+
13+
"github.com/pingcap/tiproxy/lib/config"
14+
"github.com/pingcap/tiproxy/lib/util/cmd"
15+
lg "github.com/pingcap/tiproxy/lib/util/logger"
16+
replaycmd "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
17+
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
18+
"github.com/pingcap/tiproxy/pkg/util/versioninfo"
19+
"github.com/spf13/cobra"
20+
"go.uber.org/zap"
21+
)
22+
23+
const (
24+
formatCSV = "csv"
25+
formatMySQL = "mysql"
26+
)
27+
28+
func main() {
29+
rootCmd := &cobra.Command{
30+
Use: os.Args[0],
31+
Short: "start the analyzer",
32+
Version: fmt.Sprintf("%s, commit %s", versioninfo.TiProxyVersion, versioninfo.TiProxyGitHash),
33+
}
34+
rootCmd.SetOut(os.Stdout)
35+
rootCmd.SetErr(os.Stderr)
36+
37+
input := rootCmd.PersistentFlags().String("input", "", "directory for traffic files")
38+
startTime := rootCmd.PersistentFlags().Time("start-time", time.Time{}, []string{time.RFC3339, time.RFC3339Nano}, "the start time to analyze the audit log.")
39+
endTime := rootCmd.PersistentFlags().Time("end-time", time.Time{}, []string{time.RFC3339, time.RFC3339Nano}, "the end time to analyze the audit log.")
40+
output := rootCmd.PersistentFlags().String("output", "audit_log_analysis_result.csv", "the output path for analysis result.")
41+
db := rootCmd.PersistentFlags().String("db", "", "the target database to analyze. Empty means all databases will be recorded.")
42+
filterCommandWithRetry := rootCmd.PersistentFlags().Bool("filter-command-with-retry", false, "filter out commands that are retries according to the audit log.")
43+
outputFormat := rootCmd.PersistentFlags().String("output-format", "csv", "the output format for analysis result. Currently only 'csv' and 'mysql' is supported.")
44+
outputTableName := rootCmd.PersistentFlags().String("output-table-name", "audit_log_analysis", "the output table name when output format is 'mysql'.")
45+
46+
rootCmd.RunE = func(cmd *cobra.Command, _ []string) error {
47+
logger, _, _, err := lg.BuildLogger(&config.Log{
48+
Encoder: "tidb",
49+
LogOnline: config.LogOnline{
50+
Level: "info",
51+
},
52+
})
53+
if err != nil {
54+
return err
55+
}
56+
57+
result, err := replay.Analyze(logger, replaycmd.AnalyzeConfig{
58+
Input: *input,
59+
Start: *startTime,
60+
End: *endTime,
61+
DB: *db,
62+
FilterCommandWithRetry: *filterCommandWithRetry,
63+
})
64+
if err != nil {
65+
return err
66+
}
67+
68+
switch *outputFormat {
69+
case formatCSV:
70+
logger.Info("writing analysis result to CSV", zap.String("output", *output))
71+
return writeAnalyzeResultToCSV(result, *output)
72+
case formatMySQL:
73+
logger.Info("writing analysis result to MySQL", zap.String("output", *output), zap.String("table", *outputTableName))
74+
return writeAnalyzeResultToMySQL(result, *output, *outputTableName)
75+
default:
76+
return fmt.Errorf("unsupported output format: %s", *outputFormat)
77+
}
78+
}
79+
80+
cmd.RunRootCommand(rootCmd)
81+
}
82+
83+
func writeAnalyzeResultToCSV(result replaycmd.AuditLogAnalyzeResult, outputPath string) error {
84+
f, err := os.Create(outputPath)
85+
if err != nil {
86+
return err
87+
}
88+
defer f.Close()
89+
w := csv.NewWriter(f)
90+
for sql, group := range result {
91+
record := []string{
92+
sql,
93+
fmt.Sprintf("%d", group.ExecutionCount),
94+
group.TotalCostTime.String(),
95+
fmt.Sprintf("%d", group.TotalAffectedRows),
96+
}
97+
if err := w.Write(record); err != nil {
98+
return err
99+
}
100+
}
101+
w.Flush()
102+
if err := w.Error(); err != nil {
103+
return err
104+
}
105+
return nil
106+
}
107+
108+
func writeAnalyzeResultToMySQL(result replaycmd.AuditLogAnalyzeResult, outputPath string, outputTableName string) error {
109+
db, err := sql.Open("mysql", outputPath)
110+
if err != nil {
111+
return err
112+
}
113+
defer db.Close()
114+
115+
createTableSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
116+
sql_text TEXT,
117+
execution_count INT,
118+
total_cost_time VARCHAR(64),
119+
total_affected_rows BIGINT
120+
);`, outputTableName)
121+
_, err = db.Exec(createTableSQL)
122+
if err != nil {
123+
return err
124+
}
125+
126+
insertSQL := fmt.Sprintf(`INSERT INTO %s (sql_text, execution_count, total_cost_time, total_affected_rows) VALUES (?, ?, ?, ?)`, outputTableName)
127+
stmt, err := db.Prepare(insertSQL)
128+
if err != nil {
129+
return err
130+
}
131+
defer stmt.Close()
132+
133+
for sqlText, group := range result {
134+
_, err := stmt.Exec(sqlText, group.ExecutionCount, group.TotalCostTime.String(), group.TotalAffectedRows)
135+
if err != nil {
136+
return err
137+
}
138+
}
139+
140+
return nil
141+
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package cmd
5+
6+
import (
7+
"strconv"
8+
"strings"
9+
"time"
10+
11+
"github.com/pingcap/tidb/pkg/parser"
12+
"github.com/pingcap/tiproxy/lib/util/errors"
13+
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
14+
"github.com/siddontang/go/hack"
15+
)
16+
17+
// AuditLogGroup is the analysis result for a group of similar audit log entries.
18+
type AuditLogGroup struct {
19+
ExecutionCount int
20+
TotalCostTime time.Duration
21+
TotalAffectedRows int64
22+
}
23+
24+
// AuditLogAnalyzeResult is the result of analyzing audit logs.
25+
type AuditLogAnalyzeResult map[string]AuditLogGroup
26+
27+
// AnalyzeConfig is the configuration for audit log analysis.
28+
type AnalyzeConfig struct {
29+
Input string
30+
Start time.Time
31+
End time.Time
32+
DB string
33+
FilterCommandWithRetry bool
34+
}
35+
36+
type auditLogAnalyzer struct {
37+
reader LineReader
38+
39+
cfg AnalyzeConfig
40+
connInfo map[uint64]auditLogPluginConnCtx
41+
}
42+
43+
// NewAuditLogAnalyzer creates a new audit log analyzer.
44+
func NewAuditLogAnalyzer(reader LineReader, cfg AnalyzeConfig) *auditLogAnalyzer {
45+
return &auditLogAnalyzer{
46+
reader: reader,
47+
cfg: cfg,
48+
connInfo: make(map[uint64]auditLogPluginConnCtx),
49+
}
50+
}
51+
52+
// Analyze analyzes the audit log and returns the analysis result.
53+
func (a *auditLogAnalyzer) Analyze() (AuditLogAnalyzeResult, error) {
54+
result := make(AuditLogAnalyzeResult)
55+
56+
kvs := make(map[string]string, 25)
57+
for {
58+
line, filename, lineIdx, err := a.reader.ReadLine()
59+
if err != nil {
60+
return result, err
61+
}
62+
clear(kvs)
63+
err = parseLog(kvs, hack.String(line))
64+
if err != nil {
65+
return result, errors.Errorf("%s, line %d: %s", filename, lineIdx, err.Error())
66+
}
67+
// Only analyze the COMPLETED event
68+
event, ok := kvs[auditPluginKeyEvent]
69+
if !ok || event != auditPluginEventEnd {
70+
continue
71+
}
72+
73+
// Only analyze the event within the time range
74+
startTs, endTs, err := parseStartAndEndTs(kvs)
75+
if err != nil {
76+
return nil, errors.Wrapf(err, "%s, line %d", filename, lineIdx)
77+
}
78+
if endTs.Before(a.cfg.Start) {
79+
continue
80+
}
81+
if endTs.After(a.cfg.End) {
82+
// Reach the end time, stop analyzing.
83+
return result, nil
84+
}
85+
86+
// Only analyze the `Query` and `Execute` commands
87+
cmdStr := parseCommand(kvs[auditPluginKeyCommand])
88+
if cmdStr != "Query" && cmdStr != "Execute" {
89+
continue
90+
}
91+
92+
// Only analyze the SQL in given database
93+
if len(a.cfg.DB) != 0 {
94+
databases, ok := kvs[auditPluginKeyDatabases]
95+
if !ok {
96+
continue
97+
}
98+
99+
includeTargetDB := false
100+
for _, db := range strings.Split(databases, ",") {
101+
if db == a.cfg.DB {
102+
includeTargetDB = true
103+
}
104+
}
105+
if !includeTargetDB {
106+
continue
107+
}
108+
}
109+
110+
// Try to filter out retried commands
111+
connID, err := strconv.ParseUint(kvs[auditPluginKeyConnID], 10, 64)
112+
if err != nil {
113+
return result, errors.Wrapf(err, "%s, line %d: parse conn id failed: %s", filename, lineIdx, kvs[auditPluginKeyConnID])
114+
}
115+
connInfo := a.connInfo[connID]
116+
if a.cfg.FilterCommandWithRetry {
117+
if retryStr, ok := kvs[auditPluginKeyRetry]; ok {
118+
// If it's a retry command, just skip it.
119+
if retryStr == "true" {
120+
continue
121+
}
122+
}
123+
} else {
124+
if isDuplicatedWrite(connInfo.lastCmd, kvs, cmdStr, kvs[auditPluginKeySQL], startTs, endTs) {
125+
continue
126+
}
127+
}
128+
129+
sql, err := parseSQL(kvs[auditPluginKeySQL])
130+
if err != nil {
131+
return result, errors.Wrapf(err, "unquote sql failed: %s", kvs[auditPluginKeySQL])
132+
}
133+
normalizedSQL := parser.Normalize(sql, "ON")
134+
group := result[normalizedSQL]
135+
136+
var costTime time.Duration
137+
costTimeStr := kvs[auditPluginKeyCostTime]
138+
if len(costTimeStr) != 0 {
139+
millis, err := strconv.ParseFloat(costTimeStr, 32)
140+
if err != nil {
141+
return result, errors.Errorf("parsing cost time failed: %s", costTimeStr)
142+
}
143+
costTime = time.Duration(millis) * (time.Millisecond)
144+
}
145+
146+
var affectedRows int64
147+
affectedRowsStr := kvs[auditPluginKeyRows]
148+
if len(affectedRowsStr) != 0 {
149+
affectedRows, err = strconv.ParseInt(affectedRowsStr, 10, 64)
150+
if err != nil {
151+
return result, errors.Errorf("parsing affected rows failed: %s", affectedRowsStr)
152+
}
153+
}
154+
155+
// Record the last command info for deduplication. We only recorded the needed fields here.
156+
connInfo.lastCmd = &Command{
157+
StartTs: startTs,
158+
EndTs: endTs,
159+
ConnID: connID,
160+
}
161+
switch cmdStr {
162+
case "Query":
163+
connInfo.lastCmd.Type = pnet.ComQuery
164+
connInfo.lastCmd.Payload = append([]byte{pnet.ComQuery.Byte()}, hack.Slice(sql)...)
165+
case "Execute":
166+
connInfo.lastCmd.Type = pnet.ComStmtExecute
167+
connInfo.lastCmd.PreparedStmt = sql
168+
}
169+
connInfo.lastCmd.StmtType = kvs[auditPluginKeyStmtType]
170+
connInfo.lastCmd.kvs = kvs
171+
a.connInfo[connID] = connInfo
172+
173+
group.ExecutionCount++
174+
group.TotalCostTime += costTime
175+
group.TotalAffectedRows += affectedRows
176+
result[normalizedSQL] = group
177+
}
178+
}

pkg/sqlreplay/cmd/audit_log_plugin.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const (
3030
auditPluginKeyCostTime = "COST_TIME"
3131
auditPluginKeyPreparedStmtID = "PREPARED_STMT_ID"
3232
auditPluginKeyRetry = "RETRY"
33+
auditPluginKeyRows = "ROWS"
34+
auditPluginKeyDatabases = "DATABASES"
3335

3436
auditPluginClassGeneral = "GENERAL"
3537
auditPluginClassTableAccess = "TABLE_ACCESS"
@@ -603,6 +605,28 @@ func parseStmtID(value string) (uint32, error) {
603605

604606
// Transaction retrials will record the same SQL multiple times in the audit logs, so we need to deduplicate them.
605607
func (decoder *AuditLogPluginDecoder) isDuplicatedWrite(lastCmd *Command, kvs map[string]string, cmdType, sql string, startTs, endTs time.Time) bool {
608+
isDuplicated := isDuplicatedWrite(lastCmd, kvs, cmdType, sql, startTs, endTs)
609+
if !isDuplicated {
610+
return false
611+
}
612+
613+
// Record the deduplication.
614+
decoder.dedup.Lock()
615+
dedup := decoder.dedup.Items[lastCmd.StmtType]
616+
dedup.Times++
617+
dedup.Cost += endTs.Sub(startTs)
618+
overlap := lastCmd.EndTs.Sub(startTs)
619+
if dedup.MinOverlap == 0 {
620+
dedup.MinOverlap = overlap
621+
} else if dedup.MinOverlap > overlap {
622+
dedup.MinOverlap = overlap
623+
}
624+
decoder.dedup.Items[lastCmd.StmtType] = dedup
625+
decoder.dedup.Unlock()
626+
return true
627+
}
628+
629+
func isDuplicatedWrite(lastCmd *Command, kvs map[string]string, cmdType, sql string, startTs, endTs time.Time) bool {
606630
if lastCmd == nil {
607631
return false
608632
}
@@ -637,19 +661,7 @@ func (decoder *AuditLogPluginDecoder) isDuplicatedWrite(lastCmd *Command, kvs ma
637661
default:
638662
return false
639663
}
640-
// Record the deduplication.
641-
decoder.dedup.Lock()
642-
dedup := decoder.dedup.Items[lastCmd.StmtType]
643-
dedup.Times++
644-
dedup.Cost += endTs.Sub(startTs)
645-
overlap := lastCmd.EndTs.Sub(startTs)
646-
if dedup.MinOverlap == 0 {
647-
dedup.MinOverlap = overlap
648-
} else if dedup.MinOverlap > overlap {
649-
dedup.MinOverlap = overlap
650-
}
651-
decoder.dedup.Items[lastCmd.StmtType] = dedup
652-
decoder.dedup.Unlock()
664+
653665
return true
654666
}
655667

0 commit comments

Comments
 (0)