view cutils/treesum.py @ 176:7f5d05a625fd

Implement preconditions for some debug logging
author Franz Glasner <fzglas.hg@dom66.de>
date Sat, 11 Jan 2025 13:20:14 +0100
parents fc1055878775
children 089c40240061
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
r"""Generate and verify checksums for directory trees.

"""

from __future__ import print_function, absolute_import


__all__ = []


import argparse
import base64
import binascii
import datetime
import logging
import os
import stat
import sys
import time

from . import (__version__, __revision__)
from . import util
from .util import cm
from .util import digest
from .util import walk


def main(argv=None):
    """Command line entry point: parse the arguments and run the command.

    Builds the argument parser with the sub-commands `generate` (and its
    alias `gen`), `help` and `version`. The arguments are parsed leniently
    first, so that `help` and `version` work regardless of other
    arguments, and strictly afterwards.

    :param argv: the command line arguments without the program name;
                 handed to :mod:`argparse` as is (which uses
                 ``sys.argv[1:]`` if it is `None`)
    :return: `0` after printing a help or version message, otherwise
             the return value of :func:`treesum`
    :raises SystemExit: raised by :mod:`argparse` on usage errors
                        (including a missing sub-command) and for the
                        `-h`/`-v` options

    """

    def _populate_generate_arguments(gp):
        """Use to populate command aliases.

        This is because :class:`argparse.ArgumentParser` does not
        support them for all supported Python versions.

        """
        gp.add_argument(
            "--algorithm", "-a", action="store", type=util.argv2algo,
            help="1 (aka sha1), 224, 256 (aka sha256), 384, 512 (aka sha512), "
                 "3 (alias for sha3-512), 3-224, 3-256, 3-384, 3-512, "
                 "blake2b, blake2b-256, blake2s, "
                 "blake2 (alias for blake2b), "
                 "blake2-256 (alias for blake2b-256), "
                 "md5. "
                 "The default depends on availability in hashlib: "
                 "blake2b-256, sha256 or sha1.")
        gp.add_argument(
            "--append-output", action="store_true", dest="append_output",
            help="Append to the output file instead of overwriting it.")
        gp.add_argument(
            "--base64", action="store_true",
            help="Output checksums in base64 notation, not hexadecimal "
                 "(OpenBSD).")
        gp.add_argument(
            "--comment", action="append", default=[],
            help="Put given comment COMMENT into the output as \"COMMENT\". "
                 "Can be given more than once.")
        gp.add_argument(
            "--debug", action="store_true",
            help="Activate debug logging to stderr")
        gp.add_argument(
            "--follow-directory-symlinks", "-l", action="store_true",
            dest="follow_directory_symlinks",
            help="Follow symbolic links to directories when walking a "
                 "directory tree. Note that this is different from using "
                 "\"--logical\" or \"--physical\" for arguments given "
                 "directly on the command line")
        gp.add_argument(
            "--full-mode", action="store_true", dest="metadata_full_mode",
            help="Consider all mode bits as returned from stat(2) when "
                 "computing directory digests. "
                 "Note that mode bits on symbolic links itself are not "
                 "considered.")
        gp.add_argument(
            "--logical", "-L", dest="logical", action="store_true",
            default=None,
            help="Follow symbolic links given on command line arguments."
                 " Note that this is a different setting as to follow symbolic"
                 " links to directories when traversing a directory tree.")
        gp.add_argument(
            "--minimal", nargs="?", const="", default=None, metavar="TAG",
            help="Produce minimal output only. If a TAG is given and not "
                 "empty use it as the leading \"ROOT (<TAG>)\" output.")
        gp.add_argument(
            "--mmap", action="store_true", dest="mmap", default=None,
            help="Use mmap if available. Default is to determine "
                 "automatically from the filesize.")
        gp.add_argument(
            "--mode", action="store_true", dest="metadata_mode",
            help="Consider the permission bits of stat(2) using S_IMODE (i.e. "
                 "all bits without the filetype bits) when "
                 "computing directory digests. Note that mode bits on "
                 "symbolic links itself are not considered.")
        gp.add_argument(
            "--mtime", action="store_true", dest="metadata_mtime",
            help="Consider the mtime of files (non-directories) when "
                 "generating digests for directories. Digests for files are "
                 "not affected.")
        gp.add_argument(
            "--no-mmap", action="store_false", dest="mmap", default=None,
            help="Dont use mmap.")
        gp.add_argument(
            "--output", "-o", action="store", metavar="OUTPUT",
            help="Put the checksum into given file. "
                 "If not given or if it is given as `-' then stdout is used.")
        # Shares its destination with "--logical": the last one given wins
        gp.add_argument(
            "--physical", "-P", dest="logical", action="store_false",
            default=None,
            help="Do not follow symbolic links given on command line "
                 "arguments. This is the default.")
        gp.add_argument(
            "--print-size", action="store_true",
            help="""Print the size of a file or the accumulated sizes of
directory content into the output also.
The size is not considered when computing digests. For symbolic links
the size is not printed also.""")
        gp.add_argument(
            "--size-only", action="store_true",
            help="""Print only the size of files and for each directory its
accumulated directory size. Digests are not computed.""")
        gp.add_argument(
            "directories", nargs="*", metavar="DIRECTORY")

    parser = argparse.ArgumentParser(
        description="Generate and verify checksums for directory trees.",
        fromfile_prefix_chars='@',
        add_help=False)

    #
    # Global options for all sub-commands.
    # In a group because this allows a customized title.
    #
    gparser = parser.add_argument_group(title="Global Options")
    gparser.add_argument(
        "-v", "--version", action="version",
        version="%s (rv:%s)" % (__version__, __revision__),
        help="Show program's version number and exit")
    gparser.add_argument(
        "-h", "--help", action="help",
        help="Show this help message and exit")

    #
    # Subcommands
    #
    subparsers = parser.add_subparsers(
        dest="subcommand",
        title="Commands",
        description="This tool uses subcommands. "
                    "To see detailed help for a specific subcommand use "
                    "the -h/--help option after the subcommand name. "
                    "A list of valid commands and their short descriptions "
                    "is listed below:",
        metavar="COMMAND")

    genparser = subparsers.add_parser(
        "generate",
        help="Generate checksums for directory trees.",
        description="Generate checksums for directory trees")
    _populate_generate_arguments(genparser)
    # And an alias for "generate"
    genparser2 = subparsers.add_parser(
        "gen",
        help="Alias for \"generate\"",
        description="Generate checksums for directory trees. "
                    "This is an alias to \"generate\".")
    _populate_generate_arguments(genparser2)

    hparser = subparsers.add_parser(
        "help",
        help="Show this help message or a subcommand's help and exit",
        description="Show this help message or a subcommand's help and exit.")
    hparser.add_argument("help_command", nargs='?', metavar="COMMAND")

    vparser = subparsers.add_parser(
        "version",
        help="Show the program's version number and exit",
        description="Show the program's version number and exit.")

    # Parse leniently to just check for "version" and/or help
    opts, _dummy = parser.parse_known_args(args=argv)

    if opts.subcommand == "version":
        print("%s (rv:%s)" % (__version__, __revision__),
              file=sys.stdout)
        return 0
    if opts.subcommand == "help":
        if not opts.help_command:
            parser.print_help()
        else:
            if opts.help_command == "generate":
                genparser.print_help()
            elif opts.help_command == "gen":
                genparser2.print_help()
            elif opts.help_command == "version":
                vparser.print_help()
            elif opts.help_command == "help":
                hparser.print_help()
            else:
                # Unknown command name: fall back to the general help
                parser.print_help()
        return 0

    # Reparse strictly
    opts = parser.parse_args(args=argv)

    #
    # On Python 3 sub-commands are optional by default. Without a command
    # the namespace carries none of the sub-parser's options (e.g. there
    # is no "debug" attribute). Report a proper usage error instead of
    # failing with an AttributeError below.
    #
    if opts.subcommand is None:
        parser.error("a COMMAND is required")

    # Minimal logging -- just for debugging - not for more "normal" use
    logging.basicConfig(
        level=logging.DEBUG if opts.debug else logging.WARNING,
        stream=sys.stderr,
        format="[%(asctime)s][%(levelname)s][%(process)d:%(name)s] %(message)s"
    )
    logging.captureWarnings(True)

    return treesum(opts)


def gen_generate_opts(directories=None,
                      algorithm=util.default_algotag(),
                      append_output=False,
                      base64=False,
                      comment=None,
                      follow_directory_symlinks=False,
                      full_mode=False,
                      logical=None,
                      minimal=None,
                      mode=False,
                      mmap=None,
                      mtime=False,
                      output=None,
                      print_size=False,
                      size_only=False):
    """Build an options namespace for :func:`generate_treesum`
    programmatically.

    The keyword arguments correspond to the command line options of the
    `generate` sub-command; `full_mode`, `mode` and `mtime` are stored as
    `metadata_full_mode`, `metadata_mode` and `metadata_mtime`.

    :param directories: the directories to handle; `None` or an empty
                        sequence yield an empty list
    :param algorithm: an algorithm tag; converted with
                      :func:`util.argv2algo`
    :param comment: the comments to emit; `None` or an empty sequence
                    yield an empty list
    :return: an :class:`argparse.Namespace` with the given settings

    `directories` and `comment` are always stored as new lists:
    :func:`generate_treesum` appends to `opts.directories` in place, so
    neither a shared default value nor the caller's list must leak into
    the namespace.

    """
    opts = argparse.Namespace(
        directories=list(directories) if directories is not None else [],
        algorithm=util.argv2algo(algorithm),
        append_output=append_output,
        base64=base64,
        comment=list(comment) if comment is not None else [],
        follow_directory_symlinks=follow_directory_symlinks,
        logical=logical,
        minimal=minimal,
        mmap=mmap,
        metadata_full_mode=full_mode,
        metadata_mode=mode,
        metadata_mtime=mtime,
        output=output,
        print_size=print_size,
        size_only=size_only)
    return opts


def treesum(opts):
    """Dispatch to the implementation of the selected sub-command.

    :param opts: the parsed options; the attribute `subcommand` selects
                 what to do
    :return: the result of :func:`generate_treesum` for the sub-commands
             `generate` and `gen`
    :raises RuntimeError: for every other sub-command

    """
    # XXX TBD: opts.check and opts.checklist (as in shasum.py)
    command = opts.subcommand
    if command not in ("generate", "gen"):
        raise RuntimeError(
            "command `{}' not yet handled".format(command))
    return generate_treesum(opts)


def generate_treesum(opts):
    """Write the tree checksums for all directories given in `opts`.

    Missing settings are filled in on `opts` itself: the algorithm
    defaults to :func:`util.default_algotag` and the list of directories
    to the current directory (``.``).

    The report goes to the file `opts.output` (opened in binary mode,
    appending if `opts.append_output` is set) or -- if no output is given
    or if it is ``-`` -- to stdout.

    :param opts: the options namespace of the `generate` sub-command

    """
    # Fill in defaults for settings that have not been given
    if not opts.algorithm:
        opts.algorithm = util.argv2algo(util.default_algotag())
    if not opts.directories:
        opts.directories.append(".")

    if opts.output is not None and opts.output != "-":
        out_cm = open(opts.output, "ab" if opts.append_output else "wb")
    else:
        # Prefer the binary stream below stdout if there is one
        stdout_stream = getattr(sys.stdout, "buffer", sys.stdout)
        # NOTE(review): presumably like contextlib.nullcontext, i.e.
        # stdout is not closed when the "with" block ends -- confirm
        out_cm = cm.nullcontext(stdout_stream)

    with out_cm as outfp:
        for directory in opts.directories:
            generate_treesum_for_directory(
                outfp,
                directory,
                opts.algorithm,
                opts.mmap,
                opts.base64,
                opts.logical,
                opts.follow_directory_symlinks,
                opts.metadata_mode,
                opts.metadata_full_mode,
                opts.metadata_mtime,
                opts.size_only,
                opts.print_size,
                minimal=opts.minimal,
                comment=opts.comment)


def generate_treesum_for_directory(
        outfp, root, algorithm, use_mmap, use_base64, handle_root_logical,
        follow_directory_symlinks, with_metadata_mode, with_metadata_full_mode,
        with_metadata_mtime, size_only, print_size,
        minimal=None, comment=None):
    """Write the checksum report for the directory tree at `root`.

    The report starts with header lines (``VERSION``, ``FLAGS`` if any
    non-default flag is active and -- unless `minimal` is given --
    ``TIMESTAMP``, ``ISOTIMESTAMP`` and ``COMMENT`` lines) followed by a
    ``ROOT`` line. Then one line is written for every handled file and
    one for every handled directory. All lines are formatted by
    :func:`format_bsd_line` and `outfp` is flushed regularly.

    A directory digest is computed from length-prefixed records of the
    form ``<length>:<data>,`` that are fed into the hash in the order
    its entries are delivered by :func:`walk.walk`.

    :param outfp: a *binary* file with a "write()" and a "flush()" method
    :param root: the directory where to start
    :param algorithm: an indexable: item 0 is called without arguments to
                      get a new hash object (with "update()" and
                      "digest()"), item 1 is the tag that is used for the
                      digest lines
    :param use_mmap: handed to :func:`digest.compute_digest_file`
    :param use_base64: handed to :func:`format_bsd_line` for digest lines
    :param handle_root_logical: if false and `root` is a symbolic link
                                only the link target is digested and
                                `root` is not traversed
    :param follow_directory_symlinks: handed to :func:`walk.walk` as
                                      `follow_symlinks`; if false the
                                      target name of a symbolic link to a
                                      directory is digested instead of
                                      the directory's content
    :param with_metadata_mode: put the mode bits as computed by
                               :func:`normalized_compatible_mode_str`
                               into directory digests; has no effect if
                               `with_metadata_full_mode` is given
    :param with_metadata_full_mode: put the mode as computed by
                                    :func:`normalized_mode_str` into
                                    directory digests
    :param with_metadata_mtime: put the mtime of non-directories into
                                directory digests
    :param size_only: write ``SIZE`` lines only; the contents of files
                      are not digested
    :param print_size: append the size to digest lines; has no effect
                       if `size_only` is given
    :param minimal: if not `None` no timestamps and comments are written
                    and its value (instead of `root`) is printed in the
                    ``ROOT`` line
    :param comment: an iterable with comments; each item yields a
                    ``COMMENT`` line (only if `minimal` is `None`)

    """
    outfp.write(format_bsd_line("VERSION", "1", None, False))
    outfp.flush()

    # Note given non-default flags that are relevant for directory traversal
    flags = []
    if with_metadata_full_mode:
        flags.append("with-metadata-fullmode")
    elif with_metadata_mode:
        flags.append("with-metadata-mode")
    if with_metadata_mtime:
        flags.append("with-metadata-mtime")
    if handle_root_logical:
        flags.append("logical")
    if follow_directory_symlinks:
        flags.append("follow-directory-symlinks")
    if size_only:
        flags.append("size-only")
    else:
        if print_size:
            flags.append("print-size")
    if flags:
        # Sorted: the FLAGS line does not depend on the order of the
        # checks above
        flags.sort()
        outfp.write(format_bsd_line("FLAGS", ",".join(flags), None, False))
        outfp.flush()

    if minimal is None:
        # Write execution timestamps in POSIX epoch and ISO format
        ts = int(time.time())
        outfp.write(format_bsd_line("TIMESTAMP", ts, None, False))
        ts = (datetime.datetime.utcfromtimestamp(ts)).isoformat("T")
        outfp.write(format_bsd_line("ISOTIMESTAMP", ts, None, False))
        outfp.flush()

        if comment:
            for line in comment:
                outfp.write(format_bsd_line("COMMENT", None, line, False))

    # The ROOT line: in minimal mode the (possibly empty) tag replaces
    # the real name of the root directory
    if minimal is not None:
        outfp.write(
            format_bsd_line(
                "ROOT", None, minimal if minimal else "", False))
    else:
        outfp.write(format_bsd_line("ROOT", None, root, False))
    outfp.flush()

    # Maps the path of an already handled directory (a tuple with its
    # path components relative to `root`) to a tuple
    # `(digest, accumulated_size)`
    dir_digests = {}

    #
    # Special case: the root itself is a symbolic link that is not to be
    # followed. Only a single "./@" line is written: the digest over the
    # type marker "L" and the digest of the link target.
    #
    if not handle_root_logical and os.path.islink(root):
        linktgt = util.fsencode(os.readlink(root))
        linkdgst = algorithm[0]()
        linkdgst.update(
            util.interpolate_bytes(b"%d:%s,", len(linktgt), linktgt))
        dir_dgst = algorithm[0]()
        dir_dgst.update(b"1:L,")
        dir_dgst.update(
            util.interpolate_bytes(
                b"%d:%s,", len(linkdgst.digest()), linkdgst.digest()))
        if size_only:
            outfp.write(
                format_bsd_line(
                    "SIZE",
                    None,
                    "./@",
                    False,
                    0))
        else:
            outfp.write(
                format_bsd_line(
                    algorithm[1],
                    dir_dgst.digest(),
                    "./@",
                    use_base64))
        outfp.flush()
        return

    # `top` is a tuple with the path components of the current directory
    # relative to `root`; `fsobjects` are the entries in this directory
    for top, fsobjects in walk.walk(
            root,
            follow_symlinks=follow_directory_symlinks):
        dir_dgst = algorithm[0]()
        dir_size = 0

        for fso in fsobjects:
            if fso.is_dir:
                if fso.is_symlink and not follow_directory_symlinks:
                    #
                    # A symbolic link to a directory that is not followed:
                    # the type marker "S", its name and the digest of the
                    # link target go into the directory digest. It does
                    # not count for the directory size.
                    #
                    linktgt = util.fsencode(os.readlink(fso.path))
                    linkdgst = algorithm[0]()
                    linkdgst.update(
                        util.interpolate_bytes(
                            b"%d:%s,", len(linktgt), linktgt))
                    dir_dgst.update(util.interpolate_bytes(
                        b"1:S,%d:%s,", len(fso.fsname), fso.fsname))
                    # no mtime and no mode for symlinks
                    dir_dgst.update(util.interpolate_bytes(
                        b"%d:%s,",
                        len(linkdgst.digest()), linkdgst.digest()))
                    opath = "/".join(top) + "/" + fso.name if top else fso.name
                    if size_only:
                        outfp.write(
                            format_bsd_line(
                                "SIZE",
                                None,
                                "%s/./@" % (opath,),
                                False,
                                0))
                    else:
                        outfp.write(
                            format_bsd_line(
                                algorithm[1],
                                linkdgst.digest(),
                                "%s/./@" % (opath,),
                                use_base64))
                    outfp.flush()
                    continue
                #
                # A real sub-directory: its digest and size must have been
                # computed already, i.e. this relies on `walk.walk()`
                # delivering a directory before its parent (otherwise
                # this lookup raises a KeyError).
                #
                # fetch from dir_digests
                dgst, dsz = dir_digests[top + (fso.name,)]
                dir_size += dsz
                # Type marker "d" and name, then the sub-directory's digest
                dir_dgst.update(util.interpolate_bytes(
                    b"1:d,%d:%s,", len(fso.fsname), fso.fsname))
                dir_dgst.update(util.interpolate_bytes(
                    b"%d:%s,", len(dgst), dgst))
                # Optional mode metadata: "fullmode" wins over "mode"
                if with_metadata_full_mode:
                    modestr = normalized_mode_str(fso.stat.st_mode)
                    if not isinstance(modestr, bytes):
                        modestr = modestr.encode("ascii")
                    dir_dgst.update(util.interpolate_bytes(
                        b"8:fullmode,%d:%s,", len(modestr), modestr))
                elif with_metadata_mode:
                    modestr = normalized_compatible_mode_str(fso.stat.st_mode)
                    if not isinstance(modestr, bytes):
                        modestr = modestr.encode("ascii")
                    dir_dgst.update(util.interpolate_bytes(
                        b"4:mode,%d:%s,", len(modestr), modestr))
            else:
                # Everything that is not a directory: type marker "f" and
                # name, then optional metadata and -- unless in size-only
                # mode -- the digest of the content
                dir_dgst.update(util.interpolate_bytes(
                    b"1:f,%d:%s,", len(fso.fsname), fso.fsname))
                dir_size += fso.stat.st_size
                if with_metadata_mtime:
                    # The mtime goes in as ISO timestamp in UTC truncated
                    # to full seconds and with a trailing "Z"
                    mtime = datetime.datetime.utcfromtimestamp(
                        int(fso.stat.st_mtime))
                    mtime = mtime.isoformat("T") + "Z"
                    if not isinstance(mtime, bytes):
                        mtime = mtime.encode("ascii")
                    dir_dgst.update(util.interpolate_bytes(
                        b"5:mtime,%d:%s,", len(mtime), mtime))
                if with_metadata_full_mode:
                    modestr = normalized_mode_str(fso.stat.st_mode)
                    if not isinstance(modestr, bytes):
                        modestr = modestr.encode("ascii")
                    dir_dgst.update(util.interpolate_bytes(
                        b"8:fullmode,%d:%s,", len(modestr), modestr))
                elif with_metadata_mode:
                    modestr = normalized_compatible_mode_str(fso.stat.st_mode)
                    if not isinstance(modestr, bytes):
                        modestr = modestr.encode("ascii")
                    dir_dgst.update(util.interpolate_bytes(
                        b"4:mode,%d:%s,", len(modestr), modestr))
                if not size_only:
                    dgst = digest.compute_digest_file(
                        algorithm[0], fso.path, use_mmap=use_mmap)
                    dir_dgst.update(util.interpolate_bytes(
                        b"%d:%s,", len(dgst), dgst))
                # The name in the report: relative to `root` and always
                # with "/" as separator
                opath = "/".join(top) + "/" + fso.name if top else fso.name
                if size_only:
                    outfp.write(
                        format_bsd_line(
                            "SIZE", None, opath, False, fso.stat.st_size))
                else:
                    if print_size:
                        outfp.write(
                            format_bsd_line(
                                algorithm[1], dgst, opath, use_base64,
                                fso.stat.st_size))
                    else:
                        outfp.write(
                            format_bsd_line(
                                algorithm[1], dgst, opath, use_base64))
                outfp.flush()
        # The line for the directory itself: its name has a trailing "/";
        # the name of the root directory is the empty string
        opath = "/".join(top) + "/" if top else ""
        if size_only:
            outfp.write(format_bsd_line(
                    "SIZE", None, opath, False, dir_size))
        else:
            if print_size:
                outfp.write(format_bsd_line(
                    algorithm[1], dir_dgst.digest(), opath,
                    use_base64, dir_size))
            else:
                outfp.write(format_bsd_line(
                    algorithm[1], dir_dgst.digest(), opath, use_base64))
        outfp.flush()
        # Remember the result: needed when the parent directory is handled
        dir_digests[top] = (dir_dgst.digest(), dir_size)


def normalized_compatible_mode_str(mode):
    """Return the permission bits of `mode` as octal string with a
    leading ``0``.

    Only the bits selected by :func:`stat.S_IMODE` are considered, i.e.
    the file type bits are dropped.

    :param mode: a mode as in the `st_mode` field of a stat result
    :return: the octal digits; a ``0`` is prepended if the string does
             not start with one already

    """
    # XXX FIXME: Windows and "executable"
    octal = "%o" % (stat.S_IMODE(mode),)
    return octal if octal.startswith("0") else "0" + octal


def normalized_mode_str(mode):
    """Return the complete `mode` as octal string with a leading ``0``.

    In contrast to :func:`normalized_compatible_mode_str` no bits are
    masked out.

    :param mode: a mode as in the `st_mode` field of a stat result
    :return: the octal digits; a ``0`` is prepended if the string does
             not start with one already

    """
    octal = "%o" % (mode,)
    if octal.startswith("0"):
        return octal
    return "0" + octal


def format_bsd_line(what, value, filename, use_base64, size=None):
    """Format a single line of the report as bytes.

    Depending on `what` the line looks like:

    - ``TIMESTAMP = <value>`` with `value` formatted as integer
    - ``<what> = <value>`` for ``ISOTIMESTAMP``, ``FLAGS`` and ``VERSION``
    - ``COMMENT (<filename>)`` with `filename` being the comment's text
    - ``SIZE (<filename>) = <size>``
    - ``<what> (<filename>)`` if `value` is `None`
    - ``<what> (<filename>) = <digest>`` or -- if `size` is given --
      ``<what> (<filename>) = <digest>,<size>`` otherwise

    Every line ends with the line separator of the operating system.

    :param what: the tag of the line (text or bytes)
    :param value: the value: an integer, text or a binary digest
                  depending on `what`
    :param filename: the name to be printed in parentheses; must be `None`
                     for the ``<what> = <value>`` style lines and must not
                     be `None` for all others
    :param use_base64: encode a digest in base64 instead of hexadecimal
    :param size: an optional size
    :return: the formatted line as bytes

    """
    eol = os.linesep
    if not isinstance(eol, bytes):
        eol = eol.encode("utf-8")
    tag = what if isinstance(what, bytes) else what.encode("ascii")

    # Lines of the form "TAG = VALUE": they have no name part
    if tag == b"TIMESTAMP":
        assert filename is None
        return util.interpolate_bytes(b"TIMESTAMP = %d%s", value, eol)
    if tag in (b"ISOTIMESTAMP", b"FLAGS", b"VERSION"):
        assert filename is None
        text = value if isinstance(value, bytes) else value.encode("ascii")
        return util.interpolate_bytes(b"%s = %s%s", tag, text, eol)

    # All other lines need a name within the parentheses
    assert filename is not None

    if tag == b"COMMENT":
        # Comments are free text and not names in the filesystem
        remark = (filename if isinstance(filename, bytes)
                  else filename.encode("utf-8"))
        return util.interpolate_bytes(b"COMMENT (%s)%s", remark, eol)

    name = filename if isinstance(filename, bytes) else util.fsencode(filename)

    if tag == b"SIZE":
        return util.interpolate_bytes(b"SIZE (%s) = %d%s", name, size, eol)
    if value is None:
        return util.interpolate_bytes(b"%s (%s)%s", tag, name, eol)

    # A digest line
    encode = base64.b64encode if use_base64 else binascii.hexlify
    encoded = encode(value)
    # The marker of a symlinked root is printed as is
    if name != b"./@":
        name = util.normalize_filename(name, True)
    if size is not None:
        return util.interpolate_bytes(
            b"%s (%s) = %s,%d%s", tag, name, encoded, size, eol)
    return util.interpolate_bytes(
        b"%s (%s) = %s%s", tag, name, encoded, eol)


# Run as a script: the return value of main() is the exit status
if __name__ == "__main__":
    raise SystemExit(main())