view cutils/util/walk.py @ 177:089c40240061

Add an alternate implementation for generating directory tree digests: - Do not use something like os.walk() but use os.scandir() directly. - Recursively generate the subdirectory digests only when needed and in the right order. This fixes that the order of subdirectories in the output did not match the application order of its directory digests. The new implementation also should make filtering (that will be implemented later) easier. NOTE: The tree digests of the old and the new implementation are identical.
author Franz Glasner <fzglas.hg@dom66.de>
date Sat, 11 Jan 2025 17:41:28 +0100
parents 506d895a8500
children dac26a2d9de5
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
r"""Utility sub-module to implement a heavily customized :func:`os.walk`.

"""

__all__ = ["walk",
           "ScanDir"]


import os
try:
    from os import scandir
except ImportError:
    try:
        from scandir import scandir
    except ImportError:
        scandir = None

from . import PY2
from .cm import nullcontext


class WalkDirEntry(object):

    """An :class:`os.DirEntry` look-alike that :func:`walk` works with
    and hands out in its results.

    All information is exposed via read-only properties.  An instance
    created directly with the constructor only knows its name; the
    alternate constructors :meth:`from_direntry` and
    :meth:`from_path_name` also fill in the path, the directory and
    symlink flags and the stat result.

    """

    __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink",
                 "_is_dir", "_stat_result")

    def __init__(self, name):
        """Store `name` together with its filesystem-encoded form.

        Every other attribute starts out as `None`.

        """
        self._path = self._fspath = None
        self._is_symlink = self._is_dir = self._stat_result = None
        self._name = name
        if not PY2:
            self._fsname = os.fsencode(name)
        else:
            # On Python 2 the native name is expected to be bytes already
            assert isinstance(name, bytes)
            self._fsname = name

    @property
    def name(self):
        """The name as given to the constructor"""
        return self._name

    @property
    def fsname(self):
        """The name in its filesystem encoding"""
        return self._fsname

    @property
    def path(self):
        """The path of the entry or `None` if it has not been set"""
        return self._path

    @property
    def fspath(self):
        """The path in its filesystem encoding or `None` if there is
        no path.

        The encoded form is computed on first access and then cached.

        """
        if self._path is None or self._fspath is not None:
            # Nothing to encode or already encoded
            return self._fspath
        if not PY2:
            self._fspath = os.fsencode(self._path)
        else:
            assert isinstance(self._path, bytes)
            self._fspath = self._path
        return self._fspath

    @property
    def is_symlink(self):
        """The stored "is a symbolic link" flag"""
        return self._is_symlink

    @property
    def is_dir(self):
        """The stored "is a directory" flag"""
        return self._is_dir

    @property
    def stat(self):
        """The stored stat result"""
        return self._stat_result

    def __repr__(self):
        # "l" marks a symlink, "d" marks a directory
        flags = "l" if self._is_symlink else ""
        flags += "d" if self._is_dir else ""
        if not flags:
            return "<WalkDirEntry %r>" % (self._name,)
        return "<WalkDirEntry %r (%s)>" % (self._name, flags)

    @classmethod
    def from_direntry(cls_, entry):
        """Alternate constructor: build an instance from `entry`, an
        :class:`os.DirEntry` alike.

        Symbolic links are followed for the directory test and for the
        stat call.  An :exc:`OSError` while testing for a directory or a
        symbolic link is taken as "is not one"; errors of the stat call
        are propagated to the caller.

        """
        result = cls_(entry.name)
        result._path = entry.path
        try:
            is_dir = entry.is_dir(follow_symlinks=True)
        except OSError:
            # Like os.path.isdir(): on error it is not a directory
            is_dir = False
        result._is_dir = is_dir
        try:
            is_symlink = entry.is_symlink()
        except OSError:
            # Like os.path.islink(): on error it is not a symbolic link
            is_symlink = False
        result._is_symlink = is_symlink
        # Errors are deliberately not suppressed here; symlinks are
        # (consistently) followed
        result._stat_result = entry.stat(follow_symlinks=True)
        return result

    @classmethod
    def from_path_name(cls_, path, name):
        """Alternate constructor: build an instance for the entry `name`
        within the directory `path`.

        Uses :func:`os.path.isdir`, :func:`os.path.islink` and
        :func:`os.stat` on the joined path.  An :exc:`OSError` while
        testing for a directory or a symbolic link is taken as "is not
        one"; errors of the stat call are propagated to the caller.

        """
        result = cls_(name)
        full_path = os.path.join(path, name)
        result._path = full_path
        try:
            is_dir = os.path.isdir(full_path)
        except OSError:
            # On error it is not considered a directory
            is_dir = False
        result._is_dir = is_dir
        try:
            is_symlink = os.path.islink(full_path)
        except OSError:
            # On error it is not considered a symbolic link
            is_symlink = False
        result._is_symlink = is_symlink
        result._stat_result = os.stat(full_path)
        return result

    @staticmethod
    def sort_key(entry):
        """Sort key function: order entries by their filesystem-encoded
        name.

        """
        return entry._fsname


def walk(root, follow_symlinks=False):
    """A heavily customized :func:`os.walk` alike that differs from the
    original:

    - optimized for use in :command:`treesum`
    - most errors are not suppressed
    - the `root` is never part of the returned data
    - the returned directory in "top" is not a string form but a tuple of
      individual path segments
    - there is only one yielded list

      * contains :class:`WalkDirEntry`
      * sorted by its fsname

      The caller can easily get the old dirs and nondirs by filtering
      the yielded list using "entry.is_dir".

    - recurse into sub-directories first ("topdown=False")
    - sort consistently all yielded lists by the filesystem encoding

    :param root: the directory to walk; it is normalized with
                 :func:`os.path.normpath` before use
    :param follow_symlinks: if true also recurse into symbolic links that
                            point to directories
    :return: a generator that yields ``(top, entries)`` tuples, where
             `top` is the tuple of path segments below `root` and
             `entries` is the sorted list of :class:`WalkDirEntry`
             objects of that directory

    .. note:: The implementation is based on Python 3.11. It uses
              :func:`os.scandir` or :func:`scandir.scandir` when one of
              them is available and falls back to :func:`os.listdir`
              otherwise. It intentionally follows the logic in
              Python 3.11 while it could be simplified because we are not
              implementing some of the original flags (e.g. like
              `topdown`).

    """
    normed_root = os.path.normpath(root)
    #
    # Re-yield explicitly instead of using "yield from": the latter is
    # Python 3.3+ syntax and would make this module fail to compile on
    # Python 2, which the PY2 switches in this module cater for.
    #
    for item in _walk(normed_root, tuple(), follow_symlinks=follow_symlinks):
        yield item


if scandir:

    def _walk(root, top, follow_symlinks):
        """:func:`walk` helper.

        Implemented using :func:`os.scandir`.

        :param root: the root directory of the walk
        :param top: tuple of path segments below `root` that name the
                    directory to scan; empty for `root` itself
        :param follow_symlinks: if true also recurse into symbolic links
                                that point to directories
        :return: a generator that yields ``(top, entries)`` tuples,
                 sub-directories first; `entries` is the list of
                 :class:`WalkDirEntry` objects of the directory sorted by
                 their filesystem-encoded names

        """
        from contextlib import closing

        path = os.path.join(root, *top) if top else root

        fsobjects, walk_dirs = [], []

        scandir_cm = scandir(path)
        #
        # Use the iterator itself as context manager only if it really
        # implements the protocol. The existence of close() alone is no
        # proof: e.g. a generator object has close() but neither
        # __enter__() nor __exit__(). Such iterators are wrapped so that
        # they are closed nevertheless.
        #
        if not (hasattr(scandir_cm, "__enter__")
                and hasattr(scandir_cm, "__exit__")):
            if hasattr(scandir_cm, "close"):
                scandir_cm = closing(scandir_cm)
            else:
                scandir_cm = nullcontext(scandir_cm)
        with scandir_cm as scandir_it:
            for direntry in scandir_it:
                entry = WalkDirEntry.from_direntry(direntry)
                fsobjects.append(entry)
                #
                # Always bottom-up: recurse into sub-directories, but exclude
                # symlinks to directories if follow_symlinks is False
                #
                if entry.is_dir and (follow_symlinks or not entry.is_symlink):
                    walk_dirs.append(entry)

        # Sort by low-level filesystem encoding
        walk_dirs.sort(key=WalkDirEntry.sort_key)
        fsobjects.sort(key=WalkDirEntry.sort_key)

        #
        # Recurse into sub-directories.
        # No "yield from" here: it is Python 3.3+ syntax and would make
        # this module fail to compile on Python 2.
        #
        for wd in walk_dirs:
            for item in _walk(root, top + (wd.name,), follow_symlinks):
                yield item
        # Yield after recursion if going bottom up
        yield top, fsobjects


    class ScanDir(object):    # noqa: E303   too many blank lines

        """An :func:`os.scandir` wrapper that is always an iterator and
        a context manager.

        Iterating yields :class:`WalkDirEntry` objects instead of the
        entries of the underlying scandir implementation.

        """

        __slots__ = ("_scandir_it", )

        def __init__(self, path):
            """Start scanning the directory `path`."""
            super(ScanDir, self).__init__()
            #
            # Use the module-level `scandir`, not `os.scandir`: this
            # class is also defined when only the `scandir` backport
            # package is available, and then `os` has no `scandir`
            # attribute.
            #
            self._scandir_it = scandir(path)

        def __iter__(self):
            return self

        def __next__(self):
            """Return the next entry as :class:`WalkDirEntry`.

            :raises StopIteration: if the underlying iterator is exhausted

            """
            return WalkDirEntry.from_direntry(next(self._scandir_it))

        if PY2:
            # Python 2 spelling of the iterator protocol
            next = __next__

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwds):
            # Not every scandir implementation provides close()
            if hasattr(self._scandir_it, "close"):
                self._scandir_it.close()

else:

    def _walk(root, top, follow_symlinks):
        """:func:`walk` helper.

        Implemented using :func:`os.listdir`.

        :param root: the root directory of the walk
        :param top: tuple of path segments below `root` that name the
                    directory to list; empty for `root` itself
        :param follow_symlinks: if true also recurse into symbolic links
                                that point to directories
        :return: a generator that yields ``(top, entries)`` tuples,
                 sub-directories first; `entries` is the list of
                 :class:`WalkDirEntry` objects of the directory sorted by
                 their filesystem-encoded names

        """
        path = os.path.join(root, *top) if top else root

        fsobjects, walk_dirs = [], []

        for name in os.listdir(path):
            entry = WalkDirEntry.from_path_name(path, name)
            fsobjects.append(entry)
            #
            # Always bottom-up: recurse into sub-directories, but exclude
            # symlinks to directories if follow_symlinks is False
            #
            if entry.is_dir and (follow_symlinks or not entry.is_symlink):
                walk_dirs.append(entry)

        # Sort by low-level filesystem encoding
        walk_dirs.sort(key=WalkDirEntry.sort_key)
        fsobjects.sort(key=WalkDirEntry.sort_key)

        #
        # Recurse into sub-directories.
        # No "yield from" here: it is Python 3.3+ syntax, but this
        # fallback exists for interpreters without a scandir()
        # implementation, which includes Python 2.
        #
        for wd in walk_dirs:
            for item in _walk(root, top + (wd.name,), follow_symlinks):
                yield item
        # Yield after recursion if going bottom up
        yield top, fsobjects


    class ScanDir(object):    # noqa: E303   too many blank lines

        """Stand-in for an :func:`os.scandir` wrapper that is always an
        iterator and a context manager.

        This flavour is built on :func:`os.listdir`; iterating yields
        :class:`WalkDirEntry` objects.

        """

        __slots__ = ("_listdir_it", "_path")

        def __init__(self, path):
            """List the directory `path` and prepare the iteration."""
            super(ScanDir, self).__init__()
            names = os.listdir(path)
            self._path = path
            self._listdir_it = iter(names)

        def __iter__(self):
            return self

        def __next__(self):
            """Return the next entry as :class:`WalkDirEntry`."""
            name = next(self._listdir_it)
            return WalkDirEntry.from_path_name(self._path, name)

        if PY2:
            # Python 2 spelling of the iterator protocol
            next = __next__

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwds):
            # There is nothing to release for a listdir() result
            return None