comparison src/_apply_pages.py @ 1:1d09e1dec1d9 upstream

ADD: PyMuPDF v1.26.4: the original sdist. It does not yet contain MuPDF. This normally will be downloaded when building PyMuPDF.
author Franz Glasner <fzglas.hg@dom66.de>
date Mon, 15 Sep 2025 11:37:51 +0200
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 1:1d09e1dec1d9
1 import multiprocessing
2 import os
3 import time
4
5 import pymupdf
6
7
8 # Support for concurrent processing of document pages.
9 #
10
class _worker_State:
    """Plain attribute container for per-process worker settings.

    Instances start out empty; attributes are attached at runtime.
    """


# Single module-level instance shared by the worker helpers in this file.
_worker_state = _worker_State()
14
15
def _worker_init(
        path,
        initfn,
        initfn_args,
        initfn_kwargs,
        pagefn,
        pagefn_args,
        pagefn_kwargs,
        stats,
        ):
    """Store the worker settings in `_worker_state` and run `initfn`.

    Args:
        path: Stored as `_worker_state.path`.
        initfn: Optional callable; if truthy it is called once as
            `initfn(*initfn_args, **initfn_kwargs)` after the settings have
            been stored.
        initfn_args: Positional arguments for `initfn`.
        initfn_kwargs: Keyword arguments for `initfn`.
        pagefn: Stored as `_worker_state.pagefn`.
        pagefn_args: Stored as `_worker_state.pagefn_args`.
        pagefn_kwargs: Stored as `_worker_state.pagefn_kwargs`.
        stats: Stored as `_worker_state.stats`.
    """
    settings = (
        ('path', path),
        ('pagefn', pagefn),
        ('pagefn_args', pagefn_args),
        ('pagefn_kwargs', pagefn_kwargs),
        ('stats', stats),
        # No document is associated with the state at initialisation time.
        ('document', None),
    )
    for name, value in settings:
        setattr(_worker_state, name, value)

    if not initfn:
        return
    initfn(*initfn_args, **initfn_kwargs)
35
36
def _stats_write(t, label):
    """Log how long an operation took, but only if it was slow.

    Args:
        t: Start time of the operation, as a `time.time()` value.
        label: Text describing the operation; included in the log line.

    Nothing is logged unless at least 10 seconds have elapsed since `t`.
    """
    elapsed = time.time() - t
    if elapsed >= 10:
        # `.2f` prints two decimal places. The earlier `2f` spec only set a
        # minimum field width of 2 and therefore printed six decimals.
        pymupdf.log(f'{os.getpid()=}: {elapsed:.2f}s: {label}.')
41
42
def _worker_fn(page_number):
    """Run the configured page function on one page.

    Reads its configuration from `_worker_state`. The document is created
    from `_worker_state.path` on the first call and then kept in
    `_worker_state.document` for later calls.

    Args:
        page_number: Index used to look up the page in the document.

    Returns:
        The value returned by
        `_worker_state.pagefn(page, *pagefn_args, **pagefn_kwargs)`.
    """
    state = _worker_state
    stats = state.stats

    # Create Document from filename if we haven't already done so. Compare
    # with None explicitly so the cache check does not depend on the
    # truthiness of the Document object.
    # NOTE(review): Document presumably supports len(); if so a zero-page
    # document is falsy and would have been re-opened on every call.
    if state.document is None:
        if stats:
            t = time.time()
        state.document = pymupdf.Document(state.path)  # pylint: disable=attribute-defined-outside-init
        if stats:
            _stats_write(t, 'pymupdf.Document()')

    if stats:
        t = time.time()
    page = state.document[page_number]
    if stats:
        _stats_write(t, '_worker_state.document[page_number]')

    if stats:
        t = time.time()
    ret = state.pagefn(
        page,
        *state.pagefn_args,
        **state.pagefn_kwargs,
    )
    if stats:
        _stats_write(t, '_worker_state.pagefn()')

    return ret
69
70
def _multiprocessing(
        path,
        pages,
        pagefn,
        pagefn_args,
        pagefn_kwargs,
        initfn,
        initfn_args,
        initfn_kwargs,
        concurrency,
        stats,
        ):
    """Apply the page function to `pages` using a `multiprocessing.Pool`.

    Each pool process is initialised with `_worker_init()` and then handles
    items of `pages` through `_worker_fn()`.

    Args:
        path: Passed to `_worker_init()` in every pool process.
        pages: Iterable of page numbers; each item is passed to
            `_worker_fn()`.
        pagefn, pagefn_args, pagefn_kwargs: Passed to `_worker_init()`.
        initfn, initfn_args, initfn_kwargs: Passed to `_worker_init()`.
        concurrency: Number of pool processes, as accepted by
            `multiprocessing.Pool`.
        stats: Passed to `_worker_init()`.

    Returns:
        List of `_worker_fn()` results, in the same order as `pages`.
    """
    worker_init_args = (
        path,
        initfn, initfn_args, initfn_kwargs,
        pagefn, pagefn_args, pagefn_kwargs,
        stats,
    )
    with multiprocessing.Pool(
            processes=concurrency,
            initializer=_worker_init,
            initargs=worker_init_args,
            ) as pool:
        # Blocks until every page has been processed.
        return pool.map(_worker_fn, pages)
96
97
def _fork(
        path,
        pages,
        pagefn,
        pagefn_args,
        pagefn_kwargs,
        initfn,
        initfn_args,
        initfn_kwargs,
        concurrency,
        stats,
        ):
    """Apply `pagefn` to pages of a document using `os.fork()` workers.

    Args:
        path: Passed to `pymupdf.Document()` in each worker and to
            `_worker_init()`.
        pages: Page numbers to process.
        pagefn: Called in a worker as
            `pagefn(page, *pagefn_args, **pagefn_kwargs)`.
        pagefn_args, pagefn_kwargs: Extra arguments for `pagefn`.
        initfn, initfn_args, initfn_kwargs: Passed to `_worker_init()`, which
            runs in every worker before any page is processed.
        concurrency: Number of worker processes; `None` means
            `multiprocessing.cpu_count()`.
        stats: If true, slow steps are reported through `_stats_write()`.

    Returns:
        List in which item `i` is the `pagefn` result for `pages[i]`.

    Raises:
        Exception: The first exception instance received from a worker is
            re-raised in the calling process.
    """
    verbose = 0
    if concurrency is None:
        concurrency = multiprocessing.cpu_count()

    # Snapshot the requested page numbers so that results can be returned in
    # the same order, whatever the numbers are.
    page_numbers = list(pages)
    num_pages = len(page_numbers)

    # We write `(index, page_num)` to `queue_down` and read `(index, result)`
    # from `queue_up`. Workers each repeatedly read the next available item
    # from `queue_down`, process that page and write the result onto
    # `queue_up`.
    #
    # This is better than pre-allocating a subset of pages to each worker
    # because it ensures there will never be idle workers until we are near the
    # end with fewer pages left than workers.
    #
    queue_down = multiprocessing.Queue()
    queue_up = multiprocessing.Queue()

    def childfn():
        # Body of each worker process: initialise, then serve work items
        # until the `None` sentinel arrives.
        document = None
        if verbose:
            pymupdf.log(f'{os.getpid()=}: {initfn=} {initfn_args=}')
        _worker_init(
            path,
            initfn,
            initfn_args,
            initfn_kwargs,
            pagefn,
            pagefn_args,
            pagefn_kwargs,
            stats,
        )
        while 1:
            if verbose:
                pymupdf.log(f'{os.getpid()=}: calling get().')
            item = queue_down.get()
            page_num = None if item is None else item[1]
            if verbose:
                pymupdf.log(f'{os.getpid()=}: {page_num=}.')
            if item is None:
                break
            index = item[0]
            try:
                # Open the document on first use only; compare with None so
                # the check does not depend on the Document's truthiness.
                if document is None:
                    if stats:
                        t = time.time()
                    document = pymupdf.Document(path)
                    if stats:
                        _stats_write(t, 'pymupdf.Document(path)')

                if stats:
                    t = time.time()
                page = document[page_num]
                if stats:
                    _stats_write(t, 'document[page_num]')

                if verbose:
                    pymupdf.log(f'{os.getpid()=}: {_worker_state=}')

                if stats:
                    t = time.time()
                ret = pagefn(
                    page,
                    *_worker_state.pagefn_args,
                    **_worker_state.pagefn_kwargs,
                )
                if stats:
                    _stats_write(t, f'{page_num=} pagefn()')
            except Exception as e:
                # Send the exception to the parent in place of a result.
                if verbose:
                    pymupdf.log(f'{os.getpid()=}: exception {e=}')
                ret = e
            if verbose:
                pymupdf.log(f'{os.getpid()=}: sending {page_num=} {ret=}')

            queue_up.put((index, ret))

    pids = list()
    try:
        # Start child processes.
        if stats:
            t = time.time()
        for _ in range(concurrency):
            p = os.fork()  # pylint: disable=no-member
            if p == 0:
                # Child process. It must never return into the parent's code,
                # so always finish with os._exit().
                try:
                    try:
                        childfn()
                    except Exception as e:
                        pymupdf.log(f'{os.getpid()=}: childfn() => {e=}')
                        raise
                finally:
                    if verbose:
                        pymupdf.log(f'{os.getpid()=}: calling os._exit(0)')
                    os._exit(0)
            pids.append(p)
        if stats:
            _stats_write(t, 'create child processes')

        # Send page numbers, each tagged with its position in `pages`.
        if stats:
            t = time.time()
        if verbose:
            pymupdf.log(f'Sending page numbers.')
        for item in enumerate(page_numbers):
            queue_down.put(item)
        if stats:
            _stats_write(t, 'Send page numbers')

        # Collect results. We give up if any worker sends an exception instead
        # of a result.
        error = None
        ret = [None] * num_pages
        for _ in range(num_pages):
            index, text = queue_up.get()
            page_num = page_numbers[index]
            if verbose:
                pymupdf.log(f'{page_num=} {len(text)=}')
            assert ret[index] is None
            if isinstance(text, Exception):
                error = text
                break
            ret[index] = text

        if error:
            raise error
        if verbose:
            pymupdf.log(f'After concurrent, returning {len(ret)=}')
        return ret

    finally:
        # Tell every started worker to terminate. This is done here, not on
        # the normal path only, so that a failure above cannot leave workers
        # blocked in `queue_down.get()` while we wait for them below.
        #
        # Closing the queue should cause exception in workers and terminate
        # them, but on macos-arm64 this does not seem to happen, so we also
        # send None, which makes workers terminate.
        for _ in pids:
            queue_down.put(None)
        if verbose:
            pymupdf.log(f'Closing queues.')
        queue_down.close()

        # Join all child processes.
        if stats:
            t = time.time()
        for pid in pids:
            if verbose:
                pymupdf.log(f'waiting for {pid=}.')
            e = os.waitpid(pid, 0)
            if verbose:
                pymupdf.log(f'{pid=} => {e=}')
        if stats:
            _stats_write(t, 'Join all child proceses')