diff --git a/src/plugins/rv-packages/session_manager/local_thumbnail_gen.py b/src/plugins/rv-packages/session_manager/local_thumbnail_gen.py index 35bb9dfcc..e6e2f519d 100644 --- a/src/plugins/rv-packages/session_manager/local_thumbnail_gen.py +++ b/src/plugins/rv-packages/session_manager/local_thumbnail_gen.py @@ -68,6 +68,7 @@ def __init__(self) -> None: self._cache_dir = Path(tempfile.gettempdir()) / f"rv_thumbnails_{os.getpid()}" self._cache_dir.mkdir(parents=True, exist_ok=True) self._in_flight: set[str] = set() + # Cache key to set of source node names self._cache_key_to_sources: dict[str, set[str]] = {} self._deferred_sources: set[str] = set() self._deferred_jobs: list[tuple[str, str, str, str]] = [] @@ -78,7 +79,7 @@ def __init__(self) -> None: self._loading_active = False self._display_preview = False if os.getenv("RV_SESSION_MANAGER_USE_THUMBNAILS") == "0" else True self._shutting_down = False - self._active_procs: list[subprocess.Popen] = [] + self._active_procs: list[tuple[subprocess.Popen, str]] = [] self._procs_lock = threading.Lock() self._pool = ThreadPoolExecutor(max_workers=MAX_WORKERS) @@ -106,6 +107,16 @@ def global_bindings(self) -> list[tuple[str, Any, str]]: self._on_session_deletion, "Delete all cached local filmstrips and thumbnails on RV close", ), + ( + "before-clear-session", + self._on_clear_session, + "Cancel in-flight generation and evict cache when the session is cleared", + ), + ( + "before-source-delete", + self._on_source_delete, + "Cancel in-flight generation and evict cache when a media source is removed", + ), ( "play-start", self._on_play_start, @@ -152,6 +163,9 @@ def _get_cached_path(self, event: Any, path_key: str) -> None: return cache_key = self._cache_key(media_path) + + self._cache_key_to_sources.setdefault(cache_key, set()).add(source_node) + cached = self._cache.get(cache_key, {}) path = cached.get(path_key) @@ -159,8 +173,6 @@ def _get_cached_path(self, event: Any, path_key: str) -> None: event.setReturnContent(str(path)) return - self._cache_key_to_sources.setdefault(cache_key, set()).add(source_node) - flight_key = f"{cache_key}_{path_key}" if flight_key not in self._in_flight: self._start_generation(source_node, cache_key, media_path, path_key) @@ -231,6 +243,19 @@ def _get_media_path(self, source_node: str) -> str | None: logger.warning(f"Could not get media path: {e}") return None + def _source_node_of_group(self, group: str) -> str | None: + """ + RVSourceGroup nodes have at most 1 RVFileSource or RVImageSource child (as a leaf), which is the actual media source. + Find it and return its node name. + """ + try: + for node in commands.nodesInGroup(group): + if commands.nodeType(node) in ("RVFileSource", "RVImageSource"): + return node + except Exception: + return + return None + def _get_source_info(self, source_node: str) -> tuple[int, int, int, int] | None: # Skip inactive media representations if not commands.getIntProperty(f"{source_node}.media.active")[0]: @@ -396,7 +421,7 @@ def _write_filmstrip_session( return output_width, output_height - def _run_suspendable(self, cmd: list[str], timeout: int = 120) -> None: + def _run_suspendable(self, cmd: list[str], cache_key: str, timeout: int = 120) -> None: """Run a subprocess that can be suspended/resumed during playback. The timeout counts only non-suspended wall-clock time: while the @@ -406,7 +431,7 @@ def _run_suspendable(self, cmd: list[str], timeout: int = 120) -> None: creationflags = subprocess.CREATE_NO_WINDOW if _IS_WIN32 else 0 proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags) with self._procs_lock: - self._active_procs.append(proc) + self._active_procs.append((proc, cache_key)) if self._should_defer(): _suspend_proc(proc) deadline = time.monotonic() + timeout @@ -427,7 +452,7 @@ def _run_suspendable(self, cmd: list[str], timeout: int = 120) -> None: finally: with self._procs_lock: try: - self._active_procs.remove(proc) + self._active_procs.remove((proc, cache_key)) except ValueError: logger.warning(f"Process {proc.pid} was not in active processes list") @@ -437,6 +462,7 @@ def _generate_thumbnail(self, cache_key: str, rvio_bin: str, media_path: str, mi try: self._run_suspendable( [rvio_bin, media_path, "-t", str(mid_frame), "-o", str(output_path)], + cache_key, ) except Exception as e: logger.error(f"Thumbnail generation failed: {e}") @@ -477,6 +503,7 @@ def _generate_filmstrip( "-o", str(output_path), ], + cache_key, ) except Exception as e: logger.error(f"Filmstrip generation failed: {e}") @@ -531,7 +558,7 @@ def _on_play_start(self, event: Any) -> None: self._playback_active = True if should_defer: return - for proc in self._active_procs: + for proc, _ in self._active_procs: _suspend_proc(proc) def _on_play_stop(self, event: Any) -> None: @@ -543,18 +570,19 @@ def _on_play_stop(self, event: Any) -> None: with self._procs_lock: self._playback_active = False if not self._should_defer(): - for proc in self._active_procs: + for proc, _ in self._active_procs: _resume_proc(proc) self._drain_one() def _on_loading_start(self, event: Any) -> None: event.reject() + self._shutting_down = False with self._procs_lock: should_defer = self._should_defer() self._loading_active = True if should_defer: return - for proc in self._active_procs: + for proc, _ in self._active_procs: _suspend_proc(proc) def _on_loading_stop(self, event: Any) -> None: @@ -562,7 +590,7 @@ def _on_loading_stop(self, event: Any) -> None: with self._procs_lock: self._loading_active = False if not self._should_defer(): - for proc in self._active_procs: + for proc, _ in self._active_procs: _resume_proc(proc) self._drain_one() @@ -573,7 +601,7 @@ def _on_previews_disabled(self, event: Any) -> None: self._display_preview = False if should_defer: return - for proc in self._active_procs: + for proc, _ in self._active_procs: _suspend_proc(proc) def _on_previews_enabled(self, event: Any) -> None: @@ -581,20 +609,43 @@ def _on_previews_enabled(self, event: Any) -> None: with self._procs_lock: self._display_preview = True if not self._should_defer(): - for proc in self._active_procs: + for proc, _ in self._active_procs: _resume_proc(proc) self._drain_one() + def _on_clear_session(self, event: Any) -> None: + """Cancel in-flight generation and evict all caches when the session is cleared.""" + event.reject() + self._shutting_down = True + self._pool.shutdown(wait=False, cancel_futures=True) + self._pool = ThreadPoolExecutor(max_workers=MAX_WORKERS) + self._in_flight.clear() + self._deferred_jobs.clear() + self._cache_key_to_sources.clear() + self._deferred_sources.clear() + self._cache.clear() + with self._procs_lock: + procs_to_terminate = list(self._active_procs) + for proc, _ in procs_to_terminate: + _resume_proc(proc) + try: + proc.kill() + proc.wait() + except OSError: + logger.warning(f"Failed to kill process {proc}") + def _on_session_deletion(self, event: Any) -> None: event.reject() self._shutting_down = True with self._procs_lock: - for proc in self._active_procs: - _resume_proc(proc) - try: - proc.terminate() - except OSError: - logger.warning(f"Failed to terminate process {proc}") + procs_to_kill = list(self._active_procs) + for proc, _ in procs_to_kill: + _resume_proc(proc) + try: + proc.kill() + proc.wait() + except OSError: + logger.warning(f"Failed to kill process {proc}") self._pool.shutdown(wait=False, cancel_futures=True) self._in_flight.clear() self._cache_key_to_sources.clear() @@ -606,6 +657,58 @@ def _on_session_deletion(self, event: Any) -> None: logger.warning(f"Failed to delete cache directory {self._cache_dir}: {e}") self._cache.clear() + def _on_source_delete(self, event: Any) -> None: + """Cancel generation immediately and evict the cache for a removed media source.""" + event.reject() + + node = event.contents() + + if commands.nodeType(node) in ("RVFileSource", "RVImageSource"): + source_node = node + else: + source_node = self._source_node_of_group(node) + if not source_node: + return + + media_path = self._get_media_path(source_node) + if not media_path: + return + + cache_key = self._cache_key(media_path) + + self._deferred_sources.discard(source_node) + + sources = self._cache_key_to_sources.get(cache_key) + if sources is not None: + sources.discard(source_node) + if sources: + return + self._cache_key_to_sources.pop(cache_key, None) + + # Kill any running rvio proc generating for this media. + with self._procs_lock: + for proc, proc_cache_key in self._active_procs: + if proc_cache_key == cache_key: + # Can't reliably kill a stopped proc, so resume before killing + _resume_proc(proc) + try: + proc.terminate() + except OSError: + logger.warning(f"Failed to terminate process {proc.pid}") + + self._deferred_jobs = [job for job in self._deferred_jobs if job[1] != cache_key] + + self._in_flight.discard(f"{cache_key}_thumbnail_path") + self._in_flight.discard(f"{cache_key}_filmstrip_path") + + cached = self._cache.pop(cache_key, {}) + for path in cached.values(): + if path: + try: + Path(path).unlink(missing_ok=True) + except Exception as e: + logger.warning(f"Failed to delete cached preview {path}: {e}") + def createMode() -> LocalThumbnailGen: global the_mode