6 changes: 3 additions & 3 deletions cloudbuild/run_tests.sh
@@ -29,6 +29,7 @@ case "$TEST_SUITE" in
"standard")
export GCSFS_TEST_BUCKET="gcsfs-test-standard-${SHORT_BUILD_ID}"
export GCSFS_TEST_VERSIONED_BUCKET="gcsfs-test-versioned-${SHORT_BUILD_ID}"
export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='false'
pytest "${ARGS[@]}" gcsfs/ --deselect gcsfs/tests/test_core.py::test_sign
;;

@@ -37,7 +38,6 @@ case "$TEST_SUITE" in
export GCSFS_ZONAL_TEST_BUCKET="gcsfs-test-zonal-${SHORT_BUILD_ID}"
export GCSFS_HNS_TEST_BUCKET="gcsfs-test-zonal-${SHORT_BUILD_ID}"
ulimit -n 4096
export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true'
pytest "${ARGS[@]}" \
gcsfs/tests/test_extended_gcsfs.py \
gcsfs/tests/test_zonal_file.py \
@@ -49,7 +49,6 @@ case "$TEST_SUITE" in
export GCSFS_TEST_BUCKET="gcsfs-test-hns-${SHORT_BUILD_ID}"
export GCSFS_ZONAL_TEST_BUCKET="gcsfs-test-hns-${SHORT_BUILD_ID}"
export GCSFS_HNS_TEST_BUCKET="gcsfs-test-hns-${SHORT_BUILD_ID}"
export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true'
# Excludes tests that are not applicable to HNS buckets:
# - test_extended_gcsfs.py, test_zonal_file.py: Zonal bucket specific tests which won't work on HNS bucket.
# - test_extended_gcsfs_unit.py: Unit tests for zonal bucket features.
@@ -69,7 +68,6 @@ case "$TEST_SUITE" in

"zonal-core")
export GCSFS_TEST_BUCKET="gcsfs-test-zonal-core-${SHORT_BUILD_ID}"
export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true'
ulimit -n 4096

# Zonal Core Deselections
@@ -109,6 +107,7 @@ case "$TEST_SUITE" in
# - test_array fails due to CRC32C TypeError with array objects.
# - test_sign fails because it requires a private key
# - test_mv_file_cache: Integration test only applicable for regional buckets.
# - test_write_x_mpu fails because zonal files do not support x mode.
# - test_rm_wildcards_non_recursive: HNS buckets have different behavior for non-recursive wildcard deletion.
ZONAL_DESELECTS+=(
"--deselect=gcsfs/tests/test_core.py::test_flush"
@@ -118,6 +117,7 @@ case "$TEST_SUITE" in
"--deselect=gcsfs/tests/test_core.py::test_array"
"--deselect=gcsfs/tests/test_core.py::test_sign"
"--deselect=gcsfs/tests/test_core.py::test_mv_file_cache"
"--deselect=gcsfs/tests/test_core.py::test_write_x_mpu"
"--deselect=gcsfs/tests/test_core.py::test_rm_wildcards_non_recursive"
)

74 changes: 66 additions & 8 deletions gcsfs/core.py
@@ -14,6 +14,7 @@
import warnings
import weakref
from datetime import datetime, timedelta
from glob import has_magic
from urllib.parse import parse_qs
from urllib.parse import quote as quote_urllib
from urllib.parse import urlsplit
@@ -23,7 +24,7 @@
from fsspec import asyn
from fsspec.callbacks import NoOpCallback
from fsspec.implementations.http import get_client
from fsspec.utils import setup_logging, stringify_path
from fsspec.utils import other_paths, setup_logging, stringify_path

from . import __version__ as version
from .checkers import get_consistency_checker
@@ -1280,16 +1281,70 @@ async def _merge(self, path, paths, acl=None):

merge = asyn.sync_wrapper(_merge)

# mv method is already available as sync method in the fsspec.py
# Async version of it is introduced here so that mv can be used in async methods.
# TODO: Add async mv method in the async.py and remove from GCSFileSystem.
Contributor: We can remove this TODO now. Since this is a GCS-specific custom implementation, we can't copy this method to async.py; we will have to keep it in GCSFileSystem only.

Member: Actually, the implementation looks generic, but no need to take any action about it now. In fact, I don't expect too much of a performance gain (but maybe some); we just need to make sure it passes the abstract tests for mv.

Contributor: Ah yes, agreed that it is generic; I missed that the moveTo API is abstracted by mv_file.

Contributor: Should we consider moving the implementation from GCSFileSystem to fsspec? We might also see performance gains if other fsspec implementations include an atomic mv_file, as GCSFS does. There is also significant code duplication between mv and other fsspec methods like copy, which could be extracted into a helper.

Contributor (Author): I think it's a good idea, but the default implementation of mv_file in AsyncFileSystem isn't atomic. If we use mv_file for mv by default, there may be a performance drop for some fsspec implementations.

Contributor: The existing mv implementation in AbstractFileSystem is also not atomic; it relies on copy+delete logic. Therefore, IMO, changing from copy+delete to mv_file (which is also not atomic in AsyncFileSystem) should not have any performance implications. @martindurant Let us know your thoughts about this.

Member: Let's not move it right now, but open an issue at fsspec about a future intent to do so. I don't believe there are any existing implementations of _mv in subclasses of AsyncFileSystem, but we should spend the time to check for sure.
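The atomicity point debated above can be illustrated outside GCS. This is a hedged sketch on a local filesystem (helper names are illustrative, not part of the PR or of fsspec): the generic AbstractFileSystem.mv is copy+delete, while a backend with a native rename (like GCS's moveTo behind _mv_file) can move in a single metadata operation.

```python
import os
import shutil
import tempfile

def mv_copy_delete(src, dst):
    """Generic fallback: duplicate the bytes, then remove the source (not atomic)."""
    shutil.copyfile(src, dst)
    os.remove(src)

def mv_rename(src, dst):
    """Native move: one rename call, atomic within the same filesystem."""
    os.rename(src, dst)

with tempfile.TemporaryDirectory() as d:
    a, b = os.path.join(d, "a"), os.path.join(d, "b")
    with open(a, "w") as f:
        f.write("payload")
    mv_copy_delete(a, b)   # two operations: a window exists where both or neither path is visible
    mv_rename(b, a)        # one operation: no intermediate state
    print(os.path.exists(a), os.path.exists(b))  # True False
```

The window between copy and delete is what makes the generic path observably non-atomic; a server-side move collapses it.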

async def _mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
async def _mv(
self, path1, path2, recursive=False, maxdepth=None, batch_size=None, **kwargs
):
if path1 == path2:
return
# TODO: Pass on_error parameter after copy method handles FileNotFoundError
# for folders when recursive is set to true.
await self._copy(path1, path2, recursive=recursive, maxdepth=maxdepth)
await self._rm(path1, recursive=recursive)

if isinstance(path1, list) and isinstance(path2, list):
# No need to expand paths when both source and destination
# are provided as lists
paths1 = path1
paths2 = path2
else:
source_is_str = isinstance(path1, str)
paths1 = await self._expand_path(
path1, maxdepth=maxdepth, recursive=recursive
)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not move directories
paths1 = [
p
for p in paths1
if not (asyn.trailing_sep(p) or await self._isdir(p))
]
if not paths1:
return

source_is_file = len(paths1) == 1
dest_is_dir = isinstance(path2, str) and (
asyn.trailing_sep(path2) or await self._isdir(path2)
)

exists = source_is_str and (
(has_magic(path1) and source_is_file)
or (
not has_magic(path1)
and dest_is_dir
and not asyn.trailing_sep(path1)
)
)
paths2 = other_paths(
paths1,
path2,
exists=exists,
flatten=not source_is_str,
)

batch_size = batch_size or self.batch_size
result = await asyn._run_coros_in_chunks(
[self._mv_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)],
batch_size=batch_size,
return_exceptions=True,
nofiles=True,
)

for res, p1 in zip(result, paths1):
if isinstance(res, Exception):
if isinstance(res, FileNotFoundError) and recursive:
# Ignore FileNotFoundError for implicit directories returned by _expand_path.
if any(p.startswith(p1.rstrip("/") + "/") for p in paths1):
continue
raise res

mv = asyn.sync_wrapper(_mv)
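The error handling at the end of _mv above only swallows a FileNotFoundError when the failed path is an implicit directory, i.e. _expand_path also returned an entry nested under it. A standalone sketch of that rule (the helper name is hypothetical; the PR inlines this logic):

```python
def should_ignore_missing(failed_path, all_paths, recursive):
    """Mirror the PR's rule: ignore a FileNotFoundError for `failed_path`
    only during a recursive move, and only when some other expanded path
    nests under it (marking it as an implicit directory)."""
    if not recursive:
        return False
    prefix = failed_path.rstrip("/") + "/"
    # failed_path itself never matches the prefix, so a lone missing file is not ignored
    return any(p.startswith(prefix) for p in all_paths)

# "bkt/dir" is covered by its child, so its miss is ignored:
print(should_ignore_missing("bkt/dir", ["bkt/dir", "bkt/dir/file.txt"], recursive=True))   # True
# A genuinely missing lone file still raises:
print(should_ignore_missing("bkt/gone.txt", ["bkt/gone.txt"], recursive=True))             # False
```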

async def _cp_file(self, path1, path2, acl=None, **kwargs):
"""Duplicate remote file"""
@@ -1353,6 +1408,9 @@ async def _mv_file(self, path1, path2, **kwargs):
)
await self._mv_file_cache_update(path1, path2, out)
return
except FileNotFoundError:
# Raise immediately because fallback will also fail when file is not found.
raise
except Exception as e:
# TODO: Fallback is added to make sure there is smooth transition, it can be removed
# once we have metrics proving that moveTo API is working properly for all bucket types.
7 changes: 7 additions & 0 deletions gcsfs/extended_gcsfs.py
@@ -460,6 +460,13 @@ async def _mv(self, path1, path2, **kwargs):
)
return

if (
isinstance(path1, list)
or isinstance(path2, list)
or (isinstance(path1, str) and has_magic(path1))
):
return await super()._mv(path1, path2, **kwargs)

bucket1, key1, _ = self.split_path(path1)
bucket2, key2, _ = self.split_path(path2)
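The guard added to extended_gcsfs._mv above routes list inputs and glob sources to the generic base implementation, reserving the bucket-aware fast path for plain string pairs. The predicate can be sketched on its own (the helper name is illustrative, not part of the PR):

```python
from glob import has_magic  # same stdlib helper the PR imports in core.py

def use_generic_mv(path1, path2):
    """Mirror the PR's dispatch: fall back to the generic _mv for list
    inputs or glob-pattern sources; otherwise take the single-path route."""
    return (
        isinstance(path1, list)
        or isinstance(path2, list)
        or (isinstance(path1, str) and has_magic(path1))
    )

print(use_generic_mv("bkt/data/*.csv", "bkt/archive/"))  # True: glob source needs expansion
print(use_generic_mv(["bkt/a"], ["bkt/b"]))              # True: list inputs
print(use_generic_mv("bkt/a.txt", "bkt/b.txt"))          # False: direct single-path move
```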

6 changes: 5 additions & 1 deletion gcsfs/tests/perf/microbenchmarks/conftest.py
@@ -261,7 +261,11 @@ def gcsfs_benchmark_rename(extended_gcs_factory, request):
"""
params = request.param
yield from _benchmark_listing_fixture_helper(
extended_gcs_factory, params, "benchmark-rename", teardown=True
extended_gcs_factory,
params,
"benchmark-rename",
teardown=True,
require_file_paths=True,
)


4 changes: 4 additions & 0 deletions gcsfs/tests/perf/microbenchmarks/rename/configs.yaml
@@ -17,3 +17,7 @@ scenarios:
depth: 24
folders: [256]
files: [65536, 131072]

- name: "rename_files"
folders: [10]
files: [100]
39 changes: 37 additions & 2 deletions gcsfs/tests/perf/microbenchmarks/rename/test_rename.py
@@ -19,18 +19,34 @@ def _rename_op(gcs, src, dst):
logging.info(f"RENAME : {src} -> {dst} - {duration_ms:.2f} ms.")


def _rename_files_sequential_op(gcs, file_paths):
start_time = time.perf_counter()
for src in file_paths:
dst = src + "_renamed"
gcs.rename(src, dst)
duration_ms = (time.perf_counter() - start_time) * 1000
num_files = len(file_paths)
logging.info(f"RENAME FILES SEQ ({num_files} files) - {duration_ms:.2f} ms.")


all_benchmark_cases = get_rename_benchmark_cases()
single_threaded_cases, _, _ = filter_test_cases(all_benchmark_cases)
single_threaded_directory_cases = [
c for c in single_threaded_cases if "rename_files" not in c.name
]
single_threaded_file_cases = [
c for c in single_threaded_cases if "rename_files" in c.name
]


@pytest.mark.parametrize(
"gcsfs_benchmark_rename",
single_threaded_cases,
single_threaded_directory_cases,
indirect=True,
ids=lambda p: p.name,
)
def test_rename_recursive(benchmark, gcsfs_benchmark_rename, monitor):
gcs, _, prefix, params = gcsfs_benchmark_rename
gcs, _, _, prefix, params = gcsfs_benchmark_rename
prefix_renamed = f"{prefix}_renamed"

run_single_threaded(
@@ -41,3 +57,22 @@ def test_rename_recursive(benchmark, gcsfs_benchmark_rename, monitor):
(gcs, prefix, prefix_renamed),
BENCHMARK_GROUP,
)


@pytest.mark.parametrize(
"gcsfs_benchmark_rename",
single_threaded_file_cases,
indirect=True,
ids=lambda p: p.name,
)
def test_rename_files_sequential(benchmark, gcsfs_benchmark_rename, monitor):
gcs, _, file_paths, prefix, params = gcsfs_benchmark_rename

run_single_threaded(
benchmark,
monitor,
params,
_rename_files_sequential_op,
(gcs, file_paths),
BENCHMARK_GROUP,
)