Skip to content

Commit 71b2090

Browse files
committed
acquisition refact: context-aware OneShot(), for file + journalctl
1 parent 8f4f869 commit 71b2090

File tree

7 files changed

+56
-36
lines changed

7 files changed

+56
-36
lines changed

pkg/acquisition/acquisition.go

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,21 +61,47 @@ type DataSource interface {
6161
Configure(ctx context.Context, yamlConfig []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error // Complete the YAML datasource configuration and perform runtime checks.
6262
}
6363

64-
type Fetcher interface {
64+
// BatchFetcher represents a data source that produces a finite set of events.
65+
//
66+
// Implementations should:
67+
//
68+
// - send events to the output channel until the input is fully consumed
69+
// - return (nil) early when the context is canceled
70+
// - return errors if acquisition fails
71+
type BatchFetcher interface {
6572
// Start one shot acquisition (e.g., cat a file)
66-
OneShotAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
73+
OneShot(ctx context.Context, out chan pipeline.Event) error
6774
}
6875

69-
type Tailer interface {
70-
// Start live acquisition (eg, tail a file)
71-
StreamingAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
76+
// Fetcher works like BatchFetcher but still relies on tombs, which are being replaced by context cancellation.
77+
// New datasources are expected to implement BatchFetcher instead.
78+
type Fetcher interface {
79+
OneShotAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
7280
}
7381

74-
// RestartableStreamer works Like Tailer but should return any error and leave the retry logic to the caller
82+
// RestartableStreamer represents a data source that produces an ongoing, potentially unbounded stream of events.
83+
//
84+
// Implementations should:
85+
//
86+
// - send events to the output channel, continuously
87+
// - return (nil) when the context is canceled
88+
// - return errors if acquisition fails
89+
// - as much as possible, do not attempt retry/backoff even for transient connection
90+
// failures, but treat them as errors. The caller is responsible for supervising
91+
// Stream(), and restarting it as needed. There is currently no way to differentiate
92+
// retryable vs permanent errors.
7593
type RestartableStreamer interface {
94+
// Start live acquisition (eg, tail a file)
7695
Stream(ctx context.Context, out chan pipeline.Event) error
7796
}
7897

98+
// Tailer has the same purpose as RestartableStreamer (provide ongoing events) but
99+
// is responsible for spawning its own goroutines, and handling errors and retries.
100+
// New datasources are expected to implement RestartableStreamer instead.
101+
type Tailer interface {
102+
StreamingAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error
103+
}
104+
79105
type MetricsProvider interface {
80106
// Returns pointers to metrics that are managed by the module
81107
GetMetrics() []prometheus.Collector
@@ -473,6 +499,16 @@ func transform(transformChan chan pipeline.Event, output chan pipeline.Event, ac
473499
}
474500
}
475501

502+
func runBatchFetcher(ctx context.Context, bf BatchFetcher, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
503+
// wrap tomb logic with context
504+
ctx, cancel := context.WithCancel(ctx)
505+
go func() {
506+
<-acquisTomb.Dying()
507+
cancel()
508+
}()
509+
510+
return bf.OneShot(ctx, output)
511+
}
476512

477513
func runRestartableStream(ctx context.Context, rs RestartableStreamer, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
478514
// wrap tomb logic with context
@@ -521,10 +557,14 @@ func runRestartableStream(ctx context.Context, rs RestartableStreamer, name stri
521557

522558
func acquireSource(ctx context.Context, source DataSource, name string, output chan pipeline.Event, acquisTomb *tomb.Tomb) error {
523559
if source.GetMode() == configuration.CAT_MODE {
560+
if s, ok := source.(BatchFetcher); ok {
561+
// s.Logger.Info("Start OneShot")
562+
return runBatchFetcher(ctx, s, output, acquisTomb)
563+
}
564+
524565
if s, ok := source.(Fetcher); ok {
525566
// s.Logger.Info("Start OneShotAcquisition")
526567
return s.OneShotAcquisition(ctx, output, acquisTomb)
527-
// s.Logger.Info("Exit OneShotAcquisition")
528568
}
529569

530570
return fmt.Errorf("%s: cat mode is set but OneShotAcquisition is not supported", source.GetName())

pkg/acquisition/file.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ var (
1010
// verify interface compliance
1111
_ DataSource = (*fileacquisition.Source)(nil)
1212
_ DSNConfigurer = (*fileacquisition.Source)(nil)
13-
_ Fetcher = (*fileacquisition.Source)(nil)
13+
_ BatchFetcher = (*fileacquisition.Source)(nil)
1414
_ Tailer = (*fileacquisition.Source)(nil)
1515
_ MetricsProvider = (*fileacquisition.Source)(nil)
1616
)

pkg/acquisition/journalctl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ var (
1010
// verify interface compliance
1111
_ DataSource = (*journalctlacquisition.Source)(nil)
1212
_ DSNConfigurer = (*journalctlacquisition.Source)(nil)
13-
_ Fetcher = (*journalctlacquisition.Source)(nil)
13+
_ BatchFetcher = (*journalctlacquisition.Source)(nil)
1414
_ RestartableStreamer = (*journalctlacquisition.Source)(nil)
1515
_ MetricsProvider = (*journalctlacquisition.Source)(nil)
1616
)

pkg/acquisition/modules/file/file_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,6 @@ filename: %s`, deletedFile),
221221

222222
subLogger := logger.WithField("type", "file")
223223

224-
tomb := tomb.Tomb{}
225224
out := make(chan pipeline.Event, 100)
226225
f := fileacquisition.Source{}
227226

@@ -240,7 +239,7 @@ filename: %s`, deletedFile),
240239
tc.afterConfigure()
241240
}
242241

243-
err = f.OneShotAcquisition(ctx, out, &tomb)
242+
err = f.OneShot(ctx, out)
244243
cstest.RequireErrorContains(t, err, tc.expectedErr)
245244

246245
if tc.expectedLines != 0 {

pkg/acquisition/modules/file/run.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ import (
2828

2929
const defaultPollInterval = 30 * time.Second
3030

31-
// OneShotAcquisition reads a set of file and returns when done
32-
func (s *Source) OneShotAcquisition(_ context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
31+
func (s *Source) OneShot(ctx context.Context, out chan pipeline.Event) error {
3332
s.logger.Debug("In oneshot")
3433

3534
for _, file := range s.files {
@@ -45,7 +44,7 @@ func (s *Source) OneShotAcquisition(_ context.Context, out chan pipeline.Event,
4544

4645
s.logger.Infof("reading %s at once", file)
4746

48-
err = s.readFile(file, out, t)
47+
err = s.readFile(ctx, file, out)
4948
if err != nil {
5049
return err
5150
}
@@ -367,7 +366,7 @@ func (s *Source) tailFile(out chan pipeline.Event, t *tomb.Tomb, tail *tail.Tail
367366
}
368367
}
369368

370-
func (s *Source) readFile(filename string, out chan pipeline.Event, t *tomb.Tomb) error {
369+
func (s *Source) readFile(ctx context.Context, filename string, out chan pipeline.Event) error {
371370
var scanner *bufio.Scanner
372371

373372
logger := s.logger.WithField("oneshot", filename)
@@ -402,7 +401,7 @@ func (s *Source) readFile(filename string, out chan pipeline.Event, t *tomb.Tomb
402401

403402
for scanner.Scan() {
404403
select {
405-
case <-t.Dying():
404+
case <-ctx.Done():
406405
logger.Info("File datasource stopping")
407406
return nil
408407
default:
@@ -428,13 +427,9 @@ func (s *Source) readFile(filename string, out chan pipeline.Event, t *tomb.Tomb
428427

429428
if err := scanner.Err(); err != nil {
430429
logger.Errorf("Error while reading file: %s", err)
431-
t.Kill(err)
432-
433430
return err
434431
}
435432

436-
t.Kill(nil)
437-
438433
return nil
439434
}
440435

pkg/acquisition/modules/journalctl/journalctl_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
1616
logtest "github.com/sirupsen/logrus/hooks/test"
17-
"gopkg.in/tomb.v2"
1817

1918
"github.com/crowdsecurity/go-cs-lib/cstest"
2019

@@ -141,7 +140,6 @@ journalctl_filter:
141140
},
142141
}
143142
for _, ts := range tests {
144-
tomb := tomb.Tomb{}
145143
out := make(chan pipeline.Event, 100)
146144
j := Source{}
147145

@@ -150,7 +148,7 @@ journalctl_filter:
150148
err := j.Configure(ctx, []byte(ts.config), logrus.NewEntry(logger), metrics.AcquisitionMetricsLevelNone)
151149
require.NoError(t, err)
152150

153-
err = j.OneShotAcquisition(ctx, out, &tomb)
151+
err = j.OneShot(ctx, out)
154152
cstest.RequireErrorContains(t, err, ts.expectedErr)
155153

156154
for _, expectedMessage := range ts.expectedLog {

pkg/acquisition/modules/journalctl/run.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"golang.org/x/sync/errgroup"
1010

1111
"github.com/prometheus/client_golang/prometheus"
12-
"gopkg.in/tomb.v2"
1312

1413
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
1514
"github.com/crowdsecurity/crowdsec/pkg/metrics"
@@ -18,18 +17,7 @@ import (
1817

1918
const journalctlCmd = "journalctl"
2019

21-
func (s *Source) OneShotAcquisition(ctx context.Context, out chan pipeline.Event, acquisTomb *tomb.Tomb) error {
22-
if acquisTomb != nil {
23-
tombCtx, cancel := context.WithCancel(ctx)
24-
25-
go func() {
26-
<-acquisTomb.Dying()
27-
cancel()
28-
}()
29-
30-
ctx = tombCtx
31-
}
32-
20+
func (s *Source) OneShot(ctx context.Context, out chan pipeline.Event) error {
3321
err := s.runJournalCtl(ctx, out)
3422
s.logger.Debug("Oneshot acquisition is done")
3523

0 commit comments

Comments
 (0)