view cutils/shasum.py @ 343:b3931b511ed0

Also encode TAB characters specially for output in digest files, because the new tabular output style uses the TAB character as the column separator.
author Franz Glasner <fzglas.hg@dom66.de>
date Tue, 01 Apr 2025 12:15:05 +0200
parents 48430941c18c
children
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# SPDX-FileCopyrightText: © 2020-2025 Franz Glasner
# SPDX-License-Identifier: BSD-3-Clause
# :-
r"""Pure Python implementation of `shasum`.

"""

from __future__ import print_function, absolute_import


__all__ = []


import argparse
import base64
import binascii
import errno
import io
import os
import re
import sys

from . import (__version__, __revision__)
from . import util
from .util import digest as digestmod


def main(argv=None):
    """Command line entry point: parse `argv` and run :func:`shasum`.

    :param argv: the command line arguments; handed over unchanged to
                 :meth:`argparse.ArgumentParser.parse_args`, so `None`
                 leaves the choice of the argument source to argparse
    :return: the return value of :func:`shasum`

    Calls :func:`sys.exit` with 78 if text mode is requested and with 64
    if both `--check` and `--checklist` are given. If no algorithm is
    given ``util.argv2algo("1")`` is used.

    """
    # Arguments prefixed with "@" name a file to read further arguments from
    aparser = argparse.ArgumentParser(
        description="Python implementation of shasum",
        fromfile_prefix_chars='@')
    aparser.add_argument(
        "--algorithm", "-a", action="store", type=util.argv2algo,
        help="""1 (default, aka sha1), 224, 256, 384, 512,
3 (alias for sha3-512), 3-224, 3-256, 3-384, 3-512,
blake2b, blake2b-256, blake2s, blake2 (alias for blake2b),
blake2-256 (alias for blake2b-256), md5""")
    aparser.add_argument(
        "--base64", action="store_true",
        help="Output checksums in base64 notation, not hexadecimal (OpenBSD).")
    # `--binary` and `--text` (below) both store into `text_mode`
    aparser.add_argument(
        "--binary", "-b", action="store_false", dest="text_mode",
        default=False,
        help="Read in binary mode (default)")
    # `--bsd`, `--reverse` and `--tag` all store into `bsd`
    aparser.add_argument(
        "--bsd", "-B", action="store_true", dest="bsd", default=False,
        help="""Write BSD style output. This is also the default output format
of :command:`openssl dgst`.""")
    aparser.add_argument(
        "--check", "-c", action="store_true",
        help="""Read digests from FILEs and check them.
If this option is specified, the FILE options become checklists. Each
checklist should contain hash results in a supported format, which will
be verified against the specified paths. Output consists of the digest
used, the file name, and an OK, FAILED, or MISSING for the result of
the comparison. This will validate any of the supported checksums.
If no file is given, stdin is used.""")
    aparser.add_argument(
        "--checklist", "-C", metavar="CHECKLIST",
        help="""Compare the checksum of each FILE against the checksums in
the CHECKLIST. Any specified FILE that is not listed in the CHECKLIST will
generate an error.""")
    aparser.add_argument(
        "--checklist-allow-distinfo", action="store_true",
        dest="allow_distinfo",
        help='''Allow FreeBSD "distinfo" formatted checklists:
ignore SIZE and TIMESTAMP lines.''')

    aparser.add_argument(
        "--follow-symlinks", action="store_true", dest="follow_symlinks",
        help="""Also follow symlinks that resolve to directories.
Only effective if `--recurse` is activated.""")

    # `mmap` is tri-state: `None` if neither `--mmap` nor `--no-mmap` is
    # given, otherwise `True` or `False`
    aparser.add_argument(
        "--mmap", action="store_true", dest="mmap", default=None,
        help="""Use mmap if available. Default is to determine automatically
 from the filesize.""")
    aparser.add_argument(
        "--no-mmap", action="store_false", dest="mmap", default=None,
        help="Dont use mmap.")

    aparser.add_argument(
        "--recurse", action="store_true",
        help="""Recurse into sub-directories while interpreting every
FILE as a directory.""")

    aparser.add_argument(
        "--reverse", "-r", action="store_false", dest="bsd", default=False,
        help="""Explicitely select normal coreutils style output
(to be option compatible with BSD style commands and
:command:`openssl dgst -r`)""")
    aparser.add_argument(
        "--tag", action="store_true", dest="bsd", default=False,
        help="""Alias for the `--bsd' option (to be compatible with
:command:`b2sum`)""")
    aparser.add_argument(
        "--text", "-t", action="store_true", dest="text_mode", default=False,
        help="Read in text mode (not supported)")
    aparser.add_argument(
        "--version", "-v", action="version",
        version="%s (rv:%s)" % (__version__, __revision__))
    aparser.add_argument(
        "files", nargs="*", metavar="FILE")

    opts = aparser.parse_args(args=argv)

    # `--text` is accepted for option compatibility only
    if opts.text_mode:
        print("ERROR: text mode not supported", file=sys.stderr)
        sys.exit(78)   # :manpage:`sysexits(3)`  EX_CONFIG

    if opts.check and opts.checklist:
        print("ERROR: only one of --check or --checklist allowed",
              file=sys.stderr)
        sys.exit(64)   # :manpage:`sysexits(3)`  EX_USAGE

    if not opts.algorithm:
        opts.algorithm = util.argv2algo("1")

    # `dest` is no command line option; the worker functions read it with
    # `opts.dest or sys.stdout`
    opts.dest = None

    return shasum(opts)


def gen_opts(files=None, algorithm="SHA1", bsd=False, text_mode=False,
             checklist=False, check=False, dest=None, base64=False,
             allow_distinfo=False, mmap=None, recurse=False,
             follow_symlinks=False):
    """Build an options object for :func:`shasum` without parsing a
    command line.

    :param files: the FILE arguments; `None` (the default) yields a new
                  empty list for every call
    :param str algorithm: the algorithm tag; it is resolved with
                          :func:`util.algotag2algotype` and stored together
                          with the tag as ``(algotype, algorithm)``
    :param bool bsd: select BSD style output
    :param bool text_mode: must be false because text mode is not supported
    :param checklist: the checklist to compare the `files` against
    :param bool check: handle the `files` as checklists to be verified
    :param dest: the output file object; `None` means :data:`sys.stdout`
    :param bool base64: print digests base64 encoded instead of hexlified
    :param bool allow_distinfo: accept ``SIZE`` and ``TIMESTAMP`` lines
                                in checklists
    :param mmap: `True` or `False` to force mmap usage on or off;
                 `None` to leave the decision to the digest computation
    :param bool recurse: handle every item in `files` as directory and
                         walk it
    :param bool follow_symlinks: follow symlinks to directories while
                                 walking
    :return: the populated options
    :rtype: argparse.Namespace
    :raises ValueError: if `text_mode` is true or if both `checklist`
                        and `check` are given

    """
    if text_mode:
        raise ValueError("text mode not supported")
    if checklist and check:
        raise ValueError("only one of `checklist' or `check' is allowed")
    # Never share one default list between calls: consumers of the options
    # may modify `opts.files` in place.
    if files is None:
        files = []
    opts = argparse.Namespace(files=files,
                              algorithm=(util.algotag2algotype(algorithm),
                                         algorithm),
                              bsd=bsd,
                              checklist=checklist,
                              check=check,
                              text_mode=False,
                              dest=dest,
                              base64=base64,
                              allow_distinfo=allow_distinfo,
                              mmap=mmap,
                              recurse=recurse,
                              follow_symlinks=follow_symlinks)
    return opts


def shasum(opts):
    """Run the operation selected in `opts` and return its result.

    `opts.check` selects :func:`verify_digests_from_files`, otherwise
    `opts.checklist` selects :func:`verify_digests_with_checklist`;
    without any of them :func:`generate_digests` is run.

    :param opts: the options as built by :func:`main` or :func:`gen_opts`
    :return: the return value of the selected function

    """
    if opts.check:
        handler = verify_digests_from_files
    elif opts.checklist:
        handler = verify_digests_with_checklist
    else:
        handler = generate_digests
    return handler(opts)


def generate_digests(opts):
    """Compute the digests of the selected inputs and print one output line
    for each of them.

    The lines are written with :func:`out_bsd` if `opts.bsd` is true and
    with :func:`out_std` otherwise. They go to `opts.dest` or -- if this is
    not set -- to :data:`sys.stdout`.

    - With `opts.recurse` every item in `opts.files` (default: ``"."``) is
      walked as directory and all files below are processed; directory and
      file names are visited in sorted order.
    - Without files, or with the single file ``-``, the digest of the
      binary standard input is computed.
    - Otherwise every item in `opts.files` is processed as file.

    :param opts: the options as built by :func:`main` or :func:`gen_opts`
    :return: always 0
    :rtype: int
    :raises OSError: in recurse mode if an item in `opts.files` is not a
                     directory (`errno.ENOTDIR`) or does not exist
                     (`errno.ENOENT`)

    """
    out = out_bsd if opts.bsd else out_std
    # Resolve the destination once so that every branch -- the stdin one
    # included -- honours `opts.dest`.
    dest = opts.dest or sys.stdout
    algotype = opts.algorithm[0]
    algotag = opts.algorithm[1]

    def emit(digest, filename):
        # The "binary" flag is always set: text mode is not supported.
        out(dest, digest, filename, algotag, True, opts.base64)

    if opts.recurse:
        # Fall back to the current directory without touching `opts.files`:
        # the list belongs to the caller.
        for dn in (opts.files or ["."]):
            if not os.path.isdir(dn):
                if os.path.exists(dn):
                    raise OSError(errno.ENOTDIR, "not a directory", dn)
                raise OSError(errno.ENOENT, "directory does not exist", dn)
            for dirpath, dirnames, dirfiles in os.walk(
                    dn, followlinks=opts.follow_symlinks):
                # Sorting `dirnames` in place also determines the order in
                # which os.walk() descends into the sub-directories.
                dirnames.sort()
                dirfiles.sort()
                for fn in dirfiles:
                    path = os.path.join(dirpath, fn)
                    emit(digestmod.compute_digest_file(
                             algotype, path, use_mmap=opts.mmap),
                         path)
    elif not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'):
        if util.PY2:
            if sys.platform == "win32":
                # Prevent newline translation on the standard input
                import msvcrt   # noqa: E401
                msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
            source = sys.stdin
        else:
            source = sys.stdin.buffer
        emit(digestmod.compute_digest_stream(algotype, source), None)
    else:
        for fn in opts.files:
            emit(digestmod.compute_digest_file(
                     algotype, fn, use_mmap=opts.mmap),
                 fn)
    return 0


def compare_digests_equal(given_digest, expected_digest, algo):
    """Compare a newly computed binary digest `given_digest` with a digest
    (binary, hex or base64) in `expected_digest`.

    `expected_digest` is interpreted as follows:

    - a bytes-like value with exactly the digest size of `algo` is a
      binary digest
    - a value with twice the digest size must be a hexlified digest
    - everything else must be a base64 encoded digest

    A bytes-like value that is not a binary digest is handled as ASCII
    encoded text (hex or base64).

    :param bytes given_digest:
    :param expected_digest: digest (as bytes) or hexlified or base64 encoded
                            digest (as str)
    :type expected_digest: str or bytes or bytearray
    :param algo: The algorithm (factory)
    :return: `True` if the digests are equal, `False` if not or if
             `expected_digest` is malformed
    :rtype: bool

    """
    digest_size = algo().digest_size
    if isinstance(expected_digest, (bytes, bytearray)):
        if len(expected_digest) == digest_size:
            # already a binary digest
            return given_digest == expected_digest
        if not isinstance(expected_digest, str):
            # Python 3 bytes (or a bytearray): the text patterns below
            # cannot be applied to bytes-like objects
            try:
                expected_digest = bytes(expected_digest).decode("ascii")
            except UnicodeDecodeError:
                return False
    if len(expected_digest) == digest_size * 2:
        # hex
        if not re.search(r"\A[a-fA-F0-9]+\Z", expected_digest):
            return False
        decode = binascii.unhexlify
    else:
        # base64
        if not re.search(
                r"\A(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?\Z",    # noqa: E501  line too long
                expected_digest):
            return False
        decode = base64.b64decode
    try:
        exd = decode(expected_digest)
    except (TypeError, ValueError, binascii.Error):
        # Python 2 raises TypeError here, Python 3 binascii.Error (which
        # is a ValueError there)
        return False
    return given_digest == exd


def verify_digests_with_checklist(opts):
    """Verify every FILE in `opts.files` against the digest that the
    checklist `opts.checklist` contains for it.

    Without files, or with the single file ``-``, the standard input is
    verified against the checklist entry that
    :func:`get_parsed_digest_line_from_checklist` yields for the filename
    `None`.

    For every input one line is printed to `opts.dest` (default:
    :data:`sys.stdout`): ``NAME: MISSING`` if the checklist has no entry
    for it, ``TAG: NAME: OK`` or ``TAG: NAME: FAILED`` otherwise.

    :param opts: the options as built by :func:`main` or :func:`gen_opts`
    :return: 0 if every input is ``OK``, 1 otherwise
    :rtype: int

    """
    dest = opts.dest or sys.stdout
    exit_code = 0

    if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'):
        if not util.PY2:
            source = sys.stdin.buffer
        else:
            if sys.platform == "win32":
                import msvcrt   # noqa: E401
                msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
            source = sys.stdin
        entry = get_parsed_digest_line_from_checklist(
            opts.checklist, opts, None)
        if entry is not None:
            tag, algo, _cl_filename, cl_digest = entry
            computed = digestmod.compute_digest_stream(algo, source)
            if compare_digests_equal(computed, cl_digest, algo):
                verdict = "OK"
            else:
                verdict = "FAILED"
                exit_code = 1
            print("{}: {}: {}".format(tag, "-", verdict), file=dest)
        else:
            exit_code = 1
            print("-: MISSING", file=dest)
        return exit_code

    for fn in opts.files:
        entry = get_parsed_digest_line_from_checklist(
            opts.checklist, opts, fn)
        if entry is None:
            print("{}: MISSING".format(fn), file=dest)
            exit_code = 1
            continue
        tag, algo, _cl_filename, cl_digest = entry
        computed = digestmod.compute_digest_file(
            algo, fn, use_mmap=opts.mmap)
        if compare_digests_equal(computed, cl_digest, algo):
            verdict = "OK"
        else:
            verdict = "FAILED"
            exit_code = 1
        print("{}: {}: {}".format(tag, fn, verdict), file=dest)
    return exit_code


def verify_digests_from_files(opts):
    """Handle every item in `opts.files` as checklist and verify all the
    digest lines in it.

    Without files, or with the single file ``-``, the digest lines are
    read from :data:`sys.stdin`. Checklist files are read as UTF-8 text.

    Every digest line is checked with :func:`handle_checkline` and
    reported as ``TAG: FILENAME: RESULT`` on `opts.dest` (default:
    :data:`sys.stdout`); ``SIZE`` and ``TIMESTAMP`` lines are skipped
    silently.

    :param opts: the options as built by :func:`main` or :func:`gen_opts`
    :return: 0 if every checked line is "ok", 1 otherwise
    :rtype: int

    """
    dest = opts.dest or sys.stdout

    def check_lines(lines):
        # Check and report all `lines`; tell whether one of them is not "ok"
        found_problem = False
        for checkline in lines:
            if not checkline:
                continue
            result, filename, tag = handle_checkline(opts, checkline)
            if tag in ("SIZE", "TIMESTAMP"):
                assert opts.allow_distinfo
                continue
            print("{}: {}: {}".format(tag, filename, result.upper()),
                  file=dest)
            if result != "ok":
                found_problem = True
        return found_problem

    exit_code = 0
    if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'):
        if check_lines(sys.stdin):
            exit_code = 1
    else:
        for checklist_name in opts.files:
            with io.open(checklist_name, "rt", encoding="utf-8") as checkfile:
                if check_lines(checkfile):
                    exit_code = 1
    return exit_code


def handle_checkline(opts, line):
    """Parse the digest line `line` and verify the file it names.

    :param opts: the options as built by :func:`main` or :func:`gen_opts`
    :param str line: a line of a digest file
    :return: a tuple with the result ("ok", "failed", or "missing" if
             computing the digest ends with an :exc:`EnvironmentError`),
             the filename and the tag of the line;
             for ``SIZE`` and ``TIMESTAMP`` lines result and filename
             are `None`
    :rtype: tuple(str, str, str)
    :raises ValueError: if `line` cannot be parsed

    """
    parsed = parse_digest_line(opts, line)
    if not parsed:
        raise ValueError(
            "improperly formatted digest line: {}".format(line))
    tag, algo, filename, expected = parsed
    if tag in ("SIZE", "TIMESTAMP"):
        assert opts.allow_distinfo
        return (None, None, tag)
    try:
        computed = digestmod.compute_digest_file(
            algo, filename, use_mmap=opts.mmap)
        if compare_digests_equal(computed, expected, algo):
            status = "ok"
        else:
            status = "failed"
    except EnvironmentError:
        status = "missing"
    return (status, filename, tag)


def get_parsed_digest_line_from_checklist(checklist, opts, filename):
    """Search the file `checklist` for the first digest line that belongs
    to `filename`.

    Filenames are compared after normalizing them with
    :func:`util.normalize_filename`. If `filename` is `None` a digest line
    matches if its normalized filename is ``-``, ``stdin`` or empty.
    ``SIZE`` and ``TIMESTAMP`` lines are skipped.

    :param checklist: the name of the checklist; it is read as UTF-8 text
    :param opts: the options as needed by :func:`parse_digest_line`
    :param filename: the filename to search for or `None`
    :return: the result of :func:`parse_digest_line` for the matching line
             or `None` if no line matches
    :raises ValueError: if a line that is read cannot be parsed

    """
    if filename is not None:
        wanted = (
            util.normalize_filename(filename, strip_leading_dot_slash=True),)
    else:
        wanted = ("-", "stdin", "", )
    with io.open(checklist, "rt", encoding="utf-8") as clf:
        for checkline in clf:
            if not checkline:
                continue
            parsed = parse_digest_line(opts, checkline)
            if not parsed:
                raise ValueError(
                    "improperly formatted digest line: {}".format(checkline))
            if parsed[0] in ("SIZE", "TIMESTAMP"):
                assert opts.allow_distinfo
                continue
            candidate = util.normalize_filename(
                parsed[2], strip_leading_dot_slash=True)
            if candidate in wanted:
                return parsed
    return None


def parse_digest_line(opts, line):
    """Parse a `line` of a digest file and return its parts.

    This is rather strict. But if `opts.allow_distinfo` is `True` then
    the additional keywords ``SIZE`` and ``TIMESTAMP`` are recognized
    and returned with `None` as algorithm constructor and as filename.
    The caller is responsible to handle them.

    A trailing newline is optional for all line formats.

    :param opts: the options; `opts.allow_distinfo` and -- for coreutils
                 style lines, which do not name their algorithm --
                 `opts.algorithm` are used
    :param str line: the line to parse
    :return: a tuple of the algorithm tag, the algorithm constructor,
             the filename and the digest as written in `line`;
             if `line` cannot be parsed successfully `None` is returned
    :rtype: tuple(str, obj, str, str) or None

    Handles coreutils and BSD-style file formats.

    """
    # BSD style: ``TAG (FILENAME) = DIGEST``
    mo = re.search(r"\A(\S+)\s*\((.*)\)\s*=\s*(.+)\n?\Z", line)
    if mo:
        tag, filename, digest = mo.groups()
        if opts.allow_distinfo and tag == "SIZE":
            return ("SIZE", None, None, digest)
        return (tag, util.algotag2algotype(tag), filename, digest)

    if opts.allow_distinfo:
        # Do not insist on the trailing newline: without it an unterminated
        # last line would be taken for a coreutils style line below.
        mo = re.search(r"\ATIMESTAMP\s*=\s*([0-9]+)\s*\Z", line)
        if mo:
            return ("TIMESTAMP", None, None, mo.group(1))

    # coreutils style: ``DIGEST FILENAME`` with an optional mode character
    # (``*`` or a space) in front of the filename
    mo = re.search(r"([^\ ]+) [\*\ ]?(.+)\n?\Z", line)
    if mo:
        # (tag, algorithm, filename, digest)
        return (opts.algorithm[1],
                opts.algorithm[0],
                mo.group(2),
                mo.group(1))
    return None


def out_bsd(dest, digest, filename, digestname, binary, use_base64):
    """Print a digest in BSD format; this is also the output format of
    :command:`openssl dgst` and :command:`b2sum --tag`.

    The line reads ``DIGESTNAME (FILENAME) = DIGEST`` with the filename
    normalized by :func:`util.normalize_filename`. Without a filename
    only the digest is printed.

    :param dest: the file object to print to
    :param bytes digest: the binary digest
    :param filename: the name of the digested file or `None`
    :param str digestname: the algorithm tag to print
    :param binary: not used by this format
    :param bool use_base64: print the digest base64 encoded instead of
                            hexlified

    """
    encode = base64.b64encode if use_base64 else binascii.hexlify
    text = encode(digest).decode("ascii")
    if filename is not None:
        text = "{} ({}) = {}".format(
            digestname, util.normalize_filename(filename), text)
    print(text, file=dest)


def out_std(dest, digest, filename, digestname, binary, use_base64):
    """Print a digest in coreutils format (:command:`shasum` et al.).

    The line reads ``DIGEST`` + space + mode character + ``FILENAME``. The
    mode character is ``*`` if `binary` is true and a space otherwise. The
    filename is normalized by :func:`util.normalize_filename`; a missing
    filename is printed as ``-``.

    :param dest: the file object to print to
    :param bytes digest: the binary digest
    :param filename: the name of the digested file or `None`
    :param digestname: not used by this format
    :param bool binary: select the mode character
    :param bool use_base64: print the digest base64 encoded instead of
                            hexlified

    """
    encode = base64.b64encode if use_base64 else binascii.hexlify
    text = encode(digest).decode("ascii")
    mode_char = '*' if binary else ' '
    if filename is None:
        shown_name = '-'
    else:
        shown_name = util.normalize_filename(filename)
    print("{} {}{}".format(text, mode_char, shown_name), file=dest)


# Script entry point: hand the return value of main() over to sys.exit()
if __name__ == "__main__":
    sys.exit(main())