Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ func NewChunkWithID(id ChunkID, b []byte, skipVerify bool) (*Chunk, error) {
c.idCalculated = true // Pretend this was calculated. No need to re-calc later
return c, nil
}
sum := c.ID()
if sum != id {
if _, err := c.Data(); err != nil {
return nil, ChunkInvalid{ID: id, Err: err}
}
if sum := c.ID(); sum != id { // ID() reuses the now-cached plain data
return nil, ChunkInvalid{ID: id, Sum: sum}
}
return c, nil
Expand All @@ -49,8 +51,10 @@ func NewChunkFromStorage(id ChunkID, b []byte, modifiers Converters, skipVerify
c.idCalculated = true // Pretend this was calculated. No need to re-calc later
return c, nil
}
sum := c.ID()
if sum != id {
if _, err := c.Data(); err != nil { // e.g. failed to decompress (truncated/corrupt storage data)
return nil, ChunkInvalid{ID: id, Err: err}
}
if sum := c.ID(); sum != id { // ID() reuses the now-cached plain data
return nil, ChunkInvalid{ID: id, Sum: sum}
}
return c, nil
Expand Down
67 changes: 67 additions & 0 deletions chunk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package desync

import (
"errors"
"testing"
)

func TestNewChunkFromStorage(t *testing.T) {
conv := Converters{Compressor{}}
plain := []byte("the quick brown fox jumps over the lazy dog")
id := Digest.Sum(plain)
storage, err := conv.toStorage(plain)
if err != nil {
t.Fatal(err)
}

t.Run("valid", func(t *testing.T) {
c, err := NewChunkFromStorage(id, storage, conv, false)
if err != nil {
t.Fatal(err)
}
if c.ID() != id {
t.Fatalf("got id %s, want %s", c.ID(), id)
}
})

t.Run("undecodable storage data", func(t *testing.T) {
// Truncated/garbage storage data fails to decompress. This must surface as
// ChunkInvalid (so RepairableCache and 'verify --repair' still handle it) but
// must also carry the underlying decode error rather than reporting a bogus
// "does not match its hash 0000..." mismatch.
_, err := NewChunkFromStorage(id, []byte("not a valid zstd stream"), conv, false)
var ci ChunkInvalid
if !errors.As(err, &ci) {
t.Fatalf("expected ChunkInvalid, got %T: %v", err, err)
}
if ci.Err == nil {
t.Fatal("expected ChunkInvalid.Err to carry the underlying decode error")
}
if errors.Unwrap(err) == nil {
t.Fatal("expected the error to unwrap to the underlying decode error")
}
})

t.Run("hash mismatch", func(t *testing.T) {
var wrongID ChunkID
wrongID[0] = 0x01
_, err := NewChunkFromStorage(wrongID, storage, conv, false)
var ci ChunkInvalid
if !errors.As(err, &ci) {
t.Fatalf("expected ChunkInvalid, got %T: %v", err, err)
}
if ci.Err != nil {
t.Fatalf("expected ChunkInvalid.Err to be nil for a plain hash mismatch, got %v", ci.Err)
}
if ci.Sum != id {
t.Fatalf("got sum %s, want %s", ci.Sum, id)
}
})

t.Run("skip verify", func(t *testing.T) {
// With skipVerify even undecodable data is accepted (no verification done).
if _, err := NewChunkFromStorage(id, []byte("garbage"), conv, true); err != nil {
t.Fatal(err)
}
})
}
9 changes: 8 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,23 @@ func (e NoSuchObject) Error() string {
return fmt.Sprintf("object %s missing from store", e.location)
}

// ChunkInvalid means the hash of the chunk content doesn't match its ID
// ChunkInvalid means the hash of the chunk content doesn't match its ID, or the
// chunk could not be validated at all (for example because it failed to decompress).
type ChunkInvalid struct {
ID ChunkID
Sum ChunkID
Err error // underlying cause when validation couldn't be completed; nil for a plain hash mismatch
}

func (e ChunkInvalid) Error() string {
if e.Err != nil {
return fmt.Sprintf("chunk %s could not be validated: %v", e.ID.String(), e.Err)
}
return fmt.Sprintf("chunk id %s does not match its hash %s", e.ID.String(), e.Sum.String())
}

func (e ChunkInvalid) Unwrap() error { return e.Err }

// InvalidFormat is returned when an error occurred when parsing an archive file
type InvalidFormat struct {
Msg string
Expand Down
22 changes: 21 additions & 1 deletion s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
minio "github.com/minio/minio-go/v6"
"github.com/minio/minio-go/v6/pkg/credentials"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

var _ WriteStore = S3Store{}
Expand Down Expand Up @@ -106,6 +107,7 @@ retry:
b, err := io.ReadAll(obj)
if err != nil {
if attempt <= s.opt.ErrorRetry {
obj.Close()
time.Sleep(time.Duration(attempt) * s.opt.ErrorRetryBaseInterval)
goto retry
}
Expand All @@ -121,7 +123,25 @@ retry:
}
return nil, err
}
return NewChunkFromStorage(id, b, s.converters, s.opt.SkipVerify)

// A short read of the chunk body (e.g. flaky transport/endpoint) can leave us
// with truncated data that fails to decompress or hash. Treat that the same as
// other transient errors and retry under the --error-retry policy.
chunk, err := NewChunkFromStorage(id, b, s.converters, s.opt.SkipVerify)
if err != nil {
if attempt <= s.opt.ErrorRetry {
Log.WithFields(logrus.Fields{
"chunk": id,
"object": name,
"attempt": attempt,
}).WithError(err).Info("chunk failed validation, retrying")
obj.Close()
time.Sleep(time.Duration(attempt) * s.opt.ErrorRetryBaseInterval)
goto retry
}
return nil, err
}
return chunk, nil
}

// StoreChunk adds a new chunk to the store
Expand Down
Loading
Loading