Skip to content

Commit b6de1e1

Browse files
committed
new features: spawn, spawn_n; map, map_n, itermap; usage code instead of proper doc for now
1 parent 24518ca commit b6de1e1

File tree

14 files changed

+496
-203
lines changed

14 files changed

+496
-203
lines changed

README.md

Lines changed: 105 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,124 @@
11
# asyncio-pool
22

3-
TODO: cancelled, timeouts, callbacks, features, tests, readme
3+
###### Supports python 3.5+ (including PyPy 6+, which is also 3.5)
44

5-
Example (more in `tests/` and `examples/` dirs): # TODO
5+
6+
## Usage
7+
(more in `tests/` and `examples/` dirs): # TODO
68

79
```python
8-
import asyncio as aio
9-
from asyncio_pool import AioPool
10+
async def worker(n): # dummy worker
11+
await aio.sleep(1 / n)
12+
return n
1013

1114

12-
async def worker(n):
13-
await aio.sleep(1 / n)
15+
async def spawn_n_usage(todo=[range(1,51), range(51,101), range(101,200)]):
16+
futures = []
17+
async with AioPool(size=20) as pool:
18+
for tasks in todo:
19+
for i in tasks: # too many tasks
20+
# Returns quickly for all tasks, does not wait for pool space.
21+
# Workers are not spawned, they wait for pool space in their
22+
# own background tasks.
23+
fut = await pool.spawn_n(worker(i))
24+
futures.append(fut)
25+
# At this point not a single worker should start.
26+
27+
# Context manager calls `join` at exit, so this will finish when all
28+
# workers return, crash or are cancelled.
29+
30+
assert sum(itertools.chain.from_iterable(todo)) == \
31+
sum(f.result() for f in futures)
1432

1533

16-
async def run_in_pool():
34+
async def spawn_usage(todo=range(1,4)):
35+
futures = []
36+
async with AioPool(size=2) as pool:
37+
for i in todo: # 1, 2, 3
38+
# Returns quickly for 1 and 2, then waits for empty space for 3,
39+
# spawns 3 and returns. Can save some resources I guess.
40+
fut = await pool.spawn(worker(i))
41+
futures.append(fut)
42+
# At this point some of the workers already started.
1743

18-
async with AioPool(size=10) as pool: # no more than 10 concurrent coroutines
19-
results = await pool.map(worker, range(1, 100))
44+
# Context manager calls `join()` at exit, so this will finish when all
45+
# workers return, crash or are cancelled.
2046

21-
### OR
47+
assert sum(todo) == sum(fut.result() for fut in futures) # all done
2248

49+
50+
async def map_usage(todo=range(100)):
2351
pool = AioPool(size=10)
52+
# Joins internally, collects results from all spawned workers,
53+
# returns them in same order as `todo`, if worker crashes or cancelled:
54+
# returns exception object as a result.
55+
# Basically, it wraps `spawn_usage` code into one call.
56+
results = await pool.map(worker, todo)
57+
58+
assert isinstance(results[0], ZeroDivisionError) \
59+
and sum(results[1:]) == sum(todo)
60+
61+
62+
async def itermap_usage(todo=range(1,11)):
63+
# Python 3.6+
64+
result = 0
65+
async with AioPool(size=10) as pool:
66+
# Combines spawn_n and iterwait, which is a wrapper for asyncio.wait,
67+
# which yields results of finished workers according to `timeout` and
68+
# `yield_when` params passed to asyncio.wait (see its docs for details)
69+
async for res in pool.itermap(worker, todo, timeout=0.5):
70+
result += res
71+
# technically, you can skip join call
72+
73+
assert result == sum(todo)
2474

25-
# generator returning futures for each worker result
26-
futures = await pool.itermap(worker, range(1,100))
27-
# or spawning manually: list of futures for each worker result
28-
futures = [await pool.spawn(worker(i)) for i in range(1,100)]
2975

76+
async def callbacks_usage():
77+
pass # TODO
78+
79+
80+
async def details(todo=range(1,11)):
81+
pool = AioPool(size=5)
82+
83+
# This code:
84+
f1 = []
85+
for i in todo:
86+
f1.append(await pool.spawn_n(worker(i)))
87+
# is equivalent to one call of `map_n`:
88+
f2 = await pool.map_n(worker, todo)
89+
90+
# Afterwards you can await for any given future:
91+
try:
92+
assert 3 == await f1[2] # result of spawn_n(worker(3))
93+
except Exception as e:
94+
# an exception raised in the worker (or a CancelledError) will be re-raised
95+
pass
96+
97+
# Or use `asyncio.wait` to handle results in batches (see `iterwait` also):
98+
important_res = 0
99+
more_important = [f1[1], f2[1], f2[2]]
100+
while more_important:
101+
done, more_important = await aio.wait(more_important, timeout=0.5)
102+
# handle result, note it will re-raise exceptions
103+
important_res += sum(f.result() for f in done)
104+
105+
assert important_res == 2 + 2 + 3
106+
107+
# But you need to join, to allow all spawned workers to finish
108+
# (of course you can `asyncio.wait` all of the futures if you want to)
30109
await pool.join()
31-
print [fut.result() for fut in batch] # will re-raise exceptions
32110

111+
assert all(f.done() for f in itertools.chain(f1,f2)) # this is guaranteed
33112

34-
### OR moar later
35113
```
114+
115+
116+
## TODO:
117+
118+
- [ ] cancelled, timeouts
119+
- [ ] callbacks
120+
- [x] tests
121+
- [x] usage
122+
- [ ] examples
123+
- [ ] docs
124+
- [ ] readme

asyncio_pool/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
11
# coding: utf8
22

3-
from .pool import AioPool
3+
import sys
4+
from .base_pool import BaseAioPool
5+
6+
7+
# Async generators (used by iterwait/itermap) appeared in python 3.6;
# on older interpreters AioPool is just the base pool, otherwise the
# async-iterator mixin is layered on top.
if sys.version_info >= (3, 6):
    from .mx_asynciter import MxAsyncIterPool

    class AioPool(MxAsyncIterPool, BaseAioPool):
        '''Pool of asyncio coroutines with async-iterator helpers.'''

else:  # this means 3.5 # TODO test 3.4?

    class AioPool(BaseAioPool):
        '''Pool of asyncio coroutines (no async-generator features).'''

asyncio_pool/base_pool.py

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
# coding: utf8
2+
'''Pool of asyncio coroutines with familiar interface, python3.5+ friendly'''
3+
4+
import traceback
5+
import collections
6+
import asyncio as aio
7+
from .utils import _get_future_result
8+
9+
10+
class BaseAioPool(object):
    '''Pool of asyncio coroutines with familiar interface, python3.5+ friendly.

    At most `size` workers run concurrently; additional spawns wait for a
    free slot on an internal semaphore.
    '''

    def __init__(self, size=1024, *, loop=None):
        '''
        size -- maximum number of concurrently running workers.
        loop -- event loop to use (defaults to the current one).
        '''
        self.loop = loop or aio.get_event_loop()

        self.size = size
        self._executed = 0                    # total number of finished workers
        self._joined = collections.deque()    # futures of pending `join` calls
        self._waiting = collections.deque()   # spawned workers not yet started
        try:
            self.semaphore = aio.Semaphore(value=self.size, loop=self.loop)
        except TypeError:
            # the `loop` keyword was removed in python 3.10; there the
            # semaphore binds to the running loop lazily.
            self.semaphore = aio.Semaphore(value=self.size)

    async def __aenter__(self):
        return self

    async def __aexit__(self, ext_type, exc, tb):
        await self.join()

    @property
    def n_active(self):
        '''Number of workers currently executing.'''
        return self.size - self.semaphore._value

    @property
    def is_empty(self):
        '''True when no workers are running or waiting for a slot.'''
        return 0 == len(self._waiting) == self.n_active

    @property
    def is_full(self):
        '''True when the pool cannot start another worker immediately.'''
        return self.size <= len(self._waiting) + self.n_active

    async def join(self):
        '''Wait until all spawned workers have finished. Returns True.'''
        if self.is_empty:
            return True

        fut = self.loop.create_future()
        self._joined.append(fut)
        try:
            return await fut
        finally:
            self._joined.remove(fut)

    def _release_joined(self):
        # Wake up every pending `join` caller; valid only once drained.
        if not self.is_empty:
            raise RuntimeError('pool is not empty, cannot release joined')

        for fut in self._joined:
            if not fut.done():
                fut.set_result(True)

    async def _wrap(self, coro, future, cb=None, ctx=None):
        '''Await `coro`, then resolve `future` with its result or exception.

        If `cb` is given it is invoked as `cb(result, err, ctx)` where `err`
        is None or an `(exception, traceback_text)` pair; the callback's own
        outcome then resolves `future` via a recursive `_wrap`, which is also
        what eventually releases the pool slot.
        '''
        res, exc, tb = None, None, None
        try:
            res = await coro
        except Exception as _exc:
            exc = _exc
            tb = traceback.format_exc()  # TODO tb object instead of text
        finally:
            self._executed += 1

        if cb:
            err = None if exc is None else (exc, tb)
            wrapped = self._wrap(cb(res, err, ctx), future)
            self.loop.create_task(wrapped)
            return  # slot is released by the callback's wrapper

        self.semaphore.release()
        if exc is None:
            future.set_result(res)
        else:
            future.set_exception(exc)

        if self.is_empty:
            self._release_joined()

    async def _spawn(self, future, coro, cb=None, ctx=None):
        # Wait for a pool slot, then run `coro` in a background task.
        try:
            await self.semaphore.acquire()
        except Exception as e:
            # Acquire failed: report through the future and do NOT start
            # the worker (previously the worker was spawned anyway, which
            # resolved the future twice and over-released the semaphore).
            future.set_exception(e)
            self._waiting.remove(future)
            coro.close()  # avoid "coroutine was never awaited" warning
            return future
        self._waiting.remove(future)
        wrapped = self._wrap(coro, future, cb=cb, ctx=ctx)
        self.loop.create_task(wrapped)
        return future

    async def spawn_n(self, coro, cb=None, ctx=None):
        '''Schedule `coro` without waiting for pool space; returns a future
        of the worker's result immediately.'''
        future = self.loop.create_future()
        self._waiting.append(future)
        self.loop.create_task(self._spawn(future, coro, cb=cb, ctx=ctx))
        return future

    async def spawn(self, coro, cb=None, ctx=None):
        '''Wait for pool space, start `coro`, return a future of its result.'''
        future = self.loop.create_future()
        self._waiting.append(future)
        return await self._spawn(future, coro, cb=cb, ctx=ctx)

    async def exec(self, coro, cb=None, ctx=None):
        '''Spawn `coro` and wait for (and return) its result.'''
        return await (await self.spawn(coro, cb=cb, ctx=ctx))

    async def map_n(self, fn, iterable):
        '''`spawn_n` a worker `fn(item)` for every item; returns the futures.'''
        futures = []
        for it in iterable:
            fut = await self.spawn_n(fn(it))
            futures.append(fut)
        return futures

    async def map(self, fn, iterable, exc_as_result=True):
        '''Spawn workers for all items, join, return results in input order.

        If a worker raises or is cancelled, its result slot holds the
        exception object when `exc_as_result` is true, else None.
        '''
        futures = await self.map_n(fn, iterable)
        await self.join()

        result = []
        for fut in futures:
            if fut.cancelled():
                # `fut.exception()` would re-raise for a cancelled future,
                # breaking the "exception object as result" contract.
                res = aio.CancelledError() if exc_as_result else None
            elif fut.exception() is not None:
                res = fut.exception() if exc_as_result else None
            else:
                res = fut.result()
            result.append(res)
        return result

    async def iterwait(self, *arg, **kw):
        # Overridden by MxAsyncIterPool on python3.6+ (see mx_asynciter.py).
        raise NotImplementedError('python3.6+ required')

    async def itermap(self, *arg, **kw):
        # Overridden by MxAsyncIterPool on python3.6+ (see mx_asynciter.py).
        raise NotImplementedError('python3.6+ required')

asyncio_pool/mx_asynciter.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# coding: utf8
2+
'''Mixin for BaseAioPool with async iterator features python3.6+'''
3+
4+
import asyncio as aio
5+
from .utils import _get_future_result
6+
7+
8+
class MxAsyncIterPool(object):
    '''Mixin for BaseAioPool with async iterator features, python3.6+.'''

    async def iterwait(self, futures, *, flat=True, exc_as_result=True,
            timeout=None, yield_when=aio.ALL_COMPLETED):
        '''Yield results of `futures` in batches as they complete.

        Wraps `asyncio.wait`: `timeout` and `yield_when` (forwarded as
        `return_when`) control when each batch is produced.  With `flat`
        results are yielded one by one, otherwise as a list per batch.
        Worker exceptions are returned as results when `exc_as_result`
        is true (see `_get_future_result`).
        '''
        # list() instead of a slice copy: accepts any iterable of futures
        # (generators included) and never mutates the caller's sequence.
        pending = list(futures)
        while pending:
            done, pending = await aio.wait(pending, loop=self.loop,
                timeout=timeout, return_when=yield_when)
            if flat:
                for fut in done:
                    yield _get_future_result(fut, exc_as_result)
            else:
                yield [_get_future_result(f, exc_as_result) for f in done]

    async def itermap(self, fn, iterable, *, flat=True, exc_as_result=True,
            timeout=None, yield_when=aio.ALL_COMPLETED):
        '''Spawn a worker `fn(item)` per item with `map_n`, then iterate
        their results with `iterwait` (same parameters apply).'''
        futures = await self.map_n(fn, iterable)
        generator = self.iterwait(futures, flat=flat, timeout=timeout,
            exc_as_result=exc_as_result, yield_when=yield_when)
        async for batch in generator:
            yield batch  # TODO is it possible to return a generator?

0 commit comments

Comments
 (0)