comparison cutils/util/walk.py @ 121:2dc26a2f3d1c

A heavily customized "os.walk()" alike to support the coming treeview implementation
author Franz Glasner <fzglas.hg@dom66.de>
date Wed, 01 Jan 2025 17:52:41 +0100
parents
children 4a0c3c9eead7
comparison
equal deleted inserted replaced
120:a548783381b6 121:2dc26a2f3d1c
1 # -*- coding: utf-8 -*-
2 # :-
3 # :Copyright: (c) 2020-2025 Franz Glasner
4 # :License: BSD-3-Clause
5 # :-
6 r"""Utility sub-module to implement a heavily customized :func:`os.walk`.
7
8 """
9
10 __all__ = ["walk"]
11
12
13 import os
14 try:
15 from os import scandir
16 except ImportError:
17 try:
18 from scandir import scandir
19 except ImportError:
20 scandir = None
21 from contextlib import closing
22
23 from .cm import nullcontext
24 from .constants import PY2
25
26
class WalkDirEntry(object):

    """Lightweight stand-in for :class:`os.DirEntry` that :func:`walk`
    builds from directory entries and hands back to its callers.

    All type information (`is_dir`, `is_symlink`, `stat`) is captured once
    in :meth:`from_direntry` and afterwards exposed as plain read-only
    properties -- not as methods like in :class:`os.DirEntry`.

    """

    __slots__ = ("_name", "_fsname", "_path", "_fspath", "_is_symlink",
                 "_is_dir", "_stat_result")

    def __init__(self, name):
        self._name = name
        if not PY2:
            # Text name: keep its filesystem (bytes) encoding alongside
            self._fsname = os.fsencode(name)
        else:
            # On Python 2 the name is expected to be bytes already
            assert isinstance(name, bytes)
            self._fsname = name
        # Everything else is unknown until `from_direntry()` fills it in
        self._path = None
        self._fspath = None
        self._is_symlink = None
        self._is_dir = None
        self._stat_result = None

    @property
    def name(self):
        """The entry's name as given to the constructor"""
        return self._name

    @property
    def fsname(self):
        """The entry's name as bytes (filesystem encoding)"""
        return self._fsname

    @property
    def path(self):
        """The entry's path or `None` if it has not been set"""
        return self._path

    @property
    def fspath(self):
        """The entry's path as bytes (filesystem encoding), computed on
        first access and cached; `None` as long as no path is set.

        """
        if self._path is None or self._fspath is not None:
            # Nothing to encode yet or already cached
            return self._fspath
        if PY2:
            assert isinstance(self._path, bytes)
            self._fspath = self._path
        else:
            self._fspath = os.fsencode(self._path)
        return self._fspath

    @property
    def is_symlink(self):
        """Recorded symlink flag (`None` if never determined)"""
        return self._is_symlink

    @property
    def is_dir(self):
        """Recorded directory flag (`None` if never determined)"""
        return self._is_dir

    @property
    def stat(self):
        """Recorded stat result; :meth:`from_direntry` sets it only for
        entries that are not directories, otherwise it is `None`.

        """
        return self._stat_result

    def __repr__(self):
        # "l" marks a symlink, "d" a directory -- always in this order
        flags = "".join([
            "l" if self._is_symlink else "",
            "d" if self._is_dir else "",
        ])
        if not flags:
            return "<WalkDirEntry %r>" % (self._name,)
        return "<WalkDirEntry %r (%s)>" % (self._name, flags)

    @classmethod
    def from_direntry(cls_, entry):
        """Alternate constructor: build an instance from an
        :class:`os.DirEntry` alike object `entry`.

        Errors (:exc:`OSError`) from the directory and symlink checks are
        mapped to `False`; errors from the stat call are propagated.

        """
        walk_entry = cls_(entry.name)
        walk_entry._path = entry.path
        try:
            walk_entry._is_dir = entry.is_dir(follow_symlinks=True)
        except OSError:
            # Same behaviour as os.path.isdir(): a failing check means
            # "not a directory"
            walk_entry._is_dir = False
        try:
            walk_entry._is_symlink = entry.is_symlink()
        except OSError:
            # Same behaviour as os.path.islink(): a failing check means
            # "not a symbolic link"
            walk_entry._is_symlink = False
        if walk_entry._is_dir:
            return walk_entry
        # Non-directories: errors are deliberately not suppressed here and
        # symlinks are (consistently) followed
        walk_entry._stat_result = entry.stat(follow_symlinks=True)
        return walk_entry

    @staticmethod
    def sort_key(entry):
        """Key function for sorting entries by their bytes name"""
        return entry._fsname
121
122
if scandir:

    def walk(root, follow_symlinks=False):
        """A heavily customized :func:`os.walk` alike that differs from the
        original:

        - optimized for use in :command:`treesum`
        - most errors are not suppressed
        - the `root` is never part of the returned data
        - the returned directory in "top" is not a string form but a tuple
          of individual path segments (empty for `root` itself)
        - all other yielded lists contain WalkDirEntry elements instead of
          strings
        - recurse into sub-directories first ("topdown=False")
        - sort consistently all yielded lists by the filesystem encoding

        :param root: the directory where to start; it is normalized with
                     :func:`os.path.normpath` before walking
        :param bool follow_symlinks: whether to recurse into symbolic links
                                     that point to directories
        :return: a generator that yields ``(top, dirs, nondirs)`` tuples.
                 `dirs` holds every entry that is flagged as directory --
                 also symlinked ones that are not walked into.

        .. note:: The implementation is based on Python 3.11 and needs a
                  functional :func:`os.scandir` or :func:`scandir.scandir`
                  implementation. It intentionally follows the logic in
                  Python 3.11 while it could be simplified because we are not
                  implementing some of the original flags (e.g. like
                  `topdown`).

        """
        normed_root = os.path.normpath(root)
        #
        # No "yield from" here: it is a syntax error on Python 2 and would
        # make the complete module unimportable there.
        #
        for result in _walk(normed_root, (), follow_symlinks=follow_symlinks):
            yield result

    def _walk(root, top, follow_symlinks):
        """:func:`walk` helper.

        :param root: the (already normalized) start directory
        :param tuple top: the path segments below `root` of the directory
                          that is to be scanned by this call
        :param bool follow_symlinks: see :func:`walk`

        """
        if top:
            path = os.path.join(root, *top)
        else:
            path = root

        dirs, nondirs, walk_dirs = [], [], []

        scandir_it = scandir(path)
        # Not every scandir() implementation returns a closable iterator
        if hasattr(scandir_it, "close"):
            scandir_ctx = closing(scandir_it)
        else:
            scandir_ctx = nullcontext(scandir_it)
        with scandir_ctx as scandir_it:
            while True:
                # Guard only the iterator advance with the `try`
                try:
                    raw_entry = next(scandir_it)
                except StopIteration:
                    break
                entry = WalkDirEntry.from_direntry(raw_entry)
                if not entry.is_dir:
                    nondirs.append(entry)
                    continue
                dirs.append(entry)
                #
                # Always bottom-up: recurse into sub-directories, but exclude
                # symlinks to directories if follow_symlinks is False
                #
                if follow_symlinks or not entry.is_symlink:
                    walk_dirs.append(entry)

        # Sort by low-level filesystem encoding
        walk_dirs.sort(key=WalkDirEntry.sort_key)
        dirs.sort(key=WalkDirEntry.sort_key)
        nondirs.sort(key=WalkDirEntry.sort_key)

        # Recurse into sub-directories (again without "yield from", see walk)
        for wd in walk_dirs:
            for result in _walk(root, top + (wd.name,), follow_symlinks):
                yield result
        # Yield after recursion if going bottom up
        yield top, dirs, nondirs

else:

    raise ImportError("no `scandir()' module available")