comparison cutils/shasum.py @ 72:ae2df602beb4

Make shasum.py and dos2unix sub-modules to the new "cutils" package
author Franz Glasner <fzglas.hg@dom66.de>
date Sat, 26 Feb 2022 19:20:20 +0100
parents shasum.py@29fb33aa639a
children c3268f4e752f
comparison
equal deleted inserted replaced
71:29fb33aa639a 72:ae2df602beb4
1 r"""
2 :Author: Franz Glasner
3 :Copyright: (c) 2020-2022 Franz Glasner.
4 All rights reserved.
5 :License: BSD 3-Clause "New" or "Revised" License.
6 See :ref:`LICENSE <license>` for details.
7 If you cannot find LICENSE see
8 <https://opensource.org/licenses/BSD-3-Clause>
9 :ID: @(#) $HGid$
10
11 """
12
13 from __future__ import print_function, absolute_import
14
15 from . import (__version__, __revision__, __date__)
16
17 import argparse
18 import base64
19 import binascii
20 import errno
21 import hashlib
22 try:
23 from hmac import compare_digest
24 except ImportError:
25 compare_digest = None
26 import io
27 try:
28 import mmap
29 except ImportError:
30 mmap = None
31 import os
32 try:
33 import pathlib
34 except ImportError:
35 pathlib = None
36 import re
37 import stat
38 import sys
39
40
# True when the running interpreter has a major version below 3.
PY2 = sys.version_info[0] < 3

# Types that compute_digest_file() accepts as a filesystem path and opens
# itself; any other value is used there as an already opened descriptor.
if PY2:
    PATH_TYPES = (unicode, str)    # noqa: F821 (undefined name 'unicode')
else:
    if pathlib:
        PATH_TYPES = (str, bytes, pathlib.Path)
    else:
        # the optional `pathlib' import failed
        PATH_TYPES = (str, bytes)

# Number of bytes requested per read call when hashing without mmap.
CHUNK_SIZE = 1024*1024
# Largest window (in bytes) that compute_digest_file() maps at once.
MAP_CHUNK_SIZE = 64*1024*1024
53
54
def main(argv=None):
    """Command line entry point: parse the options and run :func:`shasum`.

    :param argv: the argument list handed to :mod:`argparse` as `args`;
                 `None` leaves the choice of arguments to :mod:`argparse`
    :return: the return value of :func:`shasum`

    Terminates the process via :func:`sys.exit` with code 78 if text
    mode is requested and with code 64 if both ``--check`` and
    ``--checklist`` are given.

    """
    aparser = argparse.ArgumentParser(
        description="Python implementation of shasum",
        fromfile_prefix_chars='@')
    aparser.add_argument(
        "--algorithm", "-a", action="store", type=argv2algo,
        help="1 (default), 224, 256, 384, 512, 3-224, 3-256, 3-384, 3-512, blake2b, blake2s, md5")
    aparser.add_argument(
        "--base64", action="store_true",
        help="Output checksums in base64 notation, not hexadecimal (OpenBSD).")
    aparser.add_argument(
        "--binary", "-b", action="store_false", dest="text_mode", default=False,
        help="Read in binary mode (default)")
    aparser.add_argument(
        "--bsd", "-B", action="store_true", dest="bsd", default=False,
        help="Write BSD style output. This is also the default output format of :command:`openssl dgst`.")
    aparser.add_argument(
        "--check", "-c", action="store_true",
        help="""Read digests from FILEs and check them.
If this option is specified, the FILE options become checklists. Each
checklist should contain hash results in a supported format, which will
be verified against the specified paths. Output consists of the digest
used, the file name, and an OK, FAILED, or MISSING for the result of
the comparison. This will validate any of the supported checksums.
If no file is given, stdin is used.""")
    aparser.add_argument(
        "--checklist", "-C", metavar="CHECKLIST",
        help="""Compare the checksum of each FILE against the checksums in
the CHECKLIST. Any specified FILE that is not listed in the CHECKLIST will
generate an error.""")

    # --reverse and --tag share the `bsd' destination with --bsd
    aparser.add_argument(
        "--reverse", "-r", action="store_false", dest="bsd", default=False,
        help="Explicitely select normal coreutils style output (to be option compatible with BSD style commands and :command:`openssl dgst -r`)")
    aparser.add_argument(
        "--tag", action="store_true", dest="bsd", default=False,
        help="Alias for the `--bsd' option (to be compatible with :command:`b2sum`)")
    aparser.add_argument(
        "--text", "-t", action="store_true", dest="text_mode", default=False,
        help="Read in text mode (not supported)")
    aparser.add_argument(
        "--version", "-v", action="version", version="%s (rv:%s)" % (__version__, __revision__))
    aparser.add_argument(
        "files", nargs="*", metavar="FILE")

    opts = aparser.parse_args(args=argv)

    # The option is accepted for compatibility only; using it is an error.
    if opts.text_mode:
        print("ERROR: text mode not supported", file=sys.stderr)
        sys.exit(78)   # :manpage:`sysexits(3)` EX_CONFIG

    if opts.check and opts.checklist:
        print("ERROR: only one of --check or --checklist allowed",
              file=sys.stderr)
        sys.exit(64)   # :manpage:`sysexits(3)` EX_USAGE

    # No algorithm given: fall back to the specifier "1" (SHA1)
    if not opts.algorithm:
        opts.algorithm = argv2algo("1")

    # No explicit output stream: the workers fall back to sys.stdout
    opts.dest = None

    return shasum(opts)
117
118
def gen_opts(files=None, algorithm="SHA1", bsd=False, text_mode=False,
             checklist=False, check=False, dest=None, base64=False):
    """Build an options object for :func:`shasum` without using the
    command line parser.

    :param files: the files to handle; `None` (the default) yields a new
                  empty list for every call
    :param str algorithm: the algorithm tag as understood by
                          :func:`algotag2algotype`
    :param bool bsd: select BSD style output
    :param bool text_mode: must be false because text mode is not supported
    :param checklist: the name of a checklist file or a false value
    :param bool check: treat `files` as checklists and verify them
    :param dest: the output stream or `None`
    :param bool base64: output digests base64 encoded instead of hexlified
    :return: the options with the same attributes that :func:`main` provides
    :rtype: argparse.Namespace
    :raises ValueError: if `text_mode` is true or if both `checklist` and
                        `check` are given

    """
    if text_mode:
        raise ValueError("text mode not supported")
    if checklist and check:
        raise ValueError("only one of `checklist' or `check' is allowed")
    # A fresh list per call: a list literal as default value would be
    # shared by all namespaces that were created without `files`.
    if files is None:
        files = []
    opts = argparse.Namespace(files=files,
                              algorithm=(algotag2algotype(algorithm),
                                         algorithm),
                              bsd=bsd,
                              checklist=checklist,
                              check=check,
                              text_mode=False,
                              dest=dest,
                              base64=base64)
    return opts
135
136
def shasum(opts):
    """Run the operation selected in `opts` and return its result.

    ``opts.check`` takes precedence over ``opts.checklist``; if neither
    is set new digests are generated.

    :param opts: the options as built by :func:`main` or :func:`gen_opts`
    :return: the return value of the selected worker function

    """
    if opts.check:
        worker = verify_digests_from_files
    elif opts.checklist:
        worker = verify_digests_with_checklist
    else:
        worker = generate_digests
    return worker(opts)
144
145
def generate_digests(opts):
    """Compute the digests of all inputs in `opts` and print them.

    Standard input is hashed if no file is given or if the only file
    is ``-``. Output goes to ``opts.dest`` if it is set and to
    :data:`sys.stdout` otherwise -- for standard input as well as for
    named files.

    :param opts: the options as built by :func:`main` or :func:`gen_opts`
    :return: always 0
    :rtype: int

    """
    if opts.bsd:
        out = out_bsd
    else:
        out = out_std
    # One destination for both branches. `getattr` keeps option objects
    # working that have no `dest` attribute and only hash stdin.
    dest = getattr(opts, "dest", None) or sys.stdout
    if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'):
        if PY2:
            if sys.platform == "win32":
                # Switch stdin to binary mode before reading from it
                import msvcrt
                msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
            source = sys.stdin
        else:
            # The bytes stream below the text layer
            source = sys.stdin.buffer
        out(dest,
            compute_digest_stream(opts.algorithm[0], source),
            None,
            opts.algorithm[1],
            True,
            opts.base64)
    else:
        for fn in opts.files:
            out(dest,
                compute_digest_file(opts.algorithm[0], fn),
                fn,
                opts.algorithm[1],
                True,
                opts.base64)
    return 0
174
175
def compare_digests_equal(given_digest, expected_digest, algo):
    """Compare a newly computed binary digest `given_digest` with the
    digest in `expected_digest`.

    :param bytes given_digest: the computed digest in binary form
    :param expected_digest: either the binary digest with exactly the
                            digest size of `algo` or the hexlified or
                            base64 encoded digest; an encoded digest may
                            be given as text or as its ASCII bytes
    :type expected_digest: str or bytes or bytearray
    :param algo: The algorithm (factory)
    :return: `True` if the digests are equal, `False` if not; also `False`
             if `expected_digest` cannot be decoded
    :rtype: bool

    """
    digest_size = algo().digest_size
    if isinstance(expected_digest, (bytes, bytearray)) \
            and len(expected_digest) == digest_size:
        exd = expected_digest
    else:
        if isinstance(expected_digest, (bytes, bytearray)) \
                and not isinstance(expected_digest, str):
            # Not a raw digest: must be the ASCII form of an encoded one.
            # The patterns below are text patterns and would raise a
            # TypeError for bytes-like input.
            try:
                expected_digest = expected_digest.decode("ascii")
            except ValueError:
                return False
        if len(expected_digest) == digest_size * 2:
            # hex
            if not re.search(r"\A[a-fA-F0-9]+\Z", expected_digest):
                return False
            decode = binascii.unhexlify
        else:
            # base64
            if not re.search(
                    r"\A(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?\Z",
                    expected_digest):
                return False
            decode = base64.b64decode
        try:
            exd = decode(expected_digest)
        except (TypeError, ValueError, binascii.Error):
            # Depending on the Python version a decoding problem is
            # reported as TypeError or as binascii.Error
            return False
    if compare_digest:
        return compare_digest(given_digest, exd)
    else:
        return given_digest == exd
217
218
def verify_digests_with_checklist(opts):
    """Check every input against its entry in the checklist
    ``opts.checklist`` and print one result line per input.

    Standard input is checked if no file is given or if the only file
    is ``-``. An input without a checklist entry is reported as MISSING,
    a digest mismatch as FAILED.

    :param opts: the options as built by :func:`main` or :func:`gen_opts`
    :return: 0 if every input is OK, 1 otherwise
    :rtype: int

    """
    out = opts.dest or sys.stdout
    status = 0
    if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'):
        if not PY2:
            stream = sys.stdin.buffer
        else:
            if sys.platform == "win32":
                import os
                import msvcrt
                msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
            stream = sys.stdin
        entry = get_parsed_digest_line_from_checklist(
            opts.checklist, opts, None)
        if entry is None:
            status = 1
            print("-: MISSING", file=out)
            return status
        tag, algo, _listed_name, expected = entry
        actual = compute_digest_stream(algo, stream)
        matches = compare_digests_equal(actual, expected, algo)
        if not matches:
            status = 1
        print("{}: {}: {}".format(tag, "-", "OK" if matches else "FAILED"),
              file=out)
        return status
    for path in opts.files:
        entry = get_parsed_digest_line_from_checklist(
            opts.checklist, opts, path)
        if entry is None:
            print("{}: MISSING".format(path), file=out)
            status = 1
            continue
        tag, algo, _listed_name, expected = entry
        actual = compute_digest_file(algo, path)
        matches = compare_digests_equal(actual, expected, algo)
        if not matches:
            status = 1
        print("{}: {}: {}".format(tag, path, "OK" if matches else "FAILED"),
              file=out)
    return status
259
260
def _verify_checklines(opts, checklines, dest):
    """Verify all digest lines in `checklines` and print a result line
    for each of them to `dest`.

    :param opts: the options, handed on to :func:`handle_checkline`
    :param checklines: an iterable of the lines of a digest file
    :param dest: the output stream
    :return: `True` if every verified line is ok, `False` otherwise
    :rtype: bool

    """
    all_ok = True
    for checkline in checklines:
        # Iterated lines keep their line terminator, so a blank line is
        # not an empty string; test the stripped line to skip it.
        if not checkline.strip():
            continue
        r, fn, tag = handle_checkline(opts, checkline)
        print("{}: {}: {}".format(tag, fn, r.upper()), file=dest)
        if r != "ok":
            all_ok = False
    return all_ok


def verify_digests_from_files(opts):
    """Treat the given files as digest files and verify all their entries.

    The digest lines are read from standard input if no file is given
    or if the only file is ``-``. Blank lines are skipped.

    :param opts: the options as built by :func:`main` or :func:`gen_opts`
    :return: 0 if every entry is ok, 1 otherwise
    :rtype: int
    :raises ValueError: for a non-blank line that is not a digest line

    """
    dest = opts.dest or sys.stdout
    exit_code = 0
    if not opts.files or (len(opts.files) == 1 and opts.files[0] == '-'):
        if not _verify_checklines(opts, sys.stdin, dest):
            exit_code = 1
    else:
        for checkfile_name in opts.files:
            with io.open(checkfile_name, "rt", encoding="utf-8") as checkfile:
                if not _verify_checklines(opts, checkfile, dest):
                    exit_code = 1
    return exit_code
283
284
def handle_checkline(opts, line):
    """Verify the file that is named in the digest line `line`.

    :param opts: the options, handed on to :func:`parse_digest_line`
    :param str line: a line of a digest file
    :return: a tuple with the verdict ("ok", "missing" or "failed"),
             the filename and the tag of the digest used
    :rtype: tuple(str, str, str)
    :raises ValueError: if `line` is not a digest line

    """
    parsed = parse_digest_line(opts, line)
    if not parsed:
        raise ValueError(
            "improperly formatted digest line: {}".format(line))
    tag, algo, fn, digest = parsed
    try:
        computed = compute_digest_file(algo, fn)
        if compare_digests_equal(computed, digest, algo):
            verdict = "ok"
        else:
            verdict = "failed"
    except EnvironmentError:
        # The file cannot be opened or read
        verdict = "missing"
    return (verdict, fn, tag)
305
306
def get_parsed_digest_line_from_checklist(checklist, opts, filename):
    """Search the digest file `checklist` for the entry of `filename`.

    Filenames are compared in their normalized form (forward slashes,
    no leading ``./``). Blank lines are skipped.

    :param checklist: the name of the digest file to search in
    :param opts: the options, handed on to :func:`parse_digest_line`
    :param filename: the filename to search for; `None` searches for an
                     entry for standard input (named ``-``, ``stdin`` or
                     with an empty name)
    :return: the result of :func:`parse_digest_line` for the first
             matching line or `None` if no line matches
    :raises ValueError: if a non-blank line before the match is not a
                        digest line

    """
    if filename is None:
        filenames = ("-", "stdin", "", )
    else:
        filenames = (
            normalize_filename(filename, strip_leading_dot_slash=True),)
    with io.open(checklist, "rt", encoding="utf-8") as clf:
        for checkline in clf:
            # Iterated lines keep their line terminator, so a blank line
            # is not an empty string; test the stripped line to skip it.
            if not checkline.strip():
                continue
            parts = parse_digest_line(opts, checkline)
            if not parts:
                raise ValueError(
                    "improperly formatted digest line: {}".format(checkline))
            fn = normalize_filename(parts[2], strip_leading_dot_slash=True)
            if fn in filenames:
                return parts
    return None
326
327
def parse_digest_line(opts, line):
    """Split a `line` of a digest file into its parts.

    The BSD format (``TAG (filename) = digest``) is tried first, then
    the coreutils format (``digest  filename``). A coreutils line names
    no algorithm, so the one in ``opts.algorithm`` is used for it.

    :param opts: the options; only ``opts.algorithm`` is used
    :param str line: the line to parse
    :return: a tuple of the algorithm tag, the algorithm constructor,
             the filename and the encoded digest as found in the line;
             `None` if `line` has neither of the formats
    :rtype: tuple(str, obj, str, str) or None

    """
    bsd_match = re.search(r"\A(\S+)\s*\((.*)\)\s*=\s*(.+)\n?\Z", line)
    if bsd_match:
        tag, filename, digest = bsd_match.groups()
        return (tag, algotag2algotype(tag), filename, digest)
    coreutils_match = re.search(r"([^\ ]+) [\*\ ]?(.+)\n?\Z", line)
    if not coreutils_match:
        return None
    digest, filename = coreutils_match.groups()
    return (opts.algorithm[1], opts.algorithm[0], filename, digest)
359
360
def argv2algo(s):
    """Translate an algorithm specifier from the command line into the
    internal digest specification.

    :param str s: the specifier from the command line
    :return: the internal digest specification
    :rtype: a tuple (digest_type_or_factory, name_in_output)
    :raises argparse.ArgumentTypeError: if `s` is not a known specifier

    The specifier is matched case-insensitively.

    """
    s = s.lower()
    # (accepted specifiers, name of the factory in hashlib, output tag);
    # the factory is looked up only for the selected algorithm
    known = (
        (("1", "sha1"), "sha1", "SHA1"),
        (("224", "sha224"), "sha224", "SHA224"),
        (("256", "sha256"), "sha256", "SHA256"),
        (("384", "sha384"), "sha384", "SHA384"),
        (("512", "sha512"), "sha512", "SHA512"),
        (("3-224", "sha3-224"), "sha3_224", "SHA3-224"),
        (("3-256", "sha3-256"), "sha3_256", "SHA3-256"),
        (("3-384", "sha3-384"), "sha3_384", "SHA3-384"),
        (("3-512", "sha3-512"), "sha3_512", "SHA3-512"),
        (("blake2b", "blake2b-512"), "blake2b", "BLAKE2b"),
        (("blake2s", "blake2s-256"), "blake2s", "BLAKE2s"),
        (("md5",), "md5", "MD5"),
    )
    for specifiers, factory_name, tag in known:
        if s in specifiers:
            return (getattr(hashlib, factory_name), tag)
    raise argparse.ArgumentTypeError(
        "`{}' is not a recognized algorithm".format(s))
400
401
def algotag2algotype(s):
    """Map the algorithm tag of a BSD-style digest file to the
    type/factory of the corresponding algorithm.

    :param str s: the tag (i.e. the normalized name) of the algorithm
    :return: the digest type or factory for `s`
    :raises ValueError: if `s` is not a known tag

    The tag is matched case-sensitively.

    """
    # (tag, name of the factory in hashlib); the factory is looked up
    # only for the selected algorithm
    known = (
        ("SHA1", "sha1"),
        ("SHA224", "sha224"),
        ("SHA256", "sha256"),
        ("SHA384", "sha384"),
        ("SHA512", "sha512"),
        ("SHA3-224", "sha3_224"),
        ("SHA3-256", "sha3_256"),
        ("SHA3-384", "sha3_384"),
        ("SHA3-512", "sha3_512"),
        ("BLAKE2b", "blake2b"),
        ("BLAKE2s", "blake2s"),
        ("MD5", "md5"),
    )
    for tag, factory_name in known:
        if s == tag:
            return getattr(hashlib, factory_name)
    raise ValueError("unknown algorithm: {}".format(s))
438
439
def out_bsd(dest, digest, filename, digestname, binary, use_base64):
    """Print a digest in BSD format; this is also the output format of
    :command:`openssl dgst` and :command:`b2sum --tag`.

    :param dest: the output stream
    :param bytes digest: the digest in binary form
    :param filename: the name of the hashed file; with `None` only the
                     encoded digest is printed
    :param str digestname: the algorithm tag to print
    :param binary: not used by this format
    :param bool use_base64: print the digest base64 encoded instead of
                            hexlified

    """
    encode = base64.b64encode if use_base64 else binascii.hexlify
    text = encode(digest).decode("ascii")
    if filename is not None:
        text = "{} ({}) = {}".format(digestname,
                                     normalize_filename(filename),
                                     text)
    print(text, file=dest)
456
457
def out_std(dest, digest, filename, digestname, binary, use_base64):
    """Print a digest in coreutils format (:command:`shasum` et al.)

    :param dest: the output stream
    :param bytes digest: the digest in binary form
    :param filename: the name of the hashed file; `None` is printed
                     as ``-``
    :param digestname: not used by this format
    :param bool binary: mark the file with ``*`` if true and with a
                        space otherwise
    :param bool use_base64: print the digest base64 encoded instead of
                            hexlified

    """
    encode = base64.b64encode if use_base64 else binascii.hexlify
    encoded = encode(digest).decode("ascii")
    if binary:
        mode_marker = '*'
    else:
        mode_marker = ' '
    if filename is None:
        shown_name = '-'
    else:
        shown_name = normalize_filename(filename)
    print("{} {}{}".format(encoded, mode_marker, shown_name), file=dest)
471
472
def compute_digest_file(hashobj, path, use_mmap=True):
    """Compute the digest of the contents of a file.

    :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory
    :param path: filename within the filesystem or a file descriptor opened in
                 binary mode (also a socket or pipe)
    :param bool use_mmap: use the :mod:`mmap` module if available
    :return: the digest in binary form
    :rtype: bytes

    If a file descriptor is given it must support :func:`os.read`.

    A file that is opened here is also closed here; a given file
    descriptor is left open. Memory mapping is used for regular files
    only.

    """
    h = hashobj()
    if isinstance(path, PATH_TYPES):
        # O_BINARY, O_SEQUENTIAL and O_NOCTTY are added only where the
        # platform's `os' module defines them
        flags = os.O_RDONLY | getattr(os, "O_BINARY", 0) \
            | getattr(os, "O_SEQUENTIAL", 0) | getattr(os, "O_NOCTTY", 0)
        fd = os.open(path, flags)
        own_fd = True
    else:
        # Not a path: use the value as descriptor owned by the caller
        fd = path
        own_fd = False
    try:
        try:
            st = os.fstat(fd)
        except TypeError:
            #
            # "fd" is most probably a Python socket object.
            # (a pipe typically supports fstat)
            #
            use_mmap = False
        else:
            if stat.S_ISREG(st[stat.ST_MODE]):
                # `filesize' is needed (and set) for the mmap branch only
                filesize = st[stat.ST_SIZE]
            else:
                use_mmap = False
        if mmap is None or not use_mmap:
            # No mmap available or wanted -> use traditional low-level file IO
            while True:
                try:
                    buf = os.read(fd, CHUNK_SIZE)
                except OSError as e:
                    # EAGAIN/EWOULDBLOCK: just try the read again
                    if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                        raise
                else:
                    # An empty read result ends the loop
                    if len(buf) == 0:
                        break
                    h.update(buf)
        else:
            #
            # Use mmap
            #
            # NOTE: On Windows mmapped files with length 0 are not supported.
            #       So ensure to not call mmap.mmap() if the file size is 0.
            #       (The loop below is not entered for an empty file.)
            #
            madvise = getattr(mmap.mmap, "madvise", None)
            if filesize < MAP_CHUNK_SIZE:
                mapsize = filesize
            else:
                mapsize = MAP_CHUNK_SIZE
            mapoffset = 0
            rest = filesize
            # Hash the file window by window; no window is larger than
            # MAP_CHUNK_SIZE
            while rest > 0:
                m = mmap.mmap(fd,
                              mapsize,
                              access=mmap.ACCESS_READ,
                              offset=mapoffset)
                if madvise:
                    madvise(m, mmap.MADV_SEQUENTIAL)
                try:
                    h.update(m)
                finally:
                    m.close()
                rest -= mapsize
                mapoffset += mapsize
                # The last window covers only what is left of the file
                if rest < mapsize:
                    mapsize = rest
    finally:
        # Close only what has been opened here
        if own_fd:
            os.close(fd)
    return h.digest()
553
554
def compute_digest_stream(hashobj, instream):
    """Compute the digest of all the data that can be read from a stream.

    :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory
    :param instream: a bytes input stream to read the data to be hashed from
    :return: the digest in binary form
    :rtype: bytes

    A read that fails with `EAGAIN` or `EWOULDBLOCK` or that yields
    `None` is repeated; an empty read result ends the input.

    """
    hasher = hashobj()
    retry_errnos = (errno.EAGAIN, errno.EWOULDBLOCK)
    while True:
        try:
            chunk = instream.read(CHUNK_SIZE)
        except OSError as exc:
            if exc.errno in retry_errnos:
                continue
            raise
        if chunk is None:
            # No data available right now
            continue
        if len(chunk) == 0:
            break
        hasher.update(chunk)
    return hasher.digest()
577
578
def normalize_filename(filename, strip_leading_dot_slash=False):
    """Return `filename` with all backslashes replaced by forward slashes.

    :param str filename: the filename to normalize
    :param bool strip_leading_dot_slash: also remove every leading ``./``
                                         of the converted name
    :return: the normalized filename
    :rtype: str

    """
    normalized = filename.replace("\\", "/")
    if not strip_leading_dot_slash:
        return normalized
    while True:
        if not normalized.startswith("./"):
            return normalized
        normalized = normalized[2:]
585
586
# Executed as a script: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())