diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index acacd436c..ff78d0470 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -2957,6 +2957,7 @@ type TemporalWorkerDeploymentDescribeVersionCommand struct { Parent *TemporalWorkerDeploymentCommand Command cobra.Command DeploymentVersionOptions + ReportTaskQueueStats bool } func NewTemporalWorkerDeploymentDescribeVersionCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentDescribeVersionCommand { @@ -2971,6 +2972,7 @@ func NewTemporalWorkerDeploymentDescribeVersionCommand(cctx *CommandContext, par s.Command.Long = "```\n+---------------------------------------------------------------------+\n| CAUTION: Worker Deployment is experimental. Deployment commands are |\n| subject to change. |\n+---------------------------------------------------------------------+\n```\n\nDescribe properties of a Worker Deployment Version, such as the task\nqueues polled by workers in this Deployment Version, or drainage\ninformation required to safely decommission workers, or user-provided\nmetadata, or its creation/modification time.\n\n```\ntemporal worker deployment describe-version [options]\n```\n\nFor example, to describe a deployment version in a deployment\n`YourDeploymentName`, with Build ID `YourBuildID`, and in the default\nnamespace:\n\n```\ntemporal worker deployment describe-version \\\n --deployment-name YourDeploymentName --build-id YourBuildID\n```" } s.Command.Args = cobra.NoArgs + s.Command.Flags().BoolVar(&s.ReportTaskQueueStats, "report-task-queue-stats", false, "Report stats for task queues that are present in this version.") s.DeploymentVersionOptions.BuildFlags(s.Command.Flags()) s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { diff --git a/internal/temporalcli/commands.worker.deployment.go b/internal/temporalcli/commands.worker.deployment.go 
index 599759607..89690052a 100644
--- a/internal/temporalcli/commands.worker.deployment.go
+++ b/internal/temporalcli/commands.worker.deployment.go
@@ -8,7 +8,11 @@ import (
 	"github.com/fatih/color"
 	"github.com/temporalio/cli/internal/printer"
 	"go.temporal.io/api/common/v1"
+	deploymentpb "go.temporal.io/api/deployment/v1"
+	enumspb "go.temporal.io/api/enums/v1"
 	"go.temporal.io/api/serviceerror"
+	taskqueuepb "go.temporal.io/api/taskqueue/v1"
+	"go.temporal.io/api/workflowservice/v1"
 	"go.temporal.io/sdk/client"
 	"go.temporal.io/sdk/worker"
 )
@@ -57,8 +61,45 @@ type formattedDrainageInfo struct {
 }
 
 type formattedTaskQueueInfoRowType struct {
-	Name string `json:"name"`
-	Type string `json:"type"`
+	Name               string                                 `json:"name"`
+	Type               string                                 `json:"type"`
+	Stats              *formattedVersionStatsRowType          `json:"stats,omitempty"`
+	StatsByPriorityKey map[int32]formattedVersionStatsRowType `json:"statsByPriorityKey,omitempty"`
+}
+
+// formattedVersionStatsRowType holds one set of task queue stats as emitted in
+// structured (JSON) output.
+type formattedVersionStatsRowType struct {
+	ApproximateBacklogCount int64         `json:"approximateBacklogCount"`
+	ApproximateBacklogAge   time.Duration `json:"approximateBacklogAge"`
+	BacklogIncreaseRate     float32       `json:"backlogIncreaseRate"`
+	TasksAddRate            float32       `json:"tasksAddRate"`
+	TasksDispatchRate       float32       `json:"tasksDispatchRate"`
+}
+
+// Text display types for task queue info (flattened stats)
+type taskQueueDisplayRowBasic struct {
+	Name string
+	Type string
+}
+
+type taskQueueDisplayRowWithStats struct {
+	Name                    string
+	Type                    string
+	ApproximateBacklogCount int64   `cli:",align=right"`
+	ApproximateBacklogAge   string  `cli:",align=right"`
+	BacklogIncreaseRate     float32 `cli:",align=right"`
+	TasksAddRate            float32 `cli:",align=right"`
+	TasksDispatchRate       float32 `cli:",align=right"`
+}
+
+type priorityStatsDisplayRow struct {
+	Priority                int32   `cli:",align=right"`
+	ApproximateBacklogCount int64   `cli:",align=right"`
+	ApproximateBacklogAge   string  `cli:",align=right"`
+	BacklogIncreaseRate     float32 `cli:",align=right"`
+	TasksAddRate            float32 `cli:",align=right"`
+	TasksDispatchRate       float32 `cli:",align=right"`
 }
 
 type formattedWorkerDeploymentVersionInfoType struct {
@@ -222,65 +263,156 @@ func printWorkerDeploymentInfo(cctx *CommandContext, deploymentInfo client.Worke
 	return cctx.Printer.PrintStructured(fDeploymentInfo, printer.StructuredOptions{})
 }
 
-func formatDrainageInfo(drainageInfo *client.WorkerDeploymentVersionDrainageInfo) (formattedDrainageInfo, error) {
-	if drainageInfo == nil {
-		return formattedDrainageInfo{}, nil
+// Proto-based conversion functions for describe-version command
+// These functions convert gRPC proto types directly, avoiding SDK type dependencies.
+
+// drainageStatusProtoToStr maps a drainage status enum value to its display
+// string; a value outside the handled set is reported as an error.
+func drainageStatusProtoToStr(status enumspb.VersionDrainageStatus) (string, error) {
+	switch status {
+	case enumspb.VERSION_DRAINAGE_STATUS_UNSPECIFIED:
+		return "unspecified", nil
+	case enumspb.VERSION_DRAINAGE_STATUS_DRAINING:
+		return "draining", nil
+	case enumspb.VERSION_DRAINAGE_STATUS_DRAINED:
+		return "drained", nil
+	default:
+		return "", fmt.Errorf("unrecognized drainage status: %d", status)
 	}
+}
 
-	drainageStr, err := drainageStatusToStr(drainageInfo.DrainageStatus)
-	if err != nil {
-		return formattedDrainageInfo{}, err
+// taskQueueTypeProtoToStr maps a task queue type enum value to its display
+// string; a value outside the handled set is reported as an error.
+func taskQueueTypeProtoToStr(taskQueueType enumspb.TaskQueueType) (string, error) {
+	switch taskQueueType {
+	case enumspb.TASK_QUEUE_TYPE_UNSPECIFIED:
+		return "unspecified", nil
+	case enumspb.TASK_QUEUE_TYPE_WORKFLOW:
+		return "workflow", nil
+	case enumspb.TASK_QUEUE_TYPE_ACTIVITY:
+		return "activity", nil
+	case enumspb.TASK_QUEUE_TYPE_NEXUS:
+		return "nexus", nil
+	default:
+		return "", fmt.Errorf("unrecognized task queue type: %d", taskQueueType)
 	}
+}
 
-	return formattedDrainageInfo{
-		DrainageStatus:  drainageStr,
-		LastChangedTime: drainageInfo.LastChangedTime,
-		LastCheckedTime: drainageInfo.LastCheckedTime,
-	}, nil
+// formatVersionStatsProto converts proto task queue stats into a stats row.
+// A nil tqStats yields the zero-value row.
+func formatVersionStatsProto(tqStats *taskqueuepb.TaskQueueStats) formattedVersionStatsRowType {
+	if tqStats == nil {
+		return formattedVersionStatsRowType{}
+	}
+	return formattedVersionStatsRowType{
+		ApproximateBacklogCount: tqStats.ApproximateBacklogCount,
+		ApproximateBacklogAge:   tqStats.ApproximateBacklogAge.AsDuration(),
+		// BacklogIncreaseRate is computed as TasksAddRate - TasksDispatchRate (same as SDK)
+		BacklogIncreaseRate: tqStats.TasksAddRate - tqStats.TasksDispatchRate,
+		TasksAddRate:        tqStats.TasksAddRate,
+		TasksDispatchRate:   tqStats.TasksDispatchRate,
+	}
 }
 
-func formatTaskQueuesInfos(tqis []client.WorkerDeploymentTaskQueueInfo) ([]formattedTaskQueueInfoRowType, error) {
+// formatTaskQueuesInfosProto builds one row per version task queue. When
+// includeStats is set, each row also carries the queue's stats and, if the
+// response has any, its stats keyed by priority.
+func formatTaskQueuesInfosProto(tqis []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue, includeStats bool) ([]formattedTaskQueueInfoRowType, error) {
 	var tqiRows []formattedTaskQueueInfoRowType
 	for _, tqi := range tqis {
-		tqTypeStr, err := taskQueueTypeToStr(tqi.Type)
+		tqTypeStr, err := taskQueueTypeProtoToStr(tqi.GetType())
 		if err != nil {
 			return tqiRows, err
 		}
-		tqiRows = append(tqiRows, formattedTaskQueueInfoRowType{
-			Name: tqi.Name,
+
+		row := formattedTaskQueueInfoRowType{
+			Name: tqi.GetName(),
 			Type: tqTypeStr,
-		})
+		}
+
+		if includeStats {
+			fVersionStats := formatVersionStatsProto(tqi.GetStats())
+			row.Stats = &fVersionStats
+
+			if len(tqi.GetStatsByPriorityKey()) > 0 {
+				fVersionStatsByPriorityKey := map[int32]formattedVersionStatsRowType{}
+				for k, v := range tqi.GetStatsByPriorityKey() {
+					fVersionStatsByPriorityKey[k] = formatVersionStatsProto(v)
+				}
+				row.StatsByPriorityKey = fVersionStatsByPriorityKey
+			}
+		}
+
+		tqiRows = append(tqiRows, row)
 	}
 	return tqiRows, nil
 }
 
-func workerDeploymentVersionInfoToRows(deploymentInfo client.WorkerDeploymentVersionInfo) (formattedWorkerDeploymentVersionInfoType, error) {
-	tqi, err := formatTaskQueuesInfos(deploymentInfo.TaskQueuesInfos)
+// protoTimeOrZero converts a protobuf timestamp to a time.Time, mapping an
+// unset (nil) or invalid timestamp to the zero time.Time. Calling AsTime on a
+// nil timestamp yields the Unix epoch, which is not IsZero and would be shown
+// as 1970-01-01 instead of being treated as empty. The parameter is an
+// interface so that no extra import is required; it is meant to receive the
+// *timestamppb.Timestamp values returned by the proto getters, whose IsValid
+// method reports false for a nil receiver.
+func protoTimeOrZero(ts interface {
+	IsValid() bool
+	AsTime() time.Time
+}) time.Time {
+	if ts == nil || !ts.IsValid() {
+		return time.Time{}
+	}
+	return ts.AsTime()
+}
+
+// formatDrainageInfoProto converts proto drainage info for display. A nil
+// drainageInfo yields the zero value; unset timestamps become the zero time.
+func formatDrainageInfoProto(drainageInfo *deploymentpb.VersionDrainageInfo) (formattedDrainageInfo, error) {
+	if drainageInfo == nil {
+		return formattedDrainageInfo{}, nil
+	}
+
+	drainageStr, err := drainageStatusProtoToStr(drainageInfo.GetStatus())
+	if err != nil {
+		return formattedDrainageInfo{}, err
+	}
+
+	return formattedDrainageInfo{
+		DrainageStatus:  drainageStr,
+		LastChangedTime: protoTimeOrZero(drainageInfo.GetLastChangedTime()),
+		LastCheckedTime: protoTimeOrZero(drainageInfo.GetLastCheckedTime()),
+	}, nil
+}
+
+// workerDeploymentVersionInfoProtoToRows converts gRPC proto types to formatted types for display.
+func workerDeploymentVersionInfoProtoToRows(deploymentInfo *deploymentpb.WorkerDeploymentVersionInfo, taskQueueInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue, includeStats bool) (formattedWorkerDeploymentVersionInfoType, error) {
+	tqi, err := formatTaskQueuesInfosProto(taskQueueInfos, includeStats)
 	if err != nil {
 		return formattedWorkerDeploymentVersionInfoType{}, err
 	}
 
-	drainage, err := formatDrainageInfo(deploymentInfo.DrainageInfo)
+	drainage, err := formatDrainageInfoProto(deploymentInfo.GetDrainageInfo())
 	if err != nil {
 		return formattedWorkerDeploymentVersionInfoType{}, err
 	}
 
 	return formattedWorkerDeploymentVersionInfoType{
-		DeploymentName:     deploymentInfo.Version.DeploymentName,
-		BuildID:            deploymentInfo.Version.BuildID,
-		CreateTime:         deploymentInfo.CreateTime,
-		RoutingChangedTime: deploymentInfo.RoutingChangedTime,
-		CurrentSinceTime:   deploymentInfo.CurrentSinceTime,
-		RampingSinceTime:   deploymentInfo.RampingSinceTime,
-		RampPercentage:     deploymentInfo.RampPercentage,
+		DeploymentName:     deploymentInfo.GetDeploymentVersion().GetDeploymentName(),
+		BuildID:            deploymentInfo.GetDeploymentVersion().GetBuildId(),
+		CreateTime:         protoTimeOrZero(deploymentInfo.GetCreateTime()),
+		RoutingChangedTime: protoTimeOrZero(deploymentInfo.GetRoutingChangedTime()),
+		CurrentSinceTime:   protoTimeOrZero(deploymentInfo.GetCurrentSinceTime()),
+		RampingSinceTime:   protoTimeOrZero(deploymentInfo.GetRampingSinceTime()),
+		RampPercentage:     deploymentInfo.GetRampPercentage(),
 		DrainageInfo:       drainage,
 		TaskQueuesInfos:    tqi,
-		Metadata:           deploymentInfo.Metadata,
+		Metadata:           deploymentInfo.GetMetadata().GetEntries(),
 	}, nil
 }
 
-func printWorkerDeploymentVersionInfo(cctx *CommandContext, deploymentInfo client.WorkerDeploymentVersionInfo, msg string) error {
-	fDeploymentInfo, err := workerDeploymentVersionInfoToRows(deploymentInfo)
+// printWorkerDeploymentVersionInfoProto prints worker deployment version info from proto types.
+func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo *deploymentpb.WorkerDeploymentVersionInfo, taskQueueInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue, msg string, opts printVersionInfoOptions) error {
+	fDeploymentInfo, err := workerDeploymentVersionInfoProtoToRows(deploymentInfo, taskQueueInfos, opts.showStats)
 	if err != nil {
 		return err
 	}
@@ -290,13 +422,13 @@ func printWorkerDeploymentVersionInfo(cctx *CommandContext, deploymentInfo clien
 		var drainageStr string
 		var drainageLastChangedTime time.Time
 		var drainageLastCheckedTime time.Time
-		if deploymentInfo.DrainageInfo != nil {
-			drainageStr, err = drainageStatusToStr(deploymentInfo.DrainageInfo.DrainageStatus)
+		if deploymentInfo.GetDrainageInfo() != nil {
+			drainageStr, err = drainageStatusProtoToStr(deploymentInfo.GetDrainageInfo().GetStatus())
 			if err != nil {
 				return err
 			}
-			drainageLastChangedTime = deploymentInfo.DrainageInfo.LastChangedTime
-			drainageLastCheckedTime = deploymentInfo.DrainageInfo.LastCheckedTime
+			drainageLastChangedTime = protoTimeOrZero(deploymentInfo.GetDrainageInfo().GetLastChangedTime())
+			drainageLastCheckedTime = protoTimeOrZero(deploymentInfo.GetDrainageInfo().GetLastCheckedTime())
 		}
 
 		printMe := struct {
@@ -312,32 +444,26 @@ func printWorkerDeploymentVersionInfo(cctx *CommandContext, deploymentInfo clien
 			DrainageLastCheckedTime time.Time                  `cli:",cardOmitEmpty"`
 			Metadata                map[string]*common.Payload `cli:",cardOmitEmpty"`
 		}{
-			DeploymentName:          deploymentInfo.Version.DeploymentName,
-			BuildID:                 deploymentInfo.Version.BuildID,
-			CreateTime:              deploymentInfo.CreateTime,
-			RoutingChangedTime:      deploymentInfo.RoutingChangedTime,
-			CurrentSinceTime:        deploymentInfo.CurrentSinceTime,
-			RampingSinceTime:        deploymentInfo.RampingSinceTime,
-			RampPercentage:          deploymentInfo.RampPercentage,
+			DeploymentName:          deploymentInfo.GetDeploymentVersion().GetDeploymentName(),
+			BuildID:                 deploymentInfo.GetDeploymentVersion().GetBuildId(),
+			CreateTime:              protoTimeOrZero(deploymentInfo.GetCreateTime()),
+			RoutingChangedTime:      protoTimeOrZero(deploymentInfo.GetRoutingChangedTime()),
+			CurrentSinceTime:        protoTimeOrZero(deploymentInfo.GetCurrentSinceTime()),
+			RampingSinceTime:        protoTimeOrZero(deploymentInfo.GetRampingSinceTime()),
+			RampPercentage:          deploymentInfo.GetRampPercentage(),
 			DrainageStatus:          drainageStr,
 			DrainageLastChangedTime: drainageLastChangedTime,
 			DrainageLastCheckedTime: drainageLastCheckedTime,
-			Metadata:                deploymentInfo.Metadata,
+			Metadata:                deploymentInfo.GetMetadata().GetEntries(),
 		}
 		err := cctx.Printer.PrintStructured(printMe, printer.StructuredOptions{})
 		if err != nil {
 			return fmt.Errorf("displaying worker deployment version info failed: %w", err)
 		}
 
-		if len(deploymentInfo.TaskQueuesInfos) > 0 {
-			cctx.Printer.Println()
-			cctx.Printer.Println(color.MagentaString("Task Queues:"))
-			err := cctx.Printer.PrintStructured(
-				fDeploymentInfo.TaskQueuesInfos,
-				printer.StructuredOptions{Table: &printer.TableOptions{}},
-			)
-			if err != nil {
-				return fmt.Errorf("displaying task queues failed: %w", err)
+		if len(taskQueueInfos) > 0 {
+			if err := printTaskQueuesInfo(cctx, fDeploymentInfo.TaskQueuesInfos, opts); err != nil {
+				return err
 			}
 		}
 
@@ -348,6 +474,126 @@ func printWorkerDeploymentVersionInfo(cctx *CommandContext, deploymentInfo clien
 	return cctx.Printer.PrintStructured(fDeploymentInfo, printer.StructuredOptions{})
 }
 
+// printVersionInfoOptions controls optional parts of the version info output.
+type printVersionInfoOptions struct {
+	showStats bool
+}
+
+// formatDurationShort renders d truncated to whole milliseconds; a zero
+// duration is rendered as "0s".
+func formatDurationShort(d time.Duration) string {
+	if d == 0 {
+		return "0s"
+	}
+	return d.Truncate(time.Millisecond).String()
+}
+
+// printTaskQueuesInfo prints the "Task Queues:" table. With opts.showStats the
+// table includes the flattened stats columns and is followed by one
+// per-priority table for each queue where hasNonDefaultPriorityKeys is true.
+func printTaskQueuesInfo(cctx *CommandContext, taskQueues []formattedTaskQueueInfoRowType, opts printVersionInfoOptions) error {
+	cctx.Printer.Println()
+	cctx.Printer.Println(color.MagentaString("Task Queues:"))
+
+	if opts.showStats {
+		// Show flattened stats in the table
+		rows := make([]taskQueueDisplayRowWithStats, 0, len(taskQueues))
+		for _, tq := range taskQueues {
+			row := taskQueueDisplayRowWithStats{
+				Name: tq.Name,
+				Type: tq.Type,
+			}
+			if tq.Stats != nil {
+				row.ApproximateBacklogCount = tq.Stats.ApproximateBacklogCount
+				row.ApproximateBacklogAge = formatDurationShort(tq.Stats.ApproximateBacklogAge)
+				row.BacklogIncreaseRate = tq.Stats.BacklogIncreaseRate
+				row.TasksAddRate = tq.Stats.TasksAddRate
+				row.TasksDispatchRate = tq.Stats.TasksDispatchRate
+			}
+			rows = append(rows, row)
+		}
+		if err := cctx.Printer.PrintStructured(rows, printer.StructuredOptions{Table: &printer.TableOptions{}}); err != nil {
+			return fmt.Errorf("displaying task queues failed: %w", err)
+		}
+
+		// Show per-priority stats automatically if any task queue has non-default priority data.
+		// Skip if the only priority key is 3 (the default), as that would be redundant.
+		for _, tq := range taskQueues {
+			if !hasNonDefaultPriorityKeys(tq.StatsByPriorityKey) {
+				continue
+			}
+			cctx.Printer.Println()
+			cctx.Printer.Println(color.MagentaString(fmt.Sprintf("Stats by Priority (%s / %s):", tq.Name, tq.Type)))
+
+			// Sort priority keys for consistent output
+			priorities := make([]int32, 0, len(tq.StatsByPriorityKey))
+			for p := range tq.StatsByPriorityKey {
+				priorities = append(priorities, p)
+			}
+			sortInt32s(priorities)
+
+			priorityRows := make([]priorityStatsDisplayRow, 0, len(priorities))
+			for _, p := range priorities {
+				stats := tq.StatsByPriorityKey[p]
+				priorityRows = append(priorityRows, priorityStatsDisplayRow{
+					Priority:                p,
+					ApproximateBacklogCount: stats.ApproximateBacklogCount,
+					ApproximateBacklogAge:   formatDurationShort(stats.ApproximateBacklogAge),
+					BacklogIncreaseRate:     stats.BacklogIncreaseRate,
+					TasksAddRate:            stats.TasksAddRate,
+					TasksDispatchRate:       stats.TasksDispatchRate,
+				})
+			}
+			if err := cctx.Printer.PrintStructured(priorityRows, printer.StructuredOptions{Table: &printer.TableOptions{}}); err != nil {
+				return fmt.Errorf("displaying priority stats failed: %w", err)
+			}
+		}
+	} else {
+		// Show basic table without stats
+		rows := make([]taskQueueDisplayRowBasic, 0, len(taskQueues))
+		for _, tq := range taskQueues {
+			rows = append(rows, taskQueueDisplayRowBasic{
+				Name: tq.Name,
+				Type: tq.Type,
+			})
+		}
+		if err := cctx.Printer.PrintStructured(rows, printer.StructuredOptions{Table: &printer.TableOptions{}}); err != nil {
+			return fmt.Errorf("displaying task queues failed: %w", err)
+		}
+	}
+
+	return nil
+}
+
+// sortInt32s sorts s in ascending order, in place.
+func sortInt32s(s []int32) {
+	for i := 0; i < len(s)-1; i++ {
+		for j := i + 1; j < len(s); j++ {
+			if s[j] < s[i] {
+				s[i], s[j] = s[j], s[i]
+			}
+		}
+	}
+}
+
+// defaultPriorityKey is the default priority key value. When this is the only
+// priority key present, we skip showing per-priority stats as it would be redundant.
+const defaultPriorityKey = 3
+
+// hasNonDefaultPriorityKeys returns true if the map contains any priority keys
+// other than the default (3), or contains multiple priority keys.
+func hasNonDefaultPriorityKeys(statsByPriorityKey map[int32]formattedVersionStatsRowType) bool {
+	if len(statsByPriorityKey) == 0 {
+		return false
+	}
+	if len(statsByPriorityKey) > 1 {
+		return true
+	}
+	// Exactly one key - check if it's the default
+	_, hasDefault := statsByPriorityKey[defaultPriorityKey]
+	return !hasDefault
+}
+
 type getDeploymentConflictTokenOptions struct {
 	safeMode        bool
 	safeModeMessage string
@@ -592,16 +838,23 @@ func (c *TemporalWorkerDeploymentDescribeVersionCommand) run(cctx *CommandContex
 	}
 	defer cl.Close()
 
-	dHandle := cl.WorkerDeploymentClient().GetHandle(c.DeploymentName)
-
-	resp, err := dHandle.DescribeVersion(cctx, client.WorkerDeploymentDescribeVersionOptions{
-		BuildID: c.BuildId,
+	// Use raw gRPC instead of SDK's DeploymentClient to avoid circular dependency
+	// with SDK release that exposes TaskQueuesInfos in DescribeVersionResponse
+	resp, err := cl.WorkflowService().DescribeWorkerDeploymentVersion(cctx, &workflowservice.DescribeWorkerDeploymentVersionRequest{
+		Namespace: c.Parent.Parent.Namespace,
+		DeploymentVersion: &deploymentpb.WorkerDeploymentVersion{
+			DeploymentName: c.DeploymentName,
+			BuildId:        c.BuildId,
+		},
+		ReportTaskQueueStats: c.ReportTaskQueueStats,
 	})
 	if err != nil {
 		return fmt.Errorf("error describing worker deployment version: %w", err)
 	}
 
-	err = printWorkerDeploymentVersionInfo(cctx, resp.Info, "Worker Deployment Version:")
+	err = printWorkerDeploymentVersionInfoProto(cctx, resp.GetWorkerDeploymentVersionInfo(), resp.GetVersionTaskQueues(), "Worker Deployment Version:", printVersionInfoOptions{
+		showStats: c.ReportTaskQueueStats,
+	})
 	if err != nil {
 		return err
 	}
diff --git a/internal/temporalcli/commands.worker.deployment_test.go b/internal/temporalcli/commands.worker.deployment_test.go
index 86df4368a..1e7301e60
100644
--- a/internal/temporalcli/commands.worker.deployment_test.go
+++ b/internal/temporalcli/commands.worker.deployment_test.go
@@ -11,6 +11,11 @@ import (
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"go.temporal.io/api/common/v1"
+	deploymentpb "go.temporal.io/api/deployment/v1"
+	enumspb "go.temporal.io/api/enums/v1"
+	"go.temporal.io/api/workflowservice/v1"
+	"go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/temporal"
 	"go.temporal.io/sdk/worker"
 	"go.temporal.io/sdk/workflow"
 )
@@ -48,9 +53,19 @@ type jsonDrainageInfo struct {
 	LastCheckedTime time.Time `json:"lastCheckedTime"`
 }
 
+type jsonVersionStatsType struct {
+	ApproximateBacklogCount int64   `json:"approximateBacklogCount"`
+	ApproximateBacklogAge   int64   `json:"approximateBacklogAge"` // Duration is serialized as nanoseconds
+	BacklogIncreaseRate     float32 `json:"backlogIncreaseRate"`
+	TasksAddRate            float32 `json:"tasksAddRate"`
+	TasksDispatchRate       float32 `json:"tasksDispatchRate"`
+}
+
 type jsonTaskQueueInfoRowType struct {
-	Name string `json:"name"`
-	Type string `json:"type"`
+	Name               string                          `json:"name"`
+	Type               string                          `json:"type"`
+	Stats              *jsonVersionStatsType           `json:"stats,omitempty"`
+	StatsByPriorityKey map[string]jsonVersionStatsType `json:"statsByPriorityKey,omitempty"`
 }
 
 type jsonDeploymentVersionInfoType struct {
@@ -769,3 +784,265 @@ func (s *SharedServerSuite) TestDeployment_Set_Manager_Identity() {
 	s.Equal(deploymentName, jsonOut.Name)
 	s.Equal(testIdentity, jsonOut.ManagerIdentity)
 }
+
+func (s *SharedServerSuite) TestDeployment_Describe_Version_TaskQueueStats_WithPriority() {
+	s.testDeploymentDescribeVersionTaskQueueStats(true)
+}
+
+func (s *SharedServerSuite) TestDeployment_Describe_Version_TaskQueueStats_WithoutPriority() {
+	s.testDeploymentDescribeVersionTaskQueueStats(false)
+}
+
+func (s *SharedServerSuite) testDeploymentDescribeVersionTaskQueueStats(withPriority bool) {
+	deploymentName := uuid.NewString()
+	buildId := uuid.NewString()
+	taskQueue := uuid.NewString()
+	version := worker.WorkerDeploymentVersion{
+		DeploymentName: deploymentName,
+		BuildID:        buildId,
+	}
+
+	// Create worker directly with explicit versioning behavior
+	w1 := worker.New(s.Client, taskQueue, worker.Options{
+		DeploymentOptions: worker.DeploymentOptions{
+			UseVersioning: true,
+			Version:       version,
+		},
+	})
+
+	// Register a workflow with explicit Pinned versioning behavior
+	w1.RegisterWorkflowWithOptions(
+		func(ctx workflow.Context, input any) (any, error) {
+			workflow.GetSignalChannel(ctx, "complete-signal").Receive(ctx, nil)
+			return nil, nil
+		},
+		workflow.RegisterOptions{
+			Name:               "TestBacklogWorkflow",
+			VersioningBehavior: workflow.VersioningBehaviorPinned,
+		},
+	)
+
+	s.NoError(w1.Start())
+
+	// Wait for the deployment version to appear
+	s.EventuallyWithT(func(t *assert.CollectT) {
+		res := s.Execute(
+			"worker", "deployment", "describe-version",
+			"--address", s.Address(),
+			"--deployment-name", version.DeploymentName, "--build-id", version.BuildID,
+		)
+		assert.NoError(t, res.Err)
+	}, 30*time.Second, 100*time.Millisecond)
+
+	// Set as current version so workflows are routed to this version
+	res := s.Execute(
+		"worker", "deployment", "set-current-version",
+		"--address", s.Address(),
+		"--deployment-name", version.DeploymentName, "--build-id", version.BuildID,
+		"--yes",
+	)
+	s.NoError(res.Err)
+
+	// Stop the worker so workflow tasks will backlog
+	w1.Stop()
+
+	// Start workflows - they will queue up as workflow tasks since there is no worker
+	// Priority keys: 1 (high), 3 (medium/default), 5 (low)
+	priorityKeys := []int{1, 3, 5}
+	numWorkflows := len(priorityKeys)
+	workflowRuns := make([]client.WorkflowRun, numWorkflows)
+	for i := 0; i < numWorkflows; i++ {
+		opts := client.StartWorkflowOptions{
+			TaskQueue: taskQueue,
+		}
+		if withPriority {
+			opts.Priority = temporal.Priority{PriorityKey: priorityKeys[i]}
+		}
+		run, err := s.Client.ExecuteWorkflow(
+			s.Context,
+			opts,
+			"TestBacklogWorkflow",
+			"test-input",
+		)
+		s.NoError(err)
+		workflowRuns[i] = run
+		s.T().Logf("Started workflow %d: %s", i, run.GetID())
+	}
+
+	// Test 1: Verify gRPC returns non-zero backlog stats (validates server behavior)
+	// Use raw gRPC instead of SDK's DeploymentClient to avoid circular dependency
+	s.EventuallyWithT(func(t *assert.CollectT) {
+		desc, err := s.Client.WorkflowService().DescribeWorkerDeploymentVersion(s.Context, &workflowservice.DescribeWorkerDeploymentVersionRequest{
+			Namespace: s.Namespace(),
+			DeploymentVersion: &deploymentpb.WorkerDeploymentVersion{
+				DeploymentName: version.DeploymentName,
+				BuildId:        version.BuildID,
+			},
+			ReportTaskQueueStats: true,
+		})
+		assert.NoError(t, err)
+
+		// Find the workflow task queue and check backlog stats (matching SDK test pattern)
+		for _, tqInfo := range desc.GetVersionTaskQueues() {
+			if tqInfo.GetName() == taskQueue && tqInfo.GetType() == enumspb.TASK_QUEUE_TYPE_WORKFLOW && tqInfo.GetStats() != nil {
+				stats := tqInfo.GetStats()
+				backlogIncreaseRate := stats.TasksAddRate - stats.TasksDispatchRate
+				// Check all stats fields like the SDK test does
+				if stats.ApproximateBacklogCount > 0 &&
+					stats.ApproximateBacklogAge.AsDuration().Nanoseconds() > 0 &&
+					stats.TasksAddRate > 0 &&
+					stats.TasksDispatchRate == 0 && // zero task dispatch due to no pollers
+					backlogIncreaseRate > 0 {
+					return // Success
+				}
+				t.Errorf("Unexpected backlog stats for tq %s: backlogCount=%d, backlogAge=%v, addRate=%f, dispatchRate=%f, increaseRate=%f",
+					taskQueue, stats.ApproximateBacklogCount, stats.ApproximateBacklogAge.AsDuration(),
+					stats.TasksAddRate, stats.TasksDispatchRate, backlogIncreaseRate)
+				return
+			}
+		}
+		t.Errorf("No workflow task queue with stats found for task queue %s in %d task queues", taskQueue, len(desc.GetVersionTaskQueues()))
+	}, 10*time.Second, 500*time.Millisecond)
+
+	// Verify text output has all stats columns with non-zero values
+	res = s.Execute(
+		"worker", "deployment", "describe-version",
+		"--address", s.Address(),
+		"--deployment-name", version.DeploymentName, "--build-id", version.BuildID,
+		"--report-task-queue-stats",
+	)
+	s.NoError(res.Err)
+	outputWithStats := res.Stdout.String()
+	// Verify column headers are present
+	s.Contains(outputWithStats, "ApproximateBacklogCount")
+	s.Contains(outputWithStats, "ApproximateBacklogAge")
+	s.Contains(outputWithStats, "BacklogIncreaseRate")
+	s.Contains(outputWithStats, "TasksAddRate")
+	s.Contains(outputWithStats, "TasksDispatchRate")
+	// Verify the workflow task queue row has the expected backlog count (we started 3 workflows)
+	// Format: Name Type ApproximateBacklogCount ApproximateBacklogAge BacklogIncreaseRate TasksAddRate TasksDispatchRate
+	s.ContainsOnSameLine(outputWithStats, taskQueue, "workflow", "3")
+	if withPriority {
+		s.Contains(outputWithStats, "Stats by Priority")
+		// Verify a row exists for each priority key (the per-row backlog count is not asserted yet; see below)
+		// The format is: Priority ApproximateBacklogCount ApproximateBacklogAge ...
+		// We started one workflow with each priority key (1, 3, 5)
+		for _, priorityKey := range priorityKeys {
+			// Pieces expected on the priority row: the priority key and the rate fragments below
+			nonZeroBacklogIncreaseRate := "0."
+			nonZeroTasksAddRate := "0."
+			zeroTaskDispatchRate := "0"
+			// Once priority is enabled in all servers by default, check that backlog count of 1 is on each priority row.
+			// For servers where priority is not enabled, or workflows that aren't actively using priority keys,
+			// all backlog will be sent to the default priority key (3).
+			s.ContainsOnSameLine(outputWithStats,
+				fmt.Sprintf("%d", priorityKey),
+				nonZeroBacklogIncreaseRate,
+				nonZeroTasksAddRate,
+				zeroTaskDispatchRate,
+			)
+		}
+	} else {
+		// Verify that "Stats by Priority" is NOT shown when only the default priority key (3) is used
+		s.NotContains(outputWithStats, "Stats by Priority")
+	}
+
+	// Test 2: describe-version WITHOUT --report-task-queue-stats should NOT show stats columns
+	res = s.Execute(
+		"worker", "deployment", "describe-version",
+		"--address", s.Address(),
+		"--deployment-name", version.DeploymentName, "--build-id", version.BuildID,
+	)
+	s.NoError(res.Err)
+	// Stats columns should not appear in text output
+	outputNoStats := res.Stdout.String()
+	s.NotContains(outputNoStats, "ApproximateBacklogCount")
+	s.NotContains(outputNoStats, "ApproximateBacklogAge")
+	s.NotContains(outputNoStats, "BacklogIncreaseRate")
+	s.NotContains(outputNoStats, "TasksAddRate")
+	s.NotContains(outputNoStats, "TasksDispatchRate")
+
+	// Test 3: JSON output without stats flag - stats should be nil/missing
+	res = s.Execute(
+		"worker", "deployment", "describe-version",
+		"--address", s.Address(),
+		"--deployment-name", version.DeploymentName, "--build-id", version.BuildID,
+		"--output", "json",
+	)
+	s.NoError(res.Err)
+	var jsonOutNoStats jsonDeploymentVersionInfoType
+	s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOutNoStats))
+	// Should have task queue info but no stats
+	s.Greater(len(jsonOutNoStats.TaskQueuesInfos), 0)
+	for _, tq := range jsonOutNoStats.TaskQueuesInfos {
+		s.Nil(tq.Stats, "Stats should be nil when --report-task-queue-stats is not provided")
+	}
+
+	// Test 4: JSON output with stats flag - verify stats structure is present with non-zero backlog
+	res = s.Execute(
+		"worker", "deployment", "describe-version",
+		"--address", s.Address(),
+		"--deployment-name", version.DeploymentName, "--build-id", version.BuildID,
+		"--report-task-queue-stats",
+		"--output", "json",
+	)
+	s.NoError(res.Err)
+
+	var jsonOutWithStats jsonDeploymentVersionInfoType
+	s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOutWithStats))
+	s.Greater(len(jsonOutWithStats.TaskQueuesInfos), 0, "Should have task queue info")
+
+	// Find the workflow task queue and verify it has non-zero backlog stats
+	foundWorkflowTQWithStats := false
+	for _, tq := range jsonOutWithStats.TaskQueuesInfos {
+		if tq.Name == taskQueue && tq.Type == "workflow" {
+			s.NotNil(tq.Stats, "Stats should be present when --report-task-queue-stats is provided")
+			if tq.Stats != nil {
+				foundWorkflowTQWithStats = true
+				// Verify the stats fields are present and the backlog is non-zero
+				s.Greater(tq.Stats.ApproximateBacklogCount, int64(0),
+					"ApproximateBacklogCount should be non-zero with pending workflows")
+				s.Greater(tq.Stats.ApproximateBacklogAge, int64(0),
+					"ApproximateBacklogAge should be non-zero with pending workflows")
+				s.Greater(tq.Stats.TasksAddRate, float32(0),
+					"TasksAddRate should be non-zero after adding tasks")
+				// BacklogIncreaseRate may be positive (if backlog is growing)
+				// TasksDispatchRate should be 0 since no worker is running
+				s.Equal(float32(0), tq.Stats.TasksDispatchRate,
+					"TasksDispatchRate should be zero with no worker running")
+			}
+		}
+	}
+	s.True(foundWorkflowTQWithStats, "Should find workflow task queue with stats")
+
+	// Cleanup: Restart worker and signal workflows to complete
+	w2 := worker.New(s.Client, taskQueue, worker.Options{
+		DeploymentOptions: worker.DeploymentOptions{
+			UseVersioning: true,
+			Version:       version,
+		},
+	})
+	w2.RegisterWorkflowWithOptions(
+		func(ctx workflow.Context, input any) (any, error) {
+			workflow.GetSignalChannel(ctx, "complete-signal").Receive(ctx, nil)
+			return nil, nil
+		},
+		workflow.RegisterOptions{
+			Name:               "TestBacklogWorkflow",
+			VersioningBehavior: workflow.VersioningBehaviorPinned,
+		},
+	)
+	s.NoError(w2.Start())
+	defer w2.Stop()
+
+	// Signal all workflows to complete
+	for _, run := range workflowRuns {
+		err := s.Client.SignalWorkflow(s.Context, run.GetID(), run.GetRunID(), "complete-signal", nil)
+		s.NoError(err)
+	}
+
+	// Wait for workflows to complete
+	for _, run := range workflowRuns {
+		s.NoError(run.Get(s.Context, nil))
+	}
+}
diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml
index 049e82cbc..c2e0ef87e 100644
--- a/internal/temporalcli/commands.yaml
+++ b/internal/temporalcli/commands.yaml
@@ -954,6 +954,10 @@ commands:
       ```
     option-sets:
       - deployment-version
+    options:
+      - name: report-task-queue-stats
+        type: bool
+        description: Report stats for task queues that are present in this version.
 
   - name: temporal worker deployment delete-version
     summary: Delete a Worker Deployment Version