diff --git a/base_object/exceptions.py b/base_object/exceptions.py index 4f74dc5..77268b4 100644 --- a/base_object/exceptions.py +++ b/base_object/exceptions.py @@ -30,3 +30,12 @@ class NoTagsDynamicError(ValueError, AttributeError): ----- Follows convention used in scikit-learn when creating custom errors. """ + + +class InvalidTagError(ValueError): + """Exception class raised if an object's tag is invalid. + + Invalid tags are tag names that are not allowed or allowed tags that have + values that are not allowed based on the object's tag registry + (e.g., tag value is of wrong type or not one of allowable values). + """ diff --git a/base_object/registry/__init__.py b/base_object/registry/__init__.py new file mode 100644 index 0000000..0fcdf7e --- /dev/null +++ b/base_object/registry/__init__.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- +""":mod:`base_object.registry` provides a registry of package functionality.""" +from typing import List + +from base_object.registry._lookup import all_objects, package_metadata + +__all__: List[str] = ["package_metadata", "all_objects"] +__author__: List[str] = ["RNKuhns"] diff --git a/base_object/registry/_lookup.py b/base_object/registry/_lookup.py new file mode 100644 index 0000000..c07a898 --- /dev/null +++ b/base_object/registry/_lookup.py @@ -0,0 +1,256 @@ +# -*- coding: utf-8 -*- +"""Lookup package metadata.""" +import importlib +import inspect +import pkgutil +import sys +from pathlib import Path +from types import FunctionType, ModuleType +from typing import Any, Dict, List, Optional, Tuple, Type, Union + +from base_object.base import BaseObject + +# Conditionally import TypedDict based on Python version +if sys.version_info >= (3, 9): + from typing import TypedDict +else: + from typing_extensions import TypedDict + +__all__: List[str] = ["package_metadata", "all_objects"] +__author__: List[str] = ["RNKuhns"] + +# TOP_LEVEL_PACKAGE_NAME: str = "base_object" +SOURCE_ROOT: str = str(Path(__file__).parent.parent) +# MODULES_TO_IGNORE: Tuple[str, ...] = ("tests", "examples") +# BASE_CLASSES: Tuple[Type[BaseObject], ...] = (BaseObject,) + + +class ClassInfo(TypedDict): + """Type definitions for information on a module's classes.""" + + klass: Type + name: str + description: str + is_concrete_implementation: bool + is_base_class: bool + is_base_object: bool + authors: Optional[Union[List[str], str]] + module_name: str + + +class FunctionInfo(TypedDict): + """Type definitions for information on a module's functions.""" + + func: FunctionType + name: str + description: str + module_name: str + + +class ModuleInfo(TypedDict): + """Module information type definitions.""" + + path: str + name: str + classes: Dict[str, ClassInfo] + functions: Dict[str, FunctionInfo] + __all__: List[str] + authors: str + is_package: bool + contains_concrete_class_implementations: bool + contains_base_classes: bool + contains_base_objects: bool + + +def _is_non_public_module(module_name: str) -> bool: + """Determine if a module is non-public or not. + + Parameters + ---------- + module_name : str + Name of the module. + + Returns + ------- + is_non_public : bool + Whether the module is non-public or not. + """ + is_non_public: bool = "._" in module_name + return is_non_public + + +def _is_ignored_module( + module_name: str, modules_to_ignore: Union[List[str], Tuple[str]] = None +) -> bool: + """Determine if module is one of the ignored modules. + + Paramters + --------- + module_name : str + Name of the module. + modules_to_ignore : list[str] or tuple[str] + The modules that should be ignored when walking the package. + + Returns + ------- + is_ignored : bool + Whether the module is an ignrored module or not. + """ + is_ignored: bool + if modules_to_ignore is not None: + is_ignored = any(part in modules_to_ignore for part in module_name.split(".")) + else: + is_ignored = False + return is_ignored + + +def package_metadata( + path: str = SOURCE_ROOT, + top_level_package_name: Optional[str] = None, + recursive: bool = True, + prefix: str = "", + exclude_nonpublic_modules: bool = True, + modules_to_ignore: Union[List[str], Tuple[str]] = ("tests",), + package_base_classes: Tuple[Union[type, Tuple[Any, ...]]] = (BaseObject,), +) -> Dict[str, ModuleInfo]: + """Return a dictionary mapping all package modules to their metadata. + + Parameters + ---------- + path : str, default=None + String path that should be used as root to find any modules or submodules. + recursive : bool, default=True + Whether to recursively walk through submodules. + + - If True, then submoudles of submodules and so on are found. + - If False, then only first-level submoundes of `package` are found. + prefix : str, default="" + The prefix to use when returning module names on the `path`. + exclude_non_public_modules : bool, default=True + Whether to exclude nonpublic modules (modules where names start with + a leading underscore). + modules_to_ignore : list[str] or tuple[str], default=() + The modules that should be ignored when walking the package. + + Returns + ------- + module_info: dict + Dictionary mapping string submodule name (key) to a dictionary of the + submodules metadata. + """ + if not isinstance(path, str): + raise ValueError("Provide parameter `path` as a string .") + + module_info: Dict[str, ModuleInfo] = {} + for _loader, name, is_pkg in pkgutil.walk_packages(path=[path], prefix=prefix): + # Used to skip-over ignored modules and non-public modules + if _is_ignored_module(name, modules_to_ignore=modules_to_ignore) or ( + exclude_nonpublic_modules and _is_non_public_module(name) + ): + continue + + if isinstance(top_level_package_name, str): + full_name: str = top_level_package_name + "." + name + else: + full_name = name + + try: + module: ModuleType = importlib.import_module(full_name) + designed_imports: List[str] = getattr(module, "__all__", []) + authors: Union[str, List[str]] = getattr(module, "__author__", []) + if isinstance(authors, (list, tuple)): + authors = ", ".join(authors) + # Compile information on classes in the module + module_classes: Dict[str, ClassInfo] = {} + for name, klass in inspect.getmembers(module, inspect.isclass): + klass_authors = getattr(klass, "__author__", authors) + if isinstance(klass_authors, (list, tuple)): + klass_authors = ", ".join(klass_authors) + if klass.__module__ == module.__name__ or name in designed_imports: + module_classes[name] = { + "klass": klass, + "name": klass.__name__, + "description": klass.__doc__.split("\n")[0], + "is_concrete_implementation": ( + issubclass(klass, package_base_classes) + and klass not in package_base_classes + ), + "is_base_class": klass in package_base_classes, + "is_base_object": issubclass(klass, BaseObject), + "authors": klass_authors, + "module_name": module.__name__, + } + + module_functions: Dict[str, FunctionInfo] = {} + for name, func in inspect.getmembers(module, inspect.isfunction): + if func.__module__ == module.__name__ or name in designed_imports: + module_functions[name] = { + "func": func, + "name": func.__name__, + "description": func.__doc__.split("\n")[0], + "module_name": module.__name__, + } + + # Combine all the information on the module together + module_info[full_name] = { + "path": getattr(_loader, "path", ""), + "name": module.__name__, + "classes": module_classes, + "functions": module_functions, + "__all__": designed_imports, + "authors": authors, + "is_package": is_pkg, + "contains_concrete_class_implementations": False, + "contains_base_classes": any( + v["is_base_class"] for v in module_classes.values() + ), + "contains_base_objects": any( + v["is_base_object"] for v in module_classes.values() + ), + } + + except ImportError: + continue + + if recursive and is_pkg: + name_ending: str = name.split(".")[1] if "." in name else name + updated_path: str = "\\".join([path, name_ending]) + module_info.update(package_metadata(path=updated_path, prefix=name + ".")) + + return module_info + + +def all_objects( + path: Optional[str] = None, prefix: str = "", filter_class: object = None +) -> List[ClassInfo]: + """Find all classes inheritting from BaseObject. + + Returns + ------- + base_objects : dict + """ + if path is not None and not isinstance(path, str): + raise ValueError("Provide parameter `path` as a string .") + if not isinstance(prefix, str): + raise ValueError("Provide parameter `prefix` as a string.") + + registry: Dict[str, ModuleInfo] = package_metadata(path=path, prefix=prefix) + + # Filter registry to only include classes that contain BaseObjects + source_mods = { + k: v for k, v in registry.items() if v["contains_base_objects"] is True + } + + # Now filter the classes in the filter modules to just retain those + # classes inheritting from BaseObject + base_objects: List[ClassInfo] = [] + for mod_info in source_mods.values(): + for class_info in mod_info["classes"].values(): + issubclass(class_info["klass"], BaseObject) + if class_info["is_base_object"]: + if filter_class is not None and ( + issubclass(class_info["klass"], filter_class) + ): + base_objects.append(class_info) + + return base_objects diff --git a/base_object/registry/_tag_details.py b/base_object/registry/_tag_details.py new file mode 100644 index 0000000..9466530 --- /dev/null +++ b/base_object/registry/_tag_details.py @@ -0,0 +1,168 @@ +# -*- coding: utf-8 -*- +""":mod:`base_object.registry` provides a registry of package functionality.""" +from collections.abc import Iterable +from typing import Any, Dict, List, TypeVar + +try: + from typing import TypedDict +except ImportError: + from typing_extensions import TypedDict # type: ignore + +from base_object.base import BaseObject +from base_object.exceptions import InvalidTagError + +__all__: List[str] = [] +__author__: List[str] = ["RNKuhns"] + +T = TypeVar("T", bound="BaseObject") + + +class TagDetails(TypedDict): + """Defines types of included in tag registry details.""" + + allowed_types: Any + allowed_values: Any + can_be_dynamic: bool + + +ALLOWED_TAGS_BY_CLASS: Dict[T, Dict[str, TagDetails]] = { + BaseObject: { + "named_object_parameters": { + "allowed_types": Any, + "allowed_values": Any, + "can_be_dynamic": False, + }, + } +} + + +def check_tags_allowed( + obj: T, + allowed_tags: Dict[str, TagDetails], + dynamic: bool = False, + raise_error: bool = True, +) -> bool: + """Verify if an object's tags are in the allowed tags. + + The check first verifies that tag names are allowed. Then for allowed tag names, + it proceeds to check that the assigned tag value is an allowable type. If + this is true, then it verifies the tag value is in the allowable values. + + Parameters + ---------- + obj : BaseObject + Class or instance inheritting from BaseObject. + allowed_tags : Dict[str, TagDetails] + Dictionary mapping from a string tag name to a dictionary of tag details + that indicate the allowed tag types, allowed tag values and whether the + tag can be a dynamic tag. + dynamic : bool + Whether to check the object's tags or dynamic tags. + + - If True, check the object's dynamic tags. + - If False, check the object's static class tags. + + Returns + ------- + are_tags_allowed : bool + Whether the object's tags are in the registry of tags allowed for that object. + """ + if not ( + isinstance(obj, BaseObject) + or (hasattr(obj, "_tags") and hasattr(obj, "get_tags")) + ): + raise TypeError( + "Input `obj` is not compatible with the BaseObject tag interface." + f"Expected `obj` to be a BaseObject, but received {type(obj)}." + ) + + if dynamic: + obj_tags = obj.get_tags() + class_tags = obj.get_class_tags() + allowed_dynamic_tags = { + tag: details + for tag, details in allowed_tags.items() + if details["can_be_dynamic"] + } + # check that any tags that were dynamically updated are actually allowed + # to be dynamic tags + updated_tags = [tag for tag in obj_tags if obj_tags[tag] != class_tags[tag]] + tags_that_should_not_update = [ + tag for tag in updated_tags if tag not in allowed_dynamic_tags + ] + if len(tags_that_should_not_update) > 0: + _tags_str = "tag" if len(tags_that_should_not_update) == 1 else "tags" + tag_error_msg = ( + f"Found {len(tags_that_should_not_update)} invalid {_tags_str} in " + f"object {obj}.\n" + f"The invalid tags are:\n {''.join(tags_that_should_not_update)}" + ) + raise InvalidTagError(tag_error_msg) + + else: + obj_tags = obj.get_class_tags() + + invalid_tags: Dict[str, str] = {} + # Whether dynamic is True or False all tags that can't be dynamic need to be checked + for obj_tag, obj_tag_value in obj_tags.items(): + # Step 1: check if tag is in allowed tags + tag_allowed = obj_tag in allowed_tags + # Default type and values being not allowed to simplify check logic + tag_type_allowed: bool = False + tag_value_allowed: bool = False + # Step 2: If tag is allowed then check that tag value is an allowed type + if tag_allowed: + allowed_types = allowed_tags[obj_tag]["allowed_types"] + if allowed_types == Any: + tag_type_allowed = True + else: + # isinstance only accepts single type or tuple of types + if isinstance(allowed_types, Iterable): + allowed_types = tuple(allowed_types) + tag_type_allowed = isinstance(obj_tag_value, allowed_types) + + # Step 3: If tag value is allowed type see if it is one of allowed values + if tag_type_allowed: + allowed_values = allowed_tags[obj_tag]["allowed_values"] + if allowed_values == Any: + tag_value_allowed = True + else: + if isinstance(allowed_values, Iterable): + tag_value_allowed = obj_tag_value in allowed_values + else: + tag_value_allowed = obj_tag_value == allowed_values + + if not tag_value_allowed: + tag_msg = ( + f"Expected value of tag {obj_tag} to be one of allowed " + f"values {allowed_values}, but found {obj_tag_value}." + ) + else: + tag_msg = ( + f"Expected value of tag {obj_tag} to be one of the allowed types " + f"{allowed_types}, but the value was {obj_tag_value} with type " + f"{type(obj_tag_value)}." + ) + else: + tag_msg = ( + f"Found tag {obj_tag}, but the only allowable tags are " + f"{', '.join(allowed_tags.keys())}." + ) + + if not (tag_allowed and tag_type_allowed and tag_value_allowed): + invalid_tags[obj_tag] = tag_msg + + # If there are any invalid tags raise an error if user requested + if raise_error and invalid_tags: + _tags_str = "tag" if len(invalid_tags) == 1 else "tags" + invalid_tag_msgs = [ + tag + ": " + msg + "\n" for tag, msg in invalid_tags.items() + ] + tag_error_msg = ( + f"Found {len(invalid_tags)} invalid {_tags_str} in object {obj}.\n" + f"The invalid tags are:\n {''.join(invalid_tag_msgs)}" + ) + raise InvalidTagError(tag_error_msg) + else: + are_tags_allowed = len(invalid_tags) == 0 + return are_tags_allowed diff --git a/build_tools/linting_dependencies.yml b/build_tools/linting_dependencies.yml new file mode 100644 index 0000000..0ab4239 --- /dev/null +++ b/build_tools/linting_dependencies.yml @@ -0,0 +1,7 @@ +name: linting_dependencies + +dependencies: + - flake8 + - isort + - black + - pydocstyle diff --git a/build_tools/test_dependencies.yml b/build_tools/test_dependencies.yml new file mode 100644 index 0000000..0b4ec95 --- /dev/null +++ b/build_tools/test_dependencies.yml @@ -0,0 +1,6 @@ +name: test_dependencies + +dependencies: + - pytest + - coverage + - pytest-cov diff --git a/build_tools/test_env.yml b/build_tools/test_env.yml index c965ec6..ce45773 100644 --- a/build_tools/test_env.yml +++ b/build_tools/test_env.yml @@ -6,3 +6,4 @@ dependencies: - pytest-cov - numpy - safety + - typing_extensions diff --git a/pyproject.toml b/pyproject.toml index 54faac4..25f45d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ classifiers = [ "Programming Language :: Python :: 3.10", ] requires-python = ">=3.7,<3.11" -dependencies = [] +dependencies = ["typing-extensions"] [project.optional-dependencies] dev = [