Mercurial > hgrepos > Python > apps > py-cutils
changeset 372:bfe1160fbfd3
treesum: Make ERROR outputs more consistent: use native paths where possible
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Sun, 13 Apr 2025 14:15:33 +0200 |
| parents | 29a301ff2501 |
| children | 9cba56c87946 |
| files | cutils/treesum.py cutils/util/__init__.py cutils/util/walk.py |
| diffstat | 3 files changed, 187 insertions(+), 101 deletions(-) [+] |
line wrap: on
line diff
--- a/cutils/treesum.py Sat Apr 12 09:05:48 2025 +0200 +++ b/cutils/treesum.py Sun Apr 13 14:15:33 2025 +0200 @@ -741,15 +741,17 @@ logging.debug("Recursing into directory: %s/%r", root, top) else: logging.debug("Handling root directory: %s", root) - path = os.path.join(root, *top) if top else root + fullpath = os.path.join(root, *top) if top else root # Determine also the path to be used for directory filtering fpath = join_output_path(top, None) if top else "" if self._fnmatcher: + logging.debug("Checking match against path: %s", fpath) if not self._fnmatcher.shall_visit(fpath): logging.debug("Skipping directory: %s", fpath) return (None, None, None, None) try: - with walk.ScanDir(path) as dirscan: + logging.debug("Scanning directory: %s", fullpath) + with walk.ScanDir(fullpath) as dirscan: fsobjects = list(dirscan) except OSError as e: # @@ -769,19 +771,13 @@ errmsg = b"no such file or directory" else: raise - if self._utf8_mode: - opath = walk.WalkDirEntry.alt_u8(path) - else: - opath = walk.WalkDirEntry.alt_fs(path) - self._writer.write_error(util.interpolate_bytes( - b"`%s': %s", opath, errmsg)) + self._writer.write_error( + b"`%s': %s", + walk.WalkDirEntry.alt_bytes(fullpath, self._utf8_mode), + errmsg) # Reuse from top - opath = join_output_path(top, None) - if opath: - if self._utf8_mode: - opath = walk.WalkDirEntry.alt_u8(opath) - else: - opath = walk.WalkDirEntry.alt_fs(opath) + opath = walk.WalkDirEntry.alt_bytes( + join_output_path(top, None), self._utf8_mode) if self._size_only: self._writer.write_size(opath, None) else: @@ -810,15 +806,10 @@ else: eno = 0 emsg = None - if self._utf8_mode: - fpath = walk.WalkDirEntry.alt_u8(fpath) - else: - fpath = walk.WalkDirEntry.alt_fs(fpath) - opath = join_output_path(top, None) - if self._utf8_mode: - opath = walk.WalkDirEntry.alt_u8(opath) - else: - opath = walk.WalkDirEntry.alt_fs(opath) + opath = walk.WalkDirEntry.alt_bytes( + join_output_path(top, None), self._utf8_mode) + fpath = walk.WalkDirEntry.alt_bytes( + fpath, self._utf8_mode) if eno == 0: # # treesum file could be read. @@ -832,12 +823,11 @@ # This is a severe error here: just the size # is required, but we have not got one. # - self._writer.write_error(util.b( - util.interpolate_bytes( - b"No size in treesum-file `%s' while" - b" requiring it", - fpath), - "utf-8")) + self._writer.write_error( + b"Missing required size in treesum-file" + b" `%s'", + walk.WalkDirEntry.alt_bytes( + fso.npath, self._utf8_mode)) self._writer.write_size(opath, None) return (errno.ESRCH, None, None, None) else: @@ -847,11 +837,10 @@ # XXX FIXME: Is this a **severe** error # here? Currently: no # - self._writer.write_error(util.b( - util.interpolate_bytes( - b"No size in treesum-file `%s'", - fpath), - "utf-8")) + self._writer.write_error( + b"Missing size in treesum-file `%s'", + walk.WalkDirEntry.alt_bytes( + fso.npath, self._utf8_mode)) sz = -1 else: sz = collector.size @@ -863,12 +852,11 @@ # the treesum file was created with # "--size-only" and contains no digest. # - self._writer.write_error(util.b( - util.interpolate_bytes( - b"No digest in treesum-file `%s' while" - b" it is required", - fpath), - "utf-8")) + self._writer.write_error( + b"Missing required digest in treesum-file" + b" `%s'", + walk.WalkDirEntry.alt_bytes( + fso.npath, self._utf8_mode)) self._writer.write_file_digest( collector.algorithm or "MD5", opath, @@ -891,12 +879,14 @@ # # treesum file could not be read # - self._writer.write_error(util.interpolate_bytes( + self._writer.write_error( b"Cannot read treesum-file `%s' for directory" b"`%s': %s", - fpath, - opath, - util.b(emsg, "utf-8"))) + walk.WalkDirEntry.alt_bytes( + fso.npath, self._utf8_mode), + walk.WalkDirEntry.alt_u8( + join_output_path(top, None)), + util.b(emsg, "utf-8", "backslashreplace")) if self._size_only: self._writer.write_size(opath, None) else: @@ -935,16 +925,14 @@ if not self._fnmatcher.shall_visit(fpath): logging.debug("Skipping: %s", fpath) continue - if self._utf8_mode: - opath = walk.WalkDirEntry.alt_u8(opath) - else: - opath = walk.WalkDirEntry.alt_fs(opath) + opath = walk.WalkDirEntry.alt_bytes(opath, self._utf8_mode) if fso.is_special: special_tag = util.b(fso.special_tag) + assert len(special_tag) == 1 assert fso.stat is not None # because .is_special is True if fso.is_symlink and not self._follow_symlinks.file: linktgt = walk.WalkDirEntry.from_readlink( - os.readlink(fso.path)) + os.readlink(fso.npath)) linkdgst = self._algorithm[0]() if self._utf8_mode: if linktgt.u8path is None: @@ -1028,7 +1016,7 @@ assert fso.stat is not None # because .is_dir is True if fso.is_symlink and not self._follow_symlinks.directory: linktgt = walk.WalkDirEntry.from_readlink( - os.readlink(fso.path)) + os.readlink(fso.npath)) linkdgst = self._algorithm[0]() if self._utf8_mode: if linktgt.u8path is None: @@ -1121,7 +1109,7 @@ # special file (socket, FIFO, et al.). # linktgt = walk.WalkDirEntry.from_readlink( - os.readlink(fso.path)) + os.readlink(fso.npath)) linkdgst = self._algorithm[0]() if self._utf8_mode: if linktgt.u8path is None: @@ -1176,13 +1164,15 @@ b"5:errno,%d:%s,", len(str(fso.stat_errno)), util.b(str(fso.stat_errno)))) - self._writer.write_error(util.interpolate_bytes( + self._writer.write_error( b"errno %d: %s", fso.stat_errno, - util.b(fso.stat_errstr, "utf-8"))) + util.b(util.escape_for_output(fso.stat_errstr), + "utf-8", + "backslashreplace")) logging.error( - "Directory entry has symlink problems: %r", - opath) + "Directory entry has symlink problems: %s", + fso.npath) if self._size_only: self._writer.write_size(opath, None) else: @@ -1222,16 +1212,18 @@ try: dgst = digest.compute_digest_file( self._algorithm[0], - fso.path, + fso.npath, use_mmap=self._use_mmap) except OSError as e: dir_tainted = True self._writer.write_error( - util.interpolate_bytes( - b"`%s': errno %d: %s", - opath, - e.errno, - util.b(e.strerror, "utf-8"))) + b"`%s': errno %d: %s", + walk.WalkDirEntry.alt_bytes( + fso.npath, self._utf8_mode), + e.errno, + util.b(util.escape_for_output(e.strerror), + "utf-8", + "backslashreplace")) sz = (fso.stat.st_size if self._print_size else None) self._writer.write_file_digest( @@ -1248,19 +1240,15 @@ self._writer.write_file_digest( self._algorithm[1], opath, dgst, sz) self._writer.flush() - opath = join_output_path(top, None) - if opath: - if self._utf8_mode: - opath = walk.WalkDirEntry.alt_u8(opath) - else: - opath = walk.WalkDirEntry.alt_fs(opath) if dir_tainted: # # IMPORTANT: Print errors BEFORE the associated digest or size # line. Otherwise the "info" command has a problem. # - self._writer.write_error(b"directory is tainted") - logging.error("Directory has problems: %r", opath) + self._writer.write_error(b"%s", b"directory is tainted") + logging.error("Directory has problems: %s", fullpath) + opath = walk.WalkDirEntry.alt_bytes( + join_output_path(top, None), self._utf8_mode) if self._size_only: self._writer.write_size(opath, dir_size) else: @@ -1455,7 +1443,8 @@ def write_comment(self, comment): self.write(b"COMMENT (") - self.write(util.b(comment, "utf-8")) + comment = util.escape_for_output(comment) + self.write(util.b(comment, "utf-8", "backslashreplace")) self.writeln(b")") def write_generator(self, generator): @@ -1463,9 +1452,9 @@ self.write(util.b(generator, "utf-8")) self.writeln(b")") - def write_error(self, error): + def write_error(self, fmt, *args): self.write(b"ERROR (") - self.write(util.b(error, "utf-8")) + self.write(util.interpolate_bytes(fmt, *args)) self.writeln(b")") def write_fsencoding(self, encoding): @@ -1591,15 +1580,16 @@ def write_comment(self, comment): self.write(b"COMMENT\t") - self.writeln(util.b(comment, "utf-8")) + comment = util.escape_for_output(comment) + self.writeln(util.b(comment, "utf-8", "backslashreplace")) def write_generator(self, generator): self.write(b"GENERATOR\t") self.writeln(util.b(generator, "utf-8")) - def write_error(self, error): + def write_error(self, fmt, *args): self.write(b"ERROR\t") - self.writeln(util.b(error, "utf-8")) + self.writeln(util.interpolate_bytes(fmt, *args)) def write_fsencoding(self, encoding): self.write(b"FSENCODING\t")
--- a/cutils/util/__init__.py Sat Apr 12 09:05:48 2025 +0200 +++ b/cutils/util/__init__.py Sun Apr 13 14:15:33 2025 +0200 @@ -14,6 +14,7 @@ "PY35", "n", "b", "u", "normalize_filename", + "escape_for_output", "argv2algo", "algotag2algotype", "algotag2digest_size", @@ -44,10 +45,10 @@ return s.encode(encoding) return s - def b(s, encoding="ascii"): + def b(s, encoding="ascii", errors="strict"): """Convert `s` to bytes""" if isinstance(s, unicode): # noqa: F821 undefined name 'unicode' - return s.encode(encoding) + return s.encode(encoding, errors) return s def u(s, encoding="ascii"): @@ -64,15 +65,32 @@ return s.decode(encoding) return s - def b(s, encoding="ascii"): + def b(s, encoding="ascii", errors="strict"): """Convert `s` to bytes""" if isinstance(s, str): - return s.encode(encoding) + return s.encode(encoding, errors) return s u = n +def escape_for_output(what): + """Escape `what` in such a way that the output can be safely written into + a line and/or column-oriented output file + + """ + if isinstance(what, bytes): + return (what.replace(b'\\', b"\\\\") + .replace(b'\n', b"\\x0a") + .replace(b'\r', b"\\x0d") + .replace(b'\t', b"\\x09")) + else: + return (what.replace(u'\\', u"\\\\") + .replace(u'\n', u"\\x0a") + .replace(u'\r', u"\\x0d") + .replace(u'\t', u"\\x09")) + + def default_algotag(): """Determine the "best" default algorithm.
--- a/cutils/util/walk.py Sat Apr 12 09:05:48 2025 +0200 +++ b/cutils/util/walk.py Sun Apr 13 14:15:33 2025 +0200 @@ -25,7 +25,7 @@ import stat import sys -from . import PY2 +from . import PY2, escape_for_output HELP_FILETYPE_INDICATORS = r""" @@ -124,7 +124,8 @@ """ - __slots__ = ("_name", "_path", # encoded as given in the ctor + __slots__ = ("_name", "_npath", # encoded as given in the ctor + "_path", # encoded as given but with shashes "_is_symlink", "_is_reg", "_is_dir", "_stat_result", "_stat_errno", "_stat_errstr", "_alt_fsname", "_alt_u8name") @@ -132,6 +133,8 @@ def __init__(self, name, path): self._name = name # the name as given in the constructor """The name exactly as given in the ctor""" + self._npath = path + """The path exactly as given in the ctor""" self._path = _unix_path(path) """The path as given in the ctor -- but normalized to have slashes""" self._is_symlink = self._is_reg = self._is_dir = self._stat_result = \ @@ -144,6 +147,11 @@ return self._name @property + def npath(self): + """The original path exactly as given in the ctor""" + return self._npath + + @property def path(self): """The original path exactly as given in the ctor -- but normalized to have forward slashes""" @@ -172,6 +180,28 @@ return s @property + def fsnpath(self): + """Always bytes. + + Also do not allow TAB, CR or LF in the path. + + :rtype: bytes or None + + """ + if PY2: + if isinstance(self._npath, bytes): + p = self._npath + try: + p = self._npath.encode(_FSENCODING, "strict") + except UnicodeError: + return None + else: + p = os.fsencode(self._npath) + if (b'\n' in p) or (b'\r' in p) or (b'\t' in p) or (b'\\' in p): + return None + return p + + @property def fspath(self): """Always bytes. @@ -205,6 +235,15 @@ return self._alt_fsname @property + def alt_fsnpath(self): + """Alternative and "escaped" filesystem path -- always bytes. + + :rtype: bytes + + """ + return WalkDirEntry.alt_fs(self._npath) + + @property def alt_fspath(self): """Alternative and "escaped" filesystem path -- always bytes. @@ -220,16 +259,7 @@ # ... and hope that the current FS encoding is compatible # with it # - if isinstance(what, bytes): - s = (what.replace(b'\\', b"\\\\") - .replace(b'\n', b"\\x0a") - .replace(b'\r', b"\\x0d") - .replace(b'\t', b"\\x09")) - else: - s = (what.replace(u'\\', u"\\\\") - .replace(u'\n', u"\\x0a") - .replace(u'\r', u"\\x0d") - .replace(u'\t', u"\\x09")) + s = escape_for_output(what) if PY2: if isinstance(s, bytes): return s @@ -262,6 +292,29 @@ return self._name @property + def unpath(self): + """Always "real", strictly encoded Unicode or `None` if this is not + possible. + + :rtype: text or None + + """ + if PY2: + if isinstance(self._npath, bytes): + try: + return self._npath.decode(_FSENCODING, "strict") + except UnicodeError: + return None + else: + return self._npath + else: + try: + self._npath.encode("utf-8", "strict") + except UnicodeError: + return None + return self._npath + + @property def upath(self): """Always "real", strictly encoded Unicode or `None` if this is not possible. @@ -299,6 +352,20 @@ return n.encode("utf-8", "strict") @property + def u8npath(self): + """`.unpath` as UTF-8 or `None` (as strict as `upath`. + + Also do not allow TAB, CR or LF in the path. + + """ + p = self.unpath + if p is None: + return None + if (u'\n' in p) or (u'\r' in p) or (u'\t' in p) or (u'\\' in p): + return None + return p.encode("utf-8", "strict") + + @property def u8path(self): """`.upath` as UTF-8 or `None` (as strict as `upath`. @@ -319,6 +386,10 @@ return self._alt_u8name @property + def alt_u8npath(self): + return WalkDirEntry.alt_u8(self._npath) + + @property def alt_u8path(self): return WalkDirEntry.alt_u8(self._path) @@ -329,16 +400,7 @@ # ... and hope that the current UTF-8 is compatible # with it # - if isinstance(what, bytes): - s = (what.replace(b'\\', b"\\\\") - .replace(b'\n', b"\\x0a") - .replace(b'\r', b"\\x0d") - .replace(b'\t', b"\\x09")) - else: - s = (what.replace(u'\\', u"\\\\") - .replace(u'\n', u"\\x0a") - .replace(u'\r', u"\\x0d") - .replace(u'\t', u"\\x09")) + s = escape_for_output(what) if PY2: if isinstance(s, bytes): try: @@ -352,6 +414,22 @@ else: return s.encode("utf-8", "backslashreplace") + @staticmethod + def alt_bytes(what, use_utf8): + if not what: + return what + if use_utf8: + return WalkDirEntry.alt_u8(what) + else: + return WalkDirEntry.alt_fs(what) + + @staticmethod + def alt_text(what, use_utf8): + b = WalkDirEntry.alt_bytes(what, use_utf8) + if PY2: + return b + return b.decode("iso-8859-1") + @property def is_symlink(self): return self._is_symlink
