diff cutils/treesum.py @ 217:8e38c07c4b85

Handle symlinks to files fully and Implement no-follow-file-symlinks
author Franz Glasner <fzglas.hg@dom66.de>
date Sat, 25 Jan 2025 09:52:22 +0100
parents 5a2d9ec204ce
children dee891ed2307
line wrap: on
line diff
--- a/cutils/treesum.py	Fri Jan 24 17:18:23 2025 +0100
+++ b/cutils/treesum.py	Sat Jan 25 09:52:22 2025 +0100
@@ -69,7 +69,7 @@
             default=FollowSymlinkConfig(False, False, True),
             dest="follow_symlinks",
             help="""Follow symbolic links to directories when walking a
-directory tree. Augments --physical.""")
+directory tree. Augments --physical and -p.""")
         gp.add_argument(
             "--follow-file-symlinks", action=SymlinkAction,
             const="follow-file-symlinks",
@@ -96,7 +96,8 @@
             help="""Follow symbolic links everywhere: on command line
 arguments and -- while walking -- directory and file symbolic links.
 Overwrites any other symlink related options
-(--physical, no-follow-directory-symlinks, no-follow-file-symlinks, et al.).
+(--physical,-p,  no-follow-directory-symlinks, no-follow-file-symlinks,
+et al.).
 """)
         gp.add_argument(
             "--minimal", nargs="?", const="", default=None, metavar="TAG",
@@ -128,7 +129,7 @@
             const="no-follow-file-symlinks",
             dest="follow_symlinks",
             help="""Dont follow symbolic links to files when walking a
-directory tree. Augments --logical.""")
+directory tree. Augments --logical and -p.""")
         gp.add_argument(
             "--no-mmap", action="store_false", dest="mmap", default=None,
             help="Dont use mmap.")
@@ -142,7 +143,17 @@
             help="""Do not follow any symbolic links whether they are given
 on the command line or when walking the directory tree.
 Overwrites any other symlink related options
-(--logical, follow-directory-symlinks, follow-file-symlinks, et al.).
+(--logical, -p, follow-directory-symlinks, follow-file-symlinks, et al.).
+This is the default.""")
+        gp.add_argument(
+            "-p", action=SymlinkAction, dest="follow_symlinks",
+            const=FollowSymlinkConfig(False, False, True),
+            help="""Do not follow any symbolic links to directories,
+whether they are given on the command line or when walking the directory tree,
+but follow symbolic links to files.
+Overwrites any other symlink related options
+(--logical, --physical, follow-directory-symlinks, no-follow-file-symlinks,
+et al.).
 This is the default.""")
         gp.add_argument(
             "--print-size", action="store_true",
@@ -336,10 +347,10 @@
                 assert False, "Implementation error: not yet implemented"
 
         # Not following symlinks to files is not yet supported: reset to True
-        if not curval.file:
-            curval = FollowSymlinkConfig(
-                curval.command_line, curval.directory, True)
-            logging.warning("Coercing options to `follow-file-symlinks'")
+#        if not curval.file:
+#            curval = FollowSymlinkConfig(
+#                curval.command_line, curval.directory, True)
+#            logging.warning("Coercing options to `follow-file-symlinks'")
         setattr(namespace, self.dest, curval)
 
 
@@ -360,14 +371,14 @@
                       print_size=False,
                       size_only=False,
                       utf8=False):
-    # Not following symlinks to files is not yet supported: reset to True
     if not isinstance(follow_symlinks, FollowSymlinkConfig):
         raise TypeError("`follow_symlinks' must be a FollowSymlinkConfig")
-    if not follow_symlinks.file:
-        follow_symlinks = follow_symlinks._make([follow_symlinks.command_line,
-                                                 follow_symlinks.directory,
-                                                 True])
-        logging.warning("Coercing to follow-symlinks-file")
+    # Disabled: not following symlinks to files is supported now, no coercion
+#    if not follow_symlinks.file:
+#        follow_symlinks = follow_symlinks._make([follow_symlinks.command_line,
+#                                                 follow_symlinks.directory,
+#                                                 True])
+#        logging.warning("Coercing to follow-symlinks-file")
     opts = argparse.Namespace(
         directories=directories,
         algorithm=util.argv2algo(algorithm),
@@ -581,9 +592,6 @@
                 "CRC32", self._outfp.hexcrcdigest(), None, False))
 
     def _generate(self, root, top):
-        # This is currently always True
-        assert self._follow_symlinks.file
-
         logging.debug("Handling %s/%r", root, top)
         path = os.path.join(root, *top) if top else root
         with walk.ScanDir(path) as dirscan:
@@ -708,64 +716,139 @@
                         dir_dgst.update(util.interpolate_bytes(
                             b"4:mode,%d:%s,", len(modestr), modestr))
             else:
-                if self._utf8_mode:
-                    if fso.u8name is None:
-                        dir_tainted = True
-                        dir_dgst.update(util.interpolate_bytes(
-                            b"1:f,%d:%s,",
-                            len(fso.alt_u8name),
-                            fso.alt_u8name))
+                if fso.is_symlink and not self._follow_symlinks.file:
+                    linktgt = walk.WalkDirEntry.from_readlink(
+                        os.readlink(fso.path))
+                    # linktgt = util.fsencode(os.readlink(fso.path))
+                    linkdgst = self._algorithm[0]()
+                    if self._utf8_mode:
+                        if linktgt.u8path is None:
+                            dir_tainted = True
+                            linkdgst.update(util.interpolate_bytes(
+                                b"%d:%s,",
+                                len(linktgt.alt_u8path),
+                                linktgt.alt_u8path))
+                        else:
+                            linkdgst.update(util.interpolate_bytes(
+                                b"%d:%s,",
+                                len(linktgt.u8path),
+                                linktgt.u8path))
+                        if fso.u8name is None:
+                            dir_tainted = True
+                            dir_dgst.update(util.interpolate_bytes(
+                                b"1:F,%d:%s,",
+                                len(fso.alt_u8name),
+                                fso.alt_u8name))
+                        else:
+                            dir_dgst.update(util.interpolate_bytes(
+                                b"1:F,%d:%s,", len(fso.u8name), fso.u8name))
                     else:
-                        dir_dgst.update(util.interpolate_bytes(
-                            b"1:f,%d:%s,", len(fso.u8name), fso.u8name))
-                else:
-                    if fso.fsname is None:
-                        dir_tainted = True
-                        dir_dgst.update(util.interpolate_bytes(
-                            b"1:f,%d:%s,",
-                            len(fso.alt_fsname),
-                            fso.alt_fsname))
-                    else:
-                        dir_dgst.update(util.interpolate_bytes(
-                            b"1:f,%d:%s,", len(fso.fsname), fso.fsname))
-                dir_size += fso.stat.st_size
-                if self._with_metadata_mtime:
-                    mtime = datetime.datetime.utcfromtimestamp(
-                        int(fso.stat.st_mtime))
-                    mtime = util.b(mtime.isoformat("T") + "Z")
+                        if linktgt.fspath is None:
+                            dir_tainted = True
+                            linkdgst.update(util.interpolate_bytes(
+                                b"%d:%s,",
+                                len(linktgt.alt_fspath),
+                                linktgt.alt_fspath))
+                        else:
+                            linkdgst.update(util.interpolate_bytes(
+                                b"%d:%s,",
+                                len(linktgt.fspath),
+                                linktgt.fspath))
+                        if fso.fsname is None:
+                            dir_tainted = True
+                            dir_dgst.update(util.interpolate_bytes(
+                                b"1:F,%d:%s,",
+                                len(fso.alt_fsname),
+                                fso.alt_fsname))
+                        else:
+                            dir_dgst.update(util.interpolate_bytes(
+                                b"1:F,%d:%s,", len(fso.fsname), fso.fsname))
+                    #
+                    # - no mtime and no mode for symlinks
+                    # - also does not count for dir_size
+                    #
                     dir_dgst.update(util.interpolate_bytes(
-                        b"5:mtime,%d:%s,", len(mtime), mtime))
-                if self._with_metadata_full_mode:
-                    modestr = util.b(normalized_mode_str(fso.stat.st_mode))
-                    dir_dgst.update(util.interpolate_bytes(
-                        b"8:fullmode,%d:%s,", len(modestr), modestr))
-                elif self._with_metadata_mode:
-                    modestr = util.b(normalized_compatible_mode_str(
-                        fso.stat.st_mode))
-                    dir_dgst.update(util.interpolate_bytes(
-                        b"4:mode,%d:%s,", len(modestr), modestr))
-                if not self._size_only:
-                    dgst = digest.compute_digest_file(
-                        self._algorithm[0], fso.path, use_mmap=self._use_mmap)
-                    dir_dgst.update(util.interpolate_bytes(
-                        b"%d:%s,", len(dgst), dgst))
-                opath = join_output_path(top, fso.name)
-                if self._utf8_mode:
-                    opath = walk.WalkDirEntry.alt_u8(opath)
-                else:
-                    opath = walk.WalkDirEntry.alt_fs(opath)
-                if self._size_only:
-                    self._outfp.write(format_bsd_line(
-                        "SIZE", None, opath, False, fso.stat.st_size))
-                else:
-                    if self._print_size:
+                        b"%d:%s,",
+                        len(linkdgst.digest()), linkdgst.digest()))
+                    opath = join_output_path(top, fso.name)
+                    if self._utf8_mode:
+                        opath = walk.WalkDirEntry.alt_u8(opath)
+                    else:
+                        opath = walk.WalkDirEntry.alt_fs(opath)
+                    if self._size_only:
                         self._outfp.write(format_bsd_line(
-                            self._algorithm[1], dgst, opath, self._use_base64,
-                            fso.stat.st_size))
+                            "SIZE", None, "%s/./@" % (opath,), False, 0))
                     else:
                         self._outfp.write(format_bsd_line(
-                            self._algorithm[1], dgst, opath,
+                            self._algorithm[1],
+                            linkdgst.digest(),
+                            "%s/./@" % (opath,),
                             self._use_base64))
+                    self._outfp.flush()
+                else:
+                    # regular file, or a symlink to a file that is followed
+                    if self._utf8_mode:
+                        if fso.u8name is None:
+                            dir_tainted = True
+                            dir_dgst.update(util.interpolate_bytes(
+                                b"1:f,%d:%s,",
+                                len(fso.alt_u8name),
+                                fso.alt_u8name))
+                        else:
+                            dir_dgst.update(util.interpolate_bytes(
+                                b"1:f,%d:%s,", len(fso.u8name), fso.u8name))
+                    else:
+                        if fso.fsname is None:
+                            dir_tainted = True
+                            dir_dgst.update(util.interpolate_bytes(
+                                b"1:f,%d:%s,",
+                                len(fso.alt_fsname),
+                                fso.alt_fsname))
+                        else:
+                            dir_dgst.update(util.interpolate_bytes(
+                                b"1:f,%d:%s,", len(fso.fsname), fso.fsname))
+                    dir_size += fso.stat.st_size
+                    if self._with_metadata_mtime:
+                        mtime = datetime.datetime.utcfromtimestamp(
+                            int(fso.stat.st_mtime))
+                        mtime = util.b(mtime.isoformat("T") + "Z")
+                        dir_dgst.update(util.interpolate_bytes(
+                            b"5:mtime,%d:%s,", len(mtime), mtime))
+                    if self._with_metadata_full_mode:
+                        modestr = util.b(normalized_mode_str(fso.stat.st_mode))
+                        dir_dgst.update(util.interpolate_bytes(
+                            b"8:fullmode,%d:%s,", len(modestr), modestr))
+                    elif self._with_metadata_mode:
+                        modestr = util.b(normalized_compatible_mode_str(
+                            fso.stat.st_mode))
+                        dir_dgst.update(util.interpolate_bytes(
+                            b"4:mode,%d:%s,", len(modestr), modestr))
+                    if not self._size_only:
+                        dgst = digest.compute_digest_file(
+                            self._algorithm[0],
+                            fso.path,
+                            use_mmap=self._use_mmap)
+                        dir_dgst.update(util.interpolate_bytes(
+                            b"%d:%s,", len(dgst), dgst))
+                    opath = join_output_path(top, fso.name)
+                    if self._utf8_mode:
+                        opath = walk.WalkDirEntry.alt_u8(opath)
+                    else:
+                        opath = walk.WalkDirEntry.alt_fs(opath)
+                    if self._size_only:
+                        self._outfp.write(format_bsd_line(
+                            "SIZE", None, opath, False, fso.stat.st_size))
+                    else:
+                        if self._print_size:
+                            self._outfp.write(format_bsd_line(
+                                self._algorithm[1],
+                                dgst, opath,
+                                self._use_base64,
+                                fso.stat.st_size))
+                        else:
+                            self._outfp.write(format_bsd_line(
+                                self._algorithm[1], dgst, opath,
+                                self._use_base64))
                 self._outfp.flush()
         opath = join_output_path(top, None)
         if opath: