-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat: Add server runtime metrics aggregator #9894
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
import type { | ||
Client, | ||
ClientOptions, | ||
MeasurementUnit, | ||
MetricsAggregator as MetricsAggregatorBase, | ||
Primitive, | ||
} from '@sentry/types'; | ||
import { timestampInSeconds } from '@sentry/utils'; | ||
import { DEFAULT_FLUSH_INTERVAL, MAX_WEIGHT, NAME_AND_TAG_KEY_NORMALIZATION_REGEX } from './constants'; | ||
import { METRIC_MAP } from './instance'; | ||
import type { MetricBucket, MetricType } from './types'; | ||
import { getBucketKey, sanitizeTags } from './utils'; | ||
|
||
/** | ||
* A metrics aggregator that aggregates metrics in memory and flushes them periodically. | ||
*/ | ||
export class MetricsAggregator implements MetricsAggregatorBase { | ||
// TODO(@anonrig): Use FinalizationRegistry to have a proper way of flushing the buckets | ||
// when the aggregator is garbage collected. | ||
// Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry | ||
private _buckets: MetricBucket; | ||
|
||
// Different metrics have different weights. We use this to limit the number of metrics | ||
// that we store in memory. | ||
private _bucketsTotalWeight; | ||
|
||
private readonly _interval: ReturnType<typeof setInterval>; | ||
|
||
// SDKs are required to shift the flush interval by random() * rollup_in_seconds. | ||
// That shift is determined once per startup to create jittering. | ||
private readonly _flushShift: number; | ||
|
||
// An SDK is required to perform force flushing ahead of scheduled time if the memory | ||
// pressure is too high. There is no rule for this other than that SDKs should be tracking | ||
// abstract aggregation complexity (eg: a counter only carries a single float, whereas a | ||
// distribution is a float per emission). | ||
// | ||
// Force flush is used on either shutdown, flush() or when we exceed the max weight. | ||
private _forceFlush: boolean; | ||
|
||
public constructor(private readonly _client: Client<ClientOptions>) { | ||
this._buckets = new Map(); | ||
this._bucketsTotalWeight = 0; | ||
this._interval = setInterval(() => this._flush(), DEFAULT_FLUSH_INTERVAL); | ||
this._flushShift = Math.floor((Math.random() * DEFAULT_FLUSH_INTERVAL) / 1000); | ||
this._forceFlush = false; | ||
} | ||
|
||
/** | ||
* @inheritDoc | ||
*/ | ||
public add( | ||
metricType: MetricType, | ||
unsanitizedName: string, | ||
value: number | string, | ||
unit: MeasurementUnit = 'none', | ||
unsanitizedTags: Record<string, Primitive> = {}, | ||
maybeFloatTimestamp = timestampInSeconds(), | ||
): void { | ||
const timestamp = Math.floor(maybeFloatTimestamp); | ||
const name = unsanitizedName.replace(NAME_AND_TAG_KEY_NORMALIZATION_REGEX, '_'); | ||
const tags = sanitizeTags(unsanitizedTags); | ||
|
||
const bucketKey = getBucketKey(metricType, name, unit, tags); | ||
let bucketItem = this._buckets.get(bucketKey); | ||
if (bucketItem) { | ||
bucketItem.metric.add(value); | ||
// TODO(abhi): Do we need this check? | ||
if (bucketItem.timestamp < timestamp) { | ||
bucketItem.timestamp = timestamp; | ||
} | ||
} else { | ||
bucketItem = { | ||
// @ts-expect-error we don't need to narrow down the type of value here, saves bundle size. | ||
metric: new METRIC_MAP[metricType](value), | ||
timestamp, | ||
metricType, | ||
name, | ||
unit, | ||
tags, | ||
}; | ||
this._buckets.set(bucketKey, bucketItem); | ||
} | ||
|
||
// We need to keep track of the total weight of the buckets so that we can | ||
// flush them when we exceed the max weight. | ||
this._bucketsTotalWeight += bucketItem.metric.weight; | ||
|
||
if (this._bucketsTotalWeight >= MAX_WEIGHT) { | ||
this.flush(); | ||
} | ||
} | ||
|
||
/** | ||
* Flushes the current metrics to the transport via the transport. | ||
*/ | ||
public flush(): void { | ||
this._forceFlush = true; | ||
this._flush(); | ||
} | ||
|
||
/** | ||
* Shuts down metrics aggregator and clears all metrics. | ||
*/ | ||
public close(): void { | ||
this._forceFlush = true; | ||
clearInterval(this._interval); | ||
this._flush(); | ||
} | ||
|
||
/** | ||
* Flushes the buckets according to the internal state of the aggregator. | ||
* If it is a force flush, which happens on shutdown, it will flush all buckets. | ||
* Otherwise, it will only flush buckets that are older than the flush interval, | ||
* and according to the flush shift. | ||
* | ||
* This function mutates `_forceFlush` and `_bucketsTotalWeight` properties. | ||
*/ | ||
private _flush(): void { | ||
// TODO(@anonrig): Add Atomics for locking to avoid having force flush and regular flush | ||
// running at the same time. | ||
// Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics | ||
|
||
// This path eliminates the need for checking for timestamps since we're forcing a flush. | ||
// Remember to reset the flag, or it will always flush all metrics. | ||
if (this._forceFlush) { | ||
this._forceFlush = false; | ||
this._bucketsTotalWeight = 0; | ||
this._captureMetrics(this._buckets); | ||
this._buckets.clear(); | ||
return; | ||
} | ||
const cutoffSeconds = Math.floor(timestampInSeconds()) - DEFAULT_FLUSH_INTERVAL / 1000 - this._flushShift; | ||
// TODO(@anonrig): Optimization opportunity. | ||
// Convert this map to an array and store key in the bucketItem. | ||
const flushedBuckets: MetricBucket = new Map(); | ||
for (const [key, bucket] of this._buckets) { | ||
if (bucket.timestamp <= cutoffSeconds) { | ||
flushedBuckets.set(key, bucket); | ||
this._bucketsTotalWeight -= bucket.metric.weight; | ||
} | ||
} | ||
|
||
for (const [key] of flushedBuckets) { | ||
this._buckets.delete(key); | ||
} | ||
|
||
this._captureMetrics(flushedBuckets); | ||
} | ||
|
||
/** | ||
* Only captures a subset of the buckets passed to this function. | ||
* @param flushedBuckets | ||
*/ | ||
private _captureMetrics(flushedBuckets: MetricBucket): void { | ||
if (flushedBuckets.size > 0 && this._client.captureAggregateMetrics) { | ||
// TODO(@anonrig): Optimization opportunity. | ||
// This copy operation can be avoided if we store the key in the bucketItem. | ||
const buckets = Array.from(flushedBuckets).map(([, bucketItem]) => bucketItem); | ||
this._client.captureAggregateMetrics(buckets); | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
import type { | ||
Client, | ||
ClientOptions, | ||
MeasurementUnit, | ||
MetricBucketItem, | ||
MetricsAggregator, | ||
Primitive, | ||
} from '@sentry/types'; | ||
import { timestampInSeconds } from '@sentry/utils'; | ||
import { DEFAULT_BROWSER_FLUSH_INTERVAL, NAME_AND_TAG_KEY_NORMALIZATION_REGEX } from './constants'; | ||
import { METRIC_MAP } from './instance'; | ||
import type { MetricBucket, MetricType } from './types'; | ||
import { getBucketKey, sanitizeTags } from './utils'; | ||
|
||
/** | ||
* A simple metrics aggregator that aggregates metrics in memory and flushes them periodically. | ||
* Default flush interval is 5 seconds. | ||
* | ||
* @experimental This API is experimental and might change in the future. | ||
*/ | ||
export class BrowserMetricsAggregator implements MetricsAggregator { | ||
// TODO(@anonrig): Use FinalizationRegistry to have a proper way of flushing the buckets | ||
// when the aggregator is garbage collected. | ||
// Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry | ||
private _buckets: MetricBucket; | ||
private readonly _interval: ReturnType<typeof setInterval>; | ||
|
||
public constructor(private readonly _client: Client<ClientOptions>) { | ||
this._buckets = new Map(); | ||
this._interval = setInterval(() => this.flush(), DEFAULT_BROWSER_FLUSH_INTERVAL); | ||
} | ||
|
||
/** | ||
* @inheritDoc | ||
*/ | ||
public add( | ||
metricType: MetricType, | ||
unsanitizedName: string, | ||
value: number | string, | ||
unit: MeasurementUnit | undefined = 'none', | ||
unsanitizedTags: Record<string, Primitive> | undefined = {}, | ||
maybeFloatTimestamp: number | undefined = timestampInSeconds(), | ||
): void { | ||
const timestamp = Math.floor(maybeFloatTimestamp); | ||
const name = unsanitizedName.replace(NAME_AND_TAG_KEY_NORMALIZATION_REGEX, '_'); | ||
const tags = sanitizeTags(unsanitizedTags); | ||
|
||
const bucketKey = getBucketKey(metricType, name, unit, tags); | ||
const bucketItem: MetricBucketItem | undefined = this._buckets.get(bucketKey); | ||
if (bucketItem) { | ||
bucketItem.metric.add(value); | ||
// TODO(abhi): Do we need this check? | ||
if (bucketItem.timestamp < timestamp) { | ||
bucketItem.timestamp = timestamp; | ||
} | ||
} else { | ||
this._buckets.set(bucketKey, { | ||
// @ts-expect-error we don't need to narrow down the type of value here, saves bundle size. | ||
metric: new METRIC_MAP[metricType](value), | ||
timestamp, | ||
metricType, | ||
name, | ||
unit, | ||
tags, | ||
}); | ||
} | ||
} | ||
|
||
/** | ||
* @inheritDoc | ||
*/ | ||
public flush(): void { | ||
// short circuit if buckets are empty. | ||
if (this._buckets.size === 0) { | ||
return; | ||
} | ||
if (this._client.captureAggregateMetrics) { | ||
// TODO(@anonrig): Use Object.values() when we support ES6+ | ||
const metricBuckets = Array.from(this._buckets).map(([, bucketItem]) => bucketItem); | ||
this._client.captureAggregateMetrics(metricBuckets); | ||
} | ||
this._buckets.clear(); | ||
} | ||
|
||
/** | ||
* @inheritDoc | ||
*/ | ||
public close(): void { | ||
clearInterval(this._interval); | ||
this.flush(); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,7 +30,7 @@ export function createMetricEnvelope( | |
return createEnvelope<StatsdEnvelope>(headers, [item]); | ||
} | ||
|
||
function createMetricEnvelopeItem(metricBucketItems: Array<MetricBucketItem>): StatsdItem { | ||
function createMetricEnvelopeItem(metricBucketItems: MetricBucketItem[]): StatsdItem { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so nit, I always prefer Maybe we should enforce one vs. other with lint rule. |
||
const payload = serializeMetricBuckets(metricBucketItems); | ||
const metricHeaders: StatsdItem[0] = { | ||
type: 'statsd', | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.