view cutils/util/walk.py @ 199:b2aba84ca426

Implement a "close()" method for cutils.util.walk.ScanDir. Also check for closed iterators when __next__ is called.
author Franz Glasner <fzglas.hg@dom66.de>
date Fri, 17 Jan 2025 15:24:08 +0100
parents c1e875ba4bdc
children 58d93453c307
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
r"""Utility sub-module to implement a heavily customized :func:`os.walk`.

"""

from __future__ import print_function, absolute_import


__all__ = ["ScanDir", "getfsencoding"]


import os
try:
    from os import scandir
except ImportError:
    try:
        from scandir import scandir
    except ImportError:
        scandir = None
import sys

from . import PY2


_FSENCODING = sys.getfilesystemencoding()


class WalkDirEntry(object):

    """A :class:`os.DirEntry` alike to be used in :func:`walk` and for
    its results.

    """

    __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink",
                 "_is_dir", "_stat_result")

    def __init__(self, name):
        self._name = name
        if PY2:
            assert isinstance(name, bytes)
            self._fsname = name
        else:
            self._name = name
            self._fsname = os.fsencode(name)
        self._path = None
        self._fspath = None
        self._is_symlink = self._is_dir = self._stat_result = None

    @property
    def name(self):
        """The native name"""
        return self._name

    @property
    def fsname(self):
        """The name as bytes"""
        return self._fsname

    @property
    def path(self):
        """Always native"""
        return self._path

    @property
    def fspath(self):
        """Always bytes"""
        if self._path is not None:
            if self._fspath is None:
                if PY2:
                    assert isinstance(self._path, bytes)
                    self._fspath = self._path
                else:
                    self._fspath = os.fsencode(self._path)
        return self._fspath

    @property
    def is_symlink(self):
        return self._is_symlink

    @property
    def is_dir(self):
        return self._is_dir

    @property
    def stat(self):
        return self._stat_result

    def __repr__(self):
        tag = ""
        if self._is_symlink:
            tag += "l"
        if self._is_dir:
            tag += "d"
        if tag:
            return "<WalkDirEntry %r (%s)>" % (self._name, tag)
        return "<WalkDirEntry %r>" % (self._name,)

    @classmethod
    def from_direntry(cls_, entry):
        w = cls_(entry.name)
        w._path = entry.path
        try:
            w._is_dir = entry.is_dir(follow_symlinks=True)
        except OSError:
            #
            # If is_dir() raises an OSError, consider that the entry
            # is not a directory, same behaviour than os.path.isdir().
            #
            w._is_dir = False
        try:
            w._is_symlink = entry.is_symlink()
        except OSError:
            #
            # If is_symlink() raises an OSError, consider that the entry
            # is not a symbolic link, same behaviour than os.path.islink().
            #
            w._is_symlink = False
        # Do not supress errors here and (consistently) follow symlinks
        w._stat_result = entry.stat(follow_symlinks=True)
        return w

    @classmethod
    def from_path_name(cls_, path, name):
        w = cls_(name)
        w._path = os.path.join(path, name)
        try:
            w._is_dir = os.path.isdir(w._path)
        except OSError:
            #
            # If is_dir() raises an OSError, consider that the entry
            # is not a directory, same behaviour than os.path.isdir().
            #
            w._is_dir = False
        try:
            w._is_symlink = os.path.islink(w._path)
        except OSError:
            #
            # If is_symlink() raises an OSError, consider that the entry
            # is not a symbolic link, same behaviour than os.path.islink().
            #
            w._is_symlink = False
        w._stat_result = os.stat(w._path)
        return w

    @staticmethod
    def sort_key(entry):
        return entry._fsname


if scandir:

    class ScanDir(object):

        """An :func:`os.scandir` wrapper that is always an iterator and
        a context manager.

        """

        __slots__ = ("_scandir_it", )

        def __init__(self, path):
            super(ScanDir, self).__init__()
            self._scandir_it = scandir(path)

        def __iter__(self):
            return self

        def __next__(self):
            if self._scandir_it is None:
                raise StopIteration("closed")
            return WalkDirEntry.from_direntry(next(self._scandir_it))

        if PY2:
            next = __next__

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwds):
            self.close()

        def close(self):
            if self._scandir_it is not None:
                if hasattr(self._scandir_it, "close"):
                    self._scandir_it.close()
                self._scandir_it = None

else:

    class ScanDir(object):

        """An :func:`os.scandir` wrapper that is always an iterator and
        a context manager.

        """

        __slots__ = ("_listdir_it", "_path")

        def __init__(self, path):
            super(ScanDir, self).__init__()
            self._listdir_it = iter(os.listdir(path))
            self._path = path

        def __iter__(self):
            return self

        def __next__(self):
            if self._listdir_it is None:
                raise StopIteration("closed")
            return WalkDirEntry.from_path_name(self._path,
                                               next(self._listdir_it))

        if PY2:
            next = __next__

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwds):
            pass

        def close(self):
            pass


def getfsencoding():
    """Return the stored _FSENCODING of this module"""
    return _FSENCODING