Mercurial > hgrepos > Python > apps > py-cutils
changeset 121:2dc26a2f3d1c
A heavily customized "os.walk()" alike to support the coming treeview implementation
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Wed, 01 Jan 2025 17:52:41 +0100 |
| parents | a548783381b6 |
| children | 1e5127028254 |
| files | cutils/util/walk.py |
| diffstat | 1 files changed, 200 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License: BSD-3-Clause
# :-
r"""Utility sub-module to implement a heavily customized :func:`os.walk`.

"""

__all__ = ["walk"]


import os
try:
    from os import scandir
except ImportError:
    try:
        from scandir import scandir
    except ImportError:
        scandir = None
from contextlib import closing

from .cm import nullcontext
from .constants import PY2


class WalkDirEntry(object):

    """A :class:`os.DirEntry` alike to be used in :func:`walk` and for
    its results.

    Unlike :class:`os.DirEntry` the type information and the stat result
    are plain values that are exposed as read-only properties.  They are
    `None` on a freshly constructed instance and are filled in by
    :meth:`from_direntry`.

    """

    __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink",
                 "_is_dir", "_stat_result")

    def __init__(self, name):
        """Create an entry that only knows its `name`.

        :param name: the base name of the entry; on Python 2 this must
                     be a byte string

        """
        self._name = name
        if PY2:
            assert isinstance(name, bytes)
            self._fsname = name
        else:
            self._fsname = os.fsencode(name)
        self._path = None
        self._fspath = None
        self._is_symlink = self._is_dir = self._stat_result = None

    @property
    def name(self):
        """The name as given to the constructor"""
        return self._name

    @property
    def fsname(self):
        """The name in its filesystem encoding (bytes).

        On Python 3 this is ``os.fsencode(name)``, on Python 2 it is the
        name itself.

        """
        return self._fsname

    @property
    def path(self):
        """The path of the entry or `None` if it has not been set"""
        return self._path

    @property
    def fspath(self):
        """The path in its filesystem encoding or `None` if no path is set.

        The encoded value is computed on first access and then cached.

        """
        if self._path is not None:
            if self._fspath is None:
                if PY2:
                    assert isinstance(self._path, bytes)
                    self._fspath = self._path
                else:
                    self._fspath = os.fsencode(self._path)
        return self._fspath

    @property
    def is_symlink(self):
        """Whether the entry is a symbolic link (`None` if unknown)"""
        return self._is_symlink

    @property
    def is_dir(self):
        """Whether the entry is a directory, symlinks being followed
        (`None` if unknown)

        """
        return self._is_dir

    @property
    def stat(self):
        """The stat result of the entry.

        :meth:`from_direntry` determines it only for entries that are
        not directories; otherwise it is `None`.

        """
        return self._stat_result

    def __repr__(self):
        # Tag symlinks with "l" and directories with "d"
        tag = ""
        if self._is_symlink:
            tag += "l"
        if self._is_dir:
            tag += "d"
        if tag:
            return "<WalkDirEntry %r (%s)>" % (self._name, tag)
        return "<WalkDirEntry %r>" % (self._name,)

    @classmethod
    def from_direntry(cls_, entry):
        """Alternate constructor: build an instance from a `scandir()`
        directory entry.

        :param entry: an object with the interface of :class:`os.DirEntry`
                      (`name`, `path`, `is_dir()`, `is_symlink()`, `stat()`)
        :return: the fully populated :class:`WalkDirEntry`
        :raises OSError: if `entry` is not a directory and its
                         ``stat()`` call fails

        """
        w = cls_(entry.name)
        w._path = entry.path
        try:
            w._is_dir = entry.is_dir(follow_symlinks=True)
        except OSError:
            #
            # If is_dir() raises an OSError, consider that the entry
            # is not a directory, same behaviour as os.path.isdir().
            #
            w._is_dir = False
        try:
            w._is_symlink = entry.is_symlink()
        except OSError:
            #
            # If is_symlink() raises an OSError, consider that the entry
            # is not a symbolic link, same behaviour as os.path.islink().
            #
            w._is_symlink = False
        if not w._is_dir:
            # Do not suppress errors here and (consistently) follow symlinks
            w._stat_result = entry.stat(follow_symlinks=True)
        return w

    @staticmethod
    def sort_key(entry):
        """Sort key for :class:`WalkDirEntry` objects: the name in its
        filesystem encoding

        """
        return entry._fsname


if scandir:

    def walk(root, follow_symlinks=False):
        """A heavily customized :func:`os.walk` alike that differs from the
        original:

        - optimized for use in :command:`treesum`
        - most errors are not suppressed
        - the `root` is never part of the yielded "top"
        - the yielded directory in "top" is not a string but a tuple of
          individual path segments relative to `root`
        - all other yielded lists contain WalkDirEntry elements instead of
          strings
        - recurse into sub-directories first ("topdown=False")
        - sort consistently all yielded lists by the filesystem encoding

        :param root: the directory where the walk starts; it is normalized
                     with :func:`os.path.normpath` before use
        :param bool follow_symlinks: whether to recurse into symbolic links
                                     that point to directories; such links
                                     are always reported in "dirs" but they
                                     are walked only if this flag is true
        :return: a generator that yields tuples ``(top, dirs, nondirs)``;
                 the tuple for `root` itself (with an empty "top") is
                 yielded last

        .. note:: The implementation is based on Python 3.11 and needs a
                  functional :func:`os.scandir` or :func:`scandir.scandir`
                  implementation. It intentionally follows the logic in
                  Python 3.11 while it could be simplified because we are not
                  implementing some of the original flags (e.g. like
                  `topdown`).

        """
        normed_root = os.path.normpath(root)
        #
        # No "yield from" here: it is a syntax error on Python 2 and would
        # make the module unimportable there.
        #
        for item in _walk(normed_root, (), follow_symlinks=follow_symlinks):
            yield item


    def _walk(root, top, follow_symlinks):    # noqa: E303 too many empty lines
        """:func:`walk` helper.

        :param root: the normalized root directory of the walk
        :param tuple top: the path segments below `root` of the directory
                          to be scanned; empty for `root` itself
        :param bool follow_symlinks: see :func:`walk`

        """
        if top:
            path = os.path.join(root, *top)
        else:
            path = root

        dirs, nondirs, walk_dirs = [], [], []

        scandir_it = scandir(path)
        # Not every scandir() implementation returns a closable iterator
        if hasattr(scandir_it, "close"):
            scandir_ctx = closing(scandir_it)
        else:
            scandir_ctx = nullcontext(scandir_it)
        with scandir_ctx as scandir_it:
            while True:
                #
                # Only the exhausted iterator ends the loop: keep the
                # conversion of the entry out of the `try`.
                #
                try:
                    direntry = next(scandir_it)
                except StopIteration:
                    break
                entry = WalkDirEntry.from_direntry(direntry)
                if entry.is_dir:
                    dirs.append(entry)
                else:
                    nondirs.append(entry)
                #
                # Always bottom-up: recurse into sub-directories, but exclude
                # symlinks to directories if follow_symlinks is False
                #
                if entry.is_dir:
                    if follow_symlinks:
                        walk_into = True
                    else:
                        walk_into = not entry.is_symlink
                    if walk_into:
                        walk_dirs.append(entry)

        # Sort by low-level filesystem encoding
        walk_dirs.sort(key=WalkDirEntry.sort_key)
        dirs.sort(key=WalkDirEntry.sort_key)
        nondirs.sort(key=WalkDirEntry.sort_key)

        # Recurse into sub-directories (explicit loop instead of "yield from")
        for wd in walk_dirs:
            for item in _walk(root, top + (wd.name,), follow_symlinks):
                yield item
        # Yield after recursion if going bottom up
        yield top, dirs, nondirs

else:

    raise ImportError("no `scandir()' module available")
