Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cloudbuild/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,19 @@ case "$TEST_SUITE" in
"--deselect=gcsfs/tests/test_core.py::test_rm_wildcards_non_recursive"
)

# The prefetcher engine is not integrated for zonal in this bucket.
# It will be integrated in a separate PR, after which this will be removed.
ZONAL_DESELECTS+=(
"--deselect=gcsfs/tests/test_core.py::test_cat_file_routing_and_thresholds"
"--deselect=gcsfs/tests/test_core.py::test_cat_file_concurrent_data_integrity"
"--deselect=gcsfs/tests/test_core.py::test_cat_file_concurrent_exception_cancellation"
"--deselect=gcsfs/tests/test_core.py::test_gcsfile_prefetch_disabled_fallback"
"--deselect=gcsfs/tests/test_core.py::test_gcsfile_prefetch_sequential_integrity"
"--deselect=gcsfs/tests/test_core.py::test_gcsfile_prefetch_random_seek_integrity"
"--deselect=gcsfs/tests/test_core.py::test_gcsfile_multithreaded_read_integrity"
"--deselect=gcsfs/tests/test_core.py::test_gcsfile_not_satisfiable_range"
)

pytest "${ARGS[@]}" "${ZONAL_DESELECTS[@]}" gcsfs/tests/test_core.py
;;
esac
Binary file added docs/source/_static/component.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/source/_static/flow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
191 changes: 191 additions & 0 deletions docs/source/prefetcher.rst

Large diffs are not rendered by default.

111 changes: 108 additions & 3 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .credentials import GoogleCredentials
from .inventory_report import InventoryReport
from .retry import errs, retry_request, validate_response
from .zb_hns_utils import DEFAULT_CONCURRENCY, MAX_PREFETCH_SIZE

logger = logging.getLogger("gcsfs")

Expand Down Expand Up @@ -299,6 +300,7 @@ class GCSFileSystem(asyn.AsyncFileSystem):
default_block_size = DEFAULT_BLOCK_SIZE
protocol = "gs", "gcs"
async_impl = True
MIN_CHUNK_SIZE_FOR_CONCURRENCY = 5 * 1024 * 1024

def __init__(
self,
Expand Down Expand Up @@ -1166,22 +1168,78 @@ def url(self, path):
f"&generation={generation}" if generation else "",
)

async def _cat_file(self, path, start=None, end=None, **kwargs):
async def _cat_file_sequential(self, path, start=None, end=None, **kwargs):
"""Simple one-shot get of file data"""
# if start and end are both provided and valid, but start >= end, return empty bytes
# Otherwise, _process_limits would generate an invalid HTTP range (e.g. "bytes=5-4"
# for start=5, end=5), causing the server to return the whole file instead of nothing.
if start is not None and end is not None and start >= end >= 0:
return b""

u2 = self.url(path)
# 'if start or end' fails when start=0 or end=0 because 0 is Falsey.
if start is not None or end is not None:
head = {"Range": await self._process_limits(path, start, end)}
else:
head = {}

headers, out = await self._call("GET", u2, headers=head)
return out

async def _cat_file_concurrent(
    self, path, start=None, end=None, concurrency=DEFAULT_CONCURRENCY, **kwargs
):
    """Fetch file data by splitting the byte range across concurrent requests.

    Falls back to a single sequential request when concurrency is disabled
    (``concurrency <= 1``) or the requested range is smaller than
    ``MIN_CHUNK_SIZE_FOR_CONCURRENCY``, where splitting would not pay off.

    Parameters
    ----------
    path : str
        Object path to read.
    start, end : int, optional
        Byte range; ``start`` defaults to 0 and ``end`` to the object size
        (looked up via ``_info``).
    concurrency : int
        Number of parallel range requests to issue.
    """
    if start is None:
        start = 0
    if end is None:
        end = (await self._info(path))["size"]
    if start >= end:
        return b""

    if concurrency <= 1 or end - start < self.MIN_CHUNK_SIZE_FOR_CONCURRENCY:
        return await self._cat_file_sequential(path, start=start, end=end, **kwargs)

    total_size = end - start
    part_size = total_size // concurrency

    # Explicit Task objects (rather than bare coroutines handed to gather)
    # so still-pending parts can be cancelled manually if one part fails.
    tasks = []
    for i in range(concurrency):
        offset = start + i * part_size
        # The last part absorbs the remainder of the integer division.
        actual_size = (
            part_size if i < concurrency - 1 else total_size - i * part_size
        )
        tasks.append(
            asyncio.create_task(
                self._cat_file_sequential(
                    path, start=offset, end=offset + actual_size, **kwargs
                )
            )
        )

    try:
        results = await asyncio.gather(*tasks)
        return b"".join(results)
    except BaseException:
        # Cancel any outstanding parts and wait for them to settle before
        # propagating, so no task is left running in the background.
        # BaseException is deliberate: this must also run on cancellation.
        for task in tasks:
            if not task.done():
                task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        # Bare raise re-raises the active exception with its original
        # traceback intact (fix: was "raise e").
        raise

async def _cat_file(
    self, path, start=None, end=None, concurrency=DEFAULT_CONCURRENCY, **kwargs
):
    """Get file data with one request, or several concurrent ones."""
    # NOTE: calling _cat_file_concurrent(concurrency=1) would also work, but
    # the concurrent code path is still experimental, so the sequential path
    # is kept separate. Once the concurrent path is stabilized, this branch
    # can be collapsed into a single call.
    if concurrency <= 1:
        return await self._cat_file_sequential(path, start=start, end=end, **kwargs)

    return await self._cat_file_concurrent(
        path, start=start, end=end, concurrency=concurrency, **kwargs
    )

async def _getxattr(self, path, attr):
"""Get user-defined metadata attribute"""
meta = (await self._info(path)).get("metadata", {})
Expand Down Expand Up @@ -2005,6 +2063,7 @@ def __init__(
if not key:
raise OSError("Attempt to open a bucket")
self.generation = _coalesce_generation(generation, path_generation)
self.concurrency = kwargs.get("concurrency", DEFAULT_CONCURRENCY)
super().__init__(
gcsfs,
path,
Expand All @@ -2021,6 +2080,34 @@ def __init__(
self.acl = acl
self.consistency = consistency
self.checker = get_consistency_checker(consistency)

# Ideally, all of these fields should be part of `cache_options`. Because current
# `fsspec` caches do not accept arbitrary `*args` and `**kwargs`, passing them
# there currently causes instantiation errors. We are holding off on introducing
# them as explicit keyword arguments to ensure existing user workloads are not
# disrupted. This will be refactored once the upstream `fsspec` changes are merged.
use_prefetch_reader = kwargs.get(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's only do env variable for flag and not kwargs

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree! This means when we want to surface the arguments, we'll have to support both and decide precedence.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, I would also like to keep both the argument and the environment variable so that users can disable the prefetch engine if they wish.

"use_experimental_adaptive_prefetching", False
) or os.environ.get(
"USE_EXPERIMENTAL_ADAPTIVE_PREFETCHING", "false"
).lower() in (
"true",
Comment thread
googlyrahman marked this conversation as resolved.
"1",
)

if "r" in mode and use_prefetch_reader:
max_prefetch_size = kwargs.get("max_prefetch_size", MAX_PREFETCH_SIZE)
from .prefetcher import BackgroundPrefetcher

self._prefetch_engine = BackgroundPrefetcher(
self._async_fetch_range,
self.size,
max_prefetch_size=max_prefetch_size,
concurrency=self.concurrency,
)
else:
self._prefetch_engine = None

# _supports_append is an internal argument not meant to be used directly.
# If True, allows opening file in append mode. This is generally not supported
# by GCS, but may be supported by subclasses (e.g. ZonalFile). This flag should
Expand Down Expand Up @@ -2203,12 +2290,30 @@ def _fetch_range(self, start=None, end=None):
if not both None, fetch only given range
"""
try:
return self.gcsfs.cat_file(self.path, start=start, end=end)
if hasattr(self, "_prefetch_engine") and self._prefetch_engine:
return self._prefetch_engine._fetch(start=start, end=end)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't see why you would have a standard fsspec cacher overlayed on the prefetcher. The only one it might work with is "readhead", but actually the prefetches does all of that functionality and more, no?

Copy link
Copy Markdown
Contributor Author

@googlyrahman googlyrahman Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary reason to position the prefetcher below cache rather than integrating it directly is to allow other caches to benefit from the prefetched data. Furthermore, we will always retain the ability to enable or disable the prefetch logic as needed.

This approach mirrors standard OS kernel architecture, which maintains the page cache (which can be bypassed) and the read-ahead prefetching mechanism as distinct, decoupled entities.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well we can do some experiments if you like, read patterns like:

Some backtracking: 1,2,3,4,2,5,6,7,5...
Frequent visits home: 1,2,1,3,1,4,1,5...

but I strongly suspect that the first one would behave just like readahead (but better because of prefetching) and the second would be better with type "first" and the prefetcher doesn't help at all.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're exactly right that the prefetcher doesn't actively help with the 'frequent visits home' pattern, but crucially, it doesn't backfire either. Because the prefetcher only triggers after detecting a threshold of sequential reads (which we can configure to be 2, 3, or more), it simply stays out of the way during non-sequential access. This is why purely random workloads perform exactly the same whether prefetching is enabled or disabled, as reflected in the benchmarks.

Regarding the 'frequent visits home' pattern specifically: identifying and serving repeatedly accessed blocks (like block 1) is entirely the job of a cache, not a prefetcher. This pattern is actually a perfect example of why decoupling the two is so valuable. Layering them allows the cache to handle the repetitive hits, while the prefetcher handles the sequential scans.

return self.fs.cat_file(
self.path, start=start, end=end, concurrency=self.concurrency
)
except RuntimeError as e:
if "not satisfiable" in str(e):
return b""
raise

async def _async_fetch_range(self, start_offset, total_size, split_factor=1):
"""Async fetcher mapped to the Prefetcher engine for regional buckets."""
return await self.gcsfs._cat_file_concurrent(
self.path,
start=start_offset,
end=start_offset + total_size,
concurrency=split_factor,
)

def close(self):
    """Close the file, shutting down the prefetch engine if one was created."""
    super().close()
    # getattr guards against instances where __init__ never reached the
    # point of assigning _prefetch_engine (e.g. it raised part-way through).
    engine = getattr(self, "_prefetch_engine", None)
    if engine:
        engine.close()


def _convert_fixed_key_metadata(metadata, *, from_google=False):
"""
Expand Down
Loading
Loading