diff --git a/fsspec/asyn.py b/fsspec/asyn.py index 360758ac6..957af531b 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -263,9 +263,22 @@ async def _run_coro(coro, i): break done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + first_exc = None while done: - result, k = await done.pop() - results[k] = result + task = done.pop() + try: + result, k = await task + results[k] = result + except Exception as exc: + if first_exc is None: + first_exc = exc + + if first_exc is not None: + for task in pending: + task.cancel() + if pending: + await asyncio.gather(*pending, return_exceptions=True) + raise first_exc return results @@ -795,11 +808,18 @@ async def _glob(self, path, maxdepth=None, **kwargs): else: return {} elif "/" in path[:min_idx]: + first_wildcard_idx = min_idx min_idx = path[:min_idx].rindex("/") - root = path[: min_idx + 1] + root = path[ + : min_idx + 1 + ] # everything up to the last / before the first wildcard + prefix = path[ + min_idx + 1 : first_wildcard_idx + ] # stem between last "/" and first wildcard depth = path[min_idx + 1 :].count("/") + 1 else: root = "" + prefix = path[:min_idx] # stem up to the first wildcard depth = path[min_idx + 1 :].count("/") + 1 if "**" in path: @@ -810,6 +830,10 @@ async def _glob(self, path, maxdepth=None, **kwargs): else: depth = None + # Pass the filename stem as prefix= so backends that support it such as + # gcsfs, s3fs and adlfs can filter server-side up to the first wildcard. + if prefix: + kwargs["prefix"] = prefix allpaths = await self._find( root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs ) diff --git a/fsspec/tests/test_async.py b/fsspec/tests/test_async.py index 53dd06562..561a03e4a 100644 --- a/fsspec/tests/test_async.py +++ b/fsspec/tests/test_async.py @@ -244,3 +244,125 @@ def test_rm_file_without_implementation(): fs = fsspec.asyn.AsyncFileSystem() with pytest.raises(NotImplementedError): fs.rm_file("test/file.txt") + + +# --------------------------------------------------------------------------- +# Tests for the prefix= hint that _glob passes to _find +# --------------------------------------------------------------------------- + +_GLOB_PREFIX_FILES = [ + "data/2024/results.csv", + "data/2024/report.txt", + "data/2023/results.csv", + "top_results.csv", + "top_other.txt", + "other/results.csv", +] + + +class _PrefixCapturingFS(fsspec.asyn.AsyncFileSystem): + """Minimal AsyncFileSystem that records every _find call's kwargs. + + _find ignores the prefix hint and returns all files under *root* so that + the client-side glob pattern-matching in _glob still works correctly. + This simulates a "naive" backend that silently absorbs unknown kwargs. + """ + + protocol = "prefixmock" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.find_calls = [] + + async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs): + self.find_calls.append({"path": path, "kwargs": dict(kwargs)}) + root = (path.rstrip("/") + "/") if path else "" + results = { + f: {"name": f, "type": "file", "size": 0} + for f in _GLOB_PREFIX_FILES + if f.startswith(root) + } + return results if detail else list(results) + + async def _info(self, path, **kwargs): + for f in _GLOB_PREFIX_FILES: + if f == path: + return {"name": f, "type": "file", "size": 0} + raise FileNotFoundError(path) + + +@pytest.fixture +def prefix_fs(): + return _PrefixCapturingFS(skip_instance_cache=True) + + +@pytest.mark.parametrize( + "pattern, expected_results, expected_prefix", + [ + # root/stem* -> prefix="stem" + ( + "data/2024/res*", + ["data/2024/results.csv"], + "res", + ), + # root/stem*.ext -> prefix="stem" + ( + "data/2024/re*.txt", + ["data/2024/report.txt"], + "re", + ), + # stem* (no slash) -> prefix="stem" + ( + "top_*", + ["top_other.txt", "top_results.csv"], + "top_", + ), + # ? wildcard: prefix is everything before the first ? + ( + "top_r?sults.csv", + ["top_results.csv"], + "top_r", + ), + # [ wildcard: prefix is everything before the first [ + # re[rp]* translates to re[rp][^/]* so only report.txt matches (not results.csv) + ( + "data/2024/re[rp]*", + ["data/2024/report.txt"], + "re", + ), + # root/* (wildcard immediately after /) -> empty prefix, NOT forwarded + ( + "data/2024/*", + ["data/2024/report.txt", "data/2024/results.csv"], + None, + ), + # bare * (no prefix at all) -> NOT forwarded + # * translates to [^/]+ so paths containing / are excluded + ( + "*", + ["top_other.txt", "top_results.csv"], + None, + ), + ], +) +def test_glob_prefix_hint(prefix_fs, pattern, expected_results, expected_prefix): + """_glob should extract the literal stem before the first wildcard and + forward it as ``prefix=`` to ``_find``. When the stem is empty the kwarg + must not be forwarded at all so that backends that reject unknown kwargs + are not broken. The glob results must be correct regardless.""" + results = prefix_fs.glob(pattern) + assert sorted(results) == sorted(expected_results) + + assert len(prefix_fs.find_calls) == 1 + forwarded = prefix_fs.find_calls[0]["kwargs"] + + if expected_prefix is None: + assert "prefix" not in forwarded, ( + f"prefix= should not be forwarded for pattern {pattern!r}, " + f"but got prefix={forwarded.get('prefix')!r}" + ) + else: + assert forwarded.get("prefix") == expected_prefix, ( + f"expected prefix={expected_prefix!r} for pattern {pattern!r}, " + f"got {forwarded.get('prefix')!r}" + )