Mercurial > hgrepos > Python > apps > py-cutils
view cutils/util/digest.py @ 177:089c40240061
Add an alternate implementation for generating directory tree digests:
- Do not use something like os.walk() but use os.scandir() directly.
- Recursively generate the subdirectory digests only when needed and in
the right order.
This fixes a bug where the order of subdirectories in the output did not
match the order in which their directory digests were applied.
The new implementation also should make filtering (that will be
implemented later) easier.
NOTE: The tree digests of the old and the new implementation are identical.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Sat, 11 Jan 2025 17:41:28 +0100 |
| parents | a813094ae4f5 |
| children | 0f4febf646f5 |
line wrap: on
line source
# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2020-2025 Franz Glasner
# :License:   BSD-3-Clause
# :-
r"""Utility sub-module to implement file and stream digest computations.

"""

__all__ = ["compute_digest_file", "compute_digest_stream"]


import errno
import io
import os
try:
    import mmap
except ImportError:
    mmap = None
import stat

from . import PY2
from . import constants


def compute_digest_file(hashobj, path, use_mmap=None):
    """Compute the digest for a file given by a filename or an open fd.

    :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory
    :param path: filename within the filesystem or a file descriptor opened
                 in binary mode (also a socket or pipe)
    :param use_mmap: Use the :mod:`mmap` module if available.
                     If `None` determine automatically.
    :type use_mmap: bool or None
    :return: the digest in binary form
    :rtype: bytes

    If a file descriptor is given it must support :func:`os.read`.

    A descriptor passed in by the caller is never closed here; only a
    descriptor that this function opened itself from a filename is closed.

    Memory mapping is only ever used for regular files. Without an explicit
    `use_mmap` it is additionally restricted to files not larger than
    `constants.MAX_AUTO_MAP_SIZE`.

    """
    h = hashobj()
    if isinstance(path, constants.PATH_TYPES):
        # Not every platform defines all of these flags: fall back to 0
        flags = os.O_RDONLY | getattr(os, "O_BINARY", 0) \
            | getattr(os, "O_SEQUENTIAL", 0) | getattr(os, "O_NOCTTY", 0)
        fd = os.open(path, flags)
        own_fd = True
    else:
        fd = path
        own_fd = False
    try:
        try:
            st = os.fstat(fd)
        except TypeError:
            #
            # "fd" is most probably a Python socket object.
            # (a pipe typically supports fstat)
            #
            use_mmap = False
        else:
            if stat.S_ISREG(st[stat.ST_MODE]):
                filesize = st[stat.ST_SIZE]
                if (use_mmap is None) \
                        and (filesize > constants.MAX_AUTO_MAP_SIZE):
                    #
                    # This is borrowed from FreeBSD's cp(1) implementation:
                    # Mmap and process if less than 8M (the limit is
                    # so we don't totally trash memory on big files).
                    # This is really a minor hack, but it wins some
                    # CPU back.  Some filesystems, such as smbnetfs,
                    # don't support mmap, so this is a best-effort
                    # attempt.
                    #
                    use_mmap = False
            else:
                # Only regular files are mmapped: this also guarantees
                # that `filesize` is bound whenever the mmap branch runs.
                use_mmap = False
        if use_mmap is None:
            use_mmap = True
        if mmap is None or not use_mmap:
            # No mmap available or wanted -> use traditional low-level file IO
            fadvise = getattr(os, "posix_fadvise", None)
            if fadvise:
                try:
                    fadvise(fd, 0, 0, os.POSIX_FADV_SEQUENTIAL)
                except OSError:
                    #
                    # The advice is just a hint. POSIX specifies ESPIPE
                    # for pipes and FIFOs: do not let a rejected hint
                    # abort the digest computation.
                    #
                    pass
            if not PY2:
                fileobj = io.FileIO(fd, mode="r", closefd=False)
                buf = bytearray(constants.READ_CHUNK_SIZE)
                with memoryview(buf) as full_view:
                    while True:
                        try:
                            n = fileobj.readinto(buf)
                        except OSError as e:
                            # Retry on transient errors only
                            if e.errno not in (errno.EAGAIN,
                                               errno.EWOULDBLOCK,
                                               errno.EINTR):
                                raise
                        else:
                            if n is None:
                                #
                                # Non-blocking descriptor without data
                                # available right now: nothing was read.
                                # Slicing with `None` would select the
                                # complete (stale) buffer -- so retry.
                                #
                                continue
                            if n == 0:
                                # EOF
                                break
                            if n == constants.READ_CHUNK_SIZE:
                                h.update(buf)
                            else:
                                # Hash only the bytes really read
                                # (without copying them)
                                with full_view[:n] as partial_view:
                                    h.update(partial_view)
            else:
                while True:
                    try:
                        buf = os.read(fd, constants.READ_CHUNK_SIZE)
                    except OSError as e:
                        # Retry on transient errors only
                        if e.errno not in (errno.EAGAIN,
                                           errno.EWOULDBLOCK,
                                           errno.EINTR):
                            raise
                    else:
                        if len(buf) == 0:
                            # EOF
                            break
                        h.update(buf)
        else:
            #
            # Use mmap
            #
            # NOTE: On Windows mmapped files with length 0 are not supported.
            #       So ensure to not call mmap.mmap() if the file size is 0.
            #
            madvise = getattr(mmap.mmap, "madvise", None)
            if filesize <= constants.MAP_WINDOW_SIZE:
                mapsize = filesize
            else:
                mapsize = constants.MAP_WINDOW_SIZE
            mapoffset = 0
            rest = filesize
            # Map and hash the file window by window
            while rest > 0:
                m = mmap.mmap(fd,
                              mapsize,
                              access=mmap.ACCESS_READ,
                              offset=mapoffset)
                if madvise:
                    madvise(m, mmap.MADV_SEQUENTIAL)
                try:
                    h.update(m)
                finally:
                    m.close()
                rest -= mapsize
                mapoffset += mapsize
                if rest < mapsize:
                    # The last window is shorter than the previous ones
                    mapsize = rest
    finally:
        if own_fd:
            os.close(fd)
    return h.digest()


def compute_digest_stream(hashobj, instream):
    """Compute the digest for a given byte stream `instream`.

    :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory
    :param instream: a bytes input stream to read the data to be hashed from
    :return: the digest in binary form
    :rtype: bytes

    The stream is read in chunks of `constants.READ_CHUNK_SIZE` until it
    returns an empty result. A read that returns `None` is retried.

    """
    h = hashobj()
    while True:
        try:
            buf = instream.read(constants.READ_CHUNK_SIZE)
        except OSError as e:
            # Retry on transient errors only
            if e.errno not in (errno.EAGAIN,
                               errno.EWOULDBLOCK,
                               errno.EINTR):
                raise
        else:
            # `None` means no data was read: just try again
            if buf is not None:
                if len(buf) == 0:
                    # EOF
                    break
                h.update(buf)
    return h.digest()
