Skip to content

Commit 0450687

Browse files
committed
Fix FsspecFileIO.get_fs thread safety
`FsspecFileIO.get_fs` can be called by multiple threads when `ExecutorFactory` is used (for example by `DataScan.plan_files`). The base class of `fsspec` filesystem objects, `fsspec.spec.AbstractFileSystem`, internally caches instances through the `fsspec.spec._Cached` metaclass. The caching key used includes `threading.get_ident()`, making entries thread-local: https://github.com/fsspec/filesystem_spec/blame/f84b99f0d1f079f990db1a219b74df66ab3e7160/fsspec/spec.py#L71 The `FsspecFileIO.get_fs` LRU cache (around `FsspecFileIO._get_fs`) breaks the thread-locality of the filesystem instances as it will return the same instance for different threads. One consequence of this is that for `s3fs.S3FileSystem`, HTTP connection pooling no longer occurs per thread (as is normal with `aiobotocore`), as the `aiobotocore` client object (containing the `aiohttp.ClientSession`) is stored on the `s3fs.S3FileSystem`. This change addresses this by making the `FsspecFileIO.get_fs` cache thread-local.
1 parent 83789f0 commit 0450687

2 files changed

Lines changed: 42 additions & 3 deletions

File tree

pyiceberg/io/fsspec.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import json
2121
import logging
2222
import os
23+
import threading
2324
from copy import copy
2425
from functools import lru_cache, partial
2526
from typing import (
@@ -370,7 +371,7 @@ class FsspecFileIO(FileIO):
370371
def __init__(self, properties: Properties):
371372
self._scheme_to_fs = {}
372373
self._scheme_to_fs.update(SCHEME_TO_FS)
373-
self.get_fs: Callable[[str], AbstractFileSystem] = lru_cache(self._get_fs)
374+
self._thread_locals = threading.local()
374375
super().__init__(properties=properties)
375376

376377
def new_input(self, location: str) -> FsspecInputFile:
@@ -416,6 +417,13 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
416417
fs = self.get_fs(uri.scheme)
417418
fs.rm(str_location)
418419

420+
def get_fs(self, scheme: str) -> AbstractFileSystem:
421+
"""Get a filesystem for a specific scheme, cached per thread."""
422+
if not hasattr(self._thread_locals, "get_fs_cached"):
423+
self._thread_locals.get_fs_cached = lru_cache(self._get_fs)
424+
425+
return self._thread_locals.get_fs_cached(scheme)
426+
419427
def _get_fs(self, scheme: str) -> AbstractFileSystem:
420428
"""Get a filesystem for a specific scheme."""
421429
if scheme not in self._scheme_to_fs:
@@ -425,10 +433,10 @@ def _get_fs(self, scheme: str) -> AbstractFileSystem:
425433
def __getstate__(self) -> Dict[str, Any]:
426434
"""Create a dictionary of the FsSpecFileIO fields used when pickling."""
427435
fileio_copy = copy(self.__dict__)
428-
fileio_copy["get_fs"] = None
436+
del fileio_copy["_thread_locals"]
429437
return fileio_copy
430438

431439
def __setstate__(self, state: Dict[str, Any]) -> None:
432440
"""Deserialize the state into a FsSpecFileIO instance."""
433441
self.__dict__ = state
434-
self.get_fs = lru_cache(self._get_fs)
442+
self._thread_locals = threading.local()

tests/io/test_fsspec.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import os
1919
import pickle
2020
import tempfile
21+
import threading
2122
import uuid
2223
from unittest import mock
2324

@@ -54,6 +55,36 @@ def test_fsspec_local_fs_can_create_path_without_parent_dir(fsspec_fileio: Fsspe
5455
pytest.fail("Failed to write to file without parent directory")
5556

5657

58+
def test_fsspec_get_fs_instance_per_thread_caching(fsspec_fileio: FsspecFileIO) -> None:
59+
"""Test that filesystem instances are cached per-thread by `FsspecFileIO.get_fs`"""
60+
fs_instances = []
61+
62+
def get_fs_instance() -> None:
63+
# Call twice to ensure caching within the same thread
64+
for _ in range(2):
65+
fs_instances.append(fsspec_fileio.get_fs("file"))
66+
67+
threads = [
68+
# Differing names required on the threads to ensure distinct `threading.get_ident()` values
69+
# as this is used as part of the `fsspec.spec.AbstractFileSystem` cache key.
70+
threading.Thread(target=get_fs_instance, name="test-thread-1"),
71+
threading.Thread(target=get_fs_instance, name="test-thread-2"),
72+
]
73+
74+
threads[0].start()
75+
threads[0].join()
76+
77+
threads[1].start()
78+
threads[1].join()
79+
80+
# Same thread, same instance
81+
assert fs_instances[0] is fs_instances[1]
82+
assert fs_instances[2] is fs_instances[3]
83+
84+
# Different threads, different instances
85+
assert fs_instances[0] is not fs_instances[2]
86+
87+
5788
@pytest.mark.s3
5889
def test_fsspec_new_input_file(fsspec_fileio: FsspecFileIO) -> None:
5990
"""Test creating a new input file from a fsspec file-io"""

0 commit comments

Comments
 (0)