Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

alloc response message #570

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libsqlx-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bincode = "1.3.3"
bytemuck = { version = "1.13.1", features = ["derive"] }
bytes = { version = "1.4.0", features = ["serde"] }
bytesize = { version = "1.2.0", features = ["serde"] }
chrono = { version = "0.4.26", features = ["serde"] }
clap = { version = "4.3.11", features = ["derive"] }
color-eyre = "0.6.2"
either = "1.8.1"
Expand Down
6 changes: 2 additions & 4 deletions libsqlx-server/src/hrana/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ pub enum HranaError {
}

impl HranaError {
pub fn code(&self) -> Option<&str>{
pub fn code(&self) -> Option<&str> {
match self {
HranaError::Stmt(e) => Some(e.code()),
HranaError::StreamResponse(e) => Some(e.code()),
HranaError::Stream(_)
| HranaError::Libsqlx(_)
| HranaError::Proto(_) => None,
HranaError::Stream(_) | HranaError::Libsqlx(_) | HranaError::Proto(_) => None,
}
}
}
75 changes: 63 additions & 12 deletions libsqlx-server/src/http/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,49 @@ use std::sync::Arc;
use std::time::Duration;

use axum::extract::{Path, State};
use axum::response::IntoResponse;
use axum::routing::{delete, post};
use axum::{Json, Router};
use chrono::{DateTime, Utc};
use color_eyre::eyre::Result;
use hyper::server::accept::Accept;
use hyper::StatusCode;
use serde::{Deserialize, Deserializer, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};

use crate::allocation::config::{AllocConfig, DbConfig};
use crate::linc::bus::Bus;
use crate::linc::NodeId;
use crate::manager::Manager;
use crate::meta::DatabaseId;
use crate::meta::{AllocationError, DatabaseId};

impl IntoResponse for crate::error::Error {
fn into_response(self) -> axum::response::Response {
#[derive(Serialize)]
struct ErrorBody {
message: String,
}

let mut resp = Json(ErrorBody {
message: self.to_string(),
})
.into_response();
*resp.status_mut() = match self {
crate::error::Error::Libsqlx(_)
| crate::error::Error::InjectorExited
| crate::error::Error::ConnectionClosed
| crate::error::Error::Io(_)
| crate::error::Error::AllocationClosed
| crate::error::Error::Internal(_)
| crate::error::Error::Heed(_) => StatusCode::INTERNAL_SERVER_ERROR,
crate::error::Error::Allocation(AllocationError::AlreadyExist(_)) => {
StatusCode::BAD_REQUEST
}
};

resp
}
}

pub struct Config {
pub bus: Arc<Bus<Arc<Manager>>>,
Expand Down Expand Up @@ -47,7 +78,19 @@ where
struct ErrorResponse {}

#[derive(Serialize, Debug)]
struct AllocateResp {}
#[serde(rename_all = "lowercase")]
enum DbType {
Primary,
Replica,
}

#[derive(Serialize, Debug)]
struct AllocationSummaryView {
created_at: DateTime<Utc>,
database_name: String,
#[serde(rename = "type")]
ty: DbType,
}

#[derive(Deserialize, Debug)]
struct AllocateReq {
Expand Down Expand Up @@ -134,7 +177,7 @@ const fn default_txn_timeout() -> HumanDuration {
async fn allocate(
State(state): State<Arc<AdminServerState>>,
Json(req): Json<AllocateReq>,
) -> Result<Json<AllocateResp>, Json<ErrorResponse>> {
) -> crate::Result<Json<AllocationSummaryView>> {
let config = AllocConfig {
max_conccurent_connection: req.max_conccurent_connection.unwrap_or(16),
db_name: req.database_name.clone(),
Expand Down Expand Up @@ -164,19 +207,26 @@ async fn allocate(

let dispatcher = state.bus.clone();
let id = DatabaseId::from_name(&req.database_name);
state.bus.handler().allocate(id, &config, dispatcher).await;
let meta = state.bus.handler().allocate(id, config, dispatcher).await?;

Ok(Json(AllocateResp {}))
Ok(Json(AllocationSummaryView {
created_at: meta.created_at,
database_name: meta.config.db_name,
ty: match meta.config.db_config {
DbConfig::Primary {..} => DbType::Primary,
DbConfig::Replica {..} => DbType::Replica,
}
}))
}

async fn deallocate(
State(state): State<Arc<AdminServerState>>,
Path(database_name): Path<String>,
) -> Result<Json<AllocateResp>, Json<ErrorResponse>> {
) -> crate::Result<()> {
let id = DatabaseId::from_name(&database_name);
state.bus.handler().deallocate(id).await;
state.bus.handler().deallocate(id).await?;

Ok(Json(AllocateResp {}))
Ok(())
}

#[derive(Serialize, Debug)]
Expand All @@ -191,15 +241,16 @@ struct AllocView {

async fn list_allocs(
State(state): State<Arc<AdminServerState>>,
) -> Result<Json<ListAllocResp>, Json<ErrorResponse>> {
) -> crate::Result<Json<ListAllocResp>> {
let allocs = state
.bus
.handler()
.store()
.list_allocs()
.unwrap()
.list_allocs()?
.into_iter()
.map(|cfg| AllocView { id: cfg.db_name })
.map(|meta| AllocView {
id: meta.config.db_name,
})
.collect();

Ok(Json(ListAllocResp { allocs }))
Expand Down
12 changes: 6 additions & 6 deletions libsqlx-server/src/http/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use axum::response::IntoResponse;
use axum::routing::post;
use axum::{Json, Router};
use color_eyre::Result;
use hyper::StatusCode;
use hyper::server::accept::Accept;
use hyper::StatusCode;
use serde::Serialize;
use tokio::io::{AsyncRead, AsyncWrite};

Expand All @@ -30,12 +30,12 @@ impl IntoResponse for HranaError {
fn into_response(self) -> axum::response::Response {
let (message, code) = match self.code() {
Some(code) => (self.to_string(), code.to_owned()),
None => ("internal error, please check the logs".to_owned(), "INTERNAL_ERROR".to_owned()),
};
let resp = ErrorResponseBody {
message,
code,
None => (
"internal error, please check the logs".to_owned(),
"INTERNAL_ERROR".to_owned(),
),
};
let resp = ErrorResponseBody { message, code };
let mut resp = Json(resp).into_response();
*resp.status_mut() = StatusCode::BAD_REQUEST;
resp
Expand Down
19 changes: 10 additions & 9 deletions libsqlx-server/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::compactor::CompactionQueue;
use crate::linc::bus::Dispatch;
use crate::linc::handler::Handler;
use crate::linc::Inbound;
use crate::meta::{DatabaseId, Store};
use crate::meta::{AllocMeta, DatabaseId, Store};
use crate::replica_commit_store::ReplicaCommitStore;

pub struct Manager {
Expand Down Expand Up @@ -52,24 +52,24 @@ impl Manager {
return Ok(Some(sender.clone()));
}

if let Some(config) = self.meta_store.meta(&database_id)? {
if let Some(meta) = self.meta_store.meta(&database_id)? {
let path = self.db_path.join("dbs").join(database_id.to_string());
tokio::fs::create_dir_all(&path).await?;
let (alloc_sender, inbox) = mpsc::channel(MAX_ALLOC_MESSAGE_QUEUE_LEN);
let alloc = Allocation {
inbox,
database: Database::from_config(
&config,
&meta.config,
path,
dispatcher.clone(),
self.compaction_queue.clone(),
self.replica_commit_store.clone(),
)?,
connections_futs: JoinSet::new(),
next_conn_id: 0,
max_concurrent_connections: config.max_conccurent_connection,
max_concurrent_connections: meta.config.max_conccurent_connection,
dispatcher,
db_name: config.db_name,
db_name: meta.config.db_name,
connections: HashMap::new(),
};

Expand All @@ -86,12 +86,13 @@ impl Manager {
pub async fn allocate(
self: &Arc<Self>,
database_id: DatabaseId,
meta: &AllocConfig,
config: AllocConfig,
dispatcher: Arc<dyn Dispatch>,
) -> crate::Result<()> {
self.store().allocate(&database_id, meta)?;
) -> crate::Result<AllocMeta> {
let meta = self.store().allocate(&database_id, config)?;
self.schedule(database_id, dispatcher).await?;
Ok(())

Ok(meta)
}

pub async fn deallocate(&self, database_id: DatabaseId) -> crate::Result<()> {
Expand Down
26 changes: 19 additions & 7 deletions libsqlx-server/src/meta.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt;
use std::mem::size_of;

use chrono::{DateTime, Utc};
use heed::bytemuck::{Pod, Zeroable};
use heed_types::{OwnedType, SerdeBincode};
use itertools::Itertools;
Expand All @@ -11,9 +12,15 @@ use tokio::task::block_in_place;

use crate::allocation::config::AllocConfig;

#[derive(Debug, Serialize, Deserialize)]
pub struct AllocMeta {
pub config: AllocConfig,
pub created_at: DateTime<Utc>,
}

pub struct Store {
env: heed::Env,
alloc_config_db: heed::Database<OwnedType<DatabaseId>, SerdeBincode<AllocConfig>>,
alloc_config_db: heed::Database<OwnedType<DatabaseId>, SerdeBincode<AllocMeta>>,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Hash, Clone, Copy, Pod, Zeroable)]
Expand Down Expand Up @@ -73,7 +80,7 @@ impl Store {
})
}

pub fn allocate(&self, id: &DatabaseId, meta: &AllocConfig) -> crate::Result<()> {
pub fn allocate(&self, id: &DatabaseId, config: AllocConfig) -> crate::Result<AllocMeta> {
block_in_place(|| {
let mut txn = self.env.write_txn()?;
if self
Expand All @@ -82,14 +89,19 @@ impl Store {
.get(&txn, id)?
.is_some()
{
Err(AllocationError::AlreadyExist(meta.db_name.clone()))?;
Err(AllocationError::AlreadyExist(config.db_name.clone()))?;
};

self.alloc_config_db.put(&mut txn, id, meta)?;
let meta = AllocMeta {
config,
created_at: Utc::now(),
};

self.alloc_config_db.put(&mut txn, id, &meta)?;

txn.commit()?;

Ok(())
Ok(meta)
})
}

Expand All @@ -103,14 +115,14 @@ impl Store {
})
}

pub fn meta(&self, id: &DatabaseId) -> crate::Result<Option<AllocConfig>> {
pub fn meta(&self, id: &DatabaseId) -> crate::Result<Option<AllocMeta>> {
block_in_place(|| {
let txn = self.env.read_txn()?;
Ok(self.alloc_config_db.get(&txn, id)?)
})
}

pub fn list_allocs(&self) -> crate::Result<Vec<AllocConfig>> {
pub fn list_allocs(&self) -> crate::Result<Vec<AllocMeta>> {
block_in_place(|| {
let txn = self.env.read_txn()?;
let res = self
Expand Down