diff cutils/treesum.py @ 275:c72f5b2dbc6f

treesum: Simplify the path computations and make the visible output more consistent. Current format of digest lines: - No `=' if there was a severe error so that digest and/or size could not be computed. - Empty fields for digest and/or size if they were not computed on purpose: * for symbolic links * for special files
author Franz Glasner <fzglas.hg@dom66.de>
date Thu, 20 Feb 2025 15:30:51 +0100
parents 6fe88de236cb
children 9676ecd32a07
line wrap: on
line diff
--- a/cutils/treesum.py	Wed Feb 19 16:42:44 2025 +0100
+++ b/cutils/treesum.py	Thu Feb 20 15:30:51 2025 +0100
@@ -566,13 +566,15 @@
                     len(self._algorithm[1]), util.b(self._algorithm[1]),
                     len(linkdgst.digest()), linkdgst.digest()))
             if self._size_only:
-                self._writer.write_size(b"./@/", 0)
+                self._writer.write_size(b"./@/", b"")
             else:
+                sz = b"" if self._print_size else None
                 self._writer.write_file_digest(
                     self._algorithm[1],
                     b"./@/",
                     dir_dgst.digest(),
-                    self._use_base64)
+                    self._use_base64,
+                    size=sz)
             self._writer.flush()
         else:
             self._generate(os.path.normpath(root), tuple())
@@ -585,10 +587,10 @@
             with walk.ScanDir(path) as dirscan:
                 fsobjects = list(dirscan)
         except OSError as e:
-            if self._utf8_mode:
-                opath = walk.WalkDirEntry.alt_u8(path)
-            else:
-                opath = walk.WalkDirEntry.alt_fs(path)
+            #
+            # NOTE: Sync the error handler code with this method's
+            #       code below before returning!
+            #
             if e.errno == errno.ENOTDIR:
                 # object exists but is not a directory
                 errmsg = b"not a directory"
@@ -602,6 +604,10 @@
                 errmsg = b"no such file or directory"
             else:
                 raise
+            if self._utf8_mode:
+                opath = walk.WalkDirEntry.alt_u8(path)
+            else:
+                opath = walk.WalkDirEntry.alt_fs(path)
             self._writer.write_error(util.interpolate_bytes(
                 b"`%s': %s", opath, errmsg))
             opath = join_output_path(top, None)
@@ -624,11 +630,47 @@
         dir_size = 0
         dir_tainted = False
         for fso in fsobjects:
-            if fso.is_dir:
-                if fso.is_symlink and not self._follow_symlinks.directory:
+            # Determine the effective name to be used for digesting
+            if self._utf8_mode:
+                if fso.u8name is None:
+                    dir_tainted = True
+                    effective_fso_name = fso.alt_u8name
+                else:
+                    effective_fso_name = fso.u8name
+            else:
+                if fso.fsname is None:
+                    dir_tainted = True
+                    effective_fso_name = fso.alt_fsname
+                else:
+                    effective_fso_name = fso.fsname
+            # Determine the path (mostly its prefix) that is to be printed
+            opath = join_output_path(top, fso.name)
+            if self._utf8_mode:
+                opath = walk.WalkDirEntry.alt_u8(opath)
+            else:
+                opath = walk.WalkDirEntry.alt_fs(opath)
+            if fso.is_special:
+                # Determine the tag character
+                if fso.is_chr:
+                    special_tag = b':'
+                elif fso.is_blk:
+                    special_tag = b';'
+                elif fso.is_fifo:
+                    special_tag = b'|'
+                elif fso.is_socket:
+                    special_tag = b'='
+                elif fso.is_door:
+                    special_tag = b'>'
+                elif fso.is_whiteout:
+                    special_tag = b'%'
+                elif fso.is_eventport:
+                    special_tag = b'+'
+                else:
+                    assert False, "unknown special filesystem object"
+                assert fso.stat is not None    # because .is_special is True
+                if fso.is_symlink and not self._follow_symlinks.file:
                     linktgt = walk.WalkDirEntry.from_readlink(
                         os.readlink(fso.path))
-                    # linktgt = util.fsencode(os.readlink(fso.path)))
                     linkdgst = self._algorithm[0]()
                     if self._utf8_mode:
                         if linktgt.u8path is None:
@@ -636,30 +678,102 @@
                             linkdgst.update(linktgt.alt_u8path)
                         else:
                             linkdgst.update(linktgt.u8path)
-                        if fso.u8name is None:
-                            dir_tainted = True
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"2:@/,%d:%s,",
-                                len(fso.alt_u8name),
-                                fso.alt_u8name))
-                        else:
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"2:@/,%d:%s,", len(fso.u8name), fso.u8name))
                     else:
                         if linktgt.fspath is None:
                             dir_tainted = True
                             linkdgst.update(linktgt.alt_fspath)
                         else:
                             linkdgst.update(linktgt.fspath)
-                        if fso.fsname is None:
+                    dir_dgst.update(util.interpolate_bytes(
+                        b"2:@%s,%d:%s,",
+                        special_tag,
+                        len(effective_fso_name),
+                        effective_fso_name))
+                    dir_dgst.update(util.interpolate_bytes(
+                        b"%d:%s,%d:%s,",
+                        len(self._algorithm[1]), util.b(self._algorithm[1]),
+                        len(linkdgst.digest()), linkdgst.digest()))
+                    #
+                    # - no mtime and no mode for symlinks
+                    # - also does not count for dir_size
+                    #
+                    if self._size_only:
+                        self._writer.write_size(
+                            util.interpolate_bytes(
+                                b"%s/./@%s", opath, special_tag),
+                            b"")
+                    else:
+                        sz = b"" if self._print_size else None
+                        self._writer.write_file_digest(
+                            self._algorithm[1],
+                            util.interpolate_bytes(
+                                b"%s/./@%s", opath, special_tag),
+                            linkdgst.digest(),
+                            self._use_base64,
+                            size=sz)
+                else:
+                    #
+                    # Follow the symlink to special file and/or handle a
+                    # special file
+                    #
+                    dir_dgst.update(util.interpolate_bytes(
+                        b"1:%s,%d:%s,",
+                        special_tag,
+                        len(effective_fso_name),
+                        effective_fso_name))
+                    # no important size here but a mode
+                    if self._with_metadata_mtime:
+                        mtime = datetime.datetime.utcfromtimestamp(
+                            int(fso.stat.st_mtime))
+                        mtime = util.b(mtime.isoformat("T") + "Z")
+                        dir_dgst.update(util.interpolate_bytes(
+                            b"5:mtime,%d:%s,", len(mtime), mtime))
+                    if self._with_metadata_full_mode:
+                        modestr = util.b(
+                            normalized_mode_str(fso.stat.st_mode))
+                        dir_dgst.update(util.interpolate_bytes(
+                            b"8:fullmode,%d:%s,", len(modestr), modestr))
+                    elif self._with_metadata_mode:
+                        modestr = util.b(normalized_compatible_mode_str(
+                            fso.stat.st_mode))
+                        dir_dgst.update(util.interpolate_bytes(
+                            b"4:mode,%d:%s,", len(modestr), modestr))
+                    if self._size_only:
+                        self._writer.write_size(
+                            util.interpolate_bytes(
+                                b"%s/./%s", opath, special_tag),
+                            b"")
+                    else:
+                        sz = b"" if self._print_size else None
+                        self._writer.write_file_digest(
+                            self._algorithm[1],
+                            util.interpolate_bytes(
+                                b"%s/./%s", opath, special_tag),
+                            b"",
+                            self._use_base64,
+                            size=sz)
+            elif fso.is_dir:
+                assert fso.stat is not None        # because .is_dir is True
+                if fso.is_symlink and not self._follow_symlinks.directory:
+                    linktgt = walk.WalkDirEntry.from_readlink(
+                        os.readlink(fso.path))
+                    linkdgst = self._algorithm[0]()
+                    if self._utf8_mode:
+                        if linktgt.u8path is None:
                             dir_tainted = True
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"2:@/,%d:%s,",
-                                len(fso.alt_fsname),
-                                fso.alt_fsname))
+                            linkdgst.update(linktgt.alt_u8path)
                         else:
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"2:@/,%d:%s,", len(fso.fsname), fso.fsname))
+                            linkdgst.update(linktgt.u8path)
+                    else:
+                        if linktgt.fspath is None:
+                            dir_tainted = True
+                            linkdgst.update(linktgt.alt_fspath)
+                        else:
+                            linkdgst.update(linktgt.fspath)
+                    dir_dgst.update(util.interpolate_bytes(
+                        b"2:@/,%d:%s,",
+                        len(effective_fso_name),
+                        effective_fso_name))
                     #
                     # - no mtime and no mode for symlinks
                     # - also does not count for dir_size
@@ -668,16 +782,12 @@
                         b"%d:%s,%d:%s,",
                         len(self._algorithm[1]), util.b(self._algorithm[1]),
                         len(linkdgst.digest()), linkdgst.digest()))
-                    opath = join_output_path(top, fso.name)
-                    if self._utf8_mode:
-                        opath = walk.WalkDirEntry.alt_u8(opath)
-                    else:
-                        opath = walk.WalkDirEntry.alt_fs(opath)
                     if self._size_only:
                         self._writer.write_size(
                             util.interpolate_bytes(b"%s/./@/", opath),
-                            0)
+                            b"")
                     else:
+                        sz = b"" if self._print_size else None
                         self._writer.write_file_digest(
                             self._algorithm[1],
                             util.interpolate_bytes(b"%s/./@/", opath),
@@ -703,26 +813,10 @@
                         assert False
 
                     dir_size += sub_dir_size
-                    if self._utf8_mode:
-                        if fso.u8name is None:
-                            dir_tainted = True
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"1:/,%d:%s,",
-                                len(fso.alt_u8name),
-                                fso.alt_u8name))
-                        else:
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"1:/,%d:%s,", len(fso.u8name), fso.u8name))
-                    else:
-                        if fso.fsname is None:
-                            dir_tainted = True
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"1:/,%d:%s,",
-                                len(fso.alt_fsname),
-                                fso.alt_fsname))
-                        else:
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"1:/,%d:%s,", len(fso.fsname), fso.fsname))
+                    dir_dgst.update(util.interpolate_bytes(
+                        b"1:/,%d:%s,",
+                        len(effective_fso_name),
+                        effective_fso_name))
                     if self._with_metadata_full_mode:
                         modestr = util.b(normalized_mode_str(fso.stat.st_mode))
                         dir_dgst.update(util.interpolate_bytes(
@@ -740,11 +834,11 @@
                 if fso.is_symlink and not self._follow_symlinks.file:
                     #
                     # Symbolic link to some filesystem object which is not
-                    # determined to be a link to a directory.
+                    # determined to be a link to a directory or some other
+                    # special file (socket, FIFO, et al.).
                     #
                     linktgt = walk.WalkDirEntry.from_readlink(
                         os.readlink(fso.path))
-                    # linktgt = util.fsencode(os.readlink(fso.path)))
                     linkdgst = self._algorithm[0]()
                     if self._utf8_mode:
                         if linktgt.u8path is None:
@@ -752,30 +846,16 @@
                             linkdgst.update(linktgt.alt_u8path)
                         else:
                             linkdgst.update(linktgt.u8path)
-                        if fso.u8name is None:
-                            dir_tainted = True
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"1:@,%d:%s,",
-                                len(fso.alt_u8name),
-                                fso.alt_u8name))
-                        else:
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"1:@,%d:%s,", len(fso.u8name), fso.u8name))
                     else:
                         if linktgt.fspath is None:
                             dir_tainted = True
                             linkdgst.update(linktgt.alt_fspath)
                         else:
                             linkdgst.update(linktgt.fspath)
-                        if fso.fsname is None:
-                            dir_tainted = True
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"1:@,%d:%s,",
-                                len(fso.alt_fsname),
-                                fso.alt_fsname))
-                        else:
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"1:@,%d:%s,", len(fso.fsname), fso.fsname))
+                    dir_dgst.update(util.interpolate_bytes(
+                        b"1:@,%d:%s,",
+                        len(effective_fso_name),
+                        effective_fso_name))
                     #
                     # - no mtime and no mode for symlinks
                     # - also does not count for dir_size
@@ -784,51 +864,27 @@
                         b"%d:%s,%d:%s,",
                         len(self._algorithm[1]), util.b(self._algorithm[1]),
                         len(linkdgst.digest()), linkdgst.digest()))
-                    opath = join_output_path(top, fso.name)
-                    if self._utf8_mode:
-                        opath = walk.WalkDirEntry.alt_u8(opath)
-                    else:
-                        opath = walk.WalkDirEntry.alt_fs(opath)
                     if self._size_only:
                         self._writer.write_size(
                             util.interpolate_bytes(b"%s/./@", opath),
-                            0)
+                            b"")
                     else:
+                        sz = b"" if self._print_size else None
                         self._writer.write_file_digest(
                             self._algorithm[1],
                             util.interpolate_bytes(b"%s/./@", opath),
                             linkdgst.digest(),
-                            self._use_base64)
+                            self._use_base64,
+                            size=sz)
                 else:
                     #
                     # Follow the symlink to file or handle a "real" file
                     #
 
-                    if self._utf8_mode:
-                        if fso.u8name is None:
-                            dir_tainted = True
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"0:,%d:%s,",
-                                len(fso.alt_u8name),
-                                fso.alt_u8name))
-                        else:
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"0:,%d:%s,", len(fso.u8name), fso.u8name))
-                    else:
-                        if fso.fsname is None:
-                            dir_tainted = True
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"0:,%d:%s,",
-                                len(fso.alt_fsname),
-                                fso.alt_fsname))
-                        else:
-                            dir_dgst.update(util.interpolate_bytes(
-                                b"0:,%d:%s,", len(fso.fsname), fso.fsname))
-                    opath = join_output_path(top, fso.name)
-                    if self._utf8_mode:
-                        opath = walk.WalkDirEntry.alt_u8(opath)
-                    else:
-                        opath = walk.WalkDirEntry.alt_fs(opath)
+                    dir_dgst.update(util.interpolate_bytes(
+                        b"0:,%d:%s,",
+                        len(effective_fso_name),
+                        effective_fso_name))
                     if fso.stat is None:
                         #
                         # Error: most likely a broken symlink here
@@ -1148,7 +1204,7 @@
     PATTERN1 = re.compile(br"\A(VERSION|FSENCODING|FLAGS|TIMESTAMP|ISOTIMESTAMP|CRC32)[ \t]*=[ \t]*([^ \t]+)[ \t]*\r?\n\Z")      # noqa: E501  line too long
     PATTERN2 = re.compile(br"\A(ROOT|COMMENT|ERROR|GENERATOR)[ \t]*\((.*)\)[ \t]*\r?\n\Z")                                       # noqa: E501  line too long
     PATTERN3 = re.compile(br"\ASIZE[ \t]*\((.*)\)([ \t]*=[ \t]*(\d+))?[ \t]*\r?\n\Z")                                               # noqa: E501  line too long
-    PATTERN4 = re.compile(br"\A([A-Za-z0-9_-]+)[ \t]*\((.*)\)([ \t]*=[ \t]*([A-Za-z0-9=+/]+)(,(\d+))?)?[ \t]*\r?\n\Z")              # noqa: E501  line too long
+    PATTERN4 = re.compile(br"\A([A-Za-z0-9_-]+)[ \t]*\((.*)\)([ \t]*=[ \t]*([A-Za-z0-9=+/]+)?(,(\d+)?)?)?[ \t]*\r?\n\Z")              # noqa: E501  line too long
 
     def __init__(self, _fp, _filename, _own_fp):
         self._fp = _fp
@@ -1293,15 +1349,21 @@
                         self._update_crc(line)
                         algo_name = util.n(mo.group(1))
                         if mo.group(3):
-                            if (len(mo.group(4)) ==
-                                    2 * self._get_digest_size(algo_name)):
-                                # hex
-                                digest = binascii.unhexlify(mo.group(4))
+                            if mo.group(4):
+                                if (len(mo.group(4)) ==
+                                        2 * self._get_digest_size(algo_name)):
+                                    # hex
+                                    digest = binascii.unhexlify(mo.group(4))
+                                else:
+                                    # base64
+                                    digest = base64.b64decode(mo.group(4))
                             else:
-                                # base64
-                                digest = base64.b64decode(mo.group(4))
+                                digest = None
                             if mo.group(5):
-                                size = int(util.n(mo.group(6)), 10)
+                                if mo.group(6):
+                                    size = int(util.n(mo.group(6)), 10)
+                                else:
+                                    size = None
                             else:
                                 size = None
                             return (algo_name, mo.group(2), digest, size)