diff cutils/shasum.py @ 72:ae2df602beb4

Make shasum.py and dos2unix sub-modules to the new "cutils" package
author Franz Glasner <fzglas.hg@dom66.de>
date Sat, 26 Feb 2022 19:20:20 +0100
parents shasum.py@29fb33aa639a
children c3268f4e752f
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cutils/shasum.py	Sat Feb 26 19:20:20 2022 +0100
@@ -0,0 +1,588 @@
+r"""
+:Author:    Franz Glasner
+:Copyright: (c) 2020-2022 Franz Glasner.
+            All rights reserved.
+:License:   BSD 3-Clause "New" or "Revised" License.
+            See :ref:`LICENSE <license>` for details.
+            If you cannot find LICENSE see
+            <https://opensource.org/licenses/BSD-3-Clause>
+:ID:        @(#) $HGid$
+
+"""
+
+from __future__ import print_function, absolute_import
+
+from . import (__version__, __revision__, __date__)
+
+import argparse
+import base64
+import binascii
+import errno
+import hashlib
+try:
+    from hmac import compare_digest
+except ImportError:
+    compare_digest = None
+import io
+try:
+    import mmap
+except ImportError:
+    mmap = None
+import os
+try:
+    import pathlib
+except ImportError:
+    pathlib = None
+import re
+import stat
+import sys
+
+
+# True when running under Python 2 (major version below 3).
+PY2 = sys.version_info[0] < 3
+
+# Types that compute_digest_file() treats as a filesystem path; any other
+# value is used there as an already opened file descriptor.
+if PY2:
+    PATH_TYPES = (unicode, str)    # noqa: F821 (undefined name 'unicode')
+else:
+    if pathlib:
+        PATH_TYPES = (str, bytes, pathlib.Path)
+    else:
+        # the pathlib module could not be imported
+        PATH_TYPES = (str, bytes)
+
+# Number of bytes requested per read call when hashing without mmap (1 MiB)
+CHUNK_SIZE = 1024*1024
+# Upper bound for the size of a single memory mapping when hashing (64 MiB)
+MAP_CHUNK_SIZE = 64*1024*1024
+
+
+def main(argv=None):
+    """Command line entry point: parse `argv` and run :func:`shasum`.
+
+    :param argv: the argument list to parse; it is handed unchanged to
+                 :meth:`argparse.ArgumentParser.parse_args`
+    :return: the return value of :func:`shasum`
+
+    Terminates via :func:`sys.exit` with code 78 if text mode is requested
+    and with code 64 if both `--check` and `--checklist` are given.
+
+    """
+    parser = argparse.ArgumentParser(
+        fromfile_prefix_chars='@',
+        description="Python implementation of shasum")
+    add_option = parser.add_argument
+
+    add_option(
+        "--algorithm", "-a",
+        type=argv2algo, action="store",
+        help="1 (default), 224, 256, 384, 512, 3-224, 3-256, 3-384, 3-512, blake2b, blake2s, md5")
+    add_option(
+        "--base64",
+        action="store_true",
+        help="Output checksums in base64 notation, not hexadecimal (OpenBSD).")
+    add_option(
+        "--binary", "-b",
+        dest="text_mode", action="store_false", default=False,
+        help="Read in binary mode (default)")
+    add_option(
+        "--bsd", "-B",
+        dest="bsd", action="store_true", default=False,
+        help="Write BSD style output. This is also the default output format of :command:`openssl dgst`.")
+    add_option(
+        "--check", "-c",
+        action="store_true",
+        help="""Read digests from FILEs and check them.
+If this option is specified, the FILE options become checklists. Each
+checklist should contain hash results in a supported format, which will
+be verified against the specified paths. Output consists of the digest
+used, the file name, and an OK, FAILED, or MISSING for the result of
+the comparison. This will validate any of the supported checksums.
+If no file is given, stdin is used.""")
+    add_option(
+        "--checklist", "-C",
+        metavar="CHECKLIST",
+        help="""Compare the checksum of each FILE against the checksums in
+the CHECKLIST. Any specified FILE that is not listed in the CHECKLIST will
+generate an error.""")
+
+    # --reverse and --tag share the destination of --bsd
+    add_option(
+        "--reverse", "-r",
+        dest="bsd", action="store_false", default=False,
+        help="Explicitely select normal coreutils style output (to be option compatible with BSD style commands and :command:`openssl dgst -r`)")
+    add_option(
+        "--tag",
+        dest="bsd", action="store_true", default=False,
+        help="Alias for the `--bsd' option (to be compatible with :command:`b2sum`)")
+    add_option(
+        "--text", "-t",
+        dest="text_mode", action="store_true", default=False,
+        help="Read in text mode (not supported)")
+    add_option(
+        "--version", "-v",
+        action="version",
+        version="%s (rv:%s)" % (__version__, __revision__))
+    add_option("files", nargs="*", metavar="FILE")
+
+    opts = parser.parse_args(args=argv)
+
+    if opts.text_mode:
+        print("ERROR: text mode not supported", file=sys.stderr)
+        sys.exit(78)   # :manpage:`sysexits(3)`  EX_CONFIG
+    if opts.check and opts.checklist:
+        print("ERROR: only one of --check or --checklist allowed",
+              file=sys.stderr)
+        sys.exit(64)   # :manpage:`sysexits(3)`  EX_USAGE
+
+    # No algorithm given on the command line: use the one selected by "1"
+    opts.algorithm = opts.algorithm or argv2algo("1")
+    # The command line offers no way to redirect the output
+    opts.dest = None
+
+    return shasum(opts)
+
+
+def gen_opts(files=None, algorithm="SHA1", bsd=False, text_mode=False,
+             checklist=False, check=False, dest=None, base64=False):
+    """Build an options object for :func:`shasum` without a command line.
+
+    :param files: the names of the files to handle; `None` (the default)
+                  yields a new empty list
+    :param str algorithm: the algorithm tag as understood by
+                          :func:`algotag2algotype`
+    :param bool bsd: stored as the `bsd` option
+    :param bool text_mode: must be false because text mode is not supported
+    :param checklist: stored as the `checklist` option
+    :param bool check: stored as the `check` option
+    :param dest: stored as the `dest` option
+    :param bool base64: stored as the `base64` option
+    :return: the assembled options
+    :rtype: argparse.Namespace
+    :raises ValueError: if `text_mode` is true or if both `checklist` and
+                        `check` are true
+
+    """
+    if text_mode:
+        raise ValueError("text mode not supported")
+    if checklist and check:
+        raise ValueError("only one of `checklist' or `check' is allowed")
+    if files is None:
+        # Create the list per call: a mutable default value would be one
+        # single list object shared by every returned namespace.
+        files = []
+    opts = argparse.Namespace(files=files,
+                              # same (factory, tag) layout as argv2algo()
+                              algorithm=(algotag2algotype(algorithm),
+                                         algorithm),
+                              bsd=bsd,
+                              checklist=checklist,
+                              check=check,
+                              text_mode=False,
+                              dest=dest,
+                              base64=base64)
+    return opts
+
+
+def shasum(opts):
+    """Dispatch to the operation selected in `opts`.
+
+    :param opts: the options as produced by :func:`main` or :func:`gen_opts`
+    :return: the return value of the selected function
+
+    `opts.check` takes precedence over `opts.checklist`; if neither is set
+    digests are generated.
+
+    """
+    if opts.check:
+        handler = verify_digests_from_files
+    elif opts.checklist:
+        handler = verify_digests_with_checklist
+    else:
+        handler = generate_digests
+    return handler(opts)
+
+
+def generate_digests(opts):
+    """Compute and print the digests of the inputs named in `opts`.
+
+    :param opts: the options; `files`, `algorithm`, `bsd`, `base64` and
+                 `dest` are evaluated
+    :return: always 0
+    :rtype: int
+
+    If `opts.files` is empty or consists only of ``-`` the data is read
+    from stdin. Output is written in BSD format if `opts.bsd` is true and
+    in coreutils format otherwise. It goes to `opts.dest` or, if that is
+    not set, to :data:`sys.stdout`.
+
+    """
+    out = out_bsd if opts.bsd else out_std
+    # Honour opts.dest for stdin input as well as for named files
+    dest = opts.dest or sys.stdout
+    if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'):
+        if PY2:
+            if sys.platform == "win32":
+                # switch stdin to binary mode
+                import msvcrt
+                msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
+            source = sys.stdin
+        else:
+            # the underlying bytes stream of stdin
+            source = sys.stdin.buffer
+        out(dest,
+            compute_digest_stream(opts.algorithm[0], source),
+            None,
+            opts.algorithm[1],
+            True,
+            opts.base64)
+    else:
+        for fn in opts.files:
+            out(dest,
+                compute_digest_file(opts.algorithm[0], fn),
+                fn,
+                opts.algorithm[1],
+                True,
+                opts.base64)
+    return 0
+
+
+def compare_digests_equal(given_digest, expected_digest, algo):
+    """Compare a newly computed binary digest `given_digest` with the
+    digest in `expected_digest`.
+
+    :param bytes given_digest: the computed digest in binary form
+    :param expected_digest: the binary digest, or the digest in hexadecimal
+                            or base64 notation; encoded digests are
+                            accepted as text and as ASCII bytes
+    :type expected_digest: str or bytes or bytearray
+    :param algo: The algorithm (factory)
+    :return: `True` if the digests are equal, `False` if not or if
+             `expected_digest` cannot be decoded
+    :rtype: bool
+
+    A bytes value with exactly the digest size of `algo` is taken as
+    binary digest. A value with twice that length is taken as hexadecimal
+    notation and everything else as base64 notation.
+
+    """
+    digest_size = algo().digest_size
+    if isinstance(expected_digest, (bytes, bytearray)) \
+       and len(expected_digest) == digest_size:
+        exd = expected_digest
+    else:
+        if isinstance(expected_digest, (bytes, bytearray)) \
+           and not isinstance(expected_digest, str):
+            # An encoded digest given as bytes: the text patterns below
+            # cannot be applied to bytes on Python 3.
+            try:
+                expected_digest = expected_digest.decode("ascii")
+            except UnicodeDecodeError:
+                return False
+        # Python 2 signals undecodable input with TypeError, Python 3 with
+        # binascii.Error (a ValueError there).
+        decode_errors = (TypeError, ValueError, binascii.Error)
+        if len(expected_digest) == digest_size * 2:
+            # hex
+            if re.search(r"\A[a-fA-F0-9]+\Z", expected_digest):
+                try:
+                    exd = binascii.unhexlify(expected_digest)
+                except decode_errors:
+                    return False
+            else:
+                return False
+        else:
+            # base64
+            if re.search(
+                    r"\A(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?\Z",
+                    expected_digest):
+                try:
+                    exd = base64.b64decode(expected_digest)
+                except decode_errors:
+                    return False
+            else:
+                return False
+    if compare_digest:
+        # hmac.compare_digest is available
+        return compare_digest(given_digest, exd)
+    else:
+        return given_digest == exd
+
+
+def verify_digests_with_checklist(opts):
+    """Check the inputs named in `opts` against the digests listed in the
+    file `opts.checklist`.
+
+    :param opts: the options; `files`, `checklist` and `dest` are evaluated
+    :return: 0 if every input is listed and matches its digest, 1 otherwise
+    :rtype: int
+
+    If `opts.files` is empty or consists only of ``-`` the data is read
+    from stdin. For every input one line with the result ``OK``,
+    ``FAILED`` or ``MISSING`` (no entry in the checklist) is written to
+    `opts.dest` or, if that is not set, to :data:`sys.stdout`.
+
+    """
+    report = opts.dest or sys.stdout
+    use_stdin = not opts.files \
+        or (len(opts.files) == 1 and opts.files[0] == '-')
+
+    if use_stdin:
+        if not PY2:
+            stream = sys.stdin.buffer
+        else:
+            if sys.platform == "win32":
+                import os, msvcrt   # noqa: E401
+                msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
+            stream = sys.stdin
+        entry = get_parsed_digest_line_from_checklist(
+            opts.checklist, opts, None)
+        if entry is None:
+            print("-: MISSING", file=report)
+            return 1
+        tag, algo, _listed_name, listed_digest = entry
+        actual = compute_digest_stream(algo, stream)
+        matches = compare_digests_equal(actual, listed_digest, algo)
+        print("{}: {}: {}".format(tag, "-", "OK" if matches else "FAILED"),
+              file=report)
+        return 0 if matches else 1
+
+    status = 0
+    for fn in opts.files:
+        entry = get_parsed_digest_line_from_checklist(opts.checklist, opts, fn)
+        if entry is None:
+            print("{}: MISSING".format(fn), file=report)
+            status = 1
+            continue
+        tag, algo, _listed_name, listed_digest = entry
+        actual = compute_digest_file(algo, fn)
+        if compare_digests_equal(actual, listed_digest, algo):
+            verdict = "OK"
+        else:
+            verdict = "FAILED"
+            status = 1
+        print("{}: {}: {}".format(tag, fn, verdict), file=report)
+    return status
+
+
+def verify_digests_from_files(opts):
+    """Read digest lines from the checklists named in `opts.files` and
+    verify the files listed in them.
+
+    :param opts: the options; `files` and `dest` are evaluated here
+    :return: 0 if every listed file matches its digest, 1 otherwise
+    :rtype: int
+    :raises ValueError: if a non-blank line cannot be parsed
+
+    If `opts.files` is empty or consists only of ``-`` the digest lines
+    are read from stdin. Checklist files are read as UTF-8 text. Blank
+    lines are skipped. The results are written to `opts.dest` or, if that
+    is not set, to :data:`sys.stdout`.
+
+    """
+    dest = opts.dest or sys.stdout
+    exit_code = 0
+    if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'):
+        exit_code = _verify_checklines(opts, sys.stdin, dest)
+    else:
+        for checklist_fn in opts.files:
+            with io.open(checklist_fn, "rt", encoding="utf-8") as checkfile:
+                if _verify_checklines(opts, checkfile, dest) != 0:
+                    exit_code = 1
+    return exit_code
+
+
+def _verify_checklines(opts, checklines, dest):
+    """Verify every digest line in `checklines`, print one result line per
+    entry to `dest` and return 0 if all entries are ok and 1 otherwise.
+
+    """
+    exit_code = 0
+    for checkline in checklines:
+        # Lines keep their line terminator when read, so a blank line is
+        # not an empty string: test the stripped line.
+        if not checkline.strip():
+            continue
+        r, fn, tag = handle_checkline(opts, checkline)
+        print("{}: {}: {}".format(tag, fn, r.upper()), file=dest)
+        if r != "ok":
+            exit_code = 1
+    return exit_code
+
+
+def handle_checkline(opts, line):
+    """Verify the file that is described by the digest line `line`.
+
+    :param opts: the options, handed on to :func:`parse_digest_line`
+    :param str line: a single line of a digest file
+    :return: a tuple with the result (one of the static strings "ok",
+             "missing", or "failed"), the filename and the tag of the
+             digest used
+    :rtype: tuple(str, str, str)
+    :raises ValueError: if `line` cannot be parsed
+
+    The result is "missing" if computing the digest of the file raises an
+    :exc:`EnvironmentError`.
+
+    """
+    parsed = parse_digest_line(opts, line)
+    if not parsed:
+        raise ValueError(
+            "improperly formatted digest line: {}".format(line))
+    tag, algo, fn, expected = parsed
+    try:
+        matches = compare_digests_equal(
+            compute_digest_file(algo, fn), expected, algo)
+    except EnvironmentError:
+        return ("missing", fn, tag)
+    return ("ok" if matches else "failed", fn, tag)
+
+
+def get_parsed_digest_line_from_checklist(checklist, opts, filename):
+    """Search the file `checklist` for the digest line of `filename`.
+
+    :param checklist: the name of the checklist file; it is read as UTF-8
+                      text
+    :param opts: the options, handed on to :func:`parse_digest_line`
+    :param filename: the filename to search for, or `None` to search for
+                     an entry for stdin (listed as ``-``, ``stdin`` or
+                     with an empty name)
+    :return: the parsed parts of the first matching line (see
+             :func:`parse_digest_line`) or `None` if there is no match
+    :raises ValueError: if a non-blank line cannot be parsed
+
+    Filenames are compared in normalized form (see
+    :func:`normalize_filename`) with leading ``./`` stripped. Blank lines
+    are skipped.
+
+    """
+    if filename is None:
+        filenames = ("-", "stdin", "", )
+    else:
+        filenames = (
+            normalize_filename(filename, strip_leading_dot_slash=True),)
+    with io.open(checklist, "rt", encoding="utf-8") as clf:
+        for checkline in clf:
+            # Lines keep their line terminator when read, so a blank line
+            # is not an empty string: test the stripped line.
+            if not checkline.strip():
+                continue
+            parts = parse_digest_line(opts, checkline)
+            if not parts:
+                raise ValueError(
+                    "improperly formatted digest line: {}".format(checkline))
+            fn = normalize_filename(parts[2], strip_leading_dot_slash=True)
+            if fn in filenames:
+                return parts
+    # no line matched
+    return None
+
+
+def parse_digest_line(opts, line):
+    """Split a `line` of a digest file into its parts.
+
+    :param opts: the options; `opts.algorithm` supplies tag and algorithm
+                 for coreutils-style lines because they do not name one
+    :param str line: the line to parse
+    :return: a tuple of the algorithm tag, the algorithm constructor, the
+             filename and the digest as written in the line;
+             `None` if `line` matches neither supported format
+    :rtype: tuple(str, obj, str, str) or None
+
+    The BSD-style format is tried first, the coreutils format second.
+
+    """
+    # BSD style: TAG (FILENAME) = DIGEST
+    bsd_match = re.search(r"\A(\S+)\s*\((.*)\)\s*=\s*(.+)\n?\Z", line)
+    if bsd_match:
+        tag, filename, digest = bsd_match.groups()
+        return (tag, algotag2algotype(tag), filename, digest)
+
+    # coreutils style: DIGEST, a space, an optional `*' or space, FILENAME
+    coreutils_match = re.search(r"([^\ ]+) [\*\ ]?(.+)\n?\Z", line)
+    if not coreutils_match:
+        return None
+    digest, filename = coreutils_match.groups()
+    return (opts.algorithm[1], opts.algorithm[0], filename, digest)
+
+
+def argv2algo(s):
+    """Translate an algorithm specifier from the command line into the
+    digest type/factory and the tag that names the algorithm in the output.
+
+    :param str s: the specifier from the command line
+    :return: the internal digest specification
+    :rtype: a tuple (digest_type_or_factory, name_in_output)
+    :raises argparse.ArgumentTypeError: if `s` is not recognized
+
+    The specifier is matched case-insensitively.
+
+    """
+    s = s.lower()
+    # (accepted spellings, attribute name in hashlib, tag for the output)
+    known_algorithms = (
+        (("1", "sha1"), "sha1", "SHA1"),
+        (("224", "sha224"), "sha224", "SHA224"),
+        (("256", "sha256"), "sha256", "SHA256"),
+        (("384", "sha384"), "sha384", "SHA384"),
+        (("512", "sha512"), "sha512", "SHA512"),
+        (("3-224", "sha3-224"), "sha3_224", "SHA3-224"),
+        (("3-256", "sha3-256"), "sha3_256", "SHA3-256"),
+        (("3-384", "sha3-384"), "sha3_384", "SHA3-384"),
+        (("3-512", "sha3-512"), "sha3_512", "SHA3-512"),
+        (("blake2b", "blake2b-512"), "blake2b", "BLAKE2b"),
+        (("blake2s", "blake2s-256"), "blake2s", "BLAKE2s"),
+        (("md5",), "md5", "MD5"),
+    )
+    for spellings, factory_name, tag in known_algorithms:
+        if s in spellings:
+            # Resolve the factory only now: hashlib is asked for an
+            # algorithm only when it is actually requested.
+            return (getattr(hashlib, factory_name), tag)
+    raise argparse.ArgumentTypeError(
+        "`{}' is not a recognized algorithm".format(s))
+
+
+def algotag2algotype(s):
+    """Map the algorithm tag of a BSD-style digest file to the type/factory
+    of the corresponding algorithm.
+
+    :param str s: the tag (i.e. normalized name) of the algorithm
+    :return: the digest type or factory for `s`
+    :raises ValueError: if `s` is not a known tag
+
+    The tag is compared case-sensitively.
+
+    """
+    # (tag, attribute name in hashlib)
+    known_tags = (
+        ("SHA1", "sha1"),
+        ("SHA224", "sha224"),
+        ("SHA256", "sha256"),
+        ("SHA384", "sha384"),
+        ("SHA512", "sha512"),
+        ("SHA3-224", "sha3_224"),
+        ("SHA3-256", "sha3_256"),
+        ("SHA3-384", "sha3_384"),
+        ("SHA3-512", "sha3_512"),
+        ("BLAKE2b", "blake2b"),
+        ("BLAKE2s", "blake2s"),
+        ("MD5", "md5"),
+    )
+    for tag, factory_name in known_tags:
+        if s == tag:
+            # Resolve the factory only now: hashlib is asked for an
+            # algorithm only when it is actually requested.
+            return getattr(hashlib, factory_name)
+    raise ValueError("unknown algorithm: {}".format(s))
+
+
+def out_bsd(dest, digest, filename, digestname, binary, use_base64):
+    """Write a digest in BSD format, which is also the output format of
+    :command:`openssl dgst` and :command:`b2sum --tag`.
+
+    :param dest: the file object to print to
+    :param bytes digest: the digest in binary form
+    :param filename: the name of the hashed file or `None`; without a
+                     filename only the encoded digest is printed
+    :param str digestname: the algorithm tag to print
+    :param binary: not used by this format
+    :param bool use_base64: print the digest in base64 instead of
+                            hexadecimal notation
+
+    """
+    encode = base64.b64encode if use_base64 else binascii.hexlify
+    text = encode(digest).decode("ascii")
+    if filename is not None:
+        text = "{} ({}) = {}".format(
+            digestname, normalize_filename(filename), text)
+    print(text, file=dest)
+
+
+def out_std(dest, digest, filename, digestname, binary, use_base64):
+    """Write a digest in coreutils format (:command:`shasum` et al.).
+
+    :param dest: the file object to print to
+    :param bytes digest: the digest in binary form
+    :param filename: the name of the hashed file or `None`, which is
+                     printed as ``-``
+    :param digestname: not used by this format
+    :param bool binary: print the `*` marker in front of the filename if
+                        true and a space otherwise
+    :param bool use_base64: print the digest in base64 instead of
+                            hexadecimal notation
+
+    """
+    encode = base64.b64encode if use_base64 else binascii.hexlify
+    text = encode(digest).decode("ascii")
+    mode_marker = '*' if binary else ' '
+    if filename is None:
+        shown_name = '-'
+    else:
+        shown_name = normalize_filename(filename)
+    print("{} {}{}".format(text, mode_marker, shown_name), file=dest)
+
+
+def compute_digest_file(hashobj, path, use_mmap=True):
+    """Compute the digest of the contents of a file.
+
+    :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory
+    :param path: filename within the filesystem or a file descriptor opened in
+                 binary mode (also a socket or pipe)
+    :param bool use_mmap: use the :mod:`mmap` module if available
+    :return: the digest in binary form
+    :rtype: bytes
+
+    If a file descriptor is given it must support :func:`os.read`.
+    A given file descriptor is not closed; only a file opened here by
+    name is closed again.
+
+    Memory mapping is used only for regular files; everything else is
+    read with :func:`os.read`.
+
+    """
+    h = hashobj()
+    if isinstance(path, PATH_TYPES):
+        # A name: open the file here (and close it at the end).
+        # Flags that the platform does not provide contribute 0.
+        flags = os.O_RDONLY | getattr(os, "O_BINARY", 0) \
+            | getattr(os, "O_SEQUENTIAL", 0) | getattr(os, "O_NOCTTY", 0)
+        fd = os.open(path, flags)
+        own_fd = True
+    else:
+        # Not a name: use it as file descriptor that belongs to the caller
+        fd = path
+        own_fd = False
+    try:
+        try:
+            st = os.fstat(fd)
+        except TypeError:
+            #
+            # "fd" is most probably a Python socket object.
+            # (a pipe typically supports fstat)
+            #
+            use_mmap = False
+        else:
+            if stat.S_ISREG(st[stat.ST_MODE]):
+                # "filesize" is only set (and only needed) on the mmap path
+                filesize = st[stat.ST_SIZE]
+            else:
+                use_mmap = False
+        if mmap is None or not use_mmap:
+            # No mmap available or wanted -> use traditional low-level file IO
+            while True:
+                try:
+                    buf = os.read(fd, CHUNK_SIZE)
+                except OSError as e:
+                    # EAGAIN/EWOULDBLOCK: no data available yet; the loop
+                    # simply tries again. Other errors are propagated.
+                    if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
+                        raise
+                else:
+                    # an empty read result marks the end of the data
+                    if len(buf) == 0:
+                        break
+                    h.update(buf)
+        else:
+            #
+            # Use mmap
+            #
+            # NOTE: On Windows mmapped files with length 0 are not supported.
+            #       So ensure to not call mmap.mmap() if the file size is 0.
+            #
+            # The file is mapped and hashed in consecutive windows of at
+            # most MAP_CHUNK_SIZE bytes.
+            #
+            madvise = getattr(mmap.mmap, "madvise", None)
+            if filesize < MAP_CHUNK_SIZE:
+                mapsize = filesize
+            else:
+                mapsize = MAP_CHUNK_SIZE
+            mapoffset = 0
+            rest = filesize
+            # "rest" is 0 for an empty file: nothing is mapped then
+            while rest > 0:
+                m = mmap.mmap(fd,
+                              mapsize,
+                              access=mmap.ACCESS_READ,
+                              offset=mapoffset)
+                # NOTE(review): madvise() runs outside of the try/finally
+                # below; if it raises the mapping is not closed explicitly.
+                if madvise:
+                    madvise(m, mmap.MADV_SEQUENTIAL)
+                try:
+                    h.update(m)
+                finally:
+                    m.close()
+                rest -= mapsize
+                mapoffset += mapsize
+                # the last window covers only the remaining bytes
+                if rest < mapsize:
+                    mapsize = rest
+    finally:
+        if own_fd:
+            os.close(fd)
+    return h.digest()
+
+
+def compute_digest_stream(hashobj, instream):
+    """Compute the digest of all data that can be read from a stream.
+
+    :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory
+    :param instream: a bytes input stream to read the data to be hashed from
+    :return: the digest in binary form
+    :rtype: bytes
+
+    Reading is retried if the stream raises an :exc:`OSError` with
+    `EAGAIN` or `EWOULDBLOCK` or if it returns `None`; it ends with the
+    first empty read result.
+
+    """
+    hasher = hashobj()
+    retry_errnos = (errno.EAGAIN, errno.EWOULDBLOCK)
+    while True:
+        try:
+            chunk = instream.read(CHUNK_SIZE)
+        except OSError as e:
+            if e.errno in retry_errnos:
+                # no data available yet
+                continue
+            raise
+        if chunk is None:
+            # nothing was read this time
+            continue
+        if len(chunk) == 0:
+            # end of the data
+            break
+        hasher.update(chunk)
+    return hasher.digest()
+
+
+def normalize_filename(filename, strip_leading_dot_slash=False):
+    """Return `filename` with every backslash replaced by a forward slash.
+
+    :param str filename: the filename to normalize
+    :param bool strip_leading_dot_slash: also remove all leading ``./``
+                                         from the result
+    :return: the normalized filename
+    :rtype: str
+
+    """
+    normalized = filename.replace("\\", "/")
+    if not strip_leading_dot_slash:
+        return normalized
+    while normalized.startswith("./"):
+        normalized = normalized[2:]
+    return normalized
+
+
+# Run as a script: the return value of main() becomes the exit status.
+if __name__ == "__main__":
+    sys.exit(main())