comparison cutils/treesum.py @ 124:3bd3f32b5e60

A first version of "treesum" is working
author Franz Glasner <fzglas.hg@dom66.de>
date Thu, 02 Jan 2025 13:29:20 +0100
parents
children 12d6ec1f8613
comparison
equal deleted inserted replaced
123:4a0c3c9eead7 124:3bd3f32b5e60
1 # -*- coding: utf-8 -*-
2 # :-
3 # :Copyright: (c) 2020-2025 Franz Glasner
4 # :License: BSD-3-Clause
5 # :-
6 r"""Generate and verify checksums for directory trees.
7
8 """
9
10 from __future__ import print_function, absolute_import
11
12
13 __all__ = []
14
15
16 import argparse
17 import base64
18 import binascii
19 import os
20 import sys
21
22 from . import (__version__, __revision__)
23 from . import util
24 from .util import cm
25 from .util import digest
26 from .util import walk
27
28
def main(argv=None):
    """Command line entry point: parse the arguments and run :func:`treesum`.

    :param argv: the list of command line arguments (without the program
                 name); if `None` then argparse uses ``sys.argv[1:]``
    :return: whatever :func:`treesum` returns

    Arguments may also be read from files by using the ``@file`` syntax
    (`fromfile_prefix_chars`).

    """
    aparser = argparse.ArgumentParser(
        description="Generate checksums for directory trees",
        fromfile_prefix_chars='@')
    aparser.add_argument(
        "--algorithm", "-a", action="store", type=util.argv2algo,
        help="1 (aka sha1), 224, 256, 384, 512, "
             "3 (alias for sha3-512), 3-224, 3-256, 3-384, 3-512, "
             "blake2b, blake2b-256 (default), blake2s, "
             "blake2 (alias for blake2b), blake2-256 (alias for blake2b-256), "
             "md5")
    aparser.add_argument(
        "--append-output", action="store_true", dest="append_output",
        help="Append to the output file instead of overwriting it.")
    aparser.add_argument(
        "--base64", action="store_true",
        help="Output checksums in base64 notation, not hexadecimal (OpenBSD).")
    # --mmap and --no-mmap share one destination: None (neither option given)
    # leaves the decision to the digest code.
    aparser.add_argument(
        "--mmap", action="store_true", dest="mmap", default=None,
        help="Use mmap if available. Default is to determine automatically "
             "from the filesize.")
    aparser.add_argument(
        "--no-mmap", action="store_false", dest="mmap", default=None,
        help="Don't use mmap.")
    aparser.add_argument(
        "--output", "-o", action="store", metavar="OUTPUT",
        help="Put the checksum into given file. If not given or if it is given"
             " as `-' then stdout is used.")
    aparser.add_argument(
        "--version", "-v", action="version",
        version="%s (rv:%s)" % (__version__, __revision__))
    aparser.add_argument(
        "directories", nargs="*", metavar="DIRECTORY")

    opts = aparser.parse_args(args=argv)

    # No algorithm given on the command line: fall back to the documented
    # default.
    if not opts.algorithm:
        opts.algorithm = util.argv2algo("blake2b-256")

    return treesum(opts)
69
70
def gen_opts(directories=None,
             algorithm="BLAKE2b-256",
             append_output=False,
             base64=False,
             mmap=None,
             output=None):
    """Build an options namespace with the same attributes that
    :func:`main` gets from the command line parser.

    :param directories: the list of directories to process; if `None`
                        a new empty list is used
    :param algorithm: the algorithm tag; it is stored together with the
                      result of `util.algotag2algotype` as the pair
                      ``(algotype, tag)``
    :param append_output: stored as `append_output`
    :param base64: stored as `base64`
    :param mmap: stored as `mmap`
    :param output: stored as `output`
    :return: the :class:`argparse.Namespace` with these options

    """
    # A fresh list for every call: a shared default list would be changed
    # for all later calls as soon as some consumer appends to
    # `opts.directories`.
    if directories is None:
        directories = []
    opts = argparse.Namespace(directories=directories,
                              algorithm=(util.algotag2algotype(algorithm),
                                         algorithm),
                              append_output=append_output,
                              base64=base64,
                              mmap=mmap,
                              output=output)
    return opts
85
86
def treesum(opts):
    """Run the treesum operation described by `opts`.

    Currently this always generates checksums: `opts` is handed over to
    :func:`generate_treesum` and its result is returned.

    """
    # XXX TBD: opts.check and opts.checklist (as in shasum.py)
    result = generate_treesum(opts)
    return result
90
91
def generate_treesum(opts):
    """Generate the checksum output for all directories in `opts`.

    :param opts: an options object with the attributes `directories`,
                 `output`, `append_output`, `algorithm`, `mmap` and
                 `base64`

    If no directories are given the current directory (".") is processed.
    The output goes to stdout if `opts.output` is `None` or "-"; otherwise
    the named file is opened in binary mode (appending if
    `opts.append_output` is true, overwriting otherwise).

    """
    # Use a local fallback instead of appending to `opts.directories`: the
    # list belongs to the caller and must not be changed here. This also
    # copes with `opts.directories` being `None`.
    directories = opts.directories or ["."]

    if opts.output is None or opts.output == "-":
        # Output is written as bytes: prefer the binary layer of stdout
        # where it exists.
        # NOTE(review): cm.nullcontext presumably yields the given object
        # and does not close it on exit -- confirm in util.cm.
        out_cm = cm.nullcontext(getattr(sys.stdout, "buffer", sys.stdout))
    else:
        out_cm = open(opts.output, "ab" if opts.append_output else "wb")

    with out_cm as outfp:
        for d in directories:
            generate_treesum_for_directory(
                outfp, d, opts.algorithm, opts.mmap, opts.base64)
111
112
def _treesum_output_path(top, name):
    """Return the "/"-separated output path of `name` below the path
    components `top`."""
    if top:
        return "/".join(top) + "/" + name
    return name


def generate_treesum_for_directory(
        outfp, root, algorithm, use_mmap, use_base64):
    """Write the digest lines for the directory tree below `root`.

    :param outfp: a *binary* file with a "write()" and a "flush()" method
    :param root: the directory that is walked; it is also written into
                 the leading "ROOT" line
    :param algorithm: a pair: item 0 is called without arguments to get a
                      new digest object, item 1 is the algorithm name used
                      in the output lines
    :param use_mmap: handed over to `digest.compute_digest_file`
    :param use_base64: handed over to :func:`format_bsd_line`

    One line is written (and flushed) for every symbolic link to a
    directory, for every non-directory entry and for every directory.

    """
    outfp.write(format_bsd_line("ROOT", None, root, False))
    outfp.flush()
    # Digests of the directories that are already done, keyed by their
    # path components
    finished = {}

    for top, dirs, nondirs in walk.walk(root, follow_symlinks=False):
        tree_hash = algorithm[0]()
        for subdir in dirs:
            if not subdir.is_symlink:
                # A real subdirectory: its digest is taken from `finished`,
                # so it must have been visited earlier in the walk.
                sub_digest = finished[top + (subdir.name,)]
                tree_hash.update(
                    b"1:d,%d:%s," % (len(subdir.fsname), subdir.fsname))
                tree_hash.update(sub_digest)
                continue
            # A symbolic link: the digest is computed from the link target
            # and it gets an output line of its own.
            target = util.fsencode(os.readlink(subdir.path))
            link_hash = algorithm[0]()
            link_hash.update(target)
            tree_hash.update(
                b"1:S,%d:%s," % (len(subdir.fsname), subdir.fsname))
            link_digest = link_hash.digest()
            tree_hash.update(link_digest)
            link_path = "%s/./@" % (_treesum_output_path(top, subdir.name),)
            outfp.write(
                format_bsd_line(
                    algorithm[1], link_digest, link_path, use_base64))
            outfp.flush()
        for entry in nondirs:
            tree_hash.update(
                b"1:f,%d:%s," % (len(entry.fsname), entry.fsname))
            file_digest = digest.compute_digest_file(
                algorithm[0], entry.path, use_mmap=use_mmap)
            tree_hash.update(file_digest)
            outfp.write(format_bsd_line(
                algorithm[1],
                file_digest,
                _treesum_output_path(top, entry.name),
                use_base64))
            outfp.flush()
        # The line for the directory itself; keep its digest for the
        # directory that contains it.
        top_digest = tree_hash.digest()
        outfp.write(format_bsd_line(
            algorithm[1], top_digest, "/".join(top) + "/", use_base64))
        outfp.flush()
        finished[top] = top_digest
165
166
def format_bsd_line(digestname, digest, filename, use_base64):
    """Build one output line in BSD style as bytes.

    :param digestname: the tag at the start of the line; if it is not
                       bytes it is encoded as ASCII
    :param digest: the binary digest or `None`; with `None` the line has
                   the form ``TAG (FILENAME)`` without a digest part
    :param filename: the filename; if it is not bytes it is converted with
                     `util.fsencode`
    :param use_base64: if true the digest is written in base64 notation,
                       otherwise in hexadecimal notation
    :return: the line as bytes, terminated with `os.linesep`

    In lines with a digest the filename is passed through
    `util.normalize_filename`.

    """
    eol = os.linesep
    if not isinstance(eol, bytes):
        eol = eol.encode("utf-8")
    tag = digestname if isinstance(digestname, bytes) \
        else digestname.encode("ascii")
    fname = filename if isinstance(filename, bytes) \
        else util.fsencode(filename)
    if digest is None:
        return b"%s (%s)%s" % (tag, fname, eol)
    to_text = base64.b64encode if use_base64 else binascii.hexlify
    encoded = to_text(digest)
    return b"%s (%s) = %s%s" \
           % (tag, util.normalize_filename(fname, True), encoded, eol)
182
183
# Executed as a script: run the command line interface and use the return
# value of main() as the exit status of the process.
if __name__ == "__main__":
    sys.exit(main())