view cutils/util/walk.py @ 282:d507ae4943d5

Reordering of methods: make it a little bit more consistent
author Franz Glasner <fzglas.hg@dom66.de>
date Sun, 23 Feb 2025 12:40:28 +0100
parents 16507317e834
children 99b78fa04bc1
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
r"""Utility sub-module to implement a heavily customized :func:`os.walk`.

"""

from __future__ import print_function, absolute_import


# The public names of this sub-module (what `from ... import *` exports)
__all__ = ["WalkDirEntry", "ScanDir", "getfsencoding"]


import os
try:
    from os import scandir
except ImportError:
    try:
        from scandir import scandir
    except ImportError:
        scandir = None
import stat
import sys

from . import PY2


# Unique sentinel object: marks lazily computed (cached) attributes of
# :class:`WalkDirEntry` that have not been computed yet.
_notset = object()


# The filesystem encoding, determined once when this module is imported.
# It is handed out by :func:`getfsencoding`.
_FSENCODING = sys.getfilesystemencoding()


if PY2:

    def _unix_path(s):
        """Return `s` with every backslash turned into a forward slash.

        On Python 2 `s` may be a byte string or a Unicode string; the
        replacement is done with literals of the matching type.

        """
        if not isinstance(s, bytes):
            return s.replace(u"\\", u"/")
        return s.replace(b"\\", b"/")

else:

    def _unix_path(s):
        """Return the text `s` with every backslash turned into a
        forward slash.

        """
        return "/".join(s.split("\\"))


class WalkDirEntry(object):

    """A :class:`os.DirEntry` alike to be used in :func:`walk` and for
    its results.

    An entry keeps the `name` and the (slash-normalized) `path` it was
    constructed with and offers several derived representations of both:

    - `fsname`, `fspath`: strictly encoded bytes for the filesystem
      or `None`
    - `uname`, `upath`: strictly valid Unicode text or `None`
    - `u8name`, `u8path`: strictly encoded UTF-8 bytes or `None`
    - `alt_fsname`, `alt_fspath`, `alt_u8name`, `alt_u8path`: "escaped"
      bytes variants that are meant to be usable for every name

    The file type flags and the stat result are filled in by the
    alternate constructors :meth:`from_direntry` and
    :meth:`from_path_name`; a freshly constructed entry has all of them
    set to `None`.

    """

    __slots__ = ("_name", "_path",     # encoded as given in the ctor
                 "_is_symlink", "_is_reg", "_is_dir", "_stat_result",
                 "_stat_errno", "_stat_errstr",
                 "_alt_fsname", "_alt_u8name")

    def __init__(self, name, path):
        """Create an entry for `name` located at `path`.

        :param name: the name of the entry (kept unchanged)
        :param path: the path of the entry; backslashes are converted
                     to forward slashes

        """
        # The name exactly as given in the ctor
        self._name = name
        # The path as given in the ctor -- but normalized to have slashes
        self._path = _unix_path(path)
        self._is_symlink = self._is_reg = self._is_dir = self._stat_result = \
            self._stat_errno = self._stat_errstr = None
        # Lazily computed caches for `alt_fsname` and `alt_u8name`
        self._alt_fsname = self._alt_u8name = _notset

    @property
    def name(self):
        """The original name exactly as given in the ctor"""
        return self._name

    @property
    def path(self):
        """The path as given in the ctor -- but with all backslashes
        converted to forward slashes.

        """
        return self._path

    @property
    def fsname(self):
        """The name as bytes for the filesystem.

        Also do not allow CR, LF or a backslash in the name.

        :rtype: bytes or None

        """
        return WalkDirEntry._strict_fs(self._name)

    @property
    def fspath(self):
        """The path as bytes for the filesystem.

        Also do not allow CR, LF or a backslash in the path.

        :rtype: bytes or None

        """
        return WalkDirEntry._strict_fs(self._path)

    @staticmethod
    def _strict_fs(what):
        """Helper for `fsname` and `fspath`: encode `what` strictly for
        the filesystem.

        :return: the bytes, or `None` if `what` cannot be encoded
                 strictly (Python 2) or if the result contains CR, LF
                 or a backslash
        :rtype: bytes or None

        """
        if PY2:
            if isinstance(what, bytes):
                s = what
            else:
                #
                # Only encode real Unicode text: calling `.encode()` on
                # a Python 2 byte string would implicitly decode it as
                # ASCII first and fail for every non-ASCII name.
                #
                try:
                    s = what.encode(_FSENCODING, "strict")
                except UnicodeError:
                    return None
        else:
            s = os.fsencode(what)
        if (b'\n' in s) or (b'\r' in s) or (b'\\' in s):
            return None
        return s

    @property
    def alt_fsname(self):
        """Alternative and "escaped" filesystem name -- always bytes.

        The value is computed on first access and cached.

        :rtype: bytes

        """
        if self._alt_fsname is _notset:
            self._alt_fsname = WalkDirEntry.alt_fs(self._name)
        return self._alt_fsname

    @property
    def alt_fspath(self):
        """Alternative and "escaped" filesystem path -- always bytes.

        :rtype: bytes

        """
        return WalkDirEntry.alt_fs(self._path)

    @staticmethod
    def alt_fs(what):
        """Encode `what` for the filesystem and escape backslash, LF and
        CR as ``\\x5c``, ``\\x0a`` and ``\\x0d``.

        Python 2 Unicode text that cannot be encoded is encoded using
        the `backslashreplace` error handler.

        :rtype: bytes

        """
        if PY2:
            if isinstance(what, bytes):
                s = what
            else:
                #
                # Prevent double encoding ...
                # ... and hope that the current FS encoding is compatible
                #     with it
                #
                s = what.replace(u'\\', u"\\x5c")
                s = s.encode(_FSENCODING, "backslashreplace")
                return s.replace(b'\n', b"\\x0a").replace(b'\r', b"\\x0d")
        else:
            s = os.fsencode(what)
        return (s.replace(b'\\', b"\\x5c")
                .replace(b'\n', b"\\x0a")
                .replace(b'\r', b"\\x0d"))

    @property
    def uname(self):
        """Always "real", strictly encoded Unicode or `None` if this is not
        possible.

        :rtype: text or None

        """
        return WalkDirEntry._strict_unicode(self._name)

    @property
    def upath(self):
        """Always "real", strictly encoded Unicode or `None` if this is not
        possible.

        :rtype: text or None

        """
        return WalkDirEntry._strict_unicode(self._path)

    @staticmethod
    def _strict_unicode(what):
        """Helper for `uname` and `upath`: return `what` as Unicode text
        or `None` if it is no strictly valid text.

        On Python 2 byte strings are decoded strictly with the
        filesystem encoding. On Python 3 the text must be strictly
        encodable as UTF-8.

        :rtype: text or None

        """
        if PY2:
            if not isinstance(what, bytes):
                return what
            try:
                return what.decode(_FSENCODING, "strict")
            except UnicodeError:
                return None
        try:
            # Just a validity check: the encoded result is not needed
            what.encode("utf-8", "strict")
        except UnicodeError:
            return None
        return what

    @property
    def u8name(self):
        """`.uname` as UTF-8 or `None` (as strict as `uname`).

        Also do not allow CR, LF or a backslash in the name.

        :rtype: bytes or None

        """
        return WalkDirEntry._strict_u8(self.uname)

    @property
    def u8path(self):
        """`.upath` as UTF-8 or `None` (as strict as `upath`).

        Also do not allow CR, LF or a backslash in the path.

        :rtype: bytes or None

        """
        return WalkDirEntry._strict_u8(self.upath)

    @staticmethod
    def _strict_u8(text):
        """Helper for `u8name` and `u8path`: encode `text` strictly as
        UTF-8.

        :return: the bytes, or `None` if `text` is `None` or contains
                 CR, LF or a backslash
        :rtype: bytes or None

        """
        if text is None:
            return None
        if (u'\n' in text) or (u'\r' in text) or (u'\\' in text):
            return None
        return text.encode("utf-8", "strict")

    @property
    def alt_u8name(self):
        """Alternative and "escaped" UTF-8 name -- always bytes.

        The value is computed on first access and cached.

        :rtype: bytes

        """
        if self._alt_u8name is _notset:
            self._alt_u8name = WalkDirEntry.alt_u8(self._name)
        return self._alt_u8name

    @property
    def alt_u8path(self):
        """Alternative and "escaped" UTF-8 path -- always bytes.

        :rtype: bytes

        """
        return WalkDirEntry.alt_u8(self._path)

    @staticmethod
    def alt_u8(what):
        """Encode `what` as bytes and escape backslash, LF and CR as
        ``\\x5c``, ``\\x0a`` and ``\\x0d``.

        The encoding is UTF-8 with these exceptions on Python 2:
        Unicode text and byte strings that cannot be decoded strictly
        with the filesystem encoding are encoded as ASCII. Characters
        that cannot be encoded are written by the `backslashreplace`
        error handler.

        :rtype: bytes

        """
        if PY2:
            if isinstance(what, bytes):
                try:
                    text = what.decode(_FSENCODING, "strict")
                    codec, errors = "utf-8", "strict"
                except UnicodeError:
                    text = WalkDirEntry.surrogate_decode(what)
                    codec, errors = "ascii", "backslashreplace"
            else:
                text = what
                codec, errors = "ascii", "backslashreplace"
        else:
            text = what
            codec, errors = "utf-8", "backslashreplace"
        #
        # Escape *before* encoding: the backslashes that are inserted by
        # the `backslashreplace` error handler must not be escaped a
        # second time. Otherwise an unencodable character would yield the
        # very same output as a literal backslash followed by the escape
        # text.
        #
        escaped = (text.replace(u'\\', u"\\x5c")
                   .replace(u'\n', u"\\x0a")
                   .replace(u'\r', u"\\x0d"))
        return escaped.encode(codec, errors)

    @property
    def is_symlink(self):
        """Whether the entry is a symbolic link (`None` if not determined)"""
        return self._is_symlink

    @property
    def is_reg(self):
        """Whether the entry is a regular file (`None` if not determined)"""
        return self._is_reg

    @property
    def is_dir(self):
        """Whether the entry is a directory (`None` if not determined)"""
        return self._is_dir

    def _check_mode(self, test):
        """Helper for the file type properties: apply the :mod:`stat`
        predicate `test` to the mode of the stat result.

        :return: `False` if there is no stat result or if `test` is not
                 available (i.e. is `None`)

        """
        if not test or self._stat_result is None:
            return False
        return test(self._stat_result.st_mode)

    @property
    def is_chr(self):
        """Whether the stat result denotes a character special device"""
        return self._check_mode(stat.S_ISCHR)

    @property
    def is_blk(self):
        """Whether the stat result denotes a block special device"""
        return self._check_mode(stat.S_ISBLK)

    @property
    def is_fifo(self):
        """Whether the stat result denotes a FIFO (named pipe)"""
        return self._check_mode(stat.S_ISFIFO)

    @property
    def is_socket(self):
        """Whether the stat result denotes a socket"""
        return self._check_mode(stat.S_ISSOCK)

    @property
    def is_door(self):
        """Whether the stat result denotes a door.

        Always `False` if :mod:`stat` has no `S_ISDOOR`.

        """
        return self._check_mode(getattr(stat, "S_ISDOOR", None))

    @property
    def is_eventport(self):
        """Whether the stat result denotes an event port.

        Always `False` if :mod:`stat` has no `S_ISPORT`.

        """
        return self._check_mode(getattr(stat, "S_ISPORT", None))

    @property
    def is_whiteout(self):
        """Whether the stat result denotes a whiteout.

        Always `False` if :mod:`stat` has no `S_ISWHT`.

        """
        return self._check_mode(getattr(stat, "S_ISWHT", None))

    @property
    def is_special(self):
        """Anything besides a regular file and a directory.

        Always `False` if there is no stat result.

        """
        if self._stat_result is None:
            return False
        return not (self.is_reg or self.is_dir)

    @property
    def stat(self):
        """The stat result or `None` if not available"""
        return self._stat_result

    @property
    def stat_errno(self):
        """The `errno` of a failed stat call or `None`"""
        return self._stat_errno

    @property
    def stat_errstr(self):
        """The `strerror` of a failed stat call or `None`"""
        return self._stat_errstr

    def __repr__(self):
        # The tag shows "l" for symbolic links and "d" for directories
        tag = ""
        if self._is_symlink:
            tag += "l"
        if self._is_dir:
            tag += "d"
        if tag:
            return "<WalkDirEntry %r (%s)>" % (self._name, tag)
        return "<WalkDirEntry %r>" % (self._name,)

    @classmethod
    def from_direntry(cls_, entry):
        """Alternate constructor: create an entry from the
        :class:`os.DirEntry` alike object `entry`.

        Symbolic links are followed for the directory check and for the
        stat call. A failing stat call is recorded in `stat_errno` and
        `stat_errstr`; the entry is not considered a regular file then.

        """
        w = cls_(entry.name, entry.path)
        try:
            w._is_dir = entry.is_dir(follow_symlinks=True)
        except OSError:
            #
            # If is_dir() raises an OSError, consider that the entry
            # is not a directory, same behaviour than os.path.isdir().
            #
            w._is_dir = False
        try:
            w._is_symlink = entry.is_symlink()
        except OSError:
            #
            # If is_symlink() raises an OSError, consider that the entry
            # is not a symbolic link, same behaviour than os.path.islink().
            #
            w._is_symlink = False
        # Consistently follow symlinks
        try:
            w._stat_result = entry.stat(follow_symlinks=True)
        except OSError as e:
            w._stat_result = None
            w._stat_errno = e.errno
            w._stat_errstr = e.strerror
            w._is_reg = False
        else:
            w._is_reg = stat.S_ISREG(w._stat_result.st_mode)
        return w

    @classmethod
    def from_path_name(cls_, path, name, _do_stat=True):
        """Alternate constructor: create an entry for `name` within the
        directory `path`.

        A failing stat call is recorded in `stat_errno` and
        `stat_errstr`; the entry is not considered a regular file then.

        `_do_stat` is to be used only for testing purposes: if it is
        false no stat call is done and `is_reg` remains `None`.

        """
        w = cls_(name, os.path.join(path, name))
        try:
            w._is_dir = os.path.isdir(w._path)
        except OSError:
            #
            # If is_dir() raises an OSError, consider that the entry
            # is not a directory, same behaviour than os.path.isdir().
            #
            w._is_dir = False
        try:
            w._is_symlink = os.path.islink(w._path)
        except OSError:
            #
            # If is_symlink() raises an OSError, consider that the entry
            # is not a symbolic link, same behaviour than os.path.islink().
            #
            w._is_symlink = False
        if _do_stat:
            try:
                w._stat_result = os.stat(w._path)
            except OSError as e:
                w._stat_result = None
                w._stat_errno = e.errno
                w._stat_errstr = e.strerror
                w._is_reg = False
            else:
                w._is_reg = stat.S_ISREG(w._stat_result.st_mode)
        return w

    @classmethod
    def from_readlink(cls_, path):
        """Alternate constructor: create an entry from `path` alone.

        The name is the basename of `path`. No filesystem access is
        done: all the file type flags and the stat result remain `None`.

        """
        w = cls_(os.path.basename(path), path)
        return w

    @staticmethod
    def sort_key_fs(entry):
        """Sort key for entries: the escaped filesystem name"""
        return entry.alt_fsname     # because it should never throw

    @staticmethod
    def sort_key_u8(entry):
        """Sort key for entries: the escaped UTF-8 name"""
        return entry.alt_u8name     # because it should never throw

    if PY2:

        @staticmethod
        def surrogate_decode(what):
            """Decode the bytes object `what` using surrogates from :pep:`383`
            for all non-ASCII octets.

            :rtype: unicode

            """
            uwhat = []
            assert isinstance(what, bytes)
            for ch in what:
                chcode = ord(ch)
                if chcode <= 0x7f:
                    uwhat.append(unichr(chcode))   # noqa: F821 unichr
                else:
                    # Map 0x80..0xff to the lone surrogates U+DC80..U+DCFF
                    uwhat.append(unichr(0xDC00 + chcode))  # noqa: F821 unichr
            return u"".join(uwhat)


if scandir:

    class ScanDir(object):

        """Iterator and context manager around a `scandir` call.

        Every item yielded is a :class:`WalkDirEntry` built from the
        underlying directory entry. Leaving the ``with`` block or calling
        :meth:`close` releases the underlying iterator; iterating a
        closed instance just stops.

        """

        __slots__ = ("_scandir_it", )

        def __init__(self, path):
            super(ScanDir, self).__init__()
            self._scandir_it = scandir(path)

        def close(self):
            """Close the underlying iterator (if it supports closing)
            and forget it. Calling it more than once is harmless.

            """
            current = self._scandir_it
            if current is None:
                return
            # Not every scandir implementation has a `close()` method
            if hasattr(current, "close"):
                current.close()
            self._scandir_it = None

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwds):
            self.close()

        def __iter__(self):
            return self

        def __next__(self):
            current = self._scandir_it
            if current is None:
                raise StopIteration("closed")
            raw_entry = next(current)
            return WalkDirEntry.from_direntry(raw_entry)

        if PY2:
            next = __next__

else:

    class ScanDir(object):

        """An :func:`os.scandir` wrapper that is always an iterator and
        a context manager.

        This is the fallback implementation on top of :func:`os.listdir`.
        Every item yielded is a :class:`WalkDirEntry`. Leaving the
        ``with`` block or calling :meth:`close` ends the iteration.

        """

        __slots__ = ("_listdir_it", "_path")

        def __init__(self, path):
            """Read the names within the directory `path`.

            The directory is listed completely here; errors of
            :func:`os.listdir` are raised to the caller.

            """
            super(ScanDir, self).__init__()
            self._listdir_it = iter(os.listdir(path))
            self._path = path

        def __iter__(self):
            return self

        def __next__(self):
            """Return the next entry as :class:`WalkDirEntry`.

            :raises StopIteration: if exhausted or already closed

            """
            if self._listdir_it is None:
                raise StopIteration("closed")
            return WalkDirEntry.from_path_name(self._path,
                                               next(self._listdir_it))

        if PY2:
            next = __next__

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwds):
            self.close()

        def close(self):
            """Drop the list of names: a closed instance yields no more
            entries. Calling it more than once is harmless.

            """
            #
            # There is no system resource to release here. But forgetting
            # the iterator makes the "closed" check in `__next__`
            # effective.
            #
            self._listdir_it = None


def getfsencoding():
    """Return the stored _FSENCODING of this module.

    This is the result of :func:`sys.getfilesystemencoding` as it was
    determined when this module was imported; it is not queried again
    on every call.

    """
    return _FSENCODING