Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 114 additions & 13 deletions src/plugins/rv-packages/session_manager/local_thumbnail_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(self) -> None:
self._cache_dir = Path(tempfile.gettempdir()) / f"rv_thumbnails_{os.getpid()}"
self._cache_dir.mkdir(parents=True, exist_ok=True)
self._in_flight: set[str] = set()
# Cache key to set of source node names
self._cache_key_to_sources: dict[str, set[str]] = {}
self._deferred_sources: set[str] = set()
self._deferred_jobs: list[tuple[str, str, str, str]] = []
Expand All @@ -78,7 +79,7 @@ def __init__(self) -> None:
self._loading_active = False
self._display_preview = False if os.getenv("RV_SESSION_MANAGER_USE_THUMBNAILS") == "0" else True
self._shutting_down = False
self._active_procs: list[subprocess.Popen] = []
self._active_procs: list[tuple[subprocess.Popen, str]] = []
self._procs_lock = threading.Lock()
self._pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)

Expand Down Expand Up @@ -106,6 +107,16 @@ def global_bindings(self) -> list[tuple[str, Any, str]]:
self._on_session_deletion,
"Delete all cached local filmstrips and thumbnails on RV close",
),
(
"before-clear-session",
self._on_clear_session,
"Cancel in-flight generation and evict cache when the session is cleared",
),
(
"before-source-delete",
self._on_source_delete,
"Cancel in-flight generation and evict cache when a media source is removed",
),
(
"play-start",
self._on_play_start,
Expand Down Expand Up @@ -152,15 +163,16 @@ def _get_cached_path(self, event: Any, path_key: str) -> None:
return

cache_key = self._cache_key(media_path)

self._cache_key_to_sources.setdefault(cache_key, set()).add(source_node)

cached = self._cache.get(cache_key, {})
path = cached.get(path_key)

if path:
event.setReturnContent(str(path))
return

self._cache_key_to_sources.setdefault(cache_key, set()).add(source_node)

flight_key = f"{cache_key}_{path_key}"
if flight_key not in self._in_flight:
self._start_generation(source_node, cache_key, media_path, path_key)
Expand Down Expand Up @@ -231,6 +243,19 @@ def _get_media_path(self, source_node: str) -> str | None:
logger.warning(f"Could not get media path: {e}")
return None

def _source_node_of_group(self, group: str) -> str | None:
"""
RVSourceGroup nodes have at most 1 RVFileSource or RVImageSource child (as a leaf), which is the actual media source.
Find it and return its node name.
"""
try:
for node in commands.nodesInGroup(group):
if commands.nodeType(node) in ("RVFileSource", "RVImageSource"):
return node
except Exception:
return
return None

def _get_source_info(self, source_node: str) -> tuple[int, int, int, int] | None:
# Skip inactive media representations
if not commands.getIntProperty(f"{source_node}.media.active")[0]:
Expand Down Expand Up @@ -396,7 +421,7 @@ def _write_filmstrip_session(

return output_width, output_height

def _run_suspendable(self, cmd: list[str], timeout: int = 120) -> None:
def _run_suspendable(self, cmd: list[str], cache_key: str, timeout: int = 120) -> None:
"""Run a subprocess that can be suspended/resumed during playback.

The timeout counts only non-suspended wall-clock time: while the
Expand All @@ -406,7 +431,7 @@ def _run_suspendable(self, cmd: list[str], timeout: int = 120) -> None:
creationflags = subprocess.CREATE_NO_WINDOW if _IS_WIN32 else 0
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags)
with self._procs_lock:
self._active_procs.append(proc)
self._active_procs.append((proc, cache_key))
if self._should_defer():
_suspend_proc(proc)
deadline = time.monotonic() + timeout
Expand All @@ -427,7 +452,7 @@ def _run_suspendable(self, cmd: list[str], timeout: int = 120) -> None:
finally:
with self._procs_lock:
try:
self._active_procs.remove(proc)
self._active_procs.remove((proc, cache_key))
except ValueError:
logger.warning(f"Process {proc.pid} was not in active processes list")

Expand All @@ -437,6 +462,7 @@ def _generate_thumbnail(self, cache_key: str, rvio_bin: str, media_path: str, mi
try:
self._run_suspendable(
[rvio_bin, media_path, "-t", str(mid_frame), "-o", str(output_path)],
cache_key,
)
except Exception as e:
logger.error(f"Thumbnail generation failed: {e}")
Expand Down Expand Up @@ -477,6 +503,7 @@ def _generate_filmstrip(
"-o",
str(output_path),
],
cache_key,
)
except Exception as e:
logger.error(f"Filmstrip generation failed: {e}")
Expand Down Expand Up @@ -531,7 +558,7 @@ def _on_play_start(self, event: Any) -> None:
self._playback_active = True
if should_defer:
return
for proc in self._active_procs:
for proc, _ in self._active_procs:
_suspend_proc(proc)

def _on_play_stop(self, event: Any) -> None:
Expand All @@ -543,26 +570,27 @@ def _on_play_stop(self, event: Any) -> None:
with self._procs_lock:
self._playback_active = False
if not self._should_defer():
for proc in self._active_procs:
for proc, _ in self._active_procs:
_resume_proc(proc)
self._drain_one()

def _on_loading_start(self, event: Any) -> None:
event.reject()
self._shutting_down = False
with self._procs_lock:
should_defer = self._should_defer()
self._loading_active = True
if should_defer:
return
for proc in self._active_procs:
for proc, _ in self._active_procs:
_suspend_proc(proc)

def _on_loading_stop(self, event: Any) -> None:
event.reject()
with self._procs_lock:
self._loading_active = False
if not self._should_defer():
for proc in self._active_procs:
for proc, _ in self._active_procs:
_resume_proc(proc)
self._drain_one()

Expand All @@ -573,23 +601,44 @@ def _on_previews_disabled(self, event: Any) -> None:
self._display_preview = False
if should_defer:
return
for proc in self._active_procs:
for proc, _ in self._active_procs:
_suspend_proc(proc)

def _on_previews_enabled(self, event: Any) -> None:
event.reject()
with self._procs_lock:
self._display_preview = True
if not self._should_defer():
for proc in self._active_procs:
for proc, _ in self._active_procs:
_resume_proc(proc)
self._drain_one()

def _on_clear_session(self, event: Any) -> None:
"""Cancel in-flight generation and evict all caches when the session is cleared."""
event.reject()
self._shutting_down = True
self._pool.shutdown(wait=False, cancel_futures=True)
self._pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)
self._in_flight.clear()
self._deferred_jobs.clear()
self._cache_key_to_sources.clear()
self._deferred_sources.clear()
self._cache.clear()
with self._procs_lock:
procs_to_terminate = list(self._active_procs)
for proc, _ in procs_to_terminate:
_resume_proc(proc)
try:
proc.kill()
proc.wait()
except OSError:
logger.warning(f"Failed to kill process {proc}")

def _on_session_deletion(self, event: Any) -> None:
event.reject()
self._shutting_down = True
with self._procs_lock:
for proc in self._active_procs:
for proc, _ in self._active_procs:
_resume_proc(proc)
try:
proc.terminate()
Expand All @@ -606,6 +655,58 @@ def _on_session_deletion(self, event: Any) -> None:
logger.warning(f"Failed to delete cache directory {self._cache_dir}: {e}")
self._cache.clear()

def _on_source_delete(self, event: Any) -> None:
"""Cancel generation immediately and evict the cache for a removed media source."""
event.reject()

node = event.contents()

if commands.nodeType(node) in ("RVFileSource", "RVImageSource"):
source_node = node
else:
source_node = self._source_node_of_group(node)
if not source_node:
return

media_path = self._get_media_path(source_node)
if not media_path:
return

cache_key = self._cache_key(media_path)

self._deferred_sources.discard(source_node)

sources = self._cache_key_to_sources.get(cache_key)
if sources is not None:
sources.discard(source_node)
if sources:
return
self._cache_key_to_sources.pop(cache_key, None)

# Kill any running rvio proc generating for this media.
with self._procs_lock:
for proc, proc_cache_key in self._active_procs:
if proc_cache_key == cache_key:
# Can't reliably kill a stopped proc, so resume before killing
_resume_proc(proc)
try:
proc.terminate()
except OSError:
logger.warning(f"Failed to terminate process {proc.pid}")

self._deferred_jobs = [job for job in self._deferred_jobs if job[1] != cache_key]

self._in_flight.discard(f"{cache_key}_thumbnail_path")
self._in_flight.discard(f"{cache_key}_filmstrip_path")

cached = self._cache.pop(cache_key, {})
for path in cached.values():
if path:
try:
Path(path).unlink(missing_ok=True)
except Exception as e:
logger.warning(f"Failed to delete cached preview {path}: {e}")


def createMode() -> LocalThumbnailGen:
global the_mode
Expand Down
Loading