Mercurial > hgrepos > Python > apps > py-cutils
comparison cutils/util/walk.py @ 201:58d93453c307
Many more encoding-related methods for WalkDirEntry and some unit tests
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Tue, 21 Jan 2025 14:30:06 +0100 |
| parents | b2aba84ca426 |
| children | 3a85f7bbe0b1 |
comparison
equal
deleted
inserted
replaced
| 200:22f92bf3572c | 201:58d93453c307 |
|---|---|
| 8 """ | 8 """ |
| 9 | 9 |
| 10 from __future__ import print_function, absolute_import | 10 from __future__ import print_function, absolute_import |
| 11 | 11 |
| 12 | 12 |
| 13 __all__ = ["ScanDir", "getfsencoding"] | 13 __all__ = ["WalkDirEntry", "ScanDir", "getfsencoding"] |
| 14 | 14 |
| 15 | 15 |
| 16 import os | 16 import os |
| 17 try: | 17 try: |
| 18 from os import scandir | 18 from os import scandir |
| 24 import sys | 24 import sys |
| 25 | 25 |
| 26 from . import PY2 | 26 from . import PY2 |
| 27 | 27 |
| 28 | 28 |
| 29 _notset = object() | |
| 30 | |
| 31 | |
| 29 _FSENCODING = sys.getfilesystemencoding() | 32 _FSENCODING = sys.getfilesystemencoding() |
| 33 | |
| 34 | |
| 35 if PY2: | |
| 36 | |
| 37 def _unix_path(s): | |
| 38 if isinstance(s, bytes): | |
| 39 return s.replace(b"\\", b"/") | |
| 40 return s.replace(u"\\", u"/") | |
| 41 | |
| 42 else: | |
| 43 | |
| 44 def _unix_path(s): | |
| 45 return s.replace("\\", "/") | |
| 30 | 46 |
| 31 | 47 |
| 32 class WalkDirEntry(object): | 48 class WalkDirEntry(object): |
| 33 | 49 |
| 34 """A :class:`os.DirEntry` alike to be used in :func:`walk` and for | 50 """A :class:`os.DirEntry` alike to be used in :func:`walk` and for |
| 35 its results. | 51 its results. |
| 36 | 52 |
| 37 """ | 53 """ |
| 38 | 54 |
| 39 __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink", | 55 __slots__ = ("_name", "_path", # encoded as given in the ctor |
| 40 "_is_dir", "_stat_result") | 56 "_is_symlink", "_is_dir", "_stat_result", |
| 41 | 57 "_alt_fsname", "_alt_u8name") |
| 42 def __init__(self, name): | 58 |
| 43 self._name = name | 59 def __init__(self, name, path): |
| 44 if PY2: | 60 self._name = name # the name as given in the constructor |
| 45 assert isinstance(name, bytes) | 61 """The name exactly as given in the ctor""" |
| 46 self._fsname = name | 62 self._path = _unix_path(path) |
| 47 else: | 63 """The path as given in the ctor -- but normalized to have slashes""" |
| 48 self._name = name | |
| 49 self._fsname = os.fsencode(name) | |
| 50 self._path = None | |
| 51 self._fspath = None | |
| 52 self._is_symlink = self._is_dir = self._stat_result = None | 64 self._is_symlink = self._is_dir = self._stat_result = None |
| 65 self._alt_fsname = self._alt_u8name = _notset | |
| 53 | 66 |
| 54 @property | 67 @property |
| 55 def name(self): | 68 def name(self): |
| 56 """The native name""" | 69 """The original name exactly as given in the ctor""" |
| 57 return self._name | 70 return self._name |
| 58 | 71 |
| 59 @property | 72 @property |
| 73 def path(self): | |
| 74 """The path as given in the ctor -- but normalized to have slashes.""" | |
| 75 return self._path | |
| 76 | |
| 77 @property | |
| 60 def fsname(self): | 78 def fsname(self): |
| 61 """The name as bytes""" | 79 """The name as bytes for the filesystem. |
| 62 return self._fsname | 80 |
| 63 | 81 :rtype: bytes or None |
| 64 @property | 82 |
| 65 def path(self): | 83 """ |
| 66 """Always native""" | 84 if PY2: |
| 67 return self._path | 85 if isinstance(self._name, bytes): |
| 86 return self._name | |
| 87 try: | |
| 88 return self._name.encode(_FSENCODING, "strict") | |
| 89 except UnicodeError: | |
| 90 return None | |
| 91 else: | |
| 92 return os.fsencode(self._name) | |
| 93 | |
| 94 @property | |
| 95 def alt_fsname(self): | |
| 96 """Alternative and "escaped" filesystem name -- always bytes. | |
| 97 | |
| 98 :rtype: bytes | |
| 99 | |
| 100 """ | |
| 101 if self._alt_fsname is _notset: | |
| 102 if PY2: | |
| 103 if isinstance(self._name, bytes): | |
| 104 self._alt_fsname = self._name | |
| 105 else: | |
| 106 self._alt_fsname = self._name.encode( | |
| 107 _FSENCODING, "backslashreplace") | |
| 108 else: | |
| 109 self._alt_fsname = os.fsencode(self._name) | |
| 110 return self._alt_fsname | |
| 68 | 111 |
| 69 @property | 112 @property |
| 70 def fspath(self): | 113 def fspath(self): |
| 71 """Always bytes""" | 114 """The path as bytes for the filesystem (`None` if not strictly encodable). |
| 72 if self._path is not None: | 115 |
| 73 if self._fspath is None: | 116 :rtype: bytes or None |
| 74 if PY2: | 117 |
| 75 assert isinstance(self._path, bytes) | 118 """ |
| 76 self._fspath = self._path | 119 if PY2: |
| 120 if isinstance(self._path, bytes): | |
| 121 return self._path | |
| 122 try: | |
| 123 return self._path.encode(_FSENCODING, "strict") | |
| 124 except UnicodeError: | |
| 125 return None | |
| 126 else: | |
| 127 return os.fsencode(self._path) | |
| 128 | |
| 129 @property | |
| 130 def alt_fspath(self): | |
| 131 """Alternative and "escaped" filesystem path -- always bytes. | |
| 132 | |
| 133 :rtype: bytes | |
| 134 | |
| 135 """ | |
| 136 if PY2: | |
| 137 if isinstance(self._path, bytes): | |
| 138 return self._path | |
| 139 return self._path.encode(_FSENCODING, "backslashreplace") | |
| 140 else: | |
| 141 return os.fsencode(self._path) | |
| 142 | |
| 143 @property | |
| 144 def uname(self): | |
| 145 """Always "real", strictly encoded Unicode or `None` if this is not | |
| 146 possible. | |
| 147 | |
| 148 :rtype: text or None | |
| 149 | |
| 150 """ | |
| 151 if PY2: | |
| 152 if isinstance(self._name, bytes): | |
| 153 try: | |
| 154 return self._name.decode(_FSENCODING, "strict") | |
| 155 except UnicodeError: | |
| 156 return None | |
| 157 else: | |
| 158 return self._name | |
| 159 else: | |
| 160 try: | |
| 161 self._name.encode("utf-8", "strict") | |
| 162 except UnicodeError: | |
| 163 return None | |
| 164 return self._name | |
| 165 | |
| 166 @property | |
| 167 def upath(self): | |
| 168 """Always "real", strictly encoded Unicode or `None` if this is not | |
| 169 possible. | |
| 170 | |
| 171 :rtype: text or None | |
| 172 | |
| 173 """ | |
| 174 if PY2: | |
| 175 if isinstance(self._path, bytes): | |
| 176 try: | |
| 177 return self._path.decode(_FSENCODING, "strict") | |
| 178 except UnicodeError: | |
| 179 return None | |
| 180 else: | |
| 181 return self._path | |
| 182 else: | |
| 183 try: | |
| 184 self._path.encode("utf-8", "strict") | |
| 185 except UnicodeError: | |
| 186 return None | |
| 187 return self._path | |
| 188 | |
| 189 @property | |
| 190 def u8name(self): | |
| 191 """`.uname` as UTF-8 or `None` (as strict as `uname`)""" | |
| 192 n = self.uname | |
| 193 return n if n is None else n.encode("utf-8", "strict") | |
| 194 | |
| 195 @property | |
| 196 def u8path(self): | |
| 197 """`.upath` as UTF-8 or `None` (as strict as `upath`)""" | |
| 198 p = self.upath | |
| 199 return p if p is None else p.encode("utf-8", "strict") | |
| 200 | |
| 201 @property | |
| 202 def alt_u8name(self): | |
| 203 if self._alt_u8name is _notset: | |
| 204 if PY2: | |
| 205 if isinstance(self._name, bytes): | |
| 206 try: | |
| 207 self._alt_u8name = ( | |
| 208 self._name | |
| 209 .decode(_FSENCODING, "strict") | |
| 210 .encode("utf-8", "strict")) | |
| 211 except UnicodeError: | |
| 212 self._alt_u8name = ( | |
| 213 self.surrogate_decode(self._name) | |
| 214 .encode("ascii", "backslashreplace")) | |
| 77 else: | 215 else: |
| 78 self._fspath = os.fsencode(self._path) | 216 self._alt_u8name = self._name.encode( |
| 79 return self._fspath | 217 "ascii", "backslashreplace") |
| 218 else: | |
| 219 self._alt_u8name = self._name.encode( | |
| 220 "utf-8", "backslashreplace") | |
| 221 return self._alt_u8name | |
| 222 | |
| 223 @property | |
| 224 def alt_u8path(self): | |
| 225 if PY2: | |
| 226 if isinstance(self._path, bytes): | |
| 227 try: | |
| 228 return (self._path.decode(_FSENCODING, "strict") | |
| 229 .encode("utf-8", "strict")) | |
| 230 except UnicodeError: | |
| 231 return (self.surrogate_decode(self._path) | |
| 232 .encode("ascii", "backslashreplace")) | |
| 233 else: | |
| 234 return self._path.encode("ascii", "backslashreplace") | |
| 235 else: | |
| 236 return self._path.encode("utf-8", "backslashreplace") | |
| 80 | 237 |
| 81 @property | 238 @property |
| 82 def is_symlink(self): | 239 def is_symlink(self): |
| 83 return self._is_symlink | 240 return self._is_symlink |
| 84 | 241 |
| 100 return "<WalkDirEntry %r (%s)>" % (self._name, tag) | 257 return "<WalkDirEntry %r (%s)>" % (self._name, tag) |
| 101 return "<WalkDirEntry %r>" % (self._name,) | 258 return "<WalkDirEntry %r>" % (self._name,) |
| 102 | 259 |
| 103 @classmethod | 260 @classmethod |
| 104 def from_direntry(cls_, entry): | 261 def from_direntry(cls_, entry): |
| 105 w = cls_(entry.name) | 262 w = cls_(entry.name, entry.path) |
| 106 w._path = entry.path | |
| 107 try: | 263 try: |
| 108 w._is_dir = entry.is_dir(follow_symlinks=True) | 264 w._is_dir = entry.is_dir(follow_symlinks=True) |
| 109 except OSError: | 265 except OSError: |
| 110 # | 266 # |
| 111 # If is_dir() raises an OSError, consider that the entry | 267 # If is_dir() raises an OSError, consider that the entry |
| 123 # Do not suppress errors here and (consistently) follow symlinks | 279 # Do not suppress errors here and (consistently) follow symlinks |
| 124 w._stat_result = entry.stat(follow_symlinks=True) | 280 w._stat_result = entry.stat(follow_symlinks=True) |
| 125 return w | 281 return w |
| 126 | 282 |
| 127 @classmethod | 283 @classmethod |
| 128 def from_path_name(cls_, path, name): | 284 def from_path_name(cls_, path, name, _do_stat=True): |
| 129 w = cls_(name) | 285 """`_do_stat` is to be used only for testing purposes""" |
| 130 w._path = os.path.join(path, name) | 286 w = cls_(name, os.path.join(path, name)) |
| 131 try: | 287 try: |
| 132 w._is_dir = os.path.isdir(w._path) | 288 w._is_dir = os.path.isdir(w._path) |
| 133 except OSError: | 289 except OSError: |
| 134 # | 290 # |
| 135 # If is_dir() raises an OSError, consider that the entry | 291 # If is_dir() raises an OSError, consider that the entry |
| 142 # | 298 # |
| 143 # If is_symlink() raises an OSError, consider that the entry | 299 # If is_symlink() raises an OSError, consider that the entry |
| 144 # is not a symbolic link, same behaviour as os.path.islink(). | 300 # is not a symbolic link, same behaviour as os.path.islink(). |
| 145 # | 301 # |
| 146 w._is_symlink = False | 302 w._is_symlink = False |
| 147 w._stat_result = os.stat(w._path) | 303 if _do_stat: |
| 304 w._stat_result = os.stat(w._path) | |
| 305 return w | |
| 306 | |
| 307 @classmethod | |
| 308 def from_readlink(cls_, path): | |
| 309 w = cls_(os.path.basename(path), path) | |
| 148 return w | 310 return w |
| 149 | 311 |
| 150 @staticmethod | 312 @staticmethod |
| 151 def sort_key(entry): | 313 def sort_key(entry): |
| 152 return entry._fsname | 314 return entry.alt_fsname # because it should never throw |
| 315 | |
| 316 @staticmethod | |
| 317 def alt_sort_key(entry): | |
| 318 return entry.alt_u8name # because it should never throw | |
| 319 | |
| 320 if PY2: | |
| 321 | |
| 322 @staticmethod | |
| 323 def surrogate_decode(what): | |
| 324 """Decode the bytes object `what` using surrogates from :pep:`383` | |
| 325 for all non-ASCII octets. | |
| 326 | |
| 327 """ | |
| 328 uwhat = [] | |
| 329 assert isinstance(what, bytes) | |
| 330 for ch in what: | |
| 331 chcode = ord(ch) | |
| 332 if chcode <= 0x7f: | |
| 333 uwhat.append(unichr(chcode)) # noqa: F821 unichr | |
| 334 else: | |
| 335 uwhat.append(unichr(0xDC00 + chcode)) # noqa: F821 unichr | |
| 336 return u"".join(uwhat) | |
| 153 | 337 |
| 154 | 338 |
| 155 if scandir: | 339 if scandir: |
| 156 | 340 |
| 157 class ScanDir(object): | 341 class ScanDir(object): |
