From a9485cd801a48927ac9267d4ebfa565da4e01ebb Mon Sep 17 00:00:00 2001 From: Andy Carlson <2yinyang2@gmail.com> Date: Fri, 9 Jan 2026 11:35:00 +0800 Subject: [PATCH 1/3] handle task cancellation request --- internal/types/messages.go | 9 +++++---- internal/worker/worker.go | 23 ++++++++++++++++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/internal/types/messages.go b/internal/types/messages.go index dc3c97f..82ce4a7 100644 --- a/internal/types/messages.go +++ b/internal/types/messages.go @@ -6,10 +6,11 @@ import "encoding/json" type MessageType string const ( - MessageTypeTaskAssignment MessageType = "task_assignment" - MessageTypeTaskClaimed MessageType = "task_claimed" - MessageTypeTaskFailed MessageType = "task_failed" - MessageTypeHeartbeat MessageType = "heartbeat" + MessageTypeTaskAssignment MessageType = "task_assignment" + MessageTypeTaskClaimed MessageType = "task_claimed" + MessageTypeTaskFailed MessageType = "task_failed" + MessageTypeTaskCancellation MessageType = "task_cancellation" + MessageTypeHeartbeat MessageType = "heartbeat" ) // WebSocketMessage is the base structure for all WebSocket messages diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 5c727ff..7f909f1 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -273,7 +273,6 @@ func (w *Worker) handleMessage(message []byte) { return } - // Currently there is only one message type, but we anticipate needing more in the future. switch msg.Type { case types.MessageTypeTaskAssignment: var assignment types.TaskAssignmentMessage @@ -283,6 +282,14 @@ func (w *Worker) handleMessage(message []byte) { } w.handleTaskAssignment(&assignment) + case types.MessageTypeTaskCancellation: + var cancellation types.TaskCancellationMessage + if err := json.Unmarshal(msg.Data, &cancellation); err != nil { + log.Errorf(w.ctx, "Failed to unmarshal task cancellation: %v", err) + return + } + w.handleTaskCancellation(cancellation.TaskID) + default: log.Warnf(w.ctx, "Unknown message type: %s", msg.Type) } @@ -305,6 +312,20 @@ func (w *Worker) handleTaskAssignment(assignment *types.TaskAssignmentMessage) { go w.executeTask(taskCtx, assignment) } +func (w *Worker) handleTaskCancellation(taskID string) { + w.tasksMutex.Lock() + cancelFunc, exists := w.activeTasks[taskID] + w.tasksMutex.Unlock() + + if !exists { + log.Warnf(w.ctx, "Cannot cancel task %s: task not found or already completed", taskID) + return + } + + log.Infof(w.ctx, "Cancelling task: %s", taskID) + cancelFunc() +} + func (w *Worker) executeTask(ctx context.Context, assignment *types.TaskAssignmentMessage) { defer func() { w.tasksMutex.Lock() From 682896f2d29be4faab956f27304e8763a4c97451 Mon Sep 17 00:00:00 2001 From: Andy Carlson <2yinyang2@gmail.com> Date: Fri, 9 Jan 2026 11:36:32 +0800 Subject: [PATCH 2/3] re-add accidentally deleted type --- internal/types/messages.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/types/messages.go b/internal/types/messages.go index 82ce4a7..1b7d63a 100644 --- a/internal/types/messages.go +++ b/internal/types/messages.go @@ -40,3 +40,8 @@ type TaskFailedMessage struct { TaskID string `json:"task_id"` Message string `json:"message"` } + +// TaskCancellationMessage is sent from server to worker to cancel a running task +type TaskCancellationMessage struct { + TaskID string `json:"task_id"` +} From 40894ea7033fcb54aa52531e346b8b15c7070087 Mon Sep 17 00:00:00 2001 From: Ben Navetta Date: Sun, 22 Feb 2026 10:46:18 +0000 Subject: [PATCH 3/3] Fix task cancellation: proper cleanup and error handling - Differentiate task cancellation from genuine failures in executeTask; cancelled tasks no longer send a TaskFailed message to the server - Use background context for container cleanup (remove) so it succeeds even when the task context is already cancelled - Explicitly stop the running container when the task context is cancelled during ContainerWait, using a background context with timeout Co-Authored-By: Oz --- internal/worker/worker.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 7f909f1..0a3bef0 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -337,6 +337,11 @@ func (w *Worker) executeTask(ctx context.Context, assignment *types.TaskAssignme log.Infof(ctx, "Starting task execution: taskID=%s, title=%s", taskID, assignment.Task.Title) if err := w.executeTaskInDocker(ctx, assignment); err != nil { + // Don't report cancellation as a failure — it's an intentional action. + if ctx.Err() != nil { + log.Infof(w.ctx, "Task was cancelled: taskID=%s", taskID) + return + } log.Errorf(ctx, "Task launch failed: taskID=%s, error=%v", taskID, err) if statusErr := w.sendTaskFailed(taskID, fmt.Sprintf("Failed to launch task: %v", err)); statusErr != nil { log.Errorf(ctx, "Failed to send task failed message: %v", statusErr) @@ -523,8 +528,11 @@ func (w *Worker) executeTaskInDocker(ctx context.Context, assignment *types.Task defer func() { if containerID != "" { - if removeErr := dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{Force: true}); removeErr != nil { - log.Debugf(ctx, "Container %s already removed or removal failed: %v", containerID, removeErr) + // Use a background context for cleanup so it succeeds even if the task context was cancelled. + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cleanupCancel() + if removeErr := dockerClient.ContainerRemove(cleanupCtx, containerID, container.RemoveOptions{Force: true}); removeErr != nil { + log.Debugf(w.ctx, "Container %s already removed or removal failed: %v", containerID, removeErr) } } }() @@ -537,6 +545,14 @@ func (w *Worker) executeTaskInDocker(ctx context.Context, assignment *types.Task statusCh, errCh := dockerClient.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) select { + case <-ctx.Done(): + log.Infof(w.ctx, "Task context cancelled, stopping container %s", containerID) + stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer stopCancel() + if stopErr := dockerClient.ContainerStop(stopCtx, containerID, container.StopOptions{}); stopErr != nil { + log.Warnf(w.ctx, "Failed to stop container %s: %v", containerID, stopErr) + } + return ctx.Err() case err := <-errCh: if err != nil { return fmt.Errorf("error waiting for container: %w", err)