diff --git a/fspin/rate_control.py b/fspin/rate_control.py index fa7dfc1..5515688 100644 --- a/fspin/rate_control.py +++ b/fspin/rate_control.py @@ -68,6 +68,7 @@ def __init__(self, freq, is_coroutine, report=False, thread=True): self.thread = thread self.exceptions = [] self._own_loop = None + self._loop_thread = None # Check if running async with high frequency based on OS system = platform.system() @@ -384,7 +385,7 @@ def start_spinning_sync(self, func, condition_fn, *args, **kwargs): self.spin_sync(func, condition_fn, *args, **kwargs) return None - async def start_spinning_async(self, func, condition_fn, *args, **kwargs): + def start_spinning_async(self, func, condition_fn, *args, **kwargs): """ Starts spinning asynchronously as an asyncio Task. @@ -399,8 +400,24 @@ async def start_spinning_async(self, func, condition_fn, *args, **kwargs): asyncio.Task: The created task. """ condition_fn = self._prepare_condition_fn(condition_fn, is_async=True) + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop: use the auto-created loop if available + if self._own_loop is None: + raise + + if not self._own_loop.is_running(): + self._loop_thread = threading.Thread(target=self._own_loop.run_forever, daemon=True) + self._loop_thread.start() + + future = asyncio.run_coroutine_threadsafe( + self.spin_async(func, condition_fn, *args, **kwargs), self._own_loop + ) + self._task = asyncio.wrap_future(future, loop=self._own_loop) + else: + self._task = loop.create_task(self.spin_async(func, condition_fn, *args, **kwargs)) - self._task = asyncio.create_task(self.spin_async(func, condition_fn, *args, **kwargs)) return self._task async def start_spinning_async_wrapper(self, func, condition_fn=None, *, wait=False, **kwargs): @@ -460,7 +477,10 @@ def stop_spinning(self): """ Signals the spinning loop to stop. """ - self._stop_event.set() + if self.is_coroutine and self._own_loop is not None and self._own_loop.is_running(): + self._own_loop.call_soon_threadsafe(self._stop_event.set) + else: + self._stop_event.set() if self.is_coroutine: if self._task: self._task.cancel() @@ -471,8 +491,13 @@ def stop_spinning(self): if self._thread.is_alive() and current is not self._thread: self._thread.join() if self._own_loop is not None: + if self._own_loop.is_running(): + self._own_loop.call_soon_threadsafe(self._own_loop.stop) + if self._loop_thread is not None: + self._loop_thread.join() self._own_loop.close() self._own_loop = None + self._loop_thread = None def get_report(self, output=True): """ diff --git a/tests/test_ratecontrol.py b/tests/test_ratecontrol.py index b6e6698..6b96b36 100644 --- a/tests/test_ratecontrol.py +++ b/tests/test_ratecontrol.py @@ -563,6 +563,40 @@ def test_rate_control_with_own_loop(): if old_loop and not old_loop.is_closed(): asyncio.set_event_loop(old_loop) + +def test_start_spinning_async_with_auto_loop_thread(): + """Ensure an auto-created event loop can schedule async spinning without a running loop.""" + + # Ensure there's no active loop to start with + try: + existing_loop = asyncio.get_event_loop() + except RuntimeError: + existing_loop = None + + if existing_loop and not existing_loop.is_closed(): + existing_loop.close() + + rc = RateControl(freq=50, is_coroutine=True) + calls = [] + + async def awork(): + calls.append(1) + await asyncio.sleep(0) + + async def condition(): + return len(calls) < 2 + + # Should not raise even though no loop is running + task = rc.start_spinning_async(awork, condition) + + time.sleep(0.2) + rc.stop_spinning() + + assert len(calls) > 0, "Async work should have been scheduled on the auto loop" + assert task is not None + if rc._loop_thread is not None: + assert not rc._loop_thread.is_alive(), "Loop thread should be stopped after cleanup" + @pytest.mark.asyncio async def test_async_spin_with_cancelled_error(): """Test async spin with a CancelledError to cover that branch."""