view cutils/util/walk.py @ 203:3a85f7bbe0b1

Common static method for some alternative encodings
author Franz Glasner <fzglas.hg@dom66.de>
date Tue, 21 Jan 2025 18:57:02 +0100
parents 58d93453c307
children ca9d5a0dc9bb
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
r"""Utility sub-module to implement a heavily customized :func:`os.walk`.

"""

from __future__ import print_function, absolute_import


__all__ = ["WalkDirEntry", "ScanDir", "getfsencoding"]


import os
try:
    from os import scandir
except ImportError:
    try:
        from scandir import scandir
    except ImportError:
        scandir = None
import sys

from . import PY2


# Unique sentinel object.  WalkDirEntry uses it to mark lazily computed
# attributes (the cached "alt" names) that have not been computed yet.
_notset = object()


# The filesystem encoding as reported by the interpreter at the time this
# module is imported.  It is stored once here and handed out by
# getfsencoding(); the Python 2 code paths use it for encoding/decoding.
_FSENCODING = sys.getfilesystemencoding()


def _unix_path(s):
    """Return `s` with every backslash replaced by a forward slash.

    Works for both byte strings and text strings on Python 2 and
    Python 3; the result has the same type as the input.  A single
    implementation is sufficient because the bytes/unicode literals
    used here are valid on both major versions.

    :param s: the path to normalize
    :type s: bytes or text
    :return: the path with forward slashes only
    :rtype: same type as `s`

    """
    if isinstance(s, bytes):
        # On Python 2 this also covers the native `str` type
        return s.replace(b"\\", b"/")
    return s.replace(u"\\", u"/")


class WalkDirEntry(object):

    """A :class:`os.DirEntry` alike to be used in :func:`walk` and for
    its results.

    An entry stores the name and the path it was constructed with (the
    path with backslashes replaced by slashes) and provides several
    encoded views of them:

    - `fsname`/`fspath`: bytes for the filesystem
    - `uname`/`upath`: strictly valid Unicode text or `None`
    - `u8name`/`u8path`: `uname`/`upath` encoded as UTF-8 or `None`
    - `alt_fsname`/`alt_fspath`, `alt_u8name`/`alt_u8path`: escaped
      variants that are always bytes

    `is_symlink`, `is_dir` and `stat` are `None` unless the entry has been
    created by :meth:`from_direntry` or :meth:`from_path_name`.

    """

    __slots__ = ("_name", "_path",     # encoded as given in the ctor
                 "_is_symlink", "_is_dir", "_stat_result",
                 "_alt_fsname", "_alt_u8name")

    def __init__(self, name, path):
        """Create an entry from its `name` and its `path`.

        :param name: the name of the entry; stored unchanged
        :param path: the path of the entry; stored with all backslashes
                     replaced by slashes

        """
        # The name exactly as given in the ctor
        self._name = name
        # The path as given in the ctor -- but normalized to have slashes
        self._path = _unix_path(path)
        self._is_symlink = self._is_dir = self._stat_result = None
        # Lazily computed caches: `_notset` means "not computed yet"
        self._alt_fsname = self._alt_u8name = _notset

    @property
    def name(self):
        """The original name exactly as given in the ctor"""
        return self._name

    @property
    def path(self):
        """The path as given in the ctor -- but normalized to have
        slashes instead of backslashes.

        """
        return self._path

    @property
    def fsname(self):
        """The name as bytes for the filesystem.

        On Python 2 a bytes name is returned as is and a Unicode name is
        strictly encoded with the filesystem encoding (`None` if this is
        not possible). On Python 3 the result of :func:`os.fsencode` is
        returned.

        :rtype: bytes or None

        """
        if PY2:
            if isinstance(self._name, bytes):
                return self._name
            try:
                return self._name.encode(_FSENCODING, "strict")
            except UnicodeError:
                return None
        else:
            return os.fsencode(self._name)

    @property
    def alt_fsname(self):
        """Alternative and "escaped" filesystem name -- always bytes.

        The value is computed by :meth:`alt_fs` on first access and cached
        afterwards.

        :rtype: bytes

        """
        if self._alt_fsname is _notset:
            self._alt_fsname = WalkDirEntry.alt_fs(self._name)
        return self._alt_fsname

    @property
    def fspath(self):
        """The (normalized) path as bytes for the filesystem.

        Follows the same rules as :attr:`fsname`.

        :rtype: bytes or None

        """
        if PY2:
            if isinstance(self._path, bytes):
                return self._path
            try:
                return self._path.encode(_FSENCODING, "strict")
            except UnicodeError:
                return None
        else:
            return os.fsencode(self._path)

    @property
    def alt_fspath(self):
        """Alternative and "escaped" filesystem path -- always bytes.

        In contrast to :attr:`alt_fsname` the value is not cached.

        :rtype: bytes

        """
        return WalkDirEntry.alt_fs(self._path)

    @staticmethod
    def alt_fs(what):
        """Convert `what` into an "escaped" filesystem representation.

        On Python 2 bytes are returned unchanged and Unicode is encoded
        with the filesystem encoding using the "backslashreplace" error
        handler. On Python 3 :func:`os.fsencode` is used.

        :param what: the name or path to convert
        :rtype: bytes

        """
        if PY2:
            if isinstance(what, bytes):
                return what
            return what.encode(_FSENCODING, "backslashreplace")
        else:
            return os.fsencode(what)

    @property
    def uname(self):
        """Always "real", strictly encoded Unicode or `None` if this is not
        possible.

        On Python 2 a bytes name is strictly decoded with the filesystem
        encoding. On Python 3 the name is checked to be strictly encodable
        as UTF-8 (names that contain surrogates are not).

        :rtype: text or None

        """
        if PY2:
            if isinstance(self._name, bytes):
                try:
                    return self._name.decode(_FSENCODING, "strict")
                except UnicodeError:
                    return None
            else:
                return self._name
        else:
            try:
                # Only a validity check: the encoded result is not needed
                self._name.encode("utf-8", "strict")
            except UnicodeError:
                return None
            return self._name

    @property
    def upath(self):
        """Always "real", strictly encoded Unicode or `None` if this is not
        possible.

        Follows the same rules as :attr:`uname` -- but for the (normalized)
        path.

        :rtype: text or None

        """
        if PY2:
            if isinstance(self._path, bytes):
                try:
                    return self._path.decode(_FSENCODING, "strict")
                except UnicodeError:
                    return None
            else:
                return self._path
        else:
            try:
                # Only a validity check: the encoded result is not needed
                self._path.encode("utf-8", "strict")
            except UnicodeError:
                return None
            return self._path

    @property
    def u8name(self):
        """`.uname` as UTF-8 or `None` (as strict as `uname`)"""
        n = self.uname
        return n if n is None else n.encode("utf-8", "strict")

    @property
    def u8path(self):
        """`.upath` as UTF-8 or `None` (as strict as `upath`)"""
        p = self.upath
        return p if p is None else p.encode("utf-8", "strict")

    @property
    def alt_u8name(self):
        """Alternative and "escaped" UTF-8 name -- computed by
        :meth:`alt_u8` on first access and cached afterwards.

        """
        if self._alt_u8name is _notset:
            self._alt_u8name = WalkDirEntry.alt_u8(self._name)
        return self._alt_u8name

    @property
    def alt_u8path(self):
        """Alternative and "escaped" UTF-8 path -- computed by
        :meth:`alt_u8` on every access (not cached).

        """
        return WalkDirEntry.alt_u8(self._path)

    @staticmethod
    def alt_u8(what):
        """Convert `what` into an "escaped" UTF-8 representation.

        On Python 2 bytes are strictly decoded with the filesystem
        encoding and re-encoded as UTF-8; if this fails the bytes are
        decoded with :meth:`surrogate_decode` and encoded as ASCII with
        "backslashreplace". Python 2 Unicode is encoded as ASCII with
        "backslashreplace". On Python 3 the text is encoded as UTF-8 with
        "backslashreplace".

        :param what: the name or path to convert

        """
        if PY2:
            if isinstance(what, bytes):
                try:
                    return (what.decode(_FSENCODING, "strict")
                            .encode("utf-8", "strict"))
                except UnicodeError:
                    return (WalkDirEntry.surrogate_decode(what)
                            .encode("ascii", "backslashreplace"))
            else:
                return what.encode("ascii", "backslashreplace")
        else:
            return what.encode("utf-8", "backslashreplace")

    @property
    def is_symlink(self):
        """Whether the entry is a symbolic link or `None` if not
        determined.

        """
        return self._is_symlink

    @property
    def is_dir(self):
        """Whether the entry is a directory or `None` if not determined"""
        return self._is_dir

    @property
    def stat(self):
        """The stored stat result or `None` if there is none"""
        return self._stat_result

    def __repr__(self):
        # The tag is "l" for symlinks, "d" for directories and "ld" for both
        tag = ""
        if self._is_symlink:
            tag += "l"
        if self._is_dir:
            tag += "d"
        if tag:
            return "<WalkDirEntry %r (%s)>" % (self._name, tag)
        return "<WalkDirEntry %r>" % (self._name,)

    @classmethod
    def from_direntry(cls_, entry):
        """Create an entry from an :class:`os.DirEntry` alike `entry`.

        Errors from `entry.is_dir()` and `entry.is_symlink()` are mapped
        to `False`.

        :raises OSError: if `entry.stat()` fails

        """
        w = cls_(entry.name, entry.path)
        try:
            w._is_dir = entry.is_dir(follow_symlinks=True)
        except OSError:
            #
            # If is_dir() raises an OSError, consider that the entry
            # is not a directory, same behaviour as os.path.isdir().
            #
            w._is_dir = False
        try:
            w._is_symlink = entry.is_symlink()
        except OSError:
            #
            # If is_symlink() raises an OSError, consider that the entry
            # is not a symbolic link, same behaviour as os.path.islink().
            #
            w._is_symlink = False
        # Do not suppress errors here and (consistently) follow symlinks
        w._stat_result = entry.stat(follow_symlinks=True)
        return w

    @classmethod
    def from_path_name(cls_, path, name, _do_stat=True):
        """Create an entry for the item `name` within the directory `path`.

        All filesystem queries use the joined path exactly as the OS
        expects it and *not* the slash-normalized path that is stored in
        the entry: replacing backslashes would address a different file
        where a backslash is a regular filename character.

        `_do_stat` is to be used only for testing purposes.

        :raises OSError: if `_do_stat` is true and :func:`os.stat` fails

        """
        fullpath = os.path.join(path, name)
        w = cls_(name, fullpath)
        try:
            w._is_dir = os.path.isdir(fullpath)
        except OSError:
            #
            # If isdir() raises an OSError, consider that the entry
            # is not a directory.
            #
            w._is_dir = False
        try:
            w._is_symlink = os.path.islink(fullpath)
        except OSError:
            #
            # If islink() raises an OSError, consider that the entry
            # is not a symbolic link.
            #
            w._is_symlink = False
        if _do_stat:
            # Do not suppress errors here; os.stat() follows symlinks
            w._stat_result = os.stat(fullpath)
        return w

    @classmethod
    def from_readlink(cls_, path):
        """Create an entry from `path` alone: its basename becomes the
        name. No filesystem access is done and so `is_symlink`, `is_dir`
        and `stat` are left at `None`.

        """
        w = cls_(os.path.basename(path), path)
        return w

    @staticmethod
    def sort_key(entry):
        """Sort key: the `alt_fsname` of `entry`"""
        return entry.alt_fsname     # because it should never throw

    @staticmethod
    def alt_sort_key(entry):
        """Alternative sort key: the `alt_u8name` of `entry`"""
        return entry.alt_u8name     # because it should never throw

    if PY2:

        @staticmethod
        def surrogate_decode(what):
            """Decode the bytes object `what` using surrogates from :pep:`383`
            for all non-ASCII octets.

            """
            uwhat = []
            assert isinstance(what, bytes)
            for ch in what:
                chcode = ord(ch)
                if chcode <= 0x7f:
                    uwhat.append(unichr(chcode))   # noqa: F821 unichr
                else:
                    uwhat.append(unichr(0xDC00 + chcode))  # noqa: F821 unichr
            return u"".join(uwhat)


if scandir:

    class ScanDir(object):

        """An :func:`os.scandir` wrapper that is always an iterator and
        a context manager.

        Iteration yields :class:`WalkDirEntry` objects. After
        :meth:`close` (or after leaving the ``with`` block) the iteration
        is stopped.

        """

        __slots__ = ("_scandir_it", )

        def __init__(self, path):
            """Start scanning the directory `path`"""
            super(ScanDir, self).__init__()
            self._scandir_it = scandir(path)

        def __iter__(self):
            return self

        def __next__(self):
            """Return the next entry as :class:`WalkDirEntry`.

            :raises StopIteration: if exhausted or already closed

            """
            if self._scandir_it is None:
                raise StopIteration("closed")
            return WalkDirEntry.from_direntry(next(self._scandir_it))

        if PY2:
            next = __next__

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwds):
            self.close()

        def close(self):
            """Close the underlying iterator; may be called repeatedly"""
            if self._scandir_it is not None:
                # Not every scandir implementation provides close()
                if hasattr(self._scandir_it, "close"):
                    self._scandir_it.close()
                self._scandir_it = None

else:

    class ScanDir(object):

        """An :func:`os.scandir` wrapper that is always an iterator and
        a context manager.

        This is the fallback implementation on top of :func:`os.listdir`
        for when no `scandir` implementation is available. Iteration
        yields :class:`WalkDirEntry` objects. After :meth:`close` (or
        after leaving the ``with`` block) the iteration is stopped --
        consistent with the `scandir` based implementation.

        """

        __slots__ = ("_listdir_it", "_path")

        def __init__(self, path):
            """Read the listing of the directory `path`"""
            super(ScanDir, self).__init__()
            self._listdir_it = iter(os.listdir(path))
            self._path = path

        def __iter__(self):
            return self

        def __next__(self):
            """Return the next entry as :class:`WalkDirEntry`.

            :raises StopIteration: if exhausted or already closed

            """
            if self._listdir_it is None:
                raise StopIteration("closed")
            return WalkDirEntry.from_path_name(self._path,
                                               next(self._listdir_it))

        if PY2:
            next = __next__

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwds):
            self.close()

        def close(self):
            """Stop the iteration; may be called repeatedly"""
            # There is no OS resource to release here: just drop the
            # listing so that __next__() recognizes the closed state.
            self._listdir_it = None


def getfsencoding():
    """Give back the filesystem encoding that this module has stored
    in its `_FSENCODING` variable.

    """
    encoding = _FSENCODING
    return encoding