Mercurial > hgrepos > Python > apps > py-cutils
changeset 201:58d93453c307
Many more encoding-related methods for WalkDirEntry and some unit tests
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Tue, 21 Jan 2025 14:30:06 +0100 |
| parents | 22f92bf3572c |
| children | b9b38584919b |
| files | cutils/util/walk.py tests/test_walk.py |
| diffstat | 2 files changed, 419 insertions(+), 35 deletions(-) [+] |
line wrap: on
line diff
--- a/cutils/util/walk.py Fri Jan 17 20:12:58 2025 +0100 +++ b/cutils/util/walk.py Tue Jan 21 14:30:06 2025 +0100 @@ -10,7 +10,7 @@ from __future__ import print_function, absolute_import -__all__ = ["ScanDir", "getfsencoding"] +__all__ = ["WalkDirEntry", "ScanDir", "getfsencoding"] import os @@ -26,9 +26,25 @@ from . import PY2 +_notset = object() + + _FSENCODING = sys.getfilesystemencoding() +if PY2: + + def _unix_path(s): + if isinstance(s, bytes): + return s.replace(b"\\", b"/") + return s.replace(u"\\", u"/") + +else: + + def _unix_path(s): + return s.replace("\\", "/") + + class WalkDirEntry(object): """A :class:`os.DirEntry` alike to be used in :func:`walk` and for @@ -36,47 +52,188 @@ """ - __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink", - "_is_dir", "_stat_result") + __slots__ = ("_name", "_path", # encoded as given in the ctor + "_is_symlink", "_is_dir", "_stat_result", + "_alt_fsname", "_alt_u8name") - def __init__(self, name): - self._name = name - if PY2: - assert isinstance(name, bytes) - self._fsname = name - else: - self._name = name - self._fsname = os.fsencode(name) - self._path = None - self._fspath = None + def __init__(self, name, path): + self._name = name # the name as given in the constructor + """The name exactly as given in the ctor""" + self._path = _unix_path(path) + """The path as given in the ctor -- but normalized to have slashes""" self._is_symlink = self._is_dir = self._stat_result = None + self._alt_fsname = self._alt_u8name = _notset @property def name(self): - """The native name""" + """The original name exactly as given in the ctor""" return self._name @property - def fsname(self): - """The name as bytes""" - return self._fsname - - @property def path(self): - """Always native""" + """The original path exactly as given in the ctor.""" return self._path @property + def fsname(self): + """The name as bytes for the filesystem. 
+ + :rtype: bytes or None + + """ + if PY2: + if isinstance(self._name, bytes): + return self._name + try: + return self._name.encode(_FSENCODING, "strict") + except UnicodeError: + return None + else: + return os.fsencode(self._name) + + @property + def alt_fsname(self): + """Alternative and "escaped" filesystem name -- always bytes. + + :rtype: bytes + + """ + if self._alt_fsname is _notset: + if PY2: + if isinstance(self._name, bytes): + self._alt_fsname = self._name + else: + self._alt_fsname = self._name.encode( + _FSENCODING, "backslashreplace") + else: + self._alt_fsname = os.fsencode(self._name) + return self._alt_fsname + + @property def fspath(self): - """Always bytes""" - if self._path is not None: - if self._fspath is None: - if PY2: - assert isinstance(self._path, bytes) - self._fspath = self._path + """Always bytes. + + :rtype: bytes or None + + """ + if PY2: + if isinstance(self._path, bytes): + return self._path + try: + return self._path.encode(_FSENCODING, "strict") + except UnicodeError: + return None + else: + return os.fsencode(self._path) + + @property + def alt_fspath(self): + """Alternative and "escaped" filesystem path -- always bytes. + + :rtype: bytes + + """ + if PY2: + if isinstance(self._path, bytes): + return self._path + return self._path.encode(_FSENCODING, "backslashreplace") + else: + return os.fsencode(self._path) + + @property + def uname(self): + """Always "real", strictly encoded Unicode or `None` if this is not + possible. + + :rtype: text or None + + """ + if PY2: + if isinstance(self._name, bytes): + try: + return self._name.decode(_FSENCODING, "strict") + except UnicodeError: + return None + else: + return self._name + else: + try: + self._name.encode("utf-8", "strict") + except UnicodeError: + return None + return self._name + + @property + def upath(self): + """Always "real", strictly encoded Unicode or `None` if this is not + possible. 
+ + :rtype: text or None + + """ + if PY2: + if isinstance(self._path, bytes): + try: + return self._path.decode(_FSENCODING, "strict") + except UnicodeError: + return None + else: + return self._path + else: + try: + self._path.encode("utf-8", "strict") + except UnicodeError: + return None + return self._path + + @property + def u8name(self): + """`.uname` as UTF-8 or `None` (as strict as `uname`)""" + n = self.uname + return n if n is None else n.encode("utf-8", "strict") + + @property + def u8path(self): + """`.upath` as UTF-8 or `None` (as strict as `upath`""" + p = self.upath + return p if p is None else p.encode("utf-8", "strict") + + @property + def alt_u8name(self): + if self._alt_u8name is _notset: + if PY2: + if isinstance(self._name, bytes): + try: + self._alt_u8name = ( + self._name + .decode(_FSENCODING, "strict") + .encode("utf-8", "strict")) + except UnicodeError: + self._alt_u8name = ( + self.surrogate_decode(self._name) + .encode("ascii", "backslashreplace")) else: - self._fspath = os.fsencode(self._path) - return self._fspath + self._alt_u8name = self._name.encode( + "ascii", "backslashreplace") + else: + self._alt_u8name = self._name.encode( + "utf-8", "backslashreplace") + return self._alt_u8name + + @property + def alt_u8path(self): + if PY2: + if isinstance(self._path, bytes): + try: + return (self._path.decode(_FSENCODING, "strict") + .encode("utf-8", "strict")) + except UnicodeError: + return (self.surrogate_decode(self._path) + .encode("ascii", "backslashreplace")) + else: + return self._path.encode("ascii", "backslashreplace") + else: + return self._path.encode("utf-8", "backslashreplace") @property def is_symlink(self): @@ -102,8 +259,7 @@ @classmethod def from_direntry(cls_, entry): - w = cls_(entry.name) - w._path = entry.path + w = cls_(entry.name, entry.path) try: w._is_dir = entry.is_dir(follow_symlinks=True) except OSError: @@ -125,9 +281,9 @@ return w @classmethod - def from_path_name(cls_, path, name): - w = cls_(name) - w._path = 
os.path.join(path, name) + def from_path_name(cls_, path, name, _do_stat=True): + """`_nostat` is to be used only for testing purposes""" + w = cls_(name, os.path.join(path, name)) try: w._is_dir = os.path.isdir(w._path) except OSError: @@ -144,12 +300,40 @@ # is not a symbolic link, same behaviour than os.path.islink(). # w._is_symlink = False - w._stat_result = os.stat(w._path) + if _do_stat: + w._stat_result = os.stat(w._path) + return w + + @classmethod + def from_readlink(cls_, path): + w = cls_(os.path.basename(path), path) return w @staticmethod def sort_key(entry): - return entry._fsname + return entry.alt_fsname # because it should never throw + + @staticmethod + def alt_sort_key(entry): + return entry.alt_u8name # because it should never throw + + if PY2: + + @staticmethod + def surrogate_decode(what): + """Decode the bytes object `what` using surrogates from :pep:`383` + for all non-ASCII octets. + + """ + uwhat = [] + assert isinstance(what, bytes) + for ch in what: + chcode = ord(ch) + if chcode <= 0x7f: + uwhat.append(unichr(chcode)) # noqa: F821 unichr + else: + uwhat.append(unichr(0xDC00 + chcode)) # noqa: F821 unichr + return u"".join(uwhat) if scandir:
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_walk.py Tue Jan 21 14:30:06 2025 +0100 @@ -0,0 +1,200 @@ +# -*- coding: utf-8 -*- +r"""Unit tests + +""" + +from __future__ import absolute_import, print_function + +import sys +import unittest + +import _test_setup # noqa: F401 imported but unused + +from cutils.util import walk + + +class SurrogateEscapeTests(unittest.TestCase): + + @unittest.skipIf(sys.version_info[0] >= 3, "Skip on Python3") + def test_simple_escape(self): + w = b"\xc4" + + d = walk.WalkDirEntry.surrogate_decode(w) + self.assertEqual(u"\udcc4", d) + + @unittest.skipIf(sys.version_info[0] >= 3, "Skip on Python3") + def test_no_escape_min(self): + w = b"\x00" + + d = walk.WalkDirEntry.surrogate_decode(w) + self.assertEqual(u"\x00", d) + + @unittest.skipIf(sys.version_info[0] >= 3, "Skip on Python3") + def test_no_escape_max(self): + w = b"\x7f" + + d = walk.WalkDirEntry.surrogate_decode(w) + self.assertEqual(u"\x7f", d) + + @unittest.skipIf(sys.version_info[0] >= 3, "Skip on Python3") + def test_escape_min(self): + w = b"\x80" + + d = walk.WalkDirEntry.surrogate_decode(w) + self.assertEqual(u"\udc80", d) + + @unittest.skipIf(sys.version_info[0] >= 3, "Skip on Python3") + def test_escape_max(self): + w = b"\xff" + + d = walk.WalkDirEntry.surrogate_decode(w) + self.assertEqual(u"\udcff", d) + + @unittest.skipIf(sys.version_info[0] >= 3, "Skip on Python3") + def test_complex(self): + w = b"abc\xc4d\x80\x81\xffefg" + d = walk.WalkDirEntry.surrogate_decode(w) + self.assertEqual(u"abc\udcc4d\udc80\udc81\udcffefg", d) + + +class WalkDirEntryTests(unittest.TestCase): + + def setUp(self): + self._orig_fsencoding = walk._FSENCODING + walk._FSENCODING = "ascii" + + def tearDown(self): + walk._FSENCODING = self._orig_fsencoding + + def test_ascii(self): + entry = walk.WalkDirEntry.from_path_name("tests", "_test_setup.py") + self.assertEqual("_test_setup.py", entry.name) + self.assertEqual("tests/_test_setup.py", entry.path) + 
self.assertEqual(u"_test_setup.py", entry.uname) + self.assertEqual(u"tests/_test_setup.py", entry.upath) + self.assertEqual(b"_test_setup.py", entry.u8name) + self.assertEqual(b"tests/_test_setup.py", entry.u8path) + self.assertEqual(b"_test_setup.py", entry.alt_u8name) + self.assertEqual(b"tests/_test_setup.py", entry.alt_u8path) + self.assertEqual(b"_test_setup.py", entry.alt_fsname) + self.assertEqual(b"tests/_test_setup.py", entry.alt_fspath) + + @unittest.skipIf(sys.version_info[0] < 3, "Skip on Python2") + def test_with_surrogate_escaped_name(self): + # instantiate with a surrogate escaped path from PEP 383 + entry = walk.WalkDirEntry.from_path_name( + "tests", "test-\udcc4", _do_stat=False) + self.assertEqual("test-\udcc4", entry.name) + self.assertEqual("tests/test-\udcc4", entry.path) + self.assertEqual(b"test-\xc4", entry.fsname) + self.assertEqual(b"tests/test-\xc4", entry.fspath) + self.assertEqual(b"test-\xc4", entry.alt_fsname) + self.assertEqual(b"tests/test-\xc4", entry.alt_fspath) + + self.assertIsNone(entry.uname) + self.assertIsNone(entry.upath) + self.assertIsNone(entry.u8name) + self.assertIsNone(entry.u8path) + + self.assertEqual(b"test-\\udcc4", entry.alt_u8name) + self.assertEqual(b"tests/test-\\udcc4", entry.alt_u8path) + + @unittest.skipIf(sys.version_info[0] < 3, "Skip on Python2") + def test_with_surrogate_escaped_path(self): + # instantiate with a surrogate escaped path from PEP 383 + entry = walk.WalkDirEntry.from_path_name( + "tests\udcc5", "test", _do_stat=False) + self.assertEqual("test", entry.name) + self.assertEqual("tests\udcc5/test", entry.path) + self.assertEqual(b"test", entry.fsname) + self.assertEqual(b"tests\xc5/test", entry.fspath) + self.assertEqual(b"test", entry.alt_fsname) + self.assertEqual(b"tests\xc5/test", entry.alt_fspath) + + self.assertEqual("test", entry.uname) + self.assertIsNone(entry.upath) + self.assertEqual(b"test", entry.u8name) + self.assertIsNone(entry.u8path) + + self.assertEqual(b"test", 
entry.alt_u8name) + self.assertEqual(b"tests\\udcc5/test", entry.alt_u8path) + + @unittest.skipIf(sys.version_info[0] > 2, "Skip on Python3") + def test_py2_with_non_fsdecodable_name(self): + entry = walk.WalkDirEntry.from_path_name( + b"tests", b"test-\xc4", _do_stat=False) + self.assertEqual(b"test-\xc4", entry.name) + self.assertEqual(b"tests/test-\xc4", entry.path) + self.assertEqual(b"test-\xc4", entry.fsname) + self.assertEqual(b"tests/test-\xc4", entry.fspath) + self.assertEqual(b"test-\xc4", entry.alt_fsname) + self.assertEqual(b"tests/test-\xc4", entry.alt_fspath) + + self.assertIsNone(entry.uname) + self.assertIsNone(entry.upath) + self.assertIsNone(entry.u8name) + self.assertIsNone(entry.u8path) + + self.assertEqual(b"test-\\udcc4", entry.alt_u8name) + self.assertEqual(b"tests/test-\\udcc4", entry.alt_u8path) + + @unittest.skipIf(sys.version_info[0] > 2, "Skip on Python3") + def test_py2_with_non_fsdecodable_path(self): + entry = walk.WalkDirEntry.from_path_name( + b"tests\xc5", b"test", _do_stat=False) + self.assertEqual(b"test", entry.name) + self.assertEqual(b"tests\xc5/test", entry.path) + self.assertEqual(b"test", entry.fsname) + self.assertEqual(b"tests\xc5/test", entry.fspath) + self.assertEqual(b"test", entry.alt_fsname) + self.assertEqual(b"tests\xc5/test", entry.alt_fspath) + + self.assertEqual(b"test", entry.uname) + self.assertIsNone(entry.upath) + self.assertEqual(b"test", entry.u8name) + self.assertIsNone(entry.u8path) + + self.assertEqual(b"test", entry.alt_u8name) + self.assertEqual(b"tests\\udcc5/test", entry.alt_u8path) + + @unittest.skipIf(sys.version_info[0] > 2, "Skip on Python3") + def test_py2_with_non_fsencodable_unicode_name(self): + entry = walk.WalkDirEntry.from_path_name( + u"tests", u"test-\xc4", _do_stat=False) + self.assertEqual(u"test-\xc4", entry.name) + self.assertEqual(u"tests/test-\xc4", entry.path) + self.assertIsNone(entry.fsname) + self.assertIsNone(entry.fspath) + self.assertEqual(b"test-\\xc4", entry.alt_fsname) + 
self.assertEqual(b"tests/test-\\xc4", entry.alt_fspath) + + self.assertEqual(u"test-\xc4", entry.uname) + self.assertEqual(u"tests/test-\xc4", entry.upath) + self.assertEqual(b"test-\xc3\x84", entry.u8name) + self.assertEqual(b"tests/test-\xc3\x84", entry.u8path) + + self.assertEqual(b"test-\\xc4", entry.alt_u8name) + self.assertEqual(b"tests/test-\\xc4", entry.alt_u8path) + + @unittest.skipIf(sys.version_info[0] > 2, "Skip on Python3") + def test_py2_with_non_fsencodable_unicode_path(self): + entry = walk.WalkDirEntry.from_path_name( + u"tests\xc5", u"test", _do_stat=False) + self.assertEqual(u"test", entry.name) + self.assertEqual(u"tests\xc5/test", entry.path) + self.assertEqual(b"test", entry.fsname) + self.assertIsNone(entry.fspath) + self.assertEqual(b"test", entry.alt_fsname) + self.assertEqual(b"tests\\xc5/test", entry.alt_fspath) + + self.assertEqual(u"test", entry.uname) + self.assertEqual(u"tests\xc5/test", entry.upath) + self.assertEqual(b"test", entry.u8name) + self.assertEqual(b"tests\xc3\x85/test", entry.u8path) + + self.assertEqual(b"test", entry.alt_u8name) + self.assertEqual(b"tests\\xc5/test", entry.alt_u8path) + + +if __name__ == "__main__": + sys.exit(unittest.main())
