Skip to content

Expose historical bucket data via new accessors #1961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lightning/src/routing/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ impl fmt::Debug for NodeId {
write!(f, "NodeId({})", log_bytes!(self.0))
}
}
impl fmt::Display for NodeId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", log_bytes!(self.0))
}
}

impl core::hash::Hash for NodeId {
fn hash<H: core::hash::Hasher>(&self, hasher: &mut H) {
Expand Down
124 changes: 108 additions & 16 deletions lightning/src/routing/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,22 @@ struct HistoricalMinMaxBuckets<'a> {

impl HistoricalMinMaxBuckets<'_> {
#[inline]
fn calculate_success_probability_times_billion(&self, required_decays: u32, payment_amt_64th_bucket: u8) -> Option<u64> {
fn get_decayed_buckets<T: Time>(&self, now: T, last_updated: T, half_life: Duration)
-> ([u16; 8], [u16; 8], u32) {
let required_decays = now.duration_since(last_updated).as_secs()
.checked_div(half_life.as_secs())
.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
let mut min_buckets = *self.min_liquidity_offset_history;
min_buckets.time_decay_data(required_decays);
let mut max_buckets = *self.max_liquidity_offset_history;
max_buckets.time_decay_data(required_decays);
(min_buckets.buckets, max_buckets.buckets, required_decays)
}

#[inline]
fn calculate_success_probability_times_billion<T: Time>(
&self, now: T, last_updated: T, half_life: Duration, payment_amt_64th_bucket: u8)
-> Option<u64> {
// If historical penalties are enabled, calculate the penalty by walking the set of
// historical liquidity bucket (min, max) combinations (where min_idx < max_idx) and, for
// each, calculate the probability of success given our payment amount, then total the
Expand All @@ -619,23 +634,22 @@ impl HistoricalMinMaxBuckets<'_> {
// less than 1/16th of a channel's capacity, or 1/8th if we used the top of the bucket.
let mut total_valid_points_tracked = 0;

// Rather than actually decaying the individual buckets, which would lose precision, we
// simply track whether all buckets would be decayed to zero, in which case we treat it as
// if we had no data.
let mut is_fully_decayed = true;
let mut check_track_bucket_contains_undecayed_points =
|bucket_val: u16| if bucket_val.checked_shr(required_decays).unwrap_or(0) > 0 { is_fully_decayed = false; };
// Check if all our buckets are zero, once decayed and treat it as if we had no data. We
// don't actually use the decayed buckets, though, as that would lose precision.
let (decayed_min_buckets, decayed_max_buckets, required_decays) =
self.get_decayed_buckets(now, last_updated, half_life);
if decayed_min_buckets.iter().all(|v| *v == 0) || decayed_max_buckets.iter().all(|v| *v == 0) {
return None;
}

for (min_idx, min_bucket) in self.min_liquidity_offset_history.buckets.iter().enumerate() {
check_track_bucket_contains_undecayed_points(*min_bucket);
for max_bucket in self.max_liquidity_offset_history.buckets.iter().take(8 - min_idx) {
total_valid_points_tracked += (*min_bucket as u64) * (*max_bucket as u64);
check_track_bucket_contains_undecayed_points(*max_bucket);
}
}
// If the total valid points is smaller than 1.0 (i.e. 32 in our fixed-point scheme), treat
// it as if we were fully decayed.
if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 || is_fully_decayed {
if total_valid_points_tracked.checked_shr(required_decays).unwrap_or(0) < 32*32 {
return None;
}

Expand Down Expand Up @@ -717,15 +731,34 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
/// Note that this writes roughly one line per channel for which we have a liquidity estimate,
/// which may be a substantial amount of log output.
pub fn debug_log_liquidity_stats(&self) {
let now = T::now();

let graph = self.network_graph.read_only();
for (scid, liq) in self.channel_liquidities.iter() {
if let Some(chan_debug) = graph.channels().get(scid) {
let log_direction = |source, target| {
if let Some((directed_info, _)) = chan_debug.as_directed_to(target) {
let amt = directed_info.effective_capacity().as_msat();
let dir_liq = liq.as_directed(source, target, amt, &self.params);
log_debug!(self.logger, "Liquidity from {:?} to {:?} via {} is in the range ({}, {})",
source, target, scid, dir_liq.min_liquidity_msat(), dir_liq.max_liquidity_msat());

let buckets = HistoricalMinMaxBuckets {
min_liquidity_offset_history: &dir_liq.min_liquidity_offset_history,
max_liquidity_offset_history: &dir_liq.max_liquidity_offset_history,
};
let (min_buckets, max_buckets, _) = buckets.get_decayed_buckets(now,
*dir_liq.last_updated, self.params.historical_no_updates_half_life);

log_debug!(self.logger, core::concat!(
"Liquidity from {} to {} via {} is in the range ({}, {}).\n",
"\tHistorical min liquidity octile relative probabilities: {} {} {} {} {} {} {} {}\n",
"\tHistorical max liquidity octile relative probabilities: {} {} {} {} {} {} {} {}"),
source, target, scid, dir_liq.min_liquidity_msat(), dir_liq.max_liquidity_msat(),
min_buckets[0], min_buckets[1], min_buckets[2], min_buckets[3],
min_buckets[4], min_buckets[5], min_buckets[6], min_buckets[7],
// Note that the liquidity buckets are an offset from the edge, so we
// inverse the max order to get the probabilities from zero.
max_buckets[7], max_buckets[6], max_buckets[5], max_buckets[4],
max_buckets[3], max_buckets[2], max_buckets[1], max_buckets[0]);
} else {
log_debug!(self.logger, "No amount known for SCID {} from {:?} to {:?}", scid, source, target);
}
Expand Down Expand Up @@ -756,6 +789,53 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
None
}

/// Query the historical estimated minimum and maximum liquidity available for sending a
/// payment over the channel with `scid` towards the given `target` node.
///
/// Returns two sets of 8 buckets. The first set describes the octiles for lower-bound
/// liquidity estimates, the second set describes the octiles for upper-bound liquidity
/// estimates. Each bucket describes the relative frequency at which we've seen a liquidity
/// bound in the octile relative to the channel's total capacity, on an arbitrary scale.
/// Because the values are slowly decayed, more recent data points are weighted more heavily
/// than older datapoints.
///
/// When scoring, the estimated probability that an upper-/lower-bound lies in a given octile
/// relative to the channel's total capacity is calculated by dividing that bucket's value with
/// the total of all buckets for the given bound.
///
/// For example, a value of `[0, 0, 0, 0, 0, 0, 32]` indicates that we believe the probability
/// of a bound being in the top octile to be 100%, and have never (recently) seen it in any
/// other octiles. A value of `[31, 0, 0, 0, 0, 0, 0, 32]` indicates we've seen the bound being
/// both in the top and bottom octile, and roughly with similar (recent) frequency.
///
/// Because the datapoints are decayed slowly over time, values will eventually return to
/// `Some(([0; 8], [0; 8]))`.
pub fn historical_estimated_channel_liquidity_probabilities(&self, scid: u64, target: &NodeId)
-> Option<([u16; 8], [u16; 8])> {
let graph = self.network_graph.read_only();

if let Some(chan) = graph.channels().get(&scid) {
if let Some(liq) = self.channel_liquidities.get(&scid) {
if let Some((directed_info, source)) = chan.as_directed_to(target) {
let amt = directed_info.effective_capacity().as_msat();
let dir_liq = liq.as_directed(source, target, amt, &self.params);

let buckets = HistoricalMinMaxBuckets {
min_liquidity_offset_history: &dir_liq.min_liquidity_offset_history,
max_liquidity_offset_history: &dir_liq.max_liquidity_offset_history,
};
let (min_buckets, mut max_buckets, _) = buckets.get_decayed_buckets(T::now(),
*dir_liq.last_updated, self.params.historical_no_updates_half_life);
// Note that the liquidity buckets are an offset from the edge, so we inverse
// the max order to get the probabilities from zero.
max_buckets.reverse();
return Some((min_buckets, max_buckets));
}
}
}
None
}

/// Marks the node with the given `node_id` as banned, i.e.,
/// it will be avoided during path finding.
pub fn add_banned(&mut self, node_id: &NodeId) {
Expand Down Expand Up @@ -942,9 +1022,6 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,

if params.historical_liquidity_penalty_multiplier_msat != 0 ||
params.historical_liquidity_penalty_amount_multiplier_msat != 0 {
let required_decays = self.now.duration_since(*self.last_updated).as_secs()
.checked_div(params.historical_no_updates_half_life.as_secs())
.map_or(u32::max_value(), |decays| cmp::min(decays, u32::max_value() as u64) as u32);
let payment_amt_64th_bucket = amount_msat * 64 / self.capacity_msat;
debug_assert!(payment_amt_64th_bucket <= 64);
if payment_amt_64th_bucket > 64 { return res; }
Expand All @@ -954,7 +1031,9 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
max_liquidity_offset_history: &self.max_liquidity_offset_history,
};
if let Some(cumulative_success_prob_times_billion) = buckets
.calculate_success_probability_times_billion(required_decays, payment_amt_64th_bucket as u8) {
.calculate_success_probability_times_billion(self.now, *self.last_updated,
params.historical_no_updates_half_life, payment_amt_64th_bucket as u8)
{
let historical_negative_log10_times_2048 = approx::negative_log10_times_2048(cumulative_success_prob_times_billion + 1, 1024 * 1024 * 1024);
res = res.saturating_add(Self::combined_penalty_msat(amount_msat,
historical_negative_log10_times_2048, params.historical_liquidity_penalty_multiplier_msat,
Expand Down Expand Up @@ -2671,19 +2750,32 @@ mod tests {
};
// With no historical data the normal liquidity penalty calculation is used.
assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 47);
assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
None);

scorer.payment_path_failed(&payment_path_for_amount(1).iter().collect::<Vec<_>>(), 42);
assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 2048);
// The "it failed" increment is 32, where the probability should lie fully in the first
// octile.
assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
Some(([32, 0, 0, 0, 0, 0, 0, 0], [32, 0, 0, 0, 0, 0, 0, 0])));

// Even after we tell the scorer we definitely have enough available liquidity, it will
// still remember that there was some failure in the past, and assign a non-0 penalty.
scorer.payment_path_failed(&payment_path_for_amount(1000).iter().collect::<Vec<_>>(), 43);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doubt/confused, shouldn't we call payment_path_successful here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See payment_for_amount - if we say payment_path_failed at hop 43 (which is after 42) then we're telling the scorer "we were able to pay the required amount at hop 42". If we were to switch to payment_path_successful then we're telling the scorer "we were able to and did pay the required amount at hop 42, so the available liquidity is 1000 less than you think, cause we just pushed funds over it".

assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 198);
// The first octile should be decayed just slightly and the last octile has a new point.
assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
Some(([31, 0, 0, 0, 0, 0, 0, 32], [31, 0, 0, 0, 0, 0, 0, 32])));

// Advance the time forward 16 half-lives (which the docs claim will ensure all data is
// gone), and check that we're back to where we started.
SinceEpoch::advance(Duration::from_secs(10 * 16));
assert_eq!(scorer.channel_penalty_msat(42, &source, &target, usage), 47);
// Once fully decayed we still have data, but its all-0s. In the future we may remove the
// data entirely instead.
assert_eq!(scorer.historical_estimated_channel_liquidity_probabilities(42, &target),
Some(([0; 8], [0; 8])));
}

#[test]
Expand Down