|
| 1 | +import functools |
1 | 2 | import hashlib |
2 | 3 | import os |
3 | 4 | import re |
| 5 | +import shutil |
| 6 | +import subprocess # nosec |
| 7 | +import sys |
4 | 8 | import warnings |
5 | 9 | from pathlib import Path |
| 10 | +from typing import Callable, Iterator, List, Set, Tuple, Union |
| 11 | + |
| 12 | +Filepath = Union[str, os.PathLike] |
6 | 13 |
|
7 | 14 |
|
8 | 15 | # This function was created to ensure the same conversion is used throughout |
@@ -41,22 +48,128 @@ def is_in_dir(child: Path, parent: Path, strict: bool = False) -> bool: |
41 | 48 | return False |
42 | 49 |
|
43 | 50 |
|
44 | | -def link_tree(src: Path, dest: Path) -> None: |
def _run_command(*args):
    """Execute an external program and hand back what it wrote to stdout.

    :param args: The program to run followed by its arguments.
    :return: The captured standard output, decoded to a string.
    :raises subprocess.CalledProcessError: If the program exits with a
        non-zero return code.
    """
    # check_output captures stdout and raises on a non-zero exit code.
    # Passing an encoding makes it return a string instead of bytes.
    return subprocess.check_output(  # nosec
        args, encoding=sys.getdefaultencoding())
| 59 | + |
| 60 | + |
def git_root(path: Filepath) -> str:
    """Ask git for the top-level directory of the work tree containing path.

    :param path: A location inside a git work tree.
    :return: The output of ``git rev-parse --show-toplevel`` without
        surrounding whitespace.
    """
    command = ("git", "-C", os.fspath(path), "rev-parse", "--show-toplevel")
    # git ends its answer with a newline, which is not part of the path.
    return _run_command(*command).strip()
| 65 | + |
| 66 | + |
def git_ls_files(path: Filepath) -> List[str]:
    """List the files git has registered below path.

    :param path: A directory inside a git work tree. git is run from this
        directory, so the reported paths are relative to it.
    :return: The paths reported by ``git ls-files``, including files inside
        submodules. An empty list when git reports no files.
    """
    output = _run_command("git", "-C", os.fspath(path), "ls-files",
                          # Make sure submodules are included.
                          "--recurse-submodules",
                          # NUL-terminate the entries. Without this git
                          # quotes and escapes paths containing unusual
                          # characters, so they no longer match the names
                          # on disk.
                          "-z")
    # Every entry ends with a NUL, so splitting leaves an empty string at the
    # end (and only an empty string when nothing is tracked). Drop those so
    # an empty repository yields an empty list rather than [''].
    return [tracked for tracked in output.split("\0") if tracked]
| 73 | + |
| 74 | + |
def _duplicate_tree(src: Filepath, dest: Filepath
                    ) -> Iterator[Tuple[str, str, bool]]:
    """Traverses src and for each file or directory yields a path to it,
    its destination, and whether it is a directory.

    A directory is always yielded before the entries it contains. Entries
    that are neither directories, files nor symlinks are skipped with a
    warning.

    :param src: The directory to traverse.
    :param dest: The directory the destination paths are built under.
    :return: An iterator of (source path, destination path, is directory).
    """
    # The with-statement closes the directory handle even when the consumer
    # stops iterating halfway, for example because creating a file failed.
    with os.scandir(src) as entries:
        for entry in entries:
            entry_dest = os.path.join(dest, entry.name)
            if entry.is_dir():
                yield entry.path, entry_dest, True
                yield from _duplicate_tree(entry.path, entry_dest)
            elif entry.is_file() or entry.is_symlink():
                yield entry.path, entry_dest, False
            else:
                warnings.warn(f"Unsupported filetype for copying. "
                              f"Skipping {entry.path}")
| 90 | + |
| 91 | + |
def _duplicate_git_tree(src: Filepath, dest: Filepath
                        ) -> Iterator[Tuple[str, str, bool]]:
    """Walks over the files git has registered in src. Yields for every file,
    and for every directory leading up to a file, the path in src, the
    matching path in dest and a flag telling whether it is a directory."""
    # Directories that were handed out already. os.path.dirname returns ''
    # for a path without a directory part, which stands for src itself and
    # never has to be yielded.
    seen_dirs: Set[str] = {''}
    for rel_path in git_ls_files(src):
        # git ls-files only reports files. Collect the directories above this
        # file that were not yielded yet, walking upwards from the deepest
        # one until a known directory is reached.
        pending: List[str] = []
        directory = os.path.dirname(rel_path)
        while directory not in seen_dirs:
            seen_dirs.add(directory)
            pending.append(directory)
            directory = os.path.dirname(directory)

        # The deepest directory was collected first, so taking from the end
        # yields every directory before the directories inside it.
        while pending:
            rel_dir = pending.pop()
            yield os.path.join(src, rel_dir), os.path.join(dest, rel_dir), True

        # All directories above the file have been yielded by now.
        yield os.path.join(src, rel_path), os.path.join(dest, rel_path), False
| 126 | + |
| 127 | + |
def _symlink_to_absolute(src_path: str, dest_path: str) -> None:
    """Create a file symlink at dest_path that points to src_path."""
    # A relative link target is resolved against the directory containing the
    # link, not against the current working directory, so a relative source
    # path would produce a dangling link. Absolute paths are left untouched.
    if not os.path.isabs(src_path):
        src_path = os.path.abspath(src_path)
    os.symlink(src_path, dest_path, target_is_directory=False)


def duplicate_tree(src: Filepath, dest: Filepath,
                   symlink: bool = False,
                   git_aware: bool = False) -> None:
    """
    Duplicates a filetree
    :param src: The source directory
    :param dest: The destination directory. It is created by this function
                 and must not exist yet.
    :param symlink: Create symlinks instead of copying the files.
    :param git_aware: Only copy/symlink files registered by git.
    :raises NotADirectoryError: When symlink or git_aware is set and src is
                                not a directory.
    :raises FileExistsError: When dest already exists.
    """
    if not symlink and not git_aware:
        # A plain copy of everything is exactly what shutil.copytree does.
        shutil.copytree(src, dest)
        return

    if not os.path.isdir(src):
        # shutil.copytree also throws a NotADirectoryError
        raise NotADirectoryError(f"Not a directory: '{src}'")

    if git_aware:
        path_iter = _duplicate_git_tree(src, dest)
    else:
        path_iter = _duplicate_tree(src, dest)

    copy: Callable[[str, str], object]
    if symlink:
        copy = _symlink_to_absolute
    else:
        copy = shutil.copy2  # Preserves metadata, also used by shutil.copytree

    os.makedirs(dest, exist_ok=False)
    # The iterators yield a directory before anything inside it, so the
    # parent always exists when a file or subdirectory is created.
    for src_path, dest_path, is_dir in path_iter:
        if is_dir:
            os.mkdir(dest_path)
        else:
            copy(src_path, dest_path)
| 162 | + |
| 163 | + |
def link_tree(src: Filepath, dest: Filepath) -> None:
    """
    Mirrors the directory structure of src in dest and soft-links the files
    into it
    :param src: The source directory
    :param dest: The destination directory
    """
    # Retained only so existing callers keep working. The work is done by
    # duplicate_tree in symlink mode.
    duplicate_tree(src=src, dest=dest, symlink=True)
60 | 173 |
|
61 | 174 |
|
62 | 175 | # block_size 64k with python is a few percent faster than linux native md5sum. |
|
0 commit comments