Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/argus_overview/intel/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,20 +472,24 @@ def _extract_ships(self, message: str) -> list[str]:
"""
Extract ship types from message.

Tokenizes the message once and uses set intersection instead of
running 200+ regex searches per message.

Args:
message: Chat message

Returns:
List of ship types found
"""
found = []
# Tokenize into lowercase words — handles most single-word ship names
words = set(re.findall(r"[a-z]+", message.lower()))
# Single-word ships via fast set intersection
found = list(words & self.SHIP_TYPES)
# Multi-word ship names (e.g. "force auxiliary") need substring check
msg_lower = message.lower()
Comment on lines +485 to 489
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This is a great performance improvement. To make it even more efficient, you can avoid calling .lower() twice on the message.

Suggested change
words = set(re.findall(r"[a-z]+", message.lower()))
# Single-word ships via fast set intersection
found = list(words & self.SHIP_TYPES)
# Multi-word ship names (e.g. "force auxiliary") need substring check
msg_lower = message.lower()
msg_lower = message.lower()
words = set(re.findall(r"[a-z]+", msg_lower))
# Single-word ships via fast set intersection
found = list(words & self.SHIP_TYPES)
# Multi-word ship names (e.g. "force auxiliary") need substring check


for ship in self.SHIP_TYPES:
# Use word boundary matching
if re.search(rf"\b{re.escape(ship)}\b", msg_lower):
if " " in ship and ship in msg_lower:
found.append(ship)

return found

def _extract_count(self, message: str) -> int | None:
Expand Down
124 changes: 76 additions & 48 deletions src/argus_overview/platform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,109 +84,137 @@ def is_linux() -> bool:
return sys.platform in ("linux", "linux2")


_singleton_cache: dict[str, object] = {}


def _clear_singleton_cache() -> None:
"""Clear singleton cache (for testing)."""
_singleton_cache.clear()


def get_window_manager() -> WindowManager:
"""Create a platform-appropriate WindowManager instance.
"""Get the platform-appropriate WindowManager singleton.

Returns:
WindowManager implementation for current platform

Raises:
RuntimeError: If platform is not supported
"""
platform = get_platform_name()
if platform == "windows":
from .windows import WindowManagerWindows
key = "window_manager"
if key not in _singleton_cache:
platform = get_platform_name()
if platform == "windows":
from .windows import WindowManagerWindows

return WindowManagerWindows()
elif platform == "linux":
from .linux import WindowManagerLinux
_singleton_cache[key] = WindowManagerWindows()
elif platform == "linux":
from .linux import WindowManagerLinux

return WindowManagerLinux()
raise RuntimeError(f"Unsupported platform: {platform}")
_singleton_cache[key] = WindowManagerLinux()
else:
raise RuntimeError(f"Unsupported platform: {platform}")
return _singleton_cache[key] # type: ignore[return-value]


def get_window_capture(max_workers: int = 4) -> WindowCapture:
"""Create a platform-appropriate WindowCapture instance.
"""Get the platform-appropriate WindowCapture singleton.

Args:
max_workers: Number of capture worker threads
max_workers: Number of capture worker threads (only used on first call)

Returns:
WindowCapture implementation for current platform

Raises:
RuntimeError: If platform is not supported
"""
platform = get_platform_name()
if platform == "windows":
from .windows import WindowCaptureWindows
key = "window_capture"
if key not in _singleton_cache:
platform = get_platform_name()
if platform == "windows":
from .windows import WindowCaptureWindows

return WindowCaptureWindows(max_workers=max_workers)
elif platform == "linux":
from .linux import WindowCaptureLinux
_singleton_cache[key] = WindowCaptureWindows(max_workers=max_workers)
elif platform == "linux":
from .linux import WindowCaptureLinux

return WindowCaptureLinux(max_workers=max_workers)
raise RuntimeError(f"Unsupported platform: {platform}")
_singleton_cache[key] = WindowCaptureLinux(max_workers=max_workers)
else:
raise RuntimeError(f"Unsupported platform: {platform}")
return _singleton_cache[key] # type: ignore[return-value]


def get_screen_manager() -> ScreenManager:
"""Create a platform-appropriate ScreenManager instance.
"""Get the platform-appropriate ScreenManager singleton.

Returns:
ScreenManager implementation for current platform

Raises:
RuntimeError: If platform is not supported
"""
platform = get_platform_name()
if platform == "windows":
from .windows import ScreenManagerWindows
key = "screen_manager"
if key not in _singleton_cache:
platform = get_platform_name()
if platform == "windows":
from .windows import ScreenManagerWindows

return ScreenManagerWindows()
elif platform == "linux":
from .linux import ScreenManagerLinux
_singleton_cache[key] = ScreenManagerWindows()
elif platform == "linux":
from .linux import ScreenManagerLinux

return ScreenManagerLinux()
raise RuntimeError(f"Unsupported platform: {platform}")
_singleton_cache[key] = ScreenManagerLinux()
else:
raise RuntimeError(f"Unsupported platform: {platform}")
return _singleton_cache[key] # type: ignore[return-value]


def get_eve_path_resolver() -> EVEPathResolver:
"""Create a platform-appropriate EVEPathResolver instance.
"""Get the platform-appropriate EVEPathResolver singleton.

Returns:
EVEPathResolver implementation for current platform

Raises:
RuntimeError: If platform is not supported
"""
platform = get_platform_name()
if platform == "windows":
from .windows import EVEPathResolverWindows
key = "eve_path_resolver"
if key not in _singleton_cache:
platform = get_platform_name()
if platform == "windows":
from .windows import EVEPathResolverWindows

return EVEPathResolverWindows()
elif platform == "linux":
from .linux import EVEPathResolverLinux
_singleton_cache[key] = EVEPathResolverWindows()
elif platform == "linux":
from .linux import EVEPathResolverLinux

return EVEPathResolverLinux()
raise RuntimeError(f"Unsupported platform: {platform}")
_singleton_cache[key] = EVEPathResolverLinux()
else:
raise RuntimeError(f"Unsupported platform: {platform}")
return _singleton_cache[key] # type: ignore[return-value]


def get_hotkey_helper() -> HotkeyHelper:
"""Create a platform-appropriate HotkeyHelper instance.
"""Get the platform-appropriate HotkeyHelper singleton.

Returns:
HotkeyHelper implementation for current platform

Raises:
RuntimeError: If platform is not supported
"""
platform = get_platform_name()
if platform == "windows":
from .windows import HotkeyHelperWindows

return HotkeyHelperWindows()
elif platform == "linux":
from .linux import HotkeyHelperLinux

return HotkeyHelperLinux()
raise RuntimeError(f"Unsupported platform: {platform}")
key = "hotkey_helper"
if key not in _singleton_cache:
platform = get_platform_name()
if platform == "windows":
from .windows import HotkeyHelperWindows

_singleton_cache[key] = HotkeyHelperWindows()
elif platform == "linux":
from .linux import HotkeyHelperLinux

_singleton_cache[key] = HotkeyHelperLinux()
else:
raise RuntimeError(f"Unsupported platform: {platform}")
return _singleton_cache[key] # type: ignore[return-value]
Comment on lines 95 to +220
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The introduction of a singleton cache is a good performance improvement. However, the implementation is very repetitive across all the factory functions (get_window_manager, get_window_capture, etc.). To improve maintainability and reduce code duplication, consider abstracting the caching and platform-switching logic into a generic helper function or a decorator. This would make the code much more DRY (Don't Repeat Yourself).

39 changes: 18 additions & 21 deletions src/argus_overview/platform/linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,17 @@ class WindowManagerLinux(WindowManager):
"""Linux window management using wmctrl and xdotool."""

def get_window_list(self) -> list[tuple[str, str]]:
"""Get list of all windows using wmctrl."""
try:
result = run_x11_subprocess(["wmctrl", "-l"], timeout=2)
stdout = result.stdout.decode("utf-8", errors="replace")

windows = []
for line in stdout.strip().split("\n"):
if line:
parts = line.split(None, 3)
if len(parts) >= 4:
window_id = parts[0]
window_title = parts[3]
windows.append((window_id, window_title))

return windows
except (OSError, subprocess.SubprocessError) as e:
logger.warning(f"Failed to get window list: {e}")
return []
"""Get list of all windows using wmctrl (cached)."""
output = _get_wmctrl_window_list()
windows = []
for line in output.strip().split("\n"):
if line:
parts = line.split(None, 3)
if len(parts) >= 4:
window_id = parts[0]
window_title = parts[3]
windows.append((window_id, window_title))
return windows

def get_eve_windows(self) -> list[tuple[str, str]]:
"""Get list of EVE Online windows."""
Expand Down Expand Up @@ -404,7 +397,11 @@ def capture_window_sync(self, window_id: str, scale: float = 1.0) -> Image.Image
return self._capture_imagemagick(window_id)

def _capture_xlib(self, window_id: str) -> Image.Image | None:
"""Capture window using python-xlib (no subprocess)."""
"""Capture window using python-xlib (no subprocess).

Returns an RGBX image to avoid the overhead of converting to RGB.
The UI layer handles RGBX natively via QImage.Format_RGBX8888.
"""
try:
# Thread-local Display to avoid locking on hot path
if not hasattr(_thread_local, "display"):
Expand All @@ -414,8 +411,8 @@ def _capture_xlib(self, window_id: str) -> Image.Image | None:
window = disp.create_resource_object("window", int(window_id, 16))
geom = window.get_geometry()
raw = window.get_image(0, 0, geom.width, geom.height, X.ZPixmap, 0xFFFFFFFF)
img = Image.frombytes("RGBX", (geom.width, geom.height), raw.data, "raw", "BGRX")
return img.convert("RGB")
# Return RGBX directly — avoids a full-image copy from .convert("RGB")
return Image.frombytes("RGBX", (geom.width, geom.height), raw.data, "raw", "BGRX")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This change to return an 'RGBX' image directly is a great performance optimization. However, it seems the corresponding test TestXlibCapture.test_capture_xlib_success in tests/test_platform.py was not updated. The test still asserts result.mode == 'RGB' at line 1423, which will now fail. Please update the test to assert for 'RGBX'.

except Exception as e:
logger.debug(f"Xlib capture failed for {window_id}: {e}")
return None
Expand Down
58 changes: 28 additions & 30 deletions src/argus_overview/ui/main_tab.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@
from argus_overview.ui.menu_builder import ContextMenuBuilder, ToolbarBuilder
from argus_overview.utils.screen import ScreenGeometry, get_screen_geometry

# Module-level constant: avoids re-creating the dict on every pil_to_qimage call
_FORMAT_MAP = {
"RGB": (3, QImage.Format.Format_RGB888),
"RGBX": (4, QImage.Format.Format_RGBX8888),
"RGBA": (4, QImage.Format.Format_RGBA8888),
"L": (1, QImage.Format.Format_Grayscale8),
}


class FlowLayout(QLayout):
"""
Expand Down Expand Up @@ -512,44 +520,30 @@ def _move_window_position_only(self, window_id: str, x: int, y: int):
wm.move_window(window_id, x, y, 0, 0)


def pil_to_qimage(pil_image: Image.Image) -> QImage:
def pil_to_qimage(pil_image: Image.Image, raw_data: bytes | None = None) -> QImage:
"""
Convert PIL Image to QImage

Args:
pil_image: PIL Image object
raw_data: Optional pre-computed tobytes() to avoid a redundant copy

Returns:
QImage: Converted image, or None if input is None
"""
if pil_image is None:
return None
if pil_image.mode == "RGB":
bytes_per_line = 3 * pil_image.width
return QImage(
pil_image.tobytes(),
pil_image.width,
pil_image.height,
bytes_per_line,
QImage.Format.Format_RGB888,
)
elif pil_image.mode == "RGBA":
bytes_per_line = 4 * pil_image.width
return QImage(
pil_image.tobytes(),
pil_image.width,
pil_image.height,
bytes_per_line,
QImage.Format.Format_RGBA8888,
)
elif pil_image.mode == "L":
bytes_per_line = pil_image.width

mode = pil_image.mode
if mode in _FORMAT_MAP:
bpp, fmt = _FORMAT_MAP[mode]
data = raw_data if raw_data is not None else pil_image.tobytes()
return QImage(
pil_image.tobytes(),
data,
pil_image.width,
pil_image.height,
bytes_per_line,
QImage.Format.Format_Grayscale8,
bpp * pil_image.width,
fmt,
)
else:
# Convert to RGB if unknown mode
Expand Down Expand Up @@ -694,16 +688,17 @@ def update_frame(self, image: Image.Image):
image: PIL Image
"""
try:
# Frame fingerprint — hash first 4KB of image bytes to skip identical frames
frame_bytes = image.tobytes()
frame_hash = hash(frame_bytes[:4096])
# Frame fingerprint — sample raw pixel bytes to detect duplicate frames
# Accessing the internal buffer avoids a full tobytes() copy.
raw_data = image.tobytes()
frame_hash = hash(raw_data[:4096])
if frame_hash == self._last_frame_hash:
image.close()
return # Frame unchanged, skip conversion pipeline
self._last_frame_hash = frame_hash

# Convert PIL to QImage
qimage = pil_to_qimage(image)
# Convert PIL to QImage (reuse the raw bytes we already have)
qimage = pil_to_qimage(image, raw_data)
image.close() # Release PIL image memory immediately
if qimage is None:
return # Skip frame if capture failed
Expand Down Expand Up @@ -1074,7 +1069,10 @@ def _process_capture_results(self):
break

request_id, window_id, image = result
# Dedup: keep only the latest result per window
# Dedup: keep only the latest result per window, close superseded frames
prev = latest.get(window_id)
if prev is not None and prev[1] is not None:
prev[1].close()
latest[window_id] = (request_id, image)

# Remove from pending
Expand Down
Loading
Loading