From 844c66eccce40e9ba74160920df280df94473c44 Mon Sep 17 00:00:00 2001 From: Roman Sysoev Date: Wed, 1 Apr 2026 17:43:19 +0300 Subject: [PATCH] fix(vd): uploader Signed-off-by: Roman Sysoev --- images/dvcr-artifact/pkg/uploader/uploader.go | 6 ++++ .../pkg/common/pod/pod.go | 11 ++++++++ .../cvi/internal/source/interfaces.go | 2 +- .../controller/cvi/internal/source/mock.go | 6 ++-- .../controller/cvi/internal/source/upload.go | 7 ++++- .../pkg/controller/service/stat_service.go | 23 ++++++++++++--- .../pkg/controller/uploader/uploader_pod.go | 16 +++++++++++ .../controller/vd/internal/source/upload.go | 7 ++++- .../vi/internal/source/interfaces.go | 2 +- .../pkg/controller/vi/internal/source/mock.go | 6 ++-- .../controller/vi/internal/source/upload.go | 14 ++++++++-- test/e2e/blockdevice/data_exports.go | 28 +++---------------- test/e2e/internal/d8/d8.go | 24 +++++++++++----- 13 files changed, 105 insertions(+), 47 deletions(-) diff --git a/images/dvcr-artifact/pkg/uploader/uploader.go b/images/dvcr-artifact/pkg/uploader/uploader.go index e3fb281761..7d26dcb651 100644 --- a/images/dvcr-artifact/pkg/uploader/uploader.go +++ b/images/dvcr-artifact/pkg/uploader/uploader.go @@ -280,6 +280,12 @@ func (app *uploadServerApp) healthzHandler(w http.ResponseWriter, _ *http.Reques } func (app *uploadServerApp) validateShouldHandleRequest(w http.ResponseWriter, r *http.Request) bool { + // This method is used to signal that ingress is configured and the server can upload user data. + if r.Method == http.MethodGet { + w.WriteHeader(http.StatusOK) + return false + } + if r.Method != http.MethodPost && r.Method != http.MethodPut { w.WriteHeader(http.StatusNotFound) return false diff --git a/images/virtualization-artifact/pkg/common/pod/pod.go b/images/virtualization-artifact/pkg/common/pod/pod.go index f7836d4c57..a80b2416c6 100644 --- a/images/virtualization-artifact/pkg/common/pod/pod.go +++ b/images/virtualization-artifact/pkg/common/pod/pod.go @@ -103,6 +103,17 @@ func IsPodStarted(pod *corev1.Pod) bool { return true } +// IsPodReady returns true if the pod's `Ready` condition status is true; otherwise, it returns false. +func IsPodReady(pod *corev1.Pod) bool { + for _, c := range pod.Status.Conditions { + if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { + return true + } + } + + return false +} + // IsPodComplete returns true if a Pod is in 'Succeeded' phase, false if not. func IsPodComplete(pod *corev1.Pod) bool { return pod != nil && pod.Status.Phase == corev1.PodSucceeded diff --git a/images/virtualization-artifact/pkg/controller/cvi/internal/source/interfaces.go b/images/virtualization-artifact/pkg/controller/cvi/internal/source/interfaces.go index 8a7592cb04..3814811736 100644 --- a/images/virtualization-artifact/pkg/controller/cvi/internal/source/interfaces.go +++ b/images/virtualization-artifact/pkg/controller/cvi/internal/source/interfaces.go @@ -66,7 +66,7 @@ type Stat interface { GetDVCRImageName(pod *corev1.Pod) string GetDownloadSpeed(ownerUID types.UID, pod *corev1.Pod) *v1alpha2.StatusSpeed GetProgress(ownerUID types.UID, pod *corev1.Pod, prevProgress string, opts ...service.GetProgressOption) string - IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool + IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error) IsUploadStarted(ownerUID types.UID, pod *corev1.Pod) bool CheckPod(pod *corev1.Pod) error } diff --git a/images/virtualization-artifact/pkg/controller/cvi/internal/source/mock.go b/images/virtualization-artifact/pkg/controller/cvi/internal/source/mock.go index 6cd2eb625d..cd0398d657 100644 --- a/images/virtualization-artifact/pkg/controller/cvi/internal/source/mock.go +++ b/images/virtualization-artifact/pkg/controller/cvi/internal/source/mock.go @@ -1141,7 +1141,7 @@ var _ Stat = &StatMock{} // IsUploadStartedFunc: func(ownerUID types.UID, pod *corev1.Pod) bool { // panic("mock out the IsUploadStarted method") // }, -// IsUploaderReadyFunc: func(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool { +// IsUploaderReadyFunc: func(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error) { // panic("mock out the IsUploaderReady method") // }, // } @@ -1176,7 +1176,7 @@ type StatMock struct { IsUploadStartedFunc func(ownerUID types.UID, pod *corev1.Pod) bool // IsUploaderReadyFunc mocks the IsUploaderReady method. - IsUploaderReadyFunc func(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool + IsUploaderReadyFunc func(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error) // calls tracks calls to the methods. calls struct { @@ -1528,7 +1528,7 @@ func (mock *StatMock) IsUploadStartedCalls() []struct { } // IsUploaderReady calls IsUploaderReadyFunc. -func (mock *StatMock) IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool { +func (mock *StatMock) IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error) { if mock.IsUploaderReadyFunc == nil { panic("StatMock.IsUploaderReadyFunc: method is nil but Stat.IsUploaderReady was just called") } diff --git a/images/virtualization-artifact/pkg/controller/cvi/internal/source/upload.go b/images/virtualization-artifact/pkg/controller/cvi/internal/source/upload.go index dbbb968549..2307ccaab4 100644 --- a/images/virtualization-artifact/pkg/controller/cvi/internal/source/upload.go +++ b/images/virtualization-artifact/pkg/controller/cvi/internal/source/upload.go @@ -92,6 +92,11 @@ func (ds UploadDataSource) Sync(ctx context.Context, cvi *v1alpha2.ClusterVirtua return reconcile.Result{}, err } + isUploaderReady, err := ds.statService.IsUploaderReady(pod, svc, ing) + if err != nil { + return reconcile.Result{}, err + } + switch { case isDiskProvisioningFinished(condition): log.Info("Cluster virtual image provisioning finished: clean up") @@ -221,7 +226,7 @@ func (ds UploadDataSource) Sync(ctx context.Context, cvi *v1alpha2.ClusterVirtua } log.Info("Provisioning...", "progress", cvi.Status.Progress, "pod.phase", pod.Status.Phase) - case ds.statService.IsUploaderReady(pod, svc, ing): + case isUploaderReady: cb. Status(metav1.ConditionFalse). Reason(cvicondition.WaitForUserUpload). diff --git a/images/virtualization-artifact/pkg/controller/service/stat_service.go b/images/virtualization-artifact/pkg/controller/service/stat_service.go index 7974be0ab2..fa2ae1f4a3 100644 --- a/images/virtualization-artifact/pkg/controller/service/stat_service.go +++ b/images/virtualization-artifact/pkg/controller/service/stat_service.go @@ -272,14 +272,29 @@ func (s StatService) IsImportStarted(ownerUID types.UID, pod *corev1.Pod) bool { return progress.ProgressRaw() > 0 } -func (s StatService) IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool { +func (s StatService) IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error) { if pod == nil || svc == nil || ing == nil { - return false + return false, nil + } + + if !podutil.IsPodReady(pod) { + return false, nil } - ingressIsOK := ing.Annotations[annotations.AnnUploadPath] != "" || ing.Annotations[annotations.AnnUploadURLDeprecated] != "" + uploadURL, ok := ing.Annotations[annotations.AnnUploadURL] + if ok { + response, err := http.Get(uploadURL) + if err != nil { + return false, fmt.Errorf("failed to get upload server status: %w", err) + } + defer response.Body.Close() + + if response.StatusCode == http.StatusOK { + return true, nil + } + } - return podutil.IsPodRunning(pod) && podutil.IsPodStarted(pod) && ingressIsOK + return false, nil } func (s StatService) IsUploadStarted(ownerUID types.UID, pod *corev1.Pod) bool { diff --git a/images/virtualization-artifact/pkg/controller/uploader/uploader_pod.go b/images/virtualization-artifact/pkg/controller/uploader/uploader_pod.go index 5c50ad2b21..570566ca57 100644 --- a/images/virtualization-artifact/pkg/controller/uploader/uploader_pod.go +++ b/images/virtualization-artifact/pkg/controller/uploader/uploader_pod.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -36,6 +37,12 @@ const ( destinationAuthVol = "dvcr-secret-vol" ) +// These constants can't be imported from "images/dvcr-artifact/pkg/uploader/uploader.go" due to conflicts with the CDI version. +const ( + healthzPort = 8080 + healthzPath = "/healthz" +) + type Pod struct { PodSettings *PodSettings Settings *Settings @@ -151,6 +158,15 @@ func (p *Pod) makeUploaderContainerSpec() *corev1.Container { SecurityContext: &corev1.SecurityContext{ ReadOnlyRootFilesystem: ptr.To(true), }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: healthzPath, + Port: intstr.FromInt(healthzPort), + }, + }, + InitialDelaySeconds: 5, + }, } if p.PodSettings.ResourceRequirements != nil { diff --git a/images/virtualization-artifact/pkg/controller/vd/internal/source/upload.go b/images/virtualization-artifact/pkg/controller/vd/internal/source/upload.go index 1cedbb19ad..1733e1f5c5 100644 --- a/images/virtualization-artifact/pkg/controller/vd/internal/source/upload.go +++ b/images/virtualization-artifact/pkg/controller/vd/internal/source/upload.go @@ -122,6 +122,11 @@ func (ds UploadDataSource) Sync(ctx context.Context, vd *v1alpha2.VirtualDisk) ( vdsupplements.SetPVCName(vd, dv.Status.ClaimName) } + isUploaderReady, err := ds.statService.IsUploaderReady(pod, svc, ing) + if err != nil { + return reconcile.Result{}, err + } + switch { case IsDiskProvisioningFinished(condition): log.Debug("Disk provisioning finished: clean up") @@ -190,7 +195,7 @@ func (ds UploadDataSource) Sync(ctx context.Context, vd *v1alpha2.VirtualDisk) ( } if !ds.statService.IsUploadStarted(vd.GetUID(), pod) { - if ds.statService.IsUploaderReady(pod, svc, ing) { + if isUploaderReady { log.Info("Waiting for the user upload", "pod.phase", pod.Status.Phase) vd.Status.Phase = v1alpha2.DiskWaitForUserUpload diff --git a/images/virtualization-artifact/pkg/controller/vi/internal/source/interfaces.go b/images/virtualization-artifact/pkg/controller/vi/internal/source/interfaces.go index fca75b057a..ab8d812cf7 100644 --- a/images/virtualization-artifact/pkg/controller/vi/internal/source/interfaces.go +++ b/images/virtualization-artifact/pkg/controller/vi/internal/source/interfaces.go @@ -64,7 +64,7 @@ type Stat interface { step.WaitForPodStepStat step.ReadyContainerRegistryStepStat IsUploadStarted(ownerUID types.UID, pod *corev1.Pod) bool - IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool + IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error) GetDownloadSpeed(ownerUID types.UID, pod *corev1.Pod) *v1alpha2.StatusSpeed } diff --git a/images/virtualization-artifact/pkg/controller/vi/internal/source/mock.go b/images/virtualization-artifact/pkg/controller/vi/internal/source/mock.go index ca40fa45e2..867092df85 100644 --- a/images/virtualization-artifact/pkg/controller/vi/internal/source/mock.go +++ b/images/virtualization-artifact/pkg/controller/vi/internal/source/mock.go @@ -1130,7 +1130,7 @@ var _ Stat = &StatMock{} // IsUploadStartedFunc: func(ownerUID types.UID, pod *corev1.Pod) bool { // panic("mock out the IsUploadStarted method") // }, -// IsUploaderReadyFunc: func(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool { +// IsUploaderReadyFunc: func(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error) { // panic("mock out the IsUploaderReady method") // }, // } @@ -1165,7 +1165,7 @@ type StatMock struct { IsUploadStartedFunc func(ownerUID types.UID, pod *corev1.Pod) bool // IsUploaderReadyFunc mocks the IsUploaderReady method. - IsUploaderReadyFunc func(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool + IsUploaderReadyFunc func(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error) // calls tracks calls to the methods. calls struct { @@ -1517,7 +1517,7 @@ func (mock *StatMock) IsUploadStartedCalls() []struct { } // IsUploaderReady calls IsUploaderReadyFunc. -func (mock *StatMock) IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool { +func (mock *StatMock) IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error) { if mock.IsUploaderReadyFunc == nil { panic("StatMock.IsUploaderReadyFunc: method is nil but Stat.IsUploaderReady was just called") } diff --git a/images/virtualization-artifact/pkg/controller/vi/internal/source/upload.go b/images/virtualization-artifact/pkg/controller/vi/internal/source/upload.go index 9fec96370e..55e63ad779 100644 --- a/images/virtualization-artifact/pkg/controller/vi/internal/source/upload.go +++ b/images/virtualization-artifact/pkg/controller/vi/internal/source/upload.go @@ -100,6 +100,11 @@ func (ds UploadDataSource) StoreToPVC(ctx context.Context, vi *v1alpha2.VirtualI return reconcile.Result{}, err } + isUploaderReady, err := ds.statService.IsUploaderReady(pod, svc, ing) + if err != nil { + return reconcile.Result{}, err + } + var dvQuotaNotExceededCondition *cdiv1.DataVolumeCondition var dvRunningCondition *cdiv1.DataVolumeCondition if dv != nil { @@ -172,7 +177,7 @@ func (ds UploadDataSource) StoreToPVC(ctx context.Context, vi *v1alpha2.VirtualI } if !ds.statService.IsUploadStarted(vi.GetUID(), pod) { - if ds.statService.IsUploaderReady(pod, svc, ing) { + if isUploaderReady { log.Info("Waiting for the user upload", "pod.phase", pod.Status.Phase) vi.Status.Phase = v1alpha2.ImageWaitForUserUpload @@ -357,6 +362,11 @@ func (ds UploadDataSource) StoreToDVCR(ctx context.Context, vi *v1alpha2.Virtual return reconcile.Result{}, err } + isUploaderReady, err := ds.statService.IsUploaderReady(pod, svc, ing) + if err != nil { + return reconcile.Result{}, err + } + switch { case IsImageProvisioningFinished(condition): log.Info("Virtual image provisioning finished: clean up") @@ -454,7 +464,7 @@ func (ds UploadDataSource) StoreToDVCR(ctx context.Context, vi *v1alpha2.Virtual } log.Info("Provisioning...", "pod.phase", pod.Status.Phase) - case ds.statService.IsUploaderReady(pod, svc, ing): + case isUploaderReady: cb. Status(metav1.ConditionFalse). Reason(vicondition.WaitForUserUpload). diff --git a/test/e2e/blockdevice/data_exports.go b/test/e2e/blockdevice/data_exports.go index 76c45fe0c9..6c9117cea4 100644 --- a/test/e2e/blockdevice/data_exports.go +++ b/test/e2e/blockdevice/data_exports.go @@ -270,12 +270,13 @@ func exportData(f *framework.Framework, resourceType, name, outputFile string) { OutputFile: outputFile, Publish: needPublishOption(f), Timeout: framework.LongTimeout, + Cleanup: true, } if IsNFS() { opts.SourcePath = diskImageExportFile } - result := f.D8Virtualization().DataExportDownload(resourceType, name, opts) - Expect(result.WasSuccess()).To(BeTrue(), "d8 data export download failed: %s", result.StdErr()) + err := f.D8Virtualization().DataExportDownload(resourceType, name, opts) + Expect(err).NotTo(HaveOccurred()) DeferCleanup(func() { err := os.Remove(outputFile) @@ -299,20 +300,6 @@ func createUploadDisk(f *framework.Framework, name string) *v1alpha2.VirtualDisk return vd } -func retry(maxRetries int, fn func() error) error { - var lastErr error - for attempt := 1; attempt <= maxRetries; attempt++ { - if err := fn(); err != nil { - lastErr = err - GinkgoWriter.Printf("Attempt %d/%d failed: %v\n", attempt, maxRetries, err) - time.Sleep(time.Duration(attempt) * time.Second) - continue - } - return nil - } - return fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr) -} - func uploadFile(f *framework.Framework, vd *v1alpha2.VirtualDisk, filePath string) { err := f.Clients.GenericClient().Get(context.Background(), crclient.ObjectKeyFromObject(vd), vd) Expect(err).NotTo(HaveOccurred()) @@ -328,14 +315,7 @@ func uploadFile(f *framework.Framework, vd *v1alpha2.VirtualDisk, filePath strin } uploadURL := vd.Status.ImageUploadURLs.External - // During the upload of a VirtualDisk of type 'Upload', there is a bug: - // when the VirtualDisk is in the 'DiskWaitForUserUpload' phase, - // nginx may not be ready yet and can return 413 or 503 errors. - // Once this bug is fixed, the retry mechanism must be removed. - const maxRetries = 5 - err = retry(maxRetries, func() error { - return doUploadAttempt(httpClient, uploadURL, filePath) - }) + err = doUploadAttempt(httpClient, uploadURL, filePath) Expect(err).NotTo(HaveOccurred(), "Upload failed") } diff --git a/test/e2e/internal/d8/d8.go b/test/e2e/internal/d8/d8.go index 99865a00ea..665d741e48 100644 --- a/test/e2e/internal/d8/d8.go +++ b/test/e2e/internal/d8/d8.go @@ -60,7 +60,7 @@ type D8Virtualization interface { StopVM(vmName string, opts SSHOptions) *executor.CMDResult StartVM(vmName string, opts SSHOptions) *executor.CMDResult RestartVM(vmName string, opts SSHOptions) *executor.CMDResult - DataExportDownload(resourceType, name string, opts DataExportOptions) *executor.CMDResult + DataExportDownload(resourceType, name string, opts DataExportOptions) error } type DataExportOptions struct { @@ -71,6 +71,7 @@ type DataExportOptions struct { SourcePath string Timeout time.Duration Publish bool + Cleanup bool } func NewD8Virtualization(conf D8VirtualizationConf) (*D8VirtualizationCMD, error) { @@ -168,7 +169,7 @@ func (v D8VirtualizationCMD) RestartVM(vmName string, opts SSHOptions) *executor return v.ExecContext(ctx, cmd) } -func (v D8VirtualizationCMD) DataExportDownload(resourceType, name string, opts DataExportOptions) *executor.CMDResult { +func (v D8VirtualizationCMD) DataExportDownload(resourceType, name string, opts DataExportOptions) error { timeout := LongTimeout if opts.Timeout != 0 { timeout = opts.Timeout @@ -189,15 +190,24 @@ func (v D8VirtualizationCMD) DataExportDownload(resourceType, name string, opts cmd = fmt.Sprintf("%s --publish", cmd) } - // d8 data export download always returns exit code 0 even on errors, - // so we pipe output through grep to check for success message. - // grep -q returns non-zero exit code if pattern not found. - cmd = fmt.Sprintf(`echo y | %s 2>&1 | grep -q "All files have been downloaded"`, cmd) + if opts.Cleanup { + cmd = fmt.Sprintf("%s --cleanup", cmd) + } + cmd = fmt.Sprintf("%s 2>&1", cmd) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - return v.ExecContext(ctx, cmd) + res := v.ExecContext(ctx, cmd) + if res.Error() != nil { + return fmt.Errorf("failed to execute command `%s`: %w:\n%s", cmd, res.Error(), res.StdOut()) + } + + if strings.Contains(res.StdOut(), "All files have been downloaded") { + return nil + } + + return fmt.Errorf("failed to export data:\n%s\n%s\n%s", cmd, res.StdOut(), res.StdErr()) } func (v D8VirtualizationCMD) addNamespace(cmd, ns string) string {