-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(core): Add Offline Transport wrapper #6884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 4 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
3ba2356
in progress
timfish ef57767
Merge remote-tracking branch 'upstream/master' into feat/offline
timfish 26e5b0c
Simplify a bit and add some tests
timfish 9a252ca
Fix linting
timfish b93b02b
Improve
timfish 5f1af25
Review changes
timfish f86f4e6
Make store a wrapper
timfish c0938e4
createStore needs to be optional
timfish 8070337
Remove redundant jsdoc param
timfish fd9bb07
Review changes and add some exports
timfish File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import type { Envelope, InternalBaseTransportOptions, Transport, TransportMakeRequestResponse } from '@sentry/types'; | ||
import { logger } from '@sentry/utils'; | ||
|
||
export const START_DELAY = 5_000; | ||
const MAX_DELAY = 2_000_000_000; | ||
const DEFAULT_QUEUE_SIZE = 30; | ||
|
||
function wasRateLimited(result: TransportMakeRequestResponse): boolean { | ||
return !!(result.headers && result.headers['x-sentry-rate-limits']); | ||
} | ||
|
||
type BeforeSendResponse = 'send' | 'queue' | 'drop'; | ||
|
||
interface OfflineTransportOptions extends InternalBaseTransportOptions { | ||
/** | ||
* The maximum number of events to keep in the offline queue. | ||
* | ||
* Defaults: 30 | ||
*/ | ||
maxQueueSize?: number; | ||
|
||
/** | ||
* Flush the offline queue shortly after startup. | ||
* | ||
* Defaults: false | ||
*/ | ||
flushAtStartup?: boolean; | ||
|
||
/** | ||
* Called when an event is queued . | ||
*/ | ||
eventQueued?: () => void; | ||
|
||
/** | ||
* Called before attempting to send an event to Sentry. | ||
* | ||
* Return 'send' to attempt to send the event. | ||
* Return 'queue' to queue the event for sending later. | ||
* Return 'drop' to drop the event. | ||
*/ | ||
beforeSend?: (request: Envelope) => BeforeSendResponse | Promise<BeforeSendResponse>; | ||
} | ||
|
||
interface OfflineStore { | ||
insert(env: Envelope): Promise<void>; | ||
pop(): Promise<Envelope | undefined>; | ||
} | ||
|
||
export type CreateOfflineStore = (maxQueueCount: number) => OfflineStore; | ||
|
||
/** | ||
* Wraps a transport and queues events when envelopes fail to send. | ||
* | ||
* @param createTransport The transport to wrap. | ||
* @param createStore The store implementation to use. | ||
*/ | ||
export function makeOfflineTransport<TO>( | ||
createTransport: (options: TO) => Transport, | ||
createStore: CreateOfflineStore, | ||
): (options: TO & OfflineTransportOptions) => Transport { | ||
return options => { | ||
const baseTransport = createTransport(options); | ||
const maxQueueSize = options.maxQueueSize === undefined ? DEFAULT_QUEUE_SIZE : options.maxQueueSize; | ||
const store = createStore(maxQueueSize); | ||
|
||
let retryDelay = START_DELAY; | ||
|
||
function queued(): void { | ||
if (options.eventQueued) { | ||
options.eventQueued(); | ||
} | ||
} | ||
|
||
function queueRequest(envelope: Envelope): Promise<void> { | ||
return store.insert(envelope).then(() => { | ||
queued(); | ||
|
||
setTimeout(() => { | ||
void flushQueue(); | ||
}, retryDelay); | ||
|
||
retryDelay *= 3; | ||
|
||
// If the delay is bigger than 2^31 (max signed 32-bit int), setTimeout throws | ||
// an error on node.js and falls back to 1 which can cause a huge number of requests. | ||
if (retryDelay > MAX_DELAY) { | ||
retryDelay = MAX_DELAY; | ||
} | ||
}); | ||
} | ||
|
||
async function flushQueue(): Promise<void> { | ||
const found = await store.pop(); | ||
|
||
if (found) { | ||
__DEBUG_BUILD__ && logger.info('[Offline]: Attempting to send previously queued event'); | ||
void send(found); | ||
} | ||
} | ||
|
||
async function send(request: Envelope): Promise<void | TransportMakeRequestResponse> { | ||
let action = 'send'; | ||
|
||
if (options.beforeSend) { | ||
action = await options.beforeSend(request); | ||
} | ||
|
||
if (action === 'send') { | ||
try { | ||
const result = await baseTransport.send(request); | ||
if (wasRateLimited(result || {})) { | ||
__DEBUG_BUILD__ && logger.info('[Offline]: Event queued due to rate limiting'); | ||
action = 'queue'; | ||
} else { | ||
// Envelope was successfully sent | ||
// Reset the retry delay | ||
retryDelay = START_DELAY; | ||
// Check if there are any more in the queue | ||
void flushQueue(); | ||
return result; | ||
} | ||
} catch (e) { | ||
__DEBUG_BUILD__ && logger.info('[Offline]: Event queued due to error', e); | ||
action = 'queue'; | ||
} | ||
} | ||
|
||
if (action == 'queue') { | ||
void queueRequest(request); | ||
} | ||
|
||
return {}; | ||
} | ||
|
||
if (options.flushAtStartup) { | ||
setTimeout(() => { | ||
void flushQueue(); | ||
}, retryDelay); | ||
} | ||
|
||
return { | ||
send, | ||
flush: (timeout?: number) => baseTransport.flush(timeout), | ||
}; | ||
}; | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
import type { | ||
Envelope, | ||
EventEnvelope, | ||
EventItem, | ||
InternalBaseTransportOptions, | ||
Transport, | ||
TransportMakeRequestResponse, | ||
} from '@sentry/types'; | ||
import { createEnvelope } from '@sentry/utils'; | ||
import { TextEncoder } from 'util'; | ||
|
||
import { createTransport, makeOfflineTransport } from '../../../src'; | ||
import type { CreateOfflineStore } from '../../../src/transports/offline'; | ||
import { START_DELAY } from '../../../src/transports/offline'; | ||
|
||
const ERROR_ENVELOPE = createEnvelope<EventEnvelope>({ event_id: 'aa3ff046696b4bc6b609ce6d28fde9e2', sent_at: '123' }, [ | ||
[{ type: 'event' }, { event_id: 'aa3ff046696b4bc6b609ce6d28fde9e2' }] as EventItem, | ||
]); | ||
|
||
const transportOptions = { | ||
recordDroppedEvent: () => undefined, // noop | ||
textEncoder: new TextEncoder(), | ||
}; | ||
|
||
type MockResult<T> = T | Error; | ||
|
||
const createTestTransport = ( | ||
...sendResults: MockResult<TransportMakeRequestResponse>[] | ||
): { getSendCount: () => number; baseTransport: (options: InternalBaseTransportOptions) => Transport } => { | ||
let sendCount = 0; | ||
|
||
return { | ||
getSendCount: () => sendCount, | ||
baseTransport: (options: InternalBaseTransportOptions) => | ||
createTransport(options, () => { | ||
return new Promise((resolve, reject) => { | ||
const next = sendResults.shift(); | ||
|
||
if (next instanceof Error) { | ||
reject(next); | ||
} else { | ||
sendCount += 1; | ||
resolve(next as TransportMakeRequestResponse | undefined); | ||
} | ||
}); | ||
}), | ||
}; | ||
}; | ||
|
||
type StoreEvents = ('add' | 'pop')[]; | ||
|
||
function createTestStore(...popResults: MockResult<Envelope | undefined>[]): { | ||
getCalls: () => StoreEvents; | ||
store: CreateOfflineStore; | ||
} { | ||
const calls: StoreEvents = []; | ||
|
||
return { | ||
getCalls: () => calls, | ||
store: (maxQueueCount: number) => ({ | ||
insert: async env => { | ||
if (popResults.length < maxQueueCount) { | ||
popResults.push(env); | ||
calls.push('add'); | ||
} | ||
}, | ||
pop: async () => { | ||
calls.push('pop'); | ||
const next = popResults.shift(); | ||
|
||
if (next instanceof Error) { | ||
throw next; | ||
} | ||
|
||
return next; | ||
}, | ||
}), | ||
}; | ||
} | ||
|
||
function delay(ms: number): Promise<void> { | ||
return new Promise(resolve => setTimeout(resolve, ms)); | ||
} | ||
|
||
describe('makeOfflineTransport', () => { | ||
it('Sends envelope and checks the store for further envelopes', async () => { | ||
expect.assertions(3); | ||
|
||
const { getCalls, store } = createTestStore(); | ||
const { getSendCount, baseTransport } = createTestTransport({ statusCode: 200 }); | ||
const transport = makeOfflineTransport(baseTransport, store)(transportOptions); | ||
const result = await transport.send(ERROR_ENVELOPE); | ||
|
||
expect(result).toEqual({ statusCode: 200 }); | ||
expect(getSendCount()).toEqual(1); | ||
// After a successful send, the store should be checked | ||
expect(getCalls()).toEqual(['pop']); | ||
}); | ||
|
||
it('After successfully sending, sends further envelopes found in the store', async () => { | ||
const { getCalls, store } = createTestStore(ERROR_ENVELOPE); | ||
const { getSendCount, baseTransport } = createTestTransport({ statusCode: 200 }, { statusCode: 200 }); | ||
const transport = makeOfflineTransport(baseTransport, store)(transportOptions); | ||
const result = await transport.send(ERROR_ENVELOPE); | ||
|
||
expect(result).toEqual({ statusCode: 200 }); | ||
|
||
await delay(100); | ||
|
||
expect(getSendCount()).toEqual(2); | ||
// After a successful send, the store should be checked again to ensure it's empty | ||
expect(getCalls()).toEqual(['pop', 'pop']); | ||
}); | ||
|
||
it('Queues envelope if wrapped transport throws error', async () => { | ||
const { getCalls, store } = createTestStore(); | ||
const { getSendCount, baseTransport } = createTestTransport(new Error()); | ||
const transport = makeOfflineTransport(baseTransport, store)(transportOptions); | ||
const result = await transport.send(ERROR_ENVELOPE); | ||
|
||
expect(result).toEqual({}); | ||
|
||
await delay(100); | ||
|
||
expect(getSendCount()).toEqual(0); | ||
expect(getCalls()).toEqual(['add']); | ||
}); | ||
|
||
it('Queues envelope if rate limited', async () => { | ||
const { getCalls, store } = createTestStore(); | ||
const { getSendCount, baseTransport } = createTestTransport({ | ||
headers: { 'x-sentry-rate-limits': 'something', 'retry-after': null }, | ||
}); | ||
const transport = makeOfflineTransport(baseTransport, store)(transportOptions); | ||
const result = await transport.send(ERROR_ENVELOPE); | ||
expect(result).toEqual({}); | ||
|
||
await delay(100); | ||
|
||
expect(getSendCount()).toEqual(1); | ||
expect(getCalls()).toEqual(['add']); | ||
}); | ||
|
||
it( | ||
'Retries sending envelope after failure', | ||
async () => { | ||
const { getCalls, store } = createTestStore(); | ||
const { getSendCount, baseTransport } = createTestTransport(new Error(), { statusCode: 200 }); | ||
const transport = makeOfflineTransport(baseTransport, store)(transportOptions); | ||
const result = await transport.send(ERROR_ENVELOPE); | ||
expect(result).toEqual({}); | ||
expect(getCalls()).toEqual(['add']); | ||
|
||
await delay(START_DELAY + 1_000); | ||
|
||
expect(getSendCount()).toEqual(1); | ||
expect(getCalls()).toEqual(['add', 'pop', 'pop']); | ||
}, | ||
START_DELAY + 2_000, | ||
); | ||
|
||
it( | ||
'When enabled, sends envelopes found in store shortly after startup', | ||
async () => { | ||
const { getCalls, store } = createTestStore(ERROR_ENVELOPE, ERROR_ENVELOPE); | ||
const { getSendCount, baseTransport } = createTestTransport({ statusCode: 200 }, { statusCode: 200 }); | ||
// eslint-disable-next-line @typescript-eslint/no-unused-vars | ||
const _transport = makeOfflineTransport(baseTransport, store)({ ...transportOptions, flushAtStartup: true }); | ||
|
||
await delay(START_DELAY + 1_000); | ||
|
||
expect(getSendCount()).toEqual(2); | ||
expect(getCalls()).toEqual(['pop', 'pop', 'pop']); | ||
}, | ||
START_DELAY + 2_000, | ||
); | ||
}); |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.