Skip to content

Commit ac510bb

Browse files
Use virtual terminal input on Windows when available.
1 parent 92b3a95 commit ac510bb

File tree

1 file changed

+129
-8
lines changed

1 file changed

+129
-8
lines changed

src/prompt_toolkit/input/win32.py

Lines changed: 129 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import msvcrt
1717
from ctypes import windll
1818

19-
from ctypes import Array, pointer
19+
from ctypes import Array, byref, pointer
2020
from ctypes.wintypes import DWORD, HANDLE
2121
from typing import Callable, ContextManager, Iterable, Iterator, TextIO
2222

@@ -35,6 +35,7 @@
3535

3636
from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
3737
from .base import Input
38+
from .vt100_parser import Vt100Parser
3839

3940
__all__ = [
4041
"Win32Input",
@@ -52,6 +53,9 @@
5253
MOUSE_MOVED = 0x0001
5354
MOUSE_WHEELED = 0x0004
5455

56+
# See: https://msdn.microsoft.com/pl-pl/library/windows/desktop/ms686033(v=vs.85).aspx
57+
ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
58+
5559

5660
class _Win32InputBase(Input):
5761
"""
@@ -74,7 +78,12 @@ class Win32Input(_Win32InputBase):
7478

7579
def __init__(self, stdin: TextIO | None = None) -> None:
7680
super().__init__()
77-
self.console_input_reader = ConsoleInputReader()
81+
self._use_virtual_terminal_input = _is_win_vt100_input_enabled()
82+
83+
if self._use_virtual_terminal_input:
84+
self.console_input_reader = Vt100InputReader()
85+
else:
86+
self.console_input_reader = ConsoleInputReader()
7887

7988
def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
8089
"""
@@ -101,7 +110,9 @@ def closed(self) -> bool:
101110
return False
102111

103112
def raw_mode(self) -> ContextManager[None]:
104-
return raw_mode()
113+
return raw_mode(
114+
use_win10_virtual_terminal_input=self._use_virtual_terminal_input
115+
)
105116

106117
def cooked_mode(self) -> ContextManager[None]:
107118
return cooked_mode()
@@ -124,6 +135,88 @@ def handle(self) -> HANDLE:
124135
return self.console_input_reader.handle
125136

126137

138+
class Vt100InputReader:
139+
def __init__(self) -> None:
140+
self._fdcon = None
141+
142+
self._buffer: list[KeyPress] = [] # Buffer to collect the Key objects.
143+
self._vt100_parser = Vt100Parser(
144+
lambda key_press: self._buffer.append(key_press)
145+
)
146+
147+
# When stdin is a tty, use that handle, otherwise, create a handle from
148+
# CONIN$.
149+
self.handle: HANDLE
150+
if sys.stdin.isatty():
151+
self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
152+
else:
153+
self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
154+
self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))
155+
156+
def close(self) -> None:
157+
"Close fdcon."
158+
if self._fdcon is not None:
159+
os.close(self._fdcon)
160+
161+
def read(self) -> Iterable[KeyPress]:
162+
"""
163+
Return a list of `KeyPress` instances. It won't return anything when
164+
there was nothing to read. (This function doesn't block.)
165+
166+
http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
167+
"""
168+
max_count = 2048 # Max events to read at the same time.
169+
170+
read = DWORD(0)
171+
arrtype = INPUT_RECORD * max_count
172+
input_records = arrtype()
173+
174+
# Check whether there is some input to read. `ReadConsoleInputW` would
175+
# block otherwise.
176+
# (Actually, the event loop is responsible to make sure that this
177+
# function is only called when there is something to read, but for some
178+
# reason this happened in the asyncio_win32 loop, and it's better to be
179+
# safe anyway.)
180+
if not wait_for_handles([self.handle], timeout=0):
181+
return
182+
183+
# Get next batch of input event.
184+
windll.kernel32.ReadConsoleInputW(
185+
self.handle, pointer(input_records), max_count, pointer(read)
186+
)
187+
188+
# First, get all the keys from the input buffer, in order to determine
189+
# whether we should consider this a paste event or not.
190+
for key_data in self._get_keys(read, input_records):
191+
self._vt100_parser.feed(key_data)
192+
193+
# Return result.
194+
result = self._buffer
195+
self._buffer = []
196+
return result
197+
198+
def _get_keys(
199+
self, read: DWORD, input_records: Array[INPUT_RECORD]
200+
) -> Iterator[str]:
201+
"""
202+
Generator that yields `KeyPress` objects from the input records.
203+
"""
204+
for i in range(read.value):
205+
ir = input_records[i]
206+
207+
# Get the right EventType from the EVENT_RECORD.
208+
# (For some reason the Windows console application 'cmder'
209+
# [http://gooseberrycreative.com/cmder/] can return '0' for
210+
# ir.EventType. -- Just ignore that.)
211+
if ir.EventType in EventTypes:
212+
ev = getattr(ir.Event, EventTypes[ir.EventType])
213+
214+
# Process if this is a key event. (We also have mouse, menu and
215+
# focus events.)
216+
if isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown:
217+
yield ev.uChar.UnicodeChar
218+
219+
127220
class ConsoleInputReader:
128221
"""
129222
:param recognize_paste: When True, try to discover paste actions and turn
@@ -700,8 +793,11 @@ class raw_mode:
700793
`raw_input` method of `.vt100_input`.
701794
"""
702795

703-
def __init__(self, fileno: int | None = None) -> None:
796+
def __init__(
797+
self, fileno: int | None = None, use_win10_virtual_terminal_input: bool = False
798+
) -> None:
704799
self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
800+
self.use_win10_virtual_terminal_input = use_win10_virtual_terminal_input
705801

706802
def __enter__(self) -> None:
707803
# Remember original mode.
@@ -717,12 +813,15 @@ def _patch(self) -> None:
717813
ENABLE_LINE_INPUT = 0x0002
718814
ENABLE_PROCESSED_INPUT = 0x0001
719815

720-
windll.kernel32.SetConsoleMode(
721-
self.handle,
722-
self.original_mode.value
723-
& ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
816+
new_mode = self.original_mode.value & ~(
817+
ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT
724818
)
725819

820+
if self.use_win10_virtual_terminal_input:
821+
new_mode |= ENABLE_VIRTUAL_TERMINAL_INPUT
822+
823+
windll.kernel32.SetConsoleMode(self.handle, new_mode)
824+
726825
def __exit__(self, *a: object) -> None:
727826
# Restore original mode
728827
windll.kernel32.SetConsoleMode(self.handle, self.original_mode)
@@ -747,3 +846,25 @@ def _patch(self) -> None:
747846
self.original_mode.value
748847
| (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
749848
)
849+
850+
851+
def _is_win_vt100_input_enabled() -> bool:
852+
"""
853+
Returns True when we're running Windows and VT100 escape sequences are
854+
supported.
855+
"""
856+
hconsole = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
857+
858+
# Get original console mode.
859+
original_mode = DWORD(0)
860+
windll.kernel32.GetConsoleMode(hconsole, byref(original_mode))
861+
862+
try:
863+
# Try to enable VT100 sequences.
864+
result: int = windll.kernel32.SetConsoleMode(
865+
hconsole, DWORD(ENABLE_VIRTUAL_TERMINAL_INPUT)
866+
)
867+
868+
return result == 1
869+
finally:
870+
windll.kernel32.SetConsoleMode(hconsole, original_mode)

0 commit comments

Comments
 (0)