Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions testdata/watch/sources/Taskfile.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: '3'

tasks:
default:
sources:
- "./**/*.txt"
cmds:
- echo "Task running!"
107 changes: 83 additions & 24 deletions watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/puzpuzpuz/xsync/v4"

"github.com/go-task/task/v3/errors"
"github.com/go-task/task/v3/internal/filepathext"
"github.com/go-task/task/v3/internal/fingerprint"
"github.com/go-task/task/v3/internal/fsnotifyext"
"github.com/go-task/task/v3/internal/logger"
Expand All @@ -25,6 +24,8 @@ import (

const defaultWaitTime = 100 * time.Millisecond

var refreshChan = make(chan string)

// watchTasks start watching the given tasks
func (e *Executor) watchTasks(calls ...*Call) error {
tasks := make([]string, len(calls))
Expand Down Expand Up @@ -68,44 +69,82 @@ func (e *Executor) watchTasks(calls ...*Call) error {

closeOnInterrupt(w)

watchFiles, err := e.collectSources(calls)
if err != nil {
cancel()
return err
}
go func() {
for {
select {
case path := <-refreshChan:
// If a path is added its necessary to refresh the sources, otherwise the
// watcher may not pick up any changes in that new path.
_ = path
watchFiles, err = e.collectSources(calls)
if err != nil {
e.Logger.Errf(logger.Red, "%v\n", err)
continue
}

case event, ok := <-eventsChan:
if !ok {
cancel()
return
}
e.Logger.VerboseErrf(logger.Magenta, "task: received watch event: %v\n", event)

cancel()
ctx, cancel = context.WithCancel(context.Background())

e.Compiler.ResetCache()

for _, c := range calls {
go func() {
if ShouldIgnore(event.Name) {
e.Logger.VerboseErrf(logger.Magenta, "task: event skipped for being an ignored dir: %s\n", event.Name)
return
// Check if this watch event should be ignored.
if ShouldIgnore(event.Name) {
e.Logger.VerboseErrf(logger.Magenta, "task: event skipped for being an ignored dir: %s\n", event.Name)
continue
}
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) || event.Has(fsnotify.Write) {
if !slices.Contains(watchFiles, event.Name) {
relPath := event.Name
if rel, err := filepath.Rel(e.Dir, event.Name); err == nil {
relPath = rel
}
t, err := e.GetTask(c)
if err != nil {
e.Logger.Errf(logger.Red, "%v\n", err)
return
e.Logger.VerboseErrf(logger.Magenta, "task: skipped for file not in sources: %s\n", relPath)
continue
}
}
if event.Has(fsnotify.Create) {
createDir := false
if info, err := os.Stat(event.Name); err == nil {
if info.IsDir() {
createDir = true
}
baseDir := filepathext.SmartJoin(e.Dir, t.Dir)
files, err := e.collectSources(calls)
if err != nil {
}
watchFiles, err = e.collectSources(calls)
if err != nil {
e.Logger.Errf(logger.Red, "%v\n", err)
continue
}

if createDir {
// If the CREATE relates to a folder, update the registered watch dirs (immediately).
if err := e.registerWatchedDirs(w, calls...); err != nil {
e.Logger.Errf(logger.Red, "%v\n", err)
return
}

if !event.Has(fsnotify.Remove) && !slices.Contains(files, event.Name) {
relPath, _ := filepath.Rel(baseDir, event.Name)
} else {
if !slices.Contains(watchFiles, event.Name) {
relPath := event.Name
if rel, err := filepath.Rel(e.Dir, event.Name); err == nil {
relPath = rel
}
e.Logger.VerboseErrf(logger.Magenta, "task: skipped for file not in sources: %s\n", relPath)
return
continue
}
}
}

// The watch event is good, restart the task calls.
cancel()
ctx, cancel = context.WithCancel(context.Background())
e.Compiler.ResetCache()
for _, c := range calls {
go func() {
err = e.RunTask(ctx, c)
if err == nil {
e.Logger.Errf(logger.Green, "task: task \"%s\" finished running\n", c.Task)
Expand Down Expand Up @@ -167,8 +206,25 @@ func (e *Executor) registerWatchedDirs(w *fsnotify.Watcher, calls ...*Call) erro
if err != nil {
return err
}
dirs := []string{}
for _, f := range files {
d := filepath.Dir(f)
dir := filepath.Dir(f)
if !slices.Contains(dirs, dir) {
dirs = append(dirs, dir)
}
}

// Remove dirs from the watch, otherwise the watched dir may become stale and
// if the dir is recreated, it will not trigger any watch events.
e.watchedDirs.Range(func(dir string, value bool) bool {
if !slices.Contains(dirs, dir) {
e.watchedDirs.Delete(dir)
}
return true
})

// Add new dirs to the watch.
for _, d := range dirs {
if isSet, ok := e.watchedDirs.Load(d); ok && isSet {
continue
}
Expand All @@ -181,6 +237,9 @@ func (e *Executor) registerWatchedDirs(w *fsnotify.Watcher, calls ...*Call) erro
e.watchedDirs.Store(d, true)
relPath, _ := filepath.Rel(e.Dir, d)
e.Logger.VerboseOutf(logger.Green, "task: watching new dir: %v\n", relPath)

// Signal that the watcher should refresh its watch file list.
refreshChan <- d
}
return nil
}
Expand Down
131 changes: 131 additions & 0 deletions watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"slices"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -100,3 +102,132 @@ func TestShouldIgnore(t *testing.T) {
})
}
}

func TestWatchSources(t *testing.T) {
t.Parallel()

tests := []struct {
action string
path string
expectRestart bool
}{
// Entry condition: file fubar/foo.txt exists.
{"create", "fubar/bar.txt", true},
{"remove", "fubar/foo.txt", true},
{"rename", "fubar/foo.txt", true},
{"write", "fubar/foo.txt", true},
{"create", "fubar/bar.text", false},
{"remove", "fubar/foo.text", false},
{"rename", "fubar/foo.text", false},
{"write", "fubar/foo.text", false},
}

for _, tc := range tests {
tc := tc
t.Run(fmt.Sprintf("%s-%s", tc.action, tc.path), func(t *testing.T) {
t.Parallel()

checks := []string{`Started watching for tasks: default`, `echo "Task running!"`}

// Setup the watch dir.
tmpDir := t.TempDir()
data, _ := os.ReadFile("testdata/watch/sources/Taskfile.yaml")
os.WriteFile(filepath.Join(tmpDir, "Taskfile.yaml"), data, 0644)
testFile := filepath.Join(tmpDir, "fubar/foo.txt")
os.MkdirAll(filepath.Dir(testFile), 0755)
os.WriteFile(testFile, []byte("hello world"), 0644)

// Correct test case paths.
tc.path = filepath.Join(tmpDir, tc.path)

// Start the Task.
var buf bytes.Buffer
e := task.NewExecutor(
task.WithDir(tmpDir),
task.WithStdout(&buf),
task.WithStderr(&buf),
task.WithWatch(true),
task.WithVerbose(true),
)
require.NoError(t, e.Setup())
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case <-ctx.Done():
return
default:
err := e.Run(ctx, &task.Call{Task: "default"})
if err != nil {
panic(err)
}
}
}
}()

// Introduce the test condition.
time.Sleep(200 * time.Millisecond)
switch tc.action {
case "create":
f, _ := os.OpenFile(tc.path, os.O_CREATE|os.O_WRONLY, 0644)
defer f.Close()
f.WriteString("watch test")
checks = append(checks, `watch event: CREATE`)

case "remove":
if !tc.expectRestart {
f, _ := os.OpenFile(tc.path, os.O_CREATE|os.O_WRONLY, 0644)
f.Close()
time.Sleep(100 * time.Millisecond)
checks = append(checks, `watch event: CREATE`)
}
os.Remove(tc.path)
checks = append(checks, `watch event: REMOVE`)

case "rename":
if !tc.expectRestart {
f, _ := os.OpenFile(tc.path, os.O_CREATE|os.O_WRONLY, 0644)
f.Close()
time.Sleep(100 * time.Millisecond)
checks = append(checks, `watch event: CREATE`)
}
dir := filepath.Dir(tc.path)
base := filepath.Base(tc.path)
ext := filepath.Ext(base)
name := base[:len(base)-len(ext)]
_b := []byte(name)
slices.Reverse(_b)
name = string(_b)
os.Rename(tc.path, filepath.Join(dir, name+ext))
checks = append(checks, `watch event: RENAME`)

case "write":
f, _ := os.OpenFile(tc.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
defer f.Close()
f.WriteString("watch test")
checks = append(checks, `watch event: WRITE`)
}

// Observe the expected conditions.
time.Sleep(200 * time.Millisecond)
cancel()
if tc.expectRestart {
checks = append(checks, `echo "Task running!"`)
} else {
checks = append(checks, `skipped for file not in sources:`)
}

output := buf.String()
t.Log(output)
for _, check := range checks {
if idx := strings.Index(output, check); idx == -1 {
t.Log(output)
t.Log(checks)
t.Fatalf("Expected output not observed in sequence: %s", check)
} else {
output = output[idx+len(check):]
}
}
})
}
}
Loading