Mercurial > hgrepos > Python > apps > py-cutils
comparison cutils/util/walk.py @ 121:2dc26a2f3d1c
A heavily customized "os.walk()" alike to support the coming treeview implementation
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Wed, 01 Jan 2025 17:52:41 +0100 |
| parents | |
| children | 4a0c3c9eead7 |
comparison
equal
deleted
inserted
replaced
| 120:a548783381b6 | 121:2dc26a2f3d1c |
|---|---|
| 1 # -*- coding: utf-8 -*- | |
| 2 # :- | |
| 3 # :Copyright: (c) 2020-2025 Franz Glasner | |
| 4 # :License: BSD-3-Clause | |
| 5 # :- | |
| 6 r"""Utility sub-module to implement a heavily customized :func:`os.walk`. | |
| 7 | |
| 8 """ | |
| 9 | |
| 10 __all__ = ["walk"] | |
| 11 | |
| 12 | |
| 13 import os | |
| 14 try: | |
| 15 from os import scandir | |
| 16 except ImportError: | |
| 17 try: | |
| 18 from scandir import scandir | |
| 19 except ImportError: | |
| 20 scandir = None | |
| 21 from contextlib import closing | |
| 22 | |
| 23 from .cm import nullcontext | |
| 24 from .constants import PY2 | |
| 25 | |
| 26 | |
class WalkDirEntry(object):

    """A :class:`os.DirEntry` alike to be used in :func:`walk` and for
    its results.

    In contrast to :class:`os.DirEntry` the file type and stat
    information is gathered once in :meth:`from_direntry` and is exposed
    by read-only properties instead of methods.

    """

    __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink",
                 "_is_dir", "_stat_result")

    def __init__(self, name):
        """Create an entry that only knows its `name`.

        The path, the type flags and the stat result are initialized
        to `None`.

        :param name: the base name of the entry; must be :class:`bytes`
                     on Python 2
        :raises TypeError: on Python 2 if `name` is not :class:`bytes`

        """
        self._name = name
        if PY2:
            # An explicit check instead of an `assert`: it survives "-O"
            if not isinstance(name, bytes):
                raise TypeError(
                    "name must be bytes on Python 2, not %s"
                    % (type(name).__name__,))
            self._fsname = name
        else:
            self._fsname = os.fsencode(name)
        self._path = None
        self._fspath = None
        self._is_symlink = self._is_dir = self._stat_result = None

    @property
    def name(self):
        """The base name of the entry as given to the constructor"""
        return self._name

    @property
    def fsname(self):
        """The base name of the entry in its filesystem encoding"""
        return self._fsname

    @property
    def path(self):
        """The path of the entry or `None` if it is not set"""
        return self._path

    @property
    def fspath(self):
        """The path of the entry in its filesystem encoding or `None` if
        no path is set.

        The value is computed on first access and cached afterwards.

        :raises TypeError: on Python 2 if the path is not :class:`bytes`

        """
        if self._path is not None:
            if self._fspath is None:
                if PY2:
                    # Explicit check instead of `assert`: it survives "-O"
                    if not isinstance(self._path, bytes):
                        raise TypeError(
                            "path must be bytes on Python 2, not %s"
                            % (type(self._path).__name__,))
                    self._fspath = self._path
                else:
                    self._fspath = os.fsencode(self._path)
        return self._fspath

    @property
    def is_symlink(self):
        """Whether the entry is a symbolic link (`None` if unknown)"""
        return self._is_symlink

    @property
    def is_dir(self):
        """Whether the entry is a directory (`None` if unknown).

        :meth:`from_direntry` determines this flag by following
        symbolic links.

        """
        return self._is_dir

    @property
    def stat(self):
        """The stored stat result or `None`.

        :meth:`from_direntry` stores a stat result only for entries that
        are not directories.

        """
        return self._stat_result

    def __repr__(self):
        # The tag shows "l" for symbolic links and "d" for directories
        tag = ""
        if self._is_symlink:
            tag += "l"
        if self._is_dir:
            tag += "d"
        if tag:
            return "<WalkDirEntry %r (%s)>" % (self._name, tag)
        return "<WalkDirEntry %r>" % (self._name,)

    @classmethod
    def from_direntry(cls, entry):
        """Alternate constructor: build an entry from an item that is
        yielded by :func:`os.scandir` (or :func:`scandir.scandir`).

        :param entry: a :class:`os.DirEntry` alike object
        :return: the new, completely populated entry
        :raises OSError: if `entry` is not a directory and its
                         `stat()` fails

        """
        w = cls(entry.name)
        w._path = entry.path
        try:
            w._is_dir = entry.is_dir(follow_symlinks=True)
        except OSError:
            #
            # If is_dir() raises an OSError, consider that the entry
            # is not a directory, same behaviour as os.path.isdir().
            #
            w._is_dir = False
        try:
            w._is_symlink = entry.is_symlink()
        except OSError:
            #
            # If is_symlink() raises an OSError, consider that the entry
            # is not a symbolic link, same behaviour as os.path.islink().
            #
            w._is_symlink = False
        if not w._is_dir:
            # Do not suppress errors here and (consistently) follow symlinks
            w._stat_result = entry.stat(follow_symlinks=True)
        return w

    @staticmethod
    def sort_key(entry):
        """Sort key function: order entries by their :attr:`fsname`"""
        return entry._fsname
| 122 | |
if scandir:

    def walk(root, follow_symlinks=False):
        """A heavily customized :func:`os.walk` alike that differs from the
        original:

        - optimized for use in :command:`treesum`
        - most errors are not suppressed
        - the `root` is never part of the returned data
        - the returned directory in "top" is not a string form but a tuple
          of individual path segments
        - all other yielded lists contain WalkDirEntry elements instead of
          strings
        - recurse into sub-directories first ("topdown=False")
        - sort consistently all yielded lists by the filesystem encoding

        :param root: the directory where to start; it is normalized with
                     :func:`os.path.normpath`
        :param follow_symlinks: whether to recurse into symbolic links
                                that point to directories; such links
                                are always listed in the yielded
                                directory lists
        :return: a generator that yields ``(top, dirs, nondirs)`` tuples

        .. note:: The implementation is based on Python 3.11 and needs a
                  functional :func:`os.scandir` or :func:`scandir.scandir`
                  implementation. It follows the logic in Python 3.11
                  while it could be simplified further because we are not
                  implementing some of the original flags (e.g. like
                  `topdown`).

        """
        normed_root = os.path.normpath(root)
        # No `yield from` here: it is a syntax error on Python 2
        for item in _walk(normed_root, tuple(),
                          follow_symlinks=follow_symlinks):
            yield item


    def _walk(root, top, follow_symlinks):     # noqa: E303 too many empty lines
        """:func:`walk` helper.

        :param root: the normalized start directory
        :param top: a tuple with the path segments below `root` of the
                    directory to be scanned; empty for `root` itself
        :param follow_symlinks: see :func:`walk`

        """
        if top:
            path = os.path.join(root, *top)
        else:
            path = root

        dirs, nondirs, walk_dirs = [], [], []

        scandir_it = scandir(path)
        # Not every scandir() implementation yields a closable iterator
        if hasattr(scandir_it, "close"):
            scandir_ctx = closing(scandir_it)
        else:
            scandir_ctx = nullcontext(scandir_it)
        with scandir_ctx as scandir_it:
            while True:
                try:
                    entry = WalkDirEntry.from_direntry(next(scandir_it))
                except StopIteration:
                    break
                if entry.is_dir:
                    dirs.append(entry)
                    #
                    # Always bottom-up: recurse into sub-directories, but
                    # exclude symlinks to directories if follow_symlinks
                    # is False
                    #
                    if follow_symlinks or not entry.is_symlink:
                        walk_dirs.append(entry)
                else:
                    nondirs.append(entry)

        # Sort by low-level filesystem encoding
        walk_dirs.sort(key=WalkDirEntry.sort_key)
        dirs.sort(key=WalkDirEntry.sort_key)
        nondirs.sort(key=WalkDirEntry.sort_key)

        # Recurse into sub-directories
        for wd in walk_dirs:
            # No `yield from` here: it is a syntax error on Python 2
            for item in _walk(root, top + (wd.name,), follow_symlinks):
                yield item
        # Yield after recursion if going bottom up
        yield top, dirs, nondirs

else:

    raise ImportError("no `scandir()' module available")
