Skip to content

Commit 79fc062

Browse files
committed
Make proxies decide how to handle disconnects
1 parent a4cca6b commit 79fc062

File tree

8 files changed

+77
-12
lines changed

8 files changed

+77
-12
lines changed

packages/protocol/src/browser/client.ts

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,8 @@ export class Client {
134134
message.setResponse(stringify(error));
135135
this.failEmitter.emit(message);
136136

137-
this.eventEmitter.emit({ event: "exit", args: [1] });
138-
this.eventEmitter.emit({ event: "close", args: [] });
139-
try {
140-
this.eventEmitter.emit({ event: "error", args: [error] });
141-
} catch (error) {
142-
// If nothing is listening, EventEmitter will throw an error.
143-
}
144-
this.eventEmitter.emit({ event: "done", args: [true] });
137+
this.eventEmitter.emit({ event: "disconnected", args: [error] });
138+
this.eventEmitter.emit({ event: "done", args: [] });
145139
};
146140

147141
connection.onDown(() => handleDisconnect());
@@ -450,12 +444,12 @@ export class Client {
450444
callbacks: new Map(),
451445
});
452446

453-
instance.onDone((disconnected: boolean) => {
447+
instance.onDone(() => {
454448
const log = (): void => {
455449
logger.trace(() => [
456450
typeof proxyId === "number" ? "disposed proxy" : "disposed proxy callbacks",
457451
field("proxyId", proxyId),
458-
field("disconnected", disconnected),
452+
field("disconnected", this.disconnected),
459453
field("callbacks", Array.from(this.proxies.values()).reduce((count, proxy) => count + proxy.callbacks.size, 0)),
460454
field("success listeners", this.successEmitter.counts),
461455
field("fail listeners", this.failEmitter.counts),
@@ -471,7 +465,7 @@ export class Client {
471465
this.eventEmitter.dispose(proxyId);
472466
log();
473467
};
474-
if (!disconnected) {
468+
if (!this.disconnected) {
475469
instance.dispose().then(dispose).catch(dispose);
476470
} else {
477471
dispose();

packages/protocol/src/browser/modules/child_process.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,16 @@ export class ChildProcess extends ClientProxy<ChildProcessProxy> implements cp.C
8787

8888
return true; // Always true since we can't get this synchronously.
8989
}
90+
91+
protected handleDisconnect(error: Error): void {
92+
try {
93+
this.emit("error", error);
94+
} catch (error) {
95+
// If nothing is listening, EventEmitter will throw an error.
96+
}
97+
this.emit("exit", 1);
98+
this.emit("close");
99+
}
90100
}
91101

92102
export class ChildProcessModule {

packages/protocol/src/browser/modules/fs.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ class Watcher extends ClientProxy<WatcherProxy> implements fs.FSWatcher {
4141
public close(): void {
4242
this.proxy.close();
4343
}
44+
45+
protected handleDisconnect(error: Error): void {
46+
try {
47+
this.emit("error", error);
48+
} catch (error) {
49+
// If nothing is listening, EventEmitter will throw an error.
50+
}
51+
this.emit("close");
52+
}
4453
}
4554

4655
class WriteStream extends Writable<WriteStreamProxy> implements fs.WriteStream {

packages/protocol/src/browser/modules/net.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,20 @@ export class Socket extends Duplex<NetSocketProxy> implements net.Socket {
126126
}
127127

128128
export class Server extends ClientProxy<NetServerProxy> implements net.Server {
129+
private socketId = 0;
129130
private readonly sockets = new Map<number, net.Socket>();
130131
private _listening: boolean = false;
131132

132133
public constructor(proxyPromise: Promise<NetServerProxy> | NetServerProxy) {
133134
super(proxyPromise);
134135

135136
this.proxy.onConnection((socketProxy) => {
136-
this.emit("connection", new Socket(socketProxy));
137+
const socket = new Socket(socketProxy);
138+
const socketId = this.socketId++;
139+
this.sockets.set(socketId, socket);
140+
socket.on("error", () => this.sockets.delete(socketId))
141+
socket.on("close", () => this.sockets.delete(socketId))
142+
this.emit("connection", socket);
137143
});
138144

139145
this.on("listening", () => this._listening = true);
@@ -200,6 +206,15 @@ export class Server extends ClientProxy<NetServerProxy> implements net.Server {
200206
public getConnections(cb: (error: Error | null, count: number) => void): void {
201207
cb(null, this.sockets.size);
202208
}
209+
210+
protected handleDisconnect(error: Error): void {
211+
try {
212+
this.emit("error", error);
213+
} catch (error) {
214+
// If nothing is listening, EventEmitter will throw an error.
215+
}
216+
this.emit("close");
217+
}
203218
}
204219

205220
type NodeNet = typeof net;

packages/protocol/src/browser/modules/node-pty.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ export class NodePtyProcess extends ClientProxy<NodePtyProcessProxy> implements
3232
public kill(signal?: string): void {
3333
this.proxy.kill(signal);
3434
}
35+
36+
protected handleDisconnect(): void {
37+
this._process += " (disconnected)";
38+
this.emit("data", "\r\n\nLost connection...");
39+
}
3540
}
3641

3742
type NodePty = typeof pty;

packages/protocol/src/browser/modules/spdlog.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ class RotatingLogger extends ClientProxy<RotatingLoggerProxy> implements spdlog.
1313
public async clearFormatters (): Promise<void> { this.proxy.clearFormatters(); }
1414
public async flush (): Promise<void> { this.proxy.flush(); }
1515
public async drop (): Promise<void> { this.proxy.drop(); }
16+
17+
protected handleDisconnect(): void {
18+
// TODO: reconnect.
19+
}
1620
}
1721

1822
export class SpdlogModule {

packages/protocol/src/browser/modules/stream.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ export class Writable<T extends WritableProxy = WritableProxy> extends ClientPro
8181
}
8282
});
8383
}
84+
85+
protected handleDisconnect(error: Error): void {
86+
try {
87+
this.emit("error", error);
88+
} catch (error) {
89+
// If nothing is listening, EventEmitter will throw an error.
90+
}
91+
this.emit("close");
92+
this.emit("finish");
93+
}
8494
}
8595

8696
export class Readable<T extends IReadableProxy = IReadableProxy> extends ClientProxy<T> implements stream.Readable {
@@ -154,6 +164,16 @@ export class Readable<T extends IReadableProxy = IReadableProxy> extends ClientP
154164

155165
return this;
156166
}
167+
168+
protected handleDisconnect(error: Error): void {
169+
try {
170+
this.emit("error", error);
171+
} catch (error) {
172+
// If nothing is listening, EventEmitter will throw an error.
173+
}
174+
this.emit("close");
175+
this.emit("end");
176+
}
157177
}
158178

159179
export class Duplex<T extends DuplexProxy = DuplexProxy> extends Writable<T> implements stream.Duplex, stream.Readable {
@@ -230,4 +250,9 @@ export class Duplex<T extends DuplexProxy = DuplexProxy> extends Writable<T> imp
230250

231251
return this;
232252
}
253+
254+
protected handleDisconnect(error: Error): void {
255+
super.handleDisconnect(error);
256+
this.emit("end");
257+
}
233258
}

packages/protocol/src/common/proxy.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,11 @@ export abstract class ClientProxy<T extends ServerProxy> extends EventEmitter {
4242
this.proxy.onEvent((event, ...args): void => {
4343
this.emit(event, ...args);
4444
});
45+
this.on("disconnected", (error) => this.handleDisconnect(error));
4546
}
4647
}
48+
49+
protected abstract handleDisconnect(error: Error): void;
4750
}
4851

4952
/**

0 commit comments

Comments
 (0)