Skip to content

Commit cb9466b

Browse files
Use _interpreters.call().
1 parent 37c41f8 commit cb9466b

File tree

3 files changed

+112
-90
lines changed

3 files changed

+112
-90
lines changed

Lib/concurrent/futures/interpreter.py

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,8 @@ def resolve_task(fn, args, kwargs):
4545
# XXX Circle back to this later.
4646
raise TypeError('scripts not supported')
4747
else:
48-
# Functions defined in the __main__ module can't be pickled,
49-
# so they can't be used here. In the future, we could possibly
50-
# borrow from multiprocessing to work around this.
5148
task = (fn, args, kwargs)
52-
data = pickle.dumps(task)
53-
return data
49+
return task
5450

5551
if initializer is not None:
5652
try:
@@ -65,35 +61,6 @@ def create_context():
6561
return cls(initdata, shared)
6662
return create_context, resolve_task
6763

68-
@classmethod
69-
@contextlib.contextmanager
70-
def _capture_exc(cls, resultsid):
71-
try:
72-
yield
73-
except BaseException as exc:
74-
# Send the captured exception out on the results queue,
75-
# but still leave it unhandled for the interpreter to handle.
76-
_interpqueues.put(resultsid, (None, exc))
77-
raise # re-raise
78-
79-
@classmethod
80-
def _send_script_result(cls, resultsid):
81-
_interpqueues.put(resultsid, (None, None))
82-
83-
@classmethod
84-
def _call(cls, func, args, kwargs, resultsid):
85-
with cls._capture_exc(resultsid):
86-
res = func(*args or (), **kwargs or {})
87-
# Send the result back.
88-
with cls._capture_exc(resultsid):
89-
_interpqueues.put(resultsid, (res, None))
90-
91-
@classmethod
92-
def _call_pickled(cls, pickled, resultsid):
93-
with cls._capture_exc(resultsid):
94-
fn, args, kwargs = pickle.loads(pickled)
95-
cls._call(fn, args, kwargs, resultsid)
96-
9764
def __init__(self, initdata, shared=None):
9865
self.initdata = initdata
9966
self.shared = dict(shared) if shared else None
@@ -104,11 +71,56 @@ def __del__(self):
10471
if self.interpid is not None:
10572
self.finalize()
10673

107-
def _exec(self, script):
108-
assert self.interpid is not None
109-
excinfo = _interpreters.exec(self.interpid, script, restrict=True)
74+
def _call(self, fn, args, kwargs):
75+
def do_call(resultsid, func, *args, **kwargs):
76+
try:
77+
return func(*args, **kwargs)
78+
except BaseException as exc:
79+
# Avoid relying on globals.
80+
import _interpreters
81+
import _interpqueues
82+
# Send the captured exception out on the results queue,
83+
# but still leave it unhandled for the interpreter to handle.
84+
try:
85+
_interpqueues.put(resultsid, exc)
86+
except _interpreters.NotShareableError:
87+
# The exception is not shareable.
88+
import sys
89+
import traceback
90+
print('exception is not shareable:', file=sys.stderr)
91+
traceback.print_exception(exc)
92+
_interpqueues.put(resultsid, None)
93+
raise # re-raise
94+
95+
args = (self.resultsid, fn, *args)
96+
res, excinfo = _interpreters.call(self.interpid, do_call, args, kwargs)
11097
if excinfo is not None:
11198
raise ExecutionFailed(excinfo)
99+
return res
100+
101+
def _get_exception(self):
102+
# Wait for the exception data to show up.
103+
while True:
104+
try:
105+
excdata = _interpqueues.get(self.resultsid)
106+
except _interpqueues.QueueNotFoundError:
107+
raise # re-raise
108+
except _interpqueues.QueueError as exc:
109+
if exc.__cause__ is not None or exc.__context__ is not None:
110+
raise # re-raise
111+
if str(exc).endswith(' is empty'):
112+
continue
113+
else:
114+
raise # re-raise
115+
except ModuleNotFoundError:
116+
# interpreters.queues doesn't exist, which means
117+
# QueueEmpty doesn't. Act as though it does.
118+
continue
119+
else:
120+
break
121+
exc, unboundop = excdata
122+
assert unboundop is None, unboundop
123+
return exc
112124

113125
def initialize(self):
114126
assert self.interpid is None, self.interpid
@@ -119,8 +131,6 @@ def initialize(self):
119131
maxsize = 0
120132
self.resultsid = _interpqueues.create(maxsize)
121133

122-
self._exec(f'from {__name__} import WorkerContext')
123-
124134
if self.shared:
125135
_interpreters.set___main___attrs(
126136
self.interpid, self.shared, restrict=True)
@@ -148,37 +158,15 @@ def finalize(self):
148158
pass
149159

150160
def run(self, task):
151-
data = task
152-
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
153-
161+
fn, args, kwargs = task
154162
try:
155-
self._exec(script)
156-
except ExecutionFailed as exc:
157-
exc_wrapper = exc
158-
else:
159-
exc_wrapper = None
160-
161-
# Return the result, or raise the exception.
162-
while True:
163-
try:
164-
obj = _interpqueues.get(self.resultsid)
165-
except _interpqueues.QueueNotFoundError:
163+
return self._call(fn, args, kwargs)
164+
except ExecutionFailed as wrapper:
165+
exc = self._get_exception()
166+
if exc is None:
167+
# The exception must have been not shareable.
166168
raise # re-raise
167-
except _interpqueues.QueueError:
168-
continue
169-
except ModuleNotFoundError:
170-
# interpreters.queues doesn't exist, which means
171-
# QueueEmpty doesn't. Act as though it does.
172-
continue
173-
else:
174-
break
175-
(res, exc), unboundop = obj
176-
assert unboundop is None, unboundop
177-
if exc is not None:
178-
assert res is None, res
179-
assert exc_wrapper is not None
180-
raise exc from exc_wrapper
181-
return res
169+
raise exc from wrapper
182170

183171

184172
class BrokenInterpreterPool(_thread.BrokenThreadPool):

Lib/test/test_concurrent_futures/test_init.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
def init(x):
2121
global INITIALIZER_STATUS
2222
INITIALIZER_STATUS = x
23+
# InterpreterPoolInitializerTest.test_initializer fails
24+
# if we don't have a LOAD_GLOBAL. (It could be any global.)
25+
# We will address this separately.
26+
INITIALIZER_STATUS
2327

2428
def get_init_status():
2529
return INITIALIZER_STATUS

Lib/test/test_concurrent_futures/test_interpreter_pool.py

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import contextlib
33
import io
44
import os
5-
import pickle
5+
import select
66
import time
77
import unittest
88
from concurrent.futures.interpreter import (
@@ -22,10 +22,14 @@ def noop():
2222

2323

2424
def write_msg(fd, msg):
25+
import os
2526
os.write(fd, msg + b'\0')
2627

2728

28-
def read_msg(fd):
29+
def read_msg(fd, timeout=10.0):
30+
r, _, _ = select.select([fd], [], [], timeout)
31+
if fd not in r:
32+
raise TimeoutError('nothing to read')
2933
msg = b''
3034
while ch := os.read(fd, 1):
3135
if ch == b'\0':
@@ -121,19 +125,32 @@ def init2():
121125
nonlocal count
122126
count += 1
123127

124-
with self.assertRaises(pickle.PicklingError):
125-
self.executor_type(initializer=init1)
126-
with self.assertRaises(pickle.PicklingError):
127-
self.executor_type(initializer=init2)
128+
with contextlib.redirect_stderr(io.StringIO()) as stderr:
129+
with self.executor_type(initializer=init1) as executor:
130+
fut = executor.submit(lambda: None)
131+
self.assertIn('NotShareableError', stderr.getvalue())
132+
with self.assertRaises(BrokenInterpreterPool):
133+
fut.result()
134+
135+
with contextlib.redirect_stderr(io.StringIO()) as stderr:
136+
with self.executor_type(initializer=init2) as executor:
137+
fut = executor.submit(lambda: None)
138+
self.assertIn('NotShareableError', stderr.getvalue())
139+
with self.assertRaises(BrokenInterpreterPool):
140+
fut.result()
128141

129142
def test_init_instance_method(self):
130143
class Spam:
131144
def initializer(self):
132145
raise NotImplementedError
133146
spam = Spam()
134147

135-
with self.assertRaises(pickle.PicklingError):
136-
self.executor_type(initializer=spam.initializer)
148+
with contextlib.redirect_stderr(io.StringIO()) as stderr:
149+
with self.executor_type(initializer=spam.initializer) as executor:
150+
fut = executor.submit(lambda: None)
151+
self.assertIn('NotShareableError', stderr.getvalue())
152+
with self.assertRaises(BrokenInterpreterPool):
153+
fut.result()
137154

138155
def test_init_shared(self):
139156
msg = b'eggs'
@@ -178,8 +195,6 @@ def test_init_exception_in_func(self):
178195
stderr = stderr.getvalue()
179196
self.assertIn('ExecutionFailed: Exception: spam', stderr)
180197
self.assertIn('Uncaught in the interpreter:', stderr)
181-
self.assertIn('The above exception was the direct cause of the following exception:',
182-
stderr)
183198

184199
@unittest.expectedFailure
185200
def test_submit_script(self):
@@ -208,19 +223,24 @@ def task2():
208223
return spam
209224

210225
executor = self.executor_type()
211-
with self.assertRaises(pickle.PicklingError):
212-
executor.submit(task1)
213-
with self.assertRaises(pickle.PicklingError):
214-
executor.submit(task2)
226+
227+
fut = executor.submit(task1)
228+
with self.assertRaises(_interpreters.NotShareableError):
229+
fut.result()
230+
231+
fut = executor.submit(task2)
232+
with self.assertRaises(_interpreters.NotShareableError):
233+
fut.result()
215234

216235
def test_submit_local_instance(self):
217236
class Spam:
218237
def __init__(self):
219238
self.value = True
220239

221240
executor = self.executor_type()
222-
with self.assertRaises(pickle.PicklingError):
223-
executor.submit(Spam)
241+
fut = executor.submit(Spam)
242+
with self.assertRaises(_interpreters.NotShareableError):
243+
fut.result()
224244

225245
def test_submit_instance_method(self):
226246
class Spam:
@@ -229,8 +249,9 @@ def run(self):
229249
spam = Spam()
230250

231251
executor = self.executor_type()
232-
with self.assertRaises(pickle.PicklingError):
233-
executor.submit(spam.run)
252+
fut = executor.submit(spam.run)
253+
with self.assertRaises(_interpreters.NotShareableError):
254+
fut.result()
234255

235256
def test_submit_func_globals(self):
236257
executor = self.executor_type()
@@ -242,6 +263,7 @@ def test_submit_func_globals(self):
242263

243264
@unittest.expectedFailure
244265
def test_submit_exception_in_script(self):
266+
# Scripts are not supported currently.
245267
fut = self.executor.submit('raise Exception("spam")')
246268
with self.assertRaises(Exception) as captured:
247269
fut.result()
@@ -289,13 +311,21 @@ def test_idle_thread_reuse(self):
289311
executor.shutdown(wait=True)
290312

291313
def test_pickle_errors_propagate(self):
292-
# GH-125864: Pickle errors happen before the script tries to execute, so the
293-
# queue used to wait infinitely.
294-
314+
# GH-125864: Pickle errors happen before the script tries to execute,
315+
# so the queue used to wait infinitely.
295316
fut = self.executor.submit(PickleShenanigans(0))
296-
with self.assertRaisesRegex(RuntimeError, "gotcha"):
317+
expected = _interpreters.NotShareableError
318+
with self.assertRaisesRegex(expected, 'unpickled'):
297319
fut.result()
298320

321+
def test_no_stale_references(self):
322+
# Weak references don't cross between interpreters.
323+
raise unittest.SkipTest('not applicable')
324+
325+
def test_free_reference(self):
326+
# Weak references don't cross between interpreters.
327+
raise unittest.SkipTest('not applicable')
328+
299329

300330
class AsyncioTest(InterpretersMixin, testasyncio_utils.TestCase):
301331

0 commit comments

Comments
 (0)