From 8dc1b240d120bb4cb9ef94d0a2a1cb2ad00e1c54 Mon Sep 17 00:00:00 2001 From: Armin Burgmeier Date: Wed, 11 Feb 2026 17:18:19 +0100 Subject: [PATCH 1/4] Add `nest` to nest tinyio loops This allows to isolate them from each other so that an exception in a nested group does not affect coroutines running outside. Make test with exceptions work Use isolate() instead of nest() as public API Explicit boolean check and add exception_group parameter Return exception instead of calling `cleanup` with it --- tests/test_isolate.py | 144 ++++++++++++++++++++++++++++++++++++++++++ tinyio/__init__.py | 1 + tinyio/_isolate.py | 115 +++++++++++++++++++++++++++++++++ 3 files changed, 260 insertions(+) create mode 100644 tests/test_isolate.py create mode 100644 tinyio/_isolate.py diff --git a/tests/test_isolate.py b/tests/test_isolate.py new file mode 100644 index 0000000..a7c4219 --- /dev/null +++ b/tests/test_isolate.py @@ -0,0 +1,144 @@ +from collections.abc import Callable + +import pytest +import tinyio + + +class SingleElementQueue: + def __init__(self): + self._event = tinyio.Event() + self._elem = None + + def put(self, x): + if self._elem is not None: + raise ValueError("Queue is full") + + self._elem = x + self._event.set() + + def get(self): + while self._elem is None: + yield self._event.wait() + x = self._elem + self._elem = None + return x + + +@pytest.mark.parametrize("isolate_g", (False, True)) +@pytest.mark.parametrize("isolate_h", (False, True)) +def test_isolate(isolate_g: bool, isolate_h: bool): + """Test that all coroutines make progress when some are isolated""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + + # Intertwine two coroutines in such a way that they can only + # finish if both of them make progress at the same time, but + # not if one blocks until the other has completed. + def g() -> tinyio.Coro[int]: + q1.put(1) + x = yield q2.get() + q1.put(x + 1) + return (yield q2.get()) + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + q2.put(x + 1) + x = yield q1.get() + q2.put(x + 1) + return x + + def maybe_isolate(c: Callable[[], tinyio.Coro[int]], isolate: bool) -> tinyio.Coro[int]: + if isolate: + x, _ = yield tinyio.isolate(c) + return x + else: + return (yield c()) + + def f() -> tinyio.Coro[list[int]]: + return (yield [maybe_isolate(g, isolate_g), maybe_isolate(h, isolate_h)]) + + out = tinyio.Loop().run(f()) + assert out == [4, 3] + + +def test_isolate_with_error_in_inner_loop(): + """Test exceptions happening in the isolated loop. + + If an isolated coroutine raises an exception, all other coroutines within + the isolation are cancelled, but outer coroutines keep running.""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + q3 = SingleElementQueue() + + g_was_cancelled = True + i_was_cancelled = True + + def g() -> tinyio.Coro[int]: + nonlocal g_was_cancelled + q2.put(5) + yield q3.get() + g_was_cancelled = False + return 1 + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + y = yield q2.get() + if x == 5 and y == 5: + raise RuntimeError("Kaboom") + return x + y + + def i() -> tinyio.Coro[int]: + nonlocal i_was_cancelled + q1.put(5) + yield tinyio.sleep(1) + i_was_cancelled = False + return 2 + + def isolated() -> tinyio.Coro[list[int]]: + return (yield [h(), i()]) + + def try_isolated() -> tinyio.Coro[list[int]]: + x, success = yield tinyio.isolate(isolated) + if not success: + x = [-1, -1] + + q3.put(0) # wake up the "outer" loop g() + return x + + def f() -> tinyio.Coro[list[int]]: + return (yield [g(), try_isolated()]) + + assert tinyio.Loop().run(f()) == [1, [-1, -1]] + + assert not g_was_cancelled + assert i_was_cancelled + + +def test_isolate_with_args(): + """Test that isolate can be called with additional coroutines as arguments""" + + def slow_add_one(x: int) -> tinyio.Coro[int]: + yield + return x + 1 + + def unreliable_add_two(get_x: tinyio.Coro[int]) -> tinyio.Coro[int]: + x = yield get_x + if x == 3: + raise RuntimeError("That is too hard.") + else: + y = yield slow_add_one(x) + z = yield slow_add_one(y) + return z + + def try_add_three(x: int) -> tinyio.Coro[tuple[int, bool]]: + return (yield tinyio.isolate(unreliable_add_two, slow_add_one(x))) + + assert tinyio.Loop().run(try_add_three(0)) == (3, True) + assert tinyio.Loop().run(try_add_three(1)) == (4, True) + assert tinyio.Loop().run(try_add_three(3)) == (6, True) + assert tinyio.Loop().run(try_add_three(4)) == (7, True) + + result, success = tinyio.Loop().run(try_add_three(2)) + assert not success + assert type(result) is RuntimeError + assert str(result) == "That is too hard." diff --git a/tinyio/__init__.py b/tinyio/__init__.py index 0e6cddc..cc2d7c4 100644 --- a/tinyio/__init__.py +++ b/tinyio/__init__.py @@ -11,6 +11,7 @@ to_asyncio as to_asyncio, to_trio as to_trio, ) +from ._isolate import isolate as isolate from ._sync import Barrier as Barrier, Lock as Lock, Semaphore as Semaphore from ._thread import ThreadPool as ThreadPool, run_in_thread as run_in_thread from ._time import TimeoutError as TimeoutError, sleep as sleep, timeout as timeout diff --git a/tinyio/_isolate.py b/tinyio/_isolate.py new file mode 100644 index 0000000..002d91f --- /dev/null +++ b/tinyio/_isolate.py @@ -0,0 +1,115 @@ +from collections.abc import Callable +from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar + +import tinyio + + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_R = TypeVar("_R") + + +def _dupe(coro: tinyio.Coro[_T]) -> tuple[tinyio.Coro[None], tinyio.Coro[_T]]: + """Takes a coro assumed to be scheduled on an event loop, and returns: + + - a new coroutine that should be scheduled in the background of the same loop; + - a new coroutine that can be scheduled anywhere at all (typically a new loop), and + will return the same value as the original coroutine. + + Thus, this is a pipe through which two event loops can talk to one another. + """ + pipe = [] + done = tinyio.Event() + failed = tinyio.Event() + + def put_on_old_loop(): + try: + out = yield coro + except BaseException: + failed.set() + done.set() + raise + else: + pipe.append(out) + done.set() + + def put_on_new_loop(): + yield done.wait() + if failed.is_set(): + raise RuntimeError("Could not get input as underlying coroutine failed.") + else: + return pipe[0] + + return put_on_old_loop(), put_on_new_loop() + + +def _nest(coro: tinyio.Coro[_R], exception_group: None | bool = None) -> tinyio.Coro[_R]: + """Runs one tinyio event loop within another. + + The outer loop will be in control of the stepping. The inner loop will have a + separate collection of coroutines, which will be grouped and mutually shut down if + one of them produces an error. Thus, this provides a way to isolate a group of + coroutines within a broader collection. + """ + with tinyio.Loop().runtime(coro, exception_group) as gen: + while True: + try: + wait = next(gen) + except StopIteration as e: + return e.value + if wait is None: + yield + else: + yield tinyio.run_in_thread(wait) + + +def isolate( + fn: Callable[..., tinyio.Coro[_R]], + /, + *args: tinyio.Coro, + exception_group: None | bool = None, +) -> tinyio.Coro[tuple[_R | BaseException, bool]]: + """Runs a coroutine in an isolated event loop, and if it fails, returns the exception that occurred. + + **Arguments:** + + - `fn`: a function that returns a tinyio coroutine. Will be called as `fn(*args)` in order to get the coroutine to + run. All coroutines that it depends on must be passed as `*args` (so that communication can be established + between the two loops). + - `*args`: all coroutines that `fn` depends upon. + + **Returns:** + + A 2-tuple: + + - the first element is either the result of `fn(*args)` or an exception. + - whether `fn(*args)` succeeded or raised an exception. + """ + if len(args) > 0: + olds, news = zip(*map(_dupe, args), strict=True) + else: + olds, news = [], [] + yield set(olds) + try: + # This `yield from` is load bearing! We must not allow the tinyio event loop to + # interpose itself between the exception arising out of `fn(*news)`, and the + # current stack frame. Otherwise we would get a `CancelledError` here instead. + return (yield from _nest(fn(*news), exception_group=exception_group)), True + except BaseException as e: + return e, False + + +# Stand back, some typing hackery required. +if TYPE_CHECKING: + + def _fn_signature(*args: tinyio.Coro[_T], exception_group: None | bool = None): ... + + def _make_isolate( + fn: Callable[_P, Any], + ) -> Callable[ + Concatenate[Callable[_P, tinyio.Coro[_R]], _P], + tinyio.Coro[tuple[_R | BaseException, bool]], + ]: ... + + isolate = _make_isolate(_fn_signature) + del _fn_signature, _make_isolate From cfe755093f4bff9cbbb3d822ad6896020cc3c781 Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Wed, 4 Mar 2026 21:41:16 +0100 Subject: [PATCH 2/4] updated design for `isolate` + added devdocs --- devdocs/isolate.md | 115 +++++++++++++++++++++++++ tests/test_core.py | 12 +++ tests/test_isolate.py | 51 +++++++++-- tinyio/__init__.py | 2 +- tinyio/_isolate.py | 192 ++++++++++++++++++++++++++---------------- 5 files changed, 291 insertions(+), 81 deletions(-) create mode 100644 devdocs/isolate.md diff --git a/devdocs/isolate.md b/devdocs/isolate.md new file mode 100644 index 0000000..902b3aa --- /dev/null +++ b/devdocs/isolate.md @@ -0,0 +1,115 @@ +# `tinyio.isolate` + +I went back-and-forth on various designs for `tinyio.isolate`. + +## Scheduling on the main loop vs scheduling on the isolated loop. + +The key issue is around the coroutines spawned by the isolated coroutine (and any coroutines that they transitively spawn, and so on...). Are these part of the isolated loop or not? That is, if they crash, should they only crash the isolated loop, or the whole loop? + +Note that we definitely need some means of placing things on the main loop (not just the isolated loop), as e.g. we may want an isolated coroutine to consume some input made available by the main loop... a dependency typically expressed as `yield some_coroutine`. + +In terms of what this means under-the-hood: +- if they are on the isolated loop then they are just normal coroutines scheduled there; +- if they are on the main loop then we can still access their results from within the isolated loop by proxying their results over in a new coroutine; e.g. the following allows you to create a new coroutine that yields the results of the first: + + ```python + # Usage: coro2 = yield copy(coro) whilst within the main loop. `coro2` is fresh and can be placed on a new loop. + + def copy(coro: tinyio.Coro[_T]) -> tinyio.Coro[tinyio.Coro[_T]]: + pipe = [] + done = tinyio.Event() + failed = tinyio.Event() + + def put_on_old_loop(): + try: + out = yield coro + except BaseException as e: + pipe.append(e) + failed.set() + done.set() + raise + else: + pipe.append(out) + done.set() + + def put_on_new_loop() -> tinyio.Coro[_T]: + yield done.wait() + if failed.is_set(): + raise pipe[0] + else: + return pipe[0] + + yield {put_on_old_loop()} + return put_on_new_loop() + ``` + +After some thought, I realised that this question of placement cannot be determined automatically. + +Here's a thought experiment: +```python +def foo(): + yield [bar, baz] + +def main(): + yield [isolate(foo()), some_other_coro] +``` +now let's suppose that `bar` crashes. Should `baz` also be crashed as well? If it is scheduled exclusively on the isolated loop, then yes. If it is scheduled on the main loop – or will be scheduled there in the future! – then no. And we have no way of telling which scenario we are in. + +## Points to recall + +1. a coroutine can be yielded in multiple places. Recall that `tinyio` allows you to yield previously-seen coroutines to get their result. In particular it is completely possible that the same coroutine will be yielded both inside and outside of the isolated region. + +2. because we are in an asynchronous context, then we cannot rely on the order in which coroutines are yielded. For example supposing the isolated coroutine runs `yield coro`, then we might naively have a check of the form + + ```python + if inspect.getgeneratorstate(coro) == inspect.GEN_CREATED: + # brand-new coroutine, include it in the isolate + else: + # existing coroutine, keep it outside + ``` + + but in practice this just introduces a race condition; what if the coroutine is *going* to be yielded in the main loop later, but hasn't yet? We'd get different behaviour depending on the order in which things are scheduled. + +In light of this, we might still imagine a design where fresh coroutines are stored on the isolated loop, until they appear on the main loop, at which point they transfer over (by some mechanism). This would also introduce an issue, however: what if some third coroutine within the isolated loop crashes? The order of that, relative to the transfer to the main loop, will determine whether the shared coroutine is also crashed. (Either not crashed on the main loop, or crashed within the isolated loop... which would then presumably leak out into crashing the main loop too, once it gets yielded there.) And in an async context, we can't make guarantees about the order in which things happen, so both are always possible. + +# Rejected design 1 + +API of `tinyio.isolate(coro)`, intercept all coroutines yielded by `coro`, automatically detect whether that coroutine has started using `inspect.getgeneratorstate`, and pick the loop appropriately. This does not work for the reasons described above. + +# Rejected design 2 + +API of `tinyio.isolate(coro)`, intercept all coroutines yielded by `coro`, and wrap all of them in a `tinyio.isolate` as well. (Plus unwrapping their results when sending.) This is essentially the maximalist answer to this choice by having the answer be 'never crash the other coroutines on the isolated loop, let them run to completion'. + +We reject this because it does not fit `tinyio`'s crash-all-coroutines model – instead we're basically just back in the bad old days of `asyncio`'s edge-triggered exceptions. This is not easy to reason about. + +# Rejected design 3 + +API of `tinyio.isolate(make_coro: Callable[..., tinyio.Coro], *args: tinyio.Coro)`, have `*args` be placed on the main loop, and assume/require that all coroutines directly yielded by `make_coro(*args)` are exclusively on the isolated loop. + +This API is based around the observation that major use-case for (?) the isolated-on-main dependence is in consuming arguments from coroutines on the main loop. + +This is 'fine', but suffers from (a) some API inconsistency with the rest of `tinyio` (which operates on coroutines directly, not functions-returning-coroutines), and (b) is not very explicit about the purpose of separating out those `*args`. A user may be surprised that `isolate(lambda: coro(x, y))` does not work equivalently to `isolate(coro, x, y)`. + +# Rejected design 4 + +API of `tinyio.isolate(coro: tinyio.Coro, put_on_isolated_loop: Callable[[tinyio.Coro], bool])`, and call `put_on_isolated_loop(x)` on each `yield x` that occurs within the isolated region (directly from `coro` or transitively), and let a user explicitly decide. + +This is also 'fine' but realistically almost all users are not going to be educated on the contents of this document, and may attempt to write out a rule (such as 'has the coroutine started') that are going to be flaky down the line. + +# Accepted design + +API of `tinyio.isolate(coro)`, and assume/require that all yielded coroutines from `coro` are exclusively on the isolated loop. + +In order to consume the result of arguments from the main loop, then also publicly expose `tinyio.copy` for creating a fresh coroutine to place on the isolated loop: +```python +x = yield tinyio.copy(x) +out = yield tinyio.isolate(my_coro(x)) +``` + +This suffers from the major issue that using this is easy to misuse: forgetting the `copy` will probably raise an error when `x` tries to be placed on both loops; placing the `copy` inside of `my_coro` will simply do nothing. + +Relative to design 3, this does have the advantage of making it clear that `x` is being copied. + +Still, this has probably the simplest semantics to explain to a user: it runs the provided coroutine in an isolated loop. If you want to bridge that isolation, use `tinyio.copy`. + +I still don't love this, and would be open to other designs. diff --git a/tests/test_core.py b/tests/test_core.py index 1033a91..834f8ca 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -569,3 +569,15 @@ def g(): with pytest.raises(RuntimeError, match="whilst the loop is currently running"): loop.run(f()) + + +def test_runtime_leave_without_finishing(): + def coro(): + yield + + def foo(): + with tinyio.Loop().runtime(coro(), exception_group=None) as gen: + return 3 + + with pytest.raises(RuntimeError, match="only partially completed"): + foo() diff --git a/tests/test_isolate.py b/tests/test_isolate.py index a7c4219..ca648a9 100644 --- a/tests/test_isolate.py +++ b/tests/test_isolate.py @@ -1,26 +1,31 @@ +import warnings from collections.abc import Callable import pytest import tinyio +_sentinel = object() + + class SingleElementQueue: def __init__(self): self._event = tinyio.Event() - self._elem = None + self._elem = _sentinel def put(self, x): - if self._elem is not None: + if self._elem is not _sentinel: raise ValueError("Queue is full") self._elem = x self._event.set() def get(self): - while self._elem is None: - yield self._event.wait() + yield self._event.wait() + self._event.clear() + assert self._elem is not _sentinel x = self._elem - self._elem = None + self._elem = _sentinel return x @@ -49,7 +54,7 @@ def h() -> tinyio.Coro[int]: def maybe_isolate(c: Callable[[], tinyio.Coro[int]], isolate: bool) -> tinyio.Coro[int]: if isolate: - x, _ = yield tinyio.isolate(c) + x, _ = yield tinyio.isolate(c()) return x else: return (yield c()) @@ -98,7 +103,7 @@ def isolated() -> tinyio.Coro[list[int]]: return (yield [h(), i()]) def try_isolated() -> tinyio.Coro[list[int]]: - x, success = yield tinyio.isolate(isolated) + x, success = yield tinyio.isolate(isolated()) if not success: x = [-1, -1] @@ -131,7 +136,8 @@ def unreliable_add_two(get_x: tinyio.Coro[int]) -> tinyio.Coro[int]: return z def try_add_three(x: int) -> tinyio.Coro[tuple[int, bool]]: - return (yield tinyio.isolate(unreliable_add_two, slow_add_one(x))) + arg = yield tinyio.copy(slow_add_one(x)) + return (yield tinyio.isolate(unreliable_add_two(arg))) assert tinyio.Loop().run(try_add_three(0)) == (3, True) assert tinyio.Loop().run(try_add_three(1)) == (4, True) @@ -142,3 +148,32 @@ def try_add_three(x: int) -> tinyio.Coro[tuple[int, bool]]: assert not success assert type(result) is RuntimeError assert str(result) == "That is too hard." + + +def test_copy_identity(): + def foo(): + yield + return object() + + def bar(): + f = foo() + g = yield tinyio.copy(f) + assert (yield f) is (yield g) + return 3 + + assert tinyio.Loop().run(bar()) == 3 + + +def test_isolate_respects_cancellation(): + def foo(): + while True: + yield + + def bar(): + yield {tinyio.isolate(foo())} + raise RuntimeError("Kaboom") + + with warnings.catch_warnings(): + warnings.simplefilter("error") + with pytest.raises(RuntimeError, match="Kaboom"): + tinyio.Loop().run(bar()) diff --git a/tinyio/__init__.py b/tinyio/__init__.py index cc2d7c4..e75b9f6 100644 --- a/tinyio/__init__.py +++ b/tinyio/__init__.py @@ -11,7 +11,7 @@ to_asyncio as to_asyncio, to_trio as to_trio, ) -from ._isolate import isolate as isolate +from ._isolate import copy as copy, isolate as isolate from ._sync import Barrier as Barrier, Lock as Lock, Semaphore as Semaphore from ._thread import ThreadPool as ThreadPool, run_in_thread as run_in_thread from ._time import TimeoutError as TimeoutError, sleep as sleep, timeout as timeout diff --git a/tinyio/_isolate.py b/tinyio/_isolate.py index 002d91f..ca2d974 100644 --- a/tinyio/_isolate.py +++ b/tinyio/_isolate.py @@ -1,31 +1,57 @@ -from collections.abc import Callable -from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar +from typing import Literal, TypeVar -import tinyio +from ._core import Coro, Event, Loop +from ._thread import run_in_thread -_P = ParamSpec("_P") _T = TypeVar("_T") _R = TypeVar("_R") -def _dupe(coro: tinyio.Coro[_T]) -> tuple[tinyio.Coro[None], tinyio.Coro[_T]]: - """Takes a coro assumed to be scheduled on an event loop, and returns: +def copy(coro: Coro[_T]) -> Coro[Coro[_T]]: + """Schedules a coroutine, and returns a new coroutine that returns the same value. - - a new coroutine that should be scheduled in the background of the same loop; - - a new coroutine that can be scheduled anywhere at all (typically a new loop), and - will return the same value as the original coroutine. + Usage: + ```python + def your_coroutine(): + x1 = ... # some tinyio coroutine + x2 = yield tinyio.copy(x1) + # `x2` is a brand-new coroutine that returns the same value as `x`: + return1 = yield x1 + return2 = yield x2 + assert return1 is return2 + ``` + This works by scheduling the provided `coro` on the event loop, and then storing the output of the original + coroutine once it completes. - Thus, this is a pipe through which two event loops can talk to one another. + This function is useful primarily in conjunction with `tinyio.isolate`, to make information from the main loop + available within the isolated loop. + + (More generally this function makes it possible to combine multiple `tinyio.Loop`s. Each individual coroutine can + only be scheduled on a single loop, but this function makes it possible to create a fresh coroutine that produces + the value returned by the first.) + + **Arguments:** + + - `coro`: the coroutine to copy. + + **Returns:** + + A coroutine that returns a copy of `coro`: """ pipe = [] - done = tinyio.Event() - failed = tinyio.Event() + schedule = Event() + done = Event() + failed = Event() def put_on_old_loop(): + # Don't actually `yield coro` until `put_on_new_loop` has started. + # I don't know if that matters but it seems worth doing? + yield schedule.wait() try: out = yield coro - except BaseException: + except BaseException as e: + pipe.append(e) failed.set() done.set() raise @@ -33,83 +59,105 @@ def put_on_old_loop(): pipe.append(out) done.set() - def put_on_new_loop(): + def put_on_new_loop() -> Coro[_T]: + schedule.set() yield done.wait() if failed.is_set(): - raise RuntimeError("Could not get input as underlying coroutine failed.") + raise pipe[0] else: return pipe[0] - return put_on_old_loop(), put_on_new_loop() - - -def _nest(coro: tinyio.Coro[_R], exception_group: None | bool = None) -> tinyio.Coro[_R]: - """Runs one tinyio event loop within another. - - The outer loop will be in control of the stepping. The inner loop will have a - separate collection of coroutines, which will be grouped and mutually shut down if - one of them produces an error. Thus, this provides a way to isolate a group of - coroutines within a broader collection. - """ - with tinyio.Loop().runtime(coro, exception_group) as gen: - while True: - try: - wait = next(gen) - except StopIteration as e: - return e.value - if wait is None: - yield - else: - yield tinyio.run_in_thread(wait) + yield {put_on_old_loop()} + return put_on_new_loop() def isolate( - fn: Callable[..., tinyio.Coro[_R]], - /, - *args: tinyio.Coro, - exception_group: None | bool = None, -) -> tinyio.Coro[tuple[_R | BaseException, bool]]: - """Runs a coroutine in an isolated event loop, and if it fails, returns the exception that occurred. + coro: Coro[_R], /, exception_group: None | bool = None +) -> Coro[tuple[_R, Literal[True]] | tuple[BaseException, Literal[False]]]: + """Runs a coroutine in an isolated event loop, and if it (or any coroutines it yields) fails, then return the + exception that occurred. (Cancelling all coroutines it created... but not cancelling the rest of the coroutines on + the event loop.) + + Note that the coroutines it yields must all be *new* coroutines, and they cannot already have been seen by the event + loop. (Otherwise it would be ambiguous whether they are inside the isolated region or not.) If `coro` depends on + other coroutines, then `tinyio.copy` can be used to asychronously copy their results over. **Arguments:** - - `fn`: a function that returns a tinyio coroutine. Will be called as `fn(*args)` in order to get the coroutine to - run. All coroutines that it depends on must be passed as `*args` (so that communication can be established - between the two loops). - - `*args`: all coroutines that `fn` depends upon. + - `coro`: a tinyio coroutine. + - `exception_group`: as `tinyio.Loop().run(..., exception_group=...)`. **Returns:** A 2-tuple: - - the first element is either the result of `fn(*args)` or an exception. - - whether `fn(*args)` succeeded or raised an exception. - """ - if len(args) > 0: - olds, news = zip(*map(_dupe, args), strict=True) - else: - olds, news = [], [] - yield set(olds) - try: - # This `yield from` is load bearing! We must not allow the tinyio event loop to - # interpose itself between the exception arising out of `fn(*news)`, and the - # current stack frame. Otherwise we would get a `CancelledError` here instead. - return (yield from _nest(fn(*news), exception_group=exception_group)), True - except BaseException as e: - return e, False + - the first element is either the result of `fn(*args)`, or its exception. + - the second element is whether `fn(*args)` succeeded (`True`) or raised an exception (`False`). + + !!! Example + + Run a coroutine, and always perform cleanup even if an error was raised: + ```python + def get_request(): + conn = make_connection(): + out, success = yield tinyio.isolate(conn.say_hello()) + yield conn.say_goodbye() + if success: + return out + else: + raise out + ``` + + !!! Example + If another coroutine provides an output that must be consumed by the isolated coroutine, then it cannot be + yielded by the isolated coroutine. -# Stand back, some typing hackery required. -if TYPE_CHECKING: + ```python + # The following code is wrong! - def _fn_signature(*args: tinyio.Coro[_T], exception_group: None | bool = None): ... + def main(): + get_x = return_x() + # Schedule `get_x` outside of the isolated region... + yield get_x + yield tinyio.isolate(use_x(get_x)) - def _make_isolate( - fn: Callable[_P, Any], - ) -> Callable[ - Concatenate[Callable[_P, tinyio.Coro[_R]], _P], - tinyio.Coro[tuple[_R | BaseException, bool]], - ]: ... + def return_x(): + yield + return 3 - isolate = _make_isolate(_fn_signature) - del _fn_signature, _make_isolate + def use_x(get_x): + # ...and also schedule `get_x` inside of the isolated region! This is not possible. + x = yield get_x + ``` + + Instead, you need to make a fresh coroutine to use within the isolated region: + + ```python + def main(): + get_x = return_x() + yield get_x + get_x_copy = yield tinyio.copy(get_x) # this line is new + yield tinyio.isolate(use_x(get_x_copy)) + ``` + """ + + try: + with Loop().runtime(coro, exception_group) as gen: + while True: + try: + wait = next(gen) + except StopIteration as e: + return e.value, True + if wait is None: + yield + else: + yield run_in_thread(wait) + # Catch all `Exception`s except `AssertionError`, which IMO should really be a `BaseException` – it usually + # indicates a fatal exception because an expected invariant is not true. + except AssertionError: + raise + # Do *not* catch `BaseException` here: in particular `tinyio.CancellationError` must be allowed to propagate; also + # things like `SystemExit`/`KeyboardInterrupt` are probably best treated as fatal. + except Exception as e: + return e, False From 3bcb397dcb156a9fd80e51bae1605064989d65fe Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Wed, 4 Mar 2026 22:28:37 +0100 Subject: [PATCH 3/4] doc update, in particular including isolation --- README.md | 153 +++++++++++++++++++++++++----------------- tests/test_core.py | 2 +- tests/test_isolate.py | 12 +++- tinyio/_isolate.py | 15 ++--- 4 files changed, 108 insertions(+), 74 deletions(-) diff --git a/README.md b/README.md index 73b67b3..249f254 100644 --- a/README.md +++ b/README.md @@ -91,9 +91,9 @@ If any coroutine raises an error, then: This gives every coroutine a chance to shut down gracefully. Debuggers like [`patdb`](https://github.com/patrick-kidger/patdb) offer the ability to navigate across exceptions in an exception group, allowing you to inspect the state of all coroutines that were related to the error. -### Batteries-included +### Synchronisation -We ship batteries-included with the usual collection of standard operations. +We ship batteries-included with the usual collection of standard operations for synchronisation.
Click to expand @@ -181,6 +181,73 @@ tinyio.Lock tinyio.TimeoutError
+### Asynchronous context managers + +
Click to expand + +You can use the following pattern to implement context managers with asynchronous creation: + +```python +import contextlib +import tinyio + +def my_context_manager(x): + print("Initialising...") + yield tinyio.sleep(1) + print("Initialised") + return make_context_manager(x) + +@contextlib.contextmanager +def make_context_manager(x): + try: + yield x + finally: + print("Cleaning up") + +def my_coro(): + with (yield my_context_manager(x=5)) as val: + print(f"Got val {val}") + +tinyio.Loop().run(my_coro()) +``` + +This isn't anything fancier than just using a coroutine that returns a regular `with`-compatible context manager. See `tinyio.Semaphore` for an example of this pattern. + +
+ +### Asynchronous iterators + +
Click to expand + +You can use the following pattern to implement asychronous iterators: + +```python +import tinyio + +def slow_range(x): # this function is an iterator-of-coroutines +for i in range(x): + yield slow_range_i(i) # this `yield` statement is seen by the `for` loop + +def slow_range_i(i): # this function is a coroutine +yield tinyio.sleep(1) # this `yield` statement is seen by the `tinyio.Loop()` +return i + +def my_coro(): +for x in slow_range(5): + x = yield x + print(f"Got {x}") + +tinyio.Loop().run(my_coro()) +``` + +Here we just have `yield` being used in a couple of different ways that you're already used to: +- as a regular Python generator/iterator; +- as a `tinyio` coroutine. + +For an example of this, see `tinyio.as_completed`. + +
+ ### Integration with `asyncio` and `trio` We have support for putting `trio` event loops within `asyncio`/`trio` event loops, or vice-versa. @@ -275,71 +342,28 @@ tinyio.from_asyncio tinyio.from_trio -### Asynchronous context managers - -
Click to expand - -You can use the following pattern to implement context managers with asynchronous creation: - -```python -import contextlib -import tinyio - -def my_context_manager(x): - print("Initialising...") - yield tinyio.sleep(1) - print("Initialised") - return make_context_manager(x) - -@contextlib.contextmanager -def make_context_manager(x): - try: - yield x - finally: - print("Cleaning up") - -def my_coro(): - with (yield my_context_manager(x=5)) as val: - print(f"Got val {val}") - -tinyio.Loop().run(my_coro()) -``` - -This isn't anything fancier than just using a coroutine that returns a regular `with`-compatible context manager. See `tinyio.Semaphore` for an example of this pattern. - -
- -### Asynchronous iterators +### Advanced topic: isolating coroutines without crashing the others
Click to expand -You can use the following pattern to implement asychronous iterators: +If you would like to run a coroutine (and transitively, all the coroutines it `yield`s) without crashing the entire event loop, then you can use `tinyio.isolate`. This will run the coroutine in a nested `tinyio.Loop` – so that it crashing will only affect everything in that nested loop – and then return the result or the error. ```python -import tinyio - -def slow_range(x): # this function is an iterator-of-coroutines - for i in range(x): - yield slow_range_i(i) # this `yield` statement is seen by the `for` loop - -def slow_range_i(i): # this function is a coroutine - yield tinyio.sleep(1) # this `yield` statement is seen by the `tinyio.Loop()` - return i - -def my_coro(): - for x in slow_range(5): - x = yield x - print(f"Got {x}") - -tinyio.Loop().run(my_coro()) +def some_coroutine(x): + if x == 3: + raise ValueError("Oh no, a bug!") + yield + +def someone_elses_buggy_code(): + yield [some_coroutine(x=3), another_coroutine()] + +def main(): + app = someone_elses_buggy_code() + result_or_error, success = yield tinyio.isolate(app) + # at this point then `someone_elses_buggy_code`, `some_coroutine` and + #`another_coroutine` have been cancelled, but `main` is still running. ``` -Here we just have `yield` being used in a couple of different ways that you're already used to: -- as a regular Python generator/iterator; -- as a `tinyio` coroutine. - -For an example of this, see `tinyio.as_completed`. -
## FAQ @@ -359,7 +383,7 @@ You can distinguish it from a normal Python function by putting `if False: yield
-vs asyncio or trio?. +vs asyncio or trio?
I wasted a *lot* of time trying to get correct error propagation with `asyncio`, trying to reason whether my tasks would be cleaned up correctly or not (edge-triggered vs level-triggered etc etc). `trio` is excellent but still has a one-loop-per-thread rule, and doesn't propagate cancellations to/from threads. These points inspired me to try writing my own. @@ -371,8 +395,11 @@ I wasted a *lot* of time trying to get correct error propagation with `asyncio`, - simple+robust error semantics (crash the whole loop if anything goes wrong); - tiny, hackable, codebase. -However conversely, `tinyio` does not offer the ability to schedule work on the event loop whilst cleaning up from errors. +Conversely, at least right now we don't ship batteries-included with a few things: + +- asynchronously launching subprocesses / making network requests / accessing the file system (in all cases use `run_in_thread` instead); +- scheduling work on the event loop whilst cleaning up from errors. -If none of the bullet points are must-haves for you, or if needing the event loop during cleanup is a dealbreaker, then either `trio` or `asyncio` are likely to be better choices. :) +If none of the bullet points are must-haves for you, or if any of its limitations are dealbreakers, then either `trio` or `asyncio` are likely to be better choices. :)
diff --git a/tests/test_core.py b/tests/test_core.py index 834f8ca..e4c16c1 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -576,7 +576,7 @@ def coro(): yield def foo(): - with tinyio.Loop().runtime(coro(), exception_group=None) as gen: + with tinyio.Loop().runtime(coro(), exception_group=None): return 3 with pytest.raises(RuntimeError, match="only partially completed"): diff --git a/tests/test_isolate.py b/tests/test_isolate.py index ca648a9..a3ecba6 100644 --- a/tests/test_isolate.py +++ b/tests/test_isolate.py @@ -165,9 +165,16 @@ def bar(): def test_isolate_respects_cancellation(): + foo_cancelled = False + def foo(): - while True: - yield + nonlocal foo_cancelled + try: + while True: + yield + except tinyio.CancelledError: + foo_cancelled = True + raise def bar(): yield {tinyio.isolate(foo())} @@ -177,3 +184,4 @@ def bar(): warnings.simplefilter("error") with pytest.raises(RuntimeError, match="Kaboom"): tinyio.Loop().run(bar()) + assert foo_cancelled diff --git a/tinyio/_isolate.py b/tinyio/_isolate.py index ca2d974..46b40f7 100644 --- a/tinyio/_isolate.py +++ b/tinyio/_isolate.py @@ -42,7 +42,6 @@ def your_coroutine(): pipe = [] schedule = Event() done = Event() - failed = Event() def put_on_old_loop(): # Don't actually `yield coro` until `put_on_new_loop` has started. @@ -51,21 +50,21 @@ def put_on_old_loop(): try: out = yield coro except BaseException as e: - pipe.append(e) - failed.set() - done.set() + pipe.append((e, False)) raise else: - pipe.append(out) + pipe.append((out, True)) + finally: done.set() def put_on_new_loop() -> Coro[_T]: schedule.set() yield done.wait() - if failed.is_set(): - raise pipe[0] + [[out, success]] = pipe + if success: + return out else: - return pipe[0] + raise out yield {put_on_old_loop()} return put_on_new_loop() From 265fb8a5dbdf73d408839371ffeba0da6c71fa86 Mon Sep 17 00:00:00 2001 From: Patrick Kidger <33688385+patrick-kidger@users.noreply.github.com> Date: Wed, 25 Feb 2026 13:04:27 +0100 Subject: [PATCH 4/4] 0.4.0 version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 68c1a71..f960bb3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ name = "tinyio" readme = "README.md" requires-python = ">=3.11" urls = {repository = "https://github.com/patrick-kidger/tinyio"} -version = "0.3.0" +version = "0.4.0" [project.optional-dependencies] dev = ["pre-commit"]