view shasum.py @ 14:409dac636805 v0.1

+++++ v0.1
author Franz Glasner <fzglas.hg@dom66.de>
date Fri, 04 Dec 2020 17:41:34 +0100
parents db64e282b049
children 184ab1da1307
line wrap: on
line source

r"""
:Author:    Franz Glasner
:Copyright: (c) 2020 Franz Glasner.
            All rights reserved.
:License:   BSD 3-Clause "New" or "Revised" License.
            See :ref:`LICENSE <license>` for details.
            If you cannot find LICENSE see
            <https://opensource.org/licenses/BSD-3-Clause>
:ID:        @(#) HGid$

"""

from __future__ import print_function


__version__ = "0.1"


import argparse
import hashlib
import io
import re
import sys


# True when the running interpreter's major version is below 3 (Python 2).
PY2 = sys.version_info[0] < 3

# Number of bytes requested per read() call while hashing a stream (1 GiB).
CHUNK_SIZE = 1024 * 1024 * 1024


def main(argv=None):
    """Command line entry point: parse the options and dispatch.

    :param argv: the argument list that is handed over to
                 :mod:`argparse` as `args`
    :return: the return value of :func:`verify_digests_from_files` if
             ``--check`` has been given, the return value of
             :func:`generate_digests` otherwise

    Terminates the process with exit status 78 if text mode is requested.
    If no FILE is given a single ``-`` is used instead.

    """
    parser = argparse.ArgumentParser(
        fromfile_prefix_chars='@',
        description="Python implementation of shasum")
    add_option = parser.add_argument

    # NOTE: the options are registered in the very same order as they
    #       should appear in the generated help output
    add_option(
        "--algorithm", "-a", type=argv2algo, action="store",
        help="1 (default), 224, 256, 384, 512, 3-224, 3-256, 3-384, 3-512, blake2b, blake2s, md5")
    add_option(
        "--binary", "-b", dest="text_mode", action="store_false",
        default=False,
        help="read in binary mode (default)")
    add_option(
        "--bsd", "-B", dest="bsd", action="store_true", default=False,
        help="write BSD style output; also :command:`openssl dgst` style")
    add_option(
        "--check", "-c", action="store_true",
        help="read digests from FILEs and check them")
    add_option(
        "--reverse", "-r", dest="bsd", action="store_false", default=False,
        help="explicitely select normal coreutils style output (to be option compatible with BSD style commands and :command:`openssl dgst -r`)")
    add_option(
        "--tag", dest="bsd", action="store_true", default=False,
        help="alias for the `--bsd' option (to be compatible with :command:`b2sum`)")
    add_option(
        "--text", "-t", dest="text_mode", action="store_true",
        default=False,
        help="read in text mode (not supported)")
    add_option(
        "--version", "-v", action="version", version=__version__)
    add_option(
        "files", metavar="FILE", nargs="*")

    opts = parser.parse_args(args=argv)

    if opts.text_mode:
        print("ERROR: text mode not supported", file=sys.stderr)
        sys.exit(78)   # :manpage:`sysexits(3)` EX_CONFIG

    # No explicit algorithm on the command line: fall back to SHA1
    opts.algorithm = opts.algorithm or argv2algo("1")

    # No FILE given: use the "-" placeholder
    if not opts.files:
        opts.files.append('-')

    if not opts.check:
        return generate_digests(opts)
    return verify_digests_from_files(opts)


def _binary_stdin():
    """Return standard input as a stream that yields bytes."""
    if PY2:
        if sys.platform == "win32":
            # Python 2 on Windows: switch the stdin descriptor to
            # binary mode before reading from it
            import os, msvcrt   # noqa: E401
            msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
        return sys.stdin
    return sys.stdin.buffer


def generate_digests(opts):
    """Compute the digest of every file in `opts.files` and print it to
    standard output.

    :param opts: the parsed command line options; the attributes `bsd`,
                 `files` and `algorithm` (a tuple of digest factory and
                 output tag) are used
    :return: always 0
    :rtype: int

    A file name of ``-`` denotes standard input, regardless of its
    position within `opts.files`. Its digest is written without a real
    file name (`None` is handed to the output function).

    Errors when opening or reading a file are not handled here and
    propagate to the caller.

    """
    out = out_bsd if opts.bsd else out_std
    for fn in opts.files:
        if fn == '-':
            out(sys.stdout,
                compute_digest(opts.algorithm[0], _binary_stdin()),
                None,
                opts.algorithm[1],
                True)
        else:
            with open(fn, "rb") as source:
                out(sys.stdout,
                    compute_digest(opts.algorithm[0], source),
                    fn,
                    opts.algorithm[1],
                    True)
    return 0


def _verify_lines(opts, lines):
    """Check every digest line in `lines` and print the result of each.

    :param opts: the parsed command line options (handed through to
                 :func:`handle_checkline`)
    :param lines: an iterable of text lines of a digest file
    :return: 0 if every checked line is "ok", 1 otherwise
    :rtype: int

    """
    exit_code = 0
    for checkline in lines:
        # Lines from an iterated file keep their line terminator, so a
        # plain truth test would never detect a blank line
        if not checkline.strip():
            continue
        r, fn = handle_checkline(opts, checkline)
        print("{}: {}".format(fn, r.upper()))
        if r != "ok":
            exit_code = 1
    return exit_code


def verify_digests_from_files(opts):
    """Read digest lines from every file in `opts.files` and verify them.

    :param opts: the parsed command line options; the attribute `files`
                 is used here, everything else is handed through to
                 :func:`handle_checkline`
    :return: 0 if all digests are verified successfully, 1 otherwise
    :rtype: int
    :raises ValueError: from :func:`handle_checkline` for a non-blank
                        line that is not a properly formatted digest line

    A file name of ``-`` denotes standard input. All other files are
    read as UTF-8 encoded text. Blank and whitespace-only lines are
    skipped.

    """
    exit_code = 0
    for checkfn in opts.files:
        if checkfn == '-':
            failed = _verify_lines(opts, sys.stdin)
        else:
            with io.open(checkfn, "rt", encoding="utf-8") as checkfile:
                failed = _verify_lines(opts, checkfile)
        if failed:
            exit_code = 1
    return exit_code


def handle_checkline(opts, line):
    """Verify the single digest line `line` of a digest file.

    The line is tried as BSD style (``TAG (filename) = digest``) first;
    the algorithm is taken from the tag then. Otherwise it is tried as
    coreutils style (``digest [*| ]filename``) and the algorithm comes
    from `opts.algorithm`.

    :param opts: the parsed command line options; `opts.algorithm[0]` is
                 used for coreutils style lines
    :param str line: the line to check
    :return: a tuple with static "ok", "missing", or "failed" and the filename
    :rtype: tuple(str, str)
    :raises ValueError: if `line` matches none of the two formats

    "missing" is returned if opening or reading the file raises an
    :exc:`EnvironmentError`.

    """
    bsd_match = re.search(r"\A(\S+)\s*\((.*)\)\s*=\s*(.+)\n?\Z", line)
    if bsd_match is not None:
        tag, fn, expected = bsd_match.groups()
        algo = algotag2algotype(tag)
    else:
        std_match = re.search(r"([^\ ]+) [\*\ ]?(.+)\n?\Z", line)
        if std_match is None:
            raise ValueError(
                "improperly formatted digest line: {}".format(line))
        expected, fn = std_match.groups()
        algo = opts.algorithm[0]

    try:
        with open(fn, "rb") as stream:
            actual = compute_digest(algo, stream)
    except EnvironmentError:
        return ("missing", fn)

    # Hex digests are compared case-insensitively
    status = "ok" if actual.lower() == expected.lower() else "failed"
    return (status, fn)


# Lower-cased command line specifier -> (attribute name in :mod:`hashlib`,
# tag for output). The attribute is resolved lazily because not every
# Python version provides all of these algorithms.
_ARGV_ALGOS = {
    "1": ("sha1", "SHA1"),
    "sha1": ("sha1", "SHA1"),
    "224": ("sha224", "SHA224"),
    "sha224": ("sha224", "SHA224"),
    "256": ("sha256", "SHA256"),
    "sha256": ("sha256", "SHA256"),
    "384": ("sha384", "SHA384"),
    "sha384": ("sha384", "SHA384"),
    "512": ("sha512", "SHA512"),
    "sha512": ("sha512", "SHA512"),
    "3-224": ("sha3_224", "SHA3-224"),
    "sha3-224": ("sha3_224", "SHA3-224"),
    "3-256": ("sha3_256", "SHA3-256"),
    "sha3-256": ("sha3_256", "SHA3-256"),
    "3-384": ("sha3_384", "SHA3-384"),
    "sha3-384": ("sha3_384", "SHA3-384"),
    "3-512": ("sha3_512", "SHA3-512"),
    "sha3-512": ("sha3_512", "SHA3-512"),
    "blake2b": ("blake2b", "BLAKE2b"),
    "blake2b-512": ("blake2b", "BLAKE2b"),
    "blake2s": ("blake2s", "BLAKE2s"),
    "blake2s-256": ("blake2s", "BLAKE2s"),
    "md5": ("md5", "MD5"),
}


def argv2algo(s):
    """Convert a command line algorithm specifier into a tuple with the
    type/factory of the digest and the algorithms tag for output purposes.

    :param str s: the specifier from the command line
    :return: the internal digest specification
    :rtype: a tuple (digest_type_or_factory, name_in_output)
    :raises argparse.ArgumentTypeError: if `s` is not a recognized
            specifier or if :mod:`hashlib` of the running Python does
            not provide the selected algorithm

    String comparisons are done case-insensitively.

    """
    s = s.lower()
    spec = _ARGV_ALGOS.get(s)
    if spec is None:
        raise argparse.ArgumentTypeError(
            "`{}' is not a recognized algorithm".format(s))
    attrname, tag = spec
    # Report a missing algorithm as a proper command line error instead
    # of letting an AttributeError escape from the argument parser
    factory = getattr(hashlib, attrname, None)
    if factory is None:
        raise argparse.ArgumentTypeError(
            "algorithm `{}' is not available in this Python".format(s))
    return (factory, tag)


# Tag in a BSD-style digest file -> attribute name in :mod:`hashlib`.
# The attribute is resolved lazily because not every Python version
# provides all of these algorithms.
_ALGOTAG_ATTRS = {
    "SHA1": "sha1",
    "SHA224": "sha224",
    "SHA256": "sha256",
    "SHA384": "sha384",
    "SHA512": "sha512",
    "SHA3-224": "sha3_224",
    "SHA3-256": "sha3_256",
    "SHA3-384": "sha3_384",
    "SHA3-512": "sha3_512",
    "BLAKE2b": "blake2b",
    "BLAKE2s": "blake2s",
    "MD5": "md5",
}


def algotag2algotype(s):
    """Convert the algorithm specifier in a BSD-style digest file to the
    type/factory of the corresponding algorithm.

    :param str s: the tag (i.e. normalized name) of the algorithm
    :return: the digest type or factory for `s`
    :raises ValueError: if `s` is not a known tag or if :mod:`hashlib`
                        of the running Python does not provide the
                        algorithm

    All string comparisons are case-sensitive.

    """
    attrname = _ALGOTAG_ATTRS.get(s)
    if attrname is None:
        raise ValueError("unknown algorithm: {}".format(s))
    # A known tag whose algorithm is missing in this Python is reported
    # as ValueError too instead of leaking an AttributeError
    factory = getattr(hashlib, attrname, None)
    if factory is None:
        raise ValueError(
            "algorithm not available in this Python: {}".format(s))
    return factory


def out_bsd(dest, digest, filename, digestname, binary):
    """Write one result line in BSD format; this is also the format of
    :command:`openssl dgst` and :command:`b2sum --tag`.

    :param dest: the file object to print to
    :param digest: the digest in hex form
    :param filename: the name of the hashed file or `None`; with `None`
                     just the digest is written
    :param digestname: the algorithm tag that precedes the filename
    :param binary: not used by this format

    """
    if filename is not None:
        line = "{} ({}) = {}".format(digestname, filename, digest)
    else:
        line = digest
    print(line, file=dest)


def out_std(dest, digest, filename, digestname, binary):
    """Write one result line in coreutils format (:command:`shasum` et al.)

    :param dest: the file object to print to
    :param digest: the digest in hex form
    :param filename: the name of the hashed file or `None`; `None` is
                     written as ``-``
    :param digestname: not used by this format
    :param binary: selects the marker in front of the filename: ``*`` if
                   true, a space otherwise

    """
    if binary:
        marker = '*'
    else:
        marker = ' '
    if filename is None:
        shown_name = '-'
    else:
        shown_name = filename
    print("{} {}{}".format(digest, marker, shown_name), file=dest)


def compute_digest(hashobj, instream):
    """Hash everything that can be read from `instream`.

    The stream is consumed in pieces of at most `CHUNK_SIZE` bytes until
    a read yields a zero-length result.

    :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory
    :param instream: a bytes input stream to read the data to be hashed from
    :return: the digest in hex form
    :rtype: str

    """
    hasher = hashobj()
    chunk = instream.read(CHUNK_SIZE)
    # A `None` result of read() is neither hashed nor taken as the end of
    # the data: just read again
    while chunk is None or len(chunk) > 0:
        if chunk is not None:
            hasher.update(chunk)
        chunk = instream.read(CHUNK_SIZE)
    return hasher.hexdigest()


# Executed as a script: terminate with the status returned by main()
if __name__ == "__main__":
    raise SystemExit(main())