view cutils/treesum.py @ 156:481cc9b26861

Calculate "stat()" for directories also in a WalkDirEntry
author Franz Glasner <fzglas.hg@dom66.de>
date Mon, 06 Jan 2025 14:38:07 +0100
parents bf74ce3c968d
children 27d1aaf5fe39
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
r"""Generate and verify checksums for directory trees.

"""

from __future__ import print_function, absolute_import


__all__ = []


import argparse
import base64
import binascii
import datetime
import os
import sys
import time

from . import (__version__, __revision__)
from . import util
from .util import cm
from .util import digest
from .util import walk


def main(argv=None):
    """Command line entry point.

    Builds the argument parser with its sub-commands ("generate" and its
    alias "gen", "help" and "version"), handles "help" and "version"
    directly and passes everything else on to :func:`treesum`.

    :param argv: the argument list to parse; if `None` then
                 :mod:`argparse` uses the process arguments
    :return: the result of :func:`treesum`

    The "help" and "version" sub-commands and a missing sub-command
    terminate via :func:`sys.exit` and therefore do not return.

    """

    def _populate_generate_arguments(gp):
        """Use to populate command aliases.

        This is because :class:`argparse.ArgumentParser` does not
        support them for all supported Python versions.

        """
        gp.add_argument(
            "--algorithm", "-a", action="store", type=util.argv2algo,
            help="1 (aka sha1), 224, 256, 384, 512, "
                 "3 (alias for sha3-512), 3-224, 3-256, 3-384, 3-512, "
                 "blake2b, blake2b-256 (default), blake2s, "
                 "blake2 (alias for blake2b), "
                 "blake2-256 (alias for blake2b-256), "
                 "md5")
        gp.add_argument(
            "--append-output", action="store_true", dest="append_output",
            help="Append to the output file instead of overwriting it.")
        gp.add_argument(
            "--base64", action="store_true",
            help="Output checksums in base64 notation, not hexadecimal "
                 "(OpenBSD).")
        gp.add_argument(
            "--comment", action="append", default=[],
            help="Put given comment COMMENT into the output as \"COMMENT\". "
                 "Can be given more than once.")
        gp.add_argument(
            "--follow-directory-symlinks", "-l", action="store_true",
            dest="follow_directory_symlinks",
            help="Follow symbolic links to directories when walking a "
                 "directory tree. Note that this is different from using "
                 "\"--logical\" or \"--physical\" for arguments given "
                 "directly on the command line")
        gp.add_argument(
            "--logical", "-L", dest="logical", action="store_true",
            default=None,
            help="Follow symbolic links given on command line arguments."
                 " Note that this is a different setting as to follow symbolic"
                 " links to directories when traversing a directory tree.")
        gp.add_argument(
            "--minimal", nargs="?", const="", default=None, metavar="TAG",
            help="Produce minimal output only. If a TAG is given and not "
                 "empty use it as the leading \"ROOT (<TAG>)\" output.")
        gp.add_argument(
            "--mmap", action="store_true", dest="mmap", default=None,
            help="Use mmap if available. Default is to determine "
                 "automatically from the filesize.")
        gp.add_argument(
            "--mtime", action="store_true", dest="metadata_mtime",
            help="Consider the mtime of files (non-directories) when "
                 "generating digests for directories. Digests for files are "
                 "not affected.")
        gp.add_argument(
            "--no-mmap", action="store_false", dest="mmap", default=None,
            help="Don't use mmap.")
        gp.add_argument(
            "--output", "-o", action="store", metavar="OUTPUT",
            help="Put the checksum into given file. "
                 "If not given or if it is given as `-' then stdout is used.")
        gp.add_argument(
            "--physical", "-P", dest="logical", action="store_false",
            default=None,
            help="Do not follow symbolic links given on command line "
                 "arguments. This is the default.")
        gp.add_argument(
            "directories", nargs="*", metavar="DIRECTORY")

    parser = argparse.ArgumentParser(
        description="Generate and verify checksums for directory trees.",
        fromfile_prefix_chars='@',
        add_help=False)

    #
    # Global options for all sub-commands.
    # In a group because this allows a customized title.
    #
    gparser = parser.add_argument_group(title="Global Options")
    gparser.add_argument(
        "-v", "--version", action="version",
        version="%s (rv:%s)" % (__version__, __revision__),
        help="Show program's version number and exit")
    gparser.add_argument(
        "-h", "--help", action="help",
        help="Show this help message and exit")

    #
    # Subcommands
    #
    subparsers = parser.add_subparsers(
        dest="subcommand",
        title="Commands",
        description="This tool uses subcommands. "
                    "To see detailed help for a specific subcommand use "
                    "the -h/--help option after the subcommand name. "
                    "A list of valid commands and their short descriptions "
                    "is listed below:",
        metavar="COMMAND")

    genparser = subparsers.add_parser(
        "generate",
        help="Generate checksums for directory trees.",
        description="Generate checksums for directory trees")
    _populate_generate_arguments(genparser)
    # And an alias for "generate"
    genparser2 = subparsers.add_parser(
        "gen",
        help="Alias for \"generate\"",
        description="Generate checksums for directory trees. "
                    "This is an alias to \"generate\".")
    _populate_generate_arguments(genparser2)

    hparser = subparsers.add_parser(
        "help",
        help="Show this help message or a subcommand's help and exit",
        description="Show this help message or a subcommand's help and exit.")
    hparser.add_argument("help_command", nargs='?', metavar="COMMAND")

    vparser = subparsers.add_parser(
        "version",
        help="Show the program's version number and exit",
        description="Show the program's version number and exit.")

    # Parse leniently to just check for "version" and/or help
    opts, _dummy = parser.parse_known_args(args=argv)

    if opts.subcommand == "version":
        print("%s (rv:%s)" % (__version__, __revision__),
              file=sys.stdout)
        sys.exit(0)
    if opts.subcommand == "help":
        if not opts.help_command:
            parser.print_help()
        else:
            if opts.help_command == "generate":
                genparser.print_help()
            elif opts.help_command == "gen":
                genparser2.print_help()
            elif opts.help_command == "version":
                vparser.print_help()
            elif opts.help_command == "help":
                hparser.print_help()
            else:
                parser.print_help()
        sys.exit(0)

    # Reparse strictly
    opts = parser.parse_args(args=argv)

    # On Python 3 sub-commands are optional by default, so a missing
    # command yields `None` here. Report it as a usage error instead of
    # letting it surface as a RuntimeError traceback further down.
    if not opts.subcommand:
        parser.error("no command given")

    return treesum(opts)


def gen_generate_opts(directories=None,
                      algorithm="BLAKE2b-256",
                      append_output=False,
                      base64=False,
                      comment=None,
                      follow_directory_symlinks=False,
                      logical=None,
                      minimal=None,
                      mmap=None,
                      mtime=False,
                      output=None):
    """Build an options namespace for generating checksums without
    going through the command line parser.

    The keyword arguments mirror the options of the "generate"
    sub-command.

    :param directories: the directories to handle; `None` (the default)
                        yields an empty list
    :param algorithm: an algorithm tag; it is resolved with
                      :func:`util.algotag2algotype` and stored together
                      with the tag as a pair in `opts.algorithm`
    :param comment: comment lines; `None` (the default) yields an empty
                    list
    :param mtime: stored in the namespace as `metadata_mtime`
    :return: an :class:`argparse.Namespace` with the given settings

    All other parameters are stored unchanged under their own name.

    """
    # Always hand out fresh lists: the namespace may be modified later
    # (e.g. a default directory gets added) and this must neither leak
    # into following calls via a shared default nor into the caller's
    # own sequences.
    directories = list(directories) if directories is not None else []
    comment = list(comment) if comment is not None else []
    opts = argparse.Namespace(
        directories=directories,
        algorithm=(util.algotag2algotype(algorithm),
                   algorithm),
        append_output=append_output,
        base64=base64,
        comment=comment,
        follow_directory_symlinks=follow_directory_symlinks,
        logical=logical,
        minimal=minimal,
        mmap=mmap,
        metadata_mtime=mtime,
        output=output)
    return opts


def treesum(opts):
    """Run the sub-command that is selected in `opts.subcommand`.

    :param opts: the parsed options; must have a `subcommand` attribute
    :return: the result of the sub-command's implementation
    :raises RuntimeError: if the sub-command is not handled

    """
    # XXX TBD: opts.check and opts.checklist (as in shasum.py)
    command = opts.subcommand
    if command not in ("generate", "gen"):
        raise RuntimeError(
            "command `{}' not yet handled".format(command))
    return generate_treesum(opts)


def generate_treesum(opts):
    """Generate the checksums for all directories given in `opts`.

    Missing settings are filled in on `opts`: the algorithm defaults to
    "blake2b-256" and the list of directories to the current directory.

    :param opts: the options namespace with the attributes `algorithm`,
                 `directories`, `output`, `append_output`, `mmap`,
                 `base64`, `logical`, `follow_directory_symlinks`,
                 `metadata_mtime`, `minimal` and `comment`

    The output goes to the file named by `opts.output` or -- if this is
    `None` or "-" -- to stdout. It is always written as bytes.

    """
    # Provide defaults
    if not opts.algorithm:
        opts.algorithm = util.argv2algo("blake2b-256")
    if not opts.directories:
        # Rebind to a fresh list instead of appending in place: the given
        # list may be shared with the caller (or be `None`) and must not
        # be changed behind its back.
        opts.directories = ["."]

    if opts.output is None or opts.output == "-":
        # Prefer the underlying binary stream of stdout if there is one
        if hasattr(sys.stdout, "buffer"):
            out_cm = cm.nullcontext(sys.stdout.buffer)
        else:
            out_cm = cm.nullcontext(sys.stdout)
    else:
        if opts.append_output:
            out_cm = open(opts.output, "ab")
        else:
            out_cm = open(opts.output, "wb")

    with out_cm as outfp:
        for d in opts.directories:
            generate_treesum_for_directory(
                outfp, d, opts.algorithm, opts.mmap, opts.base64, opts.logical,
                opts.follow_directory_symlinks,
                opts.metadata_mtime,
                minimal=opts.minimal,
                comment=opts.comment)


def generate_treesum_for_directory(
        outfp, root, algorithm, use_mmap, use_base64, handle_root_logical,
        follow_directory_symlinks, with_metadata_mtime,
        minimal=None, comment=None):
    """Write the checksum lines for the directory tree below `root`.

    The output starts with a "VERSION" line, an optional "FLAGS" line and
    -- unless `minimal` is given -- "TIMESTAMP", "ISOTIMESTAMP" and
    "COMMENT" lines. After the "ROOT" line one digest line is written for
    every file and every directory.

    :param outfp: a *binary* file with a "write()" and a "flush()" method
    :param root: the directory to generate the checksums for
    :param algorithm: a pair; item 0 is called without arguments to get a
                      new digest object (with "update()" and "digest()"),
                      item 1 is the algorithm name used in the output
    :param use_mmap: passed on to :func:`digest.compute_digest_file`
    :param use_base64: passed on to :func:`format_bsd_line` when writing
                       digest lines
    :param handle_root_logical: if false and `root` is a symbolic link
                                then only a digest of the link target is
                                written and the tree is not walked
    :param follow_directory_symlinks: passed on to :func:`walk.walk` as
                                      `follow_symlinks`; if false then
                                      symbolic links to directories are
                                      recorded with a digest of their
                                      link target
    :param with_metadata_mtime: if true the mtime of files is part of the
                                digest of their directory
    :param minimal: if not `None` then timestamps and comments are left
                    out and the value is written in the "ROOT" line
                    instead of `root`
    :param comment: comment lines to write; ignored if `minimal` is given

    """
    outfp.write(format_bsd_line("VERSION", "1", None, False))
    outfp.flush()

    # Note given non-default flags that are relevant for directory traversal
    flags = []
    if with_metadata_mtime:
        flags.append("with-metadata-mtime")
    if handle_root_logical:
        flags.append("logical")
    if follow_directory_symlinks:
        flags.append("follow-directory-symlinks")
    if flags:
        outfp.write(format_bsd_line("FLAGS", ",".join(flags), None, False))
        outfp.flush()

    if minimal is None:
        # Write execution timestamps in POSIX epoch and ISO format
        ts = int(time.time())
        outfp.write(format_bsd_line("TIMESTAMP", ts, None, False))
        ts = (datetime.datetime.utcfromtimestamp(ts)).isoformat("T")
        outfp.write(format_bsd_line("ISOTIMESTAMP", ts, None, False))
        outfp.flush()

        if comment:
            for line in comment:
                outfp.write(format_bsd_line("COMMENT", None, line, False))

    if minimal is not None:
        # The tag replaces the real root path; an empty tag stays empty
        outfp.write(
            format_bsd_line(
                "ROOT", None, minimal if minimal else "", False))
    else:
        outfp.write(format_bsd_line("ROOT", None, root, False))
    outfp.flush()

    # Maps the path of an already handled directory (a tuple of names
    # relative to `root`) to the digest of this directory
    dir_digests = {}

    if not handle_root_logical and os.path.islink(root):
        # The root itself is a symlink that is not to be followed: the
        # only digest line is computed from the link target.
        linktgt = util.fsencode(os.readlink(root))
        linkdgst = algorithm[0]()
        # Length-prefixed encoding of the link target: "<len>:<data>,"
        linkdgst.update(b"%d:%s," % (len(linktgt), linktgt))
        dir_dgst = algorithm[0]()
        dir_dgst.update(b"1:L,")
        dir_dgst.update(linkdgst.digest())
        outfp.write(
            format_bsd_line(
                algorithm[1],
                dir_dgst.digest(),
                "./@",
                use_base64))
        outfp.flush()
        return

    # NOTE(review): the lookup in `dir_digests` below only works if
    # walk.walk() yields a directory after all of its subdirectories
    # (bottom-up) -- confirm against the implementation in walk.
    for top, fsobjects in walk.walk(
            root,
            follow_symlinks=follow_directory_symlinks):
        # One digest per directory; every entry of the directory is fed
        # into it in the order given by `fsobjects`.
        dir_dgst = algorithm[0]()
        for fso in fsobjects:
            # print("NNNNNNNN", fso.name, fso.stat,
            # "%o (%o)" % (fso.stat.st_mode, stat.S_IMODE(fso.stat.st_mode)))
            if fso.is_dir:
                if fso.is_symlink and not follow_directory_symlinks:
                    # A symlink to a directory that is not followed:
                    # tagged with "S" and represented by the digest of
                    # its link target
                    linktgt = util.fsencode(os.readlink(fso.path))
                    linkdgst = algorithm[0]()
                    linkdgst.update(b"%d:%s," % (len(linktgt), linktgt))
                    dir_dgst.update(b"1:S,%d:%s,"
                                    % (len(fso.fsname), fso.fsname))
                    dir_dgst.update(linkdgst.digest())
                    opath = "/".join(top) + "/" + fso.name if top else fso.name
                    outfp.write(
                        format_bsd_line(
                            algorithm[1],
                            linkdgst.digest(),
                            "%s/./@" % (opath,),
                            use_base64))
                    outfp.flush()
                    continue
                # fetch from dir_digests
                dgst = dir_digests[top + (fso.name,)]
                # A real subdirectory: tagged with "d" and represented by
                # its previously computed directory digest
                dir_dgst.update(b"1:d,%d:%s," % (len(fso.fsname), fso.fsname))
                dir_dgst.update(dgst)
            else:
                # Everything else is handled as a file: tagged with "f"
                dir_dgst.update(b"1:f,%d:%s," % (len(fso.fsname), fso.fsname))
                if with_metadata_mtime:
                    # The mtime goes into the directory digest only (as
                    # UTC in ISO format with whole seconds); the digest
                    # of the file itself is computed from the path alone
                    mtime = datetime.datetime.utcfromtimestamp(
                        int(fso.stat.st_mtime))
                    mtime = mtime.isoformat("T") + "Z"
                    if not isinstance(mtime, bytes):
                        mtime = mtime.encode("ascii")
                    dir_dgst.update(b"5:mtime,%d:%s," % (len(mtime), mtime))
                dgst = digest.compute_digest_file(
                    algorithm[0], fso.path, use_mmap=use_mmap)
                dir_dgst.update(dgst)
                opath = "/".join(top) + "/" + fso.name if top else fso.name
                outfp.write(
                    format_bsd_line(
                        algorithm[1], dgst, opath, use_base64))
                outfp.flush()
        # The line for the directory itself: its path ends with a "/" and
        # is empty for the root directory
        opath = "/".join(top) + "/" if top else ""
        outfp.write(format_bsd_line(
            algorithm[1], dir_dgst.digest(), opath, use_base64))
        outfp.flush()
        # Remember the digest for use by the parent directory
        dir_digests[top] = dir_dgst.digest()


def format_bsd_line(digestname, value, filename, use_base64):
    """Format a single output line in BSD style and return it as bytes.

    Depending on `digestname` the line has one of these forms, each
    followed by the platform's line separator:

    - ``TAG = VALUE`` for "TIMESTAMP" (`value` is an integer) and for
      "ISOTIMESTAMP", "FLAGS" and "VERSION"; `filename` must be `None`
    - ``COMMENT (TEXT)`` with `filename` as the comment text
    - ``TAG (FILENAME)`` if `value` is `None`
    - ``TAG (FILENAME) = DIGEST`` otherwise

    :param digestname: the leading tag or digest name (text or bytes)
    :param value: the value or the raw digest bytes or `None`
    :param filename: the filename or comment text (text or bytes) or
                     `None`
    :param use_base64: for digest lines: encode the digest as base64
                       instead of hexadecimal
    :return: the formatted line as bytes

    """
    eol = os.linesep
    if not isinstance(eol, bytes):
        eol = eol.encode("utf-8")
    tag = digestname if isinstance(digestname, bytes) \
        else digestname.encode("ascii")

    # Lines of the form "TAG = VALUE" that have no filename part
    if tag == b"TIMESTAMP":
        assert filename is None
        return b"TIMESTAMP = %d%s" % (value, eol)
    if tag in (b"ISOTIMESTAMP", b"FLAGS", b"VERSION"):
        assert filename is None
        text = value if isinstance(value, bytes) else value.encode("ascii")
        return b"%s = %s%s" % (tag, text, eol)

    # All other lines carry something within the parentheses
    assert filename is not None
    if tag == b"COMMENT":
        note = filename if isinstance(filename, bytes) \
            else filename.encode("utf-8")
        return b"COMMENT (%s)%s" % (note, eol)

    name = filename if isinstance(filename, bytes) \
        else util.fsencode(filename)
    if value is None:
        return b"%s (%s)%s" % (tag, name, eol)

    encode_digest = base64.b64encode if use_base64 else binascii.hexlify
    encoded = encode_digest(value)
    # The special name of a symlinked root is written verbatim
    if name != b"./@":
        name = util.normalize_filename(name, True)
    return b"%s (%s) = %s%s" % (tag, name, encoded, eol)


# Script entry point: run the command line interface and hand its return
# value to sys.exit().
if __name__ == "__main__":
    sys.exit(main())