Skip to content

Commit 8610b18

Browse files
authored
feat(server): add graceful::Watcher type (#182)
This allows creating an owned type that can be passed to another task before watching the connection. Closes #136
1 parent ab2c9cd commit 8610b18

File tree

1 file changed

+42
-6
lines changed

1 file changed

+42
-6
lines changed

src/server/graceful.rs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,20 @@ use pin_project_lite::pin_project;
1717
use tokio::sync::watch;
1818

1919
/// A graceful shutdown utility
20+
// Purposefully not `Clone`, see `watcher()` method for why.
2021
pub struct GracefulShutdown {
2122
tx: watch::Sender<()>,
2223
}
2324

25+
/// A watcher side of the graceful shutdown.
26+
///
27+
/// This type can only watch a connection, it cannot trigger a shutdown.
28+
///
29+
/// Call [`GracefulShutdown::watcher()`] to construct one of these.
30+
pub struct Watcher {
31+
rx: watch::Receiver<()>,
32+
}
33+
2434
impl GracefulShutdown {
2535
/// Create a new graceful shutdown helper.
2636
pub fn new() -> Self {
@@ -30,12 +40,20 @@ impl GracefulShutdown {
3040

3141
/// Wrap a future for graceful shutdown watching.
3242
pub fn watch<C: GracefulConnection>(&self, conn: C) -> impl Future<Output = C::Output> {
33-
let mut rx = self.tx.subscribe();
34-
GracefulConnectionFuture::new(conn, async move {
35-
let _ = rx.changed().await;
36-
// hold onto the rx until the watched future is completed
37-
rx
38-
})
43+
self.watcher().watch(conn)
44+
}
45+
46+
/// Create an owned type that can watch a connection.
47+
///
48+
/// This method allows created an owned type that can be sent onto another
49+
/// task before calling [`Watcher::watch()`].
50+
// Internal: this function exists because `Clone` allows footguns.
51+
// If the `tx` were cloned (or the `rx`), race conditions can happens where
52+
// one task starting a shutdown is scheduled and interwined with a task
53+
// starting to watch a connection, and the "watch version" is one behind.
54+
pub fn watcher(&self) -> Watcher {
55+
let rx = self.tx.subscribe();
56+
Watcher { rx }
3957
}
4058

4159
/// Signal shutdown for all watched connections.
@@ -64,6 +82,24 @@ impl Default for GracefulShutdown {
6482
}
6583
}
6684

85+
impl Watcher {
86+
/// Wrap a future for graceful shutdown watching.
87+
pub fn watch<C: GracefulConnection>(self, conn: C) -> impl Future<Output = C::Output> {
88+
let Watcher { mut rx } = self;
89+
GracefulConnectionFuture::new(conn, async move {
90+
let _ = rx.changed().await;
91+
// hold onto the rx until the watched future is completed
92+
rx
93+
})
94+
}
95+
}
96+
97+
impl Debug for Watcher {
98+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99+
f.debug_struct("GracefulWatcher").finish()
100+
}
101+
}
102+
67103
pin_project! {
68104
struct GracefulConnectionFuture<C, F: Future> {
69105
#[pin]

0 commit comments

Comments
 (0)