@@ -520,6 +520,21 @@ impl BucketValue {
520
520
Self :: Distribution ( m) => m. internal_size ( ) ,
521
521
}
522
522
}
523
+
524
+ /// Estimates the number of bytes needed to encode the bucket.
525
+ /// Note that this does not necessarily match the exact memory footprint of the bucket,
526
+ /// because datastructures might have a memory overhead.
527
+ ///
528
+ /// This is very similar to [`BucketValue::relative_size`], which can possibly be removed.
529
+ pub fn cost ( & self ) -> usize {
530
+ match self {
531
+ Self :: Counter ( _) => 8 ,
532
+ Self :: Set ( s) => 4 * s. len ( ) ,
533
+ Self :: Gauge ( _) => 5 * 8 ,
534
+ // Distribution values are stored as maps of (f64, u32) pairs
535
+ Self :: Distribution ( m) => 12 * m. internal_size ( ) ,
536
+ }
537
+ }
523
538
}
524
539
525
540
impl From < MetricValue > for BucketValue {
@@ -537,7 +552,7 @@ impl From<MetricValue> for BucketValue {
537
552
///
538
553
/// Currently either a [`MetricValue`] or another `BucketValue`.
539
554
trait MergeValue : Into < BucketValue > {
540
- /// Merges `self` into the given `bucket_value`.
555
+ /// Merges `self` into the given `bucket_value` and returns the additional cost for storing this value .
541
556
///
542
557
/// Aggregation is performed according to the rules documented in [`BucketValue`].
543
558
fn merge_into ( self , bucket_value : & mut BucketValue ) -> Result < ( ) , AggregateMetricsError > ;
@@ -1018,6 +1033,48 @@ enum AggregatorState {
1018
1033
ShuttingDown ,
1019
1034
}
1020
1035
1036
+ #[ derive( Debug , Default ) ]
1037
+ struct CostTracker {
1038
+ total_cost : usize ,
1039
+ // Choosing a BTreeMap instead of a HashMap here, under the assumption that a BTreeMap
1040
+ // is still more efficient for the number of project keys we store.
1041
+ cost_per_project_key : BTreeMap < ProjectKey , usize > ,
1042
+ }
1043
+
1044
+ impl CostTracker {
1045
+ fn add_cost ( & mut self , project_key : ProjectKey , cost : usize ) {
1046
+ self . total_cost += cost;
1047
+ let project_cost = self . cost_per_project_key . entry ( project_key) . or_insert ( 0 ) ;
1048
+ * project_cost += cost;
1049
+ }
1050
+
1051
+ fn subtract_cost ( & mut self , project_key : ProjectKey , cost : usize ) {
1052
+ match self . cost_per_project_key . entry ( project_key) {
1053
+ btree_map:: Entry :: Vacant ( _) => {
1054
+ relay_log:: error!(
1055
+ "Trying to subtract cost for a project key that has not been tracked"
1056
+ ) ;
1057
+ }
1058
+ btree_map:: Entry :: Occupied ( mut entry) => {
1059
+ // Handle per-project cost:
1060
+ let project_cost = entry. get_mut ( ) ;
1061
+ if cost > * project_cost {
1062
+ relay_log:: error!( "Subtracting a project cost higher than what we tracked" ) ;
1063
+ self . total_cost = self . total_cost . saturating_sub ( * project_cost) ;
1064
+ * project_cost = 0 ;
1065
+ } else {
1066
+ * project_cost -= cost;
1067
+ self . total_cost = self . total_cost . saturating_sub ( cost) ;
1068
+ }
1069
+ if * project_cost == 0 {
1070
+ // Remove this project_key from the map
1071
+ entry. remove ( ) ;
1072
+ }
1073
+ }
1074
+ } ;
1075
+ }
1076
+ }
1077
+
1021
1078
/// A collector of [`Metric`] submissions.
1022
1079
///
1023
1080
/// # Aggregation
@@ -1074,6 +1131,7 @@ pub struct Aggregator {
1074
1131
buckets : HashMap < BucketKey , QueuedBucket > ,
1075
1132
receiver : Recipient < FlushBuckets > ,
1076
1133
state : AggregatorState ,
1134
+ cost_tracker : CostTracker ,
1077
1135
}
1078
1136
1079
1137
impl Aggregator {
@@ -1087,6 +1145,7 @@ impl Aggregator {
1087
1145
buckets : HashMap :: new ( ) ,
1088
1146
receiver,
1089
1147
state : AggregatorState :: Running ,
1148
+ cost_tracker : CostTracker :: default ( ) ,
1090
1149
}
1091
1150
}
1092
1151
@@ -1200,14 +1259,19 @@ impl Aggregator {
1200
1259
1201
1260
let key = Self :: validate_bucket_key ( key, & self . config ) ?;
1202
1261
1262
+ let added_cost;
1203
1263
match self . buckets . entry ( key) {
1204
1264
Entry :: Occupied ( mut entry) => {
1205
1265
relay_statsd:: metric!(
1206
1266
counter( MetricCounters :: MergeHit ) += 1 ,
1207
1267
metric_type = entry. key( ) . metric_type. as_str( ) ,
1208
1268
metric_name = & entry. key( ) . metric_name
1209
1269
) ;
1210
- value. merge_into ( & mut entry. get_mut ( ) . value ) ?;
1270
+ let bucket_value = & mut entry. get_mut ( ) . value ;
1271
+ let cost_before = bucket_value. cost ( ) ;
1272
+ value. merge_into ( bucket_value) ?;
1273
+ let cost_after = bucket_value. cost ( ) ;
1274
+ added_cost = cost_after. saturating_sub ( cost_before) ;
1211
1275
}
1212
1276
Entry :: Vacant ( entry) => {
1213
1277
relay_statsd:: metric!(
@@ -1222,10 +1286,14 @@ impl Aggregator {
1222
1286
) ;
1223
1287
1224
1288
let flush_at = self . config . get_flush_time ( timestamp, project_key) ;
1225
- entry. insert ( QueuedBucket :: new ( flush_at, value. into ( ) ) ) ;
1289
+ let bucket = value. into ( ) ;
1290
+ added_cost = bucket. cost ( ) ;
1291
+ entry. insert ( QueuedBucket :: new ( flush_at, bucket) ) ;
1226
1292
}
1227
1293
}
1228
1294
1295
+ self . cost_tracker . add_cost ( project_key, added_cost) ;
1296
+
1229
1297
Ok ( ( ) )
1230
1298
}
1231
1299
@@ -1299,18 +1367,32 @@ impl Aggregator {
1299
1367
pub fn pop_flush_buckets ( & mut self ) -> HashMap < ProjectKey , Vec < Bucket > > {
1300
1368
relay_statsd:: metric!( gauge( MetricGauges :: Buckets ) = self . buckets. len( ) as u64 ) ;
1301
1369
1370
+ // We only emit statsd metrics for the cost on flush (and not when merging the buckets),
1371
+ // assuming that this gives us more than enough data points.
1372
+ relay_statsd:: metric!(
1373
+ gauge( MetricGauges :: BucketsCost ) = self . cost_tracker. total_cost as u64
1374
+ ) ;
1375
+ for cost in self . cost_tracker . cost_per_project_key . values ( ) {
1376
+ relay_statsd:: metric!(
1377
+ histogram( MetricHistograms :: BucketsCostPerProjectKey ) = * cost as f64
1378
+ ) ;
1379
+ }
1380
+
1302
1381
let mut buckets = HashMap :: < ProjectKey , Vec < Bucket > > :: new ( ) ;
1303
1382
1304
1383
let force = matches ! ( & self . state, AggregatorState :: ShuttingDown ) ;
1305
1384
1306
1385
relay_statsd:: metric!( timer( MetricTimers :: BucketsScanDuration ) , {
1307
1386
let bucket_interval = self . config. bucket_interval;
1387
+ let cost_tracker = & mut self . cost_tracker;
1308
1388
self . buckets. retain( |key, entry| {
1309
1389
if force || entry. elapsed( ) {
1310
1390
// Take the value and leave a placeholder behind. It'll be removed right after.
1311
1391
let value = std:: mem:: replace( & mut entry. value, BucketValue :: Counter ( 0.0 ) ) ;
1392
+ cost_tracker. subtract_cost( key. project_key, value. cost( ) ) ;
1312
1393
let bucket = Bucket :: from_parts( key. clone( ) , bucket_interval, value) ;
1313
1394
buckets. entry( key. project_key) . or_default( ) . push( bucket) ;
1395
+
1314
1396
false
1315
1397
} else {
1316
1398
true
@@ -1883,6 +1965,24 @@ mod tests {
1883
1965
) ;
1884
1966
}
1885
1967
1968
#[test]
fn test_bucket_value_cost() {
    // Counter: a single f64.
    assert_eq!(BucketValue::Counter(123.0).cost(), 8);

    // Set: 4 bytes per distinct u32 element (5 elements here).
    let set = BucketValue::Set(vec![1, 2, 3, 4, 5].into_iter().collect());
    assert_eq!(set.cost(), 20);

    // Distribution: 12 bytes per (f64, u32) pair (3 unique values here).
    let distribution = BucketValue::Distribution(dist![1., 2., 3.]);
    assert_eq!(distribution.cost(), 36);

    // Gauge: five 8-byte statistics regardless of how many values were merged.
    let gauge = BucketValue::Gauge(GaugeValue {
        max: 43.,
        min: 42.,
        sum: 85.,
        last: 43.,
        count: 2,
    });
    assert_eq!(gauge.cost(), 40);
}
1985
+
1886
1986
#[ test]
1887
1987
fn test_aggregator_merge_counters ( ) {
1888
1988
relay_test:: setup ( ) ;
@@ -2059,6 +2159,108 @@ mod tests {
2059
2159
assert_eq ! ( aggregator. buckets. len( ) , 2 ) ;
2060
2160
}
2061
2161
2162
// Exercises CostTracker directly: additions, subtractions, the error paths for
// unknown projects and over-subtraction, and removal of zeroed project entries.
// Uses inline insta snapshots, so the Debug output must match byte-for-byte.
#[test]
fn test_cost_tracker() {
    let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
    let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
    let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
    let mut cost_tracker = CostTracker::default();
    // Freshly constructed tracker is empty.
    insta::assert_debug_snapshot!(cost_tracker, @r###"
    CostTracker {
        total_cost: 0,
        cost_per_project_key: {},
    }
    "###);
    cost_tracker.add_cost(project_key1, 100);
    insta::assert_debug_snapshot!(cost_tracker, @r###"
    CostTracker {
        total_cost: 100,
        cost_per_project_key: {
            ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
        },
    }
    "###);
    cost_tracker.add_cost(project_key2, 200);
    // Total is the sum across both tracked projects.
    insta::assert_debug_snapshot!(cost_tracker, @r###"
    CostTracker {
        total_cost: 300,
        cost_per_project_key: {
            ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
            ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
        },
    }
    "###);
    // Unknown project: Will log error, but not crash
    cost_tracker.subtract_cost(project_key3, 666);
    // State is unchanged after the ignored subtraction.
    insta::assert_debug_snapshot!(cost_tracker, @r###"
    CostTracker {
        total_cost: 300,
        cost_per_project_key: {
            ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
            ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
        },
    }
    "###);
    // Subtract too much: Will log error, but not crash
    cost_tracker.subtract_cost(project_key1, 666);
    // The project's cost is clamped to zero and its (now empty) entry removed;
    // only the amount that was actually tracked (100) leaves the total.
    insta::assert_debug_snapshot!(cost_tracker, @r###"
    CostTracker {
        total_cost: 200,
        cost_per_project_key: {
            ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
        },
    }
    "###);
    cost_tracker.subtract_cost(project_key2, 20);
    // Partial subtraction keeps the entry with the reduced cost.
    insta::assert_debug_snapshot!(cost_tracker, @r###"
    CostTracker {
        total_cost: 180,
        cost_per_project_key: {
            ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 180,
        },
    }
    "###);
    cost_tracker.subtract_cost(project_key2, 180);
    // Subtracting the exact remainder drops the project entry entirely.
    insta::assert_debug_snapshot!(cost_tracker, @r###"
    CostTracker {
        total_cost: 0,
        cost_per_project_key: {},
    }
    "###);
}
2231
+
2232
#[test]
fn test_aggregator_cost_tracking() {
    // Make sure that the right cost is added / subtracted
    let receiver = TestReceiver::start_default().recipient();
    let mut aggregator = Aggregator::new(test_config(), receiver);
    let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

    let mut metric = Metric {
        name: "c:foo".to_owned(),
        unit: MetricUnit::None,
        value: MetricValue::Counter(42.),
        timestamp: UnixTimestamp::from_secs(999994711),
        tags: BTreeMap::new(),
    };

    // Each case inserts one value and states the expected running total cost:
    // counters and gauges have constant size; sets grow by 4 bytes per new
    // element; distributions grow by 12 bytes per new unique value.
    let expectations = [
        (MetricValue::Counter(42.), 8),
        (MetricValue::Counter(42.), 8),       // counters have constant size
        (MetricValue::Set(123), 12),          // 8 + 1*4
        (MetricValue::Set(123), 12),          // Same element in set, no change
        (MetricValue::Set(456), 16),          // Different element in set -> +4
        (MetricValue::Distribution(1.0), 28), // 1 unique element -> +12
        (MetricValue::Distribution(1.0), 28), // no new element
        (MetricValue::Distribution(2.0), 40), // 1 new element -> +12
        (MetricValue::Gauge(0.3), 80),
        (MetricValue::Gauge(0.2), 80),        // gauge has constant size
    ];
    for (metric_value, expected_total_cost) in expectations {
        metric.value = metric_value;
        aggregator.insert(project_key, metric.clone()).unwrap();
        assert_eq!(aggregator.cost_tracker.total_cost, expected_total_cost);
    }
}
2263
+
2062
2264
#[ test]
2063
2265
fn test_flush_bucket ( ) {
2064
2266
relay_test:: setup ( ) ;
0 commit comments