Skip to content

Commit ce06dd6

Browse files
committed
Rewrite interactive client with synchronous API.
Fix #1312.
1 parent 25a5252 commit ce06dd6

File tree

1 file changed

+34
-105
lines changed

1 file changed

+34
-105
lines changed

src/websockets/__main__.py

Lines changed: 34 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
from __future__ import annotations
22

33
import argparse
4-
import asyncio
54
import os
65
import signal
76
import sys
87
import threading
9-
from typing import Any, Set
108

11-
from .exceptions import ConnectionClosed
12-
from .frames import Close
13-
from .legacy.client import connect
9+
10+
try:
11+
import readline # noqa
12+
except ImportError: # Windows has no `readline` normally
13+
pass
14+
15+
from .sync.client import ClientConnection, connect
1416
from .version import version as websockets_version
1517

1618

@@ -46,21 +48,6 @@ def win_enable_vt100() -> None:
4648
raise RuntimeError("unable to set console mode")
4749

4850

49-
def exit_from_event_loop_thread(
50-
loop: asyncio.AbstractEventLoop,
51-
stop: asyncio.Future[None],
52-
) -> None:
53-
loop.stop()
54-
if not stop.done():
55-
# When exiting the thread that runs the event loop, raise
56-
# KeyboardInterrupt in the main thread to exit the program.
57-
if sys.platform == "win32":
58-
ctrl_c = signal.CTRL_C_EVENT
59-
else:
60-
ctrl_c = signal.SIGINT
61-
os.kill(os.getpid(), ctrl_c)
62-
63-
6451
def print_during_input(string: str) -> None:
6552
sys.stdout.write(
6653
# Save cursor position
@@ -93,63 +80,20 @@ def print_over_input(string: str) -> None:
9380
sys.stdout.flush()
9481

9582

96-
async def run_client(
97-
uri: str,
98-
loop: asyncio.AbstractEventLoop,
99-
inputs: asyncio.Queue[str],
100-
stop: asyncio.Future[None],
101-
) -> None:
102-
try:
103-
websocket = await connect(uri)
104-
except Exception as exc:
105-
print_over_input(f"Failed to connect to {uri}: {exc}.")
106-
exit_from_event_loop_thread(loop, stop)
107-
return
108-
else:
109-
print_during_input(f"Connected to {uri}.")
110-
111-
try:
112-
while True:
113-
incoming: asyncio.Future[Any] = asyncio.create_task(websocket.recv())
114-
outgoing: asyncio.Future[Any] = asyncio.create_task(inputs.get())
115-
done: Set[asyncio.Future[Any]]
116-
pending: Set[asyncio.Future[Any]]
117-
done, pending = await asyncio.wait(
118-
[incoming, outgoing, stop], return_when=asyncio.FIRST_COMPLETED
119-
)
120-
121-
# Cancel pending tasks to avoid leaking them.
122-
if incoming in pending:
123-
incoming.cancel()
124-
if outgoing in pending:
125-
outgoing.cancel()
126-
127-
if incoming in done:
128-
try:
129-
message = incoming.result()
130-
except ConnectionClosed:
131-
break
132-
else:
133-
if isinstance(message, str):
134-
print_during_input("< " + message)
135-
else:
136-
print_during_input("< (binary) " + message.hex())
137-
138-
if outgoing in done:
139-
message = outgoing.result()
140-
await websocket.send(message)
141-
142-
if stop in done:
143-
break
144-
145-
finally:
146-
await websocket.close()
147-
assert websocket.close_code is not None and websocket.close_reason is not None
148-
close_status = Close(websocket.close_code, websocket.close_reason)
149-
150-
print_over_input(f"Connection closed: {close_status}.")
151-
152-
exit_from_event_loop_thread(loop, stop)
83+
def print_incoming_messages(websocket: ClientConnection, stop: threading.Event) -> None:
84+
for message in websocket:
85+
if isinstance(message, str):
86+
print_during_input("< " + message)
87+
else:
88+
print_during_input("< (binary) " + message.hex())
89+
if not stop.is_set():
90+
# When the server closes the connection, raise KeyboardInterrupt
91+
# in the main thread to exit the program.
92+
if sys.platform == "win32":
93+
ctrl_c = signal.CTRL_C_EVENT
94+
else:
95+
ctrl_c = signal.SIGINT
96+
os.kill(os.getpid(), ctrl_c)
15397

15498

15599
def main() -> None:
@@ -184,47 +128,32 @@ def main() -> None:
184128
sys.stderr.flush()
185129

186130
try:
187-
import readline # noqa
188-
except ImportError: # Windows has no `readline` normally
189-
pass
190-
191-
# Create an event loop that will run in a background thread.
192-
loop = asyncio.new_event_loop()
193-
194-
# Due to zealous removal of the loop parameter in the Queue constructor,
195-
# we need a factory coroutine to run in the freshly created event loop.
196-
async def queue_factory() -> asyncio.Queue[str]:
197-
return asyncio.Queue()
198-
199-
# Create a queue of user inputs. There's no need to limit its size.
200-
inputs: asyncio.Queue[str] = loop.run_until_complete(queue_factory())
201-
202-
# Create a stop condition when receiving SIGINT or SIGTERM.
203-
stop: asyncio.Future[None] = loop.create_future()
131+
websocket = connect(args.uri)
132+
except Exception as exc:
133+
print(f"Failed to connect to {args.uri}: {exc}.")
134+
sys.exit(1)
135+
else:
136+
print(f"Connected to {args.uri}.")
204137

205-
# Schedule the task that will manage the connection.
206-
loop.create_task(run_client(args.uri, loop, inputs, stop))
138+
stop = threading.Event()
207139

208-
# Start the event loop in a background thread.
209-
thread = threading.Thread(target=loop.run_forever)
140+
# Start the thread that reads messages from the connection.
141+
thread = threading.Thread(target=print_incoming_messages, args=(websocket, stop))
210142
thread.start()
211143

212144
# Read from stdin in the main thread in order to receive signals.
213145
try:
214146
while True:
215147
# Since there's no size limit, put_nowait is identical to put.
216148
message = input("> ")
217-
loop.call_soon_threadsafe(inputs.put_nowait, message)
149+
websocket.send(message)
218150
except (KeyboardInterrupt, EOFError): # ^C, ^D
219-
loop.call_soon_threadsafe(stop.set_result, None)
151+
stop.set()
152+
websocket.close()
153+
print_over_input("Connection closed.")
220154

221-
# Wait for the event loop to terminate.
222155
thread.join()
223156

224-
# For reasons unclear, even though the loop is closed in the thread,
225-
# it still thinks it's running here.
226-
loop.close()
227-
228157

229158
if __name__ == "__main__":
230159
main()

0 commit comments

Comments
 (0)