Skip to content

Commit ed77335

Browse files
authored
database transaction support in typescript (#1847)
Adds support for transactions in runtimes/core and runtimes/js We should probably merge encoredev/rust-postgres#1 and update Cargo.toml again. (suggested that change upstream as well sfackler/rust-postgres#1230)
1 parent 3a6af6d commit ed77335

File tree

8 files changed

+370
-298
lines changed

8 files changed

+370
-298
lines changed

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ insta.opt-level = 3
99
lto = true
1010

1111
[patch.crates-io]
12-
tokio-postgres = { git = "https://github.com/encoredev/rust-postgres", branch = "proxy" }
13-
postgres-protocol = { git = "https://github.com/encoredev/rust-postgres", branch = "proxy" }
12+
tokio-postgres = { git = "https://github.com/encoredev/rust-postgres", branch = "encore-patches" }
13+
postgres-protocol = { git = "https://github.com/encoredev/rust-postgres", branch = "encore-patches" }
1414
swc_ecma_parser = { git = "https://github.com/encoredev/swc", branch = "node-resolve-exports" }
1515
swc_ecma_ast = { git = "https://github.com/encoredev/swc", branch = "node-resolve-exports" }
1616
swc_ecma_transforms_base = { git = "https://github.com/encoredev/swc", branch = "node-resolve-exports" }

docs/ts/primitives/databases.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,31 @@ await db.rawExec(
171171
);
172172
```
173173

174+
### Transactions
175+
176+
Transactions allow you to group multiple database operations into a single unit of work. If any operation within the transaction fails, the entire transaction is rolled back, ensuring data consistency.
177+
178+
The transaction type implements `AsyncDisposable`, which automatically rolls back the transaction if it is not explicitly committed or rolled back. This ensures that no open transactions are left accidentally.
179+
180+
For example:
181+
182+
```ts
183+
await using tx = await db.begin();
184+
185+
await db.exec`
186+
INSERT INTO todo_item (title, done)
187+
VALUES (${title1}, false)
188+
`;
189+
190+
await db.exec`
191+
INSERT INTO todo_item (title, done)
192+
VALUES (${title2}, false)
193+
`;
194+
195+
await tx.commit();
196+
```
197+
198+
174199
## Connecting to databases
175200

176201
It's often useful to be able to connect to the database from outside the backend application. For example for scripts, ad-hoc querying, or dumping data for analysis.

runtimes/core/src/sqldb/client.rs

Lines changed: 68 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use crate::sqldb::val::RowValue;
1212
use crate::trace::{protocol, Tracer};
1313
use crate::{model, sqldb};
1414

15+
use super::transaction::Transaction;
16+
1517
type Mgr = PostgresConnectionManager<postgres_native_tls::MakeTlsConnector>;
1618

1719
pub struct Pool {
@@ -99,6 +101,14 @@ impl Pool {
99101
tracer: self.tracer.clone(),
100102
})
101103
}
104+
105+
pub async fn begin(&self, source: Option<&model::Request>) -> Result<Transaction, Error> {
106+
let conn = self.pool.get_owned().await.map_err(|e| match e {
107+
RunError::User(err) => err,
108+
RunError::TimedOut => tokio_postgres::Error::__private_api_timeout(),
109+
})?;
110+
Transaction::begin(conn, self.tracer.clone(), source).await
111+
}
102112
}
103113

104114
pub struct Cursor {
@@ -135,37 +145,14 @@ impl Row {
135145
}
136146
}
137147

138-
type PooledConn =
148+
pub(crate) type PooledConn =
139149
PooledConnection<'static, PostgresConnectionManager<postgres_native_tls::MakeTlsConnector>>;
140150

141151
pub struct Connection {
142152
conn: tokio::sync::RwLock<Option<PooledConn>>,
143153
tracer: QueryTracer,
144154
}
145155

146-
#[derive(Debug)]
147-
pub enum Error {
148-
DB(tokio_postgres::Error),
149-
Closed,
150-
ConnectTimeout,
151-
}
152-
153-
impl std::fmt::Display for Error {
154-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155-
match self {
156-
Error::DB(err) => <tokio_postgres::Error as std::fmt::Display>::fmt(err, f),
157-
Error::Closed => f.write_str("connection_closed"),
158-
Error::ConnectTimeout => f.write_str("timeout establishing connection"),
159-
}
160-
}
161-
}
162-
163-
impl From<tokio_postgres::Error> for Error {
164-
fn from(err: tokio_postgres::Error) -> Self {
165-
Error::DB(err)
166-
}
167-
}
168-
169156
impl Connection {
170157
pub async fn close(&self) {
171158
let mut guard = self.conn.write().await;
@@ -197,11 +184,34 @@ impl Connection {
197184
}
198185
}
199186

187+
#[derive(Debug)]
188+
pub enum Error {
189+
DB(tokio_postgres::Error),
190+
Closed,
191+
ConnectTimeout,
192+
}
193+
194+
impl std::fmt::Display for Error {
195+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196+
match self {
197+
Error::DB(err) => <tokio_postgres::Error as std::fmt::Display>::fmt(err, f),
198+
Error::Closed => f.write_str("connection_closed"),
199+
Error::ConnectTimeout => f.write_str("timeout establishing connection"),
200+
}
201+
}
202+
}
203+
204+
impl From<tokio_postgres::Error> for Error {
205+
fn from(err: tokio_postgres::Error) -> Self {
206+
Error::DB(err)
207+
}
208+
}
209+
200210
#[derive(Debug, Clone)]
201-
struct QueryTracer(Tracer);
211+
pub(crate) struct QueryTracer(Tracer);
202212

203213
impl QueryTracer {
204-
async fn trace<F, Fut>(
214+
pub(crate) async fn trace<F, Fut>(
205215
&self,
206216
source: Option<&model::Request>,
207217
query: &str,
@@ -235,4 +245,36 @@ impl QueryTracer {
235245
stream: Box::pin(stream),
236246
})
237247
}
248+
249+
pub(crate) async fn trace_batch_execute<F, Fut>(
250+
&self,
251+
source: Option<&model::Request>,
252+
query: &str,
253+
exec: F,
254+
) -> Result<(), Error>
255+
where
256+
F: FnOnce() -> Fut,
257+
Fut: Future<Output = Result<(), Error>>,
258+
{
259+
let start_id = if let Some(source) = source {
260+
let id = self
261+
.0
262+
.db_query_start(protocol::DBQueryStartData { source, query });
263+
Some(id)
264+
} else {
265+
None
266+
};
267+
268+
let result = exec().await;
269+
270+
if let Some(start_id) = start_id {
271+
self.0.db_query_end(protocol::DBQueryEndData {
272+
start_id,
273+
source: source.unwrap(),
274+
error: result.as_ref().err(),
275+
});
276+
}
277+
278+
result
279+
}
238280
}

runtimes/core/src/sqldb/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
mod client;
22
mod manager;
3+
mod transaction;
34
mod val;
45

56
pub use client::{Connection, Cursor, Pool, Row};
67
pub use manager::{Database, DatabaseImpl, Manager, ManagerConfig};
8+
pub use transaction::Transaction;
79
pub use val::RowValue;
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use tokio_postgres::types::BorrowToSql;
2+
3+
use crate::model;
4+
5+
use super::{
6+
client::{Error, PooledConn, QueryTracer},
7+
Cursor,
8+
};
9+
10+
// Heavily inspired by rust-postgres, but where the transaction doesnt have a lifetime, so it can
11+
// be shared via napi-rs.
12+
//
13+
// https://github.com/sfackler/rust-postgres/blob/720ffe83216714bf9716a03122c547a2e8e9bfd9/tokio-postgres/src/transaction.rs
14+
15+
pub struct Transaction {
16+
conn: PooledConn,
17+
tracer: QueryTracer,
18+
done: bool,
19+
}
20+
21+
impl Transaction {
22+
pub(crate) async fn begin(
23+
conn: PooledConn,
24+
tracer: QueryTracer,
25+
source: Option<&model::Request>,
26+
) -> Result<Self, Error> {
27+
struct RollbackIfNotDone<'me> {
28+
client: &'me tokio_postgres::Client,
29+
done: bool,
30+
}
31+
32+
impl Drop for RollbackIfNotDone<'_> {
33+
fn drop(&mut self) {
34+
if self.done {
35+
return;
36+
}
37+
38+
self.client.__private_api_rollback(None);
39+
}
40+
}
41+
42+
// This is done, as `Future` created by this method can be dropped after
43+
// `RequestMessages` is synchronously send to the `Connection` by
44+
// `batch_execute()`, but before `Responses` is asynchronously polled to
45+
// completion. In that case `Transaction` won't be created and thus
46+
// won't be rolled back.
47+
{
48+
let mut cleaner = RollbackIfNotDone {
49+
client: &conn,
50+
done: false,
51+
};
52+
53+
tracer
54+
.trace_batch_execute(source, "BEGIN", || async {
55+
conn.batch_execute("BEGIN").await.map_err(Error::from)
56+
})
57+
.await?;
58+
59+
cleaner.done = true;
60+
}
61+
62+
Ok(Transaction {
63+
conn,
64+
tracer,
65+
done: false,
66+
})
67+
}
68+
69+
pub async fn commit(mut self, source: Option<&model::Request>) -> Result<(), Error> {
70+
self.done = true;
71+
self.batch_execute("COMMIT", source).await
72+
}
73+
74+
pub async fn rollback(mut self, source: Option<&model::Request>) -> Result<(), Error> {
75+
self.done = true;
76+
self.batch_execute("ROLLBACK", source).await
77+
}
78+
79+
async fn batch_execute(
80+
&self,
81+
query: &str,
82+
source: Option<&model::Request>,
83+
) -> Result<(), Error> {
84+
self.tracer
85+
.trace_batch_execute(source, query, || async {
86+
self.conn.batch_execute(query).await.map_err(Error::from)
87+
})
88+
.await
89+
}
90+
91+
pub async fn query_raw<P, I>(
92+
&self,
93+
query: &str,
94+
params: I,
95+
source: Option<&model::Request>,
96+
) -> Result<Cursor, Error>
97+
where
98+
P: BorrowToSql,
99+
I: IntoIterator<Item = P>,
100+
I::IntoIter: ExactSizeIterator,
101+
{
102+
self.tracer
103+
.trace(source, query, || async {
104+
self.conn
105+
.query_raw(query, params)
106+
.await
107+
.map_err(Error::from)
108+
})
109+
.await
110+
}
111+
}
112+
113+
impl Drop for Transaction {
114+
fn drop(&mut self) {
115+
if self.done {
116+
return;
117+
}
118+
119+
log::warn!("transaction not completed, forcing rollback");
120+
self.conn.__private_api_rollback(None);
121+
}
122+
}

0 commit comments

Comments
 (0)