Skip to content

Commit dca5d39

Browse files
committed
Introduce graph sync crate for fast-forwarding through gossip data downloaded from a server.
1 parent ad819ea commit dca5d39

File tree

8 files changed

+216
-6
lines changed

8 files changed

+216
-6
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ members = [
77
"lightning-net-tokio",
88
"lightning-persister",
99
"lightning-background-processor",
10+
"lightning-graph-sync"
1011
]
1112

1213
# Our tests do actual crypo and lots of work, the tradeoff for -O1 is well worth it.

lightning-block-sync/src/http.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
9797
}
9898

9999
/// Client for making HTTP requests.
100-
pub(crate) struct HttpClient {
100+
pub struct HttpClient {
101101
address: SocketAddr,
102102
stream: TcpStream,
103103
}

lightning-graph-sync/Cargo.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "lightning-graph-sync"
3+
version = "0.0.101"
4+
authors = ["Arik Sosman <[email protected]>"]
5+
license = "MIT OR Apache-2.0"
6+
repository = "http://github.com/rust-bitcoin/rust-lightning"
7+
edition = "2018"
8+
description = """
9+
Utility to fetch gossip routing data from LNSync or LNSync-like server
10+
"""
11+
12+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
13+
14+
[dependencies]
15+
lightning = { version = "0.0.101", path = "../lightning" }
16+
lightning-block-sync = { version = "0.0.101", path = "../lightning-block-sync", features = ["rest-client"] }
17+
tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true }
18+
19+
[dev-dependencies]
20+
bitcoin = { version = "0.27", default-features = false, features = ["secp-recovery"] }
21+
tokio = { version = "1.0", features = [ "macros", "rt" ] }

lightning-graph-sync/src/error.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use lightning::ln::msgs::{DecodeError, LightningError};
2+
3+
pub enum GraphSyncError{
4+
IOError(std::io::Error),
5+
DecodeError(DecodeError),
6+
LightningError(LightningError),
7+
ProcessingError(String)
8+
}
9+
10+
impl From<std::io::Error> for GraphSyncError {
11+
fn from(error: std::io::Error) -> Self {
12+
Self::IOError(error)
13+
}
14+
}
15+
16+
impl From<DecodeError> for GraphSyncError {
17+
fn from(error: DecodeError) -> Self {
18+
Self::DecodeError(error)
19+
}
20+
}
21+
22+
impl From<LightningError> for GraphSyncError {
23+
fn from(error: LightningError) -> Self {
24+
Self::LightningError(error)
25+
}
26+
}

lightning-graph-sync/src/lib.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use core::ops::Deref;
2+
3+
use lightning::chain;
4+
use lightning::routing::network_graph;
5+
use lightning_block_sync::http::*;
6+
7+
use crate::error::GraphSyncError;
8+
9+
pub mod error;
10+
pub mod processing;
11+
12+
pub async fn sync_network_graph<CS: Deref>(
13+
network_graph: &network_graph::NetworkGraph,
14+
sync_host: &str,
15+
sync_port: u16,
16+
sync_path: &str,
17+
chain_source: &Option<CS>,
18+
) -> Result<(), GraphSyncError>
19+
where
20+
CS::Target: chain::Access,
21+
{
22+
// make sure there is precisely one leading slash
23+
let canonical_path = format!("/{}", sync_path.trim_start_matches("/"));
24+
println!("Attempting sync from: http://{}:{}{}", sync_host, sync_port, canonical_path);
25+
let http_endpoint = HttpEndpoint::for_host(sync_host.into()).with_port(sync_port);
26+
let http_client_result = HttpClient::connect(&http_endpoint);
27+
let mut http_client = http_client_result?;
28+
29+
let response_result = http_client
30+
.get::<BinaryResponse>(&canonical_path, sync_host)
31+
.await;
32+
33+
let response = response_result?;
34+
let response_bytes = response.0;
35+
36+
processing::update_network_graph(&network_graph, &response_bytes[..], &chain_source)
37+
}
38+
39+
#[cfg(test)]
40+
mod tests {
41+
use bitcoin::blockdata::constants::genesis_block;
42+
use bitcoin::Network;
43+
44+
use lightning::routing::network_graph::NetworkGraph;
45+
use crate::error::GraphSyncError;
46+
47+
use crate::sync_network_graph;
48+
49+
#[test]
50+
fn it_works() {
51+
assert_eq!(2 + 2, 4);
52+
}
53+
54+
#[tokio::test]
55+
async fn test_http_request() {
56+
let block_hash = genesis_block(Network::Bitcoin).block_hash();
57+
let network_graph = NetworkGraph::new(block_hash);
58+
59+
let chain_source: Option<Box<dyn lightning::chain::Access>> = None;
60+
61+
let sync_result = sync_network_graph(
62+
&network_graph,
63+
"localhost",
64+
3000,
65+
"delta/500",
66+
&chain_source,
67+
).await;
68+
69+
if !sync_result.is_ok() {
70+
let error_string = match sync_result.as_ref().unwrap_err() {
71+
GraphSyncError::IOError(error) => {
72+
error.to_string()
73+
}
74+
GraphSyncError::DecodeError(error) => {
75+
error.to_string()
76+
}
77+
GraphSyncError::LightningError(error) => {
78+
(&error.err).to_string()
79+
}
80+
GraphSyncError::ProcessingError(error) => {
81+
error.to_string()
82+
}
83+
};
84+
println!("Error: {}", error_string);
85+
}
86+
87+
assert!(sync_result.is_ok())
88+
}
89+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use core::ops::Deref;
2+
use std::io;
3+
use std::io::Read;
4+
use lightning::chain;
5+
6+
use lightning::ln::msgs;
7+
use lightning::ln::wire::encode::Encode;
8+
use lightning::routing::network_graph;
9+
use lightning::util::ser::{BigSize, FixedLengthReader, Readable};
10+
11+
use crate::error::GraphSyncError;
12+
13+
pub fn update_network_graph<CS: Deref>(network_graph: &network_graph::NetworkGraph, update_data: &[u8], chain_source: &Option<CS>) -> Result<(), GraphSyncError> where CS::Target: chain::Access {
14+
let total_input_length = update_data.len() as u64;
15+
let mut read_cursor = io::Cursor::new(update_data);
16+
17+
let mut prefix = Vec::with_capacity(4);
18+
prefix.resize(4, 0);
19+
read_cursor.read_exact(&mut prefix)?;
20+
21+
let _deserialize_unsigned = match &prefix[..] {
22+
&[71, 83, 80, 1] => {
23+
return Err(GraphSyncError::ProcessingError("Signed deserialization not supported".to_string()));
24+
}
25+
&[76, 68, 75, 2] => true,
26+
_ => {
27+
return Err(GraphSyncError::ProcessingError("Prefix must equal 'LDK' and the byte 2".to_string()));
28+
}
29+
};
30+
31+
loop {
32+
let message_length: BigSize = Readable::read(&mut read_cursor)?;
33+
let mut restricted_cursor = FixedLengthReader::new(&mut read_cursor, message_length.0);
34+
35+
let message_type: u16 = Readable::read(&mut restricted_cursor)?;
36+
37+
let update_result = match message_type {
38+
msgs::ChannelAnnouncement::TYPE => {
39+
let channel_announcement: msgs::UnsignedChannelAnnouncement =
40+
Readable::read(&mut restricted_cursor).unwrap();
41+
network_graph
42+
.update_channel_from_unsigned_announcement(&channel_announcement, &chain_source)
43+
}
44+
msgs::NodeAnnouncement::TYPE => {
45+
let node_announcement: msgs::UnsignedNodeAnnouncement =
46+
Readable::read(&mut restricted_cursor)?;
47+
network_graph.update_node_from_unsigned_announcement(&node_announcement)
48+
}
49+
msgs::ChannelUpdate::TYPE => {
50+
let channel_update: msgs::UnsignedChannelUpdate =
51+
Readable::read(&mut restricted_cursor)?;
52+
network_graph.update_channel_unsigned(&channel_update)
53+
}
54+
_ => {
55+
let error_string = format!("Unexpected graph sync message type: {}", message_type);
56+
return Err(GraphSyncError::ProcessingError(error_string));
57+
}
58+
};
59+
60+
// propagate the error
61+
let _update = update_result?;
62+
63+
if read_cursor.position() == total_input_length {
64+
break;
65+
}
66+
}
67+
68+
Ok(())
69+
}

lightning/src/ln/wire.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub trait CustomMessageReader {
3232
/// variant contains a message from [`msgs`] or otherwise the message type if unknown.
3333
#[allow(missing_docs)]
3434
#[derive(Debug)]
35-
pub(crate) enum Message<T> where T: core::fmt::Debug + Type {
35+
pub enum Message<T> where T: core::fmt::Debug + Type {
3636
Init(msgs::Init),
3737
Error(msgs::ErrorMessage),
3838
Ping(msgs::Ping),
@@ -117,7 +117,7 @@ impl<T> Message<T> where T: core::fmt::Debug + Type {
117117
/// # Errors
118118
///
119119
/// Returns an error if the message payload code not be decoded as the specified type.
120-
pub(crate) fn read<R: io::Read, T, H: core::ops::Deref>(
120+
pub fn read<R: io::Read, T, H: core::ops::Deref>(
121121
buffer: &mut R,
122122
custom_reader: H,
123123
) -> Result<Message<T>, msgs::DecodeError>
@@ -232,7 +232,8 @@ pub(crate) fn write<M: Type + Writeable, W: Writer>(message: &M, buffer: &mut W)
232232
message.write(buffer)
233233
}
234234

235-
mod encode {
235+
/// Publish message types
236+
pub mod encode {
236237
/// Defines a constant type identifier for reading messages from the wire.
237238
pub trait Encode {
238239
/// The type identifying the message payload.

lightning/src/util/ser.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,22 +90,25 @@ impl Writer for LengthCalculatingWriter {
9090

9191
/// Essentially std::io::Take but a bit simpler and with a method to walk the underlying stream
9292
/// forward to ensure we always consume exactly the fixed length specified.
93-
pub(crate) struct FixedLengthReader<R: Read> {
93+
pub struct FixedLengthReader<R: Read> {
9494
read: R,
9595
bytes_read: u64,
9696
total_bytes: u64,
9797
}
9898
impl<R: Read> FixedLengthReader<R> {
99+
/// Create new fixed-length-reader
99100
pub fn new(read: R, total_bytes: u64) -> Self {
100101
Self { read, bytes_read: 0, total_bytes }
101102
}
102103

103104
#[inline]
105+
/// How many bytes remain in the FLR?
104106
pub fn bytes_remain(&mut self) -> bool {
105107
self.bytes_read != self.total_bytes
106108
}
107109

108110
#[inline]
111+
/// Consume remaining bytes, moving cursor to the end
109112
pub fn eat_remaining(&mut self) -> Result<(), DecodeError> {
110113
copy(self, &mut sink()).unwrap();
111114
if self.bytes_read != self.total_bytes {
@@ -299,7 +302,7 @@ impl Readable for U48 {
299302
/// encoded in several different ways, which we must check for at deserialization-time. Thus, if
300303
/// you're looking for an example of a variable-length integer to use for your own project, move
301304
/// along, this is a rather poor design.
302-
pub(crate) struct BigSize(pub u64);
305+
pub struct BigSize(pub u64);
303306
impl Writeable for BigSize {
304307
#[inline]
305308
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {

0 commit comments

Comments
 (0)