Skip to content

Commit 7f8eca7

Browse files
anonrig and AbhiPrasad
authored
feat: Add server runtime metrics aggregator (#9894)
## Implements - [x] 10 Second Bucketing: SDKs are required to bucket into 10 second intervals (rollup in seconds) which is the current lower bound of metric accuracy. - [x] Flush Shift: SDKs are required to shift the flush interval by random() * rollup_in_seconds. That shift is determined once per startup to create jittering. - [ ] Force flush: an SDK is required to perform force flushing ahead of scheduled time if the memory pressure is too high. There is no rule for this other than that SDKs should be tracking abstract aggregation complexity (eg: a counter only carries a single float, whereas a distribution is a float per emission). ## Caveats - Force flush requires Node.js 14+ support (FinalizationRegistry). I recommend deferring it until after the v8 release to make the implementation a lot easier. --------- Co-authored-by: Abhijeet Prasad <[email protected]>
1 parent 5d16aae commit 7f8eca7

20 files changed

+540
-137
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
"clean:deps": "lerna clean --yes && rm -rf node_modules && yarn",
1919
"clean:all": "run-s clean:build clean:caches clean:deps",
2020
"codecov": "codecov",
21-
"fix": "run-s fix:lerna fix:biome",
21+
"fix": "run-p fix:lerna fix:biome",
2222
"fix:lerna": "lerna run fix",
2323
"fix:biome": "biome check --apply .",
2424
"changelog": "ts-node ./scripts/get-commit-list.ts",
2525
"link:yarn": "lerna exec yarn link",
26-
"lint": "run-s lint:lerna lint:biome",
26+
"lint": "run-p lint:lerna lint:biome",
2727
"lint:lerna": "lerna run lint",
2828
"lint:biome": "biome check .",
2929
"validate:es5": "lerna run validate:es5",

packages/bun/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ export {
7070
startInactiveSpan,
7171
startSpanManual,
7272
continueTrace,
73+
metrics,
7374
} from '@sentry/core';
7475
export type { SpanStatusType } from '@sentry/core';
7576
export { autoDiscoverNodePerformanceMonitoringIntegrations } from '@sentry/node';

packages/core/src/baseclient.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ export abstract class BaseClient<O extends ClientOptions> implements Client<O> {
407407
* @inheritDoc
408408
*/
409409
public captureAggregateMetrics(metricBucketItems: Array<MetricBucketItem>): void {
410+
DEBUG_BUILD && logger.log(`Flushing aggregated metrics, number of metrics: ${metricBucketItems.length}`);
410411
const metricsEnvelope = createMetricEnvelope(
411412
metricBucketItems,
412413
this._dsn,
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import type {
2+
Client,
3+
ClientOptions,
4+
MeasurementUnit,
5+
MetricsAggregator as MetricsAggregatorBase,
6+
Primitive,
7+
} from '@sentry/types';
8+
import { timestampInSeconds } from '@sentry/utils';
9+
import { DEFAULT_FLUSH_INTERVAL, MAX_WEIGHT, NAME_AND_TAG_KEY_NORMALIZATION_REGEX } from './constants';
10+
import { METRIC_MAP } from './instance';
11+
import type { MetricBucket, MetricType } from './types';
12+
import { getBucketKey, sanitizeTags } from './utils';
13+
14+
/**
15+
* A metrics aggregator that aggregates metrics in memory and flushes them periodically.
16+
*/
17+
export class MetricsAggregator implements MetricsAggregatorBase {
18+
// TODO(@anonrig): Use FinalizationRegistry to have a proper way of flushing the buckets
19+
// when the aggregator is garbage collected.
20+
// Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry
21+
private _buckets: MetricBucket;
22+
23+
// Different metrics have different weights. We use this to limit the number of metrics
24+
// that we store in memory.
25+
private _bucketsTotalWeight;
26+
27+
private readonly _interval: ReturnType<typeof setInterval>;
28+
29+
// SDKs are required to shift the flush interval by random() * rollup_in_seconds.
30+
// That shift is determined once per startup to create jittering.
31+
private readonly _flushShift: number;
32+
33+
// An SDK is required to perform force flushing ahead of scheduled time if the memory
34+
// pressure is too high. There is no rule for this other than that SDKs should be tracking
35+
// abstract aggregation complexity (eg: a counter only carries a single float, whereas a
36+
// distribution is a float per emission).
37+
//
38+
// Force flush is used on either shutdown, flush() or when we exceed the max weight.
39+
private _forceFlush: boolean;
40+
41+
public constructor(private readonly _client: Client<ClientOptions>) {
42+
this._buckets = new Map();
43+
this._bucketsTotalWeight = 0;
44+
this._interval = setInterval(() => this._flush(), DEFAULT_FLUSH_INTERVAL);
45+
this._flushShift = Math.floor((Math.random() * DEFAULT_FLUSH_INTERVAL) / 1000);
46+
this._forceFlush = false;
47+
}
48+
49+
/**
50+
* @inheritDoc
51+
*/
52+
public add(
53+
metricType: MetricType,
54+
unsanitizedName: string,
55+
value: number | string,
56+
unit: MeasurementUnit = 'none',
57+
unsanitizedTags: Record<string, Primitive> = {},
58+
maybeFloatTimestamp = timestampInSeconds(),
59+
): void {
60+
const timestamp = Math.floor(maybeFloatTimestamp);
61+
const name = unsanitizedName.replace(NAME_AND_TAG_KEY_NORMALIZATION_REGEX, '_');
62+
const tags = sanitizeTags(unsanitizedTags);
63+
64+
const bucketKey = getBucketKey(metricType, name, unit, tags);
65+
let bucketItem = this._buckets.get(bucketKey);
66+
if (bucketItem) {
67+
bucketItem.metric.add(value);
68+
// TODO(abhi): Do we need this check?
69+
if (bucketItem.timestamp < timestamp) {
70+
bucketItem.timestamp = timestamp;
71+
}
72+
} else {
73+
bucketItem = {
74+
// @ts-expect-error we don't need to narrow down the type of value here, saves bundle size.
75+
metric: new METRIC_MAP[metricType](value),
76+
timestamp,
77+
metricType,
78+
name,
79+
unit,
80+
tags,
81+
};
82+
this._buckets.set(bucketKey, bucketItem);
83+
}
84+
85+
// We need to keep track of the total weight of the buckets so that we can
86+
// flush them when we exceed the max weight.
87+
this._bucketsTotalWeight += bucketItem.metric.weight;
88+
89+
if (this._bucketsTotalWeight >= MAX_WEIGHT) {
90+
this.flush();
91+
}
92+
}
93+
94+
/**
95+
* Flushes the current metrics to the transport via the transport.
96+
*/
97+
public flush(): void {
98+
this._forceFlush = true;
99+
this._flush();
100+
}
101+
102+
/**
103+
* Shuts down metrics aggregator and clears all metrics.
104+
*/
105+
public close(): void {
106+
this._forceFlush = true;
107+
clearInterval(this._interval);
108+
this._flush();
109+
}
110+
111+
/**
112+
* Flushes the buckets according to the internal state of the aggregator.
113+
* If it is a force flush (on shutdown, on an explicit `flush()`, or when the max weight is reached), it will flush all buckets.
114+
* Otherwise, it will only flush buckets that are older than the flush interval,
115+
* and according to the flush shift.
116+
*
117+
* This function mutates `_forceFlush` and `_bucketsTotalWeight` properties.
118+
*/
119+
private _flush(): void {
120+
// TODO(@anonrig): Add Atomics for locking to avoid having force flush and regular flush
121+
// running at the same time.
122+
// Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics
123+
124+
// This path eliminates the need for checking for timestamps since we're forcing a flush.
125+
// Remember to reset the flag, or it will always flush all metrics.
126+
if (this._forceFlush) {
127+
this._forceFlush = false;
128+
this._bucketsTotalWeight = 0;
129+
this._captureMetrics(this._buckets);
130+
this._buckets.clear();
131+
return;
132+
}
133+
const cutoffSeconds = Math.floor(timestampInSeconds()) - DEFAULT_FLUSH_INTERVAL / 1000 - this._flushShift;
134+
// TODO(@anonrig): Optimization opportunity.
135+
// Convert this map to an array and store key in the bucketItem.
136+
const flushedBuckets: MetricBucket = new Map();
137+
for (const [key, bucket] of this._buckets) {
138+
if (bucket.timestamp <= cutoffSeconds) {
139+
flushedBuckets.set(key, bucket);
140+
this._bucketsTotalWeight -= bucket.metric.weight;
141+
}
142+
}
143+
144+
for (const [key] of flushedBuckets) {
145+
this._buckets.delete(key);
146+
}
147+
148+
this._captureMetrics(flushedBuckets);
149+
}
150+
151+
/**
152+
* Captures all buckets passed to this function via the client, provided there is at least one bucket and the client supports `captureAggregateMetrics`.
153+
* @param flushedBuckets
154+
*/
155+
private _captureMetrics(flushedBuckets: MetricBucket): void {
156+
if (flushedBuckets.size > 0 && this._client.captureAggregateMetrics) {
157+
// TODO(@anonrig): Optimization opportunity.
158+
// This copy operation can be avoided if we store the key in the bucketItem.
159+
const buckets = Array.from(flushedBuckets).map(([, bucketItem]) => bucketItem);
160+
this._client.captureAggregateMetrics(buckets);
161+
}
162+
}
163+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import type {
2+
Client,
3+
ClientOptions,
4+
MeasurementUnit,
5+
MetricBucketItem,
6+
MetricsAggregator,
7+
Primitive,
8+
} from '@sentry/types';
9+
import { timestampInSeconds } from '@sentry/utils';
10+
import { DEFAULT_BROWSER_FLUSH_INTERVAL, NAME_AND_TAG_KEY_NORMALIZATION_REGEX } from './constants';
11+
import { METRIC_MAP } from './instance';
12+
import type { MetricBucket, MetricType } from './types';
13+
import { getBucketKey, sanitizeTags } from './utils';
14+
15+
/**
16+
* A simple metrics aggregator that aggregates metrics in memory and flushes them periodically.
17+
* Default flush interval is 5 seconds.
18+
*
19+
* @experimental This API is experimental and might change in the future.
20+
*/
21+
export class BrowserMetricsAggregator implements MetricsAggregator {
22+
// TODO(@anonrig): Use FinalizationRegistry to have a proper way of flushing the buckets
23+
// when the aggregator is garbage collected.
24+
// Ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry
25+
private _buckets: MetricBucket;
26+
private readonly _interval: ReturnType<typeof setInterval>;
27+
28+
public constructor(private readonly _client: Client<ClientOptions>) {
29+
this._buckets = new Map();
30+
this._interval = setInterval(() => this.flush(), DEFAULT_BROWSER_FLUSH_INTERVAL);
31+
}
32+
33+
/**
34+
* @inheritDoc
35+
*/
36+
public add(
37+
metricType: MetricType,
38+
unsanitizedName: string,
39+
value: number | string,
40+
unit: MeasurementUnit | undefined = 'none',
41+
unsanitizedTags: Record<string, Primitive> | undefined = {},
42+
maybeFloatTimestamp: number | undefined = timestampInSeconds(),
43+
): void {
44+
const timestamp = Math.floor(maybeFloatTimestamp);
45+
const name = unsanitizedName.replace(NAME_AND_TAG_KEY_NORMALIZATION_REGEX, '_');
46+
const tags = sanitizeTags(unsanitizedTags);
47+
48+
const bucketKey = getBucketKey(metricType, name, unit, tags);
49+
const bucketItem: MetricBucketItem | undefined = this._buckets.get(bucketKey);
50+
if (bucketItem) {
51+
bucketItem.metric.add(value);
52+
// TODO(abhi): Do we need this check?
53+
if (bucketItem.timestamp < timestamp) {
54+
bucketItem.timestamp = timestamp;
55+
}
56+
} else {
57+
this._buckets.set(bucketKey, {
58+
// @ts-expect-error we don't need to narrow down the type of value here, saves bundle size.
59+
metric: new METRIC_MAP[metricType](value),
60+
timestamp,
61+
metricType,
62+
name,
63+
unit,
64+
tags,
65+
});
66+
}
67+
}
68+
69+
/**
70+
* @inheritDoc
71+
*/
72+
public flush(): void {
73+
// short circuit if buckets are empty.
74+
if (this._buckets.size === 0) {
75+
return;
76+
}
77+
if (this._client.captureAggregateMetrics) {
78+
// TODO(@anonrig): Use Object.values() when we support ES6+
79+
const metricBuckets = Array.from(this._buckets).map(([, bucketItem]) => bucketItem);
80+
this._client.captureAggregateMetrics(metricBuckets);
81+
}
82+
this._buckets.clear();
83+
}
84+
85+
/**
86+
* @inheritDoc
87+
*/
88+
public close(): void {
89+
clearInterval(this._interval);
90+
this.flush();
91+
}
92+
}

packages/core/src/metrics/constants.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,15 @@ export const TAG_VALUE_NORMALIZATION_REGEX = /[^\w\d_:/@.{}[\]$-]+/g;
2727
* This does not match spec in https://develop.sentry.dev/sdk/metrics
2828
* but was chosen to optimize for the most common case in browser environments.
2929
*/
30-
export const DEFAULT_FLUSH_INTERVAL = 5000;
30+
export const DEFAULT_BROWSER_FLUSH_INTERVAL = 5000;
31+
32+
/**
33+
* SDKs are required to bucket into 10 second intervals (rollup in seconds)
34+
* which is the current lower bound of metric accuracy.
35+
*/
36+
export const DEFAULT_FLUSH_INTERVAL = 10000;
37+
38+
/**
39+
* The maximum number of metrics that should be stored in memory.
40+
*/
41+
export const MAX_WEIGHT = 10000;

packages/core/src/metrics/envelope.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export function createMetricEnvelope(
3030
return createEnvelope<StatsdEnvelope>(headers, [item]);
3131
}
3232

33-
function createMetricEnvelopeItem(metricBucketItems: Array<MetricBucketItem>): StatsdItem {
33+
function createMetricEnvelopeItem(metricBucketItems: MetricBucketItem[]): StatsdItem {
3434
const payload = serializeMetricBuckets(metricBucketItems);
3535
const metricHeaders: StatsdItem[0] = {
3636
type: 'statsd',

packages/core/src/metrics/exports.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ function addToMetricsAggregator(
1717
metricType: MetricType,
1818
name: string,
1919
value: number | string,
20-
data: MetricData = {},
20+
data: MetricData | undefined = {},
2121
): void {
2222
const client = getClient<BaseClient<ClientOptions>>();
2323
const scope = getCurrentScope();
@@ -49,7 +49,7 @@ function addToMetricsAggregator(
4949
/**
5050
* Adds a value to a counter metric
5151
*
52-
* @experimental This API is experimental and might having breaking changes in the future.
52+
* @experimental This API is experimental and might have breaking changes in the future.
5353
*/
5454
export function increment(name: string, value: number = 1, data?: MetricData): void {
5555
addToMetricsAggregator(COUNTER_METRIC_TYPE, name, value, data);
@@ -58,7 +58,7 @@ export function increment(name: string, value: number = 1, data?: MetricData): v
5858
/**
5959
* Adds a value to a distribution metric
6060
*
61-
* @experimental This API is experimental and might having breaking changes in the future.
61+
* @experimental This API is experimental and might have breaking changes in the future.
6262
*/
6363
export function distribution(name: string, value: number, data?: MetricData): void {
6464
addToMetricsAggregator(DISTRIBUTION_METRIC_TYPE, name, value, data);
@@ -67,7 +67,7 @@ export function distribution(name: string, value: number, data?: MetricData): vo
6767
/**
6868
* Adds a value to a set metric. Value must be a string or integer.
6969
*
70-
* @experimental This API is experimental and might having breaking changes in the future.
70+
* @experimental This API is experimental and might have breaking changes in the future.
7171
*/
7272
export function set(name: string, value: number | string, data?: MetricData): void {
7373
addToMetricsAggregator(SET_METRIC_TYPE, name, value, data);
@@ -76,7 +76,7 @@ export function set(name: string, value: number | string, data?: MetricData): vo
7676
/**
7777
* Adds a value to a gauge metric
7878
*
79-
* @experimental This API is experimental and might having breaking changes in the future.
79+
* @experimental This API is experimental and might have breaking changes in the future.
8080
*/
8181
export function gauge(name: string, value: number, data?: MetricData): void {
8282
addToMetricsAggregator(GAUGE_METRIC_TYPE, name, value, data);

0 commit comments

Comments
 (0)