Mercurial > hgrepos > Python > apps > py-cutils
view cutils/util/walk.py @ 155:bf74ce3c968d
When computing digests use the order imposed by names alone.
No different loops for dirs and nondirs.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Mon, 06 Jan 2025 13:39:12 +0100 |
| parents | c7df81fb84b7 |
| children | 481cc9b26861 |
line wrap: on
line source
# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
r"""Utility sub-module to implement a heavily customized :func:`os.walk`.

"""

__all__ = ["walk"]


import os
try:
    from os import scandir
except ImportError:
    try:
        from scandir import scandir
    except ImportError:
        scandir = None

from .cm import nullcontext
from .constants import PY2


class WalkDirEntry(object):

    """A :class:`os.DirEntry` alike to be used in :func:`walk` and for
    its results.

    Instances are normally created by :meth:`from_direntry`, which fills
    in the path, the directory/symlink flags and (for non-directories)
    the stat result.  A freshly constructed instance only knows its name;
    all other attributes are `None`.

    """

    __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink",
                 "_is_dir", "_stat_result")

    def __init__(self, name):
        """Create an entry that only knows its `name`.

        :param name: the name of the filesystem object (no directory
                     part); must be :class:`bytes` on Python 2

        """
        self._name = name
        if PY2:
            # On Python 2 the native name already is the filesystem name
            assert isinstance(name, bytes)
            self._fsname = name
        else:
            self._fsname = os.fsencode(name)
        self._path = None
        self._fspath = None
        self._is_symlink = self._is_dir = self._stat_result = None

    @property
    def name(self):
        """The name as given to the constructor"""
        return self._name

    @property
    def fsname(self):
        """The name in the filesystem encoding (:class:`bytes`)"""
        return self._fsname

    @property
    def path(self):
        """The path of the entry or `None` if it has not been set"""
        return self._path

    @property
    def fspath(self):
        """The path in the filesystem encoding.

        Computed lazily from :attr:`path` on first access and cached
        thereafter.  `None` as long as :attr:`path` is `None`.

        """
        if self._path is not None:
            if self._fspath is None:
                if PY2:
                    assert isinstance(self._path, bytes)
                    self._fspath = self._path
                else:
                    self._fspath = os.fsencode(self._path)
        return self._fspath

    @property
    def is_symlink(self):
        """Whether the entry is a symbolic link (`None` if unknown)"""
        return self._is_symlink

    @property
    def is_dir(self):
        """Whether the entry is a directory, symlinks being followed
        (`None` if unknown)

        """
        return self._is_dir

    @property
    def stat(self):
        """The stat result; only set by :meth:`from_direntry` for
        entries that are not directories, `None` otherwise

        """
        return self._stat_result

    def __repr__(self):
        # Tag symlinks with "l" and directories with "d"
        tag = ""
        if self._is_symlink:
            tag += "l"
        if self._is_dir:
            tag += "d"
        if tag:
            return "<WalkDirEntry %r (%s)>" % (self._name, tag)
        return "<WalkDirEntry %r>" % (self._name,)

    @classmethod
    def from_direntry(cls, entry):
        """Alternate constructor: build an instance from an item yielded
        by `scandir()`.

        :param entry: a directory entry as yielded by `scandir()`
        :return: the populated :class:`WalkDirEntry`
        :raises OSError: if `entry.stat()` fails for an entry that is
                         not a directory; this error is deliberately
                         not suppressed

        """
        w = cls(entry.name)
        w._path = entry.path
        try:
            w._is_dir = entry.is_dir(follow_symlinks=True)
        except OSError:
            #
            # If is_dir() raises an OSError, consider that the entry
            # is not a directory, same behaviour as os.path.isdir().
            #
            w._is_dir = False
        try:
            w._is_symlink = entry.is_symlink()
        except OSError:
            #
            # If is_symlink() raises an OSError, consider that the entry
            # is not a symbolic link, same behaviour as os.path.islink().
            #
            w._is_symlink = False
        if not w._is_dir:
            # Do not suppress errors here and (consistently) follow symlinks
            w._stat_result = entry.stat(follow_symlinks=True)
        return w

    @staticmethod
    def sort_key(entry):
        """Sort key for entries: the name in the filesystem encoding"""
        return entry._fsname


if scandir:

    def walk(root, follow_symlinks=False):
        """A heavily customized :func:`os.walk` alike that differs from the
        original:

        - optimized for use in :command:`treesum`
        - most errors are not suppressed
        - the `root` is never part of the returned data
        - the returned directory in "top" is not a string form but a tuple
          of individual path segments
        - there is only one yielded list

          * contains :class:`WalkDirEntry`
          * sorted by its fsname

          The caller can easily get the old dirs and nondirs by filtering
          the yielded list using "entry.is_dir".
        - recurse into sub-directories first ("topdown=False")
        - sort consistently all yielded lists by the filesystem encoding

        :param root: the directory where to start; it is normalized with
                     :func:`os.path.normpath`
        :param bool follow_symlinks: if true also recurse into symbolic
                                     links that point to directories
        :return: a generator that yields 2-tuples `(top, fsobjects)`:
                 `top` is the tuple of path segments below `root` (empty
                 for `root` itself), `fsobjects` the sorted list of
                 :class:`WalkDirEntry` items of that directory

        .. note:: The implementation is based on Python 3.11 and needs a
                  functional :func:`os.scandir` or :func:`scandir.scandir`
                  implementation. It intentionally follows the logic in
                  Python 3.11 while it could be simplified because we are
                  not implementing some of the original flags (e.g. like
                  `topdown`).

        """
        normed_root = os.path.normpath(root)
        #
        # Re-yield explicitly instead of using "yield from": the latter
        # is a syntax error on Python 2 and would make the complete module
        # unimportable there.
        #
        for item in _walk(normed_root, (), follow_symlinks=follow_symlinks):
            yield item

    def _walk(root, top, follow_symlinks):
        """:func:`walk` helper that handles the directory `root`/`top`.

        :param root: the normalized start directory
        :param tuple top: the path segments below `root` that lead to the
                          directory to be scanned
        :param bool follow_symlinks: see :func:`walk`

        """
        if top:
            path = os.path.join(root, *top)
        else:
            path = root

        fsobjects, walk_dirs = [], []

        scandir_cm = scandir(path)
        # Not every scandir implementation yields a closable iterator
        if not hasattr(scandir_cm, "close"):
            scandir_cm = nullcontext(scandir_cm)

        with scandir_cm as scandir_it:
            for direntry in scandir_it:
                entry = WalkDirEntry.from_direntry(direntry)
                fsobjects.append(entry)
                #
                # Always bottom-up: recurse into sub-directories, but exclude
                # symlinks to directories if follow_symlinks is False
                #
                if entry.is_dir and (follow_symlinks
                                     or not entry.is_symlink):
                    walk_dirs.append(entry)

        # Sort by low-level filesystem encoding
        walk_dirs.sort(key=WalkDirEntry.sort_key)
        fsobjects.sort(key=WalkDirEntry.sort_key)

        # Recurse into sub-directories (no "yield from" because of Python 2)
        for wd in walk_dirs:
            for item in _walk(root, top + (wd.name,), follow_symlinks):
                yield item
        # Yield after recursion if going bottom up
        yield top, fsobjects

else:

    raise ImportError("no `scandir()' module available")
