Mercurial > hgrepos > Python > apps > py-cutils
view cutils/util/walk.py @ 365:d5c920ace3cb
treesum: FIX: Python<2.7.9 has no hashlib.algorithms_available: fall back to hashlib.algorithms in this case
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Wed, 09 Apr 2025 22:14:49 +0200 |
| parents | b256ae4f4bc8 |
| children | 7761a15b9736 |
line wrap: on
line source
# -*- coding: utf-8 -*- # :- # SPDX-FileCopyrightText: © 2025 Franz Glasner # SPDX-License-Identifier: BSD-3-Clause # :- r"""Utility sub-module to implement a heavily customized :func:`os.walk`. """ from __future__ import print_function, absolute_import __all__ = ["WalkDirEntry", "ScanDir", "getfsencoding"] import logging import os try: from os import scandir except ImportError: try: from scandir import scandir except ImportError: scandir = None import stat import sys from . import PY2 HELP_FILETYPE_INDICATORS = r""" FILETYPE INDICATORS =================== File and directory paths are printed using names analogous to calling "ls -F/--classify". The indicator strings (aka "marker" or "tags") are appended to their names as follows. Some standard indicators are: / denotes a directory /./@/ denotes a symbolic link to a directory /./@ Classifies a symlink to a regular filesystem object (i.e. a regular file). The target objects are not classified as directories or other special filesystem objects. Also used if a symbolic link is broken and the target type cannot determined. /./| --- /./@| FIFO --- symlink to FIFO /./= --- /./@= Socket --- symlink to socket /./> --- /./@> Door -- symlink to door. Solaris. /./% --- /./@% Whiteout --- symlink to whiteout. Typically Used by union filesystems) (BSD). More non-standard indicators are: /./: --- /./@: Character special device --- symlink to character special device /./; --- /./@; Block special device --- symlink to block special device /./+ --- /./@+ Event port --- symlink to event port. Solaris, Illumos. In an aggregated (directory) checksum at the end of a block the following indicators are used: ./@/ Symbolic link to a directory ./@ Symbolic link to other filesystem object. Also used if the link is broken and the target type cannot determined. NOTE: Executable files have no special indicator here. The "ls -F" command would use the `*' character in this case. """ _notset = object() _logger = logging.getLogger(__name__) _FSENCODING = sys.getfilesystemencoding() if PY2: def _unix_path(s): if isinstance(s, bytes): return s.replace(b"\\", b"/") return s.replace(u"\\", u"/") else: def _unix_path(s): return s.replace("\\", "/") class WalkDirEntry(object): """A :class:`os.DirEntry` alike to be used in :func:`walk` and for its results. """ __slots__ = ("_name", "_path", # encoded as given in the ctor "_is_symlink", "_is_reg", "_is_dir", "_stat_result", "_stat_errno", "_stat_errstr", "_alt_fsname", "_alt_u8name") def __init__(self, name, path): self._name = name # the name as given in the constructor """The name exactly as given in the ctor""" self._path = _unix_path(path) """The path as given in the ctor -- but normalized to have slashes""" self._is_symlink = self._is_reg = self._is_dir = self._stat_result = \ self._stat_errno = self._stat_errstr = None self._alt_fsname = self._alt_u8name = _notset @property def name(self): """The original name exactly as given in the ctor""" return self._name @property def path(self): """The original path exactly as given in the ctor.""" return self._path @property def fsname(self): """The name as bytes for the filesystem. Also do not allow CR of LF in the name. :rtype: bytes or None """ if PY2: if isinstance(self._name, bytes): s = self._name try: s = self._name.encode(_FSENCODING, "strict") except UnicodeError: return None else: s = os.fsencode(self._name) if (b'\n' in s) or (b'\r' in s) or (b'\\' in s): return None return s @property def fspath(self): """Always bytes. Also do not allow CR of LF in the path. :rtype: bytes or None """ if PY2: if isinstance(self._path, bytes): p = self._path try: p = self._path.encode(_FSENCODING, "strict") except UnicodeError: return None else: p = os.fsencode(self._path) if (b'\n' in p) or (b'\r' in p) or (b'\\' in p): return None return p @property def alt_fsname(self): """Alternative and "escaped" filesystem name -- always bytes. :rtype: bytes """ if self._alt_fsname is _notset: self._alt_fsname = WalkDirEntry.alt_fs(self._name) return self._alt_fsname @property def alt_fspath(self): """Alternative and "escaped" filesystem path -- always bytes. :rtype: bytes """ return WalkDirEntry.alt_fs(self._path) @staticmethod def alt_fs(what): # # Prevent double encoding ... # ... and hope that the current FS encoding is compatible # with it # if isinstance(what, bytes): s = (what.replace(b'\\', b"\\x5c") .replace(b'\n', b"\\x0a") .replace(b'\r', b"\\x0d") .replace(b'\t', b"\\x09")) else: s = (what.replace(u'\\', u"\\x5c") .replace(u'\n', u"\\x0a") .replace(u'\r', u"\\x0d") .replace(u'\t', u"\\x09")) if PY2: if isinstance(s, bytes): return s else: return s.encode(_FSENCODING, "backslashreplace") else: return os.fsencode(s) @property def uname(self): """Always "real", strictly encoded Unicode or `None` if this is not possible. :rtype: text or None """ if PY2: if isinstance(self._name, bytes): try: return self._name.decode(_FSENCODING, "strict") except UnicodeError: return None else: return self._name else: try: self._name.encode("utf-8", "strict") except UnicodeError: return None return self._name @property def upath(self): """Always "real", strictly encoded Unicode or `None` if this is not possible. :rtype: text or None """ if PY2: if isinstance(self._path, bytes): try: return self._path.decode(_FSENCODING, "strict") except UnicodeError: return None else: return self._path else: try: self._path.encode("utf-8", "strict") except UnicodeError: return None return self._path @property def u8name(self): """`.uname` as UTF-8 or `None` (as strict as `uname`). Also do not allow CR of LF in the name. """ n = self.uname if n is None: return None if (u'\n' in n) or (u'\r' in n) or (u'\\' in n): return None return n.encode("utf-8", "strict") @property def u8path(self): """`.upath` as UTF-8 or `None` (as strict as `upath`. Also do not allow CR or LF in the path. """ p = self.upath if p is None: return None if (u'\n' in p) or (u'\r' in p) or (u'\\' in p): return None return p.encode("utf-8", "strict") @property def alt_u8name(self): if self._alt_u8name is _notset: self._alt_u8name = WalkDirEntry.alt_u8(self._name) return self._alt_u8name @property def alt_u8path(self): return WalkDirEntry.alt_u8(self._path) @staticmethod def alt_u8(what): # # Prevent double encoding ... # ... and hope that the current UTF-8 is compatible # with it # if isinstance(what, bytes): s = (what.replace(b'\\', b"\\x5c") .replace(b'\n', b"\\x0a") .replace(b'\r', b"\\x0d") .replace(b'\t', b"\\x09")) else: s = (what.replace(u'\\', u"\\x5c") .replace(u'\n', u"\\x0a") .replace(u'\r', u"\\x0d") .replace(u'\t', u"\\x09")) if PY2: if isinstance(s, bytes): try: return (s.decode(_FSENCODING, "strict") .encode("utf-8", "strict")) except UnicodeError: return (WalkDirEntry.surrogate_decode(s) .encode("ascii", "backslashreplace")) else: return s.encode("ascii", "backslashreplace") else: return s.encode("utf-8", "backslashreplace") @property def is_symlink(self): return self._is_symlink @property def is_reg(self): return self._is_reg @property def is_dir(self): return self._is_dir @property def is_chr(self): return (stat.S_ISCHR(self._stat_result.st_mode) if self._stat_result is not None else False) @property def is_blk(self): return (stat.S_ISBLK(self._stat_result.st_mode) if self._stat_result is not None else False) @property def is_fifo(self): return (stat.S_ISFIFO(self._stat_result.st_mode) if self._stat_result is not None else False) @property def is_socket(self): return (stat.S_ISSOCK(self._stat_result.st_mode) if self._stat_result is not None else False) @property def is_door(self): test = getattr(stat, "S_ISDOOR", None) return (test(self._stat_result.st_mode) if test and (self._stat_result is not None) else False) @property def is_eventport(self): test = getattr(stat, "S_ISPORT", None) return (test(self._stat_result.st_mode) if test and (self._stat_result is not None) else False) @property def is_whiteout(self): test = getattr(stat, "S_ISWHT", None) return (test(self._stat_result.st_mode) if test and (self._stat_result is not None) else False) @property def is_special(self): """Anything besides a regular file and a directory""" if self._stat_result is None: return False return not (self.is_reg or self.is_dir) @property def special_tag(self): """Return a special tag (string) for a special file""" assert self.is_special if self.is_chr: return ':' elif self.is_blk: return ';' elif self.is_fifo: return '|' elif self.is_socket: return '=' elif self.is_door: return '>' elif self.is_whiteout: return '%' elif self.is_eventport: return '+' _logger.warning( "unknown special file type: 0x%X", stat.S_IFMT(self._stat_result.st_mode)) return '?' @property def stat(self): return self._stat_result @property def stat_errno(self): return self._stat_errno @property def stat_errstr(self): return self._stat_errstr def __repr__(self): tag = "" if self._is_symlink: tag += "l" if self._is_dir: tag += "d" if tag: return "<WalkDirEntry %r (%s)>" % (self._name, tag) return "<WalkDirEntry %r>" % (self._name,) @classmethod def from_direntry(cls_, entry): w = cls_(entry.name, entry.path) try: w._is_dir = entry.is_dir(follow_symlinks=True) except OSError: # # If is_dir() raises an OSError, consider that the entry # is not a directory, same behaviour than os.path.isdir(). # w._is_dir = False try: w._is_symlink = entry.is_symlink() except OSError: # # If is_symlink() raises an OSError, consider that the entry # is not a symbolic link, same behaviour than os.path.islink(). # w._is_symlink = False # Consistently follow symlinks try: w._stat_result = entry.stat(follow_symlinks=True) except OSError as e: w._stat_result = None w._stat_errno = e.errno w._stat_errstr = e.strerror w._is_reg = False else: w._is_reg = stat.S_ISREG(w._stat_result.st_mode) return w @classmethod def from_path_name(cls_, path, name, _do_stat=True): """`_do_stat` is to be used only for testing purposes""" w = cls_(name, os.path.join(path, name)) try: w._is_dir = os.path.isdir(w._path) except OSError: # # If is_dir() raises an OSError, consider that the entry # is not a directory, same behaviour than os.path.isdir(). # w._is_dir = False try: w._is_symlink = os.path.islink(w._path) except OSError: # # If is_symlink() raises an OSError, consider that the entry # is not a symbolic link, same behaviour than os.path.islink(). # w._is_symlink = False if _do_stat: try: w._stat_result = os.stat(w._path) except OSError as e: w._stat_result = None w._stat_errno = e.errno w._stat_errstr = e.strerror w._is_reg = False else: w._is_reg = stat.S_ISREG(w._stat_result.st_mode) return w @classmethod def from_readlink(cls_, path): w = cls_(os.path.basename(path), path) return w @staticmethod def sort_key_fs(entry): return entry.alt_fsname # because it should never throw @staticmethod def sort_key_u8(entry): return entry.alt_u8name # because it should never throw if PY2: @staticmethod def surrogate_decode(what): """Decode the bytes object `what` using surrogates from :pep:`383` for all non-ASCII octets. """ uwhat = [] assert isinstance(what, bytes) for ch in what: chcode = ord(ch) if chcode <= 0x7f: uwhat.append(unichr(chcode)) # noqa: F821 unichr else: uwhat.append(unichr(0xDC00 + chcode)) # noqa: F821 unichr return u"".join(uwhat) if scandir: class ScanDir(object): """An :func:`os.scandir` wrapper that is always an iterator and a context manager. """ __slots__ = ("_scandir_it", ) def __init__(self, path): super(ScanDir, self).__init__() self._scandir_it = scandir(path) def __iter__(self): return self def __next__(self): if self._scandir_it is None: raise StopIteration("closed") return WalkDirEntry.from_direntry(next(self._scandir_it)) if PY2: next = __next__ def __enter__(self): return self def __exit__(self, *args, **kwds): self.close() def close(self): if self._scandir_it is not None: if hasattr(self._scandir_it, "close"): self._scandir_it.close() self._scandir_it = None else: class ScanDir(object): """An :func:`os.scandir` wrapper that is always an iterator and a context manager. """ __slots__ = ("_listdir_it", "_path") def __init__(self, path): super(ScanDir, self).__init__() self._listdir_it = iter(os.listdir(path)) self._path = path def __iter__(self): return self def __next__(self): if self._listdir_it is None: raise StopIteration("closed") return WalkDirEntry.from_path_name(self._path, next(self._listdir_it)) if PY2: next = __next__ def __enter__(self): return self def __exit__(self, *args, **kwds): pass def close(self): pass def getfsencoding(): """Return the stored _FSENCODING of this module""" return _FSENCODING
