comparison cutils/util/walk.py @ 162:29dd5528174c

Implement walk._walk() using os.listdir() also. Use it if no scandir is available.
author Franz Glasner <fzglas.hg@dom66.de>
date Thu, 09 Jan 2025 13:36:41 +0100
parents 481cc9b26861
children a813094ae4f5
comparison
equal deleted inserted replaced
161:df927ada9a37 162:29dd5528174c
111 w._is_symlink = False 111 w._is_symlink = False
112 # Do not suppress errors here and (consistently) follow symlinks 112 # Do not suppress errors here and (consistently) follow symlinks
113 w._stat_result = entry.stat(follow_symlinks=True) 113 w._stat_result = entry.stat(follow_symlinks=True)
114 return w 114 return w
115 115
116 @classmethod
117 def from_path_name(cls_, path, name):
118 w = cls_(name)
119 w._path = os.path.join(path, name)
120 try:
121 w._is_dir = os.path.isdir(w._path)
122 except OSError:
123 #
124 # If is_dir() raises an OSError, consider that the entry
125 # is not a directory, same behaviour as os.path.isdir().
126 #
127 w._is_dir = False
128 try:
129 w._is_symlink = os.path.islink(w._path)
130 except OSError:
131 #
132 # If is_symlink() raises an OSError, consider that the entry
133 # is not a symbolic link, same behaviour as os.path.islink().
134 #
135 w._is_symlink = False
136 w._stat_result = os.stat(w._path)
137 return w
138
116 @staticmethod 139 @staticmethod
117 def sort_key(entry): 140 def sort_key(entry):
118 return entry._fsname 141 return entry._fsname
119 142
120 143
144 def walk(root, follow_symlinks=False):
145 """A heavily customized :func:`os.walk` alike that differs from the
146 original:
147
148 - optimized for use in :command:`treesum`
149 - most errors are not suppressed
150 - the `root` is never part of the returned data
151 - the returned directory in "top" is not a string form but a list of
152 individual path segments
153 - there is only one yielded list
154
155 * contains :class:`WalkDirEntry`
156 * sorted by its fsname
157
158 The caller can easily get the old dirs and nondirs by filtering
159 the yielded list using "entry.is_dir".
160
161 - recurse into sub-directories first ("topdown=False")
162 - sort consistently all yielded lists by the filesystem encoding
163
164 .. note:: The implementation is based on Python 3.11 and uses
165 :func:`os.scandir` or :func:`scandir.scandir` when available,
166 falling back to :func:`os.listdir` otherwise. It intentionally
167 follows the logic in Python 3.11 while it could be simplified
168 because we are not implementing some of the original flags
169 (e.g. like `topdown`).
170
171 """
172 normed_root = os.path.normpath(root)
173 yield from _walk(normed_root, tuple(), follow_symlinks=follow_symlinks)
174
175
121 if scandir: 176 if scandir:
122 177
123 def walk(root, follow_symlinks=False): 178 def _walk(root, top, follow_symlinks):
124 """A heavily customized :func:`os.walk` alike that differs from the 179 """:func:`walk` helper.
125 original: 180
126 181 Implemented using :func:`os.scandir`.
127 - optimized for use in :command:`treesum`
128 - most errors are not suppressed
129 - the `root` is never part of the returned data
130 - the returned directory in "top" is not a string form but a list of
131 individual path segments
132 - there is only one yielded list
133
134 * contains :class:`WalkDirEntry`
135 * sorted by its fsname
136
137 The caller can easily get the old dirs and nondirs by filtering
138 the yielded list using "entry.is_dir".
139
140 - recurse into sub-directories first ("topdown=False")
141 - sort consistently all yielded lists by the filesystem encoding
142
143 .. note:: The implementation is based on Python 3.11 and needs a
144 functional :func:`os.scandir` or :func:`scandir.scandir`
145 implementation. It intentionally follows the logic in
146 Python 3.11 while it could be simplified because we are not
147 implementing some of the original flags (e.g. like
148 `topdown`).
149 182
150 """ 183 """
151 normed_root = os.path.normpath(root)
152 yield from _walk(normed_root, tuple(), follow_symlinks=follow_symlinks)
153
154
155 def _walk(root, top, follow_symlinks): # noqa: E303 too many empty lines
156 """:func:`walk` helper"""
157 if top: 184 if top:
158 path = os.path.join(root, *top) 185 path = os.path.join(root, *top)
159 else: 186 else:
160 path = root 187 path = root
161 188
193 # Yield after recursion if going bottom up 220 # Yield after recursion if going bottom up
194 yield top, fsobjects 221 yield top, fsobjects
195 222
196 else: 223 else:
197 224
198 raise ImportError("no `scandir()' module available") 225 def _walk(root, top, follow_symlinks):
226 """:func:`walk` helper.
227
228 Implemented using :func:`os.listdir`.
229
230 """
231 if top:
232 path = os.path.join(root, *top)
233 else:
234 path = root
235
236 fsobjects, walk_dirs = [], []
237
238 names = os.listdir(path)
239 for name in names:
240 entry = WalkDirEntry.from_path_name(path, name)
241 fsobjects.append(entry)
242 #
243 # Always bottom-up: recurse into sub-directories, but exclude
244 # symlinks to directories if follow_symlinks is False
245 #
246 if entry.is_dir:
247 if follow_symlinks:
248 walk_into = True
249 else:
250 walk_into = not entry.is_symlink
251 if walk_into:
252 walk_dirs.append(entry)
253
254 # Sort by low-level filesystem encoding
255 walk_dirs.sort(key=WalkDirEntry.sort_key)
256 fsobjects.sort(key=WalkDirEntry.sort_key)
257
258 # Recurse into sub-directories
259 for wd in walk_dirs:
260 yield from _walk(root, top + (wd.name,), follow_symlinks)
261 # Yield after recursion if going bottom up
262 yield top, fsobjects