From adccc0f5739ac0393deec324373fda11b5d500ad Mon Sep 17 00:00:00 2001 From: folbrich Date: Sat, 9 May 2026 21:07:15 +0200 Subject: [PATCH] Retry S3 chunk validation failures and preserve the underlying error A transient short read of a chunk body from an S3-compatible store (e.g. Cloudflare R2) leaves S3Store.GetChunk with truncated data that fails to decompress. Chunk.ID() discarded that error and returned the zero ChunkID, so the failure surfaced as the misleading "chunk id does not match its hash 0000...0000", and GetChunk's --error-retry loop didn't cover it, making the transient failure fatal. - ChunkInvalid now carries an optional underlying cause (Err) and implements Unwrap(); its message reports the cause when present. - NewChunkFromStorage / NewChunkWithID capture the decode error instead of producing a bogus zero-hash mismatch, still returning ChunkInvalid so RepairableCache and 'verify --repair' keep handling such chunks. - S3Store.GetChunk retries NewChunkFromStorage failures under the existing --error-retry policy and closes the object reader before retrying. - Tests: new s3ErrCorruptBody fake-server mode plus corrupt_body_fail / corrupt_body_recover subtests, and a chunk_test.go covering ChunkInvalid.Err. Fixes #334 --- chunk.go | 12 ++-- chunk_test.go | 67 ++++++++++++++++++++ errors.go | 9 ++- s3.go | 22 ++++++- s3_test.go | 170 +++++++++++++++++++++++++++++++++++++++++++------- 5 files changed, 252 insertions(+), 28 deletions(-) create mode 100644 chunk_test.go diff --git a/chunk.go b/chunk.go index c18ca0c9..34a63e51 100644 --- a/chunk.go +++ b/chunk.go @@ -33,8 +33,10 @@ func NewChunkWithID(id ChunkID, b []byte, skipVerify bool) (*Chunk, error) { c.idCalculated = true // Pretend this was calculated. 
No need to re-calc later return c, nil } - sum := c.ID() - if sum != id { + if _, err := c.Data(); err != nil { + return nil, ChunkInvalid{ID: id, Err: err} + } + if sum := c.ID(); sum != id { // ID() reuses the now-cached plain data return nil, ChunkInvalid{ID: id, Sum: sum} } return c, nil @@ -49,8 +51,10 @@ func NewChunkFromStorage(id ChunkID, b []byte, modifiers Converters, skipVerify c.idCalculated = true // Pretend this was calculated. No need to re-calc later return c, nil } - sum := c.ID() - if sum != id { + if _, err := c.Data(); err != nil { // e.g. failed to decompress (truncated/corrupt storage data) + return nil, ChunkInvalid{ID: id, Err: err} + } + if sum := c.ID(); sum != id { // ID() reuses the now-cached plain data return nil, ChunkInvalid{ID: id, Sum: sum} } return c, nil diff --git a/chunk_test.go b/chunk_test.go new file mode 100644 index 00000000..c2f21afb --- /dev/null +++ b/chunk_test.go @@ -0,0 +1,67 @@ +package desync + +import ( + "errors" + "testing" +) + +func TestNewChunkFromStorage(t *testing.T) { + conv := Converters{Compressor{}} + plain := []byte("the quick brown fox jumps over the lazy dog") + id := Digest.Sum(plain) + storage, err := conv.toStorage(plain) + if err != nil { + t.Fatal(err) + } + + t.Run("valid", func(t *testing.T) { + c, err := NewChunkFromStorage(id, storage, conv, false) + if err != nil { + t.Fatal(err) + } + if c.ID() != id { + t.Fatalf("got id %s, want %s", c.ID(), id) + } + }) + + t.Run("undecodable storage data", func(t *testing.T) { + // Truncated/garbage storage data fails to decompress. This must surface as + // ChunkInvalid (so RepairableCache and 'verify --repair' still handle it) but + // must also carry the underlying decode error rather than reporting a bogus + // "does not match its hash 0000..." mismatch. 
+ _, err := NewChunkFromStorage(id, []byte("not a valid zstd stream"), conv, false) + var ci ChunkInvalid + if !errors.As(err, &ci) { + t.Fatalf("expected ChunkInvalid, got %T: %v", err, err) + } + if ci.Err == nil { + t.Fatal("expected ChunkInvalid.Err to carry the underlying decode error") + } + if errors.Unwrap(err) == nil { + t.Fatal("expected the error to unwrap to the underlying decode error") + } + }) + + t.Run("hash mismatch", func(t *testing.T) { + var wrongID ChunkID + wrongID[0] = 0x01 + _, err := NewChunkFromStorage(wrongID, storage, conv, false) + var ci ChunkInvalid + if !errors.As(err, &ci) { + t.Fatalf("expected ChunkInvalid, got %T: %v", err, err) + } + if ci.Err != nil { + t.Fatalf("expected ChunkInvalid.Err to be nil for a plain hash mismatch, got %v", ci.Err) + } + if ci.Sum != id { + t.Fatalf("got sum %s, want %s", ci.Sum, id) + } + }) + + t.Run("skip verify", func(t *testing.T) { + // With skipVerify even undecodable data is accepted (no verification done). + if _, err := NewChunkFromStorage(id, []byte("garbage"), conv, true); err != nil { + t.Fatal(err) + } + }) +} diff --git a/errors.go b/errors.go index 45425c6f..51b15963 100644 --- a/errors.go +++ b/errors.go @@ -20,16 +20,23 @@ func (e NoSuchObject) Error() string { return fmt.Sprintf("object %s missing from store", e.location) } -// ChunkInvalid means the hash of the chunk content doesn't match its ID +// ChunkInvalid means the hash of the chunk content doesn't match its ID, or the +// chunk could not be validated at all (for example because it failed to decompress). 
type ChunkInvalid struct { ID ChunkID Sum ChunkID + Err error // underlying cause when validation couldn't be completed; nil for a plain hash mismatch } func (e ChunkInvalid) Error() string { + if e.Err != nil { + return fmt.Sprintf("chunk %s could not be validated: %v", e.ID.String(), e.Err) + } return fmt.Sprintf("chunk id %s does not match its hash %s", e.ID.String(), e.Sum.String()) } +func (e ChunkInvalid) Unwrap() error { return e.Err } + // InvalidFormat is returned when an error occurred when parsing an archive file type InvalidFormat struct { Msg string diff --git a/s3.go b/s3.go index f4eae34c..404b4369 100644 --- a/s3.go +++ b/s3.go @@ -12,6 +12,7 @@ import ( minio "github.com/minio/minio-go/v6" "github.com/minio/minio-go/v6/pkg/credentials" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) var _ WriteStore = S3Store{} @@ -106,6 +107,7 @@ retry: b, err := io.ReadAll(obj) if err != nil { if attempt <= s.opt.ErrorRetry { + obj.Close() time.Sleep(time.Duration(attempt) * s.opt.ErrorRetryBaseInterval) goto retry } @@ -121,7 +123,25 @@ retry: } return nil, err } - return NewChunkFromStorage(id, b, s.converters, s.opt.SkipVerify) + + // A short read of the chunk body (e.g. flaky transport/endpoint) can leave us + // with truncated data that fails to decompress or hash. Treat that the same as + // other transient errors and retry under the --error-retry policy. 
+ chunk, err := NewChunkFromStorage(id, b, s.converters, s.opt.SkipVerify) + if err != nil { + if attempt <= s.opt.ErrorRetry { + Log.WithFields(logrus.Fields{ + "chunk": id, + "object": name, + "attempt": attempt, + }).WithError(err).Info("chunk failed validation, retrying") + obj.Close() + time.Sleep(time.Duration(attempt) * s.opt.ErrorRetryBaseInterval) + goto retry + } + return nil, err + } + return chunk, nil } // StoreChunk adds a new chunk to the store diff --git a/s3_test.go b/s3_test.go index abee656d..f940cd01 100644 --- a/s3_test.go +++ b/s3_test.go @@ -47,7 +47,16 @@ func response(request *http.Request, headers http.Header, statusCode int, body s } } -func sendObject(conn *net.TCPConn, request *http.Request, filePath string, sendRst bool) error { +// s3ErrMode describes how the fake S3 server should mishandle an object request. +type s3ErrMode int + +const ( + s3ErrNone s3ErrMode = iota // serve the object normally + s3ErrRST // send half the body, then force a TCP RST (transport error) + s3ErrCorruptBody // serve a complete, well-formed response whose body is truncated +) + +func sendObject(conn *net.TCPConn, request *http.Request, filePath string, mode s3ErrMode) error { file, err := os.Open(filePath) if err != nil { if os.IsNotExist(err) { @@ -70,19 +79,10 @@ func sendObject(conn *net.TCPConn, request *http.Request, filePath string, sendR headers := http.Header{} headers.Add("Last-Modified", stat.ModTime().Format(http.TimeFormat)) headers.Add("Content-Type", "application/octet-stream") - headers.Add("Content-Length", strconv.FormatInt(stat.Size(), 10)) - if !sendRst { - resp := http.Response{ - StatusCode: 200, - ProtoMajor: 1, - ProtoMinor: 0, - Request: request, - Body: file, - Header: headers, - } - resp.Write(conn) - } else { + switch mode { + case s3ErrRST: + headers.Add("Content-Length", strconv.FormatInt(stat.Size(), 10)) if _, err := io.WriteString(conn, "HTTP/1.0 200 OK\r\n"); err != nil { return err } @@ -103,11 +103,41 @@ func 
sendObject(conn *net.TCPConn, request *http.Request, filePath string, sendR if err := conn.Close(); err != nil { return err } + case s3ErrCorruptBody: + // Serve a complete, well-formed HTTP response whose Content-Length matches + // the bytes actually written, but only write the first half of the file. The + // client reads it cleanly (no transport error), but the truncated chunk data + // fails to decompress/validate - the scenario reported in issue #334. + half := stat.Size() / 2 + headers.Add("Content-Length", strconv.FormatInt(half, 10)) + if _, err := io.WriteString(conn, "HTTP/1.0 200 OK\r\n"); err != nil { + return err + } + if err := headers.Write(conn); err != nil { + return err + } + if _, err := io.WriteString(conn, "\r\n"); err != nil { + return err + } + if _, err := io.CopyN(conn, file, half); err != nil { + return err + } + default: + headers.Add("Content-Length", strconv.FormatInt(stat.Size(), 10)) + resp := http.Response{ + StatusCode: 200, + ProtoMajor: 1, + ProtoMinor: 0, + Request: request, + Body: file, + Header: headers, + } + resp.Write(conn) } return nil } -func handleGetObjectRequest(conn *net.TCPConn, bucket, store string, errorTimes *int, errorTimesLimit int) error { +func handleGetObjectRequest(conn *net.TCPConn, bucket, store string, errorMode s3ErrMode, errorTimes *int, errorTimesLimit int) error { defer conn.Close() objectGetMatcher := regexp.MustCompile(`^/` + bucket + `/(.+)$`) @@ -119,7 +149,11 @@ func handleGetObjectRequest(conn *net.TCPConn, bucket, store string, errorTimes matches := objectGetMatcher.FindStringSubmatch(request.URL.Path) if matches != nil { - err = sendObject(conn, request, store+"/"+matches[1], *errorTimes < errorTimesLimit) + mode := s3ErrNone + if *errorTimes < errorTimesLimit { + mode = errorMode + } + err = sendObject(conn, request, store+"/"+matches[1], mode) (*errorTimes)++ } else { resp := response(request, http.Header{}, 400, "") @@ -128,9 +162,10 @@ func handleGetObjectRequest(conn *net.TCPConn, 
bucket, store string, errorTimes return err } -// Run S3 server that can respond objects from `store` -// if `errorTimesLimit` > 0 server will send RST packet `errorTimesLimit` times after sending half of file -func getTcpS3Server(t *testing.T, group *errgroup.Group, ctx context.Context, bucket, store string, errorTimesLimit int) net.Listener { +// Run S3 server that can respond objects from `store`. The first `errorTimesLimit` +// object requests are mishandled according to `errorMode` (e.g. truncated body or +// forced TCP reset); subsequent requests are served normally. +func getTcpS3Server(t *testing.T, group *errgroup.Group, ctx context.Context, bucket, store string, errorMode s3ErrMode, errorTimesLimit int) net.Listener { var errorTimes int // using localhost + resolver let us work on hosts that support only ipv6 or only ipv4 ip, err := net.DefaultResolver.LookupIP(ctx, "ip", "localhost") @@ -161,7 +196,7 @@ func getTcpS3Server(t *testing.T, group *errgroup.Group, ctx context.Context, bu } return err } - err = handleGetObjectRequest(conn, bucket, store, &errorTimes, errorTimesLimit) + err = handleGetObjectRequest(conn, bucket, store, errorMode, &errorTimes, errorTimesLimit) if err != nil { return err } @@ -184,7 +219,7 @@ func TestS3StoreGetChunk(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) group, gCtx := errgroup.WithContext(ctx) - ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", 0) + ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", s3ErrNone, 0) group.Go(func() error { defer cancel() @@ -224,7 +259,7 @@ func TestS3StoreGetChunk(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) group, gCtx := errgroup.WithContext(ctx) - ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", 1) + ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", s3ErrRST, 1) group.Go(func() error { defer cancel() @@ -262,7 +297,96 @@ func TestS3StoreGetChunk(t *testing.T) { ctx, 
cancel := context.WithCancel(context.Background()) group, gCtx := errgroup.WithContext(ctx) - ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", 1) + ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", s3ErrRST, 1) + + group.Go(func() error { + defer cancel() + endpoint := url.URL{Scheme: "s3+http", Host: ln.Addr().String(), Path: "/" + bucket + "/blob1.store/"} + store, err := NewS3Store(&endpoint, credentials.New(&provider), location, StoreOptions{ErrorRetry: 1}, minio.BucketLookupAuto) + if err != nil { + return err + } + + c := make(chan error) + go func() { + // Send exactly one result: the parent reads c only once, and chunk is + // nil when GetChunk fails, so ID() must not be called on that path + // (a second send on the unbuffered channel would also block forever). + chunk, err := store.GetChunk(chunkId) + if err == nil && chunk.ID() != chunkId { + err = fmt.Errorf("got chunk with id equal to %q, expected %q", chunk.ID(), chunkId) + } + c <- err + }() + select { + case <-gCtx.Done(): + return nil + case err = <-c: + return err + } + }) + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) + + t.Run("corrupt_body_fail", func(t *testing.T) { + // Server returns a complete, well-formed response with a truncated body, so + // the chunk data fails to decompress/validate (issue #334). With no retries + // configured GetChunk() must return that error - and it must carry the + // underlying decode failure, not the bogus "hash 0000..." mismatch. 
+ ctx, cancel := context.WithCancel(context.Background()) + group, gCtx := errgroup.WithContext(ctx) + + ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", s3ErrCorruptBody, 1) + + group.Go(func() error { + defer cancel() + endpoint := url.URL{Scheme: "s3+http", Host: ln.Addr().String(), Path: "/" + bucket + "/blob1.store/"} + store, err := NewS3Store(&endpoint, credentials.New(&provider), location, StoreOptions{}, minio.BucketLookupAuto) + if err != nil { + return err + } + + c := make(chan error) + go func() { + _, err := store.GetChunk(chunkId) + if err == nil { + c <- errors.New("expected GetChunk to fail on truncated chunk body") + return + } + var ci ChunkInvalid + if !errors.As(err, &ci) { + c <- fmt.Errorf("expected ChunkInvalid, got %T: %v", err, err) + return + } + if ci.Err == nil { + c <- fmt.Errorf("expected ChunkInvalid to carry the underlying decode error, got %v", err) + return + } + c <- nil + }() + select { + case <-gCtx.Done(): + return nil + case err = <-c: + return err + } + }) + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) + + t.Run("corrupt_body_recover", func(t *testing.T) { + // Same truncated-body scenario, but with retries enabled GetChunk() should + // retry the validation failure and succeed once a full response is served. + ctx, cancel := context.WithCancel(context.Background()) + group, gCtx := errgroup.WithContext(ctx) + + ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", s3ErrCorruptBody, 1) group.Go(func() error { defer cancel() @@ -277,9 +401,11 @@ func TestS3StoreGetChunk(t *testing.T) { chunk, err := store.GetChunk(chunkId) if err != nil { c <- err + return } if chunk.ID() != chunkId { c <- fmt.Errorf("got chunk with id equal to %q, expected %q", chunk.ID(), chunkId) + return } c <- nil }()