gh-133485: Use _interpreters.call() in InterpreterPoolExecutor #133957

Open · wants to merge 3 commits into base: main

124 changes: 56 additions & 68 deletions Lib/concurrent/futures/interpreter.py
@@ -45,12 +45,8 @@ def resolve_task(fn, args, kwargs):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
else:
# Functions defined in the __main__ module can't be pickled,
# so they can't be used here. In the future, we could possibly
# borrow from multiprocessing to work around this.
task = (fn, args, kwargs)
data = pickle.dumps(task)
return data
return task

if initializer is not None:
try:
@@ -65,35 +61,6 @@ def create_context():
return cls(initdata, shared)
return create_context, resolve_task

@classmethod
@contextlib.contextmanager
def _capture_exc(cls, resultsid):
try:
yield
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
_interpqueues.put(resultsid, (None, exc))
raise # re-raise

@classmethod
def _send_script_result(cls, resultsid):
_interpqueues.put(resultsid, (None, None))

@classmethod
def _call(cls, func, args, kwargs, resultsid):
with cls._capture_exc(resultsid):
res = func(*args or (), **kwargs or {})
# Send the result back.
with cls._capture_exc(resultsid):
_interpqueues.put(resultsid, (res, None))

@classmethod
def _call_pickled(cls, pickled, resultsid):
with cls._capture_exc(resultsid):
fn, args, kwargs = pickle.loads(pickled)
cls._call(fn, args, kwargs, resultsid)

def __init__(self, initdata, shared=None):
self.initdata = initdata
self.shared = dict(shared) if shared else None
@@ -104,11 +71,56 @@ def __del__(self):
if self.interpid is not None:
self.finalize()

def _exec(self, script):
assert self.interpid is not None
excinfo = _interpreters.exec(self.interpid, script, restrict=True)
def _call(self, fn, args, kwargs):
def do_call(resultsid, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except BaseException as exc:
# Avoid relying on globals.
import _interpreters
import _interpqueues
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
try:
_interpqueues.put(resultsid, exc)
except _interpreters.NotShareableError:
# The exception is not shareable.
import sys
import traceback
print('exception is not shareable:', file=sys.stderr)
traceback.print_exception(exc)
_interpqueues.put(resultsid, None)
raise # re-raise

args = (self.resultsid, fn, *args)
res, excinfo = _interpreters.call(self.interpid, do_call, args, kwargs)
if excinfo is not None:
raise ExecutionFailed(excinfo)
return res

def _get_exception(self):
# Wait for the exception data to show up.
while True:
try:
excdata = _interpqueues.get(self.resultsid)
except _interpqueues.QueueNotFoundError:
raise # re-raise
except _interpqueues.QueueError as exc:
if exc.__cause__ is not None or exc.__context__ is not None:
raise # re-raise
if str(exc).endswith(' is empty'):
continue
else:
raise # re-raise
except ModuleNotFoundError:
# interpreters.queues doesn't exist, which means
# QueueEmpty doesn't. Act as though it does.
continue
else:
break
exc, unboundop = excdata
assert unboundop is None, unboundop
return exc

def initialize(self):
assert self.interpid is None, self.interpid
@@ -119,8 +131,6 @@ def initialize(self):
maxsize = 0
self.resultsid = _interpqueues.create(maxsize)

self._exec(f'from {__name__} import WorkerContext')

if self.shared:
_interpreters.set___main___attrs(
self.interpid, self.shared, restrict=True)
@@ -148,37 +158,15 @@ def finalize(self):
pass

def run(self, task):
data = task
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'

fn, args, kwargs = task
try:
self._exec(script)
except ExecutionFailed as exc:
exc_wrapper = exc
else:
exc_wrapper = None

# Return the result, or raise the exception.
while True:
try:
obj = _interpqueues.get(self.resultsid)
except _interpqueues.QueueNotFoundError:
return self._call(fn, args, kwargs)
except ExecutionFailed as wrapper:
exc = self._get_exception()
if exc is None:
# The exception must have been not shareable.
raise # re-raise
except _interpqueues.QueueError:
continue
except ModuleNotFoundError:
# interpreters.queues doesn't exist, which means
# QueueEmpty doesn't. Act as though it does.
continue
else:
break
(res, exc), unboundop = obj
assert unboundop is None, unboundop
if exc is not None:
assert res is None, res
assert exc_wrapper is not None
raise exc from exc_wrapper
return res
raise exc from wrapper


class BrokenInterpreterPool(_thread.BrokenThreadPool):
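
For context, here is a minimal usage sketch (not part of this diff) that exercises the run() path above. It assumes Python 3.14+ where concurrent.futures.InterpreterPoolExecutor is available; the helper functions square(), boom(), and main() are illustrative names, not from this PR. On builds that still pickle tasks rather than using _interpreters.call(), the callables may need to live in an importable module instead of __main__ (the limitation described in the comment removed at the top of this diff).

# Hedged usage sketch: a successful call returns its result, and an exception
# raised in a worker subinterpreter propagates back to the caller.
from concurrent.futures import InterpreterPoolExecutor


def square(x):
    return x * x


def boom():
    raise ValueError('raised inside a subinterpreter')


def main():
    with InterpreterPoolExecutor(max_workers=2) as pool:
        # Successful calls: results are returned from the worker interpreters.
        print(list(pool.map(square, range(5))))  # [0, 1, 4, 9, 16]

        # Failing call: the worker captures the exception and it is re-raised
        # in the submitting interpreter when .result() is called.
        future = pool.submit(boom)
        try:
            future.result()
        except ValueError as exc:
            print('propagated:', exc)


if __name__ == '__main__':
    main()
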
4 changes: 4 additions & 0 deletions Lib/test/test_concurrent_futures/test_init.py
@@ -20,6 +20,10 @@
def init(x):
global INITIALIZER_STATUS
INITIALIZER_STATUS = x
# InterpreterPoolInitializerTest.test_initializer fails
# if we don't have a LOAD_GLOBAL. (It could be any global.)
# We will address this separately.
INITIALIZER_STATUS
Comment on lines +23 to +26
Member Author:

@markshannon, any ideas on why this is happening? It smells like a ceval bug, but it certainly could be something I've done wrong.

Contributor @neonene (Jun 1, 2025):

There seem to be related changes in inspect.getclosurevars() since 83ba8c2:

before:
ClosureVars(nonlocals={},
            globals={'INITIALIZER_STATUS': 'uninitialized'},
            builtins={}, unbound=set())
after:
ClosureVars(nonlocals={},
            globals={},
            builtins={}, unbound=set())
  • init() on main (without L26):
  3           RESUME                   0

  5           LOAD_FAST_BORROW         0 (x)
              STORE_GLOBAL             0 (INITIALIZER_STATUS)
              LOAD_CONST               0 (None)
              RETURN_VALUE
  • 3.3.5 (2014):
  5           0 LOAD_FAST                0 (x)
              3 STORE_GLOBAL             0 (INITIALIZER_STATUS)
              6 LOAD_CONST               0 (None)
              9 RETURN_VALUE


def get_init_status():
return INITIALIZER_STATUS
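
A small repro sketch for the observation in the comment above, assuming a recent CPython main build; the function names init_with_load() and init_without_load() are illustrative, not from the test suite. It compares inspect.getclosurevars() and the disassembly of an initializer with and without the bare global reference.

import dis
import inspect

INITIALIZER_STATUS = 'uninitialized'


def init_without_load(x):
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x


def init_with_load(x):
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x
    # The bare reference added on line 26 of test_init.py above; it forces
    # a LOAD_GLOBAL into the bytecode.
    INITIALIZER_STATUS


for fn in (init_without_load, init_with_load):
    print(fn.__name__)
    # Per the comment above, recent main reports globals={} for the version
    # without the load and globals={'INITIALIZER_STATUS': ...} with it.
    print(inspect.getclosurevars(fn))
    dis.dis(fn)
    print()
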