@@ -61,21 +61,47 @@ type DataSource interface {
6161 Configure (ctx context.Context , yamlConfig []byte , logger * log.Entry , metricsLevel metrics.AcquisitionMetricsLevel ) error // Complete the YAML datasource configuration and perform runtime checks.
6262}
6363
64- type Fetcher interface {
64+ // BatchFetcher represents a data source that produces a finite set of events.
65+ //
66+ // Implementations should:
67+ //
68+ // - send events to the output channel until the input is fully consumed
69+ // - return (nil) early when the context is canceled
70+ // - return errors if acquisition fails
71+ type BatchFetcher interface {
6572 // Start one shot acquisition(eg, cat a file)
66- OneShotAcquisition (ctx context.Context , out chan pipeline.Event , acquisTomb * tomb. Tomb ) error
73+ OneShot (ctx context.Context , out chan pipeline.Event ) error
6774}
6875
69- type Tailer interface {
70- // Start live acquisition (eg, tail a file)
71- StreamingAcquisition (ctx context.Context , out chan pipeline.Event , acquisTomb * tomb.Tomb ) error
76+ // Fetcher works like BatchFetcher but still relies on tombs, which are being replaced by context cancellation.
77+ // New datasources are expected to implement BatchFetcher instead.
78+ type Fetcher interface {
79+ OneShotAcquisition (ctx context.Context , out chan pipeline.Event , acquisTomb * tomb.Tomb ) error
7280}
7381
74- // RestartableStreamer works Like Tailer but should return any error and leave the retry logic to the caller
82+ // RestartableStreamer represents a data source that produces an ongoing, potentially unbounded stream of events.
83+ //
84+ // Implementations should:
85+ //
86+ // - send events to the output channel, continuously
87+ // - return (nil) when the context is canceled
88+ // - return errors if acquisition fails
89+ // - as much as possible, do not attempt retry/backoff even for transient connection
90+ // failures, but treat them as errors. The caller is responsible for supervising
91+ // Stream(), and restarting it as needed. There is currently no way to differentiate
92+ // retryable vs permanent errors.
7593type RestartableStreamer interface {
94+ // Start live acquisition (eg, tail a file)
7695 Stream (ctx context.Context , out chan pipeline.Event ) error
7796}
7897
98+ // Tailer has the same pupose as RestartableStreamer (provide ongoing events) but
99+ // is responsible for spawning its own goroutines, and handling errors and retries.
100+ // New datasources are expected to implement RestartableStreamer instead.
101+ type Tailer interface {
102+ StreamingAcquisition (ctx context.Context , out chan pipeline.Event , acquisTomb * tomb.Tomb ) error
103+ }
104+
79105type MetricsProvider interface {
80106 // Returns pointers to metrics that are managed by the module
81107 GetMetrics () []prometheus.Collector
@@ -473,6 +499,16 @@ func transform(transformChan chan pipeline.Event, output chan pipeline.Event, ac
473499 }
474500}
475501
502+ func runBatchFetcher (ctx context.Context , bf BatchFetcher , output chan pipeline.Event , acquisTomb * tomb.Tomb ) error {
503+ // wrap tomb logic with context
504+ ctx , cancel := context .WithCancel (ctx )
505+ go func () {
506+ <- acquisTomb .Dying ()
507+ cancel ()
508+ }()
509+
510+ return bf .OneShot (ctx , output )
511+ }
476512
477513func runRestartableStream (ctx context.Context , rs RestartableStreamer , name string , output chan pipeline.Event , acquisTomb * tomb.Tomb ) error {
478514 // wrap tomb logic with context
@@ -521,10 +557,14 @@ func runRestartableStream(ctx context.Context, rs RestartableStreamer, name stri
521557
522558func acquireSource (ctx context.Context , source DataSource , name string , output chan pipeline.Event , acquisTomb * tomb.Tomb ) error {
523559 if source .GetMode () == configuration .CAT_MODE {
560+ if s , ok := source .(BatchFetcher ); ok {
561+ // s.Logger.Info("Start OneShot")
562+ return runBatchFetcher (ctx , s , output , acquisTomb )
563+ }
564+
524565 if s , ok := source .(Fetcher ); ok {
525566 // s.Logger.Info("Start OneShotAcquisition")
526567 return s .OneShotAcquisition (ctx , output , acquisTomb )
527- // s.Logger.Info("Exit OneShotAcquisition")
528568 }
529569
530570 return fmt .Errorf ("%s: cat mode is set but OneShotAcquisition is not supported" , source .GetName ())
0 commit comments