diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index a9b83eea715..fb09c593dae 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -61,21 +61,47 @@ type DataSource interface { Configure(ctx context.Context, yamlConfig []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error // Complete the YAML datasource configuration and perform runtime checks. } -type Fetcher interface { +// BatchFetcher represents a data source that produces a finite set of events. +// +// Implementations should: +// +// - send events to the output channel until the input is fully consumed +// - return (nil) early when the context is canceled +// - return errors if acquisition fails +type BatchFetcher interface { // Start one shot acquisition(eg, cat a file) - OneShotAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error + OneShot(ctx context.Context, out chan pipeline.Event) error } -type Tailer interface { - // Start live acquisition (eg, tail a file) - StreamingAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error +// Fetcher works like BatchFetcher but still relies on tombs, which are being replaced by context cancellation. +// New datasources are expected to implement BatchFetcher instead. +type Fetcher interface { + OneShotAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error } -// RestartableStreamer works Like Tailer but should return any error and leave the retry logic to the caller +// RestartableStreamer represents a data source that produces an ongoing, potentially unbounded stream of events. +// +// Implementations should: +// +// - send events to the output channel, continuously +// - return (nil) when the context is canceled +// - return errors if acquisition fails +// - as much as possible, do not attempt retry/backoff even for transient connection +// failures, but treat them as errors. The caller is responsible for supervising +// Stream(), and restarting it as needed. There is currently no way to differentiate +// retryable vs permanent errors. type RestartableStreamer interface { + // Start live acquisition (eg, tail a file) Stream(ctx context.Context, out chan pipeline.Event) error } +// Tailer has the same pupose as RestartableStreamer (provide ongoing events) but +// is responsible for spawning its own goroutines, and handling errors and retries. +// New datasources are expected to implement RestartableStreamer instead. +type Tailer interface { + StreamingAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error +} + type MetricsProvider interface { // Returns pointers to metrics that are managed by the module GetMetrics() []prometheus.Collector @@ -473,6 +499,16 @@ func transform(transformChan chan pipeline.Event, output chan pipeline.Event, ac } } +func runBatchFetcher(ctx context.Context, bf BatchFetcher, output chan pipeline.Event, acquisTomb *tomb.Tomb) error { + // wrap tomb logic with context + ctx, cancel := context.WithCancel(ctx) + go func() { + <-acquisTomb.Dying() + cancel() + }() + + return bf.OneShot(ctx, output) +} func runRestartableStream(ctx context.Context, rs RestartableStreamer, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb) error { // wrap tomb logic with context @@ -521,10 +557,14 @@ func runRestartableStream(ctx context.Context, rs RestartableStreamer, name stri func acquireSource(ctx context.Context, source DataSource, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb) error { if source.GetMode() == configuration.CAT_MODE { + if s, ok := source.(BatchFetcher); ok { + // s.Logger.Info("Start OneShot") + return runBatchFetcher(ctx, s, output, acquisTomb) + } + if s, ok := source.(Fetcher); ok { // s.Logger.Info("Start OneShotAcquisition") return s.OneShotAcquisition(ctx, output, acquisTomb) - // s.Logger.Info("Exit OneShotAcquisition") } return fmt.Errorf("%s: cat mode is set but OneShotAcquisition is not supported", source.GetName()) diff --git a/pkg/acquisition/file.go b/pkg/acquisition/file.go index 4cfd5e4a8e2..445353502c9 100644 --- a/pkg/acquisition/file.go +++ b/pkg/acquisition/file.go @@ -10,7 +10,7 @@ var ( // verify interface compliance _ DataSource = (*fileacquisition.Source)(nil) _ DSNConfigurer = (*fileacquisition.Source)(nil) - _ Fetcher = (*fileacquisition.Source)(nil) + _ BatchFetcher = (*fileacquisition.Source)(nil) _ Tailer = (*fileacquisition.Source)(nil) _ MetricsProvider = (*fileacquisition.Source)(nil) ) diff --git a/pkg/acquisition/journalctl.go b/pkg/acquisition/journalctl.go index 1791b26fe2c..ac10b45912d 100644 --- a/pkg/acquisition/journalctl.go +++ b/pkg/acquisition/journalctl.go @@ -10,7 +10,7 @@ var ( // verify interface compliance _ DataSource = (*journalctlacquisition.Source)(nil) _ DSNConfigurer = (*journalctlacquisition.Source)(nil) - _ Fetcher = (*journalctlacquisition.Source)(nil) + _ BatchFetcher = (*journalctlacquisition.Source)(nil) _ RestartableStreamer = (*journalctlacquisition.Source)(nil) _ MetricsProvider = (*journalctlacquisition.Source)(nil) ) diff --git a/pkg/acquisition/modules/file/file_test.go b/pkg/acquisition/modules/file/file_test.go index c4274a4c0e7..6ed85075cba 100644 --- a/pkg/acquisition/modules/file/file_test.go +++ b/pkg/acquisition/modules/file/file_test.go @@ -221,7 +221,6 @@ filename: %s`, deletedFile), subLogger := logger.WithField("type", "file") - tomb := tomb.Tomb{} out := make(chan pipeline.Event, 100) f := fileacquisition.Source{} @@ -240,7 +239,7 @@ filename: %s`, deletedFile), tc.afterConfigure() } - err = f.OneShotAcquisition(ctx, out, &tomb) + err = f.OneShot(ctx, out) cstest.RequireErrorContains(t, err, tc.expectedErr) if tc.expectedLines != 0 { diff --git a/pkg/acquisition/modules/file/run.go b/pkg/acquisition/modules/file/run.go index 020e117d991..fa71ad8b002 100644 --- a/pkg/acquisition/modules/file/run.go +++ b/pkg/acquisition/modules/file/run.go @@ -28,8 +28,7 @@ import ( const defaultPollInterval = 30 * time.Second -// OneShotAcquisition reads a set of file and returns when done -func (s *Source) OneShotAcquisition(_ context.Context, out chan pipeline.Event, t *tomb.Tomb) error { +func (s *Source) OneShot(ctx context.Context, out chan pipeline.Event) error { s.logger.Debug("In oneshot") for _, file := range s.files { @@ -45,7 +44,7 @@ func (s *Source) OneShotAcquisition(_ context.Context, out chan pipeline.Event, s.logger.Infof("reading %s at once", file) - err = s.readFile(file, out, t) + err = s.readFile(ctx, file, out) if err != nil { return err } @@ -367,7 +366,7 @@ func (s *Source) tailFile(out chan pipeline.Event, t *tomb.Tomb, tail *tail.Tail } } -func (s *Source) readFile(filename string, out chan pipeline.Event, t *tomb.Tomb) error { +func (s *Source) readFile(ctx context.Context, filename string, out chan pipeline.Event) error { var scanner *bufio.Scanner logger := s.logger.WithField("oneshot", filename) @@ -402,7 +401,7 @@ func (s *Source) readFile(filename string, out chan pipeline.Event, t *tomb.Tomb for scanner.Scan() { select { - case <-t.Dying(): + case <-ctx.Done(): logger.Info("File datasource stopping") return nil default: @@ -428,13 +427,9 @@ func (s *Source) readFile(filename string, out chan pipeline.Event, t *tomb.Tomb if err := scanner.Err(); err != nil { logger.Errorf("Error while reading file: %s", err) - t.Kill(err) - return err } - t.Kill(nil) - return nil } diff --git a/pkg/acquisition/modules/journalctl/journalctl_test.go b/pkg/acquisition/modules/journalctl/journalctl_test.go index f295499aaab..4c8fc9a61c7 100644 --- a/pkg/acquisition/modules/journalctl/journalctl_test.go +++ b/pkg/acquisition/modules/journalctl/journalctl_test.go @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" logtest "github.com/sirupsen/logrus/hooks/test" - "gopkg.in/tomb.v2" "github.com/crowdsecurity/go-cs-lib/cstest" @@ -141,7 +140,6 @@ journalctl_filter: }, } for _, ts := range tests { - tomb := tomb.Tomb{} out := make(chan pipeline.Event, 100) j := Source{} @@ -150,7 +148,7 @@ journalctl_filter: err := j.Configure(ctx, []byte(ts.config), logrus.NewEntry(logger), metrics.AcquisitionMetricsLevelNone) require.NoError(t, err) - err = j.OneShotAcquisition(ctx, out, &tomb) + err = j.OneShot(ctx, out) cstest.RequireErrorContains(t, err, ts.expectedErr) for _, expectedMessage := range ts.expectedLog { diff --git a/pkg/acquisition/modules/journalctl/run.go b/pkg/acquisition/modules/journalctl/run.go index 77c95ce5294..53004e29ffe 100644 --- a/pkg/acquisition/modules/journalctl/run.go +++ b/pkg/acquisition/modules/journalctl/run.go @@ -9,7 +9,6 @@ import ( "golang.org/x/sync/errgroup" "github.com/prometheus/client_golang/prometheus" - "gopkg.in/tomb.v2" "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/metrics" @@ -18,18 +17,7 @@ import ( const journalctlCmd = "journalctl" -func (s *Source) OneShotAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error { - if acquisTomb != nil { - tombCtx, cancel := context.WithCancel(ctx) - - go func() { - <-acquisTomb.Dying() - cancel() - }() - - ctx = tombCtx - } - +func (s *Source) OneShot(ctx context.Context, out chan pipeline.Event) error { err := s.runJournalCtl(ctx, out) s.logger.Debug("Oneshot acquisition is done") diff --git a/pkg/acquisition/modules/wineventlog/run_windows.go b/pkg/acquisition/modules/wineventlog/run_windows.go index 4dc8fc753c0..6f323130424 100644 --- a/pkg/acquisition/modules/wineventlog/run_windows.go +++ b/pkg/acquisition/modules/wineventlog/run_windows.go @@ -119,7 +119,7 @@ func (s *Source) getEvents(out chan pipeline.Event, t *tomb.Tomb) error { } } -func (s *Source) OneShotAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error { +func (s *Source) OneShot(ctx context.Context, out chan pipeline.Event) error { handle, err := wevtapi.EvtQuery(localMachine, s.evtConfig.ChannelPath, s.evtConfig.Query, s.evtConfig.Flags) if err != nil { return fmt.Errorf("EvtQuery failed: %v", err) @@ -137,7 +137,7 @@ func (s *Source) OneShotAcquisition(ctx context.Context, out chan pipeline.Event OUTER_LOOP: for { select { - case <-t.Dying(): + case <-ctx.Done(): s.logger.Infof("wineventlog is dying") return nil default: diff --git a/pkg/acquisition/modules/wineventlog/stub.go b/pkg/acquisition/modules/wineventlog/stub.go index 0d4839b757b..548018e4c21 100644 --- a/pkg/acquisition/modules/wineventlog/stub.go +++ b/pkg/acquisition/modules/wineventlog/stub.go @@ -41,7 +41,7 @@ func (*Source) SupportedModes() []string { return []string{configuration.TAIL_MODE, configuration.CAT_MODE} } -func (*Source) OneShotAcquisition(_ context.Context, _ chan pipeline.Event, _ *tomb.Tomb) error { +func (*Source) OneShot(_ context.Context, _ chan pipeline.Event) error { return nil } diff --git a/pkg/acquisition/modules/wineventlog/wineventlog_windows_test.go b/pkg/acquisition/modules/wineventlog/wineventlog_windows_test.go index a7f8d1e7436..96c37826581 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog_windows_test.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog_windows_test.go @@ -243,7 +243,7 @@ event_ids: } } -func TestOneShotAcquisition(t *testing.T) { +func TestOneShot(t *testing.T) { ctx := t.Context() tests := []struct { @@ -289,7 +289,6 @@ func TestOneShotAcquisition(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lineCount := 0 - to := &tomb.Tomb{} c := make(chan pipeline.Event) f := Source{} @@ -304,13 +303,13 @@ func TestOneShotAcquisition(t *testing.T) { select { case <-c: lineCount++ - case <-to.Dying(): + case <-ctx.Done(): return } } }() - err = f.OneShotAcquisition(ctx, c, to) + err = f.OneShot(ctx, c) cstest.RequireErrorContains(t, err, test.expectedErr) if test.expectedErr != "" { diff --git a/pkg/acquisition/wineventlog.go b/pkg/acquisition/wineventlog.go index b62019b28e4..f9d72f859b5 100644 --- a/pkg/acquisition/wineventlog.go +++ b/pkg/acquisition/wineventlog.go @@ -10,7 +10,7 @@ var ( // verify interface compliance _ DataSource = (*wineventlogacquisition.Source)(nil) _ DSNConfigurer = (*wineventlogacquisition.Source)(nil) - _ Fetcher = (*wineventlogacquisition.Source)(nil) + _ BatchFetcher = (*wineventlogacquisition.Source)(nil) _ Tailer = (*wineventlogacquisition.Source)(nil) _ MetricsProvider = (*wineventlogacquisition.Source)(nil) )