Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 96 additions & 7 deletions blockdev.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package blockdev
import (
"fmt"
"io"
"time"

"github.com/codingminions/blockdev/internal/overlay"
"github.com/codingminions/blockdev/internal/validate"
)

Expand All @@ -16,22 +18,33 @@ const BlockSize = 4096
// after handoff; defensive copy would double peak memory when many sandboxes
// share one image.
type BlockDevice struct {
base []byte
length int64
base []byte
length int64
overlay *overlay.Store
cfg config
}

// Compile-time checks that *BlockDevice implements the io interfaces it is
// meant to be used through.
var (
	_ io.ReaderAt = (*BlockDevice)(nil)
	_ io.WriterAt = (*BlockDevice)(nil)
)

// New validates alignment up front rather than at first ReadAt — a misaligned
// base is a configuration bug, and we'd rather fail at construction than after
// a sandbox has been wired up.
func New(initial []byte) (*BlockDevice, error) {
func New(initial []byte, opts ...Option) (*BlockDevice, error) {
if err := validate.Alignment(0, len(initial)); err != nil {
return nil, fmt.Errorf("blockdev.New: len(initial)=%d: %w", len(initial), ErrMisaligned)
}
var cfg config
for _, opt := range opts {
opt(&cfg)
}
return &BlockDevice{
base: initial,
length: int64(len(initial)),
base: initial,
length: int64(len(initial)),
overlay: overlay.New(),
cfg: cfg,
}, nil
}

Expand All @@ -41,6 +54,26 @@ func New(initial []byte) (*BlockDevice, error) {
// On any failure returns (0, err) and leaves p untouched — partial reads
// could corrupt the guest filesystem before the kernel notices.
func (b *BlockDevice) ReadAt(p []byte, off int64) (int, error) {
	if b.cfg.observer == nil {
		// Unobserved path: no clock reads and no Event to build.
		return b.readAt(p, off)
	}

	began := time.Now()
	n, err := b.readAt(p, off)

	// The event describes the request (len(p)), not the bytes transferred,
	// and is emitted whether or not the read succeeded.
	ev := Event{
		Op:       OpRead,
		Device:   b.cfg.name,
		Offset:   off,
		Length:   len(p),
		Blocks:   len(p) / BlockSize,
		Duration: time.Since(began),
		Err:      err,
	}
	b.fireObserver(ev)
	return n, err
}

// readAt is the unobserved read path that ReadAt wraps. A request rejected by
// validate.Bounds yields (0, error wrapping ErrOutOfBounds); a zero-length
// request yields (0, nil); otherwise p is filled one BlockSize block at a time
// and len(p) is returned.
func (b *BlockDevice) readAt(p []byte, off int64) (int, error) {
if err := validate.Bounds(off, len(p), b.length); err != nil {
return 0, fmt.Errorf("blockdev.ReadAt off=%d len=%d device=%d: %w",
off, len(p), b.length, ErrOutOfBounds)
// NOTE(review): the diff hunk marker on the next line hides part of this
// function; the elided lines are not visible here and are not described.
Expand All @@ -52,6 +85,62 @@ func (b *BlockDevice) ReadAt(p []byte, off int64) (int, error) {
if len(p) == 0 {
return 0, nil
}
// NOTE(review): this whole-range copy from base is followed by a loop that
// rewrites every block of p — presumably the pre-change line of the diff;
// confirm it is absent from the merged file.
copy(p, b.base[off:off+int64(len(p))])
// For each block of the request, an overlay entry wins when present;
// otherwise the block is read from base.
for i := 0; i < len(p); i += BlockSize {
blockOff := off + int64(i)
blockNum := blockOff / BlockSize
if data, ok := b.overlay.Get(blockNum); ok {
copy(p[i:i+BlockSize], data)
} else {
copy(p[i:i+BlockSize], b.base[blockOff:blockOff+BlockSize])
}
}
return len(p), nil
}

// WriteAt stores p in the overlay at off and never touches the base image.
// The overlay keeps its own copy, so p may be reused as soon as WriteAt
// returns. When an observer is configured it receives one OpWrite Event per
// call, including calls that fail.
func (b *BlockDevice) WriteAt(p []byte, off int64) (int, error) {
	if b.cfg.observer == nil {
		// Unobserved path: no clock reads and no Event to build.
		return b.writeAt(p, off)
	}

	began := time.Now()
	n, err := b.writeAt(p, off)

	// Length and Blocks describe the request, not the bytes accepted.
	ev := Event{
		Op:       OpWrite,
		Device:   b.cfg.name,
		Offset:   off,
		Length:   len(p),
		Blocks:   len(p) / BlockSize,
		Duration: time.Since(began),
		Err:      err,
	}
	b.fireObserver(ev)
	return n, err
}

// writeAt is the unobserved write path behind WriteAt. It checks bounds, then
// alignment, and on success hands each BlockSize slice of p to the overlay
// under its block number. Failures return (0, err) wrapping ErrOutOfBounds or
// ErrMisaligned.
func (b *BlockDevice) writeAt(p []byte, off int64) (int, error) {
	size := len(p)
	if err := validate.Bounds(off, size, b.length); err != nil {
		return 0, fmt.Errorf("blockdev.WriteAt off=%d len=%d device=%d: %w",
			off, size, b.length, ErrOutOfBounds)
	}
	if err := validate.Alignment(off, size); err != nil {
		return 0, fmt.Errorf("blockdev.WriteAt off=%d len=%d: %w",
			off, size, ErrMisaligned)
	}

	// A zero-length write runs no iterations and falls through to (0, nil).
	for lo := 0; lo < size; lo += BlockSize {
		hi := lo + BlockSize
		blockNum := (off + int64(lo)) / BlockSize
		b.overlay.Put(blockNum, p[lo:hi])
	}
	return size, nil
}

// fireObserver delivers e to the configured observer. It is a separate
// function so that its deferred recover covers only the callback: a panic
// raised by the I/O path itself is never swallowed here.
func (b *BlockDevice) fireObserver(e Event) {
	defer func() {
		// Deliberately discarded: instrumentation must not take down I/O.
		_ = recover()
	}()
	b.cfg.observer(e)
}
48 changes: 48 additions & 0 deletions internal/overlay/overlay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Package overlay is the copy-on-write block store: reads expose the stored
// slice (callers must not mutate), Puts copy so callers can reuse their buffer.
package overlay

import "sync"

// Store maps a block number to the bytes most recently written for that
// block. All methods are safe for concurrent use. The zero value is an empty
// store ready for use; a Store must not be copied after first use because it
// contains a mutex.
type Store struct {
	mu sync.RWMutex
	m  map[int64][]byte // block number -> the store's private copy of the data
}

// New returns an empty Store.
func New() *Store {
	return &Store{m: make(map[int64][]byte)}
}

// Get returns the data stored for block and reports whether an entry exists.
//
// The returned slice aliases the store's own copy — callers must not mutate it.
func (s *Store) Get(block int64) ([]byte, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	data, ok := s.m[block]
	return data, ok
}

// Put records a copy of data as the contents of block, replacing any previous
// entry. Because the store keeps its own copy, the caller may reuse data as
// soon as Put returns.
func (s *Store) Put(block int64, data []byte) {
	// Copy before locking so the critical section is only the map write.
	buf := make([]byte, len(data))
	copy(buf, data)

	s.mu.Lock()
	defer s.mu.Unlock()
	if s.m == nil {
		// Zero-value Store: assigning into a nil map panics, so allocate lazily.
		s.m = make(map[int64][]byte)
	}
	s.m[block] = buf
}

// Range calls fn once per stored block, in unspecified order, until fn returns
// false or every entry has been visited.
//
// fn must not mutate data. The read lock is held for the whole iteration, so
// fn must not call Put on the same Store — that would deadlock.
func (s *Store) Range(fn func(block int64, data []byte) bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for block, data := range s.m {
		if !fn(block, data) {
			return
		}
	}
}

// Len returns the number of blocks currently stored.
func (s *Store) Len() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.m)
}
64 changes: 64 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package blockdev

import "time"

// Option is the functional-option type. Variadic in New keeps the constructor
// extensible without forcing a breaking change every time a knob is added.
type Option func(*config)

// config is the set of knobs an Option can adjust. New starts from the zero
// value, so an unset name is "" and an unset observer is nil.
type config struct {
// name is the device tag installed by WithName; it is copied into each
// Event's Device field.
name string
// observer is the callback installed by WithObserver; when nil, ReadAt and
// WriteAt skip timing and event delivery.
observer func(Event)
}

// WithName returns an Option that labels the device with name. The label is
// reported in every Event's Device field, so a single observer shared by many
// devices (one per agent in the demo) can tell their events apart.
func WithName(name string) Option {
	return func(cfg *config) {
		cfg.name = name
	}
}

// WithObserver returns an Option that registers fn as the device's observer.
// fn runs synchronously after every public operation. A panic raised inside
// fn is recovered, so user instrumentation cannot crash the I/O path.
func WithObserver(fn func(Event)) Option {
	return func(cfg *config) {
		cfg.observer = fn
	}
}

// Op names the kind of operation an Event reports on.
type Op uint8

// The known operations. Values start at 1, so the zero Op is not a valid
// operation and prints as "unknown".
const (
	OpRead Op = iota + 1
	OpWrite
	OpSerialize
	OpDeserialize
)

// String returns the lower-case name of o, or "unknown" for any value that is
// not one of the declared operations.
func (o Op) String() string {
	// Indexed by Op value; slot 0 stays empty because the constants start at 1.
	names := [...]string{
		OpRead:        "read",
		OpWrite:       "write",
		OpSerialize:   "serialize",
		OpDeserialize: "deserialize",
	}
	if int(o) < len(names) && names[o] != "" {
		return names[o]
	}
	return "unknown"
}

// Event is the per-operation record passed to WithObserver's callback.
// Length and Blocks have op-specific meaning: for Read/Write they describe
// the request; for Serialize/Deserialize they describe the blob size and the
// number of changed-block entries.
type Event struct {
// Op is the kind of operation that produced this event.
Op Op
// Device is the name configured through WithName ("" when none was set).
Device string
// Offset is the off argument of the ReadAt/WriteAt call.
Offset int64
// Length is len(p) of the request for Read/Write.
Length int
// Blocks is len(p) / BlockSize for Read/Write.
Blocks int
// Duration is the elapsed time measured around the operation.
Duration time.Duration
// Err is the error the operation returned; nil on success.
Err error
}
Loading