diff --git a/requirements.txt b/requirements.txt index 9146aa30..b2c75416 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ torch torchvision lightning-utilities -filelock <3.24 # v3.24.0 removed lock file auto-delete on Windows, breaking our cleanup logic +filelock numpy boto3 requests diff --git a/src/litdata/streaming/downloader.py b/src/litdata/streaming/downloader.py index 323b7626..53558bee 100644 --- a/src/litdata/streaming/downloader.py +++ b/src/litdata/streaming/downloader.py @@ -539,18 +539,25 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None: if not os.path.exists(remote_filepath): raise FileNotFoundError(f"The provided remote_path doesn't exist: {remote_filepath}") + lock_path = local_filepath + ".lock" + lock_acquired = False with ( suppress(Timeout, FileNotFoundError), - FileLock(local_filepath + ".lock", timeout=1 if remote_filepath.endswith(_INDEX_FILENAME) else 0), + FileLock(lock_path, timeout=1 if remote_filepath.endswith(_INDEX_FILENAME) else 0), ): - if remote_filepath == local_filepath or os.path.exists(local_filepath): - return - # make an atomic operation to be safe - temp_file_path = local_filepath + ".tmp" - shutil.copy(remote_filepath, temp_file_path) - os.rename(temp_file_path, local_filepath) + lock_acquired = True + if not (remote_filepath == local_filepath or os.path.exists(local_filepath)): + # make an atomic operation to be safe + temp_file_path = local_filepath + ".tmp" + shutil.copy(remote_filepath, temp_file_path) + os.rename(temp_file_path, local_filepath) + # FileLock doesn't delete its lock file on release — we clean it up manually. + # This must happen after release (Windows can't delete open files) and after the + # work is done (on Linux, deleting an in-use lock file lets other processes lock + # on a new inode, bypassing mutual exclusion). + if lock_acquired: with contextlib.suppress(Exception): - os.remove(local_filepath + ".lock") + os.remove(lock_path) class HFDownloader(Downloader): diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 4077f18b..ab2ea856 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -113,31 +113,65 @@ def _decrement_local_lock(self, chunk_index: int) -> int: chunk_filepath, _, _ = self._config[ChunkedIndex(index=-1, chunk_index=chunk_index)] countpath = chunk_filepath + ".cnt" - with suppress(Timeout, FileNotFoundError), FileLock(countpath + ".lock", timeout=3): - if not os.path.exists(countpath): - return 0 - with open(countpath) as count_f: - try: - curr_count = int(count_f.read().strip()) - except Exception: - curr_count = 1 - curr_count -= 1 - if curr_count <= 0: - with suppress(FileNotFoundError, PermissionError): - os.remove(countpath) - - with suppress(FileNotFoundError, PermissionError): - os.remove(countpath + ".lock") + lock_path = countpath + ".lock" + curr_count = 0 + remove_lock = False + with suppress(Timeout, FileNotFoundError), FileLock(lock_path, timeout=3): + if os.path.exists(countpath): + with open(countpath) as count_f: + try: + curr_count = int(count_f.read().strip()) + except Exception: + curr_count = 1 + curr_count -= 1 + if curr_count <= 0: + with suppress(FileNotFoundError, PermissionError): + os.remove(countpath) + remove_lock = True + else: + with open(countpath, "w+") as count_f: + logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "B"})) + count_f.write(str(curr_count)) + logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "E"})) else: - with open(countpath, "w+") as count_f: - logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "B"})) - count_f.write(str(curr_count)) - logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "E"})) - return curr_count - return 0 + remove_lock = True + # FileLock doesn't delete its lock file on release — we clean it up manually. + # This must happen after release (Windows can't delete open files) and after the + # work is done (on Linux, deleting an in-use lock file lets other processes lock + # on a new inode, bypassing mutual exclusion). + if remove_lock: + with suppress(FileNotFoundError, PermissionError): + os.remove(lock_path) + return curr_count + + def _cleanup_download_locks(self, chunk_filepath: str, chunk_index: int) -> None: + """Remove stale download lock files for a chunk. + + Download lock files (e.g. ``chunk-0-3.zstd.bin.lock``) are FileLock artifacts created + during download. They are safe to remove once the chunk exists locally, regardless of + the refcount held in ``.cnt`` files. Reference-count lock files (``.cnt.lock``) are + excluded because they may still be needed by concurrent refcount operations. + + """ + base_name = os.path.basename(chunk_filepath) + base_prefix = os.path.splitext(base_name)[0] + cache_dir = os.path.dirname(chunk_filepath) + pattern = os.path.join(cache_dir, f"{base_prefix}*.lock") + matched_locks = [p for p in glob.glob(pattern) if not p.endswith(".cnt.lock")] + if matched_locks: + logger.debug(f"_apply_delete({chunk_index}): glob matched {matched_locks}") + for lock_path in matched_locks: + try: + os.remove(lock_path) + logger.debug(f"_apply_delete({chunk_index}): removed {lock_path}") + except (FileNotFoundError, PermissionError) as e: + logger.warning(f"_apply_delete({chunk_index}): failed to remove {lock_path}: {e}") + except Exception as e: + logger.warning(f"_apply_delete({chunk_index}): unexpected error removing {lock_path}: {e}") def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None: """Inform the item loader of the chunk to delete.""" + logger.debug(f"_apply_delete({chunk_index}, skip_lock={skip_lock}) called") # TODO: Fix the can_delete method can_delete_chunk = self._config.can_delete(chunk_index) chunk_filepath, _, _ = self._config[ChunkedIndex(index=-1, chunk_index=chunk_index)] @@ -145,23 +179,22 @@ def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None: if not skip_lock: remaining_locks = self._remaining_locks(chunk_filepath) if remaining_locks > 0: # Can't delete this, something has it + logger.debug(f"_apply_delete({chunk_index}): skipping data deletion, remaining_locks={remaining_locks}") if _DEBUG: print(f"Skip delete {chunk_filepath} by {self._rank or 0}, current lock count: {remaining_locks}") + self._cleanup_download_locks(chunk_filepath, chunk_index) return if _DEBUG: with open(chunk_filepath + ".tmb", "w+") as tombstone_file: tombstone_file.write(f"Deleted {chunk_filepath} by {self._rank or 0}. Debug: {can_delete_chunk}") - self._item_loader.delete(chunk_index, chunk_filepath) + try: + self._item_loader.delete(chunk_index, chunk_filepath) + except (FileNotFoundError, PermissionError) as e: + logger.debug(f"_apply_delete({chunk_index}): could not remove data file: {e}") - base_name = os.path.basename(chunk_filepath) - base_prefix = os.path.splitext(base_name)[0] - cache_dir = os.path.dirname(chunk_filepath) - pattern = os.path.join(cache_dir, f"{base_prefix}*.lock") - for lock_path in glob.glob(pattern): - with suppress(FileNotFoundError, PermissionError): - os.remove(lock_path) + self._cleanup_download_locks(chunk_filepath, chunk_index) def stop(self) -> None: """Receive the list of the chunk indices to download for the current epoch.""" @@ -462,6 +495,10 @@ def read(self, index: ChunkedIndex) -> Any: self._last_chunk_size = index.chunk_size if index.is_last_index and self._prepare_thread: + # Close the item loader's handle on the last chunk before requesting + # deletion. On Windows, os.remove fails if the file is still open. + self._item_loader.close(self._last_chunk_index) + # inform the thread it is time to stop self._prepare_thread._decrement_local_lock(index.chunk_index) self._prepare_thread.delete([index.chunk_index]) @@ -475,7 +512,6 @@ def read(self, index: ChunkedIndex) -> Any: "This can happen if the chunk files are too large." ) self._prepare_thread = None - self._item_loader.close(self._last_chunk_index) self._last_chunk_index = None self._last_chunk_size = None self._chunks_queued_for_download = False diff --git a/tests/streaming/test_lock_cleanup.py b/tests/streaming/test_lock_cleanup.py index c9deb5e1..bfce226f 100644 --- a/tests/streaming/test_lock_cleanup.py +++ b/tests/streaming/test_lock_cleanup.py @@ -1,3 +1,4 @@ +import logging import os import shutil from contextlib import suppress @@ -40,7 +41,10 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None: # t @pytest.mark.skipif(not _ZSTD_AVAILABLE, reason="Requires: ['zstd']") -def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir): +def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir, caplog): + # Enable debug logging so _apply_delete diagnostics appear in CI output + caplog.set_level(logging.DEBUG, logger="litdata.streaming.reader") + cache_dir = os.path.join(tmpdir, "cache_dir") remote_dir = os.path.join(tmpdir, "remote_dir") os.makedirs(cache_dir, exist_ok=True) @@ -74,8 +78,15 @@ def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir): chunk_idx = ChunkedIndex(index=idx[0], chunk_index=idx[1], is_last_index=(i == 9)) reader.read(chunk_idx) + # Diagnostic: dump all files and captured logs before asserting + all_files = sorted(os.listdir(cache_dir)) + print(f"\n[DIAG] All files in cache_dir: {all_files}") + print("[DIAG] Captured log messages:") + for record in caplog.records: + print(f" [{record.levelname}] {record.message}") + # At the end, no chunk-related lock files should remain - leftover_locks = [f for f in os.listdir(cache_dir) if f.endswith(".lock") and f.startswith("chunk-")] - assert leftover_locks == [] + leftover_locks = [f for f in all_files if f.endswith(".lock") and f.startswith("chunk-")] + assert leftover_locks == [], f"Leftover locks: {leftover_locks}, all files: {all_files}" finally: unregister_downloader(prefix)