Skip to content
139 changes: 121 additions & 18 deletions src/plugins/rv-packages/session_manager/local_thumbnail_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(self) -> None:
self._cache_dir = Path(tempfile.gettempdir()) / f"rv_thumbnails_{os.getpid()}"
self._cache_dir.mkdir(parents=True, exist_ok=True)
self._in_flight: set[str] = set()
# Cache key to set of source node names
self._cache_key_to_sources: dict[str, set[str]] = {}
self._deferred_sources: set[str] = set()
self._deferred_jobs: list[tuple[str, str, str, str]] = []
Expand All @@ -78,7 +79,7 @@ def __init__(self) -> None:
self._loading_active = False
self._display_preview = False if os.getenv("RV_SESSION_MANAGER_USE_THUMBNAILS") == "0" else True
self._shutting_down = False
self._active_procs: list[subprocess.Popen] = []
self._active_procs: list[tuple[subprocess.Popen, str]] = []
self._procs_lock = threading.Lock()
self._pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)

Expand Down Expand Up @@ -106,6 +107,16 @@ def global_bindings(self) -> list[tuple[str, Any, str]]:
self._on_session_deletion,
"Delete all cached local filmstrips and thumbnails on RV close",
),
(
"before-clear-session",
self._on_clear_session,
"Cancel in-flight generation and evict cache when the session is cleared",
),
(
"before-source-delete",
self._on_source_delete,
"Cancel in-flight generation and evict cache when a media source is removed",
),
(
"play-start",
self._on_play_start,
Expand Down Expand Up @@ -152,15 +163,16 @@ def _get_cached_path(self, event: Any, path_key: str) -> None:
return

cache_key = self._cache_key(media_path)

self._cache_key_to_sources.setdefault(cache_key, set()).add(source_node)

cached = self._cache.get(cache_key, {})
path = cached.get(path_key)

if path:
event.setReturnContent(str(path))
return

self._cache_key_to_sources.setdefault(cache_key, set()).add(source_node)

flight_key = f"{cache_key}_{path_key}"
if flight_key not in self._in_flight:
self._start_generation(source_node, cache_key, media_path, path_key)
Expand Down Expand Up @@ -231,6 +243,19 @@ def _get_media_path(self, source_node: str) -> str | None:
logger.warning(f"Could not get media path: {e}")
return None

def _source_node_of_group(self, group: str) -> str | None:
"""
RVSourceGroup nodes have at most 1 RVFileSource or RVImageSource child (as a leaf), which is the actual media source.
Find it and return its node name.
"""
try:
for node in commands.nodesInGroup(group):
if commands.nodeType(node) in ("RVFileSource", "RVImageSource"):
return node
except Exception:
return
return None

def _get_source_info(self, source_node: str) -> tuple[int, int, int, int] | None:
# Skip inactive media representations
if not commands.getIntProperty(f"{source_node}.media.active")[0]:
Expand Down Expand Up @@ -396,7 +421,7 @@ def _write_filmstrip_session(

return output_width, output_height

def _run_suspendable(self, cmd: list[str], timeout: int = 120) -> None:
def _run_suspendable(self, cmd: list[str], cache_key: str, timeout: int = 120) -> None:
"""Run a subprocess that can be suspended/resumed during playback.

The timeout counts only non-suspended wall-clock time: while the
Expand All @@ -406,7 +431,7 @@ def _run_suspendable(self, cmd: list[str], timeout: int = 120) -> None:
creationflags = subprocess.CREATE_NO_WINDOW if _IS_WIN32 else 0
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags)
with self._procs_lock:
self._active_procs.append(proc)
self._active_procs.append((proc, cache_key))
if self._should_defer():
_suspend_proc(proc)
deadline = time.monotonic() + timeout
Expand All @@ -427,7 +452,7 @@ def _run_suspendable(self, cmd: list[str], timeout: int = 120) -> None:
finally:
with self._procs_lock:
try:
self._active_procs.remove(proc)
self._active_procs.remove((proc, cache_key))
except ValueError:
logger.warning(f"Process {proc.pid} was not in active processes list")

Expand All @@ -437,6 +462,7 @@ def _generate_thumbnail(self, cache_key: str, rvio_bin: str, media_path: str, mi
try:
self._run_suspendable(
[rvio_bin, media_path, "-t", str(mid_frame), "-o", str(output_path)],
cache_key,
)
except Exception as e:
logger.error(f"Thumbnail generation failed: {e}")
Expand Down Expand Up @@ -477,6 +503,7 @@ def _generate_filmstrip(
"-o",
str(output_path),
],
cache_key,
)
except Exception as e:
logger.error(f"Filmstrip generation failed: {e}")
Expand Down Expand Up @@ -531,7 +558,7 @@ def _on_play_start(self, event: Any) -> None:
self._playback_active = True
if should_defer:
return
for proc in self._active_procs:
for proc, _ in self._active_procs:
_suspend_proc(proc)

def _on_play_stop(self, event: Any) -> None:
Expand All @@ -543,26 +570,27 @@ def _on_play_stop(self, event: Any) -> None:
with self._procs_lock:
self._playback_active = False
if not self._should_defer():
for proc in self._active_procs:
for proc, _ in self._active_procs:
_resume_proc(proc)
self._drain_one()

def _on_loading_start(self, event: Any) -> None:
event.reject()
self._shutting_down = False
with self._procs_lock:
should_defer = self._should_defer()
self._loading_active = True
if should_defer:
return
for proc in self._active_procs:
for proc, _ in self._active_procs:
_suspend_proc(proc)

def _on_loading_stop(self, event: Any) -> None:
event.reject()
with self._procs_lock:
self._loading_active = False
if not self._should_defer():
for proc in self._active_procs:
for proc, _ in self._active_procs:
_resume_proc(proc)
self._drain_one()

Expand All @@ -573,28 +601,51 @@ def _on_previews_disabled(self, event: Any) -> None:
self._display_preview = False
if should_defer:
return
for proc in self._active_procs:
for proc, _ in self._active_procs:
_suspend_proc(proc)

def _on_previews_enabled(self, event: Any) -> None:
event.reject()
with self._procs_lock:
self._display_preview = True
if not self._should_defer():
for proc in self._active_procs:
for proc, _ in self._active_procs:
_resume_proc(proc)
self._drain_one()

def _on_clear_session(self, event: Any) -> None:
"""Cancel in-flight generation and evict all caches when the session is cleared."""
event.reject()
self._shutting_down = True
self._pool.shutdown(wait=False, cancel_futures=True)
self._pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)
self._in_flight.clear()
self._deferred_jobs.clear()
self._cache_key_to_sources.clear()
self._deferred_sources.clear()
self._cache.clear()
with self._procs_lock:
procs_to_terminate = list(self._active_procs)
for proc, _ in procs_to_terminate:
_resume_proc(proc)
try:
proc.kill()
proc.wait()
except OSError:
logger.warning(f"Failed to kill process {proc}")

def _on_session_deletion(self, event: Any) -> None:
event.reject()
self._shutting_down = True
with self._procs_lock:
for proc in self._active_procs:
_resume_proc(proc)
try:
proc.terminate()
except OSError:
logger.warning(f"Failed to terminate process {proc}")
procs_to_kill = list(self._active_procs)
for proc, _ in procs_to_kill:
_resume_proc(proc)
try:
proc.kill()
proc.wait()
except OSError:
logger.warning(f"Failed to kill process {proc}")
self._pool.shutdown(wait=False, cancel_futures=True)
self._in_flight.clear()
self._cache_key_to_sources.clear()
Expand All @@ -606,6 +657,58 @@ def _on_session_deletion(self, event: Any) -> None:
logger.warning(f"Failed to delete cache directory {self._cache_dir}: {e}")
self._cache.clear()

def _on_source_delete(self, event: Any) -> None:
"""Cancel generation immediately and evict the cache for a removed media source."""
event.reject()

node = event.contents()

if commands.nodeType(node) in ("RVFileSource", "RVImageSource"):
source_node = node
else:
source_node = self._source_node_of_group(node)
if not source_node:
return

media_path = self._get_media_path(source_node)
if not media_path:
return

cache_key = self._cache_key(media_path)

self._deferred_sources.discard(source_node)

sources = self._cache_key_to_sources.get(cache_key)
if sources is not None:
sources.discard(source_node)
if sources:
return
Comment thread
bernie-laberge marked this conversation as resolved.
self._cache_key_to_sources.pop(cache_key, None)

# Kill any running rvio proc generating for this media.
with self._procs_lock:
for proc, proc_cache_key in self._active_procs:
if proc_cache_key == cache_key:
# Can't reliably kill a stopped proc, so resume before killing
_resume_proc(proc)
try:
proc.terminate()
except OSError:
logger.warning(f"Failed to terminate process {proc.pid}")

self._deferred_jobs = [job for job in self._deferred_jobs if job[1] != cache_key]

self._in_flight.discard(f"{cache_key}_thumbnail_path")
self._in_flight.discard(f"{cache_key}_filmstrip_path")

cached = self._cache.pop(cache_key, {})
for path in cached.values():
if path:
try:
Path(path).unlink(missing_ok=True)
except Exception as e:
logger.warning(f"Failed to delete cached preview {path}: {e}")


def createMode() -> LocalThumbnailGen:
global the_mode
Expand Down
Loading