diff cutils/util/walk.py @ 121:2dc26a2f3d1c

A heavily customized "os.walk()" alike to support the coming treeview implementation
author Franz Glasner <fzglas.hg@dom66.de>
date Wed, 01 Jan 2025 17:52:41 +0100
parents
children 4a0c3c9eead7
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cutils/util/walk.py	Wed Jan 01 17:52:41 2025 +0100
@@ -0,0 +1,200 @@
+# -*- coding: utf-8 -*-
+# :-
+# :Copyright: (c) 2020-2025 Franz Glasner
+# :License:   BSD-3-Clause
+# :-
+r"""Utility sub-module to implement a heavily customized :func:`os.walk`.
+
+"""
+
+__all__ = ["walk"]
+
+
+import os
+try:
+    from os import scandir
+except ImportError:
+    try:
+        from scandir import scandir
+    except ImportError:
+        scandir = None
+from contextlib import closing
+
+from .cm import nullcontext
+from .constants import PY2
+
+
+class WalkDirEntry(object):
+
+    """Minimal :class:`os.DirEntry` look-alike that :func:`walk` hands out
+    in its results.
+
+    All data is stored in private slots and exposed through read-only
+    properties. Apart from the name everything starts out as `None`
+    and is filled in by :meth:`from_direntry`.
+
+    """
+
+    # _name / _fsname:   entry name as given and in filesystem encoding
+    # _path / _fspath:   entry path as given and (lazily) fs-encoded
+    # _is_symlink, _is_dir, _stat_result: captured by from_direntry()
+    __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink",
+                 "_is_dir", "_stat_result")
+
+    def __init__(self, name):
+        """Create an entry that only knows its `name`.
+
+        On Python 2 `name` is asserted to be a byte string and is used
+        as-is for :attr:`fsname`; otherwise :attr:`fsname` is computed
+        with :func:`os.fsencode`.
+
+        """
+        self._name = name
+        if not PY2:
+            self._fsname = os.fsencode(name)
+        else:
+            assert isinstance(name, bytes)
+            self._fsname = name
+        self._path = None
+        self._fspath = None
+        self._is_symlink = None
+        self._is_dir = None
+        self._stat_result = None
+
+    @property
+    def name(self):
+        """The entry's name as given to the constructor"""
+        return self._name
+
+    @property
+    def fsname(self):
+        """The entry's name in its filesystem (bytes) encoding"""
+        return self._fsname
+
+    @property
+    def path(self):
+        """The entry's path or `None` if it has not been set"""
+        return self._path
+
+    @property
+    def fspath(self):
+        """The filesystem-encoded variant of :attr:`path`.
+
+        Computed on first access and cached afterwards; stays `None`
+        as long as no path is set.
+
+        """
+        # Nothing to encode yet, or already encoded and cached
+        if self._path is None or self._fspath is not None:
+            return self._fspath
+        if not PY2:
+            self._fspath = os.fsencode(self._path)
+        else:
+            assert isinstance(self._path, bytes)
+            self._fspath = self._path
+        return self._fspath
+
+    @property
+    def is_symlink(self):
+        """Stored "is a symbolic link" flag (`None` if not determined)"""
+        return self._is_symlink
+
+    @property
+    def is_dir(self):
+        """Stored "is a directory" flag (`None` if not determined)"""
+        return self._is_dir
+
+    @property
+    def stat(self):
+        """Stored stat result (`None` if none has been captured)"""
+        return self._stat_result
+
+    def __repr__(self):
+        # "l" marks a symlink, "d" a directory -- always in that order
+        flags = ("l" if self._is_symlink else "") \
+            + ("d" if self._is_dir else "")
+        if not flags:
+            return "<WalkDirEntry %r>" % (self._name,)
+        return "<WalkDirEntry %r (%s)>" % (self._name, flags)
+
+    @classmethod
+    def from_direntry(cls_, entry):
+        """Build an instance from the :func:`scandir` result item `entry`.
+
+        Name and path are copied; the directory and symlink flags are
+        queried with errors mapped to `False`. For entries that are not
+        directories a stat result (following symlinks) is captured too,
+        and errors from that call are propagated to the caller.
+
+        """
+        wde = cls_(entry.name)
+        wde._path = entry.path
+        #
+        # An OSError from is_dir() means "not a directory" -- this is
+        # the same behaviour as os.path.isdir().
+        #
+        try:
+            dir_flag = entry.is_dir(follow_symlinks=True)
+        except OSError:
+            dir_flag = False
+        wde._is_dir = dir_flag
+        #
+        # An OSError from is_symlink() means "not a symbolic link" --
+        # this is the same behaviour as os.path.islink().
+        #
+        try:
+            symlink_flag = entry.is_symlink()
+        except OSError:
+            symlink_flag = False
+        wde._is_symlink = symlink_flag
+        if not dir_flag:
+            # Do not suppress errors here and (consistently) follow symlinks
+            wde._stat_result = entry.stat(follow_symlinks=True)
+        return wde
+
+    @staticmethod
+    def sort_key(entry):
+        """Key function for sorting entries by their filesystem name"""
+        return entry._fsname
+
+if scandir:
+
+    def walk(root, follow_symlinks=False):
+        """A heavily customized :func:`os.walk` alike that differs from the
+        original:
+
+        - optimized for use in :command:`treesum`
+        - most errors are not suppressed
+        - the `root` is never part of the returned data
+        - the returned directory in "top" is not a string form but a tuple
+          of individual path segments
+        - all other yielded lists contain WalkDirEntry elements instead of
+          strings
+        - recurse into sub-directories first ("topdown=False")
+        - sort consistently all yielded lists by the filesystem encoding
+
+        :param root: the directory to start at; it is normalized with
+                     :func:`os.path.normpath` before use
+        :param follow_symlinks: if true, also recurse into symbolic links
+                                that point to directories
+        :return: a generator that yields `(top, dirs, nondirs)` tuples
+
+        .. note:: The implementation is based on Python 3.11 and needs a
+                  functional :func:`os.scandir` or :func:`scandir.scandir`
+                  implementation. It intentionally follows the logic in
+                  Python 3.11 while it could be simplified because we are not
+                  implementing some of the original flags (e.g. like
+                  `topdown`).
+
+        """
+        normed_root = os.path.normpath(root)
+        #
+        # Delegate with an explicit loop: `yield from` is Python 3.3+
+        # syntax and would prevent compiling this module on Python 2.
+        #
+        for item in _walk(
+                normed_root, tuple(), follow_symlinks=follow_symlinks):
+            yield item
+
+
+    def _walk(root, top, follow_symlinks):  # noqa: E303  too many empty lines
+        """:func:`walk` helper that handles one directory level.
+
+        :param root: the (already normalized) start directory
+        :param top: tuple with the names of the path segments below `root`
+                    that lead to the directory to scan; empty for `root`
+                    itself
+        :param follow_symlinks: if true, also recurse into symbolic links
+                                that point to directories
+        :return: a generator that first yields the results of all
+                 sub-directories that are walked into and then
+                 `(top, dirs, nondirs)` for the scanned directory itself;
+                 `dirs` and `nondirs` are sorted lists of
+                 :class:`WalkDirEntry` items
+
+        """
+        if top:
+            path = os.path.join(root, *top)
+        else:
+            path = root
+
+        dirs, nondirs, walk_dirs = [], [], []
+
+        scandir_it = scandir(path)
+        # Not every scandir() implementation returns a closeable iterator
+        if hasattr(scandir_it, "close"):
+            scandir_ctx = closing(scandir_it)
+        else:
+            scandir_ctx = nullcontext(scandir_it)
+        with scandir_ctx as scandir_it:
+            while True:
+                #
+                # Guard only the iterator step: a StopIteration coming out
+                # of the entry conversion must not silently end the scan.
+                #
+                try:
+                    raw_entry = next(scandir_it)
+                except StopIteration:
+                    break
+                entry = WalkDirEntry.from_direntry(raw_entry)
+                if not entry.is_dir:
+                    nondirs.append(entry)
+                    continue
+                dirs.append(entry)
+                #
+                # Always bottom-up: recurse into sub-directories, but exclude
+                # symlinks to directories if follow_symlinks is False
+                #
+                if follow_symlinks or not entry.is_symlink:
+                    walk_dirs.append(entry)
+
+        # Sort by low-level filesystem encoding
+        walk_dirs.sort(key=WalkDirEntry.sort_key)
+        dirs.sort(key=WalkDirEntry.sort_key)
+        nondirs.sort(key=WalkDirEntry.sort_key)
+
+        # Recurse into sub-directories (explicit loop instead of the
+        # Python 3 only `yield from`)
+        for wd in walk_dirs:
+            for item in _walk(root, top + (wd.name,), follow_symlinks):
+                yield item
+        # Yield after recursion if going bottom up
+        yield top, dirs, nondirs
+
+else:
+
+    raise ImportError("no `scandir()' module available")