-
Notifications
You must be signed in to change notification settings - Fork 173
Add prefetcher reader for standard buckets. #795
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
315c16b
229032e
a5aa385
4d8f3ee
e75960b
64ffc3f
298e0a4
242279a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
| from .credentials import GoogleCredentials | ||
| from .inventory_report import InventoryReport | ||
| from .retry import errs, retry_request, validate_response | ||
| from .zb_hns_utils import DEFAULT_CONCURRENCY, MAX_PREFETCH_SIZE | ||
|
|
||
| logger = logging.getLogger("gcsfs") | ||
|
|
||
|
|
@@ -299,6 +300,7 @@ class GCSFileSystem(asyn.AsyncFileSystem): | |
| default_block_size = DEFAULT_BLOCK_SIZE | ||
| protocol = "gs", "gcs" | ||
| async_impl = True | ||
| MIN_CHUNK_SIZE_FOR_CONCURRENCY = 5 * 1024 * 1024 | ||
|
|
||
| def __init__( | ||
| self, | ||
|
|
@@ -1166,22 +1168,78 @@ def url(self, path): | |
| f"&generation={generation}" if generation else "", | ||
| ) | ||
|
|
||
| async def _cat_file(self, path, start=None, end=None, **kwargs): | ||
| async def _cat_file_sequential(self, path, start=None, end=None, **kwargs): | ||
| """Simple one-shot get of file data""" | ||
| # if start and end are both provided and valid, but start >= end, return empty bytes | ||
| # Otherwise, _process_limits would generate an invalid HTTP range (e.g. "bytes=5-4" | ||
| # for start=5, end=5), causing the server to return the whole file instead of nothing. | ||
| if start is not None and end is not None and start >= end >= 0: | ||
| return b"" | ||
|
|
||
| u2 = self.url(path) | ||
| # 'if start or end' fails when start=0 or end=0 because 0 is Falsey. | ||
| if start is not None or end is not None: | ||
| head = {"Range": await self._process_limits(path, start, end)} | ||
| else: | ||
| head = {} | ||
|
|
||
| headers, out = await self._call("GET", u2, headers=head) | ||
| return out | ||
|
|
||
| async def _cat_file_concurrent( | ||
| self, path, start=None, end=None, concurrency=DEFAULT_CONCURRENCY, **kwargs | ||
| ): | ||
| """Concurrent fetch of file data""" | ||
| if start is None: | ||
| start = 0 | ||
| if end is None: | ||
| end = (await self._info(path))["size"] | ||
| if start >= end: | ||
| return b"" | ||
|
|
||
| if concurrency <= 1 or end - start < self.MIN_CHUNK_SIZE_FOR_CONCURRENCY: | ||
| return await self._cat_file_sequential(path, start=start, end=end, **kwargs) | ||
|
|
||
| total_size = end - start | ||
| part_size = total_size // concurrency | ||
| tasks = [] | ||
|
|
||
| for i in range(concurrency): | ||
| offset = start + (i * part_size) | ||
| actual_size = ( | ||
| part_size if i < concurrency - 1 else total_size - (i * part_size) | ||
| ) | ||
| tasks.append( | ||
| asyncio.create_task( | ||
| self._cat_file_sequential( | ||
| path, start=offset, end=offset + actual_size, **kwargs | ||
| ) | ||
| ) | ||
| ) | ||
|
|
||
| try: | ||
| results = await asyncio.gather(*tasks) | ||
| return b"".join(results) | ||
|
googlyrahman marked this conversation as resolved.
|
||
| except BaseException as e: | ||
| for t in tasks: | ||
| if not t.done(): | ||
| t.cancel() | ||
| await asyncio.gather(*tasks, return_exceptions=True) | ||
|
martindurant marked this conversation as resolved.
|
||
| raise e | ||
|
|
||
| async def _cat_file( | ||
| self, path, start=None, end=None, concurrency=DEFAULT_CONCURRENCY, **kwargs | ||
| ): | ||
| """Simple one-shot, or concurrent get of file data""" | ||
| if concurrency > 1: | ||
| return await self._cat_file_concurrent( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should work for concurrency==1 too, instead of having two separate methods.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is correct. However, in a follow-up CL, the concurrent method will use zero-copy. Therefore, this call is necessary because _cat_file_concurrent will fetch data differently moving forward, and we want to avoid shifting our entire workload to that new path all at once.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why? Having one code path to maintain should be better, unless you anticipate some problem. Furthermore,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Personally, I don't have any issue pointing this to the same code path, and I do not anticipate any problems with the new implementation. However, from an organizational standpoint, we need to keep this feature strictly behind a flag to ensure the merge has no immediate impact, and hence we want to keep these changes isolated so that when we later introduce zero-copy to the concurrent path, users who opt out of the flag will still safely default to the old behavior. Once we make the new behavior the default, we will consolidate the code and remove this method. I have already added a comment in the code to the same effect. |
||
| path, start=start, end=end, concurrency=concurrency, **kwargs | ||
| ) | ||
|
|
||
| # While we could just call _cat_file_concurrent(concurrency=1), we are choosing | ||
| # to keep it separate because concurrency code path is still in an experimental phase. | ||
| # Once concurrency code path is stabilized, we can remove this if-else condition. | ||
| return await self._cat_file_sequential(path, start=start, end=end, **kwargs) | ||
|
|
||
| async def _getxattr(self, path, attr): | ||
| """Get user-defined metadata attribute""" | ||
| meta = (await self._info(path)).get("metadata", {}) | ||
|
|
@@ -2005,6 +2063,7 @@ def __init__( | |
| if not key: | ||
| raise OSError("Attempt to open a bucket") | ||
| self.generation = _coalesce_generation(generation, path_generation) | ||
| self.concurrency = kwargs.get("concurrency", DEFAULT_CONCURRENCY) | ||
| super().__init__( | ||
| gcsfs, | ||
| path, | ||
|
|
@@ -2021,6 +2080,34 @@ def __init__( | |
| self.acl = acl | ||
| self.consistency = consistency | ||
| self.checker = get_consistency_checker(consistency) | ||
|
|
||
| # Ideally, all of these fields should be part of `cache_options`. Because current | ||
| # `fsspec` caches do not accept arbitrary `*args` and `**kwargs`, passing them | ||
| # there currently causes instantiation errors. We are holding off on introducing | ||
| # them as explicit keyword arguments to ensure existing user workloads are not | ||
| # disrupted. This will be refactored once the upstream `fsspec` changes are merged. | ||
| use_prefetch_reader = kwargs.get( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's only use an environment variable for the flag, not kwargs.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I disagree! This means when we want to surface the arguments, we'll have to support both and decide precedence.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1, I would also like to keep both the argument and the environment variable so that users can disable the prefetch engine if they would like to. |
||
| "use_experimental_adaptive_prefetching", False | ||
| ) or os.environ.get( | ||
| "USE_EXPERIMENTAL_ADAPTIVE_PREFETCHING", "false" | ||
| ).lower() in ( | ||
| "true", | ||
|
googlyrahman marked this conversation as resolved.
|
||
| "1", | ||
| ) | ||
|
|
||
| if "r" in mode and use_prefetch_reader: | ||
| max_prefetch_size = kwargs.get("max_prefetch_size", MAX_PREFETCH_SIZE) | ||
| from .prefetcher import BackgroundPrefetcher | ||
|
|
||
| self._prefetch_engine = BackgroundPrefetcher( | ||
| self._async_fetch_range, | ||
| self.size, | ||
| max_prefetch_size=max_prefetch_size, | ||
| concurrency=self.concurrency, | ||
| ) | ||
| else: | ||
| self._prefetch_engine = None | ||
|
|
||
| # _supports_append is an internal argument not meant to be used directly. | ||
| # If True, allows opening file in append mode. This is generally not supported | ||
| # by GCS, but may be supported by subclasses (e.g. ZonalFile). This flag should | ||
|
|
@@ -2203,12 +2290,30 @@ def _fetch_range(self, start=None, end=None): | |
| if not both None, fetch only given range | ||
| """ | ||
| try: | ||
| return self.gcsfs.cat_file(self.path, start=start, end=end) | ||
| if hasattr(self, "_prefetch_engine") and self._prefetch_engine: | ||
| return self._prefetch_engine._fetch(start=start, end=end) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I really don't see why you would have a standard fsspec cacher overlaid on the prefetcher. The only one it might work with is "readahead", but actually the prefetcher does all of that functionality and more, no?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The primary reason to position the prefetcher below cache rather than integrating it directly is to allow other caches to benefit from the prefetched data. Furthermore, we will always retain the ability to enable or disable the prefetch logic as needed. This approach mirrors standard OS kernel architecture, which maintains the page cache (which can be bypassed) and the read-ahead prefetching mechanism as distinct, decoupled entities.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well we can do some experiments if you like, read patterns like: but I strongly suspect that the first one would behave just like readahead (but better because of prefetching) and the second would be better with type "first" and the prefetcher doesn't help at all.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You're exactly right. Regarding the 'frequent visits home' pattern specifically: identifying and serving repeatedly accessed blocks (like block 1) is entirely the job of a cache, not a prefetcher. This pattern is actually a perfect example of why decoupling the two is so valuable. Layering them allows the cache to handle the repetitive hits, while the prefetcher handles the sequential scans. |
||
| return self.fs.cat_file( | ||
| self.path, start=start, end=end, concurrency=self.concurrency | ||
| ) | ||
| except RuntimeError as e: | ||
| if "not satisfiable" in str(e): | ||
| return b"" | ||
| raise | ||
|
|
||
| async def _async_fetch_range(self, start_offset, total_size, split_factor=1): | ||
| """Async fetcher mapped to the Prefetcher engine for regional buckets.""" | ||
| return await self.gcsfs._cat_file_concurrent( | ||
| self.path, | ||
| start=start_offset, | ||
| end=start_offset + total_size, | ||
| concurrency=split_factor, | ||
| ) | ||
|
|
||
| def close(self): | ||
| super().close() | ||
| if hasattr(self, "_prefetch_engine") and self._prefetch_engine: | ||
| self._prefetch_engine.close() | ||
|
|
||
|
|
||
| def _convert_fixed_key_metadata(metadata, *, from_google=False): | ||
| """ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just gather(*[...]); I don't think you need to write out the loop. Also, you don't need create_task(), gather() does that automatically if given coroutines.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason to keep asyncio.create_task() is that we need explicit Task objects to manually cancel them in the except block if a failure occurs. If we were on Python 3.11+, we could definitely drop this and use asyncio.TaskGroup to handle the cancellation automatically, but gather doesn't do that natively.
I'm also going to stick with the explicit for loop. Packing the start/end offset calculations into a list comprehension makes that block too dense, so the explicit loop is necessary here for readability.
The code if I remove the loop: