diff src/_apply_pages.py @ 1:1d09e1dec1d9 upstream

ADD: PyMuPDF v1.26.4: the original sdist. It does not yet contain MuPDF. This normally will be downloaded when building PyMuPDF.
author Franz Glasner <fzglas.hg@dom66.de>
date Mon, 15 Sep 2025 11:37:51 +0200
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/_apply_pages.py	Mon Sep 15 11:37:51 2025 +0200
@@ -0,0 +1,253 @@
+import multiprocessing
+import os
+import time
+
+import pymupdf
+
+
+# Support for concurrent processing of document pages.
+#
+
+class _worker_State:
+    pass
+_worker_state = _worker_State()
+
+
+def _worker_init(
+        path,
+        initfn,
+        initfn_args,
+        initfn_kwargs,
+        pagefn,
+        pagefn_args,
+        pagefn_kwargs,
+        stats,
+        ):
+    # pylint: disable=attribute-defined-outside-init
+    _worker_state.path = path
+    _worker_state.pagefn = pagefn
+    _worker_state.pagefn_args = pagefn_args
+    _worker_state.pagefn_kwargs = pagefn_kwargs
+    _worker_state.stats = stats
+    _worker_state.document = None
+    if initfn:
+        initfn(*initfn_args, **initfn_kwargs)
+
+
+def _stats_write(t, label):
+    t = time.time() - t
+    if t >= 10:
+        pymupdf.log(f'{os.getpid()=}: {t:2f}s: {label}.')
+
+
+def _worker_fn(page_number):
+    # Create Document from filename if we haven't already done so.
+    if not _worker_state.document:
+        if _worker_state.stats:
+            t = time.time()
+        _worker_state.document = pymupdf.Document(_worker_state.path)   # pylint: disable=attribute-defined-outside-init
+        if _worker_state.stats:
+            _stats_write(t, 'pymupdf.Document()')
+    
+    if _worker_state.stats:
+        t = time.time()
+    page = _worker_state.document[page_number]
+    if _worker_state.stats:
+        _stats_write(t, '_worker_state.document[page_number]')
+    
+    if _worker_state.stats:
+        t = time.time()
+    ret = _worker_state.pagefn(
+            page,
+            *_worker_state.pagefn_args,
+            **_worker_state.pagefn_kwargs,
+            )
+    if _worker_state.stats:
+        _stats_write(t, '_worker_state.pagefn()')
+    
+    return ret
+    
+
+def _multiprocessing(
+        path,
+        pages,
+        pagefn,
+        pagefn_args,
+        pagefn_kwargs,
+        initfn,
+        initfn_args,
+        initfn_kwargs,
+        concurrency,
+        stats,
+        ):
+    #print(f'_worker_mp(): {concurrency=}', flush=1)
+    with multiprocessing.Pool(
+            concurrency,
+            _worker_init,
+            (
+                path,
+                initfn, initfn_args, initfn_kwargs,
+                pagefn, pagefn_args, pagefn_kwargs,
+                stats,
+            ),
+            ) as pool:
+        result = pool.map_async(_worker_fn, pages)
+        return result.get()
+    
+
+def _fork(
+        path,
+        pages,
+        pagefn,
+        pagefn_args,
+        pagefn_kwargs,
+        initfn,
+        initfn_args,
+        initfn_kwargs,
+        concurrency,
+        stats,
+        ):
+    verbose = 0
+    if concurrency is None:
+        concurrency = multiprocessing.cpu_count()
+    # We write page numbers to `queue_down` and read `(page_num, text)` from
+    # `queue_up`. Workers each repeatedly read the next available page number
+    # from `queue_down`, extract the text and write it onto `queue_up`.
+    #
+    # This is better than pre-allocating a subset of pages to each worker
+    # because it ensures there will never be idle workers until we are near the
+    # end with fewer pages left than workers.
+    #
+    queue_down = multiprocessing.Queue()
+    queue_up = multiprocessing.Queue()
+    def childfn():
+        document = None
+        if verbose:
+            pymupdf.log(f'{os.getpid()=}: {initfn=} {initfn_args=}')
+        _worker_init(
+                path,
+                initfn,
+                initfn_args,
+                initfn_kwargs,
+                pagefn,
+                pagefn_args,
+                pagefn_kwargs,
+                stats,
+                )
+        while 1:
+            if verbose:
+                pymupdf.log(f'{os.getpid()=}: calling get().')
+            page_num = queue_down.get()
+            if verbose:
+                pymupdf.log(f'{os.getpid()=}: {page_num=}.')
+            if page_num is None:
+                break
+            try:
+                if not document:
+                    if stats:
+                        t = time.time()
+                    document = pymupdf.Document(path)
+                    if stats:
+                        _stats_write(t, 'pymupdf.Document(path)')
+                
+                if stats:
+                    t = time.time()
+                page = document[page_num]
+                if stats:
+                    _stats_write(t, 'document[page_num]')
+                
+                if verbose:
+                    pymupdf.log(f'{os.getpid()=}: {_worker_state=}')
+                
+                if stats:
+                    t = time.time()
+                ret = pagefn(
+                        page,
+                        *_worker_state.pagefn_args,
+                        **_worker_state.pagefn_kwargs,
+                        )
+                if stats:
+                    _stats_write(t, f'{page_num=} pagefn()')
+            except Exception as e:
+                if verbose: pymupdf.log(f'{os.getpid()=}: exception {e=}')
+                ret = e
+            if verbose:
+                pymupdf.log(f'{os.getpid()=}: sending {page_num=} {ret=}')
+                
+            queue_up.put( (page_num, ret) )
+
+    error = None
+
+    pids = list()
+    try:
+        # Start child processes.
+        if stats:
+            t = time.time()
+        for i in range(concurrency):
+            p = os.fork()   # pylint: disable=no-member
+            if p == 0:
+                # Child process.
+                try:
+                    try:
+                        childfn()
+                    except Exception as e:
+                        pymupdf.log(f'{os.getpid()=}: childfn() => {e=}')
+                        raise
+                finally:
+                    if verbose:
+                        pymupdf.log(f'{os.getpid()=}: calling os._exit(0)')
+                    os._exit(0)
+            pids.append(p)
+        if stats:
+            _stats_write(t, 'create child processes')
+
+        # Send page numbers.
+        if stats:
+            t = time.time()
+        if verbose:
+            pymupdf.log(f'Sending page numbers.')
+        for page_num in range(len(pages)):
+            queue_down.put(page_num)
+        if stats:
+            _stats_write(t, 'Send page numbers')
+
+        # Collect results. We give up if any worker sends an exception instead
+        # of text, but this hasn't been tested.
+        ret = [None] * len(pages)
+        for i in range(len(pages)):
+            page_num, text = queue_up.get()
+            if verbose:
+                pymupdf.log(f'{page_num=} {len(text)=}')
+            assert ret[page_num] is None
+            if isinstance(text, Exception):
+                if not error:
+                    error = text
+                break
+            ret[page_num] = text
+
+        # Close queue. This should cause exception in workers and terminate
+        # them, but on macos-arm64 this does not seem to happen, so we also
+        # send None, which makes workers terminate.
+        for i in range(concurrency):
+            queue_down.put(None)
+        if verbose: pymupdf.log(f'Closing queues.')
+        queue_down.close()
+
+        if error:
+            raise error
+        if verbose:
+            pymupdf.log(f'After concurrent, returning {len(ret)=}')
+        return ret
+        
+    finally:
+        # Join all child processes.
+        if stats:
+            t = time.time()
+        for pid in pids:
+            if verbose:
+                pymupdf.log(f'waiting for {pid=}.')
+            e = os.waitpid(pid, 0)
+            if verbose:
+                pymupdf.log(f'{pid=} => {e=}')
+        if stats:
+            _stats_write(t, 'Join all child proceses')