Mercurial > hgrepos > Python > apps > py-cutils
view cutils/treesum.py @ 382:dd0bc31064e4
treesum: Replace "DigestSizeCollector" by "TreesumInfo"
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Sat, 17 May 2025 13:32:39 +0200 |
| parents | ff4424a7a8cf |
| children | ea73723be05e |
line wrap: on
line source
# -*- coding: utf-8 -*- # :- # SPDX-FileCopyrightText: © 2025 Franz Glasner # SPDX-License-Identifier: BSD-3-Clause # :- r"""Generate and verify checksums for directory trees. """ from __future__ import print_function, absolute_import __all__ = [] import argparse import base64 import binascii import collections import datetime import errno import logging import os import re import stat import sys import time from . import (__version__, __revision__) from . import util from .util import cm from .util import digest from .util import fnmatch from .util import walk from .util.crc32 import crc32 def main(argv=None): def _populate_generate_arguments(gp): """Use to populate command aliases. This is because :class:`argparse.ArgumentParser` does not support them for all supported Python versions. """ gp.add_argument( "--accept-treesum", "-A", action=PatternMatchAction, kind="accept-treesum", dest="fnmatch_filters", metavar="PATTERN", help="""Accept an existing treesum file PATTERN for a directory checksum. Implicitly this also acts as `--exclude' option. Can be given more than once.""") gp.add_argument( "--algorithm", "-a", action="store", type=util.argv2algo, help="1 (aka sha1), 224, 256 (aka sha256), 384, 512 (aka sha512), " "3 (alias for sha3-512), 3-224, 3-256, 3-384, 3-512, " "blake2b, blake2b-256, blake2s, " "blake2 (alias for blake2b), " "blake2-256 (alias for blake2b-256), " "md5. " "The default depends on availability in hashlib: " "blake2b-256, sha256 or sha1.") gp.add_argument( "--append-output", action="store_true", dest="append_output", help="Append to the output file instead of overwriting it.") gp.add_argument( "--base64", action="store_true", help="Output checksums in base64 notation, not hexadecimal " "(OpenBSD).") gp.add_argument( "--comment", action="append", default=[], help="Put given comment COMMENT into the output as \"COMMENT\". " "Can be given more than once.") gp.add_argument( "--exclude", "-X", action=PatternMatchAction, kind="exclude", dest="fnmatch_filters", metavar="PATTERN", help="""Exclude names matching the given PATTERN. For help on PATTERN use \"help patterns\". Can be given more than once.""") gp.add_argument( "--follow-directory-symlinks", "-l", action=SymlinkAction, const="follow-directory-symlinks", default=FollowSymlinkConfig(False, False, True), dest="follow_symlinks", help="""Follow symbolic links to directories when walking a directory tree. Augments --physical, --half and -p.""") gp.add_argument( "--follow-file-symlinks", action=SymlinkAction, const="follow-file-symlinks", dest="follow_symlinks", help="""Follow symbolic links to files when walking a directory tree. Augments --physical and --half.""") gp.add_argument( "--full-mode", action="store_true", dest="metadata_full_mode", help="Consider all mode bits as returned from stat(2) when " "computing directory digests. " "Note that mode bits on symbolic links itself are not " "considered.") gp.add_argument( "--generator", choices=("normal", "full", "none"), default="normal", help="""Put a `GENERATOR' line into the output. `full' prints full Python and OS/platform version information, `normal' prints just whether Python 2 or Python 3 is used, and `none' suppresses the output completely. The default is `normal'.""") gp.add_argument( "--grouping-separator", action="store", dest="grouping_separator", metavar="GROUPING-SEPARATOR", help=""" Use the given GROUPING-SEPARATOR as thousands separator. Use an empty GROUPING-SEPARATOR to disable grouping. The effective default depends on the --output-style: for tagged output it is the underscore `_', for tabbed output it is the dot `.'. """) gp.add_argument( "--half", "-H", action=SymlinkAction, dest="follow_symlinks", const=FollowSymlinkConfig(True, False, False), help="""Follow symbolic links given on the command line but do not follow symlinks while traversing the directory tree. Overwrites any other symlink related options (--physical, --logical, -p, --no-follow-directory-symlinks, --no-follow-file-symlinks, et al.).""") gp.add_argument( "--include", "-I", action=PatternMatchAction, kind="include", dest="fnmatch_filters", metavar="PATTERN", help="""Include names matching the given PATTERN. For help on PATTERN use \"help patterns\". Can be given more than once.""") gp.add_argument( "--logical", "-L", action=SymlinkAction, dest="follow_symlinks", const=FollowSymlinkConfig(True, True, True), help="""Follow symbolic links everywhere: on command line arguments and -- while walking -- directory and file symbolic links. Overwrites any other symlink related options (--physical, --half, -p, --no-follow-directory-symlinks, --no-follow-file-symlinks, et al.).""") gp.add_argument( "--minimal", nargs="?", const="", default=None, metavar="TAG", help="Produce minimal output only. If a TAG is given and not " "empty use it as the leading \"ROOT (<TAG>)\" output.") gp.add_argument( "--mmap", action="store_true", dest="mmap", default=None, help="Use mmap if available. Default is to determine " "automatically from the filesize.") gp.add_argument( "--mode", action="store_true", dest="metadata_mode", help="Consider the permission bits of stat(2) using S_IMODE (i.e. " "all bits without the filetype bits) when " "computing directory digests. Note that mode bits on " "symbolic links itself are not considered.") gp.add_argument( "--mtime", action="store_true", dest="metadata_mtime", help="Consider the mtime of files (non-directories) when " "generating digests for directories. Digests for files are " "not affected.") gp.add_argument( "--no-follow-directory-symlinks", action=SymlinkAction, const="no-follow-directory-symlinks", dest="follow_symlinks", help="""Do not follow symbolic links to directories when walking a directory tree. Augments --logical.""") gp.add_argument( "--no-follow-file-symlinks", action=SymlinkAction, const="no-follow-file-symlinks", dest="follow_symlinks", help="""Dont follow symbolic links to files when walking a directory tree. Augments --logical and -p.""") gp.add_argument( "--no-mmap", action="store_false", dest="mmap", default=None, help="Dont use mmap.") gp.add_argument( "--output", "-o", action="store", metavar="OUTPUT", help="Put the checksum into given file. " "If not given or if it is given as `-' then stdout is used.") gp.add_argument( "--output-style", dest="output_style", default="tagged", choices=("tagged", "tag", "tabular", "tab"), help=""" Select the output style: "tagged" or "tag" selects a more BSD style tagged format. "tabular" or "tab" select a more GNU style tabular format. Default is "tagged". """) gp.add_argument( "--physical", "-P", action=SymlinkAction, dest="follow_symlinks", const=FollowSymlinkConfig(False, False, False), help="""Do not follow any symbolic links whether they are given on the command line or when walking the directory tree. Overwrites any other symlink related options (--logical, --half, -p, --follow-directory-symlinks, --follow-file-symlinks, et al.).""") gp.add_argument( "-p", action=SymlinkAction, dest="follow_symlinks", const=FollowSymlinkConfig(False, False, True), help="""Do not follow any symbolic links to directories, whether they are given on the command line or when walking the directory tree, but follow symbolic links to files. Overwrites any other symlink related options (--logical, --half, --physical, --follow-directory-symlinks, --no-follow-file-symlinks, et al.). This is the default.""") gp.add_argument( "--print-size", action="store_true", help="""Print the size of a file or the accumulated sizes of directory content into the output also. The size is not considered when computing digests. For symbolic links the size is not printed also.""") gp.add_argument( "--size-only", action="store_true", help="""Print only the size of files and for each directory its accumulated directory size. Digests are not computed.""") gp.add_argument( "--size-width", action="store", type=int, metavar="SIZE-WIDTH", dest="size_column_width", default=15, help="""Some output styles print the a filesize right-aligned in a column. SIZE-WIDTH is the (minimum) width to be used. The width includes grouping separators. Use 0 if no alignment should be done. Default is 15.""") gp.add_argument( "--utf8", "--utf-8", action="store_true", help="""Encode all file paths using UTF-8 instead of the filesystem encoding. Add some error tag into the path if it cannot representated in Unicode cleanly.""") gp.add_argument( "directories", nargs="*", metavar="DIRECTORY") def _populate_info_arguments(ip): ip.add_argument( "--last", action="store_true", dest="print_only_last_block", help="Print only the last block of every given input file") ip.add_argument( "digest_files", nargs="+", metavar="TREESUM-DIGEST-FILE") parser = argparse.ArgumentParser( description="Generate and verify checksums for directory trees.", fromfile_prefix_chars='@', add_help=False) # # Global options for all sub-commands. # In a group because this allows a customized title. # gparser = parser.add_argument_group(title="Global Options") gparser.add_argument( "--debug", action="store_true", help="Activate debug logging to stderr") gparser.add_argument( "-v", "--version", action="version", version="%s (rv:%s)" % (__version__, __revision__), help="Show program's version number and exit") gparser.add_argument( "-h", "--help", action="help", help="Show this help message and exit") # # Subcommands # subparsers = parser.add_subparsers( dest="subcommand", title="Commands", description="This tool uses subcommands. " "To see detailed help for a specific subcommand use " "the -h/--help option after the subcommand name. " "Or you can use the \"help\" subcommand like " "\"help COMMAND\". " "A list of valid commands and their short descriptions " "is listed below:", metavar="COMMAND") markerparser = subparsers.add_parser( "filetypes", help="Show the filetype indicators for all sorts of files", description=walk.HELP_FILETYPE_INDICATORS, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False) genparser = subparsers.add_parser( "generate", help="Generate checksums for directory trees", description="Generate checksums for directory trees.") _populate_generate_arguments(genparser) # And an alias for "generate" genparser2 = subparsers.add_parser( "gen", help="Alias for \"generate\"", description="Generate checksums for directory trees. " "This is an alias to \"generate\".") _populate_generate_arguments(genparser2) infoparser = subparsers.add_parser( "info", help="Print some information from given treesum digest file", description="""Print some informations from given treesum digest files to stdout.""" ) _populate_info_arguments(infoparser) hparser = subparsers.add_parser( "help", help="Show this help message or a subcommand's help and exit", description="Show this help message or a subcommand's help and exit.", add_help=False) hparser.add_argument("help_command", nargs='?', metavar="COMMAND") patparser = subparsers.add_parser( "patterns", help="Show the help for PATTERNs and exit", description=fnmatch.HELP_DESCRIPTION, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False) vparser = subparsers.add_parser( "version", help="Show the program's version number and exit", description="Show the program's version number and exit.") # Parse leniently to just check for "version" and/or help opts, _dummy = parser.parse_known_args(args=argv) if opts.subcommand is None: parser.print_help() return 0 elif opts.subcommand == "version": print("%s (rv:%s)" % (__version__, __revision__), file=sys.stdout) return 0 elif opts.subcommand == "help": if not opts.help_command: parser.print_help() else: if opts.help_command == "generate": genparser.print_help() elif opts.help_command == "gen": genparser2.print_help() elif opts.help_command == "info": infoparser.print_help() elif opts.help_command == "version": vparser.print_help() elif opts.help_command == "help": hparser.print_help() elif opts.help_command == "patterns": patparser.print_help() elif opts.help_command == "filetypes": markerparser.print_help() else: parser.print_help() return 0 elif opts.subcommand == "patterns": patparser.print_help() return 0 elif opts.subcommand == "filetypes": markerparser.print_help() return 0 # Reparse strictly opts = parser.parse_args(args=argv) # Minimal logging -- just for debugging - not for more "normal" use logging.basicConfig( level=logging.DEBUG if opts.debug else logging.WARNING, stream=sys.stderr, format="[%(asctime)s][%(levelname)s][%(process)d:%(name)s] %(message)s" ) logging.captureWarnings(True) return treesum(opts) FollowSymlinkConfig = collections.namedtuple("FollowSymlinkConfig", ["command_line", "directory", "file"]) class SymlinkAction(argparse.Action): """`type' is fixed here. `dest' is a tuple with three items: 1. follow symlinks on the command line 2. follow directory symlinks while walking 3. follow file symlinks while walking (not yet implemented) """ def __init__(self, *args, **kwargs): if "nargs" in kwargs: raise ValueError("`nargs' not allowed") if "type" in kwargs: raise ValueError("`type' not allowed") c = kwargs.get("const", None) if c is None: raise ValueError("a const value is needed") if (not isinstance(c, FollowSymlinkConfig) and c not in ("follow-directory-symlinks", "no-follow-directory-symlinks", "follow-file-symlinks", "no-follow-file-symlinks")): raise ValueError( "invalid value for the `const' configuration value") default = kwargs.get("default", None) if (default is not None and not isinstance(default, FollowSymlinkConfig)): raise TypeError("invalid type for `default'") kwargs["nargs"] = 0 super(SymlinkAction, self).__init__(*args, **kwargs) def __call__(self, parser, namespace, values, option_string=None): curval = getattr(namespace, self.dest, None) if curval is None: curval = FollowSymlinkConfig(False, False, True) if isinstance(self.const, FollowSymlinkConfig): curval = self.const else: if self.const == "follow-directory-symlinks": curval = FollowSymlinkConfig( curval.command_line, True, curval.file) elif self.const == "no-follow-directory-symlinks": curval = FollowSymlinkConfig( curval.command_line, False, curval.file) elif self.const == "follow-file-symlinks": curval = FollowSymlinkConfig( curval.command_line, curval.directory, True) elif self.const == "no-follow-file-symlinks": curval = FollowSymlinkConfig( curval.command_line, curval.directory, False) else: assert False, "Implementation error: not yet implemented" setattr(namespace, self.dest, curval) class PatternMatchAction(argparse.Action): def __init__(self, *args, **kwargs): if "nargs" in kwargs: raise argparse.ArgumentError(None, "`nargs' not allowed") if "type" in kwargs: raise argparse.ArgumentError(None, "`type' not allowed") kwargs["nargs"] = 1 self.__kind = kwargs.pop("kind", None) if self.__kind is None: raise argparse.ArgumentError(None, "`kind' is required") if self.__kind not in ("exclude", "include", "accept-treesum"): raise argparse.ArgumentError( None, "`kind' must be one of `include', `exclude' or" " `accept-treesum'") super(PatternMatchAction, self).__init__(*args, **kwargs) def __call__(self, parser, namespace, values, option_string=None): items = getattr(namespace, self.dest, None) if items is None: items = [] setattr(namespace, self.dest, items) for v in values: items.append((self.__kind, v)) def gen_generate_opts(directories=[], algorithm=util.default_algotag(), append_output=False, base64=False, comment=[], fnmatch_filters=[], follow_symlinks=FollowSymlinkConfig(False, False, False), full_mode=False, generator="normal", grouping_separator=None, # the output writer selects logical=None, minimal=None, mode=False, mmap=None, mtime=False, output=None, output_style="tagged", print_size=False, size_only=False, size_column_width=15, utf8=False): if not isinstance(follow_symlinks, FollowSymlinkConfig): raise TypeError("`follow_symlinks' must be a FollowSymlinkConfig") if not isinstance(fnmatch_filters, (list, tuple, type(None))): raise TypeError("`fnmatch_filters' must be a sequence (list, tuple)") if fnmatch_filters: for f in fnmatch_filters: if not isinstance(f, (tuple, list)): raise TypeError( "items in `fnmatch_filters' must be tuples or lists") if f[0] not in ("exclude", "include"): raise ValueError( "every kind of every item in `fnmatch_filters' must be" " \"include\", \"exclude\" or \"accept-treesum\"" ) if generator not in ("normal", "full", "none"): raise ValueError("given generator `%s' not allowed" % (generator, )) if output_style not in ("tagged", "tag", "tabular", "tab"): raise ValueError( "given output_style `%s' not allowed" % (output_style,)) # Not following symlinks to files is not yet supported: reset to True # if not follow_symlinks.file: # follow_symlinks = follow_symlinks._make([follow_symlinks.command_line, # follow_symlinks.directory, # True]) # logging.warning("Coercing to follow-symlinks-file") opts = argparse.Namespace( directories=directories, algorithm=util.argv2algo(algorithm), append_output=append_output, base64=base64, comment=comment, fnmatch_filters=fnmatch_filters, follow_symlinks=follow_symlinks, generator=generator, grouping_separator=grouping_separator, logical=logical, minimal=minimal, mmap=mmap, metadata_full_mode=full_mode, metadata_mode=mode, metadata_mtime=mtime, output=output, output_style=output_style, print_size=print_size, size_only=size_only, size_column_width=size_column_width, utf8=utf8) return opts def gen_info_opts(digest_files=[], last=False): opts = argparse.Namespace( digest_files=digest_files, print_only_last_block=last) return opts def treesum(opts): # XXX TBD: opts.check and opts.checklist (as in shasum.py) if opts.subcommand in ("generate", "gen"): return generate_treesum(opts) elif opts.subcommand == "info": return print_treesum_digestfile_infos(opts) else: # # NOTE: Subcommands for printing help (e.g. "patterns") should # be handled in the caller. # raise RuntimeError( "command `{}' not handled".format(opts.subcommand)) def generate_treesum(opts): # Provide defaults if not opts.algorithm: opts.algorithm = util.argv2algo(util.default_algotag()) if not opts.directories: opts.directories.append(".") if opts.output is None or opts.output == "-": if hasattr(sys.stdout, "buffer"): out_cm = cm.nullcontext(sys.stdout.buffer) else: out_cm = cm.nullcontext(sys.stdout) is_stdout = True else: if opts.append_output: out_cm = open(opts.output, "ab") else: out_cm = open(opts.output, "wb") is_stdout = False fnmatcher = fnmatch.FnMatcher.build_from_commandline_patterns( opts.fnmatch_filters) if opts.output_style in ("tagged", "tag"): writerstyle = TaggedTreesumWriter elif opts.output_style in ("tabular", "tab"): writerstyle = TabularTreesumWriter else: raise NotImplementedError("`output_style'") with out_cm as outfp: writer = writerstyle(outfp, is_stdout=is_stdout, size_only=opts.size_only, print_size=opts.print_size, use_base64=opts.base64, grouping_separator=opts.grouping_separator, size_column_width=opts.size_column_width, ) for d in opts.directories: V1DirectoryTreesumGenerator( opts.algorithm, opts.mmap, opts.follow_symlinks, opts.generator, opts.metadata_mode, opts.metadata_full_mode, opts.metadata_mtime, opts.size_only, opts.print_size, opts.utf8, minimal=opts.minimal, fnmatcher=fnmatcher).generate( writer, d, comment=opts.comment) class V1DirectoryTreesumGenerator(object): def __init__(self, algorithm, use_mmap, follow_symlinks, with_generator, with_metadata_mode, with_metadata_full_mode, with_metadata_mtime, size_only, print_size, utf8_mode, minimal=None, fnmatcher=None): super(V1DirectoryTreesumGenerator, self).__init__() self._algorithm = algorithm self._use_mmap = use_mmap self._follow_symlinks = follow_symlinks self._with_generator = with_generator self._with_metadata_mode = with_metadata_mode self._with_metadata_full_mode = with_metadata_full_mode self._with_metadata_mtime = with_metadata_mtime self._size_only = size_only self._print_size = print_size self._utf8_mode = utf8_mode self._minimal = minimal self._fnmatcher = fnmatcher def generate(self, writer, root, comment=None): """ :param outfp: a *binary* file with a "write()" and a "flush()" method """ self._writer = writer self._writer.start("1") self._writer.write_fsencoding(util.n(walk.getfsencoding().upper())) self._writer.flush() if self._with_generator == "none": pass # do nothing elif self._with_generator == "normal": self._writer.write_generator("PY2" if util.PY2 else "PY3") elif self._with_generator == "full": import platform info = ("treesum %s (rv:%s), %s %s, %s" % (__version__, __revision__, platform.python_implementation(), platform.python_version(), platform.platform())) self._writer.write_generator(info) else: raise NotImplementedError( "not implemented: %s" % (self._with_generator,)) # # Note: Given non-default flags that are relevant for # directory traversal. # flags = [] if self._with_metadata_full_mode: flags.append("with-metadata-fullmode") elif self._with_metadata_mode: flags.append("with-metadata-mode") if self._with_metadata_mtime: flags.append("with-metadata-mtime") flags.append("follow-symlinks-commandline" if self._follow_symlinks.command_line else "no-follow-symlinks-commandline") flags.append("follow-symlinks-directory" if self._follow_symlinks.directory else "no-follow-symlinks-directory") flags.append("follow-symlinks-file" if self._follow_symlinks.file else "no-follow-symlinks-file") if self._size_only: flags.append("size-only") flags.append("utf8-encoding" if self._utf8_mode else "fs-encoding") if self._print_size: flags.append("print-size") self._writer.write_flags(flags) if self._minimal is None: # Write execution timestamps in POSIX epoch and ISO format ts = int(time.time()) self._writer.write_timestamp(ts) ts = (datetime.datetime.utcfromtimestamp(ts)).isoformat("T") + "Z" self._writer.write_isotimestamp(ts) if comment: for line in comment: self._writer.write_comment(line) for action, kind, pattern in self._fnmatcher.definitions(): self._writer.write_fnmatch_pattern(action, kind, pattern) if self._minimal is not None: self._writer.write_root( (walk.WalkDirEntry.alt_u8(self._minimal) if self._minimal else b"")) else: self._writer.write_root(walk.WalkDirEntry.alt_u8( util.normalize_filename(root, True))) self._writer.flush() if not self._follow_symlinks.command_line and os.path.islink(root): linktgt = walk.WalkDirEntry.from_readlink(os.readlink(root)) linkdgst = self._algorithm[0]() linkdgst.update(linktgt.fspath) dir_dgst = self._algorithm[0]() dir_dgst.update(b"2:L@,") dir_dgst.update( util.interpolate_bytes( b"%d:%s,%d:%s,", len(self._algorithm[1]), util.b(self._algorithm[1]), len(linkdgst.digest()), linkdgst.digest())) if self._size_only: self._writer.write_size(b"./@/", -1) else: sz = -1 if self._print_size else None self._writer.write_file_digest( self._algorithm[1], b"./@/", dir_dgst.digest(), sz) self._writer.flush() else: self._generate(os.path.normpath(root), tuple()) self._writer.finish() def _generate(self, root, top): if top: logging.debug("Recursing into directory: %s/%r", root, top) else: logging.debug("Handling root directory: %s", root) fullpath = os.path.join(root, *top) if top else root # Determine also the path to be used for directory filtering fpath = join_output_path(top, None) if top else "" if self._fnmatcher: logging.debug("Checking match against path: %s", fpath) if not self._fnmatcher.shall_visit(fpath): logging.debug("Skipping directory: %s", fpath) return (None, None, None, None) try: logging.debug("Scanning directory: %s", fullpath) with walk.ScanDir(fullpath) as dirscan: fsobjects = list(dirscan) except OSError as e: # # NOTE: Sync the error handler code with this method's # code below before returning! # if e.errno == errno.ENOTDIR: # object exists but is not a directory errmsg = b"not a directory" elif e.errno in (errno.EACCES, errno.EPERM, getattr(errno, "ENOTCAPABLE", errno.EACCES)): # no permissions errmsg = ( b"access denied / no permissions / missing capabilities") elif e.errno == errno.ENOENT: # given object does not exist errmsg = b"no such file or directory" else: raise self._writer.write_error( b"`%s': %s", walk.WalkDirEntry.alt_bytes(fullpath, self._utf8_mode), errmsg) # Reuse from top opath = walk.WalkDirEntry.alt_bytes( join_output_path(top, None), self._utf8_mode) if self._size_only: self._writer.write_size(opath, None) else: self._writer.write_file_digest( self._algorithm[1], opath, None, None) self._writer.flush() return (e.errno, None, None, None) # Check whether to accept existing treesum digest files if self._fnmatcher: for fso in fsobjects: fpath = join_output_path(top, fso.name) if self._fnmatcher.shall_accept_treesum(fpath): # Yes we have hit a treesum digest file logging.debug("Accepting existing treesum from: %s", fpath) treesum_info = None try: treesum_info = TreesumInfo.collect_last_from_file( os.path.join(root, fpath)) except OSError as e: eno = e.errno emsg = e.strerror except Exception as e: # XXX FIXME: other EIO, EBADF, EFAULT eno = errno.ESRCH emsg = str(e) else: eno = 0 emsg = None opath = walk.WalkDirEntry.alt_bytes( join_output_path(top, None), self._utf8_mode) fpath = walk.WalkDirEntry.alt_bytes( fpath, self._utf8_mode) if eno == 0: # # treesum file could be read. # Now check whether the infos we got from it are # compatible with our current requirements # (digest, size). # if self._size_only: if treesum_info.size is None: # # This is a severe error here: just the size # is required, but we have not got one. # self._writer.write_error( b"Missing required size in treesum-file" b" `%s'", walk.WalkDirEntry.alt_bytes( fso.npath, self._utf8_mode)) self._writer.write_size(opath, None) return (errno.ESRCH, None, None, None) else: if self._print_size: if treesum_info.size is None: # # XXX FIXME: Is this a **severe** error # here? Currently: no # self._writer.write_error( b"Missing size in treesum-file `%s'", walk.WalkDirEntry.alt_bytes( fso.npath, self._utf8_mode)) sz = -1 else: sz = treesum_info.size else: sz = None if treesum_info.digest is None: # # This is really a severe error. Most probably # the treesum file was created with # "--size-only" and contains no digest. # self._writer.write_error( b"Missing required digest in treesum-file" b" `%s'", walk.WalkDirEntry.alt_bytes( fso.npath, self._utf8_mode)) self._writer.write_file_digest( treesum_info.algorithm or "MD5", opath, None, sz) return (errno.ESRCH, None, None, None) # We got all required infos without errors self._writer.write_accept_treesum_file(fpath) if self._size_only: self._writer.write_size(opath, treesum_info.size) else: self._writer.write_file_digest( treesum_info.algorithm, opath, treesum_info.digest, sz) return (0, treesum_info.algorithm, treesum_info.digest, treesum_info.size) else: # # treesum file could not be read # self._writer.write_error( b"Cannot read treesum-file `%s' for directory" b"`%s': %s", walk.WalkDirEntry.alt_bytes( fso.npath, self._utf8_mode), walk.WalkDirEntry.alt_u8( join_output_path(top, None)), util.b(emsg, "utf-8", "backslashreplace")) if self._size_only: self._writer.write_size(opath, None) else: self._writer.write_file_digest( self._algorithm[1], opath, None, None) return (eno, None, None, None) # # No treesum file: just process normally with digesting # if self._utf8_mode: fsobjects.sort(key=walk.WalkDirEntry.sort_key_u8) else: fsobjects.sort(key=walk.WalkDirEntry.sort_key_fs) dir_dgst = self._algorithm[0]() dir_size = 0 dir_tainted = False for fso in fsobjects: # Determine the effective name to be used for digesting if self._utf8_mode: if fso.u8name is None: dir_tainted = True effective_fso_name = fso.alt_u8name else: effective_fso_name = fso.u8name else: if fso.fsname is None: dir_tainted = True effective_fso_name = fso.alt_fsname else: effective_fso_name = fso.fsname # Determine the path (mostly its prefix) that is to be printed opath = join_output_path(top, fso.name) # Determine the path to be used for filename filtering fpath = opath if self._fnmatcher: if not self._fnmatcher.shall_visit(fpath): logging.debug("Skipping: %s", fpath) continue opath = walk.WalkDirEntry.alt_bytes(opath, self._utf8_mode) if fso.is_special: special_tag = util.b(fso.special_tag) assert len(special_tag) == 1 assert fso.stat is not None # because .is_special is True if fso.is_symlink and not self._follow_symlinks.file: linktgt = walk.WalkDirEntry.from_readlink( os.readlink(fso.npath)) linkdgst = self._algorithm[0]() if self._utf8_mode: if linktgt.u8path is None: dir_tainted = True linkdgst.update(linktgt.alt_u8path) else: linkdgst.update(linktgt.u8path) else: if linktgt.fspath is None: dir_tainted = True linkdgst.update(linktgt.alt_fspath) else: linkdgst.update(linktgt.fspath) dir_dgst.update(util.interpolate_bytes( b"2:@%s,%d:%s,", special_tag, len(effective_fso_name), effective_fso_name)) dir_dgst.update(util.interpolate_bytes( b"%d:%s,%d:%s,", len(self._algorithm[1]), util.b(self._algorithm[1]), len(linkdgst.digest()), linkdgst.digest())) # # - no mtime and no mode for symlinks # - also does not count for dir_size # if self._size_only: self._writer.write_size( util.interpolate_bytes( b"%s/./@%s", opath, special_tag), -1) else: sz = -1 if self._print_size else None self._writer.write_file_digest( self._algorithm[1], util.interpolate_bytes( b"%s/./@%s", opath, special_tag), linkdgst.digest(), sz) else: # # Follow the symlink to special file and/or handle a # special file # dir_dgst.update(util.interpolate_bytes( b"1:%s,%d:%s,", special_tag, len(effective_fso_name), effective_fso_name)) # no important size here but a mode if self._with_metadata_mtime: mtime = datetime.datetime.utcfromtimestamp( int(fso.stat.st_mtime)) mtime = util.b(mtime.isoformat("T") + "Z") dir_dgst.update(util.interpolate_bytes( b"5:mtime,%d:%s,", len(mtime), mtime)) if self._with_metadata_full_mode: modestr = util.b( normalized_mode_str(fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"8:fullmode,%d:%s,", len(modestr), modestr)) elif self._with_metadata_mode: modestr = util.b(normalized_compatible_mode_str( fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"4:mode,%d:%s,", len(modestr), modestr)) if self._size_only: self._writer.write_size( util.interpolate_bytes( b"%s/./%s", opath, special_tag), -1) else: sz = -1 if self._print_size else None self._writer.write_file_digest( self._algorithm[1], util.interpolate_bytes( b"%s/./%s", opath, special_tag), b"", sz) elif fso.is_dir: assert fso.stat is not None # because .is_dir is True if fso.is_symlink and not self._follow_symlinks.directory: linktgt = walk.WalkDirEntry.from_readlink( os.readlink(fso.npath)) linkdgst = self._algorithm[0]() if self._utf8_mode: if linktgt.u8path is None: dir_tainted = True linkdgst.update(linktgt.alt_u8path) else: linkdgst.update(linktgt.u8path) else: if linktgt.fspath is None: dir_tainted = True linkdgst.update(linktgt.alt_fspath) else: linkdgst.update(linktgt.fspath) dir_dgst.update(util.interpolate_bytes( b"2:@/,%d:%s,", len(effective_fso_name), effective_fso_name)) # # - no mtime and no mode for symlinks # - also does not count for dir_size # dir_dgst.update(util.interpolate_bytes( b"%d:%s,%d:%s,", len(self._algorithm[1]), util.b(self._algorithm[1]), len(linkdgst.digest()), linkdgst.digest())) if self._size_only: self._writer.write_size( util.interpolate_bytes(b"%s/./@/", opath), -1) else: sz = -1 if self._print_size else None self._writer.write_file_digest( self._algorithm[1], util.interpolate_bytes(b"%s/./@/", opath), linkdgst.digest(), sz) else: # # Follow the symlink to dir or handle a "real" directory # # Get subdir data from recursing into it sub_dir_errno, sub_dir_algo, sub_dir_dgst, sub_dir_size = \ self._generate(root, top + (fso.name, )) # # Check first whether the directory was selected to be # excluded # if sub_dir_errno is None: # Yes -- skipped continue if sub_dir_errno == 0: if sub_dir_size is None: if self._print_size or self._size_only: dir_tainted = True else: dir_size += (sub_dir_size or 0) else: dir_tainted = True dir_dgst.update(util.interpolate_bytes( b"1:/,%d:%s,", len(effective_fso_name), effective_fso_name)) if self._with_metadata_full_mode: modestr = util.b(normalized_mode_str(fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"8:fullmode,%d:%s,", len(modestr), modestr)) elif self._with_metadata_mode: modestr = util.b(normalized_compatible_mode_str( fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"4:mode,%d:%s,", len(modestr), modestr)) if sub_dir_errno == 0: dir_dgst.update(util.interpolate_bytes( b"%d:%s,%d:%s,", len(sub_dir_algo), util.b(sub_dir_algo), len(sub_dir_dgst), sub_dir_dgst)) else: # NOTE: error message is already printed here dir_dgst.update(util.interpolate_bytes( b"5:errno,%d:%s", len(str(sub_dir_errno)), util.b(str(sub_dir_errno)))) else: if fso.is_symlink and not self._follow_symlinks.file: # # Symbolic link to some filesystem object which is not # determined to be a link to a directory or some other # special file (socket, FIFO, et al.). # linktgt = walk.WalkDirEntry.from_readlink( os.readlink(fso.npath)) linkdgst = self._algorithm[0]() if self._utf8_mode: if linktgt.u8path is None: dir_tainted = True linkdgst.update(linktgt.alt_u8path) else: linkdgst.update(linktgt.u8path) else: if linktgt.fspath is None: dir_tainted = True linkdgst.update(linktgt.alt_fspath) else: linkdgst.update(linktgt.fspath) dir_dgst.update(util.interpolate_bytes( b"1:@,%d:%s,", len(effective_fso_name), effective_fso_name)) # # - no mtime and no mode for symlinks # - also does not count for dir_size # dir_dgst.update(util.interpolate_bytes( b"%d:%s,%d:%s,", len(self._algorithm[1]), util.b(self._algorithm[1]), len(linkdgst.digest()), linkdgst.digest())) if self._size_only: self._writer.write_size( util.interpolate_bytes(b"%s/./@", opath), -1) else: sz = -1 if self._print_size else None self._writer.write_file_digest( self._algorithm[1], util.interpolate_bytes(b"%s/./@", opath), linkdgst.digest(), sz) else: # # Follow the symlink to file or handle a "real" file # dir_dgst.update(util.interpolate_bytes( b"0:,%d:%s,", len(effective_fso_name), effective_fso_name)) if fso.stat is None: # # Error: most likely a broken symlink here # dir_tainted = True dir_dgst.update(util.interpolate_bytes( b"5:errno,%d:%s,", len(str(fso.stat_errno)), util.b(str(fso.stat_errno)))) self._writer.write_error( b"errno %d: %s", fso.stat_errno, util.b(util.escape_for_output(fso.stat_errstr), "utf-8", "backslashreplace")) logging.error( "Directory entry has symlink problems: %s", fso.npath) if self._size_only: self._writer.write_size(opath, None) else: self._writer.write_file_digest( self._algorithm[1], opath, None, None) else: # # Ok: File has normal stat info # # XXX FIXME: Handle special files (fifo, socket, # block or char devices, ...). # dir_size += fso.stat.st_size if self._with_metadata_mtime: mtime = datetime.datetime.utcfromtimestamp( int(fso.stat.st_mtime)) mtime = util.b(mtime.isoformat("T") + "Z") dir_dgst.update(util.interpolate_bytes( b"5:mtime,%d:%s,", len(mtime), mtime)) if self._with_metadata_full_mode: modestr = util.b( normalized_mode_str(fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"8:fullmode,%d:%s,", len(modestr), modestr)) elif self._with_metadata_mode: modestr = util.b(normalized_compatible_mode_str( fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"4:mode,%d:%s,", len(modestr), modestr)) if self._size_only: # # size can be printed here because .stat is # available # self._writer.write_size(opath, fso.stat.st_size) else: try: dgst = digest.compute_digest_file( self._algorithm[0], fso.npath, use_mmap=self._use_mmap) except OSError as e: dir_tainted = True self._writer.write_error( b"`%s': errno %d: %s", walk.WalkDirEntry.alt_bytes( fso.npath, self._utf8_mode), e.errno, util.b(util.escape_for_output(e.strerror), "utf-8", "backslashreplace")) sz = (fso.stat.st_size if self._print_size else None) self._writer.write_file_digest( self._algorithm[1], opath, None, sz) else: dir_dgst.update(util.interpolate_bytes( b"%d:%s,%d:%s,", len(self._algorithm[1]), util.b(self._algorithm[1]), len(dgst), dgst)) sz = (fso.stat.st_size if self._print_size else None) self._writer.write_file_digest( self._algorithm[1], opath, dgst, sz) self._writer.flush() if dir_tainted: # # IMPORTANT: Print errors BEFORE the associated digest or size # line. Otherwise the "info" command has a problem. # self._writer.write_error(b"%s", b"directory is tainted") logging.error("Directory has problems: %s", fullpath) opath = walk.WalkDirEntry.alt_bytes( join_output_path(top, None), self._utf8_mode) if self._size_only: self._writer.write_size(opath, dir_size) else: sz = dir_size if self._print_size else None self._writer.write_file_digest( self._algorithm[1], opath, dir_dgst.digest(), sz) self._writer.flush() return (0, self._algorithm[1], dir_dgst.digest(), dir_size) def join_output_path(top, name): if name is None: # a path for a directory is to be computed if top: if isinstance(top[0], bytes): return b"/".join(top) + b"/" else: return u"/".join(top) + u"/" else: return b"" else: # a path for a normal file is to be computed if top: if isinstance(name, bytes): return b"/".join(top) + b"/" + name else: return u"/".join(top) + u"/" + name else: return name def normalized_compatible_mode_str(mode): # XXX FIXME: Windows and "executable" modebits = stat.S_IMODE(mode) modestr = "%o" % (modebits,) if not modestr.startswith("0"): modestr = "0" + modestr return modestr def normalized_mode_str(mode): modestr = "%o" % (mode,) if not modestr.startswith("0"): modestr = "0" + modestr return modestr class WriterBase(object): """(Abstract) base class for all treesum digest file writers. Wraps an output file pointer for a binary file. Provides low-level methods to write data lines. These methods must be used if the CRC is to be updated also. Also holds some very common attributes that control some aspects of the output format (e.g. `.LS`, `.use_base64`). Also holds the current CRC for a block. """ LS = util.b(os.linesep) """Because we write the output as binary files we need the official line separator for your OS as bytes""" DEFAULT_GROUPING_SEPARATOR = "" """Disable the thousands separator in case no subclass redefines it""" def __init__(self, outfp, is_stdout=False, size_only=False, print_size=False, use_base64=False, grouping_separator=None, size_column_width=None): # Poor man's abstract abstract class implemenetation assert self.__class__ is not WriterBase self._outfp = outfp try: self._outfp_is_tty = self._outfp.isatty() except: # noqa: E722 bare except self._outfp_is_tty = None self._is_windows = sys.platform == "win32" self._is_stdout = is_stdout self.size_only = size_only self.print_size = print_size self.use_base64 = use_base64 self.grouping_separator = (grouping_separator if grouping_separator is not None else self.DEFAULT_GROUPING_SEPARATOR) self.size_column_width = size_column_width or 0 self.reset_crc() def write_size(self, filename, sz): """ If `sz` is `None` then this is signals an error because a size is required. If the size should not be printed on purpose the `size` should be as negative number. """ raise NotImplementedError("write_size") def write_file_digest(self, algorithm, filename, digest, size): """ If `size` is `None` and the output of a size is required then this is an error signal. If the size should not be printed on purpose the `size` should be as negative number. If `digest` is `None` is an error signal. """ raise NotImplementedError("write_file_digest") @property def crc(self): return self._crc def reset_crc(self): self._crc = crc32() def writeln(self, line): """Write the bytes `line` into the output file and update the CRC accordingly. :param bytes line: The line to write to (without line ending) """ self.write(line) if self._is_windows: if self._is_stdout: if self._outfp_is_tty: # Windows handles this correctly in its terminal self.write(self.LS) else: # # Simulate a CR-LF for the CRC but write a LF only. # This will be converted to a CR-LF on Windows by # the runtime libs. # self._crc.update(b'\r') self.write(b'\n') else: self.write(self.LS) else: self.write(self.LS) def write(self, data): """Write `data` into the output file and update the CRC accordingly. :param bytes data: The data to write to and to update the CRC with """ if data: self._outfp.write(data) self._crc.update(data) def flush(self): self._outfp.flush() class TaggedTreesumWriter(WriterBase): """Writer to write treesum digest files in a format similar to BSD digest files. Provides high-level methods to write data lines. """ DEFAULT_GROUPING_SEPARATOR = '_' """The default thousands separator""" def __init__(self, outfp, **kwds): # No alignment for the size here kwds["size_column_width"] = 0 super(TaggedTreesumWriter, self).__init__(outfp, **kwds) def start(self, version): """Begin a new block, reset the current CRC and write the VERSION tag. """ self.reset_crc() self.write(b"VERSION = ") self.writeln(util.b(version)) def write_comment(self, comment): self.write(b"COMMENT (") comment = util.escape_for_output(comment) self.write(util.b(comment, "utf-8", "backslashreplace")) self.writeln(b")") def write_generator(self, generator): self.write(b"GENERATOR (") self.write(util.b(generator, "utf-8")) self.writeln(b")") def write_error(self, fmt, *args): self.write(b"ERROR (") self.write(util.interpolate_bytes(fmt, *args)) self.writeln(b")") def write_fsencoding(self, encoding): self.write(b"FSENCODING = ") self.writeln(util.b(encoding)) def write_fnmatch_pattern(self, action, kind, pattern): self.write(b"FNMATCH (") self.write(util.b(action)) self.write(b": ") self.write(util.b(kind)) self.write(b":") self.write(util.b(pattern, "utf-8")) self.writeln(b")") def write_flags(self, flags): self.write(b"FLAGS = ") if isinstance(flags, (str, bytes)): self.writeln(util.b(flags)) else: flags.sort() self.writeln(util.b(",".join(flags))) def write_timestamp(self, ts): self.write(b"TIMESTAMP = ") self.writeln(util.b(str(ts))) def write_isotimestamp(self, ts): self.write(b"ISOTIMESTAMP = ") self.writeln(util.b(ts)) def write_root(self, root): assert isinstance(root, bytes) self.write(b"ROOT (") self.write(root) self.writeln(b")") def write_size(self, filename, sz): assert isinstance(filename, bytes) self.write(b"SIZE (") self.write(filename) self.write(b")") if sz is not None: self.write(b" = ") self.write( b"" if sz < 0 else util.b(format(sz, ',').replace( ',', self.grouping_separator))) self.writeln(b"") def write_accept_treesum_file(self, filename): assert isinstance(filename, bytes) self.write(b"ACCEPT-TREESUM (") self.write(filename) self.writeln(b")") def write_file_digest(self, algorithm, filename, digest, size): assert isinstance(filename, bytes) if digest is not None: if digest != b"": digest = (base64.b64encode(digest) if self.use_base64 else binascii.hexlify(digest)) self.write(util.b(algorithm)) self.write(b" (") self.write(filename) self.write(b")") if digest is not None or size is not None: self.write(b" = ") if digest is not None: self.write(digest) if size is not None: self.write(b",") self.write( b"" if size < 0 else util.b(format(size, ',').replace( ',', self.grouping_separator))) self.writeln(b"") def finish(self): """Finish a block and write the current CRC""" crc = self.crc.hexdigest() self.write(b"CRC32 = ") self.writeln(util.b(crc)) self.flush() class TabularTreesumWriter(WriterBase): """Writer to write treesum digest files in a format similar to tabular GNU digest files. Provides high-level methods to write data lines. """ DEFAULT_GROUPING_SEPARATOR = '.' """The default thousands separator""" def __init__(self, outfp, **kwds): super(TabularTreesumWriter, self).__init__(outfp, **kwds) # Prepare some format strings for performance reasons if self.size_column_width > 0: self._formatstring_size = '>' + str(self.size_column_width) self._errorstring_size = b'?' * self.size_column_width self._emptystring_size = b' ' * self.size_column_width else: self._formatstring_size = ">" self._errorstring_size = b"?????" self._emptystring_size = b'' if self.grouping_separator: self._formatstring_size += ',' self._formatstring_size += 'd' def start(self, version): """Begin a new block, reset the current CRC and write the VERSION tag. """ self.reset_crc() self.write(b"VERSION\t") self.writeln(util.b(version)) def write_comment(self, comment): self.write(b"COMMENT\t") comment = util.escape_for_output(comment) self.writeln(util.b(comment, "utf-8", "backslashreplace")) def write_generator(self, generator): self.write(b"GENERATOR\t") self.writeln(util.b(generator, "utf-8")) def write_error(self, fmt, *args): self.write(b"ERROR\t") self.writeln(util.interpolate_bytes(fmt, *args)) def write_fsencoding(self, encoding): self.write(b"FSENCODING\t") self.writeln(util.b(encoding)) def write_fnmatch_pattern(self, action, kind, pattern): self.write(b"FNMATCH\t") self.write(util.b(action)) self.write(b": ") self.write(util.b(kind)) self.write(b":") self.writeln(util.b(pattern, "utf-8")) def write_flags(self, flags): self.write(b"FLAGS\t") if isinstance(flags, (str, bytes)): self.writeln(util.b(flags)) else: flags.sort() self.writeln(util.b(",".join(flags))) def write_timestamp(self, ts): self.write(b"TIMESTAMP\t") self.writeln(util.b(str(ts))) def write_isotimestamp(self, ts): self.write(b"ISOTIMESTAMP\t") self.writeln(util.b(ts)) def write_root(self, root): assert isinstance(root, bytes) self.write(b"ROOT\t") self.writeln(root) def write_size(self, filename, sz): assert isinstance(filename, bytes) if sz is not None: if sz >= 0: self.write(util.b(format( sz, self._formatstring_size).replace( ',', self.grouping_separator))) else: self.write(self._emptystring_size) else: self.write(self._errorstring_size) self.write(b"\t") self.writeln(filename) def write_accept_treesum_file(self, filename): assert isinstance(filename, bytes) self.write(b"ACCEPT-TREESUM\t") self.writeln(filename) def write_file_digest(self, algorithm, filename, digest, size): assert isinstance(filename, bytes) if digest is not None and digest != b"": digest = (base64.b64encode(digest) if self.use_base64 else binascii.hexlify(digest)) else: # # Compute an error digest string with the "correct" length for # given algorithm # nulldigest = b'\0' * util.algotag2digest_size(algorithm) dsz = len(base64.b64encode(nulldigest) if self.use_base64 else binascii.hexlify(nulldigest)) if digest is None: digest = b'?' * dsz else: digest = b' ' * dsz self.write(util.b(algorithm)) self.write(b":") self.write(digest) self.write(b"\t") if self.print_size: if size is not None: if size >= 0: self.write(util.b(format( size, self._formatstring_size).replace( ',', self.grouping_separator))) else: self.write(self._emptystring_size) else: self.write(self._errorstring_size) self.write(b"\t") self.writeln(filename) def finish(self): """Finish a block and write the current CRC""" crc = self.crc.hexdigest() self.write(b"CRC32\t") self.writeln(util.b(crc)) self.flush() class TreesumReader(object): """Reader to read and/or verify treesum digest files. Supports the iterator and context manager protocol. """ PATTERNC = re.compile(br"\A\s*[#;].*\r?\n\Z") # comments, no CRC PATTERNE = re.compile(br"\A[ \t]*\r?\n\Z") # empty lines PATTERN1 = re.compile(br"\A(VERSION|FSENCODING|FLAGS|TIMESTAMP|ISOTIMESTAMP|CRC32)[ \t]*=[ \t]*([^ \t]+)[ \t]*\r?\n\Z") # noqa: E501 line too long PATTERN2 = re.compile(br"\A(ROOT|COMMENT|ERROR|GENERATOR|FNMATCH|ACCEPT-TREESUM)[ \t]*\((.*)\)[ \t]*\r?\n\Z") # noqa: E501 line too long PATTERN3 = re.compile(br"\ASIZE[ \t]*\((.*)\)([ \t]*=[ \t]*([0-9., '_]*[0-9]))?[ \t]*\r?\n\Z") # noqa: E501 line too long PATTERN4 = re.compile(br"\A([A-Za-z0-9_-]+)[ \t]*\((.*)\)([ \t]*=[ \t]*([A-Za-z0-9=+/]+)?(,([0-9., '_]*[0-9])?)?)?[ \t]*\r?\n\Z") # noqa: E501 line too long def __init__(self, _fp, _filename, _own_fp): self._fp = _fp self._own_fp = _own_fp self._filename = _filename self._line_no = 0 self._reset_crc() self._expect_crc = None # NOTE: tristate: None is different from False self._current_algo_name = self._current_algo_digest_size = None @classmethod def from_path(cls_, path): """Open file at `path` and return a reader that owns the file object""" return cls_(open(path, "rb"), path, True) @classmethod def from_binary_buffer(cls_, binary_fp, filename): return cls_(binary_fp, filename, False) def __enter__(self): return self def __exit__(self, *args): self.close() def close(self): if self._fp is not None: try: if self._own_fp: self._fp.close() finally: self._fp = None def __iter__(self): return self def __next__(self): rec = self.read_record() if rec is None: raise StopIteration() return rec if util.PY2: next = __next__ def all_records(self): """Iterator over all remaining records""" while True: rec = self.read_record() if rec is None: return yield rec def read_record(self): """Read and parse the "next" line. :returns: `None` at EOF or the parsed contents of the line :rtype: tuple or None .. note:: Empty lines and comment lines are handled differently with regard to CRCs: empty lines count for the CRC, comment lines do not. """ # Loop to skip empty lines while True: line = self._get_next_line() if not line: # # Skip for empty files at the very beginning. # Check only after the first VERSION line. # if self._expect_crc is not None: if self._expect_crc: logging.warning("CRC32 is missing at EOF") return None # Skip comments and do NOT update CRC for comment lines if self.PATTERNC.search(line): continue if not self.PATTERNE.search(line): break # Empty lines count for CRC self._update_crc(line) # # At the beginning transparently skip an eventually embedded signify # signature # if self._line_no == 1: if line.startswith(b"untrusted comment: "): line = self._get_next_line() if not line.endswith(b"\n"): raise binascii.Error("No valid signify signature value") # Try to decode for an early error check base64.b64decode(line[:-1]) mo = self.PATTERN1.search(line) if mo: if mo.group(1) == b"VERSION": if self._expect_crc: logging.warning("CRC32 missing before line %d", self._line_no) self._reset_crc() self._expect_crc = True self._update_crc(line) return ("VERSION", util.n(mo.group(2))) if mo.group(1) == b"CRC32": # TODO: check if self._expect_crc is None: logging.warning("Lone CRC32 before VERSION in line %d", self._line_no) else: if self._expect_crc: if (self._hex_crc() != mo.group(2).decode("latin1").upper()): logging.warning( "CRC32 mismatch in line %d:" " expected: %s, given: %s", self._line_no, self._hex_crc(), mo.group(2).decode("latin1").upper()) else: logging.warning("CRC32 before VERSION in line %d", self._line_no) # Do not update the CRC here but reset the state self._expect_crc = False return ("CRC32", util.n(mo.group(2))) else: self._update_crc(line) return (util.n(mo.group(1)), util.n(mo.group(2))) else: mo = self.PATTERN2.search(line) if mo: self._update_crc(line) if mo.group(1) in (b"COMMENT", b"ERROR", b"GENERATOR", b"FNMATCH"): return (util.n(mo.group(1)), util.u(mo.group(2), "utf-8")) elif mo.group(1) == b"ROOT": return ("ROOT", mo.group(2)) elif mo.group(1) == b"ACCEPT-TREESUM": return ("ACCEPT-TREESUM", mo.group(2)) assert False, line else: mo = self.PATTERN3.search(line) if mo: self._update_crc(line) if mo.group(2): return ("SIZE", mo.group(1), util.parse_grouped_decimal_number(mo.group(3))) else: return ("SIZE", mo.group(1), None) else: mo = self.PATTERN4.search(line) if mo: self._update_crc(line) algo_name = util.n(mo.group(1)) if mo.group(3): if mo.group(4): if (len(mo.group(4)) == 2 * self._get_digest_size(algo_name)): # hex digest = binascii.unhexlify(mo.group(4)) else: # base64 digest = base64.b64decode(mo.group(4)) else: digest = None if mo.group(5): if mo.group(6): size = util.parse_grouped_decimal_number( mo.group(6)) else: size = None else: size = None return (algo_name, mo.group(2), digest, size) else: return (algo_name, mo.group(2), None, None) else: raise ValueError( "Cannot parse line: %r" % (line,)) return line def _get_next_line(self): line = self._fp.readline(4096) # along PATH_MAX on Linux if line: self._line_no += 1 return line def _reset_crc(self): self._crc32 = crc32() def _update_crc(self, data): self._crc32.update(data) def _hex_crc(self): return self._crc32.hexdigest() def _get_digest_size(self, algo_name): """Get the `digest_size` from algorithm specifier `algo_name`. Cache this on the assumption, that algorithms do not change very often. Do this because the `digest_size` can only be given by a digest instance. """ if self._current_algo_name == algo_name: return self._current_algo_digest_size sz = util.algotag2digest_size(algo_name) self._current_algo_name = algo_name self._current_algo_digest_size = sz return self._current_algo_digest_size def print_treesum_digestfile_infos(opts): get_infos_from_digestfile( opts.digest_files, print_block_data, opts.print_only_last_block) def get_infos_from_digestfile(digest_files, block_handler, only_last_block=True): for fn in digest_files: if fn == "-": if util.PY2: reader = TreesumReader.from_binary_buffer(sys.stdin) else: reader = TreesumReader.from_binary_buffer(sys.stdin.buffer) else: reader = TreesumReader.from_path(fn) with reader: root = generator = flags = fsencoding = algorithm = digest \ = size = None errors = set() comments = [] fnmatch_filters = [] in_block = False block_no = 0 for record in reader: if record[0] == "VERSION": if record[1] != "1": raise ValueError( "VERSION not yet handled: %r" % (record[1],)) # start a new block in_block = True block_no += 1 root = flags = algorithm = digest = size = None comments = [] elif record[0] == "GENERATOR": generator = record[1] elif record[0] == "FSENCODING": fsencoding = record[1] elif record[0] == "FLAGS": flags = record[1] elif record[0] == "ROOT": root = record[1] elif record[0] == "COMMENT": comments.append(record[1]) elif record[0] == "ERROR": errors.add(record[1]) elif record[0] == "FNMATCH": fnmatch_filters.append(record[1]) elif record[0] in ("TIMESTAMP", "ISOTIMESTAMP"): pass elif record[0] == "ACCEPT-TREESUM": pass elif record[0] == "CRC32": pass # in_block = False else: if not in_block: continue # digest line or size line if not record[1] or record[1] == b"./@/": if record[0] == "SIZE": algorithm = "SIZE" digest = None size = record[2] else: algorithm = record[0] digest = record[2] size = record[3] if not only_last_block: block_handler( block_no, root, generator, fsencoding, flags, fnmatch_filters, comments, errors, algorithm, digest, size) root = generator = flags = fsencoding = algorithm \ = digest = size = None errors = set() comments = [] in_block = False if only_last_block: if not in_block: if digest is not None or size is not None: block_handler( block_no, root, generator, fsencoding, flags, fnmatch_filters, comments, errors, algorithm, digest, size) else: logging.warning("missing block end") def print_block_data(block_no, tag, generator, fsencoding, flags, fnmatch_filters, comments, errors, algorithm, digest, size): digeststr = util.n(binascii.hexlify(digest)) if digest else "<no digest>" sizestr = str(size) if size is not None else "<no size>" print("BLOCK No %d:" % (block_no,)) print(" Tag:", tag) print(" FS-Encoding:", fsencoding) if generator: print(" Generator:", generator) print(" Flags:", flags if flags else "<none>") if comments: print(" Comments:", comments) if fnmatch_filters: for f in fnmatch_filters: print(" FNMatch:", f) print(" Algorithm:", algorithm) if algorithm != "SIZE": print(" Digest:", digeststr) print(" Size:", sizestr) if errors: errorlist = list(errors) errorlist.sort() for idx, err in enumerate(errorlist): if idx == 0: print(" Errors:", err) else: print(" ", err) else: print(" Errors: <none>") class TreesumInfo(object): def __init__(self): self._algorithm = self._digest = self._size = None def __call__(self, block_no, tag, generator, fsencoding, flags, fnmatch_filters, comments, errors, algorithm, digest, size): self._algorithm = algorithm self._digest = digest self._size = size @property def algorithm(self): return self._algorithm @property def digest(self): return self._digest @property def size(self): return self._size @classmethod def collect_last_from_file(cls, digest_file): info = cls() get_infos_from_digestfile([digest_file], info, True) return info if __name__ == "__main__": sys.exit(main())
