view cutils/util/walk.py @ 155:bf74ce3c968d

When computing digests use the order imposed by names alone. No different loops for dirs and nondirs.
author Franz Glasner <fzglas.hg@dom66.de>
date Mon, 06 Jan 2025 13:39:12 +0100
parents c7df81fb84b7
children 481cc9b26861
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
r"""Utility sub-module to implement a heavily customized :func:`os.walk`.

"""

__all__ = ["walk"]


import os
try:
    from os import scandir
except ImportError:
    try:
        from scandir import scandir
    except ImportError:
        scandir = None

from .cm import nullcontext
from .constants import PY2


class WalkDirEntry(object):

    """Lightweight stand-in for :class:`os.DirEntry` as handed out by
    :func:`walk`.

    All file-type information is determined once in :meth:`from_direntry`
    and then only exposed read-only via properties. A freshly constructed
    instance knows its name only; every other attribute is `None`.

    """

    __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink",
                 "_is_dir", "_stat_result")

    def __init__(self, name):
        """Remember `name` and its filesystem (bytes) representation.

        On Python 2 `name` must already be a byte string.

        """
        self._name = name
        if not PY2:
            self._fsname = os.fsencode(name)
        else:
            assert isinstance(name, bytes)
            self._fsname = name
        self._path = self._fspath = None
        self._is_symlink = None
        self._is_dir = None
        self._stat_result = None

    @property
    def name(self):
        """The name of the entry as given to the constructor"""
        return self._name

    @property
    def fsname(self):
        """The name of the entry in its filesystem encoding (bytes)"""
        return self._fsname

    @property
    def path(self):
        """The path of the entry or `None` if not yet known"""
        return self._path

    @property
    def fspath(self):
        """The path in its filesystem encoding (bytes).

        Computed from :attr:`path` on first access and cached afterwards;
        `None` as long as no path is known.

        """
        if self._path is not None and self._fspath is None:
            if not PY2:
                self._fspath = os.fsencode(self._path)
            else:
                assert isinstance(self._path, bytes)
                self._fspath = self._path
        return self._fspath

    @property
    def is_symlink(self):
        """Whether the entry is a symbolic link (`None` if unknown)"""
        return self._is_symlink

    @property
    def is_dir(self):
        """Whether the entry is a directory (`None` if unknown)"""
        return self._is_dir

    @property
    def stat(self):
        """The recorded stat result or `None` if there is none"""
        return self._stat_result

    def __repr__(self):
        # "l" marks a symlink, "d" a directory -- in this order
        flags = "".join([
            "l" if self._is_symlink else "",
            "d" if self._is_dir else "",
        ])
        if not flags:
            return "<WalkDirEntry %r>" % (self._name,)
        return "<WalkDirEntry %r (%s)>" % (self._name, flags)

    @classmethod
    def from_direntry(cls, entry):
        """Build an instance from an :class:`os.DirEntry` alike `entry`.

        Errors of the directory and symlink checks are mapped to `False`.
        Errors of the stat call for non-directories are propagated.

        """
        result = cls(entry.name)
        result._path = entry.path
        #
        # An OSError in is_dir() means "not a directory": this is what
        # os.path.isdir() does also.
        #
        try:
            is_directory = entry.is_dir(follow_symlinks=True)
        except OSError:
            is_directory = False
        result._is_dir = is_directory
        #
        # An OSError in is_symlink() means "not a symbolic link": this is
        # what os.path.islink() does also.
        #
        try:
            is_link = entry.is_symlink()
        except OSError:
            is_link = False
        result._is_symlink = is_link
        if not is_directory:
            # Errors are not suppressed here; symlinks are (consistently)
            # followed
            result._stat_result = entry.stat(follow_symlinks=True)
        return result

    @staticmethod
    def sort_key(entry):
        """Key function to sort entries by their filesystem name"""
        return entry._fsname


if scandir:

    def walk(root, follow_symlinks=False):
        """A heavily customized :func:`os.walk` alike that differs from the
        original:

        - optimized for use in :command:`treesum`
        - most errors are not suppressed
        - the `root` is never part of the returned data
        - the returned directory in "top" is not a string form but a tuple
          of individual path segments
        - there is only one yielded list

          * contains :class:`WalkDirEntry`
          * sorted by its fsname

          The caller can easily get the old dirs and nondirs by filtering
          the yielded list using "entry.is_dir".

        - recurse into sub-directories first ("topdown=False")
        - sort consistently all yielded lists by the filesystem encoding

        :param root: the directory where to start; it is normalized with
                     :func:`os.path.normpath` before use
        :param bool follow_symlinks: whether to recurse into symbolic links
                                     that point to directories also
        :return: a generator that yields ``(top, fsobjects)`` pairs: `top`
                 is the tuple of path segments below `root` (empty for
                 `root` itself) and `fsobjects` is the sorted list of
                 :class:`WalkDirEntry` items found in that directory

        .. note:: The implementation is based on Python 3.11 and needs a
                  functional :func:`os.scandir` or :func:`scandir.scandir`
                  implementation. It intentionally follows the logic in
                  Python 3.11 while it could be simplified because we are not
                  implementing some of the original flags (e.g. like
                  `topdown`).

        """
        normed_root = os.path.normpath(root)
        #
        # Use an explicit loop instead of ``yield from``: the latter is
        # a syntax error on Python 2, which this module otherwise
        # supports (see the PY2 handling and the `scandir` fallback import).
        #
        for item in _walk(
                normed_root, tuple(), follow_symlinks=follow_symlinks):
            yield item


    def _walk(root, top, follow_symlinks):  # noqa: E303  too many empty lines
        """:func:`walk` helper: scan one directory and recurse bottom-up.

        :param root: the start directory of the whole walk
        :param tuple top: the path segments of the directory to scan,
                          relative to `root`; empty for `root` itself
        :param bool follow_symlinks: whether to recurse into symbolic links
                                     that point to directories also
        :return: a generator that yields ``(top, fsobjects)`` pairs where
                 `fsobjects` is the list of all :class:`WalkDirEntry` items
                 of the scanned directory sorted by their filesystem name;
                 the pairs of all visited sub-directories are yielded
                 before the pair of the directory itself

        Errors from scanning the directory are not suppressed.

        """
        if top:
            path = os.path.join(root, *top)
        else:
            path = root

        fsobjects, walk_dirs = [], []

        scandir_cm = scandir(path)
        # Some scandir iterators have no close(): wrap them so that the
        # `with` statement below works nevertheless
        if not hasattr(scandir_cm, "close"):
            scandir_cm = nullcontext(scandir_cm)
        with scandir_cm as scandir_it:
            while True:
                try:
                    entry = WalkDirEntry.from_direntry(next(scandir_it))
                except StopIteration:
                    break
                fsobjects.append(entry)
                #
                # Always bottom-up: recurse into sub-directories, but exclude
                # symlinks to directories if follow_symlinks is False
                #
                if entry.is_dir and (follow_symlinks or not entry.is_symlink):
                    walk_dirs.append(entry)

        # Sort by low-level filesystem encoding
        walk_dirs.sort(key=WalkDirEntry.sort_key)
        fsobjects.sort(key=WalkDirEntry.sort_key)

        #
        # Recurse into sub-directories.
        # Use an explicit loop instead of ``yield from``: the latter is
        # a syntax error on Python 2.
        #
        for wd in walk_dirs:
            for item in _walk(root, top + (wd.name,), follow_symlinks):
                yield item
        # Yield after recursion if going bottom up
        yield top, fsobjects

else:

    # Neither :func:`os.scandir` nor the `scandir` backport package could
    # be imported at the top of this module (`scandir` is `None`): there is
    # no way to provide :func:`walk`, so refuse to import the module.
    raise ImportError("no `scandir()' module available")