Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions fspin/rate_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(self, freq, is_coroutine, report=False, thread=True):
self.thread = thread
self.exceptions = []
self._own_loop = None
self._loop_thread = None

# Check if running async with high frequency based on OS
system = platform.system()
Expand Down Expand Up @@ -384,7 +385,7 @@ def start_spinning_sync(self, func, condition_fn, *args, **kwargs):
self.spin_sync(func, condition_fn, *args, **kwargs)
return None

async def start_spinning_async(self, func, condition_fn, *args, **kwargs):
def start_spinning_async(self, func, condition_fn, *args, **kwargs):
"""
Comment on lines +388 to 389

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid awaiting spin task in wrapper

start_spinning_async is now a synchronous function that immediately returns the spin task (lines 388-389), but start_spinning_async_wrapper still does task = await self.start_spinning_async(...) with wait=False on line 438. Because the awaited object is the long-running spin task, the wrapper now blocks (and can propagate CancelledError) instead of returning immediately unless wait=True. Any async caller using the wrapper with the default wait=False will hang until the spin loop ends, which is a regression from the prior fire-and-forget behavior.

Useful? React with 👍 / 👎.

Starts spinning asynchronously as an asyncio Task.

Expand All @@ -399,8 +400,24 @@ async def start_spinning_async(self, func, condition_fn, *args, **kwargs):
asyncio.Task: The created task.
"""
condition_fn = self._prepare_condition_fn(condition_fn, is_async=True)
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop: use the auto-created loop if available
if self._own_loop is None:
raise

if not self._own_loop.is_running():
self._loop_thread = threading.Thread(target=self._own_loop.run_forever, daemon=True)
self._loop_thread.start()

future = asyncio.run_coroutine_threadsafe(
self.spin_async(func, condition_fn, *args, **kwargs), self._own_loop
)
self._task = asyncio.wrap_future(future, loop=self._own_loop)
else:
self._task = loop.create_task(self.spin_async(func, condition_fn, *args, **kwargs))

self._task = asyncio.create_task(self.spin_async(func, condition_fn, *args, **kwargs))
return self._task

async def start_spinning_async_wrapper(self, func, condition_fn=None, *, wait=False, **kwargs):
Expand Down Expand Up @@ -460,7 +477,10 @@ def stop_spinning(self):
"""
Signals the spinning loop to stop.
"""
self._stop_event.set()
if self.is_coroutine and self._own_loop is not None and self._own_loop.is_running():
self._own_loop.call_soon_threadsafe(self._stop_event.set)
else:
self._stop_event.set()
if self.is_coroutine:
if self._task:
self._task.cancel()
Expand All @@ -471,8 +491,13 @@ def stop_spinning(self):
if self._thread.is_alive() and current is not self._thread:
self._thread.join()
if self._own_loop is not None:
if self._own_loop.is_running():
self._own_loop.call_soon_threadsafe(self._own_loop.stop)
if self._loop_thread is not None:
self._loop_thread.join()
self._own_loop.close()
self._own_loop = None
self._loop_thread = None

def get_report(self, output=True):
"""
Expand Down
34 changes: 34 additions & 0 deletions tests/test_ratecontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,40 @@ def test_rate_control_with_own_loop():
if old_loop and not old_loop.is_closed():
asyncio.set_event_loop(old_loop)


def test_start_spinning_async_with_auto_loop_thread():
"""Ensure an auto-created event loop can schedule async spinning without a running loop."""

# Ensure there's no active loop to start with
try:
existing_loop = asyncio.get_event_loop()
except RuntimeError:
existing_loop = None

if existing_loop and not existing_loop.is_closed():
existing_loop.close()

rc = RateControl(freq=50, is_coroutine=True)
calls = []

async def awork():
calls.append(1)
await asyncio.sleep(0)

async def condition():
return len(calls) < 2

# Should not raise even though no loop is running
task = rc.start_spinning_async(awork, condition)

time.sleep(0.2)
rc.stop_spinning()

assert len(calls) > 0, "Async work should have been scheduled on the auto loop"
assert task is not None
if rc._loop_thread is not None:
assert not rc._loop_thread.is_alive(), "Loop thread should be stopped after cleanup"

@pytest.mark.asyncio
async def test_async_spin_with_cancelled_error():
"""Test async spin with a CancelledError to cover that branch."""
Expand Down
Loading