Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
torch
torchvision
lightning-utilities
filelock <3.24 # v3.24.0 removed lock file auto-delete on Windows, breaking our cleanup logic
filelock
numpy
boto3
requests
Expand Down
23 changes: 15 additions & 8 deletions src/litdata/streaming/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,18 +539,25 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None:
if not os.path.exists(remote_filepath):
raise FileNotFoundError(f"The provided remote_path doesn't exist: {remote_filepath}")

lock_path = local_filepath + ".lock"
lock_acquired = False
with (
suppress(Timeout, FileNotFoundError),
FileLock(local_filepath + ".lock", timeout=1 if remote_filepath.endswith(_INDEX_FILENAME) else 0),
FileLock(lock_path, timeout=1 if remote_filepath.endswith(_INDEX_FILENAME) else 0),
):
if remote_filepath == local_filepath or os.path.exists(local_filepath):
return
# make an atomic operation to be safe
temp_file_path = local_filepath + ".tmp"
shutil.copy(remote_filepath, temp_file_path)
os.rename(temp_file_path, local_filepath)
lock_acquired = True
if not (remote_filepath == local_filepath or os.path.exists(local_filepath)):
# make an atomic operation to be safe
temp_file_path = local_filepath + ".tmp"
shutil.copy(remote_filepath, temp_file_path)
os.rename(temp_file_path, local_filepath)
# FileLock doesn't delete its lock file on release — we clean it up manually.
# This must happen after release (Windows can't delete open files) and after the
# work is done (on Linux, deleting an in-use lock file lets other processes lock
# on a new inode, bypassing mutual exclusion).
if lock_acquired:
with contextlib.suppress(Exception):
os.remove(local_filepath + ".lock")
os.remove(lock_path)


class HFDownloader(Downloader):
Expand Down
96 changes: 66 additions & 30 deletions src/litdata/streaming/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,55 +113,88 @@ def _decrement_local_lock(self, chunk_index: int) -> int:
chunk_filepath, _, _ = self._config[ChunkedIndex(index=-1, chunk_index=chunk_index)]

countpath = chunk_filepath + ".cnt"
with suppress(Timeout, FileNotFoundError), FileLock(countpath + ".lock", timeout=3):
if not os.path.exists(countpath):
return 0
with open(countpath) as count_f:
try:
curr_count = int(count_f.read().strip())
except Exception:
curr_count = 1
curr_count -= 1
if curr_count <= 0:
with suppress(FileNotFoundError, PermissionError):
os.remove(countpath)

with suppress(FileNotFoundError, PermissionError):
os.remove(countpath + ".lock")
lock_path = countpath + ".lock"
curr_count = 0
remove_lock = False
with suppress(Timeout, FileNotFoundError), FileLock(lock_path, timeout=3):
if os.path.exists(countpath):
with open(countpath) as count_f:
try:
curr_count = int(count_f.read().strip())
except Exception:
curr_count = 1
curr_count -= 1
if curr_count <= 0:
with suppress(FileNotFoundError, PermissionError):
os.remove(countpath)
remove_lock = True
else:
with open(countpath, "w+") as count_f:
logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "B"}))
count_f.write(str(curr_count))
logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "E"}))
else:
with open(countpath, "w+") as count_f:
logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "B"}))
count_f.write(str(curr_count))
logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "E"}))
return curr_count
return 0
remove_lock = True
# FileLock doesn't delete its lock file on release — we clean it up manually.
# This must happen after release (Windows can't delete open files) and after the
# work is done (on Linux, deleting an in-use lock file lets other processes lock
# on a new inode, bypassing mutual exclusion).
if remove_lock:
with suppress(FileNotFoundError, PermissionError):
os.remove(lock_path)
return curr_count

def _cleanup_download_locks(self, chunk_filepath: str, chunk_index: int) -> None:
    """Remove stale download lock files for a chunk.

    Download lock files (e.g. ``chunk-0-3.zstd.bin.lock``) are FileLock artifacts created
    during download. They are safe to remove once the chunk exists locally, regardless of
    the refcount held in ``.cnt`` files. Reference-count lock files (``.cnt.lock``) are
    excluded because they may still be needed by concurrent refcount operations.

    """
    # Match every lock derived from this chunk's basename (extension stripped),
    # then drop the refcount locks, which other processes may still be using.
    cache_dir, base_name = os.path.split(chunk_filepath)
    stem = os.path.splitext(base_name)[0]
    candidates = glob.glob(os.path.join(cache_dir, f"{stem}*.lock"))
    stale_locks = [path for path in candidates if not path.endswith(".cnt.lock")]

    if stale_locks:
        logger.debug(f"_apply_delete({chunk_index}): glob matched {stale_locks}")

    for lock_path in stale_locks:
        try:
            os.remove(lock_path)
        except (FileNotFoundError, PermissionError) as e:
            logger.warning(f"_apply_delete({chunk_index}): failed to remove {lock_path}: {e}")
        except Exception as e:
            logger.warning(f"_apply_delete({chunk_index}): unexpected error removing {lock_path}: {e}")
        else:
            logger.debug(f"_apply_delete({chunk_index}): removed {lock_path}")

def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None:
"""Inform the item loader of the chunk to delete."""
logger.debug(f"_apply_delete({chunk_index}, skip_lock={skip_lock}) called")
# TODO: Fix the can_delete method
can_delete_chunk = self._config.can_delete(chunk_index)
chunk_filepath, _, _ = self._config[ChunkedIndex(index=-1, chunk_index=chunk_index)]

if not skip_lock:
remaining_locks = self._remaining_locks(chunk_filepath)
if remaining_locks > 0: # Can't delete this, something has it
logger.debug(f"_apply_delete({chunk_index}): skipping data deletion, remaining_locks={remaining_locks}")
if _DEBUG:
print(f"Skip delete {chunk_filepath} by {self._rank or 0}, current lock count: {remaining_locks}")
self._cleanup_download_locks(chunk_filepath, chunk_index)
return

if _DEBUG:
with open(chunk_filepath + ".tmb", "w+") as tombstone_file:
tombstone_file.write(f"Deleted {chunk_filepath} by {self._rank or 0}. Debug: {can_delete_chunk}")

self._item_loader.delete(chunk_index, chunk_filepath)
try:
self._item_loader.delete(chunk_index, chunk_filepath)
except (FileNotFoundError, PermissionError) as e:
logger.debug(f"_apply_delete({chunk_index}): could not remove data file: {e}")

base_name = os.path.basename(chunk_filepath)
base_prefix = os.path.splitext(base_name)[0]
cache_dir = os.path.dirname(chunk_filepath)
pattern = os.path.join(cache_dir, f"{base_prefix}*.lock")
for lock_path in glob.glob(pattern):
with suppress(FileNotFoundError, PermissionError):
os.remove(lock_path)
self._cleanup_download_locks(chunk_filepath, chunk_index)

def stop(self) -> None:
"""Receive the list of the chunk indices to download for the current epoch."""
Expand Down Expand Up @@ -462,6 +495,10 @@ def read(self, index: ChunkedIndex) -> Any:
self._last_chunk_size = index.chunk_size

if index.is_last_index and self._prepare_thread:
# Close the item loader's handle on the last chunk before requesting
# deletion. On Windows, os.remove fails if the file is still open.
self._item_loader.close(self._last_chunk_index)

# inform the thread it is time to stop
self._prepare_thread._decrement_local_lock(index.chunk_index)
self._prepare_thread.delete([index.chunk_index])
Expand All @@ -475,7 +512,6 @@ def read(self, index: ChunkedIndex) -> Any:
"This can happen if the chunk files are too large."
)
self._prepare_thread = None
self._item_loader.close(self._last_chunk_index)
self._last_chunk_index = None
self._last_chunk_size = None
self._chunks_queued_for_download = False
Expand Down
17 changes: 14 additions & 3 deletions tests/streaming/test_lock_cleanup.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import shutil
from contextlib import suppress
Expand Down Expand Up @@ -40,7 +41,10 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None: # t


@pytest.mark.skipif(not _ZSTD_AVAILABLE, reason="Requires: ['zstd']")
def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir):
def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir, caplog):
# Enable debug logging so _apply_delete diagnostics appear in CI output
caplog.set_level(logging.DEBUG, logger="litdata.streaming.reader")

cache_dir = os.path.join(tmpdir, "cache_dir")
remote_dir = os.path.join(tmpdir, "remote_dir")
os.makedirs(cache_dir, exist_ok=True)
Expand Down Expand Up @@ -74,8 +78,15 @@ def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir):
chunk_idx = ChunkedIndex(index=idx[0], chunk_index=idx[1], is_last_index=(i == 9))
reader.read(chunk_idx)

# Diagnostic: dump all files and captured logs before asserting
all_files = sorted(os.listdir(cache_dir))
print(f"\n[DIAG] All files in cache_dir: {all_files}")
print("[DIAG] Captured log messages:")
for record in caplog.records:
print(f" [{record.levelname}] {record.message}")

# At the end, no chunk-related lock files should remain
leftover_locks = [f for f in os.listdir(cache_dir) if f.endswith(".lock") and f.startswith("chunk-")]
assert leftover_locks == []
leftover_locks = [f for f in all_files if f.endswith(".lock") and f.startswith("chunk-")]
assert leftover_locks == [], f"Leftover locks: {leftover_locks}, all files: {all_files}"
finally:
unregister_downloader(prefix)
Loading