Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit a922dc0

Browse files
committed
wip
1 parent 4d26807 commit a922dc0

File tree

13 files changed

+2380
-211
lines changed

13 files changed

+2380
-211
lines changed

Cargo.lock

Lines changed: 302 additions & 210 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/LINC.md

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
# Libsql Inter-Node Communication protocol: LINC protocol
2+
3+
## Overview
4+
5+
This document describes the version 1 of Libsql Inter-Node Communication (LINC)
6+
protocol.
7+
8+
The first version of the protocol aims to merge the existing two
9+
protocol (proxy and replication) into a single one, and adds support for multi-tenancy.
10+
11+
LINC v1 is designed to handle 3 tasks:
12+
- inter-node communication
13+
- database replication
14+
- proxying of request from replicas to primaries
15+
16+
LINC makes use of streams to multiplex messages between databases on different nodes.
17+
18+
LINC v1 is implemented on top of TCP.
19+
20+
LINC uses bincode for message serialization and deserialization.
21+
22+
## Connection protocol
23+
24+
Each node is identified by a `node_id`, and an address.
25+
At startup, a sqld node is configured with list of peers (`(node_id, node_addr)`). A connection between two peers is initiated by the peer with the greatest node_id.
26+
27+
```mermaid
28+
graph TD
29+
node4 --> node3
30+
node4 --> node2
31+
node4 --> node1
32+
node3 --> node2
33+
node3 --> node1
34+
node2 --> node1
35+
node1
36+
```
37+
38+
A new node node can be added to the cluster with no reconfiguration as long as its `node_id` is greater than all other `node_id` in the cluster and it has the address of all the other nodes. In this case, the new node will initiate a connection with all other nodes.
39+
40+
On disconnection, the initiator of the connection attempts to reconnect.
41+
42+
## Messages
43+
44+
```rust
45+
enum Message {
46+
/// Messages destined to a node
47+
Node(NodeMessage),
48+
/// message destined to a stream
49+
Stream {
50+
stream_id: StreamId,
51+
payload: StreamMessage,
52+
},
53+
}
54+
55+
enum NodeMessage {
56+
/// Initial message exchanged between nodes when connecting
57+
Handshake {
58+
protocol_version: String,
59+
node_id: String,
60+
},
61+
/// Request to open a bi-directional stream between the client and the server
62+
OpenStream {
63+
/// Id to give to the newly opened stream
64+
stream_id: StreamId,
65+
/// Id of the database to open the stream to.
66+
database_id: Uuid,
67+
},
68+
/// Close a previously opened stream
69+
CloseStream {
70+
id: StreamId,
71+
},
72+
/// Error type returned while handling a node message
73+
Error(NodeError),
74+
}
75+
76+
enum NodeError {
77+
UnknownStream(StreamId),
78+
HandshakeVersionMismatch { expected: u32 },
79+
StreamAlreadyExist(StreamId),
80+
UnknownDatabase(DatabaseId, StreamId),
81+
}
82+
83+
enum StreamMessage {
84+
/// Replication message between a replica and a primary
85+
Replication(ReplicationMessage),
86+
/// Proxy message between a replica and a primary
87+
Proxy(ProxyMessage),
88+
Error(StreamError),
89+
}
90+
91+
enum ReplicationMessage {
92+
HandshakeResponse {
93+
/// id of the replication log
94+
log_id: Uuid,
95+
/// current frame_no of the primary
96+
current_frame_no: u64,
97+
},
98+
/// Replication request
99+
Replicate {
100+
/// next frame no to send
101+
next_frame_no: u64,
102+
},
103+
/// a batch of frames that are part of the same transaction
104+
Transaction {
105+
/// if not None, then the last frame is a commit frame, and this is the new size of the database.
106+
size_after: Option<u32>,
107+
/// frame_no of the last frame in frames
108+
end_frame_no: u64
109+
/// a batch of frames part of the transaction.
110+
frames: Vec<Frame>
111+
},
112+
/// Error occurred handling a replication message
113+
Error(StreamError)
114+
}
115+
116+
struct Frame {
117+
/// Page id of that frame
118+
page_id: u32,
119+
/// Data
120+
data: Bytes,
121+
}
122+
123+
enum ProxyMessage {
124+
/// Proxy a query to a primary
125+
ProxyRequest {
126+
/// id of the connection to perform the query against
127+
/// If the connection doesn't already exist it is created
128+
/// Id of the request.
129+
/// Responses to this request must have the same id.
130+
connection_id: u32,
131+
req_id: u32,
132+
query: Query,
133+
},
134+
/// Response to a proxied query
135+
ProxyResponse {
136+
/// id of the request this message is a response to.
137+
req_id: u32,
138+
/// Collection of steps to drive the query builder transducer.
139+
row_step: [RowStep]
140+
},
141+
/// Stop processing request `id`.
142+
CancelRequest {
143+
req_id: u32,
144+
},
145+
/// Close Connection with passed id.
146+
CloseConnection {
147+
connection_id: u32,
148+
},
149+
}
150+
151+
/// Steps applied to the query builder transducer to build a response to a proxied query.
152+
/// Those types closely mirror those of the `QueryBuilderTrait`.
153+
enum BuilderStep {
154+
BeginStep,
155+
FinishStep(u64, Option<u64>),
156+
StepError(StepError),
157+
ColsDesc([Column]),
158+
BeginRows,
159+
BeginRow,
160+
AddRowValue(Value),
161+
FinishRow,
162+
FinishRos,
163+
Finish(ConnectionState)
164+
}
165+
166+
// State of the connection after a query was executed
167+
enum ConnectionState {
168+
/// The connection is still in a open transaction state
169+
OpenTxn,
170+
/// The connection is idle.
171+
Idle,
172+
}
173+
174+
struct Column {
175+
/// name of the column
176+
name: string,
177+
/// Declared type of the column, if any.
178+
decl_ty: Option<string>,
179+
}
180+
181+
/// for now, the stringified version of a sqld::error::Error.
182+
struct StepError(String);
183+
184+
enum StreamError {
185+
NotAPrimary,
186+
AlreadyReplicating,
187+
}
188+
```
189+
190+
## Node Handshake
191+
192+
When a node connects to another node, it first need to perform a handshake. The
193+
handshake is initialized by the initializer of the connection. It sends the
194+
following message:
195+
196+
```typescipt
197+
type NodeHandshake = {
198+
version: string, // protocol version
199+
node_id: string,
200+
}
201+
```
202+
203+
If a peer receives a connection from a peer with a id smaller than his, it must reject the handshake with a `IllegalConnection` error
204+
205+
## Streams
206+
207+
Messages destined to a particular database are sent as part of a stream. A
208+
stream is created by sending a `NodeMessage::OpenStream`, specifying the id of
209+
the stream to open, along with the id of the database for which to open this
210+
stream. If the requested database is not on the destination node, the
211+
destination node respond with a `NodeError::UnknownDatabase` error, and the stream in not
212+
opened.
213+
214+
If a node receives a message for a stream that was not opened before, it responds a `NodeError::UnknownStream`
215+
216+
A stream is closed by sending a `CloseStream` with the id of the stream. If the
217+
stream does not exist an `NodeError::UnknownStream` error is returned.
218+
219+
Streams can be opened by either peer. Each stream is identified with by `i32`
220+
stream id. The peer that initiated the original connection allocates positive
221+
stream ids, while the acceptor peer allocates negative ids. 0 is not a legal
222+
value for a stream_id. The receiver of a request for a stream with id 0 must
223+
close the connection immediately.
224+
225+
The peer opening a stream is responsible for sending the close message. The
226+
other peer can close the stream at any point, but must not send close message
227+
for that stream. On subsequent message to that stream, it will respond with an
228+
`UnknownStream` message, forcing the initiator to deal with recreating a
229+
stream if necessary.
230+
231+
## Sub-protocols
232+
233+
### Replication
234+
235+
The replica is responsible for initiating the replication protocol. This is
236+
done by opening a stream to a primary. If the destination of the stream is not a
237+
primary database, it responds with a `StreamError::NotAPrimary` error and immediately close
238+
the stream. If the destination database is a primary, it responds to the stream
239+
open request with a `ReplicationMessage::HandshakeResponse` message. This message informs the
240+
replica of the current log version, and of the primary current replication
241+
index (frame_no).
242+
243+
The replica compares the log version it received from the primary with the one it has, if any. If the
244+
versions don't match, the replica deletes its state and start replicating again from the start.
245+
246+
After a successful handshake, the replica sends a `ReplicationMessage::Replicate` message with the
247+
next frame_no it's expecting. For example if the replica has not replicated any
248+
frame yet, it sends `ReplicationMessage::Replicate { next_frame_no: 0 }` to
249+
signify to the primary that it's expecting to be sent frame 0. The primary
250+
sends the smallest frame with a `frame_no` satisfying `frame_no >=
251+
next_frame_no`. Because logs can be compacted, the next frame_no the primary
252+
sends to the replica isn't necessarily the one the replica is expecting. It's correct to send
253+
the smallest frame >= next_frame_no because frame_nos only move forward in the event of a compaction: a
254+
frame can only be missing if it was written too more recently, hence _moving
255+
forward_ in the log. The primary ensure consistency by moving commit points
256+
accordingly. It is an error for the primary to send a frame_no strictly less
257+
than the requested frame_no, frame_nos can be received in any order.
258+
259+
In the event of a disconnection, it is the replica's duty to re-initiate the replication protocol.
260+
261+
Sending a replicate request twice on the same stream is an error. If a primary
262+
receives more than a single `Replicate` request, it closes the stream and sends
263+
a `StreamError::AlreadyReplicating` request. The replica can re-open a stream and start
264+
replicating again if necessary.
265+
266+
### Proxy
267+
268+
Replicas can proxy queries to their primary. Replica can start sending proxy request after they have sent a replication request.
269+
270+
To proxy a query, a replica sends a `ProxyRequest`. Proxied query on a same connection are serialized. The replica sets the connection id
271+
and the request id for the proxied query. If no connection exists for the
272+
passed id on the primary, one is created. The query is executed on the primary,
273+
and the result rows are returned in `ProxyResponse`. The result rows can be split
274+
into multiple `ProxyResponse`, enabling row streaming. A replica can send a `CancelRequest` to interrupt a request. Any
275+
`ProxyResponse` for that `request_id` can be dropped by the replica, and the
276+
primary should stop sending any more `ProxyResponse` message upon receiving the
277+
cancel request. The primary must rollback a cancelled request.
278+
279+
The primary can reduce the amount of concurrent open transaction by closing the
280+
underlying SQLite connection for proxied connections that are not in a open
281+
transaction state (`is_autocommit` is true). Subsequent requests on that
282+
connection id will re-open a connection, if necessary.

sqld/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ default-run = "sqld"
66

77
[dependencies]
88
anyhow = "1.0.66"
9+
async-bincode = "0.7.1"
910
async-lock = "2.6.0"
1011
async-trait = "0.1.58"
1112
axum = "0.6.18"
@@ -49,6 +50,7 @@ thiserror = "1.0.38"
4950
tokio = { version = "1.22.2", features = ["rt-multi-thread", "net", "io-std", "io-util", "time", "macros", "sync", "fs", "signal"] }
5051
tokio-stream = "0.1.11"
5152
tokio-tungstenite = "0.19"
53+
tokio-util = { version = "0.7.8", features = ["codec", "net"] }
5254
tonic = { version = "0.8.3", features = ["tls"] }
5355
tower = { version = "0.4.13", features = ["make"] }
5456
tower-http = { version = "0.3.5", features = ["compression-full", "cors", "trace"] }
@@ -67,6 +69,7 @@ url = "2.3"
6769
env_logger = "0.10"
6870
aws-config = "0.55"
6971
aws-sdk-s3 = "0.28"
72+
turmoil = "0.5.5"
7073

7174
[build-dependencies]
7275
prost-build = "0.11.4"

sqld/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ mod error;
3838
mod heartbeat;
3939
mod hrana;
4040
mod http;
41+
mod linc;
4142
mod query;
4243
mod query_analysis;
4344
mod query_result_builder;

0 commit comments

Comments
 (0)