# HG changeset patch # User Franz Glasner # Date 1741165664 -3600 # Node ID bf88323d6bf7f93c9381baf61d10dfff3df40013 # Parent d246b631b85a18859c052fd1ca7b2870d19c022e treesum: Implement --exclude/--include. - Filtering - Document in output - Handle in the "info" command diff -r d246b631b85a -r bf88323d6bf7 cutils/treesum.py --- a/cutils/treesum.py Wed Mar 05 10:06:38 2025 +0100 +++ b/cutils/treesum.py Wed Mar 05 10:07:44 2025 +0100 @@ -502,6 +502,9 @@ out_cm = open(opts.output, "wb") out_cm = CRC32Output(out_cm) + fnmatcher = fnmatch.FnMatcher.build_from_commandline_patterns( + opts.fnmatch_filters) + with out_cm as outfp: writer = TreesumWriter(outfp) for d in opts.directories: @@ -515,7 +518,8 @@ opts.size_only, opts.print_size, opts.utf8, - minimal=opts.minimal).generate( + minimal=opts.minimal, + fnmatcher=fnmatcher).generate( writer, d, comment=opts.comment) @@ -526,7 +530,8 @@ with_generator, with_metadata_mode, with_metadata_full_mode, with_metadata_mtime, size_only, print_size, utf8_mode, - minimal=None,): + minimal=None, + fnmatcher=None): super(V1DirectoryTreesumGenerator, self).__init__() self._algorithm = algorithm self._use_mmap = use_mmap @@ -540,6 +545,7 @@ self._print_size = print_size self._utf8_mode = utf8_mode self._minimal = minimal + self._fnmatcher = fnmatcher def generate(self, writer, root, comment=None): """ @@ -607,6 +613,9 @@ for line in comment: self._writer.write_comment(line) + for action, kind, pattern in self._fnmatcher.definitions(): + self._writer.write_fnmatch_pattern(action, kind, pattern) + if self._minimal is not None: self._writer.write_root( (walk.WalkDirEntry.alt_u8(self._minimal) @@ -614,6 +623,7 @@ else: self._writer.write_root(walk.WalkDirEntry.alt_u8( util.normalize_filename(root, True))) + self._writer.flush() if not self._follow_symlinks.command_line and os.path.islink(root): @@ -648,6 +658,12 @@ else: logging.debug("Handling root directory: %s", root) path = os.path.join(root, *top) if top else root + # Determine also 
def write_fnmatch_pattern(self, action, kind, pattern):
    """Emit one ``FNMATCH (<action>: <kind>:<pattern>)`` record line.

    `action` and `kind` are converted with :func:`util.b` using its
    default arguments, `pattern` is converted with ``"utf-8"`` passed
    as second argument.  The pieces are written one after another with
    :meth:`write`; the closing parenthesis goes out via :meth:`writeln`.

    """
    # (literal lead-in, value to convert, extra args for util.b)
    segments = (
        (b"FNMATCH (", action, ()),
        (b": ", kind, ()),
        (b":", pattern, ("utf-8",)),
    )
    for lead_in, value, extra in segments:
        # Convert lazily, right before writing, so that the sequence of
        # write calls is the same as with straight-line code.
        self.write(lead_in)
        self.write(util.b(value, *extra))
    self.writeln(b")")
E501 line too long + PATTERN2 = re.compile(br"\A(ROOT|COMMENT|ERROR|GENERATOR|FNMATCH)[ \t]*\((.*)\)[ \t]*\r?\n\Z") # noqa: E501 line too long PATTERN3 = re.compile(br"\ASIZE[ \t]*\((.*)\)([ \t]*=[ \t]*(\d+))?[ \t]*\r?\n\Z") # noqa: E501 line too long PATTERN4 = re.compile(br"\A([A-Za-z0-9_-]+)[ \t]*\((.*)\)([ \t]*=[ \t]*([A-Za-z0-9=+/]+)?(,(\d+)?)?)?[ \t]*\r?\n\Z") # noqa: E501 line too long @@ -1410,7 +1449,8 @@ mo = self.PATTERN2.search(line) if mo: self._update_crc(line) - if mo.group(1) in (b"COMMENT", b"ERROR", b"GENERATOR"): + if mo.group(1) in (b"COMMENT", b"ERROR", b"GENERATOR", + b"FNMATCH"): return (util.u(mo.group(1)), util.u(mo.group(2), "utf-8")) elif mo.group(1) == b"ROOT": return ("ROOT", mo.group(2)) @@ -1497,6 +1537,7 @@ = size = None errors = set() comments = [] + fnmatch_filters = [] in_block = False block_no = 0 for record in reader: @@ -1519,6 +1560,8 @@ comments.append(record[1]) elif record[0] == "ERROR": errors.add(record[1]) + elif record[0] == "FNMATCH": + fnmatch_filters.append(record[1]) elif record[0] in ("TIMESTAMP", "ISOTIMESTAMP"): pass elif record[0] == "CRC32": @@ -1540,8 +1583,9 @@ if not print_only_last_block: print_block_data( block_no, - root, generator, fsencoding, flags, comments, - errors, algorithm, digest, size) + root, generator, fsencoding, flags, + fnmatch_filters, + comments, errors, algorithm, digest, size) root = generator = flags = fsencoding = algorithm \ = digest = size = None errors = set() @@ -1552,14 +1596,15 @@ if digest is not None or size is not None: print_block_data( block_no, - root, generator, fsencoding, flags, comments, errors, - algorithm, digest, size) + root, generator, fsencoding, flags, fnmatch_filters, + comments, errors, algorithm, digest, size) else: logging.warning("missing block end") -def print_block_data(block_no, tag, generator, fsencoding, flags, comments, - errors, algorithm, digest, size): +def print_block_data(block_no, tag, generator, fsencoding, flags, + fnmatch_filters, comments, 
def _anchored_glob_matcher(pattern, flags):
    """Compile the glob `pattern` into a predicate on strings.

    The glob is translated with ``glob.glob_to_regexp`` and wrapped in
    ``\\A``/``\\Z`` so that it must match the complete string.
    `flags` are :mod:`re` flags that are OR-ed to :data:`re.DOTALL`.

    """
    cpat = re.compile(
        # automatically anchored
        "\\A{}\\Z".format(glob.glob_to_regexp(pattern)),
        re.DOTALL | flags)

    def _matcher(s):
        return cpat.search(s) is not None

    return _matcher


def glob_factory(pattern):
    """Return a predicate that matches strings against the glob `pattern`.

    The match is case-sensitive and anchored at both ends.

    """
    return _anchored_glob_matcher(pattern, 0)


def iglob_factory(pattern):
    """Return a predicate like :func:`glob_factory` but ignoring case."""
    return _anchored_glob_matcher(pattern, re.IGNORECASE)


def re_factory(pattern):
    """Return a predicate that searches for the regular expression `pattern`.

    The expression is compiled with :data:`re.DOTALL` and applied with
    ``search``: it is *not* anchored automatically.

    """
    cpat = re.compile(pattern, re.DOTALL)

    def _re_matcher(s):
        return cpat.search(s) is not None

    return _re_matcher


def path_factory(pattern):
    """Return a predicate that is true for strings starting with `pattern`.

    This is a plain string prefix test (``str.startswith``).

    """

    def _path_matcher(s):
        return s.startswith(pattern)

    return _path_matcher


def fullpath_factory(pattern):
    """Return a predicate that is true only for strings equal to `pattern`."""

    def _fullpath_matcher(s):
        return s == pattern

    return _fullpath_matcher


class FnMatcher(object):
    """Ordered include/exclude filter for path names.

    An instance holds a sequence of ``(action, kind, matcher,
    orig_pattern)`` tuples.  :meth:`shall_visit` evaluates all of them
    in order; the action of the *last* matching entry determines the
    result.  If nothing matches :attr:`VISIT_DEFAULT` is returned.

    """

    # Maps the pattern kind (the part before the first ``:`` in a filter
    # definition) to the factory that builds the matching predicate
    _registry = {
        "glob": glob_factory,
        "iglob": iglob_factory,
        "re": re_factory,
        "path": path_factory,
        "fullpath": fullpath_factory,
    }

    # The actions that `shall_visit` knows how to handle
    _ACTIONS = ("include", "exclude")

    VISIT_DEFAULT = True    # Overall default value for visiting

    def __init__(self, matchers):
        """Initialize with `matchers`.

        :param matchers: a sequence of ``(action, kind, matcher,
                         orig_pattern)`` tuples where `matcher` is a
                         callable taking a string and returning a
                         truth value

        """
        super(FnMatcher, self).__init__()
        self._matchers = matchers

    @classmethod
    def build_from_commandline_patterns(klass, filter_definitions):
        """Build a matcher from ``(action, kind_and_pattern)`` pairs.

        :param filter_definitions: an iterable of ``(action, kpattern)``
                                   pairs or `None` (handled like an
                                   empty iterable).  `kpattern` has the
                                   form ``kind:pattern``; without a
                                   ``:`` the kind defaults to ``glob``
                                   and the complete string is the
                                   pattern.
        :raises RuntimeError: if an action or a pattern kind is unknown

        """
        matchers = []
        # `None` is what an option parser typically delivers if no filter
        # option has been given at all: treat it as "no filters"
        for action, kpattern in filter_definitions or ():
            # Fail early: otherwise a wrong action would be detected only
            # when its pattern happens to match some name
            if action not in klass._ACTIONS:
                raise RuntimeError("unknown action: {}".format(action))
            kind, sep, pattern = kpattern.partition(':')
            if not sep:
                # use the default
                kind = "glob"
                pattern = kpattern
            factory = klass._registry.get(kind)
            if factory is None:
                raise RuntimeError("unknown pattern kind: {}".format(kind))
            matchers.append((action, kind, factory(pattern), pattern))

        return klass(matchers)

    def shall_visit(self, fn):
        """Return whether the name `fn` is to be visited.

        All matchers are tested in order -- there is no short-circuit --
        so a later matching definition overrides an earlier one.

        :raises RuntimeError: if a matching entry carries an action other
                              than ``include`` or ``exclude`` (possible
                              only for directly constructed instances)

        """
        visit = self.VISIT_DEFAULT
        for action, _kind, matcher, _orig_pattern in self._matchers:
            if not matcher(fn):
                continue
            if action == "include":
                visit = True
            elif action == "exclude":
                visit = False
            else:
                raise RuntimeError("unknown action: {}".format(action))
        return visit

    def definitions(self):
        """Yield ``(action, kind, orig_pattern)`` for every matcher in order."""
        for action, kind, _matcher, orig_pattern in self._matchers:
            yield (action, kind, orig_pattern)