1818import aiohttp
1919from typing_extensions import ParamSpec
2020
21- from ._async_executor import AsyncExecutor
2221from .types import Autocommit
2322from .types import Connection
2423from .types import Cursor
@@ -175,56 +174,12 @@ def conv_err(e: Optional[ErrorResult]) -> Optional[BaseException]:
175174 errors = [conv_err (e ) for e in resp ["step_errors" ]]
176175 return RawExecuteResult (resp ["step_results" ], errors )
177176
178-
179- if TYPE_CHECKING :
180- if sys .version_info [:2 ] >= (3 , 9 ):
181-
182- @overload
183- def run_in_executor (
184- fn : Callable [P , Awaitable [asyncio .Future [T ]]],
185- ) -> Callable [P , T ]:
186- ...
187-
188- @overload
189- def run_in_executor (fn : Callable [P , Awaitable [T ]]) -> Callable [P , T ]:
190- ...
191-
192- @overload
193- def run_in_executor (fn : Callable [P , T ]) -> Callable [P , T ]:
194- ...
195-
196-
197- def run_in_executor (fn : Callable [P , T ]) -> Callable [P , T ]:
198- """ConnectionHrana method decorator that runs code in the executor thread.
199-
200- This will execute the decorated method body inside the
201- :py:class:`AsyncExecutor` thread by doing a ``AsyncExecutor.submit()``
202- and then ``future.result(timeout)``.
203-
204- The method itself will block until the executor runs.
205-
206- :meta private:
207- """
208-
209- def wrapper (* args : P .args , ** kwargs : P .kwargs ) -> T :
210- self = args [0 ]
211- assert isinstance (self , ConnectionHrana )
212- assert self ._executor is not None
213-
214- future = self ._executor .submit (fn , * args , ** kwargs )
215-
216- return future .result (timeout = self ._timeout )
217-
218- return wrapper
219-
220-
221177class ConnectionHrana (Connection ):
222178 """Implement :py:class:`sqlite3.Connection` for remote servers
223179 using the `Hrana Protocol
224180 <https://github.com/libsql/sqld/blob/main/docs/HRANA_2_SPEC.md>`_.
225181 """
226182
227- _executor : Optional [AsyncExecutor ] # TODO: share
228183 _session : Optional [aiohttp .ClientSession ] # TODO: share (per url)
229184 _conn : Optional [HranaConn ] # TODO: share (per url)
230185 _stream : Optional [HranaStream ]
@@ -255,54 +210,32 @@ def __init__(
255210 def _raw_init (self ) -> None :
256211 self .cursor_factory = CursorHrana
257212 try :
258- self ._executor = self ._acquire_executor ()
259213 self ._session = self ._acquire_session ()
260214 self ._conn = self ._acquire_connection (self ._database )
261215 self ._stream = self ._create_stream ()
262216 except Exception :
263217 self ._raw_close ()
264218 raise
265219
266- def _acquire_executor (self ) -> AsyncExecutor :
267- return AsyncExecutor () # TODO: share
268-
269- def _dispose_executor (self , executor : AsyncExecutor ) -> None :
270- executor .shutdown () # TODO: share
271-
272- @run_in_executor
273220 def _acquire_session (self ) -> aiohttp .ClientSession :
274221 return aiohttp .ClientSession () # TODO: share
275222
276- @run_in_executor
277223 async def _dispose_session (self , session : aiohttp .ClientSession ) -> None :
278224 await session .close () # TODO: share
279225
280- @run_in_executor
281- async def _acquire_connection (self , url : str ) -> HranaConn :
226+ def _acquire_connection (self , url : str ) -> HranaConn :
282227 assert self ._session is not None
283- # TODO: share (per url)
284- conn = _create_hrana_connection (self ._session , url )
285- try :
286- await conn .wait_connected ()
287- return conn
288- except Exception as error :
289- await conn .close ()
290- exc = _conv_stmt_result (None , error ).errors [0 ]
291- assert exc is not None # make mypy happy
292- raise exc
228+ return _create_hrana_connection (self ._session , url )
293229
294- @run_in_executor
295230 async def _dispose_connection (self , conn : HranaConn ) -> None :
296231 await conn .close () # TODO: share (per url)
297232
298- @run_in_executor
299233 def _create_stream (self ) -> HranaStream :
300234 assert self ._conn is not None
301235 stream = self ._conn .open_stream ()
302236 self ._inf ("created stream: %s" , stream )
303237 return stream
304238
305- @run_in_executor
306239 def _destroy_stream (self , stream : HranaStream ) -> None :
307240 self ._inf ("closing stream: %s" , stream )
308241 stream .close ()
@@ -311,27 +244,26 @@ async def _raw_execute(self, stmt: Stmt) -> asyncio.Future[StmtResult]:
311244 assert self ._stream is not None
312245 return self ._stream .execute (stmt )
313246
314- @run_in_executor
315247 def _raw_store_sql (self , sql : str ) -> int :
316248 assert self ._conn is not None
317249 return self ._conn .store_sql (sql )
318250
319- @run_in_executor
320251 def _raw_close_sql (self , sql_id : int ) -> None :
321252 assert self ._conn is not None
322253 return self ._conn .close_sql (sql_id )
323254
324- @run_in_executor
325255 def _raw_execute_script (self , sql_script : str ) -> asyncio .Future [None ]:
326256 assert self ._stream is not None
327257 return self ._stream .sequence (sql_script )
328258
329- @run_in_executor
330259 def _raw_batch (self , batch : Batch ) -> asyncio .Future [BatchResult ]:
331260 assert self ._stream is not None
332261 return self ._stream .batch (batch )
333262
334263 def _raw_close (self ) -> None :
264+ asyncio .create_task (self ._cleanup ())
265+
266+ async def _cleanup (self ) -> None :
335267 # use of getattr as the object may fail init
336268 stream = getattr (self , "_stream" , None )
337269 if stream is not None :
@@ -340,18 +272,14 @@ def _raw_close(self) -> None:
340272
341273 conn = getattr (self , "_conn" , None )
342274 if conn is not None :
343- self ._dispose_connection (conn )
275+ await self ._dispose_connection (conn )
344276 self ._conn = None
345277
346278 session = getattr (self , "_session" , None )
347279 if session is not None :
348- self ._dispose_session (session )
280+ await self ._dispose_session (session )
349281 self ._session = None
350282
351- executor = getattr (self , "_executor" , None )
352- if executor is not None :
353- self ._dispose_executor (executor )
354- self ._executor = None
355283
356284
357285class CursorHrana (Cursor ):
0 commit comments