Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions packages/orchestrator/internal/sandbox/block/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ import (
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
)

type Chunker struct {
// Chunker is the interface satisfied by both FullFetchChunker and StreamingChunker.
type Chunker interface {
Slice(ctx context.Context, off, length int64) ([]byte, error)
ReadAt(ctx context.Context, b []byte, off int64) (int, error)
WriteTo(ctx context.Context, w io.Writer) (int64, error)
Close() error
FileSize() (int64, error)
}

type FullFetchChunker struct {
base storage.SeekableReader
cache *Cache
metrics metrics.Metrics
Expand All @@ -28,18 +37,18 @@ type Chunker struct {
fetchers *utils.WaitMap
}

func NewChunker(
func NewFullFetchChunker(
size, blockSize int64,
base storage.SeekableReader,
cachePath string,
metrics metrics.Metrics,
) (*Chunker, error) {
) (*FullFetchChunker, error) {
cache, err := NewCache(size, blockSize, cachePath, false)
if err != nil {
return nil, fmt.Errorf("failed to create file cache: %w", err)
}

chunker := &Chunker{
chunker := &FullFetchChunker{
size: size,
base: base,
cache: cache,
Expand All @@ -50,7 +59,7 @@ func NewChunker(
return chunker, nil
}

func (c *Chunker) ReadAt(ctx context.Context, b []byte, off int64) (int, error) {
func (c *FullFetchChunker) ReadAt(ctx context.Context, b []byte, off int64) (int, error) {
slice, err := c.Slice(ctx, off, int64(len(b)))
if err != nil {
return 0, fmt.Errorf("failed to slice cache at %d-%d: %w", off, off+int64(len(b)), err)
Expand All @@ -59,7 +68,7 @@ func (c *Chunker) ReadAt(ctx context.Context, b []byte, off int64) (int, error)
return copy(b, slice), nil
}

func (c *Chunker) WriteTo(ctx context.Context, w io.Writer) (int64, error) {
func (c *FullFetchChunker) WriteTo(ctx context.Context, w io.Writer) (int64, error) {
for i := int64(0); i < c.size; i += storage.MemoryChunkSize {
chunk := make([]byte, storage.MemoryChunkSize)

Expand All @@ -77,7 +86,7 @@ func (c *Chunker) WriteTo(ctx context.Context, w io.Writer) (int64, error) {
return c.size, nil
}

func (c *Chunker) Slice(ctx context.Context, off, length int64) ([]byte, error) {
func (c *FullFetchChunker) Slice(ctx context.Context, off, length int64) ([]byte, error) {
timer := c.metrics.SlicesTimerFactory.Begin()

b, err := c.cache.Slice(off, length)
Expand Down Expand Up @@ -121,7 +130,7 @@ func (c *Chunker) Slice(ctx context.Context, off, length int64) ([]byte, error)
}

// fetchToCache ensures that the data at the given offset and length is available in the cache.
func (c *Chunker) fetchToCache(ctx context.Context, off, length int64) error {
func (c *FullFetchChunker) fetchToCache(ctx context.Context, off, length int64) error {
var eg errgroup.Group

chunks := header.BlocksOffsets(length, storage.MemoryChunkSize)
Expand Down Expand Up @@ -194,11 +203,11 @@ func (c *Chunker) fetchToCache(ctx context.Context, off, length int64) error {
return nil
}

func (c *Chunker) Close() error {
func (c *FullFetchChunker) Close() error {
return c.cache.Close()
}

func (c *Chunker) FileSize() (int64, error) {
func (c *FullFetchChunker) FileSize() (int64, error) {
return c.cache.FileSize()
}

Expand Down
Loading
Loading