Skip to content

Handle disconnects #363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions packages/protocol/src/browser/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,21 @@ export class Client {
message.setResponse(stringify(error));
this.failEmitter.emit(message);

this.eventEmitter.emit({ event: "exit", args: [1] });
this.eventEmitter.emit({ event: "close", args: [] });
try {
this.eventEmitter.emit({ event: "error", args: [error] });
} catch (error) {
// If nothing is listening, EventEmitter will throw an error.
}
this.eventEmitter.emit({ event: "done", args: [true] });
this.eventEmitter.emit({ event: "disconnected", args: [error] });
this.eventEmitter.emit({ event: "done", args: [] });
};

connection.onDown(() => handleDisconnect());
connection.onClose(() => {
clearTimeout(this.pingTimeout as any);
this.pingTimeout = undefined;
handleDisconnect();
this.proxies.clear();
this.successEmitter.dispose();
this.failEmitter.dispose();
this.eventEmitter.dispose();
this.initDataEmitter.dispose();
this.sharedProcessActiveEmitter.dispose();
});
connection.onUp(() => this.disconnected = false);

Expand All @@ -174,8 +174,17 @@ export class Client {
* Make a remote call for a proxy's method using proto.
*/
private remoteCall(proxyId: number | Module, method: string, args: any[]): Promise<any> {
if (this.disconnected) {
return Promise.reject(new Error("disconnected"));
if (this.disconnected && typeof proxyId === "number") {
// Can assume killing or closing works because a disconnected proxy
// is disposed on the server's side.
switch (method) {
case "close":
case "kill":
return Promise.resolve();
}
return Promise.reject(
new Error(`Unable to call "${method}" on proxy ${proxyId}: disconnected`),
);
}

const message = new MethodMessage();
Expand Down Expand Up @@ -223,7 +232,7 @@ export class Client {

// The server will send back a fail or success message when the method
// has completed, so we listen for that based on the message's unique ID.
const promise = new Promise((resolve, reject): void => {
const promise = new Promise((resolve, reject): void => {
const dispose = (): void => {
d1.dispose();
d2.dispose();
Expand All @@ -237,7 +246,7 @@ export class Client {

const d1 = this.successEmitter.event(id, (message) => {
dispose();
resolve(this.parse(message.getResponse()));
resolve(this.parse(message.getResponse(), promise));
});

const d2 = this.failEmitter.event(id, (message) => {
Expand Down Expand Up @@ -450,12 +459,12 @@ export class Client {
callbacks: new Map(),
});

instance.onDone((disconnected: boolean) => {
instance.onDone(() => {
const log = (): void => {
logger.trace(() => [
typeof proxyId === "number" ? "disposed proxy" : "disposed proxy callbacks",
field("proxyId", proxyId),
field("disconnected", disconnected),
field("disconnected", this.disconnected),
field("callbacks", Array.from(this.proxies.values()).reduce((count, proxy) => count + proxy.callbacks.size, 0)),
field("success listeners", this.successEmitter.counts),
field("fail listeners", this.failEmitter.counts),
Expand All @@ -471,7 +480,7 @@ export class Client {
this.eventEmitter.dispose(proxyId);
log();
};
if (!disconnected) {
if (!this.disconnected) {
instance.dispose().then(dispose).catch(dispose);
} else {
dispose();
Expand Down
5 changes: 5 additions & 0 deletions packages/protocol/src/browser/modules/child_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ export class ChildProcess extends ClientProxy<ChildProcessProxy> implements cp.C

return true; // Always true since we can't get this synchronously.
}

protected handleDisconnect(): void {
this.emit("exit", 1);
this.emit("close");
}
}

export class ChildProcessModule {
Expand Down
4 changes: 4 additions & 0 deletions packages/protocol/src/browser/modules/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class Watcher extends ClientProxy<WatcherProxy> implements fs.FSWatcher {
public close(): void {
this.proxy.close();
}

protected handleDisconnect(): void {
this.emit("close");
}
}

class WriteStream extends Writable<WriteStreamProxy> implements fs.WriteStream {
Expand Down
12 changes: 11 additions & 1 deletion packages/protocol/src/browser/modules/net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,20 @@ export class Socket extends Duplex<NetSocketProxy> implements net.Socket {
}

export class Server extends ClientProxy<NetServerProxy> implements net.Server {
private socketId = 0;
private readonly sockets = new Map<number, net.Socket>();
private _listening: boolean = false;

public constructor(proxyPromise: Promise<NetServerProxy> | NetServerProxy) {
super(proxyPromise);

this.proxy.onConnection((socketProxy) => {
this.emit("connection", new Socket(socketProxy));
const socket = new Socket(socketProxy);
const socketId = this.socketId++;
this.sockets.set(socketId, socket);
socket.on("error", () => this.sockets.delete(socketId))
socket.on("close", () => this.sockets.delete(socketId))
this.emit("connection", socket);
});

this.on("listening", () => this._listening = true);
Expand Down Expand Up @@ -200,6 +206,10 @@ export class Server extends ClientProxy<NetServerProxy> implements net.Server {
public getConnections(cb: (error: Error | null, count: number) => void): void {
cb(null, this.sockets.size);
}

protected handleDisconnect(): void {
this.emit("close");
}
}

type NodeNet = typeof net;
Expand Down
23 changes: 19 additions & 4 deletions packages/protocol/src/browser/modules/node-pty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,20 @@ export class NodePtyProcess extends ClientProxy<NodePtyProcessProxy> implements
private _pid = -1;
private _process = "";

public constructor(proxyPromise: Promise<NodePtyProcessProxy>) {
super(proxyPromise);
public constructor(
private readonly moduleProxy: NodePtyModuleProxy,
private readonly file: string,
private readonly args: string[] | string,
private readonly options: pty.IPtyForkOptions,
) {
super(moduleProxy.spawn(file, args, options));
this.on("process", (process) => this._process = process);
}

protected initialize(proxyPromise: Promise<NodePtyProcessProxy>) {
super.initialize(proxyPromise);
this.proxy.getPid().then((pid) => this._pid = pid);
this.proxy.getProcess().then((process) => this._process = process);
this.on("process", (process) => this._process = process);
}

public get pid(): number {
Expand All @@ -32,6 +41,12 @@ export class NodePtyProcess extends ClientProxy<NodePtyProcessProxy> implements
public kill(signal?: string): void {
this.proxy.kill(signal);
}

protected handleDisconnect(): void {
this._process += " (disconnected)";
this.emit("data", "\r\n\nLost connection...\r\n\n");
this.initialize(this.moduleProxy.spawn(this.file, this.args, this.options));
}
}

type NodePty = typeof pty;
Expand All @@ -40,6 +55,6 @@ export class NodePtyModule implements NodePty {
public constructor(private readonly proxy: NodePtyModuleProxy) {}

public spawn = (file: string, args: string[] | string, options: pty.IPtyForkOptions): pty.IPty => {
return new NodePtyProcess(this.proxy.spawn(file, args, options));
return new NodePtyProcess(this.proxy, file, args, options);
}
}
16 changes: 15 additions & 1 deletion packages/protocol/src/browser/modules/spdlog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ import { ClientProxy } from "../../common/proxy";
import { RotatingLoggerProxy, SpdlogModuleProxy } from "../../node/modules/spdlog";

class RotatingLogger extends ClientProxy<RotatingLoggerProxy> implements spdlog.RotatingLogger {
public constructor(
private readonly moduleProxy: SpdlogModuleProxy,
private readonly name: string,
private readonly filename: string,
private readonly filesize: number,
private readonly filecount: number,
) {
super(moduleProxy.createLogger(name, filename, filesize, filecount));
}

public async trace (message: string): Promise<void> { this.proxy.trace(message); }
public async debug (message: string): Promise<void> { this.proxy.debug(message); }
public async info (message: string): Promise<void> { this.proxy.info(message); }
Expand All @@ -13,6 +23,10 @@ class RotatingLogger extends ClientProxy<RotatingLoggerProxy> implements spdlog.
public async clearFormatters (): Promise<void> { this.proxy.clearFormatters(); }
public async flush (): Promise<void> { this.proxy.flush(); }
public async drop (): Promise<void> { this.proxy.drop(); }

protected handleDisconnect(): void {
this.initialize(this.moduleProxy.createLogger(this.name, this.filename, this.filesize, this.filecount));
}
}

export class SpdlogModule {
Expand All @@ -21,7 +35,7 @@ export class SpdlogModule {
public constructor(private readonly proxy: SpdlogModuleProxy) {
this.RotatingLogger = class extends RotatingLogger {
public constructor(name: string, filename: string, filesize: number, filecount: number) {
super(proxy.createLogger(name, filename, filesize, filecount));
super(proxy, name, filename, filesize, filecount);
}
};
}
Expand Down
15 changes: 15 additions & 0 deletions packages/protocol/src/browser/modules/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ export class Writable<T extends WritableProxy = WritableProxy> extends ClientPro
}
});
}

protected handleDisconnect(): void {
this.emit("close");
this.emit("finish");
}
}

export class Readable<T extends IReadableProxy = IReadableProxy> extends ClientProxy<T> implements stream.Readable {
Expand Down Expand Up @@ -154,6 +159,11 @@ export class Readable<T extends IReadableProxy = IReadableProxy> extends ClientP

return this;
}

protected handleDisconnect(): void {
this.emit("close");
this.emit("end");
}
}

export class Duplex<T extends DuplexProxy = DuplexProxy> extends Writable<T> implements stream.Duplex, stream.Readable {
Expand Down Expand Up @@ -230,4 +240,9 @@ export class Duplex<T extends DuplexProxy = DuplexProxy> extends Writable<T> imp

return this;
}

protected handleDisconnect(): void {
super.handleDisconnect();
this.emit("end");
}
}
35 changes: 31 additions & 4 deletions packages/protocol/src/common/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,48 @@ const unpromisify = <T extends ServerProxy>(proxyPromise: Promise<T>): T => {
* need a bunch of `then` calls everywhere.
*/
export abstract class ClientProxy<T extends ServerProxy> extends EventEmitter {
protected readonly proxy: T;
private _proxy: T | undefined;

/**
* You can specify not to bind events in order to avoid emitting twice for
* duplex streams.
*/
public constructor(proxyPromise: Promise<T> | T, bindEvents: boolean = true) {
public constructor(
proxyPromise: Promise<T> | T,
private readonly bindEvents: boolean = true,
) {
super();
this.proxy = isPromise(proxyPromise) ? unpromisify(proxyPromise) : proxyPromise;
if (bindEvents) {
this.initialize(proxyPromise);
if (this.bindEvents) {
this.on("disconnected", (error) => {
try {
this.emit("error", error);
} catch (error) {
// If nothing is listening, EventEmitter will throw an error.
}
this.handleDisconnect();
});
}
}

protected get proxy(): T {
if (!this._proxy) {
throw new Error("not initialized");
}

return this._proxy;
}

protected initialize(proxyPromise: Promise<T> | T): void {
this._proxy = isPromise(proxyPromise) ? unpromisify(proxyPromise) : proxyPromise;
if (this.bindEvents) {
this.proxy.onEvent((event, ...args): void => {
this.emit(event, ...args);
});
}
}

protected abstract handleDisconnect(): void;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/protocol/src/node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export class Server {
try {
const proxy = this.getProxy(proxyId);
if (typeof proxy.instance[method] !== "function") {
throw new Error(`"${method}" is not a function`);
throw new Error(`"${method}" is not a function on proxy ${proxyId}`);
}

response = proxy.instance[method](...args);
Expand Down
2 changes: 2 additions & 0 deletions packages/vscode/src/workbench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import { ServiceCollection } from "vs/platform/instantiation/common/serviceColle
import { URI } from "vs/base/common/uri";

export class Workbench {
public readonly retry = client.retry;

private readonly windowId = parseInt(new Date().toISOString().replace(/[-:.TZ]/g, ""), 10);
private _serviceCollection: ServiceCollection | undefined;
private _clipboardContextKey: RawContextKey<boolean> | undefined;
Expand Down
Loading