diff --git a/README.md b/README.md index 73b67b3..249f254 100644 --- a/README.md +++ b/README.md @@ -91,9 +91,9 @@ If any coroutine raises an error, then: This gives every coroutine a chance to shut down gracefully. Debuggers like [`patdb`](https://github.com/patrick-kidger/patdb) offer the ability to navigate across exceptions in an exception group, allowing you to inspect the state of all coroutines that were related to the error. -### Batteries-included +### Synchronisation -We ship batteries-included with the usual collection of standard operations. +We ship batteries-included with the usual collection of standard operations for synchronisation.
Click to expand @@ -181,6 +181,73 @@ tinyio.Lock tinyio.TimeoutError
+### Asynchronous context managers + +
Click to expand + +You can use the following pattern to implement context managers with asynchronous creation: + +```python +import contextlib +import tinyio + +def my_context_manager(x): + print("Initialising...") + yield tinyio.sleep(1) + print("Initialised") + return make_context_manager(x) + +@contextlib.contextmanager +def make_context_manager(x): + try: + yield x + finally: + print("Cleaning up") + +def my_coro(): + with (yield my_context_manager(x=5)) as val: + print(f"Got val {val}") + +tinyio.Loop().run(my_coro()) +``` + +This isn't anything fancier than just using a coroutine that returns a regular `with`-compatible context manager. See `tinyio.Semaphore` for an example of this pattern. + +
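The same mechanics can be checked without an event loop at all, by driving the coroutine by hand. A stdlib-only sketch — the `drive` helper and `log` list below are illustrative stand-ins, not tinyio APIs, and the bare `yield` stands in for `yield tinyio.sleep(1)`:

```python
import contextlib

log = []

def my_context_manager(x):
    # The bare `yield` stands in for an asynchronous setup step
    # such as `yield tinyio.sleep(1)`.
    yield
    return make_cm(x)

@contextlib.contextmanager
def make_cm(x):
    log.append("enter")
    try:
        yield x
    finally:
        log.append("exit")

def drive(coro):
    # Minimal stand-in for an event loop: run the generator to completion
    # and recover its `return` value from the StopIteration it raises.
    try:
        while True:
            next(coro)
    except StopIteration as e:
        return e.value

with drive(my_context_manager(5)) as val:
    assert val == 5
assert log == ["enter", "exit"]
```

The key point, as above: the coroutine's `return` value is itself an ordinary `with`-compatible context manager.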
+ +### Asynchronous iterators + +
Click to expand + +You can use the following pattern to implement asynchronous iterators: + +```python +import tinyio + +def slow_range(x): # this function is an iterator-of-coroutines + for i in range(x): + yield slow_range_i(i) # this `yield` statement is seen by the `for` loop + +def slow_range_i(i): # this function is a coroutine + yield tinyio.sleep(1) # this `yield` statement is seen by the `tinyio.Loop()` + return i + +def my_coro(): + for x in slow_range(5): + x = yield x + print(f"Got {x}") + +tinyio.Loop().run(my_coro()) +``` + +Here we just have `yield` being used in a couple of different ways that you're already used to: +- as a regular Python generator/iterator; +- as a `tinyio` coroutine. + +For an example of this, see `tinyio.as_completed`. + +
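Again, the mechanics can be sanity-checked without an event loop. A stdlib-only sketch, in which a hand-written `drive` helper (not a tinyio API) plays the role of `tinyio.Loop` and a bare `yield` stands in for `tinyio.sleep(1)`:

```python
def slow_range(x):  # an iterator-of-coroutines
    for i in range(x):
        yield slow_range_i(i)  # this `yield` is consumed by the `for` loop below

def slow_range_i(i):  # a "coroutine": pauses once, then returns i
    yield  # stands in for `yield tinyio.sleep(1)`
    return i

def drive(coro):
    # A stand-in for the event loop: exhaust the generator and recover its
    # `return` value from the StopIteration it raises.
    try:
        while True:
            next(coro)
    except StopIteration as e:
        return e.value

assert [drive(c) for c in slow_range(5)] == [0, 1, 2, 3, 4]
```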
+ ### Integration with `asyncio` and `trio` We have support for putting `trio` event loops within `asyncio`/`trio` event loops, or vice-versa. @@ -275,71 +342,28 @@ tinyio.from_asyncio tinyio.from_trio -### Asynchronous context managers - -
Click to expand - -You can use the following pattern to implement context managers with asynchronous creation: - -```python -import contextlib -import tinyio - -def my_context_manager(x): - print("Initialising...") - yield tinyio.sleep(1) - print("Initialised") - return make_context_manager(x) - -@contextlib.contextmanager -def make_context_manager(x): - try: - yield x - finally: - print("Cleaning up") - -def my_coro(): - with (yield my_context_manager(x=5)) as val: - print(f"Got val {val}") - -tinyio.Loop().run(my_coro()) -``` - -This isn't anything fancier than just using a coroutine that returns a regular `with`-compatible context manager. See `tinyio.Semaphore` for an example of this pattern. - -
- -### Asynchronous iterators +### Advanced topic: isolating coroutines without crashing the others
Click to expand -You can use the following pattern to implement asychronous iterators: +If you would like to run a coroutine (and transitively, all the coroutines it `yield`s) without crashing the entire event loop, then you can use `tinyio.isolate`. This will run the coroutine in a nested `tinyio.Loop` – so that a crash will only affect the coroutines in that nested loop – and then return the result or the error. ```python -import tinyio - -def slow_range(x): # this function is an iterator-of-coroutines - for i in range(x): - yield slow_range_i(i) # this `yield` statement is seen by the `for` loop - -def slow_range_i(i): # this function is a coroutine - yield tinyio.sleep(1) # this `yield` statement is seen by the `tinyio.Loop()` - return i - -def my_coro(): - for x in slow_range(5): - x = yield x - print(f"Got {x}") - -tinyio.Loop().run(my_coro()) +def some_coroutine(x): + if x == 3: + raise ValueError("Oh no, a bug!") + yield + +def someone_elses_buggy_code(): + yield [some_coroutine(x=3), another_coroutine()] + +def main(): + app = someone_elses_buggy_code() + result_or_error, success = yield tinyio.isolate(app) + # At this point `someone_elses_buggy_code`, `some_coroutine` and + # `another_coroutine` have been cancelled, but `main` is still running. ``` -Here we just have `yield` being used in a couple of different ways that you're already used to: -- as a regular Python generator/iterator; -- as a `tinyio` coroutine. - -For an example of this, see `tinyio.as_completed`. -
## FAQ @@ -359,7 +383,7 @@ You can distinguish it from a normal Python function by putting `if False: yield
-vs asyncio or trio?. +vs asyncio or trio?
I wasted a *lot* of time trying to get correct error propagation with `asyncio`, trying to reason whether my tasks would be cleaned up correctly or not (edge-triggered vs level-triggered etc etc). `trio` is excellent but still has a one-loop-per-thread rule, and doesn't propagate cancellations to/from threads. These points inspired me to try writing my own. @@ -371,8 +395,11 @@ I wasted a *lot* of time trying to get correct error propagation with `asyncio`, - simple+robust error semantics (crash the whole loop if anything goes wrong); - tiny, hackable, codebase. -However conversely, `tinyio` does not offer the ability to schedule work on the event loop whilst cleaning up from errors. +Conversely, at least right now we don't ship batteries-included with a few things: + +- asynchronously launching subprocesses / making network requests / accessing the file system (in all cases use `run_in_thread` instead); +- scheduling work on the event loop whilst cleaning up from errors. -If none of the bullet points are must-haves for you, or if needing the event loop during cleanup is a dealbreaker, then either `trio` or `asyncio` are likely to be better choices. :) +If none of the bullet points are must-haves for you, or if any of its limitations are dealbreakers, then either `trio` or `asyncio` are likely to be better choices. :)
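For the first limitation, the suggested workaround is to hand the blocking call to a worker thread. Conceptually that amounts to something like the following stdlib-only sketch — an illustration of the idea, not tinyio's `run_in_thread` implementation:

```python
import concurrent.futures
import os
import tempfile

def blocking_read(path):
    # A blocking filesystem call: the kind of work to hand off to a thread
    # rather than performing directly on the event loop.
    with open(path) as f:
        return f.read()

# Set up a small file to read.
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "w") as f:
    f.write("hello")

# Hand the blocking call to a worker thread and wait for its result;
# an event loop would instead keep running other coroutines while waiting.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    result = pool.submit(blocking_read, path).result()

os.remove(path)
assert result == "hello"
```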
diff --git a/devdocs/isolate.md b/devdocs/isolate.md new file mode 100644 index 0000000..902b3aa --- /dev/null +++ b/devdocs/isolate.md @@ -0,0 +1,115 @@ +# `tinyio.isolate` + +I went back-and-forth on various designs for `tinyio.isolate`. + +## Scheduling on the main loop vs scheduling on the isolated loop. + +The key issue is around the coroutines spawned by the isolated coroutine (and any coroutines that they transitively spawn, and so on...). Are these part of the isolated loop or not? That is, if they crash, should they only crash the isolated loop, or the whole loop? + +Note that we definitely need some means of placing things on the main loop (not just the isolated loop), as e.g. we may want an isolated coroutine to consume some input made available by the main loop... a dependency typically expressed as `yield some_coroutine`. + +In terms of what this means under-the-hood: +- if they are on the isolated loop then they are just normal coroutines scheduled there; +- if they are on the main loop then we can still access their results from within the isolated loop by proxying their results over in a new coroutine; e.g. the following allows you to create a new coroutine that yields the results of the first: + + ```python + # Usage: coro2 = yield copy(coro) whilst within the main loop. `coro2` is fresh and can be placed on a new loop. + + def copy(coro: tinyio.Coro[_T]) -> tinyio.Coro[tinyio.Coro[_T]]: + pipe = [] + done = tinyio.Event() + failed = tinyio.Event() + + def put_on_old_loop(): + try: + out = yield coro + except BaseException as e: + pipe.append(e) + failed.set() + done.set() + raise + else: + pipe.append(out) + done.set() + + def put_on_new_loop() -> tinyio.Coro[_T]: + yield done.wait() + if failed.is_set(): + raise pipe[0] + else: + return pipe[0] + + yield {put_on_old_loop()} + return put_on_new_loop() + ``` + +After some thought, I realised that this question of placement cannot be determined automatically. 
+ +Here's a thought experiment: +```python +def foo(): + yield [bar, baz] + +def main(): + yield [isolate(foo()), some_other_coro] +``` +Now let's suppose that `bar` crashes. Should `baz` be crashed as well? If it is scheduled exclusively on the isolated loop, then yes. If it is scheduled on the main loop – or will be scheduled there in the future! – then no. And we have no way of telling which scenario we are in. + +## Points to recall + +1. A coroutine can be yielded in multiple places. Recall that `tinyio` allows you to yield previously-seen coroutines to get their result. In particular it is completely possible that the same coroutine will be yielded both inside and outside of the isolated region. + +2. Because we are in an asynchronous context, we cannot rely on the order in which coroutines are yielded. For example, supposing the isolated coroutine runs `yield coro`, we might naively have a check of the form + + ```python + if inspect.getgeneratorstate(coro) == inspect.GEN_CREATED: + ... # brand-new coroutine, include it in the isolate + else: + ... # existing coroutine, keep it outside + ``` + + but in practice this just introduces a race condition: what if the coroutine is *going* to be yielded in the main loop later, but hasn't yet? We'd get different behaviour depending on the order in which things are scheduled. + +In light of this, we might still imagine a design where fresh coroutines are stored on the isolated loop, until they appear on the main loop, at which point they transfer over (by some mechanism). This would also introduce an issue, however: what if some third coroutine within the isolated loop crashes? The order of that, relative to the transfer to the main loop, will determine whether the shared coroutine is also crashed. (Either not crashed on the main loop, or crashed within the isolated loop... which would then presumably leak out into crashing the main loop too, once it gets yielded there.)
And in an async context, we can't make guarantees about the order in which things happen, so both are always possible. + +## Rejected design 1 + +API of `tinyio.isolate(coro)`, intercept all coroutines yielded by `coro`, use `inspect.getgeneratorstate` to automatically detect whether each such coroutine has started, and pick the loop appropriately. This does not work for the reasons described above. + +## Rejected design 2 + +API of `tinyio.isolate(coro)`, intercept all coroutines yielded by `coro`, and wrap all of them in a `tinyio.isolate` as well. (Plus unwrapping their results when sending.) This is essentially the maximalist answer to this choice: 'never crash the other coroutines on the isolated loop; let them run to completion'. + +We reject this because it does not fit `tinyio`'s crash-all-coroutines model – instead we're basically just back in the bad old days of `asyncio`'s edge-triggered exceptions. This is not easy to reason about. + +## Rejected design 3 + +API of `tinyio.isolate(make_coro: Callable[..., tinyio.Coro], *args: tinyio.Coro)`, have `*args` be placed on the main loop, and assume/require that all coroutines directly yielded by `make_coro(*args)` are exclusively on the isolated loop. + +This API is based around the observation that the major use-case for the isolated-on-main dependence is consuming arguments from coroutines on the main loop. + +This is 'fine', but suffers from (a) some API inconsistency with the rest of `tinyio` (which operates on coroutines directly, not functions-returning-coroutines), and (b) not being very explicit about the purpose of separating out those `*args`. A user may be surprised that `isolate(lambda: coro(x, y))` does not work equivalently to `isolate(coro, x, y)`. 
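For reference, the generator-state check from rejected design 1 looks like this with the stdlib alone. Note that it can only report whether a generator has started *so far* — which is exactly the race described under 'Points to recall':

```python
import inspect

def coro():
    yield
    return 1

c = coro()
# Before the first next(): the naive check would classify this as "brand new"...
assert inspect.getgeneratorstate(c) == inspect.GEN_CREATED
next(c)
# ...but as soon as anything resumes it, the classification flips.
assert inspect.getgeneratorstate(c) == inspect.GEN_SUSPENDED
```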
+ +## Rejected design 4 + +API of `tinyio.isolate(coro: tinyio.Coro, put_on_isolated_loop: Callable[[tinyio.Coro], bool])`, and call `put_on_isolated_loop(x)` on each `yield x` that occurs within the isolated region (directly from `coro` or transitively), and let a user explicitly decide. + +This is also 'fine' but realistically almost all users are not going to be educated on the contents of this document, and may attempt to write out a rule (such as 'has the coroutine started') that is going to be flaky down the line. + +## Accepted design + +API of `tinyio.isolate(coro)`, and assume/require that all yielded coroutines from `coro` are exclusively on the isolated loop. + +In order to consume results from coroutines on the main loop, we also publicly expose `tinyio.copy` for creating a fresh coroutine to place on the isolated loop: +```python +x = yield tinyio.copy(x) +out = yield tinyio.isolate(my_coro(x)) +``` + +This suffers from the major issue that it is easy to misuse: forgetting the `copy` will probably raise an error when `x` tries to be placed on both loops; placing the `copy` inside of `my_coro` will simply do nothing. + +Relative to design 3, this does have the advantage of making it clear that `x` is being copied. + +Still, this has probably the simplest semantics to explain to a user: it runs the provided coroutine in an isolated loop. If you want to bridge that isolation, use `tinyio.copy`. + +I still don't love this, and would be open to other designs. 
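The pipe-and-event proxying that `copy` performs can be sketched with plain generators and a toy scheduler. All names below (`round_robin`, `producer`, `consumer`) are illustrative only, not tinyio APIs:

```python
from collections import deque

def round_robin(*coros):
    """A toy scheduler: repeatedly resume each generator until all finish,
    collecting each generator's `return` value."""
    results = {}
    queue = deque(coros)
    while queue:
        coro = queue.popleft()
        try:
            next(coro)
            queue.append(coro)  # not finished yet; reschedule
        except StopIteration as e:
            results[coro] = e.value
    return results

# `pipe` carries the value across, `done` signals availability — the same
# shape as the Event/pipe pair inside `copy`.
pipe = []
done = []

def producer():
    yield  # pretend to do some asynchronous work first
    pipe.append(42)
    done.append(True)

def consumer():
    while not done:  # stand-in for `yield done.wait()`
        yield
    return pipe[0]

p, c = producer(), consumer()
results = round_robin(p, c)
assert results[c] == 42
```

The real implementation additionally propagates exceptions through the pipe, which is why it stores a `(value, success)` pair rather than a bare value.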
diff --git a/pyproject.toml b/pyproject.toml index 68c1a71..f960bb3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ name = "tinyio" readme = "README.md" requires-python = ">=3.11" urls = {repository = "https://github.com/patrick-kidger/tinyio"} -version = "0.3.0" +version = "0.4.0" [project.optional-dependencies] dev = ["pre-commit"] diff --git a/tests/test_core.py b/tests/test_core.py index 1033a91..e4c16c1 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -569,3 +569,15 @@ def g(): with pytest.raises(RuntimeError, match="whilst the loop is currently running"): loop.run(f()) + + +def test_runtime_leave_without_finishing(): + def coro(): + yield + + def foo(): + with tinyio.Loop().runtime(coro(), exception_group=None): + return 3 + + with pytest.raises(RuntimeError, match="only partially completed"): + foo() diff --git a/tests/test_isolate.py b/tests/test_isolate.py new file mode 100644 index 0000000..a3ecba6 --- /dev/null +++ b/tests/test_isolate.py @@ -0,0 +1,187 @@ +import warnings +from collections.abc import Callable + +import pytest +import tinyio + + +_sentinel = object() + + +class SingleElementQueue: + def __init__(self): + self._event = tinyio.Event() + self._elem = _sentinel + + def put(self, x): + if self._elem is not _sentinel: + raise ValueError("Queue is full") + + self._elem = x + self._event.set() + + def get(self): + yield self._event.wait() + self._event.clear() + assert self._elem is not _sentinel + x = self._elem + self._elem = _sentinel + return x + + +@pytest.mark.parametrize("isolate_g", (False, True)) +@pytest.mark.parametrize("isolate_h", (False, True)) +def test_isolate(isolate_g: bool, isolate_h: bool): + """Test that all coroutines make progress when some are isolated""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + + # Intertwine two coroutines in such a way that they can only + # finish if both of them make progress at the same time, but + # not if one blocks until the other has completed. 
+ def g() -> tinyio.Coro[int]: + q1.put(1) + x = yield q2.get() + q1.put(x + 1) + return (yield q2.get()) + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + q2.put(x + 1) + x = yield q1.get() + q2.put(x + 1) + return x + + def maybe_isolate(c: Callable[[], tinyio.Coro[int]], isolate: bool) -> tinyio.Coro[int]: + if isolate: + x, _ = yield tinyio.isolate(c()) + return x + else: + return (yield c()) + + def f() -> tinyio.Coro[list[int]]: + return (yield [maybe_isolate(g, isolate_g), maybe_isolate(h, isolate_h)]) + + out = tinyio.Loop().run(f()) + assert out == [4, 3] + + +def test_isolate_with_error_in_inner_loop(): + """Test exceptions happening in the isolated loop. + + If an isolated coroutine raises an exception, all other coroutines within + the isolation are cancelled, but outer coroutines keep running.""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + q3 = SingleElementQueue() + + g_was_cancelled = True + i_was_cancelled = True + + def g() -> tinyio.Coro[int]: + nonlocal g_was_cancelled + q2.put(5) + yield q3.get() + g_was_cancelled = False + return 1 + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + y = yield q2.get() + if x == 5 and y == 5: + raise RuntimeError("Kaboom") + return x + y + + def i() -> tinyio.Coro[int]: + nonlocal i_was_cancelled + q1.put(5) + yield tinyio.sleep(1) + i_was_cancelled = False + return 2 + + def isolated() -> tinyio.Coro[list[int]]: + return (yield [h(), i()]) + + def try_isolated() -> tinyio.Coro[list[int]]: + x, success = yield tinyio.isolate(isolated()) + if not success: + x = [-1, -1] + + q3.put(0) # wake up the "outer" loop g() + return x + + def f() -> tinyio.Coro[list[int]]: + return (yield [g(), try_isolated()]) + + assert tinyio.Loop().run(f()) == [1, [-1, -1]] + + assert not g_was_cancelled + assert i_was_cancelled + + +def test_isolate_with_args(): + """Test that isolate can be called with additional coroutines as arguments""" + + def slow_add_one(x: int) -> tinyio.Coro[int]: + yield + return x + 
1 + + def unreliable_add_two(get_x: tinyio.Coro[int]) -> tinyio.Coro[int]: + x = yield get_x + if x == 3: + raise RuntimeError("That is too hard.") + else: + y = yield slow_add_one(x) + z = yield slow_add_one(y) + return z + + def try_add_three(x: int) -> tinyio.Coro[tuple[int, bool]]: + arg = yield tinyio.copy(slow_add_one(x)) + return (yield tinyio.isolate(unreliable_add_two(arg))) + + assert tinyio.Loop().run(try_add_three(0)) == (3, True) + assert tinyio.Loop().run(try_add_three(1)) == (4, True) + assert tinyio.Loop().run(try_add_three(3)) == (6, True) + assert tinyio.Loop().run(try_add_three(4)) == (7, True) + + result, success = tinyio.Loop().run(try_add_three(2)) + assert not success + assert type(result) is RuntimeError + assert str(result) == "That is too hard." + + +def test_copy_identity(): + def foo(): + yield + return object() + + def bar(): + f = foo() + g = yield tinyio.copy(f) + assert (yield f) is (yield g) + return 3 + + assert tinyio.Loop().run(bar()) == 3 + + +def test_isolate_respects_cancellation(): + foo_cancelled = False + + def foo(): + nonlocal foo_cancelled + try: + while True: + yield + except tinyio.CancelledError: + foo_cancelled = True + raise + + def bar(): + yield {tinyio.isolate(foo())} + raise RuntimeError("Kaboom") + + with warnings.catch_warnings(): + warnings.simplefilter("error") + with pytest.raises(RuntimeError, match="Kaboom"): + tinyio.Loop().run(bar()) + assert foo_cancelled diff --git a/tinyio/__init__.py b/tinyio/__init__.py index 0e6cddc..e75b9f6 100644 --- a/tinyio/__init__.py +++ b/tinyio/__init__.py @@ -11,6 +11,7 @@ to_asyncio as to_asyncio, to_trio as to_trio, ) +from ._isolate import copy as copy, isolate as isolate from ._sync import Barrier as Barrier, Lock as Lock, Semaphore as Semaphore from ._thread import ThreadPool as ThreadPool, run_in_thread as run_in_thread from ._time import TimeoutError as TimeoutError, sleep as sleep, timeout as timeout diff --git a/tinyio/_isolate.py b/tinyio/_isolate.py new file 
mode 100644 index 0000000..46b40f7 --- /dev/null +++ b/tinyio/_isolate.py @@ -0,0 +1,162 @@ +from typing import Literal, TypeVar + +from ._core import Coro, Event, Loop +from ._thread import run_in_thread + + +_T = TypeVar("_T") +_R = TypeVar("_R") + + +def copy(coro: Coro[_T]) -> Coro[Coro[_T]]: + """Schedules a coroutine, and returns a new coroutine that returns the same value. + + Usage: + ```python + def your_coroutine(): + x1 = ... # some tinyio coroutine + x2 = yield tinyio.copy(x1) + # `x2` is a brand-new coroutine that returns the same value as `x1`: + return1 = yield x1 + return2 = yield x2 + assert return1 is return2 + ``` + This works by scheduling the provided `coro` on the event loop, and then storing the output of the original + coroutine once it completes. + + This function is useful primarily in conjunction with `tinyio.isolate`, to make information from the main loop + available within the isolated loop. + + (More generally this function makes it possible to combine multiple `tinyio.Loop`s. Each individual coroutine can + only be scheduled on a single loop, but this function makes it possible to create a fresh coroutine that produces + the value returned by the first.) + + **Arguments:** + + - `coro`: the coroutine to copy. + + **Returns:** + + A coroutine that returns a copy of `coro`. + """ + pipe = [] + schedule = Event() + done = Event() + + def put_on_old_loop(): + # Don't actually `yield coro` until `put_on_new_loop` has started. + # I don't know if that matters but it seems worth doing? 
+ yield schedule.wait() + try: + out = yield coro + except BaseException as e: + pipe.append((e, False)) + raise + else: + pipe.append((out, True)) + finally: + done.set() + + def put_on_new_loop() -> Coro[_T]: + schedule.set() + yield done.wait() + [[out, success]] = pipe + if success: + return out + else: + raise out + + yield {put_on_old_loop()} + return put_on_new_loop() + + +def isolate( + coro: Coro[_R], /, exception_group: None | bool = None +) -> Coro[tuple[_R, Literal[True]] | tuple[BaseException, Literal[False]]]: + """Runs a coroutine in an isolated event loop, and if it (or any coroutines it yields) fails, then returns the + exception that occurred. (Cancelling all coroutines it created... but not cancelling the rest of the coroutines on + the event loop.) + + Note that the coroutines it yields must all be *new* coroutines, and they cannot already have been seen by the event + loop. (Otherwise it would be ambiguous whether they are inside the isolated region or not.) If `coro` depends on + other coroutines, then `tinyio.copy` can be used to asynchronously copy their results over. + + **Arguments:** + + - `coro`: a tinyio coroutine. + - `exception_group`: as `tinyio.Loop().run(..., exception_group=...)`. + + **Returns:** + + A 2-tuple: + + - the first element is either the result of `coro`, or its exception. + - the second element is whether `coro` succeeded (`True`) or raised an exception (`False`). + + !!! Example + + Run a coroutine, and always perform cleanup even if an error was raised: + ```python + def get_request(): + conn = make_connection() + out, success = yield tinyio.isolate(conn.say_hello()) + yield conn.say_goodbye() + if success: + return out + else: + raise out + ``` + + !!! Example + + If another coroutine provides an output that must be consumed by the isolated coroutine, then it cannot be + yielded by the isolated coroutine. + + ```python + # The following code is wrong! 
+ + def main(): + get_x = return_x() + # Schedule `get_x` outside of the isolated region... + yield get_x + yield tinyio.isolate(use_x(get_x)) + + def return_x(): + yield + return 3 + + def use_x(get_x): + # ...and also schedule `get_x` inside of the isolated region! This is not possible. + x = yield get_x + ``` + + Instead, you need to make a fresh coroutine to use within the isolated region: + + ```python + def main(): + get_x = return_x() + yield get_x + get_x_copy = yield tinyio.copy(get_x) # this line is new + yield tinyio.isolate(use_x(get_x_copy)) + ``` + """ + + try: + with Loop().runtime(coro, exception_group) as gen: + while True: + try: + wait = next(gen) + except StopIteration as e: + return e.value, True + if wait is None: + yield + else: + yield run_in_thread(wait) + # Catch all `Exception`s except `AssertionError`, which IMO should really be a `BaseException` – it usually + # indicates a fatal exception because an expected invariant is not true. + except AssertionError: + raise + # Do *not* catch `BaseException` here: in particular `tinyio.CancelledError` must be allowed to propagate; also + # things like `SystemExit`/`KeyboardInterrupt` are probably best treated as fatal. + except Exception as e: + return e, False