-
-
Notifications
You must be signed in to change notification settings - Fork 32.1k
gh-108951: add TaskGroup.cancel() #127214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8ec2f60
907f1d0
2cfa1e6
bce1fb1
7754aad
452042d
f077241
8345647
7235c20
243db79
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -342,6 +342,33 @@ and reliable way to wait for all tasks in the group to finish. | |
|
||
Close the given coroutine if the task group is not active. | ||
|
||
.. method:: cancel() | ||
|
||
Cancel the task group. | ||
|
||
:meth:`~asyncio.Task.cancel` will be called on any tasks in the group that | ||
aren't yet done, as well as the parent (body) of the group. This will | ||
cause the task group context manager to exit *without* | ||
:exc:`asyncio.CancelledError` being raised. | ||
|
||
If :meth:`cancel` is called before entering the task group, the group will be | ||
cancelled upon entry. This is useful for patterns where one piece of | ||
code passes an unused :class:`asyncio.TaskGroup` instance to another in order to have | ||
the ability to cancel anything run within the group. | ||
|
||
:meth:`cancel` is idempotent and may be called after the task group has | ||
already exited. | ||
|
||
Ways to use :meth:`cancel`: | ||
|
||
* call it from the task group body based on some condition or event | ||
* pass the task group instance to child tasks via :meth:`create_task`, allowing a child | ||
task to conditionally cancel the entire entire group | ||
* pass the task group instance or bound :meth:`cancel` method to some other task *before* | ||
opening the task group, allowing remote cancellation | ||
|
||
.. versionadded:: next | ||
|
||
Example:: | ||
|
||
async def main(): | ||
|
@@ -414,53 +441,6 @@ reported by :meth:`asyncio.Task.cancelling`. | |
Improved handling of simultaneous internal and external cancellations | ||
and correct preservation of cancellation counts. | ||
|
||
Terminating a Task Group | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These docs make sense for older versions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably recommending a backport module on PyPI would be better There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These docs were just added in September, and backported to 3.13 and 3.12. It's my understanding that the deletion here wouldn't affect the docs of previous versions. As for this PR, I'd expected it to be backported as far back as is allowed by policy. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @belm0 are you interested in applying this change and any previous changes to my taskgroup backport? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is new API, so we won't backport it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm talking about backporting to pypi There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, sure. PyPI is off limits :) |
||
------------------------ | ||
|
||
While terminating a task group is not natively supported by the standard | ||
library, termination can be achieved by adding an exception-raising task | ||
to the task group and ignoring the raised exception: | ||
|
||
.. code-block:: python | ||
|
||
import asyncio | ||
from asyncio import TaskGroup | ||
|
||
class TerminateTaskGroup(Exception): | ||
"""Exception raised to terminate a task group.""" | ||
|
||
async def force_terminate_task_group(): | ||
"""Used to force termination of a task group.""" | ||
raise TerminateTaskGroup() | ||
|
||
async def job(task_id, sleep_time): | ||
print(f'Task {task_id}: start') | ||
await asyncio.sleep(sleep_time) | ||
print(f'Task {task_id}: done') | ||
|
||
async def main(): | ||
try: | ||
async with TaskGroup() as group: | ||
# spawn some tasks | ||
group.create_task(job(1, 0.5)) | ||
group.create_task(job(2, 1.5)) | ||
# sleep for 1 second | ||
await asyncio.sleep(1) | ||
# add an exception-raising task to force the group to terminate | ||
group.create_task(force_terminate_task_group()) | ||
except* TerminateTaskGroup: | ||
pass | ||
|
||
asyncio.run(main()) | ||
|
||
Expected output: | ||
|
||
.. code-block:: text | ||
|
||
Task 1: start | ||
Task 2: start | ||
Task 1: done | ||
|
||
Sleeping | ||
======== | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ def __init__(self): | |
self._errors = [] | ||
self._base_error = None | ||
self._on_completed_fut = None | ||
self._cancel_on_enter = False | ||
|
||
def __repr__(self): | ||
info = [''] | ||
|
@@ -62,6 +63,8 @@ async def __aenter__(self): | |
raise RuntimeError( | ||
f'TaskGroup {self!r} cannot determine the parent task') | ||
self._entered = True | ||
if self._cancel_on_enter: | ||
self.cancel() | ||
|
||
return self | ||
|
||
|
@@ -177,6 +180,10 @@ async def _aexit(self, et, exc): | |
finally: | ||
exc = None | ||
|
||
# If we wanted to raise an error, it would have been done explicitly | ||
# above. Otherwise, either there is no error or we want to suppress | ||
# the original error. | ||
return True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this bugfix need backporting to 3.12? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you think of a case where this bug is visible to the user? If it's visible, yes I'd make a separate PR with corresponding test that can be backported. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the use-case for this code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suppressing exceptions out of the context manager is certainly needed to implement async def test_taskgroup_cancel_body(self):
count = 0
async with asyncio.TaskGroup() as tg:
tg.cancel()
count += 1
await asyncio.sleep(0) # <-- CancelledError will leak out of context manager
count += 1
self.assertEqual(count, 1) Note that the change isn't a blanket suppression. Code prior to this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah it's visible in the Traceback returned for external or "native" cancellation. Ie a cancellation that propagates out of asyncio.run, because someone is using 3.10 semantics or waited on a executor future on a pool that was shutdown |
||
|
||
def create_task(self, coro, *, name=None, context=None): | ||
"""Create a new task in this group and return it. | ||
|
@@ -273,3 +280,30 @@ def _on_task_done(self, task): | |
self._abort() | ||
self._parent_cancel_requested = True | ||
self._parent_task.cancel() | ||
|
||
def cancel(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what do you think about supporting cancel messages here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I asked on Gitter, but I'm still unclear about how such a message would be accessed and surfaced. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It could be logged by the task that gets cancelled, or useful in debugging There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would keep it as-is and maybe add a message in the follow-up PR; this PR is big enough for the review. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
My $0.02:
|
||
"""Cancel the task group | ||
|
||
`cancel()` will be called on any tasks in the group that aren't yet | ||
done, as well as the parent (body) of the group. This will cause the | ||
task group context manager to exit *without* `asyncio.CancelledError` | ||
being raised. | ||
|
||
If `cancel()` is called before entering the task group, the group will be | ||
cancelled upon entry. This is useful for patterns where one piece of | ||
code passes an unused TaskGroup instance to another in order to have | ||
the ability to cancel anything run within the group. | ||
|
||
`cancel()` is idempotent and may be called after the task group has | ||
already exited. | ||
""" | ||
if not self._entered: | ||
self._cancel_on_enter = True | ||
return | ||
if self._exiting and not self._tasks: | ||
return | ||
if not self._aborting: | ||
self._abort() | ||
if self._parent_task and not self._parent_cancel_requested: | ||
self._parent_cancel_requested = True | ||
self._parent_task.cancel() |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. copying comment from @graingert (please make all comments on the code so that there can be a thread and Resolve button)
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -997,6 +997,94 @@ class MyKeyboardInterrupt(KeyboardInterrupt): | |
self.assertIsNotNone(exc) | ||
self.assertListEqual(gc.get_referrers(exc), no_other_refs()) | ||
|
||
async def test_taskgroup_cancel_children(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add a test that a race function works, eg there's only one winner async def test_race_one_winner():
async def race(*fns):
outcome = None
async def run(fn):
nonlocal outcome
outcome = await fn()
tg.stop()
async with asyncio.TaskGroup() as tg:
for fn in fns:
tg.create_task(run(fn))
event = asyncio.Event()
record = []
async def fn_1():
record.append("1 started")
await event.wait()
record.append("1 finished")
return 1
async def fn_2():
record.append("2 started")
await event.wait()
record.append("2 finished")
return 2
async def trigger_event():
record.append("3 started")
event.set()
await asyncio.sleep(10)
record.append("3 finished")
outcome = await race(fn_1, fn_2, trigger_event)
self.assertEquals(outcome, 1)
self.assertListEquals(record, ["1 started", "2 started", "3 started", "1 finished"]) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure we should expect only one winner, and vaguely recall Trio guidance against such expectations. I'm not sure such a guarantee is useful in practice, because a task wouldn't cancel a task group until its real work was completed, and there is no way to prevent multiple tasks finishing their work on the same scheduler pass (short of employing locks within the tasks). Would you clarify your expectation? For example, "for any tasks transitively under a TaskGroup that may call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If they do finish on the same scheduler count only one will resume, so it can call .cancel() on sibling tasks to prevent them finishing This behaviour is already required by staggered_race, and we want to be able to use a TaskGroup in staggered_race |
||
# (asserting that TimeoutError is not raised) | ||
async with asyncio.timeout(1): | ||
async with asyncio.TaskGroup() as tg: | ||
tg.create_task(asyncio.sleep(10)) | ||
tg.create_task(asyncio.sleep(10)) | ||
await asyncio.sleep(0) | ||
tg.cancel() | ||
|
||
async def test_taskgroup_cancel_body(self): | ||
count = 0 | ||
async with asyncio.TaskGroup() as tg: | ||
tg.cancel() | ||
count += 1 | ||
await asyncio.sleep(0) | ||
count += 1 | ||
self.assertEqual(count, 1) | ||
|
||
async def test_taskgroup_cancel_idempotent(self): | ||
count = 0 | ||
async with asyncio.TaskGroup() as tg: | ||
tg.cancel() | ||
tg.cancel() | ||
count += 1 | ||
await asyncio.sleep(0) | ||
count += 1 | ||
self.assertEqual(count, 1) | ||
|
||
async def test_taskgroup_cancel_after_exit(self): | ||
async with asyncio.TaskGroup() as tg: | ||
await asyncio.sleep(0) | ||
# (asserting that exception is not raised) | ||
tg.cancel() | ||
|
||
async def test_taskgroup_cancel_before_enter(self): | ||
tg = asyncio.TaskGroup() | ||
tg.cancel() | ||
count = 0 | ||
async with tg: | ||
count += 1 | ||
await asyncio.sleep(0) | ||
count += 1 | ||
self.assertEqual(count, 1) | ||
|
||
async def test_taskgroup_cancel_before_create_task(self): | ||
async with asyncio.TaskGroup() as tg: | ||
tg.cancel() | ||
# TODO: This behavior is not ideal. We'd rather have no exception | ||
# raised, and the child task run until the first await. | ||
with self.assertRaises(RuntimeError): | ||
tg.create_task(asyncio.sleep(1)) | ||
|
||
async def test_taskgroup_cancel_before_exception(self): | ||
async def raise_exc(parent_tg: asyncio.TaskGroup): | ||
parent_tg.cancel() | ||
raise RuntimeError | ||
|
||
with self.assertRaises(ExceptionGroup): | ||
async with asyncio.TaskGroup() as tg: | ||
tg.create_task(raise_exc(tg)) | ||
await asyncio.sleep(1) | ||
|
||
async def test_taskgroup_cancel_after_exception(self): | ||
async def raise_exc(parent_tg: asyncio.TaskGroup): | ||
try: | ||
raise RuntimeError | ||
finally: | ||
parent_tg.cancel() | ||
|
||
with self.assertRaises(ExceptionGroup): | ||
async with asyncio.TaskGroup() as tg: | ||
tg.create_task(raise_exc(tg)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What will happen if some tasks cancels itself? How would this interact with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean the case where a child task calls There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cancellations (and thus taskgroup stops) happen when the next |
||
await asyncio.sleep(1) | ||
|
||
async def test_taskgroup_body_cancel_before_exception(self): | ||
with self.assertRaises(ExceptionGroup): | ||
async with asyncio.TaskGroup() as tg: | ||
tg.cancel() | ||
raise RuntimeError | ||
|
||
async def test_taskgroup_body_cancel_after_exception(self): | ||
with self.assertRaises(ExceptionGroup): | ||
async with asyncio.TaskGroup() as tg: | ||
try: | ||
raise RuntimeError | ||
finally: | ||
tg.cancel() | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add :meth:`~asyncio.TaskGroup.cancel`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably you want code examples for all of these?