diff cutils/treesum.py @ 188:2784fdcc99e5

Implement basic parsing of treesum output, including CRC32 checks.
author Franz Glasner <fzglas.hg@dom66.de>
date Wed, 15 Jan 2025 14:41:36 +0100
parents 53614a724bf0
children 959c6d37b014
line wrap: on
line diff
--- a/cutils/treesum.py	Tue Jan 14 13:32:25 2025 +0100
+++ b/cutils/treesum.py	Wed Jan 15 14:41:36 2025 +0100
@@ -19,6 +19,7 @@
 import datetime
 import logging
 import os
+import re
 import stat
 import sys
 import time
@@ -456,16 +457,12 @@
                     dir_dgst.update(util.interpolate_bytes(
                         b"%d:%s,", len(sub_dir_dgst), sub_dir_dgst))
                     if self._with_metadata_full_mode:
-                        modestr = normalized_mode_str(fso.stat.st_mode)
-                        if not isinstance(modestr, bytes):
-                            modestr = modestr.encode("ascii")
+                        modestr = util.b(normalized_mode_str(fso.stat.st_mode))
                         dir_dgst.update(util.interpolate_bytes(
                             b"8:fullmode,%d:%s,", len(modestr), modestr))
                     elif self._with_metadata_mode:
-                        modestr = normalized_compatible_mode_str(
-                            fso.stat.st_mode)
-                        if not isinstance(modestr, bytes):
-                            modestr = modestr.encode("ascii")
+                        modestr = util.b(normalized_compatible_mode_str(
+                            fso.stat.st_mode))
                         dir_dgst.update(util.interpolate_bytes(
                             b"4:mode,%d:%s,", len(modestr), modestr))
             else:
@@ -475,21 +472,16 @@
                 if self._with_metadata_mtime:
                     mtime = datetime.datetime.utcfromtimestamp(
                         int(fso.stat.st_mtime))
-                    mtime = mtime.isoformat("T") + "Z"
-                    if not isinstance(mtime, bytes):
-                        mtime = mtime.encode("ascii")
+                    mtime = util.b(mtime.isoformat("T") + "Z")
                     dir_dgst.update(util.interpolate_bytes(
                         b"5:mtime,%d:%s,", len(mtime), mtime))
                 if self._with_metadata_full_mode:
-                    modestr = normalized_mode_str(fso.stat.st_mode)
-                    if not isinstance(modestr, bytes):
-                        modestr = modestr.encode("ascii")
+                    modestr = util.b(normalized_mode_str(fso.stat.st_mode))
                     dir_dgst.update(util.interpolate_bytes(
                         b"8:fullmode,%d:%s,", len(modestr), modestr))
                 elif self._with_metadata_mode:
-                    modestr = normalized_compatible_mode_str(fso.stat.st_mode)
-                    if not isinstance(modestr, bytes):
-                        modestr = modestr.encode("ascii")
+                    modestr = util.b(normalized_compatible_mode_str(
+                        fso.stat.st_mode))
                     dir_dgst.update(util.interpolate_bytes(
                         b"4:mode,%d:%s,", len(modestr), modestr))
                 if not self._size_only:
@@ -600,8 +592,7 @@
 
 
 def format_bsd_line(what, value, filename, use_base64, size=None):
-    ls = os.linesep if isinstance(os.linesep, bytes) \
-        else os.linesep.encode("utf-8")
+    ls = util.b(os.linesep)
     if not isinstance(what, bytes):
         what = what.encode("ascii")
     if what == b"TIMESTAMP":
@@ -609,14 +600,11 @@
         return util.interpolate_bytes(b"TIMESTAMP = %d%s", value, ls)
     if what in (b"ISOTIMESTAMP", b"FLAGS", b"VERSION", b"CRC32"):
         assert filename is None
-        if not isinstance(value, bytes):
-            value = value.encode("ascii")
-        return util.interpolate_bytes(b"%s = %s%s", what, value, ls)
+        return util.interpolate_bytes(b"%s = %s%s", what, util.b(value), ls)
     assert filename is not None
     if what == b"COMMENT":
-        if not isinstance(filename, bytes):
-            filename = filename.encode("utf-8")
-        return util.interpolate_bytes(b"COMMENT (%s)%s", filename, ls)
+        return util.interpolate_bytes(
+            b"COMMENT (%s)%s", util.b(filename, "utf-8"), ls)
     if not isinstance(filename, bytes):
         filename = util.fsencode(filename)
     if what == b"SIZE":
@@ -637,5 +625,197 @@
             b"%s (%s) = %s,%d%s", what, filename, value, size, ls)
 
 
+class TreesumReader(object):
+
+    """Reader to read and/or verify treesum digest files.
+
+    Supports the iterator and context manager protocol.
+
+    """
+
+    PATTERN0 = re.compile(br"\A[ \t]*\r?\n\Z")     # empty lines
+    PATTERN1 = re.compile(br"\A(VERSION|FLAGS|TIMESTAMP|ISOTIMESTAMP|CRC32)[ \t]*=[ \t]*([^ \t]+)[ \t]*\r?\n\Z")      # noqa: E501  line too long
+    PATTERN2 = re.compile(br"\A(ROOT|COMMENT)[ \t]*\((.*)\)[ \t]*\r?\n\Z")
+    PATTERN3 = re.compile(br"\ASIZE[ \t]*\((.*)\)[ \t]*=[ \t]*(\d+)[ \t]*\r?\n\Z")                                    # noqa: E501  line too long
+    PATTERN4 = re.compile(br"\A([A-Za-z0-9_-]+)[ \t]*\((.*)\)[ \t]*=[ \t]*([A-Za-z0-9=+/]+)(,(\d+))?[ \t]*\r?\n\Z")   # noqa: E501  line too long
+
+    def __init__(self, _fp, _filename, _own_fp):
+        self._fp = _fp
+        self._own_fp = _own_fp
+        self._filename = _filename
+        self._line_no = 0
+        self._reset_crc()
+        self._expect_crc = None  # NOTE: tristate: None is different from False
+        self._current_algo_name = self._current_algo_digest_size = None
+
+    @classmethod
+    def from_path(cls_, path):
+        """Open file at `path` and return a reader that owns the file object"""
+        return cls_(open(path, "rb"), path, True)
+
+    @classmethod
+    def from_binary_buffer(cls_, binary_fp, filename):
+        return cls_(binary_fp, filename, False)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def close(self):
+        """Release the file object, closing it only if the reader owns it.
+
+        """
+        fp, self._fp = self._fp, None
+        if fp is not None and self._own_fp:
+            fp.close()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        rec = self.read_record()
+        if rec is None:
+            raise StopIteration()
+        return rec
+
+    if util.PY2:
+        next = __next__
+
+    def all_records(self):
+        """Yield every remaining record.
+
+        Iteration ends at the first `None` returned by `read_record`.
+        """
+        for record in iter(self.read_record, None):
+            yield record
+
+    def read_record(self):
+        """Read and parse the "next" line.
+
+        Skips empty lines and a signify header at the start of the file.
+
+        :returns: `None` at EOF or the parsed contents of the line
+        :rtype: tuple or None
+        :raises binascii.Error: if the signify signature line is malformed
+
+        """
+        # Loop to skip empty lines and a leading signify header
+        while True:
+            line = self._get_next_line()
+            if not line:
+                #
+                # Skip for empty files at the very beginning.
+                # Check only after the first VERSION line.
+                #
+                if self._expect_crc:
+                    logging.warning("CRC32 is missing at EOF")
+                return None
+            if (self._line_no == 1
+                    and line.startswith(b"untrusted comment: ")):
+                # Transparently skip a possibly embedded signify signature:
+                # validate the signature line, then go on with the next line
+                sig = self._get_next_line()
+                if not sig.endswith(b"\n"):
+                    raise binascii.Error("No valid signify signature value")
+                # Try to decode for an early error check
+                base64.b64decode(sig[:-1])
+                continue
+            if not self.PATTERN0.search(line):
+                break
+            self._update_crc(line)
+        mo = self.PATTERN1.search(line)
+        if mo:
+            if mo.group(1) == b"VERSION":
+                if self._expect_crc:
+                    logging.warning("CRC32 missing before line %d",
+                                    self._line_no)
+                self._reset_crc()
+                self._expect_crc = True
+                self._update_crc(line)
+                return ("VERSION", util.n(mo.group(2)))
+            if mo.group(1) == b"CRC32":
+                if self._expect_crc is None:
+                    logging.warning("Lone CRC32 before VERSION in line %d",
+                                    self._line_no)
+                else:
+                    if self._expect_crc:
+                        if (self._hex_crc()
+                                != mo.group(2).decode("latin1").upper()):
+                            logging.warning(
+                                "CRC32 mismatch in line %d:"
+                                " expected: %s, given: %s",
+                                self._line_no,
+                                self._hex_crc(),
+                                mo.group(2).decode("latin1").upper())
+                    else:
+                        logging.warning("CRC32 before VERSION in line %d",
+                                        self._line_no)
+                # Do not update the CRC here but reset the state
+                self._expect_crc = False
+                return ("CRC32", util.n(mo.group(2)))
+            else:
+                self._update_crc(line)
+                return (util.n(mo.group(1)), util.n(mo.group(2)))
+        else:
+            mo = self.PATTERN2.search(line)
+            if mo:
+                self._update_crc(line)
+                if mo.group(1) == b"COMMENT":
+                    return ("COMMENT", util.u(mo.group(2), "utf-8"))
+                elif mo.group(1) == b"ROOT":
+                    return ("ROOT", mo.group(2))
+                assert False, line
+            else:
+                mo = self.PATTERN3.search(line)
+                if mo:
+                    self._update_crc(line)
+                    return ("SIZE", mo.group(1), int(util.n(mo.group(2)), 10))
+                else:
+                    mo = self.PATTERN4.search(line)
+                    if mo:
+                        self._update_crc(line)
+                        algo_name = util.n(mo.group(1))
+                        if (len(mo.group(3)) ==
+                                2 * self._get_digest_size(algo_name)):
+                            # hex
+                            digest = binascii.unhexlify(mo.group(3))
+                        else:
+                            # base64
+                            digest = base64.b64decode(mo.group(3))
+                        if mo.group(4):
+                            size = int(util.n(mo.group(5)), 10)
+                        else:
+                            size = None
+                        return (algo_name, mo.group(2), digest, size)
+                    else:
+                        assert False, line
+        return line
+
+    def _get_next_line(self):
+        line = self._fp.readline(2048)
+        if line:
+            self._line_no += 1
+        return line
+
+    def _reset_crc(self):
+        self._crc32 = zlib.crc32(b"")
+
+    def _update_crc(self, data):
+        self._crc32 = zlib.crc32(data, self._crc32)
+
+    def _hex_crc(self):
+        return "%X" % (self._crc32 & 0xFFFFFFFF)  # unsigned hex; PY2-safe
+
+    def _get_digest_size(self, algo_name):
+        """Return the digest size of `algo_name` using a one-entry cache."""
+        if algo_name != self._current_algo_name:
+            hasher = util.algotag2algotype(algo_name)()
+            self._current_algo_name = algo_name
+            self._current_algo_digest_size = hasher.digest_size
+        return self._current_algo_digest_size
+
+
 if __name__ == "__main__":
     sys.exit(main())