Skip to content

Commit 83ed42d

Browse files
committed
feat: enhance error handling and improve upload service logic
1 parent e420e0d commit 83ed42d

File tree

9 files changed

+216
-80
lines changed

9 files changed

+216
-80
lines changed

cmd/main.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,6 @@ func main() {
6060
// grpcServer := flag.String("grpc", "localhost:51353", "gRPC server address")
6161
httpServer := flag.String("http", "localhost:11380", "HTTP server address")
6262
logLevel := flag.String("loglevel", "info", "log level (debug, info, warn, error)")
63-
// clientId := flag.String("clientid", "", "client id")
64-
// serverEndpoint := flag.String("server", "", "server endpoint")
65-
// token := flag.String("token", "", "authentication token")
6663
enableSwagger := flag.Bool("swagger", false, "enable swagger documentation")
6764
enablePprof := flag.Bool("pprof", false, "enable pprof profiling")
6865
pprofAddr := flag.String("pprof-addr", "localhost:6060", "pprof server address")

internal/daemon/daemon.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -110,28 +110,28 @@ func (d *Daemon) Start() {
110110
}()
111111

112112
// Start fetch server hash tree task
113-
d.wg.Add(1)
114-
go func() {
115-
ticker := time.NewTicker(1 * time.Hour)
116-
defer ticker.Stop()
117-
118-
defer func() {
119-
if r := recover(); r != nil {
120-
d.logger.Error("fetch server hash task panic recovered: %v", r)
121-
}
122-
d.wg.Done()
123-
}()
124-
125-
for {
126-
select {
127-
case <-d.ctx.Done():
128-
d.logger.Info("fetch server hash task stopped")
129-
return
130-
case <-ticker.C:
131-
d.fetchServerHashTree()
132-
}
133-
}
134-
}()
113+
// d.wg.Add(1)
114+
// go func() {
115+
// ticker := time.NewTicker(1 * time.Hour)
116+
// defer ticker.Stop()
117+
118+
// defer func() {
119+
// if r := recover(); r != nil {
120+
// d.logger.Error("fetch server hash task panic recovered: %v", r)
121+
// }
122+
// d.wg.Done()
123+
// }()
124+
125+
// for {
126+
// select {
127+
// case <-d.ctx.Done():
128+
// d.logger.Info("fetch server hash task stopped")
129+
// return
130+
// case <-ticker.C:
131+
// d.fetchServerHashTree()
132+
// }
133+
// }
134+
// }()
135135

136136
// 启动文件扫描任务(5分钟间隔)
137137
d.wg.Add(1)

internal/errs/extension.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package errs
22

33
const (
4-
ErrAuthenticationFailed = "codebase-indexer.authentication_failed"
5-
ErrEmbeddingFailed = "codebase-indexer.embedding_failed"
4+
ErrNetworkConnectionError = "codebase-indexer.network_connection_error"
5+
ErrAuthenticationFailed = "codebase-indexer.authentication_failed"
6+
ErrInternalServerError = "codebase-indexer.internal_server_error"
7+
ErrFileEmbeddingFailed = "codebase-indexer.file_embedding_failed"
68
)

internal/service/embedding_processor.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"strings"
77
"time"
88

9+
"codebase-indexer/internal/errs"
910
"codebase-indexer/internal/model"
1011
"codebase-indexer/internal/repository"
1112
"codebase-indexer/internal/utils"
@@ -89,6 +90,7 @@ func (ep *embeddingProcessService) ProcessAddFileEvent(ctx context.Context, even
8990
if updateErr != nil {
9091
return nil, fmt.Errorf("failed to update event status to uploadFailed: %w", updateErr)
9192
}
93+
ep.uploadFilePathFailed(event, err)
9294
return nil, fmt.Errorf("failed to upload add file %s: %w", event.SourceFilePath, err)
9395
}
9496

@@ -126,6 +128,7 @@ func (ep *embeddingProcessService) ProcessModifyFileEvent(ctx context.Context, e
126128
if updateErr != nil {
127129
return nil, fmt.Errorf("failed to update event status to upload failed: %w", updateErr)
128130
}
131+
ep.uploadFilePathFailed(event, err)
129132
return nil, fmt.Errorf("failed to upload modified file %s: %w", event.SourceFilePath, err)
130133
}
131134

@@ -163,6 +166,7 @@ func (ep *embeddingProcessService) ProcessDeleteFileEvent(ctx context.Context, e
163166
if updateErr != nil {
164167
return nil, fmt.Errorf("failed to update event status to upload failed: %w", updateErr)
165168
}
169+
ep.uploadFilePathFailed(event, err)
166170
return nil, fmt.Errorf("failed to upload delete file %s: %w", event.SourceFilePath, err)
167171
}
168172

@@ -200,6 +204,7 @@ func (ep *embeddingProcessService) ProcessRenameFileEvent(ctx context.Context, e
200204
if updateErr != nil {
201205
return nil, fmt.Errorf("failed to update event status to upload failed: %w", updateErr)
202206
}
207+
ep.uploadFilePathFailed(event, err)
203208
return nil, fmt.Errorf("failed to upload renamed file %s->%s: %w", event.SourceFilePath, event.TargetFilePath, err)
204209
}
205210

@@ -341,6 +346,71 @@ func (ep *embeddingProcessService) ProcessEmbeddingEvents(ctx context.Context, w
341346
return nil
342347
}
343348

349+
func (ep *embeddingProcessService) uploadFilePathFailed(event *model.Event, uploadErr error) error {
350+
filePath := event.SourceFilePath
351+
if event.EventType == model.EventTypeRenameFile {
352+
filePath = event.TargetFilePath
353+
}
354+
codebaseId := utils.GenerateCodebaseEmbeddingID(event.WorkspacePath)
355+
codebaseEmbeddingConfig, err := ep.codebaseEmbeddingRepo.GetCodebaseEmbeddingConfig(codebaseId)
356+
if err != nil {
357+
return fmt.Errorf("failed to get codebase embedding config for workspace %s: %w", event.WorkspacePath, err)
358+
}
359+
if codebaseEmbeddingConfig.HashTree == nil {
360+
codebaseEmbeddingConfig.HashTree = make(map[string]string)
361+
}
362+
if codebaseEmbeddingConfig.FailedFiles == nil {
363+
codebaseEmbeddingConfig.FailedFiles = make(map[string]string)
364+
}
365+
if codebaseEmbeddingConfig.SyncFiles == nil {
366+
codebaseEmbeddingConfig.SyncFiles = make(map[string]string)
367+
}
368+
delete(codebaseEmbeddingConfig.HashTree, filePath)
369+
delete(codebaseEmbeddingConfig.SyncFiles, filePath)
370+
if utils.IsUnauthorizedError(uploadErr) {
371+
codebaseEmbeddingConfig.FailedFiles[filePath] = errs.ErrAuthenticationFailed
372+
} else if utils.IsTooManyRequestsError(uploadErr) {
373+
codebaseEmbeddingConfig.FailedFiles[filePath] = errs.ErrInternalServerError
374+
} else if utils.IsServiceUnavailableError(uploadErr) {
375+
codebaseEmbeddingConfig.FailedFiles[filePath] = errs.ErrInternalServerError
376+
} else {
377+
codebaseEmbeddingConfig.FailedFiles[filePath] = errs.ErrFileEmbeddingFailed
378+
}
379+
// 保存 codebase embedding 配置
380+
err = ep.codebaseEmbeddingRepo.SaveCodebaseEmbeddingConfig(codebaseEmbeddingConfig)
381+
if err != nil {
382+
ep.logger.Error("failed to save codebase embedding config for workspace %s: %v", event.WorkspacePath, err)
383+
return fmt.Errorf("failed to save codebase embedding config: %w", err)
384+
}
385+
386+
embeddingFileNum := len(codebaseEmbeddingConfig.HashTree)
387+
var embeddingFailedFilePaths string
388+
var embeddingMessage string
389+
embeddingFaildFiles := codebaseEmbeddingConfig.FailedFiles
390+
failedKeys := make([]string, 0, len(embeddingFaildFiles))
391+
for k, v := range embeddingFaildFiles {
392+
failedKeys = append(failedKeys, k)
393+
embeddingMessage = v
394+
if len(failedKeys) > 5 {
395+
break
396+
}
397+
}
398+
if len(failedKeys) == 0 {
399+
embeddingFailedFilePaths = ""
400+
embeddingMessage = ""
401+
} else if len(failedKeys) > 5 {
402+
embeddingFailedFilePaths = strings.Join(failedKeys[:5], ",")
403+
} else {
404+
embeddingFailedFilePaths = strings.Join(failedKeys, ",")
405+
}
406+
407+
err = ep.workspaceRepo.UpdateEmbeddingInfo(event.WorkspacePath, embeddingFileNum, time.Now().Unix(), embeddingMessage, embeddingFailedFilePaths)
408+
if err != nil {
409+
return fmt.Errorf("failed to update workspace: %w", err)
410+
}
411+
return nil
412+
}
413+
344414
// CleanWorkspaceFilePath 删除 workspace 中指定文件的 filepath 记录
345415
func (ep *embeddingProcessService) CleanWorkspaceFilePath(ctx context.Context, fileStatus *utils.FileStatus, event *model.Event) error {
346416
ep.logger.Info("cleaning workspace filepath for event: %s, workspace: %s", event.SourceFilePath, event.WorkspacePath)
@@ -446,7 +516,7 @@ func (ep *embeddingProcessService) CleanWorkspaceFilePath(ctx context.Context, f
446516
embeddingFailedFilePaths = strings.Join(failedKeys, ",")
447517
}
448518

449-
err = ep.workspaceRepo.UpdateEmbeddingInfo(event.WorkspacePath, embeddingFileNum, time.Now().Unix(), embeddingFailedFilePaths, embeddingMessage)
519+
err = ep.workspaceRepo.UpdateEmbeddingInfo(event.WorkspacePath, embeddingFileNum, time.Now().Unix(), embeddingMessage, embeddingFailedFilePaths)
450520
if err != nil {
451521
ep.logger.Error("failed to update workspace file num: %v", err)
452522
return fmt.Errorf("failed to update workspace file num: %w", err)

internal/service/embedding_status.go

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,17 @@ func (sc *embeddingStatusService) checkWorkspaceBuildingStates(workspacePath str
107107
sc.logger.Info("found %d building events for workspace: %s", len(events), workspacePath)
108108

109109
// 检查每个event的构建状态
110+
nowTime := time.Now()
110111
for _, event := range events {
112+
if nowTime.Sub(event.UpdatedAt) > time.Minute*3 {
113+
updateEvent := &model.Event{ID: event.ID, EmbeddingStatus: model.EmbeddingStatusBuildFailed}
114+
err := sc.eventRepo.UpdateEvent(updateEvent)
115+
if err != nil {
116+
sc.logger.Error("failed to update event status: %v", err)
117+
}
118+
sc.buildFilePathFailed(event)
119+
continue
120+
}
111121
err := sc.checkEventBuildStatus(workspacePath, event)
112122
if err != nil {
113123
sc.logger.Error("failed to check event build status: %v", err)
@@ -136,14 +146,15 @@ func (sc *embeddingStatusService) checkWorkspaceUploadingStates(workspacePath st
136146
// 检查每个event的上传状态
137147
nowTime := time.Now()
138148
for _, event := range events {
139-
if nowTime.Sub(event.UpdatedAt) < time.Minute*5 {
149+
if nowTime.Sub(event.UpdatedAt) < time.Minute*3 {
140150
continue
141151
}
142152
updateEvent := &model.Event{ID: event.ID, EmbeddingStatus: model.EmbeddingStatusUploadFailed}
143153
err := sc.eventRepo.UpdateEvent(updateEvent)
144154
if err != nil {
145155
sc.logger.Error("failed to update event status: %v", err)
146156
}
157+
sc.buildFilePathFailed(event)
147158
}
148159

149160
return nil
@@ -206,6 +217,7 @@ func (sc *embeddingStatusService) checkEventBuildStatus(workspacePath string, ev
206217
if err != nil {
207218
return fmt.Errorf("failed to update event: %w", err)
208219
}
220+
sc.buildFilePathFailed(event)
209221
return nil
210222
}
211223

@@ -226,12 +238,68 @@ func (sc *embeddingStatusService) fetchFileStatus(workspacePath, syncId string)
226238

227239
fileStatusResp, err := sc.syncer.FetchFileStatus(fileStatusReq)
228240
if err != nil {
229-
return nil, fmt.Errorf("failed to fetch file status: %w", err)
241+
return nil, err
230242
}
231243

232244
return fileStatusResp, nil
233245
}
234246

247+
func (sc *embeddingStatusService) buildFilePathFailed(event *model.Event) error {
248+
filePath := event.SourceFilePath
249+
if event.EventType == model.EventTypeRenameFile {
250+
filePath = event.TargetFilePath
251+
}
252+
codebaseId := utils.GenerateCodebaseEmbeddingID(event.WorkspacePath)
253+
codebaseEmbeddingConfig, err := sc.codebaseEmbeddingRepo.GetCodebaseEmbeddingConfig(codebaseId)
254+
if err != nil {
255+
return fmt.Errorf("failed to get codebase embedding config for workspace %s: %w", event.WorkspacePath, err)
256+
}
257+
if codebaseEmbeddingConfig.HashTree == nil {
258+
codebaseEmbeddingConfig.HashTree = make(map[string]string)
259+
}
260+
if codebaseEmbeddingConfig.FailedFiles == nil {
261+
codebaseEmbeddingConfig.FailedFiles = make(map[string]string)
262+
}
263+
if codebaseEmbeddingConfig.SyncFiles == nil {
264+
codebaseEmbeddingConfig.SyncFiles = make(map[string]string)
265+
}
266+
delete(codebaseEmbeddingConfig.SyncFiles, filePath)
267+
codebaseEmbeddingConfig.FailedFiles[filePath] = errs.ErrFileEmbeddingFailed
268+
// 保存 codebase embedding 配置
269+
err = sc.codebaseEmbeddingRepo.SaveCodebaseEmbeddingConfig(codebaseEmbeddingConfig)
270+
if err != nil {
271+
sc.logger.Error("failed to save codebase embedding config for workspace %s: %v", event.WorkspacePath, err)
272+
return fmt.Errorf("failed to save codebase embedding config: %w", err)
273+
}
274+
275+
embeddingFileNum := len(codebaseEmbeddingConfig.HashTree)
276+
var embeddingFailedFilePaths string
277+
var embeddingMessage string
278+
embeddingFaildFiles := codebaseEmbeddingConfig.FailedFiles
279+
failedKeys := make([]string, 0, len(embeddingFaildFiles))
280+
for k, v := range embeddingFaildFiles {
281+
failedKeys = append(failedKeys, k)
282+
embeddingMessage = v
283+
if len(failedKeys) > 5 {
284+
break
285+
}
286+
}
287+
if len(failedKeys) == 0 {
288+
embeddingFailedFilePaths = ""
289+
embeddingMessage = ""
290+
} else if len(failedKeys) > 5 {
291+
embeddingFailedFilePaths = strings.Join(failedKeys[:5], ",")
292+
} else {
293+
embeddingFailedFilePaths = strings.Join(failedKeys, ",")
294+
}
295+
296+
err = sc.workspaceRepo.UpdateEmbeddingInfo(event.WorkspacePath, embeddingFileNum, time.Now().Unix(), embeddingMessage, embeddingFailedFilePaths)
297+
if err != nil {
298+
return fmt.Errorf("failed to update workspace: %w", err)
299+
}
300+
return nil
301+
}
302+
235303
// handleBuildCompletion 处理构建完成后的状态更新
236304
func (sc *embeddingStatusService) handleBuildCompletion(workspacePath string, event *model.Event, fileStatusList []dto.FileStatusRespFileListItem) error {
237305
// 检查所有文件状态是否都成功
@@ -289,7 +357,7 @@ func (sc *embeddingStatusService) handleBuildCompletion(workspacePath string, ev
289357

290358
if status == dto.EmbeddingFailed {
291359
delete(codebaseEmbeddingConfig.SyncFiles, filePath)
292-
codebaseEmbeddingConfig.FailedFiles[filePath] = errs.ErrEmbeddingFailed
360+
codebaseEmbeddingConfig.FailedFiles[filePath] = errs.ErrFileEmbeddingFailed
293361
} else {
294362
delete(codebaseEmbeddingConfig.SyncFiles, filePath)
295363
delete(codebaseEmbeddingConfig.FailedFiles, filePath)
@@ -323,7 +391,7 @@ func (sc *embeddingStatusService) handleBuildCompletion(workspacePath string, ev
323391
embeddingFailedFilePaths = strings.Join(failedKeys, ",")
324392
}
325393

326-
err = sc.workspaceRepo.UpdateEmbeddingInfo(event.WorkspacePath, embeddingFileNum, time.Now().Unix(), embeddingFailedFilePaths, embeddingMessage)
394+
err = sc.workspaceRepo.UpdateEmbeddingInfo(event.WorkspacePath, embeddingFileNum, time.Now().Unix(), embeddingMessage, embeddingFailedFilePaths)
327395
if err != nil {
328396
return fmt.Errorf("failed to update workspace: %w", err)
329397
}

internal/service/extension.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,9 +1112,9 @@ func (s *extensionService) calculateEmbeddingStatus(workspace *model.Workspace)
11121112
// 计算失败文件数
11131113
failedFilePaths := strings.Split(workspace.EmbeddingFailedFilePaths, ",")
11141114
fullFailedfilePaths := make([]string, 0, len(failedFilePaths))
1115-
for i, failedFilePath := range failedFilePaths {
1115+
for _, failedFilePath := range failedFilePaths {
11161116
if failedFilePath != "" {
1117-
fullFailedfilePaths[i] = filepath.Join(workspace.WorkspacePath, failedFilePath)
1117+
fullFailedfilePaths = append(fullFailedfilePaths, filepath.Join(workspace.WorkspacePath, failedFilePath))
11181118
}
11191119
}
11201120
totalFailed := len(fullFailedfilePaths)
@@ -1142,8 +1142,10 @@ func (s *extensionService) calculateEmbeddingStatus(workspace *model.Workspace)
11421142
// 存在初始或进行中状态事件时,状态为 running
11431143
if processingCount > 0 {
11441144
status.Status = dto.ProcessStatusRunning
1145-
} else if failedCount > 0 {
1146-
// 存在失败状态时,判断比较 process 和配置中的百分比阈值
1145+
return status
1146+
}
1147+
// 存在失败状态时,判断比较 process 和配置中的百分比阈值
1148+
if failedCount > 0 {
11471149
clientConfig := config.GetClientConfig()
11481150
embeddingSuccessPercent := clientConfig.Sync.EmbeddingSuccessPercent
11491151
if status.Process < embeddingSuccessPercent {
@@ -1187,9 +1189,9 @@ func (s *extensionService) calculateCodegraphStatus(workspace *model.Workspace)
11871189
// 计算失败文件数
11881190
failedFilePaths := strings.Split(workspace.EmbeddingFailedFilePaths, ",")
11891191
fullFailedfilePaths := make([]string, 0, len(failedFilePaths))
1190-
for i, failedFilePath := range failedFilePaths {
1192+
for _, failedFilePath := range failedFilePaths {
11911193
if failedFilePath != "" {
1192-
fullFailedfilePaths[i] = filepath.Join(workspace.WorkspacePath, failedFilePath)
1194+
fullFailedfilePaths = append(fullFailedfilePaths, filepath.Join(workspace.WorkspacePath, failedFilePath))
11931195
}
11941196
}
11951197
totalFailed := len(fullFailedfilePaths)
@@ -1217,8 +1219,10 @@ func (s *extensionService) calculateCodegraphStatus(workspace *model.Workspace)
12171219
// 存在初始或进行中状态事件时,状态为 running
12181220
if processingCount > 0 {
12191221
status.Status = dto.ProcessStatusRunning
1220-
} else if failedCount > 0 {
1221-
// 存在失败状态时,判断比较 process 和配置中的百分比阈值
1222+
return status
1223+
}
1224+
// 存在失败状态时,判断比较 process 和配置中的百分比阈值
1225+
if failedCount > 0 {
12221226
clientConfig := config.GetClientConfig()
12231227
codegraphSuccessPercent := clientConfig.Sync.CodegraphSuccessPercent
12241228
if status.Process < codegraphSuccessPercent {

0 commit comments

Comments
 (0)