Skip to content

Commit be2add9

Browse files
authored
fix: event handler leak in the wrapRpc function (#733)
We were adding the close event listener but removing the exit one. This commit fixes this leak + few others bugs in this file and adds unit tests to prevent regressions. Closes: #723
1 parent cb327ee commit be2add9

File tree

3 files changed

+245
-51
lines changed

3 files changed

+245
-51
lines changed

src/rpc/wrap-rpc.ts

Lines changed: 62 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { ChildProcess } from 'child_process';
2-
import * as process from 'process';
2+
3+
import { createControlledPromise } from '../utils/async/controlled-promise';
34

45
import { RpcExitError } from './rpc-error';
56
import type { RpcRemoteMethod, RpcMessage } from './types';
@@ -17,61 +18,71 @@ export function wrapRpc<T extends (...args: any[]) => any>(
1718

1819
const id = uuid();
1920

20-
const resultPromise = new Promise((resolve, reject) => {
21-
const handleMessage = (message: RpcMessage) => {
22-
if (message.id === id) {
23-
if (message.type === 'resolve') {
24-
resolve(message.value);
25-
unsubscribe();
26-
} else if (message.type === 'reject') {
27-
reject(message.error);
28-
unsubscribe();
29-
}
30-
}
31-
};
32-
const handleClose = (code: string | number | null, signal: string | null) => {
33-
reject(
34-
new RpcExitError(
35-
code
36-
? `Process ${process.pid} exited with code "${code}" [${signal}]`
37-
: `Process ${process.pid} exited [${signal}].`,
38-
code,
39-
signal
40-
)
41-
);
42-
unsubscribe();
43-
};
21+
// create promises
22+
const {
23+
promise: resultPromise,
24+
resolve: resolveResult,
25+
reject: rejectResult,
26+
} = createControlledPromise<T>();
27+
const {
28+
promise: sendPromise,
29+
resolve: resolveSend,
30+
reject: rejectSend,
31+
} = createControlledPromise<void>();
4432

45-
const subscribe = () => {
46-
childProcess.on('message', handleMessage);
47-
childProcess.on('close', handleClose);
48-
};
49-
const unsubscribe = () => {
50-
childProcess.off('message', handleMessage);
51-
childProcess.off('exit', handleClose);
52-
};
33+
const handleMessage = (message: RpcMessage) => {
34+
if (message?.id === id) {
35+
if (message.type === 'resolve') {
36+
// assume the contract is respected
37+
resolveResult(message.value as T);
38+
removeHandlers();
39+
} else if (message.type === 'reject') {
40+
rejectResult(message.error);
41+
removeHandlers();
42+
}
43+
}
44+
};
45+
const handleClose = (code: string | number | null, signal: string | null) => {
46+
rejectResult(
47+
new RpcExitError(
48+
code
49+
? `Process ${childProcess.pid} exited with code ${code}` +
50+
(signal ? ` [${signal}]` : '')
51+
: `Process ${childProcess.pid} exited` + (signal ? ` [${signal}]` : ''),
52+
code,
53+
signal
54+
)
55+
);
56+
removeHandlers();
57+
};
5358

54-
subscribe();
55-
});
59+
// to prevent event handler leaks
60+
const removeHandlers = () => {
61+
childProcess.off('message', handleMessage);
62+
childProcess.off('close', handleClose);
63+
};
5664

57-
await new Promise<void>((resolve, reject) => {
58-
childProcess.send(
59-
{
60-
type: 'call',
61-
id,
62-
args,
63-
},
64-
(error) => {
65-
if (error) {
66-
reject(error);
67-
} else {
68-
resolve(undefined);
69-
}
65+
// add event listeners
66+
childProcess.on('message', handleMessage);
67+
childProcess.on('close', handleClose);
68+
// send call message
69+
childProcess.send(
70+
{
71+
type: 'call',
72+
id,
73+
args,
74+
},
75+
(error) => {
76+
if (error) {
77+
rejectSend(error);
78+
removeHandlers();
79+
} else {
80+
resolveSend(undefined);
7081
}
71-
);
72-
});
82+
}
83+
);
7384

74-
return resultPromise;
85+
return sendPromise.then(() => resultPromise);
7586
}) as RpcRemoteMethod<T>;
7687
}
7788

src/utils/async/controlled-promise.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
function createControlledPromise<T = unknown>() {
2+
let resolve: (value: T) => void = () => undefined;
3+
let reject: (error: unknown) => void = () => undefined;
4+
const promise = new Promise<T>((aResolve, aReject) => {
5+
resolve = aResolve;
6+
reject = aReject;
7+
});
8+
9+
return {
10+
promise,
11+
resolve,
12+
reject,
13+
};
14+
}
15+
16+
export { createControlledPromise };

test/unit/rpc/wrap-rpc.spec.ts

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import type { ChildProcess } from 'child_process';
2+
3+
import { RpcExitError, wrapRpc } from '../../../src/rpc';
4+
5+
describe('wrapRpc', () => {
6+
let childProcessMock: ChildProcess;
7+
let eventHandlers: Record<string, Array<(...args: unknown[]) => void>>;
8+
let messageIds: string[];
9+
10+
beforeEach(() => {
11+
eventHandlers = {};
12+
messageIds = [];
13+
childProcessMock = {
14+
connected: true,
15+
pid: 1234,
16+
send: jest.fn((message, callback) => {
17+
messageIds.push(message?.id);
18+
callback();
19+
}),
20+
on: jest.fn((name, handlerToAdd) => {
21+
if (!eventHandlers[name]) {
22+
eventHandlers[name] = [];
23+
}
24+
eventHandlers[name].push(handlerToAdd);
25+
}),
26+
off: jest.fn((name, handlerToRemove) => {
27+
if (!eventHandlers[name]) {
28+
return;
29+
}
30+
eventHandlers[name] = eventHandlers[name].filter((handler) => handler !== handlerToRemove);
31+
}),
32+
// we don't have to implement all methods - it would take a lot of code to do so
33+
} as unknown as ChildProcess;
34+
});
35+
36+
it('returns new functions without adding event handlers', () => {
37+
const wrapped = wrapRpc(childProcessMock);
38+
expect(wrapped).toBeInstanceOf(Function);
39+
expect(eventHandlers).toEqual({});
40+
});
41+
42+
it("throws an error if child process doesn't have IPC channels", async () => {
43+
childProcessMock.send = undefined;
44+
const wrapped = wrapRpc(childProcessMock);
45+
await expect(wrapped()).rejects.toEqual(new Error("Process 1234 doesn't have IPC channels"));
46+
expect(eventHandlers).toEqual({});
47+
});
48+
49+
it("throws an error if child process doesn't have open IPC channels", async () => {
50+
// @ts-expect-error We're using mock here :)
51+
childProcessMock.connected = false;
52+
const wrapped = wrapRpc(childProcessMock);
53+
await expect(wrapped()).rejects.toEqual(
54+
new Error("Process 1234 doesn't have open IPC channels")
55+
);
56+
expect(eventHandlers).toEqual({});
57+
});
58+
59+
it('sends a call message', async () => {
60+
const wrapped = wrapRpc(childProcessMock);
61+
wrapped('foo', 1234);
62+
expect(childProcessMock.send).toHaveBeenCalledWith(
63+
{
64+
type: 'call',
65+
id: expect.any(String),
66+
args: ['foo', 1234],
67+
},
68+
expect.any(Function)
69+
);
70+
expect(eventHandlers).toEqual({
71+
message: [expect.any(Function)],
72+
close: [expect.any(Function)],
73+
});
74+
});
75+
76+
it('ignores invalid message', async () => {
77+
const wrapped = wrapRpc<() => void>(childProcessMock);
78+
wrapped();
79+
expect(messageIds).toEqual([expect.any(String)]);
80+
expect(eventHandlers['message']).toEqual([expect.any(Function)]);
81+
const triggerMessage = eventHandlers['message'][0];
82+
83+
triggerMessage(undefined);
84+
triggerMessage('test');
85+
triggerMessage({});
86+
triggerMessage({ id: 'test' });
87+
88+
expect(eventHandlers).toEqual({
89+
message: [expect.any(Function)],
90+
close: [expect.any(Function)],
91+
});
92+
});
93+
94+
it('resolves on valid resolve message', async () => {
95+
const wrapped = wrapRpc<() => void>(childProcessMock);
96+
const promise = wrapped();
97+
expect(messageIds).toEqual([expect.any(String)]);
98+
expect(eventHandlers['message']).toEqual([expect.any(Function)]);
99+
const triggerMessage = eventHandlers['message'][0];
100+
const id = messageIds[0];
101+
102+
triggerMessage({
103+
id,
104+
type: 'resolve',
105+
value: 41,
106+
});
107+
108+
expect(promise).resolves.toEqual(41);
109+
expect(eventHandlers).toEqual({
110+
message: [],
111+
close: [],
112+
});
113+
});
114+
115+
it('rejects on valid reject message', async () => {
116+
const wrapped = wrapRpc<() => void>(childProcessMock);
117+
const promise = wrapped();
118+
expect(messageIds).toEqual([expect.any(String)]);
119+
expect(eventHandlers['message']).toEqual([expect.any(Function)]);
120+
const triggerMessage = eventHandlers['message'][0];
121+
const id = messageIds[0];
122+
123+
triggerMessage({
124+
id,
125+
type: 'reject',
126+
error: 'sad error',
127+
});
128+
129+
expect(promise).rejects.toEqual('sad error');
130+
expect(eventHandlers).toEqual({
131+
message: [],
132+
close: [],
133+
});
134+
});
135+
136+
it('rejects on send error', async () => {
137+
(childProcessMock.send as jest.Mock).mockImplementation((message, callback) =>
138+
callback(new Error('cannot send'))
139+
);
140+
const wrapped = wrapRpc<() => void>(childProcessMock);
141+
142+
expect(wrapped()).rejects.toEqual(new Error('cannot send'));
143+
expect(eventHandlers).toEqual({
144+
message: [],
145+
close: [],
146+
});
147+
});
148+
149+
it.each([
150+
{ code: 100, signal: 'SIGINT', message: 'Process 1234 exited with code 100 [SIGINT]' },
151+
{ code: -1, signal: undefined, message: 'Process 1234 exited with code -1' },
152+
{ code: undefined, signal: undefined, message: 'Process 1234 exited' },
153+
])('rejects on process close with %p', async ({ code, signal, message }) => {
154+
const wrapped = wrapRpc<() => void>(childProcessMock);
155+
const promise = wrapped();
156+
expect(eventHandlers['close']).toEqual([expect.any(Function)]);
157+
const triggerClose = eventHandlers['close'][0];
158+
159+
triggerClose(code, signal);
160+
161+
expect(promise).rejects.toEqual(new RpcExitError(message, code, signal));
162+
expect(eventHandlers).toEqual({
163+
message: [],
164+
close: [],
165+
});
166+
});
167+
});

0 commit comments

Comments
 (0)