Skip to content

Commit 5ee1e44

Browse files
committed
Add --retries parameter to CLI commands
1 parent 4b29e49 commit 5ee1e44

File tree

10 files changed

+301
-114
lines changed

10 files changed

+301
-114
lines changed

quickwit/Cargo.lock

Lines changed: 166 additions & 48 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,12 @@ rdkafka = { version = "0.33", default-features = false, features = [
200200
] }
201201
regex = "1.10.0"
202202
regex-syntax = "0.8"
203-
reqwest = { version = "0.11", default-features = false, features = [
203+
reqwest = { version = "0.12", default-features = false, features = [
204204
"json",
205205
"rustls-tls",
206206
] }
207+
reqwest-middleware = "0.4"
208+
reqwest-retry = "0.7"
207209
rust-embed = "6.8.1"
208210
rustls = "0.21"
209211
rustls-pemfile = "1.0.0"
@@ -304,8 +306,8 @@ azure_storage_blobs = { version = "0.13.0", default-features = false, features =
304306
"enable_reqwest_rustls",
305307
] }
306308

307-
opendal = { version = "0.44", default-features = false }
308-
reqsign = { version = "0.14", default-features = false }
309+
opendal = { version = "0.53", default-features = false }
310+
reqsign = { version = "0.16", default-features = false }
309311

310312
quickwit-actors = { path = "quickwit-actors" }
311313
quickwit-aws = { path = "quickwit-aws" }
@@ -316,10 +318,10 @@ quickwit-codegen-example = { path = "quickwit-codegen/example" }
316318
quickwit-common = { path = "quickwit-common" }
317319
quickwit-config = { path = "quickwit-config" }
318320
quickwit-control-plane = { path = "quickwit-control-plane" }
319-
quickwit-index-management = { path = "quickwit-index-management" }
320321
quickwit-datetime = { path = "quickwit-datetime" }
321322
quickwit-directories = { path = "quickwit-directories" }
322323
quickwit-doc-mapper = { path = "quickwit-doc-mapper" }
324+
quickwit-index-management = { path = "quickwit-index-management" }
323325
quickwit-indexing = { path = "quickwit-indexing" }
324326
quickwit-ingest = { path = "quickwit-ingest" }
325327
quickwit-integration-tests = { path = "quickwit-integration-tests" }

quickwit/quickwit-cli/src/index.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ pub fn build_index_command() -> Command {
6060
.display_order(1)
6161
.required(true),
6262
arg!(--overwrite "Overwrites pre-existing index. This will delete all existing data stored at `index-uri` before creating a new index.")
63+
.display_order(2)
6364
.required(false),
6465
])
6566
)
@@ -110,6 +111,7 @@ pub fn build_index_command() -> Command {
110111
.long_about("Displays descriptive statistics of an index. Displayed statistics are: number of published splits, number of documents, splits min/max timestamps, size of splits.")
111112
.args(&[
112113
arg!(--index <INDEX> "ID of the target index")
114+
.display_order(1)
113115
.required(true),
114116
])
115117
)
@@ -298,7 +300,7 @@ impl IndexCliCommand {
298300
let client_args = ClientArgs::parse(&mut matches)?;
299301
let index_id = matches
300302
.remove_one::<String>("index")
301-
.expect("`index` should be a required arg.");
303+
.expect("`index` should be a required arg");
302304
let assume_yes = matches.get_flag("yes");
303305
Ok(Self::Clear(ClearIndexArgs {
304306
client_args,
@@ -312,7 +314,7 @@ impl IndexCliCommand {
312314
let index_config_uri = matches
313315
.remove_one::<String>("index-config")
314316
.map(|uri| Uri::from_str(&uri))
315-
.expect("`index-config` should be a required arg.")?;
317+
.expect("`index-config` should be a required arg")?;
316318
let overwrite = matches.get_flag("overwrite");
317319
let assume_yes = matches.get_flag("yes");
318320

@@ -328,11 +330,11 @@ impl IndexCliCommand {
328330
let client_args = ClientArgs::parse(&mut matches)?;
329331
let index_id = matches
330332
.remove_one::<String>("index")
331-
.expect("`index` should be a required arg.");
333+
.expect("`index` should be a required arg");
332334
let index_config_uri = matches
333335
.remove_one::<String>("index-config")
334336
.map(|uri| Uri::from_str(&uri))
335-
.expect("`index-config` should be a required arg.")?;
337+
.expect("`index-config` should be a required arg")?;
336338
let assume_yes = matches.get_flag("yes");
337339

338340
Ok(Self::Update(UpdateIndexArgs {
@@ -347,7 +349,8 @@ impl IndexCliCommand {
347349
let client_args = ClientArgs::parse(&mut matches)?;
348350
let index_id = matches
349351
.remove_one::<String>("index")
350-
.expect("`index` should be a required arg.");
352+
.expect("`index` should be a required arg");
353+
351354
Ok(Self::Describe(DescribeIndexArgs {
352355
client_args,
353356
index_id,
@@ -363,7 +366,7 @@ impl IndexCliCommand {
363366
let client_args = ClientArgs::parse_for_ingest(&mut matches)?;
364367
let index_id = matches
365368
.remove_one::<String>("index")
366-
.expect("`index` should be a required arg.");
369+
.expect("`index` should be a required arg");
367370
let input_path_opt = if let Some(input_path) = matches.remove_one::<String>("input-path") {
368371
Uri::from_str(&input_path)?
369372
.filepath()
@@ -401,7 +404,7 @@ impl IndexCliCommand {
401404
fn parse_search_args(mut matches: ArgMatches) -> anyhow::Result<Self> {
402405
let index_id = matches
403406
.remove_one::<String>("index")
404-
.expect("`index` should be a required arg.");
407+
.expect("`index` should be a required arg");
405408
let query = matches
406409
.remove_one::<String>("query")
407410
.context("`query` should be a required arg")?;
@@ -450,7 +453,7 @@ impl IndexCliCommand {
450453
let client_args = ClientArgs::parse(&mut matches)?;
451454
let index_id = matches
452455
.remove_one::<String>("index")
453-
.expect("`index` should be a required arg.");
456+
.expect("`index` should be a required arg");
454457
let dry_run = matches.get_flag("dry-run");
455458
let assume_yes = matches.get_flag("yes");
456459
Ok(Self::Delete(DeleteIndexArgs {

quickwit/quickwit-cli/src/lib.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,16 @@ fn client_args() -> Vec<Arg> {
9494
.required(false)
9595
.global(true)
9696
.display_order(3),
97+
Arg::new("retries")
98+
.long("retries")
99+
.help(
100+
"Maximum number of retries for transient errors. Default value is 0. The total \
101+
number of attempts will be `1 + RETRIES`.",
102+
)
103+
.required(false)
104+
.global(true)
105+
.default_value("0")
106+
.display_order(4),
97107
]
98108
}
99109

@@ -103,6 +113,7 @@ pub struct ClientArgs {
103113
pub connect_timeout: Option<Timeout>,
104114
pub timeout: Option<Timeout>,
105115
pub commit_timeout: Option<Timeout>,
116+
pub num_retries: u32,
106117
}
107118

108119
impl Default for ClientArgs {
@@ -112,6 +123,7 @@ impl Default for ClientArgs {
112123
connect_timeout: None,
113124
timeout: None,
114125
commit_timeout: None,
126+
num_retries: 0,
115127
}
116128
}
117129
}
@@ -130,7 +142,7 @@ impl ClientArgs {
130142
if let Some(commit_timeout) = self.commit_timeout {
131143
builder = builder.commit_timeout(commit_timeout);
132144
}
133-
builder
145+
builder.num_retries(self.num_retries)
134146
}
135147

136148
pub fn client(self) -> QuickwitClient {
@@ -149,7 +161,7 @@ impl ClientArgs {
149161
let cluster_endpoint = matches
150162
.remove_one::<String>("endpoint")
151163
.map(|endpoint_str| Url::from_str(&endpoint_str))
152-
.expect("`endpoint` should be a required arg.")?;
164+
.expect("`endpoint` should be a required arg")?;
153165
let connect_timeout =
154166
if let Some(duration) = matches.remove_one::<String>("connect-timeout") {
155167
Some(parse_duration_or_none(&duration)?)
@@ -170,11 +182,16 @@ impl ClientArgs {
170182
} else {
171183
None
172184
};
185+
let num_retries = matches
186+
.remove_one::<String>("retries")
187+
.map(|retries| retries.parse::<u32>())
188+
.expect("`retries` should have a default value")?;
173189
Ok(Self {
174190
cluster_endpoint,
175191
connect_timeout,
176192
timeout,
177193
commit_timeout,
194+
num_retries,
178195
})
179196
}
180197
}

quickwit/quickwit-cli/src/main.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ mod tests {
226226
"wikipedia",
227227
"--endpoint",
228228
"http://127.0.0.1:8000",
229+
"--retries",
230+
"2",
229231
])?;
230232
let command = CliCommand::parse_cli_args(matches)?;
231233
assert!(matches!(
@@ -243,6 +245,7 @@ mod tests {
243245
&& client_args.connect_timeout.is_none()
244246
&& client_args.commit_timeout.is_none()
245247
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:8000").unwrap()
248+
&& client_args.num_retries == 2
246249
));
247250

248251
let app = build_cli().no_binary_name(true);
@@ -272,6 +275,7 @@ mod tests {
272275
&& client_args.timeout.is_none()
273276
&& client_args.connect_timeout.is_none()
274277
&& client_args.commit_timeout.is_none()
278+
&& client_args.num_retries == 0
275279
&& batch_size_limit == ByteSize::mb(8)
276280
));
277281

@@ -301,6 +305,7 @@ mod tests {
301305
&& client_args.timeout.is_none()
302306
&& client_args.connect_timeout.is_none()
303307
&& client_args.commit_timeout.is_none()
308+
&& client_args.num_retries == 0
304309
&& batch_size_limit == ByteSize::kb(4)
305310
));
306311

@@ -331,6 +336,7 @@ mod tests {
331336
&& client_args.timeout == Some(Timeout::from_secs(10))
332337
&& client_args.connect_timeout == Some(Timeout::from_secs(2))
333338
&& client_args.commit_timeout.is_none()
339+
&& client_args.num_retries == 0
334340
));
335341

336342
let app = build_cli().no_binary_name(true);

quickwit/quickwit-rest-client/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ license.workspace = true
1313
[dependencies]
1414
bytes = { workspace = true }
1515
reqwest = { workspace = true }
16+
reqwest-middleware = { workspace = true }
17+
reqwest-retry = { workspace = true }
1618
serde = { workspace = true }
1719
serde_json = { workspace = true }
1820
thiserror = { workspace = true }
@@ -30,6 +32,7 @@ quickwit-search = { workspace = true }
3032
quickwit-serve = { workspace = true }
3133

3234
[dev-dependencies]
35+
http = { workspace = true }
3336
wiremock = { workspace = true }
3437

3538
quickwit-config = { workspace = true, features = ["testsuite"] }

quickwit/quickwit-rest-client/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ pub enum Error {
3636
// Json serialization/deserialization error.
3737
#[error("Serde JSON error: {0}")]
3838
Json(#[from] serde_json::error::Error),
39+
// Error returned by reqwest middleware.
40+
#[error(transparent)]
41+
Middleware(#[from] reqwest_middleware::Error),
3942
// Error returned by url lib when parsing a string.
4043
#[error("URL parsing error: {0}")]
4144
UrlParse(String),

quickwit/quickwit-rest-client/src/rest_client.rs

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ use quickwit_serve::{
2727
};
2828
use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE};
2929
use reqwest::tls::Certificate;
30-
use reqwest::{Client, ClientBuilder, Method, StatusCode, Url};
30+
use reqwest::{ClientBuilder as ReqwestClientBuilder, Method, StatusCode, Url};
31+
use reqwest_middleware::{ClientBuilder as ReqwestMiddlewareClientBuilder, ClientWithMiddleware};
32+
use reqwest_retry::policies::ExponentialBackoff;
33+
use reqwest_retry::RetryTransientMiddleware;
3134
use serde::Serialize;
3235
use serde_json::json;
3336

@@ -47,28 +50,41 @@ pub const DEFAULT_CLIENT_COMMIT_TIMEOUT: Timeout = Timeout::from_mins(30);
4750
struct Transport {
4851
base_url: Url,
4952
api_url: Url,
50-
client: Client,
53+
client: ClientWithMiddleware,
5154
}
5255

5356
impl Transport {
54-
fn new(endpoint: Url, connect_timeout: Timeout, ca_cert: Option<Certificate>) -> Self {
57+
fn new(
58+
endpoint: Url,
59+
connect_timeout: Timeout,
60+
ca_cert: Option<Certificate>,
61+
num_retries: u32,
62+
) -> Self {
5563
let base_url = endpoint;
5664
let api_url = base_url
5765
.join("api/v1/")
58-
.expect("Endpoint should not be malformed.");
59-
let mut client_builder = ClientBuilder::new();
66+
.expect("root url should be well-formed");
67+
let mut reqwest_client_builder = ReqwestClientBuilder::new();
6068
if let Some(duration) = connect_timeout.as_duration_opt() {
61-
client_builder = client_builder.connect_timeout(duration);
69+
reqwest_client_builder = reqwest_client_builder.connect_timeout(duration);
6270
}
6371
if let Some(ca_cert) = ca_cert {
64-
client_builder = client_builder
72+
reqwest_client_builder = reqwest_client_builder
6573
.tls_built_in_root_certs(false)
6674
.add_root_certificate(ca_cert);
6775
}
76+
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(num_retries);
77+
let retry_transient_middleware = RetryTransientMiddleware::new_with_policy(retry_policy);
78+
let reqwest_client = reqwest_client_builder
79+
.build()
80+
.expect("`client_builder.build()` should not fail");
81+
let client = ReqwestMiddlewareClientBuilder::new(reqwest_client)
82+
.with(retry_transient_middleware)
83+
.build();
6884
Self {
6985
base_url,
7086
api_url,
71-
client: client_builder.build().expect("Client should be built."),
87+
client,
7288
}
7389
}
7490

@@ -129,6 +145,8 @@ pub struct QuickwitClientBuilder {
129145
detailed_response: bool,
130146
/// Validate against a custom TLS certificate authority
131147
ca_cert: Option<Certificate>,
148+
/// Maximum number of retries for transient errors.
149+
num_retries: u32,
132150
}
133151

134152
impl QuickwitClientBuilder {
@@ -143,6 +161,7 @@ impl QuickwitClientBuilder {
143161
use_legacy_ingest: false,
144162
detailed_response: false,
145163
ca_cert: None,
164+
num_retries: 0,
146165
}
147166
}
148167

@@ -187,8 +206,18 @@ impl QuickwitClientBuilder {
187206
self
188207
}
189208

209+
pub fn num_retries(mut self, num_retries: u32) -> Self {
210+
self.num_retries = num_retries;
211+
self
212+
}
213+
190214
pub fn build(self) -> QuickwitClient {
191-
let transport = Transport::new(self.base_url, self.connect_timeout, self.ca_cert);
215+
let transport = Transport::new(
216+
self.base_url,
217+
self.connect_timeout,
218+
self.ca_cert,
219+
self.num_retries,
220+
);
192221
QuickwitClient {
193222
transport,
194223
timeout: self.timeout,
@@ -744,6 +773,7 @@ mod test {
744773
use std::path::PathBuf;
745774
use std::str::FromStr;
746775

776+
use http::StatusCode;
747777
use quickwit_config::{ConfigFormat, SourceConfig};
748778
use quickwit_indexing::mock_split;
749779
use quickwit_ingest::CommitType;
@@ -752,7 +782,7 @@ mod test {
752782
ListSplitsQueryParams, ListSplitsResponse, RestIngestResponse, SearchRequestQueryString,
753783
};
754784
use reqwest::header::CONTENT_TYPE;
755-
use reqwest::{StatusCode, Url};
785+
use reqwest::Url;
756786
use serde_json::json;
757787
use tokio::fs::File;
758788
use tokio::io::AsyncReadExt;
@@ -771,9 +801,7 @@ mod test {
771801
let server_url = Url::parse(&format!("http://127.0.0.1:{port}")).unwrap();
772802
let qw_client = QuickwitClientBuilder::new(server_url).build();
773803
let error = qw_client.indexes().list().await.unwrap_err();
774-
775-
assert!(matches!(error, Error::Client(_)));
776-
assert!(error.to_string().contains("tcp connect error"));
804+
assert!(matches!(error, Error::Middleware(_)));
777805
}
778806

779807
#[tokio::test]

0 commit comments

Comments
 (0)