Mercurial > hgrepos > Python > apps > py-cutils
view cutils/util/digest.py @ 378:32b937a73068
treesum: Rename PATTERN0 to PATTERNE
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Mon, 12 May 2025 09:48:23 +0200 |
| parents | 48430941c18c |
| children |
line wrap: on
line source
# -*- coding: utf-8 -*- # :- # SPDX-FileCopyrightText: © 2020-2025 Franz Glasner # SPDX-License-Identifier: BSD-3-Clause # :- r"""Utility sub-module to implement a file and stream digest computations. """ from __future__ import print_function, absolute_import __all__ = ["compute_digest_file", "compute_digest_stream"] import errno import io import os try: import mmap except ImportError: mmap = None import stat from . import PY2 from . import constants def compute_digest_file(hashobj, path, use_mmap=None): """Compute the digest for a file with a filename of an open fd. :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory :param path: filename within the filesystem or a file descriptor opened in binary mode (also a socket or pipe) :param use_mmap: Use the :mod:`mmap` module if available. If `None` determine automatically. :type use_mmap: bool or None :return: the digest in binary form :rtype: bytes If a file descriptor is given is must support :func:`os.read`. """ h = hashobj() if isinstance(path, constants.PATH_TYPES): flags = os.O_RDONLY | getattr(os, "O_BINARY", 0) \ | getattr(os, "O_SEQUENTIAL", 0) | getattr(os, "O_NOCTTY", 0) fd = os.open(path, flags) own_fd = True else: fd = path own_fd = False try: try: st = os.fstat(fd) except TypeError: # # "fd" is most probably a Python socket object. # (a pipe typically supports fstat) # use_mmap = False else: if stat.S_ISREG(st[stat.ST_MODE]): filesize = st[stat.ST_SIZE] if (use_mmap is None) \ and (filesize > constants.MAX_AUTO_MAP_SIZE): # # This is borrowed from FreeBSD's cp(1) implementation: # Mmap and process if less than 8M (the limit is # so we don't totally trash memory on big files. # This is really a minor hack, but it wins some # CPU back. Some filesystems, such as smbnetfs, # don't support mmap, so this is a best-effort # attempt. # use_mmap = False else: use_mmap = False if use_mmap is None: use_mmap = True if mmap is None or not use_mmap: # No mmap available or wanted -> use traditional low-level file IO fadvise = getattr(os, "posix_fadvise", None) if fadvise: fadvise(fd, 0, 0, os.POSIX_FADV_SEQUENTIAL) if not PY2: fileobj = io.FileIO(fd, mode="r", closefd=False) buf = bytearray(constants.READ_CHUNK_SIZE) with memoryview(buf) as full_view: while True: try: n = fileobj.readinto(buf) except OSError as e: if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR): raise else: if n == 0: break if n == constants.READ_CHUNK_SIZE: h.update(buf) else: with full_view[:n] as partial_view: h.update(partial_view) else: while True: try: buf = os.read(fd, constants.READ_CHUNK_SIZE) except OSError as e: if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR): raise else: if len(buf) == 0: break h.update(buf) else: # # Use mmap # # NOTE: On Windows mmapped files with length 0 are not supported. # So ensure to not call mmap.mmap() if the file size is 0. # madvise = getattr(mmap.mmap, "madvise", None) if filesize <= constants.MAP_WINDOW_SIZE: mapsize = filesize else: mapsize = constants.MAP_WINDOW_SIZE mapoffset = 0 rest = filesize while rest > 0: m = mmap.mmap(fd, mapsize, access=mmap.ACCESS_READ, offset=mapoffset) if madvise: madvise(m, mmap.MADV_SEQUENTIAL) try: h.update(m) finally: m.close() rest -= mapsize mapoffset += mapsize if rest < mapsize: mapsize = rest finally: if own_fd: os.close(fd) return h.digest() def compute_digest_stream(hashobj, instream): """Compute the digest for a given byte string `instream`. :param hashobj: a :mod:`hashlib` compatible hash algorithm type or factory :param instream: a bytes input stream to read the data to be hashed from :return: the digest in binary form :rtype: bytes """ h = hashobj() while True: try: buf = instream.read(constants.READ_CHUNK_SIZE) except OSError as e: if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR): raise else: if buf is not None: if len(buf) == 0: break h.update(buf) return h.digest()
