Skip to content

Implement version_id cache for download endpoint using moka crate #3999

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 260 additions & 22 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ tikv-jemallocator = { version = "0.4.1", features = ['unprefixed_malloc_on_suppo
lettre = { version = "0.10.0-rc.4", default-features = false, features = ["file-transport", "smtp-transport", "native-tls", "hostname", "builder"] }
license-exprs = "1.6"
minijinja = "0.8.1"
moka = "0.6.1"
oauth2 = { version = "4.1.0", default-features = false, features = ["reqwest"] }
parking_lot = "0.11"
prometheus = { version = "0.13.0", default-features = false }
Expand Down
12 changes: 12 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::email::Emails;
use crate::github::GitHubClient;
use crate::metrics::{InstanceMetrics, ServiceMetrics};
use diesel::r2d2;
use moka::sync::{Cache, CacheBuilder};
use oauth2::basic::BasicClient;
use reqwest::blocking::Client;
use scheduled_thread_pool::ScheduledThreadPool;
Expand All @@ -31,6 +32,12 @@ pub struct App {
/// The server configuration
pub config: config::Server,

/// Cache the `version_id` of a `canonical_crate_name:semver` pair
///
/// This is used by the download endpoint to reduce the number of database queries. The
/// `version_id` is only cached under the canonical spelling of the crate name.
pub(crate) version_id_cacher: Cache<(String, String), i32>,

/// Count downloads and periodically persist them in the database
pub downloads_counter: DownloadsCounter,

Expand Down Expand Up @@ -148,12 +155,17 @@ impl App {
None
};

let version_id_cacher = CacheBuilder::new(config.version_id_cache_size)
.time_to_live(config.version_id_cache_ttl)
.build();

App {
primary_database,
read_only_replica_database: replica_database,
github,
github_oauth,
config,
version_id_cacher,
downloads_counter: DownloadsCounter::new(),
emails: Emails::from_environment(),
service_metrics: ServiceMetrics::new().expect("could not initialize service metrics"),
Expand Down
11 changes: 11 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ mod database_pools;
pub use self::base::Base;
pub use self::database_pools::DatabasePools;
use std::collections::HashSet;
use std::time::Duration;

const DEFAULT_VERSION_ID_CACHE_SIZE: usize = 10_000;
const DEFAULT_VERSION_ID_CACHE_TTL: u64 = 5 * 60; // 5 minutes

pub struct Server {
pub base: Base,
Expand All @@ -30,6 +34,8 @@ pub struct Server {
pub instance_metrics_log_every_seconds: Option<u64>,
pub force_unconditional_redirects: bool,
pub blocked_routes: HashSet<String>,
pub version_id_cache_size: usize,
pub version_id_cache_ttl: Duration,
}

impl Default for Server {
Expand Down Expand Up @@ -107,6 +113,11 @@ impl Default for Server {
blocked_routes: env_optional("BLOCKED_ROUTES")
.map(|routes: String| routes.split(',').map(|s| s.into()).collect())
.unwrap_or_else(HashSet::new),
version_id_cache_size: env_optional("VERSION_ID_CACHE_SIZE")
.unwrap_or(DEFAULT_VERSION_ID_CACHE_SIZE),
version_id_cache_ttl: Duration::from_secs(
env_optional("VERSION_ID_CACHE_TTL").unwrap_or(DEFAULT_VERSION_ID_CACHE_TTL),
),
}
}
}
Expand Down
137 changes: 75 additions & 62 deletions src/controllers/version/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,75 +18,88 @@ pub fn download(req: &mut dyn RequestExt) -> EndpointResult {
let mut crate_name = req.params()["crate_id"].clone();
let version = req.params()["version"].as_str();

// When no database connection is ready unconditional redirects will be performed. This could
// happen if the pool is not healthy or if an operator manually configured the application to
// always perform unconditional redirects (for example as part of the mitigations for an
// outage). See the comments below for a description of what unconditional redirects do.
let conn = if app.config.force_unconditional_redirects {
None
} else {
match req.db_conn() {
Ok(conn) => Some(conn),
Err(PoolError::UnhealthyPool) => None,
Err(err) => return Err(err.into()),
}
};

let mut log_metadata = None;
if let Some(conn) = &conn {
use self::versions::dsl::*;

// Returns the crate name as stored in the database, or an error if we could
// not load the version ID from the database.
let (version_id, canonical_crate_name) = app
.instance_metrics
.downloads_select_query_execution_time
.observe_closure_duration(|| {
versions
.inner_join(crates::table)
.select((id, crates::name))
.filter(Crate::with_name(&crate_name))
.filter(num.eq(version))
.first::<(i32, String)>(&**conn)
})?;

if canonical_crate_name != crate_name {
app.instance_metrics
.downloads_non_canonical_crate_name_total
.inc();
log_metadata = Some(("bot", "dl"));
}
crate_name = canonical_crate_name;

let cache_key = (crate_name.to_string(), version.to_string());
if let Some(version_id) = app.version_id_cacher.get(&cache_key) {
app.instance_metrics.version_id_cache_hits.inc();

// The increment does not happen instantly, but it's deferred to be executed in a batch
// along with other downloads. See crate::downloads_counter for the implementation.
app.downloads_counter.increment(version_id);
} else {
// The download endpoint is the most critical route in the whole crates.io application,
// as it's relied upon by users and automations to download crates. Keeping it working
// is the most important thing for us.
//
// The endpoint relies on the database to fetch the canonical crate name (with the
// right capitalization and hyphenation), but that's only needed to serve clients who
// don't call the endpoint with the crate's canonical name.
//
// Thankfully Cargo always uses the right name when calling the endpoint, and we can
// keep it working during a full database outage by unconditionally redirecting without
// checking whether the crate exists or the rigth name is used. Non-Cargo clients might
// get a 404 response instead of a 500, but that's worth it.
//
// Without a working database we also can't count downloads, but that's also less
// critical than keeping Cargo downloads operational.

app.instance_metrics
.downloads_unconditional_redirects_total
.inc();
log_metadata = Some(("unconditional_redirect", "true"));
}
app.instance_metrics.version_id_cache_misses.inc();

// When no database connection is ready unconditional redirects will be performed. This could
// happen if the pool is not healthy or if an operator manually configured the application to
// always perform unconditional redirects (for example as part of the mitigations for an
// outage). See the comments below for a description of what unconditional redirects do.
let conn = if app.config.force_unconditional_redirects {
None
} else {
match req.db_conn() {
Ok(conn) => Some(conn),
Err(PoolError::UnhealthyPool) => None,
Err(err) => return Err(err.into()),
}
};

if let Some(conn) = &conn {
use self::versions::dsl::*;

// Returns the crate name as stored in the database, or an error if we could
// not load the version ID from the database.
let (version_id, canonical_crate_name) = app
.instance_metrics
.downloads_select_query_execution_time
.observe_closure_duration(|| {
versions
.inner_join(crates::table)
.select((id, crates::name))
.filter(Crate::with_name(&crate_name))
.filter(num.eq(version))
.first::<(i32, String)>(&**conn)
})?;

if canonical_crate_name != crate_name {
app.instance_metrics
.downloads_non_canonical_crate_name_total
.inc();
log_metadata = Some(("bot", "dl"));
crate_name = canonical_crate_name;
} else {
// The version_id is only cached if the provided crate name was canonical.
// Non-canonical requests fallback to the "slow" path with a DB query, but
// we typically only get a few hundred non-canonical requests in a day anyway.
app.version_id_cacher.insert(cache_key, version_id);
}

// The increment does not happen instantly, but it's deferred to be executed in a batch
// along with other downloads. See crate::downloads_counter for the implementation.
app.downloads_counter.increment(version_id);
} else {
// The download endpoint is the most critical route in the whole crates.io application,
// as it's relied upon by users and automations to download crates. Keeping it working
// is the most important thing for us.
//
// The endpoint relies on the database to fetch the canonical crate name (with the
// right capitalization and hyphenation), but that's only needed to serve clients who
// don't call the endpoint with the crate's canonical name.
//
// Thankfully Cargo always uses the right name when calling the endpoint, and we can
// keep it working during a full database outage by unconditionally redirecting without
// checking whether the crate exists or the rigth name is used. Non-Cargo clients might
// get a 404 response instead of a 500, but that's worth it.
//
// Without a working database we also can't count downloads, but that's also less
// critical than keeping Cargo downloads operational.

// Ensure the connection is released to the pool as soon as possible, as the download endpoint
// covers the majority of our traffic and we don't want it to starve other requests.
drop(conn);
app.instance_metrics
.downloads_unconditional_redirects_total
.inc();
log_metadata = Some(("unconditional_redirect", "true"));
}
};

let redirect_url = req
.app()
Expand Down
5 changes: 5 additions & 0 deletions src/metrics/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ metrics! {
pub downloads_select_query_execution_time: Histogram,
/// Number of download requests that are not counted yet.
downloads_not_counted_total: IntGauge,

/// Number of version ID cache hits on the download endpoint.
pub version_id_cache_hits: IntCounter,
/// Number of version ID cache misses on the download endpoint.
pub version_id_cache_misses: IntCounter,
}

// All instance metrics will be prefixed with this namespace.
Expand Down
113 changes: 77 additions & 36 deletions src/tests/krate/downloads.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::builders::{CrateBuilder, VersionBuilder};
use crate::util::{RequestHelper, TestApp};
use crate::util::{MockAnonymousUser, RequestHelper, TestApp};
use cargo_registry::views::EncodableVersionDownload;
use chrono::{Duration, Utc};
use http::StatusCode;
Expand All @@ -9,6 +9,35 @@ struct Downloads {
version_downloads: Vec<EncodableVersionDownload>,
}

fn persist_downloads_count(app: &TestApp) {
app.as_inner()
.downloads_counter
.persist_all_shards(app.as_inner())
.expect("failed to persist downloads count")
.log();
}

#[track_caller]
fn assert_dl_count(
anon: &MockAnonymousUser,
name_and_version: &str,
query: Option<&str>,
count: i32,
) {
let url = format!("/api/v1/crates/{}/downloads", name_and_version);
let downloads: Downloads = if let Some(query) = query {
anon.get_with_query(&url, query).good()
} else {
anon.get(&url).good()
};
let total_downloads = downloads
.version_downloads
.iter()
.map(|vd| vd.downloads)
.sum::<i32>();
assert_eq!(total_downloads, count);
}

#[test]
fn download() {
let (app, anon, user) = TestApp::init().with_user();
Expand All @@ -20,60 +49,37 @@ fn download() {
.expect_build(conn);
});

let assert_dl_count = |name_and_version: &str, query: Option<&str>, count: i32| {
let url = format!("/api/v1/crates/{}/downloads", name_and_version);
let downloads: Downloads = if let Some(query) = query {
anon.get_with_query(&url, query).good()
} else {
anon.get(&url).good()
};
let total_downloads = downloads
.version_downloads
.iter()
.map(|vd| vd.downloads)
.sum::<i32>();
assert_eq!(total_downloads, count);
};

let download = |name_and_version: &str| {
let url = format!("/api/v1/crates/{}/download", name_and_version);
let response = anon.get::<()>(&url);
assert_eq!(response.status(), StatusCode::FOUND);
// TODO: test the with_json code path
};

let persist_downloads_count = || {
app.as_inner()
.downloads_counter
.persist_all_shards(app.as_inner())
.expect("failed to persist downloads count")
.log();
};

download("foo_download/1.0.0");
// No downloads are counted until the counters are persisted
assert_dl_count("foo_download/1.0.0", None, 0);
assert_dl_count("foo_download", None, 0);
persist_downloads_count();
assert_dl_count(&anon, "foo_download/1.0.0", None, 0);
assert_dl_count(&anon, "foo_download", None, 0);
persist_downloads_count(&app);
// Now that the counters are persisted the download counts show up.
assert_dl_count("foo_download/1.0.0", None, 1);
assert_dl_count("foo_download", None, 1);
assert_dl_count(&anon, "foo_download/1.0.0", None, 1);
assert_dl_count(&anon, "foo_download", None, 1);

download("FOO_DOWNLOAD/1.0.0");
persist_downloads_count();
assert_dl_count("FOO_DOWNLOAD/1.0.0", None, 2);
assert_dl_count("FOO_DOWNLOAD", None, 2);
persist_downloads_count(&app);
assert_dl_count(&anon, "FOO_DOWNLOAD/1.0.0", None, 2);
assert_dl_count(&anon, "FOO_DOWNLOAD", None, 2);

let yesterday = (Utc::today() + Duration::days(-1)).format("%F");
let query = format!("before_date={}", yesterday);
assert_dl_count("FOO_DOWNLOAD/1.0.0", Some(&query), 0);
assert_dl_count(&anon, "FOO_DOWNLOAD/1.0.0", Some(&query), 0);
// crate/downloads always returns the last 90 days and ignores date params
assert_dl_count("FOO_DOWNLOAD", Some(&query), 2);
assert_dl_count(&anon, "FOO_DOWNLOAD", Some(&query), 2);

let tomorrow = (Utc::today() + Duration::days(1)).format("%F");
let query = format!("before_date={}", tomorrow);
assert_dl_count("FOO_DOWNLOAD/1.0.0", Some(&query), 2);
assert_dl_count("FOO_DOWNLOAD", Some(&query), 2);
assert_dl_count(&anon, "FOO_DOWNLOAD/1.0.0", Some(&query), 2);
assert_dl_count(&anon, "FOO_DOWNLOAD", Some(&query), 2);
}

#[test]
Expand Down Expand Up @@ -136,3 +142,38 @@ fn force_unconditional_redirect() {
anon.get::<()>("/api/v1/crates/bar-download/1.0.0/download")
.assert_redirect_ends_with("/crates/bar-download/bar-download-1.0.0.crate");
}

#[test]
fn download_caches_version_id() {
use cargo_registry::schema::crates::dsl::*;
use diesel::prelude::*;

let (app, anon, user) = TestApp::init().with_user();
let user = user.as_model();

app.db(|conn| {
CrateBuilder::new("foo_download", user.id)
.version(VersionBuilder::new("1.0.0"))
.expect_build(conn);
});

anon.get::<()>("/api/v1/crates/foo_download/1.0.0/download")
.assert_redirect_ends_with("/crates/foo_download/foo_download-1.0.0.crate");

// Rename the crate, so that `foo_download` will not be found if its version_id was not cached
app.db(|conn| {
diesel::update(crates.filter(name.eq("foo_download")))
.set(name.eq("other"))
.execute(conn)
.unwrap();
});

// This would result in a 404 if the endpoint tried to read from the database
anon.get::<()>("/api/v1/crates/foo_download/1.0.0/download")
.assert_redirect_ends_with("/crates/foo_download/foo_download-1.0.0.crate");

// Downloads are persisted by version_id, so the rename doesn't matter
persist_downloads_count(&app);
// Check download count against the new name, rather than rename it back to the original value
assert_dl_count(&anon, "other/1.0.0", None, 2);
}
Loading