From 0bec5f739366aefe6c89db4e43514da9080e9ff4 Mon Sep 17 00:00:00 2001 From: carlon Date: Wed, 3 Dec 2025 16:27:53 -0500 Subject: [PATCH] Add support for storing data. Add definitions for the NotifyStore lowlevel operation, and extend the Notifier to support writing data directly into page cache. Include an example filesystem similar to the one in libfuse or notify_inval. --- internal/fusekernel/fuse_kernel.go | 8 + notifier.go | 41 +++++ samples/mount_notify_store/mount.go | 41 +++++ samples/notify_inval/notify_inval.go | 3 + samples/notify_store/notify_store.go | 182 ++++++++++++++++++++++ samples/notify_store/notify_store_test.go | 76 +++++++++ 6 files changed, 351 insertions(+) create mode 100644 samples/mount_notify_store/mount.go create mode 100644 samples/notify_store/notify_store.go create mode 100644 samples/notify_store/notify_store_test.go diff --git a/internal/fusekernel/fuse_kernel.go b/internal/fusekernel/fuse_kernel.go index 8c47b524..7bbf8fb3 100644 --- a/internal/fusekernel/fuse_kernel.go +++ b/internal/fusekernel/fuse_kernel.go @@ -803,6 +803,7 @@ const ( NotifyCodePoll int32 = 1 NotifyCodeInvalInode int32 = 2 NotifyCodeInvalEntry int32 = 3 + NotifyCodeStore int32 = 4 ) type NotifyInvalInodeOut struct { @@ -817,6 +818,13 @@ type NotifyInvalEntryOut struct { padding uint32 } +type NotifyStoreOut struct { + NodeID uint64 + Offset int64 + Size uint32 + padding uint32 +} + type SyncFSIn struct { Padding uint64 } diff --git a/notifier.go b/notifier.go index e9ea38bc..d976a366 100644 --- a/notifier.go +++ b/notifier.go @@ -13,12 +13,14 @@ import ( type Notifier struct { inodeInvalidations chan invalidateInodeCommand dentryInvalidations chan invalidateEntryCommand + stores chan storeCommand } func NewNotifier() *Notifier { return &Notifier{ inodeInvalidations: make(chan invalidateInodeCommand), dentryInvalidations: make(chan invalidateEntryCommand), + stores: make(chan storeCommand), } } @@ -37,6 +39,13 @@ type invalidateEntryCommand struct { done chan<- error } +type storeCommand struct { + nodeid fuseops.InodeID + offset int64 + data []byte + done chan<- error +} + // InvalidateInode notifies the kernel to invalidate an inode cache entry. See // the libfuse documentation at // https://libfuse.github.io/doxygen/fuse__lowlevel_8h.html#a9cb974af9745294ff446d11cba2422f1 @@ -65,6 +74,19 @@ func (n *Notifier) InvalidateEntry(parent fuseops.InodeID, name string) error { return <-done } +// Store notifies the kernel to store the given data at the offset for the +// target inode. See the libfuse documentation at +// https://libfuse.github.io/doxygen/fuse__lowlevel_8h.html#af856725ed4a13ed7c17512554043edbc +// for more details. Note that data must be no more than 4GiB in length. +// +// Store blocks until the kernel write completes, and returns the error from the +// kernel, if any. ENOSYS indicates that the kernel does not support stores. +func (n *Notifier) Store(nodeid fuseops.InodeID, offset int64, data []byte) error { + done := make(chan error) + n.stores <- storeCommand{nodeid, offset, data, done} + return <-done +} + func serviceInodeInvalidation(c *Connection, inode fuseops.InodeID, offset, length int64) error { outMsg := c.getOutMessage() defer c.putOutMessage(outMsg) @@ -101,6 +123,23 @@ func serviceEntryInval(c *Connection, parent fuseops.InodeID, name string) error return c.writeOutMessage(outMsg) } +func serviceStore(c *Connection, nodeid fuseops.InodeID, offset int64, data []byte) error { + outMsg := c.getOutMessage() + defer c.putOutMessage(outMsg) + + cmd := fusekernel.NotifyStoreOut{ + NodeID: uint64(nodeid), + Offset: offset, + Size: uint32(len(data)), + } + outMsg.Append(unsafe.Slice((*byte)(unsafe.Pointer(&cmd)), int(unsafe.Sizeof(cmd)))) + outMsg.Append(data) + + outMsg.OutHeader().Error = fusekernel.NotifyCodeStore + outMsg.OutHeader().Len = uint32(outMsg.Len()) + return c.writeOutMessage(outMsg) +} + func (n *Notifier) notify(c *Connection, terminate <-chan struct{}) { for { select { @@ -108,6 +147,8 @@ func (n *Notifier) notify(c *Connection, terminate <-chan struct{}) { i.done <- serviceInodeInvalidation(c, i.inode, i.offset, i.length) case e := <-n.dentryInvalidations: e.done <- serviceEntryInval(c, e.parent, e.name) + case s := <-n.stores: + s.done <- serviceStore(c, s.nodeid, s.offset, s.data) case <-terminate: return } diff --git a/samples/mount_notify_store/mount.go b/samples/mount_notify_store/mount.go new file mode 100644 index 00000000..2402b282 --- /dev/null +++ b/samples/mount_notify_store/mount.go @@ -0,0 +1,41 @@ +package main + +import ( + "context" + "flag" + "log" + "time" + + "github.com/jacobsa/fuse" + "github.com/jacobsa/fuse/samples/notify_store" +) + +var mountPoint = flag.String("mountpoint", "", "directory to mount the filesystem") + +type ticker struct { + *time.Ticker +} + +func (t *ticker) Ticks() <-chan time.Time { + return t.Ticker.C +} + +func (t *ticker) Tocks() chan<- time.Time { return nil } + +func main() { + flag.Parse() + + if *mountPoint == "" { + log.Fatalf("--mountpoint is required") + } + + t := &ticker{time.NewTicker(time.Second)} + server := notify_store.NewNotifyStoreFS(t) + mfs, err := fuse.Mount(*mountPoint, server, &fuse.MountConfig{}) + if err != nil { + panic(err) + } + if err := mfs.Join(context.Background()); err != nil { + panic(err) + } +} diff --git a/samples/notify_inval/notify_inval.go b/samples/notify_inval/notify_inval.go index 32f52a53..e510b58c 100644 --- a/samples/notify_inval/notify_inval.go +++ b/samples/notify_inval/notify_inval.go @@ -33,6 +33,9 @@ type NotifyTimer interface { // // Unlike package dynamicfs, this implementation does _not_ depend on direct IO. // The invalidations allow file operations to eventually observe the changes. +// +// Note that there is overlap with package notify_store, so that each is a +// self-contained example. func NewNotifyInvalFS(t NotifyTimer) fuse.Server { n := fuse.NewNotifier() fs := ¬ifyInvalInodeFS{ diff --git a/samples/notify_store/notify_store.go b/samples/notify_store/notify_store.go new file mode 100644 index 00000000..3dd6cc5a --- /dev/null +++ b/samples/notify_store/notify_store.go @@ -0,0 +1,182 @@ +package notify_store + +import ( + "context" + "fmt" + "os" + "sync" + "syscall" + "time" + + "github.com/jacobsa/fuse" + "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/fuse/fuseutil" +) + +var timeLen = len(time.Time{}.Format(time.RFC3339)) + +// NotifyTimer may emit times on Ticks() to trigger filesystem changes. The +// fuse.Server emits the same times in the same order on Tocks(), if not nil, to +// indicate that invalidation is complete. +type NotifyTimer interface { + Ticks() <-chan time.Time + Tocks() chan<- time.Time +} + +// Create a file system with a single file named 'current_time' which always +// contains the current time. +// +// This filesystem is an analog to the libfuse example here: +// https://github.com/libfuse/libfuse/blob/master/example/notify_store_retrieve.c +// +// Unlike package dynamicfs, this implementation does _not_ depend on direct IO. +// The filesystem directly modifies the page cache so file operations eventually +// observe the changes. +// +// Note that there is overlap with package notify_inval, so that each is a +// self-contained example. +func NewNotifyStoreFS(t NotifyTimer) fuse.Server { + n := fuse.NewNotifier() + fs := ¬ifyStoreFS{ + notifier: n, + teardown: make(chan struct{}), + } + + ticks := t.Ticks() + tocks := t.Tocks() + go func() { + for { + select { + case t := <-ticks: + fs.mu.Lock() + fs.currentTime = t + fs.mu.Unlock() + fs.store(t) + if tocks != nil { + tocks <- t + } + case <-fs.teardown: + return + } + } + }() + + return fuse.NewServerWithNotifier(n, fuseutil.NewFileSystemServer(fs)) +} + +type notifyStoreFS struct { + fuseutil.NotImplementedFileSystem + + notifier *fuse.Notifier + teardown chan struct{} + + mu sync.Mutex + currentTime time.Time +} + +const ( + currentTimeFilename = "current_time" + + currentTimeInode = fuseops.RootInodeID + iota +) + +func (fs *notifyStoreFS) store(t time.Time) { + if err := fs.notifier.Store(currentTimeInode, 0, []byte(t.Format(time.RFC3339)+"\n")); err != nil { + fmt.Printf("error storing current_time inode %v: %v\n", currentTimeInode, err) + } +} + +func (fs *notifyStoreFS) fillStat(ino fuseops.InodeID, attrs *fuseops.InodeAttributes) error { + switch ino { + case fuseops.RootInodeID: + attrs.Nlink = 1 + attrs.Mode = 0555 | os.ModeDir + case currentTimeInode: + attrs.Nlink = 1 + attrs.Mode = 0444 + attrs.Size = uint64(timeLen + 1) // with newline + default: + return fuse.ENOENT + } + return nil +} + +func (fs *notifyStoreFS) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error { + if op.Parent != fuseops.RootInodeID { + return fuse.ENOENT + } + + switch op.Name { + case currentTimeFilename: + op.Entry.Child = currentTimeInode + fs.fillStat(currentTimeInode, &op.Entry.Attributes) + default: + return fuse.ENOENT + } + + distantFuture := time.Now().Add(time.Hour * 300) + op.Entry.AttributesExpiration = distantFuture + op.Entry.EntryExpiration = distantFuture + return nil +} + +func (fs *notifyStoreFS) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error { + return fs.fillStat(op.Inode, &op.Attributes) +} + +func (fs *notifyStoreFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error { + if op.Inode != fuseops.RootInodeID { + return fuse.ENOTDIR + } + + if op.Offset <= 0 { + op.BytesRead += fuseutil.WriteDirent(op.Dst[op.BytesRead:], fuseutil.Dirent{ + Offset: fuseops.DirOffset(1), + Inode: currentTimeInode, + Name: currentTimeFilename, + }) + } + return nil +} + +func (fs *notifyStoreFS) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error { + if op.Inode == fuseops.RootInodeID { + return syscall.EISDIR + } + if op.Inode != currentTimeInode { + // This should not happen + return fuse.EIO + } + if !op.OpenFlags.IsReadOnly() { + return syscall.EACCES + } + + // Make cache persistent even if the file is closed. This makes it easier to + // see the effects of invalidation. + op.KeepPageCache = true + + return nil +} + +func (fs *notifyStoreFS) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error { + if op.Inode != currentTimeInode { + return fuse.EIO + } + + fmt.Print("Direct read received, bypassing page cache") + + fs.mu.Lock() + t := fs.currentTime + fs.mu.Unlock() + + contents := t.Format(time.RFC3339) + "\n" + + if op.Offset < int64(len(contents)) { + op.BytesRead = copy(op.Dst, contents[op.Offset:]) + } + return nil +} + +func (fs *notifyStoreFS) Destroy() { + close(fs.teardown) +} diff --git a/samples/notify_store/notify_store_test.go b/samples/notify_store/notify_store_test.go new file mode 100644 index 00000000..d2a3e473 --- /dev/null +++ b/samples/notify_store/notify_store_test.go @@ -0,0 +1,76 @@ +package notify_store_test + +import ( + "io/ioutil" + "os" + "path" + "testing" + "time" + + "github.com/jacobsa/fuse/fusetesting" + "github.com/jacobsa/fuse/samples" + "github.com/jacobsa/fuse/samples/notify_store" + . "github.com/jacobsa/ogletest" +) + +func TestNotifyStoreFS(t *testing.T) { RunTests(t) } + +func (t *NotifyStoreFSTest) setTime(tv time.Time) { + t.ticker.tickchan <- tv + t.expectedTime = <-t.ticker.tockchan +} + +func init() { + RegisterTestSuite(&NotifyStoreFSTest{}) +} + +type manualTicker struct { + tickchan chan time.Time + tockchan chan time.Time +} + +func (t *manualTicker) Ticks() <-chan time.Time { return t.tickchan } +func (t *manualTicker) Tocks() chan<- time.Time { return t.tockchan } + +type NotifyStoreFSTest struct { + samples.SampleTest + + ticker *manualTicker + expectedTime time.Time +} + +func (t *NotifyStoreFSTest) SetUp(ti *TestInfo) { + t.ticker = &manualTicker{ + tickchan: make(chan time.Time), + tockchan: make(chan time.Time), + } + t.Server = notify_store.NewNotifyStoreFS(t.ticker) + t.SampleTest.SetUp(ti) +} + +func (t *NotifyStoreFSTest) ReadDir_Root() { + entries, err := fusetesting.ReadDirPicky(t.Dir) + AssertEq(nil, err) + AssertEq(1, len(entries)) + + var fi os.FileInfo + fi = entries[0] + ExpectEq("current_time", fi.Name()) + ExpectEq(len(time.Time{}.Format(time.RFC3339))+1, fi.Size()) + ExpectEq(0444, fi.Mode()) + ExpectFalse(fi.IsDir()) +} + +func (t *NotifyStoreFSTest) ObserveTimeUpdate() { + oldTime := t.expectedTime.Format(time.RFC3339) + + slice, err := ioutil.ReadFile(path.Join(t.Dir, "current_time")) + ExpectEq(err, nil) + ExpectEq(oldTime+"\n", string(slice)) + + t.setTime(t.expectedTime.Add(time.Minute)) + + slice, err = ioutil.ReadFile(path.Join(t.Dir, "current_time")) + ExpectEq(err, nil) + ExpectNe(oldTime+"\n", string(slice)) +}