Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 499d998

Browse files
committed
implement LINC protocol
1 parent ba0b778 commit 499d998

File tree

11 files changed

+2097
-210
lines changed

11 files changed

+2097
-210
lines changed

Cargo.lock

Lines changed: 302 additions & 210 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqld/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ default-run = "sqld"
66

77
[dependencies]
88
anyhow = "1.0.66"
9+
async-bincode = "0.7.1"
910
async-lock = "2.6.0"
1011
async-trait = "0.1.58"
1112
axum = "0.6.18"
@@ -49,6 +50,7 @@ thiserror = "1.0.38"
4950
tokio = { version = "1.22.2", features = ["rt-multi-thread", "net", "io-std", "io-util", "time", "macros", "sync", "fs", "signal"] }
5051
tokio-stream = "0.1.11"
5152
tokio-tungstenite = "0.19"
53+
tokio-util = { version = "0.7.8", features = ["codec", "net"] }
5254
tonic = { version = "0.8.3", features = ["tls"] }
5355
tower = { version = "0.4.13", features = ["make"] }
5456
tower-http = { version = "0.3.5", features = ["compression-full", "cors", "trace"] }
@@ -67,6 +69,7 @@ url = "2.3"
6769
env_logger = "0.10"
6870
aws-config = "0.55"
6971
aws-sdk-s3 = "0.28"
72+
turmoil = "0.5.5"
7073

7174
[build-dependencies]
7275
prost-build = "0.11.4"

sqld/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ mod error;
3838
mod heartbeat;
3939
mod hrana;
4040
mod http;
41+
mod linc;
4142
mod query;
4243
mod query_analysis;
4344
mod query_result_builder;

sqld/src/linc/bus.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
use std::collections::{hash_map::Entry, HashMap};
2+
use std::sync::Arc;
3+
4+
use anyhow::anyhow;
5+
use parking_lot::Mutex;
6+
use tokio::sync::{mpsc, Notify};
7+
use uuid::Uuid;
8+
9+
use super::connection::{ConnectionHandle, Stream};
10+
11+
type NodeId = Uuid;
12+
type DatabaseId = Uuid;
13+
14+
#[must_use]
15+
pub struct Subscription {
16+
receiver: mpsc::Receiver<Stream>,
17+
bus: Bus,
18+
database_id: DatabaseId,
19+
}
20+
21+
impl Drop for Subscription {
22+
fn drop(&mut self) {
23+
self.bus
24+
.inner
25+
.lock()
26+
.subscriptions
27+
.remove(&self.database_id);
28+
}
29+
}
30+
31+
impl futures::Stream for Subscription {
32+
type Item = Stream;
33+
34+
fn poll_next(
35+
mut self: std::pin::Pin<&mut Self>,
36+
cx: &mut std::task::Context<'_>,
37+
) -> std::task::Poll<Option<Self::Item>> {
38+
self.receiver.poll_recv(cx)
39+
}
40+
}
41+
42+
#[derive(Clone)]
43+
pub struct Bus {
44+
inner: Arc<Mutex<BusInner>>,
45+
pub node_id: NodeId,
46+
}
47+
48+
enum ConnectionSlot {
49+
Handle(ConnectionHandle),
50+
// Interest in the connection when it becomes available
51+
Interest(Arc<Notify>),
52+
}
53+
54+
struct BusInner {
55+
connections: HashMap<NodeId, ConnectionSlot>,
56+
subscriptions: HashMap<DatabaseId, mpsc::Sender<Stream>>,
57+
}
58+
59+
impl Bus {
60+
pub fn new(node_id: NodeId) -> Self {
61+
Self {
62+
node_id,
63+
inner: Arc::new(Mutex::new(BusInner {
64+
connections: HashMap::new(),
65+
subscriptions: HashMap::new(),
66+
})),
67+
}
68+
}
69+
70+
/// open a new stream to the database at `database_id` on the node `node_id`
71+
pub async fn new_stream(
72+
&self,
73+
node_id: NodeId,
74+
database_id: DatabaseId,
75+
) -> anyhow::Result<Stream> {
76+
let get_conn = || {
77+
let mut lock = self.inner.lock();
78+
match lock.connections.entry(node_id) {
79+
Entry::Occupied(mut e) => match e.get_mut() {
80+
ConnectionSlot::Handle(h) => Ok(h.clone()),
81+
ConnectionSlot::Interest(notify) => Err(notify.clone()),
82+
},
83+
Entry::Vacant(e) => {
84+
let notify = Arc::new(Notify::new());
85+
e.insert(ConnectionSlot::Interest(notify.clone()));
86+
Err(notify)
87+
}
88+
}
89+
};
90+
91+
let conn = match get_conn() {
92+
Ok(conn) => conn,
93+
Err(notify) => {
94+
notify.notified().await;
95+
get_conn().map_err(|_| anyhow!("failed to create stream"))?
96+
}
97+
};
98+
99+
conn.new_stream(database_id).await
100+
}
101+
102+
/// Notify a subscription that new stream was openned
103+
pub async fn notify_subscription(
104+
&mut self,
105+
database_id: DatabaseId,
106+
stream: Stream,
107+
) -> anyhow::Result<()> {
108+
let maybe_sender = self.inner.lock().subscriptions.get(&database_id).cloned();
109+
110+
match maybe_sender {
111+
Some(sender) => {
112+
if sender.send(stream).await.is_err() {
113+
anyhow::bail!("subscription for {database_id} closed");
114+
}
115+
116+
Ok(())
117+
}
118+
None => {
119+
anyhow::bail!("no subscription for {database_id}")
120+
}
121+
}
122+
}
123+
124+
#[cfg(test)]
125+
pub fn is_empty(&self) -> bool {
126+
self.inner.lock().connections.is_empty()
127+
}
128+
129+
#[must_use]
130+
pub fn register_connection(&self, node_id: NodeId, conn: ConnectionHandle) -> Registration {
131+
let mut lock = self.inner.lock();
132+
match lock.connections.entry(node_id) {
133+
Entry::Occupied(mut e) => {
134+
if let ConnectionSlot::Interest(ref notify) = e.get() {
135+
notify.notify_waiters();
136+
}
137+
138+
*e.get_mut() = ConnectionSlot::Handle(conn);
139+
}
140+
Entry::Vacant(e) => {
141+
e.insert(ConnectionSlot::Handle(conn));
142+
}
143+
}
144+
145+
Registration {
146+
bus: self.clone(),
147+
node_id,
148+
}
149+
}
150+
151+
pub fn subscribe(&self, database_id: DatabaseId) -> anyhow::Result<Subscription> {
152+
let (sender, receiver) = mpsc::channel(1);
153+
{
154+
let mut inner = self.inner.lock();
155+
156+
if inner.subscriptions.contains_key(&database_id) {
157+
anyhow::bail!("a subscription already exist for that database");
158+
}
159+
160+
inner.subscriptions.insert(database_id, sender);
161+
}
162+
163+
Ok(Subscription {
164+
receiver,
165+
bus: self.clone(),
166+
database_id,
167+
})
168+
}
169+
}
170+
171+
pub struct Registration {
172+
bus: Bus,
173+
node_id: NodeId,
174+
}
175+
176+
impl Drop for Registration {
177+
fn drop(&mut self) {
178+
assert!(self
179+
.bus
180+
.inner
181+
.lock()
182+
.connections
183+
.remove(&self.node_id)
184+
.is_some());
185+
}
186+
}

0 commit comments

Comments
 (0)