# HG changeset patch # User Franz Glasner # Date 1645899620 -3600 # Node ID ae2df602beb472d0ce938ac4469eb7d25cec87e6 # Parent 29fb33aa639a44f6ce94cd3534866d23e46c822b Make shasum.py and dos2unix sub-modules to the new "cutils" package diff -r 29fb33aa639a -r ae2df602beb4 _cutils.py --- a/_cutils.py Sat Feb 26 18:55:57 2022 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,15 +0,0 @@ -r""" -:Author: Franz Glasner -:Copyright: (c) 2020-2022 Franz Glasner. - All rights reserved. -:License: BSD 3-Clause "New" or "Revised" License. - See :ref:`LICENSE ` for details. - If you cannot find LICENSE see - -:ID: @(#) $HGid$ - -""" - -__version__ = "0.3.3" - -__all__ = ["__version__"] diff -r 29fb33aa639a -r ae2df602beb4 cutils/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cutils/__init__.py Sat Feb 26 19:20:20 2022 +0100 @@ -0,0 +1,18 @@ +r""" +:Author: Franz Glasner +:Copyright: (c) 2020-2022 Franz Glasner. + All rights reserved. +:License: BSD 3-Clause "New" or "Revised" License. + See :ref:`LICENSE ` for details. + If you cannot find LICENSE see + +:ID: @(#) $HGid$ + +""" + +__version__ = "0.3.3" + +__revision__ = "|VCSRevision|" +__date__ = "|VCSJustDate|" + +__all__ = ["__version__"] diff -r 29fb33aa639a -r ae2df602beb4 cutils/dos2unix.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cutils/dos2unix.py Sat Feb 26 19:20:20 2022 +0100 @@ -0,0 +1,103 @@ +r""" +:Author: Franz Glasner +:Copyright: (c) 2020-2022 Franz Glasner. + All rights reserved. +:License: BSD 3-Clause "New" or "Revised" License. + See :ref:`LICENSE ` for details. + If you cannot find LICENSE see + +:ID: @(#) $HGid$ + +""" + +from __future__ import print_function, absolute_import + +from . import (__version__, __revision__, __date__) + +import argparse +import io +import sys + + +def main(argv=None): + aparser = argparse.ArgumentParser( + description="Python implementation of dos2unix", + fromfile_prefix_chars='@') + aparser.add_argument( + "--version", "-V", action="version", + version="%s (rv:%s)" % (__version__, __revision__)) + aparser.add_argument( + "--keepdate", "-k", action="store_true", + help="Keep the date stamp of output file same as input file.") + aparser.add_argument( + "--oldfile", "-o", action="store_false", dest="newfile", default=False, + help="Old file mode. Convert the file and write output to it." + " The program defaults to run in this mode." + " Wildcard names may be used. ") + aparser.add_argument( + "--newfile", "-n", action="store_true", dest="newfile", default=False, + help="New file mode. Convert the infile and write output to outfile." + " File names must be given in pairs and wildcard names should" + " NOT be used or you WILL lose your files.") + aparser.add_argument( + "--quiet", "-q", action="store_true", + help="Quiet mode. Suppress all warning and messages.") + + aparser.add_argument( + "files", nargs="+", metavar="FILE") + + opts = aparser.parse_args(args=argv) + + if opts.keepdate: + raise NotImplementedError("--keepdate, -k") + + return dos2unix(opts) + + +def gen_opts(files=[], newfile=False, keepdate=False, quiet=True): + if keepdate: + raise NotImplementedError("--keepdate, -k") + + if newfile and (len(files) % 2): + raise ValueError("need pairs of files") + + opts = argparse.Namespace(files=files, + newfile=newfile, + keepdate=keepdate, + quiet=quiet) + return opts + + +def dos2unix(opts): + if opts.newfile: + return _convert_copy(opts) + else: + return _convert_inplace(opts) + + +def _convert_inplace(opts): + lines = [] + for filename in opts.files: + with io.open(filename, "rt", encoding="iso-8859-1") as source: + for line in source: + lines.append(line.encode("iso-8859-1")) + with open(filename, "wb") as dest: + for line in lines: + dest.write(line) + + +def _convert_copy(opts): + if len(opts.files) % 2: + print("ERROR: need pairs of files", file=sys.stderr) + return 64 # :manpage:`sysexits(3)` EX_USAGE + idx = 0 + while idx < len(opts.files): + with io.open(opts.files[idx], "rt", encoding="iso-8859-1") as source: + with open(opts.files[idx+1], "wb") as dest: + for line in source: + dest.write(line.encode("iso-8859-1")) + idx += 2 + + +if __name__ == "__main__": + sys.exit(main()) diff -r 29fb33aa639a -r ae2df602beb4 cutils/shasum.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cutils/shasum.py Sat Feb 26 19:20:20 2022 +0100 @@ -0,0 +1,588 @@ +r""" +:Author: Franz Glasner +:Copyright: (c) 2020-2022 Franz Glasner. + All rights reserved. +:License: BSD 3-Clause "New" or "Revised" License. + See :ref:`LICENSE ` for details. + If you cannot find LICENSE see + +:ID: @(#) $HGid$ + +""" + +from __future__ import print_function, absolute_import + +from . import (__version__, __revision__, __date__) + +import argparse +import base64 +import binascii +import errno +import hashlib +try: + from hmac import compare_digest +except ImportError: + compare_digest = None +import io +try: + import mmap +except ImportError: + mmap = None +import os +try: + import pathlib +except ImportError: + pathlib = None +import re +import stat +import sys + + +PY2 = sys.version_info[0] < 3 + +if PY2: + PATH_TYPES = (unicode, str) # noqa: F821 (undefined name 'unicode') +else: + if pathlib: + PATH_TYPES = (str, bytes, pathlib.Path) + else: + PATH_TYPES = (str, bytes) + +CHUNK_SIZE = 1024*1024 +MAP_CHUNK_SIZE = 64*1024*1024 + + +def main(argv=None): + aparser = argparse.ArgumentParser( + description="Python implementation of shasum", + fromfile_prefix_chars='@') + aparser.add_argument( + "--algorithm", "-a", action="store", type=argv2algo, + help="1 (default), 224, 256, 384, 512, 3-224, 3-256, 3-384, 3-512, blake2b, blake2s, md5") + aparser.add_argument( + "--base64", action="store_true", + help="Output checksums in base64 notation, not hexadecimal (OpenBSD).") + aparser.add_argument( + "--binary", "-b", action="store_false", dest="text_mode", default=False, + help="Read in binary mode (default)") + aparser.add_argument( + "--bsd", "-B", action="store_true", dest="bsd", default=False, + help="Write BSD style output. This is also the default output format of :command:`openssl dgst`.") + aparser.add_argument( + "--check", "-c", action="store_true", + help="""Read digests from FILEs and check them. +If this option is specified, the FILE options become checklists. Each +checklist should contain hash results in a supported format, which will +be verified against the specified paths. Output consists of the digest +used, the file name, and an OK, FAILED, or MISSING for the result of +the comparison. This will validate any of the supported checksums. +If no file is given, stdin is used.""") + aparser.add_argument( + "--checklist", "-C", metavar="CHECKLIST", + help="""Compare the checksum of each FILE against the checksums in +the CHECKLIST. Any specified FILE that is not listed in the CHECKLIST will +generate an error.""") + + aparser.add_argument( + "--reverse", "-r", action="store_false", dest="bsd", default=False, + help="Explicitely select normal coreutils style output (to be option compatible with BSD style commands and :command:`openssl dgst -r`)") + aparser.add_argument( + "--tag", action="store_true", dest="bsd", default=False, + help="Alias for the `--bsd' option (to be compatible with :command:`b2sum`)") + aparser.add_argument( + "--text", "-t", action="store_true", dest="text_mode", default=False, + help="Read in text mode (not supported)") + aparser.add_argument( + "--version", "-v", action="version", version="%s (rv:%s)" % (__version__, __revision__)) + aparser.add_argument( + "files", nargs="*", metavar="FILE") + + opts = aparser.parse_args(args=argv) + + if opts.text_mode: + print("ERROR: text mode not supported", file=sys.stderr) + sys.exit(78) # :manpage:`sysexits(3)` EX_CONFIG + + if opts.check and opts.checklist: + print("ERROR: only one of --check or --checklist allowed", + file=sys.stderr) + sys.exit(64) # :manpage:`sysexits(3)` EX_USAGE + + if not opts.algorithm: + opts.algorithm = argv2algo("1") + + opts.dest = None + + return shasum(opts) + + +def gen_opts(files=[], algorithm="SHA1", bsd=False, text_mode=False, + checklist=False, check=False, dest=None, base64=False): + if text_mode: + raise ValueError("text mode not supported") + if checklist and check: + raise ValueError("only one of `checklist' or `check' is allowed") + opts = argparse.Namespace(files=files, + algorithm=(algotag2algotype(algorithm), + algorithm), + bsd=bsd, + checklist=checklist, + check=check, + text_mode=False, + dest=dest, + base64=base64) + return opts + + +def shasum(opts): + if opts.check: + return verify_digests_from_files(opts) + elif opts.checklist: + return verify_digests_with_checklist(opts) + else: + return generate_digests(opts) + + +def generate_digests(opts): + if opts.bsd: + out = out_bsd + else: + out = out_std + if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'): + if PY2: + if sys.platform == "win32": + import os, msvcrt # noqa: E401 + msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY) + source = sys.stdin + else: + source = sys.stdin.buffer + out(sys.stdout, + compute_digest_stream(opts.algorithm[0], source), + None, + opts.algorithm[1], + True, + opts.base64) + else: + for fn in opts.files: + out(opts.dest or sys.stdout, + compute_digest_file(opts.algorithm[0], fn), + fn, + opts.algorithm[1], + True, + opts.base64) + return 0 + + +def compare_digests_equal(given_digest, expected_digest, algo): + """Compare a newly computed binary digest `given_digest` with a digest + string (hex or base64) in `expected_digest`. + + :param bytes given_digest: + :param expected_digest: digest (as bytes) or hexlified or base64 encoded + digest (as str) + :type expected_digest: str or bytes or bytearray + :param algo: The algorithm (factory) + :return: `True` if the digests are equal, `False` if not + :rtype: bool + + """ + if isinstance(expected_digest, (bytes, bytearray)) \ + and len(expected_digest) == algo().digest_size: + exd = expected_digest + else: + if len(expected_digest) == algo().digest_size * 2: + # hex + if re.search(r"\A[a-fA-F0-9]+\Z", expected_digest): + try: + exd = binascii.unhexlify(expected_digest) + except TypeError: + return False + else: + return False + else: + # base64 + if re.search( + r"\A(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?\Z", + expected_digest): + try: + exd = base64.b64decode(expected_digest) + except TypeError: + return False + else: + return False + if compare_digest: + return compare_digest(given_digest, exd) + else: + return given_digest == exd + + +def verify_digests_with_checklist(opts): + dest = opts.dest or sys.stdout + exit_code = 0 + if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'): + if PY2: + if sys.platform == "win32": + import os, msvcrt # noqa: E401 + msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY) + source = sys.stdin + else: + source = sys.stdin.buffer + pl = get_parsed_digest_line_from_checklist(opts.checklist, opts, None) + if pl is None: + exit_code = 1 + print("-: MISSING", file=dest) + else: + tag, algo, cl_filename, cl_digest = pl + computed_digest = compute_digest_stream(algo, source) + if compare_digests_equal(computed_digest, cl_digest, algo): + res = "OK" + else: + res = "FAILED" + exit_code = 1 + print("{}: {}: {}".format(tag, "-", res), file=dest) + else: + for fn in opts.files: + pl = get_parsed_digest_line_from_checklist(opts.checklist, opts, fn) + if pl is None: + print("{}: MISSING".format(fn), file=dest) + exit_code = 1 + else: + tag, algo, cl_filename, cl_digest = pl + computed_digest = compute_digest_file(algo, fn) + if compare_digests_equal(computed_digest, cl_digest, algo): + res = "OK" + else: + exit_code = 1 + res = "FAILED" + print("{}: {}: {}".format(tag, fn, res), file=dest) + return exit_code + + +def verify_digests_from_files(opts): + dest = opts.dest or sys.stdout + exit_code = 0 + if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'): + for checkline in sys.stdin: + if not checkline: + continue + r, fn, tag = handle_checkline(opts, checkline) + print("{}: {}: {}".format(tag, fn, r.upper()), file=dest) + if r != "ok" and exit_code == 0: + exit_code = 1 + else: + for fn in opts.files: + with io.open(fn, "rt", encoding="utf-8") as checkfile: + for checkline in checkfile: + if not checkline: + continue + r, fn, tag = handle_checkline(opts, checkline) + print("{}: {}: {}".format(tag, fn, r.upper()), file=dest) + if r != "ok" and exit_code == 0: + exit_code = 1 + return exit_code + + +def handle_checkline(opts, line): + """ + :return: a tuple with static "ok", "missing", or "failed", the filename and + the digest used + :rtype: tuple(str, str, str) + + """ + parts = parse_digest_line(opts, line) + if not parts: + raise ValueError( + "improperly formatted digest line: {}".format(line)) + tag, algo, fn, digest = parts + try: + d = compute_digest_file(algo, fn) + if compare_digests_equal(d, digest, algo): + return ("ok", fn, tag) + else: + return ("failed", fn, tag) + except EnvironmentError: + return ("missing", fn, tag) + + +def get_parsed_digest_line_from_checklist(checklist, opts, filename): + if filename is None: + filenames = ("-", "stdin", "", ) + else: + filenames = ( + normalize_filename(filename, strip_leading_dot_slash=True),) + with io.open(checklist, "rt", encoding="utf-8") as clf: + for checkline in clf: + if not checkline: + continue + parts = parse_digest_line(opts, checkline) + if not parts: + raise ValueError( + "improperly formatted digest line: {}".format(checkline)) + fn = normalize_filename(parts[2], strip_leading_dot_slash=True) + if fn in filenames: + return parts + else: + return None + + +def parse_digest_line(opts, line): + """Parse a `line` of a digest file and return its parts. + + :return: a tuple of the normalized algorithm tag, the algorithm + constructor, the filename and the hex digest; + if `line` cannot be parsed successfully `None` is returned + :rtype: tuple(str, obj, str, str) or None + + Handles coreutils and BSD-style file formats. + + """ + # determine checkfile format (BSD or coreutils) + # BSD? + mo = re.search(r"\A(\S+)\s*\((.*)\)\s*=\s*(.+)\n?\Z", line) + if mo: + # (tag, algorithm, filename, digest) + return (mo.group(1), + algotag2algotype(mo.group(1)), + mo.group(2), + mo.group(3)) + else: + # coreutils? + mo = re.search(r"([^\ ]+) [\*\ ]?(.+)\n?\Z", line) + if mo: + # (tag, algorithm, filename, digest) + return (opts.algorithm[1], + opts.algorithm[0], + mo.group(2), + mo.group(1)) + else: + return None + + +def argv2algo(s): + """Convert a command line algorithm specifier into a tuple with the + type/factory of the digest and the algorithms tag for output purposes. + + :param str s: the specifier from the commane line + :return: the internal digest specification + :rtype: a tuple (digest_type_or_factory, name_in_output) + + String comparisons are done case-insensitively. + + """ + s = s.lower() + if s in ("1", "sha1"): + return (hashlib.sha1, "SHA1") + elif s in ("224", "sha224"): + return (hashlib.sha224, "SHA224") + elif s in ("256", "sha256"): + return (hashlib.sha256, "SHA256") + elif s in ("384", "sha384"): + return (hashlib.sha384, "SHA384") + elif s in ("512", "sha512"): + return (hashlib.sha512, "SHA512") + elif s in ("3-224", "sha3-224"): + return (hashlib.sha3_224, "SHA3-224") + elif s in ("3-256", "sha3-256"): + return (hashlib.sha3_256, "SHA3-256") + elif s in ("3-384", "sha3-384"): + return (hashlib.sha3_384, "SHA3-384") + elif s in ("3-512", "sha3-512"): + return (hashlib.sha3_512, "SHA3-512") + elif s in ("blake2b", "blake2b-512"): + return (hashlib.blake2b, "BLAKE2b") + elif s in ("blake2s", "blake2s-256"): + return (hashlib.blake2s, "BLAKE2s") + elif s == "md5": + return (hashlib.md5, "MD5") + else: + raise argparse.ArgumentTypeError( + "`{}' is not a recognized algorithm".format(s)) + + +def algotag2algotype(s): + """Convert the algorithm specifier in a BSD-style digest file to the + type/factory of the corresponding algorithm. + + :param str s: the tag (i.e. normalized name) or the algorithm + :return: the digest type or factory for `s` + + All string comparisons are case-sensitive. + + """ + if s == "SHA1": + return hashlib.sha1 + elif s == "SHA224": + return hashlib.sha224 + elif s == "SHA256": + return hashlib.sha256 + elif s == "SHA384": + return hashlib.sha384 + elif s == "SHA512": + return hashlib.sha512 + elif s == "SHA3-224": + return hashlib.sha3_224 + elif s == "SHA3-256": + return hashlib.sha3_256 + elif s == "SHA3-384": + return hashlib.sha3_384 + elif s == "SHA3-512": + return hashlib.sha3_512 + elif s == "BLAKE2b": + return hashlib.blake2b + elif s == "BLAKE2s": + return hashlib.blake2s + elif s == "MD5": + return hashlib.md5 + else: + raise ValueError("unknown algorithm: {}".format(s)) + + +def out_bsd(dest, digest, filename, digestname, binary, use_base64): + """BSD format output, also :command:`openssl dgst` and + :command:`b2sum --tag" format output + + """ + if use_base64: + digest = base64.b64encode(digest).decode("ascii") + else: + digest = binascii.hexlify(digest).decode("ascii") + if filename is None: + print(digest, file=dest) + else: + print("{} ({}) = {}".format(digestname, + normalize_filename(filename), + digest), + file=dest) + + +def out_std(dest, digest, filename, digestname, binary, use_base64): + """Coreutils format (:command:`shasum` et al.) + + """ + if use_base64: + digest = base64.b64encode(digest).decode("ascii") + else: + digest = binascii.hexlify(digest).decode("ascii") + print("{} {}{}".format( + digest, + '*' if binary else ' ', + '-' if filename is None else normalize_filename(filename)), + file=dest) + + +def compute_digest_file(hashobj, path, use_mmap=True): + """ + :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory + :param path: filename within the filesystem or a file descriptor opened in + binary mode (also a socket or pipe) + :param bool use_mmap: use the :mod:`mmap` module if available + :return: the digest in binary form + :rtype: bytes + + If a file descriptor is given is must support :func:`os.read`. + + """ + h = hashobj() + if isinstance(path, PATH_TYPES): + flags = os.O_RDONLY | getattr(os, "O_BINARY", 0) \ + | getattr(os, "O_SEQUENTIAL", 0) | getattr(os, "O_NOCTTY", 0) + fd = os.open(path, flags) + own_fd = True + else: + fd = path + own_fd = False + try: + try: + st = os.fstat(fd) + except TypeError: + # + # "fd" is most probably a Python socket object. + # (a pipe typically supports fstat) + # + use_mmap = False + else: + if stat.S_ISREG(st[stat.ST_MODE]): + filesize = st[stat.ST_SIZE] + else: + use_mmap = False + if mmap is None or not use_mmap: + # No mmmap available -> use traditional low-level file IO + while True: + try: + buf = os.read(fd, CHUNK_SIZE) + except OSError as e: + if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK): + raise + else: + if len(buf) == 0: + break + h.update(buf) + else: + # + # Use mmap + # + # NOTE: On Windows mmapped files with length 0 are not supported. + # So ensure to not call mmap.mmap() if the file size is 0. + # + madvise = getattr(mmap.mmap, "madvise", None) + if filesize < MAP_CHUNK_SIZE: + mapsize = filesize + else: + mapsize = MAP_CHUNK_SIZE + mapoffset = 0 + rest = filesize + while rest > 0: + m = mmap.mmap(fd, + mapsize, + access=mmap.ACCESS_READ, + offset=mapoffset) + if madvise: + madvise(m, mmap.MADV_SEQUENTIAL) + try: + h.update(m) + finally: + m.close() + rest -= mapsize + mapoffset += mapsize + if rest < mapsize: + mapsize = rest + finally: + if own_fd: + os.close(fd) + return h.digest() + + +def compute_digest_stream(hashobj, instream): + """ + + :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory + :param instream: a bytes input stream to read the data to be hashed from + :return: the digest in binary form + :rtype: bytes + + """ + h = hashobj() + while True: + try: + buf = instream.read(CHUNK_SIZE) + except OSError as e: + if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK): + raise + else: + if buf is not None: + if len(buf) == 0: + break + h.update(buf) + return h.digest() + + +def normalize_filename(filename, strip_leading_dot_slash=False): + filename = filename.replace("\\", "/") + if strip_leading_dot_slash: + while filename.startswith("./"): + filename = filename[2:] + return filename + + +if __name__ == "__main__": + sys.exit(main()) diff -r 29fb33aa639a -r ae2df602beb4 dos2unix.py --- a/dos2unix.py Sat Feb 26 18:55:57 2022 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,111 +0,0 @@ -r""" -:Author: Franz Glasner -:Copyright: (c) 2020-2022 Franz Glasner. - All rights reserved. -:License: BSD 3-Clause "New" or "Revised" License. - See :ref:`LICENSE ` for details. - If you cannot find LICENSE see - -:ID: @(#) $HGid$ - -""" - -from __future__ import print_function - - -try: - from _cutils import __version__ -except ImportError: - __version__ = "unknown" - -__revision__ = "|VCSRevision|" -__date__ = "|VCSJustDate|" - - -import argparse -import io -import sys - - -def main(argv=None): - aparser = argparse.ArgumentParser( - description="Python implementation of dos2unix", - fromfile_prefix_chars='@') - aparser.add_argument( - "--version", "-V", action="version", - version="%s (rv:%s)" % (__version__, __revision__)) - aparser.add_argument( - "--keepdate", "-k", action="store_true", - help="Keep the date stamp of output file same as input file.") - aparser.add_argument( - "--oldfile", "-o", action="store_false", dest="newfile", default=False, - help="Old file mode. Convert the file and write output to it." - " The program defaults to run in this mode." - " Wildcard names may be used. ") - aparser.add_argument( - "--newfile", "-n", action="store_true", dest="newfile", default=False, - help="New file mode. Convert the infile and write output to outfile." - " File names must be given in pairs and wildcard names should" - " NOT be used or you WILL lose your files.") - aparser.add_argument( - "--quiet", "-q", action="store_true", - help="Quiet mode. Suppress all warning and messages.") - - aparser.add_argument( - "files", nargs="+", metavar="FILE") - - opts = aparser.parse_args(args=argv) - - if opts.keepdate: - raise NotImplementedError("--keepdate, -k") - - return dos2unix(opts) - - -def gen_opts(files=[], newfile=False, keepdate=False, quiet=True): - if keepdate: - raise NotImplementedError("--keepdate, -k") - - if newfile and (len(files) % 2): - raise ValueError("need pairs of files") - - opts = argparse.Namespace(files=files, - newfile=newfile, - keepdate=keepdate, - quiet=quiet) - return opts - - -def dos2unix(opts): - if opts.newfile: - return _convert_copy(opts) - else: - return _convert_inplace(opts) - - -def _convert_inplace(opts): - lines = [] - for filename in opts.files: - with io.open(filename, "rt", encoding="iso-8859-1") as source: - for line in source: - lines.append(line.encode("iso-8859-1")) - with open(filename, "wb") as dest: - for line in lines: - dest.write(line) - - -def _convert_copy(opts): - if len(opts.files) % 2: - print("ERROR: need pairs of files", file=sys.stderr) - return 64 # :manpage:`sysexits(3)` EX_USAGE - idx = 0 - while idx < len(opts.files): - with io.open(opts.files[idx], "rt", encoding="iso-8859-1") as source: - with open(opts.files[idx+1], "wb") as dest: - for line in source: - dest.write(line.encode("iso-8859-1")) - idx += 2 - - -if __name__ == "__main__": - sys.exit(main()) diff -r 29fb33aa639a -r ae2df602beb4 setup.py --- a/setup.py Sat Feb 26 18:55:57 2022 +0100 +++ b/setup.py Sat Feb 26 19:20:20 2022 +0100 @@ -19,7 +19,7 @@ _version_re = re.compile(br"^\s*__version__\s*=\s*(\"|')(.*)\1\s*(#.*)?$", re.MULTILINE) -with open(os.path.join(pkg_root, "_cutils.py"), "rb") as vf: +with open(os.path.join(pkg_root, "cutils", "__init__.py"), "rb") as vf: version = _version_re.search(vf.read()).group(2).decode("utf-8") with open(os.path.join(pkg_root, "README.txt"), "rt") as rf: @@ -33,9 +33,7 @@ url="https://pypi.dom66.de/simple/py-cutils/", description="Pure Python implementation of some coreutils", long_description=long_description, - py_modules=["_cutils", - "dos2unix", - "shasum", ], + packages=["cutils",], include_package_data=False, zip_safe=True, platforms="any", @@ -55,8 +53,8 @@ python_requires=">=2.7", entry_points={ "console_scripts": [ - "py-dos2unix=dos2unix:main", - "py-shasum=shasum:main", + "py-dos2unix=cutils.dos2unix:main", + "py-shasum=cutils.shasum:main", ] } ) diff -r 29fb33aa639a -r ae2df602beb4 shasum.py --- a/shasum.py Sat Feb 26 18:55:57 2022 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,596 +0,0 @@ -r""" -:Author: Franz Glasner -:Copyright: (c) 2020-2022 Franz Glasner. - All rights reserved. -:License: BSD 3-Clause "New" or "Revised" License. - See :ref:`LICENSE ` for details. - If you cannot find LICENSE see - -:ID: @(#) $HGid$ - -""" - -from __future__ import print_function - - -try: - from _cutils import __version__ -except ImportError: - __version__ = "unknown" - -__revision__ = "|VCSRevision|" -__date__ = "|VCSJustDate|" - - -import argparse -import base64 -import binascii -import errno -import hashlib -try: - from hmac import compare_digest -except ImportError: - compare_digest = None -import io -try: - import mmap -except ImportError: - mmap = None -import os -try: - import pathlib -except ImportError: - pathlib = None -import re -import stat -import sys - - -PY2 = sys.version_info[0] < 3 - -if PY2: - PATH_TYPES = (unicode, str) # noqa: F821 (undefined name 'unicode') -else: - if pathlib: - PATH_TYPES = (str, bytes, pathlib.Path) - else: - PATH_TYPES = (str, bytes) - -CHUNK_SIZE = 1024*1024 -MAP_CHUNK_SIZE = 64*1024*1024 - - -def main(argv=None): - aparser = argparse.ArgumentParser( - description="Python implementation of shasum", - fromfile_prefix_chars='@') - aparser.add_argument( - "--algorithm", "-a", action="store", type=argv2algo, - help="1 (default), 224, 256, 384, 512, 3-224, 3-256, 3-384, 3-512, blake2b, blake2s, md5") - aparser.add_argument( - "--base64", action="store_true", - help="Output checksums in base64 notation, not hexadecimal (OpenBSD).") - aparser.add_argument( - "--binary", "-b", action="store_false", dest="text_mode", default=False, - help="Read in binary mode (default)") - aparser.add_argument( - "--bsd", "-B", action="store_true", dest="bsd", default=False, - help="Write BSD style output. This is also the default output format of :command:`openssl dgst`.") - aparser.add_argument( - "--check", "-c", action="store_true", - help="""Read digests from FILEs and check them. -If this option is specified, the FILE options become checklists. Each -checklist should contain hash results in a supported format, which will -be verified against the specified paths. Output consists of the digest -used, the file name, and an OK, FAILED, or MISSING for the result of -the comparison. This will validate any of the supported checksums. -If no file is given, stdin is used.""") - aparser.add_argument( - "--checklist", "-C", metavar="CHECKLIST", - help="""Compare the checksum of each FILE against the checksums in -the CHECKLIST. Any specified FILE that is not listed in the CHECKLIST will -generate an error.""") - - aparser.add_argument( - "--reverse", "-r", action="store_false", dest="bsd", default=False, - help="Explicitely select normal coreutils style output (to be option compatible with BSD style commands and :command:`openssl dgst -r`)") - aparser.add_argument( - "--tag", action="store_true", dest="bsd", default=False, - help="Alias for the `--bsd' option (to be compatible with :command:`b2sum`)") - aparser.add_argument( - "--text", "-t", action="store_true", dest="text_mode", default=False, - help="Read in text mode (not supported)") - aparser.add_argument( - "--version", "-v", action="version", version="%s (rv:%s)" % (__version__, __revision__)) - aparser.add_argument( - "files", nargs="*", metavar="FILE") - - opts = aparser.parse_args(args=argv) - - if opts.text_mode: - print("ERROR: text mode not supported", file=sys.stderr) - sys.exit(78) # :manpage:`sysexits(3)` EX_CONFIG - - if opts.check and opts.checklist: - print("ERROR: only one of --check or --checklist allowed", - file=sys.stderr) - sys.exit(64) # :manpage:`sysexits(3)` EX_USAGE - - if not opts.algorithm: - opts.algorithm = argv2algo("1") - - opts.dest = None - - return shasum(opts) - - -def gen_opts(files=[], algorithm="SHA1", bsd=False, text_mode=False, - checklist=False, check=False, dest=None, base64=False): - if text_mode: - raise ValueError("text mode not supported") - if checklist and check: - raise ValueError("only one of `checklist' or `check' is allowed") - opts = argparse.Namespace(files=files, - algorithm=(algotag2algotype(algorithm), - algorithm), - bsd=bsd, - checklist=checklist, - check=check, - text_mode=False, - dest=dest, - base64=base64) - return opts - - -def shasum(opts): - if opts.check: - return verify_digests_from_files(opts) - elif opts.checklist: - return verify_digests_with_checklist(opts) - else: - return generate_digests(opts) - - -def generate_digests(opts): - if opts.bsd: - out = out_bsd - else: - out = out_std - if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'): - if PY2: - if sys.platform == "win32": - import os, msvcrt # noqa: E401 - msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY) - source = sys.stdin - else: - source = sys.stdin.buffer - out(sys.stdout, - compute_digest_stream(opts.algorithm[0], source), - None, - opts.algorithm[1], - True, - opts.base64) - else: - for fn in opts.files: - out(opts.dest or sys.stdout, - compute_digest_file(opts.algorithm[0], fn), - fn, - opts.algorithm[1], - True, - opts.base64) - return 0 - - -def compare_digests_equal(given_digest, expected_digest, algo): - """Compare a newly computed binary digest `given_digest` with a digest - string (hex or base64) in `expected_digest`. - - :param bytes given_digest: - :param expected_digest: digest (as bytes) or hexlified or base64 encoded - digest (as str) - :type expected_digest: str or bytes or bytearray - :param algo: The algorithm (factory) - :return: `True` if the digests are equal, `False` if not - :rtype: bool - - """ - if isinstance(expected_digest, (bytes, bytearray)) \ - and len(expected_digest) == algo().digest_size: - exd = expected_digest - else: - if len(expected_digest) == algo().digest_size * 2: - # hex - if re.search(r"\A[a-fA-F0-9]+\Z", expected_digest): - try: - exd = binascii.unhexlify(expected_digest) - except TypeError: - return False - else: - return False - else: - # base64 - if re.search( - r"\A(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?\Z", - expected_digest): - try: - exd = base64.b64decode(expected_digest) - except TypeError: - return False - else: - return False - if compare_digest: - return compare_digest(given_digest, exd) - else: - return given_digest == exd - - -def verify_digests_with_checklist(opts): - dest = opts.dest or sys.stdout - exit_code = 0 - if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'): - if PY2: - if sys.platform == "win32": - import os, msvcrt # noqa: E401 - msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY) - source = sys.stdin - else: - source = sys.stdin.buffer - pl = get_parsed_digest_line_from_checklist(opts.checklist, opts, None) - if pl is None: - exit_code = 1 - print("-: MISSING", file=dest) - else: - tag, algo, cl_filename, cl_digest = pl - computed_digest = compute_digest_stream(algo, source) - if compare_digests_equal(computed_digest, cl_digest, algo): - res = "OK" - else: - res = "FAILED" - exit_code = 1 - print("{}: {}: {}".format(tag, "-", res), file=dest) - else: - for fn in opts.files: - pl = get_parsed_digest_line_from_checklist(opts.checklist, opts, fn) - if pl is None: - print("{}: MISSING".format(fn), file=dest) - exit_code = 1 - else: - tag, algo, cl_filename, cl_digest = pl - computed_digest = compute_digest_file(algo, fn) - if compare_digests_equal(computed_digest, cl_digest, algo): - res = "OK" - else: - exit_code = 1 - res = "FAILED" - print("{}: {}: {}".format(tag, fn, res), file=dest) - return exit_code - - -def verify_digests_from_files(opts): - dest = opts.dest or sys.stdout - exit_code = 0 - if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'): - for checkline in sys.stdin: - if not checkline: - continue - r, fn, tag = handle_checkline(opts, checkline) - print("{}: {}: {}".format(tag, fn, r.upper()), file=dest) - if r != "ok" and exit_code == 0: - exit_code = 1 - else: - for fn in opts.files: - with io.open(fn, "rt", encoding="utf-8") as checkfile: - for checkline in checkfile: - if not checkline: - continue - r, fn, tag = handle_checkline(opts, checkline) - print("{}: {}: {}".format(tag, fn, r.upper()), file=dest) - if r != "ok" and exit_code == 0: - exit_code = 1 - return exit_code - - -def handle_checkline(opts, line): - """ - :return: a tuple with static "ok", "missing", or "failed", the filename and - the digest used - :rtype: tuple(str, str, str) - - """ - parts = parse_digest_line(opts, line) - if not parts: - raise ValueError( - "improperly formatted digest line: {}".format(line)) - tag, algo, fn, digest = parts - try: - d = compute_digest_file(algo, fn) - if compare_digests_equal(d, digest, algo): - return ("ok", fn, tag) - else: - return ("failed", fn, tag) - except EnvironmentError: - return ("missing", fn, tag) - - -def get_parsed_digest_line_from_checklist(checklist, opts, filename): - if filename is None: - filenames = ("-", "stdin", "", ) - else: - filenames = ( - normalize_filename(filename, strip_leading_dot_slash=True),) - with io.open(checklist, "rt", encoding="utf-8") as clf: - for checkline in clf: - if not checkline: - continue - parts = parse_digest_line(opts, checkline) - if not parts: - raise ValueError( - "improperly formatted digest line: {}".format(checkline)) - fn = normalize_filename(parts[2], strip_leading_dot_slash=True) - if fn in filenames: - return parts - else: - return None - - -def parse_digest_line(opts, line): - """Parse a `line` of a digest file and return its parts. - - :return: a tuple of the normalized algorithm tag, the algorithm - constructor, the filename and the hex digest; - if `line` cannot be parsed successfully `None` is returned - :rtype: tuple(str, obj, str, str) or None - - Handles coreutils and BSD-style file formats. - - """ - # determine checkfile format (BSD or coreutils) - # BSD? - mo = re.search(r"\A(\S+)\s*\((.*)\)\s*=\s*(.+)\n?\Z", line) - if mo: - # (tag, algorithm, filename, digest) - return (mo.group(1), - algotag2algotype(mo.group(1)), - mo.group(2), - mo.group(3)) - else: - # coreutils? - mo = re.search(r"([^\ ]+) [\*\ ]?(.+)\n?\Z", line) - if mo: - # (tag, algorithm, filename, digest) - return (opts.algorithm[1], - opts.algorithm[0], - mo.group(2), - mo.group(1)) - else: - return None - - -def argv2algo(s): - """Convert a command line algorithm specifier into a tuple with the - type/factory of the digest and the algorithms tag for output purposes. - - :param str s: the specifier from the commane line - :return: the internal digest specification - :rtype: a tuple (digest_type_or_factory, name_in_output) - - String comparisons are done case-insensitively. - - """ - s = s.lower() - if s in ("1", "sha1"): - return (hashlib.sha1, "SHA1") - elif s in ("224", "sha224"): - return (hashlib.sha224, "SHA224") - elif s in ("256", "sha256"): - return (hashlib.sha256, "SHA256") - elif s in ("384", "sha384"): - return (hashlib.sha384, "SHA384") - elif s in ("512", "sha512"): - return (hashlib.sha512, "SHA512") - elif s in ("3-224", "sha3-224"): - return (hashlib.sha3_224, "SHA3-224") - elif s in ("3-256", "sha3-256"): - return (hashlib.sha3_256, "SHA3-256") - elif s in ("3-384", "sha3-384"): - return (hashlib.sha3_384, "SHA3-384") - elif s in ("3-512", "sha3-512"): - return (hashlib.sha3_512, "SHA3-512") - elif s in ("blake2b", "blake2b-512"): - return (hashlib.blake2b, "BLAKE2b") - elif s in ("blake2s", "blake2s-256"): - return (hashlib.blake2s, "BLAKE2s") - elif s == "md5": - return (hashlib.md5, "MD5") - else: - raise argparse.ArgumentTypeError( - "`{}' is not a recognized algorithm".format(s)) - - -def algotag2algotype(s): - """Convert the algorithm specifier in a BSD-style digest file to the - type/factory of the corresponding algorithm. - - :param str s: the tag (i.e. normalized name) or the algorithm - :return: the digest type or factory for `s` - - All string comparisons are case-sensitive. - - """ - if s == "SHA1": - return hashlib.sha1 - elif s == "SHA224": - return hashlib.sha224 - elif s == "SHA256": - return hashlib.sha256 - elif s == "SHA384": - return hashlib.sha384 - elif s == "SHA512": - return hashlib.sha512 - elif s == "SHA3-224": - return hashlib.sha3_224 - elif s == "SHA3-256": - return hashlib.sha3_256 - elif s == "SHA3-384": - return hashlib.sha3_384 - elif s == "SHA3-512": - return hashlib.sha3_512 - elif s == "BLAKE2b": - return hashlib.blake2b - elif s == "BLAKE2s": - return hashlib.blake2s - elif s == "MD5": - return hashlib.md5 - else: - raise ValueError("unknown algorithm: {}".format(s)) - - -def out_bsd(dest, digest, filename, digestname, binary, use_base64): - """BSD format output, also :command:`openssl dgst` and - :command:`b2sum --tag" format output - - """ - if use_base64: - digest = base64.b64encode(digest).decode("ascii") - else: - digest = binascii.hexlify(digest).decode("ascii") - if filename is None: - print(digest, file=dest) - else: - print("{} ({}) = {}".format(digestname, - normalize_filename(filename), - digest), - file=dest) - - -def out_std(dest, digest, filename, digestname, binary, use_base64): - """Coreutils format (:command:`shasum` et al.) - - """ - if use_base64: - digest = base64.b64encode(digest).decode("ascii") - else: - digest = binascii.hexlify(digest).decode("ascii") - print("{} {}{}".format( - digest, - '*' if binary else ' ', - '-' if filename is None else normalize_filename(filename)), - file=dest) - - -def compute_digest_file(hashobj, path, use_mmap=True): - """ - :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory - :param path: filename within the filesystem or a file descriptor opened in - binary mode (also a socket or pipe) - :param bool use_mmap: use the :mod:`mmap` module if available - :return: the digest in binary form - :rtype: bytes - - If a file descriptor is given is must support :func:`os.read`. - - """ - h = hashobj() - if isinstance(path, PATH_TYPES): - flags = os.O_RDONLY | getattr(os, "O_BINARY", 0) \ - | getattr(os, "O_SEQUENTIAL", 0) | getattr(os, "O_NOCTTY", 0) - fd = os.open(path, flags) - own_fd = True - else: - fd = path - own_fd = False - try: - try: - st = os.fstat(fd) - except TypeError: - # - # "fd" is most probably a Python socket object. - # (a pipe typically supports fstat) - # - use_mmap = False - else: - if stat.S_ISREG(st[stat.ST_MODE]): - filesize = st[stat.ST_SIZE] - else: - use_mmap = False - if mmap is None or not use_mmap: - # No mmmap available -> use traditional low-level file IO - while True: - try: - buf = os.read(fd, CHUNK_SIZE) - except OSError as e: - if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK): - raise - else: - if len(buf) == 0: - break - h.update(buf) - else: - # - # Use mmap - # - # NOTE: On Windows mmapped files with length 0 are not supported. - # So ensure to not call mmap.mmap() if the file size is 0. - # - madvise = getattr(mmap.mmap, "madvise", None) - if filesize < MAP_CHUNK_SIZE: - mapsize = filesize - else: - mapsize = MAP_CHUNK_SIZE - mapoffset = 0 - rest = filesize - while rest > 0: - m = mmap.mmap(fd, - mapsize, - access=mmap.ACCESS_READ, - offset=mapoffset) - if madvise: - madvise(m, mmap.MADV_SEQUENTIAL) - try: - h.update(m) - finally: - m.close() - rest -= mapsize - mapoffset += mapsize - if rest < mapsize: - mapsize = rest - finally: - if own_fd: - os.close(fd) - return h.digest() - - -def compute_digest_stream(hashobj, instream): - """ - - :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory - :param instream: a bytes input stream to read the data to be hashed from - :return: the digest in binary form - :rtype: bytes - - """ - h = hashobj() - while True: - try: - buf = instream.read(CHUNK_SIZE) - except OSError as e: - if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK): - raise - else: - if buf is not None: - if len(buf) == 0: - break - h.update(buf) - return h.digest() - - -def normalize_filename(filename, strip_leading_dot_slash=False): - filename = filename.replace("\\", "/") - if strip_leading_dot_slash: - while filename.startswith("./"): - filename = filename[2:] - return filename - - -if __name__ == "__main__": - sys.exit(main())