Skip to content

Commit 8899d48

Browse files
Use virtual terminal input on Windows when available.
1 parent 92b3a95 commit 8899d48

File tree

1 file changed

+145
-8
lines changed

1 file changed

+145
-8
lines changed

src/prompt_toolkit/input/win32.py

Lines changed: 145 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import msvcrt
1717
from ctypes import windll
1818

19-
from ctypes import Array, pointer
19+
from ctypes import Array, byref, pointer
2020
from ctypes.wintypes import DWORD, HANDLE
2121
from typing import Callable, ContextManager, Iterable, Iterator, TextIO
2222

@@ -35,6 +35,7 @@
3535

3636
from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
3737
from .base import Input
38+
from .vt100_parser import Vt100Parser
3839

3940
__all__ = [
4041
"Win32Input",
@@ -52,6 +53,9 @@
5253
MOUSE_MOVED = 0x0001
5354
MOUSE_WHEELED = 0x0004
5455

56+
# See: https://msdn.microsoft.com/pl-pl/library/windows/desktop/ms686033(v=vs.85).aspx
57+
ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
58+
5559

5660
class _Win32InputBase(Input):
5761
"""
@@ -74,7 +78,14 @@ class Win32Input(_Win32InputBase):
7478

7579
def __init__(self, stdin: TextIO | None = None) -> None:
7680
super().__init__()
77-
self.console_input_reader = ConsoleInputReader()
81+
self._use_virtual_terminal_input = _is_win_vt100_input_enabled()
82+
83+
self.console_input_reader: Vt100ConsoleInputReader | ConsoleInputReader
84+
85+
if self._use_virtual_terminal_input:
86+
self.console_input_reader = Vt100ConsoleInputReader()
87+
else:
88+
self.console_input_reader = ConsoleInputReader()
7889

7990
def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
8091
"""
@@ -101,7 +112,9 @@ def closed(self) -> bool:
101112
return False
102113

103114
def raw_mode(self) -> ContextManager[None]:
104-
return raw_mode()
115+
return raw_mode(
116+
use_win10_virtual_terminal_input=self._use_virtual_terminal_input
117+
)
105118

106119
def cooked_mode(self) -> ContextManager[None]:
107120
return cooked_mode()
@@ -555,6 +568,102 @@ def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> list[KeyPress]:
555568
return [KeyPress(Keys.WindowsMouseEvent, data)]
556569

557570

571+
class Vt100ConsoleInputReader:
572+
"""
573+
Similar to `ConsoleInputReader`, but for usage when
574+
`ENABLE_VIRTUAL_TERMINAL_INPUT` is enabled. This assumes that Windows sends
575+
us the right vt100 escape sequences and we parse those with our vt100
576+
parser.
577+
578+
(Using this instead of `ConsoleInputReader` results in the "data" attribute
579+
from the `KeyPress` instances to be more correct in edge cases, because
580+
this responds to for instance the terminal being in application cursor keys
581+
mode.)
582+
"""
583+
584+
def __init__(self) -> None:
585+
self._fdcon = None
586+
587+
self._buffer: list[KeyPress] = [] # Buffer to collect the Key objects.
588+
self._vt100_parser = Vt100Parser(
589+
lambda key_press: self._buffer.append(key_press)
590+
)
591+
592+
# When stdin is a tty, use that handle, otherwise, create a handle from
593+
# CONIN$.
594+
self.handle: HANDLE
595+
if sys.stdin.isatty():
596+
self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
597+
else:
598+
self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
599+
self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))
600+
601+
def close(self) -> None:
602+
"Close fdcon."
603+
if self._fdcon is not None:
604+
os.close(self._fdcon)
605+
606+
def read(self) -> Iterable[KeyPress]:
607+
"""
608+
Return a list of `KeyPress` instances. It won't return anything when
609+
there was nothing to read. (This function doesn't block.)
610+
611+
http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
612+
"""
613+
max_count = 2048 # Max events to read at the same time.
614+
615+
read = DWORD(0)
616+
arrtype = INPUT_RECORD * max_count
617+
input_records = arrtype()
618+
619+
# Check whether there is some input to read. `ReadConsoleInputW` would
620+
# block otherwise.
621+
# (Actually, the event loop is responsible to make sure that this
622+
# function is only called when there is something to read, but for some
623+
# reason this happened in the asyncio_win32 loop, and it's better to be
624+
# safe anyway.)
625+
if not wait_for_handles([self.handle], timeout=0):
626+
return []
627+
628+
# Get next batch of input event.
629+
windll.kernel32.ReadConsoleInputW(
630+
self.handle, pointer(input_records), max_count, pointer(read)
631+
)
632+
633+
# First, get all the keys from the input buffer, in order to determine
634+
# whether we should consider this a paste event or not.
635+
for key_data in self._get_keys(read, input_records):
636+
self._vt100_parser.feed(key_data)
637+
638+
# Return result.
639+
result = self._buffer
640+
self._buffer = []
641+
return result
642+
643+
def _get_keys(
644+
self, read: DWORD, input_records: Array[INPUT_RECORD]
645+
) -> Iterator[str]:
646+
"""
647+
Generator that yields `KeyPress` objects from the input records.
648+
"""
649+
for i in range(read.value):
650+
ir = input_records[i]
651+
652+
# Get the right EventType from the EVENT_RECORD.
653+
# (For some reason the Windows console application 'cmder'
654+
# [http://gooseberrycreative.com/cmder/] can return '0' for
655+
# ir.EventType. -- Just ignore that.)
656+
if ir.EventType in EventTypes:
657+
ev = getattr(ir.Event, EventTypes[ir.EventType])
658+
659+
# Process if this is a key event. (We also have mouse, menu and
660+
# focus events.)
661+
if isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown:
662+
u_char = ev.uChar.UnicodeChar
663+
if u_char != "\x00":
664+
yield u_char
665+
666+
558667
class _Win32Handles:
559668
"""
560669
Utility to keep track of which handles are connectod to which callbacks.
@@ -700,8 +809,11 @@ class raw_mode:
700809
`raw_input` method of `.vt100_input`.
701810
"""
702811

703-
def __init__(self, fileno: int | None = None) -> None:
812+
def __init__(
813+
self, fileno: int | None = None, use_win10_virtual_terminal_input: bool = False
814+
) -> None:
704815
self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
816+
self.use_win10_virtual_terminal_input = use_win10_virtual_terminal_input
705817

706818
def __enter__(self) -> None:
707819
# Remember original mode.
@@ -717,12 +829,15 @@ def _patch(self) -> None:
717829
ENABLE_LINE_INPUT = 0x0002
718830
ENABLE_PROCESSED_INPUT = 0x0001
719831

720-
windll.kernel32.SetConsoleMode(
721-
self.handle,
722-
self.original_mode.value
723-
& ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
832+
new_mode = self.original_mode.value & ~(
833+
ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT
724834
)
725835

836+
if self.use_win10_virtual_terminal_input:
837+
new_mode |= ENABLE_VIRTUAL_TERMINAL_INPUT
838+
839+
windll.kernel32.SetConsoleMode(self.handle, new_mode)
840+
726841
def __exit__(self, *a: object) -> None:
727842
# Restore original mode
728843
windll.kernel32.SetConsoleMode(self.handle, self.original_mode)
@@ -747,3 +862,25 @@ def _patch(self) -> None:
747862
self.original_mode.value
748863
| (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
749864
)
865+
866+
867+
def _is_win_vt100_input_enabled() -> bool:
868+
"""
869+
Returns True when we're running Windows and VT100 escape sequences are
870+
supported.
871+
"""
872+
hconsole = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
873+
874+
# Get original console mode.
875+
original_mode = DWORD(0)
876+
windll.kernel32.GetConsoleMode(hconsole, byref(original_mode))
877+
878+
try:
879+
# Try to enable VT100 sequences.
880+
result: int = windll.kernel32.SetConsoleMode(
881+
hconsole, DWORD(ENABLE_VIRTUAL_TERMINAL_INPUT)
882+
)
883+
884+
return result == 1
885+
finally:
886+
windll.kernel32.SetConsoleMode(hconsole, original_mode)

0 commit comments

Comments
 (0)