@@ -30,12 +30,14 @@ import (
3030 "sync"
3131 "time"
3232
33+ containerd "github.com/containerd/containerd/v2/client"
3334 "github.com/fsnotify/fsnotify"
3435 "github.com/muesli/cancelreader"
3536
3637 "github.com/containerd/containerd/v2/core/runtime/v2/logging"
3738 "github.com/containerd/errdefs"
3839 "github.com/containerd/log"
40+ "github.com/containerd/nerdctl/v2/pkg/lockutil"
3941)
4042
4143const (
@@ -149,7 +151,60 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) {
149151 return logConfig , nil
150152}
151153
152- func loggingProcessAdapter (ctx context.Context , driver Driver , dataStore string , config * logging.Config ) error {
154+ func getLockPath (dataStore , ns , id string ) string {
155+ return filepath .Join (dataStore , "containers" , ns , id , "logger-lock" )
156+ }
157+
158+ // WaitForLogger waits until the logger has finished executing and processing container logs
159+ func WaitForLogger (dataStore , ns , id string ) error {
160+ return lockutil .WithDirLock (getLockPath (dataStore , ns , id ), func () error {
161+ return nil
162+ })
163+ }
164+
165+ func getContainerWait (ctx context.Context , address string , config * logging.Config ) (<- chan containerd.ExitStatus , error ) {
166+ client , err := containerd .New (address , containerd .WithDefaultNamespace (config .Namespace ))
167+ if err != nil {
168+ return nil , err
169+ }
170+ con , err := client .LoadContainer (ctx , config .ID )
171+ if err != nil {
172+ return nil , err
173+ }
174+
175+ task , err := con .Task (ctx , nil )
176+ if err == nil {
177+ return task .Wait (ctx )
178+ }
179+ if ! errdefs .IsNotFound (err ) {
180+ return nil , err
181+ }
182+
183+ // If task was not found, it's possible that the container runtime is still being created.
184+ // Retry every 100ms.
185+ ticker := time .NewTicker (100 * time .Millisecond )
186+ defer ticker .Stop ()
187+
188+ for {
189+ select {
190+ case <- ctx .Done ():
191+ return nil , errors .New ("timed out waiting for container task to start" )
192+ case <- ticker .C :
193+ task , err = con .Task (ctx , nil )
194+ if err != nil {
195+ if errdefs .IsNotFound (err ) {
196+ continue
197+ }
198+ return nil , err
199+ }
200+ return task .Wait (ctx )
201+ }
202+ }
203+ }
204+
205+ type ContainerWaitFunc func (ctx context.Context , address string , config * logging.Config ) (<- chan containerd.ExitStatus , error )
206+
207+ func loggingProcessAdapter (ctx context.Context , driver Driver , dataStore , address string , getContainerWait ContainerWaitFunc , config * logging.Config ) error {
153208 if err := driver .PreProcess (ctx , dataStore , config ); err != nil {
154209 return err
155210 }
@@ -168,6 +223,20 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
168223 stderrR .Cancel ()
169224 }()
170225
226+ // initialize goroutines to copy stdout and stderr streams to a closable pipe
227+ pipeStdoutR , pipeStdoutW := io .Pipe ()
228+ pipeStderrR , pipeStderrW := io .Pipe ()
229+ copyStream := func (reader io.Reader , writer * io.PipeWriter ) {
230+ // copy using a buffer of size 32K
231+ buf := make ([]byte , 32 << 10 )
232+ _ , err := io .CopyBuffer (writer , reader , buf )
233+ if err != nil {
234+ log .G (ctx ).Errorf ("failed to copy stream: %s" , err )
235+ }
236+ }
237+ go copyStream (stdoutR , pipeStdoutW )
238+ go copyStream (stderrR , pipeStderrW )
239+
171240 var wg sync.WaitGroup
172241 wg .Add (3 )
173242 stdout := make (chan string , 10000 )
@@ -182,7 +251,6 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
182251 for err == nil {
183252 var s string
184253 s , err = r .ReadString ('\n' )
185-
186254 if len (s ) > 0 {
187255 dataChan <- strings .TrimSuffix (s , "\n " )
188256 }
@@ -192,12 +260,24 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
192260 }
193261 }
194262 }
195- go processLogFunc (stdoutR , stdout )
196- go processLogFunc (stderrR , stderr )
263+ go processLogFunc (pipeStdoutR , stdout )
264+ go processLogFunc (pipeStderrR , stderr )
197265 go func () {
198266 defer wg .Done ()
199267 driver .Process (stdout , stderr )
200268 }()
269+ go func () {
270+ // close pipeStdoutW and pipeStderrW upon container exit
271+ defer pipeStdoutW .Close ()
272+ defer pipeStderrW .Close ()
273+
274+ exitCh , err := getContainerWait (ctx , address , config )
275+ if err != nil {
276+ log .G (ctx ).Errorf ("failed to get container task wait channel: %v" , err )
277+ return
278+ }
279+ <- exitCh
280+ }()
201281 wg .Wait ()
202282 return driver .PostProcess ()
203283}
@@ -220,11 +300,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
220300 if err != nil {
221301 return err
222302 }
223- if err := ready (); err != nil {
303+
304+ loggerLock := getLockPath (dataStore , config .Namespace , config .ID )
305+ f , err := os .Create (loggerLock )
306+ if err != nil {
224307 return err
225308 }
226-
227- return loggingProcessAdapter (ctx , driver , dataStore , config )
309+ defer f .Close ()
310+
311+ // the logger will obtain an exclusive lock on a file until the container is
312+ // stopped and the driver has finished processing all output,
313+ // so that waiting log viewers can be signalled when the process is complete.
314+ return lockutil .WithDirLock (loggerLock , func () error {
315+ if err := ready (); err != nil {
316+ return err
317+ }
318+ // getContainerWait is extracted as parameter to allow mocking in tests.
319+ return loggingProcessAdapter (ctx , driver , dataStore , logConfig .Address , getContainerWait , config )
320+ })
228321 } else if ! errors .Is (err , os .ErrNotExist ) {
229322 // the file does not exist if the container was created with nerdctl < 0.20
230323 return err
0 commit comments