Skip to content

Commit 4a19a43

Browse files
committed
implement a basic audit log analyzer
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
1 parent d9ef65a commit 4a19a43

File tree

4 files changed

+480
-13
lines changed

4 files changed

+480
-13
lines changed

cmd/auditloganalyzer/main.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package main
5+
6+
import (
7+
"database/sql"
8+
"encoding/csv"
9+
"fmt"
10+
"os"
11+
"time"
12+
13+
"github.com/pingcap/tiproxy/lib/config"
14+
"github.com/pingcap/tiproxy/lib/util/cmd"
15+
lg "github.com/pingcap/tiproxy/lib/util/logger"
16+
replaycmd "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
17+
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
18+
"github.com/pingcap/tiproxy/pkg/util/versioninfo"
19+
"github.com/spf13/cobra"
20+
"go.uber.org/zap"
21+
)
22+
23+
const (
24+
formatCSV = "csv"
25+
formatMySQL = "mysql"
26+
)
27+
28+
func main() {
29+
rootCmd := &cobra.Command{
30+
Use: os.Args[0],
31+
Short: "start the analyzer",
32+
Version: fmt.Sprintf("%s, commit %s", versioninfo.TiProxyVersion, versioninfo.TiProxyGitHash),
33+
}
34+
rootCmd.SetOut(os.Stdout)
35+
rootCmd.SetErr(os.Stderr)
36+
37+
input := rootCmd.PersistentFlags().String("input", "", "directory for traffic files")
38+
startTime := rootCmd.PersistentFlags().Time("start-time", time.Time{}, []string{time.RFC3339, time.RFC3339Nano}, "the start time to analyze the audit log.")
39+
endTime := rootCmd.PersistentFlags().Time("end-time", time.Time{}, []string{time.RFC3339, time.RFC3339Nano}, "the end time to analyze the audit log.")
40+
output := rootCmd.PersistentFlags().String("output", "audit_log_analysis_result.csv", "the output path for analysis result.")
41+
db := rootCmd.PersistentFlags().String("db", "", "the target database to analyze. Empty means all databases will be recorded.")
42+
filterCommandWithRetry := rootCmd.PersistentFlags().Bool("filter-command-with-retry", false, "filter out commands that are retries according to the audit log.")
43+
outputFormat := rootCmd.PersistentFlags().String("output-format", "csv", "the output format for analysis result. Currently only 'csv' and 'mysql' is supported.")
44+
outputTableName := rootCmd.PersistentFlags().String("output-table-name", "audit_log_analysis", "the output table name when output format is 'mysql'.")
45+
46+
rootCmd.RunE = func(cmd *cobra.Command, _ []string) error {
47+
logger, _, _, err := lg.BuildLogger(&config.Log{
48+
Encoder: "tidb",
49+
LogOnline: config.LogOnline{
50+
Level: "info",
51+
},
52+
})
53+
if err != nil {
54+
return err
55+
}
56+
57+
result, err := replay.Analyze(logger, replaycmd.AnalyzeConfig{
58+
Input: *input,
59+
Start: *startTime,
60+
End: *endTime,
61+
DB: *db,
62+
FilterCommandWithRetry: *filterCommandWithRetry,
63+
})
64+
if err != nil {
65+
return err
66+
}
67+
68+
switch *outputFormat {
69+
case formatCSV:
70+
logger.Info("writing analysis result to CSV", zap.String("output", *output))
71+
return writeAnalyzeResultToCSV(result, *output)
72+
case formatMySQL:
73+
logger.Info("writing analysis result to MySQL", zap.String("output", *output), zap.String("table", *outputTableName))
74+
return writeAnalyzeResultToMySQL(result, *output, *outputTableName)
75+
default:
76+
return fmt.Errorf("unsupported output format: %s", *outputFormat)
77+
}
78+
}
79+
80+
cmd.RunRootCommand(rootCmd)
81+
}
82+
83+
func writeAnalyzeResultToCSV(result replaycmd.AuditLogAnalyzeResult, outputPath string) error {
84+
f, err := os.Create(outputPath)
85+
if err != nil {
86+
return err
87+
}
88+
defer f.Close()
89+
w := csv.NewWriter(f)
90+
for sql, group := range result {
91+
record := []string{
92+
sql,
93+
fmt.Sprintf("%d", group.ExecutionCount),
94+
fmt.Sprintf("%d", group.TotalCostTime.Microseconds()),
95+
fmt.Sprintf("%d", group.TotalAffectedRows),
96+
group.StmtTypes.String(),
97+
}
98+
if err := w.Write(record); err != nil {
99+
return err
100+
}
101+
}
102+
w.Flush()
103+
if err := w.Error(); err != nil {
104+
return err
105+
}
106+
return nil
107+
}
108+
109+
func writeAnalyzeResultToMySQL(result replaycmd.AuditLogAnalyzeResult, outputPath string, outputTableName string) error {
110+
db, err := sql.Open("mysql", outputPath)
111+
if err != nil {
112+
return err
113+
}
114+
defer db.Close()
115+
116+
createTableSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
117+
sql_text TEXT,
118+
execution_count INT,
119+
total_cost_time BIGINT,
120+
total_affected_rows BIGINT,
121+
statement_types TEXT
122+
);`, outputTableName)
123+
_, err = db.Exec(createTableSQL)
124+
if err != nil {
125+
return err
126+
}
127+
128+
insertSQL := fmt.Sprintf(`INSERT INTO %s (sql_text, execution_count, total_cost_time, total_affected_rows, statement_types) VALUES (?, ?, ?, ?, ?)`, outputTableName)
129+
stmt, err := db.Prepare(insertSQL)
130+
if err != nil {
131+
return err
132+
}
133+
defer stmt.Close()
134+
135+
for sqlText, group := range result {
136+
_, err := stmt.Exec(sqlText, group.ExecutionCount, group.TotalCostTime.Microseconds(), group.TotalAffectedRows, group.StmtTypes.String())
137+
if err != nil {
138+
return err
139+
}
140+
}
141+
142+
return nil
143+
}
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package cmd
5+
6+
import (
7+
"strconv"
8+
"strings"
9+
"time"
10+
11+
"github.com/pingcap/tidb/pkg/parser"
12+
"github.com/pingcap/tiproxy/lib/util/errors"
13+
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
14+
"github.com/siddontang/go/hack"
15+
)
16+
17+
type stmtTypesSet map[string]struct{}
18+
19+
func (s stmtTypesSet) String() string {
20+
var types []string
21+
for stmtType := range s {
22+
types = append(types, stmtType)
23+
}
24+
return strings.Join(types, ",")
25+
}
26+
27+
// AuditLogGroup is the analysis result for a group of similar audit log entries.
28+
type AuditLogGroup struct {
29+
ExecutionCount int
30+
TotalCostTime time.Duration
31+
TotalAffectedRows int64
32+
StmtTypes stmtTypesSet
33+
}
34+
35+
// AuditLogAnalyzeResult is the result of analyzing audit logs.
36+
type AuditLogAnalyzeResult map[string]AuditLogGroup
37+
38+
func (r AuditLogAnalyzeResult) Merge(other AuditLogAnalyzeResult) {
39+
for sql, group := range other {
40+
finalGroup := r[sql]
41+
finalGroup.ExecutionCount += group.ExecutionCount
42+
finalGroup.TotalCostTime += group.TotalCostTime
43+
finalGroup.TotalAffectedRows += group.TotalAffectedRows
44+
for stmtType := range group.StmtTypes {
45+
if finalGroup.StmtTypes == nil {
46+
finalGroup.StmtTypes = make(map[string]struct{})
47+
}
48+
finalGroup.StmtTypes[stmtType] = struct{}{}
49+
}
50+
r[sql] = finalGroup
51+
}
52+
}
53+
54+
// AnalyzeConfig is the configuration for audit log analysis.
55+
type AnalyzeConfig struct {
56+
Input string
57+
Start time.Time
58+
End time.Time
59+
DB string
60+
FilterCommandWithRetry bool
61+
}
62+
63+
type auditLogAnalyzer struct {
64+
reader LineReader
65+
66+
cfg AnalyzeConfig
67+
connInfo map[uint64]auditLogPluginConnCtx
68+
}
69+
70+
// NewAuditLogAnalyzer creates a new audit log analyzer.
71+
func NewAuditLogAnalyzer(reader LineReader, cfg AnalyzeConfig) *auditLogAnalyzer {
72+
return &auditLogAnalyzer{
73+
reader: reader,
74+
cfg: cfg,
75+
connInfo: make(map[uint64]auditLogPluginConnCtx),
76+
}
77+
}
78+
79+
// Analyze analyzes the audit log and returns the analysis result.
80+
func (a *auditLogAnalyzer) Analyze() (AuditLogAnalyzeResult, error) {
81+
result := make(AuditLogAnalyzeResult)
82+
83+
kvs := make(map[string]string, 25)
84+
for {
85+
line, filename, lineIdx, err := a.reader.ReadLine()
86+
if err != nil {
87+
return result, err
88+
}
89+
clear(kvs)
90+
err = parseLog(kvs, hack.String(line))
91+
if err != nil {
92+
return result, errors.Errorf("%s, line %d: %s", filename, lineIdx, err.Error())
93+
}
94+
// Only analyze the COMPLETED event
95+
event, ok := kvs[auditPluginKeyEvent]
96+
if !ok || event != auditPluginEventEnd {
97+
continue
98+
}
99+
100+
// Only analyze the event within the time range
101+
startTs, endTs, err := parseStartAndEndTs(kvs)
102+
if err != nil {
103+
return nil, errors.Wrapf(err, "%s, line %d", filename, lineIdx)
104+
}
105+
if endTs.Before(a.cfg.Start) {
106+
continue
107+
}
108+
if endTs.After(a.cfg.End) {
109+
// Reach the end time, stop analyzing.
110+
return result, nil
111+
}
112+
113+
// Only analyze the `Query` and `Execute` commands
114+
cmdStr := parseCommand(kvs[auditPluginKeyCommand])
115+
if cmdStr != "Query" && cmdStr != "Execute" {
116+
continue
117+
}
118+
119+
// Only analyze the SQL in given database
120+
if len(a.cfg.DB) != 0 {
121+
databases, ok := kvs[auditPluginKeyDatabases]
122+
if !ok {
123+
continue
124+
}
125+
126+
includeTargetDB := false
127+
for _, db := range strings.Split(databases, ",") {
128+
if db == a.cfg.DB {
129+
includeTargetDB = true
130+
}
131+
}
132+
if !includeTargetDB {
133+
continue
134+
}
135+
}
136+
137+
// Try to filter out retried commands
138+
connID, err := strconv.ParseUint(kvs[auditPluginKeyConnID], 10, 64)
139+
if err != nil {
140+
return result, errors.Wrapf(err, "%s, line %d: parse conn id failed: %s", filename, lineIdx, kvs[auditPluginKeyConnID])
141+
}
142+
connInfo := a.connInfo[connID]
143+
if a.cfg.FilterCommandWithRetry {
144+
if retryStr, ok := kvs[auditPluginKeyRetry]; ok {
145+
// If it's a retry command, just skip it.
146+
if retryStr == "true" {
147+
continue
148+
}
149+
}
150+
} else {
151+
sql, err := parseSQL(kvs[auditPluginKeySQL])
152+
if err != nil {
153+
return result, errors.Wrapf(err, "%s, line %d: unquote sql failed: %s", filename, lineIdx, kvs[auditPluginKeySQL])
154+
}
155+
if isDuplicatedWrite(connInfo.lastCmd, kvs, cmdStr, sql, startTs, endTs) {
156+
continue
157+
}
158+
}
159+
160+
sql, err := parseSQL(kvs[auditPluginKeySQL])
161+
if err != nil {
162+
return result, errors.Wrapf(err, "unquote sql failed: %s", kvs[auditPluginKeySQL])
163+
}
164+
normalizedSQL := parser.Normalize(sql, "ON")
165+
group := result[normalizedSQL]
166+
167+
var costTime time.Duration
168+
costTimeStr := kvs[auditPluginKeyCostTime]
169+
if len(costTimeStr) != 0 {
170+
millis, err := strconv.ParseFloat(costTimeStr, 32)
171+
if err != nil {
172+
return result, errors.Errorf("parsing cost time failed: %s", costTimeStr)
173+
}
174+
costTime = time.Duration(millis) * (time.Millisecond)
175+
}
176+
177+
var affectedRows int64
178+
affectedRowsStr := kvs[auditPluginKeyRows]
179+
if len(affectedRowsStr) != 0 {
180+
affectedRows, err = strconv.ParseInt(affectedRowsStr, 10, 64)
181+
if err != nil {
182+
return result, errors.Errorf("parsing affected rows failed: %s", affectedRowsStr)
183+
}
184+
}
185+
186+
// Record the last command info for deduplication. We only recorded the needed fields here.
187+
connInfo.lastCmd = &Command{
188+
StartTs: startTs,
189+
EndTs: endTs,
190+
ConnID: connID,
191+
}
192+
switch cmdStr {
193+
case "Query":
194+
connInfo.lastCmd.Type = pnet.ComQuery
195+
connInfo.lastCmd.Payload = append([]byte{pnet.ComQuery.Byte()}, hack.Slice(sql)...)
196+
case "Execute":
197+
connInfo.lastCmd.Type = pnet.ComStmtExecute
198+
connInfo.lastCmd.PreparedStmt = sql
199+
}
200+
connInfo.lastCmd.StmtType = kvs[auditPluginKeyStmtType]
201+
connInfo.lastCmd.kvs = kvs
202+
a.connInfo[connID] = connInfo
203+
204+
group.ExecutionCount++
205+
group.TotalCostTime += costTime
206+
group.TotalAffectedRows += affectedRows
207+
if len(kvs[auditPluginKeyStmtType]) != 0 {
208+
if group.StmtTypes == nil {
209+
group.StmtTypes = make(map[string]struct{})
210+
}
211+
group.StmtTypes[kvs[auditPluginKeyStmtType]] = struct{}{}
212+
}
213+
result[normalizedSQL] = group
214+
}
215+
}

0 commit comments

Comments
 (0)