Skip to content

Commit 5bfd3d8

Browse files
authored
Merge pull request #617 from martindurant/interrupt
Add ability to interrupt all fsspec IO tasks
2 parents 3911896 + 6d485e7 commit 5bfd3d8

File tree

2 files changed

+65
-4
lines changed

2 files changed

+65
-4
lines changed

fsspec/asyn.py

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,14 @@ def sync(loop, func, *args, timeout=None, **kwargs):
5151
result = [None]
5252
event = threading.Event()
5353
asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
54-
# Raise FSTimeoutError on event.wait() returns None (timeout) or
55-
# asyncio.TimeoutError to make the timeout behaviors consistency.
56-
if not event.wait(timeout):
57-
raise FSTimeoutError
54+
while True:
55+
# this loops allows thread to get interrupted
56+
if event.wait(1):
57+
break
58+
if timeout is not None:
59+
timeout -= 1
60+
if timeout < 0:
61+
raise FSTimeoutError
5862
if isinstance(result[0], asyncio.TimeoutError):
5963
# suppress asyncio.TimeoutError, raise FSTimeoutError
6064
raise FSTimeoutError
@@ -572,3 +576,43 @@ def mirror_sync_methods(obj):
572576
mth.__doc__ = getattr(
573577
getattr(AbstractFileSystem, smethod, None), "__doc__", ""
574578
)
579+
580+
581+
class FSSpecCoroutineCancel(Exception):
582+
pass
583+
584+
585+
def _dump_running_tasks(
586+
printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
587+
):
588+
import traceback
589+
590+
if PY36:
591+
raise NotImplementedError("Do not call this on Py 3.6")
592+
593+
tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
594+
if printout:
595+
[task.print_stack() for task in tasks]
596+
out = [
597+
{
598+
"locals": task._coro.cr_frame.f_locals,
599+
"file": task._coro.cr_frame.f_code.co_filename,
600+
"firstline": task._coro.cr_frame.f_code.co_firstlineno,
601+
"linelo": task._coro.cr_frame.f_lineno,
602+
"stack": traceback.format_stack(task._coro.cr_frame),
603+
"task": task if with_task else None,
604+
}
605+
for task in tasks
606+
]
607+
if cancel:
608+
for t in tasks:
609+
cbs = t._callbacks
610+
t.cancel()
611+
asyncio.futures.Future.set_exception(t, exc)
612+
asyncio.futures.Future.cancel(t)
613+
[cb[0](t) for cb in cbs] # cancels any dependent concurrent.futures
614+
try:
615+
t._coro.throw(exc) # exits coro, unless explicitly handled
616+
except exc:
617+
pass
618+
return out

fsspec/tests/test_async.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import inspect
33
import sys
4+
import time
45

56
import pytest
67

@@ -16,6 +17,22 @@ def test_sync_methods():
1617
assert not inspect.iscoroutinefunction(inst.info)
1718

1819

20+
@pytest.mark.skipif(fsspec.asyn.PY36, reason="missing asyncio features o py36")
21+
def test_interrupt():
22+
loop = fsspec.asyn.get_loop()
23+
24+
async def f():
25+
await asyncio.sleep(1000000)
26+
return True
27+
28+
fut = asyncio.run_coroutine_threadsafe(f(), loop)
29+
time.sleep(0.01) # task launches
30+
out = fsspec.asyn._dump_running_tasks(with_task=True)
31+
task = out[0]["task"]
32+
assert task.done() and fut.done()
33+
assert isinstance(fut.exception(), fsspec.asyn.FSSpecCoroutineCancel)
34+
35+
1936
class _DummyAsyncKlass:
2037
def __init__(self):
2138
self.loop = fsspec.asyn.get_loop()

0 commit comments

Comments
 (0)