Skip to content

Commit 05ab675

Browse files
SleepyKFishwantWhatBike
authored and committed
fix: resolve SQLite locking when opening large projects or rebuilding workspace
1 parent 2176975 commit 05ab675

File tree

4 files changed

+236
-18
lines changed

4 files changed

+236
-18
lines changed

internal/repository/event.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package repository
33
import (
44
"database/sql"
55
"fmt"
6+
"runtime"
67
"strings"
78
"time"
89

@@ -11,6 +12,28 @@ import (
1112
"codebase-indexer/pkg/logger"
1213
)
1314

15+
// getCallerInfo describes one frame of the calling goroutine's stack as
// "file:line function".
//
// skip is handed unchanged to runtime.Caller, which is invoked from inside this
// function: 0 therefore names getCallerInfo itself, 1 the function that called
// getCallerInfo, 2 that function's caller, and so on.
//
// The file is reduced to its base name and the function name to the part after
// the last "/" of its package path. It returns "unknown" when the frame cannot
// be resolved; if only the function lookup fails, the function part alone is
// reported as "unknown".
func getCallerInfo(skip int) string {
	pc, path, lineNo, found := runtime.Caller(skip)
	if !found {
		return "unknown"
	}

	// strings.LastIndex yields -1 when no "/" is present, so the +1 keeps the
	// whole string in that case.
	afterLastSlash := func(s string) string {
		return s[strings.LastIndex(s, "/")+1:]
	}

	name := "unknown"
	if f := runtime.FuncForPC(pc); f != nil {
		name = afterLastSlash(f.Name())
	}

	return fmt.Sprintf("%s:%d %s", afterLastSlash(path), lineNo, name)
}
36+
1437
// EventRepository 事件数据访问层
1538
type EventRepository interface {
1639
// CreateEvent 创建事件
@@ -49,6 +72,8 @@ type EventRepository interface {
4972
BatchCreateEvents(events []*model.Event) error
5073
// BatchDeleteEvents 批量删除事件
5174
BatchDeleteEvents(ids []int64) error
75+
// BatchUpdateEvents 批量更新事件(用于文件变更检测时的批量状态更新)
76+
BatchUpdateEvents(events []*model.Event) error
5277
// UpdateEventsEmbedding batch-updates the embedding information of events
5378
UpdateEventsEmbedding(events []*model.Event) error
5479
// UpdateEventsEmbeddingStatus 批量更新事件嵌入状态
@@ -83,6 +108,11 @@ func (r *eventRepository) CreateEvent(event *model.Event) error {
83108
`
84109

85110
nowTime := time.Now()
111+
112+
// 写数据库前打印调用者信息
113+
caller := getCallerInfo(2)
114+
r.logger.Info("[DB] CreateEvent called by: %s, path: %s", caller, event.SourceFilePath)
115+
86116
result, err := r.db.GetDB().Exec(query,
87117
event.WorkspacePath,
88118
event.EventType,
@@ -847,6 +877,10 @@ func (r *eventRepository) UpdateEvent(event *model.Event) error {
847877
query := fmt.Sprintf("UPDATE events SET %s WHERE id = ?", strings.Join(setClauses, ", "))
848878
args = append(args, event.ID)
849879

880+
// 写数据库前打印调用者信息
881+
caller := getCallerInfo(2)
882+
r.logger.Info("[DB] UpdateEvent called by: %s, eventID: %d", caller, event.ID)
883+
850884
result, err := r.db.GetDB().Exec(query, args...)
851885
if err != nil {
852886
return fmt.Errorf("[DB] failed to update event: %w", err)
@@ -1227,6 +1261,9 @@ func (r *eventRepository) BatchCreateEvents(events []*model.Event) error {
12271261
return nil
12281262
}
12291263

1264+
// 获取调用者信息
1265+
caller := getCallerInfo(2)
1266+
12301267
const batchSize = 1000
12311268
nowTime := time.Now()
12321269
totalCreated := int64(0)
@@ -1260,6 +1297,9 @@ func (r *eventRepository) BatchCreateEvents(events []*model.Event) error {
12601297
query := fmt.Sprintf("INSERT INTO events (workspace_path, event_type, source_file_path, target_file_path, embedding_status, codegraph_status, created_at, updated_at) VALUES %s",
12611298
strings.Join(valueStrings, ","))
12621299

1300+
// 写数据库前打印调用者信息
1301+
r.logger.Info("[DB] BatchCreateEvents called by: %s, batch: %d-%d, count: %d", caller, i+1, end, len(batch))
1302+
12631303
result, err := r.db.GetDB().Exec(query, valueArgs...)
12641304
if err != nil {
12651305
return fmt.Errorf("[DB] failed to batch create events (batch %d-%d): %w", i+1, end, err)
@@ -1296,6 +1336,9 @@ func (r *eventRepository) BatchDeleteEvents(ids []int64) error {
12961336
return nil
12971337
}
12981338

1339+
// 获取调用者信息
1340+
caller := getCallerInfo(2)
1341+
12991342
const batchSize = 1000
13001343
totalDeleted := int64(0)
13011344

@@ -1319,6 +1362,9 @@ func (r *eventRepository) BatchDeleteEvents(ids []int64) error {
13191362
args[j] = id
13201363
}
13211364

1365+
// 写数据库前打印调用者信息
1366+
r.logger.Info("[DB] BatchDeleteEvents called by: %s, batch: %d-%d, count: %d", caller, i+1, end, len(batch))
1367+
13221368
result, err := r.db.GetDB().Exec(query, args...)
13231369
if err != nil {
13241370
return fmt.Errorf("[DB] failed to batch delete events (batch %d-%d): %w", i+1, end, err)
@@ -1337,12 +1383,89 @@ func (r *eventRepository) BatchDeleteEvents(ids []int64) error {
13371383
return nil
13381384
}
13391385

1386+
// BatchUpdateEvents 批量更新事件(用于文件变更检测时的批量状态更新)
1387+
func (r *eventRepository) BatchUpdateEvents(events []*model.Event) error {
1388+
if len(events) == 0 {
1389+
return nil
1390+
}
1391+
1392+
// 获取调用者信息
1393+
caller := getCallerInfo(2)
1394+
1395+
const batchSize = 1000
1396+
nowTime := time.Now()
1397+
totalUpdated := int64(0)
1398+
1399+
query := `
1400+
UPDATE events
1401+
SET event_type = ?, target_file_path = ?, embedding_status = ?, codegraph_status = ?, updated_at = ?
1402+
WHERE id = ?
1403+
`
1404+
1405+
// 分批处理
1406+
for i := 0; i < len(events); i += batchSize {
1407+
end := i + batchSize
1408+
if end > len(events) {
1409+
end = len(events)
1410+
}
1411+
batch := events[i:end]
1412+
1413+
// 写数据库前打印调用者信息
1414+
r.logger.Info("[DB] BatchUpdateEvents called by: %s, batch: %d-%d, count: %d", caller, i+1, end, len(batch))
1415+
1416+
// 每个批次一个事务
1417+
tx, err := r.db.GetDB().Begin()
1418+
if err != nil {
1419+
return fmt.Errorf("[DB] failed to begin transaction (batch %d-%d): %w", i+1, end, err)
1420+
}
1421+
1422+
stmt, err := tx.Prepare(query)
1423+
if err != nil {
1424+
tx.Rollback()
1425+
return fmt.Errorf("[DB] failed to prepare statement (batch %d-%d): %w", i+1, end, err)
1426+
}
1427+
1428+
for _, event := range batch {
1429+
_, err = stmt.Exec(
1430+
event.EventType,
1431+
event.TargetFilePath,
1432+
event.EmbeddingStatus,
1433+
event.CodegraphStatus,
1434+
nowTime,
1435+
event.ID,
1436+
)
1437+
if err != nil {
1438+
stmt.Close()
1439+
tx.Rollback()
1440+
return fmt.Errorf("[DB] failed to update event %d: %w", event.ID, err)
1441+
}
1442+
}
1443+
stmt.Close()
1444+
1445+
if err = tx.Commit(); err != nil {
1446+
return fmt.Errorf("[DB] failed to commit transaction (batch %d-%d): %w", i+1, end, err)
1447+
}
1448+
1449+
totalUpdated += int64(len(batch))
1450+
r.logger.Info("[DB] Successfully updated batch %d-%d: %d events", i+1, end, len(batch))
1451+
}
1452+
1453+
r.logger.Info("[DB] Successfully batch updated total %d events", totalUpdated)
1454+
return nil
1455+
}
1456+
13401457
// UpdateEventsEmbedding batch-updates the embedding information of events
13411458
func (r *eventRepository) UpdateEventsEmbedding(events []*model.Event) error {
13421459
if len(events) == 0 {
13431460
return nil
13441461
}
13451462

1463+
// 获取调用者信息
1464+
caller := getCallerInfo(2)
1465+
1466+
// 写数据库前打印调用者信息
1467+
r.logger.Info("[DB] UpdateEventsEmbedding called by: %s, count: %d", caller, len(events))
1468+
13461469
tx, err := r.db.GetDB().Begin()
13471470
if err != nil {
13481471
return fmt.Errorf("[DB] failed to begin transaction: %w", err)
@@ -1393,6 +1516,12 @@ func (r *eventRepository) UpdateEventsEmbeddingStatus(eventIDs []int64, status i
13931516
return nil
13941517
}
13951518

1519+
// 获取调用者信息
1520+
caller := getCallerInfo(2)
1521+
1522+
// 写数据库前打印调用者信息
1523+
r.logger.Info("[DB] UpdateEventsEmbeddingStatus called by: %s, count: %d, status: %d", caller, len(eventIDs), status)
1524+
13961525
tx, err := r.db.GetDB().Begin()
13971526
if err != nil {
13981527
return fmt.Errorf("[DB] failed to begin transaction: %w", err)

internal/service/codegraph_processor_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ func TestCodegraphProcessor_ProcessDeleteFileEvent(t *testing.T) {
416416
mockLogger := &mocks.MockLogger{}
417417
mockWorkspaceReader := mocks.NewMockWorkspaceReader(ctrl)
418418
mockIndexer := mocks.NewMockIndexer(ctrl)
419+
// mockgen -source=internal/repository/event.go -destination=test/mocks/mock_event_repository.go -package=mocks
419420
mockEventRepo := mocks.NewMockEventRepository(ctrl)
420421

421422
// 创建测试实例

internal/service/file_scanner.go

Lines changed: 92 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ func (ws *fileScanService) DetectFileChanges(workspacePath string) ([]*model.Eve
125125

126126
// 生成事件并进行去重处理
127127
var events []*model.Event
128+
var eventsToCreate []*model.Event // 需要批量创建的事件(新文件 + building时需创建的)
129+
var eventsToUpdate []*model.Event // 需要批量更新的事件
130+
128131
for _, change := range changes {
129132
filePth := change.Path
130133
event := &model.Event{
@@ -139,21 +142,62 @@ func (ws *fileScanService) DetectFileChanges(workspacePath string) ([]*model.Eve
139142

140143
// 检查是否已存在相同路径的事件
141144
if existingEvent, exists := eventPathMap[filePth]; exists {
142-
// 更新现有事件
143-
err := ws.updateExistingEvent(existingEvent, event)
144-
if err != nil {
145-
ws.logger.Error("failed to update existing event for path %s: %v", filePth, err)
146-
continue
145+
// 分类处理现有事件
146+
action := ws.classifyExistingEventAction(existingEvent, event)
147+
switch action {
148+
case "skip":
149+
// 正在 building 且类型相同,跳过
150+
events = append(events, existingEvent)
151+
case "create":
152+
// 正在 building 但类型不同,需要创建新事件
153+
eventsToCreate = append(eventsToCreate, event)
154+
case "update":
155+
// 更新现有事件的状态
156+
existingEvent.EventType = event.EventType
157+
existingEvent.TargetFilePath = event.TargetFilePath
158+
existingEvent.EmbeddingStatus = model.EmbeddingStatusInit
159+
existingEvent.CodegraphStatus = model.CodegraphStatusSuccess
160+
eventsToUpdate = append(eventsToUpdate, existingEvent)
161+
events = append(events, existingEvent)
147162
}
148-
events = append(events, existingEvent)
149163
} else {
150-
// 创建新事件
151-
err := ws.eventRepo.CreateEvent(event)
152-
if err != nil {
153-
ws.logger.Error("failed to create event for path %s: %v", filePth, err)
154-
continue
164+
// 新文件,收集后批量创建
165+
eventsToCreate = append(eventsToCreate, event)
166+
}
167+
}
168+
169+
// 批量创建事件(减少 fsync 次数,提升性能)
170+
if len(eventsToCreate) > 0 {
171+
err := ws.eventRepo.BatchCreateEvents(eventsToCreate)
172+
if err != nil {
173+
ws.logger.Error("failed to batch create events: %v", err)
174+
// 降级处理:逐条创建
175+
for _, event := range eventsToCreate {
176+
if createErr := ws.eventRepo.CreateEvent(event); createErr != nil {
177+
ws.logger.Error("failed to create event for path %s: %v", event.SourceFilePath, createErr)
178+
continue
179+
}
180+
events = append(events, event)
155181
}
156-
events = append(events, event)
182+
} else {
183+
events = append(events, eventsToCreate...)
184+
ws.logger.Info("batch created %d events for workspace: %s", len(eventsToCreate), workspacePath)
185+
}
186+
}
187+
188+
// 批量更新现有事件
189+
if len(eventsToUpdate) > 0 {
190+
err := ws.eventRepo.BatchUpdateEvents(eventsToUpdate)
191+
if err != nil {
192+
ws.logger.Error("failed to batch update events: %v", err)
193+
// 降级处理:逐条更新
194+
for _, event := range eventsToUpdate {
195+
if updateErr := ws.eventRepo.UpdateEvent(event); updateErr != nil {
196+
ws.logger.Error("failed to update event for path %s: %v", event.SourceFilePath, updateErr)
197+
}
198+
}
199+
} else {
200+
ws.logger.Info("batch updated %d existing events for workspace: %s", len(eventsToUpdate), workspacePath)
157201
}
158202
}
159203

@@ -251,14 +295,27 @@ func (ws *fileScanService) handleEventsWithoutDeduplication(changes []*utils.Fil
251295
EmbeddingStatus: model.EmbeddingStatusInit,
252296
CodegraphStatus: model.CodegraphStatusSuccess,
253297
}
298+
events = append(events, event)
299+
}
254300

255-
err := ws.eventRepo.CreateEvent(event)
301+
// 批量创建事件(减少 fsync 次数,提升性能)
302+
if len(events) > 0 {
303+
err := ws.eventRepo.BatchCreateEvents(events)
256304
if err != nil {
257-
ws.logger.Error("failed to create event for path %s: %v", filePth, err)
258-
continue
305+
ws.logger.Error("failed to batch create events: %v", err)
306+
// 降级处理:逐条创建
307+
var createdEvents []*model.Event
308+
for _, event := range events {
309+
if createErr := ws.eventRepo.CreateEvent(event); createErr != nil {
310+
ws.logger.Error("failed to create event for path %s: %v", event.SourceFilePath, createErr)
311+
continue
312+
}
313+
createdEvents = append(createdEvents, event)
314+
}
315+
events = createdEvents
316+
} else {
317+
ws.logger.Info("batch created %d events for workspace: %s", len(events), workspacePath)
259318
}
260-
261-
events = append(events, event)
262319
}
263320

264321
// 查询 open_workspace 事件并更新状态为完成
@@ -267,7 +324,24 @@ func (ws *fileScanService) handleEventsWithoutDeduplication(changes []*utils.Fil
267324
return events, nil
268325
}
269326

270-
// updateExistingEvent 更新现有事件的信息
327+
// classifyExistingEventAction 判断现有事件应该执行的操作
328+
// 返回值: "skip" - 跳过, "create" - 创建新事件, "update" - 更新现有事件
329+
func (ws *fileScanService) classifyExistingEventAction(existingEvent, newEvent *model.Event) string {
330+
if existingEvent.EmbeddingStatus == model.EmbeddingStatusBuilding ||
331+
existingEvent.EmbeddingStatus == model.EmbeddingStatusUploading ||
332+
existingEvent.CodegraphStatus == model.CodegraphStatusBuilding {
333+
if newEvent.EventType == existingEvent.EventType {
334+
return "skip"
335+
}
336+
ws.logger.Debug("building event, will create new event for path: %s, type: %s", existingEvent.SourceFilePath, newEvent.EventType)
337+
return "create"
338+
}
339+
340+
ws.logger.Debug("will update existing event for path: %s, type: %s", existingEvent.SourceFilePath, newEvent.EventType)
341+
return "update"
342+
}
343+
344+
// updateExistingEvent 更新现有事件的信息(保留用于降级处理)
271345
func (ws *fileScanService) updateExistingEvent(existingEvent, newEvent *model.Event) error {
272346
if existingEvent.EmbeddingStatus == model.EmbeddingStatusBuilding ||
273347
existingEvent.EmbeddingStatus == model.EmbeddingStatusUploading ||

test/mocks/mock_event_repository.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)