diff --git a/connection.go b/connection.go index 8ec027fd..728099ef 100644 --- a/connection.go +++ b/connection.go @@ -378,6 +378,23 @@ func (c *Connection) readMessage() (*buffer.InMessage, error) { } } +// Write a buffer.OutMessage to the kernel, with writev if vectored IO is useful +// and write if not. +func (c *Connection) writeOutMessage(outMsg *buffer.OutMessage) error { + var err error + if outMsg.Sglist != nil { + if fusekernel.IsPlatformFuseT { + // writev is not atomic on macos, restrict to fuse-t platform + writeLock.Lock() + defer writeLock.Unlock() + } + _, err = writev(int(c.dev.Fd()), outMsg.Sglist) + } else { + err = c.writeMessage(outMsg.OutHeaderBytes()) + } + return err +} + // Write the supplied message to the kernel. func (c *Connection) writeMessage(msg []byte) error { // Avoid the retry loop in os.File.Write. @@ -535,17 +552,7 @@ func (c *Connection) Reply(ctx context.Context, opErr error) error { noResponse := c.kernelResponse(outMsg, inMsg.Header().Unique, op, opErr) if !noResponse { - var err error - if outMsg.Sglist != nil { - if fusekernel.IsPlatformFuseT { - // writev is not atomic on macos, restrict to fuse-t platform - writeLock.Lock() - defer writeLock.Unlock() - } - _, err = writev(int(c.dev.Fd()), outMsg.Sglist) - } else { - err = c.writeMessage(outMsg.OutHeaderBytes()) - } + err := c.writeOutMessage(outMsg) if err != nil { writeErrMsg := fmt.Sprintf("writeMessage: %v %v", err, outMsg.OutHeaderBytes()) if c.errorLogger != nil { diff --git a/notifier.go b/notifier.go new file mode 100644 index 00000000..e9ea38bc --- /dev/null +++ b/notifier.go @@ -0,0 +1,132 @@ +package fuse + +import ( + "unsafe" + + "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/fuse/internal/fusekernel" +) + +// Notifier coordinates low-level notifications from the fuse daemon to the +// kernel. A Notifier may be used by the ServeOps implementation of a Server. In +// order to deliver notifications, wrap the server with NewServerWithNotifier. +type Notifier struct { + inodeInvalidations chan invalidateInodeCommand + dentryInvalidations chan invalidateEntryCommand +} + +func NewNotifier() *Notifier { + return &Notifier{ + inodeInvalidations: make(chan invalidateInodeCommand), + dentryInvalidations: make(chan invalidateEntryCommand), + } +} + +type invalidateInodeCommand struct { + inode fuseops.InodeID + offset int64 + length int64 + done chan<- error +} + +type invalidateEntryCommand struct { + parent fuseops.InodeID + name string + // If fusekernel.NotifyInvalEntryOut is updated to use its padding as flags, + // we can support the expire flag in this command as well. + done chan<- error +} + +// InvalidateInode notifies the kernel to invalidate an inode cache entry. See +// the libfuse documentation at +// https://libfuse.github.io/doxygen/fuse__lowlevel_8h.html#a9cb974af9745294ff446d11cba2422f1 +// for more details. +// +// InvalidateInode blocks until the kernel write completes, and returns the +// error from the kernel, if any. ENOSYS indicates that the kernel does not +// support inode invalidations. +func (n *Notifier) InvalidateInode(inode fuseops.InodeID, offset, length int64) error { + done := make(chan error) + n.inodeInvalidations <- invalidateInodeCommand{inode, offset, length, done} + return <-done +} + +// InvalidateEntry notifies to the kernel to invalidate a dentry cache entry. +// See the libfuse documentation at +// https://libfuse.github.io/doxygen/fuse__lowlevel_8h.html#ab14032b74b0a57a2b3155dd6ba8d6095 +// for more details. +// +// InvalidateEntry blocks until the kernel write completes, and returns the +// error from the kernel, if any. ENOSYS indicates that the kernel does not +// support dentry invalidations. +func (n *Notifier) InvalidateEntry(parent fuseops.InodeID, name string) error { + done := make(chan error) + n.dentryInvalidations <- invalidateEntryCommand{parent, name, done} + return <-done +} + +func serviceInodeInvalidation(c *Connection, inode fuseops.InodeID, offset, length int64) error { + outMsg := c.getOutMessage() + defer c.putOutMessage(outMsg) + + cmd := fusekernel.NotifyInvalInodeOut{ + Ino: uint64(inode), + Off: offset, + Len: length, + } + outMsg.Append(unsafe.Slice((*byte)(unsafe.Pointer(&cmd)), int(unsafe.Sizeof(cmd)))) + + outMsg.OutHeader().Error = fusekernel.NotifyCodeInvalInode + outMsg.OutHeader().Len = uint32(outMsg.Len()) + + return c.writeOutMessage(outMsg) +} + +func serviceEntryInval(c *Connection, parent fuseops.InodeID, name string) error { + outMsg := c.getOutMessage() + defer c.putOutMessage(outMsg) + + cmd := fusekernel.NotifyInvalEntryOut{ + Parent: uint64(parent), + Namelen: uint32(len(name)), + } + outMsg.Append(unsafe.Slice((*byte)(unsafe.Pointer(&cmd)), int(unsafe.Sizeof(cmd)))) + + // The name must be represented as a C string with a null-terminator. + outMsg.AppendString(name) + outMsg.Append([]byte{0}) + + outMsg.OutHeader().Error = fusekernel.NotifyCodeInvalEntry + outMsg.OutHeader().Len = uint32(outMsg.Len()) + return c.writeOutMessage(outMsg) +} + +func (n *Notifier) notify(c *Connection, terminate <-chan struct{}) { + for { + select { + case i := <-n.inodeInvalidations: + i.done <- serviceInodeInvalidation(c, i.inode, i.offset, i.length) + case e := <-n.dentryInvalidations: + e.done <- serviceEntryInval(c, e.parent, e.name) + case <-terminate: + return + } + } +} + +type notifierServer struct { + n *Notifier + s Server +} + +func (s *notifierServer) ServeOps(c *Connection) { + terminate := make(chan struct{}) + + go s.n.notify(c, terminate) + s.s.ServeOps(c) + close(terminate) +} + +func NewServerWithNotifier(n *Notifier, s Server) Server { + return ¬ifierServer{n, s} +} diff --git a/samples/mount_notify_inval/mount.go b/samples/mount_notify_inval/mount.go new file mode 100644 index 00000000..2a162ac2 --- /dev/null +++ b/samples/mount_notify_inval/mount.go @@ -0,0 +1,41 @@ +package main + +import ( + "context" + "flag" + "log" + "time" + + "github.com/jacobsa/fuse" + "github.com/jacobsa/fuse/samples/notify_inval" +) + +var mountPoint = flag.String("mountpoint", "", "directory to mount the filesystem") + +type ticker struct { + *time.Ticker +} + +func (t *ticker) Ticks() <-chan time.Time { + return t.Ticker.C +} + +func (t *ticker) Tocks() chan<- time.Time { return nil } + +func main() { + flag.Parse() + + if *mountPoint == "" { + log.Fatalf("--mountpoint is required") + } + + t := &ticker{time.NewTicker(time.Second)} + server := notify_inval.NewNotifyInvalFS(t) + mfs, err := fuse.Mount(*mountPoint, server, &fuse.MountConfig{}) + if err != nil { + panic(err) + } + if err := mfs.Join(context.Background()); err != nil { + panic(err) + } +} diff --git a/samples/notify_inval/notify_inval.go b/samples/notify_inval/notify_inval.go new file mode 100644 index 00000000..32f52a53 --- /dev/null +++ b/samples/notify_inval/notify_inval.go @@ -0,0 +1,217 @@ +package notify_inval + +import ( + "context" + "fmt" + "os" + "sync" + "syscall" + "time" + + "github.com/jacobsa/fuse" + "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/fuse/fuseutil" +) + +var timeLen = len(time.Time{}.Format(time.RFC3339)) + +// NotifyTimer may emit times on Ticks() to trigger filesystem changes. The +// fuse.Server emits the same times in the same order on Tocks(), if not nil, to +// indicate that invalidation is complete. +type NotifyTimer interface { + Ticks() <-chan time.Time + Tocks() chan<- time.Time +} + +// Create a file system with two files: +// One is empty, and its name is the current time. +// The other is named 'current_time' and always contains the current time. +// +// This filesystem is an analog to the libfuse examples here: +// https://github.com/libfuse/libfuse/blob/e75d2c54a347906478724be24bfa1df2638094cb/example/notify_inval_inode.c +// https://github.com/libfuse/libfuse/blob/e75d2c54a347906478724be24bfa1df2638094cb/example/notify_inval_entry.c +// +// Unlike package dynamicfs, this implementation does _not_ depend on direct IO. +// The invalidations allow file operations to eventually observe the changes. +func NewNotifyInvalFS(t NotifyTimer) fuse.Server { + n := fuse.NewNotifier() + fs := ¬ifyInvalInodeFS{ + notifier: n, + teardown: make(chan struct{}), + } + + ticks := t.Ticks() + tocks := t.Tocks() + go func() { + for { + select { + case t := <-ticks: + fs.mu.Lock() + oldtime := fs.currentTime + fs.currentTime = t + fs.mu.Unlock() + fs.invalidateInodes(oldtime) + if tocks != nil { + tocks <- t + } + case <-fs.teardown: + return + } + } + }() + + return fuse.NewServerWithNotifier(n, fuseutil.NewFileSystemServer(fs)) +} + +type notifyInvalInodeFS struct { + fuseutil.NotImplementedFileSystem + + notifier *fuse.Notifier + teardown chan struct{} + + mu sync.Mutex + // GUARDED_BY(mu) + currentTime time.Time +} + +const ( + currentTimeFilename = "current_time" + + currentTimeInode = fuseops.RootInodeID + iota + changingFnameInode +) + +func (fs *notifyInvalInodeFS) invalidateInodes(oldTime time.Time) { + // Invalidate inode cache and dcache for both dynamic files. + if err := fs.notifier.InvalidateInode(currentTimeInode, 0, 0); err != nil { + fmt.Printf("error invalidating current_time inode %v: %v\n", currentTimeInode, err) + } + if err := fs.notifier.InvalidateEntry(fuseops.RootInodeID, currentTimeFilename); err != nil { + fmt.Printf("error invalidating current_time entry %v for parent %v: %v\n", currentTimeFilename, fuseops.RootInodeID, err) + } + + if err := fs.notifier.InvalidateInode(changingFnameInode, 0, 0); err != nil { + fmt.Printf("error invalidating dynamic filename inode %v: %v\n", changingFnameInode, err) + } + if err := fs.notifier.InvalidateEntry(fuseops.RootInodeID, oldTime.Format(time.RFC3339)); err != nil { + fmt.Printf("error invalidating dynamic filename entry for parent %v: %v\n", fuseops.RootInodeID, err) + } +} + +func (fs *notifyInvalInodeFS) fillStat(ino fuseops.InodeID, attrs *fuseops.InodeAttributes) error { + switch ino { + case fuseops.RootInodeID: + attrs.Nlink = 1 + attrs.Mode = 0555 | os.ModeDir + case currentTimeInode: + attrs.Nlink = 1 + attrs.Mode = 0444 + attrs.Size = uint64(timeLen + 1) // with newline + case changingFnameInode: + attrs.Nlink = 1 + attrs.Mode = 0444 + default: + return fuse.ENOENT + } + return nil +} + +func (fs *notifyInvalInodeFS) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error { + if op.Parent != fuseops.RootInodeID { + return fuse.ENOENT + } + + fs.mu.Lock() + t := fs.currentTime + fs.mu.Unlock() + + switch op.Name { + case currentTimeFilename: + op.Entry.Child = currentTimeInode + fs.fillStat(currentTimeInode, &op.Entry.Attributes) + case t.Format(time.RFC3339): + op.Entry.Child = changingFnameInode + fs.fillStat(changingFnameInode, &op.Entry.Attributes) + default: + return fuse.ENOENT + } + + distantFuture := time.Now().Add(time.Hour * 300) + op.Entry.AttributesExpiration = distantFuture + op.Entry.EntryExpiration = distantFuture + return nil +} + +func (fs *notifyInvalInodeFS) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error { + return fs.fillStat(op.Inode, &op.Attributes) +} + +func (fs *notifyInvalInodeFS) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error { + if op.Inode != fuseops.RootInodeID { + return fuse.ENOTDIR + } + + fs.mu.Lock() + t := fs.currentTime + fs.mu.Unlock() + + if op.Offset <= 0 { + op.BytesRead += fuseutil.WriteDirent(op.Dst[op.BytesRead:], fuseutil.Dirent{ + Offset: fuseops.DirOffset(1), + Inode: currentTimeInode, + Name: currentTimeFilename, + }) + } + if op.Offset <= 1 { + op.BytesRead += fuseutil.WriteDirent(op.Dst[op.BytesRead:], fuseutil.Dirent{ + Offset: fuseops.DirOffset(2), + Inode: changingFnameInode, + Name: t.Format(time.RFC3339), + }) + } + return nil +} + +func (fs *notifyInvalInodeFS) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error { + if op.Inode == fuseops.RootInodeID { + return syscall.EISDIR + } + if op.Inode == changingFnameInode { + // No access to the changing filename contents + return syscall.EACCES + } + if op.Inode != currentTimeInode { + // This should not happen + return fuse.EIO + } + if !op.OpenFlags.IsReadOnly() { + return syscall.EACCES + } + + // Make cache persistent even if the file is closed. This makes it easier to + // see the effects of invalidation. + op.KeepPageCache = true + + return nil +} + +func (fs *notifyInvalInodeFS) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error { + if op.Inode != currentTimeInode { + return fuse.EIO + } + + fs.mu.Lock() + t := fs.currentTime + fs.mu.Unlock() + + contents := t.Format(time.RFC3339) + "\n" + + if op.Offset < int64(len(contents)) { + op.BytesRead = copy(op.Dst, contents[op.Offset:]) + } + return nil +} + +func (fs *notifyInvalInodeFS) Destroy() { + close(fs.teardown) +} diff --git a/samples/notify_inval/notify_inval_test.go b/samples/notify_inval/notify_inval_test.go new file mode 100644 index 00000000..951c4f3f --- /dev/null +++ b/samples/notify_inval/notify_inval_test.go @@ -0,0 +1,85 @@ +package notify_inval_test + +import ( + "io/ioutil" + "os" + "path" + "testing" + "time" + + "github.com/jacobsa/fuse/fusetesting" + "github.com/jacobsa/fuse/samples" + "github.com/jacobsa/fuse/samples/notify_inval" + + . "github.com/jacobsa/ogletest" +) + +func TestNotifyInvalFS(t *testing.T) { RunTests(t) } + +func (t *NotifyInvalFSTest) setTime(tv time.Time) { + t.ticker.tickchan <- tv + t.expectedTime = <-t.ticker.tockchan +} + +func init() { + RegisterTestSuite(&NotifyInvalFSTest{}) +} + +type manualTicker struct { + tickchan chan time.Time + tockchan chan time.Time +} + +func (t *manualTicker) Ticks() <-chan time.Time { return t.tickchan } +func (t *manualTicker) Tocks() chan<- time.Time { return t.tockchan } + +type NotifyInvalFSTest struct { + samples.SampleTest + + ticker *manualTicker + expectedTime time.Time +} + +func (t *NotifyInvalFSTest) SetUp(ti *TestInfo) { + t.ticker = &manualTicker{ + tickchan: make(chan time.Time), + tockchan: make(chan time.Time), + } + t.Server = notify_inval.NewNotifyInvalFS(t.ticker) + t.SampleTest.SetUp(ti) +} + +func (t *NotifyInvalFSTest) ReadDir_Root() { + entries, err := fusetesting.ReadDirPicky(t.Dir) + AssertEq(nil, err) + AssertEq(2, len(entries)) + + var fi os.FileInfo + fi = entries[0] + ExpectEq(t.expectedTime.Format(time.RFC3339), fi.Name()) + ExpectEq(0, fi.Size()) + ExpectEq(0444, fi.Mode()) + ExpectFalse(fi.IsDir()) + + fi = entries[1] + ExpectEq("current_time", fi.Name()) + ExpectEq(len(time.Time{}.Format(time.RFC3339))+1, fi.Size()) + ExpectEq(0444, fi.Mode()) + ExpectFalse(fi.IsDir()) +} + +func (t *NotifyInvalFSTest) ObserveTimeUpdate() { + oldTime := t.expectedTime.Format(time.RFC3339) + + _, err := os.Stat(path.Join(t.Dir, oldTime)) + AssertEq(nil, err) + slice, err := ioutil.ReadFile(path.Join(t.Dir, "current_time")) + ExpectEq(oldTime+"\n", string(slice)) + + t.setTime(t.expectedTime.Add(time.Minute)) + + _, err = os.Stat(path.Join(t.Dir, oldTime)) + AssertNe(nil, err) + slice, err = ioutil.ReadFile(path.Join(t.Dir, "current_time")) + ExpectNe(oldTime+"\n", string(slice)) +}