From 6d21e82591aeee22239d10ddcd61d9b08741e340 Mon Sep 17 00:00:00 2001 From: yuxin00j Date: Tue, 17 Mar 2026 02:53:39 +0000 Subject: [PATCH 1/7] Parallelize bucket get and listing in _info for bucket paths --- gcsfs/core.py | 29 +++++++++++++------ gcsfs/tests/test_core.py | 60 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 9 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index 916437e4..d61da32d 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -1048,17 +1048,28 @@ async def _info(self, path, generation=None, **kwargs): """File information about this path.""" path = self._strip_protocol(path).rstrip("/") if "/" not in path: - try: - out = await self._call("GET", f"b/{path}", json_out=True) + # Parallelize the call and ls + res_call, res_ls = await asyncio.gather( + self._call("GET", f"b/{path}", json_out=True), + self._ls(path, max_results=1), + return_exceptions=True, + ) + + if not isinstance(res_call, Exception): + out = res_call out.update(size=0, type="directory") - except OSError: + return out + + if isinstance(res_call, OSError): # GET bucket failed, try ls; will have no metadata - exists = await self._ls(path) - if exists: - out = {"name": path, "size": 0, "type": "directory"} - else: - raise FileNotFoundError(path) - return out + if isinstance(res_ls, Exception): + raise res_ls + if res_ls: + return {"name": path, "size": 0, "type": "directory"} + raise FileNotFoundError(path) + + raise res_call + # Check directory cache for parent dir parent_path = self._parent(path) parent_cache = self._ls_from_cache(parent_path) diff --git a/gcsfs/tests/test_core.py b/gcsfs/tests/test_core.py index 26f8c054..5e26a2cf 100644 --- a/gcsfs/tests/test_core.py +++ b/gcsfs/tests/test_core.py @@ -2002,3 +2002,63 @@ def test_mv_file_raises_error_for_specific_generation(gcs): gcs.mv_file(src, dest) finally: gcs.version_aware = original_version_aware + + +@pytest.mark.asyncio +async def test_info_bucket_optimization(gcs): + bucket = "test-bucket" + + # Mock _call to fail with OSError on GET b/test-bucket + # and mock _ls to return a list + with mock.patch.object(gcs, "_call", new_callable=mock.AsyncMock) as mock_call: + mock_call.side_effect = OSError("Failed to GET bucket") + with mock.patch.object(gcs, "_ls", new_callable=mock.AsyncMock) as mock_ls: + mock_ls.return_value = ["test-bucket/"] + + # Use await gcs._info as it is an async method + info = await gcs._info(bucket) + + # Verify _call was called for the bucket GET + mock_call.assert_called_with("GET", f"b/{bucket}", json_out=True) + + # Verify _ls was called with max_results=1 + mock_ls.assert_awaited_once_with(bucket, max_results=1) + + assert info == {"name": bucket, "size": 0, "type": "directory"} + + +@pytest.mark.asyncio +async def test_info_bucket_not_found_optimization(gcs): + bucket = "non-existent-bucket" + + with mock.patch.object(gcs, "_call", new_callable=mock.AsyncMock) as mock_call: + mock_call.side_effect = OSError("Failed to GET bucket") + with mock.patch.object(gcs, "_ls", new_callable=mock.AsyncMock) as mock_ls: + mock_ls.return_value = [] + + with pytest.raises(FileNotFoundError): + await gcs._info(bucket) + + mock_ls.assert_awaited_once_with(bucket, max_results=1) + + +@pytest.mark.asyncio +async def test_info_bucket_success_parallel(gcs): + bucket = "test-bucket" + + with mock.patch.object(gcs, "_call", new_callable=mock.AsyncMock) as mock_call: + mock_call.return_value = {"name": bucket, "kind": "storage#bucket"} + with mock.patch.object(gcs, "_ls", new_callable=mock.AsyncMock) as mock_ls: + mock_ls.return_value = ["test-bucket/"] + + info = await gcs._info(bucket) + + mock_call.assert_called_with("GET", f"b/{bucket}", json_out=True) + mock_ls.assert_awaited_once_with(bucket, max_results=1) + + assert info == { + "name": bucket, + "kind": "storage#bucket", + "size": 0, + "type": "directory", + } From 6b4a8d38dcb71c00536417c782ae4f546b9a0b82 Mon Sep 17 00:00:00 2001 From: yuxin00j Date: Thu, 19 Mar 2026 07:57:10 +0000 Subject: [PATCH 2/7] Implement service account impersonation for info microbenchmarks to test restricted permissions. --- gcsfs/tests/perf/microbenchmarks/README.md | 1 + gcsfs/tests/perf/microbenchmarks/conftest.py | 30 ++++++++++++++++--- .../perf/microbenchmarks/info/configs.py | 8 +++++ .../perf/microbenchmarks/info/configs.yaml | 9 ++++++ .../perf/microbenchmarks/info/parameters.py | 3 ++ 5 files changed, 47 insertions(+), 4 deletions(-) diff --git a/gcsfs/tests/perf/microbenchmarks/README.md b/gcsfs/tests/perf/microbenchmarks/README.md index 4d35fab9..ecc200a0 100644 --- a/gcsfs/tests/perf/microbenchmarks/README.md +++ b/gcsfs/tests/perf/microbenchmarks/README.md @@ -47,6 +47,7 @@ The benchmarks use a set of parameter classes to define the configuration for ea * **Info Parameters**: Specific to Info operations (extends Listing Parameters). * `target_type`: The type of target to query: "bucket", "folder", or "file". + * `impersonate_sa`: (Optional) Service account email to impersonate for the benchmark execution. Used to test scenarios with restricted IAM permissions. ## Configuration diff --git a/gcsfs/tests/perf/microbenchmarks/conftest.py b/gcsfs/tests/perf/microbenchmarks/conftest.py index 5531299b..e1879697 100644 --- a/gcsfs/tests/perf/microbenchmarks/conftest.py +++ b/gcsfs/tests/perf/microbenchmarks/conftest.py @@ -148,7 +148,29 @@ def _benchmark_listing_fixture_helper( create_folders=False, require_file_paths=False, ): - gcs = extended_gcs_factory() + gcs_admin = extended_gcs_factory() + gcs = gcs_admin + + impersonate_sa = getattr(params, "impersonate_sa", None) + if impersonate_sa: + logging.info(f"Impersonating service account {impersonate_sa} for benchmark.") + try: + import subprocess + + token_cmd = [ + "gcloud", + "auth", + "print-access-token", + f"--impersonate-service-account={impersonate_sa}", + ] + result = subprocess.run( + token_cmd, capture_output=True, text=True, check=True + ) + token = result.stdout.strip() + gcs = extended_gcs_factory(token=token) + except Exception as e: + logging.error(f"Failed to impersonate service account: {e}") + raise prefix = f"{params.bucket_name}/{prefix_tag}-{uuid.uuid4()}" @@ -198,7 +220,7 @@ def _benchmark_listing_fixture_helper( f"folders at depth {depth} with prefix '{prefix}'." ) start_time = time.perf_counter() - _prepare_folders(gcs, target_dirs) + _prepare_folders(gcs_admin, target_dirs) duration_ms = (time.perf_counter() - start_time) * 1000 logging.info( f"Benchmark '{params.name}' setup created {len(target_dirs)} folders in {duration_ms:.2f} ms." @@ -218,7 +240,7 @@ def _benchmark_listing_fixture_helper( ) start_time = time.perf_counter() - _prepare_files(gcs, file_paths, getattr(params, "file_size_bytes", 0)) + _prepare_files(gcs_admin, file_paths, getattr(params, "file_size_bytes", 0)) duration_ms = (time.perf_counter() - start_time) * 1000 logging.info( @@ -236,7 +258,7 @@ def _benchmark_listing_fixture_helper( f"Tearing down benchmark '{params.name}': deleting files and folders." ) try: - gcs.rm(f"{prefix}*", recursive=True) + gcs_admin.rm(f"{prefix}*", recursive=True) except Exception as e: logging.error(f"Failed to clean up benchmark files: {e}") diff --git a/gcsfs/tests/perf/microbenchmarks/info/configs.py b/gcsfs/tests/perf/microbenchmarks/info/configs.py index 20ae9ff7..1a4c5a9c 100644 --- a/gcsfs/tests/perf/microbenchmarks/info/configs.py +++ b/gcsfs/tests/perf/microbenchmarks/info/configs.py @@ -5,6 +5,14 @@ class InfoConfigurator(ListingConfigurator): param_class = InfoBenchmarkParameters + def build_cases(self, scenario, common_config): + cases = super().build_cases(scenario, common_config) + impersonate_sa = scenario.get("impersonate_sa") + if impersonate_sa: + for case in cases: + case.impersonate_sa = impersonate_sa + return cases + def _get_folders_list(self, scenario, common_config): return common_config.get("folders", [1]) diff --git a/gcsfs/tests/perf/microbenchmarks/info/configs.yaml b/gcsfs/tests/perf/microbenchmarks/info/configs.yaml index ff021956..5ae3cef2 100644 --- a/gcsfs/tests/perf/microbenchmarks/info/configs.yaml +++ b/gcsfs/tests/perf/microbenchmarks/info/configs.yaml @@ -26,3 +26,12 @@ scenarios: pattern: "info" depth: 10 processes: [4, 8] + + # Tests the fallback logic in _info() when GET bucket fails (e.g., restricted permissions). + # Setup creates 100 files/folders using Admin credentials, then benchmark runs with SA token. + # Requires the impersonated SA to have at least 'roles/storage.objectViewer' on the bucket. + - name: "info_restricted_bucket" + pattern: "info" + depth: 1 + target_types: ["bucket"] + impersonate_sa: "parallel-info-tester@gcs-aiml-clients-testing-101.iam.gserviceaccount.com" diff --git a/gcsfs/tests/perf/microbenchmarks/info/parameters.py b/gcsfs/tests/perf/microbenchmarks/info/parameters.py index 7eec9162..351e8734 100644 --- a/gcsfs/tests/perf/microbenchmarks/info/parameters.py +++ b/gcsfs/tests/perf/microbenchmarks/info/parameters.py @@ -13,3 +13,6 @@ class InfoBenchmarkParameters(ListingBenchmarkParameters): # The type of target to query: "bucket", "folder", or "file". target_type: str + + # The service account to impersonate for the benchmark. + impersonate_sa: str = None From 911fa099e2d898ce2e0633b34bf8909be31fc439 Mon Sep 17 00:00:00 2001 From: yuxin00j Date: Fri, 20 Mar 2026 03:35:07 +0000 Subject: [PATCH 3/7] Add missing test coverage for _info bucket optimization --- gcsfs/tests/test_core.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/gcsfs/tests/test_core.py b/gcsfs/tests/test_core.py index 5e26a2cf..0802c6d9 100644 --- a/gcsfs/tests/test_core.py +++ b/gcsfs/tests/test_core.py @@ -2062,3 +2062,35 @@ async def test_info_bucket_success_parallel(gcs): "size": 0, "type": "directory", } + + +@pytest.mark.asyncio +async def test_info_bucket_ls_exception(gcs): + bucket = "test-bucket" + + with mock.patch.object(gcs, "_call", new_callable=mock.AsyncMock) as mock_call: + mock_call.side_effect = OSError("Failed to GET bucket") + with mock.patch.object(gcs, "_ls", new_callable=mock.AsyncMock) as mock_ls: + mock_ls.side_effect = ValueError("LS error") + + with pytest.raises(ValueError, match="LS error"): + await gcs._info(bucket) + + mock_call.assert_called_with("GET", f"b/{bucket}", json_out=True) + mock_ls.assert_awaited_once_with(bucket, max_results=1) + + +@pytest.mark.asyncio +async def test_info_bucket_other_exception(gcs): + bucket = "test-bucket" + + with mock.patch.object(gcs, "_call", new_callable=mock.AsyncMock) as mock_call: + mock_call.side_effect = ValueError("Some other error") + with mock.patch.object(gcs, "_ls", new_callable=mock.AsyncMock) as mock_ls: + mock_ls.return_value = ["test-bucket/"] + + with pytest.raises(ValueError, match="Some other error"): + await gcs._info(bucket) + + mock_call.assert_called_with("GET", f"b/{bucket}", json_out=True) + mock_ls.assert_awaited_once_with(bucket, max_results=1) From ffb88f333e1d8cffb0eae66a485db21f2a7320f4 Mon Sep 17 00:00:00 2001 From: yuxin00j Date: Thu, 26 Mar 2026 03:45:34 +0000 Subject: [PATCH 4/7] Refactor _info for buckets to use independent parallel tasks and early return (PR #780) --- gcsfs/core.py | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index d61da32d..8f5c9441 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -1048,27 +1048,24 @@ async def _info(self, path, generation=None, **kwargs): """File information about this path.""" path = self._strip_protocol(path).rstrip("/") if "/" not in path: - # Parallelize the call and ls - res_call, res_ls = await asyncio.gather( - self._call("GET", f"b/{path}", json_out=True), - self._ls(path, max_results=1), - return_exceptions=True, + get_task = asyncio.create_task( + self._call("GET", f"b/{path}", json_out=True) ) - - if not isinstance(res_call, Exception): - out = res_call - out.update(size=0, type="directory") - return out - - if isinstance(res_call, OSError): - # GET bucket failed, try ls; will have no metadata - if isinstance(res_ls, Exception): - raise res_ls - if res_ls: - return {"name": path, "size": 0, "type": "directory"} - raise FileNotFoundError(path) - - raise res_call + ls_task = asyncio.create_task(self._ls(path, max_results=1)) + try: + try: + out = await get_task + out.update(size=0, type="directory") + return out + except OSError: + if await ls_task: + return {"name": path, "size": 0, "type": "directory"} + raise FileNotFoundError(path) + finally: + if not get_task.done(): + get_task.cancel() + if not ls_task.done(): + ls_task.cancel() # Check directory cache for parent dir parent_path = self._parent(path) From 972eb5ea89e865618272b80b5bbcb0ed859b5117 Mon Sep 17 00:00:00 2001 From: yuxin00j Date: Thu, 26 Mar 2026 06:49:23 +0000 Subject: [PATCH 5/7] Clean up microbenchmarks (remove impersonate_sa) and add integration tests for _info fallback --- gcsfs/tests/integration/test_async_gcsfs.py | 31 +++++++++++++++++++ gcsfs/tests/perf/microbenchmarks/README.md | 1 - gcsfs/tests/perf/microbenchmarks/conftest.py | 21 ------------- .../perf/microbenchmarks/info/configs.py | 4 --- .../perf/microbenchmarks/info/configs.yaml | 9 ------ .../perf/microbenchmarks/info/parameters.py | 3 -- gcsfs/tests/test_core.py | 21 +++++++++++++ 7 files changed, 52 insertions(+), 38 deletions(-) diff --git a/gcsfs/tests/integration/test_async_gcsfs.py b/gcsfs/tests/integration/test_async_gcsfs.py index 7b1345c9..500b6603 100644 --- a/gcsfs/tests/integration/test_async_gcsfs.py +++ b/gcsfs/tests/integration/test_async_gcsfs.py @@ -230,6 +230,37 @@ async def test_async_info(async_gcs, hns_file_path): assert info["type"] == "file" +@pytest.mark.asyncio +async def test_async_info_fallback(async_gcs, hns_file_path): + """Test that _info falls back to _ls when _call (GET) fails.""" + # Create a dummy file to ensure listing works if we use a path under it + # We want to test bucket level _info fallback, so we test on the bucket itself. + bucket, _, _ = async_gcs.split_path(hns_file_path) + + # We pipe a file to ensure the bucket is not empty. If the bucket is empty, + # _ls returns [], which the fallback logic evaluates as falsy, causing it + # to raise FileNotFoundError even if the bucket exists. + file_path = f"{hns_file_path}/fallback_file" + await async_gcs._pipe_file(file_path, b"data") + + original_call = async_gcs._call + + async def mock_call(*args, **kwargs): + if len(args) >= 2 and args[0] == "GET" and args[1] == f"b/{bucket}": + raise OSError("Simulated 403 Forbidden") + return await original_call(*args, **kwargs) + + async_gcs._call = mock_call + + try: + # Calling _info on the bucket root should fall back to _ls + info = await async_gcs._info(bucket) + assert info["name"] == bucket + assert info["type"] == "directory" + finally: + async_gcs._call = original_call + + @pytest.mark.asyncio async def test_async_rm_recursive(async_gcs, hns_file_path): """Test async _rm recursive.""" diff --git a/gcsfs/tests/perf/microbenchmarks/README.md b/gcsfs/tests/perf/microbenchmarks/README.md index ecc200a0..4d35fab9 100644 --- a/gcsfs/tests/perf/microbenchmarks/README.md +++ b/gcsfs/tests/perf/microbenchmarks/README.md @@ -47,7 +47,6 @@ The benchmarks use a set of parameter classes to define the configuration for ea * **Info Parameters**: Specific to Info operations (extends Listing Parameters). * `target_type`: The type of target to query: "bucket", "folder", or "file". - * `impersonate_sa`: (Optional) Service account email to impersonate for the benchmark execution. Used to test scenarios with restricted IAM permissions. ## Configuration diff --git a/gcsfs/tests/perf/microbenchmarks/conftest.py b/gcsfs/tests/perf/microbenchmarks/conftest.py index e1879697..4795bd6c 100644 --- a/gcsfs/tests/perf/microbenchmarks/conftest.py +++ b/gcsfs/tests/perf/microbenchmarks/conftest.py @@ -151,27 +151,6 @@ def _benchmark_listing_fixture_helper( gcs_admin = extended_gcs_factory() gcs = gcs_admin - impersonate_sa = getattr(params, "impersonate_sa", None) - if impersonate_sa: - logging.info(f"Impersonating service account {impersonate_sa} for benchmark.") - try: - import subprocess - - token_cmd = [ - "gcloud", - "auth", - "print-access-token", - f"--impersonate-service-account={impersonate_sa}", - ] - result = subprocess.run( - token_cmd, capture_output=True, text=True, check=True - ) - token = result.stdout.strip() - gcs = extended_gcs_factory(token=token) - except Exception as e: - logging.error(f"Failed to impersonate service account: {e}") - raise - prefix = f"{params.bucket_name}/{prefix_tag}-{uuid.uuid4()}" # Deterministic folder structure generation diff --git a/gcsfs/tests/perf/microbenchmarks/info/configs.py b/gcsfs/tests/perf/microbenchmarks/info/configs.py index 1a4c5a9c..f3097782 100644 --- a/gcsfs/tests/perf/microbenchmarks/info/configs.py +++ b/gcsfs/tests/perf/microbenchmarks/info/configs.py @@ -7,10 +7,6 @@ class InfoConfigurator(ListingConfigurator): def build_cases(self, scenario, common_config): cases = super().build_cases(scenario, common_config) - impersonate_sa = scenario.get("impersonate_sa") - if impersonate_sa: - for case in cases: - case.impersonate_sa = impersonate_sa return cases def _get_folders_list(self, scenario, common_config): diff --git a/gcsfs/tests/perf/microbenchmarks/info/configs.yaml b/gcsfs/tests/perf/microbenchmarks/info/configs.yaml index 5ae3cef2..ff021956 100644 --- a/gcsfs/tests/perf/microbenchmarks/info/configs.yaml +++ b/gcsfs/tests/perf/microbenchmarks/info/configs.yaml @@ -26,12 +26,3 @@ scenarios: pattern: "info" depth: 10 processes: [4, 8] - - # Tests the fallback logic in _info() when GET bucket fails (e.g., restricted permissions). - # Setup creates 100 files/folders using Admin credentials, then benchmark runs with SA token. - # Requires the impersonated SA to have at least 'roles/storage.objectViewer' on the bucket. - - name: "info_restricted_bucket" - pattern: "info" - depth: 1 - target_types: ["bucket"] - impersonate_sa: "parallel-info-tester@gcs-aiml-clients-testing-101.iam.gserviceaccount.com" diff --git a/gcsfs/tests/perf/microbenchmarks/info/parameters.py b/gcsfs/tests/perf/microbenchmarks/info/parameters.py index 351e8734..7eec9162 100644 --- a/gcsfs/tests/perf/microbenchmarks/info/parameters.py +++ b/gcsfs/tests/perf/microbenchmarks/info/parameters.py @@ -13,6 +13,3 @@ class InfoBenchmarkParameters(ListingBenchmarkParameters): # The type of target to query: "bucket", "folder", or "file". target_type: str - - # The service account to impersonate for the benchmark. - impersonate_sa: str = None diff --git a/gcsfs/tests/test_core.py b/gcsfs/tests/test_core.py index 0802c6d9..de6449e3 100644 --- a/gcsfs/tests/test_core.py +++ b/gcsfs/tests/test_core.py @@ -2094,3 +2094,24 @@ async def test_info_bucket_other_exception(gcs): mock_call.assert_called_with("GET", f"b/{bucket}", json_out=True) mock_ls.assert_awaited_once_with(bucket, max_results=1) + + +@pytest.mark.asyncio +async def test_info_bucket_fallback_success(gcs): + bucket = "test-bucket" + + with mock.patch.object(gcs, "_call", new_callable=mock.AsyncMock) as mock_call: + mock_call.side_effect = OSError("Access denied") + with mock.patch.object(gcs, "_ls", new_callable=mock.AsyncMock) as mock_ls: + mock_ls.return_value = ["test-bucket/some-file"] + + info = await gcs._info(bucket) + + mock_call.assert_called_with("GET", f"b/{bucket}", json_out=True) + mock_ls.assert_awaited_once_with(bucket, max_results=1) + + assert info == { + "name": bucket, + "size": 0, + "type": "directory", + } From 33b2f369bb9d4d6c9367a59a2c8a4882d15c32f7 Mon Sep 17 00:00:00 2001 From: yuxin00j Date: Thu, 26 Mar 2026 09:15:21 +0000 Subject: [PATCH 6/7] Revert gcs_admin back to gcs in microbenchmarks conftest.py --- gcsfs/tests/perf/microbenchmarks/conftest.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/gcsfs/tests/perf/microbenchmarks/conftest.py b/gcsfs/tests/perf/microbenchmarks/conftest.py index 4795bd6c..5531299b 100644 --- a/gcsfs/tests/perf/microbenchmarks/conftest.py +++ b/gcsfs/tests/perf/microbenchmarks/conftest.py @@ -148,8 +148,7 @@ def _benchmark_listing_fixture_helper( create_folders=False, require_file_paths=False, ): - gcs_admin = extended_gcs_factory() - gcs = gcs_admin + gcs = extended_gcs_factory() prefix = f"{params.bucket_name}/{prefix_tag}-{uuid.uuid4()}" @@ -199,7 +198,7 @@ def _benchmark_listing_fixture_helper( f"folders at depth {depth} with prefix '{prefix}'." ) start_time = time.perf_counter() - _prepare_folders(gcs_admin, target_dirs) + _prepare_folders(gcs, target_dirs) duration_ms = (time.perf_counter() - start_time) * 1000 logging.info( f"Benchmark '{params.name}' setup created {len(target_dirs)} folders in {duration_ms:.2f} ms." @@ -219,7 +218,7 @@ def _benchmark_listing_fixture_helper( ) start_time = time.perf_counter() - _prepare_files(gcs_admin, file_paths, getattr(params, "file_size_bytes", 0)) + _prepare_files(gcs, file_paths, getattr(params, "file_size_bytes", 0)) duration_ms = (time.perf_counter() - start_time) * 1000 logging.info( @@ -237,7 +236,7 @@ def _benchmark_listing_fixture_helper( f"Tearing down benchmark '{params.name}': deleting files and folders." ) try: - gcs_admin.rm(f"{prefix}*", recursive=True) + gcs.rm(f"{prefix}*", recursive=True) except Exception as e: logging.error(f"Failed to clean up benchmark files: {e}") From 0835b7faaa8f2dc2799eb1691e48d352f3a9538a Mon Sep 17 00:00:00 2001 From: yuxin00j Date: Thu, 2 Apr 2026 06:39:09 +0000 Subject: [PATCH 7/7] Optimize GCSFileSystem._info for bucket root using parallel_tasks_first_completed --- gcsfs/concurrency.py | 23 +++++++++++++++++++++++ gcsfs/core.py | 19 +++++++++---------- 2 files changed, 32 insertions(+), 10 deletions(-) create mode 100644 gcsfs/concurrency.py diff --git a/gcsfs/concurrency.py b/gcsfs/concurrency.py new file mode 100644 index 00000000..10b32c9c --- /dev/null +++ b/gcsfs/concurrency.py @@ -0,0 +1,23 @@ +import asyncio +from contextlib import asynccontextmanager + + +@asynccontextmanager +async def parallel_tasks_first_completed(coros): + """ + Starts coroutines in parallel and enters the context as soon as + at least one task has completed. Automatically cancels pending tasks + when exiting the context. + """ + tasks = [asyncio.create_task(c) for c in coros] + try: + # Suspend until the first task finishes for maximum responsiveness + done, pending = await asyncio.wait( + set(tasks), return_when=asyncio.FIRST_COMPLETED + ) + yield tasks, done, pending + finally: + # Ensure 'losing' tasks are cancelled immediately + for t in tasks: + if not t.done(): + t.cancel() diff --git a/gcsfs/core.py b/gcsfs/core.py index 8f5c9441..aa6d2301 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -27,6 +27,7 @@ from . import __version__ as version from .checkers import get_consistency_checker +from .concurrency import parallel_tasks_first_completed from .credentials import GoogleCredentials from .inventory_report import InventoryReport from .retry import errs, retry_request, validate_response @@ -1048,11 +1049,14 @@ async def _info(self, path, generation=None, **kwargs): """File information about this path.""" path = self._strip_protocol(path).rstrip("/") if "/" not in path: - get_task = asyncio.create_task( - self._call("GET", f"b/{path}", json_out=True) - ) - ls_task = asyncio.create_task(self._ls(path, max_results=1)) - try: + async with parallel_tasks_first_completed( + [ + self._call("GET", f"b/{path}", json_out=True), + self._ls(path, max_results=1), + ] + ) as (tasks, done, pending): + get_task, ls_task = tasks + try: out = await get_task out.update(size=0, type="directory") @@ -1061,11 +1065,6 @@ async def _info(self, path, generation=None, **kwargs): if await ls_task: return {"name": path, "size": 0, "type": "directory"} raise FileNotFoundError(path) - finally: - if not get_task.done(): - get_task.cancel() - if not ls_task.done(): - ls_task.cancel() # Check directory cache for parent dir parent_path = self._parent(path)