Mercurial > hgrepos > Python > apps > py-cutils
changeset 261:a3e25957afb7
treesum: instead of using format_bsd_line, use a real writer object with specialized methods
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Sun, 16 Feb 2025 14:17:10 +0100 |
| parents | 07a0bc723139 |
| children | c3d6599c1b5e |
| files | cutils/treesum.py |
| diffstat | 1 files changed, 177 insertions(+), 118 deletions(-) [+] |
line wrap: on
line diff
--- a/cutils/treesum.py Sun Feb 16 11:07:04 2025 +0100 +++ b/cutils/treesum.py Sun Feb 16 14:17:10 2025 +0100 @@ -438,6 +438,7 @@ out_cm = CRC32Output(out_cm) with out_cm as outfp: + writer = TreesumWriter(outfp) for d in opts.directories: V1DirectoryTreesumGenerator( opts.algorithm, opts.mmap, opts.base64, @@ -450,7 +451,7 @@ opts.print_size, opts.utf8, minimal=opts.minimal).generate( - outfp, d, comment=opts.comment) + writer, d, comment=opts.comment) class V1DirectoryTreesumGenerator(object): @@ -475,31 +476,27 @@ self._utf8_mode = utf8_mode self._minimal = minimal - def generate(self, outfp, root, comment=None): + def generate(self, writer, root, comment=None): """ :param outfp: a *binary* file with a "write()" and a "flush()" method """ - self._outfp = outfp - self._outfp.resetdigest() - self._outfp.write(format_bsd_line("VERSION", "1", None, False)) - self._outfp.write(format_bsd_line( - "FSENCODING", util.n(walk.getfsencoding().upper()), None, False)) - self._outfp.flush() + self._writer = writer + self._writer.start("1") + self._writer.write_fsencoding(util.n(walk.getfsencoding().upper())) + self._writer.flush() if self._with_generator == "none": pass # do nothing elif self._with_generator == "normal": - self._outfp.write(format_bsd_line( - "GENERATOR", None, b"PY2" if util.PY2 else b"PY3", False)) + self._writer.write_generator("PY2" if util.PY2 else "PY3") elif self._with_generator == "full": import platform info = "%s %s, %s" % (platform.python_implementation(), platform.python_version(), platform.platform()) - self._outfp.write(format_bsd_line( - "GENERATOR", None, info.encode("utf-8"), False)) + self._writer.write_generator(info) else: raise NotImplementedError( "not implemented: %s" % (self._with_generator,)) @@ -529,33 +526,26 @@ flags.append("utf8-encoding" if self._utf8_mode else "fs-encoding") if self._print_size: flags.append("print-size") - flags.sort() - self._outfp.write( - format_bsd_line("FLAGS", ",".join(flags), None, False)) + 
self._writer.write_flags(flags) if self._minimal is None: # Write execution timestamps in POSIX epoch and ISO format ts = int(time.time()) - self._outfp.write(format_bsd_line("TIMESTAMP", ts, None, False)) + self._writer.write_timestamp(ts) ts = (datetime.datetime.utcfromtimestamp(ts)).isoformat("T") - self._outfp.write(format_bsd_line("ISOTIMESTAMP", ts, None, False)) + self._writer.write_isotimestamp(ts) if comment: for line in comment: - self._outfp.write( - format_bsd_line("COMMENT", None, line, False)) + self._writer.write_comment(line) if self._minimal is not None: - self._outfp.write(format_bsd_line( - "ROOT", - None, + self._writer.write_root( (walk.WalkDirEntry.alt_u8(self._minimal) - if self._minimal else b""), - False)) + if self._minimal else b"")) else: - self._outfp.write(format_bsd_line( - "ROOT", None, walk.WalkDirEntry.alt_u8(root), False)) - self._outfp.flush() + self._writer.write_root(walk.WalkDirEntry.alt_u8(root)) + self._writer.flush() if not self._follow_symlinks.command_line and os.path.islink(root): linktgt = walk.WalkDirEntry.from_readlink(os.readlink(root)) @@ -569,28 +559,19 @@ util.interpolate_bytes( b"%d:%s,", len(linkdgst.digest()), linkdgst.digest())) if self._size_only: - self._outfp.write( - format_bsd_line( - "SIZE", - None, - "./@/", - False, - 0)) + self._writer.write_size(b"./@/", 0) else: - self._outfp.write( - format_bsd_line( - self._algorithm[1], - dir_dgst.digest(), - "./@/", - self._use_base64)) - self._outfp.flush() - self._outfp.write(format_bsd_line( - "CRC32", self._outfp.hexcrcdigest(), None, False)) + self._writer.write_file_digest( + self._algorithm[1], + b"./@/", + dir_dgst.digest(), + self._use_base64) + self._writer.flush() + self._writer.finish() return self._generate(os.path.normpath(root), tuple()) - self._outfp.write(format_bsd_line( - "CRC32", self._outfp.hexcrcdigest(), None, False)) + self._writer.finish() def _generate(self, root, top): logging.debug("Handling %s/%r", root, top) @@ -666,17 +647,16 @@ 
else: opath = walk.WalkDirEntry.alt_fs(opath) if self._size_only: - self._outfp.write(format_bsd_line( - "SIZE", None, + self._writer.write_size( util.interpolate_bytes(b"%s/./@/", opath), - False, 0)) + 0) else: - self._outfp.write(format_bsd_line( + self._writer.write_file_digest( self._algorithm[1], + util.interpolate_bytes(b"%s/./@/", opath), linkdgst.digest(), - util.interpolate_bytes(b"%s/./@/", opath), - self._use_base64)) - self._outfp.flush() + self._use_base64) + self._writer.flush() else: # # Follow the symlink to dir or handle a "real" directory @@ -779,17 +759,16 @@ else: opath = walk.WalkDirEntry.alt_fs(opath) if self._size_only: - self._outfp.write(format_bsd_line( - "SIZE", None, + self._writer.write_size( util.interpolate_bytes(b"%s/./@", opath), - False, 0)) + 0) else: - self._outfp.write(format_bsd_line( + self._writer.write_file_digest( self._algorithm[1], + util.interpolate_bytes(b"%s/./@", opath), linkdgst.digest(), - util.interpolate_bytes(b"%s/./@", opath), - self._use_base64)) - self._outfp.flush() + self._use_base64) + self._writer.flush() else: # # Follow the symlink to file or handle a "real" file @@ -844,20 +823,14 @@ else: opath = walk.WalkDirEntry.alt_fs(opath) if self._size_only: - self._outfp.write(format_bsd_line( - "SIZE", None, opath, False, fso.stat.st_size)) + self._writer.write_size(opath, fso.stat.st_size) else: - if self._print_size: - self._outfp.write(format_bsd_line( - self._algorithm[1], - dgst, opath, - self._use_base64, - fso.stat.st_size)) - else: - self._outfp.write(format_bsd_line( - self._algorithm[1], dgst, opath, - self._use_base64)) - self._outfp.flush() + sz = fso.stat.st_size if self._print_size else None + self._writer.write_file_digest( + self._algorithm[1], opath, dgst, + use_base64=self._use_base64, + size=sz) + self._writer.flush() opath = join_output_path(top, None) if opath: if self._utf8_mode: @@ -865,26 +838,20 @@ else: opath = walk.WalkDirEntry.alt_fs(opath) if self._size_only: - 
self._outfp.write(format_bsd_line( - "SIZE", None, opath, False, dir_size)) + self._writer.write_size(opath, dir_size) else: if dir_tainted: # # IMPORTANT: Print errors BEFORE the associated digest line. # Otherwise the "info" command has a problem. # - self._outfp.write(format_bsd_line( - b"ERROR", None, b"directory is tainted", False, None)) + self._writer.write_error(b"directory is tainted") logging.error("Directory has filename problems: %r", opath) - if self._print_size: - self._outfp.write(format_bsd_line( - self._algorithm[1], dir_dgst.digest(), opath, - self._use_base64, dir_size)) - else: - self._outfp.write(format_bsd_line( - self._algorithm[1], dir_dgst.digest(), opath, - self._use_base64)) - self._outfp.flush() + sz = dir_size if self._print_size else None + self._writer.write_file_digest( + self._algorithm[1], opath, dir_dgst.digest(), + use_base64=self._use_base64, size=sz) + self._writer.flush() return (dir_dgst.digest(), dir_size) @@ -971,39 +938,131 @@ return modestr -def format_bsd_line(what, value, filename, use_base64, size=None): - ls = util.b(os.linesep) - if not isinstance(what, bytes): - what = what.encode("ascii") - if what == b"TIMESTAMP": - assert filename is None - return util.interpolate_bytes(b"TIMESTAMP = %d%s", value, ls) - if what in (b"FSENCODING", b"ISOTIMESTAMP", b"FLAGS", b"VERSION", - b"CRC32"): - assert filename is None - return util.interpolate_bytes(b"%s = %s%s", what, util.b(value), ls) - assert filename is not None - if what in (b"COMMENT", b"ERROR", b"GENERATOR"): - return util.interpolate_bytes( - b"%s (%s)%s", what, util.b(filename, "utf-8"), ls) - if not isinstance(filename, bytes): - filename = util.fsencode(filename) - if what == b"SIZE": - return util.interpolate_bytes(b"SIZE (%s) = %d%s", filename, size, ls) - if value is None: - return util.interpolate_bytes(b"%s (%s)%s", what, filename, ls) - if use_base64: - value = base64.b64encode(value) - else: - value = binascii.hexlify(value) - if filename != b"./@/": - 
filename = util.normalize_filename(filename, True) - if size is None: - return util.interpolate_bytes( - b"%s (%s) = %s%s", what, filename, value, ls) - else: - return util.interpolate_bytes( - b"%s (%s) = %s,%d%s", what, filename, value, size, ls) +class TreesumWriter(object): + + """Writer to write treesum digest files in a format similar to BSD + digest files. + + Wraps an output file pointer for a binary file. + + Provides high-level methods to write data lines. + + Also holds the current CRC for a block. + + """ + + LS = util.b(os.linesep) + + def __init__(self, outfp): + self._outfp = outfp + self._reset_crc() + + def _reset_crc(self): + self._crc = crc32() + + def start(self, version): + """Begin a new block, reset the current CRC and write the VERSION + tag. + + """ + self._reset_crc() + self.write(b"VERSION = ") + self.writeln(util.b(version)) + + def write_comment(self, comment): + self.write(b"COMMENT (") + self.write(util.b(comment, "utf-8")) + self.writeln(b")") + + def write_generator(self, generator): + self.write(b"GENERATOR (") + self.write(util.b(generator, "utf-8")) + self.writeln(b")") + + def write_error(self, error): + self.write(b"ERROR (") + self.write(util.b(error, "utf-8")) + self.writeln(b")") + + def write_fsencoding(self, encoding): + self.write(b"FSENCODING = ") + self.writeln(util.b(encoding)) + + def write_flags(self, flags): + self.write(b"FLAGS = ") + if isinstance(flags, (str, bytes, bytearray)): + self.writeln(util.b(flags)) + else: + flags.sort() + self.writeln(util.b(",".join(flags))) + + def write_timestamp(self, ts): + self.write(b"TIMESTAMP = ") + self.writeln(util.b(str(ts))) + + def write_isotimestamp(self, ts): + self.write(b"ISOTIMESTAMP = ") + self.writeln(util.b(ts)) + + def write_root(self, root): + assert isinstance(root, bytes) + self.write(b"ROOT (") + self.write(root) + self.writeln(b")") + + def write_size(self, filename, sz): + assert isinstance(filename, bytes) + self.write(b"SIZE (") + self.write(filename) + 
self.write(b") = ") + self.writeln(util.b(str(sz))) + + def write_file_digest(self, algorithm, filename, digest, + use_base64=False, size=None): + digest = (base64.b64encode(digest) + if use_base64 + else binascii.hexlify(digest)) + if filename != b"./@/": + filename = util.normalize_filename(filename, True) + self.write(util.b(algorithm)) + self.write(b" (") + self.write(filename) + self.write(b") = ") + self.write(digest) + if size is not None: + self.write(b",") + self.writeln(util.b(str(size))) + else: + self.writeln(b"") + + def finish(self): + """Finish a block and write the current CRC""" + crc = self._crc.hexdigest() + self.write(b"CRC32 = ") + self.writeln(util.b(crc)) + + def writeln(self, line): + """Write the bytes `line` into the output file and update the CRC + accordingly. + + :param bytes line: The line to write to (without line ending) + + """ + self.write(line) + self.write(self.LS) + + def write(self, data): + """Write `data` into the output file and update the CRC accordingly. + + :param bytes data: The data to write to and to update the CRC with + + """ + if data: + self._outfp.write(data) + self._crc.update(data) + + def flush(self): + self._outfp.flush() class TreesumReader(object):
