Skip to content

Commit d970c09

Browse files
committed
itermap for python3.5
1 parent 7a710af commit d970c09

File tree

12 files changed

+118
-51
lines changed

12 files changed

+118
-51
lines changed

README.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,39 @@ AioPool makes sure _no more_ and _no less_ (if possible) than `size` spawned cor
88

99
Read [code doctrings](../master/asyncio_pool/base_pool.py) for details.
1010

11-
##### AioPool(size=4, *, loop=None)
11+
> `AioPool(size=4, *, loop=None)`
1212
1313
Creates pool of `size` concurrent tasks. Supports async context manager interface.
1414

15-
##### spawn(coro, cb=None, ctx=None)
15+
> `spawn(coro, cb=None, ctx=None)`
1616
1717
Waits for pool space, then creates task for `coro` coroutine, returning future for it's result. Can spawn coroutine, created by `cb` with result of `coro` as first argument. `ctx` context is passed to callback as third positinal argument.
1818

19-
##### exec(coro, cb=None, ctx=None)
19+
> `exec(coro, cb=None, ctx=None)`
2020
2121
Waits for pool space, then creates task for `coro`, then waits for it to finish, then returns result of `coro` if no callback is provided, otherwise creates task for callback, waits for it and returns result of callback.
2222

23-
##### spawn_n(coro, cb=None, ctx=None)
23+
> `spawn_n(coro, cb=None, ctx=None)`
2424
2525
Creates waiting task for `coro`, returns future without waiting for pool space. Task is executed "in pool" when pool space is available.
2626

27-
##### join()
27+
> `join()`
2828
2929
Waits for all spawned (active and waiting) tasks to finish. Joining pool from coroutine, spawned by the same pool leads to *deadlock*.
3030

31-
##### cancel(*futures)
31+
> `cancel(*futures)`
3232
3333
Cancels spawned tasks (active and waiting), finding them by provided `futures`. If no futures provided -- cancels all spawned tasks.
3434

35-
##### map(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)
35+
> `map(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)`
3636
3737
Spawns coroutines created by `fn` function for each item in `iterable` with `spawn`, waits for all of them to finish (including callbacks), returns results maintaining order of `iterable`.
3838

39-
##### map_n(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)
39+
> `map_n(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)`
4040
4141
Spawns coroutines created by `fn` function for each item in `iterable` with `spawn_n`, returns futures for task results maintaining order of `iterable`.
4242

43-
##### itermap(fn, iterable, cb=None, ctx=None, *, flat=True, exc_as_result=True, timeout=None, yield_when=asyncio.ALL_COMPLETED)
43+
> `itermap(fn, iterable, cb=None, ctx=None, *, flat=True, exc_as_result=True, timeout=None, yield_when=asyncio.ALL_COMPLETED)`
4444
4545
Spawns tasks with `map_n(fn, iterable, cb, ctx)`, then waits for results with `asyncio.wait` function, yielding ready results one by one if `flat` == True, otherwise yielding list of ready results.
4646

asyncio_pool/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
if sys.version_info < (3, 6): # this means 3.5 # TODO test 3.4?
1010

11-
class AioPool(BaseAioPool): pass
11+
from .mx_asynciter import MxAsyncIterPool, iterwait
12+
13+
class AioPool(MxAsyncIterPool, BaseAioPool): pass
1214

1315
else:
1416
from .mx_asyncgen import MxAsyncGenPool, iterwait

asyncio_pool/base_pool.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88

99
class BaseAioPool(object):
10-
# python3.5 friendly
1110

1211
def __init__(self, size=1024, *, loop=None):
1312
'''Pool of asyncio coroutines with familiar interface.
@@ -231,11 +230,13 @@ async def map(self, fn, iterable, cb=None, ctx=None, *, exc_as_result=True):
231230
await aio.wait(futures)
232231
return [result_noraise(fut, exc_as_result) for fut in futures]
233232

234-
async def iterwait(self, *arg, **kw): # TODO there's a way to support 3.5?
235-
raise NotImplementedError('python3.6+ required')
236-
237-
async def itermap(self, *arg, **kw): # TODO there's a way to support 3.5?
238-
raise NotImplementedError('python3.6+ required')
233+
async def itermap(self, fn, iterable, cb=None, ctx=None, *, flat=True,
234+
exc_as_result=True, timeout=None, yield_when=aio.ALL_COMPLETED):
235+
'''Spawns coroutines created with `fn` for each item in `iterable`, then
236+
waits for results with `iterwait` (implementation specific). See docs
237+
for `map_n` and `iterwait` (in mixins for py3.5 and py3.6+).
238+
'''
239+
raise NotImplementedError('Use one of mixins')
239240

240241
def _cancel(self, *futures):
241242
tasks, _futures = [], []

asyncio_pool/mx_asyncgen.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@ async def iterwait(futures, *, flat=True, exc_as_result=True,
3131
class MxAsyncGenPool(object):
3232
# Asynchronous generator wrapper for asyncio.wait.
3333

34-
async def itermap(self, fn, iterable, cb=None, ctx=None, *,
35-
flat=True, exc_as_result=True, timeout=None,
36-
yield_when=aio.ALL_COMPLETED):
34+
async def itermap(self, fn, iterable, cb=None, ctx=None, *, flat=True,
35+
exc_as_result=True, timeout=None, yield_when=aio.ALL_COMPLETED):
3736
'''Spawns coroutines created with `fn` for each item in `iterable`, then
3837
waits for results with `iterwait`. See docs for `map_n` and `iterwait`.
3938
'''

asyncio_pool/mx_asynciter.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# coding: utf8
2+
'''Mixin for BaseAioPool with async generator _simulation_ for python 3.5'''
3+
4+
import asyncio as aio
5+
from collections import deque
6+
from functools import partial
7+
from .utils import result_noraise
8+
9+
10+
class iterwait:
11+
12+
def __init__(self, futures, *, flat=True, exc_as_result=True,
13+
timeout=None, yield_when=aio.ALL_COMPLETED, loop=None):
14+
15+
self.results = deque()
16+
self.flat = flat
17+
self._futures = futures
18+
self._extract = partial(result_noraise, exc_as_result=exc_as_result)
19+
self._wait = partial(aio.wait, timeout=timeout, loop=loop,
20+
return_when=yield_when)
21+
22+
def __aiter__(self):
23+
return self
24+
25+
async def __anext__(self):
26+
if not (self._futures or self.results):
27+
raise StopAsyncIteration()
28+
while not self.results:
29+
await self._wait_next()
30+
return self.results.popleft()
31+
32+
async def _wait_next(self):
33+
while True:
34+
done, self._futures = await self._wait(self._futures)
35+
if done:
36+
batch = [self._extract(fut) for fut in done]
37+
if self.flat:
38+
self.results.extend(batch)
39+
else:
40+
self.results.append(batch)
41+
break
42+
43+
44+
class MxAsyncIterPool(object):
45+
46+
def itermap(self, fn, iterable, cb=None, ctx=None, *, flat=True,
47+
exc_as_result=True, timeout=None, yield_when=aio.ALL_COMPLETED):
48+
'''Spawns coroutines created with `fn` for each item in `iterable`, then
49+
waits for results with `iterwait`. See docs for `map_n` and `iterwait`.
50+
'''
51+
mk_map = partial(self.map_n, fn, iterable, cb=cb, ctx=ctx)
52+
mk_waiter = partial(iterwait, flat=flat, loop=self.loop,
53+
exc_as_result=exc_as_result, timeout=timeout,
54+
yield_when=yield_when)
55+
56+
class _itermap:
57+
def __aiter__(_self):
58+
return _self
59+
60+
async def __anext__(_self):
61+
if not hasattr(_self, 'waiter'):
62+
_self.waiter = mk_waiter(await mk_map())
63+
return await _self.waiter.__anext__()
64+
65+
return _itermap()

asyncio_pool/utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
# coding: utf8
22

3-
import asyncio as aio
4-
53

64
def result_noraise(future, exc_as_result=True):
75
try:
8-
return future.result()
6+
res = future.result()
7+
return res if exc_as_result else (res, None)
98
except Exception as exc:
10-
return exc if exc_as_result else None
9+
# TODO traceback ??
10+
return exc if exc_as_result else (None, exc)

docs/_readme_template.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,39 @@ AioPool makes sure _no more_ and _no less_ (if possible) than `size` spawned cor
88

99
Read [code doctrings](../master/asyncio_pool/base_pool.py) for details.
1010

11-
##### AioPool(size=4, *, loop=None)
11+
> `AioPool(size=4, *, loop=None)`
1212
1313
Creates pool of `size` concurrent tasks. Supports async context manager interface.
1414

15-
##### spawn(coro, cb=None, ctx=None)
15+
> `spawn(coro, cb=None, ctx=None)`
1616
1717
Waits for pool space, then creates task for `coro` coroutine, returning future for it's result. Can spawn coroutine, created by `cb` with result of `coro` as first argument. `ctx` context is passed to callback as third positinal argument.
1818

19-
##### exec(coro, cb=None, ctx=None)
19+
> `exec(coro, cb=None, ctx=None)`
2020
2121
Waits for pool space, then creates task for `coro`, then waits for it to finish, then returns result of `coro` if no callback is provided, otherwise creates task for callback, waits for it and returns result of callback.
2222

23-
##### spawn_n(coro, cb=None, ctx=None)
23+
> `spawn_n(coro, cb=None, ctx=None)`
2424
2525
Creates waiting task for `coro`, returns future without waiting for pool space. Task is executed "in pool" when pool space is available.
2626

27-
##### join()
27+
> `join()`
2828
2929
Waits for all spawned (active and waiting) tasks to finish. Joining pool from coroutine, spawned by the same pool leads to *deadlock*.
3030

31-
##### cancel(*futures)
31+
> `cancel(*futures)`
3232
3333
Cancels spawned tasks (active and waiting), finding them by provided `futures`. If no futures provided -- cancels all spawned tasks.
3434

35-
##### map(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)
35+
> `map(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)`
3636
3737
Spawns coroutines created by `fn` function for each item in `iterable` with `spawn`, waits for all of them to finish (including callbacks), returns results maintaining order of `iterable`.
3838

39-
##### map_n(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)
39+
> `map_n(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)`
4040
4141
Spawns coroutines created by `fn` function for each item in `iterable` with `spawn_n`, returns futures for task results maintaining order of `iterable`.
4242

43-
##### itermap(fn, iterable, cb=None, ctx=None, *, flat=True, exc_as_result=True, timeout=None, yield_when=asyncio.ALL_COMPLETED)
43+
> `itermap(fn, iterable, cb=None, ctx=None, *, flat=True, exc_as_result=True, timeout=None, yield_when=asyncio.ALL_COMPLETED)`
4444
4545
Spawns tasks with `map_n(fn, iterable, cb, ctx)`, then waits for results with `asyncio.wait` function, yielding ready results one by one if `flat` == True, otherwise yielding list of ready results.
4646

examples/_usage.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ async def map_usage(todo=range(100)):
6666

6767

6868
async def itermap_usage(todo=range(1,11)):
69-
# Python 3.6+
7069
result = 0
7170
async with AioPool(size=10) as pool:
7271
# Combines spawn_n and iterwait, which is a wrapper for asyncio.wait,

setup.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@
33
import setuptools
44

55

6+
github_url = 'https://github.com/gistart/asyncio-pool'
7+
8+
readme_lines = []
69
with open('README.md') as fd:
7-
readme = fd.read()
10+
readme_lines = filter(None, fd.read().splitlines())
11+
readme_lines = list(readme_lines)[:3]
12+
readme_lines.append('Read more at [github page](%s).' % github_url)
13+
readme = '\n\n'.join(readme_lines)
814

915

1016
setuptools.setup(
@@ -15,7 +21,7 @@
1521
description='Pool of asyncio coroutines with familiar interface',
1622
long_description=readme,
1723
long_description_content_type='text/markdown',
18-
url='https://github.com/gistart/asyncio-pool',
24+
url=github_url,
1925
license='MIT',
2026
packages=['asyncio_pool'],
2127
# install_requires=['asyncio'], # where " openstack/deb-python-trollius asyncio" comes from???

tests/loadtest.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,15 @@ async def wrk(i):
5353

5454
def print_stats(args, exec_time):
5555
ideal = args.task_duration * (args.tasks / args.pool_size)
56-
5756
overhead = exec_time - ideal
58-
overhead_perc = ((exec_time / ideal) - 1) * 100
59-
6057
per_task = overhead / args.tasks
61-
per_task_perc = (((args.task_duration + per_task) / args.task_duration) - 1) * 100
58+
overhead_perc = ((exec_time / ideal) - 1) * 100
6259

6360
print(f'{ideal:15.5f}s -- ideal result')
64-
print(f'{exec_time:15.5f}s -- were executing')
65-
print(f'{overhead:15.5f}s -- overhead total')
66-
print(f'{overhead_perc:15.5f}% -- overhead total percent')
61+
print(f'{exec_time:15.5f}s -- total executing time')
62+
print(f'{overhead:15.5f}s -- total overhead')
6763
print(f'{per_task:15.5f}s -- overhead per task')
68-
print(f'{per_task_perc:15.5f}% -- overhead per task percent')
64+
print(f'{overhead_perc:13.3f}% -- overhead total percent')
6965

7066

7167
if __name__ == "__main__":

0 commit comments

Comments
 (0)