Skip to content

Commit 001f9c1

Browse files
authored
Merge pull request #3 from FichteForks/pr/sync-spawn-map_n
Make spawn_n and map_n synchronous
2 parents a4f8a37 + c3a30af commit 001f9c1

File tree

10 files changed

+55
-45
lines changed

10 files changed

+55
-45
lines changed

README.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ async def spawn_n_usage(todo=[range(1,51), range(51,101), range(101,200)]):
7272
# Returns quickly for all tasks, does not wait for pool space.
7373
# Workers are not spawned, they wait for pool space in their
7474
# own background tasks.
75-
fut = await pool.spawn_n(worker(i))
75+
fut = pool.spawn_n(worker(i))
7676
futures.append(fut)
7777
# At this point not a single worker should start.
7878

@@ -147,7 +147,7 @@ async def callbacks_usage():
147147

148148
async with AioPool(size=2) as pool:
149149
for i in todo:
150-
fut = await pool.spawn_n(wrk(i), cb, (pool, i))
150+
fut = pool.spawn_n(wrk(i), cb, (pool, i))
151151
futures.append(fut)
152152

153153
results = []
@@ -173,7 +173,7 @@ async def callbacks_usage():
173173

174174
async def exec_usage(todo=range(1,11)):
175175
async with AioPool(size=4) as pool:
176-
futures = await pool.map_n(worker, todo)
176+
futures = pool.map_n(worker, todo)
177177

178178
# While other workers are waiting or active, you can "synchronously"
179179
# execute one task. It does not interrupt others, just waits for pool
@@ -195,9 +195,9 @@ async def cancel_usage():
195195

196196
pool = AioPool(size=2)
197197

198-
f_quick = await pool.spawn_n(aio.sleep(0.1))
199-
f12 = await pool.spawn(wrk()), await pool.spawn_n(wrk())
200-
f35 = await pool.map_n(wrk, range(3))
198+
f_quick = pool.spawn_n(aio.sleep(0.1))
199+
f12 = await pool.spawn(wrk()), pool.spawn_n(wrk())
200+
f35 = pool.map_n(wrk, range(3))
201201

202202
# At this point, if you cancel futures, returned by pool methods,
203203
# you just won't be able to retrieve spawned task results, task
@@ -232,9 +232,9 @@ async def details(todo=range(1,11)):
232232
# This code:
233233
f1 = []
234234
for i in todo:
235-
f1.append(await pool.spawn_n(worker(i)))
235+
f1.append(pool.spawn_n(worker(i)))
236236
# is equivalent to one call of `map_n`:
237-
f2 = await pool.map_n(worker, todo)
237+
f2 = pool.map_n(worker, todo)
238238

239239
# Afterwards you can await for any given future:
240240
try:

asyncio_pool/base_pool.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ async def _wrap(self, coro, future, cb=None, ctx=None):
106106
res, exc, tb = None, None, None
107107
try:
108108
res = await coro
109-
except Exception as _exc:
109+
except BaseException as _exc:
110110
exc = _exc
111111
tb = traceback.format_exc()
112112
finally:
@@ -140,7 +140,7 @@ async def _spawn(self, future, coro, cb=None, ctx=None):
140140
acq_error = False
141141
try:
142142
await self.semaphore.acquire()
143-
except Exception as e:
143+
except BaseException as e:
144144
acq_error = True
145145
if not future.done():
146146
future.set_exception(e)
@@ -182,7 +182,7 @@ async def spawn(self, coro, cb=None, ctx=None):
182182
self._waiting[future] = self.loop.create_future() # as a placeholder
183183
return await self._spawn(future, coro, cb=cb, ctx=ctx)
184184

185-
async def spawn_n(self, coro, cb=None, ctx=None):
185+
def spawn_n(self, coro, cb=None, ctx=None):
186186
'''Creates waiting task for given `coro` regardless of pool space. If
187187
pool is not full, this task will be executed very soon. Main difference
188188
is that `spawn_n` does not block and returns future very quickly.
@@ -203,15 +203,15 @@ async def exec(self, coro, cb=None, ctx=None):
203203
'''
204204
return await (await self.spawn(coro, cb, ctx))
205205

206-
async def map_n(self, fn, iterable, cb=None, ctx=None):
206+
def map_n(self, fn, iterable, cb=None, ctx=None):
207207
'''Creates coroutine with `fn` function for each item in `iterable`,
208208
spawns each of them with `spawn_n`, returning futures.
209209
210210
Read more about callbacks in `spawn` docstring.
211211
'''
212212
futures = []
213213
for it in iterable:
214-
fut = await self.spawn_n(fn(it), cb, ctx)
214+
fut = self.spawn_n(fn(it), cb, ctx)
215215
futures.append(fut)
216216
return futures
217217

asyncio_pool/mx_asyncgen.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ async def itermap(self, fn, iterable, cb=None, ctx=None, *, flat=True,
3737
'''Spawns coroutines created with `fn` for each item in `iterable`, then
3838
waits for results with `iterwait`. See docs for `map_n` and `iterwait`.
3939
'''
40-
futures = await self.map_n(fn, iterable, cb, ctx)
40+
futures = self.map_n(fn, iterable, cb, ctx)
4141
generator = iterwait(futures, flat=flat, timeout=timeout,
4242
get_result=get_result, yield_when=yield_when, loop=self.loop)
4343
async for batch in generator:

asyncio_pool/mx_asynciter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def __aiter__(_self):
6060

6161
async def __anext__(_self):
6262
if not hasattr(_self, 'waiter'):
63-
_self.waiter = mk_waiter(await mk_map())
63+
_self.waiter = mk_waiter(mk_map())
6464
return await _self.waiter.__anext__()
6565

6666
return _itermap()

asyncio_pool/results.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def result_noraise(future, flat=True):
1414
try:
1515
res = future.result()
1616
return res if flat else (res, None)
17-
except Exception as exc:
17+
except BaseException as exc:
1818
return exc if flat else (None, exc)
1919

2020

examples/_usage.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async def spawn_n_usage(todo=[range(1,51), range(51,101), range(101,200)]):
2424
# Returns quickly for all tasks, does not wait for pool space.
2525
# Workers are not spawned, they wait for pool space in their
2626
# own background tasks.
27-
fut = await pool.spawn_n(worker(i))
27+
fut = pool.spawn_n(worker(i))
2828
futures.append(fut)
2929
# At this point not a single worker should start.
3030

@@ -99,7 +99,7 @@ async def cb(res, err, ctx): # callback
9999

100100
async with AioPool(size=2) as pool:
101101
for i in todo:
102-
fut = await pool.spawn_n(wrk(i), cb, (pool, i))
102+
fut = pool.spawn_n(wrk(i), cb, (pool, i))
103103
futures.append(fut)
104104

105105
results = []
@@ -110,7 +110,7 @@ async def cb(res, err, ctx): # callback
110110
# results.append(getres.flat(fut))
111111
try:
112112
results.append(fut.result())
113-
except Exception as e:
113+
except BaseException as e:
114114
results.append(e)
115115

116116
# First error happens for n == 0 in wrk, exception of it is passed to
@@ -125,7 +125,7 @@ async def cb(res, err, ctx): # callback
125125

126126
async def exec_usage(todo=range(1,11)):
127127
async with AioPool(size=4) as pool:
128-
futures = await pool.map_n(worker, todo)
128+
futures = pool.map_n(worker, todo)
129129

130130
# While other workers are waiting or active, you can "synchronously"
131131
# execute one task. It does not interrupt others, just waits for pool
@@ -147,9 +147,9 @@ async def wrk(*arg, **kw):
147147

148148
pool = AioPool(size=2)
149149

150-
f_quick = await pool.spawn_n(aio.sleep(0.1))
151-
f12 = await pool.spawn(wrk()), await pool.spawn_n(wrk())
152-
f35 = await pool.map_n(wrk, range(3))
150+
f_quick = pool.spawn_n(aio.sleep(0.1))
151+
f12 = await pool.spawn(wrk()), pool.spawn_n(wrk())
152+
f35 = pool.map_n(wrk, range(3))
153153

154154
# At this point, if you cancel futures, returned by pool methods,
155155
# you just won't be able to retrieve spawned task results, task
@@ -184,15 +184,15 @@ async def details(todo=range(1,11)):
184184
# This code:
185185
f1 = []
186186
for i in todo:
187-
f1.append(await pool.spawn_n(worker(i)))
187+
f1.append(pool.spawn_n(worker(i)))
188188
# is equivalent to one call of `map_n`:
189-
f2 = await pool.map_n(worker, todo)
189+
f2 = pool.map_n(worker, todo)
190190

191191
# Afterwards you can await for any given future:
192192
try:
193193
assert 3 == await f1[2] # result of spawn_n(worker(3))
194-
except Exception as e:
195-
# exception happened in worker (or CancelledError) will be re-raised
194+
except BaseException:
195+
# exception happened in worker (including CancelledError) will be re-raised
196196
pass
197197

198198
# Or use `asyncio.wait` to handle results in batches (see `iterwait` also):

tests/loadtest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ async def loadtest_spawn_n(tasks, pool_size, duration):
2525
futures = []
2626
async with AioPool(size=pool_size) as pool:
2727
for i in range(tasks):
28-
fut = await pool.spawn_n(aio.sleep(duration))
28+
fut = pool.spawn_n(aio.sleep(duration))
2929
futures.append(fut)
3030

3131
return [getres.flat(f) for f in futures]

tests/test_base.py

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async def wrk(n):
2626

2727
pool_size = 5
2828
async with AioPool(size=pool_size) as pool:
29-
futures = await pool.map_n(wrk, todo)
29+
futures = pool.map_n(wrk, todo)
3030

3131
await aio.sleep(0.01)
3232

@@ -70,7 +70,7 @@ async def outer(n, pool):
7070
loop = aio.get_event_loop()
7171
pool = AioPool(size=100)
7272

73-
tasks = await pool.map_n(inner, todo)
73+
pool.map_n(inner, todo)
7474
joined = [loop.create_task(outer(j, pool)) for j in to_release]
7575
await pool.join()
7676

@@ -86,18 +86,27 @@ async def wrk(*arg, **kw):
8686
await aio.sleep(0.5)
8787
return 1
8888

89-
pool = AioPool(size=2)
89+
async def wrk_safe(*arg, **kw):
90+
try:
91+
await aio.sleep(0.5)
92+
except aio.CancelledError:
93+
await aio.sleep(0.1) # simulate cleanup
94+
pass
95+
return 1
96+
97+
pool = AioPool(size=5)
9098

91-
f_quick = await pool.spawn_n(aio.sleep(0.15))
92-
f12 = await pool.spawn(wrk()), await pool.spawn_n(wrk())
93-
f35 = await pool.map_n(wrk, range(3))
99+
f_quick = pool.spawn_n(aio.sleep(0.15))
100+
f_safe = await pool.spawn(wrk_safe())
101+
f3 = await pool.spawn(wrk())
102+
pool.spawn_n(wrk())
103+
f567 = pool.map_n(wrk, range(3))
94104

95105
# cancel some
96106
await aio.sleep(0.1)
97-
cancelled, results = await pool.cancel(f12[0], f35[2]) # running and waiting
98-
assert 2 == cancelled # none of them had time to finish
99-
assert 2 == len(results) and \
100-
all(isinstance(res, aio.CancelledError) for res in results)
107+
cancelled, results = await pool.cancel(f3, f567[2]) # running and waiting
108+
assert cancelled == len(results) == 2 # none of them had time to finish
109+
assert all(isinstance(res, aio.CancelledError) for res in results)
101110

102111
# cancel all others
103112
await aio.sleep(0.1)
@@ -106,11 +115,12 @@ async def wrk(*arg, **kw):
106115
assert f_quick.done() and f_quick.result() is None
107116

108117
cancelled, results = await pool.cancel() # all
109-
assert 3 == cancelled
110-
assert len(results) == 3 and \
111-
all(isinstance(res, aio.CancelledError) for res in results)
118+
assert cancelled == len(results) == 4
119+
assert f_safe.done() and f_safe.result() == 1 # could recover
120+
# the others could not
121+
assert sum(isinstance(res, aio.CancelledError) for res in results) == 3
112122

113-
assert await pool.join() # joins successfully
123+
assert await pool.join() # joins successfully (basically no-op)
114124

115125

116126
@pytest.mark.asyncio

tests/test_callbacks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async def test_spawn_n():
3131
async with AioPool(size=2) as pool:
3232
for i in todo:
3333
ctx = (pool, i)
34-
fut = await pool.spawn_n(wrk(i), cb, ctx)
34+
fut = pool.spawn_n(wrk(i), cb, ctx)
3535
futures.append(fut)
3636

3737
results = [getres.flat(f) for f in futures]
@@ -52,7 +52,7 @@ async def test_map():
5252
async def test_map_n():
5353
todo = range(2,11)
5454
async with AioPool(size=3) as pool:
55-
futures = await pool.map_n(wrk, todo, cb)
55+
futures = pool.map_n(wrk, todo, cb)
5656

5757
results = [getres.flat(f) for f in futures]
5858
assert 2 * sum(todo) == sum(results)

tests/test_spawn.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ async def wrk(n):
2727

2828
async with AioPool(size=2) as pool:
2929
for i in range(1,6):
30-
await pool.spawn_n(wrk(i)) # does not wait for pool, just spawns waiting coros
30+
pool.spawn_n(wrk(i)) # does not wait for pool, just spawns waiting coros
3131
assert len(started) == 0 # so atm no worker should be able to start
3232

3333

0 commit comments

Comments
 (0)