Skip to content

Commit b93b02b

Browse files
committed
Improve
1 parent 9a252ca commit b93b02b

File tree

2 files changed

+257
-88
lines changed

2 files changed

+257
-88
lines changed
Lines changed: 93 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,48 @@
11
import type { Envelope, InternalBaseTransportOptions, Transport, TransportMakeRequestResponse } from '@sentry/types';
2-
import { logger } from '@sentry/utils';
2+
import { forEachEnvelopeItem, logger, parseRetryAfterHeader } from '@sentry/utils';
33

4-
export const START_DELAY = 5_000;
5-
const MAX_DELAY = 2_000_000_000;
4+
export const MIN_DELAY = 100; // 100 ms
5+
export const START_DELAY = 5_000; // 5 seconds
6+
const MAX_DELAY = 3.6e6; // 1 hour
67
const DEFAULT_QUEUE_SIZE = 30;
78

8-
function wasRateLimited(result: TransportMakeRequestResponse): boolean {
9-
return !!(result.headers && result.headers['x-sentry-rate-limits']);
10-
}
9+
function isReplayEnvelope(envelope: Envelope): boolean {
10+
let isReplay = false;
11+
12+
forEachEnvelopeItem(envelope, (_, type) => {
13+
if (type === 'replay_event') {
14+
isReplay = true;
15+
}
16+
});
1117

12-
type BeforeSendResponse = 'send' | 'queue' | 'drop';
18+
return isReplay;
19+
}
1320

1421
interface OfflineTransportOptions extends InternalBaseTransportOptions {
1522
/**
16-
* The maximum number of events to keep in the offline queue.
23+
* The maximum number of events to keep in the offline store.
1724
*
1825
* Defaults: 30
1926
*/
2027
maxQueueSize?: number;
2128

2229
/**
23-
* Flush the offline queue shortly after startup.
30+
* Flush the offline store shortly after startup.
2431
*
2532
* Defaults: false
2633
*/
2734
flushAtStartup?: boolean;
2835

2936
/**
30-
* Called when an event is queued .
31-
*/
32-
eventQueued?: () => void;
33-
34-
/**
35-
* Called before attempting to send an event to Sentry.
37+
* Called before an event is stored.
3638
*
37-
* Return 'send' to attempt to send the event.
38-
* Return 'queue' to queue the event for sending later.
39-
* Return 'drop' to drop the event.
39+
* Return false to drop the envelope rather than store it.
40+
*
41+
* @param envelope The envelope that failed to send.
42+
* @param error The error that occurred.
43+
* @param retryDelay The current retry delay in milliseconds.
4044
*/
41-
beforeSend?: (request: Envelope) => BeforeSendResponse | Promise<BeforeSendResponse>;
45+
shouldStore?: (envelope: Envelope, error: Error, retryDelay: number) => boolean | Promise<boolean>;
4246
}
4347

4448
interface OfflineStore {
@@ -48,8 +52,10 @@ interface OfflineStore {
4852

4953
export type CreateOfflineStore = (maxQueueCount: number) => OfflineStore;
5054

55+
type Timer = number | { unref?: () => void };
56+
5157
/**
52-
* Wraps a transport and queues events when envelopes fail to send.
58+
* Wraps a transport and stores and retries events when they fail to send.
5359
*
5460
* @param createTransport The transport to wrap.
5561
* @param createStore The store implementation to use.
@@ -59,88 +65,104 @@ export function makeOfflineTransport<TO>(
5965
createStore: CreateOfflineStore,
6066
): (options: TO & OfflineTransportOptions) => Transport {
6167
return options => {
62-
const baseTransport = createTransport(options);
68+
const transport = createTransport(options);
6369
const maxQueueSize = options.maxQueueSize === undefined ? DEFAULT_QUEUE_SIZE : options.maxQueueSize;
6470
const store = createStore(maxQueueSize);
6571

6672
let retryDelay = START_DELAY;
73+
let flushTimer: Timer | undefined;
6774

68-
function queued(): void {
69-
if (options.eventQueued) {
70-
options.eventQueued();
71-
}
75+
function log(msg: string, error?: Error): void {
76+
__DEBUG_BUILD__ && logger.info(`[Offline]: ${msg}`, error);
7277
}
7378

74-
function queueRequest(envelope: Envelope): Promise<void> {
75-
return store.insert(envelope).then(() => {
76-
queued();
79+
function shouldQueue(env: Envelope, error: Error, retryDelay: number): boolean | Promise<boolean> {
80+
if (isReplayEnvelope(env)) {
81+
return false;
82+
}
7783

78-
setTimeout(() => {
79-
void flushQueue();
80-
}, retryDelay);
84+
if (options.shouldStore) {
85+
return options.shouldStore(env, error, retryDelay);
86+
}
8187

82-
retryDelay *= 3;
88+
return true;
89+
}
8390

84-
// If the delay is bigger than 2^31 (max signed 32-bit int), setTimeout throws
85-
// an error on node.js and falls back to 1 which can cause a huge number of requests.
86-
if (retryDelay > MAX_DELAY) {
87-
retryDelay = MAX_DELAY;
91+
function flushIn(delay: number): void {
92+
if (flushTimer) {
93+
clearTimeout(flushTimer as ReturnType<typeof setTimeout>);
94+
}
95+
96+
flushTimer = setTimeout(async () => {
97+
flushTimer = undefined;
98+
99+
const found = await store.pop();
100+
if (found) {
101+
log('Attempting to send previously queued event');
102+
void send(found).catch(e => {
103+
log('Failed to retry sending', e);
104+
});
88105
}
89-
});
106+
}, delay) as Timer;
107+
108+
// We need to unref the timer in node.js, otherwise the node process never exit.
109+
if (typeof flushTimer !== 'number' && typeof flushTimer.unref === 'function') {
110+
flushTimer.unref();
111+
}
90112
}
91113

92-
async function flushQueue(): Promise<void> {
93-
const found = await store.pop();
114+
function flushWithBackOff(): void {
115+
if (flushTimer) {
116+
return;
117+
}
118+
119+
flushIn(retryDelay);
120+
121+
retryDelay *= 2;
94122

95-
if (found) {
96-
__DEBUG_BUILD__ && logger.info('[Offline]: Attempting to send previously queued event');
97-
void send(found);
123+
if (retryDelay > MAX_DELAY) {
124+
retryDelay = MAX_DELAY;
98125
}
99126
}
100127

101-
async function send(request: Envelope): Promise<void | TransportMakeRequestResponse> {
102-
let action = 'send';
128+
async function send(envelope: Envelope): Promise<void | TransportMakeRequestResponse> {
129+
try {
130+
const result = await transport.send(envelope);
103131

104-
if (options.beforeSend) {
105-
action = await options.beforeSend(request);
106-
}
132+
let delay = MIN_DELAY;
107133

108-
if (action === 'send') {
109-
try {
110-
const result = await baseTransport.send(request);
111-
if (wasRateLimited(result || {})) {
112-
__DEBUG_BUILD__ && logger.info('[Offline]: Event queued due to rate limiting');
113-
action = 'queue';
114-
} else {
115-
// Envelope was successfully sent
116-
// Reset the retry delay
117-
retryDelay = START_DELAY;
118-
// Check if there are any more in the queue
119-
void flushQueue();
134+
if (result) {
135+
// If there's a retry-after header, use that as the next delay.
136+
if (result.headers && result.headers['retry-after']) {
137+
delay = parseRetryAfterHeader(result.headers['retry-after']);
138+
} // If we have a server error, return now so we don't flush the queue.
139+
else if ((result.statusCode || 0) >= 400) {
120140
return result;
121141
}
122-
} catch (e) {
123-
__DEBUG_BUILD__ && logger.info('[Offline]: Event queued due to error', e);
124-
action = 'queue';
125142
}
126-
}
127143

128-
if (action == 'queue') {
129-
void queueRequest(request);
144+
flushIn(delay);
145+
retryDelay = START_DELAY;
146+
return result;
147+
} catch (e) {
148+
if (await shouldQueue(envelope, e, retryDelay)) {
149+
await store.insert(envelope);
150+
flushWithBackOff();
151+
log('Error sending. Event queued', e);
152+
return {};
153+
} else {
154+
throw e;
155+
}
130156
}
131-
132-
return {};
133157
}
134158

135159
if (options.flushAtStartup) {
136-
setTimeout(() => {
137-
void flushQueue();
138-
}, retryDelay);
160+
flushWithBackOff();
139161
}
140162

141163
return {
142164
send,
143-
flush: (timeout?: number) => baseTransport.flush(timeout),
165+
flush: (timeout?: number) => transport.flush(timeout),
144166
};
145167
};
146168
}

0 commit comments

Comments
 (0)