@@ -767,6 +767,12 @@ enum AggregateMetricsErrorKind {
767
767
/// A metric bucket had a too long string (metric name or a tag key/value).
768
768
#[ fail( display = "found invalid string" ) ]
769
769
InvalidStringLength ,
770
+ /// A metric bucket was rejected because the global bytes limit has already been reached.
771
+ #[ fail( display = "total metrics limit exceeded" ) ]
772
+ TotalLimitExceeded ,
773
+ /// A metric bucket was rejected because the per-project bytes limit has already been reached.
774
+ #[ fail( display = "project metrics limit exceeded" ) ]
775
+ ProjectLimitExceeded ,
770
776
}
771
777
772
778
#[ derive( Clone , Debug , PartialEq , Eq , Hash ) ]
@@ -863,6 +869,23 @@ pub struct AggregatorConfig {
863
869
///
864
870
/// Defaults to `200` bytes.
865
871
pub max_tag_value_length : usize ,
872
+
873
+ /// Maximum amount of bytes used for metrics aggregation.
874
+ ///
875
+ /// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory.
876
+ /// This is only an approximation and does not take into account things such as pre-allocation
877
+ /// in hashmaps.
878
+ ///
879
+ /// Defaults to `None`, i.e. no limit.
880
+ pub max_total_bucket_bytes : Option < usize > ,
881
+
882
+ /// Maximum amount of bytes used for metrics aggregation per project.
883
+ ///
884
+ /// Similar measuring technique to `max_total_bucket_bytes`, but instead of a
885
+ /// global/process-wide limit, it is enforced per project id.
886
+ ///
887
+ /// Defaults to `None`, i.e. no limit.
888
+ pub max_project_bucket_bytes : Option < usize > ,
866
889
}
867
890
868
891
impl AggregatorConfig {
@@ -962,6 +985,8 @@ impl Default for AggregatorConfig {
962
985
max_name_length : 200 ,
963
986
max_tag_key_length : 200 ,
964
987
max_tag_value_length : 200 ,
988
+ max_total_bucket_bytes : None ,
989
+ max_project_bucket_bytes : None ,
965
990
}
966
991
}
967
992
}
@@ -1050,6 +1075,23 @@ impl Message for FlushBuckets {
1050
1075
type Result = Result < ( ) , Vec < Bucket > > ;
1051
1076
}
1052
1077
1078
+ /// Check whether the aggregator has not (yet) exceeded its total limits. Used for healthchecks.
1079
+ pub struct AcceptsMetrics ;
1080
+
1081
+ impl Message for AcceptsMetrics {
1082
+ type Result = bool ;
1083
+ }
1084
+
1085
+ impl Handler < AcceptsMetrics > for Aggregator {
1086
+ type Result = bool ;
1087
+
1088
+ fn handle ( & mut self , _msg : AcceptsMetrics , _ctx : & mut Self :: Context ) -> Self :: Result {
1089
+ !self
1090
+ . cost_tracker
1091
+ . totals_cost_exceeded ( self . config . max_total_bucket_bytes )
1092
+ }
1093
+ }
1094
+
1053
1095
enum AggregatorState {
1054
1096
Running ,
1055
1097
ShuttingDown ,
@@ -1062,6 +1104,46 @@ struct CostTracker {
1062
1104
}
1063
1105
1064
1106
impl CostTracker {
1107
+ fn totals_cost_exceeded ( & self , max_total_cost : Option < usize > ) -> bool {
1108
+ if let Some ( max_total_cost) = max_total_cost {
1109
+ if self . total_cost >= max_total_cost {
1110
+ return true ;
1111
+ }
1112
+ }
1113
+
1114
+ false
1115
+ }
1116
+
1117
+ fn check_limits_exceeded (
1118
+ & self ,
1119
+ project_key : ProjectKey ,
1120
+ max_total_cost : Option < usize > ,
1121
+ max_project_cost : Option < usize > ,
1122
+ ) -> Result < ( ) , AggregateMetricsError > {
1123
+ if self . totals_cost_exceeded ( max_total_cost) {
1124
+ relay_log:: configure_scope ( |scope| {
1125
+ scope. set_extra ( "bucket.project_key" , project_key. as_str ( ) . to_owned ( ) . into ( ) ) ;
1126
+ } ) ;
1127
+ return Err ( AggregateMetricsErrorKind :: TotalLimitExceeded . into ( ) ) ;
1128
+ }
1129
+
1130
+ if let Some ( max_project_cost) = max_project_cost {
1131
+ let project_cost = self
1132
+ . cost_per_project_key
1133
+ . get ( & project_key)
1134
+ . cloned ( )
1135
+ . unwrap_or ( 0 ) ;
1136
+ if project_cost >= max_project_cost {
1137
+ relay_log:: configure_scope ( |scope| {
1138
+ scope. set_extra ( "bucket.project_key" , project_key. as_str ( ) . to_owned ( ) . into ( ) ) ;
1139
+ } ) ;
1140
+ return Err ( AggregateMetricsErrorKind :: ProjectLimitExceeded . into ( ) ) ;
1141
+ }
1142
+ }
1143
+
1144
+ Ok ( ( ) )
1145
+ }
1146
+
1065
1147
fn add_cost ( & mut self , project_key : ProjectKey , cost : usize ) {
1066
1148
self . total_cost += cost;
1067
1149
let project_cost = self . cost_per_project_key . entry ( project_key) . or_insert ( 0 ) ;
@@ -1291,6 +1373,38 @@ impl Aggregator {
1291
1373
1292
1374
let key = Self :: validate_bucket_key ( key, & self . config ) ?;
1293
1375
1376
+ // XXX: This is not a great implementation of cost enforcement.
1377
+ //
1378
+ // * it takes two lookups of the project key in the cost tracker to merge a bucket: once in
1379
+ // `check_limits_exceeded` and once in `add_cost`.
1380
+ //
1381
+ // * the limits are not actually enforced consistently
1382
+ //
1383
+ // A bucket can be merged that exceeds the cost limit, and only the next bucket will be
1384
+ // limited because the limit is now reached. This implementation was chosen because it's
1385
+ // currently not possible to determine cost accurately upfront: The bucket values have to
1386
+ // be merged together to figure out how costly the merge was. Changing that would force
1387
+ // us to unravel a lot of abstractions that we have already built.
1388
+ //
1389
+ // As a result of that, it is possible to exceed the bucket cost limit significantly
1390
+ // until we have guaranteed upper bounds on the cost of a single bucket (which we
1391
+ // currently don't, because a metric can have an arbitrary number of tag values).
1392
+ //
1393
+ // Another consequence is that a MergeValue that adds zero cost (such as an existing
1394
+ // counter bucket being incremented) is currently rejected even though it doesn't have to
1395
+ // be.
1396
+ //
1397
+ // The flipside of this approach is however that there's more optimization potential: If
1398
+ // the limit is already exceeded, we could implement an optimization that drops envelope
1399
+ // items before they are parsed, as we can be sure that the new metric bucket will be
1400
+ // rejected in the aggregator regardless of whether it is merged into existing buckets,
1401
+ // whether it is just a counter, etc.
1402
+ self . cost_tracker . check_limits_exceeded (
1403
+ project_key,
1404
+ self . config . max_total_bucket_bytes ,
1405
+ self . config . max_project_bucket_bytes ,
1406
+ ) ?;
1407
+
1294
1408
let added_cost;
1295
1409
match self . buckets . entry ( key) {
1296
1410
Entry :: Occupied ( mut entry) => {
@@ -1694,6 +1808,8 @@ mod tests {
1694
1808
max_name_length : 200 ,
1695
1809
max_tag_key_length : 200 ,
1696
1810
max_tag_value_length : 200 ,
1811
+ max_project_bucket_bytes : None ,
1812
+ max_total_bucket_bytes : None ,
1697
1813
}
1698
1814
}
1699
1815
@@ -2623,4 +2739,56 @@ mod tests {
2623
2739
. unwrap ( ) ;
2624
2740
assert_eq ! ( validation. tags. len( ) , 0 ) ;
2625
2741
}
2742
+
2743
/// With a total limit of one byte, the first bucket is still merged (limits are checked
/// before the cost is added) and the second insert is rejected with `TotalLimitExceeded`.
#[test]
fn test_aggregator_cost_enforcement_total() {
    let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

    let counter = Metric {
        name: "c:foo".to_owned(),
        unit: MetricUnit::None,
        value: MetricValue::Counter(42.),
        timestamp: UnixTimestamp::from_secs(999994711),
        tags: BTreeMap::new(),
    };

    let mut aggregator = Aggregator::new(
        AggregatorConfig {
            max_total_bucket_bytes: Some(1),
            ..test_config()
        },
        TestReceiver::start_default().recipient(),
    );

    // The first insert goes through because no cost has been tracked yet.
    aggregator.insert(project_key, counter.clone()).unwrap();

    let error = aggregator.insert(project_key, counter).unwrap_err();
    assert_eq!(error.kind, AggregateMetricsErrorKind::TotalLimitExceeded);
}
2768
+
2769
/// With a per-project limit of one byte, the first bucket is still merged (limits are
/// checked before the cost is added) and the second insert for the same project is rejected
/// with `ProjectLimitExceeded`.
#[test]
fn test_aggregator_cost_enforcement_project() {
    let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

    let counter = Metric {
        name: "c:foo".to_owned(),
        unit: MetricUnit::None,
        value: MetricValue::Counter(42.),
        timestamp: UnixTimestamp::from_secs(999994711),
        tags: BTreeMap::new(),
    };

    let mut aggregator = Aggregator::new(
        AggregatorConfig {
            max_project_bucket_bytes: Some(1),
            ..test_config()
        },
        TestReceiver::start_default().recipient(),
    );

    // The first insert goes through because the project has no tracked cost yet.
    aggregator.insert(project_key, counter.clone()).unwrap();

    let error = aggregator.insert(project_key, counter).unwrap_err();
    assert_eq!(error.kind, AggregateMetricsErrorKind::ProjectLimitExceeded);
}
2626
2794
}
0 commit comments