Skip to content

Simplify ChaosProxy code #7494

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/tests/server_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn startup_without_database() {
// Break the networking *before* starting the binary, to ensure the binary can fully startup
// without a database connection. Most of crates.io should not work when started without a
// database, but unconditional redirects will work.
server_bin.chaosproxy.break_networking();
server_bin.chaosproxy.break_networking().unwrap();

let running_server = server_bin.start().unwrap();

Expand Down
26 changes: 13 additions & 13 deletions src/tests/unhealthy_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ fn download_crate_with_broken_networking_primary_database() {
// do an unconditional redirect to the CDN, without checking whether the crate exists or what
// the exact capitalization of crate name is.

app.primary_db_chaosproxy().break_networking();
app.primary_db_chaosproxy().break_networking().unwrap();
assert_unconditional_redirects(&anon);

// After restoring the network and waiting for the database pool to get healthy again redirects
// should be checked again.

app.primary_db_chaosproxy().restore_networking();
app.primary_db_chaosproxy().restore_networking().unwrap();
app.as_inner()
.primary_database
.wait_until_healthy(DB_HEALTHY_TIMEOUT)
Expand Down Expand Up @@ -75,12 +75,12 @@ fn http_error_with_unhealthy_database() {
let response = anon.get::<()>("/api/v1/summary");
assert_eq!(response.status(), StatusCode::OK);

app.primary_db_chaosproxy().break_networking();
app.primary_db_chaosproxy().break_networking().unwrap();

let response = anon.get::<()>("/api/v1/summary");
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);

app.primary_db_chaosproxy().restore_networking();
app.primary_db_chaosproxy().restore_networking().unwrap();
app.as_inner()
.primary_database
.wait_until_healthy(DB_HEALTHY_TIMEOUT)
Expand All @@ -99,14 +99,14 @@ fn fallback_to_replica_returns_user_info() {
.with_chaos_proxy()
.with_user();
app.db_new_user("foo");
app.primary_db_chaosproxy().break_networking();
app.primary_db_chaosproxy().break_networking().unwrap();

// When the primary database is down, requests are forwarded to the replica database
let response = owner.get::<()>(URL);
assert_eq!(response.status(), 200);

// restore primary database connection
app.primary_db_chaosproxy().restore_networking();
app.primary_db_chaosproxy().restore_networking().unwrap();
app.as_inner()
.primary_database
.wait_until_healthy(DB_HEALTHY_TIMEOUT)
Expand All @@ -122,15 +122,15 @@ fn restored_replica_returns_user_info() {
.with_chaos_proxy()
.with_user();
app.db_new_user("foo");
app.primary_db_chaosproxy().break_networking();
app.replica_db_chaosproxy().break_networking();
app.primary_db_chaosproxy().break_networking().unwrap();
app.replica_db_chaosproxy().break_networking().unwrap();

// When both primary and replica database are down, the request returns an error
let response = owner.get::<()>(URL);
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);

// Once the replica database is restored, it should serve as a fallback again
app.replica_db_chaosproxy().restore_networking();
app.replica_db_chaosproxy().restore_networking().unwrap();
app.as_inner()
.read_only_replica_database
.as_ref()
Expand All @@ -142,7 +142,7 @@ fn restored_replica_returns_user_info() {
assert_eq!(response.status(), StatusCode::OK);

// restore connection
app.primary_db_chaosproxy().restore_networking();
app.primary_db_chaosproxy().restore_networking().unwrap();
app.as_inner()
.primary_database
.wait_until_healthy(DB_HEALTHY_TIMEOUT)
Expand All @@ -158,15 +158,15 @@ fn restored_primary_returns_user_info() {
.with_chaos_proxy()
.with_user();
app.db_new_user("foo");
app.primary_db_chaosproxy().break_networking();
app.replica_db_chaosproxy().break_networking();
app.primary_db_chaosproxy().break_networking().unwrap();
app.replica_db_chaosproxy().break_networking().unwrap();

// When both primary and replica database are down, the request returns an error
let response = owner.get::<()>(URL);
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);

// Once the replica database is restored, it should serve as a fallback again
app.primary_db_chaosproxy().restore_networking();
app.primary_db_chaosproxy().restore_networking().unwrap();
app.as_inner()
.primary_database
.wait_until_healthy(DB_HEALTHY_TIMEOUT)
Expand Down
94 changes: 52 additions & 42 deletions src/tests/util/chaosproxy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{Context, Error};
use anyhow::{anyhow, Context};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::{
Expand All @@ -10,6 +10,7 @@ use tokio::{
runtime::Runtime,
sync::broadcast::Sender,
};
use tracing::error;
use url::Url;

pub(crate) struct ChaosProxy {
Expand All @@ -23,7 +24,7 @@ pub(crate) struct ChaosProxy {
}

impl ChaosProxy {
pub(crate) fn new(backend_address: SocketAddr) -> Result<Arc<Self>, Error> {
pub(crate) fn new(backend_address: SocketAddr) -> anyhow::Result<Arc<Self>> {
let runtime = Runtime::new().expect("failed to create Tokio runtime");
let listener = runtime.block_on(TcpListener::bind("127.0.0.1:0"))?;

Expand All @@ -42,42 +43,49 @@ impl ChaosProxy {

let instance_clone = instance.clone();
instance.runtime.spawn(async move {
if let Err(err) = instance_clone.server_loop(listener).await {
eprintln!("ChaosProxy server error: {err}");
if let Err(error) = instance_clone.server_loop(listener).await {
error!(%error, "ChaosProxy server error");
}
});

Ok(instance)
}

pub(crate) fn proxy_database_url(url: &str) -> Result<(Arc<Self>, String), Error> {
pub(crate) fn proxy_database_url(url: &str) -> anyhow::Result<(Arc<Self>, String)> {
let mut db_url = Url::parse(url).context("failed to parse database url")?;
let backend_addr = db_url
.socket_addrs(|| Some(5432))
.context("could not resolve database url")?
.first()
.copied()
.ok_or_else(|| anyhow::anyhow!("the database url does not point to any IP"))?;
.ok_or_else(|| anyhow!("the database url does not point to any IP"))?;

let instance = ChaosProxy::new(backend_addr)?;

db_url
.set_ip_host(instance.address.ip())
.map_err(|_| anyhow!("Failed to set IP host on the URL"))?;

db_url
.set_port(Some(instance.address.port()))
.map_err(|_| anyhow!("Failed to set post on the URL"))?;

let instance = ChaosProxy::new(backend_addr).unwrap();
db_url.set_ip_host(instance.address.ip()).unwrap();
db_url.set_port(Some(instance.address.port())).unwrap();
Ok((instance, db_url.into()))
}

pub(crate) fn break_networking(&self) {
pub(crate) fn break_networking(&self) -> anyhow::Result<usize> {
self.break_networking_send
.send(())
.expect("failed to send the break_networking message");
.context("Failed to send the break_networking message")
}

pub(crate) fn restore_networking(&self) {
pub(crate) fn restore_networking(&self) -> anyhow::Result<usize> {
self.restore_networking_send
.send(())
.expect("failed to send the restore_networking message");
.context("Failed to send the restore_networking message")
}

async fn server_loop(self: Arc<Self>, initial_listener: TcpListener) -> Result<(), Error> {
async fn server_loop(&self, initial_listener: TcpListener) -> anyhow::Result<()> {
let mut listener = Some(initial_listener);

let mut break_networking_recv = self.break_networking_send.subscribe();
Expand All @@ -87,7 +95,7 @@ impl ChaosProxy {
if let Some(l) = &listener {
tokio::select! {
accepted = l.accept() => {
self.clone().accept_connection(accepted?.0).await?;
self.accept_connection(accepted?.0).await?;
},

_ = break_networking_recv.recv() => {
Expand All @@ -104,51 +112,53 @@ impl ChaosProxy {
}
}

async fn accept_connection(self: Arc<Self>, accepted: TcpStream) -> Result<(), Error> {
async fn accept_connection(&self, accepted: TcpStream) -> anyhow::Result<()> {
let (client_read, client_write) = accepted.into_split();
let (backend_read, backend_write) = TcpStream::connect(&self.backend_address)
.await?
.into_split();

let self_clone = self.clone();
let break_networking_send = self.break_networking_send.clone();
tokio::spawn(async move {
if let Err(err) = self_clone.proxy_data(client_read, backend_write).await {
eprintln!("ChaosProxy connection error: {err}");
if let Err(error) = proxy_data(break_networking_send, client_read, backend_write).await
{
error!(%error, "ChaosProxy connection error");
}
});

let self_clone = self.clone();
let break_networking_send = self.break_networking_send.clone();
tokio::spawn(async move {
if let Err(err) = self_clone.proxy_data(backend_read, client_write).await {
eprintln!("ChaosProxy connection error: {err}");
if let Err(error) = proxy_data(break_networking_send, backend_read, client_write).await
{
error!(%error, "ChaosProxy connection error");
}
});

Ok(())
}
}

async fn proxy_data(
&self,
mut from: OwnedReadHalf,
mut to: OwnedWriteHalf,
) -> Result<(), Error> {
let mut break_connections_recv = self.break_networking_send.subscribe();
let mut buf = [0; 1024];

loop {
tokio::select! {
len = from.read(&mut buf) => {
let len = len?;
if len == 0 {
// EOF, the socket was closed
return Ok(());
}
to.write_all(&buf[0..len]).await?;
}
_ = break_connections_recv.recv() => {
to.shutdown().await?;
async fn proxy_data(
break_networking_send: Sender<()>,
mut from: OwnedReadHalf,
mut to: OwnedWriteHalf,
) -> anyhow::Result<()> {
let mut break_connections_recv = break_networking_send.subscribe();
let mut buf = [0; 1024];

loop {
tokio::select! {
len = from.read(&mut buf) => {
let len = len?;
if len == 0 {
// EOF, the socket was closed
return Ok(());
}
to.write_all(&buf[0..len]).await?;
}
_ = break_connections_recv.recv() => {
to.shutdown().await?;
return Ok(());
}
}
}
Expand Down