Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,22 @@ async def _run_coro(coro, i):
break

done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
first_exc = None
while done:
result, k = await done.pop()
results[k] = result
task = done.pop()
try:
result, k = await task
results[k] = result
except Exception as exc:
if first_exc is None:
first_exc = exc

if first_exc is not None:
for task in pending:
task.cancel()
if pending:
await asyncio.gather(*pending, return_exceptions=True)
raise first_exc

return results

Expand Down Expand Up @@ -795,11 +808,18 @@ async def _glob(self, path, maxdepth=None, **kwargs):
else:
return {}
elif "/" in path[:min_idx]:
first_wildcard_idx = min_idx
min_idx = path[:min_idx].rindex("/")
root = path[: min_idx + 1]
root = path[
: min_idx + 1
] # everything up to the last / before the first wildcard
prefix = path[
min_idx + 1 : first_wildcard_idx
] # stem between last "/" and first wildcard
depth = path[min_idx + 1 :].count("/") + 1
else:
root = ""
prefix = path[:min_idx] # stem up to the first wildcard
depth = path[min_idx + 1 :].count("/") + 1

if "**" in path:
Expand All @@ -810,6 +830,10 @@ async def _glob(self, path, maxdepth=None, **kwargs):
else:
depth = None

# Pass the filename stem as prefix= so backends that support it such as
# gcsfs, s3fs and adlfs can filter server-side up to the first wildcard.
if prefix:
kwargs["prefix"] = prefix
allpaths = await self._find(
root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs
)
Expand Down
122 changes: 122 additions & 0 deletions fsspec/tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,125 @@ def test_rm_file_without_implementation():
fs = fsspec.asyn.AsyncFileSystem()
with pytest.raises(NotImplementedError):
fs.rm_file("test/file.txt")


# ---------------------------------------------------------------------------
# Tests for the prefix= hint that _glob passes to _find
# ---------------------------------------------------------------------------

_GLOB_PREFIX_FILES = [
"data/2024/results.csv",
"data/2024/report.txt",
"data/2023/results.csv",
"top_results.csv",
"top_other.txt",
"other/results.csv",
]


class _PrefixCapturingFS(fsspec.asyn.AsyncFileSystem):
"""Minimal AsyncFileSystem that records every _find call's kwargs.

_find ignores the prefix hint and returns all files under *root* so that
the client-side glob pattern-matching in _glob still works correctly.
This simulates a "naive" backend that silently absorbs unknown kwargs.
"""

protocol = "prefixmock"

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.find_calls = []

async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
self.find_calls.append({"path": path, "kwargs": dict(kwargs)})
root = (path.rstrip("/") + "/") if path else ""
results = {
f: {"name": f, "type": "file", "size": 0}
for f in _GLOB_PREFIX_FILES
if f.startswith(root)
}
return results if detail else list(results)

async def _info(self, path, **kwargs):
for f in _GLOB_PREFIX_FILES:
if f == path:
return {"name": f, "type": "file", "size": 0}
raise FileNotFoundError(path)


@pytest.fixture
def prefix_fs():
return _PrefixCapturingFS(skip_instance_cache=True)


@pytest.mark.parametrize(
"pattern, expected_results, expected_prefix",
[
# root/stem* -> prefix="stem"
(
"data/2024/res*",
["data/2024/results.csv"],
"res",
),
# root/stem*.ext -> prefix="stem"
(
"data/2024/re*.txt",
["data/2024/report.txt"],
"re",
),
# stem* (no slash) -> prefix="stem"
(
"top_*",
["top_other.txt", "top_results.csv"],
"top_",
),
# ? wildcard: prefix is everything before the first ?
(
"top_r?sults.csv",
["top_results.csv"],
"top_r",
),
# [ wildcard: prefix is everything before the first [
# re[rp]* translates to re[rp][^/]* so only report.txt matches (not results.csv)
(
"data/2024/re[rp]*",
["data/2024/report.txt"],
"re",
),
# root/* (wildcard immediately after /) -> empty prefix, NOT forwarded
(
"data/2024/*",
["data/2024/report.txt", "data/2024/results.csv"],
None,
),
# bare * (no prefix at all) -> NOT forwarded
# * translates to [^/]+ so paths containing / are excluded
(
"*",
["top_other.txt", "top_results.csv"],
None,
),
],
)
def test_glob_prefix_hint(prefix_fs, pattern, expected_results, expected_prefix):
"""_glob should extract the literal stem before the first wildcard and
forward it as ``prefix=`` to ``_find``. When the stem is empty the kwarg
must not be forwarded at all so that backends that reject unknown kwargs
are not broken. The glob results must be correct regardless."""
results = prefix_fs.glob(pattern)
assert sorted(results) == sorted(expected_results)

assert len(prefix_fs.find_calls) == 1
forwarded = prefix_fs.find_calls[0]["kwargs"]

if expected_prefix is None:
assert "prefix" not in forwarded, (
f"prefix= should not be forwarded for pattern {pattern!r}, "
f"but got prefix={forwarded.get('prefix')!r}"
)
else:
assert forwarded.get("prefix") == expected_prefix, (
f"expected prefix={expected_prefix!r} for pattern {pattern!r}, "
f"got {forwarded.get('prefix')!r}"
)
Loading