When service.Stream.Run is called:
https://github.com/redpanda-data/benthos/blob/main/public/service/stream.go#L72
func (s *Stream) Run(ctx context.Context) (err error) {
	s.strmMut.Lock()
	if s.strm != nil {
		err = errors.New("stream has already been run")
	} else {
		s.strm, err = stream.New(s.conf, s.mgr,
			stream.OptOnClose(func() {
				s.shutSig.TriggerHasStopped()
			}))
	}
	//...
}
stream.New is then called:
https://github.com/redpanda-data/benthos/blob/main/internal/stream/type.go#L53
func New(conf Config, mgr bundle.NewManagement, opts ...func(*Type)) (*Type, error) {
	t := &Type{
		conf:    conf,
		manager: mgr,
		onClose: func() {},
		closed:  0,
	}
	for _, opt := range opts {
		opt(t)
	}
	if err := t.start(); err != nil {
		// Bug: when start fails, the layers it has already constructed
		// are never cleaned up.
		return nil, err
	}
	// ...
}
stream.Type.start constructs every layer in turn.
If the input layer is constructed successfully, its read loop starts running in the background.
If constructing a later layer, such as the output, then fails, start simply returns the error and the input loop is leaked.
https://github.com/redpanda-data/benthos/blob/main/internal/stream/type.go#L141
func (t *Type) start() (err error) {
	// Constructors
	iMgr := t.manager.IntoPath("input")
	// Bug: the input loop starts as soon as the input component is
	// constructed, so the early returns below leave it running.
	// https://github.com/redpanda-data/benthos/blob/main/internal/component/input/async_reader.go#L68
	if t.inputLayer, err = iMgr.NewInput(t.conf.Input); err != nil {
		return
	}
	if t.conf.Buffer.Type != "none" {
		bMgr := t.manager.IntoPath("buffer")
		if t.bufferLayer, err = bMgr.NewBuffer(t.conf.Buffer); err != nil {
			return
		}
	}
	if tLen := len(t.conf.Pipeline.Processors); tLen > 0 {
		pMgr := t.manager.IntoPath("pipeline")
		if t.pipelineLayer, err = pipeline.New(t.conf.Pipeline, pMgr); err != nil {
			return
		}
	}
	oMgr := t.manager.IntoPath("output")
	if t.outputLayer, err = oMgr.NewOutput(t.conf.Output); err != nil {
		return
	}
	// ...
}
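The leak can be reproduced in isolation with a small model of the same shape. This is only a sketch: fakeInput, newFakeOutput, newFakeStream and the activeLoops counter are hypothetical stand-ins, not Benthos types, and the only thing taken from the code above is the ordering (the input starts its loop on construction, a later constructor fails, and the function returns without stopping anything).

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// activeLoops counts background loops that have been started and have not
// yet exited. It exists only so the leak can be observed.
var activeLoops atomic.Int32

// fakeInput is a hypothetical stand-in for an input component whose read
// loop starts as soon as it is constructed.
type fakeInput struct {
	stop chan struct{}
}

func newFakeInput() *fakeInput {
	in := &fakeInput{stop: make(chan struct{})}
	activeLoops.Add(1)
	go func() {
		defer activeLoops.Add(-1)
		<-in.stop // stands in for the read loop; exits only when stop is closed
	}()
	return in
}

// newFakeOutput is a hypothetical output constructor that always fails.
func newFakeOutput() error {
	return errors.New("output: construction failed")
}

type fakeStream struct {
	input *fakeInput
}

// newFakeStream mirrors the shape of the constructor discussed above: the
// input is built first, then the output fails and the function returns
// without stopping the input loop.
func newFakeStream() (*fakeStream, error) {
	s := &fakeStream{}
	s.input = newFakeInput()
	if err := newFakeOutput(); err != nil {
		return nil, err // the caller gets no handle with which to stop s.input
	}
	return s, nil
}

func main() {
	_, err := newFakeStream()
	fmt.Println("error:", err)
	fmt.Println("loops still running:", activeLoops.Load())
}
```

Because the caller receives nil, nothing outside newFakeStream can reach the input to stop it, so the goroutine stays alive for the rest of the process.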
The other layers start their loops when Consume is called.
They also need to be cleaned up when an error occurs.
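One possible shape for the cleanup is a deferred block in the constructor that closes whatever has already been built, in reverse order, whenever an error is returned. The sketch below illustrates that pattern only and is not a patch: fakeLayer, its Close method, the constructor type and the one-second timeout are all assumptions made for the example, and the real layers would need their own shutdown calls in place of Close.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fakeLayer is a hypothetical layer that runs a background loop until it
// is closed.
type fakeLayer struct {
	stop chan struct{}
	done chan struct{}
}

func newFakeLayer() *fakeLayer {
	l := &fakeLayer{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(l.done)
		<-l.stop // stands in for the layer's loop
	}()
	return l
}

// Close asks the loop to stop and waits for it to exit or for ctx to end.
// It is an assumed method for this sketch and must be called at most once.
func (l *fakeLayer) Close(ctx context.Context) error {
	close(l.stop)
	select {
	case <-l.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// constructor is a hypothetical factory for a single layer.
type constructor func() (*fakeLayer, error)

// failingCtor models a layer whose construction fails.
func failingCtor() (*fakeLayer, error) {
	return nil, errors.New("output: construction failed")
}

// start builds the layers in order. If any constructor fails, the deferred
// function closes the layers built so far, last to first, before returning.
func start(ctors ...constructor) (layers []*fakeLayer, err error) {
	defer func() {
		if err == nil {
			return
		}
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()
		for i := len(layers) - 1; i >= 0; i-- {
			_ = layers[i].Close(ctx) // best effort; the original error is kept
		}
		layers = nil
	}()
	for _, c := range ctors {
		var l *fakeLayer
		if l, err = c(); err != nil {
			return
		}
		layers = append(layers, l)
	}
	return
}

func main() {
	input := newFakeLayer()
	_, err := start(
		func() (*fakeLayer, error) { return input, nil },
		failingCtor,
	)
	fmt.Println("error:", err)
	select {
	case <-input.done:
		fmt.Println("input loop stopped")
	default:
		fmt.Println("input loop leaked")
	}
}
```

Keeping the cleanup in a single deferred function means every early return is covered, including ones added later, rather than relying on each error branch to remember to tear things down.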