diff --git a/chunk.go b/chunk.go index c18ca0c..34a63e5 100644 --- a/chunk.go +++ b/chunk.go @@ -33,8 +33,10 @@ func NewChunkWithID(id ChunkID, b []byte, skipVerify bool) (*Chunk, error) { c.idCalculated = true // Pretend this was calculated. No need to re-calc later return c, nil } - sum := c.ID() - if sum != id { + if _, err := c.Data(); err != nil { + return nil, ChunkInvalid{ID: id, Err: err} + } + if sum := c.ID(); sum != id { // ID() reuses the now-cached plain data return nil, ChunkInvalid{ID: id, Sum: sum} } return c, nil @@ -49,8 +51,10 @@ func NewChunkFromStorage(id ChunkID, b []byte, modifiers Converters, skipVerify c.idCalculated = true // Pretend this was calculated. No need to re-calc later return c, nil } - sum := c.ID() - if sum != id { + if _, err := c.Data(); err != nil { // e.g. failed to decompress (truncated/corrupt storage data) + return nil, ChunkInvalid{ID: id, Err: err} + } + if sum := c.ID(); sum != id { // ID() reuses the now-cached plain data return nil, ChunkInvalid{ID: id, Sum: sum} } return c, nil diff --git a/chunk_test.go b/chunk_test.go new file mode 100644 index 0000000..c2f21af --- /dev/null +++ b/chunk_test.go @@ -0,0 +1,67 @@ +package desync + +import ( + "errors" + "testing" +) + +func TestNewChunkFromStorage(t *testing.T) { + conv := Converters{Compressor{}} + plain := []byte("the quick brown fox jumps over the lazy dog") + id := Digest.Sum(plain) + storage, err := conv.toStorage(plain) + if err != nil { + t.Fatal(err) + } + + t.Run("valid", func(t *testing.T) { + c, err := NewChunkFromStorage(id, storage, conv, false) + if err != nil { + t.Fatal(err) + } + if c.ID() != id { + t.Fatalf("got id %s, want %s", c.ID(), id) + } + }) + + t.Run("undecodable storage data", func(t *testing.T) { + // Truncated/garbage storage data fails to decompress. This must surface as + // ChunkInvalid (so RepairableCache and 'verify --repair' still handle it) but + // must also carry the underlying decode error rather than reporting a bogus + // "does not match its hash 0000..." mismatch. + _, err := NewChunkFromStorage(id, []byte("not a valid zstd stream"), conv, false) + var ci ChunkInvalid + if !errors.As(err, &ci) { + t.Fatalf("expected ChunkInvalid, got %T: %v", err, err) + } + if ci.Err == nil { + t.Fatal("expected ChunkInvalid.Err to carry the underlying decode error") + } + if errors.Unwrap(err) == nil { + t.Fatal("expected the error to unwrap to the underlying decode error") + } + }) + + t.Run("hash mismatch", func(t *testing.T) { + var wrongID ChunkID + wrongID[0] = 0x01 + _, err := NewChunkFromStorage(wrongID, storage, conv, false) + var ci ChunkInvalid + if !errors.As(err, &ci) { + t.Fatalf("expected ChunkInvalid, got %T: %v", err, err) + } + if ci.Err != nil { + t.Fatalf("expected ChunkInvalid.Err to be nil for a plain hash mismatch, got %v", ci.Err) + } + if ci.Sum != id { + t.Fatalf("got sum %s, want %s", ci.Sum, id) + } + }) + + t.Run("skip verify", func(t *testing.T) { + // With skipVerify even undecodable data is accepted (no verification done). + if _, err := NewChunkFromStorage(id, []byte("garbage"), conv, true); err != nil { + t.Fatal(err) + } + }) +} diff --git a/errors.go b/errors.go index 45425c6..51b1596 100644 --- a/errors.go +++ b/errors.go @@ -20,16 +20,23 @@ func (e NoSuchObject) Error() string { return fmt.Sprintf("object %s missing from store", e.location) } -// ChunkInvalid means the hash of the chunk content doesn't match its ID +// ChunkInvalid means the hash of the chunk content doesn't match its ID, or the +// chunk could not be validated at all (for example because it failed to decompress). type ChunkInvalid struct { ID ChunkID Sum ChunkID + Err error // underlying cause when validation couldn't be completed; nil for a plain hash mismatch } func (e ChunkInvalid) Error() string { + if e.Err != nil { + return fmt.Sprintf("chunk %s could not be validated: %v", e.ID.String(), e.Err) + } return fmt.Sprintf("chunk id %s does not match its hash %s", e.ID.String(), e.Sum.String()) } +func (e ChunkInvalid) Unwrap() error { return e.Err } + // InvalidFormat is returned when an error occurred when parsing an archive file type InvalidFormat struct { Msg string diff --git a/s3.go b/s3.go index f4eae34..404b436 100644 --- a/s3.go +++ b/s3.go @@ -12,6 +12,7 @@ import ( minio "github.com/minio/minio-go/v6" "github.com/minio/minio-go/v6/pkg/credentials" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) var _ WriteStore = S3Store{} @@ -106,6 +107,7 @@ retry: b, err := io.ReadAll(obj) if err != nil { if attempt <= s.opt.ErrorRetry { + obj.Close() time.Sleep(time.Duration(attempt) * s.opt.ErrorRetryBaseInterval) goto retry } @@ -121,7 +123,25 @@ retry: } return nil, err } - return NewChunkFromStorage(id, b, s.converters, s.opt.SkipVerify) + + // A short read of the chunk body (e.g. flaky transport/endpoint) can leave us + // with truncated data that fails to decompress or hash. Treat that the same as + // other transient errors and retry under the --error-retry policy. + chunk, err := NewChunkFromStorage(id, b, s.converters, s.opt.SkipVerify) + if err != nil { + if attempt <= s.opt.ErrorRetry { + Log.WithFields(logrus.Fields{ + "chunk": id, + "object": name, + "attempt": attempt, + }).WithError(err).Info("chunk failed validation, retrying") + obj.Close() + time.Sleep(time.Duration(attempt) * s.opt.ErrorRetryBaseInterval) + goto retry + } + return nil, err + } + return chunk, nil } // StoreChunk adds a new chunk to the store diff --git a/s3_test.go b/s3_test.go index abee656..f940cd0 100644 --- a/s3_test.go +++ b/s3_test.go @@ -47,7 +47,16 @@ func response(request *http.Request, headers http.Header, statusCode int, body s } } -func sendObject(conn *net.TCPConn, request *http.Request, filePath string, sendRst bool) error { +// s3ErrMode describes how the fake S3 server should mishandle an object request. +type s3ErrMode int + +const ( + s3ErrNone s3ErrMode = iota // serve the object normally + s3ErrRST // send half the body, then force a TCP RST (transport error) + s3ErrCorruptBody // serve a complete, well-formed response whose body is truncated +) + +func sendObject(conn *net.TCPConn, request *http.Request, filePath string, mode s3ErrMode) error { file, err := os.Open(filePath) if err != nil { if os.IsNotExist(err) { @@ -70,19 +79,10 @@ func sendObject(conn *net.TCPConn, request *http.Request, filePath string, sendR headers := http.Header{} headers.Add("Last-Modified", stat.ModTime().Format(http.TimeFormat)) headers.Add("Content-Type", "application/octet-stream") - headers.Add("Content-Length", strconv.FormatInt(stat.Size(), 10)) - if !sendRst { - resp := http.Response{ - StatusCode: 200, - ProtoMajor: 1, - ProtoMinor: 0, - Request: request, - Body: file, - Header: headers, - } - resp.Write(conn) - } else { + switch mode { + case s3ErrRST: + headers.Add("Content-Length", strconv.FormatInt(stat.Size(), 10)) if _, err := io.WriteString(conn, "HTTP/1.0 200 OK\r\n"); err != nil { return err } @@ -103,11 +103,41 @@ func sendObject(conn *net.TCPConn, request *http.Request, filePath string, sendR if err := conn.Close(); err != nil { return err } + case s3ErrCorruptBody: + // Serve a complete, well-formed HTTP response whose Content-Length matches + // the bytes actually written, but only write the first half of the file. The + // client reads it cleanly (no transport error), but the truncated chunk data + // fails to decompress/validate - the scenario reported in issue #334. + half := stat.Size() / 2 + headers.Add("Content-Length", strconv.FormatInt(half, 10)) + if _, err := io.WriteString(conn, "HTTP/1.0 200 OK\r\n"); err != nil { + return err + } + if err := headers.Write(conn); err != nil { + return err + } + if _, err := io.WriteString(conn, "\r\n"); err != nil { + return err + } + if _, err := io.CopyN(conn, file, half); err != nil { + return err + } + default: + headers.Add("Content-Length", strconv.FormatInt(stat.Size(), 10)) + resp := http.Response{ + StatusCode: 200, + ProtoMajor: 1, + ProtoMinor: 0, + Request: request, + Body: file, + Header: headers, + } + resp.Write(conn) } return nil } -func handleGetObjectRequest(conn *net.TCPConn, bucket, store string, errorTimes *int, errorTimesLimit int) error { +func handleGetObjectRequest(conn *net.TCPConn, bucket, store string, errorMode s3ErrMode, errorTimes *int, errorTimesLimit int) error { defer conn.Close() objectGetMatcher := regexp.MustCompile(`^/` + bucket + `/(.+)$`) @@ -119,7 +149,11 @@ func handleGetObjectRequest(conn *net.TCPConn, bucket, store string, errorTimes matches := objectGetMatcher.FindStringSubmatch(request.URL.Path) if matches != nil { - err = sendObject(conn, request, store+"/"+matches[1], *errorTimes < errorTimesLimit) + mode := s3ErrNone + if *errorTimes < errorTimesLimit { + mode = errorMode + } + err = sendObject(conn, request, store+"/"+matches[1], mode) (*errorTimes)++ } else { resp := response(request, http.Header{}, 400, "") @@ -128,9 +162,10 @@ func handleGetObjectRequest(conn *net.TCPConn, bucket, store string, errorTimes return err } -// Run S3 server that can respond objects from `store` -// if `errorTimesLimit` > 0 server will send RST packet `errorTimesLimit` times after sending half of file -func getTcpS3Server(t *testing.T, group *errgroup.Group, ctx context.Context, bucket, store string, errorTimesLimit int) net.Listener { +// Run S3 server that can respond objects from `store`. The first `errorTimesLimit` +// object requests are mishandled according to `errorMode` (e.g. truncated body or +// forced TCP reset); subsequent requests are served normally. +func getTcpS3Server(t *testing.T, group *errgroup.Group, ctx context.Context, bucket, store string, errorMode s3ErrMode, errorTimesLimit int) net.Listener { var errorTimes int // using localhost + resolver let us work on hosts that support only ipv6 or only ipv4 ip, err := net.DefaultResolver.LookupIP(ctx, "ip", "localhost") @@ -161,7 +196,7 @@ func getTcpS3Server(t *testing.T, group *errgroup.Group, ctx context.Context, bu } return err } - err = handleGetObjectRequest(conn, bucket, store, &errorTimes, errorTimesLimit) + err = handleGetObjectRequest(conn, bucket, store, errorMode, &errorTimes, errorTimesLimit) if err != nil { return err } @@ -184,7 +219,7 @@ func TestS3StoreGetChunk(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) group, gCtx := errgroup.WithContext(ctx) - ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", 0) + ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", s3ErrNone, 0) group.Go(func() error { defer cancel() @@ -224,7 +259,7 @@ func TestS3StoreGetChunk(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) group, gCtx := errgroup.WithContext(ctx) - ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", 1) + ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", s3ErrRST, 1) group.Go(func() error { defer cancel() @@ -262,7 +297,96 @@ func TestS3StoreGetChunk(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) group, gCtx := errgroup.WithContext(ctx) - ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", 1) + ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", s3ErrRST, 1) + + group.Go(func() error { + defer cancel() + endpoint := url.URL{Scheme: "s3+http", Host: ln.Addr().String(), Path: "/" + bucket + "/blob1.store/"} + store, err := NewS3Store(&endpoint, credentials.New(&provider), location, StoreOptions{ErrorRetry: 1}, minio.BucketLookupAuto) + if err != nil { + return err + } + + c := make(chan error) + go func() { + chunk, err := store.GetChunk(chunkId) + if err != nil { + c <- err + } + if chunk.ID() != chunkId { + c <- fmt.Errorf("got chunk with id equal to %q, expected %q", chunk.ID(), chunkId) + } + c <- nil + }() + select { + case <-gCtx.Done(): + return nil + case err = <-c: + return err + } + }) + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) + + t.Run("corrupt_body_fail", func(t *testing.T) { + // Server returns a complete, well-formed response with a truncated body, so + // the chunk data fails to decompress/validate (issue #334). With no retries + // configured GetChunk() must return that error - and it must carry the + // underlying decode failure, not the bogus "hash 0000..." mismatch. + ctx, cancel := context.WithCancel(context.Background()) + group, gCtx := errgroup.WithContext(ctx) + + ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", s3ErrCorruptBody, 1) + + group.Go(func() error { + defer cancel() + endpoint := url.URL{Scheme: "s3+http", Host: ln.Addr().String(), Path: "/" + bucket + "/blob1.store/"} + store, err := NewS3Store(&endpoint, credentials.New(&provider), location, StoreOptions{}, minio.BucketLookupAuto) + if err != nil { + return err + } + + c := make(chan error) + go func() { + _, err := store.GetChunk(chunkId) + if err == nil { + c <- errors.New("expected GetChunk to fail on truncated chunk body") + return + } + var ci ChunkInvalid + if !errors.As(err, &ci) { + c <- fmt.Errorf("expected ChunkInvalid, got %T: %v", err, err) + return + } + if ci.Err == nil { + c <- fmt.Errorf("expected ChunkInvalid to carry the underlying decode error, got %v", err) + return + } + c <- nil + }() + select { + case <-gCtx.Done(): + return nil + case err = <-c: + return err + } + }) + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + }) + + t.Run("corrupt_body_recover", func(t *testing.T) { + // Same truncated-body scenario, but with retries enabled GetChunk() should + // retry the validation failure and succeed once a full response is served. + ctx, cancel := context.WithCancel(context.Background()) + group, gCtx := errgroup.WithContext(ctx) + + ln := getTcpS3Server(t, group, ctx, bucket, "cmd/desync/testdata", s3ErrCorruptBody, 1) group.Go(func() error { defer cancel() @@ -277,9 +401,11 @@ func TestS3StoreGetChunk(t *testing.T) { chunk, err := store.GetChunk(chunkId) if err != nil { c <- err + return } if chunk.ID() != chunkId { c <- fmt.Errorf("got chunk with id equal to %q, expected %q", chunk.ID(), chunkId) + return } c <- nil }()