Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/enums/v1/task.go-helpers.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions api/enums/v1/task.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

499 changes: 288 additions & 211 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ config as the other services.`,
false,
`EnableActivityEagerExecution indicates if activity eager execution is enabled per namespace`,
)
EnableActivityCancellationNexusTask = NewGlobalBoolSetting(
"system.enableActivityCancellationNexusTask",
false,
`EnableActivityCancellationNexusTask enables pushing activity cancellation to workers via Nexus task`,
)
NamespaceMinRetentionGlobal = NewGlobalDurationSetting(
"system.namespaceMinRetentionGlobal",
24*time.Hour,
Expand Down
38 changes: 38 additions & 0 deletions common/persistence/serialization/task_serializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func serializeTransferTask(
transferTask = transferDeleteExecutionTaskToProto(task)
case *tasks.ChasmTask:
transferTask = transferChasmTaskToProto(task)
case *tasks.CancelActivityNexusTask:
transferTask = transferCancelActivityNexusTaskToProto(task)
default:
return nil, serviceerror.NewInternalf("Unknown transfer task type: %v", task)
}
Expand Down Expand Up @@ -86,6 +88,8 @@ func deserializeTransferTask(
task = transferDeleteExecutionTaskFromProto(transferTask)
case enumsspb.TASK_TYPE_CHASM:
task = transferChasmTaskFromProto(transferTask)
case enumsspb.TASK_TYPE_TRANSFER_CANCEL_ACTIVITY_NEXUS:
task = transferCancelActivityNexusTaskFromProto(transferTask)
default:
return nil, serviceerror.NewInternalf("Unknown transfer task type: %v", transferTask.TaskType)
}
Expand All @@ -106,6 +110,40 @@ func transferChasmTaskFromProto(task *persistencespb.TransferTaskInfo) tasks.Tas
}
}

func transferCancelActivityNexusTaskToProto(task *tasks.CancelActivityNexusTask) *persistencespb.TransferTaskInfo {
return &persistencespb.TransferTaskInfo{
NamespaceId: task.NamespaceID,
WorkflowId: task.WorkflowID,
RunId: task.RunID,
TaskId: task.TaskID,
TaskType: task.GetType(),
Version: task.Version,
VisibilityTime: timestamppb.New(task.VisibilityTimestamp),
TaskDetails: &persistencespb.TransferTaskInfo_CancelActivityNexusTaskDetails_{
CancelActivityNexusTaskDetails: &persistencespb.TransferTaskInfo_CancelActivityNexusTaskDetails{
ScheduledEventIds: task.ScheduledEventIDs,
WorkerInstanceKey: task.WorkerInstanceKey,
},
},
}
}

func transferCancelActivityNexusTaskFromProto(task *persistencespb.TransferTaskInfo) tasks.Task {
details := task.GetCancelActivityNexusTaskDetails()
return &tasks.CancelActivityNexusTask{
WorkflowKey: definition.NewWorkflowKey(
task.NamespaceId,
task.WorkflowId,
task.RunId,
),
VisibilityTimestamp: task.VisibilityTime.AsTime(),
TaskID: task.TaskId,
Version: task.Version,
ScheduledEventIDs: details.GetScheduledEventIds(),
WorkerInstanceKey: details.GetWorkerInstanceKey(),
}
}

func serializeTimerTask(
encoder Encoder,
task tasks.Task,
Expand Down
13 changes: 13 additions & 0 deletions common/persistence/serialization/task_serializers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,19 @@ func (s *taskSerializerSuite) TestTransferResetTask() {
s.assertEqualTasks(resetTask)
}

func (s *taskSerializerSuite) TestTransferCancelActivityNexusTask() {
cancelActivityNexusTask := &tasks.CancelActivityNexusTask{
WorkflowKey: s.workflowKey,
VisibilityTimestamp: time.Unix(0, rand.Int63()).UTC(),
TaskID: rand.Int63(),
Version: rand.Int63(),
ScheduledEventIDs: []int64{rand.Int63(), rand.Int63(), rand.Int63()},
WorkerInstanceKey: "test-worker-instance-key",
}

s.assertEqualTasks(cancelActivityNexusTask)
}

func (s *taskSerializerSuite) TestTimerWorkflowTask() {
workflowTaskTimer := &tasks.WorkflowTaskTimeoutTask{
WorkflowKey: s.workflowKey,
Expand Down
3 changes: 3 additions & 0 deletions proto/internal/temporal/server/api/enums/v1/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ enum TaskType {

// A task with side effects generated by a CHASM component.
TASK_TYPE_CHASM = 33;

// A task to cancel a running activity via Nexus control queue.
TASK_TYPE_TRANSFER_CANCEL_ACTIVITY_NEXUS = 34;
}

// TaskPriority is only used for replication task as of May 2024
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,21 @@ message TransferTaskInfo {
// by some other task, so this task doesn't need to worry about it.
bool can_skip_visibility_archival = 1;
}

// Details for a Nexus task that cancels activities belonging to a specific worker.
message CancelActivityNexusTaskDetails {
// Scheduled event IDs of activities to cancel.
repeated int64 scheduled_event_ids = 1;
string worker_instance_key = 2;
}

oneof task_details {
CloseExecutionTaskDetails close_execution_task_details = 16;

// If the task addresses a CHASM component, this field will be set.
ChasmTaskInfo chasm_task_info = 18;

CancelActivityNexusTaskDetails cancel_activity_nexus_task_details = 19;
}
// Stamp represents the "version" of the entity's internal state for which the transfer task was created.
// It increases monotonically when the entity's options are modified.
Expand Down
24 changes: 12 additions & 12 deletions service/history/api/respondactivitytaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,18 @@ func Invoke(
// we need to force complete an activity
fabricateStartedEvent = ai.StartedEventId == common.EmptyEventID
if fabricateStartedEvent {
_, err := mutableState.AddActivityTaskStartedEvent(
ai,
scheduledEventID,
"",
req.GetCompleteRequest().GetIdentity(),
nil,
nil,
// TODO (shahab): do we need to do anything with wf redirect in this case or any
// other case where an activity starts?
nil,
"", // workerInstanceKey not available for force complete
)
_, err := mutableState.AddActivityTaskStartedEvent(
ai,
scheduledEventID,
"",
req.GetCompleteRequest().GetIdentity(),
nil,
nil,
// TODO (shahab): do we need to do anything with wf redirect in this case or any
// other case where an activity starts?
nil,
"", // workerInstanceKey not available for force complete
)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/configs"
historyi "go.temporal.io/server/service/history/interfaces"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
"go.temporal.io/server/service/history/workflow/update"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -658,6 +659,13 @@ func (handler *workflowTaskCompletedHandler) handleCommandRequestCancelActivity(
return nil, err
}
handler.activityNotStartedCancelled = true
} else if ai.WorkerInstanceKey != "" && handler.config.EnableActivityCancellationNexusTask() {
// Activity has started and worker supports Nexus tasks - create cancel task.
handler.mutableState.AddTasks(&tasks.CancelActivityNexusTask{
WorkflowKey: handler.mutableState.GetWorkflowKey(),
ScheduledEventIDs: []int64{ai.ScheduledEventId},
WorkerInstanceKey: ai.WorkerInstanceKey,
})
}
}
return actCancelReqEvent, nil
Expand Down
22 changes: 12 additions & 10 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,12 @@ type Config struct {
ESProcessorFlushInterval dynamicconfig.DurationPropertyFn
ESProcessorAckTimeout dynamicconfig.DurationPropertyFn

EnableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
EnableActivityEagerExecution dynamicconfig.BoolPropertyFnWithNamespaceFilter
EnableActivityRetryStampIncrement dynamicconfig.BoolPropertyFn
EnableEagerWorkflowStart dynamicconfig.BoolPropertyFnWithNamespaceFilter
NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn
EnableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
EnableActivityEagerExecution dynamicconfig.BoolPropertyFnWithNamespaceFilter
EnableActivityRetryStampIncrement dynamicconfig.BoolPropertyFn
EnableActivityCancellationNexusTask dynamicconfig.BoolPropertyFn
EnableEagerWorkflowStart dynamicconfig.BoolPropertyFnWithNamespaceFilter
NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn

// ArchivalQueueProcessor settings
ArchivalProcessorSchedulerWorkerCount dynamicconfig.TypedSubscribable[int]
Expand Down Expand Up @@ -721,11 +722,12 @@ func NewConfig(
ESProcessorFlushInterval: dynamicconfig.WorkerESProcessorFlushInterval.Get(dc),
ESProcessorAckTimeout: dynamicconfig.WorkerESProcessorAckTimeout.Get(dc),

EnableCrossNamespaceCommands: dynamicconfig.EnableCrossNamespaceCommands.Get(dc),
EnableActivityEagerExecution: dynamicconfig.EnableActivityEagerExecution.Get(dc),
EnableActivityRetryStampIncrement: dynamicconfig.EnableActivityRetryStampIncrement.Get(dc),
EnableEagerWorkflowStart: dynamicconfig.EnableEagerWorkflowStart.Get(dc),
NamespaceCacheRefreshInterval: dynamicconfig.NamespaceCacheRefreshInterval.Get(dc),
EnableCrossNamespaceCommands: dynamicconfig.EnableCrossNamespaceCommands.Get(dc),
EnableActivityEagerExecution: dynamicconfig.EnableActivityEagerExecution.Get(dc),
EnableActivityRetryStampIncrement: dynamicconfig.EnableActivityRetryStampIncrement.Get(dc),
EnableActivityCancellationNexusTask: dynamicconfig.EnableActivityCancellationNexusTask.Get(dc),
EnableEagerWorkflowStart: dynamicconfig.EnableEagerWorkflowStart.Get(dc),
NamespaceCacheRefreshInterval: dynamicconfig.NamespaceCacheRefreshInterval.Get(dc),

// Archival related
ArchivalTaskBatchSize: dynamicconfig.ArchivalTaskBatchSize.Get(dc),
Expand Down
71 changes: 71 additions & 0 deletions service/history/tasks/cancel_activity_nexus_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package tasks

import (
"fmt"
"time"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/definition"
)

var _ Task = (*CancelActivityNexusTask)(nil)

type (
CancelActivityNexusTask struct {
definition.WorkflowKey
VisibilityTimestamp time.Time
TaskID int64
Version int64

// ScheduledEventIDs of activities to cancel (batched by worker).
ScheduledEventIDs []int64
WorkerInstanceKey string
}
)

func (t *CancelActivityNexusTask) GetKey() Key {
return NewImmediateKey(t.TaskID)
}

func (t *CancelActivityNexusTask) GetVersion() int64 {
return t.Version
}

func (t *CancelActivityNexusTask) SetVersion(version int64) {
t.Version = version
}

func (t *CancelActivityNexusTask) GetTaskID() int64 {
return t.TaskID
}

func (t *CancelActivityNexusTask) SetTaskID(id int64) {
t.TaskID = id
}

func (t *CancelActivityNexusTask) GetVisibilityTime() time.Time {
return t.VisibilityTimestamp
}

func (t *CancelActivityNexusTask) SetVisibilityTime(timestamp time.Time) {
t.VisibilityTimestamp = timestamp
}

func (t *CancelActivityNexusTask) GetCategory() Category {
return CategoryTransfer
}

func (t *CancelActivityNexusTask) GetType() enumsspb.TaskType {
return enumsspb.TASK_TYPE_TRANSFER_CANCEL_ACTIVITY_NEXUS
}

func (t *CancelActivityNexusTask) String() string {
return fmt.Sprintf("CancelActivityNexusTask{WorkflowKey: %s, VisibilityTimestamp: %v, TaskID: %v, ScheduledEventIDs: %v, WorkerInstanceKey: %v, Version: %v}",
t.WorkflowKey.String(),
t.VisibilityTimestamp,
t.TaskID,
t.ScheduledEventIDs,
t.WorkerInstanceKey,
t.Version,
)
}
6 changes: 6 additions & 0 deletions service/history/tasks/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func GetTransferTaskEventID(
eventID = common.FirstEventID
case *ChasmTask:
return getChasmTaskEventID()
case *CancelActivityNexusTask:
if len(task.ScheduledEventIDs) > 0 {
eventID = task.ScheduledEventIDs[0]
} else {
eventID = common.FirstEventID
}
case *FakeTask:
// no-op
default:
Expand Down
3 changes: 3 additions & 0 deletions service/history/transfer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ func (t *transferQueueActiveTaskExecutor) Execute(
err = t.processDeleteExecutionTask(ctx, task)
case *tasks.ChasmTask:
err = t.executeChasmSideEffectTransferTask(ctx, task)
case *tasks.CancelActivityNexusTask:
// TODO: Implement dispatch to worker control queue
err = nil
default:
err = errUnknownTransferTask
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/transfer_queue_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func (t *transferQueueStandbyTaskExecutor) Execute(
err = t.processDeleteExecutionTask(ctx, task, false)
case *tasks.ChasmTask:
err = t.executeChasmSideEffectTransferTask(ctx, task)
case *tasks.CancelActivityNexusTask:
// Cancel activity nexus task is best-effort and only processed in active cluster
err = nil
default:
err = errUnknownTransferTask
}
Expand Down
Loading