diff --git a/packages/envd/internal/api/api.gen.go b/packages/envd/internal/api/api.gen.go index 512747b05f..12f18816ee 100644 --- a/packages/envd/internal/api/api.gen.go +++ b/packages/envd/internal/api/api.gen.go @@ -74,6 +74,30 @@ type Metrics struct { Ts *int64 `json:"ts,omitempty"` } +// MultipartUploadComplete defines model for MultipartUploadComplete. +type MultipartUploadComplete struct { + // Path Path to the final assembled file + Path string `json:"path"` + + // Size Total size of the assembled file in bytes + Size int64 `json:"size"` +} + +// MultipartUploadInit defines model for MultipartUploadInit. +type MultipartUploadInit struct { + // UploadId Unique identifier for the upload session + UploadId string `json:"uploadId"` +} + +// MultipartUploadPart defines model for MultipartUploadPart. +type MultipartUploadPart struct { + // PartNumber The part number that was uploaded + PartNumber int `json:"partNumber"` + + // Size Size of the uploaded part in bytes + Size int64 `json:"size"` +} + // VolumeMount Volume type VolumeMount struct { NfsTarget string `json:"nfs_target"` @@ -92,6 +116,9 @@ type SignatureExpiration = int // User defines model for User. type User = string +// Conflict defines model for Conflict. +type Conflict = Error + // FileNotFound defines model for FileNotFound. type FileNotFound = Error @@ -107,6 +134,12 @@ type InvalidUser = Error // NotEnoughDiskSpace defines model for NotEnoughDiskSpace. type NotEnoughDiskSpace = Error +// TooManyRequests defines model for TooManyRequests. +type TooManyRequests = Error + +// UploadNotFound defines model for UploadNotFound. +type UploadNotFound = Error + // UploadSuccess defines model for UploadSuccess. type UploadSuccess = []EntryInfo @@ -145,6 +178,30 @@ type PostFilesParams struct { SignatureExpiration *SignatureExpiration `form:"signature_expiration,omitempty" json:"signature_expiration,omitempty"` } +// PostFilesUploadInitJSONBody defines parameters for PostFilesUploadInit. 
+type PostFilesUploadInitJSONBody struct { + // PartSize Size of each part in bytes (last part may be smaller) + PartSize int64 `json:"partSize"` + + // Path Path to the file to upload + Path string `json:"path"` + + // TotalSize Total size of the file in bytes + TotalSize int64 `json:"totalSize"` +} + +// PostFilesUploadInitParams defines parameters for PostFilesUploadInit. +type PostFilesUploadInitParams struct { + // Username User used for setting the owner, or resolving relative paths. + Username *User `form:"username,omitempty" json:"username,omitempty"` +} + +// PutFilesUploadUploadIdParams defines parameters for PutFilesUploadUploadId. +type PutFilesUploadUploadIdParams struct { + // Part The part number (0-indexed) + Part int `form:"part" json:"part"` +} + // PostInitJSONBody defines parameters for PostInit. type PostInitJSONBody struct { // AccessToken Access token for secure access to envd service @@ -170,6 +227,9 @@ type PostInitJSONBody struct { // PostFilesMultipartRequestBody defines body for PostFiles for multipart/form-data ContentType. type PostFilesMultipartRequestBody PostFilesMultipartBody +// PostFilesUploadInitJSONRequestBody defines body for PostFilesUploadInit for application/json ContentType. +type PostFilesUploadInitJSONRequestBody PostFilesUploadInitJSONBody + // PostInitJSONRequestBody defines body for PostInit for application/json ContentType. type PostInitJSONRequestBody PostInitJSONBody @@ -184,6 +244,18 @@ type ServerInterface interface { // Upload a file and ensure the parent directories exist. If the file exists, it will be overwritten. 
// (POST /files) PostFiles(w http.ResponseWriter, r *http.Request, params PostFilesParams) + // Initialize a multipart file upload session + // (POST /files/upload/init) + PostFilesUploadInit(w http.ResponseWriter, r *http.Request, params PostFilesUploadInitParams) + // Abort a multipart file upload and clean up temporary files + // (DELETE /files/upload/{uploadId}) + DeleteFilesUploadUploadId(w http.ResponseWriter, r *http.Request, uploadId string) + // Upload a part of a multipart file upload + // (PUT /files/upload/{uploadId}) + PutFilesUploadUploadId(w http.ResponseWriter, r *http.Request, uploadId string, params PutFilesUploadUploadIdParams) + // Complete a multipart file upload and assemble the final file + // (POST /files/upload/{uploadId}/complete) + PostFilesUploadUploadIdComplete(w http.ResponseWriter, r *http.Request, uploadId string) // Check the health of the service // (GET /health) GetHealth(w http.ResponseWriter, r *http.Request) @@ -217,6 +289,30 @@ func (_ Unimplemented) PostFiles(w http.ResponseWriter, r *http.Request, params w.WriteHeader(http.StatusNotImplemented) } +// Initialize a multipart file upload session +// (POST /files/upload/init) +func (_ Unimplemented) PostFilesUploadInit(w http.ResponseWriter, r *http.Request, params PostFilesUploadInitParams) { + w.WriteHeader(http.StatusNotImplemented) +} + +// Abort a multipart file upload and clean up temporary files +// (DELETE /files/upload/{uploadId}) +func (_ Unimplemented) DeleteFilesUploadUploadId(w http.ResponseWriter, r *http.Request, uploadId string) { + w.WriteHeader(http.StatusNotImplemented) +} + +// Upload a part of a multipart file upload +// (PUT /files/upload/{uploadId}) +func (_ Unimplemented) PutFilesUploadUploadId(w http.ResponseWriter, r *http.Request, uploadId string, params PutFilesUploadUploadIdParams) { + w.WriteHeader(http.StatusNotImplemented) +} + +// Complete a multipart file upload and assemble the final file +// (POST /files/upload/{uploadId}/complete) +func (_ 
Unimplemented) PostFilesUploadUploadIdComplete(w http.ResponseWriter, r *http.Request, uploadId string) { + w.WriteHeader(http.StatusNotImplemented) +} + // Check the health of the service // (GET /health) func (_ Unimplemented) GetHealth(w http.ResponseWriter, r *http.Request) { @@ -378,6 +474,150 @@ func (siw *ServerInterfaceWrapper) PostFiles(w http.ResponseWriter, r *http.Requ handler.ServeHTTP(w, r) } +// PostFilesUploadInit operation middleware +func (siw *ServerInterfaceWrapper) PostFilesUploadInit(w http.ResponseWriter, r *http.Request) { + + var err error + + ctx := r.Context() + + ctx = context.WithValue(ctx, AccessTokenAuthScopes, []string{}) + + r = r.WithContext(ctx) + + // Parameter object where we will unmarshal all parameters from the context + var params PostFilesUploadInitParams + + // ------------- Optional query parameter "username" ------------- + + err = runtime.BindQueryParameter("form", true, false, "username", r.URL.Query(), ¶ms.Username) + if err != nil { + siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "username", Err: err}) + return + } + + handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.PostFilesUploadInit(w, r, params) + })) + + for _, middleware := range siw.HandlerMiddlewares { + handler = middleware(handler) + } + + handler.ServeHTTP(w, r) +} + +// DeleteFilesUploadUploadId operation middleware +func (siw *ServerInterfaceWrapper) DeleteFilesUploadUploadId(w http.ResponseWriter, r *http.Request) { + + var err error + + // ------------- Path parameter "uploadId" ------------- + var uploadId string + + err = runtime.BindStyledParameterWithOptions("simple", "uploadId", chi.URLParam(r, "uploadId"), &uploadId, runtime.BindStyledParameterOptions{ParamLocation: runtime.ParamLocationPath, Explode: false, Required: true}) + if err != nil { + siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "uploadId", Err: err}) + return + } + + ctx := r.Context() + + ctx = 
context.WithValue(ctx, AccessTokenAuthScopes, []string{}) + + r = r.WithContext(ctx) + + handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.DeleteFilesUploadUploadId(w, r, uploadId) + })) + + for _, middleware := range siw.HandlerMiddlewares { + handler = middleware(handler) + } + + handler.ServeHTTP(w, r) +} + +// PutFilesUploadUploadId operation middleware +func (siw *ServerInterfaceWrapper) PutFilesUploadUploadId(w http.ResponseWriter, r *http.Request) { + + var err error + + // ------------- Path parameter "uploadId" ------------- + var uploadId string + + err = runtime.BindStyledParameterWithOptions("simple", "uploadId", chi.URLParam(r, "uploadId"), &uploadId, runtime.BindStyledParameterOptions{ParamLocation: runtime.ParamLocationPath, Explode: false, Required: true}) + if err != nil { + siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "uploadId", Err: err}) + return + } + + ctx := r.Context() + + ctx = context.WithValue(ctx, AccessTokenAuthScopes, []string{}) + + r = r.WithContext(ctx) + + // Parameter object where we will unmarshal all parameters from the context + var params PutFilesUploadUploadIdParams + + // ------------- Required query parameter "part" ------------- + + if paramValue := r.URL.Query().Get("part"); paramValue != "" { + + } else { + siw.ErrorHandlerFunc(w, r, &RequiredParamError{ParamName: "part"}) + return + } + + err = runtime.BindQueryParameter("form", true, true, "part", r.URL.Query(), ¶ms.Part) + if err != nil { + siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "part", Err: err}) + return + } + + handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.PutFilesUploadUploadId(w, r, uploadId, params) + })) + + for _, middleware := range siw.HandlerMiddlewares { + handler = middleware(handler) + } + + handler.ServeHTTP(w, r) +} + +// PostFilesUploadUploadIdComplete operation middleware +func (siw 
*ServerInterfaceWrapper) PostFilesUploadUploadIdComplete(w http.ResponseWriter, r *http.Request) { + + var err error + + // ------------- Path parameter "uploadId" ------------- + var uploadId string + + err = runtime.BindStyledParameterWithOptions("simple", "uploadId", chi.URLParam(r, "uploadId"), &uploadId, runtime.BindStyledParameterOptions{ParamLocation: runtime.ParamLocationPath, Explode: false, Required: true}) + if err != nil { + siw.ErrorHandlerFunc(w, r, &InvalidParamFormatError{ParamName: "uploadId", Err: err}) + return + } + + ctx := r.Context() + + ctx = context.WithValue(ctx, AccessTokenAuthScopes, []string{}) + + r = r.WithContext(ctx) + + handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.PostFilesUploadUploadIdComplete(w, r, uploadId) + })) + + for _, middleware := range siw.HandlerMiddlewares { + handler = middleware(handler) + } + + handler.ServeHTTP(w, r) +} + // GetHealth operation middleware func (siw *ServerInterfaceWrapper) GetHealth(w http.ResponseWriter, r *http.Request) { @@ -554,6 +794,18 @@ func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handl r.Group(func(r chi.Router) { r.Post(options.BaseURL+"/files", wrapper.PostFiles) }) + r.Group(func(r chi.Router) { + r.Post(options.BaseURL+"/files/upload/init", wrapper.PostFilesUploadInit) + }) + r.Group(func(r chi.Router) { + r.Delete(options.BaseURL+"/files/upload/{uploadId}", wrapper.DeleteFilesUploadUploadId) + }) + r.Group(func(r chi.Router) { + r.Put(options.BaseURL+"/files/upload/{uploadId}", wrapper.PutFilesUploadUploadId) + }) + r.Group(func(r chi.Router) { + r.Post(options.BaseURL+"/files/upload/{uploadId}/complete", wrapper.PostFilesUploadUploadIdComplete) + }) r.Group(func(r chi.Router) { r.Get(options.BaseURL+"/health", wrapper.GetHealth) }) diff --git a/packages/envd/internal/api/download_test.go b/packages/envd/internal/api/download_test.go index fabcb55a57..cda2a78e6e 100644 --- 
a/packages/envd/internal/api/download_test.go +++ b/packages/envd/internal/api/download_test.go @@ -21,6 +21,17 @@ import ( "github.com/e2b-dev/infra/packages/envd/internal/utils" ) +func newDownloadTestAPI(t *testing.T, username string) *API { + t.Helper() + logger := zerolog.Nop() + defaults := &execcontext.Defaults{ + EnvVars: utils.NewMap[string, string](), + User: username, + } + + return New(&logger, defaults, nil, false) +} + func TestGetFilesContentDisposition(t *testing.T) { t.Parallel() @@ -89,13 +100,7 @@ func TestGetFilesContentDisposition(t *testing.T) { err := os.WriteFile(tempFile, []byte("test content"), 0o644) require.NoError(t, err) - // Create test API - logger := zerolog.Nop() - defaults := &execcontext.Defaults{ - EnvVars: utils.NewMap[string, string](), - User: currentUser.Username, - } - api := New(&logger, defaults, nil, false) + api := newDownloadTestAPI(t, currentUser.Username) // Create request and response recorder req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) @@ -138,13 +143,7 @@ func TestGetFilesContentDispositionWithNestedPath(t *testing.T) { err = os.WriteFile(tempFile, []byte("test content"), 0o644) require.NoError(t, err) - // Create test API - logger := zerolog.Nop() - defaults := &execcontext.Defaults{ - EnvVars: utils.NewMap[string, string](), - User: currentUser.Username, - } - api := New(&logger, defaults, nil, false) + api := newDownloadTestAPI(t, currentUser.Username) // Create request and response recorder req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) @@ -181,13 +180,7 @@ func TestGetFiles_GzipEncoding_ExplicitIdentityOffWithRange(t *testing.T) { err = os.WriteFile(tempFile, []byte("test content"), 0o644) require.NoError(t, err) - // Create test API - logger := zerolog.Nop() - defaults := &execcontext.Defaults{ - EnvVars: utils.NewMap[string, string](), - User: currentUser.Username, - } - api := New(&logger, defaults, nil, false) + api 
:= newDownloadTestAPI(t, currentUser.Username) // Create request and response recorder req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) @@ -223,12 +216,7 @@ func TestGetFiles_GzipDownload(t *testing.T) { err = os.WriteFile(tempFile, originalContent, 0o644) require.NoError(t, err) - logger := zerolog.Nop() - defaults := &execcontext.Defaults{ - EnvVars: utils.NewMap[string, string](), - User: currentUser.Username, - } - api := New(&logger, defaults, nil, false) + api := newDownloadTestAPI(t, currentUser.Username) req := httptest.NewRequest(http.MethodGet, "/files?path="+url.QueryEscape(tempFile), nil) req.Header.Set("Accept-Encoding", "gzip") @@ -288,12 +276,7 @@ func TestPostFiles_GzipUpload(t *testing.T) { tempDir := t.TempDir() destPath := filepath.Join(tempDir, "uploaded.txt") - logger := zerolog.Nop() - defaults := &execcontext.Defaults{ - EnvVars: utils.NewMap[string, string](), - User: currentUser.Username, - } - api := New(&logger, defaults, nil, false) + api := newDownloadTestAPI(t, currentUser.Username) req := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), &gzBuf) req.Header.Set("Content-Type", mpWriter.FormDataContentType()) @@ -348,12 +331,7 @@ func TestGzipUploadThenGzipDownload(t *testing.T) { tempDir := t.TempDir() destPath := filepath.Join(tempDir, "roundtrip.txt") - logger := zerolog.Nop() - defaults := &execcontext.Defaults{ - EnvVars: utils.NewMap[string, string](), - User: currentUser.Username, - } - api := New(&logger, defaults, nil, false) + api := newDownloadTestAPI(t, currentUser.Username) uploadReq := httptest.NewRequest(http.MethodPost, "/files?path="+url.QueryEscape(destPath), &gzBuf) uploadReq.Header.Set("Content-Type", mpWriter.FormDataContentType()) diff --git a/packages/envd/internal/api/multipart_upload.go b/packages/envd/internal/api/multipart_upload.go new file mode 100644 index 0000000000..5daf6f4ee2 --- /dev/null +++ 
b/packages/envd/internal/api/multipart_upload.go @@ -0,0 +1,627 @@ +package api + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "os/user" + "path/filepath" + "syscall" + + "github.com/google/uuid" + + "github.com/e2b-dev/infra/packages/envd/internal/execcontext" + "github.com/e2b-dev/infra/packages/envd/internal/logs" + "github.com/e2b-dev/infra/packages/envd/internal/permissions" +) + +const ( + // maxUploadSessions limits concurrent upload sessions to prevent resource exhaustion + maxUploadSessions = 100 + // maxTotalSize limits the total upload size to 10GB + maxTotalSize = 10 * 1024 * 1024 * 1024 + // maxPartSize limits individual part size to 100MB to prevent DoS + maxPartSize = 100 * 1024 * 1024 + // maxNumParts caps the number of parts to prevent memory/CPU exhaustion. + // With totalSize=10GB and partSize=1, numParts would be ~10 billion without this. + maxNumParts = 10_000 +) + +// PostFilesUploadInit initializes a multipart upload session +func (a *API) PostFilesUploadInit(w http.ResponseWriter, r *http.Request, params PostFilesUploadInitParams) { + defer r.Body.Close() + + operationID := logs.AssignOperationID() + + // Parse the request body + var body PostFilesUploadInitJSONRequestBody + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("failed to decode request body") + jsonError(w, http.StatusBadRequest, fmt.Errorf("invalid request body: %w", err)) + + return + } + + if body.PartSize < 1 { + jsonError(w, http.StatusBadRequest, fmt.Errorf("partSize must be at least 1")) + + return + } + if body.TotalSize < 0 { + jsonError(w, http.StatusBadRequest, fmt.Errorf("totalSize must be non-negative")) + + return + } + if body.TotalSize > maxTotalSize { + jsonError(w, http.StatusBadRequest, fmt.Errorf("totalSize %d exceeds maximum allowed size of %d bytes", body.TotalSize, maxTotalSize)) + + return + } + if body.PartSize > maxPartSize { + 
jsonError(w, http.StatusBadRequest, fmt.Errorf("partSize exceeds maximum allowed size of %d bytes", maxPartSize)) + + return + } + + // Compute numParts as int64 and validate the cap before any file I/O. + // The cast to int is safe after the cap check (maxNumParts fits in any int). + var numParts64 int64 + if body.TotalSize > 0 { + numParts64 = (body.TotalSize + body.PartSize - 1) / body.PartSize + } + + if numParts64 > maxNumParts { + jsonError(w, http.StatusBadRequest, fmt.Errorf("upload would require %d parts, exceeding the maximum of %d (increase partSize)", numParts64, maxNumParts)) + + return + } + + numParts := int(numParts64) + + // Check session limit early, before any file I/O, to avoid truncating + // existing files only to reject the request due to capacity. + a.uploadsLock.RLock() + sessionCount := len(a.uploads) + a.uploadsLock.RUnlock() + + if sessionCount >= maxUploadSessions { + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Int("maxSessions", maxUploadSessions).Msg("too many concurrent upload sessions") + jsonError(w, http.StatusTooManyRequests, fmt.Errorf("too many concurrent upload sessions (max %d)", maxUploadSessions)) + + return + } + + // Resolve username + username, err := execcontext.ResolveDefaultUsername(params.Username, a.defaults.User) + if err != nil { + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("no user specified") + jsonError(w, http.StatusBadRequest, fmt.Errorf("error resolving username (provided=%v, default=%q): %w", params.Username, a.defaults.User, err)) + + return + } + + // Lookup user + u, err := user.Lookup(username) + if err != nil { + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Str("username", username).Msg("error looking up user") + jsonError(w, http.StatusUnauthorized, fmt.Errorf("error looking up user '%s': %w", username, err)) + + return + } + + uid, gid, err := permissions.GetUserIdInts(u) + if err != nil { + 
a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("error getting user ids") + jsonError(w, http.StatusInternalServerError, fmt.Errorf("error getting user ids for user %q (uid=%s, gid=%s): %w", u.Username, u.Uid, u.Gid, err)) + + return + } + + // Resolve the file path + filePath, err := permissions.ExpandAndResolve(body.Path, u, a.defaults.Workdir) + if err != nil { + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("error resolving path") + jsonError(w, http.StatusBadRequest, fmt.Errorf("error resolving path %q: %w", body.Path, err)) + + return + } + + // Register a placeholder session under the lock to claim the path and + // count toward the session limit, then perform file I/O outside the lock + // to avoid blocking unrelated upload operations. The session starts with + // completed=true so any concurrent access (Put/Complete/Delete) is safely + // rejected until initialization finishes. + uploadID := uuid.NewString() + tempPath := filePath + ".upload." 
+ uploadID + + session := &multipartUploadSession{ + UploadID: uploadID, + FilePath: filePath, + TempPath: tempPath, + TotalSize: body.TotalSize, + PartSize: body.PartSize, + NumParts: numParts, + UID: uid, + GID: gid, + } + session.completed.Store(true) // Block access until initialization finishes + + a.uploadsLock.Lock() + if len(a.uploads) >= maxUploadSessions { + a.uploadsLock.Unlock() + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Int("maxSessions", maxUploadSessions).Msg("too many concurrent upload sessions") + jsonError(w, http.StatusTooManyRequests, fmt.Errorf("too many concurrent upload sessions (max %d)", maxUploadSessions)) + + return + } + for _, existing := range a.uploads { + if existing.FilePath == filePath { + a.uploadsLock.Unlock() + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Str("filePath", filePath).Msg("destination path already has an active upload") + jsonError(w, http.StatusConflict, fmt.Errorf("destination path %q already has an active upload session", filePath)) + + return + } + } + a.uploads[uploadID] = session + a.uploadsLock.Unlock() + + // removeSession unregisters the placeholder on file I/O failure. + removeSession := func() { + a.uploadsLock.Lock() + delete(a.uploads, uploadID) + a.uploadsLock.Unlock() + } + + // Ensure parent directories exist after the authoritative session-limit + // check to avoid creating directories for requests that will be rejected. + if err := permissions.EnsureDirs(filepath.Dir(filePath), uid, gid); err != nil { + removeSession() + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("error ensuring directories") + jsonError(w, http.StatusInternalServerError, fmt.Errorf("error ensuring directories for %q: %w", filepath.Dir(filePath), err)) + + return + } + + // Create and preallocate a temporary file outside the lock; the final + // path is untouched until complete atomically renames the temp file + // into place. 
The temp path is unique per upload ID so no other + // operation can conflict. + destFile, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o666) + if err != nil { + removeSession() + if errors.Is(err, syscall.ENOSPC) { + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("not enough disk space") + jsonError(w, http.StatusInsufficientStorage, fmt.Errorf("not enough disk space")) + + return + } + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("error creating temp file") + jsonError(w, http.StatusInternalServerError, fmt.Errorf("error creating temp file: %w", err)) + + return + } + + // Preallocate the file to the total size (creates sparse file) + if body.TotalSize > 0 { + if err := destFile.Truncate(body.TotalSize); err != nil { + destFile.Close() + if rmErr := ignoreNotExist(os.Remove(tempPath)); rmErr != nil { + a.logger.Warn().Err(rmErr).Str(string(logs.OperationIDKey), operationID).Msg("failed to remove temp file after truncate error") + } + removeSession() + if errors.Is(err, syscall.ENOSPC) { + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("not enough disk space") + jsonError(w, http.StatusInsufficientStorage, fmt.Errorf("not enough disk space")) + + return + } + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("error preallocating file") + jsonError(w, http.StatusInternalServerError, fmt.Errorf("error preallocating file: %w", err)) + + return + } + } + + // Set ownership on the temp file + if err := os.Chown(tempPath, uid, gid); err != nil { + destFile.Close() + if rmErr := ignoreNotExist(os.Remove(tempPath)); rmErr != nil { + a.logger.Warn().Err(rmErr).Str(string(logs.OperationIDKey), operationID).Msg("failed to remove temp file after chown error") + } + removeSession() + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("error changing file ownership") + jsonError(w, http.StatusInternalServerError, 
fmt.Errorf("error changing file ownership: %w", err)) + + return + } + + // Initialization complete — set the file handle and parts map under + // session.mu, then clear the completed flag. The mutex ensures that + // any goroutine that later acquires session.mu and observes + // completed==false is guaranteed to see DestFile and Parts. + session.mu.Lock() + session.DestFile = destFile + session.Parts = make(map[int]partStatus) + session.completed.Store(false) + session.mu.Unlock() + + a.logger.Debug(). + Str(string(logs.OperationIDKey), operationID). + Str("uploadId", uploadID). + Str("filePath", filePath). + Str("tempPath", tempPath). + Int64("totalSize", body.TotalSize). + Int64("partSize", body.PartSize). + Int("numParts", numParts). + Msg("multipart upload initialized") + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(MultipartUploadInit{ + UploadId: uploadID, + }); err != nil { + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("failed to encode response") + // Client never received the uploadId, so clean up to avoid a permanent leak. + // Set completed under session.mu to synchronize with part uploads that + // check completed and call wg.Add under the same lock. + session.mu.Lock() + session.completed.Store(true) + session.mu.Unlock() + removeSession() + // Wait for any in-flight part writes before closing the file descriptor. 
+ session.wg.Wait() + destFile.Close() + if rmErr := ignoreNotExist(os.Remove(tempPath)); rmErr != nil { + a.logger.Warn().Err(rmErr).Str(string(logs.OperationIDKey), operationID).Msg("failed to remove temp file after response encoding error") + } + } +} + +// PutFilesUploadUploadId uploads a part of a multipart upload directly to the destination file +func (a *API) PutFilesUploadUploadId(w http.ResponseWriter, r *http.Request, uploadId string, params PutFilesUploadUploadIdParams) { + defer r.Body.Close() + + operationID := logs.AssignOperationID() + + // Get the session + a.uploadsLock.RLock() + session, exists := a.uploads[uploadId] + a.uploadsLock.RUnlock() + + if !exists { + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Msg("upload session not found") + jsonError(w, http.StatusNotFound, fmt.Errorf("upload session not found: %s", uploadId)) + + return + } + + // Fast-path: reject early if session is already completing (authoritative check under session.mu below) + if session.completed.Load() { + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Msg("upload session is already completing") + jsonError(w, http.StatusConflict, fmt.Errorf("upload session %s is already completing or aborted", uploadId)) + + return + } + + // Validate part number is non-negative + if params.Part < 0 { + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Int("part", params.Part).Msg("negative part number") + jsonError(w, http.StatusBadRequest, fmt.Errorf("part number must be non-negative, got %d", params.Part)) + + return + } + + // Reject parts for empty files (no parts expected) + if session.NumParts == 0 { + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Int("partNumber", params.Part).Msg("upload has no parts (empty file)") + jsonError(w, http.StatusBadRequest, fmt.Errorf("upload has no parts (empty file); no part uploads are accepted")) + + return + } + + // Check part 
number is within range + if params.Part >= session.NumParts { + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Int("partNumber", params.Part).Int("numParts", session.NumParts).Msg("part number out of range") + jsonError(w, http.StatusBadRequest, fmt.Errorf("part number %d out of range (expected 0-%d)", params.Part, session.NumParts-1)) + + return + } + + // Calculate offset and expected size for this part + offset := int64(params.Part) * session.PartSize + expectedSize := session.PartSize + if params.Part == session.NumParts-1 { + // Last part may be smaller + expectedSize = session.TotalSize - offset + } + + // Reserve this part under lock to prevent concurrent writes to the same part number + // and to authoritatively check completed status (the atomic check above is a fast path). + // Also register with the WaitGroup so Complete/Delete wait for this write to finish. + session.mu.Lock() + if session.completed.Load() { + session.mu.Unlock() + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Msg("upload session completed during part reservation") + jsonError(w, http.StatusConflict, fmt.Errorf("upload session %s is already completing or aborted", uploadId)) + + return + } + if session.Parts[params.Part] == partInProgress { + session.mu.Unlock() + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Int("partNumber", params.Part).Msg("part is already being uploaded by another request") + jsonError(w, http.StatusConflict, fmt.Errorf("part %d is already being uploaded by another request for session %s", params.Part, uploadId)) + + return + } + if session.Parts[params.Part] == partComplete { + session.mu.Unlock() + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Int("partNumber", params.Part).Msg("part was already uploaded") + jsonError(w, http.StatusConflict, fmt.Errorf("part %d was already uploaded for session %s", params.Part, 
uploadId)) + + return + } + session.Parts[params.Part] = partInProgress + session.wg.Add(1) // Must happen under mu while completed is false to avoid Add/Wait race + session.mu.Unlock() + + // Always signal writer completion so Complete/Delete can proceed. + // This must be the first defer (runs last) so cleanup below finishes first. + defer session.wg.Done() + + // Ensure in-progress flag is cleaned up on any early return (write errors, size mismatch, etc.) + partWritten := false + defer func() { + if !partWritten { + session.mu.Lock() + delete(session.Parts, params.Part) + session.mu.Unlock() + } + }() + + // Limit the request body to expectedSize+1 so the server does not buffer + // an arbitrarily large oversized body. The +1 allows the trailing-byte + // check below to detect excess data without triggering MaxBytesError + // during io.CopyN itself (which reads exactly expectedSize bytes). + r.Body = http.MaxBytesReader(w, r.Body, expectedSize+1) + + // Stream the part data directly to the file at offset without buffering the + // entire part in memory. OffsetWriter + CopyN uses a small internal buffer + // (~32KB) instead of reading the full part into a single allocation. 
+ offsetWriter := io.NewOffsetWriter(session.DestFile, offset) + written, err := io.CopyN(offsetWriter, r.Body, expectedSize) + if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { + if errors.Is(err, syscall.ENOSPC) { + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("not enough disk space") + jsonError(w, http.StatusInsufficientStorage, fmt.Errorf("not enough disk space")) + + return + } + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("error writing part data") + jsonError(w, http.StatusInternalServerError, fmt.Errorf("error writing part %d data: %w", params.Part, err)) + + return + } + + if written != expectedSize { + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Int64("written", written).Int64("expectedSize", expectedSize).Msg("part size mismatch") + jsonError(w, http.StatusBadRequest, fmt.Errorf("part size %d does not match expected size %d", written, expectedSize)) + + return + } + + // Check for extra data beyond expected size + var extra [1]byte + if n, _ := r.Body.Read(extra[:]); n > 0 { + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Int64("expectedSize", expectedSize).Msg("part data exceeds expected size") + jsonError(w, http.StatusBadRequest, fmt.Errorf("part data exceeds expected size %d", expectedSize)) + + return + } + + // Finalize: mark the part as complete since the data was written to disk. + // Mark partWritten so the deferred cleanup does not revert the status. + // We intentionally do not check session.completed here — the write + // succeeded and Complete's parts scan will count it, so returning 200 + // gives the client an accurate view regardless of concurrent completion. + session.mu.Lock() + session.Parts[params.Part] = partComplete + partWritten = true + session.mu.Unlock() + + a.logger.Debug(). + Str(string(logs.OperationIDKey), operationID). + Str("uploadId", uploadId). + Int("partNumber", params.Part). 
// PostFilesUploadUploadIdComplete finalizes a multipart upload session: it
// verifies that every part has been written, closes the temp file, and renames
// it onto the destination path.
//
// Responses:
//   - 404 if the upload session does not exist
//   - 409 if the session is already completing (or was aborted)
//   - 400 if parts are missing — the session is kept and the completed flag is
//     reset so the client can upload the missing parts and retry
//   - 500 on close/rename failure — the session and temp file are removed
//   - 200 with MultipartUploadComplete on success
func (a *API) PostFilesUploadUploadIdComplete(w http.ResponseWriter, r *http.Request, uploadId string) {
	defer r.Body.Close()

	operationID := logs.AssignOperationID()

	// Look up the session.
	a.uploadsLock.RLock()
	session, exists := a.uploads[uploadId]
	a.uploadsLock.RUnlock()

	if !exists {
		a.logger.Error().Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Msg("upload session not found")
		jsonError(w, http.StatusNotFound, fmt.Errorf("upload session not found: %s", uploadId))

		return
	}

	// Mark as completing under session.mu so the transition is synchronized
	// with part reservation (which checks completed and calls wg.Add under
	// the same lock). This prevents a part upload from calling wg.Add(1)
	// after our wg.Wait below has already observed a zero counter.
	session.mu.Lock()
	if !session.completed.CompareAndSwap(false, true) {
		session.mu.Unlock()
		a.logger.Error().Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Msg("upload session is already completing")
		jsonError(w, http.StatusConflict, fmt.Errorf("upload session %s is already completing", uploadId))

		return
	}
	session.mu.Unlock()

	// Wait for all in-flight part writes to finish before checking part status.
	// This prevents closing the file while io.CopyN is still writing and ensures
	// parts that were mid-write when completed was set are properly accounted for.
	session.wg.Wait()

	// Verify all parts were uploaded. Taking session.mu here makes the writes
	// done by part handlers (Parts[i] = partComplete) visible to this scan.
	session.mu.Lock()
	var missingParts []int
	for i := range session.NumParts {
		if session.Parts[i] != partComplete {
			missingParts = append(missingParts, i)
		}
	}
	session.mu.Unlock()

	if len(missingParts) > 0 {
		// Reset completed flag under session.mu so the client can upload missing
		// parts and retry. Holding the lock prevents a concurrent Complete from
		// winning the CAS (false→true) before this goroutine has returned,
		// which would cause two goroutines to race on Close/Rename.
		session.mu.Lock()
		session.completed.Store(false)
		session.mu.Unlock()
		a.logger.Error().
			Str(string(logs.OperationIDKey), operationID).
			Str("uploadId", uploadId).
			Int("missingCount", len(missingParts)).
			Msg("missing parts in upload")
		jsonError(w, http.StatusBadRequest, fmt.Errorf("missing %d of %d parts", len(missingParts), session.NumParts))

		return
	}

	// All parts present — close the file and rename to the final path.
	// The session stays in the map during finalization to prevent a new
	// upload to the same path from starting before the rename completes.
	if err := session.DestFile.Close(); err != nil {
		// On failure the session is removed and the temp file deleted; the
		// client must restart the upload from init.
		a.uploadsLock.Lock()
		delete(a.uploads, uploadId)
		a.uploadsLock.Unlock()
		if rmErr := ignoreNotExist(os.Remove(session.TempPath)); rmErr != nil {
			a.logger.Warn().Err(rmErr).Str(string(logs.OperationIDKey), operationID).Msg("failed to remove temp file after close error")
		}
		a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("error closing temp file")
		jsonError(w, http.StatusInternalServerError, fmt.Errorf("error closing temp file: %w", err))

		return
	}

	if err := os.Rename(session.TempPath, session.FilePath); err != nil {
		// Same cleanup as the close-error path: drop the session and best-effort
		// remove the temp file so disk space is reclaimed.
		a.uploadsLock.Lock()
		delete(a.uploads, uploadId)
		a.uploadsLock.Unlock()
		if rmErr := ignoreNotExist(os.Remove(session.TempPath)); rmErr != nil {
			a.logger.Warn().Err(rmErr).Str(string(logs.OperationIDKey), operationID).Msg("failed to remove temp file after rename error")
		}
		a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("error renaming temp file to destination")
		jsonError(w, http.StatusInternalServerError, fmt.Errorf("error renaming temp file to destination: %w", err))

		return
	}

	// Success: drop the session so the destination path can be reused by a
	// subsequent upload.
	a.uploadsLock.Lock()
	delete(a.uploads, uploadId)
	a.uploadsLock.Unlock()

	a.logger.Debug().
		Str(string(logs.OperationIDKey), operationID).
		Str("uploadId", uploadId).
		Str("filePath", session.FilePath).
		Int64("totalSize", session.TotalSize).
		Int("numParts", session.NumParts).
		Msg("multipart upload completed")

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	if err := json.NewEncoder(w).Encode(MultipartUploadComplete{
		Path: session.FilePath,
		Size: session.TotalSize,
	}); err != nil {
		a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("failed to encode response")
	}
}
+ Msg("multipart upload completed") + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(MultipartUploadComplete{ + Path: session.FilePath, + Size: session.TotalSize, + }); err != nil { + a.logger.Error().Err(err).Str(string(logs.OperationIDKey), operationID).Msg("failed to encode response") + } +} + +// DeleteFilesUploadUploadId aborts a multipart upload and cleans up +func (a *API) DeleteFilesUploadUploadId(w http.ResponseWriter, r *http.Request, uploadId string) { + defer r.Body.Close() + + operationID := logs.AssignOperationID() + + // Look up the session under a read lock, then operate on it + // independently. This avoids holding the global write lock during + // the CAS, which would block all concurrent RLock callers. + a.uploadsLock.RLock() + session, exists := a.uploads[uploadId] + a.uploadsLock.RUnlock() + + if !exists { + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Msg("upload session not found") + jsonError(w, http.StatusNotFound, fmt.Errorf("upload session not found: %s", uploadId)) + + return + } + + // Mark as completed under session.mu to synchronize with part + // reservation (which checks completed and calls wg.Add under the + // same lock). This prevents a part upload from calling wg.Add(1) + // after our wg.Wait below has already observed a zero counter. + session.mu.Lock() + if !session.completed.CompareAndSwap(false, true) { + session.mu.Unlock() + a.logger.Error().Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Msg("upload session is already completing") + jsonError(w, http.StatusConflict, fmt.Errorf("upload session %s is already completing or aborted", uploadId)) + + return + } + session.mu.Unlock() + + // Remove session from map under the write lock. + a.uploadsLock.Lock() + delete(a.uploads, uploadId) + a.uploadsLock.Unlock() + + // Unlink the temp file. 
The temp path is unique per upload ID so no + // other operation can conflict. In-flight writers use the open DestFile + // descriptor, which remains valid after unlink. + if err := ignoreNotExist(os.Remove(session.TempPath)); err != nil { + a.logger.Warn().Err(err).Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Msg("error removing temp file") + } + + // Wait for any in-flight part writes to finish before closing the file descriptor + session.wg.Wait() + if err := session.DestFile.Close(); err != nil { + a.logger.Warn().Err(err).Str(string(logs.OperationIDKey), operationID).Str("uploadId", uploadId).Msg("error closing temp file during abort") + } + + a.logger.Debug(). + Str(string(logs.OperationIDKey), operationID). + Str("uploadId", uploadId). + Msg("multipart upload aborted") + + w.WriteHeader(http.StatusNoContent) +} diff --git a/packages/envd/internal/api/multipart_upload_test.go b/packages/envd/internal/api/multipart_upload_test.go new file mode 100644 index 0000000000..b9dc07d491 --- /dev/null +++ b/packages/envd/internal/api/multipart_upload_test.go @@ -0,0 +1,875 @@ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/e2b-dev/infra/packages/envd/internal/execcontext" + "github.com/e2b-dev/infra/packages/envd/internal/utils" +) + +func newMultipartTestAPI(t *testing.T) *API { + t.Helper() + logger := zerolog.New(os.Stderr).Level(zerolog.Disabled) + defaults := &execcontext.Defaults{ + User: "root", + EnvVars: utils.NewMap[string, string](), + } + + return New(&logger, defaults, nil, true) +} + +func TestMultipartUpload(t *testing.T) { + t.Parallel() + + // Skip if not running as root (needed for user lookup and chown) + if os.Geteuid() != 0 { + t.Skip("skipping multipart upload tests: requires root") + } + + t.Run("init upload", 
func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + + body := PostFilesUploadInitJSONRequestBody{ + Path: filepath.Join(tempDir, "test-file.txt"), + TotalSize: 100, + PartSize: 50, + } + bodyBytes, _ := json.Marshal(body) + + req := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + api.PostFilesUploadInit(w, req, PostFilesUploadInitParams{}) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp MultipartUploadInit + err := json.Unmarshal(w.Body.Bytes(), &resp) + require.NoError(t, err) + assert.NotEmpty(t, resp.UploadId) + + // Clean up + api.uploadsLock.Lock() + session := api.uploads[resp.UploadId] + if session != nil { + session.DestFile.Close() + os.Remove(session.TempPath) + } + delete(api.uploads, resp.UploadId) + api.uploadsLock.Unlock() + }) + + t.Run("complete multipart upload", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "assembled-file.txt") + + part0Content := []byte("Hello, ") + part1Content := []byte("World!") + totalSize := int64(len(part0Content) + len(part1Content)) + partSize := int64(len(part0Content)) + + // Initialize upload + initBody := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: totalSize, + PartSize: partSize, + } + initBodyBytes, _ := json.Marshal(initBody) + + initReq := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(initBodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + uploadId := initResp.UploadId + + // Upload part 0 + part0Req := 
httptest.NewRequest(http.MethodPut, "/files/upload/"+uploadId+"?part=0", bytes.NewReader(part0Content)) + part0Req.Header.Set("Content-Type", "application/octet-stream") + part0W := httptest.NewRecorder() + + api.PutFilesUploadUploadId(part0W, part0Req, uploadId, PutFilesUploadUploadIdParams{Part: 0}) + require.Equal(t, http.StatusOK, part0W.Code) + + var part0Resp MultipartUploadPart + err = json.Unmarshal(part0W.Body.Bytes(), &part0Resp) + require.NoError(t, err) + assert.Equal(t, 0, part0Resp.PartNumber) + assert.Equal(t, int64(len(part0Content)), part0Resp.Size) + + // Upload part 1 + part1Req := httptest.NewRequest(http.MethodPut, "/files/upload/"+uploadId+"?part=1", bytes.NewReader(part1Content)) + part1Req.Header.Set("Content-Type", "application/octet-stream") + part1W := httptest.NewRecorder() + + api.PutFilesUploadUploadId(part1W, part1Req, uploadId, PutFilesUploadUploadIdParams{Part: 1}) + require.Equal(t, http.StatusOK, part1W.Code) + + // Complete upload + completeReq := httptest.NewRequest(http.MethodPost, "/files/upload/"+uploadId+"/complete", nil) + completeW := httptest.NewRecorder() + + api.PostFilesUploadUploadIdComplete(completeW, completeReq, uploadId) + require.Equal(t, http.StatusOK, completeW.Code) + + var completeResp MultipartUploadComplete + err = json.Unmarshal(completeW.Body.Bytes(), &completeResp) + require.NoError(t, err) + assert.Equal(t, destPath, completeResp.Path) + assert.Equal(t, totalSize, completeResp.Size) + + // Verify file contents + content, err := os.ReadFile(destPath) + require.NoError(t, err) + assert.Equal(t, "Hello, World!", string(content)) + }) + + t.Run("abort multipart upload", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "aborted-file.txt") + + // Initialize upload + initBody := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 100, + PartSize: 50, + } + initBodyBytes, _ := json.Marshal(initBody) + + initReq := 
httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(initBodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + uploadId := initResp.UploadId + + // Verify temp file was created but destination is untouched + api.uploadsLock.RLock() + session := api.uploads[uploadId] + api.uploadsLock.RUnlock() + require.NotNil(t, session) + _, err = os.Stat(session.TempPath) + require.NoError(t, err, "temp file should exist after init") + _, err = os.Stat(destPath) + assert.True(t, os.IsNotExist(err), "destination should not exist yet") + + tempPath := session.TempPath + + // Abort upload + abortReq := httptest.NewRequest(http.MethodDelete, "/files/upload/"+uploadId, nil) + abortW := httptest.NewRecorder() + + api.DeleteFilesUploadUploadId(abortW, abortReq, uploadId) + assert.Equal(t, http.StatusNoContent, abortW.Code) + + // Verify session is removed + api.uploadsLock.RLock() + _, exists := api.uploads[uploadId] + api.uploadsLock.RUnlock() + assert.False(t, exists) + + // Verify temp file is cleaned up and destination still doesn't exist + _, err = os.Stat(tempPath) + assert.True(t, os.IsNotExist(err), "temp file should be removed after abort") + _, err = os.Stat(destPath) + assert.True(t, os.IsNotExist(err), "destination should not exist after abort") + }) + + t.Run("upload part to non-existent session", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + + req := httptest.NewRequest(http.MethodPut, "/files/upload/no-such-session?part=0", bytes.NewReader([]byte("test"))) + w := httptest.NewRecorder() + + api.PutFilesUploadUploadId(w, req, "no-such-session", PutFilesUploadUploadIdParams{Part: 0}) + assert.Equal(t, http.StatusNotFound, w.Code) + }) + + 
t.Run("complete non-existent session", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + + req := httptest.NewRequest(http.MethodPost, "/files/upload/non-existent/complete", nil) + w := httptest.NewRecorder() + + api.PostFilesUploadUploadIdComplete(w, req, "non-existent") + assert.Equal(t, http.StatusNotFound, w.Code) + }) + + t.Run("abort non-existent session", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + + req := httptest.NewRequest(http.MethodDelete, "/files/upload/non-existent", nil) + w := httptest.NewRecorder() + + api.DeleteFilesUploadUploadId(w, req, "non-existent") + assert.Equal(t, http.StatusNotFound, w.Code) + }) + + t.Run("missing part in sequence", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "gap-file.txt") + + // Initialize upload with 3 parts + body := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 30, + PartSize: 10, + } + bodyBytes, _ := json.Marshal(body) + + initReq := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + uploadId := initResp.UploadId + + // Upload parts 0 and 2, but skip part 1 + for _, partNum := range []int{0, 2} { + content := make([]byte, 10) + partReq := httptest.NewRequest(http.MethodPut, fmt.Sprintf("/files/upload/%s?part=%d", uploadId, partNum), bytes.NewReader(content)) + partReq.Header.Set("Content-Type", "application/octet-stream") + partW := httptest.NewRecorder() + + api.PutFilesUploadUploadId(partW, partReq, uploadId, PutFilesUploadUploadIdParams{Part: partNum}) + require.Equal(t, http.StatusOK, partW.Code) + 
} + + // Complete should fail due to missing part 1 + completeReq := httptest.NewRequest(http.MethodPost, "/files/upload/"+uploadId+"/complete", nil) + completeW := httptest.NewRecorder() + + api.PostFilesUploadUploadIdComplete(completeW, completeReq, uploadId) + assert.Equal(t, http.StatusBadRequest, completeW.Code) + + // Session should still exist (completed flag reset) so client can retry + api.uploadsLock.RLock() + _, exists := api.uploads[uploadId] + api.uploadsLock.RUnlock() + assert.True(t, exists, "session should still exist after failed complete") + + // Clean up + api.uploadsLock.Lock() + if s := api.uploads[uploadId]; s != nil { + s.DestFile.Close() + os.Remove(s.TempPath) + } + delete(api.uploads, uploadId) + api.uploadsLock.Unlock() + }) + + t.Run("upload part after complete started", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "race-file.txt") + + // Initialize upload + body := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 10, + PartSize: 10, + } + bodyBytes, _ := json.Marshal(body) + + initReq := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + uploadId := initResp.UploadId + + // Upload part 0 + part0Content := make([]byte, 10) + part0Req := httptest.NewRequest(http.MethodPut, "/files/upload/"+uploadId+"?part=0", bytes.NewReader(part0Content)) + part0Req.Header.Set("Content-Type", "application/octet-stream") + part0W := httptest.NewRecorder() + + api.PutFilesUploadUploadId(part0W, part0Req, uploadId, PutFilesUploadUploadIdParams{Part: 0}) + require.Equal(t, http.StatusOK, 
part0W.Code) + + // Mark the session as completing + api.uploadsLock.RLock() + session := api.uploads[uploadId] + api.uploadsLock.RUnlock() + require.NotNil(t, session) + session.completed.Store(true) + + // Try to upload another part - should fail with 409 Conflict + part1Content := make([]byte, 10) + part1Req := httptest.NewRequest(http.MethodPut, "/files/upload/"+uploadId+"?part=0", bytes.NewReader(part1Content)) + part1Req.Header.Set("Content-Type", "application/octet-stream") + part1W := httptest.NewRecorder() + + api.PutFilesUploadUploadId(part1W, part1Req, uploadId, PutFilesUploadUploadIdParams{Part: 0}) + assert.Equal(t, http.StatusConflict, part1W.Code) + + // Clean up + api.uploadsLock.Lock() + delete(api.uploads, uploadId) + api.uploadsLock.Unlock() + session.DestFile.Close() + os.Remove(session.TempPath) + }) + + t.Run("max sessions limit", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + + // Create maxUploadSessions sessions + for i := range maxUploadSessions { + body := PostFilesUploadInitJSONRequestBody{ + Path: filepath.Join(tempDir, fmt.Sprintf("file-%d.txt", i)), + TotalSize: 100, + PartSize: 50, + } + bodyBytes, _ := json.Marshal(body) + + req := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + api.PostFilesUploadInit(w, req, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, w.Code, "session %d should succeed", i) + } + + // The next one should fail with 429 + body := PostFilesUploadInitJSONRequestBody{ + Path: filepath.Join(tempDir, "one-too-many.txt"), + TotalSize: 100, + PartSize: 50, + } + bodyBytes, _ := json.Marshal(body) + + req := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + api.PostFilesUploadInit(w, req, 
PostFilesUploadInitParams{}) + assert.Equal(t, http.StatusTooManyRequests, w.Code) + + // Clean up all sessions + api.uploadsLock.Lock() + for _, session := range api.uploads { + session.DestFile.Close() + os.Remove(session.TempPath) + } + api.uploads = make(map[string]*multipartUploadSession) + api.uploadsLock.Unlock() + }) + + t.Run("parts uploaded out of order", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "out-of-order-file.txt") + + // Initialize upload with 3 parts of 1 byte each + body := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 3, + PartSize: 1, + } + bodyBytes, _ := json.Marshal(body) + + initReq := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + uploadId := initResp.UploadId + + // Upload parts out of order (part 2 first, then 0, then 1) + parts := []struct { + num int + content string + }{ + {2, "C"}, + {0, "A"}, + {1, "B"}, + } + + for _, part := range parts { + partReq := httptest.NewRequest(http.MethodPut, fmt.Sprintf("/files/upload/%s?part=%d", uploadId, part.num), bytes.NewReader([]byte(part.content))) + partReq.Header.Set("Content-Type", "application/octet-stream") + partW := httptest.NewRecorder() + + api.PutFilesUploadUploadId(partW, partReq, uploadId, PutFilesUploadUploadIdParams{Part: part.num}) + require.Equal(t, http.StatusOK, partW.Code) + } + + // Complete upload + completeReq := httptest.NewRequest(http.MethodPost, "/files/upload/"+uploadId+"/complete", nil) + completeW := httptest.NewRecorder() + + api.PostFilesUploadUploadIdComplete(completeW, completeReq, uploadId) 
+ require.Equal(t, http.StatusOK, completeW.Code) + + // Verify file contents are assembled in order + content, err := os.ReadFile(destPath) + require.NoError(t, err) + assert.Equal(t, "ABC", string(content)) + }) + + t.Run("empty file upload", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "empty-file.txt") + + // Initialize upload with 0 size + body := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 0, + PartSize: 1024, + } + bodyBytes, _ := json.Marshal(body) + + initReq := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + uploadId := initResp.UploadId + + // Complete upload (no parts needed) + completeReq := httptest.NewRequest(http.MethodPost, "/files/upload/"+uploadId+"/complete", nil) + completeW := httptest.NewRecorder() + + api.PostFilesUploadUploadIdComplete(completeW, completeReq, uploadId) + require.Equal(t, http.StatusOK, completeW.Code) + + // Verify file exists and is empty + content, err := os.ReadFile(destPath) + require.NoError(t, err) + assert.Empty(t, string(content)) + }) + + t.Run("reject too many parts", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + + // totalSize=10GB, partSize=1 would create ~10 billion parts + body := PostFilesUploadInitJSONRequestBody{ + Path: "/tmp/too-many-parts.txt", + TotalSize: maxTotalSize, + PartSize: 1, + } + bodyBytes, _ := json.Marshal(body) + + req := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + 
api.PostFilesUploadInit(w, req, PostFilesUploadInitParams{}) + assert.Equal(t, http.StatusBadRequest, w.Code) + + var errResp Error + err := json.Unmarshal(w.Body.Bytes(), &errResp) + require.NoError(t, err) + assert.Contains(t, errResp.Message, "parts") + }) + + t.Run("reject negative totalSize", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + + body := PostFilesUploadInitJSONRequestBody{ + Path: "/tmp/negative-size.txt", + TotalSize: -1, + PartSize: 1024, + } + bodyBytes, _ := json.Marshal(body) + + req := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + api.PostFilesUploadInit(w, req, PostFilesUploadInitParams{}) + assert.Equal(t, http.StatusBadRequest, w.Code) + + var errResp Error + err := json.Unmarshal(w.Body.Bytes(), &errResp) + require.NoError(t, err) + assert.Contains(t, errResp.Message, "non-negative") + }) + + t.Run("reject partSize zero", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + + body := PostFilesUploadInitJSONRequestBody{ + Path: "/tmp/should-not-exist.txt", + TotalSize: 100, + PartSize: 0, + } + bodyBytes, _ := json.Marshal(body) + + req := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + api.PostFilesUploadInit(w, req, PostFilesUploadInitParams{}) + assert.Equal(t, http.StatusBadRequest, w.Code) + }) + + t.Run("reject part upload on empty file", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "empty-reject.txt") + + // Initialize upload with 0 size + body := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 0, + PartSize: 1024, + } + bodyBytes, _ := json.Marshal(body) + + initReq := httptest.NewRequest(http.MethodPost, "/files/upload/init", 
bytes.NewReader(bodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + uploadId := initResp.UploadId + + // Try to upload a part — should be rejected with clear message + partReq := httptest.NewRequest(http.MethodPut, "/files/upload/"+uploadId+"?part=0", bytes.NewReader([]byte("data"))) + partReq.Header.Set("Content-Type", "application/octet-stream") + partW := httptest.NewRecorder() + + api.PutFilesUploadUploadId(partW, partReq, uploadId, PutFilesUploadUploadIdParams{Part: 0}) + assert.Equal(t, http.StatusBadRequest, partW.Code) + + // Verify error message does not contain a huge number from uint underflow + var errResp Error + err = json.Unmarshal(partW.Body.Bytes(), &errResp) + require.NoError(t, err) + assert.Contains(t, errResp.Message, "empty file") + + // Clean up + api.uploadsLock.Lock() + session := api.uploads[uploadId] + if session != nil { + session.DestFile.Close() + os.Remove(session.TempPath) + } + delete(api.uploads, uploadId) + api.uploadsLock.Unlock() + }) + + t.Run("reject negative part number", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "neg-part.txt") + + // Initialize upload + body := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 10, + PartSize: 10, + } + bodyBytes, _ := json.Marshal(body) + + initReq := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := 
json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + uploadId := initResp.UploadId + + // Try to upload with negative part number + partReq := httptest.NewRequest(http.MethodPut, "/files/upload/"+uploadId+"?part=-1", bytes.NewReader([]byte("data"))) + partReq.Header.Set("Content-Type", "application/octet-stream") + partW := httptest.NewRecorder() + + api.PutFilesUploadUploadId(partW, partReq, uploadId, PutFilesUploadUploadIdParams{Part: -1}) + assert.Equal(t, http.StatusBadRequest, partW.Code) + + var errResp Error + err = json.Unmarshal(partW.Body.Bytes(), &errResp) + require.NoError(t, err) + assert.Contains(t, errResp.Message, "non-negative") + + // Clean up + api.uploadsLock.Lock() + session := api.uploads[uploadId] + if session != nil { + session.DestFile.Close() + os.Remove(session.TempPath) + } + delete(api.uploads, uploadId) + api.uploadsLock.Unlock() + }) + + t.Run("reject duplicate destination path", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "dup-path.txt") + + // First init should succeed + body := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 100, + PartSize: 50, + } + bodyBytes, _ := json.Marshal(body) + + req := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + api.PostFilesUploadInit(w, req, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, w.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(w.Body.Bytes(), &initResp) + require.NoError(t, err) + uploadId := initResp.UploadId + + // Second init with same path should be rejected with 409 + bodyBytes2, _ := json.Marshal(body) + req2 := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes2)) + req2.Header.Set("Content-Type", "application/json") + w2 := httptest.NewRecorder() + + 
api.PostFilesUploadInit(w2, req2, PostFilesUploadInitParams{}) + assert.Equal(t, http.StatusConflict, w2.Code) + + var errResp Error + err = json.Unmarshal(w2.Body.Bytes(), &errResp) + require.NoError(t, err) + assert.Contains(t, errResp.Message, "active upload session") + + // Clean up + api.uploadsLock.Lock() + session := api.uploads[uploadId] + if session != nil { + session.DestFile.Close() + os.Remove(session.TempPath) + } + delete(api.uploads, uploadId) + api.uploadsLock.Unlock() + }) + + t.Run("reuse path after complete", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "reuse-path.txt") + + // First upload (empty file for simplicity) + body := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 0, + PartSize: 1024, + } + bodyBytes, _ := json.Marshal(body) + + initReq := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + + // Complete it + completeReq := httptest.NewRequest(http.MethodPost, "/files/upload/"+initResp.UploadId+"/complete", nil) + completeW := httptest.NewRecorder() + + api.PostFilesUploadUploadIdComplete(completeW, completeReq, initResp.UploadId) + require.Equal(t, http.StatusOK, completeW.Code) + + // Second init with same path should succeed now + bodyBytes2, _ := json.Marshal(body) + initReq2 := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes2)) + initReq2.Header.Set("Content-Type", "application/json") + initW2 := httptest.NewRecorder() + + api.PostFilesUploadInit(initW2, initReq2, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, 
initW2.Code) + + var initResp2 MultipartUploadInit + err = json.Unmarshal(initW2.Body.Bytes(), &initResp2) + require.NoError(t, err) + + // Clean up + api.uploadsLock.Lock() + session := api.uploads[initResp2.UploadId] + if session != nil { + session.DestFile.Close() + os.Remove(session.TempPath) + } + delete(api.uploads, initResp2.UploadId) + api.uploadsLock.Unlock() + }) + + t.Run("reuse path after abort", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "reuse-abort.txt") + + // First upload + body := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 100, + PartSize: 50, + } + bodyBytes, _ := json.Marshal(body) + + initReq := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + + // Abort it + abortReq := httptest.NewRequest(http.MethodDelete, "/files/upload/"+initResp.UploadId, nil) + abortW := httptest.NewRecorder() + + api.DeleteFilesUploadUploadId(abortW, abortReq, initResp.UploadId) + require.Equal(t, http.StatusNoContent, abortW.Code) + + // Second init with same path should succeed now + bodyBytes2, _ := json.Marshal(body) + initReq2 := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(bodyBytes2)) + initReq2.Header.Set("Content-Type", "application/json") + initW2 := httptest.NewRecorder() + + api.PostFilesUploadInit(initW2, initReq2, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW2.Code) + + // Clean up + var initResp2 MultipartUploadInit + err = json.Unmarshal(initW2.Body.Bytes(), &initResp2) + require.NoError(t, err) + + api.uploadsLock.Lock() + session 
:= api.uploads[initResp2.UploadId] + if session != nil { + session.DestFile.Close() + os.Remove(session.TempPath) + } + delete(api.uploads, initResp2.UploadId) + api.uploadsLock.Unlock() + }) + + t.Run("abort preserves original file", func(t *testing.T) { + t.Parallel() + api := newMultipartTestAPI(t) + tempDir := t.TempDir() + destPath := filepath.Join(tempDir, "existing-file.txt") + + // Create a pre-existing file at the destination + originalContent := []byte("original content") + require.NoError(t, os.WriteFile(destPath, originalContent, 0o644)) + + // Initialize upload targeting the same path + initBody := PostFilesUploadInitJSONRequestBody{ + Path: destPath, + TotalSize: 100, + PartSize: 50, + } + initBodyBytes, _ := json.Marshal(initBody) + + initReq := httptest.NewRequest(http.MethodPost, "/files/upload/init", bytes.NewReader(initBodyBytes)) + initReq.Header.Set("Content-Type", "application/json") + initW := httptest.NewRecorder() + + api.PostFilesUploadInit(initW, initReq, PostFilesUploadInitParams{}) + require.Equal(t, http.StatusOK, initW.Code) + + var initResp MultipartUploadInit + err := json.Unmarshal(initW.Body.Bytes(), &initResp) + require.NoError(t, err) + + // Abort the upload + abortReq := httptest.NewRequest(http.MethodDelete, "/files/upload/"+initResp.UploadId, nil) + abortW := httptest.NewRecorder() + + api.DeleteFilesUploadUploadId(abortW, abortReq, initResp.UploadId) + require.Equal(t, http.StatusNoContent, abortW.Code) + + // Verify original file is untouched + content, err := os.ReadFile(destPath) + require.NoError(t, err) + assert.Equal(t, string(originalContent), string(content)) + }) +} diff --git a/packages/envd/internal/api/store.go b/packages/envd/internal/api/store.go index 2524206226..ea0990027c 100644 --- a/packages/envd/internal/api/store.go +++ b/packages/envd/internal/api/store.go @@ -4,7 +4,9 @@ import ( "context" "encoding/json" "net/http" + "os" "sync" + "sync/atomic" "github.com/rs/zerolog" @@ -13,6 +15,41 @@ import ( 
"github.com/e2b-dev/infra/packages/envd/internal/utils" ) +// partStatus represents the state of a multipart upload part. +type partStatus int + +const ( + partPending partStatus = iota // zero value: part not yet started + partInProgress // write currently in flight + partComplete // write finished successfully +) + +// multipartUploadSession tracks an in-progress multipart upload +type multipartUploadSession struct { + UploadID string + FilePath string // Final destination path + TempPath string // Temporary file path during upload (renamed to FilePath on complete) + DestFile *os.File // Open file handle for direct writes + TotalSize int64 // Total expected file size (validated >= 0 at input) + PartSize int64 // Size of each part (validated > 0 at input) + NumParts int // Total number of expected parts + UID int + GID int + Parts map[int]partStatus // partNumber -> status + completed atomic.Bool // Set to true when complete/abort starts to prevent new parts + mu sync.Mutex // Protects Parts and activeWriters + wg sync.WaitGroup // Tracks in-flight part writes; Complete/Delete wait on this before closing DestFile +} + +// ignoreNotExist returns nil if err is a "not exist" error, otherwise returns err unchanged. +func ignoreNotExist(err error) error { + if os.IsNotExist(err) { + return nil + } + + return err +} + // MMDSClient provides access to MMDS metadata. type MMDSClient interface { GetAccessTokenHash(ctx context.Context) (string, error) @@ -37,6 +74,10 @@ type API struct { lastSetTime *utils.AtomicMax initLock sync.Mutex + + // Multipart upload sessions + uploads map[string]*multipartUploadSession + uploadsLock sync.RWMutex } func New(l *zerolog.Logger, defaults *execcontext.Defaults, mmdsChan chan *host.MMDSOpts, isNotFC bool) *API { @@ -48,6 +89,7 @@ func New(l *zerolog.Logger, defaults *execcontext.Defaults, mmdsChan chan *host. 
mmdsClient: &DefaultMMDSClient{}, lastSetTime: utils.NewAtomicMax(), accessToken: &SecureToken{}, + uploads: make(map[string]*multipartUploadSession), } } diff --git a/packages/envd/spec/envd.yaml b/packages/envd/spec/envd.yaml index 83091f1ab5..77c03e25d9 100644 --- a/packages/envd/spec/envd.yaml +++ b/packages/envd/spec/envd.yaml @@ -130,6 +130,157 @@ paths: "507": $ref: "#/components/responses/NotEnoughDiskSpace" + /files/upload/init: + post: + summary: Initialize a multipart file upload session + tags: [files] + security: + - AccessTokenAuth: [] + - {} + parameters: + - $ref: "#/components/parameters/User" + requestBody: + required: true + content: + application/json: + schema: + type: object + required: + - path + - totalSize + - partSize + properties: + path: + type: string + description: Path to the file to upload + totalSize: + type: integer + format: int64 + minimum: 0 + description: Total size of the file in bytes + partSize: + type: integer + format: int64 + minimum: 1 + description: Size of each part in bytes (last part may be smaller) + responses: + "200": + description: Upload session initialized + content: + application/json: + schema: + $ref: "#/components/schemas/MultipartUploadInit" + "400": + $ref: "#/components/responses/InvalidPath" + "401": + $ref: "#/components/responses/InvalidUser" + "409": + $ref: "#/components/responses/Conflict" + "429": + $ref: "#/components/responses/TooManyRequests" + "500": + $ref: "#/components/responses/InternalServerError" + "507": + $ref: "#/components/responses/NotEnoughDiskSpace" + + /files/upload/{uploadId}: + put: + summary: Upload a part of a multipart file upload + tags: [files] + security: + - AccessTokenAuth: [] + - {} + parameters: + - name: uploadId + in: path + required: true + description: The upload session ID + schema: + type: string + - name: part + in: query + required: true + description: The part number (0-indexed) + schema: + type: integer + minimum: 0 + requestBody: + required: true + content: 
+ application/octet-stream: + schema: + type: string + format: binary + responses: + "200": + description: Part uploaded successfully + content: + application/json: + schema: + $ref: "#/components/schemas/MultipartUploadPart" + "400": + $ref: "#/components/responses/InvalidPath" + "404": + $ref: "#/components/responses/UploadNotFound" + "409": + $ref: "#/components/responses/Conflict" + "500": + $ref: "#/components/responses/InternalServerError" + "507": + $ref: "#/components/responses/NotEnoughDiskSpace" + delete: + summary: Abort a multipart file upload and clean up temporary files + tags: [files] + security: + - AccessTokenAuth: [] + - {} + parameters: + - name: uploadId + in: path + required: true + description: The upload session ID + schema: + type: string + responses: + "204": + description: Upload aborted and cleaned up successfully + "404": + $ref: "#/components/responses/UploadNotFound" + "409": + $ref: "#/components/responses/Conflict" + "500": + $ref: "#/components/responses/InternalServerError" + + /files/upload/{uploadId}/complete: + post: + summary: Complete a multipart file upload and assemble the final file + tags: [files] + security: + - AccessTokenAuth: [] + - {} + parameters: + - name: uploadId + in: path + required: true + description: The upload session ID + schema: + type: string + responses: + "200": + description: Upload completed successfully + content: + application/json: + schema: + $ref: "#/components/schemas/MultipartUploadComplete" + "400": + $ref: "#/components/responses/InvalidPath" + "404": + $ref: "#/components/responses/UploadNotFound" + "409": + $ref: "#/components/responses/Conflict" + "500": + $ref: "#/components/responses/InternalServerError" + components: securitySchemes: AccessTokenAuth: @@ -227,6 +378,24 @@ components: application/json: schema: $ref: "#/components/schemas/Error" + Conflict: + description: Conflict with current state of the resource + content: + application/json: + schema: + $ref: 
"#/components/schemas/Error" + TooManyRequests: + description: Too many concurrent upload sessions + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + UploadNotFound: + description: Upload session not found + content: + application/json: + schema: + $ref: "#/components/schemas/Error" schemas: Error: @@ -289,6 +458,40 @@ components: disk_total: type: integer description: Total disk space in bytes + MultipartUploadInit: + type: object + required: + - uploadId + properties: + uploadId: + type: string + description: Unique identifier for the upload session + MultipartUploadPart: + type: object + required: + - partNumber + - size + properties: + partNumber: + type: integer + description: The part number that was uploaded + size: + type: integer + format: int64 + description: Size of the uploaded part in bytes + MultipartUploadComplete: + type: object + required: + - path + - size + properties: + path: + type: string + description: Path to the final assembled file + size: + type: integer + format: int64 + description: Total size of the assembled file in bytes VolumeMount: type: object description: Volume diff --git a/packages/orchestrator/internal/sandbox/envd/envd.gen.go b/packages/orchestrator/internal/sandbox/envd/envd.gen.go index 73d03e999b..2b26c2f08d 100644 --- a/packages/orchestrator/internal/sandbox/envd/envd.gen.go +++ b/packages/orchestrator/internal/sandbox/envd/envd.gen.go @@ -69,6 +69,30 @@ type Metrics struct { Ts int64 `json:"ts,omitempty"` } +// MultipartUploadComplete defines model for MultipartUploadComplete. +type MultipartUploadComplete struct { + // Path Path to the final assembled file + Path string `json:"path"` + + // Size Total size of the assembled file in bytes + Size int64 `json:"size"` +} + +// MultipartUploadInit defines model for MultipartUploadInit. 
+type MultipartUploadInit struct { + // UploadId Unique identifier for the upload session + UploadId string `json:"uploadId"` +} + +// MultipartUploadPart defines model for MultipartUploadPart. +type MultipartUploadPart struct { + // PartNumber The part number that was uploaded + PartNumber int `json:"partNumber"` + + // Size Size of the uploaded part in bytes + Size int64 `json:"size"` +} + // VolumeMount Volume type VolumeMount struct { NfsTarget string `json:"nfs_target"` @@ -87,6 +111,9 @@ type SignatureExpiration = int // User defines model for User. type User = string +// Conflict defines model for Conflict. +type Conflict = Error + // FileNotFound defines model for FileNotFound. type FileNotFound = Error @@ -102,6 +129,12 @@ type InvalidUser = Error // NotEnoughDiskSpace defines model for NotEnoughDiskSpace. type NotEnoughDiskSpace = Error +// TooManyRequests defines model for TooManyRequests. +type TooManyRequests = Error + +// UploadNotFound defines model for UploadNotFound. +type UploadNotFound = Error + // UploadSuccess defines model for UploadSuccess. type UploadSuccess = []EntryInfo @@ -140,6 +173,30 @@ type PostFilesParams struct { SignatureExpiration SignatureExpiration `form:"signature_expiration,omitempty" json:"signature_expiration,omitempty"` } +// PostFilesUploadInitJSONBody defines parameters for PostFilesUploadInit. +type PostFilesUploadInitJSONBody struct { + // PartSize Size of each part in bytes (last part may be smaller) + PartSize int64 `json:"partSize"` + + // Path Path to the file to upload + Path string `json:"path"` + + // TotalSize Total size of the file in bytes + TotalSize int64 `json:"totalSize"` +} + +// PostFilesUploadInitParams defines parameters for PostFilesUploadInit. +type PostFilesUploadInitParams struct { + // Username User used for setting the owner, or resolving relative paths. 
+ Username User `form:"username,omitempty" json:"username,omitempty"` +} + +// PutFilesUploadUploadIdParams defines parameters for PutFilesUploadUploadId. +type PutFilesUploadUploadIdParams struct { + // Part The part number (0-indexed) + Part int `form:"part" json:"part"` +} + // PostInitJSONBody defines parameters for PostInit. type PostInitJSONBody struct { // AccessToken Access token for secure access to envd service @@ -165,5 +222,8 @@ type PostInitJSONBody struct { // PostFilesMultipartRequestBody defines body for PostFiles for multipart/form-data ContentType. type PostFilesMultipartRequestBody PostFilesMultipartBody +// PostFilesUploadInitJSONRequestBody defines body for PostFilesUploadInit for application/json ContentType. +type PostFilesUploadInitJSONRequestBody PostFilesUploadInitJSONBody + // PostInitJSONRequestBody defines body for PostInit for application/json ContentType. type PostInitJSONRequestBody PostInitJSONBody diff --git a/tests/integration/internal/envd/generated.go b/tests/integration/internal/envd/generated.go index c0461908a3..4a769929f5 100644 --- a/tests/integration/internal/envd/generated.go +++ b/tests/integration/internal/envd/generated.go @@ -78,6 +78,30 @@ type Metrics struct { Ts *int64 `json:"ts,omitempty"` } +// MultipartUploadComplete defines model for MultipartUploadComplete. +type MultipartUploadComplete struct { + // Path Path to the final assembled file + Path string `json:"path"` + + // Size Total size of the assembled file in bytes + Size int64 `json:"size"` +} + +// MultipartUploadInit defines model for MultipartUploadInit. +type MultipartUploadInit struct { + // UploadId Unique identifier for the upload session + UploadId string `json:"uploadId"` +} + +// MultipartUploadPart defines model for MultipartUploadPart. 
+type MultipartUploadPart struct { + // PartNumber The part number that was uploaded + PartNumber int `json:"partNumber"` + + // Size Size of the uploaded part in bytes + Size int64 `json:"size"` +} + // VolumeMount Volume type VolumeMount struct { NfsTarget string `json:"nfs_target"` @@ -96,6 +120,9 @@ type SignatureExpiration = int // User defines model for User. type User = string +// Conflict defines model for Conflict. +type Conflict = Error + // FileNotFound defines model for FileNotFound. type FileNotFound = Error @@ -111,6 +138,12 @@ type InvalidUser = Error // NotEnoughDiskSpace defines model for NotEnoughDiskSpace. type NotEnoughDiskSpace = Error +// TooManyRequests defines model for TooManyRequests. +type TooManyRequests = Error + +// UploadNotFound defines model for UploadNotFound. +type UploadNotFound = Error + // UploadSuccess defines model for UploadSuccess. type UploadSuccess = []EntryInfo @@ -149,6 +182,30 @@ type PostFilesParams struct { SignatureExpiration *SignatureExpiration `form:"signature_expiration,omitempty" json:"signature_expiration,omitempty"` } +// PostFilesUploadInitJSONBody defines parameters for PostFilesUploadInit. +type PostFilesUploadInitJSONBody struct { + // PartSize Size of each part in bytes (last part may be smaller) + PartSize int64 `json:"partSize"` + + // Path Path to the file to upload + Path string `json:"path"` + + // TotalSize Total size of the file in bytes + TotalSize int64 `json:"totalSize"` +} + +// PostFilesUploadInitParams defines parameters for PostFilesUploadInit. +type PostFilesUploadInitParams struct { + // Username User used for setting the owner, or resolving relative paths. + Username *User `form:"username,omitempty" json:"username,omitempty"` +} + +// PutFilesUploadUploadIdParams defines parameters for PutFilesUploadUploadId. +type PutFilesUploadUploadIdParams struct { + // Part The part number (0-indexed) + Part int `form:"part" json:"part"` +} + // PostInitJSONBody defines parameters for PostInit. 
type PostInitJSONBody struct { // AccessToken Access token for secure access to envd service @@ -174,6 +231,9 @@ type PostInitJSONBody struct { // PostFilesMultipartRequestBody defines body for PostFiles for multipart/form-data ContentType. type PostFilesMultipartRequestBody PostFilesMultipartBody +// PostFilesUploadInitJSONRequestBody defines body for PostFilesUploadInit for application/json ContentType. +type PostFilesUploadInitJSONRequestBody PostFilesUploadInitJSONBody + // PostInitJSONRequestBody defines body for PostInit for application/json ContentType. type PostInitJSONRequestBody PostInitJSONBody @@ -259,6 +319,20 @@ type ClientInterface interface { // PostFilesWithBody request with any body PostFilesWithBody(ctx context.Context, params *PostFilesParams, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) + // PostFilesUploadInitWithBody request with any body + PostFilesUploadInitWithBody(ctx context.Context, params *PostFilesUploadInitParams, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) + + PostFilesUploadInit(ctx context.Context, params *PostFilesUploadInitParams, body PostFilesUploadInitJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) + + // DeleteFilesUploadUploadId request + DeleteFilesUploadUploadId(ctx context.Context, uploadId string, reqEditors ...RequestEditorFn) (*http.Response, error) + + // PutFilesUploadUploadIdWithBody request with any body + PutFilesUploadUploadIdWithBody(ctx context.Context, uploadId string, params *PutFilesUploadUploadIdParams, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) + + // PostFilesUploadUploadIdComplete request + PostFilesUploadUploadIdComplete(ctx context.Context, uploadId string, reqEditors ...RequestEditorFn) (*http.Response, error) + // GetHealth request GetHealth(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) @@ -307,6 +381,66 @@ 
func (c *Client) PostFilesWithBody(ctx context.Context, params *PostFilesParams, return c.Client.Do(req) } +func (c *Client) PostFilesUploadInitWithBody(ctx context.Context, params *PostFilesUploadInitParams, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewPostFilesUploadInitRequestWithBody(c.Server, params, contentType, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) PostFilesUploadInit(ctx context.Context, params *PostFilesUploadInitParams, body PostFilesUploadInitJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewPostFilesUploadInitRequest(c.Server, params, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) DeleteFilesUploadUploadId(ctx context.Context, uploadId string, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewDeleteFilesUploadUploadIdRequest(c.Server, uploadId) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) PutFilesUploadUploadIdWithBody(ctx context.Context, uploadId string, params *PutFilesUploadUploadIdParams, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewPutFilesUploadUploadIdRequestWithBody(c.Server, uploadId, params, contentType, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) PostFilesUploadUploadIdComplete(ctx context.Context, uploadId 
string, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewPostFilesUploadUploadIdCompleteRequest(c.Server, uploadId) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + func (c *Client) GetHealth(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) { req, err := NewGetHealthRequest(c.Server) if err != nil { @@ -578,6 +712,190 @@ func NewPostFilesRequestWithBody(server string, params *PostFilesParams, content return req, nil } +// NewPostFilesUploadInitRequest calls the generic PostFilesUploadInit builder with application/json body +func NewPostFilesUploadInitRequest(server string, params *PostFilesUploadInitParams, body PostFilesUploadInitJSONRequestBody) (*http.Request, error) { + var bodyReader io.Reader + buf, err := json.Marshal(body) + if err != nil { + return nil, err + } + bodyReader = bytes.NewReader(buf) + return NewPostFilesUploadInitRequestWithBody(server, params, "application/json", bodyReader) +} + +// NewPostFilesUploadInitRequestWithBody generates requests for PostFilesUploadInit with any type of body +func NewPostFilesUploadInitRequestWithBody(server string, params *PostFilesUploadInitParams, contentType string, body io.Reader) (*http.Request, error) { + var err error + + serverURL, err := url.Parse(server) + if err != nil { + return nil, err + } + + operationPath := fmt.Sprintf("/files/upload/init") + if operationPath[0] == '/' { + operationPath = "." 
+ operationPath + } + + queryURL, err := serverURL.Parse(operationPath) + if err != nil { + return nil, err + } + + if params != nil { + queryValues := queryURL.Query() + + if params.Username != nil { + + if queryFrag, err := runtime.StyleParamWithLocation("form", true, "username", runtime.ParamLocationQuery, *params.Username); err != nil { + return nil, err + } else if parsed, err := url.ParseQuery(queryFrag); err != nil { + return nil, err + } else { + for k, v := range parsed { + for _, v2 := range v { + queryValues.Add(k, v2) + } + } + } + + } + + queryURL.RawQuery = queryValues.Encode() + } + + req, err := http.NewRequest("POST", queryURL.String(), body) + if err != nil { + return nil, err + } + + req.Header.Add("Content-Type", contentType) + + return req, nil +} + +// NewDeleteFilesUploadUploadIdRequest generates requests for DeleteFilesUploadUploadId +func NewDeleteFilesUploadUploadIdRequest(server string, uploadId string) (*http.Request, error) { + var err error + + var pathParam0 string + + pathParam0, err = runtime.StyleParamWithLocation("simple", false, "uploadId", runtime.ParamLocationPath, uploadId) + if err != nil { + return nil, err + } + + serverURL, err := url.Parse(server) + if err != nil { + return nil, err + } + + operationPath := fmt.Sprintf("/files/upload/%s", pathParam0) + if operationPath[0] == '/' { + operationPath = "." 
+ operationPath + } + + queryURL, err := serverURL.Parse(operationPath) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("DELETE", queryURL.String(), nil) + if err != nil { + return nil, err + } + + return req, nil +} + +// NewPutFilesUploadUploadIdRequestWithBody generates requests for PutFilesUploadUploadId with any type of body +func NewPutFilesUploadUploadIdRequestWithBody(server string, uploadId string, params *PutFilesUploadUploadIdParams, contentType string, body io.Reader) (*http.Request, error) { + var err error + + var pathParam0 string + + pathParam0, err = runtime.StyleParamWithLocation("simple", false, "uploadId", runtime.ParamLocationPath, uploadId) + if err != nil { + return nil, err + } + + serverURL, err := url.Parse(server) + if err != nil { + return nil, err + } + + operationPath := fmt.Sprintf("/files/upload/%s", pathParam0) + if operationPath[0] == '/' { + operationPath = "." + operationPath + } + + queryURL, err := serverURL.Parse(operationPath) + if err != nil { + return nil, err + } + + if params != nil { + queryValues := queryURL.Query() + + if queryFrag, err := runtime.StyleParamWithLocation("form", true, "part", runtime.ParamLocationQuery, params.Part); err != nil { + return nil, err + } else if parsed, err := url.ParseQuery(queryFrag); err != nil { + return nil, err + } else { + for k, v := range parsed { + for _, v2 := range v { + queryValues.Add(k, v2) + } + } + } + + queryURL.RawQuery = queryValues.Encode() + } + + req, err := http.NewRequest("PUT", queryURL.String(), body) + if err != nil { + return nil, err + } + + req.Header.Add("Content-Type", contentType) + + return req, nil +} + +// NewPostFilesUploadUploadIdCompleteRequest generates requests for PostFilesUploadUploadIdComplete +func NewPostFilesUploadUploadIdCompleteRequest(server string, uploadId string) (*http.Request, error) { + var err error + + var pathParam0 string + + pathParam0, err = runtime.StyleParamWithLocation("simple", false, "uploadId", 
runtime.ParamLocationPath, uploadId) + if err != nil { + return nil, err + } + + serverURL, err := url.Parse(server) + if err != nil { + return nil, err + } + + operationPath := fmt.Sprintf("/files/upload/%s/complete", pathParam0) + if operationPath[0] == '/' { + operationPath = "." + operationPath + } + + queryURL, err := serverURL.Parse(operationPath) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", queryURL.String(), nil) + if err != nil { + return nil, err + } + + return req, nil +} + // NewGetHealthRequest generates requests for GetHealth func NewGetHealthRequest(server string) (*http.Request, error) { var err error @@ -724,6 +1042,20 @@ type ClientWithResponsesInterface interface { // PostFilesWithBodyWithResponse request with any body PostFilesWithBodyWithResponse(ctx context.Context, params *PostFilesParams, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*PostFilesResponse, error) + // PostFilesUploadInitWithBodyWithResponse request with any body + PostFilesUploadInitWithBodyWithResponse(ctx context.Context, params *PostFilesUploadInitParams, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*PostFilesUploadInitResponse, error) + + PostFilesUploadInitWithResponse(ctx context.Context, params *PostFilesUploadInitParams, body PostFilesUploadInitJSONRequestBody, reqEditors ...RequestEditorFn) (*PostFilesUploadInitResponse, error) + + // DeleteFilesUploadUploadIdWithResponse request + DeleteFilesUploadUploadIdWithResponse(ctx context.Context, uploadId string, reqEditors ...RequestEditorFn) (*DeleteFilesUploadUploadIdResponse, error) + + // PutFilesUploadUploadIdWithBodyWithResponse request with any body + PutFilesUploadUploadIdWithBodyWithResponse(ctx context.Context, uploadId string, params *PutFilesUploadUploadIdParams, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*PutFilesUploadUploadIdResponse, error) + + // PostFilesUploadUploadIdCompleteWithResponse request + 
PostFilesUploadUploadIdCompleteWithResponse(ctx context.Context, uploadId string, reqEditors ...RequestEditorFn) (*PostFilesUploadUploadIdCompleteResponse, error) + // GetHealthWithResponse request GetHealthWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*GetHealthResponse, error) @@ -809,13 +1141,20 @@ func (r PostFilesResponse) StatusCode() int { return 0 } -type GetHealthResponse struct { +type PostFilesUploadInitResponse struct { Body []byte HTTPResponse *http.Response + JSON200 *MultipartUploadInit + JSON400 *InvalidPath + JSON401 *InvalidUser + JSON409 *Conflict + JSON429 *TooManyRequests + JSON500 *InternalServerError + JSON507 *NotEnoughDiskSpace } // Status returns HTTPResponse.Status -func (r GetHealthResponse) Status() string { +func (r PostFilesUploadInitResponse) Status() string { if r.HTTPResponse != nil { return r.HTTPResponse.Status } @@ -823,20 +1162,23 @@ func (r GetHealthResponse) Status() string { } // StatusCode returns HTTPResponse.StatusCode -func (r GetHealthResponse) StatusCode() int { +func (r PostFilesUploadInitResponse) StatusCode() int { if r.HTTPResponse != nil { return r.HTTPResponse.StatusCode } return 0 } -type PostInitResponse struct { +type DeleteFilesUploadUploadIdResponse struct { Body []byte HTTPResponse *http.Response + JSON404 *UploadNotFound + JSON409 *Conflict + JSON500 *InternalServerError } // Status returns HTTPResponse.Status -func (r PostInitResponse) Status() string { +func (r DeleteFilesUploadUploadIdResponse) Status() string { if r.HTTPResponse != nil { return r.HTTPResponse.Status } @@ -844,21 +1186,26 @@ func (r PostInitResponse) Status() string { } // StatusCode returns HTTPResponse.StatusCode -func (r PostInitResponse) StatusCode() int { +func (r DeleteFilesUploadUploadIdResponse) StatusCode() int { if r.HTTPResponse != nil { return r.HTTPResponse.StatusCode } return 0 } -type GetMetricsResponse struct { +type PutFilesUploadUploadIdResponse struct { Body []byte HTTPResponse *http.Response - 
JSON200 *Metrics + JSON200 *MultipartUploadPart + JSON400 *InvalidPath + JSON404 *UploadNotFound + JSON409 *Conflict + JSON500 *InternalServerError + JSON507 *NotEnoughDiskSpace } // Status returns HTTPResponse.Status -func (r GetMetricsResponse) Status() string { +func (r PutFilesUploadUploadIdResponse) Status() string { if r.HTTPResponse != nil { return r.HTTPResponse.Status } @@ -866,7 +1213,97 @@ func (r GetMetricsResponse) Status() string { } // StatusCode returns HTTPResponse.StatusCode -func (r GetMetricsResponse) StatusCode() int { +func (r PutFilesUploadUploadIdResponse) StatusCode() int { + if r.HTTPResponse != nil { + return r.HTTPResponse.StatusCode + } + return 0 +} + +type PostFilesUploadUploadIdCompleteResponse struct { + Body []byte + HTTPResponse *http.Response + JSON200 *MultipartUploadComplete + JSON400 *InvalidPath + JSON404 *UploadNotFound + JSON409 *Conflict + JSON500 *InternalServerError +} + +// Status returns HTTPResponse.Status +func (r PostFilesUploadUploadIdCompleteResponse) Status() string { + if r.HTTPResponse != nil { + return r.HTTPResponse.Status + } + return http.StatusText(0) +} + +// StatusCode returns HTTPResponse.StatusCode +func (r PostFilesUploadUploadIdCompleteResponse) StatusCode() int { + if r.HTTPResponse != nil { + return r.HTTPResponse.StatusCode + } + return 0 +} + +type GetHealthResponse struct { + Body []byte + HTTPResponse *http.Response +} + +// Status returns HTTPResponse.Status +func (r GetHealthResponse) Status() string { + if r.HTTPResponse != nil { + return r.HTTPResponse.Status + } + return http.StatusText(0) +} + +// StatusCode returns HTTPResponse.StatusCode +func (r GetHealthResponse) StatusCode() int { + if r.HTTPResponse != nil { + return r.HTTPResponse.StatusCode + } + return 0 +} + +type PostInitResponse struct { + Body []byte + HTTPResponse *http.Response +} + +// Status returns HTTPResponse.Status +func (r PostInitResponse) Status() string { + if r.HTTPResponse != nil { + return r.HTTPResponse.Status 
+ } + return http.StatusText(0) +} + +// StatusCode returns HTTPResponse.StatusCode +func (r PostInitResponse) StatusCode() int { + if r.HTTPResponse != nil { + return r.HTTPResponse.StatusCode + } + return 0 +} + +type GetMetricsResponse struct { + Body []byte + HTTPResponse *http.Response + JSON200 *Metrics +} + +// Status returns HTTPResponse.Status +func (r GetMetricsResponse) Status() string { + if r.HTTPResponse != nil { + return r.HTTPResponse.Status + } + return http.StatusText(0) +} + +// StatusCode returns HTTPResponse.StatusCode +func (r GetMetricsResponse) StatusCode() int { if r.HTTPResponse != nil { return r.HTTPResponse.StatusCode } @@ -900,6 +1337,50 @@ func (c *ClientWithResponses) PostFilesWithBodyWithResponse(ctx context.Context, return ParsePostFilesResponse(rsp) } +// PostFilesUploadInitWithBodyWithResponse request with arbitrary body returning *PostFilesUploadInitResponse +func (c *ClientWithResponses) PostFilesUploadInitWithBodyWithResponse(ctx context.Context, params *PostFilesUploadInitParams, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*PostFilesUploadInitResponse, error) { + rsp, err := c.PostFilesUploadInitWithBody(ctx, params, contentType, body, reqEditors...) + if err != nil { + return nil, err + } + return ParsePostFilesUploadInitResponse(rsp) +} + +func (c *ClientWithResponses) PostFilesUploadInitWithResponse(ctx context.Context, params *PostFilesUploadInitParams, body PostFilesUploadInitJSONRequestBody, reqEditors ...RequestEditorFn) (*PostFilesUploadInitResponse, error) { + rsp, err := c.PostFilesUploadInit(ctx, params, body, reqEditors...) 
+ if err != nil { + return nil, err + } + return ParsePostFilesUploadInitResponse(rsp) +} + +// DeleteFilesUploadUploadIdWithResponse request returning *DeleteFilesUploadUploadIdResponse +func (c *ClientWithResponses) DeleteFilesUploadUploadIdWithResponse(ctx context.Context, uploadId string, reqEditors ...RequestEditorFn) (*DeleteFilesUploadUploadIdResponse, error) { + rsp, err := c.DeleteFilesUploadUploadId(ctx, uploadId, reqEditors...) + if err != nil { + return nil, err + } + return ParseDeleteFilesUploadUploadIdResponse(rsp) +} + +// PutFilesUploadUploadIdWithBodyWithResponse request with arbitrary body returning *PutFilesUploadUploadIdResponse +func (c *ClientWithResponses) PutFilesUploadUploadIdWithBodyWithResponse(ctx context.Context, uploadId string, params *PutFilesUploadUploadIdParams, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*PutFilesUploadUploadIdResponse, error) { + rsp, err := c.PutFilesUploadUploadIdWithBody(ctx, uploadId, params, contentType, body, reqEditors...) + if err != nil { + return nil, err + } + return ParsePutFilesUploadUploadIdResponse(rsp) +} + +// PostFilesUploadUploadIdCompleteWithResponse request returning *PostFilesUploadUploadIdCompleteResponse +func (c *ClientWithResponses) PostFilesUploadUploadIdCompleteWithResponse(ctx context.Context, uploadId string, reqEditors ...RequestEditorFn) (*PostFilesUploadUploadIdCompleteResponse, error) { + rsp, err := c.PostFilesUploadUploadIdComplete(ctx, uploadId, reqEditors...) + if err != nil { + return nil, err + } + return ParsePostFilesUploadUploadIdCompleteResponse(rsp) +} + // GetHealthWithResponse request returning *GetHealthResponse func (c *ClientWithResponses) GetHealthWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*GetHealthResponse, error) { rsp, err := c.GetHealth(ctx, reqEditors...) 
@@ -1062,6 +1543,229 @@ func ParsePostFilesResponse(rsp *http.Response) (*PostFilesResponse, error) { return response, nil } +// ParsePostFilesUploadInitResponse parses an HTTP response from a PostFilesUploadInitWithResponse call +func ParsePostFilesUploadInitResponse(rsp *http.Response) (*PostFilesUploadInitResponse, error) { + bodyBytes, err := io.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &PostFilesUploadInitResponse{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + switch { + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200: + var dest MultipartUploadInit + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON200 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 400: + var dest InvalidPath + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON400 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 401: + var dest InvalidUser + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON401 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 409: + var dest Conflict + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON409 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 429: + var dest TooManyRequests + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON429 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 500: + var dest InternalServerError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON500 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 507: 
+ var dest NotEnoughDiskSpace + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON507 = &dest + + } + + return response, nil +} + +// ParseDeleteFilesUploadUploadIdResponse parses an HTTP response from a DeleteFilesUploadUploadIdWithResponse call +func ParseDeleteFilesUploadUploadIdResponse(rsp *http.Response) (*DeleteFilesUploadUploadIdResponse, error) { + bodyBytes, err := io.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &DeleteFilesUploadUploadIdResponse{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + switch { + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 404: + var dest UploadNotFound + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON404 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 409: + var dest Conflict + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON409 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 500: + var dest InternalServerError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON500 = &dest + + } + + return response, nil +} + +// ParsePutFilesUploadUploadIdResponse parses an HTTP response from a PutFilesUploadUploadIdWithResponse call +func ParsePutFilesUploadUploadIdResponse(rsp *http.Response) (*PutFilesUploadUploadIdResponse, error) { + bodyBytes, err := io.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &PutFilesUploadUploadIdResponse{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + switch { + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200: + var dest MultipartUploadPart + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + 
response.JSON200 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 400: + var dest InvalidPath + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON400 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 404: + var dest UploadNotFound + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON404 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 409: + var dest Conflict + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON409 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 500: + var dest InternalServerError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON500 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 507: + var dest NotEnoughDiskSpace + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON507 = &dest + + } + + return response, nil +} + +// ParsePostFilesUploadUploadIdCompleteResponse parses an HTTP response from a PostFilesUploadUploadIdCompleteWithResponse call +func ParsePostFilesUploadUploadIdCompleteResponse(rsp *http.Response) (*PostFilesUploadUploadIdCompleteResponse, error) { + bodyBytes, err := io.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &PostFilesUploadUploadIdCompleteResponse{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + switch { + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200: + var dest MultipartUploadComplete + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON200 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode 
== 400: + var dest InvalidPath + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON400 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 404: + var dest UploadNotFound + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON404 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 409: + var dest Conflict + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON409 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 500: + var dest InternalServerError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON500 = &dest + + } + + return response, nil +} + // ParseGetHealthResponse parses an HTTP response from a GetHealthWithResponse call func ParseGetHealthResponse(rsp *http.Response) (*GetHealthResponse, error) { bodyBytes, err := io.ReadAll(rsp.Body)