Mercurial > hgrepos > Python > apps > py-cutils
view cutils/treesum.py @ 155:bf74ce3c968d
When computing digests use the order imposed by names alone.
No different loops for dirs and nondirs.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Mon, 06 Jan 2025 13:39:12 +0100 |
| parents | 3505406ef9f3 |
| children | 481cc9b26861 |
line wrap: on
line source
# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
#
r"""Generate and verify checksums for directory trees.

"""

from __future__ import print_function, absolute_import


__all__ = []


import argparse
import base64
import binascii
import datetime
import os
import sys
import time

from . import (__version__, __revision__)
from . import util
from .util import cm
from .util import digest
from .util import walk


def main(argv=None):
    """Command line entry point.

    Builds the argument parser with the sub-commands "generate" (and its
    alias "gen"), "help" and "version", handles "help" and "version"
    directly (both terminate via :func:`sys.exit`) and hands all other
    sub-commands to :func:`treesum`.

    :param argv: the argument list to parse; `None` lets
                 :mod:`argparse` use its default (``sys.argv[1:]``)
    :return: the return value of :func:`treesum`

    """

    def _populate_generate_arguments(gp):
        """Use to populate command aliases.

        This is because :class:`argparse.ArgumentParser` does not
        support them for all supported Python versions.

        """
        gp.add_argument(
            "--algorithm", "-a", action="store", type=util.argv2algo,
            help="1 (aka sha1), 224, 256, 384, 512, "
                 "3 (alias for sha3-512), 3-224, 3-256, 3-384, 3-512, "
                 "blake2b, blake2b-256 (default), blake2s, "
                 "blake2 (alias for blake2b), "
                 "blake2-256 (alias for blake2b-256), "
                 "md5")
        gp.add_argument(
            "--append-output", action="store_true", dest="append_output",
            help="Append to the output file instead of overwriting it.")
        gp.add_argument(
            "--base64", action="store_true",
            help="Output checksums in base64 notation, not hexadecimal "
                 "(OpenBSD).")
        gp.add_argument(
            "--comment", action="append", default=[],
            help="Put given comment COMMENT into the output as \"COMMENT\". "
                 "Can be given more than once.")
        gp.add_argument(
            "--follow-directory-symlinks", "-l", action="store_true",
            dest="follow_directory_symlinks",
            help="Follow symbolic links to directories when walking a "
                 "directory tree. Note that this is different from using "
                 "\"--logical\" or \"--physical\" for arguments given "
                 "directly on the command line")
        gp.add_argument(
            "--logical", "-L", dest="logical", action="store_true",
            default=None,
            help="Follow symbolic links given on command line arguments."
                 " Note that this is a different setting as to follow symbolic"
                 " links to directories when traversing a directory tree.")
        gp.add_argument(
            "--minimal", nargs="?", const="", default=None, metavar="TAG",
            help="Produce minimal output only. If a TAG is given and not "
                 "empty use it as the leading \"ROOT (<TAG>)\" output.")
        gp.add_argument(
            "--mmap", action="store_true", dest="mmap", default=None,
            help="Use mmap if available. Default is to determine "
                 "automatically from the filesize.")
        gp.add_argument(
            "--mtime", action="store_true", dest="metadata_mtime",
            help="Consider the mtime of files (non-directories) when "
                 "generating digests for directories. Digests for files are "
                 "not affected.")
        gp.add_argument(
            "--no-mmap", action="store_false", dest="mmap", default=None,
            help="Don't use mmap.")
        gp.add_argument(
            "--output", "-o", action="store", metavar="OUTPUT",
            help="Put the checksum into given file. "
                 "If not given or if it is given as `-' then stdout is used.")
        gp.add_argument(
            "--physical", "-P", dest="logical", action="store_false",
            default=None,
            help="Do not follow symbolic links given on command line "
                 "arguments. This is the default.")
        gp.add_argument(
            "directories", nargs="*", metavar="DIRECTORY")

    parser = argparse.ArgumentParser(
        description="Generate and verify checksums for directory trees.",
        fromfile_prefix_chars='@',
        add_help=False)

    #
    # Global options for all sub-commands.
    # In a group because this allows a customized title.
    #
    gparser = parser.add_argument_group(title="Global Options")
    gparser.add_argument(
        "-v", "--version", action="version",
        version="%s (rv:%s)" % (__version__, __revision__),
        help="Show program's version number and exit")
    gparser.add_argument(
        "-h", "--help", action="help",
        help="Show this help message and exit")

    #
    # Subcommands
    #
    subparsers = parser.add_subparsers(
        dest="subcommand",
        title="Commands",
        description="This tool uses subcommands. "
                    "To see detailed help for a specific subcommand use "
                    "the -h/--help option after the subcommand name. "
                    "A list of valid commands and their short descriptions "
                    "is listed below:",
        metavar="COMMAND")

    genparser = subparsers.add_parser(
        "generate",
        help="Generate checksums for directory trees.",
        description="Generate checksums for directory trees")
    _populate_generate_arguments(genparser)
    # And an alias for "generate"
    genparser2 = subparsers.add_parser(
        "gen",
        help="Alias for \"generate\"",
        description="Generate checksums for directory trees. "
                    "This is an alias to \"generate\".")
    _populate_generate_arguments(genparser2)

    hparser = subparsers.add_parser(
        "help",
        help="Show this help message or a subcommand's help and exit",
        description="Show this help message or a subcommand's help and exit.")
    hparser.add_argument("help_command", nargs='?', metavar="COMMAND")

    vparser = subparsers.add_parser(
        "version",
        help="Show the program's version number and exit",
        description="Show the program's version number and exit.")

    # Parse leniently to just check for "version" and/or help
    opts, _dummy = parser.parse_known_args(args=argv)

    if opts.subcommand == "version":
        print("%s (rv:%s)" % (__version__, __revision__),
              file=sys.stdout)
        sys.exit(0)
    if opts.subcommand == "help":
        # A missing or unknown COMMAND falls back to the global help
        help_parsers = {
            "generate": genparser,
            "gen": genparser2,
            "version": vparser,
            "help": hparser,
        }
        help_parsers.get(opts.help_command, parser).print_help()
        sys.exit(0)

    # Reparse strictly
    opts = parser.parse_args(args=argv)

    return treesum(opts)


def gen_generate_opts(directories=None,
                      algorithm="BLAKE2b-256",
                      append_output=False,
                      base64=False,
                      comment=None,
                      follow_directory_symlinks=False,
                      logical=None,
                      minimal=None,
                      mmap=None,
                      mtime=False,
                      output=None):
    """Build the options namespace for :func:`generate_treesum`
    programmatically, i.e. without parsing a command line.

    The keyword arguments mirror the command line options of the
    "generate" sub-command; `mtime` is stored as `metadata_mtime`.
    `algorithm` is an algorithm tag that is resolved with
    :func:`util.algotag2algotype`.

    `directories` and `comment` default to empty lists. Given sequences
    are copied so that neither a shared default nor the caller's list is
    modified later on.

    Note that the returned namespace has no `subcommand` attribute:
    pass it to :func:`generate_treesum`, not to :func:`treesum`.

    :return: the populated :class:`argparse.Namespace`

    """
    opts = argparse.Namespace(
        directories=[] if directories is None else list(directories),
        algorithm=(util.algotag2algotype(algorithm), algorithm),
        append_output=append_output,
        base64=base64,
        comment=[] if comment is None else list(comment),
        follow_directory_symlinks=follow_directory_symlinks,
        logical=logical,
        minimal=minimal,
        mmap=mmap,
        metadata_mtime=mtime,
        output=output)
    return opts


def treesum(opts):
    """Dispatch to the implementation of the selected sub-command.

    :param opts: parsed options with a `subcommand` attribute
    :raises RuntimeError: if `opts.subcommand` is neither "generate"
                          nor "gen"

    """
    # XXX TBD: opts.check and opts.checklist (as in shasum.py)
    if opts.subcommand in ("generate", "gen"):
        return generate_treesum(opts)
    else:
        raise RuntimeError(
            "command `{}' not yet handled".format(opts.subcommand))


def generate_treesum(opts):
    """Generate the checksums for all directories in `opts.directories`
    and write them to `opts.output`.

    Missing settings are filled in on `opts`: the algorithm defaults to
    "blake2b-256" and the directory list to the current directory.
    Output goes to stdout if `opts.output` is `None` or "-"; otherwise
    the given file is opened in binary mode and is appended to if
    `opts.append_output` is set or overwritten if not.

    """
    # Provide defaults
    if not opts.algorithm:
        opts.algorithm = util.argv2algo("blake2b-256")
    if not opts.directories:
        # Rebind instead of appending: never modify a list object that
        # may be shared with the caller
        opts.directories = ["."]

    if opts.output is None or opts.output == "-":
        # Lines are written as bytes: prefer the binary layer of stdout
        if hasattr(sys.stdout, "buffer"):
            out_cm = cm.nullcontext(sys.stdout.buffer)
        else:
            out_cm = cm.nullcontext(sys.stdout)
    else:
        out_cm = open(opts.output, "ab" if opts.append_output else "wb")

    with out_cm as outfp:
        for d in opts.directories:
            generate_treesum_for_directory(
                outfp, d, opts.algorithm, opts.mmap, opts.base64,
                opts.logical, opts.follow_directory_symlinks,
                opts.metadata_mtime,
                minimal=opts.minimal,
                comment=opts.comment)


def _utc_isoformat(timestamp):
    """Return the naive UTC ISO representation (with separator "T") of
    the POSIX timestamp `timestamp` given in integral seconds.

    Uses epoch arithmetic instead of
    :meth:`datetime.datetime.utcfromtimestamp` because the latter is
    deprecated since Python 3.12.

    """
    moment = (datetime.datetime(1970, 1, 1)
              + datetime.timedelta(seconds=timestamp))
    return moment.isoformat("T")


def generate_treesum_for_directory(
        outfp, root, algorithm, use_mmap, use_base64,
        handle_root_logical, follow_directory_symlinks, with_metadata_mtime,
        minimal=None, comment=None):
    """Write the digest lines of the directory tree `root` into `outfp`.

    A block of header lines (VERSION, optional FLAGS, TIMESTAMP and
    ISOTIMESTAMP unless `minimal` is given, optional COMMENT lines and
    ROOT) is followed by one line per file and one line per directory.

    :param outfp: a *binary* file with a "write()" and a "flush()" method
    :param root: the directory to generate the digests for
    :param algorithm: a pair of the digest factory and the algorithm
                      name to be used in the output lines
    :param use_mmap: handed to :func:`digest.compute_digest_file`
    :param use_base64: encode digests in base64 instead of hexadecimal
    :param handle_root_logical: if false and `root` is a symbolic link
                                then only a digest of the link target
                                is written
    :param follow_directory_symlinks: handed as `follow_symlinks` to
                                      :func:`walk.walk`; if false
                                      symbolic links to directories
                                      are digested by their target path
    :param with_metadata_mtime: include the mtime of files in the
                                digests of directories
    :param minimal: if not `None` skip the timestamp lines and write
                    its value (possibly empty) instead of `root` into
                    the ROOT line
    :param comment: an iterable of comments to write as COMMENT lines

    """
    new_digest = algorithm[0]
    algoname = algorithm[1]

    outfp.write(format_bsd_line("VERSION", "1", None, False))
    outfp.flush()

    # Note given non-default flags that are relevant for directory traversal
    flags = []
    if with_metadata_mtime:
        flags.append("with-metadata-mtime")
    if handle_root_logical:
        flags.append("logical")
    if follow_directory_symlinks:
        flags.append("follow-directory-symlinks")
    if flags:
        outfp.write(format_bsd_line("FLAGS", ",".join(flags), None, False))
        outfp.flush()

    if minimal is None:
        # Write execution timestamps in POSIX epoch and ISO format
        ts = int(time.time())
        outfp.write(format_bsd_line("TIMESTAMP", ts, None, False))
        outfp.write(
            format_bsd_line("ISOTIMESTAMP", _utc_isoformat(ts), None, False))
        outfp.flush()

    if comment:
        for line in comment:
            outfp.write(format_bsd_line("COMMENT", None, line, False))

    if minimal is not None:
        outfp.write(
            format_bsd_line(
                "ROOT", None, minimal if minimal else "", False))
    else:
        outfp.write(format_bsd_line("ROOT", None, root, False))
    outfp.flush()

    # Maps the path of a directory (tuple of names relative to root) to
    # the digest of that directory
    dir_digests = {}

    if not handle_root_logical and os.path.islink(root):
        # Do not walk: digest the link target of the root only
        linktgt = util.fsencode(os.readlink(root))
        linkdgst = new_digest()
        linkdgst.update(b"%d:%s," % (len(linktgt), linktgt))
        dir_dgst = new_digest()
        dir_dgst.update(b"1:L,")
        dir_dgst.update(linkdgst.digest())
        outfp.write(
            format_bsd_line(
                algoname, dir_dgst.digest(), "./@", use_base64))
        outfp.flush()
        return

    for top, fsobjects in walk.walk(
            root, follow_symlinks=follow_directory_symlinks):
        # Output prefix for everything within the current directory
        prefix = "/".join(top) + "/" if top else ""
        dir_dgst = new_digest()
        for fso in fsobjects:
            if fso.is_dir:
                if fso.is_symlink and not follow_directory_symlinks:
                    # A symlink that is not followed: digest its target
                    linktgt = util.fsencode(os.readlink(fso.path))
                    linkdgst = new_digest()
                    linkdgst.update(b"%d:%s," % (len(linktgt), linktgt))
                    dir_dgst.update(
                        b"1:S,%d:%s," % (len(fso.fsname), fso.fsname))
                    dir_dgst.update(linkdgst.digest())
                    outfp.write(
                        format_bsd_line(
                            algoname,
                            linkdgst.digest(),
                            "%s/./@" % (prefix + fso.name,),
                            use_base64))
                    outfp.flush()
                    continue
                # fetch from dir_digests
                # NOTE(review): relies on walk.walk() yielding a
                # directory before its parent -- confirm in walk
                dgst = dir_digests[top + (fso.name,)]
                dir_dgst.update(
                    b"1:d,%d:%s," % (len(fso.fsname), fso.fsname))
                dir_dgst.update(dgst)
            else:
                dir_dgst.update(
                    b"1:f,%d:%s," % (len(fso.fsname), fso.fsname))
                if with_metadata_mtime:
                    mtime = _utc_isoformat(int(fso.stat.st_mtime)) + "Z"
                    # Already bytes where `str` is the bytes type
                    if not isinstance(mtime, bytes):
                        mtime = mtime.encode("ascii")
                    dir_dgst.update(
                        b"5:mtime,%d:%s," % (len(mtime), mtime))
                dgst = digest.compute_digest_file(
                    algorithm[0], fso.path, use_mmap=use_mmap)
                dir_dgst.update(dgst)
                outfp.write(
                    format_bsd_line(
                        algoname, dgst, prefix + fso.name, use_base64))
                outfp.flush()
        top_digest = dir_dgst.digest()
        outfp.write(format_bsd_line(algoname, top_digest, prefix, use_base64))
        outfp.flush()
        dir_digests[top] = top_digest


def format_bsd_line(digestname, value, filename, use_base64):
    """Format a single output line in BSD style and return it as bytes,
    terminated with :data:`os.linesep`.

    Depending on `digestname` the line has one of these layouts:

    - ``TIMESTAMP = <value>`` with an integer `value`
    - ``<digestname> = <value>`` for "ISOTIMESTAMP", "FLAGS" and
      "VERSION"
    - ``COMMENT (<filename>)`` with the comment text in `filename`
    - ``<digestname> (<filename>)`` if `value` is `None`
    - ``<digestname> (<filename>) = <value>`` in all other cases

    :param digestname: the leading tag or algorithm name (text or bytes)
    :param value: the value as described above; digests are given as
                  raw bytes and encoded here
    :param filename: must be `None` for the ``<tag> = <value>`` layouts
                     and not `None` for all others
    :param use_base64: encode a digest `value` in base64 instead of
                       hexadecimal

    """
    ls = os.linesep if isinstance(os.linesep, bytes) \
        else os.linesep.encode("utf-8")
    if not isinstance(digestname, bytes):
        digestname = digestname.encode("ascii")
    if digestname == b"TIMESTAMP":
        assert filename is None
        return b"TIMESTAMP = %d%s" % (value, ls)
    if digestname in (b"ISOTIMESTAMP", b"FLAGS", b"VERSION"):
        assert filename is None
        if not isinstance(value, bytes):
            value = value.encode("ascii")
        return b"%s = %s%s" % (digestname, value, ls)
    assert filename is not None
    if digestname == b"COMMENT":
        if not isinstance(filename, bytes):
            filename = filename.encode("utf-8")
        return b"COMMENT (%s)%s" % (filename, ls)
    if not isinstance(filename, bytes):
        filename = util.fsencode(filename)
    if value is None:
        return b"%s (%s)%s" % (digestname, filename, ls)
    if use_base64:
        value = base64.b64encode(value)
    else:
        value = binascii.hexlify(value)
    # The marker for a symlinked root is written verbatim
    if filename != b"./@":
        filename = util.normalize_filename(filename, True)
    return b"%s (%s) = %s%s" % (digestname, filename, value, ls)


if __name__ == "__main__":
    sys.exit(main())
