Skip to content

fix(fetch): properly release fetch during long-lived stream handling #13967

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 45 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
ed91aaf
fix(fetch): ensure proper cancellation of child streams when parent s…
Lei-k Oct 12, 2024
f878695
style(fetch): fix formatting
Lei-k Oct 12, 2024
d0282f4
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 14, 2024
2c6d34e
fix: resolve multiple ESLint issues
Lei-k Oct 14, 2024
4382ce0
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 14, 2024
c7c943f
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 15, 2024
643a342
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 15, 2024
f3aaa5d
Immediate stream cancellation after timeout in `_tryGetResponseText`
Lei-k Oct 15, 2024
7f92ea3
Merge branch 'fix/fetch-not-release' of https://github.com/Lei-k/sent…
Lei-k Oct 15, 2024
9b83669
fix conflicts
Lei-k Oct 15, 2024
1d29564
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 16, 2024
80aa60d
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 16, 2024
0cf8645
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 18, 2024
6d66814
fix file formatting
Lei-k Oct 18, 2024
0d10106
Update test cases to handle new logic in fetchUtils
Lei-k Oct 18, 2024
536cc02
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 18, 2024
78ac956
feat: define whatwg's stream types
Lei-k Oct 19, 2024
7fd0ffe
fix type error for tests
Lei-k Oct 19, 2024
78b3ed3
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 23, 2024
8c99aee
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 27, 2024
5ec50f6
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 29, 2024
3923ecb
Merge branch 'getsentry:develop' into fix/fetch-not-release
Lei-k Oct 29, 2024
f13e8c2
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 30, 2024
4be816e
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 30, 2024
207ee69
Merge branch 'develop' into fix/fetch-not-release
Lei-k Oct 31, 2024
94c0e2e
Merge branch 'develop' into fix/fetch-not-release
Lei-k Nov 2, 2024
3ba1c20
Merge branch 'develop' into fix/fetch-not-release
Lei-k Nov 4, 2024
189848d
Merge branch 'develop' into fix/fetch-not-release
Lei-k Nov 8, 2024
8139565
Merge branch 'develop' into fix/fetch-not-release
Lei-k Nov 15, 2024
8780cda
Merge branch 'develop' into fix/fetch-not-release
Lei-k Nov 18, 2024
9ce2d8e
ref: Resolve merge conflict between develop and fix/fetch-not-release
Lei-k Dec 17, 2024
5c6ce1d
ref: fix typo
Lei-k Dec 17, 2024
2ed739d
ref: prettify file format
Lei-k Dec 17, 2024
02c2448
ref: fix file format
Lei-k Dec 17, 2024
b0b096a
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 17, 2024
8181799
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 17, 2024
dc69ad3
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 17, 2024
dd4505e
chore: resolve conflict from PR #14745
Lei-k Dec 18, 2024
e506037
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 18, 2024
423bd67
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 19, 2024
7f0c21b
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 19, 2024
78d4453
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 23, 2024
8137807
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 23, 2024
617fcc3
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 25, 2024
09a0fda
Merge branch 'develop' into fix/fetch-not-release
Lei-k Dec 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@
"deno.enablePaths": ["packages/deno/test"],
"editor.defaultFormatter": "biomejs.biome",
"[typescript]": {
"editor.defaultFormatter": "biomejs.biome"
"editor.defaultFormatter": "vscode.typescript-language-features"
}
}
14 changes: 9 additions & 5 deletions packages/core/src/types-hoist/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ export type { StackFrame } from './stackframe';
export type { Stacktrace, StackParser, StackLineParser, StackLineParserFn } from './stacktrace';
export type { PropagationContext, TracePropagationTargets, SerializedTraceData } from './tracing';
export type { StartSpanOptions } from './startSpanOptions';
export type {
TraceparentData,
TransactionSource,
} from './transaction';
export type { TraceparentData, TransactionSource } from './transaction';
export type { CustomSamplingContext, SamplingContext } from './samplingcontext';
export type {
DurationUnit,
Expand All @@ -147,7 +144,14 @@ export type {
TransportRequestExecutor,
} from './transport';
export type { User } from './user';
export type { WebFetchHeaders, WebFetchRequest } from './webfetchapi';
export type {
WebFetchHeaders,
WebFetchRequest,
WebFetchResponse,
WebReadableStream,
WebReadableStreamDefaultReader,
WebReadableStreamReadResult,
} from './whatwg';
export type { WrappedFunction } from './wrappedfunction';
export type {
HandlerDataFetch,
Expand Down
10 changes: 2 additions & 8 deletions packages/core/src/types-hoist/instrument.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// This should be: null | Blob | BufferSource | FormData | URLSearchParams | string
// But since not all of those are available in node, we just export `unknown` here for now

import type { WebFetchHeaders } from './webfetchapi';
import type { WebFetchResponse } from './whatwg';

// Make sure to cast it where needed!
type XHRSendInput = unknown;
Expand Down Expand Up @@ -51,13 +51,7 @@ export interface HandlerDataFetch {
fetchData: SentryFetchData; // This data is among other things dumped directly onto the fetch breadcrumb data
startTimestamp: number;
endTimestamp?: number;
// This is actually `Response` - Note: this type is not complete. Add to it if necessary.
response?: {
readonly ok: boolean;
readonly status: number;
readonly url: string;
headers: WebFetchHeaders;
};
response?: WebFetchResponse;
error?: unknown;
// This is to be consumed by the HttpClient integration
virtualError?: unknown;
Expand Down
17 changes: 0 additions & 17 deletions packages/core/src/types-hoist/webfetchapi.ts

This file was deleted.

38 changes: 38 additions & 0 deletions packages/core/src/types-hoist/whatwg/fetch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// These are vendored types for the standard web fetch API types because typescript needs the DOM types to be able to understand the `Request`, `Headers`, ... types and not everybody has those.

import type { WebReadableStream } from './stream';

export interface WebFetchHeaders {
append(name: string, value: string): void;
delete(name: string): void;
get(name: string): string | null;
has(name: string): boolean;
set(name: string, value: string): void;
forEach(callbackfn: (value: string, key: string, parent: WebFetchHeaders) => void): void;
}

export interface WebFetchRequest {
readonly headers: WebFetchHeaders;
readonly method: string;
readonly url: string;
clone(): WebFetchRequest;
}

export interface WebFetchResponse {
readonly ok: boolean;
readonly status: number;
readonly statusText: string;
readonly headers: WebFetchHeaders;
readonly url: string;
readonly redirected: boolean;
readonly body: WebReadableStream | null;

clone(): WebFetchResponse;

// Methods to consume the response body
json(): Promise<any>; // Parses response as JSON
text(): Promise<string>; // Reads response body as text
arrayBuffer(): Promise<ArrayBuffer>; // Reads response body as ArrayBuffer
blob(): Promise<object>; // Reads response body as Blob
formData(): Promise<object>; // Reads response body as FormData
}
3 changes: 3 additions & 0 deletions packages/core/src/types-hoist/whatwg/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export type { WebReadableStream, WebReadableStreamDefaultReader, WebReadableStreamReadResult } from './stream';

export type { WebFetchHeaders, WebFetchRequest, WebFetchResponse } from './fetch';
23 changes: 23 additions & 0 deletions packages/core/src/types-hoist/whatwg/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
export interface WebReadableStream<R = any> {
locked: boolean; // Indicates if the stream is currently locked

cancel(reason?: any): Promise<void>; // Cancels the stream with an optional reason
getReader(): WebReadableStreamDefaultReader<R>; // Returns a reader for the stream
}

export interface WebReadableStreamDefaultReader<R = any> {
closed: boolean;
// Closes the stream and resolves the reader's lock
cancel(reason?: any): Promise<void>;

// Returns a promise with the next chunk in the stream
read(): Promise<WebReadableStreamReadResult<R>>;

// Releases the reader's lock on the stream
releaseLock(): void;
}

export interface WebReadableStreamReadResult<R = any> {
done: boolean; // True if the reader is done with the stream
value?: R; // The data chunk read from the stream (if not done)
}
134 changes: 90 additions & 44 deletions packages/core/src/utils-hoist/instrument/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import type { HandlerDataFetch } from '../../types-hoist';
import type { HandlerDataFetch, WebFetchResponse, WebReadableStreamDefaultReader } from '../../types-hoist';

import { isError } from '../is';
import { addNonEnumerableProperty, fill } from '../object';
Expand Down Expand Up @@ -117,55 +117,102 @@ function instrumentFetch(onFetchResolved?: (response: Response) => void, skipNat
});
}

async function resolveResponse(res: Response | undefined, onFinishedResolving: () => void): Promise<void> {
if (res && res.body) {
const body = res.body;
const responseReader = body.getReader();
async function resolveReader(reader: WebReadableStreamDefaultReader, onFinishedResolving: () => void): Promise<void> {
let running = true;
while (running) {
try {
// This .read() call will reject/throw when `reader.cancel()`
const { done } = await reader.read();

// Define a maximum duration after which we just cancel
const maxFetchDurationTimeout = setTimeout(
() => {
body.cancel().then(null, () => {
// noop
});
},
90 * 1000, // 90s
);

let readingActive = true;
while (readingActive) {
let chunkTimeout;
try {
// abort reading if read op takes more than 5s
chunkTimeout = setTimeout(() => {
body.cancel().then(null, () => {
// noop on error
});
}, 5000);

// This .read() call will reject/throw when we abort due to timeouts through `body.cancel()`
const { done } = await responseReader.read();

clearTimeout(chunkTimeout);
running = !done;

if (done) {
onFinishedResolving();
readingActive = false;
}
} catch (error) {
readingActive = false;
} finally {
clearTimeout(chunkTimeout);
if (done) {
onFinishedResolving();
}
} catch (_) {
running = false;
}
}
}

/**
* Resolves the body stream of a `Response` object and links its cancellation to a parent `Response` body.
*
* This function attaches a custom `cancel` behavior to both the parent `Response` body and its `getReader()` method.
* When the parent stream or its reader is canceled, it triggers the cancellation of the child stream as well.
* The function also monitors the resolution of the child's body stream using `resolveReader` and performs cleanup.
*
* @param {Response} res - The `Response` object whose body stream will be resolved.
* @param {Response} parentRes - The parent `Response` object whose body stream is linked to the cancellation of `res`.
* @param {() => void} onFinishedResolving - A callback function to be invoked when the body stream of `res` is fully resolved.
*
* Export For Test Only
*/
export function resolveResponse(
res: WebFetchResponse,
parentRes: WebFetchResponse,
onFinishedResolving: () => void,
): void {
if (!res.body || !parentRes.body) {
if (res.body) {
res.body.cancel().catch(_ => {
// noop on error
});
}

clearTimeout(maxFetchDurationTimeout);
return;
}

const body = res.body;
const parentBody = parentRes.body;
// According to the WHATWG Streams API specification, when a stream is locked by calling `getReader()`,
// invoking `stream.cancel()` will result in a TypeError.
// To cancel while the stream is locked, must use `reader.cancel()`
// @seealso: https://streams.spec.whatwg.org
const responseReader = body.getReader();

const originalCancel = parentBody.cancel.bind(parentBody) as (reason?: any) => Promise<any>;

responseReader.releaseLock();
body.cancel().then(null, () => {
// Override cancel method on parent response's body
parentBody.cancel = async (reason?: any) => {
responseReader.cancel('Cancelled by parent stream').catch(_ => {
// noop on error
});
}

await originalCancel(reason);
};

const originalGetReader = parentRes.body.getReader.bind(parentBody) as (
options: ReadableStreamGetReaderOptions,
) => ReadableStreamDefaultReader;

// Override getReader on parent response's body
parentBody.getReader = ((opts?: any) => {
const reader = originalGetReader(opts) as ReadableStreamDefaultReader;

const originalReaderCancel = reader.cancel.bind(reader) as (reason?: any) => Promise<any>;

reader.cancel = async (reason?: any) => {
responseReader.cancel('Cancelled by parent reader').catch(_ => {
// noop on error
});

await originalReaderCancel(reason);
};

return reader;
}) as any;

resolveReader(responseReader, onFinishedResolving).finally(() => {
try {
responseReader.releaseLock();
body.cancel().catch(() => {
// noop on error
});
} catch (_) {
// noop on error
}
});
}

function streamHandler(response: Response): void {
Expand All @@ -177,8 +224,7 @@ function streamHandler(response: Response): void {
return;
}

// eslint-disable-next-line @typescript-eslint/no-floating-promises
resolveResponse(clonedResponseForResolving, () => {
resolveResponse(clonedResponseForResolving as WebFetchResponse, response as WebFetchResponse, () => {
triggerHandlers('fetch-body-resolved', {
endTimestamp: timestampInSeconds() * 1000,
response,
Expand Down
Loading