comparison src/_apply_pages.py @ 1:1d09e1dec1d9 upstream
ADD: PyMuPDF v1.26.4: the original sdist.
It does not yet contain MuPDF, which is normally downloaded when building PyMuPDF.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Mon, 15 Sep 2025 11:37:51 +0200 |
| parents | |
| children | |
Comparison of the null parent `-1:000000000000` with revision `1:1d09e1dec1d9`: the file is new, so every line below is inserted.
```python
import multiprocessing
import os
import time

import pymupdf


# Support for concurrent processing of document pages.
#

class _worker_State:
    pass
_worker_state = _worker_State()


def _worker_init(
        path,
        initfn,
        initfn_args,
        initfn_kwargs,
        pagefn,
        pagefn_args,
        pagefn_kwargs,
        stats,
        ):
    # pylint: disable=attribute-defined-outside-init
    _worker_state.path = path
    _worker_state.pagefn = pagefn
    _worker_state.pagefn_args = pagefn_args
    _worker_state.pagefn_kwargs = pagefn_kwargs
    _worker_state.stats = stats
    _worker_state.document = None
    if initfn:
        initfn(*initfn_args, **initfn_kwargs)


def _stats_write(t, label):
    t = time.time() - t
    if t >= 10:
        pymupdf.log(f'{os.getpid()=}: {t:2f}s: {label}.')


def _worker_fn(page_number):
    # Create Document from filename if we haven't already done so.
    if not _worker_state.document:
        if _worker_state.stats:
            t = time.time()
        _worker_state.document = pymupdf.Document(_worker_state.path)  # pylint: disable=attribute-defined-outside-init
        if _worker_state.stats:
            _stats_write(t, 'pymupdf.Document()')

    if _worker_state.stats:
        t = time.time()
    page = _worker_state.document[page_number]
    if _worker_state.stats:
        _stats_write(t, '_worker_state.document[page_number]')

    if _worker_state.stats:
        t = time.time()
    ret = _worker_state.pagefn(
            page,
            *_worker_state.pagefn_args,
            **_worker_state.pagefn_kwargs,
            )
    if _worker_state.stats:
        _stats_write(t, '_worker_state.pagefn()')

    return ret


def _multiprocessing(
        path,
        pages,
        pagefn,
        pagefn_args,
        pagefn_kwargs,
        initfn,
        initfn_args,
        initfn_kwargs,
        concurrency,
        stats,
        ):
    #print(f'_worker_mp(): {concurrency=}', flush=1)
    with multiprocessing.Pool(
            concurrency,
            _worker_init,
            (
                path,
                initfn, initfn_args, initfn_kwargs,
                pagefn, pagefn_args, pagefn_kwargs,
                stats,
            ),
            ) as pool:
        result = pool.map_async(_worker_fn, pages)
        return result.get()


def _fork(
        path,
        pages,
        pagefn,
        pagefn_args,
        pagefn_kwargs,
        initfn,
        initfn_args,
        initfn_kwargs,
        concurrency,
        stats,
        ):
    verbose = 0
    if concurrency is None:
        concurrency = multiprocessing.cpu_count()
    # We write page numbers to `queue_down` and read `(page_num, text)` from
    # `queue_up`. Workers each repeatedly read the next available page number
    # from `queue_down`, extract the text and write it onto `queue_up`.
    #
    # This is better than pre-allocating a subset of pages to each worker
    # because it ensures there will never be idle workers until we are near the
    # end with fewer pages left than workers.
    #
    queue_down = multiprocessing.Queue()
    queue_up = multiprocessing.Queue()
    def childfn():
        document = None
        if verbose:
            pymupdf.log(f'{os.getpid()=}: {initfn=} {initfn_args=}')
        _worker_init(
                path,
                initfn,
                initfn_args,
                initfn_kwargs,
                pagefn,
                pagefn_args,
                pagefn_kwargs,
                stats,
                )
        while 1:
            if verbose:
                pymupdf.log(f'{os.getpid()=}: calling get().')
            page_num = queue_down.get()
            if verbose:
                pymupdf.log(f'{os.getpid()=}: {page_num=}.')
            if page_num is None:
                break
            try:
                if not document:
                    if stats:
                        t = time.time()
                    document = pymupdf.Document(path)
                    if stats:
                        _stats_write(t, 'pymupdf.Document(path)')

                if stats:
                    t = time.time()
                page = document[page_num]
                if stats:
                    _stats_write(t, 'document[page_num]')

                if verbose:
                    pymupdf.log(f'{os.getpid()=}: {_worker_state=}')

                if stats:
                    t = time.time()
                ret = pagefn(
                        page,
                        *_worker_state.pagefn_args,
                        **_worker_state.pagefn_kwargs,
                        )
                if stats:
                    _stats_write(t, f'{page_num=} pagefn()')
            except Exception as e:
                if verbose: pymupdf.log(f'{os.getpid()=}: exception {e=}')
                ret = e
            if verbose:
                pymupdf.log(f'{os.getpid()=}: sending {page_num=} {ret=}')

            queue_up.put( (page_num, ret) )

    error = None

    pids = list()
    try:
        # Start child processes.
        if stats:
            t = time.time()
        for i in range(concurrency):
            p = os.fork()  # pylint: disable=no-member
            if p == 0:
                # Child process.
                try:
                    try:
                        childfn()
                    except Exception as e:
                        pymupdf.log(f'{os.getpid()=}: childfn() => {e=}')
                        raise
                finally:
                    if verbose:
                        pymupdf.log(f'{os.getpid()=}: calling os._exit(0)')
                    os._exit(0)
            pids.append(p)
        if stats:
            _stats_write(t, 'create child processes')

        # Send page numbers.
        if stats:
            t = time.time()
        if verbose:
            pymupdf.log(f'Sending page numbers.')
        for page_num in range(len(pages)):
            queue_down.put(page_num)
        if stats:
            _stats_write(t, 'Send page numbers')

        # Collect results. We give up if any worker sends an exception instead
        # of text, but this hasn't been tested.
        ret = [None] * len(pages)
        for i in range(len(pages)):
            page_num, text = queue_up.get()
            if verbose:
                pymupdf.log(f'{page_num=} {len(text)=}')
            assert ret[page_num] is None
            if isinstance(text, Exception):
                if not error:
                    error = text
                break
            ret[page_num] = text

        # Close queue. This should cause exception in workers and terminate
        # them, but on macos-arm64 this does not seem to happen, so we also
        # send None, which makes workers terminate.
        for i in range(concurrency):
            queue_down.put(None)
        if verbose: pymupdf.log(f'Closing queues.')
        queue_down.close()

        if error:
            raise error
        if verbose:
            pymupdf.log(f'After concurrent, returning {len(ret)=}')
        return ret

    finally:
        # Join all child processes.
        if stats:
            t = time.time()
        for pid in pids:
            if verbose:
                pymupdf.log(f'waiting for {pid=}.')
            e = os.waitpid(pid, 0)
            if verbose:
                pymupdf.log(f'{pid=} => {e=}')
        if stats:
            _stats_write(t, 'Join all child proceses')
```
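For orientation, here is a minimal usage sketch (not part of the file above). The import path `pymupdf._apply_pages`, the sample file `example.pdf`, the page function `_get_text` and the concurrency of 4 are assumptions for illustration. It drives the Pool-based `_multiprocessing()` entry point shown above to run a page function over every page of a document in parallel:

```python
# Minimal usage sketch, assuming the module above is importable as
# pymupdf._apply_pages; 'example.pdf', _get_text and concurrency=4 are
# hypothetical.  The page function must be a module-level callable so the
# worker processes can import/pickle it.
import pymupdf
from pymupdf import _apply_pages


def _get_text(page, sort=False):
    # Runs in a worker process: receives a pymupdf.Page plus whatever was
    # passed through pagefn_args / pagefn_kwargs.
    return page.get_text(sort=sort)


if __name__ == '__main__':
    path = 'example.pdf'  # hypothetical input document
    with pymupdf.open(path) as document:
        pages = list(range(document.page_count))  # page numbers 0..n-1
    texts = _apply_pages._multiprocessing(
            path,
            pages,
            _get_text,        # pagefn
            (),               # pagefn_args
            dict(sort=True),  # pagefn_kwargs
            None,             # initfn
            (),               # initfn_args
            {},               # initfn_kwargs
            4,                # concurrency: number of worker processes
            False,            # stats
            )
    print(f'Got text for {len(texts)} pages.')
```

The `_fork()` variant implements the same dynamic work distribution with `os.fork()` and a pair of queues, so it is limited to POSIX systems; the Pool-based path is the portable choice.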
