diff --git a/cloudbuild/run_tests.sh b/cloudbuild/run_tests.sh index 9b13fe7c..a5682d43 100644 --- a/cloudbuild/run_tests.sh +++ b/cloudbuild/run_tests.sh @@ -29,6 +29,7 @@ case "$TEST_SUITE" in "standard") export GCSFS_TEST_BUCKET="gcsfs-test-standard-${SHORT_BUILD_ID}" export GCSFS_TEST_VERSIONED_BUCKET="gcsfs-test-versioned-${SHORT_BUILD_ID}" + export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='false' pytest "${ARGS[@]}" gcsfs/ --deselect gcsfs/tests/test_core.py::test_sign ;; @@ -37,7 +38,6 @@ case "$TEST_SUITE" in export GCSFS_ZONAL_TEST_BUCKET="gcsfs-test-zonal-${SHORT_BUILD_ID}" export GCSFS_HNS_TEST_BUCKET="gcsfs-test-zonal-${SHORT_BUILD_ID}" ulimit -n 4096 - export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true' pytest "${ARGS[@]}" \ gcsfs/tests/test_extended_gcsfs.py \ gcsfs/tests/test_zonal_file.py \ @@ -49,7 +49,6 @@ case "$TEST_SUITE" in export GCSFS_TEST_BUCKET="gcsfs-test-hns-${SHORT_BUILD_ID}" export GCSFS_ZONAL_TEST_BUCKET="gcsfs-test-hns-${SHORT_BUILD_ID}" export GCSFS_HNS_TEST_BUCKET="gcsfs-test-hns-${SHORT_BUILD_ID}" - export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true' # Excludes tests that are not applicable to HNS buckets: # - test_extended_gcsfs.py, test_zonal_file.py: Zonal bucket specific tests which won't work on HNS bucket. # - test_extended_gcsfs_unit.py: Unit tests for zonal bucket features. @@ -69,7 +68,6 @@ case "$TEST_SUITE" in "zonal-core") export GCSFS_TEST_BUCKET="gcsfs-test-zonal-core-${SHORT_BUILD_ID}" - export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true' ulimit -n 4096 # Zonal Core Deselections @@ -109,6 +107,7 @@ case "$TEST_SUITE" in # - test_array fails due to CRC32C TypeError with array objects. # - test_sign fails because it requires a private key # - test_mv_file_cache: Integration test only applicable for regional buckets. + # - test_write_x_mpu fails because zonal files do not support x mode. # - test_rm_wildcards_non_recursive: HNS buckets have different behavior for non-recursive wildcard deletion. ZONAL_DESELECTS+=( "--deselect=gcsfs/tests/test_core.py::test_flush" @@ -118,6 +117,7 @@ case "$TEST_SUITE" in "--deselect=gcsfs/tests/test_core.py::test_array" "--deselect=gcsfs/tests/test_core.py::test_sign" "--deselect=gcsfs/tests/test_core.py::test_mv_file_cache" + "--deselect=gcsfs/tests/test_core.py::test_write_x_mpu" "--deselect=gcsfs/tests/test_core.py::test_rm_wildcards_non_recursive" ) diff --git a/gcsfs/core.py b/gcsfs/core.py index 56d21048..9a7116df 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -14,6 +14,7 @@ import warnings import weakref from datetime import datetime, timedelta +from glob import has_magic from urllib.parse import parse_qs from urllib.parse import quote as quote_urllib from urllib.parse import urlsplit @@ -23,7 +24,7 @@ from fsspec import asyn from fsspec.callbacks import NoOpCallback from fsspec.implementations.http import get_client -from fsspec.utils import setup_logging, stringify_path +from fsspec.utils import other_paths, setup_logging, stringify_path from . import __version__ as version from .checkers import get_consistency_checker @@ -1280,16 +1281,70 @@ async def _merge(self, path, paths, acl=None): merge = asyn.sync_wrapper(_merge) - # mv method is already available as sync method in the fsspec.py - # Async version of it is introduced here so that mv can be used in async methods. # TODO: Add async mv method in the async.py and remove from GCSFileSystem. - async def _mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): + async def _mv( + self, path1, path2, recursive=False, maxdepth=None, batch_size=None, **kwargs + ): if path1 == path2: return - # TODO: Pass on_error parameter after copy method handles FileNotFoundError - # for folders when recursive is set to true. - await self._copy(path1, path2, recursive=recursive, maxdepth=maxdepth) - await self._rm(path1, recursive=recursive) + + if isinstance(path1, list) and isinstance(path2, list): + # No need to expand paths when both source and destination + # are provided as lists + paths1 = path1 + paths2 = path2 + else: + source_is_str = isinstance(path1, str) + paths1 = await self._expand_path( + path1, maxdepth=maxdepth, recursive=recursive + ) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not move directories + paths1 = [ + p + for p in paths1 + if not (asyn.trailing_sep(p) or await self._isdir(p)) + ] + if not paths1: + return + + source_is_file = len(paths1) == 1 + dest_is_dir = isinstance(path2, str) and ( + asyn.trailing_sep(path2) or await self._isdir(path2) + ) + + exists = source_is_str and ( + (has_magic(path1) and source_is_file) + or ( + not has_magic(path1) + and dest_is_dir + and not asyn.trailing_sep(path1) + ) + ) + paths2 = other_paths( + paths1, + path2, + exists=exists, + flatten=not source_is_str, + ) + + batch_size = batch_size or self.batch_size + result = await asyn._run_coros_in_chunks( + [self._mv_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)], + batch_size=batch_size, + return_exceptions=True, + nofiles=True, + ) + + for res, p1 in zip(result, paths1): + if isinstance(res, Exception): + if isinstance(res, FileNotFoundError) and recursive: + # Ignore FileNotFoundError for implicit directories returned by _expand_path. + if any(p.startswith(p1.rstrip("/") + "/") for p in paths1): + continue + raise res + + mv = asyn.sync_wrapper(_mv) async def _cp_file(self, path1, path2, acl=None, **kwargs): """Duplicate remote file""" @@ -1353,6 +1408,9 @@ async def _mv_file(self, path1, path2, **kwargs): ) await self._mv_file_cache_update(path1, path2, out) return + except FileNotFoundError: + # Raise immediately because fallback will also fail when file is not found. + raise except Exception as e: # TODO: Fallback is added to make sure there is smooth transition, it can be removed # once we have metrics proving that moveTo API is working properly for all bucket types. diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py index 147a0e16..c3a11964 100644 --- a/gcsfs/extended_gcsfs.py +++ b/gcsfs/extended_gcsfs.py @@ -460,6 +460,13 @@ async def _mv(self, path1, path2, **kwargs): ) return + if ( + isinstance(path1, list) + or isinstance(path2, list) + or (isinstance(path1, str) and has_magic(path1)) + ): + return await super()._mv(path1, path2, **kwargs) + bucket1, key1, _ = self.split_path(path1) bucket2, key2, _ = self.split_path(path2) diff --git a/gcsfs/tests/perf/microbenchmarks/conftest.py b/gcsfs/tests/perf/microbenchmarks/conftest.py index 514e19c5..7bdbfb7a 100644 --- a/gcsfs/tests/perf/microbenchmarks/conftest.py +++ b/gcsfs/tests/perf/microbenchmarks/conftest.py @@ -261,7 +261,11 @@ def gcsfs_benchmark_rename(extended_gcs_factory, request): """ params = request.param yield from _benchmark_listing_fixture_helper( - extended_gcs_factory, params, "benchmark-rename", teardown=True + extended_gcs_factory, + params, + "benchmark-rename", + teardown=True, + require_file_paths=True, ) diff --git a/gcsfs/tests/perf/microbenchmarks/rename/configs.yaml b/gcsfs/tests/perf/microbenchmarks/rename/configs.yaml index 29acdd61..5f9c2977 100644 --- a/gcsfs/tests/perf/microbenchmarks/rename/configs.yaml +++ b/gcsfs/tests/perf/microbenchmarks/rename/configs.yaml @@ -17,3 +17,7 @@ scenarios: depth: 24 folders: [256] files: [65536, 131072] + + - name: "rename_files" + folders: [10] + files: [100] diff --git a/gcsfs/tests/perf/microbenchmarks/rename/test_rename.py b/gcsfs/tests/perf/microbenchmarks/rename/test_rename.py index 3dd5520f..5275328f 100644 --- a/gcsfs/tests/perf/microbenchmarks/rename/test_rename.py +++ b/gcsfs/tests/perf/microbenchmarks/rename/test_rename.py @@ -19,18 +19,34 @@ def _rename_op(gcs, src, dst): logging.info(f"RENAME : {src} -> {dst} - {duration_ms:.2f} ms.") +def _rename_files_sequential_op(gcs, file_paths): + start_time = time.perf_counter() + for src in file_paths: + dst = src + "_renamed" + gcs.rename(src, dst) + duration_ms = (time.perf_counter() - start_time) * 1000 + num_files = len(file_paths) + logging.info(f"RENAME FILES SEQ ({num_files} files) - {duration_ms:.2f} ms.") + + all_benchmark_cases = get_rename_benchmark_cases() single_threaded_cases, _, _ = filter_test_cases(all_benchmark_cases) +single_threaded_directory_cases = [ + c for c in single_threaded_cases if "rename_files" not in c.name +] +single_threaded_file_cases = [ + c for c in single_threaded_cases if "rename_files" in c.name +] @pytest.mark.parametrize( "gcsfs_benchmark_rename", - single_threaded_cases, + single_threaded_directory_cases, indirect=True, ids=lambda p: p.name, ) def test_rename_recursive(benchmark, gcsfs_benchmark_rename, monitor): - gcs, _, prefix, params = gcsfs_benchmark_rename + gcs, _, _, prefix, params = gcsfs_benchmark_rename prefix_renamed = f"{prefix}_renamed" run_single_threaded( @@ -41,3 +57,22 @@ def test_rename_recursive(benchmark, gcsfs_benchmark_rename, monitor): (gcs, prefix, prefix_renamed), BENCHMARK_GROUP, ) + + +@pytest.mark.parametrize( + "gcsfs_benchmark_rename", + single_threaded_file_cases, + indirect=True, + ids=lambda p: p.name, +) +def test_rename_files_sequential(benchmark, gcsfs_benchmark_rename, monitor): + gcs, _, file_paths, prefix, params = gcsfs_benchmark_rename + + run_single_threaded( + benchmark, + monitor, + params, + _rename_files_sequential_op, + (gcs, file_paths), + BENCHMARK_GROUP, + ) diff --git a/gcsfs/tests/test_core.py b/gcsfs/tests/test_core.py index 23ffa128..5c52cca2 100644 --- a/gcsfs/tests/test_core.py +++ b/gcsfs/tests/test_core.py @@ -785,12 +785,35 @@ def test_copy_errors(gcs): ] +def test_move_src_dst_equal(gcs): + path = TEST_BUCKET + "/file1" + with mock.patch.object( + gcs, "_mv_file", new_callable=mock.AsyncMock + ) as mock_mv_file: + gcs.mv(path, path) + mock_mv_file.assert_not_awaited() + + def test_move(gcs): fn = TEST_BUCKET + "/test/accounts.1.json" - data = gcs.cat(fn) - gcs.mv(fn, fn + "2") - assert gcs.cat(fn + "2") == data - assert not gcs.exists(fn) + + if gcs.on_google: + data = gcs.cat(fn) + gcs.mv(fn, fn + "2") + assert gcs.cat(fn + "2") == data + assert not gcs.exists(fn) + else: + with ( + mock.patch.object( + gcs, "_expand_path", new_callable=mock.AsyncMock + ) as mock_expand, + mock.patch.object( + gcs, "_mv_file", new_callable=mock.AsyncMock + ) as mock_mv_file, + ): + mock_expand.return_value = [fn] + gcs.mv(fn, fn + "2") + mock_mv_file.assert_awaited_once_with(fn, fn + "2") def test_move_recursive_no_slash(gcs): @@ -798,9 +821,32 @@ def test_move_recursive_no_slash(gcs): dir_from = TEST_BUCKET + "/nested" dir_to = TEST_BUCKET + "/new_name" - gcs.mv(dir_from, dir_to, recursive=True) - assert not gcs.exists(dir_from) - assert gcs.ls(dir_to) == [dir_to + "/file1", dir_to + "/file2", dir_to + "/nested2"] + if gcs.on_google: + gcs.mv(dir_from, dir_to, recursive=True) + assert not gcs.exists(dir_from) + assert gcs.ls(dir_to) == [ + dir_to + "/file1", + dir_to + "/file2", + dir_to + "/nested2", + ] + else: + with ( + mock.patch.object( + gcs, "_expand_path", new_callable=mock.AsyncMock + ) as mock_expand, + mock.patch.object(gcs, "_isdir", new_callable=mock.AsyncMock) as mock_isdir, + mock.patch.object( + gcs, "_mv_file", new_callable=mock.AsyncMock + ) as mock_mv_file, + ): + mock_expand.return_value = [dir_from + "/file1", dir_from + "/file2"] + mock_isdir.return_value = False + + gcs.mv(dir_from, dir_to, recursive=True) + + assert mock_mv_file.call_count == 2 + mock_mv_file.assert_any_await(dir_from + "/file1", dir_to + "/file1") + mock_mv_file.assert_any_await(dir_from + "/file2", dir_to + "/file2") def test_move_recursive_with_slash(gcs): @@ -808,24 +854,136 @@ def test_move_recursive_with_slash(gcs): dir_from = TEST_BUCKET + "/nested/" dir_to = TEST_BUCKET + "/new_name_with_slash" - gcs.mv(dir_from, dir_to, recursive=True) - assert not gcs.exists(dir_from.rstrip("/")) - assert gcs.ls(dir_to) == [dir_to + "/file1", dir_to + "/file2", dir_to + "/nested2"] + if gcs.on_google: + gcs.mv(dir_from, dir_to, recursive=True) + assert not gcs.exists(dir_from.rstrip("/")) + assert gcs.ls(dir_to) == [ + dir_to + "/file1", + dir_to + "/file2", + dir_to + "/nested2", + ] + else: + with ( + mock.patch.object( + gcs, "_expand_path", new_callable=mock.AsyncMock + ) as mock_expand, + mock.patch.object(gcs, "_isdir", new_callable=mock.AsyncMock) as mock_isdir, + mock.patch.object( + gcs, "_mv_file", new_callable=mock.AsyncMock + ) as mock_mv_file, + ): + mock_expand.return_value = [dir_from + "file1", dir_from + "file2"] + mock_isdir.return_value = False + + gcs.mv(dir_from, dir_to, recursive=True) + + assert mock_mv_file.call_count == 2 + mock_mv_file.assert_any_await(dir_from + "file1", dir_to + "/file1") + mock_mv_file.assert_any_await(dir_from + "file2", dir_to + "/file2") + + +def test_move_list_to_dir(gcs): + fn1 = TEST_BUCKET + "/test/accounts.1.json" + fn2 = TEST_BUCKET + "/test/accounts.2.json" + + if gcs.on_google: + data1 = gcs.cat(fn1) + data2 = gcs.cat(fn2) + + gcs.mv([fn1, fn2], TEST_BUCKET + "/test2/") + + assert gcs.cat(TEST_BUCKET + "/test2/accounts.1.json") == data1 + assert gcs.cat(TEST_BUCKET + "/test2/accounts.2.json") == data2 + assert not gcs.exists(fn1) + assert not gcs.exists(fn2) + else: + with mock.patch.object( + gcs, "_mv_file", new_callable=mock.AsyncMock + ) as mock_mv_file: + gcs.mv([fn1, fn2], TEST_BUCKET + "/test2/") + + assert mock_mv_file.call_count == 2 + mock_mv_file.assert_any_await(fn1, TEST_BUCKET + "/test2/accounts.1.json") + mock_mv_file.assert_any_await(fn2, TEST_BUCKET + "/test2/accounts.2.json") + + +def test_move_list_to_list(gcs): + fn1 = TEST_BUCKET + "/test/accounts.1.json" + fn2 = TEST_BUCKET + "/test/accounts.2.json" + + if gcs.on_google: + data1 = gcs.cat(fn1) + data2 = gcs.cat(fn2) + + gcs.mv([fn1, fn2], [fn1 + "2", fn2 + "2"]) + + assert gcs.cat(fn1 + "2") == data1 + assert gcs.cat(fn2 + "2") == data2 + assert not gcs.exists(fn1) + assert not gcs.exists(fn2) + else: + with mock.patch.object( + gcs, "_mv_file", new_callable=mock.AsyncMock + ) as mock_mv_file: + gcs.mv([fn1, fn2], [fn1 + "2", fn2 + "2"]) + + assert mock_mv_file.call_count == 2 + mock_mv_file.assert_any_await(fn1, fn1 + "2") + mock_mv_file.assert_any_await(fn2, fn2 + "2") + + +def test_move_implicit_directories_ignore_filenotfound(gcs): + paths1 = [TEST_BUCKET + "/dir/", TEST_BUCKET + "/dir/file1"] + paths2 = [TEST_BUCKET + "/dest/", TEST_BUCKET + "/dest/file1"] + + with mock.patch.object( + gcs, "_mv_file", new_callable=mock.AsyncMock + ) as mock_mv_file: + mock_mv_file.side_effect = [FileNotFoundError("dir not found"), None] + gcs.mv(paths1, paths2, recursive=True) + assert mock_mv_file.call_count == 2 + + +@pytest.mark.asyncio +async def test_move_non_recursive_empty_after_filter(gcs): + path1 = TEST_BUCKET + "/dir" + path2 = TEST_BUCKET + "/dest" + + with ( + mock.patch.object( + gcs, "_expand_path", new_callable=mock.AsyncMock + ) as mock_expand, + mock.patch.object(gcs, "_isdir", new_callable=mock.AsyncMock) as mock_isdir, + mock.patch.object(gcs, "_mv_file", new_callable=mock.AsyncMock) as mock_mv_file, + ): + mock_expand.return_value = [TEST_BUCKET + "/dir/"] + mock_isdir.return_value = True + + await gcs._mv(path1, path2, recursive=False) + + mock_mv_file.assert_not_awaited() def test_mv_file(gcs): - if not gcs.on_google: - pytest.skip("emulator does not support moveTo") fn = TEST_BUCKET + "/test/accounts.1.json" - data = gcs.cat(fn) - gcs.mv_file(fn, fn + "2") - assert gcs.cat(fn + "2") == data - assert not gcs.exists(fn) + + if gcs.on_google: + data = gcs.cat(fn) + gcs.mv_file(fn, fn + "2") + assert gcs.cat(fn + "2") == data + assert not gcs.exists(fn) + else: + with mock.patch.object(gcs, "_call", new_callable=mock.AsyncMock) as mock_call: + mock_call.return_value = { + "kind": "storage#object", + "bucket": TEST_BUCKET, + "name": "test/accounts.1.json2", + } + gcs.mv_file(fn, fn + "2") + mock_call.assert_awaited_once() def test_mv_file_cache(gcs): - if not gcs.on_google: - pytest.skip("emulator does not support moveTo") fn = TEST_BUCKET + "/test/accounts.1.json" fn2 = TEST_BUCKET + "/nested/accounts.1.json" parent = TEST_BUCKET + "/test" @@ -834,10 +992,24 @@ def test_mv_file_cache(gcs): gcs.ls(parent2) assert parent in gcs.dircache assert parent2 in gcs.dircache - gcs.mv_file(fn, fn2) - assert parent not in gcs.dircache - assert parent2 not in gcs.dircache - assert fn2 in gcs.ls(parent2) + + if gcs.on_google: + gcs.mv_file(fn, fn2) + + assert parent not in gcs.dircache + assert parent2 not in gcs.dircache + assert fn2 in gcs.ls(parent2) + else: + with mock.patch.object(gcs, "_call", new_callable=mock.AsyncMock) as mock_call: + mock_call.return_value = { + "kind": "storage#object", + "bucket": TEST_BUCKET, + "name": "nested/accounts.1.json", + } + gcs.mv_file(fn, fn2) + + assert parent not in gcs.dircache + assert parent2 not in gcs.dircache def test_mv_file_calls_move_to(gcs): @@ -2084,8 +2256,9 @@ def test_sign(gcs, monkeypatch): assert response.text == "This is a test string" -@pytest.mark.xfail(reason="emulator does not support condition") def test_write_x_mpu(gcs): + if not gcs.on_google: + pytest.skip("emulator does not support condition") fn = TEST_BUCKET + "/test.file" with gcs.open(fn, mode="xb", block_size=5 * 2**20) as f: assert f.mode == "xb" diff --git a/gcsfs/tests/test_extended_hns_gcsfs.py b/gcsfs/tests/test_extended_hns_gcsfs.py index 7bddef96..6993619b 100644 --- a/gcsfs/tests/test_extended_hns_gcsfs.py +++ b/gcsfs/tests/test_extended_hns_gcsfs.py @@ -474,6 +474,34 @@ def test_mv_same_path_is_noop(self, gcs_hns, gcs_hns_mocks): mocks["control_client"].rename_folder.assert_not_called() mocks["super_mv"].assert_not_called() + def test_mv_list_fallback_to_super_mv(self, gcs_hns, gcs_hns_mocks): + """Test that mv falls back to super_mv if path1 or path2 is a list.""" + gcsfs = gcs_hns + path1 = [f"{TEST_HNS_BUCKET}/file1.txt", f"{TEST_HNS_BUCKET}/file2.txt"] + path2 = f"{TEST_HNS_BUCKET}/new_dir/" + + with gcs_hns_mocks(BucketType.HIERARCHICAL, gcsfs) as mocks: + gcsfs.mv(path1, path2) + + mocks["async_lookup_bucket_type"].assert_not_called() + mocks["info"].assert_not_called() + mocks["control_client"].rename_folder.assert_not_called() + mocks["super_mv"].assert_called_once_with(path1, path2) + + def test_mv_glob_fallback_to_super_mv(self, gcs_hns, gcs_hns_mocks): + """Test that mv falls back to super_mv if path1 contains glob magic characters.""" + gcsfs = gcs_hns + path1 = f"{TEST_HNS_BUCKET}/*.txt" + path2 = f"{TEST_HNS_BUCKET}/new_dir/" + + with gcs_hns_mocks(BucketType.HIERARCHICAL, gcsfs) as mocks: + gcsfs.mv(path1, path2) + + mocks["async_lookup_bucket_type"].assert_not_called() + mocks["info"].assert_not_called() + mocks["control_client"].rename_folder.assert_not_called() + mocks["super_mv"].assert_called_once_with(path1, path2) + def test_hns_rename_fails_if_parent_dne(self, gcs_hns, gcs_hns_mocks): """Test that HNS rename fails if the destination's parent does not exist.""" gcsfs = gcs_hns