Skip to content

Commit c5eab6e

Browse files
committed
Track history of where channel liquidities have been in the past
This introduces two new fields to the `ChannelLiquidity` struct - `min_liquidity_offset_history` and `max_liquidity_offset_history`, both an array of 8 `u16`s. Each entry represents the proportion of time that we spent with the min or max liquidity offset in the given 1/8th of the channel's liquidity range. ie the first bucket in `min_liquidity_offset_history` represents the proportion of time we've thought the channel's minimum liquidity is lower than 1/8th's the channel's capacity. Each bucket is stored, effectively, as a fixed-point number with 5 bits for the fractional part, which is incremented by one (ie 32) each time we update our liquidity estimates and decide our estimates are in that bucket. We then decay each bucket by 2047/2048. Thus, memory of a payment sticks around for more than 8,000 data points, though the majority of that memory decays after 1,387 data points.
1 parent b84b53a commit c5eab6e

File tree

2 files changed

+139
-21
lines changed

2 files changed

+139
-21
lines changed

lightning/src/routing/scoring.rs

Lines changed: 116 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ use util::time::Time;
6464
use prelude::*;
6565
use core::fmt;
6666
use core::cell::{RefCell, RefMut};
67+
use core::convert::TryInto;
6768
use core::ops::{Deref, DerefMut};
6869
use core::time::Duration;
6970
use io::{self, Read};
@@ -432,6 +433,48 @@ pub struct ProbabilisticScoringParameters {
432433
pub considered_impossible_penalty_msat: u64,
433434
}
434435

436+
/// Tracks the historical state of a distribution as a weighted average of how much time was spent
437+
/// in each of 8 buckets.
438+
#[derive(Clone, Copy)]
439+
struct HistoricalBucketRangeTracker {
440+
buckets: [u16; 8],
441+
}
442+
443+
impl HistoricalBucketRangeTracker {
444+
fn new() -> Self { Self { buckets: [0; 8] } }
445+
fn track_datapoint(&mut self, bucket_idx: u8) {
446+
// We have 8 leaky buckets for min and max liquidity. Each bucket tracks the amount of time
447+
// we spend in each bucket as a 16-bit fixed-point number with a 5 bit fractional part.
448+
//
449+
// Each time we update our liquidity estimate, we add 32 (1.0 in our fixed-point system) to
450+
// the buckets for the current min and max liquidity offset positions.
451+
//
452+
// We then decay each bucket by multiplying by 2047/2048 (avoiding dividing by a
453+
// non-power-of-two). This ensures we can't actually overflow the u16 - when we get to
454+
// 63,457 adding 32 and decaying by 2047/2048 leaves us back at 63,457.
455+
//
456+
// In total, this allows us to track data for the last 8,000 or so payments across a given
457+
// channel.
458+
//
459+
// These constants are a balance - we try to fit in 2 bytes per bucket to reduce overhead,
460+
// and need to balance having more bits in the decimal part (to ensure decay isn't too
461+
// non-linear) with having too few bits in the mantissa, causing us to not store very many
462+
// datapoints.
463+
//
464+
// The constants were picked experimentally, selecting a decay amount that restricts us
465+
// from overflowing buckets without having to cap them manually.
466+
debug_assert!(bucket_idx < 8);
467+
if bucket_idx < 8 {
468+
for e in self.buckets.iter_mut() {
469+
*e = ((*e as u32) * 2047 / 2048) as u16;
470+
}
471+
self.buckets[bucket_idx as usize] = self.buckets[bucket_idx as usize].saturating_add(32);
472+
}
473+
}
474+
}
475+
476+
impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
477+
435478
/// Accounting for channel liquidity balance uncertainty.
436479
///
437480
/// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
@@ -446,13 +489,18 @@ struct ChannelLiquidity<T: Time> {
446489

447490
/// Time when the liquidity bounds were last modified.
448491
last_updated: T,
492+
493+
min_liquidity_offset_history: HistoricalBucketRangeTracker,
494+
max_liquidity_offset_history: HistoricalBucketRangeTracker,
449495
}
450496

451497
/// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity and
452498
/// decayed with a given half life.
453-
struct DirectedChannelLiquidity<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> {
499+
struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> {
454500
min_liquidity_offset_msat: L,
455501
max_liquidity_offset_msat: L,
502+
min_liquidity_offset_history: BRT,
503+
max_liquidity_offset_history: BRT,
456504
capacity_msat: u64,
457505
last_updated: U,
458506
now: T,
@@ -593,6 +641,8 @@ impl<T: Time> ChannelLiquidity<T> {
593641
Self {
594642
min_liquidity_offset_msat: 0,
595643
max_liquidity_offset_msat: 0,
644+
min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
645+
max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
596646
last_updated: T::now(),
597647
}
598648
}
@@ -601,16 +651,21 @@ impl<T: Time> ChannelLiquidity<T> {
601651
/// `capacity_msat`.
602652
fn as_directed(
603653
&self, source: &NodeId, target: &NodeId, capacity_msat: u64, half_life: Duration
604-
) -> DirectedChannelLiquidity<&u64, T, &T> {
605-
let (min_liquidity_offset_msat, max_liquidity_offset_msat) = if source < target {
606-
(&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat)
607-
} else {
608-
(&self.max_liquidity_offset_msat, &self.min_liquidity_offset_msat)
609-
};
654+
) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, T, &T> {
655+
let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
656+
if source < target {
657+
(&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat,
658+
&self.min_liquidity_offset_history, &self.max_liquidity_offset_history)
659+
} else {
660+
(&self.max_liquidity_offset_msat, &self.min_liquidity_offset_msat,
661+
&self.max_liquidity_offset_history, &self.min_liquidity_offset_history)
662+
};
610663

611664
DirectedChannelLiquidity {
612665
min_liquidity_offset_msat,
613666
max_liquidity_offset_msat,
667+
min_liquidity_offset_history,
668+
max_liquidity_offset_history,
614669
capacity_msat,
615670
last_updated: &self.last_updated,
616671
now: T::now(),
@@ -622,16 +677,21 @@ impl<T: Time> ChannelLiquidity<T> {
622677
/// `capacity_msat`.
623678
fn as_directed_mut(
624679
&mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, half_life: Duration
625-
) -> DirectedChannelLiquidity<&mut u64, T, &mut T> {
626-
let (min_liquidity_offset_msat, max_liquidity_offset_msat) = if source < target {
627-
(&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat)
628-
} else {
629-
(&mut self.max_liquidity_offset_msat, &mut self.min_liquidity_offset_msat)
630-
};
680+
) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, T, &mut T> {
681+
let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
682+
if source < target {
683+
(&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat,
684+
&mut self.min_liquidity_offset_history, &mut self.max_liquidity_offset_history)
685+
} else {
686+
(&mut self.max_liquidity_offset_msat, &mut self.min_liquidity_offset_msat,
687+
&mut self.max_liquidity_offset_history, &mut self.min_liquidity_offset_history)
688+
};
631689

632690
DirectedChannelLiquidity {
633691
min_liquidity_offset_msat,
634692
max_liquidity_offset_msat,
693+
min_liquidity_offset_history,
694+
max_liquidity_offset_history,
635695
capacity_msat,
636696
last_updated: &mut self.last_updated,
637697
now: T::now(),
@@ -652,7 +712,7 @@ const PRECISION_LOWER_BOUND_DENOMINATOR: u64 = approx::LOWER_BITS_BOUND;
652712
const AMOUNT_PENALTY_DIVISOR: u64 = 1 << 20;
653713
const BASE_AMOUNT_PENALTY_DIVISOR: u64 = 1 << 30;
654714

655-
impl<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, T, U> {
715+
impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
656716
/// Returns a liquidity penalty for routing the given HTLC `amount_msat` through the channel in
657717
/// this direction.
658718
fn penalty_msat(&self, amount_msat: u64, params: &ProbabilisticScoringParameters) -> u64 {
@@ -722,7 +782,7 @@ impl<L: Deref<Target = u64>, T: Time, U: Deref<Target = T>> DirectedChannelLiqui
722782
}
723783
}
724784

725-
impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, T, U> {
785+
impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
726786
/// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`.
727787
fn failed_at_channel<Log: Deref>(&mut self, amount_msat: u64, chan_descr: fmt::Arguments, logger: &Log) where Log::Target: Logger {
728788
if amount_msat < self.max_liquidity_msat() {
@@ -750,6 +810,21 @@ impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChanne
750810
self.set_max_liquidity_msat(max_liquidity_msat);
751811
}
752812

813+
fn update_history_buckets(&mut self) {
814+
debug_assert!(*self.min_liquidity_offset_msat <= self.capacity_msat);
815+
self.min_liquidity_offset_history.track_datapoint(
816+
// Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset
817+
// is zero or the channel's capacity, though the second should generally never happen.
818+
(self.min_liquidity_offset_msat.saturating_sub(1) * 8 / self.capacity_msat)
819+
.try_into().unwrap_or(32)); // 32 is bogus for 8 buckets, and will be ignored
820+
debug_assert!(*self.max_liquidity_offset_msat <= self.capacity_msat);
821+
self.max_liquidity_offset_history.track_datapoint(
822+
// Ensure the bucket index we pass is in the range [0, 7], even if the liquidity offset
823+
// is zero or the channel's capacity, though the second should generally never happen.
824+
(self.max_liquidity_offset_msat.saturating_sub(1) * 8 / self.capacity_msat)
825+
.try_into().unwrap_or(32)); // 32 is bogus for 8 buckets, and will be ignored
826+
}
827+
753828
/// Adjusts the lower bound of the channel liquidity balance in this direction.
754829
fn set_min_liquidity_msat(&mut self, amount_msat: u64) {
755830
*self.min_liquidity_offset_msat = amount_msat;
@@ -759,6 +834,7 @@ impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChanne
759834
self.decayed_offset_msat(*self.max_liquidity_offset_msat)
760835
};
761836
*self.last_updated = self.now;
837+
self.update_history_buckets();
762838
}
763839

764840
/// Adjusts the upper bound of the channel liquidity balance in this direction.
@@ -770,6 +846,7 @@ impl<L: DerefMut<Target = u64>, T: Time, U: DerefMut<Target = T>> DirectedChanne
770846
self.decayed_offset_msat(*self.min_liquidity_offset_msat)
771847
};
772848
*self.last_updated = self.now;
849+
self.update_history_buckets();
773850
}
774851
}
775852

@@ -1236,7 +1313,9 @@ impl<T: Time> Writeable for ChannelLiquidity<T> {
12361313
let duration_since_epoch = T::duration_since_epoch() - self.last_updated.elapsed();
12371314
write_tlv_fields!(w, {
12381315
(0, self.min_liquidity_offset_msat, required),
1316+
(1, Some(self.min_liquidity_offset_history), option),
12391317
(2, self.max_liquidity_offset_msat, required),
1318+
(3, Some(self.max_liquidity_offset_history), option),
12401319
(4, duration_since_epoch, required),
12411320
});
12421321
Ok(())
@@ -1248,10 +1327,14 @@ impl<T: Time> Readable for ChannelLiquidity<T> {
12481327
fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
12491328
let mut min_liquidity_offset_msat = 0;
12501329
let mut max_liquidity_offset_msat = 0;
1330+
let mut min_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new());
1331+
let mut max_liquidity_offset_history = Some(HistoricalBucketRangeTracker::new());
12511332
let mut duration_since_epoch = Duration::from_secs(0);
12521333
read_tlv_fields!(r, {
12531334
(0, min_liquidity_offset_msat, required),
1335+
(1, min_liquidity_offset_history, option),
12541336
(2, max_liquidity_offset_msat, required),
1337+
(3, max_liquidity_offset_history, option),
12551338
(4, duration_since_epoch, required),
12561339
});
12571340
// On rust prior to 1.60 `Instant::duration_since` will panic if time goes backwards.
@@ -1269,14 +1352,16 @@ impl<T: Time> Readable for ChannelLiquidity<T> {
12691352
Ok(Self {
12701353
min_liquidity_offset_msat,
12711354
max_liquidity_offset_msat,
1355+
min_liquidity_offset_history: min_liquidity_offset_history.unwrap(),
1356+
max_liquidity_offset_history: max_liquidity_offset_history.unwrap(),
12721357
last_updated,
12731358
})
12741359
}
12751360
}
12761361

12771362
#[cfg(test)]
12781363
mod tests {
1279-
use super::{ChannelLiquidity, ProbabilisticScoringParameters, ProbabilisticScorerUsingTime};
1364+
use super::{ChannelLiquidity, HistoricalBucketRangeTracker, ProbabilisticScoringParameters, ProbabilisticScorerUsingTime};
12801365
use util::time::Time;
12811366
use util::time::tests::SinceEpoch;
12821367

@@ -1459,11 +1544,15 @@ mod tests {
14591544
let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
14601545
.with_channel(42,
14611546
ChannelLiquidity {
1462-
min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated
1547+
min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated,
1548+
min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
1549+
max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
14631550
})
14641551
.with_channel(43,
14651552
ChannelLiquidity {
1466-
min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated
1553+
min_liquidity_offset_msat: 700, max_liquidity_offset_msat: 100, last_updated,
1554+
min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
1555+
max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
14671556
});
14681557
let source = source_node_id();
14691558
let target = target_node_id();
@@ -1534,7 +1623,9 @@ mod tests {
15341623
let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
15351624
.with_channel(42,
15361625
ChannelLiquidity {
1537-
min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated
1626+
min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated,
1627+
min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
1628+
max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
15381629
});
15391630
let source = source_node_id();
15401631
let target = target_node_id();
@@ -1592,7 +1683,9 @@ mod tests {
15921683
let mut scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
15931684
.with_channel(42,
15941685
ChannelLiquidity {
1595-
min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated
1686+
min_liquidity_offset_msat: 200, max_liquidity_offset_msat: 400, last_updated,
1687+
min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
1688+
max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
15961689
});
15971690
let source = source_node_id();
15981691
let target = target_node_id();
@@ -1699,7 +1792,9 @@ mod tests {
16991792
let scorer = ProbabilisticScorer::new(params, &network_graph, &logger)
17001793
.with_channel(42,
17011794
ChannelLiquidity {
1702-
min_liquidity_offset_msat: 40, max_liquidity_offset_msat: 40, last_updated
1795+
min_liquidity_offset_msat: 40, max_liquidity_offset_msat: 40, last_updated,
1796+
min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
1797+
max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
17031798
});
17041799
let source = source_node_id();
17051800
let target = target_node_id();

lightning/src/util/ser.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,29 @@ impl_array!(PUBLIC_KEY_SIZE); // for PublicKey
515515
impl_array!(COMPACT_SIGNATURE_SIZE); // for Signature
516516
impl_array!(1300); // for OnionPacket.hop_data
517517

518+
impl Writeable for [u16; 8] {
519+
#[inline]
520+
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
521+
for v in self.iter() {
522+
w.write_all(&v.to_be_bytes())?
523+
}
524+
Ok(())
525+
}
526+
}
527+
528+
impl Readable for [u16; 8] {
529+
#[inline]
530+
fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
531+
let mut buf = [0u8; 16];
532+
r.read_exact(&mut buf)?;
533+
let mut res = [0u16; 8];
534+
for (idx, v) in res.iter_mut().enumerate() {
535+
*v = (buf[idx] as u16) << 8 | (buf[idx + 1] as u16)
536+
}
537+
Ok(res)
538+
}
539+
}
540+
518541
// HashMap
519542
impl<K, V> Writeable for HashMap<K, V>
520543
where K: Writeable + Eq + Hash,

0 commit comments

Comments
 (0)