Commit c0d68af
fix: ensure log messages aren't lost when application exits (#392)
#### Relevant issue or PR

#356, #357

#### Description of changes

This fixes the observed issue of missing logs (printed messages + tracebacks) in some scenarios, specifically when the interpreter crashes or exits before the corresponding I/O pipes are exhausted. Also renames `LogPipe` -> `TeePipe`, because that's really what it does (teeing). The primary difference is that `TeePipe` now blocks the main thread when exiting while there are messages waiting in the pipe; only when nothing has been read for X seconds do we continue with shutdown. This ensures that all queued-up messages are forwarded properly.

#### Testing done

New test + manual check that messages are now shown as expected on the reproducer from #356.
1 parent 12197b3 commit c0d68af
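To make the fixed failure mode concrete, here is a minimal usage sketch (not part of the commit) that mirrors the `subprocess.run` usage in `docker_client.py` below; the `captured` list and the `echo` command are illustrative:

```python
# Illustrative only: drive the TeePipe added in this commit with a short-lived
# subprocess -- the scenario in which trailing log lines used to get lost.
import subprocess

from tesseract_core.sdk.logs import TeePipe

captured = []

# Every positional argument is a sink; each complete line read from the pipe
# is forwarded to all of them.
with TeePipe(print, captured.append) as pipe_fd:
    subprocess.run(["echo", "hello from a short-lived process"], stdout=pipe_fd, stderr=pipe_fd)

# __exit__ -> stop() now blocks until the reader thread has drained the pipe,
# so by this point the line has reached both sinks.
assert captured == ["hello from a short-lived process"]
```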

6 files changed, +231 −56 lines changed

tesseract_core/runtime/logs.py

Lines changed: 69 additions & 17 deletions
@@ -3,53 +3,105 @@
 
 import os
 import threading
+import time
 from collections.abc import Callable
 from typing import Any
 
 
 # NOTE: This is duplicated in `tesseract_core/sdk/logs.py`.
 # Make sure to propagate changes to both files.
-class LogPipe(threading.Thread):
-    """Custom IO pipe to support live logging from subprocess.run or OS-level file descriptor.
+class TeePipe(threading.Thread):
+    """Custom I/O construct to support live logging from a single file descriptor to multiple sinks.
 
-    Runs a thread that logs everything read from the pipe to the given sinks.
-    Can be used as a context manager for automatic cleanup.
+    Runs a thread that records everything written to the file descriptor. Can be used as a
+    context manager for automatic cleanup.
+
+    Example:
+        >>> with TeePipe(print, logger.info) as pipe_fd:
+        ...     fd = os.fdopen(pipe_fd, "w")
+        ...     print("Hello, World!", file=fd, flush=True)
+        Hello, World!
+        2025-06-10 12:00:00,000 - INFO - Hello, World!
     """
 
     daemon = True
 
     def __init__(self, *sinks: Callable) -> None:
-        """Initialize the LogPipe with the given logging level."""
+        """Initialize the TeePipe by creating file descriptors."""
         super().__init__()
         self._sinks = sinks
         self._fd_read, self._fd_write = os.pipe()
-        self._pipe_reader = os.fdopen(self._fd_read)
         self._captured_lines = []
+        self._last_time = time.time()
+        self._is_blocking = threading.Event()
+        self._grace_period = 0.1
 
     def __enter__(self) -> int:
         """Start the thread and return the write file descriptor of the pipe."""
         self.start()
         return self.fileno()
 
-    def __exit__(self, *args: Any) -> None:
+    def stop(self) -> None:
         """Close the pipe and join the thread."""
+        # Wait for ongoing streams to dry up
+        # We only continue once the reader has spent some time blocked on reading
+        while True:
+            self._is_blocking.wait(timeout=1)
+            if (time.time() - self._last_time) >= self._grace_period:
+                break
+            time.sleep(self._grace_period / 10)
+
+        # This will signal EOF to the reader thread
         os.close(self._fd_write)
-        # Use a timeout so something weird happening in the logging thread doesn't
-        # cause this to hang indefinitely
-        self.join(timeout=10)
-        # Do not close reader before thread is joined since there may be pending data
-        # This also closes the fd_read pipe
-        self._pipe_reader.close()
+        os.close(self._fd_read)
+
+        # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong
+        self.join(timeout=1)
+
+    def __exit__(self, *args: Any) -> None:
+        """Close the pipe and join the thread."""
+        self.stop()
 
     def fileno(self) -> int:
         """Return the write file descriptor of the pipe."""
         return self._fd_write
 
     def run(self) -> None:
-        """Run the thread, logging everything."""
-        for line in iter(self._pipe_reader.readline, ""):
-            if line.endswith("\n"):
-                line = line[:-1]
+        """Run the thread, pushing every full line of text to the sinks."""
+        line_buffer = []
+        while True:
+            self._last_time = time.time()
+            self._is_blocking.set()
+            try:
+                data = os.read(self._fd_read, 1024)
+                self._is_blocking.clear()
+            except OSError:
+                # Pipe closed
+                break
+
+            if data == b"":
+                # EOF reached
+                break
+
+            lines = data.split(b"\n")
+
+            # Log complete lines
+            for i, line in enumerate(lines[:-1]):
+                if i == 0:
+                    line = b"".join([*line_buffer, line])
+                    line_buffer = []
+                line = line.decode(errors="ignore")
+                self._captured_lines.append(line)
+                for sink in self._sinks:
+                    sink(line)
+
+            # Accumulate incomplete line
+            line_buffer.append(lines[-1])
+
+        # Flush incomplete lines at the end of the stream
+        line = b"".join(line_buffer)
+        if line:
+            line = line.decode(errors="ignore")
             self._captured_lines.append(line)
             for sink in self._sinks:
                 sink(line)
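A small sketch (not from this commit's test suite) of what the new chunked `run()` loop means in practice: `os.read` returns arbitrary chunks, so a line split across writes or across 1024-byte reads is reassembled via `line_buffer`, and a trailing partial line is flushed on shutdown. The sink `captured.append` is illustrative:

```python
# Not part of the commit: sketch of the chunked line handling in TeePipe.run().
import os
import time

from tesseract_core.runtime.logs import TeePipe

captured = []
with TeePipe(captured.append) as fd:
    os.write(fd, b"first half, ")            # no newline yet -> stays buffered
    time.sleep(0.05)                         # give the reader time to pick it up
    os.write(fd, b"second half\nnext line")  # completes line 1, starts line 2

# stop() drains the pipe and then flushes the trailing partial line; the result
# is the same whether the reader happened to see one chunk or two.
assert captured == ["first half, second half", "next line"]
```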

tesseract_core/runtime/mpa.py

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@
 import requests
 
 from tesseract_core.runtime.config import get_config
-from tesseract_core.runtime.logs import LogPipe
+from tesseract_core.runtime.logs import TeePipe
 
 
 class BaseBackend(ABC):
@@ -261,7 +261,7 @@ def redirect_stdio(logfile: str | Path) -> Generator[None, None, None]:
     # Use `print` instead of `.write` so we get appropriate newlines and flush behavior
     write_to_stderr = lambda msg: print(msg, file=orig_stderr_file, flush=True)
     write_to_file = lambda msg: print(msg, file=f, flush=True)
-    pipe_fd = stack.enter_context(LogPipe(write_to_stderr, write_to_file))
+    pipe_fd = stack.enter_context(TeePipe(write_to_stderr, write_to_file))
 
     # Redirect file descriptors at OS level
     stack.enter_context(redirect_fd(sys.stdout, pipe_fd))
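The hunk above tees everything written to the redirected descriptors into both the original stderr and a log file. For readers unfamiliar with OS-level redirection, here is a simplified, self-contained sketch of the same pattern; it is not the project's `redirect_fd`/`redirect_stdio`, and the logfile path and message are made up:

```python
# Simplified sketch: route the OS-level stdout descriptor through a TeePipe
# so that writes land in both the original stderr and a log file.
import os
import sys

from tesseract_core.runtime.logs import TeePipe

orig_stderr = sys.stderr

with open("run.log", "a") as f:
    write_to_stderr = lambda msg: print(msg, file=orig_stderr, flush=True)
    write_to_file = lambda msg: print(msg, file=f, flush=True)

    with TeePipe(write_to_stderr, write_to_file) as pipe_fd:
        saved_fd = os.dup(sys.stdout.fileno())
        try:
            # OS-level redirect: anything written to stdout (including by
            # C extensions or subprocesses) now lands in the TeePipe ...
            os.dup2(pipe_fd, sys.stdout.fileno())
            print("hello")  # ... and is forwarded to stderr and run.log
            sys.stdout.flush()
        finally:
            # Restore stdout before the pipe is torn down
            os.dup2(saved_fd, sys.stdout.fileno())
            os.close(saved_fd)
```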

tesseract_core/sdk/docker_client.py

Lines changed: 2 additions & 2 deletions
@@ -16,7 +16,7 @@
 from typing import List as list_  # noqa: UP035
 
 from tesseract_core.sdk.config import get_config
-from tesseract_core.sdk.logs import LogPipe
+from tesseract_core.sdk.logs import TeePipe
 
 logger = logging.getLogger("tesseract")
 
@@ -232,7 +232,7 @@ def buildx(
         ssh=ssh,
     )
 
-    out_pipe = LogPipe(logger.debug)
+    out_pipe = TeePipe(logger.debug)
 
     with out_pipe as out_pipe_fd:
         proc = subprocess.run(build_cmd, stdout=out_pipe_fd, stderr=out_pipe_fd)

tesseract_core/sdk/logs.py

Lines changed: 70 additions & 19 deletions
@@ -2,10 +2,10 @@
 # SPDX-License-Identifier: Apache-2.0
 
 import logging
-import logging.handlers
 import os
 import sys
 import threading
+import time
 import warnings
 from collections.abc import Callable, Iterable
 from types import ModuleType
@@ -29,47 +29,98 @@
 
 # NOTE: This is duplicated in `tesseract_core/runtime/logs.py`.
 # Make sure to propagate changes to both files.
-class LogPipe(threading.Thread):
-    """Custom IO pipe to support live logging from subprocess.run or OS-level file descriptor.
+class TeePipe(threading.Thread):
+    """Custom I/O construct to support live logging from a single file descriptor to multiple sinks.
 
-    Runs a thread that logs everything read from the pipe to the given sinks.
-    Can be used as a context manager for automatic cleanup.
+    Runs a thread that records everything written to the file descriptor. Can be used as a
+    context manager for automatic cleanup.
+
+    Example:
+        >>> with TeePipe(print, logger.info) as pipe_fd:
+        ...     fd = os.fdopen(pipe_fd, "w")
+        ...     print("Hello, World!", file=fd, flush=True)
+        Hello, World!
+        2025-06-10 12:00:00,000 - INFO - Hello, World!
     """
 
     daemon = True
 
    def __init__(self, *sinks: Callable) -> None:
-        """Initialize the LogPipe with the given logging level."""
+        """Initialize the TeePipe by creating file descriptors."""
         super().__init__()
         self._sinks = sinks
         self._fd_read, self._fd_write = os.pipe()
-        self._pipe_reader = os.fdopen(self._fd_read)
         self._captured_lines = []
+        self._last_time = time.time()
+        self._is_blocking = threading.Event()
+        self._grace_period = 0.1
 
     def __enter__(self) -> int:
         """Start the thread and return the write file descriptor of the pipe."""
         self.start()
         return self.fileno()
 
-    def __exit__(self, *args: Any) -> None:
+    def stop(self) -> None:
         """Close the pipe and join the thread."""
+        # Wait for ongoing streams to dry up
+        # We only continue once the reader has spent some time blocked on reading
+        while True:
+            self._is_blocking.wait(timeout=1)
+            if (time.time() - self._last_time) >= self._grace_period:
+                break
+            time.sleep(self._grace_period / 10)
+
+        # This will signal EOF to the reader thread
         os.close(self._fd_write)
-        # Use a timeout so something weird happening in the logging thread doesn't
-        # cause this to hang indefinitely
-        self.join(timeout=10)
-        # Do not close reader before thread is joined since there may be pending data
-        # This also closes the fd_read pipe
-        self._pipe_reader.close()
+        os.close(self._fd_read)
+
+        # Use timeout and daemon=True to avoid hanging indefinitely if something goes wrong
+        self.join(timeout=1)
+
+    def __exit__(self, *args: Any) -> None:
+        """Close the pipe and join the thread."""
+        self.stop()
 
     def fileno(self) -> int:
         """Return the write file descriptor of the pipe."""
         return self._fd_write
 
     def run(self) -> None:
-        """Run the thread, logging everything."""
-        for line in iter(self._pipe_reader.readline, ""):
-            if line.endswith("\n"):
-                line = line[:-1]
+        """Run the thread, pushing every full line of text to the sinks."""
+        line_buffer = []
+        while True:
+            self._last_time = time.time()
+            self._is_blocking.set()
+            try:
+                data = os.read(self._fd_read, 1024)
+                self._is_blocking.clear()
+            except OSError:
+                # Pipe closed
+                break
+
+            if data == b"":
+                # EOF reached
+                break
+
+            lines = data.split(b"\n")
+
+            # Log complete lines
+            for i, line in enumerate(lines[:-1]):
+                if i == 0:
+                    line = b"".join([*line_buffer, line])
+                    line_buffer = []
+                line = line.decode(errors="ignore")
+                self._captured_lines.append(line)
+                for sink in self._sinks:
+                    sink(line)
+
+            # Accumulate incomplete line
+            line_buffer.append(lines[-1])
+
+        # Flush incomplete lines at the end of the stream
+        line = b"".join(line_buffer)
+        if line:
+            line = line.decode(errors="ignore")
             self._captured_lines.append(line)
             for sink in self._sinks:
                 sink(line)

tests/endtoend_tests/test_tesseract_sdk.py

Lines changed: 6 additions & 6 deletions
@@ -210,11 +210,11 @@ def test_signature_consistency():
     )
 
 
-def test_logpipe_consistency():
-    """Test that the source code of the two duplicate LogPipe implementations is identical."""
-    from tesseract_core.runtime.logs import LogPipe as RuntimeLogPipe
-    from tesseract_core.sdk.logs import LogPipe as SDKLogPipe
+def test_teepipe_consistency():
+    """Test that the source code of the two duplicate TeePipe implementations is identical."""
+    from tesseract_core.runtime.logs import TeePipe as RuntimeTeePipe
+    from tesseract_core.sdk.logs import TeePipe as SDKTeePipe
 
-    runtime_source = inspect.getsource(RuntimeLogPipe)
-    sdk_source = inspect.getsource(SDKLogPipe)
+    runtime_source = inspect.getsource(RuntimeTeePipe)
+    sdk_source = inspect.getsource(SDKTeePipe)
     assert runtime_source == sdk_source
