changeset 302:bf88323d6bf7

treesum: Implement --exclude/--include. - Filtering - Document in output - Handle in the "info" command
author Franz Glasner <fzglas.hg@dom66.de>
date Wed, 05 Mar 2025 10:07:44 +0100
parents d246b631b85a
children 73d13be531b5
files cutils/treesum.py cutils/util/fnmatch.py
diffstat 2 files changed, 168 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/cutils/treesum.py	Wed Mar 05 10:06:38 2025 +0100
+++ b/cutils/treesum.py	Wed Mar 05 10:07:44 2025 +0100
@@ -502,6 +502,9 @@
             out_cm = open(opts.output, "wb")
     out_cm = CRC32Output(out_cm)
 
+    fnmatcher = fnmatch.FnMatcher.build_from_commandline_patterns(
+        opts.fnmatch_filters)
+
     with out_cm as outfp:
         writer = TreesumWriter(outfp)
         for d in opts.directories:
@@ -515,7 +518,8 @@
                 opts.size_only,
                 opts.print_size,
                 opts.utf8,
-                minimal=opts.minimal).generate(
+                minimal=opts.minimal,
+                fnmatcher=fnmatcher).generate(
                     writer, d, comment=opts.comment)
 
 
@@ -526,7 +530,8 @@
                  with_generator,
                  with_metadata_mode, with_metadata_full_mode,
                  with_metadata_mtime, size_only, print_size, utf8_mode,
-                 minimal=None,):
+                 minimal=None,
+                 fnmatcher=None):
         super(V1DirectoryTreesumGenerator, self).__init__()
         self._algorithm = algorithm
         self._use_mmap = use_mmap
@@ -540,6 +545,7 @@
         self._print_size = print_size
         self._utf8_mode = utf8_mode
         self._minimal = minimal
+        self._fnmatcher = fnmatcher
 
     def generate(self, writer, root, comment=None):
         """
@@ -607,6 +613,9 @@
                 for line in comment:
                     self._writer.write_comment(line)
 
+        for action, kind, pattern in self._fnmatcher.definitions():
+            self._writer.write_fnmatch_pattern(action, kind, pattern)
+
         if self._minimal is not None:
             self._writer.write_root(
                 (walk.WalkDirEntry.alt_u8(self._minimal)
@@ -614,6 +623,7 @@
         else:
             self._writer.write_root(walk.WalkDirEntry.alt_u8(
                 util.normalize_filename(root, True)))
+
         self._writer.flush()
 
         if not self._follow_symlinks.command_line and os.path.islink(root):
@@ -648,6 +658,12 @@
         else:
             logging.debug("Handling root directory: %s", root)
         path = os.path.join(root, *top) if top else root
+        # Determine also the path to be used for directory filtering
+        fpath = join_output_path(top, None) if top else ""
+        if self._fnmatcher:
+            if not self._fnmatcher.shall_visit(fpath):
+                logging.debug("Skipping directory: %s", fpath)
+                return (None, None, None, None)
         try:
             with walk.ScanDir(path) as dirscan:
                 fsobjects = list(dirscan)
@@ -675,6 +691,7 @@
                 opath = walk.WalkDirEntry.alt_fs(path)
             self._writer.write_error(util.interpolate_bytes(
                 b"`%s': %s", opath, errmsg))
+            # Reuse from top
             opath = join_output_path(top, None)
             if opath:
                 if self._utf8_mode:
@@ -710,6 +727,12 @@
                     effective_fso_name = fso.fsname
             # Determine the path (mostly its prefix) that is to be printed
             opath = join_output_path(top, fso.name)
+            # Determine the path to be used for filename filtering
+            fpath = opath
+            if self._fnmatcher:
+                if not self._fnmatcher.shall_visit(fpath):
+                    logging.debug("Skipping: %s", fpath)
+                    continue
             if self._utf8_mode:
                 opath = walk.WalkDirEntry.alt_u8(opath)
             else:
@@ -867,6 +890,13 @@
                     sub_dir_errno, sub_dir_algo, sub_dir_dgst, sub_dir_size = \
                             self._generate(root, top + (fso.name, ))
 
+                    #
+                    # Check first whether the directory was selected to be
+                    # excluded
+                    #
+                    if sub_dir_errno is None:
+                        # Yes -- skipped
+                        continue
                     if sub_dir_errno == 0:
                         dir_size += sub_dir_size
                     else:
@@ -1191,6 +1221,15 @@
         self.write(b"FSENCODING = ")
         self.writeln(util.b(encoding))
 
+    def write_fnmatch_pattern(self, action, kind, pattern):
+        """Write one filter definition as a single `FNMATCH` record.
+
+        The emitted record has the layout
+        ``FNMATCH (<action>: <kind>:<pattern>)`` and is finished with
+        :meth:`writeln`.
+
+        :param action: the filter action; passed through `util.b`
+        :param kind: the pattern kind; passed through `util.b`
+        :param pattern: the pattern text; passed through `util.b` with an
+                        explicit "utf-8" argument
+
+        """
+        emit = self.write
+        emit(b"FNMATCH (")
+        emit(util.b(action))
+        emit(b": ")
+        emit(util.b(kind))
+        # No blank after this colon: it separates the kind from its pattern
+        emit(b":")
+        emit(util.b(pattern, "utf-8"))
+        self.writeln(b")")
+
     def write_flags(self, flags):
         self.write(b"FLAGS = ")
         if isinstance(flags, (str, bytes)):
@@ -1283,7 +1322,7 @@
 
     PATTERN0 = re.compile(br"\A[ \t]*\r?\n\Z")   # empty lines
     PATTERN1 = re.compile(br"\A(VERSION|FSENCODING|FLAGS|TIMESTAMP|ISOTIMESTAMP|CRC32)[ \t]*=[ \t]*([^ \t]+)[ \t]*\r?\n\Z")      # noqa: E501  line too long
-    PATTERN2 = re.compile(br"\A(ROOT|COMMENT|ERROR|GENERATOR)[ \t]*\((.*)\)[ \t]*\r?\n\Z")                                       # noqa: E501  line too long
+    PATTERN2 = re.compile(br"\A(ROOT|COMMENT|ERROR|GENERATOR|FNMATCH)[ \t]*\((.*)\)[ \t]*\r?\n\Z")                                       # noqa: E501  line too long
     PATTERN3 = re.compile(br"\ASIZE[ \t]*\((.*)\)([ \t]*=[ \t]*(\d+))?[ \t]*\r?\n\Z")                                               # noqa: E501  line too long
     PATTERN4 = re.compile(br"\A([A-Za-z0-9_-]+)[ \t]*\((.*)\)([ \t]*=[ \t]*([A-Za-z0-9=+/]+)?(,(\d+)?)?)?[ \t]*\r?\n\Z")              # noqa: E501  line too long
 
@@ -1410,7 +1449,8 @@
             mo = self.PATTERN2.search(line)
             if mo:
                 self._update_crc(line)
-                if mo.group(1) in (b"COMMENT", b"ERROR", b"GENERATOR"):
+                if mo.group(1) in (b"COMMENT", b"ERROR", b"GENERATOR",
+                                   b"FNMATCH"):
                     return (util.u(mo.group(1)), util.u(mo.group(2), "utf-8"))
                 elif mo.group(1) == b"ROOT":
                     return ("ROOT", mo.group(2))
@@ -1497,6 +1537,7 @@
                 = size = None
             errors = set()
             comments = []
+            fnmatch_filters = []
             in_block = False
             block_no = 0
             for record in reader:
@@ -1519,6 +1560,8 @@
                     comments.append(record[1])
                 elif record[0] == "ERROR":
                     errors.add(record[1])
+                elif record[0] == "FNMATCH":
+                    fnmatch_filters.append(record[1])
                 elif record[0] in ("TIMESTAMP", "ISOTIMESTAMP"):
                     pass
                 elif record[0] == "CRC32":
@@ -1540,8 +1583,9 @@
                         if not print_only_last_block:
                             print_block_data(
                                 block_no,
-                                root, generator, fsencoding, flags, comments,
-                                errors, algorithm, digest, size)
+                                root, generator, fsencoding, flags,
+                                fnmatch_filters,
+                                comments, errors, algorithm, digest, size)
                             root = generator = flags = fsencoding = algorithm \
                                 = digest = size = None
                             errors = set()
@@ -1552,14 +1596,15 @@
                 if digest is not None or size is not None:
                     print_block_data(
                         block_no,
-                        root, generator, fsencoding, flags, comments, errors,
-                        algorithm, digest, size)
+                        root, generator, fsencoding, flags, fnmatch_filters,
+                        comments, errors, algorithm, digest, size)
             else:
                 logging.warning("missing block end")
 
 
-def print_block_data(block_no, tag, generator, fsencoding, flags, comments,
-                     errors, algorithm, digest, size):
+def print_block_data(block_no, tag, generator, fsencoding, flags,
+                     fnmatch_filters, comments, errors,
+                     algorithm, digest, size):
     digeststr = util.n(binascii.hexlify(digest)) if digest else "<no digest>"
     sizestr = str(size) if size is not None else "<no size>"
     print("BLOCK No %d:" % (block_no,))
@@ -1570,6 +1615,9 @@
     print("    Flags:", flags if flags else "<none>")
     if comments:
         print("    Comments:", comments)
+    if fnmatch_filters:
+        for f in fnmatch_filters:
+            print("    FNMatch:", f)
     print("    Algorithm:", algorithm)
     if algorithm != "SIZE":
         print("    Digest:", digeststr)
--- a/cutils/util/fnmatch.py	Wed Mar 05 10:06:38 2025 +0100
+++ b/cutils/util/fnmatch.py	Wed Mar 05 10:07:44 2025 +0100
@@ -10,7 +10,12 @@
 from __future__ import print_function, absolute_import
 
 
-__all__ = []
+__all__ = ["FnMatcher"]
+
+
+import re
+
+from . import glob
 
 
 HELP_DESCRIPTION = """\
@@ -22,6 +27,109 @@
   re: regular expression
   path: plain text example (rooted), can be a file or a directory or a prefix
         thereof
-  filepath: exactly a single file, relative to the root of the tree
+  fullpath: exactly a single full path (file or directory), relative to the
+            root of the tree
 
 """
+
+
+def glob_factory(pattern):
+    """Build a predicate that tests strings against the glob `pattern`.
+
+    The glob is translated by `glob.glob_to_regexp`, anchored at both
+    ends so that only a match of the complete string counts, and compiled
+    once with `re.DOTALL`. Matching is case-sensitive.
+
+    :param pattern: the glob pattern text
+    :return: a callable that takes a string and returns `True` if the
+             string as a whole matches and `False` otherwise
+
+    """
+    anchored = "\\A{}\\Z".format(glob.glob_to_regexp(pattern))
+    search = re.compile(anchored, re.DOTALL).search
+
+    def _glob_matcher(text):
+        return search(text) is not None
+
+    return _glob_matcher
+
+
+def iglob_factory(pattern):
+    """Build a predicate that tests strings against the glob `pattern`
+    while ignoring case.
+
+    The glob is translated by `glob.glob_to_regexp`, anchored at both
+    ends so that only a match of the complete string counts, and compiled
+    once with `re.DOTALL` and `re.IGNORECASE`.
+
+    :param pattern: the glob pattern text
+    :return: a callable that takes a string and returns `True` if the
+             string as a whole matches and `False` otherwise
+
+    """
+    flags = re.DOTALL | re.IGNORECASE
+    anchored = "\\A{}\\Z".format(glob.glob_to_regexp(pattern))
+    search = re.compile(anchored, flags).search
+
+    def _iglob_matcher(text):
+        return search(text) is not None
+
+    return _iglob_matcher
+
+
+def re_factory(pattern):
+    """Build a predicate from the regular expression `pattern`.
+
+    The expression is compiled once with `re.DOTALL` and applied with
+    ``search``. No anchors are added here, so a hit anywhere within the
+    string counts as a match unless the expression anchors itself.
+
+    :param pattern: the regular expression
+    :return: a callable that takes a string and returns `True` if the
+             expression is found in it and `False` otherwise
+
+    """
+    search = re.compile(pattern, re.DOTALL).search
+
+    def _re_matcher(text):
+        return search(text) is not None
+
+    return _re_matcher
+
+
+def path_factory(pattern):
+    """Build a predicate doing a plain-text prefix comparison.
+
+    No wildcard or other special character handling takes place: the
+    returned callable just checks ``startswith(pattern)``. The test is
+    purely textual, so `pattern` does not need to end at a path
+    component boundary to match.
+
+    :param pattern: the prefix to look for
+    :return: a callable that takes a string and returns whether it
+             starts with `pattern`
+
+    """
+    def _path_matcher(path):
+        return path.startswith(pattern)
+
+    return _path_matcher
+
+
+def fullpath_factory(pattern):
+    """Build a predicate that accepts exactly one path.
+
+    The returned callable compares its argument with `pattern` for
+    equality; neither prefixes nor wildcards are considered.
+
+    :param pattern: the one path that shall match
+    :return: a callable that takes a string and returns whether it
+             is equal to `pattern`
+
+    """
+    def _fullpath_matcher(path):
+        return path == pattern
+
+    return _fullpath_matcher
+
+
+class FnMatcher(object):
+    """Decide by an ordered list of include/exclude rules whether a
+    path shall be visited.
+
+    Every rule is kept as a tuple ``(action, kind, matcher, pattern)``
+    where `matcher` is the predicate that the factory registered for
+    `kind` has built from `pattern`.
+
+    """
+
+    # Maps a pattern kind (the text before the first ":" of a command
+    # line pattern) to the factory that builds its matching predicate
+    _registry = {
+        "glob": glob_factory,
+        "iglob": iglob_factory,
+        "re": re_factory,
+        "path": path_factory,
+        "fullpath": fullpath_factory,
+    }
+
+    VISIT_DEFAULT = True    # Overall default value for visiting
+
+    def __init__(self, matchers):
+        """
+        :param matchers: the rules as a sequence of
+                         ``(action, kind, matcher, pattern)`` tuples;
+                         stored as given (not copied)
+
+        """
+        super(FnMatcher, self).__init__()
+        self._matchers = matchers
+
+    @classmethod
+    def build_from_commandline_patterns(klass, filter_definitions):
+        """Create a matcher from ``(action, pattern)`` pairs.
+
+        A pattern has the form ``KIND:TEXT`` with `KIND` being a key of
+        the factory registry. A pattern without any ``:`` is taken as a
+        whole as "glob" pattern.
+
+        :param filter_definitions: an iterable of ``(action, pattern)``
+                                   pairs; `None` is accepted and handled
+                                   like an empty iterable
+        :return: the new matcher with the rules in the given order
+        :raises RuntimeError: if the kind of a pattern is not registered
+
+        The action is not validated here but when a rule matches in
+        :meth:`shall_visit`.
+
+        """
+        matchers = []
+        # `None` means "no rules" instead of failing with a TypeError
+        for action, kpattern in filter_definitions or ():
+            kind, sep, pattern = kpattern.partition(':')
+            if not sep:
+                # No explicit kind: the complete text is a glob
+                kind, pattern = "glob", kpattern
+            factory = klass._registry.get(kind)
+            if factory is None:
+                raise RuntimeError("unknown pattern kind: {}".format(kind))
+            matchers.append((action, kind, factory(pattern), pattern))
+
+        return klass(matchers)
+
+    def shall_visit(self, fn):
+        """Determine whether `fn` shall be visited.
+
+        All rules are evaluated in order and every matching rule
+        overrides the decision made so far: the last matching rule
+        wins. If no rule matches `VISIT_DEFAULT` is returned.
+
+        :param fn: the name to be tested by the matchers
+        :return: `True` if `fn` shall be visited, `False` otherwise
+        :raises RuntimeError: if a matching rule has an action other
+                              than "include" or "exclude"
+
+        """
+        visit = self.VISIT_DEFAULT
+        for action, _kind, matcher, _pattern in self._matchers:
+            if not matcher(fn):
+                continue
+            if action == "include":
+                visit = True
+            elif action == "exclude":
+                visit = False
+            else:
+                raise RuntimeError("unknown action: {}".format(action))
+        return visit
+
+    def definitions(self):
+        """Yield ``(action, kind, pattern)`` for every rule in order.
+
+        `pattern` is the pattern text without its kind prefix.
+
+        """
+        for action, kind, _matcher, pattern in self._matchers:
+            yield (action, kind, pattern)