Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 98 additions & 4 deletions faro/core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,42 @@ def stimulator_needs_data(self) -> bool:
return False
return isinstance(self.pipeline.stimulator, (StimWithImage, StimWithPipeline))

@staticmethod
def _wait_for_frame_pumping_qt(
queue_obj, frame_idx: int, timeout: float, poll_s: float = 0.05
):
"""Wait for a frame on a ``FrameDispenser`` while pumping Qt events.

``FrameDispenser.wait_for_frame`` is a plain ``threading.Condition.wait()``
with no Qt awareness. Called from the main thread (as
``Controller._build_stim_slm`` does), it freezes the Qt event loop
for as long as the pipeline takes to produce the stim mask -- napari
freezes during every stim frame even though the actual GPU work is
on the pipeline's executor.

Poll with a short timeout and call ``QCoreApplication.processEvents``
between attempts, preserving the caller-supplied total timeout.
Falls back to a plain ``wait_for_frame`` if Qt isn't loaded.
"""
try:
from qtpy.QtCore import QCoreApplication
except Exception:
return queue_obj.wait_for_frame(frame_idx, timeout=timeout)
app = QCoreApplication.instance()
if app is None:
return queue_obj.wait_for_frame(frame_idx, timeout=timeout)

deadline = time.monotonic() + timeout
while True:
remaining = deadline - time.monotonic()
slice_dt = max(0.0, min(poll_s, remaining))
try:
return queue_obj.wait_for_frame(frame_idx, timeout=slice_dt)
except QueueEmpty:
if remaining <= 0:
raise
app.processEvents()

def get_stim_mask(
self, fov_index: int, metadata: dict, *, timeout: float | None = None
) -> np.ndarray | None:
Expand All @@ -185,8 +221,8 @@ def get_stim_mask(
timeout = self._stim_mask_timeout
frame_idx = metadata.get("timestep", 0)
try:
mask = fov_state.stim_mask_queue.wait_for_frame(
frame_idx, timeout=timeout
mask = self._wait_for_frame_pumping_qt(
fov_state.stim_mask_queue, frame_idx, timeout
)
except QueueEmpty as e:
# _build_stim_slm still log-and-continues with False, but
Expand Down Expand Up @@ -577,6 +613,13 @@ def __init__(self, mic, pipeline, *, writer: Writer | None = None):
pipeline: ImageProcessingPipeline instance.
writer: Storage backend. If None, Analyzer uses TiffWriter (default).
Pass an OmeZarrWriter for OME-Zarr output.

Note:
``run_experiment`` automatically stops any continuous sequence
acquisition (live mode) before starting MDA, and pumps the Qt
event loop while waiting on the MDA worker, so napari-mm's own
``preview`` layer keeps updating throughout the run without the
window freezing.
"""
self._mic = mic
self._pipeline = pipeline
Expand Down Expand Up @@ -836,6 +879,16 @@ def _validate_fov_positions(self, events):

def _run_mda_with_events(self, events, *, stim_mode):
"""Run the MDA event loop — shared by run/continue_experiment."""
# Live mode (continuous sequence acquisition) and MDA both drive the
# camera. If live is still running when the MDA's first snapImage
# fires, the snap buffer is consumed by the live-poll listener (in
# napari-micromanager: ``_core_link._image_snapped``) before the
# engine calls getImage, and the engine raises "Camera image buffer
# read failed". Stop it unconditionally before MDA starts.
mmc = getattr(self._mic, "mmc", None)
if mmc is not None and mmc.isSequenceRunning():
mmc.stopSequenceAcquisition()

self._mic.connect_frame(self._on_frame_ready)

# Set up event queue for extend_experiment support.
Expand Down Expand Up @@ -867,7 +920,7 @@ def _run_mda_with_events(self, events, *, stim_mode):
break

while self._queue.qsize() >= 3:
time.sleep(0.1)
self._pump_qt_and_sleep(0.05)
self._n_channels = len(rtm_event.channels)

# In "previous" mode at t=0 there is no predecessor
Expand Down Expand Up @@ -909,14 +962,55 @@ def _run_mda_with_events(self, events, *, stim_mode):
self._event_queue = None
self._queue.put(self.STOP_EVENT)
if mda_thread is not None:
mda_thread.join()
self._qt_join(mda_thread)
self._mic.disconnect_frame(self._on_frame_ready)

if self._fatal_error is not None:
fatal = self._fatal_error
self._fatal_error = None
raise fatal

# ------------------------------------------------------------------
# Qt-event-loop hygiene
# ------------------------------------------------------------------
#
# _run_mda_with_events runs on the main thread — the same thread napari
# paints from. Without explicit pumping, time.sleep / thread.join starve
# the Qt event loop, so napari freezes for the duration of the run and
# any main-thread-queued updates (e.g. napari-micromanager's preview
# layer refreshes) stay in the queue until the cell exits. Both helpers
# below fall back to plain blocking if Qt isn't loaded at all.

@staticmethod
def _pump_qt_and_sleep(dt: float) -> None:
try:
from qtpy.QtCore import QCoreApplication
except Exception:
time.sleep(dt)
return
app = QCoreApplication.instance()
if app is None:
time.sleep(dt)
return
app.processEvents()
time.sleep(dt)

@staticmethod
def _qt_join(thread: threading.Thread, poll_s: float = 0.05) -> None:
try:
from qtpy.QtCore import QCoreApplication
except Exception:
thread.join()
return
app = QCoreApplication.instance()
if app is None:
thread.join()
return
while thread.is_alive():
app.processEvents()
thread.join(timeout=poll_s)
app.processEvents()

# ------------------------------------------------------------------
# Frame handling
# ------------------------------------------------------------------
Expand Down