From 0e75af37d4ac3f9ad30d2cac7ed08008b5e8abb1 Mon Sep 17 00:00:00 2001 From: David Edey Date: Wed, 18 Feb 2026 01:32:18 +0000 Subject: [PATCH 01/15] fix: pass None instead of empty string for uint64 version field in _create_dataset The SDK bump in #783 switched to DatasetServiceCreateDatasetBody and added `or ""` fallbacks for all fields. This is valid for string fields but version is a uint64 in protobuf, so "" causes a 400 Bad Request when saving to Lightning storage. Revert to passing None for version when no value is provided, matching the pre-bump behaviour. --- src/litdata/processing/utilities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/processing/utilities.py b/src/litdata/processing/utilities.py index 9d1f6741..83090a34 100644 --- a/src/litdata/processing/utilities.py +++ b/src/litdata/processing/utilities.py @@ -85,7 +85,7 @@ def _create_dataset( num_bytes_per_chunk=num_bytes_per_chunk or [], storage_dir=storage_dir, type=dataset_type, - version=version or "", + version=version, ), project_id=project_id, ) From 968ea25e97f2547e39349114b7788b88d55d922f Mon Sep 17 00:00:00 2001 From: tchaton Date: Wed, 18 Feb 2026 08:48:47 +0000 Subject: [PATCH 02/15] update --- setup.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index d391c1b7..a17e69fe 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,8 @@ from importlib.util import module_from_spec, spec_from_file_location from pathlib import Path -from pkg_resources import parse_requirements +from lightning_utilities.install.requirements import _parse_requirements +from packaging.requirements import Requirement from setuptools import find_packages, setup _PATH_ROOT = os.path.dirname(__file__) @@ -20,7 +21,7 @@ def _load_py_module(fname, pkg="litdata"): def _load_requirements(path_dir: str = _PATH_ROOT, file_name: str = "requirements.txt") -> list: - reqs = parse_requirements(open(os.path.join(path_dir, file_name)).readlines()) + reqs = _parse_requirements(open(os.path.join(path_dir, file_name)).readlines()) return list(map(str, reqs)) From 1ba2efb6aebdfdea062ccac52e11cb1c76247574 Mon Sep 17 00:00:00 2001 From: tchaton Date: Wed, 18 Feb 2026 08:50:45 +0000 Subject: [PATCH 03/15] update --- setup.py | 4 +- src/litdata/requirements.py | 192 ++++++++++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+), 2 deletions(-) create mode 100644 src/litdata/requirements.py diff --git a/setup.py b/setup.py index a17e69fe..34463288 100644 --- a/setup.py +++ b/setup.py @@ -4,10 +4,10 @@ from importlib.util import module_from_spec, spec_from_file_location from pathlib import Path -from lightning_utilities.install.requirements import _parse_requirements -from packaging.requirements import Requirement from setuptools import find_packages, setup +from litdata.requirements import _parse_requirements + _PATH_ROOT = os.path.dirname(__file__) _PATH_SOURCE = os.path.join(_PATH_ROOT, "src") _PATH_REQUIRES = os.path.join(_PATH_ROOT, "requirements") diff --git a/src/litdata/requirements.py b/src/litdata/requirements.py new file mode 100644 index 00000000..f473fd1a --- /dev/null +++ b/src/litdata/requirements.py @@ -0,0 +1,192 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# http://www.apache.org/licenses/LICENSE-2.0 +# +"""Utilities to parse and adjust Python requirements files. + +This module parses requirement lines while preserving inline comments and pip arguments and +supports relaxing version pins based on a chosen unfreeze strategy: "none", "major", or "all". + +""" + +import re +from collections.abc import Iterable, Iterator +from pathlib import Path +from typing import Any + +from packaging.requirements import Requirement +from packaging.version import Version + + +def _yield_lines(strs: str | Iterable[str]) -> Iterator[str]: + """Yield non-empty, non-comment lines from a string or iterable of strings. + + Adapted from pkg_resources.yield_lines. + + """ + if isinstance(strs, str): + strs = strs.splitlines() + for line in strs: + line = line.strip() + if line and not line.startswith("#"): + yield line + + +class _RequirementWithComment(Requirement): + """Requirement subclass that preserves an inline comment and optional pip argument. + + Attributes: + comment: The trailing comment captured from the requirement line (including the leading '# ...'). + pip_argument: A preceding pip argument line (e.g., ``"--extra-index-url ..."``) associated + with this requirement, or ``None`` if not provided. + strict: Whether the special marker ``"# strict"`` appears in ``comment`` (case-insensitive), in which case + upper bound adjustments are disabled. + + """ + + strict_string = "# strict" + + def __init__(self, *args: Any, comment: str = "", pip_argument: str | None = None, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.comment = comment + if not (pip_argument is None or pip_argument): # sanity check that it's not an empty str + raise RuntimeError(f"wrong pip argument: {pip_argument}") + self.pip_argument = pip_argument + self.strict = self.strict_string in comment.lower() + + def adjust(self, unfreeze: str) -> str: + """Adjust version specifiers according to the selected unfreeze strategy. + + The special marker ``"# strict"`` in the captured comment disables any relaxation of upper bounds. + + >>> _RequirementWithComment("arrow<=1.2.2,>=1.2.0", comment="# anything").adjust("none") + 'arrow<=1.2.2,>=1.2.0' + >>> _RequirementWithComment("arrow<=1.2.2,>=1.2.0", comment="# strict").adjust("none") + 'arrow<=1.2.2,>=1.2.0 # strict' + >>> _RequirementWithComment("arrow<=1.2.2,>=1.2.0", comment="# my name").adjust("all") + 'arrow>=1.2.0' + >>> _RequirementWithComment("arrow>=1.2.0, <=1.2.2", comment="# strict").adjust("all") + 'arrow<=1.2.2,>=1.2.0 # strict' + >>> _RequirementWithComment("arrow").adjust("all") + 'arrow' + >>> _RequirementWithComment("arrow>=1.2.0, <=1.2.2", comment="# cool").adjust("major") + 'arrow<2.0,>=1.2.0' + >>> _RequirementWithComment("arrow>=1.2.0, <=1.2.2", comment="# strict").adjust("major") + 'arrow<=1.2.2,>=1.2.0 # strict' + >>> _RequirementWithComment("arrow>=1.2.0").adjust("major") + 'arrow>=1.2.0' + >>> _RequirementWithComment("arrow").adjust("major") + 'arrow' + + Args: + unfreeze: One of: + - ``"none"``: Keep all version specifiers unchanged. + - ``"major"``: Relax the upper bound to the next major version (e.g., ``<2.0``). + - ``"all"``: Drop any upper bound constraint entirely. + + Returns: + The adjusted requirement string. If strict, the original string is returned with the strict marker appended. + + """ + out = str(self) + if self.strict: + return f"{out} {self.strict_string}" + if unfreeze == "major": + for spec in self.specifier: + if spec.operator in ("<", "<="): + major = Version(spec.version).major + # replace upper bound with major version increased by one + return out.replace(f"{spec.operator}{spec.version}", f"<{int(major) + 1}.0") + elif unfreeze == "all": + for spec in self.specifier: + if spec.operator in ("<", "<="): + # drop upper bound (with or without trailing/leading comma) + upper = f"{spec.operator}{spec.version}" + result = out.replace(f"{upper},", "").replace(f",{upper}", "") + if upper in result: + result = result.replace(upper, "") + return result.strip() + elif unfreeze != "none": + raise ValueError(f"Unexpected unfreeze: {unfreeze!r} value.") + return out + + +def _parse_requirements(strs: str | Iterable[str]) -> Iterator[_RequirementWithComment]: + r"""Adapted from ``pkg_resources.parse_requirements`` to include comments and pip arguments. + + Parses a sequence or string of requirement lines, preserving trailing comments and associating any + preceding pip arguments (``--...``) with the subsequent requirement. Lines starting with ``-r`` or + containing direct URLs are ignored. + + >>> txt = ['# ignored', '', 'this # is an', '--piparg', 'example', 'foo # strict', 'thing', '-r different/file.txt'] + >>> [r.adjust('none') for r in _parse_requirements(txt)] + ['this', 'example', 'foo # strict', 'thing'] + >>> txt = '\\n'.join(txt) + >>> [r.adjust('none') for r in _parse_requirements(txt)] + ['this', 'example', 'foo # strict', 'thing'] + + Args: + strs: Either an iterable of requirement lines or a single multi-line string. + + Yields: + _RequirementWithComment: Parsed requirement objects with preserved comment and pip argument. + + """ + lines = _yield_lines(strs) + pip_argument = None + for line in lines: + # Drop comments -- a hash without a space may be in a URL. + if " #" in line: + comment_pos = line.find(" #") + line, comment = line[:comment_pos], line[comment_pos:] + else: + comment = "" + # If there is a line continuation, drop it, and append the next line. + if line.endswith("\\"): + line = line[:-1].strip() + try: + line += next(lines) + except StopIteration: + return + # If there's a pip argument, save it + if line.startswith("--"): + pip_argument = line + continue + if line.startswith("-r "): + # linked requirement files are unsupported + continue + if "@" in line or re.search("https?://", line): + # skip lines with links like `pesq @ git+https://github.com/ludlows/python-pesq` + continue + yield _RequirementWithComment(line, comment=comment, pip_argument=pip_argument) + pip_argument = None + + +def load_requirements(path_dir: str, file_name: str = "base.txt", unfreeze: str = "all") -> list[str]: + """Load, parse, and optionally relax requirement specifiers from a file. + + >>> import os + >>> from lightning_utilities import _PROJECT_ROOT + >>> path_req = os.path.join(_PROJECT_ROOT, "requirements") + >>> load_requirements(path_req, "docs.txt", unfreeze="major") # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + ['sphinx<6.0,>=4.0', ...] + + Args: + path_dir: Directory containing the requirements file. + file_name: The requirements filename inside ``path_dir``. + unfreeze: Unfreeze strategy: ``"none"``, ``"major"``, or ``"all"`` (see ``_RequirementWithComment.adjust``). + + Returns: + A list of requirement strings adjusted according to ``unfreeze``. + + Raises: + ValueError: If ``unfreeze`` is not one of the supported options. + FileNotFoundError: If the composed path does not exist. + + """ + if unfreeze not in {"none", "major", "all"}: + raise ValueError(f'unsupported option of "{unfreeze}"') + path = Path(path_dir) / file_name + if not path.exists(): + raise FileNotFoundError(f"missing file for {(path_dir, file_name, path)}") + text = path.read_text() + return [req.adjust(unfreeze) for req in _parse_requirements(text)] From 1c82b89d45b7a6ac0b81875840e8ec2011a984d0 Mon Sep 17 00:00:00 2001 From: tchaton Date: Wed, 18 Feb 2026 08:52:32 +0000 Subject: [PATCH 04/15] update --- src/litdata/imports.py | 292 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 267 insertions(+), 25 deletions(-) diff --git a/src/litdata/imports.py b/src/litdata/imports.py index f8913786..d458fa2d 100644 --- a/src/litdata/imports.py +++ b/src/litdata/imports.py @@ -1,27 +1,32 @@ # Copyright The Lightning AI team. # Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# # http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +import functools import importlib +import os +import warnings +from collections.abc import Callable from functools import lru_cache +from importlib.metadata import PackageNotFoundError, distribution +from importlib.metadata import version as _version from importlib.util import find_spec -from typing import TypeVar +from types import ModuleType +from typing import Any, TypeVar -import pkg_resources +from packaging.requirements import Requirement +from packaging.version import InvalidVersion, Version from typing_extensions import ParamSpec T = TypeVar("T") P = ParamSpec("P") +try: + from importlib import metadata +except ImportError: + # Python < 3.8 + import importlib_metadata as metadata # type: ignore + @lru_cache def package_available(package_name: str) -> bool: @@ -61,6 +66,30 @@ def module_available(module_path: str) -> bool: return True +def compare_version(package: str, op: Callable, version: str, use_base_version: bool = False) -> bool: + """Compare package version with some requirements. + + >>> compare_version("torch", operator.ge, "0.1") + True + >>> compare_version("does_not_exist", operator.ge, "0.0") + False + + """ + try: + pkg = importlib.import_module(package) + except (ImportError, RuntimeError): + return False + try: + # Use importlib.metadata to infer version + pkg_version = Version(pkg.__version__) if hasattr(pkg, "__version__") else Version(_version(package)) + except (TypeError, PackageNotFoundError): + # this is mocked by Sphinx, so it should return True to generate all summaries + return True + if use_base_version: + pkg_version = Version(pkg_version.base_version) + return op(pkg_version, Version(version)) + + class RequirementCache: """Boolean-like class to check for requirement and module availability. @@ -80,42 +109,255 @@ class RequirementCache: True >>> bool(RequirementCache("unknown_package")) False + >>> bool(RequirementCache(module="torch.utils")) + True + >>> bool(RequirementCache(module="unknown_package")) + False + >>> bool(RequirementCache(module="unknown.module.path")) + False """ - def __init__(self, requirement: str, module: str | None = None) -> None: + def __init__(self, requirement: str | None = None, module: str | None = None) -> None: + if not (requirement or module): + raise ValueError("At least one arguments need to be set.") self.requirement = requirement self.module = module def _check_requirement(self) -> None: - if hasattr(self, "available"): - return + if not self.requirement: + raise ValueError("Requirement name is required.") try: - # first try the pkg_resources requirement - pkg_resources.require(self.requirement) - self.available = True - self.message = f"Requirement {self.requirement!r} met" - except Exception as ex: + req = Requirement(self.requirement) + pkg_version = Version(_version(req.name)) + self.available = req.specifier.contains(pkg_version, prereleases=True) and ( + not req.extras or self._check_extras_available(req) + ) + except (PackageNotFoundError, InvalidVersion) as ex: self.available = False - self.message = f"{ex.__class__.__name__}: {ex}.\n HINT: Try running `pip install -U {self.requirement!r}`" - requirement_contains_version_specifier = any(c in self.requirement for c in "=<>") - if not requirement_contains_version_specifier or self.module is not None: + self.message = f"{ex.__class__.__name__}: {ex}. HINT: Try running `pip install -U {self.requirement!r}`" + + if self.available: + self.message = f"Requirement {self.requirement!r} met" + else: + req_include_version = any(c in self.requirement for c in "=<>") + if not req_include_version or self.module is not None: module = self.requirement if self.module is None else self.module - # sometimes `pkg_resources.require()` fails but the module is importable + # Sometimes `importlib.metadata.version` fails but the module is importable self.available = module_available(module) if self.available: self.message = f"Module {module!r} available" + self.message = ( + f"Requirement {self.requirement!r} not met. HINT: Try running `pip install -U {self.requirement!r}`" + ) + + def _check_module(self) -> None: + if not self.module: + raise ValueError("Module name is required.") + self.available = module_available(self.module) + if self.available: + self.message = f"Module {self.module!r} available" + else: + self.message = f"Module not found: {self.module!r}. HINT: Try running `pip install -U {self.module}`" + + def _check_available(self) -> None: + if hasattr(self, "available"): + return + if self.requirement: + self._check_requirement() + if getattr(self, "available", True) and self.module: + self._check_module() + + def _check_extras_available(self, requirement: Requirement) -> bool: + if not requirement.extras: + return True + + extra_requirements = self._get_extra_requirements(requirement) + + if not extra_requirements: + # The specified extra is not found in the package metadata + return False + + # Verify each extra requirement is installed + for extra_req in extra_requirements: + try: + extra_dist = distribution(extra_req.name) + extra_installed_version = Version(extra_dist.version) + if extra_req.specifier and not extra_req.specifier.contains(extra_installed_version, prereleases=True): + return False + except importlib.metadata.PackageNotFoundError: + return False + + return True + + def _get_extra_requirements(self, requirement: Requirement) -> list[Requirement]: + dist = distribution(requirement.name) + # Get the required dependencies for the specified extras + extra_requirements = dist.metadata.get_all("Requires-Dist") or [] + return [Requirement(r) for r in extra_requirements if any(extra in r for extra in requirement.extras)] def __bool__(self) -> bool: """Format as bool.""" - self._check_requirement() + self._check_available() return self.available def __str__(self) -> str: """Format as string.""" - self._check_requirement() + self._check_available() return self.message def __repr__(self) -> str: """Format as string.""" return self.__str__() + + +class ModuleAvailableCache(RequirementCache): + """Boolean-like class for check of module availability. + + >>> ModuleAvailableCache("torch") + Module 'torch' available + >>> bool(ModuleAvailableCache("torch.utils")) + True + >>> bool(ModuleAvailableCache("unknown_package")) + False + >>> bool(ModuleAvailableCache("unknown.module.path")) + False + + """ + + def __init__(self, module: str) -> None: + warnings.warn( + "`ModuleAvailableCache` is a special case of `RequirementCache`." + " Please use `RequirementCache(module=...)` instead.", + DeprecationWarning, + stacklevel=4, + ) + super().__init__(module=module) + + +def get_dependency_min_version_spec(package_name: str, dependency_name: str) -> str: + """Return the minimum version specifier of a dependency of a package. + + >>> get_dependency_min_version_spec("pytorch-lightning==1.8.0", "jsonargparse") + '>=4.12.0' + + """ + dependencies = metadata.requires(package_name) or [] + for dep in dependencies: + dependency = Requirement(dep) + if dependency.name == dependency_name: + spec = [str(s) for s in dependency.specifier if str(s)[0] == ">"] + return spec[0] if spec else "" + raise ValueError( + "This is an internal error. Please file a GitHub issue with the error message. Dependency " + f"{dependency_name!r} not found in package {package_name!r}." + ) + + +class LazyModule(ModuleType): + """Proxy module that lazily imports the underlying module the first time it is actually used. + + Args: + module_name: the fully-qualified module name to import + callback: a callback function to call before importing the module + + """ + + def __init__(self, module_name: str, callback: Callable | None = None) -> None: + super().__init__(module_name) + self._module: Any = None + self._callback = callback + + def __getattr__(self, item: str) -> Any: + """Lazily import the underlying module and delegate attribute access to it.""" + if self._module is None: + self._import_module() + + return getattr(self._module, item) + + def __dir__(self) -> list[str]: + """Lazily import the underlying module and return its attributes for introspection (dir()).""" + if self._module is None: + self._import_module() + + return dir(self._module) + + def _import_module(self) -> None: + # Execute callback, if any + if self._callback is not None: + self._callback() + + # Actually import the module + self._module = importlib.import_module(self.__name__) + + # Update this object's dict so that attribute references are efficient + # (__getattr__ is only called on lookups that fail) + self.__dict__.update(self._module.__dict__) + + +def lazy_import(module_name: str, callback: Callable | None = None) -> LazyModule: + """Return a proxy module object that will lazily import the given module the first time it is used. + + Example usage: + + # Lazy version of `import tensorflow as tf` + tf = lazy_import("tensorflow") + # Other commands + # Now the module is loaded + tf.__version__ + + Args: + module_name: the fully-qualified module name to import + callback: a callback function to call before importing the module + + Returns: + a proxy module object that will be lazily imported when first used + + """ + return LazyModule(module_name, callback=callback) + + +def requires(*module_path_version: str, raise_exception: bool = True) -> Callable[[Callable[P, T]], Callable[P, T]]: + """Decorator to check optional dependencies at call time with a clear error/warning message. + + Args: + module_path_version: Python module paths (e.g., ``"torch.cuda"``) and/or pip-style requirements + (e.g., ``"torch>=2.0.0"``) to verify. + raise_exception: If ``True``, raise ``ModuleNotFoundError`` when requirements are not satisfied; + otherwise emit a warning and proceed to call the function. + + Example: + >>> @requires("libpath", raise_exception=bool(int(os.getenv("LIGHTING_TESTING", "0")))) + ... def my_cwd(): + ... from pathlib import Path + ... return Path(__file__).parent + + >>> class MyRndPower: + ... @requires("math", "random") + ... def __init__(self): + ... from math import pow + ... from random import randint + ... self._rnd = pow(randint(1, 9), 2) + + """ + + def decorator(func: Callable[P, T]) -> Callable[P, T]: + reqs = [ + ModuleAvailableCache(mod_ver) if "." in mod_ver else RequirementCache(mod_ver) + for mod_ver in module_path_version + ] + available = all(map(bool, reqs)) + + @functools.wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + if not available: + missing = os.linesep.join([repr(r) for r in reqs if not bool(r)]) + msg = f"Required dependencies not available: \n{missing}" + if raise_exception: + raise ModuleNotFoundError(msg) + warnings.warn(msg, stacklevel=2) + return func(*args, **kwargs) + + return wrapper + + return decorator From 623f2e91f4f0d338b0b9c1525a503d29346d4553 Mon Sep 17 00:00:00 2001 From: tchaton Date: Wed, 18 Feb 2026 08:54:15 +0000 Subject: [PATCH 05/15] update --- setup.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 34463288..bef4bbaa 100644 --- a/setup.py +++ b/setup.py @@ -6,8 +6,6 @@ from setuptools import find_packages, setup -from litdata.requirements import _parse_requirements - _PATH_ROOT = os.path.dirname(__file__) _PATH_SOURCE = os.path.join(_PATH_ROOT, "src") _PATH_REQUIRES = os.path.join(_PATH_ROOT, "requirements") @@ -20,8 +18,12 @@ def _load_py_module(fname, pkg="litdata"): return py +about = _load_py_module("__about__.py") +requirements_module = _load_py_module("requirements.py") + + def _load_requirements(path_dir: str = _PATH_ROOT, file_name: str = "requirements.txt") -> list: - reqs = _parse_requirements(open(os.path.join(path_dir, file_name)).readlines()) + reqs = requirements_module._parse_requirements(open(os.path.join(path_dir, file_name)).readlines()) return list(map(str, reqs)) From 24bc1c5f19f8af53e3917cd23c11804bbff2e3f1 Mon Sep 17 00:00:00 2001 From: David Edey Date: Wed, 18 Feb 2026 01:47:47 +0000 Subject: [PATCH 06/15] fix: Further fixes for uint64 defaults --- src/litdata/processing/utilities.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/litdata/processing/utilities.py b/src/litdata/processing/utilities.py index 83090a34..afd88399 100644 --- a/src/litdata/processing/utilities.py +++ b/src/litdata/processing/utilities.py @@ -68,6 +68,10 @@ def _create_dataset( client = LightningClient(retry=False) try: + # Some strings represent protobuf strings, some protouf uint64s + # The uint64s need a default of None so they're not added to the + # request body, which avoids a 400 response due to an invalid request. + # The protobuf string types can default to "" just fine. client.dataset_service_create_dataset( body=DatasetServiceCreateDatasetBody( cloud_space_id=(studio_id if lightning_app_id is None else None) or "", @@ -77,11 +81,11 @@ def _create_dataset( input_dir=input_dir or "", lightning_app_id=lightning_app_id or "", name=name or "", - size=size or "", - num_bytes=num_bytes or "", + size=size, + num_bytes=num_bytes, data_format=(str(data_format) if data_format else data_format) or "", compression=compression or "", - num_chunks=num_chunks or "", + num_chunks=num_chunks, num_bytes_per_chunk=num_bytes_per_chunk or [], storage_dir=storage_dir, type=dataset_type, From 32fc1ddcf5915e9294c2c5eaa2acca2f50f435a6 Mon Sep 17 00:00:00 2001 From: David Edey Date: Wed, 18 Feb 2026 11:21:43 +0000 Subject: [PATCH 07/15] fix: Fixes tests failures on windows --- src/litdata/streaming/downloader.py | 23 +++++++----- src/litdata/streaming/reader.py | 54 ++++++++++++++++++----------- 2 files changed, 48 insertions(+), 29 deletions(-) diff --git a/src/litdata/streaming/downloader.py b/src/litdata/streaming/downloader.py index 323b7626..53558bee 100644 --- a/src/litdata/streaming/downloader.py +++ b/src/litdata/streaming/downloader.py @@ -539,18 +539,25 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None: if not os.path.exists(remote_filepath): raise FileNotFoundError(f"The provided remote_path doesn't exist: {remote_filepath}") + lock_path = local_filepath + ".lock" + lock_acquired = False with ( suppress(Timeout, FileNotFoundError), - FileLock(local_filepath + ".lock", timeout=1 if remote_filepath.endswith(_INDEX_FILENAME) else 0), + FileLock(lock_path, timeout=1 if remote_filepath.endswith(_INDEX_FILENAME) else 0), ): - if remote_filepath == local_filepath or os.path.exists(local_filepath): - return - # make an atomic operation to be safe - temp_file_path = local_filepath + ".tmp" - shutil.copy(remote_filepath, temp_file_path) - os.rename(temp_file_path, local_filepath) + lock_acquired = True + if not (remote_filepath == local_filepath or os.path.exists(local_filepath)): + # make an atomic operation to be safe + temp_file_path = local_filepath + ".tmp" + shutil.copy(remote_filepath, temp_file_path) + os.rename(temp_file_path, local_filepath) + # FileLock doesn't delete its lock file on release — we clean it up manually. + # This must happen after release (Windows can't delete open files) and after the + # work is done (on Linux, deleting an in-use lock file lets other processes lock + # on a new inode, bypassing mutual exclusion). + if lock_acquired: with contextlib.suppress(Exception): - os.remove(local_filepath + ".lock") + os.remove(lock_path) class HFDownloader(Downloader): diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 4077f18b..9dcc17f2 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -113,28 +113,40 @@ def _decrement_local_lock(self, chunk_index: int) -> int: chunk_filepath, _, _ = self._config[ChunkedIndex(index=-1, chunk_index=chunk_index)] countpath = chunk_filepath + ".cnt" - with suppress(Timeout, FileNotFoundError), FileLock(countpath + ".lock", timeout=3): - if not os.path.exists(countpath): - return 0 - with open(countpath) as count_f: - try: - curr_count = int(count_f.read().strip()) - except Exception: - curr_count = 1 - curr_count -= 1 - if curr_count <= 0: - with suppress(FileNotFoundError, PermissionError): - os.remove(countpath) - - with suppress(FileNotFoundError, PermissionError): - os.remove(countpath + ".lock") + lock_path = countpath + ".lock" + curr_count = 0 + remove_lock = False + with suppress(Timeout, FileNotFoundError), FileLock(lock_path, timeout=3): + if os.path.exists(countpath): + with open(countpath) as count_f: + try: + curr_count = int(count_f.read().strip()) + except Exception: + curr_count = 1 + curr_count -= 1 + if curr_count <= 0: + with suppress(FileNotFoundError, PermissionError): + os.remove(countpath) + remove_lock = True + else: + with open(countpath, "w+") as count_f: + logger.debug( + _get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "B"}) + ) + count_f.write(str(curr_count)) + logger.debug( + _get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "E"}) + ) else: - with open(countpath, "w+") as count_f: - logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "B"})) - count_f.write(str(curr_count)) - logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "E"})) - return curr_count - return 0 + remove_lock = True + # FileLock doesn't delete its lock file on release — we clean it up manually. + # This must happen after release (Windows can't delete open files) and after the + # work is done (on Linux, deleting an in-use lock file lets other processes lock + # on a new inode, bypassing mutual exclusion). + if remove_lock: + with suppress(FileNotFoundError, PermissionError): + os.remove(lock_path) + return curr_count def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None: """Inform the item loader of the chunk to delete.""" From 65db4b52e4a7c06280698578421e7486d543ccef Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 18 Feb 2026 11:24:39 +0000 Subject: [PATCH 08/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/litdata/streaming/reader.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 9dcc17f2..4802c935 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -130,13 +130,9 @@ def _decrement_local_lock(self, chunk_index: int) -> int: remove_lock = True else: with open(countpath, "w+") as count_f: - logger.debug( - _get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "B"}) - ) + logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "B"})) count_f.write(str(curr_count)) - logger.debug( - _get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "E"}) - ) + logger.debug(_get_log_msg({"name": f"decrement_lock_{chunk_index}_to_{curr_count}", "ph": "E"})) else: remove_lock = True # FileLock doesn't delete its lock file on release — we clean it up manually. From 055729e1225c968d7e3a6c1a5306147f82fbcd64 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 18 Feb 2026 11:32:36 +0000 Subject: [PATCH 09/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/litdata/processing/utilities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/processing/utilities.py b/src/litdata/processing/utilities.py index afd88399..2eac0563 100644 --- a/src/litdata/processing/utilities.py +++ b/src/litdata/processing/utilities.py @@ -69,7 +69,7 @@ def _create_dataset( try: # Some strings represent protobuf strings, some protouf uint64s - # The uint64s need a default of None so they're not added to the + # The uint64s need a default of None so they're not added to the # request body, which avoids a 400 response due to an invalid request. # The protobuf string types can default to "" just fine. client.dataset_service_create_dataset( From e32475607198448ceda7ea65962044e808c7d5f5 Mon Sep 17 00:00:00 2001 From: David Edey Date: Wed, 18 Feb 2026 13:03:06 +0000 Subject: [PATCH 10/15] chore: Add debug logging temporarily to investigate issue --- src/litdata/streaming/reader.py | 13 +++++++++++-- tests/streaming/test_lock_cleanup.py | 17 ++++++++++++++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 4802c935..6a2b7b23 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -146,6 +146,7 @@ def _decrement_local_lock(self, chunk_index: int) -> int: def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None: """Inform the item loader of the chunk to delete.""" + logger.debug(f"_apply_delete({chunk_index}, skip_lock={skip_lock}) called") # TODO: Fix the can_delete method can_delete_chunk = self._config.can_delete(chunk_index) chunk_filepath, _, _ = self._config[ChunkedIndex(index=-1, chunk_index=chunk_index)] @@ -167,9 +168,17 @@ def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None: base_prefix = os.path.splitext(base_name)[0] cache_dir = os.path.dirname(chunk_filepath) pattern = os.path.join(cache_dir, f"{base_prefix}*.lock") - for lock_path in glob.glob(pattern): - with suppress(FileNotFoundError, PermissionError): + matched_locks = glob.glob(pattern) + if matched_locks: + logger.debug(f"_apply_delete({chunk_index}): glob matched {matched_locks}") + for lock_path in matched_locks: + try: os.remove(lock_path) + logger.debug(f"_apply_delete({chunk_index}): removed {lock_path}") + except (FileNotFoundError, PermissionError) as e: + logger.warning(f"_apply_delete({chunk_index}): failed to remove {lock_path}: {e}") + except Exception as e: + logger.warning(f"_apply_delete({chunk_index}): unexpected error removing {lock_path}: {e}") def stop(self) -> None: """Receive the list of the chunk indices to download for the current epoch.""" diff --git a/tests/streaming/test_lock_cleanup.py b/tests/streaming/test_lock_cleanup.py index c9deb5e1..28ef5729 100644 --- a/tests/streaming/test_lock_cleanup.py +++ b/tests/streaming/test_lock_cleanup.py @@ -1,3 +1,4 @@ +import logging import os import shutil from contextlib import suppress @@ -40,7 +41,10 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None: # t @pytest.mark.skipif(not _ZSTD_AVAILABLE, reason="Requires: ['zstd']") -def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir): +def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir, caplog): + # Enable debug logging so _apply_delete diagnostics appear in CI output + caplog.set_level(logging.DEBUG, logger="litdata.streaming.reader") + cache_dir = os.path.join(tmpdir, "cache_dir") remote_dir = os.path.join(tmpdir, "remote_dir") os.makedirs(cache_dir, exist_ok=True) @@ -74,8 +78,15 @@ def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir): chunk_idx = ChunkedIndex(index=idx[0], chunk_index=idx[1], is_last_index=(i == 9)) reader.read(chunk_idx) + # Diagnostic: dump all files and captured logs before asserting + all_files = sorted(os.listdir(cache_dir)) + print(f"\n[DIAG] All files in cache_dir: {all_files}") + print(f"[DIAG] Captured log messages:") + for record in caplog.records: + print(f" [{record.levelname}] {record.message}") + # At the end, no chunk-related lock files should remain - leftover_locks = [f for f in os.listdir(cache_dir) if f.endswith(".lock") and f.startswith("chunk-")] - assert leftover_locks == [] + leftover_locks = [f for f in all_files if f.endswith(".lock") and f.startswith("chunk-")] + assert leftover_locks == [], f"Leftover locks: {leftover_locks}, all files: {all_files}" finally: unregister_downloader(prefix) From 156d88da6cb1d676f568fd36ef398b4d0d6f9ff8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 18 Feb 2026 13:03:54 +0000 Subject: [PATCH 11/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/streaming/test_lock_cleanup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/streaming/test_lock_cleanup.py b/tests/streaming/test_lock_cleanup.py index 28ef5729..bfce226f 100644 --- a/tests/streaming/test_lock_cleanup.py +++ b/tests/streaming/test_lock_cleanup.py @@ -81,7 +81,7 @@ def test_reader_lock_cleanup_with_nonlocal_like_downloader(tmpdir, caplog): # Diagnostic: dump all files and captured logs before asserting all_files = sorted(os.listdir(cache_dir)) print(f"\n[DIAG] All files in cache_dir: {all_files}") - print(f"[DIAG] Captured log messages:") + print("[DIAG] Captured log messages:") for record in caplog.records: print(f" [{record.levelname}] {record.message}") From 1d3f27baad1a65248f1924d692a6deebd07423a3 Mon Sep 17 00:00:00 2001 From: David Edey Date: Wed, 18 Feb 2026 13:34:51 +0000 Subject: [PATCH 12/15] fix: Clean up download lock files even when chunk refcount is nonzero When _apply_delete returns early due to remaining_locks > 0 (a race between decrement and delete on the last chunk), download lock files like chunk-0-3.zstd.bin.lock were left behind. Extract lock cleanup into _cleanup_download_locks (excluding .cnt.lock files still needed for refcounting) and call it in both code paths. Fixes leftover chunk-*.zstd.bin.lock files on Windows CI. --- src/litdata/streaming/reader.py | 46 +++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 6a2b7b23..8a422bd4 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -144,6 +144,31 @@ def _decrement_local_lock(self, chunk_index: int) -> int: os.remove(lock_path) return curr_count + def _cleanup_download_locks(self, chunk_filepath: str, chunk_index: int) -> None: + """Remove stale download lock files for a chunk. + + Download lock files (e.g. ``chunk-0-3.zstd.bin.lock``) are FileLock artifacts created + during download. They are safe to remove once the chunk exists locally, regardless of + the refcount held in ``.cnt`` files. Reference-count lock files (``.cnt.lock``) are + excluded because they may still be needed by concurrent refcount operations. + + """ + base_name = os.path.basename(chunk_filepath) + base_prefix = os.path.splitext(base_name)[0] + cache_dir = os.path.dirname(chunk_filepath) + pattern = os.path.join(cache_dir, f"{base_prefix}*.lock") + matched_locks = [p for p in glob.glob(pattern) if not p.endswith(".cnt.lock")] + if matched_locks: + logger.debug(f"_apply_delete({chunk_index}): glob matched {matched_locks}") + for lock_path in matched_locks: + try: + os.remove(lock_path) + logger.debug(f"_apply_delete({chunk_index}): removed {lock_path}") + except (FileNotFoundError, PermissionError) as e: + logger.warning(f"_apply_delete({chunk_index}): failed to remove {lock_path}: {e}") + except Exception as e: + logger.warning(f"_apply_delete({chunk_index}): unexpected error removing {lock_path}: {e}") + def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None: """Inform the item loader of the chunk to delete.""" logger.debug(f"_apply_delete({chunk_index}, skip_lock={skip_lock}) called") @@ -154,8 +179,12 @@ def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None: if not skip_lock: remaining_locks = self._remaining_locks(chunk_filepath) if remaining_locks > 0: # Can't delete this, something has it + logger.debug( + f"_apply_delete({chunk_index}): skipping data deletion, remaining_locks={remaining_locks}" + ) if _DEBUG: print(f"Skip delete {chunk_filepath} by {self._rank or 0}, current lock count: {remaining_locks}") + self._cleanup_download_locks(chunk_filepath, chunk_index) return if _DEBUG: @@ -163,22 +192,7 @@ def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None: tombstone_file.write(f"Deleted {chunk_filepath} by {self._rank or 0}. Debug: {can_delete_chunk}") self._item_loader.delete(chunk_index, chunk_filepath) - - base_name = os.path.basename(chunk_filepath) - base_prefix = os.path.splitext(base_name)[0] - cache_dir = os.path.dirname(chunk_filepath) - pattern = os.path.join(cache_dir, f"{base_prefix}*.lock") - matched_locks = glob.glob(pattern) - if matched_locks: - logger.debug(f"_apply_delete({chunk_index}): glob matched {matched_locks}") - for lock_path in matched_locks: - try: - os.remove(lock_path) - logger.debug(f"_apply_delete({chunk_index}): removed {lock_path}") - except (FileNotFoundError, PermissionError) as e: - logger.warning(f"_apply_delete({chunk_index}): failed to remove {lock_path}: {e}") - except Exception as e: - logger.warning(f"_apply_delete({chunk_index}): unexpected error removing {lock_path}: {e}") + self._cleanup_download_locks(chunk_filepath, chunk_index) def stop(self) -> None: """Receive the list of the chunk indices to download for the current epoch.""" From 3b9c1b9d3a30d5710a2ddfcd6a8ea4d1621d7402 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 18 Feb 2026 13:36:49 +0000 Subject: [PATCH 13/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/litdata/streaming/reader.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 8a422bd4..51bce6e7 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -179,9 +179,7 @@ def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None: if not skip_lock: remaining_locks = self._remaining_locks(chunk_filepath) if remaining_locks > 0: # Can't delete this, something has it - logger.debug( - f"_apply_delete({chunk_index}): skipping data deletion, remaining_locks={remaining_locks}" - ) + logger.debug(f"_apply_delete({chunk_index}): skipping data deletion, remaining_locks={remaining_locks}") if _DEBUG: print(f"Skip delete {chunk_filepath} by {self._rank or 0}, current lock count: {remaining_locks}") self._cleanup_download_locks(chunk_filepath, chunk_index) From 8d5001d0278d63acfcb331d5b617d101b68406da Mon Sep 17 00:00:00 2001 From: David Edey Date: Wed, 18 Feb 2026 16:26:39 +0000 Subject: [PATCH 14/15] fix: Close last chunk's file handle before requesting its deletion PyTreeLoader._open_handle is only closed when a subsequent chunk is loaded, which never happens for the final chunk. On Windows this causes os.remove to fail with PermissionError in the prepare thread. Move the item_loader.close() call before the delete request so the file handle is released and the data file can actually be removed. --- src/litdata/streaming/reader.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 51bce6e7..ab2ea856 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -189,7 +189,11 @@ def _apply_delete(self, chunk_index: int, skip_lock: bool = False) -> None: with open(chunk_filepath + ".tmb", "w+") as tombstone_file: tombstone_file.write(f"Deleted {chunk_filepath} by {self._rank or 0}. Debug: {can_delete_chunk}") - self._item_loader.delete(chunk_index, chunk_filepath) + try: + self._item_loader.delete(chunk_index, chunk_filepath) + except (FileNotFoundError, PermissionError) as e: + logger.debug(f"_apply_delete({chunk_index}): could not remove data file: {e}") + self._cleanup_download_locks(chunk_filepath, chunk_index) def stop(self) -> None: @@ -491,6 +495,10 @@ def read(self, index: ChunkedIndex) -> Any: self._last_chunk_size = index.chunk_size if index.is_last_index and self._prepare_thread: + # Close the item loader's handle on the last chunk before requesting + # deletion. On Windows, os.remove fails if the file is still open. + self._item_loader.close(self._last_chunk_index) + # inform the thread it is time to stop self._prepare_thread._decrement_local_lock(index.chunk_index) self._prepare_thread.delete([index.chunk_index]) @@ -504,7 +512,6 @@ def read(self, index: ChunkedIndex) -> Any: "This can happen if the chunk files are too large." ) self._prepare_thread = None - self._item_loader.close(self._last_chunk_index) self._last_chunk_index = None self._last_chunk_size = None self._chunks_queued_for_download = False From 5f9304a2d886b0981266a106274007f8d328ca69 Mon Sep 17 00:00:00 2001 From: David Edey Date: Wed, 18 Feb 2026 16:31:15 +0000 Subject: [PATCH 15/15] tweak: Remove pin of filelock --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 9146aa30..b2c75416 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ torch torchvision lightning-utilities -filelock <3.24 # v3.24.0 removed lock file auto-delete on Windows, breaking our cleanup logic +filelock numpy boto3 requests