From 8d6bb835c75c2c722351fa0ebd677f2feb81e64e Mon Sep 17 00:00:00 2001 From: Prateek Kumar Date: Mon, 18 May 2026 18:45:37 -0700 Subject: [PATCH] Add the Write Path with Overlay and Events. --- blockdev.go | 103 +++++++++++++++++++++++++++++++++--- internal/overlay/overlay.go | 48 +++++++++++++++++ options.go | 64 ++++++++++++++++++++++ 3 files changed, 208 insertions(+), 7 deletions(-) create mode 100644 internal/overlay/overlay.go create mode 100644 options.go diff --git a/blockdev.go b/blockdev.go index 2289c80..10e3edf 100644 --- a/blockdev.go +++ b/blockdev.go @@ -3,7 +3,9 @@ package blockdev import ( "fmt" "io" + "time" + "github.com/codingminions/blockdev/internal/overlay" "github.com/codingminions/blockdev/internal/validate" ) @@ -16,22 +18,33 @@ const BlockSize = 4096 // after handoff; defensive copy would double peak memory when many sandboxes // share one image. type BlockDevice struct { - base []byte - length int64 + base []byte + length int64 + overlay *overlay.Store + cfg config } -var _ io.ReaderAt = (*BlockDevice)(nil) +var ( + _ io.ReaderAt = (*BlockDevice)(nil) + _ io.WriterAt = (*BlockDevice)(nil) +) // New validates alignment up front rather than at first ReadAt — a misaligned // base is a configuration bug, and we'd rather fail at construction than after // a sandbox has been wired up. -func New(initial []byte) (*BlockDevice, error) { +func New(initial []byte, opts ...Option) (*BlockDevice, error) { if err := validate.Alignment(0, len(initial)); err != nil { return nil, fmt.Errorf("blockdev.New: len(initial)=%d: %w", len(initial), ErrMisaligned) } + var cfg config + for _, opt := range opts { + opt(&cfg) + } return &BlockDevice{ - base: initial, - length: int64(len(initial)), + base: initial, + length: int64(len(initial)), + overlay: overlay.New(), + cfg: cfg, }, nil } @@ -41,6 +54,26 @@ func New(initial []byte) (*BlockDevice, error) { // On any failure returns (0, err) and leaves p untouched — partial reads // could corrupt the guest filesystem before the kernel notices. func (b *BlockDevice) ReadAt(p []byte, off int64) (int, error) { + var start time.Time + if b.cfg.observer != nil { + start = time.Now() + } + n, err := b.readAt(p, off) + if b.cfg.observer != nil { + b.fireObserver(Event{ + Op: OpRead, + Device: b.cfg.name, + Offset: off, + Length: len(p), + Blocks: len(p) / BlockSize, + Duration: time.Since(start), + Err: err, + }) + } + return n, err +} + +func (b *BlockDevice) readAt(p []byte, off int64) (int, error) { if err := validate.Bounds(off, len(p), b.length); err != nil { return 0, fmt.Errorf("blockdev.ReadAt off=%d len=%d device=%d: %w", off, len(p), b.length, ErrOutOfBounds) @@ -52,6 +85,62 @@ func (b *BlockDevice) ReadAt(p []byte, off int64) (int, error) { if len(p) == 0 { return 0, nil } - copy(p, b.base[off:off+int64(len(p))]) + for i := 0; i < len(p); i += BlockSize { + blockOff := off + int64(i) + blockNum := blockOff / BlockSize + if data, ok := b.overlay.Get(blockNum); ok { + copy(p[i:i+BlockSize], data) + } else { + copy(p[i:i+BlockSize], b.base[blockOff:blockOff+BlockSize]) + } + } + return len(p), nil +} + +// WriteAt captures writes into the overlay; the base is never mutated. Caller +// may reuse p immediately — the overlay copies what it stores. +func (b *BlockDevice) WriteAt(p []byte, off int64) (int, error) { + var start time.Time + if b.cfg.observer != nil { + start = time.Now() + } + n, err := b.writeAt(p, off) + if b.cfg.observer != nil { + b.fireObserver(Event{ + Op: OpWrite, + Device: b.cfg.name, + Offset: off, + Length: len(p), + Blocks: len(p) / BlockSize, + Duration: time.Since(start), + Err: err, + }) + } + return n, err +} + +func (b *BlockDevice) writeAt(p []byte, off int64) (int, error) { + if err := validate.Bounds(off, len(p), b.length); err != nil { + return 0, fmt.Errorf("blockdev.WriteAt off=%d len=%d device=%d: %w", + off, len(p), b.length, ErrOutOfBounds) + } + if err := validate.Alignment(off, len(p)); err != nil { + return 0, fmt.Errorf("blockdev.WriteAt off=%d len=%d: %w", + off, len(p), ErrMisaligned) + } + if len(p) == 0 { + return 0, nil + } + for i := 0; i < len(p); i += BlockSize { + blockNum := (off + int64(i)) / BlockSize + b.overlay.Put(blockNum, p[i:i+BlockSize]) + } return len(p), nil } + +// Isolated so the deferred recover catches only the observer's panic, never a +// panic that originated in the I/O path. +func (b *BlockDevice) fireObserver(e Event) { + defer func() { _ = recover() }() + b.cfg.observer(e) +} diff --git a/internal/overlay/overlay.go b/internal/overlay/overlay.go new file mode 100644 index 0000000..b86438e --- /dev/null +++ b/internal/overlay/overlay.go @@ -0,0 +1,48 @@ +// Package overlay is the copy-on-write block store: reads expose the stored +// slice (callers must not mutate), Puts copy so callers can reuse their buffer. +package overlay + +import "sync" + +type Store struct { + mu sync.RWMutex + m map[int64][]byte +} + +func New() *Store { + return &Store{m: make(map[int64][]byte)} +} + +// Returned slice aliases the store — caller must not mutate. +func (s *Store) Get(block int64) ([]byte, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + data, ok := s.m[block] + return data, ok +} + +// Copies data so the caller can reuse its buffer immediately. +func (s *Store) Put(block int64, data []byte) { + buf := make([]byte, len(data)) + copy(buf, data) + s.mu.Lock() + s.m[block] = buf + s.mu.Unlock() +} + +// fn must not mutate data; return false to stop early. +func (s *Store) Range(fn func(block int64, data []byte) bool) { + s.mu.RLock() + defer s.mu.RUnlock() + for block, data := range s.m { + if !fn(block, data) { + return + } + } +} + +func (s *Store) Len() int { + s.mu.RLock() + defer s.mu.RUnlock() + return len(s.m) +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..5040e44 --- /dev/null +++ b/options.go @@ -0,0 +1,64 @@ +package blockdev + +import "time" + +// Option is the functional-option type. Variadic in New keeps the constructor +// extensible without forcing a breaking change every time a knob is added. +type Option func(*config) + +type config struct { + name string + observer func(Event) +} + +// WithName tags the device so a single observer can distinguish events from +// many devices (one per agent in the demo). +func WithName(name string) Option { + return func(c *config) { c.name = name } +} + +// WithObserver installs a synchronous callback invoked after every public +// operation. Panics from the callback are recovered so user instrumentation +// cannot crash the I/O path. +func WithObserver(fn func(Event)) Option { + return func(c *config) { c.observer = fn } +} + +// Op identifies the operation that produced an Event. +type Op uint8 + +const ( + OpRead Op = iota + 1 + OpWrite + OpSerialize + OpDeserialize +) + +func (o Op) String() string { + switch o { + case OpRead: + return "read" + case OpWrite: + return "write" + case OpSerialize: + return "serialize" + case OpDeserialize: + return "deserialize" + default: + return "unknown" + } +} + +// Event is the per-operation record passed to WithObserver's callback. +// Length and Blocks have op-specific meaning: for Read/Write they describe +// the request; for Serialize/Deserialize they describe the blob size and the +// number of changed-block entries. +type Event struct { + Op Op + Device string + Offset int64 + Length int + Blocks int + Duration time.Duration + Err error +}