Mercurial > hgrepos > Python > apps > py-cutils
view cutils/util/walk.py @ 177:089c40240061
Add an alternate implementation for generating directory tree digests:
- Do not use something like os.walk() but use os.scandir() directly.
- Recursively generate the subdirectory digests only when needed and in
the right order.
This fixes a mismatch: the order of subdirectories in the output did not
match the order in which their directory digests were applied.
The new implementation also should make filtering (that will be
implemented later) easier.
NOTE: The tree digests of the old and the new implementation are identical.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Sat, 11 Jan 2025 17:41:28 +0100 |
| parents | 506d895a8500 |
| children | dac26a2d9de5 |
line wrap: on
line source
# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
r"""Utility sub-module to implement a heavily customized :func:`os.walk`.

"""

__all__ = ["walk", "ScanDir"]


import os
try:
    from os import scandir
except ImportError:
    try:
        from scandir import scandir
    except ImportError:
        scandir = None

from . import PY2
from .cm import nullcontext


class WalkDirEntry(object):

    """A :class:`os.DirEntry` alike to be used in :func:`walk` and for
    its results.

    Instances are normally created by the alternate constructors
    :meth:`from_direntry` and :meth:`from_path_name`; the plain
    constructor only sets the name and leaves all other data `None`.

    """

    __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink",
                 "_is_dir", "_stat_result")

    def __init__(self, name):
        """Initialize the entry with its `name` only.

        :param name: the name of the filesystem object (no directory part);
                     on Python 2 it must be a byte string

        """
        self._name = name
        if PY2:
            assert isinstance(name, bytes)
            self._fsname = name
        else:
            self._fsname = os.fsencode(name)
        self._path = None
        self._fspath = None
        self._is_symlink = self._is_dir = self._stat_result = None

    @property
    def name(self):
        """The name as given to the constructor"""
        return self._name

    @property
    def fsname(self):
        """The name in its filesystem encoding (bytes)"""
        return self._fsname

    @property
    def path(self):
        """The path of the entry or `None` if not yet determined"""
        return self._path

    @property
    def fspath(self):
        """The path in its filesystem encoding (bytes) or `None` if the
        path is not set.

        The encoded value is computed lazily on first access and cached.

        """
        if self._path is not None:
            if self._fspath is None:
                if PY2:
                    assert isinstance(self._path, bytes)
                    self._fspath = self._path
                else:
                    self._fspath = os.fsencode(self._path)
        return self._fspath

    @property
    def is_symlink(self):
        """Whether the entry is a symbolic link (`None` if unknown)"""
        return self._is_symlink

    @property
    def is_dir(self):
        """Whether the entry is a directory (`None` if unknown)"""
        return self._is_dir

    @property
    def stat(self):
        """The stored stat result (`None` if not determined)"""
        return self._stat_result

    def __repr__(self):
        tag = ""
        if self._is_symlink:
            tag += "l"
        if self._is_dir:
            tag += "d"
        if tag:
            return "<WalkDirEntry %r (%s)>" % (self._name, tag)
        return "<WalkDirEntry %r>" % (self._name,)

    @classmethod
    def from_direntry(cls_, entry):
        """Create an instance from an :class:`os.DirEntry` alike `entry`.

        Errors from `is_dir()` and `is_symlink()` are mapped to `False`;
        errors from `stat()` are propagated to the caller.

        """
        w = cls_(entry.name)
        w._path = entry.path
        try:
            w._is_dir = entry.is_dir(follow_symlinks=True)
        except OSError:
            #
            # If is_dir() raises an OSError, consider that the entry
            # is not a directory, same behaviour as os.path.isdir().
            #
            w._is_dir = False
        try:
            w._is_symlink = entry.is_symlink()
        except OSError:
            #
            # If is_symlink() raises an OSError, consider that the entry
            # is not a symbolic link, same behaviour as os.path.islink().
            #
            w._is_symlink = False
        # Do not suppress errors here and (consistently) follow symlinks
        w._stat_result = entry.stat(follow_symlinks=True)
        return w

    @classmethod
    def from_path_name(cls_, path, name):
        """Create an instance for the object `name` within directory `path`.

        Uses :mod:`os.path` functions and :func:`os.stat` instead of
        an :class:`os.DirEntry`. Errors from :func:`os.stat` are
        propagated to the caller.

        """
        w = cls_(name)
        w._path = os.path.join(path, name)
        try:
            w._is_dir = os.path.isdir(w._path)
        except OSError:
            #
            # If isdir() raises an OSError, consider that the entry
            # is not a directory.
            #
            w._is_dir = False
        try:
            w._is_symlink = os.path.islink(w._path)
        except OSError:
            #
            # If islink() raises an OSError, consider that the entry
            # is not a symbolic link.
            #
            w._is_symlink = False
        # Do not suppress errors here; os.stat() follows symlinks
        w._stat_result = os.stat(w._path)
        return w

    @staticmethod
    def sort_key(entry):
        """Sort key for `entry`: its name in the filesystem encoding"""
        return entry._fsname


def walk(root, follow_symlinks=False):
    """A heavily customized :func:`os.walk` alike that differs from the
    original:

    - optimized for use in :command:`treesum`
    - most errors are not suppressed
    - the `root` is never part of the returned data
    - the returned directory in "top" is not a string form but a list of
      individual path segments
    - there is only one yielded list

      * contains :class:`WalkDirEntry`
      * sorted by its fsname

      The caller can easily get the old dirs and nondirs by filtering
      the yielded list using "entry.is_dir".
    - recurse into sub-directories first ("topdown=False")
    - sort consistently all yielded lists by the filesystem encoding

    :param root: the directory where to start
    :param follow_symlinks: whether to recurse into symbolic links that
                            point to directories
    :return: a generator that yields tuples `(top, fsobjects)` where
             `top` is a tuple of path segments relative to `root` and
             `fsobjects` is a sorted list of :class:`WalkDirEntry`

    .. note:: The implementation is based on Python 3.11 and needs a
              functional :func:`os.scandir` or :func:`scandir.scandir`
              implementation. It intentionally follows the logic in
              Python 3.11 while it could be simplified because we are not
              implementing some of the original flags (e.g. like
              `topdown`).

    """
    normed_root = os.path.normpath(root)
    #
    # No "yield from" here: it is a syntax error on Python 2 and would
    # make the module unimportable there.
    #
    for item in _walk(normed_root, tuple(), follow_symlinks=follow_symlinks):
        yield item


if scandir:

    def _walk(root, top, follow_symlinks):
        """:func:`walk` helper.

        Implemented using :func:`os.scandir`.

        """
        if top:
            path = os.path.join(root, *top)
        else:
            path = root

        fsobjects, walk_dirs = [], []

        scandir_cm = scandir(path)
        # Some scandir iterators have no close(): wrap them for use in "with"
        if not hasattr(scandir_cm, "close"):
            scandir_cm = nullcontext(scandir_cm)
        with scandir_cm as scandir_it:
            while True:
                try:
                    dir_entry = next(scandir_it)
                except StopIteration:
                    break
                entry = WalkDirEntry.from_direntry(dir_entry)
                fsobjects.append(entry)
                #
                # Always bottom-up: recurse into sub-directories, but exclude
                # symlinks to directories if follow_symlinks is False
                #
                if entry.is_dir:
                    if follow_symlinks or not entry.is_symlink:
                        walk_dirs.append(entry)

        # Sort by low-level filesystem encoding
        walk_dirs.sort(key=WalkDirEntry.sort_key)
        fsobjects.sort(key=WalkDirEntry.sort_key)

        # Recurse into sub-directories
        for wd in walk_dirs:
            for item in _walk(root, top + (wd.name,), follow_symlinks):
                yield item
        # Yield after recursion if going bottom up
        yield top, fsobjects


    class ScanDir(object):    # noqa: E303 too many blank lines

        """An :func:`os.scandir` wrapper that is always an iterator and
        a context manager.

        Iterating yields :class:`WalkDirEntry` instances.

        """

        __slots__ = ("_scandir_it", )

        def __init__(self, path):
            super(ScanDir, self).__init__()
            #
            # Use the `scandir` that has been resolved at import time:
            # os.scandir may not exist if only the scandir backport
            # package is available.
            #
            self._scandir_it = scandir(path)

        def __iter__(self):
            return self

        def __next__(self):
            return WalkDirEntry.from_direntry(next(self._scandir_it))

        if PY2:
            next = __next__

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwds):
            # Not every scandir iterator implementation has a close()
            if hasattr(self._scandir_it, "close"):
                self._scandir_it.close()

else:

    def _walk(root, top, follow_symlinks):
        """:func:`walk` helper.

        Implemented using :func:`os.listdir`.

        """
        if top:
            path = os.path.join(root, *top)
        else:
            path = root

        fsobjects, walk_dirs = [], []

        names = os.listdir(path)
        for name in names:
            entry = WalkDirEntry.from_path_name(path, name)
            fsobjects.append(entry)
            #
            # Always bottom-up: recurse into sub-directories, but exclude
            # symlinks to directories if follow_symlinks is False
            #
            if entry.is_dir:
                if follow_symlinks or not entry.is_symlink:
                    walk_dirs.append(entry)

        # Sort by low-level filesystem encoding
        walk_dirs.sort(key=WalkDirEntry.sort_key)
        fsobjects.sort(key=WalkDirEntry.sort_key)

        # Recurse into sub-directories
        for wd in walk_dirs:
            for item in _walk(root, top + (wd.name,), follow_symlinks):
                yield item
        # Yield after recursion if going bottom up
        yield top, fsobjects


    class ScanDir(object):    # noqa: E303 too many blank lines

        """An :func:`os.scandir` replacement that is always an iterator
        and a context manager.

        Implemented using :func:`os.listdir` because no `scandir`
        implementation is available. Iterating yields
        :class:`WalkDirEntry` instances.

        """

        __slots__ = ("_listdir_it", "_path")

        def __init__(self, path):
            super(ScanDir, self).__init__()
            self._listdir_it = iter(os.listdir(path))
            self._path = path

        def __iter__(self):
            return self

        def __next__(self):
            return WalkDirEntry.from_path_name(self._path,
                                               next(self._listdir_it))

        if PY2:
            next = __next__

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwds):
            # Nothing to release: os.listdir() holds no open resources
            pass
