Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions images/dvcr-artifact/pkg/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ func (app *uploadServerApp) healthzHandler(w http.ResponseWriter, _ *http.Reques
}

func (app *uploadServerApp) validateShouldHandleRequest(w http.ResponseWriter, r *http.Request) bool {
// This method is used to signal that ingress is configured and the server can upload user data.
if r.Method == http.MethodGet {
w.WriteHeader(http.StatusOK)
return false
}

if r.Method != http.MethodPost && r.Method != http.MethodPut {
w.WriteHeader(http.StatusNotFound)
return false
Expand Down
11 changes: 11 additions & 0 deletions images/virtualization-artifact/pkg/common/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ func IsPodStarted(pod *corev1.Pod) bool {
return true
}

// IsPodReady returns true if the pod's `Ready` condition status is true; otherwise, it returns false.
func IsPodReady(pod *corev1.Pod) bool {
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
return true
}
}

return false
}

// IsPodComplete returns true if a Pod is in 'Succeeded' phase, false if not.
func IsPodComplete(pod *corev1.Pod) bool {
return pod != nil && pod.Status.Phase == corev1.PodSucceeded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Stat interface {
GetDVCRImageName(pod *corev1.Pod) string
GetDownloadSpeed(ownerUID types.UID, pod *corev1.Pod) *v1alpha2.StatusSpeed
GetProgress(ownerUID types.UID, pod *corev1.Pod, prevProgress string, opts ...service.GetProgressOption) string
IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool
IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error)
IsUploadStarted(ownerUID types.UID, pod *corev1.Pod) bool
CheckPod(pod *corev1.Pod) error
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func (ds UploadDataSource) Sync(ctx context.Context, cvi *v1alpha2.ClusterVirtua
return reconcile.Result{}, err
}

isUploaderReady, err := ds.statService.IsUploaderReady(pod, svc, ing)
if err != nil {
return reconcile.Result{}, err
}

switch {
case isDiskProvisioningFinished(condition):
log.Info("Cluster virtual image provisioning finished: clean up")
Expand Down Expand Up @@ -221,7 +226,7 @@ func (ds UploadDataSource) Sync(ctx context.Context, cvi *v1alpha2.ClusterVirtua
}

log.Info("Provisioning...", "progress", cvi.Status.Progress, "pod.phase", pod.Status.Phase)
case ds.statService.IsUploaderReady(pod, svc, ing):
case isUploaderReady:
cb.
Status(metav1.ConditionFalse).
Reason(cvicondition.WaitForUserUpload).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,29 @@ func (s StatService) IsImportStarted(ownerUID types.UID, pod *corev1.Pod) bool {
return progress.ProgressRaw() > 0
}

func (s StatService) IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool {
func (s StatService) IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error) {
if pod == nil || svc == nil || ing == nil {
return false
return false, nil
}

if !podutil.IsPodReady(pod) {
return false, nil
}

ingressIsOK := ing.Annotations[annotations.AnnUploadPath] != "" || ing.Annotations[annotations.AnnUploadURLDeprecated] != ""
uploadURL, ok := ing.Annotations[annotations.AnnUploadURL]
if ok {
response, err := http.Get(uploadURL)
if err != nil {
return false, fmt.Errorf("failed to get upload server status: %w", err)
}
defer response.Body.Close()

if response.StatusCode == http.StatusOK {
return true, nil
}
}

return podutil.IsPodRunning(pod) && podutil.IsPodStarted(pod) && ingressIsOK
return false, nil
}

func (s StatService) IsUploadStarted(ownerUID types.UID, pod *corev1.Pod) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -36,6 +37,12 @@ const (
destinationAuthVol = "dvcr-secret-vol"
)

// These constants can't be imported from "images/dvcr-artifact/pkg/uploader/uploader.go" due to conflicts with the CDI version.
const (
healthzPort = 8080
healthzPath = "/healthz"
)

type Pod struct {
PodSettings *PodSettings
Settings *Settings
Expand Down Expand Up @@ -151,6 +158,15 @@ func (p *Pod) makeUploaderContainerSpec() *corev1.Container {
SecurityContext: &corev1.SecurityContext{
ReadOnlyRootFilesystem: ptr.To(true),
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: healthzPath,
Port: intstr.FromInt(healthzPort),
},
},
InitialDelaySeconds: 5,
},
}

if p.PodSettings.ResourceRequirements != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func (ds UploadDataSource) Sync(ctx context.Context, vd *v1alpha2.VirtualDisk) (
vdsupplements.SetPVCName(vd, dv.Status.ClaimName)
}

isUploaderReady, err := ds.statService.IsUploaderReady(pod, svc, ing)
if err != nil {
return reconcile.Result{}, err
}

switch {
case IsDiskProvisioningFinished(condition):
log.Debug("Disk provisioning finished: clean up")
Expand Down Expand Up @@ -190,7 +195,7 @@ func (ds UploadDataSource) Sync(ctx context.Context, vd *v1alpha2.VirtualDisk) (
}

if !ds.statService.IsUploadStarted(vd.GetUID(), pod) {
if ds.statService.IsUploaderReady(pod, svc, ing) {
if isUploaderReady {
log.Info("Waiting for the user upload", "pod.phase", pod.Status.Phase)

vd.Status.Phase = v1alpha2.DiskWaitForUserUpload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Stat interface {
step.WaitForPodStepStat
step.ReadyContainerRegistryStepStat
IsUploadStarted(ownerUID types.UID, pod *corev1.Pod) bool
IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) bool
IsUploaderReady(pod *corev1.Pod, svc *corev1.Service, ing *netv1.Ingress) (bool, error)
GetDownloadSpeed(ownerUID types.UID, pod *corev1.Pod) *v1alpha2.StatusSpeed
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func (ds UploadDataSource) StoreToPVC(ctx context.Context, vi *v1alpha2.VirtualI
return reconcile.Result{}, err
}

isUploaderReady, err := ds.statService.IsUploaderReady(pod, svc, ing)
if err != nil {
return reconcile.Result{}, err
}

var dvQuotaNotExceededCondition *cdiv1.DataVolumeCondition
var dvRunningCondition *cdiv1.DataVolumeCondition
if dv != nil {
Expand Down Expand Up @@ -172,7 +177,7 @@ func (ds UploadDataSource) StoreToPVC(ctx context.Context, vi *v1alpha2.VirtualI
}

if !ds.statService.IsUploadStarted(vi.GetUID(), pod) {
if ds.statService.IsUploaderReady(pod, svc, ing) {
if isUploaderReady {
log.Info("Waiting for the user upload", "pod.phase", pod.Status.Phase)

vi.Status.Phase = v1alpha2.ImageWaitForUserUpload
Expand Down Expand Up @@ -357,6 +362,11 @@ func (ds UploadDataSource) StoreToDVCR(ctx context.Context, vi *v1alpha2.Virtual
return reconcile.Result{}, err
}

isUploaderReady, err := ds.statService.IsUploaderReady(pod, svc, ing)
if err != nil {
return reconcile.Result{}, err
}

switch {
case IsImageProvisioningFinished(condition):
log.Info("Virtual image provisioning finished: clean up")
Expand Down Expand Up @@ -454,7 +464,7 @@ func (ds UploadDataSource) StoreToDVCR(ctx context.Context, vi *v1alpha2.Virtual
}

log.Info("Provisioning...", "pod.phase", pod.Status.Phase)
case ds.statService.IsUploaderReady(pod, svc, ing):
case isUploaderReady:
cb.
Status(metav1.ConditionFalse).
Reason(vicondition.WaitForUserUpload).
Expand Down
28 changes: 4 additions & 24 deletions test/e2e/blockdevice/data_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,13 @@ func exportData(f *framework.Framework, resourceType, name, outputFile string) {
OutputFile: outputFile,
Publish: needPublishOption(f),
Timeout: framework.LongTimeout,
Cleanup: true,
}
if IsNFS() {
opts.SourcePath = diskImageExportFile
}
result := f.D8Virtualization().DataExportDownload(resourceType, name, opts)
Expect(result.WasSuccess()).To(BeTrue(), "d8 data export download failed: %s", result.StdErr())
err := f.D8Virtualization().DataExportDownload(resourceType, name, opts)
Expect(err).NotTo(HaveOccurred())

DeferCleanup(func() {
err := os.Remove(outputFile)
Expand All @@ -299,20 +300,6 @@ func createUploadDisk(f *framework.Framework, name string) *v1alpha2.VirtualDisk
return vd
}

func retry(maxRetries int, fn func() error) error {
var lastErr error
for attempt := 1; attempt <= maxRetries; attempt++ {
if err := fn(); err != nil {
lastErr = err
GinkgoWriter.Printf("Attempt %d/%d failed: %v\n", attempt, maxRetries, err)
time.Sleep(time.Duration(attempt) * time.Second)
continue
}
return nil
}
return fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr)
}

func uploadFile(f *framework.Framework, vd *v1alpha2.VirtualDisk, filePath string) {
err := f.Clients.GenericClient().Get(context.Background(), crclient.ObjectKeyFromObject(vd), vd)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -328,14 +315,7 @@ func uploadFile(f *framework.Framework, vd *v1alpha2.VirtualDisk, filePath strin
}
uploadURL := vd.Status.ImageUploadURLs.External

// During the upload of a VirtualDisk of type 'Upload', there is a bug:
// when the VirtualDisk is in the 'DiskWaitForUserUpload' phase,
// nginx may not be ready yet and can return 413 or 503 errors.
// Once this bug is fixed, the retry mechanism must be removed.
const maxRetries = 5
err = retry(maxRetries, func() error {
return doUploadAttempt(httpClient, uploadURL, filePath)
})
err = doUploadAttempt(httpClient, uploadURL, filePath)
Expect(err).NotTo(HaveOccurred(), "Upload failed")
}

Expand Down
24 changes: 17 additions & 7 deletions test/e2e/internal/d8/d8.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type D8Virtualization interface {
StopVM(vmName string, opts SSHOptions) *executor.CMDResult
StartVM(vmName string, opts SSHOptions) *executor.CMDResult
RestartVM(vmName string, opts SSHOptions) *executor.CMDResult
DataExportDownload(resourceType, name string, opts DataExportOptions) *executor.CMDResult
DataExportDownload(resourceType, name string, opts DataExportOptions) error
}

type DataExportOptions struct {
Expand All @@ -71,6 +71,7 @@ type DataExportOptions struct {
SourcePath string
Timeout time.Duration
Publish bool
Cleanup bool
}

func NewD8Virtualization(conf D8VirtualizationConf) (*D8VirtualizationCMD, error) {
Expand Down Expand Up @@ -168,7 +169,7 @@ func (v D8VirtualizationCMD) RestartVM(vmName string, opts SSHOptions) *executor
return v.ExecContext(ctx, cmd)
}

func (v D8VirtualizationCMD) DataExportDownload(resourceType, name string, opts DataExportOptions) *executor.CMDResult {
func (v D8VirtualizationCMD) DataExportDownload(resourceType, name string, opts DataExportOptions) error {
timeout := LongTimeout
if opts.Timeout != 0 {
timeout = opts.Timeout
Expand All @@ -189,15 +190,24 @@ func (v D8VirtualizationCMD) DataExportDownload(resourceType, name string, opts
cmd = fmt.Sprintf("%s --publish", cmd)
}

// d8 data export download always returns exit code 0 even on errors,
// so we pipe output through grep to check for success message.
// grep -q returns non-zero exit code if pattern not found.
cmd = fmt.Sprintf(`echo y | %s 2>&1 | grep -q "All files have been downloaded"`, cmd)
if opts.Cleanup {
cmd = fmt.Sprintf("%s --cleanup", cmd)
}

cmd = fmt.Sprintf("%s 2>&1", cmd)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

return v.ExecContext(ctx, cmd)
res := v.ExecContext(ctx, cmd)
if res.Error() != nil {
return fmt.Errorf("failed to execute command `%s`: %w:\n%s", cmd, res.Error(), res.StdOut())
}

if strings.Contains(res.StdOut(), "All files have been downloaded") {
return nil
}

return fmt.Errorf("failed to export data:\n%s\n%s\n%s", cmd, res.StdOut(), res.StdErr())
}

func (v D8VirtualizationCMD) addNamespace(cmd, ns string) string {
Expand Down
Loading