Skip to content

feat(core): Add Offline Transport wrapper #6884

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export { getEnvelopeEndpointWithUrlEncodedAuth, getReportDialogEndpoint } from '
export { BaseClient } from './baseclient';
export { initAndBind } from './sdk';
export { createTransport } from './transports/base';
export { makeOfflineTransport } from './transports/offline';
export { SDK_VERSION } from './version';
export { getIntegrationsToSetup } from './integration';
export { FunctionToString, InboundFilters } from './integrations';
Expand Down
146 changes: 146 additions & 0 deletions packages/core/src/transports/offline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import type { Envelope, InternalBaseTransportOptions, Transport, TransportMakeRequestResponse } from '@sentry/types';
import { logger } from '@sentry/utils';

export const START_DELAY = 5_000;
const MAX_DELAY = 2_000_000_000;
const DEFAULT_QUEUE_SIZE = 30;

function wasRateLimited(result: TransportMakeRequestResponse): boolean {
return !!(result.headers && result.headers['x-sentry-rate-limits']);
}

type BeforeSendResponse = 'send' | 'queue' | 'drop';

interface OfflineTransportOptions extends InternalBaseTransportOptions {
/**
* The maximum number of events to keep in the offline queue.
*
* Defaults: 30
*/
maxQueueSize?: number;

/**
* Flush the offline queue shortly after startup.
*
* Defaults: false
*/
flushAtStartup?: boolean;

/**
* Called when an event is queued .
*/
eventQueued?: () => void;

/**
* Called before attempting to send an event to Sentry.
*
* Return 'send' to attempt to send the event.
* Return 'queue' to queue the event for sending later.
* Return 'drop' to drop the event.
*/
beforeSend?: (request: Envelope) => BeforeSendResponse | Promise<BeforeSendResponse>;
}

interface OfflineStore {
insert(env: Envelope): Promise<void>;
pop(): Promise<Envelope | undefined>;
}

export type CreateOfflineStore = (maxQueueCount: number) => OfflineStore;

/**
* Wraps a transport and queues events when envelopes fail to send.
*
* @param createTransport The transport to wrap.
* @param createStore The store implementation to use.
*/
export function makeOfflineTransport<TO>(
createTransport: (options: TO) => Transport,
createStore: CreateOfflineStore,
): (options: TO & OfflineTransportOptions) => Transport {
return options => {
const baseTransport = createTransport(options);
const maxQueueSize = options.maxQueueSize === undefined ? DEFAULT_QUEUE_SIZE : options.maxQueueSize;
const store = createStore(maxQueueSize);

let retryDelay = START_DELAY;

function queued(): void {
if (options.eventQueued) {
options.eventQueued();
}
}

function queueRequest(envelope: Envelope): Promise<void> {
return store.insert(envelope).then(() => {
queued();

setTimeout(() => {
void flushQueue();
}, retryDelay);

retryDelay *= 3;

// If the delay is bigger than 2^31 (max signed 32-bit int), setTimeout throws
// an error on node.js and falls back to 1 which can cause a huge number of requests.
if (retryDelay > MAX_DELAY) {
retryDelay = MAX_DELAY;
}
});
}

async function flushQueue(): Promise<void> {
const found = await store.pop();

if (found) {
__DEBUG_BUILD__ && logger.info('[Offline]: Attempting to send previously queued event');
void send(found);
}
}

async function send(request: Envelope): Promise<void | TransportMakeRequestResponse> {
let action = 'send';

if (options.beforeSend) {
action = await options.beforeSend(request);
}

if (action === 'send') {
try {
const result = await baseTransport.send(request);
if (wasRateLimited(result || {})) {
__DEBUG_BUILD__ && logger.info('[Offline]: Event queued due to rate limiting');
action = 'queue';
} else {
// Envelope was successfully sent
// Reset the retry delay
retryDelay = START_DELAY;
// Check if there are any more in the queue
void flushQueue();
return result;
}
} catch (e) {
__DEBUG_BUILD__ && logger.info('[Offline]: Event queued due to error', e);
action = 'queue';
}
}

if (action == 'queue') {
void queueRequest(request);
}

return {};
}

if (options.flushAtStartup) {
setTimeout(() => {
void flushQueue();
}, retryDelay);
}

return {
send,
flush: (timeout?: number) => baseTransport.flush(timeout),
};
};
}
177 changes: 177 additions & 0 deletions packages/core/test/lib/transports/offline.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import type {
Envelope,
EventEnvelope,
EventItem,
InternalBaseTransportOptions,
Transport,
TransportMakeRequestResponse,
} from '@sentry/types';
import { createEnvelope } from '@sentry/utils';
import { TextEncoder } from 'util';

import { createTransport, makeOfflineTransport } from '../../../src';
import type { CreateOfflineStore } from '../../../src/transports/offline';
import { START_DELAY } from '../../../src/transports/offline';

const ERROR_ENVELOPE = createEnvelope<EventEnvelope>({ event_id: 'aa3ff046696b4bc6b609ce6d28fde9e2', sent_at: '123' }, [
[{ type: 'event' }, { event_id: 'aa3ff046696b4bc6b609ce6d28fde9e2' }] as EventItem,
]);

const transportOptions = {
recordDroppedEvent: () => undefined, // noop
textEncoder: new TextEncoder(),
};

type MockResult<T> = T | Error;

const createTestTransport = (
...sendResults: MockResult<TransportMakeRequestResponse>[]
): { getSendCount: () => number; baseTransport: (options: InternalBaseTransportOptions) => Transport } => {
let sendCount = 0;

return {
getSendCount: () => sendCount,
baseTransport: (options: InternalBaseTransportOptions) =>
createTransport(options, () => {
return new Promise((resolve, reject) => {
const next = sendResults.shift();

if (next instanceof Error) {
reject(next);
} else {
sendCount += 1;
resolve(next as TransportMakeRequestResponse | undefined);
}
});
}),
};
};

type StoreEvents = ('add' | 'pop')[];

function createTestStore(...popResults: MockResult<Envelope | undefined>[]): {
getCalls: () => StoreEvents;
store: CreateOfflineStore;
} {
const calls: StoreEvents = [];

return {
getCalls: () => calls,
store: (maxQueueCount: number) => ({
insert: async env => {
if (popResults.length < maxQueueCount) {
popResults.push(env);
calls.push('add');
}
},
pop: async () => {
calls.push('pop');
const next = popResults.shift();

if (next instanceof Error) {
throw next;
}

return next;
},
}),
};
}

function delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}

describe('makeOfflineTransport', () => {
it('Sends envelope and checks the store for further envelopes', async () => {
expect.assertions(3);

const { getCalls, store } = createTestStore();
const { getSendCount, baseTransport } = createTestTransport({ statusCode: 200 });
const transport = makeOfflineTransport(baseTransport, store)(transportOptions);
const result = await transport.send(ERROR_ENVELOPE);

expect(result).toEqual({ statusCode: 200 });
expect(getSendCount()).toEqual(1);
// After a successful send, the store should be checked
expect(getCalls()).toEqual(['pop']);
});

it('After successfully sending, sends further envelopes found in the store', async () => {
const { getCalls, store } = createTestStore(ERROR_ENVELOPE);
const { getSendCount, baseTransport } = createTestTransport({ statusCode: 200 }, { statusCode: 200 });
const transport = makeOfflineTransport(baseTransport, store)(transportOptions);
const result = await transport.send(ERROR_ENVELOPE);

expect(result).toEqual({ statusCode: 200 });

await delay(100);

expect(getSendCount()).toEqual(2);
// After a successful send, the store should be checked again to ensure it's empty
expect(getCalls()).toEqual(['pop', 'pop']);
});

it('Queues envelope if wrapped transport throws error', async () => {
const { getCalls, store } = createTestStore();
const { getSendCount, baseTransport } = createTestTransport(new Error());
const transport = makeOfflineTransport(baseTransport, store)(transportOptions);
const result = await transport.send(ERROR_ENVELOPE);

expect(result).toEqual({});

await delay(100);

expect(getSendCount()).toEqual(0);
expect(getCalls()).toEqual(['add']);
});

it('Queues envelope if rate limited', async () => {
const { getCalls, store } = createTestStore();
const { getSendCount, baseTransport } = createTestTransport({
headers: { 'x-sentry-rate-limits': 'something', 'retry-after': null },
});
const transport = makeOfflineTransport(baseTransport, store)(transportOptions);
const result = await transport.send(ERROR_ENVELOPE);
expect(result).toEqual({});

await delay(100);

expect(getSendCount()).toEqual(1);
expect(getCalls()).toEqual(['add']);
});

it(
'Retries sending envelope after failure',
async () => {
const { getCalls, store } = createTestStore();
const { getSendCount, baseTransport } = createTestTransport(new Error(), { statusCode: 200 });
const transport = makeOfflineTransport(baseTransport, store)(transportOptions);
const result = await transport.send(ERROR_ENVELOPE);
expect(result).toEqual({});
expect(getCalls()).toEqual(['add']);

await delay(START_DELAY + 1_000);

expect(getSendCount()).toEqual(1);
expect(getCalls()).toEqual(['add', 'pop', 'pop']);
},
START_DELAY + 2_000,
);

it(
'When enabled, sends envelopes found in store shortly after startup',
async () => {
const { getCalls, store } = createTestStore(ERROR_ENVELOPE, ERROR_ENVELOPE);
const { getSendCount, baseTransport } = createTestTransport({ statusCode: 200 }, { statusCode: 200 });
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const _transport = makeOfflineTransport(baseTransport, store)({ ...transportOptions, flushAtStartup: true });

await delay(START_DELAY + 1_000);

expect(getSendCount()).toEqual(2);
expect(getCalls()).toEqual(['pop', 'pop', 'pop']);
},
START_DELAY + 2_000,
);
});