diff --git a/api/client/kubeclient/async.go b/api/client/kubeclient/async.go index 61ed3f56d0..2d22345fbf 100644 --- a/api/client/kubeclient/async.go +++ b/api/client/kubeclient/async.go @@ -49,7 +49,7 @@ func (aws *asyncWSRoundTripper) WebsocketCallback(ws *websocket.Conn, resp *http if resp != nil && resp.StatusCode != http.StatusOK { return enrichError(err, resp) } - return fmt.Errorf("Can't connect to websocket: %s\n", err.Error()) + return fmt.Errorf("can't connect to websocket: %w", err) } aws.Connection <- ws @@ -105,7 +105,9 @@ func asyncSubresourceHelper( } if response != nil { - defer response.Body.Close() + defer func() { + _ = response.Body.Close() + }() switch response.StatusCode { case http.StatusOK: case http.StatusNotFound: @@ -165,7 +167,7 @@ func enrichError(httpErr error, resp *http.Response) error { if resp == nil { return httpErr } - httpErr = fmt.Errorf("Can't connect to websocket (%d): %s\n", resp.StatusCode, httpErr.Error()) + httpErr = fmt.Errorf("can't connect to websocket (%d): %w", resp.StatusCode, httpErr) status := &metav1.Status{} if resp.Header.Get("Content-Type") != "application/json" { @@ -201,7 +203,9 @@ type WebsocketRoundTripper struct { func (d *WebsocketRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { conn, resp, err := d.Dialer.Dial(r.URL.String(), r.Header) if err == nil { - defer conn.Close() + defer func() { + _ = conn.Close() + }() } return resp, d.Do(conn, resp, err) } diff --git a/api/client/kubeclient/streamer.go b/api/client/kubeclient/streamer.go index 216bfd2f0e..120929a398 100644 --- a/api/client/kubeclient/streamer.go +++ b/api/client/kubeclient/streamer.go @@ -69,10 +69,10 @@ type wsConn struct { } func (c *wsConn) SetDeadline(t time.Time) error { - if err := c.Conn.SetWriteDeadline(t); err != nil { + if err := c.SetWriteDeadline(t); err != nil { return err } - return c.Conn.SetReadDeadline(t) + return c.SetReadDeadline(t) } func NewWebsocketStreamer(conn *websocket.Conn, done chan struct{}) *wsStreamer { diff --git a/api/client/kubeclient/websocket.go b/api/client/kubeclient/websocket.go index 94c2018732..2cb4e4ccfb 100644 --- a/api/client/kubeclient/websocket.go +++ b/api/client/kubeclient/websocket.go @@ -76,7 +76,9 @@ func (s *binaryWriter) Write(p []byte) (int, error) { if err != nil { return 0, convert(err) } - defer w.Close() + defer func() { + _ = w.Close() + }() n, err := w.Write(p) return n, err } diff --git a/api/core/v1alpha2/virtual_machine_operation.go b/api/core/v1alpha2/virtual_machine_operation.go index 1b11cdc0a4..c367e3e5f3 100644 --- a/api/core/v1alpha2/virtual_machine_operation.go +++ b/api/core/v1alpha2/virtual_machine_operation.go @@ -31,6 +31,7 @@ const ( // +kubebuilder:subresource:status // +kubebuilder:resource:categories={virtualization},scope=Namespaced,shortName={vmop},singular=virtualmachineoperation // +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase",description="VirtualMachineOperation phase." +// +kubebuilder:printcolumn:name="Progress",type="integer",JSONPath=".status.progress",description="VirtualMachineOperation progress in percent." // +kubebuilder:printcolumn:name="Type",type="string",JSONPath=".spec.type",description="VirtualMachineOperation type." // +kubebuilder:printcolumn:name="VirtualMachine",type="string",JSONPath=".spec.virtualMachineName",description="VirtualMachine name." // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Time of resource creation." @@ -109,6 +110,10 @@ type VirtualMachineOperationCloneCustomization struct { type VirtualMachineOperationStatus struct { Phase VMOPPhase `json:"phase"` + // Progress reports operation completion percentage for migration-related VMOPs (Evict/Migrate). + // +kubebuilder:validation:Minimum=0 + // +kubebuilder:validation:Maximum=100 + Progress *int32 `json:"progress,omitempty"` // The latest detailed observations of the VirtualMachineOperation resource. Conditions []metav1.Condition `json:"conditions,omitempty"` // Resource generation last processed by the controller. diff --git a/api/core/v1alpha2/vmopcondition/condition.go b/api/core/v1alpha2/vmopcondition/condition.go index 58d917ab5a..9326c306ec 100644 --- a/api/core/v1alpha2/vmopcondition/condition.go +++ b/api/core/v1alpha2/vmopcondition/condition.go @@ -86,6 +86,42 @@ const ( // ReasonMigrationRunning is a ReasonCompleted indicating that the migration process is currently in progress. ReasonMigrationRunning ReasonCompleted = "MigrationRunning" + // ReasonDisksPreparing indicates that migration-related disk preparation is in progress. + ReasonDisksPreparing ReasonCompleted = "DisksPreparing" + + // ReasonTargetScheduling indicates that the target pod is being scheduled. + ReasonTargetScheduling ReasonCompleted = "TargetScheduling" + + // ReasonTargetUnschedulable indicates that the target pod cannot be scheduled. + ReasonTargetUnschedulable ReasonCompleted = "TargetUnschedulable" + + // ReasonTargetDiskError indicates that target disk attachment failed. + ReasonTargetDiskError ReasonCompleted = "TargetDiskError" + + // ReasonTargetPreparing indicates that target pod is being prepared. + ReasonTargetPreparing ReasonCompleted = "TargetPreparing" + + // ReasonSyncing indicates that source and target are synchronizing migration data. + ReasonSyncing ReasonCompleted = "Syncing" + + // ReasonNotConverging indicates that migration cannot converge even with maximum throttling. + ReasonNotConverging ReasonCompleted = "NotConverging" + + // ReasonSourceSuspended indicates that source VM has been suspended. + ReasonSourceSuspended ReasonCompleted = "SourceSuspended" + + // ReasonTargetResumed indicates that target VM has resumed. + ReasonTargetResumed ReasonCompleted = "TargetResumed" + + // ReasonMigrationCompleted indicates that migration has completed successfully. + ReasonMigrationCompleted ReasonCompleted = "Completed" + + // ReasonAborted indicates that migration has been aborted. + ReasonAborted ReasonCompleted = "Aborted" + + // ReasonFailed indicates that migration failed for an unspecified reason. + ReasonFailed ReasonCompleted = "Failed" + // ReasonOtherMigrationInProgress is a ReasonCompleted indicating that there are other migrations in progress. ReasonOtherMigrationInProgress ReasonCompleted = "OtherMigrationInProgress" diff --git a/api/core/v1alpha2/zz_generated.deepcopy.go b/api/core/v1alpha2/zz_generated.deepcopy.go index e1939fd64a..0f918a95bd 100644 --- a/api/core/v1alpha2/zz_generated.deepcopy.go +++ b/api/core/v1alpha2/zz_generated.deepcopy.go @@ -2996,6 +2996,11 @@ func (in *VirtualMachineOperationSpec) DeepCopy() *VirtualMachineOperationSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VirtualMachineOperationStatus) DeepCopyInto(out *VirtualMachineOperationStatus) { *out = *in + if in.Progress != nil { + in, out := &in.Progress, &out.Progress + *out = new(int32) + **out = **in + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]v1.Condition, len(*in)) diff --git a/build/components/versions.yml b/build/components/versions.yml index f72a94e45d..f47ec65762 100644 --- a/build/components/versions.yml +++ b/build/components/versions.yml @@ -3,7 +3,7 @@ firmware: libvirt: v10.9.0 edk2: stable202411 core: - 3p-kubevirt: v1.6.2-v12n.21 + 3p-kubevirt: feat/vm/migration-progress 3p-containerized-data-importer: v1.60.3-v12n.17 distribution: 2.8.3 package: diff --git a/crds/doc-ru-virtualmachineoperations.yaml b/crds/doc-ru-virtualmachineoperations.yaml index 0c9713d792..d06acf316c 100644 --- a/crds/doc-ru-virtualmachineoperations.yaml +++ b/crds/doc-ru-virtualmachineoperations.yaml @@ -128,6 +128,9 @@ spec: * `Completed` — операция прошла успешно; * `Failed` — операция завершилась неудачно. За подробностями обратитесь к полю `conditions` и событиям; * `Terminating` — операция удаляется. + progress: + description: | + Прогресс выполнения операции в процентах для миграционных VMOP (`Evict`/`Migrate`). observedGeneration: description: | Поколение ресурса, которое в последний раз обрабатывалось контроллером. diff --git a/crds/embedded/virtualmachineinstances.yaml b/crds/embedded/virtualmachineinstances.yaml index c94463fec5..f68101a3b8 100644 --- a/crds/embedded/virtualmachineinstances.yaml +++ b/crds/embedded/virtualmachineinstances.yaml @@ -4100,6 +4100,36 @@ spec: description: Lets us know if the vmi is currently running pre or post copy migration type: string + transferStatus: + description: TransferStatus contains migration transfer details + reported by the source runtime. + properties: + autoConvergeThrottle: + description: AutoConvergeThrottle is the current auto-converge throttle + reported by the source runtime. + format: int32 + type: integer + dataProcessedBytes: + description: DataProcessedBytes is the amount of migration data already + processed by the source runtime. + format: int64 + type: integer + dataRemainingBytes: + description: DataRemainingBytes is the amount of migration data still + remaining on the source runtime. + format: int64 + type: integer + dataTotalBytes: + description: DataTotalBytes is the total amount of migration data reported + by the source runtime. + format: int64 + type: integer + iteration: + description: Iteration is the current migration iteration reported by + the source runtime. + format: int32 + type: integer + type: object sourceNode: description: The source node that the VMI originated on type: string diff --git a/crds/virtualmachineoperations.yaml b/crds/virtualmachineoperations.yaml index 86b75a01c4..7c79b47bb2 100644 --- a/crds/virtualmachineoperations.yaml +++ b/crds/virtualmachineoperations.yaml @@ -26,6 +26,10 @@ spec: jsonPath: .status.phase name: Phase type: string + - description: VirtualMachineOperation progress in percent. + jsonPath: .status.progress + name: Progress + type: integer - description: VirtualMachineOperation type. jsonPath: .spec.type name: Type @@ -319,6 +323,14 @@ spec: - Failed - Terminating type: string + progress: + description: + Progress reports operation completion percentage for + migration-related VMOPs (Evict/Migrate). + format: int32 + maximum: 100 + minimum: 0 + type: integer resources: description: Resources contains the list of resources that are affected diff --git a/images/virtualization-artifact/go.mod b/images/virtualization-artifact/go.mod index bade8ad4ec..0b1b9a3238 100644 --- a/images/virtualization-artifact/go.mod +++ b/images/virtualization-artifact/go.mod @@ -168,4 +168,4 @@ replace ( ) // Kubevirt API replaces -replace kubevirt.io/api => github.com/deckhouse/3p-kubevirt/staging/src/kubevirt.io/api v1.6.2-v12n.21 +replace kubevirt.io/api => github.com/deckhouse/3p-kubevirt/staging/src/kubevirt.io/api v0.0.0-20260403154920-301347b413ce diff --git a/images/virtualization-artifact/go.sum b/images/virtualization-artifact/go.sum index 0ec653e4a0..e85f01e7e7 100644 --- a/images/virtualization-artifact/go.sum +++ b/images/virtualization-artifact/go.sum @@ -49,8 +49,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/deckhouse/3p-kubevirt/staging/src/kubevirt.io/api v1.6.2-v12n.21 h1:W0nSf7fOTTLn5lVqR4JR3KctrABzyqb/sCkmSPx2fEY= -github.com/deckhouse/3p-kubevirt/staging/src/kubevirt.io/api v1.6.2-v12n.21/go.mod h1:wGZLfRa/b4w/V/hakmfcK0CmrAZGfpj+jN7BMt0s19E= +github.com/deckhouse/3p-kubevirt/staging/src/kubevirt.io/api v0.0.0-20260403095053-aefa74c02fee h1:FL3Sn9OL9HZZX01vWiO6t6ps8nkxH+AOilBp+Rdp6iU= +github.com/deckhouse/3p-kubevirt/staging/src/kubevirt.io/api v0.0.0-20260403095053-aefa74c02fee/go.mod h1:wGZLfRa/b4w/V/hakmfcK0CmrAZGfpj+jN7BMt0s19E= +github.com/deckhouse/3p-kubevirt/staging/src/kubevirt.io/api v0.0.0-20260403154920-301347b413ce h1:b6I/SUcA30j2wcOBBERbN20cKARaDAHNtSzP4XB6kgg= +github.com/deckhouse/3p-kubevirt/staging/src/kubevirt.io/api v0.0.0-20260403154920-301347b413ce/go.mod h1:wGZLfRa/b4w/V/hakmfcK0CmrAZGfpj+jN7BMt0s19E= github.com/deckhouse/deckhouse/pkg/log v0.0.0-20250226105106-176cd3afcdd5 h1:PsN1E0oxC/+4zdA977txrqUCuObFL3HAuu5Xnud8m8c= github.com/deckhouse/deckhouse/pkg/log v0.0.0-20250226105106-176cd3afcdd5/go.mod h1:Mk5HRzkc5pIcDIZ2JJ6DPuuqnwhXVkb3you8M8Mg+4w= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/migrating.go b/images/virtualization-artifact/pkg/controller/vm/internal/migrating.go index b7a4ba143a..99d3f0609d 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/migrating.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/migrating.go @@ -162,16 +162,16 @@ func (h *MigratingHandler) syncMigrating(ctx context.Context, s state.VirtualMac completed, _ := conditions.GetCondition(vmopcondition.TypeCompleted, vmop.Status.Conditions) switch completed.Reason { - case vmopcondition.ReasonMigrationPending.String(): + case vmopcondition.ReasonMigrationPending.String(), vmopcondition.ReasonTargetScheduling.String(): cb.Message("Migration is awaiting start.") case vmopcondition.ReasonQuotaExceeded.String(): cb.Message(fmt.Sprintf("Migration is pending: %s.", completed.Message)) - case vmopcondition.ReasonMigrationPrepareTarget.String(): + case vmopcondition.ReasonMigrationPrepareTarget.String(), vmopcondition.ReasonTargetPreparing.String(), vmopcondition.ReasonDisksPreparing.String(): cb.Message("Migration is awaiting target preparation.") - case vmopcondition.ReasonMigrationTargetReady.String(): + case vmopcondition.ReasonMigrationTargetReady.String(), vmopcondition.ReasonSyncing.String(), vmopcondition.ReasonSourceSuspended.String(), vmopcondition.ReasonTargetResumed.String(): cb.Message("Migration is awaiting execution.") case vmopcondition.ReasonWaitingForVirtualMachineToBeReadyToMigrate.String(): @@ -183,7 +183,7 @@ func (h *MigratingHandler) syncMigrating(ctx context.Context, s state.VirtualMac case vmopcondition.ReasonMigrationRunning.String(): cb.Status(metav1.ConditionTrue).Reason(vmcondition.ReasonMigratingInProgress) - case vmopcondition.ReasonOperationCompleted.String(): + case vmopcondition.ReasonOperationCompleted.String(), vmopcondition.ReasonMigrationCompleted.String(): conditions.RemoveCondition(vmcondition.TypeMigrating, &vm.Status.Conditions) return nil diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/service/migration_volumes.go b/images/virtualization-artifact/pkg/controller/vm/internal/service/migration_volumes.go index 5f208c9f1c..2fc332e9ae 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/service/migration_volumes.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/service/migration_volumes.go @@ -94,7 +94,7 @@ func (s MigrationVolumesService) SyncVolumes(ctx context.Context, vmState state. if vmop != nil { completed, _ := conditions.GetCondition(vmopcondition.TypeCompleted, vmop.Status.Conditions) switch completed.Reason { - case vmopcondition.ReasonMigrationPrepareTarget.String(), vmopcondition.ReasonMigrationTargetReady.String(), vmopcondition.ReasonMigrationRunning.String(): + case vmopcondition.ReasonMigrationPrepareTarget.String(), vmopcondition.ReasonMigrationTargetReady.String(), vmopcondition.ReasonMigrationRunning.String(), vmopcondition.ReasonTargetPreparing.String(), vmopcondition.ReasonSyncing.String(), vmopcondition.ReasonSourceSuspended.String(), vmopcondition.ReasonTargetResumed.String(): return reconcile.Result{}, nil } } diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/deletion_test.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/deletion_test.go index 91ecaa08ea..c1ebe4e2be 100644 --- a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/deletion_test.go +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/deletion_test.go @@ -108,11 +108,11 @@ var _ = Describe("DeletionHandler", func() { }, Entry("VMOP Evict 1", newVmop(v1alpha2.VMOPPhaseInProgress, vmopbuilder.WithType(v1alpha2.VMOPTypeEvict), vmopbuilder.WithVirtualMachine("test-vm")), - newSimpleMigration("vmop-"+name, namespace, "test-vm"), true, + newSimpleMigration("vmop-"+name, "test-vm"), true, ), Entry("VMOP Evict 2", newVmop(v1alpha2.VMOPPhaseCompleted, vmopbuilder.WithType(v1alpha2.VMOPTypeEvict), vmopbuilder.WithVirtualMachine("test-vm")), - newSimpleMigration("vmop-"+name, namespace, "test-vm"), false, + newSimpleMigration("vmop-"+name, "test-vm"), false, ), ) }) diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle.go index dad39694b6..2618bcf3b8 100644 --- a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle.go +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle.go @@ -19,10 +19,14 @@ package handler import ( "context" "fmt" + "strings" + "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" virtv1 "kubevirt.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -30,6 +34,7 @@ import ( "github.com/deckhouse/virtualization-controller/pkg/common/object" commonvmop "github.com/deckhouse/virtualization-controller/pkg/common/vmop" "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" + migrationprogress "github.com/deckhouse/virtualization-controller/pkg/controller/vmop/migration/internal/progress" migrationservice "github.com/deckhouse/virtualization-controller/pkg/controller/vmop/migration/internal/service" genericservice "github.com/deckhouse/virtualization-controller/pkg/controller/vmop/service" "github.com/deckhouse/virtualization-controller/pkg/eventrecord" @@ -42,6 +47,28 @@ import ( const lifecycleHandlerName = "LifecycleHandler" +const ( + progressDisksPreparing int32 = 1 + progressTargetScheduling int32 = 2 + progressTargetPreparing int32 = 3 + progressSourceSuspended int32 = 91 + progressTargetResumed int32 = 92 + progressMigrationCompleted int32 = 100 +) + +const ( + messageSyncingSourceAndTarget = "Syncing source and target" + messageTargetPodScheduling = "Target pod is being scheduled" + messageTargetPodPreparing = "Target pod is being prepared" + messageTargetVMResumed = "Target VM resumed" + messageSourceVMSuspended = "Source VM suspended" +) + +const ( + reasonFailedAttachVolume = "FailedAttachVolume" + reasonFailedMount = "FailedMount" +) + type Base interface { Init(vmop *v1alpha2.VirtualMachineOperation) ShouldExecuteOrSetFailedPhase(ctx context.Context, vmop *v1alpha2.VirtualMachineOperation) (bool, error) @@ -49,30 +76,34 @@ type Base interface { IsApplicableOrSetFailedPhase(checker genericservice.ApplicableChecker, vmop *v1alpha2.VirtualMachineOperation, vm *v1alpha2.VirtualMachine) bool } type LifecycleHandler struct { - client client.Client - migration *migrationservice.MigrationService - base Base - recorder eventrecord.EventRecorderLogger + client client.Client + migration *migrationservice.MigrationService + base Base + recorder eventrecord.EventRecorderLogger + progressStrategy migrationprogress.Strategy } func NewLifecycleHandler(client client.Client, migration *migrationservice.MigrationService, base Base, recorder eventrecord.EventRecorderLogger) *LifecycleHandler { return &LifecycleHandler{ - client: client, - migration: migration, - base: base, - recorder: recorder, + client: client, + migration: migration, + base: base, + recorder: recorder, + progressStrategy: migrationprogress.NewProgress(), } } func (h LifecycleHandler) Handle(ctx context.Context, vmop *v1alpha2.VirtualMachineOperation) (reconcile.Result, error) { // Do not update conditions for object in the deletion state. if commonvmop.IsTerminating(vmop) { + h.forgetProgress(vmop) vmop.Status.Phase = v1alpha2.VMOPPhaseTerminating return reconcile.Result{}, nil } // Ignore if VMOP is in final state. if commonvmop.IsFinished(vmop) { + h.forgetProgress(vmop) return reconcile.Result{}, nil } @@ -264,44 +295,57 @@ func (h LifecycleHandler) syncOperationComplete(ctx context.Context, vmop *v1alp vmop.Status.Phase = v1alpha2.VMOPPhaseFailed h.recorder.Event(vmop, corev1.EventTypeWarning, v1alpha2.ReasonErrVMOPFailed, "VirtualMachineOperation failed") - msg := "Migration failed" - if mig.Status.MigrationState != nil && mig.Status.MigrationState.FailureReason != "" { - msg += ": " + mig.Status.MigrationState.FailureReason - } - msgByFailedReason := getMessageByMigrationFailedReason(mig) - if msgByFailedReason != "" { - msg += ": " + msgByFailedReason + reason := h.getFailedReason(mig) + if reason == vmopcondition.ReasonFailed { + if prev, found := conditions.GetCondition(vmopcondition.TypeCompleted, vmop.Status.Conditions); found { + if prev.Reason == vmopcondition.ReasonNotConverging.String() { + reason = vmopcondition.ReasonNotConverging + } + } } + msg := h.getFailedMessage(reason, mig) + progress := h.calculateMigrationProgress(vmop, mig, reason) + vmop.Status.Progress = ptr.To(progress) completedCond. Status(metav1.ConditionFalse). - Reason(vmopcondition.ReasonOperationFailed). + Reason(reason). Message(msg) conditions.SetCondition(completedCond, &vmop.Status.Conditions) return nil case virtv1.MigrationSucceeded: vmop.Status.Phase = v1alpha2.VMOPPhaseCompleted h.recorder.Event(vmop, corev1.EventTypeNormal, v1alpha2.ReasonVMOPSucceeded, "VirtualMachineOperation succeeded") + vmop.Status.Progress = ptr.To(int32(100)) completedCond. Status(metav1.ConditionTrue). - Reason(vmopcondition.ReasonOperationCompleted) + Reason(vmopcondition.ReasonMigrationCompleted) conditions.SetCondition(completedCond, &vmop.Status.Conditions) return nil } // 3. Migration in progress. Set in progress phase - vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress - reason := mapMigrationPhaseToReason[mig.Status.Phase] - if reason == vmopcondition.ReasonMigrationPending { - vmop.Status.Phase = v1alpha2.VMOPPhasePending - } - - msg, err := h.getConditionCompletedMessageByReason(ctx, reason, mig) + reason, msg, err := h.getInProgressReasonAndMessage(ctx, mig) if err != nil { return err } + if reason == vmopcondition.ReasonSyncing { + record := migrationprogress.BuildRecord(vmop, mig, time.Now()) + if h.progressStrategy != nil && h.progressStrategy.IsNotConverging(record) { + reason = vmopcondition.ReasonNotConverging + msg = "Migration is not converging: data remaining is not decreasing at maximum throttle" + } + } + + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + if reason == vmopcondition.ReasonTargetScheduling { + vmop.Status.Phase = v1alpha2.VMOPPhasePending + } + progress := h.calculateMigrationProgress(vmop, mig, reason) + vmop.Status.Progress = ptr.To(progress) + completedCond. Status(metav1.ConditionFalse). Reason(reason). @@ -363,6 +407,7 @@ func (h LifecycleHandler) canExecute(vmop *v1alpha2.VirtualMachineOperation, vm if migratable.Status == metav1.ConditionTrue { vmop.Status.Phase = v1alpha2.VMOPPhasePending + vmop.Status.Progress = ptr.To(int32(1)) conditions.SetCondition( conditions.NewConditionBuilder(vmopcondition.TypeCompleted). Generation(vmop.GetGeneration()). @@ -406,17 +451,18 @@ func (h LifecycleHandler) execute(ctx context.Context, vmop *v1alpha2.VirtualMac return err } - vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress - reason := mapMigrationPhaseToReason[mig.Status.Phase] - if reason == vmopcondition.ReasonMigrationPending { - vmop.Status.Phase = v1alpha2.VMOPPhasePending - } - - msg, err := h.getConditionCompletedMessageByReason(ctx, reason, mig) + reason, msg, err := h.getInProgressReasonAndMessage(ctx, mig) if err != nil { return err } + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + if reason == vmopcondition.ReasonTargetScheduling { + vmop.Status.Phase = v1alpha2.VMOPPhasePending + } + progress := h.calculateMigrationProgress(vmop, mig, reason) + vmop.Status.Progress = ptr.To(progress) + conditions.SetCondition( conditions.NewConditionBuilder(vmopcondition.TypeCompleted). Generation(vmop.GetGeneration()). @@ -455,17 +501,6 @@ func (h LifecycleHandler) recordEvent(ctx context.Context, vmop *v1alpha2.Virtua } } -var mapMigrationPhaseToReason = map[virtv1.VirtualMachineInstanceMigrationPhase]vmopcondition.ReasonCompleted{ - virtv1.MigrationPhaseUnset: vmopcondition.ReasonMigrationPending, - virtv1.MigrationPending: vmopcondition.ReasonMigrationPending, - virtv1.MigrationScheduling: vmopcondition.ReasonMigrationPrepareTarget, - virtv1.MigrationScheduled: vmopcondition.ReasonMigrationPrepareTarget, - virtv1.MigrationTargetReady: vmopcondition.ReasonMigrationTargetReady, - virtv1.MigrationRunning: vmopcondition.ReasonMigrationRunning, - virtv1.MigrationSucceeded: vmopcondition.ReasonOperationCompleted, - virtv1.MigrationFailed: vmopcondition.ReasonOperationFailed, -} - func getMessageByMigrationFailedReason(mig *virtv1.VirtualMachineInstanceMigration) string { cond, found := conditions.GetKVVMIMCondition(virtv1.VirtualMachineInstanceMigrationFailed, mig.Status.Conditions) @@ -481,6 +516,169 @@ func getMessageByMigrationFailedReason(mig *virtv1.VirtualMachineInstanceMigrati return "" } +func (h LifecycleHandler) getFailedReason(mig *virtv1.VirtualMachineInstanceMigration) vmopcondition.ReasonCompleted { + if mig == nil { + return vmopcondition.ReasonFailed + } + + if mig.Status.MigrationState != nil { + state := mig.Status.MigrationState + if state.AbortRequested || state.AbortStatus == virtv1.MigrationAbortSucceeded { + return vmopcondition.ReasonAborted + } + if strings.Contains(strings.ToLower(state.FailureReason), "converg") || strings.Contains(strings.ToLower(state.FailureReason), "progress") { + return vmopcondition.ReasonNotConverging + } + } + + if cond, found := conditions.GetKVVMIMCondition(virtv1.VirtualMachineInstanceMigrationFailed, mig.Status.Conditions); found { + reason := strings.ToLower(cond.Reason + " " + cond.Message) + if strings.Contains(reason, "schedul") || strings.Contains(reason, "unschedul") { + return vmopcondition.ReasonTargetUnschedulable + } + if strings.Contains(reason, "csi") || strings.Contains(reason, "attach") || strings.Contains(reason, "volume") || strings.Contains(reason, "disk") { + return vmopcondition.ReasonTargetDiskError + } + } + + return vmopcondition.ReasonFailed +} + +func (h LifecycleHandler) getFailedMessage(reason vmopcondition.ReasonCompleted, mig *virtv1.VirtualMachineInstanceMigration) string { + base := "Migration failed" + switch reason { + case vmopcondition.ReasonAborted: + base = "Migration aborted" + case vmopcondition.ReasonNotConverging: + base = "Migration did not converge" + case vmopcondition.ReasonTargetUnschedulable: + base = "Migration failed: target pod is unschedulable" + case vmopcondition.ReasonTargetDiskError: + base = "Migration failed: target disk attach error" + } + + if mig != nil && mig.Status.MigrationState != nil && mig.Status.MigrationState.FailureReason != "" { + return fmt.Sprintf("%s: %s", base, mig.Status.MigrationState.FailureReason) + } + if msg := getMessageByMigrationFailedReason(mig); msg != "" { + return fmt.Sprintf("%s: %s", base, msg) + } + return base +} + +func (h LifecycleHandler) getInProgressReasonAndMessage( + ctx context.Context, + mig *virtv1.VirtualMachineInstanceMigration, +) (vmopcondition.ReasonCompleted, string, error) { + reason := vmopcondition.ReasonSyncing + message := messageSyncingSourceAndTarget + + switch mig.Status.Phase { + case virtv1.MigrationPhaseUnset, virtv1.MigrationPending, virtv1.MigrationScheduling: + reason = vmopcondition.ReasonTargetScheduling + message = messageTargetPodScheduling + case virtv1.MigrationScheduled, virtv1.MigrationPreparingTarget: + reason = vmopcondition.ReasonTargetPreparing + message = messageTargetPodPreparing + case virtv1.MigrationTargetReady, virtv1.MigrationWaitingForSync, virtv1.MigrationSynchronizing, virtv1.MigrationRunning: + reason = vmopcondition.ReasonSyncing + message = messageSyncingSourceAndTarget + } + + pod, err := h.getTargetPod(ctx, mig) + if err != nil { + return "", "", err + } + if isPodPendingUnschedulable(pod) { + return vmopcondition.ReasonTargetUnschedulable, fmt.Sprintf("Target pod %q is unschedulable", pod.Namespace+"/"+pod.Name), nil + } + if diskErrMsg, hasDiskErr := h.getTargetPodDiskError(ctx, pod); hasDiskErr { + return vmopcondition.ReasonTargetDiskError, fmt.Sprintf("Target pod has disk attach error: %s", diskErrMsg), nil + } + + if mig.Status.MigrationState != nil { + state := mig.Status.MigrationState + if state.TargetNodeDomainReadyTimestamp != nil { + reason = vmopcondition.ReasonTargetResumed + message = messageTargetVMResumed + } + if state.Completed { + reason = vmopcondition.ReasonSourceSuspended + message = messageSourceVMSuspended + } + } + + return reason, message, nil +} + +func (h LifecycleHandler) calculateMigrationProgress( + vmop *v1alpha2.VirtualMachineOperation, + mig *virtv1.VirtualMachineInstanceMigration, + reason vmopcondition.ReasonCompleted, +) int32 { + switch reason { + case vmopcondition.ReasonDisksPreparing: + return progressDisksPreparing + case vmopcondition.ReasonTargetScheduling: + return progressTargetScheduling + case vmopcondition.ReasonTargetUnschedulable: + return progressTargetScheduling + case vmopcondition.ReasonTargetPreparing: + return progressTargetPreparing + case vmopcondition.ReasonTargetDiskError: + return progressTargetPreparing + case vmopcondition.ReasonSyncing, vmopcondition.ReasonNotConverging: + record := migrationprogress.BuildRecord(vmop, mig, time.Now()) + return h.progressStrategy.SyncProgress(record) + case vmopcondition.ReasonSourceSuspended: + h.forgetProgress(vmop) + return progressSourceSuspended + case vmopcondition.ReasonTargetResumed: + h.forgetProgress(vmop) + return progressTargetResumed + case vmopcondition.ReasonMigrationCompleted: + h.forgetProgress(vmop) + return progressMigrationCompleted + default: + h.forgetProgress(vmop) + if vmop != nil && vmop.Status.Progress != nil { + return *vmop.Status.Progress + } + return 0 + } +} + +func (h LifecycleHandler) getTargetPodDiskError(ctx context.Context, pod *corev1.Pod) (string, bool) { + if pod == nil || !isContainerCreating(pod) || pod.DeletionTimestamp != nil { + return "", false + } + + eventList := &corev1.EventList{} + err := h.client.List(ctx, eventList, &client.ListOptions{ + Namespace: pod.Namespace, + FieldSelector: fields.SelectorFromSet(fields.Set{ + "involvedObject.name": pod.Name, + "involvedObject.kind": "Pod", + }), + }) + if err != nil { + return "", false + } + for _, e := range eventList.Items { + if e.Type == corev1.EventTypeWarning && (e.Reason == reasonFailedAttachVolume || e.Reason == reasonFailedMount) { + return fmt.Sprintf("%s: %s", e.Reason, e.Message), true + } + } + return "", false +} + +func (h LifecycleHandler) forgetProgress(vmop *v1alpha2.VirtualMachineOperation) { + if h.progressStrategy == nil || vmop == nil { + return + } + h.progressStrategy.Forget(vmop.UID) +} + func (h LifecycleHandler) getTargetPod(ctx context.Context, mig *virtv1.VirtualMachineInstanceMigration) (*corev1.Pod, error) { selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ MatchLabels: map[string]string{ @@ -505,6 +703,18 @@ func (h LifecycleHandler) getTargetPod(ctx context.Context, mig *virtv1.VirtualM return nil, nil } +func isContainerCreating(pod *corev1.Pod) bool { + if pod == nil || pod.Status.Phase != corev1.PodPending { + return false + } + for _, cs := range pod.Status.ContainerStatuses { + if cs.State.Waiting != nil && cs.State.Waiting.Reason == "ContainerCreating" { + return true + } + } + return false +} + func isPodPendingUnschedulable(pod *corev1.Pod) bool { if pod == nil { return false @@ -522,30 +732,3 @@ func isPodPendingUnschedulable(pod *corev1.Pod) bool { } return false } - -func (h LifecycleHandler) getConditionCompletedMessageByReason( - ctx context.Context, - reason vmopcondition.ReasonCompleted, - mig *virtv1.VirtualMachineInstanceMigration, -) (string, error) { - switch reason { - case vmopcondition.ReasonMigrationPending: - return "The VirtualMachineOperation for migrating the virtual machine has been queued. " + - "Waiting for the queue to be processed and for this operation to be executed.", nil - - case vmopcondition.ReasonMigrationPrepareTarget: - pod, err := h.getTargetPod(ctx, mig) - if err != nil { - return "", err - } - - if isPodPendingUnschedulable(pod) { - return fmt.Sprintf("Waiting for the virtual machine to be scheduled: "+ - "target pod \"%s/%s\" is unschedulable.", pod.Namespace, pod.Name), nil - } - return "The target environment is in the process of being prepared for migration.", nil - - default: - return "Wait until operation is completed.", nil - } -} diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle_test.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle_test.go index 4b0495882e..d89e17b529 100644 --- a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle_test.go +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/lifecycle_test.go @@ -19,10 +19,13 @@ package handler import ( "context" "fmt" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/component-base/featuregate" "k8s.io/utils/ptr" virtv1 "kubevirt.io/api/core/v1" @@ -31,15 +34,36 @@ import ( vmbuilder "github.com/deckhouse/virtualization-controller/pkg/builder/vm" vmopbuilder "github.com/deckhouse/virtualization-controller/pkg/builder/vmop" "github.com/deckhouse/virtualization-controller/pkg/common/testutil" + "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/reconciler" + migrationprogress "github.com/deckhouse/virtualization-controller/pkg/controller/vmop/migration/internal/progress" "github.com/deckhouse/virtualization-controller/pkg/controller/vmop/migration/internal/service" genericservice "github.com/deckhouse/virtualization-controller/pkg/controller/vmop/service" "github.com/deckhouse/virtualization-controller/pkg/eventrecord" "github.com/deckhouse/virtualization-controller/pkg/featuregates" "github.com/deckhouse/virtualization/api/core/v1alpha2" "github.com/deckhouse/virtualization/api/core/v1alpha2/vmcondition" + "github.com/deckhouse/virtualization/api/core/v1alpha2/vmopcondition" ) +type progressStrategyStub struct { + value int32 + isNotConverging bool + forgotten []types.UID +} + +func (s *progressStrategyStub) SyncProgress(_ migrationprogress.Record) int32 { + return s.value +} + +func (s *progressStrategyStub) IsNotConverging(_ migrationprogress.Record) bool { + return s.isNotConverging +} + +func (s *progressStrategyStub) Forget(uid types.UID) { + s.forgotten = append(s.forgotten, uid) +} + var _ = Describe("LifecycleHandler", func() { const ( name = "test" @@ -246,4 +270,606 @@ var _ = Describe("LifecycleHandler", func() { false, // targetMigrationEnabled ), ) + + Describe("migration progress integration", func() { + It("should return generic failed reason for nil migration", func() { + h := LifecycleHandler{} + + Expect(h.getFailedReason(nil)).To(Equal(vmopcondition.ReasonFailed)) + }) + + It("should forget progress for terminating vmop", func() { + stub := &progressStrategyStub{} + vmop := newVMOPMigrate() + now := metav1.Now() + vmop.DeletionTimestamp = &now + h := LifecycleHandler{progressStrategy: stub} + + _, err := h.Handle(ctx, vmop) + Expect(err).NotTo(HaveOccurred()) + Expect(vmop.Status.Phase).To(Equal(v1alpha2.VMOPPhaseTerminating)) + Expect(stub.forgotten).To(Equal([]types.UID{vmop.UID})) + }) + + It("should forget progress for finished vmop", func() { + stub := &progressStrategyStub{} + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseCompleted + h := LifecycleHandler{progressStrategy: stub} + + _, err := h.Handle(ctx, vmop) + Expect(err).NotTo(HaveOccurred()) + Expect(stub.forgotten).To(Equal([]types.UID{vmop.UID})) + }) + + DescribeTable("should detect failed reason", func(mig *virtv1.VirtualMachineInstanceMigration, expected vmopcondition.ReasonCompleted) { + h := LifecycleHandler{} + Expect(h.getFailedReason(mig)).To(Equal(expected)) + }, + Entry("aborted by request", + &virtv1.VirtualMachineInstanceMigration{Status: virtv1.VirtualMachineInstanceMigrationStatus{MigrationState: &virtv1.VirtualMachineInstanceMigrationState{AbortRequested: true}}}, + vmopcondition.ReasonAborted, + ), + Entry("aborted with succeeded status", + &virtv1.VirtualMachineInstanceMigration{Status: virtv1.VirtualMachineInstanceMigrationStatus{MigrationState: &virtv1.VirtualMachineInstanceMigrationState{AbortStatus: virtv1.MigrationAbortSucceeded}}}, + vmopcondition.ReasonAborted, + ), + Entry("not converging from failure reason", + &virtv1.VirtualMachineInstanceMigration{Status: virtv1.VirtualMachineInstanceMigrationStatus{MigrationState: &virtv1.VirtualMachineInstanceMigrationState{FailureReason: "no progress during convergence"}}}, + vmopcondition.ReasonNotConverging, + ), + Entry("target unschedulable from condition", + &virtv1.VirtualMachineInstanceMigration{Status: virtv1.VirtualMachineInstanceMigrationStatus{Conditions: []virtv1.VirtualMachineInstanceMigrationCondition{{Type: virtv1.VirtualMachineInstanceMigrationFailed, Reason: "Unschedulable", Message: "pod is unschedulable"}}}}, + vmopcondition.ReasonTargetUnschedulable, + ), + Entry("target disk error from condition", + &virtv1.VirtualMachineInstanceMigration{Status: virtv1.VirtualMachineInstanceMigrationStatus{Conditions: []virtv1.VirtualMachineInstanceMigrationCondition{{Type: virtv1.VirtualMachineInstanceMigrationFailed, Reason: "VolumeAttach", Message: "csi volume attach failed"}}}}, + vmopcondition.ReasonTargetDiskError, + ), + Entry("generic failed reason", + &virtv1.VirtualMachineInstanceMigration{}, + vmopcondition.ReasonFailed, + ), + ) + + DescribeTable("should build in-progress reason and message", func( + phase virtv1.VirtualMachineInstanceMigrationPhase, + state *virtv1.VirtualMachineInstanceMigrationState, + pod *corev1.Pod, + expectedReason vmopcondition.ReasonCompleted, + ) { + mig := newSimpleMigration("vmop-test", name) + mig.UID = "migration-uid" + mig.Status.Phase = phase + mig.Status.MigrationState = state + + objects := []client.Object{mig} + if pod != nil { + objects = append(objects, pod) + } + fakeClient, err := testutil.NewFakeClientWithObjects(objects...) + Expect(err).NotTo(HaveOccurred()) + + h := LifecycleHandler{client: fakeClient} + reason, _, err := h.getInProgressReasonAndMessage(ctx, mig) + Expect(err).NotTo(HaveOccurred()) + Expect(reason).To(Equal(expectedReason)) + }, + Entry("phase unset means target scheduling", + virtv1.MigrationPhaseUnset, + nil, + nil, + vmopcondition.ReasonTargetScheduling, + ), + Entry("scheduled means target preparing", + virtv1.MigrationScheduled, + nil, + nil, + vmopcondition.ReasonTargetPreparing, + ), + Entry("running means syncing", + virtv1.MigrationRunning, + nil, + nil, + vmopcondition.ReasonSyncing, + ), + Entry("unschedulable pod has priority", + virtv1.MigrationScheduling, + nil, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "target-pod", + Labels: map[string]string{ + virtv1.AppLabel: "virt-launcher", + virtv1.MigrationJobLabel: "migration-uid", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + Conditions: []corev1.PodCondition{{ + Type: corev1.PodScheduled, + Status: corev1.ConditionFalse, + Reason: corev1.PodReasonUnschedulable, + }}, + }, + }, + vmopcondition.ReasonTargetUnschedulable, + ), + Entry("target resumed after domain ready timestamp", + virtv1.MigrationRunning, + &virtv1.VirtualMachineInstanceMigrationState{TargetNodeDomainReadyTimestamp: &metav1.Time{Time: time.Now()}}, + nil, + vmopcondition.ReasonTargetResumed, + ), + Entry("source suspended after completed flag", + virtv1.MigrationRunning, + &virtv1.VirtualMachineInstanceMigrationState{Completed: true}, + nil, + vmopcondition.ReasonSourceSuspended, + ), + ) + + DescribeTable("should map progress by reason", func(reason vmopcondition.ReasonCompleted, initial *int32, expected int32) { + h := LifecycleHandler{progressStrategy: &progressStrategyStub{value: 55}} + vmop := &v1alpha2.VirtualMachineOperation{Status: v1alpha2.VirtualMachineOperationStatus{Progress: initial}} + mig := &virtv1.VirtualMachineInstanceMigration{} + + Expect(h.calculateMigrationProgress(vmop, mig, reason)).To(Equal(expected)) + }, + Entry("disks preparing", vmopcondition.ReasonDisksPreparing, nil, int32(1)), + Entry("target scheduling", vmopcondition.ReasonTargetScheduling, nil, int32(2)), + Entry("target unschedulable", vmopcondition.ReasonTargetUnschedulable, nil, int32(2)), + Entry("target preparing", vmopcondition.ReasonTargetPreparing, nil, int32(3)), + Entry("target disk error", vmopcondition.ReasonTargetDiskError, nil, int32(3)), + Entry("syncing delegates to strategy", vmopcondition.ReasonSyncing, nil, int32(55)), + Entry("source suspended", vmopcondition.ReasonSourceSuspended, nil, int32(91)), + Entry("target resumed", vmopcondition.ReasonTargetResumed, nil, int32(92)), + Entry("migration completed", vmopcondition.ReasonMigrationCompleted, nil, int32(100)), + Entry("unknown keeps existing progress", vmopcondition.ReasonFailed, ptr.To[int32](44), int32(44)), + ) + + It("should set syncing progress inside [10,90] for running migration", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + vmop.Status.Progress = ptr.To[int32](10) + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationRunning + mig.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{ + StartTimestamp: &metav1.Time{Time: time.Now().Add(-2 * time.Minute)}, + } + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Phase).To(Equal(v1alpha2.VMOPPhaseInProgress)) + Expect(srv.Changed().Status.Progress).NotTo(BeNil()) + Expect(*srv.Changed().Status.Progress).To(BeNumerically(">=", migrationprogress.SyncRangeMin)) + Expect(*srv.Changed().Status.Progress).To(BeNumerically("<=", migrationprogress.SyncRangeMax)) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonSyncing.String())) + }) + + It("should set pending phase and progress to 2 for scheduling migration", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationScheduling + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Phase).To(Equal(v1alpha2.VMOPPhasePending)) + Expect(srv.Changed().Status.Progress).NotTo(BeNil()) + Expect(*srv.Changed().Status.Progress).To(Equal(int32(2))) + }) + + It("should set aborted reason and preserve progress for failed migration", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + vmop.Status.Progress = ptr.To[int32](55) + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationFailed + mig.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{AbortRequested: true} + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Phase).To(Equal(v1alpha2.VMOPPhaseFailed)) + Expect(srv.Changed().Status.Progress).NotTo(BeNil()) + Expect(*srv.Changed().Status.Progress).To(Equal(int32(55))) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonAborted.String())) + }) + + It("should set progress to 100 for succeeded migration", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationSucceeded + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Phase).To(Equal(v1alpha2.VMOPPhaseCompleted)) + Expect(srv.Changed().Status.Progress).NotTo(BeNil()) + Expect(*srv.Changed().Status.Progress).To(Equal(int32(100))) + }) + + It("should override Syncing with NotConverging when strategy detects stall", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + vmop.Status.Progress = ptr.To[int32](50) + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationRunning + mig.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{ + StartTimestamp: &metav1.Time{Time: time.Now().Add(-2 * time.Minute)}, + } + + stub := &progressStrategyStub{value: 50, isNotConverging: true} + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + h.progressStrategy = stub + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Phase).To(Equal(v1alpha2.VMOPPhaseInProgress)) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonNotConverging.String())) + }) + + It("should stay Syncing when strategy does not detect stall", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + vmop.Status.Progress = ptr.To[int32](30) + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationRunning + mig.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{ + StartTimestamp: &metav1.Time{Time: time.Now().Add(-1 * time.Minute)}, + } + + stub := &progressStrategyStub{value: 30, isNotConverging: false} + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + h.progressStrategy = stub + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonSyncing.String())) + }) + + It("should prefer Aborted over NotConverging for terminal reason", func() { + h := LifecycleHandler{} + mig := &virtv1.VirtualMachineInstanceMigration{ + Status: virtv1.VirtualMachineInstanceMigrationStatus{ + MigrationState: &virtv1.VirtualMachineInstanceMigrationState{ + AbortRequested: true, + FailureReason: "no progress during convergence", + }, + }, + } + Expect(h.getFailedReason(mig)).To(Equal(vmopcondition.ReasonAborted)) + }) + + It("should set completed condition reason on success", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationSucceeded + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonMigrationCompleted.String())) + }) + + It("should use OperationFailed reason when migration is nil (mig==nil path)", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + vmop.Status.Conditions = []metav1.Condition{ + { + Type: vmopcondition.TypeSignalSent.String(), + Status: metav1.ConditionTrue, + Reason: vmopcondition.ReasonSignalSentSuccess.String(), + }, + } + + fakeClient, srv = setupEnvironment(vmop, vm) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Phase).To(Equal(v1alpha2.VMOPPhaseFailed)) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonOperationFailed.String())) + }) + + It("should set target preparing progress (3) for scheduled migration", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationScheduled + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Progress).NotTo(BeNil()) + Expect(*srv.Changed().Status.Progress).To(Equal(int32(3))) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonTargetPreparing.String())) + }) + + It("should set target resumed progress (92) when domain ready timestamp is set", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationRunning + mig.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{ + TargetNodeDomainReadyTimestamp: &metav1.Time{Time: time.Now()}, + } + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Progress).NotTo(BeNil()) + Expect(*srv.Changed().Status.Progress).To(Equal(int32(92))) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonTargetResumed.String())) + }) + + It("should set source suspended progress (91) when migration state completed flag is set", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationRunning + mig.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{ + Completed: true, + TargetNodeDomainReadyTimestamp: &metav1.Time{Time: time.Now()}, + } + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Progress).NotTo(BeNil()) + Expect(*srv.Changed().Status.Progress).To(Equal(int32(91))) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonSourceSuspended.String())) + }) + + It("should preserve NotConverging reason when migration fails with generic reason", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + vmop.Status.Progress = ptr.To[int32](60) + vmop.Status.Conditions = []metav1.Condition{ + { + Type: vmopcondition.TypeSignalSent.String(), + Status: metav1.ConditionTrue, + Reason: vmopcondition.ReasonSignalSentSuccess.String(), + }, + { + Type: vmopcondition.TypeCompleted.String(), + Status: metav1.ConditionFalse, + Reason: vmopcondition.ReasonNotConverging.String(), + }, + } + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationFailed + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Phase).To(Equal(v1alpha2.VMOPPhaseFailed)) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonNotConverging.String())) + }) + + It("should NOT preserve NotConverging when migration fails with specific reason (Aborted)", func() { + vm := newVM(v1alpha2.PreferSafeMigrationPolicy) + vmop := newVMOPMigrate() + vmop.Status.Phase = v1alpha2.VMOPPhaseInProgress + vmop.Status.Progress = ptr.To[int32](60) + vmop.Status.Conditions = []metav1.Condition{ + { + Type: vmopcondition.TypeSignalSent.String(), + Status: metav1.ConditionTrue, + Reason: vmopcondition.ReasonSignalSentSuccess.String(), + }, + { + Type: vmopcondition.TypeCompleted.String(), + Status: metav1.ConditionFalse, + Reason: vmopcondition.ReasonNotConverging.String(), + }, + } + + mig := newSimpleMigration(fmt.Sprintf("vmop-%s", vmop.Name), name) + mig.Status.Phase = virtv1.MigrationFailed + mig.Status.MigrationState = &virtv1.VirtualMachineInstanceMigrationState{ + AbortRequested: true, + } + + fakeClient, srv = setupEnvironment(vmop, vm, mig) + migrationService := service.NewMigrationService(fakeClient, featuregates.Default()) + base := genericservice.NewBaseVMOPService(fakeClient, recorderMock) + h := NewLifecycleHandler(fakeClient, migrationService, base, recorderMock) + + _, err := h.Handle(ctx, srv.Changed()) + Expect(err).NotTo(HaveOccurred()) + Expect(srv.Changed().Status.Phase).To(Equal(v1alpha2.VMOPPhaseFailed)) + + completed, found := conditions.GetCondition(vmopcondition.TypeCompleted, srv.Changed().Status.Conditions) + Expect(found).To(BeTrue()) + Expect(completed.Reason).To(Equal(vmopcondition.ReasonAborted.String())) + }) + + It("should return TargetDiskError when target pod has disk attach error", func() { + mig := newSimpleMigration("vmop-test", name) + mig.UID = "migration-uid" + mig.Status.Phase = virtv1.MigrationPreparingTarget + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "target-pod", + Labels: map[string]string{ + virtv1.AppLabel: "virt-launcher", + virtv1.MigrationJobLabel: "migration-uid", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "compute", + State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Reason: "ContainerCreating"}}, + }, + }, + }, + } + event := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "disk-event"}, + InvolvedObject: corev1.ObjectReference{Name: "target-pod", Kind: "Pod", Namespace: namespace}, + Type: corev1.EventTypeWarning, + Reason: "FailedAttachVolume", + Message: "failed to attach disk", + } + + fakeClient, err := testutil.NewFakeClientWithObjects(mig, pod, event) + Expect(err).NotTo(HaveOccurred()) + + h := LifecycleHandler{client: fakeClient} + reason, _, err := h.getInProgressReasonAndMessage(ctx, mig) + Expect(err).NotTo(HaveOccurred()) + Expect(reason).To(Equal(vmopcondition.ReasonTargetDiskError)) + }) + + It("should ignore target pod disk attach error when pod is not in ContainerCreating", func() { + mig := newSimpleMigration("vmop-test", name) + mig.UID = "migration-uid" + mig.Status.Phase = virtv1.MigrationPreparingTarget + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "target-pod", + Labels: map[string]string{ + virtv1.AppLabel: "virt-launcher", + virtv1.MigrationJobLabel: "migration-uid", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "compute", + State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Reason: "ImagePullBackOff"}}, + }, + }, + }, + } + event := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "disk-event"}, + InvolvedObject: corev1.ObjectReference{Name: "target-pod", Kind: "Pod", Namespace: namespace}, + Type: corev1.EventTypeWarning, + Reason: "FailedAttachVolume", + Message: "failed to attach disk", + } + + fakeClient, err := testutil.NewFakeClientWithObjects(mig, pod, event) + Expect(err).NotTo(HaveOccurred()) + + h := LifecycleHandler{client: fakeClient} + reason, _, err := h.getInProgressReasonAndMessage(ctx, mig) + Expect(err).NotTo(HaveOccurred()) + Expect(reason).To(Equal(vmopcondition.ReasonTargetPreparing)) + }) + }) }) diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/suite_test.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/suite_test.go index 12cbc6a48c..a49f67229e 100644 --- a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/suite_test.go +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/handler/suite_test.go @@ -62,7 +62,7 @@ func setupEnvironment(vmop *v1alpha2.VirtualMachineOperation, objs ...client.Obj return fakeClient, srv } -func newSimpleMigration(name, namespace, vm string) *virtv1.VirtualMachineInstanceMigration { +func newSimpleMigration(name, vm string) *virtv1.VirtualMachineInstanceMigration { return &virtv1.VirtualMachineInstanceMigration{ TypeMeta: metav1.TypeMeta{ APIVersion: virtv1.SchemeGroupVersion.String(), @@ -70,7 +70,7 @@ func newSimpleMigration(name, namespace, vm string) *virtv1.VirtualMachineInstan }, ObjectMeta: metav1.ObjectMeta{ Name: name, - Namespace: namespace, + Namespace: "default", }, Spec: virtv1.VirtualMachineInstanceMigrationSpec{ VMIName: vm, diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/mapper.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/mapper.go new file mode 100644 index 0000000000..77446e4b79 --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/mapper.go @@ -0,0 +1,122 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + "time" + + virtv1 "kubevirt.io/api/core/v1" + + "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +const unknownMetric = -1.0 + +func BuildRecord(vmop *v1alpha2.VirtualMachineOperation, mig *virtv1.VirtualMachineInstanceMigration, now time.Time) Record { + record := Record{ + Now: now, + StartedAt: now, + PreviousProgress: previousProgress(vmop), + DataTotalMiB: unknownMetric, + DataProcessedMiB: unknownMetric, + DataRemainingMiB: unknownMetric, + } + + if vmop != nil { + record.OperationUID = vmop.UID + record.StartedAt = vmop.CreationTimestamp.Time + } + + if mig == nil { + return record + } + + record.Phase = mig.Status.Phase + if state := mig.Status.MigrationState; state != nil { + if state.StartTimestamp != nil { + record.StartedAt = state.StartTimestamp.Time + } + record.Mode = state.Mode + record.Iteration, record.HasIteration = mapIteration(state) + record.AutoConvergeThrottle, record.HasThrottle = mapThrottle(state) + record.Throttle = normalizeThrottle(record.AutoConvergeThrottle, record.HasThrottle) + record.DataTotalMiB = mapDataTotalMiB(state) + record.DataProcessedMiB = mapDataProcessedMiB(state) + record.DataRemainingMiB = mapDataRemainingMiB(state) + if state.MigrationConfiguration != nil && state.MigrationConfiguration.AllowAutoConverge != nil { + record.AutoConverge = *state.MigrationConfiguration.AllowAutoConverge + } + } + + return record +} + +func mapBytesToMiB(v *uint64) float64 { + if v == nil { + return unknownMetric + } + return float64(*v) / (1024.0 * 1024.0) +} + +func previousProgress(vmop *v1alpha2.VirtualMachineOperation) int32 { + if vmop == nil || vmop.Status.Progress == nil { + return SyncRangeMin + } + return *vmop.Status.Progress +} + +func mapIteration(state *virtv1.VirtualMachineInstanceMigrationState) (uint32, bool) { + if state == nil || state.TransferStatus == nil || state.TransferStatus.Iteration == nil { + return 0, false + } + return *state.TransferStatus.Iteration, true +} + +func mapThrottle(state *virtv1.VirtualMachineInstanceMigrationState) (uint32, bool) { + if state == nil || state.TransferStatus == nil || state.TransferStatus.AutoConvergeThrottle == nil { + return 0, false + } + return *state.TransferStatus.AutoConvergeThrottle, true +} + +func mapDataTotalMiB(state *virtv1.VirtualMachineInstanceMigrationState) float64 { + if state == nil || state.TransferStatus == nil { + return unknownMetric + } + return mapBytesToMiB(state.TransferStatus.DataTotalBytes) +} + +func mapDataProcessedMiB(state *virtv1.VirtualMachineInstanceMigrationState) float64 { + if state == nil || state.TransferStatus == nil { + return unknownMetric + } + return mapBytesToMiB(state.TransferStatus.DataProcessedBytes) +} + +func mapDataRemainingMiB(state *virtv1.VirtualMachineInstanceMigrationState) float64 { + if state == nil || state.TransferStatus == nil { + return unknownMetric + } + return mapBytesToMiB(state.TransferStatus.DataRemainingBytes) +} + +func normalizeThrottle(raw uint32, ok bool) float64 { + if !ok { + return 0 + } + return clampFloat(float64(raw)/100.0, 0, 1) +} diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/mapper_test.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/mapper_test.go new file mode 100644 index 0000000000..9395a1551a --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/mapper_test.go @@ -0,0 +1,299 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + virtv1 "kubevirt.io/api/core/v1" + + "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +func TestBuildRecord_NilVMOPAndMigration(t *testing.T) { + now := time.Unix(1710000000, 0) + + record := BuildRecord(nil, nil, now) + + if !record.StartedAt.Equal(now) { + t.Fatalf("expected StartedAt=%v, got %v", now, record.StartedAt) + } + if record.PreviousProgress != SyncRangeMin { + t.Fatalf("expected PreviousProgress=%d, got %d", SyncRangeMin, record.PreviousProgress) + } + if record.DataTotalMiB != unknownMetric || record.DataProcessedMiB != unknownMetric || record.DataRemainingMiB != unknownMetric { + t.Fatalf("expected unknown metrics, got total=%v processed=%v remaining=%v", record.DataTotalMiB, record.DataProcessedMiB, record.DataRemainingMiB) + } +} + +func TestBuildRecord_UsesVMOPCreationTimestampAndPreviousProgress(t *testing.T) { + now := time.Unix(1710000000, 0) + vmop := &v1alpha2.VirtualMachineOperation{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID("vmop-uid"), + CreationTimestamp: metav1.NewTime(now.Add(-3 * time.Minute)), + }, + Status: v1alpha2.VirtualMachineOperationStatus{Progress: ptr.To[int32](42)}, + } + + record := BuildRecord(vmop, nil, now) + + if record.OperationUID != vmop.UID { + t.Fatalf("expected OperationUID=%s, got %s", vmop.UID, record.OperationUID) + } + if !record.StartedAt.Equal(vmop.CreationTimestamp.Time) { + t.Fatalf("expected StartedAt=%v, got %v", vmop.CreationTimestamp.Time, record.StartedAt) + } + if record.PreviousProgress != 42 { + t.Fatalf("expected PreviousProgress=42, got %d", record.PreviousProgress) + } +} + +func TestBuildRecord_UsesMigrationState(t *testing.T) { + now := time.Unix(1710000000, 0) + start := metav1.NewTime(now.Add(-5 * time.Minute)) + totalBytes := uint64(1024 * 1024 * 1024) + processedBytes := uint64(512 * 1024 * 1024) + remainingBytes := uint64(256 * 1024 * 1024) + iteration := uint32(10) + autoConvergeThrottle := uint32(50) + mig := &virtv1.VirtualMachineInstanceMigration{ + Status: virtv1.VirtualMachineInstanceMigrationStatus{ + Phase: virtv1.MigrationRunning, + MigrationState: &virtv1.VirtualMachineInstanceMigrationState{ + StartTimestamp: &start, + Mode: virtv1.MigrationPreCopy, + TransferStatus: &virtv1.VirtualMachineInstanceMigrationTransferStatus{ + Iteration: &iteration, + AutoConvergeThrottle: &autoConvergeThrottle, + DataTotalBytes: &totalBytes, + DataProcessedBytes: &processedBytes, + DataRemainingBytes: &remainingBytes, + }, + }, + }, + } + + record := BuildRecord(nil, mig, now) + + if record.Phase != virtv1.MigrationRunning { + t.Fatalf("expected Phase=%s, got %s", virtv1.MigrationRunning, record.Phase) + } + if !record.StartedAt.Equal(start.Time) { + t.Fatalf("expected StartedAt=%v, got %v", start.Time, record.StartedAt) + } + if record.Mode != virtv1.MigrationPreCopy { + t.Fatalf("expected Mode=%s, got %s", virtv1.MigrationPreCopy, record.Mode) + } + if !record.HasIteration || record.Iteration != 10 { + t.Fatalf("expected Iteration=10 with flag, got value=%d has=%v", record.Iteration, record.HasIteration) + } + if !record.HasThrottle || record.AutoConvergeThrottle != 50 { + t.Fatalf("expected AutoConvergeThrottle=50 with flag, got value=%d has=%v", record.AutoConvergeThrottle, record.HasThrottle) + } + if record.Throttle != 0.5 { + t.Fatalf("expected normalized Throttle=0.5, got %v", record.Throttle) + } + if record.DataTotalMiB != 1024 || record.DataProcessedMiB != 512 || record.DataRemainingMiB != 256 { + t.Fatalf("expected mapped MiB counters, got total=%v processed=%v remaining=%v", record.DataTotalMiB, record.DataProcessedMiB, record.DataRemainingMiB) + } +} + +func TestPreviousProgress(t *testing.T) { + tests := []struct { + name string + vmop *v1alpha2.VirtualMachineOperation + want int32 + }{ + { + name: "nil vmop", + vmop: nil, + want: SyncRangeMin, + }, + { + name: "nil progress", + vmop: &v1alpha2.VirtualMachineOperation{}, + want: SyncRangeMin, + }, + { + name: "explicit progress", + vmop: &v1alpha2.VirtualMachineOperation{Status: v1alpha2.VirtualMachineOperationStatus{Progress: ptr.To[int32](37)}}, + want: 37, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := previousProgress(tt.vmop); got != tt.want { + t.Fatalf("previousProgress() = %d, want %d", got, tt.want) + } + }) + } +} + +func TestMapIteration(t *testing.T) { + tests := []struct { + name string + state *virtv1.VirtualMachineInstanceMigrationState + want uint32 + wantSet bool + }{ + { + name: "nil state", + state: nil, + want: 0, + wantSet: false, + }, + { + name: "missing iteration", + state: &virtv1.VirtualMachineInstanceMigrationState{}, + want: 0, + wantSet: false, + }, + { + name: "explicit iteration", + state: &virtv1.VirtualMachineInstanceMigrationState{TransferStatus: &virtv1.VirtualMachineInstanceMigrationTransferStatus{ + Iteration: ptr.To[uint32](7), + }}, + want: 7, + wantSet: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, gotSet := mapIteration(tt.state) + if got != tt.want || gotSet != tt.wantSet { + t.Fatalf("mapIteration() = (%d,%v), want (%d,%v)", got, gotSet, tt.want, tt.wantSet) + } + }) + } +} + +func TestMapBytesToMiB(t *testing.T) { + if got := mapBytesToMiB(nil); got != unknownMetric { + t.Fatalf("expected unknown metric for nil, got %v", got) + } + + bytes := uint64(3 * 1024 * 1024) + if got := mapBytesToMiB(&bytes); got != 3 { + t.Fatalf("expected 3 MiB, got %v", got) + } +} + +func TestMapThrottle(t *testing.T) { + tests := []struct { + name string + state *virtv1.VirtualMachineInstanceMigrationState + wantRaw uint32 + wantSet bool + wantValue float64 + }{ + { + name: "nil state", + state: nil, + wantRaw: 0, + wantSet: false, + wantValue: 0, + }, + { + name: "missing throttle", + state: &virtv1.VirtualMachineInstanceMigrationState{}, + wantRaw: 0, + wantSet: false, + wantValue: 0, + }, + { + name: "explicit throttle", + state: &virtv1.VirtualMachineInstanceMigrationState{TransferStatus: &virtv1.VirtualMachineInstanceMigrationTransferStatus{ + AutoConvergeThrottle: ptr.To[uint32](70), + }}, + wantRaw: 70, + wantSet: true, + wantValue: 0.7, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + raw, gotSet := mapThrottle(tt.state) + if raw != tt.wantRaw || gotSet != tt.wantSet { + t.Fatalf("mapThrottle() = (%d,%v), want (%d,%v)", raw, gotSet, tt.wantRaw, tt.wantSet) + } + if got := normalizeThrottle(raw, gotSet); got != tt.wantValue { + t.Fatalf("normalizeThrottle() = %v, want %v", got, tt.wantValue) + } + }) + } +} + +func TestBuildRecord_AutoConvergeFromMigrationConfiguration(t *testing.T) { + now := time.Unix(1710000000, 0) + allowAutoConverge := true + mig := &virtv1.VirtualMachineInstanceMigration{ + Status: virtv1.VirtualMachineInstanceMigrationStatus{ + MigrationState: &virtv1.VirtualMachineInstanceMigrationState{ + MigrationConfiguration: &virtv1.MigrationConfiguration{ + AllowAutoConverge: &allowAutoConverge, + }, + }, + }, + } + + record := BuildRecord(nil, mig, now) + if !record.AutoConverge { + t.Fatal("expected AutoConverge=true from MigrationConfiguration.AllowAutoConverge") + } +} + +func TestBuildRecord_AutoConverge_False_WhenNotSet(t *testing.T) { + now := time.Unix(1710000000, 0) + + recordNoMig := BuildRecord(nil, nil, now) + if recordNoMig.AutoConverge { + t.Fatal("expected AutoConverge=false when mig is nil") + } + + migNoConfig := &virtv1.VirtualMachineInstanceMigration{ + Status: virtv1.VirtualMachineInstanceMigrationStatus{ + MigrationState: &virtv1.VirtualMachineInstanceMigrationState{}, + }, + } + recordNoConfig := BuildRecord(nil, migNoConfig, now) + if recordNoConfig.AutoConverge { + t.Fatal("expected AutoConverge=false when MigrationConfiguration is nil") + } + + allowAutoConverge := false + migFalse := &virtv1.VirtualMachineInstanceMigration{ + Status: virtv1.VirtualMachineInstanceMigrationStatus{ + MigrationState: &virtv1.VirtualMachineInstanceMigrationState{ + MigrationConfiguration: &virtv1.MigrationConfiguration{ + AllowAutoConverge: &allowAutoConverge, + }, + }, + }, + } + recordFalse := BuildRecord(nil, migFalse, now) + if recordFalse.AutoConverge { + t.Fatal("expected AutoConverge=false when AllowAutoConverge=false") + } +} diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/progress.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/progress.go new file mode 100644 index 0000000000..cd7cc0547a --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/progress.go @@ -0,0 +1,426 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + "math" + "time" + + "k8s.io/apimachinery/pkg/types" + virtv1 "kubevirt.io/api/core/v1" +) + +const ( + SyncRangeMin int32 = 10 + SyncRangeMax int32 = 90 + + bulkCeiling = 45.0 + iterativeFloor = 45.0 + iterativeCeiling = 90.0 + thresholdFactor = 0.05 + + notConvergingWindow = 10 * time.Second + + bulkTimeRate = 0.55 + iterBaseTimeRate = 0.022 + iterThrottleRate = 0.0012 + bulkMetricWeight = 0.80 + bulkTimeWeight = 0.20 + iterMetricWeight = 0.76 + iterTimeWeight = 0.24 + smoothAlphaUp = 0.18 + smoothAlphaDown = 0.34 + bulkStallSeconds = 10.0 + iterStallSeconds = 8.0 + finalStallSeconds = 6.0 +) + +type Strategy interface { + SyncProgress(record Record) int32 + IsNotConverging(record Record) bool + Forget(uid types.UID) +} + +type Record struct { + OperationUID types.UID + Now time.Time + StartedAt time.Time + PreviousProgress int32 + Phase virtv1.VirtualMachineInstanceMigrationPhase + Mode virtv1.MigrationMode + HasIteration bool + Iteration uint32 + HasThrottle bool + AutoConvergeThrottle uint32 + Throttle float64 + AutoConverge bool + DataTotalMiB float64 + DataProcessedMiB float64 + DataRemainingMiB float64 +} + +type Progress struct { + store *Store +} + +func NewProgress() *Progress { + return &Progress{store: NewStore()} +} + +func (p *Progress) Forget(uid types.UID) { + if p == nil || p.store == nil || uid == "" { + return + } + p.store.Delete(uid) +} + +func (p *Progress) SyncProgress(record Record) int32 { + state := p.getState(record) + + prev := clampSyncRange(record.PreviousProgress) + if state.Progress > prev { + prev = state.Progress + } + + elapsed := record.Now.Sub(record.StartedAt) + if elapsed < 0 { + elapsed = 0 + } + elapsedSec := elapsed.Seconds() + + iterative := isIterative(record) + if iterative && !state.Iterative { + state.Iterative = true + state.IterativeSince = record.Now + p.initIterative(record, &state, elapsedSec) + } + + if iterative { + observeRemaining(record, &state) + } + + target := bulkTarget(record, elapsedSec) + if iterative { + target = iterativeTarget(record, &state, elapsedSec) + } + + maxStep := int32(10) + if iterative { + maxStep = 5 + } + + cap := stageCap(iterative) + progress := math.Max(float64(prev), math.Min(target, cap)) + next := clampPercent(progress) + if next < prev { + next = prev + } + if next > prev+maxStep { + next = prev + maxStep + } + + if next == prev && float64(next) < cap { + lastIncrease := state.LastIncreaseAt + if lastIncrease.IsZero() { + lastIncrease = record.StartedAt + } + stallWin := stallWindow(record, &state, iterative) + if record.Now.Sub(lastIncrease).Seconds() >= stallWin { + next++ + } + } + + if float64(next) > cap { + next = int32(cap) + } + + if next > prev { + state.LastIncreaseAt = record.Now + } + + updateMetricState(record, &state) + updateMinRemaining(record, &state) + state.Progress = next + state.LastUpdatedAt = record.Now + state.LastIteration = record.Iteration + state.Iterative = iterative + + if record.OperationUID != "" { + p.store.Store(record.OperationUID, state) + } + + return next +} + +func (p *Progress) IsNotConverging(record Record) bool { + if p == nil || p.store == nil || record.OperationUID == "" { + return false + } + + state, ok := p.store.Load(record.OperationUID) + if !ok || !state.Iterative { + return false + } + + if !isAtMaxThrottle(record) { + return false + } + + if state.MinRemaining <= 0 || state.MinRemainingAt.IsZero() { + return false + } + + return record.Now.Sub(state.MinRemainingAt) >= notConvergingWindow +} + +func (p *Progress) getState(record Record) State { + if p == nil || p.store == nil || record.OperationUID == "" { + return State{Progress: clampSyncRange(record.PreviousProgress), LastMetricAt: record.Now} + } + state, ok := p.store.Load(record.OperationUID) + if !ok { + state = State{ + Progress: clampSyncRange(record.PreviousProgress), + LastMetricAt: record.Now, + } + } + return state +} + +func (p *Progress) initIterative(record Record, state *State, _ float64) { + total := record.DataTotalMiB + if total <= 0 { + total = 1 + } + if total > state.InitialTotal { + state.InitialTotal = total + } + if state.InitialTotal <= 0 { + state.InitialTotal = total + } + + remaining := maxRemaining(record) + if remaining <= 0 { + remaining = state.InitialTotal + } + + state.Threshold = math.Max(math.Ceil(state.InitialTotal*thresholdFactor), 1) + state.InitialRemaining = math.Max(remaining, state.Threshold) + state.SmoothedRemaining = state.InitialRemaining +} + +func observeRemaining(record Record, state *State) { + remaining := maxRemaining(record) + if remaining <= 0 { + return + } + + alpha := smoothAlphaUp + if remaining < state.SmoothedRemaining { + alpha = smoothAlphaDown + } + if record.Throttle >= 0.80 { + alpha += 0.08 + } + if alpha > 0.90 { + alpha = 0.90 + } + + if state.SmoothedRemaining <= 0 { + state.SmoothedRemaining = remaining + } else { + state.SmoothedRemaining = alpha*remaining + (1-alpha)*state.SmoothedRemaining + } +} + +func bulkTarget(record Record, elapsedSec float64) float64 { + total := record.DataTotalMiB + if total <= 0 { + total = 1 + } + + processed := math.Max(record.DataProcessedMiB, 0) + metricRatio := clampFloat(processed/total, 0, 1) + metricPct := float64(SyncRangeMin) + (bulkCeiling-float64(SyncRangeMin))*metricRatio + + timePct := float64(SyncRangeMin) + elapsedSec*bulkTimeRate + if timePct > bulkCeiling { + timePct = bulkCeiling + } + + return bulkMetricWeight*metricPct + bulkTimeWeight*timePct +} + +func iterativeTarget(record Record, state *State, elapsedSec float64) float64 { + metricRatio := iterativeMetricRatio(state) + metricPct := iterativeFloor + (iterativeCeiling-5-iterativeFloor)*metricRatio + + throttle := record.Throttle + iterSince := state.IterativeSince + if iterSince.IsZero() { + iterSince = record.Now + } + iterElapsed := math.Max(0, elapsedSec-record.Now.Sub(iterSince).Seconds()+record.Now.Sub(iterSince).Seconds()) + iterElapsedSec := math.Max(0, record.Now.Sub(iterSince).Seconds()) + + timeRate := iterBaseTimeRate + throttle*iterThrottleRate + timePct := iterativeFloor + iterElapsedSec*timeRate + if timePct > iterativeCeiling { + timePct = iterativeCeiling + } + _ = iterElapsed + + target := iterMetricWeight*metricPct + iterTimeWeight*timePct + return math.Min(target, iterativeCeiling) +} + +func iterativeMetricRatio(state *State) float64 { + if state.InitialRemaining <= state.Threshold { + return 1 + } + + current := math.Max(state.SmoothedRemaining, state.Threshold) + initial := math.Max(state.InitialRemaining, state.Threshold) + base := math.Log(initial / state.Threshold) + if base <= 0 { + return 1 + } + + ratio := 1 - math.Log(current/state.Threshold)/base + return clampFloat(ratio, 0, 1) +} + +func stageCap(iterative bool) float64 { + if !iterative { + return bulkCeiling + } + return iterativeCeiling +} + +func stallWindow(record Record, state *State, iterative bool) float64 { + if !iterative { + return bulkStallSeconds + } + + if state.Progress >= int32(iterativeCeiling)-2 { + return 24.0 + } + if state.Progress >= int32(iterativeCeiling)-5 { + return 14.0 + } + if state.SmoothedRemaining > 0 && state.SmoothedRemaining <= state.Threshold { + return finalStallSeconds + } + + window := iterStallSeconds - 3*record.Throttle + if window < finalStallSeconds { + return finalStallSeconds + } + return window +} + +func isIterative(record Record) bool { + return record.HasIteration && record.Iteration > 0 +} + +func maxRemaining(record Record) float64 { + if record.DataRemainingMiB > 0 { + return record.DataRemainingMiB + } + if record.DataTotalMiB > 0 && record.DataProcessedMiB >= 0 { + r := record.DataTotalMiB - record.DataProcessedMiB + if r > 0 { + return r + } + } + return 0 +} + +func updateMinRemaining(record Record, state *State) { + remaining := maxRemaining(record) + if remaining <= 0 { + return + } + if state.MinRemaining <= 0 || remaining < state.MinRemaining { + state.MinRemaining = remaining + state.MinRemainingAt = record.Now + } +} + +func isAtMaxThrottle(record Record) bool { + if !record.AutoConverge { + return true + } + return record.HasThrottle && record.Throttle >= 0.99 +} + +func updateMetricState(record Record, state *State) { + if !metricChanged(record, state) { + return + } + state.LastMetricAt = record.Now + state.LastProcessedMiB = record.DataProcessedMiB + state.LastRemainingMiB = record.DataRemainingMiB +} + +func metricChanged(record Record, state *State) bool { + if state.LastMetricAt.IsZero() { + return true + } + if record.DataProcessedMiB >= 0 && !almostEqual(record.DataProcessedMiB, state.LastProcessedMiB) { + return true + } + if record.DataRemainingMiB >= 0 && !almostEqual(record.DataRemainingMiB, state.LastRemainingMiB) { + return true + } + return false +} + +func almostEqual(a, b float64) bool { + return math.Abs(a-b) < 0.01 +} + +func clampPercent(v float64) int32 { + i := int32(v) + if i < SyncRangeMin { + return SyncRangeMin + } + if i > SyncRangeMax { + return SyncRangeMax + } + return i +} + +func clampFloat(v, minV, maxV float64) float64 { + if v < minV { + return minV + } + if v > maxV { + return maxV + } + return v +} + +func clampSyncRange(v int32) int32 { + if v < SyncRangeMin { + return SyncRangeMin + } + if v > SyncRangeMax { + return SyncRangeMax + } + return v +} diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/progress_test.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/progress_test.go new file mode 100644 index 0000000000..d1296af6cf --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/progress_test.go @@ -0,0 +1,603 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + "math" + "testing" + "time" + + "k8s.io/apimachinery/pkg/types" + virtv1 "kubevirt.io/api/core/v1" +) + +func TestProgress_MonotonicGrowth(t *testing.T) { + now := time.Now() + p := NewProgress() + uid := types.UID("vmop") + + first := p.SyncProgress(Record{ + OperationUID: uid, + Now: now, + StartedAt: now.Add(-20 * time.Second), + PreviousProgress: 10, + Phase: virtv1.MigrationRunning, + DataTotalMiB: 1024, + DataProcessedMiB: 100, + DataRemainingMiB: 900, + }) + second := p.SyncProgress(Record{ + OperationUID: uid, + Now: now.Add(40 * time.Second), + StartedAt: now.Add(-80 * time.Second), + PreviousProgress: first, + Phase: virtv1.MigrationRunning, + DataTotalMiB: 1024, + DataProcessedMiB: 200, + DataRemainingMiB: 800, + }) + + if second < first { + t.Fatalf("expected monotonic progress, first=%d second=%d", first, second) + } +} + +func TestProgress_SyncRangeCaps(t *testing.T) { + now := time.Now() + p := NewProgress() + + progress := p.SyncProgress(Record{ + OperationUID: types.UID("vmop"), + Now: now, + StartedAt: now.Add(-2 * time.Hour), + PreviousProgress: 10, + Phase: virtv1.MigrationRunning, + HasIteration: true, + Iteration: 1, + HasThrottle: true, + AutoConvergeThrottle: 100, + Throttle: 1, + DataTotalMiB: 1024, + DataProcessedMiB: 2048, + DataRemainingMiB: 0, + }) + + if progress < SyncRangeMin || progress > SyncRangeMax { + t.Fatalf("expected progress in sync range [%d,%d], got=%d", SyncRangeMin, SyncRangeMax, progress) + } +} + +func TestProgress_StallBump(t *testing.T) { + now := time.Now() + p := NewProgress() + uid := types.UID("vmop") + + first := p.SyncProgress(Record{ + OperationUID: uid, + Now: now, + StartedAt: now.Add(-50 * time.Second), + PreviousProgress: 30, + Phase: virtv1.MigrationRunning, + DataTotalMiB: 1024, + DataProcessedMiB: 300, + DataRemainingMiB: 700, + }) + + var progress int32 + for i := 1; i <= 5; i++ { + stallDuration := time.Duration(i) * time.Duration(bulkStallSeconds+2) * time.Second + progress = p.SyncProgress(Record{ + OperationUID: uid, + Now: now.Add(stallDuration), + StartedAt: now.Add(-50 * time.Second), + PreviousProgress: progress, + Phase: virtv1.MigrationRunning, + DataTotalMiB: 1024, + DataProcessedMiB: 300, + DataRemainingMiB: 700, + }) + } + + if progress <= first { + t.Fatalf("expected stall bump to increase progress beyond %d, got=%d", first, progress) + } +} + +func TestProgress_DegradedModeWithoutMetrics(t *testing.T) { + now := time.Now() + p := NewProgress() + + progress := p.SyncProgress(Record{ + OperationUID: types.UID("vmop"), + Now: now, + StartedAt: now.Add(-2 * time.Minute), + PreviousProgress: 10, + Phase: virtv1.MigrationRunning, + DataTotalMiB: unknownMetric, + DataProcessedMiB: unknownMetric, + DataRemainingMiB: unknownMetric, + }) + + if progress < SyncRangeMin || progress > SyncRangeMax { + t.Fatalf("expected degraded-mode progress in sync range [%d,%d], got=%d", SyncRangeMin, SyncRangeMax, progress) + } +} + +func TestProgress_WithMetricsInBulkPhase(t *testing.T) { + now := time.Now() + p := NewProgress() + + progress := p.SyncProgress(Record{ + OperationUID: types.UID("vmop"), + Now: now, + StartedAt: now.Add(-30 * time.Second), + PreviousProgress: 10, + Phase: virtv1.MigrationRunning, + DataTotalMiB: 1024, + DataProcessedMiB: 512, + }) + + if progress <= SyncRangeMin || progress >= SyncRangeMax { + t.Fatalf("expected bulk progress strictly inside sync range, got=%d", progress) + } +} + +func TestProgress_EntersIterativePhaseByIteration(t *testing.T) { + now := time.Now() + p := NewProgress() + uid := types.UID("vmop") + + bulk := p.SyncProgress(Record{ + OperationUID: uid, + Now: now, + StartedAt: now.Add(-30 * time.Second), + PreviousProgress: 10, + Phase: virtv1.MigrationRunning, + DataTotalMiB: 1024, + DataProcessedMiB: 512, + DataRemainingMiB: 512, + }) + iterative := p.SyncProgress(Record{ + OperationUID: uid, + Now: now.Add(40 * time.Second), + StartedAt: now.Add(-3 * time.Minute), + PreviousProgress: bulk, + Phase: virtv1.MigrationRunning, + HasIteration: true, + Iteration: 2, + HasThrottle: true, + AutoConvergeThrottle: 50, + Throttle: 0.5, + DataTotalMiB: 1024, + DataProcessedMiB: 960, + DataRemainingMiB: 64, + }) + + if iterative <= bulk { + t.Fatalf("expected iterative progress to be greater than bulk progress, bulk=%d iterative=%d", bulk, iterative) + } +} + +func TestProgress_UsesRemainingDataFallback(t *testing.T) { + now := time.Now() + p := NewProgress() + + progress := p.SyncProgress(Record{ + OperationUID: types.UID("vmop"), + Now: now, + StartedAt: now.Add(-90 * time.Second), + PreviousProgress: 10, + Phase: virtv1.MigrationRunning, + DataTotalMiB: 100, + DataProcessedMiB: unknownMetric, + DataRemainingMiB: 25, + }) + + if progress <= SyncRangeMin { + t.Fatalf("expected fallback metric progress above SyncRangeMin, got=%d", progress) + } +} + +func TestProgress_ZeroElapsed(t *testing.T) { + now := time.Now() + p := NewProgress() + + progress := p.SyncProgress(Record{ + OperationUID: types.UID("vmop"), + Now: now, + StartedAt: now, + PreviousProgress: SyncRangeMin, + Phase: virtv1.MigrationPending, + }) + + if progress != SyncRangeMin { + t.Fatalf("expected zero elapsed progress=%d, got=%d", SyncRangeMin, progress) + } +} + +func TestProgress_VeryLargeElapsedStaysInRange(t *testing.T) { + now := time.Now() + p := NewProgress() + + progress := p.SyncProgress(Record{ + OperationUID: types.UID("vmop"), + Now: now, + StartedAt: now.Add(-24 * time.Hour), + PreviousProgress: 10, + Phase: virtv1.MigrationRunning, + HasIteration: true, + Iteration: 5, + DataTotalMiB: 1024, + DataRemainingMiB: 10, + }) + + if progress < SyncRangeMin || progress > SyncRangeMax { + t.Fatalf("expected progress in range [%d,%d], got=%d", SyncRangeMin, SyncRangeMax, progress) + } +} + +func TestIsIterative(t *testing.T) { + tests := []struct { + name string + record Record + expected bool + }{ + { + name: "iteration implies iterative", + record: Record{HasIteration: true, Iteration: 1}, + expected: true, + }, + { + name: "post copy without iteration is not iterative", + record: Record{Mode: virtv1.MigrationPostCopy}, + expected: false, + }, + { + name: "paused without iteration is not iterative", + record: Record{Mode: virtv1.MigrationPaused}, + expected: false, + }, + { + name: "pre-copy without iteration is not iterative", + record: Record{Mode: virtv1.MigrationPreCopy}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isIterative(tt.record); got != tt.expected { + t.Fatalf("isIterative() = %v, want %v", got, tt.expected) + } + }) + } +} + +func TestForget_RemovesState(t *testing.T) { + p := NewProgress() + uid := types.UID("vmop") + p.store.Store(uid, State{Progress: 55}) + + p.Forget(uid) + + if p.store.Len() != 0 { + t.Fatalf("expected empty store after forget, got=%d", p.store.Len()) + } +} + +func TestProgress_SmoothGrowthOverMultipleSyncs(t *testing.T) { + now := time.Now() + p := NewProgress() + uid := types.UID("vmop") + start := now.Add(-10 * time.Second) + + var values []int32 + prev := SyncRangeMin + totalMiB := 1024.0 + remaining := 900.0 + + for i := 0; i < 40; i++ { + tick := now.Add(time.Duration(i*3) * time.Second) + remaining = math.Max(10, remaining-25) + processed := totalMiB - remaining + + iter := uint32(0) + hasIter := false + if i >= 5 { + iter = uint32(i - 4) + hasIter = true + } + + progress := p.SyncProgress(Record{ + OperationUID: uid, + Now: tick, + StartedAt: start, + PreviousProgress: prev, + Phase: virtv1.MigrationRunning, + HasIteration: hasIter, + Iteration: iter, + DataTotalMiB: totalMiB, + DataProcessedMiB: processed, + DataRemainingMiB: remaining, + }) + + values = append(values, progress) + prev = progress + } + + for i := 1; i < len(values); i++ { + if values[i] < values[i-1] { + t.Fatalf("progress decreased at step %d: %d -> %d", i, values[i-1], values[i]) + } + } + + maxJump := int32(0) + for i := 1; i < len(values); i++ { + jump := values[i] - values[i-1] + if jump > maxJump { + maxJump = jump + } + } + if maxJump > 15 { + t.Fatalf("max single-step jump too large: %d (values: %v)", maxJump, values) + } +} + +func TestIterativeMetricRatio(t *testing.T) { + tests := []struct { + name string + state State + wantLow float64 + wantHigh float64 + }{ + { + name: "initial remaining equals threshold", + state: State{InitialRemaining: 50, SmoothedRemaining: 50, Threshold: 50}, + wantLow: 0.99, + wantHigh: 1.01, + }, + { + name: "smoothed at initial", + state: State{InitialRemaining: 1000, SmoothedRemaining: 1000, Threshold: 50}, + wantLow: -0.01, + wantHigh: 0.01, + }, + { + name: "smoothed at threshold", + state: State{InitialRemaining: 1000, SmoothedRemaining: 50, Threshold: 50}, + wantLow: 0.99, + wantHigh: 1.01, + }, + { + name: "smoothed halfway log scale", + state: State{InitialRemaining: 1000, SmoothedRemaining: 200, Threshold: 50}, + wantLow: 0.3, + wantHigh: 0.7, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ratio := iterativeMetricRatio(&tt.state) + if ratio < tt.wantLow || ratio > tt.wantHigh { + t.Fatalf("iterativeMetricRatio() = %v, want in [%v, %v]", ratio, tt.wantLow, tt.wantHigh) + } + }) + } +} + +func TestMaxRemaining(t *testing.T) { + tests := []struct { + name string + record Record + want float64 + }{ + { + name: "direct remaining", + record: Record{DataRemainingMiB: 100}, + want: 100, + }, + { + name: "computed from total minus processed", + record: Record{DataTotalMiB: 200, DataProcessedMiB: 150}, + want: 50, + }, + { + name: "no data", + record: Record{DataTotalMiB: unknownMetric, DataProcessedMiB: unknownMetric, DataRemainingMiB: unknownMetric}, + want: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := maxRemaining(tt.record) + if !almostEqual(got, tt.want) { + t.Fatalf("maxRemaining() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestObserveRemaining_EMA(t *testing.T) { + state := State{SmoothedRemaining: 100} + + observeRemaining(Record{DataRemainingMiB: 80}, &state) + if state.SmoothedRemaining >= 100 || state.SmoothedRemaining <= 80 { + t.Fatalf("expected EMA to move smoothed remaining between 80 and 100, got=%v", state.SmoothedRemaining) + } + + prev := state.SmoothedRemaining + observeRemaining(Record{DataRemainingMiB: 120}, &state) + if state.SmoothedRemaining <= prev { + t.Fatalf("expected EMA to increase smoothed remaining from %v, got=%v", prev, state.SmoothedRemaining) + } +} + +func TestProgress_AdaptiveStallWindow(t *testing.T) { + state := State{Progress: 50, SmoothedRemaining: 100, Threshold: 50} + record := Record{Throttle: 0} + + w := stallWindow(record, &state, true) + if w != iterStallSeconds { + t.Fatalf("expected base iterative stall window=%v, got=%v", iterStallSeconds, w) + } + + state.Progress = int32(iterativeCeiling) - 2 + w = stallWindow(record, &state, true) + if w != 24.0 { + t.Fatalf("expected late-stage stall window=24, got=%v", w) + } + + state.Progress = int32(iterativeCeiling) - 4 + w = stallWindow(record, &state, true) + if w != 14.0 { + t.Fatalf("expected near-end stall window=14, got=%v", w) + } +} + +func makeIterativeState(p *Progress, uid types.UID, now time.Time, minRemaining float64, minRemainingAt time.Time) { + state := State{ + Progress: SyncRangeMin, + Iterative: true, + IterativeSince: now.Add(-30 * time.Second), + InitialRemaining: 500, + SmoothedRemaining: minRemaining + 10, + Threshold: 10, + MinRemaining: minRemaining, + MinRemainingAt: minRemainingAt, + } + p.store.Store(uid, state) +} + +func TestIsNotConverging_NoAutoConverge_Stall(t *testing.T) { + p := NewProgress() + uid := types.UID("vmop-nc") + now := time.Now() + stallStart := now.Add(-15 * time.Second) + + makeIterativeState(p, uid, now, 100.0, stallStart) + + record := Record{ + OperationUID: uid, + Now: now, + HasIteration: true, + Iteration: 3, + AutoConverge: false, + DataRemainingMiB: 100, + } + + if !p.IsNotConverging(record) { + t.Fatal("expected IsNotConverging=true when AutoConverge=false, iterative, stall>10s") + } +} + +func TestIsNotConverging_AutoConverge_ThrottleNotMax(t *testing.T) { + p := NewProgress() + uid := types.UID("vmop-nc2") + now := time.Now() + stallStart := now.Add(-15 * time.Second) + + makeIterativeState(p, uid, now, 100.0, stallStart) + + record := Record{ + OperationUID: uid, + Now: now, + HasIteration: true, + Iteration: 3, + AutoConverge: true, + HasThrottle: true, + Throttle: 0.5, + DataRemainingMiB: 100, + } + + if p.IsNotConverging(record) { + t.Fatal("expected IsNotConverging=false when AutoConverge=true and throttle not at max") + } +} + +func TestIsNotConverging_AutoConverge_MaxThrottle_Stall(t *testing.T) { + p := NewProgress() + uid := types.UID("vmop-nc3") + now := time.Now() + stallStart := now.Add(-15 * time.Second) + + makeIterativeState(p, uid, now, 100.0, stallStart) + + record := Record{ + OperationUID: uid, + Now: now, + HasIteration: true, + Iteration: 3, + AutoConverge: true, + HasThrottle: true, + Throttle: 0.99, + DataRemainingMiB: 100, + } + + if !p.IsNotConverging(record) { + t.Fatal("expected IsNotConverging=true when AutoConverge=true, throttle=max, stall>10s") + } +} + +func TestIsNotConverging_RemainingDecreased(t *testing.T) { + p := NewProgress() + uid := types.UID("vmop-nc4") + now := time.Now() + + makeIterativeState(p, uid, now, 50.0, now) + + record := Record{ + OperationUID: uid, + Now: now, + HasIteration: true, + Iteration: 3, + AutoConverge: false, + DataRemainingMiB: 50, + } + + if p.IsNotConverging(record) { + t.Fatal("expected IsNotConverging=false when minRemainingAt is just now (stall < 10s)") + } +} + +func TestIsNotConverging_NotIterative(t *testing.T) { + p := NewProgress() + uid := types.UID("vmop-nc5") + now := time.Now() + stallStart := now.Add(-15 * time.Second) + + state := State{ + Progress: SyncRangeMin, + Iterative: false, + MinRemaining: 100, + MinRemainingAt: stallStart, + } + p.store.Store(uid, state) + + record := Record{ + OperationUID: uid, + Now: now, + HasIteration: false, + AutoConverge: false, + DataRemainingMiB: 100, + } + + if p.IsNotConverging(record) { + t.Fatal("expected IsNotConverging=false when not iterative") + } +} diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/store.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/store.go new file mode 100644 index 0000000000..e595a71534 --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/store.go @@ -0,0 +1,81 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + "time" + + "k8s.io/apimachinery/pkg/types" + utilcache "k8s.io/apimachinery/pkg/util/cache" +) + +const ( + storeMaxSize = 1024 + storeTTL = 30 * time.Minute +) + +type State struct { + Progress int32 + Iterative bool + IterativeSince time.Time + LastUpdatedAt time.Time + LastMetricAt time.Time + LastIteration uint32 + LastProcessedMiB float64 + LastRemainingMiB float64 + InitialTotal float64 + InitialRemaining float64 + SmoothedRemaining float64 + Threshold float64 + LastIncreaseAt time.Time + MinRemaining float64 + MinRemainingAt time.Time +} + +type Store struct { + cache *utilcache.LRUExpireCache +} + +func NewStore() *Store { + return &Store{cache: utilcache.NewLRUExpireCache(storeMaxSize)} +} + +func (s *Store) Load(uid types.UID) (State, bool) { + v, ok := s.cache.Get(uid) + if !ok { + return State{}, false + } + return v.(State), true +} + +func (s *Store) Store(uid types.UID, state State) { + if uid == "" { + return + } + s.cache.Add(uid, state, storeTTL) +} + +func (s *Store) Delete(uid types.UID) { + if uid == "" { + return + } + s.cache.Remove(uid) +} + +func (s *Store) Len() int { + return len(s.cache.Keys()) +} diff --git a/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/store_test.go b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/store_test.go new file mode 100644 index 0000000000..c389cc9b94 --- /dev/null +++ b/images/virtualization-artifact/pkg/controller/vmop/migration/internal/progress/store_test.go @@ -0,0 +1,60 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + "testing" + "time" + + "k8s.io/apimachinery/pkg/types" +) + +func TestStore_LoadStoreDelete(t *testing.T) { + store := NewStore() + uid := types.UID("vmop") + state := State{ + Progress: 42, + Iterative: true, + IterativeSince: time.Unix(100, 0), + } + + store.Store(uid, state) + + loaded, ok := store.Load(uid) + if !ok { + t.Fatal("expected state to be present") + } + if loaded.Progress != 42 || !loaded.Iterative { + t.Fatalf("unexpected loaded state: %+v", loaded) + } + + store.Delete(uid) + + if _, ok := store.Load(uid); ok { + t.Fatal("expected state to be removed") + } +} + +func TestStore_IgnoresEmptyUID(t *testing.T) { + store := NewStore() + store.Store("", State{Progress: 10}) + store.Delete("") + + if store.Len() != 0 { + t.Fatalf("expected empty store, got=%d", store.Len()) + } +} diff --git a/images/virtualization-artifact/pkg/monitoring/metrics/vmop/data_metric.go b/images/virtualization-artifact/pkg/monitoring/metrics/vmop/data_metric.go index ecb1d71092..526043eeb7 100644 --- a/images/virtualization-artifact/pkg/monitoring/metrics/vmop/data_metric.go +++ b/images/virtualization-artifact/pkg/monitoring/metrics/vmop/data_metric.go @@ -36,6 +36,20 @@ type dataMetric struct { FinishedAt int64 // Unix timestamp when operation finished (Completed/Failed) (0 = not set) } +var successfulTerminalReasons = map[string]struct{}{ + string(vmopcondition.ReasonOperationCompleted): {}, + string(vmopcondition.ReasonMigrationCompleted): {}, +} + +var failedTerminalReasons = map[string]struct{}{ + string(vmopcondition.ReasonOperationFailed): {}, + string(vmopcondition.ReasonFailed): {}, + string(vmopcondition.ReasonAborted): {}, + string(vmopcondition.ReasonNotConverging): {}, + string(vmopcondition.ReasonTargetUnschedulable): {}, + string(vmopcondition.ReasonTargetDiskError): {}, +} + // DO NOT mutate VirtualMachineOperation! func newDataMetric(vmop *v1alpha2.VirtualMachineOperation) *dataMetric { if vmop == nil { @@ -51,8 +65,7 @@ func newDataMetric(vmop *v1alpha2.VirtualMachineOperation) *dataMetric { var finishedAt int64 if vmop.Status.Phase == v1alpha2.VMOPPhaseCompleted || vmop.Status.Phase == v1alpha2.VMOPPhaseFailed { completedCond, _ := conditions.GetCondition(vmopcondition.TypeCompleted, vmop.Status.Conditions) - if (completedCond.Status == metav1.ConditionTrue && completedCond.Reason == string(vmopcondition.ReasonOperationCompleted)) || - (completedCond.Status == metav1.ConditionFalse && completedCond.Reason == string(vmopcondition.ReasonOperationFailed)) { + if isTerminalCompletedCondition(completedCond) { finishedAt = completedCond.LastTransitionTime.Unix() } } @@ -69,3 +82,15 @@ func newDataMetric(vmop *v1alpha2.VirtualMachineOperation) *dataMetric { FinishedAt: finishedAt, } } + +func isTerminalCompletedCondition(cond metav1.Condition) bool { + if cond.Status == metav1.ConditionTrue { + _, ok := successfulTerminalReasons[cond.Reason] + return ok + } + if cond.Status == metav1.ConditionFalse { + _, ok := failedTerminalReasons[cond.Reason] + return ok + } + return false +} diff --git a/images/virtualization-artifact/pkg/monitoring/metrics/vmop/data_metric_test.go b/images/virtualization-artifact/pkg/monitoring/metrics/vmop/data_metric_test.go new file mode 100644 index 0000000000..fd44252a5b --- /dev/null +++ b/images/virtualization-artifact/pkg/monitoring/metrics/vmop/data_metric_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vmop + +import ( + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/deckhouse/virtualization/api/core/v1alpha2" + "github.com/deckhouse/virtualization/api/core/v1alpha2/vmopcondition" +) + +func TestIsTerminalCompletedCondition(t *testing.T) { + tests := []struct { + name string + cond metav1.Condition + want bool + }{ + { + name: "migration completed reason is terminal", + cond: metav1.Condition{Status: metav1.ConditionTrue, Reason: vmopcondition.ReasonMigrationCompleted.String()}, + want: true, + }, + { + name: "target disk error reason is terminal", + cond: metav1.Condition{Status: metav1.ConditionFalse, Reason: vmopcondition.ReasonTargetDiskError.String()}, + want: true, + }, + { + name: "aborted reason is terminal", + cond: metav1.Condition{Status: metav1.ConditionFalse, Reason: vmopcondition.ReasonAborted.String()}, + want: true, + }, + { + name: "not converging reason is terminal", + cond: metav1.Condition{Status: metav1.ConditionFalse, Reason: vmopcondition.ReasonNotConverging.String()}, + want: true, + }, + { + name: "in progress reason is not terminal", + cond: metav1.Condition{Status: metav1.ConditionFalse, Reason: vmopcondition.ReasonSyncing.String()}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isTerminalCompletedCondition(tt.cond); got != tt.want { + t.Fatalf("isTerminalCompletedCondition() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNewDataMetric_SetsFinishedAtForTerminalMigrationReasons(t *testing.T) { + finishedAt := metav1.NewTime(time.Unix(1710000000, 0)) + vmop := &v1alpha2.VirtualMachineOperation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "vmop-test", + Namespace: "default", + CreationTimestamp: metav1.NewTime(time.Unix(1700000000, 0)), + }, + Spec: v1alpha2.VirtualMachineOperationSpec{ + Type: v1alpha2.VMOPTypeMigrate, + VirtualMachine: "test-vm", + }, + Status: v1alpha2.VirtualMachineOperationStatus{ + Phase: v1alpha2.VMOPPhaseFailed, + Conditions: []metav1.Condition{ + { + Type: vmopcondition.TypeCompleted.String(), + Status: metav1.ConditionFalse, + Reason: vmopcondition.ReasonTargetUnschedulable.String(), + LastTransitionTime: finishedAt, + }, + }, + }, + } + + metric := newDataMetric(vmop) + if metric == nil { + t.Fatal("expected metric to be created") + } + if metric.FinishedAt != finishedAt.Unix() { + t.Fatalf("expected FinishedAt=%d, got %d", finishedAt.Unix(), metric.FinishedAt) + } +}