Skip to content

Commit adbe311

Browse files
committed
init
0 parents  commit adbe311

File tree

15 files changed

+356
-0
lines changed

15 files changed

+356
-0
lines changed

.gitignore

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
*.swo
2+
*.swp
3+
*.pyc
4+
*.pyo
5+
*.log
6+
*.lock
7+
.env*
8+
.pypyenv*
9+
.pytest_*
10+
.git
11+
__pycache__
12+
local_settings.py
13+
14+
!.gitkeep
15+
!/.gitignore

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# asyncio-pool
2+
3+
TODO

__init__.py

Whitespace-only changes.

asyncio_pool/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# coding: utf8
2+
3+
from .pool import AioPool

asyncio_pool/pool.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# coding: utf8
2+
3+
import traceback
4+
import asyncio as aio
5+
6+
7+
class AioPool(object):
8+
9+
def __init__(self, size=1024, loop=None):
10+
self._size = size
11+
self._waiting = 0
12+
self._executed = 0
13+
self.semaphore = aio.Semaphore(value=self._size)
14+
self.loop = loop or aio.get_event_loop()
15+
self._all_done = self.loop.create_future()
16+
17+
@property
18+
def is_empty(self):
19+
return 0 == self._waiting == (self._size - self.semaphore._value)
20+
21+
@property
22+
def is_full(self):
23+
return self._waiting + (self._size - self.semaphore._value) >= self._size
24+
25+
async def join(self):
26+
await self._all_done
27+
28+
async def __aenter__(self):
29+
return self
30+
31+
async def __aexit__(self, ext_type, exc, tb):
32+
await self.join()
33+
34+
async def _acquire(self):
35+
if self._all_done.done():
36+
self._all_done = self.loop.create_future()
37+
self._waiting += 1
38+
await self.semaphore.acquire()
39+
self._waiting -= 1
40+
41+
async def _wrap(self, coro, future, cb=None, ctx=None):
42+
res, exc, tb = None, None, None
43+
try:
44+
res = await coro
45+
future.set_result(res)
46+
except Exception as exc:
47+
future.set_exception(exc)
48+
tb = traceback.format_exc()
49+
finally:
50+
self.semaphore.release()
51+
self._executed += 1
52+
53+
if cb:
54+
await self.spawn(cb(res, (exc, tb), ctx))
55+
elif self.is_empty:
56+
self._all_done.set_result('done')
57+
58+
async def spawn(self, coro, cb=None, ctx=None):
59+
await self._acquire()
60+
future = self.loop.create_future()
61+
wrapped = self._wrap(coro, future, cb=cb, ctx=ctx)
62+
self.loop.create_task(wrapped)
63+
return future
64+
65+
async def exec(self, coro):
66+
return await (await self.spawn(coro))
67+
68+
async def itermap(self, fn, iterable):
69+
for it in iterable:
70+
yield (await self.spawn(fn(it)))
71+
72+
async def map(self, fn, iterable, exc_as_result=True):
73+
futures = []
74+
async for fut in self.itermap(fn, iterable):
75+
futures.append(fut)
76+
await self.join()
77+
78+
result = []
79+
for fut in futures:
80+
if fut.exception():
81+
res = fut.exception() if exc_as_result else None
82+
else:
83+
res = fut.result()
84+
result.append(res)
85+
return result

examples/__init__.py

Whitespace-only changes.

examples/ls_remote.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# coding: utf8
2+
3+
import logging
4+
import asyncio as aio
5+
import json as myformat # just for a demo
6+
7+
8+
def parse_res(res):
9+
doc = myformat.loads(res)
10+
return doc.get('files') or [], doc.get('folders') or []
11+
12+
13+
async def ls_cb(res, err, ctx):
14+
apipool, dbpool, api, db, log = ctx
15+
if err:
16+
log.error()
17+
return
18+
19+
files, folders = parse_res(res)
20+
for folder in folders:
21+
await apipool.spawn(api.ls(folder['path']), cb=ls_cb, ctx=ctx)
22+
await dbpool.spawn(db.save(folder))
23+
for file in files:
24+
await dbpool.spawn(db.save(file))
25+
26+
27+
async def example_ls():
28+
loop = aio.get_event_loop()
29+
api = await create_client(loop)
30+
db = await create_connection(dsn, loop)
31+
log = logging.getLogger('example_ls')
32+
33+
with AioPool(size=10) as dbpool, \
34+
AioPool(size=10) as apipool:
35+
ctx = (apipool, dbpool, api, db, log)
36+
await apipool.spawn(api.ls('/'), cb=ls_cb, ctx=ctx)
37+
38+
39+
if __name__ == "__main__":
40+
aio.get_event_loop().run_until_complete(example_ls())

reqs-dev.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ipython
2+
pytest
3+
pytest-asyncio

run_tests.sh

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#! /bin/sh
2+
3+
py35=python3.5
4+
py36=python3.6
5+
py37=python3.7
6+
pypy3=/opt/pypy3/pypy3/bin/pypy3
7+
8+
default_env=$py36
9+
10+
11+
for py in $py35 $py36 $py37 $pypy3
12+
do
13+
if [ -x "$(command -v $py)" ]; then
14+
pyname="$(basename $py)"
15+
envname=".env_$pyname"
16+
17+
if ! [ -d $envname ]; then
18+
echo "$pyname: virtual env does not exist"
19+
else
20+
$envname/bin/pytest tests
21+
fi
22+
else
23+
echo "$py: not found"
24+
fi
25+
done

setup.py

Whitespace-only changes.

0 commit comments

Comments
 (0)