Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions pkg/acquisition/acquisition.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,47 @@ type DataSource interface {
Configure(ctx context.Context, yamlConfig []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error // Complete the YAML datasource configuration and perform runtime checks.
}

type Fetcher interface {
// BatchFetcher represents a data source that produces a finite set of events.
//
// Implementations should:
//
// - send events to the output channel until the input is fully consumed
// - return (nil) early when the context is canceled
// - return errors if acquisition fails
type BatchFetcher interface {
// Start one shot acquisition(eg, cat a file)
OneShotAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
OneShot(ctx context.Context, out chan pipeline.Event) error
}

type Tailer interface {
// Start live acquisition (eg, tail a file)
StreamingAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
// Fetcher works like BatchFetcher but still relies on tombs, which are being replaced by context cancellation.
// New datasources are expected to implement BatchFetcher instead.
type Fetcher interface {
OneShotAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
}

// RestartableStreamer works Like Tailer but should return any error and leave the retry logic to the caller
// RestartableStreamer represents a data source that produces an ongoing, potentially unbounded stream of events.
//
// Implementations should:
//
// - send events to the output channel, continuously
// - return (nil) when the context is canceled
// - return errors if acquisition fails
// - as much as possible, do not attempt retry/backoff even for transient connection
// failures, but treat them as errors. The caller is responsible for supervising
// Stream(), and restarting it as needed. There is currently no way to differentiate
// retryable vs permanent errors.
type RestartableStreamer interface {
// Start live acquisition (eg, tail a file)
Stream(ctx context.Context, out chan pipeline.Event) error
}

// Tailer has the same pupose as RestartableStreamer (provide ongoing events) but
// is responsible for spawning its own goroutines, and handling errors and retries.
// New datasources are expected to implement RestartableStreamer instead.
type Tailer interface {
StreamingAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
}

type MetricsProvider interface {
// Returns pointers to metrics that are managed by the module
GetMetrics() []prometheus.Collector
Expand Down Expand Up @@ -473,6 +499,16 @@ func transform(transformChan chan pipeline.Event, output chan pipeline.Event, ac
}
}

func runBatchFetcher(ctx context.Context, bf BatchFetcher, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
// wrap tomb logic with context
ctx, cancel := context.WithCancel(ctx)
go func() {
<-acquisTomb.Dying()
cancel()
}()

return bf.OneShot(ctx, output)
}

func runRestartableStream(ctx context.Context, rs RestartableStreamer, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
// wrap tomb logic with context
Expand Down Expand Up @@ -521,10 +557,14 @@ func runRestartableStream(ctx context.Context, rs RestartableStreamer, name stri

func acquireSource(ctx context.Context, source DataSource, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
if source.GetMode() == configuration.CAT_MODE {
if s, ok := source.(BatchFetcher); ok {
// s.Logger.Info("Start OneShot")
return runBatchFetcher(ctx, s, output, acquisTomb)
}

if s, ok := source.(Fetcher); ok {
// s.Logger.Info("Start OneShotAcquisition")
return s.OneShotAcquisition(ctx, output, acquisTomb)
// s.Logger.Info("Exit OneShotAcquisition")
}

return fmt.Errorf("%s: cat mode is set but OneShotAcquisition is not supported", source.GetName())
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ var (
// verify interface compliance
_ DataSource = (*fileacquisition.Source)(nil)
_ DSNConfigurer = (*fileacquisition.Source)(nil)
_ Fetcher = (*fileacquisition.Source)(nil)
_ BatchFetcher = (*fileacquisition.Source)(nil)
_ Tailer = (*fileacquisition.Source)(nil)
_ MetricsProvider = (*fileacquisition.Source)(nil)
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/journalctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ var (
// verify interface compliance
_ DataSource = (*journalctlacquisition.Source)(nil)
_ DSNConfigurer = (*journalctlacquisition.Source)(nil)
_ Fetcher = (*journalctlacquisition.Source)(nil)
_ BatchFetcher = (*journalctlacquisition.Source)(nil)
_ RestartableStreamer = (*journalctlacquisition.Source)(nil)
_ MetricsProvider = (*journalctlacquisition.Source)(nil)
)
Expand Down
3 changes: 1 addition & 2 deletions pkg/acquisition/modules/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ filename: %s`, deletedFile),

subLogger := logger.WithField("type", "file")

tomb := tomb.Tomb{}
out := make(chan pipeline.Event, 100)
f := fileacquisition.Source{}

Expand All @@ -240,7 +239,7 @@ filename: %s`, deletedFile),
tc.afterConfigure()
}

err = f.OneShotAcquisition(ctx, out, &tomb)
err = f.OneShot(ctx, out)
cstest.RequireErrorContains(t, err, tc.expectedErr)

if tc.expectedLines != 0 {
Expand Down
13 changes: 4 additions & 9 deletions pkg/acquisition/modules/file/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import (

const defaultPollInterval = 30 * time.Second

// OneShotAcquisition reads a set of file and returns when done
func (s *Source) OneShotAcquisition(_ context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
func (s *Source) OneShot(ctx context.Context, out chan pipeline.Event) error {
s.logger.Debug("In oneshot")

for _, file := range s.files {
Expand All @@ -45,7 +44,7 @@ func (s *Source) OneShotAcquisition(_ context.Context, out chan pipeline.Event,

s.logger.Infof("reading %s at once", file)

err = s.readFile(file, out, t)
err = s.readFile(ctx, file, out)
if err != nil {
return err
}
Expand Down Expand Up @@ -367,7 +366,7 @@ func (s *Source) tailFile(out chan pipeline.Event, t *tomb.Tomb, tail *tail.Tail
}
}

func (s *Source) readFile(filename string, out chan pipeline.Event, t *tomb.Tomb) error {
func (s *Source) readFile(ctx context.Context, filename string, out chan pipeline.Event) error {
var scanner *bufio.Scanner

logger := s.logger.WithField("oneshot", filename)
Expand Down Expand Up @@ -402,7 +401,7 @@ func (s *Source) readFile(filename string, out chan pipeline.Event, t *tomb.Tomb

for scanner.Scan() {
select {
case <-t.Dying():
case <-ctx.Done():
logger.Info("File datasource stopping")
return nil
default:
Expand All @@ -428,13 +427,9 @@ func (s *Source) readFile(filename string, out chan pipeline.Event, t *tomb.Tomb

if err := scanner.Err(); err != nil {
logger.Errorf("Error while reading file: %s", err)
t.Kill(err)

return err
}

t.Kill(nil)

return nil
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/acquisition/modules/journalctl/journalctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
logtest "github.com/sirupsen/logrus/hooks/test"
"gopkg.in/tomb.v2"

"github.com/crowdsecurity/go-cs-lib/cstest"

Expand Down Expand Up @@ -141,7 +140,6 @@ journalctl_filter:
},
}
for _, ts := range tests {
tomb := tomb.Tomb{}
out := make(chan pipeline.Event, 100)
j := Source{}

Expand All @@ -150,7 +148,7 @@ journalctl_filter:
err := j.Configure(ctx, []byte(ts.config), logrus.NewEntry(logger), metrics.AcquisitionMetricsLevelNone)
require.NoError(t, err)

err = j.OneShotAcquisition(ctx, out, &tomb)
err = j.OneShot(ctx, out)
cstest.RequireErrorContains(t, err, ts.expectedErr)

for _, expectedMessage := range ts.expectedLog {
Expand Down
14 changes: 1 addition & 13 deletions pkg/acquisition/modules/journalctl/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"golang.org/x/sync/errgroup"

"github.com/prometheus/client_golang/prometheus"
"gopkg.in/tomb.v2"

"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/metrics"
Expand All @@ -18,18 +17,7 @@ import (

const journalctlCmd = "journalctl"

func (s *Source) OneShotAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error {
if acquisTomb != nil {
tombCtx, cancel := context.WithCancel(ctx)

go func() {
<-acquisTomb.Dying()
cancel()
}()

ctx = tombCtx
}

func (s *Source) OneShot(ctx context.Context, out chan pipeline.Event) error {
err := s.runJournalCtl(ctx, out)
s.logger.Debug("Oneshot acquisition is done")

Expand Down
4 changes: 2 additions & 2 deletions pkg/acquisition/modules/wineventlog/run_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *Source) getEvents(out chan pipeline.Event, t *tomb.Tomb) error {
}
}

func (s *Source) OneShotAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
func (s *Source) OneShot(ctx context.Context, out chan pipeline.Event) error {
handle, err := wevtapi.EvtQuery(localMachine, s.evtConfig.ChannelPath, s.evtConfig.Query, s.evtConfig.Flags)
if err != nil {
return fmt.Errorf("EvtQuery failed: %v", err)
Expand All @@ -137,7 +137,7 @@ func (s *Source) OneShotAcquisition(ctx context.Context, out chan pipeline.Event
OUTER_LOOP:
for {
select {
case <-t.Dying():
case <-ctx.Done():
s.logger.Infof("wineventlog is dying")
return nil
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/wineventlog/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (*Source) SupportedModes() []string {
return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
}

func (*Source) OneShotAcquisition(_ context.Context, _ chan pipeline.Event, _ *tomb.Tomb) error {
func (*Source) OneShot(_ context.Context, _ chan pipeline.Event) error {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ event_ids:
}
}

func TestOneShotAcquisition(t *testing.T) {
func TestOneShot(t *testing.T) {
ctx := t.Context()

tests := []struct {
Expand Down Expand Up @@ -289,7 +289,6 @@ func TestOneShotAcquisition(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
lineCount := 0
to := &tomb.Tomb{}
c := make(chan pipeline.Event)
f := Source{}

Expand All @@ -304,13 +303,13 @@ func TestOneShotAcquisition(t *testing.T) {
select {
case <-c:
lineCount++
case <-to.Dying():
case <-ctx.Done():
return
}
}
}()

err = f.OneShotAcquisition(ctx, c, to)
err = f.OneShot(ctx, c)
cstest.RequireErrorContains(t, err, test.expectedErr)

if test.expectedErr != "" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ var (
// verify interface compliance
_ DataSource = (*wineventlogacquisition.Source)(nil)
_ DSNConfigurer = (*wineventlogacquisition.Source)(nil)
_ Fetcher = (*wineventlogacquisition.Source)(nil)
_ BatchFetcher = (*wineventlogacquisition.Source)(nil)
_ Tailer = (*wineventlogacquisition.Source)(nil)
_ MetricsProvider = (*wineventlogacquisition.Source)(nil)
)
Expand Down