diff --git a/docs/source/index.rst b/docs/source/index.rst index f80f27ef..5d015249 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -154,6 +154,67 @@ Python's standard `logging framework`_. .. _logging framework: https://docs.python.org/3/library/logging.html +Errors +------ + +The ``s3fs`` library includes a built-in mechanism to automatically retry +operations when specific transient errors occur. You can customize this behavior +by adding specific exception types or defining complex logic via custom handlers. + +Default Retryable Errors +~~~~~~~~~~~~~~~~~~~~~~~~ + +By default, ``s3fs`` will retry the following exception types: + +- ``socket.timeout`` +- ``HTTPClientError`` +- ``IncompleteRead`` +- ``FSTimeoutError`` +- ``ResponseParserError`` +- ``aiohttp.ClientPayloadError`` (if available) + +Registering Custom Error Types +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +To include additional exception types in the default retry logic, use the +``add_retryable_error`` function. This is useful for simple type-based retries. + +.. code-block:: python + + >>> class MyCustomError(Exception): ... pass >>> s3fs.add_retryable_error(MyCustomError) + +Implementing Custom Error Handlers +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +For more complex scenarios, such as retrying based on an error message rather than +just the type, you can register a custom error handler using ``set_custom_error_handler``. + +The handler should be a callable that accepts an exception instance and returns ``True`` +if the error should be retried, or ``False`` otherwise. + +.. code-block:: python + + >>> def my_handler(e): ... return isinstance(e, MyCustomError) and "some condition" in str(e) >>> s3fs.set_custom_error_handler(my_handler) + +Handling AWS ClientErrors +~~~~~~~~~~~~~~~~~~~~~~~~~ + +``s3fs`` provides specialized handling for ``botocore.exceptions.ClientError``.
+While ``s3fs`` checks these against internal patterns (like throttling), +you can extend this behavior using a custom handler. Note that the internal +patterns will still be checked and handled before the custom handler. + +.. code-block:: python + + >>> def another_handler(e): ... return isinstance(e, ClientError) and "Throttling" in str(e) >>> s3fs.set_custom_error_handler(another_handler) + + Credentials ----------- diff --git a/s3fs/__init__.py b/s3fs/__init__.py index 2267b817..b20d0218 100644 --- a/s3fs/__init__.py +++ b/s3fs/__init__.py @@ -1,4 +1,4 @@ -from .core import S3FileSystem, S3File +from .core import S3FileSystem, S3File, add_retryable_error, set_custom_error_handler from .mapping import S3Map from ._version import get_versions diff --git a/s3fs/core.py b/s3fs/core.py index 23b2fe60..a59351bf 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -73,6 +73,56 @@ def setup_logging(level=None): if ClientPayloadError is not None: S3_RETRYABLE_ERRORS += (ClientPayloadError,) + +def add_retryable_error(exc): + """ + Add an exception type to the list of retryable S3 errors. + + Parameters + ---------- + exc : Exception + The exception type to add to the retryable errors. + + Examples + -------- + >>> class MyCustomError(Exception): # doctest: +SKIP + ... pass # doctest: +SKIP + >>> add_retryable_error(MyCustomError) # doctest: +SKIP + """ + global S3_RETRYABLE_ERRORS + S3_RETRYABLE_ERRORS += (exc,) + + +CUSTOM_ERROR_HANDLER = lambda _: False + + +def set_custom_error_handler(func): + """Set a custom error handler function for S3 retryable errors. + + The function should take an exception instance as its only argument, + and return True if the operation should be retried, or False otherwise. + This can also be used for custom behavior on `ClientError` exceptions, + such as retrying other patterns. + + Parameters + ---------- + func : callable[[Exception], bool] + The custom error handler function.
+ + Examples + -------- + >>> def my_handler(e): # doctest: +SKIP + ... return isinstance(e, MyCustomError) and "some condition" in str(e) # doctest: +SKIP + >>> set_custom_error_handler(my_handler) # doctest: +SKIP + + >>> def another_handler(e): # doctest: +SKIP + ... return isinstance(e, ClientError) and "Throttling" in str(e) # doctest: +SKIP + >>> set_custom_error_handler(another_handler) # doctest: +SKIP + """ + global CUSTOM_ERROR_HANDLER + CUSTOM_ERROR_HANDLER = func + + _VALID_FILE_MODES = {"r", "w", "a", "rb", "wb", "ab"} _PRESERVE_KWARGS = [ @@ -110,29 +160,46 @@ def setup_logging(level=None): async def _error_wrapper(func, *, args=(), kwargs=None, retries): if kwargs is None: kwargs = {} + err = None for i in range(retries): + wait_time = min(1.7**i * 0.1, 15) + try: return await func(*args, **kwargs) except S3_RETRYABLE_ERRORS as e: err = e logger.debug("Retryable error: %s", e) - await asyncio.sleep(min(1.7**i * 0.1, 15)) + await asyncio.sleep(wait_time) except ClientError as e: logger.debug("Client error (maybe retryable): %s", e) err = e - wait_time = min(1.7**i * 0.1, 15) - if "SlowDown" in str(e): - await asyncio.sleep(wait_time) - elif "reduce your request rate" in str(e): - await asyncio.sleep(wait_time) - elif "XAmzContentSHA256Mismatch" in str(e): + + matched = False + for pattern in [ + "SlowDown", + "reduce your request rate", + "XAmzContentSHA256Mismatch", + ]: + if pattern in str(e): + matched = True + break + + if matched: await asyncio.sleep(wait_time) else: - break + should_retry = CUSTOM_ERROR_HANDLER(e) + if should_retry: + await asyncio.sleep(wait_time) + else: + break except Exception as e: - logger.debug("Nonretryable error: %s", e) err = e - break + should_retry = CUSTOM_ERROR_HANDLER(e) + if should_retry: + await asyncio.sleep(wait_time) + else: + logger.debug("Nonretryable error: %s", e) + break if "'coroutine'" in str(err): # aiobotocore internal error - fetch original botocore error diff --git
a/s3fs/tests/test_custom_error_handler.py b/s3fs/tests/test_custom_error_handler.py new file mode 100644 index 00000000..bd603071 --- /dev/null +++ b/s3fs/tests/test_custom_error_handler.py @@ -0,0 +1,255 @@ +"""Tests for custom error handler functionality.""" + +import asyncio +import pytest +from botocore.exceptions import ClientError + +import s3fs.core +from s3fs.core import ( + S3FileSystem, + _error_wrapper, + set_custom_error_handler, + add_retryable_error, +) + + +# Custom exception types for testing +class CustomRetryableError(Exception): + """A custom exception that should be retried.""" + + pass + + +class CustomNonRetryableError(Exception): + """A custom exception that should not be retried.""" + + pass + + +@pytest.fixture(autouse=True) +def reset_error_handler(): + """Reset the custom error handler and retryable errors after each test.""" + original_errors = s3fs.core.S3_RETRYABLE_ERRORS + yield + # Reset to default handler + s3fs.core.CUSTOM_ERROR_HANDLER = lambda e: False + # Reset retryable errors tuple + s3fs.core.S3_RETRYABLE_ERRORS = original_errors + + +def test_handler_retry_on_custom_exception(): + """Test that custom error handler allows retrying on custom exceptions.""" + call_count = 0 + + async def failing_func(): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise CustomRetryableError("Custom error that should retry") + return "success" + + # Set up custom handler to retry CustomRetryableError + def custom_handler(e): + return isinstance(e, CustomRetryableError) + + set_custom_error_handler(custom_handler) + + # Should retry and eventually succeed + async def run_test(): + result = await _error_wrapper(failing_func, retries=5) + assert result == "success" + assert call_count == 3 # Failed twice, succeeded on third attempt + + asyncio.run(run_test()) + + +def test_handler_no_retry_on_other_exception(): + """Test that custom error handler does not retry exceptions it doesn't handle.""" + call_count = 0 + + async def 
failing_func(): + nonlocal call_count + call_count += 1 + raise CustomNonRetryableError("Custom error that should not retry") + + # Set up custom handler that only retries CustomRetryableError + def custom_handler(e): + return isinstance(e, CustomRetryableError) + + set_custom_error_handler(custom_handler) + + # Should not retry and fail immediately + async def run_test(): + with pytest.raises(CustomNonRetryableError): + await _error_wrapper(failing_func, retries=5) + + assert call_count == 1 # Should only be called once + + asyncio.run(run_test()) + + +def test_handler_with_client_error(): + """Test that custom handler can make ClientError retryable.""" + call_count = 0 + + async def failing_func(): + nonlocal call_count + call_count += 1 + if call_count < 3: + # Create a ClientError that doesn't match the built-in retry patterns + error_response = { + "Error": { + "Code": "CustomThrottlingError", + "Message": "Custom throttling message", + } + } + raise ClientError(error_response, "operation_name") + return "success" + + # Set up custom handler to retry on specific ClientError codes + def custom_handler(e): + if isinstance(e, ClientError): + return e.response.get("Error", {}).get("Code") == "CustomThrottlingError" + return False + + set_custom_error_handler(custom_handler) + + # Should retry and eventually succeed + async def run_test(): + result = await _error_wrapper(failing_func, retries=5) + assert result == "success" + assert call_count == 3 + + asyncio.run(run_test()) + + +def test_handler_preserves_builtin_retry_pattern(): + """Test that custom handler doesn't interfere with built-in retry logic.""" + call_count = 0 + + async def failing_func(): + nonlocal call_count + call_count += 1 + if call_count < 3: + # SlowDown is a built-in retryable pattern + error_response = { + "Error": { + "Code": "SlowDown", + "Message": "Please reduce your request rate", + } + } + raise ClientError(error_response, "operation_name") + return "success" + + # Set up a custom 
handler that handles something else + def custom_handler(e): + return isinstance(e, CustomRetryableError) + + set_custom_error_handler(custom_handler) + + # Should still retry SlowDown errors due to built-in logic + async def run_test(): + result = await _error_wrapper(failing_func, retries=5) + assert result == "success" + assert call_count == 3 + + asyncio.run(run_test()) + + +def test_handler_max_retries(): + """Test that custom handler respects max retries.""" + call_count = 0 + + async def always_failing_func(): + nonlocal call_count + call_count += 1 + raise CustomRetryableError("Always fails") + + def custom_handler(e): + return isinstance(e, CustomRetryableError) + + set_custom_error_handler(custom_handler) + + # Should retry up to retries limit then raise + async def run_test(): + with pytest.raises(CustomRetryableError): + await _error_wrapper(always_failing_func, retries=3) + + assert call_count == 3 + + asyncio.run(run_test()) + + +def test_handler_sleep_behavior(): + """Test that retries due to custom handler also wait between attempts.""" + call_times = [] + + async def failing_func(): + call_times.append(asyncio.get_event_loop().time()) + raise CustomRetryableError("Retry me") + + def custom_handler(e): + return isinstance(e, CustomRetryableError) + + set_custom_error_handler(custom_handler) + + async def run_test(): + with pytest.raises(CustomRetryableError): + await _error_wrapper(failing_func, retries=3) + + # Should have made 3 attempts + assert len(call_times) == 3 + + # Check that there was a delay between attempts + # The wait time formula is min(1.7**i * 0.1, 15) + # For i=0: min(0.1, 15) = 0.1 + # For i=1: min(0.17, 15) = 0.17 + if len(call_times) >= 2: + time_between_first_and_second = call_times[1] - call_times[0] + # Should be roughly 0.1 seconds (with some tolerance) + assert time_between_first_and_second >= 0.05 + + asyncio.run(run_test()) + + +def test_default_handler(): + """Test behavior when custom handler is not set explicitly.""" 
+ call_count = 0 + + async def failing_func(): + nonlocal call_count + call_count += 1 + raise ValueError("Regular exception") + + # Don't set a custom handler, use default (returns False) + # Should not retry regular exceptions + async def run_test(): + with pytest.raises(ValueError): + await _error_wrapper(failing_func, retries=5) + + assert call_count == 1 + + asyncio.run(run_test()) + + +def test_add_retryable_error(): + """Test adding a custom exception to the retryable errors tuple.""" + call_count = 0 + + async def failing_func(): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise CustomRetryableError("Custom error") + return "success" + + # Add CustomRetryableError to the retryable errors + add_retryable_error(CustomRetryableError) + + # Should now be retried automatically without custom handler + async def run_test(): + result = await _error_wrapper(failing_func, retries=5) + assert result == "success" + assert call_count == 3 + + asyncio.run(run_test())