Mercurial > hgrepos > Python > apps > py-cutils
diff cutils/treesum.py @ 188:2784fdcc99e5
Implement basic parsing of treesum output.
This includes CRC32 checks.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Wed, 15 Jan 2025 14:41:36 +0100 |
| parents | 53614a724bf0 |
| children | 959c6d37b014 |
line wrap: on
line diff
--- a/cutils/treesum.py Tue Jan 14 13:32:25 2025 +0100 +++ b/cutils/treesum.py Wed Jan 15 14:41:36 2025 +0100 @@ -19,6 +19,7 @@ import datetime import logging import os +import re import stat import sys import time @@ -456,16 +457,12 @@ dir_dgst.update(util.interpolate_bytes( b"%d:%s,", len(sub_dir_dgst), sub_dir_dgst)) if self._with_metadata_full_mode: - modestr = normalized_mode_str(fso.stat.st_mode) - if not isinstance(modestr, bytes): - modestr = modestr.encode("ascii") + modestr = util.b(normalized_mode_str(fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"8:fullmode,%d:%s,", len(modestr), modestr)) elif self._with_metadata_mode: - modestr = normalized_compatible_mode_str( - fso.stat.st_mode) - if not isinstance(modestr, bytes): - modestr = modestr.encode("ascii") + modestr = util.b(normalized_compatible_mode_str( + fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"4:mode,%d:%s,", len(modestr), modestr)) else: @@ -475,21 +472,16 @@ if self._with_metadata_mtime: mtime = datetime.datetime.utcfromtimestamp( int(fso.stat.st_mtime)) - mtime = mtime.isoformat("T") + "Z" - if not isinstance(mtime, bytes): - mtime = mtime.encode("ascii") + mtime = util.b(mtime.isoformat("T") + "Z") dir_dgst.update(util.interpolate_bytes( b"5:mtime,%d:%s,", len(mtime), mtime)) if self._with_metadata_full_mode: - modestr = normalized_mode_str(fso.stat.st_mode) - if not isinstance(modestr, bytes): - modestr = modestr.encode("ascii") + modestr = util.b(normalized_mode_str(fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"8:fullmode,%d:%s,", len(modestr), modestr)) elif self._with_metadata_mode: - modestr = normalized_compatible_mode_str(fso.stat.st_mode) - if not isinstance(modestr, bytes): - modestr = modestr.encode("ascii") + modestr = util.b(normalized_compatible_mode_str( + fso.stat.st_mode)) dir_dgst.update(util.interpolate_bytes( b"4:mode,%d:%s,", len(modestr), modestr)) if not self._size_only: @@ -600,8 +592,7 @@ def format_bsd_line(what, value, 
filename, use_base64, size=None): - ls = os.linesep if isinstance(os.linesep, bytes) \ - else os.linesep.encode("utf-8") + ls = util.b(os.linesep) if not isinstance(what, bytes): what = what.encode("ascii") if what == b"TIMESTAMP": @@ -609,14 +600,11 @@ return util.interpolate_bytes(b"TIMESTAMP = %d%s", value, ls) if what in (b"ISOTIMESTAMP", b"FLAGS", b"VERSION", b"CRC32"): assert filename is None - if not isinstance(value, bytes): - value = value.encode("ascii") - return util.interpolate_bytes(b"%s = %s%s", what, value, ls) + return util.interpolate_bytes(b"%s = %s%s", what, util.b(value), ls) assert filename is not None if what == b"COMMENT": - if not isinstance(filename, bytes): - filename = filename.encode("utf-8") - return util.interpolate_bytes(b"COMMENT (%s)%s", filename, ls) + return util.interpolate_bytes( + b"COMMENT (%s)%s", util.b(filename, "utf-8"), ls) if not isinstance(filename, bytes): filename = util.fsencode(filename) if what == b"SIZE": @@ -637,5 +625,197 @@ b"%s (%s) = %s,%d%s", what, filename, value, size, ls) +class TreesumReader(object): + + """Reader to read and/or verify treesum digest files. + + Supports the iterator and context manager protocol. 
+ + """ + + PATTERN0 = re.compile(br"\A[ \t]*\r?\n\Z") # empty lines + PATTERN1 = re.compile(br"\A(VERSION|FLAGS|TIMESTAMP|ISOTIMESTAMP|CRC32)[ \t]*=[ \t]*([^ \t]+)[ \t]*\r?\n\Z") # noqa: E501 line too long + PATTERN2 = re.compile(br"\A(ROOT|COMMENT)[ \t]*\((.*)\)[ \t]*\r?\n\Z") + PATTERN3 = re.compile(br"\ASIZE[ \t]*\((.*)\)[ \t]*=[ \t]*(\d+)[ \t]*\r?\n\Z") # noqa: E501 line too long + PATTERN4 = re.compile(br"\A([A-Za-z0-9_-]+)[ \t]*\((.*)\)[ \t]*=[ \t]*([A-Za-z0-9=+/]+)(,(\d+))?[ \t]*\r?\n\Z") # noqa: E501 line too long + + def __init__(self, _fp, _filename, _own_fp): + self._fp = _fp + self._own_fp = _own_fp + self._filename = _filename + self._line_no = 0 + self._reset_crc() + self._expect_crc = None # NOTE: tristate: None is different from False + self._current_algo_name = self._current_algo_digest_size = None + + @classmethod + def from_path(cls_, path): + """Open file at `path` and return a reader that owns the file object""" + return cls_(open(path, "rb"), path, True) + + @classmethod + def from_binary_buffer(cls_, binary_fp, filename): + return cls_(binary_fp, filename, False) + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def close(self): + if self._fp is not None: + try: + if self._own_fp: + self._fp.close() + finally: + self._fp = None + + def __iter__(self): + return self + + def __next__(self): + rec = self.read_record() + if rec is None: + raise StopIteration() + return rec + + if util.PY2: + next = __next__ + + def all_records(self): + """Iterator over all remaining records""" + while True: + rec = self.read_record() + if rec is None: + return + yield rec + + def read_record(self): + """Read and parse the "next" line. + + :returns: `None` at EOF or the parsed contents of the line + :rtype: tuple or None + + """ + # Loop to skip empty lines + while True: + line = self._get_next_line() + if not line: + # + # Skip for empty files at the very beginning. + # Check only after the first VERSION line. 
+ # + if self._expect_crc is not None: + if self._expect_crc: + logging.warning("CRC32 is missing at EOF") + return None + if not self.PATTERN0.search(line): + break + self._update_crc(line) + # + # At the beginning transparently skip an eventually embedded signify + # signature + # + if self._line_no == 1: + if line.startswith(b"untrusted comment: "): + line = self._get_next_line() + if not line.endswith(b"\n"): + raise binascii.Error("No valid signify signature value") + # Try to decode for an early error check + base64.b64decode(line[:-1]) + mo = self.PATTERN1.search(line) + if mo: + if mo.group(1) == b"VERSION": + if self._expect_crc: + logging.warning("CRC32 missing before line %d", + self._line_no) + self._reset_crc() + self._expect_crc = True + self._update_crc(line) + return ("VERSION", util.n(mo.group(2))) + if mo.group(1) == b"CRC32": + # TODO: check + if self._expect_crc is None: + logging.warning("Lone CRC32 before VERSION in line %d", + self._line_no) + else: + if self._expect_crc: + if (self._hex_crc() + != mo.group(2).decode("latin1").upper()): + logging.warning( + "CRC32 mismatch in line %d:" + " expected: %s, given: %s", + self._line_no, + self._hex_crc(), + mo.group(2).decode("latin1").upper()) + else: + logging.warning("CRC32 before VERSION in line %d", + self._line_no) + # Do not update the CRC here but reset the state + self._expect_crc = False + return ("CRC32", util.n(mo.group(2))) + else: + self._update_crc(line) + return (util.n(mo.group(1)), util.n(mo.group(2))) + else: + mo = self.PATTERN2.search(line) + if mo: + self._update_crc(line) + if mo.group(1) == b"COMMENT": + return ("COMMENT", util.u(mo.group(2), "utf-8")) + elif mo.group(1) == b"ROOT": + return ("ROOT", mo.group(2)) + assert False, line + else: + mo = self.PATTERN3.search(line) + if mo: + self._update_crc(line) + return ("SIZE", mo.group(1), int(util.n(mo.group(2)), 10)) + else: + mo = self.PATTERN4.search(line) + if mo: + self._update_crc(line) + algo_name = 
util.n(mo.group(1)) + if (len(mo.group(3)) == + 2 * self._get_digest_size(algo_name)): + # hex + digest = binascii.unhexlify(mo.group(3)) + else: + # base64 + digest = base64.b64decode(mo.group(3)) + if mo.group(4): + size = int(util.n(mo.group(5)), 10) + else: + size = None + return (algo_name, mo.group(2), digest, size) + else: + assert False, line + return line + + def _get_next_line(self): + line = self._fp.readline(2048) + if line: + self._line_no += 1 + return line + + def _reset_crc(self): + self._crc32 = zlib.crc32(b"") + + def _update_crc(self, data): + self._crc32 = zlib.crc32(data, self._crc32) + + def _hex_crc(self): + return (hex(self._crc32)[2:]).upper() + + def _get_digest_size(self, algo_name): + if self._current_algo_name == algo_name: + return self._current_algo_digest_size + h = util.algotag2algotype(algo_name)() + self._current_algo_name = algo_name + self._current_algo_digest_size = h.digest_size + return self._current_algo_digest_size + + if __name__ == "__main__": sys.exit(main())
