Mercurial > hgrepos > Python > apps > py-cutils
diff cutils/treesum.py @ 217:8e38c07c4b85
Handle symlinks to files fully and Implement no-follow-file-symlinks
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Sat, 25 Jan 2025 09:52:22 +0100 |
| parents | 5a2d9ec204ce |
| children | dee891ed2307 |
line wrap: on
line diff
--- a/cutils/treesum.py Fri Jan 24 17:18:23 2025 +0100 +++ b/cutils/treesum.py Sat Jan 25 09:52:22 2025 +0100 @@ -69,7 +69,7 @@ default=FollowSymlinkConfig(False, False, True), dest="follow_symlinks", help="""Follow symbolic links to directories when walking a -directory tree. Augments --physical.""") +directory tree. Augments --physical and -p.""") gp.add_argument( "--follow-file-symlinks", action=SymlinkAction, const="follow-file-symlinks", @@ -96,7 +96,8 @@ help="""Follow symbolic links everywhere: on command line arguments and -- while walking -- directory and file symbolic links. Overwrites any other symlink related options -(--physical, no-follow-directory-symlinks, no-follow-file-symlinks, et al.). +(--physical,-p, no-follow-directory-symlinks, no-follow-file-symlinks, +et al.). """) gp.add_argument( "--minimal", nargs="?", const="", default=None, metavar="TAG", @@ -128,7 +129,7 @@ const="no-follow-file-symlinks", dest="follow_symlinks", help="""Dont follow symbolic links to files when walking a -directory tree. Augments --logical.""") +directory tree. Augments --logical and -p.""") gp.add_argument( "--no-mmap", action="store_false", dest="mmap", default=None, help="Dont use mmap.") @@ -142,7 +143,17 @@ help="""Do not follow any symbolic links whether they are given on the command line or when walking the directory tree. Overwrites any other symlink related options -(--logical, follow-directory-symlinks, follow-file-symlinks, et al.). +(--logical, -p, follow-directory-symlinks, follow-file-symlinks, et al.). +This is the default.""") + gp.add_argument( + "-p", action=SymlinkAction, dest="follow_symlinks", + const=FollowSymlinkConfig(False, False, True), + help="""Do not follow any symbolic links to directories, +whether they are given on the command line or when walking the directory tree, +but follow symbolic links to files. +Overwrites any other symlink related options +(--logical, --physical, follow-directory-symlinks, no-follow-file-symlinks, +et al.). This is the default.""") gp.add_argument( "--print-size", action="store_true", @@ -336,10 +347,10 @@ assert False, "Implementation error: not yet implemented" # Not following symlinks to files is not yet supported: reset to True - if not curval.file: - curval = FollowSymlinkConfig( - curval.command_line, curval.directory, True) - logging.warning("Coercing options to `follow-file-symlinks'") +# if not curval.file: +# curval = FollowSymlinkConfig( +# curval.command_line, curval.directory, True) +# logging.warning("Coercing options to `follow-file-symlinks'") setattr(namespace, self.dest, curval) @@ -360,14 +371,14 @@ print_size=False, size_only=False, utf8=False): - # Not following symlinks to files is not yet supported: reset to True if not isinstance(follow_symlinks, FollowSymlinkConfig): raise TypeError("`follow_symlinks' must be a FollowSymlinkConfig") - if not follow_symlinks.file: - follow_symlinks = follow_symlinks._make([follow_symlinks.command_line, - follow_symlinks.directory, - True]) - logging.warning("Coercing to follow-symlinks-file") + # Not following symlinks to files is not yet supported: reset to True +# if not follow_symlinks.file: +# follow_symlinks = follow_symlinks._make([follow_symlinks.command_line, +# follow_symlinks.directory, +# True]) +# logging.warning("Coercing to follow-symlinks-file") opts = argparse.Namespace( directories=directories, algorithm=util.argv2algo(algorithm), @@ -581,9 +592,6 @@ "CRC32", self._outfp.hexcrcdigest(), None, False)) def _generate(self, root, top): - # This is currently always True - assert self._follow_symlinks.file - logging.debug("Handling %s/%r", root, top) path = os.path.join(root, *top) if top else root with walk.ScanDir(path) as dirscan: @@ -708,64 +716,139 @@ dir_dgst.update(util.interpolate_bytes( b"4:mode,%d:%s,", len(modestr), modestr)) else: - if self._utf8_mode: - if fso.u8name is None: - dir_tainted = True - dir_dgst.update(util.interpolate_bytes( - b"1:f,%d:%s,", - len(fso.alt_u8name), - fso.alt_u8name)) + if fso.is_symlink and not self._follow_symlinks.file: + linktgt = walk.WalkDirEntry.from_readlink( + os.readlink(fso.path)) + # linktgt = util.fsencode(os.readlink(fso.path))) + linkdgst = self._algorithm[0]() + if self._utf8_mode: + if linktgt.u8path is None: + dir_tainted = True + linkdgst.update(util.interpolate_bytes( + b"%d:%s,", + len(linktgt.alt_u8path), + linktgt.alt_u8path)) + else: + linkdgst.update(util.interpolate_bytes( + b"%d:%s,", + len(linktgt.u8path), + linktgt.u8path)) + if fso.u8name is None: + dir_tainted = True + dir_dgst.update(util.interpolate_bytes( + b"1:F,%d:%s,", + len(fso.alt_u8name), + fso.alt_u8name)) + else: + dir_dgst.update(util.interpolate_bytes( + b"1:F,%d:%s,", len(fso.u8name), fso.u8name)) else: - dir_dgst.update(util.interpolate_bytes( - b"1:f,%d:%s,", len(fso.u8name), fso.u8name)) - else: - if fso.fsname is None: - dir_tainted = True - dir_dgst.update(util.interpolate_bytes( - b"1:f,%d:%s,", - len(fso.alt_fsname), - fso.alt_fsname)) - else: - dir_dgst.update(util.interpolate_bytes( - b"1:f,%d:%s,", len(fso.fsname), fso.fsname)) - dir_size += fso.stat.st_size - if self._with_metadata_mtime: - mtime = datetime.datetime.utcfromtimestamp( - int(fso.stat.st_mtime)) - mtime = util.b(mtime.isoformat("T") + "Z") + if linktgt.fspath is None: + dir_tainted = True + linkdgst.update(util.interpolate_bytes( + b"%d:%s,", + len(linktgt.alt_fspath), + linktgt.alt_fspath)) + else: + linkdgst.update(util.interpolate_bytes( + b"%d:%s,", + len(linktgt.fspath), + linktgt.fspath)) + if fso.fsname is None: + dir_tainted = True + dir_dgst.update(util.interpolate_bytes( + b"1:F,%d:%s,", + len(fso.alt_fsname), + fso.alt_fsname)) + else: + dir_dgst.update(util.interpolate_bytes( + b"1:F,%d:%s,", len(fso.fsname), fso.fsname)) + # + # - no mtime and no mode for symlinks + # - also does not count for dir_size + # dir_dgst.update(util.interpolate_bytes( - b"5:mtime,%d:%s,", len(mtime), mtime)) - if self._with_metadata_full_mode: - modestr = util.b(normalized_mode_str(fso.stat.st_mode)) - dir_dgst.update(util.interpolate_bytes( - b"8:fullmode,%d:%s,", len(modestr), modestr)) - elif self._with_metadata_mode: - modestr = util.b(normalized_compatible_mode_str( - fso.stat.st_mode)) - dir_dgst.update(util.interpolate_bytes( - b"4:mode,%d:%s,", len(modestr), modestr)) - if not self._size_only: - dgst = digest.compute_digest_file( - self._algorithm[0], fso.path, use_mmap=self._use_mmap) - dir_dgst.update(util.interpolate_bytes( - b"%d:%s,", len(dgst), dgst)) - opath = join_output_path(top, fso.name) - if self._utf8_mode: - opath = walk.WalkDirEntry.alt_u8(opath) - else: - opath = walk.WalkDirEntry.alt_fs(opath) - if self._size_only: - self._outfp.write(format_bsd_line( - "SIZE", None, opath, False, fso.stat.st_size)) - else: - if self._print_size: + b"%d:%s,", + len(linkdgst.digest()), linkdgst.digest())) + opath = join_output_path(top, fso.name) + if self._utf8_mode: + opath = walk.WalkDirEntry.alt_u8(opath) + else: + opath = walk.WalkDirEntry.alt_fs(opath) + if self._size_only: self._outfp.write(format_bsd_line( - self._algorithm[1], dgst, opath, self._use_base64, - fso.stat.st_size)) + "SIZE", None, "%s/./@" % (opath,), False, 0)) else: self._outfp.write(format_bsd_line( - self._algorithm[1], dgst, opath, + self._algorithm[1], + linkdgst.digest(), + "%s/./@" % (opath,), self._use_base64)) + self._outfp.flush() + else: + # follow symlinks to files + if self._utf8_mode: + if fso.u8name is None: + dir_tainted = True + dir_dgst.update(util.interpolate_bytes( + b"1:f,%d:%s,", + len(fso.alt_u8name), + fso.alt_u8name)) + else: + dir_dgst.update(util.interpolate_bytes( + b"1:f,%d:%s,", len(fso.u8name), fso.u8name)) + else: + if fso.fsname is None: + dir_tainted = True + dir_dgst.update(util.interpolate_bytes( + b"1:f,%d:%s,", + len(fso.alt_fsname), + fso.alt_fsname)) + else: + dir_dgst.update(util.interpolate_bytes( + b"1:f,%d:%s,", len(fso.fsname), fso.fsname)) + dir_size += fso.stat.st_size + if self._with_metadata_mtime: + mtime = datetime.datetime.utcfromtimestamp( + int(fso.stat.st_mtime)) + mtime = util.b(mtime.isoformat("T") + "Z") + dir_dgst.update(util.interpolate_bytes( + b"5:mtime,%d:%s,", len(mtime), mtime)) + if self._with_metadata_full_mode: + modestr = util.b(normalized_mode_str(fso.stat.st_mode)) + dir_dgst.update(util.interpolate_bytes( + b"8:fullmode,%d:%s,", len(modestr), modestr)) + elif self._with_metadata_mode: + modestr = util.b(normalized_compatible_mode_str( + fso.stat.st_mode)) + dir_dgst.update(util.interpolate_bytes( + b"4:mode,%d:%s,", len(modestr), modestr)) + if not self._size_only: + dgst = digest.compute_digest_file( + self._algorithm[0], + fso.path, + use_mmap=self._use_mmap) + dir_dgst.update(util.interpolate_bytes( + b"%d:%s,", len(dgst), dgst)) + opath = join_output_path(top, fso.name) + if self._utf8_mode: + opath = walk.WalkDirEntry.alt_u8(opath) + else: + opath = walk.WalkDirEntry.alt_fs(opath) + if self._size_only: + self._outfp.write(format_bsd_line( + "SIZE", None, opath, False, fso.stat.st_size)) + else: + if self._print_size: + self._outfp.write(format_bsd_line( + self._algorithm[1], + dgst, opath, + self._use_base64, + fso.stat.st_size)) + else: + self._outfp.write(format_bsd_line( + self._algorithm[1], dgst, opath, + self._use_base64)) self._outfp.flush() opath = join_output_path(top, None) if opath:
