Mercurial > hgrepos > Python > apps > py-cutils
view cutils/treesum.py @ 198:c1e875ba4bdc
Put the effective filesystem encoding into the treesum digest file using FSENCODING = <encoding>
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Thu, 16 Jan 2025 23:18:04 +0100 |
| parents | 48e2610978e5 |
| children | 22f92bf3572c |
line wrap: on
line source
# -*- coding: utf-8 -*- # :- # :Copyright: (c) 2020-2025 Franz Glasner # :License: BSD-3-Clause # :- r"""Generate and verify checksums for directory trees. """ from __future__ import print_function, absolute_import __all__ = [] import argparse import base64 import binascii import datetime import logging import os import re import stat import sys import time import zlib from . import (__version__, __revision__) from . import util from .util import cm from .util import digest from .util import walk def main(argv=None): def _populate_generate_arguments(gp): """Use to populate command aliases. This is because :class:`argparse.ArgumentParser` does not support them for all supported Python versions. """ gp.add_argument( "--algorithm", "-a", action="store", type=util.argv2algo, help="1 (aka sha1), 224, 256 (aka sha256), 384, 512 (aka sha512), " "3 (alias for sha3-512), 3-224, 3-256, 3-384, 3-512, " "blake2b, blake2b-256, blake2s, " "blake2 (alias for blake2b), " "blake2-256 (alias for blake2b-256), " "md5. " "The default depends on availability in hashlib: " "blake2b-256, sha256 or sha1.") gp.add_argument( "--append-output", action="store_true", dest="append_output", help="Append to the output file instead of overwriting it.") gp.add_argument( "--base64", action="store_true", help="Output checksums in base64 notation, not hexadecimal " "(OpenBSD).") gp.add_argument( "--comment", action="append", default=[], help="Put given comment COMMENT into the output as \"COMMENT\". " "Can be given more than once.") gp.add_argument( "--follow-directory-symlinks", "-l", action="store_true", dest="follow_directory_symlinks", help="Follow symbolic links to directories when walking a " "directory tree. Note that this is different from using " "\"--logical\" or \"--physical\" for arguments given " "directly on the command line") gp.add_argument( "--full-mode", action="store_true", dest="metadata_full_mode", help="Consider all mode bits as returned from stat(2) when " "computing directory digests. " "Note that mode bits on symbolic links itself are not " "considered.") gp.add_argument( "--logical", "-L", dest="logical", action="store_true", default=None, help="Follow symbolic links given on command line arguments." " Note that this is a different setting as to follow symbolic" " links to directories when traversing a directory tree.") gp.add_argument( "--minimal", nargs="?", const="", default=None, metavar="TAG", help="Produce minimal output only. If a TAG is given and not " "empty use it as the leading \"ROOT (<TAG>)\" output.") gp.add_argument( "--mmap", action="store_true", dest="mmap", default=None, help="Use mmap if available. Default is to determine " "automatically from the filesize.") gp.add_argument( "--mode", action="store_true", dest="metadata_mode", help="Consider the permission bits of stat(2) using S_IMODE (i.e. " "all bits without the filetype bits) when " "computing directory digests. Note that mode bits on " "symbolic links itself are not considered.") gp.add_argument( "--mtime", action="store_true", dest="metadata_mtime", help="Consider the mtime of files (non-directories) when " "generating digests for directories. Digests for files are " "not affected.") gp.add_argument( "--no-mmap", action="store_false", dest="mmap", default=None, help="Dont use mmap.") gp.add_argument( "--output", "-o", action="store", metavar="OUTPUT", help="Put the checksum into given file. " "If not given or if it is given as `-' then stdout is used.") gp.add_argument( "--physical", "-P", dest="logical", action="store_false", default=None, help="Do not follow symbolic links given on comment line " "arguments. This is the default.") gp.add_argument( "--print-size", action="store_true", help="""Print the size of a file or the accumulated sizes of directory content into the output also. The size is not considered when computing digests. For symbolic links the size is not printed also.""") gp.add_argument( "--size-only", action="store_true", help="""Print only the size of files and for each directory its accumulated directory size. Digests are not computed.""") gp.add_argument( "directories", nargs="*", metavar="DIRECTORY") def _populate_info_arguments(ip): ip.add_argument( "--last", action="store_true", dest="print_only_last_block", help="Print only the last block of every given input file") ip.add_argument( "digest_files", nargs="+", metavar="TREESUM-DIGEST-FILE") parser = argparse.ArgumentParser( description="Generate and verify checksums for directory trees.", fromfile_prefix_chars='@', add_help=False) # # Global options for all sub-commands. # In a group because this allows a customized title. # gparser = parser.add_argument_group(title="Global Options") gparser.add_argument( "--debug", action="store_true", help="Activate debug logging to stderr") gparser.add_argument( "-v", "--version", action="version", version="%s (rv:%s)" % (__version__, __revision__), help="Show program's version number and exit") gparser.add_argument( "-h", "--help", action="help", help="Show this help message and exit") # # Subcommands # subparsers = parser.add_subparsers( dest="subcommand", title="Commands", description="This tool uses subcommands. " "To see detailed help for a specific subcommand use " "the -h/--help option after the subcommand name. " "A list of valid commands and their short descriptions " "is listed below:", metavar="COMMAND") genparser = subparsers.add_parser( "generate", help="Generate checksums for directory trees", description="Generate checksums for directory trees.") _populate_generate_arguments(genparser) # And an alias for "generate" genparser2 = subparsers.add_parser( "gen", help="Alias for \"generate\"", description="Generate checksums for directory trees. " "This is an alias to \"generate\".") _populate_generate_arguments(genparser2) infoparser = subparsers.add_parser( "info", help="Print some information from given treesum digest file", description="""Print some informations from given treesum digest files to stdout.""" ) _populate_info_arguments(infoparser) hparser = subparsers.add_parser( "help", help="Show this help message or a subcommand's help and exit", description="Show this help message or a subcommand's help and exit.") hparser.add_argument("help_command", nargs='?', metavar="COMMAND") vparser = subparsers.add_parser( "version", help="Show the program's version number and exit", description="Show the program's version number and exit.") # Parse leniently to just check for "version" and/or help opts, _dummy = parser.parse_known_args(args=argv) if opts.subcommand == "version": print("%s (rv:%s)" % (__version__, __revision__), file=sys.stdout) return 0 if opts.subcommand == "help": if not opts.help_command: parser.print_help() else: if opts.help_command == "generate": genparser.print_help() elif opts.help_command == "gen": genparser2.print_help() elif opts.help_command == "info": infoparser.print_help() elif opts.help_command == "version": vparser.print_help() elif opts.help_command == "help": hparser.print_help() else: parser.print_help() return 0 # Reparse strictly opts = parser.parse_args(args=argv) # Minimal logging -- just for debugging - not for more "normal" use logging.basicConfig( level=logging.DEBUG if opts.debug else logging.WARNING, stream=sys.stderr, format="[%(asctime)s][%(levelname)s][%(process)d:%(name)s] %(message)s" ) logging.captureWarnings(True) return treesum(opts) def gen_generate_opts(directories=[], algorithm=util.default_algotag(), append_output=False, base64=False, comment=[], follow_directory_symlinks=False, full_mode=False, logical=None, minimal=None, mode=False, mmap=None, mtime=False, output=None, print_size=False, size_only=False): opts = argparse.Namespace( directories=directories, algorithm=util.argv2algo(algorithm), append_output=append_output, base64=base64, comment=comment, follow_directory_symlinks=follow_directory_symlinks, logical=logical, minimal=minimal, mmap=mmap, metadata_full_mode=full_mode, metadata_mode=mode, metadata_mtime=mtime, output=output, print_size=print_size, size_only=size_only) return opts def gen_info_opts(digest_files=[], last=False): opts = argparse.Namespace( digest_files=digest_files, print_only_last_block=last) return opts def treesum(opts): # XXX TBD: opts.check and opts.checklist (as in shasum.py) if opts.subcommand in ("generate", "gen"): return generate_treesum(opts) elif opts.subcommand == "info": return print_treesum_digestfile_infos(opts) else: raise RuntimeError( "command `{}' not yet handled".format(opts.subcommand)) def generate_treesum(opts): # Provide defaults if not opts.algorithm: opts.algorithm = util.argv2algo(util.default_algotag()) if not opts.directories: opts.directories.append(".") if opts.output is None or opts.output == "-": if hasattr(sys.stdout, "buffer"): out_cm = cm.nullcontext(sys.stdout.buffer) else: out_cm = cm.nullcontext(sys.stdout) else: if opts.append_output: out_cm = open(opts.output, "ab") else: out_cm = open(opts.output, "wb") out_cm = CRC32Output(out_cm) with out_cm as outfp: for d in opts.directories: V1DirectoryTreesumGenerator( opts.algorithm, opts.mmap, opts.base64, opts.logical, opts.follow_directory_symlinks, opts.metadata_mode, opts.metadata_full_mode, opts.metadata_mtime, opts.size_only, opts.print_size, minimal=opts.minimal).generate( outfp, d, comment=opts.comment) class V1DirectoryTreesumGenerator(object): def __init__(self, algorithm, use_mmap, use_base64, handle_root_logical, follow_directory_symlinks, with_metadata_mode, with_metadata_full_mode, with_metadata_mtime, size_only, print_size, minimal=None,): super(V1DirectoryTreesumGenerator, self).__init__() self._algorithm = algorithm self._use_mmap = use_mmap self._use_base64 = use_base64 self._handle_root_logical = handle_root_logical self._follow_directory_symlinks = follow_directory_symlinks self._with_metadata_mode = with_metadata_mode self._with_metadata_full_mode = with_metadata_full_mode self._with_metadata_mtime = with_metadata_mtime self._size_only = size_only self._print_size = print_size self._minimal = minimal def generate(self, outfp, root, comment=None): """ :param outfp: a *binary* file with a "write()" and a "flush()" method """ self._outfp = outfp self._outfp.resetdigest() self._outfp.write(format_bsd_line("VERSION", "1", None, False)) self._outfp.write(format_bsd_line( "FSENCODING", util.n(walk.getfsencoding()), None, False)) self._outfp.flush() # # Note: Given non-default flags that are relevant for # directory traversal. # flags = [] if self._with_metadata_full_mode: flags.append("with-metadata-fullmode") elif self._with_metadata_mode: flags.append("with-metadata-mode") if self._with_metadata_mtime: flags.append("with-metadata-mtime") if self._handle_root_logical: flags.append("logical") if self._follow_directory_symlinks: flags.append("follow-directory-symlinks") if self._size_only: flags.append("size-only") else: if self._print_size: flags.append("print-size") if flags: flags.sort() self._outfp.write( format_bsd_line("FLAGS", ",".join(flags), None, False)) if self._minimal is None: # Write execution timestamps in POSIX epoch and ISO format ts = int(time.time()) self._outfp.write(format_bsd_line("TIMESTAMP", ts, None, False)) ts = (datetime.datetime.utcfromtimestamp(ts)).isoformat("T") self._outfp.write(format_bsd_line("ISOTIMESTAMP", ts, None, False)) if comment: for line in comment: self._outfp.write( format_bsd_line("COMMENT", None, line, False)) if self._minimal is not None: self._outfp.write(format_bsd_line( "ROOT", None, self._minimal if self._minimal else "", False)) else: self._outfp.write(format_bsd_line("ROOT", None, root, False)) self._outfp.flush() if not self._handle_root_logical and os.path.islink(root): linktgt = util.fsencode(os.readlink(root)) linkdgst = self._algorithm[0]() linkdgst.update( util.interpolate_bytes(b"%d:%s,", len(linktgt), linktgt)) dir_dgst = self._algorithm[0]() dir_dgst.update(b"1:L,") dir_dgst.update( util.interpolate_bytes( b"%d:%s,", len(linkdgst.digest()), linkdgst.digest())) if self._size_only: self._outfp.write( format_bsd_line( "SIZE", None, "./@/", False, 0)) else: self._outfp.write( format_bsd_line( self._algorithm[1], dir_dgst.digest(), "./@/", self._use_base64)) self._outfp.flush() self._outfp.write(format_bsd_line( "CRC32", self._outfp.hexcrcdigest(), None, False)) return self._generate(os.path.normpath(root), tuple()) self._outfp.write(format_bsd_line( "CRC32", self._outfp.hexcrcdigest(), None, False)) def _generate(self, root, top): logging.debug("Handling %s/%r", root, top) path = os.path.join(root, *top) if top else root with walk.ScanDir(path) as dirscan: fsobjects = list(dirscan) fsobjects.sort(key=walk.WalkDirEntry.sort_key) dir_dgst = self._algorithm[0]() dir_size = 0 for fso in fsobjects: if fso.is_dir: if fso.is_symlink and not self._follow_directory_symlinks: linktgt = util.fsencode(os.readlink(fso.path)) linkdgst = self._algorithm[0]() linkdgst.update( util.interpolate_bytes( b"%d:%s,", len(linktgt), linktgt)) dir_dgst.update(util.interpolate_bytes( b"1:S,%d:%s,", len(fso.fsname), fso.fsname)) # # - no mtime and no mode for symlinks # - also does not count for dir_size # dir_dgst.update(util.interpolate_bytes( b"%d:%s,", len(linkdgst.digest()), linkdgst.digest())) opath = "/".join(top) + "/" + fso.name if top else fso.name if self._size_only: self._outfp.write(format_bsd_line( "SIZE", None, "%s/./@/" % (opath,), False, 0)) else: self._outfp.write(format_bsd_line( self._algorithm[1], linkdgst.digest(), "%s/./@/" % (opath,), self._use_base64)) self._outfp.flush() else: # # Follow the symlink to dir or handle a "real" directory # # Get subdir data from recursing into it sub_dir_dgst, sub_dir_size = self._generate( root, top + (fso.name, )) dir_size += sub_dir_size dir_dgst.update(util.interpolate_bytes( b"1:d,%d:%s,", len(fso.fsname), fso.fsname)) dir_dgst.update(util.interpolate_bytes( b"%d:%s,", len(sub_dir_dgst), sub_dir_dgst)) if self._with_metadata_full_mode: modestr = util.b(normalized_mode_str(fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"8:fullmode,%d:%s,", len(modestr), modestr)) elif self._with_metadata_mode: modestr = util.b(normalized_compatible_mode_str( fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"4:mode,%d:%s,", len(modestr), modestr)) else: dir_dgst.update(util.interpolate_bytes( b"1:f,%d:%s,", len(fso.fsname), fso.fsname)) dir_size += fso.stat.st_size if self._with_metadata_mtime: mtime = datetime.datetime.utcfromtimestamp( int(fso.stat.st_mtime)) mtime = util.b(mtime.isoformat("T") + "Z") dir_dgst.update(util.interpolate_bytes( b"5:mtime,%d:%s,", len(mtime), mtime)) if self._with_metadata_full_mode: modestr = util.b(normalized_mode_str(fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"8:fullmode,%d:%s,", len(modestr), modestr)) elif self._with_metadata_mode: modestr = util.b(normalized_compatible_mode_str( fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"4:mode,%d:%s,", len(modestr), modestr)) if not self._size_only: dgst = digest.compute_digest_file( self._algorithm[0], fso.path, use_mmap=self._use_mmap) dir_dgst.update(util.interpolate_bytes( b"%d:%s,", len(dgst), dgst)) opath = "/".join(top) + "/" + fso.name if top else fso.name if self._size_only: self._outfp.write(format_bsd_line( "SIZE", None, opath, False, fso.stat.st_size)) else: if self._print_size: self._outfp.write(format_bsd_line( self._algorithm[1], dgst, opath, self._use_base64, fso.stat.st_size)) else: self._outfp.write(format_bsd_line( self._algorithm[1], dgst, opath, self._use_base64)) self._outfp.flush() opath = "/".join(top) + "/" if top else "" if self._size_only: self._outfp.write(format_bsd_line( "SIZE", None, opath, False, dir_size)) else: if self._print_size: self._outfp.write(format_bsd_line( self._algorithm[1], dir_dgst.digest(), opath, self._use_base64, dir_size)) else: self._outfp.write(format_bsd_line( self._algorithm[1], dir_dgst.digest(), opath, self._use_base64)) self._outfp.flush() return (dir_dgst.digest(), dir_size) class CRC32Output(object): """Wrapper for a minimal binary file contextmanager that calculates the CRC32 of the written bytes on the fly. Also acts as context manager proxy for the given context manager. """ __slots__ = ("_fp_cm", "_fp", "_crc32") def __init__(self, fp_cm): self._fp_cm = fp_cm self._fp = None self.resetdigest() def __enter__(self): assert self._fp is None self._fp = self._fp_cm.__enter__() return self def __exit__(self, *args): rv = self._fp_cm.__exit__(*args) self._fp = None return rv def write(self, what): self._fp.write(what) self._crc32 = zlib.crc32(what, self._crc32) def flush(self): self._fp.flush() def resetdigest(self): """Reset the current CRC digest""" self._crc32 = zlib.crc32(b"") def hexcrcdigest(self): """ :rtype: str """ return (hex(self.crcdigest())[2:]).upper() def crcdigest(self): """ :rtype: int """ if util.PY2: # Return the bitpattern as unsigned 32-bit number return (~self._crc32 ^ 0xFFFFFFFF) else: return self._crc32 def normalized_compatible_mode_str(mode): # XXX FIXME: Windows and "executable" modebits = stat.S_IMODE(mode) modestr = "%o" % (modebits,) if not modestr.startswith("0"): modestr = "0" + modestr return modestr def normalized_mode_str(mode): modestr = "%o" % (mode,) if not modestr.startswith("0"): modestr = "0" + modestr return modestr def format_bsd_line(what, value, filename, use_base64, size=None): ls = util.b(os.linesep) if not isinstance(what, bytes): what = what.encode("ascii") if what == b"TIMESTAMP": assert filename is None return util.interpolate_bytes(b"TIMESTAMP = %d%s", value, ls) if what in (b"FSENCODING", b"ISOTIMESTAMP", b"FLAGS", b"VERSION", b"CRC32"): assert filename is None return util.interpolate_bytes(b"%s = %s%s", what, util.b(value), ls) assert filename is not None if what == b"COMMENT": return util.interpolate_bytes( b"COMMENT (%s)%s", util.b(filename, "utf-8"), ls) if not isinstance(filename, bytes): filename = util.fsencode(filename) if what == b"SIZE": return util.interpolate_bytes(b"SIZE (%s) = %d%s", filename, size, ls) if value is None: return util.interpolate_bytes(b"%s (%s)%s", what, filename, ls) if use_base64: value = base64.b64encode(value) else: value = binascii.hexlify(value) if filename != b"./@/": filename = util.normalize_filename(filename, True) if size is None: return util.interpolate_bytes( b"%s (%s) = %s%s", what, filename, value, ls) else: return util.interpolate_bytes( b"%s (%s) = %s,%d%s", what, filename, value, size, ls) class TreesumReader(object): """Reader to read and/or verify treesum digest files. Supports the iterator and context manager protocol. """ PATTERN0 = re.compile(br"\A[ \t]*\r?\n\Z") # empty lines PATTERN1 = re.compile(br"\A(VERSION|FSENCODING|FLAGS|TIMESTAMP|ISOTIMESTAMP|CRC32)[ \t]*=[ \t]*([^ \t]+)[ \t]*\r?\n\Z") # noqa: E501 line too long PATTERN2 = re.compile(br"\A(ROOT|COMMENT)[ \t]*\((.*)\)[ \t]*\r?\n\Z") PATTERN3 = re.compile(br"\ASIZE[ \t]*\((.*)\)[ \t]*=[ \t]*(\d+)[ \t]*\r?\n\Z") # noqa: E501 line too long PATTERN4 = re.compile(br"\A([A-Za-z0-9_-]+)[ \t]*\((.*)\)[ \t]*=[ \t]*([A-Za-z0-9=+/]+)(,(\d+))?[ \t]*\r?\n\Z") # noqa: E501 line too long def __init__(self, _fp, _filename, _own_fp): self._fp = _fp self._own_fp = _own_fp self._filename = _filename self._line_no = 0 self._reset_crc() self._expect_crc = None # NOTE: tristate: None is different from False self._current_algo_name = self._current_algo_digest_size = None @classmethod def from_path(cls_, path): """Open file at `path` and return a reader that owns the file object""" return cls_(open(path, "rb"), path, True) @classmethod def from_binary_buffer(cls_, binary_fp, filename): return cls_(binary_fp, filename, False) def __enter__(self): return self def __exit__(self, *args): self.close() def close(self): if self._fp is not None: try: if self._own_fp: self._fp.close() finally: self._fp = None def __iter__(self): return self def __next__(self): rec = self.read_record() if rec is None: raise StopIteration() return rec if util.PY2: next = __next__ def all_records(self): """Iterator over all remaining records""" while True: rec = self.read_record() if rec is None: return yield rec def read_record(self): """Read and parse the "next" line. :returns: `None` at EOF or the parsed contents of the line :rtype: tuple or None """ # Loop to skip empty lines while True: line = self._get_next_line() if not line: # # Skip for empty files at the very beginning. # Check only after the first VERSION line. # if self._expect_crc is not None: if self._expect_crc: logging.warning("CRC32 is missing at EOF") return None if not self.PATTERN0.search(line): break self._update_crc(line) # # At the beginning transparently skip an eventually embedded signify # signature # if self._line_no == 1: if line.startswith(b"untrusted comment: "): line = self._get_next_line() if not line.endswith(b"\n"): raise binascii.Error("No valid signify signature value") # Try to decode for an early error check base64.b64decode(line[:-1]) mo = self.PATTERN1.search(line) if mo: if mo.group(1) == b"VERSION": if self._expect_crc: logging.warning("CRC32 missing before line %d", self._line_no) self._reset_crc() self._expect_crc = True self._update_crc(line) return ("VERSION", util.n(mo.group(2))) if mo.group(1) == b"CRC32": # TODO: check if self._expect_crc is None: logging.warning("Lone CRC32 before VERSION in line %d", self._line_no) else: if self._expect_crc: if (self._hex_crc() != mo.group(2).decode("latin1").upper()): logging.warning( "CRC32 mismatch in line %d:" " expected: %s, given: %s", self._line_no, self._hex_crc(), mo.group(2).decode("latin1").upper()) else: logging.warning("CRC32 before VERSION in line %d", self._line_no) # Do not update the CRC here but reset the state self._expect_crc = False return ("CRC32", util.n(mo.group(2))) else: self._update_crc(line) return (util.n(mo.group(1)), util.n(mo.group(2))) else: mo = self.PATTERN2.search(line) if mo: self._update_crc(line) if mo.group(1) == b"COMMENT": return ("COMMENT", util.u(mo.group(2), "utf-8")) elif mo.group(1) == b"ROOT": return ("ROOT", mo.group(2)) assert False, line else: mo = self.PATTERN3.search(line) if mo: self._update_crc(line) return ("SIZE", mo.group(1), int(util.n(mo.group(2)), 10)) else: mo = self.PATTERN4.search(line) if mo: self._update_crc(line) algo_name = util.n(mo.group(1)) if (len(mo.group(3)) == 2 * self._get_digest_size(algo_name)): # hex digest = binascii.unhexlify(mo.group(3)) else: # base64 digest = base64.b64decode(mo.group(3)) if mo.group(4): size = int(util.n(mo.group(5)), 10) else: size = None return (algo_name, mo.group(2), digest, size) else: assert False, line return line def _get_next_line(self): line = self._fp.readline(4096) # along PATH_MAX on Linux if line: self._line_no += 1 return line def _reset_crc(self): self._crc32 = zlib.crc32(b"") def _update_crc(self, data): self._crc32 = zlib.crc32(data, self._crc32) def _hex_crc(self): return (hex(self._crc32)[2:]).upper() def _get_digest_size(self, algo_name): if self._current_algo_name == algo_name: return self._current_algo_digest_size h = util.algotag2algotype(algo_name)() self._current_algo_name = algo_name self._current_algo_digest_size = h.digest_size return self._current_algo_digest_size def print_treesum_digestfile_infos(opts): print_infos_for_digestfile(opts.digest_files, opts.print_only_last_block) def print_infos_for_digestfile(digest_files, print_only_last_block=True): for fn in digest_files: if fn == "-": if util.PY2: reader = TreesumReader.from_binary_buffer(sys.stdin) else: reader = TreesumReader.from_binary_buffer(sys.stdin.buffer) else: reader = TreesumReader.from_path(fn) with reader: root = flags = algorithm = digest = size = None comments = [] in_block = False block_no = 0 for record in reader: if record[0] == "VERSION": assert record[1] == "1" # start a new block in_block = True block_no += 1 root = flags = algorithm = digest = size = None comments = [] elif record[0] == "FLAGS": flags = record[1] elif record[0] == "ROOT": root = record[1] elif record[0] == "COMMENT": comments.append(record[1]) elif record[0] in ("FSENCODING", "TIMESTAMP", "ISOTIMESTAMP"): pass elif record[0] == "CRC32": pass # in_block = False else: if not in_block: continue # digest line or size line if not record[1] or record[1] == b"./@/": if record[0] == "SIZE": algorithm = "SIZE" digest = None size = record[2] else: algorithm = record[0] digest = record[2] size = record[3] if not print_only_last_block: print_block_data( block_no, root, flags, comments, algorithm, digest, size) root = flags = algorithm = digest = size = None in_block = False if print_only_last_block: if not in_block: if digest is not None or size is not None: print_block_data( block_no, root, flags, comments, algorithm, digest, size) else: logging.warning("missing block end") def print_block_data(block_no, tag, flags, comments, algorithm, digest, size): digeststr = util.n(binascii.hexlify(digest)) if digest else "<no digest>" sizestr = str(size) if size is not None else "<no size>" print("BLOCK No %d:" % (block_no,)) print(" Tag:", tag) print(" Flags:", flags if flags else "<none>") print(" Comments:", comments if comments else "") print(" Algorithm:", algorithm) if algorithm != "SIZE": print(" Digest:", digeststr) print(" Size:", sizestr) if __name__ == "__main__": sys.exit(main())
